local-llm-server/llm_server/workers/moderator.py

52 lines
1.4 KiB
Python

import json
import threading
import traceback
import redis as redis_redis
from llm_server.llm.openai.moderation import check_moderation_endpoint
redis_moderation = redis_redis.Redis()
def start_moderation_workers(num_workers):
i = 0
for _ in range(num_workers):
t = threading.Thread(target=moderation_worker)
t.daemon = True
t.start()
i += 1
print(f'Started {i} moderation workers.')
def moderation_worker():
while True:
result = redis_moderation.blpop('queue:msgs_to_check')
try:
msg, tag = json.loads(result[1])
_, categories = check_moderation_endpoint(msg)
redis_moderation.rpush('queue:flagged_categories', json.dumps((tag, categories)))
except:
print(result)
traceback.print_exc()
continue
def add_moderation_task(msg, tag):
redis_moderation.rpush('queue:msgs_to_check', json.dumps((msg, str(tag))))
def get_results(tag, num_tasks):
tag = str(tag) # Required for comparison with Redis results.
flagged_categories = set()
num_results = 0
while num_results < num_tasks:
result = redis_moderation.blpop('queue:flagged_categories')
result_tag, categories = json.loads(result[1])
if result_tag == tag:
if categories:
for item in categories:
flagged_categories.add(item)
num_results += 1
return list(flagged_categories)