feat: dynamo in oss (#2195)

* feat: dynamo in oss

* fix: service name mismatch for dynamo client

* fix: dynalite boot

* fix: tests
This commit is contained in:
Daniel Salazar
2025-12-18 17:22:49 -08:00
committed by GitHub
parent 9aefed5515
commit a1689c4ea3
29 changed files with 2042 additions and 630 deletions

1
.gitignore vendored
View File

@@ -68,3 +68,4 @@ AGENTS.md
coverage/
*.log
undefined

View File

@@ -64,6 +64,7 @@ something like the following (updated 2025-02-26):
"engine": "sqlite",
"path": "puter-database.sqlite"
},
"dynamo" :{"path":"./puter-ddb"},
"thumbnails": {
"engine": "http"
},

7
extensions/api.d.ts vendored
View File

@@ -5,7 +5,7 @@ import type { Actor } from '@heyputer/backend/src/services/auth/Actor.js';
import type { BaseDatabaseAccessService } from '@heyputer/backend/src/services/database/BaseDatabaseAccessService.d.ts';
import type { MeteringService } from '@heyputer/backend/src/services/MeteringService/MeteringService.ts';
import type { MeteringServiceWrapper } from '@heyputer/backend/src/services/MeteringService/MeteringServiceWrapper.mjs';
import type { DBKVStore } from '@heyputer/backend/src/services/repositories/DBKVStore/DBKVStore.ts';
import { DynamoKVStore } from '@heyputer/backend/src/services/repositories/DynamoKVStore/DynamoKVStore.ts';
import type { SUService } from '@heyputer/backend/src/services/SUService.js';
import type { IUser } from '@heyputer/backend/src/services/User.js';
import type { UserService } from '@heyputer/backend/src/services/UserService.d.ts';
@@ -14,7 +14,6 @@ import type { RequestHandler } from 'express';
import type FSNodeContext from '../src/backend/src/filesystem/FSNodeContext.js';
import type helpers from '../src/backend/src/helpers.js';
import type * as ExtensionControllerExports from './ExtensionController/src/ExtensionController.ts';
declare global {
namespace Express {
interface Request {
@@ -80,7 +79,7 @@ type StripPrefix<TPrefix extends string, T extends string> = T extends `${TPrefi
// TODO DS: define this globally in core to use it there too
interface ServiceNameMap {
'meteringService': Pick<MeteringServiceWrapper, 'meteringService'> & MeteringService // TODO DS: squash into a single class without wrapper
'puter-kvstore': DBKVStore
'puter-kvstore': DynamoKVStore
'su': SUService
'database': BaseDatabaseAccessService
'user': UserService
@@ -97,7 +96,7 @@ interface Extension extends RouterMethods {
on(event: 'create.drivers', listener: (event: { createDriver: (interface: string, service: string, executors: any) => any }) => void),
on(event: 'create.permissions', listener: (event: { grant_to_everyone: (permission: string) => void, grant_to_users: (permission: string) => void }) => void)
on(event: 'create.interfaces', listener: (event: { createInterface: (interface: string, interfaces: DriverInterface) => void }) => void)
import(module: 'data'): { db: BaseDatabaseAccessService, kv: DBKVStore & { get: (string) => void, set: (string, string) => void }, cache: unknown }// TODO DS: type cache better
import(module: 'data'): { db: BaseDatabaseAccessService, kv: DynamoKVStore, cache: unknown } // TODO DS: type cache better
import(module: 'core'): CoreRuntimeModule,
import(module: 'fs'): FilesystemModule,
import(module: 'query'): typeof query,

1282
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -71,13 +71,16 @@
},
"dependencies": {
"@anthropic-ai/sdk": "^0.68.0",
"@aws-sdk/client-dynamodb": "^3.490.0",
"@aws-sdk/client-secrets-manager": "^3.879.0",
"@aws-sdk/client-sns": "^3.907.0",
"@aws-sdk/lib-dynamodb": "^3.490.0",
"@google/genai": "^1.19.0",
"@heyputer/putility": "^1.0.2",
"@paralleldrive/cuid2": "^2.2.2",
"@stylistic/eslint-plugin-js": "^4.4.1",
"dedent": "^1.5.3",
"dynalite": "^4.0.0",
"express-xml-bodyparser": "^0.4.1",
"ioredis": "^5.6.0",
"javascript-time-ago": "^2.5.11",

View File

@@ -152,6 +152,9 @@ const install = async ({ context, services, app, useapi, modapi }) => {
// side-effects from the events of other services.
// === Services which extend BaseService ===
const { DDBClientWrapper } = require('./services/repositories/DDBClientWrapper');
services.registerService('dynamo', DDBClientWrapper);
services.registerService('system-validation', SystemValidationService);
services.registerService('commands', CommandService);
services.registerService('__api-filesystem', FilesystemAPIService);
@@ -376,6 +379,9 @@ const install = async ({ context, services, app, useapi, modapi }) => {
const { MeteringServiceWrapper } = require('./services/MeteringService/MeteringServiceWrapper.mjs');
services.registerService('meteringService', MeteringServiceWrapper);
const { DynamoKVStoreWrapper } = require('./services/repositories/DynamoKVStore/DynamoKVStoreWrapper');
services.registerService('puter-kvstore', DynamoKVStoreWrapper);
const { PermissionShortcutService } = require('./services/auth/PermissionShortcutService');
services.registerService('permission-shortcut', PermissionShortcutService);

View File

@@ -560,7 +560,7 @@ class APIError {
*
* @static
* @param {number|string} status
* @param {Error} source
* @param {Error | null} source
* @param {string|Error|object} fields one of the following:
* - a string to use as the error message
* - an Error object to use as the source of the error

View File

@@ -31,6 +31,9 @@ module.exports = {
engine: 'sqlite',
path: 'puter-database.sqlite',
},
dynamo: {
path: './puter-ddb',
},
thumbnails: {
engine: 'purejs',
},

View File

@@ -0,0 +1,5 @@
const config = require('./config.js');
module.exports = {
config,
};

View File

@@ -18,6 +18,7 @@
*/
import { SpanStatusCode, trace } from '@opentelemetry/api';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-grpc';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc';
import { Resource } from '@opentelemetry/resources';
import { ConsoleMetricExporter, PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics';
@@ -26,7 +27,6 @@ import { ConsoleSpanExporter } from '@opentelemetry/sdk-trace-base';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
import config from '../../config.js';
import BaseService from '../../services/BaseService.js';
import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-grpc';
export class TelemetryService extends BaseService {
/** @type {import('@opentelemetry/api').Tracer} */

View File

@@ -38,9 +38,6 @@ class SelfHostedModule extends AdvancedBase {
const DevCreditService = require('./DevCreditService');
services.registerService('dev-credit', DevCreditService);
const { DBKVServiceWrapper } = require('../../services/repositories/DBKVStore/index.mjs');
services.registerService('puter-kvstore', DBKVServiceWrapper);
// TODO: sucks
const RELATIVE_PATH = '../../../../../';

View File

@@ -12,21 +12,23 @@ import { FeatureFlagService } from '../../services/FeatureFlagService.js';
import { GetUserService } from '../../services/GetUserService.js';
import { InformationService } from '../../services/information/InformationService.js';
import { MeteringServiceWrapper } from '../../services/MeteringService/MeteringServiceWrapper.mjs';
import { NotificationService } from '../../services/NotificationService.js';
import { RegistrantService } from '../../services/RegistrantService.js';
import { RegistryService } from '../../services/RegistryService.js';
import { DBKVServiceWrapper } from '../../services/repositories/DBKVStore/index.mjs';
import { ScriptService } from '../../services/ScriptService.js';
import { SessionService } from '../../services/SessionService.js';
import { SUService } from '../../services/SUService.js';
import { SystemValidationService } from '../../services/SystemValidationService.js';
import { TraceService } from '../../services/TraceService.js';
import { AlarmService } from '../core/AlarmService.js';
import APIErrorService from '../web/APIErrorService.js';
import { NotificationService } from '../../services/NotificationService';
import { RegistrantService } from '../../services/RegistrantService';
import { RegistryService } from '../../services/RegistryService';
import { DDBClientWrapper } from '../../services/repositories/DDBClientWrapper.js';
import { DynamoKVStoreWrapper } from '../../services/repositories/DynamoKVStore/DynamoKVStoreWrapper';
import { ScriptService } from '../../services/ScriptService';
import { SessionService } from '../../services/SessionService';
import { SUService } from '../../services/SUService';
import { SystemValidationService } from '../../services/SystemValidationService';
import { TraceService } from '../../services/TraceService';
import { AlarmService } from '../core/AlarmService';
import APIErrorService from '../web/APIErrorService';
export class TestCoreModule {
async install (context) {
const services = context.get('services');
services.registerService('dynamo', DDBClientWrapper);
services.registerService('whoami', DetailProviderService);
services.registerService('get-user', GetUserService);
services.registerService('database', SqliteDatabaseAccessService);
@@ -36,7 +38,7 @@ export class TestCoreModule {
services.registerService('event', EventService);
services.registerService('commands', CommandService);
services.registerService('meteringService', MeteringServiceWrapper);
services.registerService('puter-kvstore', DBKVServiceWrapper);
services.registerService('puter-kvstore', DynamoKVStoreWrapper);
services.registerService('permission', PermissionService);
services.registerService('group', GroupService);
services.registerService('anomaly', AnomalyService);

View File

@@ -1,5 +1,15 @@
import { SqliteDatabaseAccessService } from './database/SqliteDatabaseAccessService';
import type { MeteringService } from './MeteringService/MeteringService';
import { MeteringServiceWrapper } from './MeteringService/MeteringServiceWrapper.mjs';
import { DynamoKVStore } from './repositories/DynamoKVStore/DynamoKVStore';
export interface ServiceResources {
services: { get (name: string): any };
services: {
get (name: 'meteringService'): MeteringServiceWrapper;
get (name: 'puter-kvstore'): DynamoKVStore;
get (name: 'database'): SqliteDatabaseAccessService
get (name: string): any
};
config: Record<string, any> & { services?: Record<string, any>; server_id?: string };
name?: string;
args?: any;

View File

@@ -2,7 +2,7 @@ import { describe, expect, it, vi } from 'vitest';
import { createTestKernel } from '../../../tools/test.mjs';
import { Actor } from '../auth/Actor';
import type { EventService } from '../EventService.js';
import { DBKVServiceWrapper } from '../repositories/DBKVStore/index.mjs';
import { DynamoKVStoreWrapper } from '../repositories/DynamoKVStore/DynamoKVStoreWrapper.js';
import { GLOBAL_APP_KEY, PERIOD_ESCAPE } from './consts.js';
import { COST_MAPS } from './costMaps/index.js';
import { MeteringService } from './MeteringService';
@@ -12,7 +12,7 @@ describe('MeteringService', async () => {
const testKernel = await createTestKernel({
serviceMap: {
meteringService: MeteringServiceWrapper,
'puter-kvstore': DBKVServiceWrapper,
'puter-kvstore': DynamoKVStoreWrapper,
},
initLevelString: 'init',
testCore: true,
@@ -20,6 +20,9 @@ describe('MeteringService', async () => {
'database': {
path: ':memory:',
},
'dynamo': {
path: ':memory:',
},
},
});
@@ -132,10 +135,10 @@ describe('MeteringService', async () => {
it('getAllowedUsage respects subscription overrides and consumed usage', async () => {
const actor = makeActor('limited-user');
const customPolicy = { id: 'tiny', monthUsageAllowance: 10, monthlyStorageAllowance: 0 };
const detPolicies = eventService.on('metering:registerAvailablePolicies', (_key, data) => {
const detPolicies = eventService.on('metering:registerAvailablePolicies', (_key: string, data: Record<string, unknown[]>) => {
data.availablePolicies.push(customPolicy);
});
const detUserSub = eventService.on('metering:getUserSubscription', (_key, data) => {
const detUserSub = eventService.on('metering:getUserSubscription', (_key: string, data: Record<string, unknown>) => {
data.userSubscriptionId = customPolicy.id;
});
@@ -204,10 +207,13 @@ describe('MeteringService', async () => {
expect(res['aws-polly:standard:character']).toMatchObject({ cost: 12, units: 10, count: 1 });
});
it('applies the configured cost map rate for every usage type', async () => {
it('applies the configured cost map rate for random samples of usage types', async () => {
const usageAmount = 2;
for ( const [usageType, costPerUnit] of Object.entries(COST_MAPS) ) {
const entries = Object.entries(COST_MAPS);
for ( let i = 0; i < entries.length; i += Math.ceil(Math.random() * entries.length / 10) ) {
const [usageType, costPerUnit] = entries[i];
const actor = makeActor(`cost-map-user-${usageType.replace(/[^a-zA-Z0-9]/g, '-')}`);
const result = await testSubject.meteringService.incrementUsage(actor, usageType, usageAmount);
const escapedUsageType = usageType.replace(/\./g, PERIOD_ESCAPE);
@@ -219,5 +225,5 @@ describe('MeteringService', async () => {
count: 1,
});
}
}, 10000);
}, 30000);
});

View File

@@ -2,12 +2,12 @@ import murmurhash from 'murmurhash';
import type { AlarmService } from '../../modules/core/AlarmService.js';
import { SystemActorType, type Actor } from '../auth/Actor.js';
import type { EventService } from '../EventService';
import type { DBKVStore } from '../repositories/DBKVStore/DBKVStore';
import type { SUService } from '../SUService.js';
import { DEFAULT_FREE_SUBSCRIPTION, DEFAULT_TEMP_SUBSCRIPTION, GLOBAL_APP_KEY, METRICS_PREFIX, PERIOD_ESCAPE, POLICY_PREFIX } from './consts.js';
import { COST_MAPS } from './costMaps/index.js';
import { SUB_POLICIES } from './subPolicies/index.js';
import { AppTotals, MeteringServiceDeps, UsageAddons, UsageByType, UsageRecord } from './types.js';
import { DynamoKVStore } from '../repositories/DynamoKVStore/DynamoKVStore.js';
/**
* Handles usage metering and supports stubbs for billing methods for current scoped actor
*/
@@ -15,7 +15,7 @@ export class MeteringService {
static GLOBAL_SHARD_COUNT = 1000; // number of global usage shards to spread writes across
static APP_SHARD_COUNT = 100; // number of app usage shards to spread writes across
#kvStore: DBKVStore;
#kvStore: DynamoKVStore;
#superUserService: SUService;
#alarmService: AlarmService;
#eventService: EventService;
@@ -416,7 +416,7 @@ export class MeteringService {
if ( actorAppId && actorAppId !== appId && appId !== GLOBAL_APP_KEY ) {
throw new Error('Actor can only get usage details for their own app or global app');
}
return usage || { total: 0 };
return usage || { total: 0 } as UsageByType;
});
}

View File

@@ -1,6 +1,6 @@
import type { AlarmService } from '../../modules/core/AlarmService';
import type { EventService } from '../EventService';
import type { DBKVStore } from '../repositories/DBKVStore/DBKVStore';
import { DynamoKVStore } from '../repositories/DynamoKVStore/DynamoKVStore';
import type { SUService } from '../SUService';
export interface UsageAddons {
@@ -27,7 +27,7 @@ export interface AppTotals {
count: number
}
export interface MeteringServiceDeps {
kvStore: DBKVStore,
kvStore: DynamoKVStore,
superUserService: SUService,
alarmService: AlarmService
eventService: EventService

View File

@@ -12,6 +12,9 @@ describe('ClaudeProvider ', async () => {
'database': {
path: ':memory:',
},
'dynamo': {
path: ':memory:',
},
},
});

View File

@@ -1,29 +0,0 @@
import { BaseService } from '../BaseService';
export type DBMode = 'DB_WRITE' | 'DB_READ';
export interface IBaseDatabaseAccessService {
get(): this;
read(query: string, params?: any[]): Promise<any>;
tryHardRead(query: string, params?: any[]): Promise<any>;
requireRead(query: string, params?: any[]): Promise<any>;
pread(query: string, params?: any[]): Promise<any>;
write(query: string, params?: any[]): Promise<any>;
insert(table_name: string, data: Record<string, any>): Promise<any>;
batch_write(statements: string[]): any;
}
export class BaseDatabaseAccessService extends BaseService implements IBaseDatabaseAccessService {
static DB_WRITE: DBMode;
static DB_READ: DBMode;
case<T>(choices: Record<'mysql' | 'sqlite' | (string & {}), T>): T;
get (): this;
read (query: string, params?: any[]): Promise<any>;
tryHardRead (query: string, params?: any[]): Promise<any>;
requireRead (query: string, params?: any[]): Promise<any>;
pread (query: string, params?: any[]): Promise<any>;
write (query: string, params?: any[]): Promise<any>;
insert (table_name: string, data: Record<string, any>): Promise<any>;
batch_write (statements: string[]): any;
_gen_insert_sql (table_name: string, data: Record<string, any>): string;
}

View File

@@ -0,0 +1,2 @@
*.js
*.js.map

View File

@@ -1 +0,0 @@
*.js

View File

@@ -1,362 +0,0 @@
import murmurhash from 'murmurhash';
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
import APIError from '../../../api/APIError.js';
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
import { Context } from '../../../util/context.js';
import type { MeteringService } from '../../MeteringService/MeteringService.js';
import { RecursiveRecord } from '../../MeteringService/types.js';
const GLOBAL_APP_KEY = 'global';
export interface IDBKVStore {
get ({ key }: { key: string | string[] }): Promise<unknown | null | (unknown | null)[]>;
set ({ key, value, expireAt }: { key: string, value: unknown, expireAt?: number }): Promise<boolean>;
del ({ key }: { key: string }): Promise<boolean>;
list ({ as }: { as?: 'keys' | 'values' | 'entries' }): Promise<string[] | unknown[] | { key: string, value: unknown }[]>;
flush (): Promise<boolean>;
expireAt ({ key, timestamp }: { key: string, timestamp: number }): Promise<void>;
expire ({ key, ttl }: { key: string, ttl: number }): Promise<void>;
incr<T extends Record<string, number>>({ key, pathAndAmountMap }: { key: string, pathAndAmountMap: T }): Promise<T extends { '': number } ? number : RecursiveRecord<number>>;
decr<T extends Record<string, number>>({ key, pathAndAmountMap }: { key: string, pathAndAmountMap: T }): Promise<T extends { '': number } ? number : RecursiveRecord<number>>;
}
export class DBKVStore implements IDBKVStore {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
#db: any;
#meteringService: MeteringService;
#global_config: Record<string, unknown> = {};
// TODO DS: make table name configurable
constructor ({ sqlClient, meteringService, globalConfig }: { sqlClient: unknown, meteringService: MeteringService, globalConfig: Record<string, unknown> }) {
this.#db = sqlClient;
this.#meteringService = meteringService;
this.#global_config = globalConfig;
}
async get ({ key }: { key: string | string[] }) {
const actor = Context.get('actor');
const app = actor.type?.app ?? undefined;
const user = actor.type?.user ?? undefined;
if ( ! user ) {
throw new Error('User not found');
}
const deleteExpired = async (rows: { kkey_hash: string }[]) => {
const query = `DELETE FROM kv WHERE user_id=? AND app=? AND kkey_hash IN (${rows.map(() => '?').join(',')})`;
const params = [user.id, app?.uid ?? GLOBAL_APP_KEY, ...rows.map((r) => r.kkey_hash)];
return await this.#db.write(query, params);
};
if ( Array.isArray(key) ) {
const keys = key;
const key_hashes = keys.map((key: string) => murmurhash.v3(key));
const placeholders = key_hashes.map(() => '?').join(',');
const params = app
? [user.id, app.uid, ...key_hashes]
: [user.id, ...key_hashes];
const rows = app
? await this.#db.read(`SELECT kkey, value, expireAt FROM kv WHERE user_id=? AND app=? AND kkey_hash IN (${placeholders})`, params)
: await this.#db.read(`SELECT kkey, value, expireAt FROM kv WHERE user_id=? AND (app IS NULL OR app = '${GLOBAL_APP_KEY}') AND kkey_hash IN (${placeholders})`,
params);
const kvPairs: Record<string, unknown> = {};
rows.forEach((row: { kkey: string, value: string }) => {
row.value = this.#db.case({
mysql: () => row.value,
otherwise: () => JSON.parse(row.value ?? 'null'),
})();
kvPairs[row.kkey] = row.value;
});
const expiredKeys: { kkey_hash: string }[] = [];
rows.forEach((row: { kkey: string, expireAt: number, kkey_hash: string, value: unknown }) => {
if ( row?.expireAt && row.expireAt < Date.now() / 1000 ) {
expiredKeys.push(row);
kvPairs[row.kkey] = null;
} else {
kvPairs[row.kkey] = row.value ?? null;
}
});
// clean up expired keys asynchronously
if ( expiredKeys.length ) {
deleteExpired(expiredKeys);
}
return keys.map((key: string) => Object.prototype.hasOwnProperty.call(kvPairs, key) ? kvPairs[key] : null) as unknown[];
}
const key_hash = murmurhash.v3(key);
const kv = app
? await this.#db.read('SELECT * FROM kv WHERE user_id=? AND app=? AND kkey_hash=? LIMIT 1', [user.id, app.uid, key_hash])
: await this.#db.read(`SELECT * FROM kv WHERE user_id=? AND (app IS NULL OR app = '${GLOBAL_APP_KEY}') AND kkey_hash=? LIMIT 1`,
[user.id, key_hash]);
if ( kv[0] ) {
kv[0].value = this.#db.case({
mysql: () => kv[0].value,
otherwise: () => JSON.parse(kv[0].value ?? 'null'),
})();
}
if ( kv[0]?.expireAt && kv[0].expireAt < Date.now() / 1000 ) {
// key has expired
// clean up asynchronously
deleteExpired([kv[0]]);
return null;
}
await this.#meteringService.incrementUsage(actor, 'kv:read', Array.isArray(key) ? key.length : 1);
return kv[0]?.value ?? null;
}
async set ({ key, value, expireAt }: { key: string, value: unknown, expireAt?: number }) {
const actor = Context.get('actor');
const config = this.#global_config;
key = String(key);
if ( Buffer.byteLength(key, 'utf8') > (config.kv_max_key_size as number) ) {
throw new Error(`key is too large. Max size is ${config.kv_max_key_size}.`);
}
if (
value !== null &&
Buffer.byteLength(JSON.stringify(value), 'utf8') > (config.kv_max_value_size as number)
) {
throw new Error(`value is too large. Max size is ${config.kv_max_value_size}.`);
}
const app = actor.type?.app ?? undefined;
const user = actor.type?.user ?? undefined;
if ( ! user ) {
throw new Error('User not found');
}
const key_hash = murmurhash.v3(key);
try {
await this.#db.write(`INSERT INTO kv (user_id, app, kkey_hash, kkey, value, expireAt)
VALUES (?, ?, ?, ?, ?, ?) ${this.#db.case({
mysql: 'ON DUPLICATE KEY UPDATE value = ?',
sqlite: 'ON CONFLICT(user_id, app, kkey_hash) DO UPDATE SET value = excluded.value',
})
}`,
[
user.id,
app?.uid ?? GLOBAL_APP_KEY,
key_hash,
key,
JSON.stringify(value),
expireAt ?? undefined,
...this.#db.case({ mysql: [value], otherwise: [] }),
]);
} catch ( e: unknown ) {
console.error(e);
}
await this.#meteringService.incrementUsage(actor, 'kv:write', 1);
return true;
}
async del ({ key }: { key: string }) {
const actor = Context.get('actor');
const app = actor.type?.app ?? undefined;
const user = actor.type?.user ?? undefined;
if ( ! user ) {
throw new Error('User not found');
}
const key_hash = murmurhash.v3(key);
await this.#db.write('DELETE FROM kv WHERE user_id=? AND app=? AND kkey_hash=?', [
user.id,
app?.uid ?? GLOBAL_APP_KEY,
key_hash,
]);
await this.#meteringService.incrementUsage(actor, 'kv:write', 1);
return true;
}
async list ({ as }: { as?: string }) {
const actor = Context.get('actor');
const app = actor.type?.app ?? undefined;
const user = actor.type?.user ?? undefined;
if ( ! user ) {
throw new Error('User not found');
}
let rows = app
? await this.#db.read('SELECT kkey, value, expireAt FROM kv WHERE user_id=? AND app=?', [user.id, app.uid])
: await this.#db.read(`SELECT kkey, value, expireAt FROM kv WHERE user_id=? AND (app IS NULL OR app = '${GLOBAL_APP_KEY}')`,
[user.id]);
rows = rows.filter((row: { expireAt: number }) => {
return !row?.expireAt || row?.expireAt > Date.now() / 1000;
});
rows = rows.map((row: { kkey: string, value: string }) => ({
key: row.kkey,
value: this.#db.case({
mysql: () => row.value,
otherwise: () => JSON.parse(row.value ?? 'null'),
})(),
}));
as = as || 'entries';
if ( ! ['keys', 'values', 'entries'].includes(as) ) {
throw APIError.create('field_invalid', undefined, {
key: 'as',
expected: '"keys", "values", or "entries"',
});
}
if ( as === 'keys' ) {
rows = rows.map((row: { key: string }) => row.key);
} else if ( as === 'values' ) {
rows = rows.map((row: { value: unknown }) => row.value);
}
await this.#meteringService.incrementUsage(actor, 'kv:read', rows.length);
return rows;
}
async flush () {
const actor = Context.get('actor');
const app = actor.type?.app ?? undefined;
const user = actor.type?.user ?? undefined;
if ( ! user ) {
throw new Error('User not found');
}
await this.#db.write('DELETE FROM kv WHERE user_id=? AND app=?', [
user.id,
app?.uid ?? GLOBAL_APP_KEY,
]);
await this.#meteringService.incrementUsage(actor, 'kv:write', 1);
return true;
}
async expireAt ({ key, timestamp }: { key: string, timestamp: number }) {
if ( key === '' ) {
throw APIError.create('field_empty', undefined, {
key: 'key',
});
}
timestamp = Number(timestamp);
return await this.#expireat(key, timestamp);
}
async expire ({ key, ttl }: { key: string, ttl: number }) {
if ( key === '' ) {
throw APIError.create('field_empty', undefined, {
key: 'key',
});
}
ttl = Number(ttl);
// timestamp in seconds
let timestamp = Math.floor(Date.now() / 1000) + ttl;
return await this.#expireat(key, timestamp);
}
async incr<T extends Record<string, number>>({ key, pathAndAmountMap }: { key: string; pathAndAmountMap: T; }): Promise<T extends { '': number; } ? number : RecursiveRecord<number>> {
if ( Object.values(pathAndAmountMap).find((v) => typeof v !== 'number') ) {
throw new Error('All values in pathAndAmountMap must be numbers');
}
let currVal = await this.get({ key });
const pathEntries = Object.entries(pathAndAmountMap);
if ( typeof currVal !== 'object' && pathEntries.length <= 1 && !pathEntries[0]?.[0] ) {
const amount = pathEntries[0]?.[1] ?? 1;
this.set({ key, value: (Number(currVal) || 0) + amount });
return ((Number(currVal) || 0) + amount) as T extends { '': number } ? number : RecursiveRecord<number>;
}
// TODO DS: support arrays this also needs dynamodb implementation
if ( Array.isArray(currVal) ) {
throw new Error('Current value is an array');
}
if ( ! currVal ) {
currVal = {};
}
if ( typeof currVal !== 'object' ) {
throw new Error('Current value is not an object');
}
// create or change values as needed
for ( const [path, amount] of Object.entries(pathAndAmountMap) ) {
const pathParts = path.split('.');
let obj = currVal;
if ( obj === null ) continue;
for ( let i = 0; i < pathParts.length - 1; i++ ) {
const part = pathParts[i];
if ( ! obj[part] ) {
obj[part] = {};
}
if ( typeof obj[part] !== 'object' || Array.isArray(currVal) ) {
throw new Error(`Path ${pathParts.slice(0, i + 1).join('.')} is not an object`);
}
obj = obj[part];
}
if ( obj === null ) continue;
const lastPart = pathParts[pathParts.length - 1];
if ( ! obj[lastPart] ) {
obj[lastPart] = 0;
}
if ( typeof obj[lastPart] !== 'number' ) {
throw new Error(`Value at path ${path} is not a number`);
}
obj[lastPart] += amount;
}
this.set({ key, value: currVal });
return currVal as T extends { '': number } ? number : RecursiveRecord<number>;
}
async decr<T extends Record<string, number>>({ key, pathAndAmountMap }: { key: string; pathAndAmountMap: T; }): Promise<T extends { '': number; } ? number : RecursiveRecord<number>> {
return this.incr({ key, pathAndAmountMap: Object.fromEntries(Object.entries(pathAndAmountMap).map(([k, v]) => [k, -v])) as T });
}
async #expireat (key: string, timestamp: number) {
const actor = Context.get('actor');
const app = actor.type?.app ?? undefined;
const user = actor.type?.user ?? undefined;
if ( ! user ) {
throw new Error('User not found');
}
const key_hash = murmurhash.v3(key);
try {
await this.#db.write(`INSERT INTO kv (user_id, app, kkey_hash, kkey, value, expireAt)
VALUES (?, ?, ?, ?, ?, ?) ${this.#db.case({
mysql: 'ON DUPLICATE KEY UPDATE expireAt = ?',
sqlite: 'ON CONFLICT(user_id, app, kkey_hash) DO UPDATE SET expireAt = excluded.expireAt',
})
}`,
[
user.id,
app?.uid ?? GLOBAL_APP_KEY,
key_hash,
key,
undefined, // empty value
timestamp,
...this.#db.case({ mysql: [timestamp], otherwise: [] }),
]);
} catch ( e: unknown ) {
console.error(e);
}
}
}

View File

@@ -1,31 +0,0 @@
import BaseService from '../../BaseService.js';
import { DB_READ } from '../../database/consts.js';
import { DBKVStore } from './DBKVStore.js';
export class DBKVServiceWrapper extends BaseService {
/** @type {DBKVStore} */
kvStore = undefined;
_init () {
/** @type {DBKVStore} */
this.kvStore = new DBKVStore({
sqlClient: this.services.get('database').get(DB_READ, 'kvstore'),
meteringService: this.services.get('meteringService').meteringService,
globalConfig: this.global_config,
});
Object.getOwnPropertyNames(DBKVStore.prototype).forEach(fn => {
if ( fn === 'constructor' ) return;
this[fn] = (...args) => this.kvStore[fn](...args);
});
}
static IMPLEMENTS = {
['puter-kvstore']: Object.getOwnPropertyNames(DBKVStore.prototype)
.filter(n => n !== 'constructor')
.reduce((acc, fn) => ({
...acc,
[fn]: async function (...a) {
return await this.kvStore[fn](...a);
},
}), {}),
};
}

View File

@@ -0,0 +1,208 @@
import { CreateTableCommand, CreateTableCommandInput, DynamoDBClient, UpdateTimeToLiveCommand } from '@aws-sdk/client-dynamodb';
import { BatchGetCommand, BatchGetCommandInput, DeleteCommand, DynamoDBDocumentClient, GetCommand, PutCommand, QueryCommand, UpdateCommand } from '@aws-sdk/lib-dynamodb';
import { NodeHttpHandler } from '@smithy/node-http-handler';
import dynalite from 'dynalite';
import { once } from 'node:events';
import { Agent as httpsAgent } from 'node:https';
interface DBClientConfig {
aws?: {
access_key: string
secret_key: string
region: string
},
path?: string,
endpoint?: string
}
export class DDBClient {
ddbClient: Promise<DynamoDBClient>;
#documentClient!: DynamoDBDocumentClient;
config?: DBClientConfig;
constructor (config?: DBClientConfig) {
this.config = config;
this.ddbClient = this.#getClient();
this.ddbClient.then(client => {
this.#documentClient = DynamoDBDocumentClient.from(client, {
marshallOptions: {
removeUndefinedValues: true,
} });
});
}
async #getClient () {
if ( ! this.config?.aws ) {
console.warn('No config for DynamoDB, will fall back on local dynalite');
const dynaliteInstance = dynalite({ createTableMs: 0, path: this.config?.path === ':memory:' ? undefined : this.config?.path || './puter-ddb' });
const dynaliteServer = dynaliteInstance.listen(0, '127.0.0.1');
await once(dynaliteServer, 'listening');
const address = dynaliteServer.address();
const port = (typeof address === 'object' && address ? address.port : undefined) || 4567;
const dynamoEndpoint = `http://127.0.0.1:${port}`;
return new DynamoDBClient({
credentials: {
accessKeyId: 'fake',
secretAccessKey: 'fake',
},
maxAttempts: 3,
requestHandler: new NodeHttpHandler({
connectionTimeout: 5000,
requestTimeout: 5000,
httpsAgent: new httpsAgent({ keepAlive: true }),
}),
endpoint: dynamoEndpoint,
region: 'us-west-2',
});
}
return new DynamoDBClient({
credentials: {
accessKeyId: this.config.aws.access_key,
secretAccessKey: this.config.aws.secret_key,
},
maxAttempts: 3,
requestHandler: new NodeHttpHandler({
connectionTimeout: 5000,
requestTimeout: 5000,
httpsAgent: new httpsAgent({ keepAlive: true }),
}),
...(this.config.endpoint ? { endpoint: this.config.endpoint } : {}),
region: this.config.aws.region || 'us-west-2',
});
}
async get <T extends Record<string, unknown>>(table: string, key: T, consistentRead = false) {
const command = new GetCommand({
TableName: table,
Key: key,
ConsistentRead: consistentRead,
ReturnConsumedCapacity: 'TOTAL',
});
const response = await this.#documentClient.send(command);
return response;
}
async put <T extends Record<string, unknown>>(table: string, item: T) {
const command = new PutCommand({
TableName: table,
Item: item,
ReturnConsumedCapacity: 'TOTAL',
});
const response = await this.#documentClient.send(command);
return response;
}
async batchGet (params: { table: string, items: Record<string, unknown> }[], consistentRead = false) {
// TODO DS: implement chunking for more than 100 items or more than allowed req size
const allRequestItemsPerTable = params.reduce((acc, curr) => {
if ( ! acc[curr.table] ) acc[curr.table] = [];
acc[curr.table].push(curr.items);
return acc;
}, {} as Record<string, Record<string, unknown>[]>);
const RequestItems: BatchGetCommandInput['RequestItems'] = Object.entries(allRequestItemsPerTable).reduce((acc, [table, keyList]) => {
const Keys = keyList;
acc[table] = {
Keys,
ConsistentRead: consistentRead,
};
return acc;
},
{} as NonNullable<BatchGetCommandInput['RequestItems']>);
const command = new BatchGetCommand({
RequestItems,
ReturnConsumedCapacity: 'TOTAL',
});
return this.#documentClient.send(command);
}
async del<T extends Record<string, unknown>> (table: string, key: T) {
const command = new DeleteCommand({
TableName: table,
Key: key,
ReturnConsumedCapacity: 'TOTAL',
});
return this.#documentClient.send(command);
}
async query<T extends Record<string, unknown>> (table: string, keys: T, limit = 0, pageKey?: Record<string, unknown>, index = '', consistentRead = false) {
const keyExpression = Object.keys(keys).map(key => `#${key} = :${key}`).join(' AND ');
const expressionAttributeValues = Object.entries(keys).reduce((acc, [key, value]) => {
acc[`:${key}`] = value;
return acc;
}, {});
const expressionAttributeNames = Object.keys(keys).reduce((acc, key) => {
acc[`#${key}`] = key;
return acc;
}, {});
const command = new QueryCommand({
TableName: table,
...(!index ? {} : { IndexName: index }),
KeyConditionExpression: keyExpression,
ExpressionAttributeValues: expressionAttributeValues,
ExpressionAttributeNames: expressionAttributeNames,
ConsistentRead: consistentRead,
...(!pageKey ? {} : { ExclusiveStartKey: pageKey }),
...(!limit ? {} : { Limit: limit }),
ReturnConsumedCapacity: 'TOTAL',
});
return await this.#documentClient.send(command);
}
async update<T extends Record<string, unknown>> (table: string, key: T, expression: string, expressionValues: Record<string, unknown>, expressionNames: Record<string, string>) {
const command = new UpdateCommand({
TableName: table,
Key: key,
UpdateExpression: expression,
ExpressionAttributeValues: expressionValues,
ExpressionAttributeNames: expressionNames,
ReturnValues: 'ALL_NEW',
ReturnConsumedCapacity: 'TOTAL',
});
try {
return await this.#documentClient.send(command);
} catch ( e ) {
console.error('DDB Update Error', e);
throw e;
}
}
async createTableIfNotExists (params: CreateTableCommandInput, ttlAttribute?: string) {
if ( this.config?.aws ) {
console.warn('Creating DynamoDB tables in AWS is disabled by default, but if you need to enable it, modify the DDBClient class');
return;
}
try {
await this.#documentClient.send(new CreateTableCommand(params));
} catch ( e ) {
if ( (e as Error)?.name !== 'ResourceInUseException' ) {
throw e;
}
setTimeout(async () => {
if ( ttlAttribute ) {
// ensure TTL is set
await this.#documentClient.send(new UpdateTimeToLiveCommand({
TableName: params.TableName!,
TimeToLiveSpecification: {
AttributeName: ttlAttribute,
Enabled: true,
},
}));
}
}, 5000); // wait 5 seconds to ensure table is active
}
}
}

View File

@@ -0,0 +1,19 @@
import { BaseService } from '@heyputer/backend/src/services/BaseService.js';
import { DDBClient } from './DDBClient.js';
/** Wrapping actual implementation to be usable through our core structure */
class DDBClientServiceWrapper extends BaseService {
ddbClient!: DDBClient;
async _construct () {
this.ddbClient = new DDBClient(this.config as unknown as ConstructorParameters<typeof DDBClient>[0]);
await this.ddbClient.ddbClient; // ensure client is ready
Object.getOwnPropertyNames(DDBClient.prototype).forEach(fn => {
if ( fn === 'constructor' ) return;
this[fn] = (...args: unknown[]) => this.ddbClient[fn](...args);
});
}
}
export const DDBClientWrapper = DDBClientServiceWrapper as unknown as DDBClient;

View File

@@ -1,42 +1,54 @@
import { Actor } from '@heyputer/backend/src/services/auth/Actor.js';
import { SUService } from '@heyputer/backend/src/services/SUService';
import { createTestKernel } from '@heyputer/backend/tools/test.mjs';
import { describe, expect, it } from 'vitest';
import { createTestKernel } from '../../../../tools/test.mjs';
import * as config from '../../../config';
import { Actor } from '../../auth/Actor';
import { DBKVServiceWrapper } from './index.mjs';
import { SUService } from '../../SUService';
import { DynamoKVStore } from './DynamoKVStore.js';
import { DynamoKVStoreWrapper, IDynamoKVStoreWrapper } from './DynamoKVStoreWrapper.js';
import { config } from '../../../loadTestConfig.js';
describe('DBKVStore', async () => {
config.load_config({
'services': {
'database': {
path: ':memory:',
},
},
});
const testKernel = await createTestKernel({
serviceMap: {},
initLevelString: 'init',
testCore: true,
serviceConfigOverrideMap: {
'database': {
path: ':memory:',
},
},
});
const kvServiceWrapper = testKernel.services!.get('puter-kvstore') as DBKVServiceWrapper;
const kvStore = kvServiceWrapper.kvStore;
const su = testKernel.services!.get('su') as SUService;
describe('DynamoKVStore', async () => {
const TABLE_NAME = 'store-kv-v1';
const makeActor = (userId: number | string, appUid?: string) => ({
type: {
user: { id: userId, uuid: String(userId) },
...(appUid ? { app: { uid: appUid } } : {}),
},
}) as unknown as Actor;
}) as Actor;
const testKernel = await createTestKernel({
serviceMap: {
'puter-kvstore': DynamoKVStoreWrapper,
},
initLevelString: 'init',
testCore: true,
serviceConfigOverrideMap: {
'services': {
'puter-kvstore': { tableName: TABLE_NAME },
},
},
});
const testSubject = testKernel.services!.get('puter-kvstore') as IDynamoKVStoreWrapper;
const kvStore = testSubject.kvStore!;
const su = testKernel.services!.get('su') as SUService;
it('should be instantiated', () => {
expect(testSubject).toBeInstanceOf(DynamoKVStoreWrapper);
});
it('should contain a copy of the public methods of DynamoKVStore too', () => {
const meteringMethods = Object.getOwnPropertyNames(DynamoKVStore.prototype)
.filter((name) => name !== 'constructor');
const wrapperMethods = testSubject as unknown as Record<string, unknown>;
const missing = meteringMethods.filter((name) => typeof wrapperMethods[name] !== 'function');
expect(missing).toEqual([]);
});
it('should have DynamoKVStore instantiated', async () => {
expect(testSubject.kvStore).toBeInstanceOf(DynamoKVStore);
});
it('sets and retrieves values for the current actor context', async () => {
const actor = makeActor(1);
const key = 'greeting';
@@ -64,29 +76,6 @@ describe('DBKVStore', async () => {
expect(fromTwo).toBe('two');
});
it('retrieves single and multiple keys respecting app vs global scope', async () => {
const userId = 12;
const globalActor = makeActor(userId);
const appActor = makeActor(userId, 'scoped-app');
await su.sudo(globalActor, () => kvStore.set({ key: 'shared', value: 'global-shared' }));
await su.sudo(globalActor, () => kvStore.set({ key: 'global-only', value: 'global' }));
await su.sudo(appActor, () => kvStore.set({ key: 'shared', value: 'app-shared' }));
await su.sudo(appActor, () => kvStore.set({ key: 'app-only', value: 'app' }));
const globalSingle = await su.sudo(globalActor, () => kvStore.get({ key: 'shared' }));
const appSingle = await su.sudo(appActor, () => kvStore.get({ key: 'shared' }));
expect(globalSingle).toBe('global-shared');
expect(appSingle).toBe('app-shared');
const globalList = await su.sudo(globalActor, () => kvStore.get({ key: ['shared', 'app-only', 'global-only'] }));
const appList = await su.sudo(appActor, () => kvStore.get({ key: ['shared', 'app-only', 'global-only'] }));
expect(globalList).toEqual(['global-shared', null, 'global']);
expect(appList).toEqual(['app-shared', 'app', null]);
});
it('increments nested numeric paths and persists the aggregated totals', async () => {
const actor = makeActor(3);
const key = 'counter-key';
@@ -129,7 +118,9 @@ describe('DBKVStore', async () => {
it('deletes keys with del', async () => {
const actor = makeActor(5);
const key = 'delete-me';
await su.sudo(actor, () => kvStore.set({ key, value: 'bye' }));
await su.sudo(actor, () => {
return kvStore.set({ key, value: 'bye' });
});
const res = await su.sudo(actor, () => kvStore.del({ key }));
const value = await su.sudo(actor, () => kvStore.get({ key }));
@@ -208,10 +199,10 @@ describe('DBKVStore', async () => {
await expect(su.sudo(actor, () => kvStore.set({ key: oversizedKey, value: 'x' })))
.rejects
.toThrow(/key is too large/i);
.toThrow(/1024/i);
await expect(su.sudo(actor, () => kvStore.set({ key: 'ok', value: oversizedValue })))
.rejects
.toThrow(/value is too large/i);
.toThrow(/has exceeded the maximum allowed size/i);
});
});

View File

@@ -0,0 +1,453 @@
import { Actor, SystemActorType } from '@heyputer/backend/src/services/auth/Actor.js';
import type { BaseDatabaseAccessService } from '@heyputer/backend/src/services/database/BaseDatabaseAccessService.js';
import type { MeteringService } from '@heyputer/backend/src/services/MeteringService/MeteringService.js';
import { RecursiveRecord } from '@heyputer/backend/src/services/MeteringService/types.js';
import { Context } from '@heyputer/backend/src/util/context.js';
import murmurhash from 'murmurhash';
import { DDBClient } from '../DDBClient.js';
import { PUTER_KV_STORE_TABLE_DEFINITION } from './tableDefinition.js';
import APIError from '../../../api/APIError.js';
export class DynamoKVStore {
static GLOBAL_APP_KEY = 'os-global';
static LEGACY_GLOBAL_APP_KEY = 'global';
#ddbClient: DDBClient;
#sqlClient: BaseDatabaseAccessService;
#meteringService: MeteringService;
#tableName = 'store-kv-v1';
#pathCleanerRegex = /[:\-+/*]/g;
#enableMigrationFromSQL = false;
constructor ({ ddbClient, sqlClient, tableName, meteringService }: { ddbClient: DDBClient, sqlClient: BaseDatabaseAccessService, tableName: string, meteringService: MeteringService }) {
this.#ddbClient = ddbClient;
this.#sqlClient = sqlClient;
this.#tableName = tableName;
this.#meteringService = meteringService;
this.#enableMigrationFromSQL = !this.#ddbClient.config?.aws; // TODO: disable via config after some time passes
}
async createTableIfNotExists () {
if ( ! this.#enableMigrationFromSQL ) return;
await this.#ddbClient.createTableIfNotExists({ ...PUTER_KV_STORE_TABLE_DEFINITION, TableName: this.#tableName }, 'ttl');
}
#getNameSpace (actor: Actor) {
if ( actor.type instanceof SystemActorType ) {
return 'v1:system';
} else {
const app = actor.type?.app ?? undefined;
const user = actor.type?.user ?? undefined;
if ( ! user ) throw new Error('User not found');
return `v1:${app ? `${user.uuid}:${app.uid}`
: `${user.uuid}:${this.#enableMigrationFromSQL ? DynamoKVStore.LEGACY_GLOBAL_APP_KEY : DynamoKVStore.GLOBAL_APP_KEY}`}`;
}
}
async get ({ key }: { key: string | string[]; }): Promise<unknown | null | (unknown | null)[]> {
if ( key === '' ) {
throw APIError.create('field_empty', null, {
key: 'key',
});
}
const actor = Context.get('actor');
const app = actor.type?.app ?? undefined;
const user = actor.type?.user ?? undefined;
const namespace = this.#getNameSpace(actor);
const multi = Array.isArray(key);
const keys = multi ? key : [key];
const values: unknown[] = [];
let kvEntries;
let usage;
if ( multi ) {
const entriesAndUsage = (await this.#getBatches(namespace, keys));
kvEntries = entriesAndUsage.kvEntries;
usage = entriesAndUsage.usage;
} else {
const res = await this.#ddbClient.get(this.#tableName, { namespace, key });
kvEntries = res.Item ? [res.Item] : [];
usage = res.ConsumedCapacity?.CapacityUnits ?? 0;
}
this.#meteringService.incrementUsage(actor, 'kv:read', usage || 0);
for ( const key of keys ) {
const kv_entry = kvEntries?.find(e => e.key === key);
const time = Date.now() / 1000;
if ( kv_entry?.ttl && kv_entry.ttl <= (time) ) {
values.push(null);
continue;
}
if ( kv_entry?.value ) {
values.push(kv_entry.value);
continue;
}
if ( this.#enableMigrationFromSQL ) {
const key_hash = murmurhash.v3(key);
const kv_row = await this.#sqlClient.read('SELECT * FROM kv WHERE user_id=? AND app=? AND kkey_hash=? LIMIT 1',
[user.id, app?.uid ?? DynamoKVStore.LEGACY_GLOBAL_APP_KEY, key_hash]);
if ( kv_row[0]?.value ) {
// update and delete from this table
(async () => {
await this.set({ key: kv_row[0].key, value: kv_row[0].value });
await this.#sqlClient.write('DELETE FROM kv WHERE user_id=? AND app=? AND kkey_hash=?',
[user.id, app?.uid ?? DynamoKVStore.LEGACY_GLOBAL_APP_KEY, key_hash]);
})();
values.push(kv_row[0]?.value);
continue;
}
}
values.push(kv_entry?.value ?? null);
}
return multi ? values : values[0];
}
/**
*
* @param {string} namespace
* @param {string[]} allKeys
* @returns
*/
async #getBatches (namespace: string, allKeys: string[]) {
const batches: string[][] = [];
for ( let i = 0; i < allKeys.length; i += 100 ) {
batches.push(allKeys.slice(i, i + 100));
}
const batchPromises = batches.map(async (keys) => {
const requests = [...new Set(keys)].map(k => ({ table: this.#tableName, items: { namespace, key: k } }));
const res = await this.#ddbClient.batchGet(requests);
const kvEntries = res.Responses?.[this.#tableName];
const usage = res.ConsumedCapacity?.reduce((acc, curr) => acc + (curr.CapacityUnits ?? 0), 0);
return { kvEntries, usage };
});
const batchGets = await Promise.all(batchPromises);
return batchGets.reduce((acc, curr) => {
acc.kvEntries!.push(...curr?.kvEntries ?? []);
acc.usage! += curr.usage || 0;
return acc;
}, { kvEntries: [], usage: 0 });
}
async set ({ key, value, expireAt }: { key: string; value: unknown; expireAt?: number; }): Promise<boolean> {
const context = Context.get();
const actor = context.get('actor');
if ( key === '' ) {
throw APIError.create('field_empty', undefined, {
key: 'key',
});
}
key = String(key);
if ( Buffer.byteLength(key, 'utf8') > 1024 ) {
throw new Error(`key is too large. Max size is ${1024}.`);
}
if ( this.#enableMigrationFromSQL ) {
this.get({ key });
}
const namespace = this.#getNameSpace(actor);
const res = await this.#ddbClient.put(this.#tableName, {
namespace,
key,
value,
ttl: expireAt,
});
this.#meteringService.incrementUsage(actor, 'kv:write', res?.ConsumedCapacity?.CapacityUnits ?? 1);
return true;
}
async del ({ key }: { key: string; }): Promise<boolean> {
const actor = Context.get('actor');
const app = actor.type?.app ?? undefined;
const user = actor.type?.user ?? undefined;
if ( ! user ) throw new Error('User not found');
const namespace = this.#getNameSpace(actor);
const res = await this.#ddbClient.del(this.#tableName, {
namespace,
key,
});
this.#meteringService.incrementUsage(actor, 'kv:write', res?.ConsumedCapacity?.CapacityUnits ?? 1);
if ( this.#enableMigrationFromSQL ) {
const key_hash = murmurhash.v3(key);
await this.#sqlClient.write('DELETE FROM kv WHERE user_id=? AND app=? AND kkey_hash=?',
[user.id, app?.uid ?? DynamoKVStore.LEGACY_GLOBAL_APP_KEY, key_hash]);
}
return true;
}
async list ({ as }: { as?: 'keys' | 'values' | 'entries'; }): Promise<string[] | unknown[] | { key: string; value: unknown; }[]> {
const actor = Context.get('actor');
const app = actor.type?.app ?? undefined;
const user = actor.type?.user ?? undefined;
if ( ! user ) throw new Error('User not found');
const namespace = this.#getNameSpace(actor);
const entriesRes = await this.#ddbClient.query(this.#tableName,
{ namespace });
this.#meteringService.incrementUsage(actor, 'kv:read', entriesRes.ConsumedCapacity?.CapacityUnits ?? 1);
let entries = entriesRes.Items ?? [];
entries = entries?.filter(entry => {
if ( ! entry ) {
return false;
}
if ( entry.ttl && entry.ttl <= (Date.now() / 1000) ) {
return false;
}
return true;
});
if ( this.#enableMigrationFromSQL ) {
const oldEntries = await this.#sqlClient.read('SELECT * FROM kv WHERE user_id=? AND app=?',
[user.id, app?.uid ?? DynamoKVStore.LEGACY_GLOBAL_APP_KEY]);
oldEntries.forEach(oldEntry => {
if ( ! entries.find(e => e.key === oldEntry.kkey) ) {
if ( oldEntry.ttl && oldEntry.ttl <= (Date.now() / 1000) ) {
entries.push({ key: oldEntry.kkey, value: oldEntry.value });
}
}
});
}
entries = entries?.map(entry => ({
key: entry.key,
value: entry.value,
}));
as = as || 'entries';
if ( ! ['keys', 'values', 'entries'].includes(as) ) {
throw APIError.create('field_invalid', undefined, {
key: 'as',
expected: '"keys", "values", or "entries"',
});
}
if ( as === 'keys' ) entries = entries.map(entry => entry.key);
else if ( as === 'values' ) entries = entries.map(entry => entry.value);
return entries;
}
async flush () {
const actor = Context.get('actor');
const app = actor.type.app ?? undefined;
const user = actor.type?.user ?? undefined;
if ( ! user ) throw new Error('User not found');
const namespace = this.#getNameSpace(actor);
// Query all keys
const entriesRes = await this.#ddbClient.query(this.#tableName,
{ namespace });
const entries = entriesRes.Items ?? [];
const readUsage = entriesRes?.ConsumedCapacity?.CapacityUnits ?? 0;
// meter usage
this.#meteringService.incrementUsage(actor, 'kv:read', readUsage);
// TODO DS: implement batch delete so its faster and less demanding on server
const allRes = (await Promise.all(entries.map(entry => {
try {
return this.#ddbClient.del(this.#tableName, {
namespace,
key: entry.key,
});
} catch ( e ) {
console.error('Error deleting key', entry.key, e);
}
}))).filter(Boolean);
const writeUsage = allRes.reduce((acc, curr) => acc + (curr?.ConsumedCapacity?.CapacityUnits ?? 0), 0);
// meter usage
this.#meteringService.incrementUsage(actor, 'kv:write', writeUsage);
if ( this.#enableMigrationFromSQL ) {
await this.#sqlClient.write('DELETE FROM kv WHERE user_id=? AND app=?',
[user.id, app?.uid ?? DynamoKVStore.LEGACY_GLOBAL_APP_KEY]);
}
return !!allRes;
}
async expireAt ({ key, timestamp }: { key: string; timestamp: number; }): Promise<void> {
if ( key === '' ) {
throw APIError.create('field_empty', null, {
key: 'key',
});
}
timestamp = Number(timestamp);
return await this.#expireAt(key, timestamp);
}
async expire ({ key, ttl }: { key: string; ttl: number; }): Promise<void> {
if ( key === '' ) {
throw APIError.create('field_empty', null, {
key: 'key',
});
}
ttl = Number(ttl);
// timestamp in seconds
let timestamp = Math.floor(Date.now() / 1000) + ttl;
return await this.#expireAt(key, timestamp);
}
async #createPaths ( namespace: string, key: string, pathList: string[]) {
// Collect all intermediate map paths for all entries
const allIntermediatePaths = new Set<string>();
pathList.forEach((valPath) => {
const chunks = ['value', ...valPath.split('.')].filter(Boolean);
// For each intermediate map (excluding the leaf)
for ( let i = 1; i < chunks.length; i++ ) {
const subPath = chunks.slice(0, i).join('.');
allIntermediatePaths.add(subPath);
}
});
// TODO DS: make it so that the top layers are checked first to avoid creating each layer multiple times
let writeUnits = 0;
// Ensure each intermediate map layer exists by issuing a separate DynamoDB update for each
for ( const layerPath of allIntermediatePaths ) {
// Build attribute names for the layer
const chunks = layerPath.split('.');
const attrName = chunks.map((chunk) => `#${chunk}`.replaceAll(this.#pathCleanerRegex, '')).join('.');
const expressionNames: Record<string, string> = {};
chunks.forEach((chunk) => {
const cleanedChunk = chunk.split(/\[\d*\]/g)[0];
expressionNames[`#${cleanedChunk}`.replaceAll(this.#pathCleanerRegex, '')] = cleanedChunk;
});
// Issue update to set layer to {} if not exists
const layerUpsertRes = await this.#ddbClient.update(this.#tableName,
{ key, namespace },
`SET ${attrName} = if_not_exists(${attrName}, :emptyMap)`,
{ ':emptyMap': {} },
expressionNames);
writeUnits += layerUpsertRes.ConsumedCapacity?.CapacityUnits ?? 0;
}
return writeUnits;
}
// Ideally the paths support syntax like "a.b[2].c"
async incr<T extends Record<string, number>>({ key, pathAndAmountMap }: { key: string; pathAndAmountMap: T; }): Promise<T extends { '': number; } ? number : RecursiveRecord<number>> {
if ( Object.values(pathAndAmountMap).find((v) => typeof v !== 'number') ) {
throw new Error('All values in pathAndAmountMap must be numbers');
}
if ( key === '' ) {
throw APIError.create('field_empty', null, {
key: 'key',
});
}
if ( ! pathAndAmountMap ) {
throw new Error('invalid use of #incr: no pathAndAmountMap');
}
const actor = Context.get('actor');
const user = actor.type?.user ?? undefined;
if ( ! user ) throw new Error('User not found');
const namespace = this.#getNameSpace(actor);
if ( this.#enableMigrationFromSQL ) {
// trigger get to move element if exists
await this.get({ key });
}
const cleanerRegex = /[:\-+/*]/g;
let writeUnits = await this.#createPaths(namespace, key, Object.keys(pathAndAmountMap));
const setStatements = Object.entries(pathAndAmountMap).map(([valPath, _amt], idx) => {
const path = ['value', ...valPath.split('.')].filter(Boolean).join('.');
const attrName = path.split('.').map((chunk) => `#${chunk}`.replaceAll(cleanerRegex, '')).join('.');
return `${attrName} = if_not_exists(${attrName}, :start${idx}) + :incr${idx}`;
});
const valueAttributeValues = Object.entries(pathAndAmountMap).reduce((acc, [_path, amt], idx) => {
acc[`:incr${idx}`] = amt;
acc[`:start${idx}`] = 0;
return acc;
}, {} as Record<string, number>);
const valueAttributeNames = Object.entries(pathAndAmountMap).reduce((acc, [valPath, _amt]) => {
const path = ['value', ...valPath.split('.')].filter(Boolean).join('.');
path.split('.').forEach((chunk) => {
const cleanedChunk = chunk.split(/\[\d*\]/g)[0];
acc[`#${cleanedChunk}`.replaceAll(cleanerRegex, '')] = cleanedChunk;
});
return acc;
}, {} as Record<string, string>);
const res = await this.#ddbClient.update(this.#tableName,
{ key, namespace },
`SET ${[...setStatements].join(', ')}`,
valueAttributeValues,
{ ...valueAttributeNames, '#value': 'value' });
writeUnits += res.ConsumedCapacity?.CapacityUnits ?? 0;
this.#meteringService.incrementUsage(actor, 'kv:write', writeUnits);
return res.Attributes?.value;
}
async decr<T extends Record<string, number>>({ key, pathAndAmountMap }: { key: string; pathAndAmountMap: T; }) {
return await this.incr({ key, pathAndAmountMap: Object.fromEntries(Object.entries(pathAndAmountMap).map(([k, v]) => [k, -v])) as T });
}
async #expireAt (key: string, timestamp: number) {
const actor = Context.get('actor');
const user = actor.type?.user ?? undefined;
if ( ! user ) throw new Error('User not found');
const namespace = this.#getNameSpace(actor);
// if possibly migrating from old SQL store, get entry first to move to dynamo
if ( this.#enableMigrationFromSQL ) {
await this.get({ key });
}
const res = await this.#ddbClient.update(this.#tableName,
{ key, namespace },
'SET #ttl = :ttl, #value = if_not_exists(#value, :defaultValue)',
{ ':ttl': timestamp, ':defaultValue': null },
{ '#ttl': 'ttl', '#value': 'value' });
// meter usage
this.#meteringService.incrementUsage(actor, 'kv:write', res?.ConsumedCapacity?.CapacityUnits ?? 1);
}
}

View File

@@ -0,0 +1,38 @@
import { BaseService } from '@heyputer/backend/src/services/BaseService.js';
import { DynamoKVStore } from './DynamoKVStore.js';
/**
* Wrapping implemenation for traits registration and use in our core structure
*/
class DynamoKVStoreServiceWrapper extends BaseService {
kvStore!: DynamoKVStore;
async _init () {
this.kvStore = new DynamoKVStore({
ddbClient: this.services.get('dynamo'),
sqlClient: this.services.get('database').get(),
meteringService: this.services.get('meteringService').meteringService,
tableName: this.config.tableName || 'store-kv-v1',
});
await this.kvStore.createTableIfNotExists();
Object.getOwnPropertyNames(DynamoKVStore.prototype).forEach(fn => {
if ( fn === 'constructor' ) return;
this[fn] = (...args: unknown[]) => this.kvStore[fn](...args);
});
}
static IMPLEMENTS = {
['puter-kvstore']: Object.getOwnPropertyNames(DynamoKVStore.prototype)
.filter(n => n !== 'constructor')
.reduce((acc, fn) => ({
...acc,
[fn]: async function (...a) {
return await (this as DynamoKVStoreServiceWrapper).kvStore[fn](...a);
},
}), {}),
};
}
export type IDynamoKVStoreWrapper = DynamoKVStoreServiceWrapper;
export const DynamoKVStoreWrapper = DynamoKVStoreServiceWrapper as unknown as DynamoKVStore;

View File

@@ -0,0 +1,25 @@
import { CreateTableCommandInput } from '@aws-sdk/client-dynamodb';
export const PUTER_KV_STORE_TABLE_DEFINITION: CreateTableCommandInput = {
TableName: 'store-kv-v1',
BillingMode: 'PAY_PER_REQUEST',
AttributeDefinitions: [
{ AttributeName: 'namespace', AttributeType: 'S' },
{ AttributeName: 'key', AttributeType: 'S' },
{ AttributeName: 'lsi1', AttributeType: 'S' },
],
KeySchema: [
{ AttributeName: 'namespace', KeyType: 'HASH' },
{ AttributeName: 'key', KeyType: 'RANGE' },
],
LocalSecondaryIndexes: [
{
IndexName: 'lsi1-index',
KeySchema: [
{ AttributeName: 'namespace', KeyType: 'HASH' },
{ AttributeName: 'lsi1', KeyType: 'RANGE' },
],
Projection: { ProjectionType: 'ALL' },
},
],
};

View File

@@ -27,6 +27,7 @@ import { HTTPThumbnailService } from '../src/services/thumbnails/HTTPThumbnailSe
import { consoleLogManager } from '../src/util/consolelog.js';
import { Context } from '../src/util/context.js';
import { TestCoreModule } from '../src/modules/test-core/TestCoreModule.js';
import { config } from '../src/loadTestConfig.js';
const { BaseService, EssentialModules } = why;
/**
@@ -291,6 +292,16 @@ export const createTestKernel = async ({
const initLevelMap = { CONSTRUCT: 1, INIT: 2 };
const initLevel = initLevelMap[(`${initLevelString}`).toUpperCase()];
config.load_config({
'services': {
database: {
path: ':memory:',
},
dynamo: {
path: ':memory:',
},
},
});
const testKernel = new TestKernel();
testKernel.add_module(new Core2Module());
if ( testCore ) testKernel.add_module(new TestCoreModule());
@@ -308,8 +319,8 @@ export const createTestKernel = async ({
for ( const name of service_names ) {
const serviceConfigOverride = serviceConfigOverrideMap[name] ;
const globalConfigOverride = globalConfigOverrideMap[name] ;
const serviceConfigOverride = serviceConfigOverrideMap[name];
const globalConfigOverride = globalConfigOverrideMap[name];
if ( serviceConfigOverride ) {
const ins = testKernel.services.instances_[name];