From b7cee8737073cdfef62df7ea13f16acfdec0e3b1 Mon Sep 17 00:00:00 2001 From: Daniel Salazar Date: Thu, 16 Oct 2025 13:02:19 -0700 Subject: [PATCH] feat: purchase addons logic in metering (#1765) --- .../MeteringService/MeteringService.ts | 70 +++--- .../src/services/MeteringService/types.ts | 24 ++ .../repositories/DBKVStore/DBKVStore.ts | 208 +++++++++--------- 3 files changed, 160 insertions(+), 142 deletions(-) create mode 100644 src/backend/src/services/MeteringService/types.ts diff --git a/src/backend/src/services/MeteringService/MeteringService.ts b/src/backend/src/services/MeteringService/MeteringService.ts index 58342db0..78115f93 100644 --- a/src/backend/src/services/MeteringService/MeteringService.ts +++ b/src/backend/src/services/MeteringService/MeteringService.ts @@ -6,25 +6,7 @@ import type { SUService } from '../SUService.js'; import { DEFAULT_FREE_SUBSCRIPTION, DEFAULT_TEMP_SUBSCRIPTION, GLOBAL_APP_KEY, METRICS_PREFIX, PERIOD_ESCAPE, POLICY_PREFIX } from './consts.js'; import { COST_MAPS } from './costMaps/index.js'; import { SUB_POLICIES } from './subPolicies/index.js'; - -interface PolicyAddOns { - purchasedCredits: number - purchasedStorage: number - rateDiscounts: { - [usageType: string]: number | string // TODO DS: string to support graduated discounts eventually - } -} -interface UsageByType { - total: number - [serviceName: string]: number -} - -interface MeteringAndBillingServiceDeps { - kvStore: DBKVStore, - superUserService: SUService, - alarmService: AlarmService - eventService: EventService -} +import { MeteringAndBillingServiceDeps, UsageAddons, UsageByType } from './types.js'; /** * Handles usage metering and supports stubbs for billing methods for current scoped actor */ @@ -90,17 +72,11 @@ export class MeteringAndBillingService { [`${usageType}.count`]: 1, }; - const lastUpdatedKey = `${METRICS_PREFIX}:actor:${actorId}:lastUpdated`; - const lastUpdatedPromise = this.#kvStore.set({ - key: lastUpdatedKey, - value: Date.now(), - }); - const actorUsageKey = `${METRICS_PREFIX}:actor:${actorId}:${currentMonth}`; const actorUsagesPromise = this.#kvStore.incr({ key: actorUsageKey, pathAndAmountMap, - }); + }) as Promise; const puterConsumptionKey = `${METRICS_PREFIX}:puter:${currentMonth}`; // global consumption across all users and apps this.#kvStore.incr({ @@ -137,7 +113,33 @@ export class MeteringAndBillingService { console.warn('Failed to increment aux usage data \'actorAppTotalsKey\' with error: ', e); }); - return (await Promise.all([lastUpdatedPromise, actorUsagesPromise]))[1] as UsageByType; + const lastUpdatedKey = `${METRICS_PREFIX}:actor:${actorId}:lastUpdated`; + this.#kvStore.set({ + key: lastUpdatedKey, + value: Date.now(), + }).catch((e: Error) => { + console.warn('Failed to set lastUpdatedKey with error: ', e); + }); + + // update addon usage if we are over the allowance + const actorSubscriptionPromise = this.getActorSubscription(actor); + const actorAddonsPromise = this.getActorAddons(actor); + const [actorUsages, actorSubscription, actorAddons] = (await Promise.all([actorUsagesPromise, actorSubscriptionPromise, actorAddonsPromise])); + if ( actorUsages.total > actorSubscription.monthUsageAllowance && actorAddons.purchasedCredits ) { + // if we are now over the allowance, start consuming purchased credits + const withinBoundsUsage = Math.max(0, actorSubscription.monthUsageAllowance - actorUsages.total + totalCost); + const overageUsage = totalCost - withinBoundsUsage; + + if ( overageUsage > 0 ) { + await this.#kvStore.incr({ + key: `${POLICY_PREFIX}:actor:${actorId}:addons`, + pathAndAmountMap: { + consumedPurchaseCredits: Math.max(overageUsage, actorAddons.purchasedCredits - actorAddons.consumedPurchaseCredits), // don't go over the purchased credits, technically a race condition here, but optimistically rare + }, + }); + } + } + return actorUsages; }); } catch (e) { console.error('Metering: Failed to increment usage for actor', actor, 'usageType', usageType, 'usageAmount', usageAmount, e); @@ -225,14 +227,14 @@ export class MeteringAndBillingService { async getAllowedUsage(actor: Actor) { const userSubscriptionPromise = this.getActorSubscription(actor); - const userPolicyAddonsPromise = this.getActorPolicyAddons(actor); + const userAddonsPromise = this.getActorAddons(actor); const currentUsagePromise = this.getActorCurrentMonthUsageDetails(actor); - const [userSubscription, userPolicyAddons, currentMonthUsage] = await Promise.all([userSubscriptionPromise, userPolicyAddonsPromise, currentUsagePromise]); + const [userSubscription, addons, currentMonthUsage] = await Promise.all([userSubscriptionPromise, userAddonsPromise, currentUsagePromise]); return { - remaining: Math.max(0, (userSubscription.monthUsageAllowance || 0) + (userPolicyAddons?.purchasedCredits || 0) - (currentMonthUsage.usage.total || 0)), + remaining: Math.max(0, (userSubscription.monthUsageAllowance || 0) + (addons?.purchasedCredits || 0) - (currentMonthUsage.usage.total || 0) - (addons?.consumedPurchaseCredits || 0)), monthUsageAllowance: userSubscription.monthUsageAllowance, - userPolicyAddons, + addons, }; } @@ -275,14 +277,14 @@ export class MeteringAndBillingService { return availablePolicies.find(({ id }) => id === userSubscriptionId) || availablePolicies.find(({ id }) => id === defaultSubscriptionId)!; } - async getActorPolicyAddons(actor: Actor) { + async getActorAddons(actor: Actor) { if ( !actor.type?.user?.uuid ) { throw new Error('Actor must be a user to get policy addons'); } const key = `${POLICY_PREFIX}:actor:${actor.type.user?.uuid}:addons`; return this.#superUserService.sudo(async () => { - const policyAddOns = await this.#kvStore.get({ key }); - return (policyAddOns ?? {}) as PolicyAddOns; + const addons = await this.#kvStore.get({ key }); + return (addons ?? {}) as UsageAddons; }); } diff --git a/src/backend/src/services/MeteringService/types.ts b/src/backend/src/services/MeteringService/types.ts new file mode 100644 index 00000000..28cc89cb --- /dev/null +++ b/src/backend/src/services/MeteringService/types.ts @@ -0,0 +1,24 @@ +import type { AlarmService } from "../../modules/core/AlarmService"; +import type { EventService } from "../EventService"; +import type { DBKVStore } from "../repositories/DBKVStore/DBKVStore"; +import type { SUService } from "../SUService"; + +export interface UsageAddons { + purchasedCredits: number // total extra credits purchased - not expirable + consumedPurchaseCredits: number // total credits consumed from purchased ones - these are flattened upon new 'purchase' + purchasedStorage: number // TODO DS: not implemented yet + rateDiscounts: { + [usageType: string]: number | string // TODO DS: string to support graduated discounts eventually + } +} +export interface UsageByType { + total: number + [serviceName: string]: number +} + +export interface MeteringAndBillingServiceDeps { + kvStore: DBKVStore, + superUserService: SUService, + alarmService: AlarmService + eventService: EventService +} \ No newline at end of file diff --git a/src/backend/src/services/repositories/DBKVStore/DBKVStore.ts b/src/backend/src/services/repositories/DBKVStore/DBKVStore.ts index 115838f6..c04908e6 100644 --- a/src/backend/src/services/repositories/DBKVStore/DBKVStore.ts +++ b/src/backend/src/services/repositories/DBKVStore/DBKVStore.ts @@ -1,92 +1,91 @@ -// TypeScript conversion of DBKVStore.mjs import murmurhash from "murmurhash"; +// eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore import APIError from '../../../api/APIError.js'; +// eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore import { Context } from "../../../util/context.js"; +import type { MeteringAndBillingService } from "../../MeteringService/MeteringService.js"; const GLOBAL_APP_KEY = 'global'; export class DBKVStore { + // eslint-disable-next-line @typescript-eslint/no-explicit-any #db: any; - #meteringService: any; - #global_config: any = {}; + #meteringService: MeteringAndBillingService; + #global_config: Record = {}; // TODO DS: make table name configurable - constructor({ sqlClient, meteringAndBillingService, globalConfig }: { sqlClient: any, meteringAndBillingService: any, globalConfig: any }) { + constructor({ sqlClient, meteringAndBillingService, globalConfig }: { sqlClient: unknown, meteringAndBillingService: MeteringAndBillingService, globalConfig: Record }) { this.#db = sqlClient; this.#meteringService = meteringAndBillingService; this.#global_config = globalConfig; } - async get({ key }: { key: any }): Promise { + async get({ key }: { key: string | string[] }) { const actor = Context.get('actor'); const app = actor.type?.app ?? undefined; const user = actor.type?.user ?? undefined; - if (!user) { + if ( !user ) { throw new Error('User not found'); } - const deleteExpired = async (rows: any[]) => { + const deleteExpired = async (rows: { kkey_hash: string }[]) => { const query = `DELETE FROM kv WHERE user_id=? AND app=? AND kkey_hash IN (${rows.map(() => '?').join(',')})`; - const params = [user.id, app?.uid ?? GLOBAL_APP_KEY, ...rows.map((r: any) => r.kkey_hash)]; + const params = [user.id, app?.uid ?? GLOBAL_APP_KEY, ...rows.map((r) => r.kkey_hash)]; return await this.#db.write(query, params); }; - if (Array.isArray(key)) { + if ( Array.isArray(key) ) { const keys = key; - const key_hashes = keys.map((key: any) => murmurhash.v3(key)); + const key_hashes = keys.map((key: string) => murmurhash.v3(key)); const rows = app ? await this.#db.read('SELECT kkey, value, expireAt FROM kv WHERE user_id=? AND app=? AND kkey_hash IN (?)', [user.id, app.uid, key_hashes]) - : await this.#db.read( - `SELECT kkey, value, expireAt FROM kv WHERE user_id=? AND (app IS NULL OR app = '${GLOBAL_APP_KEY}') AND kkey_hash IN (${key_hashes.map(() => '?').join(',')})`, - [user.id, key_hashes] - ); + : await this.#db.read(`SELECT kkey, value, expireAt FROM kv WHERE user_id=? AND (app IS NULL OR app = '${GLOBAL_APP_KEY}') AND kkey_hash IN (${key_hashes.map(() => '?').join(',')})`, + [user.id, key_hashes]); - const kv: any = {}; - rows.forEach((row: any) => { + const kvPairs: Record = {}; + rows.forEach((row: { kkey: string, value: string }) => { row.value = this.#db.case({ mysql: () => row.value, otherwise: () => JSON.parse(row.value ?? 'null'), })(); - kv[row.kkey] = row.value; + kvPairs[row.kkey] = row.value; }); - const expiredKeys: any[] = []; - rows.forEach((row: any) => { - if (row?.expireAt && row.expireAt < Date.now() / 1000) { + const expiredKeys: { kkey_hash: string }[] = []; + rows.forEach((row: { kkey: string, expireAt: number, kkey_hash: string, value: unknown }) => { + if ( row?.expireAt && row.expireAt < Date.now() / 1000 ) { expiredKeys.push(row); - kv[row.kkey] = null; + kvPairs[row.kkey] = null; } else { - kv[row.kkey] = row.value ?? null; + kvPairs[row.kkey] = row.value ?? null; } }); // clean up expired keys asynchronously - if (expiredKeys.length) { + if ( expiredKeys.length ) { deleteExpired(expiredKeys); } - return keys.map((key: any) => kv[key]); + return keys.map((key: string) => kvPairs[key]); } const key_hash = murmurhash.v3(key); const kv = app ? await this.#db.read('SELECT * FROM kv WHERE user_id=? AND app=? AND kkey_hash=? LIMIT 1', [user.id, app.uid, key_hash]) - : await this.#db.read( - `SELECT * FROM kv WHERE user_id=? AND (app IS NULL OR app = '${GLOBAL_APP_KEY}') AND kkey_hash=? LIMIT 1`, - [user.id, key_hash] - ); + : await this.#db.read(`SELECT * FROM kv WHERE user_id=? AND (app IS NULL OR app = '${GLOBAL_APP_KEY}') AND kkey_hash=? LIMIT 1`, + [user.id, key_hash]); - if (kv[0]) { + if ( kv[0] ) { kv[0].value = this.#db.case({ mysql: () => kv[0].value, otherwise: () => JSON.parse(kv[0].value ?? 'null'), })(); } - if (kv[0]?.expireAt && kv[0].expireAt < Date.now() / 1000) { + if ( kv[0]?.expireAt && kv[0].expireAt < Date.now() / 1000 ) { // key has expired // clean up asynchronously deleteExpired([kv[0]]); @@ -98,50 +97,47 @@ export class DBKVStore { return kv[0]?.value ?? null; } - async set({ key, value, expireAt }: { key: any, value: any, expireAt?: any }): Promise { + async set({ key, value, expireAt }: { key: string, value: unknown, expireAt?: number }) { const actor = Context.get('actor'); const config = this.#global_config; key = String(key); - if (Buffer.byteLength(key, 'utf8') > config.kv_max_key_size) { + if ( Buffer.byteLength(key, 'utf8') > (config.kv_max_key_size as number) ) { throw new Error(`key is too large. Max size is ${config.kv_max_key_size}.`); } - value = value === undefined ? null : value; if ( value !== null && - Buffer.byteLength(JSON.stringify(value), 'utf8') > config.kv_max_value_size + Buffer.byteLength(JSON.stringify(value), 'utf8') > (config.kv_max_value_size as number) ) { throw new Error(`value is too large. Max size is ${config.kv_max_value_size}.`); } const app = actor.type?.app ?? undefined; const user = actor.type?.user ?? undefined; - if (!user) { + if ( !user ) { throw new Error('User not found'); } const key_hash = murmurhash.v3(key); try { - await this.#db.write( - `INSERT INTO kv (user_id, app, kkey_hash, kkey, value, expireAt) + await this.#db.write(`INSERT INTO kv (user_id, app, kkey_hash, kkey, value, expireAt) VALUES (?, ?, ?, ?, ?, ?) ${this.#db.case({ - mysql: 'ON DUPLICATE KEY UPDATE value = ?', - sqlite: 'ON CONFLICT(user_id, app, kkey_hash) DO UPDATE SET value = excluded.value', - }) - }`, - [ - user.id, - app?.uid ?? GLOBAL_APP_KEY, - key_hash, - key, - JSON.stringify(value), - expireAt ?? null, - ...this.#db.case({ mysql: [value], otherwise: [] }), - ] - ); - } catch (e: any) { + mysql: 'ON DUPLICATE KEY UPDATE value = ?', + sqlite: 'ON CONFLICT(user_id, app, kkey_hash) DO UPDATE SET value = excluded.value', + }) + }`, + [ + user.id, + app?.uid ?? GLOBAL_APP_KEY, + key_hash, + key, + JSON.stringify(value), + expireAt ?? null, + ...this.#db.case({ mysql: [value], otherwise: [] }), + ]); + } catch ( e: unknown ) { console.error(e); } @@ -150,11 +146,11 @@ export class DBKVStore { return true; } - async del({ key }: { key: any }): Promise { + async del({ key }: { key: string }) { const actor = Context.get('actor'); const app = actor.type?.app ?? undefined; const user = actor.type?.user ?? undefined; - if (!user) { + if ( !user ) { throw new Error('User not found'); } @@ -171,27 +167,25 @@ export class DBKVStore { return true; } - async list({ as }: { as?: any }): Promise { + async list({ as }: { as?: string }) { const actor = Context.get('actor'); const app = actor.type?.app ?? undefined; const user = actor.type?.user ?? undefined; - if (!user) { + if ( !user ) { throw new Error('User not found'); } let rows = app ? await this.#db.read('SELECT kkey, value, expireAt FROM kv WHERE user_id=? AND app=?', [user.id, app.uid]) - : await this.#db.read( - `SELECT kkey, value, expireAt FROM kv WHERE user_id=? AND (app IS NULL OR app = '${GLOBAL_APP_KEY}')`, - [user.id] - ); + : await this.#db.read(`SELECT kkey, value, expireAt FROM kv WHERE user_id=? AND (app IS NULL OR app = '${GLOBAL_APP_KEY}')`, + [user.id]); - rows = rows.filter((row: any) => { + rows = rows.filter((row: { expireAt: number }) => { return !row?.expireAt || row?.expireAt > Date.now() / 1000; }); - rows = rows.map((row: any) => ({ + rows = rows.map((row: { kkey: string, value: string }) => ({ key: row.kkey, value: this.#db.case({ mysql: () => row.value, @@ -201,17 +195,17 @@ export class DBKVStore { as = as || 'entries'; - if (!['keys', 'values', 'entries'].includes(as)) { + if ( !['keys', 'values', 'entries'].includes(as) ) { throw APIError.create('field_invalid', null, { key: 'as', expected: '"keys", "values", or "entries"', }); } - if (as === 'keys') { - rows = rows.map((row: any) => row.key); - } else if (as === 'values') { - rows = rows.map((row: any) => row.value); + if ( as === 'keys' ) { + rows = rows.map((row: { key: string }) => row.key); + } else if ( as === 'values' ) { + rows = rows.map((row: { value: unknown }) => row.value); } await this.#meteringService.incrementUsage(actor, 'kv:read', rows.length); @@ -219,11 +213,11 @@ export class DBKVStore { return rows; } - async flush(): Promise { + async flush() { const actor = Context.get('actor'); const app = actor.type?.app ?? undefined; const user = actor.type?.user ?? undefined; - if (!user) { + if ( !user ) { throw new Error('User not found'); } @@ -237,8 +231,8 @@ export class DBKVStore { return true; } - async expireAt({ key, timestamp }: { key: any, timestamp: any }): Promise { - if (key === '') { + async expireAt({ key, timestamp }: { key: string, timestamp: number }){ + if ( key === '' ) { throw APIError.create('field_empty', null, { key: 'key', }); @@ -249,8 +243,8 @@ export class DBKVStore { return await this.#expireat(key, timestamp); } - async expire({ key, ttl }: { key: any, ttl: any }): Promise { - if (key === '') { + async expire({ key, ttl }: { key: string, ttl: number }) { + if ( key === '' ) { throw APIError.create('field_empty', null, { key: 'key', }); @@ -264,86 +258,84 @@ export class DBKVStore { return await this.#expireat(key, timestamp); } - async incr({ key, pathAndAmountMap }: { key: string, pathAndAmountMap: Record }): Promise { + async incr>({ key, pathAndAmountMap }: { key: string, pathAndAmountMap: T }): Promise> { let currVal = await this.get({ key }); const pathEntries = Object.entries(pathAndAmountMap); - if (typeof currVal !== 'object' && pathEntries.length <= 1 && !pathEntries[0]?.[0]) { + if ( typeof currVal !== 'object' && pathEntries.length <= 1 && !pathEntries[0]?.[0] ) { const amount = pathEntries[0]?.[1] ?? 1; this.set({ key, value: (Number(currVal) || 0) + amount }); - return (Number(currVal) || 0) + amount; + return ((Number(currVal) || 0) + amount) as T extends { "": number } ? number : Record; } // TODO DS: support arrays this also needs dynamodb implementation - if (Array.isArray(currVal)) { + if ( Array.isArray(currVal) ) { throw new Error('Current value is an array'); } - if (!currVal) { + if ( !currVal ) { currVal = {}; } - if (typeof currVal !== 'object') { + if ( typeof currVal !== 'object' ) { throw new Error('Current value is not an object'); } // create or change values as needed - for (const [path, amount] of Object.entries(pathAndAmountMap)) { + for ( const [path, amount] of Object.entries(pathAndAmountMap) ) { const pathParts = path.split('.'); let obj = currVal; - if (obj === null) continue; - for (let i = 0; i < pathParts.length - 1; i++) { + if ( obj === null ) continue; + for ( let i = 0; i < pathParts.length - 1; i++ ) { const part = pathParts[i]; - if (!obj[part]) { + if ( !obj[part] ) { obj[part] = {}; } - if (typeof obj[part] !== 'object' || Array.isArray(currVal)) { + if ( typeof obj[part] !== 'object' || Array.isArray(currVal) ) { throw new Error(`Path ${pathParts.slice(0, i + 1).join('.')} is not an object`); } obj = obj[part]; } - if (obj === null) continue; + if ( obj === null ) continue; const lastPart = pathParts[pathParts.length - 1]; - if (!obj[lastPart]) { + if ( !obj[lastPart] ) { obj[lastPart] = 0; } - if (typeof obj[lastPart] !== 'number') { + if ( typeof obj[lastPart] !== 'number' ) { throw new Error(`Value at path ${path} is not a number`); } obj[lastPart] += amount; } this.set({ key, value: currVal }); - return currVal; + return currVal as T extends { "": number } ? number : Record; } - async decr(...params: Parameters): Promise { + async decr(...params: Parameters): ReturnType { return await this.incr(...params); } - async #expireat(key: any, timestamp: any): Promise { + async #expireat(key: string, timestamp: number){ const actor = Context.get('actor'); const app = actor.type?.app ?? undefined; const user = actor.type?.user ?? undefined; - if (!user) { + if ( !user ) { throw new Error('User not found'); } const key_hash = murmurhash.v3(key); try { - await this.#db.write( - `INSERT INTO kv (user_id, app, kkey_hash, kkey, value, expireAt) + await this.#db.write(`INSERT INTO kv (user_id, app, kkey_hash, kkey, value, expireAt) VALUES (?, ?, ?, ?, ?, ?) ${this.#db.case({ - mysql: 'ON DUPLICATE KEY UPDATE expireAt = ?', - sqlite: 'ON CONFLICT(user_id, app, kkey_hash) DO UPDATE SET expireAt = excluded.expireAt', - }) - }`, - [ - user.id, - app?.uid ?? GLOBAL_APP_KEY, - key_hash, - key, - null, // empty value - timestamp, - ...this.#db.case({ mysql: [timestamp], otherwise: [] }), - ] - ); - } catch (e: any) { + mysql: 'ON DUPLICATE KEY UPDATE expireAt = ?', + sqlite: 'ON CONFLICT(user_id, app, kkey_hash) DO UPDATE SET expireAt = excluded.expireAt', + }) + }`, + [ + user.id, + app?.uid ?? GLOBAL_APP_KEY, + key_hash, + key, + null, // empty value + timestamp, + ...this.#db.case({ mysql: [timestamp], otherwise: [] }), + ]); + } catch ( e: unknown ) { console.error(e); } }