diff --git a/src/flowJob.ts b/src/flowJob.ts
new file mode 100644
index 0000000..78a1d99
--- /dev/null
+++ b/src/flowJob.ts
@@ -0,0 +1,14 @@
+import { Job } from './job'
+import { FlowJobCreateData } from './types'
+
+export class FlowJob {
+  job: Job
+  children?: FlowJob[]
+
+  constructor(data: FlowJobCreateData) {
+    this.job = new Job(data.job)
+    this.children = data.children
+      ? data.children.map((j) => new FlowJob(j))
+      : []
+  }
+}
diff --git a/src/flowQueue.ts b/src/flowQueue.ts
new file mode 100644
index 0000000..250fd2e
--- /dev/null
+++ b/src/flowQueue.ts
@@ -0,0 +1,132 @@
+import { KV, Kvm } from '@nats-io/kv'
+import { FlowJob } from './flowJob'
+import { Job } from './job'
+import { Queue } from './queue'
+import { ChildToParentsKVValue, DependenciesKVValue } from './types'
+
+export class FlowQueue extends Queue {
+  private parentChildrenStore: KV | null = null
+  private childParentsStore: KV | null = null
+
+  public override async setup(): Promise<void> {
+    try {
+      await super.setup()
+      const kvm = new Kvm(this.connection)
+      this.parentChildrenStore = await kvm.create(`${this.name}_parent_id`)
+      this.childParentsStore = await kvm.create(`${this.name}_parents`)
+    } catch (e) {
+      console.error(`Error connecting to JetStream: ${e}`)
+      throw e
+    }
+  }
+
+  public async addFlowJob(tree: FlowJob, priority: number = 1): Promise<void> {
+    const deepestJobs = await this.traverseJobTree(tree)
+    await this.addJobs(deepestJobs, priority)
+  }
+
+  private async traverseJobTree(
+    node: FlowJob,
+    parentId: string | null = null,
+  ): Promise<Job[]> {
+    const currentJob = node.job
+    if (parentId) {
+      currentJob.meta.parentId = parentId
+    }
+
+    const children = node.children || []
+    if (children.length === 0) {
+      return [currentJob]
+    }
+
+    const newChildrenForCurrentJobCount: number =
+      await this.updateChildDependencies({
+        parentJob: currentJob,
+        childJobs: children.map((child) => child.job),
+      })
+
+    await this.updateParentDependencies({
+      parentJob: currentJob,
+      newChildrenCount: newChildrenForCurrentJobCount,
+    })
+
+    const deepestJobs: Job[] = []
+    for (const child of children) {
+      const traverseResult = await this.traverseJobTree(child, currentJob.id)
+      deepestJobs.push(...traverseResult)
+    }
+
+    return deepestJobs
+  }
+
+  private async updateChildDependencies({
+    parentJob,
+    childJobs,
+  }: {
+    parentJob: Job
+    childJobs: Job[]
+  }) {
+    let newChildrenForCurrentJobCount: number = 0
+    for (const childJob of childJobs) {
+      const childParentsKVEntry = await this.childParentsStore!.get(childJob.id)
+      if (!childParentsKVEntry) {
+        await this.childParentsStore!.put(
+          childJob.id,
+          JSON.stringify({
+            parentIds: [parentJob.id],
+          }),
+        )
+        newChildrenForCurrentJobCount++
+        continue
+      }
+
+      const parentsInfo: ChildToParentsKVValue = childParentsKVEntry.json()
+
+      if (parentsInfo.parentIds.includes(parentJob.id)) continue
+
+      parentsInfo.parentIds.push(parentJob.id)
+      await this.childParentsStore!.put(
+        childJob.id,
+        JSON.stringify(parentsInfo),
+        {
+          previousSeq: childParentsKVEntry.revision,
+        },
+      )
+      newChildrenForCurrentJobCount++
+    }
+
+    return newChildrenForCurrentJobCount
+  }
+
+  private async updateParentDependencies({
+    parentJob,
+    newChildrenCount,
+  }: {
+    parentJob: Job
+    newChildrenCount: number
+  }) {
+    const existingParentDependencies = await this.parentChildrenStore!.get(
+      parentJob.id,
+    )
+    if (!existingParentDependencies) {
+      await this.parentChildrenStore!.put(
+        parentJob.id,
+        JSON.stringify({
+          ...parentJob,
+          childrenCount: newChildrenCount,
+        }),
+      )
+    } else {
+      const parentDependencies: DependenciesKVValue =
+        existingParentDependencies.json()
+      parentDependencies.childrenCount += newChildrenCount
+      await this.parentChildrenStore!.put(
+        parentJob.id,
+        JSON.stringify(parentDependencies),
+        {
+          previousSeq: existingParentDependencies.revision,
+        },
+      )
+    }
+  }
+}
diff --git a/src/index.ts b/src/index.ts
index 6997e71..c7b6c94 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -1,3 +1,4 @@
+export * from './flowQueue'
 export * from './queue'
 export * from './job'
 export * from './worker'
diff --git a/src/job.ts b/src/job.ts
index 92a3d92..2840e51 100644
--- a/src/job.ts
+++ b/src/job.ts
@@ -8,7 +8,7 @@ export class Job {
     startTime: number
     retryCount: number
     timeout: number
-    // parentId?: string
+    parentId?: string
   }
   data: unknown
   queueName: string
diff --git a/src/jobEvent.ts b/src/jobEvent.ts
index a7703c3..79cd647 100644
--- a/src/jobEvent.ts
+++ b/src/jobEvent.ts
@@ -5,16 +5,17 @@ type Event = {
 export type JobCompletedEvent = Event<'JOB_COMPLETED', { jobId: string }>
 export type JobFailedEvent = Event<'JOB_FAILED', { jobId: string }>
 
-// export type JobChildCompletedEvent = Event<
-//   'JOB_CHILD_COMPLETED',
-//   { childId: string; parentId: string }
-// >
-// export type JobChildFailedEvent = Event<
-//   'JOB_CHILD_FAILED',
-//   { childId: string; parentId: string }
-// >
+export type JobChildCompletedEvent = Event<
+  'JOB_CHILD_COMPLETED',
+  { childId: string; parentId: string }
+>
+export type JobChildFailedEvent = Event<
+  'JOB_CHILD_FAILED',
+  { childId: string; parentId: string }
+>
 
 export type JobEvent =
-  // | JobChildFailedEvent
-  // | JobChildCompletedEvent
-  JobCompletedEvent | JobFailedEvent
+  | JobChildFailedEvent
+  | JobChildCompletedEvent
+  | JobCompletedEvent
+  | JobFailedEvent
diff --git a/src/types.ts b/src/types.ts
index d758aa8..90d6a6c 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -1,8 +1,8 @@
-// import { Job } from './job'
+import { Job } from './job'
 
-// export type ParentJob = Job & {
-//   childrenCount: number
-// }
+export type ParentJob = Job & {
+  childrenCount: number
+}
 
 export type JobCreateData = {
   name: string
@@ -13,18 +13,18 @@ export type JobCreateData = {
   timeout?: number
 }
 
-// export type FlowJobCreateData = {
-//   job: JobCreateData
-//   children?: FlowJobCreateData[]
-// }
+export type FlowJobCreateData = {
+  job: JobCreateData
+  children?: FlowJobCreateData[]
+}
 
 export type RateLimit = {
   duration: number
   max: number
 }
 
-// export type DependenciesKVValue = ParentJob
+export type DependenciesKVValue = ParentJob
 
-// export type ChildToParentsKVValue = {
-//   parentIds: string[]
-// }
+export type ChildToParentsKVValue = {
+  parentIds: string[]
+}
diff --git a/src/worker.ts b/src/worker.ts
index 53f0ce9..9081141 100644
--- a/src/worker.ts
+++ b/src/worker.ts
@@ -5,14 +5,15 @@ import {
   AckPolicy,
   JetStreamManager,
 } from '@nats-io/jetstream'
-import { RateLimit } from './types'
+import { KV, Kvm } from '@nats-io/kv'
+import { ChildToParentsKVValue, DependenciesKVValue, RateLimit } from './types'
 import { Limiter, FixedWindowLimiter, IntervalLimiter } from './limiter'
 import { sleep } from './utils'
 import { headers, TimeoutError } from '@nats-io/nats-core'
 import { Job } from './job'
 import {
-  // JobChildCompletedEvent,
-  // JobChildFailedEvent,
+  JobChildCompletedEvent,
+  JobChildFailedEvent,
   JobCompletedEvent,
   JobEvent,
   JobFailedEvent,
@@ -56,6 +57,8 @@ export class Worker {
   protected processingNow = 0
   protected loopPromise: Promise<void> | null = null
   protected workerEventsLoopPromise: Promise<void> | null = null
+  protected parentChildrenStore: KV | null = null
+  protected childParentsStore: KV | null = null
   protected priorityQuota?: Map<
     number,
     {
@@ -65,6 +68,7 @@
   > = new Map()
   protected jobCompletedConsumer: Consumer | null = null
   protected jobFailedConsumer: Consumer | null = null
+  protected parentNotificationConsumer: Consumer | null = null
 
   constructor(opts: WorkerOpts) {
     this.client = opts.client
@@ -102,7 +106,10 @@
   public async setup() {
     try {
       this.manager = await this.client.jetstreamManager()
+      const kvm = new Kvm(this.client) // TODO: Rename
+      this.parentChildrenStore = await kvm.open(`${this.name}_parent_id`)
+      this.childParentsStore = await kvm.open(`${this.name}_parents`)
 
       this.consumers = await this.setupConsumers()
 
       await this.setupInternalQueues(this.manager)
@@ -131,6 +138,16 @@
       name: `${this.name}_failed`,
       subjects: [`${this.name}_failed`],
     })
+
+    // Parent notification
+    await manager.streams.add({
+      name: `${this.name}_parent_notification`,
+      subjects: [`${this.name}_parent_notification`],
+    })
+
+    const kvm = new Kvm(this.client)
+    this.parentChildrenStore = await kvm.create(`${this.name}_parent_id`)
+    this.childParentsStore = await kvm.create(`${this.name}_parents`)
   }
 
   private async setupInternalQueuesConsumers(manager: JetStreamManager) {
@@ -151,6 +168,13 @@
       ack_policy: AckPolicy.All,
     })
+    await manager.consumers.add(`${this.name}_parent_notification`, {
+      filter_subject: `${this.name}_parent_notification`,
+      name: `parent_notification_consumer`,
+      durable_name: `parent_notification_consumer`,
+      ack_policy: AckPolicy.All,
+    })
+
     this.jobCompletedConsumer = await this.client.consumers.get(
       `${this.name}_completed`,
       `job_completed_consumer`,
     )
@@ -159,6 +183,10 @@
       `${this.name}_failed`,
       `job_failed_consumer`,
     )
+    this.parentNotificationConsumer = await this.client.consumers.get(
+      `${this.name}_parent_notification`,
+      `parent_notification_consumer`,
+    )
   }
 
   private async publishJobCompletedEvent(job: Job) {
@@ -191,6 +219,24 @@
     })
   }
 
+  private async publishChildJobCompletedEvent(event: JobChildCompletedEvent) {
+    const subject = `${this.name}_parent_notification`
+    const messageHeaders = headers()
+    messageHeaders.set('Nats-Msg-Id', crypto.randomUUID())
+    await this.client.publish(subject, JSON.stringify(event), {
+      headers: messageHeaders,
+    })
+  }
+
+  private async publishChildJobFailedEvent(event: JobChildFailedEvent) {
+    const subject = `${this.name}_parent_notification`
+    const messageHeaders = headers()
+    messageHeaders.set('Nats-Msg-Id', crypto.randomUUID())
+    await this.client.publish(subject, JSON.stringify(event), {
+      headers: messageHeaders,
+    })
+  }
+
   private async setupConsumers(): Promise<Consumer[]> {
     const consumers: Consumer[] = []
     for (let i = 1; i <= this.priorities; i++) {
@@ -315,13 +361,16 @@
   protected async workerEventsLoop() {
     while (this.running) {
-      const [jobCompletedEvents, jobFailedEvents] = await Promise.all([
-        this.fetch(this.jobCompletedConsumer!, 1),
-        this.fetch(this.jobFailedConsumer!, 1),
-      ])
+      const [jobCompletedEvents, jobFailedEvents, parentNotificationEvents] =
+        await Promise.all([
+          this.fetch(this.jobCompletedConsumer!, 1),
+          this.fetch(this.jobFailedConsumer!, 1),
+          this.fetch(this.parentNotificationConsumer!, 1),
+        ])
 
       jobCompletedEvents.forEach((j) => this.processEventMessage(j))
       jobFailedEvents.forEach((j) => this.processEventMessage(j))
+      parentNotificationEvents.forEach((j) => this.processEventMessage(j))
 
       await sleep(100)
     }
   }
@@ -331,16 +380,14 @@
     console.log('Processing event:', event)
 
     if (event.event === 'JOB_COMPLETED') {
-      // await this.processJobCompletedEvent(event)
+      await this.processJobCompletedEvent(event)
     } else if (event.event === 'JOB_FAILED') {
-      // await this.processJobFailedEvent(event)
-    }
-    // else if (event.event === 'JOB_CHILD_COMPLETED') {
-    //   // await this.processChildJobCompletedEvent(event)
-    // } else if (event.event === 'JOB_CHILD_FAILED') {
-    //   // await this.processChildJobFailedEvent(event)
-    // }
-    else {
+      await this.processJobFailedEvent(event)
+    } else if (event.event === 'JOB_CHILD_COMPLETED') {
+      await this.processChildJobCompletedEvent(event)
+    } else if (event.event === 'JOB_CHILD_FAILED') {
+      await this.processChildJobFailedEvent(event)
+    } else {
       console.error('Unknown event:', event)
     }
 
@@ -348,27 +395,119 @@
     console.log('Processing event finished:', event)
   }
 
-  // protected async processJobCompletedEvent(
-  //   _jobCompletedEvent: JobCompletedEvent,
-  // ) {
-  //   return
-  // }
+  protected async processJobCompletedEvent(
+    jobCompletedEvent: JobCompletedEvent,
+  ) {
+    const childParentsValue = await this.childParentsStore!.get(
+      jobCompletedEvent.data.jobId,
+    )
+    if (!childParentsValue) return
+
+    const childParents: ChildToParentsKVValue = childParentsValue.json()
+    const parentIds = childParents.parentIds
+
+    for (const parentId of parentIds) {
+      const childCompletedEvent: JobChildCompletedEvent = {
+        event: 'JOB_CHILD_COMPLETED',
+        data: {
+          childId: jobCompletedEvent.data.jobId,
+          parentId: parentId,
+        },
+      }
+
+      await this.publishChildJobCompletedEvent(childCompletedEvent)
+    }
+
+    await this.childParentsStore!.delete(jobCompletedEvent.data.jobId)
+  }
 
-  // protected async processJobFailedEvent(_event: JobFailedEvent) {
-  //   return
-  // }
+  protected async processJobFailedEvent(event: JobFailedEvent) {
+    const childParentsValue = await this.childParentsStore!.get(
+      event.data.jobId,
+    )
+    if (!childParentsValue) {
+      return
+    }
 
-  // protected async processChildJobCompletedEvent(
-  //   _childJobCompletedEvent: JobChildCompletedEvent,
-  // ) {
-  //   return
-  // }
+    const childParents: ChildToParentsKVValue = childParentsValue.json()
+    const parentIds = childParents.parentIds
 
-  // protected async processChildJobFailedEvent(
-  //   _childJobCompletedEvent: JobChildFailedEvent,
-  // ) {
-  //   return
-  // }
+    for (const parentId of parentIds) {
+      const childFailedEvent: JobChildFailedEvent = {
+        event: 'JOB_CHILD_FAILED',
+        data: {
+          childId: event.data.jobId,
+          parentId: parentId,
+        },
+      }
+
+      await this.publishChildJobFailedEvent(childFailedEvent)
+    }
+
+    await this.childParentsStore!.delete(event.data.jobId)
+  }
+
+  protected async processChildJobCompletedEvent(
+    childJobCompletedEvent: JobChildCompletedEvent,
+  ) {
+    try {
+      const parentId = childJobCompletedEvent.data.parentId
+      const parentChildrenDependenciesEntry =
+        await this.parentChildrenStore!.get(parentId)
+
+      if (!parentChildrenDependenciesEntry) {
+        throw new Error('Parent job not found in KV store.')
+      }
+
+      const parentChildrenDependencies: DependenciesKVValue =
+        parentChildrenDependenciesEntry.json()
+
+      parentChildrenDependencies.childrenCount -= 1
+
+      if (parentChildrenDependencies.childrenCount === 0) {
+        await this.parentChildrenStore!.delete(parentId)
+        await this.publishParentJob(parentChildrenDependencies)
+      } else {
+        await this.parentChildrenStore!.put(
+          parentChildrenDependencies.id,
+          JSON.stringify(parentChildrenDependencies),
+          {
+            previousSeq: parentChildrenDependenciesEntry.revision,
+          },
+        )
+      }
+    } catch (e) {
+      console.error(
+        'Failed to process child job completed event:',
+        childJobCompletedEvent,
+        'Error:',
+        e,
+      )
+      throw e
+    }
+  }
+
+  protected async processChildJobFailedEvent(
+    childJobFailedEvent: JobChildFailedEvent,
+  ) {
+    const parentId = childJobFailedEvent.data.parentId
+    const parentChildrenDependenciesEntry = await this.parentChildrenStore!.get(
+      parentId,
+    )
+
+    if (!parentChildrenDependenciesEntry) {
+      throw new Error('Parent job not found in KV store.')
+    }
+
+    const parentChildrenDependencies: DependenciesKVValue =
+      parentChildrenDependenciesEntry.json()
+
+    parentChildrenDependencies.meta.failed = true
+
+    // TODO: What if a child fails and publishes the parent, and then another child completes and can no longer access the parent?
+    await this.parentChildrenStore!.delete(parentId)
+    await this.publishParentJob(parentChildrenDependencies)
+  }
 
   protected async processTask(j: JsMsg) {
     this.processingNow += 1
@@ -437,18 +576,18 @@
     }
   }
 
-  // protected async publishParentJob(parentJobData: Job): Promise<void> {
-  //   const subject = `${parentJobData.queueName}.1`
-  //   const jobBytes = JSON.stringify(parentJobData)
-  //   const msgHeaders = headers()
-  //   msgHeaders.set('Nats-Msg-Id', parentJobData.id)
-  //   await this.client.publish(subject, jobBytes, {
-  //     headers: msgHeaders,
-  //   })
-  //   console.log(
-  //     `ParentJob: name=${parentJobData.name} id=${parentJobData.id} added to topic=${subject} successfully`,
-  //   )
-  // }
+  protected async publishParentJob(parentJobData: Job): Promise<void> {
+    const subject = `${parentJobData.queueName}.1`
+    const jobBytes = JSON.stringify(parentJobData)
+    const msgHeaders = headers()
+    msgHeaders.set('Nats-Msg-Id', parentJobData.id)
+    await this.client.publish(subject, jobBytes, {
+      headers: msgHeaders,
+    })
+    console.log(
+      `ParentJob: name=${parentJobData.name} id=${parentJobData.id} added to topic=${subject} successfully`,
+    )
+  }
 
   protected async fetch(consumer: Consumer, count: number): Promise<JsMsg[]> {
     // TODO: Maybe fail to fetch consumer info
diff --git a/test/flowQueue/addFlowJob.test.ts b/test/flowQueue/addFlowJob.test.ts
new file mode 100644
index 0000000..dc7e941
--- /dev/null
+++ b/test/flowQueue/addFlowJob.test.ts
@@ -0,0 +1,301 @@
+import {
+  describe,
+  it,
+  before,
+  after,
+  beforeEach,
+  afterEach,
+  mock,
+} from 'node:test'
+import {
+  jetstream,
+  JetStreamClient,
+  JetStreamManager,
+} from '@nats-io/jetstream'
+import { connect } from '@nats-io/transport-node'
+import { NatsConnection } from '@nats-io/nats-core'
+import assert from 'node:assert'
+import { FlowQueue } from '../../src/flowQueue'
+import { FlowJob } from '../../src/flowJob'
+import { Job } from '../../src/job'
+import { KV, Kvm } from '@nats-io/kv'
+import { ChildToParentsKVValue, DependenciesKVValue } from '../../src/types'
+
+describe('FlowQueue.addFlowJob()', () => {
+  let nc: NatsConnection
+  let js: JetStreamClient
+  let jsm: JetStreamManager
+  const queueName = 'queue'
+  const queueMaxPriority = 3
+  let queue: FlowQueue | undefined = undefined
+  let parentToChildrenKV: KV
+  let childToParentsKV: KV
+
+  before(async () => {
+    nc = await connect({ servers: '127.0.0.1:4222' })
+    js = jetstream(nc)
+    jsm = await js.jetstreamManager()
+    await jsm.streams.delete(queueName).catch(() => {})
+  })
+
+  beforeEach(async () => {
+    queue = new FlowQueue({
+      name: queueName,
+      client: js,
+      connection: nc,
+      priorities: queueMaxPriority,
+      duplicateWindow: 2000,
+    })
+    await queue.setup()
+    const kvm = new Kvm(js)
+    parentToChildrenKV = await kvm.open(`${queueName}_parent_id`)
+    childToParentsKV = await kvm.open(`${queueName}_parents`)
+  })
+
+  afterEach(async () => {
+    await jsm.streams.delete(queueName).catch(() => {})
+    const kvm = new Kvm(js)
+    parentToChildrenKV = await kvm.open(`${queueName}_parent_id`)
+    if (parentToChildrenKV) await parentToChildrenKV.destroy()
+    childToParentsKV = await kvm.open(`${queueName}_parents`)
+    if (childToParentsKV) await childToParentsKV.destroy()
+  })
+
+  after(async () => {
+    await nc.close()
+  })
+
+  it('should add flowJob without children', async () => {
+    const job = new FlowJob({
+      job: new Job({
+        name: 'test',
+        queueName: queueName,
+        data: {},
+      }),
+    })
+    await queue!.addFlowJob(job)
+
+    const stream = await jsm.streams.get(queueName)
+    const streamInfo = await stream.info()
+    assert.strictEqual(streamInfo.state.messages, 1)
+  })
+
+  it('should add parent info to KV', async () => {
+    mock.timers.enable({
+      apis: ['Date'],
+      now: new Date('2023-05-14T11:01:58.135Z'),
+    })
+    const childJob1 = new FlowJob({
+      job: new Job({
+        name: 'test-child',
+        queueName,
+        data: {},
+      }),
+    })
+    const job = new FlowJob({
+      job: new Job({
+        id: 'id-test',
+        name: 'test',
+        queueName: queueName,
+        data: {},
+      }),
+      children: [childJob1],
+    })
+    await queue!.addFlowJob(job)
+
+    const keyValue = await parentToChildrenKV.get('id-test')
+    if (!keyValue) throw new Error('Parent not found')
+
+    const data: DependenciesKVValue = keyValue.json()
+    assert.deepStrictEqual(data, {
+      id: 'id-test',
+      name: 'test',
+      meta: {
+        retryCount: 0,
+        startTime: new Date('2023-05-14T11:01:58.135Z').getTime(),
+        failed: false,
+        timeout: 0,
+      },
+      data: {},
+      queueName: 'queue',
+      childrenCount: 1,
+    })
+
+    mock.timers.reset()
+  })
+
+  it('should set parentId for child', async () => {
+    const childJob1 = new FlowJob({
+      job: new Job({
+        id: 'id-child1',
+        name: 'test-child',
+        queueName,
+        data: {},
+      }),
+    })
+    const job = new FlowJob({
+      job: new Job({
+        id: 'id-test',
+        name: 'test',
+        queueName: queueName,
+        data: {},
+      }),
+      children: [childJob1],
+    })
+    await queue!.addFlowJob(job)
+
+    const message = await jsm.streams.getMessage(queueName, {
+      seq: 1,
+    })
+    if (!message) throw new Error('Message not found')
+
+    const parsedData: Job = message.json()
+    assert.strictEqual(parsedData.id, 'id-child1')
+    assert.strictEqual(parsedData.meta.parentId, 'id-test')
+  })
+
+  it('should set parents for child in KV', async () => {
+    mock.timers.enable({
+      apis: ['Date'],
+      now: new Date('2023-05-14T11:01:58.135Z'),
+    })
+    const childJob1 = new FlowJob({
+      job: new Job({
+        id: 'test-child',
+        name: 'test-child',
+        queueName,
+        data: {},
+      }),
+    })
+    const job = new FlowJob({
+      job: new Job({
+        id: 'id-test',
+        name: 'test',
+        queueName: queueName,
+        data: {},
+      }),
+      children: [childJob1],
+    })
+    await queue!.addFlowJob(job)
+
+    const childParentsValue = await childToParentsKV.get('test-child')
+    if (!childParentsValue?.value)
+      throw new Error('Child parents value not found')
+
+    const childParentsData: ChildToParentsKVValue = childParentsValue.json()
+    assert.deepStrictEqual(childParentsData, {
+      parentIds: ['id-test'],
+    })
+
+    mock.timers.reset()
+  })
+
+  it('should add new parent to child in KV when child has multiple parents', async () => {
+    mock.timers.enable({
+      apis: ['Date'],
+      now: new Date('2023-05-14T11:01:58.135Z'),
+    })
+    const childJob1 = new FlowJob({
+      job: new Job({
+        id: 'child1',
+        name: 'child1',
+        queueName,
+        data: {},
+      }),
+    })
+    const parent1 = new FlowJob({
+      job: new Job({
+        id: 'parent1',
+        name: 'parent1',
+        queueName: queueName,
+        data: {},
+      }),
+      children: [childJob1],
+    })
+    const parent2 = new FlowJob({
+      job: new Job({
+        id: 'parent2',
+        name: 'parent2',
+        queueName: queueName,
+        data: {},
+      }),
+      children: [childJob1],
+    })
+    await queue!.addFlowJob(parent1)
+    await queue!.addFlowJob(parent2)
+
+    const childParentsValue = await childToParentsKV.get('child1')
+    if (!childParentsValue?.value)
+      throw new Error('Child parents value not found')
+
+    const childParentsData: ChildToParentsKVValue = childParentsValue.json()
+    assert.deepStrictEqual(childParentsData, {
+      parentIds: ['parent1', 'parent2'],
+    })
+
+    mock.timers.reset()
+  })
+
+  it('should add new child to parent', async () => {
+    mock.timers.enable({
+      apis: ['Date'],
+      now: new Date('2023-05-14T11:01:58.135Z'),
+    })
+    const childJob1 = new FlowJob({
+      job: new Job({
+        id: 'child1',
+        name: 'child1',
+        queueName,
+        data: {},
+      }),
+    })
+    const parent1 = new FlowJob({
+      job: new Job({
+        id: 'parent1',
+        name: 'parent1',
+        queueName: queueName,
+        data: {},
+      }),
+      children: [childJob1],
+    })
+    const childJob2 = new FlowJob({
+      job: new Job({
+        id: 'child2',
+        name: 'child2',
+        queueName: queueName,
+        data: {},
+      }),
+      children: [],
+    })
+    const parent2 = new FlowJob({
+      job: new Job({
+        id: 'parent1',
+        name: 'parent1',
+        queueName: queueName,
+        data: {},
+      }),
+      children: [childJob2],
+    })
+    await queue!.addFlowJob(parent1)
+    await queue!.addFlowJob(parent2)
+
+    const parentValue = await parentToChildrenKV.get('parent1')
+    if (!parentValue) throw new Error('Parent not found')
+    const parentData: DependenciesKVValue = parentValue.json()
+    assert.deepStrictEqual(parentData, {
+      id: 'parent1',
+      name: 'parent1',
+      meta: {
+        retryCount: 0,
+        startTime: new Date('2023-05-14T11:01:58.135Z').getTime(),
+        failed: false,
+        timeout: 0,
+      },
+      data: {},
+      queueName: 'queue',
+      childrenCount: 2,
+    })
+
+    mock.timers.reset()
+  })
+})
diff --git a/test/worker/process/flowJob/test.ts b/test/worker/process/flowJob/test.ts
new file mode 100644
index 0000000..22df11e
--- /dev/null
+++ b/test/worker/process/flowJob/test.ts
@@ -0,0 +1,183 @@
+import {
+  after,
+  afterEach,
+  before,
+  beforeEach,
+  describe,
+  it,
+  mock,
+} from 'node:test'
+import { Worker } from '../../../../src/worker'
+import { connect, NatsConnection } from '@nats-io/transport-node'
+import {
+  jetstream,
+  JetStreamClient,
+  JetStreamManager,
+  JsMsg,
+} from '@nats-io/jetstream'
+import { Job } from '../../../../src/job'
+import assert from 'assert'
+import { sleep } from '../../../../src/utils'
+import { FlowJob } from '../../../../src/flowJob'
+import { FlowQueue } from '../../../../src/flowQueue'
+import { deleteAllKV } from '../../../helpers/deleteAllKV'
+import { deleteAllStreams } from '../../../helpers/deleteAllStreams'
+
+describe('Worker.process(): flowJob', () => {
+  let nc: NatsConnection
+  let js: JetStreamClient
+  let jsm: JetStreamManager
+  const queueName = 'queue'
+  const queueMaxPriority = 3
+  const maxRetries = 2
+  let queue: FlowQueue | undefined = undefined
+  let worker: Worker | undefined = undefined
+  let processorMock: ReturnType<
+    typeof mock.fn<(job: JsMsg, timeout: number) => Promise<void>>
+  >
+
+  before(async () => {
+    nc = await connect({ servers: '127.0.0.1:4222' })
+    js = jetstream(nc)
+    jsm = await js.jetstreamManager()
+    await jsm.streams.delete(queueName).catch(() => {})
+
+    await deleteAllKV(nc)
+    await deleteAllStreams(jsm)
+  })
+
+  beforeEach(async () => {
+    // Create a mock function for the processor
+    processorMock = mock.fn<(job: JsMsg, timeout: number) => Promise<void>>(
+      async () => {},
+    )
+
+    queue = new FlowQueue({
+      name: queueName,
+      client: js,
+      connection: nc,
+      priorities: queueMaxPriority,
+      duplicateWindow: 2000,
+    })
+    await queue.setup()
+
+    worker = new Worker({
+      name: queueName,
+      client: js,
+      processor: processorMock,
+      maxRetries,
+    })
+
+    await worker.setup()
+  })
+
+  afterEach(async () => {
+    await worker!.stop()
+    await deleteAllKV(nc)
+    await deleteAllStreams(jsm)
+  })
+
+  after(async () => {
+    await nc.close()
+  })
+
+  it('should process parent after all children were processed', async () => {
+    await worker!.start()
+    const child1 = new Job({
+      id: 'child1',
+      name: 'child1',
+      queueName: queueName,
+      data: {
+        message: 42,
+      },
+    })
+    const child2 = new Job({
+      id: 'child2',
+      name: 'child2',
+      queueName: queueName,
+      data: {
+        message: 42,
+      },
+    })
+    const parentJob = new Job({
+      id: 'parentJob',
+      name: 'parentJob',
+      queueName: queueName,
+      data: {
+        message: 42,
+      },
+    })
+
+    const flowJobChild1 = new FlowJob({
+      job: child1,
+      children: [],
+    })
+    const flowJobChild2 = new FlowJob({
+      job: child2,
+      children: [],
+    })
+    const flowJobParent = new FlowJob({
+      job: parentJob,
+      children: [flowJobChild1, flowJobChild2],
+    })
+
+    await queue!.addFlowJob(flowJobParent)
+
+    await sleep(10000) // Wait for the jobs to be processed
+
+    assert.strictEqual(processorMock.mock.calls.length, 3)
+
+    const processedParent: Job = processorMock.mock.calls[2].arguments[0].json()
+    assert(processedParent.id === flowJobParent.job.id)
+  })
+
+  it('should process parent when a child is added to it after processing has started', async () => {
+    await worker!.start()
+    const child1 = new Job({
+      id: 'child1',
+      name: 'child1',
+      queueName: queueName,
+      data: {
+        message: 42,
+      },
+    })
+    const child2 = new Job({
+      id: 'child2',
+      name: 'child2',
+      queueName: queueName,
+      data: {
+        message: 42,
+      },
+    })
+    const parentJob = new Job({
+      id: 'parentJob',
+      name: 'parentJob',
+      queueName: queueName,
+      data: {
+        message: 42,
+      },
+    })
+
+    const flowJobChild1 = new FlowJob({
+      job: child1,
+      children: [],
+    })
+    const flowJobChild2 = new FlowJob({
+      job: child2,
+      children: [],
+    })
+    const flowJobParent = new FlowJob({
+      job: parentJob,
+      children: [flowJobChild1, flowJobChild2],
+    })
+
+    await queue!.addFlowJob(flowJobParent)
+
+    await sleep(10000) // Wait for the jobs to be processed
+
+    assert.strictEqual(processorMock.mock.calls.length, 3)
+
+    const processedParent: Job = processorMock.mock.calls[2].arguments[0].json()
+    assert(processedParent.id === flowJobParent.job.id)
+  })
+})
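
Review note: the sketch below is a minimal end-to-end usage example of the API this diff adds (FlowQueue, FlowJob, Job, Worker); it is not part of the change set. The NATS address mirrors the one used in the tests, and the queue/job names are illustrative. addFlowJob() publishes only the leaf jobs and parks each parent (with its childrenCount) in the `<queue>_parent_id` KV bucket; the worker republishes a parent once all of its children have completed, or immediately with meta.failed = true when a child fails.

```ts
// Usage sketch (assumes a local NATS server with JetStream at 127.0.0.1:4222).
import { connect } from '@nats-io/transport-node'
import { jetstream, JsMsg } from '@nats-io/jetstream'
import { FlowQueue } from './src/flowQueue'
import { FlowJob } from './src/flowJob'
import { Job } from './src/job'
import { Worker } from './src/worker'

async function main() {
  const nc = await connect({ servers: '127.0.0.1:4222' })
  const js = jetstream(nc)

  // Producer side: a parent with two children. Only 'part-a' and 'part-b'
  // are published to the stream; 'merge-report' waits in the KV bucket.
  const queue = new FlowQueue({
    name: 'reports',
    client: js,
    connection: nc,
    priorities: 3,
    duplicateWindow: 2000,
  })
  await queue.setup()

  const flow = new FlowJob({
    job: new Job({ id: 'merge-report', name: 'merge', queueName: 'reports', data: {} }),
    children: [
      new FlowJob({ job: new Job({ id: 'part-a', name: 'fetch', queueName: 'reports', data: {} }) }),
      new FlowJob({ job: new Job({ id: 'part-b', name: 'fetch', queueName: 'reports', data: {} }) }),
    ],
  })
  await queue.addFlowJob(flow)

  // Consumer side: the same processor sees the children first and the parent
  // last, after both JOB_CHILD_COMPLETED notifications have been handled.
  const worker = new Worker({
    name: 'reports',
    client: js,
    processor: async (msg: JsMsg, _timeout: number) => {
      const job: Job = msg.json()
      console.log('processed', job.id, 'parent:', job.meta.parentId)
    },
    maxRetries: 2,
  })
  await worker.setup()
  await worker.start()
}

main().catch(console.error)
```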