fixes AWS mid-stream rate limits not actually marking key as rate-limited
This commit is contained in:
parent
52ec2ec265
commit
89e9b67f3f
|
@ -11,6 +11,7 @@ import { decodeResponseBody, RawResponseBodyHandler, RetryableError } from ".";
|
|||
import { SSEStreamAdapter } from "./streaming/sse-stream-adapter";
|
||||
import { SSEMessageTransformer } from "./streaming/sse-message-transformer";
|
||||
import { EventAggregator } from "./streaming/event-aggregator";
|
||||
import { keyPool } from "../../../shared/key-management";
|
||||
|
||||
const pipelineAsync = promisify(pipeline);
|
||||
|
||||
|
@ -61,7 +62,7 @@ export const handleStreamedResponse: RawResponseBodyHandler = async (
|
|||
const adapter = new SSEStreamAdapter({ contentType });
|
||||
const aggregator = new EventAggregator({ format: req.outboundApi });
|
||||
const transformer = new SSEMessageTransformer({
|
||||
inputFormat: req.outboundApi, // outbound from the request's perspective
|
||||
inputFormat: req.outboundApi,
|
||||
inputApiVersion: String(req.headers["anthropic-version"]),
|
||||
logger: req.log,
|
||||
requestId: String(req.id),
|
||||
|
@ -82,7 +83,8 @@ export const handleStreamedResponse: RawResponseBodyHandler = async (
|
|||
return aggregator.getFinalResponse();
|
||||
} catch (err) {
|
||||
if (err instanceof RetryableError) {
|
||||
req.log.info(
|
||||
keyPool.markRateLimited(req.key!)
|
||||
req.log.warn(
|
||||
{ key: req.key!.hash, retryCount: req.retryCount },
|
||||
`Re-enqueueing request due to retryable error during streaming response.`
|
||||
);
|
||||
|
|
|
@ -73,6 +73,7 @@ export class SSEMessageTransformer extends Transform {
|
|||
if (!transformedMessage) return callback();
|
||||
|
||||
if (this.msgCount === 1) {
|
||||
// TODO: does this need to be skipped for passthroughToOpenAI?
|
||||
this.push(createInitialMessage(transformedMessage));
|
||||
}
|
||||
this.push(transformedMessage);
|
||||
|
|
|
@ -40,6 +40,7 @@ export class SSEStreamAdapter extends Transform {
|
|||
protected processAwsEvent(event: AwsEventStreamMessage): string | null {
|
||||
const { payload, headers } = event;
|
||||
if (headers[":message-type"] === "exception" || !payload.bytes) {
|
||||
const eventStr = JSON.stringify(event);
|
||||
// Under high load, AWS can rugpull us by returning a 200 and starting the
|
||||
// stream but then immediately sending a rate limit error as the first
|
||||
// event. My guess is some race condition in their rate limiting check
|
||||
|
@ -47,18 +48,17 @@ export class SSEStreamAdapter extends Transform {
|
|||
// concurrency slot is available.
|
||||
if (headers[":exception-type"] === "throttlingException") {
|
||||
log.warn(
|
||||
{ event: JSON.stringify(event) },
|
||||
{ event: eventStr },
|
||||
"AWS request throttled after streaming has already started; retrying"
|
||||
);
|
||||
throw new RetryableError("AWS request throttled mid-stream");
|
||||
}
|
||||
|
||||
} else {
|
||||
log.error(
|
||||
{ event: JSON.stringify(event) },
|
||||
"Received bad streaming event from AWS"
|
||||
{ event: eventStr },
|
||||
"Received unexpected AWS event stream message"
|
||||
);
|
||||
const message = JSON.stringify(event);
|
||||
return getFakeErrorCompletion("proxy AWS error", message);
|
||||
return getFakeErrorCompletion("proxy AWS error", eventStr);
|
||||
}
|
||||
} else {
|
||||
const { bytes } = payload;
|
||||
// technically this is a transformation but we don't really distinguish
|
||||
|
|
Loading…
Reference in New Issue