001 package org.hd.d.pg2k.svrCore.datasource;
002
003 import java.io.BufferedOutputStream;
004 import java.io.ByteArrayInputStream;
005 import java.io.ByteArrayOutputStream;
006 import java.io.DataInputStream;
007 import java.io.DataOutputStream;
008 import java.io.FilterInputStream;
009 import java.io.IOException;
010 import java.io.InputStream;
011 import java.io.InterruptedIOException;
012 import java.io.InvalidObjectException;
013 import java.io.ObjectInputStream;
014 import java.io.ObjectOutputStream;
015 import java.io.OutputStream;
016 import java.io.Serializable;
017 import java.nio.ByteBuffer;
018 import java.security.InvalidKeyException;
019 import java.security.Key;
020 import java.util.ArrayList;
021 import java.util.Arrays;
022 import java.util.BitSet;
023 import java.util.Collections;
024 import java.util.Date;
025 import java.util.Iterator;
026 import java.util.List;
027 import java.util.Map;
028 import java.util.Properties;
029 import java.util.concurrent.Callable;
030 import java.util.concurrent.TimeUnit;
031 import java.util.concurrent.atomic.AtomicReference;
032 import java.util.concurrent.locks.ReentrantLock;
033 import java.util.regex.Pattern;
034
035 import javax.crypto.Mac;
036 import javax.crypto.SecretKey;
037 import javax.management.RuntimeErrorException;
038
039 import org.hd.d.pg2k.svrCore.AllExhibitImmutableData;
040 import org.hd.d.pg2k.svrCore.AllExhibitProperties;
041 import org.hd.d.pg2k.svrCore.AllExhibitPropertiesDelta;
042 import org.hd.d.pg2k.svrCore.AllExhibitPropertiesDelta.DiffException;
043 import org.hd.d.pg2k.svrCore.CompressionLevel;
044 import org.hd.d.pg2k.svrCore.CoreConsts;
045 import org.hd.d.pg2k.svrCore.DefInputStream;
046 import org.hd.d.pg2k.svrCore.DefOutputStream;
047 import org.hd.d.pg2k.svrCore.ExhibitName;
048 import org.hd.d.pg2k.svrCore.ExhibitStaticAttr;
049 import org.hd.d.pg2k.svrCore.ExhibitThumbnails;
050 import org.hd.d.pg2k.svrCore.FileTools;
051 import org.hd.d.pg2k.svrCore.GenUtils;
052 import org.hd.d.pg2k.svrCore.MemoryTools;
053 import org.hd.d.pg2k.svrCore.Name;
054 import org.hd.d.pg2k.svrCore.Name.ExhibitFull;
055 import org.hd.d.pg2k.svrCore.PGMasterNotInServiceException;
056 import org.hd.d.pg2k.svrCore.ROByteArray;
057 import org.hd.d.pg2k.svrCore.Rnd;
058 import org.hd.d.pg2k.svrCore.SimpleLoggerIF;
059 import org.hd.d.pg2k.svrCore.Stratum;
060 import org.hd.d.pg2k.svrCore.TextUtils;
061 import org.hd.d.pg2k.svrCore.Tuple;
062 import org.hd.d.pg2k.svrCore.WrappedByteArrayCharSequence;
063 import org.hd.d.pg2k.svrCore.collections.SoftReferenceMap;
064 import org.hd.d.pg2k.svrCore.props.GenProps;
065 import org.hd.d.pg2k.svrCore.stats.StatsLogger;
066 import org.hd.d.pg2k.svrCore.vars.BasicVarMgr;
067 import org.hd.d.pg2k.svrCore.vars.EventPeriod;
068 import org.hd.d.pg2k.svrCore.vars.EventVariableValue;
069 import org.hd.d.pg2k.svrCore.vars.InstanceID;
070 import org.hd.d.pg2k.svrCore.vars.SimpleVariableDefinition;
071 import org.hd.d.pg2k.svrCore.vars.SimpleVariableValue;
072 import org.hd.d.pg2k.svrCore.vars.SystemVariables;
073
074 import ORG.hd.d.IsDebug;
075
076 /**Exhibit pipeline stage that fetches its data across a master/slave tunnel.
077 * Some of the visible types and values are used by the implementations
078 * of both ends of the tunnel (eg the tunnel servlet)
079 * to help implement the shared protocol.
080 * <p>
081 * This takes data from the server's own DataSourceBean by default.
082 */
083 public abstract class ExhibitDataTunnelSource implements SimpleExhibitPipelineIF
084 {
085 /**Create an instance, passing in a (non-null) logger to use. */
086 public ExhibitDataTunnelSource(final SimpleLoggerIF logger)
087 {
088 if(logger == null) { throw new IllegalArgumentException(); }
089 this.logger = logger;
090 statsIDTS =
091 new StatsLogger.StatsConfig("TUNNELSOURCE",
092 logger, // Use servlet log if poss.
093 false, // Only dump summaries...
094 12 * 3600, // About every 12 hours.
095 true); // Adaptive.
096 }
097
098 /**Thrown when RPC fails because of connection throttling on upstream side.
099 * Such exceptions do not represent true RPC failures,
100 * so need not put off another RPC attempt.
101 */
102 public static class TunnelBusyIOException extends InterruptedIOException
103 {
104 /**Unique serialisation ID. */
105 private static final long serialVersionUID = 5573883733879369445L;
106 public TunnelBusyIOException() { super(); }
107 //public TunnelBusyIOException(final String message, final Throwable cause) { super(message, cause); }
108 public TunnelBusyIOException(final String message) { super(message); }
109 //public TunnelBusyIOException(final Throwable cause) { super(cause); }
110 }
111
112 /**If true, try GC and running finalizers after some connection failures.
113 * If we think that connection failure may have been caused by
114 * failure to release resources (elsewhere in the VM!)
115 * then try to force a clean-up from time to time.
116 * But, in case we are wrong, do not do this too often anyway.
117 */
118 private static final boolean FORCE_GC_AND_FINALISERS_AFTER_POLL_FAIL = true;
119
120 /**Minimum wait after failed tunnel call before retry (ms); strictly positive.
121 * A few tens of milliseconds to a second is probably best,
122 * to recover from transient I/O problems quickly,
123 * but without actually busy-waiting...
124 */
125 public static final int FAIL_RETRY_WAIT_MIN_MS = 101;
126
127 /**Maximum wait after failed tunnel call before retry (ms).
128 * Several minutes is probably good,
129 * to cope with a crashed master server (and/or to wait for it to reboot),
130 * or a failed connection,
131 * but too long and our distributed variables will fail needlessly.
132 * <p>
133 * Most requests during the wait will be vetoed cleanly and quickly
134 * allowing timely service to the slave's clients.
135 * <p>
136 * We vary this limit slightly between slaves/instances to help avoid them
137 * colliding with one another especially if hitting this upper bound.
138 */
139 public static final int FAIL_RETRY_WAIT_MAX_MS = 2 * 60 * 1000 +
140 (1 | Rnd.fastRnd.nextInt(1 * 60 * 1000));
141
142 /**If true, allow connection/resource sharing between calls if possible.
143 * We may also periodically check that we can still contact the master.
144 * <p>
145 * An open connection would probably improve TCP efficiency
146 * and reduce latency for an underlying HTTP[S] connection, for example.
147 * (We might allow HTTP/1.1 streaming and connection sharing to be
148 * done for us by the JDK runtime if this is true.)
149 */
150 protected static final boolean KEEP_SERVER_CONNECTION_ALIVE = true;
151
152 /**If true, extra instrumentation for protocol debug; definitely not for release code. */
153 protected static final boolean _protocolDebug = false /* && ORG.hd.d.IsDebug.isDebug */ ;
154
155 /**Our unique client-side end-point identifier.
156 * The ID goes into all outgoing variable updates (and possibly other data)
157 * going upstream to the master to uniquely identify this client.
158 * <p>
159 * This is unique to each tunnel client end-point instance
160 * and is created afresh on each run,
161 * so a rebooted client gets a new ID.
162 * <p>
163 * Uses secure/good generator to try to ensure global uniqueness.
164 * (This may be expensive to compute;
165 * we might be able to defer this until first needed.)
166 */
167 public final SimpleVariableValue uniqueClientID =
168 new SimpleVariableValue(SystemVariables.LOCAL_SYS_ID, InstanceID.createInstanceID());
169
170
171 /**Our logger which falls back to System.out if servlet log not available; never null. */
172 protected final SimpleLoggerIF logger;
173
174 /**The stats set to which we log general tunnel source stats.
175 * The unique codes are the constants TSNAME_XXX.
176 */
177 private final StatsLogger.StatsConfig statsIDTS;
178
179 /**General stats event name: RPC request. */
180 public static final String TSNAME_RPCREQUEST = "RPC-request";
181
182 /**General stats event name: RPC failure due to an IOException. */
183 public static final String TSNAME_RPCIOEX = "RPC-IOException";
184
185 /**General stats event name prefix: RPC request packet type. */
186 public static final String TSNAMEPR_RPCTYPE = "RPC-type-";
187
188 /**General stats event name: "short" raw data read, less than bulk-transfer block size.. */
189 public static final String TSNAME_SHORTREAD = "shortRead";
190
191 /**Convenience value; (immutable) zero-length byte array for use as an empty packet payload.
192 * Used by local classes and TunnelServlet; package visible.
193 */
194 public static final byte[] EMPTY_PAYLOAD = new byte[0];
195
196 /**Time when last successful connection was made, or 0 if no connection..
197 * Declared volatile so that it can be accessed without a lock.
198 * <p>
199 * Set by _noteResult() after a good exchange with the master;
200 * never cleared and never null.
201 */
202 private volatile long lastSuccessfulConnectionTime;
203
204 /**Get time of last successful connection, or 0 if none. */
205 public long getLastSuccessfulConnectionTime()
206 { return(lastSuccessfulConnectionTime); }
207
208 /**Time before which we should not again try to contact the master; null if master is fine.
209 * Declared volatile so that it can be accessed without a lock.
210 * <p>
211 * Cleared to null by _noteResult() after a good exchange with the master;
212 * the wait to another attempt is approximately doubled after a failed exchange.
213 */
214 private volatile Long doNotTryMasterUntil;
215
216 /**Returns true if we currently cannot talk to the master, false otherwise. */
217 public boolean isBroken()
218 {
219 final Long notUntil = doNotTryMasterUntil;
220 return((notUntil != null) &&
221 (notUntil.longValue() > System.currentTimeMillis()));
222 }
223
224 /**Successive failure count; reset to zero upon success.
225 * Private to _noteResult() and accessed under its private lock.
226 * <p>
227 * Cleared to zero on a successful packet exchange;
228 * incremented after each failure.
229 */
230 private int successiveFailureCount;
231
232 /**Private lock for _noteResult. */
233 private final Object _nR_lock = new Object();
234
235 /**Immutable empty result indicating no event values available. */
236 private static final EventVariableValue[] NO_EVENT_VALUES = new EventVariableValue[0];
237
238 /**Routine to note success or failure of RPC call and adjust control variables.
239 * Has its own private lock to ensure atomic update of control variables.
240 *
241 * @param success call with this true immediately after successful RPC exchange with master;
242 * call with false otherwise
243 */
244 private void _noteResult(final boolean success)
245 {
246 synchronized(_nR_lock)
247 {
248 final long now = System.currentTimeMillis();
249 if(success)
250 {
251 // Successful RPC exchange with master.
252 lastSuccessfulConnectionTime = now;
253 doNotTryMasterUntil = null;
254 successiveFailureCount = 0;
255 return;
256 }
257
258 // Failed RPC exchange with the master...
259 if(++successiveFailureCount < 0)
260 { successiveFailureCount = Integer.MAX_VALUE; }
261
262 // Compute first-cut back-off time.
263 // Double for each successive failure.
264 final long approxBackoff = ((long) ExhibitDataTunnelSource.FAIL_RETRY_WAIT_MIN_MS) <<
265 Math.min(32, successiveFailureCount);
266
267 // Coerce to reasonable bounds...
268 final int waitBounded = (int) Math.min(ExhibitDataTunnelSource.FAIL_RETRY_WAIT_MAX_MS,
269 Math.max(ExhibitDataTunnelSource.FAIL_RETRY_WAIT_MIN_MS, approxBackoff));
270
271 // Roughly double the pause before the next connection attempt,
272 // with a random component to help avoid inter-slave collisions.
273 final long newWaitTime = Math.min(ExhibitDataTunnelSource.FAIL_RETRY_WAIT_MAX_MS,
274 waitBounded/2 + Rnd.fastRnd.nextInt(waitBounded));
275
276 // Store the new "do not try before" time...
277 doNotTryMasterUntil = new Long(now + newWaitTime);
278 }
279 }
280
281 /**Does a NO-OP on the server.
282 * If this returns without throwing an exception,
283 * then the connection to the back-end (master) server is probably OK.
284 *
285 * @param unguarded if true, this ignores any recent connection problems
286 * and immediately tries to contact the master,
287 * else it behaves like other operations and is quickly vetoed
288 * while the master is failing
289 *
290 * @throws java.io.IOException in case of trouble communicating with master.
291 */
292 public void doNOOP(final boolean unguarded)
293 throws IOException
294 {
295 final RawPacket response;
296 if(unguarded)
297 { response = doRPCUnguarded(FixedEmptyFrames.RAW_PACKET_NOOP); }
298 else
299 { response = doRPC(FixedEmptyFrames.RAW_PACKET_NOOP); }
300 if(response.getActualPayloadLength() != 0)
301 { throw new IOException("invalid non-empty response to NOOP request"); }
302 return;
303 }
304
305 /**Get the static attributes for a given exhibit.
306 * Returns null if the named exhibit does not exist.
307 * <p>
308 * Sends name as a UTF string and allows compression.
309 * (We use UTF since we know that the length is limited and
310 * the all-ASCII nature of valid names should yield
311 * one-byte-per-char encoding before compression.)
312 * <p>
313 * This will fail with a null name argument.
314 * <p>
315 * (Big secret: if this class is used behind, for example,
316 * ExhibitDataSimpleCache, this method will never be called
317 * because all such calls are answered direct from the cache.)
318 */
319 public ExhibitStaticAttr getStaticAttr(final ExhibitFull name)
320 throws IOException
321 {
322 final ByteArrayOutputStream baos =
323 new ByteArrayOutputStream(name.length());
324 final DataOutputStream dos = new DataOutputStream(baos);
325 dos.writeUTF(name.toString());
326 final byte data[] = baos.toByteArray();
327 dos.close(); // Attempt to free some resources quickly.
328
329 final RawPacket request = new RawPacket(
330 RawPacket.OpCode.GetStaticAttr,
331 data);
332
333 final RawPacket response = doRPC(request);
334 return((ExhibitStaticAttr) response.getSerializedObjectPayload());
335 }
336
337 /**Get a chunk of the raw exhibit binary.
338 * The start position must be non-negative.
339 * <p>
340 * Sends name as a UTF string and allows compression of the request.
341 * (We use UTF since we know that the length is limited and
342 * the all-ASCII nature of valid names should yield
343 * one-byte-per-char encoding before compression.)
344 * <p>
345 * We send the start and (computed) afterEnd arguments as int values.
346 * <p>
347 * We return the data as-is.
348 *
349 *
350 * @throws java.lang.IllegalArgumentException for blatantly invalid name,
351 * or non-positive length
352 */
353 public void getRawFile(final ByteBuffer buf,
354 final Name.ExhibitFull exhibitName,
355 final int position,
356 final boolean dontCache)
357 throws IOException
358 {
359 // We check that that the name is not blatantly illegal...
360 if(!ExhibitName.validNameSyntaxBasic(exhibitName))
361 { throw new IllegalArgumentException("invalid name"); }
362
363 // We don't allow zero-byte requests over the expensive tunnel.
364 int len = buf.limit() - buf.position();
365 if(len < 1)
366 { throw new IllegalArgumentException("request length must be positive"); }
367
368 // Limit any request to a size that the back-end would allow.
369 if(len > MAX_USER_READ_SIZE)
370 { len = MAX_USER_READ_SIZE; }
371
372 if(ExhibitDataTunnelSource._protocolDebug)
373 {
374 System.err.println("ExhibitDataTunnelSource.getRawFile(" +
375 "len="+len+", dontCache="+dontCache+")");
376 }
377
378 // Do the transfer...
379 final int afterEnd = position + len;
380
381 // final ByteArrayOutputStream baos =
382 // new ByteArrayOutputStream(11 + exhibitName.length());
383 // final DataOutputStream dos = new DataOutputStream(baos);
384 // dos.writeUTF(exhibitName.toString());
385 // dos.writeInt(position);
386 // dos.writeInt(afterEnd);
387 // dos.writeBoolean(dontCache);
388 // final byte data[] = baos.toByteArray();
389 // dos.close(); // Attempt to free some resources quickly.
390
391 // Optimised creation of outgoing packet.
392 final int nameLength = exhibitName.length();
393 final byte[] data = new byte[11 + nameLength];
394 data[0] = (byte) (nameLength >>> 8);
395 data[1] = (byte) nameLength;
396 exhibitName.writeToByteArray(data, 2);
397 data[2 + nameLength] = (byte) (position >>> 24);
398 data[3 + nameLength] = (byte) (position >>> 16);
399 data[4 + nameLength] = (byte) (position >>> 8);
400 data[5 + nameLength] = (byte) position;
401 data[6 + nameLength] = (byte) (afterEnd >>> 24);
402 data[7 + nameLength] = (byte) (afterEnd >>> 16);
403 data[8 + nameLength] = (byte) (afterEnd >>> 8);
404 data[9 + nameLength] = (byte) afterEnd;
405 data[10 + nameLength] = (byte) (dontCache ? 1 : 0);
406
407 final RawPacket request = new RawPacket(
408 RawPacket.OpCode.GetRawFile,
409 data);
410
411 final RawPacket response = doRPC(request);
412 response.getPayloadCopy(buf); // Straight into buffer for efficiency.
413 }
414
415 /**Gets all static exhibit data if its timestamp is not that specified.
416 * If the time specified is negative the object will be returned unconditionally.
417 * <p>
418 * If no exhibits are currently installed a default set with a zero
419 * timestamp is returned.
420 * <p>
421 * If the caller's copy appears to be up-to-date (eg the oldStamp
422 * matches that that we would have been returned) null is returned.
423 */
424 public AllExhibitImmutableData getAllExhibitImmutableData(final long oldStamp)
425 throws IOException
426 {
427 final RawPacket response = doRPC(
428 new RawPacket(RawPacket.OpCode.GetAllExhibitImmutableData, ExhibitDataTunnelSource.longSer(oldStamp)));
429 return((AllExhibitImmutableData) response.getSerializedObjectPayload());
430 }
431
432 /**Gets set of all exhibit properties if its hash is not that specified.
433 * If the hash specified is negative the object will be returned unconditionally.
434 * <p>
435 * If no exhibits are currently installed
436 * then a default set with a zero timestamp is returned.
437 * <p>
438 * If the caller's copy appears to be up-to-date
439 * (eg the oldHash matches that that would have been returned)
440 * then null is returned.
441 * <p>
442 * Because the AEP drives the entire Gallery,
443 * we always let the request go over the tunnel even if recent calls have failed.
444 * For example, we don't want mirrors to remain out of sync for too long.
445 * <p>
446 * This also typically takes a lot of bandwidth and CPU/wallclock time,
447 * so is optimised and stripped down to a minimum,
448 * and relies on the AEP deserialisation for serious error checking.
449 * <p>
450 * It is hoped that using the streamed form will overlap I/O and CPU work
451 * and thus get the new AEP into the client quicker and smoother.
452 * <p>
453 * We attempt to be good citizens and note most RPC successes/failures
454 * to help maintain the tunnel's view of the master's status.
455 * <p>
456 * (Because satisfying these calls may take a long time,
457 * the upstream server may veto concurrent queries that return anything
458 * other than null.)
459 * <p>
460 * As a further attempt to minimise time and bandwidth,
461 * we may try the "diff" version of this call,
462 * falling back to the normal version if this fails.
463 */
464 public AllExhibitProperties getAllExhibitProperties(final long oldHash)
465 throws IOException
466 {
467 final RawPacket packetOut = new RawPacket(RawPacket.OpCode.GetAllExhibitProperties, ExhibitDataTunnelSource.longSer(oldHash));
468
469 // The RPC may take a long time to compute the response to, so have a big read timeout.
470 final InputStream stream = doRPCRawWithStreamResponse(packetOut, true);
471 if(stream == null)
472 {
473 // An empty response means a null AEP response.
474 _noteResult(true); /* Note RPC success. */
475 return(null);
476 }
477
478 // Actually deserialise from the input stream.
479 final ObjectInputStream ois = new ObjectInputStream(stream);
480 // For robustness, treat a class mismatch problem like a data problem.
481 // We rely on the AllExhibitProperties deserialisation for most error checking.
482 try
483 {
484 final AllExhibitProperties result = (AllExhibitProperties) ois.readObject();
485 _noteResult(true); /* Note RPC success. */
486
487 // Warn if the remote end of the tunnel seems to have sent us a redundant value,
488 // ie sent us something we already have with the hash code that we specified.
489 if((result != null) && (result.longHash == oldHash))
490 {
491 final String message = "WARNING: tunnel fetched AEP with existing hash: " + oldHash;
492 System.err.println(message);
493 logger.log(message);
494 }
495
496 return(result);
497 }
498 catch(final ClassNotFoundException e) { throw new IOException(e.getMessage()); }
499 catch(final InterruptedIOException e)
500 {
501 // Not a failure: can retry later...
502 throw e; /* Rethrow error as-is. */
503 }
504 catch(final IOException e)
505 {
506 _noteResult(false); /* Note RPC failure. */
507 throw e; /* Rethrow error as-is. */
508 }
509 finally { ois.close(); }
510 }
511
512 /**Maximum level of compression in AEP "diff" RPC supported in this implementation, client or server side. */
513 public static final CompressionLevel MAX_AEP_DIFF_COMP_LEVEL = CompressionLevel.LZMA; // GenUtils.MAX_SUPPORTED_COMPRESSION_LEVEL;
514
515 /**Gets set of all exhibit properties if not that specified, attempting minimise data transferred across the tunnel.
516 * This is a tunnel-specific optimisation to minimise bandwidth.
517 * For some clients this will save money and time.
518 * <p>
519 * If the AEP specified is null then the remote AEP will be fetched and returned unconditionally.
520 * <p>
521 * If no exhibits are currently installed
522 * then a default set with a zero timestamp is returned.
523 * <p>
524 * If the caller's copy appears to be up-to-date
525 * (eg the oldHash matches that that would have been returned)
526 * then null is returned.
527 * <p>
528 * This is an attempted optimisation of the getAllExhibitProperties(long oldHash),
529 * returning a diff or extra-highly-compressed AEP representation.
530 * <p>
531 * This may not be supported at all by the remote end of the connection,
532 * or may fail for lack of resources, etc,
533 * so we can automatically fall back to the usual call in case of difficulty.
534 * <p>
535 * (We do not record failures as connection/tunnel problems,
536 * though we do note successful calls in favour of the connection status.)
537 *
538 * @param oldAEP current latest AEP held by the caller
539 * @param allowAutoRecovery if true then allow fallback to generic AEP fetch method
540 */
541 public AllExhibitProperties getAllExhibitProperties(final AllExhibitProperties oldAEP,
542 final boolean allowAutoRecovery)
543 throws IOException
544 {
545 final long oldHash = (oldAEP == null) ? -1 : oldAEP.longHash;
546
547 try
548 {
549 // Construct the header.
550 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
551 final DataOutputStream dos = new DataOutputStream(baos);
552 // Write existing AEP longHash, or -1 if no extant AEP.
553 dos.writeLong(oldHash);
554 // Write current maximum supported AEP compression level as a UTF String.
555 dos.writeUTF(MAX_AEP_DIFF_COMP_LEVEL.name());
556 dos.close();
557 final byte data[] = baos.toByteArray();
558 final RawPacket packetOut = new RawPacket(RawPacket.OpCode.GetAllExhibitPropertiesDiff, data);
559
560 // An empty response (zero bytes) means that our AEP is up-to-date...
561 // The RPC may take a long time to compute the response to, so have a big read timeout.
562 final InputStream stream = doRPCRawWithStreamResponse(packetOut, true);
563 if(null == stream)
564 {
565 // An empty response means a null AEP response.
566 _noteResult(true); /* Note RPC success. */
567 return(null);
568 }
569
570 // Else read from the stream.
571 // 1) A UTF string for the compression level/type (abort if unsupported).
572 // 2) A compressed object stream (in the appropriate format) which may be:
573 // 2a) AllExhibitProperties, in which case return as-is.
574 // 2b) AllExhibitPropertiesDelta, in which case apply to the input AEP and return.
575 try
576 {
577 final DataInputStream dis = new DataInputStream(stream);
578 final CompressionLevel cl = CompressionLevel.valueOf(dis.readUTF());
579 if(cl.getLevel() > MAX_AEP_DIFF_COMP_LEVEL.getLevel())
580 { throw new IOException("server returned data compressed at too high a level"); }
581 final InputStream is = GenUtils.wrapForDecompression(stream, cl);
582 final ObjectInputStream ois = new ObjectInputStream(is);
583 // Read in the object from the stream on the fly.
584 // TODO: we may want to add a data pump to keep data flowing during processing.
585 final Object o = ois.readObject();
586
587 // If the result is a full (and hopefully super-compressed) AEP
588 // then unpack it and return it here.
589 if(o instanceof AllExhibitProperties)
590 {
591 final AllExhibitProperties result = (AllExhibitProperties) o;
592 _noteResult(true); /* Note RPC success. */
593 /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource: successful getAEP(AEP) full: "+cl+".]"); }
594
595 // Warn if the remote end of the tunnel seems to have sent us a redundant value,
596 // ie sent us something we already have with the hash code that we specified.
597 if((result != null) && (result.longHash == oldHash))
598 {
599 final String message = "WARNING: tunnel fetched AEP (supercompressed) with existing hash: " + oldHash;
600 System.err.println(message);
601 logger.log(message);
602 }
603
604 return(result);
605 }
606
607 // If the result is a diff then apply it to the input AEP.
608 if(o instanceof AllExhibitPropertiesDelta)
609 {
610 final AllExhibitPropertiesDelta delta = (AllExhibitPropertiesDelta) o;
611 final AllExhibitProperties result = AllExhibitPropertiesDelta.applyDiff(oldAEP, delta);
612 /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource: successful getAEP(AEP) diff: #before/after="+delta.lengthAEIDBefore+"/"+delta.lengthAEIDAfter+", aep before/after="+oldAEP+"/"+result+", "+cl+".]"); }
613
614 // /* TEMPORARY SAFETY MEASURE: REJECT RESULT WITH FEWER EPL/EPC ENTRIES... */
615 // if(result.getExhibitPropsLoadableMap().size() < oldAEP.getExhibitPropsLoadableMap().size())
616 // { throw new IOException("WARNING: rejecting AEP diff result with fewer EPLs..."); }
617 // if(result.getExhibitPropsComputableMap().size() < oldAEP.getExhibitPropsComputableMap().size())
618 // { throw new IOException("WARNING: rejecting AEP diff result with fewer EPCs..."); }
619
620 _noteResult(true); /* Note RPC success. */
621
622 // Warn if the remote end of the tunnel seems to have sent us a redundant value,
623 // ie sent us something we already have with the hash code that we specified.
624 if((result != null) && (result.longHash == oldHash))
625 {
626 final String message = "WARNING: tunnel fetched AEP (via delta) with existing hash: " + oldHash;
627 System.err.println(message);
628 logger.log(message);
629 }
630
631 return(result);
632 }
633
634 throw new IOException("unexpected return type");
635 }
636 finally { stream.close(); /* Free (network) resources. */ }
637 }
638 catch(final InterruptedIOException e)
639 { throw e; } // Rethrow as-is: don't attempt recovery yet...
640 catch(final Exception e)
641 {
642 // Always log the problem/exception.
643 e.printStackTrace();
644
645 if(!allowAutoRecovery)
646 {
647 if(e instanceof IOException) { throw (IOException) e; }
648 final IOException err = new IOException("aborting after failed AEP diff RPC");
649 err.initCause(e);
650 throw err;
651 }
652
653 logger.log("ERROR: ExhibitDataTunnelSource: automatically recovering from AEP diff error: "+e.getMessage()+".");
654
655 // In case of error, automatically fall back to generic hash-arg call.
656 // (Iff allowing auto-recovery...)
657 return(getAllExhibitProperties(oldHash));
658 }
659 }
660
661 /**Helper method to serialise a single free-standing long value.
662 * The long value is serialised just as DataOutputStream would,
663 * as 8 bytes with the high-byte first.
664 */
665 public static byte[] longSer(final long value)
666 {
667 final byte[] result = new byte[8];
668 result[0] = (byte) (value >> 56);
669 result[1] = (byte) (value >> 48);
670 result[2] = (byte) (value >> 40);
671 result[3] = (byte) (value >> 32);
672 result[4] = (byte) (value >> 24);
673 result[5] = (byte) (value >> 16);
674 result[6] = (byte) (value >> 8);
675 result[7] = (byte) (value );
676 return(result);
677 }
678
679 /**Helper method to serialise a single free-standing int value.
680 * The int value is serialised just as DataOutputStream would,
681 * as 4 bytes with the high-byte first.
682 */
683 public static byte[] intSer(final int value)
684 {
685 final byte[] result = new byte[4];
686 result[0] = (byte) (value >> 24);
687 result[1] = (byte) (value >> 16);
688 result[2] = (byte) (value >> 8);
689 result[3] = (byte) (value );
690 return(result);
691 }
692
693 /**Gets the general properties as a GenProps object if its timestamp is not that specified.
694 * If the time specified is negative the object will be returned unconditionally.
695 * <p>
696 * If no props are currently installed/available a default set with a zero
697 * timestamp is returned.
698 * <p>
699 * If the caller's copy appears to be up-to-date (eg the oldStamp
700 * matches that that we would have been returned) null is returned.
701 * <p>
702 * On the wire this is (outbound) a 8-byte timestamp and
703 * by return should be an empty payload (corresponding to null)
704 * or a serialised GenProps object (uncompressed for now).
705 * <p>
706 * Because GenProps are so important to the functioning of the entire Gallery,
707 * we always let the request go over the tunnel even if recent calls have failed.
708 */
709 public org.hd.d.pg2k.svrCore.props.GenProps getGenProps(final long oldStamp)
710 throws IOException
711 {
712 //System.err.println("[Fetching GenProps over HTTP tunnel...]");
713
714 final RawPacket response = doRPCUnguarded(
715 new RawPacket(RawPacket.OpCode.GetGenProps, longSer(oldStamp)));
716
717 final GenProps result = (GenProps) response.getSerializedObjectPayload();
718 //System.err.println(" [Fetched GenProps over HTTP tunnel.]");
719 return(result);
720 }
721
722 /**Gets the security properties as a Properties object if its timestamp is not that specified.
723 * If the time specified is negative the object will be returned unconditionally.
724 * <p>
725 * If no props are currently installed/available a default set with a zero
726 * timestamp is returned.
727 * <p>
728 * If the caller's copy appears to be up-to-date (eg the oldStamp
729 * matches that that would have been returned) null is returned.
730 */
731 public java.util.Properties getGenSecProps(final long oldStamp)
732 throws IOException
733 {
734 //System.err.println("[Fetching GenSecProps over HTTP tunnel...]");
735
736 final RawPacket response = doRPC(
737 new RawPacket(RawPacket.OpCode.GetGenSecProps, longSer(oldStamp)));
738
739 final Properties result = (Properties) response.getSerializedObjectPayload();
740 //System.err.println(" [Fetched GenSecProps over HTTP tunnel.]");
741 return(result);
742 }
743
744 /**Gets the thumbnails for an exhibit.
745 * A data source is at liberty to refuse to compute thumbnails
746 * in which case it may return null, else it returns a
747 * non-null value which may include the `could-not-compute'
748 * value to indicate that a thumbnail/sample cannot be made
749 * for this exhibit and no attempt need be made in future.
750 * <p>
751 * Because it is important for a new mirror to gather thumbnails ASAP
752 * to show to a user, we make this call immune to being blocked
753 * when the tunnel connection appears poor.
754 *
755 * @param create if true, and no thumbnail yet exists, try to
756 * create one if possible, else only return an existing one
757 */
758 public ExhibitThumbnails getThumbnails(final ExhibitFull name, final boolean create)
759 throws IOException
760 {
761 //System.err.println("[Fetching Thumbnails over HTTP tunnel...]");
762
763 // final ByteArrayOutputStream baos =
764 // new ByteArrayOutputStream(name.length());
765 // final DataOutputStream dos = new DataOutputStream(baos);
766 // dos.writeUTF(name.toString());
767 // dos.writeBoolean(create);
768 // final byte data[] = baos.toByteArray();
769 // dos.close(); // Attempt to free some resources quickly.
770
771 final int nameLength = name.length();
772 final byte[] data = new byte[3 + nameLength];
773 data[0] = (byte) (nameLength >>> 8);
774 data[1] = (byte) nameLength;
775 name.writeToByteArray(data, 2);
776 data[2 + nameLength] = (byte) (create ? 1 : 0);
777
778 final RawPacket response = doRPCUnguarded(
779 new RawPacket(RawPacket.OpCode.GetThumbnails, data));
780
781 final ExhibitThumbnails result = (ExhibitThumbnails) response.getSerializedObjectPayload();
782 //System.err.println(" [Fetched Thumbnails over HTTP tunnel.]");
783 return(result);
784 }
785
786 /**Set variable.
787 * Only non-local values are propagated upstream;
788 * others are explicitly rejected with an UnsupportedOperationException.
789 *
790 * @throws java.io.IOException in case of I/O problems
791 * @throws java.lang.UnsupportedOperationException when attempting to set locals
792 */
793 public void setVariable(final SimpleVariableValue newValue)
794 throws IOException,
795 UnsupportedOperationException
796 {
797 if(newValue == null) { throw new IllegalArgumentException(); }
798 if(newValue.getDef().isLocal())
799 { throw new UnsupportedOperationException("cannot send local variables over the tunnel"); }
800 // Implement in terms of setVariables().
801 setVariables(new SimpleVariableValue[]{newValue});
802 }
803
804 /**Adjust globalMap for outgoing (upstream) set operation.
805 * This expects the value passed to be:
806 * <ul>
807 * <li>Non-null
808 * <li>Non-local
809 * <li>Read/write
810 * <li>To have an empty globalMap or one with exactly one entry
811 * whose key is the system ID for this system;
812 * if the map is empty then this system is put in the global map
813 * </ul>
814 * <p>
815 * If the argument is in the correct form, it is returned unchanged.
816 * <p>
817 * If the argument is illegal, ie not suitable to send upstream,
818 * then an exception is thrown to veto the send.
819 *
820 * @throws java.lang.IllegalArgumentException if an invalid argument is passed,
821 * eg unable to be converted to be sent upstream because it has
822 * an invalid globalMap
823 */
824 private SimpleVariableValue adjustGlobalMapForSet(final SimpleVariableValue svv)
825 {
826 if(svv == null)
827 { throw new IllegalArgumentException(); }
828
829 final SimpleVariableDefinition def = svv.getDef();
830 if(def.isLocal() ||
831 def.isReadOnly() ||
832 !SystemVariables.defs.contains(def))
833 { throw new IllegalArgumentException(); }
834
835 final Map<InstanceID,SimpleVariableValue> globalMap = svv.getGlobalMap();
836 if((globalMap != null) && (globalMap.size() > 1))
837 { throw new IllegalArgumentException(); }
838
839 // If no globalMap,
840 // then make sure that we make an explicit map entry for this system.
841 final InstanceID sysID = (InstanceID) uniqueClientID.getValue();
842 if(globalMap == null)
843 {
844 // We're done.
845 return(svv.put(sysID, svv, true));
846 }
847
848 if(!Collections.singleton(sysID).equals(globalMap.keySet()))
849 {
850 throw new IllegalArgumentException("globalMap contains ID that is not ours during set: " + sysID);
851 }
852
853 // Value is fine as it is, so return it untouched...
854 return(svv);
855 }
856
857 /**Set variables; must not be null or contain nulls.
858 * Only non-local variables are sent upstream;
859 * locals are silently discarded.
860 * <p>
861 * Variables not defined in SystemVariables are rejected.
862 * <p>
863 * Duplicates may be discarded or rejected,
864 * or sent as is in which case it is undefined in which order they
865 * are applied.
866 * <p>
867 * We try to set all values that we can for
868 * <p>
869 * Format on the wire:
870 * <ul>
871 * <li>Outbound: serialised SimpleVariableValue[]
872 * <li>Inbound: number of values set (int)
873 * </ul>
874 *
875 * @throws java.io.IOException in case of I/O problems
876 */
877 public int setVariables(final SimpleVariableValue[] newValues)
878 throws IOException,
879 UnsupportedOperationException
880 {
881 if(newValues == null) { throw new IllegalArgumentException(); }
882
883 // Make a set of all non-local (and writable) values to pass upstream.
884 // Preserve order...
885 final int nvLen = newValues.length;
886 final List<SimpleVariableValue> toGo = new ArrayList<SimpleVariableValue>(nvLen);
887 for(int i = 0; i < nvLen; ++i)
888 {
889 final SimpleVariableValue svv = newValues[i];
890
891 // Reject null values...
892 if(svv == null)
893 { throw new IllegalArgumentException(); }
894
895 final SimpleVariableDefinition def = svv.getDef();
896
897 // Skip local values;
898 // they should not be propagated out of the machine.
899 // Silently ignore this so that any globals can be sent.
900 if(def.isLocal())
901 { continue; }
902
903 // Adjust globalMap if necessary for sending upstream.
904 // Veto invalid values with an IllegalArgumentException.
905 toGo.add(adjustGlobalMapForSet(svv));
906 }
907
908 // If nothing to send, then return immediately.
909 if(toGo.size() == 0)
910 { return(0); }
911
912 // TODO: eliminate duplicates
913 // TODO: sort into name order to help compression (but preserving order for the same variable)
914
915 // Send set of values upstream
916 // by putting them into an array and serialising it.
917 final SimpleVariableValue r[] = new SimpleVariableValue[toGo.size()];
918 toGo.toArray(r);
919
920 // Make the RPC.
921 final RawPacket response = doRPC(new RawPacket(
922 RawPacket.OpCode.SetVariables,
923 r,
924 true)); // Compression may be useful...
925
926 // Extract return value...
927 final DataInputStream dis = new DataInputStream(response.getPayloadAsInputStream());
928 try
929 {
930 final int nSet = dis.readInt();
931 // Validate result returned.
932 if((nSet < 0) || (nSet > nvLen))
933 { throw new IOException("corrupt reply packet: nSet=" + Integer.toHexString(nSet)); }
934 return(nSet);
935 }
936 finally { dis.close(); }
937 }
938
939 /**Get variable, or returns null if no such non-local variable.
940 * We attempt only to fetch non-local variables; return null for others
941 * (except for the system ID).
942 * <p>
943 * When asked for the the system ID,
944 * we always answer the question locally.
945 * <p>
946 * Note that the local system ID can be retrieved whether or not
947 * the master server is responding.
948 * <p>
949 * Format on the wire is:
950 * <ul>
951 * <li>Outbound: variable name in UTF format
952 * <li>Inbound: serialised form of SimpleVariableValue (or empty for null)
953 * </ul>
954 *
955 * @throws java.io.IOException in case of I/O error
956 * @throws UnsupportedOperationException if a local variable
957 * other than the system ID is requested
958 */
959 public SimpleVariableValue getVariable(final SimpleVariableDefinition var)
960 throws IOException,
961 UnsupportedOperationException
962 {
963 if(var == null) { throw new IllegalArgumentException(); }
964
965 // If the request is for the system ID,
966 // return it here.
967 if(var.equals(SystemVariables.LOCAL_SYS_ID))
968 { return(uniqueClientID); }
969
970 // Always return null to requests for local data (other than system ID);
971 // local variables should not be propagated over the tunnel.
972 if(var.isLocal())
973 { throw new UnsupportedOperationException("local variables not available at tunnel source"); }
974
975 // Fetch individual variable value over tunnel...
976 // We only send the name,
977 // and get back a whole variable value,
978 // but on return check for a compatible definition,
979 // returning null if the definitions don't match.
980 final String name = var.getName();
981 final ByteArrayOutputStream baos =
982 new ByteArrayOutputStream(name.length());
983 final DataOutputStream dos = new DataOutputStream(baos);
984 dos.writeUTF(name);
985 final byte data[] = baos.toByteArray();
986 dos.close(); // Attempt to free some resources quickly.
987
988 final RawPacket request = new RawPacket(
989 RawPacket.OpCode.GetVariable,
990 data);
991
992 final RawPacket response = doRPC(request);
993 final SimpleVariableValue svv =
994 (SimpleVariableValue) response.getSerializedObjectPayload();
995
996 // If there was no variable,
997 // then return null immediately.
998 if(svv == null)
999 { return(null); }
1000
1001 // Discard incompatible types,
1002 // and local variables.
1003 if(!var.equals(svv.getDef()))
1004 { return(null); }
1005
1006 return(svv);
1007 }
1008
1009 /**Fetch variable values from the master.
1010 * We fetch values from the master
1011 * (from which we eliminate any illegal values such as local variables
1012 * or values which are not locally valid)
1013 * and insert our unique local system ID
1014 * if the request stamp is -1 or older than our creation.
1015 * <p>
1016 * Note that the local system ID can be retrieved whether or not
1017 * the master server is responding providing the stamp is -1,
1018 * though in that case it might be the only value returned.
1019 * The caller should periodically use -1 to ensure that they
1020 * have a full set of global variables, eg in case the server rebooted.
1021 * <p>
1022 * Format on the wire:
1023 * <ul>
1024 * <li>Outbound: serialised timestamp
1025 * <li>Inbound: SimpleVariableValue[]
1026 * </ul>
1027 *
1028 * @throws java.io.IOException
1029 */
1030 public SimpleVariableValue[] getVariables(final long changedSince)
1031 throws IOException
1032 {
1033 final List<SimpleVariableValue> incoming = new ArrayList<SimpleVariableValue>();
1034
1035 // Fetch all (global) variables over the tunnel.
1036 try
1037 {
1038 final RawPacket response = doRPC(
1039 new RawPacket(RawPacket.OpCode.GetVariables, longSer(changedSince)));
1040 // Copy incoming array to List...
1041 incoming.addAll(Arrays.asList(
1042 (SimpleVariableValue[]) response.getSerializedObjectPayload()));
1043 }
1044 catch(final IOException e)
1045 {
1046 // If changedSince == -1 we guarantee to return the local ID,
1047 // else we rethrow the exception...
1048 if(changedSince != -1) { throw e; }
1049 }
1050
1051 // Remove any extant system ID value,
1052 // and any local variable (should not have come up the wire),
1053 // and any value that does not have an identical local definition.
1054 // We do not expect to find/remove any of these in normal operation.
1055 final SimpleVariableDefinition uCIDDef = uniqueClientID.getDef();
1056 for(int i = incoming.size(); --i >= 0; )
1057 {
1058 final SimpleVariableValue svv =
1059 incoming.get(i);
1060 final SimpleVariableDefinition def = svv.getDef();
1061 if(def.equals(uCIDDef) ||
1062 def.isLocal() ||
1063 !SystemVariables.defs.contains(def) ||
1064 !def.equals(SystemVariables.nameToDef.get(def.getName())))
1065 {
1066 incoming.remove(i);
1067 continue;
1068 }
1069 }
1070
1071 // Insert the system ID variable value into the result if appropriate.
1072 if(changedSince <= uniqueClientID.getTimestamp()) // Handles -1 case...
1073 {
1074 incoming.add(uniqueClientID);
1075 }
1076
1077 final SimpleVariableValue result[] =
1078 new SimpleVariableValue[incoming.size()];
1079 incoming.toArray(result);
1080 return(result);
1081 }
1082
1083 /**Get the current partial, or previous full, event set at the specified interval; never null.
1084 * This is a simplified interface to return either the current event set
1085 * that is being collected, or the previous completed set.
1086 * <p>
1087 * The current set is the most timely, but may not contain enough data
1088 * to be meaningful if the new interval has just started.
1089 * <p>
1090 * The previous set is complete and thus most likely to have enough samples
1091 * to be useful, but is not completely current.
1092 * <p>
1093 * Implemented in terms of the more general call
1094 * in the hope that batched calls for several values will be
1095 * more common and more efficient.
1096 * This also reduces the number of distinct RPC calls that
1097 * we have to implement.
1098 *
1099 * @param def event definition (must be for an event); never null
1100 * @param intervalSelector one of EVENT_INTERVAL_SELECTOR_xxx values
1101 * @param current if true the current event set is returned,
1102 * else the previous complete set is returned
1103 *
1104 * @return requested event set; may be empty but never null if requested set not available
1105 */
1106 public EventVariableValue getEventValue(final SimpleVariableDefinition def,
1107 final EventPeriod intervalSelector,
1108 final boolean current)
1109 {
1110 if((def == null) || (intervalSelector == null))
1111 { throw new IllegalArgumentException(); }
1112
1113 final long now = System.currentTimeMillis();
1114 final long currentIntervalNumber = intervalSelector.getIntervalNumber(now);
1115 final long intervalNumber = (current ? currentIntervalNumber : (currentIntervalNumber-1));
1116
1117 final EventVariableValue evv[] = getEventValues(def,
1118 intervalSelector,
1119 intervalNumber,
1120 null);
1121
1122 // If we got some real (non-null) data back, then return it...
1123 if((evv.length > 0) && (evv[0] != null))
1124 { return(evv[0]); }
1125
1126 // Else get empty non-authoritative return value (with no events)...
1127 return(BasicVarMgr.getEmptyNonAuthEVV(def, intervalSelector, intervalNumber));
1128 }
1129
1130 /**Get the specified global event sets for the specified intervals; never null.
1131 * This allows retrieval of zero or more event sets for the specified
1132 * interval size.
1133 * <p>
1134 * Requests for more than SystemVariables.EVENT_SAMPLES_RETAINED in the
1135 * past (or for the future!) cannot be satisfied and data will not be
1136 * returned for them.
1137 * <p>
1138 * Usually not more than SystemVariables.EVENT_SAMPLES_RETAINED samples
1139 * will be returned in response to any one request as a safety measure.
1140 * <p>
1141 * We do various small optimisations to reduce fruitless traffic over
1142 * what may be a slow and/or expensive connection:
1143 * <ul>
1144 * <li>This will reject requests will huge sets that go back to far so as
1145 * to avoid huge RPC arguments and since the request is unlikely to be
1146 * satisfiable. The upstream implementation may be even more restrictive.
1147 * <li>This ignores requests too far in the future or the past.
1148 * <li>Trim leading empty slots from request sent over the tunnel.
1149 * </ul>
1150 * <p>
1151 * The responses from the server may be large and slow,
1152 * so we allow streaming to give better incremental CPU/resource consumption
1153 * and better throughput.
1154 * <p>
1155 * Returns an empty result if asked for a 'local' value.
1156 * <p>
1157 * Returns an empty result if the tunnel is currently broken
1158 * or in case of other non-permanent error.
1159 * <p>
1160 * TODO: Optimise RPC call by allowing for sparse requests with different request/response format.
1161 *
1162 * @param def event definition (must be for an event); never null
1163 * @param intervalSelector one of EVENT_INTERVAL_SELECTOR_xxx values
1164 * @param intervalNumber a time (as from System.currentTimeMillis())
1165 * which identifies the first interval for which data is potentially
1166 * required; if too far in the past or future then possibly no data
1167 * will be available,
1168 * zero is used to access the "all" bucket
1169 * @param whichValues each true bit represents a slot for which data is
1170 * required, bit 0 indicating data from the slot within which
1171 * firstIntervalTime is located, bit 1 the previous slot, etc;
1172 * null is treated as the common case equivalent to just bit 0 set
1173 *
1174 * @return as many of the requested values as available,
1175 * at least long enough to return all the available values,
1176 * with [0] corresponding to bit 0 in the BitSet;
1177 * may contain nulls or be zero-length (eg in case of error) but is never null
1178 */
1179 public EventVariableValue[] getEventValues(final SimpleVariableDefinition def,
1180 final EventPeriod intervalSelector,
1181 final long intervalNumber,
1182 BitSet whichValues)
1183 {
1184 if((def == null) ||
1185 (intervalSelector == null) ||
1186 (intervalNumber < 0))
1187 { throw new IllegalArgumentException(); }
1188
1189 if(def.isLocal())
1190 { return(NO_EVENT_VALUES); }
1191
1192 // Don't bother trying this if the connection is currently known to be broken.
1193 if(isBroken()) // { throw new PGMasterNotInServiceException(); }
1194 { return(NO_EVENT_VALUES); }
1195
1196 // See if we can optimise the whichValues argument by replacing it
1197 // with a null value...
1198 // This will reduce heap churn, data on the wire, etc.
1199 if((whichValues != null) && (whichValues.length() == 1) && whichValues.get(0))
1200 { whichValues = null; }
1201
1202 // Various optimisations are possible for a null BitSet arg.
1203 final boolean nullWV = (whichValues == null);
1204
1205 // Take copy of BitSet to ensure that it is minimal size,
1206 // and to reduce thread/race/safety issues.
1207 final BitSet wvCopy = nullWV ? null : new BitSet(whichValues.length());
1208 if(!nullWV) { wvCopy.or(whichValues); }
1209
1210 final int wvl = nullWV ? 1 : wvCopy.length();
1211
1212 // Avoid pointless call for no request bits.
1213 if(wvl < 1)
1214 { return(NO_EVENT_VALUES); }
1215
1216 // Avoid a huge request/response packet.
1217 if(wvl > 2*SystemVariables.EVENT_SAMPLES_RETAINED)
1218 { throw new IllegalArgumentException("request set too large to send over the wire"); }
1219
1220 // Avoid a call for values too far in the past to be likely to be available.
1221 // (This also ensures that the intervalNumber is large and positive for later.)
1222 // Although we allow a zero value for the "all" bucket.
1223 final long currentInterval = intervalSelector.getIntervalNumber(System.currentTimeMillis());
1224 if(intervalNumber == 0)
1225 {
1226 if(!nullWV && (!whichValues.get(0) || (wvl != 1)))
1227 { throw new IllegalArgumentException(); }
1228 }
1229 else if(intervalNumber < currentInterval - 2*SystemVariables.EVENT_SAMPLES_RETAINED - 1)
1230 { return(NO_EVENT_VALUES); }
1231
1232 // Avoid a call for values far too far in the future
1233 // for any skew to account for, etc, so we're sure no data is available.
1234 if(intervalNumber > currentInterval + 4*SystemVariables.EVENT_SAMPLES_RETAINED + 1)
1235 { return(NO_EVENT_VALUES); }
1236
1237 // Get the offset of the first item in the request...
1238 // If this is greater than zero
1239 // then we can in principle optimise the request/response
1240 // by omitting the leading empty slots.
1241 final int firstItemOffset = nullWV ? 0 : wvCopy.nextSetBit(0);
1242 assert(firstItemOffset >= 0);
1243
1244 // Reduce the BitSet that we send if the request can be trimmed...
1245 final BitSet wvToSend = (firstItemOffset == 0) ? wvCopy :
1246 (wvCopy.get(firstItemOffset, wvl));
1247
1248 // Request is:
1249 // * A byte consisting of the ordinal of period/interval enum ordinal.
1250 // * The UTF-8 representation of the name of the definition.
1251 // * The interval number (8 bytes).
1252 // * The serialised form of the request BitSet if non-null.
1253 // Result is:
1254 // * The in-order array of EventVariableValues (never null).
1255 final String name = def.getName();
1256 final ByteArrayOutputStream baos = new ByteArrayOutputStream(32 + name.length() + wvl/2);
1257 final DataOutputStream dos = new DataOutputStream(baos);
1258 final byte data[];
1259 try
1260 {
1261 dos.write(intervalSelector.ordinal());
1262 dos.writeUTF(name);
1263 dos.writeLong(intervalNumber - firstItemOffset);
1264 if(wvToSend != null)
1265 {
1266 final ObjectOutputStream oos = new ObjectOutputStream(dos);
1267 oos.writeObject(wvToSend);
1268 oos.flush();
1269 }
1270 dos.flush();
1271 data = baos.toByteArray();
1272 dos.close(); // Attempt to free some resources quickly.
1273 }
1274 catch(final Exception e) { throw new Error(e); /* Should never happen. */ }
1275
1276 final RawPacket request = new RawPacket(
1277 RawPacket.OpCode.GetEventValues,
1278 data);
1279
1280 // Stream the result, since it may be large enough to be on the wire for a while.
1281 final EventVariableValue[] evvsRaw;
1282 try
1283 {
1284 final InputStream stream = doRPCRawWithStreamResponse(request, false);
1285 if(stream == null)
1286 { throw new IOException("unexpected null return value"); }
1287 try
1288 {
1289 final ObjectInputStream ois = new ObjectInputStream(stream);
1290 evvsRaw = (EventVariableValue[]) ois.readObject();
1291 ois.close();
1292 }
1293 finally { stream.close(); /* Ensure underlying resources are released. */ }
1294
1295 // Check for unexpected return values.
1296 if(evvsRaw == null)
1297 { throw new IOException("unexpected null return value"); }
1298
1299 // If we offset the request then make a copy set back.
1300 final EventVariableValue[] evvs;
1301 if(firstItemOffset != 0)
1302 {
1303 evvs = new EventVariableValue[evvsRaw.length + firstItemOffset];
1304 System.arraycopy(evvsRaw, 0, evvs, firstItemOffset, evvsRaw.length);
1305 }
1306 else
1307 {
1308 // Did not offset the request...
1309 evvs = evvsRaw;
1310 }
1311
1312 // Check for unexpected/incorrect return values.
1313 for(int i = evvs.length; --i >= 0; )
1314 {
1315 final EventVariableValue evv = evvs[i];
1316 if(evv == null) { continue; }
1317 if(!def.equals(evv.getDef()))
1318 { throw new IOException("unexpected return value with wrong def"); }
1319 if(!intervalSelector.equals(evv.getPeriod()))
1320 { throw new IOException("unexpected return value with wrong period"); }
1321 if(evv.getIntervalNumber() != intervalNumber - i)
1322 { throw new IOException("unexpected return value with wrong interval number"); }
1323 }
1324
1325 _noteResult(true); // Seems to have succeeded.
1326 return(evvs);
1327 }
1328 // Deal with non-permanent errors as empty result value.
1329 // catch(final IOException e) { _noteResult(false); throw e; }
1330 catch(final Exception e) { _noteResult(false); return(NO_EVENT_VALUES); }
1331 }
1332
1333 /**Synchronise variables with upstream values.
1334 * Pushes updated values upstream to the source,
1335 * calls sync on the source if called with the "force" argument true,
1336 * and then retrieves changed values from upstream.
1337 * <p>
1338 * When called with force==true, this acts like a full "memory barrier",
1339 * flushing all write-cached items downstream immediately and afterwards
1340 * getting the value of all upstream values with getVariables(-1),
1341 * but may be expensive in terms of CPU or bandwidth, so use sparingly.
1342 * <p>
1343 * When called with force=false, this returns immediately
1344 * and the operation is not propagated across the tunnel.
1345 * <p>
1346 * In any case, it is rarely the right thing for a casual user
1347 * to vall this as it may be very expensive.
1348 *
1349 * @param force if true, this will force a full write flush,
1350 * a full sync upstream,
1351 * then full read with getVariables(-1),
1352 * to get the effect of a full "barrier";
1353 * otherwise, do nothing
1354 *
1355 * @throws IOException if one is received from upstream
1356 */
1357 public void syncVariables(final boolean force)
1358 throws IOException
1359 {
1360 // Do not propagate across tunnel unless "force"d.
1361 if(!force)
1362 { return; }
1363
1364 // Propagate the "force"d sync upstream,
1365 // and return when done...
1366 doRPC(FixedEmptyFrames.RAW_PACKET_SV);
1367 }
1368
1369
1370 /**Get requested Properties selected by key and versionID.
1371 * Fetches a Properties set unconditionally (versionID == -1)
1372 * else if the versionID presented is not current.
1373 *
1374 * @param key selector (with possible embedded sub-key)
1375 * for desired properties set; never null
1376 * @param versionID if -1 then map is always returned if available,
1377 * else must be non-negative and null is returned if the versionID
1378 * presented matches that of the current version
1379 * (ie if the caller has presumably got the up-to-date version);
1380 * may be a timestamp or a hash or other value,
1381 * and by convention is zero only for an empty properties set
1382 *
1383 * @return null, or Properties map guaranteed to contain only
1384 * String keys and values
1385 */
1386 public java.util.Properties getProperties(final PropsKey key,
1387 final long versionID)
1388 throws IOException
1389 {
1390 throw new IOException("NOT IMPLEMENTED");
1391 }
1392
1393 /**Returns the (incremented) upstream stratum adjusted to include transit time; never null.
1394 * Returns Stratum.UNKNOWN if the upstream stratum is unknown
1395 * or the upstream instance is already at the maximum stratum.
1396 */
1397 public Stratum getStratum()
1398 throws IOException
1399 {
1400 final long before = System.currentTimeMillis();
1401 final RawPacket response = doRPCUnguarded(FixedEmptyFrames.RAW_PACKET_GETSTRATUM);
1402 final long after = System.currentTimeMillis();
1403
1404 // Get raw result from upstream host.
1405 final Stratum upstream = (Stratum) response.getSerializedObjectPayload();
1406 assert(null != upstream);
1407
1408 // If 'unknown' upstream stratum then return built-in 'unknown' value.
1409 if(upstream.isUnknownStratum()) { return(Stratum.UNKNOWN); }
1410
1411 // If upstream is already at maximum stratum then this instance must be regarded as unknown stratum.
1412 if(upstream.getStratum() >= Stratum.MAX_STRATUM) { return(Stratum.UNKNOWN); }
1413
1414 // Sanitised round-trip time.
1415 final int rtt = (int) Math.min(2*(int)Stratum.MAX_ROOT_DELAY, Math.max(0, after - before));
1416
1417 // Create a new value for this instance with an incremented stratum
1418 // and one half the RTT to our immediate upstream peer added to the root delay.
1419 final Stratum processed = new Stratum(1 + upstream.getStratum(),
1420 (rtt>>1) + upstream.getRootDelay(),
1421 _getStratumUpstreamName(),
1422 upstream.isUpstreamConserving());
1423
1424 return(processed);
1425 }
1426
1427 /**Return short unique name of upstream peer/server suitable for Stratum; never null but can be "" for 'unknown'.
1428 * Override this in implementations that know the upstream name.
1429 */
1430 protected String _getStratumUpstreamName()
1431 {
1432 return(""); // Unknown upstream peer/server name.
1433 }
1434
1435 /**Poll periodically.
1436 * We can use this to attempt to repair ailing connections, etc,
1437 * as well as poll for asynchronous messages being sent to us by the master.
1438 * <p>
1439 * However, our general policy is not to force traffic if we need not.
1440 * <p>
1441 * This can be overridden by a derived class,
1442 * though it is suggested that this be called with super.poll(gp) if so.
1443 */
1444 public void poll(final GenProps gp)
1445 {
1446 if(KEEP_SERVER_CONNECTION_ALIVE)
1447 {
1448 // If we have a broken connection to the master,
1449 // then a little while before other users can retry it,
1450 // we retry it to see if the master is reachable again.
1451 //
1452 // This enables us to repair a connection silently,
1453 // so that a normal caller is never blocked until
1454 // it is repaired (other calls are vetoed quickly).
1455 //
1456 // Our calls are still backed off if the master is really unwell.
1457 //
1458 // This does not try to force an initial connection,
1459 // just repair a failing one.
1460 final Long notUntil = doNotTryMasterUntil;
1461 if((notUntil != null) &&
1462 (notUntil.longValue() - _REPAIR_RETRY_LEAD_MS < System.currentTimeMillis()))
1463 {
1464 try { doNOOP(true); }
1465 catch(final Exception e)
1466 {
1467 // It is possible that our connection problem is caused
1468 // by a resource leak (eg of unclosed descriptors) elsewhere
1469 // so try and force come cleanup now.
1470 //
1471 // We rely on our normal pacing/backoff of retries
1472 // to avoid us hurting the system too much if we are wrong!
1473 if(FORCE_GC_AND_FINALISERS_AFTER_POLL_FAIL)
1474 {
1475 logger.log("[ExhibitDataTunnelSource: WARNING: failing to re-establish connection so attempting to free resources with gc(), etc...]");
1476 System.runFinalization();
1477 System.gc();
1478 System.runFinalization();
1479 }
1480 }
1481 }
1482 }
1483 }
1484
1485 /**May attempt to free up outbound connections and/or prevent new ones. */
1486 public void destroy()
1487 {
1488 // Prevent most new outgoing connections.
1489 doNotTryMasterUntil = new Long(Long.MAX_VALUE);
1490 }
1491
1492
1493 /**How soon ahead of other callers we try to silently repair a broken connection, ms; non-negative. */
1494 private final int _REPAIR_RETRY_LEAD_MS = 17001 + Rnd.fastRnd.nextInt(3007);
1495
1496
1497 /**Make an RPC call over the underlying medium with the given outgoing packet; never null.
1498 * This collects the response packet and will object
1499 * if it sees any IOException
1500 * or if the packets come back with the wrong op code.
1501 * <p>
1502 * This can be locked on the instance lock to serialise RPCs,
1503 * if the tunnel can only usefully handle one call at once.
1504 * <p>
1505 * Must be implemented by the deriving class
1506 * to suit its transmission medium.
1507 *
1508 * @param packetOut request packet; never null
1509 * @return response packet; never null
1510 * @throws java.io.IOException in case of I/O difficulties
1511 */
1512 protected abstract RawPacket doRPCRaw(final RawPacket packetOut)
1513 throws IOException;
1514
1515 /**Optimised RPC call with the given outgoing packet and returning packet body as an InputStream; null if an empty stream.
1516 * This is an <em>optimisation</em> for performance-critical cases <em>only</em>,
1517 * and foregoes some error checking/handling for speed
1518 * (and thus the caller should ensure that it performs integrity checks).
1519 * There is an onus on the streaming-related code to behave safely
1520 * even if fed bogus/corrupt data,
1521 * and it must be able to safely/easily undo any work done
1522 * if the message is found to be bogus as late as the final byte(s).
1523 * <p>
1524 * This streams the content of the response packet
1525 * and will object if it sees any IOException
1526 * or if the packets come back with the wrong op code.
1527 * <p>
1528 * A terminating trailer byte may or may not be visible on the returned stream
1529 * thus allowing the implementation to be as efficient as possible.
1530 * <p>
1531 * This may return after all the input data has been collected,
1532 * or while some or all is still to come,
1533 * and thus the returned stream may fail and throw an exception.
1534 * <p>
1535 * The first element of the result is the length of the response data
1536 * (not including any non-data trailer bytes from the packet even if present)
1537 * but may be null if this length is not known when the packet header is seen,
1538 * eg because the packet body was compressed.
1539 * <p>
1540 * This may be implemented/overridden by the deriving class
1541 * to suit its transmission medium,
1542 * and as an optimisation to reduce copying and allow streaming,
1543 * ie starting to process the input before it is all received.
1544 * <p>
1545 * The data stream is always of uncompressed data,
1546 * regardless of whether the data was sent compressed on the wire,
1547 * ie this routine will correctly decompress data on the fly as/when needed.
1548 * <p>
1549 * The caller <strong>must close</strong> the stream promptly
1550 * to release resources such as file handles and non-Java memory.
1551 *
1552 * @param packetOut request packet; never null
1553 * @param allowBigReadTimeout TODO
1554 * @return response length and data stream; never null
1555 * @throws java.io.IOException in case of I/O difficulties
1556 */
1557 protected InputStream doRPCRawWithStreamResponse(final RawPacket packetOut, final boolean allowBigReadTimeout)
1558 throws IOException
1559 {
1560 // Simple implementation in terms of basic doRPCRaw().
1561 final RawPacket result = doRPCRaw(packetOut);
1562
1563 // Check for mismatched response packets
1564 // or remote exceptions codes with special response op-codes.
1565 if(result.opCode != packetOut.opCode)
1566 {
1567 // Deal specially with some fixed remote exception types.
1568 switch(result.opCode)
1569 {
1570 case PGMNISEX:
1571 { throw new PGMasterNotInServiceException(); }
1572 case RUNTEX:
1573 { throw new RuntimeException("remote threw RuntimeException"); }
1574 case REMEX:
1575 { throw new IOException("remote threw RemoteException"); }
1576 case INTEX:
1577 { throw new InterruptedIOException("remote operation interrupted"); }
1578 }
1579 throw new IOException("packet response-type mismatch; got "+result.opCode+" expected "+packetOut.opCode.getCode());
1580 }
1581
1582 if(0 == result.getActualPayloadLength()) { return(null); }
1583 return(result.getPayloadAsInputStream());
1584 }
1585
1586
1587 /**Make an RPC call over HTTP[S] with the given outgoing packet.
1588 * This provides error recovery, back-off, stats, etc,
1589 * and serves as a wrapper for the medium-specific doRPCRaw().
1590 * <p>
1591 * Called by all the public data-pipeline methods to transport information
1592 * over the tunnel.
1593 *
1594 * @param packetOut request packet; never null
1595 * @return response packet; never null
1596 * @throws java.io.IOException in case of I/O difficulties
1597 */
1598 protected RawPacket doRPC(final RawPacket packetOut)
1599 throws IOException
1600 {
1601 // Note RPC request...
1602 StatsLogger.captureDataPoint(statsIDTS, TSNAME_RPCREQUEST);
1603 // Note its type...
1604 StatsLogger.captureDataPoint(statsIDTS, TSNAMEPR_RPCTYPE + ((int) packetOut.opCode.getCode()));
1605
1606 // If last communication with master failed too recently,
1607 // quickly veto a new connection attempt for now.
1608 if(isBroken())
1609 {
1610 if(_protocolDebug) { System.err.println("[doRPC(): vetoing connection to broken master.]"); }
1611 throw new PGMasterNotInServiceException("no connection to master");
1612 }
1613
1614 return(doRPCUnguarded(packetOut));
1615 }
1616
1617 /**Just like doRPC() but does not back off in face of previous failures; never null.
1618 * Can be used for calls that must attempt to contact the master anyway,
1619 * or are probing for it to be alive.
1620 * <p>
1621 * Most callers should use the normal doRPC().
1622 *
1623 * @param packetOut never null
1624 * @return response to RPC call; never null
1625 */
1626 protected RawPacket doRPCUnguarded(final RawPacket packetOut)
1627 throws IOException
1628 {
1629 if(_protocolDebug) { System.err.println("[doRPC(): connecting to master.]"); }
1630
1631 try {
1632 if(_protocolDebug) { System.err.println(" [doRPC(): about to do raw RPC.]"); }
1633
1634 // Do the raw, medium-specific RPC.
1635 final RawPacket result = doRPCRaw(packetOut);
1636
1637 if(_protocolDebug) { System.err.println(" [doRPC(): about to check result type.]"); }
1638
1639 // Check for mismatched response packets
1640 // or remote exceptions codes with special response op-codes.
1641 if(result.opCode != packetOut.opCode)
1642 {
1643 // Deal specially with some fixed remote exception types.
1644 // Note that we don't count a OP__RUNTEX as a success OR failure
1645 // for the purposes of link monitoring;
1646 // this may be the master gently rebuffing a request for some reasons.
1647 switch(result.opCode)
1648 {
1649 case PGMNISEX:
1650 { throw new PGMasterNotInServiceException(); }
1651 case RUNTEX:
1652 { throw new RuntimeException("remote threw RuntimeException"); }
1653 case REMEX:
1654 { throw new IOException("remote threw RemoteException"); }
1655 case INTEX:
1656 { throw new InterruptedIOException("remote operation interrupted"); }
1657 }
1658 throw new IOException("packet response-type mismatch; got "+result.opCode+" expected "+packetOut.opCode);
1659 }
1660
1661 // Note last successful use.
1662 _noteResult(true);
1663
1664 if(_protocolDebug) { System.err.println(" [doRPC(): about to return result.]"); }
1665
1666 // Return result successfully.
1667 return(result);
1668 }
1669 catch(final TunnelBusyIOException e)
1670 {
1671 // Not an error (nor a success); voluntary rejection of excess connection on our side.
1672 logger.log("ExhibitDataTunnelSource: deferred RPC connection attempt upstream, packet: " + packetOut + ", reason: " + e.getMessage());
1673 // Rethrow the error that we encountered...
1674 throw e;
1675 }
1676 catch(final InterruptedIOException e)
1677 {
1678 // Not an error (nor a success): can retry later.
1679 throw e;
1680 }
1681 catch(final IOException e)
1682 {
1683 // Regard (only) IOException as indicator or some sort of connection problem.
1684
1685 // Note RPC request failure with IOException...
1686 StatsLogger.captureDataPoint(statsIDTS, TSNAME_RPCIOEX);
1687
1688 // Note failed connection to the master...
1689 // This will defer our next attempt...
1690 _noteResult(false);
1691
1692 // Note failure in the logs:
1693 logger.log("ExhibitDataTunnelSource: RPC FAILED connection attempt upstream, failed packet: " + packetOut + ", reason: " + e.getMessage());
1694 final long lastS = lastSuccessfulConnectionTime;
1695 if(lastS > 0)
1696 { logger.log("[doRPC(): last success at "+(new Date(lastS))+".]"); }
1697 final Long notUntil2 = doNotTryMasterUntil;
1698 if(notUntil2 != null)
1699 { logger.log("[doRPC(): deferring next attempt until " + (new Date(notUntil2.longValue())) + ".]"); }
1700
1701 // Rethrow the error that we encountered...
1702 throw e;
1703 }
1704 }
1705
1706 /**Minimum time (ms) for which we will hold latest newly-created AEP response; strictly positive.
1707 * A client that automatically retries after an interrupted IO response
1708 * to some sort of AEP request indicating 'already in progress'
1709 * should retry within this time to avoid starting its creation all over again.
1710 * <p>
1711 * Note that in dire straits this lower limit may be ignored.
1712 * <p>
1713 * Should typically be of the order of many minutes.
1714 */
1715 public static final int MIN_AEP_RETENTION_MS = 17 * 60 * 1000;
1716
1717 /**Cache/lock to improve performance of inbound RPC.
1718 * Meant to be opaque to all but handleInboundRPC()
1719 * and is used to provide a better overall responsiveness,
1720 * especially from expensive calls
1721 * by vetoing concurrent expensive calls
1722 * and by cacheing the responses from especially expensive calls.
1723 * <p>
1724 * Not all possibilities are exploited,
1725 * just those that in practice seem to be important.
1726 * <p>
1727 * Can be registered to free its own content automagically
1728 * if the system is very short of memory.
1729 */
1730 public static final class HIRPCCache implements MemoryTools.RecurrentEmergencyFreeHandle
1731 {
1732 /**Lock to prevent servicing more than one very expensive call at once.
1733 * We do this to reduce strain on the memory/GC and CPU of the server.
1734 */
1735 private final ReentrantLock slowResponseLock = new ReentrantLock();
1736
1737 /**Time of creation / last use of basic AEP response; initially zero.
1738 * This covers the GZIPped / extra-compressed / delta responses for the latest AEP,
1739 * but no previous AEP values which are not essential to minimal client progress.
1740 */
1741 private volatile long currentAEPResponseCreatedOrLastUsed;
1742
1743 /**Cache of current AEP as pair of longHash and response packet for simple AEP fetch RPC; mutable.
1744 * Initially null.
1745 * <p>
1746 * Elements of this pair are never null.
1747 * <p>
1748 * Marked volatile for safe multi-threaded access.
1749 * <p>
1750 * Currently held for at least for a minimum period,
1751 * until the AEP fetched has a new hash,
1752 * not for example held via a SoftReference
1753 * since a SoftReference would probably be cleared too soon
1754 * for this to be effective.
1755 * <p>
1756 * This is likely to be many MB in size.
1757 */
1758 private volatile Tuple.Pair<Long, RawPacket> _AEP_response;
1759
1760 /**Cache of full extra-compressed AEP as tuple of longHash, compression format, and response packet for diff AEP fetch RPC; mutable.
1761 * Initially null.
1762 * <p>
1763 * Elements of this tuple are never null.
1764 * <p>
1765 * Marked volatile for safe multi-threaded access.
1766 * <p>
1767 * Currently held for at least for a minimum period,
1768 * until the AEP fetched has a new hash,
1769 * not for example held via a SoftReference
1770 * since a SoftReference would probably be cleared too soon
1771 * for this to be effective.
1772 * <p>
1773 * This is likely to be many MB in size.
1774 */
1775 private volatile Tuple.Pair<Long, Tuple.Pair<CompressionLevel, RawPacket>> _AEP_extracomp_response;
1776
1777 /**Cache of extra-compressed AEP diff as tuple of longHash, compression format, and response packet for diff AEP fetch RPC; mutable.
1778 * Initially null.
1779 * <p>
1780 * Elements of this tuple are never null.
1781 * <p>
1782 * Marked volatile for safe multi-threaded access.
1783 * <p>
1784 * Currently held for at least for a minimum period,
1785 * until the AEP fetched has a new hash,
1786 * not for example held via a SoftReference
1787 * since a SoftReference would probably be cleared too soon
1788 * for this to be effective.
1789 * <p>
1790 * This is likely relatively small.
1791 */
1792 private volatile Tuple.Triple<AllExhibitPropertiesDelta, CompressionLevel, RawPacket> _AEP_diff_response;
1793
1794 /**Cache of previous AEP value to assist with AEP diffs; initially null.
1795 * Currently held indefinitely (or until very short of free memory),
1796 * until the AEP fetched has a new hash,
1797 * not for example held via a SoftReference
1798 * since a SoftReference would probably be cleared too soon
1799 * for this to be effective.
1800 * <p>
1801 * This should generally share most of its state with the current AEP
1802 * and therefore should not represent a significant extra memory burden.
1803 */
1804 private volatile AllExhibitProperties aepPrev;
1805
1806 /**Soft cache of older AEP values to assist with AEP diffs; never null.
1807 * This is present to help with requests for diffs against AEPs
1808 * older than the previous value, if any such requests are made.
1809 * <p>
1810 * Thread-safe cache of SoftReferences to older AEPs,
1811 * mapped from the Long hash values.
1812 * <p>
1813 * Map is not auto-clear()ed on memory stress
1814 * other than the individual SoftReferences themselves during GC.
1815 * <p>
1816 * It is possible to synchronise on this instance to exclude other activity.
1817 */
1818 private final SoftReferenceMap<Long, AllExhibitProperties> prevAEPs =
1819 SoftReferenceMap.<Long, AllExhibitProperties>create("prevAEPs");
1820
1821 /**Can be called to purge most or all internal state when very short of memory.
1822 * Implements the emergency-free handler.
1823 * <p>
1824 * May not purge some elements which are not (or likely not) using significant memory
1825 * but usually save a lot of time for us and for clients.
1826 * To that end we try and estimate the space they are taking
1827 * vs actual current free heap space.
1828 */
1829 public void run()
1830 {
1831 // Clear strongly-referenced previous AEP value if very short of free memory.
1832 // Clearing this will prevent us from creating and sending space-efficient AEP deltas.
1833 if((null != aepPrev) && (MemoryTools.percentFreeWithinTarget() <= 5))
1834 {
1835 aepPrev = null;
1836 System.err.println("HIRPCCache: emergency free of previous AEP value");
1837 }
1838
1839 // Clear the list of previous AEPs if very short of free memory.
1840 // Note that the softly-referenced content will be released automatically in dire straits.
1841 // We only forcefully clear this if no longer retaining the previous AEP strongly any more.
1842 if(!prevAEPs.isEmpty() && (null == aepPrev))
1843 {
1844 prevAEPs.clear();
1845 System.err.println("HIRPCCache: emergency free of old AEP entries: "+prevAEPs.size());
1846 }
1847
1848 // If any basic of the AEP responses is only very recently created or used
1849 // then prevent them from being freed to avoid client starvation.
1850 final long lastUsedBAR = currentAEPResponseCreatedOrLastUsed;
1851 if(lastUsedBAR + MIN_AEP_RETENTION_MS > System.currentTimeMillis())
1852 { return; }
1853
1854 final long freeMem = Runtime.getRuntime().freeMemory();
1855
1856 // This GZIP-compressed response is a bit bulky and shouldn't usually be needed
1857 // unless we are too short of memory to super-compress a response
1858 // and/or a client doesn't have enough memory to receive a super-compressed response.
1859 // We hold onto it grimly for a minimum amount of time
1860 // as being forced to continually regenerate it asynchronously causes starvation
1861 // since this is required for the minimum level of response to support clients.
1862 Tuple.Pair<Long, RawPacket> r = _AEP_response;
1863 if((null != r) && (null != r.second) &&
1864 (r.second.getFrameLength() >= (freeMem>>1)))
1865 {
1866 _AEP_response = null;
1867 System.err.println("HIRPCCache: emergency free of AEP response "+r.second+" @"+TextUtils.sizeAsText(freeMem, true)+" free");
1868 }
1869 r = null; // Help GC.
1870
1871 // This extra-compressed response is smaller and saves network bandwidth and is thus more valuable
1872 // and so we'd like to hang onto it if possible.
1873 Tuple.Pair<Long, Tuple.Pair<CompressionLevel, RawPacket>> ecr = _AEP_extracomp_response;
1874 if((null != ecr) && (null != ecr.second) && (null != ecr.second.second) &&
1875 (ecr.second.second.getFrameLength() >= (freeMem>>1)))
1876 {
1877 _AEP_extracomp_response = null;
1878 System.err.println("HIRPCCache: emergency free of super-compressed AEP response "+ecr.second.second+" @"+TextUtils.sizeAsText(freeMem, true)+" free");
1879 }
1880 ecr = null; // Help GC.
1881
1882 // This extra-compressed diff response is the smallest and most valuable and most likely to be needed
1883 // and so we'd like to hang onto it if at all possible.
1884 // Though holding onto the delta is potentially more problematic.
1885 Tuple.Triple<AllExhibitPropertiesDelta, CompressionLevel, RawPacket> dr = _AEP_diff_response;
1886 if((null != dr) && (null != dr.third) &&
1887 (dr.third.getFrameLength() >= (freeMem>>1)))
1888 {
1889 _AEP_diff_response = null;
1890 System.err.println("HIRPCCache: emergency free of super-compressed AEP diff response "+dr.third+" @"+TextUtils.sizeAsText(freeMem, true)+" free");
1891 }
1892 dr = null; // Help GC.
1893 }
1894 }
1895
1896 /**Allows lazy instantiation of some constant frame values. */
1897 private static final class FixedEmptyFrames
1898 {
1899 /**Constant empty NOOP return frame; non-null. */
1900 static final RawPacket RAW_PACKET_NOOP = new RawPacket(
1901 RawPacket.OpCode.NOOP);
1902
1903 /**Constant empty/null GetGenProps return frame; non-null. */
1904 static final RawPacket RAW_PACKET_GP_NULL = new RawPacket(
1905 RawPacket.OpCode.GetGenProps);
1906
1907 /**Constant empty/null GetAllExhibitPropertiesDiff return frame; non-null. */
1908 static final ExhibitDataTunnelSource.RawPacket RAW_PACKET_GAEPD_NULL = new RawPacket(
1909 RawPacket.OpCode.GetAllExhibitPropertiesDiff);
1910
1911 /**Constant empty/null GetGenSecProps return frame; non-null. */
1912 static final ExhibitDataTunnelSource.RawPacket RAW_PACKET_GSP_NULL = new RawPacket(
1913 RawPacket.OpCode.GetGenSecProps);
1914
1915 /**Constant empty/null SyncVariables return/request frame; non-null. */
1916 static final ExhibitDataTunnelSource.RawPacket RAW_PACKET_SV = new RawPacket(
1917 RawPacket.OpCode.SyncVariables);
1918
1919 /**Constant empty/null SyncVariables request frame; non-null. */
1920 static final ExhibitDataTunnelSource.RawPacket RAW_PACKET_GETSTRATUM = new RawPacket(
1921 RawPacket.OpCode.GetStratum);
1922 }
1923
1924 /**Handles input request packet from slave across a tunnel.
1925 * Generates the response packet or throws an exception.
1926 *
1927 * @param reqPacket the request packet; never null
1928 * @param clientAddr the tunnel client's address as seen by us;
1929 * may be null if not applicable or available
1930 * @param cache if not null, then selected routines with
1931 * slow/expensive responses will not be serviced concurrently
1932 * but concurrent calls will be vetoed quickly instead
1933 * and expensive-to-compute values may be (partially) cached
1934 * @param logger logging area for warnings, etc; never null.
1935 *
1936 * @return the response packet; never null
1937 *
1938 * @throws java.io.IOException in case of difficulty upstream
1939 * @throws PGMasterNotInServiceException if the upstream data source
1940 * is down/unavailable
1941 */
1942 public static RawPacket handleInboundRPC(final SimpleExhibitPipelineIF source,
1943 final RawPacket reqPacket,
1944 final String clientAddr,
1945 final HIRPCCache cache,
1946 final SimpleLoggerIF logger)
1947 throws IOException,
1948 PGMasterNotInServiceException
1949 {
1950 if((source == null) || (reqPacket == null))
1951 { throw new IllegalArgumentException(); }
1952
1953 switch(reqPacket.opCode)
1954 {
1955 // For NO-OP we expect an empty data section
1956 // (we simply ignore it even if not empty)
1957 // and reply with an empty data section.
1958 case NOOP:
1959 {
1960 // Return a fixed (empty) response packet.
1961 // We don't attempt to compress this.
1962 return(FixedEmptyFrames.RAW_PACKET_NOOP);
1963 }
1964
1965 case GetGenProps:
1966 { return(handleGetGenProps(source, reqPacket)); }
1967
1968 case GetGenSecProps:
1969 { return(handleGetGenSecProps(source, reqPacket)); }
1970
1971 case GetAllExhibitImmutableData:
1972 { return(handleGetAllExhibitImmutableData(source, reqPacket, cache, logger)); }
1973
1974 case GetAllExhibitProperties:
1975 { return(handleGetAllExhibitProperties(source, reqPacket, clientAddr, cache, logger)); }
1976
1977 case GetAllExhibitPropertiesDiff:
1978 { return(handleGetAllExhibitPropertiesDiff(source, reqPacket, clientAddr, cache, logger)); }
1979
1980 case GetThumbnails:
1981 { return(handleGetThumbnails(source, reqPacket)); }
1982
1983 case GetStaticAttr:
1984 { return(handleGetStaticAttr(source, reqPacket)); }
1985
1986 case GetRawFile:
1987 { return(handleGetRawFile(source, reqPacket)); }
1988
1989 case GetVariable:
1990 { return(handleGetVariable(source, reqPacket)); }
1991
1992 case SetVariables:
1993 { return(handleSetVariables(source, reqPacket, clientAddr, logger)); }
1994
1995 case GetVariables:
1996 { return(handleGetVariables(source, reqPacket)); }
1997
1998 case SyncVariables:
1999 { return(handleSyncVariables(source, reqPacket)); }
2000
2001 case GetEventValues:
2002 { return(handleGetEventValues(source, reqPacket)); }
2003
2004 case GetStratum:
2005 { return(handleGetStratum(source, reqPacket)); }
2006
2007 // For unrecognised requests,
2008 // pretend to throw a RuntimeException.
2009 default:
2010 {
2011 final String message = "unrecognised opcode " +reqPacket.opCode+ " in request from tunnel user " + clientAddr;
2012 logger.log(message);
2013 System.err.println(message);
2014 return(new RawPacket(
2015 RawPacket.OpCode.RUNTEX,
2016 EMPTY_PAYLOAD,
2017 false));
2018 }
2019 }
2020 }
2021
2022
2023 /**Handle an incoming GetGenSecProps request; never null. */
2024 private static RawPacket handleGetGenSecProps(
2025 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2026 throws IOException
2027 {
2028 final long stamp = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2029 readLong();
2030 final Properties gsp = source.getGenSecProps(stamp);
2031 // Create and send the response packet
2032 // (empty for null response,
2033 // else serialised form, compressed when helpful).
2034 if(null == gsp) { return(FixedEmptyFrames.RAW_PACKET_GSP_NULL); }
2035 return(new RawPacket(
2036 RawPacket.OpCode.GetGenSecProps,
2037 gsp,
2038 true)); // Attempt compression...
2039 }
2040
2041 /**Handle an incoming SyncVariables request; never null. */
2042 private static RawPacket handleSyncVariables(
2043 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2044 throws IOException
2045 {
2046 // No args, no return value.
2047 // The tunnel call is only made when "forced".
2048 source.syncVariables(true);
2049 return(FixedEmptyFrames.RAW_PACKET_SV);
2050 }
2051
2052 /**Handle an incoming GetAllExhibitPropertiesDiff request; never null.
2053 * Holds the 'slow response' lock while running
2054 * to exclude other such intensive/slow responses concurrently.
2055 */
2056 private static RawPacket handleGetAllExhibitPropertiesDiff(
2057 final SimpleExhibitPipelineIF source,
2058 final RawPacket reqPacket,
2059 final String clientAddr,
2060 final HIRPCCache cache,
2061 final SimpleLoggerIF logger)
2062 throws IOException, InterruptedIOException
2063 {
2064 // Prime a new empty cache with an extant non-empty AEP
2065 // so that we are likely to be able to service
2066 // the first responses after an AEP change with deltas.
2067 if(cache != null)
2068 {
2069 if((cache.aepPrev == null) || (cache.aepPrev.aeid.length == 0))
2070 {
2071 final AllExhibitProperties first = source.getAllExhibitProperties(-1);
2072 if((null != first) && (first.aeid.length != 0))
2073 {
2074 cache.aepPrev = first;
2075 cache.prevAEPs.put(first.longHash, first);
2076 }
2077 }
2078 }
2079
2080 final DataInputStream dis = new DataInputStream(reqPacket.getPayloadAsInputStream());
2081 final long oldHash = dis.readLong();
2082 // We read the compression level, but need not parse it yet.
2083 final String compression = dis.readUTF();
2084 AllExhibitProperties _aep =
2085 source.getAllExhibitProperties(oldHash);
2086 logger.log("INFO: ExhibitDataTunnelSource.handleInboundRPC(): GetAllExhibitPropertiesDiff: client="+clientAddr+" hash="+oldHash+" compression="+compression+" response null/hash="+((null==_aep)?"null":String.valueOf(_aep.longHash)));
2087
2088 // Correct for expensive aberrant behaviour further up the stack.
2089 // Whine loudly...
2090 if((null != _aep) && (oldHash == _aep.longHash))
2091 {
2092 final String message = "ERROR: ExhibitDataTunnelSource.handleInboundRPC(): non-null return with identical hash "+oldHash+ " from source "+source;
2093 logger.log(message);
2094 System.err.println(message);
2095 _aep = null; // Pretend that no AEP was in fact returned...
2096 }
2097
2098 // Simple case; nothing to return since the caller is up to date.
2099 if(_aep == null) { return(FixedEmptyFrames.RAW_PACKET_GAEPD_NULL); }
2100
2101 final AllExhibitProperties aepFromUpstream = _aep;
2102
2103 // Full implementation is too expensive sans cache.
2104 if(cache == null)
2105 { throw new IOException("operation not fully implemented due to resource constraints"); }
2106
2107 // Decode the client's maximum supported compression level.
2108 // This will throw an IllegalArgumentException if unparseable.
2109 final CompressionLevel cl = CompressionLevel.valueOf(compression);
2110
2111 // If we already have a suitable response cached
2112 // ie for the very same AEP diff with the same old/new hashes.
2113 // then return it immediately.
2114 // We assume therefore that the entire packet will
2115 // be the same for the same response data
2116 // regardless of client, etc.
2117 //
2118 // (This does mean that the caller may miss some
2119 // internally cached updates within the AEP instance, etc,
2120 // but nothing that it cannot recompute if needed.)
2121 //
2122 // Try first for a cached diff/delta.
2123 final Tuple.Triple<AllExhibitPropertiesDelta, CompressionLevel, RawPacket> cachedDiff = cache._AEP_diff_response;
2124 if((oldHash != -1) &&
2125 (cachedDiff != null) &&
2126 (aepFromUpstream.longHash == cachedDiff.first.longHashAEPAfter) &&
2127 (oldHash == cachedDiff.first.longHashAEPBefore))
2128 {
2129 // If result is compressed at level higher than caller can handle
2130 // then we abort.
2131 // (We could chose to try for the full AEP rather than abort now,
2132 // but it's unlikely to work if this didn't.)
2133 if(cachedDiff.second.getLevel() > cl.getLevel())
2134 { throw new IOException("compression level too high for client"); }
2135
2136 // Good, can use cached diff packet...
2137 logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): returning cached AEP (diff) delta response packet: "+cachedDiff.third+" to client "+clientAddr+".]");
2138 return(cachedDiff.third);
2139 }
2140
2141 // If the response is other than null
2142 // and we reject concurrent expensive calls,
2143 // then veto it if another such call is already in progress.
2144 // We allow a very limited wait for the lock.
2145 try
2146 {
2147 if(!cache.slowResponseLock.tryLock(CoreConsts.MAX_INTERACTIVE_DELAY_MS, TimeUnit.MILLISECONDS))
2148 {
2149 logger.log("INFO: ExhibitDataTunnelSource.handleInboundRPC(): GetAllExhibitPropertiesDiff: already in progress: aborting...");
2150 throw new InterruptedIOException("expensive call already in progress, please retry");
2151 }
2152 }
2153 catch(final InterruptedException e)
2154 {
2155 final InterruptedIOException err = new InterruptedIOException(e.getMessage());
2156 err.initCause(e);
2157 Thread.currentThread().interrupt(); // Don't lose "interrupt" status.
2158 throw err;
2159 }
2160
2161 // Create and send the response packet
2162 // (empty for null response,
2163 // else serialised form, full or delta/diff, compressed when helpful).
2164 try
2165 {
2166 // Note whatever was the previous AEP that we had sequestered,
2167 // partly to stop it being GCed until we finish handling this RPC.
2168 final AllExhibitProperties oldAepPrev;
2169 // If the AEP has changed then cache it for deltas, etc.
2170 if((null == (oldAepPrev = cache.aepPrev)) || (aepFromUpstream.longHash != oldAepPrev.longHash))
2171 {
2172 cache.aepPrev = aepFromUpstream;
2173 cache.prevAEPs.put(aepFromUpstream.longHash, aepFromUpstream);
2174 }
2175
2176 // See if we have the correct cached extra-compressed full AEP.
2177 RawPacket cachedFullAEPResponse = null; // Will null out as soon as known not useful to aid GC.
2178 final Tuple.Pair<Long, Tuple.Pair<CompressionLevel, RawPacket>> cachedFull; // Minimise scope to aid GC.
2179 if(((cachedFull = cache._AEP_extracomp_response) != null) && (aepFromUpstream.longHash == cachedFull.first.longValue()))
2180 { cachedFullAEPResponse = cachedFull.second.second; }
2181
2182 if(oldHash != -1)
2183 {
2184 // Caller has supplied their current hash...
2185 // Look for caller's AEP cached here for us to diff against.
2186 // (Zap any dead cache entries that we find in passing.)
2187 AllExhibitProperties callerAEP = null;
2188 final Long hashKey = Long.valueOf(oldHash);
2189 final AllExhibitProperties caep = cache.prevAEPs.get(hashKey);
2190 if(caep != null)
2191 {
2192 assert(caep.longHash == oldHash);
2193 // Good, we have the same AEP that the caller does!
2194 callerAEP = caep;
2195 }
2196
2197 /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): old AEP cache size:"+cache.prevAEPs.size()+", caep="+callerAEP+".]"); }
2198 // Do not hold lock on cache.prevAEPs
2199 // while trying to create and return diff/delta.
2200 if(callerAEP != null)
2201 {
2202 // Computing a diff (if possible).
2203 // This generally should not take an enormous amount of working memory.
2204 try
2205 {
2206 /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): creating AEP diff, caep="+callerAEP+".]"); }
2207 final long diffStartTime = System.currentTimeMillis();
2208 // We can do a diff/delta!
2209 final AllExhibitPropertiesDelta diff =
2210 AllExhibitPropertiesDelta.createDiff(callerAEP, aepFromUpstream, false);
2211 final long diffEndTime = System.currentTimeMillis();
2212
2213 // Prepare the output packet...
2214 final CompressionLevel levelUsed = MAX_AEP_DIFF_COMP_LEVEL;
2215 ByteArrayOutputStream boas = new ByteArrayOutputStream(1 << 10);
2216 final DataOutputStream dos = new DataOutputStream(boas);
2217 dos.writeUTF(levelUsed.name());
2218 final ObjectOutputStream oos = new ObjectOutputStream(GenUtils.wrapForCompression(dos, levelUsed));
2219 oos.writeObject(diff);
2220 oos.close();
2221 final long compEndTime = System.currentTimeMillis();
2222 RawPacket response = new RawPacket(
2223 RawPacket.OpCode.GetAllExhibitPropertiesDiff,
2224 boas.toByteArray(),
2225 false); // Already compressed.
2226 boas = null; // Help GC.
2227 /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computed AEP diff ("+diff+") response packet "+response+", caep="+callerAEP+".]"); }
2228
2229 // Abort if we have a full (non-delta) response
2230 // and the delta seems larger than the full response.
2231 if((null != cachedFullAEPResponse) &&
2232 (cachedFullAEPResponse.getActualPayloadLength() <= response.getActualPayloadLength()))
2233 { throw new IOException("AEP delta larger than full response"); }
2234
2235 // Check that correct client AEP is held.
2236 assert(callerAEP.longHash == oldHash);
2237 // Check that we can construct the correct new AEP.
2238 assert(aepFromUpstream.equals(AllExhibitPropertiesDelta.applyDiff(callerAEP, diff)));
2239
2240 // We intern() the (probably large) response
2241 // to try to avoid holding any duplicates
2242 // and to reduce old-generation heap churn.
2243 response = MemoryTools.intern(response);
2244 cache.currentAEPResponseCreatedOrLastUsed = System.currentTimeMillis(); // Keep cached response below alive...
2245 cache._AEP_diff_response = new Tuple.Triple<AllExhibitPropertiesDelta, CompressionLevel,RawPacket>(diff, levelUsed, response);
2246 /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computed and cached AEP (diff) delta response packet: "+response+"; times: delta/comp="+(diffEndTime-diffStartTime)+"ms/"+(compEndTime-diffEndTime)+"ms; full: "+cachedFullAEPResponse+"; last full extra compressed response: "+cachedFull+".]"); }
2247
2248 // If the result is compressed at a level higher than the caller can handle
2249 // then we must abort.
2250 // The client will have to retry with the non-diff AEP call.
2251 if(levelUsed.getLevel() > cl.getLevel())
2252 { throw new IOException("compression level too high for client"); }
2253
2254 // Return it...
2255 return(response);
2256 }
2257 catch(final DiffException e)
2258 {
2259 /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): could not compute AEP (diff) delta: "+e.getMessage()+".]"); }
2260 /* Fall through to send full AEP. */
2261 }
2262 catch(final Exception e)
2263 {
2264 logger.log("ERROR: unexpected exception creating AEP diff: " + e.getMessage());
2265 /* Fall through to send full AEP. */
2266 }
2267 }
2268 }
2269
2270 // CANNOT COMPUTE A DIFF...
2271 // So try to return/compute/cache a full response.
2272 if(cachedFullAEPResponse != null)
2273 {
2274 // If result is compressed at level higher than caller can handle
2275 // then we must abort.
2276 if(cachedFull.second.first.getLevel() > cl.getLevel())
2277 { throw new IOException("compression level too high for client"); }
2278
2279 // Good, can use cached packet...
2280 logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): returning cached AEP (diff) response packet: "+cachedFull.second+" to client "+clientAddr+".]");
2281 return(cachedFull.second.second);
2282 }
2283
2284 // Compute a new full AEP response packet...
2285 // Super-compressed full AEP; null if not (yet) available/computed.
2286 logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computing super-compressed AEP response packet: (longHash="+aepFromUpstream.longHash+") for client "+clientAddr+".]");
2287
2288 // Discard known-defunct old value possibly allowing GC.
2289 cache._AEP_extracomp_response = null;
2290 cachedFullAEPResponse = null; // Help GC.
2291
2292 final CompressionLevel levelUsed = MAX_AEP_DIFF_COMP_LEVEL;
2293 // If result would be compressed at level higher than caller can handle
2294 // then we must abort.
2295 if(levelUsed.getLevel() > cl.getLevel())
2296 { throw new IOException("compression level too high for client"); }
2297
2298 // Computing a full super-compressed AEP may be very memory intensive.
2299 // We do not assume that unlimited resources are available.
2300 // We estimate the working memory required as a fixed overhead
2301 // plus about twice the estimated uncompressed size of the serialised data
2302 // which allows for compression dictionaries plus the actual output size.
2303 final AtomicReference<RawPacket> responseAR = new AtomicReference<RawPacket>();
2304 if(!MemoryTools.runMemoryIntensiveOperation(new Runnable(){
2305 public void run()
2306 {
2307 // Prepare the output packet...
2308 final ByteArrayOutputStream boas = new ByteArrayOutputStream(1 << 10);
2309 final DataOutputStream dos = new DataOutputStream(boas);
2310 try
2311 {
2312 dos.writeUTF(levelUsed.name());
2313 final ObjectOutputStream oos = new ObjectOutputStream(GenUtils.wrapForCompression(dos, levelUsed));
2314 oos.writeObject(aepFromUpstream);
2315 oos.close();
2316 }
2317 catch(final IOException e) { throw new Error("unexpected IOException when building super-compressed AEP", e); }
2318 // We intern() the (probably very large) response
2319 // to try to avoid holding duplicates
2320 // and to reduce old-generation heap churn.
2321 responseAR.set(MemoryTools.intern(new RawPacket(
2322 RawPacket.OpCode.GetAllExhibitPropertiesDiff,
2323 boas.toByteArray(),
2324 false))); // Already compressed.
2325 logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computed super-compressed AEP response packet: (length="+boas.size()+", compression="+levelUsed+") for client "+clientAddr+".]");
2326 }
2327 },
2328 false, // Definitely not unlimited resources on the server!
2329 (1<<20) /* 1MB overhead */ + 2*aepFromUpstream.estimateSerialBytes()))
2330 { throw new IOException("not currently enough memory to attempt to build super-compressed AEP"); }
2331
2332 final RawPacket response = responseAR.get();
2333 if(null == response) { throw new IOException("failed to build super-compressed AEP"); }
2334 final Tuple.Pair<CompressionLevel, RawPacket> newFull = new Tuple.Pair<CompressionLevel,RawPacket>(levelUsed, response);
2335 cache.currentAEPResponseCreatedOrLastUsed = System.currentTimeMillis(); // Keep cached response below alive...
2336 cache._AEP_extracomp_response = new Tuple.Pair<Long, Tuple.Pair<CompressionLevel,RawPacket>>(new Long(aepFromUpstream.longHash), newFull);
2337
2338 /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computed and cached supecompressed AEP (length="+levelUsed+") response packet: "+newFull+".]"); }
2339
2340 // // If result is compressed at level higher than caller can handle
2341 // // then we must abort.
2342 // if(tooCompressedForClient)
2343 // { throw new IOException("compression level too high for client"); }
2344
2345 // Return it...
2346 return(response);
2347 }
2348
2349 finally { cache.slowResponseLock.unlock(); }
2350 }
2351
2352 /**Handle an incoming GetAllExhibitProperties request; never null.
2353 * Holds the 'slow response' lock while running
2354 * to exclude other such intensive/slow responses concurrently.
2355 */
2356 private static RawPacket handleGetAllExhibitProperties(
2357 final SimpleExhibitPipelineIF source,
2358 final RawPacket reqPacket,
2359 final String clientAddr,
2360 final HIRPCCache cache,
2361 final SimpleLoggerIF logger)
2362 throws IOException
2363 {
2364 final long hash = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2365 readLong();
2366 final AllExhibitProperties aep = source.getAllExhibitProperties(hash);
2367
2368 // Simple case; nothing to return since caller is up to date,
2369 // or we allow concurrent expensive calls.
2370 if(aep == null)
2371 {
2372 // Trivial null case.
2373 return(new RawPacket(RawPacket.OpCode.GetAllExhibitProperties));
2374 }
2375 if(cache == null)
2376 {
2377 try
2378 {
2379 // Where the AEP is not null, force compressed format,
2380 // stream-serialised to minimise peak/intermediate memory footprint.
2381 return(MemoryTools.runMemoryIntensiveOperation((new Callable<RawPacket>(){
2382 public RawPacket call() throws IOException
2383 {
2384 return(RawPacket.streamSerialiseObject(
2385 RawPacket.OpCode.GetAllExhibitProperties,
2386 aep));
2387 }
2388 }),
2389 aep.estimateSerialBytes())); // Take a guess as to memory space required.
2390 }
2391 catch(final IOException e) { throw e; }
2392 catch(final RuntimeErrorException e) { throw e; }
2393 catch(final Exception e) { throw new Error("unexpected exception", e); }
2394 }
2395
2396
2397 // If we already have a suitable response cached
2398 // ie for the very same AEP logHash
2399 // then return it immediately.
2400 // We assume therefore that the entire packet will
2401 // be the same for the same response data
2402 // regardless of client, etc.
2403 //
2404 // (This does mean that the caller may miss some
2405 // internally cached updates within the AEP instance, etc,
2406 // but nothing that it cannot recompute if needed.)
2407 final Tuple.Pair<Long, RawPacket> cached; // Don't retain value preventing GC...
2408 if(((cached = cache._AEP_response) != null) && (aep.longHash == cached.first.longValue()))
2409 {
2410 // Good, can use cached packet...
2411 assert(cached.second != null);
2412 cache.currentAEPResponseCreatedOrLastUsed = System.currentTimeMillis(); // Keep cached response below alive...
2413 logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): returning cached AEP response packet: "+cached.second+" to client "+clientAddr+".]");
2414 return(cached.second);
2415 }
2416
2417 // If the response is other than null
2418 // and we reject concurrent expensive calls,
2419 // then veto it if another such call is already in progress.
2420 if(!cache.slowResponseLock.tryLock())
2421 {
2422 logger.log("INFO: ExhibitDataTunnelSource.handleInboundRPC(): GetAllExhibitProperties: already in progress: aborting...");
2423 throw new IOException("expensive call already in progress, please retry");
2424 }
2425 // Create and send the response packet
2426 // (empty for null response,
2427 // else serialised form, compressed when helpful).
2428 try
2429 {
2430 // Clear defunct value while computing the new one.
2431 cache._AEP_response = null;
2432
2433 if(aep != null) { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): creating/cacheing/returning packet-compressed AEP response packet (longHash="+aep.longHash+") to client "+clientAddr+".]"); }
2434 // Compute the response packet...
2435 // Where the AEP is not null, force compressed format,
2436 // stream-serialised to minimise peak/intermediate memory footprint.
2437 RawPacket response;
2438 try
2439 {
2440 // Where the AEP is not null, force compressed format,
2441 // stream-serialised to minimise peak/intermediate memory footprint.
2442 response = MemoryTools.runMemoryIntensiveOperation((new Callable<RawPacket>(){
2443 public RawPacket call() throws IOException
2444 {
2445 return(RawPacket.streamSerialiseObject(
2446 RawPacket.OpCode.GetAllExhibitProperties,
2447 aep));
2448 }
2449 }),
2450 aep.estimateSerialBytes()); // Take a guess as to memory space required.
2451 }
2452 catch(final IOException e) { throw e; }
2453 catch(final RuntimeErrorException e) { throw e; }
2454 catch(final Exception e) { throw new Error("unexpected exception", e); }
2455
2456
2457 // Cache it if the AEP is not null.
2458 if(aep != null)
2459 {
2460 // We intern() the (probably very large) response
2461 // to try to avoid holding duplicates
2462 // and to reduce old-generation heap churn.
2463 response = MemoryTools.intern(response);
2464 cache.currentAEPResponseCreatedOrLastUsed = System.currentTimeMillis(); // Keep cached response below alive...
2465 cache._AEP_response = new Tuple.Pair<Long,RawPacket>(new Long(aep.longHash), response);
2466 logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): created/cached/returning packet-compressed AEP response packet (longHash="+aep.longHash+", actual payload length="+response.getActualPayloadLength()+") to client "+clientAddr+".]");
2467 }
2468 // Return it...
2469 return(response);
2470 }
2471 finally { cache.slowResponseLock.unlock(); }
2472 }
2473
2474 /**Handle an incoming GetGenProps request; never null. */
2475 private static RawPacket handleGetGenProps(
2476 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2477 throws IOException
2478 {
2479 final long stamp = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2480 readLong();
2481 final GenProps gp = source.getGenProps(stamp);
2482 // Create and send the response packet
2483 // (fixed empty result for efficient null response,
2484 // else serialised form, compressed when helpful).
2485 if(null == gp) { return(FixedEmptyFrames.RAW_PACKET_GP_NULL);}
2486 // Non-null value to return.
2487 return(new RawPacket(
2488 RawPacket.OpCode.GetGenProps,
2489 gp,
2490 true)); // Attempt compression...
2491 }
2492
2493 /**Handle an incoming GetStaticAttr request; never null. */
2494 private static RawPacket handleGetStaticAttr(
2495 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2496 throws IOException
2497 {
2498 final String name = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2499 readUTF();
2500 final ExhibitStaticAttr esa;
2501 try { esa = source.getAllExhibitImmutableData(-1).getStaticAttr(name); }
2502 catch(final IllegalArgumentException e)
2503 { throw new IOException("invalid argument: bad exhibit name"); }
2504 if(esa == null)
2505 { throw new IOException("invalid argument: non-existent exhibit"); }
2506 // Create and send the response packet
2507 // (empty for null response,
2508 // else serialised form, compressed when helpful).
2509 return(new RawPacket(
2510 RawPacket.OpCode.GetStaticAttr,
2511 esa,
2512 true)); // Allow compression.
2513 }
2514
2515 /**Handle an incoming GetEventValues request; never null.
2516 * Request is:
2517 * <ul>
2518 * <li>A byte consisting of the ordinal of period/interval enum val.
2519 * <li>The UTF-8 representation of the name of the definition.
2520 * <li>The interval number (8 bytes).
2521 * <li>The serialised form of the request BitSet.
2522 * </ul>
2523 * <p>
2524 * Result is:
2525 * <ul>
2526 * <li>The in-order array of EventVariableValues (never null).
2527 * </ul>
2528 */
2529 private static RawPacket handleGetEventValues(
2530 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2531 throws IOException
2532 {
2533 final DataInputStream dis = new DataInputStream(reqPacket.getPayloadAsInputStream());
2534 final int ordinal = dis.readUnsignedByte();
2535 final EventPeriod epv[] = EventPeriod.values();
2536 if(ordinal >= epv.length)
2537 { throw new IOException("corrupt arguments: bad period ordinal"); }
2538 final EventPeriod intervalSelector = epv[ordinal];
2539 final String name = dis.readUTF();
2540 final SimpleVariableDefinition def = SystemVariables.nameToDef.get(name);
2541 // Deal kindly with possibly-new definition
2542 // (ie if there are newer clients than us)
2543 // by just returning an empty result.
2544 if(def == null)
2545 {
2546 return(new RawPacket(
2547 RawPacket.OpCode.GetEventValues,
2548 NO_EVENT_VALUES,
2549 false));
2550 }
2551 if(!def.isEvent() || def.isLocal())
2552 { throw new IOException("corrupt arguments: bad definition type: "+def); }
2553 final long intervalNumber = dis.readLong();
2554 if(intervalNumber < 0)
2555 { throw new IOException("corrupt arguments: bad intervalNumber"); }
2556
2557 // If there is trailing data then it is the serialised BitSet
2558 // else this is a null BitSet equivalent to just bit-0 true.
2559 final BitSet whichValues;
2560 if(dis.available() != 0)
2561 {
2562 final ObjectInputStream ois = new ObjectInputStream(dis);
2563 final Object o; // = null;
2564 try { o = ois.readObject(); }
2565 catch(final ClassNotFoundException e)
2566 { throw new IOException("corrupt arguments: class not found: " + e.getMessage()); }
2567 if((o != null) && !(o instanceof BitSet)) // Allow null.
2568 { throw new IOException("corrupt arguments: bad whichValues class on decode"); }
2569 whichValues = (BitSet) o;
2570 }
2571 else
2572 { whichValues = null; }
2573 assert(dis.available() == 0) : "There must be no excess/trailing data in getEventValues() RPC";
2574
2575 // Call upstream to actually get any values that we have...
2576 final EventVariableValue result[] =
2577 source.getEventValues(def, intervalSelector, intervalNumber, whichValues);
2578
2579 // Return the serialised result array.
2580 return(new RawPacket(
2581 RawPacket.OpCode.GetEventValues,
2582 result,
2583 true)); // Compression may be very effective.
2584 }
2585
2586 /**Handle an incoming GetVariables request; never null. */
2587 private static RawPacket handleGetVariables(
2588 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2589 throws IOException
2590 {
2591 final long changedSince = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2592 readLong();
2593 // Get values.
2594 final List<SimpleVariableValue> l = new ArrayList<SimpleVariableValue>(Arrays.asList(
2595 source.getVariables(changedSince)));
2596 // Return only non-local variables.
2597 for(final Iterator<SimpleVariableValue> it = l.iterator(); it.hasNext(); )
2598 {
2599 final SimpleVariableValue svv =
2600 it.next();
2601 if(svv.getDef().isLocal())
2602 { it.remove(); }
2603 }
2604 // Sort by variable name for better compression on the wire.
2605 Collections.sort(l, SimpleVariableValue.compByDef);
2606 // Convert to array.
2607 final SimpleVariableValue svvs[] =
2608 new SimpleVariableValue[l.size()];
2609 l.toArray(svvs);
2610 return(new RawPacket(
2611 RawPacket.OpCode.GetVariables,
2612 svvs,
2613 true)); // Compression may be very effective.
2614 }
2615
2616 /**Handle an incoming SetVariables request; never null.
2617 * NOTE: for security/sanity reasons,
2618 * reject any attempt to set where the value does not
2619 * have a credible globalMap of exactly size == 1.
2620 * <p>
2621 * We'll validate the entire set of values,
2622 * and reject the whole lot if any are dubious.
2623 */
2624 private static RawPacket handleSetVariables(
2625 final SimpleExhibitPipelineIF source, final RawPacket reqPacket,
2626 final String clientAddr, final SimpleLoggerIF logger)
2627 throws IOException
2628 {
2629 // Decode array of variable values to apply.
2630 final SimpleVariableValue svvs[] = (SimpleVariableValue[])
2631 reqPacket.getSerializedObjectPayload();
2632
2633 // Get our local ID so that we can veto changes
2634 // claiming to be from us for sanity/security.
2635 // If we get a null (not definition currently)
2636 // this will not cause an exception to be thrown,
2637 // we just won't be able to screen.
2638 final SimpleVariableValue ourID =
2639 source.getVariable(SystemVariables.LOCAL_SYS_ID);
2640 final InstanceID iid = (ourID == null) ? null:
2641 ((InstanceID) ourID.getValue());
2642
2643 // Validate the requests.
2644 // Silently ignore:
2645 // * null entries
2646 // * locals
2647 // * anything with a definition that we don't have
2648 // * anything without a globalMap of size 1
2649 // * TODO: anything claiming to be from our system ID
2650 // * claims to be setting values from multiple client IDs
2651 //
2652 // We ignore invalid individual entries rather than
2653 // rejecting the entire request for robustness if talking to
2654 // a slightly different age client to eliminate any
2655 // variable value that has no local definition,
2656 // but still apply the rest of the values,
2657 // though we still reject obviously bogus values.
2658 //
2659 // Capture the client system ID in passing
2660 // (each variable value should have a globalMap with
2661 // exactly one entry, which is the client's ID).
2662 final InstanceID clientID = null;
2663 for(int i = svvs.length; --i >= 0; )
2664 {
2665 final SimpleVariableValue svv = svvs[i];
2666
2667 // Reject obviously bogus values.
2668 if((svv == null) ||
2669 svv.getDef().isLocal() ||
2670 (svv.getGlobalMap() == null) ||
2671 (svv.getGlobalMap().size() != 1))
2672 { throw new IllegalArgumentException(); }
2673
2674 try {
2675 // Quietly ignore variables that we don't (currently)
2676 // have definitions for locally.
2677 if(!SystemVariables.defs.contains(svv.getDef()))
2678 {
2679 logger.log("Ignoring tunnelled setVariable("+svv.getDef()+") with no local definition");
2680 continue;
2681 }
2682
2683 // Get the system that *is* in the globalMap...
2684 final InstanceID putativeClientID =
2685 svv.getGlobalMap().keySet().iterator().next();
2686 // Abort if it is our ID!
2687 if(clientID == null)
2688 {
2689 // OK, this must be from the first variable,
2690 // so trigger some one-off processing...
2691
2692 // Discard obviously-forged / looped values.
2693 if((iid != null) &&
2694 iid.equals(putativeClientID))
2695 {
2696 logger.log("Ignoring tunnelled setVariable("+svv.getDef()+") with bad/looped IID: "+iid);
2697 continue;
2698 }
2699
2700 // Note the client's remote address
2701 // (ie make an entry in an upstream global Map)...
2702 final SimpleVariableValue newValue = new SimpleVariableValue(
2703 SystemVariables.TunnelServlet_SLAVE_ADDRS,
2704 clientAddr);
2705 source.setVariable(newValue.put(putativeClientID,
2706 newValue,
2707 true));
2708 }
2709 else
2710 {
2711 if(!clientID.equals(putativeClientID))
2712 {
2713 logger.log("Stopping before accepting tunnelled setVariable("+svv.getDef()+"): client claiming multiple InstanceIDs!");
2714 break; // Give up now...
2715 }
2716 }
2717 }
2718 catch(final IllegalArgumentException e)
2719 {
2720 logger.log("Ignoring tunnelled setVariable("+svv.getDef()+") that got IllegalArgumentException: " + e.getMessage());
2721 continue;
2722 }
2723 catch(final UnsupportedOperationException e)
2724 {
2725 logger.log("Ignoring tunnelled setVariable("+svv.getDef()+") that got UnsupportedOperationException: " + e.getMessage());
2726 continue;
2727 }
2728 }
2729
2730 // Apply the values, in the order received...
2731 final int nSet = source.setVariables(svvs);
2732
2733 // Create and send the response packet.
2734 return(new RawPacket(
2735 RawPacket.OpCode.SetVariables,
2736 intSer(nSet),
2737 false)); // Don't try compression; none to be had...
2738 }
2739
2740 /**Handle an incoming GetVariable request; never null. */
2741 private static RawPacket handleGetVariable(
2742 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2743 throws IOException
2744 {
2745 final String name = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2746 readUTF();
2747 // Look up variable definition from name.
2748 final SimpleVariableDefinition def =
2749 SystemVariables.nameToDef.get(name);
2750 // If no such variable,
2751 // or it is a local,
2752 // then immediately return an empty result.
2753 if((def == null) || def.isLocal())
2754 {
2755 return(new RawPacket(RawPacket.OpCode.GetVariable));
2756 }
2757 // Get the variable value (if any)...
2758 // Note that this may include a big globalMap.
2759 final SimpleVariableValue svv =
2760 source.getVariable(def);
2761 // Create and send the response packet
2762 // (empty for null response,
2763 // else serialised form, compressed when helpful).
2764 return(new RawPacket(
2765 RawPacket.OpCode.GetVariable,
2766 svv,
2767 true)); // Allow compression.
2768 }
2769
2770 /**If true then never pass through a request to cache content locally.
2771 * Content may be cached anyway,
2772 * but avoiding passing through a cache request here may help break request 'loops'
2773 * and/or conserve master cache space for example.
2774 */
2775 private static final boolean BLOCK_RAW_FILE_CACHEING = true;
2776
2777 /**Handle an incoming GetRawFile request; never null.
2778 * We satisfy the initial portion of over-length requests.
2779 * <p>
2780 * FIXME: decide whether to honour a false dontCache flag or not
2781 */
2782 private static RawPacket handleGetRawFile(
2783 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2784 throws IOException
2785 {
2786 // Decode the request directly from the payload (without a copy operation if possible).
2787 final byte[] payload = reqPacket.getPayload();
2788
2789 // Validate for reasonable-size request packet.
2790 if(payload.length < 11 + ExhibitName.MIN_NAME_LENGTH)
2791 { throw new IOException("invalid argument: bad exhibit name"); }
2792
2793 // First the exhibit name as-if a UTF String value (but we know it to be all 7-bit).
2794 final int nameLen = (((payload[0] & 0xff) << 8) + (payload[1] & 0xff));
2795 if((nameLen < ExhibitName.MIN_NAME_LENGTH) || (nameLen > ExhibitName.MAX_NAME_LENGTH))
2796 { throw new IOException("invalid argument: bad exhibit name length"); }
2797 final CharSequence name = new WrappedByteArrayCharSequence(payload, 2, nameLen);
2798 // Look up (validate) name
2799 // and efficiently convert to (or get) Name.ExhibitFull
2800 // and get size bound
2801 // all in one step...
2802 final ExhibitStaticAttr esa;
2803 try { esa = source.getAllExhibitImmutableData(-1).getStaticAttr(name); }
2804 catch(final IllegalArgumentException e)
2805 { throw new IOException("invalid argument: bad exhibit name"); }
2806 if(esa == null)
2807 { throw new IOException("invalid argument: non-existent exhibit"); }
2808 final int start =
2809 ((payload[2 + nameLen] ) << 24) +
2810 ((payload[3 + nameLen] & 0xff) << 16) +
2811 ((payload[4 + nameLen] & 0xff) << 8) +
2812 ((payload[5 + nameLen] & 0xff) );
2813 final int afterEndRaw =
2814 ((payload[6 + nameLen] ) << 24) +
2815 ((payload[7 + nameLen] & 0xff) << 16) +
2816 ((payload[8 + nameLen] & 0xff) << 8) +
2817 ((payload[9 + nameLen] & 0xff) );
2818 // Coerce afterEnd into range, and allow zero-length requests.
2819 final int afterEnd = (int) Math.min(afterEndRaw, esa.length);
2820 final boolean dontCache = BLOCK_RAW_FILE_CACHEING || (payload[10 + nameLen] != 0);
2821
2822 // Disallow patently absurd values, eg due to request-packet corruption.
2823 if((start < 0) || (afterEnd < start))
2824 { throw new IOException("invalid argument: bad range"); }
2825 final int len = afterEnd - start;
2826 if(len > MAX_USER_READ_SIZE)
2827 { throw new IOException("invalid argument: request too large"); }
2828
2829 // Fetch the data from upstream...
2830 final byte rawData[] = new byte[len];
2831 final ByteBuffer buf = ByteBuffer.wrap(rawData);
2832 // Fetch whatever we quickly can.
2833 source.getRawFile(buf, esa.getExhibitFullName(), start, dontCache);
2834 // Flip the buffer to nominally switch to reading from it.
2835 buf.flip();
2836
2837 // Create and send the response packet containing raw exhibit data.
2838 //
2839 // Useful compression may be possible for some exhibit fragments/types,
2840 // but here we are slightly uncomfortable with
2841 // the potential vulnerability to invisible corruption of binary data
2842 // on the core (and non-re-compressible) exhibit type (JPEG)
2843 // with no application-level checksum on these frames,
2844 // so we simply avoid trying to compress *.jpg file data for now.
2845 return(new RawPacket(
2846 RawPacket.OpCode.GetRawFile,
2847 rawData, buf.limit(),
2848 !TextUtils.endsWith(esa.getCharSequence(), ".jpg")));
2849 }
2850
2851 /**Handle an incoming GetThumbnails request; never null. */
2852 private static RawPacket handleGetThumbnails(
2853 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2854 throws IOException
2855 {
2856 // Decode the request directly from the payload (without a copy operation if possible).
2857 final byte[] payload = reqPacket.getPayload();
2858 // Validate for reasonable-size request packet.
2859 if(payload.length < 3 + ExhibitName.MIN_NAME_LENGTH)
2860 { throw new IOException("invalid argument: bad exhibit name"); }
2861 // First the exhibit name as-if a UTF String value (but we know it to be all 7-bit).
2862 final int nameLen = (((payload[0] & 0xff) << 8) + (payload[1] & 0xff));
2863 if((nameLen < ExhibitName.MIN_NAME_LENGTH) || (nameLen > ExhibitName.MAX_NAME_LENGTH))
2864 { throw new IOException("invalid argument: bad exhibit name length"); }
2865 final CharSequence name = new WrappedByteArrayCharSequence(payload, 2, nameLen);
2866 final boolean create = (payload[2 + nameLen] != 0);
2867
2868 // final DataInputStream dis = new DataInputStream(reqPacket.getPayloadAsInputStream());
2869 // final String name = dis.readUTF();
2870 // final boolean create = dis.readBoolean();
2871 // dis.close(); // Hopefully free some resources quickly.
2872
2873 final ExhibitStaticAttr esa;
2874 try { esa = source.getAllExhibitImmutableData(-1).getStaticAttr(name); }
2875 catch(final IllegalArgumentException e)
2876 { throw new IOException("invalid argument: bad exhibit name"); }
2877 if(esa == null)
2878 { throw new IOException("invalid argument: non-existent exhibit"); }
2879
2880 final ExhibitThumbnails eth =
2881 source.getThumbnails(esa.getExhibitFullName(), create);
2882
2883 // Create and send the response packet
2884 // (empty for null response, else serialised form).
2885 // We allow compression because it may save a little bandwidth.
2886 return(new RawPacket(
2887 RawPacket.OpCode.GetThumbnails,
2888 eth,
2889 true)); // Allow compression to be attempted, though probably marginal at best.
2890 }
2891
2892 /**Handle an incoming GetAllExhibitImmutableData request; never null.
2893 * Holds the 'slow response' lock while running
2894 * to exclude other such intensive/slow responses concurrently.
2895 */
2896 private static RawPacket handleGetAllExhibitImmutableData(
2897 final SimpleExhibitPipelineIF source, final RawPacket reqPacket,
2898 final HIRPCCache cache, final SimpleLoggerIF logger)
2899 throws IOException
2900 {
2901 final long stamp = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2902 readLong();
2903 final AllExhibitImmutableData aeid =
2904 source.getAllExhibitImmutableData(stamp);
2905
2906 // Simple case; nothing to return since caller is up to date,
2907 // or we allow concurrent expensive calls.
2908 if((aeid == null) || (cache == null))
2909 {
2910 return(new RawPacket(
2911 RawPacket.OpCode.GetAllExhibitImmutableData,
2912 aeid,
2913 true)); // Allow compression.
2914 }
2915
2916 // If the response is other than null
2917 // and we reject concurrent expensive calls,
2918 // then veto it if another such call is already in progress.
2919 if(!cache.slowResponseLock.tryLock())
2920 {
2921 logger.log("INFO: ExhibitDataTunnelSource.handleInboundRPC(): GetAllExhibitImmutableData: already in progress: aborting...");
2922 throw new IOException("expensive call already in progress, please retry");
2923 }
2924 // Create and send the response packet
2925 // (empty for null response,
2926 // else serialised form, compressed when helpful).
2927 try
2928 {
2929 return(new RawPacket(
2930 RawPacket.OpCode.GetAllExhibitImmutableData,
2931 aeid,
2932 true)); // Allow compression.
2933 }
2934 finally { cache.slowResponseLock.unlock(); }
2935 }
2936
2937 /**Handle an incoming GetStratum request; never null. */
2938 private static RawPacket handleGetStratum(
2939 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2940 throws IOException
2941 {
2942 final Stratum stRaw = source.getStratum();
2943
2944 // Just before returning the Stratum info,
2945 // generate an amended value to reflect our conservation status if necessary
2946 // so that it's logically right 'on the wire'.
2947 final Stratum st;
2948 if(GenUtils.mustConservePower() == stRaw.isUpstreamConserving())
2949 { st = stRaw; } // Will do as-is!
2950 else
2951 {
2952 // Flip conserving flag to correctly reflect this instance's state as the 'upstream' server.
2953 st = new Stratum(stRaw.getStratum(), stRaw.getRootDelay(), stRaw.getUpstreamName(),
2954 !stRaw.isUpstreamConserving());
2955 }
2956
2957 // Create and send the response packet.
2958 return(new RawPacket(
2959 RawPacket.OpCode.GetStratum,
2960 st,
2961 false)); // Don't attempt compression (probably won't work and could inflate RTT estimates).
2962 }
2963
2964
2965 /**Immutable raw packet to send in either direction over a byte stream connection.
2966 * This consists of an op-code byte,
2967 * a byte-array payload,
2968 * and a trailer byte (different to any op-code byte).
2969 * <p>
2970 * We informally attempt to keep a decent Hamming distance
2971 * between different op-codes (ie more than one bit at a time
2972 * should have to change to mutate one into another).
2973 * <p>
2974 * The byte array can be sent compressed (using the gzip
2975 * algorithm), and this compression is adaptive, ie if
2976 * compression does not save space then it is not used.
2977 * The assumption if compression is requested is that it
2978 * is worthwhile burning lots of CPU cycles to compress
2979 * the data as far as possible, and so we may attempt very
2980 * CPU-hungry settings. Compressed data is sent on the
2981 * wire with the length actually -compressedDataLength.
2982 * <p>
2983 * This NOT intended to be directly Serializable.
2984 * <p>
2985 * NOTE: this is only guaranteed immutable if the payload is not tampered with
2986 * after being passed to the constructor.
2987 */
2988 public static final class RawPacket implements MemoryTools.Internable
2989 {
2990 /**Operation code: NO-OP (no operation). */
2991 public static final byte OP_NOOP = 0;
2992
2993 /**Operation code: getGenProps(). */
2994 public static final byte OP_getGenProps = 3;
2995
2996 /**Operation code: getAllExhibitImmutableData(). */
2997 public static final byte OP_getAllExhibitImmutableData = 5;
2998
2999 /**Operation code: getStaticAttr(). */
3000 public static final byte OP_getStaticAttr = 9;
3001
3002 /**Operation code: getRawFile(). */
3003 public static final byte OP_getRawFile = 10;
3004
3005 /**Operation code: getGenSecProps(). */
3006 public static final byte OP_getGenSecProps = 13;
3007
3008 /**Operation code: getAllExhibitProperties(). */
3009 public static final byte OP_getAllExhibitProperties = 17;
3010 /**Operation code: getAllExhibitPropertiesDiff(). */
3011 public static final byte OP_getAllExhibitPropertiesDiff = 18;
3012
3013 /**Operation code: getThumbnails(). */
3014 public static final byte OP_getThumbnails = 23;
3015
3016 /**Operation code: getVariable(). */
3017 public static final byte OP_getVariable = 26;
3018
3019 /**Operation code: getVariables(). */
3020 public static final byte OP_getVariables = 29;
3021
3022 /**Operation code: setVariables(). */
3023 public static final byte OP_setVariables = 30;
3024
3025 /**Operation code: syncVariables(). */
3026 public static final byte OP_syncVariables = 33;
3027
3028 // /**Operation code: getEventValue(). */
3029 // public static final byte OP_getEventValue = 38;
3030
3031 /**Operation code: getEventValues(). */
3032 public static final byte OP_getEventValues = 41;
3033
3034 /**Operation code: getStratum(). */
3035 public static final byte OP_getStratum = 46;
3036
3037
3038 /**Reserved op-code for use in response packets to indicate that the operation was interrupted and can be retried (not an error). */
3039 public static final byte OP__INTEX = 119;
3040
3041 /**Reserved op-code for use in response packets to indicate a PGMasterNotInServiceException. */
3042 public static final byte OP__PGMNISEX = 121;
3043
3044 /**Reserved op-code for use in response packets to indicate a RemoteException. */
3045 public static final byte OP__REMEX = 125;
3046
3047 /**Reserved op-code for use in response packets to indicate a RuntimeException.
3048 * Receipt of one of these DOES NOT indicate a link failure
3049 * (and thus need to back off use of the link).
3050 */
3051 public static final byte OP__RUNTEX = 126;
3052
3053 public static enum OpCode
3054 {
3055 NOOP(OP_NOOP),
3056 GetGenProps(OP_getGenProps),
3057 GetAllExhibitImmutableData(OP_getAllExhibitImmutableData),
3058 GetStaticAttr(OP_getStaticAttr),
3059 GetRawFile(OP_getRawFile),
3060 GetGenSecProps(OP_getGenSecProps),
3061 GetAllExhibitProperties(OP_getAllExhibitProperties),
3062 GetAllExhibitPropertiesDiff(OP_getAllExhibitPropertiesDiff),
3063 GetThumbnails(OP_getThumbnails),
3064 GetVariable(OP_getVariable),
3065 GetVariables(OP_getVariables),
3066 SetVariables(OP_setVariables),
3067 SyncVariables(OP_syncVariables),
3068 GetEventValues(OP_getEventValues),
3069 GetStratum(OP_getStratum),
3070 INTEX(OP__INTEX),
3071 PGMNISEX(OP__PGMNISEX),
3072 REMEX(OP__REMEX),
3073 RUNTEX(OP__RUNTEX);
3074
3075 private OpCode(final byte code) { this.code = code; }
3076
3077 /**The byte code; normal values are small and positive.
3078 * We try to maintain a good Hamming distance between any two codes.
3079 */
3080 private final byte code;
3081
3082 /**Get the "closeness" factor higher meaning closer; strictly positive. */
3083 public final byte getCode() { return(code); }
3084
3085 /**Constant-time lookup from (unsigned) byte value to enum.
3086 * Created on first use.
3087 */
3088 private static final class LookupCache
3089 {
3090 private LookupCache() { /* No instances needed. */ }
3091 static final OpCode lookup[] = new OpCode[256];
3092 static
3093 {
3094 for(final OpCode oc : OpCode.values())
3095 {
3096 final int index = oc.getCode() & 0xff;
3097 assert(lookup[index] == null) : "all OpCode values must be unique";
3098 lookup[index] = oc;
3099 }
3100 }
3101 }
3102
3103 /**Look up from byte to enum value, else null if no matching enum value. */
3104 public static OpCode lookupCode(final byte code)
3105 {
3106 return(LookupCache.lookup[code & 0xff]); // Constant-time lookup.
3107 // // Above should be equivalent to this (slow) linear lookup...
3108 // for(final OpCode oc : OpCode.values())
3109 // { if(oc.code == code) { return(oc); } }
3110 // return(null); // None found.
3111 }
3112 }
3113
3114
3115 /**Trailer byte; different to all valid op-code byte values. */
3116 public static final byte TRAILER = (byte) 0xda;
3117 /**Verify that TRAILER is indeed not a valid op-code. */
3118 static { assert(null == OpCode.lookupCode(TRAILER)); }
3119
3120
3121 /**Minimum size of payload for which we will try heaviest compression modes; strictly positive.
3122 * This is for modes such as BZIP2 or LZMA with heavy memory, CPU and
3123 * start-up costs (ie much heavier than GZIP).
3124 * <p>
3125 * Below this we assume that the absolute bandwidth savings
3126 * are not worth the pain.
3127 * <p>
3128 * Probably bigger than a "bulk transfer" block too,
3129 * so that the heavy guns are wielded only for the biggest frames.
3130 */
3131 public static final int MIN_PLLENGTH_FOR_HEAVY_COMP_ALGS = 64 * 1024;
3132
3133 /**Opcode (single byte); never null.
3134 * Set first in the raw frame on the wire.
3135 * <p>
3136 * Never numerically equal to TRAILER.
3137 */
3138 public final OpCode opCode;
3139
3140 /**Frame overhead (header and trailer) in bytes; strictly positive. */
3141 public static final int FRAME_OVERHEAD_BYTES = 6;
3142
3143 /**Maximum permitted total frame length including header/trailer; strictly positive power of two minus the frame overhead.
3144 * Set much larger than the user-level transfer-size limit
3145 * to allow for some control data overhead on top of the given data,
3146 * and for much larger non-user-controlled items such as the AEP.
3147 * <p>
3148 * This limit mainly exists to prevent DoS-style out-of-memory problems
3149 * arising from corrupt (huge) length values.
3150 */
3151 public static final int MAX_FRAME_SIZE = 128*1024*1024;
3152
3153 /**Maximum permitted payload length; strictly positive power of two minus the frame overhead.
3154 * Set much larger than the user-level transfer-size limit
3155 * to allow for some control data overhead on top of the given data,
3156 * and for much larger non-user-controlled items such as the AEP.
3157 * <p>
3158 * This mainly exists to prevent DoS-style out-of-memory problems
3159 * arising from corrupt (huge) length values.
3160 */
3161 public static final int MAX_PAYLOAD_SIZE = MAX_FRAME_SIZE - FRAME_OVERHEAD_BYTES;
3162 /**Check that we really do allow room for the maximum user-data transfer and then some. */
3163 static { assert(MAX_PAYLOAD_SIZE > 1024+SimpleExhibitPipelineIF.MAX_USER_READ_SIZE); }
3164
3165 /**Payload data, non-null unless the data is stored compressed.
3166 * Private in order to keep the whole packet immutable.
3167 */
3168 private final byte payload[];
3169
3170 /**Compressed payload data, null unless the data is stored compressed.
3171 * Private in order to keep the whole packet immutable.
3172 */
3173 private final byte payloadCompressed[];
3174
3175 /**Trailer byte; not a valid op-code. */
3176 public static final byte trailer = TRAILER;
3177
3178 /**Depends on the whole packet being identical.
3179 * This depends on the header and data being identical,
3180 * including being compressed in the same way if at all.
3181 */
3182 @Override public boolean equals(final Object obj)
3183 {
3184 if(obj == this) { return(true); }
3185 if(!(obj instanceof RawPacket)) { return(false); }
3186 final RawPacket other = (RawPacket) obj;
3187
3188 if(opCode != other.opCode) { return(false); }
3189 if(getActualPayloadLength() != other.getActualPayloadLength()) { return(false); }
3190
3191 // Checking the data for equality may be very slow...
3192 if(!Arrays.equals(payloadCompressed, other.payloadCompressed)) { return(false); }
3193 if(!Arrays.equals(payload, other.payload)) { return(false); }
3194
3195 return(true); // Yes, identical.
3196 }
3197
3198 /**Checks for equivalent raw packets, ignoring internal representation details.
3199 * Mainly this means ignoring whether the data is held compressed or not,
3200 * as long as the values that the user can see are identical.
3201 * <p>
3202 * May be very slow and memory intensive.
3203 */
3204 public boolean equivalentTo(final RawPacket other)
3205 {
3206 if(opCode != other.opCode) { return(false); }
3207
3208 // Check that the (uncompressed) data appears identical to the user.
3209 return(Arrays.equals(payloadCompressed, other.payloadCompressed) ||
3210 Arrays.equals(payload, other.payload) ||
3211 Arrays.equals(getPayload(), other.getPayload()));
3212 }
3213
3214 /**Hash is based on just packet type and (actual) packet length. */
3215 @Override public int hashCode()
3216 { return(((opCode.getCode() * 69069) + getActualPayloadLength())); }
3217
3218 /**Construct a new raw packet with an empty payload.
3219 *
3220 * @param op the op-code of the RPC; never null
3221 */
3222 public RawPacket(final OpCode opCode)
3223 {
3224 if(null == opCode) { throw new IllegalArgumentException(); }
3225 this.opCode = opCode;
3226 this.payload = EMPTY_PAYLOAD;
3227 this.payloadCompressed = null;
3228 }
3229
3230 /**Construct a new raw packet with the whole data array as the payload.
3231 * Note that the data is <em>not</em> copied (nor modified);
3232 * the array should not subsequently be modified.
3233 * <p>
3234 * This assumes that it is worth trying to compress the payload,
3235 * if other conditions support it.
3236 *
3237 * @param op the op-code of the RPC; never null
3238 * @param data the payload bytes; never null
3239 */
3240 public RawPacket(final OpCode op, final byte data[])
3241 { this(op, data, true); }
3242
3243 /**Construct a new raw packet with the whole data array as the payload.
3244 * Note that the data is <em>not</em> copied (nor modified);
3245 * the array should not subsequently be modified.
3246 *
3247 * @param op the op-code of the RPC; never null
3248 * @param data the payload bytes; never null
3249 * @param attemptCompression if true, it is probably worth attempting
3250 * to compress the payload data unless very short
3251 */
3252 public RawPacket(final OpCode op, final byte data[],
3253 final boolean attemptCompression)
3254 { this(op, data, data.length, attemptCompression); }
3255
3256 /**Construct a new raw packet with the initial portion of the data array as the payload.
3257 * Note that the data is <em>not</em> copied (nor modified),
3258 * unless it is compressed or the data does not occupy the whole array;
3259 * the array should not subsequently be modified.
3260 * <p>
3261 * If compression is requested and is possible
3262 * then the compression may be performed at construction time
3263 * and the compressed form of the data held to save space.
3264 *
3265 * @param op the op-code of the RPC; never null
3266 * @param data the (uncompressed) payload bytes; never null
3267 * @param initialPortion the initial portion of data[] containing
3268 * payload data; never negative or larger than data.length or MAX_PAYLOAD_SIZE
3269 * @param attemptCompression if true, it is probably worth attempting
3270 * to compress the payload data unless very short
3271 */
3272 public RawPacket(final OpCode op, final byte data[], final int initialPortion,
3273 final boolean attemptCompression)
3274 {
3275 if((op == null) ||
3276 (data == null) ||
3277 (initialPortion > MAX_PAYLOAD_SIZE) ||
3278 (initialPortion > data.length) || (initialPortion < 0))
3279 { throw new IllegalArgumentException(); }
3280
3281 // Capture the opcode and the raw payload length immediately.
3282 opCode = op;
3283
3284 // See if the data needs compressing and can be compressed.
3285 // If not, then keep it uncompressed.
3286 //
3287 // If compression is requested and the payload size is significant
3288 // (allowing for for data overheads of the compression scheme)
3289 // then check out the possibility of compression,
3290 // else ignore, and send the data uncompressed.
3291 //
3292 // We assume that trying to deflate anything smaller than
3293 // (say) 256 bytes is a waste of effort given the
3294 // deflate overhead and, indeed, the packet overhead.
3295 if(attemptCompression &&
3296 (initialPortion >= FileTools.TYPICAL_DEFLATE_MIN_TEXT_SIZE_COMPRESSABLE))
3297 {
3298 final byte compressed[] = FileTools.compressDeflatableData(data, 0, initialPortion);
3299 final boolean savedSpace = (compressed.length < initialPortion);
3300
3301 // If successful then store the compressed data.
3302 if(savedSpace)
3303 {
3304 payload = null;
3305 payloadCompressed = compressed;
3306 if(ExhibitDataTunnelSource._protocolDebug) { System.err.println("[RawPacket: compressed payload from "+initialPortion+" to "+(payloadCompressed.length)+" bytes.]"); }
3307 return;
3308 }
3309 }
3310
3311 // Store uncompressed data.
3312 // Store without copying if data is entire array.
3313 if(data.length == initialPortion)
3314 { payload = data; }
3315 else
3316 {
3317 // It is inefficient to copy,
3318 // so we want to avoid this where possible,
3319 // and in debug will warn where it is happening.
3320 if(IsDebug.isDebug) { System.err.println("[RawPacket: WARNING: copying uncompressed payload from oversize buffer.]"); Thread.dumpStack(); }
3321 payload = Arrays.copyOf(data, initialPortion);
3322 }
3323 payloadCompressed = null;
3324 }
3325
3326 /**Construct a new raw packet with data provided.
3327 * This accepts whatever it is passed
3328 * (we check for some errors with run-time assertions in debug code),
3329 * and is intended only to be called from other constructors
3330 * or factory methods.
3331 *
3332 * @param op the op-code of the RPC; never null
3333 */
3334 private RawPacket(final OpCode opCode,
3335 final byte[] payload,
3336 final byte[] payloadCompressed)
3337 {
3338 if(IsDebug.isDebug)
3339 {
3340 assert(opCode != null);
3341 assert((payload == null) != (payloadCompressed == null)); // Exactly one must be null.
3342 }
3343
3344 this.opCode = opCode;
3345 this.payload = payload;
3346 this.payloadCompressed = payloadCompressed;
3347 }
3348
3349 /**Construct a new raw packet with a serialised object as the payload.
3350 * This will use an uncompressed form if more efficient on the wire.
3351 * <p>
3352 * This will have the entire uncompressed form in memory at peak,
3353 * so may be unsuitable for very large (and highly-compressible)
3354 * objects.
3355 *
3356 * @param op the op-code of the RPC; never null
3357 * @param obj the Serializable object to send,
3358 * or null to send empty payload
3359 * @param attemptCompression if true, it is probably worth attempting
3360 * to compress the payload data if not very short
3361 */
3362 public RawPacket(final OpCode op, final Serializable obj,
3363 final boolean attemptCompression)
3364 throws IOException
3365 {
3366 this(op, _serObjForPayload(obj), attemptCompression);
3367 }
3368
3369 /**Construct a new raw packet with a stream-serialised object as the payload.
3370 * This will always compress the supplied object
3371 * (unless null, in which case an empty payload is used)
3372 * to try to avoid ever having the full unserialised form in memory.
3373 * <p>
3374 * This will veto with an IOException any attempt to write more than
3375 * MAX_PAYLOAD_SIZE pre-compression or pre-compression bytes.
3376 *
3377 * @param op the op-code of the RPC; one of the OP_XXX values
3378 * @param obj the Serializable object to send,
3379 * or null to send empty payload
3380 * @param attemptCompression if true, it is probably worth attempting
3381 * to compress the payload data if not very short
3382 */
3383 public static RawPacket streamSerialiseObject(final OpCode op, final Serializable obj)
3384 throws IOException
3385 {
3386 if(op == null) { throw new IllegalArgumentException(); }
3387
3388 // If the object to be serialised is null
3389 // then return a result using an empty payload.
3390 if(null == obj) { return(new RawPacket(op)); }
3391
3392 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
3393 // We veto any attempt to construct a frame
3394 // whose compressed or uncompressed payload would exceed MAX_PAYLOAD_SIZE bytes.
3395 // We apply exactly the same header-less deflate as in other places we compress.
3396 final DefOutputStream dos = new DefOutputStream(new GenUtils.LengthLimitedOutputStream(baos, MAX_PAYLOAD_SIZE));
3397 final GenUtils.LengthLimitedOutputStream llos = new GenUtils.LengthLimitedOutputStream(dos, MAX_PAYLOAD_SIZE); // Must not throw an exception...
3398 ObjectOutputStream oos = null;
3399 try
3400 {
3401 oos = new ObjectOutputStream(llos);
3402 oos.writeObject(obj);
3403 oos.flush();
3404 }
3405 finally
3406 { dos.finish(); } // Finish compression and ensure that native resources are released.
3407 if(oos != null) { oos.close(); } // Hopefully free resources quickly.
3408
3409 // Construct packet with compressed data...
3410 return(new RawPacket(op, null, baos.toByteArray()));
3411 }
3412
3413 /**Serialise object for payload.
3414 * Returns a zero-length array if the object is null.
3415 */
3416 private static byte[] _serObjForPayload(final Serializable obj)
3417 throws IOException
3418 {
3419 if(obj == null)
3420 { return(ExhibitDataTunnelSource.EMPTY_PAYLOAD); }
3421
3422 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
3423 final ObjectOutputStream oos = new ObjectOutputStream(baos);
3424 oos.writeObject(obj);
3425 oos.flush();
3426 final byte[] responsePayload = baos.toByteArray();
3427 oos.close(); // Hopefully free resources quickly.
3428 return(responsePayload);
3429 }
3430
3431 /**Retrieves a single serialised Object from the payload.
3432 * If the payload is empty (zero-length), we return null.
3433 * <p>
3434 * For robustness, treats a class mismatch problem like a data problem,
3435 * ie this will re-throw any ClassNotFoundException as an IOException.
3436 */
3437 public Object getSerializedObjectPayload()
3438 throws IOException
3439 {
3440 if(getActualPayloadLength() == 0) { return(null); }
3441
3442 final InputStream is = getPayloadAsInputStream();
3443 final ObjectInputStream ois = new ObjectInputStream(is);
3444 try { return(ois.readObject()); }
3445 catch(final ClassNotFoundException e) { throw new IOException(e); }
3446 finally { ois.close(); } // Release resources, especially native in decompressor.
3447 }
3448
3449 /**Computes the total number of bytes that would be written by writePacket(); strictly positive.
3450 */
3451 public int getFrameLength()
3452 {
3453 // The total length on the wire of the entire packet.
3454 // An excess 6 bytes allows for 1 opCode, 4 length, 1 trailer.
3455 return(FRAME_OVERHEAD_BYTES + getActualPayloadLength());
3456 }
3457
3458 /**Send the packet down the given OutputStream with header and trailer.
3459 * This flushes the data after writing it.
3460 * <p>
3461 * This does not close() the stream.
3462 */
3463 public void writePacket(final OutputStream os)
3464 throws IOException
3465 { writePacket(os, false); }
3466
3467 /**Send the packet down the given OutputStream with header and trailer.
3468 * This flushes the data after writing it.
3469 * <p>
3470 * If the payload is not too large,
3471 * then the entire packet will be buffered
3472 * and sent in one write(),
3473 * else it will be sent in LAN-packet-size chunks.
3474 * <p>
3475 * This does not close() the stream.
3476 *
3477 * @param reduceWrites if true then make an effort to reduce the number of writes done
3478 * (ie try to write fewer larger blocks)
3479 * since the output may be to a raw network stream or similar
3480 * which may require more copying of data first
3481 */
3482 public void writePacket(final OutputStream os, final boolean reduceWrites)
3483 throws IOException
3484 {
3485 if(ExhibitDataTunnelSource._protocolDebug) { System.err.println("[writePacket(): op="+opCode+" len="+getActualPayloadLength()+".]"); }
3486
3487 final boolean isCompressed = (payload == null);
3488 final byte[] dataOnTheWire =
3489 isCompressed ? payloadCompressed : payload;
3490 final int realPayloadLen = dataOnTheWire.length;
3491
3492 // Optimisation for empty frame (no payload).
3493 if(0 == realPayloadLen)
3494 {
3495 final byte[] out = new byte[FRAME_OVERHEAD_BYTES];
3496 out[0] = opCode.getCode();
3497 // Length is all zeros.
3498 out[FRAME_OVERHEAD_BYTES-1] = trailer;
3499 os.write(out);
3500 os.flush();
3501 return;
3502 }
3503
3504 // The total length on the wire of the entire packet.
3505 final int totalPacketLen = FRAME_OVERHEAD_BYTES + realPayloadLen;
3506
3507 // Buffer the output (with a carefully-crafted buffer size) if requested
3508 // so that we can write it all in one operating-system interaction
3509 // if small enough, else in efficient-ish disc/network-size chunks.
3510 // (Note that Tomcat 4.1.31 writes chunked data to the network
3511 // in blocks of 2kB on Solaris 8/9,
3512 // but also note that the write of a large payload will cause
3513 // the header data to be flushed through the buffer first,
3514 // so anything other than quite a small buffer will be a waste.)
3515 // But we don't want to do pointless extra copies.
3516 final DataOutputStream dos = new DataOutputStream((!reduceWrites) ? os :
3517 new BufferedOutputStream(os,
3518 (totalPacketLen <= CoreConsts.WAN_LARGE_FRAME_PAYLOAD_BYTES) ?
3519 totalPacketLen : (FRAME_OVERHEAD_BYTES-1)));
3520
3521 // Send the opcode.
3522 dos.writeByte(opCode.getCode());
3523 // Send the payload length (negative indicating compressed).
3524 dos.writeInt(isCompressed ? -realPayloadLen : realPayloadLen);
3525 // Send the payload.
3526 dos.write(dataOnTheWire, 0, realPayloadLen);
3527 // Write the trailer.
3528 dos.writeByte(trailer);
3529 // Flush data though any buffers and down the wire if possible.
3530 dos.flush();
3531 }
3532
3533 /**Reads a packet from the input stream, blocking until done.
3534 * Preserves its compressed/uncompressed form,
3535 * possibly helping reduce memory footprint at the receiver.
3536 * <p>
3537 * Compressed data is signalled by a negative length on the wire,
3538 * where this is the negation of the compressed data size.
3539 * <p>
3540 * The ZLIB deflater with maximum compression and
3541 * no checksum is used for any (de)compression.
3542 * <p>
3543 * To prevent unpleasantness such a OutOfMemoryError from a corrupt length value,
3544 * we veto lengths greater than the maximum specified for this frame type.
3545 */
3546 public static RawPacket readPacket(final InputStream is)
3547 throws IOException
3548 {
3549 // While not wonderfully efficient,
3550 // to save a great deal of fiddly coding
3551 // the input stream is wrapped in a DataInputStream.
3552 final DataInputStream dis = new DataInputStream(is);
3553 // Start collecting the data...
3554 final byte opC = dis.readByte();
3555 final OpCode op = OpCode.lookupCode(opC);
3556 if(op == null) { throw new IOException("invalid opcode "+opC); }
3557 final int rawLen = dis.readInt();
3558 final boolean isCompressed = (rawLen < 0);
3559 final int len = ((isCompressed) ? -rawLen : rawLen);
3560 if(ExhibitDataTunnelSource._protocolDebug) { System.err.println("[readPacket(): op="+opC+" rawLen="+rawLen+"...]"); }
3561 if(len > MAX_PAYLOAD_SIZE) { throw new IOException("corrupt length"); }
3562 final byte data[];
3563 if(0 == len)
3564 { data = EMPTY_PAYLOAD; }
3565 else
3566 {
3567 data = new byte[len];
3568 dis.readFully(data);
3569 }
3570 final byte tr = dis.readByte();
3571 if(tr != TRAILER)
3572 { throw new IOException("corrupt trailer"); }
3573 try {
3574 // Decompress the data if necessary...
3575 if(isCompressed)
3576 {
3577 if(ExhibitDataTunnelSource._protocolDebug) { System.err.println("[readPacket(): compressed payload len="+data.length+" bytes.]"); }
3578 return(new RawPacket(op, null, data));
3579 }
3580 return(new RawPacket(op, data, null));
3581 }
3582 catch(final IllegalArgumentException e)
3583 { throw new IOException("corrupt packet: " + e.getMessage()); }
3584 }
3585
3586 /**Create a human-readable summary (not including the payload data). */
3587 @Override
3588 public String toString()
3589 {
3590 final StringBuilder sb = new StringBuilder(79);
3591 sb.append("RawPacket");
3592 sb.append(":opCode=").append(opCode);
3593 sb.append(":actualPayloadLength=").append(getActualPayloadLength());
3594 return(sb.toString());
3595 }
3596
3597 /**Get the payload data as an InputStream; never null.
3598 * If the internal data is stored compressed however,
3599 * then this will return the uncompressed version.
3600 * <p>
3601 * Each stream returned is independent.
3602 * <p>
3603 * This stream should be explicitly closed when finished with
3604 * to release underlying memory and other resources ASAP.
3605 *
3606 * @return the uncompressed payload data as a stream of length plLength
3607 */
3608 public InputStream getPayloadAsInputStream()
3609 {
3610 if(payload != null)
3611 {
3612 // Payload is uncompressed; wrap it as-is.
3613 return(new ByteArrayInputStream(payload));
3614 }
3615
3616 // Payload is held compressed so decompress it
3617 // (on the fly to reduce peak memory usage).
3618 try { return(new DefInputStream(new ByteArrayInputStream(payloadCompressed))); }
3619 catch(final IOException e) { throw new IllegalStateException("unable to decompress"); }
3620 }
3621
3622 /**Get a copy of the payload data; never null.
3623 * We return a copy to keep this object immutable.
3624 * <p>
3625 * If the internal data is stored compressed
3626 * then this will return the original uncompressed version.
3627 *
3628 * @return a copy of the payload data, of length plLength
3629 */
3630 public byte[] getPayloadCopy()
3631 {
3632 if(payload != null)
3633 {
3634 // Payload is uncompressed; clone it as-is.
3635 // Take copy to protect internal state.
3636 return(payload.clone());
3637 }
3638
3639 // Payload is held compressed, so decompress it.
3640 try
3641 {
3642 final byte uncompressedData[] = FileTools.decompressDeflatedData(payloadCompressed);
3643 // No need to copy (private) data returned from decompression.
3644 return(uncompressedData);
3645 }
3646 catch(final IOException e)
3647 {
3648 throw new IllegalStateException("corrupt compressed data");
3649 }
3650 }
3651
3652 /**Get direct access to the uncompressed payload; never null.
3653 * We avoid making a copy, for speed.
3654 * <em>The caller must not alter the byte array returned.</em>
3655 * <p>
3656 * If the internal data is stored compressed
3657 * then this will return an uncompressed copy.
3658 * <p>
3659 * This is package-visible and is only for trusted callers
3660 * that will not alter the content.
3661 *
3662 * @return a the payload data, of length plLength; never null,
3663 */
3664 byte[] getPayload()
3665 {
3666 if(payload != null)
3667 {
3668 // Trust caller not to mess around...
3669 return(payload);
3670 }
3671
3672 // Payload is held compressed, so decompress it.
3673 try
3674 {
3675 final byte uncompressedData[] = FileTools.decompressDeflatedData(payloadCompressed);
3676 // No need to copy (private) data returned from decompression.
3677 return(uncompressedData);
3678 }
3679 catch(final IOException e)
3680 {
3681 throw new IllegalStateException("corrupt compressed data");
3682 }
3683 }
3684
3685 /**Copy the payload data into the supplied (trusted) buffer.
3686 * We only make this package-visible to keep this object immutable.
3687 * <p>
3688 * If the internal data is stored compressed
3689 * then this will return the original uncompressed version.
3690 */
3691 void getPayloadCopy(final ByteBuffer buf)
3692 {
3693 if(payload != null)
3694 {
3695 // Payload is uncompressed; clone it as-is.
3696 // Copy uncompressed data directly.
3697 buf.put(payload);
3698 return;
3699 }
3700
3701 // Payload is held compressed, so decompress it.
3702 try
3703 {
3704 final byte uncompressedData[] = FileTools.decompressDeflatedData(payloadCompressed);
3705 // No need to copy (private) data returned from decompression.
3706 buf.put(uncompressedData);
3707 return;
3708 }
3709 catch(final IOException e)
3710 {
3711 throw new IllegalStateException("corrupt compressed data");
3712 }
3713 }
3714
3715 /**Return the actual payload length; non-negative.
3716 * The payload may be compressed
3717 * so this may be less than the uncompressed full payload size.
3718 * <p>
3719 * If the original payload is empty (zero length)
3720 * and getUncompressedPayloadLength() returns 0
3721 * then this returns 0 also.
3722 */
3723 public int getActualPayloadLength()
3724 { return((null != payload) ? payload.length : payloadCompressed.length); }
3725 }
3726
3727 /**The immutable adjunct for a RawPacket that includes the HMAC and other anti-attack data.
3728 * Note that this class is NOT directly serialisable,
3729 * with the data fields sent in some other way,
3730 * eg as HTTP header fields "out-of-band" from the actual HTTP message.
3731 */
3732 public static final class PacketProtector
3733 {
3734 /**The timestamp for the RawPacket, input to each MAC; strictly positive. */
3735 public final long timestamp;
3736
3737 /**The length of the entire frame/datastream being protected, input to each MAC; non-negative. */
3738 public final int length;
3739
3740 /**The immutable in-order list of MAC authenticator segments for the stream and fields herein; never null nor empty nor containing nulls.
3741 * This sequence of HMAC values together makes up the message MAC.
3742 * It is designed not to be possible to re-arrange these segments,
3743 * as each depends in part on the value of its predecessor
3744 * as well as the data in its segment.
3745 * <p>
3746 * Note that the <em>last</em> MAC, which depends on all the data being protected,
3747 * can be used as a unique message MAC on its own.
3748 */
3749 public final List<ROByteArray> mac;
3750
3751 /**Maximum size in (ASCII) characters of output of toCheckString(); strictly positive.
3752 * Chosen to allow inclusion of the output of toCheckString() in an HTTP header.
3753 */
3754 public static final int MAX_CHECK_STRING_CHARS = 900;
3755
3756 /**Generate (HTTP-header) check-string; never null nor empty.
3757 * This is a pure-ASCII single-line control-code-free check String
3758 * that contains the input fields and MAC from this object
3759 * to serve as the "checksum" of a RawPacket.
3760 * <p>
3761 * This object is suitable (short enough, avoidance of meta-characters)
3762 * to be used directly in an HTTP header.
3763 * <p>
3764 * This value is suitable to be decoded by fromCheckString().
3765 * <p>
3766 * The format is a space-separated list of fields:
3767 * <ol>
3768 * <li>The Java-style UTC timestamp in milliseconds as an unsigned decimal.
3769 * <li>The length of the protected stream in bytes as an unsigned decimal.
3770 * <li>The MAC values for each segment, in order, encoded Base64.
3771 * </ol>
3772 */
3773 public String toCheckString()
3774 {
3775 // Do a worst-case check given the size of the (first) MAC as encoded.
3776 assert((""+Long.MAX_VALUE+' '+Integer.MAX_VALUE+' ').length() + (MAX_SEGMENTS*(1+TextUtils.encode8To6(mac.get(0).toByteArray()).length()))-1 < MAX_CHECK_STRING_CHARS) : "check string must always be short enough for HTTP header";
3777
3778 final int initialCapacity = 32 + (45*mac.size());
3779 final StringBuilder sb = new StringBuilder(initialCapacity);
3780 sb.append(timestamp).append(' ');
3781 sb.append(length).append(' ');
3782 for(final ROByteArray r : mac)
3783 { sb.append(TextUtils.encode8To6(r.toByteArray())).append(' '); }
3784 // Rip off trailing space...
3785 sb.setLength(sb.length()-1);
3786 assert(sb.length() <= MAX_CHECK_STRING_CHARS) : "check string must be short enough for HTTP header";
3787 //if(IsDebug.isDebug && (sb.length() > initialCapacity)) { System.err.println("WARNING: incorrectly sized toCheckString() buffer: initial|actual|mac.size() = "+initialCapacity+"|"+sb.length()+"|"+mac.size()+ ": "+sb); }
3788 return(sb.toString());
3789 }
3790
3791 /**Field splitter regex pattern compiled once for efficiency; never null. */
3792 private static final Pattern fieldSplitPattern = Pattern.compile(" ");
3793
3794 /**Parses a check-string as generated by toCheckString(); never null.
3795 * This is tolerant of some leading and trailing whitespace.
3796 *
3797 * @throws IllegalArgumentException if the input is unparsable
3798 */
3799 public static PacketProtector fromCheckString(final String check)
3800 {
3801 if((check == null) || (check.length() > 2*MAX_CHECK_STRING_CHARS))
3802 { throw new IllegalArgumentException("too big before trim"); }
3803 final String trimmed = check.trim();
3804 if(trimmed.length() > MAX_CHECK_STRING_CHARS)
3805 { throw new IllegalArgumentException("too big"); }
3806 final String fields[] = fieldSplitPattern.split(check, 0);
3807 if(fields.length < 3)
3808 { throw new IllegalArgumentException("not enough fields"); }
3809 final long timestamp = Long.parseLong(fields[0], 10);
3810 final int length = Integer.parseInt(fields[1], 10);
3811 if((length < 0) || (length > RawPacket.MAX_FRAME_SIZE))
3812 { throw new IllegalArgumentException("out-of-range length value"); }
3813 final ArrayList<ROByteArray> mac = new ArrayList<ROByteArray>(fields.length-2);
3814 for(int i = 2; i < fields.length; ++i)
3815 { mac.add(new ROByteArray(TextUtils.decode8To6(fields[i]))); }
3816 return(new PacketProtector(timestamp, length, mac));
3817 }
3818
3819 /**Maximum number of segments protected stream may be broken into; strictly positive power of two.
3820 * This determines the maximum number of incremental portions
3821 * we can process while streaming a RawPacket.
3822 * <p>
3823 * This value is capped to limit the amount of MAC data that needs to be sent,
3824 * eg in an HTTP header of limited length.
3825 */
3826 public static final int MAX_SEGMENTS = 16;
3827
3828 /**Minimum segment size (other than final segment); strictly positive power of two.
3829 * This is set to be somewhat larger than the largest typical frame
3830 * (ie the entire header, payload and trailer)
3831 * that we usually don't want to stream, eg bulk data transfer,
3832 * so that such frames/packets will get a single MAC for efficiency.
3833 * This is also set large enough to amortise the cost of each segment MAC.
3834 * <p>
3835 * This is small enough to represent a reasonable incremental amount of CPU
3836 * for streamed inputs.
3837 */
3838 public static final int MIN_SEGMENT_SIZE = (1<<16);
3839
3840 /**Maximum segment size; strictly positive power of two.
3841 * This is determined from the maximum frame size
3842 * and the maximum number of segments.
3843 */
3844 public static final int MAX_SEGMENT_SIZE = RawPacket.MAX_FRAME_SIZE / MAX_SEGMENTS;
3845 /**Check some invariants. */
3846 static { assert(RawPacket.MAX_FRAME_SIZE == MAX_SEGMENT_SIZE * MAX_SEGMENTS); }
3847 static { assert(MAX_SEGMENT_SIZE > MIN_SEGMENT_SIZE); }
3848
3849 /**Create an adjunct to protect a RawPacket, including a current timestamp.
3850 * This must be supplied with the (secret) Key shared between the servers.
3851 *
3852 * @param raw the RawPacket to protect; never null
3853 * @param key the (secret) key for the HMAC; never null
3854 */
3855 public PacketProtector(final RawPacket raw,
3856 final SecretKey key)
3857 throws InvalidKeyException
3858 { this(raw, System.currentTimeMillis(), key); }
3859
3860 /**Create an adjunct to protect a RawPacket, including the given timestamp.
3861 * This must be supplied with the (secret) Key shared between the servers.
3862 *
3863 * @param raw the RawPacket to protect; never null
3864 * @param timestamp the timestamp for creation/send of the RawPacket; strictly positive
3865 * @param key the (secret) key for the HMAC; never null
3866 */
3867 public PacketProtector(final RawPacket raw,
3868 final long timestamp,
3869 final SecretKey key)
3870 throws InvalidKeyException
3871 { this(timestamp, raw.getFrameLength(), computeMAC(timestamp, raw, key)); }
3872
3873 /**Create an adjunct to protect a RawPacket.
3874 *
3875 * @param timestamp the timestamp for creation/send of the RawPacket; strictly positive
3876 * @param mac the HMAC for the RawPacket and other fields, not checked; never null
3877 */
3878 public PacketProtector(final long timestamp,
3879 final int length,
3880 final List<ROByteArray> mac)
3881 {
3882 this.timestamp = timestamp;
3883 this.length = length;
3884 // Take defensive immutable copy of MAC segment list.
3885 // A single entry is the common case (for typical/small packets).
3886 this.mac = (mac.size() == 1) ? Collections.singletonList(mac.get(0)) :
3887 Collections.unmodifiableList(new ArrayList<ROByteArray>(mac));
3888
3889 try { validateObject(); } // Validate this instance.
3890 catch(final InvalidObjectException e) { throw new IllegalArgumentException(e); }
3891 }
3892
3893 /**Compute segment size; strictly positive power of two.
3894 * Given the full message/frame length
3895 * (strictly positive and <= RawPacket.MAX_FRAME_SIZE)
3896 * this computes the segment size between/at the MIN/MAX limits
3897 * to maximise the number of segments
3898 * and thus maximise the the amount of incremental MAC processing
3899 * and incremental processing of streamed frames that can be done,
3900 * minimising buffer working space and time/bandwidth before corruption is found.
3901 * <p>
3902 * All segments are of the same size, except the last one which may be shorter.
3903 */
3904 private static final int computeSegmentSize(final int frameLength)
3905 {
3906 assert(frameLength <= RawPacket.MAX_FRAME_SIZE);
3907 assert(frameLength > 0);
3908
3909 // Common case (min segment size) for reasonable/small packets.
3910 if(frameLength <= MIN_SEGMENT_SIZE * MAX_SEGMENTS)
3911 { return(MIN_SEGMENT_SIZE); }
3912
3913 // Find minimum segment size that will cover the frame if not MIN_SEGMENT_SIZE.
3914 for(int segSize = MIN_SEGMENT_SIZE<<1; segSize <= MAX_SEGMENT_SIZE; segSize <<= 1)
3915 {
3916 if(segSize * MAX_SEGMENTS >= frameLength)
3917 { return(segSize); }
3918 }
3919
3920 throw new Error("should not get here");
3921 }
3922
3923 /**Compute the MAC given the message and other fields to be included; never null.
3924 * The (secret) Key must also be supplied
3925 * and be suitable for the HMAC algorithm used.
3926 * <p>
3927 * The MAC is computed on a series of segments of the input of the same length
3928 * (except for a possibly-shorter final segment)
3929 * producing an HMAC on each segment.
3930 * <p>
3931 * Each HMAC is computed over a binary message consisting of:
3932 * <ol>
3933 * <li>the 4-byte length of the entire protected stream/frame.
3934 * <li>the 8-byte (Java-style millisecond UTC) timestamp in network order,
3935 * <li>for all segments other than the first, the HMAC of the previous segment,
3936 * <li>the full on-the-wire form of the RawPacket (including header/trailer).
3937 * </ol>
3938 * <p>
3939 * The chaining should make it impossible to reorder the segments.
3940 * <p>
3941 * The segmentation is so that the data in each segment is known-safe
3942 * and can be safely consumed by incremental/streaming CPU-heavy operations
3943 * before subsequent segments have been received and decoded.
3944 * This segmentation also means that we can abort a damaged message
3945 * as soon as we check the damaged segment: we do not need to wait
3946 * to receive and store and check the whole message.
3947 *
3948 * @param raw the RawPacket to protect; never null
3949 * @param timestamp the timestamp for creation/send of the RawPacket; strictly positive
3950 * @param key the (secret) key for the HMAC; never null
3951 *
3952 * @throws InvalidKeyException if the Key supplied is inappropriate
3953 *
3954 * @throws IllegalArgumentException if the timestamp is non-positive
3955 * or any other argument is null
3956 * @throws IllegalStateException if the HMAC algorithm is unavailable
3957 *
3958 * @return immutable in-order list of HMACs to protect each stream segment
3959 */
3960 public static List<ROByteArray> computeMAC(final long timestamp,
3961 final RawPacket raw,
3962 final SecretKey key)
3963 throws InvalidKeyException
3964 {
3965 if((timestamp <= 0) || (raw == null) || (key == null))
3966 { throw new IllegalArgumentException(); }
3967
3968 // Compute the segment size that we will be using.
3969 final int frameLength = raw.getFrameLength();
3970 final int segmentSize = computeSegmentSize(frameLength);
3971 assert(0 == (segmentSize & (segmentSize-1))); // Segment size should be a power of two.
3972
3973 // Compute the number of segments that we will be using.
3974 final int nSegs = (frameLength <= MIN_SEGMENT_SIZE) ? 1 : // Note frameLength never zero.
3975 (frameLength + segmentSize - 1) / segmentSize;
3976
3977 // Compose our result MAC list...
3978 final List<ROByteArray> mac = new ArrayList<ROByteArray>(nSegs);
3979
3980 // Incrementally generate the MAC block by block.
3981 // We use an output stream sink to avoid having to copy large frames.
3982 // This OutputStream is not thread-safe and does not need to be.
3983 // Working memory required is basically only that for the current segment's Mac.
3984 final OutputStream sink = new OutputStream(){
3985 /**MAC for current segment; initialised for first block. */
3986 Mac current = startMacForSegment();
3987
3988 /**Start MAC for next segment.
3989 * The working MAC must be null before this is called.
3990 */
3991 private Mac startMacForSegment()
3992 {
3993 try
3994 {
3995 final Mac m = Mac.getInstance(CoreConsts.HMAC_ALG);
3996 m.init(key);
3997
3998 // First, write 4-byte length of entire message.
3999 m.update(intSer(frameLength));
4000 // Second, write 8-byte timestamp.
4001 m.update(longSer(timestamp));
4002 // If there is a previous MAC, stir it in.
4003 if(null != prevMAC) { m.update(prevMAC); }
4004
4005 return(m);
4006 }
4007 catch(final Exception e)
4008 {
4009 e.printStackTrace();
4010 throw new Error("internal error", e);
4011 }
4012 }
4013
4014 /**The previous MAC value, or null while gathering the first segment. */
4015 private byte[] prevMAC;
4016
4017 /**Count of bytes seen so far; non-negative.
4018 * Implicitly this also indicates how far into the current segment we are.
4019 */
4020 private int bytesSeenSoFar;
4021
4022 /**Process the next block to generate the next MAC.
4023 * Must only be called when we've collected a full segment, or at EOF/close().
4024 */
4025 private void endMacForSegment()
4026 {
4027 try
4028 {
4029 // Compute and record the MAC.
4030 final byte[] hmac = current.doFinal();
4031 prevMAC = hmac;
4032 mac.add(new ROByteArray(hmac));
4033 }
4034 catch(final Exception e)
4035 {
4036 e.printStackTrace();
4037 throw new Error("internal error", e);
4038 }
4039 }
4040
4041 @Override
4042 public void write(final byte[] b, int off, int len) throws IOException
4043 {
4044 // Loop to generate multiple segment MACs if needed.
4045 while(len > 0)
4046 {
4047 // Compute bytes needed to fill the buffer: always positive.
4048 final int offsetInSegBuf = bytesSeenSoFar & (segmentSize-1); // Faster version of bytesSeenSoFar % segmentSize;
4049 final int needed = segmentSize - (offsetInSegBuf);
4050 // How many will we copy in...
4051 final int toCopy = Math.min(needed, len);
4052
4053 // Write the frame bytes to protect.
4054 current.update(b, off, toCopy);
4055
4056 // Adjust all our indexes...
4057 bytesSeenSoFar += toCopy;
4058 off += toCopy;
4059 len -= toCopy;
4060
4061 // If we copied exactly up to the end of the segBuf
4062 // then complete the current MAC and start the next one...
4063 if(toCopy == needed) { endMacForSegment(); current = startMacForSegment(); }
4064 }
4065 }
4066
4067 /**Horribly inefficient, but correct, and should never actually get called. */
4068 @Override public void write(final int b) throws IOException
4069 {
4070 final byte buf[] = new byte[1];
4071 buf[0] = (byte) b;
4072 write(buf, 0, 1);
4073 }
4074
4075 /**Handle the tail of the input. */
4076 @Override public void close() throws IOException
4077 {
4078 assert(bytesSeenSoFar == frameLength); // Should have seen all the data.
4079 endMacForSegment(); // Push out the current (final) MAC...
4080 }
4081 };
4082 // Compute the HMAC value(s).
4083 try { raw.writePacket(sink, false); sink.close(); }
4084 catch(final IOException e) { throw new Error("internal error", e); }
4085 //System.out.println("frameLength/nSegs/mac.size() = " + frameLength + '/' + nSegs + '/' + mac.size());
4086 assert(nSegs == mac.size()); // Did we create the correct number of MACs values?
4087
4088 // Let caller wrap (private) List if needed.
4089 return(mac);
4090 }
4091
4092 /**Protect an input stream with our MAC; aborts with IOException in case of corruption.
4093 * This acts as a transparent pass-through filter
4094 * that will read a MAC-protected-segment at a time from the underlying stream
4095 * and either pass it through having verified it,
4096 * or abort with an IOException if the segment is corrupt.
4097 * <p>
4098 * This closes its input stream and vetoes any further operations
4099 * once any error has been encountered.
4100 * <p>
4101 * We use Key rather than SecretKey,
4102 * since the latter depends on Java extensions that may not be available,
4103 * eg when run in JWS.
4104 * <p>
4105 * Returned input stream not to be multi-threaded.
4106 */
4107 public InputStream protectInputStream(final Key key, final InputStream is)
4108 {
4109 if(key == null) { throw new IllegalArgumentException(); }
4110 if(is == null) { throw new IllegalArgumentException(); }
4111
4112 // Compute our segment size for buffering.
4113 final int segmentSize = computeSegmentSize(length);
4114
4115 // Number of segments.
4116 final int nSegs = mac.size();
4117
4118 // To avoid any confusion, we do not allow mark/reset.
4119 return(new FilterInputStream(is){
4120 /**Set true upon any error; subsequently we will not allow any further read()s. */
4121 private boolean hasErred;
4122
4123 /**Our read buffer for one segment's data (or whole frame if smaller); never null. */
4124 private final byte[] segBuf = new byte[Math.min(length, segmentSize)];
4125
4126 /**Data remaining in segBuf; non-negative. */
4127 private int bufLen;
4128
4129 /**Start of remaining data in segBuf; non-negative. */
4130 private int bufPos;
4131
4132 /**Which MAC segment is next to be loaded/verified. */
4133 private int nextSegNumber;
4134
4135 /**Whatever is in our buffer plus from upstream without blocking. */
4136 @Override public int available() throws IOException
4137 { return(in.available() + bufLen); }
4138
4139 @Override public void mark(final int readlimit)
4140 { throw new UnsupportedOperationException("mark not supported"); }
4141
4142 /**Mark/reset is not allowed to avoid confusion/complexity. */
4143 @Override public boolean markSupported() { return(false); }
4144
4145 /**Read a single byte: we try to make this moderately efficient. */
4146 @Override public int read() throws IOException
4147 {
4148 if(hasErred) { throw new IOException("corrupt stream"); }
4149
4150 // Fast path to return data already in our buffer.
4151 if(bufLen > 0)
4152 {
4153 --bufLen;
4154 return(segBuf[bufPos++] & 0xff);
4155 }
4156
4157 // Use full read() to do all the leg-work.
4158 final byte[] buf1 = new byte[1];
4159 final int n = read(buf1, 0, 1);
4160 if(n < 1) { return(-1); /* EOF */ }
4161 assert(n == 1);
4162 return(buf1[0] & 0xff);
4163 }
4164
4165 /**Bulk data read.
4166 * This routine returns any data it has buffered (ie already verified)
4167 * else it reads a segment of data from the underlying stream
4168 * (possibly less than a full segment at EOF, but verifying the length)
4169 * and validates it with the appropriate MAC.
4170 * <p>
4171 * If validation against the MAC fails
4172 * then this and all subsequent calls to read()
4173 * are vetoed with an IOException.
4174 * <p>
4175 * If validation succeeds then the requested data is returned.
4176 * <p>
4177 * We do not return data in one read across segment boundaries,
4178 * as we are not required to and it would increase complexity.
4179 */
4180 @Override
4181 public int read(final byte[] b, final int off, final int len) throws IOException
4182 {
4183 if((b == null) ||
4184 (off < 0) || (off >= b.length) ||
4185 (len < 0) || (off + len > b.length))
4186 { throw new IllegalArgumentException(); }
4187
4188 if(hasErred) { throw new IOException("corrupt stream"); }
4189
4190 if(len == 0) { return(0); }
4191
4192 // If we have no verified data left in our buffer
4193 // then unless at EOF we need to load another segment and verify it.
4194 if(bufLen == 0)
4195 {
4196 // We are at EOF if we have read the last segment.
4197 if(nextSegNumber >= nSegs) { return(-1); }
4198
4199 // Allow for the final segment to be smaller.
4200 final int readSize = (nextSegNumber < nSegs-1) ? segmentSize :
4201 (length - (segmentSize * (nSegs-1)));
4202 bufPos = 0; // Next caller will read from offset zero.
4203 bufLen = 0; // Buffer is currently empty...
4204 while(bufLen != readSize)
4205 {
4206 try
4207 {
4208 final int n = in.read(segBuf, bufLen, readSize - bufLen);
4209 if(n < 1) { throw new IOException("premature EOF"); }
4210 bufLen += n;
4211 }
4212 catch(final IOException e) { hasErred = true; throw e; }
4213 }
4214
4215 // Verify the data.
4216 try
4217 {
4218 final Mac m = Mac.getInstance(CoreConsts.HMAC_ALG);
4219 m.init(key);
4220
4221 // First, write 4-byte length of entire message.
4222 m.update(intSer(length));
4223 // Second, write 8-byte timestamp.
4224 m.update(longSer(timestamp));
4225 // If there is a previous MAC, stir it in.
4226 if(nextSegNumber > 0) { m.update(mac.get(nextSegNumber-1).toByteArray()); }
4227 // Finally, use the bytes read from upstream.
4228 m.update(segBuf, 0, bufLen);
4229
4230 // Compute and record the MAC.
4231 final byte[] hmac = m.doFinal();
4232 if(!Arrays.equals(mac.get(nextSegNumber++).toByteArray(), hmac))
4233 { throw new IOException("corrupt input data"); }
4234 }
4235 catch(final IOException e)
4236 {
4237 hasErred = true; // Prevent further read()s from this stream.
4238 throw e;
4239 }
4240 catch(final Exception e)
4241 {
4242 hasErred = true;
4243 e.printStackTrace();
4244 throw new Error("internal error", e);
4245 }
4246 }
4247
4248 // The buffer must now have something in it.
4249 assert(bufLen > 0);
4250 // Return verified data from our buffer to the caller.
4251 final int toCopy = Math.min(bufLen, len);
4252 System.arraycopy(segBuf, bufPos, b, off, toCopy);
4253 bufLen -= toCopy;
4254 bufPos += toCopy;
4255 return(toCopy);
4256 }
4257
4258 /**Implemented as read(b, 0, b.length). */
4259 @Override public int read(final byte[] b) throws IOException
4260 { return(read(b, 0, b.length)); }
4261
4262 @Override public void reset() throws IOException
4263 { throw new IOException("reset not allowed"); }
4264
4265 /**We have to read the input to skip it. */
4266 @Override public long skip(long n) throws IOException
4267 {
4268 long skipped = 0;
4269 final byte b[] = new byte[(int) Math.min(1024, n)];
4270 while(n > 0)
4271 {
4272 final int r = read(b, 0, (int) Math.min(n, b.length));
4273 if(r < 1) { break; /* EOF */ }
4274 n -= r;
4275 skipped += r;
4276 }
4277 return(skipped);
4278 }
4279 });
4280 }
4281
4282 /**Checks only that the object content is valid.
4283 * This does NOT attempt to check that the MAC matches the message and fields;
4284 * that must be done explicitly elsewhere.
4285 * <p>
4286 * Partly this does not check the MAC because it does not have access to the key.
4287 */
4288 public void validateObject() throws InvalidObjectException
4289 {
4290 if(timestamp <= 0) { throw new InvalidObjectException("bad object: bad timestamp"); }
4291 if(length < 0) { throw new InvalidObjectException("bad object: bad length"); }
4292 if(mac == null) { throw new InvalidObjectException("bad object: null MAC"); }
4293 if(mac.size() == 0) { throw new InvalidObjectException("bad object: empty MAC"); }
4294 for(final ROByteArray m : mac)
4295 {
4296 if(!(m instanceof ROByteArray)) { throw new InvalidObjectException("bad object: null/bad segment MAC"); }
4297 if(m.length() == 0) { throw new InvalidObjectException("bad object: empty segment MAC"); }
4298 }
4299 }
4300
4301 /**Equality depends on all the members being equal. */
4302 @Override
4303 public boolean equals(final Object obj)
4304 {
4305 if(this == obj) { return(true); }
4306 if(!(obj instanceof PacketProtector)) { return(false); }
4307 final PacketProtector other = (PacketProtector) obj;
4308 return((timestamp == other.timestamp) &&
4309 (length == other.length) &&
4310 mac.equals(other.mac));
4311 }
4312
4313 /**We use the timestamp and length fields in the hash for the entire collection. */
4314 @Override public int hashCode() { return(length ^ (int) timestamp); }
4315 }
4316 }