001    /*
002    Copyright (c) 1996-2011, Damon Hart-Davis
003    All rights reserved.
004    
005    Redistribution and use in source and binary forms, with or without
006    modification, are permitted provided that the following conditions are
007    met:
008    
009      * Redistributions of source code must retain the above copyright
010        notice, this list of conditions and the following disclaimer.
011    
012      * Redistributions in binary form must reproduce the above copyright
013        notice, this list of conditions and the following disclaimer in the
014        documentation and/or other materials provided with the
015        distribution.
016    
017    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
018    IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
019    TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
020    PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
021    OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
022    SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
023    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
024    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
025    THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
026    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
027    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
028    */
029    package org.hd.d.pg2k.svrCore.vars;
030    
031    import java.io.IOException;
032    import java.io.InvalidObjectException;
033    import java.io.ObjectInputStream;
034    import java.io.ObjectInputValidation;
035    import java.io.Serializable;
036    import java.util.Arrays;
037    import java.util.BitSet;
038    
039    import ORG.hd.d.IsDebug;
040    
041    
042    /**Current and historical entries for one event forming one "row".
043     * This is mutable and used to hold live event stats in memory.
044     * <p>
045     * This maintains state incrementally on each new event, for example,
046     * shuffling out and discarding old historical event values if need be.
047     * <p>
048     * Thread-safe, with methods synchronized as necessary.
049     * <p>
050     * Serializable for direct persistence if required,
051     * though an XML format rather than a binary one would probably be more robust.
052     */
053    public final class EventVariableValuePeriodRow implements Serializable,
054                                                              ObjectInputValidation
055        {
056        /**Construct a single event variable value period "row".
057         *
058         * @param def  the variable definition; never null and must be an event
059         * @param period  the event period for this sample; never null
060         *
061         * @throws IllegalArgumentException  if the arguments are invalid
062         */
063        public EventVariableValuePeriodRow(final SimpleVariableDefinition def,
064                                           final EventPeriod period)
065            throws IllegalArgumentException
066            {
067            this.def = def;
068            this.period = period;
069    
070            current = new EventVariableValueBuffer(def, period, period.getIntervalNumber(System.currentTimeMillis()));
071            all = new EventVariableValueBuffer(def, period, 0);
072    
073            try { validateObject(); }
074            catch(final InvalidObjectException e)
075                { throw new IllegalArgumentException(e.getMessage()); }
076            }
077    
078    
079    
080        /**The variable definition; never null. */
081        private final SimpleVariableDefinition def;
082    
083        /**Get the variable definition; never null. */
084        public SimpleVariableDefinition getDef() { return(def); }
085    
086        /**The event period; never null. */
087        private final EventPeriod period;
088    
089        /**Get the event period; never null. */
090        public EventPeriod getPeriod() { return(period); }
091    
092    
093    
094        /**All events being collected, never null.
095         * Captures all events ever seen,
096         * but may be rescaled to prevent overflow,
097         * so should simply be regarded as a reasonably representative sub-sample.
098         * <p>
099         * Will never be authoritative.
100         */
101        private /* final */ EventVariableValueBuffer all;
102    
103        /**"Current" events being collected, never null.
104         * Will be replaced with new instance as the current one stops being current.
105         * <p>
106         * Initially created with value suitable to receive
107         * a new event immediately.
108         * <p>
109         * Will never be authoritative.
110         */
111        private EventVariableValueBuffer current;
112    
113        /**Set of "historical" periods, most-recent at index 0, possibly containing nulls; never null.
114         * Initially all nulls.
115         * <p>
116         * The intervalNumber of any entry at index i
117         * must be current.getIntervalNumber()-1-i.
118         * <p>
119         * FIXME: copy defensively during deserialisation.
120         */
121        private final EventVariableValue historical[] =
122                new EventVariableValue[SystemVariables.EVENT_SAMPLES_RETAINED];
123    
124    
125    
126        /**Record/add event to the tally for this period.
127         * Note that the event must match the definition that this instance is constructed with
128         * or an IllegalArgumentException will be thrown.
129         * <p>
130         * If the "current" event buffer is too old to accept this incoming event,
131         * we move all the historical data up and insert an immutable copy of
132         * the buffered data as appropriate
133         * (though we must cope with the case where we have to move data up several
134         * slots due to an extended interval between updates).
135         *
136         * @param svv  event to be recorded; must be non-null and of event type
137         */
138        public synchronized void addEvent(final SimpleVariableValue svv)
139            {
140            if((svv == null) || !def.equals(svv.getDef()))
141                { throw new IllegalArgumentException(); }
142    
143            // Get to normalised state if we need to...
144            _canonicaliseState();
145    
146            // Add event to current list.
147            current.addEvent(svv);
148    
149            // Add event to "all" event bucket.
150            all.addEvent(svv);
151            }
152    
153        /**Canonicalise our internal state, so that the current event collector covers "now".
154         * If current event collector is too old,
155         * then create a new one and age the historical data.
156         * If the clock seems to have gone backwards for whatever reason,
157         * we'll simply carry on collecting as we were until things get sane.
158         */
159        private void _canonicaliseState()
160            {
161    //        // State validation...
162    //        try { validateObject();  } catch(InvalidObjectException e) { throw new IllegalStateException(e); }
163    
164            final long now = System.currentTimeMillis();
165            final long calcCurrentIntervalNumber = period.getIntervalNumber(now);
166    
167            if(current.getIntervalNumber() < calcCurrentIntervalNumber)
168                {
169    //            System.out.println("DOING SHIFT UP...");
170                // Need to shift up data...
171                final EventVariableValueBuffer oldCurrent = current;
172                final EventVariableValueBuffer newCurrent = new EventVariableValueBuffer(def, period, calcCurrentIntervalNumber);
173                current = newCurrent;
174    
175                // Compute the number of slots that we need to shift data up by,
176                // usually 1, but may be more for rare events or after deserialisation.
177                final long toShiftBy = newCurrent.getIntervalNumber() - oldCurrent.getIntervalNumber();
178    //            System.out.println("SHIFTING BY: " + toShiftBy);
179                assert(toShiftBy > 0);
180    
181                // If a really huge shift,
182                // eg because something has been restored after a long period on disc,
183                // or a really rare event has just occurred,
184                // then just throw away all the old data by clearing the history
185                // and not even converting the oldCurrent to a historical value.
186                if(toShiftBy > historical.length)
187                    { Arrays.fill(historical, null); }
188                else
189                    {
190                    // Copy up all existing elements
191                    // to higher indexes (ie age them)...
192                    // This should correctly move nothing if toShiftBy == historical.length.
193                    System.arraycopy(historical, 0, historical, (int) toShiftBy, (int) (historical.length - toShiftBy));
194    
195                    // Convert the oldCurrent to immutable form
196                    // and insert into the appropriate slot...
197                    final int oldest = (int) (toShiftBy - 1);
198                    historical[oldest] = oldCurrent.toEventVariableValue();
199    
200                    // Null out any lower (newer) slots that we now have no data for.
201                    if(toShiftBy > 1)
202                        { Arrays.fill(historical, 0, oldest, null); }
203    
204                    assert(historical[oldest] != null); // We now have at least one historical entry...
205                    }
206    
207    //System.out.println("EVVPR current/historical[] = " + current + "/" + Arrays.asList(historical));
208                }
209    
210    //        // State validation...
211    //        try { validateObject();  } catch(InvalidObjectException e) { throw new IllegalStateException(e); }
212            }
213    
214    
215        /**Get the specified event sets for the specified intervals; never null.
216         * This allows retrieval of zero or more event sets for the specified
217         * interval size.
218         * <p>
219         * Requests for more than SystemVariables.EVENT_SAMPLES_RETAINED in the
220         * past (or for the future!) cannot be satisfied and data will not be
221         * returned for them.
222         * <p>
223         * Usually not more than SystemVariables.EVENT_SAMPLES_RETAINED samples
224         * will be returned in response to any one request as a safety measure.
225         *
226         * @param firstIntervalNumber  the number of the first interval
227         *     for which data is potentially required;
228         *     if too far in the past or future then possibly no data
229         *     will be available;
230         *     using zero accesses the "all" bucket
231         * @param whichValues  each true bit represents a slot for which data is
232         *     required, bit 0 indicating data from the slot within which
233         *     firstIntervalTime is located, bit 1 the previous slot, etc
234         *
235         * @return as many of the requested values as available,
236         *     at least long enough to return all the available values,
237         *     with [0] corresponding to bit 0 in the BitSet;
238         *     may contain nulls or be zero-length but is never null
239         */
240        synchronized EventVariableValue[] getEventValues(final long firstIntervalNumber,
241                                                         final BitSet whichValues)
242            {
243            if(firstIntervalNumber < 0)
244                { throw new IllegalArgumentException(); }
245    
246            final boolean nullWV = (whichValues == null);
247    
248            // If firstIntervalNumber == 0, then exactly slot zero must be requested.
249            if(firstIntervalNumber == 0)
250                {
251                if(!nullWV && (!whichValues.get(0) || (whichValues.length() != 1)))
252                    { throw new IllegalArgumentException("request for 'all' bucket must be for exactly that bucket and no others"); }
253    
254                // Convert "all" bucket to correct form and return it...
255                return(new EventVariableValue[]{ all.toEventVariableValue() });
256                }
257    
258    //System.out.println("[EventVariableValuePeriodRow.getEventValues("+firstIntervalNumber+", "+whichValues+")]");
259    
260    //        // State validation...
261    //        try { validateObject();  } catch(InvalidObjectException e) { throw new IllegalStateException(e); }
262    
263            // If requested result size silly/huge, then give up now to save space, etc.
264            final int resultSize = nullWV ? 1 : whichValues.length();
265            final long currentIntervalNumber = current.getIntervalNumber();
266    
267            // Optimisations for where the request will definitely generate no results...
268            if(resultSize < 1)
269                { return(_NO_EVENT_VALUES); }
270            if(resultSize > 1 + 2*SystemVariables.EVENT_SAMPLES_RETAINED)
271                { return(_NO_EVENT_VALUES); }
272            // If newest interval requested is too much newer than the "current" interval,
273            // then give up now.
274            if(firstIntervalNumber - resultSize > currentIntervalNumber)
275                { return(_NO_EVENT_VALUES); }
276            // If newest interval requested is too far in the past, then give up now.
277            final long oldestPossibleEventNumber = currentIntervalNumber - historical.length;
278            if(firstIntervalNumber < oldestPossibleEventNumber)
279                { return(_NO_EVENT_VALUES); }
280    
281            // Make result...
282            // Don't attempt to optimise its length here, yet...
283            final EventVariableValue result[] = new EventVariableValue[resultSize];
284    
285            // Index in historical[index] corresponding to result[0].
286            final int offset = (int) ((currentIntervalNumber-1) - firstIntervalNumber);
287    
288    //System.out.println("EVVPR offset = " + offset);
289    //System.out.println("EVVPR current/historical[] = " + current + "/" + Arrays.asList(historical));
290    
291            // Walk the request list, filling in values where we have them.
292            // The "current" item corresponds to index -1 in historical[],
293            // and needs to be converted before being slotted into the result, if used.
294            // Note that this value will never count as "authoritative" since not closed.
295            // Remember: higher "i" means older (smaller intervalNumber) values.
296            BitSet wv = whichValues;
297            if(wv == null)
298                {
299                // Create synthetic BitSet for simplicity below.
300                wv = new BitSet(1);
301                wv.set(0);
302                }
303            for(int i = wv.nextSetBit(0); i >= 0; i = wv.nextSetBit(i+1))
304                {
305                final int hIndex = i + offset;
306    
307                if(hIndex == -1) // Current...
308                    { result[i] = current.toEventVariableValue(); }
309                else if((hIndex >= 0) && (hIndex < historical.length))
310                    { result[i] = historical[hIndex]; }
311                }
312    
313    //        // Extra validation of output...
314    //        assert(result != null) : "unexpected null return value";
315    //        for(int i = result.length; --i >= 0; )
316    //            {
317    //            final EventVariableValue evv = result[i];
318    //            if(evv == null) { continue; }
319    //            assert(evv.getIntervalNumber() == firstIntervalNumber - i) : "unexpected return value with wrong interval number";
320    //            }
321    
322    //        // State validation...
323    //        try { validateObject();  } catch(InvalidObjectException e) { throw new IllegalStateException(e); }
324    
325    //System.out.println("EVVPR result[] = " + Arrays.asList(result));
326    
327            return(result);
328            }
329    
330        /**Zero-length array corresponding to "no results", immutable so made available to the package; non-null. */
331        static final EventVariableValue[] _NO_EVENT_VALUES = new EventVariableValue[0];
332    
333    
334        /**Set/override the specified event sets for the specified intervals; never null.
335         * This is used to set values that have been fetched from upstream
336         * (ie closer to the master data store).
337         * <p>
338         * This may ignore the call,
339         * and certainly will if the intervalNumber is outside the current time window it holds.
340         * <p>
341         * This will generally accept the value and store it if:
342         * <ul>
343         * <li>The intervalNumber is within the current window.
344         * <li>The intervalSelector is a period for which this event has data.
345         * <li>The supplied value is "better" than any current one held, ie:
346         *     <ul>
347         *     <li>We hold a null and evv is non-null.
348         *     <li>We hold a non-authoritative value and evv is authoritative.
349         *     <li>The authoritative-ness is the same bu we hold an entry
350         *         with a lower total count than evv.
351         *     <ul>
352         *     </ul>
353         * </ul>
354         *
355         * @param evv  non-null event value
356         */
357        public synchronized void setEventValue(final EventVariableValue evv)
358            {
359            if(evv == null)
360                { throw new IllegalArgumentException(); }
361            final SimpleVariableDefinition def = evv.getDef();
362            if(!def.equals(this.def))
363                { throw new IllegalArgumentException(); }
364            if(!evv.getPeriod().equals(period))
365                { throw new IllegalArgumentException(); }
366    
367            // Make sure that our state is canonicalised.
368            _canonicaliseState();
369    
370            // Event of "current" set, newer (greater) than any we can set.
371            final long currentIntervalNumber = current.getIntervalNumber();
372    
373            // Interval number from our new data.
374            final long updateIntervalNumber = evv.getIntervalNumber();
375    
376            // Note that we may accept an override for the all/current periods,
377            // (but we don't really expect it to be marked as authoritative)
378            // by folding in events from upstream not seen here.
379            if(updateIntervalNumber == 0)
380                {
381                // Accept an update if we seem not to have seen all events.
382                if(all.getTotalEventCount() < evv.getTotalEventCount())
383                    {
384    if(IsDebug.isDebug) { System.out.println("[EVVPR.setEventValue("+evv+"): accepting all-period override (isAuth="+evv.isAuthoritative()+").]"); }
385                    all.update(evv);
386                    }
387                return;
388                }
389            else if(updateIntervalNumber == currentIntervalNumber)
390                {
391                // Accept an update if we seem not to have seen all events.
392                if(current.getTotalEventCount() < evv.getTotalEventCount())
393                    {
394    if(IsDebug.isDebug) { System.out.println("[EVVPR.setEventValue("+evv+"): accepting current-period override (isAuth="+evv.isAuthoritative()+") at period "+updateIntervalNumber+".]"); }
395                    current.update(evv);
396                    }
397                return;
398                }
399            // Too new for us to accept the update.
400            else if(updateIntervalNumber > currentIntervalNumber) { return; }
401    
402            final long oldestPossibleEventNumber = currentIntervalNumber - historical.length;
403            // Too old for us to accept the update.
404            if(updateIntervalNumber < oldestPossibleEventNumber) { return; }
405    
406            // Index in historical[index] corresponding to new value we have been passed.
407            final int offset = (int) ((currentIntervalNumber-1) - updateIntervalNumber);
408    
409            // If the new data is "better" in some way
410            // then accept it in place of what we have already.
411            //
412            // Better is any of:
413            //   * We have now old value at all.
414            //   * We have a non-auth value and the new data is authoritative.
415            //   * We have a non-auth value and the new data has more events
416            //     (and this is dealt with specially for the current interval).
417            final EventVariableValue extant = historical[offset];
418            if((extant == null) ||
419               (!extant.isAuthoritative() && evv.isAuthoritative()) ||
420               (extant.getTotalEventCount() < evv.getTotalEventCount()))
421                {
422    //if(IsDebug.isDebug) { System.out.println("EVVPR.setEventValue("+evv+"): accepting override (isAuth="+evv.isAuthoritative()+") at period "+updateIntervalNumber+" (offset="+offset+")"); }
423                historical[offset] = evv;
424                }
425    
426            assert((extant == null) || (extant.getIntervalNumber() == updateIntervalNumber));
427    
428    //        try { validateObject(); } catch(InvalidObjectException e) { e.printStackTrace();  }
429            }
430    
431        /**Deserialise.
432         */
433        private void readObject(final ObjectInputStream ois)
434            throws IOException, ClassNotFoundException
435            {
436            ois.defaultReadObject();
437    
438            // If deserialising from an old object without an "all" field,
439            // create it empty now.
440            if(all == null) { all = new EventVariableValueBuffer(def, period, 0); }
441    
442            // Validate the state.
443            validateObject();
444            }
445    
446    
447    
448        /**Check that the object state is consistent and legal.
449         * Is synchronized to prevent changes of state while this instance is examined.
450         *
451         * @throws InvalidObjectException
452         */
453        public synchronized void validateObject()
454            throws InvalidObjectException
455            {
456            if(def == null)
457                { throw new InvalidObjectException("bad object: def must not be null"); }
458            if(!def.isEvent())
459                { throw new InvalidObjectException("bad object: def not an event type"); }
460    
461            if(period == null)
462                { throw new InvalidObjectException("bad object: period must not be null"); }
463    
464            if(current == null)
465                { throw new InvalidObjectException("bad object: current must not be null"); }
466            if((current.getPeriod() != period) || !current.getDef().equals(def))
467                { throw new InvalidObjectException("bad object: current def/period incorrect"); }
468            if(current.getIntervalNumber() <= 0)
469                { throw new InvalidObjectException("bad object: current interval invalid"); }
470    
471            if(all == null)
472                { throw new InvalidObjectException("bad object: all must not be null"); }
473            if((all.getPeriod() != period) || !all.getDef().equals(def))
474                { throw new InvalidObjectException("bad object: all def/period incorrect"); }
475            if(all.getIntervalNumber() != 0)
476                { throw new InvalidObjectException("bad object: all interval non-zero"); }
477    
478            if(all == current)
479                { throw new InvalidObjectException("bad object: all == current"); }
480    
481            if(historical == null)
482                { throw new InvalidObjectException("bad object: historical must not be null"); }
483            if(historical.length != SystemVariables.EVENT_SAMPLES_RETAINED)
484                { throw new InvalidObjectException("bad object: historical wrong length"); }
485            for(int i = historical.length; --i >= 0; )
486                {
487                final EventVariableValue evv = historical[i];
488                if(evv != null)
489                    {
490                    if((evv.getPeriod() != period) || !evv.getDef().equals(def))
491                        { throw new InvalidObjectException("bad object: historical def/period incorrect"); }
492                    final long expectedIntervalNumber = current.getIntervalNumber() - 1 - i;
493                    if(evv.getIntervalNumber() != expectedIntervalNumber)
494                        { throw new InvalidObjectException("bad object: historical intervalNumber incorrect"); }
495                    }
496                }
497            }
498    
499        /**Unique Serialisation class ID. */
500        private static final long serialVersionUID = -614477516323489097L;
501        }