Skip to content
22 changes: 22 additions & 0 deletions packages/livekit-rtc/src/room.ts
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,28 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
return ev.message.case == 'disconnect' && ev.message.value.asyncId == res.asyncId;
});

// Close all in-progress stream controllers to prevent FD leaks.
// Streams that were receiving data but never got a trailer (e.g. the sender
// disconnected mid-transfer) would otherwise keep their ReadableStream open
// indefinitely, leaking the underlying controller and any buffered chunks.
for (const [, streamController] of this.byteStreamControllers) {
try {
streamController.controller.close();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be more appropriate to trigger call an controller.error(new Error("Disconnected while receiving")

} catch {
// controller may already be closed
}
}
this.byteStreamControllers.clear();

for (const [, streamController] of this.textStreamControllers) {
try {
streamController.controller.close();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

} catch {
// controller may already be closed
}
}
this.textStreamControllers.clear();

FfiClient.instance.removeListener(FfiClientEvent.FfiEvent, this.onFfiEvent);
this.removeAllListeners();
}
Expand Down
40 changes: 40 additions & 0 deletions packages/livekit-rtc/src/tests/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -514,4 +514,44 @@ describeE2E('livekit-rtc e2e', () => {
},
testTimeoutMs * 2,
);

it(
'cleans up stream controllers when disconnecting during an active stream',
async () => {
const { rooms } = await connectTestRooms(2);
const [receivingRoom, sendingRoom] = rooms;
const topic = 'cleanup-stream-topic';

// Register a handler on the receiving side that will intentionally
// NOT fully consume the stream — simulating an abandoned transfer.
let readerReceived = false;
// eslint-disable-next-line @typescript-eslint/no-unused-vars
receivingRoom!.registerTextStreamHandler(topic, async (_reader, _sender) => {
readerReceived = true;
// Deliberately do not call reader.readAll() so the stream stays open
});

// Start sending a text stream but don't close it
const writer = await sendingRoom!.localParticipant!.streamText({ topic });
await writer.write('partial data');

// Wait for the receiving side to get the stream header
await waitFor(() => readerReceived, {
timeoutMs: 5000,
debugName: 'text stream header received',
});

// Disconnect the receiving room while the stream is still open.
// This should close the stream controller without throwing.
await receivingRoom!.disconnect();

// Also close the writer and disconnect the sender
await writer.close();
await sendingRoom!.disconnect();

// If we got here without hanging or throwing, the stream controller
// was properly cleaned up on disconnect.
},
testTimeoutMs,
);
});
Loading