diff --git a/package.json b/package.json index a5aec5879..56461f78d 100644 --- a/package.json +++ b/package.json @@ -74,6 +74,9 @@ "@tanstack/react-start": "1.168.26", "@tanstack/react-start-client": "1.168.14", "@tanstack/react-table": "^8.21.3", + "@tanstack/workflow-core": "0.0.3", + "@tanstack/workflow-runtime": "0.0.2", + "@tanstack/workflow-store-drizzle-postgres": "0.0.4", "@types/d3": "^7.4.3", "@uploadthing/react": "^7.3.3", "@visx/hierarchy": "^3.12.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2ed90dbdb..d935d005a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -138,6 +138,15 @@ importers: '@tanstack/react-table': specifier: ^8.21.3 version: 8.21.3(react-dom@19.2.3(react@19.2.3))(react@19.2.3) + '@tanstack/workflow-core': + specifier: 0.0.3 + version: 0.0.3 + '@tanstack/workflow-runtime': + specifier: 0.0.2 + version: 0.0.2 + '@tanstack/workflow-store-drizzle-postgres': + specifier: 0.0.4 + version: 0.0.4(@opentelemetry/api@1.9.1)(@types/pg@8.20.0)(postgres@3.4.8) '@types/d3': specifier: ^7.4.3 version: 7.4.3 @@ -3591,6 +3600,18 @@ packages: resolution: {integrity: sha512-uhOeFyxLcU41HzvrxsGpiWdcMbScY1EDgbZ5K7DVRMYInbLYWAC0EA/kx9wXAoSM8q82bUG2hRl8+EAjE6XAbA==} engines: {node: '>=20.19'} + '@tanstack/workflow-core@0.0.3': + resolution: {integrity: sha512-0ev5ISQR8knUBAco46bc8h2/bMGS3ZPlEC/I/EgeD7PXE0DnmNJLvrkwZ5KfdzcbdM8pqJQLR5coBHtNurOcEg==} + engines: {node: '>=18'} + + '@tanstack/workflow-runtime@0.0.2': + resolution: {integrity: sha512-v+cv8rIOaE0a50Md6WSPge11wLn9NVxcGbVanKQe2BlZuNtDVIayZFUgn3RXHQp68MQmrdu2bK6aT9BSMZ4z2g==} + engines: {node: '>=18'} + + '@tanstack/workflow-store-drizzle-postgres@0.0.4': + resolution: {integrity: sha512-IguipqLh04rvO3FnWxuPp/BiTJPnJiB7jlGH/M/qlfhUkFA0x4bPJ+MjYxa1sQaJLJbpYJI/Z2AVzRwfhDAFRw==} + engines: {node: '>=18'} + '@tweenjs/tween.js@23.1.3': resolution: {integrity: sha512-vJmvvwFxYuGnF2axRtPYocag6Clbb5YS7kLL+SO/TeVFzHqDIWrNKYtcsPMibjDx9O+bu+psAy9NKfWklassUA==} @@ -9951,6 +9972,50 @@ snapshots: '@tanstack/virtual-file-routes@1.162.0': {} + '@tanstack/workflow-core@0.0.3': + dependencies: + '@standard-schema/spec': 1.1.0 + + '@tanstack/workflow-runtime@0.0.2': + dependencies: + '@tanstack/workflow-core': 0.0.3 + + '@tanstack/workflow-store-drizzle-postgres@0.0.4(@opentelemetry/api@1.9.1)(@types/pg@8.20.0)(postgres@3.4.8)': + dependencies: + '@tanstack/workflow-core': 0.0.3 + '@tanstack/workflow-runtime': 0.0.2 + drizzle-orm: 0.45.2(@opentelemetry/api@1.9.1)(@types/pg@8.20.0)(postgres@3.4.8) + transitivePeerDependencies: + - '@aws-sdk/client-rds-data' + - '@cloudflare/workers-types' + - '@electric-sql/pglite' + - '@libsql/client' + - '@libsql/client-wasm' + - '@neondatabase/serverless' + - '@op-engineering/op-sqlite' + - '@opentelemetry/api' + - '@planetscale/database' + - '@prisma/client' + - '@tidbcloud/serverless' + - '@types/better-sqlite3' + - '@types/pg' + - '@types/sql.js' + - '@upstash/redis' + - '@vercel/postgres' + - '@xata.io/client' + - better-sqlite3 + - bun-types + - expo-sqlite + - gel + - knex + - kysely + - mysql2 + - pg + - postgres + - prisma + - sql.js + - sqlite3 + '@tweenjs/tween.js@23.1.3': {} '@tybys/wasm-util@0.10.1': diff --git a/src/db/schema.ts b/src/db/schema.ts index 77c59847b..35f65b565 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -1140,14 +1140,11 @@ export type NewIntentPackage = InferInsertModel // Per-version snapshot of a package's skills (latest + last 5 versions) // -// syncStatus acts as a durable work queue: +// syncStatus tracks domain progress for each discovered package version: // 'pending' -- version discovered, tarball not yet downloaded/extracted // 'synced' -- skills extracted and indexed successfully // 'failed' -- tarball processing failed (will be retried next cycle) -// -// This means the scheduled function can be interrupted at any point and -// resume from where it left off. Only the currently in-flight version is -// at risk of being re-processed on restart (upserts make that safe). +// Workflow run/step replay lives in the Workflow Postgres store, not here. export const intentPackageVersions = pgTable( 'intent_package_versions', { diff --git a/src/routes/admin/intent.tsx b/src/routes/admin/intent.tsx index 44dcdab67..872e0b838 100644 --- a/src/routes/admin/intent.tsx +++ b/src/routes/admin/intent.tsx @@ -20,6 +20,7 @@ import { getIntentAdminStats, listIntentPackages, listFailedVersions, + listIntentWorkflowRuns, triggerIntentDiscover, triggerIntentProcess, retryIntentVersion, @@ -40,6 +41,7 @@ const QK = { stats: ['admin', 'intent', 'stats'] as const, packages: ['admin', 'intent', 'packages'] as const, failed: ['admin', 'intent', 'failed'] as const, + workflows: ['admin', 'intent', 'workflows'] as const, } // --------------------------------------------------------------------------- @@ -69,6 +71,12 @@ function IntentAdminPage() { queryFn: () => listFailedVersions(), }) + const workflowsQuery = useQuery({ + queryKey: QK.workflows, + queryFn: () => listIntentWorkflowRuns(), + refetchInterval: 10_000, + }) + const discoverMutation = useMutation({ mutationFn: () => triggerIntentDiscover(), onSuccess: invalidateAll, @@ -324,6 +332,11 @@ function IntentAdminPage() { /> + + {/* Failed versions (shown prominently when non-zero) */} {(failedQuery.data?.length ?? 0) > 0 && ( + readonly loading: boolean +}) { + return ( +
+

+ + Workflow Runs +

+ {loading ? ( +
+ ) : runs.length === 0 ? ( + + No workflow runs recorded yet. + + ) : ( +
+ + + + + + + + + + + {runs.map((run) => ( + + + + + + + ))} + +
+ Workflow + + Status + + Run + + Updated +
+ {run.workflowId} + + + {run.waitingFor + ? `${run.status}:${run.waitingFor}` + : run.status} + + + {run.runId} + + {formatDistanceToNow(run.updatedAt, { addSuffix: true })} +
+
+ )} +
+ ) +} + +function getWorkflowStatusClass(status: string): string { + switch (status) { + case 'finished': + return 'bg-emerald-100 text-emerald-700 dark:bg-emerald-900/40 dark:text-emerald-300' + case 'errored': + case 'aborted': + return 'bg-red-100 text-red-700 dark:bg-red-900/40 dark:text-red-300' + case 'paused': + return 'bg-amber-100 text-amber-700 dark:bg-amber-900/40 dark:text-amber-300' + case 'running': + case 'queued': + return 'bg-sky-100 text-sky-700 dark:bg-sky-900/40 dark:text-sky-300' + default: + return 'bg-gray-100 text-gray-700 dark:bg-gray-800 dark:text-gray-300' + } +} + // --------------------------------------------------------------------------- // Result banner // --------------------------------------------------------------------------- diff --git a/src/server/scheduled.server.ts b/src/server/scheduled.server.ts index 82baecf71..d6c0a267c 100644 --- a/src/server/scheduled.server.ts +++ b/src/server/scheduled.server.ts @@ -1,48 +1,15 @@ +import { materializeWorkflowSchedules } from '@tanstack/workflow-runtime' import { pruneStaleCacheRows } from '~/utils/github-content-cache.server' import { refreshGitHubOrgStats, refreshNpmOrgStats, } from '~/utils/stats.functions' -import { - extractSkillsFromTarball, - fetchPackument, - isIntentCompatible, - searchIntentPackages, - selectVersionsToSync, -} from '~/utils/intent.server' -import { - enqueuePackageVersion, - getKnownVersions, - getPendingVersions, - markPackageVerified, - markVersionFailed, - markVersionSynced, - replaceSkillsForVersion, - upsertIntentPackage, -} from '~/utils/intent-db.server' +import { workflowRuntime } from '~/utils/workflow-runtime.server' const CONTENT_CACHE_PRUNE_CRON = '0 9 * * *' const STATS_AND_INTENT_DISCOVER_CRON = '0 */6 * * *' const INTENT_PROCESS_CRON = '*/15 * * * *' -const INTENT_PROCESS_BUDGET_MS = 12 * 60 * 1000 - -type GitHubSearchResponse = { - items: Array<{ path: string; repository: { full_name: string } }> -} - -type GitHubContentResponse = { - content?: string -} - -type PackageJson = { - name?: string - private?: boolean -} - -type NpmLatestResponse = { - dist?: { tarball?: string } - version?: string -} +const WORKFLOW_SWEEP_MAX_DURATION_MS = 25_000 export async function runScheduledTasks(cron: string, scheduledTime: number) { switch (cron) { @@ -53,17 +20,47 @@ export async function runScheduledTasks(cron: string, scheduledTime: number) { await Promise.all([ runGitHubStatsRefresh(scheduledTime), runNpmStatsRefresh(scheduledTime), - runIntentDiscovery(scheduledTime), + runWorkflowSweep(cron, scheduledTime), ]) return case INTENT_PROCESS_CRON: - await runIntentQueueProcess(scheduledTime) + await runWorkflowSweep(cron, scheduledTime) return default: console.warn(`[scheduled] No task registered for cron: ${cron}`) } } +async function runWorkflowSweep(cron: string, scheduledTime: number) { + const startTime = Date.now() + console.log('[workflow-sweep] Starting workflow sweep...') + + try { + const materialized = await materializeWorkflowSchedules(workflowRuntime, { + now: scheduledTime, + }) + const sweep = await workflowRuntime.sweep({ + now: scheduledTime, + leaseOwner: `cloudflare:${cron}:${scheduledTime}`, + maxDurationMs: WORKFLOW_SWEEP_MAX_DURATION_MS, + maxScheduledRuns: 10, + maxTimers: 10, + includeEvents: false, + }) + const duration = Date.now() - startTime + + console.log( + `[workflow-sweep] Completed in ${duration}ms - materialized: ${materialized.length}, scheduled: ${JSON.stringify(sweep.summary.scheduled)}, timers: ${JSON.stringify(sweep.summary.timers)}, remaining: ${sweep.remainingMayExist}`, + ) + console.log( + '[workflow-sweep] Scheduled time:', + new Date(scheduledTime).toISOString(), + ) + } catch (error) { + logScheduledError('workflow-sweep', startTime, error) + } +} + async function runContentCachePrune(scheduledTime: number) { const startTime = Date.now() console.log('[prune-content-cache] Starting prune...') @@ -133,342 +130,10 @@ async function runNpmStatsRefresh(scheduledTime: number) { } } -async function runIntentDiscovery(scheduledTime: number) { - const startTime = Date.now() - console.log('[intent-discover] Starting discovery (NPM + GitHub)...') - - let versionsEnqueued = 0 - const errors: Array = [] - - try { - console.log( - '[intent-discover] Searching NPM for keywords:tanstack-intent...', - ) - const searchResults = await searchIntentPackages() - console.log( - `[intent-discover] NPM found ${searchResults.objects.length} candidates`, - ) - - for (const { package: pkg } of searchResults.objects) { - try { - await upsertIntentPackage({ name: pkg.name, verified: false }) - - const packument = await fetchPackument(pkg.name) - const latestVersion = packument['dist-tags'].latest - if (!latestVersion) continue - - const latestMeta = packument.versions[latestVersion] - if (!latestMeta || !isIntentCompatible(latestMeta)) continue - - await markPackageVerified(pkg.name) - - const knownVersions = await getKnownVersions(pkg.name) - const toEnqueue = selectVersionsToSync(packument, knownVersions) - for (const { publishedAt, tarball, version } of toEnqueue) { - await enqueuePackageVersion({ - packageName: pkg.name, - publishedAt, - tarballUrl: tarball, - version, - }) - versionsEnqueued++ - } - console.log( - `[intent-discover] NPM: ${pkg.name} - enqueued ${toEnqueue.length}`, - ) - } catch (error) { - const message = `npm/${pkg.name}: ${getErrorMessage(error)}` - console.error(`[intent-discover] ${message}`) - errors.push(message) - } - } - } catch (error) { - console.error('[intent-discover] NPM path failed:', getErrorMessage(error)) - } - - const githubToken = process.env.GITHUB_AUTH_TOKEN - if (githubToken) { - const githubEnqueued = await discoverIntentPackagesFromGitHub( - githubToken, - errors, - ) - versionsEnqueued += githubEnqueued - } else { - console.warn( - '[intent-discover] GITHUB_AUTH_TOKEN not set, skipping GitHub path', - ) - } - - const duration = Date.now() - startTime - console.log( - `[intent-discover] Done in ${duration}ms - enqueued: ${versionsEnqueued}, errors: ${errors.length}`, - ) - if (errors.length > 0) { - console.warn(`[intent-discover] Errors:\n ${errors.join('\n ')}`) - } - console.log( - '[intent-discover] Scheduled time:', - new Date(scheduledTime).toISOString(), - ) -} - -async function discoverIntentPackagesFromGitHub( - githubToken: string, - errors: Array, -) { - let versionsEnqueued = 0 - - try { - console.log( - '[intent-discover] Searching GitHub for @tanstack/intent dependents...', - ) - const ghHeaders = { - Accept: 'application/vnd.github.v3+json', - Authorization: `Bearer ${githubToken}`, - } - - const searchRes = await fetch( - 'https://api.github.com/search/code?q=%22%40tanstack%2Fintent%22+filename%3Apackage.json&per_page=100', - { headers: ghHeaders }, - ) - if (!searchRes.ok) { - throw new Error(`GitHub search ${searchRes.status}`) - } - - const searchDataJson: unknown = await searchRes.json() - if (!isGitHubSearchResponse(searchDataJson)) { - throw new Error('Invalid GitHub search response') - } - - const seen = new Set() - const candidates = searchDataJson.items.filter((item) => { - const key = `${item.repository.full_name}|${item.path}` - if (seen.has(key)) return false - seen.add(key) - return true - }) - - console.log( - `[intent-discover] GitHub found ${candidates.length} package.json files`, - ) - - for (const { path, repo } of candidates.map((item) => ({ - path: item.path, - repo: item.repository.full_name, - }))) { - try { - const enqueued = await discoverIntentPackageFromGitHubRepo( - repo, - path, - ghHeaders, - ) - versionsEnqueued += enqueued - } catch (error) { - const message = `github/${repo}: ${getErrorMessage(error)}` - console.error(`[intent-discover] ${message}`) - errors.push(message) - } - } - } catch (error) { - console.error( - '[intent-discover] GitHub path failed:', - getErrorMessage(error), - ) - } - - return versionsEnqueued -} - -async function discoverIntentPackageFromGitHubRepo( - repo: string, - path: string, - headers: Record, -) { - const contentRes = await fetch( - `https://api.github.com/repos/${repo}/contents/${path}`, - { headers }, - ) - if (!contentRes.ok) return 0 - - const contentDataJson: unknown = await contentRes.json() - if (!isGitHubContentResponse(contentDataJson)) return 0 - const contentData = contentDataJson - if (!contentData.content) return 0 - - const pkgJson: unknown = JSON.parse(decodeBase64Utf8(contentData.content)) - if (!isPackageJson(pkgJson)) return 0 - - const pkgName = pkgJson.name - if (!pkgName || pkgJson.private) return 0 - - const npmRes = await fetch( - `https://registry.npmjs.org/${encodeURIComponent(pkgName)}/latest`, - ) - if (!npmRes.ok) return 0 - - const npmMetaJson: unknown = await npmRes.json() - if (!isNpmLatestResponse(npmMetaJson)) return 0 - const npmMeta = npmMetaJson - if (!npmMeta.version || !npmMeta.dist?.tarball) return 0 - - const skills = await extractSkillsFromTarball(npmMeta.dist.tarball) - if (skills.length === 0) return 0 - - await upsertIntentPackage({ name: pkgName, verified: true }) - await markPackageVerified(pkgName) - - const packument = await fetchPackument(pkgName) - const knownVersions = await getKnownVersions(pkgName) - const toEnqueue = selectVersionsToSync(packument, knownVersions) - - for (const { publishedAt, tarball, version } of toEnqueue) { - await enqueuePackageVersion({ - packageName: pkgName, - publishedAt, - tarballUrl: tarball, - version, - }) - } - - if (toEnqueue.length > 0) { - console.log( - `[intent-discover] GitHub: ${pkgName} - enqueued ${toEnqueue.length}`, - ) - } - - return toEnqueue.length -} - -async function runIntentQueueProcess(scheduledTime: number) { - const startTime = Date.now() - const deadline = startTime + INTENT_PROCESS_BUDGET_MS - - console.log('[intent-process] Starting queue drain...') - - let failed = 0 - let processed = 0 - let skipped = 0 - - try { - const batchSize = 50 - - while (Date.now() < deadline) { - const remaining = deadline - Date.now() - const pending = await getPendingVersions(batchSize) - - if (pending.length === 0) { - console.log('[intent-process] Queue empty, nothing to do') - break - } - - console.log( - `[intent-process] ${pending.length} pending version(s), ${Math.round( - remaining / 1000, - )}s remaining`, - ) - - for (const item of pending) { - if (Date.now() >= deadline) { - skipped += pending.length - pending.indexOf(item) - console.log( - `[intent-process] Budget exhausted, stopping. ${skipped} item(s) deferred to next run.`, - ) - break - } - - if (!item.tarballUrl) { - await markVersionFailed(item.id, 'No tarball URL recorded') - failed++ - continue - } - - try { - const skills = await extractSkillsFromTarball(item.tarballUrl) - await replaceSkillsForVersion(item.id, skills) - await markVersionSynced(item.id, skills.length) - processed++ - console.log( - `[intent-process] ${item.packageName}@${item.version} - ${skills.length} skill(s)`, - ) - } catch (error) { - const reason = getErrorMessage(error) - await markVersionFailed(item.id, reason) - failed++ - console.error( - `[intent-process] ${item.packageName}@${item.version}: ${reason}`, - ) - } - } - - if (pending.length < batchSize) break - } - - const duration = Date.now() - startTime - console.log( - `[intent-process] Done in ${duration}ms - processed: ${processed}, failed: ${failed}, deferred: ${skipped}`, - ) - console.log( - '[intent-process] Scheduled time:', - new Date(scheduledTime).toISOString(), - ) - } catch (error) { - logScheduledError('intent-process', startTime, error) - } -} - function getErrorMessage(error: unknown) { return error instanceof Error ? error.message : String(error) } -function decodeBase64Utf8(value: string) { - const binary = atob(value.replace(/\s/g, '')) - const bytes = Uint8Array.from(binary, (char) => char.charCodeAt(0)) - return new TextDecoder().decode(bytes) -} - -function isRecord(value: unknown): value is Record { - return typeof value === 'object' && value !== null -} - -function isGitHubSearchResponse(value: unknown): value is GitHubSearchResponse { - if (!isRecord(value) || !Array.isArray(value.items)) return false - - return value.items.every((item) => { - if (!isRecord(item) || typeof item.path !== 'string') return false - const repository = item.repository - return isRecord(repository) && typeof repository.full_name === 'string' - }) -} - -function isGitHubContentResponse( - value: unknown, -): value is GitHubContentResponse { - if (!isRecord(value)) return false - return value.content === undefined || typeof value.content === 'string' -} - -function isPackageJson(value: unknown): value is PackageJson { - if (!isRecord(value)) return false - const isNameValid = value.name === undefined || typeof value.name === 'string' - const isPrivateValid = - value.private === undefined || typeof value.private === 'boolean' - return isNameValid && isPrivateValid -} - -function isNpmLatestResponse(value: unknown): value is NpmLatestResponse { - if (!isRecord(value)) return false - - const dist = value.dist - const isDistValid = - dist === undefined || - (isRecord(dist) && - (dist.tarball === undefined || typeof dist.tarball === 'string')) - const isVersionValid = - value.version === undefined || typeof value.version === 'string' - - return isDistValid && isVersionValid -} - function logScheduledError(task: string, startTime: number, error: unknown) { const duration = Date.now() - startTime const errorMessage = getErrorMessage(error) diff --git a/src/utils/intent-admin.functions.ts b/src/utils/intent-admin.functions.ts index f65b38ba0..eeb7cf37a 100644 --- a/src/utils/intent-admin.functions.ts +++ b/src/utils/intent-admin.functions.ts @@ -6,6 +6,7 @@ import { getIntentAdminStats as getIntentAdminStatsServer, listFailedVersions as listFailedVersionsServer, listIntentPackages as listIntentPackagesServer, + listIntentWorkflowRuns as listIntentWorkflowRunsServer, resetFailedVersions as resetFailedVersionsServer, retryIntentVersion as retryIntentVersionServer, seedIntentPackage as seedIntentPackageServer, @@ -26,6 +27,10 @@ export const listFailedVersions = createServerFn({ method: 'GET' }).handler( async () => listFailedVersionsServer(), ) +export const listIntentWorkflowRuns = createServerFn({ method: 'GET' }).handler( + async () => listIntentWorkflowRunsServer(), +) + export const triggerIntentDiscover = createServerFn({ method: 'POST' }).handler( async () => triggerIntentDiscoverServer(), ) diff --git a/src/utils/intent-admin.server.ts b/src/utils/intent-admin.server.ts index 28a99ca49..6a9eab950 100644 --- a/src/utils/intent-admin.server.ts +++ b/src/utils/intent-admin.server.ts @@ -12,9 +12,7 @@ import { import { eq, desc, sql } from 'drizzle-orm' import { requireCapability } from './auth.server' import { - searchIntentPackages, fetchPackument, - isIntentCompatible, selectVersionsToSync, extractSkillsFromTarball, } from './intent.server' @@ -22,12 +20,18 @@ import { upsertIntentPackage, getKnownVersions, enqueuePackageVersion, - markPackageVerified, replaceSkillsForVersion, - getPendingVersions, markVersionSynced, - markVersionFailed, } from './intent-db.server' +import { intentWorkflowRegistrations } from '~/utils/intent-workflows.server' +import { + discoverIntentPackages, + processIntentVersion, + selectPendingIntentVersions, + summarizeIntentProcessResults, +} from '~/utils/intent-sync.server' +import type { IntentVersionProcessResult } from '~/utils/intent-sync.server' +import { workflowExecutionStore } from '~/utils/workflow-runtime.server' // --------------------------------------------------------------------------- // Stats / overview @@ -74,6 +78,34 @@ export async function getIntentAdminStats() { } } +export async function listIntentWorkflowRuns() { + await requireCapability({ data: { capability: 'admin' } }) + + const runs = await Promise.all( + Object.keys(intentWorkflowRegistrations).map((workflowId) => + workflowExecutionStore.listRuns({ + workflowId, + limit: 5, + }), + ), + ) + + return runs + .flat() + .sort((a, b) => b.updatedAt - a.updatedAt) + .slice(0, 10) + .map((run) => ({ + runId: run.runId, + workflowId: run.workflowId, + workflowVersion: run.workflowVersion, + status: run.status, + waitingFor: run.waitingFor?.signalName, + wakeAt: run.wakeAt ? new Date(run.wakeAt) : null, + createdAt: new Date(run.createdAt), + updatedAt: new Date(run.updatedAt), + })) +} + // --------------------------------------------------------------------------- // Package list (all known packages with status) // --------------------------------------------------------------------------- @@ -144,48 +176,7 @@ export async function listFailedVersions() { export async function triggerIntentDiscover() { await requireCapability({ data: { capability: 'admin' } }) - let packagesDiscovered = 0 - let packagesVerified = 0 - let versionsEnqueued = 0 - const errors: Array = [] - - const searchResults = await searchIntentPackages() - packagesDiscovered = searchResults.objects.length - - for (const { package: pkg } of searchResults.objects) { - try { - await upsertIntentPackage({ name: pkg.name, verified: false }) - - const packument = await fetchPackument(pkg.name) - const latestVersion = packument['dist-tags'].latest - if (!latestVersion) continue - - const latestMeta = packument.versions[latestVersion] - if (!latestMeta || !isIntentCompatible(latestMeta)) continue - - await markPackageVerified(pkg.name) - packagesVerified++ - - const knownVersions = await getKnownVersions(pkg.name) - const versionsToEnqueue = selectVersionsToSync(packument, knownVersions) - - for (const { version, tarball, publishedAt } of versionsToEnqueue) { - await enqueuePackageVersion({ - packageName: pkg.name, - version, - tarballUrl: tarball, - publishedAt, - }) - versionsEnqueued++ - } - } catch (err) { - errors.push( - `${pkg.name}: ${err instanceof Error ? err.message : String(err)}`, - ) - } - } - - return { packagesDiscovered, packagesVerified, versionsEnqueued, errors } + return discoverIntentPackages() } // --------------------------------------------------------------------------- @@ -195,50 +186,22 @@ export async function triggerIntentDiscover() { export async function triggerIntentProcess({ data }: { data: any }) { await requireCapability({ data: { capability: 'admin' } }) - const pending = await getPendingVersions(data.limit) - const results: Array<{ - packageName: string - version: string - status: 'synced' | 'failed' - skillCount?: number - error?: string - }> = [] - - for (const item of pending) { - if (!item.tarballUrl) { - await markVersionFailed(item.id, 'No tarball URL recorded') - results.push({ - packageName: item.packageName, - version: item.version, - status: 'failed', - error: 'No tarball URL recorded', - }) - continue - } - + const pending = await selectPendingIntentVersions({ limit: data.limit ?? 10 }) + const results: Array = [] + for (const version of pending) { try { - const skills = await extractSkillsFromTarball(item.tarballUrl) - await replaceSkillsForVersion(item.id, skills) - await markVersionSynced(item.id, skills.length) - results.push({ - packageName: item.packageName, - version: item.version, - status: 'synced', - skillCount: skills.length, - }) - } catch (err) { - const error = err instanceof Error ? err.message : String(err) - await markVersionFailed(item.id, error) + results.push(await processIntentVersion(version.id)) + } catch (error) { results.push({ - packageName: item.packageName, - version: item.version, + packageName: version.packageName, + version: version.version, status: 'failed', - error, + error: error instanceof Error ? error.message : String(error), }) } } - return { processed: results.length, results } + return summarizeIntentProcessResults(results) } // --------------------------------------------------------------------------- diff --git a/src/utils/intent-db.server.ts b/src/utils/intent-db.server.ts index 66d187173..a0323e9d3 100644 --- a/src/utils/intent-db.server.ts +++ b/src/utils/intent-db.server.ts @@ -83,14 +83,16 @@ export async function getKnownVersions( // Pull up to `limit` pending versions ordered by createdAt (FIFO queue). // Also includes 'failed' rows so they get retried each cycle. -export async function getPendingVersions(limit: number): Promise< - Array<{ - id: number - packageName: string - version: string - tarballUrl: string | null - }> -> { +export interface PendingIntentVersion { + id: number + packageName: string + version: string + tarballUrl: string | null +} + +export async function getPendingVersions( + limit: number, +): Promise> { return db .select({ id: intentPackageVersions.id, @@ -104,6 +106,30 @@ export async function getPendingVersions(limit: number): Promise< .limit(limit) } +export interface IntentVersionForProcessing extends PendingIntentVersion { + syncStatus: string + skillCount: number +} + +export async function getVersionForProcessing( + id: number, +): Promise { + const rows = await db + .select({ + id: intentPackageVersions.id, + packageName: intentPackageVersions.packageName, + version: intentPackageVersions.version, + tarballUrl: intentPackageVersions.tarballUrl, + syncStatus: intentPackageVersions.syncStatus, + skillCount: intentPackageVersions.skillCount, + }) + .from(intentPackageVersions) + .where(eq(intentPackageVersions.id, id)) + .limit(1) + + return rows[0] +} + export async function getSkillsForVersion( packageVersionId: number, ): Promise> { diff --git a/src/utils/intent-sync.server.ts b/src/utils/intent-sync.server.ts new file mode 100644 index 000000000..b8ae7015a --- /dev/null +++ b/src/utils/intent-sync.server.ts @@ -0,0 +1,353 @@ +import { z } from 'zod' +import { getCurrentHostRuntimeEnv } from '~/server/runtime/host.server' +import { + extractSkillsFromTarball, + fetchPackument, + isIntentCompatible, + searchIntentPackages, + selectVersionsToSync, +} from '~/utils/intent.server' +import { + enqueuePackageVersion, + getKnownVersions, + getPendingVersions, + getVersionForProcessing, + markPackageVerified, + markVersionFailed, + markVersionSynced, + replaceSkillsForVersion, + upsertIntentPackage, +} from '~/utils/intent-db.server' + +const githubSearchResponseSchema = z.object({ + items: z.array( + z.object({ + path: z.string(), + repository: z.object({ full_name: z.string() }), + }), + ), +}) + +const githubContentResponseSchema = z.object({ + content: z.string().optional(), +}) + +const packageJsonSchema = z.object({ + name: z.string().optional(), + private: z.boolean().optional(), +}) + +const npmLatestSchema = z.object({ + version: z.string().optional(), + dist: z.object({ tarball: z.string().optional() }).optional(), +}) + +export interface IntentDiscoveryResult { + packagesDiscovered: number + githubCandidates: number + packagesVerified: number + versionsEnqueued: number + errors: Array +} + +export interface IntentVersionProcessResult { + packageName: string + version: string + status: 'synced' | 'failed' + skillCount?: number + error?: string +} + +export interface IntentProcessResult { + processed: number + failed: number + deferred: number + results: Array +} + +export interface IntentVersionToProcess { + id: number + packageName: string + version: string +} + +export interface IntentSyncOperations { + discoverIntentPackages: () => Promise + selectPendingIntentVersions: (options: { + limit: number + }) => Promise> + processIntentVersion: ( + versionId: number, + ) => Promise +} + +export const defaultIntentSyncOperations: IntentSyncOperations = { + discoverIntentPackages, + selectPendingIntentVersions, + processIntentVersion, +} + +export async function discoverIntentPackages(): Promise { + const errors: Array = [] + let packagesDiscovered = 0 + let githubCandidates = 0 + let packagesVerified = 0 + let versionsEnqueued = 0 + + try { + const searchResults = await searchIntentPackages() + const packageNames = dedupe( + searchResults.objects.map((item) => item.package.name), + ) + packagesDiscovered = packageNames.length + + for (const packageName of packageNames) { + try { + const enqueued = await discoverNpmPackage(packageName) + if (enqueued !== null) { + packagesVerified++ + versionsEnqueued += enqueued + } + } catch (error) { + errors.push(`npm/${packageName}: ${getErrorMessage(error)}`) + } + } + } catch (error) { + errors.push(`npm-search: ${getErrorMessage(error)}`) + } + + const githubToken = + getCurrentHostRuntimeEnv()?.GITHUB_AUTH_TOKEN ?? + process.env.GITHUB_AUTH_TOKEN + if (githubToken) { + try { + const githubResult = await discoverGitHubPackages(githubToken) + githubCandidates = githubResult.githubCandidates + packagesVerified += githubResult.packagesVerified + versionsEnqueued += githubResult.versionsEnqueued + errors.push(...githubResult.errors) + } catch (error) { + errors.push(`github-search: ${getErrorMessage(error)}`) + } + } + + return { + packagesDiscovered, + githubCandidates, + packagesVerified, + versionsEnqueued, + errors, + } +} + +export async function selectPendingIntentVersions(options: { + limit: number +}): Promise> { + const versions = await getPendingVersions(options.limit) + + return versions.map((version) => ({ + id: version.id, + packageName: version.packageName, + version: version.version, + })) +} + +export function summarizeIntentProcessResults( + results: Array, +): IntentProcessResult { + return { + processed: results.filter((result) => result.status === 'synced').length, + failed: results.filter((result) => result.status === 'failed').length, + deferred: 0, + results, + } +} + +async function discoverNpmPackage(packageName: string): Promise { + await upsertIntentPackage({ name: packageName, verified: false }) + + const packument = await fetchPackument(packageName) + const latestVersion = packument['dist-tags'].latest + const latestMeta = latestVersion ? packument.versions[latestVersion] : null + if (!latestVersion || !latestMeta || !isIntentCompatible(latestMeta)) { + return null + } + + await markPackageVerified(packageName) + return enqueueVersionsFromPackument(packageName, packument) +} + +async function discoverGitHubPackages(githubToken: string): Promise<{ + githubCandidates: number + packagesVerified: number + versionsEnqueued: number + errors: Array +}> { + const ghHeaders = { + Authorization: `Bearer ${githubToken}`, + Accept: 'application/vnd.github.v3+json', + } + const searchRes = await fetch( + 'https://api.github.com/search/code?q=%22%40tanstack%2Fintent%22+filename%3Apackage.json&per_page=100', + { headers: ghHeaders }, + ) + if (!searchRes.ok) throw new Error(`GitHub search ${searchRes.status}`) + + const searchData = githubSearchResponseSchema.parse(await searchRes.json()) + const candidates = dedupeBy( + searchData.items.map((item) => ({ + repo: item.repository.full_name, + path: item.path, + })), + (item) => `${item.repo}|${item.path}`, + ) + let packagesVerified = 0 + let versionsEnqueued = 0 + const errors: Array = [] + + for (const candidate of candidates) { + try { + const enqueued = await discoverGitHubPackage(candidate, ghHeaders) + if (enqueued !== null) { + packagesVerified++ + versionsEnqueued += enqueued + } + } catch (error) { + errors.push( + `github/${candidate.repo}/${candidate.path}: ${getErrorMessage(error)}`, + ) + } + } + + return { + githubCandidates: candidates.length, + packagesVerified, + versionsEnqueued, + errors, + } +} + +async function discoverGitHubPackage( + candidate: { repo: string; path: string }, + headers: HeadersInit, +): Promise { + const contentRes = await fetch( + `https://api.github.com/repos/${candidate.repo}/contents/${candidate.path}`, + { headers }, + ) + if (!contentRes.ok) return null + + const contentData = githubContentResponseSchema.parse(await contentRes.json()) + if (!contentData.content) return null + + const packageJson = packageJsonSchema.parse( + JSON.parse(Buffer.from(contentData.content, 'base64').toString('utf-8')), + ) + if (!packageJson.name || packageJson.private) return null + + const npmRes = await fetch( + `https://registry.npmjs.org/${encodeURIComponent(packageJson.name)}/latest`, + ) + if (!npmRes.ok) return null + + const npmMeta = npmLatestSchema.parse(await npmRes.json()) + if (!npmMeta.dist?.tarball) return null + + const skills = await extractSkillsFromTarball(npmMeta.dist.tarball) + if (skills.length === 0) return null + + await upsertIntentPackage({ name: packageJson.name, verified: true }) + await markPackageVerified(packageJson.name) + + const packument = await fetchPackument(packageJson.name) + return enqueueVersionsFromPackument(packageJson.name, packument) +} + +async function enqueueVersionsFromPackument( + packageName: string, + packument: Awaited>, +): Promise { + const knownVersions = await getKnownVersions(packageName) + const versionsToEnqueue = selectVersionsToSync(packument, knownVersions) + + for (const version of versionsToEnqueue) { + await enqueuePackageVersion({ + packageName, + version: version.version, + tarballUrl: version.tarball, + publishedAt: version.publishedAt, + }) + } + + return versionsToEnqueue.length +} + +export async function processIntentVersion( + versionId: number, +): Promise { + const version = await getVersionForProcessing(versionId) + if (!version) { + throw new Error(`Intent package version ${versionId} not found`) + } + + if (version.syncStatus === 'synced') { + return { + packageName: version.packageName, + version: version.version, + status: 'synced', + skillCount: version.skillCount, + } + } + + if (!version.tarballUrl) { + const reason = 'No tarball URL recorded' + await markVersionFailed(version.id, reason) + return { + packageName: version.packageName, + version: version.version, + status: 'failed', + error: reason, + } + } + + try { + const skills = await extractSkillsFromTarball(version.tarballUrl) + await replaceSkillsForVersion(version.id, skills) + await markVersionSynced(version.id, skills.length) + return { + packageName: version.packageName, + version: version.version, + status: 'synced', + skillCount: skills.length, + } + } catch (error) { + const reason = getErrorMessage(error) + await markVersionFailed(version.id, reason) + return { + packageName: version.packageName, + version: version.version, + status: 'failed', + error: reason, + } + } +} + +function dedupe(values: Array): Array { + return dedupeBy(values, (value) => value) +} + +function dedupeBy(values: Array, getKey: (value: T) => string): Array { + const seen = new Set() + const result: Array = [] + for (const value of values) { + const key = getKey(value) + if (seen.has(key)) continue + seen.add(key) + result.push(value) + } + return result +} + +function getErrorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error) +} diff --git a/src/utils/intent-workflows.server.ts b/src/utils/intent-workflows.server.ts new file mode 100644 index 000000000..f5f8508e7 --- /dev/null +++ b/src/utils/intent-workflows.server.ts @@ -0,0 +1,125 @@ +import { createWorkflow } from '@tanstack/workflow-core' +import type { StepOptions } from '@tanstack/workflow-core' +import { every } from '@tanstack/workflow-runtime' +import type { WorkflowRegistrationMap } from '@tanstack/workflow-runtime' +import { z } from 'zod' +import { + defaultIntentSyncOperations, + summarizeIntentProcessResults, +} from '~/utils/intent-sync.server' +import type { + IntentSyncOperations, + IntentVersionProcessResult, +} from '~/utils/intent-sync.server' + +const intentDiscoverInputSchema = z.object({ + source: z.enum(['schedule', 'admin']).default('schedule'), +}) + +const intentProcessInputSchema = z.object({ + batchSize: z.number().int().positive().max(100), + source: z.enum(['schedule', 'admin']).default('schedule'), +}) + +const discoverStepOptions = { + retry: { maxAttempts: 2, backoff: 'exponential', baseMs: 1_000 }, + timeout: 10 * 60 * 1000, +} satisfies StepOptions + +const selectPendingVersionsStepOptions = { + timeout: 30_000, +} satisfies StepOptions + +const processVersionStepOptions = { + timeout: 2 * 60 * 1000, +} satisfies StepOptions + +function createIntentDiscoverWorkflow( + operations: IntentSyncOperations = defaultIntentSyncOperations, +) { + return createWorkflow({ + id: 'intent-discover-workflow', + input: intentDiscoverInputSchema, + }).handler((ctx) => + ctx.step( + 'discover-intent-packages', + () => operations.discoverIntentPackages(), + discoverStepOptions, + ), + ) +} + +export function createIntentProcessWorkflow( + operations: IntentSyncOperations = defaultIntentSyncOperations, +) { + return createWorkflow({ + id: 'intent-process-workflow', + input: intentProcessInputSchema, + }).handler(async (ctx) => { + const versions = await ctx.step( + 'select-pending-versions', + () => + operations.selectPendingIntentVersions({ + limit: ctx.input.batchSize, + }), + selectPendingVersionsStepOptions, + ) + const results: Array = [] + + for (const version of versions) { + try { + results.push( + await ctx.step( + `process-version:${version.id}`, + () => operations.processIntentVersion(version.id), + processVersionStepOptions, + ), + ) + } catch (error) { + results.push({ + packageName: version.packageName, + version: version.version, + status: 'failed', + error: getErrorMessage(error), + }) + } + } + + return summarizeIntentProcessResults(results) + }) +} + +const intentDiscoverWorkflow = createIntentDiscoverWorkflow() +const intentProcessWorkflow = createIntentProcessWorkflow() + +export const intentWorkflowRegistrations = { + [intentDiscoverWorkflow.id]: { + load: async () => intentDiscoverWorkflow, + schedules: [ + { + id: 'intent-discover-every-6h', + schedule: every.hours(6), + overlapPolicy: 'skip', + input: { source: 'schedule' }, + }, + ], + }, + [intentProcessWorkflow.id]: { + load: async () => intentProcessWorkflow, + schedules: [ + { + id: 'intent-process-every-15m', + schedule: every.minutes(15), + overlapPolicy: 'skip', + input: { + batchSize: 50, + source: 'schedule', + }, + }, + ], + }, +} satisfies WorkflowRegistrationMap + +function getErrorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error) +} diff --git a/src/utils/workflow-runtime.server.ts b/src/utils/workflow-runtime.server.ts new file mode 100644 index 000000000..de3727ed1 --- /dev/null +++ b/src/utils/workflow-runtime.server.ts @@ -0,0 +1,11 @@ +import { defineWorkflowRuntime } from '@tanstack/workflow-runtime' +import { createDrizzlePostgresWorkflowStore } from '@tanstack/workflow-store-drizzle-postgres' +import { db } from '~/db/client' +import { intentWorkflowRegistrations } from '~/utils/intent-workflows.server' + +export const workflowExecutionStore = createDrizzlePostgresWorkflowStore({ db }) + +export const workflowRuntime = defineWorkflowRuntime({ + store: workflowExecutionStore, + workflows: intentWorkflowRegistrations, +}) diff --git a/tests/intent-workflow.test.ts b/tests/intent-workflow.test.ts new file mode 100644 index 000000000..cbd2d6ec6 --- /dev/null +++ b/tests/intent-workflow.test.ts @@ -0,0 +1,209 @@ +import assert from 'node:assert/strict' +import { test } from 'node:test' +import { + defineWorkflowRuntime, + every, + inMemoryWorkflowExecutionStore, + materializeWorkflowSchedules, +} from '@tanstack/workflow-runtime' +import type { WorkflowExecutionStore } from '@tanstack/workflow-runtime' +import { createIntentProcessWorkflow } from '../src/utils/intent-workflows.server' +import type { + IntentProcessResult, + IntentSyncOperations, + IntentVersionProcessResult, +} from '../src/utils/intent-sync.server' + +test('duplicate scheduled invocation with the same bucket is idempotent', async () => { + let selectCalls = 0 + let processCalls = 0 + const store = inMemoryWorkflowExecutionStore() + const { processSchedule, processWorkflow, runtime } = createTestIntentRuntime( + { + store, + operations: { + ...noopOperations, + selectPendingIntentVersions: async () => { + selectCalls++ + return [{ id: 1, packageName: '@example/pkg', version: '1.0.0' }] + }, + processIntentVersion: async () => { + processCalls++ + return { + packageName: '@example/pkg', + version: '1.0.0', + status: 'synced', + skillCount: 1, + } + }, + }, + }, + ) + const now = Date.UTC(2026, 4, 26, 12, 0, 0) + + await materializeWorkflowSchedules(runtime, { now }) + const first = await runtime.sweep({ now, includeEvents: false }) + await materializeWorkflowSchedules(runtime, { now }) + const second = await runtime.sweep({ now, includeEvents: false }) + + const processRun = first.scheduled.find( + (run) => run.workflowId === processWorkflow.id, + ) + assert.ok(processRun) + assert.equal( + processRun.runId, + `${processWorkflow.id}:${processSchedule.id}:${now}`, + ) + assert.equal(second.scheduled.length, 0) + assert.equal(selectCalls, 1) + assert.equal(processCalls, 1) +}) + +test('failed package version step does not prevent other versions from processing', async () => { + const goodResult: IntentVersionProcessResult = { + packageName: '@example/good', + version: '1.0.0', + status: 'synced', + skillCount: 1, + } + const badResult: IntentVersionProcessResult = { + packageName: '@example/bad', + version: '1.0.0', + status: 'failed', + error: 'tarball failed', + } + const laterResult: IntentVersionProcessResult = { + packageName: '@example/later', + version: '1.0.0', + status: 'synced', + skillCount: 2, + } + const expected: IntentProcessResult = { + processed: 2, + failed: 1, + deferred: 0, + results: [goodResult, badResult, laterResult], + } + const { processWorkflow, runtime } = createTestIntentRuntime({ + store: inMemoryWorkflowExecutionStore(), + operations: { + ...noopOperations, + selectPendingIntentVersions: async () => [ + { id: 1, packageName: '@example/good', version: '1.0.0' }, + { id: 2, packageName: '@example/bad', version: '1.0.0' }, + { id: 3, packageName: '@example/later', version: '1.0.0' }, + ], + processIntentVersion: async (versionId: number) => { + if (versionId === 1) return goodResult + if (versionId === 2) throw new Error('tarball failed') + return laterResult + }, + }, + }) + + const result = await runtime.startRun({ + workflowId: processWorkflow.id, + runId: 'intent-process:test-partial-failure', + input: { batchSize: 3, source: 'admin' }, + now: Date.UTC(2026, 4, 26, 12, 15, 0), + includeEvents: false, + }) + + assert.equal(result.kind, 'completed') + assert.ok(result.run) + assert.deepEqual(result.run.output, expected) +}) + +test('process workflow continues from queue state across scheduled invocations', async () => { + const queue = [ + { id: 1, packageName: '@example/one', version: '1.0.0' }, + { id: 2, packageName: '@example/two', version: '1.0.0' }, + ] + const processed: Array = [] + const store = inMemoryWorkflowExecutionStore() + const operations: IntentSyncOperations = { + ...noopOperations, + selectPendingIntentVersions: async () => { + const next = queue.shift() + return next ? [next] : [] + }, + processIntentVersion: async (versionId: number) => { + const packageName = versionId === 1 ? '@example/one' : '@example/two' + const version = '1.0.0' + processed.push(`${packageName}@${version}`) + return { + packageName, + version, + status: 'synced', + skillCount: 1, + } + }, + } + const first = createTestIntentRuntime({ + store, + operations, + }) + const second = createTestIntentRuntime({ + store, + operations, + }) + const firstBucket = Date.UTC(2026, 4, 26, 12, 0, 0) + const secondBucket = Date.UTC(2026, 4, 26, 12, 15, 0) + + await materializeWorkflowSchedules(first.runtime, { now: firstBucket }) + await first.runtime.sweep({ now: firstBucket, includeEvents: false }) + await materializeWorkflowSchedules(second.runtime, { now: secondBucket }) + await second.runtime.sweep({ now: secondBucket, includeEvents: false }) + + const runs = await store.listRuns({ + workflowId: first.processWorkflow.id, + limit: 10, + }) + + assert.deepEqual(processed, ['@example/one@1.0.0', '@example/two@1.0.0']) + assert.equal(runs.length, 2) +}) + +const noopOperations: IntentSyncOperations = { + discoverIntentPackages: async () => ({ + packagesDiscovered: 0, + githubCandidates: 0, + packagesVerified: 0, + versionsEnqueued: 0, + errors: [], + }), + selectPendingIntentVersions: async () => [], + processIntentVersion: async () => { + throw new Error('No test version configured') + }, +} + +function createTestIntentRuntime(options: { + operations: IntentSyncOperations + store?: WorkflowExecutionStore +}) { + const processWorkflow = createIntentProcessWorkflow(options.operations) + const processSchedule = { + id: 'intent-process-every-15m', + schedule: every.minutes(15), + overlapPolicy: 'skip', + input: { + batchSize: 50, + source: 'schedule', + }, + } as const + + return { + processSchedule, + processWorkflow, + runtime: defineWorkflowRuntime({ + store: options.store ?? inMemoryWorkflowExecutionStore(), + workflows: { + [processWorkflow.id]: { + load: async () => processWorkflow, + schedules: [processSchedule], + }, + }, + }), + } +}