Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/job-manager/lib/JobManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class JobManager {
* @prop {Object} [GhostJob.data] - data to be passed into the job
* @prop {Boolean} [GhostJob.offloaded] - creates an "offloaded" job running in a worker thread by default. If set to "false" runs an "inline" job on the same event loop
*/
addJob({name, at, job, data, offloaded = true}) {
async addJob({name, at, job, data, offloaded = true}) {
if (offloaded) {
logging.info('Adding offloaded job to the inline job queue');
let schedule;
Expand Down Expand Up @@ -242,7 +242,7 @@ class JobManager {
}

const breeJob = assembleBreeJob(at, job, data, name);
this.bree.add(breeJob);
await this.bree.add(breeJob);
return this.bree.start(name);
} else {
logging.info(`Adding one-off job to inlineQueue with current length = ${this.inlineQueue.length()} called '${name || 'anonymous'}'`);
Expand Down
2 changes: 1 addition & 1 deletion packages/job-manager/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"@breejs/later": "4.2.0",
"@tryghost/errors": "^2.1.0",
"@tryghost/logging": "^3.1.0",
"bree": "6.5.0",
"bree": "9.2.9",
"cron-validate": "1.4.5",
"fastq": "1.20.1",
"p-wait-for": "3.2.0",
Expand Down
164 changes: 83 additions & 81 deletions packages/job-manager/test/job-manager.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ describe('Job Manager', function () {
});

describe('Offloaded jobs', function () {
it('accepts cron schedule when worker scheduling is stubbed', function () {
sandbox.stub(jobManager.bree, 'add').returns();
sandbox.stub(jobManager.bree, 'start').returns();
it('accepts cron schedule when worker scheduling is stubbed', async function () {
sandbox.stub(jobManager.bree, 'add').resolves();
sandbox.stub(jobManager.bree, 'start').resolves();

const jobPath = path.resolve(__dirname, './jobs/simple.js');
jobManager.addJob({
await jobManager.addJob({
at: '* * * * * *',
job: jobPath,
name: 'cron-job'
Expand Down Expand Up @@ -141,105 +141,107 @@ describe('Job Manager', function () {
const jobPath = path.resolve(__dirname, './jobs/simple.js');

const clock = FakeTimers.install({now: Date.now()});
jobManager.addJob({
at: timeInTenSeconds,
job: jobPath,
name: 'job-in-ten'
});
try {
await jobManager.addJob({
at: timeInTenSeconds,
job: jobPath,
name: 'job-in-ten'
});

assert.equal(typeof jobManager.bree.timeouts['job-in-ten'], 'object');
assert.equal(typeof jobManager.bree.workers['job-in-ten'], 'undefined');
assert.equal(jobManager.bree.timeouts.has('job-in-ten'), true);
assert.equal(jobManager.bree.workers.has('job-in-ten'), false);

// allow to run the job and start the worker
await clock.nextAsync();
// allow to run the job and start the worker
await clock.nextAsync();

assert.equal(typeof jobManager.bree.workers['job-in-ten'], 'object');
assert.equal(jobManager.bree.workers.has('job-in-ten'), true);

const promise = new Promise((resolve, reject) => {
jobManager.bree.workers['job-in-ten'].on('error', reject);
jobManager.bree.workers['job-in-ten'].on('exit', (code) => {
assert.equal(code, 0);
resolve();
const promise = new Promise((resolve, reject) => {
jobManager.bree.workers.get('job-in-ten').on('error', reject);
jobManager.bree.workers.get('job-in-ten').on('exit', (code) => {
assert.equal(code, 0);
resolve();
});
});
});

// allow job to finish execution and exit
clock.next();
// allow job to finish execution and exit
clock.next();

await promise;
await promise;

assert.equal(typeof jobManager.bree.workers['job-in-ten'], 'undefined');

clock.uninstall();
assert.equal(jobManager.bree.workers.has('job-in-ten'), false);
} finally {
clock.uninstall();
}
});

it('schedules a job to run immediately', async function () {
const clock = FakeTimers.install({now: Date.now()});
try {
const jobPath = path.resolve(__dirname, './jobs/simple.js');
await jobManager.addJob({
job: jobPath,
name: 'job-now'
});

const jobPath = path.resolve(__dirname, './jobs/simple.js');
jobManager.addJob({
job: jobPath,
name: 'job-now'
});

assert.equal(typeof jobManager.bree.timeouts['job-now'], 'object');
assert.equal(jobManager.bree.timeouts.has('job-now'), true);

// allow scheduler to pick up the job
clock.tick(1);
// allow scheduler to pick up the job
await clock.tickAsync(1);

assert.equal(typeof jobManager.bree.workers['job-now'], 'object');
assert.equal(jobManager.bree.workers.has('job-now'), true);

const promise = new Promise((resolve, reject) => {
jobManager.bree.workers['job-now'].on('error', reject);
jobManager.bree.workers['job-now'].on('exit', (code) => {
assert.equal(code, 0);
resolve();
const promise = new Promise((resolve, reject) => {
jobManager.bree.workers.get('job-now').on('error', reject);
jobManager.bree.workers.get('job-now').on('exit', (code) => {
assert.equal(code, 0);
resolve();
});
});
});

await promise;
await promise;

assert.equal(typeof jobManager.bree.workers['job-now'], 'undefined');

clock.uninstall();
assert.equal(jobManager.bree.workers.has('job-now'), false);
} finally {
clock.uninstall();
}
});

it('fails to schedule a job with the same name to run immediately one after another', async function () {
const clock = FakeTimers.install({now: Date.now()});
try {
const jobPath = path.resolve(__dirname, './jobs/simple.js');
await jobManager.addJob({
job: jobPath,
name: 'job-now'
});

const jobPath = path.resolve(__dirname, './jobs/simple.js');
jobManager.addJob({
job: jobPath,
name: 'job-now'
});

assert.equal(typeof jobManager.bree.timeouts['job-now'], 'object');
assert.equal(jobManager.bree.timeouts.has('job-now'), true);

// allow scheduler to pick up the job
clock.tick(1);
// allow scheduler to pick up the job
await clock.tickAsync(1);

assert.equal(typeof jobManager.bree.workers['job-now'], 'object');
assert.equal(jobManager.bree.workers.has('job-now'), true);

const promise = new Promise((resolve, reject) => {
jobManager.bree.workers['job-now'].on('error', reject);
jobManager.bree.workers['job-now'].on('exit', (code) => {
assert.equal(code, 0);
resolve();
const promise = new Promise((resolve, reject) => {
jobManager.bree.workers.get('job-now').on('error', reject);
jobManager.bree.workers.get('job-now').on('exit', (code) => {
assert.equal(code, 0);
resolve();
});
});
});

await promise;
await promise;

assert.equal(typeof jobManager.bree.workers['job-now'], 'undefined');
assert.equal(jobManager.bree.workers.has('job-now'), false);

assert.throws(() => {
jobManager.addJob({
await assert.rejects(() => jobManager.addJob({
job: jobPath,
name: 'job-now'
});
}, /Job #1 has a duplicate job name of job-now/);

clock.uninstall();
}), /Job #1 has a duplicate job name of job-now/);
} finally {
clock.uninstall();
}
});

it('uses custom error handler when job fails', async function (){
Expand All @@ -250,7 +252,7 @@ describe('Job Manager', function () {
jobManager = new JobManager({errorHandler: spyHandler, config: stubConfig});
const completion = jobManager.awaitCompletion('will-fail');

jobManager.addJob({
await jobManager.addJob({
job,
name: 'will-fail'
});
Expand All @@ -267,13 +269,13 @@ describe('Job Manager', function () {
jobManager = new JobManager({workerMessageHandler: workerMessageHandlerSpy, config: stubConfig});
const completion = jobManager.awaitCompletion('will-send-msg');

jobManager.addJob({
await jobManager.addJob({
job: path.resolve(__dirname, './jobs/message.js'),
name: 'will-send-msg'
});
jobManager.bree.run('will-send-msg');
await jobManager.bree.run('will-send-msg');
await delay(100);
jobManager.bree.workers['will-send-msg'].postMessage('hello from Ghost!');
jobManager.bree.workers.get('will-send-msg').postMessage('hello from Ghost!');

await completion;

Expand Down Expand Up @@ -547,7 +549,7 @@ describe('Job Manager', function () {
// allow job to get picked up and executed
await delay(100);

jobManager.bree.workers['successful-oneoff'].postMessage('be done!');
jobManager.bree.workers.get('successful-oneoff').postMessage('be done!');

// allow the message to be passed around
await jobCompletion;
Expand Down Expand Up @@ -694,7 +696,7 @@ describe('Job Manager', function () {
const timeInTenSeconds = new Date(Date.now() + 10);
const jobPath = path.resolve(__dirname, './jobs/simple.js');

jobManager.addJob({
await jobManager.addJob({
at: timeInTenSeconds,
job: jobPath,
name: 'job-in-ten'
Expand Down Expand Up @@ -727,20 +729,20 @@ describe('Job Manager', function () {
it('gracefully shuts down an interval job', async function () {
jobManager = new JobManager({config: stubConfig});

jobManager.addJob({
await jobManager.addJob({
at: 'every 5 seconds',
job: path.resolve(__dirname, './jobs/graceful.js')
});

await delay(1); // let the job execution kick in

assert.equal(Object.keys(jobManager.bree.workers).length, 0);
assert.equal(Object.keys(jobManager.bree.timeouts).length, 0);
assert.equal(Object.keys(jobManager.bree.intervals).length, 1);
assert.equal(jobManager.bree.workers.size, 0);
assert.equal(jobManager.bree.timeouts.size, 0);
assert.equal(jobManager.bree.intervals.size, 1);

await jobManager.shutdown();

assert.equal(Object.keys(jobManager.bree.intervals).length, 0);
assert.equal(jobManager.bree.intervals.size, 0);
});

it('gracefully shuts down the job queue worker pool');
Expand Down
Loading