001 /*
002 Copyright (c) 1996-2012, Damon Hart-Davis
003 All rights reserved.
004
005 Redistribution and use in source and binary forms, with or without
006 modification, are permitted provided that the following conditions are
007 met:
008
009 * Redistributions of source code must retain the above copyright
010 notice, this list of conditions and the following disclaimer.
011
012 * Redistributions in binary form must reproduce the above copyright
013 notice, this list of conditions and the following disclaimer in the
014 documentation and/or other materials provided with the
015 distribution.
016
017 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
018 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
019 TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
020 PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
021 OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
022 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
023 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
024 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
025 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
026 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
027 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
028 */
029
030 package org.hd.d.pg2k.clApp.atHome;
031
032 import java.awt.Dimension;
033 import java.io.ByteArrayInputStream;
034 import java.io.ByteArrayOutputStream;
035 import java.io.DataInputStream;
036 import java.io.EOFException;
037 import java.io.File;
038 import java.io.FileInputStream;
039 import java.io.FileReader;
040 import java.io.IOException;
041 import java.io.InputStreamReader;
042 import java.io.LineNumberReader;
043 import java.io.OutputStreamWriter;
044 import java.io.PrintWriter;
045 import java.lang.ref.SoftReference;
046 import java.net.HttpURLConnection;
047 import java.net.MalformedURLException;
048 import java.net.URL;
049 import java.net.URLEncoder;
050 import java.nio.ByteBuffer;
051 import java.util.ArrayList;
052 import java.util.BitSet;
053 import java.util.Collection;
054 import java.util.Collections;
055 import java.util.HashMap;
056 import java.util.HashSet;
057 import java.util.List;
058 import java.util.Map;
059 import java.util.Properties;
060 import java.util.Set;
061 import java.util.concurrent.ConcurrentHashMap;
062 import java.util.concurrent.SynchronousQueue;
063 import java.util.concurrent.ThreadPoolExecutor;
064 import java.util.concurrent.TimeUnit;
065 import java.util.concurrent.atomic.AtomicBoolean;
066 import java.util.concurrent.atomic.AtomicInteger;
067 import java.util.concurrent.atomic.AtomicReference;
068
069 import javax.jnlp.BasicService;
070 import javax.jnlp.ExtendedService;
071 import javax.jnlp.FileOpenService;
072 import javax.jnlp.PersistenceService;
073 import javax.jnlp.ServiceManager;
074 import javax.jnlp.SingleInstanceService;
075 import javax.jnlp.UnavailableServiceException;
076
077 import org.hd.d.pg2k.ai.scorer.AbstractScorer;
078 import org.hd.d.pg2k.ai.scorer.MiniScorerCacheImpl;
079 import org.hd.d.pg2k.ai.scorer.ScoreAndConf;
080 import org.hd.d.pg2k.ai.scorer.ScorerCreator;
081 import org.hd.d.pg2k.ai.scorer.ScorerCreator.ScorerWork;
082 import org.hd.d.pg2k.svrCore.AllExhibitImmutableData;
083 import org.hd.d.pg2k.svrCore.AllExhibitProperties;
084 import org.hd.d.pg2k.svrCore.CoreConsts;
085 import org.hd.d.pg2k.svrCore.ExhibitName;
086 import org.hd.d.pg2k.svrCore.ExhibitStaticAttr;
087 import org.hd.d.pg2k.svrCore.ExhibitThumbnails;
088 import org.hd.d.pg2k.svrCore.FileTools;
089 import org.hd.d.pg2k.svrCore.GenUtils;
090 import org.hd.d.pg2k.svrCore.MemoryTools;
091 import org.hd.d.pg2k.svrCore.Name;
092 import org.hd.d.pg2k.svrCore.Name.ExhibitFull;
093 import org.hd.d.pg2k.svrCore.Rnd;
094 import org.hd.d.pg2k.svrCore.SimpleLoggerIF;
095 import org.hd.d.pg2k.svrCore.Stratum;
096 import org.hd.d.pg2k.svrCore.ThreadUtils;
097 import org.hd.d.pg2k.svrCore.Tuple;
098 import org.hd.d.pg2k.svrCore.Tuple.Pair;
099 import org.hd.d.pg2k.svrCore.MIME.ExhibitMIME;
100 import org.hd.d.pg2k.svrCore.MIME.ExhibitMIME.ExhibitTypeParameters;
101 import org.hd.d.pg2k.svrCore.MIME.Handler;
102 import org.hd.d.pg2k.svrCore.datasource.SimpleExhibitPipelineIF;
103 import org.hd.d.pg2k.svrCore.props.GenProps;
104 import org.hd.d.pg2k.svrCore.vars.EventPeriod;
105 import org.hd.d.pg2k.svrCore.vars.EventVariableValue;
106 import org.hd.d.pg2k.svrCore.vars.SimpleVariableDefinition;
107 import org.hd.d.pg2k.svrCore.vars.SimpleVariableValue;
108
109 import ORG.hd.d.IsDebug;
110
111 /**Core of at-home program: suitable to drive from a bare command-line or better.
112 * This can also form the core of a more-sophisticated GUI-based (eg JWS) app.
113 *
114 * @author dhd
115 */
116 public final class AHStandaloneMain
117 {
118 /**Default logger: writes to stdout. */
119 public static final SimpleLoggerIF defaultLogger = GenUtils.systemOutLogger;
120
121 /**Default server host name to connect back to; the generic/main host in this case. */
122 private static final String DEFAULT_SERVER = CoreConsts.MAIN_DATA_HOST;
123
124 /**Default target fractional CPU usage (of entire host) in exclusive range ]1,100[, ie strictly positive.
125 * Generally much less than 100% to avoid straining a user system,
126 * eg if overclocked or undercooled,
127 * and to get reasonable computational return for energy used
128 * amortising the base power consumption of the host machine.
129 */
130 public static final int DEFAULT_CPU_PERCENT = 70;
131
132 /**Default target CPU work chunk in ms; strictly positive.
133 * A value from a few hundred milliseconds to a few tens of seconds
134 * probably keeps overheads to a reasonably low proportion of CPU time,]
135 * and allows slow methods such as minimisation a chance to work.
136 * <p>
137 * A sub-1s value may look less weird/lumpy on a user's CPU meter
138 * (given a typical 1s sampling on GUI performance meters).
139 */
140 public static final int DEFAULT_CPU_CHUNK_MS = 500;
141
142 /**If true then attempt to use a primitive filesystem thumbnail/data cache where permitted.
143 * This will not be permitted in a JWS environment for example.
144 * <p>
145 * This may save a great deal of bandwidth avoiding refetching thumbnails.
146 * <p>
147 * The cache may also be prepopulated for stand-alone tests.
148 * <p>
149 * This cache is flat, ie contains no subdirectories,
150 * and will not be used unless already extant with appropriate permissions.
151 * <p>
152 * This may need to be periodically manually cleared of stale/old/bulk/corrupt data,
153 * and may become inefficient if it gains many entries.
154 */
155 private static final boolean ALLOW_FILESYSTEM_TNCACHE = true;
156 /**Default directory in which cache thumbnail/playback data; always relative to pwd/cwd. */
157 private static final String DEFAULT_TNCACHE_DIR = "_tncache.dat";
158
159 /**If true then save/record calibration data received from server.
160 * Data is saved into a fixed directory where it can easily be retrieved
161 * for playback (no-network) mode.
162 * <p>
163 * Not viable/safe for JWS mode.
164 * <p>
165 * Should not be used (true) when PLAYBACK_NO_NET is true.
166 */
167 private static final boolean PLAYBACK_SAVE_DATA = true;
168 /**If true then use playback calibration/thumbnail data rather than attempting to connect to the server.
169 * This is for stand-alone regression/performance/JVM testing with no network access.
170 */
171 private static final boolean PLAYBACK_NO_NET = false;
172
173 /**Command-line start-up.
174 * Takes no arguments.
175 * <p>
176 * Writes any log/status information to stdout.
177 * <p>
178 * Defaults to take ~50% of available CPU.
179 *
180 * @param args an optional first argument is interpretted as serverhost[:port]
181 */
182 public static void main(final String[] args)
183 {
184 defaultLogger.log("Gallery command-line 'at home' driver... ^C or equivalent to stop...");
185 if(PLAYBACK_NO_NET) { defaultLogger.log("RUNNING IN DISCONNECTED (NO-NETWORK) TEST MODE..."); }
186 // Start work, connecting back to the master server.
187 final String server = ((args == null) || (args.length == 0) || (args[0] == null)) ?
188 DEFAULT_SERVER : args[0];
189 final AHStandaloneMain worker = new AHStandaloneMain(server, defaultLogger, DEFAULT_CPU_PERCENT, DEFAULT_CPU_CHUNK_MS);
190 worker.doWork();
191 defaultLogger.log("Worker finished: goodbye.");
192 }
193
194 /**Get filesystem-based thumbnail/data cache directory, or null if none. */
195 private File getCacheDir()
196 {
197 // Do not attempt to use cache if forbidden.
198 if(!ALLOW_FILESYSTEM_TNCACHE) { return(null); }
199 // Do not attempt to use filesystem cache in JWS mode.
200 if(bs != null) { return(null); }
201 // Do not attempt to use cache dir if not already present and r/w.
202 final File dir = new File(DEFAULT_TNCACHE_DIR);
203 if(!dir.isDirectory() || !dir.canRead() || !dir.canWrite())
204 {
205 if(IsDebug.isDebug) { log.log("Local cache dir not available: "+dir); }
206 return(null);
207 }
208 // Looks to be usable.
209 return(dir);
210 }
211
212 /**Fake data source cobbled together to just support the actions that some Scorers and their support need.
213 * Blocks all other activity, and does not have a real tunnel back to the server at all.
214 * <p>
215 * This fakes the fetching of thumbnails (on demand) using their normal URLs.
216 */
217 private static final class FakeTunnel implements SimpleExhibitPipelineIF
218 {
219 /**Construct an instance, given the hostname and optional port. */
220 FakeTunnel(final String hostnameAndOptionalPort,
221 final SimpleLoggerIF log,
222 final File cacheDir)
223 {
224 if((hostnameAndOptionalPort == null) || (log == null))
225 { throw new IllegalArgumentException(); }
226 this.hostnameAndOptionalPort = hostnameAndOptionalPort;
227 this.log = log;
228 this.cacheDir = cacheDir;
229 }
230 /**Hostname of our server to fetch thumbnails from; never null. */
231 private final String hostnameAndOptionalPort;
232 /**Logger; never null. */
233 private final SimpleLoggerIF log;
234 /**Handle on local cache dir; null if none. */
235 private final File cacheDir;
236 /**Immutable (atomically-replacable) current set of full exhibit names that we support; never null. */
237 private volatile Set<String> supportedExhibits = Collections.emptySet();
238 /**Set/update the current set of full exhibit names. */
239 void setSupportedExhibits(final Set<String> s)
240 {
241 assert(s != null);
242 // If nothing has changed then return immediately.
243 if(s.equals(supportedExhibits)) { return; }
244 // Take safe defensive immutable copy.
245 final Set<String> safeCopy = Collections.unmodifiableSet(new HashSet<String>(s));
246 // Atomically store immutable copy of argument set.
247 supportedExhibits = safeCopy;
248 // Create new AEID with fake static attributes.
249 final long now = System.currentTimeMillis();
250 final Set<ExhibitStaticAttr> esas = new HashSet<ExhibitStaticAttr>(2*s.size());
251 for(final String n : safeCopy)
252 { esas.add(new ExhibitStaticAttr(n, 1, now)); }
253 // Atomically replace stored aeid with new synthetic value.
254 aeid = new AllExhibitImmutableData(esas, esas.isEmpty() ? 0 : now);
255 // Throw away any thumbnails no longer in the calibration set.
256 _tnCache.keySet().retainAll(safeCopy);
257 _tnNotFetchBefore.keySet().retainAll(safeCopy);
258 }
259 public void destroy() { /* Does nothing. */ }
260 /**Cache of AEID value; never null. */
261 private volatile AllExhibitImmutableData aeid = new AllExhibitImmutableData();
262 public AllExhibitImmutableData getAllExhibitImmutableData(final long oldStamp) throws IOException
263 {
264 final long hash = aeid.timestamp;
265 if(oldStamp == hash) { return(null); }
266 return(aeid); // Return synthetic aeid.
267 }
268 /**AEP is too expensive (large, slow) to fetch and is more than we need, so forbid it. */
269 // TODO: we might relent and synthesise one...
270 public AllExhibitProperties getAllExhibitProperties(final long oldHash) throws IOException
271 {
272 (new Throwable("UNWANTED AEP FETCH")).printStackTrace();
273 throw new UnsupportedOperationException("NOT ALLOWED: too expensive so use AEID");
274 }
275 /**Empty GP value; never null. */
276 private final GenProps gp = new GenProps();
277 public GenProps getGenProps(final long oldStamp) throws IOException
278 {
279 final long ts = gp.timestamp;
280 if(oldStamp == ts) { return(null); }
281 return(gp); // Return fixed/empty gp.
282 }
283 public Properties getGenSecProps(final long oldStamp) throws IOException
284 { throw new IOException("NOT ALLOWED"); }
285 public Properties getProperties(final PropsKey key, final long versionID) throws IOException
286 { throw new IOException("NOT ALLOWED"); }
287 public void getRawFile(final ByteBuffer buf, final Name.ExhibitFull exhibitName, final int position, final boolean dontCache) throws IOException
288 { throw new IOException("NOT ALLOWED"); /* May wish to allow this again. */ }
289 public ExhibitStaticAttr getStaticAttr(final ExhibitFull name) throws IOException
290 { throw new IOException("NOT ALLOWED"); }
291 public void poll(final GenProps gp) throws IOException { }
292 public void syncVariables(final boolean force) throws IOException
293 { throw new IOException("NOT ALLOWED"); }
294 /**Always return empty values. */
295 public EventVariableValue getEventValue(final SimpleVariableDefinition def, final EventPeriod intervalSelector, final boolean current)
296 {
297 final long now = System.currentTimeMillis();
298 final long currentIntervalNumber = intervalSelector.getIntervalNumber(now);
299 final long intervalNumber = (current ? currentIntervalNumber : (currentIntervalNumber-1));
300 return(new EventVariableValue(false,
301 def,
302 intervalSelector,
303 intervalNumber,
304 0, null, null));
305 }
306 public EventVariableValue[] getEventValues(final SimpleVariableDefinition def, final EventPeriod intervalSelector, final long intervalNumber, final BitSet whichValues)
307 { return(new EventVariableValue[0]); }
308 public SimpleVariableValue getVariable(final SimpleVariableDefinition var) throws IOException, UnsupportedOperationException
309 { throw new IOException("NOT ALLOWED"); }
310 public SimpleVariableValue[] getVariables(final long changedSince) throws IOException
311 { throw new IOException("NOT ALLOWED"); }
312 public void setVariable(final SimpleVariableValue newValue) throws IOException, UnsupportedOperationException
313 { throw new IOException("NOT ALLOWED"); }
314 public int setVariables(final SimpleVariableValue[] newValues) throws IOException
315 { throw new IOException("NOT ALLOWED"); }
316 public Stratum getStratum() throws IOException
317 { throw new IOException("NOT ALLOWED"); }
318 /**Thumbnail fetch count; watch those calories^Wbytes. */
319 private final AtomicInteger _tnDownloadCount = new AtomicInteger();
320 /**Minimum wait before retrying thumbnail fetch from server, ms; strictly positive. */
321 private static final int MIN_TN_RETRY_MS = 5000;
322 /**Thread-safe cache for time before retry of thumbnail fetch is allowed. */
323 private final Map<Name.ExhibitFull, Long> _tnNotFetchBefore = new ConcurrentHashMap<Name.ExhibitFull, Long>(2*ScorerCreator.MIN_SUGGESTED_CALIB_SET_SIZE);
324 /**Thread-safe thumbnail cache, holding thumbnails for the calibration set. */
325 private final Map<Name.ExhibitFull, ExhibitThumbnails> _tnCache = new ConcurrentHashMap<Name.ExhibitFull, ExhibitThumbnails>(2*ScorerCreator.MIN_SUGGESTED_CALIB_SET_SIZE);
326 /**Serialise thumbnail fetches, ie limit calls back to server, on this lock. */
327 private final Object tnFetchLock = new Object();
328 /**Get std thumbnail; returns null if not currently available.
329 * Fetched only if part of the calibration set; cached too if so.
330 */
331 public ExhibitThumbnails getThumbnails(final ExhibitFull fullName, final boolean create) throws IOException
332 {
333 // Try the cache.
334 final ExhibitThumbnails cachedResult = _tnCache.get(fullName);
335 if(cachedResult != null) { return(cachedResult); }
336
337 // If the requested name is not in the calibration set then return null.
338 if(!supportedExhibits.contains(fullName)) { return(null); }
339
340 // Get the exhibit type from its name.
341 final ExhibitTypeParameters inputFileType = ExhibitMIME.getInputFileType(fullName);
342 if(inputFileType == null) { return(null); }
343 // Get the exhibit handler...
344 final Handler handler = inputFileType.handler;
345 if(handler == null) { return(null); }
346
347 // If we have (too) recently unsuccessfully tried to fetch this thumbnail
348 // then veto this attempt to avoid wasting bandwidth and server patience.
349 final Long notBefore = _tnNotFetchBefore.get(fullName);
350 if((notBefore != null) && (System.currentTimeMillis() < notBefore)) { return(null); }
351
352 DataInputStream dis = null;
353 int tnBytes = -1; // Size of thumbnail in bytes.
354 try
355 {
356 synchronized(tnFetchLock)
357 {
358 // If we have a local filesystem cache,
359 // then try to retrieve the thumbnail from there first.
360 // Thumbnails are cached by short name...
361 final File tnCacheFile = (cacheDir == null) ? null :
362 new File(cacheDir, ExhibitName.getFileComponent(fullName).toString());
363 if(tnCacheFile != null)
364 {
365 // Do we seem to have a viable cached file?
366 if(tnCacheFile.isFile() && tnCacheFile.canRead() && (tnCacheFile.length() > 0))
367 {
368 dis = new DataInputStream(new FileInputStream(tnCacheFile));
369 tnBytes = (int) tnCacheFile.length(); // Validate length later...
370 if(IsDebug.isDebug) { log.log("Reading thumbnail from cache: " + tnCacheFile); }
371 }
372 }
373
374 // If thumbnail was not in local cache then try to fetch it from the server.
375 if((!PLAYBACK_NO_NET) && (dis == null))
376 {
377 // Try to fetch thumbnail from server since it is not in our cache.
378 // We fetch the thumbnail over HTTP just like a browser...
379 // We just get the standard thumbnail, if any, and create a thumbnails object containing it.
380 final URL tnURL = new URL("http://" + hostnameAndOptionalPort + "/_tn/std/" + fullName);
381
382 final HttpURLConnection uc = (HttpURLConnection) tnURL
383 .openConnection();
384 uc.setUseCaches(true); // We'll take any cacheing available to save bandwidth...
385 uc.setRequestProperty("Referer", tnURL.toString());
386 final int responseCode = uc.getResponseCode();
387 if(responseCode == 404)
388 {
389 // If no thumbnail available (permanently) then cache this negative answer.
390 final ExhibitThumbnails thumbnails = ExhibitThumbnails.NO_THUMBNAILS;
391 _tnCache.put(fullName, thumbnails);
392 _tnNotFetchBefore.remove(fullName); // Possibly reclaim a little space...
393 log.log("[No thumbnail for: " + fullName + " available from the server...]");
394 return(thumbnails);
395 }
396 else if(responseCode != 200)
397 { throw new IOException("thumbnail not (yet) available: " + responseCode); }
398 tnBytes = uc.getContentLength();
399 // Avoid fetching thumbnail data if we cannot get a valid thumbnail length.
400 if(tnBytes < 1)
401 { throw new IOException("thumbnail length (" + tnBytes + ") missing or invalid"); }
402 log.log("[Fetching thumbnail #" + _tnDownloadCount.incrementAndGet() + ": " + fullName + " from the server...]");
403 dis = new DataInputStream(uc.getInputStream());
404 }
405
406 // Validate thumbnail length...
407 if((tnBytes < 1) || (tnBytes > ExhibitThumbnails.STD_ABS_MAX_BYTES))
408 { throw new IOException("thumbnail length (" + tnBytes + ") missing or invalid"); }
409
410 final byte[] b = new byte[tnBytes];
411 // Read the thumbnail data from the server.
412 dis.readFully(b);
413 // Extract the image dimensions if possible.
414 // If not then don't allow us to attempt to fetch this thumbnail again.
415 final Dimension dim = handler .get2DImageDimensions(new ByteArrayInputStream(b));
416 final ExhibitThumbnails thumbnails = (dim == null) ? ExhibitThumbnails.NO_THUMBNAILS :
417 ExhibitThumbnails.createExhibitThumbnails(null, new ExhibitThumbnails.Thumbnail(b, dim));
418 // Cache result iff non-null.
419 if(thumbnails != null)
420 {
421 _tnCache.put(fullName, thumbnails);
422 _tnNotFetchBefore.remove(fullName); // Possibly reclaim a little space...
423
424 // If not already there, write raw thumbnail data to local cache.
425 if((tnCacheFile != null) && !tnCacheFile.exists())
426 { FileTools.replacePublishedFile(tnCacheFile.getPath(), b, true); }
427 }
428 else
429 {
430 // Postpone any refetch attempt to give server a chance, avoid wasting bandwidth, etc.
431 // Randomise this a little to reduce request collisions.
432 _tnNotFetchBefore.put(fullName, System.currentTimeMillis() + MIN_TN_RETRY_MS + Rnd.fastRnd.nextInt(MIN_TN_RETRY_MS));
433 log.log("[FAILED to fetch thumbnail, may retry: "+fullName+"...]");
434 }
435 // Return thumbnail, if any.
436 return(thumbnails);
437 }
438 }
439 catch(final IOException e)
440 {
441 // Postpone any refetch attempt to give server a chance, avoid wasting bandwidth, etc.
442 // Add a random element to spread client attempts and extend the delay a little further.
443 _tnNotFetchBefore.put(fullName, System.currentTimeMillis() + MIN_TN_RETRY_MS + Rnd.fastRnd.nextInt(MIN_TN_RETRY_MS));
444 log.log("WARNING: problem fetching thumbnail "+fullName+" from server: " + e.getMessage());
445 throw e;
446 }
447 finally
448 {
449 if(dis != null) { dis.close(); /* Release resources. */ }
450 }
451 }
452 }
453
/**Creates a worker that logs to the supplied logger and uses defaults for everything else:
 * no explicit server (so the constructor it delegates to picks one),
 * DEFAULT_CPU_PERCENT and DEFAULT_CPU_CHUNK_MS.
 * Nothing is started; the settings are only recorded.
 * <p>
 * Package-visible so that only classes in this package can use it.
 *
 * @param log  logger; must not be null
 */
AHStandaloneMain(final SimpleLoggerIF log)
{
    this(null, log, DEFAULT_CPU_PERCENT, DEFAULT_CPU_CHUNK_MS);
}
461
462 /**Create worker instance with the suppied logger and other parameters.
463 * Does no work; just stores the parameters.
464 * <p>
465 * Package-visible to be exposed only to this package.
466 */
467 AHStandaloneMain(String serverHostname,
468 final SimpleLoggerIF log,
469 final int percentCPU,
470 final int chunkMs)
471 {
472 if((log == null) ||
473 (percentCPU < 0) || (percentCPU > 100) ||
474 (chunkMs <= 0))
475 { throw new IllegalArgumentException(); }
476 this.log = log;
477 this.percentCPU = percentCPU;
478 this.chunkMs = chunkMs;
479
480 // Get access to basic services.
481 BasicService basicService = null;
482 try { basicService = (BasicService) ServiceManager.lookup("javax.jnlp.BasicService"); }
483 catch(final UnavailableServiceException e) { }
484 bs = basicService;
485
486 // Get access to persistence management.
487 PersistenceService persistenceService = null;
488 try { persistenceService = (PersistenceService) ServiceManager.lookup("javax.jnlp.PersistenceService"); }
489 catch(final UnavailableServiceException e) { }
490 ps = persistenceService;
491
492 // Get access to file open services.
493 FileOpenService fileOpenService = null;
494 try { fileOpenService = (FileOpenService) ServiceManager.lookup("javax.jnlp.FileOpenService"); }
495 catch(final UnavailableServiceException e) { }
496 fos = fileOpenService;
497
498 // Get access to single-instance services.
499 SingleInstanceService singleInstanceService = null;
500 try { singleInstanceService = (SingleInstanceService) ServiceManager.lookup("javax.jnlp.SingleInstanceService"); }
501 catch(final UnavailableServiceException e) { }
502 sis = singleInstanceService;
503
504 // Get access to extended services.
505 ExtendedService extendedService = null;
506 try { extendedService = (ExtendedService) ServiceManager.lookup("javax.jnlp.ExtendedService"); }
507 catch(final UnavailableServiceException e) { }
508 exs = extendedService;
509
510 // If no server host name (and optional port) is supplied
511 // then try to extract one from the JNLP runtime
512 // so that we connect back to wherever we were loaded from (sandbox friendly).
513 if(serverHostname == null)
514 {
515 if(bs != null)
516 {
517 final URL codeBase = bs.getCodeBase();
518 serverHostname = codeBase.getHost();
519 final int port = codeBase.getPort();
520 if((port > 0) && (port != 80)) { serverHostname += ':' + Integer.toString(port); }
521 }
522 else { serverHostname = DEFAULT_SERVER; }
523 }
524 this.serverHostname = serverHostname;
525
526 // Create an initial minimum-size thread-pool using
527 // minimum-priority (to be kind to other processes on the host machine)
528 // daemon (to allow the JVM to exit quickly on demand) work threads.
529 workerThreadPoolDiscardable =
530 new ThreadPoolExecutor(1, 1,
531 600L, TimeUnit.SECONDS, // Keep worker threads alive for 10 minutes...
532 new SynchronousQueue<Runnable>(), // No queueing.
533 new ThreadUtils.DaemonThreadFactory("workerThreadPoolDiscardable : ExecutorService", true),
534 new ThreadPoolExecutor.DiscardPolicy());
535
536 try
537 {
538 tunnelURL = (bs != null) ? (new URL(bs.getCodeBase(), ScorerCreator.scorerTunnelRRURL)).toString() :
539 ("http://" + serverHostname + ScorerCreator.scorerTunnelRRURL);
540 }
541 catch(final MalformedURLException e) { throw new Error(e); /* Should not happen... */ }
542 fakeTunnel = new FakeTunnel(serverHostname, log, getCacheDir());
543 cb = new Callback(tunnelURL, fakeTunnel, log);
544 }
545
546 /**Server host name; never null. */
547 private final String serverHostname;
548 /**Logger; never null. */
549 private final SimpleLoggerIF log;
550 /**Target percent CPU usage (of all CPUs in host machine) in exclusive range ]1,100[, ie strictly positive. */
551 private volatile int percentCPU;
552 /**Target CPU work chunk in ms; strictly positive. */
553 private volatile int chunkMs;
554
555
556 /**Handle on JWS basic service; null if none.
557 * Package-visible so as to be directly usable by GUI classes.
558 */
559 final BasicService bs;
560
561 /**Handle on JWS persistence service; null if none.
562 * Package-visible so as to be directly usable by GUI classes.
563 */
564 final PersistenceService ps;
565
566 /**Handle on JWS file-open service; null if none.
567 * Package-visible so as to be directly usable by GUI classes.
568 */
569 final FileOpenService fos;
570
571 /**Handle on JWS singleton service; null if none.
572 * Package-visible so as to be directly usable by GUI classes.
573 */
574 final SingleInstanceService sis;
575
576 /**Handle on JWS extended service; null if none.
577 * Package-visible so as to be directly usable by GUI classes.
578 */
579 final ExtendedService exs;
580
581
582 /**Thread pool; reconfigurable at run-time; never null. */
583 private final ThreadPoolExecutor workerThreadPoolDiscardable;
584
585 /**Last time we polled the server for calibration data.
586 * Initially zero (indicating "never").
587 * <p>
588 * Marked volatile for thread-safe lock-free access.
589 */
590 private volatile long lastFetchedCalibrationData;
591
592 /**Start sequence; we look for a line containing this to start our data. */
593 public static final String DATA_BLOCK_START = "<ul class=\"data\">";
594 /**End sequence; we look for a line containing this to start our data. */
595 public static final String DATA_BLOCK_END = "</ul>";
596
597 /**Method to use in getData() to connect to server (POST or GET).
598 * POST easier to distinguish from normal browser (human) traffic,
599 * but may not get through some proxies or firewalls.
600 */
601 private static final String GETDATA_METHOD = "GET";
602
603 /**Fetch specified ordered records from server; may be empty but never null.
604 * Looks for first block of lines between <ul class="data"> and </ul>,
605 * extracting each line starting with <li> and ending with </li> as a record.
606 *
607 * @param q if not null, part to pass as the generic (query) parameter (will be URL encoded)
608 */
609 private List<String> getData(final String tunnelURL, final String dataType, final String q)
610 throws IOException
611 {
612 // Generic query part.
613 final String qPart = ScorerCreator.DT_PARAM_NAME + '=' + dataType + ((q == null) ? "" :
614 ("&"+ScorerCreator.DT__GEN_DATA+"=" + URLEncoder.encode(q, CoreConsts.FILE_ENCODING_UTF_8)));
615
616 // If we are using te playback mechanism, the file name is based on the query.
617 // In playback mode we never attempt to fetch from the server.
618 final String cacheFile = qPart + ".txt";
619 if(PLAYBACK_NO_NET)
620 {
621 final File d = getCacheDir();
622 if(d == null) { throw new EOFException("ERROR: playback cache not available"); }
623 final File pbf = new File(d, cacheFile);
624 // Read the cached file line by line (and trim it)...
625 final LineNumberReader lr = new LineNumberReader(new FileReader(pbf));
626 try
627 {
628 final List<String> result = new ArrayList<String>();
629 String line;
630 while((null != (line = lr.readLine())))
631 { result.add(line.trim()); }
632 return(result);
633 }
634 finally { lr.close(); }
635 }
636
637 // 'Cache-buster' random URL parameter/component.
638 final String rndPart = "&rnd=" + (Rnd.fastRnd.nextInt() >>> 1);
639 // Full URL.
640 final URL tunnel = new URL(tunnelURL + '?' + qPart + rndPart);
641 if(IsDebug.isDebug) { log.log("[About to open connection to "+tunnel+"...]"); }
642 final HttpURLConnection uc = (HttpURLConnection) tunnel.openConnection();
643 uc.setRequestMethod(GETDATA_METHOD);
644 uc.setUseCaches(false); // Forbid cacheing to ensure that we always get fresh data.
645 uc.setRequestProperty("Referer", tunnel.toString());
646 uc.setDoInput(true);
647 // Check for a positive response.
648 final int responseCode = uc.getResponseCode();
649 if(IsDebug.isDebug) { log.log("[Got response code "+responseCode+"...]"); }
650 if(responseCode != 200) { log.log("WARNING: unexpected response code "+responseCode+" trying to get data from "+tunnelURL); }
651 // Read from HTML page line by line, assuming UTF-8 (or subset, eg ASCII-7) encoding.
652 final LineNumberReader lr = new LineNumberReader(new InputStreamReader(uc.getInputStream(), CoreConsts.FILE_ENCODING_UTF_8));
653 try
654 {
655 final List<String> result = new ArrayList<String>();
656
657 // Look for the start sequence on a line on its own...
658 String line;
659 while((null != (line = lr.readLine())) && !line.trim().equals(DATA_BLOCK_START))
660 {
661 //if(IsDebug.isDebug) { log.log("[Skipping line: "+line+"...]"); }
662 }
663 // Now record all <li>...</li> lines until we hit the end sequence.
664 while((null != (line = lr.readLine())) && !(line = line.trim()).equals(DATA_BLOCK_END))
665 {
666 if(line.startsWith("<li>") && line.endsWith("</li>"))
667 { result.add(line.substring(4, line.length()-5)); }
668 //else if(IsDebug.isDebug) { log.log("[Non-record line: "+line+"...]"); }
669 }
670
671 if(IsDebug.isDebug) { log.log("[Result lines: "+result.size()+"...]"); }
672
673 // If recording data from the server, do so now,
674 // but don't fail if the cache area isn't available.
675 if(PLAYBACK_SAVE_DATA)
676 {
677 final File d = getCacheDir();
678 if(d == null) { log.log("WARNING: playback cache not available"); }
679 else
680 {
681 final File pbf = new File(d, cacheFile);
682 final ByteArrayOutputStream baos = new ByteArrayOutputStream(result.size() << 6);
683 final PrintWriter pw = new PrintWriter(new OutputStreamWriter(baos));
684 for(final String l : result)
685 { pw.println(l); }
686 pw.flush();
687 FileTools.replacePublishedFile(pbf.getPath(), baos.toByteArray(), false);
688 }
689 }
690
691 return(result);
692 }
693 finally { lr.close(); /* Release resources. */ }
694 }
695
696 /**Get calibration data from server; never null but may be empty.
697 * @param baseName valid Scorer base name supported by current cache/population
698 * @return the set of full calibration exhibit names for the given Scorer base type,
699 * and the map from the short version of those names to their ScoreAndConf calibration/vote values
700 */
701 private Tuple.Pair<Set<String>, Map<Name.ExhibitShort, ScoreAndConf>> getCalibrationSet(final String tunnelURL,
702 final String baseName)
703 throws IOException
704 {
705 if((tunnelURL == null) ||
706 (baseName == null) || !cb.getCache().getBaseScorersWithoutParameters().contains(baseName))
707 { throw new IllegalArgumentException(); }
708 try
709 {
710 final List<String> records = getData(tunnelURL, ScorerCreator.DT_CALIB, baseName);
711 if(IsDebug.isDebug) { log.log("Got "+(records.size())+" raw calibration records."); }
712 // Each record should parse as two space-separated components,
713 // a full exhibit name and a ScoreAndConf value.
714 final Tuple.Pair<Set<String>, Map<Name.ExhibitShort, ScoreAndConf>> result = new Pair<Set<String>, Map<Name.ExhibitShort,ScoreAndConf>>(new HashSet<String>(), new HashMap<Name.ExhibitShort, ScoreAndConf>());
715 for(final String r : records)
716 {
717 final String[] fields = r.split(" ");
718 if(fields.length != 2)
719 { throw new IOException("wrong number of fields in calibration-data record"); }
720 if(!ExhibitName.validNameSyntax(fields[0]))
721 { throw new IOException("invalid exhibit name in calibration-data record"); }
722 result.first.add(fields[0]);
723 result.second.put(Name.ExhibitFull.create(fields[0]).getShortName(), ScoreAndConf.fromString(fields[1]));
724 }
725 return(result);
726 }
727 catch(final RuntimeException e)
728 {
729 log.log("Problem parsing data in calibration set: " + e.getMessage());
730 throw new IOException(e);
731 }
732 }
733
734 /**Lock used to serialise new/best Scorer submissions back to the server; never null. */
735 private static final Object _submitLock = new Object();
736
737 /**Full URL for dedicated 'AH' tunnel on server; never null. */
738 private final String tunnelURL;
739 /**Synthetic data pipeline tunnel back to server; never null. */
740 private final FakeTunnel fakeTunnel;
741 /**Callback object for reporting new/best Scorer; never null. */
742 private final Callback cb;
743
744 /**Send the new Scorer back to the server.
745 * We serialise these to never send back more than one at a time
746 * to avoid wasting server connection resources.
747 *
748 * @param tunnelURL the full URL for the tunnel servlet; never null
749 * @param scorerNameAndParameters the Scorer description to send to the server
750 *
751 * @return true if we seem to have sent the data successfully to the server
752 */
753 private static boolean sendNewBestScorer(final String tunnelURL,
754 final String scorerNameAndParameters,
755 final SimpleLoggerIF log)
756 {
757 if(PLAYBACK_NO_NET)
758 {
759 log.log("NOT SUBMITTING SCORER TO SERVER: IN STAND-ANLONE TEST MODE");
760 return(true); // Lie, so displays, etc, are as normal as possible.
761 }
762
763 log.log("[Submitting new 'best' Scorer "+scorerNameAndParameters+" ]");
764 try
765 {
766 final URL tunnel = new URL(tunnelURL + '?' + ScorerCreator.DT_PARAM_NAME + '=' + ScorerCreator.DT_POSTNEWSC +
767 '&' + ScorerCreator.DT__GEN_DATA + '=' + URLEncoder.encode(scorerNameAndParameters, CoreConsts.FILE_ENCODING_UTF_8));
768 synchronized(_submitLock)
769 {
770 final HttpURLConnection uc = (HttpURLConnection) tunnel.openConnection();
771 uc.setRequestMethod(GETDATA_METHOD); // Use same method as other server interactions.
772 uc.setUseCaches(false); // Forbid cacheing to ensure that we always get fresh data.
773 uc.setRequestProperty("Referer", tunnel.toString());
774 uc.connect();
775 if(uc.getResponseCode() != 200)
776 {
777 log.log("WARNING: unexpected response code when submitting new Scorer to server: " + uc.getResponseCode());
778 return(false);
779 }
780 return(true); // Seems to have been accepted by the server.
781 }
782 }
783 catch(final IOException e)
784 {
785 log.log("WARNING: I/O problem when submitting Scorer to server: "+e.getMessage());
786 return(false);
787 }
788 }
789
790 /**Container of various items of information we need to run the worker threads.
791 * The cache and worker instances are cleared/replaced atomically.
792 */
793 private static final class Callback implements org.hd.d.pg2k.ai.scorer.ScorerPopulation.NewBestCallbackIF
794 {
795 /**Make an instance wrapped round the tunnel URL, log and (restricted) dataSource.
796 */
797 Callback(final String tunnelURL,
798 final SimpleExhibitPipelineIF dataSource,
799 final SimpleLoggerIF log)
800 {
801 if((tunnelURL == null) || (dataSource == null) || (log == null))
802 { throw new IllegalArgumentException(); }
803 this.dataSource = dataSource;
804 this.log = log;
805
806 // Create initial cache, etc.
807 reset();
808 }
809
810 /**Data source; never null. */
811 final SimpleExhibitPipelineIF dataSource;
812 /**Logger; never null. */
813 final SimpleLoggerIF log;
814
815 /**Light-weight Scorer cache and population; never null. */
816 private MiniScorerCacheImpl cache;
817 /**Work controller object; never null. */
818 private ScorerCreator.ScorerWork scorerWork;
819 /**Generation number; strictly positive.
820 * When the cache is non-zero-sized and is reset, this value is incremented.
821 * <p>
822 * Accessed under the same lock used to protect the cache.
823 */
824 private int generation = 1;
825
826 /**Get the cache; never null. */
827 synchronized MiniScorerCacheImpl getCache() { return(cache); }
828 /**Get the work controller object; never null. */
829 synchronized ScorerCreator.ScorerWork getScorerWork() { return(scorerWork); }
830 /**Get the current generation number; strictly positive. */
831 synchronized int getGeneration() { return(generation); }
832
833 /**Replace cache and worker objects atomically.
834 */
835 final synchronized void reset()
836 {
837 // Update the generation count if zapping a live population.
838 if((cache != null) && (cache.size() != 0)) { ++generation; }
839
840 // We'll use a 'mini' Scorer cache as a satellite of our server's population.
841 // This 'mini' form is much less resource-intensive (memory and I/O especially).
842 cache = new MiniScorerCacheImpl(dataSource, log, this);
843 // We use the tunnel very sparingly,
844 // and in particular access to the event mechanism is not permitted.
845 scorerWork = new ScorerCreator.ScorerWork(cache, log, false);
846 }
847
848 /**Immutable Set of server's "best" Scorers; never null but may be empty.
849 * Marked volatile for atomic replacement and lock-free thread-safe access;
850 * this is not protected under the same instance lock as the cache/worker fields.
851 */
852 private volatile Set<String> serverBestScorerNAP = Collections.emptySet();
853 /**Get immutable Set of server's "best" Scorers; never null but may be empty. */
854 Set<String> getServerBestScorerNAP() { return(serverBestScorerNAP); }
855 /**Set the server's "best" Scorers; may not be null nor contain nulls or invalid name-and-parameter values. */
856 void setServerBestScorerNAP(final Collection<String> bestScorers)
857 {
858 if(bestScorers == null) { throw new IllegalArgumentException(); }
859 // Take defensive immutable copy.
860 final Set<String> copy = Collections.unmodifiableSet(new HashSet<String>(bestScorers));
861 // Verify all Scorers are parsable (ie syntactically correct)...
862 for(final String snp : copy)
863 { AbstractScorer.parseNameAndParameters(snp); }
864 // Store the copy.
865 serverBestScorerNAP = copy;
866 }
867
868 /**Time last new Scorer was locally generated and reported.
869 * This notes time of generation of a new Scorer that was the best in its sub-population.
870 * While these reports are still coming in, this client is probably doing useful work.
871 * <p>
872 * Marked volatile for lock-free thread-safe access.
873 */
874 private volatile long lastNewScorerReport;
875 /**Get time last new Scorer was locally generated and reported. */
876 long getLastNewScorerReport() { return(lastNewScorerReport); }
877
878 /**Last 'best' Scorer queued to report to server; null if currently no such item.
879 * We only retain the last item since we assume it to be the best.
880 * We then poll for this to send it back to the server,
881 * possibly reducing the traffic sent the server's way.
882 * <p>
883 * Accessed under the instance lock.
884 */
885 private String queuedBestScorer;
886 /**Atomically get and clear any queued best Scorer to send to the server; null if currently no such item. */
887 synchronized String getQueuedBestScorer()
888 {
889 final String result = queuedBestScorer;
890 if(result != null) { queuedBestScorer = null; }
891 return(result);
892 }
893
894 /**Used to post a new "best of breed" Scorer name and parameters for posterity.
895 * This routine should not throw any exceptions.
896 * <p>
897 * This routine is not synchronized so as to avoid possibility of deadlock,
898 * especially on reentrant calls to the cache and/or computations.
899 * It does grab the cache value under a lock to work on later at leisure.
900 *
901 * @param scorerNameAndParameters valid name-and-parameters set; never null.
902 */
903 public void reportNewBestScorer(final String scorerNameAndParameters)
904 {
905 // Grab the lock just long enough to capture any values we need.
906 final MiniScorerCacheImpl cacheCopy;
907 synchronized(this)
908 {
909 cacheCopy = cache;
910 }
911
912 // Take a private handle on the Set of the server's best Scorers.
913 final Set<String> bestCopy = serverBestScorerNAP;
914
915 // If we don't have any benchmark server-best Scorers
916 // then ignore new Scorer reports for now.
917 if(bestCopy.isEmpty()) { return; }
918
919 // This doesn't get reported back to the server
920 // if it is one of the server's original Scorers!
921 if(bestCopy.contains(scorerNameAndParameters)) { return; }
922
923 // Record time of this new Scorer report,
924 // showing that worthwhile activity is contining.
925 lastNewScorerReport = System.currentTimeMillis();
926
927 try
928 {
929 // This doesn't get reported back to the server
930 // if not better than all of the server's original Scorers of the same base type.
931 // Relies on the Scorer cache and possible weight computation being reentrant.
932 final ScoreAndConf newSac = cacheCopy.computeScorerWeighting(scorerNameAndParameters, true, null);
933 final int newGoodness = ScoreAndConf.computeScorerGoodness(newSac);
934 // Reject any new Scorer with non-positive goodness.
935 if(newGoodness <= 0) { return; }
936 // Now compare against same base-name exemplar Scorers from the server.
937 final Pair<String, Map<String, String>> newNAP = AbstractScorer.parseNameAndParameters(scorerNameAndParameters);
938 final String prefix = newNAP.first + ':';
939 for(final String serverSNP : bestCopy)
940 {
941 // Only judge the new Scorer by those with the same base type (and that have parameters).
942 if(!serverSNP.startsWith(prefix)) { continue; }
943 final int goodnessOfBest = ScoreAndConf.computeScorerGoodness(cacheCopy.computeScorerWeighting(serverSNP, true, null));
944 if(IsDebug.isDebug && (goodnessOfBest <= 0)) { System.out.println("Non-positive goodness="+goodnessOfBest+" for server-best Scorer "+serverSNP); }
945 if(newGoodness <= goodnessOfBest)
946 {
947 // The new Scorer is not better than this similar Scorer
948 // that the server told us about,
949 // so give up immediately.
950 return;
951 }
952 //log.log("[Better @ "+newGoodness+" than "+goodnessOfBest+" for "+serverSNP+".]");
953 }
954
955 // Atomically queue this improved Scorer to send back to the server...
956 synchronized(this)
957 {
958 // Discards any old (presumably less-good) pending Scorer value
959 // since this one should be "better".
960 queuedBestScorer = scorerNameAndParameters;
961 }
962 }
963 catch(final IOException e)
964 {
965 // Report but otherwise ignore any I/O error.
966 // This may mean that we fail to report the new 'star' back to base.
967 log.log("ERROR: problem evaluating/reporting new 'best' Scorer: " + e.getMessage());
968 }
969 }
970 }
971
972 /**Get current target CPU load percentage in range ]1,100[. */
973 int getTargetPercentCPU() { return(percentCPU); }
974 /**Set current target CPU load percentage in range ]1,100[.
975 * An out-of-range value is coerced into range.
976 * <p>
977 * We also adjust the CPU chunk size for better scheduling and CPU behaviour at extremes.
978 * @param pc target percent CPU time in the range 1 to 100.
979 */
980 void setTargetPercentCPU(final int pc)
981 {
982 // Corece target %age to a reasonable/legal range.
983 percentCPU = Math.max(1, Math.min(100, pc));
984
985 // Set a short chunk/quantum for low loads to keep pauses and thus 'lumpiness to a minimum.
986 if(percentCPU <= DEFAULT_CPU_PERCENT/3) { chunkMs = DEFAULT_CPU_CHUNK_MS/2; }
987 // Set a large chunk/quantum for high loads for greatest CPU efficiency.
988 else if(percentCPU >= 100-(DEFAULT_CPU_PERCENT/3)) { chunkMs = DEFAULT_CPU_CHUNK_MS*2; }
989 // For values around the default, use the default timeslice.
990 else { chunkMs = DEFAULT_CPU_CHUNK_MS; }
991 }
992
993 /**Get generation number; strictly positive. */
994 int getGeneration() { return(cb.getGeneration()); }
995
996 /**Allow polling for population size; non-negative. */
997 int getPopulationSize() { return(cb.getCache().size()); }
998
999 /**Get size of current calibration set, or zero if none. */
1000 int getCalibrationSetSize() { return(cb.getCache().getCalibrationExhibitsAndScores().size()); }
1001
1002 /**Number of Scorers reported back home (ignoring failures); never null, never negative. */
1003 private final AtomicInteger scorersReported = new AtomicInteger();
1004 /**Get number of Scorers reported back home; never negative. */
1005 final int getScorersReported() { return(scorersReported.get()); }
1006
1007 /**Score/confidence of last Scorer reported; never null.
1008 * Marked volatile for safe lock-free access.
1009 */
1010 private volatile ScoreAndConf lastSAC = ScoreAndConf.NO_OPINION;
1011 /**Get score/confidence of last Scorer reported; never null. */
1012 ScoreAndConf getLastSAC() { return(lastSAC); }
1013
1014 /**Time remaining for this generation; reset if a new/best Scorer is found.
1015 * Marked volatile for safe lock-free access.
1016 */
1017 private volatile int genTimeRemaining;
1018 /**Get time remaining for this generation; reset if a new/best Scorer is found. */
1019 int getGenTimeRemaining() { return(genTimeRemaining); }
1020
/**Minimum time (in ms) spent working on one generation; strictly positive.
 * Setting a lower bound on generation time avoids wasting too much bandwidth
 * on the client fetching calibration data
 * and maximises the chance of a client finding an 'unusual'/rare Scorer,
 * but setting it too high risks wasting client CPU when no more progress can be made.
 * <p>
 * Derived from the global temporal slackness, capped at 15 minutes.
 * The product is formed in long arithmetic so that a large slackness value
 * cannot overflow int before the cap is applied; the capped result always fits in an int.
 */
public static final int MIN_GENERATION_MS = (int) Math.min(1000L * CoreConsts.DEFAULT_TEMPORAL_SLACKNESS_S, 15L * 60 * 1000);
static { assert(MIN_GENERATION_MS > 0); /* Check slackness was positive. */ }
1031
1032 /**Minimum time between reports back to the server; strictly positive but much less than MIN_GENERATION_MS.
1033 * This avoids wasteful 'chatter' back to base,
1034 * but ensures that we should always have time to flush our 'last' result back for a given generation.
1035 * <p>
1036 * Should be small enough to minimise any chance of significant work loss with unexpected shutdown.
1037 */
1038 public static final int MIN_SCORER_SERVER_REPORT_GAP_MS = MIN_GENERATION_MS / 4;
1039 static { assert((MIN_SCORER_SERVER_REPORT_GAP_MS > 0) && (MIN_SCORER_SERVER_REPORT_GAP_MS < MIN_GENERATION_MS/2)); }
1040
1041 /**Simple core worker routine, usually runs until JVM terminates.
1042 * Creates a communication tunnel back home,
1043 * then runs whatever work needs doing.
1044 * <p>
1045 * Will quit immediately if it cannot "call home" when it starts up.
1046 * Thereafter it will retry to overcome errors as necessary.
1047 * <p>
1048 * Not thread-safe (ie only one thread should call this at once for this instance).
1049 * <p>
1050 * Package-visible to be exposed only to this package.
1051 */
1052 synchronized void doWork()
1053 {
1054 final long workStartTime = System.currentTimeMillis();
1055
1056 // Create a test connection back to our server; quit if we can't do it.
1057 if(!PLAYBACK_NO_NET)
1058 {
1059 log.log("Connecting to server at: "+serverHostname+", full tunnel "+tunnelURL);
1060 try
1061 {
1062 final URL tunnel = new URL(tunnelURL);
1063 tunnel.openConnection().connect();
1064 log.log("Successfully connected to server at "+serverHostname+", so starting work...");
1065 }
1066 catch(final IOException e)
1067 {
1068 log.log("STOPPED: could not connect to server at "+serverHostname+", so quitting now.");
1069 return;
1070 }
1071 }
1072 else if(getCacheDir() == null)
1073 {
1074 log.log("STOPPED: no cache/playback directory, so quitting now.");
1075 return;
1076 }
1077
1078 // Note our previous-best Set of Scorers preserved from one population to the next.
1079 // Initially empty; never set to null nor to empty.
1080 final AtomicReference<Set<String>> localPreviousBest = new AtomicReference<Set<String>>(Collections.<String>emptySet());
1081
1082 // Combined (immutable) calibration set,
1083 // as a map from the Scorer base name to the accumulated calibration data.
1084 // If not null then we continue working with the same base Scorer.
1085 // We accumulate calibration data in here (overwriting older stuff)
1086 // until we exceed a given multiple of the minimum size
1087 // TODO or it gets ditched indicating memory stress.
1088 // This may not be null but its referent may be, eg after memory stress.
1089 // This helps us regulate memory use to suit the JVM that we are running in.
1090 // This needs to be polled relatively frequently to avoid being cleared simply through age...
1091 SoftReference<Pair<String, Pair<Set<String>, Map<Name.ExhibitShort, ScoreAndConf>>>> cumulativeCalibrationSet = new SoftReference<Pair<String,Pair<Set<String>,Map<Name.ExhibitShort,ScoreAndConf>>>>(null);
1092
1093 // Time we last reported our status to the user, and the interval between reports...
1094 final long beforeLoopStartTime = workStartTime;
1095 long lastReportTime = 0;
1096 final long REPORT_INTERVAL_MAX_MS = 120000L; // 2 mins...
1097
1098 // Time we last reported a Scorer to the server.
1099 // We allow the first result to be sent immediately as evidence of a working connection.
1100 long lastScorerReportTime = 0;
1101
1102 // Currently-running active worker threads; never negative.
1103 final AtomicInteger activeWorkers = new AtomicInteger();
1104
1105 // True iff the 'final' fractional thread is currently running.
1106 final AtomicBoolean runningFinalFractionalThread = new AtomicBoolean();
1107
1108 // An object that a completing worker can signal us on.
1109 final Object taskCompletedSignaler = new Object();
1110
1111 // Main work loop.
1112 while(!pleaseQuit)
1113 {
1114 // Collect some stats for display and use later in the loop.
1115 final long loopStart = System.currentTimeMillis();
1116
1117 // Check our cumulative calibration data set cache often
1118 // so that if it goes to null we take that as a sign of memory stress
1119 // (ie forced clearing of the SoftReference even though frequently dereferenced).
1120 // We use this in conjunction with another measure to reduce false positives.
1121 final boolean possibleMemoryStress = (cumulativeCalibrationSet.get() == null) &&
1122 MemoryTools.isMemoryStressed();
1123
1124 // CPU utilisation.
1125 final int availableCPUs = Runtime.getRuntime().availableProcessors();
1126 final int maxCPUsToUse = Math.min(availableCPUs, Math.max(1, ((availableCPUs * percentCPU) + 50) / 100));
1127 // Run at most one thread until population is seeded
1128 // and while there is any evidence of memory stress.
1129 final int targetWorkerCount = (possibleMemoryStress || (cb.getCache().size() == 0)) ? 1 : maxCPUsToUse;
1130
1131 // Are we due to fetch new calibration data
1132 // because we're running out of steam with the current lot?
1133 // Work on roughly the global "slackness" cycle or ~15 minutes of no progress,
1134 // emprically determined to be a reasonable time to try a new data set.
1135 final long timeOfNextFetchFromServer = Math.max(lastFetchedCalibrationData, cb.getLastNewScorerReport()) + MIN_GENERATION_MS;
1136 final boolean dueToFetchNewCalibrationData = (timeOfNextFetchFromServer <= loopStart);
1137 if(dueToFetchNewCalibrationData) { genTimeRemaining = 0; }
1138 else { genTimeRemaining = (int) Math.min(MIN_GENERATION_MS, timeOfNextFetchFromServer - loopStart); }
1139
1140 // We'll report some status info here if the time is right...
1141 if(loopStart - lastReportTime >= Math.min(REPORT_INTERVAL_MAX_MS, (loopStart-beforeLoopStartTime)/4))
1142 {
1143 lastReportTime = loopStart;
1144 log.log("INFO: status: population size = "+cb.getCache().size()+"; " +
1145 "work time remaining on current data set = "+(dueToFetchNewCalibrationData?0:((timeOfNextFetchFromServer-loopStart)/60000L))+"m; " +
1146 "CPU limit = "+targetWorkerCount+".");
1147 }
1148
1149 // Send back any pending/queued new/best Scorer to the server,
1150 // though only check if it is a decent while since our last attempt.
1151 // We're prepared to block (and avoid starting more work) while we try to send this,
1152 // as there's no point creating more values that we can't send.
1153 // We only try sending any a queued value at most once.
1154 if(loopStart > lastScorerReportTime + MIN_SCORER_SERVER_REPORT_GAP_MS)
1155 {
1156 final String queuedNewBestScorer = cb.getQueuedBestScorer();
1157 if(queuedNewBestScorer != null)
1158 {
1159 final boolean sent = sendNewBestScorer(tunnelURL, queuedNewBestScorer, log);
1160 if(sent)
1161 {
1162 // Report the Scorer as 'sent' only if we believe that it has been...
1163 scorersReported.incrementAndGet();
1164 try { lastSAC = cb.getCache().computeScorerWeighting(queuedNewBestScorer, true, null); }
1165 catch(final IOException e) { e.printStackTrace(); /* Whine, but absorb. */ }
1166 }
1167 // Postpone next Scorer report attempt whether or not this one was successful.
1168 lastScorerReportTime = System.currentTimeMillis();
1169 }
1170 }
1171
1172 // We need a calibration data set and the server's 'best Scorer' Set
1173 // to be able to do any useful work,
1174 // and we poll the server occasionally for a new/up-to-date set if no longer generating new results.
1175 if(dueToFetchNewCalibrationData ||
1176 cb.getServerBestScorerNAP().isEmpty() ||
1177 cb.getCache().getCalibrationExhibitsAndScores().isEmpty())
1178 {
1179 try
1180 {
1181 // See if we have any accumulated calibration data.
1182 final Pair<String, Pair<Set<String>, Map<Name.ExhibitShort, ScoreAndConf>>> extantCalibData = cumulativeCalibrationSet.get();
1183
1184 // Choose which of our known Scorer base names we will work with.
1185 final String baseName;
1186 // If we have any accumulated data then we stick with the same base name.
1187 if(extantCalibData != null)
1188 { baseName = extantCalibData.first; }
1189 // If we have no accumulated data, eg on the first pass, then we pick one at random.
1190 else
1191 {
1192 final List<String> baseNames = new ArrayList<String>(cb.getCache().getBaseScorersWithoutParameters());
1193 baseName = (baseNames.size() == 1) ? baseNames.get(0) : baseNames.get(Rnd.goodRnd.nextInt(baseNames.size()));
1194 }
1195 assert(baseName != null);
1196
1197
1198 // Fetch current 'best Scorer' and calibration data from server,
1199 // possibly adding to any retained calibration data from last time.
1200 // Assume any empty response implies that the server is busy or otherwise not ready.
1201 final List<String> bestScorers = getData(tunnelURL, ScorerCreator.DT_BESTSC, baseName);
1202 final Pair<Set<String>, Map<Name.ExhibitShort, ScoreAndConf>> calibrationSetFromServer = !bestScorers.isEmpty() ?
1203 getCalibrationSet(tunnelURL, baseName) :
1204 // Don't trouble the server as it may be busy...
1205 new Pair<Set<String>, Map<Name.ExhibitShort, ScoreAndConf>>(Collections.<String>emptySet(), Collections.<Name.ExhibitShort, ScoreAndConf>emptyMap());
1206 // Compute cumulative calibration data: discard if too big.
1207 final Set<String> cumulExhibits = new HashSet<String>(calibrationSetFromServer.first);
1208 final Map<Name.ExhibitShort, ScoreAndConf> cumulCS = new HashMap<Name.ExhibitShort, ScoreAndConf>(calibrationSetFromServer.second);
1209 if(extantCalibData != null)
1210 {
1211 cumulExhibits.addAll(extantCalibData.second.first);
1212 cumulCS.putAll(extantCalibData.second.second);
1213 assert(cumulExhibits.size() == cumulCS.size());
1214 if(cumulExhibits.size() > 2*ScorerCreator.SUGGESTED_CALIB_SET_SIZE)
1215 {
1216 // Gotten too big, retain just the values received from the server this time.
1217 cumulExhibits.clear();
1218 cumulCS.clear();
1219 cumulExhibits.addAll(calibrationSetFromServer.first);
1220 cumulCS.putAll(calibrationSetFromServer.second);
1221 }
1222 }
1223 // Immutable set of exhibits and calibration map.
1224 final Pair<Set<String>, Map<Name.ExhibitShort, ScoreAndConf>> calibrationSet =
1225 new Pair<Set<String>, Map<Name.ExhibitShort,ScoreAndConf>>(Collections.unmodifiableSet(cumulExhibits),
1226 Collections.unmodifiableMap(cumulCS));
1227 // Cache this value for next time.Name.ExhibitShort
1228 cumulativeCalibrationSet = new SoftReference<Pair<String,Pair<Set<String>,Map<Name.ExhibitShort,ScoreAndConf>>>>(
1229 new Pair<String,Pair<Set<String>,Map<Name.ExhibitShort,ScoreAndConf>>>(baseName, calibrationSet));
1230
1231
1232 // Only reset things if the calibration data or best Scorers are non-empty/null and actually changed...
1233 if(!calibrationSet.second.equals(cb.getCache().getCalibrationExhibitsAndScores()) ||
1234 !cb.getServerBestScorerNAP().equals(new HashSet<String>(bestScorers)))
1235 {
1236 // Capture the local currently-best Scorers.
1237 final Set<String> currentBest = cb.getCache().getCurrentScorersWithParameters(true);
1238 if(!currentBest.isEmpty()) { localPreviousBest.set(currentBest); }
1239
1240 log.log("[Calibration data set size "+calibrationSet.second.size()+", server best Scorers size "+bestScorers.size()+".]");
1241
1242 // We wait for all the current worker threads to stop,
1243 // then clear all the (stale) population results based on the old calibration set,
1244 // though we'll remember the best old Scorers that we had found
1245 // in case they are still good.
1246 while(workerThreadPoolDiscardable.getActiveCount() > 0)
1247 {
1248 log.log("[Waiting for workers to stop...]");
1249 try { Thread.sleep(chunkMs*2); }
1250 catch(final InterruptedException e) { return; /* Quit on interruption. */ }
1251 }
1252 // Discard the old population and cache...
1253 cb.reset();
1254
1255 cb.setServerBestScorerNAP(bestScorers);
1256 fakeTunnel.setSupportedExhibits(calibrationSet.first);
1257 cb.getCache().setCalibrationExhibitsAndScores(calibrationSet.second);
1258 assert(cb.getServerBestScorerNAP().size() == bestScorers.size());
1259 assert(cb.getCache().getCalibrationExhibitsAndScores().equals(calibrationSet.second));
1260
1261 // Get as many of the old best items evaluated as we can
1262 // in case they are still useful...
1263 final MiniScorerCacheImpl newCache = cb.getCache();
1264 for(final String snp : currentBest)
1265 {
1266 try { newCache.computeScorerWeighting(snp, true, null); }
1267 catch(final Exception e) { /* Absorb/ignore any errors. */ }
1268 }
1269 // Insert/evaluate the server's "best" Scorers here.
1270 for(final String snp : bestScorers)
1271 {
1272 try { newCache.computeScorerWeighting(snp, true, null); }
1273 catch(final Exception e) { /* Absorb/ignore any errors. */ }
1274 }
1275 }
1276 }
1277 // catch(final IOException e1)
1278 // { log.log("WARNING: fetch of calibration set failed: will retry"); }
1279 catch(final Exception e1)
1280 { log.log("WARNING: fetch of calibration set failed: will retry: "+e1.getMessage()); }
1281
1282 // If calibration data set and/or 'best Scorer' set (still) empty,
1283 // then sleep and then try again later.
1284 if(cb.getCache().getCalibrationExhibitsAndScores().isEmpty() ||
1285 cb.getServerBestScorerNAP().isEmpty())
1286 {
1287 log.log("[Waiting to fetch calibration data (currently "+cb.getCache().getCalibrationExhibitsAndScores().size()+") and/or 'best Scorers' (currently "+cb.getServerBestScorerNAP().size()+") from server...]");
1288 // Sleep a good while so as not to pester the server too much (mean 60s).
1289 // The server may be busy or overloaded if it hasn't responded with non-empty results.
1290 try { Thread.sleep(30000 + Rnd.fastRnd.nextInt(60000)); }
1291 // Quit if interrupted.
1292 catch(final InterruptedException e) { return; }
1293 continue;
1294 }
1295
1296 // If calibration set not empty, then put off next fetch attempt for a while.
1297 // We can keep on working with whatever we have.
1298 lastFetchedCalibrationData = System.currentTimeMillis();
1299 }
1300
1301 // If we've got enough workers busy then we can sleep a while.
1302 if(activeWorkers.get() >= targetWorkerCount)
1303 {
1304 // Sleep less than a full chunk to keep threads/CPU busy,
1305 // but not so short a time as to spin pointlessly when not busy.
1306 // However, hope to be woken immediately by a worker going idle.
1307 synchronized(taskCompletedSignaler)
1308 {
1309 try { taskCompletedSignaler.wait(Math.max(100, chunkMs / 2)); }
1310 // Return/quit if interrupted.
1311 catch(final InterruptedException e) { return; }
1312 continue;
1313 }
1314 }
1315
1316 // In the case where we are about to schedule the "last" thread/worker
1317 // and our pool size reaches our number of CPUs (and no 'last' thread is still running)
1318 // for example on a 1-CPU system or when wanting a very-nearly-full utilisation,
1319 // then we apply special rules for this final thread to use fractional CPU.
1320 final boolean scheduleFinalFractionalThread = (activeWorkers.get() >= targetWorkerCount-1) &&
1321 (!runningFinalFractionalThread.get());
1322 // assert((!scheduleFinalFractionalThread) || (targetWorkerCount >= availableCPUs));
1323
1324 // Make any on-the-fly adjustments of thread-pool parameters needed.
1325 // Adjust the number of pool CPUs if necessary.
1326 if(targetWorkerCount > workerThreadPoolDiscardable.getMaximumPoolSize())
1327 {
1328 // Increase the limits carefully, increasing the max size first.
1329 workerThreadPoolDiscardable.setMaximumPoolSize(targetWorkerCount);
1330 workerThreadPoolDiscardable.setCorePoolSize(targetWorkerCount);
1331 }
1332 else if(targetWorkerCount < workerThreadPoolDiscardable.getMaximumPoolSize())
1333 {
1334 // Decrease the limits carefully, reducing the core size first.
1335 workerThreadPoolDiscardable.setCorePoolSize(targetWorkerCount);
1336 workerThreadPoolDiscardable.setMaximumPoolSize(targetWorkerCount);
1337 }
1338
1339 // OK, time to do the actual work...
1340 final int chunkTime = chunkMs;
1341 final ScorerWork sw = cb.getScorerWork();
1342 workerThreadPoolDiscardable.execute(new Runnable(){
1343 public void run()
1344 {
1345 // Note if the 'special'/'last' thread is starting up.
1346 if(scheduleFinalFractionalThread)
1347 {
1348 assert(!runningFinalFractionalThread.get()); // FIXME has failed... Cannot be currently running 'last' thread.
1349 runningFinalFractionalThread.set(true);
1350 }
1351 // Worker starting...
1352 activeWorkers.incrementAndGet();
1353 try
1354 {
1355 final long startTime = System.currentTimeMillis();
1356 final long endTime = startTime + chunkTime;
1357 // Minimum time to run until, to keep working hard.
1358 final long minEndTime = startTime + (chunkTime/8);
1359
1360 // Special runs do extra housekeeping and may use fractional CPU.
1361 final boolean specialRun = scheduleFinalFractionalThread && Rnd.fastRnd.nextBoolean();
1362
1363 final MiniScorerCacheImpl cache = cb.getCache();
1364 // If population is not yet seeded (or on each "special run")
1365 // try to force up-to-date goodnesses for
1366 // the server-best and recent-locally-good Scorers first.
1367 if(specialRun || (cache.size() == 0))
1368 {
1369 for(final String snp : localPreviousBest.get())
1370 {
1371 try { cache.computeScorerWeighting(snp, false, "(local)"); }
1372 catch(final Exception e) { /* Absorb/ignore any errors. */ }
1373 }
1374 // Insert/evaluate the server's "best" Scorers here.
1375 for(final String snp : cb.getServerBestScorerNAP())
1376 {
1377 try { cache.computeScorerWeighting(snp, false, "(server)"); }
1378 catch(final Exception e) { /* Absorb/ignore any errors. */ }
1379 }
1380 }
1381
1382 // Do the work...
1383 // Some of the time, if this is the "last" thread,
1384 // then do some of the extra background/housekeeping work too.
1385 try { sw.doChunk(endTime, minEndTime, specialRun, null); }
1386 finally
1387 {
1388 // If this is the 'last' thread
1389 // then after working we must sleep enough
1390 // to ensure that we use a suitable fraction of the CPU
1391 // of the 'last' or only CPU in the system.
1392 // Note that we always do the calculations
1393 // as if there is exactly 1 CPU;
1394 // the error in doing so diminishes anyway with more CPUs.
1395 // We limit the sleep duration from above and below
1396 // so as not to be too perturbed by the odd I/O hiccup, etc,
1397 // and to give an OS time to schedule other work.
1398 if(scheduleFinalFractionalThread)
1399 {
1400 final long timeTaken = (System.currentTimeMillis() - startTime);
1401 assert(percentCPU > 0);
1402 final long sleepTime = Math.max(1, Math.min(10*chunkTime,
1403 ((100/percentCPU)-1) * timeTaken));
1404 try { Thread.sleep(sleepTime); }
1405 // Quit on thread interrupt...
1406 catch(final InterruptedException e) { return; }
1407 }
1408 }
1409 }
1410 finally
1411 {
1412 activeWorkers.decrementAndGet();
1413 // Note if the 'special'/'last' thread is finishing.
1414 if(scheduleFinalFractionalThread)
1415 {
1416 assert(runningFinalFractionalThread.get()); // Nothing else should have cleared the flag..
1417 runningFinalFractionalThread.set(false);
1418 }
1419 // Wake up main loop if sleeping waiting for worker to become idle.
1420 synchronized(taskCompletedSignaler) { taskCompletedSignaler.notifyAll(); }
1421 }
1422 }
1423 });
1424 }
1425 }
1426
/**Set to try to force the main loop to quit ASAP; never set back to false once set.
 * Set to true by {@link #shutdown()} as its first action.
 * Marked volatile for thread-safe lock-free access,
 * so that a write from the thread calling shutdown() is visible to the main loop
 * without any locking.
 */
private volatile boolean pleaseQuit;
1431
1432 /**Try to shut down quickly and gracefully and with minimal loss of work.
1433 * We try to flush any pending results back to the server.
1434 */
1435 public void shutdown()
1436 {
1437 // Flag main loop to quit.
1438 pleaseQuit = true;
1439
1440 // Attempt to interrupt() all running worker tasks to stop ASAP.
1441 // Note that in some environment (such as JWS) this may throw a SecurityException,
1442 // so we make sure that we absorb any such nasties and continue with an orderly exit.
1443 try { workerThreadPoolDiscardable.shutdownNow(); }
1444 catch(final Throwable t) { /* Absorb: we're trying to quit ASAP... */ }
1445
1446 // Send any queued-up Scorer back to the server if possible.
1447 final String queuedNewBestScorer = cb.getQueuedBestScorer();
1448 if(queuedNewBestScorer != null)
1449 {
1450 log.log("Reporting final Scorer...");
1451 sendNewBestScorer(tunnelURL, queuedNewBestScorer, log);
1452 scorersReported.incrementAndGet();
1453 try { lastSAC = cb.getCache().computeScorerWeighting(queuedNewBestScorer, true, null); }
1454 catch(final IOException e) { e.printStackTrace(); /* Whine, but absorb. */ }
1455 }
1456 }
1457 }