Skip to content

Commit 4394583

Browse files
Clear, Mark Success/Fail and delete multiple Task Instances (#64141)
1 parent 0c2a8d5 commit 4394583

19 files changed

Lines changed: 1316 additions & 23 deletions

File tree

airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ class ClearTaskInstancesBody(StrictBaseModel):
168168
"clearing the task instances.",
169169
)
170170
prevent_running_task: bool = False
171+
note: Annotated[str, StringConstraints(max_length=1000)] | None = None
171172

172173
@model_validator(mode="before")
173174
@classmethod

airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9904,6 +9904,12 @@ components:
99049904
type: boolean
99059905
title: Prevent Running Task
99069906
default: false
9907+
note:
9908+
anyOf:
9909+
- type: string
9910+
maxLength: 1000
9911+
- type: 'null'
9912+
title: Note
99079913
additionalProperties: false
99089914
type: object
99099915
title: ClearTaskInstancesBody

airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,7 @@ def post_clear_task_instances(
721721
dag_bag: DagBagDep,
722722
body: ClearTaskInstancesBody,
723723
session: SessionDep,
724+
user: GetUserDep,
724725
) -> TaskInstanceCollectionResponse:
725726
"""Clear task instances."""
726727
dag = get_latest_version_of_dag(dag_bag, dag_id, session)
@@ -837,6 +838,13 @@ def _collect_relatives(run_id: str, direction: Literal["upstream", "downstream"]
837838
except AirflowClearRunningTaskException as e:
838839
raise HTTPException(status.HTTP_409_CONFLICT, str(e)) from e
839840

841+
if body.note is not None:
842+
_patch_task_instance_note(
843+
task_instance_body=body,
844+
tis=task_instances,
845+
user=user,
846+
)
847+
840848
return TaskInstanceCollectionResponse(
841849
task_instances=[TaskInstanceResponse.model_validate(ti) for ti in task_instances],
842850
total_entries=len(task_instances),

airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,11 @@
3737
BulkDeleteAction,
3838
BulkUpdateAction,
3939
)
40-
from airflow.api_fastapi.core_api.datamodels.task_instances import BulkTaskInstanceBody, PatchTaskInstanceBody
40+
from airflow.api_fastapi.core_api.datamodels.task_instances import (
41+
BulkTaskInstanceBody,
42+
ClearTaskInstancesBody,
43+
PatchTaskInstanceBody,
44+
)
4145
from airflow.api_fastapi.core_api.security import GetUserDep
4246
from airflow.api_fastapi.core_api.services.public.common import BulkService
4347
from airflow.listeners.listener import get_listener_manager
@@ -139,7 +143,7 @@ def _patch_task_instance_state(
139143

140144

141145
def _patch_task_instance_note(
142-
task_instance_body: BulkTaskInstanceBody | PatchTaskInstanceBody,
146+
task_instance_body: BulkTaskInstanceBody | ClearTaskInstancesBody | PatchTaskInstanceBody,
143147
tis: list[TI],
144148
user: GetUserDep,
145149
update_mask: list[str] | None = Query(None),
@@ -275,6 +279,7 @@ def _perform_update(
275279
dag_bag=self.dag_bag,
276280
body=entity,
277281
session=self.session,
282+
map_index=map_index,
278283
update_mask=update_mask,
279284
)
280285

@@ -318,12 +323,12 @@ def handle_bulk_update(
318323

319324
try:
320325
specific_entity_map = {
321-
(entity.dag_id, entity.dag_run_id, entity.task_id, entity.map_index): entity
326+
self._extract_task_identifiers(entity): entity
322327
for entity in action.entities
323328
if entity.map_index is not None
324329
}
325330
all_map_entity_map = {
326-
(entity.dag_id, entity.dag_run_id, entity.task_id): entity
331+
self._extract_task_identifiers(entity)[:3]: entity
327332
for entity in action.entities
328333
if entity.map_index is None
329334
}

airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1383,6 +1383,18 @@ export const $ClearTaskInstancesBody = {
13831383
type: 'boolean',
13841384
title: 'Prevent Running Task',
13851385
default: false
1386+
},
1387+
note: {
1388+
anyOf: [
1389+
{
1390+
type: 'string',
1391+
maxLength: 1000
1392+
},
1393+
{
1394+
type: 'null'
1395+
}
1396+
],
1397+
title: 'Note'
13861398
}
13871399
},
13881400
additionalProperties: false,

airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,7 @@ export type ClearTaskInstancesBody = {
427427
*/
428428
run_on_latest_version?: boolean;
429429
prevent_running_task?: boolean;
430+
note?: string | null;
430431
};
431432

432433
/**

airflow-core/src/airflow/ui/public/i18n/locales/en/common.json

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@
199199
"users": "Users"
200200
},
201201
"selectLanguage": "Select Language",
202+
"selected": "Selected",
202203
"showDetailsPanel": "Show Details Panel",
203204
"signedInAs": "Signed in as",
204205
"source": {
@@ -299,13 +300,27 @@
299300
"utc": "UTC (Coordinated Universal Time)"
300301
},
301302
"toaster": {
303+
"bulkClear": {
304+
"error": "Bulk Clear {{resourceName}} Request Failed",
305+
"success": {
306+
"description": "{{count}} {{resourceName}} have been successfully cleared. Keys: {{keys}}",
307+
"title": "Bulk Clear {{resourceName}} Request Submitted"
308+
}
309+
},
302310
"bulkDelete": {
303311
"error": "Bulk Delete {{resourceName}} Request Failed",
304312
"success": {
305313
"description": "{{count}} {{resourceName}} have been successfully deleted. Keys: {{keys}}",
306314
"title": "Bulk Delete {{resourceName}} Request Submitted"
307315
}
308316
},
317+
"bulkUpdate": {
318+
"error": "Bulk Update {{resourceName}} Request Failed",
319+
"success": {
320+
"description": "{{count}} {{resourceName}} have been successfully updated. Keys: {{keys}}",
321+
"title": "Bulk Update {{resourceName}} Request Submitted"
322+
}
323+
},
309324
"create": {
310325
"error": "Create {{resourceName}} Request Failed",
311326
"success": {

airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx

Lines changed: 76 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@ import { Box, Editable, Text, VStack } from "@chakra-ui/react";
2020
import type { ChangeEvent } from "react";
2121
import { useTranslation } from "react-i18next";
2222

23-
import type { DAGRunResponse, TaskInstanceCollectionResponse } from "openapi/requests/types.gen";
23+
import type {
24+
DAGRunResponse,
25+
TaskInstanceCollectionResponse,
26+
TaskInstanceResponse,
27+
} from "openapi/requests/types.gen";
2428
import ReactMarkdown from "src/components/ReactMarkdown";
2529
import { Accordion } from "src/components/ui";
2630

@@ -29,17 +33,59 @@ import { getColumns } from "./columns";
2933

3034
type Props = {
3135
readonly affectedTasks?: TaskInstanceCollectionResponse;
36+
readonly groupByRunId?: boolean;
3237
readonly note: DAGRunResponse["note"];
3338
readonly setNote: (value: string) => void;
3439
};
3540

41+
const TasksTable = ({
42+
noRowsMessage,
43+
tasks,
44+
}: {
45+
readonly noRowsMessage: string;
46+
readonly tasks: Array<TaskInstanceResponse>;
47+
}) => {
48+
const { t: translate } = useTranslation();
49+
const columns = getColumns(translate);
50+
51+
return (
52+
<DataTable
53+
columns={columns}
54+
data={tasks}
55+
displayMode="table"
56+
modelName="common:taskInstance"
57+
noRowsMessage={noRowsMessage}
58+
total={tasks.length}
59+
/>
60+
);
61+
};
62+
3663
// Table is in memory, pagination and sorting are disabled.
3764
// TODO: Make a front-end only unconnected table component with client side ordering and pagination
38-
const ActionAccordion = ({ affectedTasks, note, setNote }: Props) => {
65+
const ActionAccordion = ({ affectedTasks, groupByRunId = false, note, setNote }: Props) => {
3966
const showTaskSection = affectedTasks !== undefined;
4067
const { t: translate } = useTranslation();
4168

42-
const columns = getColumns(translate);
69+
// Group task instances by dag_run_id when requested
70+
const runGroups = (() => {
71+
if (!groupByRunId || !affectedTasks) {
72+
return undefined;
73+
}
74+
75+
const map = new Map<string, Array<TaskInstanceResponse>>();
76+
77+
for (const ti of affectedTasks.task_instances) {
78+
const group = map.get(ti.dag_run_id) ?? [];
79+
80+
group.push(ti);
81+
map.set(ti.dag_run_id, group);
82+
}
83+
84+
return map;
85+
})();
86+
87+
// Only group when there are actually multiple run IDs
88+
const shouldGroup = groupByRunId && runGroups !== undefined && runGroups.size > 1;
4389

4490
return (
4591
<Accordion.Root
@@ -59,14 +105,33 @@ const ActionAccordion = ({ affectedTasks, note, setNote }: Props) => {
59105
</Accordion.ItemTrigger>
60106
<Accordion.ItemContent>
61107
<Box maxH="400px" overflowY="scroll">
62-
<DataTable
63-
columns={columns}
64-
data={affectedTasks.task_instances}
65-
displayMode="table"
66-
modelName="common:taskInstance"
67-
noRowsMessage={translate("dags:runAndTaskActions.affectedTasks.noItemsFound")}
68-
total={affectedTasks.total_entries}
69-
/>
108+
{shouldGroup ? (
109+
<Accordion.Root collapsible multiple variant="plain">
110+
{[...runGroups.entries()].map(([runId, tis]) => (
111+
<Accordion.Item key={runId} value={runId}>
112+
<Accordion.ItemTrigger px={2} py={1}>
113+
<Text fontSize="sm" fontWeight="semibold">
114+
{translate("runId")}: {runId}{" "}
115+
<Text as="span" color="fg.subtle" fontWeight="normal">
116+
({tis.length})
117+
</Text>
118+
</Text>
119+
</Accordion.ItemTrigger>
120+
<Accordion.ItemContent>
121+
<TasksTable
122+
noRowsMessage={translate("dags:runAndTaskActions.affectedTasks.noItemsFound")}
123+
tasks={tis}
124+
/>
125+
</Accordion.ItemContent>
126+
</Accordion.Item>
127+
))}
128+
</Accordion.Root>
129+
) : (
130+
<TasksTable
131+
noRowsMessage={translate("dags:runAndTaskActions.affectedTasks.noItemsFound")}
132+
tasks={affectedTasks.task_instances}
133+
/>
134+
)}
70135
</Box>
71136
</Accordion.ItemContent>
72137
</Accordion.Item>

airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open, taskInstance }: Pr
5757
const downstream = selectedOptions.includes("downstream");
5858
const [runOnLatestVersion, setRunOnLatestVersion] = useState(false);
5959

60-
const [note, setNote] = useState<string>("");
60+
const [note, setNote] = useState<string | null>(null);
6161

6262
const { data: dagDetails } = useDagServiceGetDagDetails({
6363
dagId,
@@ -186,6 +186,7 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open, taskInstance }: Pr
186186
include_future: future,
187187
include_past: past,
188188
include_upstream: upstream,
189+
...(note === null ? {} : { note }),
189190
only_failed: onlyFailed,
190191
run_on_latest_version: runOnLatestVersion,
191192
task_ids: groupTaskIds,

0 commit comments

Comments
 (0)