[SPARK-56322][CONNECT][PYTHON] Fix TypeError when self-joining observed DataFrames#55140
mwojtyczka wants to merge 8 commits into apache:master
Conversation
0cf0cc8 to 530cc24
…ed DataFrames
When a DataFrame with .observe() is filtered and then self-joined, both
branches carry the same Observation under the same name. The observations
property merged them using dict(**left, **right), which raises TypeError
on duplicate keyword arguments.
Replace dict(**a, **b) with {**a, **b} dict literal syntax in Join,
AsOfJoin, LateralJoin, SetOperation, and CollectMetrics. Dict literals
handle duplicate keys by letting the last value win, which is correct
here since both values are the same Observation instance.
Closes apache#55140
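The difference between the two merge forms can be demonstrated with plain dicts, no Spark required; this standalone sketch mirrors the observation-merge behavior described in the commit message (the key name and the placeholder object are illustrative):

```python
# Two plan branches carrying the same observation under the same name,
# as happens when an observed DataFrame is filtered and then self-joined.
observation = object()  # stands in for a shared Observation instance
left = {"my_observation": observation}
right = {"my_observation": observation}

# dict(**left, **right) expands each key as a keyword argument,
# so the duplicate key raises TypeError.
try:
    dict(**left, **right)
except TypeError as exc:
    print(exc)  # dict() got multiple values for keyword argument 'my_observation'

# The dict literal allows duplicates; the last value wins.
merged = {**left, **right}
assert merged["my_observation"] is observation
print(len(merged))  # 1
```

Since both branches hold a reference to the identical object, "last value wins" discards nothing.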
530cc24 to a24c3e1
cc @hvanhovell What's the expected behavior if the same `Observation` is reused?
>>> df.observe(obs, count(lit(1).alias("row_count")))
Traceback (most recent call last):
...
pyspark.errors.exceptions.base.PySparkAssertionError: [REUSE_OBSERVATION] An Observation can be used with a DataFrame only once.

Currently classic returns:

UPDATE: nvm, the other case's error message suggested it's ok to appear multiple times if it's a self-join:

>>> obs2 = Observation("12345")
>>> df.observe(obs2, count(lit(1).alias("row_count")))
Traceback (most recent call last):
...
pyspark.errors.exceptions.captured.AnalysisException: [DUPLICATED_METRICS_NAME] The metric name is not unique: 12345. The same name cannot be used for metrics with different results.
However multiple instances of metrics with the same result and name are allowed (e.g. self-joins). SQLSTATE: 42710;
CollectMetrics 12345, [count(1) AS count(1 AS row_count)#7L], 2
+- CollectMetrics 12345, [count(1) AS count(1 AS row_count)#4L], 1
+- Project [id#0L, CASE WHEN (id#0L < cast(10 as bigint)) THEN A ELSE B END AS group_key#1]
+- Range (0, 100, step=1, splits=Some(16))
@mwojtyczka btw, could you add tests to `test_observation.py`?
ueshin
left a comment
LGTM, pending a test in `test_observation.py`.
@mwojtyczka Could you also enable GitHub Actions to run the CI?
thank you for the review!
Added a test to cover this scenario.
@mwojtyczka @ghanse Could you also add some more tests from my previous comment. For:

obs = Observation("my_observation")
df = (
spark.range(100)
.selectExpr("id", "case when id < 10 then 'A' else 'B' end as group_key")
.observe(obs, count(lit(1)).alias("row_count"))
)
df.observe(obs, count(lit(1).alias("row_count"))).collect()
obs2 = Observation("12345")
df.observe(obs2, count(lit(1).alias("row_count"))).collect()

Both should raise the exceptions above.
> new_observation = Observation("new")

The name should be `metric` if checking `DUPLICATED_METRICS_NAME`.
> new_observation = Observation("new")
> with self.assertRaises(PySparkAssertionError) as pe:

The error should be `AnalysisException` for `DUPLICATED_METRICS_NAME`.
> new_observation = Observation("new")
> with self.assertRaises(PySparkAssertionError) as pe:
>     df.observe(new_observation, 2 * F.count(F.lit(1)).alias("cnt"))

Suggested change:
- df.observe(new_observation, 2 * F.count(F.lit(1)).alias("cnt"))
+ observed.observe(new_observation, 2 * F.count(F.lit(1)).alias("cnt")).collect()
> obs2 = Observation("12345")

Suggested change:
- obs2 = Observation("12345")
+ obs2 = Observation("my_observation")
> obs2 = Observation("12345")

Suggested change:
- obs2 = Observation("12345")
+ obs2 = Observation("lateral_join_obs")
What changes were proposed in this pull request?
Fixing bug: https://issues.apache.org/jira/browse/SPARK-56322
Replace `dict(**a, **b)` with `{**a, **b}` dict literal syntax when merging observations across plan branches in `Join`, `AsOfJoin`, `LateralJoin`, `SetOperation`, and `CollectMetrics`.

Why are the changes needed?
When a DataFrame with `.observe()` is filtered into two subsets and then self-joined, both branches of the join carry the same `Observation` instance under the same name. The `observations` property merges left and right observations using `dict(**left, **right)`, which raises `TypeError` when both dicts contain the same key.

This is a Python semantics issue: `dict(**a, **b)` treats each key as a keyword argument, and Python does not allow duplicate keyword arguments. The dict literal `{**a, **b}` does not have this restriction and silently lets the last value win, which is correct here since both values are the same `Observation` instance originating from the same `.observe()` call.

Why "last value wins" is safe here: when a DataFrame is observed and then branched (filtered, aliased), both branches inherit a reference to the same `Observation` instance. The duplicate keys in the merge always map to the identical Python object; there is no scenario where two different `Observation` instances share the same name within a single plan tree. Therefore, deduplication does not lose any data.

This pattern affects any workflow that observes a DataFrame, splits it into branches, and then joins or unions the branches back together. This is common in data quality pipelines (split into valid/invalid rows, then rejoin) and ETL workflows that branch and merge.
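The propagation argument above can be modeled in a few lines of pure Python. This is an illustrative sketch, not the actual Connect plan code: the `Plan` class and its `observations` property are assumptions that echo the structure described in the PR, with a plain `object()` standing in for an `Observation`:

```python
class Plan:
    """Toy stand-in for a Connect plan node with child branches."""

    def __init__(self, observations=None, children=()):
        self._observations = observations or {}
        self.children = children

    @property
    def observations(self):
        # Merge child observations with a dict literal: duplicate names
        # (the same Observation object on both branches) collapse safely,
        # whereas dict(**left, **right) would raise TypeError here.
        merged = dict(self._observations)
        for child in self.children:
            merged = {**merged, **child.observations}
        return merged


obs = object()  # shared Observation instance
observed = Plan(observations={"my_observation": obs})
left = Plan(children=(observed,))    # e.g. a filter over the observed plan
right = Plan(children=(observed,))   # another filter over the same plan
join = Plan(children=(left, right))  # self-join: both branches carry obs
assert join.observations == {"my_observation": obs}
```

Both branches contribute the same key mapped to the identical object, so the merged result holds exactly one entry and nothing is lost.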
How to reproduce
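A minimal repro might look like the following, adapted from the commit message's description (observe, filter into two subsets, self-join); the Connect endpoint URL and the cross-join shape are assumptions, and the snippet needs a running Spark Connect server:

```python
from pyspark.sql import Observation, SparkSession
from pyspark.sql.functions import col, count, lit

# Assumed local Spark Connect endpoint; adjust for your environment.
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

obs = Observation("my_observation")
df = spark.range(100).observe(obs, count(lit(1)).alias("row_count"))

# Split the observed DataFrame into two branches...
small = df.filter(col("id") < 10)
large = df.filter(col("id") >= 10)

# ...and self-join them: both branches carry the same Observation under
# the same name, which before the fix raised
# "TypeError: dict() got multiple values for keyword argument 'my_observation'"
# when the joined plan's observations were merged.
joined = small.join(large, how="cross")
joined.collect()
```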
Does this PR introduce any user-facing change?

Yes: self-joining an observed DataFrame no longer raises `TypeError`. No behavior change for joins where observations don't overlap (the common case).

How was this patch tested?
Added unit tests in `test_connect_plan.py` covering:

- `Join` with duplicate observation names (the reported scenario)
- `Join` with distinct observations (regression check)
- `SetOperation` with duplicate observation names
- `CollectMetrics` with a parent sharing the same observation name
TypeErrorand pass with the fix.Was this patch authored or co-authored using generative AI tooling?
Yes, Claude Code for testing