001    package org.hd.d.pg2k.svrCore;
002    
003    import java.util.Date;
004    import java.util.Iterator;
005    import java.util.LinkedHashMap;
006    import java.util.Map;
007    
008    
009    /**Mechanism to check for replayed/duplicate messages by ID within a specified time window.
010     * This provides an optional fixed upper size;
011     * attempts to insert more items than the ceiling capacity
012     * will result in old items being removed in age order regardless of age.
013     * <p>
014     * Other after meeting any size limit,
015     * this always keeps entries for a specified minimum time.
016     * Optionally a larger time window may be used
017     * if the the system is not starved of memory.
018     * <p>
019     * Thread-safe.
020     * <p>
021     * A lock can be held on instances of this object to make compound operations atomic.
022     * <p>
023     * Does not support the full Map interface.
024     * <p>
025     * This aggressively trims dead/expired entries so as to minimise footprint.
026     */
027    public final class DuplicateIDChecker<K> implements MemoryTools.Compactable, MemoryTools.CacheMiniMap<K, Long>
028        {
029        /**Default load factor. */
030        public static final float DEFAULT_LOAD_FACTOR = 0.7f;
031    
032        /**Underlying LinkedHashMap on which this is based; never null. */
033        private final LinkedHashMap<K,Long> lhm;
034    
035        /**Maximum age that we will retain IDs for (ms); strictly positive. */
036        private final int minRetentionMs;
037    
038        /**Optional higher maximum age that we will retain IDs for when there's plenty of memory available (ms); strictly positive.
039         * Ignored unless higher than minRetentionMs.
040         */
041        private final int preferredRetentionMs;
042    
043        /**Name of this instance for diagnostics purposes; null if none. */
044        private final String name;
045    
046        /**Get name of this container instance for tracking purposes, or null if none. */
047        public String getCompactableInstanceName() { return(name); }
048    
049        /**Create an instance with the given parameters.
050         * The standard default is used for the load factor,
051         * and the map size is effectively unlimited.
052         *
053         * @param minRetentionMs  the minimum time that we will keep an ID for (ms);
054         *    strictly positive
055         * @param name  name of this instance for diagnostics purposes; null if none
056         */
057        public static <K> DuplicateIDChecker<K> create(final int minRetentionMs,
058                                                       final String name)
059            { return(create(minRetentionMs, minRetentionMs, Integer.MAX_VALUE, DEFAULT_LOAD_FACTOR, name)); }
060    
061        /**Create an instance with the given parameters.
062         * The standard default is used for the load factor,
063         * and the map size is effectively unlimited.
064         *
065         * @param minRetentionMs  the minimum time that we will keep an ID for (ms);
066         *    strictly positive
067         * @param preferredRetentionMs  preferred time to retain entries if there is plenty of free memory
068         *    and ineffective/ignored unless larger than minRetentionMs;
069         *    strictly positive
070         * @param name  name of this instance for diagnostics purposes; null if none
071         */
072        public static <K> DuplicateIDChecker<K> create(final int minRetentionMs,
073                                                       final int preferredRetentionMs,
074                                                       final String name)
075            { return(create(minRetentionMs, preferredRetentionMs, Integer.MAX_VALUE, DEFAULT_LOAD_FACTOR, name)); }
076    
077        /**Create an instance with the given parameters.
078         * @param minRetentionMs  the minimum time that we will keep an ID for (ms);
079         *    strictly positive
080         * @param preferredRetentionMs  preferred time to retain entries if there is plenty of free memory
081         *    and ineffective/ignored unless larger than minRetentionMs;
082         *    strictly positive
083         * @param maxCapacity  the maximum capacity; strictly positive
084         * @param loadFactor  the load factor of the underlying hash table;
085         *     in the range 0.0f to 1.0f exclusive (typically ~0.7f)
086         * @param name  name of this instance for diagnostics purposes; null if none
087         */
088        public static <K> DuplicateIDChecker<K> create(final int minRetentionMs,
089                                                       final int preferredRetentionMs,
090                                                       final int maxCapacity,
091                                                       final float loadFactor,
092                                                       final String name)
093            {
094            final DuplicateIDChecker<K> result =
095                new DuplicateIDChecker<K>(minRetentionMs, preferredRetentionMs, maxCapacity, loadFactor, name);
096            MemoryTools.registerCompactable(result);
097            return(result);
098            }
099    
100        /**Create an instance with the given parameters.
101         * @param minRetentionMs  the minimum time that we will keep an ID for (ms);
102         *    strictly positive
103         * @param preferredRetentionMs  preferred time to retain entries if there is lots of free memory
104         *    and ineffective/ignored unless larger than minRetentionMs;
105         *    strictly positive
106         * @param maxCapacity  the maximum capacity; strictly positive
107         * @param loadFactor  the load factor of the underlying hash table;
108         *     in the range 0.0f to 1.0f exclusive (typically ~0.7f)
109         */
110        private DuplicateIDChecker(final int minRetentionMs,
111                                   final int preferredRetentionMs,
112                                   final int maxCapacity,
113                                   final float loadFactor,
114                                   final String name)
115            {
116            if((maxCapacity <= 0) ||
117               (minRetentionMs <= 0) || (preferredRetentionMs < 0) ||
118               (loadFactor <= 0) || (loadFactor >= 1))
119                { throw new IllegalArgumentException(); }
120    
121            this.name = name;
122            this.minRetentionMs = minRetentionMs;
123            this.preferredRetentionMs = preferredRetentionMs;
124    
125            // Build insertion-order map with override for removal of oldest entry when map too big.
126            lhm = new LinkedHashMap<K,Long>(16, loadFactor, false){
127                @Override
128                protected final boolean removeEldestEntry(final Map.Entry<K, Long> entry)
129                    {
130                    final int s = size();
131    
132                    // If over-size then zap this oldest entry.
133                    if(s > maxCapacity) { return(true); }
134    
135                    // Zap an entry if expired, ie too old to matter.
136                    return(hasExpired(entry));
137                    }
138                /**Unique Serialisation class ID generated by http://random&#46;hd&#46;org/. */
139                private static final long serialVersionUID = 3013203108802112945L;
140                };
141            }
142    
143        /**Try to trim all dead entries to reduce memory footprint.
144         * Will attempt to minimise memory footprint.
145         */
146        public synchronized void compact()
147            {
148    if(!lhm.isEmpty() && MemoryTools.isMemoryStressed()) { System.err.println(this); } // Log data retention *before* emergency prune.
149            _compact();
150            }
151    
152        /**Try to trim all dead entries to reduce memory footprint.
153         * Will attempt to minimise memory footprint.
154         * <p>
155         * Must only be called while the instance lock is held.
156         */
157        public void _compact()
158            {
159            assert(Thread.holdsLock(this)); // Lock must be held.
160    
161            final long thresholdTime = thresholdTime(); // One computation however many entries we discard.
162            while(!lhm.isEmpty())
163                {
164                // Discard the oldest entry if expired so as to trim the map size.
165                final Iterator<Map.Entry<K,Long>> it = lhm.entrySet().iterator();
166                final Map.Entry<K,Long> oldest = it.next();
167                // Stop when we encounter a non-expired entry.
168                // (All others can be assume newer and thus not expired.)
169                if(!hasExpired(oldest, thresholdTime))
170                    { break; }
171                // Zap this expired entry.
172                it.remove();
173                }
174            }
175    
176        /**Returns true if we have a mapping from the supplied key (message ID), false otherwise.
177         * This is equivalent to checking if this is a duplicate message.
178         * <p>
179         * This ignores expired values and does not add anything to the collection.
180         */
181        public synchronized boolean containsKey(final K key) { _compact(); return(lhm.containsKey(key)); }
182    
183        /**Record an ID, returning the insertion time of the previous insertion of this ID if any. */
184        public synchronized Long add(final K key) { _compact(); return(lhm.put(key, new Long(System.currentTimeMillis()))); }
185    
186        /**Remove an ID and return the insertion time of the removed ID, if any, else null. */
187        public synchronized Long remove(final K key) { _compact(); return(lhm.remove(key)); }
188    
189        /**Returns the timestamp of the oldest entry currently in this Map, or null if none. */
190        public synchronized Long ageOfOldestEntry()
191            {
192            _compact();
193            final Iterator<Long> it = lhm.values().iterator();
194            if(!it.hasNext()) { return(null); } // Map is empty...
195            return(it.next());
196            }
197    
198        /**Clear the structure. */
199        public synchronized void clear() { lhm.clear(); }
200    
201        /**Return the number of non-expired IDs held; non-negative. */
202        public synchronized int size() { _compact(); return(lhm.size()); }
203    
204        /**Provide a human-readable summary of status.
205         * Useful for debugging/tuning, for example.
206         */
207        @Override public synchronized String toString()
208            {
209            final Long oldest = ageOfOldestEntry();
210            return("DuplicateIDChecker: name="+name+" size="+lhm.size() + (((null == oldest) ? "" : " oldest@"+(new Date(oldest)))));
211            }
212    
213        /**Returns true when an entry has expired and can be removed.
214         * @param entry  full entry including the timestamp; never null
215         * @param thresholdTime  expiry threshold time to use,
216         *     an older entry should be expired
217         */
218        private boolean hasExpired(final Map.Entry<K, Long> entry, final long thresholdTime)
219            {
220            final long timestamp = entry.getValue().longValue();
221            return(timestamp < thresholdTime);
222            }
223    
224        /**If true then be eager (super-linear) in discarding entries as free memory declines.
225         * Unless the retained items are especially large this can be false.
226         */
227        private static final boolean EAGER_UNLOAD = false;
228    
229        /**Compute threshold time for entry expiry.
230         * Lifetime drops as free space falls beneath our target.
231         * <p>
232         * This retains more entries if there is more visibly-free heap
233         * (though this may give up entries eagerly, ie faster than linear)
234         *  so that transients are less likely to discard a slow-to-recover window.
235         */
236        private long thresholdTime()
237            {
238            final long now = System.currentTimeMillis();
239    
240            // Expire an entry ASAP (minimum lifetime)
241            // if there is no higher (preferred) time set.
242            if(preferredRetentionMs <= minRetentionMs)
243                { return(now - minRetentionMs); }
244    
245            // Adjust lifetime in inverse proportion to heap shortage.
246            final int pcFree = MemoryTools.percentFreeWithinTarget();
247            if(pcFree < 1) // Dire shortage: minimise entry lifetime.
248                { return(now - minRetentionMs); }
249            if(pcFree > 99) // Loads of space: maximise entry lifetime.
250                { return(now - preferredRetentionMs); }
251    
252            if(EAGER_UNLOAD)
253                {
254                // Compute initially-fast-declining hyperbolic.
255                final int pcRecip = 100 / (100 - pcFree);
256    
257                // Interpolate linearly (avoiding overflow).
258                // As the percentage free goes up so does the entry lifetime.
259                final long retentionMs =
260                    ((((long)pcRecip) * preferredRetentionMs) + ((100L - pcRecip) * minRetentionMs)) / 100;
261    
262                return(now - retentionMs);
263                }
264    
265            // Linear discard as free memory reduces...
266            // As the percentage free goes up so does the entry lifetime, linearly.
267            final long retentionMs =
268                ((((long)pcFree) * preferredRetentionMs) + ((100L - pcFree) * minRetentionMs)) / 100;
269    
270            return(now - retentionMs);
271            }
272    
273        /**Returns true when an entry has expired and can be removed.
274         * Computes the expiry threshold itself,
275         * so is suitable for one-off expiry attempts.
276         *
277         * @param entry  full entry including the timestamp; never null
278         */
279        private boolean hasExpired(final Map.Entry<K, Long> entry)
280            { return(hasExpired(entry, thresholdTime())); }
281    
282        public synchronized Long get(final K key) { _compact(); return(lhm.get(key)); }
283    
284        // General put() operation not allowed.
285        public Long put(final K key, final Long value) { throw new RuntimeException("NOT IMPLEMENTED"); }
286        }