001 /*
002 Copyright (c) 1996-2012, Damon Hart-Davis
003 All rights reserved.
004
005 Redistribution and use in source and binary forms, with or without
006 modification, are permitted provided that the following conditions are
007 met:
008
009 * Redistributions of source code must retain the above copyright
010 notice, this list of conditions and the following disclaimer.
011
012 * Redistributions in binary form must reproduce the above copyright
013 notice, this list of conditions and the following disclaimer in the
014 documentation and/or other materials provided with the
015 distribution.
016
017 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
018 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
019 TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
020 PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
021 OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
022 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
023 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
024 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
025 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
026 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
027 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
028 */
029 package org.hd.d.pg2k.webSvr.exhibit;
030
031 import java.io.BufferedInputStream;
032 import java.io.IOException;
033 import java.io.InputStream;
034 import java.io.InterruptedIOException;
035 import java.net.InetAddress;
036 import java.security.InvalidKeyException;
037 import java.util.Map;
038
039 import javax.crypto.SecretKey;
040 import javax.servlet.ServletConfig;
041 import javax.servlet.ServletContext;
042 import javax.servlet.ServletException;
043 import javax.servlet.ServletOutputStream;
044 import javax.servlet.http.HttpServlet;
045 import javax.servlet.http.HttpServletRequest;
046 import javax.servlet.http.HttpServletResponse;
047
048 import org.hd.d.pg2k.svrCore.CoreConsts;
049 import org.hd.d.pg2k.svrCore.DuplicateIDChecker;
050 import org.hd.d.pg2k.svrCore.MemoryTools;
051 import org.hd.d.pg2k.svrCore.PGMasterNotInServiceException;
052 import org.hd.d.pg2k.svrCore.ROByteArray;
053 import org.hd.d.pg2k.svrCore.SimpleLoggerIF;
054 import org.hd.d.pg2k.svrCore.datasource.ExhibitDataHTTPTunnelSource;
055 import org.hd.d.pg2k.svrCore.datasource.ExhibitDataTunnelSource;
056 import org.hd.d.pg2k.svrCore.datasource.ExhibitDataTunnelSource.PacketProtector;
057 import org.hd.d.pg2k.svrCore.datasource.ExhibitDataTunnelSource.RawPacket;
058 import org.hd.d.pg2k.svrCore.datasource.SimpleExhibitPipelineIF;
059 import org.hd.d.pg2k.svrCore.props.LocalProps;
060 import org.hd.d.pg2k.svrCore.props.SecurityProps;
061 import org.hd.d.pg2k.svrCore.props.SimplepassProps;
062 import org.hd.d.pg2k.svrCore.stats.StatsLogger;
063 import org.hd.d.pg2k.svrCore.vars.InstanceID;
064 import org.hd.d.pg2k.svrCore.vars.SimpleVariableValue;
065 import org.hd.d.pg2k.svrCore.vars.SystemVariables;
066 import org.hd.d.pg2k.webSvr.util.WebUtils;
067
068 import ORG.hd.d.IsDebug;
069
070
071 /**This serves data through an HTTP tunnel from master Web-app to authorised slaves, ie is an RPC end-point.
072 * We are communicating on the client side with ExhibitDataHTTPTunnelSource
073 * and may have (at least) one long-running open connection per slave.
074 * <p>
075 * Takes data from whatever the DataSourceBean is using.
076 * <p>
077 * This allows concurrency between different slaves,
078 * and for multiple connections from each slave,
079 * though this concurrency may be limited by resource constraints.
080 * <p>
081 * Notice that all operations require some degree of authentication:
082 * <ul>
083 * <li>A core set of safe read-only non-sensitive operations
084 * are available to clients that can provide a suitable authenticator
085 * (username/password) as HTTP fields,
086 * such as the JWS-based uploader.
087 * <li>The full set of operations is available to clients (eg slave servers)
088 * that can authenticate by address,
089 * though "write" operations may be silently discarded/ignored
090 * when a second authenticator (eg HMAC using a shared private key)
091 * is available and required locally but is not provided by the client.
092 * (If the second authenticator is provided but the message does not validate,
093 * eg because the message was corrupted in transit,
094 * then the client will get an error so that they can retry.)
095 * </ul>
096 * <p>
097 * Note that for messages that have a MAC:
098 * <ul>
099 * <li>We screen the message stamp for validity
100 * (ie within a certain window of our current time to allow for
101 * transmission delays and clock skew).
102 * <li>For those message whose MAC is successfully validated (to stop DoS attacks)
103 * we store the MAC until some time after its stamp would become invalid
104 * and reject any further messages with the same MAC,
105 * ie we reject duplicate/replayed messages.
106 * </ul>
107 */
108 public final class TunnelServlet extends HttpServlet
109 {
110 /**If debugging locally, note some activity at protocol level. */
111 private final static boolean isDebug = false && ORG.hd.d.IsDebug.isDebug;
112
113 /**Fixed (common) REMEX response; non-null. */
114 private static final RawPacket REMEX_RESPONSE = new ExhibitDataHTTPTunnelSource.RawPacket(
115 ExhibitDataHTTPTunnelSource.RawPacket.OpCode.REMEX);
116
117
118 /**Unhook from servlet logger. */
119 @Override
120 public void destroy()
121 {
122 logger.setContext(null); // Stop using servlet logger.
123 super.destroy();
124 }
125
126 /**Hook into servlet logger. */
127 @Override
128 public void init(final ServletConfig servletConfig)
129 throws ServletException
130 {
131 logger.setContext(servletConfig.getServletContext()); // Use servlet.log().
132 super.init(servletConfig);
133 }
134
135 /**Our logger which falls back to System.out if servlet log not available; never null. */
136 private final WebUtils.ServletLoggerWithFallback logger = new WebUtils.ServletLoggerWithFallback();
137
138 /**The stats set to which we log general tunnel servlet stats.
139 * The unique codes are the constants TSNAME_XXX.
140 */
141 private final StatsLogger.StatsConfig statsIDTSV =
142 new StatsLogger.StatsConfig("TUNNELSERVLET",
143 logger, // Use servlet log if poss.
144 false, // Only dump summaries...
145 12 * 3600, // About every 12 hours.
146 true); // Adaptive.
147
148 // /**General stats event name: inbound HTTP request. */
149 // public static final String TSVNAME_HTTPREQUEST = "HTTP-request";
150
151 // /**General stats event name prefix: inbound HTTP request service time (ms exponent). */
152 // public static final String TSVNAMEP_HTTPSERVICETIME = "HTTP-serviceTime=";
153
154 // /**General stats event name prefix: slow inbound HTTP request op code. */
155 // public static final String TSVNAMEP_HTTPSLOWOP = "HTTP-slowOp=";
156
157 /**General stats event name: failures handling input HTTP requests. */
158 public static final String TSVNAME_HTTPFAIL = "HTTP-failure";
159
160 /**General stats event name: inbound HTTP request rejected by client IP address. */
161 public static final String TSVNAME_HTTPREJADDR = "HTTP-reqRejectedByAddr";
162
163 /**General stats event name: inbound HTTP request rejected by ID/password. */
164 public static final String TSVNAME_HTTPREJID = "HTTP-reqRejectedByID";
165
166 /**General stats event name: inbound HTTP request rejected by request op-code. */
167 public static final String TSVNAME_HTTPREJOP = "HTTP-reqRejectedByOp";
168
169 // /**General stats event name: inbound HTTP request rejected by failed MAC. */
170 // private static final String TSVNAME_HTTPREJMAC = "HTTP-reqRejectedByMAC";
171
172 // /**General stats event name: inbound HTTP request rejected as a duplicate. */
173 // private static final String TSVNAME_HTTPREJDUP = "HTTP-reqRejectedAsDup";
174
175 /**General stats event name prefix: RPC request packet type. */
176 public static final String TSVNAMEPR_RPCTYPE = ExhibitDataTunnelSource.TSNAMEPR_RPCTYPE;
177
178
179 /**Maximum expected typical service time, ms.
180 * This ideally should be well under 1s.
181 * <p>
182 * This should in fact be a tiny bit less than the total RTT,
183 * since there is a short delay from when we send the last response byte
184 * to it being processed by the receiver (eg due to transmission delays),
185 * but we ignore that for now.
186 */
187 private static final int MAX_TYPICAL_SERVICE_TIME_MS = CoreConsts.MAX_TYPICAL_RPC_RTT_MS;
188
189 /**Size in bytes of 'big' frame that may require special handling; strictly positive.
190 * Inbound frames at least this big may need to be rejected or handled serially
191 * because of the heap space they may consume,
192 * and indeed may be rejected simply because the system is temporarily busy
193 * with another memory-intensive task at the time.
194 * <p>
195 * Almost all 'normal' inbound packets should be below this threshold.
196 */
197 public static final int LARGE_FRAME_BYTES = 1 << 16;
198
199 /**Get singleton (per-servlet-context) data pipeline/cache instance.
200 * The config param must not be null, but for some operations
201 * (such as calling destroy()) request can be null.
202 * <p>
203 * This does not cache its return value.
204 */
205 private static DataSourceBean getDataSourceBean(
206 final ServletContext ctxt,
207 final HttpServletRequest request)
208 {
209 // Fetches/creates the data source...
210 final DataSourceBean dataSource =
211 DataSourceBean.getApplicationInstance(ctxt);
212
213 // Ensure that the essential details are set up.
214 dataSource.setServletContext(ctxt);
215 if(request != null)
216 { dataSource.setContextPath(request.getContextPath()); }
217
218 return(dataSource);
219 }
220
221 /**Get data source/pipeline; never returns null.
222 * This routine may or may not cache its result,
223 * but the caller should not cache it.
224 */
225 private SimpleExhibitPipelineIF getDataSource()
226 {
227 final DataSourceBean dataSource =
228 getDataSourceBean(getServletContext(), null);
229 return(dataSource);
230 }
231
232 /**Cached SecurityProps; private to getSecurityProps(); never null.
233 * Volatile so as to allow access without a lock.
234 */
235 private volatile SecurityProps _gSP_cache = new SecurityProps();
236
237 /**Get SecurityProps (including sensitives values); never returns null.
238 * May cache results for performance.
239 *
240 * @throws IOException if SecurityProps cannot be retrieved
241 * (eg because no local file exists)
242 */
243 private SecurityProps getSecurityProps()
244 throws IOException
245 {
246 // Get handle on cached properties.
247 final SecurityProps cached = _gSP_cache;
248 assert(cached != null);
249
250 // Get timestamp of cached properties.
251 final long ts = cached.timestamp;
252 // Get updated set if need be...
253 final SecurityProps sp = SecurityProps.getSecurityPropsUncachedFromFilesystem(ts);
254
255 // If new then cache and return the new set.
256 if(sp != null)
257 {
258 _gSP_cache = sp;
259 return(sp);
260 }
261
262 // Return the still-current cached set.
263 return(cached);
264 }
265
266 /**Cache/lock object to optimise handling of incoming RPCs; never null. */
267 private final ExhibitDataTunnelSource.HIRPCCache _RPC_cache =
268 new ExhibitDataTunnelSource.HIRPCCache();
269 /**Register the RPC cache with the emergency-free callback mechanism. */
270 { MemoryTools.registerRecurrentEmergencyFreeHandle(_RPC_cache); }
271
272 /**Record of unique request packet IDs around our acceptance window; never null.
273 * We use the final MAC segment ID, as that covers the entire frame.
274 * <p>
275 * We actually remember IDs for about twice the age implied by the skew
276 * so that if our clock is wrong by, or slips by, the maximum skew,
277 * we won't start admitting very old duplicate messages.
278 * <p>
279 * We should only add IDs if possible when we are already fairly sure of the source
280 * and when we have, for example, already checked for acceptable skew,
281 * to make any sort of DoS attack against us harder.
282 * <p>
283 * We should reject otherwise-acceptable messages that have an ID
284 * already present in this Map.
285 * <p>
286 * Thread-safe.
287 */
288 private final DuplicateIDChecker<ROByteArray> messageIDs =
289 DuplicateIDChecker.<ROByteArray>create(2*CoreConsts.MAX_PEER_CLOCK_SKEW_MS, "TunnelServlet.messageIDs");
290
291 // Wrapper to smuggle out internal IOException in wrapped packet handler...
292 @SuppressWarnings("serial")
293 private static final class WrappedIOException extends RuntimeException
294 { WrappedIOException(final IOException e) { super(e); } }
295
296 /**Respond to a POST request to open and use an HTTP tunnel (for a single transfer).
297 * Slave servers should refuse to respond to these requests;
298 * ie only respond if seemingly a master.
299 * <p>
300 * Normal slave servers are primarily authenticated by IP address
301 * and have full real/write access.
302 * <p>
303 * Uploader JWS clients are authenticated by username/password,
304 * and have partial, read-only access.
305 *
306 * @exception IOException if an input/output error occurs
307 */
308 @Override
309 public void doPost(final HttpServletRequest request,
310 final HttpServletResponse response)
311 throws IOException // , ServletException
312 {
313 // Record when we started to service the request...
314 final long startTime = System.currentTimeMillis();
315
316 final ServletContext context = getServletConfig().getServletContext();
317
318 // HTTP/1.0 headers.
319 response.setHeader("Expires", "0");
320 response.setHeader("Pragma", "no-cache");
321 // HTTP/1.1 headers.
322 response.setHeader("Cache-Control", "no-cache,no-store,must-revalidate");
323
324 if(isDebug) { logger.log("[Receiving tunnel request...]"); }
325
326 // // Note HTTP request...
327 // StatsLogger.captureDataPoint(statsIDTSV, TSVNAME_HTTPREQUEST);
328
329 // Screen out declared huge inbound frames very early,
330 // as we'll almost certainly bust ourselves trying to read/unpack them...
331 // Allow for the fact that these frames might be compressed too.
332 // Only accept something that represents a small fraction of
333 // the total heap size currently allocated.
334 if(request.getContentLength() > Math.min(Runtime.getRuntime().totalMemory() >>> 6, ExhibitDataTunnelSource.RawPacket.MAX_PAYLOAD_SIZE))
335 {
336 context.log("rejecting enormous inbound request frame early: " + request.getContentLength() + "bytes from " + request.getRemoteAddr());
337 // Set an appropriate HTTP error response code if possible.
338 response.setStatus(HttpServletResponse.SC_FORBIDDEN);
339 return;
340 }
341
342 final DataSourceBean ds = getDataSourceBean(getServletContext(), request);
343
344 // It is now possible for slaves to accept a limited range of requests.
345 // final Boolean isS = ds.isSlave();
346 // // If we don't know or we are definitely a slave, reject the request.
347 // if(!Boolean.FALSE.equals(isS))
348 // {
349 //if(isDebug) { logger.log("[ERROR: slave rejecting tunnel request.]"); }
350 // response.setStatus(HttpServletResponse.SC_FORBIDDEN);
351 // return;
352 // }
353
354 // Initialisation time for call and response...
355 final long initTime = System.currentTimeMillis();
356
357 // Values that we'll set later if we get far enough.
358 // long validationTime = 0; // When we finish validation...
359 // final AtomicLong requestReadTime = new AtomicLong(); // When we finish reading the request...
360 // final AtomicLong responseGenerationTime = new AtomicLong(); // When we finish generating the response...
361 // final AtomicReference<RawPacket.OpCode> opcode = new AtomicReference<RawPacket.OpCode>(); // The request's opcode.
362 // final AtomicLong requestSize = new AtomicLong(); // Request payload size.
363 // final AtomicLong responseSize = new AtomicLong(); // Response payload size.
364
365 // By default allow only partial, read-only access.
366 boolean partialAccess = true;
367
368 final String remoteAddr = request.getRemoteAddr();
369
370 try
371 {
372 // If the client does not seem to be authorised,
373 // tell them to go away!
374 // By default reject callers...
375
376 // Get the user's IP address.
377 final java.net.InetAddress addr = java.net.InetAddress.getByName(remoteAddr);
378
379 // If the client has ID/passwd, then authenticate with that.
380 final String userID = request.getHeader(ExhibitDataHTTPTunnelSource.HEADER_AUTH_ID);
381 if(userID != null)
382 {
383 // Authenticate by ID/passwd rather than by header.
384 if(!SimplepassProps.isAuthorUploadPasswordCorrect(userID,
385 request.getHeader(ExhibitDataHTTPTunnelSource.HEADER_AUTH_PW)))
386 {
387 context.log("rejecting putative tunnel user by ID/pass: " + userID + " @ " + remoteAddr);
388
389 // Note rejected HTTP request (by ID/password).
390 StatsLogger.captureDataPoint(statsIDTSV, TSVNAME_HTTPREJID);
391
392 // Set an appropriate HTTP error response code if possible.
393 response.setStatus(HttpServletResponse.SC_FORBIDDEN);
394 return;
395 }
396
397 // Authenticated (for partial read-only access) by password.
398 partialAccess = true;
399 }
400 // Try validating as a slave, ie a tunnel client, by the remote IP.
401 else if(getSecurityProps().tunnelClientIsOK(addr))
402 {
403 // Authenticated (for full tunnel access) by IP address.
404 partialAccess = false;
405 }
406 // Try validating the client as a peer mirror/slave.
407 // We'll only allow this access if bandwidth is not expensive
408 // and if we are not too busy.
409 else if((LocalProps.getServerSlowdownFactor() < 2) &&
410 isPeerMirror(ds, addr) &&
411 !WebUtils.isOverloaded(context))
412 {
413 // Authenticated (for partial tunnel access) by IP address.
414 partialAccess = true;
415 }
416 else // Unauthorised client...
417 {
418 context.log("rejecting putative tunnel user by address: " + remoteAddr);
419
420 // Note rejected HTTP request (by client address).
421 StatsLogger.captureDataPoint(statsIDTSV, TSVNAME_HTTPREJADDR);
422
423 // Set an appropriate HTTP error response code if possible.
424 response.setStatus(HttpServletResponse.SC_FORBIDDEN);
425 return;
426 }
427
428 // Once we have done preliminary authentication/screening of the request,
429 // look for a MAC header.
430 // Veto obviously-broken communications, even if we don't have a key.
431 PacketProtector ppM = null;
432 final String hdrMAC = request.getHeader(ExhibitDataHTTPTunnelSource.HEADER_MAC);
433 int frameSizeM = request.getContentLength(); // Estimated raw frame size (bytes): may be -1 if we don't know.
434 if(null != hdrMAC)
435 {
436 try { ppM = PacketProtector.fromCheckString(hdrMAC); }
437 catch(final IllegalArgumentException e) { logger.log("garbled MAC on request from: "+remoteAddr); }
438 //if(IsDebug.isDebug) { logger.log("INFO: received request MAC: " + ppM); }
439
440 // Veto responses with the wrong length (but not a missing Content-Length).
441 final int contentLengthRcvd = request.getContentLength();
442 if(contentLengthRcvd == -1)
443 { logger.log("WARNING: missing Content-Length on tunnel request from: "+remoteAddr); }
444 else if(ppM.length != request.getContentLength())
445 { response.setStatus(HttpServletResponse.SC_FORBIDDEN); return; }
446
447 frameSizeM = ppM.length; // Trust checked header length over Content-Length when we have both.
448
449 // Check that the remote peer's clock is not vastly wrong
450 // (or that a very old packet is reappearing out of nowhere).
451 // We assume that the skew allowance is generous enough to cover
452 // transit time of messages in the network.
453 final long now = System.currentTimeMillis();
454 if((ppM.timestamp < now - CoreConsts.MAX_PEER_CLOCK_SKEW_MS) ||
455 (ppM.timestamp > now + CoreConsts.MAX_PEER_CLOCK_SKEW_MS))
456 {
457 // This can happen if the remote machine's system clock is badly wrong.
458 System.err.println("ERROR: rejecting message with bad timestamp from: "+remoteAddr+" stamp: "+(new java.util.Date(ppM.timestamp)) + " @ "+(new java.util.Date(now)));
459 logger.log("ERROR: rejecting message with bad timestamp from: "+remoteAddr+" stamp: "+(new java.util.Date(ppM.timestamp)) + " @ "+(new java.util.Date(now)));
460 response.setStatus(HttpServletResponse.SC_FORBIDDEN); return;
461 }
462 // Veto duplicate packets/requests in this tunnel.
463 if(messageIDs.add(ppM.mac.get(ppM.mac.size()-1)) != null)
464 {
465 logger.log("ERROR: rejecting duplicate message from: "+remoteAddr);
466 response.setStatus(HttpServletResponse.SC_FORBIDDEN); return;
467 }
468 }
469 final PacketProtector pp = ppM;
470 final int frameSize = frameSizeM;
471
472 // Get the Xfer key if we have one.
473 final SecretKey xk = LocalProps.getXferHMACKey();
474
475 // If enforcing the MAC, and one is missing,
476 // then enforce at most partial (read-only) access.
477 if(ExhibitDataHTTPTunnelSource.ENFORCE_MAC &&
478 !partialAccess &&
479 ((xk != null) && (ppM == null)))
480 {
481 logger.log("INFO: did not receive MAC from client "+remoteAddr+": restricting to partial access");
482 partialAccess = true;
483 }
484
485 // // Note when initial validation/authorisation complete.
486 // validationTime = System.currentTimeMillis();
487
488 // Indicate that we are sending binary data down the line...
489 response.setContentType("application/octet-stream");
490
491 if(isDebug) { logger.log("ready to process request (frame size "+frameSize+") for tunnel user " + remoteAddr); }
492
493 final boolean partialAccessF = partialAccess;
494 final Runnable r = new Runnable()
495 {
496 public void run()
497 {
498 try {
499 // Get the request from the slave...
500 final ExhibitDataHTTPTunnelSource.RawPacket reqPacket;
501 try {
502 // Get access to the input and output streams...
503 // If we have (and can check) a MAC then we wrap the input for verification
504 // (which also buffers it),
505 // else we buffer the input stream for efficiency.
506 final InputStream isRaw = request.getInputStream();
507 final InputStream is = ((pp == null) || (xk == null)) ?
508 new BufferedInputStream(isRaw, 1024) :
509 pp.protectInputStream(xk, isRaw);
510
511 reqPacket = ExhibitDataHTTPTunnelSource.RawPacket.readPacket(is);
512 }
513 catch(final Error e)
514 {
515 // If the input packet causes us to barf badly,
516 // eg because it is huge (OOM) or badly malformed,
517 // then try to avoid it being sent to us again
518 // by pretending rejecting it with a remote exception.
519 // This may be futile since we have no memory (etc) to do anything.
520 writeResponsePacketToWire(response, xk, response.getOutputStream(),
521 REMEX_RESPONSE);
522 return;
523 }
524 // // Note when we have finished reading the raw request off the wire.
525 // requestReadTime.set(System.currentTimeMillis());
526
527 // For partial access reject all but specified op-codes.
528 // The allowed operations are all safe "read-only" activities,
529 // and allow the fetching of generic and exhibit properties,
530 // and the fetching of exhibit data and thumbnails,
531 // but no access at all to (possibly-slightly-sensitive) event data.
532 if(partialAccessF)
533 {
534 // A small number of "safe" non-write operations are allowed.
535 // This excludes all (read and write) operations on event data.
536 if(!ExhibitDataHTTPTunnelSource.safeTunnelOp(reqPacket.opCode))
537 {
538 context.log("rejecting partial-access tunnel user for op-code: " + (reqPacket.opCode) + " @ " + remoteAddr);
539
540 // Note rejected HTTP request (by client address).
541 StatsLogger.captureDataPoint(statsIDTSV, TSVNAME_HTTPREJOP);
542
543 // Quickly reject non-safe request with a proper packet...
544 // This OP__REMEX is not interpreted by the other end of the tunnel
545 // as indicating that the tunnel is broken,
546 // so thus this does not deter other, allowed, ops.
547 //throw new RuntimeException("illegal op-code");
548 writeResponsePacketToWire(response, xk, response.getOutputStream(),
549 REMEX_RESPONSE);
550 return;
551 }
552 }
553
554 // Note op type...
555 // opcode.set(reqPacket.opCode);
556 // requestSize.set(reqPacket.getActualPayloadLength());
557 StatsLogger.captureDataPoint(statsIDTSV, TSVNAMEPR_RPCTYPE + reqPacket.opCode);
558
559 // Get the output stream (as late as possible).
560 final ServletOutputStream os = response.getOutputStream();
561
562 try
563 {
564 // Handle the request,
565 // and serialise the result back onto the wire to the caller.
566 final ExhibitDataTunnelSource.RawPacket rawResponse = ExhibitDataTunnelSource.handleInboundRPC(getDataSource(),
567 reqPacket,
568 remoteAddr,
569 _RPC_cache,
570 logger);
571
572 // // Note when we've generated the response...
573 // responseGenerationTime.set(System.currentTimeMillis());
574 // responseSize.set(rawResponse.getActualPayloadLength());
575
576 // Write the response onto the wire...
577 writeResponsePacketToWire(response, xk, os, rawResponse);
578 }
579 // Generically handle common exception types with appropriate error responses.
580 catch(final PGMasterNotInServiceException e)
581 {
582 writeResponsePacketToWire(response, xk, os,
583 new ExhibitDataHTTPTunnelSource.RawPacket(
584 ExhibitDataHTTPTunnelSource.RawPacket.OpCode.PGMNISEX));
585 }
586 catch(final java.rmi.RemoteException e)
587 {
588 writeResponsePacketToWire(response, xk, os, REMEX_RESPONSE);
589 }
590 catch(final InterruptedIOException e)
591 {
592 writeResponsePacketToWire(response, xk, os,
593 new ExhibitDataHTTPTunnelSource.RawPacket(
594 ExhibitDataHTTPTunnelSource.RawPacket.OpCode.INTEX));
595 }
596 catch(final RuntimeException e)
597 {
598 // This indicates some kind of parameter problem
599 // and thus may need debugging/fixing...
600 context.log("RuntimeException handling tunnel user " + remoteAddr, e);
601 if(IsDebug.isDebug) { e.printStackTrace(); }
602 writeResponsePacketToWire(response, xk, os,
603 new ExhibitDataHTTPTunnelSource.RawPacket(
604 ExhibitDataHTTPTunnelSource.RawPacket.OpCode.RUNTEX));
605 }
606 // A generic IOException is a "remote" exception to the caller...
607 catch(final IOException e)
608 {
609 writeResponsePacketToWire(response, xk, os, REMEX_RESPONSE);
610 }
611
612 // Make sure that all output has been sent back to the caller.
613 os.flush();
614 os.close();
615 }
616 catch(final InvalidKeyException e)
617 {
618 throw new IllegalStateException(e); // Shouldn't happen...
619 }
620 catch(final IOException e)
621 {
622 throw new WrappedIOException(e); // Allows internal IOException to be extracted unambiguously.
623 }
624 }
625 };
626
627 // Handle small (known-size) frames directly...
628 if((frameSize <= LARGE_FRAME_BYTES) && (frameSize >= 0))
629 { r.run(); }
630 // Handle big (or unknown size) ones only if we have space
631 // and possibly only one at a time...
632 else
633 {
634 logger.log("about to try to process LARGE request (frame size "+frameSize+") for tunnel user " + remoteAddr);
635 // Assume that we'll need a fair multiplier over the frame size to handle it...
636 if(!MemoryTools.runMemoryIntensiveOperation(r, false, Math.max(1024, 4 * frameSize)))
637 {
638 context.log("Vetoed handling of large frame ("+frameSize+" bytes) from tunnel user " + remoteAddr);
639 // If we would not try to handle this large-looking request
640 // then report a RemoteException to the caller
641 // to allow other requests to be be sent through...
642 final ServletOutputStream os = response.getOutputStream();
643 writeResponsePacketToWire(response, xk, os, REMEX_RESPONSE);
644 os.flush();
645 os.close();
646 }
647 }
648 }
649 catch(final Exception e)
650 {
651 // Note failure...
652 StatsLogger.captureDataPoint(statsIDTSV, TSVNAME_HTTPFAIL);
653
654 context.log("Exception handling tunnel user " + remoteAddr, e);
655 e.printStackTrace();
656 if(e instanceof WrappedIOException) { throw ((IOException) e.getCause()); }
657 if(e instanceof IOException) { throw ((IOException) e); }
658 throw new IOException("ERROR: tunnel failure: " + e.getMessage(), e);
659 }
660 finally
661 {
662 final long endTime = System.currentTimeMillis();
663 final long serviceTime = endTime - startTime;
664 // final int sTE = GenUtils.log2Approx(serviceTime);
665 // // Note service time bucket...
666 // StatsLogger.captureDataPoint(statsIDTSV, TSVNAMEP_HTTPSERVICETIME + sTE);
667
668 // If the service time was long, note some details...
669 if(serviceTime > MAX_TYPICAL_SERVICE_TIME_MS)
670 {
671 context.log("WARNING: TunnelServlet: long service time: "+serviceTime+"ms...");
672 context.log(" Response set-up time: "+(initTime - startTime)+"ms.");
673 // final OpCode oc = opcode.get();
674 // if(oc != null)
675 // { context.log(" Request op-code: "+opcode+" ("+(oc.getCode() & 0xff)+")."); }
676 // if(oc != null) // Note slow operation...
677 // { StatsLogger.captureDataPoint(statsIDTSV, TSVNAMEP_HTTPSLOWOP + oc); }
678 // if(validationTime != 0)
679 // { context.log(" Validation/auth time: "+(validationTime - initTime)+"ms."); }
680 // if(requestSize.get() != 0)
681 // { context.log(" Request payload size: "+(requestSize.get())+" bytes."); }
682 // if(requestReadTime.get() != 0)
683 // { context.log(" Request read time: "+(requestReadTime.get() - validationTime)+"ms."); }
684 // if(responseGenerationTime.get() != 0)
685 // { context.log(" Response generation time: "+(responseGenerationTime.get() - requestReadTime.get())+"ms."); }
686 // if(responseSize.get() != 0)
687 // { context.log(" Response payload size: "+(responseSize.get())+" bytes."); }
688 // if(responseGenerationTime.get() != 0)
689 // { context.log(" Response write time: "+(endTime - responseGenerationTime.get())+"ms."); }
690 }
691 }
692 }
693
694 /**Efficiently write the raw response frame/packet to the write, with a MAC if possible.
695 * TODO: This should also log traffic volumes by type and destination.
696 */
697 private void writeResponsePacketToWire(final HttpServletResponse response,
698 final SecretKey xk,
699 final ServletOutputStream os,
700 final ExhibitDataTunnelSource.RawPacket rawResponse)
701 throws InvalidKeyException, IOException
702 {
703 response.setContentLength(rawResponse.getFrameLength());
704 if(xk != null) // Create the MAC if we have the Xfer key.
705 {
706 final PacketProtector ppResp = new PacketProtector(rawResponse, xk);
707 response.setHeader(ExhibitDataHTTPTunnelSource.HEADER_MAC, ppResp.toCheckString());
708 assert(ppResp.length == rawResponse.getFrameLength());
709 }
710 rawResponse.writePacket(os, true);
711 }
712
713 /**Returns true iff the address is that of a (peer) mirror/slave.
714 * Does this by looking at the IP addresses of inbound slave connections
715 * as seen by the master, so as to capture the <em>outgoing</em>
716 * IP addresses of each mirror.
717 * <p>
718 * No local/loopback addresses are considered valid.
719 *
720 * @param ds current data source; never null
721 * @param addr client address; never null
722 */
723 private static boolean isPeerMirror(final DataSourceBean ds, final InetAddress addr)
724 throws IOException
725 {
726 if(addr.isAnyLocalAddress() || addr.isLoopbackAddress() || addr.isLinkLocalAddress())
727 { return(false); /* Local addresses not valid for this. */ }
728
729 final SimpleVariableValue var = ds.getVariable(SystemVariables.TunnelServlet_SLAVE_ADDRS);
730 if(var == null)
731 { return(false); /* No extant slaves/mirrors at all. */ }
732
733 final String addrS = addr.getHostAddress();
734 if(addrS.equals(var.getValue()))
735 { return(true); }
736
737 final Map<InstanceID,SimpleVariableValue> globalMap = var.getGlobalMap();
738 if(globalMap == null)
739 { return(false); /* No extant slaves/mirrors at all. */ }
740
741 for(final SimpleVariableValue v : globalMap.values())
742 {
743 // If we find this address as a String,
744 // then this is a legitimate peer/mirror.
745 if(addrS.equals(v.getValue()))
746 { return(true); }
747 }
748
749 // Not a peer...
750 return(false);
751 }
752
753 /**Unique Serialisation class ID generated by http://random.hd.org/. */
754 private static final long serialVersionUID = -1276945887652232880L;
755
756 /**Factory method to create an instance given the ServletContext.
757 * Will fail if the CoreConsts.WAR_CTXTPARAM_BOOTURL param is not set.
758 */
759 public static ExhibitDataTunnelSource createFromContext(final ServletContext ctxt)
760 {
761 if(ctxt == null)
762 { throw new IllegalArgumentException("null ServletContext not allowed"); }
763
764 final String masterURL = ctxt.getInitParameter(CoreConsts.WAR_CTXTPARAM_BOOTURL);
765 if(masterURL == null)
766 { throw new IllegalArgumentException("missing context-param " + CoreConsts.WAR_CTXTPARAM_BOOTURL); }
767
768 final SimpleLoggerIF logger = new WebUtils.ServletLogger(ctxt);
769 return(new ExhibitDataHTTPTunnelSource(masterURL, "", logger));
770 }
771 }