
Resume replays ordered seed rows after completed checkpoints #709

@shan-nvidia


Summary

ResumeMode.ALWAYS can replay already-consumed rows for ordered seed datasets after an interrupted run. The resume machinery correctly skips completed batch IDs, but SeedDatasetColumnGenerator is re-created with a fresh seed reader and starts reading from the beginning of the configured IndexRange.
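The failure mode can be modeled without DataDesigner. This is a toy sketch, not the library's code: `ToyOrderedReader` and `run_batches` are hypothetical stand-ins. Each simulated process builds its own reader, which mirrors the lazy reader initialization in SeedDatasetColumnGenerator, so the resumed process skips batch 0 but rereads seed row 0.

```python
from __future__ import annotations

from collections.abc import Iterator

SEEDS = ["alpha", "beta", "gamma"]
BUFFER_SIZE = 1
NUM_BATCHES = 3


class ToyOrderedReader:
    """Hypothetical stand-in for an ordered seed reader: yields rows front to back."""

    def __init__(self, rows: list[str]) -> None:
        self._it: Iterator[str] = iter(rows)

    def read(self, n: int) -> list[str]:
        return [next(self._it) for _ in range(n)]


def run_batches(batch_ids: range, output: list[str]) -> None:
    # Each "process" builds its own reader, like the lazy init in the generator.
    reader = ToyOrderedReader(SEEDS)
    for _batch_id in batch_ids:
        output.extend(reader.read(BUFFER_SIZE))


# Process 1: batch 0 completes and is checkpointed, then the run is interrupted.
checkpointed: list[str] = []
run_batches(range(0, 1), checkpointed)

# Process 2: resume correctly skips batch 0, but its new reader starts at row 0.
resumed = list(checkpointed)
run_batches(range(1, NUM_BATCHES), resumed)

print(resumed)  # ['alpha', 'alpha', 'beta']
```

The batch bookkeeping is correct in this model; only the reader position is lost, which is the same split the issue describes.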

I hit this while moving data-designer-retrieval-sdg to the new native resume API in DataDesigner 0.6.0. An interrupted run resumed from the expected batch, but the final dataset duplicated the first seed document instead of continuing to the next seed document.

Versions checked

  • Published package: data-designer==0.6.0
  • Local main: efdfd7ad8c31e562bad9f39db073a2586765e983

The code paths quoted below appear unchanged on local main.

Minimal repro

This reproduces without retrieval-sdg, model calls, or external services. It uses a small in-memory DataFrameSeedSource, SamplingStrategy.ORDERED, and buffer_size=1.

from __future__ import annotations

from pathlib import Path
from uuid import uuid4

import pandas as pd

import data_designer.config as dd
from data_designer.config.seed_source_dataframe import DataFrameSeedSource
from data_designer.engine.storage.artifact_storage import ResumeMode
from data_designer.interface import DataDesigner


class StopAfterFirstBatch(RuntimeError):
    pass


def build_config() -> dd.DataDesignerConfigBuilder:
    builder = dd.DataDesignerConfigBuilder()
    builder.with_seed_dataset(
        DataFrameSeedSource(df=pd.DataFrame({"name": ["alpha", "beta", "gamma"]})),
        sampling_strategy=dd.SamplingStrategy.ORDERED,
        selection_strategy=dd.IndexRange(start=0, end=2),
    )
    builder.add_column(dd.ExpressionColumnConfig(name="copy", expr="{{ name }}"))
    return builder


def main() -> None:
    base = Path(f"/tmp/dd-seed-resume-repro-{uuid4().hex}")
    dataset_name = "seed_resume"
    builder = build_config()

    designer = DataDesigner(artifact_path=base)
    designer.set_run_config(dd.RunConfig(disable_early_shutdown=True, buffer_size=1))

    provider = designer._create_resource_provider(dataset_name, builder, resume=ResumeMode.NEVER)
    dataset_builder = designer._create_dataset_builder(builder.build(), provider)

    def stop(_path: Path) -> None:
        raise StopAfterFirstBatch("simulated interruption after first checkpoint")

    try:
        dataset_builder.build(num_records=3, on_batch_complete=stop, resume=ResumeMode.NEVER)
    except StopAfterFirstBatch:
        pass

    resumed = designer.create(builder, num_records=3, dataset_name=dataset_name, resume=ResumeMode.ALWAYS)
    print(resumed.load_dataset()["name"].tolist())


if __name__ == "__main__":
    main()

Save the script as /tmp/dd_seed_resume_repro.py and run it with the sync engine:

DATA_DESIGNER_ASYNC_ENGINE=0 uv run python /tmp/dd_seed_resume_repro.py

Actual output:

['alpha', 'alpha', 'beta']

Expected output:

['alpha', 'beta', 'gamma']
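When checking resumed outputs at scale, a small helper can name exactly which seeds were replayed and which were skipped. This is a hypothetical diagnostic, not part of DataDesigner. It compares multisets, so it catches duplicates and drops but not reordering; an exact list equality check is stricter.

```python
from __future__ import annotations

from collections import Counter


def seed_sequence_diff(expected: list[str], actual: list[str]) -> dict[str, list[str]]:
    """Report seed values that were duplicated or dropped relative to `expected`."""
    exp, act = Counter(expected), Counter(actual)
    return {
        # Counter subtraction keeps only positive counts: values seen too often.
        "duplicated": sorted((act - exp).elements()),
        # Values seen less often than expected (skipped seed rows).
        "missing": sorted((exp - act).elements()),
    }


print(seed_sequence_diff(["alpha", "beta", "gamma"], ["alpha", "alpha", "beta"]))
# {'duplicated': ['alpha'], 'missing': ['gamma']}
```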

Why this appears to happen

Sync resume restores progress at the batch manager layer:

self.batch_manager.start(
    num_records=num_records,
    buffer_size=buffer_size,
    start_batch=state.num_completed_batches,
    initial_actual_num_records=state.actual_num_records,
    ...
)
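The state passed to `batch_manager.start` already seems sufficient to locate the ordered reader's position. A hypothetical helper, assuming each completed batch consumed exactly its requested number of seed rows: that is why it uses `num_completed_batches` and `buffer_size` rather than `state.actual_num_records`, which (as its name suggests) may be lower if rows were dropped after their seeds were read. It does not model wrap-around: if the ordered reader cycles back to the start when `num_records` exceeds the IndexRange length (not confirmed here), the offset would also need to be reduced modulo the range length.

```python
def resume_seed_offset(num_completed_batches: int, buffer_size: int, num_records: int) -> int:
    """Hypothetical helper: seed rows consumed by the completed batches.

    Every batch except possibly the last requests `buffer_size` records, so k
    completed batches requested min(k * buffer_size, num_records) seed rows.
    Assumes one seed row is consumed per requested record, even if that record
    is later dropped.
    """
    if num_completed_batches < 0 or buffer_size <= 0:
        raise ValueError("num_completed_batches must be >= 0 and buffer_size > 0")
    return min(num_completed_batches * buffer_size, num_records)


# The repro: buffer_size=1, one completed batch -> resume at seed index 1 ("beta").
print(resume_seed_offset(num_completed_batches=1, buffer_size=1, num_records=3))  # 1
```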

Then it runs the remaining batch IDs:

for batch_idx in range(state.num_completed_batches, self.batch_manager.num_batches):
    self._run_batch(...)

But SeedDatasetColumnGenerator lazily creates a new batch reader in the resumed process:

def generate_from_scratch(self, num_records: int) -> pd.DataFrame:
    if self._batch_reader is None:
        self._reset_batch_reader(num_records)
    return self._sample_records(num_records)

def _reset_batch_reader(self, num_records: int) -> None:
    self._batch_reader = self.resource_provider.seed_reader.create_batch_reader(
        batch_size=num_records,
        index_range=self._index_range,
        shuffle=shuffle,
    )

There does not appear to be a resume offset passed into the seed generator/reader, so the first resumed batch samples from the beginning of the original IndexRange again.
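One way to thread an offset through is to seek once, at the moment the lazy reader is first created. A toy sketch of that idea (first suggested fix below); `ToySeedGenerator` and its `resume_offset` parameter are hypothetical, and the `IndexRange` end is treated as inclusive, inferred from the repro's expected output rather than confirmed. If the range is exhausted, this toy simply returns fewer rows.

```python
from __future__ import annotations

from collections.abc import Iterator
from itertools import islice


class ToySeedGenerator:
    """Hypothetical ordered seed generator with an absolute resume offset.

    Follows the lazy `if self._batch_reader is None` pattern quoted above, but the
    reader it creates skips the first `resume_offset` rows of the index range.
    """

    def __init__(self, rows: list[str], start: int, end: int, resume_offset: int = 0) -> None:
        # IndexRange treated as inclusive of `end` (assumption).
        self._rows = rows[start : end + 1]
        self._resume_offset = resume_offset
        self._reader: Iterator[str] | None = None

    def generate_from_scratch(self, num_records: int) -> list[str]:
        if self._reader is None:
            # Seek once, when the reader is first created in this process.
            self._reader = islice(iter(self._rows), self._resume_offset, None)
        return list(islice(self._reader, num_records))


gen = ToySeedGenerator(["alpha", "beta", "gamma"], start=0, end=2, resume_offset=1)
print(gen.generate_from_scratch(1), gen.generate_from_scratch(1))  # ['beta'] ['gamma']
```

Seeking in the reader keeps later batches in the same process reading sequentially, so only the first read pays for the skip.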

The async resume path seems structurally similar: it computes the remaining row groups, but a newly initialized ordered seed generator would still need to seek to the row group's absolute seed offset before generating that row group.
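For row groups, the offset could instead be resolved per row group, which does not depend on the order in which remaining row groups run. A sketch under stated assumptions: row groups are fixed-size chunks of `buffer_size` records (the last may be shorter), the IndexRange holds at least `num_records` rows, and `SeedSlice` / `row_group_seed_slice` are hypothetical names corresponding loosely to the "adjust the resolved IndexRange" suggestion below.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SeedSlice:
    """Hypothetical inclusive [start, end] slice of the seed index range."""

    start: int
    end: int


def row_group_seed_slice(row_group_id: int, buffer_size: int, num_records: int, range_start: int) -> SeedSlice:
    # Absolute position of this row group's first record within the run.
    first = row_group_id * buffer_size
    if first >= num_records:
        raise ValueError(f"row group {row_group_id} is past num_records={num_records}")
    # The last row group may be shorter than buffer_size.
    last = min(first + buffer_size, num_records) - 1
    return SeedSlice(start=range_start + first, end=range_start + last)


print([row_group_seed_slice(rg, 2, 5, 10) for rg in range(3)])
# [SeedSlice(start=10, end=11), SeedSlice(start=12, end=13), SeedSlice(start=14, end=14)]
```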

Impact

This makes native resume unsafe for ordered seed-reader pipelines where seed identity matters. In retrieval SDG, a resumed run can silently duplicate one document and skip another while still reporting a successful resume/export.

Suggested direction

Resume should preserve ordered seed-reader position across completed batches/row groups. Possible fixes:

  • Pass an absolute resume offset into SeedDatasetColumnGenerator before resumed generation.
  • Adjust the resolved IndexRange for each resumed batch/row group based on completed records and row-group ID.
  • Add regression coverage that interrupts an ordered seed dataset run after one checkpoint and asserts the final seed row sequence matches that of an uninterrupted run.
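The regression test could be written as a property over every interruption point rather than only the first checkpoint. This sketch runs against a toy pipeline, not DataDesigner itself, and every name in it is hypothetical; a real test would replace `run_toy_pipeline` with the interrupt-then-`create(..., resume=ResumeMode.ALWAYS)` sequence from the repro above.

```python
from __future__ import annotations

from itertools import islice


def run_toy_pipeline(
    seeds: list[str],
    num_records: int,
    buffer_size: int,
    interrupt_after: int | None = None,
    seek_on_resume: bool = True,
) -> list[str]:
    """Toy ordered-seed pipeline, optionally interrupted after N completed batches."""
    num_batches = -(-num_records // buffer_size)  # ceiling division

    def run(batch_ids: range, offset: int, out: list[str]) -> None:
        reader = islice(iter(seeds), offset, None)  # fresh reader per "process"
        for batch_id in batch_ids:
            size = min(buffer_size, num_records - batch_id * buffer_size)
            out.extend(islice(reader, size))

    out: list[str] = []
    if interrupt_after is None:
        run(range(num_batches), 0, out)
        return out
    run(range(interrupt_after), 0, out)
    # Resume: skip completed batch IDs; optionally seek the new reader past consumed rows.
    offset = min(interrupt_after * buffer_size, num_records) if seek_on_resume else 0
    run(range(interrupt_after, num_batches), offset, out)
    return out


def test_ordered_resume_matches_uninterrupted_run() -> None:
    seeds = [f"doc{i}" for i in range(7)]
    for buffer_size in (1, 2, 3):
        baseline = run_toy_pipeline(seeds, 7, buffer_size)
        num_batches = -(-7 // buffer_size)
        # Interrupt after every possible checkpoint, not just the first one.
        for completed in range(1, num_batches):
            assert run_toy_pipeline(seeds, 7, buffer_size, completed) == baseline
    # Without the seek, resume replays rows, as reported in this issue.
    assert run_toy_pipeline(seeds, 7, 1, 1, seek_on_resume=False) != seeds


if __name__ == "__main__":
    test_ordered_resume_matches_uninterrupted_run()
```

Varying `buffer_size` matters because a fix that only works when `buffer_size=1` would still pass the single-checkpoint repro.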
