001    /*
002    Copyright (c) 1996-2012, Damon Hart-Davis
003    All rights reserved.
004    
005    Redistribution and use in source and binary forms, with or without
006    modification, are permitted provided that the following conditions are
007    met:
008    
009      * Redistributions of source code must retain the above copyright
010        notice, this list of conditions and the following disclaimer.
011    
012      * Redistributions in binary form must reproduce the above copyright
013        notice, this list of conditions and the following disclaimer in the
014        documentation and/or other materials provided with the
015        distribution.
016    
017    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
018    IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
019    TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
020    PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
021    OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
022    SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
023    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
024    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
025    THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
026    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
027    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
028    */
029    package org.hd.d.pg2k.svrCore.vars;
030    
031    import java.io.IOException;
032    import java.io.InterruptedIOException;
033    import java.util.ArrayDeque;
034    import java.util.ArrayList;
035    import java.util.BitSet;
036    import java.util.Deque;
037    import java.util.EnumSet;
038    import java.util.Hashtable;
039    import java.util.Iterator;
040    import java.util.List;
041    import java.util.Map;
042    import java.util.concurrent.Callable;
043    import java.util.concurrent.ConcurrentHashMap;
044    import java.util.concurrent.ConcurrentMap;
045    import java.util.concurrent.ExecutionException;
046    import java.util.concurrent.Future;
047    import java.util.concurrent.TimeUnit;
048    import java.util.concurrent.TimeoutException;
049    import java.util.concurrent.locks.ReentrantLock;
050    
051    import org.hd.d.pg2k.svrCore.CoreConsts;
052    import org.hd.d.pg2k.svrCore.GenUtils;
053    import org.hd.d.pg2k.svrCore.Rnd;
054    import org.hd.d.pg2k.svrCore.SimpleLoggerIF;
055    import org.hd.d.pg2k.svrCore.ThreadUtils;
056    import org.hd.d.pg2k.svrCore.Tuple;
057    import org.hd.d.pg2k.svrCore.props.LocalProps;
058    
059    import ORG.hd.d.IsDebug;
060    
061    
062    /**Class to "manage" variables for an element of a SimpleExhibitPipelineIF.
063     * Caches values to minimise read (and optionally write) activity.
064     * <p>
065     * Always maintains a local store of variables,
066     * and normally sync() needs to be called periodically
067     * to merge with upstream values.
068     * <p>
069     * Typical places to inject one of these is in a caching stage,
070     * or at pipeline or tunnel end-points.
071     * <p>
 * A non-write-through instance has the effect of grouping multiple
 * setVariable() and setVariables() operations into a single
 * setVariables() upstream action when sync() is invoked.
 * This may still write-through in some circumstances.
076     * <p>
077     * In no case does a getVariable() or getVariables() operation
078     * directly cause a request to go upstream,
079     * (thus helping to ensure that get operations are fast even if there
080     * are I/O problems (etc) upstream),
081     * but rather the set of values is
082     * updated periodically with a single getVariables() operation.
083     * <p>
084     * FIXME: getEventValues() avoids, where possible, blocking indefinitely,
085     * and may return non-authoritative data instead,
086     * though may not be able to avoid such blocking in all circumstances.
087     * <p>
088     * Thread-safe; does not hold any externally-visible locks.
089     * <p>
090     * This does not count as an "end-point", ie is never the master data store.
091     */
092    public class PipelineVarMgr extends BasicVarMgr
093                                implements BasicVarMgrInterface,
094                                           SimpleVariablePipelineIF
095        {
    /**If true, always use getVariables(-1) to get all values from upstream.
     * Inefficient but very robust; generally not the right thing to do though.
     * <p>
     * Consulted by syncVariables() when it chooses between
     * a full fetch and an incremental (changes-since-last-poll) fetch.
     */
    private static final boolean ALWAYS_FETCH_ALL_UPSTREAM_VALUES = false;
100    
    /**Create a default PipelineVarMgr instance.
     * Defaults to write-through (ie read caching only),
     * because that is "safer" behaviour.
     * <p>
     * Delegates to the three-argument constructor with writeThrough set to true.
     *
     * @param logger  logger to write to; null if no logging to be done
     * @param upstream  upstream source of data and variable values;
     *     never null
     * @throws IllegalArgumentException  if upstream is null
     */
    public PipelineVarMgr(final SimpleLoggerIF logger,
                          final SimpleVariablePipelineIF upstream)
        {
        this(logger, upstream, true); // Write-through.
        }
113    
114        /**Creates a customised VarMgr instance.
115         * May try to get the local system ID immediately from upstream,
116         * so that it is available for creating globals.
117         * @param logger  logger to write to; null if no logging to be done
118         * @param upstream  upstream source of data and variable values;
119         *     never null
120         * @param writeThrough  if true, all set operations are propagated
121         *     downstream immediately
122         */
123        public PipelineVarMgr(final SimpleLoggerIF logger,
124                              final SimpleVariablePipelineIF upstream, final boolean writeThrough)
125            {
126            super(logger, false, false);
127            assert(!super.isEndPoint()); // This is not an end point.
128            assert(!super.dontFilterDups); // Bad timestamps and replays must be being checked for.
129    
130            if(upstream == null)
131                { throw new IllegalArgumentException(); }
132    
133            // If this cache is write-through then we don't need an outbound store.
134            outboundUpdates = writeThrough ? null : new Hashtable<SimpleVariableDefinition, SimpleVariableValue>();
135            earlyUpdates = writeThrough ? null : new ArrayList<SimpleVariableValue>();
136            source = upstream;
137    
138            if(GET_SYSID_IN_CONS)
139                {
140                // Get the local system ID from upstream
141                // and save it since we shall need it when setting globals.
142                try
143                    {
144                    final SimpleVariableValue svv =
145                        upstream.getVariable(SystemVariables.LOCAL_SYS_ID);
146                    if(svv == null)
147                        { throw new IllegalStateException("unable to fetch system ID from upstream"); }
148                    super._setVariable(svv, true);
149                    assert(null != super.getVariable(SystemVariables.LOCAL_SYS_ID));
150                    }
151                catch(final IOException e)
152                    {
153                    // Ignore (but log) error; hope to get value later...
154                    logger.log("unable to get system ID in cons", e);
155                    }
156                }
157            }
158    
    /**If true, try in the constructor to get the local system ID from upstream.
     * This helps us build globalMaps,
     * and also ensures that the variable is available to clients downstream.
     * <p>
     * A failure to fetch the value because of an IOException is tolerated
     * by the constructor, which logs the problem and carries on.
     */
    private final static boolean GET_SYSID_IN_CONS = true;
164    
    /**Upstream source of variable values; never null.
     * Set once in the constructor, which rejects a null upstream.
     */
    private final SimpleVariablePipelineIF source;
167    
168        /**Returns true if this is a write-through cache. */
169        public boolean isWriteThrough()
170            { return(outboundUpdates == null); }
171    
    /**Cached outbound variable value updates to be sent upstream.
     * Will not be used (and will be null) if in write-through mode;
     * non-null otherwise.
     * <p>
     * Used by:
     * <ul>
     * <li>setVariable() (to add values)
     * <li>syncVariables() to push accumulated values upstream
     * <li>itemsQueuedForUpstream() to report the queue size
     * </ul>
     * <p>
     * Variable updates are held in this table and periodically flushed
     * upstream in a block.  If multiple updates to a single variable
     * occur between flushes, only the last one will be retained,
     * (the others overwritten) thus reducing traffic.
     * These may be re-ordered with respect to one another when flushed.
     * <p>
     * Note however that one value cannot be overwritten by another if
     * they have different globalMap keys.
     * This can only really happen on the master when simultaneous updates
     * arrive from two different slaves for the same global value.
     * In this case, the previous value is moved to earlyUpdates
     * so that it is sent upstream ahead of the newer one.
     * <p>
     * A lock must be held on this object to do multiple operations atomically,
     * but no other visible lock must be grabbed inside a lock on this.
     * The same lock also guards earlyUpdates.
     * <p>
     * Hashtable from SimpleVariableDefinition to SimpleVariableValue.
     * <p>
     * A Hashtable is used for thread-safety.
     */
    private final Hashtable<SimpleVariableDefinition,SimpleVariableValue> outboundUpdates;
203    
    /**A List of non-coalesced variable set operations to be done first.
     * These are variables that could not be coalesced for whatever reason
     * (for example events, which are never coalesced),
     * and must be passed upstream ahead of anything in outboundUpdates.
     * <p>
     * This is non-null exactly when outboundUpdates is non-null,
     * ie when this instance is not write-through.
     * <p>
     * The content of this List must only be accessed in the scope of a lock
     * on outboundUpdates, so this List implementation need not be inherently
     * thread-safe (eg ArrayList can (and should) be used instead of Vector).
     */
    private final List<SimpleVariableValue> earlyUpdates;
215    
216    
    /**Last time we successfully fetched variable values from upstream; initially null.
     * Also reset to null by syncVariables() when it wants the next poll
     * to be a full fetch rather than an incremental one.
     * <p>
     * Declared as volatile to allow safe access without a lock.
     */
    private volatile Long lastFetch;
221    
222        /**Set a single variable value.
223         * If writing though,
224         * then set the value upstream immediately,
225         * else simply put it in our "pending" list waiting for sync().
226         *
227         * @param newValue  new variable value to set; never null
228         * @throws java.io.IOException in case of difficulty setting the value upstream
229         */
230        @Override
231        public void setVariable(final SimpleVariableValue newValue)
232            throws IOException
233            {
234            // Store in our local cache before doing anything else...
235            super._setVariable(newValue, false);
236    
237            // Since we seem to have set the value OK
238            // (so it type-checked, for example),
239            // add to outgoing update list immediately,
240            // or write-though if appropriate.
241    
242            // Write-though.
243            if(outboundUpdates == null)
244                {
245                source.setVariable(newValue);
246                return;
247                }
248    
249            // Accumulate and wait for sync().
250            // Attempt to merge redundant (non-event) operations for efficiency.
251            synchronized(outboundUpdates)
252                {
253                // But for now we never coalesce events.
254                final SimpleVariableDefinition def = newValue.getDef();
255                if(def.isEvent())
256                    {
257                    earlyUpdates.add(newValue);
258                    return;
259                    }
260    
261                // Note however that one value cannot be overwritten by another
262                // iff they have different globalMap keys (or null-ness).
263                // This happens on the master when simultaneous updates
264                // arrive from two different slaves for the same global value.
265                // In this case, we force the previous value upstream now.
266                final SimpleVariableValue prevHeld =
267                        outboundUpdates.put(def, newValue);
268    
269                // Have to handle coalescing if there was a previous value
270                // for this variable.
271                if(prevHeld != null)
272                    {
273                    // Whoops, may conflict...
274                    // If global maps have different keys, or null-ness,
275                    // then they do conflict,
276                    // and we must flush the previous value
277                    // into the earlyUpdates list to go first,
278                    // else we can discard the previous version.
279                    final Map<?,?> nGM = newValue.getGlobalMap();
280                    final Map<?,?> pGM = prevHeld.getGlobalMap();
281                    final boolean differentMapKeys = (nGM == null) ?
282                            (pGM != null) :
283                            ((pGM == null) || !nGM.keySet().equals(pGM.keySet()));
284                    if(differentMapKeys)
285                        { earlyUpdates.add(prevHeld); }
286                    }
287                }
288            }
289    
290        /**Set several variable values.
291         * This is equivalent to set series of calls to setVariables()
292         * unless in write-through mode where a single setVariables()
293         * is done upstream for efficiency.
294         *
295         * @return number of values successfully set, at least locally...
296         * @param newValues  new variables values to set; never null
297         * @throws java.io.IOException in case of difficulty setting the values upstream
298         */
299        @Override
300        public int setVariables(final SimpleVariableValue newValues[])
301            throws IOException
302            {
303            if(newValues == null)
304                { throw new IllegalArgumentException(); }
305    
306            if(outboundUpdates != null)
307                {
308                // If not writing-through,
309                // then put in "outbound" list,
310                // being careful to preserve order,
311                // but eliminating duplicates.
312                final int nvLen = newValues.length;
313                for(int i = 0; i < nvLen; ++i)
314                    { setVariable(newValues[i]); }
315                return(newValues.length);
316                }
317            else
318                {
319                // If writing through,
320                // stick to single setVariables() call for efficiency.
321                super.setVariables(newValues);
322                return(source.setVariables(newValues));
323                }
324            }
325    
326    
    /**Private lock to serialise calls on sync().
     * Held by syncVariables() for the whole of each call;
     * not visible outside this class.
     */
    private final Object _sync_lock = new Object();
329    
    /**Maximum number of set/update values to send upstream in one go; strictly positive.
     * This limit is here to avoid vast on-the-wire messages straining sender and receiver,
     * especially in the case where the receiver has been down for a while
     * and so we have accumulated many messages.
     * <p>
     * More messages in each batch may be more efficient in terms of
     * message overhead, inter-value compression, etc.
     * <p>
     * Used by syncVariables() to cap the size of each setVariables() call upstream.
     */
    private static final int MAX_UPDATE_BATCH_COUNT = 256;
339    
340        /**Synchronise variables with upstream values.
341         * We push updated values upstream to the source,
342         * call sync on the source if this sync is "forced",
343         * and then retrieve changed values from upstream.
344         * <p>
345         * We remember when we last did this and request any changes since then,
346         * except on the first call or if called with force==true,
347         * in which case all values are retrieved.
348         * <p>
349         * To allow for (varying) clock skew between different server instances,
350         * we actually ask for somewhat older values than our last poll,
351         * which will result in some redundant traffic.
352         * <p>
353         * Holds no externally-visible locks,
354         * and does not hold locks while communicating upstream
355         * (so can handle set/get operations even if upstream is slow/blocking),
356         * but if called by multiple threads this will safely serialise the calls.
357         *
358         * @param force  if true, this will force a full sync on the read side
359         *     by using getVariables(-1) rather than attempting to choose a
360         *     nearer timestamp for efficiency;
361         *     the implementation is at liberty to use getVariables(-1)
362         *     at any time whatever the argument value,
363         *     and almost certainly should use it on the first call
364         *
365         * @throws java.io.IOException if one is received from upstream
366         */
367        public void syncVariables(final boolean force)
368            throws IOException
369            {
370    //if(isDebugMode) { logger.log("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+").]"); }
371    
372            // Serialise calls to sync() to avoid out-of-order updates...
373            synchronized(_sync_lock)
374                {
375                // Push locally-cached updates upstream,
376                // if there are any pending,
377                // and clear those pending values.
378                if(outboundUpdates != null)
379                    {
380                    // Get any earlyUpdates (first)
381                    // then any other outboundUpdates,
382                    // and assemble them into a single update to send upstream.
383                    // Upon failure, put them at the start of the (early) list to resend
384                    // in case new items have arrived.
385                    //
386                    // We *do not* hold the lock while communicating upstream.
387                    Deque<SimpleVariableValue> toGoUpstreamAll = null;
388                    synchronized(outboundUpdates)
389                        {
390                        // Atomically retrieve current pending updates, if any,
391                        // and clear the pending updates.
392                        // We can put them back afterwards if we have to, sort of.
393                        final int size = itemsQueuedForUpstream();
394    if(IsDebug.isDebug && (size > 0)) { logger.log("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+"): outboundUpdates.size()="+size+".]"); }
395    //if(isDebugMode && (size > 0)) { logger.log("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+"): outboundUpdates.size()="+size+".]"); }
396                        if(size > 0)
397                            {
398                            // Assemble updates in order, earlyUpdates first,
399                            // ready to send upstream.
400                            toGoUpstreamAll = new ArrayDeque<SimpleVariableValue>(size);
401                            toGoUpstreamAll.addAll(earlyUpdates);
402                            toGoUpstreamAll.addAll(outboundUpdates.values());
403    
404                            // Clear the originals...
405                            earlyUpdates.clear();
406                            outboundUpdates.clear();
407                            }
408                        }
409    
410                    if(toGoUpstreamAll != null)
411                        {
412                        // Attempt to send the outstanding values upstream...
413                        // However, if this fails due to an I/O problem,
414                        // then try to reinsert any not-yet-overwritten values
415                        // at the start of the earlyUpdates
416                        // so that they will go before any later arrivals.
417                        try
418                            {
419                            // We try to send updates in small-ish batches
420                            // to avoid blowing up the recipient if we have accumulated many.
421                            while(!toGoUpstreamAll.isEmpty())
422                                {
423                                // Extract the first updates and try to send them.
424                                final int toSend = Math.min(toGoUpstreamAll.size(), MAX_UPDATE_BATCH_COUNT);
425                                final SimpleVariableValue[] updates = new SimpleVariableValue[toSend];
426                                // Copy (not remove) the first 'toSend' updates to the array...
427                                final Iterator<SimpleVariableValue> iterator = toGoUpstreamAll.iterator();
428                                for(int i = 0; i < toSend; ++i)
429                                    { updates[i] = iterator.next(); }
430                                // Try to send upstream...
431    if(IsDebug.isDebug) { logger.log("[PipelineVarMgr "+System.identityHashCode(this)+": sending batch of updates upstream: "+toSend+"/"+toGoUpstreamAll.size()+".]"); }
432                                source.setVariables(updates);
433                                // If that was completely successful
434                                // then remove the initial items from the start of the list.
435                                // TODO: do this in a block to overcome the inefficiency of many calls.
436                                for(int i = 0; i < toSend; ++i)
437                                    { toGoUpstreamAll.removeFirst(); }
438                                }
439                            }
440                        catch(final IOException e)
441                            {
442                            // In case of error,
443                            // requeue anything that may not have been successfully received
444                            // and is not too old for the receiver to handle
445                            // to keep our queue size-bounded.
446    
447                            // Filter out anything too old.
448                            final long now = System.currentTimeMillis();
449                            final long tooOld = now - BasicVarMgr.MAX_APPARENT_MESSAGE_AGE_MS;
450                            // Ordinary non-event values can be dumped sooner...
451                            final long tooOldNonEvent = now - 101 - 2*SystemVariables.MAX_VALUE_DISTRIBUTION_LATENCY_MS;
452                            // TODO: maybe make sure that we don't somehow have anything (too far) in the future?
453    
454                            final List<SimpleVariableValue> toGoUpstreamFiltered =
455                                    new ArrayList<SimpleVariableValue>(toGoUpstreamAll.size());
456                            for(final SimpleVariableValue svv : toGoUpstreamAll)
457                                {
458                                if(svv == null) { continue; }
459                                if(svv.getTimestamp() < tooOld) { continue; }
460                                if(!svv.getDef().isEvent() && (svv.getTimestamp() < tooOldNonEvent)) { continue; }
461                                toGoUpstreamFiltered.add(svv);
462                                }
463    
464    if(IsDebug.isDebug) { logger.log("WARNING: PipelineVarMgr: values to resend up stream: " + toGoUpstreamFiltered.size()); }
465    
466                            // Protect access to earlyUpdates...
467                            synchronized(outboundUpdates)
468                                {
469                                // Insert the not-too-old items in order at the start...
470                                earlyUpdates.addAll(0, toGoUpstreamFiltered);
471                                }
472    //if(isDebugMode) { logger.log("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+") got IOException.]", e); }
473                            throw e; // Rethrow the error...
474                            }
475                        }
476                    }
477    
478                // Now force any changes all the way to the master's end-point
479                // iff this is a "forced" sync().
480                if(force) { source.syncVariables(true); }
481    
482    //if(isDebugMode) { logger.log("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+") about to get...]"); }
483    
484                // Now retrieve any values from downstream.
485                // For the moment we'll assume that our local clock,
486                // plus the cache hold time used as a skew corrector,
487                // will ensure that we don't miss anything
488                // at the cost of a few wasted/redundant transfers,
489                // though:
490                //   * on the first run, or
491                //   * if "force"d, or
492                //   * we got nothing on our last poll,
493                // then we fetch everything with a getVariables(-1).
494                final Long lastTime = lastFetch; // Fetch value just once.
495                final boolean firstRun = (lastTime == null);
496                final boolean getAll = (ALWAYS_FETCH_ALL_UPSTREAM_VALUES || force || firstRun);
497                final SimpleVariableValue[] newVals =
498                    source.getVariables(getAll ? -1 :
499                        (lastTime.longValue() - CoreConsts.MAX_PEER_CLOCK_SKEW_MS - 1));
500                // Apply the values fetched...
501                // (For safety, we discard any that appear to be invalid.)
502                for(int i = newVals.length; --i >= 0; )
503                    {
504                    final SimpleVariableValue svv = newVals[i];
505                    final SimpleVariableDefinition def = svv.getDef();
506                    if(!SystemVariables.defs.contains(def))
507                        {
508                        logger.log("PipelineVarMgr: ERROR: upstream variable value rejected: " + svv.getDef());
509                        continue;
510                        }
511    
512                    // Store the new value in our local cache,
513                    // but don't have it propagate back upstream!
514                    // Note that it is from upstream,
515                    // so read-only values *can* be set in our cache.
516                    super._setVariable(svv, true);
517                    }
518    
519    
520                // If we have any deferred requests for current/all interval events,
521                // then attempt to fetch and store/cache one at random now
522                // in the hope of being able to satisfy the next request for it.
523                // If something goes wrong and we "lose" one of these
524                // deferred requests, that is OK, since we made no promises
525                // about the timeliness of current-interval data.
526                for(final boolean current : new boolean[]{ false, true } )
527                    {
528                    final Tuple.Pair<SimpleVariableDefinition, EventPeriod> deferred =
529                        _getAndClearDeferredRequestForInterval(current);
530                    if(deferred != null)
531                        {
532                        // Current interval, which we never expect to be authoritative...
533                        final long currentInterval =
534                            deferred.second.getIntervalNumber(System.currentTimeMillis());
535    
536                        final BitSet justTheOne = new BitSet(1);
537                        justTheOne.set(0); // Just get the one current value set.
538                        final EventVariableValue upstreamValues[] =
539                            source.getEventValues(deferred.first, deferred.second,
540                                                  current ? currentInterval : 0,
541                                                  justTheOne);
542    
543                        // Save the fetched value for the next request!
544                        if(upstreamValues.length > 0)
545                            {
546                            final EventVariableValue evv = upstreamValues[0];
547                            if(evv != null)
548                                {
549    if(IsDebug.isDebug) { logger.log("[Post-fetched "+(current?"current":"all")+"-interval data for " + deferred + ".]"); }
550                                super.setEventValue(evv);
551                                }
552                            }
553                        }
554                    }
555    
556                // Consider a full fetch if there were no updated values returned.
557                lastFetch = ((newVals.length == 0) &&
558                        (0 == Rnd.fastRnd.nextInt(3 + ((GenUtils.mustConservePower()?171:101) * LocalProps.getServerSlowdownFactor())))) ?
559                    null : new Long(System.currentTimeMillis());
560                }
561    
562    //if(isDebugMode) { logger.log("[PipelineVarMgr "+System.identityHashCode(this)+": sync("+force+") FINISHED]"); }
563            }
564    
565        /**Returns an estimate of the number of updates queued to go upstream; non-negative. */
566        public int itemsQueuedForUpstream()
567            {
568            synchronized(outboundUpdates) { return(earlyUpdates.size() + outboundUpdates.size()); }
569            }
570    
571    
    /**If true then generally don't go upstream (synchronously) just to fetch values for the current (or all) interval.
     * Such requests are instead answered from the local store.
     * <p>
     * We may nonetheless synchronously request current-interval data from upstream:
     * <ul>
     * <li>when asking for something else in the same request,
     * <li>asynchronously when a particular event/period combination
     *     has been asked for recently,
     * <li>when we have nothing cached and would have to return empty-handed to the user,
     * </ul>
     * thus only making for a marginal extra cost.
     * <p>
     * If true, there is no guarantee that the "current" data reflects more than
     * locally-originated events or will be hugely up-to-date,
     * though the master's main copy will be.
     */
    private static final boolean DONT_FETCH_SPECIAL_INTERVALS_SYNCHRONOUSLY = true;
587    
    /**Set of "current" interval event-values requested that we declined to fetch synchronously.
     * We clear these as we asynchronously fetch the requested values.
     * <p>
     * A map from event variable definitions to
     * a set of any deferred current-interval requests for that event
     * (or to null if none have ever been deferred for that event type).
     * <p>
     * A Hashtable is used to guarantee basic thread safety.
     * All manipulation of the contents of the table,
     * and in particular of the non-threadsafe EnumSet values,
     * must be done holding a lock on the table instance.
     * <p>
     * NOTE(review): presumably drained via _getAndClearDeferredRequestForInterval(true)
     * from syncVariables(); confirm against that method.
     */
    private final Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>> _deferredCIRequests =
        new Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>>();
602    
    /**Set of "all" interval event-values requested that we declined to fetch synchronously.
     * We clear these as we asynchronously fetch the requested values.
     * <p>
     * A map from event variable definitions to
     * the set of periods with a deferred all-interval request pending for that event.
     * There is no mapping (so get() returns null) for an event type with nothing pending:
     * an entry is created on the first deferral for the event type
     * and removed again when its last pending period is taken.
     * <p>
     * A Hashtable is used to guarantee basic thread safety.
     * All manipulation of the contents of the table,
     * and in particular of the non-threadsafe EnumSet values,
     * must be done holding a lock on the table instance.
     */
    private final Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>> _deferredAIRequests =
        new Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>>();
617    
618        /**Class definining all details of a deferred request.
619         * This comprises:
620         * <ul>
621         * <li>Event name/definition,
622         * <li>EventPeriod,
623         * <li>The all/current flag.
624         * </ul>
625         * <p>
626         * Equality depends on all fields.
627         */
628        private final class NextRequestKey
629            {
630            /**Event definition; never null. */
631            final SimpleVariableDefinition def;
632    
633            /**Event period; never null. */
634            final EventPeriod p;
635    
636            /**True if "all", else false for "current". */
637            final boolean isAll;
638    
639            public NextRequestKey(final SimpleVariableDefinition def,
640                                  final EventPeriod p,
641                                  final boolean isAll)
642                {
643                this.def = def;
644                this.p = p;
645                this.isAll = isAll;
646    
647                assert(p != null);
648                assert((def != null) && def.isEvent());
649                assert((def.getEvPeriodSubset() == null) ||
650                       def.getEvPeriodSubset().contains(p));
651                }
652    
653            /**The hash depends on all fields. */
654            @Override
655            public int hashCode()
656                { return(def.hashCode() ^ (isAll ? p.ordinal() : p.name().hashCode())); }
657    
658            /**Equality depends on all fields. */
659            @Override
660            public boolean equals(final Object obj)
661                {
662                if(!(obj instanceof NextRequestKey)) { return(false); }
663                final NextRequestKey other = (NextRequestKey) obj;
664                if(isAll != other.isAll) { return(false); }
665                if(!p.equals(other.p)) { return(false); }
666                if(!def.equals(other.def)) { return(false); }
667                return(true);
668                }
669            }
670    
    /**Next permitted (deferred) request for a given all/current period data set.
     * We defer repeated requests for the same item until either:
     * <ul>
     * <li>The start of the next interval for that item.
     * <li>A reasonable fraction of the period for that item
     *     has elapsed since the last request for it.
     * </ul>
     * <p>
     * This means that we limit the cost of repeated requests for the same item
     * in such a way that we still have a reasonable chance of seeing
     * intra-interval updates.
     * <p>
     * Each value is a time on the System.currentTimeMillis() scale:
     * a deferred request for the key is ignored while that time is still in the future.
     * <p>
     * The table is thread-safe and parameterised for quick, concurrent access.
     */
    private final Map<NextRequestKey,Long> _nextRequestNotBefore =
        new ConcurrentHashMap<NextRequestKey, Long>(41, 0.7f, 4);
687    
688        /**Note that we have deferred a request for the given event and period.
689         *
690         * @param currentInterval  true for "current" interval,
691         *     false for "all" interval
692         * @param def  non-null event definition
693         * @param intervalSelector  non-null interval/period value
694         */
695        private final void _noteDeferredRequestForInterval(final boolean currentInterval,
696                                                           final SimpleVariableDefinition def,
697                                                           final EventPeriod intervalSelector)
698            {
699            if((def == null) || !def.isEvent() ||
700               (intervalSelector == null))
701                { throw new IllegalArgumentException(); }
702    
703            // Caller must be asking for valid period for this event type.
704            assert((def.getEvPeriodSubset() == null) ||
705                   def.getEvPeriodSubset().contains(intervalSelector));
706    
707    
708            // We may ignore rapid repeated requests for the same item,
709            // if the repeat is within a small fraction of the period.
710            final NextRequestKey key = new NextRequestKey(def, intervalSelector, !currentInterval);
711            final Long notBefore = _nextRequestNotBefore.get(key);
712            // Return if too early for another request for this value yet.
713            final long now = System.currentTimeMillis();
714            if((notBefore != null) && (notBefore.longValue() > now))
715                { return; }
716    
717            // Compute/record when it would next be OK to ask for an update.
718            // At most we put this off by a fraction of our temporal slackness,
719            // and as a minimum a substantial fraction of our sysvar latency.
720            final long nextNotBefore = now +
721                Math.min(CoreConsts.DEFAULT_TEMPORAL_SLACKNESS_S * 249,
722                    intervalSelector.getIntervalMs() >>> 4) +
723                Rnd.fastRnd.nextInt(SystemVariables.MAX_VALUE_DISTRIBUTION_LATENCY_MS/2);
724            _nextRequestNotBefore.put(key, new Long(nextNotBefore));
725    
726    
727            // Select which deferred set we are working with...
728            final Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>> diSet =
729                currentInterval ? _deferredCIRequests : _deferredAIRequests;
730    
731            // Hold a lock briefly while we do our work.
732            synchronized(diSet)
733                {
734                // Get any existing Set of values.
735                final EnumSet<EventPeriod> s = diSet.get(def);
736                // If none then create and store a new Set with just this value.
737                if(s == null)
738                    {
739                    diSet.put(def, EnumSet.of(intervalSelector));
740                    return;
741                    }
742                // If non-null then just add this bit to the Set already held.
743                s.add(intervalSelector);
744                }
745            }
746    
747        /**Select, remove and return at random one of the deferred current/all interval requests; null if none.
748         * This is an atomic fetch-and-clear operation,
749         * designed to be used in association with
750         * _noteDeferredRequestForInterval().
751         *
752         * @param currentInterval  true for "current" interval,
753         *     false for "all" interval
754         * @return null or a pair with both elements valid and non-null
755         */
756        private Tuple.Pair<SimpleVariableDefinition, EventPeriod> _getAndClearDeferredRequestForInterval(
757                                                final boolean currentInterval)
758            {
759            // Select which deferred set we are working with...
760            final Hashtable<SimpleVariableDefinition,EnumSet<EventPeriod>> diSet =
761                currentInterval ? _deferredCIRequests : _deferredAIRequests;
762    
763            // Hold a lock briefly while we do our work.
764            synchronized(diSet)
765                {
766                // Return null immediately if no deferred requests pending.
767                final int n = diSet.size();
768                if(n == 0) { return(null); }
769    
770                // Pick one of the event types at random.
771                final SimpleVariableDefinition defs[] = new SimpleVariableDefinition[n];
772                diSet.keySet().toArray(defs);
773                final SimpleVariableDefinition def =
774                    defs[(n == 1) ? 0 : Rnd.fastRnd.nextInt(n)];
775    
776                // The def should be a valid event type.
777                assert(def != null);
778                assert(def.isEvent());
779    
780                // Get the Set of deferred requests for this event type.
781                final EnumSet<EventPeriod> s = diSet.get(def);
782    
783                // The Set should be non-null and non-empty.
784                assert(s != null);
785                assert(!s.isEmpty());
786    
787                // Pick one of the pending iterval/period values at random.
788                final int m = s.size();
789                final EventPeriod eps[] = new EventPeriod[m];
790                s.toArray(eps);
791                final EventPeriod ep =
792                    eps[(m == 1) ? 0 : Rnd.fastRnd.nextInt(m)];
793    
794                // We should have chosen a valid interval.
795                assert((def.getEvPeriodSubset() == null) ||
796                       def.getEvPeriodSubset().contains(ep));
797    
798                // Compose our result.
799                final Tuple.Pair<SimpleVariableDefinition, EventPeriod> result =
800                    new Tuple.Pair<SimpleVariableDefinition, EventPeriod>(def, ep);
801    
802                // Now remove our pending value,
803                // and indeed the whole Set if now empty.
804                if(m == 1) { diSet.remove(def); }
805                else { s.remove(ep); }
806    
807                return(result);
808                }
809            }
810    
    /**Locks to prevent redundant concurrent fetches/updates of the same event type from upstream in getEventValues().
     * The aim of this is to avoid making many fruitless upstream accesses
     * over (for example) a slow or connection-limited medium such as an HTTP tunnel.
     * <p>
     * Locking at the per-def level probably admits almost all potential useful concurrency
     * and is relatively simple.
     * <p>
     * The lock for a def is created on first use in getEventValues() with putIfAbsent(),
     * so that at most one lock instance is ever current for a given def.
     * <p>
     * We allow for a limited amount of concurrency in writing
     * but we mainly use ConcurrentHashMap for thread-safety, read concurrency and putIfAbsent().
     */
    private final ConcurrentMap<SimpleVariableDefinition, ReentrantLock> _getEventValues_locks =
        new ConcurrentHashMap<SimpleVariableDefinition, ReentrantLock>(2*SystemVariables.defs.size(), 0.5f, 4);
823    
824        /**Get event values; never null.
825         * We get what we can from our local store,
826         * then go upstream to fill in any unfilled requested slots
827         * for which we did not have authoritative data
828         * (and then attempt to cache them locally for next time).
829         * However, if unable to fetch authoritative data (quickly)
830         * then we will return null or non-authoritative data as appropriate for each slot;
831         * this does not throw IOException.
832         * <p>
833         * We treat the 'current' and 'all' intervals specially
834         * since upstream will never offer us an authoritative value
835         * for the current slot, only for older completed ones.
836         * We get requested values asynchronously,
837         * to be reasonably up-to-date for the <em>next</em> request.
838         * <p>
839         * If a request for a "local" event upstream fails with an IllegalArgumentException
840         * then we assume that upstream of us is a tunnel which local values don't cross
841         * so we simply report whatever value we have to hand.
842         *
843         * @param def  non-null event-variable definition
844         * @param intervalSelector  non-null valid interval for specified event
845         * @param whichValues  each true bit represents a slot for which data is
846         *     required, bit 0 indicating data from the slot within which
847         *     firstIntervalTime is located, bit 1 the previous slot, etc;
848         *     null is treated as the common case equivalent to just bit 0 set
849         *
850         * @throws IOException  if we have difficulty getting values from upstream
851         * @throws InterruptedIOException  if interrupted or if blocking for too long.
852         */
853        @Override
854        public EventVariableValue[] getEventValues(final SimpleVariableDefinition def,
855                                                   final EventPeriod intervalSelector,
856                                                   final long intervalNumber,
857                                                   final BitSet whichValues)
858            {
859    //logger.log("PipelineVarMgr.getEventValues("+def+", "+intervalSelector+", "+intervalNumber+", "+whichValues+")");
860    
861            // Current interval number (never authoritative).
862            final long currentInterval = intervalSelector.getIntervalNumber(System.currentTimeMillis());
863    
864            // Optimisation fast-path for common/simple case:
865            // the single requested slot/value is available locally and is authoritative.
866            // If available from local store then we don't need to use our locks/throttling,
867            // else we fall through for our more general case.
868            final boolean nullWV = (whichValues == null);
869            if(nullWV ||
870               ((whichValues.cardinality() == 1) && whichValues.get(0)))
871                {
872                // Don't try to satisfy requests for the current/all slots here...
873                if((intervalNumber != 0) && (intervalNumber != currentInterval))
874                    {
875                    // Get what we can from local store.
876                    final EventVariableValue[] result =
877                        super.getEventValues(def, intervalSelector, intervalNumber, null);
878    
879                    if((result.length != 0) &&
880                        (result[0] != null) &&
881                        (result[0].isAuthoritative()))
882                        { return(result); /* Good, local cached value does the job. */ }
883                    }
884                }
885    
886            // Verify that this this def is an event and is genuine!
887            assert(def.isEvent());
888            assert(SystemVariables.defs.contains(def));
889    
890            // First get what we can from our local store/cache.
891            // This call will validate the method arguments too.
892            EventVariableValue[] result =
893                super.getEventValues(def, intervalSelector, intervalNumber, whichValues);
894    
895            // If any requested values are missing/null or non-authoritative,
896            // then try to go upstream for them (and then cache them locally).
897            BitSet wv = whichValues;
898            if(wv == null)
899                {
900                // Create synthetic BitSet for simplicity below.
901                wv = new BitSet(1);
902                wv.set(0);
903                }
904    
905            // Note any "missing"/non-authoritative values.
906            final BitSet nonAuthValues = new BitSet();
907            int resultsPresent = 0;
908            for(int i = wv.nextSetBit(0); i >= 0; i = wv.nextSetBit(i+1))
909                {
910                final boolean absent = ((i >= result.length) || (result[i] == null));
911                if(!absent) { ++resultsPresent; }
912                if(absent || !result[i].isAuthoritative())
913                    { nonAuthValues.set(i); }
914                }
915    
916            // If the entire request has been satisfied from local cache
917            // then we can return immediately!
918            if(nonAuthValues.length() == 0)
919                { return(result); }
920    
921            // Note if none of the request at all has been assembled from cache.
922            // In this case, we'll try harder to fetch values from upstream
923            // since we don't really want to return completely empty-handed.
924            final boolean noResultsFromCache = (resultsPresent == 0);
925    
926            // If the only non-authoritative answer is
927            // in the "current" or "all" slot
928            // (it never would be authoritative anyway)
929            // then return now what we have locally cached/available.
930            // Don't bother with the expense of going upstream synchronously
931            // providing we have some sort of answer from cache
932            // but just note that the value was requested and deferred.
933            // We will go and fetch a fresh value soon, in the background.
934            if(DONT_FETCH_SPECIAL_INTERVALS_SYNCHRONOUSLY &&
935                    (!noResultsFromCache) &&
936                    (nonAuthValues.cardinality() == 1))
937                {
938                final int which = nonAuthValues.length()-1; // Index of the one bit that is set...
939    
940                if(intervalNumber == 0)
941                    {
942                    _noteDeferredRequestForInterval(false, def, intervalSelector);
943                    return(result);
944                    }
945                else if(intervalNumber - which == currentInterval)
946                    {
947                    _noteDeferredRequestForInterval(true, def, intervalSelector);
948                    return(result);
949                    }
950                }
951    
952            // THIS REQUEST WILL NEED TO GO UPSTREAM TO BE COMPLETELY SATISFIED.
953    
954            // We allow at most one upstream access per def at once
955            // (through this front-end)
956            // to try to avoid overloading any tunnel or other choke-point.
957            // The lock is held around the underlying upstream call,
958            // even if that continues asynchronously out of sight from here.
959            // Get the unique lock for this event/def,
960            // with race-free create on first use.
961            ReentrantLock _getEventValues_lock;
962            while(null == (_getEventValues_lock = _getEventValues_locks.get(def)))
963                {
964                // Ensure no races for creation of a lock instance,
965                // ie at most one lock will ever be current for one def.
966                _getEventValues_locks.putIfAbsent(def, new ReentrantLock());
967                }
968            final ReentrantLock getEventValues_lock = _getEventValues_lock;
969            // Give up quickly if there is already a queue for the lock for this def,
970            // ie if we might well fail to get to make our call within a reasonable time.
971            if(getEventValues_lock.hasQueuedThreads())
972                {
973                return(result); // Return what we had collected so far, if anything.
974                }
975    
976            // Make sure that the result value is large enough to accommodate
977            // all the results requested.
978            // If not then replace it with a large enough one and copy existing results in.
979            final int wvl = nullWV ? 1 : whichValues.length();
980            if(result.length < wvl)
981                {
982                final EventVariableValue newResult[] = new EventVariableValue[wvl];
983                System.arraycopy(result, 0, newResult, 0, result.length);
984                result = newResult;
985                }
986    
987    //logger.log("PipelineVarMgr.getEventValues() upstream request: " + nonAuthValues);
988    
989            // Make an upstream request for the slots/intervals
990            // where our local store did not have authoritative answers.
991            final BitSet upstreamRequest = (BitSet) nonAuthValues.clone(); // Logically immutable...
992            // We make an effort to prevent this request blocking indefinitely
993            // (though if the thread pool fills then we end up running synchronously in this thread).
994            final Future<EventVariableValue[]> fetchedValues = ThreadUtils.nonCPUThreadPool.submit(new Callable<EventVariableValue[]>(){
995                public EventVariableValue[] call()
996                    {
997                    // Now try to grab the lock associated with this def,
998                    // but we are only prepared to block for a very short while
999                    // (short enough time not to annoy interactive users for example).
1000                    // If we can't get the lock quickly then we abort.
1001                    // Try harder if we'd otherwise have nothing at all to return to the original caller.
1002                    try
1003                        {
1004                        if(!getEventValues_lock.tryLock(1 + (noResultsFromCache ? (CoreConsts.MAX_INTERACTIVE_DELAY_MS*2) : (CoreConsts.MAX_INTERACTIVE_DELAY_MS/2)), TimeUnit.MILLISECONDS))
1005                            {
1006                            logger.log("INFO: could not quickly acquire lock in PipelineVarMgr.getEventValues() for "+def);
1007                            return(new EventVariableValue[0]); // Abort: return empty result.
1008                            }
1009                        }
1010                    catch(final InterruptedException e)
1011                        {
1012                        return(new EventVariableValue[0]); // Abort: return empty result.
1013                        }
1014    
1015                    try {
1016                        // Get the data from upstream.
1017                        final EventVariableValue[] upstreamValues = source.getEventValues(def, intervalSelector, intervalNumber, upstreamRequest);
1018    
1019                        // Attempt to cache results here to avoid simply discarding them
1020                        // if the caller has to give up waiting for us
1021                        // before we've finished the RPC or handling/cacheing the response.
1022                        if(upstreamValues.length > 0)
1023                            {
1024                            // Offer the values that we requested from upstream to our local store
1025                            // and then merge them into our initial response.
1026                            for(int i = upstreamRequest.nextSetBit(0); i >= 0; i = upstreamRequest.nextSetBit(i+1))
1027                                {
1028                                if(i >= upstreamValues.length) { break; }
1029                                final EventVariableValue evv = upstreamValues[i];
1030                                // Skip missing responses.
1031                                if(evv == null) { continue; }
1032                                // Skip dubious claims to authoritative answers for the current interval or the future (!)
1033                                // which might be the result of a race or clock skew, etc...
1034                                if((evv.getIntervalNumber() >= currentInterval) && evv.isAuthoritative()) { continue; }
1035    
1036                                // Offer upstream value to the local store, ie cache it...
1037                                /*super.*/setEventValue(evv);
1038                                }
1039                            }
1040    
1041                        return(upstreamValues); // Return values to hand back to the original caller.
1042                        }
1043                    finally { getEventValues_lock.unlock(); }
1044                    }
1045                });
1046    
1047            EventVariableValue upstreamValues[];
1048            // Spend a limited time (much longer than we waited for the lock) fetching from upstream.
1049            // We allow at least long enough for a round-the-world RTT to marshal and send the data.
1050            try { upstreamValues = fetchedValues.get(Math.max(CoreConsts.MAX_TYPICAL_RPC_RTT_MS, 2*CoreConsts.MAX_INTERACTIVE_DELAY_MS), TimeUnit.MILLISECONDS); }
1051            // If the upstream result was not ready in time then return what we already had.
1052            catch(final TimeoutException e)
1053                {
1054                // Need to log failures to allow some system tuning...
1055                logger.log("WARNING: PipelineVarMgr timed out fetch of " + def + " reason: " + e.getMessage());
1056    
1057                // Synchronously and without time limit retry a minimal subset of the original request
1058                // in case marshalling/transmission time (of the full request)
1059                // would always otherwise prevent completion.
1060                // This way we may make incremental progress.
1061                // Try for just the first (newest) item requested when we timed out.
1062                // Note that we do this without the normal cover of our per-def concurrency lock/limit.
1063                // TODO: Try a random subset in case of chronic difficulties with particular slots.
1064                if(upstreamRequest.length() > 1)
1065                    {
1066                    final int firstNonAuth = upstreamRequest.nextSetBit(0);
1067                    assert(firstNonAuth >= 0);
1068                    // Discard all but the first set bit.
1069                    final BitSet upstreamRequestMin = (BitSet) upstreamRequest.clone();
1070                    upstreamRequestMin.clear(firstNonAuth+1, upstreamRequestMin.length());
1071                    assert(upstreamRequestMin.cardinality() == 1); // Should be exactly one bit set now.
1072                    // Don't put a time limit on this to try to force/allow some progress.
1073                    upstreamValues = source.getEventValues(def, intervalSelector, intervalNumber, upstreamRequestMin);
1074                    // Make sure that this result, if any (and sane) is cached below...
1075                    // Fall through to use whatever we managed to fetch this time.
1076                    }
1077                // No subset is possible so return result collected so far.
1078                else { return(result); }
1079                }
1080            // If we were interrupted then return what we had.
1081            catch(final InterruptedException e)
1082                {
1083                Thread.currentThread().interrupt(); // Don't lose "interruptedness".
1084                return(result);
1085                }
1086            // Log unexpected error and return what we already had.
1087            catch(final ExecutionException e)
1088                {
1089                logger.log("unexpected error", e); // Shouldn't happen, so log it.
1090                return(result);
1091                }
1092    
1093            // Merge the upstream results into our initial response.
1094            for(int i = upstreamRequest.nextSetBit(0); i >= 0; i = upstreamRequest.nextSetBit(i+1))
1095                {
1096                if(i >= upstreamValues.length) { break; }
1097                final EventVariableValue evv = upstreamValues[i];
1098                // Skip missing responses.
1099                if(evv == null) { continue; }
1100                // Skip dubious claims to authoritative answers for the current interval or the future (!)
1101                // which might be the result of a race or clock skew, etc...
1102                if((evv.getIntervalNumber() >= currentInterval) && evv.isAuthoritative()) { continue; }
1103    
1104                // Put upstream value into our response.
1105                result[i] = evv;
1106    
1107                // Offer upstream value to the local store, ie cache it, in case not already done...
1108                super.setEventValue(evv);
1109                }
1110    
1111            // OK, respond as well as we can...
1112            return(result);
1113            }
1114        }