Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/long-keys-roll.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/rtc-node': patch
---

Cancel losing timeout in AudioMixer race to prevent orphaned timers
102 changes: 102 additions & 0 deletions packages/livekit-rtc/src/audio_mixer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,106 @@ describe('AudioMixer', () => {
// Should get at least 2 frames (stream exhausts after 2)
expect(frames.length).toBeGreaterThanOrEqual(2);
});

it('completes mixing without lingering timers when iterator is fast', async () => {
const sampleRate = 48000;
const numChannels = 1;
const samplesPerChannel = 480;
const mixer = new AudioMixer(sampleRate, numChannels, {
blocksize: samplesPerChannel,
// Long timeout so the iterator always wins the race.
// Before the fix, each iteration leaked a 5s timer; with the fix,
// cancel() clears it immediately so the mixer shuts down without delay.
streamTimeoutMs: 5000,
});

const stream = createMockAudioStream(3, sampleRate, numChannels, samplesPerChannel, 42);
mixer.addStream(stream);

const frames: AudioFrame[] = [];
for await (const frame of mixer) {
frames.push(frame);
if (frames.length >= 2) break;
}

await mixer.aclose();

expect(frames.length).toBe(2);
// Verify the frames contain the expected mixed value
for (const frame of frames) {
expect(frame.data[0]).toBe(42);
}
});

it('produces frames even with many race iterations', async () => {
const sampleRate = 48000;
const numChannels = 1;
const samplesPerChannel = 480;
const mixer = new AudioMixer(sampleRate, numChannels, {
blocksize: samplesPerChannel,
streamTimeoutMs: 5000,
});

// Use more frames to stress multiple race iterations
const stream = createMockAudioStream(6, sampleRate, numChannels, samplesPerChannel, 10);
mixer.addStream(stream);

const frames: AudioFrame[] = [];
for await (const frame of mixer) {
frames.push(frame);
if (frames.length >= 4) break;
}

await mixer.aclose();

expect(frames.length).toBe(4);
// All frames should contain the expected value
for (const frame of frames) {
expect(frame.data[0]).toBe(10);
}
});

it('handles slow streams via timeout path', async () => {
const sampleRate = 48000;
const numChannels = 1;
const samplesPerChannel = 480;
const mixer = new AudioMixer(sampleRate, numChannels, {
blocksize: samplesPerChannel,
// Very short timeout to trigger the timeout path
streamTimeoutMs: 1,
});

// Create a stream that is slower than the timeout
async function* slowStream(): AsyncGenerator<AudioFrame> {
await new Promise((resolve) => setTimeout(resolve, 200));
const data = new Int16Array(numChannels * samplesPerChannel).fill(500);
yield new AudioFrame(data, sampleRate, numChannels, samplesPerChannel);
}

// Suppress the expected console.warn from the timeout path
const originalWarn = console.warn;
const warnings: string[] = [];
console.warn = (...args: unknown[]) => {
warnings.push(args.map(String).join(' '));
};

try {
mixer.addStream(slowStream());

// The mixer should produce a frame (zero-padded due to timeout)
// and auto-close when the stream exhausts.
const frames: AudioFrame[] = [];
for await (const frame of mixer) {
frames.push(frame);
if (frames.length >= 1) break;
}

await mixer.aclose();

// The timeout warning should have been logged
expect(warnings.some((w) => w.includes('stream timeout after'))).toBe(true);
} finally {
console.warn = originalWarn;
}
});
});
33 changes: 27 additions & 6 deletions packages/livekit-rtc/src/audio_mixer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,20 +310,26 @@ export class AudioMixer {

// Accumulate data until we have at least chunkSize samples
while (buf.length < this.chunkSize * this.numChannels && !exhausted && !this.closed) {
const { result, clearTimeout: cancel } = this.timeoutRace(
iterator.next(),
this.streamTimeoutMs,
);
try {
const result = await Promise.race([iterator.next(), this.timeout(this.streamTimeoutMs)]);
const value = await result;
cancel();

if (result === 'timeout') {
if (value === 'timeout') {
console.warn(`AudioMixer: stream timeout after ${this.streamTimeoutMs}ms`);
break;
}
const iterResult = value;

if (result.done) {
if (iterResult.done) {
exhausted = true;
break;
}

const frame = result.value;
const frame = iterResult.value;
const newData = frame.data;

// Mark that we received data in this call
Expand All @@ -339,6 +345,9 @@ export class AudioMixer {
buf = combined;
}
} catch (error) {
// Clear the timeout on the error path too, so it doesn't linger
// when iterator.next() rejects.
cancel();
console.error(`AudioMixer: Error reading from stream:`, error);
exhausted = true;
break;
Expand Down Expand Up @@ -412,7 +421,19 @@ export class AudioMixer {
return new Promise((resolve) => setTimeout(resolve, ms));
}

private timeout(ms: number): Promise<'timeout'> {
return new Promise((resolve) => setTimeout(() => resolve('timeout'), ms));
/** Race a promise against a timeout, returning a handle to clear the timer
* so the losing setTimeout doesn't linger after the winner resolves. */
private timeoutRace<T>(
promise: Promise<T>,
ms: number,
): { result: Promise<T | 'timeout'>; clearTimeout: () => void } {
let timer: ReturnType<typeof setTimeout>;
const timeoutPromise = new Promise<'timeout'>((resolve) => {
timer = setTimeout(() => resolve('timeout'), ms);
});
return {
result: Promise.race([promise, timeoutPromise]),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be a bit leaner if we simply did

return Promise.race([promise.finally(clearTimeout), timeoutPromise])

and all callees of this util would get the cleanup out of the box

clearTimeout: () => clearTimeout(timer),
};
}
}
Loading