feat: Add experimental Grace Hash Join operator with spill-to-disk support#3564

Draft
andygrove wants to merge 10 commits into apache:main from andygrove:feat/grace-hash-join

Conversation

andygrove commented Feb 21, 2026

Rationale

Comet has much better performance and scalability with hash join than with sort-merge join, but we cannot enable hash join by default or for all workloads because it has no spill-to-disk support and can therefore cause OOM.

This PR adds a new hash join implementation with spilling built in, so that we can support all workloads with hash join.

Performance

CometJoinBenchmark

| Join Type       | Spark SMJ (ms) | Comet SMJ (ms) | Comet Hash Join (ms) | Comet Grace Hash Join (ms) | GHJ vs Spark |
|-----------------|----------------|----------------|----------------------|----------------------------|--------------|
| Inner           | 560            | 384            | 310                  | 279                        | 2.0X         |
| Left            | 535            | 367            | 322                  | 292                        | 1.8X         |
| Right           | 530            | 382            | 321                  | 283                        | 1.9X         |
| Full            | 624            | 368            | 326                  | 293                        | 2.1X         |
| LeftSemi        | 351            | 211            | 204                  | 207                        | 1.7X         |
| LeftAnti        | 330            | 181            | 182                  | 183                        | 1.8X         |
| Inner w/ Filter | 450            | 316            | 251                  | 223                        | 2.0X         |

TPC-H

[chart: tpch_allqueries]

TPC-DS

GHJ fails on q72, the most challenging join in the benchmark. Running TPC-DS with plain HJ is not possible at all because it OOMs. The chart below is based on a run without q72.

[chart: tpcds_allqueries]

Implementation Details

  • Adds a new Grace Hash Join operator that partitions both build and probe sides into hash buckets, spilling to disk when memory is tight, then joins each partition independently (see the sketch after this list)
  • Supports all join types (Inner, Left, Right, Full, LeftSemi, LeftAnti) with build-side selection
  • Uses incremental Arrow IPC spill I/O via SpillWriter for efficient disk access
  • Includes recursive repartitioning (up to 3 levels) for oversized partitions
  • Adds CometGraceHashJoinExec Spark operator with production metrics (build/probe/join time, spill count/bytes)
  • Controlled by spark.comet.exec.graceHashJoin.enabled (default false) and spark.comet.exec.graceHashJoin.numPartitions (default 16)
  • Includes comprehensive deterministic tests, fuzz tests with ParquetGenerator, and join microbenchmarks comparing Spark SMJ, Comet SMJ, Comet Hash Join, and Comet Grace Hash Join
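
The core flow is easiest to see in code. Below is a minimal sketch of the per-batch partitioning step, assuming an i64 join key and the arrow crate; the names (`partition_batch`, `NUM_PARTITIONS`) are illustrative, not the PR's actual identifiers.

```rust
// Hypothetical sketch of the bucket-partitioning step of a grace hash join.
// Assumes an i64 join key; names here are illustrative, not the PR's code.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

use arrow::array::{Array, Int64Array, UInt32Array};
use arrow::compute::take;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Mirrors spark.comet.exec.graceHashJoin.numPartitions (default 16).
const NUM_PARTITIONS: usize = 16;

/// Split one input batch into NUM_PARTITIONS sub-batches by hashing the
/// join key; the same hash is applied to build and probe sides so that
/// matching rows always land in the same partition.
fn partition_batch(batch: &RecordBatch, key_col: usize) -> Result<Vec<RecordBatch>, ArrowError> {
    let keys = batch
        .column(key_col)
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("sketch assumes an i64 join key");

    // Collect the row indices belonging to each partition.
    let mut indices: Vec<Vec<u32>> = vec![Vec::new(); NUM_PARTITIONS];
    for row in 0..keys.len() {
        let mut hasher = DefaultHasher::new();
        keys.value(row).hash(&mut hasher);
        indices[(hasher.finish() as usize) % NUM_PARTITIONS].push(row as u32);
    }

    // Materialize one sub-batch per partition with the take() kernel.
    indices
        .into_iter()
        .map(|idx| {
            let idx = UInt32Array::from(idx);
            let cols = batch
                .columns()
                .iter()
                .map(|c| take(c.as_ref(), &idx, None))
                .collect::<Result<Vec<_>, _>>()?;
            RecordBatch::try_new(batch.schema(), cols)
        })
        .collect()
}
```

Because both sides are hashed with the same function, each partition can then be joined independently with a regular in-memory hash join, and any partition that does not fit in memory can be spilled and processed later.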

Metrics

[screenshot: Grace Hash Join operator metrics]

🤖 Generated with Claude Code

andygrove and others added 6 commits February 21, 2026 07:49
Implement a Grace Hash Join operator that partitions both build and probe
sides into N buckets by hashing join keys, then performs per-partition
hash joins using DataFusion's HashJoinExec. Spills partitions to disk
via Arrow IPC when memory pressure is detected.

Key features:
- SpillWriter for efficient incremental append I/O (no read-rewrite)
- All join types supported (Inner, Left, Right, Full, Semi, Anti)
- Build side selection (BuildLeft/BuildRight) via planner
- Recursive repartitioning for oversized partitions (max depth 3)
- Production metrics (build_time, probe_time, join_time, spill_count, etc.)
- CometGraceHashJoinExec Spark-side integration with metrics wiring
- Comprehensive test suite including fuzz tests with ParquetGenerator

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
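
A minimal sketch of the incremental-append spill idea from this commit, assuming the arrow crate's IPC stream writer; this `SpillWriter` is an illustrative stand-in for the PR's type of the same name, not its actual code.

```rust
// Sketch of an incremental Arrow IPC spill writer: batches are appended to
// an open stream as they arrive, with no read-rewrite of earlier data.
use std::fs::File;
use std::io::BufWriter;
use std::path::Path;

use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

struct SpillWriter {
    writer: StreamWriter<BufWriter<File>>,
    bytes_spilled: usize,
}

impl SpillWriter {
    fn try_new(path: &Path, schema: SchemaRef) -> Result<Self, ArrowError> {
        let file = BufWriter::new(File::create(path)?);
        Ok(Self {
            writer: StreamWriter::try_new(file, &schema)?,
            bytes_spilled: 0,
        })
    }

    /// Append one batch to the open stream.
    fn append(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.bytes_spilled += batch.get_array_memory_size();
        self.writer.write(batch)
    }

    /// Close the stream and report total bytes spilled (for metrics).
    fn finish(mut self) -> Result<usize, ArrowError> {
        self.writer.finish()?;
        Ok(self.bytes_spilled)
    }
}
```
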
Benchmarks all join types (Inner, Left, Right, Full, LeftSemi, LeftAnti)
plus filtered joins across Spark Sort Merge Join, Comet Sort Merge Join,
Comet Hash Join, and Comet Grace Hash Join implementations. Sets
COMET_REPLACE_SMJ appropriately for each case and uses auto shuffle mode.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…in is enabled

Grace Hash Join supports all join type / build side combinations, so
bypass the BuildRight+LeftAnti/LeftSemi guard and the
canBuildShuffledHashJoinLeft/Right restrictions when it is enabled.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
RewriteJoin converts SortMergeJoin to Spark's ShuffledHashJoinExec,
which doesn't support LeftSemi/LeftAnti with BuildLeft. The previous
commit bypassed these restrictions when Grace Hash Join was enabled,
but the intermediate ShuffledHashJoinExec fails validation before
CometExecRule can convert it to GraceHashJoinExec.

This reverts commit 02809ec.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace N separate take() calls per batch with a prefix-sum approach
borrowed from the shuffle partitioner (multi_partition.rs):

- Add ScratchSpace struct with reusable buffers for hashes, partition
  IDs, and row indices, allocated once and reused across all batches
- Use interleave_record_batch with contiguous index slices instead of
  per-partition UInt32Array allocation + take()
- Concatenate small sub-batches with concat_batches before feeding to
  HashJoinExec to reduce per-batch join overhead
- Estimate per-partition memory sizes proportionally from total batch
  size instead of calling get_array_memory_size() on every sub-batch

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
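
For illustration, here is a sketch of the prefix-sum (counting-sort) layout this commit describes; the `ScratchSpace` fields shown are assumptions, not the PR's exact structure.

```rust
// Illustrative sketch of the prefix-sum partitioning layout.
/// Reusable buffers allocated once and reused across all batches.
struct ScratchSpace {
    partition_ids: Vec<usize>,
    row_indices: Vec<u32>,
}

/// Group row indices contiguously by partition in O(rows) with no
/// per-partition allocation: count rows per partition, prefix-sum the
/// counts into start offsets, then scatter each row into its slice.
fn build_partition_slices(
    hashes: &[u64],
    num_partitions: usize,
    scratch: &mut ScratchSpace,
) -> Vec<usize> {
    scratch.partition_ids.clear();
    scratch
        .partition_ids
        .extend(hashes.iter().map(|h| (*h as usize) % num_partitions));

    // Count rows per partition.
    let mut offsets = vec![0usize; num_partitions + 1];
    for &p in &scratch.partition_ids {
        offsets[p + 1] += 1;
    }
    // Prefix-sum: offsets[p]..offsets[p + 1] becomes partition p's slice.
    for p in 0..num_partitions {
        offsets[p + 1] += offsets[p];
    }

    // Scatter each row index into its partition's slice.
    scratch.row_indices.resize(hashes.len(), 0);
    let mut cursor = offsets.clone();
    for (row, &p) in scratch.partition_ids.iter().enumerate() {
        scratch.row_indices[cursor[p]] = row as u32;
        cursor[p] += 1;
    }
    offsets
}
```

Partition p's rows are then the contiguous slice `row_indices[offsets[p]..offsets[p + 1]]`, which can feed the interleave/take kernels directly without allocating a fresh index array per partition.
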
andygrove force-pushed the feat/grace-hash-join branch 2 times, most recently from 87810fd to a8d1f07 on February 21, 2026 at 16:55
…sthrough

Two performance optimizations:
1. Replace interleave_record_batch with Arrow's take() kernel in take_partition -
   SIMD-optimized and avoids (batch_idx, row_idx) tuple overhead for single-batch case
2. Skip take_partition when entire batch goes to one partition - use batch directly
   via cheap clone instead of copying through take()

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
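
A hedged sketch of the two optimizations, assuming the arrow crate; `take_partition` mirrors the function named in the commit, but the body is illustrative.

```rust
// Sketch of take()-based partition extraction with a single-partition
// fast path; names are illustrative.
use arrow::array::UInt32Array;
use arrow::compute::take;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn take_partition(batch: &RecordBatch, rows: &[u32]) -> Result<RecordBatch, ArrowError> {
    // Fast path: during partitioning each row appears exactly once, so if
    // this partition received every row, return a cheap clone (Arc bumps
    // on the underlying buffers) instead of copying through take().
    if rows.len() == batch.num_rows() {
        return Ok(batch.clone());
    }
    // SIMD-friendly take() over a contiguous index slice, rather than
    // interleave's per-row (batch_idx, row_idx) tuple bookkeeping.
    let indices = UInt32Array::from(rows.to_vec());
    let columns = batch
        .columns()
        .iter()
        .map(|c| take(c.as_ref(), &indices, None))
        .collect::<Result<Vec<_>, _>>()?;
    RecordBatch::try_new(batch.schema(), columns)
}
```
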
andygrove force-pushed the feat/grace-hash-join branch from a8d1f07 to 60400c2 on February 21, 2026 at 17:09
andygrove commented:

GHJ is slightly slower than HJ currently.

[chart: tpch_queries_compare]

andygrove changed the title from "feat: Add Grace Hash Join operator with spill-to-disk support" to "feat: Add experimental Grace Hash Join operator with spill-to-disk support" on Feb 21, 2026
andygrove and others added 3 commits February 21, 2026 14:02
Remove the build_batches.len() > 1 guard that skipped the repartition
check for single large batches, and multiply build_size by 3 to account
for hash table overhead (~2-3x raw data). Add info-level logging for
build/probe phase summaries and per-partition join decisions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
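
The heuristic reduces to a small predicate; this sketch uses hypothetical names (`needs_repartition`, `budget_bytes`) to show the intent.

```rust
// Sketch of the recursive repartition decision; names are illustrative.
const MAX_REPARTITION_DEPTH: usize = 3;

fn needs_repartition(build_size_bytes: usize, budget_bytes: usize, depth: usize) -> bool {
    // The in-memory hash join needs roughly 2-3x the raw build-side bytes
    // once the hash table is built, hence the conservative 3x multiplier.
    depth < MAX_REPARTITION_DEPTH && build_size_bytes * 3 > budget_bytes
}
```
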
The probe side of GraceHashJoin was accumulating batches without tracking
memory in the reservation, causing OOM when the probe side is massive and
the build side is tiny (e.g. TPC-DS q72 with 171M probe rows vs 15 build
rows). Now probe-side memory is tracked in the shared reservation and
partitions are spilled (both build and probe sides) when memory pressure
is detected during the probe phase.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
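
A sketch of the accounting pattern, using DataFusion's MemoryReservation API; `buffer_probe_batch` and `spill_partitions` are hypothetical names standing in for the PR's internals.

```rust
// Sketch: account for probe-side batches in the shared reservation before
// buffering them, and spill on reservation failure.
use arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use datafusion::execution::memory_pool::MemoryReservation;

fn buffer_probe_batch(
    reservation: &mut MemoryReservation,
    buffered: &mut Vec<RecordBatch>,
    batch: RecordBatch,
    spill_partitions: &mut dyn FnMut() -> Result<()>,
) -> Result<()> {
    let size = batch.get_array_memory_size();
    if reservation.try_grow(size).is_err() {
        // Memory pressure: spill build + probe data for some partitions,
        // then retry once; a second failure propagates the error.
        spill_partitions()?;
        reservation.try_grow(size)?;
    }
    buffered.push(batch);
    Ok(())
}
```
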
When memory pressure is detected during the probe phase, spill enough
partitions to free at least 50% of in-memory data instead of just the
single largest partition. Previously, spilling one ~200MB partition
barely made room for the next sub-batch, leaving ~5GB of probe data
in memory and causing OS OOM kills. Now the spill loop continues until
meaningful headroom is created.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
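
Illustratively, the loop looks like the following; `PartitionState` and its methods are assumptions standing in for the PR's internal types.

```rust
// Sketch of the "spill until ~50% of in-memory data is freed" loop.
struct PartitionState {
    in_memory_bytes: usize,
    // ... buffered build/probe batches, spill files, etc.
}

impl PartitionState {
    /// Write buffered batches to disk and return the bytes released.
    fn spill(&mut self) -> std::io::Result<usize> {
        let freed = self.in_memory_bytes;
        // ... write batches via the spill writer, clear buffers ...
        self.in_memory_bytes = 0;
        Ok(freed)
    }
}

/// Spill largest-first until at least half of the in-memory data is freed,
/// rather than stopping after the single largest partition.
fn spill_for_headroom(partitions: &mut [PartitionState]) -> std::io::Result<usize> {
    let total: usize = partitions.iter().map(|p| p.in_memory_bytes).sum();
    let target = total / 2;
    let mut freed = 0;
    while freed < target {
        // Pick the partition currently holding the most memory.
        let Some(victim) = partitions
            .iter_mut()
            .max_by_key(|p| p.in_memory_bytes)
            .filter(|p| p.in_memory_bytes > 0)
        else {
            break; // nothing left to spill
        };
        freed += victim.spill()?;
    }
    Ok(freed)
}
```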