Skip to content

Commit cc8071c

Browse files
committed
stream: observe abort while awaiting pipeTo source
Use the abort-aware iterator wrapper in the no-transform pipeTo() path so a pending source read does not block AbortSignal handling. Fixes: #64014 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5
1 parent da51692 commit cc8071c

2 files changed

Lines changed: 28 additions & 1 deletion

File tree

lib/internal/streams/iter/pull.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1090,7 +1090,7 @@ async function pipeTo(source, ...args) {
10901090
} else if (transforms.length === 0) {
10911091
// Fast path: no transforms - iterate normalized source directly
10921092
if (signal) {
1093-
for await (const batch of normalized) {
1093+
for await (const batch of yieldAbortable(normalized, signal)) {
10941094
signal.throwIfAborted();
10951095
const p = writeBatch(batch);
10961096
if (p) await p;

test/parallel/test-stream-iter-pipeto-signal.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
const common = require('../common');
88
const assert = require('assert');
9+
const { setTimeout } = require('timers/promises');
910
const { pipeTo, from } = require('stream/iter');
1011

1112
// pipeTo with live signal, no transforms — abort mid-stream
@@ -30,6 +31,31 @@ async function testPipeToLiveSignalNoTransforms() {
3031
assert.ok(written.length >= 1);
3132
}
3233

34+
// pipeTo with live signal, no transforms — abort while waiting for next chunk
35+
async function testPipeToLiveSignalNoTransformsPendingNext() {
36+
const ac = new AbortController();
37+
const reason = new Error('abort reason');
38+
const writer = {
39+
write: common.mustNotCall(),
40+
};
41+
42+
async function* source() {
43+
await new Promise(() => {});
44+
}
45+
46+
setTimeout(10).then(common.mustCall(() => ac.abort(reason)));
47+
48+
const result = await Promise.race([
49+
assert.rejects(
50+
() => pipeTo(source(), writer, { signal: ac.signal }),
51+
reason,
52+
).then(() => 'aborted'),
53+
setTimeout(1000, 'timed out'),
54+
]);
55+
56+
assert.strictEqual(result, 'aborted');
57+
}
58+
3359
// pipeTo with live signal + transforms — abort mid-stream
3460
async function testPipeToLiveSignalWithTransforms() {
3561
const ac = new AbortController();
@@ -84,6 +110,7 @@ async function testPipeToLiveSignalWithTransformsCompletes() {
84110

85111
Promise.all([
86112
testPipeToLiveSignalNoTransforms(),
113+
testPipeToLiveSignalNoTransformsPendingNext(),
87114
testPipeToLiveSignalWithTransforms(),
88115
testPipeToLiveSignalCompletes(),
89116
testPipeToLiveSignalWithTransformsCompletes(),

0 commit comments

Comments
 (0)