001 /*
002 Copyright (c) 1996-2011, Damon Hart-Davis
003 All rights reserved.
004
005 Redistribution and use in source and binary forms, with or without
006 modification, are permitted provided that the following conditions are
007 met:
008
009 * Redistributions of source code must retain the above copyright
010 notice, this list of conditions and the following disclaimer.
011
012 * Redistributions in binary form must reproduce the above copyright
013 notice, this list of conditions and the following disclaimer in the
014 documentation and/or other materials provided with the
015 distribution.
016
017 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
018 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
019 TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
020 PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
021 OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
022 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
023 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
024 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
025 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
026 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
027 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
028 */
029 package org.hd.d.pg2k.svrCore.datasource;
030
031 import java.io.ByteArrayInputStream;
032 import java.io.EOFException;
033 import java.io.IOException;
034 import java.io.InputStream;
035 import java.io.InterruptedIOException;
036 import java.io.OutputStream;
037 import java.net.HttpURLConnection;
038 import java.net.InetAddress;
039 import java.net.MalformedURLException;
040 import java.net.URL;
041 import java.net.URLConnection;
042 import java.net.URLStreamHandler;
043 import java.net.UnknownHostException;
044 import java.security.InvalidKeyException;
045 import java.util.HashMap;
046 import java.util.Map;
047 import java.util.concurrent.ConcurrentHashMap;
048 import java.util.concurrent.ConcurrentMap;
049 import java.util.concurrent.Semaphore;
050 import java.util.concurrent.TimeUnit;
051 import java.util.concurrent.locks.Lock;
052 import java.util.concurrent.locks.ReentrantLock;
053
054 import javax.crypto.SecretKey;
055
056 import org.hd.d.pg2k.svrCore.CoreConsts;
057 import org.hd.d.pg2k.svrCore.DefInputStream;
058 import org.hd.d.pg2k.svrCore.DuplicateIDChecker;
059 import org.hd.d.pg2k.svrCore.GenUtils;
060 import org.hd.d.pg2k.svrCore.PGMasterNotInServiceException;
061 import org.hd.d.pg2k.svrCore.ROByteArray;
062 import org.hd.d.pg2k.svrCore.Rnd;
063 import org.hd.d.pg2k.svrCore.SimpleLoggerIF;
064 import org.hd.d.pg2k.svrCore.Tuple;
065 import org.hd.d.pg2k.svrCore.Tuple.Pair;
066 import org.hd.d.pg2k.svrCore.datasource.ExhibitDataTunnelSource.RawPacket.OpCode;
067 import org.hd.d.pg2k.svrCore.props.GenProps;
068 import org.hd.d.pg2k.svrCore.props.LocalProps;
069
070 import HTTPClient.HTTPConnection;
071 import HTTPClient.ProtocolNotSuppException;
072 import HTTPClient.URI;
073 import ORG.hd.d.IsDebug;
074
075
076 /**Exhibit pipeline stage that fetches its data across an HTTP[S] tunnel.
077 * Derived from the abstract tunnel source base class.
078 * <p>
079 * This class does two important things:
080 * <ul>
081 * <li>Sets up an HTTP connection with the correct characteristics
082 * (such as with sensible timeouts on open and read).
083 * <li>Controls the concurrency of the connection
084 * to avoid overwhelming the upstream server.
085 * </ul>
086 * <p>
087 * A limited amount of concurrency (ie more than one connection)
088 * may help overcome latency and improve throughput,
089 * especially over a lossy network or during long operations
090 * such as fetching the AEP or exhibit data blocks.
091 * <p>
092 * Concurrency should not be so large as to cause the master
093 * to start rejecting connections,
094 * eg no more than (say) the 8 connections that a normal
095 * Web browser might open to overcome latency.
096 * <p>
097 * Note also that the master may itself have limited concurrency
098 * available internally to service many types of operation.
099 * <p>
100 * Our concurrency limit may be by any type of RPC call,
101 * or at most one of any particular operation type
102 * (on the grounds that the master may only be able to service
103 * limited numbers of calls by any particular operation type).
104 * (At one point we limited concurrency using a simple Java monitor
105 * that was selected by a hash on the RPC type.
106 * Now we have one overall limit on the number of connections upstream
107 * and abort rather than waiting indefinitely to get a connection.)
108 * <p>
109 * We may vary this value between instances to help prevent collisions
110 * between different slaves, etc.
111 * <p>
112 * We may limit concurrency to 1 after any IOException
113 * until one or more successful RPCs have completed.
114 */
115 public final class ExhibitDataHTTPTunnelSource extends ExhibitDataTunnelSource
116 {
117 /**The full master-Web-server endpoint URL; never null. */
118 private final URL serverURL;
119
120 /**Get the endpoint URL of this tunnel; never null.
121 * We assume that it is safe to return the URL directly,
122 * ie that it behaves as if immutable.
123 */
124 public URL getServerURL()
125 { return(serverURL); }
126
127 /**User ID authentication info, never serialised; null if not set. */
128 private transient volatile String userID;
129
130 /**User ID password, never serialised; null if not set. */
131 private transient volatile String passwd;
132
133 /**Name of HTTP header for (optional) authentication user ID. */
134 public static final String HEADER_AUTH_ID = "X-Tunnel-Auth-ID";
135
136 /**Name of HTTP header for (optional) authentication password. */
137 public static final String HEADER_AUTH_PW = "X-Tunnel-Auth-PW";
138
139 /**Name of HTTP header for (optional) MAC and timestamp. */
140 public static final String HEADER_MAC = "X-Tunnel-MAC";
141
142 /**Set (or clear) additional authentication data in form of ID and password.
143 * When the server requires explicit authentication from the client,
144 * this form may be sufficient to allow full or partial (eg read-only)
145 * access.
146 * <p>
147 * To set authentication arguments must be non-null and non-empty,
148 * else all arguments must be null to clear authentication data.
149 */
150 public void setAuthenticationInfo(final String userID, final char[] passwd)
151 {
152 // Any null/empty argument implies unsetting the authentication data.
153 if((userID == null) || (userID.length() == 0) ||
154 (passwd == null) || (passwd.length == 0))
155 {
156 this.userID = null;
157 this.passwd = null;
158 return;
159 }
160
161 // The authentication data provided is syntactically OK.
162 // Store a private copy of the authentication data.
163 this.userID = userID;
164 this.passwd = new String(passwd);
165 }
166
167 /**Maximum number of concurrent connections to allow back to the master; strictly positive.
168 * We make this value public in case other modules
169 * would like to base their load regulation on this connection limit.
170 * <p>
171 * If >1 then we can potentially prevent any one RPC type
172 * hogging all the available connections.
173 * <p>
174 * Any value between 1 and 8 inclusive is probably reasonable;
175 * in practice there is rarely more than one connection open at once.
176 */
177 public static final int MAX_CONCURRENT_CONNECTIONS = 4;
178
179 /**If true allow excess connection attempts to queue for a while.
180 * Setting this false reduces threads blocking.
181 */
182 private static final boolean ALLOW_CONNECTION_QUEUEING = false;
183
184 /**If true, use "fair" lock access to try to reduce variance in connection service time.
185 * May be slightly slower (and is fatally buggy under JDK 1.5.0_05 or earlier).
186 * <p>
187 * Largely irrelevant if we don's allow connection queueing anyway.
188 */
189 private static final boolean USE_FAIR_LOCKS = ALLOW_CONNECTION_QUEUEING; // System.getProperty("java.version").compareTo("1.5.0_06") >= 0;
190 // static
191 // {
192 //if(IsDebug.isDebug) { System.out.println("[ExhibitDataHTTPTunnelSource.USE_FAIR_LOCKS="+USE_FAIR_LOCKS+"; java.version="+System.getProperty("java.version")+".]"); }
193 // }
194
195 /**Counting semaphore to limit RPC concurrency by total number of callers; never null */
196 private final Semaphore countingSemaphore =
197 new Semaphore(MAX_CONCURRENT_CONNECTIONS, USE_FAIR_LOCKS);
198
199 /**Default HTTP connection timeout (ms); strictly positive.
200 * This is set relatively low so that we don't block downstream/callers
201 * too long if the Net is lossy or slow.
202 * <p>
203 * We assume that the default HTTP connection timeout would be longer than
204 * the value chosen here.
205 * <p>
206 * We take failure to establish a connection quickly as indicative of
207 * generally poor connectivity.
208 * <p>
209 * (We may need to allow extra time for HTTPS or tunnelled connections
210 * to be set up.)
211 * <p>
212 * We may vary this value a little between instances to help prevent collisions
213 * between different clients at one master.
214 * <p>
215 * A value of a few seconds to a few tens of seconds is probably suitable,
216 * especially given the fact that connections that don't happen quickly
217 * rarely happen at all.
218 * <p>
219 * As at 20070430 from the UK approximate RTTs (over DSL which accounts for ~14ms)
220 * to: cn-bj1 RTT 600ms, 330ms au-nsw1, 270ms sg-wv1, 160ms in-bom1, 110ms us-ga1;
221 * thus we must allow for (say) at least 3*RTT for HTTP/TCP connection to establish,
222 * ie at least (say) 2s.
223 * <p>
224 * This should probably be somewhat lower than CoreConsts.MAX_RPC_RTT_MS
225 * since this connection phase is only one component of the total RPC time.
226 * <p>
227 * (Note that in Windows NT the hard-wired initial TCP timeout was 3s,
228 * which seems to have been a little too short for legitimate long-delay paths.)
229 */
230 private final int DEFAULT_HTTP_CONN_TIMEOUT_MS = Math.min(5003, (3*CoreConsts.MAX_TYPICAL_RPC_RTT_MS)/4) +
231 Rnd.fastRnd.nextInt(1003);
232
233 /**Default maximum HTTP read timeout (ms); strictly positive.
234 * This is set high enough not to wrongly time-out long-running operations,
235 * such as fetching the AEP, but short enough not to block clients indefinitely
236 * if something goes seriously wrong,
237 * eg a filesystem on the master hangs or dies.
238 * <p>
239 * This should be (much) larger than the CoreConsts.MAX_RPC_RTT_MS time.
240 * <p>
241 * This must probably be in the region of a few minutes to a few hours.
242 */
243 private static final int DEFAULT_HTTP_MAX_READ_TIMEOUT_MS = 20 * 60 * 1000 +
244 Rnd.fastRnd.nextInt(5 * 60 * 1000);
245
246 /**If true then have a watchdog thread ensure that no connection hangs indefinitely.
247 * This is only needed as a fallback because the read and connect timeouts
248 * on the HTTP URLConnection are not totally reliable as of JDK 1.5.0.
249 */
250 private static final boolean USE_WATCHDOG = true;
251
252 /**Thread-safe Map of connections to times they should have terminated.
253 * This may get checked in poll(),
254 * or where another inbound thread might otherwise be blocked.
255 * <p>
256 * This holds the Thread with the connection,
257 * the time that the connection should have been closed by,
258 * and the RPC type of the connection.
259 */
260 private final ConcurrentMap<HttpURLConnection, Tuple.Triple<Thread,Long,RawPacket.OpCode>> openConnections =
261 new ConcurrentHashMap<HttpURLConnection, Tuple.Triple<Thread,Long,RawPacket.OpCode>>();
262
263 /**Private lock to allowing one thread into killDeadConnections(); never null. */
264 private final Lock _kdcLock = new ReentrantLock();
265
266 /**Closes any connections that have been alive too long.
267 * Thread-safe.
268 * <p>
269 * We only allow one thread into this (per instance) at once,
270 * eg to limit the scale of any damage if one of these hangs
271 * due to problems with the underlying HTTP implementation.
272 */
273 private void killDeadConnections()
274 {
275 if(!_kdcLock.tryLock()) { return; /* One at a time, please! */ }
276 try {
277 // Copy into another map to use at our leisure.
278 // Guaranteed not to throw ConcurrentModificationException
279 // not otherwise misbehave.
280 final Map<HttpURLConnection, Tuple.Triple<Thread,Long,RawPacket.OpCode>> conns = new HashMap<HttpURLConnection, Tuple.Triple<Thread,Long,RawPacket.OpCode>>(openConnections);
281
282 // Kill any connection that should have died by now.
283 final long now = System.currentTimeMillis();
284 for(final HttpURLConnection conn : conns.keySet())
285 {
286 final Tuple.Triple<Thread,Long,RawPacket.OpCode> tuple = conns.get(conn);
287 final Long expires = tuple.second;
288 if(expires.longValue() < now)
289 {
290 logger.log("ERROR: ExhibitDataHTTPTunnelSource: attempting to kill hung connection (type "+tuple.third+") to "+serverURL+": "+conn);
291 try
292 {
293 // Try to wake/interrupt the jammed Thread.
294 tuple.first.interrupt();
295
296 // Attempt to force the connection closed
297 // and try to disconnect it as it may be jammed.
298 conn.disconnect();
299 // Note that attempting to getInputStream()
300 // seems to provoke JDK 1.6 into attempting to re-connect
301 // which then results in a deadlock.
302 // conn.getInputStream().close();
303 // conn.getOutputStream().close();
304 }
305 catch(final Throwable t)
306 {
307 t.printStackTrace();
308 }
309 }
310 }
311 }
312 finally { _kdcLock.unlock(); }
313 }
314
315 /**Returns true iff this operation is considered safe for an unauthenticated tunnel client.
316 * This has to allow just enough interaction (for example) for an upload client
317 * and possibly even a minimal read-only light-weight mirror.
318 *
319 * @param opCode op code to test; never null
320 * @return true if the operation is generally 'safe'
321 */
322 public static boolean safeTunnelOp(final OpCode opCode)
323 {
324 if(null == opCode) { throw new IllegalArgumentException(); }
325
326 switch(opCode)
327 {
328 // A small number of "safe" non-write operations are allowed.
329 // This excludes all (read and write) operations on event data.
330 case NOOP:
331 case GetGenProps:
332 case GetAllExhibitProperties:
333 case GetAllExhibitPropertiesDiff:
334 case GetThumbnails:
335 case GetStratum:
336 case GetRawFile: // Note that a master may still impose throttling on most potential clients.
337 return(true);
338 }
339
340 return(false);
341 }
342
343 /**Make an RPC call over HTTP[S] with the given outgoing packet.
344 * This controls concurrency (number of connections to the master).
345 * <p>
346 * If we cannot get a connection, because:
347 * <ul>
348 * <li>we have reached the self-imposed concurrency limit, or
349 * <li>there is a problem at the HTTP level (or in the upstream server),
350 * </ul>
351 * then we will try to time out and throw an IOException
352 * rather than blocking indefinitely.
353 *
354 * @param packetOut request packet; never null
355 * @return response packet; never null
356 * @throws IOException in case of I/O difficulties
357 */
358 @Override
359 protected RawPacket doRPCRaw(final RawPacket packetOut)
360 throws IOException
361 {
362 // Return the result as a complete packet,
363 // having checked the trailer, etc.
364 // We have a normal (short) read timeout.
365 // if(IsDebug.isDebug) { System.out.println("doRPCRaw("+packetOut+") START for endpoint "+this.serverURL); }
366 // try
367 // {
368 final RawPacket result = (RawPacket) _doRPCRawInternal(packetOut, false, false);
369 // if(IsDebug.isDebug) { System.out.println("doRPCRaw("+packetOut+") RESULT from endpoint "+this.serverURL + " is " + result); }
370 return(result);
371 // }
372 // finally
373 // {
374 // if(IsDebug.isDebug) { System.out.println("doRPCRaw("+packetOut+") FINISH for endpoint "+this.serverURL); }
375 // }
376 }
377
378 /**If true then we enforce MACs when we have a local xfer key.
379 * We may still try to send a request without a MAC (iff we don't have a key),
380 * but we won't accept a response without a MAC unless we have no key.
381 * <p>
382 * All but a safe subset of ops is rejected with an OP__RUNTEX (RuntimeException)
383 * on incoming requests without a verifiable MAC.
384 */
385 public static final boolean ENFORCE_MAC = true;
386
387 /**Record of unique response packet IDs around our acceptance window; never null.
388 * We use the final MAC segment ID, as that covers the entire frame.
389 * <p>
390 * We actually remember IDs for about twice the age implied by the skew
391 * so that if our clock is wrong by, or slips by, the maximum skew,
392 * we won't start admitting very old duplicate messages.
393 * <p>
394 * We should only ad IDs if possible when we are already fairly sure of the source
395 * and when we have, for example, already checked for acceptable skew,
396 * to make any sort of DoS attack against us harder.
397 * <p>
398 * We should reject otherwise-acceptable messages that have an ID
399 * already present in this Map.
400 * <p>
401 * FIXME: compute a sensible upper bound on size to prevent replays and all our memory being eaten.
402 * <p>
403 * Thread-safe.
404 */
405 private final DuplicateIDChecker<ROByteArray> messageIDs =
406 DuplicateIDChecker.<ROByteArray>create(2*CoreConsts.MAX_PEER_CLOCK_SKEW_MS, "ExhibitDataHTTPTunnelSource.messageIDs");
407
408 /**If true, then limit outgoing connections by RPC type when busy.
409 * This may help prevent a misbehaviour of one system component
410 * shutting out communications for the rest of the system.
411 */
412 private static final boolean LIMIT_BUSY_CONS_BY_RPC_TYPE = true;
413
414 /**Make an RPC call over HTTP[S] with the given outgoing packet.
415 * This controls concurrency (number of connections to the master).
416 * <p>
417 * If we cannot get a connection, because:
418 * <ul>
419 * <li>we have reached the self-imposed concurrency limit, or
420 * <li>there is a problem at the HTTP level (or in the upstream server),
421 * </ul>
422 * then we will eventually time out and throw an IOException
423 * rather than blocking indefinitely.
424 * <p>
425 * If we have a data-protecting "XferKey" then we always send a MAC.
426 * This enables the receiver to detect more errors in transmission.
427 *
428 * @param packetOut request packet; never null
429 * @param asStream if true, return as streamed result rather than packet
430 * @param allowBigReadTimeout if true then the RPC may take a lot of work at the master
431 * so we should be prepared to wait a long time before timing out
432 *
433 * @return response packet; never null
434 * @throws IOException in case of I/O difficulties
435 */
436 private Object _doRPCRawInternal(final RawPacket packetOut,
437 final boolean asStream,
438 final boolean allowBigReadTimeout)
439 throws IOException
440 {
441 assert(packetOut != null);
442
443 // Kill any dead connections that we find
444 // if we seem to be short of resources.
445 final int availablePermits = countingSemaphore.availablePermits();
446 if(USE_WATCHDOG && (availablePermits <= 0))
447 { killDeadConnections(); }
448
449 // Consider limiting connection by RPC operation type
450 // for fairness and robustness.
451 if(LIMIT_BUSY_CONS_BY_RPC_TYPE && (MAX_CONCURRENT_CONNECTIONS > 1) &&
452 (availablePermits <= MAX_CONCURRENT_CONNECTIONS/2))
453 {
454 // Only about half the maximum-permitted connections are free,
455 // so prevent further outgoing connections for RPC types
456 // of connections already in place.
457 //
458 // There are some slight race hazards here,
459 // ie killing some innocent victim RPCs
460 // when we're not as busy as we thought,
461 // but random early connection killing
462 // isn't necessarily too bad anyway when getting busy.
463 final RawPacket.OpCode type = packetOut.opCode;
464 // Guaranteed to be (thread-)safe to iterate over openConnections.
465 for(final Tuple.Triple<Thread,Long,RawPacket.OpCode> connDetails : openConnections.values())
466 {
467 if(connDetails.third == type)
468 {
469 // Log this selective veto for now (maybe mute later).
470 logger.log("WARNING: ExhibitDataHTTPTunnelSource: _doRPCRawInternal(): HTTP tunnel getting busy to "+serverURL+" ("+MAX_CONCURRENT_CONNECTIONS+" concurrent connections max) for opcode#"+packetOut.opCode);
471 // Veto extra connection of type already in progress...
472 throw new IOException("HTTP tunnel getting busy to "+serverURL+" ("+MAX_CONCURRENT_CONNECTIONS+" concurrent connections max) for opcode#"+packetOut.opCode);
473 }
474 }
475 }
476
477 try
478 {
479 if(ALLOW_CONNECTION_QUEUEING)
480 {
481 // Only wait to obtain a permit for something like
482 // the time that we allow for an HTTP connection.
483 // This avoids unexpectedly blocking users for a long time,
484 // though if there really is contention then it should probably be
485 // handled properly at a higher level.
486 if(!countingSemaphore.tryAcquire(DEFAULT_HTTP_CONN_TIMEOUT_MS/2,
487 TimeUnit.MILLISECONDS))
488 {
489 logger.log("_doRPCRawInternal(): timeout while waiting to acquire HTTP tunnel permit to "+serverURL);
490 throw new IOException("timeout while waiting to acquire HTTP tunnel permit to "+serverURL+" ("+MAX_CONCURRENT_CONNECTIONS+" concurrent connections max) for opcode#"+packetOut.opCode+"; queue length = " + countingSemaphore.getQueueLength());
491 }
492 }
493 else
494 {
495 // Baulk/quit if a connection is not immediately available.
496 if(!countingSemaphore.tryAcquire(0, TimeUnit.SECONDS))
497 {
498 logger.log("_doRPCRawInternal(): HTTP tunnel busy to "+serverURL);
499 throw new IOException("HTTP tunnel busy to "+serverURL+" ("+MAX_CONCURRENT_CONNECTIONS+" concurrent connections max) for opcode#"+packetOut.opCode+"; queue length = " + countingSemaphore.getQueueLength());
500 }
501 }
502 }
503 catch(final InterruptedException e)
504 { throw new IOException("thread interrupted waiting to start HTTP tunnel connection"); }
505 try
506 {
507 if(_protocolDebug) { System.err.println("[_doRPCRawInternal(): connecting upstream: " + serverURL + ".]"); }
508
509 // Construct a HTTP connection to the master server.
510 // Hopefully this will be transparently pooled by the JVM.
511 final HttpURLConnection connection =
512 (HttpURLConnection) serverURL.openConnection();
513
514 // Set an explicit fairly short connection timeout
515 // so as to be intolerant of poor Net connectivity,
516 // ie don't attempt to use a connection with very poor behaviour.
517 // We don't set a very long read timeout as some calls
518 // may take a lot of remote work before they can start to reply,
519 // but we'll still manage to escape a call that blocks completely.
520 if(!CoreConsts.AVOID_UNSAFE_TCP_TIMEOUTS)
521 {
522 connection.setConnectTimeout(DEFAULT_HTTP_CONN_TIMEOUT_MS);
523 // Unless we are doing a very slow and/or CPU-expensive RPC,
524 // use something more like the connection timeout if appropriate.
525 connection.setReadTimeout(allowBigReadTimeout ? DEFAULT_HTTP_MAX_READ_TIMEOUT_MS :
526 Math.min(DEFAULT_HTTP_MAX_READ_TIMEOUT_MS, 2*DEFAULT_HTTP_CONN_TIMEOUT_MS));
527 }
528
529 // Don't get any data from cache: must be a live connection!
530 connection.setUseCaches(false);
531 // Exact given URL must be valid...
532 connection.setInstanceFollowRedirects(false);
533 // Allow bidirectional communication.
534 connection.setDoOutput(true);
535 connection.setDoInput(true);
536 // Don't allow user interaction.
537 connection.setAllowUserInteraction(false);
538
539 // We know what the content length will be
540 // so set it here to allow some HTTP performance optimisations.
541 final int contentLength = packetOut.getFrameLength();
542 connection.setRequestProperty("Content-Length", String.valueOf(contentLength));
543 connection.setFixedLengthStreamingMode(contentLength);
544
545 // If authentication/check data is available,
546 // then apply it to the request.
547 final String uID = userID;
548 final String pwd = passwd;
549 if((uID != null) && (pwd != null))
550 {
551 connection.setRequestProperty(HEADER_AUTH_ID, uID);
552 connection.setRequestProperty(HEADER_AUTH_PW, pwd);
553 }
554 // Apply a MAC to our outbound request for extra error-detection if we can.
555 SecretKey xk = null;
556 try
557 {
558 xk = LocalProps.getXferHMACKey();
559 if(xk != null) // Need a key to create the MAC.
560 {
561 try
562 {
563 final PacketProtector pp = new PacketProtector(packetOut, xk);
564 final String checkString = pp.toCheckString();
565 if(IsDebug.isDebug) { logger.log("INFO: applying request MAC: " + checkString); }
566 connection.setRequestProperty(HEADER_MAC, checkString);
567 }
568 catch(final InvalidKeyException e)
569 {
570 // In case of error, don't apply the MAC header, but do complain loudly!
571 e.printStackTrace();
572 }
573 }
574 }
575 catch(final Exception e1)
576 {
577 // Absorb/log any other exception that we encounter,
578 // but attempt to continue.
579 e1.printStackTrace();
580 }
581
582 // Do simple HTTP security/sanity check...
583 assert(!connection.usingProxy());
584
585 if(_protocolDebug) { System.err.println(" [_doRPCRawInternal(): about to writePacket().]"); }
586
587 if(USE_WATCHDOG)
588 {
589 final long stopBy = System.currentTimeMillis() +
590 DEFAULT_HTTP_CONN_TIMEOUT_MS +
591 DEFAULT_HTTP_MAX_READ_TIMEOUT_MS;
592 openConnections.put(connection, new Tuple.Triple<Thread,Long,RawPacket.OpCode>(Thread.currentThread(), new Long(stopBy), packetOut.opCode));
593 }
594
595 try
596 {
597 // Send the output (request) packet, compressed if appropriate...
598 final OutputStream os = connection.getOutputStream();
599 try { packetOut.writePacket(os); }
600 // Release resources.
601 finally { os.close(); }
602
603 if(_protocolDebug) { System.err.println(" [_doRPCRawInternal(): about to open input/response stream.]"); }
604
605 final int responseCode = connection.getResponseCode();
606 if(responseCode != 200)
607 { throw new IOException("ERROR: RPC failed: HTTP POST response code "+responseCode); }
608
609 // Look for a MAC header on the response.
610 // Veto obviously-broken communications, even if we don't have a key.
611 PacketProtector pp = null;
612 final String hdrMAC = connection.getHeaderField(HEADER_MAC);
613 if(null != hdrMAC)
614 {
615 try { pp = PacketProtector.fromCheckString(hdrMAC); }
616 catch(final IllegalArgumentException e) { logger.log("garbled MAC on response"); }
617 if(IsDebug.isDebug) { logger.log("INFO: received response MAC: " + pp); }
618
619 // Veto responses with a definitely-wrong length.
620 // (For now just warn about a missing Content-Length.)
621 final int contentLengthRcvd = connection.getContentLength();
622 if(contentLengthRcvd == -1)
623 { logger.log("WARNING: missing Content-Length on response in tunnel"); }
624 else if(pp.length != contentLengthRcvd)
625 { throw new IOException("ERROR: wrong-length (or missing Content-Length) response received"); }
626 // Check that the remote peer's clock is not vastly wrong
627 // (or that a very old packet is reappearing out of nowhere).
628 // We assume that the skew allowance is generous enough to cover
629 // actual transit time of messages in the network.
630 final long now = System.currentTimeMillis();
631 if((pp.timestamp < now - CoreConsts.MAX_PEER_CLOCK_SKEW_MS) ||
632 (pp.timestamp > now + CoreConsts.MAX_PEER_CLOCK_SKEW_MS))
633 { throw new IOException("ERROR: discarded response with bad timestamp: check local and peer clocks"); }
634 // Veto duplicate packets/requests in this tunnel.
635 if(messageIDs.add(pp.mac.get(pp.mac.size()-1)) != null)
636 { throw new IOException("ERROR: duplicate response received"); }
637 }
638
639 if(ENFORCE_MAC && ((xk != null) && (pp == null)))
640 {
641 logger.log("ERROR: did not receive MAC on RPC "+packetOut.opCode+" response from "+serverURL+" so discarding because unsafe: MAC hdr="+hdrMAC);
642 throw new IOException("RPC "+packetOut.opCode+" response from "+serverURL+" not protected by MAC, thus not safe to use");
643 }
644
645 // Get the response input stream.
646 // If we have the XferKey and a MAC then always verify the input stream.
647 // This protects all of the input data, including the RawPacket headers.
648 // Using the stream mechanism helps spread the verification load,
649 // as well as transparently permitting streamed handling of responses.
650 final InputStream isRaw = connection.getInputStream();
651 final InputStream is = ((pp == null) || (xk == null)) ? isRaw :
652 pp.protectInputStream(xk, isRaw);
653
654 // Return the input stream for a streamed response.
655 if(asStream)
656 { return(is); }
657
658 try
659 {
660 if(_protocolDebug) { System.err.println(" [_doRPCRawInternal(): about to readPacket().]"); }
661
662 final RawPacket result = RawPacket.readPacket(is);
663
664 if(_protocolDebug) { System.err.println(" [_doRPCRawInternal(): about to return result.]"); }
665
666 // Return result successfully.
667 return(result);
668 }
669 finally
670 {
671 // Explicitly close the input stream now that we've read the packet.
672 is.close();
673
674 // Try to discard the connection if that is our policy.
675 if(!KEEP_SERVER_CONNECTION_ALIVE) { connection.disconnect(); }
676 }
677 }
678 // Attempt to discard a possibly-broken (pool) connection...
679 catch(final IOException e) { connection.disconnect(); throw e; }
680 // Remove connection from pool being monitored...
681 finally
682 {
683 // Record that the connection has been closed
684 // (or is not being handled by the caller now).
685 if(USE_WATCHDOG) { openConnections.remove(connection); }
686 }
687 }
688 finally { countingSemaphore.release(); }
689 }
690
691 // Inherit javadoc comment...
692 @Override
693 protected Tuple.Pair<Integer, InputStream> doRPCRawWithStreamResponse(final RawPacket packetOut,
694 final boolean allowBigReadTimeout)
695 throws IOException
696 {
697 // Get the raw input/response stream...
698 final InputStream is = (InputStream) _doRPCRawInternal(packetOut, true, allowBigReadTimeout);
699
700 // Start reading/decoding the raw packet header.
701 // Read the opcode (byte) header field.
702 final int ch = is.read();
703 if(ch < 0) { throw new EOFException(); }
704 final byte opC = (byte) ch;
705
706 // Check for mismatched response packets
707 // or remote exceptions codes with special response op-codes.
708 if(opC != packetOut.opCode.getCode())
709 {
710 // Deal specially with some fixed remote exception types.
711 switch(opC)
712 {
713 case RawPacket.OP__PGMNISEX:
714 { throw new PGMasterNotInServiceException(); }
715 case RawPacket.OP__RUNTEX:
716 { throw new RuntimeException("remote threw RuntimeException"); }
717 case RawPacket.OP__REMEX:
718 { throw new IOException("remote threw RemoteException"); }
719 case RawPacket.OP__INTEX:
720 { throw new InterruptedIOException("remote operation interrupted"); }
721 }
722 throw new IOException("ExhibitDataHTTPTunnelSource.doRPCRawWithStreamResponse(): packet response-type mismatch; got "+opC+" expected "+packetOut.opCode);
723 }
724
725 // Read the (int) length header field.
726 final int ch1 = is.read();
727 final int ch2 = is.read();
728 final int ch3 = is.read();
729 final int ch4 = is.read();
730 if((ch1 | ch2 | ch3 | ch4) < 0) { throw new EOFException(); }
731 final int rawLen = ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
732
733 // In the simplest (and fairly common) case of an empty response,
734 // for efficiency and robustness we close the original response stream immediately
735 // and return an empty immutable response to the caller.
736 if(rawLen == 0)
737 {
738 // Double-check that we find our trailer (and nothing else)
739 // after the empty payload.
740 int ic;
741 if(((byte)(ic = is.read())) != RawPacket.TRAILER) { throw new IOException("missing frame trailer byte, got "+ic+": op "+opC); }
742 if((ic = is.read()) != -1) { throw new IOException("garbage ("+ic+") present after frame trailer rather than EOF"); }
743
744 is.close(); // Immediate HTTP stream close to release resources ASAP.
745 return(EMPTY_RESPONSE_STREAM);
746 }
747
748 // Check that the reported packet length is within bounds.
749 if((rawLen > RawPacket.MAX_PAYLOAD_SIZE) ||
750 (rawLen < -RawPacket.MAX_PAYLOAD_SIZE))
751 {
752 is.close();
753 throw new IOException("Bad (huge) packet length");
754 }
755
756 final boolean isCompressed = (rawLen < 0);
757
758 // In the simple (uncompressed-data) case we return the stream as-is.
759 // This leaves the trailer byte on the stream visible to the caller,
760 // but the true length of the data payload is returned too.
761 if(!isCompressed)
762 { return(new Tuple.Pair<Integer, InputStream>(Integer.valueOf(rawLen), is)); }
763
764 // We wrap the underlying input in our custom deflater stream.
765 // We do not know the final result length so return a null length value.
766 // This leaves the trailer byte on the stream visible to the deflater,
767 // but the deflater will not use its value though may read it.
768 // We wrap the stream in a non-aggressive data pump
769 // to improve throughput for large results.
770 final InputStream cis = GenUtils.dataPump(new DefInputStream(is, true), 1<<16, false);
771 return(new Tuple.Pair<Integer, InputStream>(null, cis));
772 }
773
774 /**Empty, immutable/uncloseable InputStream that we can return to all callers. */
775 private static final InputStream EMPTY_INPUTSTREAM = new ByteArrayInputStream(new byte[0]);
776
777 /**Empty, immutable/uncloseable response stream that we can return to all callers. */
778 private static final Pair<Integer, InputStream> EMPTY_RESPONSE_STREAM = new Tuple.Pair<Integer, InputStream>(Integer.valueOf(0), EMPTY_INPUTSTREAM);
779
/**Address to bind to for outbound connections, or null if any available address is OK.
 * At the moment we only accept this in a backdoor setting via a system property
 * (and don't complain if property access or name resolution is not allowed).
 * <p>
 * A missing, empty or all-whitespace property value means "no binding" (null).
 * A value that cannot be resolved is reported on System.err and also means "no binding".
 * <p>
 * If using this, eg with <code>-Dorg.hd.pg2k.tunnelLocalAddr=85.158.46.82</code>
 * (and if HTTPClient V0.3-3 is still in use for these 'local' connections)
 * then it may be wise to work round a bug with <code>-DHTTPClient.disableKeepAlives=true</code>.
 */
private static final InetAddress localAddr;
/**Initialise localAddr from the <code>org.hd.pg2k.tunnelLocalAddr</code> system property. */
static
{
    String rawLocalAddr = null;
    try { rawLocalAddr = System.getProperty("org.hd.pg2k.tunnelLocalAddr"); }
    catch(final SecurityException e) { /* Silently ignore: property access not permitted. */ }

    // Treat an empty/blank value as unset: InetAddress.getByName("") would
    // otherwise return the loopback address, and all outbound tunnel
    // connections would then be bound to loopback.
    final String trimmedLocalAddr = (rawLocalAddr == null) ? null : rawLocalAddr.trim();
    if((trimmedLocalAddr == null) || (trimmedLocalAddr.length() == 0)) { localAddr = null; }
    else
    {
        InetAddress lA = null;
        try { lA = InetAddress.getByName(trimmedLocalAddr); }
        catch(final UnknownHostException e)
        {
            // Misconfiguration: make it visible rather than silently binding to 'any'.
            System.err.println("ExhibitDataHTTPTunnelSource: WARNING: cannot resolve org.hd.pg2k.tunnelLocalAddr \"" +
                trimmedLocalAddr + "\"; using any local address: " + e);
        }
        catch(final SecurityException e)
        {
            // Resolution not permitted: fall back to 'any' rather than
            // failing class initialisation with ExceptionInInitializerError.
        }
        localAddr = lA;
    }
}
804
805 /**Short upstream name for peer; never null but can be "". */
806 private final String shortUpstreamName;
807
808 /**Return short unique name of upstream peer/server suitable for Stratum; never null but can be "" for 'unknown'.
809 * Override this in implementations that know the upstream name.
810 */
811 protected String _getStratumUpstreamName()
812 { return(shortUpstreamName); }
813
/**Construct a new HTTP tunnel data source to the given URL.
 * Does not necessarily try to build the tunnel immediately.
 * <p>
 * If a local bind address has been configured (localAddr is non-null),
 * connections for this URL are opened through HTTPClient bound to that address.
 * Otherwise the JVM's default handler for the URL's protocol is used.
 *
 * @param upstreamURL URL to reach the server's tunnel,
 *     of the form http://<i>serverName</i>/tunnelMountPoint; never null
 * @param shortUpstreamName short name of the upstream peer/server,
 *     as later returned by _getStratumUpstreamName(); never null but may be ""
 * @param logger passed to the superclass constructor
 *
 * @throws IllegalArgumentException if upstreamURL or shortUpstreamName is null,
 *     or if the URL is invalid or unusable (the MalformedURLException is chained as the cause).
 */
public ExhibitDataHTTPTunnelSource(final String upstreamURL,
                                   final String shortUpstreamName,
                                   final SimpleLoggerIF logger)
{
    super(logger);

    if(upstreamURL == null) { throw new IllegalArgumentException("null upstreamURL not allowed"); }
    if(shortUpstreamName == null) { throw new IllegalArgumentException("null shortUpstreamName not allowed"); }

    this.shortUpstreamName = shortUpstreamName;

    // If we want to bind outgoing connections to a particular local address
    // then we have to use a custom URLStreamHandler.
    // A null handler makes the URL constructor use the default handler.
    final URLStreamHandler handler = (localAddr == null) ? null :
        (new URLStreamHandler(){
            @Override
            public URLConnection openConnection(final URL url)
                throws IOException, ProtocolNotSuppException
            {
                return(new HTTPClient.HttpURLConnection(url) {
                    /**Overrides mechanism to get new connection.
                     * Lookup key and connection use the specified local address;
                     * the local address is part of the cache key.
                     */
                    @Override
                    protected HTTPConnection getConnection(final URL url)
                        throws ProtocolNotSuppException
                    {
                        // Try the cache, keyed on protocol, host name, port and local address.
                        // Lower-case the host locale-independently so the key does not
                        // depend on the JVM's default locale (eg Turkish dotless i).
                        final String php = url.getProtocol() + ":" + url.getHost().toLowerCase(java.util.Locale.ROOT) + ":" +
                            ((url.getPort() != -1) ? url.getPort() :
                                URI.defaultPort(url.getProtocol())) +
                            " from " + localAddr.getHostAddress();

                        HTTPConnection con = (HTTPConnection) connections.get(php);
                        if (con != null) {
                            return con;
                        }

                        // Not in cache, so create new one and cache it.
                        con = new HTTPClient.HTTPConnection(url.getProtocol(),
                                                            url.getHost(), url.getPort(),
                                                            localAddr, 0); /* Zero local port implies 'any'. */
                        connections.put(php, con);

                        return con;
                    }
                });
            }
        });

    try { serverURL = new URL(null, upstreamURL, handler); }
    catch(final MalformedURLException e)
    {
        // Preserve the original exception as the cause for diagnosis.
        throw new IllegalArgumentException("invalid upstreamURL \""+upstreamURL+"\": " + e.getMessage(), e);
    }

    if(_protocolDebug)
    {
        System.out.println("ExhibitDataHTTPTunnelSource("+upstreamURL+") instance created");
    }
}
881
882 /**Poll periodically.
883 * We call super.poll(gp) to ensure that base-class polling work gets done,
884 * including any upstream poll() calls.
885 */
886 @Override
887 public void poll(final GenProps gp)
888 {
889 try
890 {
891 // Kill any dead connections that we find.
892 if(USE_WATCHDOG) { killDeadConnections(); }
893
894 final int permits = countingSemaphore.availablePermits();
895 if(permits <= 0)
896 {
897 logger.log("ExhibitDataHTTPTunnelSource: WARNING: full HTTP tunnel to "+serverURL+": queue="+(countingSemaphore.getQueueLength())+".");
898 }
899 else if(permits <= MAX_CONCURRENT_CONNECTIONS/2)
900 {
901 logger.log("ExhibitDataHTTPTunnelSource: WARNING: busy HTTP tunnel to "+serverURL+".");
902 }
903 }
904 finally { super.poll(gp); }
905 }
906 }