feat: Add block_simultaneous_read with stream_groups + fix default HTTP timeout #940
Conversation
…tream_groups
- Remove block_simultaneous_read property from DeclarativeStream schema
- Add top-level stream_groups with StreamGroup and BlockSimultaneousSyncsAction
- ConcurrentDeclarativeSource parses stream_groups and injects block_simultaneous_read into stream configs before factory processing
- Internal blocking logic in ConcurrentReadProcessor unchanged
- Update tests for new interface
- Add stream_name_to_group parameter to ModelToComponentFactory.__init__()
- Add set_stream_name_to_group() method for post-init configuration
- Factory now looks up block_simultaneous_read from its own mapping
- Remove config injection hack from ConcurrentDeclarativeSource.streams()
- Update tests to use factory-based approach instead of extra fields
…oded dict
- Test now defines stream_groups with references in the manifest YAML
- Uses _build_stream_name_to_group() to derive the mapping from manifest
- Removed test_set_stream_name_to_group (redundant with the manifest-based test)
- Added ConcurrentDeclarativeSource import for _build_stream_name_to_group
Child streams that depend on parent streams should not be in the same group, as this would cause a deadlock (the child needs to read the parent).
- Factory now owns the stream_groups resolution via set_stream_groups(manifest)
- ConcurrentDeclarativeSource just calls factory.set_stream_groups(manifest)
- Removed _build_stream_name_to_group from ConcurrentDeclarativeSource
- Updated tests to use factory's _build_stream_name_to_group directly
…of factory
- Removed _build_stream_name_to_group, set_stream_groups, _stream_name_to_group from factory
- Factory no longer knows about stream_groups at all
- Added _apply_stream_groups to ConcurrentDeclarativeSource: creates streams first, then sets block_simultaneous_read on matching DefaultStream instances
- Added block_simultaneous_read setter on DefaultStream
- Replaced mock-based tests with parametrized tests using real DefaultStream instances
Replace hasattr chain in ConcurrentReadProcessor._collect_all_parent_stream_names with DefaultStream.get_partition_router() that safely traverses the internal partition_generator -> stream_slicer -> partition_router chain using isinstance checks.
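A minimal sketch of the traversal this describes; the class below is a stand-in for the real CDK type, and the exact attribute names (`_stream_slicer`, `_partition_router`) are assumptions based on the commit message:

```python
from typing import Any, Optional

class ConcurrentPerPartitionCursor:
    """Stand-in for the CDK class of the same name (illustrative only)."""
    def __init__(self, partition_router: Any) -> None:
        self._partition_router = partition_router

def get_partition_router(partition_generator: Any) -> Optional[Any]:
    # Walk partition_generator -> stream_slicer, but only trust the chain
    # when the slicer is the expected type; return None otherwise instead
    # of chaining hasattr checks.
    stream_slicer = getattr(partition_generator, "_stream_slicer", None)
    if isinstance(stream_slicer, ConcurrentPerPartitionCursor):
        return stream_slicer._partition_router
    return None
```

Returning `None` for any unexpected shape keeps the caller's "no parents found" path simple.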
_apply_stream_groups now checks that no stream shares a group with any of its parent streams (via get_partition_router). Raises ValueError at config time if a deadlock-causing configuration is detected.
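The shape of that validation can be sketched as follows; the function name and input mappings are hypothetical, but the rule matches the commit description:

```python
from typing import Dict, Mapping, Set

def validate_no_parent_in_same_group(
    stream_name_to_group: Dict[str, str],
    parents_by_stream: Mapping[str, Set[str]],
) -> None:
    """Raise if a stream shares a blocking group with any of its parents.

    A child cannot start while its parent's group is active, yet it also
    cannot produce slices without reading the parent first, so sharing a
    group guarantees a deadlock; better to fail at config time.
    """
    for stream_name, group in stream_name_to_group.items():
        for parent in parents_by_stream.get(stream_name, set()):
            if stream_name_to_group.get(parent) == group:
                raise ValueError(
                    f"Stream '{stream_name}' and its parent '{parent}' share "
                    f"group '{group}'; this configuration would deadlock."
                )
```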
… done Adds a safety check in is_done() that raises AirbyteTracedException (system_error) if streams remain in the partition generation queue after all streams are marked done. Also moves inline imports to module level and updates test mocks to use DefaultStream with get_partition_router().
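The intent of that check, sketched with a stand-in exception type (the real code raises AirbyteTracedException and reads the processor's internal state rather than taking arguments):

```python
from typing import List, Set

class StuckSyncError(RuntimeError):
    """Stand-in for AirbyteTracedException with failure_type=system_error."""

def check_is_done(
    done_streams: Set[str], all_streams: Set[str], generation_queue: List[str]
) -> bool:
    # The sync is done only when every stream finished; if that is true but
    # streams still sit in the partition generation queue, a deferred stream
    # was never resumed, so fail loudly instead of hanging forever.
    if done_streams == all_streams and generation_queue:
        raise StuckSyncError(
            f"All streams marked done but {generation_queue} remain queued "
            f"for partition generation"
        )
    return done_streams == all_streams
```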
…d concurrent_declarative_source.py
…ct parent streams
…artition_router()
…ps check, and get_partition_router
…rents
…hema definitions
…er reading
Add default timeout of (30s connect, 300s read) to HttpClient.send_request(). When no explicit timeout is provided in request_kwargs, the default is injected before sending the request. This prevents requests.Session.send() from blocking indefinitely when a server stalls mid-response (e.g. after a 500 error retry). ConnectTimeout and ReadTimeout are already in TRANSIENT_EXCEPTIONS, so timeouts trigger automatic retries with exponential backoff. Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
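The injection logic described above can be sketched as a small helper; the actual HttpClient code may differ, but the behavior (inject only when the caller provided no `timeout` key) is as stated:

```python
from typing import Any, Dict, Tuple

# Defaults described in this PR: 30s to connect, 300s between bytes read.
_DEFAULT_TIMEOUT: Tuple[float, float] = (30.0, 300.0)

def with_default_timeout(request_kwargs: Dict[str, Any]) -> Dict[str, Any]:
    # Inject the default only when the caller supplied no "timeout" key at
    # all; an explicit value is passed through to requests untouched.
    if "timeout" not in request_kwargs:
        return {**request_kwargs, "timeout": _DEFAULT_TIMEOUT}
    return request_kwargs
```

With `requests`, a `(connect, read)` tuple bounds connection setup and the gap between received bytes separately, which is what makes a server stalling mid-response raise ReadTimeout instead of blocking forever.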
Testing this CDK version:

```shell
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1772836274-block-simultaneous-read-with-timeout#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1772836274-block-simultaneous-read-with-timeout
```
Pull request overview
This PR adds stream-level concurrency blocking via manifest-configured stream_groups (to prevent simultaneous reads of related/competing streams, including parent-child dependencies) and introduces a default HTTP request timeout to avoid indefinite hangs in HttpClient.
Changes:
- Add blocking/deferral logic to ConcurrentReadProcessor using per-stream group membership and parent discovery, plus runtime safety checks for stuck queues/groups.
- Add default (connect=30s, read=300s) timeout injection in HttpClient.send_request() when callers do not specify a timeout.
- Extend declarative schema/models and concurrent declarative source logic to support manifest-level stream_groups, with deadlock-prevention validation and corresponding unit tests.
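As an illustration of the manifest-level interface, a connector might declare groups like the following. This is a hypothetical excerpt: the group-name-to-config mapping shape and string stream references follow this PR's discussion, while the `action` key/value is inferred from the BlockSimultaneousSyncsAction type and may differ from the final schema:

```yaml
stream_groups:
  heavy_api_group:
    action: BLOCK_SIMULTANEOUS_SYNCS
    streams:
      - "#/definitions/contacts_stream"
      - "#/definitions/companies_stream"
```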
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py | Implements group-based deferral, active tracking, parent discovery, and new stuck-sync safety checks. |
| airbyte_cdk/sources/declarative/concurrent_declarative_source.py | Applies stream_groups to created stream instances and validates parent/child deadlock configurations. |
| airbyte_cdk/sources/streams/http/http_client.py | Injects default connect/read timeouts into request kwargs before sending. |
| airbyte_cdk/sources/streams/concurrent/default_stream.py | Adds block_simultaneous_read to streams and a helper for extracting the partition router. |
| airbyte_cdk/sources/streams/concurrent/adapters.py | Exposes block_simultaneous_read via stream adapter. |
| airbyte_cdk/sources/streams/concurrent/abstract_stream.py | Introduces a default block_simultaneous_read property on the abstract stream interface. |
| airbyte_cdk/sources/declarative/declarative_component_schema.yaml | Adds top-level stream_groups and new schema definitions for StreamGroup/action. |
| airbyte_cdk/sources/declarative/models/declarative_component_schema.py | Adds generated Pydantic types/fields for stream_groups and related models. |
| unit_tests/sources/streams/http/test_http_client.py | Adds tests ensuring default timeout application and explicit-timeout preservation. |
| unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py | Adds extensive tests for blocking/deferral semantics, parent/grandparent handling, and safety checks. |
| unit_tests/sources/declarative/test_concurrent_declarative_source.py | Adds tests for applying stream_groups and deadlock validation in concurrent declarative sources. |
| unit_tests/sources/declarative/parsers/test_model_to_component_factory.py | Ensures factory-created streams default to no blocking group (groups applied post-creation). |
```python
# Deactivate all parent streams now that partition generation is complete
# Parents were only needed to generate slices, they can now be reused
parent_streams = self._collect_all_parent_stream_names(stream_name)
for parent_stream_name in parent_streams:
    if parent_stream_name in self._active_stream_names:
        self._logger.debug(f"Removing '{parent_stream_name}' from active streams")
        self._active_stream_names.discard(parent_stream_name)

        # Remove from active groups
        parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
        if parent_group:
            if parent_group in self._active_groups:
                self._active_groups[parent_group].discard(parent_stream_name)
                if not self._active_groups[parent_group]:
                    del self._active_groups[parent_group]
            self._logger.info(
                f"Parent stream '{parent_stream_name}' (group '{parent_group}') deactivated after "
                f"partition generation completed for child '{stream_name}'. "
                f"Blocked streams in the queue will be retried on next start_next_partition_generator call."
            )
```
on_partition_generation_completed() deactivates all parent streams by removing them from _active_stream_names / _active_groups unconditionally. This can prematurely clear a parent that is still active for its own sync (or is still needed by another child currently generating partitions), which would let blocked streams start concurrently and violate the blocking guarantees. Consider tracking parent-activation with a refcount (e.g., “active as parent of N children”) or only deactivating a parent when it is not generating partitions and has no running partitions of its own, and when no other child still depends on it.
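The refcount alternative suggested here could look like the following sketch; the class name and method signatures are hypothetical, not part of the PR:

```python
from collections import defaultdict
from typing import DefaultDict

class ParentActivationTracker:
    """Refcounted parent activation: a parent stays active until every
    child that needed it for partition generation has finished, instead of
    being deactivated by the first child to complete."""

    def __init__(self) -> None:
        self._refcounts: DefaultDict[str, int] = defaultdict(int)

    def activate_for_child(self, parent: str) -> None:
        # Called when a child starts generating partitions from this parent.
        self._refcounts[parent] += 1

    def release_for_child(self, parent: str) -> bool:
        """Decrement; return True only when the parent is fully released."""
        self._refcounts[parent] -= 1
        if self._refcounts[parent] <= 0:
            del self._refcounts[parent]
            return True
        return False
```

Only a `True` return would trigger removal from `_active_stream_names` / `_active_groups`, preserving the blocking guarantee while other children still depend on the parent.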
```yaml
description: >
  List of references to streams that belong to this group.
type: array
items:
  anyOf:
```
StreamGroup.streams is described as “references to streams”, but the schema currently restricts items to #/definitions/DeclarativeStream objects. This doesn’t match the intended interface shown in the PR description (string refs like "#/definitions/my_stream") and will force users to duplicate full stream definitions inside stream_groups. Update the schema to accept reference strings (or $ref-style objects) and document the expected format consistently.
Suggested change:

```yaml
description: >
  List of references to streams that belong to this group.
  Each entry may be a reference string (e.g. "#/definitions/my_stream"),
  a $ref-style object (e.g. { "$ref": "#/definitions/my_stream" }),
  or an inline DeclarativeStream definition.
type: array
items:
  anyOf:
    - type: string
    - type: object
      required:
        - "$ref"
      properties:
        "$ref":
          type: string
      additionalProperties: false
```
```python
streams: List[str] = Field(
    ...,
    description='List of references to streams that belong to this group. Use JSON references to stream definitions (e.g., "#/definitions/my_stream").',
)
```
The generated Pydantic model defines StreamGroup.streams as List[str] (JSON reference strings), but the JSON schema in declarative_component_schema.yaml currently defines streams items as DeclarativeStream objects. This schema/model mismatch will cause validation/parsing failures depending on which path consumes the manifest. Align the model and schema (preferably by making the schema accept the same reference-string format used by the model and docs).
Suggested change:

```python
streams: List[Union[str, "DeclarativeStream"]] = Field(
    ...,
    description='List of streams that belong to this group. Each item can be either a JSON reference to a stream definition (e.g., "#/definitions/my_stream") or an inlined DeclarativeStream object.',
)
```
```python
# Build stream_name -> group_name mapping from the resolved manifest
stream_name_to_group: Dict[str, str] = {}
for group_name, group_config in stream_groups.items():
    for stream_ref in group_config.get("streams", []):
        if isinstance(stream_ref, dict):
            stream_name = stream_ref.get("name", "")
            if stream_name:
                stream_name_to_group[stream_name] = group_name

# Validate no stream shares a group with any of its ancestor streams
stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams}
```
_apply_stream_groups() only extracts stream names when stream_groups[*].streams entries are dicts with a name field. If stream_groups is meant to support JSON-reference strings (as documented in the generated model and PR description), those entries will currently be ignored and no groups will be applied. Expand parsing to handle the reference format you intend to support (e.g., strings like #/definitions/<stream_key> or {"$ref": ...}) and map them to created stream instances reliably.
Suggested change:

```python
# Map existing stream instance names for reliable resolution of references
stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams}

def _resolve_stream_ref_to_name(stream_ref: Any) -> Optional[str]:
    """
    Normalize a stream reference entry from stream_groups[*].streams into a stream name.
    Supported formats:
      - {"name": "<stream_name>"}
      - {"$ref": "#/definitions/<stream_name_or_key>"}
      - "#/definitions/<stream_name_or_key>"
    """
    # Direct dict with explicit name (current behavior)
    if isinstance(stream_ref, Mapping):
        name_val = stream_ref.get("name")
        if isinstance(name_val, str) and name_val:
            return name_val
        ref_val = stream_ref.get("$ref")
        if isinstance(ref_val, str):
            ref_str = ref_val
        else:
            return None
    elif isinstance(stream_ref, str):
        ref_str = stream_ref
    else:
        return None

    # At this point, ref_str is a reference string. For simple JSON-pointer-like
    # references such as "#/definitions/<stream_key>", use the last path segment.
    ref_str = ref_str.strip()
    if not ref_str:
        return None
    # Ignore fragment prefix "#/" if present, then take the last segment
    if ref_str.startswith("#/"):
        path = ref_str[2:]
    elif ref_str.startswith("#"):
        path = ref_str[1:]
    else:
        path = ref_str
    candidate = path.split("/")[-1] if path else ""
    if not candidate:
        return None
    # Only return names that correspond to an actual created stream
    return candidate if candidate in stream_name_to_instance else None

# Build stream_name -> group_name mapping from the resolved manifest
stream_name_to_group: Dict[str, str] = {}
for group_name, group_config in stream_groups.items():
    for stream_ref in group_config.get("streams", []):
        stream_name = _resolve_stream_ref_to_name(stream_ref)
        if stream_name:
            stream_name_to_group[stream_name] = group_name

# Validate no stream shares a group with any of its ancestor streams
```
The pytest.raises(match=...) regex matches against the exception's string representation, which uses the 'message' field, not 'internal_message'. Updated the two is_done safety check tests to match against the correct message text. Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
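The underlying behavior can be demonstrated without pytest: `pytest.raises(match=...)` applies `re.search` to `str(excinfo.value)`, so only text that reaches `__str__` is matchable. A toy stand-in exception illustrates this (the real `AirbyteTracedException.__str__` may include more fields than assumed here):

```python
import re

class TracedError(Exception):
    """Toy stand-in: only `message` feeds __str__; `internal_message` does not."""
    def __init__(self, message: str, internal_message: str) -> None:
        super().__init__(message)
        self.message = message
        self.internal_message = internal_message

    def __str__(self) -> str:
        return self.message

err = TracedError(
    message="Streams are stuck in the partition generation queue",
    internal_message="queue contained: ['users']",
)
# match= semantics: the regex is searched in str(exc), so patterns must
# target `message`, not `internal_message`.
assert re.search("stuck in the partition", str(err))
assert re.search("queue contained", str(err)) is None
```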
No actionable comments were generated in the recent review.
📝 Walkthrough

Adds manifest-driven stream grouping and blocking semantics to concurrent reads: streams can declare a blocking group; the concurrent read processor tracks active streams/groups and defers starting generators when groups or parent streams are active. Schema, models, stream interfaces, processor logic, and tests are updated accordingly.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Processor as ConcurrentReadProcessor
    participant Manager as ActiveGroupManager
    participant StreamA as Stream A<br/>(group: "g")
    participant StreamB as Stream B<br/>(group: "g")
    participant Router as PartitionRouter
    rect rgba(100,150,200,0.5)
        Note over Processor,Router: Stream A attempts to start
        Processor->>Manager: is group "g" active for Stream A?
        Manager-->>Processor: no
        Processor->>Manager: mark Stream A active
        Manager->>Manager: add Stream A to group "g"
        Processor->>StreamA: start partition generator
    end
    rect rgba(150,100,200,0.5)
        Note over Processor,Router: Stream B attempts to start while A active
        Processor->>Manager: is group "g" active for Stream B?
        Manager-->>Processor: yes (Stream A)
        Processor-->>Processor: defer Stream B
    end
    rect rgba(200,150,100,0.5)
        Note over Processor,Router: Stream A completes partition generation
        Processor->>Manager: deactivate Stream A
        Manager->>Manager: remove Stream A from group "g"
        Processor->>Manager: check deferred queue -> Stream B available
        Manager-->>Processor: Stream B unblocked
        Processor->>StreamB: start partition generator
        Processor->>Manager: mark Stream B active
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks: ✅ 3 passed
Actionable comments posted: 4
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

4148-4159: Minor: consider simplifying the anyOf with a single item.

The anyOf wrapper around a single $ref at lines 4154-4155 is technically valid but unnecessary. Would simplifying it to a direct reference be cleaner, wdyt?

```diff
 streams:
   title: Streams
   description: >
     List of references to streams that belong to this group.
   type: array
   items:
-    anyOf:
-      - "$ref": "#/definitions/DeclarativeStream"
+    "$ref": "#/definitions/DeclarativeStream"
```

That said, if this pattern is intentional for future extensibility (e.g., planning to add more stream types like StateDelegatingStream), keeping anyOf makes sense.
StateDelegatingStream), keepinganyOfmakes sense.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml` around lines 4148 - 4159, The streams.items currently uses an unnecessary anyOf wrapper around a single "$ref" (`#/definitions/DeclarativeStream`); replace the anyOf block with a direct reference by changing streams.items to use items: "$ref": "#/definitions/DeclarativeStream" (or items: { "$ref": "#/definitions/DeclarativeStream" } depending on style) to simplify the schema, unless you intentionally want the anyOf for planned future stream types—in that case leave a clarifying comment above streams noting the extensibility intent.unit_tests/sources/declarative/test_concurrent_declarative_source.py (2)
5481-5519: Minor inconsistency in the type-checking approach, wdyt?

The parametrized test uses type(None) as one expected_type value but strings like "SubstreamPartitionRouter" for others, which makes the assertion logic (lines 5515-5519) a bit mixed.

For consistency, you could use actual type objects throughout and simplify the assertions:

♻️ Optional cleanup

```diff
 @pytest.mark.parametrize(
     "stream_factory,expected_type",
     [
         pytest.param(
             lambda: _make_default_stream("plain_stream"),
-            type(None),
+            None,
             id="no_partition_router_returns_none",
         ),
         pytest.param(
             lambda: _make_child_stream_with_parent("child", _make_default_stream("parent")),
-            "SubstreamPartitionRouter",
+            SubstreamPartitionRouter,
             id="substream_returns_substream_router",
         ),
         pytest.param(
             lambda: _make_child_stream_with_grouping_router(
                 "child", _make_default_stream("parent")
             ),
-            "GroupingPartitionRouter",
+            GroupingPartitionRouter,
             id="grouping_returns_grouping_router",
         ),
     ],
 )
 def test_get_partition_router(stream_factory, expected_type):
     """Test DefaultStream.get_partition_router returns the correct router type."""
-    from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
-        GroupingPartitionRouter,
-    )
-    from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
-        SubstreamPartitionRouter,
-    )
-
     stream = stream_factory()
     router = stream.get_partition_router()
-    if expected_type is type(None):
+    if expected_type is None:
         assert router is None
-    elif expected_type == "SubstreamPartitionRouter":
-        assert isinstance(router, SubstreamPartitionRouter)
-    elif expected_type == "GroupingPartitionRouter":
-        assert isinstance(router, GroupingPartitionRouter)
+    else:
+        assert isinstance(router, expected_type)
```

(Would need to import the types at module level or in the parametrize decorator scope.)

This is purely a readability nit; the current implementation works fine!
5174-5240: Consider extracting common setup logic to reduce duplication, wdyt?

The helper functions _make_child_stream_with_parent and _make_child_stream_with_grouping_router share substantial code for setting up ConcurrentCursorFactory, InMemoryMessageRepository, EpochValueConcurrentStreamStateConverter, ConcurrentPerPartitionCursor, and the final DefaultStream creation.

You could extract a common helper that takes the partition_router as a parameter. Something like:

♻️ Optional refactor

```diff
+def _make_child_stream_from_router(child_name: str, partition_router) -> DefaultStream:
+    """Common logic for creating a child stream with a given partition router."""
+    from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import (
+        ConcurrentCursorFactory,
+        ConcurrentPerPartitionCursor,
+    )
+    from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
+        DeclarativePartitionFactory,
+        StreamSlicerPartitionGenerator,
+    )
+    from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor
+    from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
+        EpochValueConcurrentStreamStateConverter,
+    )
+
+    cursor_factory = ConcurrentCursorFactory(lambda *args, **kwargs: Mock())
+    message_repository = InMemoryMessageRepository()
+    state_converter = EpochValueConcurrentStreamStateConverter()
+
+    per_partition_cursor = ConcurrentPerPartitionCursor(
+        cursor_factory=cursor_factory,
+        partition_router=partition_router,
+        stream_name=child_name,
+        stream_namespace=None,
+        stream_state={},
+        message_repository=message_repository,
+        connector_state_manager=Mock(),
+        connector_state_converter=state_converter,
+        cursor_field=Mock(cursor_field_key="updated_at"),
+    )
+
+    partition_factory = Mock(spec=DeclarativePartitionFactory)
+    partition_generator = StreamSlicerPartitionGenerator(
+        partition_factory=partition_factory,
+        stream_slicer=per_partition_cursor,
+    )
+
+    cursor = FinalStateCursor(
+        stream_name=child_name, stream_namespace=None, message_repository=message_repository
+    )
+    return DefaultStream(
+        partition_generator=partition_generator,
+        name=child_name,
+        json_schema={},
+        primary_key=[],
+        cursor_field=None,
+        logger=logging.getLogger(f"test.{child_name}"),
+        cursor=cursor,
+    )
```

Then both helpers can create their specific router and call this shared function.

That said, this is test code where some duplication is often acceptable for clarity; feel free to keep as-is if you prefer the explicit setup!

Also applies to: 5356-5433
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@airbyte_cdk/sources/declarative/concurrent_declarative_source.py`:
- Around line 446-453: The current build of stream_name_to_group from
stream_groups silently allows duplicate stream entries (last group wins) and
later processing silently skips unknown/unsupported targets; change this to
fail-fast: when iterating stream_groups in the block that builds
stream_name_to_group, detect if a stream name already exists and raise a clear
manifest/config error (include the duplicate stream name and both group names)
instead of overwriting; additionally, in the downstream target-processing logic
(the code that checks targets and currently skips unknown/unsupported targets),
replace silent skips with validation that raises a manifest/config error when a
target refers to a stream not present in stream_name_to_group or when a target
type is unsupported, so typos and double assignments surface immediately (use
the symbols stream_groups, stream_name_to_group and the target-processing
function/method where unknown targets are handled to locate the changes).
In `@airbyte_cdk/sources/streams/concurrent/default_stream.py`:
- Around line 115-122: get_partition_router currently returns None when the
slicer is wrapped (e.g., by StreamSlicerPartitionGenerator/test-read decorators)
because it only checks isinstance(stream_slicer, ConcurrentPerPartitionCursor);
change it to unwrap decorator layers: after obtaining stream_slicer from
self._stream_partition_generator._stream_slicer, loop while the object exposes
an inner slicer (e.g., has attribute _stream_slicer or _wrapped) and set
stream_slicer = stream_slicer._stream_slicer (or stream_slicer._wrapped) until
you reach the underlying instance, then check isinstance(...,
ConcurrentPerPartitionCursor) and return its _partition_router; this preserves
existing behavior for unwrapped slicers and recovers the partition router when
decorators are present.
In `@airbyte_cdk/sources/streams/http/http_client.py`:
- Around line 88-89: The new module-level defaults _DEFAULT_CONNECT_TIMEOUT and
_DEFAULT_READ_TIMEOUT are being applied universally and can unexpectedly cap
declarative and HttpStream flows; update the code to allow callers to opt out or
override: add an optional timeout parameter (or allow timeout=None) to
HttpRequester.send_request and thread it through to where requests are made (and
to HttpStream._fetch_next_page usage), ensure the declarative requester
(requesters/http_requester.py) honors a passed-in timeout and does not
substitute the (30,300) default when None is provided, and add
documentation/comments and tests to cover both explicit override and opt-out
behavior so long-running/long-poll endpoints can bypass the hard 5-minute read
cap.
In `@unit_tests/sources/declarative/parsers/test_model_to_component_factory.py`:
- Around line 5284-5338: Update the
test_block_simultaneous_read_from_stream_groups to include a top-level
stream_groups entry in the YAML fixture so the factory is exercised with group
data present; keep the rest of the fixture the same and still assert that
factory.create_component (ModelToComponentFactory.create_component invoked with
DeclarativeStreamModel) returns a DefaultStream with name "parent" and
block_simultaneous_read == "" — this ensures the factory does not consume
top-level stream_groups while still verifying the default behavior.
---
Nitpick comments:
In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml`:
- Around line 4148-4159: The streams.items currently uses an unnecessary anyOf
wrapper around a single "$ref" (`#/definitions/DeclarativeStream`); replace the
anyOf block with a direct reference by changing streams.items to use items:
"$ref": "#/definitions/DeclarativeStream" (or items: { "$ref":
"#/definitions/DeclarativeStream" } depending on style) to simplify the schema,
unless you intentionally want the anyOf for planned future stream types—in that
case leave a clarifying comment above streams noting the extensibility intent.
In `@unit_tests/sources/declarative/test_concurrent_declarative_source.py`:
- Around line 5481-5519: The test_get_partition_router parametrization mixes
type(None) with string names for router types; change the param values to use
the actual classes instead of strings (use SubstreamPartitionRouter and
GroupingPartitionRouter in the pytest.param expected_type entries) and import
those classes at the top of the test so the assertions can uniformly use
isinstance(router, expected_type) (keep the type(None) case as is but check with
"is None" when expected_type is type(None) or alternatively set expected_type to
type(None) and handle uniformly).
- Around line 5174-5240: Extract a shared helper (e.g.,
_build_child_stream(partition_router, child_name)) that encapsulates the
duplicated setup: create ConcurrentCursorFactory, InMemoryMessageRepository,
EpochValueConcurrentStreamStateConverter, the ConcurrentPerPartitionCursor
(using the passed partition_router), the StreamSlicerPartitionGenerator (with a
Mock DeclarativePartitionFactory), the FinalStateCursor, and return the
DefaultStream; then change _make_child_stream_with_parent and
_make_child_stream_with_grouping_router to only construct their specific
SubstreamPartitionRouter/GroupingPartitionRouter and call this new helper
(references: _make_child_stream_with_parent,
_make_child_stream_with_grouping_router, ConcurrentCursorFactory,
InMemoryMessageRepository, EpochValueConcurrentStreamStateConverter,
ConcurrentPerPartitionCursor, StreamSlicerPartitionGenerator,
DeclarativePartitionFactory, FinalStateCursor, DefaultStream).
📒 Files selected for processing (12)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
airbyte_cdk/sources/streams/concurrent/abstract_stream.py
airbyte_cdk/sources/streams/concurrent/adapters.py
airbyte_cdk/sources/streams/concurrent/default_stream.py
airbyte_cdk/sources/streams/http/http_client.py
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
unit_tests/sources/declarative/test_concurrent_declarative_source.py
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
unit_tests/sources/streams/http/test_http_client.py
# Build stream_name -> group_name mapping from the resolved manifest
stream_name_to_group: Dict[str, str] = {}
for group_name, group_config in stream_groups.items():
    for stream_ref in group_config.get("streams", []):
        if isinstance(stream_ref, dict):
            stream_name = stream_ref.get("name", "")
            if stream_name:
                stream_name_to_group[stream_name] = group_name
Could we fail fast on invalid stream_groups membership, wdyt?
Lines 446-453 let the last group win for duplicate entries, and Lines 475-490 silently skip unknown or unsupported targets. That makes a typo or double-assignment look like a runtime blocking bug instead of a clear manifest error.
🛠️ Possible guardrails
 stream_name_to_group: Dict[str, str] = {}
 for group_name, group_config in stream_groups.items():
     for stream_ref in group_config.get("streams", []):
         if isinstance(stream_ref, dict):
             stream_name = stream_ref.get("name", "")
             if stream_name:
+                previous_group = stream_name_to_group.get(stream_name)
+                if previous_group and previous_group != group_name:
+                    raise AirbyteTracedException(
+                        message=f"Stream '{stream_name}' is assigned to multiple stream groups.",
+                        internal_message=(
+                            f"Stream '{stream_name}' appears in both "
+                            f"'{previous_group}' and '{group_name}'."
+                        ),
+                        failure_type=FailureType.config_error,
+                    )
                 stream_name_to_group[stream_name] = group_name

 # Validate no stream shares a group with any of its ancestor streams
 stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams}
+invalid_targets = [
+    name
+    for name in stream_name_to_group
+    if name not in stream_name_to_instance
+    or not isinstance(stream_name_to_instance[name], DefaultStream)
+]
+if invalid_targets:
+    raise AirbyteTracedException(
+        message="stream_groups references an unknown or unsupported stream.",
+        internal_message=(
+            "stream_groups entries must target created DefaultStream instances. "
+            f"Got: {sorted(invalid_targets)}"
+        ),
+        failure_type=FailureType.config_error,
+    )

Also applies to: 475-490
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@airbyte_cdk/sources/declarative/concurrent_declarative_source.py` around
lines 446 - 453, The current build of stream_name_to_group from stream_groups
silently allows duplicate stream entries (last group wins) and later processing
silently skips unknown/unsupported targets; change this to fail-fast: when
iterating stream_groups in the block that builds stream_name_to_group, detect if
a stream name already exists and raise a clear manifest/config error (include
the duplicate stream name and both group names) instead of overwriting;
additionally, in the downstream target-processing logic (the code that checks
targets and currently skips unknown/unsupported targets), replace silent skips
with validation that raises a manifest/config error when a target refers to a
stream not present in stream_name_to_group or when a target type is unsupported,
so typos and double assignments surface immediately (use the symbols
stream_groups, stream_name_to_group and the target-processing function/method
where unknown targets are handled to locate the changes).
def get_partition_router(self) -> PartitionRouter | None:
    """Return the partition router for this stream, or None if not available."""
    if not isinstance(self._stream_partition_generator, StreamSlicerPartitionGenerator):
        return None
    stream_slicer = self._stream_partition_generator._stream_slicer
    if not isinstance(stream_slicer, ConcurrentPerPartitionCursor):
        return None
    return stream_slicer._partition_router
Could get_partition_router() unwrap test-read decorators too, wdyt?
StreamSlicerPartitionGenerator wraps the slicer when slice_limit is enabled, so the isinstance(..., ConcurrentPerPartitionCursor) check at Line 120 returns False for those streams even though the underlying cursor still has a partition router. Any caller using this helper for parent discovery or deadlock validation will silently get None in that mode.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@airbyte_cdk/sources/streams/concurrent/default_stream.py` around lines 115 -
122, get_partition_router currently returns None when the slicer is wrapped
(e.g., by StreamSlicerPartitionGenerator/test-read decorators) because it only
checks isinstance(stream_slicer, ConcurrentPerPartitionCursor); change it to
unwrap decorator layers: after obtaining stream_slicer from
self._stream_partition_generator._stream_slicer, loop while the object exposes
an inner slicer (e.g., has attribute _stream_slicer or _wrapped) and set
stream_slicer = stream_slicer._stream_slicer (or stream_slicer._wrapped) until
you reach the underlying instance, then check isinstance(...,
ConcurrentPerPartitionCursor) and return its _partition_router; this preserves
existing behavior for unwrapped slicers and recovers the partition router when
decorators are present.
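A small sketch of the unwrapping idea, using stand-in classes for ConcurrentPerPartitionCursor and the slice-limit wrapper; the _stream_slicer attribute name is taken from the review comment and is an assumption about the real wrappers:

```python
class ConcurrentPerPartitionCursor:
    """Stand-in for the CDK cursor that carries the partition router."""

    def __init__(self, partition_router):
        self._partition_router = partition_router


class SliceLimitDecorator:
    """Stand-in for a test-read wrapper that decorates another slicer."""

    def __init__(self, inner):
        self._stream_slicer = inner


def unwrap_partition_router(stream_slicer):
    # Follow _stream_slicer links through any number of decorator layers
    # until an object without an inner slicer is reached.
    while hasattr(stream_slicer, "_stream_slicer"):
        stream_slicer = stream_slicer._stream_slicer
    if isinstance(stream_slicer, ConcurrentPerPartitionCursor):
        return stream_slicer._partition_router
    return None  # unchanged behavior for non-cursor slicers
```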
_DEFAULT_CONNECT_TIMEOUT: int = 30
_DEFAULT_READ_TIMEOUT: int = 300
Could we keep an override path for these new default timeouts before landing this, wdyt?
This now applies (30, 300) to every caller that omits timeout. That includes the common HttpStream._fetch_next_page() path in airbyte_cdk/sources/streams/http/http.py:505-540 and the declarative HttpRequester.send_request() path in airbyte_cdk/sources/declarative/requesters/http_requester.py:446-490, which currently only passes {"stream": ...}. For declarative sources, that means a new hard 5-minute read cap with no manifest-level escape hatch, so long-running exports/long-poll endpoints would start failing with no connector-side workaround. Could we thread an optional timeout through the declarative requester, or add an explicit opt-out in the same change, wdyt?
Also applies to: 593-601
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@airbyte_cdk/sources/streams/http/http_client.py` around lines 88 - 89, The
new module-level defaults _DEFAULT_CONNECT_TIMEOUT and _DEFAULT_READ_TIMEOUT are
being applied universally and can unexpectedly cap declarative and HttpStream
flows; update the code to allow callers to opt out or override: add an optional
timeout parameter (or allow timeout=None) to HttpRequester.send_request and
thread it through to where requests are made (and to HttpStream._fetch_next_page
usage), ensure the declarative requester (requesters/http_requester.py) honors a
passed-in timeout and does not substitute the (30,300) default when None is
provided, and add documentation/comments and tests to cover both explicit
override and opt-out behavior so long-running/long-poll endpoints can bypass the
hard 5-minute read cap.
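One hedged sketch of what an opt-out could look like: inject the defaults only when no timeout key is present at all, so an explicit timeout=None (or any caller-provided value) wins. This is illustrative, not the merged HttpClient behavior:

```python
_DEFAULT_CONNECT_TIMEOUT = 30
_DEFAULT_READ_TIMEOUT = 300


def resolve_request_kwargs(request_kwargs: dict) -> dict:
    """Return kwargs for session.send(), applying the default timeout tuple
    only when the caller said nothing about timeouts.

    Hypothetical helper: an explicit timeout key -- even timeout=None,
    meaning "no timeout" -- is honored as-is, so long-running exports and
    long-poll endpoints keep an escape hatch.
    """
    if "timeout" in request_kwargs:
        return request_kwargs
    return {
        **request_kwargs,
        "timeout": (_DEFAULT_CONNECT_TIMEOUT, _DEFAULT_READ_TIMEOUT),
    }
```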
def test_block_simultaneous_read_from_stream_groups():
    """Test that factory-created streams default to empty block_simultaneous_read.

    The factory no longer handles stream_groups — that's done by
    ConcurrentDeclarativeSource._apply_stream_groups after stream creation.
    This test verifies the factory creates streams without group info.
    """
    content = """
definitions:
  parent_stream:
    type: DeclarativeStream
    name: "parent"
    primary_key: "id"
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: "https://api.example.com"
        path: "/parent"
        http_method: "GET"
        authenticator:
          type: BearerAuthenticator
          api_token: "{{ config['api_key'] }}"
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: []
    schema_loader:
      type: InlineSchemaLoader
      schema:
        type: object
        properties:
          id:
            type: string
"""

    config = {"api_key": "test_key"}

    parsed_manifest = YamlDeclarativeSource._parse(content)
    resolved_manifest = resolver.preprocess_manifest(parsed_manifest)

    factory = ModelToComponentFactory()

    parent_manifest = transformer.propagate_types_and_parameters(
        "", resolved_manifest["definitions"]["parent_stream"], {}
    )
    parent_stream: DefaultStream = factory.create_component(
        model_type=DeclarativeStreamModel, component_definition=parent_manifest, config=config
    )

    assert isinstance(parent_stream, DefaultStream)
    assert parent_stream.name == "parent"
    assert parent_stream.block_simultaneous_read == ""
Could this test include an actual stream_groups entry, wdyt?
Right now the fixture at Line 5291 never defines stream_groups, so this only proves the default is "". If the factory accidentally starts consuming top-level groups again, this test would still pass.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@unit_tests/sources/declarative/parsers/test_model_to_component_factory.py`
around lines 5284 - 5338, Update the
test_block_simultaneous_read_from_stream_groups to include a top-level
stream_groups entry in the YAML fixture so the factory is exercised with group
data present; keep the rest of the fixture the same and still assert that
factory.create_component (ModelToComponentFactory.create_component invoked with
DeclarativeStreamModel) returns a DefaultStream with name "parent" and
block_simultaneous_read == "" — this ensures the factory does not consume
top-level stream_groups while still verifying the default behavior.
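For illustration, a hypothetical stream_groups entry (shown as the already-parsed dict rather than manifest YAML, to keep the sketch dependency-free) together with the mapping derivation the source performs; the StreamGroup and BlockSimultaneousSyncsAction names follow the PR description, but the exact field shape is an assumption:

```python
# Hypothetical parsed form of a top-level stream_groups manifest entry.
stream_groups = {
    "example_group": {
        "type": "StreamGroup",
        "streams": [{"name": "parent"}],
        "action": {"type": "BlockSimultaneousSyncsAction"},
    },
}


def build_stream_name_to_group(stream_groups: dict) -> dict:
    """Mirror the source's stream_name -> group_name derivation."""
    mapping = {}
    for group_name, group_config in stream_groups.items():
        for stream_ref in group_config.get("streams", []):
            if isinstance(stream_ref, dict) and stream_ref.get("name"):
                mapping[stream_ref["name"]] = group_name
    return mapping
```

With a fixture like this present, the test can assert both that the mapping resolves and that the factory still leaves block_simultaneous_read empty.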
feat: Add block_simultaneous_read with stream_groups + default HTTP timeout
Summary
This PR combines two changes into a single branch against main (replaces closed PRs #870, #938, #939):

1. block_simultaneous_read with top-level stream_groups (originally PR #870 by Anatolii Yatsuk (@tolik0))

Adds a mechanism to prevent concurrent reads of streams that share an API endpoint or have parent-child dependencies. Key changes:
- A top-level stream_groups config that maps streams to named groups
- Blocking enforced in ConcurrentReadProcessor.start_next_partition_generator() — streams are moved to the end of the queue if their group or a parent's group is already active
- _active_stream_names and _active_groups dicts that track what is running
- Checks in is_done() that raise AirbyteTracedException if the partition queue or active groups are non-empty when all streams report done
- A _collect_all_parent_stream_names() helper that unwraps GroupingPartitionRouter

2. Default HTTP request timeout
Adds _DEFAULT_CONNECT_TIMEOUT = 30 and _DEFAULT_READ_TIMEOUT = 300 to HttpClient. When no explicit timeout is in request_kwargs, the default (30, 300) tuple is injected before calling session.send(). This prevents indefinite hangs when a server stalls mid-response (root cause of a stuck Intercom sync at 2.21M records). ConnectTimeout/ReadTimeout are already in TRANSIENT_EXCEPTIONS, so timeouts trigger automatic retries.

Review & Testing Checklist for Human
- Verify start_next_partition_generator() cannot deadlock: The max_attempts bound prevents infinite loops within a single call, but verify that across multiple calls (triggered by on_partition_generation_completed and on_partition_complete_sentinel), deferred streams will eventually be retried and started. Pay special attention to the parent deactivation logic — if a parent is deactivated too early or too late, children could be permanently blocked or started too soon.
- Verify _collect_all_parent_stream_names() handles all partition router types: The method checks for DefaultStream with get_partition_router(), then handles SubstreamPartitionRouter and GroupingPartitionRouter wrapping a SubstreamPartitionRouter. If a new router type is introduced, parent discovery would silently return empty — confirm this is acceptable.
- Review the StreamGroup and BlockSimultaneousSyncsAction definitions.
- Confirm callers can still override timeout in request_kwargs if needed.
- The is_done() method raises AirbyteTracedException if streams remain in the partition queue or active groups are non-empty. Verify these checks correctly detect stuck-sync scenarios without false positives.

Recommended Test Plan
- A sync with stream_groups to verify children wait for parents to complete partition generation before starting
- Unit tests (poetry run poe pytest)
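As a rough sketch of the deferral behavior summarized above (simplified; the real logic, including the parent-group checks, lives in ConcurrentReadProcessor):

```python
from collections import deque


def start_next_partition_generator(queue, active_groups, stream_to_group):
    """Pop the next stream whose group is not already active; blocked
    streams are deferred to the back of the queue.

    Illustrative stand-in for the CDK method, not its actual signature.
    """
    for _ in range(len(queue)):  # bounded, like max_attempts in the PR
        stream = queue.popleft()
        group = stream_to_group.get(stream)
        if group is None or group not in active_groups:
            if group is not None:
                active_groups.add(group)
            return stream
        queue.append(stream)  # group busy: defer and try the next stream
    return None  # everything remaining is blocked for now
```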
Notes
- The block_simultaneous_read feature was originally authored by Anatolii Yatsuk (@tolik0) (PR #870, "feat: Add block_simultaneous_read with top-level stream_groups interface") and rebased here per user request to combine with the timeout fix

Summary by CodeRabbit
New Features
Tests