Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/flowJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Job } from './job'
import { FlowJobCreateData } from './types'

export class FlowJob {
job: Job
children?: FlowJob[]

constructor(data: FlowJobCreateData) {
this.job = new Job(data.job)
this.children = data.children
? data.children.map((j) => new FlowJob(j))
: []
}
}
132 changes: 132 additions & 0 deletions src/flowQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import { KV, Kvm } from '@nats-io/kv'
import { FlowJob } from './flowJob'
import { Job } from './job'
import { Queue } from './queue'
import { ChildToParentsKVValue, DependenciesKVValue } from './types'

export class FlowQueue extends Queue {
private parentChildrenStore: KV | null = null
private childParentsStore: KV | null = null

public override async setup(): Promise<void> {
try {
await super.setup()
const kvm = await new Kvm(this.connection)
this.parentChildrenStore = await kvm.create(`${this.name}_parent_id`)
this.childParentsStore = await kvm.create(`${this.name}_parents`)
} catch (e) {
console.error(`Error connecting to JetStream: ${e}`)
throw e
}
}

public async addFlowJob(tree: FlowJob, priority: number = 1): Promise<void> {
const deepestJobs = await this.traverseJobTree(tree)
await this.addJobs(deepestJobs, priority)
}

private async traverseJobTree(
node: FlowJob,
parentId: string | null = null,
): Promise<Job[]> {
const currentJob = node.job
if (parentId) {
currentJob.meta.parentId = parentId
}

const children = node.children || []
if (children.length === 0) {
return [currentJob]
}

const newChildrenForCurrentJobCount: number =
await this.updateChildDependencies({
parentJob: currentJob,
childJobs: children.map((child) => child.job),
})

await this.updateParentDependecies({
parentJob: currentJob,
newChildrentCount: newChildrenForCurrentJobCount,
})

const deepestJobs: Job[] = []
for (const child of children) {
const traverseResult = await this.traverseJobTree(child, currentJob.id)
deepestJobs.push(...traverseResult)
}

return deepestJobs
}

private async updateChildDependencies({
parentJob,
childJobs,
}: {
parentJob: Job
childJobs: Job[]
}) {
let newChildrenForCurrentJobCount: number = 0
for (const childJob of childJobs) {
const childParentsKVEntry = await this.childParentsStore!.get(childJob.id)
if (!childParentsKVEntry) {
await this.childParentsStore!.put(
childJob.id,
JSON.stringify({
parentIds: [parentJob.id],
}),
)
newChildrenForCurrentJobCount++
continue
}

const parentsInfo: ChildToParentsKVValue = childParentsKVEntry.json()

if (parentsInfo.parentIds.includes(parentJob.id)) continue

parentsInfo.parentIds.push(parentJob.id)
await this.childParentsStore!.put(
childJob.id,
JSON.stringify(parentsInfo),
{
previousSeq: childParentsKVEntry.revision,
},
)
newChildrenForCurrentJobCount++
}

return newChildrenForCurrentJobCount
}

private async updateParentDependecies({
parentJob,
newChildrentCount,
}: {
parentJob: Job
newChildrentCount: number
}) {
const existingParentDependencies = await this.parentChildrenStore!.get(
parentJob.id,
)
if (!existingParentDependencies) {
await this.parentChildrenStore!.put(
parentJob.id,
JSON.stringify({
...parentJob,
childrenCount: newChildrentCount,
}),
)
} else {
const parentDependencies: DependenciesKVValue =
existingParentDependencies.json()
parentDependencies.childrenCount += newChildrentCount
await this.parentChildrenStore!.put(
parentJob.id,
JSON.stringify(parentDependencies),
{
previousSeq: existingParentDependencies.revision,
},
)
}
}
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './flowQueue'
export * from './queue'
export * from './job'
export * from './worker'
2 changes: 1 addition & 1 deletion src/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export class Job {
startTime: number
retryCount: number
timeout: number
// parentId?: string
parentId?: string
}
data: unknown
queueName: string
Expand Down
23 changes: 12 additions & 11 deletions src/jobEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ type Event<T, D> = {

export type JobCompletedEvent = Event<'JOB_COMPLETED', { jobId: string }>
export type JobFailedEvent = Event<'JOB_FAILED', { jobId: string }>
// export type JobChildCompletedEvent = Event<
// 'JOB_CHILD_COMPLETED',
// { childId: string; parentId: string }
// >
// export type JobChildFailedEvent = Event<
// 'JOB_CHILD_FAILED',
// { childId: string; parentId: string }
// >
export type JobChildCompletedEvent = Event<
'JOB_CHILD_COMPLETED',
{ childId: string; parentId: string }
>
export type JobChildFailedEvent = Event<
'JOB_CHILD_FAILED',
{ childId: string; parentId: string }
>

export type JobEvent =
// | JobChildFailedEvent
// | JobChildCompletedEvent
JobCompletedEvent | JobFailedEvent
| JobChildFailedEvent
| JobChildCompletedEvent
| JobCompletedEvent
| JobFailedEvent
24 changes: 12 additions & 12 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// import { Job } from './job'
import { Job } from './job'

// export type ParentJob = Job & {
// childrenCount: number
// }
export type ParentJob = Job & {
childrenCount: number
}

export type JobCreateData = {
name: string
Expand All @@ -13,18 +13,18 @@ export type JobCreateData = {
timeout?: number
}

// export type FlowJobCreateData = {
// job: JobCreateData
// children?: FlowJobCreateData[]
// }
export type FlowJobCreateData = {
job: JobCreateData
children?: FlowJobCreateData[]
}

export type RateLimit = {
duration: number
max: number
}

// export type DependenciesKVValue = ParentJob
export type DependenciesKVValue = ParentJob

// export type ChildToParentsKVValue = {
// parentIds: string[]
// }
export type ChildToParentsKVValue = {
parentIds: string[]
}
Loading
Loading