import json import threading import traceback import redis as redis_redis from llm_server.llm.openai.moderation import check_moderation_endpoint redis_moderation = redis_redis.Redis() def start_moderation_workers(num_workers): i = 0 for _ in range(num_workers): t = threading.Thread(target=moderation_worker) t.daemon = True t.start() i += 1 print(f'Started {i} moderation workers.') def moderation_worker(): while True: result = redis_moderation.blpop('queue:msgs_to_check') try: msg, tag = json.loads(result[1]) _, categories = check_moderation_endpoint(msg) redis_moderation.rpush('queue:flagged_categories', json.dumps((tag, categories))) except: print(result) traceback.print_exc() continue def add_moderation_task(msg, tag): redis_moderation.rpush('queue:msgs_to_check', json.dumps((msg, str(tag)))) def get_results(tag, num_tasks): tag = str(tag) # Required for comparison with Redis results. flagged_categories = set() num_results = 0 while num_results < num_tasks: result = redis_moderation.blpop('queue:flagged_categories') result_tag, categories = json.loads(result[1]) if result_tag == tag: if categories: for item in categories: flagged_categories.add(item) num_results += 1 return list(flagged_categories)