# Optimize upsert performance for large datasets #2943
Conversation
🙏 🙏 Just to illustrate: we're trying to upsert 30k rows x 10 columns into another 100k rows. It should fly like nothing, but the interpreter gets killed after trying to upsert for a minute or two :/ I mean, we're not even talking big data here... We're probably going to patch up our scripts with PySpark (yuck).
I would seriously consider Rust bindings in the future. This won't fly, at least for merges :/
thanks for the pr! this is a rather large change so i'll have to take some time to understand it. i want to verify the correctness of the filtering. also, i feel like the upsert code is getting harder and harder to maintain. I want to propose an alternative solution to support upsert in pyiceberg. my general feeling is that pyiceberg is "not an engine" and will never perform upsert efficiently, but it might be able to delegate the upsert to an engine. i'll raise it as a separate discussion.
## Summary
This PR improves the performance of the `upsert()` operation, particularly for large upserts with 10,000+ rows. The changes address three main bottlenecks in the current implementation.

## Problem
The current upsert implementation has several performance issues that become significant with large datasets:
1. **Expensive match filter generation**: For composite keys, `create_match_filter()` generates `Or(And(EqualTo, EqualTo), ...)` expressions - one `And` clause per unique key combination (illustrated in the sketch after this list). With 10k+ rows, this creates big expression trees that are slow to evaluate, up to n×m leaves (n key tuples × m key columns).
2. **Per-batch insert filtering**: The insert logic filters rows using expression evaluation (`expression_to_pyarrow`) on each batch, which is inefficient and doesn't leverage PyArrow's join capabilities.
3. **Row-by-row comparison**: `get_rows_to_update()` uses Python loops to compare rows one at a time (`source_table.slice(source_idx, 1)`), missing the opportunity for vectorized operations.
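For illustration only, this is roughly the shape of the expression that exact key matching produces, built here by hand from `pyiceberg.expressions` primitives; it is not the actual `create_match_filter()` implementation, just the structure it generates for a composite key:

```python
# Illustrative only: the shape of the exact-match filter for a composite key.
from functools import reduce

from pyiceberg.expressions import And, EqualTo, Or

# Hypothetical source keys; with 10k+ rows this list becomes very large.
key_tuples = [(1, "eu"), (2, "eu"), (3, "us")]

# One And(EqualTo, EqualTo) per unique key tuple...
per_key_predicates = [
    And(EqualTo("id", id_value), EqualTo("region", region_value))
    for id_value, region_value in key_tuples
]

# ...chained into a single Or: the tree grows with every additional source row.
match_filter = reduce(Or, per_key_predicates)
print(match_filter)
```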
## Solution

### 1. Coarse Match Filter for Initial Scan (Biggest Performance Win)
Added `create_coarse_match_filter()`, which generates a less precise but much faster filter for the initial table scan. This is where the majority of the performance improvement comes from. Depending on the key data, it produces:

- `And(In(col1, values), In(col2, values))` instead of exact key-tuple matching
- range filters (`col >= min AND col <= max`)
- `AlwaysTrue()` to allow a full scan (exact matching happens downstream anyway)

This is safe because exact key matching occurs in `get_rows_to_update()` via the join operation. A sketch of the idea is shown below.
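A minimal sketch of the coarse-filter idea, with an assumed signature and threshold and the range-filter branch omitted; the PR's `create_coarse_match_filter()` may differ on all three points:

```python
# Sketch only: hypothetical signature and cutoff, not the PR's implementation.
import pyarrow as pa

from pyiceberg.expressions import AlwaysTrue, And, BooleanExpression, In


def coarse_match_filter_sketch(
    source: pa.Table,
    join_cols: list[str],
    max_values_per_col: int = 10_000,  # assumed cutoff, not taken from the PR
) -> BooleanExpression:
    """Build a cheap, over-inclusive scan filter from per-column value sets,
    falling back to AlwaysTrue() when the sets get too large."""
    per_column_filters = []
    for col in join_cols:
        values = set(source.column(col).to_pylist())
        if len(values) > max_values_per_col:
            # A huge In() set would defeat the purpose; a plain full scan is cheaper.
            return AlwaysTrue()
        per_column_filters.append(In(col, values))
    if len(per_column_filters) == 1:
        return per_column_filters[0]
    # Over-inclusive (matches the cross product of per-column values), which is
    # fine because exact key matching happens later in get_rows_to_update().
    return And(*per_column_filters)
```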
**Key insight - `AlwaysTrue()` is where the biggest win happens:**

The benchmark data was sparse, triggering the `AlwaysTrue()` path. Counter-intuitively, this is actually the best case for the performance improvement. The speedup doesn't come from reading fewer files - it comes from avoiding the massive expression tree construction and evaluation:

- Before: build `Or(And(...), And(...), ...)` with millions of nodes (8s), then evaluate it during the scan (382s)
- After: build `AlwaysTrue()` instantly (0.07s), scan without filter overhead (1.4s)

With sparse data, you'd read most/all files anyway, so the "full scan" isn't a penalty - but avoiding an expression tree with n×m nodes (n keys × m columns) and evaluating it across f files is a huge win.
When this optimization helps less:
### 2. Anti-Join for Insert Filtering
Replaced per-batch expression filtering with a single anti-join operation after processing all batches:
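A minimal sketch of the anti-join idea using PyArrow, with hypothetical names (`rows_to_insert`, `matched_key_batches`) standing in for the PR's actual code:

```python
import pyarrow as pa


def rows_to_insert(
    source_table: pa.Table,
    matched_key_batches: list[pa.Table],
    join_cols: list[str],
) -> pa.Table:
    """Return the source rows whose key does not appear among the matched keys."""
    # Key columns (only) of source rows that matched an existing row, collected
    # per batch during the update pass; deduplicate them once after the loop.
    matched_keys = pa.concat_tables(matched_key_batches).group_by(join_cols).aggregate([])
    # The anti-join keeps source rows with no match in matched_keys.
    return source_table.join(matched_keys, keys=join_cols, join_type="left anti")
```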
Note on memory usage: The new approach accumulates matched keys in memory during batch processing. We only store key columns (not full rows) to minimize memory footprint, and deduplicate after the loop. For tables with millions of matching rows, this could increase peak memory usage compared to the previous approach. A potential future improvement would be incremental deduplication during the loop.
### 3. Vectorized Row Comparison
Replaced row-by-row Python comparison with vectorized PyArrow operations:
The `_compare_columns_vectorized()` function handles:

- `pc.not_equal()` with proper null handling
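A minimal sketch of the vectorized comparison for flat (non-nested) columns, using hypothetical names (`rows_changed_mask`, `non_key_cols`); the PR's `_compare_columns_vectorized()` also covers structs, nested structs, and lists, which this sketch does not:

```python
import pyarrow as pa
import pyarrow.compute as pc


def rows_changed_mask(existing: pa.Table, source: pa.Table, non_key_cols: list[str]):
    """Boolean mask, one element per row, marking rows where any non-key column
    differs between the key-aligned existing and source tables (null-safe)."""
    changed = None
    for col in non_key_cols:
        left, right = existing.column(col), source.column(col)
        differs = pc.not_equal(left, right)            # null wherever either side is null
        null_mismatch = pc.xor(pc.is_null(left), pc.is_null(right))
        col_changed = pc.or_(pc.fill_null(differs, False), null_mismatch)
        changed = col_changed if changed is None else pc.or_(changed, col_changed)
    if changed is None:  # no non-key columns: nothing can differ
        changed = pa.array([False] * len(existing), type=pa.bool_())
    return changed
```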
## Benchmark Results

Ran benchmarks on a table with ~2M rows, doing incremental upserts:
**Why times increase with each run:** The table uses bucketing, and each upsert modifies files independently, causing the file count to increase over time. The original implementation's big filter expression (`Or(And(...), ...)`) had to be evaluated against every file, so more files = dramatically more time. The optimized version avoids this by using `AlwaysTrue()`, making the scan time grow linearly with data size instead of with the product of file count and filter size.

This file increase could be mitigated with table maintenance (compaction), which is not yet implemented in PyIceberg.
### Where the Time Went (Run 2: 2M rows, 32 batches)
The coarse filter approach shows the biggest improvement:

- Old filter build: `Or(And(...), And(...), ...)` with millions of nodes
- New filter build: `AlwaysTrue()` or a simple `And(In(), In())`

## Incremental Adoption
If the anti-join change is concerning due to memory implications, the coarse match filter optimization can be contributed separately as it provides the majority of the performance benefit and doesn't change the memory characteristics.
Suggested PR split:

1. The coarse match filter optimization on its own
2. The anti-join insert filtering and the vectorized `get_rows_to_update()` comparison

## Future Considerations
Why Rust bindings weren't explored for this PR:
In ticket #2159, a suggestion was made to side-step the performance issues by using the Python bindings of the Rust implementation. However, we would like to stick with a Python-centric implementation, because our use case requires mocking datetime using `time-machine` for our backfill and replay workflows.

This is why I kept the implementation in pure Python rather than exploring Rust bindings.
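For context, a small illustration (not from the PR) of the time mocking in question: `time-machine` patches the clock that pure-Python code sees, which keeps backfill/replay timestamps reproducible; code that reads the system clock inside a native extension would typically not see the patched time.

```python
import datetime as dt

import time_machine

# Freeze "now" at the replayed point in time; any pure-Python timestamping
# (e.g. snapshot commit times) performed inside this block sees the frozen clock.
with time_machine.travel(dt.datetime(2021, 6, 1, tzinfo=dt.timezone.utc), tick=False):
    print(dt.datetime.now(tz=dt.timezone.utc))  # 2021-06-01 00:00:00+00:00
```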
Potential hybrid approach:
The data processing (filtering, joins, comparisons) is where most of the time is spent and could benefit from Rust bindings. However - and I'll be selfish here - snapshot creation and metadata operations should remain in Python to preserve the ability to mock time. Without this, our backfill and replay workflows would break.
A future optimization could move the heavy data processing (filtering, joins, comparisons) to Rust bindings while keeping snapshot creation and metadata operations in pure Python.
I'd happily trade some performance for keeping our time-mocking capability intact.
## Testing
Added comprehensive tests for:
- `create_coarse_match_filter()` behavior across different dataset sizes and types
- `_compare_columns_vectorized()` with primitives, nulls, structs, nested structs, and lists

## Breaking Changes
None. The API remains unchanged; this is purely an internal optimization.
## Files Changed
- `pyiceberg/table/__init__.py` - upsert method optimizations
- `pyiceberg/table/upsert_util.py` - new `create_coarse_match_filter()` and vectorized comparison functions
- `tests/table/test_upsert.py` - new tests for the optimization functions

Note: This code was co-written with the help of an AI agent (Claude), primarily to speed up exploration and understanding of the PyIceberg codebase. All of the speed-up ideas are mine. The benchmark results are from our real-world production data that we actively use and store. I have reviewed all the generated code, and all related tests pass.