From dbfdcd6d23680a3fac81a1bd7d71fcfbcf0a6bbb Mon Sep 17 00:00:00 2001 From: Tom Moor Date: Wed, 6 Apr 2022 16:48:28 -0700 Subject: [PATCH] chore: Refactor worker, emails and data cleanup to task system (#3337) * Refactor worker, all emails on task system * fix * lint * fix: Remove a bunch of expect-error comments in related tests * refactor: Move work from utils.gc into tasks * test * Add tracing to tasks and processors fix: DebounceProcessor triggering on all events Event.add -> Event.schedule --- docs/ARCHITECTURE.md | 3 +- server/commands/accountProvisioner.test.ts | 8 +- server/commands/accountProvisioner.ts | 11 +- server/commands/documentUpdater.ts | 2 +- server/commands/userInviter.ts | 20 +-- server/emails/CollectionNotificationEmail.tsx | 9 +- server/emails/DocumentNotificationEmail.tsx | 30 ++--- server/logging/logger.ts | 1 + server/mailer.tsx | 68 +++++----- server/models/Event.ts | 6 +- server/queues/index.ts | 4 +- ...nks.test.ts => BacklinksProcessor.test.ts} | 83 ++++++++---- .../{backlinks.ts => BacklinksProcessor.ts} | 16 ++- server/queues/processors/BaseProcessor.ts | 7 + .../{debouncer.ts => DebounceProcessor.ts} | 16 ++- .../{exports.ts => ExportsProcessor.ts} | 34 +++-- .../{imports.ts => ImportsProcessor.ts} | 13 +- ...test.ts => NotificationsProcessor.test.ts} | 46 ++++--- ...fications.ts => NotificationsProcessor.ts} | 48 ++++--- ...ons.test.ts => RevisionsProcessor.test.ts} | 21 ++- .../{revisions.ts => RevisionsProcessor.ts} | 11 +- .../{slack.ts => SlackProcessor.ts} | 15 ++- .../{websockets.ts => WebsocketsProcessor.ts} | 6 +- server/queues/processors/emails.ts | 12 -- server/queues/processors/index.ts | 16 +++ server/queues/tasks/BaseTask.ts | 36 +++++ .../tasks/CleanupDeletedDocumentsTask.test.ts | 72 ++++++++++ .../tasks/CleanupDeletedDocumentsTask.ts | 40 ++++++ .../queues/tasks/CleanupDeletedTeamsTask.ts | 42 ++++++ .../CleanupExpiredFileOperationsTask.test.ts | 56 ++++++++ .../tasks/CleanupExpiredFileOperationsTask.ts | 40 ++++++ server/queues/tasks/EmailTask.ts | 15 +++ server/queues/tasks/index.ts | 16 +++ server/routes/api/documents.ts | 2 +- server/routes/api/utils.test.ts | 124 ------------------ server/routes/api/utils.ts | 65 +-------- server/routes/auth/providers/email.test.ts | 18 +-- server/routes/auth/providers/email.ts | 22 ++-- server/services/admin.ts | 8 +- server/services/websockets.ts | 8 +- server/services/worker.ts | 103 ++++++++------- 41 files changed, 729 insertions(+), 444 deletions(-) rename server/queues/processors/{backlinks.test.ts => BacklinksProcessor.test.ts} (76%) rename server/queues/processors/{backlinks.ts => BacklinksProcessor.ts} (90%) create mode 100644 server/queues/processors/BaseProcessor.ts rename server/queues/processors/{debouncer.ts => DebounceProcessor.ts} (72%) rename server/queues/processors/{exports.ts => ExportsProcessor.ts} (81%) rename server/queues/processors/{imports.ts => ImportsProcessor.ts} (85%) rename server/queues/processors/{notifications.test.ts => NotificationsProcessor.test.ts} (79%) rename server/queues/processors/{notifications.ts => NotificationsProcessor.ts} (80%) rename server/queues/processors/{revisions.test.ts => RevisionsProcessor.test.ts} (71%) rename server/queues/processors/{revisions.ts => RevisionsProcessor.ts} (72%) rename server/queues/processors/{slack.ts => SlackProcessor.ts} (89%) rename server/queues/processors/{websockets.ts => WebsocketsProcessor.ts} (99%) delete mode 100644 server/queues/processors/emails.ts create mode 100644 server/queues/processors/index.ts create mode 100644 server/queues/tasks/BaseTask.ts create mode 100644 server/queues/tasks/CleanupDeletedDocumentsTask.test.ts create mode 100644 server/queues/tasks/CleanupDeletedDocumentsTask.ts create mode 100644 server/queues/tasks/CleanupDeletedTeamsTask.ts create mode 100644 server/queues/tasks/CleanupExpiredFileOperationsTask.test.ts create mode 100644 server/queues/tasks/CleanupExpiredFileOperationsTask.ts create mode 100644 server/queues/tasks/EmailTask.ts create mode 100644 server/queues/tasks/index.ts diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 63516723a0..3c045329c2 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -45,7 +45,8 @@ server ├── policies - Authorization logic based on cancan ├── presenters - JSON presenters for database models, the interface between backend -> frontend ├── queues - Async queue definitions -│ └── processors - Processors perform async jobs, usually working on events from the event bus +│ └── processors - Processors perform jobs on events from the event bus +│ └── tasks - Tasks are arbitrary async jobs not from the event bus ├── services - Services start distinct portions of the application eg api, worker ├── static - Static assets ├── test - Test helpers and fixtures, tests themselves are colocated diff --git a/server/commands/accountProvisioner.test.ts b/server/commands/accountProvisioner.test.ts index b3a591ba10..c1070c7405 100644 --- a/server/commands/accountProvisioner.test.ts +++ b/server/commands/accountProvisioner.test.ts @@ -1,6 +1,6 @@ -import mailer from "@server/mailer"; import Collection from "@server/models/Collection"; import UserAuthentication from "@server/models/UserAuthentication"; +import EmailTask from "@server/queues/tasks/EmailTask"; import { buildUser, buildTeam } from "@server/test/factories"; import { flushdb } from "@server/test/support"; import accountProvisioner from "./accountProvisioner"; @@ -13,7 +13,7 @@ describe("accountProvisioner", () => { const ip = "127.0.0.1"; it("should create a new user and team", async () => { - const spy = jest.spyOn(mailer, "sendTemplate"); + const spy = jest.spyOn(EmailTask, "schedule"); const { user, team, isNewTeam, isNewUser } = await accountProvisioner({ ip, user: { @@ -55,7 +55,7 @@ describe("accountProvisioner", () => { }); it("should update exising user and authentication", async () => { - const spy = jest.spyOn(mailer, "sendTemplate"); + const spy = jest.spyOn(EmailTask, "schedule"); const existingTeam = await buildTeam(); const providers = await existingTeam.$get("authenticationProviders"); const authenticationProvider = providers[0]; @@ -149,7 +149,7 @@ describe("accountProvisioner", () => { }); it("should create a new user in an existing team", async () => { - const spy = jest.spyOn(mailer, "sendTemplate"); + const spy = jest.spyOn(EmailTask, "schedule"); const team = await buildTeam(); const authenticationProviders = await team.$get("authenticationProviders"); const authenticationProvider = authenticationProviders[0]; diff --git a/server/commands/accountProvisioner.ts b/server/commands/accountProvisioner.ts index edb5301952..7d4b3b952e 100644 --- a/server/commands/accountProvisioner.ts +++ b/server/commands/accountProvisioner.ts @@ -6,8 +6,8 @@ import { AuthenticationProviderDisabledError, } from "@server/errors"; import { APM } from "@server/logging/tracing"; -import mailer from "@server/mailer"; import { Collection, Team, User } from "@server/models"; +import EmailTask from "@server/queues/tasks/EmailTask"; import teamCreator from "./teamCreator"; import userCreator from "./userCreator"; @@ -89,9 +89,12 @@ async function accountProvisioner({ const { isNewUser, user } = result; if (isNewUser) { - await mailer.sendTemplate("welcome", { - to: user.email, - teamUrl: team.url, + await EmailTask.schedule({ + type: "welcome", + options: { + to: user.email, + teamUrl: team.url, + }, }); } diff --git a/server/commands/documentUpdater.ts b/server/commands/documentUpdater.ts index 3bcc552bd8..de4f41bc13 100644 --- a/server/commands/documentUpdater.ts +++ b/server/commands/documentUpdater.ts @@ -55,7 +55,7 @@ export default async function documentUpdater({ return; } - await Event.add({ + await Event.schedule({ name: "documents.update", documentId: document.id, collectionId: document.collectionId, diff --git a/server/commands/userInviter.ts b/server/commands/userInviter.ts index 4b2a7ea001..e1c6a83df9 100644 --- a/server/commands/userInviter.ts +++ b/server/commands/userInviter.ts @@ -2,8 +2,8 @@ import invariant from "invariant"; import { uniqBy } from "lodash"; import { Role } from "@shared/types"; import Logger from "@server/logging/logger"; -import mailer from "@server/mailer"; import { User, Event, Team } from "@server/models"; +import EmailTask from "@server/queues/tasks/EmailTask"; type Invite = { name: string; @@ -74,13 +74,17 @@ export default async function userInviter({ }, ip, }); - await mailer.sendTemplate("invite", { - to: invite.email, - name: invite.name, - actorName: user.name, - actorEmail: user.email, - teamName: team.name, - teamUrl: team.url, + + await EmailTask.schedule({ + type: "invite", + options: { + to: invite.email, + name: invite.name, + actorName: user.name, + actorEmail: user.email, + teamName: team.name, + teamUrl: team.url, + }, }); if (process.env.NODE_ENV === "development") { diff --git a/server/emails/CollectionNotificationEmail.tsx b/server/emails/CollectionNotificationEmail.tsx index bfbbb291ec..a79a1f664a 100644 --- a/server/emails/CollectionNotificationEmail.tsx +++ b/server/emails/CollectionNotificationEmail.tsx @@ -1,5 +1,5 @@ import * as React from "react"; -import { User, Collection } from "@server/models"; +import { Collection } from "@server/models"; import Body from "./components/Body"; import Button from "./components/Button"; import EmailTemplate from "./components/EmailLayout"; @@ -9,26 +9,23 @@ import Header from "./components/Header"; import Heading from "./components/Heading"; export type Props = { - actor: User; collection: Collection; eventName: string; unsubscribeUrl: string; }; export const collectionNotificationEmailText = ({ - actor, collection, eventName = "created", }: Props) => ` ${collection.name} -${actor.name} ${eventName} the collection "${collection.name}" +${collection.user.name} ${eventName} the collection "${collection.name}" Open Collection: ${process.env.URL}${collection.url} `; export const CollectionNotificationEmail = ({ - actor, collection, eventName = "created", unsubscribeUrl, @@ -40,7 +37,7 @@ export const CollectionNotificationEmail = ({ {collection.name}

- {actor.name} {eventName} the collection "{collection.name}". + {collection.user.name} {eventName} the collection "{collection.name}".

diff --git a/server/emails/DocumentNotificationEmail.tsx b/server/emails/DocumentNotificationEmail.tsx index 4fc12b8f4b..9622ef1aab 100644 --- a/server/emails/DocumentNotificationEmail.tsx +++ b/server/emails/DocumentNotificationEmail.tsx @@ -1,5 +1,5 @@ import * as React from "react"; -import { Document, User, Team, Collection } from "@server/models"; +import { Document } from "@server/models"; import Body from "./components/Body"; import Button from "./components/Button"; import EmailTemplate from "./components/EmailLayout"; @@ -9,34 +9,34 @@ import Header from "./components/Header"; import Heading from "./components/Heading"; export type Props = { - actor: User; - team: Team; document: Document; - collection: Collection; + actorName: string; + collectionName: string; eventName: string; + teamUrl: string; unsubscribeUrl: string; }; export const documentNotificationEmailText = ({ - actor, - team, + actorName, + teamUrl, document, - collection, + collectionName, eventName = "published", }: Props) => ` "${document.title}" ${eventName} -${actor.name} ${eventName} the document "${document.title}", in the ${collection.name} collection. +${actorName} ${eventName} the document "${document.title}", in the ${collectionName} collection. -Open Document: ${team.url}${document.url} +Open Document: ${teamUrl}${document.url} `; export const DocumentNotificationEmail = ({ - actor, - team, document, - collection, + actorName, + collectionName, eventName = "published", + teamUrl, unsubscribeUrl, }: Props) => { return ( @@ -48,15 +48,15 @@ export const DocumentNotificationEmail = ({ "{document.title}" {eventName}

- {actor.name} {eventName} the document "{document.title}", in the{" "} - {collection.name} collection. + {actorName} {eventName} the document "{document.title}", in the{" "} + {collectionName} collection.


{document.getSummary()}

- +

diff --git a/server/logging/logger.ts b/server/logging/logger.ts index 2e59411721..630c3c200b 100644 --- a/server/logging/logger.ts +++ b/server/logging/logger.ts @@ -11,6 +11,7 @@ type LogCategory = | "http" | "commands" | "processor" + | "task" | "email" | "queue" | "database" diff --git a/server/mailer.tsx b/server/mailer.tsx index 058e43f6d3..dc106beed7 100644 --- a/server/mailer.tsx +++ b/server/mailer.tsx @@ -1,13 +1,13 @@ +import invariant from "invariant"; import nodemailer from "nodemailer"; import Oy from "oy-vey"; import * as React from "react"; +import { Collection, Document } from "@server/models"; import { - Props as CollectionNotificationEmailT, CollectionNotificationEmail, collectionNotificationEmailText, } from "./emails/CollectionNotificationEmail"; import { - Props as DocumentNotificationEmailT, DocumentNotificationEmail, documentNotificationEmailText, } from "./emails/DocumentNotificationEmail"; @@ -28,7 +28,6 @@ import { SigninEmail, signinEmailText } from "./emails/SigninEmail"; import { WelcomeEmail, welcomeEmailText } from "./emails/WelcomeEmail"; import { baseStyles } from "./emails/components/EmailLayout"; import Logger from "./logging/logger"; -import { emailsQueue } from "./queues"; const useTestEmailService = process.env.NODE_ENV === "development" && !process.env.SMTP_USERNAME; @@ -223,49 +222,44 @@ export class Mailer { }); }; - documentNotification = async ( - opts: { - to: string; - } & DocumentNotificationEmailT - ) => { + documentNotification = async (opts: { + to: string; + eventName: string; + actorName: string; + documentId: string; + teamUrl: string; + collectionName: string; + unsubscribeUrl: string; + }) => { + const document = await Document.unscoped().findByPk(opts.documentId); + invariant(document, "Document not found"); + this.sendMail({ to: opts.to, - title: `“${opts.document.title}” ${opts.eventName}`, - previewText: `${opts.actor.name} ${opts.eventName} a new document`, - html: , - text: documentNotificationEmailText(opts), + title: `“${document.title}” ${opts.eventName}`, + previewText: `${opts.actorName} ${opts.eventName} a new document`, + html: , + text: documentNotificationEmailText({ ...opts, document }), }); }; - collectionNotification = async ( - opts: { - to: string; - } & CollectionNotificationEmailT - ) => { + collectionNotification = async (opts: { + to: string; + eventName: string; + collectionId: string; + unsubscribeUrl: string; + }) => { + const collection = await Collection.findByPk(opts.collectionId); + invariant(collection, "Collection not found"); + this.sendMail({ to: opts.to, - title: `“${opts.collection.name}” ${opts.eventName}`, - previewText: `${opts.actor.name} ${opts.eventName} a collection`, - html: , - text: collectionNotificationEmailText(opts), + title: `“${collection.name}” ${opts.eventName}`, + previewText: `${collection.user.name} ${opts.eventName} a collection`, + html: , + text: collectionNotificationEmailText({ ...opts, collection }), }); }; - - sendTemplate = (type: EmailTypes, opts: Record = {}) => { - return emailsQueue.add( - { - type, - opts, - }, - { - attempts: 5, - backoff: { - type: "exponential", - delay: 60 * 1000, - }, - } - ); - }; } const mailer = new Mailer(); diff --git a/server/models/Event.ts b/server/models/Event.ts index 1180b731b9..1cef5359cf 100644 --- a/server/models/Event.ts +++ b/server/models/Event.ts @@ -86,9 +86,9 @@ class Event extends BaseModel { @Column(DataType.UUID) teamId: string; - // add can be used to send events into the event system without recording them - // in the database or audit trail - static add(event: Partial) { + // Schedule can be used to send events into the event system without recording + // them in the database or audit trail – consider using a task instead. + static schedule(event: Partial) { const now = new Date(); globalEventQueue.add( this.build({ diff --git a/server/queues/index.ts b/server/queues/index.ts index 1c08c3e864..2ec47e83b8 100644 --- a/server/queues/index.ts +++ b/server/queues/index.ts @@ -4,6 +4,6 @@ export const globalEventQueue = createQueue("globalEvents"); export const processorEventQueue = createQueue("processorEvents"); -export const websocketsQueue = createQueue("websockets"); +export const websocketQueue = createQueue("websockets"); -export const emailsQueue = createQueue("emails"); +export const taskQueue = createQueue("tasks"); diff --git a/server/queues/processors/backlinks.test.ts b/server/queues/processors/BacklinksProcessor.test.ts similarity index 76% rename from server/queues/processors/backlinks.test.ts rename to server/queues/processors/BacklinksProcessor.test.ts index 270299bbc9..20f1462f21 100644 --- a/server/queues/processors/backlinks.test.ts +++ b/server/queues/processors/BacklinksProcessor.test.ts @@ -1,9 +1,10 @@ import { Backlink } from "@server/models"; import { buildDocument } from "@server/test/factories"; import { flushdb } from "@server/test/support"; -import BacklinksService from "./backlinks"; +import BacklinksProcessor from "./BacklinksProcessor"; + +const ip = "127.0.0.1"; -const Backlinks = new BacklinksService(); beforeEach(() => flushdb()); beforeEach(jest.resetAllMocks); @@ -13,13 +14,16 @@ describe("documents.publish", () => { const document = await buildDocument({ text: `[this is a link](${otherDocument.url})`, }); - // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.publish"; doc... Remove this comment to see the full error message - await Backlinks.on({ + + const processor = new BacklinksProcessor(); + await processor.perform({ name: "documents.publish", documentId: document.id, collectionId: document.collectionId, teamId: document.teamId, actorId: document.createdById, + data: { title: document.title }, + ip, }); const backlinks = await Backlink.findAll({ where: { @@ -38,13 +42,16 @@ describe("documents.publish", () => { }); document.text = `[this is a link](${otherDocument.url})`; await document.save(); - // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.publish"; doc... Remove this comment to see the full error message - await Backlinks.on({ + + const processor = new BacklinksProcessor(); + await processor.perform({ name: "documents.publish", documentId: document.id, collectionId: document.collectionId, teamId: document.teamId, actorId: document.createdById, + data: { title: document.title }, + ip, }); const backlinks = await Backlink.findAll({ where: { @@ -61,13 +68,17 @@ describe("documents.update", () => { const document = await buildDocument({ text: `[this is a link](${otherDocument.url})`, }); - // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.update"; docu... Remove this comment to see the full error message - await Backlinks.on({ + + const processor = new BacklinksProcessor(); + await processor.perform({ name: "documents.update", documentId: document.id, collectionId: document.collectionId, teamId: document.teamId, actorId: document.createdById, + createdAt: new Date().toISOString(), + data: { title: document.title, autosave: false, done: true }, + ip, }); const backlinks = await Backlink.findAll({ where: { @@ -85,13 +96,17 @@ describe("documents.update", () => { }); document.text = `[this is a link](${otherDocument.url})`; await document.save(); - // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.update"; docu... Remove this comment to see the full error message - await Backlinks.on({ + + const processor = new BacklinksProcessor(); + await processor.perform({ name: "documents.update", documentId: document.id, collectionId: document.collectionId, teamId: document.teamId, actorId: document.createdById, + createdAt: new Date().toISOString(), + data: { title: document.title, autosave: false, done: true }, + ip, }); const backlinks = await Backlink.findAll({ where: { @@ -106,13 +121,17 @@ describe("documents.update", () => { const document = await buildDocument(); document.text = `[this is a link](${otherDocument.url})`; await document.save(); - // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.update"; docu... Remove this comment to see the full error message - await Backlinks.on({ + + const processor = new BacklinksProcessor(); + await processor.perform({ name: "documents.update", documentId: document.id, collectionId: document.collectionId, teamId: document.teamId, actorId: document.createdById, + createdAt: new Date().toISOString(), + data: { title: document.title, autosave: false, done: true }, + ip, }); const backlinks = await Backlink.findAll({ where: { @@ -130,25 +149,31 @@ describe("documents.update", () => { [this is a another link](${yetAnotherDocument.url})`, }); - // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.publish"; doc... Remove this comment to see the full error message - await Backlinks.on({ + + const processor = new BacklinksProcessor(); + await processor.perform({ name: "documents.publish", documentId: document.id, collectionId: document.collectionId, teamId: document.teamId, actorId: document.createdById, + data: { title: document.title }, + ip, }); document.text = `First link is gone [this is a another link](${yetAnotherDocument.url})`; await document.save(); - // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.update"; docu... Remove this comment to see the full error message - await Backlinks.on({ + + await processor.perform({ name: "documents.update", documentId: document.id, collectionId: document.collectionId, teamId: document.teamId, actorId: document.createdById, + createdAt: new Date().toISOString(), + data: { title: document.title, autosave: false, done: true }, + ip, }); const backlinks = await Backlink.findAll({ where: { @@ -166,21 +191,27 @@ describe("documents.delete", () => { const document = await buildDocument(); document.text = `[this is a link](${otherDocument.url})`; await document.save(); - // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.update"; docu... Remove this comment to see the full error message - await Backlinks.on({ + + const processor = new BacklinksProcessor(); + await processor.perform({ name: "documents.update", documentId: document.id, collectionId: document.collectionId, teamId: document.teamId, actorId: document.createdById, + createdAt: new Date().toISOString(), + data: { title: document.title, autosave: false, done: true }, + ip, }); - // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.delete"; docu... Remove this comment to see the full error message - await Backlinks.on({ + + await processor.perform({ name: "documents.delete", documentId: document.id, collectionId: document.collectionId, teamId: document.teamId, actorId: document.createdById, + data: { title: document.title }, + ip, }); const backlinks = await Backlink.findAll({ where: { @@ -201,29 +232,33 @@ describe("documents.title_change", () => { document.text = `[${otherDocument.title}](${otherDocument.url})`; await document.save(); // ensure the backlinks are created - // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.update"; docu... Remove this comment to see the full error message - await Backlinks.on({ + const processor = new BacklinksProcessor(); + await processor.perform({ name: "documents.update", documentId: document.id, collectionId: document.collectionId, teamId: document.teamId, actorId: document.createdById, + createdAt: new Date().toISOString(), + data: { title: document.title, autosave: false, done: true }, + ip, }); // change the title of the linked doc otherDocument.title = newTitle; await otherDocument.save(); // does the text get updated with the new title - // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.title_change"... Remove this comment to see the full error message - await Backlinks.on({ + await processor.perform({ name: "documents.title_change", documentId: otherDocument.id, collectionId: otherDocument.collectionId, teamId: otherDocument.teamId, actorId: otherDocument.createdById, + createdAt: new Date().toISOString(), data: { previousTitle, title: newTitle, }, + ip, }); await document.reload(); expect(document.text).toBe(`[${newTitle}](${otherDocument.url})`); diff --git a/server/queues/processors/backlinks.ts b/server/queues/processors/BacklinksProcessor.ts similarity index 90% rename from server/queues/processors/backlinks.ts rename to server/queues/processors/BacklinksProcessor.ts index 8c34d48c87..2626f65d80 100644 --- a/server/queues/processors/backlinks.ts +++ b/server/queues/processors/BacklinksProcessor.ts @@ -1,11 +1,21 @@ import { Op } from "sequelize"; +import { APM } from "@server/logging/tracing"; import { Document, Backlink, Team } from "@server/models"; +import { Event, DocumentEvent, RevisionEvent } from "@server/types"; import parseDocumentIds from "@server/utils/parseDocumentIds"; import slugify from "@server/utils/slugify"; -import { DocumentEvent, RevisionEvent } from "../../types"; +import BaseProcessor from "./BaseProcessor"; -export default class BacklinksProcessor { - async on(event: DocumentEvent | RevisionEvent) { +@APM.trace() +export default class BacklinksProcessor extends BaseProcessor { + static applicableEvents: Event["name"][] = [ + "documents.publish", + "documents.update", + "documents.title_change", + "documents.delete", + ]; + + async perform(event: DocumentEvent | RevisionEvent) { switch (event.name) { case "documents.publish": { const document = await Document.findByPk(event.documentId); diff --git a/server/queues/processors/BaseProcessor.ts b/server/queues/processors/BaseProcessor.ts new file mode 100644 index 0000000000..b077068243 --- /dev/null +++ b/server/queues/processors/BaseProcessor.ts @@ -0,0 +1,7 @@ +import { Event } from "@server/types"; + +export default abstract class BaseProcessor { + static applicableEvents: Event["name"][] = []; + + public abstract perform(event: Event): Promise; +} diff --git a/server/queues/processors/debouncer.ts b/server/queues/processors/DebounceProcessor.ts similarity index 72% rename from server/queues/processors/debouncer.ts rename to server/queues/processors/DebounceProcessor.ts index 466614c8bb..3f6d9d42e4 100644 --- a/server/queues/processors/debouncer.ts +++ b/server/queues/processors/DebounceProcessor.ts @@ -1,9 +1,17 @@ +import { APM } from "@server/logging/tracing"; import Document from "@server/models/Document"; -import { globalEventQueue } from "../../queues"; -import { Event } from "../../types"; +import { Event } from "@server/types"; +import { globalEventQueue } from ".."; +import BaseProcessor from "./BaseProcessor"; -export default class DebounceProcessor { - async on(event: Event) { +@APM.trace() +export default class DebounceProcessor extends BaseProcessor { + static applicableEvents: Event["name"][] = [ + "documents.update", + "documents.update.delayed", + ]; + + async perform(event: Event) { switch (event.name) { case "documents.update": { globalEventQueue.add( diff --git a/server/queues/processors/exports.ts b/server/queues/processors/ExportsProcessor.ts similarity index 81% rename from server/queues/processors/exports.ts rename to server/queues/processors/ExportsProcessor.ts index 1d239ff729..a74b71b6e0 100644 --- a/server/queues/processors/exports.ts +++ b/server/queues/processors/ExportsProcessor.ts @@ -1,14 +1,20 @@ import fs from "fs"; import invariant from "invariant"; import Logger from "@server/logging/logger"; -import mailer from "@server/mailer"; import { FileOperation, Collection, Event, Team, User } from "@server/models"; +import EmailTask from "@server/queues/tasks/EmailTask"; import { Event as TEvent } from "@server/types"; import { uploadToS3FromBuffer } from "@server/utils/s3"; import { archiveCollections } from "@server/utils/zip"; +import BaseProcessor from "./BaseProcessor"; -export default class ExportsProcessor { - async on(event: TEvent) { +export default class ExportsProcessor extends BaseProcessor { + static applicableEvents: TEvent["name"][] = [ + "collections.export", + "collections.export_all", + ]; + + async perform(event: TEvent) { switch (event.name) { case "collections.export": case "collections.export_all": { @@ -82,15 +88,21 @@ export default class ExportsProcessor { }); if (state === "error") { - mailer.sendTemplate("exportFailure", { - to: user.email, - teamUrl: team.url, + await EmailTask.schedule({ + type: "exportFailure", + options: { + to: user.email, + teamUrl: team.url, + }, }); } else { - mailer.sendTemplate("exportSuccess", { - to: user.email, - id: fileOperation.id, - teamUrl: team.url, + await EmailTask.schedule({ + type: "exportSuccess", + options: { + to: user.email, + id: fileOperation.id, + teamUrl: team.url, + }, }); } } @@ -108,7 +120,7 @@ export default class ExportsProcessor { data: Partial ) { await fileOperation.update(data); - await Event.add({ + await Event.schedule({ name: "fileOperations.update", teamId, actorId, diff --git a/server/queues/processors/imports.ts b/server/queues/processors/ImportsProcessor.ts similarity index 85% rename from server/queues/processors/imports.ts rename to server/queues/processors/ImportsProcessor.ts index 2dd35da2b2..813af09c19 100644 --- a/server/queues/processors/imports.ts +++ b/server/queues/processors/ImportsProcessor.ts @@ -4,10 +4,13 @@ import File from "formidable/lib/file"; import invariant from "invariant"; import collectionImporter from "@server/commands/collectionImporter"; import { Event, FileOperation, Attachment, User } from "@server/models"; -import { Event as TEvent } from "../../types"; +import { Event as TEvent } from "@server/types"; +import BaseProcessor from "./BaseProcessor"; -export default class ImportsProcessor { - async on(event: TEvent) { +export default class ImportsProcessor extends BaseProcessor { + static applicableEvents: TEvent["name"][] = ["collections.import"]; + + async perform(event: TEvent) { switch (event.name) { case "collections.import": { let state, error; @@ -27,7 +30,7 @@ export default class ImportsProcessor { teamId: user.teamId, }); - await Event.add({ + await Event.schedule({ name: "fileOperations.create", modelId: fileOperation.id, teamId: user.teamId, @@ -59,7 +62,7 @@ export default class ImportsProcessor { error = err.message; } finally { await fileOperation.update({ state, error }); - await Event.add({ + await Event.schedule({ name: "fileOperations.update", modelId: fileOperation.id, teamId: user.teamId, diff --git a/server/queues/processors/notifications.test.ts b/server/queues/processors/NotificationsProcessor.test.ts similarity index 79% rename from server/queues/processors/notifications.test.ts rename to server/queues/processors/NotificationsProcessor.test.ts index f6fe0a3730..90870fdd2c 100644 --- a/server/queues/processors/notifications.test.ts +++ b/server/queues/processors/NotificationsProcessor.test.ts @@ -1,16 +1,16 @@ -import mailer from "@server/mailer"; import { View, NotificationSetting } from "@server/models"; +import EmailTask from "@server/queues/tasks/EmailTask"; import { buildDocument, buildCollection, buildUser, } from "@server/test/factories"; import { flushdb } from "@server/test/support"; -import NotificationsService from "./notifications"; +import NotificationsProcessor from "./NotificationsProcessor"; -jest.mock("@server/mailer"); +jest.mock("@server/queues/tasks/EmailTask"); +const ip = "127.0.0.1"; -const Notifications = new NotificationsService(); beforeEach(() => flushdb()); beforeEach(jest.resetAllMocks); @@ -26,18 +26,20 @@ describe("documents.publish", () => { teamId: user.teamId, event: "documents.publish", }); - await Notifications.on({ + + const processor = new NotificationsProcessor(); + await processor.perform({ name: "documents.publish", documentId: document.id, collectionId: document.collectionId, teamId: document.teamId, actorId: document.createdById, - ip: "127.0.0.1", data: { title: document.title, }, + ip, }); - expect(mailer.documentNotification).not.toHaveBeenCalled(); + expect(EmailTask.schedule).not.toHaveBeenCalled(); }); test("should send a notification to other users in team", async () => { @@ -51,18 +53,19 @@ describe("documents.publish", () => { event: "documents.publish", }); - await Notifications.on({ + const processor = new NotificationsProcessor(); + await processor.perform({ name: "documents.publish", documentId: document.id, collectionId: document.collectionId, teamId: document.teamId, actorId: document.createdById, - ip: "127.0.0.1", data: { title: document.title, }, + ip, }); - expect(mailer.documentNotification).toHaveBeenCalled(); + expect(EmailTask.schedule).toHaveBeenCalled(); }); test("should not send a notification to users without collection access", async () => { @@ -80,18 +83,19 @@ describe("documents.publish", () => { teamId: user.teamId, event: "documents.publish", }); - await Notifications.on({ + const processor = new NotificationsProcessor(); + await processor.perform({ name: "documents.publish", documentId: document.id, collectionId: document.collectionId, teamId: document.teamId, actorId: document.createdById, - ip: "127.0.0.1", data: { title: document.title, }, + ip, }); - expect(mailer.documentNotification).not.toHaveBeenCalled(); + expect(EmailTask.schedule).not.toHaveBeenCalled(); }); }); @@ -108,13 +112,14 @@ describe("revisions.create", () => { teamId: collaborator.teamId, event: "documents.update", }); - await Notifications.on({ + const processor = new NotificationsProcessor(); + await processor.perform({ name: "revisions.create", documentId: document.id, collectionId: document.collectionId, teamId: document.teamId, }); - expect(mailer.documentNotification).toHaveBeenCalled(); + expect(EmailTask.schedule).toHaveBeenCalled(); }); test("should not send a notification if viewed since update", async () => { @@ -130,13 +135,15 @@ describe("revisions.create", () => { event: "documents.update", }); await View.touch(document.id, collaborator.id, true); - await Notifications.on({ + + const processor = new NotificationsProcessor(); + await processor.perform({ name: "revisions.create", documentId: document.id, collectionId: document.collectionId, teamId: document.teamId, }); - expect(mailer.documentNotification).not.toHaveBeenCalled(); + expect(EmailTask.schedule).not.toHaveBeenCalled(); }); test("should not send a notification to last editor", async () => { @@ -150,12 +157,13 @@ describe("revisions.create", () => { teamId: user.teamId, event: "documents.update", }); - await Notifications.on({ + const processor = new NotificationsProcessor(); + await processor.perform({ name: "revisions.create", documentId: document.id, collectionId: document.collectionId, teamId: document.teamId, }); - expect(mailer.documentNotification).not.toHaveBeenCalled(); + expect(EmailTask.schedule).not.toHaveBeenCalled(); }); }); diff --git a/server/queues/processors/notifications.ts b/server/queues/processors/NotificationsProcessor.ts similarity index 80% rename from server/queues/processors/notifications.ts rename to server/queues/processors/NotificationsProcessor.ts index a5e9fbd12e..54682c4372 100644 --- a/server/queues/processors/notifications.ts +++ b/server/queues/processors/NotificationsProcessor.ts @@ -1,6 +1,6 @@ import { Op } from "sequelize"; import Logger from "@server/logging/logger"; -import mailer from "@server/mailer"; +import { APM } from "@server/logging/tracing"; import { View, Document, @@ -15,9 +15,18 @@ import { RevisionEvent, Event, } from "@server/types"; +import EmailTask from "../tasks/EmailTask"; +import BaseProcessor from "./BaseProcessor"; -export default class NotificationsProcessor { - async on(event: Event) { +@APM.trace() +export default class NotificationsProcessor extends BaseProcessor { + static applicableEvents: Event["name"][] = [ + "documents.publish", + "revisions.create", + "collections.create", + ]; + + async perform(event: Event) { switch (event.name) { case "documents.publish": case "revisions.create": @@ -114,14 +123,17 @@ export default class NotificationsProcessor { continue; } - mailer.documentNotification({ - to: setting.user.email, - eventName, - document, - team, - collection, - actor: document.updatedBy, - unsubscribeUrl: setting.unsubscribeUrl, + await EmailTask.schedule({ + type: "documentNotification", + options: { + to: setting.user.email, + eventName, + documentId: document.id, + teamUrl: team.url, + actorName: document.updatedBy.name, + collectionName: collection.name, + unsubscribeUrl: setting.unsubscribeUrl, + }, }); } } @@ -165,12 +177,14 @@ export default class NotificationsProcessor { continue; } - mailer.collectionNotification({ - to: setting.user.email, - eventName: "created", - collection, - actor: collection.user, - unsubscribeUrl: setting.unsubscribeUrl, + await EmailTask.schedule({ + type: "collectionNotification", + options: { + to: setting.user.email, + eventName: "created", + collectionId: collection.id, + unsubscribeUrl: setting.unsubscribeUrl, + }, }); } } diff --git a/server/queues/processors/revisions.test.ts b/server/queues/processors/RevisionsProcessor.test.ts similarity index 71% rename from server/queues/processors/revisions.test.ts rename to server/queues/processors/RevisionsProcessor.test.ts index 514594b368..0c813d73f2 100644 --- a/server/queues/processors/revisions.test.ts +++ b/server/queues/processors/RevisionsProcessor.test.ts @@ -1,22 +1,27 @@ import { Revision } from "@server/models"; import { buildDocument } from "@server/test/factories"; import { flushdb } from "@server/test/support"; -import RevisionsService from "./revisions"; +import RevisionsProcessor from "./RevisionsProcessor"; + +const ip = "127.0.0.1"; -const Revisions = new RevisionsService(); beforeEach(() => flushdb()); beforeEach(jest.resetAllMocks); describe("documents.update.debounced", () => { test("should create a revision", async () => { const document = await buildDocument(); - // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.update.deboun... Remove this comment to see the full error message - await Revisions.on({ + + const processor = new RevisionsProcessor(); + await processor.perform({ name: "documents.update.debounced", documentId: document.id, collectionId: document.collectionId, teamId: document.teamId, actorId: document.createdById, + createdAt: new Date().toISOString(), + data: { title: document.title, autosave: false, done: true }, + ip, }); const amount = await Revision.count({ where: { @@ -29,13 +34,17 @@ describe("documents.update.debounced", () => { test("should not create a revision if identical to previous", async () => { const document = await buildDocument(); await Revision.createFromDocument(document); - // @ts-expect-error ts-migrate(2345) FIXME: Argument of type '{ name: "documents.update.deboun... Remove this comment to see the full error message - await Revisions.on({ + + const processor = new RevisionsProcessor(); + await processor.perform({ name: "documents.update.debounced", documentId: document.id, collectionId: document.collectionId, teamId: document.teamId, actorId: document.createdById, + createdAt: new Date().toISOString(), + data: { title: document.title, autosave: false, done: true }, + ip, }); const amount = await Revision.count({ where: { diff --git a/server/queues/processors/revisions.ts b/server/queues/processors/RevisionsProcessor.ts similarity index 72% rename from server/queues/processors/revisions.ts rename to server/queues/processors/RevisionsProcessor.ts index 9c0389aa44..856383b8d7 100644 --- a/server/queues/processors/revisions.ts +++ b/server/queues/processors/RevisionsProcessor.ts @@ -1,10 +1,15 @@ import invariant from "invariant"; import revisionCreator from "@server/commands/revisionCreator"; +import { APM } from "@server/logging/tracing"; import { Revision, Document, User } from "@server/models"; -import { DocumentEvent, RevisionEvent } from "../../types"; +import { DocumentEvent, RevisionEvent, Event } from "@server/types"; +import BaseProcessor from "./BaseProcessor"; -export default class RevisionsProcessor { - async on(event: DocumentEvent | RevisionEvent) { +@APM.trace() +export default class RevisionsProcessor extends BaseProcessor { + static applicableEvents: Event["name"][] = ["documents.update.debounced"]; + + async perform(event: DocumentEvent | RevisionEvent) { switch (event.name) { case "documents.update.debounced": { const document = await Document.findByPk(event.documentId); diff --git a/server/queues/processors/slack.ts b/server/queues/processors/SlackProcessor.ts similarity index 89% rename from server/queues/processors/slack.ts rename to server/queues/processors/SlackProcessor.ts index f793ee2776..a5d81f5407 100644 --- a/server/queues/processors/slack.ts +++ b/server/queues/processors/SlackProcessor.ts @@ -1,5 +1,6 @@ import fetch from "fetch-with-proxy"; import { Op } from "sequelize"; +import { APM } from "@server/logging/tracing"; import { Document, Integration, Collection, Team } from "@server/models"; import { presentSlackAttachment } from "@server/presenters"; import { @@ -7,10 +8,18 @@ import { IntegrationEvent, RevisionEvent, Event, -} from "../../types"; +} from "@server/types"; +import BaseProcessor from "./BaseProcessor"; -export default class SlackProcessor { - async on(event: Event) { +@APM.trace() +export default class SlackProcessor extends BaseProcessor { + static applicableEvents: Event["name"][] = [ + "documents.publish", + "revisions.create", + "integrations.create", + ]; + + async perform(event: Event) { switch (event.name) { case "documents.publish": case "revisions.create": diff --git a/server/queues/processors/websockets.ts b/server/queues/processors/WebsocketsProcessor.ts similarity index 99% rename from server/queues/processors/websockets.ts rename to server/queues/processors/WebsocketsProcessor.ts index 15bcf72e2e..16807e31de 100644 --- a/server/queues/processors/websockets.ts +++ b/server/queues/processors/WebsocketsProcessor.ts @@ -1,5 +1,7 @@ import { subHours } from "date-fns"; import { Op } from "sequelize"; +import { Server } from "socket.io"; +import { APM } from "@server/logging/tracing"; import { Document, Collection, @@ -17,8 +19,9 @@ import { } from "@server/presenters"; import { Event } from "../../types"; +@APM.trace() export default class WebsocketsProcessor { - async on(event: Event, socketio: any) { + async perform(event: Event, socketio: Server) { switch (event.name) { case "documents.publish": case "documents.restore": @@ -604,6 +607,7 @@ export default class WebsocketsProcessor { } default: + return; } } } diff --git a/server/queues/processors/emails.ts b/server/queues/processors/emails.ts deleted file mode 100644 index 1e2e9c7972..0000000000 --- a/server/queues/processors/emails.ts +++ /dev/null @@ -1,12 +0,0 @@ -import mailer, { EmailSendOptions, EmailTypes } from "../../mailer"; - -type EmailEvent = { - type: EmailTypes; - opts: EmailSendOptions; -}; - -export default class EmailsProcessor { - async on(event: EmailEvent) { - await mailer[event.type](event.opts); - } -} diff --git a/server/queues/processors/index.ts b/server/queues/processors/index.ts new file mode 100644 index 0000000000..fb911c4783 --- /dev/null +++ b/server/queues/processors/index.ts @@ -0,0 +1,16 @@ +import { requireDirectory } from "@server/utils/fs"; + +const processors = {}; + +requireDirectory(__dirname).forEach(([module, id]) => { + // @ts-expect-error ts-migrate(2339) FIXME: Property 'default' does not exist on type 'unknown' + const { default: Processor } = module; + + if (id === "index") { + return; + } + + processors[id] = Processor; +}); + +export default processors; diff --git a/server/queues/tasks/BaseTask.ts b/server/queues/tasks/BaseTask.ts new file mode 100644 index 0000000000..9d985121ac --- /dev/null +++ b/server/queues/tasks/BaseTask.ts @@ -0,0 +1,36 @@ +import { JobOptions } from "bull"; +import { taskQueue } from "../"; + +export enum TaskPriority { + Background = 40, + Low = 30, + Normal = 20, + High = 10, +} + +export default abstract class BaseTask { + public static schedule(props: T) { + // @ts-expect-error cannot create an instance of an abstract class, we wont + const task = new this(); + return taskQueue.add( + { + name: this.name, + props, + }, + task.options + ); + } + + public abstract perform(props: T): Promise; + + public get options(): JobOptions { + return { + priority: TaskPriority.Normal, + attempts: 5, + backoff: { + type: "exponential", + delay: 60 * 1000, + }, + }; + } +} diff --git a/server/queues/tasks/CleanupDeletedDocumentsTask.test.ts b/server/queues/tasks/CleanupDeletedDocumentsTask.test.ts new file mode 100644 index 0000000000..0c654d6cef --- /dev/null +++ b/server/queues/tasks/CleanupDeletedDocumentsTask.test.ts @@ -0,0 +1,72 @@ +import { subDays } from "date-fns"; +import { Document } from "@server/models"; +import { buildDocument } from "@server/test/factories"; +import { flushdb } from "@server/test/support"; +import CleanupDeletedDocumentsTask from "./CleanupDeletedDocumentsTask"; + +beforeEach(() => flushdb()); + +describe("CleanupDeletedDocumentsTask", () => { + it("should not destroy documents not deleted", async () => { + await buildDocument({ + publishedAt: new Date(), + }); + + const task = new CleanupDeletedDocumentsTask(); + await task.perform({ limit: 100 }); + + expect( + await Document.unscoped().count({ + paranoid: false, + }) + ).toEqual(1); + }); + + it("should not destroy documents deleted less than 30 days ago", async () => { + await buildDocument({ + publishedAt: new Date(), + deletedAt: subDays(new Date(), 25), + }); + + const task = new CleanupDeletedDocumentsTask(); + await task.perform({ limit: 100 }); + + expect( + await Document.unscoped().count({ + paranoid: false, + }) + ).toEqual(1); + }); + + it("should destroy documents deleted more than 30 days ago", async () => { + await buildDocument({ + publishedAt: new Date(), + deletedAt: subDays(new Date(), 60), + }); + + const task = new CleanupDeletedDocumentsTask(); + await task.perform({ limit: 100 }); + + expect( + await Document.unscoped().count({ + paranoid: false, + }) + ).toEqual(0); + }); + + it("should destroy draft documents deleted more than 30 days ago", async () => { + await buildDocument({ + publishedAt: undefined, + deletedAt: subDays(new Date(), 60), + }); + + const task = new CleanupDeletedDocumentsTask(); + await task.perform({ limit: 100 }); + + expect( + await Document.unscoped().count({ + paranoid: false, + }) + ).toEqual(0); + }); +}); diff --git a/server/queues/tasks/CleanupDeletedDocumentsTask.ts b/server/queues/tasks/CleanupDeletedDocumentsTask.ts new file mode 100644 index 0000000000..ed54e8f79e --- /dev/null +++ b/server/queues/tasks/CleanupDeletedDocumentsTask.ts @@ -0,0 +1,40 @@ +import { subDays } from "date-fns"; +import { Op } from "sequelize"; +import documentPermanentDeleter from "@server/commands/documentPermanentDeleter"; +import Logger from "@server/logging/logger"; +import { APM } from "@server/logging/tracing"; +import { Document } from "@server/models"; +import BaseTask, { TaskPriority } from "./BaseTask"; + +type Props = { + limit: number; +}; + +@APM.trace() +export default class CleanupDeletedDocumentsTask extends BaseTask { + public async perform({ limit }: Props) { + Logger.info( + "task", + `Permanently destroying upto ${limit} documents older than 30 days…` + ); + const documents = await Document.scope("withDrafts").findAll({ + attributes: ["id", "teamId", "text", "deletedAt"], + where: { + deletedAt: { + [Op.lt]: subDays(new Date(), 30), + }, + }, + paranoid: false, + limit, + }); + const countDeletedDocument = await documentPermanentDeleter(documents); + Logger.info("task", `Destroyed ${countDeletedDocument} documents`); + } + + public get options() { + return { + attempts: 1, + priority: TaskPriority.Background, + }; + } +} diff --git a/server/queues/tasks/CleanupDeletedTeamsTask.ts b/server/queues/tasks/CleanupDeletedTeamsTask.ts new file mode 100644 index 0000000000..d874a31bbe --- /dev/null +++ b/server/queues/tasks/CleanupDeletedTeamsTask.ts @@ -0,0 +1,42 @@ +import { subDays } from "date-fns"; +import { Op } from "sequelize"; +import teamPermanentDeleter from "@server/commands/teamPermanentDeleter"; +import Logger from "@server/logging/logger"; +import { APM } from "@server/logging/tracing"; +import { Team } from "@server/models"; +import BaseTask, { TaskPriority } from "./BaseTask"; + +type Props = { + limit: number; +}; + +@APM.trace() +export default class CleanupDeletedTeamsTask extends BaseTask { + public async perform({ limit }: Props) { + Logger.info( + "task", + `Permanently destroying upto ${limit} teams older than 30 days…` + ); + const teams = await Team.findAll({ + where: { + deletedAt: { + [Op.lt]: subDays(new Date(), 30), + }, + }, + paranoid: false, + limit, + }); + + for (const team of teams) { + await teamPermanentDeleter(team); + } + Logger.info("task", `Destroyed ${teams.length} teams`); + } + + public get options() { + return { + attempts: 1, + priority: TaskPriority.Background, + }; + } +} diff --git a/server/queues/tasks/CleanupExpiredFileOperationsTask.test.ts b/server/queues/tasks/CleanupExpiredFileOperationsTask.test.ts new file mode 100644 index 0000000000..e43ecdb47d --- /dev/null +++ b/server/queues/tasks/CleanupExpiredFileOperationsTask.test.ts @@ -0,0 +1,56 @@ +import { subDays } from "date-fns"; +import { FileOperation } from "@server/models"; +import { buildFileOperation } from "@server/test/factories"; +import { flushdb } from "@server/test/support"; +import CleanupExpiredFileOperationsTask from "./CleanupExpiredFileOperationsTask"; + +beforeEach(() => flushdb()); + +describe("CleanupExpiredFileOperationsTask", () => { + it("should expire exports older than 30 days ago", async () => { + await buildFileOperation({ + type: "export", + state: "complete", + createdAt: subDays(new Date(), 30), + }); + await buildFileOperation({ + type: "export", + state: "complete", + }); + + /* This is a test helper that creates a new task and runs it. */ + const task = new CleanupExpiredFileOperationsTask(); + await task.perform({ limit: 100 }); + + const data = await FileOperation.count({ + where: { + type: "export", + state: "expired", + }, + }); + expect(data).toEqual(1); + }); + + it("should not expire exports made less than 30 days ago", async () => { + await buildFileOperation({ + type: "export", + state: "complete", + createdAt: subDays(new Date(), 29), + }); + await buildFileOperation({ + type: "export", + state: "complete", + }); + + const task = new CleanupExpiredFileOperationsTask(); + await task.perform({ limit: 100 }); + + const data = await FileOperation.count({ + where: { + type: "export", + state: "expired", + }, + }); + expect(data).toEqual(0); + }); +}); diff --git a/server/queues/tasks/CleanupExpiredFileOperationsTask.ts b/server/queues/tasks/CleanupExpiredFileOperationsTask.ts new file mode 100644 index 0000000000..7544f22e88 --- /dev/null +++ b/server/queues/tasks/CleanupExpiredFileOperationsTask.ts @@ -0,0 +1,40 @@ +import { subDays } from "date-fns"; +import { Op } from "sequelize"; +import Logger from "@server/logging/logger"; +import { APM } from "@server/logging/tracing"; +import { FileOperation } from "@server/models"; +import BaseTask, { TaskPriority } from "./BaseTask"; + +type Props = { + limit: number; +}; + +@APM.trace() +export default class CleanupExpiredFileOperationsTask extends BaseTask { + public async perform({ limit }: Props) { + Logger.info("task", `Expiring export file operations older than 30 days…`); + const fileOperations = await FileOperation.unscoped().findAll({ + where: { + type: "export", + createdAt: { + [Op.lt]: subDays(new Date(), 30), + }, + state: { + [Op.ne]: "expired", + }, + }, + limit, + }); + await Promise.all( + fileOperations.map((fileOperation) => fileOperation.expire()) + ); + Logger.info("task", `Expired ${fileOperations.length} file operations`); + } + + public get options() { + return { + attempts: 1, + priority: TaskPriority.Background, + }; + } +} diff --git a/server/queues/tasks/EmailTask.ts b/server/queues/tasks/EmailTask.ts new file mode 100644 index 0000000000..362ca0ce03 --- /dev/null +++ b/server/queues/tasks/EmailTask.ts @@ -0,0 +1,15 @@ +import { APM } from "@server/logging/tracing"; +import mailer, { EmailSendOptions, EmailTypes } from "../../mailer"; +import BaseTask from "./BaseTask"; + +type Props = { + type: EmailTypes; + options: EmailSendOptions; +}; + +@APM.trace() +export default class EmailTask extends BaseTask { + public async perform(props: Props) { + await mailer[props.type](props.options); + } +} diff --git a/server/queues/tasks/index.ts b/server/queues/tasks/index.ts new file mode 100644 index 0000000000..96e8040ee9 --- /dev/null +++ b/server/queues/tasks/index.ts @@ -0,0 +1,16 @@ +import { requireDirectory } from "@server/utils/fs"; + +const tasks = {}; + +requireDirectory(__dirname).forEach(([module, id]) => { + // @ts-expect-error ts-migrate(2339) FIXME: Property 'default' does not exist on type 'unknown' + const { default: Task } = module; + + if (id === "index") { + return; + } + + tasks[id] = Task; +}); + +export default tasks; diff --git a/server/routes/api/documents.ts b/server/routes/api/documents.ts index e381cc5817..25d3e8b810 100644 --- a/server/routes/api/documents.ts +++ b/server/routes/api/documents.ts @@ -1024,7 +1024,7 @@ router.post("documents.update", auth(), async (ctx) => { } if (document.title !== previousTitle) { - Event.add({ + Event.schedule({ name: "documents.title_change", documentId: document.id, collectionId: document.collectionId, diff --git a/server/routes/api/utils.test.ts b/server/routes/api/utils.test.ts index 6b2b8c4692..7376e5a53b 100644 --- a/server/routes/api/utils.test.ts +++ b/server/routes/api/utils.test.ts @@ -1,8 +1,5 @@ -import { subDays } from "date-fns"; import TestServer from "fetch-test-server"; -import { Document, FileOperation } from "@server/models"; import webService from "@server/services/web"; -import { buildDocument, buildFileOperation } from "@server/test/factories"; import { flushdb } from "@server/test/support"; const app = webService(); @@ -12,127 +9,6 @@ beforeEach(() => flushdb()); afterAll(() => server.close()); describe("#utils.gc", () => { - it("should not destroy documents not deleted", async () => { - await buildDocument({ - publishedAt: new Date(), - }); - const res = await server.post("/api/utils.gc", { - body: { - token: process.env.UTILS_SECRET, - }, - }); - expect(res.status).toEqual(200); - expect( - await Document.unscoped().count({ - paranoid: false, - }) - ).toEqual(1); - }); - - it("should not destroy documents deleted less than 30 days ago", async () => { - await buildDocument({ - publishedAt: new Date(), - deletedAt: subDays(new Date(), 25), - }); - const res = await server.post("/api/utils.gc", { - body: { - token: process.env.UTILS_SECRET, - }, - }); - expect(res.status).toEqual(200); - expect( - await Document.unscoped().count({ - paranoid: false, - }) - ).toEqual(1); - }); - - it("should destroy documents deleted more than 30 days ago", async () => { - await buildDocument({ - publishedAt: new Date(), - deletedAt: subDays(new Date(), 60), - }); - const res = await server.post("/api/utils.gc", { - body: { - token: process.env.UTILS_SECRET, - }, - }); - expect(res.status).toEqual(200); - expect( - await Document.unscoped().count({ - paranoid: false, - }) - ).toEqual(0); - }); - - it("should destroy draft documents deleted more than 30 days ago", async () => { - await buildDocument({ - publishedAt: undefined, - deletedAt: subDays(new Date(), 60), - }); - const res = await server.post("/api/utils.gc", { - body: { - token: process.env.UTILS_SECRET, - }, - }); - expect(res.status).toEqual(200); - expect( - await Document.unscoped().count({ - paranoid: false, - }) - ).toEqual(0); - }); - - it("should expire exports older than 30 days ago", async () => { - await buildFileOperation({ - type: "export", - state: "complete", - createdAt: subDays(new Date(), 30), - }); - await buildFileOperation({ - type: "export", - state: "complete", - }); - const res = await server.post("/api/utils.gc", { - body: { - token: process.env.UTILS_SECRET, - }, - }); - const data = await FileOperation.count({ - where: { - type: "export", - state: "expired", - }, - }); - expect(res.status).toEqual(200); - expect(data).toEqual(1); - }); - - it("should not expire exports made less than 30 days ago", async () => { - await buildFileOperation({ - type: "export", - state: "complete", - createdAt: subDays(new Date(), 29), - }); - await buildFileOperation({ - type: "export", - state: "complete", - }); - const res = await server.post("/api/utils.gc", { - body: { - token: process.env.UTILS_SECRET, - }, - }); - const data = await FileOperation.count({ - where: { - type: "export", - state: "expired", - }, - }); - expect(res.status).toEqual(200); - expect(data).toEqual(0); - }); - it("should require authentication", async () => { const res = await server.post("/api/utils.gc"); expect(res.status).toEqual(401); diff --git a/server/routes/api/utils.ts b/server/routes/api/utils.ts index ccb9a9c080..2ad06fffb9 100644 --- a/server/routes/api/utils.ts +++ b/server/routes/api/utils.ts @@ -1,11 +1,8 @@ -import { subDays } from "date-fns"; import Router from "koa-router"; -import { Op } from "sequelize"; -import documentPermanentDeleter from "@server/commands/documentPermanentDeleter"; -import teamPermanentDeleter from "@server/commands/teamPermanentDeleter"; import { AuthenticationError } from "@server/errors"; -import { Document, Team, FileOperation } from "@server/models"; -import Logger from "../../logging/logger"; +import CleanupDeletedDocumentsTask from "@server/queues/tasks/CleanupDeletedDocumentsTask"; +import CleanupDeletedTeamsTask from "@server/queues/tasks/CleanupDeletedTeamsTask"; +import CleanupExpiredFileOperationsTask from "@server/queues/tasks/CleanupExpiredFileOperationsTask"; const router = new Router(); @@ -16,59 +13,11 @@ router.post("utils.gc", async (ctx) => { throw AuthenticationError("Invalid secret token"); } - Logger.info( - "utils", - `Permanently destroying upto ${limit} documents older than 30 days…` - ); - const documents = await Document.scope("withDrafts").findAll({ - attributes: ["id", "teamId", "text", "deletedAt"], - where: { - deletedAt: { - [Op.lt]: subDays(new Date(), 30), - }, - }, - paranoid: false, - limit, - }); - const countDeletedDocument = await documentPermanentDeleter(documents); - Logger.info("utils", `Destroyed ${countDeletedDocument} documents`); - Logger.info( - "utils", - `Expiring all the collection export older than 30 days…` - ); - const exports = await FileOperation.unscoped().findAll({ - where: { - type: "export", - createdAt: { - [Op.lt]: subDays(new Date(), 30), - }, - state: { - [Op.ne]: "expired", - }, - }, - }); - await Promise.all( - exports.map(async (e) => { - await e.expire(); - }) - ); - Logger.info( - "utils", - `Permanently destroying upto ${limit} teams older than 30 days…` - ); - const teams = await Team.findAll({ - where: { - deletedAt: { - [Op.lt]: subDays(new Date(), 30), - }, - }, - paranoid: false, - limit, - }); + await CleanupDeletedDocumentsTask.schedule({ limit }); - for (const team of teams) { - await teamPermanentDeleter(team); - } + await CleanupExpiredFileOperationsTask.schedule({ limit }); + + await CleanupDeletedTeamsTask.schedule({ limit }); ctx.body = { success: true, diff --git a/server/routes/auth/providers/email.test.ts b/server/routes/auth/providers/email.test.ts index 52cd77b9c4..5e151a5cd9 100644 --- a/server/routes/auth/providers/email.test.ts +++ b/server/routes/auth/providers/email.test.ts @@ -1,5 +1,5 @@ import TestServer from "fetch-test-server"; -import mailer from "@server/mailer"; +import EmailTask from "@server/queues/tasks/EmailTask"; import webService from "@server/services/web"; import { buildUser, buildGuestUser, buildTeam } from "@server/test/factories"; import { flushdb } from "@server/test/support"; @@ -24,7 +24,7 @@ describe("email", () => { }); it("should respond with redirect location when user is SSO enabled", async () => { - const spy = jest.spyOn(mailer, "sendTemplate"); + const spy = jest.spyOn(EmailTask, "schedule"); const user = await buildUser(); const res = await server.post("/auth/email", { body: { @@ -42,7 +42,7 @@ describe("email", () => { process.env.URL = "http://localoutline.com"; process.env.SUBDOMAINS_ENABLED = "true"; const user = await buildUser(); - const spy = jest.spyOn(mailer, "sendTemplate"); + const spy = jest.spyOn(EmailTask, "schedule"); await buildTeam({ subdomain: "example", }); @@ -62,7 +62,7 @@ describe("email", () => { }); it("should respond with success when user is not SSO enabled", async () => { - const spy = jest.spyOn(mailer, "sendTemplate"); + const spy = jest.spyOn(EmailTask, "schedule"); const user = await buildGuestUser(); const res = await server.post("/auth/email", { body: { @@ -77,7 +77,7 @@ describe("email", () => { }); it("should respond with success regardless of whether successful to prevent crawling email logins", async () => { - const spy = jest.spyOn(mailer, "sendTemplate"); + const spy = jest.spyOn(EmailTask, "schedule"); const res = await server.post("/auth/email", { body: { email: "user@example.com", @@ -91,7 +91,7 @@ describe("email", () => { }); describe("with multiple users matching email", () => { it("should default to current subdomain with SSO", async () => { - const spy = jest.spyOn(mailer, "sendTemplate"); + const spy = jest.spyOn(EmailTask, "schedule"); process.env.URL = "http://localoutline.com"; process.env.SUBDOMAINS_ENABLED = "true"; const email = "sso-user@example.org"; @@ -121,7 +121,7 @@ describe("email", () => { }); it("should default to current subdomain with guest email", async () => { - const spy = jest.spyOn(mailer, "sendTemplate"); + const spy = jest.spyOn(EmailTask, "schedule"); process.env.URL = "http://localoutline.com"; process.env.SUBDOMAINS_ENABLED = "true"; const email = "guest-user@example.org"; @@ -151,7 +151,7 @@ describe("email", () => { }); it("should default to custom domain with SSO", async () => { - const spy = jest.spyOn(mailer, "sendTemplate"); + const spy = jest.spyOn(EmailTask, "schedule"); const email = "sso-user-2@example.org"; const team = await buildTeam({ domain: "docs.mycompany.com", @@ -179,7 +179,7 @@ describe("email", () => { }); it("should default to custom domain with guest email", async () => { - const spy = jest.spyOn(mailer, "sendTemplate"); + const spy = jest.spyOn(EmailTask, "schedule"); const email = "guest-user-2@example.org"; const team = await buildTeam({ domain: "docs.mycompany.com", diff --git a/server/routes/auth/providers/email.ts b/server/routes/auth/providers/email.ts index 5a5383cc9e..4ce575a872 100644 --- a/server/routes/auth/providers/email.ts +++ b/server/routes/auth/providers/email.ts @@ -3,10 +3,10 @@ import Router from "koa-router"; import { find } from "lodash"; import { parseDomain, isCustomSubdomain } from "@shared/utils/domains"; import { AuthorizationError } from "@server/errors"; -import mailer from "@server/mailer"; import errorHandling from "@server/middlewares/errorHandling"; import methodOverride from "@server/middlewares/methodOverride"; import { User, Team } from "@server/models"; +import EmailTask from "@server/queues/tasks/EmailTask"; import { signIn } from "@server/utils/authentication"; import { isCustomDomain } from "@server/utils/domains"; import { getUserForEmailSigninToken } from "@server/utils/jwt"; @@ -110,10 +110,13 @@ router.post("email", errorHandling(), async (ctx) => { } // send email to users registered address with a short-lived token - await mailer.sendTemplate("signin", { - to: user.email, - token: user.getEmailSigninToken(), - teamUrl: team.url, + await EmailTask.schedule({ + type: "signin", + options: { + to: user.email, + token: user.getEmailSigninToken(), + teamUrl: team.url, + }, }); user.lastSigninEmailSentAt = new Date(); await user.save(); @@ -147,9 +150,12 @@ router.get("email.callback", async (ctx) => { } if (user.isInvited) { - await mailer.sendTemplate("welcome", { - to: user.email, - teamUrl: user.team.url, + await EmailTask.schedule({ + type: "welcome", + options: { + to: user.email, + teamUrl: user.team.url, + }, }); } diff --git a/server/services/admin.ts b/server/services/admin.ts index 189069781c..182d414613 100644 --- a/server/services/admin.ts +++ b/server/services/admin.ts @@ -3,10 +3,10 @@ import { BullAdapter } from "@bull-board/api/bullAdapter"; import { KoaAdapter } from "@bull-board/koa"; import Koa from "koa"; import { - emailsQueue, globalEventQueue, processorEventQueue, - websocketsQueue, + websocketQueue, + taskQueue, } from "../queues"; export default function init(app: Koa) { @@ -15,8 +15,8 @@ export default function init(app: Koa) { queues: [ new BullAdapter(globalEventQueue), new BullAdapter(processorEventQueue), - new BullAdapter(emailsQueue), - new BullAdapter(websocketsQueue), + new BullAdapter(websocketQueue), + new BullAdapter(taskQueue), ], serverAdapter, }); diff --git a/server/services/websockets.ts b/server/services/websockets.ts index b3069bc2df..52228de852 100644 --- a/server/services/websockets.ts +++ b/server/services/websockets.ts @@ -9,8 +9,8 @@ import Metrics from "@server/logging/metrics"; import { Document, Collection, View } from "@server/models"; import { can } from "@server/policies"; import { getUserForJWT } from "@server/utils/jwt"; -import { websocketsQueue } from "../queues"; -import WebsocketsProcessor from "../queues/processors/websockets"; +import { websocketQueue } from "../queues"; +import WebsocketsProcessor from "../queues/processors/WebsocketsProcessor"; import { client, subscriber } from "../redis"; export default function init(app: Koa, server: http.Server) { @@ -247,9 +247,9 @@ export default function init(app: Koa, server: http.Server) { // Handle events from event queue that should be sent to the clients down ws const websockets = new WebsocketsProcessor(); - websocketsQueue.process(async function websocketEventsProcessor(job) { + websocketQueue.process(async function websocketEventsProcessor(job) { const event = job.data; - websockets.on(event, io).catch((error) => { + websockets.perform(event, io).catch((error) => { Logger.error("Error processing websocket event", error, { event, }); diff --git a/server/services/worker.ts b/server/services/worker.ts index 7040f45034..3473bf0f94 100644 --- a/server/services/worker.ts +++ b/server/services/worker.ts @@ -2,68 +2,77 @@ import Logger from "@server/logging/logger"; import { globalEventQueue, processorEventQueue, - websocketsQueue, - emailsQueue, + websocketQueue, + taskQueue, } from "../queues"; -import Backlinks from "../queues/processors/backlinks"; -import Debouncer from "../queues/processors/debouncer"; -import Emails from "../queues/processors/emails"; -import Exports from "../queues/processors/exports"; -import Imports from "../queues/processors/imports"; -import Notifications from "../queues/processors/notifications"; -import Revisions from "../queues/processors/revisions"; -import Slack from "../queues/processors/slack"; - -const EmailsProcessor = new Emails(); -const eventProcessors = { - backlinks: new Backlinks(), - debouncer: new Debouncer(), - imports: new Imports(), - exports: new Exports(), - notifications: new Notifications(), - revisions: new Revisions(), - slack: new Slack(), -}; +import processors from "../queues/processors"; +import tasks from "../queues/tasks"; export default function init() { - // this queue processes global events and hands them off to services + // This queue processes the global event bus globalEventQueue.process(function (job) { - Object.keys(eventProcessors).forEach((name) => { - processorEventQueue.add({ ...job.data, service: name }); - }); - websocketsQueue.add(job.data); - }); - processorEventQueue.process(function (job) { const event = job.data; - const processor = eventProcessors[event.service]; - if (!processor) { - Logger.warn(`Received event for processor that isn't registered`, event); - return; + // For each registered processor we check to see if it wants to handle the + // event (applicableEvents), and if so add a new queued job specifically + // for that processor. + for (const name in processors) { + const ProcessorClass = processors[name]; + + if (name === "WebsocketsProcessor") { + // websockets are a special case on their own queue because they must + // only be consumed by the websockets service rather than workers. + websocketQueue.add(job.data); + } else if ( + ProcessorClass.applicableEvents.length === 0 || + ProcessorClass.applicableEvents.includes(event.name) + ) { + processorEventQueue.add({ event, name }); + } + } + }); + + // Jobs for individual processors are processed here. Only applicable events + // as unapplicable events were filtered in the global event queue above. + processorEventQueue.process(function (job) { + const { event, name } = job.data; + const ProcessorClass = processors[name]; + + if (!ProcessorClass) { + throw new Error( + `Received event "${event.name}" for processor (${name}) that isn't registered. Check the file name matches the class name.` + ); } - if (processor.on) { - Logger.info("processor", `${event.service} processing ${event.name}`, { + const processor = new ProcessorClass(); + + if (processor.perform) { + Logger.info("processor", `${name} processing ${event.name}`, { name: event.name, modelId: event.modelId, }); - processor.on(event).catch((error: Error) => { - Logger.error( - `Error processing ${event.name} in ${event.service}`, - error, - event - ); + processor.perform(event).catch((error: Error) => { + Logger.error(`Error processing ${event.name} in ${name}`, error, event); }); } }); - emailsQueue.process(function (job) { - const event = job.data; - EmailsProcessor.on(event).catch((error) => { - Logger.error( - `Error processing ${event.name} in emails processor`, - error, - event + + // Jobs for async tasks are processed here. + taskQueue.process(function (job) { + const { name, props } = job.data; + const TaskClass = tasks[name]; + + if (!TaskClass) { + throw new Error( + `Task "${name}" is not registered. Check the file name matches the class name.` ); + } + + Logger.info("task", `${name} triggered`, props); + + const task = new TaskClass(); + task.perform(props).catch((error: Error) => { + Logger.error(`Error processing task in ${name}`, error, props); }); }); }