001 /*
002 Copyright (c) 1996-2012, Damon Hart-Davis
003 All rights reserved.
004
005 Redistribution and use in source and binary forms, with or without
006 modification, are permitted provided that the following conditions are
007 met:
008
009 * Redistributions of source code must retain the above copyright
010 notice, this list of conditions and the following disclaimer.
011
012 * Redistributions in binary form must reproduce the above copyright
013 notice, this list of conditions and the following disclaimer in the
014 documentation and/or other materials provided with the
015 distribution.
016
017 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
018 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
019 TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
020 PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
021 OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
022 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
023 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
024 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
025 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
026 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
027 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
028 */
029
030 package org.hd.d.pg2k.test.dev;
031
032 import java.io.File;
033 import java.io.IOException;
034 import java.io.PrintStream;
035 import java.nio.ByteBuffer;
036 import java.util.ArrayList;
037 import java.util.BitSet;
038 import java.util.HashMap;
039 import java.util.Iterator;
040 import java.util.List;
041 import java.util.Map;
042 import java.util.Properties;
043 import java.util.Random;
044 import java.util.Timer;
045 import java.util.TimerTask;
046 import java.util.Vector;
047
048 import junit.framework.TestCase;
049
050 import org.hd.d.pg2k.svrCore.AbstractSimpleLogger;
051 import org.hd.d.pg2k.svrCore.AllExhibitImmutableData;
052 import org.hd.d.pg2k.svrCore.AllExhibitProperties;
053 import org.hd.d.pg2k.svrCore.ExhibitStaticAttr;
054 import org.hd.d.pg2k.svrCore.ExhibitThumbnails;
055 import org.hd.d.pg2k.svrCore.FileTools;
056 import org.hd.d.pg2k.svrCore.GenUtils;
057 import org.hd.d.pg2k.svrCore.Name;
058 import org.hd.d.pg2k.svrCore.Name.ExhibitFull;
059 import org.hd.d.pg2k.svrCore.SimpleLoggerIF;
060 import org.hd.d.pg2k.svrCore.Stratum;
061 import org.hd.d.pg2k.svrCore.datasource.ExhibitDataFileSource;
062 import org.hd.d.pg2k.svrCore.datasource.SimpleExhibitPipelineFilter;
063 import org.hd.d.pg2k.svrCore.datasource.SimpleExhibitPipelineIF;
064 import org.hd.d.pg2k.svrCore.datasource.simpleCache.ExhibitDataSimpleCache;
065 import org.hd.d.pg2k.svrCore.props.GenProps;
066 import org.hd.d.pg2k.svrCore.vars.BasicVarMgr;
067 import org.hd.d.pg2k.svrCore.vars.EventPeriod;
068 import org.hd.d.pg2k.svrCore.vars.EventVariableValue;
069 import org.hd.d.pg2k.svrCore.vars.SimpleVariableDefinition;
070 import org.hd.d.pg2k.svrCore.vars.SimpleVariableValue;
071 import org.hd.d.pg2k.svrCore.vars.SystemVariables;
072
073 /**Test the ExhibitSimpleDataCache.
074 */
075 public final class SimpleCacheTest extends TestCase
076 {
077 public SimpleCacheTest(final String name)
078 {
079 super(name);
080 }
081
082 private final SimpleLoggerIF logger = new AbstractSimpleLogger(){
083 public void log(final String message) { Main.getOut().println(message); }
084 };
085
086 /**Temporary directory in which we build a cache; null if not needed.
087 * We only chose a directory name
088 * (which we store in absolute form in case we change working directory)
089 * and delete any existing content before and after the tests run.
090 * The test routines must create the directory if they want it.
091 */
092 private File tempDir;
093
094 /**Timer that we can use to run pipelines; never null during tests. */
095 private Timer t;
096
097 /**Do any setup needed for the tests. */
098 @Override
099 protected void setUp()
100 {
101 // Work in a temporary subdirectory of the current directory.
102 tempDir = (new File("CacheTst.tmp")).getAbsoluteFile();
103
104 // Make sure that we obliterate any extant content.
105 final File f = tempDir;
106 try { FileTools.rmRecursively(f); }
107 catch(final IOException e) { e.printStackTrace(Main.getErr()); }
108
109 t = new Timer();
110 }
111
112 /**Do any clear-up needed after the tests. */
113 @Override
114 protected void tearDown()
115 {
116 final File f = tempDir;
117 if(f != null)
118 {
119 try { FileTools.rmRecursively(f); }
120 catch(final IOException e) { e.printStackTrace(Main.getErr()); }
121 }
122
123 // Kill the timer and free resources...
124 t.cancel();
125 t = null;
126 }
127
128 /**Dummy data source.
129 * Has one or more fake "virtual" exhibits,
130 * each of which contain some synthetic virtual data.
131 * <p>
132 * This can be wrapped to make a sort of "mock object"
133 * that can flag certain undesirable access patterns as test failures.
134 * <p>
135 * This returns data based on the exhibit data supplied to the constructor.
136 * <p>
137 * This never returns thumbnails.
138 */
139 public static class DummyDataSource implements SimpleExhibitPipelineIF
140 {
141 /**Construct a new instance with the given properties.
142 * @param aep set of exhibit properties; never null
143 * @param gp set of GenProps; may be null
144 */
145 public DummyDataSource(final AllExhibitProperties aep,
146 final GenProps gp)
147 {
148 if(aep == null)
149 { throw new IllegalArgumentException(); }
150
151 this.aep = aep;
152 this.gp = gp;
153 }
154
155 /**Construct a new instance with empty properties. */
156 public DummyDataSource()
157 { this(new AllExhibitProperties(), new GenProps()); }
158
159 /**The set of exhibit properties; never null. */
160 private final AllExhibitProperties aep;
161
162 /**Test set of GenProps; may be null. */
163 private final GenProps gp;
164
165 public ExhibitStaticAttr getStaticAttr(final ExhibitFull name)
166 throws IOException
167 {
168 return(aep.aeid.getStaticAttr(name));
169 }
170
171 /**Minimalist implementation; not really the spirit of the contract.
172 * May or may not bother to check arguments
173 * or even fill in the data buffer.
174 */
175 public void getRawFile(final ByteBuffer buf,
176 final Name.ExhibitFull exhibitName, final int position, final boolean dontCache)
177 throws IOException
178 {
179 }
180
181 public AllExhibitImmutableData getAllExhibitImmutableData(final long oldStamp)
182 throws IOException
183 {
184 if(oldStamp != aep.aeid.timestamp)
185 { return(aep.aeid); }
186
187 // Caller has up-to-date copy.
188 return(null);
189 }
190
191 public AllExhibitProperties getAllExhibitProperties(final long oldHash)
192 throws IOException
193 {
194 if(oldHash != aep.longHash)
195 { return(aep); }
196
197 // Caller has up-to-date copy.
198 return(null);
199 }
200
201 /**Returns fixed (supplied) set of GenProps. */
202 public GenProps getGenProps(final long oldStamp)
203 throws IOException
204 {
205 return(gp);
206 }
207
208 /**Always returns empty Properties set. */
209 public Properties getGenSecProps(final long oldStamp)
210 throws IOException
211 {
212 return(new Properties());
213 }
214
215 /**Always returns null, ie we have no thumbnails. */
216 public ExhibitThumbnails getThumbnails(final ExhibitFull name, final boolean create)
217 throws IOException
218 {
219 return(null);
220 }
221
222 /**Basic "endpoint" set of variables. */
223 private final BasicVarMgr vars = new BasicVarMgr(GenUtils.nullLogger, true);
224
225 public void setVariable(final SimpleVariableValue newValue)
226 throws IOException
227 {
228 vars.setVariable(newValue);
229 }
230
231 public int setVariables(final SimpleVariableValue[] newValues)
232 throws IOException
233 {
234 return(vars.setVariables(newValues));
235 }
236
237 public SimpleVariableValue getVariable(final SimpleVariableDefinition var)
238 throws IOException
239 {
240 return(vars.getVariable(var));
241 }
242
243 public SimpleVariableValue[] getVariables(final long changedSince)
244 throws IOException
245 {
246 return(vars.getVariables(changedSince));
247 }
248
249 public EventVariableValue getEventValue(final SimpleVariableDefinition def,
250 final EventPeriod intervalSelector,
251 final boolean current)
252 {
253 return(vars.getEventValue(def, intervalSelector, current));
254 }
255
256 public EventVariableValue[] getEventValues(final SimpleVariableDefinition def,
257 final EventPeriod intervalSelector,
258 final long intervalNumber,
259 final BitSet whichValues)
260 {
261 return(vars.getEventValues(def, intervalSelector, intervalNumber, whichValues));
262 }
263
264 /**Synchronise with upstream values.
265 * Does nothing in this implementation.
266 */
267 public void syncVariables(final boolean force)
268 throws IOException
269 {
270 }
271
272 /**Get requested Properties selected by key and versionID.
273 * Fetches a Properties set unconditionally (versionID == -1)
274 * else if the versionID presented is not current.
275 *
276 * @param key selector (with possible embedded sub-key)
277 * for desired properties set; never null
278 * @param versionID if -1 then map is always returned if available,
279 * else must be non-negative and null is returned if the versionID
280 * presented matches that of the current version
281 * (ie if the caller has presumably got the up-to-date version);
282 * may be a timestamp or a hash or other value,
283 * and by convention is zero only for an empty properties set
284 *
285 * @return null, or Properties map guaranteed to contain only
286 * String keys and values
287 */
288 public java.util.Properties getProperties(final PropsKey key,
289 final long versionID)
290 throws IOException
291 {
292 throw new IOException("NOT IMPLEMENTED");
293 }
294
295 /**Polling does nothing in this implementation. */
296 public void poll(final GenProps gp)
297 throws IOException
298 {
299 }
300
301 public Stratum getStratum()
302 throws IOException
303 {
304 throw new IOException("NOT IMPLEMENTED"); // FIXME: maybe should return 'UNKNOWN'?
305 }
306
307 /**Does nothing in this implementation. */
308 public void destroy()
309 {
310 }
311 }
312
313 /**Internal test that DummyDataSource behaves as expected.
314 * Essentially we ensure that basic routines return
315 * legitimate non-null values.
316 */
317 public static void testDummyDataSource()
318 throws Exception
319 {
320 // Ensure that we can create a dummy source...
321 final DummyDataSource dds = new DummyDataSource();
322
323 assertNotNull("Fetched GenProps should not be null",
324 dds.getGenProps(-1));
325 assertNotNull("Fetched getAllExhibitProperties should not be null",
326 dds.getAllExhibitProperties(-1));
327 assertNotNull("Should be able to get system ID for variables",
328 dds.getVariable(SystemVariables.LOCAL_SYS_ID));
329 }
330
331 /**Check that we can construct a cache wrapped round a (dummy) source.
332 * And check that we can do some basic superficial operations on it.
333 * <p>
334 * Look elsewhere for a thorough test to behaviour.
335 */
336 public void testBasicCacheBehaviour()
337 throws Exception
338 {
339 assertTrue("Must be no extant temporary/test cache dir.", !tempDir.exists());
340
341 // Check that trying to remove a cache from a non-existent directory
342 // does not fail in an ugly way.
343 ExhibitDataSimpleCache.rmCache(tempDir);
344
345 // Check that a null argument to rmCache() is rejected.
346 try
347 {
348 ExhibitDataSimpleCache.rmCache(null);
349 fail("rmCache() should reject null argument");
350 }
351 catch(final IllegalArgumentException e) { /* Correctly rejected. */ }
352
353 // Set up new, empty, test cache directory.
354 tempDir.mkdirs();
355 assertTrue("Must be able to create temporary/test cache dir.", tempDir.isDirectory());
356
357 // Check that removing an empty cache does not fail.
358 ExhibitDataSimpleCache.rmCache(tempDir);
359 // Check that removing an empty cache directory does not
360 // remove the top-level (containing) directory.
361 assertTrue("Removing cache should not remove containing temporary/test cache dir.", tempDir.isDirectory());
362
363 // Check that we disallow creating a cache with a null up-stream source.
364 try
365 {
366 ExhibitDataSimpleCache.cacheFactory(null, tempDir, logger);
367 fail("should not allow creation of cache with null data source");
368 }
369 catch(final IllegalArgumentException e) { /* Rejected as expected. */ }
370
371 // Create a dummy data source...
372 final DummyDataSource dds = new DummyDataSource();
373
374 // Check that we disallow creating a cache with a null cache dir.
375 try
376 {
377 ExhibitDataSimpleCache.cacheFactory(dds, null, logger);
378 fail("should not allow creation of cache with null cache dir");
379 }
380 catch(final IllegalArgumentException e) { /* Rejected as expected. */ }
381
382 // Attempt to create a new cache
383 // wrapped around the dummy source...
384 final ExhibitDataSimpleCache c1 = ExhibitDataSimpleCache.cacheFactory(
385 dds, tempDir, logger);
386 assertNotNull("Failed to create a cache instance", c1);
387
388 // Check that some basic superficial operations work.
389 assertNotNull("Fetched GenProps should not be null",
390 c1.getGenProps(-1));
391 assertNotNull("Fetched getAllExhibitProperties should not be null",
392 c1.getAllExhibitProperties(-1));
393 assertNotNull("Should be able to get system ID for variables",
394 c1.getVariable(SystemVariables.LOCAL_SYS_ID));
395
396 // Check that system ID has come properly from upstream.
397 assertSame("System ID should have come from upstream",
398 dds.getVariable(SystemVariables.LOCAL_SYS_ID),
399 c1.getVariable(SystemVariables.LOCAL_SYS_ID));
400 }
401
402 /**Test that variables work properly with the cache.
403 * This is just to make sure that its semantics are correct.
404 */
405 public void testCacheVarHandling()
406 throws Exception
407 {
408 // Set up new, empty, test cache directory.
409 tempDir.mkdirs();
410
411 // Create a dummy data source...
412 final DummyDataSource dds1 = new DummyDataSource();
413
414 // Attempt to create a new cache (c1)
415 // wrapped around the dummy source...
416 final ExhibitDataSimpleCache c1 = ExhibitDataSimpleCache.cacheFactory(
417 dds1, tempDir, logger);
418 assertNotNull("Failed to create a cache instance", c1);
419
420 // Start polling at a typical rate...
421 final TimerTask tt1 = new TimerTask(){
422 @Override
423 public void run()
424 {
425 final GenProps gp = c1.getGenProps(-1);
426 try { c1.poll(gp); }
427 catch(final Throwable t) { t.printStackTrace(Main.getErr()); }
428 }
429 };
430 t.schedule(tt1, 0L, 100 + rnd.nextInt(700));
431
432 // Spend at most a few seconds testing behaviour...
433 final SimpleExhibitPipelineIF pipeline[] = { c1, dds1 };
434 SystemVariablesTest.singlePipelineTest(pipeline,
435 10000 + System.currentTimeMillis()); // 10s test.
436
437 // Stop poll activity for c1; it effectively dies now.
438 tt1.cancel();
439 }
440
441 /**Test cacheing and resilience after restart.
442 * This test attempts to test that precacheing works
443 * and that the cache remains intact after a restart.
444 * <p>
445 * This relies on the test data set being present
446 * and being easily small enough to fit into cache.
447 */
448 public void testCachePersistence()
449 throws Exception
450 {
451 // Set up new, empty, test cache directory.
452 tempDir.mkdirs();
453
454 // Combined "trace" of all relevant events so far, in order.
455 final List<RawDataAccessMonitor.GetRawDataEvent> events = new ArrayList<RawDataAccessMonitor.GetRawDataEvent>();
456
457 // Target time to stop first round of tests.
458 final long stopBy1 = System.currentTimeMillis() + 15000L; // 15s...
459
460 // Running count of cached exhibits...
461 // Must never go down
462 // (on the assumption that our test data set is
463 // MUCH smaller than our configured cache size).
464 int exhibitsCached = 0;
465
466 // TEST FIRST INSTANCE OF CACHE...
467 // Starting with an absolutely fresh, empty cache area...
468 Main.getOut().println("Creating/testing new/empty cache...");
469 exhibitsCached = checkCacheInstance(true, exhibitsCached, events,
470 stopBy1);
471
472 if(false) // CODE DISABLED AS CURRENTLY SEEMS TO BE BROKEN
473 {
474 // Target time for remaining (re)tests...
475 final long stopBy = System.currentTimeMillis() + 30000L;
476
477 // At least once test that any new cache sees all the previously-cached
478 // data, ie does not lose it on shut-down or can reconstruct it.
479 for(int i = 5; --i >= 0; ) // Try at most a few times...
480 {
481 Main.getOut().println("[Exhibits partly/fully cached: "+exhibitsCached+"...]");
482
483 // TEST NEW INSTANCE OF CACHE OBJECT IN SAME DISC AREA...
484 // Make sure that it does not lose data from the previous run.
485 Main.getOut().println("Re-testing exiting cache...");
486 exhibitsCached = checkCacheInstance(false, exhibitsCached, events,
487 stopBy);
488
489 if(System.currentTimeMillis() >= stopBy) { break; }
490 }
491 }
492 }
493
494 /**Thread-safe monitor of data-read activity.
495 * An instance of this class can be inserted into a pipeline
496 * to record data-read activity.
497 * <p>
498 * This filter stage does not generally impede normal operation,
499 * but simply records the data access pattern for processing.
500 */
501 static final class RawDataAccessMonitor extends SimpleExhibitPipelineFilter
502 {
503 /**Construct instance wrapped round upstream source.
504 * @param upstream datasource; never null
505 */
506 RawDataAccessMonitor(final SimpleExhibitPipelineIF upstream)
507 { super(upstream); }
508
509 /**A List of GetRawDataEvent values in the order they occurred.
510 * A Vector is used for thread-safety.
511 */
512 private final List<GetRawDataEvent> events = new Vector<GetRawDataEvent>();
513
514 /**A "getRawData()" event.
515 * Immutable.
516 * <p>
517 * Records the time, stack trace, exhibit name, start and length.
518 */
519 static final class GetRawDataEvent
520 {
521 public GetRawDataEvent(final Name.ExhibitFull name, final int start, final int len)
522 {
523 this.name = name;
524 this.start = start;
525 this.len = len;
526 }
527
528 /**Time at which event happened. */
529 public final long timestamp = System.currentTimeMillis();
530
531 /**Stack trace when event happened. */
532 private final Throwable th = new Throwable();
533 /**Print the stack trace to the specified stream. */
534 public void printStackTrace(final PrintStream ps)
535 { th.printStackTrace(ps); }
536
537 /**Name of exhibit concerned. */
538 public final Name.ExhibitFull name;
539 /**Start offset of read within exhibit. */
540 public final int start;
541 /**Length of read within exhibit. */
542 public final int len;
543 }
544
545 /**Read a chunk of the raw exhibit binary into the given buffer.
546 * This passes the request upstream without modification,
547 * but records it in a trace buffer <em>before</em> passing it upstream.
548 * The trace is taken before in case the fulfillment of the request
549 * happens to be vetoed by the data source; it is the requests that
550 * we are interested in here.
551 */
552 @Override
553 public void getRawFile(final ByteBuffer buf,
554 final Name.ExhibitFull exhibitName, final int position, final boolean dontCache)
555 throws IOException
556 {
557 events.add(new GetRawDataEvent(exhibitName, position, buf.limit() - buf.position()));
558 super.getRawFile(buf, exhibitName, position, dontCache);
559 }
560
561 /**Retrieve the List of events in order; never null though possibly empty.
562 * This returns a private snapshot of the event list.
563 */
564 public List<GetRawDataEvent> getEventList()
565 {
566 // Hold a lock on events while we take the snapshot...
567 synchronized(events)
568 {
569 return(new ArrayList<GetRawDataEvent>(events));
570 }
571 }
572 }
573
574 /**Check behaviour of single cache instance, mainly for persistence.
575 * This checks for different things on the first and subsequent runs,
576 * but in all cases makes sure that the cache behaves properly
577 * and does not lose data upon restart,
578 * whether it gets shut down properly or not.
579 * <p>
580 * The number of exhibits partially or fully cached must not go down
581 * and must be greater than zero when we return if there are exhibits.
582 * <p>
583 * This tests the cache's use of its upstream data source
584 * knowing that this should read and cache exhibit prefixes
585 * (ie it will/may only cache activity that reads starting at the
586 * beginning of an exhibit, as a normal download should),
587 * and that having read such a prefix it should never have to go
588 * back to the underlying source for anything within that prefix again,
589 * even when the cache is shut down an restarted,
590 * assuming that our cache area is big enough to hold all the exhibits.
591 *
592 * @param firstRun if true, this is the first run on a new cache area
593 * @param prevCachedCount number of exhibits previously cached;
594 * non-negative
595 * @param eventsFromTestStart List of events collected so far in order;
596 * we will append to this (the content is opaque to the caller);
597 * must be non-null and appendable-to
598 * @param stopBy target time to stop by
599 *
600 * @return number of exhibits partially or fully cached; non-negative
601 *
602 * @throws IOException
603 * @throws InterruptedException
604 */
605 private int checkCacheInstance(final boolean firstRun,
606 final int prevCachedCount,
607 final List<RawDataAccessMonitor.GetRawDataEvent> eventsFromTestStart,
608 final long stopBy)
609 throws IOException,
610 InterruptedException
611 {
612 // Set up a simple instance of a file data source.
613 final ExhibitDataFileSource edfs = new ExhibitDataFileSource(logger);
614
615 // Set up a data-access monitoring stage.
616 // We use this to monitor the effectiveness of the cache.
617 final RawDataAccessMonitor rdam = new RawDataAccessMonitor(edfs);
618
619 // Set up a simple cache instance.
620 final ExhibitDataSimpleCache cache = ExhibitDataSimpleCache.cacheFactory(
621 rdam, tempDir, logger);
622 assertNotNull("Failed to create"+(firstRun?" first":"")+" cache instance", cache);
623
624
625 // Check some features of the cache before the first poll()...
626
627 // Collect the raw exhibit info from the underlying file system.
628 final AllExhibitProperties aepRaw = edfs.getAllExhibitProperties(-1);
629 // When re-using an existing cache directory,
630 // we assume that the exhibit properties are immediately available
631 // before even a single poll.
632 if(!firstRun)
633 {
634 assertEquals("Exhibit properties via cache must match those fetched directly even before first poll() is cache is not new",
635 aepRaw, cache.getAllExhibitProperties(-1));
636 }
637
638 // Set up data pipeline.
639 final SimpleExhibitPipelineIF pipeline[] = { cache, rdam, edfs };
640 final SimpleExhibitPipelineIF top = pipeline[0];
641
642
643 // Start poll()ing the pipeline at a typical rate...
644 // Vary the poll rate randomly a little
645 // in order to demonstrate that we are not very sensitive to it.
646 // Note that cache implementation does not necessarily
647 // use all poll() calls for real processing...
648 final TimerTask tt = new TimerTask(){
649 @Override
650 public void run()
651 {
652 try
653 {
654 final GenProps gp = top.getGenProps(-1);
655 top.poll(gp);
656 }
657 catch(final Exception e) { e.printStackTrace(Main.getErr()); }
658 }
659 };
660 t.schedule(tt, 0L, 100 + rnd.nextInt(700));
661
662
663 // Make sure that we can see all exhibit data via the cache
664 // just like when we get it directly...
665 // We have forced one poll(),
666 // and that one poll() should be enough even on the first run
667 // to ensure that all data is up-to-date
668 // unless there is some sort of I/O error...
669 AllExhibitProperties aepViaCache = cache.getAllExhibitProperties(-1);
670 assertNotNull("Exhibit properties (aep) via cache must not be null",
671 aepViaCache);
672 // Wait until aep (and GenProps) are not at their default values,
673 // ie the cache has had time to fetch them.
674 // This must not take more than a few seconds however...
675 if(aepRaw.aeid.size() > 0)
676 {
677 // Wait a few seconds for the data to get loaded...
678 for(int i = 30; --i >= 0; )
679 {
680 aepViaCache = cache.getAllExhibitProperties(-1);
681 if(aepViaCache.aeid.size() > 0) { break; }
682 System.out.println("Waiting for cache to be reloaded...");
683 Thread.sleep(2000L);
684 }
685 assertTrue("Exhibit properties (aep) via cache must not be empty if underlying ones are not",
686 aepViaCache.aeid.size() > 0);
687 }
688 assertEquals("Exhibit properties (aep) via cache must match those fetched directly",
689 aepRaw, aepViaCache);
690 assertEquals("Exhibit properties (aeid) via cache must match those fetched directly",
691 edfs.getAllExhibitImmutableData(-1),
692 cache.getAllExhibitImmutableData(-1));
693
694 // Check that data is seen consistently on either side of the cache.
695 TunnelTest.checkExhibitDataConsistency(cache, edfs,
696 false, // Not a tunnel...
697 stopBy);
698
699 // Read the first byte of each exhibit
700 // to ensure that something should be cached for each.
701 // Have the "dontCache" flag false to try to ensure we cache the data.
702 final ByteBuffer buf = ByteBuffer.allocate(1);
703 for(final Name.ExhibitFull name: aepViaCache.aeid.getAllExhibitNamesSorted())
704 {
705 buf.clear();
706 cache.getRawFile(buf, name, 0, false);
707 }
708
709 // Now check the cached exhibit count...
710 cache.syncVariables(true); // Force our view to be up-to-date...
711 final SimpleVariableValue cc =
712 cache.getVariable(SystemVariables.ExhibitDataSimpleCache_CACHED_EXHIBIT_COUNT);
713 assertNotNull("Must be able to retrieve cached exhibit count",
714 cc);
715 assertNotNull("Must be able to retrieve non-null cached exhibit count",
716 cc.getValue());
717 final int cachedCountVar = ((Number) cc.getValue()).intValue();
718 assertTrue("Must be able to retrieve non-negative cached exhibit count",
719 cachedCountVar >= 0);
720
721 // The number of (partly/fully) cached exhibits must not go down;
722 // at worst we must be able to retrieve the previous
723 // possibly-slightly-stale meta-data from disc
724 // and/or reconstruct it from the data we actually find in place.
725 final int cachedCount = cache.getLiveCachedExhibitCount();
726 assertTrue("Number of cached exhibits must not go down",
727 cachedCount >= prevCachedCount);
728
729 // Get the live raw count of exhibits partly/fully cached.
730 if(aepRaw.aeid.size() > 0)
731 {
732 assertTrue("The number of exhibits cached after some data has been retrieved must be greater than zero if there are exhibits",
733 cachedCount > 0);
734 }
735
736
737 // if(firstRun)
738 // {
739 // // Test variable-handling behaviour...
740 // // (We must be calling poll() by now.)
741 // SystemVariablesTest.singlePipelineTest(pipeline, stopBy);
742 // }
743
744
745 // Kill off this instance.
746 // It will get no more poll() calls so is effectively dead.
747 tt.cancel();
748
749 // If we are feeling nice then we'll properly destroy() it
750 // to allow data to be flushed to disc.
751 /* if(rnd.nextBoolean()) */ { cache.destroy(); }
752
753 // Now examine the upstream activity trace from the current run...
754 // We make sure that the following bad/marginal things do not happen:
755 // * No requests for non-existent exhibits.
756 // * No requests from beyond exhibit bounds.
757 // * No zero-length (or other badly-formed) requests...
758 // Event count before this run.
759 final int oldEventCount = eventsFromTestStart.size();
760 Main.getOut().println("[getRawData() events before this run: "+oldEventCount+".]");
761 final List<RawDataAccessMonitor.GetRawDataEvent> eventsThisTime = rdam.getEventList();
762 Main.getOut().println("[getRawData() events counted this run: "+eventsThisTime.size()+".]");
763 for(final Iterator<RawDataAccessMonitor.GetRawDataEvent> it = eventsThisTime.iterator(); it.hasNext(); )
764 {
765 final RawDataAccessMonitor.GetRawDataEvent e =
766 it.next();
767
768 // Look for obviously badly-formed requests.
769 if((e.name == null) || (e.start < 0) || (e.len < 0))
770 { fail("Badly formed getRawData() request detected (null/-ve args)."); }
771 if(e.len == 0)
772 { fail("Marginal/deprecated getRawData() request detected (len==0)."); }
773 final ExhibitStaticAttr staticAttr = aepRaw.aeid.getStaticAttr(e.name);
774 if(staticAttr == null)
775 { fail("Bad getRawData() request detected (no such exhibit)."); }
776 if((e.start >= staticAttr.length))
777 { fail("Bad getRawData() request detected (start >= length)."); }
778 if((e.start + e.len > staticAttr.length))
779 { fail("Bad getRawData() request detected (start+len > length)."); }
780 }
781
782 // Append this run's events to the running tally.
783 eventsFromTestStart.addAll(eventsThisTime);
784
785 // Check that the cache has not "forgotten" anything after a restart.
786 // Ensure that exhibit prefixes that have already been fetched
787 // are not refetched...
788 // But we do it on all runs in case we are generally amnesiac...
789 // if(!firstRun)
790 // if(true) { System.err.println("WARNING: skipping refetch tests"); } else
791 {
792 // Map from full exhibit name
793 // to high-water mark that should have been cached (Long).
794 final Map<Name.ExhibitFull,Long> marks = new HashMap<Name.ExhibitFull, Long>();
795
796 int eventNum = 1;
797 for(final Iterator<RawDataAccessMonitor.GetRawDataEvent> it = eventsFromTestStart.iterator(); it.hasNext(); ++eventNum)
798 {
799 final RawDataAccessMonitor.GetRawDataEvent e =
800 it.next();
801
802 // Get any extant entry for the exhibit
803 // that is the subject of this event...
804 // If no mark so far then we have cached zero bytes so far.
805 final Long cachedSoFarL = marks.get(e.name);
806 final long cachedSoFar = (cachedSoFarL == null) ?
807 0 : cachedSoFarL.longValue();
808
809 // Create a description of the current event,
810 // just in case we need to print it...
811 final StringBuilder sb = new StringBuilder(32);
812 if(eventNum >= oldEventCount)
813 { sb.append("NEW,"); }
814 sb.append("event#=").append(eventNum);
815 sb.append(",cachedSoFar=").append(cachedSoFar);
816 sb.append(",e.start=").append(e.start);
817 sb.append(",e.len=").append(e.len);
818 sb.append(",e.name=").append(e.name);
819 final String description = sb.toString();
820
821 // If any part of this request should have been
822 // locally satisfied from cache,
823 // it is an error.
824 if(e.start < cachedSoFar)
825 {
826 Main.getErr().println("ERROR: invalid refetch: " + description);
827 e.printStackTrace(Main.getErr());
828 fail("Re-fetched data from source that should still be in cache: ("+description+")");
829 }
830
831 // If this request starts with a gap from the
832 // currently-cached prefix then it is a random access
833 // and will not be cached...
834 if(e.start > cachedSoFar)
835 {
836 // Main.getOut().println("Random read assumed not added to cache... ("+description+")");
837 continue;
838 }
839
840 // If this read extends the cached prefix,
841 // update the mark entry for this exhibit.
842 final int readTo = e.start + e.len;
843 if(readTo > cachedSoFar)
844 {
845 // Main.getOut().println("Extended cache... ("+description+")");
846 marks.put(e.name, new Long(readTo));
847 }
848 }
849 }
850
851 // Return cached exhibit count.
852 return(cachedCount);
853 }
854
855
856
857 /**Thread-safe monitor of data-read activity.
858 * An instance of this class can be inserted into a pipeline
859 * to record or modify data-read activity.
860 */
861 static final class DataConcurrencyAccessMonitor extends SimpleExhibitPipelineFilter
862 {
863 DataConcurrencyAccessMonitor(final SimpleExhibitPipelineIF upstream)
864 { super(upstream); }
865
866 /**Once true, threads do not block in getRawFile(); volatile so as not to need the mutex. */
867 private volatile boolean dontBlock;
868
869 /**Count of blocked thread in getRawData().
870 * Must only be accessed under protection of the notifier mutex.
871 */
872 private int blockedThreadsCount;
873
874 /**Get count of blocked thread in getRawData(); non-negative. */
875 private int getBlockedThreadsCount() { synchronized(notifier) { return(blockedThreadsCount); } }
876
877 /**Used to notify/wake blocked threads. */
878 private final Object notifier = new Object();
879
880 /**Call this to release any waiting/blocked threads in getRawFile(). */
881 void release()
882 {
883 dontBlock = true;
884 synchronized(notifier)
885 {
886 notifier.notifyAll(); // Wake everyone up...
887 }
888 }
889
890 /**Intercept and block calls as if blocked waiting for disc access. */
891 @Override
892 public void getRawFile(final ByteBuffer buf, final Name.ExhibitFull exhibitName, final int position, final boolean dontCache) throws IOException
893 {
894 // Block until released.
895 try
896 {
897 synchronized(notifier)
898 {
899 //(new Throwable("BLOCKING: blockedThreadsCount = " + blockedThreadsCount)).printStackTrace(System.out);
900 ++blockedThreadsCount;
901 while(!dontBlock)
902 { notifier.wait(501 + rnd.nextInt(501)); }
903 --blockedThreadsCount;
904 //(new Throwable("UNBLOCKING: blockedThreadsCount = " + blockedThreadsCount)).printStackTrace(System.out);
905 }
906 }
907 catch(final InterruptedException e)
908 {
909 throw new IllegalMonitorStateException("Unexpected wakeup");
910 }
911
912 // Really do the call...
913 super.getRawFile(buf, exhibitName, position, dontCache);
914 }
915 }
916
917
918 /**Test that cache has indefinite (file read) concurrency.
919 * The aim is to demonstrate that we can scale well to serve exhibit data,
920 * eg on a highly-threadable system such as Niagara/CoolThreads.
921 */
922 public void testCacheFileReadConcurrency()
923 throws Exception
924 {
925 // Pick how many threads that we will try to push through the cache.
926 final int nThreads = 30 + rnd.nextInt(50);
927
928 // Set up new, empty, test cache directory.
929 tempDir.mkdirs();
930
931 // Set up a simple instance of a file data source.
932 final ExhibitDataFileSource edfs = new ExhibitDataFileSource(logger);
933
934 // Set up a data-access monitoring stage.
935 // We use this to monitor the effectiveness of the cache.
936 final DataConcurrencyAccessMonitor dcam = new DataConcurrencyAccessMonitor(edfs);
937
938 // Set up a simple cache instance.
939 final ExhibitDataSimpleCache cache = ExhibitDataSimpleCache.cacheFactory(
940 dcam, tempDir, logger);
941 assertNotNull("Failed to create cache instance", cache);
942
943 // Collect the raw exhibit info from the underlying file system.
944 final AllExhibitProperties aepRaw = edfs.getAllExhibitProperties(-1);
945
946 // Start poll()ing the pipeline at a typical rate...
947 // Vary the poll rate randomly a little
948 // in order to demonstrate that we are not very sensitive to it.
949 // Note that cache implementation does not necessarily
950 // use all poll() calls for real processing...
951 final TimerTask tt = new TimerTask(){
952 @Override
953 public void run()
954 {
955 try
956 {
957 final GenProps gp = cache.getGenProps(-1);
958 cache.poll(gp);
959 }
960 catch(final Exception e) { e.printStackTrace(Main.getErr()); }
961 }
962 };
963 t.schedule(tt, 0L, 100 + rnd.nextInt(700));
964
965 // Wait until the cache has loaded the exhibit set.
966 final long waitStartTime = System.currentTimeMillis();
967 do
968 {
969 Main.getOut().println("Waiting for cache to load AEP... ("+(System.currentTimeMillis()-waitStartTime)+"ms)");
970 Thread.sleep(1000);
971 } while(cache.getAllExhibitProperties(-1).aeid.size() != aepRaw.aeid.size());
972
973 // A worker class to read a chunk of data via the cache
974 // any verify it against the underlying data.
975 final class ReaderThread extends Thread
976 {
977 /**Read some data from the named exhibit. */
978 ReaderThread(final Name.ExhibitFull exhibitName)
979 {
980 super("ReaderThread for " + exhibitName);
981 exhibit = exhibitName;
982 }
983
984 /**Full name of exhibit to read from; never null. */
985 private final Name.ExhibitFull exhibit;
986
987 /**Set true iff completed OK. */
988 volatile boolean completedOK;
989
990 @Override
991 public final void run()
992 {
993 final ExhibitStaticAttr esa = aepRaw.aeid.getStaticAttr(exhibit);
994 assertNotNull(esa);
995 assertTrue(aepRaw.aeid.isPresent(exhibit));
996
997 // Often start read at zero, to get the data cached.
998 final int start = rnd.nextBoolean() ? 0 : rnd.nextInt((int) esa.length);
999 // Limit length of read to reasonable block size.
1000 final int length = rnd.nextInt(Math.min(16384, (int) esa.length - start));
1001
1002 try
1003 {
1004 final ByteBuffer buf1 = ByteBuffer.allocate(length);
1005 cache.getRawFile(buf1, exhibit, 0, rnd.nextBoolean());
1006
1007 final ByteBuffer buf2 = ByteBuffer.allocate(length);
1008 edfs.getRawFile(buf2, exhibit, 0, rnd.nextBoolean());
1009
1010 // If the data was the same through the cache as raw
1011 // then we succeeded.
1012 if(buf1.equals(buf2))
1013 { completedOK = true; }
1014 else
1015 { Main.getErr().println("Failed for: " + exhibit); }
1016 }
1017 catch(final Exception e)
1018 { e.printStackTrace(); /* Whoops! */ }
1019 }
1020 }
1021
1022 // Manage a group of workers to use the cache.
1023 final List<ReaderThread> workers = new ArrayList<ReaderThread>(nThreads);
1024
1025 try
1026 {
1027 // Start one reader/worker thread per exhibit.
1028 // All these should be able to run concurrently,
1029 // reaching through the cache to the underlying file source,
1030 // as each will be the first read of this exhibit
1031 // with no data in the cache for it.
1032 for(final Name.ExhibitFull exhibitName : aepRaw.aeid.getAllExhibitNamesSorted())
1033 {
1034 if(workers.size() >= nThreads) { break; }
1035
1036 final ReaderThread rt = new ReaderThread(exhibitName);
1037 workers.add(rt);
1038 rt.setPriority(Thread.MAX_PRIORITY); // Should run before us...
1039 rt.start();
1040 Thread.yield();
1041 }
1042
1043 // Be prepared to sleep a little for all threads to block...
1044 mainLoop: for(int i = 9+(5*workers.size()); (workers.size() > dcam.getBlockedThreadsCount()) && (--i >= 0); )
1045 {
1046 Main.getOut().println("Waiting for threads to block/settle... ("+(System.currentTimeMillis()-waitStartTime)+"ms)");
1047
1048 // Look for any prematurely-finished workers, and zap them.
1049 for(final Iterator<ReaderThread> rtIt = workers.iterator(); rtIt.hasNext(); )
1050 {
1051 final ReaderThread rt = rtIt.next();
1052 if(!rt.isAlive())
1053 {
1054 Main.getErr().println("WARNING: worker exited early.");
1055 rtIt.remove();
1056 continue mainLoop;
1057 }
1058 }
1059
1060 Thread.sleep(1000);
1061 }
1062
1063 // Check that new threads hit the back-end
1064 // even if we have blocked all previous threads in the back-end,
1065 // ie that the cache is not a concurrency bottleneck.
1066 final int initialWorkerCount = workers.size();
1067 assertEquals("All threads must be blocked in the getRawFile() interceptor",
1068 initialWorkerCount, dcam.getBlockedThreadsCount());
1069
1070 if(workers.size() < nThreads)
1071 {
1072 Main.getOut().println("Starting extra reader threads... ("+(System.currentTimeMillis()-waitStartTime)+"ms)");
1073
1074 // Set off remaining threads for random exhibits...
1075 // These may not all immediately be allowed through by the cache,
1076 // but they should all work correctly.
1077 final List<Name.ExhibitFull> names = aepRaw.aeid.getAllExhibitNamesSorted();
1078 while(workers.size() < nThreads)
1079 {
1080 final ReaderThread rt = new ReaderThread(names.get(rnd.nextInt(names.size())));
1081 workers.add(rt);
1082 rt.start();
1083 }
1084
1085 Thread.yield(); // Give the new threads a chance to start...
1086
1087 // Check that the blocked-threads count is still reasonable.
1088 assertTrue("All initial worker threads must (still) be blocked in the getRawFile() interceptor",
1089 initialWorkerCount <= dcam.getBlockedThreadsCount());
1090 }
1091 }
1092 finally
1093 {
1094 final long releaseStart = System.currentTimeMillis();
1095 final long timeSoFar = (releaseStart-waitStartTime);
1096 Main.getOut().println("Releasing threads... ("+timeSoFar+"ms)");
1097
1098 dcam.release(); // Let the threads go...
1099 Thread.yield(); // Give them a sporting chance to execute...
1100 Main.getOut().println("Released threads...");
1101
1102 // Wait for all the worker/reader threads and test their status.
1103 // Allow a fair amount of time for thread thrashing, etc.
1104 // We make this slightly adaptive to the speed of this machine/JVM.
1105 final long maxWaitTime = 60001 + (timeSoFar/2);
1106 final long targetFinishTime = releaseStart + maxWaitTime;
1107 final long hopedForFinishTime = releaseStart + maxWaitTime/4;
1108 int failCount = 0;
1109 for(int i = workers.size(); --i >= 0; )
1110 {
1111 final ReaderThread readerThread = workers.get(i);
1112 // If we've already been waiting a while, print progress...
1113 final long now = System.currentTimeMillis();
1114 if(now > hopedForFinishTime)
1115 { Main.getOut().println("Waiting for worker thread "+i+"... ("+(now-waitStartTime)+"ms)"); }
1116 // We will not wait indefinitely...
1117 readerThread.join(Math.max(1001, targetFinishTime - now));
1118 if(!readerThread.completedOK)
1119 { ++failCount; }
1120 }
1121 if(failCount > 0) { fail("Workers failed: " + failCount + " out of " + workers.size()); }
1122 }
1123 }
1124
1125
1126 /**Private source of OK pseudo-random numbers. */
1127 private static final Random rnd = new Random();
1128 }