001 /*
002 Copyright (c) 1996-2012, Damon Hart-Davis
003 All rights reserved.
004
005 Redistribution and use in source and binary forms, with or without
006 modification, are permitted provided that the following conditions are
007 met:
008
009 * Redistributions of source code must retain the above copyright
010 notice, this list of conditions and the following disclaimer.
011
012 * Redistributions in binary form must reproduce the above copyright
013 notice, this list of conditions and the following disclaimer in the
014 documentation and/or other materials provided with the
015 distribution.
016
017 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
018 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
019 TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
020 PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
021 OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
022 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
023 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
024 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
025 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
026 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
027 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
028 */
029 package org.hd.d.pg2k.svrCore.vars;
030
031 import java.io.IOException;
032 import java.io.InterruptedIOException;
033 import java.util.ArrayDeque;
034 import java.util.ArrayList;
035 import java.util.BitSet;
036 import java.util.Deque;
037 import java.util.EnumSet;
038 import java.util.Hashtable;
039 import java.util.Iterator;
040 import java.util.List;
041 import java.util.Map;
042 import java.util.concurrent.Callable;
043 import java.util.concurrent.ConcurrentHashMap;
044 import java.util.concurrent.ConcurrentMap;
045 import java.util.concurrent.ExecutionException;
046 import java.util.concurrent.Future;
047 import java.util.concurrent.TimeUnit;
048 import java.util.concurrent.TimeoutException;
049 import java.util.concurrent.locks.ReentrantLock;
050
051 import org.hd.d.pg2k.svrCore.CoreConsts;
052 import org.hd.d.pg2k.svrCore.GenUtils;
053 import org.hd.d.pg2k.svrCore.Rnd;
054 import org.hd.d.pg2k.svrCore.SimpleLoggerIF;
055 import org.hd.d.pg2k.svrCore.ThreadUtils;
056 import org.hd.d.pg2k.svrCore.Tuple;
057 import org.hd.d.pg2k.svrCore.props.LocalProps;
058
059 import ORG.hd.d.IsDebug;
060
061
062 /**Class to "manage" variables for an element of a SimpleExhibitPipelineIF.
063 * Caches values to minimise read (and optionally write) activity.
064 * <p>
065 * Always maintains a local store of variables,
066 * and normally sync() needs to be called periodically
067 * to merge with upstream values.
068 * <p>
069 * Typical places to inject one of these is in a caching stage,
070 * or at pipeline or tunnel end-points.
071 * <p>
072 * A non-write-through instance has the effect of grouping multiple
073 * setVariable() and setVariables() operations into a single
074 * setVariables() upstream action when sync() is invoked.
075 * This may still write through in some circumstances.
076 * <p>
077 * In no case does a getVariable() or getVariables() operation
078 * directly cause a request to go upstream,
079 * (thus helping to ensure that get operations are fast even if there
080 * are I/O problems (etc) upstream),
081 * but rather the set of values is
082 * updated periodically with a single getVariables() operation.
083 * <p>
084 * FIXME: getEventValues() avoids, where possible, blocking indefinitely,
085 * and may return non-authoritative data instead,
086 * though may not be able to avoid such blocking in all circumstances.
087 * <p>
088 * Thread-safe; does not hold any externally-visible locks.
089 * <p>
090 * This does not count as an "end-point", ie is never the master data store.
091 */
092 public class PipelineVarMgr extends BasicVarMgr
093 implements BasicVarMgrInterface,
094 SimpleVariablePipelineIF
095 {
096 /**If true, always use getVariables(-1) to get all values from upstream.
097 * Inefficient but very robust; generally not the right thing to do though.
098 */
099 private static final boolean ALWAYS_FETCH_ALL_UPSTREAM_VALUES = false;
100
/**Builds a PipelineVarMgr with the default (write-through) behaviour.
 * Only reads are cached; every set operation is also passed upstream
 * immediately, which is the "safer" behaviour.
 *
 * @param logger  logger to write to; null if no logging to be done
 * @param upstream  upstream source of data and variable values;
 *     never null
 */
public PipelineVarMgr(final SimpleLoggerIF logger,
                      final SimpleVariablePipelineIF upstream)
{
    // Delegate to the general constructor, selecting write-through mode.
    this(logger, upstream, true);
}
113
114 /**Creates a customised VarMgr instance.
115 * May try to get the local system ID immediately from upstream,
116 * so that it is available for creating globals.
117 * @param logger logger to write to; null if no logging to be done
118 * @param upstream upstream source of data and variable values;
119 * never null
120 * @param writeThrough if true, all set operations are propagated
121 * downstream immediately
122 */
123 public PipelineVarMgr(final SimpleLoggerIF logger,
124 final SimpleVariablePipelineIF upstream, final boolean writeThrough)
125 {
126 super(logger, false, false);
127 assert(!super.isEndPoint()); // This is not an end point.
128 assert(!super.dontFilterDups); // Bad timestamps and replays must be being checked for.
129
130 if(upstream == null)
131 { throw new IllegalArgumentException(); }
132
133 // If this cache is write-through then we don't need an outbound store.
134 outboundUpdates = writeThrough ? null : new Hashtable<SimpleVariableDefinition, SimpleVariableValue>();
135 earlyUpdates = writeThrough ? null : new ArrayList<SimpleVariableValue>();
136 source = upstream;
137
138 if(GET_SYSID_IN_CONS)
139 {
140 // Get the local system ID from upstream
141 // and save it since we shall need it when setting globals.
142 try
143 {
144 final SimpleVariableValue svv =
145 upstream.getVariable(SystemVariables.LOCAL_SYS_ID);
146 if(svv == null)
147 { throw new IllegalStateException("unable to fetch system ID from upstream"); }
148 super._setVariable(svv, true);
149 assert(null != super.getVariable(SystemVariables.LOCAL_SYS_ID));
150 }
151 catch(final IOException e)
152 {
153 // Ignore (but log) error; hope to get value later...
154 logger.log("unable to get system ID in cons", e);
155 }
156 }
157 }
158
159 /**If true, try in the constructor to get the local system ID from upstream.
160 * This helps us build globalMaps,
161 * and also ensures that the variable is available to clients downstream.
162 */
163 private final static boolean GET_SYSID_IN_CONS = true;
164
165 /**Upstream source of variable values; never null. */
166 private final SimpleVariablePipelineIF source;
167
168 /**Returns true if this is a write-through cache. */
169 public boolean isWriteThrough()
170 { return(outboundUpdates == null); }
171
172 /**Cached outbound variable value updates to be sent upstream.
173 * Will not be used (and will be null) if in write-through mode.
174 * <p>
175 * Used by:
176 * <ul>
177 * <li>setVariable() (to add values)
178 * <li>sync() to push accumulated values upstream
179 * </ul>
180 * <p>
181 * Variable updates are held in this table and periodically flushed
182 * upstream in a block. If multiple updates to a single variable
183 * occur between flushes, only the last one will be retained,
184 * (the others overwritten) thus reducing traffic.
185 * These may be re-ordered with respect to one another when flushed.
186 * <p>
187 * Note however that one value cannot be overwritten by another if
188 * they have different globalMap keys.
189 * This can only really happen on the master when simultaneous updates
190 * arrive from two different slaves for the same global value.
191 * In this case, we force the previous value upstream immediately.
192 * <p>
193 * A lock must be held on this object to do multiple operations atomically,
194 * but no other visible lock must be grabbed inside a lock on this.
195 * <p>
196 * Hashtable from SimpleVariableDefinition to SimpleVariableValue.
197 * <p>
198 * A Hashtable is used for thread-safety.
199 * <p>
200 * Null if (and only if) in write-through mode; see isWriteThrough().
201 */
202 private final Hashtable<SimpleVariableDefinition,SimpleVariableValue> outboundUpdates;
203
204 /**A List of non-coalesced variable set operations to be done first.
205 * These are variables that could not be coalesced for whatever reason,
206 * and must be passed upstream ahead of anything in outboundUpdates.
207 * <p>
208 * This will be non-null when outboundUpdates is.
209 * <p>
210 * The content of this List must only be accessed in the scope of a lock
211 * on outboundUpdates, so this List implementation need not be inherently
212 * thread-safe (eg ArrayList can (and should) be used instead of Vector).
213 */
214 private final List<SimpleVariableValue> earlyUpdates;
215
216
217 /**Last time we successfully fetched variable values from upstream; initially null.
218 * Declared as volatile to allow safe access without a lock.
219 */
220 private volatile Long lastFetch;
221
222 /**Set a single variable value.
223 * If writing though,
224 * then set the value upstream immediately,
225 * else simply put it in our "pending" list waiting for sync().
226 *
227 * @param newValue new variable value to set; never null
228 * @throws java.io.IOException in case of difficulty setting the value upstream
229 */
230 @Override
231 public void setVariable(final SimpleVariableValue newValue)
232 throws IOException
233 {
234 // Store in our local cache before doing anything else...
235 super._setVariable(newValue, false);
236
237 // Since we seem to have set the value OK
238 // (so it type-checked, for example),
239 // add to outgoing update list immediately,
240 // or write-though if appropriate.
241
242 // Write-though.
243 if(outboundUpdates == null)
244 {
245 source.setVariable(newValue);
246 return;
247 }
248
249 // Accumulate and wait for sync().
250 // Attempt to merge redundant (non-event) operations for efficiency.
251 synchronized(outboundUpdates)
252 {
253 // But for now we never coalesce events.
254 final SimpleVariableDefinition def = newValue.getDef();
255 if(def.isEvent())
256 {
257 earlyUpdates.add(newValue);
258 return;
259 }
260
261 // Note however that one value cannot be overwritten by another
262 // iff they have different globalMap keys (or null-ness).
263 // This happens on the master when simultaneous updates
264 // arrive from two different slaves for the same global value.
265 // In this case, we force the previous value upstream now.
266 final SimpleVariableValue prevHeld =
267 outboundUpdates.put(def, newValue);
268
269 // Have to handle coalescing if there was a previous value
270 // for this variable.
271 if(prevHeld != null)
272 {
273 // Whoops, may conflict...
274 // If global maps have different keys, or null-ness,
275 // then they do conflict,
276 // and we must flush the previous value
277 // into the earlyUpdates list to go first,
278 // else we can discard the previous version.
279 final Map<?,?> nGM = newValue.getGlobalMap();
280 final Map<?,?> pGM = prevHeld.getGlobalMap();
281 final boolean differentMapKeys = (nGM == null) ?
282 (pGM != null) :
283 ((pGM == null) || !nGM.keySet().equals(pGM.keySet()));
284 if(differentMapKeys)
285 { earlyUpdates.add(prevHeld); }
286 }
287 }
288 }
289
290 /**Set several variable values.
291 * This is equivalent to set series of calls to setVariables()
292 * unless in write-through mode where a single setVariables()
293 * is done upstream for efficiency.
294 *
295 * @return number of values successfully set, at least locally...
296 * @param newValues new variables values to set; never null
297 * @throws java.io.IOException in case of difficulty setting the values upstream
298 */
299 @Override
300 public int setVariables(final SimpleVariableValue newValues[])
301 throws IOException
302 {
303 if(newValues == null)
304 { throw new IllegalArgumentException(); }
305
306 if(outboundUpdates != null)
307 {
308 // If not writing-through,
309 // then put in "outbound" list,
310 // being careful to preserve order,
311 // but eliminating duplicates.
312 final int nvLen = newValues.length;
313 for(int i = 0; i < nvLen; ++i)
314 { setVariable(newValues[i]); }
315 return(newValues.length);
316 }
317 else
318 {
319 // If writing through,
320 // stick to single setVariables() call for efficiency.
321 super.setVariables(newValues);
322 return(source.setVariables(newValues));
323 }
324 }
325
326
327 /**Private lock to serialise calls on sync(). */
328 private final Object _sync_lock = new Object();
329
330 /**Maximum number of set/update values to send upstream in one go; strictly positive.
331 * This limit is here to avoid vast on-the-wire messages straining sender and receiver,
332 * especially in the case where the receiver has been down for a while
333 * and so we have accumulated many messages.
334 * <p>
335 * More messages in each batch may be more efficient in terms of
336 * message overhead, inter-value compression, etc.
337 */
338 private static final int MAX_UPDATE_BATCH_COUNT = 256;
339
340 /**Synchronise variables with upstream values.
341 * We push updated values upstream to the source,
342 * call sync on the source if this sync is "forced",
343 * and then retrieve changed values from upstream.
344 * <p>
345 * We remember when we last did this and request any changes since then,
346 * except on the first call or if called with force==true,
347 * in which case all values are retrieved.
348 * <p>
349 * To allow for (varying) clock skew between different server instances,
350 * we actually ask for somewhat older values than our last poll,
351 * which will result in some redundant traffic.
352 * <p>
353 * Holds no externally-visible locks,
354 * and does not hold locks while communicating upstream
355 * (so can handle set/get operations even if upstream is slow/blocking),
356 * but if called by multiple threads this will safely serialise the calls.
357 *
358 * @param force if true, this will force a full sync on the read side
359 * by using getVariables(-1) rather than attempting to choose a
360 * nearer timestamp for efficiency;
361 * the implementation is at liberty to use getVariables(-1)
362 * at any time whatever the argument value,
363 * and almost certainly should use it on the first call
364 *
365 * @throws java.io.IOException if one is received from upstream
366 */
367 public void syncVariables(final boolean force)
368 throws IOException
369 {
370 //if(isDebugMode) { logger.log("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+").]"); }
371
372 // Serialise calls to sync() to avoid out-of-order updates...
373 synchronized(_sync_lock)
374 {
375 // Push locally-cached updates upstream,
376 // if there are any pending,
377 // and clear those pending values.
378 if(outboundUpdates != null)
379 {
380 // Get any earlyUpdates (first)
381 // then any other outboundUpdates,
382 // and assemble them into a single update to send upstream.
383 // Upon failure, put them at the start of the (early) list to resend
384 // in case new items have arrived.
385 //
386 // We *do not* hold the lock while communicating upstream.
387 Deque<SimpleVariableValue> toGoUpstreamAll = null;
388 synchronized(outboundUpdates)
389 {
390 // Atomically retrieve current pending updates, if any,
391 // and clear the pending updates.
392 // We can put them back afterwards if we have to, sort of.
393 final int size = itemsQueuedForUpstream();
394 if(IsDebug.isDebug && (size > 0)) { logger.log("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+"): outboundUpdates.size()="+size+".]"); }
395 //if(isDebugMode && (size > 0)) { logger.log("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+"): outboundUpdates.size()="+size+".]"); }
396 if(size > 0)
397 {
398 // Assemble updates in order, earlyUpdates first,
399 // ready to send upstream.
400 toGoUpstreamAll = new ArrayDeque<SimpleVariableValue>(size);
401 toGoUpstreamAll.addAll(earlyUpdates);
402 toGoUpstreamAll.addAll(outboundUpdates.values());
403
404 // Clear the originals...
405 earlyUpdates.clear();
406 outboundUpdates.clear();
407 }
408 }
409
410 if(toGoUpstreamAll != null)
411 {
412 // Attempt to send the outstanding values upstream...
413 // However, if this fails due to an I/O problem,
414 // then try to reinsert any not-yet-overwritten values
415 // at the start of the earlyUpdates
416 // so that they will go before any later arrivals.
417 try
418 {
419 // We try to send updates in small-ish batches
420 // to avoid blowing up the recipient if we have accumulated many.
421 while(!toGoUpstreamAll.isEmpty())
422 {
423 // Extract the first updates and try to send them.
424 final int toSend = Math.min(toGoUpstreamAll.size(), MAX_UPDATE_BATCH_COUNT);
425 final SimpleVariableValue[] updates = new SimpleVariableValue[toSend];
426 // Copy (not remove) the first 'toSend' updates to the array...
427 final Iterator<SimpleVariableValue> iterator = toGoUpstreamAll.iterator();
428 for(int i = 0; i < toSend; ++i)
429 { updates[i] = iterator.next(); }
430 // Try to send upstream...
431 if(IsDebug.isDebug) { logger.log("[PipelineVarMgr "+System.identityHashCode(this)+": sending batch of updates upstream: "+toSend+"/"+toGoUpstreamAll.size()+".]"); }
432 source.setVariables(updates);
433 // If that was completely successful
434 // then remove the initial items from the start of the list.
435 // TODO: do this in a block to overcome the inefficiency of many calls.
436 for(int i = 0; i < toSend; ++i)
437 { toGoUpstreamAll.removeFirst(); }
438 }
439 }
440 catch(final IOException e)
441 {
442 // In case of error,
443 // requeue anything that may not have been successfully received
444 // and is not too old for the receiver to handle
445 // to keep our queue size-bounded.
446
447 // Filter out anything too old.
448 final long now = System.currentTimeMillis();
449 final long tooOld = now - BasicVarMgr.MAX_APPARENT_MESSAGE_AGE_MS;
450 // Ordinary non-event values can be dumped sooner...
451 final long tooOldNonEvent = now - 101 - 2*SystemVariables.MAX_VALUE_DISTRIBUTION_LATENCY_MS;
452 // TODO: maybe make sure that we don't somehow have anything (too far) in the future?
453
454 final List<SimpleVariableValue> toGoUpstreamFiltered =
455 new ArrayList<SimpleVariableValue>(toGoUpstreamAll.size());
456 for(final SimpleVariableValue svv : toGoUpstreamAll)
457 {
458 if(svv == null) { continue; }
459 if(svv.getTimestamp() < tooOld) { continue; }
460 if(!svv.getDef().isEvent() && (svv.getTimestamp() < tooOldNonEvent)) { continue; }
461 toGoUpstreamFiltered.add(svv);
462 }
463
464 if(IsDebug.isDebug) { logger.log("WARNING: PipelineVarMgr: values to resend up stream: " + toGoUpstreamFiltered.size()); }
465
466 // Protect access to earlyUpdates...
467 synchronized(outboundUpdates)
468 {
469 // Insert the not-too-old items in order at the start...
470 earlyUpdates.addAll(0, toGoUpstreamFiltered);
471 }
472 //if(isDebugMode) { logger.log("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+") got IOException.]", e); }
473 throw e; // Rethrow the error...
474 }
475 }
476 }
477
478 // Now force any changes all the way to the master's end-point
479 // iff this is a "forced" sync().
480 if(force) { source.syncVariables(true); }
481
482 //if(isDebugMode) { logger.log("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+") about to get...]"); }
483
484 // Now retrieve any values from downstream.
485 // For the moment we'll assume that our local clock,
486 // plus the cache hold time used as a skew corrector,
487 // will ensure that we don't miss anything
488 // at the cost of a few wasted/redundant transfers,
489 // though:
490 // * on the first run, or
491 // * if "force"d, or
492 // * we got nothing on our last poll,
493 // then we fetch everything with a getVariables(-1).
494 final Long lastTime = lastFetch; // Fetch value just once.
495 final boolean firstRun = (lastTime == null);
496 final boolean getAll = (ALWAYS_FETCH_ALL_UPSTREAM_VALUES || force || firstRun);
497 final SimpleVariableValue[] newVals =
498 source.getVariables(getAll ? -1 :
499 (lastTime.longValue() - CoreConsts.MAX_PEER_CLOCK_SKEW_MS - 1));
500 // Apply the values fetched...
501 // (For safety, we discard any that appear to be invalid.)
502 for(int i = newVals.length; --i >= 0; )
503 {
504 final SimpleVariableValue svv = newVals[i];
505 final SimpleVariableDefinition def = svv.getDef();
506 if(!SystemVariables.defs.contains(def))
507 {
508 logger.log("PipelineVarMgr: ERROR: upstream variable value rejected: " + svv.getDef());
509 continue;
510 }
511
512 // Store the new value in our local cache,
513 // but don't have it propagate back upstream!
514 // Note that it is from upstream,
515 // so read-only values *can* be set in our cache.
516 super._setVariable(svv, true);
517 }
518
519
520 // If we have any deferred requests for current/all interval events,
521 // then attempt to fetch and store/cache one at random now
522 // in the hope of being able to satisfy the next request for it.
523 // If something goes wrong and we "lose" one of these
524 // deferred requests, that is OK, since we made no promises
525 // about the timeliness of current-interval data.
526 for(final boolean current : new boolean[]{ false, true } )
527 {
528 final Tuple.Pair<SimpleVariableDefinition, EventPeriod> deferred =
529 _getAndClearDeferredRequestForInterval(current);
530 if(deferred != null)
531 {
532 // Current interval, which we never expect to be authoritative...
533 final long currentInterval =
534 deferred.second.getIntervalNumber(System.currentTimeMillis());
535
536 final BitSet justTheOne = new BitSet(1);
537 justTheOne.set(0); // Just get the one current value set.
538 final EventVariableValue upstreamValues[] =
539 source.getEventValues(deferred.first, deferred.second,
540 current ? currentInterval : 0,
541 justTheOne);
542
543 // Save the fetched value for the next request!
544 if(upstreamValues.length > 0)
545 {
546 final EventVariableValue evv = upstreamValues[0];
547 if(evv != null)
548 {
549 if(IsDebug.isDebug) { logger.log("[Post-fetched "+(current?"current":"all")+"-interval data for " + deferred + ".]"); }
550 super.setEventValue(evv);
551 }
552 }
553 }
554 }
555
556 // Consider a full fetch if there were no updated values returned.
557 lastFetch = ((newVals.length == 0) &&
558 (0 == Rnd.fastRnd.nextInt(3 + ((GenUtils.mustConservePower()?171:101) * LocalProps.getServerSlowdownFactor())))) ?
559 null : new Long(System.currentTimeMillis());
560 }
561
562 //if(isDebugMode) { logger.log("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+") FINISHED]"); }
563 }
564
565 /**Returns an estimate of the number of updates queued to go upstream; non-negative. */
566 public int itemsQueuedForUpstream()
567 {
568 synchronized(outboundUpdates) { return(earlyUpdates.size() + outboundUpdates.size()); }
569 }
570
571
572 /**If true then generally don't go upstream (synchronously) just to fetch values for the current (or all) interval; respond from local store.
573 * We may nonetheless synchronously request current-interval data from upstream:
574 * <ul>
575 * <li>when asking for something else in the same request,
576 * <li>asynchronously when a particular event/period combination
577 * has been asked for recently,
578 * <li>when we have nothing cached and would have to return empty-handed to the user,
579 * </ul>
580 * thus only making for a marginal extra cost.
581 * <p>
582 * If true, there is no guarantee that the "current" data reflects more than
583 * locally-originated events or will be hugely up-to-date,
584 * though the master's main copy will be.
585 */
586 private static final boolean DONT_FETCH_SPECIAL_INTERVALS_SYNCHRONOUSLY = true;
587
588 /**Set of "current" interval event-values requested that we declined to fetch synchronously.
589 * We clear these as we asynchronously fetch the requested values.
590 * <p>
591 * A map from event variable definitions to
592 * a set of any deferred current-interval requests for that set
593 * (or to null if none have ever been deferred for that event type).
594 * <p>
595 * A Hashtable is used to guarantee basic thread safety.
596 * All manipulation of the contents of the table,
597 * and in particular of the non-threadsafe EnumSet values,
598 * must be done holding a lock on the table instance.
599 */
600 private final Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>> _deferredCIRequests =
601 new Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>>();
602
603 /**Set of "all" interval event-values requested that we declined to fetch synchronously.
604 * We clear these as we asynchronously fetch the requested values.
605 * <p>
606 * A map from event variable definitions to
607 * a set of any deferred all-interval requests for that set
608 * (or to null if none have ever been deferred for that event type).
609 * <p>
610 * A Hashtable is used to guarantee basic thread safety.
611 * All manipulation of the contents of the table,
612 * and in particular of the non-threadsafe EnumSet values,
613 * must be done holding a lock on the table instance.
614 */
615 private final Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>> _deferredAIRequests =
616 new Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>>();
617
618 /**Class definining all details of a deferred request.
619 * This comprises:
620 * <ul>
621 * <li>Event name/definition,
622 * <li>EventPeriod,
623 * <li>The all/current flag.
624 * </ul>
625 * <p>
626 * Equality depends on all fields.
627 */
628 private final class NextRequestKey
629 {
630 /**Event definition; never null. */
631 final SimpleVariableDefinition def;
632
633 /**Event period; never null. */
634 final EventPeriod p;
635
636 /**True if "all", else false for "current". */
637 final boolean isAll;
638
639 public NextRequestKey(final SimpleVariableDefinition def,
640 final EventPeriod p,
641 final boolean isAll)
642 {
643 this.def = def;
644 this.p = p;
645 this.isAll = isAll;
646
647 assert(p != null);
648 assert((def != null) && def.isEvent());
649 assert((def.getEvPeriodSubset() == null) ||
650 def.getEvPeriodSubset().contains(p));
651 }
652
653 /**The hash depends on all fields. */
654 @Override
655 public int hashCode()
656 { return(def.hashCode() ^ (isAll ? p.ordinal() : p.name().hashCode())); }
657
658 /**Equality depends on all fields. */
659 @Override
660 public boolean equals(final Object obj)
661 {
662 if(!(obj instanceof NextRequestKey)) { return(false); }
663 final NextRequestKey other = (NextRequestKey) obj;
664 if(isAll != other.isAll) { return(false); }
665 if(!p.equals(other.p)) { return(false); }
666 if(!def.equals(other.def)) { return(false); }
667 return(true);
668 }
669 }
670
671 /**Next permitted (deferred) request for a given all/current period data set.
672 * We defer repeated requests for the same item until either:
673 * <ul>
674 * <li>The start of the next interval for that item.
675 * <li>A reasonable fraction of the period for that item
676 * has elapsed since the last request for it.
677 * </ul>
678 * <p>
679 * This means that we limit the cost of repeated requests for the same item
680 * in such a way that we still have a reasonable chance of seeing
681 * intra-interval updates.
682 * <p>
683 * The table is thread-safe and parameterised for quick, concurrent access.
684 */
685 private final Map<NextRequestKey,Long> _nextRequestNotBefore =
686 new ConcurrentHashMap<NextRequestKey, Long>(41, 0.7f, 4);
687
688 /**Note that we have deferred a request for the given event and period.
689 *
690 * @param currentInterval true for "current" interval,
691 * false for "all" interval
692 * @param def non-null event definition
693 * @param intervalSelector non-null interval/period value
694 */
695 private final void _noteDeferredRequestForInterval(final boolean currentInterval,
696 final SimpleVariableDefinition def,
697 final EventPeriod intervalSelector)
698 {
699 if((def == null) || !def.isEvent() ||
700 (intervalSelector == null))
701 { throw new IllegalArgumentException(); }
702
703 // Caller must be asking for valid period for this event type.
704 assert((def.getEvPeriodSubset() == null) ||
705 def.getEvPeriodSubset().contains(intervalSelector));
706
707
708 // We may ignore rapid repeated requests for the same item,
709 // if the repeat is within a small fraction of the period.
710 final NextRequestKey key = new NextRequestKey(def, intervalSelector, !currentInterval);
711 final Long notBefore = _nextRequestNotBefore.get(key);
712 // Return if too early for another request for this value yet.
713 final long now = System.currentTimeMillis();
714 if((notBefore != null) && (notBefore.longValue() > now))
715 { return; }
716
717 // Compute/record when it would next be OK to ask for an update.
718 // At most we put this off by a fraction of our temporal slackness,
719 // and as a minimum a substantial fraction of our sysvar latency.
720 final long nextNotBefore = now +
721 Math.min(CoreConsts.DEFAULT_TEMPORAL_SLACKNESS_S * 249,
722 intervalSelector.getIntervalMs() >>> 4) +
723 Rnd.fastRnd.nextInt(SystemVariables.MAX_VALUE_DISTRIBUTION_LATENCY_MS/2);
724 _nextRequestNotBefore.put(key, new Long(nextNotBefore));
725
726
727 // Select which deferred set we are working with...
728 final Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>> diSet =
729 currentInterval ? _deferredCIRequests : _deferredAIRequests;
730
731 // Hold a lock briefly while we do our work.
732 synchronized(diSet)
733 {
734 // Get any existing Set of values.
735 final EnumSet<EventPeriod> s = diSet.get(def);
736 // If none then create and store a new Set with just this value.
737 if(s == null)
738 {
739 diSet.put(def, EnumSet.of(intervalSelector));
740 return;
741 }
742 // If non-null then just add this bit to the Set already held.
743 s.add(intervalSelector);
744 }
745 }
746
747 /**Select, remove and return at random one of the deferred current/all interval requests; null if none.
748 * This is an atomic fetch-and-clear operation,
749 * designed to be used in association with
750 * _noteDeferredRequestForInterval().
751 *
752 * @param currentInterval true for "current" interval,
753 * false for "all" interval
754 * @return null or a pair with both elements valid and non-null
755 */
756 private Tuple.Pair<SimpleVariableDefinition, EventPeriod> _getAndClearDeferredRequestForInterval(
757 final boolean currentInterval)
758 {
759 // Select which deferred set we are working with...
760 final Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>> diSet =
761 currentInterval ? _deferredCIRequests : _deferredAIRequests;
762
763 // Hold a lock briefly while we do our work.
764 synchronized(diSet)
765 {
766 // Return null immediately if no deferred requests pending.
767 final int n = diSet.size();
768 if(n == 0) { return(null); }
769
770 // Pick one of the event types at random.
771 final SimpleVariableDefinition defs[] = new SimpleVariableDefinition[n];
772 diSet.keySet().toArray(defs);
773 final SimpleVariableDefinition def =
774 defs[(n == 1) ? 0 : Rnd.fastRnd.nextInt(n)];
775
776 // The def should be a valid event type.
777 assert(def != null);
778 assert(def.isEvent());
779
780 // Get the Set of deferred requests for this event type.
781 final EnumSet<EventPeriod> s = diSet.get(def);
782
783 // The Set should be non-null and non-empty.
784 assert(s != null);
785 assert(!s.isEmpty());
786
787 // Pick one of the pending iterval/period values at random.
788 final int m = s.size();
789 final EventPeriod eps[] = new EventPeriod[m];
790 s.toArray(eps);
791 final EventPeriod ep =
792 eps[(m == 1) ? 0 : Rnd.fastRnd.nextInt(m)];
793
794 // We should have chosen a valid interval.
795 assert((def.getEvPeriodSubset() == null) ||
796 def.getEvPeriodSubset().contains(ep));
797
798 // Compose our result.
799 final Tuple.Pair<SimpleVariableDefinition, EventPeriod> result =
800 new Tuple.Pair<SimpleVariableDefinition, EventPeriod>(def, ep);
801
802 // Now remove our pending value,
803 // and indeed the whole Set if now empty.
804 if(m == 1) { diSet.remove(def); }
805 else { s.remove(ep); }
806
807 return(result);
808 }
809 }
810
/**Per-definition locks used by getEventValues() to prevent redundant concurrent fetches/updates of the same event type from upstream.
 * The aim is to avoid making many fruitless upstream accesses
 * over (for example) a slow or connection-limited medium such as an HTTP tunnel.
 * <p>
 * Locking at the per-def level probably admits almost all potentially-useful concurrency
 * and is relatively simple.
 * <p>
 * Each lock is created lazily, on first use for its definition,
 * by getEventValues() using putIfAbsent() so that at most one lock
 * is ever current for a given definition.
 * <p>
 * We allow for a limited amount of concurrency in writing
 * but we mainly use ConcurrentHashMap for thread-safety, read concurrency and putIfAbsent().
 * The constructor arguments are, in order:
 * initial capacity (twice the number of system variable definitions),
 * load factor (0.5) and concurrency level (4).
 */
private final ConcurrentMap<SimpleVariableDefinition, ReentrantLock> _getEventValues_locks =
new ConcurrentHashMap<SimpleVariableDefinition, ReentrantLock>(2*SystemVariables.defs.size(), 0.5f, 4);
823
824 /**Get event values; never null.
825 * We get what we can from our local store,
826 * then go upstream to fill in any unfilled requested slots
827 * for which we did not have authoritative data
828 * (and then attempt to cache them locally for next time).
829 * However, if unable to fetch authoritative data (quickly)
830 * then we will return null or non-authoritative data as appropriate for each slot;
831 * this does not throw IOException.
832 * <p>
833 * We treat the 'current' and 'all' intervals specially
834 * since upstream will never offer us an authoritative value
835 * for the current slot, only for older completed ones.
836 * We get requested values asynchronously,
837 * to be reasonably up-to-date for the <em>next</em> request.
838 * <p>
839 * If a request for a "local" event upstream fails with an IllegalArgumentException
840 * then we assume that upstream of us is a tunnel which local values don't cross
841 * so we simply report whatever value we have to hand.
842 *
843 * @param def non-null event-variable definition
844 * @param intervalSelector non-null valid interval for specified event
845 * @param whichValues each true bit represents a slot for which data is
846 * required, bit 0 indicating data from the slot within which
847 * firstIntervalTime is located, bit 1 the previous slot, etc;
848 * null is treated as the common case equivalent to just bit 0 set
849 *
850 * @throws IOException if we have difficulty getting values from upstream
851 * @throws InterruptedIOException if interrupted or if blocking for too long.
852 */
853 @Override
854 public EventVariableValue[] getEventValues(final SimpleVariableDefinition def,
855 final EventPeriod intervalSelector,
856 final long intervalNumber,
857 final BitSet whichValues)
858 {
859 //logger.log("PipelineVarMgr.getEventValues("+def+", "+intervalSelector+", "+intervalNumber+", "+whichValues+")");
860
861 // Current interval number (never authoritative).
862 final long currentInterval = intervalSelector.getIntervalNumber(System.currentTimeMillis());
863
864 // Optimisation fast-path for common/simple case:
865 // the single requested slot/value is available locally and is authoritative.
866 // If available from local store then we don't need to use our locks/throttling,
867 // else we fall through for our more general case.
868 final boolean nullWV = (whichValues == null);
869 if(nullWV ||
870 ((whichValues.cardinality() == 1) && whichValues.get(0)))
871 {
872 // Don't try to satisfy requests for the current/all slots here...
873 if((intervalNumber != 0) && (intervalNumber != currentInterval))
874 {
875 // Get what we can from local store.
876 final EventVariableValue[] result =
877 super.getEventValues(def, intervalSelector, intervalNumber, null);
878
879 if((result.length != 0) &&
880 (result[0] != null) &&
881 (result[0].isAuthoritative()))
882 { return(result); /* Good, local cached value does the job. */ }
883 }
884 }
885
886 // Verify that this this def is an event and is genuine!
887 assert(def.isEvent());
888 assert(SystemVariables.defs.contains(def));
889
890 // First get what we can from our local store/cache.
891 // This call will validate the method arguments too.
892 EventVariableValue[] result =
893 super.getEventValues(def, intervalSelector, intervalNumber, whichValues);
894
895 // If any requested values are missing/null or non-authoritative,
896 // then try to go upstream for them (and then cache them locally).
897 BitSet wv = whichValues;
898 if(wv == null)
899 {
900 // Create synthetic BitSet for simplicity below.
901 wv = new BitSet(1);
902 wv.set(0);
903 }
904
905 // Note any "missing"/non-authoritative values.
906 final BitSet nonAuthValues = new BitSet();
907 int resultsPresent = 0;
908 for(int i = wv.nextSetBit(0); i >= 0; i = wv.nextSetBit(i+1))
909 {
910 final boolean absent = ((i >= result.length) || (result[i] == null));
911 if(!absent) { ++resultsPresent; }
912 if(absent || !result[i].isAuthoritative())
913 { nonAuthValues.set(i); }
914 }
915
916 // If the entire request has been satisfied from local cache
917 // then we can return immediately!
918 if(nonAuthValues.length() == 0)
919 { return(result); }
920
921 // Note if none of the request at all has been assembled from cache.
922 // In this case, we'll try harder to fetch values from upstream
923 // since we don't really want to return completely empty-handed.
924 final boolean noResultsFromCache = (resultsPresent == 0);
925
926 // If the only non-authoritative answer is
927 // in the "current" or "all" slot
928 // (it never would be authoritative anyway)
929 // then return now what we have locally cached/available.
930 // Don't bother with the expense of going upstream synchronously
931 // providing we have some sort of answer from cache
932 // but just note that the value was requested and deferred.
933 // We will go and fetch a fresh value soon, in the background.
934 if(DONT_FETCH_SPECIAL_INTERVALS_SYNCHRONOUSLY &&
935 (!noResultsFromCache) &&
936 (nonAuthValues.cardinality() == 1))
937 {
938 final int which = nonAuthValues.length()-1; // Index of the one bit that is set...
939
940 if(intervalNumber == 0)
941 {
942 _noteDeferredRequestForInterval(false, def, intervalSelector);
943 return(result);
944 }
945 else if(intervalNumber - which == currentInterval)
946 {
947 _noteDeferredRequestForInterval(true, def, intervalSelector);
948 return(result);
949 }
950 }
951
952 // THIS REQUEST WILL NEED TO GO UPSTREAM TO BE COMPLETELY SATISFIED.
953
954 // We allow at most one upstream access per def at once
955 // (through this front-end)
956 // to try to avoid overloading any tunnel or other choke-point.
957 // The lock is held around the underlying upstream call,
958 // even if that continues asynchronously out of sight from here.
959 // Get the unique lock for this event/def,
960 // with race-free create on first use.
961 ReentrantLock _getEventValues_lock;
962 while(null == (_getEventValues_lock = _getEventValues_locks.get(def)))
963 {
964 // Ensure no races for creation of a lock instance,
965 // ie at most one lock will ever be current for one def.
966 _getEventValues_locks.putIfAbsent(def, new ReentrantLock());
967 }
968 final ReentrantLock getEventValues_lock = _getEventValues_lock;
969 // Give up quickly if there is already a queue for the lock for this def,
970 // ie if we might well fail to get to make our call within a reasonable time.
971 if(getEventValues_lock.hasQueuedThreads())
972 {
973 return(result); // Return what we had collected so far, if anything.
974 }
975
976 // Make sure that the result value is large enough to accommodate
977 // all the results requested.
978 // If not then replace it with a large enough one and copy existing results in.
979 final int wvl = nullWV ? 1 : whichValues.length();
980 if(result.length < wvl)
981 {
982 final EventVariableValue newResult[] = new EventVariableValue[wvl];
983 System.arraycopy(result, 0, newResult, 0, result.length);
984 result = newResult;
985 }
986
987 //logger.log("PipelineVarMgr.getEventValues() upstream request: " + nonAuthValues);
988
989 // Make an upstream request for the slots/intervals
990 // where our local store did not have authoritative answers.
991 final BitSet upstreamRequest = (BitSet) nonAuthValues.clone(); // Logically immutable...
992 // We make an effort to prevent this request blocking indefinitely
993 // (though if the thread pool fills then we end up running synchronously in this thread).
994 final Future<EventVariableValue[]> fetchedValues = ThreadUtils.nonCPUThreadPool.submit(new Callable<EventVariableValue[]>(){
995 public EventVariableValue[] call()
996 {
997 // Now try to grab the lock associated with this def,
998 // but we are only prepared to block for a very short while
999 // (short enough time not to annoy interactive users for example).
1000 // If we can't get the lock quickly then we abort.
1001 // Try harder if we'd otherwise have nothing at all to return to the original caller.
1002 try
1003 {
1004 if(!getEventValues_lock.tryLock(1 + (noResultsFromCache ? (CoreConsts.MAX_INTERACTIVE_DELAY_MS*2) : (CoreConsts.MAX_INTERACTIVE_DELAY_MS/2)), TimeUnit.MILLISECONDS))
1005 {
1006 logger.log("INFO: could not quickly acquire lock in PipelineVarMgr.getEventValues() for "+def);
1007 return(new EventVariableValue[0]); // Abort: return empty result.
1008 }
1009 }
1010 catch(final InterruptedException e)
1011 {
1012 return(new EventVariableValue[0]); // Abort: return empty result.
1013 }
1014
1015 try {
1016 // Get the data from upstream.
1017 final EventVariableValue[] upstreamValues = source.getEventValues(def, intervalSelector, intervalNumber, upstreamRequest);
1018
1019 // Attempt to cache results here to avoid simply discarding them
1020 // if the caller has to give up waiting for us
1021 // before we've finished the RPC or handling/cacheing the response.
1022 if(upstreamValues.length > 0)
1023 {
1024 // Offer the values that we requested from upstream to our local store
1025 // and then merge them into our initial response.
1026 for(int i = upstreamRequest.nextSetBit(0); i >= 0; i = upstreamRequest.nextSetBit(i+1))
1027 {
1028 if(i >= upstreamValues.length) { break; }
1029 final EventVariableValue evv = upstreamValues[i];
1030 // Skip missing responses.
1031 if(evv == null) { continue; }
1032 // Skip dubious claims to authoritative answers for the current interval or the future (!)
1033 // which might be the result of a race or clock skew, etc...
1034 if((evv.getIntervalNumber() >= currentInterval) && evv.isAuthoritative()) { continue; }
1035
1036 // Offer upstream value to the local store, ie cache it...
1037 /*super.*/setEventValue(evv);
1038 }
1039 }
1040
1041 return(upstreamValues); // Return values to hand back to the original caller.
1042 }
1043 finally { getEventValues_lock.unlock(); }
1044 }
1045 });
1046
1047 EventVariableValue upstreamValues[];
1048 // Spend a limited time (much longer than we waited for the lock) fetching from upstream.
1049 // We allow at least long enough for a round-the-world RTT to marshal and send the data.
1050 try { upstreamValues = fetchedValues.get(Math.max(CoreConsts.MAX_TYPICAL_RPC_RTT_MS, 2*CoreConsts.MAX_INTERACTIVE_DELAY_MS), TimeUnit.MILLISECONDS); }
1051 // If the upstream result was not ready in time then return what we already had.
1052 catch(final TimeoutException e)
1053 {
1054 // Need to log failures to allow some system tuning...
1055 logger.log("WARNING: PipelineVarMgr timed out fetch of " + def + " reason: " + e.getMessage());
1056
1057 // Synchronously and without time limit retry a minimal subset of the original request
1058 // in case marshalling/transmission time (of the full request)
1059 // would always otherwise prevent completion.
1060 // This way we may make incremental progress.
1061 // Try for just the first (newest) item requested when we timed out.
1062 // Note that we do this without the normal cover of our per-def concurrency lock/limit.
1063 // TODO: Try a random subset in case of chronic difficulties with particular slots.
1064 if(upstreamRequest.length() > 1)
1065 {
1066 final int firstNonAuth = upstreamRequest.nextSetBit(0);
1067 assert(firstNonAuth >= 0);
1068 // Discard all but the first set bit.
1069 final BitSet upstreamRequestMin = (BitSet) upstreamRequest.clone();
1070 upstreamRequestMin.clear(firstNonAuth+1, upstreamRequestMin.length());
1071 assert(upstreamRequestMin.cardinality() == 1); // Should be exactly one bit set now.
1072 // Don't put a time limit on this to try to force/allow some progress.
1073 upstreamValues = source.getEventValues(def, intervalSelector, intervalNumber, upstreamRequestMin);
1074 // Make sure that this result, if any (and sane) is cached below...
1075 // Fall through to use whatever we managed to fetch this time.
1076 }
1077 // No subset is possible so return result collected so far.
1078 else { return(result); }
1079 }
1080 // If we were interrupted then return what we had.
1081 catch(final InterruptedException e)
1082 {
1083 Thread.currentThread().interrupt(); // Don't lose "interruptedness".
1084 return(result);
1085 }
1086 // Log unexpected error and return what we already had.
1087 catch(final ExecutionException e)
1088 {
1089 logger.log("unexpected error", e); // Shouldn't happen, so log it.
1090 return(result);
1091 }
1092
1093 // Merge the upstream results into our initial response.
1094 for(int i = upstreamRequest.nextSetBit(0); i >= 0; i = upstreamRequest.nextSetBit(i+1))
1095 {
1096 if(i >= upstreamValues.length) { break; }
1097 final EventVariableValue evv = upstreamValues[i];
1098 // Skip missing responses.
1099 if(evv == null) { continue; }
1100 // Skip dubious claims to authoritative answers for the current interval or the future (!)
1101 // which might be the result of a race or clock skew, etc...
1102 if((evv.getIntervalNumber() >= currentInterval) && evv.isAuthoritative()) { continue; }
1103
1104 // Put upstream value into our response.
1105 result[i] = evv;
1106
1107 // Offer upstream value to the local store, ie cache it, in case not already done...
1108 super.setEventValue(evv);
1109 }
1110
1111 // OK, respond as well as we can...
1112 return(result);
1113 }
1114 }