feat: purchase addons logic in metering (#1765)

This commit is contained in:
Daniel Salazar
2025-10-16 13:02:19 -07:00
committed by GitHub
parent 08e26af85d
commit b7cee87370
3 changed files with 160 additions and 142 deletions

View File

@@ -6,25 +6,7 @@ import type { SUService } from '../SUService.js';
import { DEFAULT_FREE_SUBSCRIPTION, DEFAULT_TEMP_SUBSCRIPTION, GLOBAL_APP_KEY, METRICS_PREFIX, PERIOD_ESCAPE, POLICY_PREFIX } from './consts.js';
import { COST_MAPS } from './costMaps/index.js';
import { SUB_POLICIES } from './subPolicies/index.js';
interface PolicyAddOns {
purchasedCredits: number
purchasedStorage: number
rateDiscounts: {
[usageType: string]: number | string // TODO DS: string to support graduated discounts eventually
}
}
interface UsageByType {
total: number
[serviceName: string]: number
}
interface MeteringAndBillingServiceDeps {
kvStore: DBKVStore,
superUserService: SUService,
alarmService: AlarmService
eventService: EventService
}
import { MeteringAndBillingServiceDeps, UsageAddons, UsageByType } from './types.js';
/**
* Handles usage metering and supports stubbs for billing methods for current scoped actor
*/
@@ -90,17 +72,11 @@ export class MeteringAndBillingService {
[`${usageType}.count`]: 1,
};
const lastUpdatedKey = `${METRICS_PREFIX}:actor:${actorId}:lastUpdated`;
const lastUpdatedPromise = this.#kvStore.set({
key: lastUpdatedKey,
value: Date.now(),
});
const actorUsageKey = `${METRICS_PREFIX}:actor:${actorId}:${currentMonth}`;
const actorUsagesPromise = this.#kvStore.incr({
key: actorUsageKey,
pathAndAmountMap,
});
}) as Promise<UsageByType>;
const puterConsumptionKey = `${METRICS_PREFIX}:puter:${currentMonth}`; // global consumption across all users and apps
this.#kvStore.incr({
@@ -137,7 +113,33 @@ export class MeteringAndBillingService {
console.warn('Failed to increment aux usage data \'actorAppTotalsKey\' with error: ', e);
});
return (await Promise.all([lastUpdatedPromise, actorUsagesPromise]))[1] as UsageByType;
const lastUpdatedKey = `${METRICS_PREFIX}:actor:${actorId}:lastUpdated`;
this.#kvStore.set({
key: lastUpdatedKey,
value: Date.now(),
}).catch((e: Error) => {
console.warn('Failed to set lastUpdatedKey with error: ', e);
});
// update addon usage if we are over the allowance
const actorSubscriptionPromise = this.getActorSubscription(actor);
const actorAddonsPromise = this.getActorAddons(actor);
const [actorUsages, actorSubscription, actorAddons] = (await Promise.all([actorUsagesPromise, actorSubscriptionPromise, actorAddonsPromise]));
if ( actorUsages.total > actorSubscription.monthUsageAllowance && actorAddons.purchasedCredits ) {
// if we are now over the allowance, start consuming purchased credits
const withinBoundsUsage = Math.max(0, actorSubscription.monthUsageAllowance - actorUsages.total + totalCost);
const overageUsage = totalCost - withinBoundsUsage;
if ( overageUsage > 0 ) {
await this.#kvStore.incr({
key: `${POLICY_PREFIX}:actor:${actorId}:addons`,
pathAndAmountMap: {
consumedPurchaseCredits: Math.max(overageUsage, actorAddons.purchasedCredits - actorAddons.consumedPurchaseCredits), // don't go over the purchased credits, technically a race condition here, but optimistically rare
},
});
}
}
return actorUsages;
});
} catch (e) {
console.error('Metering: Failed to increment usage for actor', actor, 'usageType', usageType, 'usageAmount', usageAmount, e);
@@ -225,14 +227,14 @@ export class MeteringAndBillingService {
async getAllowedUsage(actor: Actor) {
const userSubscriptionPromise = this.getActorSubscription(actor);
const userPolicyAddonsPromise = this.getActorPolicyAddons(actor);
const userAddonsPromise = this.getActorAddons(actor);
const currentUsagePromise = this.getActorCurrentMonthUsageDetails(actor);
const [userSubscription, userPolicyAddons, currentMonthUsage] = await Promise.all([userSubscriptionPromise, userPolicyAddonsPromise, currentUsagePromise]);
const [userSubscription, addons, currentMonthUsage] = await Promise.all([userSubscriptionPromise, userAddonsPromise, currentUsagePromise]);
return {
remaining: Math.max(0, (userSubscription.monthUsageAllowance || 0) + (userPolicyAddons?.purchasedCredits || 0) - (currentMonthUsage.usage.total || 0)),
remaining: Math.max(0, (userSubscription.monthUsageAllowance || 0) + (addons?.purchasedCredits || 0) - (currentMonthUsage.usage.total || 0) - (addons?.consumedPurchaseCredits || 0)),
monthUsageAllowance: userSubscription.monthUsageAllowance,
userPolicyAddons,
addons,
};
}
@@ -275,14 +277,14 @@ export class MeteringAndBillingService {
return availablePolicies.find(({ id }) => id === userSubscriptionId) || availablePolicies.find(({ id }) => id === defaultSubscriptionId)!;
}
async getActorPolicyAddons(actor: Actor) {
async getActorAddons(actor: Actor) {
if ( !actor.type?.user?.uuid ) {
throw new Error('Actor must be a user to get policy addons');
}
const key = `${POLICY_PREFIX}:actor:${actor.type.user?.uuid}:addons`;
return this.#superUserService.sudo(async () => {
const policyAddOns = await this.#kvStore.get({ key });
return (policyAddOns ?? {}) as PolicyAddOns;
const addons = await this.#kvStore.get({ key });
return (addons ?? {}) as UsageAddons;
});
}

View File

@@ -0,0 +1,24 @@
import type { AlarmService } from "../../modules/core/AlarmService";
import type { EventService } from "../EventService";
import type { DBKVStore } from "../repositories/DBKVStore/DBKVStore";
import type { SUService } from "../SUService";
export interface UsageAddons {
purchasedCredits: number // total extra credits purchased - not expirable
consumedPurchaseCredits: number // total credits consumed from purchased ones - these are flattened upon new 'purchase'
purchasedStorage: number // TODO DS: not implemented yet
rateDiscounts: {
[usageType: string]: number | string // TODO DS: string to support graduated discounts eventually
}
}
export interface UsageByType {
total: number
[serviceName: string]: number
}
export interface MeteringAndBillingServiceDeps {
kvStore: DBKVStore,
superUserService: SUService,
alarmService: AlarmService
eventService: EventService
}

View File

@@ -1,92 +1,91 @@
// TypeScript conversion of DBKVStore.mjs
import murmurhash from "murmurhash";
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
import APIError from '../../../api/APIError.js';
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
import { Context } from "../../../util/context.js";
import type { MeteringAndBillingService } from "../../MeteringService/MeteringService.js";
const GLOBAL_APP_KEY = 'global';
export class DBKVStore {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
#db: any;
#meteringService: any;
#global_config: any = {};
#meteringService: MeteringAndBillingService;
#global_config: Record<string, unknown> = {};
// TODO DS: make table name configurable
constructor({ sqlClient, meteringAndBillingService, globalConfig }: { sqlClient: any, meteringAndBillingService: any, globalConfig: any }) {
constructor({ sqlClient, meteringAndBillingService, globalConfig }: { sqlClient: unknown, meteringAndBillingService: MeteringAndBillingService, globalConfig: Record<string, unknown> }) {
this.#db = sqlClient;
this.#meteringService = meteringAndBillingService;
this.#global_config = globalConfig;
}
async get({ key }: { key: any }): Promise<any> {
async get({ key }: { key: string | string[] }) {
const actor = Context.get('actor');
const app = actor.type?.app ?? undefined;
const user = actor.type?.user ?? undefined;
if (!user) {
if ( !user ) {
throw new Error('User not found');
}
const deleteExpired = async (rows: any[]) => {
const deleteExpired = async (rows: { kkey_hash: string }[]) => {
const query = `DELETE FROM kv WHERE user_id=? AND app=? AND kkey_hash IN (${rows.map(() => '?').join(',')})`;
const params = [user.id, app?.uid ?? GLOBAL_APP_KEY, ...rows.map((r: any) => r.kkey_hash)];
const params = [user.id, app?.uid ?? GLOBAL_APP_KEY, ...rows.map((r) => r.kkey_hash)];
return await this.#db.write(query, params);
};
if (Array.isArray(key)) {
if ( Array.isArray(key) ) {
const keys = key;
const key_hashes = keys.map((key: any) => murmurhash.v3(key));
const key_hashes = keys.map((key: string) => murmurhash.v3(key));
const rows = app
? await this.#db.read('SELECT kkey, value, expireAt FROM kv WHERE user_id=? AND app=? AND kkey_hash IN (?)', [user.id, app.uid, key_hashes])
: await this.#db.read(
`SELECT kkey, value, expireAt FROM kv WHERE user_id=? AND (app IS NULL OR app = '${GLOBAL_APP_KEY}') AND kkey_hash IN (${key_hashes.map(() => '?').join(',')})`,
[user.id, key_hashes]
);
: await this.#db.read(`SELECT kkey, value, expireAt FROM kv WHERE user_id=? AND (app IS NULL OR app = '${GLOBAL_APP_KEY}') AND kkey_hash IN (${key_hashes.map(() => '?').join(',')})`,
[user.id, key_hashes]);
const kv: any = {};
rows.forEach((row: any) => {
const kvPairs: Record<string, unknown> = {};
rows.forEach((row: { kkey: string, value: string }) => {
row.value = this.#db.case({
mysql: () => row.value,
otherwise: () => JSON.parse(row.value ?? 'null'),
})();
kv[row.kkey] = row.value;
kvPairs[row.kkey] = row.value;
});
const expiredKeys: any[] = [];
rows.forEach((row: any) => {
if (row?.expireAt && row.expireAt < Date.now() / 1000) {
const expiredKeys: { kkey_hash: string }[] = [];
rows.forEach((row: { kkey: string, expireAt: number, kkey_hash: string, value: unknown }) => {
if ( row?.expireAt && row.expireAt < Date.now() / 1000 ) {
expiredKeys.push(row);
kv[row.kkey] = null;
kvPairs[row.kkey] = null;
} else {
kv[row.kkey] = row.value ?? null;
kvPairs[row.kkey] = row.value ?? null;
}
});
// clean up expired keys asynchronously
if (expiredKeys.length) {
if ( expiredKeys.length ) {
deleteExpired(expiredKeys);
}
return keys.map((key: any) => kv[key]);
return keys.map((key: string) => kvPairs[key]);
}
const key_hash = murmurhash.v3(key);
const kv = app
? await this.#db.read('SELECT * FROM kv WHERE user_id=? AND app=? AND kkey_hash=? LIMIT 1', [user.id, app.uid, key_hash])
: await this.#db.read(
`SELECT * FROM kv WHERE user_id=? AND (app IS NULL OR app = '${GLOBAL_APP_KEY}') AND kkey_hash=? LIMIT 1`,
[user.id, key_hash]
);
: await this.#db.read(`SELECT * FROM kv WHERE user_id=? AND (app IS NULL OR app = '${GLOBAL_APP_KEY}') AND kkey_hash=? LIMIT 1`,
[user.id, key_hash]);
if (kv[0]) {
if ( kv[0] ) {
kv[0].value = this.#db.case({
mysql: () => kv[0].value,
otherwise: () => JSON.parse(kv[0].value ?? 'null'),
})();
}
if (kv[0]?.expireAt && kv[0].expireAt < Date.now() / 1000) {
if ( kv[0]?.expireAt && kv[0].expireAt < Date.now() / 1000 ) {
// key has expired
// clean up asynchronously
deleteExpired([kv[0]]);
@@ -98,50 +97,47 @@ export class DBKVStore {
return kv[0]?.value ?? null;
}
async set({ key, value, expireAt }: { key: any, value: any, expireAt?: any }): Promise<boolean> {
async set({ key, value, expireAt }: { key: string, value: unknown, expireAt?: number }) {
const actor = Context.get('actor');
const config = this.#global_config;
key = String(key);
if (Buffer.byteLength(key, 'utf8') > config.kv_max_key_size) {
if ( Buffer.byteLength(key, 'utf8') > (config.kv_max_key_size as number) ) {
throw new Error(`key is too large. Max size is ${config.kv_max_key_size}.`);
}
value = value === undefined ? null : value;
if (
value !== null &&
Buffer.byteLength(JSON.stringify(value), 'utf8') > config.kv_max_value_size
Buffer.byteLength(JSON.stringify(value), 'utf8') > (config.kv_max_value_size as number)
) {
throw new Error(`value is too large. Max size is ${config.kv_max_value_size}.`);
}
const app = actor.type?.app ?? undefined;
const user = actor.type?.user ?? undefined;
if (!user) {
if ( !user ) {
throw new Error('User not found');
}
const key_hash = murmurhash.v3(key);
try {
await this.#db.write(
`INSERT INTO kv (user_id, app, kkey_hash, kkey, value, expireAt)
await this.#db.write(`INSERT INTO kv (user_id, app, kkey_hash, kkey, value, expireAt)
VALUES (?, ?, ?, ?, ?, ?) ${this.#db.case({
mysql: 'ON DUPLICATE KEY UPDATE value = ?',
sqlite: 'ON CONFLICT(user_id, app, kkey_hash) DO UPDATE SET value = excluded.value',
})
}`,
[
user.id,
app?.uid ?? GLOBAL_APP_KEY,
key_hash,
key,
JSON.stringify(value),
expireAt ?? null,
...this.#db.case({ mysql: [value], otherwise: [] }),
]
);
} catch (e: any) {
mysql: 'ON DUPLICATE KEY UPDATE value = ?',
sqlite: 'ON CONFLICT(user_id, app, kkey_hash) DO UPDATE SET value = excluded.value',
})
}`,
[
user.id,
app?.uid ?? GLOBAL_APP_KEY,
key_hash,
key,
JSON.stringify(value),
expireAt ?? null,
...this.#db.case({ mysql: [value], otherwise: [] }),
]);
} catch ( e: unknown ) {
console.error(e);
}
@@ -150,11 +146,11 @@ export class DBKVStore {
return true;
}
async del({ key }: { key: any }): Promise<boolean> {
async del({ key }: { key: string }) {
const actor = Context.get('actor');
const app = actor.type?.app ?? undefined;
const user = actor.type?.user ?? undefined;
if (!user) {
if ( !user ) {
throw new Error('User not found');
}
@@ -171,27 +167,25 @@ export class DBKVStore {
return true;
}
async list({ as }: { as?: any }): Promise<any> {
async list({ as }: { as?: string }) {
const actor = Context.get('actor');
const app = actor.type?.app ?? undefined;
const user = actor.type?.user ?? undefined;
if (!user) {
if ( !user ) {
throw new Error('User not found');
}
let rows = app
? await this.#db.read('SELECT kkey, value, expireAt FROM kv WHERE user_id=? AND app=?', [user.id, app.uid])
: await this.#db.read(
`SELECT kkey, value, expireAt FROM kv WHERE user_id=? AND (app IS NULL OR app = '${GLOBAL_APP_KEY}')`,
[user.id]
);
: await this.#db.read(`SELECT kkey, value, expireAt FROM kv WHERE user_id=? AND (app IS NULL OR app = '${GLOBAL_APP_KEY}')`,
[user.id]);
rows = rows.filter((row: any) => {
rows = rows.filter((row: { expireAt: number }) => {
return !row?.expireAt || row?.expireAt > Date.now() / 1000;
});
rows = rows.map((row: any) => ({
rows = rows.map((row: { kkey: string, value: string }) => ({
key: row.kkey,
value: this.#db.case({
mysql: () => row.value,
@@ -201,17 +195,17 @@ export class DBKVStore {
as = as || 'entries';
if (!['keys', 'values', 'entries'].includes(as)) {
if ( !['keys', 'values', 'entries'].includes(as) ) {
throw APIError.create('field_invalid', null, {
key: 'as',
expected: '"keys", "values", or "entries"',
});
}
if (as === 'keys') {
rows = rows.map((row: any) => row.key);
} else if (as === 'values') {
rows = rows.map((row: any) => row.value);
if ( as === 'keys' ) {
rows = rows.map((row: { key: string }) => row.key);
} else if ( as === 'values' ) {
rows = rows.map((row: { value: unknown }) => row.value);
}
await this.#meteringService.incrementUsage(actor, 'kv:read', rows.length);
@@ -219,11 +213,11 @@ export class DBKVStore {
return rows;
}
async flush(): Promise<boolean> {
async flush() {
const actor = Context.get('actor');
const app = actor.type?.app ?? undefined;
const user = actor.type?.user ?? undefined;
if (!user) {
if ( !user ) {
throw new Error('User not found');
}
@@ -237,8 +231,8 @@ export class DBKVStore {
return true;
}
async expireAt({ key, timestamp }: { key: any, timestamp: any }): Promise<any> {
if (key === '') {
async expireAt({ key, timestamp }: { key: string, timestamp: number }){
if ( key === '' ) {
throw APIError.create('field_empty', null, {
key: 'key',
});
@@ -249,8 +243,8 @@ export class DBKVStore {
return await this.#expireat(key, timestamp);
}
async expire({ key, ttl }: { key: any, ttl: any }): Promise<any> {
if (key === '') {
async expire({ key, ttl }: { key: string, ttl: number }) {
if ( key === '' ) {
throw APIError.create('field_empty', null, {
key: 'key',
});
@@ -264,86 +258,84 @@ export class DBKVStore {
return await this.#expireat(key, timestamp);
}
async incr({ key, pathAndAmountMap }: { key: string, pathAndAmountMap: Record<string, number> }): Promise<any> {
async incr<T extends Record<string, number>>({ key, pathAndAmountMap }: { key: string, pathAndAmountMap: T }): Promise<T extends { "": number } ? number : Record<string, number>> {
let currVal = await this.get({ key });
const pathEntries = Object.entries(pathAndAmountMap);
if (typeof currVal !== 'object' && pathEntries.length <= 1 && !pathEntries[0]?.[0]) {
if ( typeof currVal !== 'object' && pathEntries.length <= 1 && !pathEntries[0]?.[0] ) {
const amount = pathEntries[0]?.[1] ?? 1;
this.set({ key, value: (Number(currVal) || 0) + amount });
return (Number(currVal) || 0) + amount;
return ((Number(currVal) || 0) + amount) as T extends { "": number } ? number : Record<string, number>;
}
// TODO DS: support arrays this also needs dynamodb implementation
if (Array.isArray(currVal)) {
if ( Array.isArray(currVal) ) {
throw new Error('Current value is an array');
}
if (!currVal) {
if ( !currVal ) {
currVal = {};
}
if (typeof currVal !== 'object') {
if ( typeof currVal !== 'object' ) {
throw new Error('Current value is not an object');
}
// create or change values as needed
for (const [path, amount] of Object.entries(pathAndAmountMap)) {
for ( const [path, amount] of Object.entries(pathAndAmountMap) ) {
const pathParts = path.split('.');
let obj = currVal;
if (obj === null) continue;
for (let i = 0; i < pathParts.length - 1; i++) {
if ( obj === null ) continue;
for ( let i = 0; i < pathParts.length - 1; i++ ) {
const part = pathParts[i];
if (!obj[part]) {
if ( !obj[part] ) {
obj[part] = {};
}
if (typeof obj[part] !== 'object' || Array.isArray(currVal)) {
if ( typeof obj[part] !== 'object' || Array.isArray(currVal) ) {
throw new Error(`Path ${pathParts.slice(0, i + 1).join('.')} is not an object`);
}
obj = obj[part];
}
if (obj === null) continue;
if ( obj === null ) continue;
const lastPart = pathParts[pathParts.length - 1];
if (!obj[lastPart]) {
if ( !obj[lastPart] ) {
obj[lastPart] = 0;
}
if (typeof obj[lastPart] !== 'number') {
if ( typeof obj[lastPart] !== 'number' ) {
throw new Error(`Value at path ${path} is not a number`);
}
obj[lastPart] += amount;
}
this.set({ key, value: currVal });
return currVal;
return currVal as T extends { "": number } ? number : Record<string, number>;
}
async decr(...params: Parameters<typeof DBKVStore.prototype.incr>): Promise<any> {
async decr(...params: Parameters<typeof DBKVStore.prototype.incr>): ReturnType<typeof DBKVStore.prototype.incr> {
return await this.incr(...params);
}
async #expireat(key: any, timestamp: any): Promise<any> {
async #expireat(key: string, timestamp: number){
const actor = Context.get('actor');
const app = actor.type?.app ?? undefined;
const user = actor.type?.user ?? undefined;
if (!user) {
if ( !user ) {
throw new Error('User not found');
}
const key_hash = murmurhash.v3(key);
try {
await this.#db.write(
`INSERT INTO kv (user_id, app, kkey_hash, kkey, value, expireAt)
await this.#db.write(`INSERT INTO kv (user_id, app, kkey_hash, kkey, value, expireAt)
VALUES (?, ?, ?, ?, ?, ?) ${this.#db.case({
mysql: 'ON DUPLICATE KEY UPDATE expireAt = ?',
sqlite: 'ON CONFLICT(user_id, app, kkey_hash) DO UPDATE SET expireAt = excluded.expireAt',
})
}`,
[
user.id,
app?.uid ?? GLOBAL_APP_KEY,
key_hash,
key,
null, // empty value
timestamp,
...this.#db.case({ mysql: [timestamp], otherwise: [] }),
]
);
} catch (e: any) {
mysql: 'ON DUPLICATE KEY UPDATE expireAt = ?',
sqlite: 'ON CONFLICT(user_id, app, kkey_hash) DO UPDATE SET expireAt = excluded.expireAt',
})
}`,
[
user.id,
app?.uid ?? GLOBAL_APP_KEY,
key_hash,
key,
null, // empty value
timestamp,
...this.#db.case({ mysql: [timestamp], otherwise: [] }),
]);
} catch ( e: unknown ) {
console.error(e);
}
}