feat: add ExecutionGraph, CompletionTracker, and Task model for async scheduler #356
Conversation
… scheduler

Add the foundational data structures for the async task-queue dataset builder (plan #346, PR 1/4):

- ExecutionGraph: column-level static DAG with topological ordering, critical path, task counts, cell-dependency resolution, Mermaid output, and side-effect column mapping (`__trace`, `__reasoning_content`).
- CompletionTracker: lightweight (column, row_group, row_index) completion state with row dropping and ready-task enumeration.
- Task/TaskResult/TaskTrace: frozen hashable task dataclass, result container, and opt-in tracing record.

All three are pure data structures with no side effects on the existing codebase. They live in new modules under `engine/dataset_builders/utils/` and are only imported by code introduced in later PRs. 56 unit tests cover graph construction, validation, dependency resolution, completion tracking, row drops, and task model semantics.

Refs #346
Add `is_ready` and `is_batch_ready` methods to CompletionTracker to simplify `ready_tasks`. Cache topological order in ExecutionGraph since the graph is immutable after construction. Move DatasetBuilderColumnConfigT type alias to multi_column_configs. Fix license header years.
Greptile Summary

This PR introduces three foundational modules for the upcoming async scheduler. Key findings:
These are latent correctness bugs that will manifest as duplicate task dispatch once the async scheduler (PR 3) consumes this API.
- Rename all_complete → is_all_complete for boolean method convention
- Add ColumnName, RowGroup, RowIndex type aliases for readability
- Add public mutation API to ExecutionGraph (add_column, add_edge, set_side_effect, resolve_side_effect) and rewrite build_execution_graph to use it instead of private attributes
- Change TaskTrace.from_task from @staticmethod to @classmethod
```python
        return "\n".join(lines)


def build_execution_graph(
```
nit: probably mostly stylistic:
This could be a factory `create` class method of the `ExecutionGraph` class itself:

```python
@classmethod
def create(cls, column_configs: list[DatasetBuilderColumnConfigT], strategies: dict[ColumnName, GenerationStrategy]) -> Self:
    ...
```
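To make the suggestion concrete, here is a minimal standalone sketch of the factory-classmethod pattern. The config shape (`{"name": ..., "depends_on": [...]}`), the dict-based config, and the omission of `strategies` are illustrative assumptions, not the real `DatasetBuilderColumnConfigT` API:

```python
from __future__ import annotations


class ExecutionGraph:
    """Minimal sketch of the factory-classmethod pattern suggested above."""

    def __init__(self) -> None:
        # column name -> set of upstream column names
        self._upstream: dict[str, set[str]] = {}

    def add_column(self, name: str) -> None:
        self._upstream.setdefault(name, set())

    def add_edge(self, upstream: str, downstream: str) -> None:
        self.add_column(upstream)
        self.add_column(downstream)
        self._upstream[downstream].add(upstream)

    @classmethod
    def create(cls, column_configs: list[dict]) -> "ExecutionGraph":
        # Build through the public mutation API instead of reaching into
        # private attributes from a free function.
        graph = cls()
        for cfg in column_configs:
            graph.add_column(cfg["name"])
            for dep in cfg.get("depends_on", []):
                graph.add_edge(dep, cfg["name"])
        return graph
```

The benefit is that construction logic lives next to the invariants it must maintain, and call sites read as `ExecutionGraph.create(configs)` rather than a free function mutating privates.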
moved the logic into ExecutionGraph.create(), kept build_execution_graph as a thin deprecated wrapper so existing call sites still work
Any reason to not update existing call sites now?
nope, good call — removed the wrapper entirely and updated all call sites to ExecutionGraph.create()
- Rename RowGroup type alias to RowGroupIndex for consistency
- Convert ExecutionGraph from dataclass to plain class
- Move build_execution_graph logic to ExecutionGraph.create() classmethod
@andreatgretel a few more comments related to perf!
Optimization Review
High Impact
1. get_ready_tasks is O(C × R × G) on every scheduler tick
This scans every column × every row × every row group on each call. With 10 columns, 10k records, buffer_size=100, that's ~100k iterations per tick, each triggering cell_dependencies() + is_all_complete().
Two suggestions:
- Early skip for completed column×row_group pairs in the cell-by-cell branch. Before the inner row loop, a quick check like `len(completed.get(col, set())) + len(dropped) >= rg_size` would let you skip entire blocks.
- Incremental/event-driven readiness (future PR): maintain a frontier set updated on `mark_complete` instead of full-scanning. This turns the scheduler from poll-based to event-driven.
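The early-skip check from the first suggestion can be sketched as a small predicate. Here `completed`, `dropped`, and `rg_size` are simplified stand-ins for the tracker's real state, mirroring the expression in the comment:

```python
# Before iterating every row of a (column, row_group) block, skip blocks
# that are already fully accounted for: completed rows plus dropped rows
# cover the whole row group.
def block_fully_complete(
    completed: dict[str, set[int]],  # column -> completed row indices
    dropped: set[int],               # dropped row indices in this row group
    col: str,
    rg_size: int,
) -> bool:
    return len(completed.get(col, set())) + len(dropped) >= rg_size
```

With this guard in place, the inner per-row loop only runs for blocks that still have outstanding work.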
2. cell_dependencies allocates a new list + tuples every call
Called per-cell inside the hot loop. For a 100-row batch with 3 upstream columns: 100 list allocations + 300 tuple allocations per column per row group per tick. Since the graph is immutable, the dependency pattern for a given column is always the same — only (row_group, row_index) varies. A cached descriptor that is_all_complete interprets directly could avoid most allocations.
3. is_batch_ready builds full dep list then filters it
```python
deps = graph.cell_dependencies(column, row_group, None, row_group_size)
deps = [(c, rg, ri) for c, rg, ri in deps if ri is None or not self.is_dropped(rg, ri)]
```

For a full-column downstream of a 1000-row cell-by-cell column, this builds 1000 tuples then creates a second filtered list. Consider checking dropped rows inline or passing the dropped set into the dependency resolution.
Low Impact (fine to defer)
4. topological_order() and columns copy on every access — topological_order() does return list(cache) and is called once per column per row group in get_ready_tasks. Since the graph is immutable and callers don't mutate the result, an internal _topological_order that returns the cached list directly (skipping the copy) would help in the hot path. Same for the columns property.
5. is_all_complete repeated dict lookups — Each (col, rg, ri) tuple triggers self._completed.get(rg, {}).get(col, set()) with temporary empty dict/set allocations on misses. Hoisting the row-group lookup outside the per-cell loop would reduce overhead.
6. _upstream/_downstream are defaultdict but accessors use .get(key, set()) — Allocates a fresh empty set on every miss. Minor, but switching to plain dict would make the no-side-effect intent explicit and avoid the allocation.
Summary
The two highest-impact changes are (1) early-skip logic in get_ready_tasks and (2) reducing per-cell allocations in cell_dependencies. Everything else is micro-optimization that can wait until profiling confirms it matters. Great foundation overall.
@nabinchha update on the optimization review after the event-driven frontier refactor: 1. 2. 3. 4–6 (topological_order copies, is_all_complete lookups, defaultdict) — already addressed in previous commits or no longer in the hot path.
Replace the poll-based get_ready_tasks (O(C × R × G) per tick) with an event-driven frontier maintained on mark_complete/mark_batch_complete/drop_row. get_ready_tasks now returns O(frontier) instead of scanning all columns × rows × row groups.
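The frontier idea behind this commit can be illustrated with a toy two-column graph. Completing a cell pushes its now-ready downstream cells onto a frontier set, so the scheduler never rescans every column × row; the graph shape and names here are simplified stand-ins for the real ExecutionGraph/CompletionTracker wiring:

```python
# column -> downstream columns, and the reverse mapping
downstream = {"topic": ["summary"], "summary": []}
upstream = {"summary": ["topic"], "topic": []}

completed: set[tuple[str, int]] = set()  # (column, row_index) cells done
frontier: set[tuple[str, int]] = set()   # cells ready to dispatch

def mark_cell_complete(column: str, row_index: int) -> None:
    completed.add((column, row_index))
    frontier.discard((column, row_index))
    # Only the completed cell's downstream columns are re-examined: O(fan_out).
    for down in downstream[column]:
        ready = all((up, row_index) in completed for up in upstream[down])
        if ready and (down, row_index) not in completed:
            frontier.add((down, row_index))

mark_cell_complete("topic", 0)
# get_ready_tasks then reduces to iterating the frontier set.
```

This is what turns `get_ready_tasks` from a full scan into a filter over already-computed readiness.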
- Add ReadyTasksFixture dataclass and ready_ctx pytest fixture to deduplicate graph/tracker/dispatched setup across get_ready_tasks tests
- Align tests with the ExecutionGraph.create API rename
- Remove redundant inline comments
- CompletionTracker now raises ValueError when graph/row_groups are provided without each other
- resolve_side_effect prefers real columns over aliases when a name collision exists
Additional Comments (3)
1. Add a defensive guard:

```python
rg_size = self._row_group_sizes.get(row_group)
if rg_size is None:
    return
```

If the scheduler calls

2. Consider tracking batch-level completions separately to distinguish between partial and complete batches, or validate that all rows in batch upstreams are complete before enqueuing downstream tasks.

3. Add an early return for the empty case:

```python
def critical_path(self) -> list[str]:
    order = self.topological_order()
    if not order:
        return []
    ...
```
```python
def is_all_complete(self, cells: list[tuple[ColumnName, RowGroupIndex, RowIndex | None]]) -> bool:
    """Check whether all the given (column, row_group, row_index) tuples are done.
```
nit: would it be helpful to have a Cell object that we pass around? maybe not a big deal for agents, but for humans the requirement to get the tuple order right is error prone 🤷‍♂️
this would also remove the need for this type aliases, i think
done — introduced a CellRef NamedTuple in task_model.py with column, row_group, row_index fields. replaced the raw tuples in cell_dependencies return type and is_all_complete parameter. also removed the ColumnName/RowGroupIndex/RowIndex type aliases since they're redundant now
```python
def mark_complete(self, column: ColumnName, row_group: RowGroupIndex, row_index: RowIndex) -> None:
    self._completed[row_group][column].add(row_index)
```
nit: can we be more explicit about what is being marked complete? This is marking a single row complete, right?
Also, another super nit: historically we have used "record" throughout the codebase, including in the interface – e.g., num_records. I like that "row" is short lol, but wanted to call out that this is a small inconsistency.
BTW – I'm not saying we necessarily need to change row -> record. Just wanted to make sure we knowingly have an inconsistency. Your call.
renamed to mark_cell_complete to make it clear it's a single cell. keeping "row" for the internal scheduler layer since it maps naturally to the row_group/row_index grid — "record" stays in the public interface (num_records etc.)
> A ``row_index`` of ``None`` means the entire batch for that column must be complete (i.e., that column key must exist in the row group's dict).
Purely speculating here, but this being the signal that a column is complete feels like something is off – e.g., why are we passing a column name and row group index when no row index exists. I realize this is the opposite of actionable feedback lol. Just noting this came to mind.
agree it was awkward. with the new _batch_complete tracking set, batch completion now has an explicit separate signal — mark_row_range_complete populates _batch_complete[rg].add(column) and is_all_complete checks that set for row_index=None entries instead of just checking key presence
```python
        self._frontier.discard(Task(column=column, row_group=row_group, row_index=row_index, task_type="cell"))
        self._enqueue_downstream(column, row_group, row_index=row_index)

    def mark_batch_complete(self, column: ColumnName, row_group: RowGroupIndex, row_group_size: int) -> None:
```
super nit: for me, it would be more clear what this method does if we called it something like mark_row_range_complete.
renamed to mark_row_range_complete
```python
for col, rg, ri in cells:
    if ri is None:
        if col not in self._completed.get(rg, {}):
            return False
```
Claude seems convinced that there is a bug in is_all_complete. Having trouble figuring out if this is true, so sharing here:
When called with row_index=None (meaning "is this batch column fully done?"), line 120 only checks key presence:

```python
if col not in self._completed.get(rg, {}):
    return False
```

It doesn't check how many rows are in the set. So this sequence produces a wrong answer:

```python
tracker = CompletionTracker()
tracker.mark_complete("topic", row_group=0, row_index=0)  # just row 0
# This returns True — but only 1 of 3 rows is done!
tracker.is_all_complete([("topic", 0, None)])
```

The key "topic" exists in self._completed[0] after the single mark_complete call, so the check passes.
Why it matters
is_all_complete is consumed by cell_dependencies callers to verify upstream readiness. If FULL_COLUMN column (like a sampler) is an upstream dep, the downstream's dependency list includes ("topic", 0, None). If something goes wrong and individual mark_complete calls happen on a column that should only be batch-completed, is_all_complete would report "ready" after a single row — potentially causing the scheduler to dispatch work before its inputs are actually available.
Why it's not catastrophic today
The frontier-based scheduling in _enqueue_downstream does not use is_all_complete. It has its own separate check at line 82:

```python
if any(up not in rg_completed for up in batch_ups):
    continue
```

This has the same key-presence issue, but the frontier is only updated from mark_complete and mark_batch_complete call sites, which the scheduler will control. So in practice the scheduler would call the right method for the right column type. The bug is latent — it's an API semantics issue rather than a live runtime failure.
The two fix options
Option A: Track batch completions separately (cleaner)
```python
def __init__(self, ...) -> None:
    ...
    self._batch_complete: dict[RowGroupIndex, set[ColumnName]] = defaultdict(set)

def mark_batch_complete(self, column, row_group, row_group_size):
    self._completed[row_group][column].update(range(row_group_size))
    self._batch_complete[row_group].add(column)
    ...

def is_all_complete(self, cells):
    for col, rg, ri in cells:
        if ri is None:
            if col not in self._batch_complete.get(rg, set()):
                return False
        elif not self.is_complete(col, rg, ri):
            return False
    return True
```

Now the ri=None path checks a definitive signal that mark_batch_complete was actually called, not just that some row happened to exist.
Option B: Document the precondition (simpler)
Add to the docstring: "Callers must ensure that batch columns (row_index=None) are completed via mark_batch_complete, not individual mark_complete calls. The check only verifies the column key is present."
This accepts the semantic gap but makes it explicit so future callers don't trip over it.
went with option A — added a _batch_complete: dict[int, set[str]] that's only populated by mark_row_range_complete. is_all_complete now checks that set for row_index=None entries. also added a regression test that confirms mark_cell_complete on a single row doesn't make is_all_complete return True for the batch check
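The regression scenario from this reply can be sketched against a toy tracker. `MiniTracker` and its method names mirror the renamed API described in the thread but are stand-ins; the real CompletionTracker's internals differ:

```python
from collections import defaultdict


class MiniTracker:
    """Toy model of the Option A fix: batch completion is a separate signal."""

    def __init__(self) -> None:
        # row_group -> column -> completed row indices
        self._completed: dict[int, dict[str, set[int]]] = defaultdict(lambda: defaultdict(set))
        # row_group -> columns whose whole batch was explicitly completed
        self._batch_complete: dict[int, set[str]] = defaultdict(set)

    def mark_cell_complete(self, column: str, row_group: int, row_index: int) -> None:
        self._completed[row_group][column].add(row_index)

    def mark_row_range_complete(self, column: str, row_group: int, size: int) -> None:
        self._completed[row_group][column].update(range(size))
        self._batch_complete[row_group].add(column)

    def is_all_complete(self, cells) -> bool:
        for col, rg, ri in cells:
            if ri is None:
                if col not in self._batch_complete.get(rg, set()):
                    return False
            elif ri not in self._completed.get(rg, {}).get(col, set()):
                return False
        return True


t = MiniTracker()
t.mark_cell_complete("topic", row_group=0, row_index=0)
# A single completed row must not satisfy the batch-level check:
assert not t.is_all_complete([("topic", 0, None)])
t.mark_row_range_complete("topic", row_group=0, size=3)
assert t.is_all_complete([("topic", 0, None)])
```

The key property under test: only an explicit row-range completion flips the batch signal.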
```python
            return column
        return self._side_effect_map.get(column, column)

    def upstream(self, column: ColumnName) -> set[ColumnName]:
```
nit: get_upstream_columns would be more consistent with the existing codebase. same for downstream and other places names are potentially vague
renamed: upstream → get_upstream_columns, downstream → get_downstream_columns, critical_path → get_longest_dependency_chain
```python
        self._frontier.add(task)
    else:
        # Batch completion: check all non-dropped, non-complete rows
        down_completed = rg_completed.get(down, set())
```
We are needing to do this .get pattern an awful lot throughout this implementation. Not a big deal I suppose, but it might be a signal that some sort of abstraction might help with code readability.
fair point. the _batch_complete set removed some of it. the remaining .get() calls are mostly in _enqueue_downstream and _reevaluate_batch_tasks which are hot-path internal methods — a helper would add indirection without much clarity gain since the fallback values differ (empty dict vs empty set). leaving as-is for now but open to revisiting
```python
class CompletionTracker:
    """Tracks which (column, row_group, row_index) tuples are done.
```
The tuple (column, row_group, row_index) is a cell, right? Sorry, I have left too many comments about this, but it feels clunky to have to carry the tuple around everywhere 😅
Can this be indexed / framed as cell_{row_group}? Actually, this makes me realize I'm not sure what the range of row_index is. Is it the actual dataset range, so (i, j) = (row_index, column) in the dataset? Or are we resetting the range for each row group?
I wondered about that too. Maybe keep track of both?! Somewhere I made a suggestion to use named tuples or a dataclass instead of carrying a tuple around.
If we just track the local indices, we should be able to resolve global indices via a property, for example.
introduced CellRef NamedTuple to replace the raw tuples. row indices are local to their row group (0-based), so for a row group of size 3, indices are 0, 1, 2. added a note in the class docstring
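The "resolve global indices via a property" idea could look like the following, assuming fixed-size row groups (the real row-group sizing may differ, and the function name is hypothetical):

```python
# Row indices are local to their row group (0-based). With a fixed
# row_group_size, the global dataset row is recoverable arithmetically.
def global_row_index(row_group: int, row_index: int, row_group_size: int) -> int:
    return row_group * row_group_size + row_index
```

So for row_group_size=100, local cell (row_group=3, row_index=5) maps to dataset row 305.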
```python
        self._topological_order_cache = order
        return order

    def critical_path(self) -> list[str]:
```
Two things I'm noticing as general feedback throughout:
- We generally start method names with a verb – e.g, "get_", "load_", "save_", etc.
- Where possible, it would be nice to be as explicit as possible (without having names that are 10 words long lol). In this case, I can probably guess what critical path means, but "longest dependency chain" is more clear at first glance.
good points — renamed all methods to start with verbs: upstream → get_upstream_columns, downstream → get_downstream_columns, critical_path → get_longest_dependency_chain. also renamed mark_complete → mark_cell_complete and mark_batch_complete → mark_row_range_complete for clarity
```python
class Task:
    """A unit of work for the async scheduler."""

    column: ColumnName
```
nit: we use column_name in other places (not sure if we are 100% consistent about this, though). personally, I find ColumnName sort of strange – would prefer column_name: str.
removed all three type aliases (ColumnName, RowGroupIndex, RowIndex) — fields are now plain str/int. the CellRef NamedTuple replaces the tuple pattern
```python
    row_group: RowGroupIndex
    row_index: RowIndex | None  # None for batch/full-column tasks
    task_type: Literal["from_scratch", "cell", "batch", "pre_batch_processor", "post_batch_processor"]
```
Coming back to feeling like maybe the abstractions might still need some fiddling. Mainly because we are passing the tuple around, which when fully specified is a single cell, but then it also is the same as a Task once we add a task_type.
addressed by introducing CellRef NamedTuple — the tuple is now a named type with column, row_group, row_index fields. Task is a separate concept (adds task_type for the scheduler). the overlap is intentional: CellRef is the data coordinate, Task is the work unit
…-queue-foundation
```python
else:
    # FULL_COLUMN downstream: ready when all cell upstreams are fully complete
    if self._are_cell_ups_complete(cell_ups, rg_completed, rg_size, rg_dropped):
        task = Task(column=down, row_group=row_group, row_index=None, task_type="batch")
        self._frontier.add(task)
```
FULL_COLUMN downstream can be re-enqueued after it's already marked complete
This branch adds a batch task to _frontier whenever all cell upstreams are complete, but does not guard against re-adding a downstream that has already been marked complete. Compare line 148 in _reevaluate_batch_tasks:

```python
if col in rg_completed:
    continue
```

If a FULL_COLUMN downstream is dispatched and mark_batch_complete() is called, then a later upstream cell completion triggers _enqueue_downstream(), the downstream task will be silently re-added to _frontier without the guard. The scheduler will see a "ready" task for a column that has already been processed.
Suggested change:

```python
else:
    # FULL_COLUMN downstream: ready when all cell upstreams are fully complete
    if down not in rg_completed and self._are_cell_ups_complete(cell_ups, rg_completed, rg_size, rg_dropped):
        task = Task(column=down, row_group=row_group, row_index=None, task_type="batch")
        self._frontier.add(task)
```
```python
if row_index is not None:
    # Cell completion: only check the same row
    if row_index not in rg_dropped and all(row_index in s for s in cell_up_completed):
        task = Task(column=down, row_group=row_group, row_index=row_index, task_type="cell")
        self._frontier.add(task)
```
Cell-completion path can re-enqueue already-completed rows
The single-row cell path (line 92–96) lacks the "already-done" row check that the multi-row batch path includes. The batch branch (lines 99–105) correctly skips downstream rows with:

```python
if ri in rg_dropped or ri in down_completed:
    continue
```

If mark_complete() is called for an upstream cell after the downstream cell for that same row has already been marked complete, line 96 will re-add the downstream task to _frontier without verifying the row isn't already done for that downstream column.
Suggested change:

```python
if row_index is not None:
    # Cell completion: only check the same row
    down_completed = rg_completed.get(down, set())
    if row_index not in rg_dropped and row_index not in down_completed and all(row_index in s for s in cell_up_completed):
        task = Task(column=down, row_group=row_group, row_index=row_index, task_type="cell")
        self._frontier.add(task)
```
Summary

PR 1 of 4 in the async generators & task-queue builder plan. Adds the foundational data structures — `ExecutionGraph`, `CompletionTracker`, and `Task`/`TaskResult`/`TaskTrace` — that the async scheduler (PR 3) will consume. No existing behavior changes; all new modules live under `engine/dataset_builders/utils/`.

Changes

Added

- `execution_graph.py` — Column-level DAG built from config dependencies. Supports topological ordering (Kahn's, cached), critical path, cell-level dependency resolution, side-effect column mapping, Mermaid visualization, upfront task count estimation, cached `upstream_by_strategy`, and a `create()` factory classmethod.
- `completion_tracker.py` — Tracks per-cell and per-batch completion state across row groups. Uses an event-driven frontier — readiness is computed incrementally on `mark_complete`/`mark_batch_complete`/`drop_row` via `_enqueue_downstream`, so `get_ready_tasks` returns in O(frontier) instead of scanning all columns × rows × row groups (O(C × R × G)) per tick. Handles row drops and batch-level markers.
- `task_model.py` — Frozen dataclasses for `Task` (hashable work unit), `TaskResult` (outcome), and `TaskTrace` (timing trace). Includes `ColumnName`, `RowGroupIndex`, `RowIndex` type aliases for self-documenting signatures.
- `test_execution_graph.py` (381 lines) — Tests for graph construction, topological order, critical path, cell dependencies, side-effects, Mermaid output, cycle detection, task counts.
- `test_completion_tracker.py` (257 lines) — Tests for mark/query, batch completion, row drops, frontier-based readiness resolution, multi-row-group scenarios.
- `test_task_model.py` (87 lines) — Tests for equality, hashing, set membership, defaults.

Changed

Total: +1,250 / -29 lines across 9 files (6 new, 3 modified). ~58% of added lines are tests (725 test / 506 source).

Attention Areas

- `completion_tracker.py` — Event-driven frontier logic in `_enqueue_downstream` and `_reevaluate_batch_tasks`. This is the core optimization: cell completions do O(fan_out), batch completions check downstream rows, and `get_ready_tasks` is just a frontier filter.
- `execution_graph.py` — Core DAG logic. The `cell_dependencies` method resolves side-effect columns and maps generation strategy to readiness granularity (cell vs batch). `upstream_by_strategy` is cached and used by the frontier logic. This is the contract that PR 3's scheduler will rely on.

Test plan

- `pytest tests/engine/dataset_builders/utils/`
- `make check-all` passes (lint + format)