Merge cluster to master #3

Merged
cyberes merged 163 commits from cluster into master 2023-10-27 19:19:22 -06:00
1 changed files with 9 additions and 10 deletions
Showing only changes of commit 1b21cb69c1 - Show all commits

View File

@ -26,7 +26,6 @@ class RedisPriorityQueue:
self.pubsub.subscribe('events') self.pubsub.subscribe('events')
def put(self, item, priority, selected_model): def put(self, item, priority, selected_model):
event = DataEvent()
# Check if the IP is already in the dictionary and if it has reached the limit # Check if the IP is already in the dictionary and if it has reached the limit
ip_count = self.redis.hget('queued_ip_count', item[1]) ip_count = self.redis.hget('queued_ip_count', item[1])
if ip_count: if ip_count:
@ -36,6 +35,7 @@ class RedisPriorityQueue:
print(f'Rejecting request from {item[1]} - {ip_count} requests in progress.') print(f'Rejecting request from {item[1]} - {ip_count} requests in progress.')
return None # reject the request return None # reject the request
event = DataEvent()
self.redis.zadd('queue', {json.dumps((item, event.event_id, selected_model)): -priority}) self.redis.zadd('queue', {json.dumps((item, event.event_id, selected_model)): -priority})
self.increment_ip_count(item[1], 'queued_ip_count') self.increment_ip_count(item[1], 'queued_ip_count')
return event return event
@ -46,7 +46,13 @@ class RedisPriorityQueue:
if data: if data:
item = json.loads(data[0][0]) item = json.loads(data[0][0])
client_ip = item[0][1] client_ip = item[0][1]
b = self.redis.hget('queued_ip_count', item[1])
self.decrement_ip_count(client_ip, 'queued_ip_count') self.decrement_ip_count(client_ip, 'queued_ip_count')
a = self.redis.hget('queued_ip_count', item[1])
print(item[1], a, b)
return item return item
time.sleep(0.1) # wait for something to be added to the queue time.sleep(0.1) # wait for something to be added to the queue
@ -56,20 +62,13 @@ class RedisPriorityQueue:
def decrement_ip_count(self, client_ip: str, redis_key): def decrement_ip_count(self, client_ip: str, redis_key):
new_count = self.redis.hincrby(redis_key, client_ip, -1) new_count = self.redis.hincrby(redis_key, client_ip, -1)
if new_count <= 0: if new_count <= 0:
self.redis.hdel(redis_key, client_ip) self.redis.hdel(redis_key, [client_ip])
def __len__(self): def __len__(self):
return self.redis.zcard('queue') return self.redis.zcard('queue')
def len(self, model_name): def len(self, model_name):
print(self.redis.zrange('queue', 0, -1)) self.redis.zrange('queue', 0, -1)
count = 0
for key in self.redis.zrange('queue', 0, -1):
item = json.loads(key)
if item[2] == model_name:
count += 1
return count
def get_queued_ip_count(self, client_ip: str): def get_queued_ip_count(self, client_ip: str):
q = self.redis.hget('queued_ip_count', client_ip) q = self.redis.hget('queued_ip_count', client_ip)