From 9adfd5b16a6381be6ebeeda83568f6d0040b887f Mon Sep 17 00:00:00 2001 From: Sam Verhasselt Date: Thu, 19 Feb 2026 16:25:07 -0800 Subject: [PATCH 1/2] Enable V3 manifest writing and row-lineage snapshot commits --- pyiceberg/manifest.py | 96 +++++++++++++++++++- pyiceberg/table/__init__.py | 2 +- pyiceberg/table/metadata.py | 6 +- pyiceberg/table/update/__init__.py | 12 ++- pyiceberg/table/update/snapshot.py | 25 +++-- tests/integration/test_reads.py | 8 +- tests/integration/test_writes/test_writes.py | 14 ++- tests/table/test_init.py | 11 ++- tests/table/test_metadata.py | 12 ++- tests/utils/test_manifest.py | 94 ++++++++++++++++++- 10 files changed, 244 insertions(+), 36 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index cca0af7628..9e1a3e61fb 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -54,7 +54,7 @@ UNASSIGNED_SEQ = -1 DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024 -DEFAULT_READ_VERSION: Literal[2] = 2 +DEFAULT_READ_VERSION: Literal[3] = 3 INITIAL_SEQUENCE_NUMBER = 0 @@ -852,6 +852,17 @@ def partitions(self) -> list[PartitionFieldSummary] | None: def key_metadata(self) -> bytes | None: return self._data[14] + @property + def first_row_id(self) -> int | None: + return self._data[15] if len(self._data) > 15 else None + + @first_row_id.setter + def first_row_id(self, value: int | None) -> None: + if len(self._data) <= 15: + self._data.append(value) + else: + self._data[15] = value + def has_added_files(self) -> bool: return self.added_files_count is None or self.added_files_count > 0 @@ -1240,6 +1251,12 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry: return entry +class ManifestWriterV3(ManifestWriterV2): + @property + def version(self) -> TableVersion: + return 3 + + def write_manifest( format_version: TableVersion, spec: PartitionSpec, @@ -1252,6 +1269,8 @@ def write_manifest( return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression) elif format_version == 2: return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression) + elif format_version == 3: + return ManifestWriterV3(spec, schema, output_file, snapshot_id, avro_compression) else: raise ValueError(f"Cannot write manifest for table version: {format_version}") @@ -1295,6 +1314,10 @@ def __exit__( @abstractmethod def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: ... + @property + def next_row_id(self) -> int | None: + return None + def add_manifests(self, manifest_files: list[ManifestFile]) -> ManifestListWriter: self._writer.write_block([self.prepare_manifest(manifest_file) for manifest_file in manifest_files]) return self @@ -1351,9 +1374,7 @@ def __init__( self._commit_snapshot_id = snapshot_id self._sequence_number = sequence_number - def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: - wrapped_manifest_file = copy(manifest_file) - + def _prepare_manifest_for_commit(self, wrapped_manifest_file: ManifestFile) -> ManifestFile: if wrapped_manifest_file.sequence_number == UNASSIGNED_SEQ: # if the sequence number is being assigned here, then the manifest must be created by the current operation. # To validate this, check that the snapshot id matches the current commit @@ -1374,6 +1395,59 @@ def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: wrapped_manifest_file.min_sequence_number = self._sequence_number return wrapped_manifest_file + def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: + return self._prepare_manifest_for_commit(copy(manifest_file)) + + +class ManifestListWriterV3(ManifestListWriterV2): + _next_row_id: int + + def __init__( + self, + output_file: OutputFile, + snapshot_id: int, + parent_snapshot_id: int | None, + sequence_number: int, + snapshot_first_row_id: int, + compression: AvroCompressionCodec, + ): + super().__init__( + output_file=output_file, + snapshot_id=snapshot_id, + parent_snapshot_id=parent_snapshot_id, + sequence_number=sequence_number, + compression=compression, + ) + self._format_version = 3 + self._meta = { + "snapshot-id": str(snapshot_id), + "parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null", + "sequence-number": str(sequence_number), + "first-row-id": str(snapshot_first_row_id), + "format-version": "3", + AVRO_CODEC_KEY: compression, + } + self._next_row_id = snapshot_first_row_id + + @property + def next_row_id(self) -> int | None: + return self._next_row_id + + def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: + wrapped_manifest_file = self._prepare_manifest_for_commit(copy(manifest_file)) + + if wrapped_manifest_file.content == ManifestContent.DATA and wrapped_manifest_file.first_row_id is None: + if wrapped_manifest_file.existing_rows_count is None or wrapped_manifest_file.added_rows_count is None: + raise ValueError( + "Cannot assign first row id for a v3 manifest without existing-rows-count and added-rows-count: " + f"{wrapped_manifest_file.manifest_path}" + ) + + wrapped_manifest_file.first_row_id = self._next_row_id + self._next_row_id += wrapped_manifest_file.existing_rows_count + wrapped_manifest_file.added_rows_count + + return wrapped_manifest_file + def write_manifest_list( format_version: TableVersion, @@ -1382,6 +1456,7 @@ def write_manifest_list( parent_snapshot_id: int | None, sequence_number: int | None, avro_compression: AvroCompressionCodec, + snapshot_first_row_id: int | None = None, ) -> ManifestListWriter: if format_version == 1: return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id, avro_compression) @@ -1389,5 +1464,18 @@ def write_manifest_list( if sequence_number is None: raise ValueError(f"Sequence-number is required for V2 tables: {sequence_number}") return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number, avro_compression) + elif format_version == 3: + if sequence_number is None: + raise ValueError(f"Sequence-number is required for V3 tables: {sequence_number}") + if snapshot_first_row_id is None: + raise ValueError(f"snapshot_first_row_id is required for V3 tables: {snapshot_first_row_id}") + return ManifestListWriterV3( + output_file=output_file, + snapshot_id=snapshot_id, + parent_snapshot_id=parent_snapshot_id, + sequence_number=sequence_number, + snapshot_first_row_id=snapshot_first_row_id, + compression=avro_compression, + ) else: raise ValueError(f"Cannot write manifest list for table version: {format_version}") diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index cc0d9ff341..d20890dce3 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -335,7 +335,7 @@ def upgrade_table_version(self, format_version: TableVersion) -> Transaction: Returns: The alter table builder. """ - if format_version not in {1, 2}: + if format_version not in {1, 2, 3}: raise ValueError(f"Unsupported table format version: {format_version}") if format_version < self.table_metadata.format_version: diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 26b6e3d3ad..329d4c3a7a 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -66,7 +66,7 @@ INITIAL_SPEC_ID = 0 DEFAULT_SCHEMA_ID = 0 -SUPPORTED_TABLE_FORMAT_VERSION = 2 +SUPPORTED_TABLE_FORMAT_VERSION = 3 def cleanup_snapshot_id(data: dict[str, Any]) -> dict[str, Any]: @@ -574,9 +574,6 @@ def construct_refs(self) -> TableMetadata: next_row_id: int | None = Field(alias="next-row-id", default=None) """A long higher than all assigned row IDs; the next snapshot's `first-row-id`.""" - def model_dump_json(self, exclude_none: bool = True, exclude: Any | None = None, by_alias: bool = True, **kwargs: Any) -> str: - raise NotImplementedError("Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551") - TableMetadata = Annotated[TableMetadataV1 | TableMetadataV2 | TableMetadataV3, Field(discriminator="format_version")] @@ -645,6 +642,7 @@ def new_table_metadata( properties=properties, last_partition_id=fresh_partition_spec.last_assigned_field_id, table_uuid=table_uuid, + next_row_id=0, ) else: raise ValidationError(f"Unknown format version: {format_version}") diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 455b46953c..de217cbb12 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -28,7 +28,7 @@ from pyiceberg.exceptions import CommitFailedException from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil +from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil, TableMetadataV3 from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( MetadataLogEntry, @@ -320,9 +320,17 @@ def _( return base_metadata updated_metadata = base_metadata.model_copy(update={"format_version": update.format_version}) + updated_metadata = TableMetadataUtil._construct_without_validation(updated_metadata) + + if ( + isinstance(updated_metadata, TableMetadataV3) + and base_metadata.format_version < 3 + and updated_metadata.next_row_id is None + ): + updated_metadata = updated_metadata.model_copy(update={"next_row_id": 0}) context.add_update(update) - return TableMetadataUtil._construct_without_validation(updated_metadata) + return updated_metadata @_apply_table_update.register(SetPropertiesUpdate) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index c88337e724..70b0f3354a 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -276,7 +276,8 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: def _commit(self) -> UpdatesAndRequirements: new_manifests = self._manifests() - next_sequence_number = self._transaction.table_metadata.next_sequence_number() + table_metadata = self._transaction.table_metadata + next_sequence_number = table_metadata.next_sequence_number() summary = self._summary(self.snapshot_properties) file_name = _new_manifest_list_file_name( @@ -287,20 +288,31 @@ def _commit(self) -> UpdatesAndRequirements: location_provider = self._transaction._table.location_provider() manifest_list_file_path = location_provider.new_metadata_location(file_name) + snapshot_first_row_id: int | None = None + if table_metadata.format_version >= 3: + snapshot_first_row_id = table_metadata.next_row_id + if snapshot_first_row_id is None: + raise ValueError("Cannot commit to a v3 table without next-row-id") + with write_manifest_list( - format_version=self._transaction.table_metadata.format_version, + format_version=table_metadata.format_version, output_file=self._io.new_output(manifest_list_file_path), snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, sequence_number=next_sequence_number, avro_compression=self._compression, + snapshot_first_row_id=snapshot_first_row_id, ) as writer: writer.add_manifests(new_manifests) - first_row_id: int | None = None + added_rows: int | None = None + if table_metadata.format_version >= 3: + writer_next_row_id = writer.next_row_id + if writer_next_row_id is None or snapshot_first_row_id is None: + raise ValueError("Cannot determine assigned rows for a v3 snapshot commit") + added_rows = writer_next_row_id - snapshot_first_row_id - if self._transaction.table_metadata.format_version >= 3: - first_row_id = self._transaction.table_metadata.next_row_id + first_row_id: int | None = snapshot_first_row_id snapshot = Snapshot( snapshot_id=self._snapshot_id, @@ -308,8 +320,9 @@ def _commit(self) -> UpdatesAndRequirements: manifest_list=manifest_list_file_path, sequence_number=next_sequence_number, summary=summary, - schema_id=self._transaction.table_metadata.current_schema_id, + schema_id=table_metadata.current_schema_id, first_row_id=first_row_id, + added_rows=added_rows, ) add_snapshot_update = AddSnapshotUpdate(snapshot=snapshot) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 6c8b4a20a7..2884ece400 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -968,10 +968,10 @@ def test_upgrade_table_version(catalog: Catalog) -> None: transaction.upgrade_table_version(format_version=1) assert "Cannot downgrade v2 table to v1" in str(e.value) - with pytest.raises(ValueError) as e: - with table_test_table_version.transaction() as transaction: - transaction.upgrade_table_version(format_version=3) - assert "Unsupported table format version: 3" in str(e.value) + with table_test_table_version.transaction() as transaction: + transaction.upgrade_table_version(format_version=3) + + assert table_test_table_version.format_version == 3 @pytest.mark.integration diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index e17c8ef633..0028bb1ebc 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -2336,10 +2336,10 @@ def test_nanosecond_support_on_catalog( _create_table(session_catalog, identifier, {"format-version": "3"}, schema=arrow_table_schema_with_all_timestamp_precisions) - with pytest.raises(NotImplementedError, match="Writing V3 is not yet supported"): - catalog.create_table( - "ns.table1", schema=arrow_table_schema_with_all_timestamp_precisions, properties={"format-version": "3"} - ) + nanosecond_table = catalog.create_table( + "ns.table1", schema=arrow_table_schema_with_all_timestamp_precisions, properties={"format-version": "3"} + ) + assert nanosecond_table.format_version == 3 with pytest.raises( UnsupportedPyArrowTypeException, match=re.escape("Column 'timestamp_ns' has an unsupported type: timestamp[ns]") @@ -2495,7 +2495,6 @@ def test_stage_only_overwrite_files( assert parent_snapshot_id == [None, first_snapshot, second_snapshot, second_snapshot, second_snapshot] -@pytest.mark.skip("V3 writer support is not enabled.") @pytest.mark.integration def test_v3_write_and_read_row_lineage(spark: SparkSession, session_catalog: Catalog) -> None: """Test writing to a v3 table and reading with Spark.""" @@ -2528,6 +2527,11 @@ def test_v3_write_and_read_row_lineage(spark: SparkSession, session_catalog: Cat tbl.append(test_data) + current_snapshot = tbl.current_snapshot() + assert current_snapshot is not None + assert current_snapshot.first_row_id == initial_next_row_id + assert current_snapshot.added_rows == len(test_data) + assert tbl.metadata.next_row_id == initial_next_row_id + len(test_data), ( "Expected next_row_id to be incremented by the number of added rows" ) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index aef6e3cc5f..591361d729 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -41,7 +41,7 @@ Table, TableIdentifier, ) -from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV2, _generate_snapshot_id +from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV2, TableMetadataV3, _generate_snapshot_id from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( MetadataLogEntry, @@ -81,6 +81,7 @@ SetPropertiesUpdate, SetSnapshotRefUpdate, SetStatisticsUpdate, + UpgradeFormatVersionUpdate, _apply_table_update, _TableMetadataUpdateContext, update_table_metadata, @@ -942,6 +943,14 @@ def test_update_metadata_update_sort_order_invalid(table_v2: Table) -> None: update_table_metadata(table_v2.metadata, (SetDefaultSortOrderUpdate(sort_order_id=invalid_order_id),)) +def test_upgrade_format_version_to_v3_initializes_next_row_id(table_v2: Table) -> None: + new_metadata = update_table_metadata(table_v2.metadata, (UpgradeFormatVersionUpdate(format_version=3),)) + + assert isinstance(new_metadata, TableMetadataV3) + assert new_metadata.format_version == 3 + assert new_metadata.next_row_id == 0 + + def test_update_metadata_with_multiple_updates(table_v1: Table) -> None: base_metadata = table_v1.metadata transaction = table_v1.transaction() diff --git a/tests/table/test_metadata.py b/tests/table/test_metadata.py index c163c90626..f374d93282 100644 --- a/tests/table/test_metadata.py +++ b/tests/table/test_metadata.py @@ -184,12 +184,13 @@ def test_serialize_v2(example_table_metadata_v2: dict[str, Any]) -> None: def test_serialize_v3(example_table_metadata_v3: dict[str, Any]) -> None: - # Writing will be part of https://github.com/apache/iceberg-python/issues/1551 - - with pytest.raises(NotImplementedError) as exc_info: - _ = TableMetadataV3(**example_table_metadata_v3).model_dump_json() + table_metadata = TableMetadataV3(**example_table_metadata_v3) + table_metadata_json = table_metadata.model_dump_json() + parsed = json.loads(table_metadata_json) - assert "Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551" in str(exc_info.value) + assert parsed["format-version"] == 3 + assert parsed["next-row-id"] == 1 + assert TableMetadataV3(**parsed) == table_metadata def test_migrate_v1_schemas(example_table_metadata_v1: dict[str, Any]) -> None: @@ -837,6 +838,7 @@ def test_new_table_metadata_with_v3_schema() -> None: default_sort_order_id=1, refs={}, format_version=3, + next_row_id=0, ) assert actual.model_dump() == expected.model_dump() diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index 3f859b3b32..11023b88bf 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -362,7 +362,7 @@ def test_write_empty_manifest() -> None: pass -@pytest.mark.parametrize("format_version", [1, 2]) +@pytest.mark.parametrize("format_version", [1, 2, 3]) @pytest.mark.parametrize("compression", ["null", "deflate", "zstd"]) def test_write_manifest( generated_manifest_file_file_v1: str, @@ -534,7 +534,7 @@ def test_write_manifest( assert data_file.sort_order_id == 0 -@pytest.mark.parametrize("format_version", [1, 2]) +@pytest.mark.parametrize("format_version", [1, 2, 3]) @pytest.mark.parametrize("parent_snapshot_id", [19, None]) @pytest.mark.parametrize("compression", ["null", "deflate"]) def test_write_manifest_list( @@ -566,6 +566,7 @@ def test_write_manifest_list( parent_snapshot_id=parent_snapshot_id, sequence_number=0, avro_compression=compression, + snapshot_first_row_id=0 if format_version == 3 else None, ) as writer: writer.add_manifests(demo_manifest_list) new_manifest_list = list(read_manifest_list(io.new_input(path))) @@ -575,8 +576,10 @@ def test_write_manifest_list( else: expected_metadata = {"snapshot-id": "25", "parent-snapshot-id": "null", "format-version": str(format_version)} - if format_version == 2: + if format_version in (2, 3): expected_metadata["sequence-number"] = "0" + if format_version == 3: + expected_metadata["first-row-id"] = "0" _verify_metadata_with_fastavro(path, expected_metadata) manifest_file = new_manifest_list[0] @@ -618,6 +621,89 @@ def test_write_manifest_list( assert entry.status == ManifestEntryStatus.ADDED +def test_write_manifest_list_v3_requires_snapshot_first_row_id() -> None: + io = load_file_io() + + with TemporaryDirectory() as tmp_dir: + path = tmp_dir + "/manifest-list.avro" + output = io.new_output(path) + with pytest.raises(ValueError, match="snapshot_first_row_id is required for V3 tables"): + with write_manifest_list( + format_version=3, + output_file=output, + snapshot_id=25, + parent_snapshot_id=19, + sequence_number=7, + avro_compression="null", + ): + pass + + +def test_write_manifest_list_v3_assigns_first_row_id_and_tracks_next_row_id() -> None: + io = load_file_io() + commit_snapshot_id = 25 + + data_manifest = ManifestFile.from_args( + manifest_path="/tmp/data-manifest.avro", + manifest_length=1, + partition_spec_id=0, + content=ManifestContent.DATA, + sequence_number=-1, + min_sequence_number=-1, + added_snapshot_id=commit_snapshot_id, + added_files_count=1, + existing_files_count=1, + deleted_files_count=0, + added_rows_count=7, + existing_rows_count=5, + deleted_rows_count=0, + partitions=[], + key_metadata=None, + first_row_id=None, + _table_format_version=3, + ) + + data_manifest_with_existing_row_id = ManifestFile.from_args( + manifest_path="/tmp/data-manifest-with-row-id.avro", + manifest_length=1, + partition_spec_id=0, + content=ManifestContent.DATA, + sequence_number=9, + min_sequence_number=9, + added_snapshot_id=23, + added_files_count=1, + existing_files_count=0, + deleted_files_count=0, + added_rows_count=3, + existing_rows_count=0, + deleted_rows_count=0, + partitions=[], + key_metadata=None, + first_row_id=100, + _table_format_version=3, + ) + + with TemporaryDirectory() as tmp_dir: + path = tmp_dir + "/manifest-list.avro" + output = io.new_output(path) + with write_manifest_list( + format_version=3, + output_file=output, + snapshot_id=commit_snapshot_id, + parent_snapshot_id=19, + sequence_number=7, + avro_compression="null", + snapshot_first_row_id=11, + ) as writer: + writer.add_manifests([data_manifest, data_manifest_with_existing_row_id]) + assert writer.next_row_id == 23 + + new_manifest_list = list(read_manifest_list(io.new_input(path))) + + assert new_manifest_list[0].first_row_id == 11 + assert new_manifest_list[1].first_row_id == 100 + + @pytest.mark.parametrize( "raw_file_format,expected_file_format", [ @@ -899,7 +985,7 @@ def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None: assert ref is references[0], f"All references to manifest {i} should be the same object instance" -@pytest.mark.parametrize("format_version", [1, 2]) +@pytest.mark.parametrize("format_version", [1, 2, 3]) def test_manifest_writer_tell(format_version: TableVersion) -> None: io = load_file_io() test_schema = Schema(NestedField(1, "foo", IntegerType(), False)) From b50a01a987e792a4c1a8aacfb2aa507ed27a2c78 Mon Sep 17 00:00:00 2001 From: Sam Verhasselt Date: Thu, 19 Feb 2026 16:37:08 -0800 Subject: [PATCH 2/2] Tighten v3 AddSnapshotUpdate validation invariants --- pyiceberg/table/update/__init__.py | 31 ++++++++++++++---------- tests/table/test_init.py | 39 ++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 13 deletions(-) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index de217cbb12..8b2e2037d3 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -441,7 +441,7 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe elif base_metadata.snapshot_by_id(update.snapshot.snapshot_id) is not None: raise ValueError(f"Snapshot with id {update.snapshot.snapshot_id} already exists") elif ( - base_metadata.format_version == 2 + base_metadata.format_version >= 2 and update.snapshot.sequence_number is not None and update.snapshot.sequence_number <= base_metadata.last_sequence_number and update.snapshot.parent_snapshot_id is not None @@ -462,20 +462,25 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe f"Cannot add a snapshot with first row id smaller than the table's next-row-id " f"{update.snapshot.first_row_id} < {base_metadata.next_row_id}" ) + elif base_metadata.format_version >= 3 and update.snapshot.added_rows is None: + raise ValueError("Cannot add snapshot without added rows") + elif base_metadata.format_version >= 3 and base_metadata.next_row_id is None: + raise ValueError("Cannot add a snapshot when table next-row-id is null") + + metadata_updates: dict[str, Any] = { + "last_updated_ms": update.snapshot.timestamp_ms, + "last_sequence_number": update.snapshot.sequence_number, + "snapshots": base_metadata.snapshots + [update.snapshot], + } + if base_metadata.format_version >= 3: + next_row_id = base_metadata.next_row_id + added_rows = update.snapshot.added_rows + if next_row_id is None or added_rows is None: + raise ValueError("Cannot compute next-row-id for v3 snapshot update") + metadata_updates["next_row_id"] = next_row_id + added_rows context.add_update(update) - return base_metadata.model_copy( - update={ - "last_updated_ms": update.snapshot.timestamp_ms, - "last_sequence_number": update.snapshot.sequence_number, - "snapshots": base_metadata.snapshots + [update.snapshot], - "next_row_id": base_metadata.next_row_id + update.snapshot.added_rows - if base_metadata.format_version >= 3 - and base_metadata.next_row_id is not None - and update.snapshot.added_rows is not None - else None, - } - ) + return base_metadata.model_copy(update=metadata_updates) @_apply_table_update.register(SetSnapshotRefUpdate) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 591361d729..79db2954f3 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -1714,6 +1714,45 @@ def test_add_snapshot_update_fails_with_smaller_first_row_id(table_v3: Table) -> update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) +def test_add_snapshot_update_fails_without_added_rows(table_v3: Table) -> None: + new_snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + sequence_number=200, + timestamp_ms=1602638593590, + manifest_list="s3:/a/b/c.avro", + summary=Summary(Operation.APPEND), + schema_id=3, + first_row_id=2, + ) + + with pytest.raises( + ValueError, + match="Cannot add snapshot without added rows", + ): + update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) + + +def test_add_snapshot_update_fails_with_stale_sequence_number_in_v3(table_v3: Table) -> None: + new_snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + sequence_number=34, + timestamp_ms=1602638593590, + manifest_list="s3:/a/b/c.avro", + summary=Summary(Operation.APPEND), + schema_id=3, + first_row_id=2, + added_rows=1, + ) + + with pytest.raises( + ValueError, + match="Cannot add snapshot with sequence number 34 older than last sequence number 34", + ): + update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) + + def test_add_snapshot_update_updates_next_row_id(table_v3: Table) -> None: new_snapshot = Snapshot( snapshot_id=25,