From e0624e30fd779c83fd4e1d6f8469b19dc469241a Mon Sep 17 00:00:00 2001 From: valadaptive <54326-valadaptive@users.noreply.gitgud.io> Date: Sat, 9 Dec 2023 06:18:01 +0000 Subject: [PATCH] Fix some corner cases in SSE parsing (khanon/oai-reverse-proxy!56) --- .../middleware/response/streaming/sse-stream-adapter.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/proxy/middleware/response/streaming/sse-stream-adapter.ts b/src/proxy/middleware/response/streaming/sse-stream-adapter.ts index f3f7654..eb29645 100644 --- a/src/proxy/middleware/response/streaming/sse-stream-adapter.ts +++ b/src/proxy/middleware/response/streaming/sse-stream-adapter.ts @@ -1,4 +1,5 @@ import { Transform, TransformOptions } from "stream"; +import { StringDecoder } from "string_decoder"; // @ts-ignore import { Parser } from "lifion-aws-event-stream"; import { logger } from "../../../../logger"; @@ -23,6 +24,7 @@ export class SSEStreamAdapter extends Transform { private readonly isAwsStream; private parser = new Parser(); private partialMessage = ""; + private decoder = new StringDecoder("utf8"); constructor(options?: SSEStreamAdapterOptions) { super(options); @@ -79,16 +81,16 @@ export class SSEStreamAdapter extends Transform { // We may receive multiple (or partial) SSE messages in a single chunk, // so we need to buffer and emit separate stream events for full // messages so we can parse/transform them properly. - const str = chunk.toString("utf8"); + const str = this.decoder.write(chunk); - const fullMessages = (this.partialMessage + str).split(/\r?\n\r?\n/); + const fullMessages = (this.partialMessage + str).split(/\r\r|\n\n|\r\n\r\n/); this.partialMessage = fullMessages.pop() || ""; for (const message of fullMessages) { // Mixing line endings will break some clients and our request queue // will have already sent \n for heartbeats, so we need to normalize // to \n. - this.push(message.replace(/\r\n/g, "\n") + "\n\n"); + this.push(message.replace(/\r\n?/g, "\n") + "\n\n"); } } callback();