fix
This commit is contained in:
parent
7cb624c5f5
commit
364b795268
|
@ -26,6 +26,7 @@ class RedisPriorityQueue:
|
||||||
self.pubsub.subscribe('events')
|
self.pubsub.subscribe('events')
|
||||||
|
|
||||||
def put(self, item, priority, selected_model):
|
def put(self, item, priority, selected_model):
|
||||||
|
event = DataEvent()
|
||||||
# Check if the IP is already in the dictionary and if it has reached the limit
|
# Check if the IP is already in the dictionary and if it has reached the limit
|
||||||
ip_count = self.redis.hget('queued_ip_count', item[1])
|
ip_count = self.redis.hget('queued_ip_count', item[1])
|
||||||
if ip_count:
|
if ip_count:
|
||||||
|
@ -35,7 +36,6 @@ class RedisPriorityQueue:
|
||||||
print(f'Rejecting request from {item[1]} - {ip_count} requests in progress.')
|
print(f'Rejecting request from {item[1]} - {ip_count} requests in progress.')
|
||||||
return None # reject the request
|
return None # reject the request
|
||||||
|
|
||||||
event = DataEvent()
|
|
||||||
self.redis.zadd('queue', {json.dumps((item, event.event_id, selected_model)): -priority})
|
self.redis.zadd('queue', {json.dumps((item, event.event_id, selected_model)): -priority})
|
||||||
self.increment_ip_count(item[1], 'queued_ip_count')
|
self.increment_ip_count(item[1], 'queued_ip_count')
|
||||||
return event
|
return event
|
||||||
|
@ -46,13 +46,7 @@ class RedisPriorityQueue:
|
||||||
if data:
|
if data:
|
||||||
item = json.loads(data[0][0])
|
item = json.loads(data[0][0])
|
||||||
client_ip = item[0][1]
|
client_ip = item[0][1]
|
||||||
|
|
||||||
b = self.redis.hget('queued_ip_count', item[1])
|
|
||||||
self.decrement_ip_count(client_ip, 'queued_ip_count')
|
self.decrement_ip_count(client_ip, 'queued_ip_count')
|
||||||
a = self.redis.hget('queued_ip_count', item[1])
|
|
||||||
|
|
||||||
print(item[1], a, b)
|
|
||||||
|
|
||||||
return item
|
return item
|
||||||
time.sleep(0.1) # wait for something to be added to the queue
|
time.sleep(0.1) # wait for something to be added to the queue
|
||||||
|
|
||||||
|
@ -68,7 +62,12 @@ class RedisPriorityQueue:
|
||||||
return self.redis.zcard('queue')
|
return self.redis.zcard('queue')
|
||||||
|
|
||||||
def len(self, model_name):
|
def len(self, model_name):
|
||||||
self.redis.zrange('queue', 0, -1)
|
count = 0
|
||||||
|
for key in self.redis.zrange('queue', 0, -1):
|
||||||
|
item = json.loads(key)
|
||||||
|
if item[2] == model_name:
|
||||||
|
count += 1
|
||||||
|
return count
|
||||||
|
|
||||||
def get_queued_ip_count(self, client_ip: str):
|
def get_queued_ip_count(self, client_ip: str):
|
||||||
q = self.redis.hget('queued_ip_count', client_ip)
|
q = self.redis.hget('queued_ip_count', client_ip)
|
||||||
|
|
Reference in New Issue