001 package org.hd.d.pg2k.svrCore.vars;
002
003 import java.io.File;
004 import java.io.FileNotFoundException;
005 import java.io.IOException;
006 import java.text.SimpleDateFormat;
007 import java.util.ArrayList;
008 import java.util.BitSet;
009 import java.util.Calendar;
010 import java.util.Collections;
011 import java.util.Comparator;
012 import java.util.Date;
013 import java.util.HashSet;
014 import java.util.Hashtable;
015 import java.util.InvalidPropertiesFormatException;
016 import java.util.Iterator;
017 import java.util.List;
018 import java.util.Map;
019 import java.util.Queue;
020 import java.util.Set;
021 import java.util.concurrent.Callable;
022 import java.util.concurrent.ConcurrentHashMap;
023 import java.util.concurrent.ConcurrentLinkedQueue;
024 import java.util.concurrent.ConcurrentMap;
025 import java.util.concurrent.ExecutionException;
026 import java.util.concurrent.ExecutorService;
027 import java.util.concurrent.Future;
028 import java.util.concurrent.atomic.AtomicInteger;
029 import java.util.concurrent.locks.ReentrantLock;
030
031 import org.hd.d.pg2k.svrCore.CoreConsts;
032 import org.hd.d.pg2k.svrCore.DuplicateIDChecker;
033 import org.hd.d.pg2k.svrCore.FileTools;
034 import org.hd.d.pg2k.svrCore.GenUtils;
035 import org.hd.d.pg2k.svrCore.MemoryTools;
036 import org.hd.d.pg2k.svrCore.Rnd;
037 import org.hd.d.pg2k.svrCore.ThreadUtils;
038 import org.hd.d.pg2k.svrCore.Tuple;
039 import org.hd.d.pg2k.svrCore.Tuple.Triple;
040
041 import ORG.hd.d.IsDebug;
042
043 // TODO: allow set operations to queue without blocking the caller while a history is being loaded?
044
045 // TODO: add 'noBlock' arg to _setVariable() which won't block but instead return immediately (false) having initiated async load if need be; normally true; caller may for example throw IOException
046
047 /**Base class to "manage" the set of system variables.
048 * This holds the actual values and does:
049 * <ul>
050 * <li>Type checking.
051 * <li>Merging local values into globalmap values.
052 * <li>Trimming entries in global maps for servers now dead/unreachable.
053 * </ul>
054 * <p>
055 * This has a mechanism for discarding duplicate messages as they arrive.
056 * <p>
 * Attempts to rein in retained state and the 'acceptability' window
 * in case of extreme system memory pressure,
 * ie this tries to avoid ever being the cause of OOMs.
060 * <p>
061 * Thread-safe; does not hold any externally-visible locks.
062 * <p>
063 * This implements the generic variable-manager interface.
064 */
065 public class BasicVarMgr implements BasicVarMgrInterface
066 {
/**If true, this class (and possibly its derivatives) is in debug mode.
 * This will normally result in extra []-bracketed comments in System.err.
 * <p>
 * Currently hard-wired to false;
 * the commented-out term shows how it could be tied to the global debug flag instead.
 */
protected static final boolean isDebugMode = false /* && ORG.hd.d.IsDebug.isDebug */;
071
/**Create a default set of managed system variables, initially empty.
 * Equivalent to {@code BasicVarMgr(false, false)}:
 * the instance is not marked as an end point,
 * and duplicate messages and invalid timestamps are filtered out.
 */
public BasicVarMgr()
{
    this(false, false);
}
077
/**Create a set of managed system variables, with filtering enabled.
 * Equivalent to {@code BasicVarMgr(endPoint, false)}:
 * duplicate messages and invalid timestamps are filtered out.
 * <p>
 * An end point is the master variable copy in the master server;
 * for such an instance a unique, local, read-only instance ID is created
 * and inserted into the variable cache.
 *
 * @param endPoint  if true then mark the system as an end-point
 */
public BasicVarMgr(final boolean endPoint)
{
    this(endPoint, false);
}
089
090 /**Create a set of managed system variables.
091 * If an instance is created as an end point,
092 * that means that it is the master variable copy in the master server,
093 * and so a unique, local, read-only instance ID is created
094 * and inserted into the variable cache.
095 *
096 * @param endPoint if true then mark the system as an end-point
097 * @param dontFilterDups if true then do NOT filter out duplicate
098 * or out-of-time messages
099 */
100 public BasicVarMgr(final boolean endPoint,
101 final boolean dontFilterDups)
102 {
103 //if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": constructor.]"); }
104
105 this.endPoint = endPoint;
106 this.dontFilterDups = dontFilterDups;
107 if(endPoint)
108 {
109 final InstanceID iid = InstanceID.createInstanceID();
110 final SimpleVariableValue svv = new SimpleVariableValue(
111 SystemVariables.LOCAL_SYS_ID, iid);
112
113 // The instance ID variable must be local and read-only.
114 assert(SystemVariables.LOCAL_SYS_ID.isReadOnly());
115 assert(SystemVariables.LOCAL_SYS_ID.isLocal());
116
117 variables.put(SystemVariables.LOCAL_SYS_ID, svv);
118 assert(null != getVariable(SystemVariables.LOCAL_SYS_ID));
119
120 //if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": isEndPoint, ID=" + iid + ".]"); }
121 }
122 }
123
124 /**If true then this variable set is at an end-point. */
125 private final boolean endPoint;
126
127 /**Returns true if this is at an endpoint. */
128 public boolean isEndPoint() { return(endPoint); }
129
/**Unless true we screen out duplicate events/updates and those with silly timestamps.
 * We do this so that values can be delivered by a flooding algorithm if need be,
 * but also so that mistakes and repeats will not cause duplication.
 * <p>
 * Filtering should be enabled (ie this flag false) by default
 * since it will quietly eliminate many problems;
 * it may have to be disabled (flag true) to run replays of old data for example.
 * <p>
 * Fixed at construction.
 */
protected final boolean dontFilterDups;
138
/**Time (ms) after which items pending to go upstream are dropped after an error.
 * This is to keep a finite bound on the size of any outbound queue
 * in case of some long-lasting error (eg the master server down).
 * <p>
 * This is taken to be somewhat longer than the longest time that the data
 * could reasonably be useful at first glance,
 * to allow for clock skew, restarts of the master, network outages, and other insults.
 * <p>
 * Everything can probably be discarded safely after this time;
 * some values can probably be discarded much sooner.
 * <p>
 * Computed as eleven minutes plus twice
 * SystemVariables.MAX_VALUE_DISTRIBUTION_LATENCY_MS.
 */
public final static long MAX_UPSTREAM_RETENTION_MS = 11*60*1000 +
    2*SystemVariables.MAX_VALUE_DISTRIBUTION_LATENCY_MS;
152
153 // public final static long MAX_UPSTREAM_RETENTION_MS = 1001 +
154 // ((3 * Math.max(SystemVariables.MAX_VALUE_DISTRIBUTION_LATENCY_MS,
155 // SystemVariables.EVENT_INTERVAL_VLONG_TERM_MS)) / 2);
156
/**Maximum apparent age (ms) of a message with respect to the local clock for it to pass filtering.
 * Currently the same as MAX_UPSTREAM_RETENTION_MS.
 */
public static final long MAX_APPARENT_MESSAGE_AGE_MS = MAX_UPSTREAM_RETENTION_MS;
159
160 /**Returns true if the supplied timestamp from an incoming message/event/update is OK.
161 * To be considered OK it should be within the allowed peer-skew window,
162 * plus an extra allowance for historical values to be sent to us en masse
163 * after a network outage for example.
164 * <p>
165 * This can be used by other classes, eg upstream of a temporary network disconnect,
166 * to decide if it is worth retaining an old message
167 * or if it would simply get discarded anyway at the other end by a var manager instance.
168 * <p>
169 * Note that events may and non-event values/types may get expired at different rates.
170 */
171 private boolean timestampAcceptable(final SimpleVariableValue value)
172 {
173 if(value == null) { throw new IllegalArgumentException(); }
174 final long timestamp = value.getTimestamp();
175 final long now = System.currentTimeMillis();
176 // Reject if timestamp is too far in the future (beyond skew tolerance).
177 if(timestamp > now + CoreConsts.MAX_PEER_CLOCK_SKEW_MS)
178 { return(false); }
179 // Accept immediately if timestamp is within +/- skew tolerance.
180 if(timestamp >= now - CoreConsts.MAX_PEER_CLOCK_SKEW_MS)
181 { return(true); }
182 // Reject if timestamp is much more than VLONG period old plus skew tolerance
183 // which caps the resources that we ever need to put aside for this.
184 if(timestamp < now - MAX_APPARENT_MESSAGE_AGE_MS)
185 { return(false); }
186 // Reject if at least as old as the oldest de-dup message that we've retained.
187 // Since we might not have retained all messages at the same age,
188 // we reject anything equal to or older than the oldest retained.
189 // If there is no retained history at the moment then reject the message.
190 // Note that unless a lock is held on the dup collection
191 // between this and checking for a dup in it
192 // there is a race is we trim old entries from the collection.
193 final Long oldestUpdateRemembered = _updatesSeen.ageOfOldestEntry();
194 if(null == oldestUpdateRemembered)
195 { return(false); } // No history at all at the moment.
196 if(timestamp <= oldestUpdateRemembered)
197 { return(false); } // Too old to match against our history.
198 // Seems OK.
199 return(true);
200 }
201
/**Minimum message retention time for duplicate checking (ms); strictly positive.
 * Must be at least the maximum allowed clock skew plus something for transit time;
 * currently set to twice CoreConsts.MAX_PEER_CLOCK_SKEW_MS.
 * We might hope to widen the window when resources allow.
 * <p>
 * We must not accept messages at all that are outwith this minimum window.
 */
private static final int MIN_DUP_CHECK_RETENTION_MS = 2*CoreConsts.MAX_PEER_CLOCK_SKEW_MS;
209
210 /**Immutable message ID suitable for use as a key in detecting duplicates.
211 * Has functional hashCode() and equals().
212 * <p>
213 * Does not contain the message or its content;
214 * only message meta-data to minimise memory footprint.
215 */
216 private static final class MessageIDKey
217 {
218 /**Message timestamp. */
219 final long timestamp;
220
221 /**Variable definition/name; never null */
222 final SimpleVariableDefinition def;
223
224 /**Message value hash. */
225 final int valueHash;
226
227 /**Construct an instance. */
228 public MessageIDKey(final long timestamp,
229 final SimpleVariableDefinition def,
230 final int valueHash)
231 {
232 if(null == def) { throw new IllegalArgumentException(); }
233 this.timestamp = timestamp;
234 this.def = def;
235 this.valueHash = valueHash;
236 }
237
238 /**Equality is based on all fields. */
239 @Override public boolean equals(final Object o)
240 {
241 if(this == o) { return(true); }
242 if(!(o instanceof MessageIDKey)) { return(false); }
243 final MessageIDKey other = (MessageIDKey) o;
244 return((valueHash == other.valueHash) &&
245 (timestamp == other.timestamp) &&
246 def.equals(other.def));
247 }
248
249 /**Hash is computed from the value hash and timestamp low-order bits. */
250 @Override public int hashCode()
251 { return(valueHash ^ ((int) timestamp)); }
252 }
253
/**Set of historical values that we check incoming messages against for duplicates.
 * A thread-safe set of triples of timestamp, definition (name) and value (hash),
 * sorted in order by timestamp to make removal of expiring oldest entries easy.
 * <p>
 * As an absolute minimum we must retain messages at least MAX_PEER_CLOCK_SKEW_MS
 * but preferably up to MAX_APPARENT_MESSAGE_AGE_MS.
 * <p>
 * A lock can be held on this instance to make multiple operations atomic.
 * <p>
 * NOTE(review): the two numeric arguments are presumably the minimum and
 * maximum retention times (ms), the latter clamped to fit an int;
 * confirm against DuplicateIDChecker.create().
 */
private final DuplicateIDChecker<MessageIDKey> _updatesSeen =
    DuplicateIDChecker.<MessageIDKey>create(
        MIN_DUP_CHECK_RETENTION_MS,
        (int) Math.min(Math.max(MIN_DUP_CHECK_RETENTION_MS, MAX_APPARENT_MESSAGE_AGE_MS), Integer.MAX_VALUE),
        "BasicVarMgr["+this.getClass().getSimpleName()+"]._updatesSeen");
268
269 /**Check/accept message (with true return value) if not a duplicate.
270 * A message is treated as duplicate if it has the same def, timestamp and value hash,
271 * so we reject multiple occurrences of the same value at exactly the same time.
272 * <p>
273 * Note that we store the value hash to avoid keeping large message values
274 * in memory indefinitely.
275 * <p>
276 * In passing this removes any expired entries outside the valid time window.
277 *
278 * @return true if message is not a duplicate, false otherwise
279 */
280 private boolean isNonDup(final SimpleVariableValue svv)
281 {
282 assert(svv != null);
283 final Object value = svv.getValue();
284 final MessageIDKey key = new MessageIDKey(
285 svv.getTimestamp(),
286 svv.getDef(),
287 (value == null) ? -1 : value.hashCode());
288 // Atomically add to the set of values and check if new...
289 final boolean isNew = (null == _updatesSeen.add(key));
290
291 // Return true if not a duplicate.
292 return(isNew);
293 }
294
/**Cached variable values as map; never null.
 * Hashtable from SimpleVariableDefinition to SimpleVariableValue.
 * <p>
 * Used by:
 * <ul>
 * <li>getVariable() and getVariables(),
 * <li>setVariable(), which can update it too.
 * </ul>
 * <p>
 * A lock must be held on this object to do multiple operations atomically,
 * but no other visible lock must be grabbed inside a lock on this.
 * <p>
 * Mappings are never remove()d from this table;
 * it is limited in size to the maximum number of variables
 * defined in SystemVariables.
 * <p>
 * A Hashtable is used for thread-safety
 * and because a lock on it can exclude other operations.
 */
private final Hashtable<SimpleVariableDefinition,SimpleVariableValue> variables =
    new Hashtable<SimpleVariableDefinition, SimpleVariableValue>();
320
/**Map of event history; never null.
 * Thread-safe map from definition to the (recent) history of an event,
 * ie from SimpleVariableDefinition to EventVariableValueSet,
 * to be updated when a setVariable() is done on an event-type variable.
 * <p>
 * No lock must be held on this object,
 * eg to do multiple operations atomically.
 * <p>
 * Mappings are never remove()d from this table;
 * it is limited in size to the maximum number of variables
 * defined in SystemVariables.
 */
private final ConcurrentMap<SimpleVariableDefinition,EventVariableValueSet> eventHistory =
    new ConcurrentHashMap<SimpleVariableDefinition, EventVariableValueSet>();
338
339
340 /**Set variable.
341 * Set local cached value immediately;
342 * store global values to periodically propagate upstream to master
343 * but show last global values obtained from master on periodic poll.
344 * <p>
345 * We validate the the name/type of the variable being presented.
346 * <p>
347 * For local and globalMap variables,
348 * the last value set is the one that can be retrieved.
349 *
350 * @throws java.lang.IllegalArgumentException on attempt to:
351 * set variable with value of wrong type or incompatible definition,
352 * set non-existent or read-only variable (or these can be ignored)
353 */
354 public void setVariable(final SimpleVariableValue newValue)
355 throws IOException
356 {
357 // if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": setVariable("+newValue+").]"); }
358
359 _setVariable(newValue, false);
360
361 // if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": setVariable("+newValue+") FINISHED.]"); }
362 }
363
364 /**Set variable for upstream and downstream values.
365 * Set local cached value immediately;
366 * store global values to periodically propagate upstream to master
367 * but show last global values obtained from master on periodic poll.
368 * <p>
369 * We validate the the name/type of the variable being presented.
370 * <p>
371 * Since this allows read-only values to be updated,
372 * this is only visible to derived classes.
373 * <p>
374 * For local and globalMap variables,
375 * the last value set is the one that can be retrieved.
376 * <p>
377 * When setting a globalMap variable,
378 * we revisit the list of potentially-live systems and trim it if need be.
379 * <p>
380 * If eliminating/filtering duplicates and updates with invalid timestamps
381 * we do so silently here.
382 *
383 * @param fromUpstream if true, we allow read-only variables to be set
384 * in our cache as they represent updated values from the source
385 * and we only allow updates with globalMaps of at least size 1
386 *
387 * @throws IllegalArgumentException on attempt to:
388 * set variable with value of wrong type or incompatible definition,
389 * set non-existent or read-only variable (or these can be ignored)
390 */
391 protected void _setVariable(final SimpleVariableValue newValue,
392 final boolean fromUpstream)
393 {
394 if(newValue == null) { throw new IllegalArgumentException(); }
395
396 // if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": _setVariable("+newValue.getDef() +':'+ newValue+", "+fromUpstream+").]"); }
397
398 final SimpleVariableDefinition def = newValue.getDef();
399 // Reject if not a variable known on this system.
400 if(!SystemVariables.defs.contains(def))
401 { throw new IllegalArgumentException("bad variable name or type: " + def); }
402
403 // Disallow updates from "upstream"
404 // when at an upstream end-point.
405 if(endPoint && fromUpstream)
406 { throw new IllegalStateException("at upstream endpoint; cannot accept updates from upstream"); }
407
408 if(!fromUpstream)
409 {
410 // If this is a read-only variable
411 // then reject the attempted update if from a set operation.
412 if(def.isReadOnly())
413 { throw new IllegalArgumentException("variable is read-only and may not be set (from downstream): " + def); }
414
415 // If it has a globalMap,
416 // then reject updates with more than one entry.
417 final Map<InstanceID,SimpleVariableValue> globalMap = newValue.getGlobalMap();
418 if((globalMap != null) && (globalMap.size() > 1))
419 { throw new IllegalArgumentException("variable has globalMap with more than one entry and may not be set (from downstream): " + def); }
420 }
421
422 // Allow an incoming global with an empty globalMap
423 // as indicative of an "expired" value,
424 // but only if its timestamp is a long time in the past
425 // and the variable really would have expired by now...
426 if(fromUpstream &&
427 !def.isLocal() &&
428 (newValue.getTimestamp() + 2*SystemVariables.MAX_QUIET_SYSTEM_VAR_LIFE_MS > System.currentTimeMillis()))
429 {
430 // From upstream must have at least one entry if a global,
431 // unless very old.
432 final Map<InstanceID,SimpleVariableValue> globalMap = newValue.getGlobalMap();
433 if((globalMap == null) || (globalMap.size() < 1))
434 {
435 if(IsDebug.isDebug) { System.err.println("WARNING: non-local, non-expired, upstream variable value has empty globalMap: " + newValue.getFullDescription()); }
436 return; /* Quietly discard for now. */
437 }
438 // { throw new IllegalArgumentException("non-local, non-expired, upstream variable value has empty globalMap: " + newValue.getFullDescription()); }
439 }
440
441 // If filtering dups, etc, then do so here.
442 // None of this applies to local values where we don't expect replays, clock skew, etc.
443 if(!dontFilterDups && !def.isLocal())
444 {
445 // Check the acceptability of the timestamp and then for a duplicate
446 // within a lock on _updatesSeen to prevent races.
447 synchronized(_updatesSeen)
448 {
449 // Discard anything (non-local) with a dodgy timestamp, ie outside the valid time window.
450 if(!timestampAcceptable(newValue))
451 {
452 if(IsDebug.isDebug) { System.err.println("WARNING: discarded invalid-timestamp variable set for "+newValue.getDef()+" "+newValue.getTimestamp()+" "+(new Date(newValue.getTimestamp()))); }
453 return;
454 }
455 // Discard duplicate value (within the acceptable time window).
456 if(!isNonDup(newValue))
457 {
458 if(IsDebug.isDebug) { System.err.println("WARNING: discarded duplicate variable set for "+newValue.getDef()+" identity="+System.identityHashCode(newValue)+": "+newValue); }
459 //if(IsDebug.isDebug) { (new Error("STOP")).printStackTrace(); }
460 // return;
461 }
462 }
463 }
464
465 // // Report when values propagate up or down...
466 // if(isDebugMode)
467 // {
468 // if(!newValue.equals(variables.get(def)))
469 // {
470 // System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": UPDATING VALUE "+(fromUpstream?"(from upstream)":"")+": _setVariable("+newValue.getFullDescription()+", "+fromUpstream+").]");
471 // }
472 // }
473
474 // Merge the new value into the cache.
475 _mergeValues(newValue, fromUpstream);
476
477 // If this is an event
478 // then record it in the event history as appropriate.
479 if(def.isEvent())
480 {
481 // Get any extant value from the cache.
482 final EventVariableValueSet extant = getEventHistory(def);
483
484 // Record this event if it is not from upstream.
485 if(!fromUpstream)
486 { extant.addEvent(newValue); }
487 }
488 }
489
490 /**Get the current event history for the given definition; never null but may return an empty value.
491 */
492 private EventVariableValueSet getEventHistory(final SimpleVariableDefinition def)
493 {
494 assert(def.isEvent());
495
496 EventVariableValueSet result;
497 // If there isn't one then create an empty one atomically
498 // or complete its load from persistent store if pending.
499 while(null == (result = eventHistory.get(def)))
500 {
501 assert(def.isEvent());
502
503 // We only need to load files already discovered during loadEventHistories();
504 // if not already known about then there is no history available for this event.
505 Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock> j = null;
506 if(null != (j = remainingToBeLoaded.get(def)))
507 {
508 assert(def.isPersistent());
509
510 // Attempt to grab our job from the work queue and lock it.
511 Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock> job = null;
512 for(final Iterator<Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock>> it = loadOrder.iterator(); it.hasNext(); )
513 {
514 final Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock> next = it.next();
515 if(def.equals(next.first))
516 {
517 job = next; // Found the job.
518 it.remove(); // Ensure not in the work queue.
519 assert(!loadOrder.contains(job));
520 break;
521 }
522 }
523
524 // Just before loading the history or waiting for another thread to finish doing so
525 // then, on the grounds that other synchronous loads may otherwise be needed soon,
526 // we may attempt to speculatively asynchronously force loading of any remaining ones.
527 // We use a discardable thread pool to avoid ever blocking here.
528 // We don't do this if (temporarily) conserving resources.
529 //
530 // THIS MUST NOT throw an exception nor return early here, else the history will never get loaded...
531 try
532 {
533 if(!loadOrder.isEmpty() && !GenUtils.mustConservePower())
534 { ThreadUtils.lowPriorityThreadPoolDiscardable.submit(_createLoaderCallable()); }
535 }
536 catch(final Exception e) { e.printStackTrace(); } // Log problem by continue.
537
538 // Could not find job on work queue; may be in progress or complete, ie this thread lost a race.
539 // Else try to lock the jobs exclusively to this thread, or abort, ie this thread lost a race.
540 if((null == job) || !job.third.tryLock())
541 {
542 if(IsDebug.isDebug && j.third.isLocked()) { System.out.println("Queueing to wait for history load for "+j.first); (new Throwable()).printStackTrace(System.out); }
543 j.third.lock(); j.third.unlock(); // Wait until load of this history by other thread is complete.
544 continue;
545 }
546 // Load the properties and remove from 'remaining' list, success or fail.
547 try { _loadEVVS(job); continue; }
548 catch(final Exception e)
549 {
550 // In case if error, whinge...
551 e.printStackTrace();
552 // ...and fall through to store empty state.
553 break;
554 }
555 }
556
557 // Store empty value (atomically), as no persisted/saved history to be loaded.
558 eventHistory.putIfAbsent(def, new EventVariableValueSet(def));
559 }
560 return(result);
561 }
562
/**Records the current loading concurrency across all instances; ref never null, value never negative.
 * Incremented on entry to _loadEVVS() and decremented when it finishes.
 */
private static final AtomicInteger _loadEVVSConcurrency = new AtomicInteger();
565
566 /**Loads into the internal store the specified EVVS from its serialised form in the file.
567 * Only to be used when the caller has exclusive access to load this job,
568 * eg the current thread must have the lock on this job.
569 * <p>
570 * In case of error nothing is posted to the internal store
571 * but the lock is released and the entry removed from remainingToBeLoaded.
572 */
573 private void _loadEVVS(final Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock> job)
574 throws IOException, InvalidPropertiesFormatException
575 {
576 assert(job.third.isHeldByCurrentThread());
577
578 final int concurrencyS = _loadEVVSConcurrency.incrementAndGet();
579 //if(IsDebug.isDebug && (concurrencyS > 1)) { System.out.println("[BasicVarMgr._loadEVVS() starting; concurrency: "+concurrencyS); }
580
581 final long thisStart = System.currentTimeMillis();
582 final SimpleVariableDefinition def = job.first;
583 final File f = job.second;
584
585 try
586 {
587 final Object o = FileTools.deserialiseFromFile(f, SAVE_EVENT_HISTORIES_GZIPPED);
588 if(!(o instanceof EventVariableValueSet))
589 { throw new InvalidPropertiesFormatException("unexpected event history data deserialised from " + f); }
590
591 final EventVariableValueSet evvs = (EventVariableValueSet) o;
592 if(!def.equals(evvs.getDef()))
593 { throw new InvalidPropertiesFormatException("wrong event history data deserialised from " + f); }
594
595 // Post the results into the history map atomically.
596 eventHistory.putIfAbsent(job.first, evvs);
597
598 if(ORG.hd.d.IsDebug.isDebug || (System.currentTimeMillis() - thisStart > 2000) || (remainingToBeLoaded.size() == 0)) { System.out.println("[BasicVarMgr._loadEVVS(): loaded history ("+job.second.length()+" bytes, concurrency~"+Math.min(concurrencyS,_loadEVVSConcurrency.get())+", queue="+job.third.getQueueLength()+") for "+def+" in "+(System.currentTimeMillis() - thisStart)+"ms, remaining="+remainingToBeLoaded.size()+".]"); }
599 //if(IsDebug.isDebug && (job.third.getQueueLength() > 0)) { System.out.println("[BasicVarMgr._loadEVVS(): count of threads waiting for loading of "+job.first+" is "+job.third.getQueueLength()); }
600 }
601 finally
602 {
603 // Now remove from the remainingToBeLoaded whether this failed or succeeded.
604 // Do so atomically.
605 final boolean removed = remainingToBeLoaded.remove(job.first, job);
606 assert(removed) : "job has already gone or been overwritten";
607 job.third.unlock(); // Release the lock to let loose any threads waiting for this def to load...
608
609 // Drop the concurrency count.
610 _loadEVVSConcurrency.decrementAndGet();
611 }
612 }
613
614 /**Merge a new value into a variable in the cache.
615 * This very carefully merges any extant cached value
616 * with a set coming upstream or a new global value coming downstream,
617 * trying to cope with the fact that a new value outbound may meet
618 * an updated global value coming back from upstream
619 * (from the master for example).
620 * <p>
621 * We always immediately apply a local
622 * or override the value and globalMap entry of a global,
623 * though for a global it will probably require the value merged
624 * at the master's end-point coming back to sort things out,
625 * with some instability/flapping in the interim.
626 * <p>
627 * The update must already have been validated,
628 * eg must not have more than one globalMap entry if a set.
629 * <p>
630 * This deals with events as well as plain variables.
631 * <p>
632 * Takes a lock on variables to make this atomic.
633 *
634 * @param newValue a variable update/set coming upstream/downstream
635 * @param fromUpstream if true, this is an update coming downstream
636 */
637 private void _mergeValues(final SimpleVariableValue newValue,
638 final boolean fromUpstream)
639 {
640 if(newValue == null)
641 { throw new IllegalArgumentException(); }
642
643 final SimpleVariableDefinition def = newValue.getDef();
644
645 // If local, apply the value unconditionally...
646 if(def.isLocal())
647 {
648 variables.put(def, newValue); // Atomic update...
649 //if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": _mergeValues("+newValue.getDef() +':'+ newValue+", "+fromUpstream+"): LOCAL UPDATE/SET; prev: "+extant+"]"); }
650 return;
651 }
652
653
654 // WE ARE UPDATING A GLOBAL...
655
656 // Get the local system ID, ready for a merge operation.
657 final SimpleVariableValue iidVar = variables.get(SystemVariables.LOCAL_SYS_ID);
658 if(iidVar == null)
659 { throw new IllegalStateException("Cannot set global variable without our system ID; should this instance be an endpoint?"); }
660 final InstanceID sysID = (InstanceID) iidVar.getValue();
661
662 // Do the merge atomically...
663 synchronized(variables)
664 {
665 // Get any extant value from the cache.
666 final SimpleVariableValue extant = variables.get(def);
667
668 // If the new value is a set (from downstream),
669 // and has no global map entry,
670 // then we assume it is locally generated and inject one for us,
671 // preserving the existing timestamp.
672 final SimpleVariableValue adjNewValue =
673 (!fromUpstream && (newValue.getGlobalMap() == null)) ?
674 newValue.put(sysID, newValue, true) : newValue;
675
676 // By now, we should not see any value with an empty globalMap,
677 // except possibly an "expired" value from upstream...
678 assert(fromUpstream || (adjNewValue.getGlobalMap() != null));
679 if(!fromUpstream && (adjNewValue.getGlobalMap() == null))
680 { throw new IllegalStateException("ERROR: should NOT be set()ting global with empty globalMap"); }
681
682 // If the extant value is null,
683 // then save the new value as-is and return immediately.
684 if(extant == null)
685 {
686 variables.put(def, adjNewValue);
687 //if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": g1: _mergeValues("+adjNewValue.getFullDescription()+", "+fromUpstream+"): GLOBAL FIRST UPDATE/SET.]"); }
688 return;
689 }
690
691 // Merge in new values from the update,
692 // essentially taking something like the union
693 // of the old and new values.
694 SimpleVariableValue toSave = extant;
695 final Map<InstanceID,SimpleVariableValue> newGM = adjNewValue.getGlobalMap();
696 if(newGM != null)
697 {
698 for(final InstanceID key : newGM.keySet())
699 {
700 final SimpleVariableValue svv = newGM.get(key);
701 toSave = toSave.put(key, svv, true);
702 }
703 variables.put(def, toSave);
704 //if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": gm: _mergeValues("+toSave.getFullDescription()+", "+fromUpstream+"): GLOBAL UPDATE/SET; prev: "+extant.getFullDescription()+".]"); }
705 }
706 }
707 }
708
709 /**Update a number of variables at once for efficiency; returns the number set.
710 * Is passed a Set of SimpleVariableValues and behaves as if it
711 * operates on all of them by calling setVariable() for each item
712 * in the Set in order (from low to high index).
713 * <p>
714 * This implementation "fails fast" on the first error.
715 *
716 * @return the number of variable values set;
717 * never negative, never more than the number passed in
718 *
719 * @throws java.lang.IllegalArgumentException on attempt to:
720 * set variable with value of wrong type or incompatible definition,
721 * set non-existent or read-only variable (or these can be ignored)
722 */
723 public int setVariables(final SimpleVariableValue newValues[])
724 throws IOException
725 {
726 if(newValues == null) { throw new IllegalArgumentException(); }
727
728 // if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": setVariables("+Arrays.asList(newValues)+").]"); }
729
730 final int nvLen = newValues.length;
731 for(int i = 0; i < nvLen; ++i)
732 { _setVariable(newValues[i], false); }
733
734 // if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": setVariables("+Arrays.asList(newValues)+") FINISHED.]"); }
735
736 // Claims to have set all the values passed to it.
737 return(nvLen);
738 }
739
740 /**Get a single variable value; returns null if no such value or wrong type.
741 * Always get from local cache.
742 * <p>
743 * This trims from the globalMap any apparently-dead systems,
744 * and stores any such trimmed variable back in the cache atomically
745 * so as to incrementally eliminate stale data
746 * and return only reasonably fresh data to the caller.
747 *
748 * @param var definition of variable to fetch; never null
749 *
750 * @throws IllegalArgumentException if var is null
751 */
752 public SimpleVariableValue getVariable(final SimpleVariableDefinition var)
753 {
754 // if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": getVariable("+var+").]"); }
755
756 if(var == null)
757 { throw new IllegalArgumentException(); }
758
759 // Do fetch/winnowing operation atomically.
760 synchronized(variables)
761 {
762 // Get the variable if set/extant,
763 // else return null.
764 final SimpleVariableValue svv =
765 variables.get(var);
766
767 // If null or no globalMap entries,
768 // then return as-is immediately;
769 // no winnowing to be done...
770 if((svv == null) || (svv.getGlobalMap() == null))
771 { return(svv); }
772
773 // Attempt to trim out dead very old values from the map.
774 final SimpleVariableValue svvWinnowed =
775 svv.removeAllKeysOlder(System.currentTimeMillis() -
776 2*SystemVariables.MAX_QUIET_SYSTEM_VAR_LIFE_MS);
777
778 // If this actually trimmed anything,
779 // quietly replace the cached entry with the trimmed version.
780 // This way we incrementally remove stale data.
781 final Map<InstanceID,SimpleVariableValue> gMapWinnowed = svvWinnowed.getGlobalMap();
782 if((gMapWinnowed == null) ||
783 (gMapWinnowed.size() < svv.getGlobalMap().size()))
784 {
785 //if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": getVariable() trimmed globalMap from "+svv.getGlobalMap().size()+" to "+((gMapWinnowed==null)?0:gMapWinnowed.size())+" entries.]"); }
786 // Store trimmed variable,
787 // discarding stale state.
788 variables.put(var, svvWinnowed);
789 return(svvWinnowed);
790 }
791
792 // Original was OK,
793 // so return that to avoid duplicates in the heap.
794 return(svv);
795 }
796 }
797
798 /**Get set of variable values altered on or after a given time, or all for -1; never null.
799 * Always get from local cache
800 * (the variable cache being periodically updated from the master).
801 * <p>
802 * This may be slow if there are many live variables.
803 * <p>
804 * This implementation never throws an IOException.
805 * <p>
806 * Note: we may limit the notion of "all" to be something a little greater
807 * than the maximum live-variable time window (ie variable lifetime),
808 * since beyond this the variable value has expired
809 * and is not deemed trustworthy anyway.
810 */
811 public SimpleVariableValue[] getVariables(final long changedSince)
812 {
813 // if(isDebugMode) { System.err.println("[BasicVarMgr "+System.identityHashCode(this)+": getVariables("+changedSince+").]"); }
814
815 // Get definitions of all extant variables atomically.
816 // Note that we never remove variables once set.
817 final Set<SimpleVariableDefinition> variableDefs;
818 synchronized(variables)
819 { variableDefs = new HashSet<SimpleVariableDefinition>(variables.keySet()); }
820
821 // Approximately size the result...
822 final List<SimpleVariableValue> result = new ArrayList<SimpleVariableValue>(variableDefs.size());
823
824 // Compute a backstop based on the maximum variable life
825 // and the maximum permitted clock skew between participating systems.
826 // We use quite a conservative value to avoid over-filtering here.
827 final long backStop = System.currentTimeMillis() - 2*SystemVariables.MAX_QUIET_SYSTEM_VAR_LIFE_MS
828 - 2*CoreConsts.MAX_PEER_CLOCK_SKEW_MS;
829 final long cutoffTime = Math.max(backStop, changedSince);
830
831 // Go through the set,
832 // removing anything definitely older than the specified time,
833 // and winnowing out stale data.
834 for(final Iterator<SimpleVariableDefinition> it = variableDefs.iterator(); it.hasNext(); )
835 {
836 final SimpleVariableDefinition def = it.next();
837
838 // Get value, trimmed of stale data if necessary.
839 final SimpleVariableValue svv = getVariable(def);
840
841 // If timestamp is new enough,
842 // add the value to the result.
843 if((svv.getTimestamp() >= cutoffTime)) // TODO: test this clause!
844 { result.add(svv); }
845 }
846
847 final SimpleVariableValue r[] = new SimpleVariableValue[result.size()];
848 result.toArray(r);
849 return(r);
850 }
851
852
853 /**Get the current partial, or previous full, event set at the specified interval; never returns null.
854 * This is a simplified interface to return either the current event set
855 * that is being collected, or the previous completed set.
856 * <p>
857 * The current set is the most timely, but may not contain enough data
858 * to be meaningful if the new interval has just started.
859 * <p>
860 * The previous set is complete and thus most likely to have enough samples
861 * to be useful, but is not completely current.
862 * <p>
863 * If the requested event set is not available,
864 * an empty synthetic one is created and returned.
865 * Thus, with this interface, it is not possible to distinguish between
866 * there being no events in the given interval or simply no data.
867 * <p>
868 * This calls getEventValues().
869 * <p>
870 * This implementation will not throw an IOException.
871 *
872 * @param def event definition (must be for an event); never null
873 * @param intervalSelector never null
874 * @param current if true the current event set is returned,
875 * else the previous complete set is returned
876 *
877 * @return requested event set; never null
878 *
879 * @throws IllegalArgumentException if the request arguments are invalid
880 */
881 public EventVariableValue getEventValue(final SimpleVariableDefinition def,
882 final EventPeriod intervalSelector,
883 final boolean current)
884 {
885 if((def == null) || !def.isEvent() || (intervalSelector == null))
886 { throw new IllegalArgumentException(); }
887
888 final long now = System.currentTimeMillis();
889 final long currentIntervalNumber = intervalSelector.getIntervalNumber(now);
890 final long intervalNumber = (current ? currentIntervalNumber : (currentIntervalNumber-1));
891
892 final EventVariableValue evv[] = getEventValues(def,
893 intervalSelector,
894 intervalNumber,
895 null);
896
897 // If we got some real (non-null) data back, then return it...
898 if((evv.length > 0) && (evv[0] != null))
899 { return(evv[0]); }
900
901 // Else invent empty non-authoritative return value (with no events)...
902 return(getEmptyNonAuthEVV(def, intervalSelector, intervalNumber));
903 }
904
/**Cache of recently-requested 'empty' non-auth EVVs.
 * Keyed by the (definition, period, interval number) triple that identifies the value.
 * <p>
 * We make this large enough (eight slots per known definition)
 * to accommodate the current and previous/full values for each def
 * for at least a couple of period values,
 * but not so large as to be likely to be a problem.
 * <p>
 * We are prepared to drop this entirely in acute memory shortage.
 * <p>
 * NOTE(review): the second argument to create() looks like a diagnostic name for the cache; confirm against MemoryTools.
 */
private static final MemoryTools.SimpleProbabilisticCache<Tuple.Triple<SimpleVariableDefinition, EventPeriod, Long>, EventVariableValue> _getEmptyNonAuthEVV_cache =
    MemoryTools.SimpleProbabilisticCache.<Tuple.Triple<SimpleVariableDefinition,EventPeriod,Long>, EventVariableValue>create(8*SystemVariables.defs.size(), "_getEmptyNonAuthEVV_cache");
914
915 /**Get a non-authoritative empty EventVariableValue with the specified definition/period/interval; never null.
916 * This may cache results to help control the number of instances in circulation,
917 * though this empty values are assumed to otherwise have a fairly short life.
918 *
919 * @param def valid event definition; never null
920 * @param intervalSelector valid period for the definition; never null
921 * @param intervalNumber valid interval number for the period; strictly positive
922 * @return empty non-auth value; never null
923 */
924 public static EventVariableValue getEmptyNonAuthEVV(final SimpleVariableDefinition def,
925 final EventPeriod intervalSelector,
926 final long intervalNumber)
927 {
928 // Try from cache to avoid the cost of constructing a whole new instance.
929 final Tuple.Triple<SimpleVariableDefinition, EventPeriod, Long> cacheKey =
930 new Tuple.Triple<SimpleVariableDefinition, EventPeriod, Long>(def, intervalSelector, intervalNumber);
931 final EventVariableValue cachedValue = _getEmptyNonAuthEVV_cache.get(cacheKey);
932 if(cachedValue != null) { return(cachedValue); }
933
934 // Compute and cache the result for next time.
935 final EventVariableValue result = new EventVariableValue(false,
936 def,
937 intervalSelector,
938 intervalNumber,
939 0, null, null);
940 _getEmptyNonAuthEVV_cache.put(cacheKey, result);
941
942 return(result);
943 }
944
945 /**Get the specified event sets for the specified intervals; never null.
946 *
947 * @param intervalNumber the number of the first interval
948 * for which data is potentially required;
949 * if too far in the past or future then possibly no data
950 * will be available,
951 * zero is used to access the "all" bucket
952 * @param whichValues each true bit represents a slot for which data is
953 * required, bit 0 indicating data from the slot within which
954 * firstIntervalTime is located, bit 1 the previous slot, etc;
955 * null is treated as the common case equivalent to just bit 0 set
956 *
957 * @return as many of the requested values as available,
958 * return may contain nulls or be zero-length but is never null
959 */
960 public EventVariableValue[] getEventValues(final SimpleVariableDefinition def,
961 final EventPeriod intervalSelector,
962 final long intervalNumber,
963 final BitSet whichValues)
964 {
965 if((def == null) || !def.isEvent() ||
966 (intervalSelector == null) ||
967 (intervalNumber < 0))
968 { throw new IllegalArgumentException(); }
969 final Set<EventPeriod> evPeriodSubset = def.getEvPeriodSubset();
970 if((evPeriodSubset != null) && !evPeriodSubset.contains(intervalSelector))
971 { throw new IllegalArgumentException("bad interval requested"); }
972
973 // Get the event history, loading it if necessary...
974 final EventVariableValueSet evvs = getEventHistory(def);
975
976 // If no entry at all for this event, then return an empty result set.
977 if(evvs == null) { return(EventVariableValuePeriodRow._NO_EVENT_VALUES); }
978
979 assert(def.equals(evvs.getDef()));
980
981 // Delegate handling of request to event set.
982 return(evvs.getEventValues(intervalSelector, intervalNumber, whichValues));
983 }
984
985
986 /**Set/override the specified event sets for the specified intervals; never null.
987 * This is used to set values that have been fetched from upstream
988 * (ie closer to the master data store).
989 * <p>
990 * This may ignore the call,
991 * and certainly will if the intervalNumber is outside the current time window it holds.
992 * <p>
993 * This will generally accept the value and store it if:
994 * <ul>
995 * <li>The intervalNumber is within the current window.
996 * <li>The intervalSelector is a period for which this event has data.
997 * <li>The supplied value is "better" than any current one held, ie:
998 * <ul>
999 * <li>We hold a null and evv is non-null.
1000 * <li>We hold a non-authoritative value and evv is authoritative.
1001 * <li>The authoritative-ness is the same but we hold an entry
1002 * with a lower total count than evv.
1003 * <ul>
1004 * </ul>
1005 * </ul>
1006 * However, we will <em>not</em> accept non-authoritative updates
1007 * if at the end-point (ie this is the master store),
1008 * since we are the authoritative source of data.
1009 *
1010 * @param evv non-null event value
1011 */
1012 public void setEventValue(final EventVariableValue evv)
1013 // throws IOException
1014 {
1015 if(evv == null)
1016 { throw new IllegalArgumentException(); }
1017 final SimpleVariableDefinition def = evv.getDef();
1018 if(!def.isEvent())
1019 { throw new IllegalArgumentException(); }
1020
1021 // If we are at an endpoint
1022 // then disregard attempts to set non-auth overrides.
1023 if(endPoint && !evv.isAuthoritative())
1024 { return; }
1025
1026 // Find our cached event history for this event,
1027 // atomically creating it if need be.
1028 final EventVariableValueSet extant = getEventHistory(def);
1029
1030 extant.setEventValue(evv);
1031 }
1032
1033
1034
1035
1036 /**Persist variables and events to disc.
1037 * This can either save everything applicable,
1038 * or just those things things that seem to be out-of-date on disc.
1039 * <p>
1040 * Only persistent event data is saved,
1041 * and generally each named event type has its data saved in a separate file,
1042 * in a form that should be reasonably robust
1043 * (eg a failure to load one item should not have all items unloadable).
1044 * <p>
1045 * This data may be stored in binary serialised or XML forms,
1046 * but is usually GZIPped to save space and speed up read/write.
1047 * <p>
1048 * If some saves fail then this will try to continue with others,
1049 * but throw one of the errors encountered (usually the last one)
1050 * at the end, so as to save as much as possible.
1051 * <p>
1052 * The incremental mode helps keeps the duration of any one call
1053 * as small as is reasonably possible, but will be somewhat less efficient
1054 * then non-incremental mode.
1055 * <p>
1056 * This locks out other activity while saving to ensure consistency;
1057 * it need not hold a lock continuously for whole period of the call
1058 * as it would usually be enough to do so while collecting candidate
1059 * histories to save, and while actually saving each one of them separately.
1060 * <p>
1061 * To avoid deadlocks, this takes a lock on this instance
1062 * before taking any other lock.
1063 *
1064 * @param dir the directory to save to; never null
1065 * @param justChanges just write histories changed since the last save if possible,
1066 * else unconditionally save all persistent event histories
1067 * @param keepHistory if true, try to roll the logs at least daily
1068 * to help keep a trail of older snapshot though the storage space
1069 * required may grow without bound,
1070 * @param incremental if true, then at most one event is saved,
1071 * chosen in such a way (eg by max time since last save or randomly)
1072 * that eventually all eligible histories will be saved
1073 */
1074 public void saveEventHistories(final File dir,
1075 final boolean justChanges,
1076 final boolean keepHistory,
1077 final boolean incremental)
1078 throws IOException
1079 {
1080 if(dir == null)
1081 { throw new IllegalArgumentException(); }
1082
1083 if(!dir.isDirectory())
1084 { throw new FileNotFoundException("not a directory: " + dir); }
1085
1086 if(!dir.canWrite())
1087 { throw new IOException("cannot write to " + dir); }
1088
1089 // Create/format the date string once, if we are going to use it.
1090 final String dateString = (!keepHistory) ? "" : ("." + dateFmtEHFile.format(new Date()));
1091
1092 // Get potential events to be saved (assumed to be in a thread-safe manner).
1093 // Note that histories due to be asynchronously loaded but not yet done
1094 // will not be candidates here.
1095 final List<SimpleVariableDefinition> events =
1096 new ArrayList<SimpleVariableDefinition>(eventHistory.keySet());
1097
1098 // Randomise the ordering for incremental mode to avoid starvation.
1099 // We trust the fast random generator to be OK...
1100 if(incremental)
1101 { Collections.shuffle(events, Rnd.fastRnd); }
1102
1103 // Iterate over event data,
1104 // saving each persistent event's history to a separate file.
1105 // Include today's date in the name to provide backups/history where requested.
1106 IOException err = null; // Any error encountered.
1107 for(final SimpleVariableDefinition def : events)
1108 {
1109 assert(def.isEvent()); // Should not be any non-event variable data in here!
1110
1111 // Only save persistent events' data.
1112 if(!def.isPersistent()) { continue; }
1113
1114 // Get the latest values for this definition.
1115 final EventVariableValueSet history = eventHistory.get(def);
1116 if(history == null) { continue; }
1117
1118 final StringBuilder sb = new StringBuilder(80);
1119 sb.append(EVENT_STORE_FILENAME_PREFIX);
1120 sb.append(def.getName());
1121 sb.append(EVENT_STORE_NAMETERM);
1122 sb.append(dateString); // Optional date...
1123 sb.append(EVENT_STORE_FILENAME_SUFFIX_SER_GZ);
1124
1125 // Persist the data if necessary.
1126 final File file = new File(dir, sb.toString());
1127 try
1128 {
1129 // If only saving changes,
1130 // we see if any events arrived since the file was last saved.
1131 if(justChanges)
1132 {
1133 // If the history file appears to be present and correct
1134 // and newer than the last added event
1135 // then skip the save...
1136 // We don't care much about minor race possibilities here.
1137 if(file.exists() && file.isFile() && file.canRead() && (file.length() > 0) &&
1138 (history.getLastEventTime() < file.lastModified()))
1139 { continue; }
1140 }
1141
1142 // Save the data as a compressed serialised object...
1143 // Lock out mutators on the history object during the save.
1144 synchronized(history)
1145 { FileTools.serialiseToFile(history, file, SAVE_EVENT_HISTORIES_GZIPPED, !ORG.hd.d.IsDebug.isDebug); }
1146
1147 // If incremental, bucket out after the first successful save.
1148 if(incremental)
1149 { break; }
1150 }
1151 catch(final IOException e)
1152 {
1153 // Print and temporarily absorb any errors encountered during save.
1154 // Continue with other saves as far as possible.
1155 e.printStackTrace();
1156 err = e;
1157 }
1158 }
1159
1160 // Re-throw the last error encountered, if any.
1161 if(err != null) { throw err; }
1162 }
1163
/**Our private static instance of a GMT calendar.
 * No one must change the time zone.
 * <p>
 * This is created with the GMT TimeZone rather than by zeroing the
 * ZONE_OFFSET/DST_OFFSET fields of a default-zone calendar,
 * because those fields are recomputed from the calendar's TimeZone
 * every time the calendar's time is set (as SimpleDateFormat.format() does),
 * which would silently revert the calendar to the JVM's default zone.
 */
private static final Calendar GMTCalendar =
    Calendar.getInstance(java.util.TimeZone.getTimeZone("GMT"));

/**Format we use to insert into event history file name: the GMT date as yyyyMMdd.
 * SimpleDateFormat is not thread-safe,
 * so concurrent users must synchronise on this instance while formatting.
 */
private static final SimpleDateFormat dateFmtEHFile =
    new SimpleDateFormat("yyyyMMdd");
/**Set the Calendar for dateFmtEHFile to be the GMT calendar. */
static { dateFmtEHFile.setCalendar(GMTCalendar); }
1183
/**If true, GZIP the saved histories to save space and read/write time.
 * Passed to FileTools.serialiseToFile() when each history is written.
 */
private static final boolean SAVE_EVENT_HISTORIES_GZIPPED = true;

/**Prefix to event history store file name; every store file name starts with this. */
public static final String EVENT_STORE_FILENAME_PREFIX = "eventStore.";

/**Terminator to go after event name in file store.
 * Format of file name is:
 * <pre>
 * EVENT_STORE_FILENAME_PREFIX + name + EVENT_STORE_NAMETERM [ + date ] + legal suffix
 * </pre>
 * where the optional date, when present, is a '.' followed by the date.
 * <p>
 * The nameterm includes at least one character that cannot appear in
 * an event name to avoid ambiguity where one name is a prefix of another.
 */
public static final String EVENT_STORE_NAMETERM = "-T";

/**Suffix for GZIPped serialised event history store file name. */
public static final String EVENT_STORE_FILENAME_SUFFIX_SER_GZ = ".ser.gz";
1203
1204
/**Definitions and the files from which their event histories are to be loaded; never null but may be empty.
 * Each entry is a (definition, file, lock) triple.
 * The items are queued in the preferred order they should be loaded,
 * but items from anywhere in the queue can be loaded on demand.
 * <p>
 * Entries should be taken from this with poll() and immediately locked with the Lock
 * which will have them processed in preferred order,
 * or in random order while attempting to load a history on demand
 * by using the iterator and remove() and immediately attempting to lock
 * with a failure treated as if the item had not been in the queue at all
 * (because another Thread has stolen it).
 * <p>
 * The lock must be released after the item is removed from allToBeLoaded.
 * NOTE(review): no field called allToBeLoaded is visible here;
 * this presumably means remainingToBeLoaded; confirm.
 * <p>
 * ConcurrentLinkedQueue for thread-safe lock-free access.
 */
private final Queue<Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock>> loadOrder = new ConcurrentLinkedQueue<Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock>>();

/**This is the thread-safe collection of all items waiting to be loaded; never null but may be empty.
 * Maps each definition to the same (definition, file, lock) triple that is queued in loadOrder.
 * <p>
 * An item is only removed from this collection once it has completed loading
 * successfully or not.
 */
private final ConcurrentMap<SimpleVariableDefinition, Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock>> remainingToBeLoaded = new ConcurrentHashMap<SimpleVariableDefinition, Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock>>();
1227
1228 /**Load event histories from disc.
1229 * This will load the "latest" saved state, if any,
1230 * for each persistent event.
1231 * "Latest" may be measured by file timestamp (potentially slow and fragile)
1232 * or lexically-latest (eg by the date embedded in the filename)
1233 * or by some combination of the two.
1234 * <p>
1235 * Only the event history will be loaded.
1236 * <p>
1237 * If some loads fail then this will try to continue with others,
1238 * but re-throw one of the errors encountered (usually the last one)
1239 * at the end, so as to load as much as possible.
1240 * <p>
1241 * Though running this call concurrently with other (mutator) operations
1242 * (or indeed other instances of this call)
1243 * should be safe, such operation is not advised.
1244 * <p>
1245 * Should be called at most once, before any set (and get) operations.
1246 * <p>
1247 * We try to load largest-first to maximise concurrency.
1248 *
1249 * @param dir the directory to load from; never null
1250 * @param async if true then the caller will not be blocked waiting to load histories
1251 * (else if false the caller will be blocked until they are all loaded)
1252 * and if possible the event histories will be loaded in the background else as-needed,
1253 * blocking access to individual histories not yet loaded while they are loaded;
1254 * this only works for event histories with no state loaded at all
1255 */
1256 public void loadEventHistories(final File dir, final boolean async)
1257 throws IOException
1258 {
1259 if(dir == null)
1260 { throw new IllegalArgumentException(); }
1261
1262 if(!dir.isDirectory())
1263 { throw new FileNotFoundException("not a directory: " + dir); }
1264
1265 if(!dir.canRead())
1266 { throw new IOException("cannot read from " + dir); }
1267
1268 if(!remainingToBeLoaded.isEmpty())
1269 { throw new IllegalStateException("already loading"); }
1270
1271 final long startTime = System.currentTimeMillis();
1272
1273 // Get listing of files in the directory...
1274 final File files[] = dir.listFiles();
1275
1276 // Construct a short-list of those that look like usable, ie are:
1277 // * starts with correct prefix
1278 // * contains the name terminator
1279 // * ends with a legal suffix (may be more than one in future)
1280 //
1281 // We should probably also only accept candidiates that are:
1282 // * plain files
1283 // * readable
1284 // * non-zero-length
1285 // but retrieval of such stats from disc at start-up may be slow.
1286 // This shortlist is of short filenames, with no directory components.
1287 final List<String> candiates = new ArrayList<String>(files.length);
1288 for(final File f : files)
1289 {
1290 final String name = f.getName();
1291 if(!name.startsWith(EVENT_STORE_FILENAME_PREFIX)) { continue; }
1292 if(-1 == name.indexOf(EVENT_STORE_NAMETERM)) { continue; }
1293 if(!name.endsWith(EVENT_STORE_FILENAME_SUFFIX_SER_GZ)) { continue; }
1294
1295 // final File fullPath = new File(dir, name);
1296 // if(!fullPath.isFile()) { continue; }
1297 // if(!fullPath.canRead()) { continue; }
1298 // if(fullPath.length() <= 0) { continue; }
1299
1300 // Good candidate, so add it.
1301 candiates.add(name);
1302 }
1303
1304 if(ORG.hd.d.IsDebug.isDebug) { System.out.println("[BasicVarMgr.loadEventHistories(): found event history "+candiates.size()+" candidates in "+dir+".]"); }
1305
1306 if(candiates.size() == 0)
1307 {
1308 System.err.println("WARNING: BasicVarMgr.loadEventHistories() found no event-history files in " + dir);
1309 return;
1310 }
1311
1312 // Compute list of persistent events.
1313 final List<SimpleVariableDefinition> defs =
1314 new ArrayList<SimpleVariableDefinition>(SystemVariables.nameToDef.size());
1315 for(final SimpleVariableDefinition def : SystemVariables.nameToDef.values())
1316 {
1317 if(!def.isEvent() || !def.isPersistent()) { continue; }
1318 defs.add(def);
1319 }
1320 if(defs.isEmpty()) { return; /* Nothing to do... */ }
1321
1322 if(ORG.hd.d.IsDebug.isDebug) { System.out.println("[BasicVarMgr.loadEventHistories(): candidates found for defs "+defs+".]"); }
1323
1324
1325 // Create an ordered List of <def,File> pairs
1326 // with the largest files, assumed slowest to load, first.
1327 final List<Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock>> sorted = new ArrayList<Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock>>(defs.size());
1328 for(final SimpleVariableDefinition def : defs)
1329 {
1330 // For each persistent event in the SystemVariables list,
1331 // look for the newest/best candidate file.
1332 // (Double-check after deserialising
1333 // that we have loaded data for the right def!)
1334
1335 // Best candidate so far...
1336 File bestCandidate = null;
1337
1338 // Minimum non-ambiguous prefix for an event-store file
1339 // for this event name.
1340 final String minUniquePrefix = EVENT_STORE_FILENAME_PREFIX +
1341 def.getName() + EVENT_STORE_NAMETERM;
1342 for(final String name : candiates)
1343 {
1344 if(!name.startsWith(minUniquePrefix)) { continue; }
1345
1346 final File candiate = new File(dir, name);
1347
1348 // If there is no existing candidate,
1349 // or this one is newer,
1350 // then this becomes our best candidate.
1351 if((bestCandidate == null) ||
1352 (candiate.lastModified() > bestCandidate.lastModified()))
1353 {
1354 // Skip/veto obviously bogus/broken files.
1355 if(!candiate.isFile()) { continue; }
1356 if(!candiate.canRead()) { continue; }
1357 if(candiate.length() <= 0) { continue; }
1358
1359 // Looks to be the best so far.
1360 bestCandidate = candiate;
1361 }
1362 }
1363
1364 // If a suitable candidate has been found then record it.
1365 if(bestCandidate != null)
1366 { sorted.add(new Tuple.Triple<SimpleVariableDefinition, File, ReentrantLock>(def, bestCandidate, new ReentrantLock())); }
1367 }
1368 Collections.sort(sorted, new Comparator<Tuple.Triple<SimpleVariableDefinition,File,ReentrantLock>>(){
1369 /**Order by file size, largest first. */
1370 public int compare(final Triple<SimpleVariableDefinition, File, ReentrantLock> o1,
1371 final Triple<SimpleVariableDefinition, File, ReentrantLock> o2)
1372 {
1373 final long s1 = o1.second.length();
1374 final long s2 = o2.second.length();
1375 if(s1 > s2) { return(-1); }
1376 if(s1 < s2) { return(+1); }
1377 return(0); // Improbably the same size!
1378 }
1379 });
1380 // Add all the items to the queue to be processed...
1381 loadOrder.addAll(sorted);
1382 // Record them all as 'remaining' items to be completed.
1383 for(final Triple<SimpleVariableDefinition, File, ReentrantLock> e : sorted)
1384 { remainingToBeLoaded.put(e.first, e); }
1385
1386 // if(true) { return; } // Force totally async loads...
1387
1388 // Do not try to load the data asynchronously if (temporarily) short of power.
1389 if(async && GenUtils.mustConservePower()) { return; }
1390
1391 // Run as many loader tasks as possible at once without blocking.
1392 // We regard these as mainly CPU intensive in practice.
1393 // All of the tasks are identical and simply loop pooling the load queue for work,
1394 // do the loads serially, and quit when the load queue is empty..
1395 // If running asynchronously then don't wait for them
1396 // and put them on a discarding pool to ensure that we never block for them.
1397 // (So if loading asynchronously we may need to load on demand
1398 // whatever histories have not been pre-loaded in time.)
1399 final ExecutorService pool = async ? ThreadUtils.lowPriorityThreadPoolDiscardable :
1400 ThreadUtils.computeIntensiveThreadPool;
1401 // Always start at least one task (possibly discardable) and no more than the number of CPUs
1402 // but otherwise to fill up the chosen thread pool to be done as fast as possible.
1403 // Try not to completely fill the target pool if possible!
1404 final int nTasks = Math.max(1, Math.min(ThreadUtils.AVAILABLE_PROCESSORS,
1405 (async ? ThreadUtils.lowPriorityThreadPoolDiscardableSpace() : ThreadUtils.computeIntensiveThreadPoolSpace()) - 1));
1406 if(ORG.hd.d.IsDebug.isDebug || (nTasks > 1)) { System.out.println("[BasicVarMgr.loadEventHistories(): async="+async+", files/defs="+sorted.size()+", nTasks="+nTasks+".]"); }
1407
1408 // Create and attempt to launch the tasks.
1409 final List<Future<Object>> futures = new ArrayList<Future<Object>>(nTasks);
1410 for(int i = nTasks; --i >= 0; )
1411 { futures.add(pool.submit(_createLoaderCallable())); }
1412
1413 try
1414 {
1415 // If running synchronously then wait for all the tasks to finish.
1416 if(!async)
1417 {
1418 for(final Future<Object> future : futures) { future.get(); }
1419 if(!loadOrder.isEmpty() || !remainingToBeLoaded.isEmpty())
1420 { throw new IOException("not all load tasks completed"); }
1421 }
1422 }
1423 // Field some task-related exceptions...
1424 catch(final InterruptedException e)
1425 {
1426 // Log the exception.
1427 e.printStackTrace();
1428 Thread.currentThread().interrupt(); // Keep our interruptedness.
1429 throw new RuntimeException(e);
1430 }
1431 catch(final ExecutionException e)
1432 {
1433 // Log the exception.
1434 e.printStackTrace();
1435 // Rethrow any IOException cause directly.
1436 if(e.getCause() instanceof IOException)
1437 { throw (IOException) e.getCause(); }
1438 // Wrap other exception types to rethrow unchecked.
1439 throw new RuntimeException(e);
1440 }
1441
1442 if(ORG.hd.d.IsDebug.isDebug || (System.currentTimeMillis() - startTime > 2000)) { System.out.println("[BasicVarMgr.loadEventHistories(): "+(System.currentTimeMillis()-startTime)+"ms.]"); }
1443 }
1444
1445 /**Creates a Callable that processes all outstanding queued loads; never null.
1446 * The call() returns null when done.
1447 * @param err
1448 * @return
1449 */
1450 private Callable<Object> _createLoaderCallable()
1451 {
1452 return new Callable<Object>(){
1453 public Object call() throws Exception
1454 {
1455 if(ORG.hd.d.IsDebug.isDebug || (_loadEVVSConcurrency.get() > 1)) { System.out.println("[BasicVarMgr.loadEventHistories(): starting background loader task, load concurrency currently "+_loadEVVSConcurrency.get()+"...]"); }
1456
1457 Exception err = null; // Last error encountered, if any.
1458
1459 // Continue until we run out of jobs...
1460 Triple<SimpleVariableDefinition, File, ReentrantLock> job;
1461 while(null != (job = loadOrder.poll()))
1462 {
1463 if(!job.third.tryLock()) // Will be unlocked by _loadEVVS() when done.
1464 {
1465 // Someone else grabbed and locked this just as we were taking it from the head!
1466 if(ORG.hd.d.IsDebug.isDebug) { System.out.println("[BasicVarMgr.loadEventHistories(): work snatched from our grasp for "+job.first+".]"); }
1467 continue;
1468 }
1469 try
1470 {
1471 // Log if it has taken a long time to get started.
1472 //if(ORG.hd.d.IsDebug.isDebug || (System.currentTimeMillis() - startTime > 2000)) { System.out.println("[BasicVarMgr.loadEventHistories(): loading history ("+job.second.length()+" bytes) for "+job.first+" starting after "+(System.currentTimeMillis() - startTime)+"ms.]"); }
1473
1474 _loadEVVS(job);
1475 }
1476 catch(final Exception e)
1477 {
1478 err = e; // Note the error, but absorb it and continue with next load.
1479 e.printStackTrace();
1480 }
1481 }
1482
1483 // Re-throw last error encountered.
1484 if(err != null) { throw err; }
1485
1486 return(null); // No real return value...
1487 }
1488 };
1489 }
1490 }