From b1718af92b524655fff428d0f30f654790feea04 Mon Sep 17 00:00:00 2001 From: jlaportebot Date: Mon, 1 Jun 2026 09:00:41 -0400 Subject: [PATCH 1/2] test(task-sdk): add regression tests for xcom_pull default parameter When map_indexes was not explicitly set (ArgNotSet), xcom_pull() ignored the user-provided default parameter and always returned None instead. Add parametrized tests covering single-task, multi-task, and explicit map_indexes paths with various default values. --- .../execution_time/test_task_runner.py | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 507e63cfff072..d3498beecd6d2 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -2430,6 +2430,68 @@ def mock_send_side_effect(*args, **kwargs): ), ) + @pytest.mark.parametrize( + ("default_value", "task_ids", "map_indexes"), + [ + pytest.param("fallback", "task_a", NOTSET, id="single-task-default-notset"), + pytest.param(42, "task_a", NOTSET, id="single-task-int-default-notset"), + pytest.param("fallback", None, NOTSET, id="none-task-ids-default-notset"), + pytest.param("fallback", "task_a", 0, id="single-task-default-explicit-map"), + pytest.param("fallback", "task_a", None, id="single-task-default-map-none"), + pytest.param("fallback", ["task_a", "task_b"], NOTSET, id="multi-task-default-notset"), + ], + ) + def test_xcom_pull_returns_default_when_no_xcom_found( + self, + default_value, + task_ids, + map_indexes, + create_runtime_ti, + mock_supervisor_comms, + ): + """Test that xcom_pull returns the user-provided default when no XCom is found. + + This is a regression test for the bug where xcom_pull() ignored the ``default`` + parameter when ``map_indexes`` was not explicitly set (ArgNotSet), always + returning None instead of the provided default value. + """ + task = BaseOperator(task_id="pull_task") + runtime_ti = create_runtime_ti(task=task) + + with ( + patch.object(XCom, "get_one", return_value=None), + patch.object(XCom, "get_all", return_value=None), + ): + result = runtime_ti.xcom_pull( + key="key", + task_ids=task_ids, + map_indexes=map_indexes, + default=default_value, + ) + + if map_indexes == NOTSET: + # NOTSET path: single task returns default directly; multi-task returns list + if isinstance(task_ids, (str, type(None))): + assert result == default_value + else: + assert result == [default_value, default_value] + else: + # Explicit map_indexes path: single task + single map index returns default directly + assert result == default_value + + def test_xcom_pull_default_none_when_not_specified( + self, + create_runtime_ti, + mock_supervisor_comms, + ): + """Test that xcom_pull returns None as default when no default is provided and no XCom found.""" + task = BaseOperator(task_id="pull_task") + runtime_ti = create_runtime_ti(task=task) + + with patch.object(XCom, "get_all", return_value=None): + result = runtime_ti.xcom_pull(key="key", task_ids="task_a") + assert result is None + def test_get_param_from_context( self, mocked_parse, make_ti_context, mock_supervisor_comms, create_runtime_ti ): From 1903ab2541e63316b344f2540cae1df81b8fb304 Mon Sep 17 00:00:00 2001 From: jlaportebot Date: Mon, 1 Jun 2026 09:01:14 -0400 Subject: [PATCH 2/2] fix(task-sdk): respect default parameter in xcom_pull when no XCom found When xcom_pull() was called with map_indexes=ArgNotSet (the default) and no XCom value was found, the code appended None to the results list instead of the user-provided default value. This caused xcom_pull to always return None when no data existed, ignoring the default parameter. Fix by appending default instead of None when values is None. --- task-sdk/src/airflow/sdk/execution_time/task_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 9f2599a19634b..52c1e9db38a9c 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -483,7 +483,7 @@ def xcom_pull( ) if values is None: - xcoms.append(None) + xcoms.append(default) else: xcoms.extend(values) # For single task pulling from unmapped task, return single value