From cee20670d40bdf58475462ec811311d5a89e7aa8 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Fri, 22 May 2026 23:04:04 -0700 Subject: [PATCH] stream: wait for push writer end fallback to drain When endSync() returns -1 after buffered writes, a follow-up end() should stay pending until the readable side drains the queued data. Do not make duplex channel close() wait for that drain, since close() only needs to signal EOF to the peer. Waiting there can deadlock when the peer starts reading only after close() resolves. Fixes: https://github.com/nodejs/node/issues/63502 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 --- lib/internal/streams/iter/duplex.js | 8 ++------ lib/internal/streams/iter/push.js | 9 ++++++++- test/parallel/test-stream-iter-push-writer.js | 20 +++++++++++++++++++ 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/lib/internal/streams/iter/duplex.js b/lib/internal/streams/iter/duplex.js index 591837f70eb4cb..bd06f37303cfc6 100644 --- a/lib/internal/streams/iter/duplex.js +++ b/lib/internal/streams/iter/duplex.js @@ -74,9 +74,7 @@ function duplex(options = { __proto__: null }) { if (aClosed) return; aClosed = true; // End the writer (signals end-of-stream to B's readable) - if (aWriter.endSync() < 0) { - await aWriter.end(); - } + aWriter.endSync(); // Stop iteration of this channel's readable if (aReadableIterator?.return) { await aReadableIterator.return(); @@ -104,9 +102,7 @@ function duplex(options = { __proto__: null }) { async close() { if (bClosed) return; bClosed = true; - if (bWriter.endSync() < 0) { - await bWriter.end(); - } + bWriter.endSync(); if (bReadableIterator?.return) { await bReadableIterator.return(); bReadableIterator = null; diff --git a/lib/internal/streams/iter/push.js b/lib/internal/streams/iter/push.js index 1c367ff02bae71..e531e65428ef6d 100644 --- a/lib/internal/streams/iter/push.js +++ b/lib/internal/streams/iter/push.js @@ -274,7 +274,10 @@ class PushQueue { if (this.#writerState === 'errored') { return -2; // Signal to reject with stored error } - if (this.#writerState === 'closing' || this.#writerState === 'closed') { + if (this.#writerState === 'closing') { + return -3; // Signal to PushWriter: wait for drain to complete + } + if (this.#writerState === 'closed') { return this.#bytesWritten; // Idempotent } @@ -636,6 +639,10 @@ class PushWriter { if (result === -3) { // Closing: buffer has data, create deferred promise that resolves // when consumer drains past the end sentinel + const pendingEndPromise = this.#queue.pendingEndPromise; + if (pendingEndPromise !== null) { + return pendingEndPromise; + } const { promise, resolve, reject } = PromiseWithResolvers(); this.#queue.setPendingEnd({ __proto__: null, promise, resolve, reject }); return promise; diff --git a/test/parallel/test-stream-iter-push-writer.js b/test/parallel/test-stream-iter-push-writer.js index e7e783d7b74a9c..8f5ba28a9d1fb7 100644 --- a/test/parallel/test-stream-iter-push-writer.js +++ b/test/parallel/test-stream-iter-push-writer.js @@ -232,6 +232,25 @@ async function testEndAsyncReturnValue() { await consume; } +async function testEndAfterEndSyncWaitsForDrain() { + const { writer, readable } = push(); + writer.writeSync('hello'); + assert.strictEqual(writer.endSync(), -1); + + let ended = false; + const end = writer.end().then((n) => { + ended = true; + return n; + }); + + await Promise.resolve(); + assert.strictEqual(ended, false); + + // eslint-disable-next-line no-unused-vars + for await (const _ of readable) { /* drain */ } + assert.strictEqual(await end, 5); +} + async function testWriteUint8Array() { const { writer, readable } = push(); writer.write(new Uint8Array([72, 73])); // 'HI' @@ -413,6 +432,7 @@ Promise.all([ testOndrainProtocolErrorPropagates(), testFail(), testEndAsyncReturnValue(), + testEndAfterEndSyncWaitsForDrain(), testWriteUint8Array(), testOndrainWaitsForDrain(), testConsumerThrowRejectsWrites(),