001 /*
002 Copyright (c) 1996-2011, Damon Hart-Davis
003 All rights reserved.
004
005 Redistribution and use in source and binary forms, with or without
006 modification, are permitted provided that the following conditions are
007 met:
008
009 * Redistributions of source code must retain the above copyright
010 notice, this list of conditions and the following disclaimer.
011
012 * Redistributions in binary form must reproduce the above copyright
013 notice, this list of conditions and the following disclaimer in the
014 documentation and/or other materials provided with the
015 distribution.
016
017 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
018 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
019 TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
020 PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
021 OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
022 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
023 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
024 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
025 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
026 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
027 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
028 */
029 package org.hd.d.pg2k.svrCore.vars;
030
031 import java.io.IOException;
032 import java.io.InvalidObjectException;
033 import java.io.ObjectInputStream;
034 import java.io.ObjectInputValidation;
035 import java.io.ObjectOutputStream;
036 import java.io.ObjectStreamException;
037 import java.io.Serializable;
038 import java.io.StreamCorruptedException;
039 import java.util.ArrayList;
040 import java.util.Collections;
041 import java.util.HashMap;
042 import java.util.Iterator;
043 import java.util.List;
044 import java.util.Map;
045 import java.util.TreeMap;
046
047 import org.hd.d.pg2k.svrCore.MemoryTools;
048 import org.hd.d.pg2k.svrCore.Name;
049 import org.hd.d.pg2k.svrCore.Rnd;
050
051 import ORG.hd.d.IsDebug;
052
053
054 /**Thread-safe mutable value of a local or global system event variable.
055 * This is the mutable version of EventVariableValue
056 * as StringBuilder is to String.
057 * <p>
058 * The current stats should be collected in instances of this,
059 * then converted to EventVariableValue for longer-term storage
060 * and/or to return to users.
061 * <p>
062 * This is designed to be efficient and quick to mutate.
063 * <p>
064 * This class is thread-safe, and methods may be synchronized.
065 * <p>
066 * This attempts to conserve memory,
067 * at the cost of some CPU time,
068 * by attempting to effectively intern() event values.
069 * This therefore assumes that all event values are sensibly intern()able,
070 * eg immutable and with equals() and hashCode() methods that cover all visible state.
071 */
072 public final class EventVariableValueBuffer implements Serializable,
073 ObjectInputValidation
074 {
/**Construct an empty buffer for a single event variable.
 * The arguments are checked with validateObject(), and construction is vetoed if they are invalid.
 *
 * @param def  the variable definition; never null and must be an event
 * @param period  the event period for this sample; never null
 * @param intervalNumber  the period number, zero means "all";
 *     non-negative
 *
 * @throws IllegalArgumentException  if the arguments are invalid;
 *     when validateObject() rejected the state its exception is attached as the cause
 */
public EventVariableValueBuffer(final SimpleVariableDefinition def,
                                final EventPeriod period,
                                final long intervalNumber)
    throws IllegalArgumentException
{
    // Reject a null definition up front, as documented,
    // before it is handed to any helper that might dereference it.
    if(def == null)
        { throw new IllegalArgumentException("bad object: def must not be null"); }

    this.def = def;
    this.period = period;
    this.intervalNumber = intervalNumber;

    // If allowing compact internal state
    // then for 'String' values use a SortedMap based on a generic CharSequence comparator
    // which allows String and, for example, Name, instances to be mixed.
    if(EventVariableValue.allowCompactStringValues(def))
        { counts = new TreeMap(EventVariableValue.SV_COMPARATOR); }
    else
        { counts = new HashMap<Object, Count>(); }

    // Veto construction if the resulting state is not legal,
    // keeping the original failure as the cause to aid diagnosis.
    try { validateObject(); }
    catch(final InvalidObjectException e)
        { throw new IllegalArgumentException(e.getMessage(), e); }
}
106
107 /**Private cache for toEventVariableValue().
108 * Cleared on any change to the buffer, ie a new event or a merge.
109 * <p>
110 * Avoids having to expensively recreate a new evv when nothing has changed.
111 * <p>
112 * We do not serialise/persist this redundant state.
113 */
114 private transient EventVariableValue evvCache;
115
116 /**Convert to (immutable) non-authoritative EventVariableValue.
117 * If the cached value is non-null then we return that,
118 * else we compute it, cache it, and return it.
119 */
120 public synchronized EventVariableValue toEventVariableValue()
121 {
122 if(evvCache != null) { return(evvCache); }
123
124 // Get counts ordered to get rankings...
125 final List<Count> cs = new ArrayList<Count>(counts.values());
126 Collections.sort(cs);
127 // Should be highest-count first...
128 assert((cs.size() < 2) || (cs.get(0).count >= cs.get(1).count)) : "array out of order";
129
130 // Create list of values and counts...
131 final Object values[] = new Object[cs.size()];
132 final int counts[] = new int[cs.size()];
133 for(int i = cs.size(); --i >= 0; )
134 {
135 final Count count = cs.get(i);
136 values[i] = count.value;
137 counts[i] = count.count;
138 }
139
140 // Now make the non-authoritative immutable EventVariableValue...
141 // We can skip expensively re-intern()ing all the values
142 // since we have already done that in this class.
143 final EventVariableValue evv = new EventVariableValue(false, def, period, intervalNumber, totalEventCount, values, counts, false);
144 evvCache = evv; // Cache it...
145 return(evv);
146 }
147
148
149 /**The (immutable) variable definition; never null. */
150 private final SimpleVariableDefinition def;
151
152 /**Get the variable definition; never null. */
153 public SimpleVariableDefinition getDef() { return(def); }
154
155 /**The (immutable) event period; never null. */
156 private final EventPeriod period;
157
158 /**Get the event period; never null. */
159 public EventPeriod getPeriod() { return(period); }
160
161 /**The (immutable) interval number; strictly positive. */
162 private final long intervalNumber;
163
164 /**Get the interval number; strictly positive. */
165 public long getIntervalNumber() { return(intervalNumber); }
166
167
168 /**Total event count; non-negative.
169 * Initially zero.
170 * <p>
171 * Mutable; accessed under the instance lock.
172 */
173 private int totalEventCount;
174
175 /**Get the total event count; non-negative. */
176 public int getTotalEventCount() { return(totalEventCount); }
177
178 /**Generate human-readable summary of state. */
179 @Override
180 public String toString()
181 {
182 return(def.getName() + ':' + totalEventCount);
183 }
184
185
186 /**Private class to contain an int count value.
187 * Contains the count of a particular value.
188 * <p>
189 * Is mutable to avoid unnecessary object churn in the heap.
190 * <p>
191 * When created has a count of 1.
192 * <p>
193 * Not thread-safe (access should be protected by other means).
194 * <p>
195 * Made available to EventVariableValue to help in canonicalising state.
196 */
197 static final class Count implements Comparable<Count>,
198 Serializable,
199 ObjectInputValidation
200 {
201 /**Construct an instance with the given (non-null) event value. */
202 Count(final Object o)
203 {
204 value = o;
205 try { validateObject(); } catch(final InvalidObjectException e)
206 { throw new IllegalArgumentException(e.getMessage()); }
207 }
208 /**Provide total ordering with largest count (lowest rank) first. */
209 public int compareTo(final Count c)
210 {
211 if(count < c.count) { return(+1); }
212 if(count > c.count) { return(-1); }
213 return(0);
214 }
215 /**Value (should be immutable); non-null. */
216 final Object value;
217 /**Count value; should be strictly positive. */
218 int count = 1;
219 /**Check that the object state is consistent and legal. */
220 public void validateObject() throws InvalidObjectException
221 {
222 if(value == null) { throw new InvalidObjectException("bad object: value is null"); }
223 if(count <= 0) { throw new InvalidObjectException("bad object: count not positive"); }
224 }
225 /**Deserialise: validate and then intern() value to economise on memory.
226 */
227 protected Object readResolve()
228 throws ObjectStreamException
229 {
230 validateObject();
231 final Count c = new Count(MemoryTools.intern(value));
232 c.count = count;
233 return(c);
234 }
235 /**Unique Serialisation class ID generated by http://random.hd.org/. */
236 private static final long serialVersionUID = 187928939275973214L;
237 };
238
239
240 /**Map from event value to count of events of that value.
241 * Initially empty.
242 * <p>
243 * Not thread-safe (access should be protected by other means).
244 */
245 private /* final */ Map<Object, Count> counts;
246
247
248 /**Record/add event to the tally.
249 * Note that the event must match the definition that this buffer is constructed with
250 * or an IllegalArgumentException will be thrown.
251 * <p>
252 * The total event count is incremented and if appropriate
253 * the value and/or count of occurrences of that value
254 * will be incremented.
255 * <p>
256 * Note that null values are not recorded in the map,
257 * but the totalCount is still incremented.
258 * <p>
259 * If the totalEventCount would be about to overflow,
260 * then we halve it and halve the count of all extant entries
261 * (removing any that become zero).
262 *
263 * @param svv event to be recorded; must be non-null and of event type
264 */
265 public synchronized void addEvent(final SimpleVariableValue svv)
266 {
267 if((svv == null) || !def.equals(svv.getDef()))
268 { throw new IllegalArgumentException(); }
269
270 // Clear any evv cache since we are changing the buffer state.
271 evvCache = null;
272
273 // Avoid overflow by scaling all existing values (halving them).
274 if(totalEventCount == Integer.MAX_VALUE)
275 {
276 for(final Iterator it = counts.keySet().iterator(); it.hasNext(); )
277 {
278 final Object key = it.next();
279 final Count c = counts.get(key);
280 // If the scaled count drops to zero then remove the entry.
281 if((c.count /= 2) < 1)
282 { it.remove(); }
283 }
284
285 totalEventCount /= 2;
286 }
287
288 // Count the event.
289 ++totalEventCount;
290
291 final Object v1 = svv.getValue();
292 // Void-valued events are not further counted.
293 if(v1 == null)
294 { return; }
295
296 Count c = counts.get(v1);
297 // Create new entry (with count==1) if needed.
298 if(c == null)
299 {
300 // Don't expand the table if already at/beyond the limit.
301 // We are prepared to evict an existing item at random
302 // and use the slot so freed.
303 // The probability of eviction is inversely proportional
304 // to its count to allow new values to get into a full table.
305 final int maxDiffEventCount = def.getMaxDiffEventCount();
306 if(counts.size() >= maxDiffEventCount)
307 {
308 // We amortise the cost of finding an evictee to be about
309 // constant regardless of table size.
310 // So we may give up and reject this putative new entry.
311 // (If we don't keep value counts at all, we must give up too!)
312 if((maxDiffEventCount < 1) || (Rnd.fastRnd.nextInt(maxDiffEventCount) != 0))
313 { return; }
314
315 // OK, look for a victim...
316 final List<Count> candiatesToEvict = new ArrayList<Count>();
317 for(final Count cc : counts.values())
318 {
319 // Even high-count entries can ejected,
320 // but only at low priority,
321 // so that novel event values can enter a full table.
322 if((cc.count < 2) || (Rnd.fastRnd.nextInt(cc.count) == 0))
323 { candiatesToEvict.add(cc); }
324 }
325 final int cs = candiatesToEvict.size();
326 // No candidate victims to remove,
327 // so we really will have to ignore the new entry!
328 if(cs < 1) { return; }
329
330 // There were some candidates,
331 // so pick one at random,
332 // and remove it from the table.
333 // We now have space for our new event.
334 final int toEvict = Rnd.fastRnd.nextInt(cs);
335 final Object vToEvict = candiatesToEvict.get(toEvict).value;
336 counts.remove(vToEvict);
337 }
338
339 // De-dup and compact (new) values to economise on memory
340 // if it is likely to be worthwhile long-term.
341 final Object value = compactValue(EventVariableValue.shouldIntern(def), v1);
342
343 // Lookups with the original value or a compact version should be equivalent.
344 assert(counts.containsKey(v1) == counts.containsKey(value));
345
346 c = new Count(value);
347 counts.put(c.value, c);
348 }
349 // Else increment existing count.
350 else
351 { ++c.count; }
352 }
353
354 /**If true, replace our data rather than merge on update(). */
355 private static final boolean REPLACE_ON_UPDATE = true;
356
357 /**Update/merge the value with upstream data.
358 * This is taken to be a set of (usually non-authoritative)
359 * values from upstream which possible contains the events from
360 * all (or more) participants in the distributed system.
361 * <p>
362 * We compute an approximate merge,
363 * which is assumed to be OK to do at all but the master node since
364 * downstream nodes will eventually get overridden with
365 * correct authoritative values from upstream.
366 * <p>
367 * The approximation consists of increasing the event count to
368 * the maximum found in this item and the supplied value,
369 * and then adjusting the total event count suitably
370 * (allowing for events not stored as values in the map)
371 * This may result in a total event count higher than
372 * either this or the upstream data.
373 * <p>
374 * This should <em>not</em> be used at the very top-level master store
375 * to avoid double-counting values, etc.
376 * <p>
377 * This may be quite slow.
378 */
379 public synchronized void update(final EventVariableValue upstream)
380 {
381 // Clear any evv cache since we are changing the buffer state.
382 evvCache = null;
383
384 // If there is any danger at all of us exceeding the maximum
385 // possible number of different entries on merge,
386 // then simply replace our state with that from upstream.
387 if(REPLACE_ON_UPDATE ||
388 (upstream.getTotalDistinctValues() + counts.size() > def.getMaxDiffEventCount()))
389 {
390 counts.clear();
391 totalEventCount = upstream.getTotalEventCount();
392 final boolean shouldIntern = EventVariableValue.shouldIntern(def);
393 for(final Object value : upstream.getDistinctValuesInRankOrder())
394 {
395 final Object compacted = compactValue(shouldIntern, value);
396 final Count c = new Count(compacted);
397 c.count = upstream.getCount(value);
398 counts.put(c.value, c);
399 }
400 }
401
402 else // Do a merge...
403 {
404 // For all upstream values,
405 // make sure that the count is adjusted upwards
406 // to the higher of the upstream and extant values,
407 // adjusting totalEventCount too if needed.
408 final boolean shouldIntern = EventVariableValue.shouldIntern(def);
409 for(final Object value : upstream.getDistinctValuesInRankOrder())
410 {
411 final int count = upstream.getCount(value);
412 assert(count > 0);
413 Count c = counts.get(value);
414 // If no record for this value then make a zero-count entry now.
415 if(c == null)
416 {
417 c = new Count(compactValue(shouldIntern, value));
418 c.count = count;
419 totalEventCount += count;
420 counts.put(c.value, c);
421 continue;
422 }
423 // If the upstream count is greater then adjust our records.
424 else if(count > c.count)
425 {
426 totalEventCount += (count - c.count);
427 c.count = count; // Adjust extant record in place.
428 }
429 }
430
431 // Ensure that the total count reflects the highest available
432 // to allow for events whose values have not been captured.
433 final int upstreamTotalEventCount = upstream.getTotalEventCount();
434 if(upstreamTotalEventCount > totalEventCount)
435 { totalEventCount = upstreamTotalEventCount; }
436 if(counts.size() > totalEventCount)
437 { totalEventCount = counts.size(); }
438 }
439
440 // Double-check state...
441 if(IsDebug.isDebug)
442 {
443 try { validateObject(); }
444 catch(final InvalidObjectException e)
445 { throw new Error(); }
446 }
447 }
448
449 /**Return the compacted form of a value or the original, possibly intern()ing. */
450 private Object compactValue(final boolean shouldIntern, final Object value)
451 {
452 if(EventVariableValue.AVOID_RECOMPRESSING && (value instanceof Name))
453 { return(value); }
454 return(EventVariableValue.allowCompactStringValues(def) ? (Name.createOrStringFallback((CharSequence) value, null)) :
455 (shouldIntern ? MemoryTools.intern(value) : value));
456 }
457
458
459 /**Equality is based on the definition, period and interval number, but not the event values.
460 */
461 @Override
462 public boolean equals(final Object obj)
463 {
464 if(!(obj instanceof EventVariableValueBuffer)) { return(false); }
465 final EventVariableValueBuffer o = (EventVariableValueBuffer) obj;
466
467 if(!def.equals(o.def)) { return(false); }
468 if(period != o.period) { return(false); }
469 if(intervalNumber != o.intervalNumber) { return(false); }
470
471 // All relevant fields seem to match, so objects are equal.
472 return(true);
473 }
474
475 /**The hash is built on a subset of the fields.
476 * This implementation uses the hash of the definition and the interval number and period values.
477 */
478 @Override
479 public int hashCode()
480 {
481 return(def.hashCode() ^ ((int)intervalNumber) ^ period.getIntervalMs());
482 }
483
484 /**Deserialise.
485 */
486 private void readObject(final ObjectInputStream ois)
487 throws IOException, ClassNotFoundException
488 {
489 ois.defaultReadObject();
490
491 // Take a defensive copy of counts,
492 // and if it is too large, trim it if at all possible,
493 // else clear it completely.
494 // We may also convert to a more compact representation for the values.
495 // TODO: sort newCounts by value and compact it taking advantage of adjacency.
496 final List<Count> newCounts = new ArrayList<Count>(counts.values());
497 final int diffValues = newCounts.size();
498 // Discard the deserialised Map and rebuild it completely.
499 counts = // new HashMap<Object,Count>(1 + 2 * diffValues);
500 (EventVariableValue.allowCompactStringValues(def)) ?
501 new TreeMap(EventVariableValue.SV_COMPARATOR) : // TreeMap over CharSequence keys...
502 new HashMap<Object, Count>(2*diffValues); // Fast HashMap for most types.
503 final boolean tooLarge = diffValues > def.getMaxDiffEventCount();
504 if(tooLarge) { System.err.println("EventVariableValueBuffer.readObject(): WARNING: coercing value for "+def+" by trimming from "+diffValues+" different values."); }
505 int total = 0; // Recompute totalCount.
506 final boolean shouldIntern = EventVariableValue.shouldIntern(def);
507 for(final Count rc : newCounts)
508 {
509 final int cc = rc.count;
510 if(cc < 1) { throw new StreamCorruptedException(); }
511
512 total += cc;
513
514 // If currently too large, simply drop all single-count entries
515 // to try to slim us down a little.
516 if(tooLarge && (cc < 2)) { continue; }
517
518 // Make a safe (and possibly-compacted) copy of the count data
519 // with the (key) value also intern()ed if useful.
520 final Object compactValue;
521 if(EventVariableValue.allowCompactStringValues(def))
522 {
523 // We may wish to avoid attempting possibly-futile and expensive recompression.
524 if(EventVariableValue.AVOID_RECOMPRESSING && (rc.value instanceof Name))
525 { compactValue = rc.value; }
526 else
527 { compactValue = Name.createOrStringFallback((CharSequence) rc.value, null); }
528 }
529 else if(shouldIntern)
530 { compactValue = MemoryTools.intern(rc.value); }
531 else
532 { compactValue = rc.value; }
533 final Count c = new Count(compactValue);
534 c.count = cc;
535 counts.put(c.value, c);
536 }
537 // If still too large then clear all values.
538 if(counts.size() > def.getMaxDiffEventCount())
539 { counts.clear(); }
540
541 // Coerce totalEventCount to be sensible.
542 if(total > totalEventCount)
543 { totalEventCount = total; }
544
545 // Check that object is now sensible.
546 validateObject();
547 }
548
549 /**Customise how we save state when serialising.
550 */
551 private synchronized void writeObject(final ObjectOutputStream oos)
552 throws IOException
553 {
554 // Capture just the fields that we want to persist.
555 final ObjectOutputStream.PutField fields = oos.putFields();
556 fields.put("def", def);
557 fields.put("intervalNumber", intervalNumber);
558 fields.put("period", period);
559 fields.put("totalEventCount", totalEventCount);
560
561 if(EventVariableValue.allowCompactStringValues(def))
562 {
563 // Rewrite counts to a HashMap() of (intern()ed) String-based values for serialising,
564 // if preserving previous serialised form (even if though not especially compact nor robust).
565 final HashMap<Object, Count> cc = new HashMap<Object, Count>(counts.size()*2 + 1);
566 for(final Map.Entry<Object, Count> e : counts.entrySet())
567 {
568 final Count c = e.getValue();
569 // If preserving on-the-wire format then convert back to String.
570 // Else the resulting new Map of potentially-mixed key types
571 // is not much good for anything other than storage;
572 // we know all the values to be unique so there should be no loss of data.
573 final CharSequence valueAsCS = (CharSequence) e.getKey();
574 final Count newCount = new Count(valueAsCS);
575 newCount.count = c.count;
576 cc.put(valueAsCS, newCount);
577 }
578 assert(cc.size() == counts.size()) : "there should be no key collisions";
579 fields.put("counts", cc);
580 }
581 else
582 { fields.put("counts", counts); }
583
584 oos.writeFields();
585 }
586
587
588 /**Check that the object state is consistent and legal. */
589 public synchronized void validateObject()
590 throws InvalidObjectException
591 {
592 if(def == null)
593 { throw new InvalidObjectException("bad object: def must not be null"); }
594 if(!def.isEvent())
595 { throw new InvalidObjectException("bad object: def not an event type"); }
596
597 if(period == null)
598 { throw new InvalidObjectException("bad object: period must not be null"); }
599
600 if(totalEventCount < 0)
601 { throw new InvalidObjectException("bad object: negative totalEventCount"); }
602
603 if(intervalNumber < 0)
604 { throw new InvalidObjectException("bad object: negative intervalNumber"); }
605 if(Long.MAX_VALUE / period.getIntervalMs() < intervalNumber)
606 { throw new InvalidObjectException("bad object: intervalNumber too large"); }
607
608 // Validate map.
609 if(counts == null)
610 { throw new InvalidObjectException("bad object: counts must not be null"); }
611 // Check that we don't have too many distinct entries.
612 if(counts.size() > def.getMaxDiffEventCount())
613 { throw new InvalidObjectException("bad object: counts too large"); }
614 // Check that all keys are valid (correct type and not null).
615 for(final Object k : counts.keySet())
616 {
617 if((k == null) || (!def.checkType(k) && !(EventVariableValue.allowCompactStringValues(def) && (Name.class == k.getClass()))))
618 { throw new InvalidObjectException("bad object: invalid key in counts"); }
619 if(!k.equals(counts.get(k).value))
620 { throw new InvalidObjectException("bad object: mismatched key and count value"); }
621 }
622 // Check that all counts are valid (positive and not null).
623 for(final Count c : counts.values())
624 {
625 if(c == null)
626 { throw new InvalidObjectException("bad object: null count in counts"); }
627 c.validateObject();
628 }
629 }
630
631 /**Unique Serialisation class ID generated by http://random.hd.org/. */
632 private static final long serialVersionUID = -1721389511619513375L;
633 }