[python] Improve the consumer_manager API and support in file_store_table #7415
Changes from 4 commits
ba6155f
15f04bc
b50f04e
627eb53
b90133a
f7d83ad
7163aca
af0ce14
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -591,6 +591,132 @@ to the appropriate rollback logic. | |
| | f is not in [l1, l2] | PredicateBuilder.is_not_in(f, [l1, l2]) | | ||
| | lower <= f <= upper | PredicateBuilder.between(f, lower, upper) | | ||
|
|
||
| ## Consumer Management | ||
|
|
||
| Consumer management lets you track consumption progress, keep snapshots that have not yet been consumed from expiring, and resume reading from the last recorded position. | ||
|
|
||
| ### Create ConsumerManager | ||
|
|
||
| ```python | ||
| from pypaimon import CatalogFactory | ||
| from pypaimon.consumer.consumer_manager import ConsumerManager | ||
|
|
||
| # Get table and file_io | ||
| catalog = CatalogFactory.create({'warehouse': 'file:///path/to/warehouse'}) | ||
| table = catalog.get_table('database_name.table_name') | ||
| file_io = table.file_io() | ||
|
|
||
| # Create consumer manager | ||
| manager = ConsumerManager(file_io, table.location()) | ||
|
Contributor
ConsumerManager should be obtained from table?
Member
Author
Makes sense, it has been changed.
||
|
|
||
| # For branch support | ||
| manager_branch = ConsumerManager(file_io, table.location(), branch='my_branch') | ||
| ``` | ||
|
|
||
| ### Get Consumer | ||
|
|
||
| Retrieve a consumer by its ID: | ||
|
|
||
| ```python | ||
| from pypaimon.consumer.consumer import Consumer | ||
|
|
||
| consumer = manager.consumer('consumer_id') | ||
| if consumer: | ||
|     print(f"Next snapshot: {consumer.next_snapshot}") | ||
| else: | ||
|     print("Consumer not found") | ||
| ``` | ||
|
|
||
| ### Reset Consumer | ||
|
|
||
| Create or reset a consumer with a new snapshot ID: | ||
|
|
||
| ```python | ||
| # Reset consumer to snapshot 10 | ||
| manager.reset_consumer('consumer_id', Consumer(next_snapshot=10)) | ||
| ``` | ||
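The get and reset calls above are what make resuming from a breakpoint possible. The following is a minimal sketch of that pattern, not pypaimon code: `InMemoryConsumerManager`, the local `Consumer` dataclass, and `consume` are hypothetical stand-ins that only mirror the `consumer` and `reset_consumer` method names shown in this document.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Consumer:
    """Hypothetical stand-in; only next_snapshot is modelled."""
    next_snapshot: int


class InMemoryConsumerManager:
    """Hypothetical in-memory stand-in, not the pypaimon ConsumerManager."""

    def __init__(self) -> None:
        self._consumers: Dict[str, Consumer] = {}

    def consumer(self, consumer_id: str) -> Optional[Consumer]:
        return self._consumers.get(consumer_id)

    def reset_consumer(self, consumer_id: str, consumer: Consumer) -> None:
        self._consumers[consumer_id] = consumer


def consume(manager: InMemoryConsumerManager, consumer_id: str,
            latest_snapshot: int, first_snapshot: int = 1) -> List[int]:
    """Process snapshots from the recorded position up to latest_snapshot."""
    existing = manager.consumer(consumer_id)
    start = existing.next_snapshot if existing is not None else first_snapshot
    processed = []
    for snapshot_id in range(start, latest_snapshot + 1):
        processed.append(snapshot_id)  # placeholder for the real read
        # Record progress after each snapshot so a restart resumes here.
        manager.reset_consumer(consumer_id, Consumer(next_snapshot=snapshot_id + 1))
    return processed


manager = InMemoryConsumerManager()
first_run = consume(manager, "job_a", latest_snapshot=3)
second_run = consume(manager, "job_a", latest_snapshot=5)
print(first_run, second_run)
```

The second call starts where the first one stopped because the recorded `next_snapshot` survives between runs.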
|
|
||
| ### Delete Consumer | ||
|
|
||
| Delete a consumer by its ID: | ||
|
|
||
| ```python | ||
| manager.delete_consumer('consumer_id') | ||
| ``` | ||
|
|
||
| ### List Consumers | ||
|
|
||
| Get all consumers with their next snapshot IDs: | ||
|
|
||
| ```python | ||
| consumers = manager.consumers() | ||
| for consumer_id, next_snapshot in consumers.items(): | ||
|     print(f"Consumer {consumer_id}: next snapshot {next_snapshot}") | ||
| ``` | ||
|
|
||
| ### List All Consumer IDs | ||
|
|
||
| List all consumer IDs: | ||
|
|
||
| ```python | ||
| consumer_ids = manager.list_all_ids() | ||
| for consumer_id in consumer_ids: | ||
|     print(consumer_id) | ||
| ``` | ||
|
|
||
| ### Get Minimum Next Snapshot | ||
|
|
||
| Get the minimum next snapshot across all consumers: | ||
|
|
||
| ```python | ||
| min_snapshot = manager.min_next_snapshot() | ||
| if min_snapshot is not None: | ||
|     print(f"Minimum next snapshot: {min_snapshot}") | ||
| ``` | ||
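To illustrate why the minimum matters for snapshot expiration, here is a small sketch over a plain dict shaped like the result of `consumers()`. The helper names are hypothetical, and the rule that only snapshots below the minimum are eligible is an assumption about the intended semantics, not a description of the pypaimon implementation.

```python
from typing import Dict, Iterable, List, Optional


def min_next_snapshot(consumers: Dict[str, int]) -> Optional[int]:
    """Smallest next-snapshot ID across consumers, or None if there are none."""
    return min(consumers.values()) if consumers else None


def expirable_snapshots(snapshot_ids: Iterable[int],
                        consumers: Dict[str, int]) -> List[int]:
    """Snapshots that no consumer still needs (assumed rule: id < minimum)."""
    floor = min_next_snapshot(consumers)
    if floor is None:
        # No consumers: nothing is protected by consumer progress.
        return list(snapshot_ids)
    return [s for s in snapshot_ids if s < floor]


progress = {"job_a": 7, "job_b": 4}
print(min_next_snapshot(progress))
print(expirable_snapshots(range(1, 8), progress))
```

Returning `None` for the empty case, rather than 0, keeps "no consumers" distinguishable from a real snapshot ID.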
|
|
||
| ### Expire Consumers | ||
|
|
||
| Expire consumers modified before a given datetime: | ||
|
|
||
| ```python | ||
| from datetime import datetime, timedelta | ||
|
|
||
| # Expire consumers older than 1 day | ||
| expire_time = datetime.now() - timedelta(days=1) | ||
| manager.expire(expire_time) | ||
| ``` | ||
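The selection rule described above ("modified before a given datetime") can be sketched without any storage access. `expired_ids` and the `last_modified` mapping are hypothetical; how pypaimon actually obtains modification times is not shown in this document.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def expired_ids(last_modified: Dict[str, datetime],
                expire_time: datetime) -> List[str]:
    """IDs of consumers last modified strictly before expire_time."""
    return sorted(cid for cid, ts in last_modified.items() if ts < expire_time)


now = datetime(2024, 1, 10, 12, 0, 0)  # fixed time so the example is repeatable
last_modified = {
    "stale_job": now - timedelta(days=3),
    "fresh_job": now - timedelta(hours=1),
}
print(expired_ids(last_modified, now - timedelta(days=1)))
```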
|
|
||
| ### Clear Consumers | ||
|
|
||
| Clear consumers matching regular expression patterns: | ||
|
|
||
| ```python | ||
| # Clear all consumers starting with "test_" | ||
| manager.clear_consumers('test_.*') | ||
|
|
||
| # Clear all consumers except those starting with "prod_" | ||
| manager.clear_consumers( | ||
|     '.*',       # pattern of consumer IDs to clear | ||
|     'prod_.*'   # pattern of consumer IDs to keep | ||
| ) | ||
| ``` | ||
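The include and exclude patterns combine as "matches the first and does not match the second". A minimal sketch of that selection follows. `select_for_clearing` is a hypothetical helper, and matching each pattern against the whole consumer ID with `re.fullmatch` is an assumption; the actual matching mode in pypaimon may differ.

```python
import re
from typing import Iterable, List, Optional


def select_for_clearing(consumer_ids: Iterable[str],
                        including_pattern: str,
                        excluding_pattern: Optional[str] = None) -> List[str]:
    """Return IDs that match including_pattern and not excluding_pattern."""
    include = re.compile(including_pattern)
    exclude = re.compile(excluding_pattern) if excluding_pattern else None
    selected = []
    for cid in consumer_ids:
        if not include.fullmatch(cid):
            continue
        if exclude is not None and exclude.fullmatch(cid):
            continue
        selected.append(cid)
    return selected


ids = ["test_a", "prod_b", "dev_c"]
print(select_for_clearing(ids, "test_.*"))
print(select_for_clearing(ids, ".*", "prod_.*"))
```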
|
|
||
| ### Branch Support | ||
|
|
||
| ConsumerManager supports multiple branches: | ||
|
|
||
| ```python | ||
| # Main branch (default) | ||
| manager_main = ConsumerManager(file_io, table.location()) | ||
|
|
||
| # Custom branch | ||
| manager_branch = ConsumerManager(file_io, table.location(), branch='feature_branch') | ||
|
Contributor
branch_manager = manager.with_branch('xxx')
Member
Author
Done.
||
|
|
||
| # Each branch maintains its own consumers | ||
| print(manager_main.consumers()) # Consumers on main branch | ||
| print(manager_branch.consumers()) # Consumers on feature branch | ||
| ``` | ||
|
|
||
| ## Supported Features | ||
|
|
||
| The following shows the supported features of Python Paimon compared to Java Paimon: | ||
|
|
||
No need to import?