Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion mkdocs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -905,4 +905,7 @@ Previous versions of Java (`<1.4.0`) implementations incorrectly assume the opti

## Nanoseconds Support

PyIceberg currently only supports upto microsecond precision in its TimestampType. PyArrow timestamp types in 's' and 'ms' will be upcast automatically to 'us' precision timestamps on write. Timestamps in 'ns' precision can also be downcast automatically on write if desired. This can be configured by setting the `downcast-ns-timestamp-to-us-on-write` property as "True" in the configuration file, or by setting the `PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE` environment variable. Refer to the [nanoseconds timestamp proposal document](https://docs.google.com/document/d/1bE1DcEGNzZAMiVJSZ0X1wElKLNkT9kRkk0hDlfkXzvU/edit#heading=h.ibflcctc9i1d) for more details on the long term roadmap for nanoseconds support
PyIceberg currently only supports up to microsecond precision in its TimestampType. PyArrow timestamp types in 's' and 'ms' will be upcast automatically to 'us' precision timestamps on write. Timestamps in 'ns' precision can also be downcast automatically when desired. This can be configured by setting the `downcast-ns-timestamp-to-us` property as "True" in the configuration file, or by setting the `PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US` environment variable. Refer to the [nanoseconds timestamp proposal document](https://docs.google.com/document/d/1bE1DcEGNzZAMiVJSZ0X1wElKLNkT9kRkk0hDlfkXzvU/edit#heading=h.ibflcctc9i1d) for more details on the long term roadmap for nanoseconds support.

!!! note "Deprecated config key"
The previous config key `downcast-ns-timestamp-to-us-on-write` (env: `PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE`) is deprecated. It still works but will emit a deprecation warning. Migrate to `downcast-ns-timestamp-to-us` (env: `PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US`).
4 changes: 2 additions & 2 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@
from pyiceberg.schema import Schema
from pyiceberg.serializers import ToOutputFile
from pyiceberg.table import (
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE,
CommitTableResponse,
CreateTableTransaction,
StagedTable,
Table,
TableProperties,
_get_downcast_ns_timestamp_to_us,
)
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
Expand Down Expand Up @@ -842,7 +842,7 @@ def _convert_schema_if_needed(

from pyiceberg.io.pyarrow import _ConvertToIcebergWithoutIDs, visit_pyarrow

downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
downcast_ns_timestamp_to_us = _get_downcast_ns_timestamp_to_us()
if isinstance(schema, pa.Schema):
schema: Schema = visit_pyarrow( # type: ignore
schema,
Expand Down
17 changes: 8 additions & 9 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@
visit,
visit_with_partner,
)
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties
from pyiceberg.table import TableProperties, _get_downcast_ns_timestamp_to_us
from pyiceberg.table.deletion_vector import deletion_vectors_from_puffin_file
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import TableMetadata
Expand Down Expand Up @@ -180,7 +180,6 @@
strtobool,
)
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.config import Config
from pyiceberg.utils.datetime import millis_to_datetime
from pyiceberg.utils.decimal import unscaled_to_decimal
from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int
Expand Down Expand Up @@ -1467,8 +1466,8 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
else:
raise TypeError(
"Iceberg does not yet support 'ns' timestamp precision. "
"Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically "
"downcast 'ns' to 'us' on write.",
"Use 'downcast-ns-timestamp-to-us' configuration property to automatically "
"downcast 'ns' to 'us'.",
)
else:
raise TypeError(f"Unsupported precision for timestamp type: {primitive.unit}")
Expand Down Expand Up @@ -1763,7 +1762,7 @@ def __init__(
self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
self._case_sensitive = case_sensitive
self._limit = limit
self._downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE)
self._downcast_ns_timestamp_to_us = _get_downcast_ns_timestamp_to_us()
self._dictionary_columns = dictionary_columns

@property
Expand Down Expand Up @@ -2615,7 +2614,7 @@ def data_file_statistics_from_parquet_metadata(


def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties
from pyiceberg.table import TableProperties

parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
row_group_size = property_as_int(
Expand All @@ -2634,7 +2633,7 @@ def write_parquet(task: WriteTask) -> DataFile:
else:
file_schema = table_schema

downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
downcast_ns_timestamp_to_us = _get_downcast_ns_timestamp_to_us()
batches = [
_to_requested_schema(
requested_schema=file_schema,
Expand Down Expand Up @@ -2887,7 +2886,7 @@ def _dataframe_to_data_files(
Returns:
An iterable that supplies datafiles that represent the input data.
"""
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties, WriteTask
from pyiceberg.table import TableProperties, WriteTask

counter = counter or itertools.count(0)
write_uuid = write_uuid or uuid.uuid4()
Expand All @@ -2897,7 +2896,7 @@ def _dataframe_to_data_files(
default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
)
name_mapping = table_metadata.schema().name_mapping
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
downcast_ns_timestamp_to_us = _get_downcast_ns_timestamp_to_us()
task_schema = pyarrow_to_schema(
df.schema,
name_mapping=name_mapping,
Expand Down
43 changes: 37 additions & 6 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,39 @@
from pyiceberg.catalog.rest.scan_planning import RESTContentFile, RESTDeleteFile, RESTFileScanTask

ALWAYS_TRUE = AlwaysTrue()
DOWNCAST_NS_TIMESTAMP_TO_US = "downcast-ns-timestamp-to-us"
# Deprecated: use DOWNCAST_NS_TIMESTAMP_TO_US. The old key said "on-write" but the
# config also controls read-path downcasting, so the name was misleading.
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"


def _get_downcast_ns_timestamp_to_us() -> bool:
    """Resolve whether 'ns' timestamps should be downcast to 'us'.

    Lookup order:

    1. The current ``downcast-ns-timestamp-to-us`` key; when it yields a
       value, that value is returned as-is and nothing else is consulted.
    2. The deprecated ``downcast-ns-timestamp-to-us-on-write`` key; when it
       yields a value, that value is returned and a
       :class:`DeprecationWarning` is emitted so the config can be migrated.
    3. Otherwise ``False``.

    Returns:
        The effective boolean setting.
    """
    settings = Config()

    current = settings.get_bool(DOWNCAST_NS_TIMESTAMP_TO_US)
    if current is not None:
        return current

    deprecated = settings.get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE)
    if deprecated is None:
        # Neither key is configured.
        return False

    message = (
        f"Config key '{DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE}' (env: PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) "
        f"is deprecated. Use '{DOWNCAST_NS_TIMESTAMP_TO_US}' "
        "(env: PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US) instead."
    )
    # stacklevel=2 attributes the warning to the caller of this helper
    # rather than to this function itself.
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return deprecated


@dataclass()
class UpsertResult:
"""Summary the upsert operation."""
Expand Down Expand Up @@ -517,7 +547,7 @@ def append(
if not isinstance(df, (pa.Table, pa.RecordBatchReader)):
raise ValueError(f"Expected pa.Table or pa.RecordBatchReader, got: {df}")

downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
downcast_ns_timestamp_to_us = _get_downcast_ns_timestamp_to_us()
_check_pyarrow_schema_compatible(
self.table_metadata.schema(),
provided_schema=df.schema,
Expand Down Expand Up @@ -573,7 +603,7 @@ def dynamic_partition_overwrite(
f"in the latest partition spec: {field}"
)

downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
downcast_ns_timestamp_to_us = _get_downcast_ns_timestamp_to_us()
_check_pyarrow_schema_compatible(
self.table_metadata.schema(),
provided_schema=df.schema,
Expand Down Expand Up @@ -674,7 +704,7 @@ def overwrite(
if not isinstance(df, (pa.Table, pa.RecordBatchReader)):
raise ValueError(f"Expected pa.Table or pa.RecordBatchReader, got: {df}")

downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
downcast_ns_timestamp_to_us = _get_downcast_ns_timestamp_to_us()
_check_pyarrow_schema_compatible(
self.table_metadata.schema(),
provided_schema=df.schema,
Expand Down Expand Up @@ -873,7 +903,7 @@ def upsert(

from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible

downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
downcast_ns_timestamp_to_us = _get_downcast_ns_timestamp_to_us()
_check_pyarrow_schema_compatible(
self.table_metadata.schema(),
provided_schema=df.schema,
Expand Down Expand Up @@ -2471,8 +2501,9 @@ def plan_files(self) -> Iterable[FileScanTask]:
options=self.options,
).plan_files(
manifests=manifests,
manifest_entry_filter=lambda manifest_entry: manifest_entry.snapshot_id in append_snapshot_ids
and manifest_entry.status == ManifestEntryStatus.ADDED,
manifest_entry_filter=lambda manifest_entry: (
manifest_entry.snapshot_id in append_snapshot_ids and manifest_entry.status == ManifestEntryStatus.ADDED
),
)

def to_arrow(self) -> pa.Table:
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ filterwarnings = [
"ignore:As the c extension couldn't be imported:RuntimeWarning:google_crc32c",
# Ignore Spark 4.0.1 pandas conversion warning under pandas 3.0
"ignore:The copy keyword is deprecated and will be removed in a future version.*",
# Deprecated config key migration (backwards compat)
"ignore:Config key 'downcast-ns-timestamp-to-us-on-write'.*:DeprecationWarning",
]

[tool.mypy]
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/test_add_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ def test_add_files_with_timestamp_tz_ns_fails(session_catalog: Catalog, format_v
],
schema=nanoseconds_schema,
)
mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE": "True"})
mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US": "True"})

identifier = f"default.timestamptz_ns_added{format_version}"
tbl = _create_table(session_catalog, identifier, format_version, schema=nanoseconds_schema_iceberg)
Expand All @@ -754,8 +754,8 @@ def test_add_files_with_timestamp_tz_ns_fails(session_catalog: Catalog, format_v
exception_cause = exc_info.value.__cause__
assert isinstance(exception_cause, TypeError)
assert (
"Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' "
"configuration property to automatically downcast 'ns' to 'us' on write." in exception_cause.args[0]
"Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us' "
"configuration property to automatically downcast 'ns' to 'us'." in exception_cause.args[0]
)


Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_writes/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,7 @@ def test_write_all_timestamp_precision(
arrow_table_schema_with_all_microseconds_timestamp_precisions: pa.Schema,
) -> None:
identifier = "default.table_all_timestamp_precision"
mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE": "True"})
mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US": "True"})

tbl = _create_table(
session_catalog,
Expand Down
96 changes: 92 additions & 4 deletions tests/io/test_pyarrow_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,101 @@ def test_pyarrow_timestamp_invalid_units() -> None:
with pytest.raises(
TypeError,
match=re.escape(
"Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' "
"configuration property to automatically downcast 'ns' to 'us' on write."
"Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us' "
"configuration property to automatically downcast 'ns' to 'us'."
),
):
visit_pyarrow(pyarrow_type, _ConvertToIceberg())


def test_downcast_ns_timestamp_legacy_env_var_is_backwards_compat() -> None:
    """The deprecated PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE env var still activates downcasting."""
    import os
    import warnings
    from unittest import mock

    from pyiceberg.table import _get_downcast_ns_timestamp_to_us

    new_key = "PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US"
    old_key = "PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE"

    # patch.dict snapshots os.environ on entry and restores it on exit, so
    # every mutation made inside the block (including the pop) is undone.
    with mock.patch.dict(os.environ, {old_key: "True"}):
        # The new key is checked first by the helper; an ambient value for it
        # would bypass the legacy path and no warning would be emitted.
        os.environ.pop(new_key, None)
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            result = _get_downcast_ns_timestamp_to_us()

    assert result is True, "Legacy env var should still activate downcasting"
    deprecation_warnings = [w for w in caught if issubclass(w.category, DeprecationWarning)]
    assert len(deprecation_warnings) == 1
    message = str(deprecation_warnings[0].message)
    # Match the quoted key names: the bare new key is a substring of the
    # legacy key, so an unquoted check could never fail on its own.
    assert "'downcast-ns-timestamp-to-us-on-write'" in message
    assert "'downcast-ns-timestamp-to-us'" in message


def test_downcast_ns_timestamp_new_env_var_takes_precedence() -> None:
    """The new PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US env var works and emits no deprecation warning."""
    import os
    import warnings
    from unittest import mock

    from pyiceberg.table import _get_downcast_ns_timestamp_to_us

    new_key = "PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US"
    old_key = "PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE"

    # patch.dict snapshots os.environ on entry and restores it on exit,
    # replacing the manual save/restore of both variables.
    with mock.patch.dict(os.environ, {new_key: "True"}):
        # Ensure only the new key is present for this scenario.
        os.environ.pop(old_key, None)
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            result = _get_downcast_ns_timestamp_to_us()

    assert result is True
    deprecation_warnings = [w for w in caught if issubclass(w.category, DeprecationWarning)]
    assert len(deprecation_warnings) == 0, "New key must not emit a deprecation warning"


def test_downcast_ns_timestamp_new_key_overrides_legacy_key() -> None:
    """When both keys are set, the new key wins and no deprecation warning is emitted."""
    import os
    import warnings
    from unittest import mock

    from pyiceberg.table import _get_downcast_ns_timestamp_to_us

    new_key = "PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US"
    old_key = "PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE"

    # Legacy says True, but the new key says False. patch.dict restores the
    # original environment on exit, replacing the manual save/restore.
    conflicting_env = {new_key: "False", old_key: "True"}
    with mock.patch.dict(os.environ, conflicting_env):
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            result = _get_downcast_ns_timestamp_to_us()

    assert result is False, "New key must win over legacy key"
    deprecation_warnings = [w for w in caught if issubclass(w.category, DeprecationWarning)]
    assert len(deprecation_warnings) == 0, "New key present: no deprecation warning expected"


def test_pyarrow_timestamp_tz_to_iceberg() -> None:
pyarrow_type = pa.timestamp(unit="us", tz="UTC")
pyarrow_type_zero_offset = pa.timestamp(unit="us", tz="+00:00")
Expand All @@ -214,8 +302,8 @@ def test_pyarrow_timestamp_tz_invalid_units() -> None:
with pytest.raises(
TypeError,
match=re.escape(
"Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' "
"configuration property to automatically downcast 'ns' to 'us' on write."
"Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us' "
"configuration property to automatically downcast 'ns' to 'us'."
),
):
visit_pyarrow(pyarrow_type, _ConvertToIceberg())
Expand Down
Loading