mirror of
https://github.com/outline/outline.git
synced 2025-12-21 10:39:41 -06:00
69 lines
1.9 KiB
TypeScript
69 lines
1.9 KiB
TypeScript
import Queue from "bull";
|
|
import { snakeCase } from "lodash";
|
|
import { Second } from "@shared/utils/time";
|
|
import env from "@server/env";
|
|
import Metrics from "@server/logging/Metrics";
|
|
import Redis from "../redis";
|
|
import ShutdownHelper, { ShutdownOrder } from "./ShutdownHelper";
|
|
|
|
export function createQueue(
|
|
name: string,
|
|
defaultJobOptions?: Partial<Queue.JobOptions>
|
|
) {
|
|
const prefix = `queue.${snakeCase(name)}`;
|
|
|
|
// Notes on reusing Redis connections for Bull:
|
|
// https://github.com/OptimalBits/bull/blob/b6d530f72a774be0fd4936ddb4ad9df3b183f4b6/PATTERNS.md#reusing-redis-connections
|
|
const queue = new Queue(name, {
|
|
createClient(type) {
|
|
switch (type) {
|
|
case "client":
|
|
return Redis.defaultClient;
|
|
|
|
case "subscriber":
|
|
return Redis.defaultSubscriber;
|
|
|
|
case "bclient":
|
|
return new Redis(env.REDIS_URL, {
|
|
maxRetriesPerRequest: null,
|
|
connectionNameSuffix: "bull",
|
|
});
|
|
|
|
default:
|
|
throw new Error(`Unexpected connection type: ${type}`);
|
|
}
|
|
},
|
|
defaultJobOptions: {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
...defaultJobOptions,
|
|
},
|
|
});
|
|
queue.on("stalled", () => {
|
|
Metrics.increment(`${prefix}.jobs.stalled`);
|
|
});
|
|
queue.on("completed", () => {
|
|
Metrics.increment(`${prefix}.jobs.completed`);
|
|
});
|
|
queue.on("error", () => {
|
|
Metrics.increment(`${prefix}.jobs.errored`);
|
|
});
|
|
queue.on("failed", () => {
|
|
Metrics.increment(`${prefix}.jobs.failed`);
|
|
});
|
|
|
|
if (env.ENVIRONMENT !== "test") {
|
|
// eslint-disable-next-line @typescript-eslint/no-misused-promises
|
|
setInterval(async () => {
|
|
Metrics.gauge(`${prefix}.count`, await queue.count());
|
|
Metrics.gauge(`${prefix}.delayed_count`, await queue.getDelayedCount());
|
|
}, 5 * Second);
|
|
}
|
|
|
|
ShutdownHelper.add(name, ShutdownOrder.normal, async () => {
|
|
await queue.close();
|
|
});
|
|
|
|
return queue;
|
|
}
|