feat: Add experimental Grace Hash Join operator with spill-to-disk support #3564
Draft
andygrove wants to merge 10 commits into apache:main from
Conversation
Implement a Grace Hash Join operator that partitions both build and probe sides into N buckets by hashing join keys, then performs per-partition hash joins using DataFusion's HashJoinExec. Spills partitions to disk via Arrow IPC when memory pressure is detected. Key features:
- SpillWriter for efficient incremental append I/O (no read-rewrite)
- All join types supported (Inner, Left, Right, Full, Semi, Anti)
- Build side selection (BuildLeft/BuildRight) via planner
- Recursive repartitioning for oversized partitions (max depth 3)
- Production metrics (build_time, probe_time, join_time, spill_count, etc.)
- CometGraceHashJoinExec Spark-side integration with metrics wiring
- Comprehensive test suite including fuzz tests with ParquetGenerator

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
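The partitioning idea can be illustrated with a small, std-only sketch (illustrative only, not Comet's code): both sides hash each row's join key to one of N buckets, so matching keys always land in the same bucket and each bucket pair can be joined, or spilled, independently.

```rust
// Illustrative only (not Comet's code): route each row to one of N buckets by
// hashing its join key. Matching keys hash to the same bucket on both sides,
// so build bucket i only ever needs to be joined against probe bucket i, and
// any bucket can be spilled to disk and joined later.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Returns, for each partition, the row indices assigned to it.
fn partition_rows<K: Hash>(join_keys: &[K], num_partitions: usize) -> Vec<Vec<usize>> {
    let mut partitions = vec![Vec::new(); num_partitions];
    for (row, key) in join_keys.iter().enumerate() {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let bucket = (hasher.finish() as usize) % num_partitions;
        partitions[bucket].push(row);
    }
    partitions
}

fn main() {
    let build_keys = [10_i64, 20, 30, 40];
    let probe_keys = [20_i64, 20, 50];
    // Key 20 lands in the same bucket on both sides, so the per-partition
    // hash joins still find every match.
    println!("build: {:?}", partition_rows(&build_keys, 4));
    println!("probe: {:?}", partition_rows(&probe_keys, 4));
}
```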
Benchmarks all join types (Inner, Left, Right, Full, LeftSemi, LeftAnti) plus filtered joins across Spark Sort Merge Join, Comet Sort Merge Join, Comet Hash Join, and Comet Grace Hash Join implementations. Sets COMET_REPLACE_SMJ appropriately for each case and uses auto shuffle mode. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…in is enabled
Grace Hash Join supports all join type / build side combinations, so bypass the BuildRight+LeftAnti/LeftSemi guard and the canBuildShuffledHashJoinLeft/Right restrictions when it is enabled.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
RewriteJoin converts SortMergeJoin to Spark's ShuffledHashJoinExec, which doesn't support LeftSemi/LeftAnti with BuildLeft. The previous commit bypassed these restrictions when Grace Hash Join was enabled, but the intermediate ShuffledHashJoinExec fails validation before CometExecRule can convert it to GraceHashJoinExec. This reverts commit 02809ec. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace N separate take() calls per batch with a prefix-sum approach borrowed from the shuffle partitioner (multi_partition.rs):
- Add ScratchSpace struct with reusable buffers for hashes, partition IDs, and row indices, allocated once and reused across all batches
- Use interleave_record_batch with contiguous index slices instead of per-partition UInt32Array allocation + take()
- Concatenate small sub-batches with concat_batches before feeding to HashJoinExec to reduce per-batch join overhead
- Estimate per-partition memory sizes proportionally from total batch size instead of calling get_array_memory_size() on every sub-batch

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
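A simplified sketch of the prefix-sum bucketing described here, using plain row indices instead of Comet's ScratchSpace and Arrow buffers (function name and types are illustrative):

```rust
// Simplified sketch of the prefix-sum bucketing (indices only, no Arrow types):
// one pass counts rows per partition, an exclusive prefix sum turns the counts
// into write offsets, and a second pass scatters row indices into a single
// contiguous buffer, so partition p's rows are the slice
// row_indices[offsets[p]..offsets[p + 1]] -- no per-partition allocations.
fn bucket_rows(partition_ids: &[usize], num_partitions: usize) -> (Vec<u32>, Vec<usize>) {
    // Count rows per partition.
    let mut counts = vec![0usize; num_partitions];
    for &p in partition_ids {
        counts[p] += 1;
    }
    // Exclusive prefix sum -> start offset of each partition's slice.
    let mut offsets = vec![0usize; num_partitions + 1];
    for p in 0..num_partitions {
        offsets[p + 1] = offsets[p] + counts[p];
    }
    // Scatter row indices into one reusable buffer (the "scratch space" idea).
    let mut row_indices = vec![0u32; partition_ids.len()];
    let mut cursors = offsets.clone();
    for (row, &p) in partition_ids.iter().enumerate() {
        row_indices[cursors[p]] = row as u32;
        cursors[p] += 1;
    }
    (row_indices, offsets)
}
```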
87810fd to a8d1f07
…sthrough
Two performance optimizations:
1. Replace interleave_record_batch with Arrow's take() kernel in take_partition - SIMD-optimized and avoids (batch_idx, row_idx) tuple overhead for single-batch case
2. Skip take_partition when entire batch goes to one partition - use batch directly via cheap clone instead of copying through take()
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
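A minimal sketch of both optimizations (illustrative, not the PR's take_partition; assumes a recent arrow-rs where the take kernel accepts `&dyn Array` indices):

```rust
// Illustrative sketch (not the PR's take_partition); assumes a recent arrow-rs
// where the take kernel accepts `&dyn Array` indices.
use arrow::array::{ArrayRef, UInt32Array};
use arrow::compute::take;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Extract the rows of `batch` assigned to one partition.
fn take_partition(batch: &RecordBatch, row_indices: &[u32]) -> Result<RecordBatch, ArrowError> {
    // Fast path: the whole batch belongs to a single partition (indices from
    // the partitioner are distinct, so covering every row means "all of it").
    // Cloning a RecordBatch only bumps Arc refcounts -- no data is copied.
    if row_indices.len() == batch.num_rows() {
        return Ok(batch.clone());
    }
    // General path: apply the SIMD-friendly take() kernel column by column.
    let indices = UInt32Array::from(row_indices.to_vec());
    let columns: Vec<ArrayRef> = batch
        .columns()
        .iter()
        .map(|col| take(col.as_ref(), &indices, None))
        .collect::<Result<_, _>>()?;
    RecordBatch::try_new(batch.schema(), columns)
}
```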
a8d1f07 to 60400c2
Remove the build_batches.len() > 1 guard that skipped the repartition check for single large batches, and multiply build_size by 3 to account for hash table overhead (~2-3x raw data). Add info-level logging for build/probe phase summaries and per-partition join decisions. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
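A sketch of that repartition decision, using the ~3x hash-table overhead factor and the depth limit mentioned in these commits (function and constant names are illustrative):

```rust
// Illustrative sketch of the repartition decision: a partition is split again
// (up to the depth limit) when its estimated hash-table footprint -- raw build
// bytes times the ~3x overhead factor -- exceeds the available memory budget.
const HASH_TABLE_OVERHEAD_FACTOR: usize = 3; // hash table ~= 2-3x raw data
const MAX_RECURSION_DEPTH: usize = 3;

fn should_repartition(build_bytes: usize, memory_budget: usize, depth: usize) -> bool {
    depth < MAX_RECURSION_DEPTH && build_bytes * HASH_TABLE_OVERHEAD_FACTOR > memory_budget
}
```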
The probe side of GraceHashJoin was accumulating batches without tracking memory in the reservation, causing OOM when the probe side is massive and the build side is tiny (e.g. TPC-DS q72 with 171M probe rows vs 15 build rows). Now probe-side memory is tracked in the shared reservation and partitions are spilled (both build and probe sides) when memory pressure is detected during the probe phase. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
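Spilled partitions are written as Arrow IPC streams (per the first commit). Below is a minimal sketch of incremental append spilling in that spirit, a hypothetical SpillWriter rather than the PR's implementation, assuming the arrow crate's ipc::writer::StreamWriter:

```rust
// Hypothetical SpillWriter sketch (not the PR's implementation): batches are
// appended to an on-disk Arrow IPC stream as they arrive, so spilling never
// re-reads or rewrites previously spilled data.
use std::fs::File;
use std::io::BufWriter;

use arrow::datatypes::Schema;
use arrow::error::ArrowError;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

struct SpillWriter {
    writer: StreamWriter<BufWriter<File>>,
    bytes_spilled: usize,
}

impl SpillWriter {
    fn try_new(path: &str, schema: &Schema) -> Result<Self, ArrowError> {
        let file = BufWriter::new(File::create(path)?);
        Ok(Self {
            writer: StreamWriter::try_new(file, schema)?,
            bytes_spilled: 0,
        })
    }

    /// Append one batch to the spill stream; earlier data is never touched.
    fn append(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.bytes_spilled += batch.get_array_memory_size();
        self.writer.write(batch)
    }

    /// Finalize the stream and report how many in-memory bytes were released.
    fn finish(mut self) -> Result<usize, ArrowError> {
        self.writer.finish()?;
        Ok(self.bytes_spilled)
    }
}
```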
When memory pressure is detected during the probe phase, spill enough partitions to free at least 50% of in-memory data instead of just the single largest partition. Previously, spilling one ~200MB partition barely made room for the next sub-batch, leaving ~5GB of probe data in memory and causing OS OOM kills. Now the spill loop continues until meaningful headroom is created. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
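A simplified sketch of this spill policy (names are illustrative, not the operator's code): spill the largest in-memory partitions until at least half of the in-memory bytes have been freed.

```rust
// Simplified sketch of the probe-phase spill policy: when memory pressure is
// detected, spill the largest in-memory partitions until at least 50% of the
// in-memory bytes are freed, instead of spilling only the single largest one.
struct PartitionSize {
    id: usize,
    in_memory_bytes: usize,
}

/// Choose which partitions to spill, largest first, until >= 50% is freed.
fn partitions_to_spill(partitions: &[PartitionSize]) -> Vec<usize> {
    let total: usize = partitions.iter().map(|p| p.in_memory_bytes).sum();
    let target = (total + 1) / 2; // free at least half of the in-memory data

    let mut by_size: Vec<&PartitionSize> = partitions.iter().collect();
    by_size.sort_by(|a, b| b.in_memory_bytes.cmp(&a.in_memory_bytes));

    let mut freed = 0usize;
    let mut to_spill = Vec::new();
    for p in by_size {
        if freed >= target {
            break;
        }
        freed += p.in_memory_bytes;
        to_spill.push(p.id);
    }
    to_spill
}
```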

Rationale
Comet has much better performance and scalability when using hash join rather than sort-merge join, but we cannot enable hash join by default or for all workloads because it has no spill-to-disk support and can therefore cause OOM.
This PR adds a new hash join implementation with spilling built in, so that we can support all workloads with hash join.
Performance
CometJoinBenchmark
TPC-H
TPC-DS
Fails on q72 with GHJ. This is the most challenging join in the benchmark.
It is not possible to run this benchmark with HJ because it OOMs.
This chart is based on a run without q72.
Implementation Details
- SpillWriter for efficient disk access
- CometGraceHashJoinExec Spark operator with production metrics (build/probe/join time, spill count/bytes)
- spark.comet.exec.graceHashJoin.enabled (default false) and spark.comet.exec.graceHashJoin.numPartitions (default 16)

Metrics
🤖 Generated with Claude Code