001 /*
002 Copyright (c) 1996-2011, Damon Hart-Davis
003 All rights reserved.
004
005 Redistribution and use in source and binary forms, with or without
006 modification, are permitted provided that the following conditions are
007 met:
008
009 * Redistributions of source code must retain the above copyright
010 notice, this list of conditions and the following disclaimer.
011
012 * Redistributions in binary form must reproduce the above copyright
013 notice, this list of conditions and the following disclaimer in the
014 documentation and/or other materials provided with the
015 distribution.
016
017 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
018 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
019 TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
020 PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
021 OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
022 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
023 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
024 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
025 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
026 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
027 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
028 */
029 package org.hd.d.pg2k.svrCore.vars;
030
031 import java.io.IOException;
032 import java.io.InvalidObjectException;
033 import java.io.ObjectInputStream;
034 import java.io.ObjectInputValidation;
035 import java.io.Serializable;
036 import java.util.Arrays;
037 import java.util.BitSet;
038
039 import ORG.hd.d.IsDebug;
040
041
042 /**Current and historical entries for one event forming one "row".
043 * This is mutable and used to hold live event stats in memory.
044 * <p>
045 * This maintains state incrementally on each new event, for example,
046 * shuffling out and discarding old historical event values if need be.
047 * <p>
048 * Thread-safe, with methods synchronized as necessary.
049 * <p>
050 * Serializable for direct persistence if required,
051 * though an XML format rather than a binary one would probably be more robust.
052 */
053 public final class EventVariableValuePeriodRow implements Serializable,
054 ObjectInputValidation
055 {
056 /**Construct a single event variable value period "row".
057 *
058 * @param def the variable definition; never null and must be an event
059 * @param period the event period for this sample; never null
060 *
061 * @throws IllegalArgumentException if the arguments are invalid
062 */
063 public EventVariableValuePeriodRow(final SimpleVariableDefinition def,
064 final EventPeriod period)
065 throws IllegalArgumentException
066 {
067 this.def = def;
068 this.period = period;
069
070 current = new EventVariableValueBuffer(def, period, period.getIntervalNumber(System.currentTimeMillis()));
071 all = new EventVariableValueBuffer(def, period, 0);
072
073 try { validateObject(); }
074 catch(final InvalidObjectException e)
075 { throw new IllegalArgumentException(e.getMessage()); }
076 }
077
078
079
080 /**The variable definition; never null. */
081 private final SimpleVariableDefinition def;
082
083 /**Get the variable definition; never null. */
084 public SimpleVariableDefinition getDef() { return(def); }
085
086 /**The event period; never null. */
087 private final EventPeriod period;
088
089 /**Get the event period; never null. */
090 public EventPeriod getPeriod() { return(period); }
091
092
093
094 /**All events being collected, never null.
095 * Captures all events ever seen,
096 * but may be rescaled to prevent overflow,
097 * so should simply be regarded as a reasonably representative sub-sample.
098 * <p>
099 * Will never be authoritative.
100 */
101 private /* final */ EventVariableValueBuffer all;
102
103 /**"Current" events being collected, never null.
104 * Will be replaced with new instance as the current one stops being current.
105 * <p>
106 * Initially created with value suitable to receive
107 * a new event immediately.
108 * <p>
109 * Will never be authoritative.
110 */
111 private EventVariableValueBuffer current;
112
113 /**Set of "historical" periods, most-recent at index 0, possibly containing nulls; never null.
114 * Initially all nulls.
115 * <p>
116 * The intervalNumber of any entry at index i
117 * must be current.getIntervalNumber()-1-i.
118 * <p>
119 * FIXME: copy defensively during deserialisation.
120 */
121 private final EventVariableValue historical[] =
122 new EventVariableValue[SystemVariables.EVENT_SAMPLES_RETAINED];
123
124
125
126 /**Record/add event to the tally for this period.
127 * Note that the event must match the definition that this instance is constructed with
128 * or an IllegalArgumentException will be thrown.
129 * <p>
130 * If the "current" event buffer is too old to accept this incoming event,
131 * we move all the historical data up and insert an immutable copy of
132 * the buffered data as appropriate
133 * (though we must cope with the case where we have to move data up several
134 * slots due to an extended interval between updates).
135 *
136 * @param svv event to be recorded; must be non-null and of event type
137 */
138 public synchronized void addEvent(final SimpleVariableValue svv)
139 {
140 if((svv == null) || !def.equals(svv.getDef()))
141 { throw new IllegalArgumentException(); }
142
143 // Get to normalised state if we need to...
144 _canonicaliseState();
145
146 // Add event to current list.
147 current.addEvent(svv);
148
149 // Add event to "all" event bucket.
150 all.addEvent(svv);
151 }
152
153 /**Canonicalise our internal state, so that the current event collector covers "now".
154 * If current event collector is too old,
155 * then create a new one and age the historical data.
156 * If the clock seems to have gone backwards for whatever reason,
157 * we'll simply carry on collecting as we were until things get sane.
158 */
159 private void _canonicaliseState()
160 {
161 // // State validation...
162 // try { validateObject(); } catch(InvalidObjectException e) { throw new IllegalStateException(e); }
163
164 final long now = System.currentTimeMillis();
165 final long calcCurrentIntervalNumber = period.getIntervalNumber(now);
166
167 if(current.getIntervalNumber() < calcCurrentIntervalNumber)
168 {
169 // System.out.println("DOING SHIFT UP...");
170 // Need to shift up data...
171 final EventVariableValueBuffer oldCurrent = current;
172 final EventVariableValueBuffer newCurrent = new EventVariableValueBuffer(def, period, calcCurrentIntervalNumber);
173 current = newCurrent;
174
175 // Compute the number of slots that we need to shift data up by,
176 // usually 1, but may be more for rare events or after deserialisation.
177 final long toShiftBy = newCurrent.getIntervalNumber() - oldCurrent.getIntervalNumber();
178 // System.out.println("SHIFTING BY: " + toShiftBy);
179 assert(toShiftBy > 0);
180
181 // If a really huge shift,
182 // eg because something has been restored after a long period on disc,
183 // or a really rare event has just occurred,
184 // then just throw away all the old data by clearing the history
185 // and not even converting the oldCurrent to a historical value.
186 if(toShiftBy > historical.length)
187 { Arrays.fill(historical, null); }
188 else
189 {
190 // Copy up all existing elements
191 // to higher indexes (ie age them)...
192 // This should correctly move nothing if toShiftBy == historical.length.
193 System.arraycopy(historical, 0, historical, (int) toShiftBy, (int) (historical.length - toShiftBy));
194
195 // Convert the oldCurrent to immutable form
196 // and insert into the appropriate slot...
197 final int oldest = (int) (toShiftBy - 1);
198 historical[oldest] = oldCurrent.toEventVariableValue();
199
200 // Null out any lower (newer) slots that we now have no data for.
201 if(toShiftBy > 1)
202 { Arrays.fill(historical, 0, oldest, null); }
203
204 assert(historical[oldest] != null); // We now have at least one historical entry...
205 }
206
207 //System.out.println("EVVPR current/historical[] = " + current + "/" + Arrays.asList(historical));
208 }
209
210 // // State validation...
211 // try { validateObject(); } catch(InvalidObjectException e) { throw new IllegalStateException(e); }
212 }
213
214
215 /**Get the specified event sets for the specified intervals; never null.
216 * This allows retrieval of zero or more event sets for the specified
217 * interval size.
218 * <p>
219 * Requests for more than SystemVariables.EVENT_SAMPLES_RETAINED in the
220 * past (or for the future!) cannot be satisfied and data will not be
221 * returned for them.
222 * <p>
223 * Usually not more than SystemVariables.EVENT_SAMPLES_RETAINED samples
224 * will be returned in response to any one request as a safety measure.
225 *
226 * @param firstIntervalNumber the number of the first interval
227 * for which data is potentially required;
228 * if too far in the past or future then possibly no data
229 * will be available;
230 * using zero accesses the "all" bucket
231 * @param whichValues each true bit represents a slot for which data is
232 * required, bit 0 indicating data from the slot within which
233 * firstIntervalTime is located, bit 1 the previous slot, etc
234 *
235 * @return as many of the requested values as available,
236 * at least long enough to return all the available values,
237 * with [0] corresponding to bit 0 in the BitSet;
238 * may contain nulls or be zero-length but is never null
239 */
240 synchronized EventVariableValue[] getEventValues(final long firstIntervalNumber,
241 final BitSet whichValues)
242 {
243 if(firstIntervalNumber < 0)
244 { throw new IllegalArgumentException(); }
245
246 final boolean nullWV = (whichValues == null);
247
248 // If firstIntervalNumber == 0, then exactly slot zero must be requested.
249 if(firstIntervalNumber == 0)
250 {
251 if(!nullWV && (!whichValues.get(0) || (whichValues.length() != 1)))
252 { throw new IllegalArgumentException("request for 'all' bucket must be for exactly that bucket and no others"); }
253
254 // Convert "all" bucket to correct form and return it...
255 return(new EventVariableValue[]{ all.toEventVariableValue() });
256 }
257
258 //System.out.println("[EventVariableValuePeriodRow.getEventValues("+firstIntervalNumber+", "+whichValues+")]");
259
260 // // State validation...
261 // try { validateObject(); } catch(InvalidObjectException e) { throw new IllegalStateException(e); }
262
263 // If requested result size silly/huge, then give up now to save space, etc.
264 final int resultSize = nullWV ? 1 : whichValues.length();
265 final long currentIntervalNumber = current.getIntervalNumber();
266
267 // Optimisations for where the request will definitely generate no results...
268 if(resultSize < 1)
269 { return(_NO_EVENT_VALUES); }
270 if(resultSize > 1 + 2*SystemVariables.EVENT_SAMPLES_RETAINED)
271 { return(_NO_EVENT_VALUES); }
272 // If newest interval requested is too much newer than the "current" interval,
273 // then give up now.
274 if(firstIntervalNumber - resultSize > currentIntervalNumber)
275 { return(_NO_EVENT_VALUES); }
276 // If newest interval requested is too far in the past, then give up now.
277 final long oldestPossibleEventNumber = currentIntervalNumber - historical.length;
278 if(firstIntervalNumber < oldestPossibleEventNumber)
279 { return(_NO_EVENT_VALUES); }
280
281 // Make result...
282 // Don't attempt to optimise its length here, yet...
283 final EventVariableValue result[] = new EventVariableValue[resultSize];
284
285 // Index in historical[index] corresponding to result[0].
286 final int offset = (int) ((currentIntervalNumber-1) - firstIntervalNumber);
287
288 //System.out.println("EVVPR offset = " + offset);
289 //System.out.println("EVVPR current/historical[] = " + current + "/" + Arrays.asList(historical));
290
291 // Walk the request list, filling in values where we have them.
292 // The "current" item corresponds to index -1 in historical[],
293 // and needs to be converted before being slotted into the result, if used.
294 // Note that this value will never count as "authoritative" since not closed.
295 // Remember: higher "i" means older (smaller intervalNumber) values.
296 BitSet wv = whichValues;
297 if(wv == null)
298 {
299 // Create synthetic BitSet for simplicity below.
300 wv = new BitSet(1);
301 wv.set(0);
302 }
303 for(int i = wv.nextSetBit(0); i >= 0; i = wv.nextSetBit(i+1))
304 {
305 final int hIndex = i + offset;
306
307 if(hIndex == -1) // Current...
308 { result[i] = current.toEventVariableValue(); }
309 else if((hIndex >= 0) && (hIndex < historical.length))
310 { result[i] = historical[hIndex]; }
311 }
312
313 // // Extra validation of output...
314 // assert(result != null) : "unexpected null return value";
315 // for(int i = result.length; --i >= 0; )
316 // {
317 // final EventVariableValue evv = result[i];
318 // if(evv == null) { continue; }
319 // assert(evv.getIntervalNumber() == firstIntervalNumber - i) : "unexpected return value with wrong interval number";
320 // }
321
322 // // State validation...
323 // try { validateObject(); } catch(InvalidObjectException e) { throw new IllegalStateException(e); }
324
325 //System.out.println("EVVPR result[] = " + Arrays.asList(result));
326
327 return(result);
328 }
329
330 /**Zero-length array corresponding to "no results", immutable so made available to the package; non-null. */
331 static final EventVariableValue[] _NO_EVENT_VALUES = new EventVariableValue[0];
332
333
334 /**Set/override the specified event sets for the specified intervals; never null.
335 * This is used to set values that have been fetched from upstream
336 * (ie closer to the master data store).
337 * <p>
338 * This may ignore the call,
339 * and certainly will if the intervalNumber is outside the current time window it holds.
340 * <p>
341 * This will generally accept the value and store it if:
342 * <ul>
343 * <li>The intervalNumber is within the current window.
344 * <li>The intervalSelector is a period for which this event has data.
345 * <li>The supplied value is "better" than any current one held, ie:
346 * <ul>
347 * <li>We hold a null and evv is non-null.
348 * <li>We hold a non-authoritative value and evv is authoritative.
349 * <li>The authoritative-ness is the same bu we hold an entry
350 * with a lower total count than evv.
351 * <ul>
352 * </ul>
353 * </ul>
354 *
355 * @param evv non-null event value
356 */
357 public synchronized void setEventValue(final EventVariableValue evv)
358 {
359 if(evv == null)
360 { throw new IllegalArgumentException(); }
361 final SimpleVariableDefinition def = evv.getDef();
362 if(!def.equals(this.def))
363 { throw new IllegalArgumentException(); }
364 if(!evv.getPeriod().equals(period))
365 { throw new IllegalArgumentException(); }
366
367 // Make sure that our state is canonicalised.
368 _canonicaliseState();
369
370 // Event of "current" set, newer (greater) than any we can set.
371 final long currentIntervalNumber = current.getIntervalNumber();
372
373 // Interval number from our new data.
374 final long updateIntervalNumber = evv.getIntervalNumber();
375
376 // Note that we may accept an override for the all/current periods,
377 // (but we don't really expect it to be marked as authoritative)
378 // by folding in events from upstream not seen here.
379 if(updateIntervalNumber == 0)
380 {
381 // Accept an update if we seem not to have seen all events.
382 if(all.getTotalEventCount() < evv.getTotalEventCount())
383 {
384 if(IsDebug.isDebug) { System.out.println("[EVVPR.setEventValue("+evv+"): accepting all-period override (isAuth="+evv.isAuthoritative()+").]"); }
385 all.update(evv);
386 }
387 return;
388 }
389 else if(updateIntervalNumber == currentIntervalNumber)
390 {
391 // Accept an update if we seem not to have seen all events.
392 if(current.getTotalEventCount() < evv.getTotalEventCount())
393 {
394 if(IsDebug.isDebug) { System.out.println("[EVVPR.setEventValue("+evv+"): accepting current-period override (isAuth="+evv.isAuthoritative()+") at period "+updateIntervalNumber+".]"); }
395 current.update(evv);
396 }
397 return;
398 }
399 // Too new for us to accept the update.
400 else if(updateIntervalNumber > currentIntervalNumber) { return; }
401
402 final long oldestPossibleEventNumber = currentIntervalNumber - historical.length;
403 // Too old for us to accept the update.
404 if(updateIntervalNumber < oldestPossibleEventNumber) { return; }
405
406 // Index in historical[index] corresponding to new value we have been passed.
407 final int offset = (int) ((currentIntervalNumber-1) - updateIntervalNumber);
408
409 // If the new data is "better" in some way
410 // then accept it in place of what we have already.
411 //
412 // Better is any of:
413 // * We have now old value at all.
414 // * We have a non-auth value and the new data is authoritative.
415 // * We have a non-auth value and the new data has more events
416 // (and this is dealt with specially for the current interval).
417 final EventVariableValue extant = historical[offset];
418 if((extant == null) ||
419 (!extant.isAuthoritative() && evv.isAuthoritative()) ||
420 (extant.getTotalEventCount() < evv.getTotalEventCount()))
421 {
422 //if(IsDebug.isDebug) { System.out.println("EVVPR.setEventValue("+evv+"): accepting override (isAuth="+evv.isAuthoritative()+") at period "+updateIntervalNumber+" (offset="+offset+")"); }
423 historical[offset] = evv;
424 }
425
426 assert((extant == null) || (extant.getIntervalNumber() == updateIntervalNumber));
427
428 // try { validateObject(); } catch(InvalidObjectException e) { e.printStackTrace(); }
429 }
430
431 /**Deserialise.
432 */
433 private void readObject(final ObjectInputStream ois)
434 throws IOException, ClassNotFoundException
435 {
436 ois.defaultReadObject();
437
438 // If deserialising from an old object without an "all" field,
439 // create it empty now.
440 if(all == null) { all = new EventVariableValueBuffer(def, period, 0); }
441
442 // Validate the state.
443 validateObject();
444 }
445
446
447
448 /**Check that the object state is consistent and legal.
449 * Is synchronized to prevent changes of state while this instance is examined.
450 *
451 * @throws InvalidObjectException
452 */
453 public synchronized void validateObject()
454 throws InvalidObjectException
455 {
456 if(def == null)
457 { throw new InvalidObjectException("bad object: def must not be null"); }
458 if(!def.isEvent())
459 { throw new InvalidObjectException("bad object: def not an event type"); }
460
461 if(period == null)
462 { throw new InvalidObjectException("bad object: period must not be null"); }
463
464 if(current == null)
465 { throw new InvalidObjectException("bad object: current must not be null"); }
466 if((current.getPeriod() != period) || !current.getDef().equals(def))
467 { throw new InvalidObjectException("bad object: current def/period incorrect"); }
468 if(current.getIntervalNumber() <= 0)
469 { throw new InvalidObjectException("bad object: current interval invalid"); }
470
471 if(all == null)
472 { throw new InvalidObjectException("bad object: all must not be null"); }
473 if((all.getPeriod() != period) || !all.getDef().equals(def))
474 { throw new InvalidObjectException("bad object: all def/period incorrect"); }
475 if(all.getIntervalNumber() != 0)
476 { throw new InvalidObjectException("bad object: all interval non-zero"); }
477
478 if(all == current)
479 { throw new InvalidObjectException("bad object: all == current"); }
480
481 if(historical == null)
482 { throw new InvalidObjectException("bad object: historical must not be null"); }
483 if(historical.length != SystemVariables.EVENT_SAMPLES_RETAINED)
484 { throw new InvalidObjectException("bad object: historical wrong length"); }
485 for(int i = historical.length; --i >= 0; )
486 {
487 final EventVariableValue evv = historical[i];
488 if(evv != null)
489 {
490 if((evv.getPeriod() != period) || !evv.getDef().equals(def))
491 { throw new InvalidObjectException("bad object: historical def/period incorrect"); }
492 final long expectedIntervalNumber = current.getIntervalNumber() - 1 - i;
493 if(evv.getIntervalNumber() != expectedIntervalNumber)
494 { throw new InvalidObjectException("bad object: historical intervalNumber incorrect"); }
495 }
496 }
497 }
498
499 /**Unique Serialisation class ID. */
500 private static final long serialVersionUID = -614477516323489097L;
501 }