1 change: 0 additions & 1 deletion pyiceberg/io/__init__.py
@@ -99,7 +99,6 @@
GCS_VERSION_AWARE = "gcs.version-aware"
HF_ENDPOINT = "hf.endpoint"
HF_TOKEN = "hf.token"
PYARROW_USE_LARGE_TYPES_ON_READ = "pyarrow.use-large-types-on-read"


@runtime_checkable
22 changes: 0 additions & 22 deletions pyiceberg/io/pyarrow.py
@@ -100,7 +100,6 @@
HDFS_KERB_TICKET,
HDFS_PORT,
HDFS_USER,
PYARROW_USE_LARGE_TYPES_ON_READ,
S3_ACCESS_KEY_ID,
S3_ANONYMOUS,
S3_CONNECT_TIMEOUT,
@@ -179,7 +178,6 @@
from pyiceberg.utils.config import Config
from pyiceberg.utils.datetime import millis_to_datetime
from pyiceberg.utils.decimal import unscaled_to_decimal
from pyiceberg.utils.deprecated import deprecation_message
from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int
from pyiceberg.utils.singleton import Singleton
from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string
@@ -1756,14 +1754,6 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
(pa.Table.from_batches([batch]) for batch in itertools.chain([first_batch], batches)), promote_options="permissive"
)

if property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, False):
deprecation_message(
deprecated_in="0.10.0",
removed_in="0.11.0",
help_message=f"Property `{PYARROW_USE_LARGE_TYPES_ON_READ}` will be removed.",
)
result = result.cast(arrow_schema)

return result

def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
@@ -1872,7 +1862,6 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, pa.Array | None]
_file_schema: Schema
_include_field_ids: bool
_downcast_ns_timestamp_to_us: bool
_use_large_types: bool | None
_projected_missing_fields: dict[int, Any]
_allow_timestamp_tz_mismatch: bool

@@ -1881,26 +1870,17 @@ def __init__(
file_schema: Schema,
downcast_ns_timestamp_to_us: bool = False,
include_field_ids: bool = False,
use_large_types: bool | None = None,
projected_missing_fields: dict[int, Any] = EMPTY_DICT,
allow_timestamp_tz_mismatch: bool = False,
) -> None:
self._file_schema = file_schema
self._include_field_ids = include_field_ids
self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
self._use_large_types = use_large_types
self._projected_missing_fields = projected_missing_fields
# When True, allows projecting timestamptz (UTC) to timestamp (no tz).
# Allowed for reading (aligns with Spark); disallowed for writing to enforce Iceberg spec's strict typing.
self._allow_timestamp_tz_mismatch = allow_timestamp_tz_mismatch

if use_large_types is not None:
deprecation_message(
deprecated_in="0.10.0",
removed_in="0.11.0",
help_message="Argument `use_large_types` will be removed from ArrowProjectionVisitor",
)

def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
file_field = self._file_schema.find_field(field.field_id)

@@ -1949,8 +1929,6 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
target_schema = schema_to_pyarrow(
promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids
)
if self._use_large_types is False:
target_schema = _pyarrow_schema_ensure_small_types(target_schema)
return values.cast(target_schema)

return values
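With `PYARROW_USE_LARGE_TYPES_ON_READ` and the `use_large_types` hook removed, readers that still want small Arrow types can cast the scan result themselves, mirroring the `result.cast(...)` call the deprecated code path used to perform. A minimal sketch of that workaround, assuming a pyiceberg table handle `tbl` and illustrative column names/types that are not part of this diff:

```python
import pyarrow as pa

# Read as before; the result may use large_string / large_binary / large_list.
result = tbl.scan().to_arrow()

# Downcast manually: build the small-typed schema by hand and cast once.
# Column names and types here are illustrative only.
small_schema = pa.schema(
    [
        pa.field("string", pa.string()),
        pa.field("binary", pa.binary()),
        pa.field("list", pa.list_(pa.string())),
    ]
)
result = result.cast(small_schema)
```

This keeps the conversion an explicit, one-time cast on the caller's side instead of a global read property.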
47 changes: 1 addition & 46 deletions pyiceberg/table/snapshots.py
@@ -29,7 +29,6 @@
from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, _manifests
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.utils.deprecated import deprecation_message

if TYPE_CHECKING:
from pyiceberg.table.metadata import TableMetadata
@@ -344,54 +343,10 @@ def _partition_summary(self, update_metrics: UpdateMetrics) -> str:
return ",".join([f"{prop}={val}" for prop, val in update_metrics.to_dict().items()])


def _truncate_table_summary(summary: Summary, previous_summary: Mapping[str, str]) -> Summary:
for prop in {
TOTAL_DATA_FILES,
TOTAL_DELETE_FILES,
TOTAL_RECORDS,
TOTAL_FILE_SIZE,
TOTAL_POSITION_DELETES,
TOTAL_EQUALITY_DELETES,
}:
summary[prop] = "0"

def get_prop(prop: str) -> int:
value = previous_summary.get(prop) or "0"
try:
return int(value)
except ValueError as e:
raise ValueError(f"Could not parse summary property {prop} to an int: {value}") from e

if value := get_prop(TOTAL_DATA_FILES):
summary[DELETED_DATA_FILES] = str(value)
if value := get_prop(TOTAL_DELETE_FILES):
summary[REMOVED_DELETE_FILES] = str(value)
if value := get_prop(TOTAL_RECORDS):
summary[DELETED_RECORDS] = str(value)
if value := get_prop(TOTAL_FILE_SIZE):
summary[REMOVED_FILE_SIZE] = str(value)
if value := get_prop(TOTAL_POSITION_DELETES):
summary[REMOVED_POSITION_DELETES] = str(value)
if value := get_prop(TOTAL_EQUALITY_DELETES):
summary[REMOVED_EQUALITY_DELETES] = str(value)

return summary


def update_snapshot_summaries(
summary: Summary, previous_summary: Mapping[str, str] | None = None, truncate_full_table: bool = False
) -> Summary:
def update_snapshot_summaries(summary: Summary, previous_summary: Mapping[str, str] | None = None) -> Summary:
if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE}:
raise ValueError(f"Operation not implemented: {summary.operation}")

if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None:
deprecation_message(
deprecated_in="0.10.0",
removed_in="0.11.0",
help_message="The truncate-full-table shouldn't be used.",
)
summary = _truncate_table_summary(summary, previous_summary)

if not previous_summary:
previous_summary = {
TOTAL_DATA_FILES: "0",
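After this change `update_snapshot_summaries` takes only the summary and the previous summary; the `truncate_full_table` flag and the `_truncate_table_summary` helper are gone. A minimal sketch of the new call shape, with illustrative summary contents:

```python
from pyiceberg.table.snapshots import Operation, Summary, update_snapshot_summaries

# An append summary for the new snapshot (contents illustrative).
summary = Summary(operation=Operation.APPEND)

# Totals carried over from the parent snapshot, as plain strings.
previous = {"total-data-files": "3", "total-records": "100"}

# No truncate_full_table argument anymore; only APPEND, OVERWRITE, and DELETE
# operations are accepted, anything else raises ValueError.
merged = update_snapshot_summaries(summary=summary, previous_summary=previous)
```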
44 changes: 0 additions & 44 deletions tests/integration/test_reads.py
@@ -45,7 +45,6 @@
NotNaN,
NotNull,
)
from pyiceberg.io import PYARROW_USE_LARGE_TYPES_ON_READ
from pyiceberg.io.pyarrow import (
pyarrow_to_schema,
)
@@ -1125,49 +1124,6 @@ def test_table_scan_keep_types(catalog: Catalog) -> None:
assert result_table.schema.equals(expected_schema)


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_table_scan_override_with_small_types(catalog: Catalog) -> None:
identifier = "default.test_table_scan_override_with_small_types"
arrow_table = pa.Table.from_arrays(
[
pa.array(["a", "b", "c"]),
pa.array(["a", "b", "c"]),
pa.array([b"a", b"b", b"c"]),
pa.array([["a", "b"], ["c", "d"], ["e", "f"]]),
],
names=["string", "string-to-binary", "binary", "list"],
)

try:
catalog.drop_table(identifier)
except NoSuchTableError:
pass

tbl = catalog.create_table(
identifier,
schema=arrow_table.schema,
)

tbl.append(arrow_table)

with tbl.update_schema() as update_schema:
update_schema.update_column("string-to-binary", BinaryType())

tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = "False"
result_table = tbl.scan().to_arrow()

expected_schema = pa.schema(
[
pa.field("string", pa.string()),
pa.field("string-to-binary", pa.large_binary()),
pa.field("binary", pa.binary()),
pa.field("list", pa.list_(pa.string())),
]
)
assert result_table.schema.equals(expected_schema)


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_empty_scan_ordered_str(catalog: Catalog) -> None: