chore: Add cron task partitioning (#10736)

* wip

* Implementation complete

* tidying

* test

* Address feedback

* Remove duplicative retry logic from UpdateDocumentsPopularityScoreTask.
Now that the work is split across many runs this is no longer necessary

* Refactor to subclass, config to instance

* Refactor BaseTask to named export

* fix: Missing partition

* tsc

* Feedback
This commit is contained in:
Tom Moor
2025-11-27 16:57:52 +01:00
committed by GitHub
parent 4212e0e8d4
commit 42959d66db
59 changed files with 811 additions and 261 deletions

View File

@@ -1,5 +1,5 @@
import { IntegrationService } from "@shared/types";
import BaseTask from "@server/queues/tasks/BaseTask";
import { BaseTask } from "@server/queues/tasks/base/BaseTask";
import { Hook, PluginManager } from "@server/utils/PluginManager";
type Props = {

View File

@@ -1,7 +1,7 @@
import { IntegrationService, IntegrationType } from "@shared/types";
import { Integration } from "@server/models";
import { Buckets } from "@server/models/helpers/AttachmentHelper";
import BaseTask, { TaskPriority } from "@server/queues/tasks/BaseTask";
import { BaseTask, TaskPriority } from "@server/queues/tasks/base/BaseTask";
import FileStorage from "@server/storage/files";
type Props = {

View File

@@ -21,7 +21,13 @@ describe("CleanupWebookDeliveriesTask", () => {
});
const task = new CleanupWebhookDeliveriesTask();
await task.perform();
await task.perform({
limit: 100,
partition: {
partitionIndex: 0,
partitionCount: 1,
},
});
expect(await deliveryExists(brandNewWebhookDelivery)).toBe(true);
expect(await deliveryExists(newishWebhookDelivery)).toBe(true);

View File

@@ -2,28 +2,35 @@ import { subDays } from "date-fns";
import { Op } from "sequelize";
import Logger from "@server/logging/Logger";
import { WebhookDelivery } from "@server/models";
import BaseTask, {
TaskPriority,
TaskSchedule,
} from "@server/queues/tasks/BaseTask";
import { TaskPriority } from "@server/queues/tasks/base/BaseTask";
import {
CronTask,
TaskInterval,
Props,
} from "@server/queues/tasks/base/CronTask";
import { Hour } from "@shared/utils/time";
type Props = Record<string, never>;
export default class CleanupWebhookDeliveriesTask extends BaseTask<Props> {
static cron = TaskSchedule.Day;
public async perform() {
export default class CleanupWebhookDeliveriesTask extends CronTask {
public async perform({ partition }: Props) {
Logger.info("task", `Deleting WebhookDeliveries older than one week…`);
const count = await WebhookDelivery.unscoped().destroy({
where: {
createdAt: {
[Op.lt]: subDays(new Date(), 7),
},
...this.getPartitionWhereClause("id", partition),
},
});
Logger.info("task", `${count} old WebhookDeliveries deleted.`);
}
public get cron() {
return {
interval: TaskInterval.Day,
partitionWindow: Hour.ms,
};
}
public get options() {
return {
attempts: 1,

View File

@@ -42,7 +42,7 @@ import {
presentGroupMembership,
presentComment,
} from "@server/presenters";
import BaseTask from "@server/queues/tasks/BaseTask";
import { BaseTask } from "@server/queues/tasks/base/BaseTask";
import {
CollectionEvent,
CollectionGroupEvent,

View File

@@ -16,7 +16,7 @@ import HTMLHelper from "@server/models/helpers/HTMLHelper";
import { ProsemirrorHelper } from "@server/models/helpers/ProsemirrorHelper";
import { TextHelper } from "@server/models/helpers/TextHelper";
import { taskQueue } from "@server/queues";
import { TaskPriority } from "@server/queues/tasks/BaseTask";
import { TaskPriority } from "@server/queues/tasks/base/BaseTask";
import { NotificationMetadata } from "@server/types";
export enum EmailMessageCategory {

View File

@@ -23,7 +23,7 @@ import AttachmentHelper from "@server/models/helpers/AttachmentHelper";
import { ProsemirrorHelper } from "@server/models/helpers/ProsemirrorHelper";
import { sequelize } from "@server/storage/database";
import { PagePerImportTask } from "../processors/ImportsProcessor";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
import UploadAttachmentsForImportTask from "./UploadAttachmentsForImportTask";
export type ProcessOutput<T extends ImportableIntegrationService> = {

View File

@@ -1,7 +1,7 @@
import { Integration } from "@server/models";
import { sequelize } from "@server/storage/database";
import { Hook, PluginManager } from "@server/utils/PluginManager";
import BaseTask from "./BaseTask";
import { BaseTask } from "./base/BaseTask";
type Props = {
integrationId: string;

View File

@@ -3,6 +3,14 @@ import { Document } from "@server/models";
import { buildDocument, buildTeam } from "@server/test/factories";
import CleanupDeletedDocumentsTask from "./CleanupDeletedDocumentsTask";
const props = {
limit: 100,
partition: {
partitionIndex: 0,
partitionCount: 1,
},
};
describe("CleanupDeletedDocumentsTask", () => {
it("should not destroy documents not deleted", async () => {
const team = await buildTeam();
@@ -12,7 +20,7 @@ describe("CleanupDeletedDocumentsTask", () => {
});
const task = new CleanupDeletedDocumentsTask();
await task.perform({ limit: 100 });
await task.perform(props);
expect(
await Document.unscoped().count({
@@ -33,7 +41,7 @@ describe("CleanupDeletedDocumentsTask", () => {
});
const task = new CleanupDeletedDocumentsTask();
await task.perform({ limit: 100 });
await task.perform(props);
expect(
await Document.unscoped().count({
@@ -54,7 +62,7 @@ describe("CleanupDeletedDocumentsTask", () => {
});
const task = new CleanupDeletedDocumentsTask();
await task.perform({ limit: 100 });
await task.perform(props);
expect(
await Document.unscoped().count({
@@ -75,7 +83,7 @@ describe("CleanupDeletedDocumentsTask", () => {
});
const task = new CleanupDeletedDocumentsTask();
await task.perform({ limit: 100 });
await task.perform(props);
expect(
await Document.unscoped().count({

View File

@@ -3,16 +3,12 @@ import { Op } from "sequelize";
import documentPermanentDeleter from "@server/commands/documentPermanentDeleter";
import Logger from "@server/logging/Logger";
import { Document } from "@server/models";
import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask";
import { TaskPriority } from "./base/BaseTask";
import { Minute } from "@shared/utils/time";
import { CronTask, Props, TaskInterval } from "./base/CronTask";
type Props = {
limit: number;
};
export default class CleanupDeletedDocumentsTask extends BaseTask<Props> {
static cron = TaskSchedule.Hour;
public async perform({ limit }: Props) {
export default class CleanupDeletedDocumentsTask extends CronTask {
public async perform({ limit, partition }: Props) {
Logger.info(
"task",
`Permanently destroying upto ${limit} documents older than 30 days…`
@@ -25,6 +21,7 @@ export default class CleanupDeletedDocumentsTask extends BaseTask<Props> {
deletedAt: {
[Op.lt]: subDays(new Date(), 30),
},
...this.getPartitionWhereClause("id", partition),
},
paranoid: false,
limit,
@@ -39,4 +36,11 @@ export default class CleanupDeletedDocumentsTask extends BaseTask<Props> {
priority: TaskPriority.Background,
};
}
public get cron() {
return {
interval: TaskInterval.Hour,
partitionWindow: 15 * Minute.ms,
};
}
}

View File

@@ -1,6 +1,6 @@
import teamPermanentDeleter from "@server/commands/teamPermanentDeleter";
import { Team } from "@server/models";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
type Props = {
/** The team ID to permanantly destroy */

View File

@@ -1,18 +1,14 @@
import { subDays } from "date-fns";
import { Op } from "sequelize";
import { Minute } from "@shared/utils/time";
import Logger from "@server/logging/Logger";
import { Team } from "@server/models";
import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask";
import { TaskPriority } from "./base/BaseTask";
import CleanupDeletedTeamTask from "./CleanupDeletedTeamTask";
import { CronTask, TaskInterval, Props } from "./base/CronTask";
type Props = {
limit: number;
};
export default class CleanupDeletedTeamsTask extends BaseTask<Props> {
static cron = TaskSchedule.Hour;
public async perform({ limit }: Props) {
export default class CleanupDeletedTeamsTask extends CronTask {
public async perform({ limit, partition }: Props) {
Logger.info(
"task",
`Permanently destroying upto ${limit} teams older than 30 days…`
@@ -23,6 +19,7 @@ export default class CleanupDeletedTeamsTask extends BaseTask<Props> {
deletedAt: {
[Op.lt]: subDays(new Date(), 30),
},
...this.getPartitionWhereClause("id", partition),
},
paranoid: false,
limit,
@@ -35,6 +32,13 @@ export default class CleanupDeletedTeamsTask extends BaseTask<Props> {
}
}
public get cron() {
return {
interval: TaskInterval.Hour,
partitionWindow: 15 * Minute.ms,
};
}
public get options() {
return {
attempts: 1,

View File

@@ -2,7 +2,7 @@ import Logger from "@server/logging/Logger";
import { WebhookSubscription, ApiKey, User } from "@server/models";
import { cannot } from "@server/policies";
import { sequelize } from "@server/storage/database";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
type Props = {
userId: string;

View File

@@ -1,15 +1,10 @@
import { Op } from "sequelize";
import Logger from "@server/logging/Logger";
import { Attachment } from "@server/models";
import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask";
type Props = {
limit: number;
};
export default class CleanupExpiredAttachmentsTask extends BaseTask<Props> {
static cron = TaskSchedule.Hour;
import { TaskPriority } from "./base/BaseTask";
import { CronTask, Props, TaskInterval } from "./base/CronTask";
export default class CleanupExpiredAttachmentsTask extends CronTask {
public async perform({ limit }: Props) {
Logger.info("task", `Deleting expired attachments…`);
const attachments = await Attachment.unscoped().findAll({
@@ -24,6 +19,12 @@ export default class CleanupExpiredAttachmentsTask extends BaseTask<Props> {
Logger.info("task", `Removed ${attachments.length} attachments`);
}
public get cron() {
return {
interval: TaskInterval.Hour,
};
}
public get options() {
return {
attempts: 1,

View File

@@ -4,6 +4,14 @@ import { FileOperation } from "@server/models";
import { buildFileOperation, buildTeam } from "@server/test/factories";
import CleanupExpiredFileOperationsTask from "./CleanupExpiredFileOperationsTask";
const props = {
limit: 100,
partition: {
partitionIndex: 0,
partitionCount: 1,
},
};
describe("CleanupExpiredFileOperationsTask", () => {
it("should expire exports older than 15 days ago", async () => {
const team = await buildTeam();
@@ -20,7 +28,7 @@ describe("CleanupExpiredFileOperationsTask", () => {
});
const task = new CleanupExpiredFileOperationsTask();
await task.perform({ limit: 100 });
await task.perform(props);
const data = await FileOperation.count({
where: {
@@ -47,7 +55,7 @@ describe("CleanupExpiredFileOperationsTask", () => {
});
const task = new CleanupExpiredFileOperationsTask();
await task.perform({ limit: 100 });
await task.perform(props);
const data = await FileOperation.count({
where: {

View File

@@ -3,15 +3,10 @@ import { Op } from "sequelize";
import { FileOperationState } from "@shared/types";
import Logger from "@server/logging/Logger";
import { FileOperation } from "@server/models";
import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask";
type Props = {
limit: number;
};
export default class CleanupExpiredFileOperationsTask extends BaseTask<Props> {
static cron = TaskSchedule.Hour;
import { TaskPriority } from "./base/BaseTask";
import { CronTask, Props, TaskInterval } from "./base/CronTask";
export default class CleanupExpiredFileOperationsTask extends CronTask {
public async perform({ limit }: Props) {
Logger.info("task", `Expiring file operations older than 15 days…`);
const fileOperations = await FileOperation.unscoped().findAll({
@@ -31,6 +26,12 @@ export default class CleanupExpiredFileOperationsTask extends BaseTask<Props> {
Logger.info("task", `Expired ${fileOperations.length} file operations`);
}
public get cron() {
return {
interval: TaskInterval.Hour,
};
}
public get options() {
return {
attempts: 1,

View File

@@ -2,13 +2,10 @@ import { subMonths } from "date-fns";
import { Op } from "sequelize";
import Logger from "@server/logging/Logger";
import { OAuthAuthorizationCode } from "@server/models";
import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask";
type Props = Record<string, never>;
export default class CleanupOAuthAuthorizationCodeTask extends BaseTask<Props> {
static cron = TaskSchedule.Day;
import { TaskPriority } from "./base/BaseTask";
import { CronTask, TaskInterval } from "./base/CronTask";
export default class CleanupOAuthAuthorizationCodeTask extends CronTask {
public async perform() {
Logger.info(
"task",
@@ -24,6 +21,12 @@ export default class CleanupOAuthAuthorizationCodeTask extends BaseTask<Props> {
Logger.info("task", `${count} expired OAuth authorization codes deleted.`);
}
public get cron() {
return {
interval: TaskInterval.Day,
};
}
public get options() {
return {
attempts: 1,

View File

@@ -2,17 +2,12 @@ import { subDays } from "date-fns";
import { Op } from "sequelize";
import Logger from "@server/logging/Logger";
import { Event } from "@server/models";
import BaseTask, {
TaskPriority,
TaskSchedule,
} from "@server/queues/tasks/BaseTask";
import { TaskPriority } from "./base/BaseTask";
import { CronTask, Props, TaskInterval } from "./base/CronTask";
import { Minute } from "@shared/utils/time";
type Props = Record<string, never>;
export default class CleanupOldEventsTask extends BaseTask<Props> {
static cron = TaskSchedule.Hour;
public async perform() {
export default class CleanupOldEventsTask extends CronTask {
public async perform({ partition }: Props) {
// TODO: Hardcoded right now, configurable later
const retentionDays = 365;
const cutoffDate = subDays(new Date(), retentionDays);
@@ -27,6 +22,7 @@ export default class CleanupOldEventsTask extends BaseTask<Props> {
createdAt: {
[Op.lt]: cutoffDate,
},
...this.getPartitionWhereClause("id", partition),
},
batchLimit: 1000,
totalLimit: maxEventsPerTask,
@@ -51,6 +47,13 @@ export default class CleanupOldEventsTask extends BaseTask<Props> {
}
}
public get cron() {
return {
interval: TaskInterval.Hour,
partitionWindow: 15 * Minute.ms,
};
}
public get options() {
return {
attempts: 1,

View File

@@ -3,16 +3,13 @@ import { Op } from "sequelize";
import { ImportState } from "@shared/types";
import Logger from "@server/logging/Logger";
import { Import, ImportTask } from "@server/models";
import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask";
type Props = Record<string, never>;
import { TaskPriority } from "./base/BaseTask";
import { CronTask, TaskInterval } from "./base/CronTask";
/**
* A task that deletes the import_tasks for old imports which are completed, errored (or) canceled.
*/
export default class CleanupOldImportsTask extends BaseTask<Props> {
static cron = TaskSchedule.Day;
export default class CleanupOldImportsTask extends CronTask {
public async perform() {
// TODO: Hardcoded right now, configurable later
const retentionDays = 1;
@@ -76,6 +73,12 @@ export default class CleanupOldImportsTask extends BaseTask<Props> {
}
}
public get cron() {
return {
interval: TaskInterval.Day,
};
}
public get options() {
return {
attempts: 1,

View File

@@ -2,14 +2,12 @@ import { subMonths } from "date-fns";
import { Op } from "sequelize";
import Logger from "@server/logging/Logger";
import { Notification } from "@server/models";
import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask";
import { TaskPriority } from "./base/BaseTask";
import { Minute } from "@shared/utils/time";
import { CronTask, Props, TaskInterval } from "./base/CronTask";
type Props = Record<string, never>;
export default class CleanupOldNotificationsTask extends BaseTask<Props> {
static cron = TaskSchedule.Hour;
public async perform() {
export default class CleanupOldNotificationsTask extends CronTask {
public async perform({ partition }: Props) {
Logger.info("task", `Permanently destroying old notifications…`);
let count;
@@ -18,6 +16,7 @@ export default class CleanupOldNotificationsTask extends BaseTask<Props> {
createdAt: {
[Op.lt]: subMonths(new Date(), 12),
},
...this.getPartitionWhereClause("id", partition),
},
});
@@ -34,6 +33,7 @@ export default class CleanupOldNotificationsTask extends BaseTask<Props> {
createdAt: {
[Op.lt]: subMonths(new Date(), 6),
},
...this.getPartitionWhereClause("id", partition),
},
});
@@ -43,6 +43,13 @@ export default class CleanupOldNotificationsTask extends BaseTask<Props> {
);
}
public get cron() {
return {
interval: TaskInterval.Hour,
partitionWindow: 15 * Minute.ms,
};
}
public get options() {
return {
attempts: 1,

View File

@@ -1,7 +1,7 @@
import { NotificationEventType } from "@shared/types";
import { Notification, User } from "@server/models";
import { CollectionUserEvent } from "@server/types";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
export default class CollectionAddUserNotificationsTask extends BaseTask<CollectionUserEvent> {
public async perform(event: CollectionUserEvent) {

View File

@@ -2,7 +2,7 @@ import { NotificationEventType } from "@shared/types";
import { Collection, Notification } from "@server/models";
import NotificationHelper from "@server/models/helpers/NotificationHelper";
import { CollectionEvent } from "@server/types";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
export default class CollectionCreatedNotificationsTask extends BaseTask<CollectionEvent> {
public async perform(event: CollectionEvent) {

View File

@@ -6,7 +6,7 @@ import { Collection, Subscription, User } from "@server/models";
import { can } from "@server/policies";
import { sequelize } from "@server/storage/database";
import { CollectionUserEvent } from "@server/types";
import BaseTask from "./BaseTask";
import { BaseTask } from "./base/BaseTask";
export default class CollectionSubscriptionRemoveUserTask extends BaseTask<CollectionUserEvent> {
public async perform(event: CollectionUserEvent) {

View File

@@ -18,7 +18,7 @@ import { ProsemirrorHelper } from "@server/models/helpers/ProsemirrorHelper";
import { sequelize } from "@server/storage/database";
import { CommentEvent } from "@server/types";
import { canUserAccessDocument } from "@server/utils/permissions";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
export default class CommentCreatedNotificationsTask extends BaseTask<CommentEvent> {
public async perform(event: CommentEvent) {

View File

@@ -12,7 +12,7 @@ import {
import { ProsemirrorHelper } from "@server/models/helpers/ProsemirrorHelper";
import { CommentEvent, CommentUpdateEvent } from "@server/types";
import { canUserAccessDocument } from "@server/utils/permissions";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
export default class CommentUpdatedNotificationsTask extends BaseTask<CommentEvent> {
public async perform(event: CommentUpdateEvent) {

View File

@@ -1,5 +1,5 @@
import { Attachment } from "@server/models";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
type Props = {
teamId: string;

View File

@@ -2,7 +2,7 @@ import { Op } from "sequelize";
import documentMover from "@server/commands/documentMover";
import { Collection, Document, User } from "@server/models";
import { sequelize } from "@server/storage/database";
import BaseTask from "./BaseTask";
import { BaseTask } from "./base/BaseTask";
import { createContext } from "@server/context";
type Props = {

View File

@@ -1,7 +1,7 @@
import { Op } from "sequelize";
import { GroupUser } from "@server/models";
import { DocumentGroupEvent, DocumentUserEvent } from "@server/types";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
import DocumentAddUserNotificationsTask from "./DocumentAddUserNotificationsTask";
export default class DocumentAddGroupNotificationsTask extends BaseTask<DocumentGroupEvent> {

View File

@@ -3,7 +3,7 @@ import Logger from "@server/logging/Logger";
import { Notification, User } from "@server/models";
import { DocumentUserEvent } from "@server/types";
import { isElevatedPermission } from "@server/utils/permissions";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
export default class DocumentAddUserNotificationsTask extends BaseTask<DocumentUserEvent> {
public async perform(event: DocumentUserEvent) {

View File

@@ -5,7 +5,7 @@ import { createContext } from "@server/context";
import { User } from "@server/models";
import { sequelize } from "@server/storage/database";
import FileStorage from "@server/storage/files";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
type Props = {
userId: string;

View File

@@ -5,7 +5,7 @@ import { DocumentHelper } from "@server/models/helpers/DocumentHelper";
import NotificationHelper from "@server/models/helpers/NotificationHelper";
import { DocumentEvent } from "@server/types";
import { canUserAccessDocument } from "@server/utils/permissions";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
export default class DocumentPublishedNotificationsTask extends BaseTask<DocumentEvent> {
public async perform(event: DocumentEvent) {

View File

@@ -6,7 +6,7 @@ import { Document, Subscription, User } from "@server/models";
import { can } from "@server/policies";
import { sequelize } from "@server/storage/database";
import { DocumentUserEvent } from "@server/types";
import BaseTask from "./BaseTask";
import { BaseTask } from "./base/BaseTask";
export default class DocumentSubscriptionRemoveUserTask extends BaseTask<DocumentUserEvent> {
public async perform(event: DocumentUserEvent) {

View File

@@ -5,7 +5,7 @@ import { schema, serializer } from "@server/editor";
import { Document } from "@server/models";
import { DocumentEvent } from "@server/types";
import { DocumentHelper } from "@server/models/helpers/DocumentHelper";
import BaseTask from "./BaseTask";
import { BaseTask } from "./base/BaseTask";
export default class DocumentUpdateTextTask extends BaseTask<DocumentEvent> {
public async perform(event: DocumentEvent) {

View File

@@ -1,5 +1,5 @@
import emails from "@server/emails/templates";
import BaseTask from "./BaseTask";
import { BaseTask } from "./base/BaseTask";
type Props = {
templateName: string;

View File

@@ -1,7 +1,7 @@
import { Op } from "sequelize";
import documentPermanentDeleter from "@server/commands/documentPermanentDeleter";
import { Document } from "@server/models";
import BaseTask from "./BaseTask";
import { BaseTask } from "./base/BaseTask";
type Props = {
documentIds: string[];

View File

@@ -4,6 +4,14 @@ import { FileOperation } from "@server/models";
import { buildFileOperation, buildTeam } from "@server/test/factories";
import ErrorTimedOutFileOperationsTask from "./ErrorTimedOutFileOperationsTask";
const props = {
limit: 100,
partition: {
partitionIndex: 0,
partitionCount: 1,
},
};
describe("ErrorTimedOutFileOperationsTask", () => {
it("should error exports older than 12 hours", async () => {
const team = await buildTeam();
@@ -20,7 +28,7 @@ describe("ErrorTimedOutFileOperationsTask", () => {
});
const task = new ErrorTimedOutFileOperationsTask();
await task.perform({ limit: 100 });
await task.perform(props);
const data = await FileOperation.count({
where: {
@@ -41,7 +49,7 @@ describe("ErrorTimedOutFileOperationsTask", () => {
});
const task = new ErrorTimedOutFileOperationsTask();
await task.perform({ limit: 100 });
await task.perform(props);
const data = await FileOperation.count({
where: {

View File

@@ -3,15 +3,10 @@ import { Op } from "sequelize";
import { FileOperationState } from "@shared/types";
import Logger from "@server/logging/Logger";
import { FileOperation } from "@server/models";
import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask";
type Props = {
limit: number;
};
export default class ErrorTimedOutFileOperationsTask extends BaseTask<Props> {
static cron = TaskSchedule.Hour;
import { TaskPriority } from "./base/BaseTask";
import { CronTask, TaskInterval, Props } from "./base/CronTask";
export default class ErrorTimedOutFileOperationsTask extends CronTask {
public async perform({ limit }: Props) {
Logger.info("task", `Error file operations running longer than 12 hours…`);
const fileOperations = await FileOperation.unscoped().findAll({
@@ -40,6 +35,12 @@ export default class ErrorTimedOutFileOperationsTask extends BaseTask<Props> {
Logger.info("task", `Updated ${fileOperations.length} file operations`);
}
public get cron() {
return {
interval: TaskInterval.Hour,
};
}
public get options() {
return {
attempts: 1,

View File

@@ -4,18 +4,13 @@ import { ImportState, ImportTaskState } from "@shared/types";
import Logger from "@server/logging/Logger";
import { Import, ImportTask } from "@server/models";
import { sequelize } from "@server/storage/database";
import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask";
type Props = {
limit: number;
};
import { TaskPriority } from "./base/BaseTask";
import { CronTask, TaskInterval, Props } from "./base/CronTask";
/**
* A task that moves the stuck imports to errored state.
*/
export default class ErrorTimedOutImportsTask extends BaseTask<Props> {
static cron = TaskSchedule.Hour;
export default class ErrorTimedOutImportsTask extends CronTask {
public async perform({ limit }: Props) {
// TODO: Hardcoded right now, configurable later
const thresholdHours = 24;
@@ -82,6 +77,12 @@ export default class ErrorTimedOutImportsTask extends BaseTask<Props> {
}
}
public get cron() {
return {
interval: TaskInterval.Hour,
};
}
public get options() {
return {
attempts: 1,

View File

@@ -17,7 +17,7 @@ import {
} from "@server/models";
import fileOperationPresenter from "@server/presenters/fileOperation";
import FileStorage from "@server/storage/files";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
import { Op } from "sequelize";
import { WhereOptions } from "sequelize";
import { sequelizeReadOnly } from "@server/storage/database";

View File

@@ -28,7 +28,7 @@ import {
import { sequelize } from "@server/storage/database";
import ZipHelper from "@server/utils/ZipHelper";
import { generateUrlId } from "@server/utils/url";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
import env from "@server/env";
type Props = {

View File

@@ -4,13 +4,10 @@ import InviteReminderEmail from "@server/emails/templates/InviteReminderEmail";
import { User } from "@server/models";
import { UserFlag } from "@server/models/User";
import { sequelize } from "@server/storage/database";
import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask";
type Props = Record<string, never>;
export default class InviteReminderTask extends BaseTask<Props> {
static cron = TaskSchedule.Day;
import { TaskPriority } from "./base/BaseTask";
import { CronTask, TaskInterval } from "./base/CronTask";
export default class InviteReminderTask extends CronTask {
public async perform() {
const users = await User.scope("invited").findAll({
attributes: ["id"],
@@ -58,6 +55,12 @@ export default class InviteReminderTask extends BaseTask<Props> {
}
}
public get cron() {
return {
interval: TaskInterval.Day,
};
}
public get options() {
return {
attempts: 1,

View File

@@ -2,7 +2,7 @@ import { NotificationEventType } from "@shared/types";
import { Comment, Document, Notification, User } from "@server/models";
import { CommentReactionEvent } from "@server/types";
import { canUserAccessDocument } from "@server/utils/permissions";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
export default class ReactionCreatedNotificationsTask extends BaseTask<CommentReactionEvent> {
public async perform(event: CommentReactionEvent) {

View File

@@ -1,7 +1,7 @@
import { NotificationEventType } from "@shared/types";
import { Notification, User } from "@server/models";
import { CommentReactionEvent } from "@server/types";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
import { createContext } from "@server/context";
import { sequelize } from "@server/storage/database";
import { Op } from "sequelize";

View File

@@ -17,7 +17,7 @@ import { DocumentHelper } from "@server/models/helpers/DocumentHelper";
import NotificationHelper from "@server/models/helpers/NotificationHelper";
import { RevisionEvent } from "@server/types";
import { canUserAccessDocument } from "@server/utils/permissions";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
export default class RevisionCreatedNotificationsTask extends BaseTask<RevisionEvent> {
public async perform(event: RevisionEvent) {

View File

@@ -1,13 +1,12 @@
import crypto from "crypto";
import { subWeeks } from "date-fns";
import { QueryTypes } from "sequelize";
import { Minute } from "@shared/utils/time";
import env from "@server/env";
import Logger from "@server/logging/Logger";
import BaseTask, { TaskSchedule } from "./BaseTask";
import { TaskPriority } from "./base/BaseTask";
import { CronTask, PartitionInfo, Props, TaskInterval } from "./base/CronTask";
import { sequelize, sequelizeReadOnly } from "@server/storage/database";
import { sleep } from "@server/utils/timers";
type Props = Record<string, never>;
/**
* Number of hours to add to age to smooth the decay curve,
@@ -26,15 +25,10 @@ const ACTIVITY_WEIGHTS = {
};
/**
* Batch size for processing updates - kept small to minimize lock duration
* Batch size for processing updates - kept small to minimize query duration
*/
const BATCH_SIZE = 50;
/**
* Maximum retries for failed batch operations
*/
const MAX_RETRIES = 2;
/**
* Statement timeout for individual queries to prevent runaway locks
*/
@@ -50,16 +44,13 @@ interface DocumentScore {
score: number;
}
export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props> {
export default class UpdateDocumentsPopularityScoreTask extends CronTask {
/**
* Unique table name for this task run to prevent conflicts with concurrent runs
*/
private workingTable: string = "";
static cron = TaskSchedule.Hour;
public async perform() {
Logger.info("task", "Updating document popularity scores…");
public async perform({ partition }: Props) {
// Only run every 6 hours (at hours 0, 6, 12, 18)
const currentHour = new Date().getHours();
if (currentHour % 6 !== 0) {
@@ -79,7 +70,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
try {
// Setup: Create working table and populate with active document IDs
await this.setupWorkingTable(threshold);
await this.setupWorkingTable(threshold, partition);
const activeCount = await this.getWorkingTableCount();
@@ -107,16 +98,13 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
batchNumber++;
try {
const updated = await this.processBatchWithRetry(threshold, now);
const updated = await this.processBatch(threshold, now);
totalUpdated += updated;
Logger.debug(
"task",
`Batch ${batchNumber}: updated ${updated} documents, ${remaining - updated} remaining`
);
// Add delay between batches to reduce database contention
await sleep(10);
} catch (error) {
totalErrors++;
Logger.error(`Batch ${batchNumber} failed after retries`, error);
@@ -144,7 +132,10 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
* that have recent activity. Unlogged tables are faster because they
* skip WAL logging, and data loss on crash is acceptable here.
*/
private async setupWorkingTable(threshold: Date): Promise<void> {
private async setupWorkingTable(
threshold: Date,
partition: PartitionInfo
): Promise<void> {
// Drop any existing table first to avoid type conflicts from previous crashed runs
await sequelize.query(`DROP TABLE IF EXISTS ${this.workingTable} CASCADE`);
@@ -156,6 +147,8 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
)
`);
const [startUuid, endUuid] = this.getPartitionBounds(partition);
// Populate with documents that have recent activity and are valid
// (published, not deleted). Using JOINs to filter upfront.
await sequelize.query(
@@ -179,8 +172,9 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
WHERE v."documentId" = d.id AND v."updatedAt" >= :threshold
)
)
${startUuid && endUuid ? "AND d.id >= :startUuid AND d.id <= :endUuid" : ""}
`,
{ replacements: { threshold } }
{ replacements: { threshold, startUuid, endUuid } }
);
// Create index on processed column for efficient batch selection
@@ -203,57 +197,41 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
/**
* Processes a batch of documents with retry logic.
*/
private async processBatchWithRetry(
threshold: Date,
now: Date,
attempt = 1
): Promise<number> {
try {
// Step 1: Get batch of document IDs to process
const batch = await sequelize.query<{ documentId: string }>(
`
private async processBatch(threshold: Date, now: Date): Promise<number> {
// Step 1: Get batch of document IDs to process
const batch = await sequelize.query<{ documentId: string }>(
`
SELECT "documentId" FROM ${this.workingTable}
WHERE NOT processed
ORDER BY "documentId"
LIMIT :limit
`,
{
replacements: { limit: BATCH_SIZE },
type: QueryTypes.SELECT,
}
);
if (batch.length === 0) {
return 0;
{
replacements: { limit: BATCH_SIZE },
type: QueryTypes.SELECT,
}
);
const documentIds = batch.map((b) => b.documentId);
// Step 2: Calculate scores outside of a transaction
const scores = await this.calculateScoresForDocuments(
documentIds,
threshold,
now
);
// Step 3: Update document scores
await this.updateDocumentScores(scores);
// Step 4: Mark batch as processed
await this.markBatchProcessed(documentIds);
return documentIds.length;
} catch (error) {
if (attempt < MAX_RETRIES) {
Logger.warn(
`Batch update failed, retrying (attempt ${attempt + 1}/${MAX_RETRIES})`,
{ error }
);
await sleep(1000 * attempt);
return this.processBatchWithRetry(threshold, now, attempt + 1);
}
throw error;
if (batch.length === 0) {
return 0;
}
const documentIds = batch.map((b) => b.documentId);
// Step 2: Calculate scores outside of a transaction
const scores = await this.calculateScoresForDocuments(
documentIds,
threshold,
now
);
// Step 3: Update document scores
await this.updateDocumentScores(scores);
// Step 4: Mark batch as processed
await this.markBatchProcessed(documentIds);
return documentIds.length;
}
/**
@@ -409,4 +387,18 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
Logger.warn("Failed to clean up working table", { error });
}
}
public get cron() {
return {
interval: TaskInterval.Hour,
partitionWindow: 30 * Minute.ms,
};
}
public get options() {
return {
attempts: 1,
priority: TaskPriority.Background,
};
}
}

View File

@@ -1,5 +1,5 @@
import { Attachment, Team } from "@server/models";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
import { sequelizeReadOnly } from "@server/storage/database";
type Props = {

View File

@@ -3,16 +3,15 @@ import { Op } from "sequelize";
import Logger from "@server/logging/Logger";
import { Attachment } from "@server/models";
import { sequelize } from "@server/storage/database";
import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask";
import { TaskPriority } from "./base/BaseTask";
import { CronTask, TaskInterval } from "./base/CronTask";
import UpdateTeamAttachmentsSizeTask from "./UpdateTeamAttachmentsSizeTask";
type Props = {
limit: number;
};
export default class UpdateTeamsAttachmentsSizeTask extends BaseTask<Props> {
static cron = TaskSchedule.Day;
export default class UpdateTeamsAttachmentsSizeTask extends CronTask {
public async perform({ limit }: Props) {
Logger.info(
"task",
@@ -44,6 +43,12 @@ export default class UpdateTeamsAttachmentsSizeTask extends BaseTask<Props> {
);
}
public get cron() {
return {
interval: TaskInterval.Day,
};
}
public get options() {
return {
attempts: 1,

View File

@@ -1,7 +1,7 @@
import { createContext } from "@server/context";
import { Attachment } from "@server/models";
import FileStorage from "@server/storage/files";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
import { sequelize } from "@server/storage/database";
type Props = {

View File

@@ -2,7 +2,7 @@ import { Sema } from "async-sema";
import Logger from "@server/logging/Logger";
import { Attachment } from "@server/models";
import FileStorage from "@server/storage/files";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
import env from "@server/env";
const ConcurrentUploads = 5;

View File

@@ -2,7 +2,7 @@ import { randomUUID } from "crypto";
import { Team } from "@server/models";
import { Buckets } from "@server/models/helpers/AttachmentHelper";
import FileStorage from "@server/storage/files";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
type Props = {
/** The teamId to operate on */

View File

@@ -2,7 +2,7 @@ import { createHash, randomUUID } from "crypto";
import { User } from "@server/models";
import { Buckets } from "@server/models/helpers/AttachmentHelper";
import FileStorage from "@server/storage/files";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
type Props = {
/** The userId to operate on */

View File

@@ -1,7 +1,7 @@
import Logger from "@server/logging/Logger";
import { User, UserAuthentication } from "@server/models";
import { sequelize } from "@server/storage/database";
import BaseTask, { TaskPriority } from "./BaseTask";
import { BaseTask, TaskPriority } from "./base/BaseTask";
type Props = {
userId: string;

View File

@@ -1,5 +1,5 @@
import { Job, JobOptions } from "bull";
import { taskQueue } from "../";
import { taskQueue } from "../../";
export enum TaskPriority {
Background = 40,
@@ -8,18 +8,7 @@ export enum TaskPriority {
High = 10,
}
export enum TaskSchedule {
Day = "daily",
Hour = "hourly",
Minute = "minute",
}
export default abstract class BaseTask<T extends Record<string, any>> {
/**
* An optional schedule for this task to be run automatically.
*/
static cron: TaskSchedule | undefined;
export abstract class BaseTask<T extends Record<string, any>> {
/**
* Schedule this task type to be processed asynchronously by a worker.
*
@@ -57,7 +46,7 @@ export default abstract class BaseTask<T extends Record<string, any>> {
}
/**
* Job options such as priority and retry strategy, as defined by Bull.
* Job options such as priority and retry strategy.
*/
public get options(): JobOptions {
return {

View File

@@ -0,0 +1,297 @@
import { Op } from "sequelize";
import { CronTask, PartitionInfo, TaskInterval } from "./CronTask";
// Create a concrete implementation of CronTask for testing
// Minimal concrete CronTask used purely to exercise the protected
// partitioning helpers from the test suite below.
class TestTask extends CronTask {
  // No-op: perform() is never invoked by these tests.
  public async perform() {}

  // Fixed daily schedule; the value is irrelevant to partitioning tests.
  public get cron() {
    return { interval: TaskInterval.Day };
  }

  // Expose the protected helper so tests can call it directly.
  public testPartitionWhereClause(
    idField: string,
    partition: PartitionInfo | undefined
  ) {
    const where = this.getPartitionWhereClause(idField, partition);
    return where;
  }
}
describe("CronTask", () => {
  let task: TestTask;

  beforeEach(() => {
    task = new TestTask();
  });

  describe("getPartitionWhereClause", () => {
    it("should return empty object when partition is undefined", () => {
      const where = task.testPartitionWhereClause("id", undefined);
      expect(where).toEqual({});
    });

    it("should generate range WHERE clause for valid partition", () => {
      const where = task.testPartitionWhereClause("id", {
        partitionIndex: 0,
        partitionCount: 3,
      }) as any;
      expect(where).toBeDefined();
      expect(where.id).toBeDefined();
      expect(where.id[Op.gte]).toBeDefined();
      expect(where.id[Op.lte]).toBeDefined();
    });

    it("should generate correct UUID ranges for 3 partitions", () => {
      const where0 = task.testPartitionWhereClause("id", {
        partitionIndex: 0,
        partitionCount: 3,
      }) as any;
      const where1 = task.testPartitionWhereClause("id", {
        partitionIndex: 1,
        partitionCount: 3,
      }) as any;
      const where2 = task.testPartitionWhereClause("id", {
        partitionIndex: 2,
        partitionCount: 3,
      }) as any;
      // Partition 0: Should start from 00000000
      expect(where0.id[Op.gte]).toBe("00000000-0000-4000-8000-000000000000");
      expect(where0.id[Op.lte]).toBe("55555554-ffff-4fff-bfff-ffffffffffff");
      // Partition 1: Should start from 55555555
      expect(where1.id[Op.gte]).toBe("55555555-0000-4000-8000-000000000000");
      expect(where1.id[Op.lte]).toBe("aaaaaaa9-ffff-4fff-bfff-ffffffffffff");
      // Partition 2: Should end at ffffffff
      expect(where2.id[Op.gte]).toBe("aaaaaaaa-0000-4000-8000-000000000000");
      expect(where2.id[Op.lte]).toBe("ffffffff-ffff-4fff-bfff-ffffffffffff");
    });

    it("should generate correct UUID ranges for 2 partitions", () => {
      const where0 = task.testPartitionWhereClause("id", {
        partitionIndex: 0,
        partitionCount: 2,
      }) as any;
      const where1 = task.testPartitionWhereClause("id", {
        partitionIndex: 1,
        partitionCount: 2,
      }) as any;
      // Partition 0: 0x00000000 to 0x7fffffff
      expect(where0.id[Op.gte]).toBe("00000000-0000-4000-8000-000000000000");
      expect(where0.id[Op.lte]).toBe("7fffffff-ffff-4fff-bfff-ffffffffffff");
      // Partition 1: 0x80000000 to 0xffffffff
      expect(where1.id[Op.gte]).toBe("80000000-0000-4000-8000-000000000000");
      expect(where1.id[Op.lte]).toBe("ffffffff-ffff-4fff-bfff-ffffffffffff");
    });

    it("should distribute UUID space evenly", () => {
      const partitionCount = 4;
      const ranges: Array<{ start: string; end: string }> = [];
      for (let i = 0; i < partitionCount; i++) {
        const where = task.testPartitionWhereClause("id", {
          partitionIndex: i,
          partitionCount,
        }) as any;
        ranges.push({
          start: where.id[Op.gte],
          end: where.id[Op.lte],
        });
      }
      // Check that ranges don't overlap and cover the entire space
      expect(ranges[0].start).toBe("00000000-0000-4000-8000-000000000000");
      expect(ranges[3].end).toBe("ffffffff-ffff-4fff-bfff-ffffffffffff");
      // Check that each range ends where the next begins (approximately)
      for (let i = 0; i < partitionCount - 1; i++) {
        // Compare only the first 32-bit hex segment, which is the
        // dimension the implementation partitions on.
        const currentEnd = ranges[i].end.substring(0, 8);
        const nextStart = ranges[i + 1].start.substring(0, 8);
        // Convert to numbers to check they're consecutive
        const currentEndNum = parseInt(currentEnd, 16);
        const nextStartNum = parseInt(nextStart, 16);
        // Should be consecutive or very close
        expect(nextStartNum - currentEndNum).toBeLessThanOrEqual(1);
      }
    });

    it("should handle single partition (no partitioning)", () => {
      const where = task.testPartitionWhereClause("id", {
        partitionIndex: 0,
        partitionCount: 1,
      }) as any;
      // Should cover entire UUID space
      expect(where.id[Op.gte]).toBe("00000000-0000-4000-8000-000000000000");
      expect(where.id[Op.lte]).toBe("ffffffff-ffff-4fff-bfff-ffffffffffff");
    });

    it("should throw error for invalid partition info", () => {
      expect(() => {
        task.testPartitionWhereClause("id", {
          partitionIndex: -1,
          partitionCount: 3,
        });
      }).toThrow("Invalid partition info: index -1, count 3");
      expect(() => {
        task.testPartitionWhereClause("id", {
          partitionIndex: 3,
          partitionCount: 3,
        });
      }).toThrow("Invalid partition info: index 3, count 3");
      expect(() => {
        task.testPartitionWhereClause("id", {
          partitionIndex: 0,
          partitionCount: 0,
        });
      }).toThrow("Invalid partition info: index 0, count 0");
    });

    it("should work with different field names", () => {
      const where1 = task.testPartitionWhereClause("id", {
        partitionIndex: 0,
        partitionCount: 2,
      }) as any;
      const where2 = task.testPartitionWhereClause("documentId", {
        partitionIndex: 0,
        partitionCount: 2,
      }) as any;
      expect(where1.id).toBeDefined();
      expect(where1.documentId).toBeUndefined();
      expect(where2.documentId).toBeDefined();
      expect(where2.id).toBeUndefined();
    });

    it("should handle large partition counts efficiently", () => {
      const partitionCount = 100;
      const ranges: Array<{ start: string; end: string }> = [];
      for (let i = 0; i < partitionCount; i++) {
        const where = task.testPartitionWhereClause("id", {
          partitionIndex: i,
          partitionCount,
        }) as any;
        ranges.push({
          start: where.id[Op.gte],
          end: where.id[Op.lte],
        });
      }
      // First partition should start at 00000000
      expect(ranges[0].start).toBe("00000000-0000-4000-8000-000000000000");
      // Last partition should end at ffffffff
      expect(ranges[99].end).toBe("ffffffff-ffff-4fff-bfff-ffffffffffff");
      // Each partition should have a unique range
      const startValues = new Set(ranges.map((r) => r.start));
      expect(startValues.size).toBe(100);
    });

    it("should calculate correct hex values for partition boundaries", () => {
      // Test specific calculations
      const where = task.testPartitionWhereClause("id", {
        partitionIndex: 1,
        partitionCount: 16, // 16 partitions = 0x10000000 per partition
      }) as any;
      // Partition 1 should be from 0x10000000 to 0x1fffffff
      expect(where.id[Op.gte]).toBe("10000000-0000-4000-8000-000000000000");
      expect(where.id[Op.lte]).toBe("1fffffff-ffff-4fff-bfff-ffffffffffff");
    });

    it("should ensure all UUIDs map to exactly one partition", () => {
      const testUuids = [
        "00000000-0000-4000-8000-000000000000", // Min UUID
        "12345678-9abc-4ef0-9234-567890abcdef",
        "55555555-5555-4555-9555-555555555555",
        "87654321-fedc-4a98-b654-321098765432",
        "aaaaaaaa-aaaa-4aaa-aaaa-aaaaaaaaaaaa",
        "deadbeef-cafe-4abe-aeed-dec0debac1e5",
        "ffffffff-ffff-4fff-bfff-ffffffffffff", // Max UUID
      ];
      const partitionCount = 3;
      for (const uuid of testUuids) {
        let matchCount = 0;
        let matchedPartition = -1;
        for (let i = 0; i < partitionCount; i++) {
          const where = task.testPartitionWhereClause("id", {
            partitionIndex: i,
            partitionCount,
          }) as any;
          const startUuid = where.id[Op.gte];
          const endUuid = where.id[Op.lte];
          // Check if UUID falls within this partition's range.
          // Lexicographic string comparison matches numeric order here
          // because all UUIDs share the same fixed-width hex layout.
          if (uuid >= startUuid && uuid <= endUuid) {
            matchCount++;
            matchedPartition = i;
          }
        }
        // Each UUID should match exactly one partition
        expect(matchCount).toBe(1);
        expect(matchedPartition).toBeGreaterThanOrEqual(0);
        expect(matchedPartition).toBeLessThan(partitionCount);
      }
    });

    it("should generate non-overlapping ranges for any partition count", () => {
      const testCounts = [2, 3, 5, 7, 10, 16, 32];
      for (const partitionCount of testCounts) {
        const ranges: Array<{ start: string; end: string }> = [];
        // Get all partition ranges
        for (let i = 0; i < partitionCount; i++) {
          const where = task.testPartitionWhereClause("id", {
            partitionIndex: i,
            partitionCount,
          }) as any;
          ranges.push({
            start: where.id[Op.gte],
            end: where.id[Op.lte],
          });
        }
        // Verify no gaps between consecutive partitions
        for (let i = 0; i < partitionCount - 1; i++) {
          const currentEnd = ranges[i].end.substring(0, 8);
          const nextStart = ranges[i + 1].start.substring(0, 8);
          const currentEndNum = parseInt(currentEnd, 16);
          const nextStartNum = parseInt(nextStart, 16);
          // Next partition should start exactly where current ends + 1
          expect(nextStartNum).toBe(currentEndNum + 1);
        }
        // Verify coverage of entire UUID space
        expect(ranges[0].start).toBe("00000000-0000-4000-8000-000000000000");
        expect(ranges[partitionCount - 1].end).toBe(
          "ffffffff-ffff-4fff-bfff-ffffffffffff"
        );
      }
    });
  });
});

View File

@@ -0,0 +1,163 @@
import { Op, WhereOptions } from "sequelize";
import { BaseTask } from "./BaseTask";
/**
 * The intervals at which a CronTask may be scheduled. Values map to the
 * cron route period names (e.g. `cron.daily`, `cron.hourly`).
 */
export enum TaskInterval {
  Day = "daily",
  Hour = "hourly",
}

export type TaskSchedule = {
  /** The interval at which to run this task */
  interval: TaskInterval;

  /**
   * An optional time window (in milliseconds) over which to spread the START time
   * of this task when triggered by cron.
   *
   * **Important**: This only delays when tasks START - it does NOT partition the work.
   * To distribute work across multiple workers, tasks must also use the `partition`
   * prop and implement partitioned queries using `getPartitionWhereClause()`.
   *
   * When set, each task gets a deterministic delay based on its name, ensuring
   * consistent scheduling across runs and preventing all tasks from starting
   * simultaneously.
   *
   * @example
   * // Run hourly, but spread task start times over 10 minutes
   * interval: TaskInterval.Hour,
   * partitionWindow: 10 * Minute.ms // 10 minutes
   *
   * @example
   * // Run daily, but spread task start times over 1 hour
   * interval: TaskInterval.Day,
   * partitionWindow: 60 * Minute.ms // 1 hour
   */
  partitionWindow?: number;
};

/**
 * Partition information for distributing work across multiple worker instances.
 */
export type PartitionInfo = {
  /**
   * The partition number for this task instance (0-based).
   */
  partitionIndex: number;

  /**
   * The total number of partitions.
   */
  partitionCount: number;
};

/**
 * Properties for cron-scheduled tasks.
 */
export type Props = {
  /**
   * Upper bound on the number of records the task should process in this
   * run — how each task interprets it is task-specific.
   */
  limit: number;

  /**
   * The slice of the data that this run is responsible for; tasks apply it
   * via `getPartitionWhereClause()`.
   */
  partition: PartitionInfo;
};
/**
 * Base class for tasks that are triggered on a cron schedule rather than by
 * application events. Subclasses declare their schedule via the `cron` getter
 * and may use the partitioning helpers to split work across worker instances.
 */
export abstract class CronTask extends BaseTask<Props> {
  /** Lowest UUID in the keyspace, with valid v4 version/variant bits. */
  private static readonly MIN_UUID = "00000000-0000-4000-8000-000000000000";

  /** Highest UUID in the keyspace, with valid v4 version/variant bits. */
  private static readonly MAX_UUID = "ffffffff-ffff-4fff-bfff-ffffffffffff";

  /** The schedule configuration for this cron task */
  public abstract get cron(): TaskSchedule;

  /**
   * Optimized partitioning method for UUID primary keys using range-based distribution.
   * Divides the UUID space into N equal ranges and assigns each partition a range,
   * based on the first 32 bits (the first hex segment) of the UUID.
   *
   * For example, with 3 partitions (rangeSize = floor(2^32 / 3) = 0x55555555):
   * - Partition 0: '00000000-0000-4000-8000-000000000000' to '55555554-ffff-4fff-bfff-ffffffffffff'
   * - Partition 1: '55555555-0000-4000-8000-000000000000' to 'aaaaaaa9-ffff-4fff-bfff-ffffffffffff'
   * - Partition 2: 'aaaaaaaa-0000-4000-8000-000000000000' to 'ffffffff-ffff-4fff-bfff-ffffffffffff'
   *
   * @param partitionInfo The partition information, or undefined for the full keyspace
   * @returns The inclusive start and end UUID bounds for the partition
   * @throws Error when the partition index or count is out of range
   */
  protected getPartitionBounds(
    partitionInfo: PartitionInfo | undefined
  ): [string, string] {
    if (!partitionInfo) {
      // No partitioning requested — cover the entire UUID space.
      return [CronTask.MIN_UUID, CronTask.MAX_UUID];
    }

    const { partitionIndex, partitionCount } = partitionInfo;

    if (
      partitionCount <= 0 ||
      partitionIndex < 0 ||
      partitionIndex >= partitionCount
    ) {
      throw new Error(
        `Invalid partition info: index ${partitionIndex}, count ${partitionCount}`
      );
    }

    // 2^32 total possible values for the first 32 bits (4.3 billion)
    const TOTAL_VALUES = 0x100000000;

    // The maximum possible integer value (0xFFFFFFFF)
    const MAX_VALUE = TOTAL_VALUES - 1;

    // Ensure even distribution of values by calculating exact range size
    const rangeSize = Math.floor(TOTAL_VALUES / partitionCount);
    const rangeStart = partitionIndex * rangeSize;

    let rangeEnd: number;
    if (partitionIndex === partitionCount - 1) {
      // The last partition takes any remainder and goes up to the max value
      rangeEnd = MAX_VALUE;
    } else {
      // The end is the start of the *next* partition minus 1
      rangeEnd = (partitionIndex + 1) * rangeSize - 1;
    }

    // Use Number.prototype.toString(16) and padStart(8, '0') for the 32-bit hex prefix
    const startHex = rangeStart.toString(16).padStart(8, "0");
    const endHex = rangeEnd.toString(16).padStart(8, "0");

    // Start: First 32 bits (prefix) followed by the lowest possible values for the rest
    // Ensures correct UUID v4 version (4xxx) and variant (8|9|a|bxxx) bits
    const startUuid = `${startHex}-0000-4000-8000-000000000000`;

    // End: First 32 bits (prefix) followed by the highest possible values for the rest
    // Ensures correct UUID v4 version (4xxx) and variant (8|9|a|bxxx) bits
    const endUuid = `${endHex}-ffff-4fff-bfff-ffffffffffff`;

    return [startUuid, endUuid];
  }

  /**
   * Builds a Sequelize WHERE clause that restricts a query to this task's
   * UUID partition, suitable for spreading with other query conditions.
   * Returns an empty object (no restriction) when no partition is given.
   *
   * @param idField The name of the UUID primary key field to partition on
   * @param partitionInfo The partition information
   * @returns A WHERE clause for partitioned queries
   *
   * @example
   * const where = {
   *   deletedAt: { [Op.lt]: someDate },
   *   ...this.getPartitionWhereClause("id", props.partition)
   * };
   */
  protected getPartitionWhereClause(
    idField: string,
    partitionInfo: PartitionInfo | undefined
  ): WhereOptions {
    if (!partitionInfo) {
      return {};
    }

    const [startUuid, endUuid] = this.getPartitionBounds(partitionInfo);

    return {
      [idField]: {
        [Op.gte]: startUuid,
        [Op.lte]: endUuid,
      },
    };
  }
}

View File

@@ -1,6 +1,6 @@
import { Hook, PluginManager } from "@server/utils/PluginManager";
import { requireDirectory } from "@server/utils/fs";
import BaseTask from "./BaseTask";
import { BaseTask } from "./base/BaseTask";
const tasks: Record<string, typeof BaseTask> = {};

View File

@@ -1,24 +1,24 @@
import Router from "koa-router";
import env from "@server/env";
import { AuthenticationError } from "@server/errors";
import Logger from "@server/logging/Logger";
import validate from "@server/middlewares/validate";
import tasks from "@server/queues/tasks";
import { TaskSchedule } from "@server/queues/tasks/BaseTask";
import { CronTask, TaskInterval } from "@server/queues/tasks/base/CronTask";
import { APIContext } from "@server/types";
import { safeEqual } from "@server/utils/crypto";
import * as T from "./schema";
import { Minute } from "@shared/utils/time";
const router = new Router();
/** Whether the minutely cron job has been received */
const receivedPeriods = new Set<TaskSchedule>();
const receivedPeriods = new Set<TaskInterval>();
const cronHandler = async (ctx: APIContext<T.CronSchemaReq>) => {
const period = Object.values(TaskSchedule).includes(
ctx.params.period as TaskSchedule
const period = Object.values(TaskInterval).includes(
ctx.params.period as TaskInterval
)
? (ctx.params.period as TaskSchedule)
: TaskSchedule.Day;
? (ctx.params.period as TaskInterval)
: TaskInterval.Day;
const token = (ctx.input.body.token ?? ctx.input.query.token) as string;
const limit = ctx.input.body.limit ?? ctx.input.query.limit;
@@ -30,26 +30,54 @@ const cronHandler = async (ctx: APIContext<T.CronSchemaReq>) => {
for (const name in tasks) {
const TaskClass = tasks[name];
if (TaskClass.cron === period) {
// @ts-expect-error We won't instantiate an abstract class
await new TaskClass().schedule({ limit });
if (!(TaskClass.prototype instanceof CronTask)) {
continue;
}
// @ts-expect-error We won't instantiate an abstract class
const taskInstance = new TaskClass() as CronTask;
const cronConfig = taskInstance.cron;
const partitionWindow = cronConfig.partitionWindow;
const shouldSchedule =
cronConfig.interval === period ||
// Backwards compatibility for installations that have not set up
// cron jobs periods other than daily.
} else if (
TaskClass.cron === TaskSchedule.Minute &&
!receivedPeriods.has(TaskSchedule.Minute) &&
(period === TaskSchedule.Hour || period === TaskSchedule.Day)
) {
// @ts-expect-error We won't instantiate an abstract class
await new TaskClass().schedule({ limit });
} else if (
TaskClass.cron === TaskSchedule.Hour &&
!receivedPeriods.has(TaskSchedule.Hour) &&
period === TaskSchedule.Day
) {
// @ts-expect-error We won't instantiate an abstract class
await new TaskClass().schedule({ limit });
(cronConfig.interval === TaskInterval.Hour &&
!receivedPeriods.has(TaskInterval.Hour) &&
period === TaskInterval.Day);
if (shouldSchedule) {
if (partitionWindow && partitionWindow > 0) {
// Split the task into partitions to spread work across time window
// by dividing the partitionWindow into minutes and scheduling a delayed
// task for each minute.
const partitions = Math.ceil(partitionWindow / Minute.ms);
for (let i = 0; i < partitions; i++) {
const delay = Math.floor((partitionWindow / partitions) * i);
const partition = {
partitionIndex: i,
partitionCount: partitions,
};
Logger.debug(
"task",
`Scheduling partitioned task ${name} (partition ${
i + 1
}/${partitions}) with delay of ${delay / 1000}s`
);
await taskInstance.schedule({ limit, partition }, { delay });
}
} else {
await taskInstance.schedule({
limit,
partition: {
partitionIndex: 0,
partitionCount: 1,
},
});
}
}
}
@@ -57,7 +85,6 @@ const cronHandler = async (ctx: APIContext<T.CronSchemaReq>) => {
success: true,
};
};
router.get("cron.:period", validate(T.CronSchema), cronHandler);
router.post("cron.:period", validate(T.CronSchema), cronHandler);

View File

@@ -1,27 +1,36 @@
import { Day, Hour, Minute, Second } from "@shared/utils/time";
import { Day, Hour, Second } from "@shared/utils/time";
import tasks from "@server/queues/tasks";
import { TaskSchedule } from "@server/queues/tasks/BaseTask";
import { CronTask, TaskInterval } from "@server/queues/tasks/base/CronTask";
export default function init() {
async function run(schedule: TaskSchedule) {
async function run(schedule: TaskInterval) {
const partition = {
partitionIndex: 0,
partitionCount: 1,
};
for (const name in tasks) {
const TaskClass = tasks[name];
if (TaskClass.cron === schedule) {
// @ts-expect-error We won't instantiate an abstract class
await new TaskClass().schedule({ limit: 10000 });
if (!(TaskClass.prototype instanceof CronTask)) {
continue;
}
// @ts-expect-error We won't instantiate an abstract class
const taskInstance = new TaskClass() as CronTask;
if (taskInstance.cron.interval === schedule) {
await taskInstance.schedule({ limit: 10000, partition });
}
}
}
setInterval(() => void run(TaskSchedule.Day), Day.ms);
setInterval(() => void run(TaskSchedule.Hour), Hour.ms);
setInterval(() => void run(TaskSchedule.Minute), Minute.ms);
setInterval(() => void run(TaskInterval.Day), Day.ms);
setInterval(() => void run(TaskInterval.Hour), Hour.ms);
// Just give everything time to startup before running the first time. Not
// _technically_ required to function.
setTimeout(() => {
void run(TaskSchedule.Day);
void run(TaskSchedule.Hour);
void run(TaskSchedule.Minute);
void run(TaskInterval.Day);
void run(TaskInterval.Hour);
}, 5 * Second.ms);
}

View File

@@ -7,7 +7,7 @@ import type BaseEmail from "@server/emails/templates/BaseEmail";
import env from "@server/env";
import Logger from "@server/logging/Logger";
import type BaseProcessor from "@server/queues/processors/BaseProcessor";
import type BaseTask from "@server/queues/tasks/BaseTask";
import type { BaseTask } from "@server/queues/tasks/base/BaseTask";
import { UnfurlSignature, UninstallSignature } from "@server/types";
import { BaseIssueProvider } from "./BaseIssueProvider";