diff --git a/CHANGELOG.md b/CHANGELOG.md index 570182c..c6aad0f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ - Improved OpenCode malformed JSON diagnostics with output length, event kinds, and a bounded preview, thanks @rohitjavvadi. - Fixed Express route mapping for aliased Router imports that follow block comment banners, thanks @rohitjavvadi. - Fixed Bun package-manager detection to recognize the text `bun.lock` lockfile, thanks @austinm911. +- Changed `clawpatch review --jobs` default from a fixed `10` to `floor(cpuCores / 2)` clamped to `[1, 10]`. Explicit `--jobs ` is honored as before. +- Added `clawpatch review --rate-limit-per-minute ` (also `CLAWPATCH_RPM`) to cap how many provider calls may start within any rolling 60s window across jobs. Default unset preserves prior behavior. ## 0.3.0 - 2026-05-18 diff --git a/src/app.ts b/src/app.ts index 05be074..a5c8aae 100644 --- a/src/app.ts +++ b/src/app.ts @@ -1,6 +1,6 @@ import { appendFile, lstat, readFile, realpath, writeFile } from "node:fs/promises"; import { join, relative, resolve } from "node:path"; -import { hostname } from "node:os"; +import { cpus, hostname } from "node:os"; import { changedPathsBetweenSnapshots, hasSourceDirtyWorktree, @@ -74,6 +74,7 @@ import { reasoningEfforts, } from "./types.js"; import { validationCommandsForFeature } from "./validation.js"; +import { createRpmLimiter, defaultJobs, rpmFromFlag } from "./rpm-limiter.js"; export type AppContext = { root: string; @@ -313,6 +314,9 @@ export async function reviewCommand( error: unknown; }> = []; const jobs = Math.min(reviewJobs(flags), Math.max(features.length, 1)); + const limiter = createRpmLimiter( + rpmFromFlag(stringFlag(flags, "rateLimitPerMinute"), process.env["CLAWPATCH_RPM"]), + ); let cursor = 0; emitProgress(context, "review", "start", { run: currentRunId, @@ -329,6 +333,7 @@ export async function reviewCommand( return; } try { + await limiter.acquire(); const reviewed = await reviewFeature({ context, loaded, @@ -1923,12 +1928,19 @@ async function filterFindingsByOwnedFilesSince( return filterFindingsByChangedOwnedFiles(findings, features, changed); } -function reviewJobs(flags: Record): number { - const parsed = Number(stringFlag(flags, "jobs") ?? "10"); - if (!Number.isFinite(parsed) || parsed < 1) { - return 1; +export function reviewJobs( + flags: Record, + coreCount: number = cpus().length, +): number { + const explicit = stringFlag(flags, "jobs"); + if (explicit !== undefined) { + const parsed = Number(explicit); + if (!Number.isFinite(parsed) || parsed < 1) { + return 1; + } + return Math.min(Math.floor(parsed), 32); } - return Math.min(Math.floor(parsed), 32); + return defaultJobs(coreCount); } function reviewMode(flags: Record): ReviewMode { diff --git a/src/cli.ts b/src/cli.ts index e6f858c..a2344eb 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -161,6 +161,7 @@ const commandFlags = { "since", "jobs", "mode", + "rateLimitPerMinute", "provider", "model", "reasoningEffort", @@ -221,6 +222,7 @@ const valueFlagNames = new Set([ "since", "jobs", "mode", + "rate-limit-per-minute", "source", "provider", "model", @@ -409,8 +411,9 @@ Flags: --project --limit --since - --jobs default: 10 + --jobs default: ~half of CPU cores, max 10 --mode + --rate-limit-per-minute cap provider calls per 60s window (env: CLAWPATCH_RPM) --provider --model --reasoning-effort diff --git a/src/review-jobs.test.ts b/src/review-jobs.test.ts new file mode 100644 index 0000000..88e8f8a --- /dev/null +++ b/src/review-jobs.test.ts @@ -0,0 +1,25 @@ +import { describe, expect, it } from "vitest"; +import { reviewJobs } from "./app.js"; + +describe("reviewJobs", () => { + it("defaults to floor(cores / 2) capped at 10 when --jobs is not given", () => { + expect(reviewJobs({}, 4)).toBe(2); + expect(reviewJobs({}, 8)).toBe(4); + expect(reviewJobs({}, 32)).toBe(10); + expect(reviewJobs({}, 1)).toBe(1); + }); + + it("honors explicit --jobs value", () => { + expect(reviewJobs({ jobs: "7" }, 32)).toBe(7); + expect(reviewJobs({ jobs: "1" }, 32)).toBe(1); + }); + + it("caps explicit --jobs at 32", () => { + expect(reviewJobs({ jobs: "100" }, 4)).toBe(32); + }); + + it("treats invalid explicit --jobs as 1", () => { + expect(reviewJobs({ jobs: "abc" }, 8)).toBe(1); + expect(reviewJobs({ jobs: "0" }, 8)).toBe(1); + }); +}); diff --git a/src/rpm-limiter.test.ts b/src/rpm-limiter.test.ts new file mode 100644 index 0000000..2e02945 --- /dev/null +++ b/src/rpm-limiter.test.ts @@ -0,0 +1,116 @@ +import { describe, expect, it } from "vitest"; +import { createRpmLimiter, defaultJobs, rpmFromFlag } from "./rpm-limiter.js"; + +type ScheduledTimeout = { handler: () => void; runAt: number }; + +function makeFakeClock() { + let nowMs = 0; + const pending: ScheduledTimeout[] = []; + return { + clock: { + now: () => nowMs, + setTimeout: (handler: () => void, ms: number) => { + pending.push({ handler, runAt: nowMs + ms }); + }, + }, + advance(ms: number): void { + nowMs += ms; + // Fire any timeouts due at or before the current time, in scheduled order. + while (true) { + const due = pending.findIndex((entry) => entry.runAt <= nowMs); + if (due === -1) { + return; + } + const [entry] = pending.splice(due, 1); + entry?.handler(); + } + }, + setNow(ms: number): void { + nowMs = ms; + }, + pending: () => pending.length, + }; +} + +describe("defaultJobs", () => { + it("returns floor(cores / 2) capped at 10", () => { + expect(defaultJobs(4)).toBe(2); + expect(defaultJobs(8)).toBe(4); + expect(defaultJobs(32)).toBe(10); + }); + + it("clamps to a minimum of 1", () => { + expect(defaultJobs(1)).toBe(1); + expect(defaultJobs(0)).toBe(1); + expect(defaultJobs(Number.NaN)).toBe(1); + }); +}); + +describe("rpmFromFlag", () => { + it("prefers explicit flag over env", () => { + expect(rpmFromFlag("30", "60")).toBe(30); + }); + + it("falls back to env when flag is missing", () => { + expect(rpmFromFlag(undefined, "45")).toBe(45); + }); + + it("returns undefined when neither is set", () => { + expect(rpmFromFlag(undefined, undefined)).toBeUndefined(); + expect(rpmFromFlag("", "")).toBeUndefined(); + }); + + it("returns undefined for invalid values", () => { + expect(rpmFromFlag("abc", undefined)).toBeUndefined(); + expect(rpmFromFlag("0", undefined)).toBeUndefined(); + expect(rpmFromFlag("-5", undefined)).toBeUndefined(); + }); +}); + +describe("createRpmLimiter", () => { + it("is a no-op when limit is undefined", async () => { + const limiter = createRpmLimiter(undefined); + for (let i = 0; i < 100; i += 1) { + await limiter.acquire(); + } + }); + + it("is a no-op when limit is invalid", async () => { + const limiter = createRpmLimiter(0); + await limiter.acquire(); + const limiter2 = createRpmLimiter(Number.NaN); + await limiter2.acquire(); + }); + + it("allows up to N starts in a 60s window without delay", async () => { + const fake = makeFakeClock(); + const limiter = createRpmLimiter(3, fake.clock); + await limiter.acquire(); + await limiter.acquire(); + await limiter.acquire(); + expect(fake.pending()).toBe(0); + }); + + it("delays the (N+1)th call until the oldest slot expires", async () => { + const fake = makeFakeClock(); + const limiter = createRpmLimiter(2, fake.clock); + await limiter.acquire(); + fake.advance(10_000); + await limiter.acquire(); + + let resolved = false; + const pending = limiter.acquire().then(() => { + resolved = true; + }); + + // Allow the chain to evaluate `step` and schedule its setTimeout. + await Promise.resolve(); + await Promise.resolve(); + expect(resolved).toBe(false); + + // First slot was at t=0, window is 60s, so the next slot opens at t=60_000. + fake.advance(50_000); + await pending; + expect(resolved).toBe(true); + }); +}); diff --git a/src/rpm-limiter.ts b/src/rpm-limiter.ts new file mode 100644 index 0000000..55ffdb6 --- /dev/null +++ b/src/rpm-limiter.ts @@ -0,0 +1,90 @@ +export type RpmLimiter = { + acquire(): Promise; +}; + +type Clock = { + now(): number; + setTimeout(handler: () => void, ms: number): void; +}; + +const defaultClock: Clock = { + now: () => Date.now(), + setTimeout: (handler, ms) => { + setTimeout(handler, ms).unref?.(); + }, +}; + +const noopLimiter: RpmLimiter = { + acquire: async () => {}, +}; + +export function createRpmLimiter( + limit: number | undefined, + clock: Clock = defaultClock, +): RpmLimiter { + if (limit === undefined || !Number.isFinite(limit) || limit < 1) { + return noopLimiter; + } + const max = Math.floor(limit); + const window = 60_000; + const starts: number[] = []; + let chain: Promise = Promise.resolve(); + + function pruneOlder(reference: number): void { + while (starts.length > 0) { + const head = starts[0]; + if (head === undefined || reference - head < window) { + return; + } + starts.shift(); + } + } + + async function step(): Promise { + const now = clock.now(); + pruneOlder(now); + if (starts.length < max) { + starts.push(now); + return; + } + const oldest = starts[0] ?? now; + const wait = window - (now - oldest); + await new Promise((resolveWait) => { + clock.setTimeout(resolveWait, Math.max(wait, 0)); + }); + const after = clock.now(); + pruneOlder(after); + starts.push(after); + } + + return { + acquire(): Promise { + const next = chain.then(step); + // Ensure rejections do not poison the chain for subsequent acquirers. + chain = next.catch(() => undefined); + return next; + }, + }; +} + +export function rpmFromFlag( + explicit: string | undefined, + envValue: string | undefined, +): number | undefined { + const raw = explicit ?? envValue; + if (raw === undefined || raw === "") { + return undefined; + } + const parsed = Number(raw); + if (!Number.isFinite(parsed) || parsed < 1) { + return undefined; + } + return Math.floor(parsed); +} + +export function defaultJobs(coreCount: number): number { + if (!Number.isFinite(coreCount) || coreCount < 1) { + return 1; + } + return Math.min(Math.max(Math.floor(coreCount / 2), 1), 10); +}