001    package org.hd.d.pg2k.svrCore.vars;
002    
003    import java.io.File;
004    import java.io.FileNotFoundException;
005    import java.io.IOException;
006    import java.text.SimpleDateFormat;
007    import java.util.ArrayList;
008    import java.util.BitSet;
009    import java.util.Calendar;
010    import java.util.Collections;
011    import java.util.Comparator;
012    import java.util.Date;
013    import java.util.HashSet;
014    import java.util.Hashtable;
015    import java.util.InvalidPropertiesFormatException;
016    import java.util.Iterator;
017    import java.util.List;
018    import java.util.Map;
019    import java.util.Queue;
020    import java.util.Set;
021    import java.util.concurrent.Callable;
022    import java.util.concurrent.ConcurrentHashMap;
023    import java.util.concurrent.ConcurrentLinkedQueue;
024    import java.util.concurrent.ConcurrentMap;
025    import java.util.concurrent.ExecutionException;
026    import java.util.concurrent.ExecutorService;
027    import java.util.concurrent.Future;
028    import java.util.concurrent.atomic.AtomicInteger;
029    import java.util.concurrent.locks.ReentrantLock;
030    
031    import org.hd.d.pg2k.svrCore.CoreConsts;
032    import org.hd.d.pg2k.svrCore.DuplicateIDChecker;
033    import org.hd.d.pg2k.svrCore.FileTools;
034    import org.hd.d.pg2k.svrCore.GenUtils;
035    import org.hd.d.pg2k.svrCore.MemoryTools;
036    import org.hd.d.pg2k.svrCore.Rnd;
037    import org.hd.d.pg2k.svrCore.ThreadUtils;
038    import org.hd.d.pg2k.svrCore.Tuple;
039    import org.hd.d.pg2k.svrCore.Tuple.Triple;
040    
041    import ORG.hd.d.IsDebug;
042    
043    // TODO: allow set operations to queue without blocking the caller while a history is being loaded?
044    
045    // TODO: add 'noBlock' arg to _setVariable() which won't block but instead return immediately (false) having initiated async load if need be; normally true; caller may for example throw IOException
046    
047    /**Base class to "manage" the set of system variables.
048     * This holds the actual values and does:
049     * <ul>
050     * <li>Type checking.
051     * <li>Merging local values into globalmap values.
052     * <li>Trimming entries in global maps for servers now dead/unreachable.
053     * </ul>
054     * <p>
055     * This has a mechanism for discarding duplicate messages as they arrive.
056     * <p>
 * Attempts to rein in retained state and the 'acceptability' window
 * in case of extreme system memory pressure,
 * ie this tries never to be the cause of OOMs.
060     * <p>
061     * Thread-safe; does not hold any externally-visible locks.
062     * <p>
063     * This implements the generic variable-manager interface.
064     */
065    public class BasicVarMgr implements BasicVarMgrInterface
066        {
    /**If true, this class (and possibly its derivatives) are in debug mode.
     * This would normally result in extra []-bracketed comments in System.err.
     * <p>
     * Currently hard-wired to false; the commented-out term shows how it could
     * instead be made to follow ORG.hd.d.IsDebug.isDebug.
     */
    protected static final boolean isDebugMode = false /* && ORG.hd.d.IsDebug.isDebug */;
071    
072        /**Create a normal set of managed system variables, initially empty.
073         * This instance is not marked as being at an end point,
074         * and it does filter out duplicate messages and invalid timestamps.
075         */
076        public BasicVarMgr() { this(false, false); }
077    
078        /**Create a set of managed system variables.
079         * If an instance is created as an end point,
080         * that means that it is the master variable copy in the master server,
081         * and so a unique, local, read-only instance ID is created
082         * and inserted into the variable cache.
083         * <p>
084         * This instance does filter out duplicate messages and invalid timestamps.
085         *
086         * @param  endPoint  if true then mark the system as an end-point
087         */
088        public BasicVarMgr(final boolean endPoint) { this(endPoint, false); }
089    
090        /**Create a set of managed system variables.
091         * If an instance is created as an end point,
092         * that means that it is the master variable copy in the master server,
093         * and so a unique, local, read-only instance ID is created
094         * and inserted into the variable cache.
095         *
096         * @param  endPoint  if true then mark the system as an end-point
097         * @param  dontFilterDups  if true then do NOT filter out duplicate
098         *     or out-of-time messages
099         */
100        public BasicVarMgr(final boolean endPoint,
101                           final boolean dontFilterDups)
102            {
103    //if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": constructor.]"); }
104    
105            this.endPoint = endPoint;
106            this.dontFilterDups = dontFilterDups;
107            if(endPoint)
108                {
109                final InstanceID iid = InstanceID.createInstanceID();
110                final SimpleVariableValue svv = new SimpleVariableValue(
111                    SystemVariables.LOCAL_SYS_ID, iid);
112    
113                // The instance ID variable must be local and read-only.
114                assert(SystemVariables.LOCAL_SYS_ID.isReadOnly());
115                assert(SystemVariables.LOCAL_SYS_ID.isLocal());
116    
117                variables.put(SystemVariables.LOCAL_SYS_ID, svv);
118                assert(null != getVariable(SystemVariables.LOCAL_SYS_ID));
119    
120    //if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": isEndPoint, ID=" + iid + ".]"); }
121                }
122            }
123    
124        /**If true then this variable set is at an end-point. */
125        private final boolean endPoint;
126    
127        /**Returns true if this is at an endpoint. */
128        public boolean isEndPoint() { return(endPoint); }
129    
130        /**Unless true we screen out duplicate events/updates and those with silly timestamps.
131         * We do this so that values can be delivered by a flooding algorithm if need be,
132         * but also so that mistakes and repeats will not cause duplication.
133         * <p>
134         * This should be enabled by default since it will quietly eliminate many problems;
135         * it may have to be off to run replays of old data for example.
136         */
137        protected final boolean dontFilterDups;
138    
    /**Time (ms) after which items pending to go upstream are dropped after an error.
     * This keeps a finite bound on the size of any outbound queue
     * in case of some long-lasting error (eg the master server being down).
     * <p>
     * It is 11 minutes plus twice SystemVariables.MAX_VALUE_DISTRIBUTION_LATENCY_MS,
     * ie somewhat longer than the longest time that the data
     * could reasonably be useful at first glance,
     * to allow for clock skew, restarts of the master, network outages, and other insults.
     * <p>
     * Everything can probably be discarded safely after this time;
     * some values can probably be discarded much sooner.
     */
    public final static long MAX_UPSTREAM_RETENTION_MS = 11*60*1000 +
        2*SystemVariables.MAX_VALUE_DISTRIBUTION_LATENCY_MS;

    // Earlier formulation of MAX_UPSTREAM_RETENTION_MS, retained for reference only:
//    public final static long MAX_UPSTREAM_RETENTION_MS = 1001 +
//        ((3 * Math.max(SystemVariables.MAX_VALUE_DISTRIBUTION_LATENCY_MS,
//                       SystemVariables.EVENT_INTERVAL_VLONG_TERM_MS)) / 2);

    /**Maximum apparent age (ms) of a message wrt the local clock for it to pass filtering.
     * Currently equal to MAX_UPSTREAM_RETENTION_MS.
     */
    public static final long MAX_APPARENT_MESSAGE_AGE_MS = MAX_UPSTREAM_RETENTION_MS;
159    
160        /**Returns true if the supplied timestamp from an incoming message/event/update is OK.
161         * To be considered OK it should be within the allowed peer-skew window,
162         * plus an extra allowance for historical values to be sent to us en masse
163         * after a network outage for example.
164         * <p>
165         * This can be used by other classes, eg upstream of a temporary network disconnect,
166         * to decide if it is worth retaining an old message
167         * or if it would simply get discarded anyway at the other end by a var manager instance.
168         * <p>
169         * Note that events may and non-event values/types may get expired at different rates.
170         */
171        private boolean timestampAcceptable(final SimpleVariableValue value)
172            {
173            if(value == null) { throw new IllegalArgumentException(); }
174            final long timestamp = value.getTimestamp();
175            final long now = System.currentTimeMillis();
176            // Reject if timestamp is too far in the future (beyond skew tolerance).
177            if(timestamp > now + CoreConsts.MAX_PEER_CLOCK_SKEW_MS)
178                { return(false); }
179            // Accept immediately if timestamp is within +/- skew tolerance.
180            if(timestamp >= now - CoreConsts.MAX_PEER_CLOCK_SKEW_MS)
181                { return(true); }
182            // Reject if timestamp is much more than VLONG period old plus skew tolerance
183            // which caps the resources that we ever need to put aside for this.
184            if(timestamp < now - MAX_APPARENT_MESSAGE_AGE_MS)
185                { return(false); }
186            // Reject if at least as old as the oldest de-dup message that we've retained.
187            // Since we might not have retained all messages at the same age,
188            // we reject anything equal to or older than the oldest retained.
189            // If there is no retained history at the moment then reject the message.
190            // Note that unless a lock is held on the dup collection
191            // between this and checking for a dup in it
192            // there is a race is we trim old entries from the collection.
193            final Long oldestUpdateRemembered = _updatesSeen.ageOfOldestEntry();
194            if(null == oldestUpdateRemembered)
195                { return(false); } // No history at all at the moment.
196            if(timestamp <= oldestUpdateRemembered)
197                { return(false); } // Too old to match against our history.
198            // Seems OK.
199            return(true);
200            }
201    
    /**Minimum message retention time for duplicate checking (ms); strictly positive.
     * Set to twice CoreConsts.MAX_PEER_CLOCK_SKEW_MS, so that it covers at least the maximum
     * allowed clock skew plus an allowance for transit time.
     * We might hope to widen the window when resources allow.
     * <p>
     * We must not accept any messages at all that fall outside this minimum window.
     */
    private static final int MIN_DUP_CHECK_RETENTION_MS = 2*CoreConsts.MAX_PEER_CLOCK_SKEW_MS;
209    
210        /**Immutable message ID suitable for use as a key in detecting duplicates.
211         * Has functional hashCode() and equals().
212         * <p>
213         * Does not contain the message or its content;
214         * only message meta-data to minimise memory footprint.
215         */
216        private static final class MessageIDKey
217            {
218            /**Message timestamp. */
219            final long timestamp;
220    
221            /**Variable definition/name; never null */
222            final SimpleVariableDefinition def;
223    
224            /**Message value hash. */
225            final int valueHash;
226    
227            /**Construct an instance. */
228            public MessageIDKey(final long timestamp,
229                                final SimpleVariableDefinition def,
230                                final int valueHash)
231                {
232                if(null == def) { throw new IllegalArgumentException(); }
233                this.timestamp = timestamp;
234                this.def = def;
235                this.valueHash = valueHash;
236                }
237    
238            /**Equality is based on all fields. */
239            @Override public boolean equals(final Object o)
240                {
241                if(this == o) { return(true); }
242                if(!(o instanceof MessageIDKey)) { return(false); }
243                final MessageIDKey other = (MessageIDKey) o;
244                return((valueHash == other.valueHash) &&
245                       (timestamp == other.timestamp) &&
246                       def.equals(other.def));
247                }
248    
249            /**Hash is computed from the value hash and timestamp low-order bits. */
250            @Override public int hashCode()
251                { return(valueHash ^ ((int) timestamp)); }
252            }
253    
    /**History of recently-seen message IDs against which incoming messages are checked for duplicates.
     * Each entry is a MessageIDKey (timestamp, definition/name, value hash),
     * so no message values themselves are retained.
     * <p>
     * Created with a minimum retention of MIN_DUP_CHECK_RETENTION_MS and a second bound of
     * MAX_APPARENT_MESSAGE_AGE_MS (never below the minimum, clamped to fit an int),
     * and a diagnostic name that includes the concrete (sub)class name.
     * NOTE(review): the precise meaning of the create() arguments, thread-safety and
     * timestamp ordering are defined by DuplicateIDChecker; confirm there.
     * <p>
     * As an absolute minimum we must retain messages at least MAX_PEER_CLOCK_SKEW_MS
     * but preferably up to MAX_APPARENT_MESSAGE_AGE_MS.
     * <p>
     * A lock can be held on this instance to make multiple operations atomic,
     * as _setVariable() does around its timestamp and duplicate checks.
     */
    private final DuplicateIDChecker<MessageIDKey> _updatesSeen =
        DuplicateIDChecker.<MessageIDKey>create(
                MIN_DUP_CHECK_RETENTION_MS,
                (int) Math.min(Math.max(MIN_DUP_CHECK_RETENTION_MS, MAX_APPARENT_MESSAGE_AGE_MS), Integer.MAX_VALUE),
                "BasicVarMgr["+this.getClass().getSimpleName()+"]._updatesSeen");
268    
269        /**Check/accept message (with true return value) if not a duplicate.
270         * A message is treated as duplicate if it has the same def, timestamp and value hash,
271         * so we reject multiple occurrences of the same value at exactly the same time.
272         * <p>
273         * Note that we store the value hash to avoid keeping large message values
274         * in memory indefinitely.
275         * <p>
276         * In passing this removes any expired entries outside the valid time window.
277         *
278         * @return  true if message is not a duplicate, false otherwise
279         */
280        private boolean isNonDup(final SimpleVariableValue svv)
281            {
282            assert(svv != null);
283            final Object value = svv.getValue();
284            final MessageIDKey key = new MessageIDKey(
285                    svv.getTimestamp(),
286                    svv.getDef(),
287                    (value == null) ? -1 : value.hashCode());
288            // Atomically add to the set of values and check if new...
289            final boolean isNew = (null == _updatesSeen.add(key));
290    
291            // Return true if not a duplicate.
292            return(isNew);
293            }
294    
    /**Cache of current variable values; never null.
     * Maps each SimpleVariableDefinition to its current SimpleVariableValue.
     * <p>
     * Used by:
     * <ul>
     * <li>getVariable() and getVariables(),
     * <li>setVariable(), which can update it too,
     * <li>the end-point constructor, which stores the local instance ID in it.
     * </ul>
     * <p>
     * A Hashtable is used for thread-safety of individual operations,
     * and because a lock on it can exclude other operations.
     * A lock must be held on this object to do multiple operations atomically,
     * but no other visible lock may be acquired while holding a lock on this.
     * <p>
     * Mappings are never remove()d from this table,
     * so its size is limited to the maximum number of variables
     * defined in SystemVariables.
     */
    private final Hashtable<SimpleVariableDefinition,SimpleVariableValue> variables =
            new Hashtable<SimpleVariableDefinition, SimpleVariableValue>();
320    
    /**Map of event history; never null.
     * Maps each event-type SimpleVariableDefinition to the (recent) history of that event,
     * held as an EventVariableValueSet.
     * An entry is created on first use by getEventHistory(), either loaded from a
     * previously-discovered persisted history or empty,
     * and non-upstream event values passed to _setVariable() are added to it.
     * <p>
     * Thread-safe (a ConcurrentMap); entries are added with putIfAbsent().
     * No lock must be held on this object,
     * eg in an attempt to do multiple operations atomically.
     * <p>
     * Mappings are never remove()d from this table,
     * so its size is limited to the maximum number of variables
     * defined in SystemVariables.
     */
    private final ConcurrentMap<SimpleVariableDefinition,EventVariableValueSet> eventHistory =
            new ConcurrentHashMap<SimpleVariableDefinition, EventVariableValueSet>();
338    
339    
340        /**Set variable.
341         * Set local cached value immediately;
342         * store global values to periodically propagate upstream to master
343         * but show last global values obtained from master on periodic poll.
344         * <p>
345         * We validate the the name/type of the variable being presented.
346         * <p>
347         * For local and globalMap variables,
348         * the last value set is the one that can be retrieved.
349         *
350         * @throws java.lang.IllegalArgumentException  on attempt to:
351         *     set variable with value of wrong type or incompatible definition,
352         *     set non-existent or read-only variable (or these can be ignored)
353         */
354        public void setVariable(final SimpleVariableValue newValue)
355            throws IOException
356            {
357    //        if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": setVariable("+newValue+").]"); }
358    
359            _setVariable(newValue, false);
360    
361    //        if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": setVariable("+newValue+") FINISHED.]"); }
362            }
363    
    /**Sets a variable from either direction; the core of setVariable().
     * Validates the update, optionally filters it, merges it into the cache
     * and, for events, records it in the event history.
     * <p>
     * Since this allows read-only values to be updated (when fromUpstream is true),
     * this is only visible to derived classes.
     * <p>
     * Validation performed:
     * <ul>
     * <li>the definition must be one of SystemVariables.defs;
     * <li>updates from upstream are refused at an end point;
     * <li>from downstream, read-only variables are refused,
     *     as are values whose globalMap has more than one entry;
     * <li>from upstream, a non-local value whose timestamp is within
     *     2*SystemVariables.MAX_QUIET_SYSTEM_VAR_LIFE_MS of now
     *     is silently discarded if its globalMap is null or empty
     *     (an empty globalMap is only accepted on an old value, as an "expired" marker).
     * </ul>
     * <p>
     * Unless dontFilterDups is set, non-local values with unacceptable timestamps
     * are silently discarded. The duplicate check is also made, but duplicates are
     * currently only logged (in debug mode), not discarded: see NOTE(review) in the body.
     * <p>
     * For local and globalMap variables,
     * the last value set is the one that can be retrieved.
     * Any trimming of the list of potentially-live systems for globalMap variables
     * presumably happens in _mergeValues() (not verified here).
     * <p>
     * Event values not from upstream are added to the event history for their definition,
     * which may first require that history to be loaded from persistent store.
     *
     * @param newValue  the value to set; never null
     * @param fromUpstream  if true, this is an update from the source (upstream),
     *     so read-only variables may be set in our cache;
     *     non-local, non-expired values must then have a non-empty globalMap
     *
     * @throws IllegalArgumentException  if newValue is null, or on attempt to:
     *     set variable with value of wrong type or incompatible definition,
     *     set non-existent or read-only variable
     * @throws IllegalStateException  if fromUpstream is true at an end point
     */
    protected void _setVariable(final SimpleVariableValue newValue,
                               final boolean fromUpstream)
        {
        if(newValue == null) { throw new IllegalArgumentException(); }

//        if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": _setVariable("+newValue.getDef() +':'+ newValue+", "+fromUpstream+").]"); }

        final SimpleVariableDefinition def = newValue.getDef();
        // Reject if not a variable known on this system.
        if(!SystemVariables.defs.contains(def))
            { throw new IllegalArgumentException("bad variable name or type: " + def); }

        // Disallow updates from "upstream"
        // when at an upstream end-point.
        if(endPoint && fromUpstream)
            { throw new IllegalStateException("at upstream endpoint; cannot accept updates from upstream"); }

        if(!fromUpstream)
            {
            // If this is a read-only variable
            // then reject the attempted update if from a set operation.
            if(def.isReadOnly())
                { throw new IllegalArgumentException("variable is read-only and may not be set (from downstream): " + def); }

            // If it has a globalMap,
            // then reject updates with more than one entry.
            final Map<InstanceID,SimpleVariableValue> globalMap = newValue.getGlobalMap();
            if((globalMap != null) && (globalMap.size() > 1))
                { throw new IllegalArgumentException("variable has globalMap with more than one entry and may not be set (from downstream): " + def); }
            }

        // Allow an incoming global with an empty globalMap
        // as indicative of an "expired" value,
        // but only if its timestamp is a long time in the past
        // and the variable really would have expired by now...
        if(fromUpstream &&
            !def.isLocal() &&
            (newValue.getTimestamp() + 2*SystemVariables.MAX_QUIET_SYSTEM_VAR_LIFE_MS > System.currentTimeMillis()))
            {
            // From upstream must have at least one entry if a global,
            // unless very old.
            final Map<InstanceID,SimpleVariableValue> globalMap = newValue.getGlobalMap();
            if((globalMap == null) || (globalMap.size() < 1))
                {
if(IsDebug.isDebug) { System.err.println("WARNING: non-local, non-expired, upstream variable value has empty globalMap: " + newValue.getFullDescription()); }
                return; /* Quietly discard for now. */
                }
//                { throw new IllegalArgumentException("non-local, non-expired, upstream variable value has empty globalMap: " + newValue.getFullDescription()); }
            }

        // If filtering dups, etc, then do so here.
        // None of this applies to local values where we don't expect replays, clock skew, etc.
        if(!dontFilterDups && !def.isLocal())
            {
            // Check the acceptability of the timestamp and then for a duplicate
            // within a lock on _updatesSeen to prevent races
            // (eg history being trimmed between the two checks).
            synchronized(_updatesSeen)
                {
                // Discard anything (non-local) with a dodgy timestamp, ie outside the valid time window.
                if(!timestampAcceptable(newValue))
                    {
if(IsDebug.isDebug) { System.err.println("WARNING: discarded invalid-timestamp variable set for "+newValue.getDef()+" "+newValue.getTimestamp()+" "+(new Date(newValue.getTimestamp()))); }
                    return;
                    }
                // Check for a duplicate value (within the acceptable time window).
                // NOTE(review): the 'return' below is commented out, so a duplicate is NOT
                // actually discarded: it is only reported (in debug mode) and then processed
                // like any other value, despite the "discarded" wording of the message.
                // isNonDup() has still recorded the ID. Confirm whether this is intentional.
                if(!isNonDup(newValue))
                    {
if(IsDebug.isDebug) { System.err.println("WARNING: discarded duplicate variable set for "+newValue.getDef()+" identity="+System.identityHashCode(newValue)+": "+newValue); }
//if(IsDebug.isDebug) { (new Error("STOP")).printStackTrace(); }
//                return;
                    }
                }
            }

//        // Report when values propagate up or down...
//        if(isDebugMode)
//            {
//            if(!newValue.equals(variables.get(def)))
//                {
//                System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": UPDATING VALUE "+(fromUpstream?"(from upstream)":"")+": _setVariable("+newValue.getFullDescription()+", "+fromUpstream+").]");
//                }
//            }

        // Merge the new value into the cache.
        _mergeValues(newValue, fromUpstream);

        // If this is an event
        // then record it in the event history as appropriate.
        if(def.isEvent())
            {
            // Get the extant history, which may involve loading it from persistent store.
            // It is fetched even for upstream values (which are not recorded below),
            // so presumably this also serves to trigger the load; TODO confirm intent.
            final EventVariableValueSet extant = getEventHistory(def);

            // Record this event if it is not from upstream.
            if(!fromUpstream)
                { extant.addEvent(newValue); }
            }
        }
489    
490        /**Get the current event history for the given definition; never null but may return an empty value.
491         */
492        private EventVariableValueSet getEventHistory(final SimpleVariableDefinition def)
493            {
494            assert(def.isEvent());
495    
496            EventVariableValueSet result;
497            // If there isn't one then create an empty one atomically
498            // or complete its load from persistent store if pending.
499            while(null == (result = eventHistory.get(def)))
500                {
501                assert(def.isEvent());
502    
503                // We only need to load files already discovered during loadEventHistories();
504                // if not already known about then there is no history available for this event.
505                Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock> j = null;
506                if(null != (j = remainingToBeLoaded.get(def)))
507                    {
508                    assert(def.isPersistent());
509    
510                    // Attempt to grab our job from the work queue and lock it.
511                    Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock> job = null;
512                    for(final Iterator<Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock>> it = loadOrder.iterator(); it.hasNext(); )
513                        {
514                        final Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock> next = it.next();
515                        if(def.equals(next.first))
516                            {
517                            job = next; // Found the job.
518                            it.remove(); // Ensure not in the work queue.
519                            assert(!loadOrder.contains(job));
520                            break;
521                            }
522                        }
523    
524                    // Just before loading the history or waiting for another thread to finish doing so
525                    // then, on the grounds that other synchronous loads may otherwise be needed soon,
526                    // we may attempt to speculatively asynchronously force loading of any remaining ones.
527                    // We use a discardable thread pool to avoid ever blocking here.
528                    // We don't do this if (temporarily) conserving resources.
529                    //
530                    // THIS MUST NOT throw an exception nor return early here, else the history will never get loaded...
531                    try
532                        {
533                        if(!loadOrder.isEmpty() && !GenUtils.mustConservePower())
534                            { ThreadUtils.lowPriorityThreadPoolDiscardable.submit(_createLoaderCallable()); }
535                        }
536                    catch(final Exception e) { e.printStackTrace(); } // Log problem by continue.
537    
538                    // Could not find job on work queue; may be in progress or complete, ie this thread lost a race.
539                    // Else try to lock the jobs exclusively to this thread, or abort, ie this thread lost a race.
540                    if((null == job) || !job.third.tryLock())
541                        {
542    if(IsDebug.isDebug && j.third.isLocked()) { System.out.println("Queueing to wait for history load for "+j.first); (new Throwable()).printStackTrace(System.out); }
543                        j.third.lock(); j.third.unlock(); // Wait until load of this history by other thread is complete.
544                        continue;
545                        }
546                    // Load the properties and remove from 'remaining' list, success or fail.
547                    try { _loadEVVS(job); continue; }
548                    catch(final Exception e)
549                        {
550                        // In case if error, whinge...
551                        e.printStackTrace();
552                        // ...and fall through to store empty state.
553                        break;
554                        }
555                    }
556    
557                // Store empty value (atomically), as no persisted/saved history to be loaded.
558                eventHistory.putIfAbsent(def, new EventVariableValueSet(def));
559                }
560            return(result);
561            }
562    
    /**Number of _loadEVVS() calls currently in progress across all instances (the field is static);
     * ref never null, value never negative.
     * Incremented at the start of each load and decremented when it finishes;
     * in this section it is only used for diagnostic output.
     */
    private static final AtomicInteger _loadEVVSConcurrency = new AtomicInteger();
565    
566        /**Loads into the internal store the specified EVVS from its serialised form in the file.
567         * Only to be used when the caller has exclusive access to load this job,
568         * eg the current thread must have the lock on this job.
569         * <p>
570         * In case of error nothing is posted to the internal store
571         * but the lock is released and the entry removed from remainingToBeLoaded.
572         */
573        private void _loadEVVS(final Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock> job)
574            throws IOException, InvalidPropertiesFormatException
575            {
576            assert(job.third.isHeldByCurrentThread());
577    
578            final int concurrencyS = _loadEVVSConcurrency.incrementAndGet();
579    //if(IsDebug.isDebug && (concurrencyS > 1)) { System.out.println("[BasicVarMgr._loadEVVS() starting; concurrency: "+concurrencyS); }
580    
581            final long thisStart = System.currentTimeMillis();
582            final SimpleVariableDefinition def = job.first;
583            final File f = job.second;
584    
585            try
586                {
587                final Object o = FileTools.deserialiseFromFile(f, SAVE_EVENT_HISTORIES_GZIPPED);
588                if(!(o instanceof EventVariableValueSet))
589                    { throw new InvalidPropertiesFormatException("unexpected event history data deserialised from " + f); }
590    
591                final EventVariableValueSet evvs = (EventVariableValueSet) o;
592                if(!def.equals(evvs.getDef()))
593                    { throw new InvalidPropertiesFormatException("wrong event history data deserialised from " + f); }
594    
595                // Post the results into the history map atomically.
596                eventHistory.putIfAbsent(job.first, evvs);
597    
598    if(ORG.hd.d.IsDebug.isDebug || (System.currentTimeMillis() - thisStart > 2000) || (remainingToBeLoaded.size() == 0)) { System.out.println("[BasicVarMgr._loadEVVS(): loaded history ("+job.second.length()+" bytes, concurrency~"+Math.min(concurrencyS,_loadEVVSConcurrency.get())+", queue="+job.third.getQueueLength()+") for "+def+" in "+(System.currentTimeMillis() - thisStart)+"ms, remaining="+remainingToBeLoaded.size()+".]"); }
599    //if(IsDebug.isDebug && (job.third.getQueueLength() > 0)) { System.out.println("[BasicVarMgr._loadEVVS(): count of threads waiting for loading of "+job.first+" is "+job.third.getQueueLength()); }
600                }
601            finally
602                {
603                // Now remove from the remainingToBeLoaded whether this failed or succeeded.
604                // Do so atomically.
605                final boolean removed = remainingToBeLoaded.remove(job.first, job);
606                assert(removed) : "job has already gone or been overwritten";
607                job.third.unlock(); // Release the lock to let loose any threads waiting for this def to load...
608    
609                // Drop the concurrency count.
610                _loadEVVSConcurrency.decrementAndGet();
611                }
612            }
613    
614        /**Merge a new value into a variable in the cache.
615         * This very carefully merges any extant cached value
616         * with a set coming upstream or a new global value coming downstream,
617         * trying to cope with the fact that a new value outbound may meet
618         * an updated global value coming back from upstream
619         * (from the master for example).
620         * <p>
621         * We always immediately apply a local
622         * or override the value and globalMap entry of a global,
623         * though for a global it will probably require the value merged
624         * at the master's end-point coming back to sort things out,
625         * with some instability/flapping in the interim.
626         * <p>
627         * The update must already have been validated,
628         * eg must not have more than one globalMap entry if a set.
629         * <p>
630         * This deals with events as well as plain variables.
631         * <p>
632         * Takes a lock on variables to make this atomic.
633         *
634         * @param newValue  a variable update/set coming upstream/downstream
635         * @param fromUpstream  if true, this is an update coming downstream
636         */
637        private void _mergeValues(final SimpleVariableValue newValue,
638                                  final boolean fromUpstream)
639            {
640            if(newValue == null)
641                { throw new IllegalArgumentException(); }
642    
643            final SimpleVariableDefinition def = newValue.getDef();
644    
645            // If local, apply the value unconditionally...
646            if(def.isLocal())
647                {
648                variables.put(def, newValue); // Atomic update...
649    //if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": _mergeValues("+newValue.getDef() +':'+ newValue+", "+fromUpstream+"): LOCAL UPDATE/SET; prev: "+extant+"]"); }
650                return;
651                }
652    
653    
654            // WE ARE UPDATING A GLOBAL...
655    
656            // Get the local system ID, ready for a merge operation.
657            final SimpleVariableValue iidVar = variables.get(SystemVariables.LOCAL_SYS_ID);
658            if(iidVar == null)
659                { throw new IllegalStateException("Cannot set global variable without our system ID; should this instance be an endpoint?"); }
660            final InstanceID sysID = (InstanceID) iidVar.getValue();
661    
662            // Do the merge atomically...
663            synchronized(variables)
664                {
665                // Get any extant value from the cache.
666                final SimpleVariableValue extant = variables.get(def);
667    
668                // If the new value is a set (from downstream),
669                // and has no global map entry,
670                // then we assume it is locally generated and inject one for us,
671                // preserving the existing timestamp.
672                final SimpleVariableValue adjNewValue =
673                    (!fromUpstream && (newValue.getGlobalMap() == null)) ?
674                    newValue.put(sysID, newValue, true) : newValue;
675    
676                // By now, we should not see any value with an empty globalMap,
677                // except possibly an "expired" value from upstream...
678                assert(fromUpstream || (adjNewValue.getGlobalMap() != null));
679                if(!fromUpstream && (adjNewValue.getGlobalMap() == null))
680                    { throw new IllegalStateException("ERROR: should NOT be set()ting global with empty globalMap"); }
681    
682                // If the extant value is null,
683                // then save the new value as-is and return immediately.
684                if(extant == null)
685                    {
686                    variables.put(def, adjNewValue);
687    //if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": g1: _mergeValues("+adjNewValue.getFullDescription()+", "+fromUpstream+"): GLOBAL FIRST UPDATE/SET.]"); }
688                    return;
689                    }
690    
691                // Merge in new values from the update,
692                // essentially taking something like the union
693                // of the old and new values.
694                SimpleVariableValue toSave = extant;
695                final Map<InstanceID,SimpleVariableValue> newGM = adjNewValue.getGlobalMap();
696                if(newGM != null)
697                    {
698                    for(final InstanceID key : newGM.keySet())
699                        {
700                        final SimpleVariableValue svv = newGM.get(key);
701                        toSave = toSave.put(key, svv, true);
702                        }
703                    variables.put(def, toSave);
704    //if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": gm: _mergeValues("+toSave.getFullDescription()+", "+fromUpstream+"): GLOBAL UPDATE/SET; prev: "+extant.getFullDescription()+".]"); }
705                    }
706                }
707            }
708    
709        /**Update a number of variables at once for efficiency; returns the number set.
710         * Is passed a Set of SimpleVariableValues and behaves as if it
711         * operates on all of them by calling setVariable() for each item
712         * in the Set in order (from low to high index).
713         * <p>
714         * This implementation "fails fast" on the first error.
715         *
716         * @return the number of variable values set;
717         *     never negative, never more than the number passed in
718         *
719         * @throws java.lang.IllegalArgumentException  on attempt to:
720         *     set variable with value of wrong type or incompatible definition,
721         *     set non-existent or read-only variable (or these can be ignored)
722         */
723        public int setVariables(final SimpleVariableValue newValues[])
724            throws IOException
725            {
726            if(newValues == null) { throw new IllegalArgumentException(); }
727    
728    //        if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": setVariables("+Arrays.asList(newValues)+").]"); }
729    
730            final int nvLen = newValues.length;
731            for(int i = 0; i < nvLen; ++i)
732                { _setVariable(newValues[i], false); }
733    
734    //        if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": setVariables("+Arrays.asList(newValues)+") FINISHED.]"); }
735    
736            // Claims to have set all the values passed to it.
737            return(nvLen);
738            }
739    
740        /**Get a single variable value; returns null if no such value or wrong type.
741         * Always get from local cache.
742         * <p>
743         * This trims from the globalMap any apparently-dead systems,
744         * and stores any such trimmed variable back in the cache atomically
745         * so as to incrementally eliminate stale data
746         * and return only reasonably fresh data to the caller.
747         *
748         * @param var definition of variable to fetch; never null
749         *
750         * @throws IllegalArgumentException if var is null
751         */
752        public SimpleVariableValue getVariable(final SimpleVariableDefinition var)
753            {
754    //        if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": getVariable("+var+").]"); }
755    
756            if(var == null)
757                { throw new IllegalArgumentException(); }
758    
759            // Do fetch/winnowing operation atomically.
760            synchronized(variables)
761                {
762                // Get the variable if set/extant,
763                // else return null.
764                final SimpleVariableValue svv =
765                        variables.get(var);
766    
767                // If null or no globalMap entries,
768                // then return as-is immediately;
769                // no winnowing to be done...
770                if((svv == null) || (svv.getGlobalMap() == null))
771                    { return(svv); }
772    
773                // Attempt to trim out dead very old values from the map.
774                final SimpleVariableValue svvWinnowed =
775                    svv.removeAllKeysOlder(System.currentTimeMillis() -
776                        2*SystemVariables.MAX_QUIET_SYSTEM_VAR_LIFE_MS);
777    
778                // If this actually trimmed anything,
779                // quietly replace the cached entry with the trimmed version.
780                // This way we incrementally remove stale data.
781                final Map<InstanceID,SimpleVariableValue> gMapWinnowed = svvWinnowed.getGlobalMap();
782                if((gMapWinnowed == null) ||
783                   (gMapWinnowed.size() < svv.getGlobalMap().size()))
784                    {
785    //if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": getVariable() trimmed globalMap from "+svv.getGlobalMap().size()+" to "+((gMapWinnowed==null)?0:gMapWinnowed.size())+" entries.]"); }
786                    // Store trimmed variable,
787                    // discarding stale state.
788                    variables.put(var, svvWinnowed);
789                    return(svvWinnowed);
790                    }
791    
792                // Original was OK,
793                // so return that to avoid duplicates in the heap.
794                return(svv);
795                }
796            }
797    
798        /**Get set of variable values altered on or after a given time, or all for -1; never null.
799         * Always get from local cache
800         * (the variable cache being periodically updated from the master).
801         * <p>
802         * This may be slow if there are many live variables.
803         * <p>
804         * This implementation never throws an IOException.
805         * <p>
806         * Note: we may limit the notion of "all" to be something a little greater
807         * than the maximum live-variable time window (ie variable lifetime),
808         * since beyond this the variable value has expired
809         * and is not deemed trustworthy anyway.
810         */
811        public SimpleVariableValue[] getVariables(final long changedSince)
812            {
813    //        if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": getVariables("+changedSince+").]"); }
814    
815            // Get definitions of all extant variables atomically.
816            // Note that we never remove variables once set.
817            final Set<SimpleVariableDefinition> variableDefs;
818            synchronized(variables)
819                { variableDefs = new HashSet<SimpleVariableDefinition>(variables.keySet()); }
820    
821            // Approximately size the result...
822            final List<SimpleVariableValue> result = new ArrayList<SimpleVariableValue>(variableDefs.size());
823    
824            // Compute a backstop based on the maximum variable life
825            // and the maximum permitted clock skew between participating systems.
826            // We use quite a conservative value to avoid over-filtering here.
827            final long backStop = System.currentTimeMillis() - 2*SystemVariables.MAX_QUIET_SYSTEM_VAR_LIFE_MS
828                                                             - 2*CoreConsts.MAX_PEER_CLOCK_SKEW_MS;
829            final long cutoffTime = Math.max(backStop, changedSince);
830    
831            // Go through the set,
832            // removing anything definitely older than the specified time,
833            // and winnowing out stale data.
834            for(final Iterator<SimpleVariableDefinition> it = variableDefs.iterator(); it.hasNext(); )
835                {
836                final SimpleVariableDefinition def = it.next();
837    
838                // Get value, trimmed of stale data if necessary.
839                final SimpleVariableValue svv = getVariable(def);
840    
841                // If timestamp is new enough,
842                // add the value to the result.
843                if((svv.getTimestamp() >= cutoffTime)) // TODO: test this clause!
844                    { result.add(svv); }
845                }
846    
847            final SimpleVariableValue r[] = new SimpleVariableValue[result.size()];
848            result.toArray(r);
849            return(r);
850            }
851    
852    
853        /**Get the current partial, or previous full, event set at the specified interval; never returns null.
854         * This is a simplified interface to return either the current event set
855         * that is being collected, or the previous completed set.
856         * <p>
857         * The current set is the most timely, but may not contain enough data
858         * to be meaningful if the new interval has just started.
859         * <p>
860         * The previous set is complete and thus most likely to have enough samples
861         * to be useful, but is not completely current.
862         * <p>
863         * If the requested event set is not available,
864         * an empty synthetic one is created and returned.
865         * Thus, with this interface, it is not possible to distinguish between
866         * there being no events in the given interval or simply no data.
867         * <p>
868         * This calls getEventValues().
869         * <p>
870         * This implementation will not throw an IOException.
871         *
872         * @param def  event definition (must be for an event); never null
873         * @param intervalSelector  never null
874         * @param current  if true the current event set is returned,
875         *     else the previous complete set is returned
876         *
877         * @return  requested event set; never null
878         *
879         * @throws IllegalArgumentException  if the request arguments are invalid
880         */
881        public EventVariableValue getEventValue(final SimpleVariableDefinition def,
882                                                final EventPeriod intervalSelector,
883                                                final boolean current)
884            {
885            if((def == null) || !def.isEvent() || (intervalSelector == null))
886                { throw new IllegalArgumentException(); }
887    
888            final long now = System.currentTimeMillis();
889            final long currentIntervalNumber = intervalSelector.getIntervalNumber(now);
890            final long intervalNumber = (current ? currentIntervalNumber : (currentIntervalNumber-1));
891    
892            final EventVariableValue evv[] = getEventValues(def,
893                                                            intervalSelector,
894                                                            intervalNumber,
895                                                            null);
896    
897            // If we got some real (non-null) data back, then return it...
898            if((evv.length > 0) && (evv[0] != null))
899                { return(evv[0]); }
900    
901            // Else invent empty non-authoritative return value (with no events)...
902            return(getEmptyNonAuthEVV(def, intervalSelector, intervalNumber));
903            }
904    
905        /**Cache of recently-requested 'empty' non-auth EVVs.
906         * We make this large enough to accommodate the current and previous/full values for each def
907         * for at least a couple of period values,
908         * but not so large as to be likely to be a problem.
909         * <p>
910         * We are prepared to drop this entirely in acute memory shortage.
911         */
912        private static final MemoryTools.SimpleProbabilisticCache<Tuple.Triple<SimpleVariableDefinition, EventPeriod, Long>, EventVariableValue> _getEmptyNonAuthEVV_cache =
913            MemoryTools.SimpleProbabilisticCache.<Tuple.Triple<SimpleVariableDefinition,EventPeriod,Long>, EventVariableValue>create(8*SystemVariables.defs.size(), "_getEmptyNonAuthEVV_cache");
914    
915        /**Get a non-authoritative empty EventVariableValue with the specified definition/period/interval; never null.
916         * This may cache results to help control the number of instances in circulation,
917         * though this empty values are assumed to otherwise have a fairly short life.
918         *
919         * @param def  valid event definition; never null
920         * @param intervalSelector  valid period for the definition; never null
921         * @param intervalNumber  valid interval number for the period; strictly positive
922         * @return empty non-auth value; never null
923         */
924        public static EventVariableValue getEmptyNonAuthEVV(final SimpleVariableDefinition def,
925                                                            final EventPeriod intervalSelector,
926                                                            final long intervalNumber)
927            {
928            // Try from cache to avoid the cost of constructing a whole new instance.
929            final Tuple.Triple<SimpleVariableDefinition, EventPeriod, Long> cacheKey =
930                new Tuple.Triple<SimpleVariableDefinition, EventPeriod, Long>(def, intervalSelector, intervalNumber);
931            final EventVariableValue cachedValue = _getEmptyNonAuthEVV_cache.get(cacheKey);
932            if(cachedValue != null) { return(cachedValue); }
933    
934            // Compute and cache the result for next time.
935            final EventVariableValue result = new EventVariableValue(false,
936                                          def,
937                                          intervalSelector,
938                                          intervalNumber,
939                                          0, null, null);
940            _getEmptyNonAuthEVV_cache.put(cacheKey, result);
941    
942            return(result);
943            }
944    
945        /**Get the specified event sets for the specified intervals; never null.
946         *
947         * @param intervalNumber  the number of the first interval
948         *     for which data is potentially required;
949         *     if too far in the past or future then possibly no data
950         *     will be available,
951         *     zero is used to access the "all" bucket
952         * @param whichValues  each true bit represents a slot for which data is
953         *     required, bit 0 indicating data from the slot within which
954         *     firstIntervalTime is located, bit 1 the previous slot, etc;
955         *     null is treated as the common case equivalent to just bit 0 set
956         *
957         * @return as many of the requested values as available,
958         *     return may contain nulls or be zero-length but is never null
959         */
960        public EventVariableValue[] getEventValues(final SimpleVariableDefinition def,
961                                                   final EventPeriod intervalSelector,
962                                                   final long intervalNumber,
963                                                   final BitSet whichValues)
964            {
965            if((def == null) || !def.isEvent() ||
966                    (intervalSelector == null) ||
967                    (intervalNumber < 0))
968                { throw new IllegalArgumentException(); }
969            final Set<EventPeriod> evPeriodSubset = def.getEvPeriodSubset();
970            if((evPeriodSubset != null) && !evPeriodSubset.contains(intervalSelector))
971                { throw new IllegalArgumentException("bad interval requested"); }
972    
973            // Get the event history, loading it if necessary...
974            final EventVariableValueSet evvs = getEventHistory(def);
975    
976            // If no entry at all for this event, then return an empty result set.
977            if(evvs == null) { return(EventVariableValuePeriodRow._NO_EVENT_VALUES); }
978    
979            assert(def.equals(evvs.getDef()));
980    
981            // Delegate handling of request to event set.
982            return(evvs.getEventValues(intervalSelector, intervalNumber, whichValues));
983            }
984    
985    
986        /**Set/override the specified event sets for the specified intervals; never null.
987         * This is used to set values that have been fetched from upstream
988         * (ie closer to the master data store).
989         * <p>
990         * This may ignore the call,
991         * and certainly will if the intervalNumber is outside the current time window it holds.
992         * <p>
993         * This will generally accept the value and store it if:
994         * <ul>
995         * <li>The intervalNumber is within the current window.
996         * <li>The intervalSelector is a period for which this event has data.
997         * <li>The supplied value is "better" than any current one held, ie:
998         *     <ul>
999         *     <li>We hold a null and evv is non-null.
1000         *     <li>We hold a non-authoritative value and evv is authoritative.
1001         *     <li>The authoritative-ness is the same but we hold an entry
1002         *         with a lower total count than evv.
1003         *     <ul>
1004         *     </ul>
1005         * </ul>
1006         * However, we will <em>not</em> accept non-authoritative updates
1007         * if at the end-point (ie this is the master store),
1008         * since we are the authoritative source of data.
1009         *
1010         * @param evv  non-null event value
1011         */
1012        public void setEventValue(final EventVariableValue evv)
1013    //        throws IOException
1014            {
1015            if(evv == null)
1016                { throw new IllegalArgumentException(); }
1017            final SimpleVariableDefinition def = evv.getDef();
1018            if(!def.isEvent())
1019                { throw new IllegalArgumentException(); }
1020    
1021            // If we are at an endpoint
1022            // then disregard attempts to set non-auth overrides.
1023            if(endPoint && !evv.isAuthoritative())
1024                { return; }
1025    
1026            // Find our cached event history for this event,
1027            // atomically creating it if need be.
1028            final EventVariableValueSet extant = getEventHistory(def);
1029    
1030            extant.setEventValue(evv);
1031            }
1032    
1033    
1034    
1035    
1036        /**Persist variables and events to disc.
1037         * This can either save everything applicable,
1038         * or just those things things that seem to be out-of-date on disc.
1039         * <p>
1040         * Only persistent event data is saved,
1041         * and generally each named event type has its data saved in a separate file,
1042         * in a form that should be reasonably robust
1043         * (eg a failure to load one item should not have all items unloadable).
1044         * <p>
1045         * This data may be stored in binary serialised or XML forms,
1046         * but is usually GZIPped to save space and speed up read/write.
1047         * <p>
1048         * If some saves fail then this will try to continue with others,
1049         * but throw one of the errors encountered (usually the last one)
1050         * at the end, so as to save as much as possible.
1051         * <p>
1052         * The incremental mode helps keeps the duration of any one call
1053         * as small as is reasonably possible, but will be somewhat less efficient
1054         * then non-incremental mode.
1055         * <p>
1056         * This locks out other activity while saving to ensure consistency;
1057         * it need not hold a lock continuously for whole period of the call
1058         * as it would usually be enough to do so while collecting candidate
1059         * histories to save, and while actually saving each one of them separately.
1060         * <p>
1061         * To avoid deadlocks, this takes a lock on this instance
1062         * before taking any other lock.
1063         *
1064         * @param dir  the directory to save to; never null
1065         * @param justChanges  just write histories changed since the last save if possible,
1066         *     else unconditionally save all persistent event histories
1067         * @param keepHistory  if true, try to roll the logs at least daily
1068         *     to help keep a trail of older snapshot though the storage space
1069         *     required may grow without bound,
1070         * @param incremental  if true, then at most one event is saved,
1071         *     chosen in such a way (eg by max time since last save or randomly)
1072         *     that eventually all eligible histories will be saved
1073         */
1074        public void saveEventHistories(final File dir,
1075                                       final boolean justChanges,
1076                                       final boolean keepHistory,
1077                                       final boolean incremental)
1078            throws IOException
1079            {
1080            if(dir == null)
1081                { throw new IllegalArgumentException(); }
1082    
1083            if(!dir.isDirectory())
1084                { throw new FileNotFoundException("not a directory: " + dir); }
1085    
1086            if(!dir.canWrite())
1087                { throw new IOException("cannot write to " + dir); }
1088    
1089            // Create/format the date string once, if we are going to use it.
1090            final String dateString = (!keepHistory) ? "" : ("." + dateFmtEHFile.format(new Date()));
1091    
1092            // Get potential events to be saved (assumed to be in a thread-safe manner).
1093            // Note that histories due to be asynchronously loaded but not yet done
1094            // will not be candidates here.
1095            final List<SimpleVariableDefinition> events =
1096                new ArrayList<SimpleVariableDefinition>(eventHistory.keySet());
1097    
1098            // Randomise the ordering for incremental mode to avoid starvation.
1099            // We trust the fast random generator to be OK...
1100            if(incremental)
1101                { Collections.shuffle(events, Rnd.fastRnd); }
1102    
1103            // Iterate over event data,
1104            // saving each persistent event's history to a separate file.
1105            // Include today's date in the name to provide backups/history where requested.
1106            IOException err = null; // Any error encountered.
1107            for(final SimpleVariableDefinition def : events)
1108                {
1109                assert(def.isEvent()); // Should not be any non-event variable data in here!
1110    
1111                // Only save persistent events' data.
1112                if(!def.isPersistent()) { continue; }
1113    
1114                // Get the latest values for this definition.
1115                final EventVariableValueSet history = eventHistory.get(def);
1116                if(history == null) { continue; }
1117    
1118                final StringBuilder sb = new StringBuilder(80);
1119                sb.append(EVENT_STORE_FILENAME_PREFIX);
1120                sb.append(def.getName());
1121                sb.append(EVENT_STORE_NAMETERM);
1122                sb.append(dateString); // Optional date...
1123                sb.append(EVENT_STORE_FILENAME_SUFFIX_SER_GZ);
1124    
1125                // Persist the data if necessary.
1126                final File file = new File(dir, sb.toString());
1127                try
1128                    {
1129                    // If only saving changes,
1130                    // we see if any events arrived since the file was last saved.
1131                    if(justChanges)
1132                        {
1133                        // If the history file appears to be present and correct
1134                        // and newer than the last added event
1135                        // then skip the save...
1136                        // We don't care much about minor race possibilities here.
1137                        if(file.exists() && file.isFile() && file.canRead() && (file.length() > 0) &&
1138                                (history.getLastEventTime() < file.lastModified()))
1139                            { continue; }
1140                        }
1141    
1142                    // Save the data as a compressed serialised object...
1143                    // Lock out mutators on the history object during the save.
1144                    synchronized(history)
1145                        { FileTools.serialiseToFile(history, file, SAVE_EVENT_HISTORIES_GZIPPED, !ORG.hd.d.IsDebug.isDebug); }
1146    
1147                    // If incremental, bucket out after the first successful save.
1148                    if(incremental)
1149                        { break; }
1150                    }
1151                catch(final IOException e)
1152                    {
1153                    // Print and temporarily absorb any errors encountered during save.
1154                    // Continue with other saves as far as possible.
1155                    e.printStackTrace();
1156                    err = e;
1157                    }
1158                }
1159    
1160            // Re-throw the last error encountered, if any.
1161            if(err != null) { throw err; }
1162            }
1163    
1164        /**Our private static instance of a GMT calendar.
1165         * No one must change the ZONE or DST offset.
1166         */
1167        private static final Calendar GMTCalendar = Calendar.getInstance();
1168        /**Initialise GMTCalendar. */
1169        static
1170            {
1171            // Set to GMT
1172            // i.e 0 Offset
1173            GMTCalendar.set(Calendar.ZONE_OFFSET, 0);
1174            // and 0 Daylight savings
1175            GMTCalendar.set(Calendar.DST_OFFSET, 0);
1176            }
1177    
1178        /**Format we use to insert into event history file name. */
1179        private static final SimpleDateFormat dateFmtEHFile =
1180            new SimpleDateFormat("yyyyMMdd");
1181        /**Set the Calendar for dateFmtEHFile to be the GMT calendar. */
1182        static { dateFmtEHFile.setCalendar(GMTCalendar); }
1183    
    /**If true, GZIP the saved histories to save space and read/write time.
     * Used both when saving histories (saveEventHistories())
     * and when loading them (_loadEVVS()),
     * so presumably it must match the format of any history files already on disc.
     */
    private static final boolean SAVE_EVENT_HISTORIES_GZIPPED = true;

    /**Prefix to event history store file name; see EVENT_STORE_NAMETERM for the full file-name format. */
    public static final String EVENT_STORE_FILENAME_PREFIX = "eventStore.";

    /**Terminator to go after event name in file store.
     * Format of file is:
     * <pre>
     *     EVENT_STORE_FILENAME_PREFIX + name + EVENT_STORE_NAMETERM [ + date ] + legal suffix
     * </pre>
     * where the optional date, written by saveEventHistories() when asked
     * to keep history, is "." followed by a yyyyMMdd date.
     * <p>
     * The nameterm includes at least one character that cannot appear in
     * an event name to avoid ambiguity where one name is a prefix of another
     * (presumably the '-'; confirm against the event-name validation rules).
     */
    public static final String EVENT_STORE_NAMETERM = "-T";

    /**Suffix for GZIPped serialised event history store file name; saveEventHistories() uses it for every file it writes. */
    public static final String EVENT_STORE_FILENAME_SUFFIX_SER_GZ = ".ser.gz";
1203    
1204    
1205        /**Definitions and the files from which their event histories are to be loaded; never null but may be empty.
1206         * The items are queued in the preferred order they should be loaded,
1207         * but items from anywhere in the list queue can be loaded on demand.
1208         * <p>
1209         * Entries should be taken from this with poll() and immediately locked with the Lock
1210         * which will have them processed in preferred order,
1211         * or in random order while attempting to load a history on demand
1212         * by using the iterator and remove() and immediately attempting to lock
1213         * with a failure treated as if the item had not been in the queue at all
1214         * (because another Thread has stolen it).
1215         * <p>
1216         * The lock must be released after the item is removed from allToBeLoaded.
1217         * <p>
1218         * ConcurrentLinkedQueue for thread-safe lock-free access.
1219         */
1220        private final Queue<Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock>> loadOrder = new ConcurrentLinkedQueue<Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock>>();
1221    
1222        /**This is the thread-safe collection of all items waiting to be loaded; never null but may be empty.
1223         * An item is only removed from this collection once it has completed loading
1224         * successfully or not.
1225         */
1226        private final ConcurrentMap<SimpleVariableDefinition, Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock>> remainingToBeLoaded = new ConcurrentHashMap<SimpleVariableDefinition, Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock>>();
1227    
1228        /**Load event histories from disc.
1229         * This will load the "latest" saved state, if any,
1230         * for each persistent event.
1231         * "Latest" may be measured by file timestamp (potentially slow and fragile)
1232         * or lexically-latest (eg by the date embedded in the filename)
1233         * or by some combination of the two.
1234         * <p>
1235         * Only the event history will be loaded.
1236         * <p>
1237         * If some loads fail then this will try to continue with others,
1238         * but re-throw one of the errors encountered (usually the last one)
1239         * at the end, so as to load as much as possible.
1240         * <p>
1241         * Though running this call concurrently with other (mutator) operations
1242         * (or indeed other instances of this call)
1243         * should be safe, such operation is not advised.
1244         * <p>
1245         * Should be called at most once, before any set (and get) operations.
1246         * <p>
1247         * We try to load largest-first to maximise concurrency.
1248         *
1249         * @param dir  the directory to load from; never null
1250         * @param async  if true then the caller will not be blocked waiting to load histories
1251         *     (else if false the caller will be blocked until they are all loaded)
1252         *     and if possible the event histories will be loaded in the background else as-needed,
1253         *     blocking access to individual histories not yet loaded while they are loaded;
1254         *     this only works for event histories with no state loaded at all
1255         */
1256        public void loadEventHistories(final File dir, final boolean async)
1257            throws IOException
1258            {
1259            if(dir == null)
1260                { throw new IllegalArgumentException(); }
1261    
1262            if(!dir.isDirectory())
1263                { throw new FileNotFoundException("not a directory: " + dir); }
1264    
1265            if(!dir.canRead())
1266                { throw new IOException("cannot read from " + dir); }
1267    
1268            if(!remainingToBeLoaded.isEmpty())
1269                { throw new IllegalStateException("already loading"); }
1270    
1271            final long startTime = System.currentTimeMillis();
1272    
1273            // Get listing of files in the directory...
1274            final File files[] = dir.listFiles();
1275    
1276            // Construct a short-list of those that look like usable, ie are:
1277            //   * starts with correct prefix
1278            //   * contains the name terminator
1279            //   * ends with a legal suffix (may be more than one in future)
1280            //
1281            // We should  probably also only accept candidiates that are:
1282            //   * plain files
1283            //   * readable
1284            //   * non-zero-length
1285            // but retrieval of such stats from disc at start-up may be slow.
1286            // This shortlist is of short filenames, with no directory components.
1287            final List<String> candiates = new ArrayList<String>(files.length);
1288            for(final File f : files)
1289                {
1290                final String name = f.getName();
1291                if(!name.startsWith(EVENT_STORE_FILENAME_PREFIX)) { continue; }
1292                if(-1 == name.indexOf(EVENT_STORE_NAMETERM)) { continue; }
1293                if(!name.endsWith(EVENT_STORE_FILENAME_SUFFIX_SER_GZ)) { continue; }
1294    
1295    //            final File fullPath = new File(dir, name);
1296    //            if(!fullPath.isFile()) { continue; }
1297    //            if(!fullPath.canRead()) { continue; }
1298    //            if(fullPath.length() <= 0) { continue; }
1299    
1300                // Good candidate, so add it.
1301                candiates.add(name);
1302                }
1303    
1304    if(ORG.hd.d.IsDebug.isDebug) { System.out.println("[BasicVarMgr.loadEventHistories(): found event history "+candiates.size()+" candidates in "+dir+".]"); }
1305    
1306            if(candiates.size() == 0)
1307                {
1308                System.err.println("WARNING: BasicVarMgr.loadEventHistories() found no event-history files in " + dir);
1309                return;
1310                }
1311    
1312            // Compute list of persistent events.
1313            final List<SimpleVariableDefinition> defs =
1314                new ArrayList<SimpleVariableDefinition>(SystemVariables.nameToDef.size());
1315            for(final SimpleVariableDefinition def : SystemVariables.nameToDef.values())
1316                {
1317                if(!def.isEvent() || !def.isPersistent()) { continue; }
1318                defs.add(def);
1319                }
1320            if(defs.isEmpty()) { return; /* Nothing to do... */ }
1321    
1322    if(ORG.hd.d.IsDebug.isDebug) { System.out.println("[BasicVarMgr.loadEventHistories(): candidates found for defs "+defs+".]"); }
1323    
1324    
1325            // Create an ordered List of <def,File> pairs
1326            // with the largest files, assumed slowest to load, first.
1327            final List<Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock>> sorted = new ArrayList<Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock>>(defs.size());
1328            for(final SimpleVariableDefinition def : defs)
1329                {
1330                // For each persistent event in the SystemVariables list,
1331                // look for the newest/best candidate file.
1332                // (Double-check after deserialising
1333                // that we have loaded data for the right def!)
1334    
1335                // Best candidate so far...
1336                File bestCandidate = null;
1337    
1338                // Minimum non-ambiguous prefix for an event-store file
1339                // for this event name.
1340                final String minUniquePrefix = EVENT_STORE_FILENAME_PREFIX +
1341                        def.getName() + EVENT_STORE_NAMETERM;
1342                for(final String name : candiates)
1343                    {
1344                    if(!name.startsWith(minUniquePrefix)) { continue; }
1345    
1346                    final File candiate = new File(dir, name);
1347    
1348                    // If there is no existing candidate,
1349                    // or this one is newer,
1350                    // then this becomes our best candidate.
1351                    if((bestCandidate == null) ||
1352                       (candiate.lastModified() > bestCandidate.lastModified()))
1353                        {
1354                        // Skip/veto obviously bogus/broken files.
1355                        if(!candiate.isFile()) { continue; }
1356                        if(!candiate.canRead()) { continue; }
1357                        if(candiate.length() <= 0) { continue; }
1358    
1359                        // Looks to be the best so far.
1360                        bestCandidate = candiate;
1361                        }
1362                    }
1363    
1364                // If a suitable candidate has been found then record it.
1365                if(bestCandidate != null)
1366                    { sorted.add(new Tuple.Triple<SimpleVariableDefinition, File, ReentrantLock>(def, bestCandidate, new ReentrantLock())); }
1367                }
1368            Collections.sort(sorted, new Comparator<Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock>>(){
1369                /**Order by file size, largest first. */
1370                public int compare(final Triple<SimpleVariableDefinition, File, ReentrantLock> o1,
1371                        final Triple<SimpleVariableDefinition, File, ReentrantLock> o2)
1372                    {
1373                    final long s1 = o1.second.length();
1374                    final long s2 = o2.second.length();
1375                    if(s1 > s2) { return(-1); }
1376                    if(s1 < s2) { return(+1); }
1377                    return(0); // Improbably the same size!
1378                    }
1379                });
1380            // Add all the items to the queue to be processed...
1381            loadOrder.addAll(sorted);
1382            // Record them all as 'remaining' items to be completed.
1383            for(final Triple<SimpleVariableDefinition, File, ReentrantLock> e : sorted)
1384                { remainingToBeLoaded.put(e.first, e); }
1385    
1386    //        if(true) { return; } // Force totally async loads...
1387    
1388            // Do not try to load the data asynchronously if (temporarily) short of power.
1389            if(async && GenUtils.mustConservePower()) { return; }
1390    
1391            // Run as many loader tasks as possible at once without blocking.
1392            // We regard these as mainly CPU intensive in practice.
1393            // All of the tasks are identical and simply loop pooling the load queue for work,
1394            // do the loads serially, and quit when the load queue is empty..
1395            // If running asynchronously then don't wait for them
1396            // and put them on a discarding pool to ensure that we never block for them.
1397            // (So if loading asynchronously we may need to load on demand
1398            // whatever histories have not been pre-loaded in time.)
1399            final ExecutorService pool = async ? ThreadUtils.lowPriorityThreadPoolDiscardable :
1400                ThreadUtils.computeIntensiveThreadPool;
1401            // Always start at least one task (possibly discardable) and no more than the number of CPUs
1402            // but otherwise to fill up the chosen thread pool to be done as fast as possible.
1403            // Try not to completely fill the target pool if possible!
1404            final int nTasks = Math.max(1, Math.min(ThreadUtils.AVAILABLE_PROCESSORS,
1405                (async ? ThreadUtils.lowPriorityThreadPoolDiscardableSpace() : ThreadUtils.computeIntensiveThreadPoolSpace()) - 1));
1406    if(ORG.hd.d.IsDebug.isDebug || (nTasks > 1)) { System.out.println("[BasicVarMgr.loadEventHistories(): async="+async+", files/defs="+sorted.size()+", nTasks="+nTasks+".]"); }
1407    
1408            // Create and attempt to launch the tasks.
1409            final List<Future<Object>> futures = new ArrayList<Future<Object>>(nTasks);
1410            for(int i = nTasks; --i >= 0; )
1411                { futures.add(pool.submit(_createLoaderCallable())); }
1412    
1413            try
1414                {
1415                // If running synchronously then wait for all the tasks to finish.
1416                if(!async)
1417                    {
1418                    for(final Future<Object> future : futures) { future.get(); }
1419                    if(!loadOrder.isEmpty() || !remainingToBeLoaded.isEmpty())
1420                        { throw new IOException("not all load tasks completed"); }
1421                    }
1422                }
1423            // Field some task-related exceptions...
1424            catch(final InterruptedException e)
1425                {
1426                // Log the exception.
1427                e.printStackTrace();
1428                Thread.currentThread().interrupt(); // Keep our interruptedness.
1429                throw new RuntimeException(e);
1430                }
1431            catch(final ExecutionException e)
1432                {
1433                // Log the exception.
1434                e.printStackTrace();
1435                // Rethrow any IOException cause directly.
1436                if(e.getCause() instanceof IOException)
1437                    { throw (IOException) e.getCause(); }
1438                // Wrap other exception types to rethrow unchecked.
1439                throw new RuntimeException(e);
1440                }
1441    
1442    if(ORG.hd.d.IsDebug.isDebug || (System.currentTimeMillis() - startTime > 2000)) { System.out.println("[BasicVarMgr.loadEventHistories(): "+(System.currentTimeMillis()-startTime)+"ms.]"); }
1443            }
1444    
1445        /**Creates a Callable that processes all outstanding queued loads; never null.
1446         * The call() returns null when done.
1447         * @param err
1448         * @return
1449         */
1450        private Callable<Object> _createLoaderCallable()
1451            {
1452            return new Callable<Object>(){
1453                public Object call() throws Exception
1454                    {
1455    if(ORG.hd.d.IsDebug.isDebug || (_loadEVVSConcurrency.get() > 1)) { System.out.println("[BasicVarMgr.loadEventHistories(): starting background loader task, load concurrency currently "+_loadEVVSConcurrency.get()+"...]"); }
1456    
1457                    Exception err = null; // Last error encountered, if any.
1458    
1459                    // Continue until we run out of jobs...
1460                    Triple<SimpleVariableDefinition, File, ReentrantLock> job;
1461                    while(null != (job = loadOrder.poll()))
1462                        {
1463                        if(!job.third.tryLock()) // Will be unlocked by _loadEVVS() when done.
1464                            {
1465                            // Someone else grabbed and locked this just as we were taking it from the head!
1466    if(ORG.hd.d.IsDebug.isDebug) { System.out.println("[BasicVarMgr.loadEventHistories(): work snatched from our grasp for "+job.first+".]"); }
1467                            continue;
1468                            }
1469                        try
1470                            {
1471                            // Log if it has taken a long time to get started.
1472    //if(ORG.hd.d.IsDebug.isDebug || (System.currentTimeMillis() - startTime > 2000)) { System.out.println("[BasicVarMgr.loadEventHistories(): loading history ("+job.second.length()+" bytes) for "+job.first+" starting after "+(System.currentTimeMillis() - startTime)+"ms.]"); }
1473    
1474                            _loadEVVS(job);
1475                            }
1476                        catch(final Exception e)
1477                            {
1478                            err = e; // Note the error, but absorb it and continue with next load.
1479                            e.printStackTrace();
1480                            }
1481                        }
1482    
1483                    // Re-throw last error encountered.
1484                    if(err != null) { throw err; }
1485    
1486                    return(null); // No real return value...
1487                    }
1488                };
1489            }
1490        }