fixes empty AWS streaming responses when under heavy load

This commit is contained in:
nai-degen 2023-10-15 00:05:36 -05:00
parent af4d8dae40
commit f6cfc6e882
2 changed files with 34 additions and 6 deletions

View File

@ -1,3 +1,4 @@
import express from "express";
import { pipeline } from "stream"; import { pipeline } from "stream";
import { promisify } from "util"; import { promisify } from "util";
import { import {
@ -5,7 +6,8 @@ import {
copySseResponseHeaders, copySseResponseHeaders,
initializeSseStream initializeSseStream
} from "../../../shared/streaming"; } from "../../../shared/streaming";
import { decodeResponseBody, RawResponseBodyHandler } from "."; import { enqueue } from "../../queue";
import { decodeResponseBody, RawResponseBodyHandler, RetryableError } from ".";
import { SSEStreamAdapter } from "./streaming/sse-stream-adapter"; import { SSEStreamAdapter } from "./streaming/sse-stream-adapter";
import { SSEMessageTransformer } from "./streaming/sse-message-transformer"; import { SSEMessageTransformer } from "./streaming/sse-message-transformer";
import { EventAggregator } from "./streaming/event-aggregator"; import { EventAggregator } from "./streaming/event-aggregator";
@ -33,7 +35,7 @@ export const handleStreamedResponse: RawResponseBodyHandler = async (
} }
if (proxyRes.statusCode! > 201) { if (proxyRes.statusCode! > 201) {
req.isStreaming = false; // Forces non-streaming response handler to execute req.isStreaming = false;
req.log.warn( req.log.warn(
{ statusCode: proxyRes.statusCode, key: hash }, { statusCode: proxyRes.statusCode, key: hash },
`Streaming request returned error status code. Falling back to non-streaming response handler.` `Streaming request returned error status code. Falling back to non-streaming response handler.`
@ -79,9 +81,18 @@ export const handleStreamedResponse: RawResponseBodyHandler = async (
res.end(); res.end();
return aggregator.getFinalResponse(); return aggregator.getFinalResponse();
} catch (err) { } catch (err) {
if (err instanceof RetryableError) {
req.log.info(
{ key: req.key!.hash, retryCount: req.retryCount },
`Re-enqueueing request due to retryable error during streaming response.`
);
req.retryCount++;
enqueue(req);
} else {
const errorEvent = buildFakeSse("stream-error", err.message, req); const errorEvent = buildFakeSse("stream-error", err.message, req);
res.write(`${errorEvent}data: [DONE]\n\n`); res.write(`${errorEvent}data: [DONE]\n\n`);
res.end(); res.end();
}
throw err; throw err;
} }
}; };

View File

@ -2,12 +2,16 @@ import { Transform, TransformOptions } from "stream";
// @ts-ignore // @ts-ignore
import { Parser } from "lifion-aws-event-stream"; import { Parser } from "lifion-aws-event-stream";
import { logger } from "../../../../logger"; import { logger } from "../../../../logger";
import { RetryableError } from "../index";
const log = logger.child({ module: "sse-stream-adapter" }); const log = logger.child({ module: "sse-stream-adapter" });
type SSEStreamAdapterOptions = TransformOptions & { contentType?: string }; type SSEStreamAdapterOptions = TransformOptions & { contentType?: string };
type AwsEventStreamMessage = { type AwsEventStreamMessage = {
headers: { ":message-type": "event" | "exception" }; headers: {
":message-type": "event" | "exception";
":exception-type"?: string;
};
payload: { message?: string /** base64 encoded */; bytes?: string }; payload: { message?: string /** base64 encoded */; bytes?: string };
}; };
@ -36,6 +40,19 @@ export class SSEStreamAdapter extends Transform {
protected processAwsEvent(event: AwsEventStreamMessage): string | null { protected processAwsEvent(event: AwsEventStreamMessage): string | null {
const { payload, headers } = event; const { payload, headers } = event;
if (headers[":message-type"] === "exception" || !payload.bytes) { if (headers[":message-type"] === "exception" || !payload.bytes) {
// Under high load, AWS can rugpull us by returning a 200 and starting the
// stream but then immediately sending a rate limit error as the first
// event. My guess is some race condition in their rate limiting check
// that occurs if two requests arrive at the same time when only one
// concurrency slot is available.
if (headers[":exception-type"] === "throttlingException") {
log.warn(
{ event: JSON.stringify(event) },
"AWS request throttled after streaming has already started; retrying"
);
throw new RetryableError("AWS request throttled mid-stream");
}
log.error( log.error(
{ event: JSON.stringify(event) }, { event: JSON.stringify(event) },
"Received bad streaming event from AWS" "Received bad streaming event from AWS"