fix(spark): Replace mapInArrow with foreachPartition and bound write memory#6441
Conversation
| spark_session, feature_views, query_context | ||
| ) | ||
|
|
||
| spark_query_context = [ |
There was a problem hiding this comment.
🔴 Removal of _apply_bfv_transformations silently returns untransformed features via SparkOfflineStore.get_historical_features()
The _apply_bfv_transformations call was the only mechanism applying BatchFeatureView UDF transformations in the SparkOfflineStore.get_historical_features() codepath. That path is still the one used by the standard FeatureStore.get_historical_features() API (which routes through passthrough_provider.get_historical_features() at sdk/python/feast/infra/passthrough_provider.py:481 → SparkOfflineStore.get_historical_features()). The compute engine DAG path (SparkComputeEngine.get_historical_features) handles transformations via SparkTransformationNode, but nothing in FeatureStore.get_historical_features() routes through that path.
After this removal, any user with a BatchFeatureView that has a UDF transformation, using the Spark offline store, calling fs.get_historical_features() will silently receive raw untransformed feature values. The table_subquery in the query context now always points to the raw source table, and the PIT join query reads raw columns without applying the UDF.
Prompt for agents
The removal of _apply_bfv_transformations from SparkOfflineStore.get_historical_features() breaks BFV UDF transformations for the standard FeatureStore.get_historical_features() API path, which routes through passthrough_provider → SparkOfflineStore rather than the SparkComputeEngine DAG path.
Options to fix:
1. Restore the _apply_bfv_transformations call in SparkOfflineStore.get_historical_features() until the FeatureStore API is updated to route through the compute engine for historical features.
2. Update FeatureStore.get_historical_features() (or PassthroughProvider.get_historical_features()) to detect when a BatchFeatureView has a UDF and route through SparkComputeEngine.get_historical_features() instead of the offline store directly.
3. If the intent is to deprecate this path, add an explicit warning or error when a BFV with a UDF is passed through the offline store path.
Relevant files: sdk/python/feast/infra/passthrough_provider.py (get_historical_features at line 470), sdk/python/feast/feature_store.py (get_historical_features at line 1574), sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py (get_historical_features at line 160).
Was this helpful? React with 👍 or 👎 to provide feedback.
There was a problem hiding this comment.
the removal was unintentional in this PR.
Now it's fixed ✅
d666762 to
9c217ae
Compare
|
This is gonna be a terrible performance degradation. I'm not sure I understand what the actual issue is. how are |
…config forwarding for vector store materialization Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
…MemoryError/OOMKill on large feature views Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
…leted _apply_bfv_transformations Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
…orical_features The function and its call were removed in this PR but the replacement (_apply_bfv_transformations_for_historical) lives in a separate PR (feast-dev#6440). Removing it here would silently return raw untransformed features for any BatchFeatureView with a Python UDF via the standard get_historical_features() API path (FeatureStore → passthrough_provider → SparkOfflineStore). Restoring the function and its call until feast-dev#6440 lands. Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
9c217ae to
8442a94
Compare
@tokoko Could you please clarify what performance degradation you're referring to? foreachPartition is used here purely for writing to the online store (Redis, Milvus, etc.) that path is I/O-bound, not serialization-bound, so pickle vs Arrow overhead is negligible. On the root cause: in Spark 3.5, WindowGroupLimitExec gets inserted upstream of MapInArrowExec when Window operations are present. It materializes rows as plain Python lists before they reach the Arrow bridge.. so mapInArrow fails because it expects pyarrow.RecordBatch (which has .dtype), not a list. Happy to circle back on this if it seems suspicious to you, Thanks for your review ! |
- Restore test_spark_bfv_compute_on_read.py accidentally deleted with _apply_bfv_transformations - Log applied runtime configs for reused SparkSession observability - Validate text_col exists in spark_embed before Spark evaluation Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com> Co-authored-by: Cursor <cursoragent@cursor.com>
8442a94 to
4da54d2
Compare
What this PR does / why we need it
Spark 3.5 inserts
WindowGroupLimitExecupstream ofMapInArrowExecwhen UDFs use Window operations, causing materialization to fail with:Fix 1: Replace
mapInArrowwithforeachPartition(pickle-based, no Arrow bridge) to eliminate the serializer mismatch.Fix 2: Re-apply
spark.sql.*/spark.hadoop.*session configs afterSparkSession.getOrCreate()— these are silently dropped when reusing a warm session in K8s-mode, causing S3 access failures.Fix 3: Chunk
foreachPartitionwrites (chunk_size=1000) to preventMemoryError/ OOMKill on large feature views (10M+ rows).Which issue(s) this PR fixes
Fixes
BatchFeatureViewmaterialization with Spark 3.5+ and vector stores (Milvus, Redis).Checks
git commit -s)Testing Strategy