diff --git a/packages/data-designer-config/src/data_designer/config/preview_results.py b/packages/data-designer-config/src/data_designer/config/preview_results.py
index 424bfc3c0..d7804e0da 100644
--- a/packages/data-designer-config/src/data_designer/config/preview_results.py
+++ b/packages/data-designer-config/src/data_designer/config/preview_results.py
@@ -22,7 +22,7 @@
 def __init__(
         dataset_metadata: DatasetMetadata | None = None,
         dataset: pd.DataFrame | None = None,
         analysis: DatasetProfilerResults | None = None,
-        processor_artifacts: dict[str, list[str] | str] | None = None,
+        processor_artifacts: dict[str, list[dict]] | None = None,
     ):
         """Creates a new instance with results from a Data Designer preview run.
@@ -35,6 +35,6 @@ def __init__(
         """
         self.dataset: pd.DataFrame | None = dataset
         self.analysis: DatasetProfilerResults | None = analysis
-        self.processor_artifacts: dict[str, list[str] | str] | None = processor_artifacts
+        self.processor_artifacts: dict[str, list[dict]] | None = processor_artifacts
         self.dataset_metadata: DatasetMetadata | None = dataset_metadata
         self._config_builder = config_builder
diff --git a/packages/data-designer-config/src/data_designer/config/utils/io_helpers.py b/packages/data-designer-config/src/data_designer/config/utils/io_helpers.py
index 021430a4f..8247a36b7 100644
--- a/packages/data-designer-config/src/data_designer/config/utils/io_helpers.py
+++ b/packages/data-designer-config/src/data_designer/config/utils/io_helpers.py
@@ -94,6 +94,30 @@ def save_config_file(file_path: Path, config: dict) -> None:
     )
 
 
+def list_processor_names(processors_outputs_path: Path) -> list[str]:
+    """Discover processor names from directories and parquet files under the given path."""
+    if not processors_outputs_path.exists():
+        return []
+    names: dict[str, None] = {}
+    for entry in sorted(processors_outputs_path.iterdir()):
+        if entry.is_dir():
+            names[entry.name] = None
+        elif entry.suffix == ".parquet":
+            names[entry.stem] = None
+    return list(names)
+
+
+def load_processor_dataset(processors_outputs_path: Path, processor_name: str) -> pd.DataFrame:
+    """Load a processor's output dataset, checking for a directory first then a single parquet file."""
+    dir_path = processors_outputs_path / processor_name
+    file_path = processors_outputs_path / f"{processor_name}.parquet"
+    if dir_path.is_dir():
+        return read_parquet_dataset(dir_path)
+    if file_path.is_file():
+        return read_parquet_dataset(file_path)
+    raise FileNotFoundError(f"No artifacts found for processor named {processor_name!r}")
+
+
 def read_parquet_dataset(path: Path) -> pd.DataFrame:
     """Read a parquet dataset from a path.
diff --git a/packages/data-designer-engine/src/data_designer/engine/storage/artifact_storage.py b/packages/data-designer-engine/src/data_designer/engine/storage/artifact_storage.py
index 10abf6b75..458eed689 100644
--- a/packages/data-designer-engine/src/data_designer/engine/storage/artifact_storage.py
+++ b/packages/data-designer-engine/src/data_designer/engine/storage/artifact_storage.py
@@ -14,7 +14,7 @@
 from pydantic import BaseModel, ConfigDict, PrivateAttr, field_validator, model_validator
 
 import data_designer.lazy_heavy_imports as lazy
-from data_designer.config.utils.io_helpers import read_parquet_dataset
+from data_designer.config.utils.io_helpers import list_processor_names, load_processor_dataset, read_parquet_dataset
 from data_designer.config.utils.type_helpers import StrEnum, resolve_string_enum
 from data_designer.engine.dataset_builders.errors import ArtifactStorageError
 from data_designer.engine.storage.media_storage import MediaStorage, StorageMode
@@ -169,6 +169,17 @@ def create_batch_file_path(
     def load_dataset(self, batch_stage: BatchStage = BatchStage.FINAL_RESULT) -> pd.DataFrame:
         return read_parquet_dataset(self._get_stage_path(batch_stage))
 
+    def load_processor_dataset(self, processor_name: str) -> pd.DataFrame:
+        """Load a processor's output dataset. Raises ArtifactStorageError if not found."""
+        try:
+            return load_processor_dataset(self.processors_outputs_path, processor_name)
+        except FileNotFoundError as e:
+            raise ArtifactStorageError(str(e)) from e
+
+    def list_processor_names(self) -> list[str]:
+        """Discover processor names from the processor outputs directory."""
+        return list_processor_names(self.processors_outputs_path)
+
     def load_dataset_with_dropped_columns(self) -> pd.DataFrame:
         # The pyarrow backend has better support for nested data types.
         df = self.load_dataset()
@@ -241,15 +252,15 @@ def get_processor_file_paths(self) -> dict[str, list[str]]:
             Dictionary mapping processor names to lists of relative file paths.
""" processor_files: dict[str, list[str]] = {} - if self.processors_outputs_path.exists(): - for processor_dir in sorted(self.processors_outputs_path.iterdir()): - if processor_dir.is_dir(): - processor_name = processor_dir.name - processor_files[processor_name] = [ - str(f.relative_to(self.base_dataset_path)) - for f in sorted(processor_dir.rglob("*")) - if f.is_file() - ] + for name in self.list_processor_names(): + dir_path = self.processors_outputs_path / name + file_path = self.processors_outputs_path / f"{name}.parquet" + if dir_path.is_dir(): + processor_files[name] = [ + str(f.relative_to(self.base_dataset_path)) for f in sorted(dir_path.rglob("*")) if f.is_file() + ] + elif file_path.is_file(): + processor_files[name] = [str(file_path.relative_to(self.base_dataset_path))] return processor_files def get_file_paths(self) -> dict[str, list[str] | dict[str, list[str]]]: diff --git a/packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py b/packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py index 02f830ecb..6206d5bbc 100644 --- a/packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py +++ b/packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py @@ -11,6 +11,7 @@ from pyarrow import ArrowNotImplementedError import data_designer.lazy_heavy_imports as lazy +from data_designer.config.utils.io_helpers import load_processor_dataset from data_designer.engine.dataset_builders.errors import ArtifactStorageError from data_designer.engine.storage.artifact_storage import ArtifactStorage, BatchStage @@ -269,6 +270,54 @@ def test_get_processor_file_paths_with_files(stub_artifact_storage): assert len(paths["processor2"]) == 3 +def test_get_processor_file_paths_with_single_files(stub_artifact_storage): + """Test get_processor_file_paths picks up single parquet files.""" + stub_artifact_storage.mkdir_if_needed(stub_artifact_storage.processors_outputs_path) + (stub_artifact_storage.processors_outputs_path / "preview.parquet").touch() + + paths = stub_artifact_storage.get_processor_file_paths() + assert "preview" in paths + assert len(paths["preview"]) == 1 + + +def test_list_processor_names(stub_artifact_storage): + assert stub_artifact_storage.list_processor_names() == [] + + # Directory-based processor + proc_dir = stub_artifact_storage.processors_outputs_path / "batched" + stub_artifact_storage.mkdir_if_needed(proc_dir) + (proc_dir / "batch_00000.parquet").touch() + # Single-file processor + (stub_artifact_storage.processors_outputs_path / "preview.parquet").touch() + # Duplicate: both dir and file with same name + dup_dir = stub_artifact_storage.processors_outputs_path / "both" + dup_dir.mkdir() + (dup_dir / "batch_00000.parquet").touch() + (stub_artifact_storage.processors_outputs_path / "both.parquet").touch() + + assert stub_artifact_storage.list_processor_names() == ["batched", "both", "preview"] + + +@pytest.mark.parametrize("write_as_dir", [True, False], ids=["directory", "single_file"]) +def test_load_processor_dataset(stub_artifact_storage, stub_sample_dataframe, write_as_dir): + if write_as_dir: + stub_artifact_storage.write_batch_to_parquet_file( + 0, stub_sample_dataframe, BatchStage.PROCESSORS_OUTPUTS, subfolder="chat_format" + ) + else: + stub_artifact_storage.write_parquet_file( + "chat_format.parquet", stub_sample_dataframe, BatchStage.PROCESSORS_OUTPUTS + ) + + result = stub_artifact_storage.load_processor_dataset("chat_format") + lazy.pd.testing.assert_frame_equal(result, stub_sample_dataframe, 
check_dtype=False) + + +def test_load_processor_dataset_not_found(stub_artifact_storage): + with pytest.raises(ArtifactStorageError, match="No artifacts found"): + stub_artifact_storage.load_processor_dataset("nonexistent") + + def test_read_metadata_success(stub_artifact_storage): """Test read_metadata successfully reads metadata file.""" metadata = {"key1": "value1", "key2": 123} @@ -357,3 +406,9 @@ def test_update_metadata_with_nested_structures(stub_artifact_storage): assert final_metadata["nested"] == {"c": 3} # Replaced, not merged assert final_metadata["list"] == [1, 2, 3] # Unchanged assert final_metadata["new_list"] == [4, 5, 6] + + +def test_standalone_load_processor_dataset_raises_file_not_found(tmp_path): + """Standalone function raises FileNotFoundError (not ArtifactStorageError).""" + with pytest.raises(FileNotFoundError, match="No artifacts found"): + load_processor_dataset(tmp_path, "nonexistent") diff --git a/packages/data-designer/src/data_designer/interface/data_designer.py b/packages/data-designer/src/data_designer/interface/data_designer.py index a2b7f74d3..0293dfd12 100644 --- a/packages/data-designer/src/data_designer/interface/data_designer.py +++ b/packages/data-designer/src/data_designer/interface/data_designer.py @@ -279,16 +279,9 @@ def preview( except Exception as e: raise DataDesignerProfilingError(f"🛑 Error profiling preview dataset: {e}") - if builder.artifact_storage.processors_outputs_path.exists(): - processor_artifacts = { - processor_config.name: lazy.pd.read_parquet( - builder.artifact_storage.processors_outputs_path / f"{processor_config.name}.parquet", - dtype_backend="pyarrow", - ).to_dict(orient="records") - for processor_config in config_builder.get_processor_configs() - } - else: - processor_artifacts = {} + processor_artifacts: dict[str, list[dict]] = {} + for name in builder.artifact_storage.list_processor_names(): + processor_artifacts[name] = builder.artifact_storage.load_processor_dataset(name).to_dict(orient="records") if ( len(processed_dataset) > 0 diff --git a/packages/data-designer/src/data_designer/interface/results.py b/packages/data-designer/src/data_designer/interface/results.py index 6ad049ce8..4a4006a95 100644 --- a/packages/data-designer/src/data_designer/interface/results.py +++ b/packages/data-designer/src/data_designer/interface/results.py @@ -75,14 +75,7 @@ def load_processor_dataset(self, processor_name: str) -> pd.DataFrame: Returns: A pandas DataFrame containing the dataset generated by the processor. """ - try: - dataset = self.artifact_storage.read_parquet_files( - self.artifact_storage.processors_outputs_path / processor_name - ) - except Exception as e: - raise ArtifactStorageError(f"Failed to load dataset for processor {processor_name}: {e}") - - return dataset + return self.artifact_storage.load_processor_dataset(processor_name) def get_path_to_processor_artifacts(self, processor_name: str) -> Path: """Get the path to the artifacts generated by a processor.
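
The two new io_helpers functions define the on-disk convention the rest of this change relies on: a processor's output lives either in a directory of parquet batches or in a single <name>.parquet file directly under the processor outputs path, and the directory takes precedence when both exist. A minimal sketch of the expected behavior, assuming the data_designer package and pyarrow are installed; the demo_outputs path and the toy DataFrame are made up for illustration:

    from pathlib import Path

    import pandas as pd

    from data_designer.config.utils.io_helpers import list_processor_names, load_processor_dataset

    # Hypothetical processor outputs directory containing both layouts side by side.
    outputs = Path("demo_outputs/processors_outputs")
    (outputs / "chat_format").mkdir(parents=True, exist_ok=True)

    df = pd.DataFrame({"text": ["hello", "world"]})
    df.to_parquet(outputs / "chat_format" / "batch_00000.parquet")  # directory-based processor
    df.to_parquet(outputs / "preview.parquet")  # single-file processor

    # Names are discovered from both layouts, sorted and de-duplicated.
    print(list_processor_names(outputs))  # ['chat_format', 'preview']

    # Loading resolves the directory first, then falls back to the single parquet file.
    print(load_processor_dataset(outputs, "preview").shape)  # (2, 1)

    try:
        load_processor_dataset(outputs, "missing")
    except FileNotFoundError as err:
        print(err)  # No artifacts found for processor named 'missing'

The ArtifactStorage wrappers and the reworked preview path go through these same two helpers, so directory-based and single-file outputs now surface identically in get_processor_file_paths, in the preview processor_artifacts mapping, and in the results interface's load_processor_dataset.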