Skip to content
Closed
206 changes: 206 additions & 0 deletions kiloclaw/src/durable-objects/kiloclaw-instance.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,212 @@ describe('createNewMachine: persist ID before waitForState', () => {
});
});

// Suite: behaviour of instance.start() when flyClient.createMachine rejects.
// Covers the 409 "already_exists" recovery path (adopt the machine named in the
// error body, or destroy it when its mounted volume differs from the expected
// one) and the cases where the error is simply re-thrown.
// NOTE(review): relies on the file-level flyClient mock, the createInstance /
// seedProvisioned helpers and console spies defined outside this view.
describe('createNewMachine 409 recovery', () => {
// Happy path: the 409 body names a machine, its volume matches, and the
// machine is started so the instance ends up 'running'.
it('adopts existing machine when createMachine returns 409 already_exists', async () => {
const { instance, storage } = createInstance();
// Stopped instance with no recorded machine ID, so start() has to create one.
await seedProvisioned(storage, { status: 'stopped', flyMachineId: null });

// createMachine throws 409 with machine ID in the body
(flyClient.createMachine as Mock).mockRejectedValue(
new FlyApiError('conflict', 409, 'already_exists machine ID e82d3d7b44d987')
);
// getMachine for volume guard + inline start
(flyClient.getMachine as Mock).mockResolvedValue({
id: 'e82d3d7b44d987',
state: 'stopped',
region: 'iad',
config: {
mounts: [{ volume: 'vol-1', path: '/root' }],
guest: { cpus: 1, memory_mb: 256, cpu_kind: 'shared' },
},
});
(flyClient.updateMachine as Mock).mockResolvedValue({ id: 'e82d3d7b44d987' });
(flyClient.waitForState as Mock).mockResolvedValue(undefined);
(flyClient.getVolume as Mock).mockResolvedValue({ id: 'vol-1' });

await instance.start('user-1');

// Should have adopted the machine ID from the 409 body
expect(storage._store.get('flyMachineId')).toBe('e82d3d7b44d987');
// Should NOT have called createMachine a second time
expect(flyClient.createMachine).toHaveBeenCalledTimes(1);
// Should have looked up the existing machine by the adopted ID
expect(flyClient.getMachine).toHaveBeenCalledWith(expect.anything(), 'e82d3d7b44d987');
// Full state transition should complete
expect(storage._store.get('status')).toBe('running');
});

// Failure after adoption: the machine ID must already be in storage when the
// wait for 'started' rejects, and status must not advance.
it('persists flyMachineId even when adopted machine start throws', async () => {
const { instance, storage } = createInstance();
await seedProvisioned(storage, { status: 'stopped', flyMachineId: null });

(flyClient.createMachine as Mock).mockRejectedValue(
new FlyApiError('conflict', 409, 'already_exists machine ID e82d3d7b44d987')
);
(flyClient.getMachine as Mock).mockResolvedValue({
id: 'e82d3d7b44d987',
state: 'stopped',
region: 'iad',
config: {
mounts: [{ volume: 'vol-1', path: '/root' }],
guest: { cpus: 1, memory_mb: 256, cpu_kind: 'shared' },
},
});
(flyClient.updateMachine as Mock).mockResolvedValue({ id: 'e82d3d7b44d987' });
// waitForState fails after adoption
(flyClient.waitForState as Mock).mockRejectedValue(new Error('timeout waiting for started'));
(flyClient.getVolume as Mock).mockResolvedValue({ id: 'vol-1' });

await expect(instance.start('user-1')).rejects.toThrow('timeout waiting for started');

// flyMachineId is durably written before the adopted machine is started inline
// (per the "Machine ID persisted before waiting" invariant), so the
// reconciliation alarm can retry on the next cycle.
expect(storage._store.get('flyMachineId')).toBe('e82d3d7b44d987');
// status should not have advanced to 'running'
expect(storage._store.get('status')).toBe('stopped');
});

// The body contains 'already_exists' but no "machine ID <id>" fragment, so
// there is nothing to adopt: the error is logged and propagated.
it('logs and re-throws when 409 body has already_exists but no machine ID', async () => {
const { instance, storage } = createInstance();
await seedProvisioned(storage, { status: 'stopped', flyMachineId: null });

(flyClient.createMachine as Mock).mockRejectedValue(
new FlyApiError('conflict', 409, 'already_exists: conflict')
);
(flyClient.getVolume as Mock).mockResolvedValue({ id: 'vol-1' });

await expect(instance.start('user-1')).rejects.toThrow('conflict');

// Should have logged the body for debugging
// (assumes console.error is spied on by shared test setup — TODO confirm)
expect(console.error).toHaveBeenCalledWith(
'[DO] 409 already_exists but could not extract machine ID from body:',
'already_exists: conflict'
);
});

// Volume guard, mismatch case: the conflicting machine is mounted on a
// different volume than the DO expects, so it is destroyed rather than adopted.
it('destroys stale 409 machine when its volume does not match expected volume', async () => {
const { instance, storage } = createInstance();
// Volume was swapped (e.g. after replaceStrandedVolume) — DO expects vol-new
await seedProvisioned(storage, {
status: 'stopped',
flyMachineId: null,
flyVolumeId: 'vol-new',
});

const conflictErr = new FlyApiError('conflict', 409, 'already_exists machine ID stale123');
(flyClient.createMachine as Mock).mockRejectedValue(conflictErr);
// The stale machine is attached to the OLD volume
(flyClient.getMachine as Mock).mockResolvedValue({
id: 'stale123',
state: 'stopped',
region: 'iad',
config: { mounts: [{ volume: 'vol-old', path: '/root' }] },
});
(flyClient.destroyMachine as Mock).mockResolvedValue(undefined);
(flyClient.getVolume as Mock).mockResolvedValue({ id: 'vol-new' });

// start() is still expected to reject with the conflict error in this case.
await expect(instance.start('user-1')).rejects.toThrow('conflict');

// Should have destroyed the stale machine
expect(flyClient.destroyMachine).toHaveBeenCalledWith(expect.anything(), 'stale123');
// Should NOT have adopted it
expect(storage._store.get('flyMachineId')).toBeNull();
expect(console.warn).toHaveBeenCalledWith(expect.stringContaining('destroying stale machine'));
});

// Volume guard, match case: same volume on both sides, so the machine is
// adopted and never destroyed.
it('adopts 409 machine when its volume matches expected volume', async () => {
const { instance, storage } = createInstance();
await seedProvisioned(storage, { status: 'stopped', flyMachineId: null, flyVolumeId: 'vol-1' });

(flyClient.createMachine as Mock).mockRejectedValue(
new FlyApiError('conflict', 409, 'already_exists machine ID e82d3d7b44d987')
);
// The existing machine has the SAME volume — safe to adopt
(flyClient.getMachine as Mock).mockResolvedValue({
id: 'e82d3d7b44d987',
state: 'stopped',
region: 'iad',
config: {
mounts: [{ volume: 'vol-1', path: '/root' }],
guest: { cpus: 1, memory_mb: 256, cpu_kind: 'shared' },
},
});
(flyClient.updateMachine as Mock).mockResolvedValue({ id: 'e82d3d7b44d987' });
(flyClient.waitForState as Mock).mockResolvedValue(undefined);
(flyClient.getVolume as Mock).mockResolvedValue({ id: 'vol-1' });

await instance.start('user-1');

expect(storage._store.get('flyMachineId')).toBe('e82d3d7b44d987');
expect(flyClient.destroyMachine).not.toHaveBeenCalled();
expect(storage._store.get('status')).toBe('running');
});

// A 409 whose body lacks 'already_exists' is not treated as recoverable.
it('re-throws 409 without already_exists in body', async () => {
const { instance, storage } = createInstance();
await seedProvisioned(storage, { status: 'stopped', flyMachineId: null });

(flyClient.createMachine as Mock).mockRejectedValue(
new FlyApiError('conflict', 409, 'some other conflict')
);
(flyClient.getVolume as Mock).mockResolvedValue({ id: 'vol-1' });

await expect(instance.start('user-1')).rejects.toThrow('conflict');

// Should NOT have tried to extract machine ID
expect(console.error).not.toHaveBeenCalledWith(
expect.stringContaining('could not extract machine ID'),
expect.anything()
);
});

// Guards against a create -> 409 -> start -> 404 -> create loop: the adopted
// machine vanishes between the two getMachine calls and start() must reject.
it('throws on 404 during adopted machine start instead of recursing', async () => {
const { instance, storage } = createInstance();
await seedProvisioned(storage, { status: 'stopped', flyMachineId: null });

const conflictErr = new FlyApiError('conflict', 409, 'already_exists machine ID ghost123');
(flyClient.createMachine as Mock).mockRejectedValue(conflictErr);
// First getMachine (volume guard) succeeds, second (inline start) returns 404
(flyClient.getMachine as Mock)
.mockResolvedValueOnce({
id: 'ghost123',
state: 'stopped',
region: 'iad',
config: {
mounts: [{ volume: 'vol-1', path: '/root' }],
guest: { cpus: 1, memory_mb: 256, cpu_kind: 'shared' },
},
})
.mockRejectedValueOnce(new FlyApiError('not found', 404, 'not found'));
(flyClient.getVolume as Mock).mockResolvedValue({ id: 'vol-1' });

// Should throw (not loop) — alarm will retry
// (no message is pinned here, so any rejection satisfies this assertion)
await expect(instance.start('user-1')).rejects.toThrow();

// createMachine should only have been called once (no recursion)
expect(flyClient.createMachine).toHaveBeenCalledTimes(1);
// flyMachineId was persisted before the start attempt
expect(storage._store.get('flyMachineId')).toBe('ghost123');
});

// Non-conflict failures (here a 500) bypass the recovery path entirely and
// leave the seeded state untouched.
it('re-throws non-409 errors from createMachine', async () => {
const { instance, storage } = createInstance();
await seedProvisioned(storage, { status: 'stopped', flyMachineId: null });

(flyClient.createMachine as Mock).mockRejectedValue(
new FlyApiError('internal error', 500, 'server error')
);
(flyClient.getVolume as Mock).mockResolvedValue({ id: 'vol-1' });

await expect(instance.start('user-1')).rejects.toThrow('internal error');

// Machine ID should not have been set (stays null from seed)
expect(storage._store.get('flyMachineId')).toBeNull();
expect(storage._store.get('status')).toBe('stopped');
});
});

describe('gateway process control via controller', () => {
it('allows gateway status calls when machine ID exists even if DO status is stale', async () => {
const { instance, storage } = createInstance();
Expand Down
76 changes: 70 additions & 6 deletions kiloclaw/src/durable-objects/kiloclaw-instance/fly-machines.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { KiloClawEnv } from '../../types';
import type { FlyClientConfig } from '../../fly/client';
import type { FlyMachineConfig } from '../../fly/types';
import type { FlyMachine, FlyMachineConfig } from '../../fly/types';
import * as fly from '../../fly/client';
import {
DEFAULT_VOLUME_SIZE_GB,
Expand Down Expand Up @@ -215,11 +215,75 @@ export async function createNewMachine(
minSecretsVersion?: number,
envFlyRegion?: string
): Promise<void> {
const machine = await fly.createMachine(flyConfig, machineConfig, {
name: state.sandboxId ?? undefined,
region: state.flyRegion ?? envFlyRegion ?? undefined,
minSecretsVersion,
});
let machine: FlyMachine;
try {
machine = await fly.createMachine(flyConfig, machineConfig, {
name: state.sandboxId ?? undefined,
region: state.flyRegion ?? envFlyRegion ?? undefined,
minSecretsVersion,
});
} catch (err) {
// Safety net: if the machine already exists (e.g. DO state lost flyMachineId
// and metadata recovery didn't run or failed), adopt it instead of crashing.
if (
err instanceof fly.FlyApiError &&
err.status === 409 &&
err.body.includes('already_exists')
) {
const match = err.body.match(/machine ID (\w+)/);
if (match) {
const existingId = match[1];

// Guard: if the caller expects a specific volume (e.g. after
// replaceStrandedVolume swapped to a new volume), verify the
// existing machine is actually attached to that volume. If it
// isn't, it's a stale machine on the old host/volume — adopting
// it would undo the recovery. Destroy it so the next alarm
// retry can create cleanly.
const expectedVolume = machineConfig.mounts?.[0]?.volume;
if (expectedVolume) {
const existing = await fly.getMachine(flyConfig, existingId);
const actualVolume = existing.config?.mounts?.[0]?.volume;
if (actualVolume !== expectedVolume) {
console.warn(
`[DO] 409 machine ${existingId} has volume ${actualVolume}, expected ${expectedVolume} — destroying stale machine`
);
try {
await fly.destroyMachine(flyConfig, existingId);
} catch (destroyErr) {
if (!fly.isFlyNotFound(destroyErr)) {
console.warn('[DO] Failed to destroy stale 409 machine:', destroyErr);
}
}
throw err;
Comment thread
evanjacobson marked this conversation as resolved.
}
}

console.log('[DO] Machine already exists (409), adopting:', existingId);
state.flyMachineId = existingId;
Comment thread
evanjacobson marked this conversation as resolved.
await ctx.storage.put(storageUpdate({ flyMachineId: existingId }));
// Start the adopted machine inline. Don't use startExistingMachine
// here — its 404 fallback calls createNewMachine, which would
// loop back here if the adopted machine disappears while Fly's
// name registry still returns 409. If anything fails, we throw
// and let the alarm retry; flyMachineId is already persisted.
const adopted = await fly.getMachine(flyConfig, existingId);
if (adopted.state === 'stopped' || adopted.state === 'created') {
await fly.updateMachine(flyConfig, existingId, machineConfig, { minSecretsVersion });
Comment thread
evanjacobson marked this conversation as resolved.
Outdated
await fly.waitForState(flyConfig, existingId, 'started', STARTUP_TIMEOUT_SECONDS);
} else if (adopted.state !== 'started') {
await fly.waitForState(flyConfig, existingId, 'started', STARTUP_TIMEOUT_SECONDS);
}
return;
} else {
console.error(
'[DO] 409 already_exists but could not extract machine ID from body:',
err.body
);
}
}
throw err;
}
state.flyMachineId = machine.id;

await ctx.storage.put(storageUpdate({ flyMachineId: machine.id }));
Expand Down
Loading