diff --git a/lib/internal/streams/iter/duplex.js b/lib/internal/streams/iter/duplex.js index 591837f70eb4cb..bd06f37303cfc6 100644 --- a/lib/internal/streams/iter/duplex.js +++ b/lib/internal/streams/iter/duplex.js @@ -74,9 +74,7 @@ function duplex(options = { __proto__: null }) { if (aClosed) return; aClosed = true; // End the writer (signals end-of-stream to B's readable) - if (aWriter.endSync() < 0) { - await aWriter.end(); - } + aWriter.endSync(); // Stop iteration of this channel's readable if (aReadableIterator?.return) { await aReadableIterator.return(); @@ -104,9 +102,7 @@ function duplex(options = { __proto__: null }) { async close() { if (bClosed) return; bClosed = true; - if (bWriter.endSync() < 0) { - await bWriter.end(); - } + bWriter.endSync(); if (bReadableIterator?.return) { await bReadableIterator.return(); bReadableIterator = null; diff --git a/lib/internal/streams/iter/push.js b/lib/internal/streams/iter/push.js index 1c367ff02bae71..e531e65428ef6d 100644 --- a/lib/internal/streams/iter/push.js +++ b/lib/internal/streams/iter/push.js @@ -274,7 +274,10 @@ class PushQueue { if (this.#writerState === 'errored') { return -2; // Signal to reject with stored error } - if (this.#writerState === 'closing' || this.#writerState === 'closed') { + if (this.#writerState === 'closing') { + return -3; // Signal to PushWriter: wait for drain to complete + } + if (this.#writerState === 'closed') { return this.#bytesWritten; // Idempotent } @@ -636,6 +639,10 @@ class PushWriter { if (result === -3) { // Closing: buffer has data, create deferred promise that resolves // when consumer drains past the end sentinel + const pendingEndPromise = this.#queue.pendingEndPromise; + if (pendingEndPromise !== null) { + return pendingEndPromise; + } const { promise, resolve, reject } = PromiseWithResolvers(); this.#queue.setPendingEnd({ __proto__: null, promise, resolve, reject }); return promise; diff --git a/test/parallel/test-stream-iter-push-writer.js b/test/parallel/test-stream-iter-push-writer.js index e7e783d7b74a9c..8f5ba28a9d1fb7 100644 --- a/test/parallel/test-stream-iter-push-writer.js +++ b/test/parallel/test-stream-iter-push-writer.js @@ -232,6 +232,25 @@ async function testEndAsyncReturnValue() { await consume; } +async function testEndAfterEndSyncWaitsForDrain() { + const { writer, readable } = push(); + writer.writeSync('hello'); + assert.strictEqual(writer.endSync(), -1); + + let ended = false; + const end = writer.end().then((n) => { + ended = true; + return n; + }); + + await Promise.resolve(); + assert.strictEqual(ended, false); + + // eslint-disable-next-line no-unused-vars + for await (const _ of readable) { /* drain */ } + assert.strictEqual(await end, 5); +} + async function testWriteUint8Array() { const { writer, readable } = push(); writer.write(new Uint8Array([72, 73])); // 'HI' @@ -413,6 +432,7 @@ Promise.all([ testOndrainProtocolErrorPropagates(), testFail(), testEndAsyncReturnValue(), + testEndAfterEndSyncWaitsForDrain(), testWriteUint8Array(), testOndrainWaitsForDrain(), testConsumerThrowRejectsWrites(),