Skip to content

Introduce CachingCollectorManager to parallelize search when using CachingCollector#16247

Open
gaobinlong wants to merge 5 commits into
apache:mainfrom
gaobinlong:cachingCollectorManager
Open

Introduce CachingCollectorManager to parallelize search when using CachingCollector#16247
gaobinlong wants to merge 5 commits into
apache:mainfrom
gaobinlong:cachingCollectorManager

Conversation

@gaobinlong

Copy link
Copy Markdown
Contributor

Description

This PR introduces CachingCollectorManager, switches GroupingSearch to use search concurrency and move away from the deprecated search(Query, Collector) method.

Relates to #12892.

…chingCollector

Signed-off-by: Binlong Gao <gbinlong@amazon.com>
Signed-off-by: Binlong Gao <gbinlong@amazon.com>

@javanna javanna left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great to see movement in this area, thanks for working on this @gaobinlong ! I left some comments

: CachingCollector.create(collector, cacheScores, maxRAMMB);
cachingCollectors.add(cache);
@SuppressWarnings("unchecked")
C wrapped = (C) cache;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unchecked cast is a bit of a red flag to me: I think that this CollectorManager should be typed differently: CachingCollectorManager<C extends C, R> implements CollectorManager<CachingCollector, R>. This way there is no cast needed in newCollector? That is also more inline to the actual implementation.

An alternative would be to return cache.in, but that would also require casting.

// One CachingCollector per slice, thread-safe for concurrent newCollector() calls.
private final List<CachingCollector> cachingCollectors = new CopyOnWriteArrayList<>();
// The original unwrapped collectors
private final List<C> originalCollectors = new CopyOnWriteArrayList<>();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this does not need to handle concurrency, as newCollector is called from a single thread, the same that will call reduce? I wonder if this list is necessary too, given that reduce could loop through the collectors received as argument and retrieve the in collector, which though requires casting.

private final Integer maxDocsToCache;

// One CachingCollector per slice, thread-safe for concurrent newCollector() calls.
private final List<CachingCollector> cachingCollectors = new CopyOnWriteArrayList<>();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like this list is needed for the replay functionality. Note that like we discussed in other PRs, newCollector is never called concurrently. It is called sequentially by the coordinating thread which will also call reduce at the end.

What I do worry about when it comes to concurrency though is the isCached mutable flag, that gets modified by the worker threads and accessed by the main thread at the end. There isn't a concurrent access problem with it, but there may be visibility issues. The list does not need to handle concurrency though.

doc.add(new StringField("groupend", "x", Field.Store.NO));
documents.add(doc);
w.addDocuments(documents);
documents.clear();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you expand on why these changes were needed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to verify the unmatched doc never be cached and never be in the search results, since all other docs are matched by the query.

Comment thread lucene/grouping/src/java/org/apache/lucene/search/grouping/GroupingSearch.java Outdated
*
* @lucene.experimental
*/
public class CachingCollectorManager<C extends Collector, R> implements CollectorManager<C, R> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a new test class that verifies the basic functionality of this collector manager, outside of its employment in grouping search?

if (!isCached()) {
throw new IllegalStateException("cache is not available; re-run the query instead");
}
List<C2> secondCollectors = new ArrayList<>(cachingCollectors.size());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add a check that throws illegal state also when the list is empty, that means newCollector has never been called?

Signed-off-by: Binlong Gao <gbinlong@amazon.com>
@github-actions github-actions Bot modified the milestones: 10.5.0, 11.0.0 Jun 17, 2026
@Override
public void setScorer(Scorable scorer) throws IOException {
this.scorer = scorer;
groupSelector.setScorer(scorer);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In single-thread search, scorer is set in the first pass collector, see

, but in concurrent search mode, each collector has its own selector, so need to set scorer separately, if not values in selector will be null.

@gaobinlong

Copy link
Copy Markdown
Contributor Author

@javanna all comments are addressed yet, please help to review again, thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants