################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
StreamReadBuilder for building streaming table scans and reads.

This module provides a builder for configuring streaming reads from Paimon
tables, similar to ReadBuilder but for continuous streaming use cases.
"""

from typing import Callable, List, Optional, Set

from pypaimon.common.predicate import Predicate
from pypaimon.common.predicate_builder import PredicateBuilder
from pypaimon.read.streaming_table_scan import AsyncStreamingTableScan
from pypaimon.read.table_read import TableRead
from pypaimon.schema.data_types import DataField
from pypaimon.table.special_fields import SpecialFields


class StreamReadBuilder:
    """
    Builder for streaming reads from Paimon tables.

    All ``with_*`` methods return ``self`` so calls can be chained.

    Usage:
        stream_builder = table.new_stream_read_builder()
        stream_builder.with_poll_interval_ms(500)

        scan = stream_builder.new_streaming_scan()
        table_read = stream_builder.new_read()

        async for plan in scan.stream():
            arrow_table = table_read.to_arrow(plan.splits())
            process(arrow_table)
    """

    def __init__(self, table):
        """Initialize the StreamReadBuilder.

        Args:
            table: The FileStoreTable to read from.
        """
        # Imported lazily to avoid a circular import with file_store_table.
        from pypaimon.table.file_store_table import FileStoreTable

        self.table: FileStoreTable = table
        self._predicate: Optional[Predicate] = None
        self._projection: Optional[List[str]] = None
        self._poll_interval_ms: int = 1000
        self._include_row_kind: bool = False
        self._bucket_filter: Optional[Callable[[int], bool]] = None

    def with_filter(self, predicate: Predicate) -> 'StreamReadBuilder':
        """Set a filter predicate for the streaming read."""
        self._predicate = predicate
        return self

    def with_projection(self, projection: List[str]) -> 'StreamReadBuilder':
        """Set column projection (list of column names) for the streaming read."""
        self._projection = projection
        return self

    def with_poll_interval_ms(self, poll_interval_ms: int) -> 'StreamReadBuilder':
        """Set the poll interval in ms for checking new snapshots (default: 1000)."""
        self._poll_interval_ms = poll_interval_ms
        return self

    def with_include_row_kind(self, include: bool = True) -> 'StreamReadBuilder':
        """Include row kind column (_row_kind) in the output.

        When enabled, the output will include a _row_kind column as the first
        column with values: +I (insert), -U (update before), +U (update after),
        -D (delete).
        """
        self._include_row_kind = include
        return self

    def with_bucket_filter(
        self,
        bucket_filter: Callable[[int], bool]
    ) -> 'StreamReadBuilder':
        """Push bucket filter for parallel consumption.

        Args:
            bucket_filter: Callable receiving a bucket id; return True to keep it.

        Example:
            builder.with_bucket_filter(lambda b: b % 2 == 0)
            builder.with_bucket_filter(lambda b: b < 4)
        """
        self._bucket_filter = bucket_filter
        return self

    def with_buckets(self, bucket_ids: List[int]) -> 'StreamReadBuilder':
        """Convenience method to read only specific buckets.

        Note: an empty list yields a filter that rejects every bucket.

        Example:
            builder.with_buckets([0, 1, 2])
            builder.with_buckets([3, 4, 5])
        """
        bucket_set: Set[int] = set(bucket_ids)
        return self.with_bucket_filter(lambda bucket: bucket in bucket_set)

    def new_streaming_scan(self) -> AsyncStreamingTableScan:
        """Create a new AsyncStreamingTableScan with this builder's settings."""
        return AsyncStreamingTableScan(
            table=self.table,
            predicate=self._predicate,
            poll_interval_ms=self._poll_interval_ms,
            bucket_filter=self._bucket_filter,
        )

    def new_read(self) -> TableRead:
        """Create a new TableRead with this builder's settings."""
        return TableRead(
            table=self.table,
            predicate=self._predicate,
            read_type=self.read_type(),
            include_row_kind=self._include_row_kind
        )

    def new_predicate_builder(self) -> PredicateBuilder:
        """Create a PredicateBuilder for building filter predicates."""
        return PredicateBuilder(self.read_type())

    def read_type(self) -> List[DataField]:
        """Get the read schema fields, applying projection if set.

        Returns:
            The full table fields when no projection is configured; otherwise
            the projected fields in projection order.

        Raises:
            ValueError: If the projection references a column that does not
                exist in the table schema. (Previously unknown names were
                silently dropped, which hid typos in projections.)
        """
        table_fields = self.table.fields

        if not self._projection:
            return table_fields

        # Row-tracking tables expose extra system fields that may be projected.
        if self.table.options.row_tracking_enabled():
            table_fields = SpecialFields.row_type_with_row_tracking(table_fields)
        field_map = {field.name: field for field in table_fields}
        unknown = [name for name in self._projection if name not in field_map]
        if unknown:
            raise ValueError(
                f"Projection references unknown column(s): {unknown}"
            )
        return [field_map[name] for name in self._projection]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
AsyncStreamingTableScan for continuous streaming reads from Paimon tables.

This module provides async-based streaming reads that continuously poll for
new snapshots and yield Plans as new data arrives. It is the Python equivalent
of Java's DataTableStreamScan.
"""

import asyncio
import logging
import os
from concurrent.futures import Future, ThreadPoolExecutor
from typing import AsyncGenerator, Callable, Iterator, List, Optional

from pypaimon.common.options.core_options import ChangelogProducer
from pypaimon.common.predicate import Predicate
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.read.plan import Plan
from pypaimon.read.scanner.append_table_split_generator import \
    AppendTableSplitGenerator
from pypaimon.read.scanner.changelog_follow_up_scanner import \
    ChangelogFollowUpScanner
from pypaimon.read.scanner.delta_follow_up_scanner import DeltaFollowUpScanner
from pypaimon.read.scanner.file_scanner import FileScanner
from pypaimon.read.scanner.follow_up_scanner import FollowUpScanner
from pypaimon.read.scanner.incremental_diff_scanner import \
    IncrementalDiffScanner
from pypaimon.read.scanner.primary_key_table_split_generator import \
    PrimaryKeyTableSplitGenerator
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_manager import SnapshotManager

# Module-level logger instead of the root logger so log output is
# attributable to this module and configurable per-package.
logger = logging.getLogger(__name__)


class AsyncStreamingTableScan:
    """
    Async streaming table scan for continuous reads from Paimon tables.

    This class provides an async iterator that continuously polls for new
    snapshots and yields Plans containing splits for new data.

    Usage:
        scan = AsyncStreamingTableScan(table)

        async for plan in scan.stream():
            for split in plan.splits():
                # Process the data
                pass

    For synchronous usage:
        for plan in scan.stream_sync():
            process(plan)

    Call close() when done to release the background prefetch executor.
    """

    def __init__(
        self,
        table,
        predicate: Optional[Predicate] = None,
        poll_interval_ms: int = 1000,
        follow_up_scanner: Optional[FollowUpScanner] = None,
        bucket_filter: Optional[Callable[[int], bool]] = None,
        prefetch_enabled: bool = True,
        diff_threshold: int = 10,
    ):
        """Initialize the streaming table scan.

        Args:
            table: The FileStoreTable to scan.
            predicate: Optional filter predicate pushed to the scan.
            poll_interval_ms: How long to wait between polls when no new
                snapshot is available.
            follow_up_scanner: Optional explicit scanner; when None one is
                auto-selected from the table's changelog-producer option.
            bucket_filter: Optional bucket-level filter for parallel consumers.
            prefetch_enabled: Whether to prefetch the next plan in a
                background thread while the caller processes the current one.
            diff_threshold: Snapshot-gap size above which a single diff-based
                catch-up plan is used instead of per-snapshot follow-up.
        """
        self.table = table
        self.predicate = predicate
        self.poll_interval = poll_interval_ms / 1000.0

        # Bucket filter for parallel consumption
        self._bucket_filter = bucket_filter

        # Diff-based catch-up configuration
        self._diff_threshold = diff_threshold
        self._catch_up_in_progress = False

        # Prefetching configuration
        self._prefetch_enabled = prefetch_enabled
        self._prefetch_future: Optional[Future] = None
        self._prefetch_snapshot_id: Optional[int] = None
        self._lookahead_skips = 0
        self._prefetch_executor = ThreadPoolExecutor(max_workers=1) if prefetch_enabled else None
        self._lookahead_size = 10  # How many snapshots to look ahead

        # Initialize managers
        self._snapshot_manager = SnapshotManager(table)
        self._manifest_list_manager = ManifestListManager(table)
        self._manifest_file_manager = ManifestFileManager(table)

        # Scanner for determining which snapshots to read.
        # Auto-select based on changelog-producer if not explicitly provided.
        self.follow_up_scanner = follow_up_scanner or self._create_follow_up_scanner()

        # State tracking: id of the next snapshot to consume (None = no
        # initial scan performed yet).
        self.next_snapshot_id: Optional[int] = None

    def close(self) -> None:
        """Release background resources held by this scan.

        Shuts down the prefetch executor (previously it was never shut down,
        leaking a worker thread per scan). Safe to call more than once.
        """
        if self._prefetch_future is not None:
            self._prefetch_future.cancel()
            self._prefetch_future = None
        if self._prefetch_executor is not None:
            self._prefetch_executor.shutdown(wait=False)
            self._prefetch_executor = None

    async def stream(self) -> AsyncGenerator[Plan, None]:
        """Yield Plans as new snapshots appear.

        On first call, performs an initial full scan of the latest snapshot.
        Subsequent iterations poll for new snapshots and yield delta Plans.

        Yields:
            Plan objects containing splits for reading
        """
        # Initial scan
        if self.next_snapshot_id is None:
            latest_snapshot = self._snapshot_manager.get_latest_snapshot()
            if latest_snapshot:
                self.next_snapshot_id = latest_snapshot.id + 1
                yield self._create_initial_plan(latest_snapshot)

        # Check for catch-up scenario: starting from earlier snapshot with large gap.
        # This block only executes once per stream() call (before the while True loop).
        # Handles --from snapshot:X with many snapshots to process.
        if self._should_use_diff_catch_up():
            self._catch_up_in_progress = True
            try:
                latest_snapshot = self._snapshot_manager.get_latest_snapshot()
                if latest_snapshot and self.next_snapshot_id:
                    catch_up_plan = self._create_catch_up_plan(
                        self.next_snapshot_id,
                        latest_snapshot
                    )
                    self.next_snapshot_id = latest_snapshot.id + 1
                    yield catch_up_plan
            finally:
                self._catch_up_in_progress = False

        # Follow-up polling loop with lookahead and optional prefetching
        while True:
            plan = None
            snapshot_processed = False  # Track if we processed (or skipped) a snapshot

            # Check if we have a prefetched result ready
            prefetch_used = False
            if self._prefetch_future is not None:
                try:
                    # Wait for the prefetch thread to complete.
                    # Returns (plan, next_id, skipped_count) tuple.
                    prefetch_result = self._prefetch_future.result(timeout=30)
                    prefetch_used = True

                    if prefetch_result is not None:
                        prefetch_plan, next_id, skipped_count = prefetch_result
                        self._lookahead_skips += skipped_count
                        self.next_snapshot_id = next_id
                        snapshot_processed = skipped_count > 0 or prefetch_plan is not None

                        if prefetch_plan is not None:
                            plan = prefetch_plan
                except Exception:
                    # Prefetch failed, fall back to synchronous; log instead of
                    # swallowing the error silently so failures are diagnosable.
                    logger.exception(
                        "Prefetch result retrieval failed; falling back to "
                        "synchronous scan"
                    )
                    prefetch_used = False
                finally:
                    self._prefetch_future = None
                    self._prefetch_snapshot_id = None

            # If prefetch wasn't available or failed, use lookahead to find next scannable
            if not prefetch_used:
                # Use batch lookahead to find the next scannable snapshot
                snapshot, next_id, skipped_count = self._snapshot_manager.find_next_scannable(
                    self.next_snapshot_id,
                    self.follow_up_scanner.should_scan,
                    lookahead_size=self._lookahead_size
                )
                self._lookahead_skips += skipped_count
                self.next_snapshot_id = next_id

                # Check if we found a scannable snapshot or skipped some
                snapshot_processed = skipped_count > 0 or snapshot is not None

                if snapshot is not None:
                    plan = self._create_follow_up_plan(snapshot)

            if plan is not None:
                # Start prefetching next scannable snapshot before yielding
                if self._prefetch_enabled:
                    self._start_prefetch(self.next_snapshot_id)
                yield plan
            elif not snapshot_processed:
                # No snapshot available yet, wait and poll again
                await asyncio.sleep(self.poll_interval)
            # If snapshots were processed but plan is None (all skipped), continue loop immediately

    def stream_sync(self) -> Iterator[Plan]:
        """
        Synchronous wrapper for stream().

        Provides a blocking iterator for use in non-async code.

        Yields:
            Plan objects containing splits for reading
        """
        loop = asyncio.new_event_loop()
        async_gen = self.stream()
        try:
            while True:
                try:
                    plan = loop.run_until_complete(async_gen.__anext__())
                    yield plan
                except StopAsyncIteration:
                    break
        finally:
            # Close the async generator before closing the loop so that any
            # cleanup inside stream() runs (previously the generator was
            # abandoned, leaving it suspended and never finalized).
            try:
                loop.run_until_complete(async_gen.aclose())
            finally:
                loop.close()

    def _start_prefetch(self, snapshot_id: int) -> None:
        """Start prefetching the next scannable snapshot in a background thread."""
        if self._prefetch_future is not None or self._prefetch_executor is None:
            return  # Already prefetching or executor not available

        self._prefetch_snapshot_id = snapshot_id
        # Submit to thread pool - this starts immediately, not when event loop runs
        self._prefetch_future = self._prefetch_executor.submit(
            self._fetch_plan_with_lookahead,
            snapshot_id
        )

    def _fetch_plan_with_lookahead(self, start_id: int) -> Optional[tuple]:
        """Find next scannable snapshot via lookahead and create a plan.

        Runs in the prefetch thread pool. Returns a
        (plan_or_None, next_id, skipped_count) tuple, or None on failure.
        """
        try:
            snapshot, next_id, skipped_count = self._snapshot_manager.find_next_scannable(
                start_id,
                self.follow_up_scanner.should_scan,
                lookahead_size=self._lookahead_size
            )

            if snapshot is None:
                return (None, next_id, skipped_count)

            plan = self._create_follow_up_plan(snapshot)
            return (plan, next_id, skipped_count)
        except Exception:
            logger.exception(
                "Prefetch failed for snapshot_id=%d; falling back to synchronous",
                start_id
            )
            return None

    def _create_follow_up_plan(self, snapshot: Snapshot) -> Plan:
        """Route to changelog or delta plan based on scanner type."""
        if isinstance(self.follow_up_scanner, ChangelogFollowUpScanner):
            return self._create_changelog_plan(snapshot)
        else:
            return self._create_delta_plan(snapshot)

    def _create_follow_up_scanner(self) -> FollowUpScanner:
        """Create the appropriate follow-up scanner based on changelog-producer option."""
        changelog_producer = self.table.options.changelog_producer()
        if changelog_producer == ChangelogProducer.NONE:
            return DeltaFollowUpScanner()
        else:
            # INPUT, FULL_COMPACTION, LOOKUP all use changelog scanner
            return ChangelogFollowUpScanner()

    def _filter_entries_for_shard(self, entries: List) -> List:
        """Filter manifest entries by bucket filter, if set."""
        if self._bucket_filter is not None:
            return [e for e in entries if self._bucket_filter(e.bucket)]
        return entries

    def _create_initial_plan(self, snapshot: Snapshot) -> Plan:
        """Create a Plan for the initial full scan of the latest snapshot."""
        def all_manifests():
            return self._manifest_list_manager.read_all(snapshot)

        starting_scanner = FileScanner(
            self.table,
            all_manifests,
            predicate=self.predicate,
            limit=None
        )
        return starting_scanner.scan()

    def _create_delta_plan(self, snapshot: Snapshot) -> Plan:
        """Read new files from delta_manifest_list (changelog-producer=none)."""
        manifest_files = self._manifest_list_manager.read_delta(snapshot)
        return self._create_plan_from_manifests(manifest_files)

    def _create_changelog_plan(self, snapshot: Snapshot) -> Plan:
        """Read from changelog_manifest_list (changelog-producer=input/full-compaction/lookup)."""
        manifest_files = self._manifest_list_manager.read_changelog(snapshot)
        return self._create_plan_from_manifests(manifest_files)

    def _create_plan_from_manifests(self, manifest_files: List) -> Plan:
        """Create splits from manifest files, applying shard filtering."""
        if not manifest_files:
            return Plan([])

        # Use configurable parallelism from table options.
        # NOTE(review): max() enforces a floor of 8 workers even when the table
        # configures fewer — confirm the floor is intended (min() would cap instead).
        max_workers = max(8, self.table.options.scan_manifest_parallelism(os.cpu_count() or 8))

        # Read manifest entries from manifest files
        entries = self._manifest_file_manager.read_entries_parallel(
            manifest_files,
            manifest_entry_filter=None,
            max_workers=max_workers
        )

        # Apply shard/bucket filtering for parallel consumption
        entries = self._filter_entries_for_shard(entries) if entries else []
        if not entries:
            return Plan([])

        # Get split options from table
        options = self.table.options
        target_split_size = options.source_split_target_size()
        open_file_cost = options.source_split_open_file_cost()

        # Create appropriate split generator based on table type
        if self.table.is_primary_key_table:
            split_generator = PrimaryKeyTableSplitGenerator(
                self.table,
                target_split_size,
                open_file_cost,
                deletion_files_map={}
            )
        else:
            split_generator = AppendTableSplitGenerator(
                self.table,
                target_split_size,
                open_file_cost,
                deletion_files_map={}
            )

        splits = split_generator.create_splits(entries)
        return Plan(splits)

    def _should_use_diff_catch_up(self) -> bool:
        """Check if diff-based catch-up should be used (large gap to latest)."""
        if self._catch_up_in_progress:
            return False

        if self.next_snapshot_id is None:
            return False

        latest = self._snapshot_manager.get_latest_snapshot()
        if latest is None:
            return False

        gap = latest.id - self.next_snapshot_id
        return gap > self._diff_threshold

    def _create_catch_up_plan(self, start_id: int, end_snapshot: Snapshot) -> Plan:
        """Create a catch-up plan using diff-based scanning between start and end snapshots."""
        # Get start snapshot (one before where we want to start reading).
        # If start_id is 0 or 1, fall back to a full scan of end_snapshot.
        start_snapshot = None
        if start_id > 1:
            start_snapshot = self._snapshot_manager.get_snapshot_by_id(start_id - 1)

        if start_snapshot is None:
            return self._create_initial_plan(end_snapshot)

        return IncrementalDiffScanner(self.table).scan(start_snapshot, end_snapshot)
- + Args: tag_name: Name of the tag to delete - + Returns: True if tag was deleted, False if tag didn't exist """ @@ -330,6 +331,9 @@ def new_global_index_scan_builder(self) -> Optional['GlobalIndexScanBuilder']: index_file_handler=IndexFileHandler(table=self) ) + def new_stream_read_builder(self) -> 'StreamReadBuilder': + return StreamReadBuilder(self) + def new_batch_write_builder(self) -> BatchWriteBuilder: return BatchWriteBuilder(self) @@ -371,10 +375,10 @@ def copy(self, options: dict) -> 'FileStoreTable': def _try_time_travel(self, options: Options) -> Optional[TableSchema]: """ Try to resolve time travel options and return the corresponding schema. - + Supports the following time travel options: - scan.tag-name: Travel to a specific tag - + Returns: The TableSchema at the time travel point, or None if no time travel option is set. """ diff --git a/paimon-python/pypaimon/table/format/format_table.py b/paimon-python/pypaimon/table/format/format_table.py index 564bd086255d..8c7e82415ec9 100644 --- a/paimon-python/pypaimon/table/format/format_table.py +++ b/paimon-python/pypaimon/table/format/format_table.py @@ -93,8 +93,12 @@ def new_read_builder(self): return FormatReadBuilder(self) def new_batch_write_builder(self): - from pypaimon.table.format.format_batch_write_builder import FormatBatchWriteBuilder + from pypaimon.table.format.format_batch_write_builder import \ + FormatBatchWriteBuilder return FormatBatchWriteBuilder(self) + def new_stream_read_builder(self): + raise NotImplementedError("Format table does not support stream read.") + def new_stream_write_builder(self): raise NotImplementedError("Format table does not support stream write.") diff --git a/paimon-python/pypaimon/table/iceberg/iceberg_table.py b/paimon-python/pypaimon/table/iceberg/iceberg_table.py index ab2b8c9e03e4..137b7c862282 100644 --- a/paimon-python/pypaimon/table/iceberg/iceberg_table.py +++ b/paimon-python/pypaimon/table/iceberg/iceberg_table.py @@ -98,6 +98,11 @@ def 
new_batch_write_builder(self): "IcebergTable does not support batch write operation in paimon-python yet." ) + def new_stream_read_builder(self): + raise NotImplementedError( + "IcebergTable does not support stream read operation in paimon-python yet." + ) + def new_stream_write_builder(self): raise NotImplementedError( "IcebergTable does not support stream write operation in paimon-python yet." diff --git a/paimon-python/pypaimon/table/object/object_table.py b/paimon-python/pypaimon/table/object/object_table.py index 592835da4f09..f3dc44b7613d 100644 --- a/paimon-python/pypaimon/table/object/object_table.py +++ b/paimon-python/pypaimon/table/object/object_table.py @@ -94,6 +94,9 @@ def new_batch_write_builder(self): "ObjectTable is read-only and does not support batch write." ) + def new_stream_read_builder(self): + raise NotImplementedError("ObjectTable does not support stream read.") + def new_stream_write_builder(self): raise NotImplementedError( "ObjectTable is read-only and does not support stream write." 
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Tests for StreamReadBuilder."""

from unittest.mock import MagicMock

import pytest

from pypaimon.read.stream_read_builder import StreamReadBuilder
from pypaimon.read.streaming_table_scan import AsyncStreamingTableScan


class MockEntry:
    """Minimal manifest-entry stand-in; only the bucket attribute is used."""

    def __init__(self, bucket):
        self.bucket = bucket


@pytest.fixture
def mock_table():
    """Build a mock table suitable for StreamReadBuilder unit tests."""
    fake_table = MagicMock()
    fake_table.fields = []
    fake_table.options.row_tracking_enabled.return_value = False
    return fake_table


@pytest.fixture
def builder(mock_table):
    """Build a StreamReadBuilder wired to the mock table."""
    return StreamReadBuilder(mock_table)


@pytest.fixture
def mock_scan_table():
    """Build a mock table sufficient to construct an AsyncStreamingTableScan."""
    fake_table = MagicMock()
    fake_table.options.changelog_producer.return_value = MagicMock()
    fake_table.file_io = MagicMock()
    fake_table.table_path = "/tmp/test"
    fake_table.fields = []
    fake_table.options.row_tracking_enabled.return_value = False
    return fake_table


class TestStreamReadBuilderValidation:
    """Unit tests for StreamReadBuilder method validation."""

    def test_with_bucket_filter_valid(self, builder):
        """with_bucket_filter() stores the callable and returns the builder."""
        def even_buckets(b):
            return b % 2 == 0

        returned = builder.with_bucket_filter(even_buckets)
        assert returned is builder
        assert builder._bucket_filter is even_buckets

    @pytest.mark.parametrize("bucket_ids,expected_true,expected_false", [
        ([0, 2, 4], [0, 2, 4], [1, 3, 5]),
        ([], [], [0, 1, 2]),
        ([5], [5], [0, 1, 4, 6]),
    ])
    def test_with_buckets(self, builder, bucket_ids, expected_true, expected_false):
        """with_buckets() produces a membership filter over the given ids."""
        builder.with_buckets(bucket_ids)
        bucket_filter = builder._bucket_filter
        for included in expected_true:
            assert bucket_filter(included), f"Bucket {included} should be included"
        for excluded in expected_false:
            assert not bucket_filter(excluded), f"Bucket {excluded} should be excluded"

    def test_method_chaining(self, builder):
        """Chained with_* calls all return the same builder and apply settings."""
        chained = (builder
                   .with_poll_interval_ms(500)
                   .with_bucket_filter(lambda b: b % 2 == 0)
                   .with_include_row_kind(True))
        assert chained is builder
        assert builder._poll_interval_ms == 500
        assert builder._bucket_filter is not None
        assert builder._include_row_kind is True


class TestAsyncStreamingTableScanFiltering:
    """Test AsyncStreamingTableScan._filter_entries_for_shard()."""

    def test_filter_with_bucket_filter(self, mock_scan_table):
        """Entries whose bucket fails the filter are dropped, order preserved."""
        scan = AsyncStreamingTableScan(
            table=mock_scan_table,
            bucket_filter=lambda b: b % 2 == 0
        )
        all_entries = [MockEntry(b) for b in range(8)]
        kept = scan._filter_entries_for_shard(all_entries)
        assert [entry.bucket for entry in kept] == [0, 2, 4, 6]

    def test_filter_no_filter_returns_all(self, mock_scan_table):
        """With no bucket filter configured, every entry passes through."""
        scan = AsyncStreamingTableScan(table=mock_scan_table)
        all_entries = [MockEntry(b) for b in range(8)]
        kept = scan._filter_entries_for_shard(all_entries)
        assert [entry.bucket for entry in kept] == list(range(8))


if __name__ == '__main__':
    pytest.main([__file__, '-v'])
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +"""Tests for AsyncStreamingTableScan.""" + +import asyncio +import unittest +from unittest.mock import Mock, patch + +from pypaimon.common.options.core_options import ChangelogProducer +from pypaimon.read.plan import Plan +from pypaimon.read.streaming_table_scan import AsyncStreamingTableScan +from pypaimon.snapshot.snapshot import Snapshot + + +def _create_mock_snapshot(snapshot_id: int, commit_kind: str = "APPEND"): + """Helper to create a mock snapshot.""" + snapshot = Mock(spec=Snapshot) + snapshot.id = snapshot_id + snapshot.commit_kind = commit_kind + snapshot.time_millis = 1000000 + snapshot_id + snapshot.base_manifest_list = f"manifest-list-{snapshot_id}" + snapshot.delta_manifest_list = f"delta-manifest-list-{snapshot_id}" + return snapshot + + +def _create_mock_table(latest_snapshot_id: int = 5): + """Helper to create a mock table.""" + table = Mock() + table.table_path = "/tmp/test_table" + table.is_primary_key_table = False + table.options = Mock() + table.options.source_split_target_size.return_value = 128 * 1024 * 1024 + table.options.source_split_open_file_cost.return_value = 4 * 1024 * 1024 + table.options.scan_manifest_parallelism.return_value = 8 + table.options.bucket.return_value = 1 + 
table.options.data_evolution_enabled.return_value = False + table.options.deletion_vectors_enabled.return_value = False + table.options.changelog_producer.return_value = ChangelogProducer.NONE + table.field_names = ['col1', 'col2'] + table.trimmed_primary_keys = [] + table.partition_keys = [] + table.file_io = Mock() + table.table_schema = Mock() + table.table_schema.id = 0 + table.table_schema.fields = [] + table.schema_manager = Mock() + table.schema_manager.get_schema.return_value = table.table_schema + + return table, latest_snapshot_id + + +class AsyncStreamingTableScanTest(unittest.TestCase): + """Tests for AsyncStreamingTableScan async streaming functionality.""" + + @patch('pypaimon.read.streaming_table_scan.SnapshotManager') + @patch('pypaimon.read.streaming_table_scan.ManifestListManager') + @patch('pypaimon.read.streaming_table_scan.FileScanner') + def test_initial_scan(self, MockStartingScanner, MockManifestListManager, MockSnapshotManager): + """Initial scan should yield a Plan and set next_snapshot_id to latest + 1.""" + table, _ = _create_mock_table(latest_snapshot_id=5) + + mock_snapshot_manager = MockSnapshotManager.return_value + mock_snapshot_manager.get_latest_snapshot.return_value = _create_mock_snapshot(5) + mock_snapshot_manager.get_snapshot_by_id.return_value = None + + MockStartingScanner.return_value.scan.return_value = Plan([]) + + scan = AsyncStreamingTableScan(table) + + async def get_first_plan(): + async for plan in scan.stream(): + return plan + + plan = asyncio.run(get_first_plan()) + + self.assertIsInstance(plan, Plan) + self.assertEqual(scan.next_snapshot_id, 6) + + @patch('pypaimon.read.streaming_table_scan.SnapshotManager') + @patch('pypaimon.read.streaming_table_scan.ManifestListManager') + @patch('pypaimon.read.streaming_table_scan.FileScanner') + def test_stream_skips_non_append_commits(self, MockStartingScanner, MockManifestListManager, MockSnapshotManager): + """Stream should skip COMPACT/OVERWRITE commits.""" + table, _ = 
_create_mock_table(latest_snapshot_id=7) + + # Setup mocks + mock_snapshot_manager = MockSnapshotManager.return_value + + # Snapshots: 6 (COMPACT - skip), 7 (APPEND - scan) + snapshot_7 = _create_mock_snapshot(7, "APPEND") + + # find_next_scannable returns (snapshot, next_id, skipped_count) + # Start at 6, skip 1 (COMPACT), return snapshot 7, next_id=8 + mock_snapshot_manager.find_next_scannable.return_value = (snapshot_7, 8, 1) + mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0, "cache_misses": 0, "cache_size": 0} + # Mock get_latest_snapshot for diff catch-up check (gap=1, below threshold) + mock_snapshot_manager.get_latest_snapshot.return_value = snapshot_7 + + mock_manifest_list_manager = MockManifestListManager.return_value + mock_manifest_list_manager.read_delta.return_value = [] + + mock_starting_scanner = MockStartingScanner.return_value + mock_starting_scanner.read_manifest_entries.return_value = [] + + scan = AsyncStreamingTableScan(table) + scan.next_snapshot_id = 6 # Start from snapshot 6 + + async def get_plans(): + plans = [] + count = 0 + async for plan in scan.stream(): + plans.append(plan) + count += 1 + if count >= 1: # Get one plan (snapshot 7) + break + return plans + + asyncio.run(get_plans()) + + # Should have skipped snapshot 6 (COMPACT) and scanned 7 (APPEND) + self.assertEqual(scan.next_snapshot_id, 8) + # Verify lookahead skipped 1 snapshot + self.assertEqual(scan._lookahead_skips, 1) + + @patch('pypaimon.read.streaming_table_scan.SnapshotManager') + @patch('pypaimon.read.streaming_table_scan.ManifestListManager') + @patch('pypaimon.read.streaming_table_scan.FileScanner') + def test_stream_sync_yields_plans(self, MockStartingScanner, MockManifestListManager, MockSnapshotManager): + """stream_sync() should provide a synchronous iterator.""" + table, _ = _create_mock_table(latest_snapshot_id=5) + + # Setup mocks + mock_snapshot_manager = MockSnapshotManager.return_value + 
mock_snapshot_manager.get_latest_snapshot.return_value = _create_mock_snapshot(5) + mock_snapshot_manager.get_snapshot_by_id.return_value = None + + mock_starting_scanner = MockStartingScanner.return_value + mock_starting_scanner.scan.return_value = Plan([]) + + scan = AsyncStreamingTableScan(table) + + # Get first plan synchronously + for plan in scan.stream_sync(): + self.assertIsInstance(plan, Plan) + break # Just get one + + @patch('pypaimon.read.streaming_table_scan.SnapshotManager') + @patch('pypaimon.read.streaming_table_scan.ManifestListManager') + def test_poll_interval_configurable(self, MockManifestListManager, MockSnapshotManager): + """Poll interval should be configurable.""" + table, _ = _create_mock_table() + + scan = AsyncStreamingTableScan(table, poll_interval_ms=500) + + self.assertEqual(scan.poll_interval, 0.5) + + @patch('pypaimon.read.streaming_table_scan.SnapshotManager') + @patch('pypaimon.read.streaming_table_scan.ManifestListManager') + @patch('pypaimon.read.streaming_table_scan.FileScanner') + def test_no_snapshot_waits_and_polls(self, MockStartingScanner, MockManifestListManager, MockSnapshotManager): + """When no new snapshot exists, should wait and poll again.""" + table, _ = _create_mock_table(latest_snapshot_id=5) + + mock_snapshot_manager = MockSnapshotManager.return_value + mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0, "cache_misses": 0, "cache_size": 0} + + # No snapshot 6 exists yet - find_next_scannable returns (None, 6, 0) first, + # then on subsequent calls returns a snapshot + call_count = [0] + snapshot_6 = _create_mock_snapshot(6, "APPEND") + + def find_next_scannable(start_id, should_scan, lookahead_size=10, max_workers=4): + call_count[0] += 1 + # After 3 calls, snapshot 6 appears + if call_count[0] > 3: + return (snapshot_6, 7, 0) + # No snapshot yet - return (None, start_id, 0) to indicate no snapshot exists + return (None, start_id, 0) + + mock_snapshot_manager.find_next_scannable.side_effect = 
find_next_scannable + mock_snapshot_manager.get_latest_snapshot.return_value = None + + mock_manifest_list_manager = MockManifestListManager.return_value + mock_manifest_list_manager.read_delta.return_value = [] + + mock_starting_scanner = MockStartingScanner.return_value + mock_starting_scanner.read_manifest_entries.return_value = [] + + scan = AsyncStreamingTableScan(table, poll_interval_ms=10) + scan.next_snapshot_id = 6 + + async def get_plan_with_timeout(): + async for plan in scan.stream(): + return plan + + # Should eventually get a plan after polling + plan = asyncio.run(asyncio.wait_for(get_plan_with_timeout(), timeout=1.0)) + self.assertIsInstance(plan, Plan) + + +class StreamingPrefetchTest(unittest.TestCase): + """Tests for prefetching functionality in AsyncStreamingTableScan.""" + + @patch('pypaimon.read.streaming_table_scan.SnapshotManager') + @patch('pypaimon.read.streaming_table_scan.ManifestListManager') + @patch('pypaimon.read.streaming_table_scan.ManifestFileManager') + def test_prefetch_enabled_by_default(self, MockManifestFileManager, MockManifestListManager, MockSnapshotManager): + """Prefetching should be enabled by default.""" + table, _ = _create_mock_table() + scan = AsyncStreamingTableScan(table) + self.assertTrue(scan._prefetch_enabled) + + @patch('pypaimon.read.streaming_table_scan.SnapshotManager') + @patch('pypaimon.read.streaming_table_scan.ManifestListManager') + @patch('pypaimon.read.streaming_table_scan.ManifestFileManager') + def test_prefetch_can_be_disabled(self, MockManifestFileManager, MockManifestListManager, MockSnapshotManager): + """Prefetching can be disabled via constructor parameter.""" + table, _ = _create_mock_table() + scan = AsyncStreamingTableScan(table, prefetch_enabled=False) + self.assertFalse(scan._prefetch_enabled) + + @patch('pypaimon.read.streaming_table_scan.SnapshotManager') + @patch('pypaimon.read.streaming_table_scan.ManifestListManager') + 
@patch('pypaimon.read.streaming_table_scan.ManifestFileManager') + def test_prefetch_starts_after_yielding_plan( + self, + MockManifestFileManager, + MockManifestListManager, + MockSnapshotManager): + """After yielding a plan, prefetch for next snapshot should start.""" + table, _ = _create_mock_table(latest_snapshot_id=5) + + mock_snapshot_manager = MockSnapshotManager.return_value + mock_manifest_list_manager = MockManifestListManager.return_value + mock_manifest_file_manager = MockManifestFileManager.return_value + mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0, "cache_misses": 0, "cache_size": 0} + + # Snapshots 5, 6, 7 exist - find_next_scannable returns each one + snapshot_5 = _create_mock_snapshot(5, "APPEND") + snapshot_6 = _create_mock_snapshot(6, "APPEND") + snapshot_7 = _create_mock_snapshot(7, "APPEND") + + call_count = [0] + + def find_next_scannable(start_id, should_scan, lookahead_size=10, max_workers=4): + call_count[0] += 1 + if start_id == 5: + return (snapshot_5, 6, 0) + elif start_id == 6: + return (snapshot_6, 7, 0) + elif start_id == 7: + return (snapshot_7, 8, 0) + return (None, start_id, 0) + + mock_snapshot_manager.find_next_scannable.side_effect = find_next_scannable + mock_snapshot_manager.get_latest_snapshot.return_value = None + + mock_manifest_list_manager.read_delta.return_value = [] + mock_manifest_file_manager.read_entries_parallel.return_value = [] + + scan = AsyncStreamingTableScan(table, poll_interval_ms=10) + scan.next_snapshot_id = 5 + + async def get_two_plans(): + plans = [] + async for plan in scan.stream(): + plans.append(plan) + # After first plan, prefetch task should exist + if len(plans) == 1: + # Give prefetch a moment to start + await asyncio.sleep(0.01) + self.assertIsNotNone(scan._prefetch_future) + if len(plans) >= 2: + break + return plans + + plans = asyncio.run(get_two_plans()) + self.assertEqual(len(plans), 2) + + @patch('pypaimon.read.streaming_table_scan.SnapshotManager') + 
@patch('pypaimon.read.streaming_table_scan.ManifestListManager') + @patch('pypaimon.read.streaming_table_scan.ManifestFileManager') + def test_prefetch_returns_same_data_as_sequential( + self, + MockManifestFileManager, + MockManifestListManager, + MockSnapshotManager): + """Prefetched plans should contain the same data as non-prefetched.""" + table, _ = _create_mock_table(latest_snapshot_id=5) + + mock_snapshot_manager = MockSnapshotManager.return_value + mock_manifest_list_manager = MockManifestListManager.return_value + mock_manifest_file_manager = MockManifestFileManager.return_value + mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0, "cache_misses": 0, "cache_size": 0} + + snapshot_5 = _create_mock_snapshot(5, "APPEND") + snapshot_6 = _create_mock_snapshot(6, "APPEND") + + def find_next_scannable(start_id, should_scan, lookahead_size=10, max_workers=4): + if start_id == 5: + return (snapshot_5, 6, 0) + elif start_id == 6: + return (snapshot_6, 7, 0) + return (None, start_id, 0) + + mock_snapshot_manager.find_next_scannable.side_effect = find_next_scannable + mock_snapshot_manager.get_latest_snapshot.return_value = None + + mock_manifest_list_manager.read_delta.return_value = [] + mock_manifest_file_manager.read_entries_parallel.return_value = [] + + # Test with prefetch enabled + scan_prefetch = AsyncStreamingTableScan(table, poll_interval_ms=10, prefetch_enabled=True) + scan_prefetch.next_snapshot_id = 5 + + # Test with prefetch disabled + scan_sequential = AsyncStreamingTableScan(table, poll_interval_ms=10, prefetch_enabled=False) + scan_sequential.next_snapshot_id = 5 + + async def get_plans(scan, count): + plans = [] + async for plan in scan.stream(): + plans.append(plan) + if len(plans) >= count: + break + return plans + + plans_prefetch = asyncio.run(get_plans(scan_prefetch, 2)) + plans_sequential = asyncio.run(get_plans(scan_sequential, 2)) + + # Both should get the same number of plans + self.assertEqual(len(plans_prefetch), 
len(plans_sequential)) + + @patch('pypaimon.read.streaming_table_scan.SnapshotManager') + @patch('pypaimon.read.streaming_table_scan.ManifestListManager') + @patch('pypaimon.read.streaming_table_scan.ManifestFileManager') + def test_prefetch_handles_no_next_snapshot( + self, + MockManifestFileManager, + MockManifestListManager, + MockSnapshotManager): + """When no next snapshot exists, prefetch should return None gracefully.""" + table, _ = _create_mock_table(latest_snapshot_id=5) + + mock_snapshot_manager = MockSnapshotManager.return_value + mock_manifest_list_manager = MockManifestListManager.return_value + mock_manifest_file_manager = MockManifestFileManager.return_value + mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0, "cache_misses": 0, "cache_size": 0} + + snapshot_5 = _create_mock_snapshot(5, "APPEND") + + # Only snapshot 5 exists + def find_next_scannable(start_id, should_scan, lookahead_size=10, max_workers=4): + if start_id == 5: + return (snapshot_5, 6, 0) + # No more snapshots after 5 + return (None, start_id, 0) + + mock_snapshot_manager.find_next_scannable.side_effect = find_next_scannable + mock_snapshot_manager.get_latest_snapshot.return_value = None + + mock_manifest_list_manager.read_delta.return_value = [] + mock_manifest_file_manager.read_entries_parallel.return_value = [] + + scan = AsyncStreamingTableScan(table, poll_interval_ms=10) + scan.next_snapshot_id = 5 + + async def get_one_plan(): + async for plan in scan.stream(): + # After getting plan for snapshot 5, prefetch for 6 should start + # but return None since snapshot 6 doesn't exist + await asyncio.sleep(0.05) # Let prefetch complete + # Prefetch task should have completed (or be None) + return plan + + plan = asyncio.run(get_one_plan()) + self.assertIsInstance(plan, Plan) + + @patch('pypaimon.read.streaming_table_scan.SnapshotManager') + @patch('pypaimon.read.streaming_table_scan.ManifestListManager') + 
@patch('pypaimon.read.streaming_table_scan.ManifestFileManager') + def test_prefetch_disabled_no_prefetch_future( + self, + MockManifestFileManager, + MockManifestListManager, + MockSnapshotManager): + """With prefetch disabled, no prefetch future should be created.""" + table, _ = _create_mock_table(latest_snapshot_id=5) + + mock_snapshot_manager = MockSnapshotManager.return_value + mock_manifest_list_manager = MockManifestListManager.return_value + mock_manifest_file_manager = MockManifestFileManager.return_value + mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0, "cache_misses": 0, "cache_size": 0} + + snapshot_5 = _create_mock_snapshot(5, "APPEND") + + def find_next_scannable(start_id, should_scan, lookahead_size=10, max_workers=4): + if start_id == 5: + return (snapshot_5, 6, 0) + return (None, start_id, 0) + + mock_snapshot_manager.find_next_scannable.side_effect = find_next_scannable + mock_snapshot_manager.get_latest_snapshot.return_value = None + + mock_manifest_list_manager.read_delta.return_value = [] + mock_manifest_file_manager.read_entries_parallel.return_value = [] + + scan = AsyncStreamingTableScan(table, poll_interval_ms=10, prefetch_enabled=False) + scan.next_snapshot_id = 5 + + async def get_one_plan(): + async for plan in scan.stream(): + await asyncio.sleep(0.01) + # With prefetch disabled, no task should exist + self.assertIsNone(scan._prefetch_future) + return plan + + asyncio.run(get_one_plan()) + + +class StreamingCatchUpDiffTest(unittest.TestCase): + """Tests for diff-based catch-up optimization in AsyncStreamingTableScan.""" + + @patch('pypaimon.read.streaming_table_scan.IncrementalDiffScanner') + @patch('pypaimon.read.streaming_table_scan.SnapshotManager') + @patch('pypaimon.read.streaming_table_scan.ManifestListManager') + @patch('pypaimon.read.streaming_table_scan.ManifestFileManager') + def test_stream_triggers_diff_catch_up_for_large_gap( + self, MockManifestFileManager, MockManifestListManager, + 
MockSnapshotManager, MockDiffScanner + ): + """ + When starting with a large gap, stream() should use diff scanner. + + This tests the full flow: + 1. CLI calls restore({"next_snapshot_id": 5}) for --from snapshot:5 + 2. stream() detects large gap (5 to 100, gap=95) + 3. Diff scanner is triggered + """ + table, _ = _create_mock_table(latest_snapshot_id=100) + + mock_snapshot_manager = MockSnapshotManager.return_value + mock_diff_scanner = MockDiffScanner.return_value + + # Setup: latest is 100, start is 5 (gap=95) + mock_snapshot_manager.get_latest_snapshot.return_value = _create_mock_snapshot(100) + mock_snapshot_manager.get_snapshot_by_id.return_value = _create_mock_snapshot(4) # start-1 + + # Diff scanner returns a plan with some splits + mock_split = Mock() + mock_plan = Plan([mock_split]) + mock_diff_scanner.scan.return_value = mock_plan + + scan = AsyncStreamingTableScan(table, poll_interval_ms=10, prefetch_enabled=False) + + # Simulate --from snapshot:5: restore to snapshot 5 + scan.next_snapshot_id = 5 + + # Verify diff catch-up should be used (gap=95 > threshold=10) + self.assertTrue(scan._should_use_diff_catch_up()) + + # Run stream() and get first plan + async def get_first_plan(): + async for plan in scan.stream(): + return plan + + asyncio.run(get_first_plan()) + + # Verify diff scanner was used + MockDiffScanner.assert_called_once_with(table) + mock_diff_scanner.scan.assert_called_once() + + # Verify next_snapshot_id was updated to latest + 1 + self.assertEqual(scan.next_snapshot_id, 101) + + +if __name__ == '__main__': + unittest.main()