uses exponential moving average for wait time calculation
This commit is contained in:
parent
4a68c14477
commit
5fabe1d1f8
|
@ -445,6 +445,13 @@ Logs are anonymous and do not contain IP addresses or timestamps. [You can see t
|
|||
if (hasGpt432k && allowedGpt432k) {
|
||||
waits.push(`**GPT-4-32k:** ${gpt432kWait}`);
|
||||
}
|
||||
|
||||
const dalleWait = getQueueInformation("dall-e").estimatedQueueTime;
|
||||
const hasDalle = keys.some((k) => k.modelFamilies.includes("dall-e"));
|
||||
const allowedDalle = config.allowedModelFamilies.includes("dall-e");
|
||||
if (hasDalle && allowedDalle) {
|
||||
waits.push(`**DALL-E:** ${dalleWait}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (config.anthropicKey) {
|
||||
|
|
|
@ -16,7 +16,8 @@ import { keyPool } from "../shared/key-management";
|
|||
import {
|
||||
getClaudeModelFamily,
|
||||
getGooglePalmModelFamily,
|
||||
getOpenAIModelFamily, MODEL_FAMILIES,
|
||||
getOpenAIModelFamily,
|
||||
MODEL_FAMILIES,
|
||||
ModelFamily,
|
||||
} from "../shared/models";
|
||||
import { buildFakeSse, initializeSseStream } from "../shared/streaming";
|
||||
|
@ -253,6 +254,11 @@ function cleanQueue() {
|
|||
}
|
||||
|
||||
export function start() {
|
||||
MODEL_FAMILIES.forEach((modelFamily) => {
|
||||
historicalEmas.set(modelFamily, 0);
|
||||
currentEmas.set(modelFamily, 0);
|
||||
estimates.set(modelFamily, 0);
|
||||
});
|
||||
processQueue();
|
||||
cleanQueue();
|
||||
log.info(`Started request queue.`);
|
||||
|
@ -275,29 +281,66 @@ export function trackWaitTime(req: Request) {
|
|||
});
|
||||
}
|
||||
|
||||
/** Delay, in milliseconds, between successive recalculations of the wait estimates. */
const WAIT_TIME_INTERVAL = 3000;
/** Smoothing factor applied to the average of recently completed waits. */
const ALPHA_HISTORICAL = 0.2;
/** Smoothing factor applied to the longest wait among still-queued requests. */
const ALPHA_CURRENT = 0.3;
/** Per-family exponential moving average of recently completed wait times. */
const historicalEmas = new Map<ModelFamily, number>();
/** Per-family exponential moving average of the longest in-queue wait. */
const currentEmas = new Map<ModelFamily, number>();
/** Per-family wait estimate most recently published by the refresh timer. */
const estimates = new Map<ModelFamily, number>();
|
||||
|
||||
/**
 * Looks up the stored wait estimate for a queue partition.
 *
 * This is a plain read of the `estimates` map; nothing is recomputed here.
 *
 * @param partition Model family whose estimate is requested.
 * @returns The stored estimate, or 0 when none has been recorded for the
 *   partition.
 */
export function getEstimatedWaitTime(partition: ModelFamily): number {
  const stored = estimates.get(partition);
  return stored ?? 0;
}
|
||||
|
||||
/**
 * Recomputes the estimated wait time, in milliseconds, for the given queue
 * partition and stores the refreshed moving averages as a side effect.
 *
 * The estimate is the mean of two exponential moving averages (EMAs):
 * - a "historical" EMA fed by the average wait of requests in `waitTimes`
 *   that finished within the last five minutes, and
 * - a "current" EMA fed by the longest wait among requests still in `queue`.
 *
 * Deprioritized requests (and queued requests from shared IPs) are left out of
 * both samples because their longer wait times would skew the result.
 *
 * @param partition Model family whose queue partition is being estimated.
 * @returns The mean of the freshly updated historical and current EMAs.
 */
function calculateWaitTime(partition: ModelFamily): number {
  const now = Date.now();

  const recentWaits = waitTimes
    .filter((wait) => {
      const isSamePartition = wait.partition === partition;
      const isRecent = now - wait.end < 300 * 1000;
      const isNormalPriority = !wait.isDeprioritized;
      return isSamePartition && isRecent && isNormalPriority;
    })
    .map((wait) => wait.end - wait.start);
  const recentAverage = recentWaits.length
    ? recentWaits.reduce((sum, wait) => sum + wait, 0) / recentWaits.length
    : 0;

  const previousHistoricalEma = historicalEmas.get(partition) ?? 0;
  const historicalEma =
    ALPHA_HISTORICAL * recentAverage +
    (1 - ALPHA_HISTORICAL) * previousHistoricalEma;
  historicalEmas.set(partition, historicalEma);

  const currentWaits = queue
    .filter((req) => {
      const isSamePartition = getPartitionForRequest(req) === partition;
      const isNormalPriority = !isFromSharedIp(req);
      return isSamePartition && isNormalPriority;
    })
    .map((req) => now - req.startTime!);
  // Fold instead of Math.max(...currentWaits, 0): spreading a very long queue
  // into call arguments can exceed the engine's argument-count limit.
  const longestCurrentWait = currentWaits.reduce(
    (longest, wait) => Math.max(longest, wait),
    0
  );

  const previousCurrentEma = currentEmas.get(partition) ?? 0;
  const currentEma =
    ALPHA_CURRENT * longestCurrentWait +
    (1 - ALPHA_CURRENT) * previousCurrentEma;
  currentEmas.set(partition, currentEma);

  // Return the values just stored, not the pre-update ones, so the published
  // estimate includes the sample taken on this call.
  return (historicalEma + currentEma) / 2;
}
|
||||
|
||||
// Periodically recompute and publish the wait estimate for every model family.
// The timer is created when this module is evaluated and fires every
// WAIT_TIME_INTERVAL milliseconds.
setInterval(function refreshEstimates() {
  for (const family of MODEL_FAMILIES) {
    const estimate = calculateWaitTime(family);
    estimates.set(family, estimate);
  }
}, WAIT_TIME_INTERVAL);
|
||||
|
||||
export function getQueueLength(partition: ModelFamily | "all" = "all") {
|
||||
if (partition === "all") {
|
||||
return queue.length;
|
||||
|
|
Loading…
Reference in New Issue