001    /*
002    Copyright (c) 1996-2012, Damon Hart-Davis
003    All rights reserved.
004    
005    Redistribution and use in source and binary forms, with or without
006    modification, are permitted provided that the following conditions are
007    met:
008    
009      * Redistributions of source code must retain the above copyright
010        notice, this list of conditions and the following disclaimer.
011    
012      * Redistributions in binary form must reproduce the above copyright
013        notice, this list of conditions and the following disclaimer in the
014        documentation and/or other materials provided with the
015        distribution.
016    
017    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
018    IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
019    TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
020    PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
021    OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
022    SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
023    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
024    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
025    THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
026    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
027    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
028    */
029    package org.hd.d.pg2k.webSvr.exhibit;
030    
031    import java.util.ArrayList;
032    import java.util.Collection;
033    import java.util.List;
034    import java.util.concurrent.Callable;
035    import java.util.concurrent.Future;
036    
037    import org.hd.d.pg2k.svrCore.AllExhibitProperties;
038    import org.hd.d.pg2k.svrCore.GenUtils;
039    import org.hd.d.pg2k.svrCore.Name;
040    import org.hd.d.pg2k.svrCore.Name.ExhibitFull;
041    import org.hd.d.pg2k.svrCore.ThreadUtils;
042    
043    /**Wraps a filter to make an expression (for a FilterBean).
044     * Filters must be Serializable so that filter expressions can
045     * be persisted.  They must not attempt to cache any reference,
046     * direct or indirect, to the DataSourceBean.
047     * <p>
048     * Filter objects must be immutable and the accept() method must be
049     * idempotent and not depend on any state held within the object.
050     * <p>
051     * TODO: do object validation on deserialisation
052     */
053    public final class FilterExpr extends Expr
054        {
055        /**The filter we apply here; never null. */
056        private final FilterIF filter;
057    
058        /**Construct us with the upstream expression.
059         * If the upstream expression is null this is the first
060         * element in the expression pipeline.
061         */
062        public FilterExpr(final Expr _upstream, final FilterIF _filter)
063            {
064            super(_upstream);
065            if(_filter == null) { throw new IllegalArgumentException(); }
066            filter = _filter;
067            }
068    
069        /**Minimum number of values worth attempting to filter in parallel; strictly positive.
070         * This plays off parallelism overhead vs available concurrency
071         * but should not be especially critical an order of magnitude either way.
072         */
073        private static final int MIN_SIZE_FOR_CONCURRENCY = 256;
074    
075        /**Evaluate this expression applying our filter.
076         * Given the upstream expression, compute its results and
077         * then compute ours on it.
078         * <p>
079         * We return a set of names, possibly empty.
080         * <p>
081         * The result array contains no duplicates nor nulls
082         * (providing the input is similarly well-behaved)
083         * and is never null itself.
084         * <p>
085         * Designed to be overridden and have super() called to
086         * collect upstream results...  By itself returns all
087         * the exhibits untouched, ie is a null filter/sorter.
088         * <p>
089         * Will attempt to filter in parallel if possible...
090         */
091        @Override
092        public Name.ExhibitFull[] eval(final AllExhibitProperties aep, final Name.ExhibitFull[] in)
093            {
094            // Chose our input.
095            final Name.ExhibitFull input[] = super.eval(aep, in);
096    
097            // Optimisation: no work required for already-empty input.
098            final int length = input.length;
099            if(length == 0) { return(input); }
100    
101            // Don't attempt to parallelise for small data sets nor on uni-processor JVMs.
102            final boolean parallelise = ((ThreadUtils.AVAILABLE_PROCESSORS > 1) && (length > MIN_SIZE_FOR_CONCURRENCY));
103    
104            final Collection<Name.ExhibitFull> result = new ArrayList<Name.ExhibitFull>(length);
105    
106            if(!parallelise)
107                {
108                // Single-threaded version, with accepted results added directly to the result collection.
109                for(int i = length; --i >= 0; )
110                    {
111                    final Name.ExhibitFull name = input[i];
112                    // Apply the user filter.
113                    if(!filter.accept(aep, name)) { continue; }
114                    // Hurrah!  We have a hit!
115                    result.add(name);
116                    }
117                }
118            else
119                {
120                // Have the number of parallel tasks go up slowly (with the log of the work size)
121                // and capped by the number of available CPUs.
122                final int nTasks = Math.min(ThreadUtils.AVAILABLE_PROCESSORS, GenUtils.log2Approx(length));
123                assert(nTasks > 1);
124                assert(nTasks < length);
125    
126                // Size of each partition: one partition/task will have any extra residue.
127                final int partitionSize = length / nTasks;
128                assert(partitionSize > 0);
129    
130                // Create the tasks.
131                final Collection<Callable<Collection<Name.ExhibitFull>>> tasks = new ArrayList<Callable<Collection<ExhibitFull>>>(nTasks);
132                int taskEnd = length;
133                for(int i = nTasks; --i >= 0; taskEnd -= partitionSize)
134                    {
135                    // Note that task 0 may be slightly larger than the other tasks.
136                    final int end = taskEnd; // Exclusive end in array.
137                    final int start = (i == 0) ? 0 : (taskEnd - partitionSize); // Inclusive start in array.
138                    tasks.add(new Callable<Collection<ExhibitFull>>(){
139                        public Collection<ExhibitFull> call()
140                            {
141                            // Do filtering on one section of the array
142                            // accumulating partial results to combine at the end.
143                            final Collection<Name.ExhibitFull> partialResult = new ArrayList<Name.ExhibitFull>(end-start);
144                            for(int i = end; --i >= start; )
145                                {
146                                final Name.ExhibitFull name = input[i];
147                                // Apply the user filter.
148                                if(!filter.accept(aep, name)) { continue; }
149                                // Hurrah!  We have a hit!
150                                partialResult.add(name);
151                                }
152                            return(partialResult);
153                            }
154                        });
155                    }
156    
157                try
158                    {
159                    // Execute them all, as concurrently as possible.
160                    final List<Future<Collection<ExhibitFull>>> taskResults = ThreadUtils.computeIntensiveThreadPool.invokeAll(tasks);
161    
162                    // Combine the partial results (single-threaded).
163                    for(final Future<Collection<ExhibitFull>> tr : taskResults)
164                        { result.addAll(tr.get()); }
165                    }
166                // If we were interrupted or an exception was thrown, re-throw here.
167                catch(final Exception e) { throw new RuntimeException("unexpected exception", e); }
168                }
169    
170            assert(result.size() <= length);
171            final Name.ExhibitFull output[] = new Name.ExhibitFull[result.size()];
172            result.toArray(output);
173    
174            return(output);
175            }
176    
177        /**Hash depends on underlying expression and this filter. */
178        @Override
179        public int hashCode()
180            {
181            return((super.hashCode() << 5) - filter.hashCode());
182            }
183    
184        /**Equality depends on underlying expression and this filter. */
185        @Override
186        public boolean equals(final Object obj)
187            {
188            if(!(obj instanceof FilterExpr)) { return(false); }
189            final FilterExpr other = (FilterExpr) obj;
190            if(!filter.equals(other.filter)) { return(false); }
191            return(super.equals(obj));
192            }
193    
194        /**Unique Serialisation class ID generated by http://random.hd.org/. */
195        private static final long serialVersionUID = -7059115865126107312L;
196        }