diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md
index 63516723a0..3c045329c2 100644
--- a/docs/ARCHITECTURE.md
+++ b/docs/ARCHITECTURE.md
@@ -45,7 +45,8 @@ server
├── policies - Authorization logic based on cancan
├── presenters - JSON presenters for database models, the interface between backend -> frontend
├── queues - Async queue definitions
-│ └── processors - Processors perform async jobs, usually working on events from the event bus
+│ └── processors - Processors perform jobs on events from the event bus
+│ └── tasks - Tasks are arbitrary async jobs not from the event bus
├── services - Services start distinct portions of the application eg api, worker
├── static - Static assets
├── test - Test helpers and fixtures, tests themselves are colocated
diff --git a/server/commands/accountProvisioner.test.ts b/server/commands/accountProvisioner.test.ts
index b3a591ba10..c1070c7405 100644
--- a/server/commands/accountProvisioner.test.ts
+++ b/server/commands/accountProvisioner.test.ts
@@ -1,6 +1,6 @@
-import mailer from "@server/mailer";
import Collection from "@server/models/Collection";
import UserAuthentication from "@server/models/UserAuthentication";
+import EmailTask from "@server/queues/tasks/EmailTask";
import { buildUser, buildTeam } from "@server/test/factories";
import { flushdb } from "@server/test/support";
import accountProvisioner from "./accountProvisioner";
@@ -13,7 +13,7 @@ describe("accountProvisioner", () => {
const ip = "127.0.0.1";
it("should create a new user and team", async () => {
- const spy = jest.spyOn(mailer, "sendTemplate");
+ const spy = jest.spyOn(EmailTask, "schedule");
const { user, team, isNewTeam, isNewUser } = await accountProvisioner({
ip,
user: {
@@ -55,7 +55,7 @@ describe("accountProvisioner", () => {
});
it("should update exising user and authentication", async () => {
- const spy = jest.spyOn(mailer, "sendTemplate");
+ const spy = jest.spyOn(EmailTask, "schedule");
const existingTeam = await buildTeam();
const providers = await existingTeam.$get("authenticationProviders");
const authenticationProvider = providers[0];
@@ -149,7 +149,7 @@ describe("accountProvisioner", () => {
});
it("should create a new user in an existing team", async () => {
- const spy = jest.spyOn(mailer, "sendTemplate");
+ const spy = jest.spyOn(EmailTask, "schedule");
const team = await buildTeam();
const authenticationProviders = await team.$get("authenticationProviders");
const authenticationProvider = authenticationProviders[0];
diff --git a/server/commands/accountProvisioner.ts b/server/commands/accountProvisioner.ts
index edb5301952..7d4b3b952e 100644
--- a/server/commands/accountProvisioner.ts
+++ b/server/commands/accountProvisioner.ts
@@ -6,8 +6,8 @@ import {
AuthenticationProviderDisabledError,
} from "@server/errors";
import { APM } from "@server/logging/tracing";
-import mailer from "@server/mailer";
import { Collection, Team, User } from "@server/models";
+import EmailTask from "@server/queues/tasks/EmailTask";
import teamCreator from "./teamCreator";
import userCreator from "./userCreator";
@@ -89,9 +89,12 @@ async function accountProvisioner({
const { isNewUser, user } = result;
if (isNewUser) {
- await mailer.sendTemplate("welcome", {
- to: user.email,
- teamUrl: team.url,
+ await EmailTask.schedule({
+ type: "welcome",
+ options: {
+ to: user.email,
+ teamUrl: team.url,
+ },
});
}
diff --git a/server/commands/documentUpdater.ts b/server/commands/documentUpdater.ts
index 3bcc552bd8..de4f41bc13 100644
--- a/server/commands/documentUpdater.ts
+++ b/server/commands/documentUpdater.ts
@@ -55,7 +55,7 @@ export default async function documentUpdater({
return;
}
- await Event.add({
+ await Event.schedule({
name: "documents.update",
documentId: document.id,
collectionId: document.collectionId,
diff --git a/server/commands/userInviter.ts b/server/commands/userInviter.ts
index 4b2a7ea001..e1c6a83df9 100644
--- a/server/commands/userInviter.ts
+++ b/server/commands/userInviter.ts
@@ -2,8 +2,8 @@ import invariant from "invariant";
import { uniqBy } from "lodash";
import { Role } from "@shared/types";
import Logger from "@server/logging/logger";
-import mailer from "@server/mailer";
import { User, Event, Team } from "@server/models";
+import EmailTask from "@server/queues/tasks/EmailTask";
type Invite = {
name: string;
@@ -74,13 +74,17 @@ export default async function userInviter({
},
ip,
});
- await mailer.sendTemplate("invite", {
- to: invite.email,
- name: invite.name,
- actorName: user.name,
- actorEmail: user.email,
- teamName: team.name,
- teamUrl: team.url,
+
+ await EmailTask.schedule({
+ type: "invite",
+ options: {
+ to: invite.email,
+ name: invite.name,
+ actorName: user.name,
+ actorEmail: user.email,
+ teamName: team.name,
+ teamUrl: team.url,
+ },
});
if (process.env.NODE_ENV === "development") {
diff --git a/server/emails/CollectionNotificationEmail.tsx b/server/emails/CollectionNotificationEmail.tsx
index bfbbb291ec..a79a1f664a 100644
--- a/server/emails/CollectionNotificationEmail.tsx
+++ b/server/emails/CollectionNotificationEmail.tsx
@@ -1,5 +1,5 @@
import * as React from "react";
-import { User, Collection } from "@server/models";
+import { Collection } from "@server/models";
import Body from "./components/Body";
import Button from "./components/Button";
import EmailTemplate from "./components/EmailLayout";
@@ -9,26 +9,23 @@ import Header from "./components/Header";
import Heading from "./components/Heading";
export type Props = {
- actor: User;
collection: Collection;
eventName: string;
unsubscribeUrl: string;
};
export const collectionNotificationEmailText = ({
- actor,
collection,
eventName = "created",
}: Props) => `
${collection.name}
-${actor.name} ${eventName} the collection "${collection.name}"
+${collection.user.name} ${eventName} the collection "${collection.name}"
Open Collection: ${process.env.URL}${collection.url}
`;
export const CollectionNotificationEmail = ({
- actor,
collection,
eventName = "created",
unsubscribeUrl,
@@ -40,7 +37,7 @@ export const CollectionNotificationEmail = ({
{collection.name}
- {actor.name} {eventName} the collection "{collection.name}".
+ {collection.user.name} {eventName} the collection "{collection.name}".
diff --git a/server/emails/DocumentNotificationEmail.tsx b/server/emails/DocumentNotificationEmail.tsx
index 4fc12b8f4b..9622ef1aab 100644
--- a/server/emails/DocumentNotificationEmail.tsx
+++ b/server/emails/DocumentNotificationEmail.tsx
@@ -1,5 +1,5 @@
import * as React from "react";
-import { Document, User, Team, Collection } from "@server/models";
+import { Document } from "@server/models";
import Body from "./components/Body";
import Button from "./components/Button";
import EmailTemplate from "./components/EmailLayout";
@@ -9,34 +9,34 @@ import Header from "./components/Header";
import Heading from "./components/Heading";
export type Props = {
- actor: User;
- team: Team;
document: Document;
- collection: Collection;
+ actorName: string;
+ collectionName: string;
eventName: string;
+ teamUrl: string;
unsubscribeUrl: string;
};
export const documentNotificationEmailText = ({
- actor,
- team,
+ actorName,
+ teamUrl,
document,
- collection,
+ collectionName,
eventName = "published",
}: Props) => `
"${document.title}" ${eventName}
-${actor.name} ${eventName} the document "${document.title}", in the ${collection.name} collection.
+${actorName} ${eventName} the document "${document.title}", in the ${collectionName} collection.
-Open Document: ${team.url}${document.url}
+Open Document: ${teamUrl}${document.url}
`;
export const DocumentNotificationEmail = ({
- actor,
- team,
document,
- collection,
+ actorName,
+ collectionName,
eventName = "published",
+ teamUrl,
unsubscribeUrl,
}: Props) => {
return (
@@ -48,15 +48,15 @@ export const DocumentNotificationEmail = ({
"{document.title}" {eventName}
- {actor.name} {eventName} the document "{document.title}", in the{" "}
- {collection.name} collection.
+ {actorName} {eventName} the document "{document.title}", in the{" "}
+ {collectionName} collection.
{document.getSummary()}
-          <Button href={`${team.url}${document.url}`}>Open Document</Button>
+          <Button href={`${teamUrl}${document.url}`}>Open Document</Button>
diff --git a/server/logging/logger.ts b/server/logging/logger.ts
index 2e59411721..630c3c200b 100644
--- a/server/logging/logger.ts
+++ b/server/logging/logger.ts
@@ -11,6 +11,7 @@ type LogCategory =
| "http"
| "commands"
| "processor"
+ | "task"
| "email"
| "queue"
| "database"
diff --git a/server/mailer.tsx b/server/mailer.tsx
index 058e43f6d3..dc106beed7 100644
--- a/server/mailer.tsx
+++ b/server/mailer.tsx
@@ -1,13 +1,13 @@
+import invariant from "invariant";
import nodemailer from "nodemailer";
import Oy from "oy-vey";
import * as React from "react";
+import { Collection, Document } from "@server/models";
import {
- Props as CollectionNotificationEmailT,
CollectionNotificationEmail,
collectionNotificationEmailText,
} from "./emails/CollectionNotificationEmail";
import {
- Props as DocumentNotificationEmailT,
DocumentNotificationEmail,
documentNotificationEmailText,
} from "./emails/DocumentNotificationEmail";
@@ -28,7 +28,6 @@ import { SigninEmail, signinEmailText } from "./emails/SigninEmail";
import { WelcomeEmail, welcomeEmailText } from "./emails/WelcomeEmail";
import { baseStyles } from "./emails/components/EmailLayout";
import Logger from "./logging/logger";
-import { emailsQueue } from "./queues";
const useTestEmailService =
process.env.NODE_ENV === "development" && !process.env.SMTP_USERNAME;
@@ -223,49 +222,44 @@ export class Mailer {
});
};
- documentNotification = async (
- opts: {
- to: string;
- } & DocumentNotificationEmailT
- ) => {
+ documentNotification = async (opts: {
+ to: string;
+ eventName: string;
+ actorName: string;
+ documentId: string;
+ teamUrl: string;
+ collectionName: string;
+ unsubscribeUrl: string;
+ }) => {
+ const document = await Document.unscoped().findByPk(opts.documentId);
+ invariant(document, "Document not found");
+
this.sendMail({
to: opts.to,
- title: `“${opts.document.title}” ${opts.eventName}`,
- previewText: `${opts.actor.name} ${opts.eventName} a new document`,
- html: <DocumentNotificationEmail {...opts} />,
- text: documentNotificationEmailText(opts),
+ title: `“${document.title}” ${opts.eventName}`,
+ previewText: `${opts.actorName} ${opts.eventName} a new document`,
+ html: <DocumentNotificationEmail {...opts} document={document} />,
+ text: documentNotificationEmailText({ ...opts, document }),
});
};
- collectionNotification = async (
- opts: {
- to: string;
- } & CollectionNotificationEmailT
- ) => {
+ collectionNotification = async (opts: {
+ to: string;
+ eventName: string;
+ collectionId: string;
+ unsubscribeUrl: string;
+ }) => {
+ const collection = await Collection.findByPk(opts.collectionId);
+ invariant(collection, "Collection not found");
+
this.sendMail({
to: opts.to,
- title: `“${opts.collection.name}” ${opts.eventName}`,
- previewText: `${opts.actor.name} ${opts.eventName} a collection`,
- html: <CollectionNotificationEmail {...opts} />,
- text: collectionNotificationEmailText(opts),
+ title: `“${collection.name}” ${opts.eventName}`,
+ previewText: `${collection.user.name} ${opts.eventName} a collection`,
+ html: <CollectionNotificationEmail {...opts} collection={collection} />,
+ text: collectionNotificationEmailText({ ...opts, collection }),
});
};
-
- sendTemplate = (type: EmailTypes, opts: Record<string, any> = {}) => {
- return emailsQueue.add(
- {
- type,
- opts,
- },
- {
- attempts: 5,
- backoff: {
- type: "exponential",
- delay: 60 * 1000,
- },
- }
- );
- };
}
const mailer = new Mailer();
diff --git a/server/models/Event.ts b/server/models/Event.ts
index 1180b731b9..1cef5359cf 100644
--- a/server/models/Event.ts
+++ b/server/models/Event.ts
@@ -86,9 +86,9 @@ class Event extends BaseModel {
@Column(DataType.UUID)
teamId: string;
- // add can be used to send events into the event system without recording them
- // in the database or audit trail
- static add(event: Partial<Event>) {
+ // Schedule can be used to send events into the event system without recording
+ // them in the database or audit trail – consider using a task instead.
+ static schedule(event: Partial<Event>) {
const now = new Date();
globalEventQueue.add(
this.build({
diff --git a/server/queues/index.ts b/server/queues/index.ts
index 1c08c3e864..2ec47e83b8 100644
--- a/server/queues/index.ts
+++ b/server/queues/index.ts
@@ -4,6 +4,6 @@ export const globalEventQueue = createQueue("globalEvents");
export const processorEventQueue = createQueue("processorEvents");
-export const websocketsQueue = createQueue("websockets");
+export const websocketQueue = createQueue("websockets");
-export const emailsQueue = createQueue("emails");
+export const taskQueue = createQueue("tasks");
diff --git a/server/queues/processors/backlinks.test.ts b/server/queues/processors/BacklinksProcessor.test.ts
similarity index 76%
rename from server/queues/processors/backlinks.test.ts
rename to server/queues/processors/BacklinksProcessor.test.ts
index 270299bbc9..20f1462f21 100644
--- a/server/queues/processors/backlinks.test.ts
+++ b/server/queues/processors/BacklinksProcessor.test.ts
@@ -1,9 +1,10 @@
import { Backlink } from "@server/models";
import { buildDocument } from "@server/test/factories";
import { flushdb } from "@server/test/support";
-import BacklinksService from "./backlinks";
+import BacklinksProcessor from "./BacklinksProcessor";
+
+const ip = "127.0.0.1";
-const Backlinks = new BacklinksService();
beforeEach(() => flushdb());
beforeEach(jest.resetAllMocks);
@@ -13,13 +14,16 @@ describe("documents.publish", () => {
const document = await buildDocument({
text: `[this is a link](${otherDocument.url})`,
});
- // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.publish"; doc... Remove this comment to see the full error message
- await Backlinks.on({
+
+ const processor = new BacklinksProcessor();
+ await processor.perform({
name: "documents.publish",
documentId: document.id,
collectionId: document.collectionId,
teamId: document.teamId,
actorId: document.createdById,
+ data: { title: document.title },
+ ip,
});
const backlinks = await Backlink.findAll({
where: {
@@ -38,13 +42,16 @@ describe("documents.publish", () => {
});
document.text = `[this is a link](${otherDocument.url})`;
await document.save();
- // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.publish"; doc... Remove this comment to see the full error message
- await Backlinks.on({
+
+ const processor = new BacklinksProcessor();
+ await processor.perform({
name: "documents.publish",
documentId: document.id,
collectionId: document.collectionId,
teamId: document.teamId,
actorId: document.createdById,
+ data: { title: document.title },
+ ip,
});
const backlinks = await Backlink.findAll({
where: {
@@ -61,13 +68,17 @@ describe("documents.update", () => {
const document = await buildDocument({
text: `[this is a link](${otherDocument.url})`,
});
- // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.update"; docu... Remove this comment to see the full error message
- await Backlinks.on({
+
+ const processor = new BacklinksProcessor();
+ await processor.perform({
name: "documents.update",
documentId: document.id,
collectionId: document.collectionId,
teamId: document.teamId,
actorId: document.createdById,
+ createdAt: new Date().toISOString(),
+ data: { title: document.title, autosave: false, done: true },
+ ip,
});
const backlinks = await Backlink.findAll({
where: {
@@ -85,13 +96,17 @@ describe("documents.update", () => {
});
document.text = `[this is a link](${otherDocument.url})`;
await document.save();
- // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.update"; docu... Remove this comment to see the full error message
- await Backlinks.on({
+
+ const processor = new BacklinksProcessor();
+ await processor.perform({
name: "documents.update",
documentId: document.id,
collectionId: document.collectionId,
teamId: document.teamId,
actorId: document.createdById,
+ createdAt: new Date().toISOString(),
+ data: { title: document.title, autosave: false, done: true },
+ ip,
});
const backlinks = await Backlink.findAll({
where: {
@@ -106,13 +121,17 @@ describe("documents.update", () => {
const document = await buildDocument();
document.text = `[this is a link](${otherDocument.url})`;
await document.save();
- // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.update"; docu... Remove this comment to see the full error message
- await Backlinks.on({
+
+ const processor = new BacklinksProcessor();
+ await processor.perform({
name: "documents.update",
documentId: document.id,
collectionId: document.collectionId,
teamId: document.teamId,
actorId: document.createdById,
+ createdAt: new Date().toISOString(),
+ data: { title: document.title, autosave: false, done: true },
+ ip,
});
const backlinks = await Backlink.findAll({
where: {
@@ -130,25 +149,31 @@ describe("documents.update", () => {
[this is a another link](${yetAnotherDocument.url})`,
});
- // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.publish"; doc... Remove this comment to see the full error message
- await Backlinks.on({
+
+ const processor = new BacklinksProcessor();
+ await processor.perform({
name: "documents.publish",
documentId: document.id,
collectionId: document.collectionId,
teamId: document.teamId,
actorId: document.createdById,
+ data: { title: document.title },
+ ip,
});
document.text = `First link is gone
[this is a another link](${yetAnotherDocument.url})`;
await document.save();
- // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.update"; docu... Remove this comment to see the full error message
- await Backlinks.on({
+
+ await processor.perform({
name: "documents.update",
documentId: document.id,
collectionId: document.collectionId,
teamId: document.teamId,
actorId: document.createdById,
+ createdAt: new Date().toISOString(),
+ data: { title: document.title, autosave: false, done: true },
+ ip,
});
const backlinks = await Backlink.findAll({
where: {
@@ -166,21 +191,27 @@ describe("documents.delete", () => {
const document = await buildDocument();
document.text = `[this is a link](${otherDocument.url})`;
await document.save();
- // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.update"; docu... Remove this comment to see the full error message
- await Backlinks.on({
+
+ const processor = new BacklinksProcessor();
+ await processor.perform({
name: "documents.update",
documentId: document.id,
collectionId: document.collectionId,
teamId: document.teamId,
actorId: document.createdById,
+ createdAt: new Date().toISOString(),
+ data: { title: document.title, autosave: false, done: true },
+ ip,
});
- // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.delete"; docu... Remove this comment to see the full error message
- await Backlinks.on({
+
+ await processor.perform({
name: "documents.delete",
documentId: document.id,
collectionId: document.collectionId,
teamId: document.teamId,
actorId: document.createdById,
+ data: { title: document.title },
+ ip,
});
const backlinks = await Backlink.findAll({
where: {
@@ -201,29 +232,33 @@ describe("documents.title_change", () => {
document.text = `[${otherDocument.title}](${otherDocument.url})`;
await document.save();
// ensure the backlinks are created
- // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.update"; docu... Remove this comment to see the full error message
- await Backlinks.on({
+ const processor = new BacklinksProcessor();
+ await processor.perform({
name: "documents.update",
documentId: document.id,
collectionId: document.collectionId,
teamId: document.teamId,
actorId: document.createdById,
+ createdAt: new Date().toISOString(),
+ data: { title: document.title, autosave: false, done: true },
+ ip,
});
// change the title of the linked doc
otherDocument.title = newTitle;
await otherDocument.save();
// does the text get updated with the new title
- // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.title_change"... Remove this comment to see the full error message
- await Backlinks.on({
+ await processor.perform({
name: "documents.title_change",
documentId: otherDocument.id,
collectionId: otherDocument.collectionId,
teamId: otherDocument.teamId,
actorId: otherDocument.createdById,
+ createdAt: new Date().toISOString(),
data: {
previousTitle,
title: newTitle,
},
+ ip,
});
await document.reload();
expect(document.text).toBe(`[${newTitle}](${otherDocument.url})`);
diff --git a/server/queues/processors/backlinks.ts b/server/queues/processors/BacklinksProcessor.ts
similarity index 90%
rename from server/queues/processors/backlinks.ts
rename to server/queues/processors/BacklinksProcessor.ts
index 8c34d48c87..2626f65d80 100644
--- a/server/queues/processors/backlinks.ts
+++ b/server/queues/processors/BacklinksProcessor.ts
@@ -1,11 +1,21 @@
import { Op } from "sequelize";
+import { APM } from "@server/logging/tracing";
import { Document, Backlink, Team } from "@server/models";
+import { Event, DocumentEvent, RevisionEvent } from "@server/types";
import parseDocumentIds from "@server/utils/parseDocumentIds";
import slugify from "@server/utils/slugify";
-import { DocumentEvent, RevisionEvent } from "../../types";
+import BaseProcessor from "./BaseProcessor";
-export default class BacklinksProcessor {
- async on(event: DocumentEvent | RevisionEvent) {
+@APM.trace()
+export default class BacklinksProcessor extends BaseProcessor {
+ static applicableEvents: Event["name"][] = [
+ "documents.publish",
+ "documents.update",
+ "documents.title_change",
+ "documents.delete",
+ ];
+
+ async perform(event: DocumentEvent | RevisionEvent) {
switch (event.name) {
case "documents.publish": {
const document = await Document.findByPk(event.documentId);
diff --git a/server/queues/processors/BaseProcessor.ts b/server/queues/processors/BaseProcessor.ts
new file mode 100644
index 0000000000..b077068243
--- /dev/null
+++ b/server/queues/processors/BaseProcessor.ts
@@ -0,0 +1,7 @@
+import { Event } from "@server/types";
+
+export default abstract class BaseProcessor {
+ static applicableEvents: Event["name"][] = [];
+
+ public abstract perform(event: Event): Promise<void>;
+}
diff --git a/server/queues/processors/debouncer.ts b/server/queues/processors/DebounceProcessor.ts
similarity index 72%
rename from server/queues/processors/debouncer.ts
rename to server/queues/processors/DebounceProcessor.ts
index 466614c8bb..3f6d9d42e4 100644
--- a/server/queues/processors/debouncer.ts
+++ b/server/queues/processors/DebounceProcessor.ts
@@ -1,9 +1,17 @@
+import { APM } from "@server/logging/tracing";
import Document from "@server/models/Document";
-import { globalEventQueue } from "../../queues";
-import { Event } from "../../types";
+import { Event } from "@server/types";
+import { globalEventQueue } from "..";
+import BaseProcessor from "./BaseProcessor";
-export default class DebounceProcessor {
- async on(event: Event) {
+@APM.trace()
+export default class DebounceProcessor extends BaseProcessor {
+ static applicableEvents: Event["name"][] = [
+ "documents.update",
+ "documents.update.delayed",
+ ];
+
+ async perform(event: Event) {
switch (event.name) {
case "documents.update": {
globalEventQueue.add(
diff --git a/server/queues/processors/exports.ts b/server/queues/processors/ExportsProcessor.ts
similarity index 81%
rename from server/queues/processors/exports.ts
rename to server/queues/processors/ExportsProcessor.ts
index 1d239ff729..a74b71b6e0 100644
--- a/server/queues/processors/exports.ts
+++ b/server/queues/processors/ExportsProcessor.ts
@@ -1,14 +1,20 @@
import fs from "fs";
import invariant from "invariant";
import Logger from "@server/logging/logger";
-import mailer from "@server/mailer";
import { FileOperation, Collection, Event, Team, User } from "@server/models";
+import EmailTask from "@server/queues/tasks/EmailTask";
import { Event as TEvent } from "@server/types";
import { uploadToS3FromBuffer } from "@server/utils/s3";
import { archiveCollections } from "@server/utils/zip";
+import BaseProcessor from "./BaseProcessor";
-export default class ExportsProcessor {
- async on(event: TEvent) {
+export default class ExportsProcessor extends BaseProcessor {
+ static applicableEvents: TEvent["name"][] = [
+ "collections.export",
+ "collections.export_all",
+ ];
+
+ async perform(event: TEvent) {
switch (event.name) {
case "collections.export":
case "collections.export_all": {
@@ -82,15 +88,21 @@ export default class ExportsProcessor {
});
if (state === "error") {
- mailer.sendTemplate("exportFailure", {
- to: user.email,
- teamUrl: team.url,
+ await EmailTask.schedule({
+ type: "exportFailure",
+ options: {
+ to: user.email,
+ teamUrl: team.url,
+ },
});
} else {
- mailer.sendTemplate("exportSuccess", {
- to: user.email,
- id: fileOperation.id,
- teamUrl: team.url,
+ await EmailTask.schedule({
+ type: "exportSuccess",
+ options: {
+ to: user.email,
+ id: fileOperation.id,
+ teamUrl: team.url,
+ },
});
}
}
@@ -108,7 +120,7 @@ export default class ExportsProcessor {
data: Partial<FileOperation>
) {
await fileOperation.update(data);
- await Event.add({
+ await Event.schedule({
name: "fileOperations.update",
teamId,
actorId,
diff --git a/server/queues/processors/imports.ts b/server/queues/processors/ImportsProcessor.ts
similarity index 85%
rename from server/queues/processors/imports.ts
rename to server/queues/processors/ImportsProcessor.ts
index 2dd35da2b2..813af09c19 100644
--- a/server/queues/processors/imports.ts
+++ b/server/queues/processors/ImportsProcessor.ts
@@ -4,10 +4,13 @@ import File from "formidable/lib/file";
import invariant from "invariant";
import collectionImporter from "@server/commands/collectionImporter";
import { Event, FileOperation, Attachment, User } from "@server/models";
-import { Event as TEvent } from "../../types";
+import { Event as TEvent } from "@server/types";
+import BaseProcessor from "./BaseProcessor";
-export default class ImportsProcessor {
- async on(event: TEvent) {
+export default class ImportsProcessor extends BaseProcessor {
+ static applicableEvents: TEvent["name"][] = ["collections.import"];
+
+ async perform(event: TEvent) {
switch (event.name) {
case "collections.import": {
let state, error;
@@ -27,7 +30,7 @@ export default class ImportsProcessor {
teamId: user.teamId,
});
- await Event.add({
+ await Event.schedule({
name: "fileOperations.create",
modelId: fileOperation.id,
teamId: user.teamId,
@@ -59,7 +62,7 @@ export default class ImportsProcessor {
error = err.message;
} finally {
await fileOperation.update({ state, error });
- await Event.add({
+ await Event.schedule({
name: "fileOperations.update",
modelId: fileOperation.id,
teamId: user.teamId,
diff --git a/server/queues/processors/notifications.test.ts b/server/queues/processors/NotificationsProcessor.test.ts
similarity index 79%
rename from server/queues/processors/notifications.test.ts
rename to server/queues/processors/NotificationsProcessor.test.ts
index f6fe0a3730..90870fdd2c 100644
--- a/server/queues/processors/notifications.test.ts
+++ b/server/queues/processors/NotificationsProcessor.test.ts
@@ -1,16 +1,16 @@
-import mailer from "@server/mailer";
import { View, NotificationSetting } from "@server/models";
+import EmailTask from "@server/queues/tasks/EmailTask";
import {
buildDocument,
buildCollection,
buildUser,
} from "@server/test/factories";
import { flushdb } from "@server/test/support";
-import NotificationsService from "./notifications";
+import NotificationsProcessor from "./NotificationsProcessor";
-jest.mock("@server/mailer");
+jest.mock("@server/queues/tasks/EmailTask");
+const ip = "127.0.0.1";
-const Notifications = new NotificationsService();
beforeEach(() => flushdb());
beforeEach(jest.resetAllMocks);
@@ -26,18 +26,20 @@ describe("documents.publish", () => {
teamId: user.teamId,
event: "documents.publish",
});
- await Notifications.on({
+
+ const processor = new NotificationsProcessor();
+ await processor.perform({
name: "documents.publish",
documentId: document.id,
collectionId: document.collectionId,
teamId: document.teamId,
actorId: document.createdById,
- ip: "127.0.0.1",
data: {
title: document.title,
},
+ ip,
});
- expect(mailer.documentNotification).not.toHaveBeenCalled();
+ expect(EmailTask.schedule).not.toHaveBeenCalled();
});
test("should send a notification to other users in team", async () => {
@@ -51,18 +53,19 @@ describe("documents.publish", () => {
event: "documents.publish",
});
- await Notifications.on({
+ const processor = new NotificationsProcessor();
+ await processor.perform({
name: "documents.publish",
documentId: document.id,
collectionId: document.collectionId,
teamId: document.teamId,
actorId: document.createdById,
- ip: "127.0.0.1",
data: {
title: document.title,
},
+ ip,
});
- expect(mailer.documentNotification).toHaveBeenCalled();
+ expect(EmailTask.schedule).toHaveBeenCalled();
});
test("should not send a notification to users without collection access", async () => {
@@ -80,18 +83,19 @@ describe("documents.publish", () => {
teamId: user.teamId,
event: "documents.publish",
});
- await Notifications.on({
+ const processor = new NotificationsProcessor();
+ await processor.perform({
name: "documents.publish",
documentId: document.id,
collectionId: document.collectionId,
teamId: document.teamId,
actorId: document.createdById,
- ip: "127.0.0.1",
data: {
title: document.title,
},
+ ip,
});
- expect(mailer.documentNotification).not.toHaveBeenCalled();
+ expect(EmailTask.schedule).not.toHaveBeenCalled();
});
});
@@ -108,13 +112,14 @@ describe("revisions.create", () => {
teamId: collaborator.teamId,
event: "documents.update",
});
- await Notifications.on({
+ const processor = new NotificationsProcessor();
+ await processor.perform({
name: "revisions.create",
documentId: document.id,
collectionId: document.collectionId,
teamId: document.teamId,
});
- expect(mailer.documentNotification).toHaveBeenCalled();
+ expect(EmailTask.schedule).toHaveBeenCalled();
});
test("should not send a notification if viewed since update", async () => {
@@ -130,13 +135,15 @@ describe("revisions.create", () => {
event: "documents.update",
});
await View.touch(document.id, collaborator.id, true);
- await Notifications.on({
+
+ const processor = new NotificationsProcessor();
+ await processor.perform({
name: "revisions.create",
documentId: document.id,
collectionId: document.collectionId,
teamId: document.teamId,
});
- expect(mailer.documentNotification).not.toHaveBeenCalled();
+ expect(EmailTask.schedule).not.toHaveBeenCalled();
});
test("should not send a notification to last editor", async () => {
@@ -150,12 +157,13 @@ describe("revisions.create", () => {
teamId: user.teamId,
event: "documents.update",
});
- await Notifications.on({
+ const processor = new NotificationsProcessor();
+ await processor.perform({
name: "revisions.create",
documentId: document.id,
collectionId: document.collectionId,
teamId: document.teamId,
});
- expect(mailer.documentNotification).not.toHaveBeenCalled();
+ expect(EmailTask.schedule).not.toHaveBeenCalled();
});
});
diff --git a/server/queues/processors/notifications.ts b/server/queues/processors/NotificationsProcessor.ts
similarity index 80%
rename from server/queues/processors/notifications.ts
rename to server/queues/processors/NotificationsProcessor.ts
index a5e9fbd12e..54682c4372 100644
--- a/server/queues/processors/notifications.ts
+++ b/server/queues/processors/NotificationsProcessor.ts
@@ -1,6 +1,6 @@
import { Op } from "sequelize";
import Logger from "@server/logging/logger";
-import mailer from "@server/mailer";
+import { APM } from "@server/logging/tracing";
import {
View,
Document,
@@ -15,9 +15,18 @@ import {
RevisionEvent,
Event,
} from "@server/types";
+import EmailTask from "../tasks/EmailTask";
+import BaseProcessor from "./BaseProcessor";
-export default class NotificationsProcessor {
- async on(event: Event) {
+@APM.trace()
+export default class NotificationsProcessor extends BaseProcessor {
+ static applicableEvents: Event["name"][] = [
+ "documents.publish",
+ "revisions.create",
+ "collections.create",
+ ];
+
+ async perform(event: Event) {
switch (event.name) {
case "documents.publish":
case "revisions.create":
@@ -114,14 +123,17 @@ export default class NotificationsProcessor {
continue;
}
- mailer.documentNotification({
- to: setting.user.email,
- eventName,
- document,
- team,
- collection,
- actor: document.updatedBy,
- unsubscribeUrl: setting.unsubscribeUrl,
+ await EmailTask.schedule({
+ type: "documentNotification",
+ options: {
+ to: setting.user.email,
+ eventName,
+ documentId: document.id,
+ teamUrl: team.url,
+ actorName: document.updatedBy.name,
+ collectionName: collection.name,
+ unsubscribeUrl: setting.unsubscribeUrl,
+ },
});
}
}
@@ -165,12 +177,14 @@ export default class NotificationsProcessor {
continue;
}
- mailer.collectionNotification({
- to: setting.user.email,
- eventName: "created",
- collection,
- actor: collection.user,
- unsubscribeUrl: setting.unsubscribeUrl,
+ await EmailTask.schedule({
+ type: "collectionNotification",
+ options: {
+ to: setting.user.email,
+ eventName: "created",
+ collectionId: collection.id,
+ unsubscribeUrl: setting.unsubscribeUrl,
+ },
});
}
}
diff --git a/server/queues/processors/revisions.test.ts b/server/queues/processors/RevisionsProcessor.test.ts
similarity index 71%
rename from server/queues/processors/revisions.test.ts
rename to server/queues/processors/RevisionsProcessor.test.ts
index 514594b368..0c813d73f2 100644
--- a/server/queues/processors/revisions.test.ts
+++ b/server/queues/processors/RevisionsProcessor.test.ts
@@ -1,22 +1,27 @@
import { Revision } from "@server/models";
import { buildDocument } from "@server/test/factories";
import { flushdb } from "@server/test/support";
-import RevisionsService from "./revisions";
+import RevisionsProcessor from "./RevisionsProcessor";
+
+const ip = "127.0.0.1";
-const Revisions = new RevisionsService();
beforeEach(() => flushdb());
beforeEach(jest.resetAllMocks);
describe("documents.update.debounced", () => {
test("should create a revision", async () => {
const document = await buildDocument();
- // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.update.deboun... Remove this comment to see the full error message
- await Revisions.on({
+
+ const processor = new RevisionsProcessor();
+ await processor.perform({
name: "documents.update.debounced",
documentId: document.id,
collectionId: document.collectionId,
teamId: document.teamId,
actorId: document.createdById,
+ createdAt: new Date().toISOString(),
+ data: { title: document.title, autosave: false, done: true },
+ ip,
});
const amount = await Revision.count({
where: {
@@ -29,13 +34,17 @@ describe("documents.update.debounced", () => {
test("should not create a revision if identical to previous", async () => {
const document = await buildDocument();
await Revision.createFromDocument(document);
- // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.update.deboun... Remove this comment to see the full error message
- await Revisions.on({
+
+ const processor = new RevisionsProcessor();
+ await processor.perform({
name: "documents.update.debounced",
documentId: document.id,
collectionId: document.collectionId,
teamId: document.teamId,
actorId: document.createdById,
+ createdAt: new Date().toISOString(),
+ data: { title: document.title, autosave: false, done: true },
+ ip,
});
const amount = await Revision.count({
where: {
diff --git a/server/queues/processors/revisions.ts b/server/queues/processors/RevisionsProcessor.ts
similarity index 72%
rename from server/queues/processors/revisions.ts
rename to server/queues/processors/RevisionsProcessor.ts
index 9c0389aa44..856383b8d7 100644
--- a/server/queues/processors/revisions.ts
+++ b/server/queues/processors/RevisionsProcessor.ts
@@ -1,10 +1,15 @@
import invariant from "invariant";
import revisionCreator from "@server/commands/revisionCreator";
+import { APM } from "@server/logging/tracing";
import { Revision, Document, User } from "@server/models";
-import { DocumentEvent, RevisionEvent } from "../../types";
+import { DocumentEvent, RevisionEvent, Event } from "@server/types";
+import BaseProcessor from "./BaseProcessor";
-export default class RevisionsProcessor {
- async on(event: DocumentEvent | RevisionEvent) {
+@APM.trace()
+export default class RevisionsProcessor extends BaseProcessor {
+ static applicableEvents: Event["name"][] = ["documents.update.debounced"];
+
+ async perform(event: DocumentEvent | RevisionEvent) {
switch (event.name) {
case "documents.update.debounced": {
const document = await Document.findByPk(event.documentId);
diff --git a/server/queues/processors/slack.ts b/server/queues/processors/SlackProcessor.ts
similarity index 89%
rename from server/queues/processors/slack.ts
rename to server/queues/processors/SlackProcessor.ts
index f793ee2776..a5d81f5407 100644
--- a/server/queues/processors/slack.ts
+++ b/server/queues/processors/SlackProcessor.ts
@@ -1,5 +1,6 @@
import fetch from "fetch-with-proxy";
import { Op } from "sequelize";
+import { APM } from "@server/logging/tracing";
import { Document, Integration, Collection, Team } from "@server/models";
import { presentSlackAttachment } from "@server/presenters";
import {
@@ -7,10 +8,18 @@ import {
IntegrationEvent,
RevisionEvent,
Event,
-} from "../../types";
+} from "@server/types";
+import BaseProcessor from "./BaseProcessor";
-export default class SlackProcessor {
- async on(event: Event) {
+@APM.trace()
+export default class SlackProcessor extends BaseProcessor {
+ static applicableEvents: Event["name"][] = [
+ "documents.publish",
+ "revisions.create",
+ "integrations.create",
+ ];
+
+ async perform(event: Event) {
switch (event.name) {
case "documents.publish":
case "revisions.create":
diff --git a/server/queues/processors/websockets.ts b/server/queues/processors/WebsocketsProcessor.ts
similarity index 99%
rename from server/queues/processors/websockets.ts
rename to server/queues/processors/WebsocketsProcessor.ts
index 15bcf72e2e..16807e31de 100644
--- a/server/queues/processors/websockets.ts
+++ b/server/queues/processors/WebsocketsProcessor.ts
@@ -1,5 +1,7 @@
import { subHours } from "date-fns";
import { Op } from "sequelize";
+import { Server } from "socket.io";
+import { APM } from "@server/logging/tracing";
import {
Document,
Collection,
@@ -17,8 +19,9 @@ import {
} from "@server/presenters";
import { Event } from "../../types";
+@APM.trace()
export default class WebsocketsProcessor {
- async on(event: Event, socketio: any) {
+ async perform(event: Event, socketio: Server) {
switch (event.name) {
case "documents.publish":
case "documents.restore":
@@ -604,6 +607,7 @@ export default class WebsocketsProcessor {
}
default:
+ return;
}
}
}
diff --git a/server/queues/processors/emails.ts b/server/queues/processors/emails.ts
deleted file mode 100644
index 1e2e9c7972..0000000000
--- a/server/queues/processors/emails.ts
+++ /dev/null
@@ -1,12 +0,0 @@
-import mailer, { EmailSendOptions, EmailTypes } from "../../mailer";
-
-type EmailEvent = {
- type: EmailTypes;
- opts: EmailSendOptions;
-};
-
-export default class EmailsProcessor {
- async on(event: EmailEvent) {
- await mailer[event.type](event.opts);
- }
-}
diff --git a/server/queues/processors/index.ts b/server/queues/processors/index.ts
new file mode 100644
index 0000000000..fb911c4783
--- /dev/null
+++ b/server/queues/processors/index.ts
@@ -0,0 +1,16 @@
+import { requireDirectory } from "@server/utils/fs";
+
+const processors = {};
+
+requireDirectory(__dirname).forEach(([module, id]) => {
+ // @ts-expect-error ts-migrate(2339) FIXME: Property 'default' does not exist on type 'unknown'
+ const { default: Processor } = module;
+
+ if (id === "index") {
+ return;
+ }
+
+ processors[id] = Processor;
+});
+
+export default processors;
diff --git a/server/queues/tasks/BaseTask.ts b/server/queues/tasks/BaseTask.ts
new file mode 100644
index 0000000000..9d985121ac
--- /dev/null
+++ b/server/queues/tasks/BaseTask.ts
@@ -0,0 +1,36 @@
+import { JobOptions } from "bull";
+import { taskQueue } from "../";
+
+export enum TaskPriority {
+ Background = 40,
+ Low = 30,
+ Normal = 20,
+ High = 10,
+}
+
+export default abstract class BaseTask<T> {
+  public static schedule<T>(props: T) {
+    // @ts-expect-error cannot create an instance of an abstract class, we won't
+ const task = new this();
+ return taskQueue.add(
+ {
+ name: this.name,
+ props,
+ },
+ task.options
+ );
+ }
+
+  public abstract perform(props: T): Promise<void>;
+
+ public get options(): JobOptions {
+ return {
+ priority: TaskPriority.Normal,
+ attempts: 5,
+ backoff: {
+ type: "exponential",
+ delay: 60 * 1000,
+ },
+ };
+ }
+}
diff --git a/server/queues/tasks/CleanupDeletedDocumentsTask.test.ts b/server/queues/tasks/CleanupDeletedDocumentsTask.test.ts
new file mode 100644
index 0000000000..0c654d6cef
--- /dev/null
+++ b/server/queues/tasks/CleanupDeletedDocumentsTask.test.ts
@@ -0,0 +1,72 @@
+import { subDays } from "date-fns";
+import { Document } from "@server/models";
+import { buildDocument } from "@server/test/factories";
+import { flushdb } from "@server/test/support";
+import CleanupDeletedDocumentsTask from "./CleanupDeletedDocumentsTask";
+
+beforeEach(() => flushdb());
+
+describe("CleanupDeletedDocumentsTask", () => {
+ it("should not destroy documents not deleted", async () => {
+ await buildDocument({
+ publishedAt: new Date(),
+ });
+
+ const task = new CleanupDeletedDocumentsTask();
+ await task.perform({ limit: 100 });
+
+ expect(
+ await Document.unscoped().count({
+ paranoid: false,
+ })
+ ).toEqual(1);
+ });
+
+ it("should not destroy documents deleted less than 30 days ago", async () => {
+ await buildDocument({
+ publishedAt: new Date(),
+ deletedAt: subDays(new Date(), 25),
+ });
+
+ const task = new CleanupDeletedDocumentsTask();
+ await task.perform({ limit: 100 });
+
+ expect(
+ await Document.unscoped().count({
+ paranoid: false,
+ })
+ ).toEqual(1);
+ });
+
+ it("should destroy documents deleted more than 30 days ago", async () => {
+ await buildDocument({
+ publishedAt: new Date(),
+ deletedAt: subDays(new Date(), 60),
+ });
+
+ const task = new CleanupDeletedDocumentsTask();
+ await task.perform({ limit: 100 });
+
+ expect(
+ await Document.unscoped().count({
+ paranoid: false,
+ })
+ ).toEqual(0);
+ });
+
+ it("should destroy draft documents deleted more than 30 days ago", async () => {
+ await buildDocument({
+ publishedAt: undefined,
+ deletedAt: subDays(new Date(), 60),
+ });
+
+ const task = new CleanupDeletedDocumentsTask();
+ await task.perform({ limit: 100 });
+
+ expect(
+ await Document.unscoped().count({
+ paranoid: false,
+ })
+ ).toEqual(0);
+ });
+});
diff --git a/server/queues/tasks/CleanupDeletedDocumentsTask.ts b/server/queues/tasks/CleanupDeletedDocumentsTask.ts
new file mode 100644
index 0000000000..ed54e8f79e
--- /dev/null
+++ b/server/queues/tasks/CleanupDeletedDocumentsTask.ts
@@ -0,0 +1,40 @@
+import { subDays } from "date-fns";
+import { Op } from "sequelize";
+import documentPermanentDeleter from "@server/commands/documentPermanentDeleter";
+import Logger from "@server/logging/logger";
+import { APM } from "@server/logging/tracing";
+import { Document } from "@server/models";
+import BaseTask, { TaskPriority } from "./BaseTask";
+
+type Props = {
+ limit: number;
+};
+
+@APM.trace()
+export default class CleanupDeletedDocumentsTask extends BaseTask<Props> {
+ public async perform({ limit }: Props) {
+ Logger.info(
+ "task",
+      `Permanently destroying up to ${limit} documents older than 30 days…`
+ );
+ const documents = await Document.scope("withDrafts").findAll({
+ attributes: ["id", "teamId", "text", "deletedAt"],
+ where: {
+ deletedAt: {
+ [Op.lt]: subDays(new Date(), 30),
+ },
+ },
+ paranoid: false,
+ limit,
+ });
+ const countDeletedDocument = await documentPermanentDeleter(documents);
+ Logger.info("task", `Destroyed ${countDeletedDocument} documents`);
+ }
+
+ public get options() {
+ return {
+ attempts: 1,
+ priority: TaskPriority.Background,
+ };
+ }
+}
diff --git a/server/queues/tasks/CleanupDeletedTeamsTask.ts b/server/queues/tasks/CleanupDeletedTeamsTask.ts
new file mode 100644
index 0000000000..d874a31bbe
--- /dev/null
+++ b/server/queues/tasks/CleanupDeletedTeamsTask.ts
@@ -0,0 +1,42 @@
+import { subDays } from "date-fns";
+import { Op } from "sequelize";
+import teamPermanentDeleter from "@server/commands/teamPermanentDeleter";
+import Logger from "@server/logging/logger";
+import { APM } from "@server/logging/tracing";
+import { Team } from "@server/models";
+import BaseTask, { TaskPriority } from "./BaseTask";
+
+type Props = {
+ limit: number;
+};
+
+@APM.trace()
+export default class CleanupDeletedTeamsTask extends BaseTask<Props> {
+ public async perform({ limit }: Props) {
+ Logger.info(
+ "task",
+      `Permanently destroying up to ${limit} teams older than 30 days…`
+ );
+ const teams = await Team.findAll({
+ where: {
+ deletedAt: {
+ [Op.lt]: subDays(new Date(), 30),
+ },
+ },
+ paranoid: false,
+ limit,
+ });
+
+ for (const team of teams) {
+ await teamPermanentDeleter(team);
+ }
+ Logger.info("task", `Destroyed ${teams.length} teams`);
+ }
+
+ public get options() {
+ return {
+ attempts: 1,
+ priority: TaskPriority.Background,
+ };
+ }
+}
diff --git a/server/queues/tasks/CleanupExpiredFileOperationsTask.test.ts b/server/queues/tasks/CleanupExpiredFileOperationsTask.test.ts
new file mode 100644
index 0000000000..e43ecdb47d
--- /dev/null
+++ b/server/queues/tasks/CleanupExpiredFileOperationsTask.test.ts
@@ -0,0 +1,56 @@
+import { subDays } from "date-fns";
+import { FileOperation } from "@server/models";
+import { buildFileOperation } from "@server/test/factories";
+import { flushdb } from "@server/test/support";
+import CleanupExpiredFileOperationsTask from "./CleanupExpiredFileOperationsTask";
+
+beforeEach(() => flushdb());
+
+describe("CleanupExpiredFileOperationsTask", () => {
+ it("should expire exports older than 30 days ago", async () => {
+ await buildFileOperation({
+ type: "export",
+ state: "complete",
+ createdAt: subDays(new Date(), 30),
+ });
+ await buildFileOperation({
+ type: "export",
+ state: "complete",
+ });
+
+    /* Run the cleanup task directly rather than scheduling it on the queue. */
+ const task = new CleanupExpiredFileOperationsTask();
+ await task.perform({ limit: 100 });
+
+ const data = await FileOperation.count({
+ where: {
+ type: "export",
+ state: "expired",
+ },
+ });
+ expect(data).toEqual(1);
+ });
+
+ it("should not expire exports made less than 30 days ago", async () => {
+ await buildFileOperation({
+ type: "export",
+ state: "complete",
+ createdAt: subDays(new Date(), 29),
+ });
+ await buildFileOperation({
+ type: "export",
+ state: "complete",
+ });
+
+ const task = new CleanupExpiredFileOperationsTask();
+ await task.perform({ limit: 100 });
+
+ const data = await FileOperation.count({
+ where: {
+ type: "export",
+ state: "expired",
+ },
+ });
+ expect(data).toEqual(0);
+ });
+});
diff --git a/server/queues/tasks/CleanupExpiredFileOperationsTask.ts b/server/queues/tasks/CleanupExpiredFileOperationsTask.ts
new file mode 100644
index 0000000000..7544f22e88
--- /dev/null
+++ b/server/queues/tasks/CleanupExpiredFileOperationsTask.ts
@@ -0,0 +1,40 @@
+import { subDays } from "date-fns";
+import { Op } from "sequelize";
+import Logger from "@server/logging/logger";
+import { APM } from "@server/logging/tracing";
+import { FileOperation } from "@server/models";
+import BaseTask, { TaskPriority } from "./BaseTask";
+
+type Props = {
+ limit: number;
+};
+
+@APM.trace()
+export default class CleanupExpiredFileOperationsTask extends BaseTask<Props> {
+ public async perform({ limit }: Props) {
+ Logger.info("task", `Expiring export file operations older than 30 days…`);
+ const fileOperations = await FileOperation.unscoped().findAll({
+ where: {
+ type: "export",
+ createdAt: {
+ [Op.lt]: subDays(new Date(), 30),
+ },
+ state: {
+ [Op.ne]: "expired",
+ },
+ },
+ limit,
+ });
+ await Promise.all(
+ fileOperations.map((fileOperation) => fileOperation.expire())
+ );
+ Logger.info("task", `Expired ${fileOperations.length} file operations`);
+ }
+
+ public get options() {
+ return {
+ attempts: 1,
+ priority: TaskPriority.Background,
+ };
+ }
+}
diff --git a/server/queues/tasks/EmailTask.ts b/server/queues/tasks/EmailTask.ts
new file mode 100644
index 0000000000..362ca0ce03
--- /dev/null
+++ b/server/queues/tasks/EmailTask.ts
@@ -0,0 +1,15 @@
+import { APM } from "@server/logging/tracing";
+import mailer, { EmailSendOptions, EmailTypes } from "../../mailer";
+import BaseTask from "./BaseTask";
+
+type Props = {
+ type: EmailTypes;
+ options: EmailSendOptions;
+};
+
+@APM.trace()
+export default class EmailTask extends BaseTask<Props> {
+ public async perform(props: Props) {
+ await mailer[props.type](props.options);
+ }
+}
diff --git a/server/queues/tasks/index.ts b/server/queues/tasks/index.ts
new file mode 100644
index 0000000000..96e8040ee9
--- /dev/null
+++ b/server/queues/tasks/index.ts
@@ -0,0 +1,16 @@
+import { requireDirectory } from "@server/utils/fs";
+
+const tasks = {};
+
+requireDirectory(__dirname).forEach(([module, id]) => {
+ // @ts-expect-error ts-migrate(2339) FIXME: Property 'default' does not exist on type 'unknown'
+ const { default: Task } = module;
+
+ if (id === "index") {
+ return;
+ }
+
+ tasks[id] = Task;
+});
+
+export default tasks;
diff --git a/server/routes/api/documents.ts b/server/routes/api/documents.ts
index e381cc5817..25d3e8b810 100644
--- a/server/routes/api/documents.ts
+++ b/server/routes/api/documents.ts
@@ -1024,7 +1024,7 @@ router.post("documents.update", auth(), async (ctx) => {
}
if (document.title !== previousTitle) {
- Event.add({
+ Event.schedule({
name: "documents.title_change",
documentId: document.id,
collectionId: document.collectionId,
diff --git a/server/routes/api/utils.test.ts b/server/routes/api/utils.test.ts
index 6b2b8c4692..7376e5a53b 100644
--- a/server/routes/api/utils.test.ts
+++ b/server/routes/api/utils.test.ts
@@ -1,8 +1,5 @@
-import { subDays } from "date-fns";
import TestServer from "fetch-test-server";
-import { Document, FileOperation } from "@server/models";
import webService from "@server/services/web";
-import { buildDocument, buildFileOperation } from "@server/test/factories";
import { flushdb } from "@server/test/support";
const app = webService();
@@ -12,127 +9,6 @@ beforeEach(() => flushdb());
afterAll(() => server.close());
describe("#utils.gc", () => {
- it("should not destroy documents not deleted", async () => {
- await buildDocument({
- publishedAt: new Date(),
- });
- const res = await server.post("/api/utils.gc", {
- body: {
- token: process.env.UTILS_SECRET,
- },
- });
- expect(res.status).toEqual(200);
- expect(
- await Document.unscoped().count({
- paranoid: false,
- })
- ).toEqual(1);
- });
-
- it("should not destroy documents deleted less than 30 days ago", async () => {
- await buildDocument({
- publishedAt: new Date(),
- deletedAt: subDays(new Date(), 25),
- });
- const res = await server.post("/api/utils.gc", {
- body: {
- token: process.env.UTILS_SECRET,
- },
- });
- expect(res.status).toEqual(200);
- expect(
- await Document.unscoped().count({
- paranoid: false,
- })
- ).toEqual(1);
- });
-
- it("should destroy documents deleted more than 30 days ago", async () => {
- await buildDocument({
- publishedAt: new Date(),
- deletedAt: subDays(new Date(), 60),
- });
- const res = await server.post("/api/utils.gc", {
- body: {
- token: process.env.UTILS_SECRET,
- },
- });
- expect(res.status).toEqual(200);
- expect(
- await Document.unscoped().count({
- paranoid: false,
- })
- ).toEqual(0);
- });
-
- it("should destroy draft documents deleted more than 30 days ago", async () => {
- await buildDocument({
- publishedAt: undefined,
- deletedAt: subDays(new Date(), 60),
- });
- const res = await server.post("/api/utils.gc", {
- body: {
- token: process.env.UTILS_SECRET,
- },
- });
- expect(res.status).toEqual(200);
- expect(
- await Document.unscoped().count({
- paranoid: false,
- })
- ).toEqual(0);
- });
-
- it("should expire exports older than 30 days ago", async () => {
- await buildFileOperation({
- type: "export",
- state: "complete",
- createdAt: subDays(new Date(), 30),
- });
- await buildFileOperation({
- type: "export",
- state: "complete",
- });
- const res = await server.post("/api/utils.gc", {
- body: {
- token: process.env.UTILS_SECRET,
- },
- });
- const data = await FileOperation.count({
- where: {
- type: "export",
- state: "expired",
- },
- });
- expect(res.status).toEqual(200);
- expect(data).toEqual(1);
- });
-
- it("should not expire exports made less than 30 days ago", async () => {
- await buildFileOperation({
- type: "export",
- state: "complete",
- createdAt: subDays(new Date(), 29),
- });
- await buildFileOperation({
- type: "export",
- state: "complete",
- });
- const res = await server.post("/api/utils.gc", {
- body: {
- token: process.env.UTILS_SECRET,
- },
- });
- const data = await FileOperation.count({
- where: {
- type: "export",
- state: "expired",
- },
- });
- expect(res.status).toEqual(200);
- expect(data).toEqual(0);
- });
-
it("should require authentication", async () => {
const res = await server.post("/api/utils.gc");
expect(res.status).toEqual(401);
diff --git a/server/routes/api/utils.ts b/server/routes/api/utils.ts
index ccb9a9c080..2ad06fffb9 100644
--- a/server/routes/api/utils.ts
+++ b/server/routes/api/utils.ts
@@ -1,11 +1,8 @@
-import { subDays } from "date-fns";
import Router from "koa-router";
-import { Op } from "sequelize";
-import documentPermanentDeleter from "@server/commands/documentPermanentDeleter";
-import teamPermanentDeleter from "@server/commands/teamPermanentDeleter";
import { AuthenticationError } from "@server/errors";
-import { Document, Team, FileOperation } from "@server/models";
-import Logger from "../../logging/logger";
+import CleanupDeletedDocumentsTask from "@server/queues/tasks/CleanupDeletedDocumentsTask";
+import CleanupDeletedTeamsTask from "@server/queues/tasks/CleanupDeletedTeamsTask";
+import CleanupExpiredFileOperationsTask from "@server/queues/tasks/CleanupExpiredFileOperationsTask";
const router = new Router();
@@ -16,59 +13,11 @@ router.post("utils.gc", async (ctx) => {
throw AuthenticationError("Invalid secret token");
}
- Logger.info(
- "utils",
- `Permanently destroying upto ${limit} documents older than 30 days…`
- );
- const documents = await Document.scope("withDrafts").findAll({
- attributes: ["id", "teamId", "text", "deletedAt"],
- where: {
- deletedAt: {
- [Op.lt]: subDays(new Date(), 30),
- },
- },
- paranoid: false,
- limit,
- });
- const countDeletedDocument = await documentPermanentDeleter(documents);
- Logger.info("utils", `Destroyed ${countDeletedDocument} documents`);
- Logger.info(
- "utils",
- `Expiring all the collection export older than 30 days…`
- );
- const exports = await FileOperation.unscoped().findAll({
- where: {
- type: "export",
- createdAt: {
- [Op.lt]: subDays(new Date(), 30),
- },
- state: {
- [Op.ne]: "expired",
- },
- },
- });
- await Promise.all(
- exports.map(async (e) => {
- await e.expire();
- })
- );
- Logger.info(
- "utils",
- `Permanently destroying upto ${limit} teams older than 30 days…`
- );
- const teams = await Team.findAll({
- where: {
- deletedAt: {
- [Op.lt]: subDays(new Date(), 30),
- },
- },
- paranoid: false,
- limit,
- });
+ await CleanupDeletedDocumentsTask.schedule({ limit });
- for (const team of teams) {
- await teamPermanentDeleter(team);
- }
+ await CleanupExpiredFileOperationsTask.schedule({ limit });
+
+ await CleanupDeletedTeamsTask.schedule({ limit });
ctx.body = {
success: true,
diff --git a/server/routes/auth/providers/email.test.ts b/server/routes/auth/providers/email.test.ts
index 52cd77b9c4..5e151a5cd9 100644
--- a/server/routes/auth/providers/email.test.ts
+++ b/server/routes/auth/providers/email.test.ts
@@ -1,5 +1,5 @@
import TestServer from "fetch-test-server";
-import mailer from "@server/mailer";
+import EmailTask from "@server/queues/tasks/EmailTask";
import webService from "@server/services/web";
import { buildUser, buildGuestUser, buildTeam } from "@server/test/factories";
import { flushdb } from "@server/test/support";
@@ -24,7 +24,7 @@ describe("email", () => {
});
it("should respond with redirect location when user is SSO enabled", async () => {
- const spy = jest.spyOn(mailer, "sendTemplate");
+ const spy = jest.spyOn(EmailTask, "schedule");
const user = await buildUser();
const res = await server.post("/auth/email", {
body: {
@@ -42,7 +42,7 @@ describe("email", () => {
process.env.URL = "http://localoutline.com";
process.env.SUBDOMAINS_ENABLED = "true";
const user = await buildUser();
- const spy = jest.spyOn(mailer, "sendTemplate");
+ const spy = jest.spyOn(EmailTask, "schedule");
await buildTeam({
subdomain: "example",
});
@@ -62,7 +62,7 @@ describe("email", () => {
});
it("should respond with success when user is not SSO enabled", async () => {
- const spy = jest.spyOn(mailer, "sendTemplate");
+ const spy = jest.spyOn(EmailTask, "schedule");
const user = await buildGuestUser();
const res = await server.post("/auth/email", {
body: {
@@ -77,7 +77,7 @@ describe("email", () => {
});
it("should respond with success regardless of whether successful to prevent crawling email logins", async () => {
- const spy = jest.spyOn(mailer, "sendTemplate");
+ const spy = jest.spyOn(EmailTask, "schedule");
const res = await server.post("/auth/email", {
body: {
email: "user@example.com",
@@ -91,7 +91,7 @@ describe("email", () => {
});
describe("with multiple users matching email", () => {
it("should default to current subdomain with SSO", async () => {
- const spy = jest.spyOn(mailer, "sendTemplate");
+ const spy = jest.spyOn(EmailTask, "schedule");
process.env.URL = "http://localoutline.com";
process.env.SUBDOMAINS_ENABLED = "true";
const email = "sso-user@example.org";
@@ -121,7 +121,7 @@ describe("email", () => {
});
it("should default to current subdomain with guest email", async () => {
- const spy = jest.spyOn(mailer, "sendTemplate");
+ const spy = jest.spyOn(EmailTask, "schedule");
process.env.URL = "http://localoutline.com";
process.env.SUBDOMAINS_ENABLED = "true";
const email = "guest-user@example.org";
@@ -151,7 +151,7 @@ describe("email", () => {
});
it("should default to custom domain with SSO", async () => {
- const spy = jest.spyOn(mailer, "sendTemplate");
+ const spy = jest.spyOn(EmailTask, "schedule");
const email = "sso-user-2@example.org";
const team = await buildTeam({
domain: "docs.mycompany.com",
@@ -179,7 +179,7 @@ describe("email", () => {
});
it("should default to custom domain with guest email", async () => {
- const spy = jest.spyOn(mailer, "sendTemplate");
+ const spy = jest.spyOn(EmailTask, "schedule");
const email = "guest-user-2@example.org";
const team = await buildTeam({
domain: "docs.mycompany.com",
diff --git a/server/routes/auth/providers/email.ts b/server/routes/auth/providers/email.ts
index 5a5383cc9e..4ce575a872 100644
--- a/server/routes/auth/providers/email.ts
+++ b/server/routes/auth/providers/email.ts
@@ -3,10 +3,10 @@ import Router from "koa-router";
import { find } from "lodash";
import { parseDomain, isCustomSubdomain } from "@shared/utils/domains";
import { AuthorizationError } from "@server/errors";
-import mailer from "@server/mailer";
import errorHandling from "@server/middlewares/errorHandling";
import methodOverride from "@server/middlewares/methodOverride";
import { User, Team } from "@server/models";
+import EmailTask from "@server/queues/tasks/EmailTask";
import { signIn } from "@server/utils/authentication";
import { isCustomDomain } from "@server/utils/domains";
import { getUserForEmailSigninToken } from "@server/utils/jwt";
@@ -110,10 +110,13 @@ router.post("email", errorHandling(), async (ctx) => {
}
// send email to users registered address with a short-lived token
- await mailer.sendTemplate("signin", {
- to: user.email,
- token: user.getEmailSigninToken(),
- teamUrl: team.url,
+ await EmailTask.schedule({
+ type: "signin",
+ options: {
+ to: user.email,
+ token: user.getEmailSigninToken(),
+ teamUrl: team.url,
+ },
});
user.lastSigninEmailSentAt = new Date();
await user.save();
@@ -147,9 +150,12 @@ router.get("email.callback", async (ctx) => {
}
if (user.isInvited) {
- await mailer.sendTemplate("welcome", {
- to: user.email,
- teamUrl: user.team.url,
+ await EmailTask.schedule({
+ type: "welcome",
+ options: {
+ to: user.email,
+ teamUrl: user.team.url,
+ },
});
}
diff --git a/server/services/admin.ts b/server/services/admin.ts
index 189069781c..182d414613 100644
--- a/server/services/admin.ts
+++ b/server/services/admin.ts
@@ -3,10 +3,10 @@ import { BullAdapter } from "@bull-board/api/bullAdapter";
import { KoaAdapter } from "@bull-board/koa";
import Koa from "koa";
import {
- emailsQueue,
globalEventQueue,
processorEventQueue,
- websocketsQueue,
+ websocketQueue,
+ taskQueue,
} from "../queues";
export default function init(app: Koa) {
@@ -15,8 +15,8 @@ export default function init(app: Koa) {
queues: [
new BullAdapter(globalEventQueue),
new BullAdapter(processorEventQueue),
- new BullAdapter(emailsQueue),
- new BullAdapter(websocketsQueue),
+ new BullAdapter(websocketQueue),
+ new BullAdapter(taskQueue),
],
serverAdapter,
});
diff --git a/server/services/websockets.ts b/server/services/websockets.ts
index b3069bc2df..52228de852 100644
--- a/server/services/websockets.ts
+++ b/server/services/websockets.ts
@@ -9,8 +9,8 @@ import Metrics from "@server/logging/metrics";
import { Document, Collection, View } from "@server/models";
import { can } from "@server/policies";
import { getUserForJWT } from "@server/utils/jwt";
-import { websocketsQueue } from "../queues";
-import WebsocketsProcessor from "../queues/processors/websockets";
+import { websocketQueue } from "../queues";
+import WebsocketsProcessor from "../queues/processors/WebsocketsProcessor";
import { client, subscriber } from "../redis";
export default function init(app: Koa, server: http.Server) {
@@ -247,9 +247,9 @@ export default function init(app: Koa, server: http.Server) {
// Handle events from event queue that should be sent to the clients down ws
const websockets = new WebsocketsProcessor();
- websocketsQueue.process(async function websocketEventsProcessor(job) {
+ websocketQueue.process(async function websocketEventsProcessor(job) {
const event = job.data;
- websockets.on(event, io).catch((error) => {
+ websockets.perform(event, io).catch((error) => {
Logger.error("Error processing websocket event", error, {
event,
});
diff --git a/server/services/worker.ts b/server/services/worker.ts
index 7040f45034..3473bf0f94 100644
--- a/server/services/worker.ts
+++ b/server/services/worker.ts
@@ -2,68 +2,77 @@ import Logger from "@server/logging/logger";
import {
globalEventQueue,
processorEventQueue,
- websocketsQueue,
- emailsQueue,
+ websocketQueue,
+ taskQueue,
} from "../queues";
-import Backlinks from "../queues/processors/backlinks";
-import Debouncer from "../queues/processors/debouncer";
-import Emails from "../queues/processors/emails";
-import Exports from "../queues/processors/exports";
-import Imports from "../queues/processors/imports";
-import Notifications from "../queues/processors/notifications";
-import Revisions from "../queues/processors/revisions";
-import Slack from "../queues/processors/slack";
-
-const EmailsProcessor = new Emails();
-const eventProcessors = {
- backlinks: new Backlinks(),
- debouncer: new Debouncer(),
- imports: new Imports(),
- exports: new Exports(),
- notifications: new Notifications(),
- revisions: new Revisions(),
- slack: new Slack(),
-};
+import processors from "../queues/processors";
+import tasks from "../queues/tasks";
export default function init() {
- // this queue processes global events and hands them off to services
+ // This queue processes the global event bus
globalEventQueue.process(function (job) {
- Object.keys(eventProcessors).forEach((name) => {
- processorEventQueue.add({ ...job.data, service: name });
- });
- websocketsQueue.add(job.data);
- });
- processorEventQueue.process(function (job) {
const event = job.data;
- const processor = eventProcessors[event.service];
- if (!processor) {
- Logger.warn(`Received event for processor that isn't registered`, event);
- return;
+ // For each registered processor we check to see if it wants to handle the
+ // event (applicableEvents), and if so add a new queued job specifically
+ // for that processor.
+ for (const name in processors) {
+ const ProcessorClass = processors[name];
+
+ if (name === "WebsocketsProcessor") {
+ // websockets are a special case on their own queue because they must
+ // only be consumed by the websockets service rather than workers.
+ websocketQueue.add(job.data);
+ } else if (
+ ProcessorClass.applicableEvents.length === 0 ||
+ ProcessorClass.applicableEvents.includes(event.name)
+ ) {
+ processorEventQueue.add({ event, name });
+ }
+ }
+ });
+
+ // Jobs for individual processors are processed here. Only applicable events
+ // arrive, as inapplicable events were filtered in the global event queue above.
+ processorEventQueue.process(function (job) {
+ const { event, name } = job.data;
+ const ProcessorClass = processors[name];
+
+ if (!ProcessorClass) {
+ throw new Error(
+ `Received event "${event.name}" for processor (${name}) that isn't registered. Check the file name matches the class name.`
+ );
}
- if (processor.on) {
- Logger.info("processor", `${event.service} processing ${event.name}`, {
+ const processor = new ProcessorClass();
+
+ if (processor.perform) {
+ Logger.info("processor", `${name} processing ${event.name}`, {
name: event.name,
modelId: event.modelId,
});
- processor.on(event).catch((error: Error) => {
- Logger.error(
- `Error processing ${event.name} in ${event.service}`,
- error,
- event
- );
+ processor.perform(event).catch((error: Error) => {
+ Logger.error(`Error processing ${event.name} in ${name}`, error, event);
});
}
});
- emailsQueue.process(function (job) {
- const event = job.data;
- EmailsProcessor.on(event).catch((error) => {
- Logger.error(
- `Error processing ${event.name} in emails processor`,
- error,
- event
+
+ // Jobs for async tasks are processed here.
+ taskQueue.process(function (job) {
+ const { name, props } = job.data;
+ const TaskClass = tasks[name];
+
+ if (!TaskClass) {
+ throw new Error(
+ `Task "${name}" is not registered. Check the file name matches the class name.`
);
+ }
+
+ Logger.info("task", `${name} triggered`, props);
+
+ const task = new TaskClass();
+ task.perform(props).catch((error: Error) => {
+ Logger.error(`Error processing task in ${name}`, error, props);
});
});
}