001    /*
002    Copyright (c) 1996-2011, Damon Hart-Davis
003    All rights reserved.
004    
005    Redistribution and use in source and binary forms, with or without
006    modification, are permitted provided that the following conditions are
007    met:
008    
009      * Redistributions of source code must retain the above copyright
010        notice, this list of conditions and the following disclaimer.
011    
012      * Redistributions in binary form must reproduce the above copyright
013        notice, this list of conditions and the following disclaimer in the
014        documentation and/or other materials provided with the
015        distribution.
016    
017    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
018    IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
019    TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
020    PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
021    OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
022    SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
023    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
024    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
025    THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
026    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
027    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
028    */
029    package org.hd.d.pg2k.svrCore.datasource;
030    
031    import java.io.ByteArrayInputStream;
032    import java.io.EOFException;
033    import java.io.IOException;
034    import java.io.InputStream;
035    import java.io.InterruptedIOException;
036    import java.io.OutputStream;
037    import java.net.HttpURLConnection;
038    import java.net.InetAddress;
039    import java.net.MalformedURLException;
040    import java.net.URL;
041    import java.net.URLConnection;
042    import java.net.URLStreamHandler;
043    import java.net.UnknownHostException;
044    import java.security.InvalidKeyException;
045    import java.util.HashMap;
046    import java.util.Map;
047    import java.util.concurrent.ConcurrentHashMap;
048    import java.util.concurrent.ConcurrentMap;
049    import java.util.concurrent.Semaphore;
050    import java.util.concurrent.TimeUnit;
051    import java.util.concurrent.locks.Lock;
052    import java.util.concurrent.locks.ReentrantLock;
053    
054    import javax.crypto.SecretKey;
055    
056    import org.hd.d.pg2k.svrCore.CoreConsts;
057    import org.hd.d.pg2k.svrCore.DefInputStream;
058    import org.hd.d.pg2k.svrCore.DuplicateIDChecker;
059    import org.hd.d.pg2k.svrCore.GenUtils;
060    import org.hd.d.pg2k.svrCore.PGMasterNotInServiceException;
061    import org.hd.d.pg2k.svrCore.ROByteArray;
062    import org.hd.d.pg2k.svrCore.Rnd;
063    import org.hd.d.pg2k.svrCore.SimpleLoggerIF;
064    import org.hd.d.pg2k.svrCore.Tuple;
065    import org.hd.d.pg2k.svrCore.Tuple.Pair;
066    import org.hd.d.pg2k.svrCore.datasource.ExhibitDataTunnelSource.RawPacket.OpCode;
067    import org.hd.d.pg2k.svrCore.props.GenProps;
068    import org.hd.d.pg2k.svrCore.props.LocalProps;
069    
070    import HTTPClient.HTTPConnection;
071    import HTTPClient.ProtocolNotSuppException;
072    import HTTPClient.URI;
073    import ORG.hd.d.IsDebug;
074    
075    
076    /**Exhibit pipeline stage that fetches its data across an HTTP[S] tunnel.
077     * Derived from the abstract tunnel source base class.
078     * <p>
079     * This class does two important things:
080     * <ul>
081     * <li>Sets up an HTTP connection with the correct characteristics
082     *     (such as with sensible timeouts on open and read).
083     * <li>Controls the concurrency of the connection
084     *     to avoid overwhelming the upstream server.
085     * </ul>
086     * <p>
087     * A limited amount of concurrency (ie more than one connection)
088     * may help overcome latency and improve throughput,
089     * especially over a lossy network or during long operations
090     * such as fetching the AEP or exhibit data blocks.
091     * <p>
092     * Concurrency should not be so large as to cause the master
093     * to start rejecting connections,
094     * eg no more than (say) the 8 connections that a normal
095     * Web browser might open to overcome latency.
096     * <p>
097     * Note also that the master may itself have limited concurrency
098     * available internally to service many types of operation.
099     * <p>
100     * Our concurrency limit may be by any type of RPC call,
101     * or at most one of any particular operation type
102     * (on the grounds that the master may only be able to service
103     * limited numbers of calls by any particular operation type).
104     * (At one point we limited concurrency using a simple Java monitor
105     * that was selected by a hash on the RPC type.
106     * Now we have one overall limit on the number of connections upstream
107     * and abort rather than waiting indefinitely to get a connection.)
108     * <p>
109     * We may vary this value between instances to help prevent collisions
110     * between different slaves, etc.
111     * <p>
112     * We may limit concurrency to 1 after any IOException
113     * until one or more successful RPCs have completed.
114     */
115    public final class ExhibitDataHTTPTunnelSource extends ExhibitDataTunnelSource
116        {
117        /**The full master-Web-server endpoint URL; never null. */
118        private final URL serverURL;
119    
120        /**Get the endpoint URL of this tunnel; never null.
121         * We assume that it is safe to return the URL directly,
122         * ie that it behaves as if immutable.
123         */
124        public URL getServerURL()
125            { return(serverURL); }
126    
127        /**User ID authentication info, never serialised; null if not set. */
128        private transient volatile String userID;
129    
130        /**User ID password, never serialised; null if not set. */
131        private transient volatile String passwd;
132    
133        /**Name of HTTP header for (optional) authentication user ID. */
134        public static final String HEADER_AUTH_ID = "X-Tunnel-Auth-ID";
135    
136        /**Name of HTTP header for (optional) authentication password. */
137        public static final String HEADER_AUTH_PW = "X-Tunnel-Auth-PW";
138    
139        /**Name of HTTP header for (optional) MAC and timestamp. */
140        public static final String HEADER_MAC = "X-Tunnel-MAC";
141    
142        /**Set (or clear) additional authentication data in form of ID and password.
143         * When the server requires explicit authentication from the client,
144         * this form may be sufficient to allow full or partial (eg read-only)
145         * access.
146         * <p>
147         * To set authentication arguments must be non-null and non-empty,
148         * else all arguments must be null to clear authentication data.
149         */
150        public void setAuthenticationInfo(final String userID, final char[] passwd)
151            {
152            // Any null/empty argument implies unsetting the authentication data.
153            if((userID == null) || (userID.length() == 0) ||
154               (passwd == null) || (passwd.length == 0))
155                {
156                this.userID = null;
157                this.passwd = null;
158                return;
159                }
160    
161            // The authentication data provided is syntactically OK.
162            // Store a private copy of the authentication data.
163            this.userID = userID;
164            this.passwd = new String(passwd);
165            }
166    
167        /**Maximum number of concurrent connections to allow back to the master; strictly positive.
168         * We make this value public in case other modules
169         * would like to base their load regulation on this connection limit.
170         * <p>
171         * If &gt;1 then we can potentially prevent any one RPC type
172         * hogging all the available connections.
173         * <p>
174         * Any value between 1 and 8 inclusive is probably reasonable;
175         * in practice there is rarely more than one connection open at once.
176         */
177        public static final int MAX_CONCURRENT_CONNECTIONS = 4;
178    
179        /**If true allow excess connection attempts to queue for a while.
180         * Setting this false reduces threads blocking.
181         */
182        private static final boolean ALLOW_CONNECTION_QUEUEING = false;
183    
184        /**If true, use "fair" lock access to try to reduce variance in connection service time.
185         * May be slightly slower (and is fatally buggy under JDK 1.5.0_05 or earlier).
186         * <p>
187         * Largely irrelevant if we don's allow connection queueing anyway.
188         */
189        private static final boolean USE_FAIR_LOCKS = ALLOW_CONNECTION_QUEUEING; // System.getProperty("java.version").compareTo("1.5.0_06") >= 0;
190    //    static
191    //        {
192    //if(IsDebug.isDebug) { System.out.println("[ExhibitDataHTTPTunnelSource.USE_FAIR_LOCKS="+USE_FAIR_LOCKS+"; java.version="+System.getProperty("java.version")+".]"); }
193    //        }
194    
195        /**Counting semaphore to limit RPC concurrency by total number of callers; never null */
196        private final Semaphore countingSemaphore =
197            new Semaphore(MAX_CONCURRENT_CONNECTIONS, USE_FAIR_LOCKS);
198    
199        /**Default HTTP connection timeout (ms); strictly positive.
200         * This is set relatively low so that we don't block downstream/callers
201         * too long if the Net is lossy or slow.
202         * <p>
203         * We assume that the default HTTP connection timeout would be longer than
204         * the value chosen here.
205         * <p>
206         * We take failure to establish a connection quickly as indicative of
207         * generally poor connectivity.
208         * <p>
209         * (We may need to allow extra time for HTTPS or tunnelled connections
210         * to be set up.)
211         * <p>
212         * We may vary this value a little between instances to help prevent collisions
213         * between different clients at one master.
214         * <p>
215         * A value of a few seconds to a few tens of seconds is probably suitable,
216         * especially given the fact that connections that don't happen quickly
217         * rarely happen at all.
218         * <p>
219         * As at 20070430 from the UK approximate RTTs (over DSL which accounts for ~14ms)
220         * to: cn-bj1 RTT 600ms, 330ms au-nsw1, 270ms sg-wv1, 160ms in-bom1, 110ms us-ga1;
221         * thus we must allow for (say) at least 3*RTT for HTTP/TCP connection to establish,
222         * ie at least (say) 2s.
223         * <p>
224         * This should probably be somewhat lower than CoreConsts.MAX_RPC_RTT_MS
225         * since this connection phase is only one component of the total RPC time.
226         * <p>
227         * (Note that in Windows NT the hard-wired initial TCP timeout was 3s,
228         * which seems to have been a little too short for legitimate long-delay paths.)
229         */
230        private final int DEFAULT_HTTP_CONN_TIMEOUT_MS = Math.min(5003, (3*CoreConsts.MAX_TYPICAL_RPC_RTT_MS)/4) +
231                Rnd.fastRnd.nextInt(1003);
232    
233        /**Default maximum HTTP read timeout (ms); strictly positive.
234         * This is set high enough not to wrongly time-out long-running operations,
235         * such as fetching the AEP, but short enough not to block clients indefinitely
236         * if something goes seriously wrong,
237         * eg a filesystem on the master hangs or dies.
238         * <p>
239         * This should be (much) larger than the CoreConsts.MAX_RPC_RTT_MS time.
240         * <p>
241         * This must probably be in the region of a few minutes to a few hours.
242         */
243        private static final int DEFAULT_HTTP_MAX_READ_TIMEOUT_MS = 20 * 60 * 1000 +
244                Rnd.fastRnd.nextInt(5 * 60 * 1000);
245    
246        /**If true then have a watchdog thread ensure that no connection hangs indefinitely.
247         * This is only needed as a fallback because the read and connect timeouts
248         * on the HTTP URLConnection are not totally reliable as of JDK 1.5.0.
249         */
250        private static final boolean USE_WATCHDOG = true;
251    
252        /**Thread-safe Map of connections to times they should have terminated.
253         * This may get checked in poll(),
254         * or where another inbound thread might otherwise be blocked.
255         * <p>
256         * This holds the Thread with the connection,
257         * the time that the connection should have been closed by,
258         * and the RPC type of the connection.
259         */
260        private final ConcurrentMap<HttpURLConnection, Tuple.Triple<Thread,Long,RawPacket.OpCode>> openConnections =
261            new ConcurrentHashMap<HttpURLConnection, Tuple.Triple<Thread,Long,RawPacket.OpCode>>();
262    
263        /**Private lock to allowing one thread into killDeadConnections(); never null. */
264        private final Lock _kdcLock = new ReentrantLock();
265    
266        /**Closes any connections that have been alive too long.
267         * Thread-safe.
268         * <p>
269         * We only allow one thread into this (per instance) at once,
270         * eg to limit the scale of any damage if one of these hangs
271         * due to problems with the underlying HTTP implementation.
272         */
273        private void killDeadConnections()
274            {
275            if(!_kdcLock.tryLock()) { return; /* One at a time, please! */ }
276            try {
277                // Copy into another map to use at our leisure.
278                // Guaranteed not to throw ConcurrentModificationException
279                // not otherwise misbehave.
280                final Map<HttpURLConnection, Tuple.Triple<Thread,Long,RawPacket.OpCode>> conns = new HashMap<HttpURLConnection, Tuple.Triple<Thread,Long,RawPacket.OpCode>>(openConnections);
281    
282                // Kill any connection that should have died by now.
283                final long now = System.currentTimeMillis();
284                for(final HttpURLConnection conn : conns.keySet())
285                    {
286                    final Tuple.Triple<Thread,Long,RawPacket.OpCode> tuple = conns.get(conn);
287                    final Long expires = tuple.second;
288                    if(expires.longValue() < now)
289                        {
290                        logger.log("ERROR: ExhibitDataHTTPTunnelSource: attempting to kill hung connection (type "+tuple.third+") to "+serverURL+": "+conn);
291                        try
292                            {
293                            // Try to wake/interrupt the jammed Thread.
294                            tuple.first.interrupt();
295    
296                            // Attempt to force the connection closed
297                            // and try to disconnect it as it may be jammed.
298                            conn.disconnect();
299                            // Note that attempting to getInputStream()
300                            // seems to provoke JDK 1.6 into attempting to re-connect
301                            // which then results in a deadlock.
302        //                    conn.getInputStream().close();
303        //                    conn.getOutputStream().close();
304                            }
305                        catch(final Throwable t)
306                            {
307                            t.printStackTrace();
308                            }
309                        }
310                    }
311                }
312            finally { _kdcLock.unlock(); }
313            }
314    
315        /**Returns true iff this operation is considered safe for an unauthenticated tunnel client.
316         * This has to allow just enough interaction (for example) for an upload client
317         * and possibly even a minimal read-only light-weight mirror.
318         *
319         * @param opCode  op code to test; never null
320         * @return  true if the operation is generally 'safe'
321         */
322        public static boolean safeTunnelOp(final OpCode opCode)
323            {
324            if(null == opCode) { throw new IllegalArgumentException(); }
325    
326            switch(opCode)
327                {
328                // A small number of "safe" non-write operations are allowed.
329                // This excludes all (read and write) operations on event data.
330                case NOOP:
331                case GetGenProps:
332                case GetAllExhibitProperties:
333                case GetAllExhibitPropertiesDiff:
334                case GetThumbnails:
335                case GetStratum:
336                case GetRawFile: // Note that a master may still impose throttling on most potential clients.
337                    return(true);
338                }
339    
340            return(false);
341            }
342    
343        /**Make an RPC call over HTTP[S] with the given outgoing packet.
344         * This controls concurrency (number of connections to the master).
345         * <p>
346         * If we cannot get a connection, because:
347         * <ul>
348         * <li>we have reached the self-imposed concurrency limit, or
349         * <li>there is a problem at the HTTP level (or in the upstream server),
350         * </ul>
351         * then we will try to time out and throw an IOException
352         * rather than blocking indefinitely.
353         *
354         * @param  packetOut  request packet; never null
355         * @return response packet; never null
356         * @throws IOException  in case of I/O difficulties
357         */
358        @Override
359        protected RawPacket doRPCRaw(final RawPacket packetOut)
360            throws IOException
361            {
362            // Return the result as a complete packet,
363            // having checked the trailer, etc.
364            // We have a normal (short) read timeout.
365    //        if(IsDebug.isDebug) { System.out.println("doRPCRaw("+packetOut+") START for endpoint "+this.serverURL); }
366    //        try
367    //            {
368                final RawPacket result = (RawPacket) _doRPCRawInternal(packetOut, false, false);
369    //            if(IsDebug.isDebug) { System.out.println("doRPCRaw("+packetOut+") RESULT from endpoint "+this.serverURL + " is " + result); }
370                return(result);
371    //            }
372    //        finally
373    //            {
374    //            if(IsDebug.isDebug) { System.out.println("doRPCRaw("+packetOut+") FINISH for endpoint "+this.serverURL); }
375    //            }
376            }
377    
378        /**If true then we enforce MACs when we have a local xfer key.
379         * We may still try to send a request without a MAC (iff we don't have a key),
380         * but we won't accept a response without a MAC unless we have no key.
381         * <p>
382         * All but a safe subset of ops is rejected with an OP__RUNTEX (RuntimeException)
383         * on incoming requests without a verifiable MAC.
384         */
385        public static final boolean ENFORCE_MAC = true;
386    
387        /**Record of unique response packet IDs around our acceptance window; never null.
388         * We use the final MAC segment ID, as that covers the entire frame.
389         * <p>
390         * We actually remember IDs for about twice the age implied by the skew
391         * so that if our clock is wrong by, or slips by, the maximum skew,
392         * we won't start admitting very old duplicate messages.
393         * <p>
394         * We should only ad IDs if possible when we are already fairly sure of the source
395         * and when we have, for example, already checked for acceptable skew,
396         * to make any sort of DoS attack against us harder.
397         * <p>
398         * We should reject otherwise-acceptable messages that have an ID
399         * already present in this Map.
400         * <p>
401         * FIXME: compute a sensible upper bound on size to prevent replays and all our memory being eaten.
402         * <p>
403         * Thread-safe.
404         */
405        private final DuplicateIDChecker<ROByteArray> messageIDs =
406            DuplicateIDChecker.<ROByteArray>create(2*CoreConsts.MAX_PEER_CLOCK_SKEW_MS, "ExhibitDataHTTPTunnelSource.messageIDs");
407    
408        /**If true, then limit outgoing connections by RPC type when busy.
409         * This may help prevent a misbehaviour of one system component
410         * shutting out communications for the rest of the system.
411         */
412        private static final boolean LIMIT_BUSY_CONS_BY_RPC_TYPE = true;
413    
414        /**Make an RPC call over HTTP[S] with the given outgoing packet.
415         * This controls concurrency (number of connections to the master).
416         * <p>
417         * If we cannot get a connection, because:
418         * <ul>
419         * <li>we have reached the self-imposed concurrency limit, or
420         * <li>there is a problem at the HTTP level (or in the upstream server),
421         * </ul>
422         * then we will eventually time out and throw an IOException
423         * rather than blocking indefinitely.
424         * <p>
425         * If we have a data-protecting "XferKey" then we always send a MAC.
426         * This enables the receiver to detect more errors in transmission.
427         *
428         * @param packetOut  request packet; never null
429         * @param asStream  if true, return as streamed result rather than packet
430         * @param allowBigReadTimeout if true then the RPC may take a lot of work at the master
431         *     so we should be prepared to wait a long time before timing out
432         *
433         * @return response packet; never null
434         * @throws IOException  in case of I/O difficulties
435         */
436        private Object _doRPCRawInternal(final RawPacket packetOut,
437                                         final boolean asStream,
438                                         final boolean allowBigReadTimeout)
439            throws IOException
440            {
441            assert(packetOut != null);
442    
443            // Kill any dead connections that we find
444            // if we seem to be short of resources.
445            final int availablePermits = countingSemaphore.availablePermits();
446            if(USE_WATCHDOG && (availablePermits <= 0))
447                { killDeadConnections(); }
448    
449            // Consider limiting connection by RPC operation type
450            // for fairness and robustness.
451            if(LIMIT_BUSY_CONS_BY_RPC_TYPE && (MAX_CONCURRENT_CONNECTIONS > 1) &&
452                    (availablePermits <= MAX_CONCURRENT_CONNECTIONS/2))
453                {
454                // Only about half the maximum-permitted connections are free,
455                // so prevent further outgoing connections for RPC types
456                // of connections already in place.
457                //
458                // There are some slight race hazards here,
459                // ie killing some innocent victim RPCs
460                // when we're not as busy as we thought,
461                // but random early connection killing
462                // isn't necessarily too bad anyway when getting busy.
463                final RawPacket.OpCode type = packetOut.opCode;
464                // Guaranteed to be (thread-)safe to iterate over openConnections.
465                for(final Tuple.Triple<Thread,Long,RawPacket.OpCode> connDetails : openConnections.values())
466                    {
467                    if(connDetails.third == type)
468                        {
469                        // Log this selective veto for now (maybe mute later).
470    logger.log("WARNING: ExhibitDataHTTPTunnelSource: _doRPCRawInternal(): HTTP tunnel getting busy to "+serverURL+" ("+MAX_CONCURRENT_CONNECTIONS+" concurrent connections max) for opcode#"+packetOut.opCode);
471                        // Veto extra connection of type already in progress...
472                        throw new IOException("HTTP tunnel getting busy to "+serverURL+" ("+MAX_CONCURRENT_CONNECTIONS+" concurrent connections max) for opcode#"+packetOut.opCode);
473                        }
474                    }
475                }
476    
477            try
478                {
479                if(ALLOW_CONNECTION_QUEUEING)
480                    {
481                    // Only wait to obtain a permit for something like
482                    // the time that we allow for an HTTP connection.
483                    // This avoids unexpectedly blocking users for a long time,
484                    // though if there really is contention then it should probably be
485                    // handled properly at a higher level.
486                    if(!countingSemaphore.tryAcquire(DEFAULT_HTTP_CONN_TIMEOUT_MS/2,
487                                                         TimeUnit.MILLISECONDS))
488                        {
489                        logger.log("_doRPCRawInternal(): timeout while waiting to acquire HTTP tunnel permit to "+serverURL);
490                        throw new IOException("timeout while waiting to acquire HTTP tunnel permit to "+serverURL+" ("+MAX_CONCURRENT_CONNECTIONS+" concurrent connections max) for opcode#"+packetOut.opCode+"; queue length = " + countingSemaphore.getQueueLength());
491                        }
492                    }
493                else
494                    {
495                    // Baulk/quit if a connection is not immediately available.
496                    if(!countingSemaphore.tryAcquire(0, TimeUnit.SECONDS))
497                        {
498                        logger.log("_doRPCRawInternal(): HTTP tunnel busy to "+serverURL);
499                        throw new IOException("HTTP tunnel busy to "+serverURL+" ("+MAX_CONCURRENT_CONNECTIONS+" concurrent connections max) for opcode#"+packetOut.opCode+"; queue length = " + countingSemaphore.getQueueLength());
500                        }
501                    }
502                }
503            catch(final InterruptedException e)
504                { throw new IOException("thread interrupted waiting to start HTTP tunnel connection"); }
505            try
506                {
507    if(_protocolDebug) { System.err.println("[_doRPCRawInternal(): connecting upstream: " + serverURL + ".]"); }
508    
509                // Construct a HTTP connection to the master server.
510                // Hopefully this will be transparently pooled by the JVM.
511                final HttpURLConnection connection =
512                    (HttpURLConnection) serverURL.openConnection();
513    
514                // Set an explicit fairly short connection timeout
515                // so as to be intolerant of poor Net connectivity,
516                // ie don't attempt to use a connection with very poor behaviour.
517                // We don't set a very long read timeout as some calls
518                // may take a lot of remote work before they can start to reply,
519                // but we'll still manage to escape a call that blocks completely.
520                if(!CoreConsts.AVOID_UNSAFE_TCP_TIMEOUTS)
521                    {
522                    connection.setConnectTimeout(DEFAULT_HTTP_CONN_TIMEOUT_MS);
523                    // Unless we are doing a very slow and/or CPU-expensive RPC,
524                    // use something more like the connection timeout if appropriate.
525                    connection.setReadTimeout(allowBigReadTimeout ? DEFAULT_HTTP_MAX_READ_TIMEOUT_MS :
526                        Math.min(DEFAULT_HTTP_MAX_READ_TIMEOUT_MS, 2*DEFAULT_HTTP_CONN_TIMEOUT_MS));
527                    }
528    
529                // Don't get any data from cache: must be a live connection!
530                connection.setUseCaches(false);
531                // Exact given URL must be valid...
532                connection.setInstanceFollowRedirects(false);
533                // Allow bidirectional communication.
534                connection.setDoOutput(true);
535                connection.setDoInput(true);
536                // Don't allow user interaction.
537                connection.setAllowUserInteraction(false);
538    
539                // We know what the content length will be
540                // so set it here to allow some HTTP performance optimisations.
541                final int contentLength = packetOut.getFrameLength();
542                connection.setRequestProperty("Content-Length", String.valueOf(contentLength));
543                connection.setFixedLengthStreamingMode(contentLength);
544    
545                // If authentication/check data is available,
546                // then apply it to the request.
547                final String uID = userID;
548                final String pwd = passwd;
549                if((uID != null) && (pwd != null))
550                    {
551                    connection.setRequestProperty(HEADER_AUTH_ID, uID);
552                    connection.setRequestProperty(HEADER_AUTH_PW, pwd);
553                    }
554                // Apply a MAC to our outbound request for extra error-detection if we can.
555                SecretKey xk = null;
556                try
557                    {
558                    xk = LocalProps.getXferHMACKey();
559                    if(xk != null) // Need a key to create the MAC.
560                        {
561                        try
562                            {
563                            final PacketProtector pp = new PacketProtector(packetOut, xk);
564                            final String checkString = pp.toCheckString();
565    if(IsDebug.isDebug) { logger.log("INFO: applying request MAC: " + checkString); }
566                            connection.setRequestProperty(HEADER_MAC, checkString);
567                            }
568                        catch(final InvalidKeyException e)
569                            {
570                            // In case of error, don't apply the MAC header, but do complain loudly!
571                            e.printStackTrace();
572                            }
573                        }
574                    }
575                catch(final Exception e1)
576                    {
577                    // Absorb/log any other exception that we encounter,
578                    // but attempt to continue.
579                    e1.printStackTrace();
580                    }
581    
582                // Do simple HTTP security/sanity check...
583                assert(!connection.usingProxy());
584    
585    if(_protocolDebug) { System.err.println(" [_doRPCRawInternal(): about to writePacket().]"); }
586    
587                if(USE_WATCHDOG)
588                    {
589                    final long stopBy = System.currentTimeMillis() +
590                                        DEFAULT_HTTP_CONN_TIMEOUT_MS +
591                                        DEFAULT_HTTP_MAX_READ_TIMEOUT_MS;
592                    openConnections.put(connection, new Tuple.Triple<Thread,Long,RawPacket.OpCode>(Thread.currentThread(), new Long(stopBy), packetOut.opCode));
593                    }
594    
595                try
596                    {
597                    // Send the output (request) packet, compressed if appropriate...
598                    final OutputStream os = connection.getOutputStream();
599                    try { packetOut.writePacket(os); }
600                    // Release resources.
601                    finally { os.close(); }
602    
603                    if(_protocolDebug) { System.err.println(" [_doRPCRawInternal(): about to open input/response stream.]"); }
604    
605                    final int responseCode = connection.getResponseCode();
606                    if(responseCode != 200)
607                        { throw new IOException("ERROR: RPC failed: HTTP POST response code "+responseCode); }
608    
609                    // Look for a MAC header on the response.
610                    // Veto obviously-broken communications, even if we don't have a key.
611                    PacketProtector pp = null;
612                    final String hdrMAC = connection.getHeaderField(HEADER_MAC);
613                    if(null != hdrMAC)
614                        {
615                        try { pp = PacketProtector.fromCheckString(hdrMAC); }
616                        catch(final IllegalArgumentException e) { logger.log("garbled MAC on response"); }
617    if(IsDebug.isDebug) { logger.log("INFO: received response MAC: " + pp); }
618    
619                        // Veto responses with a definitely-wrong length.
620                        // (For now just warn about a missing Content-Length.)
621                        final int contentLengthRcvd = connection.getContentLength();
622                        if(contentLengthRcvd == -1)
623                            { logger.log("WARNING: missing Content-Length on response in tunnel"); }
624                        else if(pp.length != contentLengthRcvd)
625                            { throw new IOException("ERROR: wrong-length (or missing Content-Length) response received"); }
626                        // Check that the remote peer's clock is not vastly wrong
627                        // (or that a very old packet is reappearing out of nowhere).
628                        // We assume that the skew allowance is generous enough to cover
629                        // actual transit time of messages in the network.
630                        final long now = System.currentTimeMillis();
631                        if((pp.timestamp < now - CoreConsts.MAX_PEER_CLOCK_SKEW_MS) ||
632                           (pp.timestamp > now + CoreConsts.MAX_PEER_CLOCK_SKEW_MS))
633                           { throw new IOException("ERROR: discarded response with bad timestamp: check local and peer clocks"); }
634                        // Veto duplicate packets/requests in this tunnel.
635                        if(messageIDs.add(pp.mac.get(pp.mac.size()-1)) != null)
636                            { throw new IOException("ERROR: duplicate response received"); }
637                        }
638    
639                    if(ENFORCE_MAC && ((xk != null) && (pp == null)))
640                        {
641                        logger.log("ERROR: did not receive MAC on RPC "+packetOut.opCode+" response from "+serverURL+" so discarding because unsafe: MAC hdr="+hdrMAC);
642                        throw new IOException("RPC "+packetOut.opCode+" response from "+serverURL+" not protected by MAC, thus not safe to use");
643                        }
644    
645                    // Get the response input stream.
646                    // If we have the XferKey and a MAC then always verify the input stream.
647                    // This protects all of the input data, including the RawPacket headers.
648                    // Using the stream mechanism helps spread the verification load,
649                    // as well as transparently permitting streamed handling of responses.
650                    final InputStream isRaw = connection.getInputStream();
651                    final InputStream is = ((pp == null) || (xk == null)) ? isRaw :
652                        pp.protectInputStream(xk, isRaw);
653    
654                    // Return the input stream for a streamed response.
655                    if(asStream)
656                        { return(is); }
657    
658                    try
659                        {
660                        if(_protocolDebug) { System.err.println(" [_doRPCRawInternal(): about to readPacket().]"); }
661    
662                        final RawPacket result = RawPacket.readPacket(is);
663    
664                        if(_protocolDebug) { System.err.println(" [_doRPCRawInternal(): about to return result.]"); }
665    
666                        // Return result successfully.
667                        return(result);
668                        }
669                    finally
670                        {
671                        // Explicitly close the input stream now that we've read the packet.
672                        is.close();
673    
674                        // Try to discard the connection if that is our policy.
675                        if(!KEEP_SERVER_CONNECTION_ALIVE) { connection.disconnect(); }
676                        }
677                    }
678                // Attempt to discard a possibly-broken (pool) connection...
679                catch(final IOException e) { connection.disconnect(); throw e; }
680                // Remove connection from pool being monitored...
681                finally
682                    {
683                    // Record that the connection has been closed
684                    // (or is not being handled by the caller now).
685                    if(USE_WATCHDOG) { openConnections.remove(connection); }
686                    }
687                }
688            finally { countingSemaphore.release(); }
689            }
690    
691        // Inherit javadoc comment...
692        @Override
693        protected Tuple.Pair<Integer, InputStream> doRPCRawWithStreamResponse(final RawPacket packetOut,
694                                                                              final boolean allowBigReadTimeout)
695            throws IOException
696            {
697            // Get the raw input/response stream...
698            final InputStream is = (InputStream) _doRPCRawInternal(packetOut, true, allowBigReadTimeout);
699    
700            // Start reading/decoding the raw packet header.
701            // Read the opcode (byte) header field.
702            final int ch = is.read();
703            if(ch < 0) { throw new EOFException(); }
704            final byte opC = (byte) ch;
705    
706            // Check for mismatched response packets
707            // or remote exceptions codes with special response op-codes.
708            if(opC != packetOut.opCode.getCode())
709                {
710                // Deal specially with some fixed remote exception types.
711                switch(opC)
712                    {
713                    case RawPacket.OP__PGMNISEX:
714                        { throw new PGMasterNotInServiceException(); }
715                    case RawPacket.OP__RUNTEX:
716                        { throw new RuntimeException("remote threw RuntimeException"); }
717                    case RawPacket.OP__REMEX:
718                        { throw new IOException("remote threw RemoteException"); }
719                    case RawPacket.OP__INTEX:
720                        { throw new InterruptedIOException("remote operation interrupted"); }
721                    }
722                throw new IOException("ExhibitDataHTTPTunnelSource.doRPCRawWithStreamResponse(): packet response-type mismatch; got "+opC+" expected "+packetOut.opCode);
723                }
724    
725            // Read the (int) length header field.
726            final int ch1 = is.read();
727            final int ch2 = is.read();
728            final int ch3 = is.read();
729            final int ch4 = is.read();
730            if((ch1 | ch2 | ch3 | ch4) < 0) { throw new EOFException(); }
731            final int rawLen = ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
732    
733            // In the simplest (and fairly common) case of an empty response,
734            // for efficiency and robustness we close the original response stream immediately
735            // and return an empty immutable response to the caller.
736            if(rawLen == 0)
737                {
738                // Double-check that we find our trailer (and nothing else)
739                // after the empty payload.
740                int ic;
741                if(((byte)(ic = is.read())) != RawPacket.TRAILER) { throw new IOException("missing frame trailer byte, got "+ic+": op "+opC); }
742                if((ic = is.read()) != -1) { throw new IOException("garbage ("+ic+") present after frame trailer rather than EOF"); }
743    
744                is.close(); // Immediate HTTP stream close to release resources ASAP.
745                return(EMPTY_RESPONSE_STREAM);
746                }
747    
748            // Check that the reported packet length is within bounds.
749            if((rawLen > RawPacket.MAX_PAYLOAD_SIZE) ||
750               (rawLen < -RawPacket.MAX_PAYLOAD_SIZE))
751                {
752                is.close();
753                throw new IOException("Bad (huge) packet length");
754                }
755    
756            final boolean isCompressed = (rawLen < 0);
757    
758            // In the simple (uncompressed-data) case we return the stream as-is.
759            // This leaves the trailer byte on the stream visible to the caller,
760            // but the true length of the data payload is returned too.
761            if(!isCompressed)
762                { return(new Tuple.Pair<Integer, InputStream>(Integer.valueOf(rawLen), is)); }
763    
764            // We wrap the underlying input in our custom deflater stream.
765            // We do not know the final result length so return a null length value.
766            // This leaves the trailer byte on the stream visible to the deflater,
767            // but the deflater will not use its value though may read it.
768            // We wrap the stream in a non-aggressive data pump
769            // to improve throughput for large results.
770            final InputStream cis = GenUtils.dataPump(new DefInputStream(is, true), 1<<16, false);
771            return(new Tuple.Pair<Integer, InputStream>(null, cis));
772            }
773    
774        /**Empty, immutable/uncloseable InputStream that we can return to all callers. */
775        private static final InputStream EMPTY_INPUTSTREAM = new ByteArrayInputStream(new byte[0]);
776    
777        /**Empty, immutable/uncloseable response stream that we can return to all callers. */
778        private static final Pair<Integer, InputStream> EMPTY_RESPONSE_STREAM = new Tuple.Pair<Integer, InputStream>(Integer.valueOf(0), EMPTY_INPUTSTREAM);
779    
780        /**Address to bind to for outbound connections, or null if any available address is OK.
781         * At the moment we only accept this in a backdoor setting via a system property
782         * (and don't complain if property access is not allowed).
783         * <p>
784         * If using this, eg with <code>-Dorg.hd.pg2k.tunnelLocalAddr=85.158.46.82</code>
785         * (and if HTTPClient V0.3-3 is still in use for these 'local' connections)
786         * then it may be wise to work round a bug with <code>-DHTTPClient.disableKeepAlives=true</code>.
787         */
788        private static final InetAddress localAddr;
789        /**Initialise localAddr. */
790        static
791            {
792            String rawLocalAddr = null;
793            try { rawLocalAddr = System.getProperty("org.hd.pg2k.tunnelLocalAddr"); }
794            catch(final SecurityException e) { /* Silently ignore. */ }
795            if(rawLocalAddr == null) { localAddr = null; }
796            else
797                {
798                InetAddress lA = null;
799                try { lA = InetAddress.getByName(rawLocalAddr); }
800                catch(final UnknownHostException e) { }
801                localAddr = lA;
802                }
803            }
804    
805        /**Short upstream name for peer; never null but can be "". */
806        private final String shortUpstreamName;
807    
808        /**Return short unique name of upstream peer/server suitable for Stratum; never null but can be "" for 'unknown'.
809         * Override this in implementations that know the upstream name.
810         */
811        protected String _getStratumUpstreamName()
812            { return(shortUpstreamName); }
813    
814        /**Construct a new HTTP tunnel data source to the given URL.
815         * Does not necessarily try to build the tunnel immediately.
816         *
817         * @param upstreamURL  URL to reach the server's tunnel,
818         *     of the form http://<i>serverName</i>/tunnelMountPoint; never null
819         *
820         * @throws IllegalArgumentException  if the URL is invalid or unusable.
821         */
822        public ExhibitDataHTTPTunnelSource(final String upstreamURL,
823                                           final String shortUpstreamName,
824                                           final SimpleLoggerIF logger)
825            {
826            super(logger);
827    
828            if(upstreamURL == null) { throw new IllegalArgumentException("null upstreamURL not allowed"); }
829            if(shortUpstreamName == null) { throw new IllegalArgumentException("null shortUpstreamName not allowed"); }
830    
831            this.shortUpstreamName = shortUpstreamName;
832    
833            // If we want to bind outgoing connections to a particular local address
834            // then we have to use a custom URLStreamHandler.
835            final URLStreamHandler handler = (localAddr == null) ? null :
836                (new URLStreamHandler(){
837                    @Override
838                    public URLConnection openConnection(final URL url)
839                        throws IOException, ProtocolNotSuppException
840                        {
841                        return(new HTTPClient.HttpURLConnection(url) {
842                            /**Overrides mechanism to get new connection.
843                             * Lookup key and connection use the specified local address.
844                             */
845                            @Override
846                            protected HTTPConnection getConnection(final URL url)
847                                throws ProtocolNotSuppException
848                                {
849                                // try the cache, using the host name, port, etc.
850                                final String php = url.getProtocol() + ":" + url.getHost().toLowerCase() + ":" +
851                                         ((url.getPort() != -1) ? url.getPort() :
852                                                URI.defaultPort(url.getProtocol())) +
853                                          " from " + localAddr.getHostAddress();
854    
855                                HTTPConnection con = (HTTPConnection) connections.get(php);
856                                if (con != null) {
857                                    return con;
858                                    }
859    
860                                // Not in cache, so create new one and cache it.
861                                con = new HTTPClient.HTTPConnection(url.getProtocol(),
862                                                         url.getHost(), url.getPort(),
863                                                         localAddr, 0); /* Zero local port implies 'any'. */
864                                connections.put(php, con);
865    
866                                return con;
867                                }
868                            });
869                        }
870                });
871    
872            try { serverURL = new URL(null, upstreamURL, handler); }
873            catch(final MalformedURLException e)
874                { throw new IllegalArgumentException("invalid masterURL \""+upstreamURL+"\": " + e.getMessage()); }
875    
876            if(_protocolDebug)
877                {
878                System.out.println("ExhibitDataHTTPTunnelSource("+upstreamURL+") instance created");
879                }
880            }
881    
882        /**Poll periodically.
883         * We call super.poll(gp) to ensure that base-class polling work gets done,
884         * including any upstream poll() calls.
885         */
886        @Override
887            public void poll(final GenProps gp)
888            {
889            try
890                {
891                // Kill any dead connections that we find.
892                if(USE_WATCHDOG) { killDeadConnections(); }
893    
894                final int permits = countingSemaphore.availablePermits();
895                if(permits <= 0)
896                    {
897                    logger.log("ExhibitDataHTTPTunnelSource: WARNING: full HTTP tunnel to "+serverURL+": queue="+(countingSemaphore.getQueueLength())+".");
898                    }
899                else if(permits <= MAX_CONCURRENT_CONNECTIONS/2)
900                    {
901                    logger.log("ExhibitDataHTTPTunnelSource: WARNING: busy HTTP tunnel to "+serverURL+".");
902                    }
903                }
904            finally { super.poll(gp); }
905            }
906        }