2023-08-23 20:12:38 -06:00
|
|
|
import heapq
|
|
|
|
import threading
|
2023-08-23 21:33:52 -06:00
|
|
|
import time
|
2023-08-23 20:12:38 -06:00
|
|
|
|
|
|
|
from llm_server.llm.generator import generator
|
2023-08-23 22:01:06 -06:00
|
|
|
from llm_server.routes.cache import redis
|
2023-08-23 21:33:52 -06:00
|
|
|
from llm_server.routes.stats import generation_elapsed, generation_elapsed_lock
|
2023-08-23 20:12:38 -06:00
|
|
|
|
|
|
|
|
|
|
|
class PriorityQueue:
    """Blocking priority queue backed by a heap and a condition variable.

    Entries with a larger ``priority`` are handed out first; entries with the
    same priority come out in the order they were inserted.
    """

    def __init__(self):
        # Heap of ``(-priority, insertion index, item, event)`` tuples.
        self._queue = []
        # Monotonic insertion counter; breaks ties between equal priorities so
        # the heap never has to compare the items themselves.
        self._index = 0
        # Guards the heap and lets ``get`` sleep until ``put`` adds an entry.
        self._cv = threading.Condition()

    def put(self, item, priority):
        """Enqueue *item* with *priority* and return its ``DataEvent``.

        The returned event is stored in the heap entry next to the item, so
        whoever pops the entry receives the same event object.
        """
        completion = DataEvent()
        with self._cv:
            # heapq is a min-heap: negating the priority makes the largest
            # priority pop first.
            entry = (-priority, self._index, item, completion)
            heapq.heappush(self._queue, entry)
            self._index += 1
            self._cv.notify()
        return completion

    def get(self):
        """Block until an entry exists, then pop and return it.

        Returns the raw heap entry ``(-priority, index, item, event)``.
        """
        with self._cv:
            while not self._queue:
                self._cv.wait()
            return heapq.heappop(self._queue)

    def __len__(self):
        """Return the number of entries currently waiting in the queue."""
        return len(self._queue)
|
2023-08-23 20:12:38 -06:00
|
|
|
|
|
|
|
|
|
|
|
# Module-level queue instance; worker() pulls its requests from here.
priority_queue = PriorityQueue()
|
|
|
|
|
|
|
|
|
|
|
|
class DataEvent(threading.Event):
    """A ``threading.Event`` that also carries a payload in ``data``.

    ``data`` starts out as ``None``; the code that sets the event is expected
    to fill it in first so the waiter can read it after waking up.
    """

    def __init__(self):
        super(DataEvent, self).__init__()
        # Payload slot; worker() stores a (success, response, error_msg) tuple.
        self.data: tuple | None = None
|
|
|
|
|
|
|
|
|
|
|
|
def worker():
    """Serve queued generation requests forever, one at a time.

    Each iteration blocks on ``priority_queue.get()``, passes the request body
    to ``generator``, appends ``(end_time, elapsed_time)`` to
    ``generation_elapsed`` under ``generation_elapsed_lock``, stores
    ``(success, response, error_msg)`` on the entry's event and sets it.

    The Redis key ``active_gen_workers`` is incremented while a request is in
    flight and is always decremented again, even if processing raises. An
    exception from ``generator`` is logged and reported to the waiter as a
    failed result, so the waiter is not left blocked and this thread keeps
    serving later requests.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    logger = logging.getLogger(__name__)

    while True:
        # Only the request body and the event are used here; the remaining
        # fields of the queue entry are unpacked to document its shape.
        _priority, _index, (request_json_body, _client_ip, _token, _parameters), event = priority_queue.get()

        redis.incr('active_gen_workers')
        try:
            start_time = time.time()
            try:
                success, response, error_msg = generator(request_json_body)
            except Exception as exc:
                # Thread boundary: log the failure and turn it into a result.
                # NOTE(review): mirrors the 3-tuple generator returns; confirm
                # that consumers of event.data accept response=None.
                logger.exception('generator raised while handling a queued request')
                success, response, error_msg = False, None, f'{type(exc).__name__}: {exc}'
            end_time = time.time()

            elapsed_time = end_time - start_time
            with generation_elapsed_lock:
                generation_elapsed.append((end_time, elapsed_time))

            event.data = (success, response, error_msg)
            event.set()
        finally:
            # Release the "busy" marker no matter how the request ended.
            redis.decr('active_gen_workers')
|
|
|
|
|
2023-08-23 20:12:38 -06:00
|
|
|
|
|
|
|
def start_workers(num_workers: int):
    """Create and start ``num_workers`` threads, each running ``worker``.

    The threads are started immediately and are not joined or returned.
    """
    for _slot in range(num_workers):
        thread = threading.Thread(target=worker)
        thread.start()
|