001 package org.hd.d.pg2k.svrCore.datasource;
002
003 import java.io.BufferedOutputStream;
004 import java.io.ByteArrayInputStream;
005 import java.io.ByteArrayOutputStream;
006 import java.io.DataInputStream;
007 import java.io.DataOutputStream;
008 import java.io.FilterInputStream;
009 import java.io.IOException;
010 import java.io.InputStream;
011 import java.io.InterruptedIOException;
012 import java.io.InvalidObjectException;
013 import java.io.ObjectInputStream;
014 import java.io.ObjectOutputStream;
015 import java.io.OutputStream;
016 import java.io.Serializable;
017 import java.nio.ByteBuffer;
018 import java.security.InvalidKeyException;
019 import java.security.Key;
020 import java.util.ArrayList;
021 import java.util.Arrays;
022 import java.util.BitSet;
023 import java.util.Collections;
024 import java.util.Date;
025 import java.util.Iterator;
026 import java.util.List;
027 import java.util.Map;
028 import java.util.Properties;
029 import java.util.concurrent.Callable;
030 import java.util.concurrent.TimeUnit;
031 import java.util.concurrent.atomic.AtomicReference;
032 import java.util.concurrent.locks.ReentrantLock;
033 import java.util.regex.Pattern;
034
035 import javax.crypto.Mac;
036 import javax.crypto.SecretKey;
037 import javax.management.RuntimeErrorException;
038
039 import org.hd.d.pg2k.svrCore.AllExhibitImmutableData;
040 import org.hd.d.pg2k.svrCore.AllExhibitProperties;
041 import org.hd.d.pg2k.svrCore.AllExhibitPropertiesDelta;
042 import org.hd.d.pg2k.svrCore.AllExhibitPropertiesDelta.DiffException;
043 import org.hd.d.pg2k.svrCore.CompressionLevel;
044 import org.hd.d.pg2k.svrCore.CoreConsts;
045 import org.hd.d.pg2k.svrCore.DefInputStream;
046 import org.hd.d.pg2k.svrCore.DefOutputStream;
047 import org.hd.d.pg2k.svrCore.ExhibitName;
048 import org.hd.d.pg2k.svrCore.ExhibitStaticAttr;
049 import org.hd.d.pg2k.svrCore.ExhibitThumbnails;
050 import org.hd.d.pg2k.svrCore.FileTools;
051 import org.hd.d.pg2k.svrCore.GenUtils;
052 import org.hd.d.pg2k.svrCore.MemoryTools;
053 import org.hd.d.pg2k.svrCore.Name;
054 import org.hd.d.pg2k.svrCore.Name.ExhibitFull;
055 import org.hd.d.pg2k.svrCore.PGMasterNotInServiceException;
056 import org.hd.d.pg2k.svrCore.ROByteArray;
057 import org.hd.d.pg2k.svrCore.Rnd;
058 import org.hd.d.pg2k.svrCore.SimpleLoggerIF;
059 import org.hd.d.pg2k.svrCore.Stratum;
060 import org.hd.d.pg2k.svrCore.TextUtils;
061 import org.hd.d.pg2k.svrCore.Tuple;
062 import org.hd.d.pg2k.svrCore.Tuple.Pair;
063 import org.hd.d.pg2k.svrCore.WrappedByteArrayCharSequence;
064 import org.hd.d.pg2k.svrCore.props.GenProps;
065 import org.hd.d.pg2k.svrCore.stats.StatsLogger;
066 import org.hd.d.pg2k.svrCore.vars.BasicVarMgr;
067 import org.hd.d.pg2k.svrCore.vars.EventPeriod;
068 import org.hd.d.pg2k.svrCore.vars.EventVariableValue;
069 import org.hd.d.pg2k.svrCore.vars.InstanceID;
070 import org.hd.d.pg2k.svrCore.vars.SimpleVariableDefinition;
071 import org.hd.d.pg2k.svrCore.vars.SimpleVariableValue;
072 import org.hd.d.pg2k.svrCore.vars.SystemVariables;
073
074 import ORG.hd.d.IsDebug;
075
076 /**Exhibit pipeline stage that fetches its data across a master/slave tunnel.
077 * Some of the visible types and values are used by the implementations
078 * of both ends of the tunnel (eg the tunnel servlet)
079 * to help implement the shared protocol.
080 * <p>
081 * This takes data from the server's own DataSourceBean by default.
082 */
083 public abstract class ExhibitDataTunnelSource implements SimpleExhibitPipelineIF
084 {
085 /**Create an instance, passing in a (non-null) logger to use. */
086 public ExhibitDataTunnelSource(final SimpleLoggerIF logger)
087 {
088 if(logger == null) { throw new IllegalArgumentException(); }
089 this.logger = logger;
090 statsIDTS =
091 new StatsLogger.StatsConfig("TUNNELSOURCE",
092 logger, // Use servlet log if poss.
093 false, // Only dump summaries...
094 12 * 3600, // About every 12 hours.
095 true); // Adaptive.
096 }
097
098 /**If true, try GC and running finalizers after some connection failures.
099 * If we think that connection failure may have been caused by
100 * failure to release resources (elsewhere in the VM!)
101 * then try to force a clean-up from time to time.
102 * But, in case we are wrong, do not do this too often anyway.
103 */
104 private static final boolean FORCE_GC_AND_FINALISERS_AFTER_POLL_FAIL = true;
105
106 /**Minimum wait after failed tunnel call before retry (ms); strictly positive.
107 * A few tens of milliseconds to a second is probably best,
108 * to recover from transient I/O problems quickly,
109 * but without actually busy-waiting...
110 */
111 public static final int FAIL_RETRY_WAIT_MIN_MS = 101;
112
113 /**Maximum wait after failed tunnel call before retry (ms).
114 * Several minutes is probably good,
115 * to cope with a crashed master server (and/or to wait for it to reboot),
116 * or a failed connection,
117 * but too long and our distributed variables will fail needlessly.
118 * <p>
119 * Most requests during the wait will be vetoed cleanly and quickly
120 * allowing timely service to the slave's clients.
121 * <p>
122 * The value is 2 minutes plus an odd random offset of under 1 minute, chosen once when this class is initialised,
123 * so that separate slaves (JVMs) are unlikely to collide with one another, especially when hitting this upper bound.
124 */
125 public static final int FAIL_RETRY_WAIT_MAX_MS = 2 * 60 * 1000 +
126 (1 | Rnd.fastRnd.nextInt(1 * 60 * 1000));
127
128 /**If true, allow connection/resource sharing between calls if possible.
129 * We may also periodically check that we can still contact the master.
130 * <p>
131 * An open connection would probably improve TCP efficiency
132 * and reduce latency for an underlying HTTP[S] connection, for example.
133 * (We might allow HTTP/1.1 streaming and connection sharing to be
134 * done for us by the JDK runtime if this is true.)
135 */
136 protected static final boolean KEEP_SERVER_CONNECTION_ALIVE = true;
137
138 /**If true, extra instrumentation for protocol debug (written to System.err); definitely not for release code. */
139 protected static final boolean _protocolDebug = false /* && ORG.hd.d.IsDebug.isDebug */ ;
140
141 /**Our unique client-side end-point identifier.
142 * The ID goes into all outgoing variable updates (and possibly other data)
143 * going upstream to the master to uniquely identify this client.
144 * <p>
145 * This is unique to each tunnel client end-point instance
146 * and is created afresh on each run,
147 * so a rebooted client gets a new ID.
148 * <p>
149 * Uses secure/good generator to try to ensure global uniqueness.
150 * (This may be expensive to compute;
151 * we might be able to defer this until first needed.)
152 */
153 public final SimpleVariableValue uniqueClientID =
154 new SimpleVariableValue(SystemVariables.LOCAL_SYS_ID, InstanceID.createInstanceID());
155
156
157 /**Our logger which falls back to System.out if servlet log not available; never null. */
158 protected final SimpleLoggerIF logger;
159
160 /**The stats set to which we log general tunnel source stats.
161 * The unique codes are the constants TSNAME_XXX.
162 */
163 private final StatsLogger.StatsConfig statsIDTS;
164
165 /**General stats event name: RPC request. */
166 public static final String TSNAME_RPCREQUEST = "RPC-request";
167
168 /**General stats event name: RPC failure due to an IOException. */
169 public static final String TSNAME_RPCIOEX = "RPC-IOException";
170
171 /**General stats event name prefix: RPC request packet type. */
172 public static final String TSNAMEPR_RPCTYPE = "RPC-type-";
173
174 /**General stats event name: "short" raw data read, less than bulk-transfer block size. */
175 public static final String TSNAME_SHORTREAD = "shortRead";
176
177 /**Convenience value; zero-length (thus effectively immutable) byte array for use as an empty packet payload.
178 * Used by local classes and TunnelServlet; declared public so that both ends of the tunnel can share it.
179 */
180 public static final byte[] EMPTY_PAYLOAD = new byte[0];
181
182 /**Time (ms, on the System.currentTimeMillis() scale) of the last successful exchange with the master, or 0 if none yet.
183 * Declared volatile so that it can be accessed without a lock.
184 * <p>
185 * Set by _noteResult() after a good exchange with the master;
186 * never cleared by _noteResult() (and, being a primitive, cannot be null).
187 */
188 private volatile long lastSuccessfulConnectionTime;
189
190 /**Get time of last successful connection, or 0 if none. */
191 public long getLastSuccessfulConnectionTime()
192 { return(lastSuccessfulConnectionTime); }
193
194 /**Time (ms, System.currentTimeMillis() scale) before which we should not again try to contact the master; null if master is fine.
195 * Declared volatile so that it can be accessed without a lock.
196 * <p>
197 * Cleared to null by _noteResult() after a good exchange with the master;
198 * the wait to another attempt is approximately doubled after each failed exchange, up to about FAIL_RETRY_WAIT_MAX_MS.
199 */
200 private volatile Long doNotTryMasterUntil;
201
202 /**Returns true if we currently cannot talk to the master, false otherwise. */
203 public boolean isBroken()
204 {
205 final Long notUntil = doNotTryMasterUntil;
206 return((notUntil != null) &&
207 (notUntil.longValue() > System.currentTimeMillis()));
208 }
209
210 /**Successive failure count; reset to zero upon success.
211 * Private to _noteResult() and accessed under its private lock.
212 * <p>
213 * Cleared to zero on a successful packet exchange;
214 * incremented after each failure (saturating at Integer.MAX_VALUE rather than wrapping).
215 */
216 private int successiveFailureCount;
217
218 /**Private lock for _noteResult; held while it updates the connection-status fields together. */
219 private final Object _nR_lock = new Object();
220
221 /**Immutable (zero-length) empty result indicating no event values available. */
222 private static final EventVariableValue[] NO_EVENT_VALUES = new EventVariableValue[0];
223
224 /**Routine to note success or failure of RPC call and adjust control variables.
225 * Has its own private lock to ensure atomic update of control variables.
226 *
227 * @param success call with this true immediately after successful RPC exchange with master;
228 * call with false otherwise
229 */
230 private void _noteResult(final boolean success)
231 {
232 synchronized(_nR_lock)
233 {
234 final long now = System.currentTimeMillis();
235 if(success)
236 {
237 // Successful RPC exchange with master.
238 lastSuccessfulConnectionTime = now;
239 doNotTryMasterUntil = null;
240 successiveFailureCount = 0;
241 return;
242 }
243
244 // Failed RPC exchange with the master...
245 if(++successiveFailureCount < 0)
246 { successiveFailureCount = Integer.MAX_VALUE; }
247
248 // Compute first-cut back-off time.
249 // Double for each successive failure.
250 final long approxBackoff = ((long) ExhibitDataTunnelSource.FAIL_RETRY_WAIT_MIN_MS) <<
251 Math.min(32, successiveFailureCount);
252
253 // Coerce to reasonable bounds...
254 final int waitBounded = (int) Math.min(ExhibitDataTunnelSource.FAIL_RETRY_WAIT_MAX_MS,
255 Math.max(ExhibitDataTunnelSource.FAIL_RETRY_WAIT_MIN_MS, approxBackoff));
256
257 // Roughly double the pause before the next connection attempt,
258 // with a random component to help avoid inter-slave collisions.
259 final long newWaitTime = Math.min(ExhibitDataTunnelSource.FAIL_RETRY_WAIT_MAX_MS,
260 waitBounded/2 + Rnd.fastRnd.nextInt(waitBounded));
261
262 // Store the new "do not try before" time...
263 doNotTryMasterUntil = new Long(now + newWaitTime);
264 }
265 }
266
267 /**Does a NO-OP on the server.
268 * If this returns without throwing an exception,
269 * then the connection to the back-end (master) server is probably OK.
270 *
271 * @param unguarded if true, this ignores any recent connection problems
272 * and immediately tries to contact the master,
273 * else it behaves like other operations and is quickly vetoed
274 * while the master is failing
275 *
276 * @throws java.io.IOException in case of trouble communicating with master.
277 */
278 public void doNOOP(final boolean unguarded)
279 throws IOException
280 {
281 final RawPacket response;
282 if(unguarded)
283 {
284 response = doRPCUnguarded(
285 new RawPacket(RawPacket.OpCode.NOOP, ExhibitDataTunnelSource.EMPTY_PAYLOAD));
286 }
287 else
288 {
289 response = doRPC(
290 new RawPacket(RawPacket.OpCode.NOOP, ExhibitDataTunnelSource.EMPTY_PAYLOAD));
291 }
292 if(response.plLength != 0)
293 { throw new IOException("invalid non-empty response to NOOP request"); }
294 return;
295 }
296
297 /**Get the static attributes for a given exhibit.
298 * Returns null if the named exhibit does not exist.
299 * <p>
300 * Sends name as a UTF string and allows compression.
301 * (We use UTF since we know that the length is limited and
302 * the all-ASCII nature of valid names should yield
303 * one-byte-per-char encoding before compression.)
304 * <p>
305 * This will fail with a null name argument.
306 * <p>
307 * (Big secret: if this class is used behind, for example,
308 * ExhibitDataSimpleCache, this method will never be called
309 * because all such calls are answered direct from the cache.)
310 */
311 public ExhibitStaticAttr getStaticAttr(final ExhibitFull name)
312 throws IOException
313 {
314 final ByteArrayOutputStream baos =
315 new ByteArrayOutputStream(name.length());
316 final DataOutputStream dos = new DataOutputStream(baos);
317 dos.writeUTF(name.toString());
318 final byte data[] = baos.toByteArray();
319 dos.close(); // Attempt to free some resources quickly.
320
321 final RawPacket request = new RawPacket(
322 RawPacket.OpCode.GetStaticAttr,
323 data);
324
325 final RawPacket response = doRPC(request);
326 return((ExhibitStaticAttr) response.getSerializedObjectPayload());
327 }
328
329 /**Get a chunk of the raw exhibit binary.
330 * The start position must be non-negative.
331 * <p>
332 * Sends name as a UTF string and allows compression of the request.
333 * (We use UTF since we know that the length is limited and
334 * the all-ASCII nature of valid names should yield
335 * one-byte-per-char encoding before compression.)
336 * <p>
337 * We send the start and (computed) afterEnd arguments as int values.
338 * <p>
339 * We return the data as-is.
340 *
341 *
342 * @throws java.lang.IllegalArgumentException for blatantly invalid name,
343 * or non-positive length
344 */
345 public void getRawFile(final ByteBuffer buf,
346 final Name.ExhibitFull exhibitName,
347 final int position,
348 final boolean dontCache)
349 throws IOException
350 {
351 // We check that that the name is not blatantly illegal...
352 if(!ExhibitName.validNameSyntaxBasic(exhibitName))
353 { throw new IllegalArgumentException("invalid name"); }
354
355 // We don't allow zero-byte requests over the expensive tunnel.
356 int len = buf.limit() - buf.position();
357 if(len < 1)
358 { throw new IllegalArgumentException("request length must be positive"); }
359
360 // Limit any request to a size that the back-end would allow.
361 if(len > MAX_USER_READ_SIZE)
362 { len = MAX_USER_READ_SIZE; }
363
364 if(ExhibitDataTunnelSource._protocolDebug)
365 {
366 System.err.println("ExhibitDataTunnelSource.getRawFile(" +
367 "len="+len+", dontCache="+dontCache+")");
368 }
369
370 // Do the transfer...
371 final int afterEnd = position + len;
372
373 // final ByteArrayOutputStream baos =
374 // new ByteArrayOutputStream(11 + exhibitName.length());
375 // final DataOutputStream dos = new DataOutputStream(baos);
376 // dos.writeUTF(exhibitName.toString());
377 // dos.writeInt(position);
378 // dos.writeInt(afterEnd);
379 // dos.writeBoolean(dontCache);
380 // final byte data[] = baos.toByteArray();
381 // dos.close(); // Attempt to free some resources quickly.
382
383 // Optimised creation of outgoing packet.
384 final int nameLength = exhibitName.length();
385 final byte[] data = new byte[11 + nameLength];
386 data[0] = (byte) (nameLength >>> 8);
387 data[1] = (byte) nameLength;
388 exhibitName.writeToByteArray(data, 2);
389 data[2 + nameLength] = (byte) (position >>> 24);
390 data[3 + nameLength] = (byte) (position >>> 16);
391 data[4 + nameLength] = (byte) (position >>> 8);
392 data[5 + nameLength] = (byte) position;
393 data[6 + nameLength] = (byte) (afterEnd >>> 24);
394 data[7 + nameLength] = (byte) (afterEnd >>> 16);
395 data[8 + nameLength] = (byte) (afterEnd >>> 8);
396 data[9 + nameLength] = (byte) afterEnd;
397 data[10 + nameLength] = (byte) (dontCache ? 1 : 0);
398
399 final RawPacket request = new RawPacket(
400 RawPacket.OpCode.GetRawFile,
401 data);
402
403 final RawPacket response = doRPC(request);
404 response.getPayloadCopy(buf); // Straight into buffer for efficiency.
405 }
406
407 /**Gets all static exhibit data if its timestamp is not that specified.
408 * If the time specified is negative the object will be returned unconditionally.
409 * <p>
410 * If no exhibits are currently installed a default set with a zero
411 * timestamp is returned.
412 * <p>
413 * If the caller's copy appears to be up-to-date (eg the oldStamp
414 * matches that that we would have been returned) null is returned.
415 */
416 public AllExhibitImmutableData getAllExhibitImmutableData(final long oldStamp)
417 throws IOException
418 {
419 final RawPacket response = doRPC(
420 new RawPacket(RawPacket.OpCode.GetAllExhibitImmutableData, ExhibitDataTunnelSource.longSer(oldStamp)));
421 return((AllExhibitImmutableData) response.getSerializedObjectPayload());
422 }
423
424 /**Gets set of all exhibit properties if its hash is not that specified.
425 * If the hash specified is negative the object will be returned unconditionally.
426 * <p>
427 * If no exhibits are currently installed
428 * then a default set with a zero timestamp is returned.
429 * <p>
430 * If the caller's copy appears to be up-to-date
431 * (eg the oldHash matches that that would have been returned)
432 * then null is returned.
433 * <p>
434 * Because the AEP drives the entire Gallery,
435 * we always let the request go over the tunnel even if recent calls have failed.
436 * For example, we don't want mirrors to remain out of sync for too long.
437 * <p>
438 * This also typically takes a lot of bandwidth and CPU/wallclock time,
439 * so is optimised and stripped down to a minimum,
440 * and relies on the AEP deserialisation for serious error checking.
441 * <p>
442 * It is hoped that using the streamed form will overlap I/O and CPU work
443 * and thus get the new AEP into the client quicker and smoother.
444 * <p>
445 * We attempt to be good citizens and note most RPC successes/failures
446 * to help maintain the tunnel's view of the master's status.
447 * <p>
448 * (Because satisfying these calls may take a long time,
449 * the upstream server may veto concurrent queries that return anything
450 * other than null.)
451 * <p>
452 * As a further attempt to minimise time and bandwidth,
453 * we may try the "diff" version of this call,
454 * falling back to the normal version if this fails.
455 */
456 public AllExhibitProperties getAllExhibitProperties(final long oldHash)
457 throws IOException
458 {
459 final RawPacket packetOut = new RawPacket(RawPacket.OpCode.GetAllExhibitProperties, ExhibitDataTunnelSource.longSer(oldHash));
460
461 // The RPC may take a long time to compute the response to, so have a big read timeout.
462 final Pair<Integer, InputStream> stream = doRPCRawWithStreamResponse(packetOut, true);
463 if((stream.first != null) && (stream.first.intValue() == 0))
464 {
465 // An empty response means a null AEP response.
466 stream.second.close();
467 _noteResult(true); /* Note RPC success. */
468 return(null);
469 }
470
471 // Actually deserialise from the input stream.
472 final ObjectInputStream ois = new ObjectInputStream(stream.second);
473 // For robustness, treat a class mismatch problem like a data problem.
474 // We rely on the AllExhibitProperties deserialisation for most error checking.
475 try
476 {
477 final AllExhibitProperties result = (AllExhibitProperties) ois.readObject();
478 _noteResult(true); /* Note RPC success. */
479
480 // Warn if the remote end of the tunnel seems to have sent us a redundant value,
481 // ie sent us something we already have with the hash code that we specified.
482 if((result != null) && (result.longHash == oldHash))
483 {
484 final String message = "WARNING: tunnel fetched AEP with existing hash: " + oldHash;
485 System.err.println(message);
486 logger.log(message);
487 }
488
489 return(result);
490 }
491 catch(final ClassNotFoundException e) { throw new IOException(e.getMessage()); }
492 catch(final InterruptedIOException e)
493 {
494 // Not a failure: can retry later...
495 throw e; /* Rethrow error as-is. */
496 }
497 catch(final IOException e)
498 {
499 _noteResult(false); /* Note RPC failure. */
500 throw e; /* Rethrow error as-is. */
501 }
502 finally { ois.close(); }
503 }
504
505 /**Maximum level of compression in AEP "diff" RPC supported in this implementation, client or server side; the client sends its name in each diff request and rejects replies compressed at a higher level. */
506 public static final CompressionLevel MAX_AEP_DIFF_COMP_LEVEL = CompressionLevel.LZMA; // GenUtils.MAX_SUPPORTED_COMPRESSION_LEVEL;
507
508 /**Gets set of all exhibit properties if not that specified, attempting minimise data transferred across the tunnel.
509 * This is a tunnel-specific optimisation to minimise bandwidth.
510 * For some clients this will save money and time.
511 * <p>
512 * If the AEP specified is null then the remote AEP will be fetched and returned unconditionally.
513 * <p>
514 * If no exhibits are currently installed
515 * then a default set with a zero timestamp is returned.
516 * <p>
517 * If the caller's copy appears to be up-to-date
518 * (eg the oldHash matches that that would have been returned)
519 * then null is returned.
520 * <p>
521 * This is an attempted optimisation of the getAllExhibitProperties(long oldHash),
522 * returning a diff or extra-highly-compressed AEP representation.
523 * <p>
524 * This may not be supported at all by the remote end of the connection,
525 * or may fail for lack of resources, etc,
526 * so we can automatically fall back to the usual call in case of difficulty.
527 * <p>
528 * (We do not record failures as connection/tunnel problems,
529 * though we do note successful calls in favour of the connection status.)
530 *
531 * @param oldAEP current latest AEP held by the caller
532 * @param allowAutoRecovery if true then allow fallback to generic AEP fetch method
533 */
534 public AllExhibitProperties getAllExhibitProperties(final AllExhibitProperties oldAEP,
535 final boolean allowAutoRecovery)
536 throws IOException
537 {
538 final long oldHash = (oldAEP == null) ? -1 : oldAEP.longHash;
539
540 try
541 {
542 // Construct the header.
543 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
544 final DataOutputStream dos = new DataOutputStream(baos);
545 // Write existing AEP longHash, or -1 if no extant AEP.
546 dos.writeLong(oldHash);
547 // Write current maximum supported AEP compression level as a UTF String.
548 dos.writeUTF(MAX_AEP_DIFF_COMP_LEVEL.name());
549 dos.close();
550 final byte data[] = baos.toByteArray();
551 final RawPacket packetOut = new RawPacket(RawPacket.OpCode.GetAllExhibitPropertiesDiff, data);
552
553 // An empty response (zero bytes) means that our AEP is up-to-date...
554 // The RPC may take a long time to compute the response to, so have a big read timeout.
555 final Pair<Integer, InputStream> stream = doRPCRawWithStreamResponse(packetOut, true);
556 if((stream.first != null) && (stream.first.intValue() == 0))
557 {
558 // An empty response means a null AEP response.
559 stream.second.close();
560 _noteResult(true); /* Note RPC success. */
561 return(null);
562 }
563
564 // Else read from the stream.
565 // 1) A UTF string for the compression level/type (abort if unsupported).
566 // 2) A compressed object stream (in the appropriate format) which may be:
567 // 2a) AllExhibitProperties, in which case return as-is.
568 // 2b) AllExhibitPropertiesDelta, in which case apply to the input AEP and return.
569 try
570 {
571 final DataInputStream dis = new DataInputStream(stream.second);
572 final CompressionLevel cl = CompressionLevel.valueOf(dis.readUTF());
573 if(cl.getLevel() > MAX_AEP_DIFF_COMP_LEVEL.getLevel())
574 { throw new IOException("server returned data compressed at too high a level"); }
575 final InputStream is = GenUtils.wrapForDecompression(stream.second, cl);
576 final ObjectInputStream ois = new ObjectInputStream(is);
577 // Read in the object from the stream on the fly.
578 // TODO: we may want to add a data pump to keep data flowing during processing.
579 final Object o = ois.readObject();
580
581 // If the result is a full (and hopefully super-compressed) AEP
582 // then unpack it and return it here.
583 if(o instanceof AllExhibitProperties)
584 {
585 final AllExhibitProperties result = (AllExhibitProperties) o;
586 _noteResult(true); /* Note RPC success. */
587 /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource: successful getAEP(AEP) full: "+cl+".]"); }
588
589 // Warn if the remote end of the tunnel seems to have sent us a redundant value,
590 // ie sent us something we already have with the hash code that we specified.
591 if((result != null) && (result.longHash == oldHash))
592 {
593 final String message = "WARNING: tunnel fetched AEP (supercompressed) with existing hash: " + oldHash;
594 System.err.println(message);
595 logger.log(message);
596 }
597
598 return(result);
599 }
600
601 // If the result is a diff then apply it to the input AEP.
602 if(o instanceof AllExhibitPropertiesDelta)
603 {
604 final AllExhibitPropertiesDelta delta = (AllExhibitPropertiesDelta) o;
605 final AllExhibitProperties result = AllExhibitPropertiesDelta.applyDiff(oldAEP, delta);
606 /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource: successful getAEP(AEP) diff: #before/after="+delta.lengthAEIDBefore+"/"+delta.lengthAEIDAfter+", aep before/after="+oldAEP+"/"+result+", "+cl+".]"); }
607
608 // /* TEMPORARY SAFETY MEASURE: REJECT RESULT WITH FEWER EPL/EPC ENTRIES... */
609 // if(result.getExhibitPropsLoadableMap().size() < oldAEP.getExhibitPropsLoadableMap().size())
610 // { throw new IOException("WARNING: rejecting AEP diff result with fewer EPLs..."); }
611 // if(result.getExhibitPropsComputableMap().size() < oldAEP.getExhibitPropsComputableMap().size())
612 // { throw new IOException("WARNING: rejecting AEP diff result with fewer EPCs..."); }
613
614 _noteResult(true); /* Note RPC success. */
615
616 // Warn if the remote end of the tunnel seems to have sent us a redundant value,
617 // ie sent us something we already have with the hash code that we specified.
618 if((result != null) && (result.longHash == oldHash))
619 {
620 final String message = "WARNING: tunnel fetched AEP (via delta) with existing hash: " + oldHash;
621 System.err.println(message);
622 logger.log(message);
623 }
624
625 return(result);
626 }
627
628 throw new IOException("unexpected return type");
629 }
630 finally { stream.second.close(); /* Free (network) resources. */ }
631 }
632 catch(final InterruptedIOException e)
633 { throw e; } // Rethrow as-is: don't attempt recovery yet...
634 catch(final Exception e)
635 {
636 // Always log the problem/exception.
637 e.printStackTrace();
638
639 if(!allowAutoRecovery)
640 {
641 if(e instanceof IOException) { throw (IOException) e; }
642 final IOException err = new IOException("aborting after failed AEP diff RPC");
643 err.initCause(e);
644 throw err;
645 }
646
647 logger.log("ERROR: ExhibitDataTunnelSource: automatically recovering from AEP diff error: "+e.getMessage()+".");
648
649 // In case of error, automatically fall back to generic hash-arg call.
650 // (Iff allowing auto-recovery...)
651 return(getAllExhibitProperties(oldHash));
652 }
653 }
654
655 /**Helper method to serialise a single free-standing long value.
656 * The long value is serialised just as DataOutputStream would,
657 * as 8 bytes with the high-byte first.
658 */
659 public static byte[] longSer(final long value)
660 {
661 final byte[] result = new byte[8];
662 result[0] = (byte) (value >> 56);
663 result[1] = (byte) (value >> 48);
664 result[2] = (byte) (value >> 40);
665 result[3] = (byte) (value >> 32);
666 result[4] = (byte) (value >> 24);
667 result[5] = (byte) (value >> 16);
668 result[6] = (byte) (value >> 8);
669 result[7] = (byte) (value );
670 return(result);
671 }
672
673 /**Helper method to serialise a single free-standing int value.
674 * The int value is serialised just as DataOutputStream would,
675 * as 4 bytes with the high-byte first.
676 */
677 public static byte[] intSer(final int value)
678 {
679 final byte[] result = new byte[4];
680 result[0] = (byte) (value >> 24);
681 result[1] = (byte) (value >> 16);
682 result[2] = (byte) (value >> 8);
683 result[3] = (byte) (value );
684 return(result);
685 }
686
687 /**Gets the general properties as a GenProps object if its timestamp is not that specified.
688 * If the time specified is negative the object will be returned unconditionally.
689 * <p>
690 * If no props are currently installed/available a default set with a zero
691 * timestamp is returned.
692 * <p>
693 * If the caller's copy appears to be up-to-date (eg the oldStamp
694 * matches that that we would have been returned) null is returned.
695 * <p>
696 * On the wire this is (outbound) a 8-byte timestamp and
697 * by return should be an empty payload (corresponding to null)
698 * or a serialised GenProps object (uncompressed for now).
699 * <p>
700 * Because GenProps are so important to the functioning of the entire Gallery,
701 * we always let the request go over the tunnel even if recent calls have failed.
702 */
703 public org.hd.d.pg2k.svrCore.props.GenProps getGenProps(final long oldStamp)
704 throws IOException
705 {
706 //System.err.println("[Fetching GenProps over HTTP tunnel...]");
707
708 final RawPacket response = doRPCUnguarded(
709 new RawPacket(RawPacket.OpCode.GetGenProps, longSer(oldStamp)));
710
711 final GenProps result = (GenProps) response.getSerializedObjectPayload();
712 //System.err.println(" [Fetched GenProps over HTTP tunnel.]");
713 return(result);
714 }
715
716 /**Gets the security properties as a Properties object if its timestamp is not that specified.
717 * If the time specified is negative the object will be returned unconditionally.
718 * <p>
719 * If no props are currently installed/available a default set with a zero
720 * timestamp is returned.
721 * <p>
722 * If the caller's copy appears to be up-to-date (eg the oldStamp
723 * matches that that would have been returned) null is returned.
724 */
725 public java.util.Properties getGenSecProps(final long oldStamp)
726 throws IOException
727 {
728 //System.err.println("[Fetching GenSecProps over HTTP tunnel...]");
729
730 final RawPacket response = doRPC(
731 new RawPacket(RawPacket.OpCode.GetGenSecProps, longSer(oldStamp)));
732
733 final Properties result = (Properties) response.getSerializedObjectPayload();
734 //System.err.println(" [Fetched GenSecProps over HTTP tunnel.]");
735 return(result);
736 }
737
738 /**Gets the thumbnails for an exhibit.
739 * A data source is at liberty to refuse to compute thumbnails
740 * in which case it may return null, else it returns a
741 * non-null value which may include the `could-not-compute'
742 * value to indicate that a thumbnail/sample cannot be made
743 * for this exhibit and no attempt need be made in future.
744 * <p>
745 * Because it is important for a new mirror to gather thumbnails ASAP
746 * to show to a user, we make this call immune to being blocked
747 * when the tunnel connection appears poor.
748 *
749 * @param create if true, and no thumbnail yet exists, try to
750 * create one if possible, else only return an existing one
751 */
752 public ExhibitThumbnails getThumbnails(final ExhibitFull name, final boolean create)
753 throws IOException
754 {
755 //System.err.println("[Fetching Thumbnails over HTTP tunnel...]");
756
757 // final ByteArrayOutputStream baos =
758 // new ByteArrayOutputStream(name.length());
759 // final DataOutputStream dos = new DataOutputStream(baos);
760 // dos.writeUTF(name.toString());
761 // dos.writeBoolean(create);
762 // final byte data[] = baos.toByteArray();
763 // dos.close(); // Attempt to free some resources quickly.
764
765 final int nameLength = name.length();
766 final byte[] data = new byte[3 + nameLength];
767 data[0] = (byte) (nameLength >>> 8);
768 data[1] = (byte) nameLength;
769 name.writeToByteArray(data, 2);
770 data[2 + nameLength] = (byte) (create ? 1 : 0);
771
772 final RawPacket response = doRPCUnguarded(
773 new RawPacket(RawPacket.OpCode.GetThumbnails, data));
774
775 final ExhibitThumbnails result = (ExhibitThumbnails) response.getSerializedObjectPayload();
776 //System.err.println(" [Fetched Thumbnails over HTTP tunnel.]");
777 return(result);
778 }
779
780 /**Set variable.
781 * Only non-local values are propagated upstream;
782 * others are explicitly rejected with an UnsupportedOperationException.
783 *
784 * @throws java.io.IOException in case of I/O problems
785 * @throws java.lang.UnsupportedOperationException when attempting to set locals
786 */
787 public void setVariable(final SimpleVariableValue newValue)
788 throws IOException,
789 UnsupportedOperationException
790 {
791 if(newValue == null) { throw new IllegalArgumentException(); }
792 if(newValue.getDef().isLocal())
793 { throw new UnsupportedOperationException("cannot send local variables over the tunnel"); }
794 // Implement in terms of setVariables().
795 setVariables(new SimpleVariableValue[]{newValue});
796 }
797
798 /**Adjust globalMap for outgoing (upstream) set operation.
799 * This expects the value passed to be:
800 * <ul>
801 * <li>Non-null
802 * <li>Non-local
803 * <li>Read/write
804 * <li>To have an empty globalMap or one with exactly one entry
805 * whose key is the system ID for this system;
806 * if the map is empty then this system is put in the global map
807 * </ul>
808 * <p>
809 * If the argument is in the correct form, it is returned unchanged.
810 * <p>
811 * If the argument is illegal, ie not suitable to send upstream,
812 * then an exception is thrown to veto the send.
813 *
814 * @throws java.lang.IllegalArgumentException if an invalid argument is passed,
815 * eg unable to be converted to be sent upstream because it has
816 * an invalid globalMap
817 */
818 private SimpleVariableValue adjustGlobalMapForSet(final SimpleVariableValue svv)
819 {
820 if(svv == null)
821 { throw new IllegalArgumentException(); }
822
823 final SimpleVariableDefinition def = svv.getDef();
824 if(def.isLocal() ||
825 def.isReadOnly() ||
826 !SystemVariables.defs.contains(def))
827 { throw new IllegalArgumentException(); }
828
829 final Map<InstanceID,SimpleVariableValue> globalMap = svv.getGlobalMap();
830 if((globalMap != null) && (globalMap.size() > 1))
831 { throw new IllegalArgumentException(); }
832
833 // If no globalMap,
834 // then make sure that we make an explicit map entry for this system.
835 final InstanceID sysID = (InstanceID) uniqueClientID.getValue();
836 if(globalMap == null)
837 {
838 // We're done.
839 return(svv.put(sysID, svv, true));
840 }
841
842 if(!Collections.singleton(sysID).equals(globalMap.keySet()))
843 {
844 throw new IllegalArgumentException("globalMap contains ID that is not ours during set: " + sysID);
845 }
846
847 // Value is fine as it is, so return it untouched...
848 return(svv);
849 }
850
851 /**Set variables; must not be null or contain nulls.
852 * Only non-local variables are sent upstream;
853 * locals are silently discarded.
854 * <p>
855 * Variables not defined in SystemVariables are rejected.
856 * <p>
857 * Duplicates may be discarded or rejected,
858 * or sent as is in which case it is undefined in which order they
859 * are applied.
860 * <p>
861 * We try to set all of the values that we can.
862 * <p>
863 * Format on the wire:
864 * <ul>
865 * <li>Outbound: serialised SimpleVariableValue[]
866 * <li>Inbound: number of values set (int)
867 * </ul>
868 *
869 * @throws java.io.IOException in case of I/O problems
870 */
871 public int setVariables(final SimpleVariableValue[] newValues)
872 throws IOException,
873 UnsupportedOperationException
874 {
875 if(newValues == null) { throw new IllegalArgumentException(); }
876
877 // Make a set of all non-local (and writable) values to pass upstream.
878 // Preserve order...
879 final int nvLen = newValues.length;
880 final List<SimpleVariableValue> toGo = new ArrayList<SimpleVariableValue>(nvLen);
881 for(int i = 0; i < nvLen; ++i)
882 {
883 final SimpleVariableValue svv = newValues[i];
884
885 // Reject null values...
886 if(svv == null)
887 { throw new IllegalArgumentException(); }
888
889 final SimpleVariableDefinition def = svv.getDef();
890
891 // Skip local values;
892 // they should not be propagated out of the machine.
893 // Silently ignore this so that any globals can be sent.
894 if(def.isLocal())
895 { continue; }
896
897 // Adjust globalMap if necessary for sending upstream.
898 // Veto invalid values with an IllegalArgumentException.
899 toGo.add(adjustGlobalMapForSet(svv));
900 }
901
902 // If nothing to send, then return immediately.
903 if(toGo.size() == 0)
904 { return(0); }
905
906 // TODO: eliminate duplicates
907 // TODO: sort into name order to help compression (but preserving order for the same variable)
908
909 // Send set of values upstream
910 // by putting them into an array and serialising it.
911 final SimpleVariableValue r[] = new SimpleVariableValue[toGo.size()];
912 toGo.toArray(r);
913
914 // Make the RPC.
915 final RawPacket response = doRPC(new RawPacket(
916 RawPacket.OpCode.SetVariables,
917 r,
918 true)); // Compression may be useful...
919
920 // Extract return value...
921 if(response.plLength != 4)
922 { throw new IOException("corrupt reply packet: plLength=" + response.plLength); }
923 final DataInputStream dis = new DataInputStream(response.getPayloadAsInputStream());
924 try
925 {
926 final int nSet = dis.readInt();
927 // Validate result returned.
928 if((nSet < 0) || (nSet > nvLen))
929 { throw new IOException("corrupt reply packet: nSet=" + Integer.toHexString(nSet)); }
930 return(nSet);
931 }
932 finally { dis.close(); }
933 }
934
935 /**Get variable, or returns null if no such non-local variable.
936 * We attempt only to fetch non-local variables; return null for others
937 * (except for the system ID).
938 * <p>
939 * When asked for the system ID,
940 * we always answer the question locally.
941 * <p>
942 * Note that the local system ID can be retrieved whether or not
943 * the master server is responding.
944 * <p>
945 * Format on the wire is:
946 * <ul>
947 * <li>Outbound: variable name in UTF format
948 * <li>Inbound: serialised form of SimpleVariableValue (or empty for null)
949 * </ul>
950 *
951 * @throws java.io.IOException in case of I/O error
952 * @throws UnsupportedOperationException if a local variable
953 * other than the system ID is requested
954 */
955 public SimpleVariableValue getVariable(final SimpleVariableDefinition var)
956 throws IOException,
957 UnsupportedOperationException
958 {
959 if(var == null) { throw new IllegalArgumentException(); }
960
961 // If the request is for the system ID,
962 // return it here.
963 if(var.equals(SystemVariables.LOCAL_SYS_ID))
964 { return(uniqueClientID); }
965
966 // Always return null to requests for local data (other than system ID);
967 // local variables should not be propagated over the tunnel.
968 if(var.isLocal())
969 { throw new UnsupportedOperationException("local variables not available at tunnel source"); }
970
971 // Fetch individual variable value over tunnel...
972 // We only send the name,
973 // and get back a whole variable value,
974 // but on return check for a compatible definition,
975 // returning null if the definitions don't match.
976 final String name = var.getName();
977 final ByteArrayOutputStream baos =
978 new ByteArrayOutputStream(name.length());
979 final DataOutputStream dos = new DataOutputStream(baos);
980 dos.writeUTF(name);
981 final byte data[] = baos.toByteArray();
982 dos.close(); // Attempt to free some resources quickly.
983
984 final RawPacket request = new RawPacket(
985 RawPacket.OpCode.GetVariable,
986 data);
987
988 final RawPacket response = doRPC(request);
989 final SimpleVariableValue svv =
990 (SimpleVariableValue) response.getSerializedObjectPayload();
991
992 // If there was no variable,
993 // then return null immediately.
994 if(svv == null)
995 { return(null); }
996
997 // Discard incompatible types,
998 // and local variables.
999 if(!var.equals(svv.getDef()))
1000 { return(null); }
1001
1002 return(svv);
1003 }
1004
1005 /**Fetch variable values from the master.
1006 * We fetch values from the master
1007 * (from which we eliminate any illegal values such as local variables
1008 * or values which are not locally valid)
1009 * and insert our unique local system ID
1010 * if the request stamp is -1 or older than our creation.
1011 * <p>
1012 * Note that the local system ID can be retrieved whether or not
1013 * the master server is responding providing the stamp is -1,
1014 * though in that case it might be the only value returned.
1015 * The caller should periodically use -1 to ensure that they
1016 * have a full set of global variables, eg in case the server rebooted.
1017 * <p>
1018 * Format on the wire:
1019 * <ul>
1020 * <li>Outbound: serialised timestamp
1021 * <li>Inbound: SimpleVariableValue[]
1022 * </ul>
1023 *
1024 * @throws java.io.IOException
1025 */
1026 public SimpleVariableValue[] getVariables(final long changedSince)
1027 throws IOException
1028 {
1029 final List<SimpleVariableValue> incoming = new ArrayList<SimpleVariableValue>();
1030
1031 // Fetch all (global) variables over the tunnel.
1032 try
1033 {
1034 final RawPacket response = doRPC(
1035 new RawPacket(RawPacket.OpCode.GetVariables, longSer(changedSince)));
1036 // Copy incoming array to List...
1037 incoming.addAll(Arrays.asList(
1038 (SimpleVariableValue[]) response.getSerializedObjectPayload()));
1039 }
1040 catch(final IOException e)
1041 {
1042 // If changedSince == -1 we guarantee to return the local ID,
1043 // else we rethrow the exception...
1044 if(changedSince != -1) { throw e; }
1045 }
1046
1047 // Remove any extant system ID value,
1048 // and any local variable (should not have come up the wire),
1049 // and any value that does not have an identical local definition.
1050 // We do not expect to find/remove any of these in normal operation.
1051 final SimpleVariableDefinition uCIDDef = uniqueClientID.getDef();
1052 for(int i = incoming.size(); --i >= 0; )
1053 {
1054 final SimpleVariableValue svv =
1055 incoming.get(i);
1056 final SimpleVariableDefinition def = svv.getDef();
1057 if(def.equals(uCIDDef) ||
1058 def.isLocal() ||
1059 !SystemVariables.defs.contains(def) ||
1060 !def.equals(SystemVariables.nameToDef.get(def.getName())))
1061 {
1062 incoming.remove(i);
1063 continue;
1064 }
1065 }
1066
1067 // Insert the system ID variable value into the result if appropriate.
1068 if(changedSince <= uniqueClientID.getTimestamp()) // Handles -1 case...
1069 {
1070 incoming.add(uniqueClientID);
1071 }
1072
1073 final SimpleVariableValue result[] =
1074 new SimpleVariableValue[incoming.size()];
1075 incoming.toArray(result);
1076 return(result);
1077 }
1078
1079 /**Get the current partial, or previous full, event set at the specified interval; never null.
1080 * This is a simplified interface to return either the current event set
1081 * that is being collected, or the previous completed set.
1082 * <p>
1083 * The current set is the most timely, but may not contain enough data
1084 * to be meaningful if the new interval has just started.
1085 * <p>
1086 * The previous set is complete and thus most likely to have enough samples
1087 * to be useful, but is not completely current.
1088 * <p>
1089 * Implemented in terms of the more general call
1090 * in the hope that batched calls for several values will be
1091 * more common and more efficient.
1092 * This also reduces the number of distinct RPC calls that
1093 * we have to implement.
1094 *
1095 * @param def event definition (must be for an event); never null
1096 * @param intervalSelector one of EVENT_INTERVAL_SELECTOR_xxx values
1097 * @param current if true the current event set is returned,
1098 * else the previous complete set is returned
1099 *
1100 * @return requested event set; may be empty but never null if requested set not available
1101 */
1102 public EventVariableValue getEventValue(final SimpleVariableDefinition def,
1103 final EventPeriod intervalSelector,
1104 final boolean current)
1105 {
1106 if((def == null) || (intervalSelector == null))
1107 { throw new IllegalArgumentException(); }
1108
1109 final long now = System.currentTimeMillis();
1110 final long currentIntervalNumber = intervalSelector.getIntervalNumber(now);
1111 final long intervalNumber = (current ? currentIntervalNumber : (currentIntervalNumber-1));
1112
1113 final EventVariableValue evv[] = getEventValues(def,
1114 intervalSelector,
1115 intervalNumber,
1116 null);
1117
1118 // If we got some real (non-null) data back, then return it...
1119 if((evv.length > 0) && (evv[0] != null))
1120 { return(evv[0]); }
1121
1122 // Else get empty non-authoritative return value (with no events)...
1123 return(BasicVarMgr.getEmptyNonAuthEVV(def, intervalSelector, intervalNumber));
1124 }
1125
1126 /**Get the specified global event sets for the specified intervals; never null.
1127 * This allows retrieval of zero or more event sets for the specified
1128 * interval size.
1129 * <p>
1130 * Requests for more than SystemVariables.EVENT_SAMPLES_RETAINED in the
1131 * past (or for the future!) cannot be satisfied and data will not be
1132 * returned for them.
1133 * <p>
1134 * Usually not more than SystemVariables.EVENT_SAMPLES_RETAINED samples
1135 * will be returned in response to any one request as a safety measure.
1136 * <p>
1137 * We do various small optimisations to reduce fruitless traffic over
1138 * what may be a slow and/or expensive connection:
1139 * <ul>
1140 * <li>This will reject requests with huge sets that go back too far so as
1141 * to avoid huge RPC arguments and since the request is unlikely to be
1142 * satisfiable. The upstream implementation may be even more restrictive.
1143 * <li>This ignores requests too far in the future or the past.
1144 * <li>Trim leading empty slots from request sent over the tunnel.
1145 * </ul>
1146 * <p>
1147 * The responses from the server may be large and slow,
1148 * so we allow streaming to give better incremental CPU/resource consumption
1149 * and better throughput.
1150 * <p>
1151 * Returns an empty result if asked for a 'local' value.
1152 * <p>
1153 * Returns an empty result if the tunnel is currently broken
1154 * or in case of other non-permanent error.
1155 * <p>
1156 * TODO: Optimise RPC call by allowing for sparse requests with different request/response format.
1157 *
1158 * @param def event definition (must be for an event); never null
1159 * @param intervalSelector one of EVENT_INTERVAL_SELECTOR_xxx values
1160 * @param intervalNumber an interval number (as from intervalSelector.getIntervalNumber())
1161 * which identifies the first interval for which data is potentially
1162 * required; if too far in the past or future then possibly no data
1163 * will be available,
1164 * zero is used to access the "all" bucket
1165 * @param whichValues each true bit represents a slot for which data is
1166 * required, bit 0 indicating data from the slot identified by
1167 * intervalNumber, bit 1 the previous slot, etc;
1168 * null is treated as the common case equivalent to just bit 0 set
1169 *
1170 * @return as many of the requested values as available,
1171 * at least long enough to return all the available values,
1172 * with [0] corresponding to bit 0 in the BitSet;
1173 * may contain nulls or be zero-length (eg in case of error) but is never null
1174 */
1175 public EventVariableValue[] getEventValues(final SimpleVariableDefinition def,
1176 final EventPeriod intervalSelector,
1177 final long intervalNumber,
1178 BitSet whichValues)
1179 {
1180 if((def == null) ||
1181 (intervalSelector == null) ||
1182 (intervalNumber < 0))
1183 { throw new IllegalArgumentException(); }
1184
1185 if(def.isLocal())
1186 { return(NO_EVENT_VALUES); }
1187
1188 // Don't bother trying this if the connection is currently known to be broken.
1189 if(isBroken()) // { throw new PGMasterNotInServiceException(); }
1190 { return(NO_EVENT_VALUES); }
1191
1192 // See if we can optimise the whichValues argument by replacing it
1193 // with a null value...
1194 // This will reduce heap churn, data on the wire, etc.
1195 if((whichValues != null) && (whichValues.length() == 1) && whichValues.get(0))
1196 { whichValues = null; }
1197
1198 // Various optimisations are possible for a null BitSet arg.
1199 final boolean nullWV = (whichValues == null);
1200
1201 // Take copy of BitSet to ensure that it is minimal size,
1202 // and to reduce thread/race/safety issues.
1203 final BitSet wvCopy = nullWV ? null : new BitSet(whichValues.length());
1204 if(!nullWV) { wvCopy.or(whichValues); }
1205
1206 final int wvl = nullWV ? 1 : wvCopy.length();
1207
1208 // Avoid pointless call for no request bits.
1209 if(wvl < 1)
1210 { return(NO_EVENT_VALUES); }
1211
1212 // Avoid a huge request/response packet.
1213 if(wvl > 2*SystemVariables.EVENT_SAMPLES_RETAINED)
1214 { throw new IllegalArgumentException("request set too large to send over the wire"); }
1215
1216 // Avoid a call for values too far in the past to be likely to be available.
1217 // (This also ensures that the intervalNumber is large and positive for later.)
1218 // Although we allow a zero value for the "all" bucket.
1219 final long currentInterval = intervalSelector.getIntervalNumber(System.currentTimeMillis());
1220 if(intervalNumber == 0)
1221 {
1222 if(!nullWV && (!whichValues.get(0) || (wvl != 1)))
1223 { throw new IllegalArgumentException(); }
1224 }
1225 else if(intervalNumber < currentInterval - 2*SystemVariables.EVENT_SAMPLES_RETAINED - 1)
1226 { return(NO_EVENT_VALUES); }
1227
1228 // Avoid a call for values far too far in the future
1229 // for any skew to account for, etc, so we're sure no data is available.
1230 if(intervalNumber > currentInterval + 4*SystemVariables.EVENT_SAMPLES_RETAINED + 1)
1231 { return(NO_EVENT_VALUES); }
1232
1233 // Get the offset of the first item in the request...
1234 // If this is greater than zero
1235 // then we can in principle optimise the request/response
1236 // by omitting the leading empty slots.
1237 final int firstItemOffset = nullWV ? 0 : wvCopy.nextSetBit(0);
1238 assert(firstItemOffset >= 0);
1239
1240 // Reduce the BitSet that we send if the request can be trimmed...
1241 final BitSet wvToSend = (firstItemOffset == 0) ? wvCopy :
1242 (wvCopy.get(firstItemOffset, wvl));
1243
1244 // Request is:
1245 // * A byte consisting of the ordinal of period/interval enum ordinal.
1246 // * The UTF-8 representation of the name of the definition.
1247 // * The interval number (8 bytes).
1248 // * The serialised form of the request BitSet if non-null.
1249 // Result is:
1250 // * The in-order array of EventVariableValues (never null).
1251 final String name = def.getName();
1252 final ByteArrayOutputStream baos = new ByteArrayOutputStream(32 + name.length() + wvl/2);
1253 final DataOutputStream dos = new DataOutputStream(baos);
1254 final byte data[];
1255 try
1256 {
1257 dos.write(intervalSelector.ordinal());
1258 dos.writeUTF(name);
1259 dos.writeLong(intervalNumber - firstItemOffset);
1260 if(wvToSend != null)
1261 {
1262 final ObjectOutputStream oos = new ObjectOutputStream(dos);
1263 oos.writeObject(wvToSend);
1264 oos.flush();
1265 }
1266 dos.flush();
1267 data = baos.toByteArray();
1268 dos.close(); // Attempt to free some resources quickly.
1269 }
1270 catch(final Exception e) { throw new Error(e); /* Should never happen. */ }
1271
1272 final RawPacket request = new RawPacket(
1273 RawPacket.OpCode.GetEventValues,
1274 data);
1275
1276 // Stream the result, since it may be large enough to be on the wire for a while.
1277 final EventVariableValue[] evvsRaw;
1278 try
1279 {
1280 final Pair<Integer, InputStream> stream = doRPCRawWithStreamResponse(request, false);
1281 try
1282 {
1283 final ObjectInputStream ois = new ObjectInputStream(stream.second);
1284 evvsRaw = (EventVariableValue[]) ois.readObject();
1285 ois.close();
1286 }
1287 finally { stream.second.close(); /* Ensure underlying resources are released. */ }
1288
1289 // Check for unexpected return values.
1290 if(evvsRaw == null)
1291 { throw new IOException("unexpected null return value"); }
1292
1293 // If we offset the request then make a copy set back.
1294 final EventVariableValue[] evvs;
1295 if(firstItemOffset != 0)
1296 {
1297 evvs = new EventVariableValue[evvsRaw.length + firstItemOffset];
1298 System.arraycopy(evvsRaw, 0, evvs, firstItemOffset, evvsRaw.length);
1299 }
1300 else
1301 {
1302 // Did not offset the request...
1303 evvs = evvsRaw;
1304 }
1305
1306 // Check for unexpected/incorrect return values.
1307 for(int i = evvs.length; --i >= 0; )
1308 {
1309 final EventVariableValue evv = evvs[i];
1310 if(evv == null) { continue; }
1311 if(!def.equals(evv.getDef()))
1312 { throw new IOException("unexpected return value with wrong def"); }
1313 if(!intervalSelector.equals(evv.getPeriod()))
1314 { throw new IOException("unexpected return value with wrong period"); }
1315 if(evv.getIntervalNumber() != intervalNumber - i)
1316 { throw new IOException("unexpected return value with wrong interval number"); }
1317 }
1318
1319 _noteResult(true); // Seems to have succeeded.
1320 return(evvs);
1321 }
1322 // Deal with non-permanent errors as empty result value.
1323 // catch(final IOException e) { _noteResult(false); throw e; }
1324 catch(final Exception e) { _noteResult(false); return(NO_EVENT_VALUES); }
1325 }
1326
1327 /**Synchronise variables with upstream values.
1328 * Pushes updated values upstream to the source,
1329 * calls sync on the source if called with the "force" argument true,
1330 * and then retrieves changed values from upstream.
1331 * <p>
1332 * When called with force==true, this acts like a full "memory barrier",
1333 * flushing all write-cached items downstream immediately and afterwards
1334 * getting the value of all upstream values with getVariables(-1),
1335 * but may be expensive in terms of CPU or bandwidth, so use sparingly.
1336 * <p>
1337 * When called with force=false, this returns immediately
1338 * and the operation is not propagated across the tunnel.
1339 * <p>
1340 * In any case, it is rarely the right thing for a casual user
1341 * to call this as it may be very expensive.
1342 *
1343 * @param force if true, this will force a full write flush,
1344 * a full sync upstream,
1345 * then full read with getVariables(-1),
1346 * to get the effect of a full "barrier";
1347 * otherwise, do nothing
1348 *
1349 * @throws IOException if one is received from upstream
1350 */
1351 public void syncVariables(final boolean force)
1352 throws IOException
1353 {
1354 // Do not propagate across tunnel unless "force"d.
1355 if(!force)
1356 { return; }
1357
1358 // Propagate the "force"d sync upstream,
1359 // and return when done...
1360 doRPC(new RawPacket(RawPacket.OpCode.SyncVariables, EMPTY_PAYLOAD));
1361 }
1362
1363
1364 /**Get requested Properties selected by key and versionID.
1365 * Fetches a Properties set unconditionally (versionID == -1)
1366 * else if the versionID presented is not current.
1367 *
1368 * @param key selector (with possible embedded sub-key)
1369 * for desired properties set; never null
1370 * @param versionID if -1 then map is always returned if available,
1371 * else must be non-negative and null is returned if the versionID
1372 * presented matches that of the current version
1373 * (ie if the caller has presumably got the up-to-date version);
1374 * may be a timestamp or a hash or other value,
1375 * and by convention is zero only for an empty properties set
1376 *
1377 * @return null, or Properties map guaranteed to contain only
1378 * String keys and values
1379 */
1380 public java.util.Properties getProperties(final PropsKey key,
1381 final long versionID)
1382 throws IOException
1383 {
1384 throw new IOException("NOT IMPLEMENTED");
1385 }
1386
1387 /**Returns the (incremented) upstream stratum adjusted to include transit time; never null.
1388 * Returns Stratum.UNKNOWN if the upstream stratum is unknown
1389 * or the upstream instance is already at the maximum stratum.
1390 */
1391 public Stratum getStratum()
1392 throws IOException
1393 {
1394 final long before = System.currentTimeMillis();
1395 final RawPacket response = doRPCUnguarded(
1396 new RawPacket(RawPacket.OpCode.GetStratum, EMPTY_PAYLOAD));
1397 final long after = System.currentTimeMillis();
1398
1399 // Get raw result from upstream host.
1400 final Stratum upstream = (Stratum) response.getSerializedObjectPayload();
1401 assert(null != upstream);
1402
1403 // If 'unknown' upstream stratum then return built-in 'unknown' value.
1404 if(upstream.isUnknownStratum()) { return(Stratum.UNKNOWN); }
1405
1406 // If upstream is already at maximum stratum then this instance must be regarded as unknown stratum.
1407 if(upstream.getStratum() >= Stratum.MAX_STRATUM) { return(Stratum.UNKNOWN); }
1408
1409 // Sanitised round-trip time.
1410 final int rtt = (int) Math.min(2*(int)Stratum.MAX_ROOT_DELAY, Math.max(0, after - before));
1411
1412 // Create a new value for this instance with an incremented stratum
1413 // and one half the RTT to our immediate upstream peer added to the root delay.
1414 final Stratum processed = new Stratum(1 + upstream.getStratum(),
1415 (rtt>>1) + upstream.getRootDelay(),
1416 _getStratumUpstreamName(),
1417 upstream.isUpstreamConserving());
1418
1419 return(processed);
1420 }
1421
1422 /**Return short unique name of upstream peer/server suitable for Stratum; never null but can be "" for 'unknown'.
1423 * Override this in implementations that know the upstream name.
1424 */
1425 protected String _getStratumUpstreamName()
1426 {
1427 return(""); // Unknown upstream peer/server name.
1428 }
1429
1430 /**Poll periodically.
1431 * We can use this to attempt to repair ailing connections, etc,
1432 * as well as poll for asynchronous messages being sent to us by the master.
1433 * <p>
1434 * However, our general policy is not to force traffic if we need not.
1435 * <p>
1436 * This can be overridden by a derived class,
1437 * though it is suggested that this be called with super.poll(gp) if so.
1438 */
1439 public void poll(final GenProps gp)
1440 {
1441 if(KEEP_SERVER_CONNECTION_ALIVE)
1442 {
1443 // If we have a broken connection to the master,
1444 // then a little while before other users can retry it,
1445 // we retry it to see if the master is reachable again.
1446 //
1447 // This enables us to repair a connection silently,
1448 // so that a normal caller is never blocked until
1449 // it is repaired (other calls are vetoed quickly).
1450 //
1451 // Our calls are still backed off if the master is really unwell.
1452 //
1453 // This does not try to force an initial connection,
1454 // just repair a failing one.
1455 final Long notUntil = doNotTryMasterUntil;
1456 if((notUntil != null) &&
1457 (notUntil.longValue() - _REPAIR_RETRY_LEAD_MS < System.currentTimeMillis()))
1458 {
1459 try { doNOOP(true); }
1460 catch(final Exception e)
1461 {
1462 // It is possible that our connection problem is caused
1463 // by a resource leak (eg of unclosed descriptors) elsewhere
1464 // so try and force come cleanup now.
1465 //
1466 // We rely on our normal pacing/backoff of retries
1467 // to avoid us hurting the system too much if we are wrong!
1468 if(FORCE_GC_AND_FINALISERS_AFTER_POLL_FAIL)
1469 {
1470 logger.log("[ExhibitDataTunnelSource: WARNING: failing to re-establish connection so attempting to free resources with gc(), etc...]");
1471 System.runFinalization();
1472 System.gc();
1473 System.runFinalization();
1474 }
1475 }
1476 }
1477 }
1478 }
1479
1480 /**May attempt to free up outbound connections and/or prevent new ones. */
1481 public void destroy()
1482 {
1483 // Prevent most new outgoing connections.
1484 doNotTryMasterUntil = new Long(Long.MAX_VALUE);
1485 }
1486
1487
1488 /**How soon ahead of other callers we try to silently repair a broken connection, ms; non-negative. */
1489 private final int _REPAIR_RETRY_LEAD_MS = 17001 + Rnd.fastRnd.nextInt(3007);
1490
1491
1492 /**Make an RPC call over the underlying medium with the given outgoing packet; never null.
1493 * This collects the response packet and will object
1494 * if it sees any IOException
1495 * or if the packets come back with the wrong op code.
1496 * <p>
1497 * This can be locked on the instance lock to serialise RPCs,
1498 * if the tunnel can only usefully handle one call at once.
1499 * <p>
1500 * Must be implemented by the deriving class
1501 * to suit its transmission medium.
1502 *
1503 * @param packetOut request packet; never null
1504 * @return response packet; never null
1505 * @throws java.io.IOException in case of I/O difficulties
1506 */
1507 protected abstract RawPacket doRPCRaw(final RawPacket packetOut)
1508 throws IOException;
1509
1510 /**Optimised RPC call with the given outgoing packet and returning packet body as an InputStream; never null nor with a null stream.
1511 * This is an <em>optimisation</em> for performance-critical cases <em>only</em>,
1512 * and foregoes some error checking/handling for speed
1513 * (and thus the caller should ensure that it performs integrity checks).
1514 * There is an onus on the streaming-related code to behave safely
1515 * even if fed bogus/corrupt data,
1516 * and it must be able to safely/easily undo any work done
1517 * if the message is found to be bogus as late as the final byte(s).
1518 * <p>
1519 * This streams the content of the response packet
1520 * and will object if it sees any IOException
1521 * or if the packets come back with the wrong op code.
1522 * <p>
1523 * A terminating trailer byte may or may not be visible on the returned stream
1524 * thus allowing the implementation to be as efficient as possible.
1525 * <p>
1526 * This may return after all the input data has been collected,
1527 * or while some or all is still to come,
1528 * and thus the returned stream may fail and throw an exception.
1529 * <p>
1530 * The first element of the result is the length of the response data
1531 * (not including any non-data trailer bytes from the packet even if present)
1532 * but may be null if this length is not known when the packet header is seen,
1533 * eg because the packet body was compressed.
1534 * <p>
1535 * This may be implemented/overridden by the deriving class
1536 * to suit its transmission medium,
1537 * and as an optimisation to reduce copying and allow streaming,
1538 * ie starting to process the input before it is all received.
1539 * <p>
1540 * The data stream is always of uncompressed data,
1541 * regardless of whether the data was sent compressed on the wire,
1542 * ie this routine will correctly decompress data on the fly as/when needed.
1543 * <p>
1544 * The caller <strong>must close</strong> the stream promptly
1545 * to release resources such as file handles and non-Java memory.
1546 *
1547 * @param packetOut request packet; never null
1548 * @param allowBigReadTimeout TODO
1549 * @return response length and data stream; never null
1550 * @throws java.io.IOException in case of I/O difficulties
1551 */
1552 protected Tuple.Pair<Integer, InputStream> doRPCRawWithStreamResponse(final RawPacket packetOut, final boolean allowBigReadTimeout)
1553 throws IOException
1554 {
1555 // Simple implementation in terms of basic doRPCRaw().
1556 final RawPacket result = doRPCRaw(packetOut);
1557
1558 // Check for mismatched response packets
1559 // or remote exceptions codes with special response op-codes.
1560 if(result.opCode != packetOut.opCode)
1561 {
1562 // Deal specially with some fixed remote exception types.
1563 switch(result.opCode)
1564 {
1565 case PGMNISEX:
1566 { throw new PGMasterNotInServiceException(); }
1567 case RUNTEX:
1568 { throw new RuntimeException("remote threw RuntimeException"); }
1569 case REMEX:
1570 { throw new IOException("remote threw RemoteException"); }
1571 case INTEX:
1572 { throw new InterruptedIOException("remote operation interrupted"); }
1573 }
1574 throw new IOException("packet response-type mismatch; got "+result.opCode+" expected "+packetOut.opCode.getCode());
1575 }
1576
1577 final Tuple.Pair<Integer, InputStream> resultStream =
1578 new Tuple.Pair<Integer, InputStream>(result.plLength, result.getPayloadAsInputStream());
1579 return(resultStream);
1580 }
1581
1582
1583 /**Make an RPC call over HTTP[S] with the given outgoing packet.
1584 * This provides error recovery, back-off, stats, etc,
1585 * and serves as a wrapper for the medium-specific doRPCRaw().
1586 * <p>
1587 * Called by all the public data-pipeline methods to transport information
1588 * over the tunnel.
1589 *
1590 * @param packetOut request packet; never null
1591 * @return response packet; never null
1592 * @throws java.io.IOException in case of I/O difficulties
1593 */
1594 protected RawPacket doRPC(final RawPacket packetOut)
1595 throws IOException
1596 {
1597 // Note RPC request...
1598 StatsLogger.captureDataPoint(statsIDTS, TSNAME_RPCREQUEST);
1599 // Note its type...
1600 StatsLogger.captureDataPoint(statsIDTS, TSNAMEPR_RPCTYPE + ((int) packetOut.opCode.getCode()));
1601
1602 // If last communication with master failed too recently,
1603 // quickly veto a new connection attempt for now.
1604 if(isBroken())
1605 {
1606 if(_protocolDebug) { System.err.println("[doRPC(): vetoing connection to broken master.]"); }
1607 throw new PGMasterNotInServiceException("no connection to master");
1608 }
1609
1610 return(doRPCUnguarded(packetOut));
1611 }
1612
1613 /**Just like doRPC() but does not back off in face of previous failures; never null.
1614 * Can be used for calls that must attempt to contact the master anyway,
1615 * or are probing for it to be alive.
1616 * <p>
1617 * Most callers should use the normal doRPC().
1618 *
1619 * @param packetOut never null
1620 * @return response to RPC call; never null
1621 */
1622 protected RawPacket doRPCUnguarded(final RawPacket packetOut)
1623 throws IOException
1624 {
1625 if(_protocolDebug) { System.err.println("[doRPC(): connecting to master.]"); }
1626
1627 try {
1628 if(_protocolDebug) { System.err.println(" [doRPC(): about to do raw RPC.]"); }
1629
1630 // Do the raw, medium-specific RPC.
1631 final RawPacket result = doRPCRaw(packetOut);
1632
1633 if(_protocolDebug) { System.err.println(" [doRPC(): about to check result type.]"); }
1634
1635 // Check for mismatched response packets
1636 // or remote exceptions codes with special response op-codes.
1637 if(result.opCode != packetOut.opCode)
1638 {
1639 // Deal specially with some fixed remote exception types.
1640 // Note that we don't count a OP__RUNTEX as a success OR failure
1641 // for the purposes of link monitoring;
1642 // this may be the master gently rebuffing a request for some reasons.
1643 switch(result.opCode)
1644 {
1645 case PGMNISEX:
1646 { throw new PGMasterNotInServiceException(); }
1647 case RUNTEX:
1648 { throw new RuntimeException("remote threw RuntimeException"); }
1649 case REMEX:
1650 { throw new IOException("remote threw RemoteException"); }
1651 case INTEX:
1652 { throw new InterruptedIOException("remote operation interrupted"); }
1653 }
1654 throw new IOException("packet response-type mismatch; got "+result.opCode+" expected "+packetOut.opCode);
1655 }
1656
1657 // Note last successful use.
1658 _noteResult(true);
1659
1660 if(_protocolDebug) { System.err.println(" [doRPC(): about to return result.]"); }
1661
1662 // Return result successfully.
1663 return(result);
1664 }
1665 catch(final InterruptedIOException e)
1666 {
1667 // Not an error (nor a success): can retry later.
1668 throw e;
1669 }
1670 catch(final IOException e)
1671 {
1672 // Regard (only) IOException as indicator or some sort of connection problem.
1673
1674 // Note RPC request failure with IOException...
1675 StatsLogger.captureDataPoint(statsIDTS, TSNAME_RPCIOEX);
1676
1677 // Note failed connection to the master...
1678 // This will defer our next attempt...
1679 _noteResult(false);
1680
1681 // Note failure in the logs:
1682 logger.log("ExhibitDataTunnelSource: RPC FAILED connection attempt upstream, failed packet: " + packetOut + ", reason: " + e.getMessage());
1683 final long lastS = lastSuccessfulConnectionTime;
1684 if(lastS > 0)
1685 { logger.log("[doRPC(): last success at "+(new Date(lastS))+".]"); }
1686 final Long notUntil2 = doNotTryMasterUntil;
1687 if(notUntil2 != null)
1688 { logger.log("[doRPC(): deferring next attempt until " + (new Date(notUntil2.longValue())) + ".]"); }
1689
1690 // Rethrow the error that we encountered...
1691 throw e;
1692 }
1693 }
1694
/**Minimum time (ms) for which we will hold latest newly-created AEP response; strictly positive.
 * A client that automatically retries after an interrupted IO response
 * to some sort of AEP request indicating 'already in progress'
 * should retry within this time to avoid starting its creation all over again.
 * <p>
 * Note that in dire straits this lower limit may be ignored.
 * <p>
 * Should typically be of the order of many minutes;
 * the current value is 17 minutes expressed in milliseconds.
 */
public static final int MIN_AEP_RETENTION_MS = 17 * 60 * 1000;
1705
1706 /**Cache/lock to improve performance of inbound RPC.
1707 * Meant to be opaque to all but handleInboundRPC()
1708 * and is used to provide a better overall responsiveness,
1709 * especially from expensive calls
1710 * by vetoing concurrent expensive calls
1711 * and by cacheing the responses from especially expensive calls.
1712 * <p>
1713 * Not all possibilities are exploited,
1714 * just those that in practice seem to be important.
1715 * <p>
1716 * Can be registered to free its own content automagically
1717 * if the system is very short of memory.
1718 */
1719 public static final class HIRPCCache implements MemoryTools.RecurrentEmergencyFreeHandle
1720 {
1721 /**Lock to prevent servicing more than one very expensive call at once.
1722 * We do this to reduce strain on the memory/GC and CPU of the server.
1723 */
1724 private final ReentrantLock slowResponseLock = new ReentrantLock();
1725
1726 /**Time of creation / last use of basic AEP response; initially zero.
1727 * This covers the GZIPped / extra-compressed / delta responses for the latest AEP,
1728 * but no previous AEP values which are not essential to minimal client progress.
1729 */
1730 private volatile long currentAEPResponseCreatedOrLastUsed;
1731
1732 /**Cache of current AEP as pair of longHash and response packet for simple AEP fetch RPC; mutable.
1733 * Initially null.
1734 * <p>
1735 * Elements of this pair are never null.
1736 * <p>
1737 * Marked volatile for safe multi-threaded access.
1738 * <p>
1739 * Currently held for at least for a minimum period,
1740 * until the AEP fetched has a new hash,
1741 * not for example held via a SoftReference
1742 * since a SoftReference would probably be cleared too soon
1743 * for this to be effective.
1744 * <p>
1745 * This is likely to be many MB in size.
1746 */
1747 private volatile Tuple.Pair<Long, RawPacket> _AEP_response;
1748
1749 /**Cache of full extra-compressed AEP as tuple of longHash, compression format, and response packet for diff AEP fetch RPC; mutable.
1750 * Initially null.
1751 * <p>
1752 * Elements of this tuple are never null.
1753 * <p>
1754 * Marked volatile for safe multi-threaded access.
1755 * <p>
1756 * Currently held for at least for a minimum period,
1757 * until the AEP fetched has a new hash,
1758 * not for example held via a SoftReference
1759 * since a SoftReference would probably be cleared too soon
1760 * for this to be effective.
1761 * <p>
1762 * This is likely to be many MB in size.
1763 */
1764 private volatile Tuple.Pair<Long, Tuple.Pair<CompressionLevel, RawPacket>> _AEP_extracomp_response;
1765
1766 /**Cache of extra-compressed AEP diff as tuple of longHash, compression format, and response packet for diff AEP fetch RPC; mutable.
1767 * Initially null.
1768 * <p>
1769 * Elements of this tuple are never null.
1770 * <p>
1771 * Marked volatile for safe multi-threaded access.
1772 * <p>
1773 * Currently held for at least for a minimum period,
1774 * until the AEP fetched has a new hash,
1775 * not for example held via a SoftReference
1776 * since a SoftReference would probably be cleared too soon
1777 * for this to be effective.
1778 * <p>
1779 * This is likely relatively small.
1780 */
1781 private volatile Tuple.Triple<AllExhibitPropertiesDelta, CompressionLevel, RawPacket> _AEP_diff_response;
1782
1783 /**Cache of previous AEP value to assist with AEP diffs; initially null.
1784 * Currently held indefinitely (or until very short of free memory),
1785 * until the AEP fetched has a new hash,
1786 * not for example held via a SoftReference
1787 * since a SoftReference would probably be cleared too soon
1788 * for this to be effective.
1789 * <p>
1790 * This should generally share most of its state with the current AEP
1791 * and therefore should not represent a significant extra memory burden.
1792 */
1793 private volatile AllExhibitProperties aepPrev;
1794
1795 /**Soft cache of older AEP values to assist with AEP diffs; never null.
1796 * This is present to help with requests for diffs against AEPs
1797 * older than the previous value, if any such requests are made.
1798 * <p>
1799 * Thread-safe cache of SoftReferences to older AEPs,
1800 * mapped from the Long hash values.
1801 * <p>
1802 * Map is not auto-clear()ed on memory stress
1803 * other than the individual SoftReferences themselves during GC.
1804 * <p>
1805 * It is possible to synchronise on this instance to exclude other activity.
1806 */
1807 private final MemoryTools.SoftReferenceMap<Long, AllExhibitProperties> prevAEPs =
1808 MemoryTools.SoftReferenceMap.<Long, AllExhibitProperties>create("prevAEPs");
1809
1810 /**Can be called to purge most or all internal state when very short of memory.
1811 * Implements the emergency-free handler.
1812 * <p>
1813 * May not purge some elements which are not (or likely not) using significant memory
1814 * but usually save a lot of time for us and for clients.
1815 * To that end we try and estimate the space they are taking
1816 * vs actual current free heap space.
1817 */
1818 public void run()
1819 {
1820 // Clear strongly-referenced previous AEP value if very short of free memory.
1821 // Clearing this will prevent us from creating and sending space-efficient AEP deltas.
1822 if((null != aepPrev) && (MemoryTools.percentFreeWithinTarget() <= 5))
1823 {
1824 aepPrev = null;
1825 System.err.println("HIRPCCache: emergency free of previous AEP value");
1826 }
1827
1828 // Clear the list of previous AEPs if very short of free memory.
1829 // Note that the softly-referenced content will be released automatically in dire straits.
1830 // We only forcefully clear this if no longer retaining the previous AEP strongly any more.
1831 if(!prevAEPs.isEmpty() && (null == aepPrev))
1832 {
1833 prevAEPs.clear();
1834 System.err.println("HIRPCCache: emergency free of old AEP entries: "+prevAEPs.size());
1835 }
1836
1837 // If any basic of the AEP responses is only very recently created or used
1838 // then prevent them from being freed to avoid client starvation.
1839 final long lastUsedBAR = currentAEPResponseCreatedOrLastUsed;
1840 if(lastUsedBAR + MIN_AEP_RETENTION_MS > System.currentTimeMillis())
1841 { return; }
1842
1843 final long freeMem = Runtime.getRuntime().freeMemory();
1844
1845 // This GZIP-compressed response is a bit bulky and shouldn't usually be needed
1846 // unless we are too short of memory to super-compress a response
1847 // and/or a client doesn't have enough memory to receive a super-compressed response.
1848 // We hold onto it grimly for a minimum amount of time
1849 // as being forced to continually regenerate it asynchronously causes starvation
1850 // since this is required for the minimum level of response to support clients.
1851 Tuple.Pair<Long, RawPacket> r = _AEP_response;
1852 if((null != r) && (null != r.second) &&
1853 (r.second.getFrameLength() >= (freeMem>>1)))
1854 {
1855 _AEP_response = null;
1856 System.err.println("HIRPCCache: emergency free of AEP response "+r.second+" @"+TextUtils.sizeAsText(freeMem, true)+" free");
1857 }
1858 r = null; // Help GC.
1859
1860 // This extra-compressed response is smaller and saves network bandwidth and is thus more valuable
1861 // and so we'd like to hang onto it if possible.
1862 Tuple.Pair<Long, Tuple.Pair<CompressionLevel, RawPacket>> ecr = _AEP_extracomp_response;
1863 if((null != ecr) && (null != ecr.second) && (null != ecr.second.second) &&
1864 (ecr.second.second.getFrameLength() >= (freeMem>>1)))
1865 {
1866 _AEP_extracomp_response = null;
1867 System.err.println("HIRPCCache: emergency free of super-compressed AEP response "+ecr.second.second+" @"+TextUtils.sizeAsText(freeMem, true)+" free");
1868 }
1869 ecr = null; // Help GC.
1870
1871 // This extra-compressed diff response is the smallest and most valuable and most likely to be needed
1872 // and so we'd like to hang onto it if at all possible.
1873 // Though holding onto the delta is potentially more problematic.
1874 Tuple.Triple<AllExhibitPropertiesDelta, CompressionLevel, RawPacket> dr = _AEP_diff_response;
1875 if((null != dr) && (null != dr.third) &&
1876 (dr.third.getFrameLength() >= (freeMem>>1)))
1877 {
1878 _AEP_diff_response = null;
1879 System.err.println("HIRPCCache: emergency free of super-compressed AEP diff response "+dr.third+" @"+TextUtils.sizeAsText(freeMem, true)+" free");
1880 }
1881 dr = null; // Help GC.
1882 }
1883 }
1884
1885 /**Handles input request packet from slave across a tunnel.
1886 * Generates the response packet or throws an exception.
1887 *
1888 * @param reqPacket the request packet; never null
1889 * @param clientAddr the tunnel client's address as seen by us;
1890 * may be null if not applicable or available
1891 * @param cache if not null, then selected routines with
1892 * slow/expensive responses will not be serviced concurrently
1893 * but concurrent calls will be vetoed quickly instead
1894 * and expensive-to-compute values may be (partially) cached
1895 * @param logger logging area for warnings, etc; never null.
1896 *
1897 * @return the response packet; never null
1898 *
1899 * @throws java.io.IOException in case of difficulty upstream
1900 * @throws PGMasterNotInServiceException if the upstream data source
1901 * is down/unavailable
1902 */
1903 public static RawPacket handleInboundRPC(final SimpleExhibitPipelineIF source,
1904 final RawPacket reqPacket,
1905 final String clientAddr,
1906 final HIRPCCache cache,
1907 final SimpleLoggerIF logger)
1908 throws IOException,
1909 PGMasterNotInServiceException
1910 {
1911 if((source == null) || (reqPacket == null))
1912 { throw new IllegalArgumentException(); }
1913
1914 switch(reqPacket.opCode)
1915 {
1916 // For NO-OP we expect an empty data section
1917 // (we simply ignore it even if not empty)
1918 // and reply with an empty data section.
1919 case NOOP:
1920 {
1921 // Create and send the (empty) response packet.
1922 // We don't attempt to compress this.
1923 return(new RawPacket(
1924 RawPacket.OpCode.NOOP,
1925 EMPTY_PAYLOAD,
1926 false)); // Don't attempt compression...
1927 }
1928
1929 case GetGenProps:
1930 { return(handleGetGenProps(source, reqPacket)); }
1931
1932 case GetGenSecProps:
1933 { return(handleGetGenSecProps(source, reqPacket)); }
1934
1935 case GetAllExhibitImmutableData:
1936 { return(handleGetAllExhibitImmutableData(source, reqPacket, cache, logger)); }
1937
1938 case GetAllExhibitProperties:
1939 { return(handleGetAllExhibitProperties(source, reqPacket, clientAddr, cache, logger)); }
1940
1941 case GetAllExhibitPropertiesDiff:
1942 { return(handleGetAllExhibitPropertiesDiff(source, reqPacket, clientAddr, cache, logger)); }
1943
1944 case GetThumbnails:
1945 { return(handleGetThumbnails(source, reqPacket)); }
1946
1947 case GetStaticAttr:
1948 { return(handleGetStaticAttr(source, reqPacket)); }
1949
1950 case GetRawFile:
1951 { return(handleGetRawFile(source, reqPacket)); }
1952
1953 case GetVariable:
1954 { return(handleGetVariable(source, reqPacket)); }
1955
1956 case SetVariables:
1957 { return(handleSetVariables(source, reqPacket, clientAddr, logger)); }
1958
1959 case GetVariables:
1960 { return(handleGetVariables(source, reqPacket)); }
1961
1962 case SyncVariables:
1963 { return(handleSyncVariables(source, reqPacket)); }
1964
1965 case GetEventValues:
1966 { return(handleGetEventValues(source, reqPacket)); }
1967
1968 case GetStratum:
1969 { return(handleGetStratum(source, reqPacket)); }
1970
1971 // For unrecognised requests,
1972 // pretend to throw a RuntimeException.
1973 default:
1974 {
1975 final String message = "unrecognised opcode " +reqPacket.opCode+ " in request from tunnel user " + clientAddr;
1976 logger.log(message);
1977 System.err.println(message);
1978 return(new RawPacket(
1979 RawPacket.OpCode.RUNTEX,
1980 EMPTY_PAYLOAD,
1981 false));
1982 }
1983 }
1984 }
1985
1986
1987 /**Handle an incoming GetGenSecProps request; never null. */
1988 private static RawPacket handleGetGenSecProps(
1989 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
1990 throws IOException
1991 {
1992 final long stamp = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
1993 readLong();
1994 final Properties gsp = source.getGenSecProps(stamp);
1995 // Create and send the response packet
1996 // (empty for null response,
1997 // else serialised form, compressed when helpful).
1998 return(new RawPacket(
1999 RawPacket.OpCode.GetGenSecProps,
2000 gsp,
2001 true)); // Attempt compression...
2002 }
2003
2004 /**Handle an incoming SyncVariables request; never null. */
2005 private static RawPacket handleSyncVariables(
2006 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2007 throws IOException
2008 {
2009 // No args, no return value.
2010 // The tunnel call is only made when "forced".
2011 source.syncVariables(true);
2012 return(new RawPacket(
2013 RawPacket.OpCode.SyncVariables,
2014 EMPTY_PAYLOAD,
2015 false)); // Compression is not possible.
2016 }
2017
2018 /**Handle an incoming GetAllExhibitPropertiesDiff request; never null.
2019 * Holds the 'slow response' lock while running
2020 * to exclude other such intensive/slow responses concurrently.
2021 */
2022 private static RawPacket handleGetAllExhibitPropertiesDiff(
2023 final SimpleExhibitPipelineIF source,
2024 final RawPacket reqPacket,
2025 final String clientAddr,
2026 final HIRPCCache cache,
2027 final SimpleLoggerIF logger)
2028 throws IOException, InterruptedIOException
2029 {
2030 // Prime a new empty cache with an extant non-empty AEP
2031 // so that we are likely to be able to service
2032 // the first responses after an AEP change with deltas.
2033 if(cache != null)
2034 {
2035 if((cache.aepPrev == null) || (cache.aepPrev.aeid.length == 0))
2036 {
2037 final AllExhibitProperties first = source.getAllExhibitProperties(-1);
2038 if((null != first) && (first.aeid.length != 0))
2039 {
2040 cache.aepPrev = first;
2041 cache.prevAEPs.put(first.longHash, first);
2042 }
2043 }
2044 }
2045
2046 final DataInputStream dis = new DataInputStream(reqPacket.getPayloadAsInputStream());
2047 final long oldHash = dis.readLong();
2048 // We read the compression level, but need not parse it yet.
2049 final String compression = dis.readUTF();
2050 AllExhibitProperties _aep =
2051 source.getAllExhibitProperties(oldHash);
2052 logger.log("INFO: ExhibitDataTunnelSource.handleInboundRPC(): GetAllExhibitPropertiesDiff: client="+clientAddr+" hash="+oldHash+" compression="+compression+" response null/hash="+((null==_aep)?"null":String.valueOf(_aep.longHash)));
2053
2054 // Correct for expensive aberrant behaviour further up the stack.
2055 // Whine loudly...
2056 if((null != _aep) && (oldHash == _aep.longHash))
2057 {
2058 final String message = "ERROR: ExhibitDataTunnelSource.handleInboundRPC(): non-null return with identical hash "+oldHash+ " from source "+source;
2059 logger.log(message);
2060 System.err.println(message);
2061 _aep = null; // Pretend that no AEP was in fact returned...
2062 }
2063
2064 // Simple case; nothing to return since the caller is up to date.
2065 if(_aep == null)
2066 {
2067 return(new RawPacket(
2068 RawPacket.OpCode.GetAllExhibitPropertiesDiff,
2069 ExhibitDataTunnelSource.EMPTY_PAYLOAD,
2070 false)); // Compression needed nor possible.
2071 }
2072 final AllExhibitProperties aepFromUpstream = _aep;
2073
2074 // Full implementation is too expensive sans cache.
2075 if(cache == null)
2076 { throw new IOException("operation not fully implemented due to resource constraints"); }
2077
2078 // Decode the client's maximum supported compression level.
2079 // This will throw an IllegalArgumentException if unparseable.
2080 final CompressionLevel cl = CompressionLevel.valueOf(compression);
2081
2082 // If we already have a suitable response cached
2083 // ie for the very same AEP diff with the same old/new hashes.
2084 // then return it immediately.
2085 // We assume therefore that the entire packet will
2086 // be the same for the same response data
2087 // regardless of client, etc.
2088 //
2089 // (This does mean that the caller may miss some
2090 // internally cached updates within the AEP instance, etc,
2091 // but nothing that it cannot recompute if needed.)
2092 //
2093 // Try first for a cached diff/delta.
2094 final Tuple.Triple<AllExhibitPropertiesDelta, CompressionLevel, RawPacket> cachedDiff = cache._AEP_diff_response;
2095 if((oldHash != -1) &&
2096 (cachedDiff != null) &&
2097 (aepFromUpstream.longHash == cachedDiff.first.longHashAEPAfter) &&
2098 (oldHash == cachedDiff.first.longHashAEPBefore))
2099 {
2100 // If result is compressed at level higher than caller can handle
2101 // then we abort.
2102 // (We could chose to try for the full AEP rather than abort now,
2103 // but it's unlikely to work if this didn't.)
2104 if(cachedDiff.second.getLevel() > cl.getLevel())
2105 { throw new IOException("compression level too high for client"); }
2106
2107 // Good, can use cached diff packet...
2108 logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): returning cached AEP (diff) delta response packet: "+cachedDiff.third+" to client "+clientAddr+".]");
2109 return(cachedDiff.third);
2110 }
2111
2112 // If the response is other than null
2113 // and we reject concurrent expensive calls,
2114 // then veto it if another such call is already in progress.
2115 // We allow a very limited wait for the lock.
2116 try
2117 {
2118 if(!cache.slowResponseLock.tryLock(CoreConsts.MAX_INTERACTIVE_DELAY_MS, TimeUnit.MILLISECONDS))
2119 {
2120 logger.log("INFO: ExhibitDataTunnelSource.handleInboundRPC(): GetAllExhibitPropertiesDiff: already in progress: aborting...");
2121 throw new InterruptedIOException("expensive call already in progress, please retry");
2122 }
2123 }
2124 catch(final InterruptedException e)
2125 {
2126 final InterruptedIOException err = new InterruptedIOException(e.getMessage());
2127 err.initCause(e);
2128 Thread.currentThread().interrupt(); // Don't lose "interrupt" status.
2129 throw err;
2130 }
2131
2132 // Create and send the response packet
2133 // (empty for null response,
2134 // else serialised form, full or delta/diff, compressed when helpful).
2135 try
2136 {
2137 // Note whatever was the previous AEP that we had sequestered,
2138 // partly to stop it being GCed until we finish handling this RPC.
2139 final AllExhibitProperties oldAepPrev;
2140 // If the AEP has changed then cache it for deltas, etc.
2141 if((null == (oldAepPrev = cache.aepPrev)) || (aepFromUpstream.longHash != oldAepPrev.longHash))
2142 {
2143 cache.aepPrev = aepFromUpstream;
2144 cache.prevAEPs.put(aepFromUpstream.longHash, aepFromUpstream);
2145 }
2146
2147 // See if we have the correct cached extra-compressed full AEP.
2148 RawPacket cachedFullAEPResponse = null; // Will null out as soon as known not useful to aid GC.
2149 final Tuple.Pair<Long, Tuple.Pair<CompressionLevel, RawPacket>> cachedFull; // Minimise scope to aid GC.
2150 if(((cachedFull = cache._AEP_extracomp_response) != null) && (aepFromUpstream.longHash == cachedFull.first.longValue()))
2151 { cachedFullAEPResponse = cachedFull.second.second; }
2152
2153 if(oldHash != -1)
2154 {
2155 // Caller has supplied their current hash...
2156 // Look for caller's AEP cached here for us to diff against.
2157 // (Zap any dead cache entries that we find in passing.)
2158 AllExhibitProperties callerAEP = null;
2159 final Long hashKey = Long.valueOf(oldHash);
2160 final AllExhibitProperties caep = cache.prevAEPs.get(hashKey);
2161 if(caep != null)
2162 {
2163 assert(caep.longHash == oldHash);
2164 // Good, we have the same AEP that the caller does!
2165 callerAEP = caep;
2166 }
2167
2168 /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): old AEP cache size:"+cache.prevAEPs.size()+", caep="+callerAEP+".]"); }
2169 // Do not hold lock on cache.prevAEPs
2170 // while trying to create and return diff/delta.
2171 if(callerAEP != null)
2172 {
2173 // Computing a diff (if possible).
2174 // This generally should not take an enormous amount of working memory.
2175 try
2176 {
2177 /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): creating AEP diff, caep="+callerAEP+".]"); }
2178 final long diffStartTime = System.currentTimeMillis();
2179 // We can do a diff/delta!
2180 final AllExhibitPropertiesDelta diff =
2181 AllExhibitPropertiesDelta.createDiff(callerAEP, aepFromUpstream, false);
2182 final long diffEndTime = System.currentTimeMillis();
2183
2184 // Prepare the output packet...
2185 final CompressionLevel levelUsed = MAX_AEP_DIFF_COMP_LEVEL;
2186 ByteArrayOutputStream boas = new ByteArrayOutputStream(1 << 10);
2187 final DataOutputStream dos = new DataOutputStream(boas);
2188 dos.writeUTF(levelUsed.name());
2189 final ObjectOutputStream oos = new ObjectOutputStream(GenUtils.wrapForCompression(dos, levelUsed));
2190 oos.writeObject(diff);
2191 oos.close();
2192 final long compEndTime = System.currentTimeMillis();
2193 RawPacket response = new RawPacket(
2194 RawPacket.OpCode.GetAllExhibitPropertiesDiff,
2195 boas.toByteArray(),
2196 false); // Already compressed.
2197 boas = null; // Help GC.
2198 /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computed AEP diff ("+diff+") response packet "+response+", caep="+callerAEP+".]"); }
2199
2200 // Abort if we have a full (non-delta) response
2201 // and the delta seems larger than the full response.
2202 if((null != cachedFullAEPResponse) &&
2203 (cachedFullAEPResponse.plLength <= response.plLength))
2204 { throw new IOException("AEP delta larger than full response"); }
2205
2206 // Check that correct client AEP is held.
2207 assert(callerAEP.longHash == oldHash);
2208 // Check that we can construct the correct new AEP.
2209 assert(aepFromUpstream.equals(AllExhibitPropertiesDelta.applyDiff(callerAEP, diff)));
2210
2211 // We intern() the (probably large) response
2212 // to try to avoid holding any duplicates
2213 // and to reduce old-generation heap churn.
2214 response = MemoryTools.intern(response);
2215 cache.currentAEPResponseCreatedOrLastUsed = System.currentTimeMillis(); // Keep cached response below alive...
2216 cache._AEP_diff_response = new Tuple.Triple<AllExhibitPropertiesDelta, CompressionLevel,RawPacket>(diff, levelUsed, response);
2217 /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computed and cached AEP (diff) delta response packet: "+response+"; times: delta/comp="+(diffEndTime-diffStartTime)+"ms/"+(compEndTime-diffEndTime)+"ms; full: "+cachedFullAEPResponse+"; last full extra compressed response: "+cachedFull+".]"); }
2218
2219 // If the result is compressed at a level higher than the caller can handle
2220 // then we must abort.
2221 // The client will have to retry with the non-diff AEP call.
2222 if(levelUsed.getLevel() > cl.getLevel())
2223 { throw new IOException("compression level too high for client"); }
2224
2225 // Return it...
2226 return(response);
2227 }
2228 catch(final DiffException e)
2229 {
2230 /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): could not compute AEP (diff) delta: "+e.getMessage()+".]"); }
2231 /* Fall through to send full AEP. */
2232 }
2233 catch(final Exception e)
2234 {
2235 logger.log("ERROR: unexpected exception creating AEP diff: " + e.getMessage());
2236 /* Fall through to send full AEP. */
2237 }
2238 }
2239 }
2240
2241 // CANNOT COMPUTE A DIFF...
2242 // So try to return/compute/cache a full response.
2243 if(cachedFullAEPResponse != null)
2244 {
2245 // If result is compressed at level higher than caller can handle
2246 // then we must abort.
2247 if(cachedFull.second.first.getLevel() > cl.getLevel())
2248 { throw new IOException("compression level too high for client"); }
2249
2250 // Good, can use cached packet...
2251 logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): returning cached AEP (diff) response packet: "+cachedFull.second+" to client "+clientAddr+".]");
2252 return(cachedFull.second.second);
2253 }
2254
2255 // Compute a new full AEP response packet...
2256 // Super-compressed full AEP; null if not (yet) available/computed.
2257 logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computing super-compressed AEP response packet: (longHash="+aepFromUpstream.longHash+") for client "+clientAddr+".]");
2258
2259 // Discard known-defunct old value possibly allowing GC.
2260 cache._AEP_extracomp_response = null;
2261 cachedFullAEPResponse = null; // Help GC.
2262
2263 final CompressionLevel levelUsed = MAX_AEP_DIFF_COMP_LEVEL;
2264 // If result would be compressed at level higher than caller can handle
2265 // then we must abort.
2266 if(levelUsed.getLevel() > cl.getLevel())
2267 { throw new IOException("compression level too high for client"); }
2268
2269 // Computing a full super-compressed AEP may be very memory intensive.
2270 // We do not assume that unlimited resources are available.
2271 // We estimate the working memory required as a fixed overhead
2272 // plus about twice the estimated uncompressed size of the serialised data
2273 // which allows for compression dictionaries plus the actual output size.
2274 final AtomicReference<RawPacket> responseAR = new AtomicReference<RawPacket>();
2275 if(!MemoryTools.runMemoryIntensiveOperation(new Runnable(){
2276 public void run()
2277 {
2278 // Prepare the output packet...
2279 final ByteArrayOutputStream boas = new ByteArrayOutputStream(1 << 10);
2280 final DataOutputStream dos = new DataOutputStream(boas);
2281 try
2282 {
2283 dos.writeUTF(levelUsed.name());
2284 final ObjectOutputStream oos = new ObjectOutputStream(GenUtils.wrapForCompression(dos, levelUsed));
2285 oos.writeObject(aepFromUpstream);
2286 oos.close();
2287 }
2288 catch(final IOException e) { throw new Error("unexpected IOException when building super-compressed AEP", e); }
2289 // We intern() the (probably very large) response
2290 // to try to avoid holding duplicates
2291 // and to reduce old-generation heap churn.
2292 responseAR.set(MemoryTools.intern(new RawPacket(
2293 RawPacket.OpCode.GetAllExhibitPropertiesDiff,
2294 boas.toByteArray(),
2295 false))); // Already compressed.
2296 logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computed super-compressed AEP response packet: (length="+boas.size()+", compression="+levelUsed+") for client "+clientAddr+".]");
2297 }
2298 },
2299 false, // Definitely not unlimited resources on the server!
2300 (1<<20) /* 1MB overhead */ + 2*aepFromUpstream.estimateSerialBytes()))
2301 { throw new IOException("not currently enough memory to attempt to build super-compressed AEP"); }
2302
2303 final RawPacket response = responseAR.get();
2304 if(null == response) { throw new IOException("failed to build super-compressed AEP"); }
2305 final Tuple.Pair<CompressionLevel, RawPacket> newFull = new Tuple.Pair<CompressionLevel,RawPacket>(levelUsed, response);
2306 cache.currentAEPResponseCreatedOrLastUsed = System.currentTimeMillis(); // Keep cached response below alive...
2307 cache._AEP_extracomp_response = new Tuple.Pair<Long, Tuple.Pair<CompressionLevel,RawPacket>>(new Long(aepFromUpstream.longHash), newFull);
2308
2309 /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computed and cached supecompressed AEP (length="+levelUsed+") response packet: "+newFull+".]"); }
2310
2311 // // If result is compressed at level higher than caller can handle
2312 // // then we must abort.
2313 // if(tooCompressedForClient)
2314 // { throw new IOException("compression level too high for client"); }
2315
2316 // Return it...
2317 return(response);
2318 }
2319
2320 finally { cache.slowResponseLock.unlock(); }
2321 }
2322
2323 /**Handle an incoming GetAllExhibitProperties request; never null.
2324 * Holds the 'slow response' lock while running
2325 * to exclude other such intensive/slow responses concurrently.
2326 */
2327 private static RawPacket handleGetAllExhibitProperties(
2328 final SimpleExhibitPipelineIF source,
2329 final RawPacket reqPacket,
2330 final String clientAddr,
2331 final HIRPCCache cache,
2332 final SimpleLoggerIF logger)
2333 throws IOException
2334 {
2335 final long hash = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2336 readLong();
2337 final AllExhibitProperties aep = source.getAllExhibitProperties(hash);
2338
2339 // Simple case; nothing to return since caller is up to date,
2340 // or we allow concurrent expensive calls.
2341 if(aep == null)
2342 {
2343 // Trivial null case.
2344 return(RawPacket.streamSerialiseObject(
2345 RawPacket.OpCode.GetAllExhibitProperties,
2346 null));
2347 }
2348 if(cache == null)
2349 {
2350 try
2351 {
2352 // Where the AEP is not null, force compressed format,
2353 // stream-serialised to minimise peak/intermediate memory footprint.
2354 return(MemoryTools.runMemoryIntensiveOperation((new Callable<RawPacket>(){
2355 public RawPacket call() throws IOException
2356 {
2357 return(RawPacket.streamSerialiseObject(
2358 RawPacket.OpCode.GetAllExhibitProperties,
2359 aep));
2360 }
2361 }),
2362 aep.estimateSerialBytes())); // Take a guess as to memory space required.
2363 }
2364 catch(final IOException e) { throw e; }
2365 catch(final RuntimeErrorException e) { throw e; }
2366 catch(final Exception e) { throw new Error("unexpected exception", e); }
2367 }
2368
2369
2370 // If we already have a suitable response cached
2371 // ie for the very same AEP logHash
2372 // then return it immediately.
2373 // We assume therefore that the entire packet will
2374 // be the same for the same response data
2375 // regardless of client, etc.
2376 //
2377 // (This does mean that the caller may miss some
2378 // internally cached updates within the AEP instance, etc,
2379 // but nothing that it cannot recompute if needed.)
2380 final Tuple.Pair<Long, RawPacket> cached; // Don't retain value preventing GC...
2381 if(((cached = cache._AEP_response) != null) && (aep.longHash == cached.first.longValue()))
2382 {
2383 // Good, can use cached packet...
2384 assert(cached.second != null);
2385 cache.currentAEPResponseCreatedOrLastUsed = System.currentTimeMillis(); // Keep cached response below alive...
2386 logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): returning cached AEP response packet: "+cached.second+" to client "+clientAddr+".]");
2387 return(cached.second);
2388 }
2389
2390 // If the response is other than null
2391 // and we reject concurrent expensive calls,
2392 // then veto it if another such call is already in progress.
2393 if(!cache.slowResponseLock.tryLock())
2394 {
2395 logger.log("INFO: ExhibitDataTunnelSource.handleInboundRPC(): GetAllExhibitProperties: already in progress: aborting...");
2396 throw new IOException("expensive call already in progress, please retry");
2397 }
2398 // Create and send the response packet
2399 // (empty for null response,
2400 // else serialised form, compressed when helpful).
2401 try
2402 {
2403 // Clear defunct value while computing the new one.
2404 cache._AEP_response = null;
2405
2406 if(aep != null) { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): creating/cacheing/returning packet-compressed AEP response packet (longHash="+aep.longHash+") to client "+clientAddr+".]"); }
2407 // Compute the response packet...
2408 // Where the AEP is not null, force compressed format,
2409 // stream-serialised to minimise peak/intermediate memory footprint.
2410 RawPacket response;
2411 try
2412 {
2413 // Where the AEP is not null, force compressed format,
2414 // stream-serialised to minimise peak/intermediate memory footprint.
2415 response = MemoryTools.runMemoryIntensiveOperation((new Callable<RawPacket>(){
2416 public RawPacket call() throws IOException
2417 {
2418 return(RawPacket.streamSerialiseObject(
2419 RawPacket.OpCode.GetAllExhibitProperties,
2420 aep));
2421 }
2422 }),
2423 aep.estimateSerialBytes()); // Take a guess as to memory space required.
2424 }
2425 catch(final IOException e) { throw e; }
2426 catch(final RuntimeErrorException e) { throw e; }
2427 catch(final Exception e) { throw new Error("unexpected exception", e); }
2428
2429
2430 // Cache it if the AEP is not null.
2431 if(aep != null)
2432 {
2433 // We intern() the (probably very large) response
2434 // to try to avoid holding duplicates
2435 // and to reduce old-generation heap churn.
2436 response = MemoryTools.intern(response);
2437 cache.currentAEPResponseCreatedOrLastUsed = System.currentTimeMillis(); // Keep cached response below alive...
2438 cache._AEP_response = new Tuple.Pair<Long,RawPacket>(new Long(aep.longHash), response);
2439 logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): created/cached/returning packet-compressed AEP response packet (longHash="+aep.longHash+", plLength="+response.plLength+") to client "+clientAddr+".]");
2440 }
2441 // Return it...
2442 return(response);
2443 }
2444 finally { cache.slowResponseLock.unlock(); }
2445 }
2446
2447 /**Handle an incoming GetGenProps request; never null. */
2448 private static RawPacket handleGetGenProps(
2449 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2450 throws IOException
2451 {
2452 final long stamp = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2453 readLong();
2454 final GenProps gp = source.getGenProps(stamp);
2455 // Create and send the response packet
2456 // (empty for null response,
2457 // else serialised form, compressed when helpful).
2458 return(new RawPacket(
2459 RawPacket.OpCode.GetGenProps,
2460 gp,
2461 true)); // Attempt compression...
2462 }
2463
2464 /**Handle an incoming GetStaticAttr request; never null. */
2465 private static RawPacket handleGetStaticAttr(
2466 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2467 throws IOException
2468 {
2469 final String name = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2470 readUTF();
2471 final ExhibitStaticAttr esa;
2472 try { esa = source.getAllExhibitImmutableData(-1).getStaticAttr(name); }
2473 catch(final IllegalArgumentException e)
2474 { throw new IOException("invalid argument: bad exhibit name"); }
2475 if(esa == null)
2476 { throw new IOException("invalid argument: non-existent exhibit"); }
2477 // Create and send the response packet
2478 // (empty for null response,
2479 // else serialised form, compressed when helpful).
2480 return(new RawPacket(
2481 RawPacket.OpCode.GetStaticAttr,
2482 esa,
2483 true)); // Allow compression.
2484 }
2485
2486 /**Handle an incoming GetEventValues request; never null.
2487 * Request is:
2488 * <ul>
2489 * <li>A byte consisting of the ordinal of period/interval enum val.
2490 * <li>The UTF-8 representation of the name of the definition.
2491 * <li>The interval number (8 bytes).
2492 * <li>The serialised form of the request BitSet.
2493 * </ul>
2494 * <p>
2495 * Result is:
2496 * <ul>
2497 * <li>The in-order array of EventVariableValues (never null).
2498 * </ul>
2499 */
2500 private static RawPacket handleGetEventValues(
2501 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2502 throws IOException
2503 {
2504 final DataInputStream dis = new DataInputStream(reqPacket.getPayloadAsInputStream());
2505 final int ordinal = dis.readUnsignedByte();
2506 final EventPeriod epv[] = EventPeriod.values();
2507 if(ordinal >= epv.length)
2508 { throw new IOException("corrupt arguments: bad period ordinal"); }
2509 final EventPeriod intervalSelector = epv[ordinal];
2510 final String name = dis.readUTF();
2511 final SimpleVariableDefinition def = SystemVariables.nameToDef.get(name);
2512 // Deal kindly with possibly-new definition
2513 // (ie if there are newer clients than us)
2514 // by just returning an empty result.
2515 if(def == null)
2516 {
2517 return(new RawPacket(
2518 RawPacket.OpCode.GetEventValues,
2519 NO_EVENT_VALUES,
2520 true)); // Compression may be effective.
2521 }
2522 if(!def.isEvent() || def.isLocal())
2523 { throw new IOException("corrupt arguments: bad definition type: "+def); }
2524 final long intervalNumber = dis.readLong();
2525 if(intervalNumber < 0)
2526 { throw new IOException("corrupt arguments: bad intervalNumber"); }
2527
2528 // If there is trailing data then it is the serialised BitSet
2529 // else this is a null BitSet equivalent to just bit-0 true.
2530 final BitSet whichValues;
2531 if(dis.available() != 0)
2532 {
2533 final ObjectInputStream ois = new ObjectInputStream(dis);
2534 final Object o; // = null;
2535 try { o = ois.readObject(); }
2536 catch(final ClassNotFoundException e)
2537 { throw new IOException("corrupt arguments: class not found: " + e.getMessage()); }
2538 if((o != null) && !(o instanceof BitSet)) // Allow null.
2539 { throw new IOException("corrupt arguments: bad whichValues class on decode"); }
2540 whichValues = (BitSet) o;
2541 }
2542 else
2543 { whichValues = null; }
2544 assert(dis.available() == 0) : "There must be no excess/trailing data in getEventValues() RPC";
2545
2546 // Call upstream to actually get any values that we have...
2547 final EventVariableValue result[] =
2548 source.getEventValues(def, intervalSelector, intervalNumber, whichValues);
2549
2550 // Return the serialised result array.
2551 return(new RawPacket(
2552 RawPacket.OpCode.GetEventValues,
2553 result,
2554 true)); // Compression may be very effective.
2555 }
2556
2557 /**Handle an incoming GetVariables request; never null. */
2558 private static RawPacket handleGetVariables(
2559 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2560 throws IOException
2561 {
2562 final long changedSince = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2563 readLong();
2564 // Get values.
2565 final List<SimpleVariableValue> l = new ArrayList<SimpleVariableValue>(Arrays.asList(
2566 source.getVariables(changedSince)));
2567 // Return only non-local variables.
2568 for(final Iterator<SimpleVariableValue> it = l.iterator(); it.hasNext(); )
2569 {
2570 final SimpleVariableValue svv =
2571 it.next();
2572 if(svv.getDef().isLocal())
2573 { it.remove(); }
2574 }
2575 // Sort by variable name for better compression on the wire.
2576 Collections.sort(l, SimpleVariableValue.compByDef);
2577 // Convert to array.
2578 final SimpleVariableValue svvs[] =
2579 new SimpleVariableValue[l.size()];
2580 l.toArray(svvs);
2581 return(new RawPacket(
2582 RawPacket.OpCode.GetVariables,
2583 svvs,
2584 true)); // Compression may be very effective.
2585 }
2586
/**Handle an incoming SetVariables request; never null.
 * The request payload is a serialised array of SimpleVariableValue.
 * <p>
 * NOTE: for security/sanity reasons,
 * reject any attempt to set where the value does not
 * have a credible globalMap of exactly size == 1.
 * <p>
 * Obviously bogus entries (null, local definition, missing globalMap,
 * or globalMap not of size 1) cause the whole request to be rejected
 * with an IllegalArgumentException (not an IOException).
 * <p>
 * NOTE(review): entries that are merely 'ignored' in the validation loop
 * (logged then skipped with continue) are NOT removed from the array;
 * the complete array is still passed to source.setVariables() afterwards,
 * so any actual filtering must happen upstream -- confirm that this is intended.
 *
 * @param source  upstream data source to apply the values to
 * @param reqPacket  request packet carrying the serialised array of values
 * @param clientAddr  client address, recorded upstream against the client's InstanceID
 * @param logger  destination for log messages
 * @return response packet carrying the count returned by source.setVariables(); never null
 * @throws IOException  if the request cannot be decoded or the packet cannot be built
 */
private static RawPacket handleSetVariables(
    final SimpleExhibitPipelineIF source, final RawPacket reqPacket,
    final String clientAddr, final SimpleLoggerIF logger)
    throws IOException
    {
    // Decode array of variable values to apply.
    final SimpleVariableValue svvs[] = (SimpleVariableValue[])
        reqPacket.getSerializedObjectPayload();

    // Get our local ID so that we can veto changes
    // claiming to be from us for sanity/security.
    // If we get a null (no definition currently)
    // this will not cause an exception to be thrown,
    // we just won't be able to screen.
    final SimpleVariableValue ourID =
        source.getVariable(SystemVariables.LOCAL_SYS_ID);
    final InstanceID iid = (ourID == null) ? null:
        ((InstanceID) ourID.getValue());

    // Validate the requests.
    // Log and skip further checks for:
    //   * anything with a definition that we don't have
    //   * anything claiming to be from our system ID
    //   * anything provoking IllegalArgumentException or UnsupportedOperationException
    // Reject the entire request (IllegalArgumentException) for:
    //   * null entries
    //   * locals
    //   * anything without a globalMap of size 1
    //
    // We tolerate entries with no local definition
    // for robustness if talking to a slightly different age client,
    // though we still reject obviously bogus values.
    //
    // Intended to capture the client system ID in passing
    // (each variable value should have a globalMap with
    // exactly one entry, which is the client's ID).
    // NOTE(review): clientID is final and null and is never assigned,
    // so the 'one-off' branch below runs for every entry
    // and the 'multiple InstanceIDs' check in the else branch is unreachable.
    final InstanceID clientID = null;
    // Walk the array from the last entry back to the first.
    for(int i = svvs.length; --i >= 0; )
        {
        final SimpleVariableValue svv = svvs[i];

        // Reject obviously bogus values.
        if((svv == null) ||
           svv.getDef().isLocal() ||
           (svv.getGlobalMap() == null) ||
           (svv.getGlobalMap().size() != 1))
            { throw new IllegalArgumentException(); }

        try {
            // Quietly ignore variables that we don't (currently)
            // have definitions for locally.
            if(!SystemVariables.defs.contains(svv.getDef()))
                {
                logger.log("Ignoring tunnelled setVariable("+svv.getDef()+") with no local definition");
                continue;
                }

            // Get the system that *is* in the globalMap...
            final InstanceID putativeClientID =
                svv.getGlobalMap().keySet().iterator().next();
            // As written, always true (see note on clientID above).
            if(clientID == null)
                {
                // Intended as one-off processing for the first variable...

                // Discard obviously-forged / looped values,
                // ie those claiming to come from our own ID.
                if((iid != null) &&
                   iid.equals(putativeClientID))
                    {
                    logger.log("Ignoring tunnelled setVariable("+svv.getDef()+") with bad/looped IID: "+iid);
                    continue;
                    }

                // Note the client's remote address
                // (ie make an entry in an upstream global Map)...
                final SimpleVariableValue newValue = new SimpleVariableValue(
                    SystemVariables.TunnelServlet_SLAVE_ADDRS,
                    clientAddr);
                source.setVariable(newValue.put(putativeClientID,
                                                newValue,
                                                true));
                }
            else
                {
                // Unreachable while clientID is never assigned.
                if(!clientID.equals(putativeClientID))
                    {
                    logger.log("Stopping before accepting tunnelled setVariable("+svv.getDef()+"): client claiming multiple InstanceIDs!");
                    break; // Give up now...
                    }
                }
            }
        catch(final IllegalArgumentException e)
            {
            logger.log("Ignoring tunnelled setVariable("+svv.getDef()+") that got IllegalArgumentException: " + e.getMessage());
            continue;
            }
        catch(final UnsupportedOperationException e)
            {
            logger.log("Ignoring tunnelled setVariable("+svv.getDef()+") that got UnsupportedOperationException: " + e.getMessage());
            continue;
            }
        }

    // Apply the values, in the order received...
    // (the whole array as received, including entries logged as ignored above).
    final int nSet = source.setVariables(svvs);

    // Create and send the response packet.
    return(new RawPacket(
        RawPacket.OpCode.SetVariables,
        intSer(nSet),
        false)); // Don't try compression; none to be had...
    }
2710
2711 /**Handle an incoming GetVariable request; never null. */
2712 private static RawPacket handleGetVariable(
2713 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2714 throws IOException
2715 {
2716 final String name = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2717 readUTF();
2718 // Look up variable definition from name.
2719 final SimpleVariableDefinition def =
2720 SystemVariables.nameToDef.get(name);
2721 // If no such variable,
2722 // or it is a local,
2723 // then immediately return an empty result.
2724 if((def == null) || def.isLocal())
2725 {
2726 return(new RawPacket(
2727 RawPacket.OpCode.GetVariable,
2728 EMPTY_PAYLOAD,
2729 false)); // No compression...
2730 }
2731 // Get the variable value (if any)...
2732 // Note that this may include a big globalMap.
2733 final SimpleVariableValue svv =
2734 source.getVariable(def);
2735 // Create and send the response packet
2736 // (empty for null response,
2737 // else serialised form, compressed when helpful).
2738 return(new RawPacket(
2739 RawPacket.OpCode.GetVariable,
2740 svv,
2741 true)); // Allow compression.
2742 }
2743
/**If true then never pass through a request to cache content locally.
 * Content may be cached anyway,
 * but avoiding passing through a cache request here may help break request 'loops'
 * and/or conserve master cache space for example.
 * <p>
 * When true, the GetRawFile handler always passes dontCache == true upstream,
 * regardless of the flag byte in the request.
 */
private static final boolean BLOCK_RAW_FILE_CACHEING = true;
2750
2751 /**Handle an incoming GetRawFile request; never null.
2752 * We satisfy the initial portion of over-length requests.
2753 * <p>
2754 * FIXME: decide whether to honour a false dontCache flag or not
2755 */
2756 private static RawPacket handleGetRawFile(
2757 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2758 throws IOException
2759 {
2760 // Validate for reasonable-size request packet.
2761 if(reqPacket.plLength < 11 + ExhibitName.MIN_NAME_LENGTH)
2762 { throw new IOException("invalid argument: bad exhibit name"); }
2763
2764 // Decode the request directly from the payload (without a copy operation if possible).
2765 final byte[] payload = reqPacket.getPayload();
2766
2767 // First the exhibit name as-if a UTF String value (but we know it to be all 7-bit).
2768 final int nameLen = (((payload[0] & 0xff) << 8) + (payload[1] & 0xff));
2769 if((nameLen < ExhibitName.MIN_NAME_LENGTH) || (nameLen > ExhibitName.MAX_NAME_LENGTH))
2770 { throw new IOException("invalid argument: bad exhibit name length"); }
2771 final CharSequence name = new WrappedByteArrayCharSequence(payload, 2, nameLen);
2772 // Look up (validate) name
2773 // and efficiently convert to (or get) Name.ExhibitFull
2774 // and get size bound
2775 // all in one step...
2776 final ExhibitStaticAttr esa;
2777 try { esa = source.getAllExhibitImmutableData(-1).getStaticAttr(name); }
2778 catch(final IllegalArgumentException e)
2779 { throw new IOException("invalid argument: bad exhibit name"); }
2780 if(esa == null)
2781 { throw new IOException("invalid argument: non-existent exhibit"); }
2782 final int start =
2783 ((payload[2 + nameLen] ) << 24) +
2784 ((payload[3 + nameLen] & 0xff) << 16) +
2785 ((payload[4 + nameLen] & 0xff) << 8) +
2786 ((payload[5 + nameLen] & 0xff) );
2787 final int afterEndRaw =
2788 ((payload[6 + nameLen] ) << 24) +
2789 ((payload[7 + nameLen] & 0xff) << 16) +
2790 ((payload[8 + nameLen] & 0xff) << 8) +
2791 ((payload[9 + nameLen] & 0xff) );
2792 // Coerce afterEnd into range, and allow zero-length requests.
2793 final int afterEnd = (int) Math.min(afterEndRaw, esa.length);
2794 final boolean dontCache = BLOCK_RAW_FILE_CACHEING || (payload[10 + nameLen] != 0);
2795
2796 // Disallow patently absurd values, eg due to request-packet corruption.
2797 if((start < 0) || (afterEnd < start))
2798 { throw new IOException("invalid argument: bad range"); }
2799 final int len = afterEnd - start;
2800 if(len > MAX_USER_READ_SIZE)
2801 { throw new IOException("invalid argument: request too large"); }
2802
2803 // Fetch the data from upstream...
2804 final byte rawData[] = new byte[len];
2805 final ByteBuffer buf = ByteBuffer.wrap(rawData);
2806 // Fetch whatever we quickly can.
2807 source.getRawFile(buf, esa.getExhibitFullName(), start, dontCache);
2808 // Flip the buffer to nominally switch to reading from it.
2809 buf.flip();
2810
2811 // Create and send the response packet containing raw exhibit data.
2812 //
2813 // Useful compression may be possible for some exhibit fragments/types,
2814 // but here we are slightly uncomfortable with
2815 // the potential vulnerability to invisible corruption of binary data
2816 // on the core (and non-re-compressible) exhibit type (JPEG)
2817 // with no application-level checksum on these frames,
2818 // so we simply avoid trying to compress *.jpg file data for now.
2819 return(new RawPacket(
2820 RawPacket.OpCode.GetRawFile,
2821 rawData, buf.limit(),
2822 !TextUtils.endsWith(esa.getCharSequence(), ".jpg")));
2823 }
2824
2825 /**Handle an incoming GetThumbnails request; never null. */
2826 private static RawPacket handleGetThumbnails(
2827 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2828 throws IOException
2829 {
2830 // Validate for reasonable-size request packet.
2831 if(reqPacket.plLength < 3 + ExhibitName.MIN_NAME_LENGTH)
2832 { throw new IOException("invalid argument: bad exhibit name"); }
2833 // Decode the request directly from the payload (without a copy operation if possible).
2834 final byte[] payload = reqPacket.getPayload();
2835 // First the exhibit name as-if a UTF String value (but we know it to be all 7-bit).
2836 final int nameLen = (((payload[0] & 0xff) << 8) + (payload[1] & 0xff));
2837 if((nameLen < ExhibitName.MIN_NAME_LENGTH) || (nameLen > ExhibitName.MAX_NAME_LENGTH))
2838 { throw new IOException("invalid argument: bad exhibit name length"); }
2839 final CharSequence name = new WrappedByteArrayCharSequence(payload, 2, nameLen);
2840 final boolean create = (payload[2 + nameLen] != 0);
2841
2842 // final DataInputStream dis = new DataInputStream(reqPacket.getPayloadAsInputStream());
2843 // final String name = dis.readUTF();
2844 // final boolean create = dis.readBoolean();
2845 // dis.close(); // Hopefully free some resources quickly.
2846
2847 final ExhibitStaticAttr esa;
2848 try { esa = source.getAllExhibitImmutableData(-1).getStaticAttr(name); }
2849 catch(final IllegalArgumentException e)
2850 { throw new IOException("invalid argument: bad exhibit name"); }
2851 if(esa == null)
2852 { throw new IOException("invalid argument: non-existent exhibit"); }
2853
2854 final ExhibitThumbnails eth =
2855 source.getThumbnails(esa.getExhibitFullName(), create);
2856
2857 // Create and send the response packet
2858 // (empty for null response, else serialised form).
2859 // We allow compression because it may save a little bandwidth.
2860 return(new RawPacket(
2861 RawPacket.OpCode.GetThumbnails,
2862 eth,
2863 true)); // Allow compression to be attempted, though probably marginal at best.
2864 }
2865
2866 /**Handle an incoming GetAllExhibitImmutableData request; never null.
2867 * Holds the 'slow response' lock while running
2868 * to exclude other such intensive/slow responses concurrently.
2869 */
2870 private static RawPacket handleGetAllExhibitImmutableData(
2871 final SimpleExhibitPipelineIF source, final RawPacket reqPacket,
2872 final HIRPCCache cache, final SimpleLoggerIF logger)
2873 throws IOException
2874 {
2875 final long stamp = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2876 readLong();
2877 final AllExhibitImmutableData aeid =
2878 source.getAllExhibitImmutableData(stamp);
2879
2880 // Simple case; nothing to return since caller is up to date,
2881 // or we allow concurrent expensive calls.
2882 if((aeid == null) || (cache == null))
2883 {
2884 return(new RawPacket(
2885 RawPacket.OpCode.GetAllExhibitImmutableData,
2886 aeid,
2887 true)); // Allow compression.
2888 }
2889
2890 // If the response is other than null
2891 // and we reject concurrent expensive calls,
2892 // then veto it if another such call is already in progress.
2893 if(!cache.slowResponseLock.tryLock())
2894 {
2895 logger.log("INFO: ExhibitDataTunnelSource.handleInboundRPC(): GetAllExhibitImmutableData: already in progress: aborting...");
2896 throw new IOException("expensive call already in progress, please retry");
2897 }
2898 // Create and send the response packet
2899 // (empty for null response,
2900 // else serialised form, compressed when helpful).
2901 try
2902 {
2903 return(new RawPacket(
2904 RawPacket.OpCode.GetAllExhibitImmutableData,
2905 aeid,
2906 true)); // Allow compression.
2907 }
2908 finally { cache.slowResponseLock.unlock(); }
2909 }
2910
2911 /**Handle an incoming GetStratum request; never null. */
2912 private static RawPacket handleGetStratum(
2913 final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2914 throws IOException
2915 {
2916 final Stratum stRaw = source.getStratum();
2917
2918 // Just before returning the Stratum info,
2919 // generate an amended value to reflect our conservation status if necessary
2920 // so that it's logically right 'on the wire'.
2921 final Stratum st;
2922 if(GenUtils.mustConservePower() == stRaw.isUpstreamConserving())
2923 { st = stRaw; } // Will do as-is!
2924 else
2925 {
2926 // Flip conserving flag to correctly reflect this instance's state as the 'upstream' server.
2927 st = new Stratum(stRaw.getStratum(), stRaw.getRootDelay(), stRaw.getUpstreamName(),
2928 !stRaw.isUpstreamConserving());
2929 }
2930
2931 // Create and send the response packet.
2932 return(new RawPacket(
2933 RawPacket.OpCode.GetStratum,
2934 st,
2935 false)); // Don't attempt compression (probably won't work and could inflate RTT estimates).
2936 }
2937
2938
2939 /**Immutable raw packet to send in either direction over a byte stream connection.
2940 * This consists of an op-code byte,
2941 * a byte-array payload,
2942 * and a trailer byte (different to any op-code byte).
2943 * <p>
2944 * We informally attempt to keep a decent Hamming distance
2945 * between different op-codes (ie more than one bit at a time
2946 * should have to change to mutate one into another).
2947 * <p>
2948 * The byte array can be sent compressed (using the gzip
2949 * algorithm), and this compression is adaptive, ie if
2950 * compression does not save space then it is not used.
2951 * The assumption if compression is requested is that it
2952 * is worthwhile burning lots of CPU cycles to compress
2953 * the data as far as possible, and so we may attempt very
2954 * CPU-hungry settings. Compressed data is sent on the
2955 * wire with the length actually -compressedDataLength.
2956 * <p>
2957 * NOTE: this is only guaranteed immutable if the payload is not tampered with
2958 * after being passed to the constructor.
2959 */
2960 public static final class RawPacket implements MemoryTools.Internable
2961 {
2962 /**Operation code: NO-OP (no operation). */
2963 public static final byte OP_NOOP = 0;
2964
2965 /**Operation code: getGenProps(). */
2966 public static final byte OP_getGenProps = 3;
2967
2968 /**Operation code: getAllExhibitImmutableData(). */
2969 public static final byte OP_getAllExhibitImmutableData = 5;
2970
2971 /**Operation code: getStaticAttr(). */
2972 public static final byte OP_getStaticAttr = 9;
2973
2974 /**Operation code: getRawFile(). */
2975 public static final byte OP_getRawFile = 10;
2976
2977 /**Operation code: getGenSecProps(). */
2978 public static final byte OP_getGenSecProps = 13;
2979
2980 /**Operation code: getAllExhibitProperties(). */
2981 public static final byte OP_getAllExhibitProperties = 17;
2982 /**Operation code: getAllExhibitPropertiesDiff(). */
2983 public static final byte OP_getAllExhibitPropertiesDiff = 18;
2984
2985 /**Operation code: getThumbnails(). */
2986 public static final byte OP_getThumbnails = 23;
2987
2988 /**Operation code: getVariable(). */
2989 public static final byte OP_getVariable = 26;
2990
2991 /**Operation code: getVariables(). */
2992 public static final byte OP_getVariables = 29;
2993
2994 /**Operation code: setVariables(). */
2995 public static final byte OP_setVariables = 30;
2996
2997 /**Operation code: syncVariables(). */
2998 public static final byte OP_syncVariables = 33;
2999
3000 // /**Operation code: getEventValue(). */
3001 // public static final byte OP_getEventValue = 38;
3002
3003 /**Operation code: getEventValues(). */
3004 public static final byte OP_getEventValues = 41;
3005
3006 /**Operation code: getStratum(). */
3007 public static final byte OP_getStratum = 46;
3008
3009
3010 /**Reserved op-code for use in response packets to indicate that the operation was interrupted and can be retried (not an error). */
3011 public static final byte OP__INTEX = 119;
3012
3013 /**Reserved op-code for use in response packets to indicate a PGMasterNotInServiceException. */
3014 public static final byte OP__PGMNISEX = 121;
3015
3016 /**Reserved op-code for use in response packets to indicate a RemoteException. */
3017 public static final byte OP__REMEX = 125;
3018
3019 /**Reserved op-code for use in response packets to indicate a RuntimeException.
3020 * Receipt of one of these DOES NOT indicate a link failure
3021 * (and thus need to back off use of the link).
3022 */
3023 public static final byte OP__RUNTEX = 126;
3024
3025 public static enum OpCode
3026 {
3027 NOOP(OP_NOOP),
3028 GetGenProps(OP_getGenProps),
3029 GetAllExhibitImmutableData(OP_getAllExhibitImmutableData),
3030 GetStaticAttr(OP_getStaticAttr),
3031 GetRawFile(OP_getRawFile),
3032 GetGenSecProps(OP_getGenSecProps),
3033 GetAllExhibitProperties(OP_getAllExhibitProperties),
3034 GetAllExhibitPropertiesDiff(OP_getAllExhibitPropertiesDiff),
3035 GetThumbnails(OP_getThumbnails),
3036 GetVariable(OP_getVariable),
3037 GetVariables(OP_getVariables),
3038 SetVariables(OP_setVariables),
3039 SyncVariables(OP_syncVariables),
3040 GetEventValues(OP_getEventValues),
3041 GetStratum(OP_getStratum),
3042 INTEX(OP__INTEX),
3043 PGMNISEX(OP__PGMNISEX),
3044 REMEX(OP__REMEX),
3045 RUNTEX(OP__RUNTEX);
3046
3047 private OpCode(final byte code) { this.code = code; }
3048
3049 /**The byte code; normal values are small and positive.
3050 * We try to maintain a good Hamming distance between any two codes.
3051 */
3052 private final byte code;
3053
/**Get the byte code for this op-code value. */
3055 public final byte getCode() { return(code); }
3056
3057 /**Constant-time lookup from (unsigned) byte value to enum.
3058 * Created on first use.
3059 */
3060 private static final class LookupCache
3061 {
3062 private LookupCache() { /* No instances needed. */ }
3063 static final OpCode lookup[] = new OpCode[256];
3064 static
3065 {
3066 for(final OpCode oc : OpCode.values())
3067 {
3068 final int index = oc.getCode() & 0xff;
3069 assert(lookup[index] == null) : "all OpCode values must be unique";
3070 lookup[index] = oc;
3071 }
3072 }
3073 }
3074
3075 /**Look up from byte to enum value, else null if no matching enum value. */
3076 public static OpCode lookupCode(final byte code)
3077 {
3078 return(LookupCache.lookup[code & 0xff]); // Constant-time lookup.
3079 // // Above should be equivalent to this (slow) linear lookup...
3080 // for(final OpCode oc : OpCode.values())
3081 // { if(oc.code == code) { return(oc); } }
3082 // return(null); // None found.
3083 }
3084 }
3085
3086
3087 /**Trailer byte; different to all valid op-code byte values. */
3088 public static final byte TRAILER = (byte) 0xda;
/**Verify that TRAILER is indeed not a valid op-code. */
3090 static { assert(null == OpCode.lookupCode(TRAILER)); }
3091
3092
/**Minimum size of payload for which we will try heaviest compression modes; strictly positive.
 * This is for modes such as BZIP2 or LZMA with heavy memory, CPU and
 * start-up costs (ie much heavier than GZIP).
 * <p>
 * Below this we assume that the absolute bandwidth savings
 * are not worth the pain.
 * <p>
 * Probably bigger than a "bulk transfer" block too,
 * so that the heavy guns are wielded only for the biggest frames.
 */
public static final int MIN_PLLENGTH_FOR_HEAVY_COMP_ALGS = 64 * 1024;

/**Opcode (single byte); never null.
 * Set first in the raw frame on the wire.
 * <p>
 * Never numerically equal to TRAILER.
 */
public final OpCode opCode;

/**Frame overhead (header and trailer) in bytes; strictly positive.
 * This allows for 1 byte of op-code, 4 bytes of length and 1 byte of trailer.
 */
public static final int FRAME_OVERHEAD_BYTES = 6;

/**Maximum permitted total frame length including header/trailer; strictly positive power of two.
 * Set much larger than the user-level transfer-size limit
 * to allow for some control data overhead on top of the given data,
 * and for much larger non-user-controlled items such as the AEP.
 * <p>
 * This limit mainly exists to prevent DoS-style out-of-memory problems
 * arising from corrupt (huge) length values.
 */
public static final int MAX_FRAME_SIZE = 128*1024*1024;

/**Maximum permitted payload length; strictly positive power of two minus the frame overhead.
 * Set much larger than the user-level transfer-size limit
 * to allow for some control data overhead on top of the given data,
 * and for much larger non-user-controlled items such as the AEP.
 * <p>
 * This mainly exists to prevent DoS-style out-of-memory problems
 * arising from corrupt (huge) length values.
 */
public static final int MAX_PAYLOAD_SIZE = MAX_FRAME_SIZE - FRAME_OVERHEAD_BYTES;
/**Check that we really do allow room for the maximum user-data transfer and then some. */
static { assert(MAX_PAYLOAD_SIZE > 1024+SimpleExhibitPipelineIF.MAX_USER_READ_SIZE); }
3136
/**Length in bytes of the uncompressed payload; non-negative.
 * Note that the length field sent second in the frame by writePacket()
 * is the length of the data actually on the wire,
 * negated if that data is sent compressed.
 */
public final int plLength;

/**Payload data, non-null unless the data is stored compressed.
 * Private in order to keep the whole packet immutable.
 */
private final byte payload[];

/**Compressed payload data, null unless the data is stored compressed.
 * Private in order to keep the whole packet immutable.
 */
private final byte payloadCompressed[];

/**Trailer byte; not a valid op-code.
 * Has the same value as TRAILER.
 */
public static final byte trailer = TRAILER;
3152
3153 /**Depends on the whole packet being identical.
3154 * This depends on the header and data being identical,
3155 * including being compressed in the same way if at all.
3156 */
3157 @Override public boolean equals(final Object obj)
3158 {
3159 if(obj == this) { return(true); }
3160 if(!(obj instanceof RawPacket)) { return(false); }
3161 final RawPacket other = (RawPacket) obj;
3162
3163 if(opCode != other.opCode) { return(false); }
3164 if(plLength != other.plLength) { return(false); }
3165
3166 // Checking the data for equality may be very slow...
3167 if(!Arrays.equals(payloadCompressed, other.payloadCompressed)) { return(false); }
3168 if(!Arrays.equals(payload, other.payload)) { return(false); }
3169
3170 return(true); // Yes, identical.
3171 }
3172
3173 /**Checks for equivalent raw packets, ignoring internal representation details.
3174 * Mainly this means ignoring whether the data is held compressed or not,
3175 * as long as the values that the user can see are identical.
3176 */
3177 public boolean equivalentTo(final RawPacket other)
3178 {
3179 if(opCode != other.opCode) { return(false); }
3180 if(plLength != other.plLength) { return(false); }
3181
3182 // Check that the data appears identical to the user.
3183 return(Arrays.equals(getPayloadCopy(), other.getPayloadCopy()));
3184 }
3185
3186 /**Hash is based on just packet type and (uncompressed) packet length. */
3187 @Override public int hashCode()
3188 { return(((opCode.getCode() * 69069) + plLength)); }
3189
/**Build a raw packet whose payload is the entire data array, attempting compression.
 * The array is <em>not</em> copied (nor modified) unless it is compressed,
 * so the caller should not modify it afterwards.
 * <p>
 * This is the same as the three-argument form with attemptCompression true,
 * ie it assumes compression is worth trying if other conditions support it.
 *
 * @param op  the op-code of the RPC; one of the OP_XXX values
 * @param data  the payload bytes; never null
 */
public RawPacket(final OpCode op, final byte data[])
{
    this(op, data, true);
}
3202
/**Build a raw packet whose payload is the entire data array.
 * The array is <em>not</em> copied (nor modified) unless it is compressed,
 * so the caller should not modify it afterwards.
 *
 * @param op  the op-code of the RPC; one of the OP_XXX values
 * @param data  the payload bytes; never null
 * @param attemptCompression  if true, it is probably worth attempting
 *     to compress the payload data unless very short
 */
public RawPacket(final OpCode op,
                 final byte data[],
                 final boolean attemptCompression)
{
    this(op, data, data.length, attemptCompression);
}
3215
/**Build a raw packet whose payload is the leading portion of the data array.
 * The array is used directly, without copying, only when it is stored
 * uncompressed and the payload occupies the whole array;
 * in any case the array is never modified here
 * and the caller should not modify it afterwards.
 * <p>
 * When compression is requested and the payload is long enough,
 * the data is compressed at construction time and the compressed form
 * is held instead, but only if it is actually smaller.
 *
 * @param op  the op-code of the RPC; one of the OP_XXX values
 * @param data  the payload bytes; never null
 * @param initialPortion  the number of leading bytes of data[] that form
 *     the payload; never negative or larger than data.length or MAX_PAYLOAD_SIZE
 * @param attemptCompression  if true, it is probably worth attempting
 *     to compress the payload data unless very short
 *
 * @throws IllegalArgumentException  if any argument is null or out of range
 */
public RawPacket(final OpCode op, final byte data[], final int initialPortion,
                 final boolean attemptCompression)
{
    if((null == op) || (null == data))
        { throw new IllegalArgumentException(); }
    if((initialPortion < 0) ||
       (initialPortion > data.length) ||
       (initialPortion > MAX_PAYLOAD_SIZE))
        { throw new IllegalArgumentException(); }

    // Header values are fixed whatever form the payload is held in.
    opCode = op;
    plLength = initialPortion;

    // Only consider compression when asked to and when the payload is
    // big enough that deflate overhead (and packet overhead) is not
    // likely to make the attempt a waste of effort.
    final boolean worthTrying = attemptCompression &&
        (plLength >= FileTools.TYPICAL_DEFLATE_MIN_TEXT_SIZE_COMPRESSABLE);
    if(worthTrying)
    {
        final byte deflated[] = FileTools.compressDeflatableData(data, 0, initialPortion);
        // Keep the compressed form only if it really is smaller.
        if(deflated.length < plLength)
        {
            payload = null;
            payloadCompressed = deflated;
            if(ExhibitDataTunnelSource._protocolDebug) { System.err.println("[RawPacket: compressed payload from "+plLength+" to "+(payloadCompressed.length)+" bytes.]"); }
            return;
        }
    }

    // Hold the data uncompressed.
    payloadCompressed = null;
    if(plLength == data.length)
    {
        // Payload is the whole array: share it rather than copy.
        payload = data;
    }
    else
    {
        // Copying is inefficient and callers should avoid forcing it,
        // so complain (with a stack trace) in debug builds.
        if(IsDebug.isDebug) { System.err.println("[RawPacket: WARNING: copying uncompressed payload from oversize buffer.]"); Thread.dumpStack(); }
        final byte trimmed[] = new byte[plLength];
        System.arraycopy(data, 0, trimmed, 0, plLength);
        payload = trimmed;
    }
}
3287
3288 /**Construct a new raw packet with data provided.
3289 * This accepts whatever it is passed
3290 * (we check for some errors with run-time assertions),
3291 * and is intended only to be called from other constructors
3292 * or factory methods.
3293 */
3294 private RawPacket(final OpCode opCode,
3295 final int plLength,
3296 final byte[] payload,
3297 final byte[] payloadCompressed)
3298 {
3299 assert(opCode != null);
3300 assert(plLength >= 0); // Uncompressed payload always has non-negative length.
3301 assert((payload == null) != (payloadCompressed == null)); // Exactly one must be null.
3302 assert((payload == null) || (payload.length == plLength)); // Check length of uncompressed payload.
3303
3304 this.opCode = opCode;
3305 this.plLength = plLength;
3306 this.payload = payload;
3307 this.payloadCompressed = payloadCompressed;
3308 }
3309
/**Build a raw packet whose payload is the serialised form of an object.
 * The object is first serialised in full into memory and the result is then
 * handled as for a byte-array payload, so the uncompressed form is kept
 * if that is the more efficient on the wire.
 * <p>
 * Because the entire uncompressed form is in memory at peak,
 * this may be unsuitable for very large (and highly-compressable) objects.
 *
 * @param op  the op-code of the RPC; one of the OP_XXX values
 * @param obj  the Serializable object to send,
 *     or null to send empty payload
 * @param attemptCompression  if true, it is probably worth attempting
 *     to compress the payload data if not very short
 */
public RawPacket(final OpCode op,
                 final Serializable obj,
                 final boolean attemptCompression)
    throws IOException
{
    this(op, _serObjForPayload(obj), attemptCompression);
}
3329
3330 /**Construct a new raw packet with a stream-serialised object as the payload.
3331 * This will always compress the supplied object
3332 * (unless null, in which case an empty payload is used)
3333 * to try to avoid ever having the full unserialised form in memory.
3334 * <p>
3335 * This will veto with an IOException any attempt to write more than
3336 * MAX_PAYLOAD_SIZE pre-compression or pre-compression bytes.
3337 *
3338 * @param op the op-code of the RPC; one of the OP_XXX values
3339 * @param obj the Serializable object to send,
3340 * or null to send empty payload
3341 * @param attemptCompression if true, it is probably worth attempting
3342 * to compress the payload data if not very short
3343 */
3344 public static RawPacket streamSerialiseObject(final OpCode op, final Serializable obj)
3345 throws IOException
3346 {
3347 if(op == null) { throw new IllegalArgumentException(); }
3348
3349 // If the object to be serialised is null
3350 // then return a result using an empty payload.
3351 if(null == obj) { return(new RawPacket(op, 0, EMPTY_PAYLOAD, null)); }
3352
3353 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
3354 // We veto any attempt to construct a frame
3355 // whose compressed or uncompressed payload would exceed MAX_PAYLOAD_SIZE bytes.
3356 // We apply exactly the same header-less deflate as in other places we compress.
3357 final DefOutputStream dos = new DefOutputStream(new GenUtils.LengthLimitedOutputStream(baos, MAX_PAYLOAD_SIZE));
3358 final GenUtils.LengthLimitedOutputStream llos = new GenUtils.LengthLimitedOutputStream(dos, MAX_PAYLOAD_SIZE); // Must not throw an exception...
3359 ObjectOutputStream oos = null;
3360 try
3361 {
3362 oos = new ObjectOutputStream(llos);
3363 oos.writeObject(obj);
3364 oos.flush();
3365 }
3366 finally
3367 { dos.finish(); } // Finish compression and ensure that native resources are released.
3368 if(oos != null) { oos.close(); } // Hopefully free resources quickly.
3369
3370 // Construct packet with compressed data...
3371 return(new RawPacket(op, llos.getBytesWritten(), null, baos.toByteArray()));
3372 }
3373
3374 /**Serialise object for payload.
3375 * Returns a zero-length array if the object is null.
3376 */
3377 private static byte[] _serObjForPayload(final Serializable obj)
3378 throws IOException
3379 {
3380 if(obj == null)
3381 { return(ExhibitDataTunnelSource.EMPTY_PAYLOAD); }
3382
3383 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
3384 final ObjectOutputStream oos = new ObjectOutputStream(baos);
3385 oos.writeObject(obj);
3386 oos.flush();
3387 final byte[] responsePayload = baos.toByteArray();
3388 oos.close(); // Hopefully free resources quickly.
3389 return(responsePayload);
3390 }
3391
3392 /**Retrieves a single serialised Object from the payload.
3393 * If the payload is empty (zero-length), we return null.
3394 * <p>
3395 * For robustness, treats a class mismatch problem like a data problem,
3396 * ie this will re-throw any ClassNotFoundException as an IOException.
3397 */
3398 public Object getSerializedObjectPayload()
3399 throws IOException
3400 {
3401 if(plLength == 0) { return(null); }
3402
3403 final InputStream is = getPayloadAsInputStream();
3404 final ObjectInputStream ois = new ObjectInputStream(is);
3405 try { return(ois.readObject()); }
3406 catch(final ClassNotFoundException e) { throw new IOException(e); }
3407 finally { ois.close(); } // Release resources, especially native in decompressor.
3408 }
3409
3410 /**Computes the total number of bytes that would be written by writePacket(); strictly positive.
3411 */
3412 public int getFrameLength()
3413 {
3414 final boolean isCompressed = (payload == null);
3415 final byte[] dataOnTheWire =
3416 isCompressed ? payloadCompressed : payload;
3417 final int realPayloadLen = dataOnTheWire.length;
3418
3419 // The total length on the wire of the entire packet.
3420 // An excess 6 bytes allows for 1 opCode, 4 length, 1 trailer.
3421 return(6 + realPayloadLen);
3422 }
3423
3424 /**Send the packet down the given OutputStream with header and trailer.
3425 * We flush the data after writing it.
3426 * <p>
3427 * If the payload is small enough,
3428 * then the entire packet will be buffered
3429 * and sent in one write().
3430 * <p>
3431 * We do not close() the stream.
3432 */
3433 public void writePacket(final OutputStream os)
3434 throws IOException
3435 {
3436 if(ExhibitDataTunnelSource._protocolDebug) { System.err.println("[writePacket(): op="+opCode+" len="+plLength+".]"); }
3437
3438 final boolean isCompressed = (payload == null);
3439 final byte[] dataOnTheWire =
3440 isCompressed ? payloadCompressed : payload;
3441 final int realPayloadLen = dataOnTheWire.length;
3442
3443 // The total length on the wire of the entire packet.
3444 final int totalPacketLen = getFrameLength();
3445
3446 // Buffer the output (with a carefully-crafted buffer size)
3447 // so that we can write it all in one operating-system interaction
3448 // if small enough, else in efficient-ish disc/network-size chunks.
3449 // (Note that Tomcat 4.1.31 writes chunked data to the network
3450 // in blocks of 2kB on Solaris 8/9,
3451 // but also note that the write of a large payload will cause
3452 // the header data to be flushed through the buffer first,
3453 // so anything other than quite a small buffer may be a waste.)
3454 final DataOutputStream dos = new DataOutputStream(
3455 new BufferedOutputStream(os,
3456 (realPayloadLen <= CoreConsts.BULK_DATA_TRANSFER_SIZE) ?
3457 totalPacketLen : 512));
3458
3459 // Send the opcode.
3460 dos.writeByte(opCode.getCode());
3461 // Send the payload length (negative indicating compressed).
3462 dos.writeInt(isCompressed ? -realPayloadLen : realPayloadLen);
3463 // Send the payload.
3464 dos.write(dataOnTheWire, 0, realPayloadLen);
3465 // Write the trailer.
3466 dos.writeByte(trailer);
3467 // Flush data though any buffers and down the wire if possible.
3468 dos.flush();
3469 }
3470
3471 /**Reads a packet from the input stream, blocking until done.
3472 * Even if the data is received compressed it is stored in
3473 * the packet that this returns in uncompressed form, so the
3474 * presence or absence of compression on the wire is invisible
3475 * to the recipient.
3476 * This also makes (re)reading the packet data cheap.
3477 * <p>
3478 * Compressed data is signalled by a negative length on the wire,
3479 * where this is the negation of the compressed data size.
3480 * <p>
3481 * The ZLIB deflater with maximum compression and
3482 * no checksum is used for any (de)compression.
3483 * <p>
3484 * To prevent unpleasantness such a OutOfMemoryError from a corrupt length value,
3485 * we veto lengths greater than the maximum specified for this frame type.
3486 */
3487 public static RawPacket readPacket(final InputStream is)
3488 throws IOException
3489 {
3490 // While not wonderfully efficient,
3491 // to save a great deal of fiddly coding
3492 // the input stream is wrapped in a DataInputStream.
3493 final DataInputStream dis = new DataInputStream(is);
3494 // Start collecting the data...
3495 final byte opC = dis.readByte();
3496 final OpCode op = OpCode.lookupCode(opC);
3497 if(op == null) { throw new IOException("invalid opcode "+opC); }
3498 final int rawLen = dis.readInt();
3499 final boolean isCompressed = (rawLen < 0);
3500 final int len = ((isCompressed) ? -rawLen : rawLen);
3501 if(ExhibitDataTunnelSource._protocolDebug) { System.err.println("[readPacket(): op="+opC+" rawLen="+rawLen+"...]"); }
3502 if(len > MAX_PAYLOAD_SIZE) { throw new IOException("corrupt length"); }
3503 final byte data[] = new byte[len];
3504 dis.readFully(data);
3505 final byte tr = dis.readByte();
3506 if(tr != TRAILER)
3507 { throw new IOException("corrupt trailer"); }
3508 try {
3509 // Decompress the data if necessary...
3510 if(isCompressed)
3511 {
3512 // Uncompress from the input stream...
3513 final byte uncompressedData[] = FileTools.decompressDeflatedData(data);
3514
3515 if(ExhibitDataTunnelSource._protocolDebug) { System.err.println("[readPacket(): compressed payload len="+data.length+", uncompressed = "+uncompressedData.length+" bytes.]"); }
3516 return(new RawPacket(op, uncompressedData, false));
3517 }
3518 return(new RawPacket(op, data, false));
3519 }
3520 catch(final IllegalArgumentException e)
3521 { throw new IOException("corrupt packet: " + e.getMessage()); }
3522 }
3523
3524 /**Create a human-readable summary (not including the payload data). */
3525 @Override
3526 public String toString()
3527 {
3528 final StringBuilder sb = new StringBuilder(79);
3529 sb.append("RawPacket");
3530 sb.append(":opCode=").append(opCode);
3531 sb.append(":plLength=").append(plLength);
3532 if(payloadCompressed != null)
3533 { sb.append(":compressedLength=").append(payloadCompressed.length); }
3534 return(sb.toString());
3535 }
3536
3537 /**Get the payload data as an InputStream; never null.
3538 * If the internal data is stored compressed however,
3539 * then this will return the uncompressed version.
3540 * <p>
3541 * Each stream returned is independent.
3542 * <p>
3543 * This stream should be explicitly closed when finished with
3544 * to release underlying memory and other resources ASAP.
3545 *
3546 * @return the uncompressed payload data as a stream of length plLength
3547 */
3548 public InputStream getPayloadAsInputStream()
3549 {
3550 if(payload != null)
3551 {
3552 // Payload is uncompressed; wrap it as-is.
3553 assert(payload.length == plLength);
3554 return(new ByteArrayInputStream(payload));
3555 }
3556
3557 // Payload is held compressed so decompress it
3558 // (on the fly to reduce peak memory usage).
3559 try { return(new DefInputStream(new ByteArrayInputStream(payloadCompressed))); }
3560 catch(final IOException e) { throw new IllegalStateException("unable to decompress"); }
3561 }
3562
3563 /**Get a copy of the payload data; never null.
3564 * We return a copy to keep this object immutable.
3565 * <p>
3566 * If the internal data is stored compressed
3567 * then this will return the original uncompressed version.
3568 *
3569 * @return a copy of the payload data, of length plLength
3570 */
3571 public byte[] getPayloadCopy()
3572 {
3573 if(payload != null)
3574 {
3575 // Payload is uncompressed; clone it as-is.
3576 assert(payload.length == plLength);
3577 // Take copy to protect internal state.
3578 return(payload.clone());
3579 }
3580
3581 // Payload is held compressed, so decompress it.
3582 try
3583 {
3584 final byte uncompressedData[] = FileTools.decompressDeflatedData(payloadCompressed);
3585 assert(uncompressedData.length == plLength);
3586 // No need to copy (private) data returned from decompression.
3587 return(uncompressedData);
3588 }
3589 catch(final IOException e)
3590 {
3591 throw new IllegalStateException("corrupt compressed data");
3592 }
3593 }
3594
3595 /**Get direct access to the uncompressed payload; never null.
3596 * We avoid making a copy, for speed.
3597 * <em>The caller must not alter the byte array returned.</em>
3598 * <p>
3599 * If the internal data is stored compressed
3600 * then this will return an uncompressed copy.
3601 * <p>
3602 * This is package-visible and is only for trusted callers
3603 * that will not alter the content.
3604 *
3605 * @return a the payload data, of length plLength; never null,
3606 */
3607 byte[] getPayload()
3608 {
3609 if(payload != null)
3610 {
3611 // Payload is uncompressed; clone it as-is.
3612 assert(payload.length == plLength);
3613 // Trust caller not to mess around...
3614 return(payload);
3615 }
3616
3617 // Payload is held compressed, so decompress it.
3618 try
3619 {
3620 final byte uncompressedData[] = FileTools.decompressDeflatedData(payloadCompressed);
3621 assert(uncompressedData.length == plLength);
3622 // No need to copy (private) data returned from decompression.
3623 return(uncompressedData);
3624 }
3625 catch(final IOException e)
3626 {
3627 throw new IllegalStateException("corrupt compressed data");
3628 }
3629 }
3630
3631 /**Copy the payload data into the supplied (trusted) buffer.
3632 * We only make this package-visible to keep this object immutable.
3633 * <p>
3634 * If the internal data is stored compressed
3635 * then this will return the original uncompressed version.
3636 */
3637 void getPayloadCopy(final ByteBuffer buf)
3638 {
3639 if(payload != null)
3640 {
3641 // Payload is uncompressed; clone it as-is.
3642 assert(payload.length == plLength);
3643 // Copy uncompressed data directly.
3644 buf.put(payload);
3645 return;
3646 }
3647
3648 // Payload is held compressed, so decompress it.
3649 try
3650 {
3651 final byte uncompressedData[] = FileTools.decompressDeflatedData(payloadCompressed);
3652 assert(uncompressedData.length == plLength);
3653 // No need to copy (private) data returned from decompression.
3654 buf.put(uncompressedData);
3655 return;
3656 }
3657 catch(final IOException e)
3658 {
3659 throw new IllegalStateException("corrupt compressed data");
3660 }
3661 }
3662 }
3663
3664 /**The immutable adjunct for a RawPacket that includes the HMAC and other anti-attack data.
3665 * Note that this class is NOT directly serialisable,
3666 * with the data fields sent in some other way,
3667 * eg as HTTP header fields "out-of-band" from the actual HTTP message.
3668 */
3669 public static final class PacketProtector
3670 {
/**The timestamp for the RawPacket, input to each MAC; strictly positive. */
public final long timestamp;

/**The length of the entire frame/datastream being protected, input to each MAC; non-negative. */
public final int length;

/**The immutable in-order list of MAC authenticator segments for the stream and fields herein; never null nor empty nor containing nulls.
 * This sequence of HMAC values together makes up the message MAC.
 * It is designed not to be possible to re-arrange these segments,
 * as each depends in part on the value of its predecessor
 * as well as the data in its segment.
 * <p>
 * Note that the <em>last</em> MAC, which depends on all the data being protected,
 * can be used as a unique message MAC on its own.
 */
public final List<ROByteArray> mac;

/**Maximum size in (ASCII) characters of output of toCheckString(); strictly positive.
 * Chosen to allow inclusion of the output of toCheckString() in an HTTP header.
 */
public static final int MAX_CHECK_STRING_CHARS = 900;
3692
3693 /**Generate (HTTP-header) check-string; never null nor empty.
3694 * This is a pure-ASCII single-line control-code-free check String
3695 * that contains the input fields and MAC from this object
3696 * to serve as the "checksum" of a RawPacket.
3697 * <p>
3698 * This object is suitable (short enough, avoidance of meta-characters)
3699 * to be used directly in an HTTP header.
3700 * <p>
3701 * This value is suitable to be decoded by fromCheckString().
3702 * <p>
3703 * The format is a space-separated list of fields:
3704 * <ol>
3705 * <li>The Java-style UTC timestamp in milliseconds as an unsigned decimal.
3706 * <li>The length of the protected stream in bytes as an unsigned decimal.
3707 * <li>The MAC values for each segment, in order, encoded Base64.
3708 * </ol>
3709 */
3710 public String toCheckString()
3711 {
3712 // Do a worst-case check given the size of the (first) MAC as encoded.
3713 assert((""+Long.MAX_VALUE+' '+Integer.MAX_VALUE+' ').length() + (MAX_SEGMENTS*(1+TextUtils.encode8To6(mac.get(0).toByteArray()).length()))-1 < MAX_CHECK_STRING_CHARS) : "check string must always be short enough for HTTP header";
3714
3715 final StringBuilder sb = new StringBuilder(32 + (45*mac.size()));
3716 sb.append(timestamp).append(' ');
3717 sb.append(length).append(' ');
3718 for(final ROByteArray r : mac)
3719 { sb.append(TextUtils.encode8To6(r.toByteArray())).append(' '); }
3720 // Rip off trailing space...
3721 sb.setLength(sb.length()-1);
3722 assert(sb.length() <= MAX_CHECK_STRING_CHARS) : "check string must be short enough for HTTP header";
3723 return(sb.toString());
3724 }
3725
/**Field splitter regex pattern compiled once for efficiency; never null.
 * Matches the single space that toCheckString() puts between fields.
 */
private static final Pattern fieldSplitPattern = Pattern.compile(" ");
3728
3729 /**Parses a check-string as generated by toCheckString(); never null.
3730 * This is tolerant of some leading and trailing whitespace.
3731 *
3732 * @throws IllegalArgumentException if the input is unparsable
3733 */
3734 public static PacketProtector fromCheckString(final String check)
3735 {
3736 if((check == null) || (check.length() > 2*MAX_CHECK_STRING_CHARS))
3737 { throw new IllegalArgumentException("too big before trim"); }
3738 final String trimmed = check.trim();
3739 if(trimmed.length() > MAX_CHECK_STRING_CHARS)
3740 { throw new IllegalArgumentException("too big"); }
3741 final String fields[] = fieldSplitPattern.split(check, 0);
3742 if(fields.length < 3)
3743 { throw new IllegalArgumentException("not enough fields"); }
3744 final long timestamp = Long.parseLong(fields[0], 10);
3745 final int length = Integer.parseInt(fields[1], 10);
3746 if((length < 0) || (length > RawPacket.MAX_FRAME_SIZE))
3747 { throw new IllegalArgumentException("out-of-range length value"); }
3748 final ArrayList<ROByteArray> mac = new ArrayList<ROByteArray>(fields.length-2);
3749 for(int i = 2; i < fields.length; ++i)
3750 { mac.add(new ROByteArray(TextUtils.decode8To6(fields[i]))); }
3751 return(new PacketProtector(timestamp, length, mac));
3752 }
3753
/**Maximum number of segments protected stream may be broken into; strictly positive power of two.
 * This determines the maximum number of incremental portions
 * we can process while streaming a RawPacket.
 * <p>
 * This value is capped to limit the amount of MAC data that needs to be sent,
 * eg in an HTTP header of limited length.
 */
public static final int MAX_SEGMENTS = 16;

/**Minimum segment size (other than final segment); strictly positive power of two.
 * This is set to be somewhat larger than the largest typical frame
 * (ie the entire header, payload and trailer)
 * that we usually don't want to stream, eg bulk data transfer,
 * so that such frames/packets will get a single MAC for efficiency.
 * This is also set large enough to amortise the cost of each segment MAC.
 * <p>
 * This is small enough to represent a reasonable incremental amount of CPU
 * for streamed inputs.
 */
public static final int MIN_SEGMENT_SIZE = (1<<16);

/**Maximum segment size; strictly positive power of two.
 * This is determined from the maximum frame size
 * and the maximum number of segments,
 * so that MAX_SEGMENTS segments of this size exactly cover a maximum-size frame.
 */
public static final int MAX_SEGMENT_SIZE = RawPacket.MAX_FRAME_SIZE / MAX_SEGMENTS;
/**Check some invariants: the division above must be exact, and the size range must be non-trivial. */
static { assert(RawPacket.MAX_FRAME_SIZE == MAX_SEGMENT_SIZE * MAX_SEGMENTS); }
static { assert(MAX_SEGMENT_SIZE > MIN_SEGMENT_SIZE); }
3783
/**Create an adjunct to protect a RawPacket, stamped with the current time.
 * The timestamp used is System.currentTimeMillis() at the time of the call.
 * The (secret) Key shared between the servers must be supplied.
 *
 * @param raw  the RawPacket to protect; never null
 * @param key  the (secret) key for the HMAC; never null
 *
 * @throws InvalidKeyException  if the key is rejected when computing the MAC
 */
public PacketProtector(final RawPacket raw, final SecretKey key)
    throws InvalidKeyException
{
    this(raw, System.currentTimeMillis(), key);
}
3794
/**Create an adjunct to protect a RawPacket, stamped with the given time.
 * The protected length is the packet's full frame length
 * and the MAC is computed here over the packet with computeMAC().
 * The (secret) Key shared between the servers must be supplied.
 *
 * @param raw  the RawPacket to protect; never null
 * @param timestamp  the timestamp for creation/send of the RawPacket; strictly positive
 * @param key  the (secret) key for the HMAC; never null
 *
 * @throws InvalidKeyException  if the key is rejected when computing the MAC
 */
public PacketProtector(final RawPacket raw,
                       final long timestamp,
                       final SecretKey key)
    throws InvalidKeyException
{
    this(timestamp,
         raw.getFrameLength(),
         computeMAC(timestamp, raw, key));
}
3807
3808 /**Create an adjunct to protect a RawPacket.
3809 *
3810 * @param timestamp the timestamp for creation/send of the RawPacket; strictly positive
3811 * @param mac the HMAC for the RawPacket and other fields, not checked; never null
3812 */
3813 public PacketProtector(final long timestamp,
3814 final int length,
3815 final List<ROByteArray> mac)
3816 {
3817 this.timestamp = timestamp;
3818 this.length = length;
3819 // Take defensive immutable copy of MAC segment list.
3820 this.mac = Collections.unmodifiableList(new ArrayList<ROByteArray>(mac));
3821
3822 try { validateObject(); } // Validate this instance.
3823 catch(final InvalidObjectException e) { throw new IllegalArgumentException(e); }
3824 }
3825
3826 /**Compute segment size; strictly positive power of two.
3827 * Given the full message/frame length
3828 * this computes the segment size between/at the MIN/MAX limits
3829 * to maximise the number of segments
3830 * and thus maximise the the amount of incremental MAC processing
3831 * and incremental processing of streamed frames that can be done,
3832 * minimising buffer working space and time/bandwidth before corruption is found.
3833 * <p>
3834 * All segments are of the same size, except the last one which may be shorter.
3835 */
3836 private static final int computeSegmentSize(final int frameLength)
3837 {
3838 if((frameLength > RawPacket.MAX_FRAME_SIZE) ||
3839 (frameLength < 0))
3840 { throw new IllegalArgumentException(); }
3841
3842 // Find minimum segment size that will cover the frame.
3843 for(int segSize = MIN_SEGMENT_SIZE; segSize <= MAX_SEGMENT_SIZE; segSize <<= 1)
3844 {
3845 if(segSize * MAX_SEGMENTS >= frameLength)
3846 { return(segSize); }
3847 }
3848
3849 throw new Error("should not get here");
3850 }
3851
3852 /**Compute the MAC given the message and other fields to be included; never null.
3853 * The (secret) Key must also be supplied
3854 * and be suitable for the HMAC algorithm used.
3855 * <p>
3856 * The MAC is computed on a series of segments of the input of the same length
3857 * (except for a possibly-shorter final segment)
3858 * producing an HMAC on each segment.
3859 * <p>
3860 * Each HMAC is computed over a binary message consisting of:
3861 * <ol>
3862 * <li>the 4-byte length of the entire protected stream/frame.
3863 * <li>the 8-byte (Java-style millisecond UTC) timestamp in network order,
3864 * <li>for all segments other than the first, the HMAC of the previous segment,
3865 * <li>the full on-the-wire form of the RawPacket (including header/trailer).
3866 * </ol>
3867 * <p>
3868 * The chaining should make it impossible to reorder the segments.
3869 * <p>
3870 * The segmentation is so that the data in each segment is known-safe
3871 * and can be safely consumed by incremental/streaming CPU-heavy operations
3872 * before subsequent segments have been received and decoded.
3873 * This segmentation also means that we can abort a damaged message
3874 * as soon as we check the damaged segment: we do not need to wait
3875 * to receive and store and check the whole message.
3876 *
3877 * @param raw the RawPacket to protect; never null
3878 * @param timestamp the timestamp for creation/send of the RawPacket; strictly positive
3879 * @param key the (secret) key for the HMAC; never null
3880 *
3881 * @throws InvalidKeyException if the Key supplied is inappropriate
3882 *
3883 * @throws IllegalArgumentException if the timestamp is non-positive
3884 * or any other argument is null
3885 * @throws IllegalStateException if the HMAC algorithm is unavailable
3886 *
3887 * @return immutable in-order list of HMACs to protect each stream segment
3888 */
3889 public static List<ROByteArray> computeMAC(final long timestamp,
3890 final RawPacket raw,
3891 final SecretKey key)
3892 throws InvalidKeyException
3893 {
3894 if((timestamp <= 0) || (raw == null) || (key == null))
3895 { throw new IllegalArgumentException(); }
3896
3897 // Compute the segment size that we will be using.
3898 final int frameLength = raw.getFrameLength();
3899 final int segmentSize = computeSegmentSize(frameLength);
3900 assert(0 == (segmentSize & (segmentSize-1))); // Segment size should be a power of two.
3901
3902 // Compute the number of segments that we will be using.
3903 final int nSegs = (frameLength + segmentSize - 1) / segmentSize;
3904
3905 // Compose our result MAC list...
3906 final List<ROByteArray> mac = new ArrayList<ROByteArray>(nSegs);
3907
3908 // Incrementally generate the MAC block by block.
3909 // We use an output stream sink to avoid having to copy large frames.
3910 // This OutputStream is not thread-safe and does not need to be.
3911 // Working memory required is basically only that for the current segment's Mac.
3912 final OutputStream sink = new OutputStream(){
3913 /**MAC for current segment; initialised for first block. */
3914 Mac current = startMacForSegment();
3915
3916 /**Start MAC for next segment.
3917 * The working MAC must be null before this is called.
3918 */
3919 private Mac startMacForSegment()
3920 {
3921 try
3922 {
3923 final Mac m = Mac.getInstance(CoreConsts.HMAC_ALG);
3924 m.init(key);
3925
3926 // First, write 4-byte length of entire message.
3927 m.update(intSer(frameLength));
3928 // Second, write 8-byte timestamp.
3929 m.update(longSer(timestamp));
3930 // If there is a previous MAC, stir it in.
3931 if(null != prevMAC) { m.update(prevMAC); }
3932
3933 return(m);
3934 }
3935 catch(final Exception e)
3936 {
3937 e.printStackTrace();
3938 throw new Error("internal error", e);
3939 }
3940 }
3941
3942 /**The previous MAC value, or null while gathering the first segment. */
3943 private byte[] prevMAC;
3944
3945 /**Count of bytes seen so far; non-negative.
3946 * Implicitly this also indicates how far into the current segment we are.
3947 */
3948 private int bytesSeenSoFar;
3949
3950 /**Process the next block to generate the next MAC.
3951 * Must only be called when we've collected a full segment, or at EOF/close().
3952 */
3953 private void endMacForSegment()
3954 {
3955 try
3956 {
3957 // Compute and record the MAC.
3958 final byte[] hmac = current.doFinal();
3959 prevMAC = hmac;
3960 mac.add(new ROByteArray(hmac));
3961 }
3962 catch(final Exception e)
3963 {
3964 e.printStackTrace();
3965 throw new Error("internal error", e);
3966 }
3967 }
3968
3969 @Override
3970 public void write(final byte[] b, int off, int len) throws IOException
3971 {
3972 // Loop to generate multiple segment MACs if needed.
3973 while(len > 0)
3974 {
3975 // Compute bytes needed to fill the buffer: always positive.
3976 final int offsetInSegBuf = bytesSeenSoFar & (segmentSize-1); // Faster version of bytesSeenSoFar % segmentSize;
3977 final int needed = segmentSize - (offsetInSegBuf);
3978 // How many will we copy in...
3979 final int toCopy = Math.min(needed, len);
3980
3981 // Write the frame bytes to protect.
3982 current.update(b, off, toCopy);
3983
3984 // Adjust all our indexes...
3985 bytesSeenSoFar += toCopy;
3986 off += toCopy;
3987 len -= toCopy;
3988
3989 // If we copied exactly up to the end of the segBuf
3990 // then complete the current MAC and start the next one...
3991 if(toCopy == needed) { endMacForSegment(); current = startMacForSegment(); }
3992 }
3993 }
3994
3995 /**Horribly inefficient, but correct, and should never actually get called. */
3996 @Override public void write(final int b) throws IOException
3997 {
3998 final byte buf[] = new byte[1];
3999 buf[0] = (byte) b;
4000 write(buf, 0, 1);
4001 }
4002
4003 /**Handle the tail of the input. */
4004 @Override public void close() throws IOException
4005 {
4006 assert(bytesSeenSoFar == frameLength); // Should have seen all the data.
4007 endMacForSegment(); // Push out the current (final) MAC...
4008 }
4009 };
4010 // Compute the HMAC value(s).
4011 try { raw.writePacket(sink); sink.close(); }
4012 catch(final IOException e) { throw new Error("internal error", e); }
4013 //System.out.println("frameLength/nSegs/mac.size() = " + frameLength + '/' + nSegs + '/' + mac.size());
4014 assert(nSegs == mac.size()); // Did we create the correct number of MACs values?
4015
4016 return(Collections.unmodifiableList(mac));
4017 }
4018
4019 /**Protect an input stream with our MAC; aborts with IOException in case of corruption.
4020 * This acts as a transparent pass-through filter
4021 * that will read a MAC-protected-segment at a time from the underlying stream
4022 * and either pass it through having verified it,
4023 * or abort with an IOException if the segment is corrupt.
4024 * <p>
4025 * This closes its input stream and vetoes any further operations
4026 * once any error has been encountered.
4027 * <p>
4028 * We use Key rather than SecretKey,
4029 * since the latter depends on Java extensions that may not be available,
4030 * eg when run in JWS.
4031 */
4032 public InputStream protectInputStream(final Key key, final InputStream is)
4033 {
4034 if(key == null) { throw new IllegalArgumentException(); }
4035 if(is == null) { throw new IllegalArgumentException(); }
4036
4037 // Compute our segment size for buffering.
4038 final int segmentSize = computeSegmentSize(length);
4039
4040 // Number of segments.
4041 final int nSegs = mac.size();
4042
4043 // To avoid any confusion, we do not allow mark/reset.
4044 return(new FilterInputStream(is){
4045 /**Set true upon any error; subsequently we will not allow any further read()s. */
4046 private boolean hasErred;
4047
4048 /**Our read buffer for one segment's data (or whole frame if smaller); never null. */
4049 private final byte[] segBuf = new byte[Math.min(length, segmentSize)];
4050
4051 /**Data remaining in segBuf; non-negative. */
4052 private int bufLen;
4053
4054 /**Start of remaining data in segBuf; non-negative. */
4055 private int bufPos;
4056
4057 /**Which MAC segment is next to be loaded/verified. */
4058 private int nextSegNumber;
4059
4060 /**Whatever is in our buffer plus from upstream without blocking. */
4061 @Override public synchronized int available() throws IOException
4062 { return(in.available() + bufLen); }
4063
4064 @Override public void mark(final int readlimit)
4065 { throw new UnsupportedOperationException("mark not supported"); }
4066
4067 /**Mark/reset is not allowed to avoid confusion/complexity. */
4068 @Override public boolean markSupported() { return(false); }
4069
4070 /**Private buffer for read(); never null. */
4071 private final byte[] buf1 = new byte[1];
4072
4073 /**Read a single byte: we try to make this moderately efficient. */
4074 @Override public synchronized int read() throws IOException
4075 {
4076 if(hasErred) { throw new IOException("corrupt stream"); }
4077
4078 // Fast path to return data already in our buffer.
4079 if(bufLen > 0)
4080 {
4081 --bufLen;
4082 return(segBuf[bufPos++] & 0xff);
4083 }
4084
4085 // Use full read() to do all the leg-work.
4086 final int n = read(buf1, 0, 1);
4087 if(n < 1) { return(-1); /* EOF */ }
4088 assert(n == 1);
4089 return(buf1[0] & 0xff);
4090 }
4091
4092 /**Bulk data read.
4093 * This routine returns any data it has buffered (ie already verified)
4094 * else it reads a segment of data from the underlying stream
4095 * (possibly less than a full segment at EOF, but verifying the length)
4096 * and validates it with the appropriate MAC.
4097 * <p>
4098 * If validation against the MAC fails
4099 * then this and all subsequent calls to read()
4100 * are vetoed with an IOException.
4101 * <p>
4102 * If validation succeeds then the requested data is returned.
4103 * <p>
4104 * We do not return data in one read across segment boundaries,
4105 * as we are not required to and it would increase complexity.
4106 */
4107 @Override
4108 public synchronized int read(final byte[] b, final int off, final int len) throws IOException
4109 {
4110 if((b == null) ||
4111 (off < 0) || (off >= b.length) ||
4112 (len < 0) || (off + len > b.length))
4113 { throw new IllegalArgumentException(); }
4114
4115 if(hasErred) { throw new IOException("corrupt stream"); }
4116
4117 if(len == 0) { return(0); }
4118
4119 // If we have no verified data left in our buffer
4120 // then unless at EOF we need to load another segment and verify it.
4121 if(bufLen == 0)
4122 {
4123 // We are at EOF if we have read the last segment.
4124 if(nextSegNumber >= nSegs) { return(-1); }
4125
4126 // Allow for the final segment to be smaller.
4127 final int readSize = (nextSegNumber < nSegs-1) ? segmentSize :
4128 (length - (segmentSize * (nSegs-1)));
4129 bufPos = 0; // Next caller will read from offset zero.
4130 bufLen = 0; // Buffer is currently empty...
4131 while(bufLen != readSize)
4132 {
4133 try
4134 {
4135 final int n = in.read(segBuf, bufLen, readSize - bufLen);
4136 if(n < 1) { throw new IOException("premature EOF"); }
4137 bufLen += n;
4138 }
4139 catch(final IOException e) { hasErred = true; throw e; }
4140 }
4141
4142 // Verify the data.
4143 try
4144 {
4145 final Mac m = Mac.getInstance(CoreConsts.HMAC_ALG);
4146 m.init(key);
4147
4148 // First, write 4-byte length of entire message.
4149 m.update(intSer(length));
4150 // Second, write 8-byte timestamp.
4151 m.update(longSer(timestamp));
4152 // If there is a previous MAC, stir it in.
4153 if(nextSegNumber > 0) { m.update(mac.get(nextSegNumber-1).toByteArray()); }
4154 // Finally, use the bytes read from upstream.
4155 m.update(segBuf, 0, bufLen);
4156
4157 // Compute and record the MAC.
4158 final byte[] hmac = m.doFinal();
4159 if(!Arrays.equals(mac.get(nextSegNumber++).toByteArray(), hmac))
4160 { throw new IOException("corrupt input data"); }
4161 }
4162 catch(final IOException e)
4163 {
4164 hasErred = true; // Prevent further read()s from this stream.
4165 throw e;
4166 }
4167 catch(final Exception e)
4168 {
4169 hasErred = true;
4170 e.printStackTrace();
4171 throw new Error("internal error", e);
4172 }
4173 }
4174
4175 // The buffer must now have something in it.
4176 assert(bufLen > 0);
4177 // Return verified data from our buffer to the caller.
4178 final int toCopy = Math.min(bufLen, len);
4179 System.arraycopy(segBuf, bufPos, b, off, toCopy);
4180 bufLen -= toCopy;
4181 bufPos += toCopy;
4182 return(toCopy);
4183 }
4184
4185 /**Implemented as read(b, 0, b.length). */
4186 @Override public int read(final byte[] b) throws IOException
4187 { return(read(b, 0, b.length)); }
4188
4189 @Override public void reset() throws IOException
4190 { throw new IOException("reset not allowed"); }
4191
4192 /**We have to read the input to skip it. */
4193 @Override public synchronized long skip(long n) throws IOException
4194 {
4195 long skipped = 0;
4196 final byte b[] = new byte[(int) Math.min(1024, n)];
4197 while(n > 0)
4198 {
4199 final int r = read(b, 0, (int) Math.min(n, b.length));
4200 if(r < 1) { break; /* EOF */ }
4201 n -= r;
4202 skipped += r;
4203 }
4204 return(skipped);
4205 }
4206 });
4207 }
4208
4209 /**Checks that the object content is valid only.
4210 * This does NOT attempt to check the MAC matches the message and fields;
4211 * that must be done explicitly elsewhere.
4212 * <p>
4213 * Partly this does not check the MAC because it does not have access to the key.
4214 */
4215 public void validateObject() throws InvalidObjectException
4216 {
4217 if(timestamp <= 0) { throw new InvalidObjectException("bad object: bad timestamp"); }
4218 if(length < 0) { throw new InvalidObjectException("bad object: bad length"); }
4219 if(mac == null) { throw new InvalidObjectException("bad object: null MAC"); }
4220 if(mac.size() == 0) { throw new InvalidObjectException("bad object: empty MAC"); }
4221 for(final ROByteArray m : mac)
4222 {
4223 if(m == null) { throw new InvalidObjectException("bad object: null segement MAC"); }
4224 if(m.length() == 0) { throw new InvalidObjectException("bad object: empty segment MAC"); }
4225 }
4226 }
4227
4228 /**Equality depends on all the members being equal. */
4229 @Override
4230 public boolean equals(final Object obj)
4231 {
4232 if(this == obj) { return(true); }
4233 if(!(obj instanceof PacketProtector)) { return(false); }
4234 final PacketProtector other = (PacketProtector) obj;
4235 return((timestamp == other.timestamp) &&
4236 (length == other.length) &&
4237 mac.equals(other.mac));
4238 }
4239
4240 /**We use the timestamp and length fields in the hash for the entire collection. */
4241 @Override public int hashCode() { return(length ^ (int) timestamp); }
4242 }
4243 }