Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions docs/content/pypaimon/python-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,124 @@ to the appropriate rollback logic.
| f is not in [l1, l2] | PredicateBuilder.is_not_in(f, [l1, l2]) |
| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) |

## Consumer Management

Consumer management allows you to track consumption progress, prevent snapshot expiration, and resume from breakpoints.

### Create ConsumerManager

```python
from pypaimon import CatalogFactory

# Get table and file_io
catalog = CatalogFactory.create({'warehouse': 'file:///path/to/warehouse'})
table = catalog.get_table('database_name.table_name')
file_io = table.file_io()

# Create consumer manager
manager = table.consumer_manager()
```

### Get Consumer

Retrieve a consumer by its ID:

```python
from pypaimon.consumer.consumer import Consumer

consumer = manager.consumer('consumer_id')
if consumer:
print(f"Next snapshot: {consumer.next_snapshot}")
else:
print("Consumer not found")
```

### Reset Consumer

Create or reset a consumer with a new snapshot ID:

```python
# Reset consumer to snapshot 10
manager.reset_consumer('consumer_id', Consumer(next_snapshot=10))
```

### Delete Consumer

Delete a consumer by its ID:

```python
manager.delete_consumer('consumer_id')
```

### List Consumers

Get all consumers with their next snapshot IDs:

```python
consumers = manager.consumers()
for consumer_id, next_snapshot in consumers.items():
print(f"Consumer {consumer_id}: next snapshot {next_snapshot}")
```

### List All Consumer IDs

List all consumer IDs:

```python
consumer_ids = manager.list_all_ids()
for consumer_id in consumer_ids:
print(consumer_id)
```

### Get Minimum Next Snapshot

Get the minimum next snapshot across all consumers:

```python
min_snapshot = manager.min_next_snapshot()
if min_snapshot:
print(f"Minimum next snapshot: {min_snapshot}")
```

### Expire Consumers

Expire consumers modified before a given datetime:

```python
from datetime import datetime, timedelta

# Expire consumers older than 1 day
expire_time = datetime.now() - timedelta(days=1)
manager.expire(expire_time)
```

### Clear Consumers

Clear consumers matching regular expression patterns:

```python
# Clear all consumers starting with "test_"
manager.clear_consumers('test_.*')

# Clear all consumers except those starting with "prod_"
manager.clear_consumers(
'.*',
'prod_.*'
)
```

### Branch Support

ConsumerManager supports multiple branches:

```python
# Custom branch
branch_manager = manager.with_branch('feature_branch')

# Each branch maintains its own consumers
print(branch_manager.consumers()) # Consumers on feature branch
```

## Supported Features

The following shows the supported features of Python Paimon compared to Java Paimon:
Expand Down
18 changes: 18 additions & 0 deletions paimon-python/pypaimon/consumer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,21 @@
# limitations under the License.
################################################################################
"""Consumer management for tracking streaming read progress."""

from pypaimon.consumer.consumer import Consumer
from pypaimon.consumer.consumer_manager import (
DEFAULT_MAIN_BRANCH,
ConsumerManager,
_branch_path,
_is_main_branch,
_normalize_branch,
)

__all__ = [
'Consumer',
'ConsumerManager',
'DEFAULT_MAIN_BRANCH',
'_branch_path',
'_is_main_branch',
'_normalize_branch',
]
38 changes: 38 additions & 0 deletions paimon-python/pypaimon/consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
"""Consumer dataclass for streaming read progress."""

import json
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
Expand All @@ -36,3 +38,39 @@ def from_json(json_str: str) -> 'Consumer':
"""Deserialize from JSON."""
data = json.loads(json_str)
return Consumer(next_snapshot=data["nextSnapshot"])

@classmethod
def from_path(cls, file_io, path: str) -> Optional['Consumer']:
"""
Load consumer from file path with retry mechanism.

Args:
file_io: FileIO instance for reading files
path: Path to consumer file

Returns:
Consumer instance if file exists, None otherwise

Raises:
RuntimeError: If retry fails after 10 attempts
"""
retry_number = 0
exception = None
while retry_number < 10:
try:
content = file_io.read_file_utf8(path)
except FileNotFoundError:
return None
except Exception as e:
exception = e
time.sleep(0.2)
retry_number += 1
continue
try:
return cls.from_json(content)
except Exception as e:
exception = e
time.sleep(0.2)
retry_number += 1
continue
raise RuntimeError(f"Retry fail after 10 times: {exception}")
144 changes: 133 additions & 11 deletions paimon-python/pypaimon/consumer/consumer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,45 @@
################################################################################
"""ConsumerManager for persisting streaming read progress."""

from typing import Optional
import re
from datetime import datetime
from typing import Dict, List, Optional

from pypaimon.consumer.consumer import Consumer

DEFAULT_MAIN_BRANCH = "main"
BRANCH_PREFIX = "branch-"


def _branch_path(table_path: str, branch: str) -> str:
"""Return the path string of a branch."""
if _is_main_branch(branch):
return table_path
return f"{table_path}/branch/{BRANCH_PREFIX}{branch}"


def _is_main_branch(branch: str) -> bool:
"""Check if branch is main branch."""
return branch == DEFAULT_MAIN_BRANCH


def _normalize_branch(branch: Optional[str]) -> str:
"""Normalize branch name, use DEFAULT_MAIN_BRANCH if None or empty."""
if not branch or not branch.strip():
return DEFAULT_MAIN_BRANCH
return branch


class ConsumerManager:
"""Manages consumer state stored at {table_path}/consumer/consumer-{id}."""

CONSUMER_PREFIX = "consumer-"

def __init__(self, file_io, table_path: str):
def __init__(self, file_io, table_path: str,
branch: Optional[str] = None):
self._file_io = file_io
self._table_path = table_path
self._branch = _normalize_branch(branch)

@staticmethod
def _validate_consumer_id(consumer_id: str) -> None:
Expand All @@ -45,22 +71,35 @@ def _validate_consumer_id(consumer_id: str) -> None:
f"consumer_id cannot be a relative path component: {consumer_id}"
)

def _consumer_directory(self) -> str:
"""Return the path to consumer directory."""
branch_table_path = _branch_path(self._table_path, self._branch)
return f"{branch_table_path}/consumer"

def _consumer_path(self, consumer_id: str) -> str:
"""Return the path to a consumer file."""
self._validate_consumer_id(consumer_id)
return (
f"{self._table_path}/consumer/"
f"{self.CONSUMER_PREFIX}{consumer_id}"
)
consumer_dir = self._consumer_directory()
return f"{consumer_dir}/{self.CONSUMER_PREFIX}{consumer_id}"

def _list_consumer_ids(self) -> List[str]:
"""List all consumer IDs."""
consumer_dir = self._consumer_directory()
if not self._file_io.exists(consumer_dir):
return []

consumer_ids = []
for status in self._file_io.list_status(consumer_dir):
if status.base_name and status.base_name.startswith(
self.CONSUMER_PREFIX):
consumer_id = status.base_name[len(self.CONSUMER_PREFIX):]
consumer_ids.append(consumer_id)
return consumer_ids

def consumer(self, consumer_id: str) -> Optional[Consumer]:
"""Get consumer state, or None if not found."""
path = self._consumer_path(consumer_id)
if not self._file_io.exists(path):
return None

json_str = self._file_io.read_file_utf8(path)
return Consumer.from_json(json_str)
return Consumer.from_path(self._file_io, path)

def reset_consumer(self, consumer_id: str, consumer: Consumer) -> None:
"""Write or update consumer state."""
Expand All @@ -71,3 +110,86 @@ def delete_consumer(self, consumer_id: str) -> None:
"""Delete a consumer."""
path = self._consumer_path(consumer_id)
self._file_io.delete_quietly(path)

def min_next_snapshot(self) -> Optional[int]:
"""Get minimum next snapshot among all consumers."""
consumer_ids = self._list_consumer_ids()
if not consumer_ids:
return None

min_snapshot = None
for consumer_id in consumer_ids:
consumer = self.consumer(consumer_id)
if consumer:
if min_snapshot is None or consumer.next_snapshot < min_snapshot:
min_snapshot = consumer.next_snapshot
return min_snapshot

def expire(self, expire_datetime: datetime) -> None:
"""Delete consumers with modification time before expire_datetime."""
consumer_dir = self._consumer_directory()
if not self._file_io.exists(consumer_dir):
return

expire_timestamp = expire_datetime.timestamp()
for status in self._file_io.list_status(consumer_dir):
if status.base_name and status.base_name.startswith(
self.CONSUMER_PREFIX):
consumer_path = f"{consumer_dir}/{status.base_name}"
try:
if status.mtime and status.mtime < expire_timestamp:
self._file_io.delete_quietly(consumer_path)
except Exception:
pass

def list_all_ids(self) -> List[str]:
"""List all consumer IDs."""
return self._list_consumer_ids()

def consumers(self) -> Dict[str, int]:
"""Get all consumers as a dict mapping consumer_id to next_snapshot."""
consumer_ids = self._list_consumer_ids()
consumers = {}
for consumer_id in consumer_ids:
consumer = self.consumer(consumer_id)
if consumer:
consumers[consumer_id] = consumer.next_snapshot
return consumers

def clear_consumers(
self, including_pattern: str, excluding_pattern: Optional[str] = None
) -> None:
"""
Clear consumers matching inclusion pattern but not exclusion pattern.

Args:
including_pattern: Regex pattern for consumers to delete
excluding_pattern: Optional regex pattern for consumers to keep
"""
consumer_ids = self._list_consumer_ids()
including_re = re.compile(including_pattern)
excluding_re = (
re.compile(excluding_pattern) if excluding_pattern else None
)

for consumer_id in consumer_ids:
should_delete = including_re.match(consumer_id)
if excluding_re:
should_delete = should_delete and not excluding_re.match(
consumer_id
)

if should_delete:
self.delete_consumer(consumer_id)

def with_branch(self, branch: Optional[str]) -> 'ConsumerManager':
"""
Create a new ConsumerManager instance with the specified branch.

Args:
branch: Branch name, or None for main branch

Returns:
A new ConsumerManager instance with the specified branch
"""
return ConsumerManager(self._file_io, self._table_path, branch)
Loading
Loading