diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 9f2599a19634b..52c1e9db38a9c 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -483,7 +483,7 @@ def xcom_pull( ) if values is None: - xcoms.append(None) + xcoms.append(default) else: xcoms.extend(values) # For single task pulling from unmapped task, return single value diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 507e63cfff072..d3498beecd6d2 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -2430,6 +2430,68 @@ def mock_send_side_effect(*args, **kwargs): ), ) + @pytest.mark.parametrize( + ("default_value", "task_ids", "map_indexes"), + [ + pytest.param("fallback", "task_a", NOTSET, id="single-task-default-notset"), + pytest.param(42, "task_a", NOTSET, id="single-task-int-default-notset"), + pytest.param("fallback", None, NOTSET, id="none-task-ids-default-notset"), + pytest.param("fallback", "task_a", 0, id="single-task-default-explicit-map"), + pytest.param("fallback", "task_a", None, id="single-task-default-map-none"), + pytest.param("fallback", ["task_a", "task_b"], NOTSET, id="multi-task-default-notset"), + ], + ) + def test_xcom_pull_returns_default_when_no_xcom_found( + self, + default_value, + task_ids, + map_indexes, + create_runtime_ti, + mock_supervisor_comms, + ): + """Test that xcom_pull returns the user-provided default when no XCom is found. + + This is a regression test for the bug where xcom_pull() ignored the ``default`` + parameter when ``map_indexes`` was not explicitly set (ArgNotSet), always + returning None instead of the provided default value. + """ + task = BaseOperator(task_id="pull_task") + runtime_ti = create_runtime_ti(task=task) + + with ( + patch.object(XCom, "get_one", return_value=None), + patch.object(XCom, "get_all", return_value=None), + ): + result = runtime_ti.xcom_pull( + key="key", + task_ids=task_ids, + map_indexes=map_indexes, + default=default_value, + ) + + if map_indexes == NOTSET: + # NOTSET path: single task returns default directly; multi-task returns list + if isinstance(task_ids, (str, type(None))): + assert result == default_value + else: + assert result == [default_value, default_value] + else: + # Explicit map_indexes path: single task + single map index returns default directly + assert result == default_value + + def test_xcom_pull_default_none_when_not_specified( + self, + create_runtime_ti, + mock_supervisor_comms, + ): + """Test that xcom_pull returns None as default when no default is provided and no XCom found.""" + task = BaseOperator(task_id="pull_task") + runtime_ti = create_runtime_ti(task=task) + + with patch.object(XCom, "get_all", return_value=None): + result = runtime_ti.xcom_pull(key="key", task_ids="task_a") + assert result is None + def test_get_param_from_context( self, mocked_parse, make_ti_context, mock_supervisor_comms, create_runtime_ti ):