001 /*
002 Copyright (c) 1996-2012, Damon Hart-Davis
003 All rights reserved.
004
005 Redistribution and use in source and binary forms, with or without
006 modification, are permitted provided that the following conditions are
007 met:
008
009 * Redistributions of source code must retain the above copyright
010 notice, this list of conditions and the following disclaimer.
011
012 * Redistributions in binary form must reproduce the above copyright
013 notice, this list of conditions and the following disclaimer in the
014 documentation and/or other materials provided with the
015 distribution.
016
017 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
018 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
019 TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
020 PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
021 OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
022 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
023 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
024 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
025 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
026 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
027 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
028 */
029 package org.hd.d.pg2k.webSvr.exhibit;
030
031 import java.util.ArrayList;
032 import java.util.Collection;
033 import java.util.List;
034 import java.util.concurrent.Callable;
035 import java.util.concurrent.Future;
036
037 import org.hd.d.pg2k.svrCore.AllExhibitProperties;
038 import org.hd.d.pg2k.svrCore.GenUtils;
039 import org.hd.d.pg2k.svrCore.Name;
040 import org.hd.d.pg2k.svrCore.Name.ExhibitFull;
041 import org.hd.d.pg2k.svrCore.ThreadUtils;
042
043 /**Wraps a filter to make an expression (for a FilterBean).
044 * Filters must be Serializable so that filter expressions can
045 * be persisted. They must not attempt to cache any reference,
046 * direct or indirect, to the DataSourceBean.
047 * <p>
048 * Filter objects must be immutable and the accept() method must be
049 * idempotent and not depend on any state held within the object.
050 * <p>
051 * TODO: do object validation on deserialisation
052 */
053 public final class FilterExpr extends Expr
054 {
055 /**The filter we apply here; never null. */
056 private final FilterIF filter;
057
058 /**Construct us with the upstream expression.
059 * If the upstream expression is null this is the first
060 * element in the expression pipeline.
061 */
062 public FilterExpr(final Expr _upstream, final FilterIF _filter)
063 {
064 super(_upstream);
065 if(_filter == null) { throw new IllegalArgumentException(); }
066 filter = _filter;
067 }
068
069 /**Minimum number of values worth attempting to filter in parallel; strictly positive.
070 * This plays off parallelism overhead vs available concurrency
071 * but should not be especially critical an order of magnitude either way.
072 */
073 private static final int MIN_SIZE_FOR_CONCURRENCY = 256;
074
075 /**Evaluate this expression applying our filter.
076 * Given the upstream expression, compute its results and
077 * then compute ours on it.
078 * <p>
079 * We return a set of names, possibly empty.
080 * <p>
081 * The result array contains no duplicates nor nulls
082 * (providing the input is similarly well-behaved)
083 * and is never null itself.
084 * <p>
085 * Designed to be overridden and have super() called to
086 * collect upstream results... By itself returns all
087 * the exhibits untouched, ie is a null filter/sorter.
088 * <p>
089 * Will attempt to filter in parallel if possible...
090 */
091 @Override
092 public Name.ExhibitFull[] eval(final AllExhibitProperties aep, final Name.ExhibitFull[] in)
093 {
094 // Chose our input.
095 final Name.ExhibitFull input[] = super.eval(aep, in);
096
097 // Optimisation: no work required for already-empty input.
098 final int length = input.length;
099 if(length == 0) { return(input); }
100
101 // Don't attempt to parallelise for small data sets nor on uni-processor JVMs.
102 final boolean parallelise = ((ThreadUtils.AVAILABLE_PROCESSORS > 1) && (length > MIN_SIZE_FOR_CONCURRENCY));
103
104 final Collection<Name.ExhibitFull> result = new ArrayList<Name.ExhibitFull>(length);
105
106 if(!parallelise)
107 {
108 // Single-threaded version, with accepted results added directly to the result collection.
109 for(int i = length; --i >= 0; )
110 {
111 final Name.ExhibitFull name = input[i];
112 // Apply the user filter.
113 if(!filter.accept(aep, name)) { continue; }
114 // Hurrah! We have a hit!
115 result.add(name);
116 }
117 }
118 else
119 {
120 // Have the number of parallel tasks go up slowly (with the log of the work size)
121 // and capped by the number of available CPUs.
122 final int nTasks = Math.min(ThreadUtils.AVAILABLE_PROCESSORS, GenUtils.log2Approx(length));
123 assert(nTasks > 1);
124 assert(nTasks < length);
125
126 // Size of each partition: one partition/task will have any extra residue.
127 final int partitionSize = length / nTasks;
128 assert(partitionSize > 0);
129
130 // Create the tasks.
131 final Collection<Callable<Collection<Name.ExhibitFull>>> tasks = new ArrayList<Callable<Collection<ExhibitFull>>>(nTasks);
132 int taskEnd = length;
133 for(int i = nTasks; --i >= 0; taskEnd -= partitionSize)
134 {
135 // Note that task 0 may be slightly larger than the other tasks.
136 final int end = taskEnd; // Exclusive end in array.
137 final int start = (i == 0) ? 0 : (taskEnd - partitionSize); // Inclusive start in array.
138 tasks.add(new Callable<Collection<ExhibitFull>>(){
139 public Collection<ExhibitFull> call()
140 {
141 // Do filtering on one section of the array
142 // accumulating partial results to combine at the end.
143 final Collection<Name.ExhibitFull> partialResult = new ArrayList<Name.ExhibitFull>(end-start);
144 for(int i = end; --i >= start; )
145 {
146 final Name.ExhibitFull name = input[i];
147 // Apply the user filter.
148 if(!filter.accept(aep, name)) { continue; }
149 // Hurrah! We have a hit!
150 partialResult.add(name);
151 }
152 return(partialResult);
153 }
154 });
155 }
156
157 try
158 {
159 // Execute them all, as concurrently as possible.
160 final List<Future<Collection<ExhibitFull>>> taskResults = ThreadUtils.computeIntensiveThreadPool.invokeAll(tasks);
161
162 // Combine the partial results (single-threaded).
163 for(final Future<Collection<ExhibitFull>> tr : taskResults)
164 { result.addAll(tr.get()); }
165 }
166 // If we were interrupted or an exception was thrown, re-throw here.
167 catch(final Exception e) { throw new RuntimeException("unexpected exception", e); }
168 }
169
170 assert(result.size() <= length);
171 final Name.ExhibitFull output[] = new Name.ExhibitFull[result.size()];
172 result.toArray(output);
173
174 return(output);
175 }
176
177 /**Hash depends on underlying expression and this filter. */
178 @Override
179 public int hashCode()
180 {
181 return((super.hashCode() << 5) - filter.hashCode());
182 }
183
184 /**Equality depends on underlying expression and this filter. */
185 @Override
186 public boolean equals(final Object obj)
187 {
188 if(!(obj instanceof FilterExpr)) { return(false); }
189 final FilterExpr other = (FilterExpr) obj;
190 if(!filter.equals(other.filter)) { return(false); }
191 return(super.equals(obj));
192 }
193
194 /**Unique Serialisation class ID generated by http://random.hd.org/. */
195 private static final long serialVersionUID = -7059115865126107312L;
196 }