Skip to content

Commit 033a1bf

Browse files
author
i.galkin@timelesslounge.tech
committed
orphan created
1 parent 601f8af commit 033a1bf

9 files changed

Lines changed: 841 additions & 70 deletions

File tree

src/flowJob.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import { Job } from './job'
2+
import { FlowJobCreateData } from './types'
3+
4+
export class FlowJob {
5+
job: Job
6+
children?: FlowJob[]
7+
8+
constructor(data: FlowJobCreateData) {
9+
this.job = new Job(data.job)
10+
this.children = data.children
11+
? data.children.map((j) => new FlowJob(j))
12+
: []
13+
}
14+
}

src/flowQueue.ts

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import { KV, Kvm } from '@nats-io/kv'
2+
import { FlowJob } from './flowJob'
3+
import { Job } from './job'
4+
import { Queue } from './queue'
5+
import { ChildToParentsKVValue, DependenciesKVValue } from './types'
6+
7+
export class FlowQueue extends Queue {
8+
private parentChildrenStore: KV | null = null
9+
private childParentsStore: KV | null = null
10+
11+
public override async setup(): Promise<void> {
12+
try {
13+
await super.setup()
14+
const kvm = await new Kvm(this.connection)
15+
this.parentChildrenStore = await kvm.create(`${this.name}_parent_id`)
16+
this.childParentsStore = await kvm.create(`${this.name}_parents`)
17+
} catch (e) {
18+
console.error(`Error connecting to JetStream: ${e}`)
19+
throw e
20+
}
21+
}
22+
23+
public async addFlowJob(tree: FlowJob, priority: number = 1): Promise<void> {
24+
const deepestJobs = await this.traverseJobTree(tree)
25+
await this.addJobs(deepestJobs, priority)
26+
}
27+
28+
private async traverseJobTree(
29+
node: FlowJob,
30+
parentId: string | null = null,
31+
): Promise<Job[]> {
32+
const currentJob = node.job
33+
if (parentId) {
34+
currentJob.meta.parentId = parentId
35+
}
36+
37+
const children = node.children || []
38+
if (children.length === 0) {
39+
return [currentJob]
40+
}
41+
42+
const newChildrenForCurrentJobCount: number =
43+
await this.updateChildDependencies({
44+
parentJob: currentJob,
45+
childJobs: children.map((child) => child.job),
46+
})
47+
48+
await this.updateParentDependecies({
49+
parentJob: currentJob,
50+
newChildrentCount: newChildrenForCurrentJobCount,
51+
})
52+
53+
const deepestJobs: Job[] = []
54+
for (const child of children) {
55+
const traverseResult = await this.traverseJobTree(child, currentJob.id)
56+
deepestJobs.push(...traverseResult)
57+
}
58+
59+
return deepestJobs
60+
}
61+
62+
private async updateChildDependencies({
63+
parentJob,
64+
childJobs,
65+
}: {
66+
parentJob: Job
67+
childJobs: Job[]
68+
}) {
69+
let newChildrenForCurrentJobCount: number = 0
70+
for (const childJob of childJobs) {
71+
const childParentsKVEntry = await this.childParentsStore!.get(childJob.id)
72+
if (!childParentsKVEntry) {
73+
await this.childParentsStore!.put(
74+
childJob.id,
75+
JSON.stringify({
76+
parentIds: [parentJob.id],
77+
}),
78+
)
79+
newChildrenForCurrentJobCount++
80+
continue
81+
}
82+
83+
const parentsInfo: ChildToParentsKVValue = childParentsKVEntry.json()
84+
85+
if (parentsInfo.parentIds.includes(parentJob.id)) continue
86+
87+
parentsInfo.parentIds.push(parentJob.id)
88+
await this.childParentsStore!.put(
89+
childJob.id,
90+
JSON.stringify(parentsInfo),
91+
{
92+
previousSeq: childParentsKVEntry.revision,
93+
},
94+
)
95+
newChildrenForCurrentJobCount++
96+
}
97+
98+
return newChildrenForCurrentJobCount
99+
}
100+
101+
private async updateParentDependecies({
102+
parentJob,
103+
newChildrentCount,
104+
}: {
105+
parentJob: Job
106+
newChildrentCount: number
107+
}) {
108+
const existingParentDependencies = await this.parentChildrenStore!.get(
109+
parentJob.id,
110+
)
111+
if (!existingParentDependencies) {
112+
await this.parentChildrenStore!.put(
113+
parentJob.id,
114+
JSON.stringify({
115+
...parentJob,
116+
childrenCount: newChildrentCount,
117+
}),
118+
)
119+
} else {
120+
const parentDependencies: DependenciesKVValue =
121+
existingParentDependencies.json()
122+
parentDependencies.childrenCount += newChildrentCount
123+
await this.parentChildrenStore!.put(
124+
parentJob.id,
125+
JSON.stringify(parentDependencies),
126+
{
127+
previousSeq: existingParentDependencies.revision,
128+
},
129+
)
130+
}
131+
}
132+
}

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
export * from './flowQueue'
12
export * from './queue'
23
export * from './job'
34
export * from './worker'

src/job.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ export class Job {
88
startTime: number
99
retryCount: number
1010
timeout: number
11-
// parentId?: string
11+
parentId?: string
1212
}
1313
data: unknown
1414
queueName: string

src/jobEvent.ts

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,17 @@ type Event<T, D> = {
55

66
export type JobCompletedEvent = Event<'JOB_COMPLETED', { jobId: string }>
77
export type JobFailedEvent = Event<'JOB_FAILED', { jobId: string }>
8-
// export type JobChildCompletedEvent = Event<
9-
// 'JOB_CHILD_COMPLETED',
10-
// { childId: string; parentId: string }
11-
// >
12-
// export type JobChildFailedEvent = Event<
13-
// 'JOB_CHILD_FAILED',
14-
// { childId: string; parentId: string }
15-
// >
8+
export type JobChildCompletedEvent = Event<
9+
'JOB_CHILD_COMPLETED',
10+
{ childId: string; parentId: string }
11+
>
12+
export type JobChildFailedEvent = Event<
13+
'JOB_CHILD_FAILED',
14+
{ childId: string; parentId: string }
15+
>
1616

1717
export type JobEvent =
18-
// | JobChildFailedEvent
19-
// | JobChildCompletedEvent
20-
JobCompletedEvent | JobFailedEvent
18+
| JobChildFailedEvent
19+
| JobChildCompletedEvent
20+
| JobCompletedEvent
21+
| JobFailedEvent

src/types.ts

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
// import { Job } from './job'
1+
import { Job } from './job'
22

3-
// export type ParentJob = Job & {
4-
// childrenCount: number
5-
// }
3+
export type ParentJob = Job & {
4+
childrenCount: number
5+
}
66

77
export type JobCreateData = {
88
name: string
@@ -13,18 +13,18 @@ export type JobCreateData = {
1313
timeout?: number
1414
}
1515

16-
// export type FlowJobCreateData = {
17-
// job: JobCreateData
18-
// children?: FlowJobCreateData[]
19-
// }
16+
export type FlowJobCreateData = {
17+
job: JobCreateData
18+
children?: FlowJobCreateData[]
19+
}
2020

2121
export type RateLimit = {
2222
duration: number
2323
max: number
2424
}
2525

26-
// export type DependenciesKVValue = ParentJob
26+
export type DependenciesKVValue = ParentJob
2727

28-
// export type ChildToParentsKVValue = {
29-
// parentIds: string[]
30-
// }
28+
export type ChildToParentsKVValue = {
29+
parentIds: string[]
30+
}

0 commit comments

Comments
 (0)