diff --git a/packages/kernel-browser-runtime/src/utils/comms-query-string.test.ts b/packages/kernel-browser-runtime/src/utils/comms-query-string.test.ts index 184a4943a..f968b22d8 100644 --- a/packages/kernel-browser-runtime/src/utils/comms-query-string.test.ts +++ b/packages/kernel-browser-runtime/src/utils/comms-query-string.test.ts @@ -68,6 +68,7 @@ describe('comms-query-string', () => { allowedWsHosts: ['relay.example.com'], maxRetryAttempts: 5, maxQueue: 200, + streamInactivityTimeoutMs: 60000, }; const params = createCommsQueryString(options); expect(parseCommsQueryString(`?${params.toString()}`)).toStrictEqual( diff --git a/packages/kernel-browser-runtime/src/utils/comms-query-string.ts b/packages/kernel-browser-runtime/src/utils/comms-query-string.ts index 48b947c87..8765ad086 100644 --- a/packages/kernel-browser-runtime/src/utils/comms-query-string.ts +++ b/packages/kernel-browser-runtime/src/utils/comms-query-string.ts @@ -57,6 +57,7 @@ const NUMBER_PARAM_NAMES = [ 'handshakeTimeoutMs', 'writeTimeoutMs', 'ackTimeoutMs', + 'streamInactivityTimeoutMs', ] as const satisfies readonly NumberParamKey[]; const NonNegativeInteger = min(integer(), 0); diff --git a/packages/kernel-cli/src/commands/relay.test.ts b/packages/kernel-cli/src/commands/relay.test.ts index b92459ba0..0f7139da2 100644 --- a/packages/kernel-cli/src/commands/relay.test.ts +++ b/packages/kernel-cli/src/commands/relay.test.ts @@ -27,7 +27,10 @@ vi.mock('../utils.ts', () => ({ waitFor: vi.fn(), })); -const mockLogger = { info: vi.fn(), error: vi.fn() } as unknown as Logger; +const mockLogger: Logger = { + info: vi.fn(), + error: vi.fn(), +} as unknown as Logger; const makeLibp2pMock = ( addrs: string[] = ['/ip4/127.0.0.1/tcp/9001/ws/p2p/QmFoo'], diff --git a/packages/kernel-node-runtime/test/e2e/libp2p-v3-features.test.ts b/packages/kernel-node-runtime/test/e2e/libp2p-v3-features.test.ts new file mode 100644 index 000000000..12a86e08d --- /dev/null +++ b/packages/kernel-node-runtime/test/e2e/libp2p-v3-features.test.ts @@ -0,0 +1,291 @@ +import type { Libp2p } from '@libp2p/interface'; +import { makeSQLKernelDatabase } from '@metamask/kernel-store/sqlite/nodejs'; +import { startRelay } from '@metamask/kernel-utils/libp2p'; +import { Kernel, kunser, makeKernelStore } from '@metamask/ocap-kernel'; +import { delay } from '@ocap/repo-tools/test-utils'; +import { mkdtemp, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { + describe, + it, + expect, + beforeAll, + afterAll, + beforeEach, + afterEach, +} from 'vitest'; + +import { makeTestKernel } from '../helpers/kernel.ts'; +import { + makeRemoteVatConfig, + restartKernelAndReloadVat, + sendRemoteMessage, + setupAliceAndBob, +} from '../helpers/remote-comms.ts'; +import { stopWithTimeout } from '../helpers/stop-with-timeout.ts'; + +const NETWORK_TIMEOUT = 30_000; +const relayPeerId = '12D3KooWJBDqsyHQF2MWiCdU4kdqx4zTsSTLRdShg7Ui6CRWB4uc'; +const testRelays = [`/ip4/127.0.0.1/tcp/9001/ws/p2p/${relayPeerId}`]; + +const testBackoffOptions = { + reconnectionBaseDelayMs: 10, + reconnectionMaxDelayMs: 50, + handshakeTimeoutMs: 3_000, + writeTimeoutMs: 3_000, + ackTimeoutMs: 2_000, +}; + +describe.sequential('libp2p v3 Features E2E', () => { + let relay: Libp2p; + let kernel1: Kernel; + let kernel2: Kernel; + let dbFilename1: string; + let dbFilename2: string; + let tempDir: string; + let kernelStore1: ReturnType; + let kernelStore2: ReturnType; + + beforeAll(async () => { + relay = await startRelay(console); + }); + + afterAll(async () => { + if (relay) { + await relay.stop(); + } + }); + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), 'ocap-v3-')); + dbFilename1 = join(tempDir, 'kernel1.db'); + dbFilename2 = join(tempDir, 'kernel2.db'); + + const kernelDatabase1 = await makeSQLKernelDatabase({ + dbFilename: dbFilename1, + }); + kernelStore1 = makeKernelStore(kernelDatabase1); + + const kernelDatabase2 = await makeSQLKernelDatabase({ + dbFilename: dbFilename2, + }); + kernelStore2 = makeKernelStore(kernelDatabase2); + + kernel1 = await makeTestKernel(kernelDatabase1); + kernel2 = await makeTestKernel(kernelDatabase2); + }); + + afterEach(async () => { + const STOP_TIMEOUT = 3000; + await Promise.all([ + kernel1 && + stopWithTimeout( + async () => kernel1.stop(), + STOP_TIMEOUT, + 'kernel1.stop', + ), + kernel2 && + stopWithTimeout( + async () => kernel2.stop(), + STOP_TIMEOUT, + 'kernel2.stop', + ), + ]); + if (tempDir) { + await rm(tempDir, { recursive: true, force: true }); + } + }); + + describe('peer:disconnect Reconnection', () => { + it( + 'recovers queued message after peer:disconnect triggers reconnection', + async () => { + const { aliceRef, bobURL } = await setupAliceAndBob( + kernel1, + kernel2, + kernelStore1, + kernelStore2, + testRelays, + testBackoffOptions, + ); + + // Establish initial communication + const initial = await sendRemoteMessage( + kernel1, + aliceRef, + bobURL, + 'hello', + ['Alice'], + ); + expect(initial).toContain('vat Bob got "hello" from Alice'); + + // Stop kernel2 — triggers both readChannel error and peer:disconnect. + // The peer:disconnect event acts as a safety net ensuring reconnection + // is attempted even after readChannel clears the channel. + await kernel2.stop(); + + // Queue a message while kernel2 is down — this triggers reconnection + const recoveryPromise = kernel1.queueMessage( + aliceRef, + 'sendRemoteMessage', + [bobURL, 'hello', ['Alice']], + ); + + // Restart kernel2 — reconnection loop delivers the queued message + const bobConfig = makeRemoteVatConfig('Bob'); + const restartResult = await restartKernelAndReloadVat( + dbFilename2, + false, + testRelays, + bobConfig, + testBackoffOptions, + ); + // eslint-disable-next-line require-atomic-updates + kernel2 = restartResult.kernel; + + const result = kunser(await recoveryPromise) as string; + expect(result).toContain('vat Bob got "hello" from Alice'); + + // Verify ongoing connectivity after peer:disconnect recovery + const followUp = await sendRemoteMessage( + kernel1, + aliceRef, + bobURL, + 'hello', + ['Alice'], + ); + expect(followUp).toContain('vat Bob got "hello" from Alice'); + }, + NETWORK_TIMEOUT * 2, + ); + }); + + describe('Stream Inactivity Timeout', () => { + it( + 'recovers communication after idle period exceeds inactivity timeout', + async () => { + // Use a short inactivity timeout to test the auto-abort behavior. + // Must be >= MIN_STREAM_INACTIVITY_TIMEOUT_MS (5 s) since the + // transport clamps lower values. + const shortTimeoutOptions = { + ...testBackoffOptions, + streamInactivityTimeoutMs: 6_000, + }; + + const { aliceRef, bobURL } = await setupAliceAndBob( + kernel1, + kernel2, + kernelStore1, + kernelStore2, + testRelays, + shortTimeoutOptions, + ); + + // Establish initial communication + const initial = await sendRemoteMessage( + kernel1, + aliceRef, + bobURL, + 'hello', + ['Alice'], + ); + expect(initial).toContain('vat Bob got "hello" from Alice'); + + // Wait longer than the inactivity timeout (6s + buffer). + // The stream should auto-abort due to inactivityTimeout, + // triggering connection loss handling and reconnection. + await delay(8_000); + + // Send another message — the transport layer should + // reconnect since the previous stream was aborted by inactivity. + const afterIdle = await sendRemoteMessage( + kernel1, + aliceRef, + bobURL, + 'hello', + ['Alice'], + ); + expect(afterIdle).toContain('vat Bob got "hello" from Alice'); + }, + NETWORK_TIMEOUT * 2, + ); + }); + + describe('Fast Failure on Closed Streams', () => { + it( + 'handles rapid sends during disconnect without hanging', + async () => { + const { aliceRef, bobURL, peerId2 } = await setupAliceAndBob( + kernel1, + kernel2, + kernelStore1, + kernelStore2, + testRelays, + testBackoffOptions, + ); + + // Establish initial connectivity + await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']); + + // Intentionally close the connection — writes should fail fast + // via stream.status check rather than waiting for timeout + await kernel1.closeConnection(peerId2); + + const start = Date.now(); + // This should fail quickly (stream.status !== 'open') rather than + // waiting for the full write timeout (3000ms) + await expect( + kernel1.queueMessage(aliceRef, 'sendRemoteMessage', [ + bobURL, + 'hello', + ['Alice'], + ]), + ).rejects.toMatchObject({ + body: expect.stringContaining( + 'Message delivery failed after intentional close', + ), + }); + const elapsed = Date.now() - start; + + // Should fail well under the write timeout + expect(elapsed).toBeLessThan(testBackoffOptions.writeTimeoutMs); + }, + NETWORK_TIMEOUT, + ); + }); + + describe('Connection Type Awareness', () => { + it( + 'establishes relayed connections and communicates successfully', + async () => { + // Both kernels connect via relay — the connection.direct property + // should be false (relayed), and the log should include "relayed". + // This validates that the connection type detection works with real + // libp2p connections. + const { aliceRef, bobURL, peerId1, peerId2 } = await setupAliceAndBob( + kernel1, + kernel2, + kernelStore1, + kernelStore2, + testRelays, + testBackoffOptions, + ); + + // Verify distinct peer IDs (confirms real libp2p nodes) + expect(peerId1).not.toBe(peerId2); + + // Bidirectional communication through relay + const response = await sendRemoteMessage( + kernel1, + aliceRef, + bobURL, + 'hello', + ['Alice'], + ); + expect(response).toContain('vat Bob got "hello" from Alice'); + }, + NETWORK_TIMEOUT, + ); + }); +}); diff --git a/packages/kernel-node-runtime/test/e2e/remote-comms.test.ts b/packages/kernel-node-runtime/test/e2e/remote-comms.test.ts index 2139def9a..503bfa043 100644 --- a/packages/kernel-node-runtime/test/e2e/remote-comms.test.ts +++ b/packages/kernel-node-runtime/test/e2e/remote-comms.test.ts @@ -651,24 +651,32 @@ describe.sequential('Remote Communications E2E', () => { }); let kernel3: Kernel | undefined; - // Use a longer ACK timeout for the initiating kernel — it must redeem - // URLs on two peers that restart sequentially through the relay, so the - // default 2s × (3+1) = 8s redemption window is too tight for CI. - const multiPeerBackoff = { ...testBackoffOptions, ackTimeoutMs: 5_000 }; + // The ACK give-up fires after ackTimeoutMs × (MAX_RETRIES + 1). + // kernel2 and kernel3 restart in parallel below, but each restart + // (new kernel + libp2p connect + vat launch) still takes several + // seconds. A 5 s ACK timeout gives a 20 s redemption window. + // maxConnectionAttemptsPerMinute is raised because the fast test + // backoff (10-50 ms) exhausts the default 10/min rate limit before + // the peers finish restarting, blocking reconnection for ~60 s. + const multiPeerOptions = { + ...testBackoffOptions, + ackTimeoutMs: 5_000, + maxConnectionAttemptsPerMinute: 500, + }; try { await kernel1.initRemoteComms({ relays: testRelays, - ...multiPeerBackoff, + ...multiPeerOptions, }); await kernel2.initRemoteComms({ relays: testRelays, - ...testBackoffOptions, + ...multiPeerOptions, }); kernel3 = await makeTestKernel(kernelDatabase3); await kernel3.initRemoteComms({ relays: testRelays, - ...testBackoffOptions, + ...multiPeerOptions, }); const aliceConfig = makeRemoteVatConfig('Alice'); @@ -706,27 +714,30 @@ describe.sequential('Remote Communications E2E', () => { [charlieURL, 'hello', ['Alice']], ); + // Restart both peers in parallel to fit within the 20 s redemption + // window. Sequential restarts doubled the wall-clock time. const bobConfigRestart = makeRemoteVatConfig('Bob'); const charlieConfigRestart = makeRemoteVatConfig('Charlie'); - const restartResult2 = await restartKernelAndReloadVat( - dbFilename2, - false, - testRelays, - bobConfigRestart, - testBackoffOptions, - ); - // eslint-disable-next-line require-atomic-updates - kernel2 = restartResult2.kernel; - - kernel3 = ( - await restartKernelAndReloadVat( + const [restartResult2, restartResult3] = await Promise.all([ + restartKernelAndReloadVat( + dbFilename2, + false, + testRelays, + bobConfigRestart, + multiPeerOptions, + ), + restartKernelAndReloadVat( dbFilename3, false, testRelays, charlieConfigRestart, - testBackoffOptions, - ) - ).kernel; + multiPeerOptions, + ), + ]); + // eslint-disable-next-line require-atomic-updates + kernel2 = restartResult2.kernel; + + kernel3 = restartResult3.kernel; // Both messages should be delivered successfully const bobResult = await bobMessagePromise; diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts index ba6f8a9f6..d595ab188 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts @@ -722,6 +722,31 @@ describe('RemoteHandle', () => { expect(() => remote.rejectPendingRedemptions(errorMessage)).not.toThrow(); }); + it('giveUp rejects pending messages and redemptions', async () => { + const remote = makeRemote(); + const reason = 'transport gave up'; + + // Send a message to create pending state + const resolutions: VatOneResolution[] = [ + ['rp+3', false, { body: '"value"', slots: [] }], + ]; + await remote.deliverNotify(resolutions); + + // Start a URL redemption + const redeemPromise = remote.redeemOcapURL('ocap:test@peer'); + + remote.giveUp(reason); + + // Pending redemption was rejected + await expect(redeemPromise).rejects.toThrow(reason); + + // Pending messages were discarded (startSeq advanced past all pending). + // deliverNotify used seq 1, redeemOcapURL used seq 2, so nextSendSeq = 2 + // and startSeq is set to nextSendSeq + 1 = 3. + const seqState = mockKernelStore.getRemoteSeqState(mockRemoteId); + expect(seqState?.startSeq).toBe(3); + }); + it('redeemOcapURL increments redemption counter for multiple redemptions', async () => { const remote = makeRemote(); const mockOcapURL1 = 'url1'; diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts index 344a43335..b69c1582b 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts @@ -376,14 +376,13 @@ export class RemoteHandle implements EndpointHandle { } if (this.#retryCount >= MAX_RETRIES) { - // Give up - reject all pending messages, URL redemptions, and notify RemoteManager + // Clean up locally first (unconditional), then notify RemoteManager + // to reject kernel promises. giveUp() must run even when #onGiveUp + // is unset, otherwise pending messages and redemptions leak. this.#logger.log( `${this.#peerId.slice(0, 8)}:: gave up after ${MAX_RETRIES} retries, rejecting ${this.#getPendingCount()} pending messages`, ); - this.#rejectAllPending(`not acknowledged after ${MAX_RETRIES} retries`); - this.rejectPendingRedemptions( - `Remote connection lost after ${MAX_RETRIES} failed retries`, - ); + this.giveUp(`not acknowledged after ${MAX_RETRIES} retries`); this.#onGiveUp?.(this.#peerId); return; } @@ -1064,6 +1063,19 @@ export class RemoteHandle implements EndpointHandle { this.#pendingRedemptions.clear(); } + /** + * Permanently give up on this remote. Stops retransmitting, rejects all + * pending messages and URL redemptions. Called by RemoteManager when the + * transport layer determines the peer is unreachable. + * + * @param reason - Human-readable reason for giving up. + */ + giveUp(reason: string): void { + this.#clearAckTimeout(); + this.#rejectAllPending(reason); + this.rejectPendingRedemptions(reason); + } + /** * Clean up resources held by this RemoteHandle. * Clears all timers and rejects pending promises to prevent resource leaks diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts index 734804af0..33fdf4471 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts @@ -750,16 +750,13 @@ describe('RemoteManager', () => { it('handles remote give up callback when remote exists', () => { const peerId = 'peer-to-give-up'; const remote = remoteManager.establishRemote(peerId); - const rejectPendingRedemptionsSpy = vi.spyOn( - remote, - 'rejectPendingRedemptions', - ); + const giveUpSpy = vi.spyOn(remote, 'giveUp'); // Get the callback that was passed to initRemoteComms const initCall = vi.mocked(remoteComms.initRemoteComms).mock.calls[0]; const onRemoteGiveUp = initCall?.[6] as (peerId: string) => void; onRemoteGiveUp(peerId); - // Verify pending redemptions were rejected - expect(rejectPendingRedemptionsSpy).toHaveBeenCalledWith( + // Verify giveUp was called to stop retransmitting and reject pending work + expect(giveUpSpy).toHaveBeenCalledWith( `Remote connection lost: ${peerId} (max retries reached or non-retryable error)`, ); }); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts index b23fd5693..35dc408db 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts @@ -174,17 +174,14 @@ export class RemoteManager { } const { remoteId } = remote; - const failure = kser( - Error( - `Remote connection lost: ${peerId} (max retries reached or non-retryable error)`, - ), - ); - - // Reject pending URL redemptions in the RemoteHandle - // These are JavaScript promises that will propagate rejection to kernel promises - remote.rejectPendingRedemptions( - `Remote connection lost: ${peerId} (max retries reached or non-retryable error)`, - ); + const reason = `Remote connection lost: ${peerId} (max retries reached or non-retryable error)`; + const failure = kser(Error(reason)); + + // Stop retransmitting and reject pending messages + URL redemptions. + // Called from both ACK-timeout and transport give-up paths. The ACK + // path calls giveUp() before invoking this callback, but giveUp() is + // idempotent so the repeat is harmless and keeps the transport path correct. + remote.giveUp(reason); // Reject all promises for which this remote is the decider for (const kpid of this.#kernelStore.getPromisesByDecider(remoteId)) { diff --git a/packages/ocap-kernel/src/remotes/platform/channel-utils.test.ts b/packages/ocap-kernel/src/remotes/platform/channel-utils.test.ts index a41c13f43..d5b214354 100644 --- a/packages/ocap-kernel/src/remotes/platform/channel-utils.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/channel-utils.test.ts @@ -81,6 +81,13 @@ describe('channel-utils', () => { mockChannel = { peerId: 'testPeer', + stream: { + status: 'open', + inactivityTimeout: 0, + addEventListener: vi.fn(), + close: vi.fn(), + abort: vi.fn(), + }, msgStream: { write: vi.fn().mockReturnValue(writePromise), read: vi.fn(), @@ -137,6 +144,20 @@ describe('channel-utils', () => { await expect(writePromise).rejects.toThrow('Write failed'); }); + it.each(['closed', 'closing', 'aborted', 'reset'])( + 'throws immediately when stream status is %s', + async (status) => { + const message = new Uint8Array([1, 2, 3]); + (mockChannel.stream as unknown as { status: string }).status = status; + + await expect( + writeWithTimeout(mockChannel, message, 1000), + ).rejects.toThrow(`Stream is ${status}, cannot write`); + + expect(mockChannel.msgStream.write).not.toHaveBeenCalled(); + }, + ); + it('cleans up timeout listener after successful write', async () => { const message = new Uint8Array([1, 2, 3]); diff --git a/packages/ocap-kernel/src/remotes/platform/channel-utils.ts b/packages/ocap-kernel/src/remotes/platform/channel-utils.ts index 34e3c2e09..355ac2bfe 100644 --- a/packages/ocap-kernel/src/remotes/platform/channel-utils.ts +++ b/packages/ocap-kernel/src/remotes/platform/channel-utils.ts @@ -43,6 +43,12 @@ export async function writeWithTimeout( message: Uint8Array, timeoutMs = DEFAULT_WRITE_TIMEOUT_MS, ): Promise { + // Short-circuit if the underlying stream is already closed/aborted/reset + const { status } = channel.stream; + if (status !== 'open') { + throw Error(`Stream is ${status}, cannot write`); + } + const timeoutSignal = AbortSignal.timeout(timeoutMs); let abortHandler: (() => void) | undefined; const timeoutPromise = new Promise((_resolve, reject) => { diff --git a/packages/ocap-kernel/src/remotes/platform/connection-factory.test.ts b/packages/ocap-kernel/src/remotes/platform/connection-factory.test.ts index 029352c04..2c86b8af1 100644 --- a/packages/ocap-kernel/src/remotes/platform/connection-factory.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/connection-factory.test.ts @@ -7,12 +7,14 @@ import type { Channel } from '../types.ts'; // Mock heavy/libp2p related deps with minimal shims we can assert against. // Track state shared between mocks and tests +type MockConnection = { + remotePeer: { toString: () => string }; + direct: boolean; +}; + const libp2pState: { handler?: - | (( - stream: object, - connection: { remotePeer: { toString: () => string } }, - ) => void | Promise) + | ((stream: object, connection: MockConnection) => void | Promise) | undefined; dials: { addr: string; @@ -306,7 +308,7 @@ describe('ConnectionFactory', () => { _protocol: string, handler?: ( stream: object, - connection: { remotePeer: { toString: () => string } }, + connection: MockConnection, ) => void | Promise, ) => { libp2pState.handler = handler; @@ -569,6 +571,77 @@ describe('ConnectionFactory', () => { expect.stringContaining('Peer update:'), ); }); + + it('registers peer:disconnect event listener', async () => { + const mockAddEventListener = vi.fn(); + createLibp2p.mockImplementation(async () => ({ + start: vi.fn(), + stop: vi.fn(), + peerId: { toString: () => 'test-peer-id' }, + addEventListener: mockAddEventListener, + dialProtocol: vi.fn(), + handle: vi.fn(), + })); + + factory = await createFactory(); + + expect(mockAddEventListener).toHaveBeenCalledWith( + 'peer:disconnect', + expect.any(Function), + ); + }); + + it('calls disconnect handler on peer:disconnect event', async () => { + factory = await createFactory(); + + const disconnectHandler = vi.fn(); + factory.onPeerDisconnect(disconnectHandler); + + // Fire peer:disconnect event + for (const listener of libp2pState.eventListeners['peer:disconnect'] ?? + []) { + listener({ detail: { toString: () => 'disconnected-peer' } }); + } + + expect(disconnectHandler).toHaveBeenCalledWith('disconnected-peer'); + expect(mockLoggerLog).toHaveBeenCalledWith( + 'peer disconnected (all connections closed): disconnected-peer', + ); + }); + + it('does not call disconnect handler for relay peer IDs', async () => { + factory = await createFactory(); + + const disconnectHandler = vi.fn(); + factory.onPeerDisconnect(disconnectHandler); + + // Fire peer:disconnect for a relay peer (relay1 is in knownRelays) + for (const listener of libp2pState.eventListeners['peer:disconnect'] ?? + []) { + listener({ detail: { toString: () => 'relay1' } }); + } + + // Should log the disconnect but NOT call the handler + expect(mockLoggerLog).toHaveBeenCalledWith( + 'peer disconnected (all connections closed): relay1', + ); + expect(disconnectHandler).not.toHaveBeenCalled(); + }); + + it('logs connection type as direct or relayed for inbound', async () => { + factory = await createFactory(); + factory.onInboundConnection(vi.fn()); + + const inboundStream = {}; + await libp2pState.handler?.(inboundStream, { + remotePeer: { toString: () => 'direct-peer' }, + direct: true, + }); + + expect(mockLoggerLog).toHaveBeenCalledWith( + 'inbound direct connection from peerId:direct-peer', + ); + }); }); describe('onInboundConnection', () => { @@ -582,6 +655,7 @@ describe('ConnectionFactory', () => { const inboundStream = {}; await libp2pState.handler?.(inboundStream, { remotePeer: { toString: () => 'remote-peer' }, + direct: false, }); expect(handler).toHaveBeenCalledWith( @@ -602,12 +676,46 @@ describe('ConnectionFactory', () => { const inboundStream = {}; await libp2pState.handler?.(inboundStream, { remotePeer: { toString: () => 'inbound-peer' }, + direct: false, }); expect(capturedChannel).toBeDefined(); expect(capturedChannel?.msgStream).toBeDefined(); expect(capturedChannel?.peerId).toBe('inbound-peer'); }); + + it('awaits async inbound handler for auto-abort on rejection', async () => { + factory = await createFactory(); + + const handlerError = new Error('handler setup failed'); + factory.onInboundConnection(async () => { + throw handlerError; + }); + + const inboundStream = {}; + await expect( + libp2pState.handler?.(inboundStream, { + remotePeer: { toString: () => 'failing-peer' }, + direct: false, + }), + ).rejects.toThrow('handler setup failed'); + }); + + it('handles sync inbound handler without rejection', async () => { + factory = await createFactory(); + + const handler = vi.fn(); + factory.onInboundConnection(handler); + + const inboundStream = {}; + const result = await libp2pState.handler?.(inboundStream, { + remotePeer: { toString: () => 'sync-peer' }, + direct: false, + }); + + expect(result).toBeUndefined(); + expect(handler).toHaveBeenCalled(); + }); }); describe('candidateAddressStrings', () => { @@ -775,6 +883,30 @@ describe('ConnectionFactory', () => { expect(libp2pState.dials).toHaveLength(0); }); + it('handles TooManyOutboundProtocolStreamsError gracefully', async () => { + const { TooManyOutboundProtocolStreamsError: TooManyStreams } = + await import('@libp2p/interface'); + createLibp2p.mockImplementation(async () => ({ + start: vi.fn(), + stop: vi.fn(), + peerId: { toString: () => 'test-peer' }, + addEventListener: vi.fn(), + dialProtocol: vi.fn(async () => { + throw new TooManyStreams('Too many streams'); + }), + handle: vi.fn(), + })); + + factory = await createFactory(); + + await expect(factory.openChannelOnce('peer123')).rejects.toThrow( + TooManyStreams, + ); + expect(mockLoggerLog).toHaveBeenCalledWith( + expect.stringContaining('too many outbound streams via'), + ); + }); + it('handles timeout errors', async () => { const abortedSignal = { aborted: true, @@ -1406,14 +1538,13 @@ describe('ConnectionFactory', () => { }); describe('closeChannel', () => { - it('closes underlying stream when close is available', async () => { + it('closes stream gracefully', async () => { factory = await createFactory(); const close = vi.fn().mockResolvedValue(undefined); const channel = { peerId: 'peer-close', - msgStream: { - unwrap: () => ({ close }), - }, + stream: { close, abort: vi.fn() }, + msgStream: {}, } as unknown as Channel; await factory.closeChannel(channel, channel.peerId); expect(close).toHaveBeenCalled(); @@ -1422,33 +1553,40 @@ describe('ConnectionFactory', () => { ); }); - it('aborts underlying stream when abort is available', async () => { + it('aborts stream when graceful close fails', async () => { factory = await createFactory(); + const closeError = new Error('close failed'); + const close = vi.fn().mockRejectedValue(closeError); const abort = vi.fn(); const channel = { peerId: 'peer-abort', - msgStream: { - unwrap: () => ({ abort }), - }, + stream: { close, abort }, + msgStream: {}, } as unknown as Channel; await factory.closeChannel(channel, channel.peerId); - expect(abort).toHaveBeenCalledWith(expect.any(AbortError)); + // close() must be attempted before falling back to abort() + expect(close).toHaveBeenCalled(); + expect(abort).toHaveBeenCalledWith(closeError); expect(mockLoggerLog).toHaveBeenCalledWith( `${channel.peerId}:: aborted channel stream`, ); }); - it('logs when underlying stream lacks close and abort', async () => { + it('logs error when abort also throws', async () => { factory = await createFactory(); + const close = vi.fn().mockRejectedValue(new Error('close failed')); + const abort = vi.fn().mockImplementation(() => { + throw new Error('abort failed'); + }); const channel = { - peerId: 'peer-none', - msgStream: { - unwrap: () => ({}), - }, + peerId: 'peer-double-fail', + stream: { close, abort }, + msgStream: {}, } as unknown as Channel; await factory.closeChannel(channel, channel.peerId); + expect(abort).toHaveBeenCalled(); expect(mockLoggerLog).toHaveBeenCalledWith( - `${channel.peerId}:: channel stream lacks close/abort, relying on natural closure`, + expect.stringContaining('closing channel stream'), ); }); }); @@ -1845,6 +1983,7 @@ describe('ConnectionFactory', () => { const inboundStream = {}; await libp2pState.handler?.(inboundStream, { remotePeer: { toString: () => 'inbound-peer' }, + direct: false, }); // Make outbound connection diff --git a/packages/ocap-kernel/src/remotes/platform/connection-factory.ts b/packages/ocap-kernel/src/remotes/platform/connection-factory.ts index 0999ecb2e..40c6da5ed 100644 --- a/packages/ocap-kernel/src/remotes/platform/connection-factory.ts +++ b/packages/ocap-kernel/src/remotes/platform/connection-factory.ts @@ -4,7 +4,10 @@ import { bootstrap } from '@libp2p/bootstrap'; import { circuitRelayTransport } from '@libp2p/circuit-relay-v2'; import { generateKeyPairFromSeed } from '@libp2p/crypto/keys'; import { identify } from '@libp2p/identify'; -import { MuxerClosedError } from '@libp2p/interface'; +import { + MuxerClosedError, + TooManyOutboundProtocolStreamsError, +} from '@libp2p/interface'; import type { PrivateKey, Libp2p } from '@libp2p/interface'; import { ping } from '@libp2p/ping'; import { byteStream } from '@libp2p/utils'; @@ -34,6 +37,7 @@ import type { ConnectionFactoryOptions, DirectTransport, InboundConnectionHandler, + PeerDisconnectHandler, } from '../types.ts'; /** @@ -72,6 +76,8 @@ export class ConnectionFactory { #inboundHandler?: InboundConnectionHandler; + #disconnectHandler?: PeerDisconnectHandler; + /** * Constructor for the ConnectionFactory. * @@ -210,17 +216,21 @@ export class ConnectionFactory { }); // Set up inbound handler - await this.#libp2p.handle('whatever', (stream, connection) => { + await this.#libp2p.handle('whatever', async (stream, connection) => { const msgStream = byteStream(stream); const remotePeerId = connection.remotePeer.toString(); - this.#logger.log(`inbound connection from peerId:${remotePeerId}`); + const connType = connection.direct ? 'direct' : 'relayed'; + this.#logger.log( + `inbound ${connType} connection from peerId:${remotePeerId}`, + ); const channel: Channel = { msgStream, + stream, peerId: remotePeerId, }; - this.#inboundHandler?.(channel); + await this.#inboundHandler?.(channel); }); // Start libp2p @@ -243,6 +253,17 @@ export class ConnectionFactory { } }); + this.#libp2p.addEventListener('peer:disconnect', (evt) => { + const remotePeerId = evt.detail.toString(); + this.#logger.log( + `peer disconnected (all connections closed): ${remotePeerId}`, + ); + // Don't forward relay disconnects — handled by #scheduleRelayReconnect + if (!this.#relayPeerIds.has(remotePeerId)) { + this.#disconnectHandler?.(remotePeerId); + } + }); + await this.#libp2p.start(); } @@ -255,6 +276,16 @@ export class ConnectionFactory { this.#inboundHandler = handler; } + /** + * Set the handler for peer disconnect events. + * Fires when all connections to a peer are closed. + * + * @param handler - The handler for peer disconnects. + */ + onPeerDisconnect(handler: PeerDisconnectHandler): void { + this.#disconnectHandler = handler; + } + /** * Get the listen addresses of the libp2p node. * These are the multiaddr strings that other peers can use to dial this node. @@ -358,7 +389,7 @@ export class ConnectionFactory { `successfully connected to ${peerId} via ${addressString}`, ); const msgStream = byteStream(stream); - const channel: Channel = { msgStream, peerId }; + const channel: Channel = { msgStream, stream, peerId }; this.#logger.log(`opened channel to ${peerId}`); return channel; } catch (problem) { @@ -372,6 +403,14 @@ export class ConnectionFactory { `yamux muxer issue contacting via ${addressString}`, problem, ); + } else if (problem instanceof TooManyOutboundProtocolStreamsError) { + // Local stream limit hit — trying other addresses won't help + this.#outputError( + peerId, + `too many outbound streams via ${addressString}`, + problem, + ); + throw problem; } else if (signalTimeout.aborted) { this.#outputError(peerId, `timed out opening channel`, problem); } else { @@ -443,40 +482,24 @@ export class ConnectionFactory { * @param peerId - The peer ID for logging. */ async closeChannel(channel: Channel, peerId: string): Promise { - try { - // ByteStream.unwrap() returns the underlying libp2p stream. - const maybeWrapper = channel.msgStream as unknown as { - unwrap?: () => unknown; - }; - const underlying = - typeof maybeWrapper.unwrap === 'function' - ? maybeWrapper.unwrap() - : undefined; - - const closable = underlying as - | { close?: () => Promise } - | undefined; - if (closable?.close) { - await closable.close(); - this.#logger.log(`${peerId}:: closed channel stream`); - return; - } + const closeResult = await channel.stream.close().then( + () => 'closed' as const, + (error: unknown) => error, + ); - const abortable = underlying as - | { abort?: (error?: Error) => void } - | undefined; - if (abortable?.abort) { - abortable.abort(new AbortError()); - this.#logger.log(`${peerId}:: aborted channel stream`); - return; - } + if (closeResult === 'closed') { + this.#logger.log(`${peerId}:: closed channel stream`); + return; + } - // If we cannot explicitly close/abort, log and rely on natural closure. - this.#logger.log( - `${peerId}:: channel stream lacks close/abort, relying on natural closure`, + // Graceful close failed -- force abort with the original error. + try { + channel.stream.abort( + closeResult instanceof Error ? closeResult : new AbortError(), ); - } catch (problem) { - this.#outputError(peerId, 'closing channel stream', problem); + this.#logger.log(`${peerId}:: aborted channel stream`); + } catch (abortProblem) { + this.#outputError(peerId, 'closing channel stream', abortProblem); } } diff --git a/packages/ocap-kernel/src/remotes/platform/constants.ts b/packages/ocap-kernel/src/remotes/platform/constants.ts index 29133eb18..14073bb1c 100644 --- a/packages/ocap-kernel/src/remotes/platform/constants.ts +++ b/packages/ocap-kernel/src/remotes/platform/constants.ts @@ -40,5 +40,11 @@ export const RELAY_RECONNECT_MAX_ATTEMPTS = 10; /** Handshake timeout in milliseconds (10 seconds) */ export const HANDSHAKE_TIMEOUT_MS = 10_000; +/** Stream inactivity timeout in milliseconds (2 minutes, matches libp2p default) */ +export const STREAM_INACTIVITY_TIMEOUT_MS = 120_000; + +/** Minimum allowed stream inactivity timeout (5 seconds) to prevent self-DoS */ +export const MIN_STREAM_INACTIVITY_TIMEOUT_MS = 5_000; + /** Default threshold for consecutive identical errors before marking as permanently failed */ export const DEFAULT_CONSECUTIVE_ERROR_THRESHOLD = 5; diff --git a/packages/ocap-kernel/src/remotes/platform/handshake.test.ts b/packages/ocap-kernel/src/remotes/platform/handshake.test.ts index e044ce3f2..a54f85e9c 100644 --- a/packages/ocap-kernel/src/remotes/platform/handshake.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/handshake.test.ts @@ -99,6 +99,7 @@ describe('handshake', () => { mockChannel = { peerId: 'test-peer-id', + stream: { status: 'open' }, msgStream: { write: vi.fn().mockResolvedValue(undefined), read: vi.fn(), @@ -208,6 +209,7 @@ describe('handshake', () => { mockChannel = { peerId: 'test-peer-id', + stream: { status: 'open' }, msgStream: { write: vi.fn().mockResolvedValue(undefined), read: vi.fn(), diff --git a/packages/ocap-kernel/src/remotes/platform/transport.test.ts b/packages/ocap-kernel/src/remotes/platform/transport.test.ts index 3c28b6667..8e3ffafc7 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.test.ts @@ -67,17 +67,28 @@ vi.mock('./reconnection.ts', () => { }); // Mock ConnectionFactory +type MockStream = { + status: string; + inactivityTimeout: number; + addEventListener: ReturnType; + close: ReturnType; + abort: ReturnType; +}; + type MockChannel = { peerId: string; + stream: MockStream; msgStream: { read: ReturnType; write: ReturnType; + unwrap: ReturnType; }; }; const mockConnectionFactory = { dialIdempotent: vi.fn(), onInboundConnection: vi.fn(), + onPeerDisconnect: vi.fn(), stop: vi.fn().mockResolvedValue(undefined), closeChannel: vi.fn().mockResolvedValue(undefined), getListenAddresses: vi.fn().mockReturnValue([]), @@ -215,21 +226,32 @@ describe('transport.initTransport', () => { } }); - const createMockChannel = (peerId: string): MockChannel => ({ - peerId, - msgStream: { - read: vi - .fn<() => Promise>() - .mockImplementation(async () => { - return await new Promise(() => { - /* Never resolves by default */ - }); - }), - write: vi - .fn<(buffer: Uint8Array) => Promise>() - .mockResolvedValue(undefined), - }, - }); + const createMockChannel = (peerId: string): MockChannel => { + const stream: MockStream = { + status: 'open', + inactivityTimeout: 0, + addEventListener: vi.fn(), + close: vi.fn().mockResolvedValue(undefined), + abort: vi.fn(), + }; + return { + peerId, + stream, + msgStream: { + read: vi + .fn<() => Promise>() + .mockImplementation(async () => { + return await new Promise(() => { + /* Never resolves by default */ + }); + }), + write: vi + .fn<(buffer: Uint8Array) => Promise>() + .mockResolvedValue(undefined), + unwrap: vi.fn(() => stream), + }, + }; + }; describe('initialization', () => { it('passes correct parameters to ConnectionFactory.make', async () => { @@ -573,6 +595,83 @@ describe('transport.initTransport', () => { }); }); + it('treats StreamResetError as connection loss, not intentional close', async () => { + const { StreamResetError: MockStreamResetError } = await import( + '@libp2p/interface' + ); + let inboundHandler: ((channel: MockChannel) => void) | undefined; + mockConnectionFactory.onInboundConnection.mockImplementation( + (handler) => { + inboundHandler = handler; + }, + ); + + await initTransport('0x1234', {}, vi.fn()); + + const mockChannel = createMockChannel('peer-1'); + mockChannel.msgStream.read.mockRejectedValue( + new MockStreamResetError('stream reset'), + ); + + inboundHandler?.(mockChannel); + + await vi.waitFor(() => { + // StreamResetError from the remote triggers reconnection, not + // intentional close — a malicious peer could otherwise permanently + // suppress the connection by aborting the stream. + expect(mockLogger.log).toHaveBeenCalledWith( + 'peer-1:: stream reset by remote, reconnecting', + ); + expect(mockReconnectionManager.startReconnection).toHaveBeenCalledWith( + 'peer-1', + ); + }); + }); + + it('starts reconnection on peer:disconnect when no active channel', async () => { + let disconnectHandler: ((peerId: string) => void) | undefined; + mockConnectionFactory.onPeerDisconnect.mockImplementation( + (handler: (peerId: string) => void) => { + disconnectHandler = handler; + }, + ); + + await initTransport('0x1234', {}, vi.fn()); + + // peer-1 has no channel (never connected), so peer:disconnect triggers reconnection + disconnectHandler?.('peer-1'); + + expect(mockReconnectionManager.startReconnection).toHaveBeenCalledWith( + 'peer-1', + ); + }); + + it('does not trigger reconnection on peer:disconnect when channel is active', async () => { + let disconnectHandler: ((peerId: string) => void) | undefined; + mockConnectionFactory.onPeerDisconnect.mockImplementation( + (handler: (peerId: string) => void) => { + disconnectHandler = handler; + }, + ); + + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + + const { sendRemoteMessage: send } = await initTransport( + '0x1234', + {}, + vi.fn(), + ); + + // Establish an active channel + await send('peer-1', makeTestMessage('msg1')); + + // peer:disconnect fires but channel is still active — readChannel handles cleanup + disconnectHandler?.('peer-1'); + + expect(mockReconnectionManager.startReconnection).not.toHaveBeenCalled(); + }); + it('throws AbortError when signal aborted during read', async () => { let inboundHandler: ((channel: MockChannel) => void) | undefined; mockConnectionFactory.onInboundConnection.mockImplementation( @@ -742,6 +841,24 @@ describe('transport.initTransport', () => { expect(mockReconnectionManager.clear).toHaveBeenCalled(); }); + it('gracefully closes active channel streams on stop', async () => { + const { sendRemoteMessage, stop } = await initTransport( + '0x1234', + {}, + vi.fn(), + ); + + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + + // Establish a channel by sending a message + await sendRemoteMessage('peer-1', makeTestMessage('msg')); + + await stop(); + + expect(mockChannel.stream.close).toHaveBeenCalled(); + }); + it('does not send messages after stop', async () => { const { sendRemoteMessage, stop } = await initTransport( '0x1234', @@ -2185,6 +2302,107 @@ describe('transport.initTransport', () => { }); }); + it('attaches close event listener to underlying stream', async () => { + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + + const { sendRemoteMessage, stop } = await initTransport( + '0x1234', + {}, + vi.fn(), + ); + + await sendRemoteMessage('peer-1', makeTestMessage('msg1')); + + expect(mockChannel.stream.addEventListener).toHaveBeenCalledWith( + 'close', + expect.any(Function), + { once: true }, + ); + + await stop(); + }); + + it('sets default inactivityTimeout on stream when registering channel', async () => { + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + + const { sendRemoteMessage, stop } = await initTransport( + '0x1234', + {}, + vi.fn(), + ); + + await sendRemoteMessage('peer-1', makeTestMessage('msg1')); + + expect(mockChannel.stream.inactivityTimeout).toBe(120_000); + + await stop(); + }); + + it('uses custom streamInactivityTimeoutMs when provided', async () => { + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + + const { sendRemoteMessage, stop } = await initTransport( + '0x1234', + { streamInactivityTimeoutMs: 5_000 }, + vi.fn(), + ); + + await sendRemoteMessage('peer-1', makeTestMessage('msg1')); + + expect(mockChannel.stream.inactivityTimeout).toBe(5_000); + + await stop(); + }); + + it.each([ + { + scenario: 'local close without error', + event: { local: true }, + expectedLog: 'peer-1:: stream closed locally', + }, + { + scenario: 'local close with error', + event: { local: true, error: new Error('write timeout') }, + expectedLog: 'peer-1:: stream closed locally: write timeout', + }, + { + scenario: 'remote close with error', + event: { local: false, error: new Error('connection reset') }, + expectedLog: 'peer-1:: stream reset by remote: connection reset', + }, + { + scenario: 'remote clean close', + event: {}, + expectedLog: 'peer-1:: stream closed by remote (clean)', + }, + ])('logs $scenario via close event', async ({ event, expectedLog }) => { + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + + const { sendRemoteMessage, stop } = await initTransport( + '0x1234', + {}, + vi.fn(), + ); + + await sendRemoteMessage('peer-1', makeTestMessage('msg1')); + + // Get the close event listener that was attached + const closeListener = mockChannel.stream.addEventListener.mock + .calls[0]?.[1] as (evt: Event) => void; + + // Fire the close event + mockLogger.log.mockClear(); + closeListener(event as unknown as Event); + + expect(mockLogger.log).toHaveBeenCalledWith(expectedLog); + + await stop(); + }); + it('updates lastConnectionTime on each message send', async () => { const mockChannel = createMockChannel('peer-1'); mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index 903185ced..34e941455 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -1,3 +1,5 @@ +import { StreamResetError } from '@libp2p/interface'; +import type { StreamCloseEvent } from '@libp2p/interface'; import { AbortError, ResourceLimitError } from '@metamask/kernel-errors'; import { installWakeDetector } from '@metamask/kernel-utils'; import { Logger } from '@metamask/logger'; @@ -14,6 +16,8 @@ import { DEFAULT_STALE_PEER_TIMEOUT_MS, DEFAULT_WRITE_TIMEOUT_MS, SCTP_USER_INITIATED_ABORT, + MIN_STREAM_INACTIVITY_TIMEOUT_MS, + STREAM_INACTIVITY_TIMEOUT_MS, } from './constants.ts'; import { performOutboundHandshake, @@ -41,6 +45,29 @@ import type { RemoteCommsOptions, } from '../types.ts'; +/** + * Detect whether a read error indicates an intentional disconnect. + * Checks for the v3 typed `StreamResetError` (fired when the remote explicitly + * aborts a stream, e.g. during shutdown) and falls back to legacy SCTP sniffing + * for WebRTC user-initiated abort (code 12). + * + * @param problem - The error thrown by a stream read. + * @returns Whether the error represents an intentional disconnect. + */ +function isIntentionalDisconnect(problem: unknown): boolean { + if (problem instanceof StreamResetError) { + return true; + } + const rtcProblem = problem as { + errorDetail?: string; + sctpCauseCode?: number; + }; + return ( + rtcProblem?.errorDetail === 'sctp-failure' && + rtcProblem?.sctpCauseCode === SCTP_USER_INITIATED_ABORT + ); +} + /** * Initialize the remote comm system with information that must be provided by the kernel. * @@ -91,6 +118,7 @@ export async function initTransport( reconnectionMaxDelayMs, handshakeTimeoutMs, writeTimeoutMs, + streamInactivityTimeoutMs, directTransports, allowedWsHosts, } = options; @@ -253,6 +281,32 @@ export async function initTransport( const previousChannel = state.channel; state.channel = channel; peerStateManager.updateConnectionTime(peerId); + + // Set stream inactivity timeout for detecting dead streams. + // This is distinct from the per-write timeout — it covers bidirectional + // silence across the stream's lifetime. + channel.stream.inactivityTimeout = Math.max( + streamInactivityTimeoutMs ?? STREAM_INACTIVITY_TIMEOUT_MS, + MIN_STREAM_INACTIVITY_TIMEOUT_MS, + ); + + // Listen for v3 fine-grained close events for diagnostics. + channel.stream.addEventListener( + 'close', + (evt: Event) => { + const { local, error } = evt as StreamCloseEvent; + if (local) { + const suffix = error ? `: ${error.message}` : ''; + logger.log(`${peerId}:: stream closed locally${suffix}`); + } else if (error) { + logger.log(`${peerId}:: stream reset by remote: ${error.message}`); + } else { + logger.log(`${peerId}:: stream closed by remote (clean)`); + } + }, + { once: true }, + ); + readChannel(channel).catch((problem) => { outputError(peerId, errorContext, problem); }); @@ -365,17 +419,17 @@ export async function initTransport( try { readBuf = await channel.msgStream.read(); } catch (problem) { - // Detect graceful disconnect - const rtcProblem = problem as { - errorDetail?: string; - sctpCauseCode?: number; - }; - if ( - rtcProblem?.errorDetail === 'sctp-failure' && - rtcProblem?.sctpCauseCode === SCTP_USER_INITIATED_ABORT - ) { + if (problem instanceof StreamResetError) { + // Remote-initiated stream reset: treat as connection loss and + // reconnect. Do NOT mark as intentionally closed — a malicious + // peer could otherwise permanently suppress the connection. + logger.log( + `${channel.peerId}:: stream reset by remote, reconnecting`, + ); + handleConnectionLoss(channel.peerId); + } else if (isIntentionalDisconnect(problem)) { + // Locally-initiated close (SCTP user abort): honour as intentional. logger.log(`${channel.peerId}:: remote intentionally disconnected`); - // Mark as intentionally closed and don't trigger reconnection peerStateManager.markIntentionallyClosed(channel.peerId); } else { outputError( @@ -632,6 +686,19 @@ export async function initTransport( }); }); + // Safety net for peer-level disconnects (fires when last connection to a peer closes). + // Only triggers reconnection if readChannel hasn't already cleaned up. + // If a channel still exists, readChannel's own error/finally will handle cleanup. + connectionFactory.onPeerDisconnect((peerId) => { + if (signal.aborted) { + return; // shutting down, don't start spurious reconnection + } + const state = peerStateManager.getState(peerId); + if (!state.channel) { + handleConnectionLoss(peerId); + } + }); + // Install wake detector to reset backoff on sleep/wake cleanupWakeDetector = installWakeDetector(handleWakeFromSleep); @@ -710,17 +777,15 @@ export async function initTransport( cleanupIntervalId = undefined; } stopController.abort(); // cancels all delays and dials - // Close all active channel streams to unblock pending reads + // Gracefully close all active channel streams to unblock pending reads. + // Fire-and-forget: close() sends a FIN so the remote sees a clean stream + // end (read returns undefined) rather than a reset (StreamResetError). for (const state of peerStateManager.getAllStates()) { const { channel } = state; if (channel) { - try { - // Close the stream to unblock any pending read operations - const stream = channel.msgStream.unwrap() as { close?: () => void }; - stream.close?.(); - } catch { - // Ignore errors during cleanup - } + channel.stream.close().catch((closeError) => { + outputError(channel.peerId, 'closing stream during stop', closeError); + }); state.channel = undefined; } } diff --git a/packages/ocap-kernel/src/remotes/types.ts b/packages/ocap-kernel/src/remotes/types.ts index 136ad2c11..0e5128838 100644 --- a/packages/ocap-kernel/src/remotes/types.ts +++ b/packages/ocap-kernel/src/remotes/types.ts @@ -2,10 +2,15 @@ import type { Stream } from '@libp2p/interface'; import type { ByteStream } from '@libp2p/utils'; import type { Logger } from '@metamask/logger'; -export type InboundConnectionHandler = (channel: Channel) => void; +export type InboundConnectionHandler = ( + channel: Channel, +) => Promise | void; + +export type PeerDisconnectHandler = (peerId: string) => void; export type Channel = { msgStream: ByteStream; + stream: Stream; peerId: string; }; @@ -127,6 +132,12 @@ export type RemoteCommsOptions = { * When a sent message is not acknowledged within this timeout, it will be retransmitted. */ ackTimeoutMs?: number | undefined; + /** + * Timeout in milliseconds for stream inactivity (default: 120s). + * If no data flows in either direction for this duration, the stream is + * automatically aborted with an InactivityTimeoutError. + */ + streamInactivityTimeoutMs?: number | undefined; /** * Hostnames or IP addresses permitted for plain ws:// relay connections, * in addition to RFC 1918 / loopback addresses which are always allowed.