diff --git a/src/proxy/middleware/response/handle-streamed-response.ts b/src/proxy/middleware/response/handle-streamed-response.ts index 77850f5..b07c86b 100644 --- a/src/proxy/middleware/response/handle-streamed-response.ts +++ b/src/proxy/middleware/response/handle-streamed-response.ts @@ -11,6 +11,7 @@ import { decodeResponseBody, RawResponseBodyHandler, RetryableError } from "."; import { SSEStreamAdapter } from "./streaming/sse-stream-adapter"; import { SSEMessageTransformer } from "./streaming/sse-message-transformer"; import { EventAggregator } from "./streaming/event-aggregator"; +import { keyPool } from "../../../shared/key-management"; const pipelineAsync = promisify(pipeline); @@ -61,7 +62,7 @@ export const handleStreamedResponse: RawResponseBodyHandler = async ( const adapter = new SSEStreamAdapter({ contentType }); const aggregator = new EventAggregator({ format: req.outboundApi }); const transformer = new SSEMessageTransformer({ - inputFormat: req.outboundApi, // outbound from the request's perspective + inputFormat: req.outboundApi, inputApiVersion: String(req.headers["anthropic-version"]), logger: req.log, requestId: String(req.id), @@ -82,7 +83,8 @@ export const handleStreamedResponse: RawResponseBodyHandler = async ( return aggregator.getFinalResponse(); } catch (err) { if (err instanceof RetryableError) { - req.log.info( + keyPool.markRateLimited(req.key!);
+ req.log.warn( { key: req.key!.hash, retryCount: req.retryCount }, `Re-enqueueing request due to retryable error during streaming response.` ); diff --git a/src/proxy/middleware/response/streaming/sse-message-transformer.ts b/src/proxy/middleware/response/streaming/sse-message-transformer.ts index 33d2882..5bd0b8e 100644 --- a/src/proxy/middleware/response/streaming/sse-message-transformer.ts +++ b/src/proxy/middleware/response/streaming/sse-message-transformer.ts @@ -73,6 +73,7 @@ export class SSEMessageTransformer extends Transform { if (!transformedMessage) return callback(); if (this.msgCount === 1) { + // TODO: does this need to be skipped for passthroughToOpenAI? this.push(createInitialMessage(transformedMessage)); } this.push(transformedMessage); diff --git a/src/proxy/middleware/response/streaming/sse-stream-adapter.ts b/src/proxy/middleware/response/streaming/sse-stream-adapter.ts index 734289f..f3f7654 100644 --- a/src/proxy/middleware/response/streaming/sse-stream-adapter.ts +++ b/src/proxy/middleware/response/streaming/sse-stream-adapter.ts @@ -40,6 +40,7 @@ export class SSEStreamAdapter extends Transform { protected processAwsEvent(event: AwsEventStreamMessage): string | null { const { payload, headers } = event; if (headers[":message-type"] === "exception" || !payload.bytes) { + const eventStr = JSON.stringify(event); // Under high load, AWS can rugpull us by returning a 200 and starting the // stream but then immediately sending a rate limit error as the first // event. My guess is some race condition in their rate limiting check @@ -47,18 +48,17 @@ export class SSEStreamAdapter extends Transform { // concurrency slot is available. 
if (headers[":exception-type"] === "throttlingException") { log.warn( - { event: JSON.stringify(event) }, + { event: eventStr }, "AWS request throttled after streaming has already started; retrying" ); throw new RetryableError("AWS request throttled mid-stream"); + } else { + log.error( + { event: eventStr }, + "Received unexpected AWS event stream message" + ); + return getFakeErrorCompletion("proxy AWS error", eventStr); } - - log.error( - { event: JSON.stringify(event) }, - "Received bad streaming event from AWS" - ); - const message = JSON.stringify(event); - return getFakeErrorCompletion("proxy AWS error", message); } else { const { bytes } = payload; // technically this is a transformation but we don't really distinguish