
Commit 8ec0f0e

Merge remote-tracking branch 'origin/main' into kevinjqliu/upgrade-pytest
2 parents 1862b80 + 78615d2

File tree

8 files changed (+124, -120 lines changed)

pyiceberg/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -15,4 +15,4 @@
 # specific language governing permissions and limitations
 # under the License.

-__version__ = "0.10.0"
+__version__ = "0.11.0"

pyiceberg/io/__init__.py

Lines changed: 0 additions & 1 deletion
@@ -99,7 +99,6 @@
 GCS_VERSION_AWARE = "gcs.version-aware"
 HF_ENDPOINT = "hf.endpoint"
 HF_TOKEN = "hf.token"
-PYARROW_USE_LARGE_TYPES_ON_READ = "pyarrow.use-large-types-on-read"


 @runtime_checkable
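
With PYARROW_USE_LARGE_TYPES_ON_READ removed, scans no longer offer a switch that forces the result into large Arrow types. A downstream application that still wants large types can cast the scan result itself. The snippet below is only a minimal sketch of that workaround, not part of this commit; the table handle `tbl` and the single `name` column are assumptions for illustration.

import pyarrow as pa

# Hypothetical post-removal adjustment: request large Arrow types explicitly after the scan.
result = tbl.scan().to_arrow()  # `tbl` is an existing pyiceberg Table handle (assumed)
large_schema = pa.schema([pa.field("name", pa.large_string())])  # assumed single string column
result = result.cast(large_schema)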

pyiceberg/io/pyarrow.py

Lines changed: 19 additions & 26 deletions
@@ -100,7 +100,6 @@
     HDFS_KERB_TICKET,
     HDFS_PORT,
     HDFS_USER,
-    PYARROW_USE_LARGE_TYPES_ON_READ,
     S3_ACCESS_KEY_ID,
     S3_ANONYMOUS,
     S3_CONNECT_TIMEOUT,

@@ -179,7 +178,6 @@
 from pyiceberg.utils.config import Config
 from pyiceberg.utils.datetime import millis_to_datetime
 from pyiceberg.utils.decimal import unscaled_to_decimal
-from pyiceberg.utils.deprecated import deprecation_message
 from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int
 from pyiceberg.utils.singleton import Singleton
 from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string

@@ -1656,6 +1654,7 @@ def _task_to_record_batches(
                 current_batch,
                 downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
                 projected_missing_fields=projected_missing_fields,
+                allow_timestamp_tz_mismatch=True,
             )


@@ -1755,14 +1754,6 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
             (pa.Table.from_batches([batch]) for batch in itertools.chain([first_batch], batches)), promote_options="permissive"
         )

-        if property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, False):
-            deprecation_message(
-                deprecated_in="0.10.0",
-                removed_in="0.11.0",
-                help_message=f"Property `{PYARROW_USE_LARGE_TYPES_ON_READ}` will be removed.",
-            )
-            result = result.cast(arrow_schema)
-
         return result

     def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:

@@ -1849,13 +1840,18 @@ def _to_requested_schema(
     downcast_ns_timestamp_to_us: bool = False,
     include_field_ids: bool = False,
     projected_missing_fields: dict[int, Any] = EMPTY_DICT,
+    allow_timestamp_tz_mismatch: bool = False,
 ) -> pa.RecordBatch:
     # We could reuse some of these visitors
     struct_array = visit_with_partner(
         requested_schema,
         batch,
         ArrowProjectionVisitor(
-            file_schema, downcast_ns_timestamp_to_us, include_field_ids, projected_missing_fields=projected_missing_fields
+            file_schema,
+            downcast_ns_timestamp_to_us,
+            include_field_ids,
+            projected_missing_fields=projected_missing_fields,
+            allow_timestamp_tz_mismatch=allow_timestamp_tz_mismatch,
         ),
         ArrowAccessor(file_schema),
     )

@@ -1866,46 +1862,44 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, pa.Array | None]
     _file_schema: Schema
     _include_field_ids: bool
     _downcast_ns_timestamp_to_us: bool
-    _use_large_types: bool | None
     _projected_missing_fields: dict[int, Any]
+    _allow_timestamp_tz_mismatch: bool

     def __init__(
         self,
         file_schema: Schema,
         downcast_ns_timestamp_to_us: bool = False,
         include_field_ids: bool = False,
-        use_large_types: bool | None = None,
         projected_missing_fields: dict[int, Any] = EMPTY_DICT,
+        allow_timestamp_tz_mismatch: bool = False,
     ) -> None:
         self._file_schema = file_schema
         self._include_field_ids = include_field_ids
         self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
-        self._use_large_types = use_large_types
         self._projected_missing_fields = projected_missing_fields
-
-        if use_large_types is not None:
-            deprecation_message(
-                deprecated_in="0.10.0",
-                removed_in="0.11.0",
-                help_message="Argument `use_large_types` will be removed from ArrowProjectionVisitor",
-            )
+        # When True, allows projecting timestamptz (UTC) to timestamp (no tz).
+        # Allowed for reading (aligns with Spark); disallowed for writing to enforce Iceberg spec's strict typing.
+        self._allow_timestamp_tz_mismatch = allow_timestamp_tz_mismatch

     def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
         file_field = self._file_schema.find_field(field.field_id)

         if field.field_type.is_primitive:
             if (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
                 if field.field_type == TimestampType():
-                    # Downcasting of nanoseconds to microseconds
+                    source_tz_compatible = values.type.tz is None or (
+                        self._allow_timestamp_tz_mismatch and values.type.tz in UTC_ALIASES
+                    )
                     if (
                         pa.types.is_timestamp(target_type)
                         and not target_type.tz
                         and pa.types.is_timestamp(values.type)
-                        and not values.type.tz
+                        and source_tz_compatible
                     ):
+                        # Downcasting of nanoseconds to microseconds
                         if target_type.unit == "us" and values.type.unit == "ns" and self._downcast_ns_timestamp_to_us:
                             return values.cast(target_type, safe=False)
-                        elif target_type.unit == "us" and values.type.unit in {"s", "ms"}:
+                        elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}:
                             return values.cast(target_type)
                     raise ValueError(f"Unsupported schema projection from {values.type} to {target_type}")
                 elif field.field_type == TimestamptzType():

@@ -1915,6 +1909,7 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
                         and pa.types.is_timestamp(values.type)
                         and (values.type.tz in UTC_ALIASES or values.type.tz is None)
                     ):
+                        # Downcasting of nanoseconds to microseconds
                         if target_type.unit == "us" and values.type.unit == "ns" and self._downcast_ns_timestamp_to_us:
                             return values.cast(target_type, safe=False)
                         elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}:

@@ -1934,8 +1929,6 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
                 target_schema = schema_to_pyarrow(
                     promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids
                 )
-                if self._use_large_types is False:
-                    target_schema = _pyarrow_schema_ensure_small_types(target_schema)
                 return values.cast(target_schema)

         return values
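
Taken together, these changes make the timestamptz-to-timestamp projection a read-only allowance: _task_to_record_batches passes allow_timestamp_tz_mismatch=True, while the write path keeps the default False and rejects the same projection. A minimal standalone sketch in plain PyArrow (not the PyIceberg API) of the cast the read path now permits:

import pyarrow as pa

# A UTC-zoned microsecond timestamp column, built from epoch-microsecond integers.
utc_col = pa.array([1_700_000_000_000_000], type=pa.timestamp("us", tz="UTC"))
# Projected to a zone-less column of the same unit: the stored values are unchanged,
# only the UTC annotation is dropped. This mirrors what ArrowProjectionVisitor allows
# when allow_timestamp_tz_mismatch is True and values.type.tz is one of the UTC aliases.
naive_col = utc_col.cast(pa.timestamp("us"))
assert naive_col.type == pa.timestamp("us")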

pyiceberg/table/snapshots.py

Lines changed: 1 addition & 46 deletions
@@ -29,7 +29,6 @@
 from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, _manifests
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
-from pyiceberg.utils.deprecated import deprecation_message

 if TYPE_CHECKING:
     from pyiceberg.table.metadata import TableMetadata

@@ -344,54 +343,10 @@ def _partition_summary(self, update_metrics: UpdateMetrics) -> str:
         return ",".join([f"{prop}={val}" for prop, val in update_metrics.to_dict().items()])


-def _truncate_table_summary(summary: Summary, previous_summary: Mapping[str, str]) -> Summary:
-    for prop in {
-        TOTAL_DATA_FILES,
-        TOTAL_DELETE_FILES,
-        TOTAL_RECORDS,
-        TOTAL_FILE_SIZE,
-        TOTAL_POSITION_DELETES,
-        TOTAL_EQUALITY_DELETES,
-    }:
-        summary[prop] = "0"
-
-    def get_prop(prop: str) -> int:
-        value = previous_summary.get(prop) or "0"
-        try:
-            return int(value)
-        except ValueError as e:
-            raise ValueError(f"Could not parse summary property {prop} to an int: {value}") from e
-
-    if value := get_prop(TOTAL_DATA_FILES):
-        summary[DELETED_DATA_FILES] = str(value)
-    if value := get_prop(TOTAL_DELETE_FILES):
-        summary[REMOVED_DELETE_FILES] = str(value)
-    if value := get_prop(TOTAL_RECORDS):
-        summary[DELETED_RECORDS] = str(value)
-    if value := get_prop(TOTAL_FILE_SIZE):
-        summary[REMOVED_FILE_SIZE] = str(value)
-    if value := get_prop(TOTAL_POSITION_DELETES):
-        summary[REMOVED_POSITION_DELETES] = str(value)
-    if value := get_prop(TOTAL_EQUALITY_DELETES):
-        summary[REMOVED_EQUALITY_DELETES] = str(value)
-
-    return summary
-
-
-def update_snapshot_summaries(
-    summary: Summary, previous_summary: Mapping[str, str] | None = None, truncate_full_table: bool = False
-) -> Summary:
+def update_snapshot_summaries(summary: Summary, previous_summary: Mapping[str, str] | None = None) -> Summary:
     if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE}:
         raise ValueError(f"Operation not implemented: {summary.operation}")

-    if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None:
-        deprecation_message(
-            deprecated_in="0.10.0",
-            removed_in="0.11.0",
-            help_message="The truncate-full-table shouldn't be used.",
-        )
-        summary = _truncate_table_summary(summary, previous_summary)
-
     if not previous_summary:
         previous_summary = {
             TOTAL_DATA_FILES: "0",
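
With truncate_full_table and the _truncate_table_summary helper removed, update_snapshot_summaries only combines the new summary's metrics with the previous snapshot's totals. A minimal usage sketch of the simplified signature; the summary property values are made up for illustration.

from pyiceberg.table.snapshots import Operation, Summary, update_snapshot_summaries

new_summary = update_snapshot_summaries(
    summary=Summary(operation=Operation.APPEND, **{"added-data-files": "1", "added-records": "100"}),
    previous_summary={"total-data-files": "0", "total-records": "0"},
)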

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@
 # under the License.
 [project]
 name = "pyiceberg"
-version = "0.10.0"
+version = "0.11.0"
 description = "Apache Iceberg is an open table format for huge analytic datasets"
 authors = [{ name = "Apache Software Foundation", email = "dev@iceberg.apache.org" }]
 requires-python = ">=3.10.0,<4.0.0"

tests/integration/test_reads.py

Lines changed: 0 additions & 44 deletions
@@ -46,7 +46,6 @@
     NotNaN,
     NotNull,
 )
-from pyiceberg.io import PYARROW_USE_LARGE_TYPES_ON_READ
 from pyiceberg.io.pyarrow import (
     pyarrow_to_schema,
 )

@@ -1126,49 +1125,6 @@ def test_table_scan_keep_types(catalog: Catalog) -> None:
     assert result_table.schema.equals(expected_schema)


-@pytest.mark.integration
-@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")])
-def test_table_scan_override_with_small_types(catalog: Catalog) -> None:
-    identifier = "default.test_table_scan_override_with_small_types"
-    arrow_table = pa.Table.from_arrays(
-        [
-            pa.array(["a", "b", "c"]),
-            pa.array(["a", "b", "c"]),
-            pa.array([b"a", b"b", b"c"]),
-            pa.array([["a", "b"], ["c", "d"], ["e", "f"]]),
-        ],
-        names=["string", "string-to-binary", "binary", "list"],
-    )
-
-    try:
-        catalog.drop_table(identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = catalog.create_table(
-        identifier,
-        schema=arrow_table.schema,
-    )
-
-    tbl.append(arrow_table)
-
-    with tbl.update_schema() as update_schema:
-        update_schema.update_column("string-to-binary", BinaryType())
-
-    tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = "False"
-    result_table = tbl.scan().to_arrow()
-
-    expected_schema = pa.schema(
-        [
-            pa.field("string", pa.string()),
-            pa.field("string-to-binary", pa.large_binary()),
-            pa.field("binary", pa.binary()),
-            pa.field("list", pa.list_(pa.string())),
-        ]
-    )
-    assert result_table.schema.equals(expected_schema)
-
-
 @pytest.mark.integration
 @pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")])
 def test_empty_scan_ordered_str(catalog: Catalog) -> None:

tests/io/test_pyarrow.py

Lines changed: 101 additions & 0 deletions
@@ -81,6 +81,7 @@
     expression_to_pyarrow,
     parquet_path_to_id_mapping,
     schema_to_pyarrow,
+    write_file,
 )
 from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
 from pyiceberg.partitioning import PartitionField, PartitionSpec

@@ -2725,6 +2726,106 @@ def test__to_requested_schema_timestamp_to_timestamptz_projection() -> None:
     assert expected.equals(actual_result)


+def test__to_requested_schema_timestamptz_to_timestamp_projection() -> None:
+    # file is written with timestamp with timezone
+    file_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False))
+    batch = pa.record_batch(
+        [
+            pa.array(
+                [
+                    datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc),
+                    datetime(2025, 8, 14, 13, 0, 0, tzinfo=timezone.utc),
+                ],
+                type=pa.timestamp("us", tz="UTC"),
+            )
+        ],
+        names=["ts_field"],
+    )
+
+    # table schema expects timestamp without timezone
+    table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))
+
+    # allow_timestamp_tz_mismatch=True enables reading timestamptz as timestamp
+    actual_result = _to_requested_schema(
+        table_schema, file_schema, batch, downcast_ns_timestamp_to_us=True, allow_timestamp_tz_mismatch=True
+    )
+    expected = pa.record_batch(
+        [
+            pa.array(
+                [
+                    datetime(2025, 8, 14, 12, 0, 0),
+                    datetime(2025, 8, 14, 13, 0, 0),
+                ],
+                type=pa.timestamp("us"),
+            )
+        ],
+        names=["ts_field"],
+    )
+
+    # expect actual_result to have no timezone
+    assert expected.equals(actual_result)
+
+
+def test__to_requested_schema_timestamptz_to_timestamp_write_rejects() -> None:
+    """Test that the write path (default) rejects timestamptz to timestamp casting.
+
+    This ensures we enforce the Iceberg spec distinction between timestamp and timestamptz on writes,
+    while the read path can be more permissive (like Spark) via allow_timestamp_tz_mismatch=True.
+    """
+    # file is written with timestamp with timezone
+    file_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False))
+    batch = pa.record_batch(
+        [
+            pa.array(
+                [
+                    datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc),
+                    datetime(2025, 8, 14, 13, 0, 0, tzinfo=timezone.utc),
+                ],
+                type=pa.timestamp("us", tz="UTC"),
+            )
+        ],
+        names=["ts_field"],
+    )
+
+    # table schema expects timestamp without timezone
+    table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))
+
+    # allow_timestamp_tz_mismatch=False (default, used in write path) should raise
+    with pytest.raises(ValueError, match="Unsupported schema projection"):
+        _to_requested_schema(
+            table_schema, file_schema, batch, downcast_ns_timestamp_to_us=True, allow_timestamp_tz_mismatch=False
+        )
+
+
+def test_write_file_rejects_timestamptz_to_timestamp(tmp_path: Path) -> None:
+    """Test that write_file rejects writing timestamptz data to a timestamp column."""
+    from pyiceberg.table import WriteTask
+
+    # Table expects timestamp (no tz), but data has timestamptz
+    table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))
+    task_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False))
+
+    arrow_data = pa.table({"ts_field": [datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc)]})
+
+    table_metadata = TableMetadataV2(
+        location=f"file://{tmp_path}",
+        last_column_id=1,
+        format_version=2,
+        schemas=[table_schema],
+        partition_specs=[PartitionSpec()],
+    )
+
+    task = WriteTask(
+        write_uuid=uuid.uuid4(),
+        task_id=0,
+        record_batches=arrow_data.to_batches(),
+        schema=task_schema,
+    )
+
+    with pytest.raises(ValueError, match="Unsupported schema projection"):
+        list(write_file(io=PyArrowFileIO(), table_metadata=table_metadata, tasks=iter([task])))
+
+
 def test__to_requested_schema_timestamps(
     arrow_table_schema_with_all_timestamp_precisions: pa.Schema,
     arrow_table_with_all_timestamp_precisions: pa.Table,
