2023-10-27 19:19:22 -06:00
|
|
|
import time
|
|
|
|
|
|
|
|
from redis import Redis
|
|
|
|
|
2024-05-07 09:48:51 -06:00
|
|
|
from llm_server.logging import create_logger
|
2023-10-27 19:19:22 -06:00
|
|
|
from llm_server.workers.inferencer import STREAM_NAME_PREFIX
|
|
|
|
|
|
|
|
|
|
|
|
# NOT NEEDED
|
|
|
|
|
|
|
|
def cleaner():
|
|
|
|
r = Redis(db=8)
|
|
|
|
stream_info = {}
|
2024-05-07 09:48:51 -06:00
|
|
|
logger = create_logger('cleaner')
|
2023-10-27 19:19:22 -06:00
|
|
|
|
|
|
|
while True:
|
|
|
|
all_streams = r.keys(f'{STREAM_NAME_PREFIX}:*')
|
|
|
|
processed_streams = []
|
|
|
|
for stream in all_streams:
|
|
|
|
stream = stream.decode()
|
|
|
|
current_size = r.xlen(stream)
|
|
|
|
|
|
|
|
# If the stream is new or its size has changed, update the size and time in the dictionary
|
|
|
|
if stream not in stream_info or current_size != stream_info[stream]['size']:
|
|
|
|
stream_info[stream] = {'size': current_size, 'time': time.time()}
|
|
|
|
processed_streams.append(stream)
|
|
|
|
else:
|
|
|
|
# If the size hasn't changed for 5 minutes, delete the stream
|
|
|
|
if time.time() - stream_info[stream]['time'] >= 300:
|
|
|
|
r.delete(stream)
|
2024-05-07 09:48:51 -06:00
|
|
|
logger.debug(f"Stream '{stream}' deleted due to inactivity.")
|
2023-10-27 19:19:22 -06:00
|
|
|
del stream_info[stream]
|
|
|
|
|
|
|
|
time.sleep(60)
|