fix: race condition in SpillPool caused by buffered stream#20067

Open
dekuu5 wants to merge 8 commits into apache:main from dekuu5:test_SpillPool

Conversation

@dekuu5

@dekuu5 dekuu5 commented Jan 29, 2026

Which issue does this PR close?

Rationale for this change

The SpillPool test spill::spill_pool::channel was failing indeterministically due to a race condition between SpillFile's coordination logic and the spawn_buffered background task used in the stream reader.

What changes are included in this PR?

read_spill_as_stream now returns a plain (unbuffered) stream instead of a buffered stream.

Are these changes tested?

No, the fix is for an existing test mentioned in the issue. I wasn't able to find the bug initially, but I found it by stress-testing the original test 100 times in parallel.
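
For readers who want to reproduce this kind of flakiness, the stress-testing idea can be sketched roughly as follows. This is only an illustration using plain OS threads from the standard library; `stress_in_parallel` and its `scenario` closure are hypothetical names, not part of DataFusion, and the real test runs on tokio instead.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: runs `scenario` once per thread, `runs` times in
/// total, and returns how many runs failed (returned `Err` or panicked).
fn stress_in_parallel<F>(runs: usize, scenario: F) -> usize
where
    F: Fn(usize) -> Result<(), String> + Send + Sync + 'static,
{
    let scenario = Arc::new(scenario);

    // Start every run before joining any of them so the runs overlap.
    let handles: Vec<_> = (0..runs)
        .map(|i| {
            let scenario = Arc::clone(&scenario);
            thread::spawn(move || scenario(i))
        })
        .collect();

    // `join` yields `Err` if the thread panicked; count that as a failure too.
    handles
        .into_iter()
        .map(|h| h.join())
        .filter(|outcome| !matches!(outcome, Ok(Ok(()))))
        .count()
}
```

Calling something like `stress_in_parallel(100, |_| run_the_scenario())` (where `run_the_scenario` is a placeholder for the body of the flaky test) and checking for a non-zero result makes an intermittent failure much more likely to show up than a single run does.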

Are there any user-facing changes?

No.

I still think we should add a buffered stream that is aware of the coordination between the writer and the reader, but I can't wrap my head around it yet.

Signed-off-by: Ahmed hossam <ahmed.hossambahig@gmail.com>
Copilot AI review requested due to automatic review settings January 29, 2026 18:27
@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Jan 29, 2026
Contributor

Copilot AI left a comment

Pull request overview

This PR fixes an indeterministic test failure in SpillPool caused by a race condition between the writer and reader coordination logic when using a buffered stream.

Changes:

  • Removed the spawn_buffered wrapper from read_spill_as_stream to return an unbuffered stream
  • Removed the unused import of spawn_buffered from spill_manager.rs

Member

@martin-g martin-g left a comment

If it is not too complex, maybe add a test case verifying that the race no longer occurs.

max_record_batch_memory,
)));

Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))

Member

It seems batch_read_buffer_capacity is no longer used.
It could be deprecated or maybe even removed.
https://github.com/dekuu5/datafusion/blob/1b8ef43fdd1424a3e4fe2db213fec4e7228788b0/datafusion/physical-plan/src/sorts/multi_level_merge.rs#L276 is a no-op

Author

Okay, sure. I will look into that and the tests as well.

Signed-off-by: Ahmed hossam <ahmed.hossambahig@gmail.com>
Signed-off-by: Ahmed hossam <ahmed.hossambahig@gmail.com>
@dekuu5 dekuu5 requested a review from martin-g January 31, 2026 18:38
@dekuu5
Author

dekuu5 commented Feb 4, 2026

Hello, can I get a review on this PR?
@martin-g @2010YOUY01

@adriangb
Contributor

I have yet to review this but I wonder how it relates to #20159

@adriangb
Contributor

@martin-g are you able to triage this since you started review already?

@martin-g
Member

@dekuu5 Please fix the failing CI checks!
I will make a new review in the meantime!


#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_concurrent_writer_reader_race_condition() -> Result<()> {
// stress testing the concurncy in the reader and the reader to make sure there is now race condtion

Member

Suggested change
// stress testing the concurncy in the reader and the reader to make sure there is now race condtion
// stress testing the concurrency in the reader and the writer to make sure there is no race condition

/// Reads a spill file as a stream. The file must be created by the current `SpillManager`.
/// This method will generate output in FIFO order: the batch appended first
/// will be read first.
pub fn read_spill_as_stream(

Member

Now, read_spill_as_stream() is exactly the same as read_spill_as_stream_unbuffered() below.
Maybe the buffering impl should have been preserved, and the caller with the issue changed to use read_spill_as_stream_unbuffered() instead?

Member

The more I dig into this, the more I think there should be a better solution.
The PR solves the issue by removing the pre-fetching of spilled data.
IMO we should focus on finding the reason why the pre-fetching gets the wrong EOF (and drops the reader) and fix it.
#20027 (comment)
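
To make the "wrong EOF" concern concrete, here is a deliberately simplified, deterministic sketch. It assumes the failure mode is a reader that treats "nothing buffered yet" as end of stream, which is only one plausible reading of the comment above and not a confirmed diagnosis. `ToySpill`, `ReadState`, and `prefetch_all_naive` are hypothetical names and do not mirror DataFusion's `SpillFile` or `spawn_buffered`.

```rust
use std::collections::VecDeque;

/// Toy stand-in for a spill file shared by a writer and a reader.
/// (Hypothetical type, not DataFusion's implementation.)
struct ToySpill {
    batches: VecDeque<u32>,
    writer_finished: bool,
}

/// What a writer-aware reader can observe when asking for the next batch.
#[derive(Debug, PartialEq)]
enum ReadState {
    Batch(u32),
    /// Nothing buffered right now, but the writer may still append.
    NotReadyYet,
    /// Nothing buffered and the writer is done: a genuine EOF.
    Eof,
}

impl ToySpill {
    /// Naive read: an empty queue looks exactly like EOF (`None`),
    /// even when the writer is still active.
    fn read_naive(&mut self) -> Option<u32> {
        self.batches.pop_front()
    }

    /// Writer-aware read: only reports EOF once the writer has finished.
    fn read_checked(&mut self) -> ReadState {
        match self.batches.pop_front() {
            Some(batch) => ReadState::Batch(batch),
            None if self.writer_finished => ReadState::Eof,
            None => ReadState::NotReadyYet,
        }
    }
}

/// Eagerly drains the spill the way a prefetching task might,
/// giving up at the first `None`.
fn prefetch_all_naive(spill: &mut ToySpill) -> Vec<u32> {
    let mut out = Vec::new();
    while let Some(batch) = spill.read_naive() {
        out.push(batch);
    }
    out
}
```

If the writer has appended one batch and is about to append a second, `prefetch_all_naive` returns after the first batch and the second is never read. `read_checked` would report `NotReadyYet` in the same situation, so the caller knows to wait instead of dropping the reader.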

Author

Now, read_spill_as_stream() is exactly the same as read_spill_as_stream_unbuffered() below. Maybe the buffering impl should have been preserved but the caller with the issue should have been changed to use read_spill_as_stream_unbuffered() ?!

Yes, indeed, but I wrote this before #20159 was merged. I was thinking about adding a separate unbuffered stream function, but I thought no other callers would use it, so why add another one?

Author

The more I dig into this, the more I think there should be a better solution. The PR solves the issue by removing the pre-fetching of spilled data. IMO we should focus on finding the reason why the pre-fetching gets the wrong EOF (and drops the reader) and fix it. #20027 (comment)

Yes, I thought of that. I think a better approach is to make the buffered stream somehow aware of the synchronization between the reader and the writer. Maybe spawn_buffered should know the writer's status? That was what came to mind at the time, but I am not yet sure how to implement this.
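
One rough way to picture a reader that knows the writer's status is sketched below. It is an assumption-heavy illustration: it swaps tokio tasks for OS threads and a `Condvar`, and `WriterAwareQueue`, `finish`, and `run_writer_and_prefetcher` are made-up names, not anything from `spawn_buffered` or `SpillPool`.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

#[derive(Default)]
struct Shared {
    queue: VecDeque<u32>,
    writer_done: bool,
}

/// Hypothetical queue whose reader only sees EOF after the writer finishes.
#[derive(Clone, Default)]
struct WriterAwareQueue {
    inner: Arc<(Mutex<Shared>, Condvar)>,
}

impl WriterAwareQueue {
    /// Writer side: append a batch and wake any waiting reader.
    fn push(&self, batch: u32) {
        let (lock, cvar) = &*self.inner;
        lock.lock().unwrap().queue.push_back(batch);
        cvar.notify_all();
    }

    /// Writer side: signal that no more batches will arrive.
    fn finish(&self) {
        let (lock, cvar) = &*self.inner;
        lock.lock().unwrap().writer_done = true;
        cvar.notify_all();
    }

    /// Reader side: blocks while the queue is empty and the writer is active.
    /// Returns `None` only when the queue is drained and `finish` was called.
    fn next(&self) -> Option<u32> {
        let (lock, cvar) = &*self.inner;
        let mut state = lock.lock().unwrap();
        loop {
            if let Some(batch) = state.queue.pop_front() {
                return Some(batch);
            }
            if state.writer_done {
                return None;
            }
            // Releases the lock while waiting; loops to guard against
            // spurious wakeups.
            state = cvar.wait(state).unwrap();
        }
    }
}

/// Runs a writer thread against an eager reader and returns what was read.
fn run_writer_and_prefetcher(n: u32) -> Vec<u32> {
    let queue = WriterAwareQueue::default();
    let writer_queue = queue.clone();
    let writer = thread::spawn(move || {
        for i in 0..n {
            writer_queue.push(i);
            thread::yield_now(); // encourage interleaving with the reader
        }
        writer_queue.finish();
    });

    let mut out = Vec::new();
    while let Some(batch) = queue.next() {
        out.push(batch);
    }
    writer.join().unwrap();
    out
}
```

The point of the design is that an empty queue and a finished writer are tracked separately, so an eager reader can run ahead of the writer without mistaking a temporary gap for the end of the data. An async version would presumably need a waker-based or channel-based equivalent of the `Condvar` wait.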

@dekuu5
Author

dekuu5 commented Feb 14, 2026

@martin-g PR #20159 fixes the problem. Should I try fixing this one while keeping the buffered stream, or should I close this PR?

Labels

physical-plan Changes to the physical-plan crate

Development

Successfully merging this pull request may close these issues.

Indeterministic test failure in SpillPool

3 participants