From 42959d66db17a39658cdfddbad20391794b50b7e Mon Sep 17 00:00:00 2001 From: Tom Moor Date: Thu, 27 Nov 2025 16:57:52 +0100 Subject: [PATCH] chore: Add cron task partitioning (#10736) * wip * Implementation complete * tidying * test * Address feedback * Remove duplicative retry logic from UpdateDocumentsPopularityScoreTask. Now that we're split across many runs this is not neccessary * Refactor to subclass, config to instance * Refactor BaseTask to named export * fix: Missing partition * tsc * Feedback --- .../github/server/tasks/GitHubWebhookTask.ts | 2 +- .../tasks/UploadLinearWorkspaceLogoTask.ts | 2 +- .../CleanupWebhookDeliveriesTask.test.ts | 8 +- .../tasks/CleanupWebhookDeliveriesTask.ts | 27 +- .../server/tasks/DeliverWebhookTask.ts | 2 +- server/emails/templates/BaseEmail.tsx | 2 +- server/queues/tasks/APIImportTask.ts | 2 +- server/queues/tasks/CacheIssueSourcesTask.ts | 2 +- .../tasks/CleanupDeletedDocumentsTask.test.ts | 16 +- .../tasks/CleanupDeletedDocumentsTask.ts | 22 +- server/queues/tasks/CleanupDeletedTeamTask.ts | 2 +- .../queues/tasks/CleanupDeletedTeamsTask.ts | 22 +- server/queues/tasks/CleanupDemotedUserTask.ts | 2 +- .../tasks/CleanupExpiredAttachmentsTask.ts | 17 +- .../CleanupExpiredFileOperationsTask.test.ts | 12 +- .../tasks/CleanupExpiredFileOperationsTask.ts | 17 +- .../CleanupOAuthAuthorizationCodeTask.ts | 15 +- server/queues/tasks/CleanupOldEventsTask.ts | 23 +- server/queues/tasks/CleanupOldImportsTask.ts | 15 +- .../tasks/CleanupOldNotificationsTask.ts | 21 +- .../CollectionAddUserNotificationsTask.ts | 2 +- .../CollectionCreatedNotificationsTask.ts | 2 +- .../CollectionSubscriptionRemoveUserTask.ts | 2 +- .../tasks/CommentCreatedNotificationsTask.ts | 2 +- .../tasks/CommentUpdatedNotificationsTask.ts | 2 +- server/queues/tasks/DeleteAttachmentTask.ts | 2 +- .../tasks/DetachDraftsFromCollectionTask.ts | 2 +- .../DocumentAddGroupNotificationsTask.ts | 2 +- .../tasks/DocumentAddUserNotificationsTask.ts | 2 +- server/queues/tasks/DocumentImportTask.ts | 2 +- .../DocumentPublishedNotificationsTask.ts | 2 +- .../DocumentSubscriptionRemoveUserTask.ts | 2 +- server/queues/tasks/DocumentUpdateTextTask.ts | 2 +- server/queues/tasks/EmailTask.ts | 2 +- server/queues/tasks/EmptyTrashTask.ts | 2 +- .../ErrorTimedOutFileOperationsTask.test.ts | 12 +- .../tasks/ErrorTimedOutFileOperationsTask.ts | 17 +- .../queues/tasks/ErrorTimedOutImportsTask.ts | 17 +- server/queues/tasks/ExportTask.ts | 2 +- server/queues/tasks/ImportTask.ts | 2 +- server/queues/tasks/InviteReminderTask.ts | 15 +- .../tasks/ReactionCreatedNotificationsTask.ts | 2 +- .../tasks/ReactionRemovedNotificationsTask.ts | 2 +- .../tasks/RevisionCreatedNotificationsTask.ts | 2 +- .../UpdateDocumentsPopularityScoreTask.ts | 122 ++++--- .../tasks/UpdateTeamAttachmentsSizeTask.ts | 2 +- .../tasks/UpdateTeamsAttachmentsSizeTask.ts | 13 +- .../tasks/UploadAttachmentFromUrlTask.ts | 2 +- .../tasks/UploadAttachmentsForImportTask.ts | 2 +- server/queues/tasks/UploadTeamAvatarTask.ts | 2 +- server/queues/tasks/UploadUserAvatarTask.ts | 2 +- server/queues/tasks/ValidateSSOAccessTask.ts | 2 +- server/queues/tasks/{ => base}/BaseTask.ts | 17 +- server/queues/tasks/base/CronTask.test.ts | 297 ++++++++++++++++++ server/queues/tasks/base/CronTask.ts | 163 ++++++++++ server/queues/tasks/index.ts | 2 +- server/routes/api/cron/cron.ts | 79 +++-- server/services/cron.ts | 33 +- server/utils/PluginManager.ts | 2 +- 59 files changed, 811 insertions(+), 261 deletions(-) rename server/queues/tasks/{ => base}/BaseTask.ts (78%) create mode 100644 server/queues/tasks/base/CronTask.test.ts create mode 100644 server/queues/tasks/base/CronTask.ts diff --git a/plugins/github/server/tasks/GitHubWebhookTask.ts b/plugins/github/server/tasks/GitHubWebhookTask.ts index 61e710f147..912413abae 100644 --- a/plugins/github/server/tasks/GitHubWebhookTask.ts +++ b/plugins/github/server/tasks/GitHubWebhookTask.ts @@ -1,5 +1,5 @@ import { IntegrationService } from "@shared/types"; -import BaseTask from "@server/queues/tasks/BaseTask"; +import { BaseTask } from "@server/queues/tasks/base/BaseTask"; import { Hook, PluginManager } from "@server/utils/PluginManager"; type Props = { diff --git a/plugins/linear/server/tasks/UploadLinearWorkspaceLogoTask.ts b/plugins/linear/server/tasks/UploadLinearWorkspaceLogoTask.ts index a0a815ef2d..63d1067dbf 100644 --- a/plugins/linear/server/tasks/UploadLinearWorkspaceLogoTask.ts +++ b/plugins/linear/server/tasks/UploadLinearWorkspaceLogoTask.ts @@ -1,7 +1,7 @@ import { IntegrationService, IntegrationType } from "@shared/types"; import { Integration } from "@server/models"; import { Buckets } from "@server/models/helpers/AttachmentHelper"; -import BaseTask, { TaskPriority } from "@server/queues/tasks/BaseTask"; +import { BaseTask, TaskPriority } from "@server/queues/tasks/base/BaseTask"; import FileStorage from "@server/storage/files"; type Props = { diff --git a/plugins/webhooks/server/tasks/CleanupWebhookDeliveriesTask.test.ts b/plugins/webhooks/server/tasks/CleanupWebhookDeliveriesTask.test.ts index a66fc13fa4..798a2bea01 100644 --- a/plugins/webhooks/server/tasks/CleanupWebhookDeliveriesTask.test.ts +++ b/plugins/webhooks/server/tasks/CleanupWebhookDeliveriesTask.test.ts @@ -21,7 +21,13 @@ describe("CleanupWebookDeliveriesTask", () => { }); const task = new CleanupWebhookDeliveriesTask(); - await task.perform(); + await task.perform({ + limit: 100, + partition: { + partitionIndex: 0, + partitionCount: 1, + }, + }); expect(await deliveryExists(brandNewWebhookDelivery)).toBe(true); expect(await deliveryExists(newishWebhookDelivery)).toBe(true); diff --git a/plugins/webhooks/server/tasks/CleanupWebhookDeliveriesTask.ts b/plugins/webhooks/server/tasks/CleanupWebhookDeliveriesTask.ts index 46657afbd3..e08f2fab4a 100644 --- a/plugins/webhooks/server/tasks/CleanupWebhookDeliveriesTask.ts +++ b/plugins/webhooks/server/tasks/CleanupWebhookDeliveriesTask.ts @@ -2,28 +2,35 @@ import { subDays } from "date-fns"; import { Op } from "sequelize"; import Logger from "@server/logging/Logger"; import { WebhookDelivery } from "@server/models"; -import BaseTask, { - TaskPriority, - TaskSchedule, -} from "@server/queues/tasks/BaseTask"; +import { TaskPriority } from "@server/queues/tasks/base/BaseTask"; +import { + CronTask, + TaskInterval, + Props, +} from "@server/queues/tasks/base/CronTask"; +import { Hour } from "@shared/utils/time"; -type Props = Record; - -export default class CleanupWebhookDeliveriesTask extends BaseTask { - static cron = TaskSchedule.Day; - - public async perform() { +export default class CleanupWebhookDeliveriesTask extends CronTask { + public async perform({ partition }: Props) { Logger.info("task", `Deleting WebhookDeliveries older than one week…`); const count = await WebhookDelivery.unscoped().destroy({ where: { createdAt: { [Op.lt]: subDays(new Date(), 7), }, + ...this.getPartitionWhereClause("id", partition), }, }); Logger.info("task", `${count} old WebhookDeliveries deleted.`); } + public get cron() { + return { + interval: TaskInterval.Day, + partitionWindow: Hour.ms, + }; + } + public get options() { return { attempts: 1, diff --git a/plugins/webhooks/server/tasks/DeliverWebhookTask.ts b/plugins/webhooks/server/tasks/DeliverWebhookTask.ts index 911bed660b..f9c9395fcc 100644 --- a/plugins/webhooks/server/tasks/DeliverWebhookTask.ts +++ b/plugins/webhooks/server/tasks/DeliverWebhookTask.ts @@ -42,7 +42,7 @@ import { presentGroupMembership, presentComment, } from "@server/presenters"; -import BaseTask from "@server/queues/tasks/BaseTask"; +import { BaseTask } from "@server/queues/tasks/base/BaseTask"; import { CollectionEvent, CollectionGroupEvent, diff --git a/server/emails/templates/BaseEmail.tsx b/server/emails/templates/BaseEmail.tsx index 6f869ae468..88816ac7ab 100644 --- a/server/emails/templates/BaseEmail.tsx +++ b/server/emails/templates/BaseEmail.tsx @@ -16,7 +16,7 @@ import HTMLHelper from "@server/models/helpers/HTMLHelper"; import { ProsemirrorHelper } from "@server/models/helpers/ProsemirrorHelper"; import { TextHelper } from "@server/models/helpers/TextHelper"; import { taskQueue } from "@server/queues"; -import { TaskPriority } from "@server/queues/tasks/BaseTask"; +import { TaskPriority } from "@server/queues/tasks/base/BaseTask"; import { NotificationMetadata } from "@server/types"; export enum EmailMessageCategory { diff --git a/server/queues/tasks/APIImportTask.ts b/server/queues/tasks/APIImportTask.ts index 7294aedec0..600ef9c30a 100644 --- a/server/queues/tasks/APIImportTask.ts +++ b/server/queues/tasks/APIImportTask.ts @@ -23,7 +23,7 @@ import AttachmentHelper from "@server/models/helpers/AttachmentHelper"; import { ProsemirrorHelper } from "@server/models/helpers/ProsemirrorHelper"; import { sequelize } from "@server/storage/database"; import { PagePerImportTask } from "../processors/ImportsProcessor"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; import UploadAttachmentsForImportTask from "./UploadAttachmentsForImportTask"; export type ProcessOutput = { diff --git a/server/queues/tasks/CacheIssueSourcesTask.ts b/server/queues/tasks/CacheIssueSourcesTask.ts index b824a9f3b6..8bf1553bcc 100644 --- a/server/queues/tasks/CacheIssueSourcesTask.ts +++ b/server/queues/tasks/CacheIssueSourcesTask.ts @@ -1,7 +1,7 @@ import { Integration } from "@server/models"; import { sequelize } from "@server/storage/database"; import { Hook, PluginManager } from "@server/utils/PluginManager"; -import BaseTask from "./BaseTask"; +import { BaseTask } from "./base/BaseTask"; type Props = { integrationId: string; diff --git a/server/queues/tasks/CleanupDeletedDocumentsTask.test.ts b/server/queues/tasks/CleanupDeletedDocumentsTask.test.ts index d67096cc39..51f1eb2f6f 100644 --- a/server/queues/tasks/CleanupDeletedDocumentsTask.test.ts +++ b/server/queues/tasks/CleanupDeletedDocumentsTask.test.ts @@ -3,6 +3,14 @@ import { Document } from "@server/models"; import { buildDocument, buildTeam } from "@server/test/factories"; import CleanupDeletedDocumentsTask from "./CleanupDeletedDocumentsTask"; +const props = { + limit: 100, + partition: { + partitionIndex: 0, + partitionCount: 1, + }, +}; + describe("CleanupDeletedDocumentsTask", () => { it("should not destroy documents not deleted", async () => { const team = await buildTeam(); @@ -12,7 +20,7 @@ describe("CleanupDeletedDocumentsTask", () => { }); const task = new CleanupDeletedDocumentsTask(); - await task.perform({ limit: 100 }); + await task.perform(props); expect( await Document.unscoped().count({ @@ -33,7 +41,7 @@ describe("CleanupDeletedDocumentsTask", () => { }); const task = new CleanupDeletedDocumentsTask(); - await task.perform({ limit: 100 }); + await task.perform(props); expect( await Document.unscoped().count({ @@ -54,7 +62,7 @@ describe("CleanupDeletedDocumentsTask", () => { }); const task = new CleanupDeletedDocumentsTask(); - await task.perform({ limit: 100 }); + await task.perform(props); expect( await Document.unscoped().count({ @@ -75,7 +83,7 @@ describe("CleanupDeletedDocumentsTask", () => { }); const task = new CleanupDeletedDocumentsTask(); - await task.perform({ limit: 100 }); + await task.perform(props); expect( await Document.unscoped().count({ diff --git a/server/queues/tasks/CleanupDeletedDocumentsTask.ts b/server/queues/tasks/CleanupDeletedDocumentsTask.ts index 42c6b8d486..e7451088dc 100644 --- a/server/queues/tasks/CleanupDeletedDocumentsTask.ts +++ b/server/queues/tasks/CleanupDeletedDocumentsTask.ts @@ -3,16 +3,12 @@ import { Op } from "sequelize"; import documentPermanentDeleter from "@server/commands/documentPermanentDeleter"; import Logger from "@server/logging/Logger"; import { Document } from "@server/models"; -import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask"; +import { TaskPriority } from "./base/BaseTask"; +import { Minute } from "@shared/utils/time"; +import { CronTask, Props, TaskInterval } from "./base/CronTask"; -type Props = { - limit: number; -}; - -export default class CleanupDeletedDocumentsTask extends BaseTask { - static cron = TaskSchedule.Hour; - - public async perform({ limit }: Props) { +export default class CleanupDeletedDocumentsTask extends CronTask { + public async perform({ limit, partition }: Props) { Logger.info( "task", `Permanently destroying upto ${limit} documents older than 30 days…` @@ -25,6 +21,7 @@ export default class CleanupDeletedDocumentsTask extends BaseTask { deletedAt: { [Op.lt]: subDays(new Date(), 30), }, + ...this.getPartitionWhereClause("id", partition), }, paranoid: false, limit, @@ -39,4 +36,11 @@ export default class CleanupDeletedDocumentsTask extends BaseTask { priority: TaskPriority.Background, }; } + + public get cron() { + return { + interval: TaskInterval.Hour, + partitionWindow: 15 * Minute.ms, + }; + } } diff --git a/server/queues/tasks/CleanupDeletedTeamTask.ts b/server/queues/tasks/CleanupDeletedTeamTask.ts index 540a2c022c..bc635f558d 100644 --- a/server/queues/tasks/CleanupDeletedTeamTask.ts +++ b/server/queues/tasks/CleanupDeletedTeamTask.ts @@ -1,6 +1,6 @@ import teamPermanentDeleter from "@server/commands/teamPermanentDeleter"; import { Team } from "@server/models"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; type Props = { /** The team ID to permanantly destroy */ diff --git a/server/queues/tasks/CleanupDeletedTeamsTask.ts b/server/queues/tasks/CleanupDeletedTeamsTask.ts index ed27c066d3..ca057b4ab2 100644 --- a/server/queues/tasks/CleanupDeletedTeamsTask.ts +++ b/server/queues/tasks/CleanupDeletedTeamsTask.ts @@ -1,18 +1,14 @@ import { subDays } from "date-fns"; import { Op } from "sequelize"; +import { Minute } from "@shared/utils/time"; import Logger from "@server/logging/Logger"; import { Team } from "@server/models"; -import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask"; +import { TaskPriority } from "./base/BaseTask"; import CleanupDeletedTeamTask from "./CleanupDeletedTeamTask"; +import { CronTask, TaskInterval, Props } from "./base/CronTask"; -type Props = { - limit: number; -}; - -export default class CleanupDeletedTeamsTask extends BaseTask { - static cron = TaskSchedule.Hour; - - public async perform({ limit }: Props) { +export default class CleanupDeletedTeamsTask extends CronTask { + public async perform({ limit, partition }: Props) { Logger.info( "task", `Permanently destroying upto ${limit} teams older than 30 days…` @@ -23,6 +19,7 @@ export default class CleanupDeletedTeamsTask extends BaseTask { deletedAt: { [Op.lt]: subDays(new Date(), 30), }, + ...this.getPartitionWhereClause("id", partition), }, paranoid: false, limit, @@ -35,6 +32,13 @@ export default class CleanupDeletedTeamsTask extends BaseTask { } } + public get cron() { + return { + interval: TaskInterval.Hour, + partitionWindow: 15 * Minute.ms, + }; + } + public get options() { return { attempts: 1, diff --git a/server/queues/tasks/CleanupDemotedUserTask.ts b/server/queues/tasks/CleanupDemotedUserTask.ts index 9255a45e6a..259e8c80f9 100644 --- a/server/queues/tasks/CleanupDemotedUserTask.ts +++ b/server/queues/tasks/CleanupDemotedUserTask.ts @@ -2,7 +2,7 @@ import Logger from "@server/logging/Logger"; import { WebhookSubscription, ApiKey, User } from "@server/models"; import { cannot } from "@server/policies"; import { sequelize } from "@server/storage/database"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; type Props = { userId: string; diff --git a/server/queues/tasks/CleanupExpiredAttachmentsTask.ts b/server/queues/tasks/CleanupExpiredAttachmentsTask.ts index 195f511724..40fc28c896 100644 --- a/server/queues/tasks/CleanupExpiredAttachmentsTask.ts +++ b/server/queues/tasks/CleanupExpiredAttachmentsTask.ts @@ -1,15 +1,10 @@ import { Op } from "sequelize"; import Logger from "@server/logging/Logger"; import { Attachment } from "@server/models"; -import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask"; - -type Props = { - limit: number; -}; - -export default class CleanupExpiredAttachmentsTask extends BaseTask { - static cron = TaskSchedule.Hour; +import { TaskPriority } from "./base/BaseTask"; +import { CronTask, Props, TaskInterval } from "./base/CronTask"; +export default class CleanupExpiredAttachmentsTask extends CronTask { public async perform({ limit }: Props) { Logger.info("task", `Deleting expired attachments…`); const attachments = await Attachment.unscoped().findAll({ @@ -24,6 +19,12 @@ export default class CleanupExpiredAttachmentsTask extends BaseTask { Logger.info("task", `Removed ${attachments.length} attachments`); } + public get cron() { + return { + interval: TaskInterval.Hour, + }; + } + public get options() { return { attempts: 1, diff --git a/server/queues/tasks/CleanupExpiredFileOperationsTask.test.ts b/server/queues/tasks/CleanupExpiredFileOperationsTask.test.ts index 00b2a17d3d..abd393869a 100644 --- a/server/queues/tasks/CleanupExpiredFileOperationsTask.test.ts +++ b/server/queues/tasks/CleanupExpiredFileOperationsTask.test.ts @@ -4,6 +4,14 @@ import { FileOperation } from "@server/models"; import { buildFileOperation, buildTeam } from "@server/test/factories"; import CleanupExpiredFileOperationsTask from "./CleanupExpiredFileOperationsTask"; +const props = { + limit: 100, + partition: { + partitionIndex: 0, + partitionCount: 1, + }, +}; + describe("CleanupExpiredFileOperationsTask", () => { it("should expire exports older than 15 days ago", async () => { const team = await buildTeam(); @@ -20,7 +28,7 @@ describe("CleanupExpiredFileOperationsTask", () => { }); const task = new CleanupExpiredFileOperationsTask(); - await task.perform({ limit: 100 }); + await task.perform(props); const data = await FileOperation.count({ where: { @@ -47,7 +55,7 @@ describe("CleanupExpiredFileOperationsTask", () => { }); const task = new CleanupExpiredFileOperationsTask(); - await task.perform({ limit: 100 }); + await task.perform(props); const data = await FileOperation.count({ where: { diff --git a/server/queues/tasks/CleanupExpiredFileOperationsTask.ts b/server/queues/tasks/CleanupExpiredFileOperationsTask.ts index 4e3e79b1fa..3fa09b4ac7 100644 --- a/server/queues/tasks/CleanupExpiredFileOperationsTask.ts +++ b/server/queues/tasks/CleanupExpiredFileOperationsTask.ts @@ -3,15 +3,10 @@ import { Op } from "sequelize"; import { FileOperationState } from "@shared/types"; import Logger from "@server/logging/Logger"; import { FileOperation } from "@server/models"; -import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask"; - -type Props = { - limit: number; -}; - -export default class CleanupExpiredFileOperationsTask extends BaseTask { - static cron = TaskSchedule.Hour; +import { TaskPriority } from "./base/BaseTask"; +import { CronTask, Props, TaskInterval } from "./base/CronTask"; +export default class CleanupExpiredFileOperationsTask extends CronTask { public async perform({ limit }: Props) { Logger.info("task", `Expiring file operations older than 15 days…`); const fileOperations = await FileOperation.unscoped().findAll({ @@ -31,6 +26,12 @@ export default class CleanupExpiredFileOperationsTask extends BaseTask { Logger.info("task", `Expired ${fileOperations.length} file operations`); } + public get cron() { + return { + interval: TaskInterval.Hour, + }; + } + public get options() { return { attempts: 1, diff --git a/server/queues/tasks/CleanupOAuthAuthorizationCodeTask.ts b/server/queues/tasks/CleanupOAuthAuthorizationCodeTask.ts index 572c8c1ec6..e9b15b9de5 100644 --- a/server/queues/tasks/CleanupOAuthAuthorizationCodeTask.ts +++ b/server/queues/tasks/CleanupOAuthAuthorizationCodeTask.ts @@ -2,13 +2,10 @@ import { subMonths } from "date-fns"; import { Op } from "sequelize"; import Logger from "@server/logging/Logger"; import { OAuthAuthorizationCode } from "@server/models"; -import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask"; - -type Props = Record; - -export default class CleanupOAuthAuthorizationCodeTask extends BaseTask { - static cron = TaskSchedule.Day; +import { TaskPriority } from "./base/BaseTask"; +import { CronTask, TaskInterval } from "./base/CronTask"; +export default class CleanupOAuthAuthorizationCodeTask extends CronTask { public async perform() { Logger.info( "task", @@ -24,6 +21,12 @@ export default class CleanupOAuthAuthorizationCodeTask extends BaseTask { Logger.info("task", `${count} expired OAuth authorization codes deleted.`); } + public get cron() { + return { + interval: TaskInterval.Day, + }; + } + public get options() { return { attempts: 1, diff --git a/server/queues/tasks/CleanupOldEventsTask.ts b/server/queues/tasks/CleanupOldEventsTask.ts index 6d1189f7e4..61a167c937 100644 --- a/server/queues/tasks/CleanupOldEventsTask.ts +++ b/server/queues/tasks/CleanupOldEventsTask.ts @@ -2,17 +2,12 @@ import { subDays } from "date-fns"; import { Op } from "sequelize"; import Logger from "@server/logging/Logger"; import { Event } from "@server/models"; -import BaseTask, { - TaskPriority, - TaskSchedule, -} from "@server/queues/tasks/BaseTask"; +import { TaskPriority } from "./base/BaseTask"; +import { CronTask, Props, TaskInterval } from "./base/CronTask"; +import { Minute } from "@shared/utils/time"; -type Props = Record; - -export default class CleanupOldEventsTask extends BaseTask { - static cron = TaskSchedule.Hour; - - public async perform() { +export default class CleanupOldEventsTask extends CronTask { + public async perform({ partition }: Props) { // TODO: Hardcoded right now, configurable later const retentionDays = 365; const cutoffDate = subDays(new Date(), retentionDays); @@ -27,6 +22,7 @@ export default class CleanupOldEventsTask extends BaseTask { createdAt: { [Op.lt]: cutoffDate, }, + ...this.getPartitionWhereClause("id", partition), }, batchLimit: 1000, totalLimit: maxEventsPerTask, @@ -51,6 +47,13 @@ export default class CleanupOldEventsTask extends BaseTask { } } + public get cron() { + return { + interval: TaskInterval.Hour, + partitionWindow: 15 * Minute.ms, + }; + } + public get options() { return { attempts: 1, diff --git a/server/queues/tasks/CleanupOldImportsTask.ts b/server/queues/tasks/CleanupOldImportsTask.ts index 2bb66b439a..886c4d2845 100644 --- a/server/queues/tasks/CleanupOldImportsTask.ts +++ b/server/queues/tasks/CleanupOldImportsTask.ts @@ -3,16 +3,13 @@ import { Op } from "sequelize"; import { ImportState } from "@shared/types"; import Logger from "@server/logging/Logger"; import { Import, ImportTask } from "@server/models"; -import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask"; - -type Props = Record; +import { TaskPriority } from "./base/BaseTask"; +import { CronTask, TaskInterval } from "./base/CronTask"; /** * A task that deletes the import_tasks for old imports which are completed, errored (or) canceled. */ -export default class CleanupOldImportsTask extends BaseTask { - static cron = TaskSchedule.Day; - +export default class CleanupOldImportsTask extends CronTask { public async perform() { // TODO: Hardcoded right now, configurable later const retentionDays = 1; @@ -76,6 +73,12 @@ export default class CleanupOldImportsTask extends BaseTask { } } + public get cron() { + return { + interval: TaskInterval.Day, + }; + } + public get options() { return { attempts: 1, diff --git a/server/queues/tasks/CleanupOldNotificationsTask.ts b/server/queues/tasks/CleanupOldNotificationsTask.ts index 5a4624f528..b9c2eaeecb 100644 --- a/server/queues/tasks/CleanupOldNotificationsTask.ts +++ b/server/queues/tasks/CleanupOldNotificationsTask.ts @@ -2,14 +2,12 @@ import { subMonths } from "date-fns"; import { Op } from "sequelize"; import Logger from "@server/logging/Logger"; import { Notification } from "@server/models"; -import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask"; +import { TaskPriority } from "./base/BaseTask"; +import { Minute } from "@shared/utils/time"; +import { CronTask, Props, TaskInterval } from "./base/CronTask"; -type Props = Record; - -export default class CleanupOldNotificationsTask extends BaseTask { - static cron = TaskSchedule.Hour; - - public async perform() { +export default class CleanupOldNotificationsTask extends CronTask { + public async perform({ partition }: Props) { Logger.info("task", `Permanently destroying old notifications…`); let count; @@ -18,6 +16,7 @@ export default class CleanupOldNotificationsTask extends BaseTask { createdAt: { [Op.lt]: subMonths(new Date(), 12), }, + ...this.getPartitionWhereClause("id", partition), }, }); @@ -34,6 +33,7 @@ export default class CleanupOldNotificationsTask extends BaseTask { createdAt: { [Op.lt]: subMonths(new Date(), 6), }, + ...this.getPartitionWhereClause("id", partition), }, }); @@ -43,6 +43,13 @@ export default class CleanupOldNotificationsTask extends BaseTask { ); } + public get cron() { + return { + interval: TaskInterval.Hour, + partitionWindow: 15 * Minute.ms, + }; + } + public get options() { return { attempts: 1, diff --git a/server/queues/tasks/CollectionAddUserNotificationsTask.ts b/server/queues/tasks/CollectionAddUserNotificationsTask.ts index 1bbf45723e..5ca4aa4285 100644 --- a/server/queues/tasks/CollectionAddUserNotificationsTask.ts +++ b/server/queues/tasks/CollectionAddUserNotificationsTask.ts @@ -1,7 +1,7 @@ import { NotificationEventType } from "@shared/types"; import { Notification, User } from "@server/models"; import { CollectionUserEvent } from "@server/types"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; export default class CollectionAddUserNotificationsTask extends BaseTask { public async perform(event: CollectionUserEvent) { diff --git a/server/queues/tasks/CollectionCreatedNotificationsTask.ts b/server/queues/tasks/CollectionCreatedNotificationsTask.ts index ff506d112e..0c9d1d5883 100644 --- a/server/queues/tasks/CollectionCreatedNotificationsTask.ts +++ b/server/queues/tasks/CollectionCreatedNotificationsTask.ts @@ -2,7 +2,7 @@ import { NotificationEventType } from "@shared/types"; import { Collection, Notification } from "@server/models"; import NotificationHelper from "@server/models/helpers/NotificationHelper"; import { CollectionEvent } from "@server/types"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; export default class CollectionCreatedNotificationsTask extends BaseTask { public async perform(event: CollectionEvent) { diff --git a/server/queues/tasks/CollectionSubscriptionRemoveUserTask.ts b/server/queues/tasks/CollectionSubscriptionRemoveUserTask.ts index a0f1d48eb0..aa59d5fb21 100644 --- a/server/queues/tasks/CollectionSubscriptionRemoveUserTask.ts +++ b/server/queues/tasks/CollectionSubscriptionRemoveUserTask.ts @@ -6,7 +6,7 @@ import { Collection, Subscription, User } from "@server/models"; import { can } from "@server/policies"; import { sequelize } from "@server/storage/database"; import { CollectionUserEvent } from "@server/types"; -import BaseTask from "./BaseTask"; +import { BaseTask } from "./base/BaseTask"; export default class CollectionSubscriptionRemoveUserTask extends BaseTask { public async perform(event: CollectionUserEvent) { diff --git a/server/queues/tasks/CommentCreatedNotificationsTask.ts b/server/queues/tasks/CommentCreatedNotificationsTask.ts index 983181aee9..0d599621d5 100644 --- a/server/queues/tasks/CommentCreatedNotificationsTask.ts +++ b/server/queues/tasks/CommentCreatedNotificationsTask.ts @@ -18,7 +18,7 @@ import { ProsemirrorHelper } from "@server/models/helpers/ProsemirrorHelper"; import { sequelize } from "@server/storage/database"; import { CommentEvent } from "@server/types"; import { canUserAccessDocument } from "@server/utils/permissions"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; export default class CommentCreatedNotificationsTask extends BaseTask { public async perform(event: CommentEvent) { diff --git a/server/queues/tasks/CommentUpdatedNotificationsTask.ts b/server/queues/tasks/CommentUpdatedNotificationsTask.ts index e79ceae2bd..b647981dfb 100644 --- a/server/queues/tasks/CommentUpdatedNotificationsTask.ts +++ b/server/queues/tasks/CommentUpdatedNotificationsTask.ts @@ -12,7 +12,7 @@ import { import { ProsemirrorHelper } from "@server/models/helpers/ProsemirrorHelper"; import { CommentEvent, CommentUpdateEvent } from "@server/types"; import { canUserAccessDocument } from "@server/utils/permissions"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; export default class CommentUpdatedNotificationsTask extends BaseTask { public async perform(event: CommentUpdateEvent) { diff --git a/server/queues/tasks/DeleteAttachmentTask.ts b/server/queues/tasks/DeleteAttachmentTask.ts index 19fe72ffb5..756202de09 100644 --- a/server/queues/tasks/DeleteAttachmentTask.ts +++ b/server/queues/tasks/DeleteAttachmentTask.ts @@ -1,5 +1,5 @@ import { Attachment } from "@server/models"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; type Props = { teamId: string; diff --git a/server/queues/tasks/DetachDraftsFromCollectionTask.ts b/server/queues/tasks/DetachDraftsFromCollectionTask.ts index c23a5763ef..622b7595ee 100644 --- a/server/queues/tasks/DetachDraftsFromCollectionTask.ts +++ b/server/queues/tasks/DetachDraftsFromCollectionTask.ts @@ -2,7 +2,7 @@ import { Op } from "sequelize"; import documentMover from "@server/commands/documentMover"; import { Collection, Document, User } from "@server/models"; import { sequelize } from "@server/storage/database"; -import BaseTask from "./BaseTask"; +import { BaseTask } from "./base/BaseTask"; import { createContext } from "@server/context"; type Props = { diff --git a/server/queues/tasks/DocumentAddGroupNotificationsTask.ts b/server/queues/tasks/DocumentAddGroupNotificationsTask.ts index 8a7c3746cc..bd2e1e5521 100644 --- a/server/queues/tasks/DocumentAddGroupNotificationsTask.ts +++ b/server/queues/tasks/DocumentAddGroupNotificationsTask.ts @@ -1,7 +1,7 @@ import { Op } from "sequelize"; import { GroupUser } from "@server/models"; import { DocumentGroupEvent, DocumentUserEvent } from "@server/types"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; import DocumentAddUserNotificationsTask from "./DocumentAddUserNotificationsTask"; export default class DocumentAddGroupNotificationsTask extends BaseTask { diff --git a/server/queues/tasks/DocumentAddUserNotificationsTask.ts b/server/queues/tasks/DocumentAddUserNotificationsTask.ts index 9b60dcd9e0..746d09739c 100644 --- a/server/queues/tasks/DocumentAddUserNotificationsTask.ts +++ b/server/queues/tasks/DocumentAddUserNotificationsTask.ts @@ -3,7 +3,7 @@ import Logger from "@server/logging/Logger"; import { Notification, User } from "@server/models"; import { DocumentUserEvent } from "@server/types"; import { isElevatedPermission } from "@server/utils/permissions"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; export default class DocumentAddUserNotificationsTask extends BaseTask { public async perform(event: DocumentUserEvent) { diff --git a/server/queues/tasks/DocumentImportTask.ts b/server/queues/tasks/DocumentImportTask.ts index a0339190fc..7caf57b9fa 100644 --- a/server/queues/tasks/DocumentImportTask.ts +++ b/server/queues/tasks/DocumentImportTask.ts @@ -5,7 +5,7 @@ import { createContext } from "@server/context"; import { User } from "@server/models"; import { sequelize } from "@server/storage/database"; import FileStorage from "@server/storage/files"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; type Props = { userId: string; diff --git a/server/queues/tasks/DocumentPublishedNotificationsTask.ts b/server/queues/tasks/DocumentPublishedNotificationsTask.ts index 80bc5f790d..243abb22bb 100644 --- a/server/queues/tasks/DocumentPublishedNotificationsTask.ts +++ b/server/queues/tasks/DocumentPublishedNotificationsTask.ts @@ -5,7 +5,7 @@ import { DocumentHelper } from "@server/models/helpers/DocumentHelper"; import NotificationHelper from "@server/models/helpers/NotificationHelper"; import { DocumentEvent } from "@server/types"; import { canUserAccessDocument } from "@server/utils/permissions"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; export default class DocumentPublishedNotificationsTask extends BaseTask { public async perform(event: DocumentEvent) { diff --git a/server/queues/tasks/DocumentSubscriptionRemoveUserTask.ts b/server/queues/tasks/DocumentSubscriptionRemoveUserTask.ts index 6151d561e3..e686c02d6a 100644 --- a/server/queues/tasks/DocumentSubscriptionRemoveUserTask.ts +++ b/server/queues/tasks/DocumentSubscriptionRemoveUserTask.ts @@ -6,7 +6,7 @@ import { Document, Subscription, User } from "@server/models"; import { can } from "@server/policies"; import { sequelize } from "@server/storage/database"; import { DocumentUserEvent } from "@server/types"; -import BaseTask from "./BaseTask"; +import { BaseTask } from "./base/BaseTask"; export default class DocumentSubscriptionRemoveUserTask extends BaseTask { public async perform(event: DocumentUserEvent) { diff --git a/server/queues/tasks/DocumentUpdateTextTask.ts b/server/queues/tasks/DocumentUpdateTextTask.ts index 2d0800f525..d7184ae1ab 100644 --- a/server/queues/tasks/DocumentUpdateTextTask.ts +++ b/server/queues/tasks/DocumentUpdateTextTask.ts @@ -5,7 +5,7 @@ import { schema, serializer } from "@server/editor"; import { Document } from "@server/models"; import { DocumentEvent } from "@server/types"; import { DocumentHelper } from "@server/models/helpers/DocumentHelper"; -import BaseTask from "./BaseTask"; +import { BaseTask } from "./base/BaseTask"; export default class DocumentUpdateTextTask extends BaseTask { public async perform(event: DocumentEvent) { diff --git a/server/queues/tasks/EmailTask.ts b/server/queues/tasks/EmailTask.ts index 618226bb81..a026e3bd1c 100644 --- a/server/queues/tasks/EmailTask.ts +++ b/server/queues/tasks/EmailTask.ts @@ -1,5 +1,5 @@ import emails from "@server/emails/templates"; -import BaseTask from "./BaseTask"; +import { BaseTask } from "./base/BaseTask"; type Props = { templateName: string; diff --git a/server/queues/tasks/EmptyTrashTask.ts b/server/queues/tasks/EmptyTrashTask.ts index ac3831705a..ffcbc5d568 100644 --- a/server/queues/tasks/EmptyTrashTask.ts +++ b/server/queues/tasks/EmptyTrashTask.ts @@ -1,7 +1,7 @@ import { Op } from "sequelize"; import documentPermanentDeleter from "@server/commands/documentPermanentDeleter"; import { Document } from "@server/models"; -import BaseTask from "./BaseTask"; +import { BaseTask } from "./base/BaseTask"; type Props = { documentIds: string[]; diff --git a/server/queues/tasks/ErrorTimedOutFileOperationsTask.test.ts b/server/queues/tasks/ErrorTimedOutFileOperationsTask.test.ts index e706676925..8fe45cd214 100644 --- a/server/queues/tasks/ErrorTimedOutFileOperationsTask.test.ts +++ b/server/queues/tasks/ErrorTimedOutFileOperationsTask.test.ts @@ -4,6 +4,14 @@ import { FileOperation } from "@server/models"; import { buildFileOperation, buildTeam } from "@server/test/factories"; import ErrorTimedOutFileOperationsTask from "./ErrorTimedOutFileOperationsTask"; +const props = { + limit: 100, + partition: { + partitionIndex: 0, + partitionCount: 1, + }, +}; + describe("ErrorTimedOutFileOperationsTask", () => { it("should error exports older than 12 hours", async () => { const team = await buildTeam(); @@ -20,7 +28,7 @@ describe("ErrorTimedOutFileOperationsTask", () => { }); const task = new ErrorTimedOutFileOperationsTask(); - await task.perform({ limit: 100 }); + await task.perform(props); const data = await FileOperation.count({ where: { @@ -41,7 +49,7 @@ describe("ErrorTimedOutFileOperationsTask", () => { }); const task = new ErrorTimedOutFileOperationsTask(); - await task.perform({ limit: 100 }); + await task.perform(props); const data = await FileOperation.count({ where: { diff --git a/server/queues/tasks/ErrorTimedOutFileOperationsTask.ts b/server/queues/tasks/ErrorTimedOutFileOperationsTask.ts index c5e99e33c0..03688296ba 100644 --- a/server/queues/tasks/ErrorTimedOutFileOperationsTask.ts +++ b/server/queues/tasks/ErrorTimedOutFileOperationsTask.ts @@ -3,15 +3,10 @@ import { Op } from "sequelize"; import { FileOperationState } from "@shared/types"; import Logger from "@server/logging/Logger"; import { FileOperation } from "@server/models"; -import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask"; - -type Props = { - limit: number; -}; - -export default class ErrorTimedOutFileOperationsTask extends BaseTask { - static cron = TaskSchedule.Hour; +import { TaskPriority } from "./base/BaseTask"; +import { CronTask, TaskInterval, Props } from "./base/CronTask"; +export default class ErrorTimedOutFileOperationsTask extends CronTask { public async perform({ limit }: Props) { Logger.info("task", `Error file operations running longer than 12 hours…`); const fileOperations = await FileOperation.unscoped().findAll({ @@ -40,6 +35,12 @@ export default class ErrorTimedOutFileOperationsTask extends BaseTask { Logger.info("task", `Updated ${fileOperations.length} file operations`); } + public get cron() { + return { + interval: TaskInterval.Hour, + }; + } + public get options() { return { attempts: 1, diff --git a/server/queues/tasks/ErrorTimedOutImportsTask.ts b/server/queues/tasks/ErrorTimedOutImportsTask.ts index 9f3559e774..523c40c6d5 100644 --- a/server/queues/tasks/ErrorTimedOutImportsTask.ts +++ b/server/queues/tasks/ErrorTimedOutImportsTask.ts @@ -4,18 +4,13 @@ import { ImportState, ImportTaskState } from "@shared/types"; import Logger from "@server/logging/Logger"; import { Import, ImportTask } from "@server/models"; import { sequelize } from "@server/storage/database"; -import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask"; - -type Props = { - limit: number; -}; +import { TaskPriority } from "./base/BaseTask"; +import { CronTask, TaskInterval, Props } from "./base/CronTask"; /** * A task that moves the stuck imports to errored state. */ -export default class ErrorTimedOutImportsTask extends BaseTask { - static cron = TaskSchedule.Hour; - +export default class ErrorTimedOutImportsTask extends CronTask { public async perform({ limit }: Props) { // TODO: Hardcoded right now, configurable later const thresholdHours = 24; @@ -82,6 +77,12 @@ export default class ErrorTimedOutImportsTask extends BaseTask { } } + public get cron() { + return { + interval: TaskInterval.Hour, + }; + } + public get options() { return { attempts: 1, diff --git a/server/queues/tasks/ExportTask.ts b/server/queues/tasks/ExportTask.ts index 43beba792a..348ba96a1b 100644 --- a/server/queues/tasks/ExportTask.ts +++ b/server/queues/tasks/ExportTask.ts @@ -17,7 +17,7 @@ import { } from "@server/models"; import fileOperationPresenter from "@server/presenters/fileOperation"; import FileStorage from "@server/storage/files"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; import { Op } from "sequelize"; import { WhereOptions } from "sequelize"; import { sequelizeReadOnly } from "@server/storage/database"; diff --git a/server/queues/tasks/ImportTask.ts b/server/queues/tasks/ImportTask.ts index 9d3ecbcd2d..8428d748bf 100644 --- a/server/queues/tasks/ImportTask.ts +++ b/server/queues/tasks/ImportTask.ts @@ -28,7 +28,7 @@ import { import { sequelize } from "@server/storage/database"; import ZipHelper from "@server/utils/ZipHelper"; import { generateUrlId } from "@server/utils/url"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; import env from "@server/env"; type Props = { diff --git a/server/queues/tasks/InviteReminderTask.ts b/server/queues/tasks/InviteReminderTask.ts index be559c777f..3bf734b13d 100644 --- a/server/queues/tasks/InviteReminderTask.ts +++ b/server/queues/tasks/InviteReminderTask.ts @@ -4,13 +4,10 @@ import InviteReminderEmail from "@server/emails/templates/InviteReminderEmail"; import { User } from "@server/models"; import { UserFlag } from "@server/models/User"; import { sequelize } from "@server/storage/database"; -import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask"; - -type Props = Record; - -export default class InviteReminderTask extends BaseTask { - static cron = TaskSchedule.Day; +import { TaskPriority } from "./base/BaseTask"; +import { CronTask, TaskInterval } from "./base/CronTask"; +export default class InviteReminderTask extends CronTask { public async perform() { const users = await User.scope("invited").findAll({ attributes: ["id"], @@ -58,6 +55,12 @@ export default class InviteReminderTask extends BaseTask { } } + public get cron() { + return { + interval: TaskInterval.Day, + }; + } + public get options() { return { attempts: 1, diff --git a/server/queues/tasks/ReactionCreatedNotificationsTask.ts b/server/queues/tasks/ReactionCreatedNotificationsTask.ts index 31c3356348..b7df930b46 100644 --- a/server/queues/tasks/ReactionCreatedNotificationsTask.ts +++ b/server/queues/tasks/ReactionCreatedNotificationsTask.ts @@ -2,7 +2,7 @@ import { NotificationEventType } from "@shared/types"; import { Comment, Document, Notification, User } from "@server/models"; import { CommentReactionEvent } from "@server/types"; import { canUserAccessDocument } from "@server/utils/permissions"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; export default class ReactionCreatedNotificationsTask extends BaseTask { public async perform(event: CommentReactionEvent) { diff --git a/server/queues/tasks/ReactionRemovedNotificationsTask.ts b/server/queues/tasks/ReactionRemovedNotificationsTask.ts index 9f762c1d7a..86326939a0 100644 --- a/server/queues/tasks/ReactionRemovedNotificationsTask.ts +++ b/server/queues/tasks/ReactionRemovedNotificationsTask.ts @@ -1,7 +1,7 @@ import { NotificationEventType } from "@shared/types"; import { Notification, User } from "@server/models"; import { CommentReactionEvent } from "@server/types"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; import { createContext } from "@server/context"; import { sequelize } from "@server/storage/database"; import { Op } from "sequelize"; diff --git a/server/queues/tasks/RevisionCreatedNotificationsTask.ts b/server/queues/tasks/RevisionCreatedNotificationsTask.ts index 8f5cf99e09..18668022e0 100644 --- a/server/queues/tasks/RevisionCreatedNotificationsTask.ts +++ b/server/queues/tasks/RevisionCreatedNotificationsTask.ts @@ -17,7 +17,7 @@ import { DocumentHelper } from "@server/models/helpers/DocumentHelper"; import NotificationHelper from "@server/models/helpers/NotificationHelper"; import { RevisionEvent } from "@server/types"; import { canUserAccessDocument } from "@server/utils/permissions"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; export default class RevisionCreatedNotificationsTask extends BaseTask { public async perform(event: RevisionEvent) { diff --git a/server/queues/tasks/UpdateDocumentsPopularityScoreTask.ts b/server/queues/tasks/UpdateDocumentsPopularityScoreTask.ts index 2d1424719b..b2c7e7ee55 100644 --- a/server/queues/tasks/UpdateDocumentsPopularityScoreTask.ts +++ b/server/queues/tasks/UpdateDocumentsPopularityScoreTask.ts @@ -1,13 +1,12 @@ import crypto from "crypto"; import { subWeeks } from "date-fns"; import { QueryTypes } from "sequelize"; +import { Minute } from "@shared/utils/time"; import env from "@server/env"; import Logger from "@server/logging/Logger"; -import BaseTask, { TaskSchedule } from "./BaseTask"; +import { TaskPriority } from "./base/BaseTask"; +import { CronTask, PartitionInfo, Props, TaskInterval } from "./base/CronTask"; import { sequelize, sequelizeReadOnly } from "@server/storage/database"; -import { sleep } from "@server/utils/timers"; - -type Props = Record; /** * Number of hours to add to age to smooth the decay curve, @@ -26,15 +25,10 @@ const ACTIVITY_WEIGHTS = { }; /** - * Batch size for processing updates - kept small to minimize lock duration + * Batch size for processing updates - kept small to minimize query duration */ const BATCH_SIZE = 50; -/** - * Maximum retries for failed batch operations - */ -const MAX_RETRIES = 2; - /** * Statement timeout for individual queries to prevent runaway locks */ @@ -50,16 +44,13 @@ interface DocumentScore { score: number; } -export default class UpdateDocumentsPopularityScoreTask extends BaseTask { +export default class UpdateDocumentsPopularityScoreTask extends CronTask { /** * Unique table name for this task run to prevent conflicts with concurrent runs */ private workingTable: string = ""; - static cron = TaskSchedule.Hour; - - public async perform() { - Logger.info("task", "Updating document popularity scores…"); + public async perform({ partition }: Props) { // Only run every 6 hours (at hours 0, 6, 12, 18) const currentHour = new Date().getHours(); if (currentHour % 6 !== 0) { @@ -79,7 +70,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask try { // Setup: Create working table and populate with active document IDs - await this.setupWorkingTable(threshold); + await this.setupWorkingTable(threshold, partition); const activeCount = await this.getWorkingTableCount(); @@ -107,16 +98,13 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask batchNumber++; try { - const updated = await this.processBatchWithRetry(threshold, now); + const updated = await this.processBatch(threshold, now); totalUpdated += updated; Logger.debug( "task", `Batch ${batchNumber}: updated ${updated} documents, ${remaining - updated} remaining` ); - - // Add delay between batches to reduce database contention - await sleep(10); } catch (error) { totalErrors++; Logger.error(`Batch ${batchNumber} failed after retries`, error); @@ -144,7 +132,10 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask * that have recent activity. Unlogged tables are faster because they * skip WAL logging, and data loss on crash is acceptable here. */ - private async setupWorkingTable(threshold: Date): Promise { + private async setupWorkingTable( + threshold: Date, + partition: PartitionInfo + ): Promise { // Drop any existing table first to avoid type conflicts from previous crashed runs await sequelize.query(`DROP TABLE IF EXISTS ${this.workingTable} CASCADE`); @@ -156,6 +147,8 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask ) `); + const [startUuid, endUuid] = this.getPartitionBounds(partition); + // Populate with documents that have recent activity and are valid // (published, not deleted). Using JOINs to filter upfront. await sequelize.query( @@ -179,8 +172,9 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask WHERE v."documentId" = d.id AND v."updatedAt" >= :threshold ) ) + ${startUuid && endUuid ? "AND d.id >= :startUuid AND d.id <= :endUuid" : ""} `, - { replacements: { threshold } } + { replacements: { threshold, startUuid, endUuid } } ); // Create index on processed column for efficient batch selection @@ -203,57 +197,41 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask /** * Processes a batch of documents with retry logic. */ - private async processBatchWithRetry( - threshold: Date, - now: Date, - attempt = 1 - ): Promise { - try { - // Step 1: Get batch of document IDs to process - const batch = await sequelize.query<{ documentId: string }>( - ` + private async processBatch(threshold: Date, now: Date): Promise { + // Step 1: Get batch of document IDs to process + const batch = await sequelize.query<{ documentId: string }>( + ` SELECT "documentId" FROM ${this.workingTable} WHERE NOT processed ORDER BY "documentId" LIMIT :limit `, - { - replacements: { limit: BATCH_SIZE }, - type: QueryTypes.SELECT, - } - ); - - if (batch.length === 0) { - return 0; + { + replacements: { limit: BATCH_SIZE }, + type: QueryTypes.SELECT, } + ); - const documentIds = batch.map((b) => b.documentId); - - // Step 2: Calculate scores outside of a transaction - const scores = await this.calculateScoresForDocuments( - documentIds, - threshold, - now - ); - - // Step 3: Update document scores - await this.updateDocumentScores(scores); - - // Step 4: Mark batch as processed - await this.markBatchProcessed(documentIds); - - return documentIds.length; - } catch (error) { - if (attempt < MAX_RETRIES) { - Logger.warn( - `Batch update failed, retrying (attempt ${attempt + 1}/${MAX_RETRIES})`, - { error } - ); - await sleep(1000 * attempt); - return this.processBatchWithRetry(threshold, now, attempt + 1); - } - throw error; + if (batch.length === 0) { + return 0; } + + const documentIds = batch.map((b) => b.documentId); + + // Step 2: Calculate scores outside of a transaction + const scores = await this.calculateScoresForDocuments( + documentIds, + threshold, + now + ); + + // Step 3: Update document scores + await this.updateDocumentScores(scores); + + // Step 4: Mark batch as processed + await this.markBatchProcessed(documentIds); + + return documentIds.length; } /** @@ -409,4 +387,18 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask Logger.warn("Failed to clean up working table", { error }); } } + + public get cron() { + return { + interval: TaskInterval.Hour, + partitionWindow: 30 * Minute.ms, + }; + } + + public get options() { + return { + attempts: 1, + priority: TaskPriority.Background, + }; + } } diff --git a/server/queues/tasks/UpdateTeamAttachmentsSizeTask.ts b/server/queues/tasks/UpdateTeamAttachmentsSizeTask.ts index 5d50f891b6..ea11dacae0 100644 --- a/server/queues/tasks/UpdateTeamAttachmentsSizeTask.ts +++ b/server/queues/tasks/UpdateTeamAttachmentsSizeTask.ts @@ -1,5 +1,5 @@ import { Attachment, Team } from "@server/models"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; import { sequelizeReadOnly } from "@server/storage/database"; type Props = { diff --git a/server/queues/tasks/UpdateTeamsAttachmentsSizeTask.ts b/server/queues/tasks/UpdateTeamsAttachmentsSizeTask.ts index d91d1b0694..b76b12aee6 100644 --- a/server/queues/tasks/UpdateTeamsAttachmentsSizeTask.ts +++ b/server/queues/tasks/UpdateTeamsAttachmentsSizeTask.ts @@ -3,16 +3,15 @@ import { Op } from "sequelize"; import Logger from "@server/logging/Logger"; import { Attachment } from "@server/models"; import { sequelize } from "@server/storage/database"; -import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask"; +import { TaskPriority } from "./base/BaseTask"; +import { CronTask, TaskInterval } from "./base/CronTask"; import UpdateTeamAttachmentsSizeTask from "./UpdateTeamAttachmentsSizeTask"; type Props = { limit: number; }; -export default class UpdateTeamsAttachmentsSizeTask extends BaseTask { - static cron = TaskSchedule.Day; - +export default class UpdateTeamsAttachmentsSizeTask extends CronTask { public async perform({ limit }: Props) { Logger.info( "task", @@ -44,6 +43,12 @@ export default class UpdateTeamsAttachmentsSizeTask extends BaseTask { ); } + public get cron() { + return { + interval: TaskInterval.Day, + }; + } + public get options() { return { attempts: 1, diff --git a/server/queues/tasks/UploadAttachmentFromUrlTask.ts b/server/queues/tasks/UploadAttachmentFromUrlTask.ts index 81aa606deb..5c7c18a1df 100644 --- a/server/queues/tasks/UploadAttachmentFromUrlTask.ts +++ b/server/queues/tasks/UploadAttachmentFromUrlTask.ts @@ -1,7 +1,7 @@ import { createContext } from "@server/context"; import { Attachment } from "@server/models"; import FileStorage from "@server/storage/files"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; import { sequelize } from "@server/storage/database"; type Props = { diff --git a/server/queues/tasks/UploadAttachmentsForImportTask.ts b/server/queues/tasks/UploadAttachmentsForImportTask.ts index 1c04de11ca..3cc52b85aa 100644 --- a/server/queues/tasks/UploadAttachmentsForImportTask.ts +++ b/server/queues/tasks/UploadAttachmentsForImportTask.ts @@ -2,7 +2,7 @@ import { Sema } from "async-sema"; import Logger from "@server/logging/Logger"; import { Attachment } from "@server/models"; import FileStorage from "@server/storage/files"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; import env from "@server/env"; const ConcurrentUploads = 5; diff --git a/server/queues/tasks/UploadTeamAvatarTask.ts b/server/queues/tasks/UploadTeamAvatarTask.ts index d8f8e0115c..bbce066b8c 100644 --- a/server/queues/tasks/UploadTeamAvatarTask.ts +++ b/server/queues/tasks/UploadTeamAvatarTask.ts @@ -2,7 +2,7 @@ import { randomUUID } from "crypto"; import { Team } from "@server/models"; import { Buckets } from "@server/models/helpers/AttachmentHelper"; import FileStorage from "@server/storage/files"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; type Props = { /** The teamId to operate on */ diff --git a/server/queues/tasks/UploadUserAvatarTask.ts b/server/queues/tasks/UploadUserAvatarTask.ts index 3b930b77c6..66a1360677 100644 --- a/server/queues/tasks/UploadUserAvatarTask.ts +++ b/server/queues/tasks/UploadUserAvatarTask.ts @@ -2,7 +2,7 @@ import { createHash, randomUUID } from "crypto"; import { User } from "@server/models"; import { Buckets } from "@server/models/helpers/AttachmentHelper"; import FileStorage from "@server/storage/files"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; type Props = { /** The userId to operate on */ diff --git a/server/queues/tasks/ValidateSSOAccessTask.ts b/server/queues/tasks/ValidateSSOAccessTask.ts index 002ddd600e..3ad4c7309e 100644 --- a/server/queues/tasks/ValidateSSOAccessTask.ts +++ b/server/queues/tasks/ValidateSSOAccessTask.ts @@ -1,7 +1,7 @@ import Logger from "@server/logging/Logger"; import { User, UserAuthentication } from "@server/models"; import { sequelize } from "@server/storage/database"; -import BaseTask, { TaskPriority } from "./BaseTask"; +import { BaseTask, TaskPriority } from "./base/BaseTask"; type Props = { userId: string; diff --git a/server/queues/tasks/BaseTask.ts b/server/queues/tasks/base/BaseTask.ts similarity index 78% rename from server/queues/tasks/BaseTask.ts rename to server/queues/tasks/base/BaseTask.ts index 3b423fb1f8..6cee76b264 100644 --- a/server/queues/tasks/BaseTask.ts +++ b/server/queues/tasks/base/BaseTask.ts @@ -1,5 +1,5 @@ import { Job, JobOptions } from "bull"; -import { taskQueue } from "../"; +import { taskQueue } from "../../"; export enum TaskPriority { Background = 40, @@ -8,18 +8,7 @@ export enum TaskPriority { High = 10, } -export enum TaskSchedule { - Day = "daily", - Hour = "hourly", - Minute = "minute", -} - -export default abstract class BaseTask> { - /** - * An optional schedule for this task to be run automatically. - */ - static cron: TaskSchedule | undefined; - +export abstract class BaseTask> { /** * Schedule this task type to be processed asynchronously by a worker. * @@ -57,7 +46,7 @@ export default abstract class BaseTask> { } /** - * Job options such as priority and retry strategy, as defined by Bull. + * Job options such as priority and retry strategy. */ public get options(): JobOptions { return { diff --git a/server/queues/tasks/base/CronTask.test.ts b/server/queues/tasks/base/CronTask.test.ts new file mode 100644 index 0000000000..1a5c35e61b --- /dev/null +++ b/server/queues/tasks/base/CronTask.test.ts @@ -0,0 +1,297 @@ +import { Op } from "sequelize"; +import { CronTask, PartitionInfo, TaskInterval } from "./CronTask"; + +// Create a concrete implementation of CronTask for testing +class TestTask extends CronTask { + public async perform() { + // Not used in these tests + } + + public get cron() { + return { + interval: TaskInterval.Day, + }; + } + + public testPartitionWhereClause( + idField: string, + partition: PartitionInfo | undefined + ) { + return this.getPartitionWhereClause(idField, partition); + } +} + +describe("CronTask", () => { + let task: TestTask; + + beforeEach(() => { + task = new TestTask(); + }); + + describe("getPartitionWhereClause", () => { + it("should return empty object when partition is undefined", () => { + const where = task.testPartitionWhereClause("id", undefined); + expect(where).toEqual({}); + }); + + it("should generate range WHERE clause for valid partition", () => { + const where = task.testPartitionWhereClause("id", { + partitionIndex: 0, + partitionCount: 3, + }) as any; + + expect(where).toBeDefined(); + expect(where.id).toBeDefined(); + expect(where.id[Op.gte]).toBeDefined(); + expect(where.id[Op.lte]).toBeDefined(); + }); + + it("should generate correct UUID ranges for 3 partitions", () => { + const where0 = task.testPartitionWhereClause("id", { + partitionIndex: 0, + partitionCount: 3, + }) as any; + + const where1 = task.testPartitionWhereClause("id", { + partitionIndex: 1, + partitionCount: 3, + }) as any; + + const where2 = task.testPartitionWhereClause("id", { + partitionIndex: 2, + partitionCount: 3, + }) as any; + + // Partition 0: Should start from 00000000 + expect(where0.id[Op.gte]).toBe("00000000-0000-4000-8000-000000000000"); + expect(where0.id[Op.lte]).toBe("55555554-ffff-4fff-bfff-ffffffffffff"); + + // Partition 1: Should start from 55555555 + expect(where1.id[Op.gte]).toBe("55555555-0000-4000-8000-000000000000"); + expect(where1.id[Op.lte]).toBe("aaaaaaa9-ffff-4fff-bfff-ffffffffffff"); + + // Partition 2: Should end at ffffffff + expect(where2.id[Op.gte]).toBe("aaaaaaaa-0000-4000-8000-000000000000"); + expect(where2.id[Op.lte]).toBe("ffffffff-ffff-4fff-bfff-ffffffffffff"); + }); + + it("should generate correct UUID ranges for 2 partitions", () => { + const where0 = task.testPartitionWhereClause("id", { + partitionIndex: 0, + partitionCount: 2, + }) as any; + + const where1 = task.testPartitionWhereClause("id", { + partitionIndex: 1, + partitionCount: 2, + }) as any; + + // Partition 0: 0x00000000 to 0x7fffffff + expect(where0.id[Op.gte]).toBe("00000000-0000-4000-8000-000000000000"); + expect(where0.id[Op.lte]).toBe("7fffffff-ffff-4fff-bfff-ffffffffffff"); + + // Partition 1: 0x80000000 to 0xffffffff + expect(where1.id[Op.gte]).toBe("80000000-0000-4000-8000-000000000000"); + expect(where1.id[Op.lte]).toBe("ffffffff-ffff-4fff-bfff-ffffffffffff"); + }); + + it("should distribute UUID space evenly", () => { + const partitionCount = 4; + const ranges: Array<{ start: string; end: string }> = []; + + for (let i = 0; i < partitionCount; i++) { + const where = task.testPartitionWhereClause("id", { + partitionIndex: i, + partitionCount, + }) as any; + ranges.push({ + start: where.id[Op.gte], + end: where.id[Op.lte], + }); + } + + // Check that ranges don't overlap and cover the entire space + expect(ranges[0].start).toBe("00000000-0000-4000-8000-000000000000"); + expect(ranges[3].end).toBe("ffffffff-ffff-4fff-bfff-ffffffffffff"); + + // Check that each range ends where the next begins (approximately) + for (let i = 0; i < partitionCount - 1; i++) { + const currentEnd = ranges[i].end.substring(0, 8); + const nextStart = ranges[i + 1].start.substring(0, 8); + + // Convert to numbers to check they're consecutive + const currentEndNum = parseInt(currentEnd, 16); + const nextStartNum = parseInt(nextStart, 16); + + // Should be consecutive or very close + expect(nextStartNum - currentEndNum).toBeLessThanOrEqual(1); + } + }); + + it("should handle single partition (no partitioning)", () => { + const where = task.testPartitionWhereClause("id", { + partitionIndex: 0, + partitionCount: 1, + }) as any; + + // Should cover entire UUID space + expect(where.id[Op.gte]).toBe("00000000-0000-4000-8000-000000000000"); + expect(where.id[Op.lte]).toBe("ffffffff-ffff-4fff-bfff-ffffffffffff"); + }); + + it("should throw error for invalid partition info", () => { + expect(() => { + task.testPartitionWhereClause("id", { + partitionIndex: -1, + partitionCount: 3, + }); + }).toThrow("Invalid partition info: index -1, count 3"); + + expect(() => { + task.testPartitionWhereClause("id", { + partitionIndex: 3, + partitionCount: 3, + }); + }).toThrow("Invalid partition info: index 3, count 3"); + + expect(() => { + task.testPartitionWhereClause("id", { + partitionIndex: 0, + partitionCount: 0, + }); + }).toThrow("Invalid partition info: index 0, count 0"); + }); + + it("should work with different field names", () => { + const where1 = task.testPartitionWhereClause("id", { + partitionIndex: 0, + partitionCount: 2, + }) as any; + + const where2 = task.testPartitionWhereClause("documentId", { + partitionIndex: 0, + partitionCount: 2, + }) as any; + + expect(where1.id).toBeDefined(); + expect(where1.documentId).toBeUndefined(); + expect(where2.documentId).toBeDefined(); + expect(where2.id).toBeUndefined(); + }); + + it("should handle large partition counts efficiently", () => { + const partitionCount = 100; + const ranges: Array<{ start: string; end: string }> = []; + + for (let i = 0; i < partitionCount; i++) { + const where = task.testPartitionWhereClause("id", { + partitionIndex: i, + partitionCount, + }) as any; + ranges.push({ + start: where.id[Op.gte], + end: where.id[Op.lte], + }); + } + + // First partition should start at 00000000 + expect(ranges[0].start).toBe("00000000-0000-4000-8000-000000000000"); + // Last partition should end at ffffffff + expect(ranges[99].end).toBe("ffffffff-ffff-4fff-bfff-ffffffffffff"); + + // Each partition should have a unique range + const startValues = new Set(ranges.map((r) => r.start)); + expect(startValues.size).toBe(100); + }); + + it("should calculate correct hex values for partition boundaries", () => { + // Test specific calculations + const where = task.testPartitionWhereClause("id", { + partitionIndex: 1, + partitionCount: 16, // 16 partitions = 0x10000000 per partition + }) as any; + + // Partition 1 should be from 0x10000000 to 0x1fffffff + expect(where.id[Op.gte]).toBe("10000000-0000-4000-8000-000000000000"); + expect(where.id[Op.lte]).toBe("1fffffff-ffff-4fff-bfff-ffffffffffff"); + }); + + it("should ensure all UUIDs map to exactly one partition", () => { + const testUuids = [ + "00000000-0000-4000-8000-000000000000", // Min UUID + "12345678-9abc-4ef0-9234-567890abcdef", + "55555555-5555-4555-9555-555555555555", + "87654321-fedc-4a98-b654-321098765432", + "aaaaaaaa-aaaa-4aaa-aaaa-aaaaaaaaaaaa", + "deadbeef-cafe-4abe-aeed-dec0debac1e5", + "ffffffff-ffff-4fff-bfff-ffffffffffff", // Max UUID + ]; + + const partitionCount = 3; + + for (const uuid of testUuids) { + let matchCount = 0; + let matchedPartition = -1; + + for (let i = 0; i < partitionCount; i++) { + const where = task.testPartitionWhereClause("id", { + partitionIndex: i, + partitionCount, + }) as any; + + const startUuid = where.id[Op.gte]; + const endUuid = where.id[Op.lte]; + + // Check if UUID falls within this partition's range + if (uuid >= startUuid && uuid <= endUuid) { + matchCount++; + matchedPartition = i; + } + } + + // Each UUID should match exactly one partition + expect(matchCount).toBe(1); + expect(matchedPartition).toBeGreaterThanOrEqual(0); + expect(matchedPartition).toBeLessThan(partitionCount); + } + }); + + it("should generate non-overlapping ranges for any partition count", () => { + const testCounts = [2, 3, 5, 7, 10, 16, 32]; + + for (const partitionCount of testCounts) { + const ranges: Array<{ start: string; end: string }> = []; + + // Get all partition ranges + for (let i = 0; i < partitionCount; i++) { + const where = task.testPartitionWhereClause("id", { + partitionIndex: i, + partitionCount, + }) as any; + ranges.push({ + start: where.id[Op.gte], + end: where.id[Op.lte], + }); + } + + // Verify no gaps between consecutive partitions + for (let i = 0; i < partitionCount - 1; i++) { + const currentEnd = ranges[i].end.substring(0, 8); + const nextStart = ranges[i + 1].start.substring(0, 8); + + const currentEndNum = parseInt(currentEnd, 16); + const nextStartNum = parseInt(nextStart, 16); + + // Next partition should start exactly where current ends + 1 + expect(nextStartNum).toBe(currentEndNum + 1); + } + + // Verify coverage of entire UUID space + expect(ranges[0].start).toBe("00000000-0000-4000-8000-000000000000"); + expect(ranges[partitionCount - 1].end).toBe( + "ffffffff-ffff-4fff-bfff-ffffffffffff" + ); + } + }); + }); +}); diff --git a/server/queues/tasks/base/CronTask.ts b/server/queues/tasks/base/CronTask.ts new file mode 100644 index 0000000000..37269cc1c2 --- /dev/null +++ b/server/queues/tasks/base/CronTask.ts @@ -0,0 +1,163 @@ +import { Op, WhereOptions } from "sequelize"; +import { BaseTask } from "./BaseTask"; + +export enum TaskInterval { + Day = "daily", + Hour = "hourly", +} + +export type TaskSchedule = { + /** The interval at which to run this task */ + interval: TaskInterval; + /** + * An optional time window (in milliseconds) over which to spread the START time + * of this task when triggered by cron. + * + * **Important**: This only delays when tasks START - it does NOT partition the work. + * To distribute work across multiple workers, tasks must also use the `partition` + * prop and implement partitioned queries using `getPartitionWhereClause()`. + * + * When set, each task gets a deterministic delay based on its name, ensuring + * consistent scheduling across runs and preventing all tasks from starting + * simultaneously. + * + * @example + * // Run hourly, but spread task start times over 10 minutes + * interval: TaskInterval.Hour, + * partitionWindow: 10 * Minute.ms // 10 minutes + * + * @example + * // Run daily, but spread task start times over 1 hour + * interval: TaskInterval.Day, + * partitionWindow: 60 * Minute.ms // 1 hour + */ + partitionWindow?: number; +}; + +/** + * Partition information for distributing work across multiple worker instances. + */ +export type PartitionInfo = { + /** + * The partition number for this task instance (0-based). + */ + partitionIndex: number; + /** + * The total number of partitions. + */ + partitionCount: number; +}; + +/** + * Properties for cron-scheduled tasks. + */ +export type Props = { + limit: number; + partition: PartitionInfo; +}; + +export abstract class CronTask extends BaseTask { + /** The schedule configuration for this cron task */ + public abstract get cron(): TaskSchedule; + + /** + * Optimized partitioning method for UUID primary keys using range-based distribution. + * Divides the UUID space into N equal ranges and assigns each partition a range. + * + * The UUID space (0x00000000-... to 0xffffffff-...) is divided into N equal ranges. + * For example, with 3 partitions: + * - Partition 0: '00000000-0000-4000-8000-000000000000' to '55555554-ffff-4fff-bfff-ffffffffffff' + * - Partition 1: '55555555-0000-4000-8000-000000000000' to 'aaaaaaaa-ffff-4fff-bfff-ffffffffffff' + * - Partition 2: 'aaaaaaab-0000-4000-8000-000000000000' to 'ffffffff-ffff-4fff-bfff-ffffffffffff' + * + * @param partitionInfo The partition information + * @returns The start and end UUID bounds for the partition + */ + protected getPartitionBounds( + partitionInfo: PartitionInfo | undefined + ): [string, string] { + if (!partitionInfo) { + return [ + "00000000-0000-4000-8000-000000000000", + "ffffffff-ffff-4fff-bfff-ffffffffffff", + ]; + } + + const { partitionIndex, partitionCount } = partitionInfo; + + if ( + partitionCount <= 0 || + partitionIndex < 0 || + partitionIndex >= partitionCount + ) { + throw new Error( + `Invalid partition info: index ${partitionIndex}, count ${partitionCount}` + ); + } + + // 2^32 total possible values for the first 32 bits (4.3 billion) + const TOTAL_VALUES = 0x100000000; + + // The maximum possible integer value (0xFFFFFFFF) + const MAX_VALUE = TOTAL_VALUES - 1; + + // Ensure even distribution of values by calculating exact range size + const rangeSize = Math.floor(TOTAL_VALUES / partitionCount); + const rangeStart = partitionIndex * rangeSize; + + let rangeEnd: number; + if (partitionIndex === partitionCount - 1) { + // The last partition takes any remainder and goes up to the max value + rangeEnd = MAX_VALUE; + } else { + // The end is the start of the *next* partition minus 1 + rangeEnd = (partitionIndex + 1) * rangeSize - 1; + } + + // Use Number.prototype.toString(16) and padStart(8, '0') for the 32-bit hex prefix + const startHex = rangeStart.toString(16).padStart(8, "0"); + const endHex = rangeEnd.toString(16).padStart(8, "0"); + + // Start: First 32 bits (prefix) followed by the lowest possible values for the rest + // Ensures correct UUID v4 version (4xxx) and variant (8|9|a|bxxx) bits + const startUuid = `${startHex}-0000-4000-8000-000000000000`; + + // End: First 32 bits (prefix) followed by the highest possible values for the rest + // Ensures correct UUID v4 version (4xxx) and variant (8|9|a|bxxx) bits + const endUuid = `${endHex}-ffff-4fff-bfff-ffffffffffff`; + + return [startUuid, endUuid]; + } + + /** + * Optimized partitioning method for UUID primary keys using range-based distribution. + * Divides the UUID space into N equal ranges and assigns each partition a range. + * + * @param idField The name of the UUID primary key field to partition on + * @param partitionInfo The partition information + * @returns A WHERE clause for partitioned queries + * + * @example + * const where = { + * deletedAt: { [Op.lt]: someDate }, + * ...this.getPartitionWhereClause("id", props.partition) + * }; + */ + protected getPartitionWhereClause( + idField: string, + partitionInfo: PartitionInfo | undefined + ): WhereOptions { + if (!partitionInfo) { + return {}; + } + + const [startUuid, endUuid] = this.getPartitionBounds(partitionInfo); + + return { + [idField]: { + [Op.gte]: startUuid, + [Op.lte]: endUuid, + }, + }; + } +} diff --git a/server/queues/tasks/index.ts b/server/queues/tasks/index.ts index b59e30343b..aac559a9fd 100644 --- a/server/queues/tasks/index.ts +++ b/server/queues/tasks/index.ts @@ -1,6 +1,6 @@ import { Hook, PluginManager } from "@server/utils/PluginManager"; import { requireDirectory } from "@server/utils/fs"; -import BaseTask from "./BaseTask"; +import { BaseTask } from "./base/BaseTask"; const tasks: Record = {}; diff --git a/server/routes/api/cron/cron.ts b/server/routes/api/cron/cron.ts index d7925a8be9..f0516c4a54 100644 --- a/server/routes/api/cron/cron.ts +++ b/server/routes/api/cron/cron.ts @@ -1,24 +1,24 @@ import Router from "koa-router"; import env from "@server/env"; import { AuthenticationError } from "@server/errors"; +import Logger from "@server/logging/Logger"; import validate from "@server/middlewares/validate"; import tasks from "@server/queues/tasks"; -import { TaskSchedule } from "@server/queues/tasks/BaseTask"; +import { CronTask, TaskInterval } from "@server/queues/tasks/base/CronTask"; import { APIContext } from "@server/types"; import { safeEqual } from "@server/utils/crypto"; import * as T from "./schema"; +import { Minute } from "@shared/utils/time"; const router = new Router(); - -/** Whether the minutely cron job has been received */ -const receivedPeriods = new Set(); +const receivedPeriods = new Set(); const cronHandler = async (ctx: APIContext) => { - const period = Object.values(TaskSchedule).includes( - ctx.params.period as TaskSchedule + const period = Object.values(TaskInterval).includes( + ctx.params.period as TaskInterval ) - ? (ctx.params.period as TaskSchedule) - : TaskSchedule.Day; + ? (ctx.params.period as TaskInterval) + : TaskInterval.Day; const token = (ctx.input.body.token ?? ctx.input.query.token) as string; const limit = ctx.input.body.limit ?? ctx.input.query.limit; @@ -30,26 +30,54 @@ const cronHandler = async (ctx: APIContext) => { for (const name in tasks) { const TaskClass = tasks[name]; - if (TaskClass.cron === period) { - // @ts-expect-error We won't instantiate an abstract class - await new TaskClass().schedule({ limit }); + if (!(TaskClass.prototype instanceof CronTask)) { + continue; + } + // @ts-expect-error We won't instantiate an abstract class + const taskInstance = new TaskClass() as CronTask; + + const cronConfig = taskInstance.cron; + const partitionWindow = cronConfig.partitionWindow; + const shouldSchedule = + cronConfig.interval === period || // Backwards compatibility for installations that have not set up // cron jobs periods other than daily. - } else if ( - TaskClass.cron === TaskSchedule.Minute && - !receivedPeriods.has(TaskSchedule.Minute) && - (period === TaskSchedule.Hour || period === TaskSchedule.Day) - ) { - // @ts-expect-error We won't instantiate an abstract class - await new TaskClass().schedule({ limit }); - } else if ( - TaskClass.cron === TaskSchedule.Hour && - !receivedPeriods.has(TaskSchedule.Hour) && - period === TaskSchedule.Day - ) { - // @ts-expect-error We won't instantiate an abstract class - await new TaskClass().schedule({ limit }); + (cronConfig.interval === TaskInterval.Hour && + !receivedPeriods.has(TaskInterval.Hour) && + period === TaskInterval.Day); + + if (shouldSchedule) { + if (partitionWindow && partitionWindow > 0) { + // Split the task into partitions to spread work across time window + // by dividing the partitionWindow into minutes and scheduling a delayed + // task for each minute. + const partitions = Math.ceil(partitionWindow / Minute.ms); + for (let i = 0; i < partitions; i++) { + const delay = Math.floor((partitionWindow / partitions) * i); + const partition = { + partitionIndex: i, + partitionCount: partitions, + }; + + Logger.debug( + "task", + `Scheduling partitioned task ${name} (partition ${ + i + 1 + }/${partitions}) with delay of ${delay / 1000}s` + ); + + await taskInstance.schedule({ limit, partition }, { delay }); + } + } else { + await taskInstance.schedule({ + limit, + partition: { + partitionIndex: 0, + partitionCount: 1, + }, + }); + } } } @@ -57,7 +85,6 @@ const cronHandler = async (ctx: APIContext) => { success: true, }; }; - router.get("cron.:period", validate(T.CronSchema), cronHandler); router.post("cron.:period", validate(T.CronSchema), cronHandler); diff --git a/server/services/cron.ts b/server/services/cron.ts index 7f43f229e8..a3a176924e 100644 --- a/server/services/cron.ts +++ b/server/services/cron.ts @@ -1,27 +1,36 @@ -import { Day, Hour, Minute, Second } from "@shared/utils/time"; +import { Day, Hour, Second } from "@shared/utils/time"; import tasks from "@server/queues/tasks"; -import { TaskSchedule } from "@server/queues/tasks/BaseTask"; +import { CronTask, TaskInterval } from "@server/queues/tasks/base/CronTask"; export default function init() { - async function run(schedule: TaskSchedule) { + async function run(schedule: TaskInterval) { + const partition = { + partitionIndex: 0, + partitionCount: 1, + }; + for (const name in tasks) { const TaskClass = tasks[name]; - if (TaskClass.cron === schedule) { - // @ts-expect-error We won't instantiate an abstract class - await new TaskClass().schedule({ limit: 10000 }); + if (!(TaskClass.prototype instanceof CronTask)) { + continue; + } + + // @ts-expect-error We won't instantiate an abstract class + const taskInstance = new TaskClass() as CronTask; + + if (taskInstance.cron.interval === schedule) { + await taskInstance.schedule({ limit: 10000, partition }); } } } - setInterval(() => void run(TaskSchedule.Day), Day.ms); - setInterval(() => void run(TaskSchedule.Hour), Hour.ms); - setInterval(() => void run(TaskSchedule.Minute), Minute.ms); + setInterval(() => void run(TaskInterval.Day), Day.ms); + setInterval(() => void run(TaskInterval.Hour), Hour.ms); // Just give everything time to startup before running the first time. Not // _technically_ required to function. setTimeout(() => { - void run(TaskSchedule.Day); - void run(TaskSchedule.Hour); - void run(TaskSchedule.Minute); + void run(TaskInterval.Day); + void run(TaskInterval.Hour); }, 5 * Second.ms); } diff --git a/server/utils/PluginManager.ts b/server/utils/PluginManager.ts index c3f0df7872..a50f281022 100644 --- a/server/utils/PluginManager.ts +++ b/server/utils/PluginManager.ts @@ -7,7 +7,7 @@ import type BaseEmail from "@server/emails/templates/BaseEmail"; import env from "@server/env"; import Logger from "@server/logging/Logger"; import type BaseProcessor from "@server/queues/processors/BaseProcessor"; -import type BaseTask from "@server/queues/tasks/BaseTask"; +import type { BaseTask } from "@server/queues/tasks/base/BaseTask"; import { UnfurlSignature, UninstallSignature } from "@server/types"; import { BaseIssueProvider } from "./BaseIssueProvider";