diff --git a/src/proxy/queue.ts b/src/proxy/queue.ts index 3a198f7..8af2fd2 100644 --- a/src/proxy/queue.ts +++ b/src/proxy/queue.ts @@ -22,7 +22,7 @@ import { } from "../shared/models"; import { initializeSseStream } from "../shared/streaming"; import { logger } from "../logger"; -import { getUniqueIps, SHARED_IP_ADDRESSES } from "./rate-limit"; +import { getUniqueIps } from "./rate-limit"; import { RequestPreprocessor } from "./middleware/request"; import { handleProxyError } from "./middleware/common"; import { sendErrorToClient } from "./middleware/response/error-generator"; @@ -31,7 +31,9 @@ const queue: Request[] = []; const log = logger.child({ module: "request-queue" }); /** Maximum number of queue slots for individual users. */ -const USER_CONCURRENCY_LIMIT = parseInt(process.env.USER_CONCURRENCY_LIMIT ?? "1"); +const USER_CONCURRENCY_LIMIT = parseInt( + process.env.USER_CONCURRENCY_LIMIT ?? "1" +); /** Maximum number of queue slots for Agnai.chat requests. */ const AGNAI_CONCURRENCY_LIMIT = USER_CONCURRENCY_LIMIT * 5; const MIN_HEARTBEAT_SIZE = parseInt(process.env.MIN_HEARTBEAT_SIZE_B ?? "512"); @@ -58,39 +60,20 @@ const QUEUE_JOIN_TIMEOUT = 5000; function getIdentifier(req: Request) { if (req.user) return req.user.token; if (req.risuToken) return req.risuToken; - if (isFromSharedIp(req)) return "shared-ip"; + // if (isFromSharedIp(req)) return "shared-ip"; return req.ip; } const sharesIdentifierWith = (incoming: Request) => (queued: Request) => getIdentifier(queued) === getIdentifier(incoming); -const isFromSharedIp = (req: Request) => SHARED_IP_ADDRESSES.has(req.ip); - async function enqueue(req: Request) { const enqueuedRequestCount = queue.filter(sharesIdentifierWith(req)).length; - let isGuest = req.user?.token === undefined; - // Requests from shared IP addresses such as Agnai.chat are exempt from IP- - // based rate limiting but can only occupy a certain number of slots in the - // queue. Authenticated users always get a single spot in the queue. - const isSharedIp = isFromSharedIp(req); - const maxConcurrentQueuedRequests = - isGuest && isSharedIp ? AGNAI_CONCURRENCY_LIMIT : USER_CONCURRENCY_LIMIT; - if (enqueuedRequestCount >= maxConcurrentQueuedRequests) { - if (isSharedIp) { - // Re-enqueued requests are not counted towards the limit since they - // already made it through the queue once. - if (req.retryCount === 0) { - throw new TooManyRequestsError( - "Too many agnai.chat requests are already queued" - ); - } - } else { - throw new TooManyRequestsError( - "Your IP or user token already has another request in the queue." - ); - } + if (enqueuedRequestCount >= USER_CONCURRENCY_LIMIT) { + throw new TooManyRequestsError( + "Your IP or user token already has another request in the queue." + ); } // shitty hack to remove hpm's event listeners on retried requests @@ -146,19 +129,7 @@ export async function reenqueueRequest(req: Request) { } function getQueueForPartition(partition: ModelFamily): Request[] { - return queue - .filter((req) => getModelFamilyForRequest(req) === partition) - .sort((a, b) => { - // Certain requests are exempted from IP-based rate limiting because they - // come from a shared IP address. To prevent these requests from starving - // out other requests during periods of high traffic, we sort them to the - // end of the queue. - const aIsExempted = isFromSharedIp(a); - const bIsExempted = isFromSharedIp(b); - if (aIsExempted && !bIsExempted) return 1; - if (!aIsExempted && bIsExempted) return -1; - return 0; - }); + return queue.filter((req) => getModelFamilyForRequest(req) === partition); } export function dequeue(partition: ModelFamily): Request | undefined { @@ -261,7 +232,6 @@ let waitTimes: { partition: ModelFamily; start: number; end: number; - isDeprioritized: boolean; }[] = []; /** Adds a successful request to the list of wait times. */ @@ -270,7 +240,6 @@ export function trackWaitTime(req: Request) { partition: getModelFamilyForRequest(req), start: req.startTime!, end: req.queueOutTime ?? Date.now(), - isDeprioritized: isFromSharedIp(req), }); } @@ -296,8 +265,7 @@ function calculateWaitTime(partition: ModelFamily) { .filter((wait) => { const isSamePartition = wait.partition === partition; const isRecent = now - wait.end < 300 * 1000; - const isNormalPriority = !wait.isDeprioritized; - return isSamePartition && isRecent && isNormalPriority; + return isSamePartition && isRecent; }) .map((wait) => wait.end - wait.start); const recentAverage = recentWaits.length @@ -311,11 +279,7 @@ function calculateWaitTime(partition: ModelFamily) { ); const currentWaits = queue - .filter((req) => { - const isSamePartition = getModelFamilyForRequest(req) === partition; - const isNormalPriority = !isFromSharedIp(req); - return isSamePartition && isNormalPriority; - }) + .filter((req) => getModelFamilyForRequest(req) === partition) .map((req) => now - req.startTime!); const longestCurrentWait = Math.max(...currentWaits, 0); diff --git a/src/proxy/rate-limit.ts b/src/proxy/rate-limit.ts index b70e249..5d025b9 100644 --- a/src/proxy/rate-limit.ts +++ b/src/proxy/rate-limit.ts @@ -1,14 +1,6 @@ import { Request, Response, NextFunction } from "express"; import { config } from "../config"; -export const SHARED_IP_ADDRESSES = new Set([ - // Agnai.chat - "157.230.249.32", // old - "157.245.148.56", - "174.138.29.50", - "209.97.162.44", -]); - const ONE_MINUTE_MS = 60 * 1000; type Timestamp = number; @@ -20,7 +12,10 @@ const exemptedRequests: Timestamp[] = []; const isRecentAttempt = (now: Timestamp) => (attempt: Timestamp) => attempt > now - ONE_MINUTE_MS; -const getTryAgainInMs = (ip: string, type: "text" | "image") => { +/** + * Returns duration in seconds to wait before retrying for Retry-After header. + */ +const getRetryAfter = (ip: string, type: "text" | "image") => { const now = Date.now(); const attempts = lastAttempts.get(ip) || []; const validAttempts = attempts.filter(isRecentAttempt(now)); @@ -29,7 +24,7 @@ const getTryAgainInMs = (ip: string, type: "text" | "image") => { type === "text" ? config.textModelRateLimit : config.imageModelRateLimit; if (validAttempts.length >= limit) { - return validAttempts[0] - now + ONE_MINUTE_MS; + return (validAttempts[0] - now + ONE_MINUTE_MS) / 1000; } else { lastAttempts.set(ip, [...validAttempts, now]); return 0; @@ -96,22 +91,11 @@ export const ipLimiter = async ( if (!textLimit && !imageLimit) return next(); if (req.user?.type === "special") return next(); - // Exempts Agnai.chat from IP-based rate limiting because its IPs are shared - // by many users. Instead, the request queue will limit the number of such - // requests that may wait in the queue at a time, and sorts them to the end to - // let individual users go first. - if (SHARED_IP_ADDRESSES.has(req.ip)) { - exemptedRequests.push(Date.now()); - req.log.info( - { ip: req.ip, recentExemptions: exemptedRequests.length }, - "Exempting Agnai request from rate limiting." - ); - return next(); - } - - const type = (req.baseUrl + req.path).includes("openai-image") - ? "image" - : "text"; + const path = req.baseUrl + req.path; + const type = + path.includes("openai-image") || path.includes("images/generations") + ? "image" + : "text"; const limit = type === "image" ? imageLimit : textLimit; // If user is authenticated, key rate limiting by their token. Otherwise, key @@ -123,15 +107,15 @@ export const ipLimiter = async ( res.set("X-RateLimit-Remaining", remaining.toString()); res.set("X-RateLimit-Reset", reset.toString()); - const tryAgainInMs = getTryAgainInMs(rateLimitKey, type); - if (tryAgainInMs > 0) { - res.set("Retry-After", tryAgainInMs.toString()); + const retryAfterTime = getRetryAfter(rateLimitKey, type); + req.log.debug({ retryAfterTime }, "Retry-After header"); + if (retryAfterTime > 0) { + const waitSec = Math.ceil(retryAfterTime).toString(); + res.set("Retry-After", waitSec); res.status(429).json({ error: { type: "proxy_rate_limited", - message: `This model type is rate limited to ${limit} prompts per minute. Please try again in ${Math.ceil( - tryAgainInMs / 1000 - )} seconds.`, + message: `This model type is rate limited to ${limit} prompts per minute. Please try again in ${waitSec} seconds.`, }, }); } else {