001    package org.hd.d.pg2k.svrCore.datasource;
002    
003    import java.io.BufferedOutputStream;
004    import java.io.ByteArrayInputStream;
005    import java.io.ByteArrayOutputStream;
006    import java.io.DataInputStream;
007    import java.io.DataOutputStream;
008    import java.io.FilterInputStream;
009    import java.io.IOException;
010    import java.io.InputStream;
011    import java.io.InterruptedIOException;
012    import java.io.InvalidObjectException;
013    import java.io.ObjectInputStream;
014    import java.io.ObjectOutputStream;
015    import java.io.OutputStream;
016    import java.io.Serializable;
017    import java.nio.ByteBuffer;
018    import java.security.InvalidKeyException;
019    import java.security.Key;
020    import java.util.ArrayList;
021    import java.util.Arrays;
022    import java.util.BitSet;
023    import java.util.Collections;
024    import java.util.Date;
025    import java.util.Iterator;
026    import java.util.List;
027    import java.util.Map;
028    import java.util.Properties;
029    import java.util.concurrent.Callable;
030    import java.util.concurrent.TimeUnit;
031    import java.util.concurrent.atomic.AtomicReference;
032    import java.util.concurrent.locks.ReentrantLock;
033    import java.util.regex.Pattern;
034    
035    import javax.crypto.Mac;
036    import javax.crypto.SecretKey;
037    import javax.management.RuntimeErrorException;
038    
039    import org.hd.d.pg2k.svrCore.AllExhibitImmutableData;
040    import org.hd.d.pg2k.svrCore.AllExhibitProperties;
041    import org.hd.d.pg2k.svrCore.AllExhibitPropertiesDelta;
042    import org.hd.d.pg2k.svrCore.AllExhibitPropertiesDelta.DiffException;
043    import org.hd.d.pg2k.svrCore.CompressionLevel;
044    import org.hd.d.pg2k.svrCore.CoreConsts;
045    import org.hd.d.pg2k.svrCore.DefInputStream;
046    import org.hd.d.pg2k.svrCore.DefOutputStream;
047    import org.hd.d.pg2k.svrCore.ExhibitName;
048    import org.hd.d.pg2k.svrCore.ExhibitStaticAttr;
049    import org.hd.d.pg2k.svrCore.ExhibitThumbnails;
050    import org.hd.d.pg2k.svrCore.FileTools;
051    import org.hd.d.pg2k.svrCore.GenUtils;
052    import org.hd.d.pg2k.svrCore.MemoryTools;
053    import org.hd.d.pg2k.svrCore.Name;
054    import org.hd.d.pg2k.svrCore.Name.ExhibitFull;
055    import org.hd.d.pg2k.svrCore.PGMasterNotInServiceException;
056    import org.hd.d.pg2k.svrCore.ROByteArray;
057    import org.hd.d.pg2k.svrCore.Rnd;
058    import org.hd.d.pg2k.svrCore.SimpleLoggerIF;
059    import org.hd.d.pg2k.svrCore.Stratum;
060    import org.hd.d.pg2k.svrCore.TextUtils;
061    import org.hd.d.pg2k.svrCore.Tuple;
062    import org.hd.d.pg2k.svrCore.Tuple.Pair;
063    import org.hd.d.pg2k.svrCore.WrappedByteArrayCharSequence;
064    import org.hd.d.pg2k.svrCore.props.GenProps;
065    import org.hd.d.pg2k.svrCore.stats.StatsLogger;
066    import org.hd.d.pg2k.svrCore.vars.BasicVarMgr;
067    import org.hd.d.pg2k.svrCore.vars.EventPeriod;
068    import org.hd.d.pg2k.svrCore.vars.EventVariableValue;
069    import org.hd.d.pg2k.svrCore.vars.InstanceID;
070    import org.hd.d.pg2k.svrCore.vars.SimpleVariableDefinition;
071    import org.hd.d.pg2k.svrCore.vars.SimpleVariableValue;
072    import org.hd.d.pg2k.svrCore.vars.SystemVariables;
073    
074    import ORG.hd.d.IsDebug;
075    
076    /**Exhibit pipeline stage that fetches its data across a master/slave tunnel.
077     * Some of the visible types and values are used by the implementations
078     * of both ends of the tunnel (eg the tunnel servlet)
079     * to help implement the shared protocol.
080     * <p>
081     * This takes data from the server's own DataSourceBean by default.
082     */
083    public abstract class ExhibitDataTunnelSource implements SimpleExhibitPipelineIF
084        {
085        /**Create an instance around the given logger.
086         * @param logger  used for our own logging and for stats output; must not be null
087         * @throws IllegalArgumentException  if logger is null */
088        public ExhibitDataTunnelSource(final SimpleLoggerIF logger)
089            {
090            if(null == logger)
091                { throw new IllegalArgumentException(); }
092            // Stats set: use our logger, only dump summaries, about every 12 hours, adaptive.
093            this.statsIDTS = new StatsLogger.StatsConfig(
094                "TUNNELSOURCE", logger, false, 12 * 3600, true);
095            this.logger = logger;
096            }
097    
098        /**If true, try GC and running finalizers after some connection failures.
099         * If we think that connection failure may have been caused by
100         * failure to release resources (elsewhere in the VM!)
101         * then try to force a clean-up from time to time.
102         * But, in case we are wrong, do not do this too often anyway.
103         */
104        private static final boolean FORCE_GC_AND_FINALISERS_AFTER_POLL_FAIL = true;
105    
106        /**Minimum wait after failed tunnel call before retry (ms); strictly positive.
107         * A few tens of milliseconds to a second is probably best,
108         * to recover from transient I/O problems quickly,
109         * but without actually busy-waiting.
110         * _noteResult() doubles this per successive failure to get its back-off. */
111        public static final int FAIL_RETRY_WAIT_MIN_MS = 101;
112    
113        /**Maximum wait after failed tunnel call before retry (ms).
114         * Several minutes is probably good,
115         * to cope with a crashed master server (and/or to wait for it to reboot),
116         * or a failed connection,
117         * but too long and our distributed variables will fail needlessly.
118         * <p>
119         * Most requests during the wait will be vetoed cleanly and quickly
120         * allowing timely service to the slave's clients.
121         * <p>
122         * We vary this limit between slaves/instances (two minutes plus a random
123         * extra, presumably under a minute, fixed at class initialisation) to help
124         * avoid them colliding with one another especially if hitting this bound. */
125        public static final int FAIL_RETRY_WAIT_MAX_MS = 2 * 60 * 1000 +
126            (1 | Rnd.fastRnd.nextInt(1 * 60 * 1000));
127    
128        /**If true, allow connection/resource sharing between calls if possible.
129         * We may also periodically check that we can still contact the master.
130         * <p>
131         * An open connection would probably improve TCP efficiency
132         * and reduce latency for an underlying HTTP[S] connection, for example.
133         * (We might allow HTTP/1.1 streaming and connection sharing to be
134         * done for us by the JDK runtime if this is true.)
135         */
136        protected static final boolean KEEP_SERVER_CONNECTION_ALIVE = true;
137    
138        /**If true, extra instrumentation for protocol debug (eg getRawFile() prints its arguments to System.err); definitely not for release code. */
139        protected static final boolean _protocolDebug = false /* && ORG.hd.d.IsDebug.isDebug */ ;
140    
141        /**Our unique client-side end-point identifier.
142         * The ID goes into all outgoing variable updates (and possibly other data)
143         * going upstream to the master to uniquely identify this client.
144         * <p>
145         * This is unique to each tunnel client end-point instance
146         * and is created afresh on each run,
147         * so a rebooted client gets a new ID.
148         * <p>
149         * Uses secure/good generator to try to ensure global uniqueness.
150         * (This may be expensive to compute, and is computed eagerly as each
151         * instance is constructed; we might be able to defer this until first needed.)
152         */
153        public final SimpleVariableValue uniqueClientID =
154            new SimpleVariableValue(SystemVariables.LOCAL_SYS_ID, InstanceID.createInstanceID());
155    
156    
157        /**Our logger which falls back to System.out if servlet log not available; set (and null-checked) by the constructor so never null. */
158        protected final SimpleLoggerIF logger;
159    
160        /**The stats set to which we log general tunnel source stats.
161         * The unique codes are the constants TSNAME_XXX (and the TSNAMEPR_XXX prefixes).
162         * Built by the constructor, which passes it the string "TUNNELSOURCE" and our logger. */
163        private final StatsLogger.StatsConfig statsIDTS;
164    
165        /**General stats event name: RPC request. */
166        public static final String TSNAME_RPCREQUEST = "RPC-request";
167    
168        /**General stats event name: RPC failure due to an IOException. */
169        public static final String TSNAME_RPCIOEX = "RPC-IOException";
170    
171        /**General stats event name prefix: RPC request packet type (presumably followed by the type's name). */
172        public static final String TSNAMEPR_RPCTYPE = "RPC-type-";
173    
174        /**General stats event name: "short" raw data read, less than bulk-transfer block size. */
175        public static final String TSNAME_SHORTREAD = "shortRead";
176    
177        /**Convenience value; (immutable) zero-length byte array for use as an empty packet payload.
178         * Used by local classes and TunnelServlet; declared public.
179         */
180        public static final byte[] EMPTY_PAYLOAD = new byte[0];
181    
182        /**Time when last successful connection was made, or 0 if no connection.
183         * Declared volatile so that it can be accessed without a lock.
184         * <p>
185         * Set by _noteResult() to System.currentTimeMillis() after a good exchange
186         * with the master; never cleared, so it is 0 only until the first success.
187         */
188        private volatile long lastSuccessfulConnectionTime;
189    
190        /**Report when the master was last reached successfully (a System.currentTimeMillis() value), or 0 if never. */
191        public long getLastSuccessfulConnectionTime()
192            { return this.lastSuccessfulConnectionTime; }
193    
194        /**Time before which we should not again try to contact the master; null if master is fine.
195         * Declared volatile so that it can be accessed without a lock.
196         * <p>
197         * Cleared to null by _noteResult() after a good exchange with the master;
198         * after a failed exchange it is set to the current time plus a wait
199         * that is approximately doubled for each successive failure (up to a cap). */
200        private volatile Long doNotTryMasterUntil;
201    
202        /**True while we are still holding off from the master after a failed exchange, else false. */
203        public boolean isBroken()
204            {
205            final Long holdOffUntil = doNotTryMasterUntil; // Single volatile read.
206            if(holdOffUntil == null) { return(false); } // No hold-off in force.
207            return(System.currentTimeMillis() < holdOffUntil.longValue());
208            }
209    
210        /**Successive failure count; reset to zero upon success.
211         * Private to _noteResult() and accessed under its private lock.
212         * <p>
213         * Cleared to zero on a successful packet exchange;
214         * incremented (sticking at Integer.MAX_VALUE rather than wrapping) after each failure.
215         */
216        private int successiveFailureCount;
217    
218        /**Private lock for _noteResult; held while it updates the failure count and both timestamps. */
219        private final Object _nR_lock = new Object();
220    
221        /**Immutable empty result indicating no event values available. */
222        private static final EventVariableValue[] NO_EVENT_VALUES = new EventVariableValue[0];
223    
224        /**Routine to note success or failure of RPC call and adjust control variables.
225         * Has its own private lock to ensure atomic update of control variables.
226         * Success clears the hold-off state and records the time; failure pushes the
227         * next permitted attempt out by a randomised, roughly doubling, bounded wait.
228         *
229         * @param success  call with this true immediately after successful RPC exchange with master;
230         *     call with false otherwise
231         */
232        private void _noteResult(final boolean success)
233            {
234            synchronized(_nR_lock)
235                {
236                final long now = System.currentTimeMillis();
237                if(success)
238                    {
239                    // Successful RPC exchange with master.
240                    lastSuccessfulConnectionTime = now;
241                    doNotTryMasterUntil = null;
242                    successiveFailureCount = 0;
243                    return;
244                    }
245    
246                // Failed RPC exchange with the master: count it, sticking at the maximum on wrap.
247                if(++successiveFailureCount < 0)
248                    { successiveFailureCount = Integer.MAX_VALUE; }
249    
250                // Compute first-cut back-off time, doubling for each successive failure
251                // (the shift is capped at 32 so that the long result cannot overflow).
252                final long approxBackoff = ((long) ExhibitDataTunnelSource.FAIL_RETRY_WAIT_MIN_MS) <<
253                    Math.min(32, successiveFailureCount);
254    
255                // Coerce to reasonable bounds, which also makes the narrowing to int safe.
256                final int waitBounded = (int) Math.min(ExhibitDataTunnelSource.FAIL_RETRY_WAIT_MAX_MS,
257                    Math.max(ExhibitDataTunnelSource.FAIL_RETRY_WAIT_MIN_MS, approxBackoff));
258                // Wait for half of that bound plus a random extra (capped at the maximum);
259                // the random component helps to avoid inter-slave collisions.
260                final long newWaitTime = Math.min(ExhibitDataTunnelSource.FAIL_RETRY_WAIT_MAX_MS,
261                    waitBounded/2 + Rnd.fastRnd.nextInt(waitBounded));
262                // Store the new "do not try before" time, boxed without the deprecated constructor.
263                doNotTryMasterUntil = Long.valueOf(now + newWaitTime);
264                }
265            }
266    
267        /**Does a NO-OP on the server.
268         * If this returns without throwing an exception,
269         * then the connection to the back-end (master) server is probably OK.
270         *
271         * @param unguarded  if true, this ignores any recent connection problems
272         *     and immediately tries to contact the master,
273         *     else it behaves like other operations and is quickly vetoed
274         *     while the master is failing
275         *
276         * @throws java.io.IOException  in case of trouble communicating with master.
277         */
278        public void doNOOP(final boolean unguarded)
279            throws IOException
280            {
281            final RawPacket response;
282            if(unguarded)
283                {
284                response = doRPCUnguarded(
285                    new RawPacket(RawPacket.OpCode.NOOP, ExhibitDataTunnelSource.EMPTY_PAYLOAD));
286                }
287            else
288                {
289                response = doRPC(
290                    new RawPacket(RawPacket.OpCode.NOOP, ExhibitDataTunnelSource.EMPTY_PAYLOAD));
291                }
292            if(response.plLength != 0)
293                { throw new IOException("invalid non-empty response to NOOP request"); }
294            return;
295            }
296    
297        /**Fetch the static attributes of the named exhibit from the master.
298         * The result is null if the named exhibit does not exist.
299         * <p>
300         * The name travels as the request payload in DataOutputStream.writeUTF() form,
301         * and compression of the request is allowed.
302         * (UTF is used since we know that the length is limited and
303         * the all-ASCII nature of valid names should yield
304         * one-byte-per-char encoding before compression.)
305         * <p>
306         * A null name is not supported and makes this call fail.
307         * <p>
308         * (Big secret: if this class is used behind, for example,
309         * ExhibitDataSimpleCache, this method will never be called
310         * because all such calls are answered direct from the cache.) */
311        public ExhibitStaticAttr getStaticAttr(final ExhibitFull name)
312            throws IOException
313            {
314            // Encode the name as the request payload.
315            final ByteArrayOutputStream encoded = new ByteArrayOutputStream(name.length());
316            final DataOutputStream encoder = new DataOutputStream(encoded);
317            encoder.writeUTF(name.toString());
318            final byte[] payload = encoded.toByteArray();
319            encoder.close(); // Attempt to free some resources quickly.
320    
321            // One round trip to the master via doRPC()...
322            final RawPacket reply =
323                doRPC(new RawPacket(RawPacket.OpCode.GetStaticAttr, payload));
324    
325            // ...whose payload is the serialised result object.
326            return((ExhibitStaticAttr) reply.getSerializedObjectPayload());
327            }
328    
329        /**Get a chunk of the raw exhibit binary.
330         * The start position must be non-negative.
331         * <p>
332         * The request payload is the name in DataOutputStream.writeUTF() layout
333         * (two-byte length then one byte per char, relying on valid names being
334         * all-ASCII), then the start and (computed) afterEnd offsets as big-endian
335         * int values, then one byte for the dontCache flag.
336         * <p>
337         * The amount requested is the space remaining in buf,
338         * capped at MAX_USER_READ_SIZE;
339         * the response payload is copied as-is straight into buf.
340         *
341         * @param buf  destination buffer; must have at least one byte remaining
342         * @param exhibitName  full name of the exhibit to read from
343         * @param position  offset in the exhibit of the first byte wanted; non-negative
344         * @param dontCache  flag passed through to the master in the request
345         * @throws java.lang.IllegalArgumentException  for blatantly invalid name,
346         *     negative position, non-positive length,
347         *     or a range whose end would not fit in an int
348         * @throws java.io.IOException  in case of trouble communicating with master */
349        public void getRawFile(final ByteBuffer buf,
350                               final Name.ExhibitFull exhibitName,
351                               final int position,
352                               final boolean dontCache)
353            throws IOException
354            {
355            // We check that that the name is not blatantly illegal...
356            if(!ExhibitName.validNameSyntaxBasic(exhibitName))
357                { throw new IllegalArgumentException("invalid name"); }
358            // ...and enforce the documented non-negative start position.
359            if(position < 0)
360                { throw new IllegalArgumentException("start position must be non-negative"); }
361    
362            // We don't allow zero-byte requests over the expensive tunnel.
363            int len = buf.remaining();
364            if(len < 1)
365                { throw new IllegalArgumentException("request length must be positive"); }
366    
367            // Limit any request to a size that the back-end would allow.
368            if(len > MAX_USER_READ_SIZE)
369                { len = MAX_USER_READ_SIZE; }
370    
371            // Refuse a range whose end would wrap round as an int.
372            if(position > Integer.MAX_VALUE - len)
373                { throw new IllegalArgumentException("request range overflows"); }
374    
375            if(ExhibitDataTunnelSource._protocolDebug)
376                {
377                System.err.println("ExhibitDataTunnelSource.getRawFile(" +
378                    "len="+len+", dontCache="+dontCache+")");
379                }
380    
381            // Do the transfer...
382            final int afterEnd = position + len;
383    
384            // Optimised creation of outgoing packet,
385            // hand-assembling what a DataOutputStream would produce for
386            // writeUTF(name); writeInt(position); writeInt(afterEnd); writeBoolean(dontCache).
387            final int nameLength = exhibitName.length();
388            final byte[] data = new byte[11 + nameLength];
389            data[0] = (byte) (nameLength >>> 8);
390            data[1] = (byte) nameLength;
391            exhibitName.writeToByteArray(data, 2);
392            data[2 + nameLength] = (byte) (position >>> 24);
393            data[3 + nameLength] = (byte) (position >>> 16);
394            data[4 + nameLength] = (byte) (position >>>  8);
395            data[5 + nameLength] = (byte) position;
396            data[6 + nameLength] = (byte) (afterEnd >>> 24);
397            data[7 + nameLength] = (byte) (afterEnd >>> 16);
398            data[8 + nameLength] = (byte) (afterEnd >>>  8);
399            data[9 + nameLength] = (byte) afterEnd;
400            data[10 + nameLength] = (byte) (dontCache ? 1 : 0);
401    
402            final RawPacket request = new RawPacket(RawPacket.OpCode.GetRawFile, data);
403            final RawPacket response = doRPC(request);
404            response.getPayloadCopy(buf); // Straight into buffer for efficiency.
405            }
406    
407        /**Fetch the complete immutable exhibit data set, unless the caller already has it.
408         * Pass a negative oldStamp to have the object returned unconditionally.
409         * <p>
410         * If no exhibits are currently installed
411         * then a default set with a zero timestamp is returned.
412         * <p>
413         * Null is returned if the caller's copy appears to be up-to-date
414         * (eg oldStamp matches the timestamp of what would have been returned). */
415        public AllExhibitImmutableData getAllExhibitImmutableData(final long oldStamp)
416            throws IOException
417            {
418            // The serialised timestamp is the whole of the request payload.
419            final RawPacket ask = new RawPacket(
420                RawPacket.OpCode.GetAllExhibitImmutableData, ExhibitDataTunnelSource.longSer(oldStamp));
421            return((AllExhibitImmutableData) doRPC(ask).getSerializedObjectPayload());
422            }
423    
424        /**Gets set of all exhibit properties if its hash is not that specified.
425         * If the hash specified is negative the object will be returned unconditionally.
426         * <p>
427         * If no exhibits are currently installed then a default set with a zero timestamp is returned.
428         * <p>
429         * If the caller's copy appears to be up-to-date
430         * (eg the oldHash matches that which would have been returned) then null is returned.
431         * <p>
432         * Because the AEP drives the entire Gallery,
433         * we always let the request go over the tunnel even if recent calls have failed.
434         * For example, we don't want mirrors to remain out of sync for too long.
435         * <p>
436         * This also typically takes a lot of bandwidth and CPU/wallclock time,
437         * so is optimised and stripped down to a minimum,
438         * and relies on the AEP deserialisation for serious error checking.
439         * <p>
440         * It is hoped that using the streamed form will overlap I/O and CPU work
441         * and thus get the new AEP into the client quicker and smoother.
442         * <p>
443         * We attempt to be good citizens and note most RPC successes/failures
444         * to help maintain the tunnel's view of the master's status.
445         * <p>
446         * (Because satisfying these calls may take a long time,
447         * the upstream server may veto concurrent queries that return anything
448         * other than null.)
449         * <p>
450         * As a further attempt to minimise time and bandwidth,
451         * we may try the "diff" version of this call,
452         * falling back to the normal version if this fails.
453         *
454         * @throws java.io.IOException  if the response cannot be fetched or deserialised;
455         *     a missing class is reported this way too, with the original as its cause */
456        public AllExhibitProperties getAllExhibitProperties(final long oldHash)
457            throws IOException
458            {
459            final RawPacket packetOut = new RawPacket(RawPacket.OpCode.GetAllExhibitProperties, ExhibitDataTunnelSource.longSer(oldHash));
460    
461            // The RPC may take a long time to compute the response to, so have a big read timeout.
462            final Pair<Integer, InputStream> stream = doRPCRawWithStreamResponse(packetOut, true);
463            if((stream.first != null) && (stream.first.intValue() == 0))
464                {
465                // An empty response means a null AEP response.
466                stream.second.close();
467                _noteResult(true); /* Note RPC success. */
468                return(null);
469                }
470    
471            // We rely on the AllExhibitProperties deserialisation for most error checking.
472            try
473                {
474                // Actually deserialise from the input stream.
475                // Opening the object stream reads its header and so may itself fail:
476                // do it inside the try so that the response stream is always closed.
477                final ObjectInputStream ois = new ObjectInputStream(stream.second);
478                final AllExhibitProperties result = (AllExhibitProperties) ois.readObject();
479                _noteResult(true); /* Note RPC success. */
480                // Warn if the remote end of the tunnel seems to have sent us a redundant value,
481                // ie sent us something we already have with the hash code that we specified.
482                if((result != null) && (result.longHash == oldHash))
483                    {
484                    final String message = "WARNING: tunnel fetched AEP with existing hash: " + oldHash;
485                    System.err.println(message);
486                    logger.log(message);
487                    }
488                return(result);
489                }
490            catch(final ClassNotFoundException e)
491                {
492                // For robustness, treat a class mismatch problem like a data problem,
493                // but keep the original exception as the cause.
494                final IOException err = new IOException(e.getMessage());
495                err.initCause(e);
496                throw err;
497                }
498            catch(final InterruptedIOException e)
499                { throw e; } // Not a failure: can retry later, so rethrow as-is without noting.
500            catch(final IOException e)
501                { _noteResult(false); throw e; } // Note RPC failure, then rethrow error as-is.
502            finally { stream.second.close(); /* Free (network) resources; also closes the object stream's source. */ }
503            }
504    
505        /**Maximum level of compression in AEP "diff" RPC supported in this implementation, client or server side; its name() is sent with each diff request and replies compressed at a higher level are rejected. */
506        public static final CompressionLevel MAX_AEP_DIFF_COMP_LEVEL = CompressionLevel.LZMA; // GenUtils.MAX_SUPPORTED_COMPRESSION_LEVEL;
507    
508        /**Gets set of all exhibit properties if not that specified, attempting minimise data transferred across the tunnel.
509         * This is a tunnel-specific optimisation to minimise bandwidth.
510         * For some clients this will save money and time.
511         * <p>
512         * If the AEP specified is null then the remote AEP will be fetched and returned unconditionally.
513         * <p>
514         * If no exhibits are currently installed
515         * then a default set with a zero timestamp is returned.
516         * <p>
517         * If the caller's copy appears to be up-to-date
518         * (eg the oldHash matches that that would have been returned)
519         * then null is returned.
520         * <p>
521         * This is an attempted optimisation of the getAllExhibitProperties(long oldHash),
522         * returning a diff or extra-highly-compressed AEP representation.
523         * <p>
524         * This may not be supported at all by the remote end of the connection,
525         * or may fail for lack of resources, etc,
526         * so we can automatically fall back to the usual call in case of difficulty.
527         * <p>
528         * (We do not record failures as connection/tunnel problems,
529         * though we do note successful calls in favour of the connection status.)
530         *
531         * @param oldAEP  current latest AEP held by the caller
532         * @param allowAutoRecovery  if true then allow fallback to generic AEP fetch method
533         */
534        public AllExhibitProperties getAllExhibitProperties(final AllExhibitProperties oldAEP,
535                                                            final boolean allowAutoRecovery)
536            throws IOException
537            {
538            final long oldHash = (oldAEP == null) ? -1 : oldAEP.longHash;
539    
540            try
541                {
542                // Construct the header.
543                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
544                final DataOutputStream dos = new DataOutputStream(baos);
545                // Write existing AEP longHash, or -1 if no extant AEP.
546                dos.writeLong(oldHash);
547                // Write current maximum supported AEP compression level as a UTF String.
548                dos.writeUTF(MAX_AEP_DIFF_COMP_LEVEL.name());
549                dos.close();
550                final byte data[] = baos.toByteArray();
551                final RawPacket packetOut = new RawPacket(RawPacket.OpCode.GetAllExhibitPropertiesDiff, data);
552    
553                // An empty response (zero bytes) means that our AEP is up-to-date...
554                // The RPC may take a long time to compute the response to, so have a big read timeout.
555                final Pair<Integer, InputStream> stream = doRPCRawWithStreamResponse(packetOut, true);
556                if((stream.first != null) && (stream.first.intValue() == 0))
557                    {
558                    // An empty response means a null AEP response.
559                    stream.second.close();
560                    _noteResult(true); /* Note RPC success. */
561                    return(null);
562                    }
563    
564                // Else read from the stream.
565                //  1) A UTF string for the compression level/type (abort if unsupported).
566                //  2) A compressed object stream (in the appropriate format) which may be:
567                //  2a) AllExhibitProperties, in which case return as-is.
568                //  2b) AllExhibitPropertiesDelta, in which case apply to the input AEP and return.
569                try
570                    {
571                    final DataInputStream dis = new DataInputStream(stream.second);
572                    final CompressionLevel cl = CompressionLevel.valueOf(dis.readUTF());
573                    if(cl.getLevel() > MAX_AEP_DIFF_COMP_LEVEL.getLevel())
574                        { throw new IOException("server returned data compressed at too high a level"); }
575                    final InputStream is = GenUtils.wrapForDecompression(stream.second, cl);
576                    final ObjectInputStream ois = new ObjectInputStream(is);
577                    // Read in the object from the stream on the fly.
578                    // TODO: we may want to add a data pump to keep data flowing during processing.
579                    final Object o = ois.readObject();
580    
581                    // If the result is a full (and hopefully super-compressed) AEP
582                    // then unpack it and return it here.
583                    if(o instanceof AllExhibitProperties)
584                        {
585                        final AllExhibitProperties result = (AllExhibitProperties) o;
586                        _noteResult(true); /* Note RPC success. */
587    /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource: successful getAEP(AEP) full: "+cl+".]"); }
588    
589                        // Warn if the remote end of the tunnel seems to have sent us a redundant value,
590                        // ie sent us something we already have with the hash code that we specified.
591                        if((result != null) && (result.longHash == oldHash))
592                            {
593                            final String message = "WARNING: tunnel fetched AEP (supercompressed) with existing hash: " + oldHash;
594                            System.err.println(message);
595                            logger.log(message);
596                            }
597    
598                        return(result);
599                        }
600    
601                    // If the result is a diff then apply it to the input AEP.
602                    if(o instanceof AllExhibitPropertiesDelta)
603                        {
604                        final AllExhibitPropertiesDelta delta = (AllExhibitPropertiesDelta) o;
605                        final AllExhibitProperties result = AllExhibitPropertiesDelta.applyDiff(oldAEP, delta);
606    /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource: successful getAEP(AEP) diff: #before/after="+delta.lengthAEIDBefore+"/"+delta.lengthAEIDAfter+", aep before/after="+oldAEP+"/"+result+", "+cl+".]"); }
607    
608    //                    /* TEMPORARY SAFETY MEASURE: REJECT RESULT WITH FEWER EPL/EPC ENTRIES... */
609    //                    if(result.getExhibitPropsLoadableMap().size() < oldAEP.getExhibitPropsLoadableMap().size())
610    //                        { throw new IOException("WARNING: rejecting AEP diff result with fewer EPLs..."); }
611    //                    if(result.getExhibitPropsComputableMap().size() < oldAEP.getExhibitPropsComputableMap().size())
612    //                        { throw new IOException("WARNING: rejecting AEP diff result with fewer EPCs..."); }
613    
614                        _noteResult(true); /* Note RPC success. */
615    
616                        // Warn if the remote end of the tunnel seems to have sent us a redundant value,
617                        // ie sent us something we already have with the hash code that we specified.
618                        if((result != null) && (result.longHash == oldHash))
619                            {
620                            final String message = "WARNING: tunnel fetched AEP (via delta) with existing hash: " + oldHash;
621                            System.err.println(message);
622                            logger.log(message);
623                            }
624    
625                        return(result);
626                        }
627    
628                    throw new IOException("unexpected return type");
629                    }
630                finally { stream.second.close(); /* Free (network) resources. */ }
631                }
632            catch(final InterruptedIOException e)
633                { throw e; } // Rethrow as-is: don't attempt recovery yet...
634            catch(final Exception e)
635                {
636                // Always log the problem/exception.
637                e.printStackTrace();
638    
639                if(!allowAutoRecovery)
640                    {
641                    if(e instanceof IOException) { throw (IOException) e; }
642                    final IOException err = new IOException("aborting after failed AEP diff RPC");
643                    err.initCause(e);
644                    throw err;
645                    }
646    
647                logger.log("ERROR: ExhibitDataTunnelSource: automatically recovering from AEP diff error: "+e.getMessage()+".");
648    
649                // In case of error, automatically fall back to generic hash-arg call.
650                // (Iff allowing auto-recovery...)
651                return(getAllExhibitProperties(oldHash));
652                }
653            }
654    
655        /**Helper method to serialise a single free-standing long value.
656         * The long value is serialised just as DataOutputStream would,
657         * as 8 bytes with the high-byte first.
658         */
659        public static byte[] longSer(final long value)
660            {
661            final byte[] result = new byte[8];
662            result[0] = (byte) (value >> 56);
663            result[1] = (byte) (value >> 48);
664            result[2] = (byte) (value >> 40);
665            result[3] = (byte) (value >> 32);
666            result[4] = (byte) (value >> 24);
667            result[5] = (byte) (value >> 16);
668            result[6] = (byte) (value >>  8);
669            result[7] = (byte) (value      );
670            return(result);
671            }
672    
673        /**Helper method to serialise a single free-standing int value.
674         * The int value is serialised just as DataOutputStream would,
675         * as 4 bytes with the high-byte first.
676         */
677        public static byte[] intSer(final int value)
678            {
679            final byte[] result = new byte[4];
680            result[0] = (byte) (value >> 24);
681            result[1] = (byte) (value >> 16);
682            result[2] = (byte) (value >>  8);
683            result[3] = (byte) (value      );
684            return(result);
685            }
686    
687        /**Gets the general properties as a GenProps object if its timestamp is not that specified.
688         * If the time specified is negative the object will be returned unconditionally.
689         * <p>
690         * If no props are currently installed/available a default set with a zero
691         * timestamp is returned.
692         * <p>
693         * If the caller's copy appears to be up-to-date (eg the oldStamp
694         * matches that that we would have been returned) null is returned.
695         * <p>
696         * On the wire this is (outbound) a 8-byte timestamp and
697         * by return should be an empty payload (corresponding to null)
698         * or a serialised GenProps object (uncompressed for now).
699         * <p>
700         * Because GenProps are so important to the functioning of the entire Gallery,
701         * we always let the request go over the tunnel even if recent calls have failed.
702         */
703        public org.hd.d.pg2k.svrCore.props.GenProps getGenProps(final long oldStamp)
704            throws IOException
705            {
706    //System.err.println("[Fetching GenProps over HTTP tunnel...]");
707    
708            final RawPacket response = doRPCUnguarded(
709                new RawPacket(RawPacket.OpCode.GetGenProps, longSer(oldStamp)));
710    
711            final GenProps result = (GenProps) response.getSerializedObjectPayload();
712    //System.err.println(" [Fetched GenProps over HTTP tunnel.]");
713            return(result);
714            }
715    
716        /**Gets the security properties as a Properties object if its timestamp is not that specified.
717         * If the time specified is negative the object will be returned unconditionally.
718         * <p>
719         * If no props are currently installed/available a default set with a zero
720         * timestamp is returned.
721         * <p>
722         * If the caller's copy appears to be up-to-date (eg the oldStamp
723         * matches that that would have been returned) null is returned.
724         */
725        public java.util.Properties getGenSecProps(final long oldStamp)
726            throws IOException
727            {
728    //System.err.println("[Fetching GenSecProps over HTTP tunnel...]");
729    
730            final RawPacket response = doRPC(
731                new RawPacket(RawPacket.OpCode.GetGenSecProps, longSer(oldStamp)));
732    
733            final Properties result = (Properties) response.getSerializedObjectPayload();
734    //System.err.println(" [Fetched GenSecProps over HTTP tunnel.]");
735            return(result);
736            }
737    
738        /**Gets the thumbnails for an exhibit.
739         * A data source is at liberty to refuse to compute thumbnails
740         * in which case it may return null, else it returns a
741         * non-null value which may include the `could-not-compute'
742         * value to indicate that a thumbnail/sample cannot be made
743         * for this exhibit and no attempt need be made in future.
744         * <p>
745         * Because it is important for a new mirror to gather thumbnails ASAP
746         * to show to a user, we make this call immune to being blocked
747         * when the tunnel connection appears poor.
748         *
749         * @param create  if true, and no thumbnail yet exists, try to
750         *     create one if possible, else only return an existing one
751         */
752        public ExhibitThumbnails getThumbnails(final ExhibitFull name, final boolean create)
753            throws IOException
754            {
755    //System.err.println("[Fetching Thumbnails over HTTP tunnel...]");
756    
757    //        final ByteArrayOutputStream baos =
758    //            new ByteArrayOutputStream(name.length());
759    //        final DataOutputStream dos = new DataOutputStream(baos);
760    //        dos.writeUTF(name.toString());
761    //        dos.writeBoolean(create);
762    //        final byte data[] = baos.toByteArray();
763    //        dos.close(); // Attempt to free some resources quickly.
764    
765            final int nameLength = name.length();
766            final byte[] data = new byte[3 + nameLength];
767            data[0] = (byte) (nameLength >>> 8);
768            data[1] = (byte) nameLength;
769            name.writeToByteArray(data, 2);
770            data[2 + nameLength] = (byte) (create ? 1 : 0);
771    
772            final RawPacket response = doRPCUnguarded(
773                new RawPacket(RawPacket.OpCode.GetThumbnails, data));
774    
775            final ExhibitThumbnails result = (ExhibitThumbnails) response.getSerializedObjectPayload();
776    //System.err.println(" [Fetched Thumbnails over HTTP tunnel.]");
777            return(result);
778            }
779    
780        /**Set variable.
781         * Only non-local values are propagated upstream;
782         * others are explicitly rejected with an UnsupportedOperationException.
783         *
784         * @throws java.io.IOException in case of I/O problems
785         * @throws java.lang.UnsupportedOperationException  when attempting to set locals
786         */
787        public void setVariable(final SimpleVariableValue newValue)
788            throws IOException,
789            UnsupportedOperationException
790            {
791            if(newValue == null) { throw new IllegalArgumentException(); }
792            if(newValue.getDef().isLocal())
793                { throw new UnsupportedOperationException("cannot send local variables over the tunnel"); }
794            // Implement in terms of setVariables().
795            setVariables(new SimpleVariableValue[]{newValue});
796            }
797    
798        /**Adjust globalMap for outgoing (upstream) set operation.
799         * This expects the value passed to be:
800         * <ul>
801         * <li>Non-null
802         * <li>Non-local
803         * <li>Read/write
804         * <li>To have an empty globalMap or one with exactly one entry
805         *     whose key is the system ID for this system;
806         *     if the map is empty then this system is put in the global map
807         * </ul>
808         * <p>
809         * If the argument is in the correct form, it is returned unchanged.
810         * <p>
811         * If the argument is illegal, ie not suitable to send upstream,
812         * then an exception is thrown to veto the send.
813         *
814         * @throws java.lang.IllegalArgumentException  if an invalid argument is passed,
815         *     eg unable to be converted to be sent upstream because it has
816         *     an invalid globalMap
817         */
818        private SimpleVariableValue adjustGlobalMapForSet(final SimpleVariableValue svv)
819            {
820            if(svv == null)
821                { throw new IllegalArgumentException(); }
822    
823            final SimpleVariableDefinition def = svv.getDef();
824            if(def.isLocal() ||
825               def.isReadOnly() ||
826               !SystemVariables.defs.contains(def))
827                { throw new IllegalArgumentException(); }
828    
829            final Map<InstanceID,SimpleVariableValue> globalMap = svv.getGlobalMap();
830            if((globalMap != null) && (globalMap.size() > 1))
831                { throw new IllegalArgumentException(); }
832    
833            // If no globalMap,
834            // then make sure that we make an explicit map entry for this system.
835            final InstanceID sysID = (InstanceID) uniqueClientID.getValue();
836            if(globalMap == null)
837                {
838                // We're done.
839                return(svv.put(sysID, svv, true));
840                }
841    
842            if(!Collections.singleton(sysID).equals(globalMap.keySet()))
843                {
844                throw new IllegalArgumentException("globalMap contains ID that is not ours during set: " + sysID);
845                }
846    
847            // Value is fine as it is, so return it untouched...
848            return(svv);
849            }
850    
851        /**Set variables; must not be null or contain nulls.
852         * Only non-local variables are sent upstream;
853         * locals are silently discarded.
854         * <p>
855         * Variables not defined in SystemVariables are rejected.
856         * <p>
857         * Duplicates may be discarded or rejected,
858         * or sent as is in which case it is undefined in which order they
859         * are applied.
860         * <p>
861         * We try to set all values that we can for
862         * <p>
863         * Format on the wire:
864         * <ul>
865         * <li>Outbound: serialised SimpleVariableValue[]
866         * <li>Inbound: number of values set (int)
867         * </ul>
868         *
869         * @throws java.io.IOException in case of I/O problems
870         */
871        public int setVariables(final SimpleVariableValue[] newValues)
872            throws IOException,
873            UnsupportedOperationException
874            {
875            if(newValues == null) { throw new IllegalArgumentException(); }
876    
877            // Make a set of all non-local (and writable) values to pass upstream.
878            // Preserve order...
879            final int nvLen = newValues.length;
880            final List<SimpleVariableValue> toGo = new ArrayList<SimpleVariableValue>(nvLen);
881            for(int i = 0; i < nvLen; ++i)
882                {
883                final SimpleVariableValue svv = newValues[i];
884    
885                // Reject null values...
886                if(svv == null)
887                    { throw new IllegalArgumentException(); }
888    
889                final SimpleVariableDefinition def = svv.getDef();
890    
891                // Skip local values;
892                // they should not be propagated out of the machine.
893                // Silently ignore this so that any globals can be sent.
894                if(def.isLocal())
895                    { continue; }
896    
897                // Adjust globalMap if necessary for sending upstream.
898                // Veto invalid values with an IllegalArgumentException.
899                toGo.add(adjustGlobalMapForSet(svv));
900                }
901    
902            // If nothing to send, then return immediately.
903            if(toGo.size() == 0)
904                { return(0); }
905    
906            // TODO: eliminate duplicates
907            // TODO: sort into name order to help compression (but preserving order for the same variable)
908    
909            // Send set of values upstream
910            // by putting them into an array and serialising it.
911            final SimpleVariableValue r[] = new SimpleVariableValue[toGo.size()];
912            toGo.toArray(r);
913    
914            // Make the RPC.
915            final RawPacket response = doRPC(new RawPacket(
916                RawPacket.OpCode.SetVariables,
917                r,
918                true)); // Compression may be useful...
919    
920            // Extract return value...
921            if(response.plLength != 4)
922                { throw new IOException("corrupt reply packet: plLength=" + response.plLength); }
923            final DataInputStream dis = new DataInputStream(response.getPayloadAsInputStream());
924            try
925                {
926                final int nSet = dis.readInt();
927                // Validate result returned.
928                if((nSet < 0) || (nSet > nvLen))
929                    { throw new IOException("corrupt reply packet: nSet=" + Integer.toHexString(nSet)); }
930                return(nSet);
931                }
932            finally { dis.close(); }
933            }
934    
935        /**Get variable, or returns null if no such non-local variable.
936         * We attempt only to fetch non-local variables; return null for others
937         * (except for the system ID).
938         * <p>
939         * When asked for the the system ID,
940         * we always answer the question locally.
941         * <p>
942         * Note that the local system ID can be retrieved whether or not
943         * the master server is responding.
944         * <p>
945         * Format on the wire is:
946         * <ul>
947         * <li>Outbound: variable name in UTF format
948         * <li>Inbound: serialised form of SimpleVariableValue (or empty for null)
949         * </ul>
950         *
951         * @throws java.io.IOException  in case of I/O error
952         * @throws UnsupportedOperationException  if a local variable
953         *     other than the system ID is requested
954         */
955        public SimpleVariableValue getVariable(final SimpleVariableDefinition var)
956            throws IOException,
957                   UnsupportedOperationException
958            {
959            if(var == null) { throw new IllegalArgumentException(); }
960    
961            // If the request is for the system ID,
962            // return it here.
963            if(var.equals(SystemVariables.LOCAL_SYS_ID))
964                { return(uniqueClientID); }
965    
966            // Always return null to requests for local data (other than system ID);
967            // local variables should not be propagated over the tunnel.
968            if(var.isLocal())
969                { throw new UnsupportedOperationException("local variables not available at tunnel source"); }
970    
971            // Fetch individual variable value over tunnel...
972            // We only send the name,
973            // and get back a whole variable value,
974            // but on return check for a compatible definition,
975            // returning null if the definitions don't match.
976            final String name = var.getName();
977            final ByteArrayOutputStream baos =
978                new ByteArrayOutputStream(name.length());
979            final DataOutputStream dos = new DataOutputStream(baos);
980            dos.writeUTF(name);
981            final byte data[] = baos.toByteArray();
982            dos.close(); // Attempt to free some resources quickly.
983    
984            final RawPacket request = new RawPacket(
985                RawPacket.OpCode.GetVariable,
986                data);
987    
988            final RawPacket response = doRPC(request);
989            final SimpleVariableValue svv =
990                (SimpleVariableValue) response.getSerializedObjectPayload();
991    
992            // If there was no variable,
993            // then return null immediately.
994            if(svv == null)
995                { return(null); }
996    
997            // Discard incompatible types,
998            // and local variables.
999            if(!var.equals(svv.getDef()))
1000                { return(null); }
1001    
1002            return(svv);
1003            }
1004    
1005        /**Fetch variable values from the master.
1006         * We fetch values from the master
1007         * (from which we eliminate any illegal values such as local variables
1008         * or values which are not locally valid)
1009         * and insert our unique local system ID
1010         * if the request stamp is -1 or older than our creation.
1011         * <p>
1012         * Note that the local system ID can be retrieved whether or not
1013         * the master server is responding providing the stamp is -1,
1014         * though in that case it might be the only value returned.
1015         * The caller should periodically use -1 to ensure that they
1016         * have a full set of global variables, eg in case the server rebooted.
1017         * <p>
1018         * Format on the wire:
1019         * <ul>
1020         * <li>Outbound: serialised timestamp
1021         * <li>Inbound: SimpleVariableValue[]
1022         * </ul>
1023         *
1024         * @throws java.io.IOException
1025         */
1026        public SimpleVariableValue[] getVariables(final long changedSince)
1027            throws IOException
1028            {
1029            final List<SimpleVariableValue> incoming = new ArrayList<SimpleVariableValue>();
1030    
1031            // Fetch all (global) variables over the tunnel.
1032            try
1033                {
1034                final RawPacket response = doRPC(
1035                    new RawPacket(RawPacket.OpCode.GetVariables, longSer(changedSince)));
1036                // Copy incoming array to List...
1037                incoming.addAll(Arrays.asList(
1038                    (SimpleVariableValue[]) response.getSerializedObjectPayload()));
1039                }
1040            catch(final IOException e)
1041                {
1042                // If changedSince == -1 we guarantee to return the local ID,
1043                // else we rethrow the exception...
1044                if(changedSince != -1) { throw e; }
1045                }
1046    
1047            // Remove any extant system ID value,
1048            // and any local variable (should not have come up the wire),
1049            // and any value that does not have an identical local definition.
1050            // We do not expect to find/remove any of these in normal operation.
1051            final SimpleVariableDefinition uCIDDef = uniqueClientID.getDef();
1052            for(int i = incoming.size(); --i >= 0; )
1053                {
1054                final SimpleVariableValue svv =
1055                        incoming.get(i);
1056                final SimpleVariableDefinition def = svv.getDef();
1057                if(def.equals(uCIDDef) ||
1058                   def.isLocal() ||
1059                   !SystemVariables.defs.contains(def) ||
1060                   !def.equals(SystemVariables.nameToDef.get(def.getName())))
1061                    {
1062                    incoming.remove(i);
1063                    continue;
1064                    }
1065                }
1066    
1067            // Insert the system ID variable value into the result if appropriate.
1068            if(changedSince <= uniqueClientID.getTimestamp()) // Handles -1 case...
1069                {
1070                incoming.add(uniqueClientID);
1071                }
1072    
1073            final SimpleVariableValue result[] =
1074                new SimpleVariableValue[incoming.size()];
1075            incoming.toArray(result);
1076            return(result);
1077            }
1078    
1079        /**Get the current partial, or previous full, event set at the specified interval; never null.
1080         * This is a simplified interface to return either the current event set
1081         * that is being collected, or the previous completed set.
1082         * <p>
1083         * The current set is the most timely, but may not contain enough data
1084         * to be meaningful if the new interval has just started.
1085         * <p>
1086         * The previous set is complete and thus most likely to have enough samples
1087         * to be useful, but is not completely current.
1088         * <p>
1089         * Implemented in terms of the more general call
1090         * in the hope that batched calls for several values will be
1091         * more common and more efficient.
1092         * This also reduces the number of distinct RPC calls that
1093         * we have to implement.
1094         *
1095         * @param def  event definition (must be for an event); never null
1096         * @param intervalSelector  one of EVENT_INTERVAL_SELECTOR_xxx values
1097         * @param current  if true the current event set is returned,
1098         *     else the previous complete set is returned
1099         *
1100         * @return  requested event set; may be empty but never null if requested set not available
1101         */
1102        public EventVariableValue getEventValue(final SimpleVariableDefinition def,
1103                                                final EventPeriod intervalSelector,
1104                                                final boolean current)
1105            {
1106            if((def == null) || (intervalSelector == null))
1107                { throw new IllegalArgumentException(); }
1108    
1109            final long now = System.currentTimeMillis();
1110            final long currentIntervalNumber = intervalSelector.getIntervalNumber(now);
1111            final long intervalNumber = (current ? currentIntervalNumber : (currentIntervalNumber-1));
1112    
1113            final EventVariableValue evv[] = getEventValues(def,
1114                                                            intervalSelector,
1115                                                            intervalNumber,
1116                                                            null);
1117    
1118            // If we got some real (non-null) data back, then return it...
1119            if((evv.length > 0) && (evv[0] != null))
1120                { return(evv[0]); }
1121    
1122            // Else get empty non-authoritative return value (with no events)...
1123            return(BasicVarMgr.getEmptyNonAuthEVV(def, intervalSelector, intervalNumber));
1124            }
1125    
1126        /**Get the specified global event sets for the specified intervals; never null.
1127         * This allows retrieval of zero or more event sets for the specified
1128         * interval size.
1129         * <p>
1130         * Requests for more than SystemVariables.EVENT_SAMPLES_RETAINED in the
1131         * past (or for the future!) cannot be satisfied and data will not be
1132         * returned for them.
1133         * <p>
1134         * Usually not more than SystemVariables.EVENT_SAMPLES_RETAINED samples
1135         * will be returned in response to any one request as a safety measure.
1136         * <p>
1137         * We do various small optimisations to reduce fruitless traffic over
1138         * what may be a slow and/or expensive connection:
1139         * <ul>
     * <li>This will reject requests with huge sets that go back too far so as
1141         *     to avoid huge RPC arguments and since the request is unlikely to be
1142         *     satisfiable.  The upstream implementation may be even more restrictive.
1143         * <li>This ignores requests too far in the future or the past.
1144         * <li>Trim leading empty slots from request sent over the tunnel.
1145         * </ul>
1146         * <p>
1147         * The responses from the server may be large and slow,
1148         * so we allow streaming to give better incremental CPU/resource consumption
1149         * and better throughput.
1150         * <p>
1151         * Returns an empty result if asked for a 'local' value.
1152         * <p>
1153         * Returns an empty result if the tunnel is currently broken
1154         * or in case of other non-permanent error.
1155         * <p>
1156         * TODO: Optimise RPC call by allowing for sparse requests with different request/response format.
1157         *
1158         * @param def  event definition (must be for an event); never null
1159         * @param intervalSelector  one of EVENT_INTERVAL_SELECTOR_xxx values
1160         * @param intervalNumber  a time (as from System.currentTimeMillis())
1161         *     which identifies the first interval for which data is potentially
1162         *     required; if too far in the past or future then possibly no data
1163         *     will be available,
1164         *     zero is used to access the "all" bucket
1165         * @param whichValues  each true bit represents a slot for which data is
1166         *     required, bit 0 indicating data from the slot within which
1167         *     firstIntervalTime is located, bit 1 the previous slot, etc;
1168         *     null is treated as the common case equivalent to just bit 0 set
1169         *
1170         * @return as many of the requested values as available,
1171         *     at least long enough to return all the available values,
1172         *     with [0] corresponding to bit 0 in the BitSet;
1173         *     may contain nulls or be zero-length (eg in case of error) but is never null
1174         */
1175        public EventVariableValue[] getEventValues(final SimpleVariableDefinition def,
1176                                                   final EventPeriod intervalSelector,
1177                                                   final long intervalNumber,
1178                                                   BitSet whichValues)
1179            {
1180            if((def == null) ||
1181                (intervalSelector == null) ||
1182                (intervalNumber < 0))
1183                { throw new IllegalArgumentException(); }
1184    
1185            if(def.isLocal())
1186                { return(NO_EVENT_VALUES); }
1187    
1188            // Don't bother trying this if the connection is currently known to be broken.
1189            if(isBroken()) // { throw new PGMasterNotInServiceException(); }
1190                { return(NO_EVENT_VALUES); }
1191    
1192            // See if we can optimise the whichValues argument by replacing it
1193            // with a null value...
1194            // This will reduce heap churn, data on the wire, etc.
1195            if((whichValues != null) && (whichValues.length() == 1) && whichValues.get(0))
1196                { whichValues = null; }
1197    
1198            // Various optimisations are possible for a null BitSet arg.
1199            final boolean nullWV = (whichValues == null);
1200    
1201            // Take copy of BitSet to ensure that it is minimal size,
1202            // and to reduce thread/race/safety issues.
1203            final BitSet wvCopy = nullWV ? null : new BitSet(whichValues.length());
1204            if(!nullWV) { wvCopy.or(whichValues); }
1205    
1206            final int wvl = nullWV ? 1 : wvCopy.length();
1207    
1208            // Avoid pointless call for no request bits.
1209            if(wvl < 1)
1210                { return(NO_EVENT_VALUES); }
1211    
1212            // Avoid a huge request/response packet.
1213            if(wvl > 2*SystemVariables.EVENT_SAMPLES_RETAINED)
1214                { throw new IllegalArgumentException("request set too large to send over the wire"); }
1215    
1216            // Avoid a call for values too far in the past to be likely to be available.
1217            // (This also ensures that the intervalNumber is large and positive for later.)
1218            // Although we allow a zero value for the "all" bucket.
1219            final long currentInterval = intervalSelector.getIntervalNumber(System.currentTimeMillis());
1220            if(intervalNumber == 0)
1221                {
1222                if(!nullWV && (!whichValues.get(0) || (wvl != 1)))
1223                    { throw new IllegalArgumentException(); }
1224                }
1225            else if(intervalNumber < currentInterval - 2*SystemVariables.EVENT_SAMPLES_RETAINED - 1)
1226                { return(NO_EVENT_VALUES); }
1227    
1228            // Avoid a call for values far too far in the future
1229            // for any skew to account for, etc, so we're sure no data is available.
1230            if(intervalNumber > currentInterval + 4*SystemVariables.EVENT_SAMPLES_RETAINED + 1)
1231                { return(NO_EVENT_VALUES); }
1232    
1233            // Get the offset of the first item in the request...
1234            // If this is greater than zero
1235            // then we can in principle optimise the request/response
1236            // by omitting the leading empty slots.
1237            final int firstItemOffset = nullWV ? 0 : wvCopy.nextSetBit(0);
1238            assert(firstItemOffset >= 0);
1239    
1240            // Reduce the BitSet that we send if the request can be trimmed...
1241            final BitSet wvToSend = (firstItemOffset == 0) ? wvCopy :
1242                (wvCopy.get(firstItemOffset, wvl));
1243    
1244            // Request is:
1245            //   * A byte consisting of the ordinal of period/interval enum ordinal.
1246            //   * The UTF-8 representation of the name of the definition.
1247            //   * The interval number (8 bytes).
1248            //   * The serialised form of the request BitSet if non-null.
1249            // Result is:
1250            //   * The in-order array of EventVariableValues (never null).
1251            final String name = def.getName();
1252            final ByteArrayOutputStream baos = new ByteArrayOutputStream(32 + name.length() + wvl/2);
1253            final DataOutputStream dos = new DataOutputStream(baos);
1254            final byte data[];
1255            try
1256                {
1257                dos.write(intervalSelector.ordinal());
1258                dos.writeUTF(name);
1259                dos.writeLong(intervalNumber - firstItemOffset);
1260                if(wvToSend != null)
1261                    {
1262                    final ObjectOutputStream oos = new ObjectOutputStream(dos);
1263                    oos.writeObject(wvToSend);
1264                    oos.flush();
1265                    }
1266                dos.flush();
1267                data = baos.toByteArray();
1268                dos.close(); // Attempt to free some resources quickly.
1269                }
1270            catch(final Exception e) { throw new Error(e); /* Should never happen. */ }
1271    
1272            final RawPacket request = new RawPacket(
1273                RawPacket.OpCode.GetEventValues,
1274                data);
1275    
1276            // Stream the result, since it may be large enough to be on the wire for a while.
1277            final EventVariableValue[] evvsRaw;
1278            try
1279                {
1280                final Pair<Integer, InputStream> stream = doRPCRawWithStreamResponse(request, false);
1281                try
1282                    {
1283                    final ObjectInputStream ois = new ObjectInputStream(stream.second);
1284                    evvsRaw = (EventVariableValue[]) ois.readObject();
1285                    ois.close();
1286                    }
1287                finally { stream.second.close(); /* Ensure underlying resources are released. */ }
1288    
1289                // Check for unexpected return values.
1290                if(evvsRaw == null)
1291                    { throw new IOException("unexpected null return value"); }
1292    
1293                // If we offset the request then make a copy set back.
1294                final EventVariableValue[] evvs;
1295                if(firstItemOffset != 0)
1296                    {
1297                    evvs = new EventVariableValue[evvsRaw.length + firstItemOffset];
1298                    System.arraycopy(evvsRaw, 0, evvs, firstItemOffset, evvsRaw.length);
1299                    }
1300                else
1301                    {
1302                    // Did not offset the request...
1303                    evvs = evvsRaw;
1304                    }
1305    
1306                // Check for unexpected/incorrect return values.
1307                for(int i = evvs.length; --i >= 0; )
1308                    {
1309                    final EventVariableValue evv = evvs[i];
1310                    if(evv == null) { continue; }
1311                    if(!def.equals(evv.getDef()))
1312                        { throw new IOException("unexpected return value with wrong def"); }
1313                    if(!intervalSelector.equals(evv.getPeriod()))
1314                        { throw new IOException("unexpected return value with wrong period"); }
1315                    if(evv.getIntervalNumber() != intervalNumber - i)
1316                        { throw new IOException("unexpected return value with wrong interval number"); }
1317                    }
1318    
1319                _noteResult(true); // Seems to have succeeded.
1320                return(evvs);
1321                }
1322            // Deal with non-permanent errors as empty result value.
1323    //        catch(final IOException e) { _noteResult(false); throw e; }
1324            catch(final Exception e) { _noteResult(false); return(NO_EVENT_VALUES); }
1325            }
1326    
1327        /**Synchronise variables with upstream values.
1328         * Pushes updated values upstream to the source,
1329         * calls sync on the source if called with the "force" argument true,
1330         * and then retrieves changed values from upstream.
1331         * <p>
1332         * When called with force==true, this acts like a full "memory barrier",
1333         * flushing all write-cached items downstream immediately and afterwards
1334         * getting the value of all upstream values with getVariables(-1),
1335         * but may be expensive in terms of CPU or bandwidth, so use sparingly.
1336         * <p>
1337         * When called with force=false, this returns immediately
1338         * and the operation is not propagated across the tunnel.
1339         * <p>
1340         * In any case, it is rarely the right thing for a casual user
1341         * to call this as it may be very expensive.
1342         *
1343         * @param force  if true, this will force a full write flush,
1344         *     a full sync upstream,
1345         *     then full read with getVariables(-1),
1346         *     to get the effect of a full "barrier";
1347         *     otherwise, do nothing
1348         *
1349         * @throws IOException if one is received from upstream
1350         */
1351        public void syncVariables(final boolean force) throws IOException
1352            {
1353            // An unforced sync is a purely local no-op: nothing crosses the tunnel.
1354            if(force)
1355                {
1356                // Forced: send a SyncVariables request (with EMPTY_PAYLOAD) upstream
1357                // and return once doRPC() does, ignoring the response packet.
1358                final RawPacket syncRequest = new RawPacket(RawPacket.OpCode.SyncVariables, EMPTY_PAYLOAD);
1359                doRPC(syncRequest);
1360                }
1361            }
1362    
1363    
1364        /**Get requested Properties selected by key and versionID.
1365         * Fetches a Properties set unconditionally (versionID == -1)
1366         * else if the versionID presented is not current.
1367         *
1368         * @param key  selector (with possible embedded sub-key)
1369         *     for desired properties set; never null
1370         * @param versionID  if -1 then map is always returned if available,
1371         *     else must be non-negative and null is returned if the versionID
1372         *     presented matches that of the current version
1373         *     (ie if the caller has presumably got the up-to-date version);
1374         *     may be a timestamp or a hash or other value,
1375         *     and by convention is zero only for an empty properties set
1376         *
1377         * @return null, or Properties map guaranteed to contain only
1378         *     String keys and values
1379         */
1380        public java.util.Properties getProperties(final PropsKey key,
1381                                                  final long versionID)
1382            throws IOException
1383            {
1384            throw new IOException("NOT IMPLEMENTED");
1385            }
1386    
1387        /**Returns the (incremented) upstream stratum adjusted to include transit time; never null.
1388         * Returns Stratum.UNKNOWN if the upstream stratum is unknown
1389         * or the upstream instance is already at the maximum stratum.
1390         */
1391        public Stratum getStratum()
1392            throws IOException
1393            {
1394            // Time the unguarded round trip (wall-clock ms) so that transit delay can be allowed for.
1395            final long before = System.currentTimeMillis();
1396            final RawPacket response = doRPCUnguarded(new RawPacket(RawPacket.OpCode.GetStratum, EMPTY_PAYLOAD));
1397            final long after = System.currentTimeMillis();
1398    
1399            // Validate the raw result from the upstream host explicitly rather than with assert:
1400            // the payload is remote data and must be checked even when assertions are disabled.
1401            final Object payload = response.getSerializedObjectPayload();
1402            if(!(payload instanceof Stratum))
1403                { throw new IOException("unexpected GetStratum response payload"); }
1404            final Stratum upstream = (Stratum) payload;
1405    
1406            // An 'unknown' upstream stratum, or an upstream already at the maximum stratum,
1407            // means that this instance must be regarded as being of unknown stratum.
1408            if(upstream.isUnknownStratum() || (upstream.getStratum() >= Stratum.MAX_STRATUM))
1409                { return(Stratum.UNKNOWN); }
1410    
1411            // Sanitised round-trip time: clamped to be no less than 0 and no more than 2*MAX_ROOT_DELAY.
1412            final int rtt = (int) Math.min(2*(int)Stratum.MAX_ROOT_DELAY, Math.max(0, after - before));
1413    
1414            // Return a new value for this instance with an incremented stratum
1415            // and one half the RTT to our immediate upstream peer added to the root delay.
1416            return(new Stratum(1 + upstream.getStratum(),
1417                               (rtt>>1) + upstream.getRootDelay(),
1418                               _getStratumUpstreamName(),
1419                               upstream.isUpstreamConserving()));
1420            }
1421    
1422        /**Return short unique name of upstream peer/server suitable for Stratum; never null but can be "" for 'unknown'.
1423         * Override this in implementations that know the upstream name.
1424         */
1425        protected String _getStratumUpstreamName()
1426            {
1427            return ""; // Empty name: this base implementation reports its upstream peer/server as 'unknown'.
1428            }
1429    
1430        /**Poll periodically.
1431         * We can use this to attempt to repair ailing connections, etc,
1432         * as well as poll for asynchronous messages being sent to us by the master.
1433         * <p>
1434         * However, our general policy is not to force traffic if we need not.
1435         * <p>
1436         * This can be overridden by a derived class,
1437         * though it is suggested that this be called with super.poll(gp) if so.
1438         */
1439        public void poll(final GenProps gp)
1440            {
1441            // Connection repair is only attempted when keep-alive is enabled;
1442            // the general policy is not to force traffic when it is not needed.
1443            if(!KEEP_SERVER_CONNECTION_ALIVE) { return; }
1444    
1445            // Take a snapshot of the back-off deadline.
1446            // With no deadline set there is nothing to repair:
1447            // this never tries to force an initial connection,
1448            // only to repair a failing one.
1449            final Long retryDeadline = doNotTryMasterUntil;
1450            if(null == retryDeadline) { return; }
1451    
1452            // Ordinary callers are vetoed quickly until the deadline passes,
1453            // so probe a little ahead of it (by _REPAIR_RETRY_LEAD_MS):
1454            // that way a repaired connection is ready before they retry
1455            // and a normal caller need never be blocked by the repair.
1456            // Probes stay backed off too if the master is really unwell.
1457            final long probeFrom = retryDeadline.longValue() - _REPAIR_RETRY_LEAD_MS;
1458            if(probeFrom >= System.currentTimeMillis()) { return; }
1459    
1460            // Due (or overdue): probe the master with doNOOP().
1461            try { doNOOP(true); }
1462            catch(final Exception e)
1463                {
1464                // The probe failed; the failure itself is deliberately not propagated.
1465                if(!FORCE_GC_AND_FINALISERS_AFTER_POLL_FAIL) { return; }
1466    
1467                // It is possible that the connection problem is caused
1468                // by a resource leak (eg of unclosed descriptors) elsewhere,
1469                // so try to force some cleanup now.
1470                //
1471                // The normal pacing/backoff of retries is relied upon
1472                // to avoid hurting the system too much if this guess is wrong!
1473                logger.log("[ExhibitDataTunnelSource: WARNING: failing to re-establish connection so attempting to free resources with gc(), etc...]");
1474                System.runFinalization();
1475                System.gc();
1476                System.runFinalization();
1477                }
1478            }
1479    
1480        /**May attempt to free up outbound connections and/or prevent new ones. */
1481        public void destroy()
1482            {
1483            // Prevent most new outgoing connections by deferring the next permitted attempt indefinitely.
1484            doNotTryMasterUntil = Long.valueOf(Long.MAX_VALUE);
1485            }
1486    
1487    
1488        /**How soon ahead of other callers we try to silently repair a broken connection, ms; non-negative. */
1489        private final int _REPAIR_RETRY_LEAD_MS = 17001 + Rnd.fastRnd.nextInt(3007); // Fixed 17001ms base plus one Rnd.fastRnd.nextInt(3007) draw per instance; used by poll().
1490    
1491    
1492        /**Make an RPC call over the underlying medium with the given outgoing packet; never null.
1493         * This collects the response packet and will object
1494         * if it sees any IOException
1495         * or if the packets come back with the wrong op code.
1496         * <p>
1497         * This can be locked on the instance lock to serialise RPCs,
1498         * if the tunnel can only usefully handle one call at once.
1499         * <p>
1500         * Must be implemented by the deriving class
1501         * to suit its transmission medium.
1502         *
1503         * @param  packetOut  request packet; never null
1504         * @return response packet; never null
1505         * @throws java.io.IOException  in case of I/O difficulties
1506         */
1507        protected abstract RawPacket doRPCRaw(final RawPacket packetOut)
1508            throws IOException; // Called by doRPCUnguarded() and by the default doRPCRawWithStreamResponse().
1509    
1510        /**Optimised RPC call with the given outgoing packet and returning packet body as an InputStream; never null nor with a null stream.
1511         * This is an <em>optimisation</em> for performance-critical cases <em>only</em>,
1512         * and foregoes some error checking/handling for speed
1513         * (and thus the caller should ensure that it performs integrity checks).
1514         * There is an onus on the streaming-related code to behave safely
1515         * even if fed bogus/corrupt data,
1516         * and it must be able to safely/easily undo any work done
1517         * if the message is found to be bogus as late as the final byte(s).
1518         * <p>
1519         * This streams the content of the response packet
1520         * and will object if it sees any IOException
1521         * or if the packets come back with the wrong op code.
1522         * <p>
1523         * A terminating trailer byte may or may not be visible on the returned stream
1524         * thus allowing the implementation to be as efficient as possible.
1525         * <p>
1526         * This may return after all the input data has been collected,
1527         * or while some or all is still to come,
1528         * and thus the returned stream may fail and throw an exception.
1529         * <p>
1530         * The first element of the result is the length of the response data
1531         * (not including any non-data trailer bytes from the packet even if present)
1532         * but may be null if this length is not known when the packet header is seen,
1533         * eg because the packet body was compressed.
1534         * <p>
1535         * This may be implemented/overridden by the deriving class
1536         * to suit its transmission medium,
1537         * and as an optimisation to reduce copying and allow streaming,
1538         * ie starting to process the input before it is all received.
1539         * <p>
1540         * The data stream is always of uncompressed data,
1541         * regardless of whether the data was sent compressed on the wire,
1542         * ie this routine will correctly decompress data on the fly as/when needed.
1543         * <p>
1544         * The caller <strong>must close</strong> the stream promptly
1545         * to release resources such as file handles and non-Java memory.
1546         *
1547         * @param  packetOut  request packet; never null
1548         * @param allowBigReadTimeout  not consulted by this basic implementation; TODO document its meaning for overriding implementations
1549         * @return response  length and data stream; never null
1550         * @throws java.io.IOException  in case of I/O difficulties
1551         */
1552        protected Tuple.Pair<Integer, InputStream> doRPCRawWithStreamResponse(final RawPacket packetOut, final boolean allowBigReadTimeout)
1553            throws IOException
1554            {
1555            // Simple implementation in terms of basic doRPCRaw(): the complete response packet is obtained first; allowBigReadTimeout is not consulted.
1556            final RawPacket result = doRPCRaw(packetOut);
1557    
1558            // Check for mismatched response packets
1559            // or remote exception codes with special response op-codes.
1560            if(result.opCode != packetOut.opCode)
1561                {
1562                // Deal specially with some fixed remote exception types.
1563                switch(result.opCode)
1564                    {
1565                    case PGMNISEX:
1566                        { throw new PGMasterNotInServiceException(); }
1567                    case RUNTEX:
1568                        { throw new RuntimeException("remote threw RuntimeException"); }
1569                    case REMEX:
1570                        { throw new IOException("remote threw RemoteException"); }
1571                    case INTEX:
1572                        { throw new InterruptedIOException("remote operation interrupted"); }
1573                    }
1574                // Any other mismatch: report both op codes in the same form, as doRPCUnguarded() does.
1575                throw new IOException("packet response-type mismatch; got "+result.opCode+" expected "+packetOut.opCode);
1576                }
1577    
1578            // Pair the payload length with the payload as a stream; the caller must close the stream promptly.
1579            return(new Tuple.Pair<Integer, InputStream>(result.plLength, result.getPayloadAsInputStream()));
1580            }
1581    
1582    
1583        /**Make an RPC call over HTTP[S] with the given outgoing packet.
1584         * This provides error recovery, back-off, stats, etc,
1585         * and serves as a wrapper for the medium-specific doRPCRaw().
1586         * <p>
1587         * Called by all the public data-pipeline methods to transport information
1588         * over the tunnel.
1589         *
1590         * @param  packetOut  request packet; never null
1591         * @return  response packet; never null
1592         * @throws java.io.IOException  in case of I/O difficulties
1593         */
1594        protected RawPacket doRPC(final RawPacket packetOut)
1595            throws IOException
1596            {
1597            // Note RPC request...
1598            StatsLogger.captureDataPoint(statsIDTS, TSNAME_RPCREQUEST);
1599            // Note its type...
1600            StatsLogger.captureDataPoint(statsIDTS, TSNAMEPR_RPCTYPE + ((int) packetOut.opCode.getCode()));
1601    
1602            // If last communication with master failed too recently,
1603            // quickly veto a new connection attempt for now.
1604            if(isBroken())
1605                {
1606    if(_protocolDebug) { System.err.println("[doRPC(): vetoing connection to broken master.]"); }
1607                throw new PGMasterNotInServiceException("no connection to master");
1608                }
1609    
1610            return(doRPCUnguarded(packetOut));
1611            }
1612    
1613        /**Just like doRPC() but does not back off in face of previous failures; never null.
1614         * Can be used for calls that must attempt to contact the master anyway,
1615         * or are probing for it to be alive.
1616         * <p>
1617         * Most callers should use the normal doRPC().
1618         *
1619         * @param packetOut  never null
1620         * @return  response to RPC call; never null
1621         */
1622        protected RawPacket doRPCUnguarded(final RawPacket packetOut)
1623            throws IOException
1624            {
1625            if(_protocolDebug) { System.err.println("[doRPC(): connecting to master.]"); }
1626    
1627            try {
1628                if(_protocolDebug) { System.err.println(" [doRPC(): about to do raw RPC.]"); }
1629    
1630                // Hand the request to the medium-specific transport.
1631                final RawPacket response = doRPCRaw(packetOut);
1632    
1633                if(_protocolDebug) { System.err.println(" [doRPC(): about to check result type.]"); }
1634    
1635                // A response carrying the same op code as the request is a normal reply.
1636                if(response.opCode == packetOut.opCode)
1637                    {
1638                    // Note last successful use of the link.
1639                    _noteResult(true);
1640    
1641                    if(_protocolDebug) { System.err.println(" [doRPC(): about to return result.]"); }
1642    
1643                    return(response);
1644                    }
1645    
1646                // Any other op code is either one of the fixed codes
1647                // standing for an exception thrown at the remote end,
1648                // or else a plain mismatch of response type.
1649                //
1650                // Note that a RUNTEX is counted as neither a success nor a failure
1651                // for the purposes of link monitoring,
1652                // since it may be the master gently rebuffing a request for some reason.
1653                switch(response.opCode)
1654                    {
1655                    case PGMNISEX: throw new PGMasterNotInServiceException();
1656                    case RUNTEX: throw new RuntimeException("remote threw RuntimeException");
1657                    case REMEX: throw new IOException("remote threw RemoteException");
1658                    case INTEX: throw new InterruptedIOException("remote operation interrupted");
1659                    default: break; // Not a remote-exception code: report as a mismatch below.
1660                    }
1661                throw new IOException("packet response-type mismatch; got "+response.opCode+" expected "+packetOut.opCode);
1662                }
1663            catch(final InterruptedIOException interrupted)
1664                {
1665                // Neither an error nor a success: the caller can retry later,
1666                // so pass it on without touching the link state.
1667                throw interrupted;
1668                }
1669            catch(final IOException failure)
1670                {
1671                // (Only) an IOException is regarded as indicating some sort of connection problem.
1672    
1673                // Record the failure of the RPC with an IOException in the stats...
1674                StatsLogger.captureDataPoint(statsIDTS, TSNAME_RPCIOEX);
1675    
1676                // ...and note the failed connection to the master,
1677                // which will defer our next attempt.
1678                _noteResult(false);
1679    
1680                // Report the failure in the logs,
1681                // along with the time of last success and of the next permitted attempt where known.
1682                logger.log("ExhibitDataTunnelSource: RPC FAILED connection attempt upstream, failed packet: " + packetOut + ", reason: " + failure.getMessage());
1683                final long lastGoodTime = lastSuccessfulConnectionTime;
1684                if(lastGoodTime > 0)
1685                    { logger.log("[doRPC(): last success at "+(new Date(lastGoodTime))+".]"); }
1686                final Long retryDeadline = doNotTryMasterUntil;
1687                if(null != retryDeadline)
1688                    { logger.log("[doRPC(): deferring next attempt until " + (new Date(retryDeadline.longValue()))  + ".]"); }
1689    
1690                // Rethrow the error that we encountered...
1691                throw failure;
1692                }
1693            }
1694    
1695        /**Minimum time (ms) for which we will hold latest newly-created AEP response; strictly positive.
1696         * A client that automatically retries after an interrupted IO response
1697         * to some sort of AEP request indicating 'already in progress'
1698         * should retry within this time to avoid starting its creation all over again.
1699         * <p>
1700         * Note that in dire straits this lower limit may be ignored.
1701         * <p>
1702         * Should typically be of the order of many minutes.
1703         */
1704        public static final int MIN_AEP_RETENTION_MS = 17 * 60 * 1000; // 17 minutes, expressed in ms.
1705    
1706        /**Cache/lock to improve performance of inbound RPC.
1707         * Meant to be opaque to all but handleInboundRPC()
1708         * and is used to provide a better overall responsiveness,
1709         * especially from expensive calls
1710         * by vetoing concurrent expensive calls
1711         * and by cacheing the responses from especially expensive calls.
1712         * <p>
1713         * Not all possibilities are exploited,
1714         * just those that in practice seem to be important.
1715         * <p>
1716         * Can be registered to free its own content automagically
1717         * if the system is very short of memory.
1718         */
1719        public static final class HIRPCCache implements MemoryTools.RecurrentEmergencyFreeHandle
1720            {
1721            /**Lock to prevent servicing more than one very expensive call at once.
1722             * We do this to reduce strain on the memory/GC and CPU of the server.
1723             */
1724            private final ReentrantLock slowResponseLock = new ReentrantLock();
1725    
1726            /**Time of creation / last use of basic AEP response; initially zero.
1727             * This covers the GZIPped / extra-compressed / delta responses for the latest AEP,
1728             * but no previous AEP values which are not essential to minimal client progress.
1729             */
1730            private volatile long currentAEPResponseCreatedOrLastUsed;
1731    
1732            /**Cache of current AEP as pair of longHash and response packet for simple AEP fetch RPC; mutable.
1733             * Initially null.
1734             * <p>
1735             * Elements of this pair are never null.
1736             * <p>
1737             * Marked volatile for safe multi-threaded access.
1738             * <p>
1739             * Currently held for at least a minimum period,
1740             * until the AEP fetched has a new hash,
1741             * not for example held via a SoftReference
1742             * since a SoftReference would probably be cleared too soon
1743             * for this to be effective.
1744             * <p>
1745             * This is likely to be many MB in size.
1746             */
1747            private volatile Tuple.Pair<Long, RawPacket> _AEP_response;
1748    
1749            /**Cache of full extra-compressed AEP as tuple of longHash, compression format, and response packet for diff AEP fetch RPC; mutable.
1750             * Initially null.
1751             * <p>
1752             * Elements of this tuple are never null.
1753             * <p>
1754             * Marked volatile for safe multi-threaded access.
1755             * <p>
1756             * Currently held for at least a minimum period,
1757             * until the AEP fetched has a new hash,
1758             * not for example held via a SoftReference
1759             * since a SoftReference would probably be cleared too soon
1760             * for this to be effective.
1761             * <p>
1762             * This is likely to be many MB in size.
1763             */
1764            private volatile Tuple.Pair<Long, Tuple.Pair<CompressionLevel, RawPacket>> _AEP_extracomp_response;
1765    
1766            /**Cache of extra-compressed AEP diff as tuple of delta, compression format, and response packet for diff AEP fetch RPC; mutable.
1767             * Initially null.
1768             * <p>
1769             * Elements of this tuple are never null.
1770             * <p>
1771             * Marked volatile for safe multi-threaded access.
1772             * <p>
1773             * Currently held for at least a minimum period,
1774             * until the AEP fetched has a new hash,
1775             * not for example held via a SoftReference
1776             * since a SoftReference would probably be cleared too soon
1777             * for this to be effective.
1778             * <p>
1779             * This is likely relatively small.
1780             */
1781            private volatile Tuple.Triple<AllExhibitPropertiesDelta, CompressionLevel, RawPacket> _AEP_diff_response;
1782    
1783            /**Cache of previous AEP value to assist with AEP diffs; initially null.
1784             * Currently held indefinitely (or until very short of free memory),
1785             * until the AEP fetched has a new hash,
1786             * not for example held via a SoftReference
1787             * since a SoftReference would probably be cleared too soon
1788             * for this to be effective.
1789             * <p>
1790             * This should generally share most of its state with the current AEP
1791             * and therefore should not represent a significant extra memory burden.
1792             */
1793            private volatile AllExhibitProperties aepPrev;
1794    
1795            /**Soft cache of older AEP values to assist with AEP diffs; never null.
1796             * This is present to help with requests for diffs against AEPs
1797             * older than the previous value, if any such requests are made.
1798             * <p>
1799             * Thread-safe cache of SoftReferences to older AEPs,
1800             * mapped from the Long hash values.
1801             * <p>
1802             * Map is not auto-clear()ed on memory stress
1803             * other than the individual SoftReferences themselves during GC.
1804             * <p>
1805             * It is possible to synchronise on this instance to exclude other activity.
1806             */
1807            private final MemoryTools.SoftReferenceMap<Long, AllExhibitProperties> prevAEPs =
1808                MemoryTools.SoftReferenceMap.<Long, AllExhibitProperties>create("prevAEPs");
1809    
1810            /**Can be called to purge most or all internal state when very short of memory.
1811             * Implements the emergency-free handler.
1812             * <p>
1813             * May not purge some elements which are not (or likely not) using significant memory
1814             * but usually save a lot of time for us and for clients.
1815             * To that end we try and estimate the space they are taking
1816             * vs actual current free heap space.
1817             */
1818            public void run()
1819                {
1820                // Clear strongly-referenced previous AEP value if very short of free memory.
1821                // Clearing this will prevent us from creating and sending space-efficient AEP deltas.
1822                if((null != aepPrev) && (MemoryTools.percentFreeWithinTarget() <= 5))
1823                    {
1824                    aepPrev = null;
1825                    System.err.println("HIRPCCache: emergency free of previous AEP value");
1826                    }
1827    
1828                // Clear the list of previous AEPs if very short of free memory.
1829                // Note that the softly-referenced content will be released automatically in dire straits.
1830                // We only forcefully clear this if no longer retaining the previous AEP strongly any more.
1831                if(!prevAEPs.isEmpty() && (null == aepPrev))
1832                   {
1833                   prevAEPs.clear();
1834                   System.err.println("HIRPCCache: emergency free of old AEP entries: "+prevAEPs.size());
1835                   }
1836    
1837                // If any of the basic AEP responses was only very recently created or used
1838                // then prevent them from being freed to avoid client starvation.
1839                final long lastUsedBAR = currentAEPResponseCreatedOrLastUsed;
1840                if(lastUsedBAR + MIN_AEP_RETENTION_MS > System.currentTimeMillis())
1841                    { return; }
1842    
1843                final long freeMem = Runtime.getRuntime().freeMemory();
1844    
1845                // This GZIP-compressed response is a bit bulky and shouldn't usually be needed
1846                // unless we are too short of memory to super-compress a response
1847                // and/or a client doesn't have enough memory to receive a super-compressed response.
1848                // We hold onto it grimly for a minimum amount of time
1849                // as being forced to continually regenerate it asynchronously causes starvation
1850                // since this is required for the minimum level of response to support clients.
1851                Tuple.Pair<Long, RawPacket> r = _AEP_response;
1852                if((null != r) && (null != r.second) &&
1853                        (r.second.getFrameLength() >= (freeMem>>1)))
1854                    {
1855                    _AEP_response = null;
1856                    System.err.println("HIRPCCache: emergency free of AEP response "+r.second+" @"+TextUtils.sizeAsText(freeMem, true)+" free");
1857                    }
1858                r = null; // Help GC.
1859    
1860                // This extra-compressed response is smaller and saves network bandwidth and is thus more valuable
1861                // and so we'd like to hang onto it if possible.
1862                Tuple.Pair<Long, Tuple.Pair<CompressionLevel, RawPacket>> ecr = _AEP_extracomp_response;
1863                if((null != ecr) && (null != ecr.second) && (null != ecr.second.second) &&
1864                        (ecr.second.second.getFrameLength() >= (freeMem>>1)))
1865                    {
1866                    _AEP_extracomp_response = null;
1867                    System.err.println("HIRPCCache: emergency free of super-compressed AEP response "+ecr.second.second+" @"+TextUtils.sizeAsText(freeMem, true)+" free");
1868                    }
1869                ecr = null; // Help GC.
1870    
1871                // This extra-compressed diff response is the smallest and most valuable and most likely to be needed
1872                // and so we'd like to hang onto it if at all possible.
1873                // Though holding onto the delta is potentially more problematic.
1874                Tuple.Triple<AllExhibitPropertiesDelta, CompressionLevel, RawPacket> dr = _AEP_diff_response;
1875                if((null != dr) && (null != dr.third) &&
1876                        (dr.third.getFrameLength() >= (freeMem>>1)))
1877                    {
1878                    _AEP_diff_response = null;
1879                    System.err.println("HIRPCCache: emergency free of super-compressed AEP diff response "+dr.third+" @"+TextUtils.sizeAsText(freeMem, true)+" free");
1880                    }
1881                dr = null; // Help GC.
1882                }
1883            }
1884    
1885        /**Handles input request packet from slave across a tunnel.
1886         * Generates the response packet or throws an exception.
1887         *
1888         * @param reqPacket  the request packet; never null
1889         * @param clientAddr  the tunnel client's address as seen by us;
1890         *     may be null if not applicable or available
1891         * @param cache  if not null, then selected routines with
1892         *     slow/expensive responses will not be serviced concurrently
1893         *     but concurrent calls will be vetoed quickly instead
1894         *     and expensive-to-compute values may be (partially) cached
1895         * @param logger  logging area for warnings, etc; never null.
1896         *
1897         * @return the response packet; never null
1898         *
1899         * @throws java.io.IOException  in case of difficulty upstream
1900         * @throws PGMasterNotInServiceException  if the upstream data source
1901         *     is down/unavailable
1902         */
1903        public static RawPacket handleInboundRPC(final SimpleExhibitPipelineIF source,
1904                                                 final RawPacket reqPacket,
1905                                                 final String clientAddr,
1906                                                 final HIRPCCache cache,
1907                                                 final SimpleLoggerIF logger)
1908            throws IOException,
1909                   PGMasterNotInServiceException
1910            {
1911            if((source == null) || (reqPacket == null))
1912                { throw new IllegalArgumentException(); }
1913    
1914            switch(reqPacket.opCode)
1915                {
1916                // For NO-OP we expect an empty data section
1917                // (we simply ignore it even if not empty)
1918                // and reply with an empty data section.
1919                case NOOP:
1920                    {
1921                    // Create and send the (empty) response packet.
1922                    // We don't attempt to compress this.
1923                    return(new RawPacket(
1924                        RawPacket.OpCode.NOOP,
1925                        EMPTY_PAYLOAD,
1926                        false)); // Don't attempt compression...
1927                    }
1928    
1929                case GetGenProps:
1930                    { return(handleGetGenProps(source, reqPacket)); }
1931    
1932                case GetGenSecProps:
1933                    { return(handleGetGenSecProps(source, reqPacket)); }
1934    
1935                case GetAllExhibitImmutableData:
1936                    { return(handleGetAllExhibitImmutableData(source, reqPacket, cache, logger)); }
1937    
1938                case GetAllExhibitProperties:
1939                    { return(handleGetAllExhibitProperties(source, reqPacket, clientAddr, cache, logger)); }
1940    
1941                case GetAllExhibitPropertiesDiff:
1942                    { return(handleGetAllExhibitPropertiesDiff(source, reqPacket, clientAddr, cache, logger)); }
1943    
1944                case GetThumbnails:
1945                    { return(handleGetThumbnails(source, reqPacket)); }
1946    
1947                case GetStaticAttr:
1948                    { return(handleGetStaticAttr(source, reqPacket)); }
1949    
1950                case GetRawFile:
1951                    { return(handleGetRawFile(source, reqPacket)); }
1952    
1953                case GetVariable:
1954                    { return(handleGetVariable(source, reqPacket)); }
1955    
1956                case SetVariables:
1957                    { return(handleSetVariables(source, reqPacket, clientAddr, logger)); }
1958    
1959                case GetVariables:
1960                    { return(handleGetVariables(source, reqPacket)); }
1961    
1962                case SyncVariables:
1963                    { return(handleSyncVariables(source, reqPacket)); }
1964    
1965                case GetEventValues:
1966                    { return(handleGetEventValues(source, reqPacket)); }
1967    
1968                case GetStratum:
1969                    { return(handleGetStratum(source, reqPacket)); }
1970    
1971                // For unrecognised requests,
1972                // pretend to throw a RuntimeException.
1973                default:
1974                    {
1975                    final String message = "unrecognised opcode " +reqPacket.opCode+ " in request from tunnel user " + clientAddr;
1976                    logger.log(message);
1977                    System.err.println(message);
1978                    return(new RawPacket(
1979                            RawPacket.OpCode.RUNTEX,
1980                            EMPTY_PAYLOAD,
1981                            false));
1982                    }
1983                }
1984            }
1985    
1986    
1987        /**Handle an incoming GetGenSecProps request; never null. */
1988        private static RawPacket handleGetGenSecProps(
1989                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
1990            throws IOException
1991            {
1992            final long stamp = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
1993                    readLong();
1994            final Properties gsp = source.getGenSecProps(stamp);
1995            // Create and send the response packet
1996            // (empty for null response,
1997            // else serialised form, compressed when helpful).
1998            return(new RawPacket(
1999                RawPacket.OpCode.GetGenSecProps,
2000                gsp,
2001                true)); // Attempt compression...
2002            }
2003    
2004        /**Handle an incoming SyncVariables request; never null. */
2005        private static RawPacket handleSyncVariables(
2006                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2007            throws IOException
2008            {
2009            // No args, no return value.
2010            // The tunnel call is only made when "forced".
2011            source.syncVariables(true);
2012            return(new RawPacket(
2013                RawPacket.OpCode.SyncVariables,
2014                EMPTY_PAYLOAD,
2015                false)); // Compression is not possible.
2016            }
2017    
2018        /**Handle an incoming GetAllExhibitPropertiesDiff request; never null.
2019         * Holds the 'slow response' lock while running
2020         * to exclude other such intensive/slow responses concurrently.
2021         */
2022        private static RawPacket handleGetAllExhibitPropertiesDiff(
2023                final SimpleExhibitPipelineIF source,
2024                final RawPacket reqPacket,
2025                final String clientAddr,
2026                final HIRPCCache cache,
2027                final SimpleLoggerIF logger)
2028            throws IOException, InterruptedIOException
2029            {
2030            // Prime a new empty cache with an extant non-empty AEP
2031            // so that we are likely to be able to service
2032            // the first responses after an AEP change with deltas.
2033            if(cache != null)
2034                {
2035                if((cache.aepPrev == null) || (cache.aepPrev.aeid.length == 0))
2036                    {
2037                    final AllExhibitProperties first = source.getAllExhibitProperties(-1);
2038                    if((null != first) && (first.aeid.length != 0))
2039                        {
2040                        cache.aepPrev = first;
2041                        cache.prevAEPs.put(first.longHash, first);
2042                        }
2043                    }
2044                }
2045    
2046            final DataInputStream dis = new DataInputStream(reqPacket.getPayloadAsInputStream());
2047            final long oldHash = dis.readLong();
2048            // We read the compression level, but need not parse it yet.
2049            final String compression = dis.readUTF();
2050            AllExhibitProperties _aep =
2051                source.getAllExhibitProperties(oldHash);
2052    logger.log("INFO: ExhibitDataTunnelSource.handleInboundRPC(): GetAllExhibitPropertiesDiff: client="+clientAddr+" hash="+oldHash+" compression="+compression+" response null/hash="+((null==_aep)?"null":String.valueOf(_aep.longHash)));
2053    
2054            // Correct for expensive aberrant behaviour further up the stack.
2055            // Whine loudly...
2056            if((null != _aep) && (oldHash == _aep.longHash))
2057                {
2058                final String message = "ERROR: ExhibitDataTunnelSource.handleInboundRPC(): non-null return with identical hash "+oldHash+ " from source "+source;
2059                logger.log(message);
2060                System.err.println(message);
2061                _aep = null; // Pretend that no AEP was in fact returned...
2062                }
2063    
2064            // Simple case; nothing to return since the caller is up to date.
2065            if(_aep == null)
2066                {
2067                return(new RawPacket(
2068                                RawPacket.OpCode.GetAllExhibitPropertiesDiff,
2069                                ExhibitDataTunnelSource.EMPTY_PAYLOAD,
2070                                false)); // Compression needed nor possible.
2071                }
2072            final AllExhibitProperties aepFromUpstream = _aep;
2073    
2074            // Full implementation is too expensive sans cache.
2075            if(cache == null)
2076                { throw new IOException("operation not fully implemented due to resource constraints"); }
2077    
2078            // Decode the client's maximum supported compression level.
2079            // This will throw an IllegalArgumentException if unparseable.
2080            final CompressionLevel cl = CompressionLevel.valueOf(compression);
2081    
2082            // If we already have a suitable response cached
2083            // ie for the very same AEP diff with the same old/new hashes.
2084            // then return it immediately.
2085            // We assume therefore that the entire packet will
2086            // be the same for the same response data
2087            // regardless of client, etc.
2088            //
2089            // (This does mean that the caller may miss some
2090            // internally cached updates within the AEP instance, etc,
2091            // but nothing that it cannot recompute if needed.)
2092            //
2093            // Try first for a cached diff/delta.
2094            final Tuple.Triple<AllExhibitPropertiesDelta, CompressionLevel, RawPacket> cachedDiff = cache._AEP_diff_response;
2095            if((oldHash != -1) &&
2096                (cachedDiff != null) &&
2097                (aepFromUpstream.longHash == cachedDiff.first.longHashAEPAfter) &&
2098                (oldHash == cachedDiff.first.longHashAEPBefore))
2099                {
2100                // If result is compressed at level higher than caller can handle
2101                // then we abort.
2102                // (We could chose to try for the full AEP rather than abort now,
2103                // but it's unlikely to work if this didn't.)
2104                if(cachedDiff.second.getLevel() > cl.getLevel())
2105                    { throw new IOException("compression level too high for client"); }
2106    
2107                // Good, can use cached diff packet...
2108                logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): returning cached AEP (diff) delta response packet: "+cachedDiff.third+" to client "+clientAddr+".]");
2109                return(cachedDiff.third);
2110                }
2111    
2112            // If the response is other than null
2113            // and we reject concurrent expensive calls,
2114            // then veto it if another such call is already in progress.
2115            // We allow a very limited wait for the lock.
2116            try
2117                {
2118                if(!cache.slowResponseLock.tryLock(CoreConsts.MAX_INTERACTIVE_DELAY_MS, TimeUnit.MILLISECONDS))
2119                    {
2120    logger.log("INFO: ExhibitDataTunnelSource.handleInboundRPC(): GetAllExhibitPropertiesDiff: already in progress: aborting...");
2121                    throw new InterruptedIOException("expensive call already in progress, please retry");
2122                    }
2123                }
2124            catch(final InterruptedException e)
2125                {
2126                final InterruptedIOException err = new InterruptedIOException(e.getMessage());
2127                err.initCause(e);
2128                Thread.currentThread().interrupt(); // Don't lose "interrupt" status.
2129                throw err;
2130                }
2131    
2132            // Create and send the response packet
2133            // (empty for null response,
2134            // else serialised form, full or delta/diff, compressed when helpful).
2135            try
2136                {
2137                // Note whatever was the previous AEP that we had sequestered,
2138                // partly to stop it being GCed until we finish handling this RPC.
2139                final AllExhibitProperties oldAepPrev;
2140                // If the AEP has changed then cache it for deltas, etc.
2141                if((null == (oldAepPrev = cache.aepPrev)) || (aepFromUpstream.longHash != oldAepPrev.longHash))
2142                    {
2143                    cache.aepPrev = aepFromUpstream;
2144                    cache.prevAEPs.put(aepFromUpstream.longHash, aepFromUpstream);
2145                    }
2146    
2147                // See if we have the correct cached extra-compressed full AEP.
2148                RawPacket cachedFullAEPResponse = null; // Will null out as soon as known not useful to aid GC.
2149                final Tuple.Pair<Long, Tuple.Pair<CompressionLevel, RawPacket>> cachedFull; // Minimise scope to aid GC.
2150                if(((cachedFull  = cache._AEP_extracomp_response) != null) && (aepFromUpstream.longHash == cachedFull.first.longValue()))
2151                    { cachedFullAEPResponse = cachedFull.second.second; }
2152    
2153                if(oldHash != -1)
2154                    {
2155                    // Caller has supplied their current hash...
2156                    // Look for caller's AEP cached here for us to diff against.
2157                    // (Zap any dead cache entries that we find in passing.)
2158                    AllExhibitProperties callerAEP = null;
2159                    final Long hashKey = Long.valueOf(oldHash);
2160                    final AllExhibitProperties caep = cache.prevAEPs.get(hashKey);
2161                    if(caep != null)
2162                        {
2163                        assert(caep.longHash == oldHash);
2164                        // Good, we have the same AEP that the caller does!
2165                        callerAEP = caep;
2166                        }
2167    
2168    /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): old AEP cache size:"+cache.prevAEPs.size()+", caep="+callerAEP+".]"); }
2169                    // Do not hold lock on cache.prevAEPs
2170                    // while trying to create and return diff/delta.
2171                    if(callerAEP != null)
2172                        {
2173                        // Computing a diff (if possible).
2174                        // This generally should not take an enormous amount of working memory.
2175                        try
2176                            {
2177    /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): creating AEP diff, caep="+callerAEP+".]"); }
2178                            final long diffStartTime = System.currentTimeMillis();
2179                            // We can do a diff/delta!
2180                            final AllExhibitPropertiesDelta diff =
2181                                AllExhibitPropertiesDelta.createDiff(callerAEP, aepFromUpstream, false);
2182                            final long diffEndTime = System.currentTimeMillis();
2183    
2184                            // Prepare the output packet...
2185                            final CompressionLevel levelUsed = MAX_AEP_DIFF_COMP_LEVEL;
2186                            ByteArrayOutputStream boas = new ByteArrayOutputStream(1 << 10);
2187                            final DataOutputStream dos = new DataOutputStream(boas);
2188                            dos.writeUTF(levelUsed.name());
2189                            final ObjectOutputStream oos = new ObjectOutputStream(GenUtils.wrapForCompression(dos, levelUsed));
2190                            oos.writeObject(diff);
2191                            oos.close();
2192                            final long compEndTime = System.currentTimeMillis();
2193                            RawPacket response = new RawPacket(
2194                                            RawPacket.OpCode.GetAllExhibitPropertiesDiff,
2195                                            boas.toByteArray(),
2196                                            false); // Already compressed.
2197                            boas = null; // Help GC.
2198    /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computed AEP diff ("+diff+") response packet "+response+", caep="+callerAEP+".]"); }
2199    
2200                            // Abort if we have a full (non-delta) response
2201                            // and the delta seems larger than the full response.
2202                            if((null != cachedFullAEPResponse) &&
2203                               (cachedFullAEPResponse.plLength <= response.plLength))
2204                                { throw new IOException("AEP delta larger than full response"); }
2205    
2206                            // Check that correct client AEP is held.
2207                            assert(callerAEP.longHash == oldHash);
2208                            // Check that we can construct the correct new AEP.
2209                            assert(aepFromUpstream.equals(AllExhibitPropertiesDelta.applyDiff(callerAEP, diff)));
2210    
2211                            // We intern() the (probably large) response
2212                            // to try to avoid holding any duplicates
2213                            // and to reduce old-generation heap churn.
2214                            response = MemoryTools.intern(response);
2215                            cache.currentAEPResponseCreatedOrLastUsed = System.currentTimeMillis(); // Keep cached response below alive...
2216                            cache._AEP_diff_response = new Tuple.Triple<AllExhibitPropertiesDelta, CompressionLevel,RawPacket>(diff, levelUsed, response);
2217    /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computed and cached AEP (diff) delta response packet: "+response+"; times: delta/comp="+(diffEndTime-diffStartTime)+"ms/"+(compEndTime-diffEndTime)+"ms; full: "+cachedFullAEPResponse+"; last full extra compressed response: "+cachedFull+".]"); }
2218    
2219                            // If the result is compressed at a level higher than the caller can handle
2220                            // then we must abort.
2221                            // The client will have to retry with the non-diff AEP call.
2222                            if(levelUsed.getLevel() > cl.getLevel())
2223                                { throw new IOException("compression level too high for client"); }
2224    
2225                            // Return it...
2226                            return(response);
2227                            }
2228                        catch(final DiffException e)
2229                            {
2230    /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): could not compute AEP (diff) delta: "+e.getMessage()+".]"); }
2231                            /* Fall through to send full AEP. */
2232                            }
2233                        catch(final Exception e)
2234                            {
2235                            logger.log("ERROR: unexpected exception creating AEP diff: " + e.getMessage());
2236                            /* Fall through to send full AEP. */
2237                            }
2238                        }
2239                    }
2240    
2241                // CANNOT COMPUTE A DIFF...
2242                // So try to return/compute/cache a full response.
2243                if(cachedFullAEPResponse != null)
2244                    {
2245                    // If result is compressed at level higher than caller can handle
2246                    // then we must abort.
2247                    if(cachedFull.second.first.getLevel() > cl.getLevel())
2248                        { throw new IOException("compression level too high for client"); }
2249    
2250                    // Good, can use cached packet...
2251                    logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): returning cached AEP (diff) response packet: "+cachedFull.second+" to client "+clientAddr+".]");
2252                    return(cachedFull.second.second);
2253                    }
2254    
2255                // Compute a new full AEP response packet...
2256                // Super-compressed full AEP; null if not (yet) available/computed.
2257    logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computing super-compressed AEP response packet: (longHash="+aepFromUpstream.longHash+") for client "+clientAddr+".]");
2258    
2259                // Discard known-defunct old value possibly allowing GC.
2260                cache._AEP_extracomp_response = null;
2261                cachedFullAEPResponse = null; // Help GC.
2262    
2263                final CompressionLevel levelUsed = MAX_AEP_DIFF_COMP_LEVEL;
2264                // If result would be compressed at level higher than caller can handle
2265                // then we must abort.
2266                if(levelUsed.getLevel() > cl.getLevel())
2267                    { throw new IOException("compression level too high for client"); }
2268    
2269                // Computing a full super-compressed AEP may be very memory intensive.
2270                // We do not assume that unlimited resources are available.
2271                // We estimate the working memory required as a fixed overhead
2272                // plus about twice the estimated uncompressed size of the serialised data
2273                // which allows for compression dictionaries plus the actual output size.
2274                final AtomicReference<RawPacket> responseAR = new AtomicReference<RawPacket>();
2275                if(!MemoryTools.runMemoryIntensiveOperation(new Runnable(){
2276                    public void run()
2277                        {
2278                        // Prepare the output packet...
2279                        final ByteArrayOutputStream boas = new ByteArrayOutputStream(1 << 10);
2280                        final DataOutputStream dos = new DataOutputStream(boas);
2281                        try
2282                            {
2283                            dos.writeUTF(levelUsed.name());
2284                            final ObjectOutputStream oos = new ObjectOutputStream(GenUtils.wrapForCompression(dos, levelUsed));
2285                            oos.writeObject(aepFromUpstream);
2286                            oos.close();
2287                            }
2288                        catch(final IOException e) { throw new Error("unexpected IOException when building super-compressed AEP", e); }
2289                        // We intern() the (probably very large) response
2290                        // to try to avoid holding duplicates
2291                        // and to reduce old-generation heap churn.
2292                        responseAR.set(MemoryTools.intern(new RawPacket(
2293                                        RawPacket.OpCode.GetAllExhibitPropertiesDiff,
2294                                        boas.toByteArray(),
2295                                        false))); // Already compressed.
2296    logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computed super-compressed AEP response packet: (length="+boas.size()+", compression="+levelUsed+") for client "+clientAddr+".]");
2297                        }
2298                    },
2299                        false, // Definitely not unlimited resources on the server!
2300                        (1<<20) /* 1MB overhead */ + 2*aepFromUpstream.estimateSerialBytes()))
2301                    { throw new IOException("not currently enough memory to attempt to build super-compressed AEP"); }
2302    
2303                final RawPacket response = responseAR.get();
2304                if(null == response) { throw new IOException("failed to build super-compressed AEP"); }
2305                final Tuple.Pair<CompressionLevel, RawPacket> newFull = new Tuple.Pair<CompressionLevel,RawPacket>(levelUsed, response);
2306                cache.currentAEPResponseCreatedOrLastUsed = System.currentTimeMillis(); // Keep cached response below alive...
2307                cache._AEP_extracomp_response = new Tuple.Pair<Long, Tuple.Pair<CompressionLevel,RawPacket>>(new Long(aepFromUpstream.longHash), newFull);
2308    
2309    /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computed and cached supecompressed AEP (length="+levelUsed+") response packet: "+newFull+".]"); }
2310    
2311    //                    // If result is compressed at level higher than caller can handle
2312    //                    // then we must abort.
2313    //                    if(tooCompressedForClient)
2314    //                        { throw new IOException("compression level too high for client"); }
2315    
2316                // Return it...
2317                return(response);
2318                }
2319    
2320            finally { cache.slowResponseLock.unlock(); }
2321            }
2322    
2323        /**Handle an incoming GetAllExhibitProperties request; never null.
2324         * Holds the 'slow response' lock while running
2325         * to exclude other such intensive/slow responses concurrently.
2326         */
2327        private static RawPacket handleGetAllExhibitProperties(
2328                final SimpleExhibitPipelineIF source,
2329                final RawPacket reqPacket,
2330                final String clientAddr,
2331                final HIRPCCache cache,
2332                final SimpleLoggerIF logger)
2333            throws IOException
2334            {
2335            final long hash = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2336                    readLong();
2337            final AllExhibitProperties aep = source.getAllExhibitProperties(hash);
2338    
2339            // Simple case; nothing to return since caller is up to date,
2340            // or we allow concurrent expensive calls.
2341            if(aep == null)
2342                {
2343                // Trivial null case.
2344                return(RawPacket.streamSerialiseObject(
2345                    RawPacket.OpCode.GetAllExhibitProperties,
2346                    null));
2347                }
2348            if(cache == null)
2349                {
2350                try
2351                    {
2352                    // Where the AEP is not null, force compressed format,
2353                    // stream-serialised to minimise peak/intermediate memory footprint.
2354                    return(MemoryTools.runMemoryIntensiveOperation((new Callable<RawPacket>(){
2355                        public RawPacket call() throws IOException
2356                            {
2357                            return(RawPacket.streamSerialiseObject(
2358                                    RawPacket.OpCode.GetAllExhibitProperties,
2359                                    aep));
2360                            }
2361                        }),
2362                        aep.estimateSerialBytes())); // Take a guess as to memory space required.
2363                    }
2364                catch(final IOException e) { throw e; }
2365                catch(final RuntimeErrorException e) { throw e; }
2366                catch(final Exception e) { throw new Error("unexpected exception", e); }
2367                }
2368    
2369    
2370            // If we already have a suitable response cached
2371            // ie for the very same AEP logHash
2372            // then return it immediately.
2373            // We assume therefore that the entire packet will
2374            // be the same for the same response data
2375            // regardless of client, etc.
2376            //
2377            // (This does mean that the caller may miss some
2378            // internally cached updates within the AEP instance, etc,
2379            // but nothing that it cannot recompute if needed.)
2380            final Tuple.Pair<Long, RawPacket> cached; // Don't retain value preventing GC...
2381            if(((cached = cache._AEP_response) != null) && (aep.longHash == cached.first.longValue()))
2382                {
2383                // Good, can use cached packet...
2384                assert(cached.second != null);
2385                cache.currentAEPResponseCreatedOrLastUsed = System.currentTimeMillis(); // Keep cached response below alive...
2386                logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): returning cached AEP response packet: "+cached.second+" to client "+clientAddr+".]");
2387                return(cached.second);
2388                }
2389    
2390            // If the response is other than null
2391            // and we reject concurrent expensive calls,
2392            // then veto it if another such call is already in progress.
2393            if(!cache.slowResponseLock.tryLock())
2394                {
2395    logger.log("INFO: ExhibitDataTunnelSource.handleInboundRPC(): GetAllExhibitProperties: already in progress: aborting...");
2396                throw new IOException("expensive call already in progress, please retry");
2397                }
2398            // Create and send the response packet
2399            // (empty for null response,
2400            // else serialised form, compressed when helpful).
2401            try
2402                {
2403                // Clear defunct value while computing the new one.
2404                cache._AEP_response = null;
2405    
2406    if(aep != null) { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): creating/cacheing/returning packet-compressed AEP response packet (longHash="+aep.longHash+") to client "+clientAddr+".]"); }
2407                // Compute the response packet...
2408                // Where the AEP is not null, force compressed format,
2409                // stream-serialised to minimise peak/intermediate memory footprint.
2410                RawPacket response;
2411                try
2412                    {
2413                    // Where the AEP is not null, force compressed format,
2414                    // stream-serialised to minimise peak/intermediate memory footprint.
2415                    response = MemoryTools.runMemoryIntensiveOperation((new Callable<RawPacket>(){
2416                        public RawPacket call() throws IOException
2417                            {
2418                            return(RawPacket.streamSerialiseObject(
2419                                    RawPacket.OpCode.GetAllExhibitProperties,
2420                                    aep));
2421                            }
2422                        }),
2423                        aep.estimateSerialBytes()); // Take a guess as to memory space required.
2424                    }
2425                catch(final IOException e) { throw e; }
2426                catch(final RuntimeErrorException e) { throw e; }
2427                catch(final Exception e) { throw new Error("unexpected exception", e); }
2428    
2429    
2430                // Cache it if the AEP is not null.
2431                if(aep != null)
2432                    {
2433                    // We intern() the (probably very large) response
2434                    // to try to avoid holding duplicates
2435                    // and to reduce old-generation heap churn.
2436                    response = MemoryTools.intern(response);
2437                    cache.currentAEPResponseCreatedOrLastUsed = System.currentTimeMillis(); // Keep cached response below alive...
2438                    cache._AEP_response = new Tuple.Pair<Long,RawPacket>(new Long(aep.longHash), response);
2439    logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): created/cached/returning packet-compressed AEP response packet (longHash="+aep.longHash+", plLength="+response.plLength+") to client "+clientAddr+".]");
2440                    }
2441                // Return it...
2442                return(response);
2443                }
2444            finally { cache.slowResponseLock.unlock(); }
2445            }
2446    
2447        /**Handle an incoming GetGenProps request; never null. */
2448        private static RawPacket handleGetGenProps(
2449                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2450            throws IOException
2451            {
2452            final long stamp = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2453                    readLong();
2454            final GenProps gp = source.getGenProps(stamp);
2455            // Create and send the response packet
2456            // (empty for null response,
2457            // else serialised form, compressed when helpful).
2458            return(new RawPacket(
2459                RawPacket.OpCode.GetGenProps,
2460                gp,
2461                true)); // Attempt compression...
2462            }
2463    
2464        /**Handle an incoming GetStaticAttr request; never null. */
2465        private static RawPacket handleGetStaticAttr(
2466                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2467            throws IOException
2468            {
2469            final String name = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2470                    readUTF();
2471            final ExhibitStaticAttr esa;
2472            try { esa = source.getAllExhibitImmutableData(-1).getStaticAttr(name); }
2473            catch(final IllegalArgumentException e)
2474                { throw new IOException("invalid argument: bad exhibit name"); }
2475            if(esa == null)
2476                { throw new IOException("invalid argument: non-existent exhibit"); }
2477            // Create and send the response packet
2478            // (empty for null response,
2479            // else serialised form, compressed when helpful).
2480            return(new RawPacket(
2481                RawPacket.OpCode.GetStaticAttr,
2482                esa,
2483                true)); // Allow compression.
2484            }
2485    
2486        /**Handle an incoming GetEventValues request; never null.
2487         * Request is:
2488         * <ul>
2489         * <li>A byte consisting of the ordinal of period/interval enum val.
2490         * <li>The UTF-8 representation of the name of the definition.
2491         * <li>The interval number (8 bytes).
2492         * <li>The serialised form of the request BitSet.
2493         * </ul>
2494         * <p>
2495         * Result is:
2496         * <ul>
2497         * <li>The in-order array of EventVariableValues (never null).
2498         * </ul>
2499         */
2500        private static RawPacket handleGetEventValues(
2501                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2502            throws IOException
2503            {
2504            final DataInputStream dis = new DataInputStream(reqPacket.getPayloadAsInputStream());
2505            final int ordinal = dis.readUnsignedByte();
2506            final EventPeriod epv[] = EventPeriod.values();
2507            if(ordinal >= epv.length)
2508                { throw new IOException("corrupt arguments: bad period ordinal"); }
2509            final EventPeriod intervalSelector = epv[ordinal];
2510            final String name = dis.readUTF();
2511            final SimpleVariableDefinition def = SystemVariables.nameToDef.get(name);
2512            // Deal kindly with possibly-new definition
2513            // (ie if there are newer clients than us)
2514            // by just returning an empty result.
2515            if(def == null)
2516                {
2517                return(new RawPacket(
2518                    RawPacket.OpCode.GetEventValues,
2519                    NO_EVENT_VALUES,
2520                    true)); // Compression may be effective.
2521                }
2522            if(!def.isEvent() || def.isLocal())
2523                { throw new IOException("corrupt arguments: bad definition type: "+def); }
2524            final long intervalNumber = dis.readLong();
2525            if(intervalNumber < 0)
2526                { throw new IOException("corrupt arguments: bad intervalNumber"); }
2527    
2528            // If there is trailing data then it is the serialised BitSet
2529            // else this is a null BitSet equivalent to just bit-0 true.
2530            final BitSet whichValues;
2531            if(dis.available() != 0)
2532                {
2533                final ObjectInputStream ois = new ObjectInputStream(dis);
2534                final Object o; // = null;
2535                try { o = ois.readObject(); }
2536                catch(final ClassNotFoundException e)
2537                    { throw new IOException("corrupt arguments: class not found: " + e.getMessage()); }
2538                if((o != null) && !(o instanceof BitSet)) // Allow null.
2539                    { throw new IOException("corrupt arguments: bad whichValues class on decode"); }
2540                whichValues = (BitSet) o;
2541                }
2542            else
2543                { whichValues = null; }
2544            assert(dis.available() == 0) : "There must be no excess/trailing data in getEventValues() RPC";
2545    
2546            // Call upstream to actually get any values that we have...
2547            final EventVariableValue result[] =
2548                source.getEventValues(def, intervalSelector, intervalNumber, whichValues);
2549    
2550            // Return the serialised result array.
2551            return(new RawPacket(
2552                RawPacket.OpCode.GetEventValues,
2553                result,
2554                true)); // Compression may be very effective.
2555            }
2556    
2557        /**Handle an incoming GetVariables request; never null. */
2558        private static RawPacket handleGetVariables(
2559                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2560            throws IOException
2561            {
2562            final long changedSince = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2563                    readLong();
2564            // Get values.
2565            final List<SimpleVariableValue> l = new ArrayList<SimpleVariableValue>(Arrays.asList(
2566                source.getVariables(changedSince)));
2567            // Return only non-local variables.
2568            for(final Iterator<SimpleVariableValue> it = l.iterator(); it.hasNext(); )
2569                {
2570                final SimpleVariableValue svv =
2571                        it.next();
2572                if(svv.getDef().isLocal())
2573                    { it.remove(); }
2574                }
2575            // Sort by variable name for better compression on the wire.
2576            Collections.sort(l, SimpleVariableValue.compByDef);
2577            // Convert to array.
2578            final SimpleVariableValue svvs[] =
2579                new SimpleVariableValue[l.size()];
2580            l.toArray(svvs);
2581            return(new RawPacket(
2582                RawPacket.OpCode.GetVariables,
2583                svvs,
2584                true)); // Compression may be very effective.
2585            }
2586    
2587        /**Handle an incoming SetVariables request; never null.
2588         * NOTE: for security/sanity reasons,
2589         *     reject any attempt to set where the value does not
2590         *     have a credible globalMap of exactly size == 1.
2591         * <p>
2592         * We'll validate the entire set of values,
2593         * and reject the whole lot if any are dubious.
2594         */
2595        private static RawPacket handleSetVariables(
2596                final SimpleExhibitPipelineIF source, final RawPacket reqPacket,
2597                final String clientAddr, final SimpleLoggerIF logger)
2598            throws IOException
2599            {
2600            // Decode array of variable values to apply.
2601            final SimpleVariableValue svvs[] = (SimpleVariableValue[])
2602                reqPacket.getSerializedObjectPayload();
2603    
2604            // Get our local ID so that we can veto changes
2605            // claiming to be from us for sanity/security.
2606            // If we get a null (not definition currently)
2607            // this will not cause an exception to be thrown,
2608            // we just won't be able to screen.
2609            final SimpleVariableValue ourID =
2610                source.getVariable(SystemVariables.LOCAL_SYS_ID);
2611            final InstanceID iid = (ourID == null) ? null:
2612                ((InstanceID) ourID.getValue());
2613    
2614            // Validate the requests.
2615            // Silently ignore:
2616            //   * null entries
2617            //   * locals
2618            //   * anything with a definition that we don't have
2619            //   * anything without a globalMap of size 1
2620            //   * TODO: anything claiming to be from our system ID
2621            //   * claims to be setting values from multiple client IDs
2622            //
2623            // We ignore invalid individual entries rather than
2624            // rejecting the entire request for robustness if talking to
2625            // a slightly different age client to eliminate any
2626            // variable value that has no local definition,
2627            // but still apply the rest of the values,
2628            // though we still reject obviously bogus values.
2629            //
2630            // Capture the client system ID in passing
2631            // (each variable value should have a globalMap with
2632            // exactly one entry, which is the client's ID).
2633            final InstanceID clientID = null;
2634            for(int i = svvs.length; --i >= 0; )
2635                {
2636                final SimpleVariableValue svv = svvs[i];
2637    
2638                // Reject obviously bogus values.
2639                if((svv == null) ||
2640                   svv.getDef().isLocal() ||
2641                   (svv.getGlobalMap() == null) ||
2642                        (svv.getGlobalMap().size() != 1))
2643                    { throw new IllegalArgumentException(); }
2644    
2645                try {
2646                    // Quietly ignore variables that we don't (currently)
2647                    // have definitions for locally.
2648                    if(!SystemVariables.defs.contains(svv.getDef()))
2649                        {
2650                        logger.log("Ignoring tunnelled setVariable("+svv.getDef()+") with no local definition");
2651                        continue;
2652                        }
2653    
2654                    // Get the system that *is* in the globalMap...
2655                    final InstanceID putativeClientID =
2656                        svv.getGlobalMap().keySet().iterator().next();
2657                    // Abort if it is our ID!
2658                    if(clientID == null)
2659                        {
2660                        // OK, this must be from the first variable,
2661                        // so trigger some one-off processing...
2662    
2663                        // Discard obviously-forged / looped values.
2664                        if((iid != null) &&
2665                            iid.equals(putativeClientID))
2666                            {
2667                            logger.log("Ignoring tunnelled setVariable("+svv.getDef()+") with bad/looped IID: "+iid);
2668                            continue;
2669                            }
2670    
2671                        // Note the client's remote address
2672                        // (ie make an entry in an upstream global Map)...
2673                        final SimpleVariableValue newValue = new SimpleVariableValue(
2674                                                    SystemVariables.TunnelServlet_SLAVE_ADDRS,
2675                                                    clientAddr);
2676                        source.setVariable(newValue.put(putativeClientID,
2677                                                        newValue,
2678                                                        true));
2679                        }
2680                    else
2681                        {
2682                        if(!clientID.equals(putativeClientID))
2683                            {
2684                            logger.log("Stopping before accepting tunnelled setVariable("+svv.getDef()+"): client claiming multiple InstanceIDs!");
2685                            break; // Give up now...
2686                            }
2687                        }
2688                    }
2689                catch(final IllegalArgumentException e)
2690                    {
2691                    logger.log("Ignoring tunnelled setVariable("+svv.getDef()+") that got IllegalArgumentException: " + e.getMessage());
2692                    continue;
2693                    }
2694                catch(final UnsupportedOperationException e)
2695                    {
2696                    logger.log("Ignoring tunnelled setVariable("+svv.getDef()+") that got UnsupportedOperationException: " + e.getMessage());
2697                    continue;
2698                    }
2699                }
2700    
2701            // Apply the values, in the order received...
2702            final int nSet = source.setVariables(svvs);
2703    
2704            // Create and send the response packet.
2705            return(new RawPacket(
2706                RawPacket.OpCode.SetVariables,
2707                intSer(nSet),
2708                false)); // Don't try compression; none to be had...
2709            }
2710    
2711        /**Handle an incoming GetVariable request; never null. */
2712        private static RawPacket handleGetVariable(
2713                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2714            throws IOException
2715            {
2716            final String name = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2717                    readUTF();
2718            // Look up variable definition from name.
2719            final SimpleVariableDefinition def =
2720                SystemVariables.nameToDef.get(name);
2721            // If no such variable,
2722            // or it is a local,
2723            // then immediately return an empty result.
2724            if((def == null) || def.isLocal())
2725                {
2726                return(new RawPacket(
2727                    RawPacket.OpCode.GetVariable,
2728                    EMPTY_PAYLOAD,
2729                    false)); // No compression...
2730                }
2731            // Get the variable value (if any)...
2732            // Note that this may include a big globalMap.
2733            final SimpleVariableValue svv =
2734                source.getVariable(def);
2735            // Create and send the response packet
2736            // (empty for null response,
2737            // else serialised form, compressed when helpful).
2738            return(new RawPacket(
2739                RawPacket.OpCode.GetVariable,
2740                svv,
2741                true)); // Allow compression.
2742            }
2743    
    /**If true then never pass through a request to cache content locally.
     * Content may be cached anyway,
     * but avoiding passing through a cache request here may help break request 'loops'
     * and/or conserve master cache space for example.
     * <p>
     * In handleGetRawFile() this is ORed with the client's own flag byte
     * to form the dontCache argument passed upstream,
     * so while this is true the client's flag has no effect there.
     */
    private static final boolean BLOCK_RAW_FILE_CACHEING = true;
2750    
2751        /**Handle an incoming GetRawFile request; never null.
2752         * We satisfy the initial portion of over-length requests.
2753         * <p>
2754         * FIXME: decide whether to honour a false dontCache flag or not
2755         */
2756        private static RawPacket handleGetRawFile(
2757                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2758            throws IOException
2759            {
2760            // Validate for reasonable-size request packet.
2761            if(reqPacket.plLength < 11 + ExhibitName.MIN_NAME_LENGTH)
2762                { throw new IOException("invalid argument: bad exhibit name"); }
2763    
2764            // Decode the request directly from the payload (without a copy operation if possible).
2765            final byte[] payload = reqPacket.getPayload();
2766    
2767            // First the exhibit name as-if a UTF String value (but we know it to be all 7-bit).
2768            final int nameLen = (((payload[0] & 0xff) << 8) + (payload[1] & 0xff));
2769            if((nameLen < ExhibitName.MIN_NAME_LENGTH) || (nameLen > ExhibitName.MAX_NAME_LENGTH))
2770                { throw new IOException("invalid argument: bad exhibit name length"); }
2771            final CharSequence name = new WrappedByteArrayCharSequence(payload, 2, nameLen);
2772            // Look up (validate) name
2773            // and efficiently convert to (or get) Name.ExhibitFull
2774            // and get size bound
2775            // all in one step...
2776            final ExhibitStaticAttr esa;
2777            try { esa = source.getAllExhibitImmutableData(-1).getStaticAttr(name); }
2778            catch(final IllegalArgumentException e)
2779                { throw new IOException("invalid argument: bad exhibit name"); }
2780            if(esa == null)
2781                { throw new IOException("invalid argument: non-existent exhibit"); }
2782            final int start =
2783                ((payload[2 + nameLen]       ) << 24) +
2784                ((payload[3 + nameLen] & 0xff) << 16) +
2785                ((payload[4 + nameLen] & 0xff) <<  8) +
2786                ((payload[5 + nameLen] & 0xff)      );
2787            final int afterEndRaw =
2788                ((payload[6 + nameLen]       ) << 24) +
2789                ((payload[7 + nameLen] & 0xff) << 16) +
2790                ((payload[8 + nameLen] & 0xff) <<  8) +
2791                ((payload[9 + nameLen] & 0xff)      );
2792            // Coerce afterEnd into range, and allow zero-length requests.
2793            final int afterEnd = (int) Math.min(afterEndRaw, esa.length);
2794            final boolean dontCache = BLOCK_RAW_FILE_CACHEING || (payload[10 + nameLen] != 0);
2795    
2796            // Disallow patently absurd values, eg due to request-packet corruption.
2797            if((start < 0) || (afterEnd < start))
2798                { throw new IOException("invalid argument: bad range"); }
2799            final int len = afterEnd - start;
2800            if(len > MAX_USER_READ_SIZE)
2801                { throw new IOException("invalid argument: request too large"); }
2802    
2803            // Fetch the data from upstream...
2804            final byte rawData[] = new byte[len];
2805            final ByteBuffer buf = ByteBuffer.wrap(rawData);
2806            // Fetch whatever we quickly can.
2807            source.getRawFile(buf, esa.getExhibitFullName(), start, dontCache);
2808            // Flip the buffer to nominally switch to reading from it.
2809            buf.flip();
2810    
2811            // Create and send the response packet containing raw exhibit data.
2812            //
2813            // Useful compression may be possible for some exhibit fragments/types,
2814            // but here we are slightly uncomfortable with
2815            // the potential vulnerability to invisible corruption of binary data
2816            // on the core (and non-re-compressible) exhibit type (JPEG)
2817            // with no application-level checksum on these frames,
2818            // so we simply avoid trying to compress *.jpg file data for now.
2819            return(new RawPacket(
2820                RawPacket.OpCode.GetRawFile,
2821                rawData, buf.limit(),
2822                !TextUtils.endsWith(esa.getCharSequence(), ".jpg")));
2823            }
2824    
2825        /**Handle an incoming GetThumbnails request; never null. */
2826        private static RawPacket handleGetThumbnails(
2827                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2828            throws IOException
2829            {
2830            // Validate for reasonable-size request packet.
2831            if(reqPacket.plLength < 3 + ExhibitName.MIN_NAME_LENGTH)
2832                { throw new IOException("invalid argument: bad exhibit name"); }
2833            // Decode the request directly from the payload (without a copy operation if possible).
2834            final byte[] payload = reqPacket.getPayload();
2835            // First the exhibit name as-if a UTF String value (but we know it to be all 7-bit).
2836            final int nameLen = (((payload[0] & 0xff) << 8) + (payload[1] & 0xff));
2837            if((nameLen < ExhibitName.MIN_NAME_LENGTH) || (nameLen > ExhibitName.MAX_NAME_LENGTH))
2838                { throw new IOException("invalid argument: bad exhibit name length"); }
2839            final CharSequence name = new WrappedByteArrayCharSequence(payload, 2, nameLen);
2840            final boolean create = (payload[2 + nameLen] != 0);
2841    
2842    //        final DataInputStream dis = new DataInputStream(reqPacket.getPayloadAsInputStream());
2843    //        final String name = dis.readUTF();
2844    //        final boolean create = dis.readBoolean();
2845    //        dis.close(); // Hopefully free some resources quickly.
2846    
2847            final ExhibitStaticAttr esa;
2848            try { esa = source.getAllExhibitImmutableData(-1).getStaticAttr(name); }
2849            catch(final IllegalArgumentException e)
2850                { throw new IOException("invalid argument: bad exhibit name"); }
2851            if(esa == null)
2852                { throw new IOException("invalid argument: non-existent exhibit"); }
2853    
2854            final ExhibitThumbnails eth =
2855                source.getThumbnails(esa.getExhibitFullName(), create);
2856    
2857            // Create and send the response packet
2858            // (empty for null response, else serialised form).
2859            // We allow compression because it may save a little bandwidth.
2860            return(new RawPacket(
2861                RawPacket.OpCode.GetThumbnails,
2862                eth,
2863                true)); // Allow compression to be attempted, though probably marginal at best.
2864            }
2865    
2866        /**Handle an incoming GetAllExhibitImmutableData request; never null.
2867         * Holds the 'slow response' lock while running
2868         * to exclude other such intensive/slow responses concurrently.
2869         */
2870        private static RawPacket handleGetAllExhibitImmutableData(
2871                final SimpleExhibitPipelineIF source, final RawPacket reqPacket,
2872                final HIRPCCache cache, final SimpleLoggerIF logger)
2873            throws IOException
2874            {
2875            final long stamp = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2876                    readLong();
2877            final AllExhibitImmutableData aeid =
2878                source.getAllExhibitImmutableData(stamp);
2879    
2880            // Simple case; nothing to return since caller is up to date,
2881            // or we allow concurrent expensive calls.
2882            if((aeid == null) || (cache == null))
2883                {
2884                return(new RawPacket(
2885                    RawPacket.OpCode.GetAllExhibitImmutableData,
2886                    aeid,
2887                    true)); // Allow compression.
2888                }
2889    
2890            // If the response is other than null
2891            // and we reject concurrent expensive calls,
2892            // then veto it if another such call is already in progress.
2893            if(!cache.slowResponseLock.tryLock())
2894                {
2895    logger.log("INFO: ExhibitDataTunnelSource.handleInboundRPC(): GetAllExhibitImmutableData: already in progress: aborting...");
2896                throw new IOException("expensive call already in progress, please retry");
2897                }
2898            // Create and send the response packet
2899            // (empty for null response,
2900            // else serialised form, compressed when helpful).
2901            try
2902                {
2903                return(new RawPacket(
2904                    RawPacket.OpCode.GetAllExhibitImmutableData,
2905                    aeid,
2906                    true)); // Allow compression.
2907                }
2908            finally { cache.slowResponseLock.unlock(); }
2909            }
2910    
2911        /**Handle an incoming GetStratum request; never null. */
2912        private static RawPacket handleGetStratum(
2913                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2914            throws IOException
2915            {
2916            final Stratum stRaw = source.getStratum();
2917    
2918            // Just before returning the Stratum info,
2919            // generate an amended value to reflect our conservation status if necessary
2920            // so that it's logically right 'on the wire'.
2921            final Stratum st;
2922            if(GenUtils.mustConservePower() == stRaw.isUpstreamConserving())
2923                { st = stRaw; } // Will do as-is!
2924            else
2925                {
2926                // Flip conserving flag to correctly reflect this instance's state as the 'upstream' server.
2927                st = new Stratum(stRaw.getStratum(), stRaw.getRootDelay(), stRaw.getUpstreamName(),
2928                                 !stRaw.isUpstreamConserving());
2929                }
2930    
2931            // Create and send the response packet.
2932            return(new RawPacket(
2933                RawPacket.OpCode.GetStratum,
2934                st,
2935                false)); // Don't attempt compression (probably won't work and could inflate RTT estimates).
2936            }
2937    
2938    
2939        /**Immutable raw packet to send in either direction over a byte stream connection.
2940         * This consists of an op-code byte,
2941         * a byte-array payload,
2942         * and a trailer byte (different to any op-code byte).
2943         * <p>
2944         * We informally attempt to keep a decent Hamming distance
2945         * between different op-codes (ie more than one bit at a time
2946         * should have to change to mutate one into another).
2947         * <p>
2948         * The byte array can be sent compressed (using the gzip
2949         * algorithm), and this compression is adaptive, ie if
2950         * compression does not save space then it is not used.
2951         * The assumption if compression is requested is that it
2952         * is worthwhile burning lots of CPU cycles to compress
2953         * the data as far as possible, and so we may attempt very
2954         * CPU-hungry settings.  Compressed data is sent on the
2955         * wire with the length actually -compressedDataLength.
2956         * <p>
2957         * NOTE: this is only guaranteed immutable if the payload is not tampered with
2958         * after being passed to the constructor.
2959         */
2960        public static final class RawPacket implements MemoryTools.Internable
2961            {
        // Wire op-code byte values.
        // Each declared value below is wrapped by a constant of the OpCode enum.
        // All declared values lie in the range 0..126, ie are non-negative as signed bytes,
        // whereas TRAILER ((byte) 0xda) is negative, so no op-code can equal the trailer.

        /**Operation code: NO-OP (no operation). */
        public static final byte OP_NOOP = 0;

        /**Operation code: getGenProps(). */
        public static final byte OP_getGenProps = 3;

        /**Operation code: getAllExhibitImmutableData(). */
        public static final byte OP_getAllExhibitImmutableData = 5;

        /**Operation code: getStaticAttr(). */
        public static final byte OP_getStaticAttr = 9;

        /**Operation code: getRawFile(). */
        public static final byte OP_getRawFile = 10;

        /**Operation code: getGenSecProps(). */
        public static final byte OP_getGenSecProps = 13;

        /**Operation code: getAllExhibitProperties(). */
        public static final byte OP_getAllExhibitProperties = 17;
        /**Operation code: getAllExhibitPropertiesDiff(). */
        public static final byte OP_getAllExhibitPropertiesDiff = 18;

        /**Operation code: getThumbnails(). */
        public static final byte OP_getThumbnails = 23;

        /**Operation code: getVariable(). */
        public static final byte OP_getVariable = 26;

        /**Operation code: getVariables(). */
        public static final byte OP_getVariables = 29;

        /**Operation code: setVariables(). */
        public static final byte OP_setVariables = 30;

        /**Operation code: syncVariables(). */
        public static final byte OP_syncVariables = 33;

        // Disabled op-code, retained for reference;
        // the value 38 is not used by any other op-code declared in this group.
//        /**Operation code: getEventValue(). */
//        public static final byte OP_getEventValue = 38;

        /**Operation code: getEventValues(). */
        public static final byte OP_getEventValues = 41;

        /**Operation code: getStratum(). */
        public static final byte OP_getStratum = 46;


        /**Reserved op-code for use in response packets to indicate that the operation was interrupted and can be retried (not an error). */
        public static final byte OP__INTEX = 119;

        /**Reserved op-code for use in response packets to indicate a PGMasterNotInServiceException. */
        public static final byte OP__PGMNISEX = 121;

        /**Reserved op-code for use in response packets to indicate a RemoteException. */
        public static final byte OP__REMEX = 125;

        /**Reserved op-code for use in response packets to indicate a RuntimeException.
         * Receipt of one of these DOES NOT indicate a link failure
         * (and thus there is no need to back off use of the link).
         */
        public static final byte OP__RUNTEX = 126;
3024    
3025            public static enum OpCode
3026                {
3027                NOOP(OP_NOOP),
3028                GetGenProps(OP_getGenProps),
3029                GetAllExhibitImmutableData(OP_getAllExhibitImmutableData),
3030                GetStaticAttr(OP_getStaticAttr),
3031                GetRawFile(OP_getRawFile),
3032                GetGenSecProps(OP_getGenSecProps),
3033                GetAllExhibitProperties(OP_getAllExhibitProperties),
3034                GetAllExhibitPropertiesDiff(OP_getAllExhibitPropertiesDiff),
3035                GetThumbnails(OP_getThumbnails),
3036                GetVariable(OP_getVariable),
3037                GetVariables(OP_getVariables),
3038                SetVariables(OP_setVariables),
3039                SyncVariables(OP_syncVariables),
3040                GetEventValues(OP_getEventValues),
3041                GetStratum(OP_getStratum),
3042                INTEX(OP__INTEX),
3043                PGMNISEX(OP__PGMNISEX),
3044                REMEX(OP__REMEX),
3045                RUNTEX(OP__RUNTEX);
3046    
3047                private OpCode(final byte code) { this.code = code; }
3048    
3049                /**The byte code; normal values are small and positive.
3050                 * We try to maintain a good Hamming distance between any two codes.
3051                 */
3052                private final byte code;
3053    
3054                /**Get the "closeness" factor higher meaning closer; strictly positive. */
3055                public final byte getCode() { return(code); }
3056    
3057                /**Constant-time lookup from (unsigned) byte value to enum.
3058                 * Created on first use.
3059                 */
3060                private static final class LookupCache
3061                    {
3062                    private LookupCache() { /* No instances needed. */ }
3063                    static final OpCode lookup[] = new OpCode[256];
3064                    static
3065                        {
3066                        for(final OpCode oc : OpCode.values())
3067                            {
3068                            final int index = oc.getCode() & 0xff;
3069                            assert(lookup[index] == null) : "all OpCode values must be unique";
3070                            lookup[index] = oc;
3071                            }
3072                        }
3073                    }
3074    
3075                /**Look up from byte to enum value, else null if no matching enum value. */
3076                public static OpCode lookupCode(final byte code)
3077                    {
3078                    return(LookupCache.lookup[code & 0xff]); // Constant-time lookup.
3079    //                // Above should be equivalent to this (slow) linear lookup...
3080    //                for(final OpCode oc : OpCode.values())
3081    //                    { if(oc.code == code) { return(oc); } }
3082    //                return(null); // None found.
3083                    }
3084                }
3085    
3086    
        /**Trailer byte; different to all valid op-code byte values. */
        public static final byte TRAILER = (byte) 0xda;
        /**Verify that TRAILER is indeed not a valid op-code (checked only when assertions are enabled). */
        static { assert(null == OpCode.lookupCode(TRAILER)); }


        /**Minimum size of payload for which we will try heaviest compression modes; strictly positive.
         * This is for modes such as BZIP2 or LZMA with heavy memory, CPU and
         * start-up costs (ie much heavier than GZIP).
         * <p>
         * Below this we assume that the absolute bandwidth savings
         * are not worth the pain.
         * <p>
         * Probably bigger than a "bulk transfer" block too,
         * so that the heavy guns are wielded only for the biggest frames.
         */
        public static final int MIN_PLLENGTH_FOR_HEAVY_COMP_ALGS = 64 * 1024;

        /**Opcode (single byte); never null.
         * Sent first in the raw frame on the wire.
         * <p>
         * Never numerically equal to TRAILER.
         */
        public final OpCode opCode;

        /**Frame overhead (header and trailer) in bytes; strictly positive.
         * This is 1 byte of op-code, 4 bytes of length and 1 trailer byte.
         */
        public static final int FRAME_OVERHEAD_BYTES = 6;

        /**Maximum permitted total frame length including header/trailer; strictly positive power of two.
         * Set much larger than the user-level transfer-size limit
         * to allow for some control data overhead on top of the given data,
         * and for much larger non-user-controlled items such as the AEP.
         * <p>
         * This limit mainly exists to prevent DoS-style out-of-memory problems
         * arising from corrupt (huge) length values.
         */
        public static final int MAX_FRAME_SIZE = 128*1024*1024;

        /**Maximum permitted payload length; strictly positive power of two minus the frame overhead.
         * Set much larger than the user-level transfer-size limit
         * to allow for some control data overhead on top of the given data,
         * and for much larger non-user-controlled items such as the AEP.
         * <p>
         * This mainly exists to prevent DoS-style out-of-memory problems
         * arising from corrupt (huge) length values.
         */
        public static final int MAX_PAYLOAD_SIZE = MAX_FRAME_SIZE - FRAME_OVERHEAD_BYTES;
        /**Check that we really do allow room for the maximum user-data transfer and then some. */
        static { assert(MAX_PAYLOAD_SIZE > 1024+SimpleExhibitPipelineIF.MAX_USER_READ_SIZE); }

        /**Uncompressed payload length; non-negative.
         * When the payload is held uncompressed this is the length value sent second on the wire;
         * when the payload is held compressed the wire instead carries
         * the negated length of the compressed data.
         */
        public final int plLength;

        /**Payload data, non-null unless the data is stored compressed.
         * Private in order to keep the whole packet immutable.
         */
        private final byte payload[];

        /**Compressed payload data, null unless the data is stored compressed.
         * Private in order to keep the whole packet immutable.
         */
        private final byte payloadCompressed[];

        /**Trailer byte, written last in each frame; not a valid op-code.
         * Always the same value as TRAILER.
         */
        public static final byte trailer = TRAILER;
3152    
3153            /**Depends on the whole packet being identical.
3154             * This depends on the header and data being identical,
3155             * including being compressed in the same way if at all.
3156             */
3157            @Override public boolean equals(final Object obj)
3158                {
3159                if(obj == this) { return(true); }
3160                if(!(obj instanceof RawPacket)) { return(false); }
3161                final RawPacket other = (RawPacket) obj;
3162    
3163                if(opCode != other.opCode) { return(false); }
3164                if(plLength != other.plLength) { return(false); }
3165    
3166                // Checking the data for equality may be very slow...
3167                if(!Arrays.equals(payloadCompressed, other.payloadCompressed)) { return(false); }
3168                if(!Arrays.equals(payload, other.payload)) { return(false); }
3169    
3170                return(true); // Yes, identical.
3171                }
3172    
3173            /**Checks for equivalent raw packets, ignoring internal representation details.
3174             * Mainly this means ignoring whether the data is held compressed or not,
3175             * as long as the values that the user can see are identical.
3176             */
3177            public boolean equivalentTo(final RawPacket other)
3178                {
3179                if(opCode != other.opCode) { return(false); }
3180                if(plLength != other.plLength) { return(false); }
3181    
3182                // Check that the data appears identical to the user.
3183                return(Arrays.equals(getPayloadCopy(), other.getPayloadCopy()));
3184                }
3185    
3186            /**Hash is based on just packet type and (uncompressed) packet length. */
3187            @Override public int hashCode()
3188                { return(((opCode.getCode() * 69069) + plLength)); }
3189    
        /**Construct a new raw packet with the whole data array as the payload.
         * Note that the data is <em>not</em> copied (nor modified);
         * the array should not subsequently be modified.
         * <p>
         * This assumes that it is worth trying to compress the payload,
         * if other conditions support it:
         * it simply delegates with attemptCompression set true.
         *
         * @param op  the op-code of the RPC; must not be null
         * @param data  the payload bytes; must not be null
         *     and must be no longer than MAX_PAYLOAD_SIZE
         */
        public RawPacket(final OpCode op, final byte data[])
            { this(op, data, true); }
3202    
3203            /**Construct a new raw packet with the whole data array as the payload.
3204             * Note that the data is <em>not</em> copied (nor modified);
3205             * the array should not subsequently be modified.
3206             *
3207             * @param op  the op-code of the RPC; one of the OP_XXX values
3208             * @param data  the payload bytes; never null
3209             * @param attemptCompression  if true, it is probably worth attempting
3210             *     to compress the payload data unless very short
3211             */
3212            public RawPacket(final OpCode op, final byte data[],
3213                             final boolean attemptCompression)
3214                { this(op, data, data.length, attemptCompression); }
3215    
3216            /**Construct a new raw packet with the initial portion of the data array as the payload.
3217             * Note that the data is <em>not</em> copied (nor modified),
3218             * unless it is compressed or the data does not occupy the whole array;
3219             * the array should not subsequently be modified.
3220             * <p>
3221             * If compression is requested and is possible
3222             * then the compression may be performed at construction time
3223             * and the compressed form of the data held to save space.
3224             *
3225             * @param op  the op-code of the RPC; one of the OP_XXX values
3226             * @param data  the payload bytes; never null
3227             * @param initialPortion  the initial portion of data[] containing
3228             *     payload data; never negative or larger than data.length or MAX_PAYLOAD_SIZE
3229             * @param attemptCompression  if true, it is probably worth attempting
3230             *     to compress the payload data unless very short
3231             */
3232            public RawPacket(final OpCode op, final byte data[], final int initialPortion,
3233                             final boolean attemptCompression)
3234                {
3235                if((op == null) ||
3236                   (data == null) ||
3237                   (initialPortion > MAX_PAYLOAD_SIZE) ||
3238                   (initialPortion > data.length) || (initialPortion < 0))
3239                    { throw new IllegalArgumentException(); }
3240    
3241                // Capture the opcode and the raw payload length immediately.
3242                opCode = op;
3243                plLength = initialPortion;
3244    
3245                // See if the data needs compressing and can be compressed.
3246                // If not, then keep it uncompressed.
3247                //
3248                // If compression is requested and the payload size is significant
3249                // (allowing for for data overheads of the compression scheme)
3250                // then check out the possibility of compression,
3251                // else ignore, and send the data uncompressed.
3252                //
3253                // We assume that trying to deflate anything smaller than
3254                // (say) 256 bytes is a waste of effort given the
3255                // deflate overhead and, indeed, the packet overhead.
3256                if(attemptCompression &&
3257                   (plLength >= FileTools.TYPICAL_DEFLATE_MIN_TEXT_SIZE_COMPRESSABLE))
3258                    {
3259                    final byte compressed[] = FileTools.compressDeflatableData(data, 0, initialPortion);
3260                    final boolean savedSpace = (compressed.length < plLength);
3261    
3262                    // If successful then store the compressed data.
3263                    if(savedSpace)
3264                        {
3265                        payload = null;
3266                        payloadCompressed = compressed;
3267    if(ExhibitDataTunnelSource._protocolDebug) { System.err.println("[RawPacket: compressed payload from "+plLength+" to "+(payloadCompressed.length)+" bytes.]"); }
3268                        return;
3269                        }
3270                    }
3271    
3272                // Store uncompressed data.
3273                // Store without copying if data is entire array.
3274                if(data.length == plLength)
3275                    { payload = data; }
3276                else
3277                    {
3278                    // It is inefficient to copy,
3279                    // so we want to avoid this where possible,
3280                    // and in debug will warn where it is happening.
3281    if(IsDebug.isDebug) { System.err.println("[RawPacket: WARNING: copying uncompressed payload from oversize buffer.]"); Thread.dumpStack(); }
3282                    payload = new byte[plLength];
3283                    System.arraycopy(data, 0, payload, 0, plLength);
3284                    }
3285                payloadCompressed = null;
3286                }
3287    
3288            /**Construct a new raw packet with data provided.
3289             * This accepts whatever it is passed
3290             * (we check for some errors with run-time assertions),
3291             * and is intended only to be called from other constructors
3292             * or factory methods.
3293             */
3294            private RawPacket(final OpCode opCode,
3295                              final int plLength,
3296                              final byte[] payload,
3297                              final byte[] payloadCompressed)
3298                {
3299                assert(opCode != null);
3300                assert(plLength >= 0); // Uncompressed payload always has non-negative length.
3301                assert((payload == null) != (payloadCompressed == null)); // Exactly one must be null.
3302                assert((payload == null) || (payload.length == plLength)); // Check length of uncompressed payload.
3303    
3304                this.opCode = opCode;
3305                this.plLength = plLength;
3306                this.payload = payload;
3307                this.payloadCompressed = payloadCompressed;
3308                }
3309    
        /**Construct a new raw packet with a serialised object as the payload.
         * This will use an uncompressed form if more efficient on the wire.
         * <p>
         * This will have the entire uncompressed form in memory at peak,
         * so may be unsuitable for very large (and highly-compressible)
         * objects.
         * <p>
         * The object is first serialised in full to a byte array,
         * which is then handled as for the byte-array constructors.
         *
         * @param op  the op-code of the RPC; must not be null
         * @param obj  the Serializable object to send,
         *     or null to send empty payload
         * @param attemptCompression  if true, it is probably worth attempting
         *     to compress the payload data if not very short
         * @throws IOException  if the object cannot be serialised
         */
        public RawPacket(final OpCode op, final Serializable obj,
                         final boolean attemptCompression)
            throws IOException
            {
            this(op, _serObjForPayload(obj), attemptCompression);
            }
3329    
        /**Construct a new raw packet with a stream-serialised object as the payload.
         * This will always compress the supplied object
         * (unless null, in which case an empty uncompressed payload is used)
         * to try to avoid ever having the full uncompressed serialised form in memory.
         * <p>
         * This will veto with an IOException any attempt to write more than
         * MAX_PAYLOAD_SIZE pre-compression or post-compression bytes.
         * <p>
         * Unlike the constructors, the compressed form is kept
         * even if it turns out to be no smaller than the uncompressed form.
         *
         * @param op  the op-code of the RPC; never null
         * @param obj  the Serializable object to send,
         *     or null to send empty payload
         * @return  the new packet, holding its payload compressed unless obj was null; never null
         * @throws IllegalArgumentException  if op is null
         * @throws IOException  if serialisation fails or a size limit is exceeded
         */
        public static RawPacket streamSerialiseObject(final OpCode op, final Serializable obj)
            throws IOException
            {
            if(op == null) { throw new IllegalArgumentException(); }

            // If the object to be serialised is null
            // then return a result using an empty payload.
            if(null == obj) { return(new RawPacket(op, 0, EMPTY_PAYLOAD, null)); }

            // Stream chain, outermost first:
            //     oos (serialise) -> llos (limit pre-compression) -> dos (deflate)
            //         -> inner limiter (limit post-compression) -> baos (collect).
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            // We veto any attempt to construct a frame
            // whose compressed or uncompressed payload would exceed MAX_PAYLOAD_SIZE bytes.
            // We apply exactly the same header-less deflate as in other places we compress.
            final DefOutputStream dos = new DefOutputStream(new GenUtils.LengthLimitedOutputStream(baos, MAX_PAYLOAD_SIZE));
            final GenUtils.LengthLimitedOutputStream llos = new GenUtils.LengthLimitedOutputStream(dos, MAX_PAYLOAD_SIZE); // Must not throw an exception...
            ObjectOutputStream oos = null;
            try
                {
                oos = new ObjectOutputStream(llos);
                oos.writeObject(obj);
                oos.flush();
                }
            finally
                { dos.finish(); } // Finish compression and ensure that native resources are released.
            // Only reached if serialisation succeeded, so oos is non-null here.
            if(oos != null) { oos.close(); } // Hopefully free resources quickly.

            // Construct packet with compressed data...
            // NOTE(review): llos.getBytesWritten() is presumably the count of
            // uncompressed serialised bytes, as it is used as plLength; confirm against GenUtils.
            return(new RawPacket(op, llos.getBytesWritten(), null, baos.toByteArray()));
            }
3373    
3374            /**Serialise object for payload.
3375             * Returns a zero-length array if the object is null.
3376             */
3377            private static byte[] _serObjForPayload(final Serializable obj)
3378                throws IOException
3379                {
3380                if(obj == null)
3381                    { return(ExhibitDataTunnelSource.EMPTY_PAYLOAD); }
3382    
3383                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
3384                final ObjectOutputStream oos = new ObjectOutputStream(baos);
3385                oos.writeObject(obj);
3386                oos.flush();
3387                final byte[] responsePayload = baos.toByteArray();
3388                oos.close(); // Hopefully free resources quickly.
3389                return(responsePayload);
3390                }
3391    
3392            /**Retrieves a single serialised Object from the payload.
3393             * If the payload is empty (zero-length), we return null.
3394             * <p>
3395             * For robustness, treats a class mismatch problem like a data problem,
3396             * ie this will re-throw any ClassNotFoundException as an IOException.
3397             */
3398            public Object getSerializedObjectPayload()
3399                throws IOException
3400                {
3401                if(plLength == 0) { return(null); }
3402    
3403                final InputStream is = getPayloadAsInputStream();
3404                final ObjectInputStream ois = new ObjectInputStream(is);
3405                try { return(ois.readObject()); }
3406                catch(final ClassNotFoundException e) { throw new IOException(e); }
3407                finally { ois.close(); } // Release resources, especially native in decompressor.
3408                }
3409    
3410            /**Computes the total number of bytes that would be written by writePacket(); strictly positive.
3411             */
3412            public int getFrameLength()
3413                {
3414                final boolean isCompressed = (payload == null);
3415                final byte[] dataOnTheWire =
3416                    isCompressed ? payloadCompressed : payload;
3417                final int realPayloadLen = dataOnTheWire.length;
3418    
3419                // The total length on the wire of the entire packet.
3420                // An excess 6 bytes allows for 1 opCode, 4 length, 1 trailer.
3421                return(6 + realPayloadLen);
3422                }
3423    
3424            /**Send the packet down the given OutputStream with header and trailer.
3425             * We flush the data after writing it.
3426             * <p>
3427             * If the payload is small enough,
3428             * then the entire packet will be buffered
3429             * and sent in one write().
3430             * <p>
3431             * We do not close() the stream.
3432             */
3433            public void writePacket(final OutputStream os)
3434                throws IOException
3435                {
3436    if(ExhibitDataTunnelSource._protocolDebug) { System.err.println("[writePacket(): op="+opCode+" len="+plLength+".]"); }
3437    
3438                final boolean isCompressed = (payload == null);
3439                final byte[] dataOnTheWire =
3440                    isCompressed ? payloadCompressed : payload;
3441                final int realPayloadLen = dataOnTheWire.length;
3442    
3443                // The total length on the wire of the entire packet.
3444                final int totalPacketLen = getFrameLength();
3445    
3446                // Buffer the output (with a carefully-crafted buffer size)
3447                // so that we can write it all in one operating-system interaction
3448                // if small enough, else in efficient-ish disc/network-size chunks.
3449                // (Note that Tomcat 4.1.31 writes chunked data to the network
3450                // in blocks of 2kB on Solaris 8/9,
3451                // but also note that the write of a large payload will cause
3452                // the header data to be flushed through the buffer first,
3453                // so anything other than quite a small buffer may be a waste.)
3454                final DataOutputStream dos = new DataOutputStream(
3455                    new BufferedOutputStream(os,
3456                        (realPayloadLen <= CoreConsts.BULK_DATA_TRANSFER_SIZE) ?
3457                            totalPacketLen : 512));
3458    
3459                // Send the opcode.
3460                dos.writeByte(opCode.getCode());
3461                // Send the payload length (negative indicating compressed).
3462                dos.writeInt(isCompressed ? -realPayloadLen : realPayloadLen);
3463                // Send the payload.
3464                dos.write(dataOnTheWire, 0, realPayloadLen);
3465                // Write the trailer.
3466                dos.writeByte(trailer);
3467                // Flush data though any buffers and down the wire if possible.
3468                dos.flush();
3469                }
3470    
3471            /**Reads a packet from the input stream, blocking until done.
3472             * Even if the data is received compressed it is stored in
3473             * the packet that this returns in uncompressed form, so the
3474             * presence or absence of compression on the wire is invisible
3475             * to the recipient.
3476             * This also makes (re)reading the packet data cheap.
3477             * <p>
3478             * Compressed data is signalled by a negative length on the wire,
3479             * where this is the negation of the compressed data size.
3480             * <p>
3481             * The ZLIB deflater with maximum compression and
3482             * no checksum is used for any (de)compression.
3483             * <p>
3484             * To prevent unpleasantness such a OutOfMemoryError from a corrupt length value,
3485             * we veto lengths greater than the maximum specified for this frame type.
3486             */
3487            public static RawPacket readPacket(final InputStream is)
3488                throws IOException
3489                {
3490                // While not wonderfully efficient,
3491                // to save a great deal of fiddly coding
3492                // the input stream is wrapped in a DataInputStream.
3493                final DataInputStream dis = new DataInputStream(is);
3494                // Start collecting the data...
3495                final byte opC = dis.readByte();
3496                final OpCode op = OpCode.lookupCode(opC);
3497                if(op == null) { throw new IOException("invalid opcode "+opC); }
3498                final int rawLen = dis.readInt();
3499                final boolean isCompressed = (rawLen < 0);
3500                final int len = ((isCompressed) ? -rawLen : rawLen);
3501    if(ExhibitDataTunnelSource._protocolDebug) { System.err.println("[readPacket(): op="+opC+" rawLen="+rawLen+"...]"); }
3502                if(len > MAX_PAYLOAD_SIZE) { throw new IOException("corrupt length"); }
3503                final byte data[] = new byte[len];
3504                dis.readFully(data);
3505                final byte tr = dis.readByte();
3506                if(tr != TRAILER)
3507                    { throw new IOException("corrupt trailer"); }
3508                try {
3509                    // Decompress the data if necessary...
3510                    if(isCompressed)
3511                        {
3512                        // Uncompress from the input stream...
3513                        final byte uncompressedData[] = FileTools.decompressDeflatedData(data);
3514    
3515    if(ExhibitDataTunnelSource._protocolDebug) { System.err.println("[readPacket(): compressed payload len="+data.length+", uncompressed = "+uncompressedData.length+" bytes.]"); }
3516                        return(new RawPacket(op, uncompressedData, false));
3517                        }
3518                    return(new RawPacket(op, data, false));
3519                    }
3520                catch(final IllegalArgumentException e)
3521                    { throw new IOException("corrupt packet: " + e.getMessage()); }
3522                }
3523    
3524            /**Create a human-readable summary (not including the payload data). */
3525            @Override
3526            public String toString()
3527                {
3528                final StringBuilder sb = new StringBuilder(79);
3529                sb.append("RawPacket");
3530                sb.append(":opCode=").append(opCode);
3531                sb.append(":plLength=").append(plLength);
3532                if(payloadCompressed != null)
3533                    { sb.append(":compressedLength=").append(payloadCompressed.length); }
3534                return(sb.toString());
3535                }
3536    
3537            /**Get the payload data as an InputStream; never null.
3538             * If the internal data is stored compressed however,
3539             * then this will return the uncompressed version.
3540             * <p>
3541             * Each stream returned is independent.
3542             * <p>
3543             * This stream should be explicitly closed when finished with
3544             * to release underlying memory and other resources ASAP.
3545             *
3546             * @return  the uncompressed payload data as a stream of length plLength
3547             */
3548            public InputStream getPayloadAsInputStream()
3549                {
3550                if(payload != null)
3551                    {
3552                    // Payload is uncompressed; wrap it as-is.
3553                    assert(payload.length == plLength);
3554                    return(new ByteArrayInputStream(payload));
3555                    }
3556    
3557                // Payload is held compressed so decompress it
3558                // (on the fly to reduce peak memory usage).
3559                try { return(new DefInputStream(new ByteArrayInputStream(payloadCompressed))); }
3560                catch(final IOException e) { throw new IllegalStateException("unable to decompress"); }
3561                }
3562    
3563            /**Get a copy of the payload data; never null.
3564             * We return a copy to keep this object immutable.
3565             * <p>
3566             * If the internal data is stored compressed
3567             * then this will return the original uncompressed version.
3568             *
3569             * @return  a copy of the payload data, of length plLength
3570             */
3571            public byte[] getPayloadCopy()
3572                {
3573                if(payload != null)
3574                    {
3575                    // Payload is uncompressed; clone it as-is.
3576                    assert(payload.length == plLength);
3577                    // Take copy to protect internal state.
3578                    return(payload.clone());
3579                    }
3580    
3581                // Payload is held compressed, so decompress it.
3582                try
3583                    {
3584                    final byte uncompressedData[] = FileTools.decompressDeflatedData(payloadCompressed);
3585                    assert(uncompressedData.length == plLength);
3586                    // No need to copy (private) data returned from decompression.
3587                    return(uncompressedData);
3588                    }
3589                catch(final IOException e)
3590                    {
3591                    throw new IllegalStateException("corrupt compressed data");
3592                    }
3593                }
3594    
3595            /**Get direct access to the uncompressed payload; never null.
3596             * We avoid making a copy, for speed.
3597             * <em>The caller must not alter the byte array returned.</em>
3598             * <p>
3599             * If the internal data is stored compressed
3600             * then this will return an uncompressed copy.
3601             * <p>
3602             * This is package-visible and is only for trusted callers
3603             * that will not alter the content.
3604             *
3605             * @return  a the payload data, of length plLength; never null,
3606             */
3607            byte[] getPayload()
3608                {
3609                if(payload != null)
3610                    {
3611                    // Payload is uncompressed; clone it as-is.
3612                    assert(payload.length == plLength);
3613                    // Trust caller not to mess around...
3614                    return(payload);
3615                    }
3616    
3617                // Payload is held compressed, so decompress it.
3618                try
3619                    {
3620                    final byte uncompressedData[] = FileTools.decompressDeflatedData(payloadCompressed);
3621                    assert(uncompressedData.length == plLength);
3622                    // No need to copy (private) data returned from decompression.
3623                    return(uncompressedData);
3624                    }
3625                catch(final IOException e)
3626                    {
3627                    throw new IllegalStateException("corrupt compressed data");
3628                    }
3629                }
3630    
3631            /**Copy the payload data into the supplied (trusted) buffer.
3632             * We only make this package-visible to keep this object immutable.
3633             * <p>
3634             * If the internal data is stored compressed
3635             * then this will return the original uncompressed version.
3636             */
3637            void getPayloadCopy(final ByteBuffer buf)
3638                {
3639                if(payload != null)
3640                    {
3641                    // Payload is uncompressed; clone it as-is.
3642                    assert(payload.length == plLength);
3643                    // Copy uncompressed data directly.
3644                    buf.put(payload);
3645                    return;
3646                    }
3647    
3648                // Payload is held compressed, so decompress it.
3649                try
3650                    {
3651                    final byte uncompressedData[] = FileTools.decompressDeflatedData(payloadCompressed);
3652                    assert(uncompressedData.length == plLength);
3653                    // No need to copy (private) data returned from decompression.
3654                    buf.put(uncompressedData);
3655                    return;
3656                    }
3657                catch(final IOException e)
3658                    {
3659                    throw new IllegalStateException("corrupt compressed data");
3660                    }
3661                }
3662            }
3663    
3664        /**The immutable adjunct for a RawPacket that includes the HMAC and other anti-attack data.
3665         * Note that this class is NOT directly serialisable,
3666         * with the data fields sent in some other way,
3667         * eg as HTTP header fields "out-of-band" from the actual HTTP message.
3668         */
3669        public static final class PacketProtector
3670            {
        /**The timestamp for the RawPacket, input to each MAC; strictly positive. */
        public final long timestamp;

        /**The length in bytes of the entire frame/datastream being protected, input to each MAC; non-negative. */
        public final int length;

        /**The immutable in-order list of MAC authenticator segments for the stream and fields herein; never null nor empty nor containing nulls.
         * This sequence of HMAC values together makes up the message MAC.
         * It is designed not to be possible to re-arrange these segments,
         * as each depends in part on the value of its predecessor
         * as well as the data in its segment.
         * <p>
         * Note that the <em>last</em> MAC, which depends on all the data being protected,
         * can be used as a unique message MAC on its own.
         */
        public final List<ROByteArray> mac;

        /**Maximum size in (ASCII) characters of output of toCheckString(); strictly positive.
         * Chosen to allow inclusion of the output of toCheckString() in an HTTP header.
         * <p>
         * fromCheckString() rejects any input that is longer than this after trimming.
         */
        public static final int MAX_CHECK_STRING_CHARS = 900;
3692    
3693            /**Generate (HTTP-header) check-string; never null nor empty.
3694             * This is a pure-ASCII single-line control-code-free check String
3695             * that contains the input fields and MAC from this object
3696             * to serve as the "checksum" of a RawPacket.
3697             * <p>
3698             * This object is suitable (short enough, avoidance of meta-characters)
3699             * to be used directly in an HTTP header.
3700             * <p>
3701             * This value is suitable to be decoded by fromCheckString().
3702             * <p>
3703             * The format is a space-separated list of fields:
3704             * <ol>
3705             * <li>The Java-style UTC timestamp in milliseconds as an unsigned decimal.
3706             * <li>The length of the protected stream in bytes as an unsigned decimal.
3707             * <li>The MAC values for each segment, in order, encoded Base64.
3708             * </ol>
3709             */
3710            public String toCheckString()
3711                {
3712                // Do a worst-case check given the size of the (first) MAC as encoded.
3713                assert((""+Long.MAX_VALUE+' '+Integer.MAX_VALUE+' ').length() + (MAX_SEGMENTS*(1+TextUtils.encode8To6(mac.get(0).toByteArray()).length()))-1 < MAX_CHECK_STRING_CHARS) : "check string must always be short enough for HTTP header";
3714    
3715                final StringBuilder sb = new StringBuilder(32 + (45*mac.size()));
3716                sb.append(timestamp).append(' ');
3717                sb.append(length).append(' ');
3718                for(final ROByteArray r : mac)
3719                    { sb.append(TextUtils.encode8To6(r.toByteArray())).append(' '); }
3720                // Rip off trailing space...
3721                sb.setLength(sb.length()-1);
3722                assert(sb.length() <= MAX_CHECK_STRING_CHARS) : "check string must be short enough for HTTP header";
3723                return(sb.toString());
3724                }
3725    
3726            /**Field splitter regex pattern compiled once for efficiency; never null. */
3727            private static final Pattern fieldSplitPattern = Pattern.compile(" ");
3728    
3729            /**Parses a check-string as generated by toCheckString(); never null.
3730             * This is tolerant of some leading and trailing whitespace.
3731             *
3732             * @throws IllegalArgumentException  if the input is unparsable
3733             */
3734            public static PacketProtector fromCheckString(final String check)
3735                {
3736                if((check == null) || (check.length() > 2*MAX_CHECK_STRING_CHARS))
3737                    { throw new IllegalArgumentException("too big before trim"); }
3738                final String trimmed = check.trim();
3739                if(trimmed.length() > MAX_CHECK_STRING_CHARS)
3740                    { throw new IllegalArgumentException("too big"); }
3741                final String fields[] = fieldSplitPattern.split(check, 0);
3742                if(fields.length < 3)
3743                    { throw new IllegalArgumentException("not enough fields"); }
3744                final long timestamp = Long.parseLong(fields[0], 10);
3745                final int length = Integer.parseInt(fields[1], 10);
3746                if((length < 0) || (length > RawPacket.MAX_FRAME_SIZE))
3747                    { throw new IllegalArgumentException("out-of-range length value"); }
3748                final ArrayList<ROByteArray> mac = new ArrayList<ROByteArray>(fields.length-2);
3749                for(int i = 2; i < fields.length; ++i)
3750                    { mac.add(new ROByteArray(TextUtils.decode8To6(fields[i]))); }
3751                return(new PacketProtector(timestamp, length, mac));
3752                }
3753    
        /**Maximum number of segments protected stream may be broken into; strictly positive power of two.
         * This determines the maximum number of incremental portions
         * we can process while streaming a RawPacket.
         * <p>
         * This value is capped to limit the amount of MAC data that needs to be sent,
         * eg in an HTTP header of limited length.
         */
        public static final int MAX_SEGMENTS = 16;

        /**Minimum segment size in bytes (other than final segment); strictly positive power of two.
         * This is set to be somewhat larger than the largest typical frame
         * (ie the entire header, payload and trailer)
         * that we usually don't want to stream, eg bulk data transfer,
         * so that such frames/packets will get a single MAC for efficiency.
         * This is also set large enough to amortise the cost of each segment MAC.
         * <p>
         * This is small enough to represent a reasonable incremental amount of CPU
         * for streamed inputs.
         * <p>
         * The value is 65536 bytes (64KiB).
         */
        public static final int MIN_SEGMENT_SIZE = (1<<16);

        /**Maximum segment size in bytes; strictly positive power of two.
         * This is determined from the maximum frame size
         * and the maximum number of segments.
         */
        public static final int MAX_SEGMENT_SIZE = RawPacket.MAX_FRAME_SIZE / MAX_SEGMENTS;
        /**Check some invariants (only when assertions are enabled). */
        // The maximum frame size must split exactly into MAX_SEGMENTS segments of maximum size.
        static { assert(RawPacket.MAX_FRAME_SIZE == MAX_SEGMENT_SIZE * MAX_SEGMENTS); }
        // The maximum segment size must be strictly larger than the minimum.
        static { assert(MAX_SEGMENT_SIZE > MIN_SEGMENT_SIZE); }
3783    
3784            /**Create an adjunct to protect a RawPacket, including a current timestamp.
3785             * This must be supplied with the (secret) Key shared between the servers.
3786             *
3787             * @param raw  the RawPacket to protect; never null
3788             * @param key  the (secret) key for the HMAC; never null
3789             */
3790            public PacketProtector(final RawPacket raw,
3791                                   final SecretKey key)
3792                throws InvalidKeyException
3793                { this(raw, System.currentTimeMillis(), key); }
3794    
3795            /**Create an adjunct to protect a RawPacket, including the given timestamp.
3796             * This must be supplied with the (secret) Key shared between the servers.
3797             *
3798             * @param raw  the RawPacket to protect; never null
3799             * @param timestamp  the timestamp for creation/send of the RawPacket; strictly positive
3800             * @param key  the (secret) key for the HMAC; never null
3801             */
3802            public PacketProtector(final RawPacket raw,
3803                                   final long timestamp,
3804                                   final SecretKey key)
3805                throws InvalidKeyException
3806                { this(timestamp, raw.getFrameLength(), computeMAC(timestamp, raw, key)); }
3807    
3808            /**Create an adjunct to protect a RawPacket.
3809             *
3810             * @param timestamp  the timestamp for creation/send of the RawPacket; strictly positive
3811             * @param mac  the HMAC for the RawPacket and other fields, not checked; never null
3812             */
3813            public PacketProtector(final long timestamp,
3814                                   final int length,
3815                                   final List<ROByteArray> mac)
3816                {
3817                this.timestamp = timestamp;
3818                this.length = length;
3819                // Take defensive immutable copy of MAC segment list.
3820                this.mac = Collections.unmodifiableList(new ArrayList<ROByteArray>(mac));
3821    
3822                try { validateObject(); } // Validate this instance.
3823                catch(final InvalidObjectException e) { throw new IllegalArgumentException(e); }
3824                }
3825    
3826            /**Compute segment size; strictly positive power of two.
3827             * Given the full message/frame length
3828             * this computes the segment size between/at the MIN/MAX limits
3829             * to maximise the number of segments
3830             * and thus maximise the the amount of incremental MAC processing
3831             * and incremental processing of streamed frames that can be done,
3832             * minimising buffer working space and time/bandwidth before corruption is found.
3833             * <p>
3834             * All segments are of the same size, except the last one which may be shorter.
3835             */
3836            private static final int computeSegmentSize(final int frameLength)
3837                {
3838                if((frameLength > RawPacket.MAX_FRAME_SIZE) ||
3839                   (frameLength < 0))
3840                    { throw new IllegalArgumentException(); }
3841    
3842                // Find minimum segment size that will cover the frame.
3843                for(int segSize = MIN_SEGMENT_SIZE; segSize <= MAX_SEGMENT_SIZE; segSize <<= 1)
3844                    {
3845                    if(segSize * MAX_SEGMENTS >= frameLength)
3846                        { return(segSize); }
3847                    }
3848    
3849                throw new Error("should not get here");
3850                }
3851    
3852            /**Compute the MAC given the message and other fields to be included; never null.
3853             * The (secret) Key must also be supplied
3854             * and be suitable for the HMAC algorithm used.
3855             * <p>
3856             * The MAC is computed on a series of segments of the input of the same length
3857             * (except for a possibly-shorter final segment)
3858             * producing an HMAC on each segment.
3859             * <p>
3860             * Each HMAC is computed over a binary message consisting of:
3861             * <ol>
3862             * <li>the 4-byte length of the entire protected stream/frame.
3863             * <li>the 8-byte (Java-style millisecond UTC) timestamp in network order,
3864             * <li>for all segments other than the first, the HMAC of the previous segment,
3865             * <li>the full on-the-wire form of the RawPacket (including header/trailer).
3866             * </ol>
3867             * <p>
3868             * The chaining should make it impossible to reorder the segments.
3869             * <p>
3870             * The segmentation is so that the data in each segment is known-safe
3871             * and can be safely consumed by incremental/streaming CPU-heavy operations
3872             * before subsequent segments have been received and decoded.
3873             * This segmentation also means that we can abort a damaged message
3874             * as soon as we check the damaged segment: we do not need to wait
3875             * to receive and store and check the whole message.
3876             *
3877             * @param raw  the RawPacket to protect; never null
3878             * @param timestamp  the timestamp for creation/send of the RawPacket; strictly positive
3879             * @param key  the (secret) key for the HMAC; never null
3880             *
3881             * @throws InvalidKeyException  if the Key supplied is inappropriate
3882             *
3883             * @throws IllegalArgumentException  if the timestamp is non-positive
3884             *     or any other argument is null
3885             * @throws IllegalStateException  if the HMAC algorithm is unavailable
3886             *
3887             * @return immutable in-order list of HMACs to protect each stream segment
3888             */
3889            public static List<ROByteArray> computeMAC(final long timestamp,
3890                                                       final RawPacket raw,
3891                                                       final SecretKey key)
3892                throws InvalidKeyException
3893                {
3894                if((timestamp <= 0) || (raw == null) || (key == null))
3895                    { throw new IllegalArgumentException(); }
3896    
3897                // Compute the segment size that we will be using.
3898                final int frameLength = raw.getFrameLength();
3899                final int segmentSize = computeSegmentSize(frameLength);
3900                assert(0 == (segmentSize & (segmentSize-1))); // Segment size should be a power of two.
3901    
3902                // Compute the number of segments that we will be using.
3903                final int nSegs = (frameLength + segmentSize - 1) / segmentSize;
3904    
3905                // Compose our result MAC list...
3906                final List<ROByteArray> mac = new ArrayList<ROByteArray>(nSegs);
3907    
3908                // Incrementally generate the MAC block by block.
3909                // We use an output stream sink to avoid having to copy large frames.
3910                // This OutputStream is not thread-safe and does not need to be.
3911                // Working memory required is basically only that for the current segment's Mac.
3912                final OutputStream sink = new OutputStream(){
3913                    /**MAC for current segment; initialised for first block. */
3914                    Mac current = startMacForSegment();
3915    
3916                    /**Start MAC for next segment.
3917                     * The working MAC must be null before this is called.
3918                     */
3919                    private Mac startMacForSegment()
3920                        {
3921                        try
3922                            {
3923                            final Mac m = Mac.getInstance(CoreConsts.HMAC_ALG);
3924                            m.init(key);
3925    
3926                            // First, write 4-byte length of entire message.
3927                            m.update(intSer(frameLength));
3928                            // Second, write 8-byte timestamp.
3929                            m.update(longSer(timestamp));
3930                            // If there is a previous MAC, stir it in.
3931                            if(null != prevMAC) { m.update(prevMAC); }
3932    
3933                            return(m);
3934                            }
3935                        catch(final Exception e)
3936                            {
3937                            e.printStackTrace();
3938                            throw new Error("internal error", e);
3939                            }
3940                        }
3941    
3942                    /**The previous MAC value, or null while gathering the first segment. */
3943                    private byte[] prevMAC;
3944    
3945                    /**Count of bytes seen so far; non-negative.
3946                     * Implicitly this also indicates how far into the current segment we are.
3947                     */
3948                    private int bytesSeenSoFar;
3949    
3950                    /**Process the next block to generate the next MAC.
3951                     * Must only be called when we've collected a full segment, or at EOF/close().
3952                     */
3953                    private void endMacForSegment()
3954                        {
3955                        try
3956                            {
3957                            // Compute and record the MAC.
3958                            final byte[] hmac = current.doFinal();
3959                            prevMAC = hmac;
3960                            mac.add(new ROByteArray(hmac));
3961                            }
3962                        catch(final Exception e)
3963                            {
3964                            e.printStackTrace();
3965                            throw new Error("internal error", e);
3966                            }
3967                        }
3968    
3969                    @Override
3970                    public void write(final byte[] b, int off, int len) throws IOException
3971                        {
3972                        // Loop to generate multiple segment MACs if needed.
3973                        while(len > 0)
3974                            {
3975                            // Compute bytes needed to fill the buffer: always positive.
3976                            final int offsetInSegBuf = bytesSeenSoFar & (segmentSize-1); // Faster version of bytesSeenSoFar % segmentSize;
3977                            final int needed = segmentSize - (offsetInSegBuf);
3978                            // How many will we copy in...
3979                            final int toCopy = Math.min(needed, len);
3980    
3981                            // Write the frame bytes to protect.
3982                            current.update(b, off, toCopy);
3983    
3984                            // Adjust all our indexes...
3985                            bytesSeenSoFar += toCopy;
3986                            off += toCopy;
3987                            len -= toCopy;
3988    
3989                            // If we copied exactly up to the end of the segBuf
3990                            // then complete the current MAC and start the next one...
3991                            if(toCopy == needed) { endMacForSegment(); current = startMacForSegment(); }
3992                            }
3993                        }
3994    
3995                    /**Horribly inefficient, but correct, and should never actually get called. */
3996                    @Override public void write(final int b) throws IOException
3997                        {
3998                        final byte buf[] = new byte[1];
3999                        buf[0] = (byte) b;
4000                        write(buf, 0, 1);
4001                        }
4002    
4003                    /**Handle the tail of the input. */
4004                    @Override public void close() throws IOException
4005                        {
4006                        assert(bytesSeenSoFar == frameLength); // Should have seen all the data.
4007                        endMacForSegment(); // Push out the current (final) MAC...
4008                        }
4009                    };
4010                // Compute the HMAC value(s).
4011                try { raw.writePacket(sink); sink.close(); }
4012                catch(final IOException e) { throw new Error("internal error", e); }
4013    //System.out.println("frameLength/nSegs/mac.size() = " + frameLength + '/' + nSegs + '/' + mac.size());
4014                assert(nSegs == mac.size()); // Did we create the correct number of MACs values?
4015    
4016                return(Collections.unmodifiableList(mac));
4017                }
4018    
4019            /**Protect an input stream with our MAC; aborts with IOException in case of corruption.
4020             * This acts as a transparent pass-through filter
4021             * that will read a MAC-protected-segment at a time from the underlying stream
4022             * and either pass it through having verified it,
4023             * or abort with an IOException if the segment is corrupt.
4024             * <p>
4025             * This closes its input stream and vetoes any further operations
4026             * once any error has been encountered.
4027             * <p>
4028             * We use Key rather than SecretKey,
4029             * since the latter depends on Java extensions that may not be available,
4030             * eg when run in JWS.
4031             */
4032            public InputStream protectInputStream(final Key key, final InputStream is)
4033                {
4034                if(key == null) { throw new IllegalArgumentException(); }
4035                if(is == null) { throw new IllegalArgumentException(); }
4036    
4037                // Compute our segment size for buffering.
4038                final int segmentSize = computeSegmentSize(length);
4039    
4040                // Number of segments.
4041                final int nSegs = mac.size();
4042    
4043                // To avoid any confusion, we do not allow mark/reset.
4044                return(new FilterInputStream(is){
4045                    /**Set true upon any error; subsequently we will not allow any further read()s. */
4046                    private boolean hasErred;
4047    
4048                    /**Our read buffer for one segment's data (or whole frame if smaller); never null. */
4049                    private final byte[] segBuf = new byte[Math.min(length, segmentSize)];
4050    
4051                    /**Data remaining in segBuf; non-negative. */
4052                    private int bufLen;
4053    
4054                    /**Start of remaining data in segBuf; non-negative. */
4055                    private int bufPos;
4056    
4057                    /**Which MAC segment is next to be loaded/verified. */
4058                    private int nextSegNumber;
4059    
4060                    /**Whatever is in our buffer plus from upstream without blocking. */
4061                    @Override public synchronized int available() throws IOException
4062                        { return(in.available() + bufLen); }
4063    
4064                    @Override public void mark(final int readlimit)
4065                        { throw new UnsupportedOperationException("mark not supported"); }
4066    
4067                    /**Mark/reset is not allowed to avoid confusion/complexity. */
4068                    @Override public boolean markSupported() { return(false); }
4069    
4070                    /**Private buffer for read(); never null. */
4071                    private final byte[] buf1 = new byte[1];
4072    
4073                    /**Read a single byte: we try to make this moderately efficient. */
4074                    @Override public synchronized int read() throws IOException
4075                        {
4076                        if(hasErred) { throw new IOException("corrupt stream"); }
4077    
4078                        // Fast path to return data already in our buffer.
4079                        if(bufLen > 0)
4080                            {
4081                            --bufLen;
4082                            return(segBuf[bufPos++] & 0xff);
4083                            }
4084    
4085                        // Use full read() to do all the leg-work.
4086                        final int n = read(buf1, 0, 1);
4087                        if(n < 1) { return(-1); /* EOF */ }
4088                        assert(n == 1);
4089                        return(buf1[0] & 0xff);
4090                        }
4091    
4092                    /**Bulk data read.
4093                     * This routine returns any data it has buffered (ie already verified)
4094                     * else it reads a segment of data from the underlying stream
4095                     * (possibly less than a full segment at EOF, but verifying the length)
4096                     * and validates it with the appropriate MAC.
4097                     * <p>
4098                     * If validation against the MAC fails
4099                     * then this and all subsequent calls to read()
4100                     * are vetoed with an IOException.
4101                     * <p>
4102                     * If validation succeeds then the requested data is returned.
4103                     * <p>
4104                     * We do not return data in one read across segment boundaries,
4105                     * as we are not required to and it would increase complexity.
4106                     */
4107                    @Override
4108                    public synchronized int read(final byte[] b, final int off, final int len) throws IOException
4109                        {
4110                        if((b == null) ||
4111                           (off < 0) || (off >= b.length) ||
4112                           (len < 0) || (off + len > b.length))
4113                            { throw new IllegalArgumentException(); }
4114    
4115                        if(hasErred) { throw new IOException("corrupt stream"); }
4116    
4117                        if(len == 0) { return(0); }
4118    
4119                        // If we have no verified data left in our buffer
4120                        // then unless at EOF we need to load another segment and verify it.
4121                        if(bufLen == 0)
4122                            {
4123                            // We are at EOF if we have read the last segment.
4124                            if(nextSegNumber >= nSegs) { return(-1); }
4125    
4126                            // Allow for the final segment to be smaller.
4127                            final int readSize = (nextSegNumber < nSegs-1) ? segmentSize :
4128                                (length - (segmentSize * (nSegs-1)));
4129                            bufPos = 0; // Next caller will read from offset zero.
4130                            bufLen = 0; // Buffer is currently empty...
4131                            while(bufLen != readSize)
4132                                {
4133                                try
4134                                    {
4135                                    final int n = in.read(segBuf, bufLen, readSize - bufLen);
4136                                    if(n < 1) { throw new IOException("premature EOF"); }
4137                                    bufLen += n;
4138                                    }
4139                                catch(final IOException e) { hasErred = true; throw e; }
4140                                }
4141    
4142                            // Verify the data.
4143                            try
4144                                {
4145                                final Mac m = Mac.getInstance(CoreConsts.HMAC_ALG);
4146                                m.init(key);
4147    
4148                                // First, write 4-byte length of entire message.
4149                                m.update(intSer(length));
4150                                // Second, write 8-byte timestamp.
4151                                m.update(longSer(timestamp));
4152                                // If there is a previous MAC, stir it in.
4153                                if(nextSegNumber > 0) { m.update(mac.get(nextSegNumber-1).toByteArray()); }
4154                                // Finally, use the bytes read from upstream.
4155                                m.update(segBuf, 0, bufLen);
4156    
4157                                // Compute and record the MAC.
4158                                final byte[] hmac = m.doFinal();
4159                                if(!Arrays.equals(mac.get(nextSegNumber++).toByteArray(), hmac))
4160                                    { throw new IOException("corrupt input data"); }
4161                                }
4162                            catch(final IOException e)
4163                                {
4164                                hasErred = true; // Prevent further read()s from this stream.
4165                                throw e;
4166                                }
4167                            catch(final Exception e)
4168                                {
4169                                hasErred = true;
4170                                e.printStackTrace();
4171                                throw new Error("internal error", e);
4172                                }
4173                            }
4174    
4175                        // The buffer must now have something in it.
4176                        assert(bufLen > 0);
4177                        // Return verified data from our buffer to the caller.
4178                        final int toCopy = Math.min(bufLen, len);
4179                        System.arraycopy(segBuf, bufPos, b, off, toCopy);
4180                        bufLen -= toCopy;
4181                        bufPos += toCopy;
4182                        return(toCopy);
4183                        }
4184    
4185                    /**Implemented as read(b, 0, b.length). */
4186                    @Override public int read(final byte[] b) throws IOException
4187                        { return(read(b, 0, b.length)); }
4188    
4189                    @Override public void reset() throws IOException
4190                        { throw new IOException("reset not allowed"); }
4191    
4192                    /**We have to read the input to skip it. */
4193                    @Override public synchronized long skip(long n) throws IOException
4194                        {
4195                        long skipped = 0;
4196                        final byte b[] = new byte[(int) Math.min(1024, n)];
4197                        while(n > 0)
4198                            {
4199                            final int r = read(b, 0, (int) Math.min(n, b.length));
4200                            if(r < 1) { break; /* EOF */ }
4201                            n -= r;
4202                            skipped += r;
4203                            }
4204                        return(skipped);
4205                        }
4206                    });
4207                }
4208    
4209            /**Checks that the object content is valid only.
4210             * This does NOT attempt to check the MAC matches the message and fields;
4211             * that must be done explicitly elsewhere.
4212             * <p>
4213             * Partly this does not check the MAC because it does not have access to the key.
4214             */
4215            public void validateObject() throws InvalidObjectException
4216                {
4217                if(timestamp <= 0) { throw new InvalidObjectException("bad object: bad timestamp"); }
4218                if(length < 0) { throw new InvalidObjectException("bad object: bad length"); }
4219                if(mac == null) { throw new InvalidObjectException("bad object: null MAC"); }
4220                if(mac.size() == 0) { throw new InvalidObjectException("bad object: empty MAC"); }
4221                for(final ROByteArray m : mac)
4222                    {
4223                    if(m == null) { throw new InvalidObjectException("bad object: null segement MAC"); }
4224                    if(m.length() == 0) { throw new InvalidObjectException("bad object: empty segment MAC"); }
4225                    }
4226                }
4227    
4228            /**Equality depends on all the members being equal. */
4229            @Override
4230            public boolean equals(final Object obj)
4231                {
4232                if(this == obj) { return(true); }
4233                if(!(obj instanceof PacketProtector)) { return(false); }
4234                final PacketProtector other = (PacketProtector) obj;
4235                return((timestamp == other.timestamp) &&
4236                       (length == other.length) &&
4237                       mac.equals(other.mac));
4238                }
4239    
4240            /**We use the timestamp and length fields in the hash for the entire collection. */
4241            @Override public int hashCode() { return(length ^ (int) timestamp); }
4242            }
4243        }