Introduce CachingCollectorManager to parallelize search when using CachingCollector#16247
Introduce CachingCollectorManager to parallelize search when using CachingCollector#16247gaobinlong wants to merge 5 commits into
Conversation
…chingCollector Signed-off-by: Binlong Gao <gbinlong@amazon.com>
Signed-off-by: Binlong Gao <gbinlong@amazon.com>
javanna
left a comment
There was a problem hiding this comment.
Great to see movement in this area, thanks for working on this @gaobinlong ! I left some comments
| : CachingCollector.create(collector, cacheScores, maxRAMMB); | ||
| cachingCollectors.add(cache); | ||
| @SuppressWarnings("unchecked") | ||
| C wrapped = (C) cache; |
There was a problem hiding this comment.
This unchecked cast is a bit of a red flag to me: I think that this CollectorManager should be typed differently: CachingCollectorManager<C extends C, R> implements CollectorManager<CachingCollector, R>. This way there is no cast needed in newCollector? That is also more inline to the actual implementation.
An alternative would be to return cache.in, but that would also require casting.
| // One CachingCollector per slice, thread-safe for concurrent newCollector() calls. | ||
| private final List<CachingCollector> cachingCollectors = new CopyOnWriteArrayList<>(); | ||
| // The original unwrapped collectors | ||
| private final List<C> originalCollectors = new CopyOnWriteArrayList<>(); |
There was a problem hiding this comment.
this does not need to handle concurrency, as newCollector is called from a single thread, the same that will call reduce? I wonder if this list is necessary too, given that reduce could loop through the collectors received as argument and retrieve the in collector, which though requires casting.
| private final Integer maxDocsToCache; | ||
|
|
||
| // One CachingCollector per slice, thread-safe for concurrent newCollector() calls. | ||
| private final List<CachingCollector> cachingCollectors = new CopyOnWriteArrayList<>(); |
There was a problem hiding this comment.
looks like this list is needed for the replay functionality. Note that like we discussed in other PRs, newCollector is never called concurrently. It is called sequentially by the coordinating thread which will also call reduce at the end.
What I do worry about when it comes to concurrency though is the isCached mutable flag, that gets modified by the worker threads and accessed by the main thread at the end. There isn't a concurrent access problem with it, but there may be visibility issues. The list does not need to handle concurrency though.
| doc.add(new StringField("groupend", "x", Field.Store.NO)); | ||
| documents.add(doc); | ||
| w.addDocuments(documents); | ||
| documents.clear(); |
There was a problem hiding this comment.
could you expand on why these changes were needed?
There was a problem hiding this comment.
This is to verify the unmatched doc never be cached and never be in the search results, since all other docs are matched by the query.
| * | ||
| * @lucene.experimental | ||
| */ | ||
| public class CachingCollectorManager<C extends Collector, R> implements CollectorManager<C, R> { |
There was a problem hiding this comment.
Could we add a new test class that verifies the basic functionality of this collector manager, outside of its employment in grouping search?
| if (!isCached()) { | ||
| throw new IllegalStateException("cache is not available; re-run the query instead"); | ||
| } | ||
| List<C2> secondCollectors = new ArrayList<>(cachingCollectors.size()); |
There was a problem hiding this comment.
I would add a check that throws illegal state also when the list is empty, that means newCollector has never been called?
Signed-off-by: Binlong Gao <gbinlong@amazon.com>
| @Override | ||
| public void setScorer(Scorable scorer) throws IOException { | ||
| this.scorer = scorer; | ||
| groupSelector.setScorer(scorer); |
There was a problem hiding this comment.
In single-thread search, scorer is set in the first pass collector, see
values in selector will be null.
|
@javanna all comments are addressed yet, please help to review again, thanks! |
Description
This PR introduces CachingCollectorManager, switches GroupingSearch to use search concurrency and move away from the deprecated search(Query, Collector) method.
Relates to #12892.