001 /*
002 Copyright (c) 1996-2011, Damon Hart-Davis
003 All rights reserved.
004
005 Redistribution and use in source and binary forms, with or without
006 modification, are permitted provided that the following conditions are
007 met:
008
009 * Redistributions of source code must retain the above copyright
010 notice, this list of conditions and the following disclaimer.
011
012 * Redistributions in binary form must reproduce the above copyright
013 notice, this list of conditions and the following disclaimer in the
014 documentation and/or other materials provided with the
015 distribution.
016
017 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
018 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
019 TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
020 PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
021 OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
022 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
023 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
024 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
025 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
026 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
027 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
028 */
029 package org.hd.d.pg2k.svrCore.vars;
030
031 import java.io.IOException;
032 import java.io.InterruptedIOException;
033 import java.util.ArrayDeque;
034 import java.util.ArrayList;
035 import java.util.BitSet;
036 import java.util.Deque;
037 import java.util.EnumSet;
038 import java.util.Hashtable;
039 import java.util.Iterator;
040 import java.util.List;
041 import java.util.Map;
042 import java.util.concurrent.Callable;
043 import java.util.concurrent.ConcurrentHashMap;
044 import java.util.concurrent.ConcurrentMap;
045 import java.util.concurrent.ExecutionException;
046 import java.util.concurrent.Future;
047 import java.util.concurrent.TimeUnit;
048 import java.util.concurrent.TimeoutException;
049 import java.util.concurrent.locks.ReentrantLock;
050
051 import org.hd.d.pg2k.svrCore.CoreConsts;
052 import org.hd.d.pg2k.svrCore.GenUtils;
053 import org.hd.d.pg2k.svrCore.Rnd;
054 import org.hd.d.pg2k.svrCore.ThreadUtils;
055 import org.hd.d.pg2k.svrCore.Tuple;
056 import org.hd.d.pg2k.svrCore.props.LocalProps;
057
058 import ORG.hd.d.IsDebug;
059
060
/**Class to "manage" variables for an element of a SimpleExhibitPipelineIF.
 * Caches values to minimise read (and optionally write) activity.
 * <p>
 * Always maintains a local store of variables,
 * and normally syncVariables() needs to be called periodically
 * to merge with upstream values.
 * <p>
 * Typical places to inject one of these are a caching stage,
 * or pipeline or tunnel end-points.
 * <p>
 * A non-write-through instance has the effect of grouping multiple
 * setVariable() and setVariables() operations into a single
 * setVariables() upstream action (possibly split into a few batches)
 * when syncVariables() is invoked.
 * This may still write-through in some circumstances.
 * <p>
 * In no case does a getVariable() or getVariables() operation
 * directly cause a request to go upstream
 * (thus helping to ensure that get operations are fast even if there
 * are I/O problems (etc) upstream);
 * rather, the set of values is
 * updated periodically with a single getVariables() operation.
 * <p>
 * FIXME: getEventValues() avoids, where possible, blocking indefinitely,
 * and may return non-authoritative data instead,
 * though it may not be able to avoid such blocking in all circumstances.
 * <p>
 * Thread-safe; does not hold any externally-visible locks.
 * <p>
 * This does not count as an "end-point", ie is never the master data store.
 */
091 public class PipelineVarMgr extends BasicVarMgr
092 implements BasicVarMgrInterface,
093 SimpleVariablePipelineIF
094 {
095 /**If true, always use getVariables(-1) to get all values from upstream.
096 * Inefficient but very robust; generally not the right thing to do though.
097 */
098 private static final boolean ALWAYS_FETCH_ALL_UPSTREAM_VALUES = false;
099
100 /**Create a default PipelineVarMgr instance.
101 * Defaults to write-through (ie read caching only),
102 * because that is "safer" behaviour.
103 *
104 * @param upstream upstream source of data and variable values;
105 * never null
106 */
107 public PipelineVarMgr(final SimpleVariablePipelineIF upstream)
108 {
109 this(upstream, true); // Write-through.
110 }
111
112 /**Creates a customised VarMgr instance.
113 * May try to get the local system ID immediately from upstream,
114 * so that it is available for creating globals.
115 *
116 * @param upstream upstream source of data and variable values;
117 * never null
118 * @param writeThrough if true, all set operations are propagated
119 * downstream immediately
120 */
121 public PipelineVarMgr(final SimpleVariablePipelineIF upstream,
122 final boolean writeThrough)
123 {
124 super(false, false);
125 assert(!super.isEndPoint()); // This is not an end point.
126 assert(!super.dontFilterDups); // Bad timestamps and replays must be being checked for.
127
128 if(upstream == null)
129 { throw new IllegalArgumentException(); }
130
131 // If this cache is write-through then we don't need an outbound store.
132 outboundUpdates = writeThrough ? null : new Hashtable<SimpleVariableDefinition, SimpleVariableValue>();
133 earlyUpdates = writeThrough ? null : new ArrayList<SimpleVariableValue>();
134 source = upstream;
135
136 if(GET_SYSID_IN_CONS)
137 {
138 // Get the local system ID from upstream
139 // and save it since we shall need it when setting globals.
140 try
141 {
142 final SimpleVariableValue svv =
143 upstream.getVariable(SystemVariables.LOCAL_SYS_ID);
144 if(svv == null)
145 { throw new IllegalStateException("unable to fetch system ID from upstream"); }
146 super._setVariable(svv, true);
147 assert(null != super.getVariable(SystemVariables.LOCAL_SYS_ID));
148 }
149 catch(final IOException e)
150 {
151 // Ignore (but log) error; hope to get value later...
152 e.printStackTrace();
153 }
154 }
155 }
156
157 /**If true, try in the constructor to get the local system ID from upstream.
158 * This helps us build globalMaps,
159 * and also ensures that the variable is available to clients downstream.
160 */
161 private final static boolean GET_SYSID_IN_CONS = true;
162
163 /**Upstream source of variable values; never null. */
164 private final SimpleVariablePipelineIF source;
165
166 /**Returns true if this is a write-through cache. */
167 public boolean isWriteThrough()
168 { return(outboundUpdates == null); }
169
170 /**Cached outbound variable value updates to be sent upstream.
171 * Will not be used (and will be null) if in write-through mode.
172 * <p>
173 * Used by:
174 * <ul>
175 * <li>setVariable() (to add values)
176 * <li>sync() to push accumulated values upstream
177 * </ul>
178 * <p>
179 * Variable updates are held in this table and periodically flushed
180 * upstream in a block. If multiple updates to a single variable
181 * occur between flushes, only the last one will be retained,
182 * (the others overwritten) thus reducing traffic.
183 * These may be re-ordered with respect to one another when flushed.
184 * <p>
185 * Note however that one value cannot be overwritten by another if
186 * they have different globalMap keys.
187 * This can only really happen on the master when simultaneous updates
188 * arrive from two different slaves for the same global value.
189 * In this case, we force the previous value upstream immediately.
190 * <p>
191 * A lock must be held on this object to do multiple operations atomically,
192 * but no other visible lock must be grabbed inside a lock on this.
193 * <p>
194 * Hashtable from SimpleVariableDefinition to SimpleVariableValue.
195 * <p>
196 * A Hashtable is used for thread-safety.
197 * <p>
198 * Never null.
199 */
200 private final Hashtable<SimpleVariableDefinition,SimpleVariableValue> outboundUpdates;
201
202 /**A List of non-coalesced variable set operations to be done first.
203 * These are variables that could not be coalesced for whatever reason,
204 * and must be passed upstream ahead of anything in outboundUpdates.
205 * <p>
206 * This will be non-null when outboundUpdates is.
207 * <p>
208 * The content of this List must only be accessed in the scope of a lock
209 * on outboundUpdates, so this List implementation need not be inherently
210 * thread-safe (eg ArrayList can (and should) be used instead of Vector).
211 */
212 private final List<SimpleVariableValue> earlyUpdates;
213
214
215 /**Last time we successfully fetched variable values from upstream; initially null.
216 * Declared as volatile so allow safe access without a lock.
217 */
218 private volatile Long lastFetch;
219
220 /**Set a single variable value.
221 * If writing though,
222 * then set the value upstream immediately,
223 * else simply put it in our "pending" list waiting for sync().
224 *
225 * @param newValue new variable value to set; never null
226 * @throws java.io.IOException in case of difficulty setting the value upstream
227 */
228 @Override
229 public void setVariable(final SimpleVariableValue newValue)
230 throws IOException
231 {
232 // Store in our local cache before doing anything else...
233 super._setVariable(newValue, false);
234
235 // Since we seem to have set the value OK
236 // (so it type-checked, for example),
237 // add to outgoing update list immediately,
238 // or write-though if appropriate.
239
240 // Write-though.
241 if(outboundUpdates == null)
242 {
243 source.setVariable(newValue);
244 return;
245 }
246
247 // Accumulate and wait for sync().
248 // Attempt to merge redundant (non-event) operations for efficiency.
249 synchronized(outboundUpdates)
250 {
251 // But for now we never coalesce events.
252 final SimpleVariableDefinition def = newValue.getDef();
253 if(def.isEvent())
254 {
255 earlyUpdates.add(newValue);
256 return;
257 }
258
259 // Note however that one value cannot be overwritten by another
260 // iff they have different globalMap keys (or null-ness).
261 // This happens on the master when simultaneous updates
262 // arrive from two different slaves for the same global value.
263 // In this case, we force the previous value upstream now.
264 final SimpleVariableValue prevHeld =
265 outboundUpdates.put(def, newValue);
266
267 // Have to handle coalescing if there was a previous value
268 // for this variable.
269 if(prevHeld != null)
270 {
271 // Whoops, may conflict...
272 // If global maps have different keys, or null-ness,
273 // then they do conflict,
274 // and we must flush the previous value
275 // into the earlyUpdates list to go first,
276 // else we can discard the previous version.
277 final Map nGM = newValue.getGlobalMap();
278 final Map pGM = prevHeld.getGlobalMap();
279 final boolean differentMapKeys = (nGM == null) ?
280 (pGM != null) :
281 ((pGM == null) || !nGM.keySet().equals(pGM.keySet()));
282 if(differentMapKeys)
283 { earlyUpdates.add(prevHeld); }
284 }
285 }
286 }
287
288 /**Set several variable values.
289 * This is equivalent to set series of calls to setVariables()
290 * unless in write-through mode where a single setVariables()
291 * is done upstream for efficiency.
292 *
293 * @return number of values successfully set, at least locally...
294 * @param newValues new variables values to set; never null
295 * @throws java.io.IOException in case of difficulty setting the values upstream
296 */
297 @Override
298 public int setVariables(final SimpleVariableValue newValues[])
299 throws IOException
300 {
301 if(newValues == null)
302 { throw new IllegalArgumentException(); }
303
304 if(outboundUpdates != null)
305 {
306 // If not writing-through,
307 // then put in "outbound" list,
308 // being careful to preserve order,
309 // but eliminating duplicates.
310 final int nvLen = newValues.length;
311 for(int i = 0; i < nvLen; ++i)
312 { setVariable(newValues[i]); }
313 return(newValues.length);
314 }
315 else
316 {
317 // If writing through,
318 // stick to single setVariables() call for efficiency.
319 super.setVariables(newValues);
320 return(source.setVariables(newValues));
321 }
322 }
323
324
325 /**Private lock to serialise calls on sync(). */
326 private final Object _sync_lock = new Object();
327
328 /**Maximum number of set/update values to send upstream in one go; strictly positive.
329 * This limit is here to avoid vast on-the-wire messages straining sender and receiver,
330 * especially in the case where the receiver has been down for a while
331 * and so we have accumulated many messages.
332 * <p>
333 * More messages in each batch may be more efficient in terms of
334 * message overhead, inter-value compression, etc.
335 */
336 private static final int MAX_UPDATE_BATCH_COUNT = 256;
337
338 /**Synchronise variables with upstream values.
339 * We push updated values upstream to the source,
340 * call sync on the source if this sync is "forced",
341 * and then retrieve changed values from upstream.
342 * <p>
343 * We remember when we last did this and request any changes since then,
344 * except on the first call or if called with force==true,
345 * in which case all values are retrieved.
346 * <p>
347 * To allow for (varying) clock skew between different server instances,
348 * we actually ask for somewhat older values than our last poll,
349 * which will result in some redundant traffic.
350 * <p>
351 * Holds no externally-visible locks,
352 * and does not hold locks while communicating upstream
353 * (so can handle set/get operations even if upstream is slow/blocking),
354 * but if called by multiple threads this will safely serialise the calls.
355 *
356 * @param force if true, this will force a full sync on the read side
357 * by using getVariables(-1) rather than attempting to choose a
358 * nearer timestamp for efficiency;
359 * the implementation is at liberty to use getVariables(-1)
360 * at any time whatever the argument value,
361 * and almost certainly should use it on the first call
362 *
363 * @throws java.io.IOException if one is received from upstream
364 */
365 public void syncVariables(final boolean force)
366 throws IOException
367 {
368 //if(isDebugMode) { System.err.println("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+").]"); }
369
370 // Serialise calls to sync() to avoid out-of-order updates...
371 synchronized(_sync_lock)
372 {
373 // Push locally-cached updates upstream,
374 // if there are any pending,
375 // and clear those pending values.
376 if(outboundUpdates != null)
377 {
378 // Get any earlyUpdates (first)
379 // then any other outboundUpdates,
380 // and assemble them into a single update to send upstream.
381 // Upon failure, put them at the start of the (early) list to resend
382 // in case new items have arrived.
383 //
384 // We *do not* hold the lock while communicating upstream.
385 Deque<SimpleVariableValue> toGoUpstreamAll = null;
386 synchronized(outboundUpdates)
387 {
388 // Atomically retrieve current pending updates, if any,
389 // and clear the pending updates.
390 // We can put them back afterwards if we have to, sort of.
391 final int size = earlyUpdates.size() + outboundUpdates.size();
392 if(IsDebug.isDebug && (size > 0)) { System.err.println("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+"): outboundUpdates.size()="+size+".]"); }
393 //if(isDebugMode && (size > 0)) { System.err.println("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+"): outboundUpdates.size()="+size+".]"); }
394 if(size > 0)
395 {
396 // Assemble updates in order, earlyUpdates first,
397 // ready to send upstream.
398 toGoUpstreamAll = new ArrayDeque<SimpleVariableValue>(size);
399 toGoUpstreamAll.addAll(earlyUpdates);
400 toGoUpstreamAll.addAll(outboundUpdates.values());
401
402 // Clear the originals...
403 earlyUpdates.clear();
404 outboundUpdates.clear();
405 }
406 }
407
408 if(toGoUpstreamAll != null)
409 {
410 // Attempt to send the outstanding values upstream...
411 // However, if this fails due to an I/O problem,
412 // then try to reinsert any not-yet-overwritten values
413 // at the start of the earlyUpdates
414 // so that they will go before any later arrivals.
415 try
416 {
417 // We try to send updates in small-ish batches
418 // to avoid blowing up the recipient if we have accumulated many.
419 while(!toGoUpstreamAll.isEmpty())
420 {
421 // Extract the first updates and try to send them.
422 final int toSend = Math.min(toGoUpstreamAll.size(), MAX_UPDATE_BATCH_COUNT);
423 final SimpleVariableValue[] updates = new SimpleVariableValue[toSend];
424 // Copy (not remove) the first 'toSend' updates to the array...
425 final Iterator<SimpleVariableValue> iterator = toGoUpstreamAll.iterator();
426 for(int i = 0; i < toSend; ++i)
427 { updates[i] = iterator.next(); }
428 // Try to send upstream...
429 if(IsDebug.isDebug) { System.err.println("[PipelineVarMgr "+System.identityHashCode(this)+": sending batch of updates upstream: "+toSend+"/"+toGoUpstreamAll.size()+".]"); }
430 source.setVariables(updates);
431 // If that was completely successful
432 // then remove the initial items from the start of the list.
433 // TODO: do this in a block to overcome the inefficiency of many calls.
434 for(int i = 0; i < toSend; ++i)
435 { toGoUpstreamAll.removeFirst(); }
436 }
437 }
438 catch(final IOException e)
439 {
440 // In case of error,
441 // requeue anything that may not have been successfully received
442 // and is not too old for the receiver to handle
443 // to keep our queue size-bounded.
444
445 // Filter out anything too old.
446 final long now = System.currentTimeMillis();
447 final long tooOld = now - BasicVarMgr.MAX_APPARENT_MESSAGE_AGE_MS;
448 // Ordinary non-event values can be dumped sooner...
449 final long tooOldNonEvent = now - 101 - 2*SystemVariables.MAX_VALUE_DISTRIBUTION_LATENCY_MS;
450 // TODO: maybe make sure that we don't somehow have anything (too far) in the future?
451
452 final List<SimpleVariableValue> toGoUpstreamFiltered =
453 new ArrayList<SimpleVariableValue>(toGoUpstreamAll.size());
454 for(final SimpleVariableValue svv : toGoUpstreamAll)
455 {
456 if(svv == null) { continue; }
457 if(svv.getTimestamp() < tooOld) { continue; }
458 if(!svv.getDef().isEvent() && (svv.getTimestamp() < tooOldNonEvent)) { continue; }
459 toGoUpstreamFiltered.add(svv);
460 }
461
462 if(IsDebug.isDebug) { System.err.println("WARNING: PipelineVarMgr: values to resend up stream: " + toGoUpstreamFiltered.size()); }
463
464 // Protect access to earlyUpdates...
465 synchronized(outboundUpdates)
466 {
467 // Insert the not-too-old items in order at the start...
468 earlyUpdates.addAll(0, toGoUpstreamFiltered);
469 }
470 //if(isDebugMode) { System.err.println("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+") got IOException.]"); }
471 //if(isDebugMode) { e.printStackTrace(); }
472 throw e; // Rethrow the error...
473 }
474 }
475 }
476
477 // Now force any changes all the way to the master's end-point
478 // iff this is a "forced" sync().
479 if(force) { source.syncVariables(true); }
480
481 //if(isDebugMode) { System.err.println("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+") about to get...]"); }
482
483 // Now retrieve any values from downstream.
484 // For the moment we'll assume that our local clock,
485 // plus the cache hold time used as a skew corrector,
486 // will ensure that we don't miss anything
487 // at the cost of a few wasted/redundant transfers,
488 // though:
489 // * on the first run, or
490 // * if "force"d, or
491 // * we got nothing on our last poll,
492 // then we fetch everything with a getVariables(-1).
493 final Long lastTime = lastFetch; // Fetch value just once.
494 final boolean firstRun = (lastTime == null);
495 final boolean getAll = (ALWAYS_FETCH_ALL_UPSTREAM_VALUES || force || firstRun);
496 final SimpleVariableValue[] newVals =
497 source.getVariables(getAll ? -1 :
498 (lastTime.longValue() - CoreConsts.MAX_PEER_CLOCK_SKEW_MS - 1));
499 // Apply the values fetched...
500 // (For safety, we discard any that appear to be invalid.)
501 for(int i = newVals.length; --i >= 0; )
502 {
503 final SimpleVariableValue svv = newVals[i];
504 final SimpleVariableDefinition def = svv.getDef();
505 if(!SystemVariables.defs.contains(def))
506 {
507 System.err.println("PipelineVarMgr: ERROR: upstream variable value rejected: " + svv.getDef());
508 continue;
509 }
510
511 // Store the new value in our local cache,
512 // but don't have it propagate back upstream!
513 // Note that it is from upstream,
514 // so read-only values *can* be set in our cache.
515 super._setVariable(svv, true);
516 }
517
518
519 // If we have any deferred requests for current/all interval events,
520 // then attempt to fetch and store/cache one at random now
521 // in the hope of being able to satisfy the next request for it.
522 // If something goes wrong and we "lose" one of these
523 // deferred requests, that is OK, since we made no promises
524 // about the timeliness of current-interval data.
525 for(final boolean current : new boolean[]{ false, true } )
526 {
527 final Tuple.Pair<SimpleVariableDefinition, EventPeriod> deferred =
528 _getAndClearDeferredRequestForInterval(current);
529 if(deferred != null)
530 {
531 // Current interval, which we never expect to be authoritative...
532 final long currentInterval =
533 deferred.second.getIntervalNumber(System.currentTimeMillis());
534
535 final BitSet justTheOne = new BitSet(1);
536 justTheOne.set(0); // Just get the one current value set.
537 final EventVariableValue upstreamValues[] =
538 source.getEventValues(deferred.first, deferred.second,
539 current ? currentInterval : 0,
540 justTheOne);
541
542 // Save the fetched value for the next request!
543 if(upstreamValues.length > 0)
544 {
545 final EventVariableValue evv = upstreamValues[0];
546 if(evv != null)
547 {
548 if(IsDebug.isDebug) { System.out.println("[Post-fetched "+(current?"current":"all")+"-interval data for " + deferred + ".]"); }
549 super.setEventValue(evv);
550 }
551 }
552 }
553 }
554
555 // Consider a full fetch if there were no updated values returned.
556 lastFetch = ((newVals.length == 0) &&
557 (0 == Rnd.fastRnd.nextInt(3 + ((GenUtils.mustConservePower()?171:101) * LocalProps.getServerSlowdownFactor())))) ?
558 null : new Long(System.currentTimeMillis());
559 }
560
561 //if(isDebugMode) { System.err.println("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+") FINISHED]"); }
562 }
563
564
565 /**If true then generally don't go upstream (synchronously) just to fetch values for the current (or all) interval; respond from local store.
566 * We may nonetheless synchronously request current-interval data from upstream:
567 * <ul>
568 * <li>when asking for something else in the same request,
569 * <li>asynchronously when a particular event/period combination
570 * has been asked for recently,
571 * <li>when we have nothing cached and would have to return empty-handed to the user,
572 * </ul>
573 * thus only making for a marginal extra cost.
574 * <p>
575 * If true, there is no guarantee that the "current" data reflects more than
576 * locally-originated events or will be hugely up-to-date,
577 * though the master's main copy will be.
578 */
579 private static final boolean DONT_FETCH_SPECIAL_INTERVALS_SYNCHRONOUSLY = true;
580
581 /**Set of "current" interval event-values requested that we declined to fetch synchronously.
582 * We clear these as we asynchronously fetch the requested values.
583 * <p>
584 * A map from event variable definitions to
585 * a set of any deferred current-interval requests for that set
586 * (or to null if none have ever been deferred for that event type).
587 * <p>
588 * A Hashtable is used to guarantee basic thread safety.
589 * All manipulation of the contents of the table,
590 * and in particular of the non-threadsafe EnumSet values,
591 * must be done holding a lock on the table instance.
592 */
593 private final Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>> _deferredCIRequests =
594 new Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>>();
595
596 /**Set of "all" interval event-values requested that we declined to fetch synchronously.
597 * We clear these as we asynchronously fetch the requested values.
598 * <p>
599 * A map from event variable definitions to
600 * a set of any deferred all-interval requests for that set
601 * (or to null if none have ever been deferred for that event type).
602 * <p>
603 * A Hashtable is used to guarantee basic thread safety.
604 * All manipulation of the contents of the table,
605 * and in particular of the non-threadsafe EnumSet values,
606 * must be done holding a lock on the table instance.
607 */
608 private final Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>> _deferredAIRequests =
609 new Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>>();
610
611 /**Class definining all details of a deferred request.
612 * This comprises:
613 * <ul>
614 * <li>Event name/definition,
615 * <li>EventPeriod,
616 * <li>The all/current flag.
617 * </ul>
618 * <p>
619 * Equality depends on all fields.
620 */
621 private final class NextRequestKey
622 {
623 /**Event definition; never null. */
624 final SimpleVariableDefinition def;
625
626 /**Event period; never null. */
627 final EventPeriod p;
628
629 /**True if "all", else false for "current". */
630 final boolean isAll;
631
632 public NextRequestKey(final SimpleVariableDefinition def,
633 final EventPeriod p,
634 final boolean isAll)
635 {
636 this.def = def;
637 this.p = p;
638 this.isAll = isAll;
639
640 assert(p != null);
641 assert((def != null) && def.isEvent());
642 assert((def.getEvPeriodSubset() == null) ||
643 def.getEvPeriodSubset().contains(p));
644 }
645
646 /**The hash depends on all fields. */
647 @Override
648 public int hashCode()
649 { return(def.hashCode() ^ (isAll ? p.ordinal() : p.name().hashCode())); }
650
651 /**Equality depends on all fields. */
652 @Override
653 public boolean equals(final Object obj)
654 {
655 if(!(obj instanceof NextRequestKey)) { return(false); }
656 final NextRequestKey other = (NextRequestKey) obj;
657 if(isAll != other.isAll) { return(false); }
658 if(!p.equals(other.p)) { return(false); }
659 if(!def.equals(other.def)) { return(false); }
660 return(true);
661 }
662 }
663
664 /**Next permitted (deferred) request for a given all/current period data set.
665 * We defer repeated requests for the same item until either:
666 * <ul>
667 * <li>The start of the next interval for that item.
668 * <li>A reasonable fraction of the period for that item
669 * has elapsed since the last request for it.
670 * </ul>
671 * <p>
672 * This means that we limit the cost of repeated requests for the same item
673 * in such a way that we still have a reasonable chance of seeing
674 * intra-interval updates.
675 * <p>
676 * The table is thread-safe and parameterised for quick, concurrent access.
677 */
678 private final Map<NextRequestKey,Long> _nextRequestNotBefore =
679 new ConcurrentHashMap<NextRequestKey, Long>(41, 0.7f, 4);
680
681 /**Note that we have deferred a request for the given event and period.
682 *
683 * @param currentInterval true for "current" interval,
684 * false for "all" interval
685 * @param def non-null event definition
686 * @param intervalSelector non-null interval/period value
687 */
688 private final void _noteDeferredRequestForInterval(final boolean currentInterval,
689 final SimpleVariableDefinition def,
690 final EventPeriod intervalSelector)
691 {
692 if((def == null) || !def.isEvent() ||
693 (intervalSelector == null))
694 { throw new IllegalArgumentException(); }
695
696 // Caller must be asking for valid period for this event type.
697 assert((def.getEvPeriodSubset() == null) ||
698 def.getEvPeriodSubset().contains(intervalSelector));
699
700
701 // We may ignore rapid repeated requests for the same item,
702 // if the repeat is within a small fraction of the period.
703 final NextRequestKey key = new NextRequestKey(def, intervalSelector, !currentInterval);
704 final Long notBefore = _nextRequestNotBefore.get(key);
705 // Return if too early for another request for this value yet.
706 final long now = System.currentTimeMillis();
707 if((notBefore != null) && (notBefore.longValue() > now))
708 { return; }
709
710 // Compute/record when it would next be OK to ask for an update.
711 // At most we put this off by a fraction of our temporal slackness,
712 // and as a minimum a substantial fraction of our sysvar latency.
713 final long nextNotBefore = now +
714 Math.min(CoreConsts.DEFAULT_TEMPORAL_SLACKNESS_S * 249,
715 intervalSelector.getIntervalMs() >>> 4) +
716 Rnd.fastRnd.nextInt(SystemVariables.MAX_VALUE_DISTRIBUTION_LATENCY_MS/2);
717 _nextRequestNotBefore.put(key, new Long(nextNotBefore));
718
719
720 // Select which deferred set we are working with...
721 final Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>> diSet =
722 currentInterval ? _deferredCIRequests : _deferredAIRequests;
723
724 // Hold a lock briefly while we do our work.
725 synchronized(diSet)
726 {
727 // Get any existing Set of values.
728 final EnumSet<EventPeriod> s = diSet.get(def);
729 // If none then create and store a new Set with just this value.
730 if(s == null)
731 {
732 diSet.put(def, EnumSet.of(intervalSelector));
733 return;
734 }
735 // If non-null then just add this bit to the Set already held.
736 s.add(intervalSelector);
737 }
738 }
739
740 /**Select, remove and return at random one of the deferred current/all interval requests; null if none.
741 * This is an atomic fetch-and-clear operation,
742 * designed to be used in association with
743 * _noteDeferredRequestForInterval().
744 *
745 * @param currentInterval true for "current" interval,
746 * false for "all" interval
747 * @return null or a pair with both elements valid and non-null
748 */
749 private Tuple.Pair<SimpleVariableDefinition, EventPeriod> _getAndClearDeferredRequestForInterval(
750 final boolean currentInterval)
751 {
752 // Select which deferred set we are working with...
753 final Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>> diSet =
754 currentInterval ? _deferredCIRequests : _deferredAIRequests;
755
756 // Hold a lock briefly while we do our work.
757 synchronized(diSet)
758 {
759 // Return null immediately if no deferred requests pending.
760 final int n = diSet.size();
761 if(n == 0) { return(null); }
762
763 // Pick one of the event types at random.
764 final SimpleVariableDefinition defs[] = new SimpleVariableDefinition[n];
765 diSet.keySet().toArray(defs);
766 final SimpleVariableDefinition def =
767 defs[(n == 1) ? 0 : Rnd.fastRnd.nextInt(n)];
768
769 // The def should be a valid event type.
770 assert(def != null);
771 assert(def.isEvent());
772
773 // Get the Set of deferred requests for this event type.
774 final EnumSet<EventPeriod> s = diSet.get(def);
775
776 // The Set should be non-null and non-empty.
777 assert(s != null);
778 assert(!s.isEmpty());
779
780 // Pick one of the pending iterval/period values at random.
781 final int m = s.size();
782 final EventPeriod eps[] = new EventPeriod[m];
783 s.toArray(eps);
784 final EventPeriod ep =
785 eps[(m == 1) ? 0 : Rnd.fastRnd.nextInt(m)];
786
787 // We should have chosen a valid interval.
788 assert((def.getEvPeriodSubset() == null) ||
789 def.getEvPeriodSubset().contains(ep));
790
791 // Compose our result.
792 final Tuple.Pair<SimpleVariableDefinition, EventPeriod> result =
793 new Tuple.Pair<SimpleVariableDefinition, EventPeriod>(def, ep);
794
795 // Now remove our pending value,
796 // and indeed the whole Set if now empty.
797 if(m == 1) { diSet.remove(def); }
798 else { s.remove(ep); }
799
800 return(result);
801 }
802 }
803
804 /**Locks to prevent redundant concurrent fetches/updates of the same event type from upstream in getEventValues().
805 * The aim of this is to avoid making many fruitless upstream accesses
806 * over (for example) a slow or connection-limited medium such as an HTTP tunnel.
807 * <p>
808 * Locking at the per-def level probably admits almost all potential useful concurrency
809 * and is relatively simple.
810 * <p>
811 * We allow for a limited amount of concurrency in writing
812 * but we mainly use ConcurrentHashMap for thread-safety, read concurrency and putIfAbsent().
813 */
814 private final ConcurrentMap<SimpleVariableDefinition, ReentrantLock> _getEventValues_locks =
815 new ConcurrentHashMap<SimpleVariableDefinition, ReentrantLock>(2*SystemVariables.defs.size(), 0.5f, 4);
816
817 /**Get event values; never null.
818 * We get what we can from our local store,
819 * then go upstream to fill in any unfilled requested slots
820 * for which we did not have authoritative data
821 * (and then attempt to cache them locally for next time).
822 * However, if unable to fetch authoritative data (quickly)
823 * then we will return null or non-authoritative data as appropriate for each slot;
824 * this does not throw IOException.
825 * <p>
826 * We treat the 'current' and 'all' intervals specially
827 * since upstream will never offer us an authoritative value
828 * for the current slot, only for older completed ones.
829 * We get requested values asynchronously,
830 * to be reasonably up-to-date for the <em>next</em> request.
831 * <p>
832 * If a request for a "local" event upstream fails with an IllegalArgumentException
833 * then we assume that upstream of us is a tunnel which local values don't cross
834 * so we simply report whatever value we have to hand.
835 *
836 * @param def non-null event-variable definition
837 * @param intervalSelector non-null valid interval for specified event
838 * @param whichValues each true bit represents a slot for which data is
839 * required, bit 0 indicating data from the slot within which
840 * firstIntervalTime is located, bit 1 the previous slot, etc;
841 * null is treated as the common case equivalent to just bit 0 set
842 *
843 * @throws IOException if we have difficulty getting values from upstream
844 * @throws InterruptedIOException if interrupted or if blocking for too long.
845 */
846 @Override
847 public EventVariableValue[] getEventValues(final SimpleVariableDefinition def,
848 final EventPeriod intervalSelector,
849 final long intervalNumber,
850 final BitSet whichValues)
851 {
852 //System.out.println("PipelineVarMgr.getEventValues("+def+", "+intervalSelector+", "+intervalNumber+", "+whichValues+")");
853
854 // Current interval number (never authoritative).
855 final long currentInterval = intervalSelector.getIntervalNumber(System.currentTimeMillis());
856
857 // Optimisation fast-path for common/simple case:
858 // the single requested slot/value is available locally and is authoritative.
859 // If available from local store then we don't need to use our locks/throttling,
860 // else we fall through for our more general case.
861 final boolean nullWV = (whichValues == null);
862 if(nullWV ||
863 ((whichValues.cardinality() == 1) && whichValues.get(0)))
864 {
865 // Don't try to satisfy requests for the current/all slots here...
866 if((intervalNumber != 0) && (intervalNumber != currentInterval))
867 {
868 // Get what we can from local store.
869 final EventVariableValue[] result =
870 super.getEventValues(def, intervalSelector, intervalNumber, null);
871
872 if((result.length != 0) &&
873 (result[0] != null) &&
874 (result[0].isAuthoritative()))
875 { return(result); /* Good, local cached value does the job. */ }
876 }
877 }
878
879 // Verify that this this def is an event and is genuine!
880 assert(def.isEvent());
881 assert(SystemVariables.defs.contains(def));
882
883 // First get what we can from our local store/cache.
884 // This call will validate the method arguments too.
885 EventVariableValue[] result =
886 super.getEventValues(def, intervalSelector, intervalNumber, whichValues);
887
888 // If any requested values are missing/null or non-authoritative,
889 // then try to go upstream for them (and then cache them locally).
890 BitSet wv = whichValues;
891 if(wv == null)
892 {
893 // Create synthetic BitSet for simplicity below.
894 wv = new BitSet(1);
895 wv.set(0);
896 }
897
898 // Note any "missing"/non-authoritative values.
899 final BitSet nonAuthValues = new BitSet();
900 int resultsPresent = 0;
901 for(int i = wv.nextSetBit(0); i >= 0; i = wv.nextSetBit(i+1))
902 {
903 final boolean absent = ((i >= result.length) || (result[i] == null));
904 if(!absent) { ++resultsPresent; }
905 if(absent || !result[i].isAuthoritative())
906 { nonAuthValues.set(i); }
907 }
908
909 // If the entire request has been satisfied from local cache
910 // then we can return immediately!
911 if(nonAuthValues.length() == 0)
912 { return(result); }
913
914 // Note if none of the request at all has been assembled from cache.
915 // In this case, we'll try harder to fetch values from upstream
916 // since we don't really want to return completely empty-handed.
917 final boolean noResultsFromCache = (resultsPresent == 0);
918
919 // If the only non-authoritative answer is
920 // in the "current" or "all" slot
921 // (it never would be authoritative anyway)
922 // then return now what we have locally cached/available.
923 // Don't bother with the expense of going upstream synchronously
924 // providing we have some sort of answer from cache
925 // but just note that the value was requested and deferred.
926 // We will go and fetch a fresh value soon, in the background.
927 if(DONT_FETCH_SPECIAL_INTERVALS_SYNCHRONOUSLY &&
928 (!noResultsFromCache) &&
929 (nonAuthValues.cardinality() == 1))
930 {
931 final int which = nonAuthValues.length()-1; // Index of the one bit that is set...
932
933 if(intervalNumber == 0)
934 {
935 _noteDeferredRequestForInterval(false, def, intervalSelector);
936 return(result);
937 }
938 else if(intervalNumber - which == currentInterval)
939 {
940 _noteDeferredRequestForInterval(true, def, intervalSelector);
941 return(result);
942 }
943 }
944
945 // THIS REQUEST WILL NEED TO GO UPSTREAM TO BE COMPLETELY SATISFIED.
946
947 // We allow at most one upstream access per def at once
948 // (through this front-end)
949 // to try to avoid overloading any tunnel or other choke-point.
950 // The lock is held around the underlying upstream call,
951 // even if that continues asynchronously out of sight from here.
952 // Get the unique lock for this event/def,
953 // with race-free create on first use.
954 ReentrantLock _getEventValues_lock;
955 while(null == (_getEventValues_lock = _getEventValues_locks.get(def)))
956 {
957 // Ensure no races for creation of a lock instance,
958 // ie at most one lock will ever be current for one def.
959 _getEventValues_locks.putIfAbsent(def, new ReentrantLock());
960 }
961 final ReentrantLock getEventValues_lock = _getEventValues_lock;
962 // Give up quickly if there is already a queue for the lock for this def,
963 // ie if we might well fail to get to make our call within a reasonable time.
964 if(getEventValues_lock.hasQueuedThreads())
965 {
966 return(result); // Return what we had collected so far, if anything.
967 }
968
969 // Make sure that the result value is large enough to accommodate
970 // all the results requested.
971 // If not then replace it with a large enough one and copy existing results in.
972 final int wvl = nullWV ? 1 : whichValues.length();
973 if(result.length < wvl)
974 {
975 final EventVariableValue newResult[] = new EventVariableValue[wvl];
976 System.arraycopy(result, 0, newResult, 0, result.length);
977 result = newResult;
978 }
979
980 //System.out.println("PipelineVarMgr.getEventValues() upstream request: " + nonAuthValues);
981
982 // Make an upstream request for the slots/intervals
983 // where our local store did not have authoritative answers.
984 final BitSet upstreamRequest = (BitSet) nonAuthValues.clone(); // Logically immutable...
985 // We make an effort to prevent this request blocking indefinitely
986 // (though if the thread pool fills then we end up running synchronously in this thread).
987 final Future<EventVariableValue[]> fetchedValues = ThreadUtils.nonCPUThreadPool.submit(new Callable<EventVariableValue[]>(){
988 public EventVariableValue[] call()
989 {
990 // Now try to grab the lock associated with this def,
991 // but we are only prepared to block for a very short while
992 // (short enough time not to annoy interactive users for example).
993 // If we can't get the lock quickly then we abort.
994 // Try harder if we'd otherwise have nothing at all to return to the original caller.
995 try
996 {
997 if(!getEventValues_lock.tryLock(1 + (noResultsFromCache ? (CoreConsts.MAX_INTERACTIVE_DELAY_MS*2) : (CoreConsts.MAX_INTERACTIVE_DELAY_MS/2)), TimeUnit.MILLISECONDS))
998 {
999 System.err.println("INFO: could not quickly acquire lock in PipelineVarMgr.getEventValues() for "+def);
1000 return(new EventVariableValue[0]); // Abort: return empty result.
1001 }
1002 }
1003 catch(final InterruptedException e)
1004 {
1005 return(new EventVariableValue[0]); // Abort: return empty result.
1006 }
1007
1008 try {
1009 // Get the data from upstream.
1010 final EventVariableValue[] upstreamValues = source.getEventValues(def, intervalSelector, intervalNumber, upstreamRequest);
1011
1012 // Attempt to cache results here to avoid simply discarding them
1013 // if the caller has to give up waiting for us
1014 // before we've finished the RPC or handling/cacheing the response.
1015 if(upstreamValues.length > 0)
1016 {
1017 // Offer the values that we requested from upstream to our local store
1018 // and then merge them into our initial response.
1019 for(int i = upstreamRequest.nextSetBit(0); i >= 0; i = upstreamRequest.nextSetBit(i+1))
1020 {
1021 if(i >= upstreamValues.length) { break; }
1022 final EventVariableValue evv = upstreamValues[i];
1023 // Skip missing responses.
1024 if(evv == null) { continue; }
1025 // Skip dubious claims to authoritative answers for the current interval or the future (!)
1026 // which might be the result of a race or clock skew, etc...
1027 if((evv.getIntervalNumber() >= currentInterval) && evv.isAuthoritative()) { continue; }
1028
1029 // Offer upstream value to the local store, ie cache it...
1030 /*super.*/setEventValue(evv);
1031 }
1032 }
1033
1034 return(upstreamValues); // Return values to hand back to the original caller.
1035 }
1036 finally { getEventValues_lock.unlock(); }
1037 }
1038 });
1039
1040 EventVariableValue upstreamValues[];
1041 // Spend a limited time (much longer than we waited for the lock) fetching from upstream.
1042 // We allow at least long enough for a round-the-world RTT to marshal and send the data.
1043 try { upstreamValues = fetchedValues.get(Math.max(CoreConsts.MAX_TYPICAL_RPC_RTT_MS, 2*CoreConsts.MAX_INTERACTIVE_DELAY_MS), TimeUnit.MILLISECONDS); }
1044 // If the upstream result was not ready in time then return what we already had.
1045 catch(final TimeoutException e)
1046 {
1047 // Need to log failures to allow some system tuning...
1048 System.err.println("WARNING: PipelineVarMgr timed out fetch of " + def + " reason: " + e.getMessage());
1049
1050 // Synchronously and without time limit retry a minimal subset of the original request
1051 // in case marshalling/transmission time (of the full request)
1052 // would always otherwise prevent completion.
1053 // This way we may make incremental progress.
1054 // Try for just the first (newest) item requested when we timed out.
1055 // Note that we do this without the normal cover of our per-def concurrency lock/limit.
1056 // TODO: Try a random subset in case of chronic difficulties with particular slots.
1057 if(upstreamRequest.length() > 1)
1058 {
1059 final int firstNonAuth = upstreamRequest.nextSetBit(0);
1060 assert(firstNonAuth >= 0);
1061 // Discard all but the first set bit.
1062 final BitSet upstreamRequestMin = (BitSet) upstreamRequest.clone();
1063 upstreamRequestMin.clear(firstNonAuth+1, upstreamRequestMin.length());
1064 assert(upstreamRequestMin.cardinality() == 1); // Should be exactly one bit set now.
1065 // Don't put a time limit on this to try to force/allow some progress.
1066 upstreamValues = source.getEventValues(def, intervalSelector, intervalNumber, upstreamRequestMin);
1067 // Make sure that this result, if any (and sane) is cached below...
1068 // Fall through to use whatever we managed to fetch this time.
1069 }
1070 // No subset is possible so return result collected so far.
1071 else { return(result); }
1072 }
1073 // If we were interrupted then return what we had.
1074 catch(final InterruptedException e)
1075 {
1076 Thread.currentThread().interrupt(); // Don't lose "interruptedness".
1077 return(result);
1078 }
1079 // Log unexpected error and return what we already had.
1080 catch(final ExecutionException e)
1081 {
1082 e.printStackTrace(); // Shouldn't happen, so log it.
1083 return(result);
1084 }
1085
1086 // Merge the upstream results into our initial response.
1087 for(int i = upstreamRequest.nextSetBit(0); i >= 0; i = upstreamRequest.nextSetBit(i+1))
1088 {
1089 if(i >= upstreamValues.length) { break; }
1090 final EventVariableValue evv = upstreamValues[i];
1091 // Skip missing responses.
1092 if(evv == null) { continue; }
1093 // Skip dubious claims to authoritative answers for the current interval or the future (!)
1094 // which might be the result of a race or clock skew, etc...
1095 if((evv.getIntervalNumber() >= currentInterval) && evv.isAuthoritative()) { continue; }
1096
1097 // Put upstream value into our response.
1098 result[i] = evv;
1099
1100 // Offer upstream value to the local store, ie cache it, in case not already done...
1101 super.setEventValue(evv);
1102 }
1103
1104 // OK, respond as well as we can...
1105 return(result);
1106 }
1107 }