feat(DpathExtractor): Add RecordExpander component for nested array extraction #859
Merged
Alfredo Garcia (agarctfi) merged 20 commits into main from devin/1764690419-dpath-extractor-expansion on Apr 6, 2026
Changes from 13 commits
Commits
d077d99 feat: Add expand_records_from_field and remain_original_record to Dpa… (devin-ai-integration[bot])
81d4630 style: Fix ruff formatting in dpath_extractor.py (devin-ai-integration[bot])
c7ac5f2 style: Fix ruff formatting in test_dpath_extractor.py (devin-ai-integration[bot])
24c8ac9 fix: Add type annotation for _expand_path to fix MyPy error (devin-ai-integration[bot])
91690f4 refactor: Extract record expansion logic into RecordExpander class (devin-ai-integration[bot])
c035138 feat: Add RecordExpander to declarative component schema (devin-ai-integration[bot])
b04e174 refactor: Clean up DpathExtractor extract_records logic (devin-ai-integration[bot])
c8a2643 fix: Update RecordExpander to return nothing when path doesn't exist (devin-ai-integration[bot])
c6a9d05 feat: Add wildcard support to RecordExpander and remove TypeError (devin-ai-integration[bot])
c6448e5 fix: Add type casts for dpath.values and dpath.get to fix MyPy errors (devin-ai-integration[bot])
6afe474 refactor: Eliminate code duplication in expand_record method (devin-ai-integration[bot])
5b0c0d5 refactor: Simplify expand_record per code review feedback (devin-ai-integration[bot])
2ca9ad7 feat: Add on_no_records and parent_fields_to_copy to RecordExpander (devin-ai-integration[bot])
1aadd2f Add missing import (agarctfi)
1f31837 Auto-fix lint and format issues
f6cf99c fix: Use Sequence instead of list for covariant type annotations in R… (devin-ai-integration[bot])
ead1747 refactor(RecordExpander): Remove ParentFieldMapping to delegate to Re… (agarctfi)
967dffd refactor(RecordExpander): Add enum for on_no_records (agarctfi)
5e3b13b Auto-fix lint and format issues
939d374 fix: harden record expander and clean up stripe subscription_items ex… (agarctfi)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| # | ||
| # Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
| # | ||
|
|
||
| from airbyte_cdk.sources.declarative.expanders.record_expander import ( | ||
| ParentFieldMapping, | ||
| RecordExpander, | ||
| ) | ||
|
|
||
| __all__ = ["ParentFieldMapping", "RecordExpander"] |
153 changes: 153 additions & 0 deletions
airbyte_cdk/sources/declarative/expanders/record_expander.py
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,153 @@ | ||
| # | ||
| # Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
| # | ||
|
|
||
| from dataclasses import InitVar, dataclass, field | ||
| from typing import Any, Iterable, Mapping, MutableMapping | ||
|
|
||
| import dpath | ||
|
|
||
| from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString | ||
| from airbyte_cdk.sources.types import Config | ||
|
|
||
|
|
||
| @dataclass | ||
| class ParentFieldMapping: | ||
| """Defines a mapping from a parent record field to a child record field.""" | ||
|
|
||
| source_field_path: list[str | InterpolatedString] | ||
| target_field: str | ||
| config: Config | ||
| parameters: InitVar[Mapping[str, Any]] | ||
|
|
||
| def __post_init__(self, parameters: Mapping[str, Any]) -> None: | ||
| self._source_path = [ | ||
| InterpolatedString.create(path, parameters=parameters) | ||
| for path in self.source_field_path | ||
| ] | ||
|
|
||
| def copy_field( | ||
| self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any] | ||
| ) -> None: | ||
| """Copy a field from parent record to child record.""" | ||
| source_path = [path.eval(self.config) for path in self._source_path] | ||
| try: | ||
| value = dpath.get(dict(parent_record), source_path) | ||
| child_record[self.target_field] = value | ||
| except KeyError: | ||
| pass | ||
|
|
||
|
|
||
| @dataclass | ||
| class RecordExpander: | ||
| """Expands records by extracting items from a nested array field. | ||
|
|
||
| When configured, this component extracts items from a specified nested array path | ||
| within each record and emits each item as a separate record. Optionally, the original | ||
| parent record can be embedded in each expanded item for context preservation. | ||
|
|
||
| The expand_records_from_field path supports wildcards (*) for matching multiple arrays. | ||
| When wildcards are used, items from all matched arrays are extracted and emitted. | ||
|
|
||
| Examples of instantiating this component: | ||
| ``` | ||
| record_expander: | ||
| type: RecordExpander | ||
| expand_records_from_field: | ||
| - "lines" | ||
| - "data" | ||
| remain_original_record: true | ||
| ``` | ||
|
|
||
| ``` | ||
| record_expander: | ||
| type: RecordExpander | ||
| expand_records_from_field: | ||
| - "sections" | ||
| - "*" | ||
| - "items" | ||
| on_no_records: emit_parent | ||
| parent_fields_to_copy: | ||
| - type: ParentFieldMapping | ||
| source_field_path: ["id"] | ||
| target_field: "parent_id" | ||
| ``` | ||
|
|
||
| Attributes: | ||
| expand_records_from_field: Path to a nested array field within each record. | ||
| Items from this array will be extracted and emitted as separate records. | ||
| Supports wildcards (*). | ||
| remain_original_record: If True, each expanded record will include the original | ||
| parent record in an "original_record" field. Defaults to False. | ||
| on_no_records: Behavior when expansion produces no records. "skip" (default) | ||
| emits nothing. "emit_parent" emits the original parent record unchanged. | ||
| parent_fields_to_copy: List of field mappings to copy from parent to each | ||
| expanded child record. | ||
| config: The user-provided configuration as specified by the source's spec. | ||
| """ | ||
|
|
||
| expand_records_from_field: list[str | InterpolatedString] | ||
| config: Config | ||
| parameters: InitVar[Mapping[str, Any]] | ||
| remain_original_record: bool = False | ||
| on_no_records: str = "skip" | ||
| parent_fields_to_copy: list[ParentFieldMapping] = field(default_factory=list) | ||
|
|
||
| def __post_init__(self, parameters: Mapping[str, Any]) -> None: | ||
| self._expand_path: list[InterpolatedString] | None = [ | ||
| InterpolatedString.create(path, parameters=parameters) | ||
| for path in self.expand_records_from_field | ||
| ] | ||
|
|
||
| def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]: | ||
| """Expand a record by extracting items from a nested array field.""" | ||
| if not self._expand_path: | ||
| yield record | ||
| return | ||
|
|
||
| parent_record = record | ||
| expand_path = [path.eval(self.config) for path in self._expand_path] | ||
| expanded_any = False | ||
|
|
||
| if "*" in expand_path: | ||
| extracted: Any = dpath.values(parent_record, expand_path) | ||
| for record in extracted: | ||
| if isinstance(record, list): | ||
| for item in record: | ||
| if isinstance(item, dict): | ||
| expanded_record = dict(item) | ||
| self._apply_parent_context(parent_record, expanded_record) | ||
| yield expanded_record | ||
| expanded_any = True | ||
| else: | ||
| yield item | ||
| expanded_any = True | ||
| else: | ||
| try: | ||
| extracted = dpath.get(parent_record, expand_path) | ||
| except KeyError: | ||
| extracted = None | ||
|
|
||
| if isinstance(extracted, list): | ||
| for item in extracted: | ||
| if isinstance(item, dict): | ||
| expanded_record = dict(item) | ||
| self._apply_parent_context(parent_record, expanded_record) | ||
| yield expanded_record | ||
| expanded_any = True | ||
| else: | ||
| yield item | ||
| expanded_any = True | ||
|
|
||
| if not expanded_any and self.on_no_records == "emit_parent": | ||
| yield parent_record | ||
|
|
||
| def _apply_parent_context( | ||
| self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any] | ||
| ) -> None: | ||
| """Apply parent context to a child record.""" | ||
| if self.remain_original_record: | ||
| child_record["original_record"] = parent_record | ||
|
|
||
| for field_mapping in self.parent_fields_to_copy: | ||
| field_mapping.copy_field(parent_record, child_record) | ||
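The expansion behavior described in the `RecordExpander` docstring can be illustrated with a small standalone sketch. This is not the code from the diff: `values_at` and `expand` are hypothetical helpers written in plain Python, `values_at` is a simplified stand-in for the `dpath` lookups (it does not address list elements by index), and `parent_fields_to_copy` is modeled here as a plain mapping of target field to source path rather than a list of `ParentFieldMapping` objects.

```python
from typing import Any, Iterator, Mapping, Optional, Sequence

_MISSING = object()  # sentinel: "no value found at this path"


def values_at(node: Any, path: Sequence[str]) -> Iterator[Any]:
    """Yield every value reachable from `node` by following `path`.

    A "*" segment matches every value of a mapping or every element of a list.
    Hypothetical stand-in for dpath.get / dpath.values, not the real library.
    """
    if not path:
        yield node
        return
    head, rest = path[0], path[1:]
    if isinstance(node, Mapping):
        if head == "*":
            children = list(node.values())
        else:
            children = [node[head]] if head in node else []
    elif isinstance(node, list) and head == "*":
        children = node
    else:
        children = []
    for child in children:
        yield from values_at(child, rest)


def expand(
    record: dict,
    path: Sequence[str],
    *,
    remain_original_record: bool = False,
    on_no_records: str = "skip",
    parent_fields_to_copy: Optional[Mapping[str, Sequence[str]]] = None,
) -> Iterator[Any]:
    """Emit each item of the nested array(s) at `path` as its own record."""
    copies = parent_fields_to_copy or {}
    expanded_any = False
    for value in values_at(record, path):
        if not isinstance(value, list):
            continue  # only arrays are expanded
        for item in value:
            expanded_any = True
            if not isinstance(item, dict):
                yield item  # non-dict items pass through unchanged
                continue
            child = dict(item)
            if remain_original_record:
                child["original_record"] = record
            for target_field, source_path in copies.items():
                found = next(values_at(record, source_path), _MISSING)
                if found is not _MISSING:  # missing parent fields are skipped
                    child[target_field] = found
            yield child
    if not expanded_any and on_no_records == "emit_parent":
        yield record


# Usage: one parent with two nested line items becomes two child records.
invoice = {"id": "in_1", "lines": {"data": [{"sku": "a"}, {"sku": "b"}]}}
children = list(
    expand(invoice, ["lines", "data"], parent_fields_to_copy={"parent_id": ["id"]})
)
```

In this sketch a missing path yields nothing under the default `"skip"` setting and yields the parent unchanged under `"emit_parent"`, which mirrors the `on_no_records` description in the docstring.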