fixes azure dalle using wrong rate limit and out-of-spec Retry-After header

This commit is contained in:
nai-degen 2024-08-27 23:43:11 -05:00
parent 51a9ccceb2
commit 0c448cb59d
2 changed files with 28 additions and 80 deletions

View File

@ -22,7 +22,7 @@ import {
} from "../shared/models";
import { initializeSseStream } from "../shared/streaming";
import { logger } from "../logger";
import { getUniqueIps, SHARED_IP_ADDRESSES } from "./rate-limit";
import { getUniqueIps } from "./rate-limit";
import { RequestPreprocessor } from "./middleware/request";
import { handleProxyError } from "./middleware/common";
import { sendErrorToClient } from "./middleware/response/error-generator";
@ -31,7 +31,9 @@ const queue: Request[] = [];
const log = logger.child({ module: "request-queue" });
/** Maximum number of queue slots for individual users. */
const USER_CONCURRENCY_LIMIT = parseInt(process.env.USER_CONCURRENCY_LIMIT ?? "1");
const USER_CONCURRENCY_LIMIT = parseInt(
process.env.USER_CONCURRENCY_LIMIT ?? "1"
);
/** Maximum number of queue slots for Agnai.chat requests. */
const AGNAI_CONCURRENCY_LIMIT = USER_CONCURRENCY_LIMIT * 5;
const MIN_HEARTBEAT_SIZE = parseInt(process.env.MIN_HEARTBEAT_SIZE_B ?? "512");
@ -58,39 +60,20 @@ const QUEUE_JOIN_TIMEOUT = 5000;
function getIdentifier(req: Request) {
if (req.user) return req.user.token;
if (req.risuToken) return req.risuToken;
if (isFromSharedIp(req)) return "shared-ip";
// if (isFromSharedIp(req)) return "shared-ip";
return req.ip;
}
const sharesIdentifierWith = (incoming: Request) => (queued: Request) =>
getIdentifier(queued) === getIdentifier(incoming);
const isFromSharedIp = (req: Request) => SHARED_IP_ADDRESSES.has(req.ip);
async function enqueue(req: Request) {
const enqueuedRequestCount = queue.filter(sharesIdentifierWith(req)).length;
let isGuest = req.user?.token === undefined;
// Requests from shared IP addresses such as Agnai.chat are exempt from IP-
// based rate limiting but can only occupy a certain number of slots in the
// queue. Authenticated users always get a single spot in the queue.
const isSharedIp = isFromSharedIp(req);
const maxConcurrentQueuedRequests =
isGuest && isSharedIp ? AGNAI_CONCURRENCY_LIMIT : USER_CONCURRENCY_LIMIT;
if (enqueuedRequestCount >= maxConcurrentQueuedRequests) {
if (isSharedIp) {
// Re-enqueued requests are not counted towards the limit since they
// already made it through the queue once.
if (req.retryCount === 0) {
throw new TooManyRequestsError(
"Too many agnai.chat requests are already queued"
);
}
} else {
throw new TooManyRequestsError(
"Your IP or user token already has another request in the queue."
);
}
if (enqueuedRequestCount >= USER_CONCURRENCY_LIMIT) {
throw new TooManyRequestsError(
"Your IP or user token already has another request in the queue."
);
}
// shitty hack to remove hpm's event listeners on retried requests
@ -146,19 +129,7 @@ export async function reenqueueRequest(req: Request) {
}
function getQueueForPartition(partition: ModelFamily): Request[] {
return queue
.filter((req) => getModelFamilyForRequest(req) === partition)
.sort((a, b) => {
// Certain requests are exempted from IP-based rate limiting because they
// come from a shared IP address. To prevent these requests from starving
// out other requests during periods of high traffic, we sort them to the
// end of the queue.
const aIsExempted = isFromSharedIp(a);
const bIsExempted = isFromSharedIp(b);
if (aIsExempted && !bIsExempted) return 1;
if (!aIsExempted && bIsExempted) return -1;
return 0;
});
return queue.filter((req) => getModelFamilyForRequest(req) === partition);
}
export function dequeue(partition: ModelFamily): Request | undefined {
@ -261,7 +232,6 @@ let waitTimes: {
partition: ModelFamily;
start: number;
end: number;
isDeprioritized: boolean;
}[] = [];
/** Adds a successful request to the list of wait times. */
@ -270,7 +240,6 @@ export function trackWaitTime(req: Request) {
partition: getModelFamilyForRequest(req),
start: req.startTime!,
end: req.queueOutTime ?? Date.now(),
isDeprioritized: isFromSharedIp(req),
});
}
@ -296,8 +265,7 @@ function calculateWaitTime(partition: ModelFamily) {
.filter((wait) => {
const isSamePartition = wait.partition === partition;
const isRecent = now - wait.end < 300 * 1000;
const isNormalPriority = !wait.isDeprioritized;
return isSamePartition && isRecent && isNormalPriority;
return isSamePartition && isRecent;
})
.map((wait) => wait.end - wait.start);
const recentAverage = recentWaits.length
@ -311,11 +279,7 @@ function calculateWaitTime(partition: ModelFamily) {
);
const currentWaits = queue
.filter((req) => {
const isSamePartition = getModelFamilyForRequest(req) === partition;
const isNormalPriority = !isFromSharedIp(req);
return isSamePartition && isNormalPriority;
})
.filter((req) => getModelFamilyForRequest(req) === partition)
.map((req) => now - req.startTime!);
const longestCurrentWait = Math.max(...currentWaits, 0);

View File

@ -1,14 +1,6 @@
import { Request, Response, NextFunction } from "express";
import { config } from "../config";
export const SHARED_IP_ADDRESSES = new Set([
// Agnai.chat
"157.230.249.32", // old
"157.245.148.56",
"174.138.29.50",
"209.97.162.44",
]);
const ONE_MINUTE_MS = 60 * 1000;
type Timestamp = number;
@ -20,7 +12,10 @@ const exemptedRequests: Timestamp[] = [];
const isRecentAttempt = (now: Timestamp) => (attempt: Timestamp) =>
attempt > now - ONE_MINUTE_MS;
const getTryAgainInMs = (ip: string, type: "text" | "image") => {
/**
* Returns duration in seconds to wait before retrying for Retry-After header.
*/
const getRetryAfter = (ip: string, type: "text" | "image") => {
const now = Date.now();
const attempts = lastAttempts.get(ip) || [];
const validAttempts = attempts.filter(isRecentAttempt(now));
@ -29,7 +24,7 @@ const getTryAgainInMs = (ip: string, type: "text" | "image") => {
type === "text" ? config.textModelRateLimit : config.imageModelRateLimit;
if (validAttempts.length >= limit) {
return validAttempts[0] - now + ONE_MINUTE_MS;
return (validAttempts[0] - now + ONE_MINUTE_MS) / 1000;
} else {
lastAttempts.set(ip, [...validAttempts, now]);
return 0;
@ -96,22 +91,11 @@ export const ipLimiter = async (
if (!textLimit && !imageLimit) return next();
if (req.user?.type === "special") return next();
// Exempts Agnai.chat from IP-based rate limiting because its IPs are shared
// by many users. Instead, the request queue will limit the number of such
// requests that may wait in the queue at a time, and sorts them to the end to
// let individual users go first.
if (SHARED_IP_ADDRESSES.has(req.ip)) {
exemptedRequests.push(Date.now());
req.log.info(
{ ip: req.ip, recentExemptions: exemptedRequests.length },
"Exempting Agnai request from rate limiting."
);
return next();
}
const type = (req.baseUrl + req.path).includes("openai-image")
? "image"
: "text";
const path = req.baseUrl + req.path;
const type =
path.includes("openai-image") || path.includes("images/generations")
? "image"
: "text";
const limit = type === "image" ? imageLimit : textLimit;
// If user is authenticated, key rate limiting by their token. Otherwise, key
@ -123,15 +107,15 @@ export const ipLimiter = async (
res.set("X-RateLimit-Remaining", remaining.toString());
res.set("X-RateLimit-Reset", reset.toString());
const tryAgainInMs = getTryAgainInMs(rateLimitKey, type);
if (tryAgainInMs > 0) {
res.set("Retry-After", tryAgainInMs.toString());
const retryAfterTime = getRetryAfter(rateLimitKey, type);
req.log.debug({ retryAfterTime }, "Retry-After header");
if (retryAfterTime > 0) {
const waitSec = Math.ceil(retryAfterTime).toString();
res.set("Retry-After", waitSec);
res.status(429).json({
error: {
type: "proxy_rate_limited",
message: `This model type is rate limited to ${limit} prompts per minute. Please try again in ${Math.ceil(
tryAgainInMs / 1000
)} seconds.`,
message: `This model type is rate limited to ${limit} prompts per minute. Please try again in ${waitSec} seconds.`,
},
});
} else {