From 78f8ebe1dff27ae873524a86c460a65a6b4cfe86 Mon Sep 17 00:00:00 2001 From: ColtenOuO Date: Thu, 21 May 2026 20:39:28 +0000 Subject: [PATCH 1/3] Fix N+1 query pattern in bulk task instance delete endpoint --- .../services/public/task_instances.py | 23 ++++--------------- 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py index f72dfc52fbb67..9d6227a695cc2 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py @@ -611,7 +611,7 @@ def handle_bulk_delete( try: # Handle deletion of specific (dag_id, dag_run_id, task_id, map_index) tuples if delete_specific_map_index_task_keys: - _, matched_task_keys, not_found_task_keys = self._categorize_task_instances( + task_instances_map, matched_task_keys, not_found_task_keys = self._categorize_task_instances( delete_specific_map_index_task_keys ) not_found_task_ids = [ @@ -625,23 +625,10 @@ def handle_bulk_delete( detail=f"The task instances with these identifiers: {not_found_task_ids} were not found", ) - for dag_id, run_id, task_id, map_index in matched_task_keys: - ti = ( - self.session.execute( - select(TI).where( - TI.dag_id == dag_id, - TI.run_id == run_id, - TI.task_id == task_id, - TI.map_index == map_index, - ) - ) - .scalars() - .one_or_none() - ) - - if ti: - self.session.delete(ti) - results.success.append(f"{dag_id}.{run_id}.{task_id}[{map_index}]") + for task_key in matched_task_keys: + dag_id, run_id, task_id, map_index = task_key + self.session.delete(task_instances_map[task_key]) + results.success.append(f"{dag_id}.{run_id}.{task_id}[{map_index}]") # Handle deletion of all map indexes for certain (dag_id, dag_run_id, task_id) tuples if delete_all_map_index_task_keys: From 170cf40c16558371a8ab616ae13f59236e6b7b92 Mon Sep 
17 00:00:00 2001 From: ColtenOuO Date: Thu, 21 May 2026 20:39:47 +0000 Subject: [PATCH 2/3] Add regression test for bulk task instance delete N+1 --- .../routes/public/test_task_instances.py | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index d49e36c8c63ba..ad7bea44e2d3a 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -6719,6 +6719,48 @@ def test_bulk_delete_rejects_unauthorized_dag_ids_from_request_body(self, test_c } ] + def test_bulk_delete_does_not_requery_each_task_instance(self, test_client, session): + # Regression guard for the N+1 fix in BulkTaskInstanceService.handle_bulk_delete: + # when deleting specific (task_id, map_index) tuples, the service must reuse the + # task instances already loaded by _categorize_task_instances instead of issuing + # a fresh SELECT for each one inside the delete loop. Each extra task instance + # should add exactly one query (its DELETE); the removed re-query would have + # added a second one per task instance. 
+ + def delete_n_task_instances(n: int) -> int: + self.create_task_instances( + session, + task_instances=[{"state": State.RUNNING, "map_indexes": tuple(range(n))}], + ) + request_body = { + "actions": [ + { + "action": "delete", + "entities": [ + {"task_id": self.TASK_ID, "map_index": map_index} for map_index in range(n) + ], + "action_on_non_existence": "fail", + } + ] + } + with count_queries() as result: + response = test_client.patch(self.ENDPOINT_URL, json=request_body) + + assert response.status_code == 200 + assert len(response.json()["delete"]["success"]) == n + clear_db_runs() + return sum(result.values()) + + small, large = 5, 15 + queries_small = delete_n_task_instances(small) + queries_large = delete_n_task_instances(large) + + assert queries_large - queries_small == large - small, ( + f"Expected {large - small} extra queries for {large - small} extra task instances, " + f"got {queries_large - queries_small}. A regression that re-queries each task " + f"instance inside the delete loop would roughly double this delta." 
+ ) + def test_should_respond_401(self, unauthenticated_test_client): response = unauthenticated_test_client.patch(self.ENDPOINT_URL, json={}) assert response.status_code == 401 From 41387eeac1cc598bdaaf4c5dc379380669a021c1 Mon Sep 17 00:00:00 2001 From: ColtenOuO Date: Tue, 26 May 2026 16:32:09 +0800 Subject: [PATCH 3/3] Refactor N+1 regression test to use parametrize pattern --- .../routes/public/test_task_instances.py | 72 +++++++++---------- 1 file changed, 35 insertions(+), 37 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index ad7bea44e2d3a..1e874f925b30d 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -6719,46 +6719,44 @@ def test_bulk_delete_rejects_unauthorized_dag_ids_from_request_body(self, test_c } ] - def test_bulk_delete_does_not_requery_each_task_instance(self, test_client, session): + @pytest.mark.parametrize("task_count", [5, 10, 20]) + def test_bulk_delete_query_count_scales_linearly_with_task_count(self, test_client, session, task_count): # Regression guard for the N+1 fix in BulkTaskInstanceService.handle_bulk_delete: - # when deleting specific (task_id, map_index) tuples, the service must reuse the - # task instances already loaded by _categorize_task_instances instead of issuing - # a fresh SELECT for each one inside the delete loop. Each extra task instance - # should add exactly one query (its DELETE); the removed re-query would have - # added a second one per task instance. + # each extra task instance must add exactly QUERIES_PER_TASK_INSTANCE queries (only its DELETE), + # not 2 (DELETE + re-SELECT). A regression that re-queries inside the loop would make + # each run strictly exceed BASE_QUERY_COUNT + task_count * QUERIES_PER_TASK_INSTANCE. 
+ QUERIES_PER_TASK_INSTANCE = 1 + BASE_QUERY_COUNT = 5 - def delete_n_task_instances(n: int) -> int: - self.create_task_instances( - session, - task_instances=[{"state": State.RUNNING, "map_indexes": tuple(range(n))}], - ) - request_body = { - "actions": [ - { - "action": "delete", - "entities": [ - {"task_id": self.TASK_ID, "map_index": map_index} for map_index in range(n) - ], - "action_on_non_existence": "fail", - } - ] - } - with count_queries() as result: - response = test_client.patch(self.ENDPOINT_URL, json=request_body) + self.create_task_instances( + session, + task_instances=[{"state": State.RUNNING, "map_indexes": tuple(range(task_count))}], + ) + request_body = { + "actions": [ + { + "action": "delete", + "entities": [ + {"task_id": self.TASK_ID, "map_index": map_index} for map_index in range(task_count) + ], + "action_on_non_existence": "fail", + } + ] + } - assert response.status_code == 200 - assert len(response.json()["delete"]["success"]) == n - clear_db_runs() - return sum(result.values()) - - small, large = 5, 15 - queries_small = delete_n_task_instances(small) - queries_large = delete_n_task_instances(large) - - assert queries_large - queries_small == large - small, ( - f"Expected {large - small} extra queries for {large - small} extra task instances, " - f"got {queries_large - queries_small}. A regression that re-queries each task " - f"instance inside the delete loop would roughly double this delta." + with count_queries() as result: + response = test_client.patch(self.ENDPOINT_URL, json=request_body) + + assert response.status_code == 200 + assert len(response.json()["delete"]["success"]) == task_count + + query_count = sum(result.values()) + expected_query_count = BASE_QUERY_COUNT + task_count * QUERIES_PER_TASK_INSTANCE + assert query_count == expected_query_count, ( + f"Bulk-delete query count {query_count} does not match expected {expected_query_count} " + f"for {task_count} task instances. 
" + f"A regression that re-queries each task instance would give " + f"~{BASE_QUERY_COUNT + task_count * 2} queries instead." ) def test_should_respond_401(self, unauthenticated_test_client):