feat(vector_search): Implement functionality for pre-filters and maxD…#18797
Conversation
…istance threshold parameters
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR adds optional pre-filter and max-distance parameters to the vector search TVFs in both single-query and batch modes, with the filter applied before distance computation (to enable partition pruning / shrink cross-join cardinality) and max-distance applied before ordering/windowing (to reduce sort and shuffle volume). No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One readability nit on the var/foreach-mutation pattern in the Scala implementation; otherwise the code looks clean.
cc @yihua
| // Apply pre-filter before distance computation to enable Hudi partition pruning | ||
| // and data skipping, reducing the number of rows that need distance computation. | ||
| var filteredDf = corpusDf.filter(col(embeddingCol).isNotNull) | ||
| filter.foreach(f => filteredDf = applyFilter(filteredDf, f)) |
There was a problem hiding this comment.
🤖 nit: using var + foreach to optionally mutate a local is a bit non-idiomatic Scala — could you use fold instead? e.g. val base = corpusDf.filter(col(embeddingCol).isNotNull); val filteredDf = filter.fold(base)(applyFilter(base, _)) keeps everything a val. The same pattern appears for filteredCorpus (batch path, line ~347) and both scored locals.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
- Narrow applyFilter to catch only ParseException + AnalysisException so unrelated runtime errors are not misreported as filter problems. - Reject non-numeric literals (including strings whose contents parse as numbers) for max_distance via explicit NumericType guard in parseOptionalDouble; the previous toString.toDouble path silently accepted '0.5' as a string. - Document on both parseArgs Javadocs that NULL, empty string, and whitespace-only strings all mean "no filter," matching what the parseOptionalString helper already implements. - Add failure-mode tests for invalid filter syntax, unknown filter column, non-string filter literal, non-numeric max_distance literal, integer max_distance widening, negative max_distance, empty/whitespace filter equivalence with no-filter, and batch-mode error wrapping. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This adds optional filter and max_distance arguments to both vector-search TVFs, with the filter pushed before distance computation (enabling Hudi partition pruning / data skipping) and max_distance applied before sort/window to reduce shuffle volume. I traced the parsing, plan structure, and error-wrapping paths and didn't spot anything functionally off — the test matrix covers null/empty/whitespace filters, type validation, negative thresholds, and the batch path. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One small simplification suggestion below; otherwise reads cleanly.
cc @yihua
| val filteredDf = corpusDf.filter(col(embeddingCol).isNotNull) | ||
| // Apply pre-filter before distance computation to enable Hudi partition pruning | ||
| // and data skipping, reducing the number of rows that need distance computation. | ||
| var filteredDf = corpusDf.filter(col(embeddingCol).isNotNull) |
There was a problem hiding this comment.
🤖 nit: the var filteredDf = ...; filter.foreach(f => filteredDf = applyFilter(filteredDf, f)) pattern (repeated for scored and filteredCorpus in both plan builders) could be expressed without a mutable local, e.g. val filteredDf = filter.foldLeft(corpusDf.filter(col(embeddingCol).isNotNull))(applyFilter). Reads a bit more idiomatically and avoids the reassignment.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR adds optional filter and max_distance arguments to the hudi_vector_search and hudi_vector_search_batch TVFs, with pre-filter applied before distance computation to enable pushdown and threshold applied before ordering/windowing to reduce shuffle. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of nits below — a stray asterisk in the new class-level Javadoc and a non-idiomatic Scala mutation pattern used for applying the optional filter and threshold.
cc @yihua
| * (tens to low hundreds of queries) against moderate corpora. | ||
| * | ||
| * <p>Both modes support an optional {@code filter} predicate (applied to the corpus before | ||
| * distance computation, and an optional * {@code maxDistance} threshold (results beyond this distance are excluded before top-K |
There was a problem hiding this comment.
🤖 nit: there's a stray * mid-sentence here — and an optional * {@code maxDistance} looks like a copy-paste artifact from a Javadoc line prefix; could you drop the extra asterisk?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| val filteredDf = corpusDf.filter(col(embeddingCol).isNotNull) | ||
| // Apply pre-filter before distance computation to enable reducing the number of rows that need distance computation. | ||
| var filteredDf = corpusDf.filter(col(embeddingCol).isNotNull) | ||
| filter.foreach(f => filteredDf = applyFilter(filteredDf, f)) |
There was a problem hiding this comment.
🤖 nit: mutating a var inside foreach is non-idiomatic Scala — have you considered val filteredDf = filter.foldLeft(corpusDf.filter(col(embeddingCol).isNotNull))(applyFilter) instead? The same pattern also appears for filteredCorpus and scored later in the file.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Please add an issue and track this inline in the PR description. |
voonhous
left a comment
There was a problem hiding this comment.
Added some minor nit comments, i think they are worth addressing.
| private[logical] def parseOptionalDouble( | ||
| funcName: String, expr: Expression, argName: String): Option[Double] = expr match { | ||
| case Literal(null, _) => None | ||
| case Literal(v, _: NumericType) if v != null => Some(v.toString.toDouble) | ||
| case _ => throw new HoodieAnalysisException( | ||
| s"Function '$funcName': argument '$argName' must be a numeric literal or NULL, got: ${expr.sql}") | ||
| } |
There was a problem hiding this comment.
No tests for parseOptionalDouble non-Double numeric inputs, the helper claims to accept Int/Long/Decimal/Short/Byte and widen it.
Let's add a single test passing e.g. 0 (an Int literal) or a CAST(0.3 AS DECIMAL) would protect that contract.
| searchAlgorithm.buildBatchQueryPlan( | ||
| spark, corpusDf, a.corpusEmbeddingCol, queryDf, a.queryEmbeddingCol, a.k, a.metric) | ||
| spark, corpusDf, a.corpusEmbeddingCol, queryDf, a.queryEmbeddingCol, |
There was a problem hiding this comment.
IIUC a.filter affectssz corpus-only, not query-side
In batch mode, the filter only narrows the corpus side, and not the query relation.
Let's call this out. If users assume "filter applies to both", they'll be confused. Add a Javadoc sentence in buildBatchQueryPlan: "Applied to the corpus only, use a prior projection on the query table to filter queries."
There was a problem hiding this comment.
Nice callout @voonhous, will update this
- Fix stray asterisk in BruteForceSearchAlgorithm Javadoc - Drop redundant default parameter values on BruteForceSearchAlgorithm overrides (inherited from the trait) - Clarify in buildBatchQueryPlan Javadoc that the filter applies to the corpus only, not the query side - Add test exercising parseOptionalDouble with non-Double numeric literals (Int and Decimal) to lock in the widening contract
…ls for inclusive threshold The Int-literal sub-case expected 3 rows with max_distance=1, on the incorrect assumption that the threshold filter is strict (>). The actual implementation uses <= (consistent with testSingleQueryMaxDistanceExcludesAll which depends on doc_1 at distance 0.0 being kept when max_distance=0.0). With <= and max_distance=1.0, doc_2 and doc_3 (both at distance 1.0) are included, returning all 5 corpus rows. Updated the expected count and flipped the negative doc_2 assertion to positive doc_2 + doc_3 checks with messages naming the inclusive-boundary contract, so the test now also acts as a regression guard for that semantics. The Decimal sub-case was unaffected. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR adds optional filter and max_distance arguments to the two vector-search TVFs, with filter pushdown applied before distance computation (and before the cross-join in batch mode) and the threshold applied via inclusive <= ahead of sort/window. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small readability nits around the new var + Option.foreach mutation pattern; otherwise looks clean.
cc @yihua
| var filteredDf = corpusDf.filter(col(embeddingCol).isNotNull) | ||
| filter.foreach(f => filteredDf = applyFilter(filteredDf, f)) | ||
|
|
||
| // Validate byte corpus query vector values before creating the UDF. |
There was a problem hiding this comment.
🤖 nit: the var filteredDf + filter.foreach(...) reassignment reads awkwardly in Scala. Could you switch to something like val filteredDf = filter.foldLeft(corpusDf.filter(col(embeddingCol).isNotNull))(applyFilter) (or filter.map(applyFilter(base, _)).getOrElse(base)) so the binding stays immutable?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| val distanceUdf = VectorDistanceUtils.createSingleQueryDistanceUdf(metric, elemType, queryVector) | ||
|
|
||
| val result = filteredDf | ||
| var scored = filteredDf |
There was a problem hiding this comment.
🤖 nit: same var scored = ...; maxDistance.foreach(...) pattern here — folding the Option in (val scored = maxDistance.foldLeft(scoredBase)((df, d) => df.filter(col(DISTANCE_COL) <= d))) avoids the rebinding and matches the style used elsewhere in the file.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Will be taking this up in general 1.2.0 release notes callout |
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR adds optional filter and max_distance arguments to both vector search TVFs, with sensible NULL/empty handling and inclusive threshold semantics. I traced through the analysis paths carefully — including verifying that Spark 3.5.5's Dataset constructor triggers assertAnalyzed(), so the applyFilter catch correctly handles both ParseException and AnalysisException for unresolved columns. No critical correctness issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. A couple of small simplification nits below — overall the changes read cleanly.
cc @yihua
| queryVector: Array[Double], | ||
| k: Int, | ||
| metric: DistanceMetric.Value): LogicalPlan = { | ||
| metric: DistanceMetric.Value, |
There was a problem hiding this comment.
🤖 nit: the var x = ...; opt.foreach(f => x = ...) pattern (used three times in this file — here, in scored for the single-query path, and again for filteredCorpus/scored in the batch path) is a bit unidiomatic in Scala. Could you express it as a val with filter.map(applyFilter(base, _)).getOrElse(base) or a small fold? That keeps the intermediate DataFrame immutable and avoids reassignment.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
…sult Replace the toString.toDouble round-trip in parseOptionalDouble with a direct pattern match on Number / Decimal, avoiding the unnecessary string conversion (especially wasteful for Decimal literals). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the continued iteration on this! The pre-filter and max_distance plumbing reads correctly — the order of operations (validation → isNotNull → user filter → distance UDF → max_distance threshold → sort/limit) preserves correctness while enabling the intended optimization, NULL handling in both parse helpers is consistent, and the error wrapping in applyFilter echoes the offending expression for clearer user feedback. The test coverage exercises the key corner cases (NULL args, combined filter+threshold, non-Double numerics, invalid syntax). No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. One minor style suggestion below — using a val + foldLeft would let you avoid the repeated var + foreach-mutation pattern.
cc @yihua
| validateEmbeddingColumn(corpusDf, embeddingCol) | ||
| validateQueryVectorDimension(corpusDf, embeddingCol, queryVector.length) | ||
|
|
||
| val elemType = getElementType(corpusDf, embeddingCol) |
There was a problem hiding this comment.
🤖 nit: this var + foreach-mutation pattern repeats four times across the two buildXxxQueryPlan methods. Could fold it into a single val for clarity, e.g. val filteredDf = filter.foldLeft(corpusDf.filter(col(embeddingCol).isNotNull))(applyFilter) — and similarly val scored = maxDistance.foldLeft(...)((df, d) => df.filter(col(DISTANCE_COL) <= d)).
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18797 +/- ##
============================================
+ Coverage 68.25% 68.79% +0.53%
+ Complexity 29337 29140 -197
============================================
Files 2527 2514 -13
Lines 141858 139960 -1898
Branches 17627 17196 -431
============================================
- Hits 96827 96287 -540
+ Misses 37068 35896 -1172
+ Partials 7963 7777 -186
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
Describe the issue this Pull Request addresses
Closes #18459
The
hudi_vector_searchandhudi_vector_search_batchTVFs (introduced in #18432) currently scan the entire corpus when computing distances, even when the user only cares about a narrow slice of it (e.g. one partition, one tenant, one category). They also always return the top-K closest rows regardless of how far away those rows actually are — there is no way to express "give me neighbors only if they are closer than X."This PR adds two optional, NULL-able arguments to both TVFs to address these gaps:
filter— a SQL predicate applied to the corpus before distance computation shrinking the cross-join cardinality in batch mode).max_distance— a distance threshold; rows beyond this distance are dropped before top-K selection, reducing shuffle/sort volume and giving users an "only return neighbors closer than X" semantic.Summary and Changelog
User-visible change: both vector-search TVFs now accept two extra optional positional arguments.
hudi_vector_searchextends from 4–6 args to 4–8 args;hudi_vector_search_batchextends from 5–7 args to 5–9 args. Both new positions acceptNULLto mean "not specified," preserving the old behavior.A quoting note worth calling out: because the filter is itself a single-quoted TVF argument and Spark's lexer doesn't support
''as an escape sequence inside string literals, predicates that compare against string values must use double quotes inside (e.g.'label = "z-axis"'). This is documented on theVectorSearchAlgorithm.buildSingleQueryPlanJavadoc.Detailed log:
HoodieVectorSearchTableValuedFunction.scalafilter: Option[String]andmaxDistance: Option[Double]toParsedArgsfor both the single-query and batch TVF objects.parseArgsranges (4-8and5-9) and error messages.parseOptionalString/parseOptionalDoublehelpers that acceptLiteral(null)→None, with error messages naming the offending argument.HoodieVectorSearchPlanBuilder.scalafilterandmaxDistanceparameters withNonedefaults to theVectorSearchAlgorithmtrait (buildSingleQueryPlanandbuildBatchQueryPlan) so future algorithm implementations inherit the contract.applyFilterhelper inBruteForceSearchAlgorithmthat wraps SparkAnalysisExceptions inHoodieAnalysisExceptionand echoes the offending predicate into the error message.orderBy.limit(single-query) and before theWindow.partitionBystep (batch) to reduce sort/shuffle input.BruteForceSearchAlgorithmto describe both new features.HoodieSparkBaseAnalysis.scalaa.filteranda.maxDistancethrough tobuildSingleQueryPlan/buildBatchQueryPlanin theResolveReferencesrule.TestHoodieVectorSearchFunction.scalatestArgumentCountValidationto expect the new4-8range.Impact
Public API: both vector-search TVFs gain two new trailing optional arguments. The change is fully backward compatible — existing call sites with 4/5/6/7 arguments behave identically. No SerDe or storage format change.
Performance:
filterlets users push selective predicates down to the Hudi reader, which can skip entire partitions and file groups via column stats — significant savings on large corpora where the relevant slice is small.max_distancelets users trade recall for shuffle/sort cost when they have a meaningful threshold. With a tight threshold, the input to the partial sort in single-query mode and the window shuffle in batch mode can shrink dramatically.No regressions expected on the existing code path: when both args are
None(the default), the planner falls through to exactly the pre-existing behavior.Risk Level
low
Option.foreach, so omitting the args is a strict no-op.Documentation Update
VectorSearchAlgorithm.buildSingleQueryPlan/buildBatchQueryPlanand onBruteForceSearchAlgorithmupdated to describe the new parameters, including the double-quote-inside-the-filter convention.Contributor's checklist