fix: race condition in SpillPool caused by buffered stream #20067
dekuu5 wants to merge 8 commits into apache:main
Signed-off-by: Ahmed hossam <ahmed.hossambahig@gmail.com>
Pull request overview
This PR fixes a nondeterministic test failure in SpillPool caused by a race condition between the writer and reader coordination logic when using a buffered stream.
Changes:
- Removed the spawn_buffered wrapper from read_spill_as_stream to return an unbuffered stream
- Removed the unused import of spawn_buffered from spill_manager.rs
martin-g left a comment
If it is not too complex, maybe add a test case verifying that the race no longer occurs.
    max_record_batch_memory,
)));

Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
It seems batch_read_buffer_capacity is no longer used.
It could be deprecated or maybe even removed.
https://github.com/dekuu5/datafusion/blob/1b8ef43fdd1424a3e4fe2db213fec4e7228788b0/datafusion/physical-plan/src/sorts/multi_level_merge.rs#L276 is a no-op
Okay, sure, I will look into that and the tests as well.
Signed-off-by: Ahmed hossam <ahmed.hossambahig@gmail.com>
Signed-off-by: Ahmed hossam <ahmed.hossambahig@gmail.com>
Hello, can I get a review on this PR?
I have yet to review this, but I wonder how it relates to #20159.
@martin-g are you able to triage this since you started review already?
@dekuu5 Please fix the failing CI checks!
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_concurrent_writer_reader_race_condition() -> Result<()> {
    // stress testing the concurncy in the reader and the reader to make sure there is now race condtion
- // stress testing the concurncy in the reader and the reader to make sure there is now race condtion
+ // stress testing the concurrency in the reader and the writer to make sure there is no race condition
/// Reads a spill file as a stream. The file must be created by the current `SpillManager`.
/// This method will generate output in FIFO order: the batch appended first
/// will be read first.
pub fn read_spill_as_stream(
Now, read_spill_as_stream() is exactly the same as read_spill_as_stream_unbuffered() below.
Maybe the buffering impl should have been preserved but the caller with the issue should have been changed to use read_spill_as_stream_unbuffered() ?!
The more I dig into this, the more I think there should be a better solution.
The PR solves the issue by removing the pre-fetching of spilled data.
IMO we should focus on finding the reason why the pre-fetching gets the wrong EOF (and drops the reader) and fix it.
#20027 (comment)
> Now, read_spill_as_stream() is exactly the same as read_spill_as_stream_unbuffered() below. Maybe the buffering impl should have been preserved but the caller with the issue should have been changed to use read_spill_as_stream_unbuffered() ?!
Yes, indeed, but I wrote this before #20159 was merged. I was thinking about adding an unbuffered stream, but I thought no other functions would use it, so why make another one?
> The more I dig into this, the more I think there should be a better solution. The PR solves the issue by removing the pre-fetching of spilled data. IMO we should focus on finding the reason why the pre-fetching gets the wrong EOF (and drops the reader) and fix it. #20027 (comment)
Yes, I thought of that. I think a better approach is to make the buffered stream somehow aware of the synchronization between the reader and the writer. Maybe spawn_buffered should know the writer's status? That was what came to mind at the time, but I am not yet sure how to implement this.
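To make the idea concrete, here is a rough sketch of a prefetching reader that knows the writer's status. It is a toy model built on std primitives, not DataFusion code: Channel, Shared, Next, and run_prefetch are all hypothetical names, and u32 values stand in for record batches. The key point is that an empty queue is treated as EOF only after the writer has signalled that it is done.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical state shared by one writer and one prefetching reader.
struct Shared {
    queue: VecDeque<u32>, // stand-in for spilled record batches
    writer_done: bool,    // set once the writer has finished
}

/// Result of asking the reader side for the next item.
#[derive(Debug, PartialEq)]
enum Next {
    Batch(u32),
    Eof,
}

struct Channel {
    state: Mutex<Shared>,
    changed: Condvar,
}

impl Channel {
    fn new() -> Arc<Self> {
        Arc::new(Channel {
            state: Mutex::new(Shared {
                queue: VecDeque::new(),
                writer_done: false,
            }),
            changed: Condvar::new(),
        })
    }

    /// Writer side: append a batch and wake the reader.
    fn push(&self, batch: u32) {
        self.state.lock().unwrap().queue.push_back(batch);
        self.changed.notify_all();
    }

    /// Writer side: signal that no more batches will arrive.
    fn finish(&self) {
        self.state.lock().unwrap().writer_done = true;
        self.changed.notify_all();
    }

    /// Reader side: an empty queue only means EOF if the writer is done;
    /// otherwise wait for the writer to make progress.
    fn next(&self) -> Next {
        let mut s = self.state.lock().unwrap();
        loop {
            if let Some(b) = s.queue.pop_front() {
                return Next::Batch(b);
            }
            if s.writer_done {
                return Next::Eof;
            }
            s = self.changed.wait(s).unwrap();
        }
    }
}

/// Runs a background reader against a writer that pushes `n` batches.
fn run_prefetch(n: u32) -> Vec<u32> {
    let ch = Channel::new();
    let reader = {
        let ch = Arc::clone(&ch);
        thread::spawn(move || {
            let mut out = Vec::new();
            while let Next::Batch(b) = ch.next() {
                out.push(b);
            }
            out
        })
    };
    for i in 0..n {
        ch.push(i);
    }
    ch.finish();
    reader.join().unwrap()
}
```

Whether spawn_buffered could be given an equivalent signal is still an open question here; the sketch only shows the invariant such a stream would need to keep.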
Which issue does this PR close?
SpillPool #20027
Rationale for this change
The SpillPool test spill::spill_pool::channel was failing nondeterministically due to a race condition between SpillFile's coordination logic and the spawn_buffered background task used in the stream reader.
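A simplified, deterministic model of this kind of failure is sketched below. It is only an illustration under assumptions: ToyFile, EagerPrefetcher, and simulate are hypothetical names, and the real interaction between SpillFile and spawn_buffered is more involved. The model shows how a prefetcher that polls before the writer has produced anything can take "no data yet" for EOF and stop for good.

```rust
use std::collections::VecDeque;

/// Toy stand-in for a spill file: the batches written so far.
struct ToyFile {
    batches: VecDeque<u32>,
}

/// Toy prefetcher that gives up permanently the first time it sees no data.
struct EagerPrefetcher {
    finished: bool,
    buffered: Vec<u32>,
}

impl EagerPrefetcher {
    fn poll(&mut self, file: &mut ToyFile) {
        if self.finished {
            return; // the stream was already ended
        }
        match file.batches.pop_front() {
            Some(batch) => self.buffered.push(batch),
            // "Nothing to read right now" is mistaken for end of file.
            None => self.finished = true,
        }
    }
}

/// Replays one fixed interleaving of prefetcher polls and writer appends.
fn simulate(poll_before_write: bool) -> Vec<u32> {
    let mut file = ToyFile { batches: VecDeque::new() };
    let mut prefetcher = EagerPrefetcher { finished: false, buffered: Vec::new() };

    if poll_before_write {
        // The background task runs ahead of the writer.
        prefetcher.poll(&mut file);
    }
    file.batches.push_back(1);
    file.batches.push_back(2);
    prefetcher.poll(&mut file);
    prefetcher.poll(&mut file);
    prefetcher.buffered
}
```

With simulate(false) both batches are delivered; with simulate(true) the early poll ends the stream and both batches are lost, which matches a failure that appears only under some schedules.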
What changes are included in this PR?
read_spill_as_stream now returns an unbuffered stream instead of a buffered one.
Are these changes tested?
No, the fix is for an existing test mentioned in the issue. I wasn't able to find the bug initially, but I found it by stress-testing the original test 100 times in parallel.
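For reference, a minimal sketch of that kind of parallel stress run, using only the standard library. The count_failures helper is hypothetical and is not the harness I actually used; it just runs a scenario many times on separate threads and counts how many runs report failure.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Runs `scenario` `runs` times, each on its own thread, and returns how
/// many runs reported failure (a `false` return value).
fn count_failures<F>(runs: usize, scenario: F) -> usize
where
    F: Fn(usize) -> bool + Sync,
{
    let failures = AtomicUsize::new(0);
    // Scoped threads may borrow `scenario` and `failures` from this frame.
    thread::scope(|s| {
        for run in 0..runs {
            let scenario = &scenario;
            let failures = &failures;
            s.spawn(move || {
                if !scenario(run) {
                    failures.fetch_add(1, Ordering::Relaxed);
                }
            });
        }
    });
    failures.load(Ordering::Relaxed)
}
```

A call such as count_failures(100, |_| run_once()) would return a non-zero count for a flaky scenario, where run_once is a placeholder for whatever reproduces the test body.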
Are there any user-facing changes?
No.
I still think we should add a buffered stream that is aware of the coordination between the writer and the reader, but I can't wrap my head around it yet.