Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ def xcom_pull(
)

if values is None:
xcoms.append(None)
xcoms.append(default)
else:
xcoms.extend(values)
# For single task pulling from unmapped task, return single value
Expand Down
62 changes: 62 additions & 0 deletions task-sdk/tests/task_sdk/execution_time/test_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2430,6 +2430,68 @@ def mock_send_side_effect(*args, **kwargs):
),
)

@pytest.mark.parametrize(
("default_value", "task_ids", "map_indexes"),
[
pytest.param("fallback", "task_a", NOTSET, id="single-task-default-notset"),
pytest.param(42, "task_a", NOTSET, id="single-task-int-default-notset"),
pytest.param("fallback", None, NOTSET, id="none-task-ids-default-notset"),
pytest.param("fallback", "task_a", 0, id="single-task-default-explicit-map"),
pytest.param("fallback", "task_a", None, id="single-task-default-map-none"),
pytest.param("fallback", ["task_a", "task_b"], NOTSET, id="multi-task-default-notset"),
],
)
def test_xcom_pull_returns_default_when_no_xcom_found(
self,
default_value,
task_ids,
map_indexes,
create_runtime_ti,
mock_supervisor_comms,
):
"""Test that xcom_pull returns the user-provided default when no XCom is found.

This is a regression test for the bug where xcom_pull() ignored the ``default``
parameter when ``map_indexes`` was not explicitly set (ArgNotSet), always
returning None instead of the provided default value.
"""
task = BaseOperator(task_id="pull_task")
runtime_ti = create_runtime_ti(task=task)

with (
patch.object(XCom, "get_one", return_value=None),
patch.object(XCom, "get_all", return_value=None),
):
result = runtime_ti.xcom_pull(
key="key",
task_ids=task_ids,
map_indexes=map_indexes,
default=default_value,
)

if map_indexes == NOTSET:
# NOTSET path: single task returns default directly; multi-task returns list
if isinstance(task_ids, (str, type(None))):
assert result == default_value
else:
assert result == [default_value, default_value]
else:
# Explicit map_indexes path: single task + single map index returns default directly
assert result == default_value

def test_xcom_pull_default_none_when_not_specified(
self,
create_runtime_ti,
mock_supervisor_comms,
):
"""Test that xcom_pull returns None as default when no default is provided and no XCom found."""
task = BaseOperator(task_id="pull_task")
runtime_ti = create_runtime_ti(task=task)

with patch.object(XCom, "get_all", return_value=None):
result = runtime_ti.xcom_pull(key="key", task_ids="task_a")
assert result is None

def test_get_param_from_context(
self, mocked_parse, make_ti_context, mock_supervisor_comms, create_runtime_ti
):
Expand Down