diff --git a/src/proxy/middleware/response/handle-streamed-response.ts b/src/proxy/middleware/response/handle-streamed-response.ts index 80214e5..77850f5 100644 --- a/src/proxy/middleware/response/handle-streamed-response.ts +++ b/src/proxy/middleware/response/handle-streamed-response.ts @@ -1,3 +1,4 @@ +import express from "express"; import { pipeline } from "stream"; import { promisify } from "util"; import { @@ -5,7 +6,8 @@ import { copySseResponseHeaders, initializeSseStream } from "../../../shared/streaming"; -import { decodeResponseBody, RawResponseBodyHandler } from "."; +import { enqueue } from "../../queue"; +import { decodeResponseBody, RawResponseBodyHandler, RetryableError } from "."; import { SSEStreamAdapter } from "./streaming/sse-stream-adapter"; import { SSEMessageTransformer } from "./streaming/sse-message-transformer"; import { EventAggregator } from "./streaming/event-aggregator"; @@ -33,7 +35,7 @@ export const handleStreamedResponse: RawResponseBodyHandler = async ( } if (proxyRes.statusCode! > 201) { - req.isStreaming = false; // Forces non-streaming response handler to execute + req.isStreaming = false; req.log.warn( { statusCode: proxyRes.statusCode, key: hash }, `Streaming request returned error status code. Falling back to non-streaming response handler.` @@ -79,9 +81,18 @@ export const handleStreamedResponse: RawResponseBodyHandler = async ( res.end(); return aggregator.getFinalResponse(); } catch (err) { - const errorEvent = buildFakeSse("stream-error", err.message, req); - res.write(`${errorEvent}data: [DONE]\n\n`); - res.end(); + if (err instanceof RetryableError) { + req.log.info( + { key: req.key!.hash, retryCount: req.retryCount }, + `Re-enqueueing request due to retryable error during streaming response.` + ); + req.retryCount++; + enqueue(req); + } else { + const errorEvent = buildFakeSse("stream-error", err.message, req); + res.write(`${errorEvent}data: [DONE]\n\n`); + res.end(); + } throw err; } }; diff --git a/src/proxy/middleware/response/streaming/sse-stream-adapter.ts b/src/proxy/middleware/response/streaming/sse-stream-adapter.ts index 7ae1996..734289f 100644 --- a/src/proxy/middleware/response/streaming/sse-stream-adapter.ts +++ b/src/proxy/middleware/response/streaming/sse-stream-adapter.ts @@ -2,12 +2,16 @@ import { Transform, TransformOptions } from "stream"; // @ts-ignore import { Parser } from "lifion-aws-event-stream"; import { logger } from "../../../../logger"; +import { RetryableError } from "../index"; const log = logger.child({ module: "sse-stream-adapter" }); type SSEStreamAdapterOptions = TransformOptions & { contentType?: string }; type AwsEventStreamMessage = { - headers: { ":message-type": "event" | "exception" }; + headers: { + ":message-type": "event" | "exception"; + ":exception-type"?: string; + }; payload: { message?: string /** base64 encoded */; bytes?: string }; }; @@ -36,6 +40,19 @@ export class SSEStreamAdapter extends Transform { protected processAwsEvent(event: AwsEventStreamMessage): string | null { const { payload, headers } = event; if (headers[":message-type"] === "exception" || !payload.bytes) { + // Under high load, AWS can rugpull us by returning a 200 and starting the + // stream but then immediately sending a rate limit error as the first + // event. My guess is some race condition in their rate limiting check + // that occurs if two requests arrive at the same time when only one + // concurrency slot is available. + if (headers[":exception-type"] === "throttlingException") { + log.warn( + { event: JSON.stringify(event) }, + "AWS request throttled after streaming has already started; retrying" + ); + throw new RetryableError("AWS request throttled mid-stream"); + } + log.error( { event: JSON.stringify(event) }, "Received bad streaming event from AWS"