001 /*
002 Copyright (c) 1996-2011, Damon Hart-Davis
003 All rights reserved.
004
005 Redistribution and use in source and binary forms, with or without
006 modification, are permitted provided that the following conditions are
007 met:
008
009 * Redistributions of source code must retain the above copyright
010 notice, this list of conditions and the following disclaimer.
011
012 * Redistributions in binary form must reproduce the above copyright
013 notice, this list of conditions and the following disclaimer in the
014 documentation and/or other materials provided with the
015 distribution.
016
017 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
018 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
019 TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
020 PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
021 OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
022 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
023 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
024 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
025 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
026 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
027 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
028 */
029
030 package org.hd.d.pg2k.test.dev;
031
032 import java.io.File;
033 import java.io.IOException;
034 import java.io.PrintStream;
035 import java.nio.ByteBuffer;
036 import java.util.ArrayList;
037 import java.util.BitSet;
038 import java.util.HashMap;
039 import java.util.Iterator;
040 import java.util.List;
041 import java.util.Map;
042 import java.util.Properties;
043 import java.util.Random;
044 import java.util.Timer;
045 import java.util.TimerTask;
046 import java.util.Vector;
047
048 import junit.framework.TestCase;
049
050 import org.hd.d.pg2k.svrCore.AllExhibitImmutableData;
051 import org.hd.d.pg2k.svrCore.AllExhibitProperties;
052 import org.hd.d.pg2k.svrCore.ExhibitStaticAttr;
053 import org.hd.d.pg2k.svrCore.ExhibitThumbnails;
054 import org.hd.d.pg2k.svrCore.FileTools;
055 import org.hd.d.pg2k.svrCore.Name;
056 import org.hd.d.pg2k.svrCore.Name.ExhibitFull;
057 import org.hd.d.pg2k.svrCore.SimpleLoggerIF;
058 import org.hd.d.pg2k.svrCore.Stratum;
059 import org.hd.d.pg2k.svrCore.datasource.ExhibitDataFileSource;
060 import org.hd.d.pg2k.svrCore.datasource.ExhibitDataSimpleCache;
061 import org.hd.d.pg2k.svrCore.datasource.SimpleExhibitPipelineFilter;
062 import org.hd.d.pg2k.svrCore.datasource.SimpleExhibitPipelineIF;
063 import org.hd.d.pg2k.svrCore.props.GenProps;
064 import org.hd.d.pg2k.svrCore.vars.BasicVarMgr;
065 import org.hd.d.pg2k.svrCore.vars.EventPeriod;
066 import org.hd.d.pg2k.svrCore.vars.EventVariableValue;
067 import org.hd.d.pg2k.svrCore.vars.SimpleVariableDefinition;
068 import org.hd.d.pg2k.svrCore.vars.SimpleVariableValue;
069 import org.hd.d.pg2k.svrCore.vars.SystemVariables;
070
/**Test the ExhibitDataSimpleCache.
 */
073 public final class SimpleCacheTest extends TestCase
074 {
/**Construct the named test case.
 * @param name  test name, handed unchanged to the TestCase constructor
 */
public SimpleCacheTest(final String name) { super(name); }
079
080 private final SimpleLoggerIF logger = new SimpleLoggerIF(){
081 public void log(final String message) { Main.getOut().println(message); }
082 };
083
/**Temporary directory in which we build a cache; null if not needed.
 * We only choose a directory name
 * (which we store in absolute form in case we change working directory)
 * and delete any existing content before and after the tests run.
 * The test routines must create the directory if they want it.
 */
090 private File tempDir;
091
092 /**Timer that we can use to run pipelines; never null during tests. */
093 private Timer t;
094
095 /**Do any setup needed for the tests. */
096 @Override
097 protected void setUp()
098 {
099 // Work in a temporary subdirectory of the current directory.
100 tempDir = (new File("CacheTst.tmp")).getAbsoluteFile();
101
102 // Make sure that we obliterate any extant content.
103 final File f = tempDir;
104 try { FileTools.rmRecursively(f); }
105 catch(final IOException e) { e.printStackTrace(Main.getErr()); }
106
107 t = new Timer();
108 }
109
110 /**Do any clear-up needed after the tests. */
111 @Override
112 protected void tearDown()
113 {
114 final File f = tempDir;
115 if(f != null)
116 {
117 try { FileTools.rmRecursively(f); }
118 catch(final IOException e) { e.printStackTrace(Main.getErr()); }
119 }
120
121 // Kill the timer and free resources...
122 t.cancel();
123 t = null;
124 }
125
126 /**Dummy data source.
127 * Has one or more fake "virtual" exhibits,
128 * each of which contain some synthetic virtual data.
129 * <p>
130 * This can be wrapped to make a sort of "mock object"
131 * that can flag certain undesirable access patterns as test failures.
132 * <p>
133 * This returns data based on the exhibit data supplied to the constructor.
134 * <p>
135 * This never returns thumbnails.
136 */
137 public static class DummyDataSource implements SimpleExhibitPipelineIF
138 {
139 /**Construct a new instance with the given properties.
140 * @param aep set of exhibit properties; never null
141 * @param gp set of GenProps; may be null
142 */
143 public DummyDataSource(final AllExhibitProperties aep,
144 final GenProps gp)
145 {
146 if(aep == null)
147 { throw new IllegalArgumentException(); }
148
149 this.aep = aep;
150 this.gp = gp;
151 }
152
153 /**Construct a new instance with empty properties. */
154 public DummyDataSource()
155 { this(new AllExhibitProperties(), new GenProps()); }
156
157 /**The set of exhibit properties; never null. */
158 private final AllExhibitProperties aep;
159
160 /**Test set of GenProps; may be null. */
161 private final GenProps gp;
162
163 public ExhibitStaticAttr getStaticAttr(final ExhibitFull name)
164 throws IOException
165 {
166 return(aep.aeid.getStaticAttr(name));
167 }
168
169 /**Minimalist implementation; not really the spirit of the contract.
170 * May or may not bother to check arguments
171 * or even fill in the data buffer.
172 */
173 public void getRawFile(final ByteBuffer buf,
174 final Name.ExhibitFull exhibitName, final int position, final boolean dontCache)
175 throws IOException
176 {
177 }
178
179 public AllExhibitImmutableData getAllExhibitImmutableData(final long oldStamp)
180 throws IOException
181 {
182 if(oldStamp != aep.aeid.timestamp)
183 { return(aep.aeid); }
184
185 // Caller has up-to-date copy.
186 return(null);
187 }
188
189 public AllExhibitProperties getAllExhibitProperties(final long oldHash)
190 throws IOException
191 {
192 if(oldHash != aep.longHash)
193 { return(aep); }
194
195 // Caller has up-to-date copy.
196 return(null);
197 }
198
199 /**Returns fixed (supplied) set of GenProps. */
200 public GenProps getGenProps(final long oldStamp)
201 throws IOException
202 {
203 return(gp);
204 }
205
206 /**Always returns empty Properties set. */
207 public Properties getGenSecProps(final long oldStamp)
208 throws IOException
209 {
210 return(new Properties());
211 }
212
213 /**Always returns null, ie we have no thumbnails. */
214 public ExhibitThumbnails getThumbnails(final ExhibitFull name, final boolean create)
215 throws IOException
216 {
217 return(null);
218 }
219
220 /**Basic "endpoint" set of variables. */
221 private final BasicVarMgr vars = new BasicVarMgr(true);
222
223 public void setVariable(final SimpleVariableValue newValue)
224 throws IOException
225 {
226 vars.setVariable(newValue);
227 }
228
229 public int setVariables(final SimpleVariableValue[] newValues)
230 throws IOException
231 {
232 return(vars.setVariables(newValues));
233 }
234
235 public SimpleVariableValue getVariable(final SimpleVariableDefinition var)
236 throws IOException
237 {
238 return(vars.getVariable(var));
239 }
240
241 public SimpleVariableValue[] getVariables(final long changedSince)
242 throws IOException
243 {
244 return(vars.getVariables(changedSince));
245 }
246
247 public EventVariableValue getEventValue(final SimpleVariableDefinition def,
248 final EventPeriod intervalSelector,
249 final boolean current)
250 {
251 return(vars.getEventValue(def, intervalSelector, current));
252 }
253
254 public EventVariableValue[] getEventValues(final SimpleVariableDefinition def,
255 final EventPeriod intervalSelector,
256 final long intervalNumber,
257 final BitSet whichValues)
258 {
259 return(vars.getEventValues(def, intervalSelector, intervalNumber, whichValues));
260 }
261
262 /**Synchronise with upstream values.
263 * Does nothing in this implementation.
264 */
265 public void syncVariables(final boolean force)
266 throws IOException
267 {
268 }
269
270 /**Get requested Properties selected by key and versionID.
271 * Fetches a Properties set unconditionally (versionID == -1)
272 * else if the versionID presented is not current.
273 *
274 * @param key selector (with possible embedded sub-key)
275 * for desired properties set; never null
276 * @param versionID if -1 then map is always returned if available,
277 * else must be non-negative and null is returned if the versionID
278 * presented matches that of the current version
279 * (ie if the caller has presumably got the up-to-date version);
280 * may be a timestamp or a hash or other value,
281 * and by convention is zero only for an empty properties set
282 *
283 * @return null, or Properties map guaranteed to contain only
284 * String keys and values
285 */
286 public java.util.Properties getProperties(final PropsKey key,
287 final long versionID)
288 throws IOException
289 {
290 throw new IOException("NOT IMPLEMENTED");
291 }
292
293 /**Polling does nothing in this implementation. */
294 public void poll(final GenProps gp)
295 throws IOException
296 {
297 }
298
299 public Stratum getStratum()
300 throws IOException
301 {
302 throw new IOException("NOT IMPLEMENTED"); // FIXME: maybe should return 'UNKNOWN'?
303 }
304
305 /**Does nothing in this implementation. */
306 public void destroy()
307 {
308 }
309 }
310
311 /**Internal test that DummyDataSource behaves as expected.
312 * Essentially we ensure that basic routines return
313 * legitimate non-null values.
314 */
315 public static void testDummyDataSource()
316 throws Exception
317 {
318 // Ensure that we can create a dummy source...
319 final DummyDataSource dds = new DummyDataSource();
320
321 assertNotNull("Fetched GenProps should not be null",
322 dds.getGenProps(-1));
323 assertNotNull("Fetched getAllExhibitProperties should not be null",
324 dds.getAllExhibitProperties(-1));
325 assertNotNull("Should be able to get system ID for variables",
326 dds.getVariable(SystemVariables.LOCAL_SYS_ID));
327 }
328
329 /**Check that we can construct a cache wrapped round a (dummy) source.
330 * And check that we can do some basic superficial operations on it.
331 * <p>
332 * Look elsewhere for a thorough test to behaviour.
333 */
334 public void testBasicCacheBehaviour()
335 throws Exception
336 {
337 assertTrue("Must be no extant temporary/test cache dir.", !tempDir.exists());
338
339 // Check that trying to remove a cache from a non-existent directory
340 // does not fail in an ugly way.
341 ExhibitDataSimpleCache.rmCache(tempDir);
342
343 // Check that a null argument to rmCache() is rejected.
344 try
345 {
346 ExhibitDataSimpleCache.rmCache(null);
347 fail("rmCache() should reject null argument");
348 }
349 catch(final IllegalArgumentException e) { /* Correctly rejected. */ }
350
351 // Set up new, empty, test cache directory.
352 tempDir.mkdirs();
353 assertTrue("Must be able to create temporary/test cache dir.", tempDir.isDirectory());
354
355 // Check that removing an empty cache does not fail.
356 ExhibitDataSimpleCache.rmCache(tempDir);
357 // Check that removing an empty cache directory does not
358 // remove the top-level (containing) directory.
359 assertTrue("Removing cache should not remove containing temporary/test cache dir.", tempDir.isDirectory());
360
361 // Check that we disallow creating a cache with a null up-stream source.
362 try
363 {
364 ExhibitDataSimpleCache.cacheFactory(null, tempDir, logger);
365 fail("should not allow creation of cache with null data source");
366 }
367 catch(final IllegalArgumentException e) { /* Rejected as expected. */ }
368
369 // Create a dummy data source...
370 final DummyDataSource dds = new DummyDataSource();
371
372 // Check that we disallow creating a cache with a null cache dir.
373 try
374 {
375 ExhibitDataSimpleCache.cacheFactory(dds, null, logger);
376 fail("should not allow creation of cache with null cache dir");
377 }
378 catch(final IllegalArgumentException e) { /* Rejected as expected. */ }
379
380 // Attempt to create a new cache
381 // wrapped around the dummy source...
382 final ExhibitDataSimpleCache c1 = ExhibitDataSimpleCache.cacheFactory(
383 dds, tempDir, logger);
384 assertNotNull("Failed to create a cache instance", c1);
385
386 // Check that some basic superficial operations work.
387 assertNotNull("Fetched GenProps should not be null",
388 c1.getGenProps(-1));
389 assertNotNull("Fetched getAllExhibitProperties should not be null",
390 c1.getAllExhibitProperties(-1));
391 assertNotNull("Should be able to get system ID for variables",
392 c1.getVariable(SystemVariables.LOCAL_SYS_ID));
393
394 // Check that system ID has come properly from upstream.
395 assertSame("System ID should have come from upstream",
396 dds.getVariable(SystemVariables.LOCAL_SYS_ID),
397 c1.getVariable(SystemVariables.LOCAL_SYS_ID));
398 }
399
400 /**Test that variables work properly with the cache.
401 * This is just to make sure that its semantics are correct.
402 */
403 public void testCacheVarHandling()
404 throws Exception
405 {
406 // Set up new, empty, test cache directory.
407 tempDir.mkdirs();
408
409 // Create a dummy data source...
410 final DummyDataSource dds1 = new DummyDataSource();
411
412 // Attempt to create a new cache (c1)
413 // wrapped around the dummy source...
414 final ExhibitDataSimpleCache c1 = ExhibitDataSimpleCache.cacheFactory(
415 dds1, tempDir, logger);
416 assertNotNull("Failed to create a cache instance", c1);
417
418 // Start polling at a typical rate...
419 final TimerTask tt1 = new TimerTask(){
420 @Override
421 public void run()
422 {
423 final GenProps gp = c1.getGenProps(-1);
424 try { c1.poll(gp); }
425 catch(final Throwable t) { t.printStackTrace(Main.getErr()); }
426 }
427 };
428 t.schedule(tt1, 0L, 100 + rnd.nextInt(700));
429
430 // Spend at most a few seconds testing behaviour...
431 final SimpleExhibitPipelineIF pipeline[] = { c1, dds1 };
432 SystemVariablesTest.singlePipelineTest(pipeline,
433 10000 + System.currentTimeMillis()); // 10s test.
434
435 // Stop poll activity for c1; it effectively dies now.
436 tt1.cancel();
437 }
438
439 /**Test cacheing and resilience after restart.
440 * This test attempts to test that precacheing works
441 * and that the cache remains intact after a restart.
442 * <p>
443 * This relies on the test data set being present
444 * and being easily small enough to fit into cache.
445 */
446 public void testCachePersistence()
447 throws Exception
448 {
449 // Set up new, empty, test cache directory.
450 tempDir.mkdirs();
451
452 // Combined "trace" of all relevant events so far, in order.
453 final List<RawDataAccessMonitor.GetRawDataEvent> events = new ArrayList<RawDataAccessMonitor.GetRawDataEvent>();
454
455 // Target time to stop first round of tests.
456 final long stopBy1 = System.currentTimeMillis() + 15000L; // 15s...
457
458 // Running count of cached exhibits...
459 // Must never go down
460 // (on the assumption that our test data set is
461 // MUCH smaller than our configured cache size).
462 int exhibitsCached = 0;
463
464 // TEST FIRST INSTANCE OF CACHE...
465 // Starting with an absolutely fresh, empty cache area...
466 Main.getOut().println("Creating/testing new/empty cache...");
467 exhibitsCached = checkCacheInstance(true, exhibitsCached, events,
468 stopBy1);
469
470 if(false) // CODE DISABLED AS CURRENTLY SEEMS TO BE BROKEN
471 {
472 // Target time for remaining (re)tests...
473 final long stopBy = System.currentTimeMillis() + 30000L;
474
475 // At least once test that any new cache sees all the previously-cached
476 // data, ie does not lose it on shut-down or can reconstruct it.
477 for(int i = 5; --i >= 0; ) // Try at most a few times...
478 {
479 Main.getOut().println("[Exhibits partly/fully cached: "+exhibitsCached+"...]");
480
481 // TEST NEW INSTANCE OF CACHE OBJECT IN SAME DISC AREA...
482 // Make sure that it does not lose data from the previous run.
483 Main.getOut().println("Re-testing exiting cache...");
484 exhibitsCached = checkCacheInstance(false, exhibitsCached, events,
485 stopBy);
486
487 if(System.currentTimeMillis() >= stopBy) { break; }
488 }
489 }
490 }
491
492 /**Thread-safe monitor of data-read activity.
493 * An instance of this class can be inserted into a pipeline
494 * to record data-read activity.
495 * <p>
496 * This filter stage does not generally impede normal operation,
497 * but simply records the data access pattern for processing.
498 */
499 static final class RawDataAccessMonitor extends SimpleExhibitPipelineFilter
500 {
501 /**Construct instance wrapped round upstream source.
502 * @param upstream datasource; never null
503 */
504 RawDataAccessMonitor(final SimpleExhibitPipelineIF upstream)
505 { super(upstream); }
506
507 /**A List of GetRawDataEvent values in the order they occurred.
508 * A Vector is used for thread-safety.
509 */
510 private final List<GetRawDataEvent> events = new Vector<GetRawDataEvent>();
511
512 /**A "getRawData()" event.
513 * Immutable.
514 * <p>
515 * Records the time, stack trace, exhibit name, start and length.
516 */
517 static final class GetRawDataEvent
518 {
519 public GetRawDataEvent(final Name.ExhibitFull name, final int start, final int len)
520 {
521 this.name = name;
522 this.start = start;
523 this.len = len;
524 }
525
526 /**Time at which event happened. */
527 public final long timestamp = System.currentTimeMillis();
528
529 /**Stack trace when event happened. */
530 private final Throwable th = new Throwable();
531 /**Print the stack trace to the specified stream. */
532 public void printStackTrace(final PrintStream ps)
533 { th.printStackTrace(ps); }
534
535 /**Name of exhibit concerned. */
536 public final Name.ExhibitFull name;
537 /**Start offset of read within exhibit. */
538 public final int start;
539 /**Length of read within exhibit. */
540 public final int len;
541 }
542
543 /**Read a chunk of the raw exhibit binary into the given buffer.
544 * This passes the request upstream without modification,
545 * but records it in a trace buffer <em>before</em> passing it upstream.
546 * The trace is taken before in case the fulfillment of the request
547 * happens to be vetoed by the data source; it is the requests that
548 * we are interested in here.
549 */
550 @Override
551 public void getRawFile(final ByteBuffer buf,
552 final Name.ExhibitFull exhibitName, final int position, final boolean dontCache)
553 throws IOException
554 {
555 events.add(new GetRawDataEvent(exhibitName, position, buf.limit() - buf.position()));
556 super.getRawFile(buf, exhibitName, position, dontCache);
557 }
558
559 /**Retrieve the List of events in order; never null though possibly empty.
560 * This returns a private snapshot of the event list.
561 */
562 public List<GetRawDataEvent> getEventList()
563 {
564 // Hold a lock on events while we take the snapshot...
565 synchronized(events)
566 {
567 return(new ArrayList<GetRawDataEvent>(events));
568 }
569 }
570 }
571
572 /**Check behaviour of single cache instance, mainly for persistence.
573 * This checks for different things on the first and subsequent runs,
574 * but in all cases makes sure that the cache behaves properly
575 * and does not lose data upon restart,
576 * whether it gets shut down properly or not.
577 * <p>
578 * The number of exhibits partially or fully cached must not go down
579 * and must be greater than zero when we return if there are exhibits.
580 * <p>
581 * This tests the cache's use of its upstream data source
582 * knowing that this should read and cache exhibit prefixes
583 * (ie it will/may only cache activity that reads starting at the
584 * beginning of an exhibit, as a normal download should),
585 * and that having read such a prefix it should never have to go
586 * back to the underlying source for anything within that prefix again,
587 * even when the cache is shut down an restarted,
588 * assuming that our cache area is big enough to hold all the exhibits.
589 *
590 * @param firstRun if true, this is the first run on a new cache area
591 * @param prevCachedCount number of exhibits previously cached;
592 * non-negative
593 * @param eventsFromTestStart List of events collected so far in order;
594 * we will append to this (the content is opaque to the caller);
595 * must be non-null and appendable-to
596 * @param stopBy target time to stop by
597 *
598 * @return number of exhibits partially or fully cached; non-negative
599 *
600 * @throws IOException
601 * @throws InterruptedException
602 */
603 private int checkCacheInstance(final boolean firstRun,
604 final int prevCachedCount,
605 final List<RawDataAccessMonitor.GetRawDataEvent> eventsFromTestStart,
606 final long stopBy)
607 throws IOException,
608 InterruptedException
609 {
610 // Set up a simple instance of a file data source.
611 final ExhibitDataFileSource edfs = new ExhibitDataFileSource();
612
613 // Set up a data-access monitoring stage.
614 // We use this to monitor the effectiveness of the cache.
615 final RawDataAccessMonitor rdam = new RawDataAccessMonitor(edfs);
616
617 // Set up a simple cache instance.
618 final ExhibitDataSimpleCache cache = ExhibitDataSimpleCache.cacheFactory(
619 rdam, tempDir, logger);
620 assertNotNull("Failed to create"+(firstRun?" first":"")+" cache instance", cache);
621
622
623 // Check some features of the cache before the first poll()...
624
625 // Collect the raw exhibit info from the underlying file system.
626 final AllExhibitProperties aepRaw = edfs.getAllExhibitProperties(-1);
627 // When re-using an existing cache directory,
628 // we assume that the exhibit properties are immediately available
629 // before even a single poll.
630 if(!firstRun)
631 {
632 assertEquals("Exhibit properties via cache must match those fetched directly even before first poll() is cache is not new",
633 aepRaw, cache.getAllExhibitProperties(-1));
634 }
635
636 // Set up data pipeline.
637 final SimpleExhibitPipelineIF pipeline[] = { cache, rdam, edfs };
638 final SimpleExhibitPipelineIF top = pipeline[0];
639
640
641 // Start poll()ing the pipeline at a typical rate...
642 // Vary the poll rate randomly a little
643 // in order to demonstate that we are not very sensitive to it.
644 // Note that cache implementation does not necessarily
645 // use all poll() calls for real processing...
646 final TimerTask tt = new TimerTask(){
647 @Override
648 public void run()
649 {
650 try
651 {
652 final GenProps gp = top.getGenProps(-1);
653 top.poll(gp);
654 }
655 catch(final Exception e) { e.printStackTrace(Main.getErr()); }
656 }
657 };
658 t.schedule(tt, 0L, 100 + rnd.nextInt(700));
659
660
661 // Make sure that we can see all exhibit data via the cache
662 // just like when we get it directly...
663 // We have forced one poll(),
664 // and that one poll() should be enough even on the first run
665 // to ensure that all data is up-to-date
666 // unless there is some sort of I/O error...
667 AllExhibitProperties aepViaCache = cache.getAllExhibitProperties(-1);
668 assertNotNull("Exhibit properties (aep) via cache must not be null",
669 aepViaCache);
670 // Wait until aep (and GenProps) are not at their default values,
671 // ie the cache has had time to fetch them.
672 // This must not take more than a few seconds however...
673 if(aepRaw.aeid.size() > 0)
674 {
675 // Wait a few seconds for the data to get loaded...
676 for(int i = 30; --i >= 0; )
677 {
678 aepViaCache = cache.getAllExhibitProperties(-1);
679 if(aepViaCache.aeid.size() > 0) { break; }
680 System.out.println("Waiting for cache to be reloaded...");
681 Thread.sleep(2000L);
682 }
683 assertTrue("Exhibit properties (aep) via cache must not be empty if underlying ones are not",
684 aepViaCache.aeid.size() > 0);
685 }
686 assertEquals("Exhibit properties (aep) via cache must match those fetched directly",
687 aepRaw, aepViaCache);
688 assertEquals("Exhibit properties (aeid) via cache must match those fetched directly",
689 edfs.getAllExhibitImmutableData(-1),
690 cache.getAllExhibitImmutableData(-1));
691
692 // Check that data is seen consistently on either side of the cache.
693 TunnelTest.checkExhibitDataConsistency(cache, edfs,
694 false, // Not a tunnel...
695 stopBy);
696
697 // Read the first byte of each exhibit
698 // to ensure that something should be cached for each.
699 // Have the "dontCache" flag false to try to ensure we cache the data.
700 final ByteBuffer buf = ByteBuffer.allocate(1);
701 for(final Name.ExhibitFull name: aepViaCache.aeid.getAllExhibitNamesSorted())
702 {
703 buf.clear();
704 cache.getRawFile(buf, name, 0, false);
705 }
706
707 // Now check the cached exhibit count...
708 cache.syncVariables(true); // Force our view to be up-to-date...
709 final SimpleVariableValue cc =
710 cache.getVariable(SystemVariables.ExhibitDataSimpleCache_CACHED_EXHIBIT_COUNT);
711 assertNotNull("Must be able to retrieve cached exhibit count",
712 cc);
713 assertNotNull("Must be able to retrieve non-null cached exhibit count",
714 cc.getValue());
715 final int cachedCountVar = ((Number) cc.getValue()).intValue();
716 assertTrue("Must be able to retrieve non-negative cached exhibit count",
717 cachedCountVar >= 0);
718
719 // The number of (partly/fully) cached exhibits must not go down;
720 // at worst we must be able to retrieve the previous
721 // possibly-slightly-stale meta-data from disc
722 // and/or reconstruct it from the data we actually find in place.
723 final int cachedCount = cache.getLiveCachedExhibitCount();
724 assertTrue("Number of cached exhibits must not go down",
725 cachedCount >= prevCachedCount);
726
727 // Get the live raw count of exhibits partly/fully cached.
728 if(aepRaw.aeid.size() > 0)
729 {
730 assertTrue("The number of exhibits cached after some data has been retrieved must be greater than zero if there are exhibits",
731 cachedCount > 0);
732 }
733
734
735 // if(firstRun)
736 // {
737 // // Test variable-handling behaviour...
738 // // (We must be calling poll() by now.)
739 // SystemVariablesTest.singlePipelineTest(pipeline, stopBy);
740 // }
741
742
743 // Kill off this instance.
744 // It will get no more poll() calls so is effectively dead.
745 tt.cancel();
746
747 // If we are feeling nice then we'll properly destroy() it
748 // to allow data to be flushed to disc.
749 /* if(rnd.nextBoolean()) */ { cache.destroy(); }
750
751 // Now examine the upstream activity trace from the current run...
752 // We make sure that the following bad/marginal things do not happen:
753 // * No requests for non-existent exhibits.
754 // * No requests from beyond exhibit bounds.
755 // * No zero-length (or other badly-formed) requests...
756 // Event count before this run.
757 final int oldEventCount = eventsFromTestStart.size();
758 Main.getOut().println("[getRawData() events before this run: "+oldEventCount+".]");
759 final List<RawDataAccessMonitor.GetRawDataEvent> eventsThisTime = rdam.getEventList();
760 Main.getOut().println("[getRawData() events counted this run: "+eventsThisTime.size()+".]");
761 for(final Iterator<RawDataAccessMonitor.GetRawDataEvent> it = eventsThisTime.iterator(); it.hasNext(); )
762 {
763 final RawDataAccessMonitor.GetRawDataEvent e =
764 it.next();
765
766 // Look for obviously badly-formed requests.
767 if((e.name == null) || (e.start < 0) || (e.len < 0))
768 { fail("Badly formed getRawData() request detected (null/-ve args)."); }
769 if(e.len == 0)
770 { fail("Marginal/deprecated getRawData() request detected (len==0)."); }
771 final ExhibitStaticAttr staticAttr = aepRaw.aeid.getStaticAttr(e.name);
772 if(staticAttr == null)
773 { fail("Bad getRawData() request detected (no such exhibit)."); }
774 if((e.start >= staticAttr.length))
775 { fail("Bad getRawData() request detected (start >= length)."); }
776 if((e.start + e.len > staticAttr.length))
777 { fail("Bad getRawData() request detected (start+len > length)."); }
778 }
779
780 // Append this run's events to the running tally.
781 eventsFromTestStart.addAll(eventsThisTime);
782
783 // Check that the cache has not "forgotten" anything after a restart.
784 // Ensure that exhibit prefixes that have already been fetched
785 // are not refetched...
786 // But we do it on all runs in case we are generally amnesiac...
787 // if(!firstRun)
788 // if(true) { System.err.println("WARNING: skipping refetch tests"); } else
789 {
790 // Map from full exhibit name
791 // to high-water mark that should have been cached (Long).
792 final Map<Name.ExhibitFull,Long> marks = new HashMap<Name.ExhibitFull, Long>();
793
794 int eventNum = 1;
795 for(final Iterator<RawDataAccessMonitor.GetRawDataEvent> it = eventsFromTestStart.iterator(); it.hasNext(); ++eventNum)
796 {
797 final RawDataAccessMonitor.GetRawDataEvent e =
798 it.next();
799
800 // Get any extant entry for the exhibit
801 // that is the subject of this event...
802 // If no mark so far then we have cached zero bytes so far.
803 final Long cachedSoFarL = marks.get(e.name);
804 final long cachedSoFar = (cachedSoFarL == null) ?
805 0 : cachedSoFarL.longValue();
806
807 // Create a description of the current event,
808 // just in case we need to print it...
809 final StringBuilder sb = new StringBuilder(32);
810 if(eventNum >= oldEventCount)
811 { sb.append("NEW,"); }
812 sb.append("event#=").append(eventNum);
813 sb.append(",cachedSoFar=").append(cachedSoFar);
814 sb.append(",e.start=").append(e.start);
815 sb.append(",e.len=").append(e.len);
816 sb.append(",e.name=").append(e.name);
817 final String description = sb.toString();
818
819 // If any part of this request should have been
820 // locally satisfied from cache,
821 // it is an error.
822 if(e.start < cachedSoFar)
823 {
824 Main.getErr().println("ERROR: invalid refetch: " + description);
825 e.printStackTrace(Main.getErr());
826 fail("Re-fetched data from source that should still be in cache: ("+description+")");
827 }
828
829 // If this request starts with a gap from the
830 // currently-cached prefix then it is a random access
831 // and will not be cached...
832 if(e.start > cachedSoFar)
833 {
834 // Main.getOut().println("Random read assumed not added to cache... ("+description+")");
835 continue;
836 }
837
838 // If this read extends the cached prefix,
839 // update the mark entry for this exhibit.
840 final int readTo = e.start + e.len;
841 if(readTo > cachedSoFar)
842 {
843 // Main.getOut().println("Extended cache... ("+description+")");
844 marks.put(e.name, new Long(readTo));
845 }
846 }
847 }
848
849 // Return cached exhibit count.
850 return(cachedCount);
851 }
852
853
854
855 /**Thread-safe monitor of data-read activity.
856 * An instance of this class can be inserted into a pipeline
857 * to record or modify data-read activity.
858 */
859 static final class DataConcurrencyAccessMonitor extends SimpleExhibitPipelineFilter
860 {
861 DataConcurrencyAccessMonitor(final SimpleExhibitPipelineIF upstream)
862 { super(upstream); }
863
864 /**Once true, threads do not block in getRawFile(); volatile so as not to need the mutex. */
865 private volatile boolean dontBlock;
866
867 /**Count of blocked thread in getRawData().
868 * Must only be accessed under protection of the notifier mutex.
869 */
870 private int blockedThreadsCount;
871
872 /**Get count of blocked thread in getRawData(); non-negative. */
873 private int getBlockedThreadsCount() { synchronized(notifier) { return(blockedThreadsCount); } }
874
875 /**Used to notify/wake blocked threads. */
876 private final Object notifier = new Object();
877
878 /**Call this to release any waiting/blocked threads in getRawFile(). */
879 void release()
880 {
881 dontBlock = true;
882 synchronized(notifier)
883 {
884 notifier.notifyAll(); // Wake everyone up...
885 }
886 }
887
888 /**Intercept and block calls as if blocked waiting for disc access. */
889 @Override
890 public void getRawFile(final ByteBuffer buf, final Name.ExhibitFull exhibitName, final int position, final boolean dontCache) throws IOException
891 {
892 // Block until released.
893 try
894 {
895 synchronized(notifier)
896 {
897 //(new Throwable("BLOCKING: blockedThreadsCount = " + blockedThreadsCount)).printStackTrace(System.out);
898 ++blockedThreadsCount;
899 while(!dontBlock)
900 { notifier.wait(501 + rnd.nextInt(501)); }
901 --blockedThreadsCount;
902 //(new Throwable("UNBLOCKING: blockedThreadsCount = " + blockedThreadsCount)).printStackTrace(System.out);
903 }
904 }
905 catch(final InterruptedException e)
906 {
907 throw new IllegalMonitorStateException("Unexpected wakeup");
908 }
909
910 // Really do the call...
911 super.getRawFile(buf, exhibitName, position, dontCache);
912 }
913 }
914
915
916 /**Test that cache has indefinite (file read) concurrency.
917 * The aim is to demonstrate that we can scale well to serve exhibit data,
918 * eg on a highly-threadable system such as Niagara/CoolThreads.
919 */
920 public void testCacheFileReadConcurrency()
921 throws Exception
922 {
923 // Pick how many threads that we will try to push through the cache.
924 final int nThreads = 30 + rnd.nextInt(50);
925
926 // Set up new, empty, test cache directory.
927 tempDir.mkdirs();
928
929 // Set up a simple instance of a file data source.
930 final ExhibitDataFileSource edfs = new ExhibitDataFileSource();
931
932 // Set up a data-access monitoring stage.
933 // We use this to monitor the effectiveness of the cache.
934 final DataConcurrencyAccessMonitor dcam = new DataConcurrencyAccessMonitor(edfs);
935
936 // Set up a simple cache instance.
937 final ExhibitDataSimpleCache cache = ExhibitDataSimpleCache.cacheFactory(
938 dcam, tempDir, logger);
939 assertNotNull("Failed to create cache instance", cache);
940
941 // Collect the raw exhibit info from the underlying file system.
942 final AllExhibitProperties aepRaw = edfs.getAllExhibitProperties(-1);
943
944 // Start poll()ing the pipeline at a typical rate...
945 // Vary the poll rate randomly a little
946 // in order to demonstrate that we are not very sensitive to it.
947 // Note that cache implementation does not necessarily
948 // use all poll() calls for real processing...
949 final TimerTask tt = new TimerTask(){
950 @Override
951 public void run()
952 {
953 try
954 {
955 final GenProps gp = cache.getGenProps(-1);
956 cache.poll(gp);
957 }
958 catch(final Exception e) { e.printStackTrace(Main.getErr()); }
959 }
960 };
961 t.schedule(tt, 0L, 100 + rnd.nextInt(700));
962
963 // Wait until the cache has loaded the exhibit set.
964 final long waitStartTime = System.currentTimeMillis();
965 do
966 {
967 Main.getOut().println("Waiting for cache to load AEP... ("+(System.currentTimeMillis()-waitStartTime)+"ms)");
968 Thread.sleep(1000);
969 } while(cache.getAllExhibitProperties(-1).aeid.size() != aepRaw.aeid.size());
970
971 // A worker class to read a chunk of data via the cache
972 // any verify it against the underlying data.
973 final class ReaderThread extends Thread
974 {
975 /**Read some data from the named exhibit. */
976 ReaderThread(final Name.ExhibitFull exhibitName)
977 {
978 super("ReaderThread for " + exhibitName);
979 exhibit = exhibitName;
980 }
981
982 /**Full name of exhibit to read from; never null. */
983 private final Name.ExhibitFull exhibit;
984
985 /**Set true iff completed OK. */
986 volatile boolean completedOK;
987
988 @Override
989 public final void run()
990 {
991 final ExhibitStaticAttr esa = aepRaw.aeid.getStaticAttr(exhibit);
992 assertNotNull(esa);
993 assertTrue(aepRaw.aeid.isPresent(exhibit));
994
995 // Often start read at zero, to get the data cached.
996 final int start = rnd.nextBoolean() ? 0 : rnd.nextInt((int) esa.length);
997 // Limit length of read to reasonable block size.
998 final int length = rnd.nextInt(Math.min(16384, (int) esa.length - start));
999
1000 try
1001 {
1002 final ByteBuffer buf1 = ByteBuffer.allocate(length);
1003 cache.getRawFile(buf1, exhibit, 0, rnd.nextBoolean());
1004
1005 final ByteBuffer buf2 = ByteBuffer.allocate(length);
1006 edfs.getRawFile(buf2, exhibit, 0, rnd.nextBoolean());
1007
1008 // If the data was the same through the cache as raw
1009 // then we succeeded.
1010 if(buf1.equals(buf2))
1011 { completedOK = true; }
1012 else
1013 { Main.getErr().println("Failed for: " + exhibit); }
1014 }
1015 catch(final Exception e)
1016 { e.printStackTrace(); /* Whoops! */ }
1017 }
1018 }
1019
1020 // Manage a group of workers to use the cache.
1021 final List<ReaderThread> workers = new ArrayList<ReaderThread>(nThreads);
1022
1023 try
1024 {
1025 // Start one reader/worker thread per exhibit.
1026 // All these should be able to run concurrently,
1027 // reaching through the cache to the underlying file source,
1028 // as each will be the first read of this exhibit
1029 // with no data in the cache for it.
1030 for(final Name.ExhibitFull exhibitName : aepRaw.aeid.getAllExhibitNamesSorted())
1031 {
1032 if(workers.size() >= nThreads) { break; }
1033
1034 final ReaderThread rt = new ReaderThread(exhibitName);
1035 workers.add(rt);
1036 rt.setPriority(Thread.MAX_PRIORITY); // Should run before us...
1037 rt.start();
1038 Thread.yield();
1039 }
1040
1041 // Be prepared to sleep a little for all threads to block...
1042 mainLoop: for(int i = 9+(5*workers.size()); (workers.size() > dcam.getBlockedThreadsCount()) && (--i >= 0); )
1043 {
1044 Main.getOut().println("Waiting for threads to block/settle... ("+(System.currentTimeMillis()-waitStartTime)+"ms)");
1045
1046 // Look for any prematurely-finished workers, and zap them.
1047 for(final Iterator<ReaderThread> rtIt = workers.iterator(); rtIt.hasNext(); )
1048 {
1049 final ReaderThread rt = rtIt.next();
1050 if(!rt.isAlive())
1051 {
1052 Main.getErr().println("WARNING: worker exited early.");
1053 rtIt.remove();
1054 continue mainLoop;
1055 }
1056 }
1057
1058 Thread.sleep(1000);
1059 }
1060
1061 // Check that new threads hit the back-end
1062 // even if we have blocked all previous threads in the back-end,
1063 // ie that the cache is not a concurrency bottleneck.
1064 final int initialWorkerCount = workers.size();
1065 assertEquals("All threads must be blocked in the getRawFile() interceptor",
1066 initialWorkerCount, dcam.getBlockedThreadsCount());
1067
1068 if(workers.size() < nThreads)
1069 {
1070 Main.getOut().println("Starting extra reader threads... ("+(System.currentTimeMillis()-waitStartTime)+"ms)");
1071
1072 // Set off remaining threads for random exhibits...
1073 // These may not all immediately be allowed through by the cache,
1074 // but they should all work correctly.
1075 final List<Name.ExhibitFull> names = aepRaw.aeid.getAllExhibitNamesSorted();
1076 while(workers.size() < nThreads)
1077 {
1078 final ReaderThread rt = new ReaderThread(names.get(rnd.nextInt(names.size())));
1079 workers.add(rt);
1080 rt.start();
1081 }
1082
1083 Thread.yield(); // Give the new threads a chance to start...
1084
1085 // Check that the blocked-threads count is still reasonable.
1086 assertTrue("All initial worker threads must (still) be blocked in the getRawFile() interceptor",
1087 initialWorkerCount <= dcam.getBlockedThreadsCount());
1088 }
1089 }
1090 finally
1091 {
1092 final long releaseStart = System.currentTimeMillis();
1093 final long timeSoFar = (releaseStart-waitStartTime);
1094 Main.getOut().println("Releasing threads... ("+timeSoFar+"ms)");
1095
1096 dcam.release(); // Let the threads go...
1097 Thread.yield(); // Give them a sporting chance to execute...
1098 Main.getOut().println("Released threads...");
1099
1100 // Wait for all the worker/reader threads and test their status.
1101 // Allow a fair amount of time for thread thrashing, etc.
1102 // We make this slightly adaptive to the speed of this machine/JVM.
1103 final long maxWaitTime = 60001 + (timeSoFar/2);
1104 final long targetFinishTime = releaseStart + maxWaitTime;
1105 final long hopedForFinishTime = releaseStart + maxWaitTime/4;
1106 int failCount = 0;
1107 for(int i = workers.size(); --i >= 0; )
1108 {
1109 final ReaderThread readerThread = workers.get(i);
1110 // If we've already been waiting a while, print progress...
1111 final long now = System.currentTimeMillis();
1112 if(now > hopedForFinishTime)
1113 { Main.getOut().println("Waiting for worker thread "+i+"... ("+(now-waitStartTime)+"ms)"); }
1114 // We will not wait indefinitely...
1115 readerThread.join(Math.max(1001, targetFinishTime - now));
1116 if(!readerThread.completedOK)
1117 { ++failCount; }
1118 }
1119 if(failCount > 0) { fail("Workers failed: " + failCount + " out of " + workers.size()); }
1120 }
1121 }
1122
1123
1124 /**Private source of OK pseudo-random numbers; one instance shared by the test code in this file and the threads that it starts. */
1125 private static final Random rnd = new Random();
1126 }