import heapq
import threading
import time

from llm_server.llm.generator import generator
from llm_server.routes.stats import generation_elapsed, generation_elapsed_lock


class DataEvent(threading.Event):
    """An Event that also carries the worker's result back to the waiter."""

    def __init__(self):
        super().__init__()
        self.data = None


class PriorityQueue:
    def __init__(self):
        self._queue = []
        self._index = 0  # monotonic tiebreaker so equal priorities stay FIFO
        self._cv = threading.Condition()

    def put(self, item, priority):
        event = DataEvent()
        with self._cv:
            # heapq is a min-heap, so negate the priority to pop the
            # highest-priority item first.
            heapq.heappush(self._queue, (-priority, self._index, item, event))
            self._index += 1
            self._cv.notify()
        return event

    def get(self):
        with self._cv:
            while not self._queue:
                self._cv.wait()
            return heapq.heappop(self._queue)

    def __len__(self):
        with self._cv:
            return len(self._queue)


priority_queue = PriorityQueue()


def worker():
    while True:
        priority, index, (request_json_body, client_ip, token, parameters), event = priority_queue.get()
        start_time = time.time()
        success, response, error_msg = generator(request_json_body)
        end_time = time.time()
        elapsed_time = end_time - start_time
        with generation_elapsed_lock:
            generation_elapsed.append((end_time, elapsed_time))
        # Hand the result back to the thread that queued the request.
        event.data = (success, response, error_msg)
        event.set()


def start_workers(num_workers: int):
    for _ in range(num_workers):
        # Daemon threads, so idle workers don't block interpreter shutdown.
        threading.Thread(target=worker, daemon=True).start()
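
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module): a route
# handler can enqueue its payload and block on the returned DataEvent until
# a worker has run the generator. The helper name and default priority are
# assumptions; the payload tuple must match what worker() unpacks.
def enqueue_and_wait(request_json_body, client_ip, token, parameters, priority=1):
    event = priority_queue.put((request_json_body, client_ip, token, parameters), priority)
    event.wait()  # blocks until a worker calls event.set()
    success, response, error_msg = event.data
    return success, response, error_msg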