feat: route for global usage aggregate (#1810)

* feat: route for global usage aggregate

* feat: add globalUsageEndpoint to puter-js

* fix: globalUsage endpoint
This commit is contained in:
Daniel Salazar
2025-10-22 16:46:13 -07:00
committed by GitHub
parent c9ab01cf1e
commit 256189a7de
6 changed files with 123 additions and 18 deletions

View File

@@ -1,3 +1,7 @@
{
"unlimitedUsage": false
"unlimitedUsage": false,
"allowedGlobalUsageUsers": [
"06ab2f87-aef5-441b-9c60-debbb8d24dda",
"d8fd169b-4e93-484a-bd84-115b5a2f0ed4"
]
}

View File

@@ -35,4 +35,23 @@ extension.get('/metering/usage/:appId', { subdomain: 'api' }, async (req, res) =
return;
});
extension.get('/metering/globalUsage', { subdomain: 'api' }, async (req, res) => {
const meteringService = meteringServiceWrapper.meteringService;
const actor = req.actor;
if ( !actor ) {
throw Error('actor not found in context');
}
// check if actor is allowed to view global usage
const allowedUsers = extension.config.allowedGlobalUsageUsers || [];
if ( !allowedUsers.includes(actor.type?.user.uuid) ) {
res.status(403).json({ error: 'You are not authorized to view global usage' });
return;
}
const globalUsage = await meteringService.getGlobalUsage();
res.status(200).json(globalUsage);
return;
});
console.debug('Loaded /metering/usage route');

View File

@@ -7,12 +7,14 @@ import type { SUService } from '../SUService.js';
import { DEFAULT_FREE_SUBSCRIPTION, DEFAULT_TEMP_SUBSCRIPTION, GLOBAL_APP_KEY, METRICS_PREFIX, PERIOD_ESCAPE, POLICY_PREFIX } from './consts.js';
import { COST_MAPS } from './costMaps/index.js';
import { SUB_POLICIES } from './subPolicies/index.js';
import { MeteringServiceDeps, UsageAddons, UsageByType } from './types.js';
import { AppTotals, MeteringServiceDeps, UsageAddons, UsageByType, UsageRecord } from './types.js';
/**
* Handles usage metering and supports stubbs for billing methods for current scoped actor
*/
export class MeteringService {
static GLOBAL_SHARD_COUNT = 1000; // number of global usage shards to spread writes across
static APP_SHARD_COUNT = 100; // number of app usage shards to spread writes across
#kvStore: DBKVStore;
#superUserService: SUService;
#alarmService: AlarmService;
@@ -42,7 +44,7 @@ export class MeteringService {
* @returns
*/
#generateGloabalUsageKey(userId: string, appId: string, currentMonth: string) {
const hashOfUserAndApp = murmurhash.v3(`${userId}:${appId}`) % 1000;
const hashOfUserAndApp = murmurhash.v3(`${userId}:${appId}`) % MeteringService.GLOBAL_SHARD_COUNT;
const key = `${METRICS_PREFIX}:puter:${hashOfUserAndApp}:${currentMonth}`;
return key;
}
@@ -54,7 +56,7 @@ export class MeteringService {
* @returns
*/
#generateAppUsageKey(appId: string, currentMonth: string) {
const hashOfApp = murmurhash.v3(`${appId}`) % 100;
const hashOfApp = murmurhash.v3(`${appId}`) % MeteringService.APP_SHARD_COUNT;
const key = `${METRICS_PREFIX}:app:${appId}:${hashOfApp}:${currentMonth}`;
return key;
}
@@ -182,7 +184,7 @@ export class MeteringService {
}
return actorUsages;
});
} catch (e) {
} catch( e ) {
console.error('Metering: Failed to increment usage for actor', actor, 'usageType', usageType, 'usageAmount', usageAmount, e);
this.#alarmService.create('metering-service-error', (e as Error).message, {
error: e,
@@ -207,21 +209,21 @@ export class MeteringService {
];
return await this.#superUserService.sudo(async () => {
const [usage, appTotals] = await this.#kvStore.get({ key: keys }) as [UsageByType | null, Record<string, UsageByType> | null];
const [usage, appTotals] = await this.#kvStore.get({ key: keys }) as [UsageByType | null, Record<string, AppTotals> | null];
// only show details of app based on actor, aggregate all as others, except if app is global one or null, then show all
const appId = actor.type?.app?.uid;
if ( appTotals && appId ) {
const filteredAppTotals: Record<string, UsageByType> = {};
let othersTotal: UsageByType = {} as UsageByType;
const filteredAppTotals: Record<string, AppTotals> = {};
let othersTotal: AppTotals = {} as AppTotals;
Object.entries(appTotals).forEach(([appKey, appUsage]) => {
if ( appKey === appId ) {
filteredAppTotals[appKey] = appUsage;
} else {
Object.entries(appUsage).forEach(([usageKind, amount]) => {
if ( !othersTotal[usageKind] ) {
othersTotal[usageKind] = 0;
if ( !othersTotal[usageKind as keyof AppTotals] ) {
othersTotal[usageKind as keyof AppTotals] = 0;
}
othersTotal[usageKind] += amount;
othersTotal[usageKind as keyof AppTotals] += amount;
});
}
});
@@ -347,6 +349,37 @@ export class MeteringService {
});
}
async getGlobalUsage(){
// TODO DS: add validation here?
const currentMonth = this.#getMonthYearString();
const keyPrefix = `${METRICS_PREFIX}:puter:`;
return this.#superUserService.sudo(async () => {
const keys = [];
for ( let shard = 0; shard < MeteringService.GLOBAL_SHARD_COUNT; shard++ ) {
keys.push(`${keyPrefix}${shard}:${currentMonth}`);
}
keys.push(`${keyPrefix}${currentMonth}`); // for initial unsharded data
const usages = await this.#kvStore.get({ key: keys }) as UsageByType[];
const aggregatedUsage: UsageByType = { total: 0 };
usages.filter(Boolean).forEach(({ total, ...usage } = {} as UsageByType) => {
aggregatedUsage.total += total || 0;
Object.entries((usage || {}) as Record<string, UsageRecord>).forEach(([usageKind, record]) => {
if ( !aggregatedUsage[usageKind] ) {
aggregatedUsage[usageKind] = { cost: 0, units: 0, count: 0 } as UsageRecord;
}
const aggregatedRecord = aggregatedUsage[usageKind] as UsageRecord;
aggregatedRecord.cost += record.cost;
aggregatedRecord.count += record.count;
aggregatedRecord.units += record.units;
});
});
return aggregatedUsage;
});
}
// eslint-disable-next-line
async #updateAddonCredit(actor: Actor, tokenAmount: number) {
if ( !actor.type?.user?.uuid ) {

View File

@@ -11,11 +11,21 @@ export interface UsageAddons {
[usageType: string]: number | string // TODO DS: string to support graduated discounts eventually
}
}
export interface UsageByType {
total: number
[serviceName: string]: number
export interface RecursiveRecord<T> { [k: string]: T | RecursiveRecord<T> }
export interface UsageRecord {
cost: number,
count: number,
units: number
}
export type UsageByType = { [k:string]: number | UsageRecord } & { total: number };
export interface AppTotals {
total: number,
count: number
}
export interface MeteringServiceDeps {
kvStore: DBKVStore,
superUserService: SUService,

View File

@@ -6,6 +6,7 @@ import APIError from '../../../api/APIError.js';
// @ts-ignore
import { Context } from '../../../util/context.js';
import type { MeteringService } from '../../MeteringService/MeteringService.js';
import { RecursiveRecord } from '../../MeteringService/types.js';
const GLOBAL_APP_KEY = 'global';
@@ -69,7 +70,7 @@ export class DBKVStore {
deleteExpired(expiredKeys);
}
return keys.map((key: string) => kvPairs[key]);
return keys.map((key: string) => kvPairs[key]) as unknown[];
}
const key_hash = murmurhash.v3(key);
@@ -258,7 +259,7 @@ export class DBKVStore {
return await this.#expireat(key, timestamp);
}
async incr<T extends Record<string, number>>({ key, pathAndAmountMap }: { key: string, pathAndAmountMap: T }): Promise<T extends { '': number } ? number : Record<string, number>> {
async incr<T extends Record<string, number>>({ key, pathAndAmountMap }: { key: string, pathAndAmountMap: T }): Promise<T extends { '': number } ? number : RecursiveRecord<number>> {
if ( Object.values(pathAndAmountMap).find((v) => typeof v !== 'number') ) {
throw new Error('All values in pathAndAmountMap must be numbers');
}
@@ -267,7 +268,7 @@ export class DBKVStore {
if ( typeof currVal !== 'object' && pathEntries.length <= 1 && !pathEntries[0]?.[0] ) {
const amount = pathEntries[0]?.[1] ?? 1;
this.set({ key, value: (Number(currVal) || 0) + amount });
return ((Number(currVal) || 0) + amount) as T extends { '': number } ? number : Record<string, number>;
return ((Number(currVal) || 0) + amount) as T extends { '': number } ? number : RecursiveRecord<number>;
}
// TODO DS: support arrays this also needs dynamodb implementation
if ( Array.isArray(currVal) ) {
@@ -305,7 +306,7 @@ export class DBKVStore {
obj[lastPart] += amount;
}
this.set({ key, value: currVal });
return currVal as T extends { '': number } ? number : Record<string, number>;
return currVal as T extends { '': number } ? number : RecursiveRecord<number>;
}
async decr(...params: Parameters<typeof DBKVStore.prototype.incr>): ReturnType<typeof DBKVStore.prototype.incr> {

View File

@@ -251,6 +251,44 @@ class Auth{
throw error;
}
}
async getGlobalUsage() {
try {
const resp = await fetch(`${this.APIOrigin}/metering/globalUsage`, {
headers: {
Authorization: `Bearer ${this.authToken}`,
},
});
const result = await resp.json();
// Log the response
if ( globalThis.puter?.apiCallLogger?.isEnabled() ) {
globalThis.puter.apiCallLogger.logRequest({
service: 'auth',
operation: 'global_usage',
params: {},
result: result,
});
}
return result;
} catch( error ) {
// Log the error
if ( globalThis.puter?.apiCallLogger?.isEnabled() ) {
globalThis.puter.apiCallLogger.logRequest({
service: 'auth',
operation: 'global_usage',
params: {},
error: {
message: error.message || error.toString(),
stack: error.stack,
},
});
}
throw error;
}
}
}
export default Auth;