From 437467e0cedfa4f985ab42c67490eee5dc42329a Mon Sep 17 00:00:00 2001 From: KernelDeimos Date: Tue, 4 Feb 2025 13:10:14 -0500 Subject: [PATCH] dev: integrate stream normalization for Claude --- .../src/modules/puterai/AIChatService.js | 22 ++- .../src/modules/puterai/ClaudeService.js | 133 ++++++------------ 2 files changed, 65 insertions(+), 90 deletions(-) diff --git a/src/backend/src/modules/puterai/AIChatService.js b/src/backend/src/modules/puterai/AIChatService.js index 0541c65e1..677ea6ae7 100644 --- a/src/backend/src/modules/puterai/AIChatService.js +++ b/src/backend/src/modules/puterai/AIChatService.js @@ -18,6 +18,7 @@ */ // METADATA // {"ai-commented":{"service":"claude"}} +const { PassThrough } = require("stream"); const APIError = require("../../api/APIError"); const config = require("../../config"); const { PermissionUtil } = require("../../services/auth/PermissionService"); @@ -29,6 +30,7 @@ const { Context } = require("../../util/context"); const { AsModeration } = require("./lib/AsModeration"); const FunctionCalling = require("./lib/FunctionCalling"); const Messages = require("./lib/Messages"); +const Streaming = require("./lib/Streaming"); // Maximum number of fallback attempts when a model fails, including the first attempt const MAX_FALLBACKS = 3 + 1; // includes first attempt @@ -380,7 +382,7 @@ class AIChatService extends BaseService { if ( intended_service === this.service_name ) { throw new Error('Calling ai-chat directly is not yet supported'); } - + const svc_driver = this.services.get('driver'); let ret, error; let service_used = intended_service; @@ -526,6 +528,24 @@ class AIChatService extends BaseService { usage, }); })(); + + if ( ret.result.value.init_chat_stream ) { + const stream = new PassThrough(); + const retval = new TypedValue({ + $: 'stream', + content_type: 'application/x-ndjson', + chunked: true, + }, stream); + + const chatStream = new Streaming.AIChatStream({ + stream, + }); + + ret.result.value.init_chat_stream({ chatStream }); + + return retval; + } + return ret.result.value.response; } else { await svc_event.emit('ai.prompt.report-usage', { diff --git a/src/backend/src/modules/puterai/ClaudeService.js b/src/backend/src/modules/puterai/ClaudeService.js index 0ee08d750..473f9125a 100644 --- a/src/backend/src/modules/puterai/ClaudeService.js +++ b/src/backend/src/modules/puterai/ClaudeService.js @@ -125,13 +125,7 @@ class ClaudeService extends BaseService { if ( stream ) { let usage_promise = new TeePromise(); - const stream = new PassThrough(); - const retval = new TypedValue({ - $: 'stream', - content_type: 'application/x-ndjson', - chunked: true, - }, stream); - (async () => { + const init_chat_stream = async ({ chatStream }) => { const completion = await this.anthropic.messages.stream({ model: model ?? this.get_default_model(), max_tokens: (model === 'claude-3-5-sonnet-20241022' || model === 'claude-3-5-sonnet-20240620') ? 8192 : 4096, @@ -142,76 +136,7 @@ class ClaudeService extends BaseService { }); const counts = { input_tokens: 0, output_tokens: 0 }; - let content_block; // for when it's buffered ("tool use") - let buffer = ''; - - let state; - const STATES = { - ready: { - on_event: (event) => { - if ( event.type === 'content_block_start' ) { - if ( event.content_block.type === 'text' ) { - state = STATES.message; - } else if ( event.content_block.type === 'tool_use' ) { - state = STATES.tool_use; - content_block = event.content_block; - buffer = ''; - } - } - } - }, - message: { - on_event: (event) => { - if ( event.type === 'content_block_stop' ) { - state = STATES.ready; - } - if ( - event.type !== 'content_block_delta' || - event.delta.type !== 'text_delta' - ) return; - const str = JSON.stringify({ - type: 'text', - text: event.delta.text, - }); - stream.write(str + '\n'); - } - }, - tool_use: { - on_event: (event) => { - if ( event.type === 'content_block_stop' ) { - state = STATES.ready; - - // Yeah... claude will send an empty string instead of - // an empty object when there's no input. So we have to - // check for that. Good job, Anthropic. - if ( buffer === '' ) { - buffer = '{}'; - } - const str = JSON.stringify({ - ...content_block, - - // deprecated {}.tool_use (because it was like that before) - tool_use: { - ...content_block, - input: JSON.parse(buffer), - }, - }); - stream.write(str + '\n'); - buffer = ''; - return; - } - - if ( - event.type !== 'content_block_delta' || - event.delta.type !== 'input_json_delta' - ) return; - - buffer += event.delta.partial_json; - } - } - }; - state = STATES.ready; - + let message, contentBlock; for await ( const event of completion ) { // console.log('EVENT', event); const input_tokens = @@ -222,24 +147,54 @@ class ClaudeService extends BaseService { if ( input_tokens ) counts.input_tokens += input_tokens; if ( output_tokens ) counts.output_tokens += output_tokens; - state.on_event(event); + if ( event.type === 'message_start' ) { + message = chatStream.message(); + continue; + } + if ( event.type === 'message_stop' ) { + message.end(); + message = null; + continue; + } - if ( - event.type !== 'content_block_delta' || - event.delta.type !== 'text_delta' - ) continue; - const str = JSON.stringify({ - type: 'text', - text: event.delta.text, - }); + if ( event.type === 'content_block_start' ) { + if ( event.content_block.type === 'tool_use' ) { + contentBlock = message.contentBlock({ + type: event.content_block.type, + id: event.content_block.id, + name: event.content_block.name, + }); + continue; + } + contentBlock = message.contentBlock({ + type: event.content_block.type, + }); + continue; + } + + if ( event.type === 'content_block_stop' ) { + contentBlock.end(); + contentBlock = null; + continue; + } + + if ( event.type === 'content_block_delta' ) { + if ( event.delta.type === 'input_json_delta' ) { + contentBlock.addPartialJSON(event.delta.partial_json); + continue; + } + if ( event.delta.type === 'text_delta' ) { + contentBlock.addText(event.delta.text); + continue; + } + } } - stream.end(); usage_promise.resolve(counts); - })(); + }; return new TypedValue({ $: 'ai-chat-intermediate' }, { + init_chat_stream, stream: true, - response: retval, usage_promise: usage_promise, }); }