org.hd.d.pg2k.svrCore.vars
Class PipelineVarMgr

java.lang.Object
  extended by org.hd.d.pg2k.svrCore.vars.BasicVarMgr
      extended by org.hd.d.pg2k.svrCore.vars.PipelineVarMgr
All Implemented Interfaces:
BasicVarMgrInterface, SimpleVariablePipelineIF

public class PipelineVarMgr
extends BasicVarMgr
implements BasicVarMgrInterface, SimpleVariablePipelineIF

Class to "manage" variables for an element of a SimpleExhibitPipelineIF. Caches values to minimise read (and optionally write) activity.

Always maintains a local store of variables, and normally sync() needs to be called periodically to merge with upstream values.

Typical places to inject one of these is in a caching stage, or at pipeline or tunnel end-points.

A non-write-though instance has the effect of grouping multiple setVariable() and setVariables() operations into a single setVariables() upstream action when sync() is invoked. This may still write-though in some circumstances.

In no case does a getVariable() or getVariables() operation directly cause a request to go upstream, (thus helping to ensure that get operations are fast even if there are I/O problems (etc) upstream), but rather the set of values is updated periodically with a single getVariables() operation.

FIXME: getEventValues() avoids, where possible, blocking indefinitely, and may return non-authoritative data instead, though may not be able to avoid such blocking in all circumstances.

Thread-safe; does not hold any externally-visible locks.

This does not count as an "end-point", ie is never the master data store.


Nested Class Summary
private  class PipelineVarMgr.NextRequestKey
          Class definining all details of a deferred request.
 
Field Summary
private  java.util.Hashtable<SimpleVariableDefinition,java.util.EnumSet<EventPeriod>> _deferredAIRequests
          Set of "all" interval event-values requested that we declined to fetch synchronously.
private  java.util.Hashtable<SimpleVariableDefinition,java.util.EnumSet<EventPeriod>> _deferredCIRequests
          Set of "current" interval event-values requested that we declined to fetch synchronously.
private  java.util.concurrent.ConcurrentMap<SimpleVariableDefinition,java.util.concurrent.locks.ReentrantLock> _getEventValues_locks
          Locks to prevent redundant concurrent fetches/updates of the same event type from upstream in getEventValues().
private  java.util.Map<PipelineVarMgr.NextRequestKey,java.lang.Long> _nextRequestNotBefore
          Next permitted (deferred) request for a given all/current period data set.
private  java.lang.Object _sync_lock
          Private lock to serialise calls on sync().
private static boolean ALWAYS_FETCH_ALL_UPSTREAM_VALUES
          If true, always use getVariables(-1) to get all values from upstream.
private static boolean DONT_FETCH_SPECIAL_INTERVALS_SYNCHRONOUSLY
          If true then generally don't go upstream (synchronously) just to fetch values for the current (or all) interval; respond from local store.
private  java.util.List<SimpleVariableValue> earlyUpdates
          A List of non-coalesced variable set operations to be done first.
private static boolean GET_SYSID_IN_CONS
          If true, try in the constructor to get the local system ID from upstream.
private  java.lang.Long lastFetch
          Last time we successfully fetched variable values from upstream; initially null.
private static int MAX_UPDATE_BATCH_COUNT
          Maximum number of set/update values to send upstream in one go; strictly positive.
private  java.util.Hashtable<SimpleVariableDefinition,SimpleVariableValue> outboundUpdates
          Cached outbound variable value updates to be sent upstream.
private  SimpleVariablePipelineIF source
          Upstream source of variable values; never null.
 
Fields inherited from class org.hd.d.pg2k.svrCore.vars.BasicVarMgr
dontFilterDups, EVENT_STORE_FILENAME_PREFIX, EVENT_STORE_FILENAME_SUFFIX_SER_GZ, EVENT_STORE_NAMETERM, isDebugMode, MAX_APPARENT_MESSAGE_AGE_MS, MAX_UPSTREAM_RETENTION_MS
 
Constructor Summary
PipelineVarMgr(SimpleVariablePipelineIF upstream)
          Create a default PipelineVarMgr instance.
PipelineVarMgr(SimpleVariablePipelineIF upstream, boolean writeThrough)
          Creates a customised VarMgr instance.
 
Method Summary
private  Tuple.Pair<SimpleVariableDefinition,EventPeriod> _getAndClearDeferredRequestForInterval(boolean currentInterval)
          Select, remove and return at random one of the deferred current/all interval requests; null if none.
private  void _noteDeferredRequestForInterval(boolean currentInterval, SimpleVariableDefinition def, EventPeriod intervalSelector)
          Note that we have deferred a request for the given event and period.
 EventVariableValue[] getEventValues(SimpleVariableDefinition def, EventPeriod intervalSelector, long intervalNumber, java.util.BitSet whichValues)
          Get event values; never null.
 boolean isWriteThrough()
          Returns true if this is a write-through cache.
 void setVariable(SimpleVariableValue newValue)
          Set a single variable value.
 int setVariables(SimpleVariableValue[] newValues)
          Set several variable values.
 void syncVariables(boolean force)
          Synchronise variables with upstream values.
 
Methods inherited from class org.hd.d.pg2k.svrCore.vars.BasicVarMgr
_setVariable, getEmptyNonAuthEVV, getEventValue, getVariable, getVariables, isEndPoint, loadEventHistories, saveEventHistories, setEventValue
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 
Methods inherited from interface org.hd.d.pg2k.svrCore.vars.BasicVarMgrInterface
getEventValue, getVariable, getVariables
 

Field Detail

ALWAYS_FETCH_ALL_UPSTREAM_VALUES

private static final boolean ALWAYS_FETCH_ALL_UPSTREAM_VALUES
If true, always use getVariables(-1) to get all values from upstream. Inefficient but very robust; generally not the right thing to do though.

See Also:
Constant Field Values

GET_SYSID_IN_CONS

private static final boolean GET_SYSID_IN_CONS
If true, try in the constructor to get the local system ID from upstream. This helps us build globalMaps, and also ensures that the variable is available to clients downstream.

See Also:
Constant Field Values

source

private final SimpleVariablePipelineIF source
Upstream source of variable values; never null.


outboundUpdates

private final java.util.Hashtable<SimpleVariableDefinition,SimpleVariableValue> outboundUpdates
Cached outbound variable value updates to be sent upstream. Will not be used (and will be null) if in write-through mode.

Used by:

Variable updates are held in this table and periodically flushed upstream in a block. If multiple updates to a single variable occur between flushes, only the last one will be retained, (the others overwritten) thus reducing traffic. These may be re-ordered with respect to one another when flushed.

Note however that one value cannot be overwritten by another if they have different globalMap keys. This can only really happen on the master when simultaneous updates arrive from two different slaves for the same global value. In this case, we force the previous value upstream immediately.

A lock must be held on this object to do multiple operations atomically, but no other visible lock must be grabbed inside a lock on this.

Hashtable from SimpleVariableDefinition to SimpleVariableValue.

A Hashtable is used for thread-safety.

Never null.


earlyUpdates

private final java.util.List<SimpleVariableValue> earlyUpdates
A List of non-coalesced variable set operations to be done first. These are variables that could not be coalesced for whatever reason, and must be passed upstream ahead of anything in outboundUpdates.

This will be non-null when outboundUpdates is.

The content of this List must only be accessed in the scope of a lock on outboundUpdates, so this List implementation need not be inherently thread-safe (eg ArrayList can (and should) be used instead of Vector).


lastFetch

private volatile java.lang.Long lastFetch
Last time we successfully fetched variable values from upstream; initially null. Declared as volatile so allow safe access without a lock.


_sync_lock

private final java.lang.Object _sync_lock
Private lock to serialise calls on sync().


MAX_UPDATE_BATCH_COUNT

private static final int MAX_UPDATE_BATCH_COUNT
Maximum number of set/update values to send upstream in one go; strictly positive. This limit is here to avoid vast on-the-wire messages straining sender and receiver, especially in the case where the receiver has been down for a while and so we have accumulated many messages.

More messages in each batch may be more efficient in terms of message overhead, inter-value compression, etc.

See Also:
Constant Field Values

DONT_FETCH_SPECIAL_INTERVALS_SYNCHRONOUSLY

private static final boolean DONT_FETCH_SPECIAL_INTERVALS_SYNCHRONOUSLY
If true then generally don't go upstream (synchronously) just to fetch values for the current (or all) interval; respond from local store. We may nonetheless synchronously request current-interval data from upstream: thus only making for a marginal extra cost.

If true, there is no guarantee that the "current" data reflects more than locally-originated events or will be hugely up-to-date, though the master's main copy will be.

See Also:
Constant Field Values

_deferredCIRequests

private final java.util.Hashtable<SimpleVariableDefinition,java.util.EnumSet<EventPeriod>> _deferredCIRequests
Set of "current" interval event-values requested that we declined to fetch synchronously. We clear these as we asynchronously fetch the requested values.

A map from event variable definitions to a set of any deferred current-interval requests for that set (or to null if none have ever been deferred for that event type).

A Hashtable is used to guarantee basic thread safety. All manipulation of the contents of the table, and in particular of the non-threadsafe EnumSet values, must be done holding a lock on the table instance.


_deferredAIRequests

private final java.util.Hashtable<SimpleVariableDefinition,java.util.EnumSet<EventPeriod>> _deferredAIRequests
Set of "all" interval event-values requested that we declined to fetch synchronously. We clear these as we asynchronously fetch the requested values.

A map from event variable definitions to a set of any deferred all-interval requests for that set (or to null if none have ever been deferred for that event type).

A Hashtable is used to guarantee basic thread safety. All manipulation of the contents of the table, and in particular of the non-threadsafe EnumSet values, must be done holding a lock on the table instance.


_nextRequestNotBefore

private final java.util.Map<PipelineVarMgr.NextRequestKey,java.lang.Long> _nextRequestNotBefore
Next permitted (deferred) request for a given all/current period data set. We defer repeated requests for the same item until either:

This means that we limit the cost of repeated requests for the same item in such a way that we still have a reasonable chance of seeing intra-interval updates.

The table is thread-safe and parameterised for quick, concurrent access.


_getEventValues_locks

private final java.util.concurrent.ConcurrentMap<SimpleVariableDefinition,java.util.concurrent.locks.ReentrantLock> _getEventValues_locks
Locks to prevent redundant concurrent fetches/updates of the same event type from upstream in getEventValues(). The aim of this is to avoid making many fruitless upstream accesses over (for example) a slow or connection-limited medium such as an HTTP tunnel.

Locking at the per-def level probably admits almost all potential useful concurrency and is relatively simple.

We allow for a limited amount of concurrency in writing but we mainly use ConcurrentHashMap for thread-safety, read concurrency and putIfAbsent().

Constructor Detail

PipelineVarMgr

public PipelineVarMgr(SimpleVariablePipelineIF upstream)
Create a default PipelineVarMgr instance. Defaults to write-through (ie read caching only), because that is "safer" behaviour.

Parameters:
upstream - upstream source of data and variable values; never null

PipelineVarMgr

public PipelineVarMgr(SimpleVariablePipelineIF upstream,
                      boolean writeThrough)
Creates a customised VarMgr instance. May try to get the local system ID immediately from upstream, so that it is available for creating globals.

Parameters:
upstream - upstream source of data and variable values; never null
writeThrough - if true, all set operations are propagated downstream immediately
Method Detail

isWriteThrough

public boolean isWriteThrough()
Returns true if this is a write-through cache.


setVariable

public void setVariable(SimpleVariableValue newValue)
                 throws java.io.IOException
Set a single variable value. If writing though, then set the value upstream immediately, else simply put it in our "pending" list waiting for sync().

Specified by:
setVariable in interface BasicVarMgrInterface
Overrides:
setVariable in class BasicVarMgr
Parameters:
newValue - new variable value to set; never null
Throws:
java.io.IOException - in case of difficulty setting the value upstream

setVariables

public int setVariables(SimpleVariableValue[] newValues)
                 throws java.io.IOException
Set several variable values. This is equivalent to set series of calls to setVariables() unless in write-through mode where a single setVariables() is done upstream for efficiency.

Specified by:
setVariables in interface BasicVarMgrInterface
Overrides:
setVariables in class BasicVarMgr
Parameters:
newValues - new variables values to set; never null
Returns:
number of values successfully set, at least locally...
Throws:
java.io.IOException - in case of difficulty setting the values upstream

syncVariables

public void syncVariables(boolean force)
                   throws java.io.IOException
Synchronise variables with upstream values. We push updated values upstream to the source, call sync on the source if this sync is "forced", and then retrieve changed values from upstream.

We remember when we last did this and request any changes since then, except on the first call or if called with force==true, in which case all values are retrieved.

To allow for (varying) clock skew between different server instances, we actually ask for somewhat older values than our last poll, which will result in some redundant traffic.

Holds no externally-visible locks, and does not hold locks while communicating upstream (so can handle set/get operations even if upstream is slow/blocking), but if called by multiple threads this will safely serialise the calls.

Specified by:
syncVariables in interface SimpleVariablePipelineIF
Parameters:
force - if true, this will force a full sync on the read side by using getVariables(-1) rather than attempting to choose a nearer timestamp for efficiency; the implementation is at liberty to use getVariables(-1) at any time whatever the argument value, and almost certainly should use it on the first call
Throws:
java.io.IOException - if one is received from upstream

_noteDeferredRequestForInterval

private final void _noteDeferredRequestForInterval(boolean currentInterval,
                                                   SimpleVariableDefinition def,
                                                   EventPeriod intervalSelector)
Note that we have deferred a request for the given event and period.

Parameters:
currentInterval - true for "current" interval, false for "all" interval
def - non-null event definition
intervalSelector - non-null interval/period value

_getAndClearDeferredRequestForInterval

private Tuple.Pair<SimpleVariableDefinition,EventPeriod> _getAndClearDeferredRequestForInterval(boolean currentInterval)
Select, remove and return at random one of the deferred current/all interval requests; null if none. This is an atomic fetch-and-clear operation, designed to be used in association with _noteDeferredRequestForInterval().

Parameters:
currentInterval - true for "current" interval, false for "all" interval
Returns:
null or a pair with both elements valid and non-null

getEventValues

public EventVariableValue[] getEventValues(SimpleVariableDefinition def,
                                           EventPeriod intervalSelector,
                                           long intervalNumber,
                                           java.util.BitSet whichValues)
Get event values; never null. We get what we can from our local store, then go upstream to fill in any unfilled requested slots for which we did not have authoritative data (and then attempt to cache them locally for next time). However, if unable to fetch authoritative data (quickly) then we will return null or non-authoritative data as appropriate for each slot; this does not throw IOException.

We treat the 'current' and 'all' intervals specially since upstream will never offer us an authoritative value for the current slot, only for older completed ones. We get requested values asynchronously, to be reasonably up-to-date for the next request.

If a request for a "local" event upstream fails with an IllegalArgumentException then we assume that upstream of us is a tunnel which local values don't cross so we simply report whatever value we have to hand.

Specified by:
getEventValues in interface BasicVarMgrInterface
Overrides:
getEventValues in class BasicVarMgr
Parameters:
def - non-null event-variable definition
intervalSelector - non-null valid interval for specified event
whichValues - each true bit represents a slot for which data is required, bit 0 indicating data from the slot within which firstIntervalTime is located, bit 1 the previous slot, etc; null is treated as the common case equivalent to just bit 0 set
intervalNumber - the number of the first interval for which data is potentially required; if too far in the past or future then possibly no data will be available, zero is used to access the "all" bucket
Returns:
as many of the requested values as available, return may contain nulls or be zero-length but is never null
Throws:
java.io.IOException - if we have difficulty getting values from upstream
java.io.InterruptedIOException - if interrupted or if blocking for too long.

DHD Multimedia Gallery V1.53.0

Copyright (c) 1996-2009, Damon Hart-Davis. All rights reserved.