fix: TogetherAI metering

Daniel Salazar
2025-12-03 13:30:01 -08:00
parent 2db3331729
commit 2b3eb76d90
3 changed files with 117 additions and 135 deletions

View File

@@ -39,11 +39,11 @@ class ClaudeService extends BaseService {
     // Traits definitions
     static IMPLEMENTS = {
         ['puter-chat-completion']: {
-            async models () {
-                return this.models();
+            async models (...args) {
+                return this.models(...args);
             },
-            async list () {
-                return this.list();
+            async list (...args) {
+                return this.list(...args);
             },
             async complete (...args) {
                 return this.complete(...args);

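All three services get the same fix here: the trait wrappers used to call the underlying methods with no arguments, so anything the trait caller passed in was silently dropped; spreading `...args` forwards it unchanged. A minimal standalone sketch of the delegation pattern (the service name, `options` parameter, and model list are hypothetical, for illustration only):

class ExampleService {
    static IMPLEMENTS = {
        ['puter-chat-completion']: {
            // Forward every argument through to the implementation;
            // the zero-arg form `return this.models();` discarded them.
            async models (...args) {
                return this.models(...args);
            },
        },
    };

    async models (options = {}) {
        const all = [{ id: 'model-a' }, { id: 'model-b' }];
        // An argument such as a filter now actually reaches this method.
        return options.filter ? all.filter(options.filter) : all;
    }
}

// The wrapper is invoked with `this` bound to the service instance,
// so the hypothetical `filter` option arrives intact:
const svc = new ExampleService();
ExampleService.IMPLEMENTS['puter-chat-completion'].models
    .call(svc, { filter: m => m.id === 'model-a' })
    .then(console.log); // [ { id: 'model-a' } ]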
View File

@@ -6,7 +6,6 @@ import OpenAIUtil from '../lib/OpenAIUtil.js';
 import { Context } from '../../../util/context.js';
 import { models } from './models.mjs';
 export class GeminiService extends BaseService {
     /**
      * @type {import('../../services/MeteringService/MeteringService').MeteringService}
@@ -17,14 +16,14 @@ export class GeminiService extends BaseService {
     static IMPLEMENTS = {
         ['puter-chat-completion']: {
-            async models () {
-                return await this.models();
+            async models (...args) {
+                return await this.models(...args);
             },
             async complete (...args) {
                 return await this.complete(...args);
             },
-            async list () {
-                return await this.list();
+            async list (...args) {
+                return await this.list(...args);
             },
         },
     };
@@ -89,7 +88,7 @@ export class GeminiService extends BaseService {
             console.error('Gemini completion error: ', e);
             throw e;
         }
         const modelDetails = (await this.models()).find(m => m.id === model);
         return OpenAIUtil.handle_completion_output({
             usage_calculator: ({ usage }) => {

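The `modelDetails` lookup above feeds the usage calculator, so cost is derived from the resolved model's own pricing instead of a hard-coded rate. The arithmetic mirrors the cost-override pattern in the TogetherAI file below; a minimal sketch, assuming the `cost.input` / `cost.output` fields are per-token rates as this commit uses them (the helper name is hypothetical):

// Token counts in, cost overrides out; unknown models fall back to zero cost.
function costOverridesFor(usage, modelDetails) {
    return {
        prompt_tokens: usage.prompt_tokens * (modelDetails?.cost?.input ?? 0),
        completion_tokens: usage.completion_tokens * (modelDetails?.cost?.output ?? 0),
    };
}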
View File

@@ -25,6 +25,8 @@ const { nou } = require('../../util/langutil');
 const { Together } = require('together-ai');
 const OpenAIUtil = require('./lib/OpenAIUtil');
 const { Context } = require('../../util/context');
+const uuidv4 = require('uuid').v4;
+const kv = globalThis.kv;
 
 /**
  * TogetherAIService class provides integration with Together AI's language models.
@@ -34,26 +36,32 @@ const { Context } = require('../../util/context');
  * @extends BaseService
  */
 class TogetherAIService extends BaseService {
-    /**
-     * @type {import('../../services/MeteringService/MeteringService').MeteringService}
-     */
+    /** @type {import('../../services/MeteringService/MeteringService').MeteringService} */
     meteringService;
-    static MODULES = {
-        kv: globalThis.kv,
-        uuidv4: require('uuid').v4,
-    };
+    /** @type {import('together-ai').Together} */
+    together;
+
+    // trait definitions
+    static IMPLEMENTS = {
+        'puter-chat-completion': {
+            async models (...args) {
+                return await this.models(...args);
+            },
+            async list () {
+                return await this.list();
+            },
+            async complete (...args) {
+                return await this.complete(...args);
+            },
+        },
+    };
 
     /**
      * Initializes the TogetherAI service by setting up the API client and registering as a chat provider
      * @async
      * @returns {Promise<void>}
      * @private
      */
     async _init () {
         this.together = new Together({
             apiKey: this.config.apiKey,
         });
-        this.kvkey = this.modules.uuidv4();
+        this.kvkey = uuidv4();
 
         const svc_aiChat = this.services.get('ai-chat');
         svc_aiChat.register_provider({
@@ -71,120 +79,12 @@ class TogetherAIService extends BaseService {
         return 'meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo';
     }
 
-    static IMPLEMENTS = {
-        ['puter-chat-completion']: {
-            /**
-             * Returns a list of available models and their details.
-             * See AIChatService for more information.
-             *
-             * @returns Promise<Array<Object>> Array of model details
-             */
-            async models () {
-                return await this.models_();
-            },
-            /**
-             * Returns a list of available model names including their aliases
-             * @returns {Promise<string[]>} Array of model identifiers and their aliases
-             * @description Retrieves all available model IDs and their aliases,
-             * flattening them into a single array of strings that can be used for model selection
-             */
-            async list () {
-                let models = this.modules.kv.get(`${this.kvkey}:models`);
-                if ( ! models ) models = await this.models_();
-                return models.map(model => model.id);
-            },
-            /**
-             * AI Chat completion method.
-             * See AIChatService for more details.
-             */
-            async complete ({ messages, stream, model }) {
-                if ( model === 'model-fallback-test-1' ) {
-                    throw new Error('Model Fallback Test 1');
-                }
-                /** @type {import('together-ai/streaming.mjs').Stream<import("together-ai/resources/chat/completions.mjs").ChatCompletionChunk>} */
-                const completion = await this.together.chat.completions.create({
-                    model: model ?? this.get_default_model(),
-                    messages: messages,
-                    stream,
-                });
-
-                // Metering integration
-                const actor = Context.get('actor');
-                const modelDetails = (await this.models_()).find(m => m.id === model || m.aliases?.include(model));
-                const modelId = modelDetails ?? this.get_default_model();
-
-                if ( stream ) {
-                    const stream = new PassThrough();
-                    const retval = new TypedValue({
-                        $: 'stream',
-                        content_type: 'application/x-ndjson',
-                        chunked: true,
-                    }, stream);
-                    (async () => {
-                        for await ( const chunk of completion ) {
-                            // DRY: same as openai
-                            if ( chunk.usage ) {
-                                // Metering: record usage for streamed chunks
-                                const trackedUsage = OpenAIUtil.extractMeteredUsage(chunk.usage);
-                                const costOverrides = {
-                                    prompt_tokens: trackedUsage.prompt_tokens * (modelDetails?.cost?.input ?? 0),
-                                    completion_tokens: trackedUsage.completion_tokens * (modelDetails?.cost?.output ?? 0),
-                                };
-                                this.meteringService.utilRecordUsageObject(trackedUsage, actor, modelId, costOverrides);
-                            }
-                            if ( chunk.choices.length < 1 ) continue;
-                            if ( chunk.choices[0].finish_reason ) {
-                                stream.end();
-                                break;
-                            }
-                            if ( nou(chunk.choices[0].delta.content) ) continue;
-                            const str = JSON.stringify({
-                                text: chunk.choices[0].delta.content,
-                            });
-                            stream.write(`${str }\n`);
-                        }
-                        stream.end();
-                    })();
-                    return {
-                        stream: true,
-                        response: retval,
-                    };
-                }
-
-                const ret = completion.choices[0];
-                ret.usage = {
-                    input_tokens: completion.usage.prompt_tokens,
-                    output_tokens: completion.usage.completion_tokens,
-                };
-                const trackedUsage = OpenAIUtil.extractMeteredUsage(completion.usage);
-                const costOverrides = {
-                    prompt_tokens: trackedUsage.prompt_tokens * (modelDetails?.cost?.input ?? 0),
-                    completion_tokens: trackedUsage.completion_tokens * (modelDetails?.cost?.output ?? 0),
-                };
-                // Metering: record usage for non-streamed completion
-                this.meteringService.utilRecordUsageObject(completion.usage, actor, modelId, costOverrides);
-                return ret;
-            },
-        },
-    };
-
     /**
      * Fetches and caches available AI models from Together API
      * @private
      * @returns Array of model objects containing id, name, context length,
      * description and pricing information
      * @remarks Models are cached for 5 minutes in KV store
      */
-    async models_ () {
-        let models = this.modules.kv.get(`${this.kvkey}:models`);
+    async models () {
+        let models = kv.get(`${this.kvkey}:models`);
         if ( models ) return models;
         const api_models = await this.together.models.list();
         models = [];
@@ -214,9 +114,92 @@ class TogetherAIService extends BaseService {
                 output: 10,
             },
         });
-        this.modules.kv.set(`${this.kvkey}:models`, models, { EX: 5 * 60 });
+        kv.set(`${this.kvkey}:models`, models, { EX: 5 * 60 });
         return models;
     }
+
+    async list () {
+        let models = kv.get(`${this.kvkey}:models`);
+        if ( ! models ) models = await this.models();
+        return models.map(model => model.id);
+    }
+
+    async complete ({ messages, stream, model }) {
+        if ( model === 'model-fallback-test-1' ) {
+            throw new Error('Model Fallback Test 1');
+        }
+        /** @type {import('together-ai/streaming.mjs').Stream<import("together-ai/resources/chat/completions.mjs").ChatCompletionChunk>} */
+        const completion = await this.together.chat.completions.create({
+            model: model ?? this.get_default_model(),
+            messages: messages,
+            stream,
+        });
+
+        // Metering integration
+        const actor = Context.get('actor');
+        const modelDetails = (await this.models()).find(m => m.id === model || m.aliases?.includes(model));
+        const modelId = modelDetails?.id ?? this.get_default_model();
+
+        if ( stream ) {
+            const stream = new PassThrough();
+            const retval = new TypedValue({
+                $: 'stream',
+                content_type: 'application/x-ndjson',
+                chunked: true,
+            }, stream);
+            (async () => {
+                for await ( const chunk of completion ) {
+                    // DRY: same as openai
+                    if ( chunk.usage ) {
+                        // Metering: record usage for streamed chunks
+                        const trackedUsage = OpenAIUtil.extractMeteredUsage(chunk.usage);
+                        const costOverrides = {
+                            prompt_tokens: trackedUsage.prompt_tokens * (modelDetails?.cost?.input ?? 0),
+                            completion_tokens: trackedUsage.completion_tokens * (modelDetails?.cost?.output ?? 0),
+                        };
+                        this.meteringService.utilRecordUsageObject(trackedUsage, actor, modelId, costOverrides);
+                    }
+                    if ( chunk.choices.length < 1 ) continue;
+                    if ( chunk.choices[0].finish_reason ) {
+                        stream.end();
+                        break;
+                    }
+                    if ( nou(chunk.choices[0].delta.content) ) continue;
+                    const str = JSON.stringify({
+                        text: chunk.choices[0].delta.content,
+                    });
+                    stream.write(`${str}\n`);
+                }
+                stream.end();
+            })();
+            return {
+                stream: true,
+                response: retval,
+            };
+        }
+
+        const ret = completion.choices[0];
+        ret.usage = {
+            input_tokens: completion.usage.prompt_tokens,
+            output_tokens: completion.usage.completion_tokens,
+        };
+        const trackedUsage = OpenAIUtil.extractMeteredUsage(completion.usage);
+        const costOverrides = {
+            prompt_tokens: trackedUsage.prompt_tokens * (modelDetails?.cost?.input ?? 0),
+            completion_tokens: trackedUsage.completion_tokens * (modelDetails?.cost?.output ?? 0),
+        };
+        // Metering: record usage for non-streamed completion
+        this.meteringService.utilRecordUsageObject(trackedUsage, actor, modelId, costOverrides);
+        return ret;
+    }
 }
 
 module.exports = {
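The rewritten `models()` / `list()` pair relies on a read-through KV cache with a five-minute TTL: `list()` serves ids from the cache and falls back to `models()`, which repopulates it. A minimal standalone sketch of that contract, assuming `kv` exposes a synchronous `get` and a Redis-style `EX` (expiry in seconds) option on `set`, as the diff uses (the function names here are hypothetical):

// Read-through cache: return cached models, else fetch and cache for 5 minutes.
async function cachedModels(kv, key, fetchModels) {
    const cached = kv.get(key);            // synchronous read, as in the diff
    if (cached) return cached;
    const models = await fetchModels();    // e.g. the Together models endpoint
    kv.set(key, models, { EX: 5 * 60 });   // expire after five minutes
    return models;
}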