diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index cc0d9ff341..fb22cf541e 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -26,25 +26,12 @@ from functools import cached_property from itertools import chain from types import TracebackType -from typing import ( - TYPE_CHECKING, - Any, - TypeVar, -) +from typing import TYPE_CHECKING, Any, TypeVar from pydantic import Field import pyiceberg.expressions.parser as parser -from pyiceberg.expressions import ( - AlwaysFalse, - AlwaysTrue, - And, - BooleanExpression, - EqualTo, - IsNull, - Or, - Reference, -) +from pyiceberg.expressions import AlwaysFalse, AlwaysTrue, And, BooleanExpression, EqualTo, IsNull, Or, Reference from pyiceberg.expressions.visitors import ( ResidualEvaluator, _InclusiveMetricsEvaluator, @@ -54,36 +41,17 @@ manifest_evaluator, ) from pyiceberg.io import FileIO, load_file_io -from pyiceberg.manifest import ( - DataFile, - DataFileContent, - ManifestContent, - ManifestEntry, - ManifestFile, -) -from pyiceberg.partitioning import ( - PARTITION_FIELD_ID_START, - UNPARTITIONED_PARTITION_SPEC, - PartitionKey, - PartitionSpec, -) +from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestEntry, ManifestFile +from pyiceberg.partitioning import PARTITION_FIELD_ID_START, UNPARTITIONED_PARTITION_SPEC, PartitionKey, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table.delete_file_index import DeleteFileIndex from pyiceberg.table.inspect import InspectTable from pyiceberg.table.locations import LocationProvider, load_location_provider from pyiceberg.table.maintenance import MaintenanceTable -from pyiceberg.table.metadata import ( - INITIAL_SEQUENCE_NUMBER, - TableMetadata, -) -from pyiceberg.table.name_mapping import ( - NameMapping, -) +from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadata +from pyiceberg.table.name_mapping import NameMapping from pyiceberg.table.refs import MAIN_BRANCH, 
SnapshotRef -from pyiceberg.table.snapshots import ( - Snapshot, - SnapshotLogEntry, -) +from pyiceberg.table.snapshots import Snapshot, SnapshotLogEntry from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.table.update import ( AddPartitionSpecUpdate, @@ -107,11 +75,7 @@ update_table_metadata, ) from pyiceberg.table.update.schema import UpdateSchema -from pyiceberg.table.update.snapshot import ( - ManageSnapshots, - UpdateSnapshot, - _FastAppendFiles, -) +from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _FastAppendFiles from pyiceberg.table.update.sorting import UpdateSortOrder from pyiceberg.table.update.spec import UpdateSpec from pyiceberg.table.update.statistics import UpdateStatistics @@ -126,9 +90,7 @@ Record, TableVersion, ) -from pyiceberg.types import ( - strtobool, -) +from pyiceberg.types import strtobool from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.config import Config from pyiceberg.utils.properties import property_as_bool @@ -144,11 +106,7 @@ from pyiceberg_core.datafusion import IcebergDataFusionTable from pyiceberg.catalog import Catalog - from pyiceberg.catalog.rest.scan_planning import ( - RESTContentFile, - RESTDeleteFile, - RESTFileScanTask, - ) + from pyiceberg.catalog.rest.scan_planning import RESTContentFile, RESTDeleteFile, RESTFileScanTask ALWAYS_TRUE = AlwaysTrue() DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" @@ -396,16 +354,20 @@ def _set_ref_snapshot( return updates, requirements - def _build_partition_predicate(self, partition_records: set[Record]) -> BooleanExpression: + def _build_partition_predicate( + self, partition_records: set[Record], spec: PartitionSpec | None = None, schema: Schema | None = None + ) -> BooleanExpression: """Build a filter predicate matching any of the input partition records. 
Args: partition_records: A set of partition records to match + spec: An optional partition spec, if none then defaults to current + schema: An optional schema, if none then defaults to current Returns: A predicate matching any of the input partition records. """ - partition_spec = self.table_metadata.spec() - schema = self.table_metadata.schema() + partition_spec = spec or self.table_metadata.spec() + schema = schema or self.table_metadata.schema() partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields] expr: BooleanExpression = AlwaysFalse() @@ -673,11 +635,7 @@ def delete( case_sensitive: A bool determine if the provided `delete_filter` is case-sensitive branch: Branch Reference to run the delete operation """ - from pyiceberg.io.pyarrow import ( - ArrowScan, - _dataframe_to_data_files, - _expression_to_complementary_pyarrow, - ) + from pyiceberg.io.pyarrow import ArrowScan, _dataframe_to_data_files, _expression_to_complementary_pyarrow if ( self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index c88337e724..be7e847791 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -26,11 +26,7 @@ from typing import TYPE_CHECKING, Generic from pyiceberg.avro.codecs import AvroCompressionCodec -from pyiceberg.expressions import ( - AlwaysFalse, - BooleanExpression, - Or, -) +from pyiceberg.expressions import AlwaysFalse, BooleanExpression, Or from pyiceberg.expressions.visitors import ( ROWS_MIGHT_NOT_MATCH, ROWS_MUST_MATCH, @@ -51,9 +47,8 @@ write_manifest, write_manifest_list, ) -from pyiceberg.partitioning import ( - PartitionSpec, -) +from pyiceberg.partitioning import PartitionSpec +from pyiceberg.schema import Schema from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRefType from pyiceberg.table.snapshots import ( Operation, @@ -76,10 +71,7 @@ 
UpdatesAndRequirements, UpdateTableMetadata, ) -from pyiceberg.typedef import ( - EMPTY_DICT, - KeyDefaultDict, -) +from pyiceberg.typedef import EMPTY_DICT, KeyDefaultDict, Record from pyiceberg.utils.bin_packing import ListPacker from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.datetime import datetime_to_millis @@ -110,6 +102,8 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): _deleted_data_files: set[DataFile] _compression: AvroCompressionCodec _target_branch: str | None + _predicate: BooleanExpression + _case_sensitive: bool def __init__( self, @@ -138,6 +132,8 @@ def __init__( self._parent_snapshot_id = ( snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.snapshot_by_name(self._target_branch)) else None ) + self._predicate = AlwaysFalse() + self._case_sensitive = True def _validate_target_branch(self, branch: str | None) -> str | None: # if branch is none, write will be written into a staging snapshot @@ -182,13 +178,8 @@ def _process_manifests(self, manifests: list[ManifestFile]) -> list[ManifestFile def _manifests(self) -> list[ManifestFile]: def _write_added_manifest() -> list[ManifestFile]: if self._added_data_files: - with write_manifest( - format_version=self._transaction.table_metadata.format_version, + with self.new_manifest_writer( spec=self._transaction.table_metadata.spec(), - schema=self._transaction.table_metadata.schema(), - output_file=self.new_manifest_output(), - snapshot_id=self._snapshot_id, - avro_compression=self._compression, ) as writer: for data_file in self._added_data_files: writer.add( @@ -213,14 +204,7 @@ def _write_delete_manifest() -> list[ManifestFile]: for deleted_entry in deleted_entries: partition_groups[deleted_entry.data_file.spec_id].append(deleted_entry) for spec_id, entries in partition_groups.items(): - with write_manifest( - format_version=self._transaction.table_metadata.format_version, - spec=self._transaction.table_metadata.specs()[spec_id], - 
schema=self._transaction.table_metadata.schema(), - output_file=self.new_manifest_output(), - snapshot_id=self._snapshot_id, - avro_compression=self._compression, - ) as writer: + with self.new_manifest_writer(self.spec(spec_id)) as writer: for entry in entries: writer.add_entry(entry) deleted_manifests.append(writer.to_manifest_file()) @@ -228,6 +212,9 @@ def _write_delete_manifest() -> list[ManifestFile]: else: return [] + # Updates self._predicate with computed partition predicate for manifest pruning + self._build_delete_files_partition_predicate() + executor = ExecutorFactory.get_or_create() added_manifests = executor.submit(_write_added_manifest) @@ -344,6 +331,12 @@ def _commit(self) -> UpdatesAndRequirements: def snapshot_id(self) -> int: return self._snapshot_id + def schema(self, schema_id: int | None = None) -> Schema: + if schema_id is not None:  # 0 is a valid schema id, so compare against None instead of relying on truthiness + if schema := self._transaction.table_metadata.schema_by_id(schema_id): + return schema + return self._transaction.table_metadata.schema() + def spec(self, spec_id: int) -> PartitionSpec: return self._transaction.table_metadata.specs()[spec_id] @@ -351,7 +344,7 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter: return write_manifest( format_version=self._transaction.table_metadata.format_version, spec=spec, - schema=self._transaction.table_metadata.schema(), + schema=self.schema(), output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, avro_compression=self._compression, @@ -366,6 +359,35 @@ def new_manifest_output(self) -> OutputFile: def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> list[ManifestEntry]: return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted) + def _build_partition_projection(self, spec_id: int) -> BooleanExpression: + project = inclusive_projection(self.schema(), self.spec(spec_id), self._case_sensitive) + return project(self._predicate) + + @cached_property + def partition_filters(self) ->
KeyDefaultDict[int, BooleanExpression]: + return KeyDefaultDict(self._build_partition_projection) + + def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]: + return manifest_evaluator(self.spec(spec_id), self.schema(), self.partition_filters[spec_id], self._case_sensitive) + + def delete_by_predicate(self, predicate: BooleanExpression, case_sensitive: bool = True) -> None: + self._predicate = Or(self._predicate, predicate) + self._case_sensitive = case_sensitive + + def _build_delete_files_partition_predicate(self) -> None: + """Build BooleanExpression based on deleted data files partitions.""" + partition_to_overwrite: dict[int, set[Record]] = {} + for data_file in self._deleted_data_files: + group = partition_to_overwrite.setdefault(data_file.spec_id, set()) + group.add(data_file.partition) + + for spec_id, data_files in partition_to_overwrite.items(): + self.delete_by_predicate( + self._transaction._build_partition_predicate( + partition_records=data_files, schema=self.schema(), spec=self.spec(spec_id) + ) + ) + class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]): """Will delete manifest entries from the current snapshot based on the predicate. 
@@ -377,22 +399,6 @@ class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]): From the specification """ - _predicate: BooleanExpression - _case_sensitive: bool - - def __init__( - self, - operation: Operation, - transaction: Transaction, - io: FileIO, - branch: str | None = MAIN_BRANCH, - commit_uuid: uuid.UUID | None = None, - snapshot_properties: dict[str, str] = EMPTY_DICT, - ): - super().__init__(operation, transaction, io, commit_uuid, snapshot_properties, branch) - self._predicate = AlwaysFalse() - self._case_sensitive = True - def _commit(self) -> UpdatesAndRequirements: # Only produce a commit when there is something to delete if self.files_affected: @@ -400,25 +406,6 @@ def _commit(self) -> UpdatesAndRequirements: else: return (), () - def _build_partition_projection(self, spec_id: int) -> BooleanExpression: - schema = self._transaction.table_metadata.schema() - spec = self._transaction.table_metadata.specs()[spec_id] - project = inclusive_projection(schema, spec, self._case_sensitive) - return project(self._predicate) - - @cached_property - def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: - return KeyDefaultDict(self._build_partition_projection) - - def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]: - schema = self._transaction.table_metadata.schema() - spec = self._transaction.table_metadata.specs()[spec_id] - return manifest_evaluator(spec, schema, self.partition_filters[spec_id], self._case_sensitive) - - def delete_by_predicate(self, predicate: BooleanExpression, case_sensitive: bool = True) -> None: - self._predicate = Or(self._predicate, predicate) - self._case_sensitive = case_sensitive - @cached_property def _compute_deletes(self) -> tuple[list[ManifestFile], list[ManifestEntry], bool]: """Computes all the delete operation and cache it when nothing changes. 
@@ -428,7 +415,6 @@ def _compute_deletes(self) -> tuple[list[ManifestFile], list[ManifestEntry], boo - The manifest-entries that are deleted based on the metadata. - Flag indicating that rewrites of data-files are needed. """ - schema = self._transaction.table_metadata.schema() def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry: return ManifestEntry.from_args( @@ -442,9 +428,11 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ) manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) - strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval + strict_metrics_evaluator = _StrictMetricsEvaluator( + self.schema(), self._predicate, case_sensitive=self._case_sensitive + ).eval inclusive_metrics_evaluator = _InclusiveMetricsEvaluator( - schema, self._predicate, case_sensitive=self._case_sensitive + self.schema(), self._predicate, case_sensitive=self._case_sensitive ).eval existing_manifests = [] @@ -486,7 +474,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> with write_manifest( format_version=self._transaction.table_metadata.format_version, spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id], - schema=self._transaction.table_metadata.schema(), + schema=self.schema(), output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, avro_compression=self._compression, @@ -609,36 +597,46 @@ def _existing_manifests(self) -> list[ManifestFile]: """Determine if there are any existing manifest files.""" existing_files = [] + manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._target_branch): for manifest_file in snapshot.manifests(io=self._io): - entries = 
manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True) - found_deleted_data_files = [entry.data_file for entry in entries if entry.data_file in self._deleted_data_files] + # The manifest evaluator rules out the partitions of the files to delete, so keep this manifest as-is + if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file): + existing_files.append(manifest_file) + continue + + entries_to_write: list[ManifestEntry] = []  # a list keeps the entry order of the source manifest + found_deleted_entries: list[ManifestEntry] = [] + + for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True): + if entry.data_file in self._deleted_data_files: + found_deleted_entries.append(entry) + else: + entries_to_write.append(entry) - if len(found_deleted_data_files) == 0: + # No entry of this manifest is being deleted (empty intersection), so keep it as-is + if not found_deleted_entries: existing_files.append(manifest_file) - else: - # We have to rewrite the manifest file without the deleted data files - if any(entry.data_file not in found_deleted_data_files for entry in entries): - with write_manifest( - format_version=self._transaction.table_metadata.format_version, - spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id], - schema=self._transaction.table_metadata.schema(), - output_file=self.new_manifest_output(), - snapshot_id=self._snapshot_id, - avro_compression=self._compression, - ) as writer: - for entry in entries: - if entry.data_file not in found_deleted_data_files: - writer.add_entry( - ManifestEntry.from_args( - status=ManifestEntryStatus.EXISTING, - snapshot_id=entry.snapshot_id, - sequence_number=entry.sequence_number, - file_sequence_number=entry.file_sequence_number, - data_file=entry.data_file, - ) - ) - existing_files.append(writer.to_manifest_file()) + continue + + # Every entry is deleted, so the manifest is dropped entirely + if not entries_to_write: + continue + + # We have to rewrite the manifest file without the deleted data files + with self.new_manifest_writer(self.spec(manifest_file.partition_spec_id)) as writer: +
for entry in entries_to_write: + writer.add_entry( + ManifestEntry.from_args( + status=ManifestEntryStatus.EXISTING, + snapshot_id=entry.snapshot_id, + sequence_number=entry.sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=entry.data_file, + ) + ) + existing_files.append(writer.to_manifest_file()) + return existing_files def _deleted_entries(self) -> list[ManifestEntry]: @@ -655,8 +653,12 @@ def _deleted_entries(self) -> list[ManifestEntry]: raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}") executor = ExecutorFactory.get_or_create() + manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]: + if not manifest_evaluators[manifest.partition_spec_id](manifest): + return [] + return [ ManifestEntry.from_args( status=ManifestEntryStatus.DELETED,