Speed up TaskGroup.topological_sort with int-indexed projected sweep #67288

Open
shahar1 wants to merge 7 commits into apache:main from shahar1:taskgroup-topo-sort-projection

Conversation

@shahar1 shahar1 commented May 21, 2026

Summary

Replaces the modified-Kahn body of TaskGroup.topological_sort (and the mirror in SerializedTaskGroup) with a projected sweep:

  • Each child's per-task upstream_task_ids is projected onto sibling-level integer indices once up front (_project_child_deps).
  • The sweep (_sweep_projection) then runs against the projection with a bytearray-backed emission flag - per-edge work happens once per sort instead of once per outer-loop pass.

Emission order is identical to the previous implementation. The existing order-sensitive tests (test_topological_sort1/2, test_topological_nested_groups, test_topological_group_dep) cover the contract.
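
The project-then-sweep idea above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: `children` (an ordered list of sibling names) and `upstream` (a name-to-upstream-names mapping) are hypothetical stand-ins for Airflow's task objects, and the two function names only echo `_project_child_deps` and `_sweep_projection` for readability.

```python
from collections.abc import Mapping


def project_child_deps(children: list[str], upstream: Mapping[str, set[str]]) -> list[list[int]]:
    """Translate each child's upstream names into sibling indices, once."""
    index = {name: i for i, name in enumerate(children)}
    # Upstream names that are not siblings (outside this group) are dropped.
    return [sorted(index[u] for u in upstream.get(name, ()) if u in index) for name in children]


def sweep_projection(projected: list[list[int]]) -> list[int]:
    """Greedy multi-pass sweep: emit a child once all of its deps are emitted."""
    emitted = bytearray(len(projected))  # one flag byte per sibling
    order: list[int] = []
    pending = list(range(len(projected)))
    while pending:
        still_pending = []
        for i in pending:
            if all(emitted[d] for d in projected[i]):
                emitted[i] = 1
                order.append(i)
            else:
                still_pending.append(i)
        if len(still_pending) == len(pending):
            # A full pass emitted nothing, so the remaining nodes form a cycle.
            raise ValueError("cycle detected among siblings")
        pending = still_pending
    return order


children = ["c", "b", "a"]  # declared in reverse dependency order
upstream = {"c": {"b"}, "b": {"a"}}
order = sweep_projection(project_child_deps(children, upstream))
print([children[i] for i in order])
```

Because the sweep only reads small integers and a `bytearray`, the dictionary lookups and attribute walks happen in the projection step only, which is the saving the description attributes to this change.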

Benchmark

Min of five runs, N=2000 children, ms per call. Source: https://gist.github.com/shahar1/9c61dc9f34f7e77cd29cfb9d67af7ceb

| Shape | main (ms) | PR (ms) | Speedup |
| --- | --- | --- | --- |
| chain (forward-declared) | 1.39 | 0.75 | 1.84x |
| diamond (root → many → sink) | 2.20 | 0.86 | 2.56x |
| independent (no deps) | 0.65 | 0.35 | 1.84x |
| layered (4-layer full bipartite) | 230.96 | 67.15 | 3.44x |
| nested (depth-3, walked across all groups) | 17.78 | 4.77 | 3.73x |
| rev-chain (adversarial reverse insertion) | 894.04 | 114.16 | 7.83x |

The 7.8x on rev-chain comes from the projection precompute alone (one projection vs N projections); algorithmic complexity is unchanged and remains O(N²) on that shape until the follow-up PR.
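
To make the O(N²) remark concrete, the sketch below counts how many node visits a greedy multi-pass sweep performs on a forward chain versus a reverse-inserted chain. It is an illustrative model only: `count_sweep_visits` is a hypothetical helper, and the input is an already projected list of sibling indices rather than real Airflow tasks.

```python
def count_sweep_visits(projected: list[list[int]]) -> tuple[int, int]:
    """Return (passes, node_visits) for a greedy multi-pass sweep."""
    emitted = bytearray(len(projected))
    pending = list(range(len(projected)))
    passes = visits = 0
    while pending:
        passes += 1
        still_pending = []
        for i in pending:
            visits += 1
            if all(emitted[d] for d in projected[i]):
                emitted[i] = 1
            else:
                still_pending.append(i)
        if len(still_pending) == len(pending):
            raise ValueError("cycle detected")
        pending = still_pending
    return passes, visits


n = 200
# Forward chain: node i depends on node i - 1, declared in dependency order.
forward = [[] if i == 0 else [i - 1] for i in range(n)]
# Reverse chain: node i depends on node i + 1, so only the last node is ready at first.
reverse = [[i + 1] if i < n - 1 else [] for i in range(n)]

print(count_sweep_visits(forward))  # one pass, n visits
print(count_sweep_visits(reverse))  # n passes, n * (n + 1) / 2 visits
```

Each pass over the reverse chain emits exactly one node, so the visit count grows quadratically even though each individual visit is cheap after projection.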

The nested row times a recursive walk across all 39 nested groups (depth=3, ~222 tasks each); the root-only timing was dominated by setup overhead and was previously labelled ~noise in this table.
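
For readers who want to reproduce "min of five runs, ms per call" style figures, one common approach uses `timeit.repeat`. The sketch below is an assumption about the general method, not the harness in the linked gist; `min_ms_per_call` and `workload` are hypothetical names, and the workload is a placeholder rather than a real `topological_sort` call.

```python
import timeit


def min_ms_per_call(fn, repeat: int = 5, number: int = 20) -> float:
    """Time `fn` in `repeat` batches of `number` calls; report the best batch in ms per call."""
    totals = timeit.repeat(fn, repeat=repeat, number=number)
    return min(totals) / number * 1000.0


def workload() -> None:
    # Placeholder for something like group.topological_sort() on N=2000 children.
    sorted(range(2000, 0, -1))


print(f"{min_ms_per_call(workload):.3f} ms per call")
```

Taking the minimum rather than the mean reduces the influence of scheduler noise and garbage-collection pauses on short timings.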

Test plan

  • task-sdk/tests/task_sdk/definitions/test_taskgroup.py + test_dag.py — 145 passed (18 new shape-correctness cases)
  • airflow-core/tests/unit/utils/test_task_group.py — 20 passed (1 new serialization round-trip case)
  • airflow-core/tests/unit/serialization/ (task-group / topological filter) — 4 passed
  • prek run mypy-task-sdk — passed
  • prek run mypy-airflow-core — passed
  • prek run --from-ref upstream/main --stage pre-commit — passed

Was generative AI tooling used to co-author this PR?
  • Yes — Claude Code (Opus 4.7)

Generated-by: Claude Code (Opus 4.7) following the guidelines

Copilot AI left a comment

Pull request overview

This PR optimizes TaskGroup.topological_sort (and the serialized equivalent) by pre-projecting upstream dependencies to sibling indices once per call, then performing a sweep using an efficient emitted-flag structure. This aims to reduce repeated per-edge work during sorting while preserving the existing emission order.

Changes:

  • Replaced the previous modified-Kahn loop in TaskGroup.topological_sort with a “project then sweep” implementation, including explicit cycle detection.
  • Mirrored the same algorithm into SerializedTaskGroup.topological_sort for consistency/performance.
  • Added Task SDK tests that validate topological correctness across multiple DAG shapes, plus a newsfragment entry.
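
A shape-agnostic correctness check of the kind these tests rely on can be sketched as below. This is an assumed form, not the test code added in the PR: `is_valid_topological_order` is a hypothetical helper, and `upstream` is assumed to map every sibling name to the set of names it depends on.

```python
def is_valid_topological_order(order: list[str], upstream: dict[str, set[str]]) -> bool:
    """True if `order` lists every node exactly once, with each dependency first."""
    position = {node: i for i, node in enumerate(order)}
    if len(position) != len(order):
        return False  # a node was emitted twice
    if set(position) != set(upstream):
        return False  # a node is missing or unexpected
    return all(
        position[dep] < position[node]
        for node, deps in upstream.items()
        for dep in deps
        if dep in position  # dependencies outside the group are ignored
    )


# Diamond shape: root -> (m1, m2) -> sink.
diamond = {"root": set(), "m1": {"root"}, "m2": {"root"}, "sink": {"m1", "m2"}}
print(is_valid_topological_order(["root", "m1", "m2", "sink"], diamond))
print(is_valid_topological_order(["m1", "root", "m2", "sink"], diamond))
```

A check like this validates any legal ordering, which complements the existing order-sensitive tests that pin the exact emission order.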

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| task-sdk/tests/task_sdk/definitions/test_taskgroup.py | Adds cross-shape correctness tests for TaskGroup.topological_sort. |
| task-sdk/src/airflow/sdk/definitions/taskgroup.py | Implements projected dependency computation and sweep-based topo sort in Task SDK TaskGroup. |
| airflow-core/src/airflow/serialization/definitions/taskgroup.py | Mirrors the new topo sort algorithm for serialized task groups and adds cycle detection behavior. |
| airflow-core/newsfragments/67288.improvement.rst | Documents the TaskGroup.topological_sort performance improvement. |

Comment thread task-sdk/tests/task_sdk/definitions/test_taskgroup.py Outdated
Comment thread airflow-core/src/airflow/serialization/definitions/taskgroup.py Outdated
shahar1 added 6 commits May 21, 2026 23:21
The previous modified-Kahn implementation re-derived each child's upstream
edges (materializing ``upstream_list`` and walking ``parent_group``) on every
outer-loop pass. Project per-task upstream IDs onto sibling-level integer
indices once up front, then run a greedy multi-pass sweep against that
projection with a ``bytearray`` emission flag. Emission order is identical to
the previous implementation; existing order-sensitive tests cover the contract.

Same change is mirrored in SerializedTaskGroup.

Adds a round-trip test for SerializedTaskGroup.topological_sort (the
serialization variant was previously untested), rewrites the newsfragment
in user-facing terms, and cleans up a stale reference and type annotation
in the task-sdk shape tests.

Per Copilot review feedback, the previous wording said cycles are caught
at "deserialization time" — but DAG.check_cycle runs at DAG parse time
(via dagbag loading), not during from_dict/from_json. Reword to describe
cycles reaching this code path as malformed serialized data, with the
defensive ValueError still raised on detection.

Tighten _project_child_deps and the topological_sort body: drop the
per-edge child_idx comparison in favour of a single set.discard at the
end, inline the get_task/.task_group chain, and replace the pre-alloc
loop with a list comprehension. Net -15 lines per file with a small
(~10%) speedup on the layered shape and no regression elsewhere.

The hot _sweep_projection inner loop is left intact — earlier attempts
to extract its duplicated body into a closure cost 10-30% on
independent / layered shapes and were reverted.
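
The "single set.discard at the end" change can be illustrated with a small sketch. The data model here is an assumption made for the example: `child_tasks[i]` lists the task IDs contained in sibling `i` (a sibling may be a nested group), `upstream_ids` maps a task ID to its upstream task IDs, and `project_with_discard` is a hypothetical name rather than the PR's `_project_child_deps`.

```python
def project_with_discard(
    child_tasks: list[list[str]],
    upstream_ids: dict[str, set[str]],
) -> list[set[int]]:
    """Project task-level upstream IDs onto sibling indices, dropping self-edges once."""
    # Which sibling owns each task ID.
    owner = {task_id: i for i, tasks in enumerate(child_tasks) for task_id in tasks}
    projected = []
    for i, tasks in enumerate(child_tasks):
        deps: set[int] = set()
        for task_id in tasks:
            for up in upstream_ids.get(task_id, ()):
                j = owner.get(up)
                if j is not None:  # skip tasks outside this group
                    deps.add(j)  # no per-edge `j != i` comparison here
        deps.discard(i)  # remove the self-reference in one step
        projected.append(deps)
    return projected


child_tasks = [["g.a", "g.b"], ["t"]]  # sibling 0 is a nested group, sibling 1 a task
upstream_ids = {"g.b": {"g.a"}, "t": {"g.b", "outside"}}
print(project_with_discard(child_tasks, upstream_ids))
```

Edges between tasks inside the same nested group project onto the group's own index, so one `discard` per child replaces a comparison on every edge.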
@shahar1 shahar1 force-pushed the taskgroup-topo-sort-projection branch from 62325c1 to 411f90e May 21, 2026 20:22
@shahar1 shahar1 requested a review from Copilot May 21, 2026 20:30
Copilot AI left a comment

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Comment thread task-sdk/tests/task_sdk/definitions/test_taskgroup.py
Comment thread airflow-core/newsfragments/67288.improvement.rst
Drop n=500 from the parametrize grid. The algorithm has no n-dependent
branches, so n=100 covers every code path; n=500 only re-runs the same
loops with more iterations and added ~0.7s to the file without
exercising new correctness behaviour (per Copilot review feedback on PR
apache#67288).
@shahar1 shahar1 marked this pull request as ready for review May 21, 2026 20:46

2 participants