Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __init__(
dataset_metadata: DatasetMetadata | None = None,
dataset: pd.DataFrame | None = None,
analysis: DatasetProfilerResults | None = None,
processor_artifacts: dict[str, list[str] | str] | None = None,
processor_artifacts: dict[str, list[dict]] | None = None,
):
"""Creates a new instance with results from a Data Designer preview run.

Expand All @@ -35,6 +35,6 @@ def __init__(
"""
self.dataset: pd.DataFrame | None = dataset
self.analysis: DatasetProfilerResults | None = analysis
self.processor_artifacts: dict[str, list[str] | str] | None = processor_artifacts
self.processor_artifacts: dict[str, list[dict]] | None = processor_artifacts
self.dataset_metadata: DatasetMetadata | None = dataset_metadata
self._config_builder = config_builder
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,30 @@ def save_config_file(file_path: Path, config: dict) -> None:
)


def list_processor_names(processors_outputs_path: Path) -> list[str]:
    """Discover processor names from directories and parquet files under the given path.

    A processor's output is stored either as a directory of parquet batch files
    or as a single ``<name>.parquet`` file; both forms are reported.

    NOTE: these processor helpers are standalone (not methods on ArtifactStorage)
    because the service layer uses them directly without an ArtifactStorage
    instance.

    Args:
        processors_outputs_path: Root directory containing processor outputs.

    Returns:
        Sorted, de-duplicated list of processor names. Empty if the path
        does not exist.
    """
    if not processors_outputs_path.exists():
        return []
    # A dict keyed by name acts as an ordered set: iteration order follows the
    # sorted directory listing, and a name that appears both as a directory and
    # as a single parquet file is only reported once.
    names: dict[str, None] = {}
    for entry in sorted(processors_outputs_path.iterdir()):
        if entry.is_dir():
            names[entry.name] = None
        elif entry.suffix == ".parquet":
            names[entry.stem] = None
    return list(names)


def load_processor_dataset(processors_outputs_path: Path, processor_name: str) -> pd.DataFrame:
    """Load a processor's output dataset.

    A directory named after the processor takes precedence; otherwise a single
    ``<name>.parquet`` file is used.

    Raises:
        FileNotFoundError: if neither form exists for the processor.
    """
    directory_candidate = processors_outputs_path / processor_name
    if directory_candidate.is_dir():
        return read_parquet_dataset(directory_candidate)
    single_file_candidate = processors_outputs_path / f"{processor_name}.parquet"
    if single_file_candidate.is_file():
        return read_parquet_dataset(single_file_candidate)
    raise FileNotFoundError(f"No artifacts found for processor named {processor_name!r}")


def read_parquet_dataset(path: Path) -> pd.DataFrame:
"""Read a parquet dataset from a path.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from pydantic import BaseModel, ConfigDict, PrivateAttr, field_validator, model_validator

import data_designer.lazy_heavy_imports as lazy
from data_designer.config.utils.io_helpers import read_parquet_dataset
from data_designer.config.utils.io_helpers import list_processor_names, load_processor_dataset, read_parquet_dataset
from data_designer.config.utils.type_helpers import StrEnum, resolve_string_enum
from data_designer.engine.dataset_builders.errors import ArtifactStorageError
from data_designer.engine.storage.media_storage import MediaStorage, StorageMode
Expand Down Expand Up @@ -169,6 +169,17 @@ def create_batch_file_path(
def load_dataset(self, batch_stage: BatchStage = BatchStage.FINAL_RESULT) -> pd.DataFrame:
    """Load the parquet dataset stored at the given batch stage's path (final result by default)."""
    return read_parquet_dataset(self._get_stage_path(batch_stage))

def load_processor_dataset(self, processor_name: str) -> pd.DataFrame:
    """Load a processor's output dataset from the processor outputs directory.

    Raises:
        ArtifactStorageError: if no artifacts exist for the named processor.
    """
    try:
        # Inside this method the bare name resolves to the module-level helper,
        # not to this method.
        dataset = load_processor_dataset(self.processors_outputs_path, processor_name)
    except FileNotFoundError as not_found:
        raise ArtifactStorageError(str(not_found)) from not_found
    return dataset

def list_processor_names(self) -> list[str]:
    """Discover processor names from the processor outputs directory.

    Delegates to the module-level helper; returns an empty list when the
    directory does not exist.
    """
    return list_processor_names(self.processors_outputs_path)

def load_dataset_with_dropped_columns(self) -> pd.DataFrame:
# The pyarrow backend has better support for nested data types.
df = self.load_dataset()
Expand Down Expand Up @@ -241,15 +252,15 @@ def get_processor_file_paths(self) -> dict[str, list[str]]:
Dictionary mapping processor names to lists of relative file paths.
"""
processor_files: dict[str, list[str]] = {}
if self.processors_outputs_path.exists():
for processor_dir in sorted(self.processors_outputs_path.iterdir()):
if processor_dir.is_dir():
processor_name = processor_dir.name
processor_files[processor_name] = [
str(f.relative_to(self.base_dataset_path))
for f in sorted(processor_dir.rglob("*"))
if f.is_file()
]
for name in self.list_processor_names():
dir_path = self.processors_outputs_path / name
file_path = self.processors_outputs_path / f"{name}.parquet"
if dir_path.is_dir():
processor_files[name] = [
str(f.relative_to(self.base_dataset_path)) for f in sorted(dir_path.rglob("*")) if f.is_file()
]
elif file_path.is_file():
processor_files[name] = [str(file_path.relative_to(self.base_dataset_path))]
return processor_files

def get_file_paths(self) -> dict[str, list[str] | dict[str, list[str]]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pyarrow import ArrowNotImplementedError

import data_designer.lazy_heavy_imports as lazy
from data_designer.config.utils.io_helpers import load_processor_dataset
from data_designer.engine.dataset_builders.errors import ArtifactStorageError
from data_designer.engine.storage.artifact_storage import ArtifactStorage, BatchStage

Expand Down Expand Up @@ -269,6 +270,54 @@ def test_get_processor_file_paths_with_files(stub_artifact_storage):
assert len(paths["processor2"]) == 3


def test_get_processor_file_paths_with_single_files(stub_artifact_storage):
    """A bare <name>.parquet file is reported as a one-entry processor."""
    outputs_dir = stub_artifact_storage.processors_outputs_path
    stub_artifact_storage.mkdir_if_needed(outputs_dir)
    (outputs_dir / "preview.parquet").touch()

    result = stub_artifact_storage.get_processor_file_paths()
    assert "preview" in result
    assert len(result["preview"]) == 1


def test_list_processor_names(stub_artifact_storage):
    """Names come from both directories and single files, de-duplicated and sorted."""
    assert stub_artifact_storage.list_processor_names() == []

    outputs_dir = stub_artifact_storage.processors_outputs_path
    # Directory-based processor
    batched_dir = outputs_dir / "batched"
    stub_artifact_storage.mkdir_if_needed(batched_dir)
    (batched_dir / "batch_00000.parquet").touch()
    # Single-file processor
    (outputs_dir / "preview.parquet").touch()
    # Duplicate: a directory and a file share the name "both"
    shared_name_dir = outputs_dir / "both"
    shared_name_dir.mkdir()
    (shared_name_dir / "batch_00000.parquet").touch()
    (outputs_dir / "both.parquet").touch()

    assert stub_artifact_storage.list_processor_names() == ["batched", "both", "preview"]


@pytest.mark.parametrize("write_as_dir", [True, False], ids=["directory", "single_file"])
def test_load_processor_dataset(stub_artifact_storage, stub_sample_dataframe, write_as_dir):
    """Round-trips a processor dataset written either as a batch directory or a single file."""
    storage = stub_artifact_storage
    if write_as_dir:
        storage.write_batch_to_parquet_file(
            0, stub_sample_dataframe, BatchStage.PROCESSORS_OUTPUTS, subfolder="chat_format"
        )
    else:
        storage.write_parquet_file("chat_format.parquet", stub_sample_dataframe, BatchStage.PROCESSORS_OUTPUTS)

    loaded = storage.load_processor_dataset("chat_format")
    lazy.pd.testing.assert_frame_equal(loaded, stub_sample_dataframe, check_dtype=False)


def test_load_processor_dataset_not_found(stub_artifact_storage):
    """A missing processor surfaces as ArtifactStorageError (not FileNotFoundError)."""
    with pytest.raises(ArtifactStorageError, match="No artifacts found"):
        stub_artifact_storage.load_processor_dataset("nonexistent")


def test_read_metadata_success(stub_artifact_storage):
"""Test read_metadata successfully reads metadata file."""
metadata = {"key1": "value1", "key2": 123}
Expand Down Expand Up @@ -357,3 +406,9 @@ def test_update_metadata_with_nested_structures(stub_artifact_storage):
assert final_metadata["nested"] == {"c": 3} # Replaced, not merged
assert final_metadata["list"] == [1, 2, 3] # Unchanged
assert final_metadata["new_list"] == [4, 5, 6]


def test_standalone_load_processor_dataset_raises_file_not_found(tmp_path):
    """The standalone io_helpers function raises FileNotFoundError, not ArtifactStorageError.

    Callers wanting ArtifactStorageError go through ArtifactStorage.load_processor_dataset,
    which wraps this error.
    """
    with pytest.raises(FileNotFoundError, match="No artifacts found"):
        load_processor_dataset(tmp_path, "nonexistent")
Original file line number Diff line number Diff line change
Expand Up @@ -279,16 +279,9 @@ def preview(
except Exception as e:
raise DataDesignerProfilingError(f"🛑 Error profiling preview dataset: {e}")

if builder.artifact_storage.processors_outputs_path.exists():
processor_artifacts = {
processor_config.name: lazy.pd.read_parquet(
builder.artifact_storage.processors_outputs_path / f"{processor_config.name}.parquet",
dtype_backend="pyarrow",
).to_dict(orient="records")
for processor_config in config_builder.get_processor_configs()
}
else:
processor_artifacts = {}
processor_artifacts: dict[str, list[dict]] = {}
for name in builder.artifact_storage.list_processor_names():
processor_artifacts[name] = builder.artifact_storage.load_processor_dataset(name).to_dict(orient="records")

if (
len(processed_dataset) > 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,7 @@ def load_processor_dataset(self, processor_name: str) -> pd.DataFrame:
Returns:
A pandas DataFrame containing the dataset generated by the processor.
"""
try:
dataset = self.artifact_storage.read_parquet_files(
self.artifact_storage.processors_outputs_path / processor_name
)
except Exception as e:
raise ArtifactStorageError(f"Failed to load dataset for processor {processor_name}: {e}")

return dataset
return self.artifact_storage.load_processor_dataset(processor_name)

def get_path_to_processor_artifacts(self, processor_name: str) -> Path:
"""Get the path to the artifacts generated by a processor.
Expand Down