dev: integrate stream normalization for Claude

This commit is contained in:
KernelDeimos
2025-02-04 13:10:14 -05:00
parent 0ce8fdc9f6
commit 437467e0ce
2 changed files with 65 additions and 90 deletions
@@ -18,6 +18,7 @@
*/
// METADATA // {"ai-commented":{"service":"claude"}}
const { PassThrough } = require("stream");
const APIError = require("../../api/APIError");
const config = require("../../config");
const { PermissionUtil } = require("../../services/auth/PermissionService");
@@ -29,6 +30,7 @@ const { Context } = require("../../util/context");
const { AsModeration } = require("./lib/AsModeration");
const FunctionCalling = require("./lib/FunctionCalling");
const Messages = require("./lib/Messages");
const Streaming = require("./lib/Streaming");
// Maximum number of fallback attempts when a model fails, including the first attempt
const MAX_FALLBACKS = 3 + 1; // includes first attempt
@@ -380,7 +382,7 @@ class AIChatService extends BaseService {
if ( intended_service === this.service_name ) {
throw new Error('Calling ai-chat directly is not yet supported');
}
const svc_driver = this.services.get('driver');
let ret, error;
let service_used = intended_service;
@@ -526,6 +528,24 @@ class AIChatService extends BaseService {
usage,
});
})();
if ( ret.result.value.init_chat_stream ) {
const stream = new PassThrough();
const retval = new TypedValue({
$: 'stream',
content_type: 'application/x-ndjson',
chunked: true,
}, stream);
const chatStream = new Streaming.AIChatStream({
stream,
});
ret.result.value.init_chat_stream({ chatStream });
return retval;
}
return ret.result.value.response;
} else {
await svc_event.emit('ai.prompt.report-usage', {
@@ -125,13 +125,7 @@ class ClaudeService extends BaseService {
if ( stream ) {
let usage_promise = new TeePromise();
const stream = new PassThrough();
const retval = new TypedValue({
$: 'stream',
content_type: 'application/x-ndjson',
chunked: true,
}, stream);
(async () => {
const init_chat_stream = async ({ chatStream }) => {
const completion = await this.anthropic.messages.stream({
model: model ?? this.get_default_model(),
max_tokens: (model === 'claude-3-5-sonnet-20241022' || model === 'claude-3-5-sonnet-20240620') ? 8192 : 4096,
@@ -142,76 +136,7 @@ class ClaudeService extends BaseService {
});
const counts = { input_tokens: 0, output_tokens: 0 };
let content_block; // for when it's buffered ("tool use")
let buffer = '';
let state;
const STATES = {
ready: {
on_event: (event) => {
if ( event.type === 'content_block_start' ) {
if ( event.content_block.type === 'text' ) {
state = STATES.message;
} else if ( event.content_block.type === 'tool_use' ) {
state = STATES.tool_use;
content_block = event.content_block;
buffer = '';
}
}
}
},
message: {
on_event: (event) => {
if ( event.type === 'content_block_stop' ) {
state = STATES.ready;
}
if (
event.type !== 'content_block_delta' ||
event.delta.type !== 'text_delta'
) return;
const str = JSON.stringify({
type: 'text',
text: event.delta.text,
});
stream.write(str + '\n');
}
},
tool_use: {
on_event: (event) => {
if ( event.type === 'content_block_stop' ) {
state = STATES.ready;
// Yeah... claude will send an empty string instead of
// an empty object when there's no input. So we have to
// check for that. Good job, Anthropic.
if ( buffer === '' ) {
buffer = '{}';
}
const str = JSON.stringify({
...content_block,
// deprecated {}.tool_use (because it was like that before)
tool_use: {
...content_block,
input: JSON.parse(buffer),
},
});
stream.write(str + '\n');
buffer = '';
return;
}
if (
event.type !== 'content_block_delta' ||
event.delta.type !== 'input_json_delta'
) return;
buffer += event.delta.partial_json;
}
}
};
state = STATES.ready;
let message, contentBlock;
for await ( const event of completion ) {
// console.log('EVENT', event);
const input_tokens =
@@ -222,24 +147,54 @@ class ClaudeService extends BaseService {
if ( input_tokens ) counts.input_tokens += input_tokens;
if ( output_tokens ) counts.output_tokens += output_tokens;
state.on_event(event);
if ( event.type === 'message_start' ) {
message = chatStream.message();
continue;
}
if ( event.type === 'message_stop' ) {
message.end();
message = null;
continue;
}
if (
event.type !== 'content_block_delta' ||
event.delta.type !== 'text_delta'
) continue;
const str = JSON.stringify({
type: 'text',
text: event.delta.text,
});
if ( event.type === 'content_block_start' ) {
if ( event.content_block.type === 'tool_use' ) {
contentBlock = message.contentBlock({
type: event.content_block.type,
id: event.content_block.id,
name: event.content_block.name,
});
continue;
}
contentBlock = message.contentBlock({
type: event.content_block.type,
});
continue;
}
if ( event.type === 'content_block_stop' ) {
contentBlock.end();
contentBlock = null;
continue;
}
if ( event.type === 'content_block_delta' ) {
if ( event.delta.type === 'input_json_delta' ) {
contentBlock.addPartialJSON(event.delta.partial_json);
continue;
}
if ( event.delta.type === 'text_delta' ) {
contentBlock.addText(event.delta.text);
continue;
}
}
}
stream.end();
usage_promise.resolve(counts);
})();
};
return new TypedValue({ $: 'ai-chat-intermediate' }, {
init_chat_stream,
stream: true,
response: retval,
usage_promise: usage_promise,
});
}