diff --git a/docs/content/pypaimon/python-api.md b/docs/content/pypaimon/python-api.md index 3e9041ed5a03..51eded3d7354 100644 --- a/docs/content/pypaimon/python-api.md +++ b/docs/content/pypaimon/python-api.md @@ -591,6 +591,124 @@ to the appropriate rollback logic. | f is not in [l1, l2] | PredicateBuilder.is_not_in(f, [l1, l2]) | | lower <= f <= upper | PredicateBuilder.between(f, lower, upper) | +## Consumer Management + +Consumer management allows you to track consumption progress, prevent snapshot expiration, and resume from breakpoints. + +### Create ConsumerManager + +```python +from pypaimon import CatalogFactory + +# Get table and file_io +catalog = CatalogFactory.create({'warehouse': 'file:///path/to/warehouse'}) +table = catalog.get_table('database_name.table_name') +file_io = table.file_io() + +# Create consumer manager +manager = table.consumer_manager() +``` + +### Get Consumer + +Retrieve a consumer by its ID: + +```python +from pypaimon.consumer.consumer import Consumer + +consumer = manager.consumer('consumer_id') +if consumer: + print(f"Next snapshot: {consumer.next_snapshot}") +else: + print("Consumer not found") +``` + +### Reset Consumer + +Create or reset a consumer with a new snapshot ID: + +```python +# Reset consumer to snapshot 10 +manager.reset_consumer('consumer_id', Consumer(next_snapshot=10)) +``` + +### Delete Consumer + +Delete a consumer by its ID: + +```python +manager.delete_consumer('consumer_id') +``` + +### List Consumers + +Get all consumers with their next snapshot IDs: + +```python +consumers = manager.consumers() +for consumer_id, next_snapshot in consumers.items(): + print(f"Consumer {consumer_id}: next snapshot {next_snapshot}") +``` + +### List All Consumer IDs + +List all consumer IDs: + +```python +consumer_ids = manager.list_all_ids() +for consumer_id in consumer_ids: + print(consumer_id) +``` + +### Get Minimum Next Snapshot + +Get the minimum next snapshot across all consumers: + +```python +min_snapshot = manager.min_next_snapshot() +if min_snapshot: + print(f"Minimum next snapshot: {min_snapshot}") +``` + +### Expire Consumers + +Expire consumers modified before a given datetime: + +```python +from datetime import datetime, timedelta + +# Expire consumers older than 1 day +expire_time = datetime.now() - timedelta(days=1) +manager.expire(expire_time) +``` + +### Clear Consumers + +Clear consumers matching regular expression patterns: + +```python +# Clear all consumers starting with "test_" +manager.clear_consumers('test_.*') + +# Clear all consumers except those starting with "prod_" +manager.clear_consumers( + '.*', + 'prod_.*' +) +``` + +### Branch Support + +ConsumerManager supports multiple branches: + +```python +# Custom branch +branch_manager = manager.with_branch('feature_branch') + +# Each branch maintains its own consumers +print(branch_manager.consumers()) # Consumers on feature branch +``` + ## Supported Features The following shows the supported features of Python Paimon compared to Java Paimon: diff --git a/paimon-python/pypaimon/consumer/__init__.py b/paimon-python/pypaimon/consumer/__init__.py index df4788f89404..1151e52f2a4b 100644 --- a/paimon-python/pypaimon/consumer/__init__.py +++ b/paimon-python/pypaimon/consumer/__init__.py @@ -16,3 +16,21 @@ # limitations under the License. ################################################################################ """Consumer management for tracking streaming read progress.""" + +from pypaimon.consumer.consumer import Consumer +from pypaimon.consumer.consumer_manager import ( + DEFAULT_MAIN_BRANCH, + ConsumerManager, + _branch_path, + _is_main_branch, + _normalize_branch, +) + +__all__ = [ + 'Consumer', + 'ConsumerManager', + 'DEFAULT_MAIN_BRANCH', + '_branch_path', + '_is_main_branch', + '_normalize_branch', +] diff --git a/paimon-python/pypaimon/consumer/consumer.py b/paimon-python/pypaimon/consumer/consumer.py index 8cc913f0c650..58cf32d520d0 100644 --- a/paimon-python/pypaimon/consumer/consumer.py +++ b/paimon-python/pypaimon/consumer/consumer.py @@ -18,7 +18,9 @@ """Consumer dataclass for streaming read progress.""" import json +import time from dataclasses import dataclass +from typing import Optional @dataclass @@ -36,3 +38,39 @@ def from_json(json_str: str) -> 'Consumer': """Deserialize from JSON.""" data = json.loads(json_str) return Consumer(next_snapshot=data["nextSnapshot"]) + + @classmethod + def from_path(cls, file_io, path: str) -> Optional['Consumer']: + """ + Load consumer from file path with retry mechanism. + + Args: + file_io: FileIO instance for reading files + path: Path to consumer file + + Returns: + Consumer instance if file exists, None otherwise + + Raises: + RuntimeError: If retry fails after 10 attempts + """ + retry_number = 0 + exception = None + while retry_number < 10: + try: + content = file_io.read_file_utf8(path) + except FileNotFoundError: + return None + except Exception as e: + exception = e + time.sleep(0.2) + retry_number += 1 + continue + try: + return cls.from_json(content) + except Exception as e: + exception = e + time.sleep(0.2) + retry_number += 1 + continue + raise RuntimeError(f"Retry fail after 10 times: {exception}") diff --git a/paimon-python/pypaimon/consumer/consumer_manager.py b/paimon-python/pypaimon/consumer/consumer_manager.py index 4edec70d18ff..74a1c1d88eb9 100644 --- a/paimon-python/pypaimon/consumer/consumer_manager.py +++ b/paimon-python/pypaimon/consumer/consumer_manager.py @@ -17,19 +17,45 @@ ################################################################################ """ConsumerManager for persisting streaming read progress.""" -from typing import Optional +import re +from datetime import datetime +from typing import Dict, List, Optional from pypaimon.consumer.consumer import Consumer +DEFAULT_MAIN_BRANCH = "main" +BRANCH_PREFIX = "branch-" + + +def _branch_path(table_path: str, branch: str) -> str: + """Return the path string of a branch.""" + if _is_main_branch(branch): + return table_path + return f"{table_path}/branch/{BRANCH_PREFIX}{branch}" + + +def _is_main_branch(branch: str) -> bool: + """Check if branch is main branch.""" + return branch == DEFAULT_MAIN_BRANCH + + +def _normalize_branch(branch: Optional[str]) -> str: + """Normalize branch name, use DEFAULT_MAIN_BRANCH if None or empty.""" + if not branch or not branch.strip(): + return DEFAULT_MAIN_BRANCH + return branch + class ConsumerManager: """Manages consumer state stored at {table_path}/consumer/consumer-{id}.""" CONSUMER_PREFIX = "consumer-" - def __init__(self, file_io, table_path: str): + def __init__(self, file_io, table_path: str, + branch: Optional[str] = None): self._file_io = file_io self._table_path = table_path + self._branch = _normalize_branch(branch) @staticmethod def _validate_consumer_id(consumer_id: str) -> None: @@ -45,22 +71,35 @@ def _validate_consumer_id(consumer_id: str) -> None: f"consumer_id cannot be a relative path component: {consumer_id}" ) + def _consumer_directory(self) -> str: + """Return the path to consumer directory.""" + branch_table_path = _branch_path(self._table_path, self._branch) + return f"{branch_table_path}/consumer" + def _consumer_path(self, consumer_id: str) -> str: """Return the path to a consumer file.""" self._validate_consumer_id(consumer_id) - return ( - f"{self._table_path}/consumer/" - f"{self.CONSUMER_PREFIX}{consumer_id}" - ) + consumer_dir = self._consumer_directory() + return f"{consumer_dir}/{self.CONSUMER_PREFIX}{consumer_id}" + + def _list_consumer_ids(self) -> List[str]: + """List all consumer IDs.""" + consumer_dir = self._consumer_directory() + if not self._file_io.exists(consumer_dir): + return [] + + consumer_ids = [] + for status in self._file_io.list_status(consumer_dir): + if status.base_name and status.base_name.startswith( + self.CONSUMER_PREFIX): + consumer_id = status.base_name[len(self.CONSUMER_PREFIX):] + consumer_ids.append(consumer_id) + return consumer_ids def consumer(self, consumer_id: str) -> Optional[Consumer]: """Get consumer state, or None if not found.""" path = self._consumer_path(consumer_id) - if not self._file_io.exists(path): - return None - - json_str = self._file_io.read_file_utf8(path) - return Consumer.from_json(json_str) + return Consumer.from_path(self._file_io, path) def reset_consumer(self, consumer_id: str, consumer: Consumer) -> None: """Write or update consumer state.""" @@ -71,3 +110,86 @@ def delete_consumer(self, consumer_id: str) -> None: """Delete a consumer.""" path = self._consumer_path(consumer_id) self._file_io.delete_quietly(path) + + def min_next_snapshot(self) -> Optional[int]: + """Get minimum next snapshot among all consumers.""" + consumer_ids = self._list_consumer_ids() + if not consumer_ids: + return None + + min_snapshot = None + for consumer_id in consumer_ids: + consumer = self.consumer(consumer_id) + if consumer: + if min_snapshot is None or consumer.next_snapshot < min_snapshot: + min_snapshot = consumer.next_snapshot + return min_snapshot + + def expire(self, expire_datetime: datetime) -> None: + """Delete consumers with modification time before expire_datetime.""" + consumer_dir = self._consumer_directory() + if not self._file_io.exists(consumer_dir): + return + + expire_timestamp = expire_datetime.timestamp() + for status in self._file_io.list_status(consumer_dir): + if status.base_name and status.base_name.startswith( + self.CONSUMER_PREFIX): + consumer_path = f"{consumer_dir}/{status.base_name}" + try: + if status.mtime and status.mtime < expire_timestamp: + self._file_io.delete_quietly(consumer_path) + except Exception: + pass + + def list_all_ids(self) -> List[str]: + """List all consumer IDs.""" + return self._list_consumer_ids() + + def consumers(self) -> Dict[str, int]: + """Get all consumers as a dict mapping consumer_id to next_snapshot.""" + consumer_ids = self._list_consumer_ids() + consumers = {} + for consumer_id in consumer_ids: + consumer = self.consumer(consumer_id) + if consumer: + consumers[consumer_id] = consumer.next_snapshot + return consumers + + def clear_consumers( + self, including_pattern: str, excluding_pattern: Optional[str] = None + ) -> None: + """ + Clear consumers matching inclusion pattern but not exclusion pattern. + + Args: + including_pattern: Regex pattern for consumers to delete + excluding_pattern: Optional regex pattern for consumers to keep + """ + consumer_ids = self._list_consumer_ids() + including_re = re.compile(including_pattern) + excluding_re = ( + re.compile(excluding_pattern) if excluding_pattern else None + ) + + for consumer_id in consumer_ids: + should_delete = including_re.match(consumer_id) + if excluding_re: + should_delete = should_delete and not excluding_re.match( + consumer_id + ) + + if should_delete: + self.delete_consumer(consumer_id) + + def with_branch(self, branch: Optional[str]) -> 'ConsumerManager': + """ + Create a new ConsumerManager instance with the specified branch. + + Args: + branch: Branch name, or None for main branch + + Returns: + A new ConsumerManager instance with the specified branch + """ + return ConsumerManager(self._file_io, self._table_path, branch) diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py index 5fd46248d0a7..7eaa1715d454 100644 --- a/paimon-python/pypaimon/table/file_store_table.py +++ b/paimon-python/pypaimon/table/file_store_table.py @@ -19,21 +19,21 @@ from typing import List, Optional from pypaimon.catalog.catalog_environment import CatalogEnvironment -from pypaimon.common.options.core_options import CoreOptions from pypaimon.common.file_io import FileIO from pypaimon.common.identifier import Identifier +from pypaimon.common.options.core_options import CoreOptions from pypaimon.common.options.options import Options from pypaimon.read.read_builder import ReadBuilder from pypaimon.schema.schema_manager import SchemaManager from pypaimon.schema.table_schema import TableSchema from pypaimon.table.bucket_mode import BucketMode from pypaimon.table.table import Table -from pypaimon.write.write_builder import BatchWriteBuilder, StreamWriteBuilder from pypaimon.write.row_key_extractor import (DynamicBucketRowKeyExtractor, FixedBucketRowKeyExtractor, PostponeBucketRowKeyExtractor, RowKeyExtractor, UnawareBucketRowKeyExtractor) +from pypaimon.write.write_builder import BatchWriteBuilder, StreamWriteBuilder class FileStoreTable(Table): @@ -84,6 +84,11 @@ def current_branch(self) -> str: """Get the current branch name from options.""" return self.options.branch() + def consumer_manager(self): + """Get the consumer manager for this table.""" + from pypaimon.consumer.consumer_manager import ConsumerManager + return ConsumerManager(self.file_io, self.table_path, self.current_branch()) + def snapshot_manager(self): """Get the snapshot manager for this table.""" from pypaimon.snapshot.snapshot_manager import SnapshotManager diff --git a/paimon-python/pypaimon/tests/consumer_manager_test.py b/paimon-python/pypaimon/tests/consumer_manager_test.py new file mode 100644 index 000000000000..6caeb57708f3 --- /dev/null +++ b/paimon-python/pypaimon/tests/consumer_manager_test.py @@ -0,0 +1,315 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +################################################################################ +import os +import tempfile +import time +import unittest +from datetime import datetime + +from pypaimon.consumer.consumer import Consumer +from pypaimon.consumer.consumer_manager import ConsumerManager +from pypaimon.filesystem.local_file_io import LocalFileIO + + +class ConsumerManagerTest(unittest.TestCase): + """Test cases for ConsumerManager.""" + + def setUp(self): + """Set up test environment.""" + self.temp_dir = tempfile.mkdtemp() + self.file_io = LocalFileIO(self.temp_dir) + self.manager = ConsumerManager(self.file_io, self.temp_dir) + self.consumer_manager_branch = ConsumerManager(self.file_io, self.temp_dir, "branch1") + + def tearDown(self): + """Clean up test environment.""" + import shutil + if os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + + def test_retry(self): + """Test retry mechanism for corrupted consumer file.""" + # Create corrupted consumer file + consumer_dir = os.path.join(self.temp_dir, "consumer") + os.makedirs(consumer_dir, exist_ok=True) + consumer_file = os.path.join(consumer_dir, "consumer-id1") + with open(consumer_file, 'w') as f: + f.write("invalid json content") + + # Should raise RuntimeError after retries + with self.assertRaises(RuntimeError) as context: + self.manager.consumer("id1") + self.assertIn("Retry fail after 10 times", str(context.exception)) + + def test_basic_operations(self): + """Test basic consumer operations.""" + # Test non-existent consumer + consumer = self.manager.consumer("id1") + self.assertIsNone(consumer) + + # Test min next snapshot when no consumers + min_snapshot = self.manager.min_next_snapshot() + self.assertIsNone(min_snapshot) + + # Reset consumer + self.manager.reset_consumer("id1", Consumer(5)) + consumer = self.manager.consumer("id1") + self.assertIsNotNone(consumer) + self.assertEqual(consumer.next_snapshot, 5) + + # Reset another consumer + self.manager.reset_consumer("id2", Consumer(8)) + consumer = self.manager.consumer("id2") + self.assertIsNotNone(consumer) + self.assertEqual(consumer.next_snapshot, 8) + + # Test min next snapshot + min_snapshot = self.manager.min_next_snapshot() + self.assertEqual(min_snapshot, 5) + + def test_branch_operations(self): + """Test consumer operations on different branches.""" + # Test non-existent consumer on branch + consumer = self.consumer_manager_branch.consumer("id1") + self.assertIsNone(consumer) + + # Test min next snapshot when no consumers on branch + min_snapshot = self.consumer_manager_branch.min_next_snapshot() + self.assertIsNone(min_snapshot) + + # Reset consumer on branch + self.consumer_manager_branch.reset_consumer("id1", Consumer(5)) + consumer = self.consumer_manager_branch.consumer("id1") + self.assertIsNotNone(consumer) + self.assertEqual(consumer.next_snapshot, 5) + + # Reset another consumer on branch + self.consumer_manager_branch.reset_consumer("id2", Consumer(8)) + consumer = self.consumer_manager_branch.consumer("id2") + self.assertIsNotNone(consumer) + self.assertEqual(consumer.next_snapshot, 8) + + # Test min next snapshot on branch + min_snapshot = self.consumer_manager_branch.min_next_snapshot() + self.assertEqual(min_snapshot, 5) + + def test_expire(self): + """Test consumer expiration.""" + # Create consumers with different timestamps + self.manager.reset_consumer("id1", Consumer(1)) + time.sleep(1) + expire_datetime = datetime.now() + time.sleep(1) + self.manager.reset_consumer("id2", Consumer(2)) + + # Check expire + self.manager.expire(expire_datetime) + consumer1 = self.manager.consumer("id1") + self.assertIsNone(consumer1) + consumer2 = self.manager.consumer("id2") + self.assertIsNotNone(consumer2) + self.assertEqual(consumer2.next_snapshot, 2) + + # Check last modification + expire_datetime = datetime.now() + time.sleep(1) + self.manager.reset_consumer("id2", Consumer(3)) + self.manager.expire(expire_datetime) + consumer2 = self.manager.consumer("id2") + self.assertIsNotNone(consumer2) + self.assertEqual(consumer2.next_snapshot, 3) + + def test_expire_branch(self): + """Test consumer expiration on branch.""" + # Create consumers on branch with different timestamps + self.consumer_manager_branch.reset_consumer("id3", Consumer(1)) + time.sleep(1) + expire_datetime = datetime.now() + time.sleep(1) + self.consumer_manager_branch.reset_consumer("id4", Consumer(2)) + + # Check expire on branch + self.consumer_manager_branch.expire(expire_datetime) + consumer3 = self.consumer_manager_branch.consumer("id3") + self.assertIsNone(consumer3) + consumer4 = self.consumer_manager_branch.consumer("id4") + self.assertIsNotNone(consumer4) + self.assertEqual(consumer4.next_snapshot, 2) + + # Check last modification on branch + expire_datetime = datetime.now() + time.sleep(1) + self.consumer_manager_branch.reset_consumer("id4", Consumer(3)) + self.consumer_manager_branch.expire(expire_datetime) + consumer4 = self.consumer_manager_branch.consumer("id4") + self.assertIsNotNone(consumer4) + self.assertEqual(consumer4.next_snapshot, 3) + + def test_read_consumer(self): + """Test reading consumer from different branches.""" + # Create consumer on main branch + self.manager.reset_consumer("id1", Consumer(5)) + consumer = self.manager.consumer("id1") + self.assertIsNotNone(consumer) + self.assertEqual(consumer.next_snapshot, 5) + + # Create consumer on branch + self.consumer_manager_branch.reset_consumer("id2", Consumer(5)) + consumer = self.consumer_manager_branch.consumer("id2") + self.assertIsNotNone(consumer) + self.assertEqual(consumer.next_snapshot, 5) + + # Verify id2 doesn't exist on main branch + consumer = self.manager.consumer("id2") + self.assertIsNone(consumer) + + def test_list_all_ids(self): + """Test listing all consumer IDs.""" + # Initially empty + ids = self.manager.list_all_ids() + self.assertEqual(len(ids), 0) + + # Add consumers + self.manager.reset_consumer("id1", Consumer(1)) + self.manager.reset_consumer("id2", Consumer(2)) + self.manager.reset_consumer("id3", Consumer(3)) + + # List IDs + ids = self.manager.list_all_ids() + self.assertEqual(len(ids), 3) + self.assertIn("id1", ids) + self.assertIn("id2", ids) + self.assertIn("id3", ids) + + def test_consumers(self): + """Test getting all consumers.""" + # Initially empty + consumers = self.manager.consumers() + self.assertEqual(len(consumers), 0) + + # Add consumers + self.manager.reset_consumer("id1", Consumer(1)) + self.manager.reset_consumer("id2", Consumer(2)) + self.manager.reset_consumer("id3", Consumer(3)) + + # Get all consumers + consumers = self.manager.consumers() + self.assertEqual(len(consumers), 3) + self.assertEqual(consumers["id1"], 1) + self.assertEqual(consumers["id2"], 2) + self.assertEqual(consumers["id3"], 3) + + def test_delete_consumer(self): + """Test deleting consumer.""" + # Create consumer + self.manager.reset_consumer("id1", Consumer(5)) + consumer = self.manager.consumer("id1") + self.assertIsNotNone(consumer) + + # Delete consumer + self.manager.delete_consumer("id1") + consumer = self.manager.consumer("id1") + self.assertIsNone(consumer) + + def test_clear_consumers(self): + """Test clearing consumers with patterns.""" + # Add multiple consumers + self.manager.reset_consumer("test-id1", Consumer(1)) + self.manager.reset_consumer("test-id2", Consumer(2)) + self.manager.reset_consumer("prod-id1", Consumer(3)) + self.manager.reset_consumer("prod-id2", Consumer(4)) + + # Clear test consumers + self.manager.clear_consumers("test-.*") + + # Verify test consumers are deleted + self.assertIsNone(self.manager.consumer("test-id1")) + self.assertIsNone(self.manager.consumer("test-id2")) + + # Verify prod consumers remain + self.assertIsNotNone(self.manager.consumer("prod-id1")) + self.assertIsNotNone(self.manager.consumer("prod-id2")) + + def test_clear_consumers_with_exclusion(self): + """Test clearing consumers with inclusion and exclusion patterns.""" + # Add multiple consumers + self.manager.reset_consumer("test-id1", Consumer(1)) + self.manager.reset_consumer("test-id2", Consumer(2)) + self.manager.reset_consumer("test-backup", Consumer(3)) + + # Clear test consumers but exclude backup + self.manager.clear_consumers("test-.*", "test-backup") + + # Verify test-id1 and test-id2 are deleted + self.assertIsNone(self.manager.consumer("test-id1")) + self.assertIsNone(self.manager.consumer("test-id2")) + + # Verify test-backup remains + self.assertIsNotNone(self.manager.consumer("test-backup")) + + def test_with_branch(self): + """Test with_branch method.""" + # Create consumer on main branch + self.manager.reset_consumer("main_consumer", Consumer(10)) + main_consumer = self.manager.consumer("main_consumer") + self.assertIsNotNone(main_consumer) + self.assertEqual(main_consumer.next_snapshot, 10) + + # Create consumer manager for a different branch using with_branch + branch_manager = self.manager.with_branch("feature_branch") + self.assertEqual(branch_manager._branch, "feature_branch") + + # Verify main branch consumer doesn't exist on new branch + branch_consumer = branch_manager.consumer("main_consumer") + self.assertIsNone(branch_consumer) + + # Create consumer on new branch + branch_manager.reset_consumer("branch_consumer", Consumer(20)) + branch_consumer = branch_manager.consumer("branch_consumer") + self.assertIsNotNone(branch_consumer) + self.assertEqual(branch_consumer.next_snapshot, 20) + + # Verify original manager still operates on main branch + self.assertEqual(self.manager._branch, "main") + main_consumer = self.manager.consumer("main_consumer") + self.assertIsNotNone(main_consumer) + self.assertEqual(main_consumer.next_snapshot, 10) + + # Verify branch consumer doesn't exist on main branch + main_branch_consumer = self.manager.consumer("branch_consumer") + self.assertIsNone(main_branch_consumer) + + def test_with_branch_main(self): + """Test with_branch with None returns main branch.""" + branch_manager = self.manager.with_branch(None) + self.assertEqual(branch_manager._branch, "main") + + def test_with_branch_empty(self): + """Test with_branch with empty string returns main branch.""" + branch_manager = self.manager.with_branch("") + self.assertEqual(branch_manager._branch, "main") + + def test_with_branch_explicit_main(self): + """Test with_branch with explicit 'main' branch.""" + branch_manager = self.manager.with_branch("main") + self.assertEqual(branch_manager._branch, "main") + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/tests/table/file_store_table_test.py b/paimon-python/pypaimon/tests/table/file_store_table_test.py index 2a6413f207e1..04edf1e19051 100644 --- a/paimon-python/pypaimon/tests/table/file_store_table_test.py +++ b/paimon-python/pypaimon/tests/table/file_store_table_test.py @@ -82,3 +82,53 @@ def test_copy_raises_error_when_changing_bucket(self): self.table.copy(new_options) self.assertIn("Cannot change bucket number", str(context.exception)) + + def test_consumer_manager(self): + """Test that FileStoreTable has consumer_manager method.""" + # Get consumer_manager + consumer_manager = self.table.consumer_manager() + + # Verify consumer_manager type + from pypaimon.consumer.consumer_manager import ConsumerManager + self.assertIsInstance(consumer_manager, ConsumerManager) + + # Verify consumer_manager has correct branch + from pypaimon.consumer.consumer_manager import DEFAULT_MAIN_BRANCH + self.assertEqual(self.table.current_branch(), DEFAULT_MAIN_BRANCH) + + # Test basic consumer operations through table.consumer_manager() + from pypaimon.consumer.consumer import Consumer + consumer_manager.reset_consumer("test_consumer", Consumer(next_snapshot=5)) + consumer = consumer_manager.consumer("test_consumer") + self.assertIsNotNone(consumer) + self.assertEqual(consumer.next_snapshot, 5) + + # Test min_next_snapshot + min_snapshot = consumer_manager.min_next_snapshot() + self.assertEqual(min_snapshot, 5) + + def test_consumer_manager_with_branch(self): + """Test consumer_manager with branch option.""" + # Create table with branch option + branch_name = "feature_branch" + schema = Schema.from_pyarrow_schema( + self.pa_schema, + partition_keys=['dt'], + options={ + CoreOptions.BUCKET.key(): "2", + "branch": branch_name + } + ) + self.catalog.create_table('default.test_branch_table', schema, False) + branch_table = self.catalog.get_table('default.test_branch_table') + + # Get consumer_manager and verify it has correct branch + branch_consumer_manager = branch_table.consumer_manager() + self.assertEqual(branch_table.current_branch(), branch_name) + + # Test consumer operations on branch + from pypaimon.consumer.consumer import Consumer + branch_consumer_manager.reset_consumer("branch_consumer", Consumer(next_snapshot=10)) + consumer = branch_consumer_manager.consumer("branch_consumer") + self.assertIsNotNone(consumer) + self.assertEqual(consumer.next_snapshot, 10)