
feat: Add block_simultaneous_read with top-level stream_groups interface#870

Open
Anatolii Yatsuk (tolik0) wants to merge 28 commits into main from tolik0/concurrent-source/add-block_simultaneous_read

Conversation


@tolik0 Anatolii Yatsuk (tolik0) commented Dec 30, 2025

Summary

Adds a block_simultaneous_read feature to prevent multiple streams from running concurrently when they share the same resource. The feature is configured via a top-level stream_groups structure in the manifest (rather than per-stream properties).

Interface

```yaml
stream_groups:
  crm_objects:
    streams:
      - "#/definitions/deals_property_history_stream"
      - "#/definitions/companies_property_history_stream"
    action: BlockSimultaneousSyncsAction
```

  • stream_groups is a top-level manifest property (alongside streams, definitions, check, etc.)
  • Each group has a name (e.g. crm_objects), a list of stream $ref references, and an action
  • BlockSimultaneousSyncsAction is the only action type for now
  • Streams in the same group will not run concurrently; streams in different groups run freely
  • Important: Child streams that depend on parent streams must not be placed in the same group as their parents — doing so would cause a deadlock (the child needs to read the parent during partition generation). A validation check enforces this at config time.
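The parent-child constraint above can be sketched as a config-time check. This is an illustrative sketch only: `validate_stream_groups` and the dict shapes it takes are hypothetical stand-ins, not the CDK's actual `_apply_stream_groups` signature.

```python
def validate_stream_groups(
    groups: dict[str, list[str]], parents: dict[str, list[str]]
) -> None:
    """Raise ValueError if a stream shares a blocking group with any of its parents.

    groups maps a group name to its member stream names; parents maps a stream
    name to the names of its parent streams (both shapes are assumptions here).
    """
    for group_name, members in groups.items():
        member_set = set(members)
        for stream in members:
            # A child blocked behind its own parent could never finish
            # partition generation, so this configuration is rejected early.
            overlap = member_set.intersection(parents.get(stream, []))
            if overlap:
                raise ValueError(
                    f"Stream '{stream}' and its parent(s) {sorted(overlap)} are both "
                    f"in group '{group_name}'; this would deadlock during partition "
                    f"generation."
                )
```

A valid config (no parent-child overlap within a group) passes silently; placing a child and its parent in the same group raises at config time rather than hanging at runtime.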

Implementation

  1. Schema (declarative_component_schema.yaml): Removed per-stream block_simultaneous_read property from DeclarativeStream. Added top-level stream_groups with StreamGroup and BlockSimultaneousSyncsAction definitions.

  2. Pydantic models (declarative_component_schema.py): Added BlockSimultaneousSyncsAction, StreamGroup classes. Added stream_groups: Optional[Dict[str, StreamGroup]] to DeclarativeSource1/DeclarativeSource2.

  3. ConcurrentDeclarativeSource._apply_stream_groups(): Resolves stream groups from actual stream instances after stream creation. Validates that no stream shares a group with any of its parent streams (deadlock prevention). Sets block_simultaneous_read on matching DefaultStream instances.

  4. DefaultStream.get_partition_router(): New helper method that safely traverses the partition_generator → stream_slicer → partition_router chain using isinstance checks, replacing the hasattr chains in ConcurrentReadProcessor.

  5. ConcurrentReadProcessor (core blocking logic): Uses group-based deferral/retry with parent-child awareness. Added is_done() safety check that raises AirbyteTracedException if streams remain in the partition generation queue after all streams are marked done.
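The isinstance-based traversal described in item 4 can be sketched as follows. The classes here are minimal stand-ins for the real CDK types of the same names; only the traversal pattern is the point.

```python
class PartitionRouter:
    """Stand-in for the CDK's PartitionRouter base type."""


class StreamSlicerPartitionGenerator:
    """Stand-in: holds a stream_slicer, as the real partition generator does."""

    def __init__(self, stream_slicer):
        self.stream_slicer = stream_slicer


class ConcurrentPerPartitionCursor:
    """Stand-in: a cursor wrapping a partition_router."""

    def __init__(self, partition_router):
        self.partition_router = partition_router


def get_partition_router(partition_generator):
    """Return the underlying PartitionRouter, or None if the chain doesn't match.

    Replaces hasattr chains with explicit isinstance checks at each hop of
    partition_generator -> stream_slicer -> partition_router.
    """
    if not isinstance(partition_generator, StreamSlicerPartitionGenerator):
        return None
    slicer = partition_generator.stream_slicer
    if isinstance(slicer, PartitionRouter):
        return slicer
    if isinstance(slicer, ConcurrentPerPartitionCursor) and isinstance(
        slicer.partition_router, PartitionRouter
    ):
        return slicer.partition_router
    return None
```

Returning None for non-matching chains is what lets callers treat "no partition router" (i.e. no parent streams) as an ordinary case instead of catching AttributeError.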

Blocking Behavior

  • When a stream is about to start, the CDK checks if another stream in the same group (or any parent stream with a blocking group) is active
  • Blocked streams are deferred to the end of the queue and retried when the blocker completes
  • Streams in different groups run concurrently without interference
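The deferral behavior above can be modelled with a toy scheduler. This is not the ConcurrentReadProcessor API; `start_order`, `group_of`, and the tick-based "busy" bookkeeping are invented for illustration, with each started stream occupying its group for a fixed number of loop iterations.

```python
from collections import deque


def start_order(streams, group_of, run_ticks=2):
    """Toy model: return the order in which streams start.

    A popped stream whose group is busy is deferred to the end of the queue
    and retried later; streams without a group (or in idle groups) start
    immediately. One loop iteration counts as one tick of elapsed time.
    """
    queue = deque(streams)
    busy: dict = {}  # group name -> remaining ticks until the blocker completes
    order = []
    while queue:
        # Advance time: blockers finish after run_ticks iterations.
        for g in list(busy):
            busy[g] -= 1
            if busy[g] <= 0:
                del busy[g]
        stream = queue.popleft()
        group = group_of.get(stream)
        if group in busy:
            queue.append(stream)  # defer to the end of the queue, retry later
            continue
        if group is not None:
            busy[group] = run_ticks
        order.append(stream)
    return order
```

With streams "a" and "b" in the same group and "c" ungrouped, "b" is deferred behind "a" while "c" proceeds, so "c" starts before "b"; streams in distinct groups start in their original order.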

First use case: source-intercom — prevents duplicate concurrent requests to the companies endpoint.
Resolves: https://github.com/airbytehq/oncall/issues/8346

Updates since last revision

  • get_partition_router() helper on DefaultStream. Replaces messy hasattr chains in ConcurrentReadProcessor._collect_all_parent_stream_names() with a clean method that uses isinstance checks to traverse StreamSlicerPartitionGenerator → ConcurrentPerPartitionCursor → partition_router.
  • Deadlock validation in _apply_stream_groups(). Raises ValueError at config time if a stream and any of its parent streams are in the same blocking group.
  • is_done() safety check. Raises AirbyteTracedException (system_error) if _stream_instances_to_start_partition_generation is not empty after all streams are marked done — catches stuck-stream bugs at runtime.
  • Moved inline imports to module level. All imports (ConcurrentPerPartitionCursor, StreamSlicerPartitionGenerator, PartitionRouter, SubstreamPartitionRouter) are now at the top of files per Python coding standards.

Review & Testing Checklist for Human

  • Verify deadlock validation works correctly: Test with a manifest where a child stream and its parent are both in the same group — should raise ValueError at config time with a clear message.
  • Verify get_partition_router() handles all stream types: Test with both DefaultStream (with StreamSlicerPartitionGenerator) and legacy DeclarativeStream paths. Confirm it returns None for streams without partition routers.
  • Verify is_done() check doesn't mask real issues: If the safety check triggers, investigate the root cause (why streams remained in the queue) rather than just fixing the symptom.
  • Test with source-intercom: Run integration test with the new stream_groups config format to confirm end-to-end blocking behavior and verify no deadlocks occur.
  • Verify no circular import issues: The imports moved to module level (ConcurrentPerPartitionCursor, StreamSlicerPartitionGenerator, SubstreamPartitionRouter) should not cause circular dependencies in production connectors.

Recommended test plan:

  1. Create a test manifest with stream_groups referencing 2+ streams via $ref
  2. Run ConcurrentDeclarativeSource and verify streams in the same group are read sequentially
  3. Check logs for deferral messages and group activation/deactivation
  4. Test deadlock validation by placing a child stream and its parent in the same group (should fail at config time)
  5. Verify CI passes (especially schema validation and unit tests)
  6. Test with source-intercom to ensure no regressions and that blocking works as expected

Notes

  • Breaking changes: None. This is a new interface; existing connectors without stream_groups continue to work unchanged.
  • Backward compatibility: The internal ConcurrentReadProcessor still uses string-based group identifiers, so the blocking logic is unchanged.
  • Pre-existing test failure: test_read_with_concurrent_and_synchronous_streams_with_concurrent_state fails with SQLite locking issues (pre-existing, not caused by these changes).
  • Session URL: https://app.devin.ai/sessions/5184df5176d54d7c91ddcb9635c28dda
  • Requested by: Anatolii Yatsuk (gl_anatolii.yatsuk)

Summary by CodeRabbit

  • New Features

    • Manifest-configurable stream groups to prevent concurrent reads for grouped streams; group setting flows to stream components.
    • Active-stream tracking with deferral/retry logic to coordinate partition generation and start-next behavior.
    • Helper method get_partition_router() on DefaultStream for safe partition router access.
    • Deadlock validation to prevent parent-child streams from sharing the same blocking group.
    • Safety check in is_done() to detect stuck streams in partition generation queue.
  • Bug Fixes

    • Safer activation/deactivation and queue handling, avoiding infinite blocking and stuck/duplicated processing; improved logging for state transitions.
  • Tests

    • Extensive unit tests covering group blocking, parent/child interactions, deferral/retry semantics, queue ordering, and logging.

@github-actions github-actions bot added the enhancement New feature or request label Dec 30, 2025
@github-actions

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

```shell
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@tolik0/concurrent-source/add-block_simultaneous_read#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch tolik0/concurrent-source/add-block_simultaneous_read
```

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment



github-actions bot commented Dec 30, 2025

PyTest Results (Fast)

3 899 tests  +3 477   3 887 ✅ +3 475   6m 53s ⏱️ + 5m 15s
    1 suites ±    0      12 💤 +    3 
    1 files   ±    0       0 ❌  -     1 

Results for commit fa6bdbe. ± Comparison against base commit 452acd1.

♻️ This comment has been updated with latest results.


github-actions bot commented Dec 30, 2025

PyTest Results (Full)

3 902 tests  +79   3 890 ✅ +81   11m 24s ⏱️ +59s
    1 suites ± 0      12 💤 ± 0 
    1 files   ± 0       0 ❌  -  2 

Results for commit fa6bdbe. ± Comparison against base commit 452acd1.

♻️ This comment has been updated with latest results.


Anatolii Yatsuk (tolik0) commented Jan 6, 2026

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made, those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

🟦 Job completed successfully (no changes).


Anatolii Yatsuk (tolik0) commented Jan 6, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/20754787469


Anatolii Yatsuk (tolik0) commented Jan 9, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/20859295403


Anatolii Yatsuk (tolik0) commented Jan 12, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/20918717015

@tolik0 Anatolii Yatsuk (tolik0) marked this pull request as ready for review January 12, 2026 12:06
Copilot AI review requested due to automatic review settings January 12, 2026 12:06

Copilot AI left a comment


Pull request overview

This PR introduces a block_simultaneous_read feature to the Python CDK that prevents concurrent execution of streams sharing the same resource (API endpoint, session, or rate limit pool). The feature uses string-based group identifiers where streams with matching non-empty group names will not run concurrently, addressing issues like duplicate API calls when streams function as both standalone and parent streams.

Changes:

  • Added block_simultaneous_read property to stream interfaces and schema definitions with empty string as default (backward compatible)
  • Implemented blocking logic in ConcurrentReadProcessor that defers streams when their group or parent's group is active
  • Added comprehensive test coverage for various blocking scenarios including parent-child relationships and multi-level hierarchies

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| test_concurrent_read_processor.py | Added comprehensive test suite covering all blocking scenarios |
| test_model_to_component_factory.py | Added integration test verifying manifest-to-stream property flow |
| default_stream.py | Added block_simultaneous_read property to DefaultStream |
| adapters.py | Added property adapter for legacy stream compatibility |
| abstract_stream.py | Added abstract property definition with documentation |
| model_to_component_factory.py | Integrated property from manifest to stream construction |
| declarative_component_schema.py | Generated schema with new property definition |
| declarative_component_schema.yaml | Added schema definition with comprehensive documentation |
| concurrent_read_processor.py | Implemented core blocking logic with group tracking and deferral |
Comments suppressed due to low confidence (1)

airbyte_cdk/sources/declarative/declarative_component_schema.yaml:1

  • The description change for use_cache appears unrelated to the block_simultaneous_read feature. This change should be separated into its own PR or have an explanation for why it's included in this feature PR.
"$schema": http://json-schema.org/draft-07/schema#


- Factory now owns the stream_groups resolution via set_stream_groups(manifest)
- ConcurrentDeclarativeSource just calls factory.set_stream_groups(manifest)
- Removed _build_stream_name_to_group from ConcurrentDeclarativeSource
- Updated tests to use factory's _build_stream_name_to_group directly


- Removed _build_stream_name_to_group, set_stream_groups, _stream_name_to_group from factory
- Factory no longer knows about stream_groups at all
- Added _apply_stream_groups to ConcurrentDeclarativeSource: creates streams first, then sets block_simultaneous_read on matching DefaultStream instances
- Added block_simultaneous_read setter on DefaultStream
- Replaced mock-based tests with parametrized tests using real DefaultStream instances

@agarctfi

Brian Lai (@brianjlai), Anatolii Yatsuk (@tolik0): I'm not sure who is working on this. I see commits being made with no authors. However, flagging that this is now being swarmed here.

I can try to test these changes if they're ready. Please let me know.

Anatolii Yatsuk (tolik0) and others added 4 commits March 4, 2026 18:40
Replace hasattr chain in ConcurrentReadProcessor._collect_all_parent_stream_names
with DefaultStream.get_partition_router() that safely traverses the internal
partition_generator -> stream_slicer -> partition_router chain using isinstance checks.

_apply_stream_groups now checks that no stream shares a group with any
of its parent streams (via get_partition_router). Raises ValueError at
config time if a deadlock-causing configuration is detected.


Adds a safety check in is_done() that raises AirbyteTracedException
(system_error) if streams remain in the partition generation queue after
all streams are marked done. Also moves inline imports to module level
and updates test mocks to use DefaultStream with get_partition_router().


@brianjlai Brian Lai (brianjlai) left a comment


I mentioned a few things and one potential interesting change to how we present the StreamGroup component within the ConcurrentSource component. Not blocking and ultimately not something that must be done, but a thought I had.

From a functional side I think we're all set. But again, I think we need this to be a dev CDK version first so we can roll it out to Intercom safely w/o having this go out to all connectors during the automatic version bump that goes out.

description: A description of the connector. It will be presented on the Source documentation page.
additionalProperties: false
definitions:
StreamGroup:
Contributor


nit: can we move this so it's alphabetized instead of just at the top of definitions?

Contributor Author


Fixed

"$ref": "#/definitions/ConcurrencyLevel"
api_budget:
"$ref": "#/definitions/HTTPAPIBudget"
stream_groups:
Contributor


An interesting idea I had: instead of defining a separate stream_groups field on the DeclarativeSource component, what if we changed the streams list to accept an additional type in the anyOf, "$ref": "#/definitions/StreamGroup"? StreamGroup components could then be defined inline there. Since stream_groups is currently a map, we'd have to adjust the StreamGroup component to add a name field, because we'd no longer have a key once the stream_groups mapping is removed.

It does make how we implement _apply_stream_groups a little more complicated because we need to check the isinstance for StreamGroup, but it also feels like a natural place to place the StreamGroup since a StreamGroup is just a list of grouped streams. But this feels like it might be a little bit of a cleaner interface if we can make it work.

Anatolii Yatsuk (@tolik0) what do you think about this design?

Contributor Author


The issue with this approach is that it limits flexibility if we add more actions later. We wouldn’t be able to apply two different actions to the same stream. For example, defining two groups with different actions that include overlapping streams.


Anatolii Yatsuk (tolik0) commented Mar 5, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/22724700845

@devin-ai-integration

Closing this PR per Alfredo Garcia (@agarctfi)'s request. The changes have been rebased into a new PR against main that also includes the HTTP timeout fix (from closed PR #938/#939).

@devin-ai-integration

Closing this PR — the block_simultaneous_read / stream_groups feature has been rebased and consolidated into #940, which also includes the HTTP timeout fix. #940 targets main directly.

