-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpostgres-event-loop.ts
More file actions
41 lines (33 loc) · 1.12 KB
/
postgres-event-loop.ts
File metadata and controls
41 lines (33 loc) · 1.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import type { SQL } from "bun";
import { PostgresTaskQueue } from "./postgres-task-queue";
import { PostgresTimers } from "./postgres-timers";
import type { EventProcessor } from "@yieldstar/core";
import type { Logger } from "pino";
export class PostgresEventLoop {
taskQueue: PostgresTaskQueue;
timers: PostgresTimers;
private isRunning: boolean = false;
constructor(sql: SQL) {
this.taskQueue = new PostgresTaskQueue(sql);
this.timers = new PostgresTimers({ sql, taskQueue: this.taskQueue });
}
start(params: { onNewEvent: EventProcessor; logger: Logger }) {
this.isRunning = true;
this.loop(params.onNewEvent, params.logger);
}
stop() {
this.isRunning = false;
}
private async loop(processEvent: EventProcessor, logger: Logger) {
if (!this.isRunning) return;
while (!(await this.taskQueue.isEmpty())) {
const task = await this.taskQueue.process();
if (task) {
await processEvent(task.event, logger);
await this.taskQueue.remove(task.taskId);
}
}
await this.timers.processTimers();
setTimeout(() => this.loop(processEvent, logger), 10);
}
}