001    /*
002    Copyright (c) 1996-2011, Damon Hart-Davis
003    All rights reserved.
004    
005    Redistribution and use in source and binary forms, with or without
006    modification, are permitted provided that the following conditions are
007    met:
008    
009      * Redistributions of source code must retain the above copyright
010        notice, this list of conditions and the following disclaimer.
011    
012      * Redistributions in binary form must reproduce the above copyright
013        notice, this list of conditions and the following disclaimer in the
014        documentation and/or other materials provided with the
015        distribution.
016    
017    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
018    IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
019    TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
020    PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
021    OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
022    SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
023    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
024    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
025    THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
026    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
027    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
028    */
029    package org.hd.d.pg2k.svrCore.vars;
030    
031    import java.io.IOException;
032    import java.io.InterruptedIOException;
033    import java.util.ArrayDeque;
034    import java.util.ArrayList;
035    import java.util.BitSet;
036    import java.util.Deque;
037    import java.util.EnumSet;
038    import java.util.Hashtable;
039    import java.util.Iterator;
040    import java.util.List;
041    import java.util.Map;
042    import java.util.concurrent.Callable;
043    import java.util.concurrent.ConcurrentHashMap;
044    import java.util.concurrent.ConcurrentMap;
045    import java.util.concurrent.ExecutionException;
046    import java.util.concurrent.Future;
047    import java.util.concurrent.TimeUnit;
048    import java.util.concurrent.TimeoutException;
049    import java.util.concurrent.locks.ReentrantLock;
050    
051    import org.hd.d.pg2k.svrCore.CoreConsts;
052    import org.hd.d.pg2k.svrCore.GenUtils;
053    import org.hd.d.pg2k.svrCore.Rnd;
054    import org.hd.d.pg2k.svrCore.ThreadUtils;
055    import org.hd.d.pg2k.svrCore.Tuple;
056    import org.hd.d.pg2k.svrCore.props.LocalProps;
057    
058    import ORG.hd.d.IsDebug;
059    
060    
061    /**Class to "manage" variables for an element of a SimpleExhibitPipelineIF.
062     * Caches values to minimise read (and optionally write) activity.
063     * <p>
064     * Always maintains a local store of variables,
065     * and normally sync() needs to be called periodically
066     * to merge with upstream values.
067     * <p>
     * Typical places to inject one of these are in a caching stage,
     * or at pipeline or tunnel end-points.
     * <p>
     * A non-write-through instance has the effect of grouping multiple
     * setVariable() and setVariables() operations into a small number of
     * batched setVariables() upstream actions when sync() is invoked.
     * This may still write through in some circumstances.
075     * <p>
076     * In no case does a getVariable() or getVariables() operation
077     * directly cause a request to go upstream,
078     * (thus helping to ensure that get operations are fast even if there
079     * are I/O problems (etc) upstream),
080     * but rather the set of values is
081     * updated periodically with a single getVariables() operation.
082     * <p>
083     * FIXME: getEventValues() avoids, where possible, blocking indefinitely,
084     * and may return non-authoritative data instead,
085     * though may not be able to avoid such blocking in all circumstances.
086     * <p>
087     * Thread-safe; does not hold any externally-visible locks.
088     * <p>
089     * This does not count as an "end-point", ie is never the master data store.
090     */
091    public class PipelineVarMgr extends BasicVarMgr
092                                implements BasicVarMgrInterface,
093                                           SimpleVariablePipelineIF
094        {
095        /**If true, always use getVariables(-1) to get all values from upstream.
096         * Inefficient but very robust; generally not the right thing to do though.
097         */
098        private static final boolean ALWAYS_FETCH_ALL_UPSTREAM_VALUES = false;
099    
100        /**Create a default PipelineVarMgr instance.
101         * Defaults to write-through (ie read caching only),
102         * because that is "safer" behaviour.
103         *
104         * @param upstream  upstream source of data and variable values;
105         *     never null
106         */
107        public PipelineVarMgr(final SimpleVariablePipelineIF upstream)
108            {
109            this(upstream, true); // Write-through.
110            }
111    
112        /**Creates a customised VarMgr instance.
113         * May try to get the local system ID immediately from upstream,
114         * so that it is available for creating globals.
115         *
116         * @param upstream  upstream source of data and variable values;
117         *     never null
118         * @param writeThrough  if true, all set operations are propagated
119         *     downstream immediately
120         */
121        public PipelineVarMgr(final SimpleVariablePipelineIF upstream,
122                              final boolean writeThrough)
123            {
124            super(false, false);
125            assert(!super.isEndPoint()); // This is not an end point.
126            assert(!super.dontFilterDups); // Bad timestamps and replays must be being checked for.
127    
128            if(upstream == null)
129                { throw new IllegalArgumentException(); }
130    
131            // If this cache is write-through then we don't need an outbound store.
132            outboundUpdates = writeThrough ? null : new Hashtable<SimpleVariableDefinition, SimpleVariableValue>();
133            earlyUpdates = writeThrough ? null : new ArrayList<SimpleVariableValue>();
134            source = upstream;
135    
136            if(GET_SYSID_IN_CONS)
137                {
138                // Get the local system ID from upstream
139                // and save it since we shall need it when setting globals.
140                try
141                    {
142                    final SimpleVariableValue svv =
143                        upstream.getVariable(SystemVariables.LOCAL_SYS_ID);
144                    if(svv == null)
145                        { throw new IllegalStateException("unable to fetch system ID from upstream"); }
146                    super._setVariable(svv, true);
147                    assert(null != super.getVariable(SystemVariables.LOCAL_SYS_ID));
148                    }
149                catch(final IOException e)
150                    {
151                    // Ignore (but log) error; hope to get value later...
152                    e.printStackTrace();
153                    }
154                }
155            }
156    
157        /**If true, try in the constructor to get the local system ID from upstream.
158         * This helps us build globalMaps,
159         * and also ensures that the variable is available to clients downstream.
160         */
161        private final static boolean GET_SYSID_IN_CONS = true;
162    
163        /**Upstream source of variable values; never null. */
164        private final SimpleVariablePipelineIF source;
165    
166        /**Returns true if this is a write-through cache. */
167        public boolean isWriteThrough()
168            { return(outboundUpdates == null); }
169    
170        /**Cached outbound variable value updates to be sent upstream.
171         * Will not be used (and will be null) if in write-through mode.
172         * <p>
173         * Used by:
174         * <ul>
175         * <li>setVariable() (to add values)
176         * <li>sync() to push accumulated values upstream
177         * </ul>
178         * <p>
179         * Variable updates are held in this table and periodically flushed
180         * upstream in a block.  If multiple updates to a single variable
181         * occur between flushes, only the last one will be retained,
182         * (the others overwritten) thus reducing traffic.
183         * These may be re-ordered with respect to one another when flushed.
184         * <p>
185         * Note however that one value cannot be overwritten by another if
186         * they have different globalMap keys.
187         * This can only really happen on the master when simultaneous updates
188         * arrive from two different slaves for the same global value.
189         * In this case, we force the previous value upstream immediately.
190         * <p>
191         * A lock must be held on this object to do multiple operations atomically,
192         * but no other visible lock must be grabbed inside a lock on this.
193         * <p>
194         * Hashtable from SimpleVariableDefinition to SimpleVariableValue.
195         * <p>
196         * A Hashtable is used for thread-safety.
197         * <p>
198         * Never null.
199         */
200        private final Hashtable<SimpleVariableDefinition,SimpleVariableValue> outboundUpdates;
201    
202        /**A List of non-coalesced variable set operations to be done first.
203         * These are variables that could not be coalesced for whatever reason,
204         * and must be passed upstream ahead of anything in outboundUpdates.
205         * <p>
206         * This will be non-null when outboundUpdates is.
207         * <p>
208         * The content of this List must only be accessed in the scope of a lock
209         * on outboundUpdates, so this List implementation need not be inherently
210         * thread-safe (eg ArrayList can (and should) be used instead of Vector).
211         */
212        private final List<SimpleVariableValue> earlyUpdates;
213    
214    
215        /**Last time we successfully fetched variable values from upstream; initially null.
216         * Declared as volatile so allow safe access without a lock.
217         */
218        private volatile Long lastFetch;
219    
220        /**Set a single variable value.
221         * If writing though,
222         * then set the value upstream immediately,
223         * else simply put it in our "pending" list waiting for sync().
224         *
225         * @param newValue  new variable value to set; never null
226         * @throws java.io.IOException in case of difficulty setting the value upstream
227         */
228        @Override
229        public void setVariable(final SimpleVariableValue newValue)
230            throws IOException
231            {
232            // Store in our local cache before doing anything else...
233            super._setVariable(newValue, false);
234    
235            // Since we seem to have set the value OK
236            // (so it type-checked, for example),
237            // add to outgoing update list immediately,
238            // or write-though if appropriate.
239    
240            // Write-though.
241            if(outboundUpdates == null)
242                {
243                source.setVariable(newValue);
244                return;
245                }
246    
247            // Accumulate and wait for sync().
248            // Attempt to merge redundant (non-event) operations for efficiency.
249            synchronized(outboundUpdates)
250                {
251                // But for now we never coalesce events.
252                final SimpleVariableDefinition def = newValue.getDef();
253                if(def.isEvent())
254                    {
255                    earlyUpdates.add(newValue);
256                    return;
257                    }
258    
259                // Note however that one value cannot be overwritten by another
260                // iff they have different globalMap keys (or null-ness).
261                // This happens on the master when simultaneous updates
262                // arrive from two different slaves for the same global value.
263                // In this case, we force the previous value upstream now.
264                final SimpleVariableValue prevHeld =
265                        outboundUpdates.put(def, newValue);
266    
267                // Have to handle coalescing if there was a previous value
268                // for this variable.
269                if(prevHeld != null)
270                    {
271                    // Whoops, may conflict...
272                    // If global maps have different keys, or null-ness,
273                    // then they do conflict,
274                    // and we must flush the previous value
275                    // into the earlyUpdates list to go first,
276                    // else we can discard the previous version.
277                    final Map nGM = newValue.getGlobalMap();
278                    final Map pGM = prevHeld.getGlobalMap();
279                    final boolean differentMapKeys = (nGM == null) ?
280                            (pGM != null) :
281                            ((pGM == null) || !nGM.keySet().equals(pGM.keySet()));
282                    if(differentMapKeys)
283                        { earlyUpdates.add(prevHeld); }
284                    }
285                }
286            }
287    
288        /**Set several variable values.
289         * This is equivalent to set series of calls to setVariables()
290         * unless in write-through mode where a single setVariables()
291         * is done upstream for efficiency.
292         *
293         * @return number of values successfully set, at least locally...
294         * @param newValues  new variables values to set; never null
295         * @throws java.io.IOException in case of difficulty setting the values upstream
296         */
297        @Override
298        public int setVariables(final SimpleVariableValue newValues[])
299            throws IOException
300            {
301            if(newValues == null)
302                { throw new IllegalArgumentException(); }
303    
304            if(outboundUpdates != null)
305                {
306                // If not writing-through,
307                // then put in "outbound" list,
308                // being careful to preserve order,
309                // but eliminating duplicates.
310                final int nvLen = newValues.length;
311                for(int i = 0; i < nvLen; ++i)
312                    { setVariable(newValues[i]); }
313                return(newValues.length);
314                }
315            else
316                {
317                // If writing through,
318                // stick to single setVariables() call for efficiency.
319                super.setVariables(newValues);
320                return(source.setVariables(newValues));
321                }
322            }
323    
324    
325        /**Private lock to serialise calls on sync(). */
326        private final Object _sync_lock = new Object();
327    
328        /**Maximum number of set/update values to send upstream in one go; strictly positive.
329         * This limit is here to avoid vast on-the-wire messages straining sender and receiver,
330         * especially in the case where the receiver has been down for a while
331         * and so we have accumulated many messages.
332         * <p>
333         * More messages in each batch may be more efficient in terms of
334         * message overhead, inter-value compression, etc.
335         */
336        private static final int MAX_UPDATE_BATCH_COUNT = 256;
337    
338        /**Synchronise variables with upstream values.
339         * We push updated values upstream to the source,
340         * call sync on the source if this sync is "forced",
341         * and then retrieve changed values from upstream.
342         * <p>
343         * We remember when we last did this and request any changes since then,
344         * except on the first call or if called with force==true,
345         * in which case all values are retrieved.
346         * <p>
347         * To allow for (varying) clock skew between different server instances,
348         * we actually ask for somewhat older values than our last poll,
349         * which will result in some redundant traffic.
350         * <p>
351         * Holds no externally-visible locks,
352         * and does not hold locks while communicating upstream
353         * (so can handle set/get operations even if upstream is slow/blocking),
354         * but if called by multiple threads this will safely serialise the calls.
355         *
356         * @param force  if true, this will force a full sync on the read side
357         *     by using getVariables(-1) rather than attempting to choose a
358         *     nearer timestamp for efficiency;
359         *     the implementation is at liberty to use getVariables(-1)
360         *     at any time whatever the argument value,
361         *     and almost certainly should use it on the first call
362         *
363         * @throws java.io.IOException if one is received from upstream
364         */
365        public void syncVariables(final boolean force)
366            throws IOException
367            {
368    //if(isDebugMode) { System.err.println("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+").]"); }
369    
370            // Serialise calls to sync() to avoid out-of-order updates...
371            synchronized(_sync_lock)
372                {
373                // Push locally-cached updates upstream,
374                // if there are any pending,
375                // and clear those pending values.
376                if(outboundUpdates != null)
377                    {
378                    // Get any earlyUpdates (first)
379                    // then any other outboundUpdates,
380                    // and assemble them into a single update to send upstream.
381                    // Upon failure, put them at the start of the (early) list to resend
382                    // in case new items have arrived.
383                    //
384                    // We *do not* hold the lock while communicating upstream.
385                    Deque<SimpleVariableValue> toGoUpstreamAll = null;
386                    synchronized(outboundUpdates)
387                        {
388                        // Atomically retrieve current pending updates, if any,
389                        // and clear the pending updates.
390                        // We can put them back afterwards if we have to, sort of.
391                        final int size = earlyUpdates.size() + outboundUpdates.size();
392    if(IsDebug.isDebug && (size > 0)) { System.err.println("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+"): outboundUpdates.size()="+size+".]"); }
393    //if(isDebugMode && (size > 0)) { System.err.println("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+"): outboundUpdates.size()="+size+".]"); }
394                        if(size > 0)
395                            {
396                            // Assemble updates in order, earlyUpdates first,
397                            // ready to send upstream.
398                            toGoUpstreamAll = new ArrayDeque<SimpleVariableValue>(size);
399                            toGoUpstreamAll.addAll(earlyUpdates);
400                            toGoUpstreamAll.addAll(outboundUpdates.values());
401    
402                            // Clear the originals...
403                            earlyUpdates.clear();
404                            outboundUpdates.clear();
405                            }
406                        }
407    
408                    if(toGoUpstreamAll != null)
409                        {
410                        // Attempt to send the outstanding values upstream...
411                        // However, if this fails due to an I/O problem,
412                        // then try to reinsert any not-yet-overwritten values
413                        // at the start of the earlyUpdates
414                        // so that they will go before any later arrivals.
415                        try
416                            {
417                            // We try to send updates in small-ish batches
418                            // to avoid blowing up the recipient if we have accumulated many.
419                            while(!toGoUpstreamAll.isEmpty())
420                                {
421                                // Extract the first updates and try to send them.
422                                final int toSend = Math.min(toGoUpstreamAll.size(), MAX_UPDATE_BATCH_COUNT);
423                                final SimpleVariableValue[] updates = new SimpleVariableValue[toSend];
424                                // Copy (not remove) the first 'toSend' updates to the array...
425                                final Iterator<SimpleVariableValue> iterator = toGoUpstreamAll.iterator();
426                                for(int i = 0; i < toSend; ++i)
427                                    { updates[i] = iterator.next(); }
428                                // Try to send upstream...
429    if(IsDebug.isDebug) { System.err.println("[PipelineVarMgr "+System.identityHashCode(this)+": sending batch of updates upstream: "+toSend+"/"+toGoUpstreamAll.size()+".]"); }
430                                source.setVariables(updates);
431                                // If that was completely successful
432                                // then remove the initial items from the start of the list.
433                                // TODO: do this in a block to overcome the inefficiency of many calls.
434                                for(int i = 0; i < toSend; ++i)
435                                    { toGoUpstreamAll.removeFirst(); }
436                                }
437                            }
438                        catch(final IOException e)
439                            {
440                            // In case of error,
441                            // requeue anything that may not have been successfully received
442                            // and is not too old for the receiver to handle
443                            // to keep our queue size-bounded.
444    
445                            // Filter out anything too old.
446                            final long now = System.currentTimeMillis();
447                            final long tooOld = now - BasicVarMgr.MAX_APPARENT_MESSAGE_AGE_MS;
448                            // Ordinary non-event values can be dumped sooner...
449                            final long tooOldNonEvent = now - 101 - 2*SystemVariables.MAX_VALUE_DISTRIBUTION_LATENCY_MS;
450                            // TODO: maybe make sure that we don't somehow have anything (too far) in the future?
451    
452                            final List<SimpleVariableValue> toGoUpstreamFiltered =
453                                    new ArrayList<SimpleVariableValue>(toGoUpstreamAll.size());
454                            for(final SimpleVariableValue svv : toGoUpstreamAll)
455                                {
456                                if(svv == null) { continue; }
457                                if(svv.getTimestamp() < tooOld) { continue; }
458                                if(!svv.getDef().isEvent() && (svv.getTimestamp() < tooOldNonEvent)) { continue; }
459                                toGoUpstreamFiltered.add(svv);
460                                }
461    
462    if(IsDebug.isDebug) { System.err.println("WARNING: PipelineVarMgr: values to resend up stream: " + toGoUpstreamFiltered.size()); }
463    
464                            // Protect access to earlyUpdates...
465                            synchronized(outboundUpdates)
466                                {
467                                // Insert the not-too-old items in order at the start...
468                                earlyUpdates.addAll(0, toGoUpstreamFiltered);
469                                }
470    //if(isDebugMode) { System.err.println("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+") got IOException.]"); }
471    //if(isDebugMode) { e.printStackTrace(); }
472                            throw e; // Rethrow the error...
473                            }
474                        }
475                    }
476    
477                // Now force any changes all the way to the master's end-point
478                // iff this is a "forced" sync().
479                if(force) { source.syncVariables(true); }
480    
481    //if(isDebugMode) { System.err.println("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+") about to get...]"); }
482    
483                // Now retrieve any values from downstream.
484                // For the moment we'll assume that our local clock,
485                // plus the cache hold time used as a skew corrector,
486                // will ensure that we don't miss anything
487                // at the cost of a few wasted/redundant transfers,
488                // though:
489                //   * on the first run, or
490                //   * if "force"d, or
491                //   * we got nothing on our last poll,
492                // then we fetch everything with a getVariables(-1).
493                final Long lastTime = lastFetch; // Fetch value just once.
494                final boolean firstRun = (lastTime == null);
495                final boolean getAll = (ALWAYS_FETCH_ALL_UPSTREAM_VALUES || force || firstRun);
496                final SimpleVariableValue[] newVals =
497                    source.getVariables(getAll ? -1 :
498                        (lastTime.longValue() - CoreConsts.MAX_PEER_CLOCK_SKEW_MS - 1));
499                // Apply the values fetched...
500                // (For safety, we discard any that appear to be invalid.)
501                for(int i = newVals.length; --i >= 0; )
502                    {
503                    final SimpleVariableValue svv = newVals[i];
504                    final SimpleVariableDefinition def = svv.getDef();
505                    if(!SystemVariables.defs.contains(def))
506                        {
507                        System.err.println("PipelineVarMgr: ERROR: upstream variable value rejected: " + svv.getDef());
508                        continue;
509                        }
510    
511                    // Store the new value in our local cache,
512                    // but don't have it propagate back upstream!
513                    // Note that it is from upstream,
514                    // so read-only values *can* be set in our cache.
515                    super._setVariable(svv, true);
516                    }
517    
518    
519                // If we have any deferred requests for current/all interval events,
520                // then attempt to fetch and store/cache one at random now
521                // in the hope of being able to satisfy the next request for it.
522                // If something goes wrong and we "lose" one of these
523                // deferred requests, that is OK, since we made no promises
524                // about the timeliness of current-interval data.
525                for(final boolean current : new boolean[]{ false, true } )
526                    {
527                    final Tuple.Pair<SimpleVariableDefinition, EventPeriod> deferred =
528                        _getAndClearDeferredRequestForInterval(current);
529                    if(deferred != null)
530                        {
531                        // Current interval, which we never expect to be authoritative...
532                        final long currentInterval =
533                            deferred.second.getIntervalNumber(System.currentTimeMillis());
534    
535                        final BitSet justTheOne = new BitSet(1);
536                        justTheOne.set(0); // Just get the one current value set.
537                        final EventVariableValue upstreamValues[] =
538                            source.getEventValues(deferred.first, deferred.second,
539                                                  current ? currentInterval : 0,
540                                                  justTheOne);
541    
542                        // Save the fetched value for the next request!
543                        if(upstreamValues.length > 0)
544                            {
545                            final EventVariableValue evv = upstreamValues[0];
546                            if(evv != null)
547                                {
548    if(IsDebug.isDebug) { System.out.println("[Post-fetched "+(current?"current":"all")+"-interval data for " + deferred + ".]"); }
549                                super.setEventValue(evv);
550                                }
551                            }
552                        }
553                    }
554    
555                // Consider a full fetch if there were no updated values returned.
556                lastFetch = ((newVals.length == 0) &&
557                        (0 == Rnd.fastRnd.nextInt(3 + ((GenUtils.mustConservePower()?171:101) * LocalProps.getServerSlowdownFactor())))) ?
558                    null : new Long(System.currentTimeMillis());
559                }
560    
561    //if(isDebugMode) { System.err.println("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+") FINISHED]"); }
562            }
563    
564    
565        /**If true then generally don't go upstream (synchronously) just to fetch values for the current (or all) interval; respond from local store.
566         * We may nonetheless synchronously request current-interval data from upstream:
567         * <ul>
568         * <li>when asking for something else in the same request,
569         * <li>asynchronously when a particular event/period combination
570         *     has been asked for recently,
571         * <li>when we have nothing cached and would have to return empty-handed to the user,
572         * </ul>
573         * thus only making for a marginal extra cost.
574         * <p>
575         * If true, there is no guarantee that the "current" data reflects more than
576         * locally-originated events or will be hugely up-to-date,
577         * though the master's main copy will be.
578         */
579        private static final boolean DONT_FETCH_SPECIAL_INTERVALS_SYNCHRONOUSLY = true;
580    
581        /**Set of "current" interval event-values requested that we declined to fetch synchronously.
582         * We clear these as we asynchronously fetch the requested values.
583         * <p>
584         * A map from event variable definitions to
585         * a set of any deferred current-interval requests for that set
586         * (or to null if none have ever been deferred for that event type).
587         * <p>
588         * A Hashtable is used to guarantee basic thread safety.
589         * All manipulation of the contents of the table,
590         * and in particular of the non-threadsafe EnumSet values,
591         * must be done holding a lock on the table instance.
592         */
593        private final Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>> _deferredCIRequests =
594            new Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>>();
595    
    /**Set of "all" interval event-values requested that we declined to fetch synchronously.
     * Entries are removed as the requested values are picked out for asynchronous fetching.
     * <p>
     * A map from event variable definitions to
     * the set of deferred all-interval periods pending for that event type.
     * An event type with nothing pending has no entry at all:
     * a set is created when the first request is deferred,
     * and the entry is removed (rather than left empty) when its last request is taken.
     * <p>
     * A Hashtable is used to guarantee basic thread safety.
     * All manipulation of the contents of the table,
     * and in particular of the non-threadsafe EnumSet values,
     * must be done holding a lock on the table instance.
     */
    private final Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>> _deferredAIRequests =
        new Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>>();
610    
611        /**Class definining all details of a deferred request.
612         * This comprises:
613         * <ul>
614         * <li>Event name/definition,
615         * <li>EventPeriod,
616         * <li>The all/current flag.
617         * </ul>
618         * <p>
619         * Equality depends on all fields.
620         */
621        private final class NextRequestKey
622            {
623            /**Event definition; never null. */
624            final SimpleVariableDefinition def;
625    
626            /**Event period; never null. */
627            final EventPeriod p;
628    
629            /**True if "all", else false for "current". */
630            final boolean isAll;
631    
632            public NextRequestKey(final SimpleVariableDefinition def,
633                                  final EventPeriod p,
634                                  final boolean isAll)
635                {
636                this.def = def;
637                this.p = p;
638                this.isAll = isAll;
639    
640                assert(p != null);
641                assert((def != null) && def.isEvent());
642                assert((def.getEvPeriodSubset() == null) ||
643                       def.getEvPeriodSubset().contains(p));
644                }
645    
646            /**The hash depends on all fields. */
647            @Override
648            public int hashCode()
649                { return(def.hashCode() ^ (isAll ? p.ordinal() : p.name().hashCode())); }
650    
651            /**Equality depends on all fields. */
652            @Override
653            public boolean equals(final Object obj)
654                {
655                if(!(obj instanceof NextRequestKey)) { return(false); }
656                final NextRequestKey other = (NextRequestKey) obj;
657                if(isAll != other.isAll) { return(false); }
658                if(!p.equals(other.p)) { return(false); }
659                if(!def.equals(other.def)) { return(false); }
660                return(true);
661                }
662            }
663    
    /**Next permitted (deferred) request time for a given all/current period data set.
     * Values are absolute times on the System.currentTimeMillis() scale.
     * <p>
     * A deferred request noted for an item before its not-before time is simply dropped.
     * Each accepted request pushes the item's not-before time forward
     * by a fraction of that item's period (capped by our temporal slackness)
     * plus some random jitter;
     * see _noteDeferredRequestForInterval() for the exact computation.
     * <p>
     * This means that we limit the cost of repeated requests for the same item
     * in such a way that we still have a reasonable chance of seeing
     * intra-interval updates.
     * <p>
     * The table is thread-safe and parameterised for quick, concurrent access.
     */
    private final Map<NextRequestKey,Long> _nextRequestNotBefore =
        new ConcurrentHashMap<NextRequestKey, Long>(41, 0.7f, 4);
680    
681        /**Note that we have deferred a request for the given event and period.
682         *
683         * @param currentInterval  true for "current" interval,
684         *     false for "all" interval
685         * @param def  non-null event definition
686         * @param intervalSelector  non-null interval/period value
687         */
688        private final void _noteDeferredRequestForInterval(final boolean currentInterval,
689                                                           final SimpleVariableDefinition def,
690                                                           final EventPeriod intervalSelector)
691            {
692            if((def == null) || !def.isEvent() ||
693               (intervalSelector == null))
694                { throw new IllegalArgumentException(); }
695    
696            // Caller must be asking for valid period for this event type.
697            assert((def.getEvPeriodSubset() == null) ||
698                   def.getEvPeriodSubset().contains(intervalSelector));
699    
700    
701            // We may ignore rapid repeated requests for the same item,
702            // if the repeat is within a small fraction of the period.
703            final NextRequestKey key = new NextRequestKey(def, intervalSelector, !currentInterval);
704            final Long notBefore = _nextRequestNotBefore.get(key);
705            // Return if too early for another request for this value yet.
706            final long now = System.currentTimeMillis();
707            if((notBefore != null) && (notBefore.longValue() > now))
708                { return; }
709    
710            // Compute/record when it would next be OK to ask for an update.
711            // At most we put this off by a fraction of our temporal slackness,
712            // and as a minimum a substantial fraction of our sysvar latency.
713            final long nextNotBefore = now +
714                Math.min(CoreConsts.DEFAULT_TEMPORAL_SLACKNESS_S * 249,
715                    intervalSelector.getIntervalMs() >>> 4) +
716                Rnd.fastRnd.nextInt(SystemVariables.MAX_VALUE_DISTRIBUTION_LATENCY_MS/2);
717            _nextRequestNotBefore.put(key, new Long(nextNotBefore));
718    
719    
720            // Select which deferred set we are working with...
721            final Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>> diSet =
722                currentInterval ? _deferredCIRequests : _deferredAIRequests;
723    
724            // Hold a lock briefly while we do our work.
725            synchronized(diSet)
726                {
727                // Get any existing Set of values.
728                final EnumSet<EventPeriod> s = diSet.get(def);
729                // If none then create and store a new Set with just this value.
730                if(s == null)
731                    {
732                    diSet.put(def, EnumSet.of(intervalSelector));
733                    return;
734                    }
735                // If non-null then just add this bit to the Set already held.
736                s.add(intervalSelector);
737                }
738            }
739    
740        /**Select, remove and return at random one of the deferred current/all interval requests; null if none.
741         * This is an atomic fetch-and-clear operation,
742         * designed to be used in association with
743         * _noteDeferredRequestForInterval().
744         *
745         * @param currentInterval  true for "current" interval,
746         *     false for "all" interval
747         * @return null or a pair with both elements valid and non-null
748         */
749        private Tuple.Pair<SimpleVariableDefinition, EventPeriod> _getAndClearDeferredRequestForInterval(
750                                                final boolean currentInterval)
751            {
752            // Select which deferred set we are working with...
753            final Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>> diSet =
754                currentInterval ? _deferredCIRequests : _deferredAIRequests;
755    
756            // Hold a lock briefly while we do our work.
757            synchronized(diSet)
758                {
759                // Return null immediately if no deferred requests pending.
760                final int n = diSet.size();
761                if(n == 0) { return(null); }
762    
763                // Pick one of the event types at random.
764                final SimpleVariableDefinition defs[] = new SimpleVariableDefinition[n];
765                diSet.keySet().toArray(defs);
766                final SimpleVariableDefinition def =
767                    defs[(n == 1) ? 0 : Rnd.fastRnd.nextInt(n)];
768    
769                // The def should be a valid event type.
770                assert(def != null);
771                assert(def.isEvent());
772    
773                // Get the Set of deferred requests for this event type.
774                final EnumSet<EventPeriod> s = diSet.get(def);
775    
776                // The Set should be non-null and non-empty.
777                assert(s != null);
778                assert(!s.isEmpty());
779    
780                // Pick one of the pending iterval/period values at random.
781                final int m = s.size();
782                final EventPeriod eps[] = new EventPeriod[m];
783                s.toArray(eps);
784                final EventPeriod ep =
785                    eps[(m == 1) ? 0 : Rnd.fastRnd.nextInt(m)];
786    
787                // We should have chosen a valid interval.
788                assert((def.getEvPeriodSubset() == null) ||
789                       def.getEvPeriodSubset().contains(ep));
790    
791                // Compose our result.
792                final Tuple.Pair<SimpleVariableDefinition, EventPeriod> result =
793                    new Tuple.Pair<SimpleVariableDefinition, EventPeriod>(def, ep);
794    
795                // Now remove our pending value,
796                // and indeed the whole Set if now empty.
797                if(m == 1) { diSet.remove(def); }
798                else { s.remove(ep); }
799    
800                return(result);
801                }
802            }
803    
    /**Locks to prevent redundant concurrent fetches/updates of the same event type from upstream in getEventValues().
     * The aim of this is to avoid making many fruitless upstream accesses
     * over (for example) a slow or connection-limited medium such as an HTTP tunnel.
     * <p>
     * There is one ReentrantLock per event definition.
     * It is created lazily, and race-free via putIfAbsent(), on first use in getEventValues(),
     * so at most one lock instance is ever current for each definition.
     * Locking at the per-def level probably admits almost all potential useful concurrency
     * and is relatively simple.
     * <p>
     * The map is presized to twice the number of system variable definitions,
     * with a 0.5 load factor and a concurrency level of 4.
     * This allows a limited amount of concurrency in writing,
     * but we mainly use ConcurrentHashMap for thread-safety, read concurrency and putIfAbsent().
     */
    private final ConcurrentMap<SimpleVariableDefinition, ReentrantLock> _getEventValues_locks =
        new ConcurrentHashMap<SimpleVariableDefinition, ReentrantLock>(2*SystemVariables.defs.size(), 0.5f, 4);
816    
817        /**Get event values; never null.
818         * We get what we can from our local store,
819         * then go upstream to fill in any unfilled requested slots
820         * for which we did not have authoritative data
821         * (and then attempt to cache them locally for next time).
822         * However, if unable to fetch authoritative data (quickly)
823         * then we will return null or non-authoritative data as appropriate for each slot;
824         * this does not throw IOException.
825         * <p>
826         * We treat the 'current' and 'all' intervals specially
827         * since upstream will never offer us an authoritative value
828         * for the current slot, only for older completed ones.
829         * We get requested values asynchronously,
830         * to be reasonably up-to-date for the <em>next</em> request.
831         * <p>
832         * If a request for a "local" event upstream fails with an IllegalArgumentException
833         * then we assume that upstream of us is a tunnel which local values don't cross
834         * so we simply report whatever value we have to hand.
835         *
836         * @param def  non-null event-variable definition
837         * @param intervalSelector  non-null valid interval for specified event
838         * @param whichValues  each true bit represents a slot for which data is
839         *     required, bit 0 indicating data from the slot within which
840         *     firstIntervalTime is located, bit 1 the previous slot, etc;
841         *     null is treated as the common case equivalent to just bit 0 set
842         *
843         * @throws IOException  if we have difficulty getting values from upstream
844         * @throws InterruptedIOException  if interrupted or if blocking for too long.
845         */
846        @Override
847        public EventVariableValue[] getEventValues(final SimpleVariableDefinition def,
848                                                   final EventPeriod intervalSelector,
849                                                   final long intervalNumber,
850                                                   final BitSet whichValues)
851            {
852    //System.out.println("PipelineVarMgr.getEventValues("+def+", "+intervalSelector+", "+intervalNumber+", "+whichValues+")");
853    
854            // Current interval number (never authoritative).
855            final long currentInterval = intervalSelector.getIntervalNumber(System.currentTimeMillis());
856    
857            // Optimisation fast-path for common/simple case:
858            // the single requested slot/value is available locally and is authoritative.
859            // If available from local store then we don't need to use our locks/throttling,
860            // else we fall through for our more general case.
861            final boolean nullWV = (whichValues == null);
862            if(nullWV ||
863               ((whichValues.cardinality() == 1) && whichValues.get(0)))
864                {
865                // Don't try to satisfy requests for the current/all slots here...
866                if((intervalNumber != 0) && (intervalNumber != currentInterval))
867                    {
868                    // Get what we can from local store.
869                    final EventVariableValue[] result =
870                        super.getEventValues(def, intervalSelector, intervalNumber, null);
871    
872                    if((result.length != 0) &&
873                        (result[0] != null) &&
874                        (result[0].isAuthoritative()))
875                        { return(result); /* Good, local cached value does the job. */ }
876                    }
877                }
878    
879            // Verify that this this def is an event and is genuine!
880            assert(def.isEvent());
881            assert(SystemVariables.defs.contains(def));
882    
883            // First get what we can from our local store/cache.
884            // This call will validate the method arguments too.
885            EventVariableValue[] result =
886                super.getEventValues(def, intervalSelector, intervalNumber, whichValues);
887    
888            // If any requested values are missing/null or non-authoritative,
889            // then try to go upstream for them (and then cache them locally).
890            BitSet wv = whichValues;
891            if(wv == null)
892                {
893                // Create synthetic BitSet for simplicity below.
894                wv = new BitSet(1);
895                wv.set(0);
896                }
897    
898            // Note any "missing"/non-authoritative values.
899            final BitSet nonAuthValues = new BitSet();
900            int resultsPresent = 0;
901            for(int i = wv.nextSetBit(0); i >= 0; i = wv.nextSetBit(i+1))
902                {
903                final boolean absent = ((i >= result.length) || (result[i] == null));
904                if(!absent) { ++resultsPresent; }
905                if(absent || !result[i].isAuthoritative())
906                    { nonAuthValues.set(i); }
907                }
908    
909            // If the entire request has been satisfied from local cache
910            // then we can return immediately!
911            if(nonAuthValues.length() == 0)
912                { return(result); }
913    
914            // Note if none of the request at all has been assembled from cache.
915            // In this case, we'll try harder to fetch values from upstream
916            // since we don't really want to return completely empty-handed.
917            final boolean noResultsFromCache = (resultsPresent == 0);
918    
919            // If the only non-authoritative answer is
920            // in the "current" or "all" slot
921            // (it never would be authoritative anyway)
922            // then return now what we have locally cached/available.
923            // Don't bother with the expense of going upstream synchronously
924            // providing we have some sort of answer from cache
925            // but just note that the value was requested and deferred.
926            // We will go and fetch a fresh value soon, in the background.
927            if(DONT_FETCH_SPECIAL_INTERVALS_SYNCHRONOUSLY &&
928                    (!noResultsFromCache) &&
929                    (nonAuthValues.cardinality() == 1))
930                {
931                final int which = nonAuthValues.length()-1; // Index of the one bit that is set...
932    
933                if(intervalNumber == 0)
934                    {
935                    _noteDeferredRequestForInterval(false, def, intervalSelector);
936                    return(result);
937                    }
938                else if(intervalNumber - which == currentInterval)
939                    {
940                    _noteDeferredRequestForInterval(true, def, intervalSelector);
941                    return(result);
942                    }
943                }
944    
945            // THIS REQUEST WILL NEED TO GO UPSTREAM TO BE COMPLETELY SATISFIED.
946    
947            // We allow at most one upstream access per def at once
948            // (through this front-end)
949            // to try to avoid overloading any tunnel or other choke-point.
950            // The lock is held around the underlying upstream call,
951            // even if that continues asynchronously out of sight from here.
952            // Get the unique lock for this event/def,
953            // with race-free create on first use.
954            ReentrantLock _getEventValues_lock;
955            while(null == (_getEventValues_lock = _getEventValues_locks.get(def)))
956                {
957                // Ensure no races for creation of a lock instance,
958                // ie at most one lock will ever be current for one def.
959                _getEventValues_locks.putIfAbsent(def, new ReentrantLock());
960                }
961            final ReentrantLock getEventValues_lock = _getEventValues_lock;
962            // Give up quickly if there is already a queue for the lock for this def,
963            // ie if we might well fail to get to make our call within a reasonable time.
964            if(getEventValues_lock.hasQueuedThreads())
965                {
966                return(result); // Return what we had collected so far, if anything.
967                }
968    
969            // Make sure that the result value is large enough to accommodate
970            // all the results requested.
971            // If not then replace it with a large enough one and copy existing results in.
972            final int wvl = nullWV ? 1 : whichValues.length();
973            if(result.length < wvl)
974                {
975                final EventVariableValue newResult[] = new EventVariableValue[wvl];
976                System.arraycopy(result, 0, newResult, 0, result.length);
977                result = newResult;
978                }
979    
980    //System.out.println("PipelineVarMgr.getEventValues() upstream request: " + nonAuthValues);
981    
982            // Make an upstream request for the slots/intervals
983            // where our local store did not have authoritative answers.
984            final BitSet upstreamRequest = (BitSet) nonAuthValues.clone(); // Logically immutable...
985            // We make an effort to prevent this request blocking indefinitely
986            // (though if the thread pool fills then we end up running synchronously in this thread).
987            final Future<EventVariableValue[]> fetchedValues = ThreadUtils.nonCPUThreadPool.submit(new Callable<EventVariableValue[]>(){
988                public EventVariableValue[] call()
989                    {
990                    // Now try to grab the lock associated with this def,
991                    // but we are only prepared to block for a very short while
992                    // (short enough time not to annoy interactive users for example).
993                    // If we can't get the lock quickly then we abort.
994                    // Try harder if we'd otherwise have nothing at all to return to the original caller.
995                    try
996                        {
997                        if(!getEventValues_lock.tryLock(1 + (noResultsFromCache ? (CoreConsts.MAX_INTERACTIVE_DELAY_MS*2) : (CoreConsts.MAX_INTERACTIVE_DELAY_MS/2)), TimeUnit.MILLISECONDS))
998                            {
999                            System.err.println("INFO: could not quickly acquire lock in PipelineVarMgr.getEventValues() for "+def);
1000                            return(new EventVariableValue[0]); // Abort: return empty result.
1001                            }
1002                        }
1003                    catch(final InterruptedException e)
1004                        {
1005                        return(new EventVariableValue[0]); // Abort: return empty result.
1006                        }
1007    
1008                    try {
1009                        // Get the data from upstream.
1010                        final EventVariableValue[] upstreamValues = source.getEventValues(def, intervalSelector, intervalNumber, upstreamRequest);
1011    
1012                        // Attempt to cache results here to avoid simply discarding them
1013                        // if the caller has to give up waiting for us
1014                        // before we've finished the RPC or handling/cacheing the response.
1015                        if(upstreamValues.length > 0)
1016                            {
1017                            // Offer the values that we requested from upstream to our local store
1018                            // and then merge them into our initial response.
1019                            for(int i = upstreamRequest.nextSetBit(0); i >= 0; i = upstreamRequest.nextSetBit(i+1))
1020                                {
1021                                if(i >= upstreamValues.length) { break; }
1022                                final EventVariableValue evv = upstreamValues[i];
1023                                // Skip missing responses.
1024                                if(evv == null) { continue; }
1025                                // Skip dubious claims to authoritative answers for the current interval or the future (!)
1026                                // which might be the result of a race or clock skew, etc...
1027                                if((evv.getIntervalNumber() >= currentInterval) && evv.isAuthoritative()) { continue; }
1028    
1029                                // Offer upstream value to the local store, ie cache it...
1030                                /*super.*/setEventValue(evv);
1031                                }
1032                            }
1033    
1034                        return(upstreamValues); // Return values to hand back to the original caller.
1035                        }
1036                    finally { getEventValues_lock.unlock(); }
1037                    }
1038                });
1039    
1040            EventVariableValue upstreamValues[];
1041            // Spend a limited time (much longer than we waited for the lock) fetching from upstream.
1042            // We allow at least long enough for a round-the-world RTT to marshal and send the data.
1043            try { upstreamValues = fetchedValues.get(Math.max(CoreConsts.MAX_TYPICAL_RPC_RTT_MS, 2*CoreConsts.MAX_INTERACTIVE_DELAY_MS), TimeUnit.MILLISECONDS); }
1044            // If the upstream result was not ready in time then return what we already had.
1045            catch(final TimeoutException e)
1046                {
1047                // Need to log failures to allow some system tuning...
1048                System.err.println("WARNING: PipelineVarMgr timed out fetch of " + def + " reason: " + e.getMessage());
1049    
1050                // Synchronously and without time limit retry a minimal subset of the original request
1051                // in case marshalling/transmission time (of the full request)
1052                // would always otherwise prevent completion.
1053                // This way we may make incremental progress.
1054                // Try for just the first (newest) item requested when we timed out.
1055                // Note that we do this without the normal cover of our per-def concurrency lock/limit.
1056                // TODO: Try a random subset in case of chronic difficulties with particular slots.
1057                if(upstreamRequest.length() > 1)
1058                    {
1059                    final int firstNonAuth = upstreamRequest.nextSetBit(0);
1060                    assert(firstNonAuth >= 0);
1061                    // Discard all but the first set bit.
1062                    final BitSet upstreamRequestMin = (BitSet) upstreamRequest.clone();
1063                    upstreamRequestMin.clear(firstNonAuth+1, upstreamRequestMin.length());
1064                    assert(upstreamRequestMin.cardinality() == 1); // Should be exactly one bit set now.
1065                    // Don't put a time limit on this to try to force/allow some progress.
1066                    upstreamValues = source.getEventValues(def, intervalSelector, intervalNumber, upstreamRequestMin);
1067                    // Make sure that this result, if any (and sane) is cached below...
1068                    // Fall through to use whatever we managed to fetch this time.
1069                    }
1070                // No subset is possible so return result collected so far.
1071                else { return(result); }
1072                }
1073            // If we were interrupted then return what we had.
1074            catch(final InterruptedException e)
1075                {
1076                Thread.currentThread().interrupt(); // Don't lose "interruptedness".
1077                return(result);
1078                }
1079            // Log unexpected error and return what we already had.
1080            catch(final ExecutionException e)
1081                {
1082                e.printStackTrace(); // Shouldn't happen, so log it.
1083                return(result);
1084                }
1085    
1086            // Merge the upstream results into our initial response.
1087            for(int i = upstreamRequest.nextSetBit(0); i >= 0; i = upstreamRequest.nextSetBit(i+1))
1088                {
1089                if(i >= upstreamValues.length) { break; }
1090                final EventVariableValue evv = upstreamValues[i];
1091                // Skip missing responses.
1092                if(evv == null) { continue; }
1093                // Skip dubious claims to authoritative answers for the current interval or the future (!)
1094                // which might be the result of a race or clock skew, etc...
1095                if((evv.getIntervalNumber() >= currentInterval) && evv.isAuthoritative()) { continue; }
1096    
1097                // Put upstream value into our response.
1098                result[i] = evv;
1099    
1100                // Offer upstream value to the local store, ie cache it, in case not already done...
1101                super.setEventValue(evv);
1102                }
1103    
1104            // OK, respond as well as we can...
1105            return(result);
1106            }
1107        }