fixes prompt logging for streamed Claude requests

This commit is contained in:
nai-degen 2023-06-04 12:16:29 -05:00
parent dae1262f7a
commit 38ff0e3f3b
2 changed files with 17 additions and 14 deletions

View File

@ -83,9 +83,8 @@ export const handleStreamedResponse: RawResponseBodyHandler = async (
res.flushHeaders(); res.flushHeaders();
} }
const fullChunks: string[] = []; const originalEvents: string[] = [];
let chunkBuffer: string[] = []; let partialMessage = "";
let messageBuffer = "";
let lastPosition = 0; let lastPosition = 0;
type ProxyResHandler<T extends unknown> = (...args: T[]) => void; type ProxyResHandler<T extends unknown> = (...args: T[]) => void;
@ -101,20 +100,17 @@ export const handleStreamedResponse: RawResponseBodyHandler = async (
proxyRes.on( proxyRes.on(
"data", "data",
withErrorHandling((chunk) => { withErrorHandling((chunk: Buffer) => {
// We may receive multiple (or partial) SSE messages in a single chunk, // We may receive multiple (or partial) SSE messages in a single chunk,
// so we need to buffer and emit separate stream events for full // so we need to buffer and emit separate stream events for full
// messages so we can parse/transform them properly. // messages so we can parse/transform them properly.
const str = chunk.toString(); const str = chunk.toString();
chunkBuffer.push(str);
const newMessages = (messageBuffer + chunkBuffer.join("")).split( // Anthropic uses CRLF line endings (out-of-spec btw)
/\r?\n\r?\n/ // Anthropic uses CRLF line endings (out-of-spec btw) const fullMessages = (partialMessage + str).split(/\r?\n\r?\n/);
); partialMessage = fullMessages.pop() || "";
chunkBuffer = [];
messageBuffer = newMessages.pop() || "";
for (const message of newMessages) { for (const message of fullMessages) {
proxyRes.emit("full-sse-event", message); proxyRes.emit("full-sse-event", message);
} }
}) })
@ -123,13 +119,13 @@ export const handleStreamedResponse: RawResponseBodyHandler = async (
proxyRes.on( proxyRes.on(
"full-sse-event", "full-sse-event",
withErrorHandling((data) => { withErrorHandling((data) => {
originalEvents.push(data);
const { event, position } = transformEvent({ const { event, position } = transformEvent({
data, data,
requestApi: req.inboundApi, requestApi: req.inboundApi,
responseApi: req.outboundApi, responseApi: req.outboundApi,
lastPosition, lastPosition,
}); });
fullChunks.push(event);
lastPosition = position; lastPosition = position;
res.write(event + "\n\n"); res.write(event + "\n\n");
}) })
@ -138,7 +134,7 @@ export const handleStreamedResponse: RawResponseBodyHandler = async (
proxyRes.on( proxyRes.on(
"end", "end",
withErrorHandling(() => { withErrorHandling(() => {
let finalBody = convertEventsToFinalResponse(fullChunks, req); let finalBody = convertEventsToFinalResponse(originalEvents, req);
req.log.info({ key: key.hash }, `Finished proxying SSE stream.`); req.log.info({ key: key.hash }, `Finished proxying SSE stream.`);
res.end(); res.end();
resolve(finalBody); resolve(finalBody);
@ -231,6 +227,12 @@ function copyHeaders(proxyRes: http.IncomingMessage, res: Response) {
} }
} }
/**
* Converts the list of incremental SSE events into an object that resembles a
* full, non-streamed response from the API so that subsequent middleware can
* operate on it as if it were a normal response.
* Events are expected to be in the format they were received from the API.
*/
function convertEventsToFinalResponse(events: string[], req: Request) { function convertEventsToFinalResponse(events: string[], req: Request) {
if (req.outboundApi === "openai") { if (req.outboundApi === "openai") {
let response: OpenAiChatCompletionResponse = { let response: OpenAiChatCompletionResponse = {

View File

@ -4,6 +4,7 @@ import { AIService } from "../../../key-management";
import { logQueue } from "../../../prompt-logging"; import { logQueue } from "../../../prompt-logging";
import { isCompletionRequest } from "../common"; import { isCompletionRequest } from "../common";
import { ProxyResHandlerWithBody } from "."; import { ProxyResHandlerWithBody } from ".";
import { logger } from "../../../logger";
/** If prompt logging is enabled, enqueues the prompt for logging. */ /** If prompt logging is enabled, enqueues the prompt for logging. */
export const logPrompt: ProxyResHandlerWithBody = async ( export const logPrompt: ProxyResHandlerWithBody = async (
@ -57,7 +58,7 @@ const getPromptForRequest = (req: Request): string | OaiMessage[] => {
const flattenMessages = (messages: string | OaiMessage[]): string => { const flattenMessages = (messages: string | OaiMessage[]): string => {
if (typeof messages === "string") { if (typeof messages === "string") {
return messages; return messages.trim();
} }
return messages.map((m) => `${m.role}: ${m.content}`).join("\n"); return messages.map((m) => `${m.role}: ${m.content}`).join("\n");
}; };