
Commit 314f2fe

[python] Add streaming infrastructure: scanners, consumers, caching, sharding
Add foundational infrastructure for pure-Python streaming reads:
- Follow-up scanners (delta, changelog, incremental diff) for continuous snapshot polling
- Consumer manager for persisting read progress
- LRU caching for snapshots, manifests, and manifest lists
- Batch existence checks for efficient file IO
- Bucket-based sharding for parallel consumption
- Row kind support in table reads
- Streaming-related core options
- Backtick support for identifier parsing

Includes unit tests for all new components.
1 parent 8d40018 commit 314f2fe

25 files changed

Lines changed: 2701 additions & 31 deletions
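
For orientation, here is a minimal consumer sketch pieced together from the acceptance tests included in this commit. The AsyncStreamingTableScan constructor arguments, stream() iteration, and read-builder calls mirror what the tests below exercise; the warehouse path and table name are placeholders, not part of the commit.

    import asyncio

    from pypaimon import CatalogFactory
    from pypaimon.read.streaming_table_scan import AsyncStreamingTableScan

    async def consume(warehouse_path):
        # Placeholder catalog/table names; APIs as used in the tests below.
        catalog = CatalogFactory.create({'warehouse': warehouse_path})
        table = catalog.get_table('default.my_table')
        scan = AsyncStreamingTableScan(table, poll_interval_ms=1000,
                                       diff_threshold=5, prefetch_enabled=False)
        table_read = table.new_read_builder().new_read()
        async for plan in scan.stream():  # one plan per polled snapshot batch
            splits = plan.splits()
            if splits:
                batch = table_read.to_arrow(splits)  # PyArrow Table for this increment
                print(batch.num_rows)

    # asyncio.run(consume('/path/to/warehouse'))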

paimon-python/dev/requirements.txt

Lines changed: 2 additions & 4 deletions
@@ -22,8 +22,7 @@ dataclasses>=0.8; python_version < "3.7"
 fastavro>=1.4,<2
 fsspec>=2021.10,<2026; python_version<"3.8"
 fsspec>=2023,<2026; python_version>="3.8"
-ossfs>=2021.8; python_version<"3.8"
-ossfs>=2023; python_version>="3.8"
+# ossfs moved to extras_require (pip install pypaimon[oss])
 packaging>=21,<26
 pandas>=1.1,<2; python_version < "3.7"
 pandas>=1.3,<3; python_version >= "3.7" and python_version < "3.9"
@@ -32,8 +31,7 @@ polars>=0.9,<1; python_version<"3.8"
 polars>=1,<2; python_version>="3.8"
 pyarrow>=6,<7; python_version < "3.8"
 pyarrow>=16,<20; python_version >= "3.8"
-pylance>=0.20,<1; python_version>="3.9"
-pylance>=0.10,<1; python_version>="3.8" and python_version<"3.9"
+# pylance moved to extras_require (pip install pypaimon[lance])
 pyroaring
 readerwriterlock>=1,<2
 zstandard>=0.19,<1
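
Note: with ossfs and pylance removed from the core requirements, OSS and Lance support become opt-in extras, installed via pip install pypaimon[oss] or pip install pypaimon[lance] as the comments above indicate.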
Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
Acceptance tests for pypaimon.

These tests use real file I/O with local temp filesystem to verify
end-to-end behavior, as opposed to unit tests which use mocks.
"""
Lines changed: 354 additions & 0 deletions
@@ -0,0 +1,354 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
Acceptance tests for IncrementalDiffScanner.

These tests verify that the diff approach (reading 2 base_manifest_lists)
returns the same data as the delta approach (reading N delta_manifest_lists).

Uses real file I/O with local temp filesystem.
"""

import asyncio
import os
import shutil
import tempfile
import unittest

import pyarrow as pa

from pypaimon import CatalogFactory, Schema
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.read.scanner.append_table_split_generator import AppendTableSplitGenerator
from pypaimon.read.scanner.incremental_diff_scanner import IncrementalDiffScanner
from pypaimon.read.streaming_table_scan import AsyncStreamingTableScan
from pypaimon.snapshot.snapshot_manager import SnapshotManager


class IncrementalDiffAcceptanceTest(unittest.TestCase):
    """Acceptance tests for diff vs delta equivalence with real data."""

    @classmethod
    def setUpClass(cls):
        cls.tempdir = tempfile.mkdtemp()
        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
        cls.catalog.create_database('default', True)

        cls.pa_schema = pa.schema([
            ('id', pa.int32()),
            ('value', pa.string()),
            ('partition_col', pa.string())
        ])

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.tempdir, ignore_errors=True)

    def _create_table_with_snapshots(self, name, num_snapshots=5, partition_keys=None):
        """
        Create a table and write num_snapshots of data.

        Each snapshot adds 5 unique rows with IDs: snap_id*10 + [0,1,2,3,4]

        Args:
            name: Table name (without database prefix)
            num_snapshots: Number of snapshots to create
            partition_keys: Optional list of partition keys

        Returns:
            Tuple of (table, expected_data_per_snapshot)
        """
        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=partition_keys)
        self.catalog.create_table(f'default.{name}', schema, False)
        table = self.catalog.get_table(f'default.{name}')

        all_data = []
        for snap_id in range(1, num_snapshots + 1):
            write_builder = table.new_batch_write_builder()
            table_write = write_builder.new_write()
            table_commit = write_builder.new_commit()

            data = {
                'id': [snap_id * 10 + i for i in range(5)],
                'value': [f'snap{snap_id}_row{i}' for i in range(5)],
                'partition_col': ['p1' if i % 2 == 0 else 'p2' for i in range(5)]
            }
            all_data.append(data)

            pa_table = pa.Table.from_pydict(data, schema=self.pa_schema)
            table_write.write_arrow(pa_table)
            table_commit.commit(table_write.prepare_commit())
            table_write.close()
            table_commit.close()

        return table, all_data

    def _read_via_diff(self, table, start_snap_id, end_snap_id):
        """
        Read data using diff approach.

        Uses IncrementalDiffScanner to compute files added between
        start and end snapshots.

        Args:
            table: The table to read from
            start_snap_id: Starting snapshot ID (exclusive)
            end_snap_id: Ending snapshot ID (inclusive)

        Returns:
            PyArrow Table with the data
        """
        snapshot_manager = SnapshotManager(table)
        start_snapshot = snapshot_manager.get_snapshot_by_id(start_snap_id)
        end_snapshot = snapshot_manager.get_snapshot_by_id(end_snap_id)

        scanner = IncrementalDiffScanner(table)
        plan = scanner.scan(start_snapshot, end_snapshot)

        splits = plan.splits()
        if not splits:
            # Return empty table with correct schema
            return pa.Table.from_pydict({
                'id': [],
                'value': [],
                'partition_col': []
            }, schema=self.pa_schema)

        table_read = table.new_read_builder().new_read()
        return table_read.to_arrow(splits)

    def _read_via_delta(self, table, start_snap_id, end_snap_id):
        """
        Read data using delta approach.

        Reads each delta_manifest_list from start_snap_id+1 to end_snap_id
        and combines all entries.

        Args:
            table: The table to read from
            start_snap_id: Starting snapshot ID (exclusive)
            end_snap_id: Ending snapshot ID (inclusive)

        Returns:
            PyArrow Table with the data
        """
        snapshot_manager = SnapshotManager(table)
        manifest_list_manager = ManifestListManager(table)
        manifest_file_manager = ManifestFileManager(table)

        all_entries = []
        for snap_id in range(start_snap_id + 1, end_snap_id + 1):
            snapshot = snapshot_manager.get_snapshot_by_id(snap_id)
            if snapshot and snapshot.commit_kind == 'APPEND':
                manifest_files = manifest_list_manager.read_delta(snapshot)
                if manifest_files:
                    entries = manifest_file_manager.read_entries_parallel(manifest_files)
                    all_entries.extend(entries)

        if not all_entries:
            return pa.Table.from_pydict({
                'id': [],
                'value': [],
                'partition_col': []
            }, schema=self.pa_schema)

        # Create splits from entries
        options = table.options
        split_generator = AppendTableSplitGenerator(
            table,
            options.source_split_target_size(),
            options.source_split_open_file_cost(),
            {}
        )
        splits = split_generator.create_splits(all_entries)

        table_read = table.new_read_builder().new_read()
        return table_read.to_arrow(splits)

    def _rows_to_set(self, arrow_table):
        """Convert arrow table to set of (id, value, partition_col) tuples."""
        rows = set()
        for i in range(arrow_table.num_rows):
            row = (
                arrow_table.column('id')[i].as_py(),
                arrow_table.column('value')[i].as_py(),
                arrow_table.column('partition_col')[i].as_py()
            )
            rows.add(row)
        return rows

    def test_diff_returns_same_rows_as_delta_simple(self):
        """
        Basic case: 5 snapshots, verify row-level equivalence.

        Creates a table with 5 snapshots, then reads data from snapshot 1 to 5
        using both diff and delta approaches, verifying they return the same rows.
        """
        table, all_data = self._create_table_with_snapshots(
            'test_diff_delta_simple',
            num_snapshots=5
        )

        # Read using both approaches (from snapshot 1 to 5, so we get snapshots 2-5)
        diff_result = self._read_via_diff(table, 1, 5)
        delta_result = self._read_via_delta(table, 1, 5)

        # Convert to sets for order-independent comparison
        diff_rows = self._rows_to_set(diff_result)
        delta_rows = self._rows_to_set(delta_result)

        self.assertEqual(diff_rows, delta_rows)

        # Verify we got the expected number of rows (snapshots 2-5, 5 rows each = 20)
        self.assertEqual(len(diff_rows), 20)

        # Verify specific IDs are present (from snapshots 2-5)
        expected_ids = set()
        for snap_id in range(2, 6):  # snapshots 2, 3, 4, 5
            for i in range(5):
                expected_ids.add(snap_id * 10 + i)

        actual_ids = {row[0] for row in diff_rows}
        self.assertEqual(actual_ids, expected_ids)

    def test_diff_returns_same_rows_as_delta_many_snapshots(self):
        """
        Stress test: 20 snapshots, verify row-level equivalence.

        This tests the catch-up scenario where there are many snapshots
        between start and end.
        """
        table, all_data = self._create_table_with_snapshots(
            'test_diff_delta_many',
            num_snapshots=20
        )

        # Read using both approaches (from snapshot 1 to 20)
        diff_result = self._read_via_diff(table, 1, 20)
        delta_result = self._read_via_delta(table, 1, 20)

        # Convert to sets for order-independent comparison
        diff_rows = self._rows_to_set(diff_result)
        delta_rows = self._rows_to_set(delta_result)

        self.assertEqual(diff_rows, delta_rows)

        # Verify we got the expected number of rows (snapshots 2-20, 5 rows each = 95)
        self.assertEqual(len(diff_rows), 95)

    def test_diff_returns_same_rows_with_mixed_partitions(self):
        """
        Partitioned table: Verify diff handles multiple partitions correctly.

        Creates a partitioned table and verifies diff and delta return
        the same rows across all partitions.
        """
        table, all_data = self._create_table_with_snapshots(
            'test_diff_delta_partitioned',
            num_snapshots=5,
            partition_keys=['partition_col']
        )

        # Read using both approaches
        diff_result = self._read_via_diff(table, 1, 5)
        delta_result = self._read_via_delta(table, 1, 5)

        # Convert to sets for order-independent comparison
        diff_rows = self._rows_to_set(diff_result)
        delta_rows = self._rows_to_set(delta_result)

        self.assertEqual(diff_rows, delta_rows)

        # Verify both partitions have data
        p1_rows = {r for r in diff_rows if r[2] == 'p1'}
        p2_rows = {r for r in diff_rows if r[2] == 'p2'}

        self.assertGreater(len(p1_rows), 0, "Should have rows in partition p1")
        self.assertGreater(len(p2_rows), 0, "Should have rows in partition p2")

    def test_streaming_catch_up_returns_same_data(self):
        """
        End-to-end: Verify AsyncStreamingTableScan catch-up returns same data.

        Creates a table with 20 snapshots, then uses streaming scan with
        a low diff_threshold to trigger diff-based catch-up. Verifies the
        total rows match expected.
        """
        table, all_data = self._create_table_with_snapshots(
            'test_streaming_catch_up',
            num_snapshots=20
        )

        # Create streaming scan with low threshold to trigger diff catch-up
        scan = AsyncStreamingTableScan(
            table,
            poll_interval_ms=10,
            diff_threshold=5,  # Low threshold to trigger diff for gap > 5
            prefetch_enabled=False
        )

        # Restore to snapshot 1 (will trigger catch-up to snapshot 20)
        scan.next_snapshot_id = 1

        # Collect all rows from streaming scan
        all_rows = []
        table_read = table.new_read_builder().new_read()

        async def collect_rows():
            plan_count = 0
            async for plan in scan.stream():
                splits = plan.splits()
                if splits:
                    arrow_table = table_read.to_arrow(splits)
                    for i in range(arrow_table.num_rows):
                        row = (
                            arrow_table.column('id')[i].as_py(),
                            arrow_table.column('value')[i].as_py(),
                            arrow_table.column('partition_col')[i].as_py()
                        )
                        all_rows.append(row)
                plan_count += 1
                # After first plan (catch-up), we should have all data
                # Break to avoid infinite loop waiting for new snapshots
                if plan_count >= 1:
                    break

        asyncio.run(collect_rows())

        # Verify diff catch-up was used (gap of 19 > threshold of 5)
        self.assertTrue(scan._diff_catch_up_used,
                        "Diff-based catch-up should have been used for large gap")

        # Verify we got all expected rows (snapshots 1-20, 5 rows each = 100)
        # Note: catch-up includes snapshot 1's data since we start from next_snapshot_id=1
        self.assertEqual(len(all_rows), 100)

        # Verify all expected IDs are present
        expected_ids = set()
        for snap_id in range(1, 21):  # snapshots 1-20
            for i in range(5):
                expected_ids.add(snap_id * 10 + i)

        actual_ids = {row[0] for row in all_rows}
        self.assertEqual(actual_ids, expected_ids)


if __name__ == '__main__':
    unittest.main()
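
As a usage note, the diff path exercised above reduces to a few calls. The following is a minimal sketch following the _read_via_diff helper in the test; the table handle and snapshot IDs are placeholders, and a bounded (start exclusive, end inclusive) incremental read is assumed.

    from pypaimon.read.scanner.incremental_diff_scanner import IncrementalDiffScanner
    from pypaimon.snapshot.snapshot_manager import SnapshotManager

    def read_diff(table, start_id, end_id):
        # Compute files added between the two base snapshots, then read the splits.
        snapshots = SnapshotManager(table)
        plan = IncrementalDiffScanner(table).scan(
            snapshots.get_snapshot_by_id(start_id),
            snapshots.get_snapshot_by_id(end_id))
        splits = plan.splits()
        return table.new_read_builder().new_read().to_arrow(splits) if splits else None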
