Skip to content

fix(spark): Replace mapInArrow with foreachPartition and bound write memory#6441

Open
abhijeet-dhumal wants to merge 8 commits into
feast-dev:masterfrom
abhijeet-dhumal:fix/spark-foreachpartition-materialization
Open

fix(spark): Replace mapInArrow with foreachPartition and bound write memory#6441
abhijeet-dhumal wants to merge 8 commits into
feast-dev:masterfrom
abhijeet-dhumal:fix/spark-foreachpartition-materialization

Conversation

@abhijeet-dhumal
Copy link
Copy Markdown
Contributor

@abhijeet-dhumal abhijeet-dhumal commented May 27, 2026

What this PR does / why we need it

Spark 3.5 inserts WindowGroupLimitExec upstream of MapInArrowExec when UDFs use Window operations, causing materialization to fail with:

AttributeError: 'list' object has no attribute 'dtype'

Fix 1: Replace mapInArrow with foreachPartition (pickle-based, no Arrow bridge) to eliminate the serializer mismatch.

Fix 2: Re-apply spark.sql.* / spark.hadoop.* session configs after SparkSession.getOrCreate() — these are silently dropped when reusing a warm session in K8s-mode, causing S3 access failures.

Fix 3: Chunk foreachPartition writes (chunk_size=1000) to prevent MemoryError / OOMKill on large feature views (10M+ rows).

Which issue(s) this PR fixes

Fixes BatchFeatureView materialization with Spark 3.5+ and vector stores (Milvus, Redis).

Checks

  • I've made sure the tests are passing.
  • My commits are signed off (git commit -s)
  • My PR title follows conventional commits format

Testing Strategy

  • Unit tests — session config forwarding, chunked write, empty partition handling
  • Manual tests — 26M+ keys materialized to Redis across 4 K8s GPU executors

@abhijeet-dhumal abhijeet-dhumal changed the title Fix/spark foreachpartition materialization fix(spark): Replace mapInArrow with foreachPartition and bound write memory May 27, 2026
@abhijeet-dhumal abhijeet-dhumal marked this pull request as ready for review May 28, 2026 06:23
@abhijeet-dhumal abhijeet-dhumal requested a review from a team as a code owner May 28, 2026 06:23
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 potential issue.

View 6 additional findings in Devin Review.

Open in Devin Review

spark_session, feature_views, query_context
)

spark_query_context = [
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Removal of _apply_bfv_transformations silently returns untransformed features via SparkOfflineStore.get_historical_features()

The _apply_bfv_transformations call was the only mechanism applying BatchFeatureView UDF transformations in the SparkOfflineStore.get_historical_features() codepath. That path is still the one used by the standard FeatureStore.get_historical_features() API (which routes through passthrough_provider.get_historical_features() at sdk/python/feast/infra/passthrough_provider.py:481SparkOfflineStore.get_historical_features()). The compute engine DAG path (SparkComputeEngine.get_historical_features) handles transformations via SparkTransformationNode, but nothing in FeatureStore.get_historical_features() routes through that path.

After this removal, any user with a BatchFeatureView that has a UDF transformation, using the Spark offline store, calling fs.get_historical_features() will silently receive raw untransformed feature values. The table_subquery in the query context now always points to the raw source table, and the PIT join query reads raw columns without applying the UDF.

Prompt for agents
The removal of _apply_bfv_transformations from SparkOfflineStore.get_historical_features() breaks BFV UDF transformations for the standard FeatureStore.get_historical_features() API path, which routes through passthrough_provider → SparkOfflineStore rather than the SparkComputeEngine DAG path.

Options to fix:
1. Restore the _apply_bfv_transformations call in SparkOfflineStore.get_historical_features() until the FeatureStore API is updated to route through the compute engine for historical features.
2. Update FeatureStore.get_historical_features() (or PassthroughProvider.get_historical_features()) to detect when a BatchFeatureView has a UDF and route through SparkComputeEngine.get_historical_features() instead of the offline store directly.
3. If the intent is to deprecate this path, add an explicit warning or error when a BFV with a UDF is passed through the offline store path.

Relevant files: sdk/python/feast/infra/passthrough_provider.py (get_historical_features at line 470), sdk/python/feast/feature_store.py (get_historical_features at line 1574), sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py (get_historical_features at line 160).
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abhijeet-dhumal why this removal ?

Copy link
Copy Markdown
Contributor Author

@abhijeet-dhumal abhijeet-dhumal May 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the removal was unintentional in this PR.
Now it's fixed ✅

@abhijeet-dhumal abhijeet-dhumal force-pushed the fix/spark-foreachpartition-materialization branch from d666762 to 9c217ae Compare May 29, 2026 09:09
@tokoko
Copy link
Copy Markdown
Collaborator

tokoko commented May 29, 2026

This is gonna be a terrible performance degradation. I'm not sure I understand what the actual issue is. how are WindowGroupLimitExec and AttributeError: 'list' object has no attribute 'dtype' related?

jyejare

This comment was marked as spam.

…config forwarding for vector store materialization

Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
…MemoryError/OOMKill on large feature views

Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
…leted _apply_bfv_transformations

Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
…orical_features

The function and its call were removed in this PR but the replacement
(_apply_bfv_transformations_for_historical) lives in a separate PR (feast-dev#6440).
Removing it here would silently return raw untransformed features for any
BatchFeatureView with a Python UDF via the standard get_historical_features()
API path (FeatureStore → passthrough_provider → SparkOfflineStore).

Restoring the function and its call until feast-dev#6440 lands.

Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
@abhijeet-dhumal abhijeet-dhumal force-pushed the fix/spark-foreachpartition-materialization branch from 9c217ae to 8442a94 Compare June 1, 2026 07:39
@abhijeet-dhumal
Copy link
Copy Markdown
Contributor Author

This is gonna be a terrible performance degradation. I'm not sure I understand what the actual issue is. how are WindowGroupLimitExec and AttributeError: 'list' object has no attribute 'dtype' related?

@tokoko Could you please clarify what performance degradation you're referring to? foreachPartition is used here purely for writing to the online store (Redis, Milvus, etc.) that path is I/O-bound, not serialization-bound, so pickle vs Arrow overhead is negligible.

On the root cause: in Spark 3.5, WindowGroupLimitExec gets inserted upstream of MapInArrowExec when Window operations are present. It materializes rows as plain Python lists before they reach the Arrow bridge.. so mapInArrow fails because it expects pyarrow.RecordBatch (which has .dtype), not a list.

Happy to circle back on this if it seems suspicious to you, Thanks for your review !

- Restore test_spark_bfv_compute_on_read.py accidentally deleted with _apply_bfv_transformations
- Log applied runtime configs for reused SparkSession observability
- Validate text_col exists in spark_embed before Spark evaluation

Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
@abhijeet-dhumal abhijeet-dhumal force-pushed the fix/spark-foreachpartition-materialization branch from 8442a94 to 4da54d2 Compare June 1, 2026 07:46
@abhijeet-dhumal abhijeet-dhumal requested a review from ntkathole June 1, 2026 07:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants