001    /*
002    Copyright (c) 1996-2011, Damon Hart-Davis
003    All rights reserved.
004    
005    Redistribution and use in source and binary forms, with or without
006    modification, are permitted provided that the following conditions are
007    met:
008    
009      * Redistributions of source code must retain the above copyright
010        notice, this list of conditions and the following disclaimer.
011    
012      * Redistributions in binary form must reproduce the above copyright
013        notice, this list of conditions and the following disclaimer in the
014        documentation and/or other materials provided with the
015        distribution.
016    
017    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
018    IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
019    TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
020    PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
021    OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
022    SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
023    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
024    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
025    THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
026    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
027    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
028    */
029    package org.hd.d.pg2k.svrCore.vars;
030    
031    import java.io.IOException;
032    import java.io.InvalidObjectException;
033    import java.io.ObjectInputStream;
034    import java.io.ObjectInputValidation;
035    import java.io.ObjectOutputStream;
036    import java.io.ObjectStreamException;
037    import java.io.Serializable;
038    import java.io.StreamCorruptedException;
039    import java.util.ArrayList;
040    import java.util.Collections;
041    import java.util.HashMap;
042    import java.util.Iterator;
043    import java.util.List;
044    import java.util.Map;
045    import java.util.TreeMap;
046    
047    import org.hd.d.pg2k.svrCore.MemoryTools;
048    import org.hd.d.pg2k.svrCore.Name;
049    import org.hd.d.pg2k.svrCore.Rnd;
050    
051    import ORG.hd.d.IsDebug;
052    
053    
054    /**Thread-safe mutable value of a local or global system event variable.
055     * This is the mutable version of EventVariableValue
056     * as StringBuilder is to String.
057     * <p>
058     * The current stats should be collected in instances of this,
059     * then converted to EventVariableValue for longer-term storage
060     * and/or to return to users.
061     * <p>
062     * This is designed to be efficient and quick to mutate.
063     * <p>
064     * This class is thread-safe, and methods may be synchronized.
065     * <p>
066     * This attempts to conserve memory,
067     * at the cost of some CPU time,
068     * by attempting to effectively intern() event values.
069     * This therefore assumes that all event values are sensibly intern()able,
070     * eg immutable and with equals() and hashCode() methods that cover all visible state.
071      */
072    public final class EventVariableValueBuffer implements Serializable,
073                                                           ObjectInputValidation
074        {
075        /**Construct a single variable value.
076         * This type-checks the value, vetoing construction if invalid.
077         *
078         * @param def  the variable definition; never null and must be an event
079         * @param period  the event period for this sample; never null
080         * @param intervalNumber  the period number, zero means "all";
081         *     non-negative
082         *
083         * @throws IllegalArgumentException  if the arguments are invalid
084         */
085        public EventVariableValueBuffer(final SimpleVariableDefinition def,
086                                        final EventPeriod period,
087                                        final long intervalNumber)
088            throws IllegalArgumentException
089            {
090            this.def = def;
091            this.period = period;
092            this.intervalNumber = intervalNumber;
093    
094            // If allowing compact internal state
095            // then for 'String' values use a SortedMap based on a generic CharSequence comparator
096            // which allows String and, for example, Name, instances to be mixed.
097            if(EventVariableValue.allowCompactStringValues(def))
098                { counts = new TreeMap(EventVariableValue.SV_COMPARATOR); }
099            else
100                { counts = new HashMap<Object, Count>(); }
101    
102            try { validateObject(); }
103            catch(final InvalidObjectException e)
104                { throw new IllegalArgumentException(e.getMessage()); }
105            }
106    
107        /**Private cache for toEventVariableValue().
108         * Cleared on any change to the buffer, ie a new event or a merge.
109         * <p>
110         * Avoids having to expensively recreate a new evv when nothing has changed.
111         * <p>
112         * We do not serialise/persist this redundant state.
113         */
114        private transient EventVariableValue evvCache;
115    
116        /**Convert to (immutable) non-authoritative EventVariableValue.
117         * If the cached value is non-null then we return that,
118         * else we compute it, cache it, and return it.
119         */
120        public synchronized EventVariableValue toEventVariableValue()
121            {
122            if(evvCache != null) { return(evvCache); }
123    
124            // Get counts ordered to get rankings...
125            final List<Count> cs = new ArrayList<Count>(counts.values());
126            Collections.sort(cs);
127            // Should be highest-count first...
128            assert((cs.size() < 2) || (cs.get(0).count >= cs.get(1).count)) : "array out of order";
129    
130            // Create list of values and counts...
131            final Object values[] = new Object[cs.size()];
132            final int counts[] = new int[cs.size()];
133            for(int i = cs.size(); --i >= 0; )
134                {
135                final Count count = cs.get(i);
136                values[i] = count.value;
137                counts[i] = count.count;
138                }
139    
140            // Now make the non-authoritative immutable EventVariableValue...
141            // We can skip expensively re-intern()ing all the values
142            // since we have already done that in this class.
143            final EventVariableValue evv = new EventVariableValue(false, def, period, intervalNumber, totalEventCount, values, counts, false);
144            evvCache = evv; // Cache it...
145            return(evv);
146            }
147    
148    
149        /**The (immutable) variable definition; never null. */
150        private final SimpleVariableDefinition def;
151    
152        /**Get the variable definition; never null. */
153        public SimpleVariableDefinition getDef() { return(def); }
154    
155        /**The (immutable) event period; never null. */
156        private final EventPeriod period;
157    
158        /**Get the event period; never null. */
159        public EventPeriod getPeriod() { return(period); }
160    
161        /**The (immutable) interval number; strictly positive. */
162        private final long intervalNumber;
163    
164        /**Get the interval number; strictly positive. */
165        public long getIntervalNumber() { return(intervalNumber); }
166    
167    
168        /**Total event count; non-negative.
169         * Initially zero.
170         * <p>
171         * Mutable; accessed under the instance lock.
172         */
173        private int totalEventCount;
174    
175        /**Get the total event count; non-negative. */
176        public int getTotalEventCount() { return(totalEventCount); }
177    
178        /**Generate human-readable summary of state. */
179        @Override
180        public String toString()
181            {
182            return(def.getName() + ':' + totalEventCount);
183            }
184    
185    
186        /**Private class to contain an int count value.
187         * Contains the count of a particular value.
188         * <p>
189         * Is mutable to avoid unnecessary object churn in the heap.
190         * <p>
191         * When created has a count of 1.
192         * <p>
193         * Not thread-safe (access should be protected by other means).
194         * <p>
195         * Made available to EventVariableValue to help in canonicalising state.
196         */
197        static final class Count implements Comparable<Count>,
198                                            Serializable,
199                                            ObjectInputValidation
200            {
201            /**Construct an instance with the given (non-null) event value. */
202            Count(final Object o)
203                {
204                value = o;
205                try { validateObject(); } catch(final InvalidObjectException e)
206                    { throw new IllegalArgumentException(e.getMessage()); }
207                }
208            /**Provide total ordering with largest count (lowest rank) first. */
209            public int compareTo(final Count c)
210                {
211                if(count < c.count) { return(+1); }
212                if(count > c.count) { return(-1); }
213                return(0);
214                }
215            /**Value (should be immutable); non-null. */
216            final Object value;
217            /**Count value; should be strictly positive. */
218            int count = 1;
219            /**Check that the object state is consistent and legal. */
220            public void validateObject() throws InvalidObjectException
221                {
222                if(value == null) { throw new InvalidObjectException("bad object: value is null"); }
223                if(count <= 0) { throw new InvalidObjectException("bad object: count not positive"); }
224                }
225            /**Deserialise: validate and then intern() value to economise on memory.
226             */
227            protected Object readResolve()
228                throws ObjectStreamException
229                {
230                validateObject();
231                final Count c = new Count(MemoryTools.intern(value));
232                c.count = count;
233                return(c);
234                }
235            /**Unique Serialisation class ID generated by http://random&#46;hd&#46;org/. */
236            private static final long serialVersionUID = 187928939275973214L;
237            };
238    
239    
240        /**Map from event value to count of events of that value.
241         * Initially empty.
242         * <p>
243         * Not thread-safe (access should be protected by other means).
244         */
245        private /* final */ Map<Object, Count> counts;
246    
247    
248        /**Record/add event to the tally.
249         * Note that the event must match the definition that this buffer is constructed with
250         * or an IllegalArgumentException will be thrown.
251         * <p>
252         * The total event count is incremented and if appropriate
253         * the value and/or count of occurrences of that value
254         * will be incremented.
255         * <p>
256         * Note that null values are not recorded in the map,
257         * but the totalCount is still incremented.
258         * <p>
259         * If the totalEventCount would be about to overflow,
260         * then we halve it and halve the count of all extant entries
261         * (removing any that become zero).
262         *
263         * @param svv  event to be recorded; must be non-null and of event type
264         */
265        public synchronized void addEvent(final SimpleVariableValue svv)
266            {
267            if((svv == null) || !def.equals(svv.getDef()))
268                { throw new IllegalArgumentException(); }
269    
270            // Clear any evv cache since we are changing the buffer state.
271            evvCache = null;
272    
273            // Avoid overflow by scaling all existing values (halving them).
274            if(totalEventCount == Integer.MAX_VALUE)
275                {
276                for(final Iterator it = counts.keySet().iterator(); it.hasNext(); )
277                    {
278                    final Object key = it.next();
279                    final Count c = counts.get(key);
280                    // If the scaled count drops to zero then remove the entry.
281                    if((c.count /= 2) < 1)
282                        { it.remove(); }
283                    }
284    
285                totalEventCount /= 2;
286                }
287    
288            // Count the event.
289            ++totalEventCount;
290    
291            final Object v1 = svv.getValue();
292            // Void-valued events are not further counted.
293            if(v1 == null)
294                { return; }
295    
296            Count c = counts.get(v1);
297            // Create new entry (with count==1) if needed.
298            if(c == null)
299                {
300                // Don't expand the table if already at/beyond the limit.
301                // We are prepared to evict an existing item at random
302                // and use the slot so freed.
303                // The probability of eviction is inversely proportional
304                // to its count to allow new values to get into a full table.
305                final int maxDiffEventCount = def.getMaxDiffEventCount();
306                if(counts.size() >= maxDiffEventCount)
307                    {
308                    // We amortise the cost of finding an evictee to be about
309                    // constant regardless of table size.
310                    // So we may give up and reject this putative new entry.
311                    // (If we don't keep value counts at all, we must give up too!)
312                    if((maxDiffEventCount < 1) || (Rnd.fastRnd.nextInt(maxDiffEventCount) != 0))
313                        { return; }
314    
315                    // OK, look for a victim...
316                    final List<Count> candiatesToEvict = new ArrayList<Count>();
317                    for(final Count cc : counts.values())
318                        {
319                        // Even high-count entries can ejected,
320                        // but only at low priority,
321                        // so that novel event values can enter a full table.
322                        if((cc.count < 2) || (Rnd.fastRnd.nextInt(cc.count) == 0))
323                            { candiatesToEvict.add(cc); }
324                        }
325                    final int cs = candiatesToEvict.size();
326                    // No candidate victims to remove,
327                    // so we really will have to ignore the new entry!
328                    if(cs < 1) { return; }
329    
330                    // There were some candidates,
331                    // so pick one at random,
332                    // and remove it from the table.
333                    // We now have space for our new event.
334                    final int toEvict = Rnd.fastRnd.nextInt(cs);
335                    final Object vToEvict = candiatesToEvict.get(toEvict).value;
336                    counts.remove(vToEvict);
337                    }
338    
339                // De-dup and compact (new) values to economise on memory
340                // if it is likely to be worthwhile long-term.
341                final Object value = compactValue(EventVariableValue.shouldIntern(def), v1);
342    
343                // Lookups with the original value or a compact version should be equivalent.
344                assert(counts.containsKey(v1) == counts.containsKey(value));
345    
346                c = new Count(value);
347                counts.put(c.value, c);
348                }
349            // Else increment existing count.
350            else
351                { ++c.count; }
352            }
353    
354        /**If true, replace our data rather than merge on update(). */
355        private static final boolean REPLACE_ON_UPDATE = true;
356    
357        /**Update/merge the value with upstream data.
358         * This is taken to be a set of (usually non-authoritative)
359         * values from upstream which possible contains the events from
360         * all (or more) participants in the distributed system.
361         * <p>
362         * We compute an approximate merge,
363         * which is assumed to be OK to do at all but the master node since
364         * downstream nodes will eventually get overridden with
365         * correct authoritative values from upstream.
366         * <p>
367         * The approximation consists of increasing the event count to
368         * the maximum found in this item and the supplied value,
369         * and then adjusting the total event count suitably
370         * (allowing for events not stored as values in the map)
371         * This may result in a total event count higher than
372         * either this or the upstream data.
373         * <p>
374         * This should <em>not</em> be used at the very top-level master store
375         * to avoid double-counting values, etc.
376         * <p>
377         * This may be quite slow.
378         */
379        public synchronized void update(final EventVariableValue upstream)
380            {
381            // Clear any evv cache since we are changing the buffer state.
382            evvCache = null;
383    
384            // If there is any danger at all of us exceeding the maximum
385            // possible number of different entries on merge,
386            // then simply replace our state with that from upstream.
387            if(REPLACE_ON_UPDATE ||
388               (upstream.getTotalDistinctValues() + counts.size() > def.getMaxDiffEventCount()))
389                {
390                counts.clear();
391                totalEventCount = upstream.getTotalEventCount();
392                final boolean shouldIntern = EventVariableValue.shouldIntern(def);
393                for(final Object value : upstream.getDistinctValuesInRankOrder())
394                    {
395                    final Object compacted = compactValue(shouldIntern, value);
396                    final Count c = new Count(compacted);
397                    c.count = upstream.getCount(value);
398                    counts.put(c.value, c);
399                    }
400                }
401    
402            else // Do a merge...
403                {
404                // For all upstream values,
405                // make sure that the count is adjusted upwards
406                // to the higher of the upstream and extant values,
407                // adjusting totalEventCount too if needed.
408                final boolean shouldIntern = EventVariableValue.shouldIntern(def);
409                for(final Object value : upstream.getDistinctValuesInRankOrder())
410                    {
411                    final int count = upstream.getCount(value);
412                    assert(count > 0);
413                    Count c = counts.get(value);
414                    // If no record for this value then make a zero-count entry now.
415                    if(c == null)
416                        {
417                        c = new Count(compactValue(shouldIntern, value));
418                        c.count = count;
419                        totalEventCount += count;
420                        counts.put(c.value, c);
421                        continue;
422                        }
423                    // If the upstream count is greater then adjust our records.
424                    else if(count > c.count)
425                        {
426                        totalEventCount += (count - c.count);
427                        c.count = count; // Adjust extant record in place.
428                        }
429                    }
430    
431                // Ensure that the total count reflects the highest available
432                // to allow for events whose values have not been captured.
433                final int upstreamTotalEventCount = upstream.getTotalEventCount();
434                if(upstreamTotalEventCount > totalEventCount)
435                    { totalEventCount = upstreamTotalEventCount; }
436                if(counts.size() > totalEventCount)
437                    { totalEventCount = counts.size(); }
438                }
439    
440            // Double-check state...
441            if(IsDebug.isDebug)
442                {
443                try { validateObject(); }
444                catch(final InvalidObjectException e)
445                    { throw new Error(); }
446                }
447            }
448    
449        /**Return the compacted form of a value or the original, possibly intern()ing. */
450        private Object compactValue(final boolean shouldIntern, final Object value)
451            {
452            if(EventVariableValue.AVOID_RECOMPRESSING && (value instanceof Name))
453                { return(value); }
454            return(EventVariableValue.allowCompactStringValues(def) ? (Name.createOrStringFallback((CharSequence) value, null)) :
455                (shouldIntern ? MemoryTools.intern(value) : value));
456            }
457    
458    
459        /**Equality is based on the definition, period and interval number, but not the event values.
460         */
461        @Override
462        public boolean equals(final Object obj)
463            {
464            if(!(obj instanceof EventVariableValueBuffer)) { return(false); }
465            final EventVariableValueBuffer o = (EventVariableValueBuffer) obj;
466    
467            if(!def.equals(o.def)) { return(false); }
468            if(period != o.period) { return(false); }
469            if(intervalNumber != o.intervalNumber) { return(false); }
470    
471            // All relevant fields seem to match, so objects are equal.
472            return(true);
473            }
474    
475        /**The hash is built on a subset of the fields.
476         * This implementation uses the hash of the definition and the interval number and period values.
477         */
478        @Override
479        public int hashCode()
480            {
481            return(def.hashCode() ^ ((int)intervalNumber) ^ period.getIntervalMs());
482            }
483    
484        /**Deserialise.
485         */
486        private void readObject(final ObjectInputStream ois)
487            throws IOException, ClassNotFoundException
488            {
489            ois.defaultReadObject();
490    
491            // Take a defensive copy of counts,
492            // and if it is too large, trim it if at all possible,
493            // else clear it completely.
494            // We may also convert to a more compact representation for the values.
495            // TODO: sort newCounts by value and compact it taking advantage of adjacency.
496            final List<Count> newCounts = new ArrayList<Count>(counts.values());
497            final int diffValues = newCounts.size();
498            // Discard the deserialised Map and rebuild it completely.
499            counts = // new HashMap<Object,Count>(1 + 2 * diffValues);
500                (EventVariableValue.allowCompactStringValues(def)) ?
501                    new TreeMap(EventVariableValue.SV_COMPARATOR) : // TreeMap over CharSequence keys...
502                    new HashMap<Object, Count>(2*diffValues); // Fast HashMap for most types.
503            final boolean tooLarge = diffValues > def.getMaxDiffEventCount();
504    if(tooLarge) { System.err.println("EventVariableValueBuffer.readObject(): WARNING: coercing value for "+def+" by trimming from "+diffValues+" different values."); }
505            int total = 0; // Recompute totalCount.
506            final boolean shouldIntern = EventVariableValue.shouldIntern(def);
507            for(final Count rc : newCounts)
508                {
509                final int cc = rc.count;
510                if(cc < 1) { throw new StreamCorruptedException(); }
511    
512                total += cc;
513    
514                // If currently too large, simply drop all single-count entries
515                // to try to slim us down a little.
516                if(tooLarge && (cc < 2)) { continue; }
517    
518                // Make a safe (and possibly-compacted) copy of the count data
519                // with the (key) value also intern()ed if useful.
520                final Object compactValue;
521                if(EventVariableValue.allowCompactStringValues(def))
522                    {
523                    // We may wish to avoid attempting possibly-futile and expensive recompression.
524                    if(EventVariableValue.AVOID_RECOMPRESSING && (rc.value instanceof Name))
525                        { compactValue = rc.value; }
526                    else
527                        { compactValue = Name.createOrStringFallback((CharSequence) rc.value, null); }
528                    }
529                else if(shouldIntern)
530                    { compactValue = MemoryTools.intern(rc.value); }
531                else
532                    { compactValue = rc.value; }
533                final Count c = new Count(compactValue);
534                c.count = cc;
535                counts.put(c.value, c);
536                }
537            // If still too large then clear all values.
538            if(counts.size() > def.getMaxDiffEventCount())
539                { counts.clear(); }
540    
541            // Coerce totalEventCount to be sensible.
542            if(total > totalEventCount)
543                { totalEventCount = total; }
544    
545            // Check that object is now sensible.
546            validateObject();
547            }
548    
549        /**Customise how we save state when serialising.
550         */
551        private synchronized void writeObject(final ObjectOutputStream oos)
552            throws IOException
553            {
554            // Capture just the fields that we want to persist.
555            final ObjectOutputStream.PutField fields = oos.putFields();
556            fields.put("def", def);
557            fields.put("intervalNumber", intervalNumber);
558            fields.put("period", period);
559            fields.put("totalEventCount", totalEventCount);
560    
561            if(EventVariableValue.allowCompactStringValues(def))
562                {
563                // Rewrite counts to a HashMap() of (intern()ed) String-based values for serialising,
564                // if preserving previous serialised form (even if though not especially compact nor robust).
565                final HashMap<Object, Count> cc = new HashMap<Object, Count>(counts.size()*2 + 1);
566                for(final Map.Entry<Object, Count> e : counts.entrySet())
567                    {
568                    final Count c = e.getValue();
569                    // If preserving on-the-wire format then convert back to String.
570                    // Else the resulting new Map of potentially-mixed key types
571                    // is not much good for anything other than storage;
572                    // we know all the values to be unique so there should be no loss of data.
573                    final CharSequence valueAsCS = (CharSequence) e.getKey();
574                    final Count newCount = new Count(valueAsCS);
575                    newCount.count = c.count;
576                    cc.put(valueAsCS, newCount);
577                    }
578                assert(cc.size() == counts.size()) : "there should be no key collisions";
579                fields.put("counts", cc);
580                }
581            else
582                { fields.put("counts", counts); }
583    
584            oos.writeFields();
585            }
586    
587    
588        /**Check that the object state is consistent and legal. */
589        public synchronized void validateObject()
590            throws InvalidObjectException
591            {
592            if(def == null)
593                { throw new InvalidObjectException("bad object: def must not be null"); }
594            if(!def.isEvent())
595                { throw new InvalidObjectException("bad object: def not an event type"); }
596    
597            if(period == null)
598                { throw new InvalidObjectException("bad object: period must not be null"); }
599    
600            if(totalEventCount < 0)
601                { throw new InvalidObjectException("bad object: negative totalEventCount"); }
602    
603            if(intervalNumber < 0)
604                { throw new InvalidObjectException("bad object: negative intervalNumber"); }
605            if(Long.MAX_VALUE / period.getIntervalMs() < intervalNumber)
606                { throw new InvalidObjectException("bad object: intervalNumber too large"); }
607    
608            // Validate map.
609            if(counts == null)
610                { throw new InvalidObjectException("bad object: counts must not be null"); }
611            // Check that we don't have too many distinct entries.
612            if(counts.size() > def.getMaxDiffEventCount())
613                { throw new InvalidObjectException("bad object: counts too large"); }
614            // Check that all keys are valid (correct type and not null).
615            for(final Object k : counts.keySet())
616                {
617                if((k == null) || (!def.checkType(k) && !(EventVariableValue.allowCompactStringValues(def) && (Name.class == k.getClass()))))
618                    { throw new InvalidObjectException("bad object: invalid key in counts"); }
619                if(!k.equals(counts.get(k).value))
620                    { throw new InvalidObjectException("bad object: mismatched key and count value"); }
621                }
622            // Check that all counts are valid (positive and not null).
623            for(final Count c : counts.values())
624                {
625                if(c == null)
626                    { throw new InvalidObjectException("bad object: null count in counts"); }
627                c.validateObject();
628                }
629            }
630    
631        /**Unique Serialisation class ID generated by http://random&#46;hd&#46;org/. */
632        private static final long serialVersionUID = -1721389511619513375L;
633        }