import json import time import traceback from datetime import datetime, timedelta from typing import Union from llm_server.cluster.cluster_config import cluster_config from llm_server.config.global_config import GlobalConfig from llm_server.database.conn import CursorFromConnectionFromPool from llm_server.llm import get_token_count def do_db_log(ip: str, token: str, prompt: str, response: Union[str, None], gen_time: Union[int, float, None], parameters: dict, headers: dict, backend_response_code: int, request_url: str, backend_url: str, response_tokens: int = None, is_error: bool = False): assert isinstance(prompt, str) assert isinstance(backend_url, str) # Try not to shove JSON into the database. if isinstance(response, dict) and response.get('results'): response = response['results'][0]['text'] try: j = json.loads(response) if j.get('results'): response = j['results'][0]['text'] except: pass prompt_tokens = get_token_count(prompt, backend_url) if not is_error: if not response_tokens: response_tokens = get_token_count(response, backend_url) else: response_tokens = 0 # Sometimes we may want to insert null into the DB, but # usually we want to insert a float. if gen_time is not None: gen_time = round(gen_time, 3) if is_error: gen_time = None if not GlobalConfig.get().log_prompts: prompt = None if not GlobalConfig.get().log_prompts and not is_error: # TODO: test and verify this works as expected response = None if token: increment_token_uses(token) backend_info = cluster_config.get_backend(backend_url) running_model = backend_info.get('model') backend_mode = backend_info['mode'] timestamp = datetime.now() with CursorFromConnectionFromPool() as cursor: cursor.execute(""" INSERT INTO messages (ip, token, model, backend_mode, backend_url, request_url, generation_time, prompt, prompt_tokens, response, response_tokens, response_status, parameters, headers, timestamp) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, (ip, token, running_model, backend_mode, backend_url, request_url, gen_time, prompt, prompt_tokens, response, response_tokens, backend_response_code, json.dumps(parameters), json.dumps(headers), timestamp)) def is_valid_api_key(api_key: str): with CursorFromConnectionFromPool() as cursor: cursor.execute("SELECT token, uses, max_uses, expire, disabled FROM token_auth WHERE token = %s", (api_key,)) row = cursor.fetchone() if row is not None: token, uses, max_uses, expire, disabled = row disabled = bool(disabled) if ((uses is None or max_uses is None) or uses < max_uses) and (expire is None or expire > time.time()) and not disabled: return True return False def is_api_key_moderated(api_key): if not api_key: return GlobalConfig.get().openai_moderation_enabled with CursorFromConnectionFromPool() as cursor: cursor.execute("SELECT openai_moderation_enabled FROM token_auth WHERE token = %s", (api_key,)) row = cursor.fetchone() if row is not None: return bool(row[0]) return GlobalConfig.get().openai_moderation_enabled def get_number_of_rows(table_name): with CursorFromConnectionFromPool() as cursor: cursor.execute(f"SELECT COUNT(*) FROM {table_name} WHERE token NOT LIKE 'SYSTEM__%%' OR token IS NULL") result = cursor.fetchone() return result[0] def average_column(table_name, column_name): with CursorFromConnectionFromPool() as cursor: cursor.execute(f"SELECT AVG({column_name}) FROM {table_name} WHERE token NOT LIKE 'SYSTEM__%%' OR token IS NULL") result = cursor.fetchone() return result[0] def average_column_for_model(table_name, column_name, model_name): with CursorFromConnectionFromPool() as cursor: cursor.execute(f"SELECT AVG({column_name}) FROM {table_name} WHERE model = %s AND token NOT LIKE 'SYSTEM__%%' OR token IS NULL", (model_name,)) result = cursor.fetchone() return result[0] def weighted_average_column_for_model(table_name, column_name, model_name, backend_name, backend_url, exclude_zeros: bool = False, include_system_tokens: bool = True): if include_system_tokens: sql = f"SELECT {column_name}, id FROM {table_name} WHERE model = %s AND backend_mode = %s AND backend_url = %s ORDER BY id DESC" else: sql = f"SELECT {column_name}, id FROM {table_name} WHERE model = %s AND backend_mode = %s AND backend_url = %s AND (token NOT LIKE 'SYSTEM__%%' OR token IS NULL) ORDER BY id DESC" with CursorFromConnectionFromPool() as cursor: try: cursor.execute(sql, (model_name, backend_name, backend_url,)) results = cursor.fetchall() except Exception: traceback.print_exc() return None total_weight = 0 weighted_sum = 0 for i, (value, rowid) in enumerate(results): if value is None or (exclude_zeros and value == 0): continue weight = i + 1 total_weight += weight weighted_sum += weight * value if total_weight > 0: # Avoid division by zero calculated_avg = weighted_sum / total_weight else: calculated_avg = 0 return calculated_avg def sum_column(table_name, column_name): with CursorFromConnectionFromPool() as cursor: cursor.execute(f"SELECT SUM({column_name}) FROM {table_name} WHERE token NOT LIKE 'SYSTEM__%%' OR token IS NULL") result = cursor.fetchone() return result[0] if result else 0 def get_distinct_ips_24h(): # Get the current time and subtract 24 hours (in seconds) past_24_hours = datetime.now() - timedelta(days=1) with CursorFromConnectionFromPool() as cursor: cursor.execute("SELECT COUNT(DISTINCT ip) FROM messages WHERE timestamp >= %s AND (token NOT LIKE 'SYSTEM__%%' OR token IS NULL)", (past_24_hours,)) result = cursor.fetchone() return result[0] if result else 0 def increment_token_uses(token): with CursorFromConnectionFromPool() as cursor: cursor.execute('UPDATE token_auth SET uses = uses + 1 WHERE token = %s', (token,)) def get_token_ratelimit(token): priority = 9990 simultaneous_ip = GlobalConfig.get().simultaneous_requests_per_ip if token: with CursorFromConnectionFromPool() as cursor: cursor.execute("SELECT priority, simultaneous_ip FROM token_auth WHERE token = %s", (token,)) result = cursor.fetchone() if result: priority, simultaneous_ip = result if simultaneous_ip is None: # No ratelimit for this token if null simultaneous_ip = 999999999 return priority, simultaneous_ip