diff --git a/llm_server/routes/queue.py b/llm_server/routes/queue.py
index 8b8b85f..b9be18d 100644
--- a/llm_server/routes/queue.py
+++ b/llm_server/routes/queue.py
@@ -26,7 +26,6 @@ class RedisPriorityQueue:
         self.pubsub.subscribe('events')
 
     def put(self, item, priority, selected_model):
-        event = DataEvent()
         # Check if the IP is already in the dictionary and if it has reached the limit
         ip_count = self.redis.hget('queued_ip_count', item[1])
         if ip_count:
@@ -36,6 +35,7 @@ class RedisPriorityQueue:
             print(f'Rejecting request from {item[1]} - {ip_count} requests in progress.')
             return None # reject the request
 
+        event = DataEvent()
         self.redis.zadd('queue', {json.dumps((item, event.event_id, selected_model)): -priority})
         self.increment_ip_count(item[1], 'queued_ip_count')
         return event
@@ -46,7 +46,13 @@ class RedisPriorityQueue:
             if data:
                 item = json.loads(data[0][0])
                 client_ip = item[0][1]
+
+                b = self.redis.hget('queued_ip_count', item[1])
                 self.decrement_ip_count(client_ip, 'queued_ip_count')
+                a = self.redis.hget('queued_ip_count', item[1])
+
+                print(item[1], a, b)
+
                 return item
             time.sleep(0.1) # wait for something to be added to the queue
 
@@ -56,20 +62,13 @@ class RedisPriorityQueue:
     def decrement_ip_count(self, client_ip: str, redis_key):
         new_count = self.redis.hincrby(redis_key, client_ip, -1)
         if new_count <= 0:
-            self.redis.hdel(redis_key, client_ip)
+            self.redis.hdel(redis_key, [client_ip])
 
     def __len__(self):
         return self.redis.zcard('queue')
 
     def len(self, model_name):
-        print(self.redis.zrange('queue', 0, -1))
-
-        count = 0
-        for key in self.redis.zrange('queue', 0, -1):
-            item = json.loads(key)
-            if item[2] == model_name:
-                count += 1
-        return count
+        self.redis.zrange('queue', 0, -1)
 
     def get_queued_ip_count(self, client_ip: str):
         q = self.redis.hget('queued_ip_count', client_ip)