Skip to content

feat(vector_search): Implement functionality for pre-filters and maxD…#18797

Merged
voonhous merged 7 commits into
apache:masterfrom
rahil-c:rahil/vector-search-prefilters
May 28, 2026
Merged

feat(vector_search): Implement functionality for pre-filters and maxD…#18797
voonhous merged 7 commits into
apache:masterfrom
rahil-c:rahil/vector-search-prefilters

Conversation

@rahil-c
Copy link
Copy Markdown
Collaborator

@rahil-c rahil-c commented May 20, 2026

Describe the issue this Pull Request addresses

Closes #18459

The hudi_vector_search and hudi_vector_search_batch TVFs (introduced in #18432) currently scan the entire corpus when computing distances, even when the user only cares about a narrow slice of it (e.g. one partition, one tenant, one category). They also always return the top-K closest rows regardless of how far away those rows actually are — there is no way to express "give me neighbors only if they are closer than X."

This PR adds two optional, NULL-able arguments to both TVFs to address these gaps:

  1. filter — a SQL predicate applied to the corpus before distance computation shrinking the cross-join cardinality in batch mode).
  2. max_distance — a distance threshold; rows beyond this distance are dropped before top-K selection, reducing shuffle/sort volume and giving users an "only return neighbors closer than X" semantic.

Summary and Changelog

User-visible change: both vector-search TVFs now accept two extra optional positional arguments.

-- Before:
SELECT * FROM hudi_vector_search('t', 'emb', ARRAY(1.0, 0.0, 0.0), 5, 'cosine', 'brute_force')

-- After (new args, both NULL-able):
SELECT * FROM hudi_vector_search(
  't', 'emb', ARRAY(1.0, 0.0, 0.0), 5, 'cosine', 'brute_force',
  'label IN ("x-axis", "y-axis")',   -- filter predicate
  0.3                                 -- max_distance
)

hudi_vector_search extends from 4–6 args to 4–8 args; hudi_vector_search_batch extends from 5–7 args to 5–9 args. Both new positions accept NULL to mean "not specified," preserving the old behavior.

A quoting note worth calling out: because the filter is itself a single-quoted TVF argument and Spark's lexer doesn't support '' as an escape sequence inside string literals, predicates that compare against string values must use double quotes inside (e.g. 'label = "z-axis"'). This is documented on the VectorSearchAlgorithm.buildSingleQueryPlan Javadoc.

Detailed log:

  • HoodieVectorSearchTableValuedFunction.scala
    • Added filter: Option[String] and maxDistance: Option[Double] to ParsedArgs for both the single-query and batch TVF objects.
    • Extended parseArgs ranges (4-8 and 5-9) and error messages.
    • Added parseOptionalString / parseOptionalDouble helpers that accept Literal(null)None, with error messages naming the offending argument.
  • HoodieVectorSearchPlanBuilder.scala
    • Added filter and maxDistance parameters with None defaults to the VectorSearchAlgorithm trait (buildSingleQueryPlan and buildBatchQueryPlan) so future algorithm implementations inherit the contract.
    • Added a private applyFilter helper in BruteForceSearchAlgorithm that wraps Spark AnalysisExceptions in HoodieAnalysisException and echoes the offending predicate into the error message.
    • Wired filter application before distance computation (single-query) and before the cross-join (batch) so Catalyst can push the predicate through the Hudi relation.
    • Wired maxDistance application after distance computation but before orderBy.limit (single-query) and before the Window.partitionBy step (batch) to reduce sort/shuffle input.
    • Updated the object-level Javadoc on BruteForceSearchAlgorithm to describe both new features.
  • HoodieSparkBaseAnalysis.scala
    • Plumbed a.filter and a.maxDistance through to buildSingleQueryPlan / buildBatchQueryPlan in the ResolveReferences rule.
  • TestHoodieVectorSearchFunction.scala
    • Updated testArgumentCountValidation to expect the new 4-8 range.
    • Added 12 new tests covering: filter alone, max_distance alone, NULL passthrough for both, combined filter + threshold, cosine and L2 metrics, batch mode, and filter on a non-Hudi in-memory view (regression guard against Hudi-specific coupling). Every test asserts exact row count, ids, and distance values (epsilon-bounded for floats).

Impact

Public API: both vector-search TVFs gain two new trailing optional arguments. The change is fully backward compatible — existing call sites with 4/5/6/7 arguments behave identically. No SerDe or storage format change.

Performance:

  • filter lets users push selective predicates down to the Hudi reader, which can skip entire partitions and file groups via column stats — significant savings on large corpora where the relevant slice is small.
  • max_distance lets users trade recall for shuffle/sort cost when they have a meaningful threshold. With a tight threshold, the input to the partial sort in single-query mode and the window shuffle in batch mode can shrink dramatically.

No regressions expected on the existing code path: when both args are None (the default), the planner falls through to exactly the pre-existing behavior.

Risk Level

low

  • Backward compatible — old SQL still parses and runs unchanged.
  • Both new code paths are gated on Option.foreach, so omitting the args is a strict no-op.
  • 12 new tests cover the feature surface; the existing test suite (negative, positive, batch, byte/float/double corpora, multi-metric) continues to pass.

Documentation Update

  • Javadoc on VectorSearchAlgorithm.buildSingleQueryPlan / buildBatchQueryPlan and on BruteForceSearchAlgorithm updated to describe the new parameters, including the double-quote-inside-the-filter convention.
  • The user-facing TVF docs on the Hudi website should be updated to describe the new arguments (will follow up with a website PR if reviewers agree).
  • No new configs added.

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

Copy link
Copy Markdown
Contributor

@hudi-agent hudi-agent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR adds optional pre-filter and max-distance parameters to the vector search TVFs in both single-query and batch modes, with the filter applied before distance computation (to enable partition pruning / shrink cross-join cardinality) and max-distance applied before ordering/windowing (to reduce sort and shuffle volume). No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One readability nit on the var/foreach-mutation pattern in the Scala implementation; otherwise the code looks clean.

cc @yihua

// Apply pre-filter before distance computation to enable Hudi partition pruning
// and data skipping, reducing the number of rows that need distance computation.
var filteredDf = corpusDf.filter(col(embeddingCol).isNotNull)
filter.foreach(f => filteredDf = applyFilter(filteredDf, f))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: using var + foreach to optionally mutate a local is a bit non-idiomatic Scala — could you use fold instead? e.g. val base = corpusDf.filter(col(embeddingCol).isNotNull); val filteredDf = filter.fold(base)(applyFilter(base, _)) keeps everything a val. The same pattern appears for filteredCorpus (batch path, line ~347) and both scored locals.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@github-actions github-actions Bot added the size:L PR with lines of changes in (300, 1000] label May 20, 2026
- Narrow applyFilter to catch only ParseException + AnalysisException so
  unrelated runtime errors are not misreported as filter problems.
- Reject non-numeric literals (including strings whose contents parse as
  numbers) for max_distance via explicit NumericType guard in
  parseOptionalDouble; the previous toString.toDouble path silently
  accepted '0.5' as a string.
- Document on both parseArgs Javadocs that NULL, empty string, and
  whitespace-only strings all mean "no filter," matching what the
  parseOptionalString helper already implements.
- Add failure-mode tests for invalid filter syntax, unknown filter
  column, non-string filter literal, non-numeric max_distance literal,
  integer max_distance widening, negative max_distance, empty/whitespace
  filter equivalence with no-filter, and batch-mode error wrapping.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@rahil-c rahil-c requested a review from yihua May 21, 2026 22:28
Copy link
Copy Markdown
Contributor

@hudi-agent hudi-agent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This adds optional filter and max_distance arguments to both vector-search TVFs, with the filter pushed before distance computation (enabling Hudi partition pruning / data skipping) and max_distance applied before sort/window to reduce shuffle volume. I traced the parsing, plan structure, and error-wrapping paths and didn't spot anything functionally off — the test matrix covers null/empty/whitespace filters, type validation, negative thresholds, and the batch path. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One small simplification suggestion below; otherwise reads cleanly.

cc @yihua

val filteredDf = corpusDf.filter(col(embeddingCol).isNotNull)
// Apply pre-filter before distance computation to enable Hudi partition pruning
// and data skipping, reducing the number of rows that need distance computation.
var filteredDf = corpusDf.filter(col(embeddingCol).isNotNull)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the var filteredDf = ...; filter.foreach(f => filteredDf = applyFilter(filteredDf, f)) pattern (repeated for scored and filteredCorpus in both plan builders) could be expressed without a mutable local, e.g. val filteredDf = filter.foldLeft(corpusDf.filter(col(embeddingCol).isNotNull))(applyFilter). Reads a bit more idiomatically and avoids the reassignment.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@rahil-c rahil-c requested a review from voonhous May 22, 2026 01:07
Copy link
Copy Markdown
Contributor

@hudi-agent hudi-agent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR adds optional filter and max_distance arguments to the hudi_vector_search and hudi_vector_search_batch TVFs, with pre-filter applied before distance computation to enable pushdown and threshold applied before ordering/windowing to reduce shuffle. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of nits below — a stray asterisk in the new class-level Javadoc and a non-idiomatic Scala mutation pattern used for applying the optional filter and threshold.

cc @yihua

* (tens to low hundreds of queries) against moderate corpora.
*
* <p>Both modes support an optional {@code filter} predicate (applied to the corpus before
* distance computation, and an optional * {@code maxDistance} threshold (results beyond this distance are excluded before top-K
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: there's a stray * mid-sentence here — and an optional * {@code maxDistance} looks like a copy-paste artifact from a Javadoc line prefix; could you drop the extra asterisk?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will address

val filteredDf = corpusDf.filter(col(embeddingCol).isNotNull)
// Apply pre-filter before distance computation to enable reducing the number of rows that need distance computation.
var filteredDf = corpusDf.filter(col(embeddingCol).isNotNull)
filter.foreach(f => filteredDf = applyFilter(filteredDf, f))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: mutating a var inside foreach is non-idiomatic Scala — have you considered val filteredDf = filter.foldLeft(corpusDf.filter(col(embeddingCol).isNotNull))(applyFilter) instead? The same pattern also appears for filteredCorpus and scored later in the file.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@voonhous
Copy link
Copy Markdown
Member

The user-facing TVF docs on the Hudi website should be updated to describe the new arguments (will follow up with a website PR if reviewers agree).

Please add an issue and track this inline in the PR description.

Copy link
Copy Markdown
Member

@voonhous voonhous left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some minor nit comments, i think they are worth addressing.

Comment on lines +136 to +142
private[logical] def parseOptionalDouble(
funcName: String, expr: Expression, argName: String): Option[Double] = expr match {
case Literal(null, _) => None
case Literal(v, _: NumericType) if v != null => Some(v.toString.toDouble)
case _ => throw new HoodieAnalysisException(
s"Function '$funcName': argument '$argName' must be a numeric literal or NULL, got: ${expr.sql}")
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No tests for parseOptionalDouble non-Double numeric inputs, the helper claims to accept Int/Long/Decimal/Short/Byte and widen it.

Let's add a single test passing e.g. 0 (an Int literal) or a CAST(0.3 AS DECIMAL) would protect that contract.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Comment on lines 167 to +168
searchAlgorithm.buildBatchQueryPlan(
spark, corpusDf, a.corpusEmbeddingCol, queryDf, a.queryEmbeddingCol, a.k, a.metric)
spark, corpusDf, a.corpusEmbeddingCol, queryDf, a.queryEmbeddingCol,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC a.filter affectssz corpus-only, not query-side

In batch mode, the filter only narrows the corpus side, and not the query relation.

Let's call this out. If users assume "filter applies to both", they'll be confused. Add a Javadoc sentence in buildBatchQueryPlan: "Applied to the corpus only, use a prior projection on the query table to filter queries."

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice callout @voonhous, will update this

rahil-c and others added 2 commits May 26, 2026 08:51
- Fix stray asterisk in BruteForceSearchAlgorithm Javadoc
- Drop redundant default parameter values on BruteForceSearchAlgorithm
  overrides (inherited from the trait)
- Clarify in buildBatchQueryPlan Javadoc that the filter applies to the
  corpus only, not the query side
- Add test exercising parseOptionalDouble with non-Double numeric
  literals (Int and Decimal) to lock in the widening contract
…ls for inclusive threshold

The Int-literal sub-case expected 3 rows with max_distance=1, on the
incorrect assumption that the threshold filter is strict (>). The actual
implementation uses <= (consistent with testSingleQueryMaxDistanceExcludesAll
which depends on doc_1 at distance 0.0 being kept when max_distance=0.0).

With <= and max_distance=1.0, doc_2 and doc_3 (both at distance 1.0) are
included, returning all 5 corpus rows. Updated the expected count and
flipped the negative doc_2 assertion to positive doc_2 + doc_3 checks
with messages naming the inclusive-boundary contract, so the test now
also acts as a regression guard for that semantics.

The Decimal sub-case was unaffected.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@hudi-agent hudi-agent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for working on this! The PR adds optional filter and max_distance arguments to the two vector-search TVFs, with filter pushdown applied before distance computation (and before the cross-join in batch mode) and the threshold applied via inclusive <= ahead of sort/window. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small readability nits around the new var + Option.foreach mutation pattern; otherwise looks clean.

cc @yihua

var filteredDf = corpusDf.filter(col(embeddingCol).isNotNull)
filter.foreach(f => filteredDf = applyFilter(filteredDf, f))

// Validate byte corpus query vector values before creating the UDF.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the var filteredDf + filter.foreach(...) reassignment reads awkwardly in Scala. Could you switch to something like val filteredDf = filter.foldLeft(corpusDf.filter(col(embeddingCol).isNotNull))(applyFilter) (or filter.map(applyFilter(base, _)).getOrElse(base)) so the binding stays immutable?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

val distanceUdf = VectorDistanceUtils.createSingleQueryDistanceUdf(metric, elemType, queryVector)

val result = filteredDf
var scored = filteredDf
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: same var scored = ...; maxDistance.foreach(...) pattern here — folding the Option in (val scored = maxDistance.foldLeft(scoredBase)((df, d) => df.filter(col(DISTANCE_COL) <= d))) avoids the rebinding and matches the style used elsewhere in the file.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@rahil-c rahil-c requested a review from voonhous May 26, 2026 17:18
@rahil-c
Copy link
Copy Markdown
Collaborator Author

rahil-c commented May 26, 2026

Please add an issue and track this inline in the PR description.

Will be taking this up in general 1.2.0 release notes callout

Copy link
Copy Markdown
Contributor

@hudi-agent hudi-agent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for working on this! The PR adds optional filter and max_distance arguments to both vector search TVFs, with sensible NULL/empty handling and inclusive threshold semantics. I traced through the analysis paths carefully — including verifying that Spark 3.5.5's Dataset constructor triggers assertAnalyzed(), so the applyFilter catch correctly handles both ParseException and AnalysisException for unresolved columns. No critical correctness issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. A couple of small simplification nits below — overall the changes read cleanly.

cc @yihua

queryVector: Array[Double],
k: Int,
metric: DistanceMetric.Value): LogicalPlan = {
metric: DistanceMetric.Value,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the var x = ...; opt.foreach(f => x = ...) pattern (used three times in this file — here, in scored for the single-query path, and again for filteredCorpus/scored in the batch path) is a bit unidiomatic in Scala. Could you express it as a val with filter.map(applyFilter(base, _)).getOrElse(base) or a small fold? That keeps the intermediate DataFrame immutable and avoids reassignment.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

…sult

Replace the toString.toDouble round-trip in parseOptionalDouble with a
direct pattern match on Number / Decimal, avoiding the unnecessary string
conversion (especially wasteful for Decimal literals).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@hudi-agent hudi-agent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the continued iteration on this! The pre-filter and max_distance plumbing reads correctly — the order of operations (validation → isNotNull → user filter → distance UDF → max_distance threshold → sort/limit) preserves correctness while enabling the intended optimization, NULL handling in both parse helpers is consistent, and the error wrapping in applyFilter echoes the offending expression for clearer user feedback. The test coverage exercises the key corner cases (NULL args, combined filter+threshold, non-Double numerics, invalid syntax). No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. One minor style suggestion below — using a val + foldLeft would let you avoid the repeated var + foreach-mutation pattern.

cc @yihua

validateEmbeddingColumn(corpusDf, embeddingCol)
validateQueryVectorDimension(corpusDf, embeddingCol, queryVector.length)

val elemType = getElementType(corpusDf, embeddingCol)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: this var + foreach-mutation pattern repeats four times across the two buildXxxQueryPlan methods. Could fold it into a single val for clarity, e.g. val filteredDf = filter.foldLeft(corpusDf.filter(col(embeddingCol).isNotNull))(applyFilter) — and similarly val scored = maxDistance.foldLeft(...)((df, d) => df.filter(col(DISTANCE_COL) <= d)).

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@hudi-bot
Copy link
Copy Markdown
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@codecov-commenter
Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 75.00000% with 16 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.79%. Comparing base (9565926) to head (3a9a650).
⚠️ Report is 17 commits behind head on master.

Files with missing lines Patch % Lines
...ogical/HoodieVectorSearchTableValuedFunction.scala 73.80% 5 Missing and 6 partials ⚠️
.../hudi/analysis/HoodieVectorSearchPlanBuilder.scala 72.22% 4 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18797      +/-   ##
============================================
+ Coverage     68.25%   68.79%   +0.53%     
+ Complexity    29337    29140     -197     
============================================
  Files          2527     2514      -13     
  Lines        141858   139960    -1898     
  Branches      17627    17196     -431     
============================================
- Hits          96827    96287     -540     
+ Misses        37068    35896    -1172     
+ Partials       7963     7777     -186     
Flag Coverage Δ
common-and-other-modules 44.32% <0.00%> (-0.10%) ⬇️
hadoop-mr-java-client 44.84% <ø> (-0.07%) ⬇️
spark-client-hadoop-common 48.22% <ø> (-0.02%) ⬇️
spark-java-tests 49.37% <75.00%> (+0.50%) ⬆️
spark-scala-tests 45.26% <0.00%> (+0.33%) ⬆️
utilities 37.41% <0.00%> (-0.05%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...rk/sql/hudi/analysis/HoodieSparkBaseAnalysis.scala 71.06% <100.00%> (+0.24%) ⬆️
.../hudi/analysis/HoodieVectorSearchPlanBuilder.scala 76.47% <72.22%> (-2.04%) ⬇️
...ogical/HoodieVectorSearchTableValuedFunction.scala 74.16% <73.80%> (+2.57%) ⬆️

... and 65 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Copy Markdown
Member

@voonhous voonhous left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@voonhous voonhous merged commit 853cbef into apache:master May 28, 2026
58 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size:L PR with lines of changes in (300, 1000]

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement functionality for pre-filters and maxDistance threshold parameters

5 participants