feat: Add block_simultaneous_read with top-level stream_groups interface#870
feat: Add block_simultaneous_read with top-level stream_groups interface#870Anatolii Yatsuk (tolik0) wants to merge 28 commits intomainfrom
Conversation
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. Testing This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@tolik0/concurrent-source/add-block_simultaneous_read#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch tolik0/concurrent-source/add-block_simultaneous_readHelpful ResourcesPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
|
/autofix
|
|
/prerelease
|
|
/prerelease
|
|
/prerelease
|
There was a problem hiding this comment.
Pull request overview
This PR introduces a block_simultaneous_read feature to the Python CDK that prevents concurrent execution of streams sharing the same resource (API endpoint, session, or rate limit pool). The feature uses string-based group identifiers where streams with matching non-empty group names will not run concurrently, addressing issues like duplicate API calls when streams function as both standalone and parent streams.
Changes:
- Added
block_simultaneous_readproperty to stream interfaces and schema definitions with empty string as default (backward compatible) - Implemented blocking logic in
ConcurrentReadProcessorthat defers streams when their group or parent's group is active - Added comprehensive test coverage for various blocking scenarios including parent-child relationships and multi-level hierarchies
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
test_concurrent_read_processor.py |
Added comprehensive test suite covering all blocking scenarios |
test_model_to_component_factory.py |
Added integration test verifying manifest-to-stream property flow |
default_stream.py |
Added block_simultaneous_read property to DefaultStream |
adapters.py |
Added property adapter for legacy stream compatibility |
abstract_stream.py |
Added abstract property definition with documentation |
model_to_component_factory.py |
Integrated property from manifest to stream construction |
declarative_component_schema.py |
Generated schema with new property definition |
declarative_component_schema.yaml |
Added schema definition with comprehensive documentation |
concurrent_read_processor.py |
Implemented core blocking logic with group tracking and deferral |
Comments suppressed due to low confidence (1)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml:1
- The description change for
use_cacheappears unrelated to theblock_simultaneous_readfeature. This change should be separated into its own PR or have an explanation for why it's included in this feature PR.
"$schema": http://json-schema.org/draft-07/schema#
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
Outdated
Show resolved
Hide resolved
Co-Authored-By: unknown <>
- Factory now owns the stream_groups resolution via set_stream_groups(manifest) - ConcurrentDeclarativeSource just calls factory.set_stream_groups(manifest) - Removed _build_stream_name_to_group from ConcurrentDeclarativeSource - Updated tests to use factory's _build_stream_name_to_group directly Co-Authored-By: unknown <>
…of factory - Removed _build_stream_name_to_group, set_stream_groups, _stream_name_to_group from factory - Factory no longer knows about stream_groups at all - Added _apply_stream_groups to ConcurrentDeclarativeSource: creates streams first, then sets block_simultaneous_read on matching DefaultStream instances - Added block_simultaneous_read setter on DefaultStream - Replaced mock-based tests with parametrized tests using real DefaultStream instances Co-Authored-By: unknown <>
|
Brian Lai (@brianjlai) Anatolii Yatsuk (@tolik0) I'm not sure who is working on this. I see commits being made with no authors. However, flagging that this is now being swarmed here. I can try to test these changes if they're ready. Please let me know. |
Replace hasattr chain in ConcurrentReadProcessor._collect_all_parent_stream_names with DefaultStream.get_partition_router() that safely traverses the internal partition_generator -> stream_slicer -> partition_router chain using isinstance checks. Co-Authored-By: unknown <>
_apply_stream_groups now checks that no stream shares a group with any of its parent streams (via get_partition_router). Raises ValueError at config time if a deadlock-causing configuration is detected. Co-Authored-By: unknown <>
… done Adds a safety check in is_done() that raises AirbyteTracedException (system_error) if streams remain in the partition generation queue after all streams are marked done. Also moves inline imports to module level and updates test mocks to use DefaultStream with get_partition_router(). Co-Authored-By: unknown <>
…d concurrent_declarative_source.py Co-Authored-By: unknown <>
…ct parent streams Co-Authored-By: unknown <>
…artition_router() Co-Authored-By: unknown <>
Co-Authored-By: unknown <>
…ps check, and get_partition_router Co-Authored-By: unknown <>
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
Dismissed
Show dismissed
Hide dismissed
…rents Co-Authored-By: unknown <>
Brian Lai (brianjlai)
left a comment
There was a problem hiding this comment.
I mentioned a few things and one potential interesting change to how we present the StreamGroup component within the ConcurrentSource component. Not blocking and ultimately not something that must be done, but a thought I had.
From a functional side I think we're all set. But again, I think we need this to be a dev CDK version first so we can roll it out to Intercom safely w/o having this go out to all connectors during the automatic version bump that goes out.
| description: A description of the connector. It will be presented on the Source documentation page. | ||
| additionalProperties: false | ||
| definitions: | ||
| StreamGroup: |
There was a problem hiding this comment.
nit: can we move this so it's alphabetized instead of just at the top of definitions
There was a problem hiding this comment.
Fixed
| "$ref": "#/definitions/ConcurrencyLevel" | ||
| api_budget: | ||
| "$ref": "#/definitions/HTTPAPIBudget" | ||
| stream_groups: |
There was a problem hiding this comment.
an interesting idea I had was instead of needing to define a separate stream_groups field on the DeclarativeSource component, what if we were to change it so that the streams list can take in an additional type in the anyOf for "$ref": "#/definitions/StreamGroup". And so therefore StreamGroup components can just be defined in there. And since right now stream_groups is a map, we'd have to adjust the StreamGroup component to also add a name field since we won't have a key if we remove this stream_groups mapping
It does make how we implement _apply_stream_groups a little more complicated because we need to check the isinstance for StreamGroup, but it also feels like a natural place to place the StreamGroup since a StreamGroup is just a list of grouped streams. But this feels like it might be a little bit of a cleaner interface if we can make it work.
Anatolii Yatsuk (@tolik0) what do you think about this design?
There was a problem hiding this comment.
The issue with this approach is that it limits flexibility if we add more actions later. We wouldn’t be able to apply two different actions to the same stream. For example, defining two groups with different actions that include overlapping streams.
…hema definitions Co-Authored-By: unknown <>
…er reading Co-Authored-By: unknown <>
|
/prerelease
|
|
Closing this PR per Alfredo Garcia (@agarctfi)'s request. The changes have been rebased into a new PR against |
Summary
Adds a
block_simultaneous_readfeature to prevent multiple streams from running concurrently when they share the same resource. The feature is configured via a top-levelstream_groupsstructure in the manifest (rather than per-stream properties).Interface
stream_groupsis a top-level manifest property (alongsidestreams,definitions,check, etc.)crm_objects), a list of stream$refreferences, and anactionBlockSimultaneousSyncsActionis the only action type for nowImplementation
Schema (
declarative_component_schema.yaml): Removed per-streamblock_simultaneous_readproperty fromDeclarativeStream. Added top-levelstream_groupswithStreamGroupandBlockSimultaneousSyncsActiondefinitions.Pydantic models (
declarative_component_schema.py): AddedBlockSimultaneousSyncsAction,StreamGroupclasses. Addedstream_groups: Optional[Dict[str, StreamGroup]]toDeclarativeSource1/DeclarativeSource2.ConcurrentDeclarativeSource._apply_stream_groups(): Resolves stream groups from actual stream instances after stream creation. Validates that no stream shares a group with any of its parent streams (deadlock prevention). Setsblock_simultaneous_readon matchingDefaultStreaminstances.DefaultStream.get_partition_router(): New helper method that safely traverses thepartition_generator → stream_slicer → partition_routerchain usingisinstancechecks, replacing thehasattrchains inConcurrentReadProcessor.ConcurrentReadProcessor(core blocking logic): Uses group-based deferral/retry with parent-child awareness. Addedis_done()safety check that raisesAirbyteTracedExceptionif streams remain in the partition generation queue after all streams are marked done.Blocking Behavior
First use case: source-intercom — prevents duplicate concurrent requests to the companies endpoint.
Resolves: https://github.com/airbytehq/oncall/issues/8346
Updates since last revision
get_partition_router()helper onDefaultStream. Replaces messyhasattrchains inConcurrentReadProcessor._collect_all_parent_stream_names()with a clean method that usesisinstancechecks to traverseStreamSlicerPartitionGenerator → ConcurrentPerPartitionCursor → partition_router._apply_stream_groups(). RaisesValueErrorat config time if a stream and any of its parent streams are in the same blocking group.is_done()safety check. RaisesAirbyteTracedException(system_error) if_stream_instances_to_start_partition_generationis not empty after all streams are marked done — catches stuck-stream bugs at runtime.ConcurrentPerPartitionCursor,StreamSlicerPartitionGenerator,PartitionRouter,SubstreamPartitionRouter) are now at the top of files per Python coding standards.Review & Testing Checklist for Human
ValueErrorat config time with a clear message.get_partition_router()handles all stream types: Test with bothDefaultStream(withStreamSlicerPartitionGenerator) and legacyDeclarativeStreampaths. Confirm it returnsNonefor streams without partition routers.is_done()check doesn't mask real issues: If the safety check triggers, investigate the root cause (why streams remained in the queue) rather than just fixing the symptom.stream_groupsconfig format to confirm end-to-end blocking behavior and verify no deadlocks occur.ConcurrentPerPartitionCursor,StreamSlicerPartitionGenerator,SubstreamPartitionRouter) should not cause circular dependencies in production connectors.Recommended test plan:
stream_groupsreferencing 2+ streams via$refConcurrentDeclarativeSourceand verify streams in the same group are read sequentiallyNotes
stream_groupscontinue to work unchanged.ConcurrentReadProcessorstill uses string-based group identifiers, so the blocking logic is unchanged.test_read_with_concurrent_and_synchronous_streams_with_concurrent_statefails with SQLite locking issues (pre-existing, not caused by these changes).Summary by CodeRabbit
New Features
get_partition_router()onDefaultStreamfor safe partition router access.is_done()to detect stuck streams in partition generation queue.Bug Fixes
Tests