Mirror of https://github.com/outline/outline.git
fix: Speed up popularity score calculation further (#10728)
* fix: Speed up popularity score calculation further
* Add READ_ONLY database connection
* UNNEST performs better
* Move config to env
@@ -94,6 +94,20 @@ export class Environment {
   ])
   public DATABASE_URL = this.toOptionalString(environment.DATABASE_URL);
 
+  /**
+   * Optional database URL for read replica to distribute read queries
+   * and reduce load on primary database.
+   */
+  @IsOptional()
+  @IsUrl({
+    require_tld: false,
+    allow_underscores: true,
+    protocols: ["postgres", "postgresql"],
+  })
+  public DATABASE_URL_READ_ONLY = this.toOptionalString(
+    environment.DATABASE_URL_READ_ONLY
+  );
+
   /**
    * Database host for individual component configuration.
    */

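For context, the validator options above mean the usual kinds of replica connection strings are accepted without modification. A hedged illustration, with every hostname and credential invented:

// Illustrative values only — hosts and credentials are made up.
// The @IsUrl options allow postgres/postgresql schemes, hosts without a TLD,
// and underscores in hostnames, so in-cluster replica URLs like these validate:
const exampleReadReplicaUrls = [
  "postgres://outline:secret@postgres_replica:5432/outline",
  "postgresql://outline:secret@db-replica.internal:5432/outline",
];
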
@@ -754,6 +768,24 @@ export class Environment {
   @Public
   public APP_NAME = "Outline";
 
+  /**
+   * Gravity constant for time decay in popularity scoring. Higher values cause
+   * faster decay of older content. Default is 0.7.
+   */
+  @IsOptional()
+  @IsNumber()
+  public POPULARITY_GRAVITY =
+    this.toOptionalNumber(environment.POPULARITY_GRAVITY) ?? 0.7;
+
+  /**
+   * Number of weeks of activity to consider when calculating popularity scores.
+   * Default is 2 weeks.
+   */
+  @IsOptional()
+  @IsNumber()
+  public POPULARITY_ACTIVITY_THRESHOLD_WEEKS =
+    this.toOptionalNumber(environment.POPULARITY_ACTIVITY_THRESHOLD_WEEKS) ?? 2;
+
   /**
    * Returns true if the current installation is the cloud hosted version at
    * getoutline.com

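These two settings drive a Hacker-News-style decay in the task further down: an activity's weight is divided by a power of its age. A minimal sketch of that relationship, assuming age is measured in hours and smoothed by a small offset (TIME_OFFSET_HOURS appears later in the diff; the value used here is an assumption, not the committed constant):

// Sketch only: shows how POPULARITY_GRAVITY shapes time decay; not the task's exact SQL.
const GRAVITY = 0.7; // default of POPULARITY_GRAVITY
const TIME_OFFSET_HOURS = 2; // assumed value for illustration

function decayedScore(weight: number, ageInHours: number, gravity = GRAVITY): number {
  return weight / Math.pow(ageInHours + TIME_OFFSET_HOURS, gravity);
}

// A view (weight 0.5, per ACTIVITY_WEIGHTS below) from an hour ago outscores one from a week ago:
decayedScore(0.5, 1); // ≈ 0.23
decayedScore(0.5, 168); // ≈ 0.01

Raising POPULARITY_GRAVITY makes older activity fall off faster, while POPULARITY_ACTIVITY_THRESHOLD_WEEKS only changes how far back the task looks for activity to score.
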
@@ -6,6 +6,7 @@ import {
   InferCreationAttributes,
   QueryTypes,
   FindOptions,
+  Sequelize,
 } from "sequelize";
 import {
   BeforeDestroy,

@@ -182,11 +183,15 @@ class Attachment extends IdModel<
   /**
    * Get the total size of all attachments for a given team.
    *
+   * @param connection - The Sequelize connection to use for the query.
    * @param teamId - The ID of the team to get the total size for.
    * @returns A promise resolving to the total size of all attachments for the given team in bytes.
    */
-  static async getTotalSizeForTeam(teamId: string): Promise<number> {
-    const result = await this.sequelize!.query<{ total: string }>(
+  static async getTotalSizeForTeam(
+    connection: Sequelize,
+    teamId: string
+  ): Promise<number> {
+    const result = await connection.query<{ total: string }>(
       `
       SELECT SUM(size) as total
       FROM attachments

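Only the head of the query is visible in the hunk above. A hedged sketch of how the reworked helper plausibly reads in full — the WHERE clause and result handling are assumptions, and it is written as a free function here rather than the static method on Attachment:

import { QueryTypes, Sequelize } from "sequelize";

// Hedged sketch: the caller now decides which connection runs the read.
async function getTotalSizeForTeam(
  connection: Sequelize,
  teamId: string
): Promise<number> {
  const result = await connection.query<{ total: string }>(
    `
    SELECT SUM(size) as total
    FROM attachments
    WHERE "teamId" = :teamId
    `,
    { type: QueryTypes.SELECT, replacements: { teamId } }
  );
  return parseInt(result[0]?.total ?? "0", 10);
}

Callers shown later in the diff pass sequelizeReadOnly; anything that needs read-after-write consistency can keep passing the primary sequelize instance.
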
@@ -20,6 +20,7 @@ import FileStorage from "@server/storage/files";
 import BaseTask, { TaskPriority } from "./BaseTask";
 import { Op } from "sequelize";
 import { WhereOptions } from "sequelize";
+import { sequelizeReadOnly } from "@server/storage/database";
 
 type Props = {
   fileOperationId: string;

@@ -68,6 +69,7 @@ export default abstract class ExportTask extends BaseTask<Props> {
 
     if (!fileOperation.collectionId) {
       const totalAttachmentsSize = await Attachment.getTotalSizeForTeam(
+        sequelizeReadOnly,
         user.teamId
       );
 

@@ -1,23 +1,14 @@
 import crypto from "crypto";
 import { subWeeks } from "date-fns";
 import { QueryTypes } from "sequelize";
+import env from "@server/env";
 import Logger from "@server/logging/Logger";
 import BaseTask, { TaskSchedule } from "./BaseTask";
-import { sequelize } from "@server/storage/database";
+import { sequelize, sequelizeReadOnly } from "@server/storage/database";
 import { sleep } from "@server/utils/timers";
 
 type Props = Record<string, never>;
 
-/**
- * Gravity constant for time decay. Higher values cause faster decay of older content.
- * With `GRAVITY = 0.7`:
- * - Content from **1 day ago** retains ~30% of its score
- * - Content from **3 days ago** retains ~15% of its score
- * - Content from **1 week ago** retains ~8% of its score
- * - Content from **2 weeks ago** retains ~4% of its score
- */
-const GRAVITY = 0.7;
-
 /**
  * Number of hours to add to age to smooth the decay curve,
  * preventing brand new content from having disproportionately

@@ -34,25 +25,20 @@ const ACTIVITY_WEIGHTS = {
   view: 0.5,
 };
 
-/**
- * Only recalculate scores for activity within this period.
- */
-const ACTIVITY_THRESHOLD_WEEKS = 2;
-
 /**
  * Batch size for processing updates - kept small to minimize lock duration
  */
-const BATCH_SIZE = 100;
+const BATCH_SIZE = 50;
 
 /**
  * Maximum retries for failed batch operations
 */
-const MAX_RETRIES = 3;
+const MAX_RETRIES = 2;
 
 /**
  * Statement timeout for individual queries to prevent runaway locks
 */
-const STATEMENT_TIMEOUT_MS = 10000;
+const STATEMENT_TIMEOUT_MS = 30000;
 
 /**
  * Base name for the working table used to track documents to process

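Most of the speed-up comes from this retuning plus the much shorter inter-batch sleep further down (1000 ms → 10 ms): smaller batches hold locks more briefly, while the pacing overhead between batches almost disappears. Illustrative arithmetic under an assumed workload:

// Illustrative only; 10,000 active documents is an assumed workload size.
const activeDocuments = 10_000;

// Before: BATCH_SIZE = 100 with a 1000 ms sleep between batches.
const oldBatches = Math.ceil(activeDocuments / 100); // 100 batches
const oldSleepMs = oldBatches * 1000; // ~100 seconds spent just sleeping

// After: BATCH_SIZE = 50 with a 10 ms sleep between batches.
const newBatches = Math.ceil(activeDocuments / 50); // 200 batches
const newSleepMs = newBatches * 10; // ~2 seconds of pacing overhead
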
@@ -75,7 +61,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
     Logger.info("task", "Updating document popularity scores…");
 
     const now = new Date();
-    const activityThreshold = subWeeks(now, ACTIVITY_THRESHOLD_WEEKS);
+    const threshold = subWeeks(now, env.POPULARITY_ACTIVITY_THRESHOLD_WEEKS);
 
     // Generate unique table name for this run to prevent conflicts
     const uniqueId = crypto.randomBytes(8).toString("hex");

@@ -83,7 +69,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
 
     try {
       // Setup: Create working table and populate with active document IDs
-      await this.setupWorkingTable(activityThreshold);
+      await this.setupWorkingTable(threshold);
 
       const activeCount = await this.getWorkingTableCount();
 

@@ -111,10 +97,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
         batchNumber++;
 
         try {
-          const updated = await this.processBatchWithRetry(
-            activityThreshold,
-            now
-          );
+          const updated = await this.processBatchWithRetry(threshold, now);
           totalUpdated += updated;
 
           Logger.debug(

@@ -123,7 +106,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
           );
 
           // Add delay between batches to reduce database contention
-          await sleep(1000);
+          await sleep(10);
         } catch (error) {
           totalErrors++;
           Logger.error(`Batch ${batchNumber} failed after retries`, error);

@@ -151,7 +134,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
    * that have recent activity. Unlogged tables are faster because they
    * skip WAL logging, and data loss on crash is acceptable here.
    */
-  private async setupWorkingTable(activityThreshold: Date): Promise<void> {
+  private async setupWorkingTable(threshold: Date): Promise<void> {
     // Drop any existing table first to avoid type conflicts from previous crashed runs
     await sequelize.query(`DROP TABLE IF EXISTS ${this.workingTable} CASCADE`);
 

@@ -187,7 +170,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
         )
       )
       `,
-      { replacements: { threshold: activityThreshold } }
+      { replacements: { threshold } }
     );
 
     // Create index on processed column for efficient batch selection

@@ -211,7 +194,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
    * Processes a batch of documents with retry logic.
    */
   private async processBatchWithRetry(
-    activityThreshold: Date,
+    threshold: Date,
     now: Date,
     attempt = 1
   ): Promise<number> {

@@ -239,7 +222,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
       // Step 2: Calculate scores outside of a transaction
       const scores = await this.calculateScoresForDocuments(
         documentIds,
-        activityThreshold,
+        threshold,
         now
       );
 

@@ -257,7 +240,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
         { error }
       );
       await sleep(1000 * attempt);
-      return this.processBatchWithRetry(activityThreshold, now, attempt + 1);
+      return this.processBatchWithRetry(threshold, now, attempt + 1);
     }
     throw error;
   }

@@ -269,13 +252,10 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
    */
   private async calculateScoresForDocuments(
     documentIds: string[],
-    activityThreshold: Date,
+    threshold: Date,
     now: Date
   ): Promise<DocumentScore[]> {
-    // Build VALUES clause for the batch
-    const valuesClause = documentIds.map((id) => `('${id}'::uuid)`).join(", ");
-
-    const results = await sequelize.query<{
+    const results = await sequelizeReadOnly.query<{
       documentId: string;
       total_score: string;
     }>(

@@ -283,7 +263,7 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
       SET LOCAL statement_timeout = '${STATEMENT_TIMEOUT_MS}ms';
 
       WITH batch_docs AS (
-        SELECT * FROM (VALUES ${valuesClause}) AS t(id)
+        SELECT unnest(ARRAY[:documentIds]::uuid[]) AS id
       ),
       revision_scores AS (
         SELECT

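Switching from an interpolated VALUES list to unnest over a uuid[] passed through replacements avoids building one quoted, casted row per document in JavaScript, and per the commit message the unnest form also performs better in Postgres. A minimal standalone sketch of the same pattern, assuming the read-only connection exported later in the diff is reachable and using placeholder IDs:

import { QueryTypes } from "sequelize";
import { sequelizeReadOnly } from "@server/storage/database";

// Sketch of the bound-array pattern used above; the document IDs are placeholders.
const documentIds = [
  "6ba7b810-9dad-11d1-80b4-00c04fd430c8",
  "6ba7b811-9dad-11d1-80b4-00c04fd430c8",
];

const rows = await sequelizeReadOnly.query<{ id: string }>(
  `
  WITH batch_docs AS (
    SELECT unnest(ARRAY[:documentIds]::uuid[]) AS id
  )
  SELECT id FROM batch_docs
  `,
  { replacements: { documentIds }, type: QueryTypes.SELECT }
);
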
@@ -331,9 +311,10 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
       `,
       {
         replacements: {
-          threshold: activityThreshold,
+          documentIds,
+          threshold,
           now,
-          gravity: GRAVITY,
+          gravity: env.POPULARITY_GRAVITY,
           timeOffset: TIME_OFFSET_HOURS,
           revisionWeight: ACTIVITY_WEIGHTS.revision,
           commentWeight: ACTIVITY_WEIGHTS.comment,

@@ -359,7 +340,6 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
     for (const { documentId, score } of scores) {
       await sequelize.query(
         `
-        SET LOCAL statement_timeout = '${STATEMENT_TIMEOUT_MS}ms';
         UPDATE documents
         SET "popularityScore" = :score
         WHERE id = :documentId

@@ -375,14 +355,15 @@ export default class UpdateDocumentsPopularityScoreTask extends BaseTask<Props>
    * Marks documents as processed in the working table
    */
   private async markBatchProcessed(documentIds: string[]): Promise<void> {
-    const valuesClause = documentIds.map((id) => `('${id}'::uuid)`).join(", ");
-
     await sequelize.query(
       `
       UPDATE ${this.workingTable}
       SET processed = TRUE
-      WHERE "documentId" IN (SELECT id FROM (VALUES ${valuesClause}) AS t(id))
-      `
+      WHERE "documentId" IN (SELECT unnest(ARRAY[:documentIds]::uuid[]))
+      `,
+      {
+        replacements: { documentIds },
+      }
     );
   }
 

@@ -1,5 +1,6 @@
 import { Attachment, Team } from "@server/models";
 import BaseTask, { TaskPriority } from "./BaseTask";
+import { sequelizeReadOnly } from "@server/storage/database";
 
 type Props = {
   /** The teamId to operate on */

@@ -11,7 +12,10 @@ type Props = {
  */
 export default class UpdateTeamAttachmentsSizeTask extends BaseTask<Props> {
   public async perform({ teamId }: Props) {
-    const sizeInBytes = await Attachment.getTotalSizeForTeam(teamId);
+    const sizeInBytes = await Attachment.getTotalSizeForTeam(
+      sequelizeReadOnly,
+      teamId
+    );
     if (!sizeInBytes) {
       return;
     }

@@ -47,10 +47,12 @@ export function createDatabaseInstance(
         InferAttributes<Model>,
         InferCreationAttributes<Model>
       >;
     }
   },
+  options?: { readOnly?: boolean }
 ): Sequelize {
   try {
     let instance;
+    const isReadOnly = options?.readOnly ?? false;
 
     // Common options for both URL and object configurations
     const commonOptions: SequelizeOptions = {

@@ -70,17 +72,21 @@ export function createDatabaseInstance(
       },
       models: Object.values(input),
       pool: {
-        max: poolMax,
+        // Read-only connections can have larger pools since there's no write contention
+        max: isReadOnly ? poolMax * 2 : poolMax,
         min: poolMin,
         acquire: 30000,
         idle: 10000,
       },
-      retry: {
-        match: [/deadlock/i],
-        max: 3,
-        backoffBase: 200,
-        backoffExponent: 1.5,
-      },
+      // Only retry on deadlocks for write connections
+      retry: isReadOnly
+        ? undefined
+        : {
+            match: [/deadlock/i],
+            max: 3,
+            backoffBase: 200,
+            backoffExponent: 1.5,
+          },
       schema,
     };
 

@@ -92,6 +98,29 @@ export function createDatabaseInstance(
     }
 
     sequelizeStrictAttributes(instance);
 
+    // Add hooks to warn about write operations on read-only connections
+    if (isReadOnly) {
+      const warnWriteOperation = (operation: string) => {
+        Logger.warn(
+          `Attempted ${operation} operation on read-only database connection`
+        );
+      };
+
+      instance.addHook("beforeCreate", () => warnWriteOperation("CREATE"));
+      instance.addHook("beforeUpdate", () => warnWriteOperation("UPDATE"));
+      instance.addHook("beforeDestroy", () => warnWriteOperation("DELETE"));
+      instance.addHook("beforeBulkCreate", () =>
+        warnWriteOperation("BULK CREATE")
+      );
+      instance.addHook("beforeBulkUpdate", () =>
+        warnWriteOperation("BULK UPDATE")
+      );
+      instance.addHook("beforeBulkDestroy", () =>
+        warnWriteOperation("BULK DELETE")
+      );
+    }
+
     return instance;
   } catch (_err) {
     Logger.fatal(

@@ -178,6 +207,20 @@ export function createMigrationRunner(
 
 export const sequelize = createDatabaseInstance(databaseConfig, models);
 
+/**
+ * Read-only database connection for read replicas.
+ * Falls back to the main connection if DATABASE_URL_READ_ONLY is not set.
+ */
+export const sequelizeReadOnly = env.DATABASE_URL_READ_ONLY
+  ? createDatabaseInstance(
+      env.DATABASE_URL_READ_ONLY,
+      {},
+      {
+        readOnly: true,
+      }
+    )
+  : sequelize;
+
 export const migrations = createMigrationRunner(sequelize, [
   "migrations/*.js",
   { cwd: path.resolve("server") },

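Since sequelizeReadOnly falls back to the primary connection when DATABASE_URL_READ_ONLY is unset, call sites can route raw reads through it unconditionally and single-database installs behave exactly as before. A hedged usage sketch — the IDs are placeholders and the count query is only for illustration:

import { QueryTypes } from "sequelize";
import { sequelize, sequelizeReadOnly } from "@server/storage/database";

const teamId = "6ba7b810-9dad-11d1-80b4-00c04fd430c8"; // placeholder
const documentId = "6ba7b811-9dad-11d1-80b4-00c04fd430c8"; // placeholder

// Reads go to the replica when one is configured, otherwise to the primary.
const [row] = await sequelizeReadOnly.query<{ count: string }>(
  `SELECT COUNT(*) AS count FROM documents WHERE "teamId" = :teamId`,
  { replacements: { teamId }, type: QueryTypes.SELECT }
);

// Writes stay on the primary connection.
await sequelize.query(
  `UPDATE documents SET "popularityScore" = :score WHERE id = :documentId`,
  { replacements: { score: 1.5, documentId } }
);
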