org.hd.d.pg2k.svrCore.vars
Class BasicVarMgr

java.lang.Object
  extended by org.hd.d.pg2k.svrCore.vars.BasicVarMgr
All Implemented Interfaces:
BasicVarMgrInterface
Direct Known Subclasses:
PipelineVarMgr

public class BasicVarMgr
extends java.lang.Object
implements BasicVarMgrInterface

Base class to "manage" the set of system variables. This holds the actual values and does:

This has a mechanism for discarding duplicate messages as they arrive.

Attempts to reign in retained state and 'acceptability' window in case of extreme system memory pressure, ie this tries to avoid ever being the cause of OOMs.

Thread-safe; does not hold any externally-visible locks.

This implements the generic variable-manager interface.


Nested Class Summary
private static class BasicVarMgr.MessageIDKey
          Immutable message ID suitable for use as a key in detecting duplicates.
 
Field Summary
private static MemoryTools.SimpleLRUMapAutoSizeForHitRate<Tuple.Triple<SimpleVariableDefinition,EventPeriod,java.lang.Long>,EventVariableValue> _getEmptyNonAuthEVV_cache
          Cache of recently-requested 'empty' non-auth EVVs.
private static java.util.concurrent.atomic.AtomicInteger _loadEVVSConcurrency
          Records the current loading concurrency across all instances; ref never null, value never negative.
private  DuplicateIDChecker<BasicVarMgr.MessageIDKey> _updatesSeen
          Set of historical values that we check incoming message against for duplicates.
private static java.text.SimpleDateFormat dateFmtEHFile
          Format we use to insert into event history file name.
protected  boolean dontFilterDups
          Unless true we screen out duplicate events/updates and those with silly timestamps.
private  boolean endPoint
          If true then this variable set is at an end-point.
static java.lang.String EVENT_STORE_FILENAME_PREFIX
          Prefix to event history store file name.
static java.lang.String EVENT_STORE_FILENAME_SUFFIX_SER_GZ
          Suffix for GZIPped serialised event history store file name.
static java.lang.String EVENT_STORE_NAMETERM
          Terminator to go after event name in file store.
private  java.util.concurrent.ConcurrentMap<SimpleVariableDefinition,EventVariableValueSet> eventHistory
          Map of event history; never null.
private static java.util.Calendar GMTCalendar
          Our private static instance of a GMT calendar.
protected static boolean isDebugMode
          If true, this class (and possibly its derivatives) are in debug mode.
private  java.util.Queue<Tuple.Triple<SimpleVariableDefinition,java.io.File,java.util.concurrent.locks.ReentrantLock>> loadOrder
          Definitions and the files from which their event histories are to be loaded; never null but may be empty.
static long MAX_APPARENT_MESSAGE_AGE_MS
          Maximum apparent age of message wrt local clock for message to pass filtering.
static long MAX_UPSTREAM_RETENTION_MS
          Time after which items pending to go upstream are dropped after an error.
private static int MIN_DUP_CHECK_RETENTION_MS
          Minimum message retention time for duplicate checking (ms); strictly positive.
private  java.util.concurrent.ConcurrentMap<SimpleVariableDefinition,Tuple.Triple<SimpleVariableDefinition,java.io.File,java.util.concurrent.locks.ReentrantLock>> remainingToBeLoaded
          This is the thread-safe collection of all items waiting to be loaded; never null but may be empty.
private static boolean SAVE_EVENT_HISTORIES_GZIPPED
          If true, GZIP the saved histories to save space and read/write time.
private  java.util.Hashtable<SimpleVariableDefinition,SimpleVariableValue> variables
          Cached variable values as map; never null.
 
Constructor Summary
BasicVarMgr()
          Create a normal set of managed system variables, initially empty.
BasicVarMgr(boolean endPoint)
          Create a set of managed system variables.
BasicVarMgr(boolean endPoint, boolean dontFilterDups)
          Create a set of managed system variables.
 
Method Summary
private  java.util.concurrent.Callable<java.lang.Object> _createLoaderCallable()
          Creates a Callable that processes all outstanding queued loads; never null.
private  void _loadEVVS(Tuple.Triple<SimpleVariableDefinition,java.io.File,java.util.concurrent.locks.ReentrantLock> job)
          Loads into the internal store the specified EVVS from its serialised form in the file.
private  void _mergeValues(SimpleVariableValue newValue, boolean fromUpstream)
          Merge a new value into a variable in the cache.
protected  void _setVariable(SimpleVariableValue newValue, boolean fromUpstream)
          Set variable for upstream and downstream values.
static EventVariableValue getEmptyNonAuthEVV(SimpleVariableDefinition def, EventPeriod intervalSelector, long intervalNumber)
          Get a non-authoritative empty EventVariableValue with the specified definition/period/interval; never null.
private  EventVariableValueSet getEventHistory(SimpleVariableDefinition def)
          Get the current event history for the given definition; never null but may return an empty value.
 EventVariableValue getEventValue(SimpleVariableDefinition def, EventPeriod intervalSelector, boolean current)
          Get the current partial, or previous full, event set at the specified interval; never returns null.
 EventVariableValue[] getEventValues(SimpleVariableDefinition def, EventPeriod intervalSelector, long intervalNumber, java.util.BitSet whichValues)
          Get the specified event sets for the specified intervals; never null.
 SimpleVariableValue getVariable(SimpleVariableDefinition var)
          Get a single variable value; returns null if no such value or wrong type.
 SimpleVariableValue[] getVariables(long changedSince)
          Get set of variable values altered on or after a given time, or all for -1; never null.
 boolean isEndPoint()
          Returns true if this is at an endpoint.
private  boolean isNonDup(SimpleVariableValue svv)
          Check/accept message (with true return value) if not a duplicate.
 void loadEventHistories(java.io.File dir, boolean async)
          Load event histories from disc.
 void saveEventHistories(java.io.File dir, boolean justChanges, boolean keepHistory, boolean incremental)
          Persist variables and events to disc.
 void setEventValue(EventVariableValue evv)
          Set/override the specified event sets for the specified intervals; never null.
 void setVariable(SimpleVariableValue newValue)
          Set variable.
 int setVariables(SimpleVariableValue[] newValues)
          Update a number of variables at once for efficiency; returns the number set.
private  boolean timestampAcceptable(SimpleVariableValue value)
          Returns true if the supplied timestamp from an incoming message/event/update is OK.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

isDebugMode

protected static final boolean isDebugMode
If true, this class (and possibly its derivatives) are in debug mode. This will normally result in extra []-bracketed comments in System.err.

See Also:
Constant Field Values

endPoint

private final boolean endPoint
If true then this variable set is at an end-point.


dontFilterDups

protected final boolean dontFilterDups
Unless true we screen out duplicate events/updates and those with silly timestamps. We do this so that values can be delivered by a flooding algorithm if need be, but also so that mistakes and repeats will not cause duplication.

This should be enabled by default since it will quietly eliminate many problems; it may have to be off to run replays of old data for example.


MAX_UPSTREAM_RETENTION_MS

public static final long MAX_UPSTREAM_RETENTION_MS
Time after which items pending to go upstream are dropped after an error. This is to keep a finite bound on the size of any outbound queue in case of some long-lasting error (eg the master server down).

This is taken to be somewhat longer than the longest time that the data could reasonably be useful at first glance, to allow for clock skew and other insults.

Everything can probably be discarded safely after this time; some values can probably be discarded much sooner.


MAX_APPARENT_MESSAGE_AGE_MS

public static final long MAX_APPARENT_MESSAGE_AGE_MS
Maximum apparent age of message wrt local clock for message to pass filtering.


MIN_DUP_CHECK_RETENTION_MS

private static final int MIN_DUP_CHECK_RETENTION_MS
Minimum message retention time for duplicate checking (ms); strictly positive. Must be at least the maximum allowed clock skew plus something for transit time. We might hope to widen the window when resources allow.

We must not accept messages at all that are outwith this minimum window.

See Also:
Constant Field Values

_updatesSeen

private final DuplicateIDChecker<BasicVarMgr.MessageIDKey> _updatesSeen
Set of historical values that we check incoming message against for duplicates. A thread-safe set of triples of timestamp, definition (name) and value (hash), sorted in order by timestamp to make removal of expiring oldest entries easy.

As a minimum we must retain messages at least MAX_PEER_CLOCK_SKEW_MS but preferably up to MAX_APPARENT_MESSAGE_AGE_MS + MAX_PEER_CLOCK_SKEW_MS.

A lock can be held on this instance to make multiple operations atomic.


variables

private final java.util.Hashtable<SimpleVariableDefinition,SimpleVariableValue> variables
Cached variable values as map; never null. Map from SimpleVariableDefinition to SimpleVariableValue.

Used by:

A lock must be held on this object to do multiple operations atomically, but no other visible lock must be grabbed inside a lock on this.

Mappings are never remove()d from this table; it is limited in size to the maximum number of variables defined in SystemVariables.

Hashtable from SimpleVariableDefinition to SimpleVariableValue.

A Hashtable is used for thread-safety and because a lock on it can exclude other operations.

Never null.


eventHistory

private final java.util.concurrent.ConcurrentMap<SimpleVariableDefinition,EventVariableValueSet> eventHistory
Map of event history; never null. Map from definition to the (recent) history of an event, to be updated when a setVariable() is done on an event-type variable.

No lock must be held on this object, eg to do multiple operations atomically.

Mappings are never remove()d from this table; it is limited in size to the maximum number of variables defined in SystemVariables.

Thread-safe Map from SimpleVariableDefinition to SimpleVariableValue.

Never null.


_loadEVVSConcurrency

private static final java.util.concurrent.atomic.AtomicInteger _loadEVVSConcurrency
Records the current loading concurrency across all instances; ref never null, value never negative.


_getEmptyNonAuthEVV_cache

private static final MemoryTools.SimpleLRUMapAutoSizeForHitRate<Tuple.Triple<SimpleVariableDefinition,EventPeriod,java.lang.Long>,EventVariableValue> _getEmptyNonAuthEVV_cache
Cache of recently-requested 'empty' non-auth EVVs. We make this large enough to accommodate the current and previous/full values for each def for at least a couple of period values, but not so large as to be likely to be a problem.


GMTCalendar

private static final java.util.Calendar GMTCalendar
Our private static instance of a GMT calendar. No one must change the ZONE or DST offset.


dateFmtEHFile

private static final java.text.SimpleDateFormat dateFmtEHFile
Format we use to insert into event history file name.


SAVE_EVENT_HISTORIES_GZIPPED

private static final boolean SAVE_EVENT_HISTORIES_GZIPPED
If true, GZIP the saved histories to save space and read/write time.

See Also:
Constant Field Values

EVENT_STORE_FILENAME_PREFIX

public static final java.lang.String EVENT_STORE_FILENAME_PREFIX
Prefix to event history store file name.

See Also:
Constant Field Values

EVENT_STORE_NAMETERM

public static final java.lang.String EVENT_STORE_NAMETERM
Terminator to go after event name in file store. Format of file is:
     EVENT_STORE_FILENAME_PREFIX + name + EVENT_STORE_NAMETERM [ + date ] + legal suffix
 

The nameterm includes at least one character that cannot appear in an event name to avoid ambiguity where one name is a prefix of another.

See Also:
Constant Field Values

EVENT_STORE_FILENAME_SUFFIX_SER_GZ

public static final java.lang.String EVENT_STORE_FILENAME_SUFFIX_SER_GZ
Suffix for GZIPped serialised event history store file name.

See Also:
Constant Field Values

loadOrder

private final java.util.Queue<Tuple.Triple<SimpleVariableDefinition,java.io.File,java.util.concurrent.locks.ReentrantLock>> loadOrder
Definitions and the files from which their event histories are to be loaded; never null but may be empty. The items are queued in the preferred order they should be loaded, but items from anywhere in the list queue can be loaded on demand.

Entries should be taken from this with poll() and immediately locked with the Lock which will have them processed in preferred order, or in random order while attempting to load a history on demand by using the iterator and remove() and immediately attempting to lock with a failure treated as if the item had not been in the queue at all (because another Thread has stolen it).

The lock must be released after the item is removed from allToBeLoaded.

ConcurrentLinkedQueue for thread-safe lock-free access.


remainingToBeLoaded

private final java.util.concurrent.ConcurrentMap<SimpleVariableDefinition,Tuple.Triple<SimpleVariableDefinition,java.io.File,java.util.concurrent.locks.ReentrantLock>> remainingToBeLoaded
This is the thread-safe collection of all items waiting to be loaded; never null but may be empty. An item is only removed from this collection once it has completed loading successfully or not.

Constructor Detail

BasicVarMgr

public BasicVarMgr()
Create a normal set of managed system variables, initially empty. This instance is not marked as being at an end point, and it does filter out duplicate messages and invalid timestamps.


BasicVarMgr

public BasicVarMgr(boolean endPoint)
Create a set of managed system variables. If an instance is created as an end point, that means that it is the master variable copy in the master server, and so a unique, local, read-only instance ID is created and inserted into the variable cache.

This instance does filter out duplicate messages and invalid timestamps.

Parameters:
endPoint - if true then mark the system as an end-point

BasicVarMgr

public BasicVarMgr(boolean endPoint,
                   boolean dontFilterDups)
Create a set of managed system variables. If an instance is created as an end point, that means that it is the master variable copy in the master server, and so a unique, local, read-only instance ID is created and inserted into the variable cache.

Parameters:
endPoint - if true then mark the system as an end-point
dontFilterDups - if true then do NOT filter out duplicate or out-of-time messages
Method Detail

isEndPoint

public boolean isEndPoint()
Returns true if this is at an endpoint.


timestampAcceptable

private boolean timestampAcceptable(SimpleVariableValue value)
Returns true if the supplied timestamp from an incoming message/event/update is OK. To be considered OK it should be within the allowed peer-skew window, plus an extra allowance for historical values to be sent to us en masse after a network outage for example.

This can be used by other classes, eg upstream of a temporary network disconnect, to decide if it is worth retaining an old message or if it would simply get discarded anyway at the other end by a var manager instance.

Note that events may and non-event values/types may get expired at different rates.


isNonDup

private boolean isNonDup(SimpleVariableValue svv)
Check/accept message (with true return value) if not a duplicate. A message is treated as duplicate if it has the same def, timestamp and value hash, so we reject multiple occurrences of the same value at exactly the same time.

Note that we store the value hash to avoid keeping large message values in memory indefinitely.

In passing this removes any expired entries outside the valid time window.

Returns:
true if message is not a duplicate, false otherwise

setVariable

public void setVariable(SimpleVariableValue newValue)
                 throws java.io.IOException
Set variable. Set local cached value immediately; store global values to periodically propagate upstream to master but show last global values obtained from master on periodic poll.

We validate the the name/type of the variable being presented.

For local and globalMap variables, the last value set is the one that can be retrieved.

Specified by:
setVariable in interface BasicVarMgrInterface
Throws:
java.lang.IllegalArgumentException - on attempt to: set variable with value of wrong type or incompatible definition, set non-existent or read-only variable (or these can be ignored)
java.io.IOException - in case of I/O difficulty

_setVariable

protected void _setVariable(SimpleVariableValue newValue,
                            boolean fromUpstream)
Set variable for upstream and downstream values. Set local cached value immediately; store global values to periodically propagate upstream to master but show last global values obtained from master on periodic poll.

We validate the the name/type of the variable being presented.

Since this allows read-only values to be updated, this is only visible to derived classes.

For local and globalMap variables, the last value set is the one that can be retrieved.

When setting a globalMap variable, we revisit the list of potentially-live systems and trim it if need be.

If eliminating/filtering duplicates and updates with invalid timestamps we do so silently here.

Parameters:
fromUpstream - if true, we allow read-only variables to be set in our cache as they represent updated values from the source and we only allow updates with globalMaps of at least size 1
Throws:
java.lang.IllegalArgumentException - on attempt to: set variable with value of wrong type or incompatible definition, set non-existent or read-only variable (or these can be ignored)

getEventHistory

private EventVariableValueSet getEventHistory(SimpleVariableDefinition def)
Get the current event history for the given definition; never null but may return an empty value.


_loadEVVS

private void _loadEVVS(Tuple.Triple<SimpleVariableDefinition,java.io.File,java.util.concurrent.locks.ReentrantLock> job)
                throws java.io.IOException,
                       java.util.InvalidPropertiesFormatException
Loads into the internal store the specified EVVS from its serialised form in the file. Only to be used when the caller has exclusive access to load this job, eg the current thread must have the lock on this job.

In case of error nothing is posted to the internal store but the lock is released and the entry removed from remainingToBeLoaded.

Throws:
java.io.IOException
java.util.InvalidPropertiesFormatException

_mergeValues

private void _mergeValues(SimpleVariableValue newValue,
                          boolean fromUpstream)
Merge a new value into a variable in the cache. This very carefully merges any extant cached value with a set coming upstream or a new global value coming downstream, trying to cope with the fact that a new value outbound may meet an updated global value coming back from upstream (from the master for example).

We always immediately apply a local or override the value and globalMap entry of a global, though for a global it will probably require the value merged at the master's end-point coming back to sort things out, with some instability/flapping in the interim.

The update must already have been validated, eg must not have more than one globalMap entry if a set.

This deals with events as well as plain variables.

Takes a lock on variables to make this atomic.

Parameters:
newValue - a variable update/set coming upstream/downstream
fromUpstream - if true, this is an update coming downstream

setVariables

public int setVariables(SimpleVariableValue[] newValues)
                 throws java.io.IOException
Update a number of variables at once for efficiency; returns the number set. Is passed a Set of SimpleVariableValues and behaves as if it operates on all of them by calling setVariable() for each item in the Set in order (from low to high index).

This implementation "fails fast" on the first error.

Specified by:
setVariables in interface BasicVarMgrInterface
Returns:
the number of variable values set; never negative, never more than the number passed in
Throws:
java.lang.IllegalArgumentException - on attempt to: set variable with value of wrong type or incompatible definition, set non-existent or read-only variable (or these can be ignored)
java.io.IOException

getVariable

public SimpleVariableValue getVariable(SimpleVariableDefinition var)
Get a single variable value; returns null if no such value or wrong type. Always get from local cache.

This trims from the globalMap any apparently-dead systems, and stores any such trimmed variable back in the cache atomically so as to incrementally eliminate stale data and return only reasonably fresh data to the caller.

Specified by:
getVariable in interface BasicVarMgrInterface
Parameters:
var - definition of variable to fetch; never null
Throws:
java.lang.IllegalArgumentException - if var is null

getVariables

public SimpleVariableValue[] getVariables(long changedSince)
Get set of variable values altered on or after a given time, or all for -1; never null. Always get from local cache (the variable cache being periodically updated from the master).

This may be slow if there are many live variables.

This implementation never throws an IOException.

Note: we may limit the notion of "all" to be something a little greater than the maximum live-variable time window (ie variable lifetime), since beyond this the variable value has expired and is not deemed trustworthy anyway.

Specified by:
getVariables in interface BasicVarMgrInterface

getEventValue

public EventVariableValue getEventValue(SimpleVariableDefinition def,
                                        EventPeriod intervalSelector,
                                        boolean current)
Get the current partial, or previous full, event set at the specified interval; never returns null. This is a simplified interface to return either the current event set that is being collected, or the previous completed set.

The current set is the most timely, but may not contain enough data to be meaningful if the new interval has just started.

The previous set is complete and thus most likely to have enough samples to be useful, but is not completely current.

If the requested event set is not available, an empty synthetic one is created and returned. Thus, with this interface, it is not possible to distinguish between there being no events in the given interval or simply no data.

This calls getEventValues().

This implementation will not throw an IOException.

Specified by:
getEventValue in interface BasicVarMgrInterface
Parameters:
def - event definition (must be for an event); never null
intervalSelector - never null
current - if true the current event set is returned, else the previous complete set is returned
Returns:
requested event set; never null
Throws:
java.lang.IllegalArgumentException - if the request arguments are invalid

getEmptyNonAuthEVV

public static EventVariableValue getEmptyNonAuthEVV(SimpleVariableDefinition def,
                                                    EventPeriod intervalSelector,
                                                    long intervalNumber)
Get a non-authoritative empty EventVariableValue with the specified definition/period/interval; never null. This may cache results to help control the number of instances in circulation, though this empty values are assumed to otherwise have a fairly short life.

Parameters:
def - valid event definition; never null
intervalSelector - valid period for the definition; never null
intervalNumber - valid interval number for the period; strictly positive
Returns:
emtpty non-auth value; never null

getEventValues

public EventVariableValue[] getEventValues(SimpleVariableDefinition def,
                                           EventPeriod intervalSelector,
                                           long intervalNumber,
                                           java.util.BitSet whichValues)
Get the specified event sets for the specified intervals; never null.

Specified by:
getEventValues in interface BasicVarMgrInterface
Parameters:
intervalNumber - the number of the first interval for which data is potentially required; if too far in the past or future then possibly no data will be available, zero is used to access the "all" bucket
whichValues - each true bit represents a slot for which data is required, bit 0 indicating data from the slot within which firstIntervalTime is located, bit 1 the previous slot, etc; null is treated as the common case equivalent to just bit 0 set
def - event definition (must be for an event); never null
intervalSelector - never null
Returns:
as many of the requested values as available, return may contain nulls or be zero-length but is never null

setEventValue

public void setEventValue(EventVariableValue evv)
Set/override the specified event sets for the specified intervals; never null. This is used to set values that have been fetched from upstream (ie closer to the master data store).

This may ignore the call, and certainly will if the intervalNumber is outside the current time window it holds.

This will generally accept the value and store it if:


saveEventHistories

public void saveEventHistories(java.io.File dir,
                               boolean justChanges,
                               boolean keepHistory,
                               boolean incremental)
                        throws java.io.IOException
Persist variables and events to disc. This can either save everything applicable, or just those things things that seem to be out-of-date on disc.

Only persistent event data is saved, and generally each named event type has its data saved in a separate file, in a form that should be reasonably robust (eg a failure to load one item should not have all items unloadable).

This data may be stored in binary serialised or XML forms, but is usually GZIPped to save space and speed up read/write.

If some saves fail then this will try to continue with others, but throw one of the errors encountered (usually the last one) at the end, so as to save as much as possible.

The incremental mode helps keeps the duration of any one call as small as is reasonably possible, but will be somewhat less efficient then non-incremental mode.

This locks out other activity while saving to ensure consistency; it need not hold a lock continuously for whole period of the call as it would usually be enough to do so while collecting candidate histories to save, and while actually saving each one of them separately.

To avoid deadlocks, this takes a lock on this instance before taking any other lock.

Parameters:
dir - the directory to save to; never null
justChanges - just write histories changed since the last save if possible, else unconditionally save all persistent event histories
keepHistory - if true, try to roll the logs at least daily to help keep a trail of older snapshot though the storage space required may grow without bound,
incremental - if true, then at most one event is saved, chosen in such a way (eg by max time since last save or randomly) that eventually all eligible histories will be saved
Throws:
java.io.IOException

loadEventHistories

public void loadEventHistories(java.io.File dir,
                               boolean async)
                        throws java.io.IOException
Load event histories from disc. This will load the "latest" saved state, if any, for each persistent event. "Latest" may be measured by file timestamp (potentially slow and fragile) or lexically-latest (eg by the date embedded in the filename) or by some combination of the two.

Only the event history will be loaded.

If some loads fail then this will try to continue with others, but re-throw one of the errors encountered (usually the last one) at the end, so as to load as much as possible.

Though running this call concurrently with other (mutator) operations (or indeed other instances of this call) should be safe, such operation is not advised.

Should be called at most once, before any set (and get) operations.

We try to load largest-first to maximise concurrency.

Parameters:
dir - the directory to load from; never null
async - if true then the caller will not be blocked waiting to load histories (else if false the caller will be blocked until they are all loaded) and if possible the event histories will be loaded in the background else as-needed, blocking access to individual histories not yet loaded while they are loaded; this only works for event histories with no state loaded at all
Throws:
java.io.IOException

_createLoaderCallable

private java.util.concurrent.Callable<java.lang.Object> _createLoaderCallable()
Creates a Callable that processes all outstanding queued loads; never null. The call() returns null when done.

Parameters:
err -
Returns:

DHD Multimedia Gallery V1.53.0

Copyright (c) 1996-2009, Damon Hart-Davis. All rights reserved.