diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 43cfcded03def4..69a2cc65098089 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1532,10 +1532,12 @@ function readableStreamPipeTo( } async function step() { - if (shuttingDown) - return true; + if (shuttingDown) return true; - await writer[kState].ready.promise; + if (dest[kState].backpressure) { + await writer[kState].ready.promise; + if (shuttingDown) return true; + } const controller = source[kState].controller; @@ -1549,7 +1551,6 @@ function readableStreamPipeTo( while (controller[kState].queue.length > 0) { if (shuttingDown) return true; - source[kState].disturbed = true; const chunk = dequeueValue(controller); if (controller[kState].closeRequested && !controller[kState].queue.length) { @@ -1563,20 +1564,17 @@ function readableStreamPipeTo( setPromiseHandled(state.currentWrite); // Check backpressure after each write - if (dest[kState].state === 'writable') { - const desiredSize = writer.desiredSize; - if (desiredSize !== null && desiredSize <= 0) { - // Backpressure - stop batch and wait for ready - break; - } + if (dest[kState].backpressure) { + // Backpressure - stop batch and wait for ready + break; + } else if (dest[kState].state !== 'writable' || writableStreamCloseQueuedOrInFlight(dest)) { + // Closing or erroring - stop batch and wait for shutdown + break; } } // Trigger pull if needed after batch - if (source[kState].state === 'readable' && - !controller[kState].closeRequested) { - readableStreamDefaultControllerCallPullIfNeeded(controller); - } + readableStreamDefaultControllerCallPullIfNeeded(controller); // Check if stream closed during batch if (source[kState].state === 'closed') {