diff --git a/.env.example b/.env.example
index a3f746a..f7a2e04 100644
--- a/.env.example
+++ b/.env.example
@@ -11,7 +11,6 @@
 # REJECT_MESSAGE="This content violates /aicg/'s acceptable use policy."
 # CHECK_KEYS=true
 # QUOTA_DISPLAY_MODE=full
-# QUEUE_MODE=fair
 # BLOCKED_ORIGINS=reddit.com,9gag.com
 # BLOCK_MESSAGE="You must be over the age of majority in your country to use this service."
 # BLOCK_REDIRECT="https://roblox.com/"
diff --git a/src/config.ts b/src/config.ts
index 99a2886..7eb65db 100644
--- a/src/config.ts
+++ b/src/config.ts
@@ -9,7 +9,6 @@ const startupLogger = pino({ level: "debug" }).child({ module: "startup" });
 const isDev = process.env.NODE_ENV !== "production";
 
 type PromptLoggingBackend = "google_sheets";
-export type DequeueMode = "fair" | "random" | "none";
 
 type Config = {
   /** The port the proxy server will listen on. */
@@ -107,16 +106,6 @@ type Config = {
    * `full`: Displays information about keys' quota limits
    */
   quotaDisplayMode: "none" | "full";
-  /**
-   * Which request queueing strategy to use when keys are over their rate limit.
-   *
-   * `fair`: Requests are serviced in the order they were received (default)
-   *
-   * `random`: Requests are serviced randomly
-   *
-   * `none`: Requests are not queued and users have to retry manually
-   */
-  queueMode: DequeueMode;
   /**
    * Comma-separated list of origins to block. Requests matching any of these
    * origins or referers will be rejected.
@@ -179,7 +168,6 @@ export const config: Config = {
     "GOOGLE_SHEETS_SPREADSHEET_ID",
     undefined
   ),
-  queueMode: getEnvWithDefault("QUEUE_MODE", "fair"),
   blockedOrigins: getEnvWithDefault("BLOCKED_ORIGINS", undefined),
   blockMessage: getEnvWithDefault(
     "BLOCK_MESSAGE",
diff --git a/src/info-page.ts b/src/info-page.ts
index 7e4563f..2514ab5 100644
--- a/src/info-page.ts
+++ b/src/info-page.ts
@@ -148,17 +148,15 @@ function getOpenAIInfo() {
     };
   }
 
-  if (config.queueMode !== "none") {
-    const turboQueue = getQueueInformation("turbo");
+  const turboQueue = getQueueInformation("turbo");
 
-    info.turbo.proomptersInQueue = turboQueue.proomptersInQueue;
-    info.turbo.estimatedQueueTime = turboQueue.estimatedQueueTime;
+  info.turbo.proomptersInQueue = turboQueue.proomptersInQueue;
+  info.turbo.estimatedQueueTime = turboQueue.estimatedQueueTime;
 
-    if (hasGpt4) {
-      const gpt4Queue = getQueueInformation("gpt-4");
-      info.gpt4.proomptersInQueue = gpt4Queue.proomptersInQueue;
-      info.gpt4.estimatedQueueTime = gpt4Queue.estimatedQueueTime;
-    }
+  if (hasGpt4) {
+    const gpt4Queue = getQueueInformation("gpt-4");
+    info.gpt4.proomptersInQueue = gpt4Queue.proomptersInQueue;
+    info.gpt4.estimatedQueueTime = gpt4Queue.estimatedQueueTime;
   }
 
   return info;
@@ -168,11 +166,9 @@ function getAnthropicInfo() {
   const claudeInfo: Partial<ServiceInfo> = {};
   const keys = keyPool.list().filter((k) => k.service === "anthropic");
   claudeInfo.activeKeys = keys.filter((k) => !k.isDisabled).length;
-  if (config.queueMode !== "none") {
-    const queue = getQueueInformation("claude");
-    claudeInfo.proomptersInQueue = queue.proomptersInQueue;
-    claudeInfo.estimatedQueueTime = queue.estimatedQueueTime;
-  }
+  const queue = getQueueInformation("claude");
+  claudeInfo.proomptersInQueue = queue.proomptersInQueue;
+  claudeInfo.estimatedQueueTime = queue.estimatedQueueTime;
   return { claude: claudeInfo };
 }
 
@@ -198,26 +194,24 @@ Logs are anonymous and do not contain IP addresses or timestamps. [You can see t
 
 **If you are uncomfortable with this, don't send prompts to this proxy!**`;
   }
 
-  if (config.queueMode !== "none") {
-    const waits: string[] = [];
-    infoBody += `\n## Estimated Wait Times\nIf the AI is busy, your prompt will processed when a slot frees up.`;
+  const waits: string[] = [];
+  infoBody += `\n## Estimated Wait Times\nIf the AI is busy, your prompt will processed when a slot frees up.`;
 
-    if (config.openaiKey) {
-      const turboWait = getQueueInformation("turbo").estimatedQueueTime;
-      const gpt4Wait = getQueueInformation("gpt-4").estimatedQueueTime;
-      waits.push(`**Turbo:** ${turboWait}`);
-      if (keyPool.list().some((k) => k.isGpt4) && !config.turboOnly) {
-        waits.push(`**GPT-4:** ${gpt4Wait}`);
-      }
+  if (config.openaiKey) {
+    const turboWait = getQueueInformation("turbo").estimatedQueueTime;
+    const gpt4Wait = getQueueInformation("gpt-4").estimatedQueueTime;
+    waits.push(`**Turbo:** ${turboWait}`);
+    if (keyPool.list().some((k) => k.isGpt4) && !config.turboOnly) {
+      waits.push(`**GPT-4:** ${gpt4Wait}`);
     }
-
-    if (config.anthropicKey) {
-      const claudeWait = getQueueInformation("claude").estimatedQueueTime;
-      waits.push(`**Claude:** ${claudeWait}`);
-    }
-    infoBody += "\n\n" + waits.join(" / ");
   }
+  if (config.anthropicKey) {
+    const claudeWait = getQueueInformation("claude").estimatedQueueTime;
+    waits.push(`**Claude:** ${claudeWait}`);
+  }
+  infoBody += "\n\n" + waits.join(" / ");
+
   if (customGreeting) {
     infoBody += `\n## Server Greeting\n
 ${customGreeting}`;
@@ -227,9 +221,6 @@ ${customGreeting}`;
 }
 /** Returns queue time in seconds, or minutes + seconds if over 60 seconds. */
 function getQueueInformation(partition: QueuePartition) {
-  if (config.queueMode === "none") {
-    return {};
-  }
   const waitMs = getEstimatedWaitTime(partition);
   const waitTime =
     waitMs < 60000
diff --git a/src/proxy/kobold.ts b/src/proxy/kobold.ts
index 3fcdf46..c8708a1 100644
--- a/src/proxy/kobold.ts
+++ b/src/proxy/kobold.ts
@@ -33,12 +33,6 @@ const rewriteRequest = (
   req: Request,
   res: Response
 ) => {
-  if (config.queueMode !== "none") {
-    const msg = `Queueing is enabled on this proxy instance and is incompatible with the KoboldAI endpoint. Use the OpenAI endpoint instead.`;
-    proxyReq.destroy(new Error(msg));
-    return;
-  }
-
   req.body.stream = false;
   const rewriterPipeline = [
     addKey,
diff --git a/src/proxy/middleware/response/index.ts b/src/proxy/middleware/response/index.ts
index 6c9ca7f..7ba3c6d 100644
--- a/src/proxy/middleware/response/index.ts
+++ b/src/proxy/middleware/response/index.ts
@@ -341,11 +341,8 @@ function maybeHandleMissingPreambleError(
       "Request failed due to missing preamble. Key will be marked as such for subsequent requests."
     );
     keyPool.update(req.key!, { requiresPreamble: true });
-    if (config.queueMode !== "none") {
-      reenqueueRequest(req);
-      throw new RetryableError("Claude request re-enqueued to add preamble.");
-    }
-    errorPayload.proxy_note = `This Claude key requires special prompt formatting. Try again; the proxy will reformat your prompt next time.`;
+    reenqueueRequest(req);
+    throw new RetryableError("Claude request re-enqueued to add preamble.");
   } else {
     errorPayload.proxy_note = `Proxy received unrecognized error from Anthropic. Check the specific error for more information.`;
   }
@@ -357,11 +354,8 @@ function handleAnthropicRateLimitError(
 ) {
   if (errorPayload.error?.type === "rate_limit_error") {
     keyPool.markRateLimited(req.key!);
-    if (config.queueMode !== "none") {
-      reenqueueRequest(req);
-      throw new RetryableError("Claude rate-limited request re-enqueued.");
-    }
-    errorPayload.proxy_note = `There are too many in-flight requests for this key. Try again later.`;
+    reenqueueRequest(req);
+    throw new RetryableError("Claude rate-limited request re-enqueued.");
   } else {
     errorPayload.proxy_note = `Unrecognized rate limit error from Anthropic. Key may be over quota.`;
   }
@@ -388,13 +382,11 @@ function handleOpenAIRateLimitError(
   } else if (type === "requests" || type === "tokens") {
     // Per-minute request or token rate limit is exceeded, which we can retry
     keyPool.markRateLimited(req.key!);
-    if (config.queueMode !== "none") {
-      reenqueueRequest(req);
-      // This is confusing, but it will bubble up to the top-level response
-      // handler and cause the request to go back into the request queue.
-      throw new RetryableError("Rate-limited request re-enqueued.");
-    }
-    errorPayload.proxy_note = `Assigned key's '${type}' rate limit has been exceeded. Try again later.`;
+    // I'm aware this is confusing -- throwing this class of error will cause
+    // the proxy response handler to return without terminating the request,
+    // so that it can be placed back in the queue.
+    reenqueueRequest(req);
+    throw new RetryableError("Rate-limited request re-enqueued.");
   } else {
     // OpenAI probably overloaded
     errorPayload.proxy_note = `This is likely a temporary error with OpenAI. Try again in a few seconds.`;
diff --git a/src/proxy/queue.ts b/src/proxy/queue.ts
index 4b3eded..0ac3c35 100644
--- a/src/proxy/queue.ts
+++ b/src/proxy/queue.ts
@@ -16,7 +16,6 @@
  */
 
 import type { Handler, Request } from "express";
-import { config, DequeueMode } from "../config";
 import { keyPool, SupportedModel } from "../key-management";
 import { logger } from "../logger";
 import { AGNAI_DOT_CHAT_IP } from "./rate-limit";
@@ -27,8 +26,6 @@ export type QueuePartition = "claude" | "turbo" | "gpt-4";
 const queue: Request[] = [];
 const log = logger.child({ module: "request-queue" });
 
-let dequeueMode: DequeueMode = "fair";
-
 /** Maximum number of queue slots for Agnai.chat requests. */
 const AGNAI_CONCURRENCY_LIMIT = 15;
 /** Maximum number of queue slots for individual users. */
@@ -160,18 +157,9 @@ export function dequeue(partition: QueuePartition): Request | undefined {
     return undefined;
   }
 
-  let req: Request;
-
-  if (dequeueMode === "fair") {
-    // Dequeue the request that has been waiting the longest
-    req = modelQueue.reduce((prev, curr) =>
-      prev.startTime < curr.startTime ? prev : curr
-    );
-  } else {
-    // Dequeue a random request
-    const index = Math.floor(Math.random() * modelQueue.length);
-    req = modelQueue[index];
-  }
+  const req = modelQueue.reduce((prev, curr) =>
+    prev.startTime < curr.startTime ? prev : curr
+  );
   queue.splice(queue.indexOf(req), 1);
 
   if (req.onAborted) {
@@ -293,10 +281,6 @@ export function getQueueLength(partition: QueuePartition | "all" = "all") {
 
 export function createQueueMiddleware(proxyMiddleware: Handler): Handler {
   return (req, res, next) => {
-    if (config.queueMode === "none") {
-      return proxyMiddleware(req, res, next);
-    }
-
     req.proceed = () => {
       proxyMiddleware(req, res, next);
     };
diff --git a/src/server.ts b/src/server.ts
index 5ac0af4..8b1d37c 100644
--- a/src/server.ts
+++ b/src/server.ts
@@ -102,10 +102,8 @@ async function start() {
     logQueue.start();
   }
 
-  if (config.queueMode !== "none") {
-    logger.info("Starting request queue...");
-    startRequestQueue();
-  }
+  logger.info("Starting request queue...");
+  startRequestQueue();
 
   app.listen(PORT, async () => {
     logger.info({ port: PORT }, "Now listening for connections.");
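
The `dequeue()` hunk above hardcodes the former `fair` strategy: the request with the smallest `startTime` is always serviced next. A minimal standalone sketch of that selection follows; the `QueuedRequest` shape and `dequeueOldest` name are assumptions for illustration only, not the proxy's actual module.

```ts
// Oldest-first ("fair") dequeue, as retained in src/proxy/queue.ts.
// QueuedRequest is a hypothetical stand-in for the queued Express Request;
// only the field the comparison uses is modeled.
type QueuedRequest = { startTime: number };

function dequeueOldest(queue: QueuedRequest[]): QueuedRequest | undefined {
  if (queue.length === 0) return undefined;
  // reduce() keeps whichever request has the smaller startTime, i.e. the one
  // that has been waiting the longest.
  const oldest = queue.reduce((prev, curr) =>
    prev.startTime < curr.startTime ? prev : curr
  );
  queue.splice(queue.indexOf(oldest), 1);
  return oldest;
}
```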