feat: add snapshot expiration methods with retention strategies#2369
feat: add snapshot expiration methods with retention strategies#2369ForeverAngry wants to merge 7 commits intoapache:mainfrom
Conversation
Anton-Tarazi
left a comment
There was a problem hiding this comment.
Nice job! Left some comments
pyiceberg/table/update/snapshot.py
Outdated
| protected_ids = self._get_protected_snapshot_ids() | ||
|
|
||
| # Sort snapshots by timestamp (most recent first) | ||
| sorted_snapshots = sorted(self._transaction.table_metadata.snapshots, key=lambda s: s.timestamp_ms, reverse=True) | ||
|
|
||
| # Keep the last N snapshots and all protected ones | ||
| snapshots_to_keep = set() | ||
| snapshots_to_keep.update(protected_ids) | ||
|
|
||
| # Add the N most recent snapshots | ||
| for i, snapshot in enumerate(sorted_snapshots): | ||
| if i < n: | ||
| snapshots_to_keep.add(snapshot.snapshot_id) | ||
|
|
||
| # Find snapshots to expire | ||
| snapshots_to_expire = [] | ||
| for snapshot in self._transaction.table_metadata.snapshots: | ||
| if snapshot.snapshot_id not in snapshots_to_keep: | ||
| snapshots_to_expire.append(snapshot.snapshot_id) |
There was a problem hiding this comment.
| protected_ids = self._get_protected_snapshot_ids() | |
| # Sort snapshots by timestamp (most recent first) | |
| sorted_snapshots = sorted(self._transaction.table_metadata.snapshots, key=lambda s: s.timestamp_ms, reverse=True) | |
| # Keep the last N snapshots and all protected ones | |
| snapshots_to_keep = set() | |
| snapshots_to_keep.update(protected_ids) | |
| # Add the N most recent snapshots | |
| for i, snapshot in enumerate(sorted_snapshots): | |
| if i < n: | |
| snapshots_to_keep.add(snapshot.snapshot_id) | |
| # Find snapshots to expire | |
| snapshots_to_expire = [] | |
| for snapshot in self._transaction.table_metadata.snapshots: | |
| if snapshot.snapshot_id not in snapshots_to_keep: | |
| snapshots_to_expire.append(snapshot.snapshot_id) | |
| snapshots_to_keep = self._get_protected_snapshot_ids() | |
| # Sort snapshots by timestamp (most recent first), and get most recent N | |
| sorted_snapshots = sorted(self._transaction.table_metadata.snapshots, key=lambda s: s.timestamp_ms, reverse=True) | |
| snapshots_to_keep.update(snapshot.snapshot_id for snapshot in sorted_snapshots[:n]) | |
| snapshots_to_expire = [id for snapshot in self._transaction.table_metadata.snapshots if (id := snapshot.snapshot_id) not in snapshots_to_keep] |
There was a problem hiding this comment.
small syntax change to make more pythonic :)
pyiceberg/table/update/snapshot.py
Outdated
| """ | ||
| properties = self._transaction.table_metadata.properties | ||
|
|
||
| max_snapshot_age_ms = properties.get("history.expire.max-snapshot-age-ms") |
There was a problem hiding this comment.
should this string and the default value be a named constant somewhere? What do you think about using property_as_int from properties.py to be consistent with how properties are handled elsewhere?
pyiceberg/table/update/snapshot.py
Outdated
| } | ||
|
|
||
| def by_id(self, snapshot_id: int) -> ExpireSnapshots: | ||
| def by_id(self, snapshot_id: int) -> "ExpireSnapshots": |
There was a problem hiding this comment.
fwiw since we have from __future__ import annotations at the top of the file I think its cleaner to make things consistent to not have quotes. Probably outside of the scope of this PR
There was a problem hiding this comment.
Thats a great point. I can log an issue to address these all at one.
pyiceberg/table/update/snapshot.py
Outdated
| protected_ids = self._get_protected_snapshot_ids() | ||
|
|
||
| # Sort snapshots by timestamp (most recent first) | ||
| sorted_snapshots = sorted(self._transaction.table_metadata.snapshots, key=lambda s: s.timestamp_ms, reverse=True) | ||
|
|
||
| # Start with all snapshots that could be expired | ||
| candidates_for_expiration = [] | ||
| snapshots_to_keep = set(protected_ids) | ||
|
|
||
| # Apply retain_last_n constraint | ||
| if retain_last_n is not None: | ||
| for i, snapshot in enumerate(sorted_snapshots): | ||
| if i < retain_last_n: | ||
| snapshots_to_keep.add(snapshot.snapshot_id) |
There was a problem hiding this comment.
this code is the same as in retain_last_n, can we refactor to its own function? I think we also need to handle branches and take the last n of each branch
| for snapshot in self._transaction.table_metadata.snapshots: | ||
| if snapshot.snapshot_id not in snapshots_to_keep and (timestamp_ms is None or snapshot.timestamp_ms < timestamp_ms): | ||
| candidates_for_expiration.append(snapshot) |
There was a problem hiding this comment.
make more pythonic with comprehension?
pyiceberg/table/update/snapshot.py
Outdated
| # Sort candidates by timestamp (oldest first) for potential expiration | ||
| candidates_for_expiration.sort(key=lambda s: s.timestamp_ms) | ||
|
|
||
| # Apply min_snapshots_to_keep constraint | ||
| total_snapshots = len(self._transaction.table_metadata.snapshots) | ||
| snapshots_to_expire: List[int] = [] | ||
|
|
||
| for candidate in candidates_for_expiration: | ||
| # Check if expiring this snapshot would violate min_snapshots_to_keep | ||
| remaining_after_expiration = total_snapshots - len(snapshots_to_expire) - 1 | ||
|
|
||
| if min_snapshots_to_keep is None or remaining_after_expiration >= min_snapshots_to_keep: | ||
| snapshots_to_expire.append(candidate.snapshot_id) | ||
| else: | ||
| # Stop expiring to maintain minimum count | ||
| break |
There was a problem hiding this comment.
| # Sort candidates by timestamp (oldest first) for potential expiration | |
| candidates_for_expiration.sort(key=lambda s: s.timestamp_ms) | |
| # Apply min_snapshots_to_keep constraint | |
| total_snapshots = len(self._transaction.table_metadata.snapshots) | |
| snapshots_to_expire: List[int] = [] | |
| for candidate in candidates_for_expiration: | |
| # Check if expiring this snapshot would violate min_snapshots_to_keep | |
| remaining_after_expiration = total_snapshots - len(snapshots_to_expire) - 1 | |
| if min_snapshots_to_keep is None or remaining_after_expiration >= min_snapshots_to_keep: | |
| snapshots_to_expire.append(candidate.snapshot_id) | |
| else: | |
| # Stop expiring to maintain minimum count | |
| break | |
| # Sort candidates by timestamp (newest first) for potential expiration | |
| candidates_for_expiration.sort(key=lambda s: s.timestamp_ms, reverse=True) | |
| snapshots_to_expire = candidates_for_expiration[min_snapshots_to_keep:] |
There was a problem hiding this comment.
double check that I didn't make an off-by-one error here but I believe this is a more concise way to express things :)
GitHub Copilot
Rationale for this change
Add richer snapshot expiration retention capabilities:
Internal helpers:
Motivation: safer, configurable space reclamation while preventing accidental over‑expiration.
ExpireSnapshot->"ExpireSnapshots"Are these changes tested?
Yes. Extended test_expire_snapshots.py with new cases:
Tests cover interaction of all parameters, property default fallback, protection of branch/tag snapshots, and minimum keep enforcement.
Are there any user-facing changes?
Yes :
No breaking changes to existing APIs; older_than and by_id/by_ids behavior unchanged. Documentation/changelog should be updated to reflect new retention strategy APIs and property usage.