fixes uncounted tokens when Response stream is prematurely closed

This commit is contained in:
nai-degen 2024-03-14 21:32:20 -05:00
parent 367ac3d075
commit 57d9791270
2 changed files with 24 additions and 6 deletions

View File

@ -86,6 +86,12 @@ export const signAwsRequest: RequestPreprocessor = async (req) => {
newRequest.headers["accept"] = "*/*";
}
const { key, body, inboundApi, outboundApi } = req;
req.log.info(
{ key, model: body.model, inboundApi, outboundApi },
"Assigned AWS credentials to request"
);
req.signedRequest = await sign(newRequest, getCredentialParts(req));
};

View File

@ -1,3 +1,4 @@
import express from "express";
import { pipeline, Readable, Transform } from "stream";
import StreamArray from "stream-json/streamers/StreamArray";
import { StringDecoder } from "string_decoder";
@ -52,10 +53,7 @@ export const handleStreamedResponse: RawResponseBodyHandler = async (
return decodeResponseBody(proxyRes, req, res);
}
req.log.debug(
{ headers: proxyRes.headers, key: hash },
`Starting to proxy SSE stream.`
);
req.log.debug({ headers: proxyRes.headers }, `Starting to proxy SSE stream.`);
// Typically, streaming will have already been initialized by the request
// queue to send heartbeat pings.
@ -97,8 +95,11 @@ export const handleStreamedResponse: RawResponseBodyHandler = async (
});
try {
await pipelineAsync(proxyRes, decoder, adapter, transformer);
req.log.debug({ key: hash }, `Finished proxying SSE stream.`);
await Promise.race([
handleAbortedStream(req, res),
pipelineAsync(proxyRes, decoder, adapter, transformer),
]);
req.log.debug(`Finished proxying SSE stream.`);
res.end();
return aggregator.getFinalResponse();
} catch (err) {
@ -141,6 +142,17 @@ export const handleStreamedResponse: RawResponseBodyHandler = async (
}
};
function handleAbortedStream(req: express.Request, res: express.Response) {
return new Promise<void>((resolve) =>
res.on("close", () => {
if (!res.writableEnded) {
req.log.info("Client prematurely closed connection during stream.");
}
resolve();
})
);
}
function getDecoder(options: {
input: Readable;
api: APIFormat;