diff --git a/server/env.ts b/server/env.ts index dbee4fb19f..cf4b2033b2 100644 --- a/server/env.ts +++ b/server/env.ts @@ -94,6 +94,20 @@ export class Environment { ]) public DATABASE_URL = this.toOptionalString(environment.DATABASE_URL); + /** + * Optional database URL for read replica to distribute read queries + * and reduce load on primary database. + */ + @IsOptional() + @IsUrl({ + require_tld: false, + allow_underscores: true, + protocols: ["postgres", "postgresql"], + }) + public DATABASE_URL_READ_ONLY = this.toOptionalString( + environment.DATABASE_URL_READ_ONLY + ); + /** * Database host for individual component configuration. */ @@ -754,6 +768,24 @@ export class Environment { @Public public APP_NAME = "Outline"; + /** + * Gravity constant for time decay in popularity scoring. Higher values cause + * faster decay of older content. Default is 0.7. + */ + @IsOptional() + @IsNumber() + public POPULARITY_GRAVITY = + this.toOptionalNumber(environment.POPULARITY_GRAVITY) ?? 0.7; + + /** + * Number of weeks of activity to consider when calculating popularity scores. + * Default is 2 weeks. + */ + @IsOptional() + @IsNumber() + public POPULARITY_ACTIVITY_THRESHOLD_WEEKS = + this.toOptionalNumber(environment.POPULARITY_ACTIVITY_THRESHOLD_WEEKS) ?? 2; + /** * Returns true if the current installation is the cloud hosted version at * getoutline.com diff --git a/server/models/Attachment.ts b/server/models/Attachment.ts index a24b39d75d..74cd6c8c2e 100644 --- a/server/models/Attachment.ts +++ b/server/models/Attachment.ts @@ -6,6 +6,7 @@ import { InferCreationAttributes, QueryTypes, FindOptions, + Sequelize, } from "sequelize"; import { BeforeDestroy, @@ -182,11 +183,15 @@ class Attachment extends IdModel< /** * Get the total size of all attachments for a given team. * + * @param connection - The Sequelize connection to use for the query. * @param teamId - The ID of the team to get the total size for. 
* @returns A promise resolving to the total size of all attachments for the given team in bytes. */ - static async getTotalSizeForTeam(teamId: string): Promise<number> { - const result = await this.sequelize!.query<{ total: string }>( + static async getTotalSizeForTeam( + connection: Sequelize, + teamId: string + ): Promise<number> { + const result = await connection.query<{ total: string }>( ` SELECT SUM(size) as total FROM attachments diff --git a/server/queues/tasks/ExportTask.ts b/server/queues/tasks/ExportTask.ts index 87da2c5e04..43beba792a 100644 --- a/server/queues/tasks/ExportTask.ts +++ b/server/queues/tasks/ExportTask.ts @@ -20,6 +20,7 @@ import FileStorage from "@server/storage/files"; import BaseTask, { TaskPriority } from "./BaseTask"; import { Op } from "sequelize"; import { WhereOptions } from "sequelize"; +import { sequelizeReadOnly } from "@server/storage/database"; type Props = { fileOperationId: string; @@ -68,6 +69,7 @@ export default abstract class ExportTask extends BaseTask<Props> { if (!fileOperation.collectionId) { const totalAttachmentsSize = await Attachment.getTotalSizeForTeam( + sequelizeReadOnly, user.teamId ); diff --git a/server/queues/tasks/UpdateDocumentsPopularityScoreTask.ts b/server/queues/tasks/UpdateDocumentsPopularityScoreTask.ts index 4e123b0bc6..56f55199f0 100644 --- a/server/queues/tasks/UpdateDocumentsPopularityScoreTask.ts +++ b/server/queues/tasks/UpdateDocumentsPopularityScoreTask.ts @@ -1,23 +1,14 @@ import crypto from "crypto"; import { subWeeks } from "date-fns"; import { QueryTypes } from "sequelize"; +import env from "@server/env"; import Logger from "@server/logging/Logger"; import BaseTask, { TaskSchedule } from "./BaseTask"; -import { sequelize } from "@server/storage/database"; +import { sequelize, sequelizeReadOnly } from "@server/storage/database"; import { sleep } from "@server/utils/timers"; type Props = Record<string, never>; -/** - * Gravity constant for time decay. Higher values cause faster decay of older content. 
- * With `GRAVITY = 0.7`: - * - Content from **1 day ago** retains ~30% of its score - * - Content from **3 days ago** retains ~15% of its score - * - Content from **1 week ago** retains ~8% of its score - * - Content from **2 weeks ago** retains ~4% of its score - */ -const GRAVITY = 0.7; - /** * Number of hours to add to age to smooth the decay curve, * preventing brand new content from having disproportionately @@ -34,25 +25,20 @@ const ACTIVITY_WEIGHTS = { view: 0.5, }; -/** - * Only recalculate scores for activity within this period. - */ -const ACTIVITY_THRESHOLD_WEEKS = 2; - /** * Batch size for processing updates - kept small to minimize lock duration */ -const BATCH_SIZE = 100; +const BATCH_SIZE = 50; /** * Maximum retries for failed batch operations */ -const MAX_RETRIES = 3; +const MAX_RETRIES = 2; /** * Statement timeout for individual queries to prevent runaway locks */ -const STATEMENT_TIMEOUT_MS = 10000; +const STATEMENT_TIMEOUT_MS = 30000; /** * Base name for the working table used to track documents to process @@ -75,7 +61,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask Logger.info("task", "Updating document popularity scores…"); const now = new Date(); - const activityThreshold = subWeeks(now, ACTIVITY_THRESHOLD_WEEKS); + const threshold = subWeeks(now, env.POPULARITY_ACTIVITY_THRESHOLD_WEEKS); // Generate unique table name for this run to prevent conflicts const uniqueId = crypto.randomBytes(8).toString("hex"); @@ -83,7 +69,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask try { // Setup: Create working table and populate with active document IDs - await this.setupWorkingTable(activityThreshold); + await this.setupWorkingTable(threshold); const activeCount = await this.getWorkingTableCount(); @@ -111,10 +97,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask batchNumber++; try { - const updated = await this.processBatchWithRetry( - activityThreshold, - now - ); + 
const updated = await this.processBatchWithRetry(threshold, now); totalUpdated += updated; Logger.debug( @@ -123,7 +106,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask ); // Add delay between batches to reduce database contention - await sleep(1000); + await sleep(10); } catch (error) { totalErrors++; Logger.error(`Batch ${batchNumber} failed after retries`, error); @@ -151,7 +134,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask * that have recent activity. Unlogged tables are faster because they * skip WAL logging, and data loss on crash is acceptable here. */ - private async setupWorkingTable(activityThreshold: Date): Promise { + private async setupWorkingTable(threshold: Date): Promise { // Drop any existing table first to avoid type conflicts from previous crashed runs await sequelize.query(`DROP TABLE IF EXISTS ${this.workingTable} CASCADE`); @@ -187,7 +170,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask ) ) `, - { replacements: { threshold: activityThreshold } } + { replacements: { threshold } } ); // Create index on processed column for efficient batch selection @@ -211,7 +194,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask * Processes a batch of documents with retry logic. 
*/ private async processBatchWithRetry( - activityThreshold: Date, + threshold: Date, now: Date, attempt = 1 ): Promise<number> { @@ -239,7 +222,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask // Step 2: Calculate scores outside of a transaction const scores = await this.calculateScoresForDocuments( documentIds, - activityThreshold, + threshold, now ); @@ -257,7 +240,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask { error } ); await sleep(1000 * attempt); - return this.processBatchWithRetry(activityThreshold, now, attempt + 1); + return this.processBatchWithRetry(threshold, now, attempt + 1); } throw error; } @@ -269,13 +252,10 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask */ private async calculateScoresForDocuments( documentIds: string[], - activityThreshold: Date, + threshold: Date, now: Date ): Promise { - // Build VALUES clause for the batch - const valuesClause = documentIds.map((id) => `('${id}'::uuid)`).join(", "); - - const results = await sequelize.query<{ + const results = await sequelizeReadOnly.query<{ documentId: string; total_score: string; }>( @@ -283,7 +263,7 @@ SET LOCAL statement_timeout = '${STATEMENT_TIMEOUT_MS}ms'; WITH batch_docs AS ( - SELECT * FROM (VALUES ${valuesClause}) AS t(id) + SELECT unnest(ARRAY[:documentIds]::uuid[]) AS id ), revision_scores AS ( SELECT @@ -331,9 +311,10 @@ `, { replacements: { - threshold: activityThreshold, + documentIds, + threshold, now, - gravity: GRAVITY, + gravity: env.POPULARITY_GRAVITY, timeOffset: TIME_OFFSET_HOURS, revisionWeight: ACTIVITY_WEIGHTS.revision, commentWeight: ACTIVITY_WEIGHTS.comment, @@ -359,7 +340,6 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask for (const { documentId, score } of scores) { await sequelize.query( ` - SET LOCAL 
statement_timeout = '${STATEMENT_TIMEOUT_MS}ms'; UPDATE documents SET "popularityScore" = :score WHERE id = :documentId @@ -375,14 +355,15 @@ * Marks documents as processed in the working table */ private async markBatchProcessed(documentIds: string[]): Promise<void> { - const valuesClause = documentIds.map((id) => `('${id}'::uuid)`).join(", "); - await sequelize.query( ` UPDATE ${this.workingTable} SET processed = TRUE - WHERE "documentId" IN (SELECT id FROM (VALUES ${valuesClause}) AS t(id)) - ` + WHERE "documentId" IN (SELECT unnest(ARRAY[:documentIds]::uuid[])) + `, + { + replacements: { documentIds }, + } ); } diff --git a/server/queues/tasks/UpdateTeamAttachmentsSizeTask.ts b/server/queues/tasks/UpdateTeamAttachmentsSizeTask.ts index 49c5f2c1e7..5d50f891b6 100644 --- a/server/queues/tasks/UpdateTeamAttachmentsSizeTask.ts +++ b/server/queues/tasks/UpdateTeamAttachmentsSizeTask.ts @@ -1,5 +1,6 @@ import { Attachment, Team } from "@server/models"; import BaseTask, { TaskPriority } from "./BaseTask"; +import { sequelizeReadOnly } from "@server/storage/database"; type Props = { /** The teamId to operate on */ @@ -11,7 +12,10 @@ */ export default class UpdateTeamAttachmentsSizeTask extends BaseTask<Props> { public async perform({ teamId }: Props) { - const sizeInBytes = await Attachment.getTotalSizeForTeam(teamId); + const sizeInBytes = await Attachment.getTotalSizeForTeam( + sequelizeReadOnly, + teamId + ); if (!sizeInBytes) { return; } diff --git a/server/storage/database.ts b/server/storage/database.ts index dfa2e93f43..4ef8894c08 100644 --- a/server/storage/database.ts +++ b/server/storage/database.ts @@ -47,10 +47,12 @@ export function createDatabaseInstance( InferAttributes, InferCreationAttributes >; - } + }, + options?: { readOnly?: boolean } ): Sequelize { try { let instance; + const isReadOnly = options?.readOnly ?? 
false; // Common options for both URL and object configurations const commonOptions: SequelizeOptions = { @@ -70,17 +72,21 @@ export function createDatabaseInstance( }, models: Object.values(input), pool: { - max: poolMax, + // Read-only connections can have larger pools since there's no write contention + max: isReadOnly ? poolMax * 2 : poolMax, min: poolMin, acquire: 30000, idle: 10000, }, - retry: { - match: [/deadlock/i], - max: 3, - backoffBase: 200, - backoffExponent: 1.5, - }, + // Only retry on deadlocks for write connections + retry: isReadOnly + ? undefined + : { + match: [/deadlock/i], + max: 3, + backoffBase: 200, + backoffExponent: 1.5, + }, schema, }; @@ -92,6 +98,29 @@ export function createDatabaseInstance( } sequelizeStrictAttributes(instance); + + // Add hooks to warn about write operations on read-only connections + if (isReadOnly) { + const warnWriteOperation = (operation: string) => { + Logger.warn( + `Attempted ${operation} operation on read-only database connection` + ); + }; + + instance.addHook("beforeCreate", () => warnWriteOperation("CREATE")); + instance.addHook("beforeUpdate", () => warnWriteOperation("UPDATE")); + instance.addHook("beforeDestroy", () => warnWriteOperation("DELETE")); + instance.addHook("beforeBulkCreate", () => + warnWriteOperation("BULK CREATE") + ); + instance.addHook("beforeBulkUpdate", () => + warnWriteOperation("BULK UPDATE") + ); + instance.addHook("beforeBulkDestroy", () => + warnWriteOperation("BULK DELETE") + ); + } + return instance; } catch (_err) { Logger.fatal( @@ -178,6 +207,20 @@ export function createMigrationRunner( export const sequelize = createDatabaseInstance(databaseConfig, models); +/** + * Read-only database connection for read replicas. + * Falls back to the main connection if DATABASE_URL_READ_ONLY is not set. + */ +export const sequelizeReadOnly = env.DATABASE_URL_READ_ONLY + ? 
createDatabaseInstance( + env.DATABASE_URL_READ_ONLY, + {}, + { + readOnly: true, + } + ) + : sequelize; + export const migrations = createMigrationRunner(sequelize, [ "migrations/*.js", { cwd: path.resolve("server") },