175 lines
6.9 KiB
Python
175 lines
6.9 KiB
Python
import json
|
|
import time
|
|
import traceback
|
|
from datetime import datetime, timedelta
|
|
from typing import Union
|
|
|
|
from llm_server.cluster.cluster_config import cluster_config
|
|
from llm_server.config.global_config import GlobalConfig
|
|
from llm_server.database.conn import CursorFromConnectionFromPool
|
|
from llm_server.llm import get_token_count
|
|
|
|
|
|
def do_db_log(
        ip: str,
        token: str,
        prompt: str,
        response: Union[str, dict, None],
        gen_time: Union[int, float, None],
        parameters: dict,
        headers: dict,
        backend_response_code: int,
        request_url: str,
        backend_url: str,
        response_tokens: Union[int, None] = None,
        is_error: bool = False,
):
    """
    Record one request/response exchange as a row in the `messages` table.

    :param ip: client IP stored with the row.
    :param token: API token of the caller; when truthy its `uses` counter is
        incremented via `increment_token_uses`.
    :param prompt: prompt text. Must be a `str`. Stored as NULL when the
        `log_prompts` config flag is falsy.
    :param response: backend response. A dict, or a JSON string, carrying a
        truthy `results` entry is reduced to `results[0]['text']`. Stored as
        NULL when `log_prompts` is falsy and this is not an error.
    :param gen_time: generation time; rounded to 3 decimals, and forced to
        NULL for errors.
    :param parameters: request parameters, stored JSON-encoded.
    :param headers: request headers, stored JSON-encoded.
    :param backend_response_code: stored in the `response_status` column.
    :param request_url: stored as-is.
    :param backend_url: backend identifier. Must be a `str`. Used to look up
        the backend's model/mode and to count tokens.
    :param response_tokens: pre-computed response token count; when falsy it
        is computed with `get_token_count` (errors always record 0).
    :param is_error: whether this exchange is an error.
    :raises AssertionError: if `prompt` or `backend_url` is not a `str`.
    """
    assert isinstance(prompt, str)
    assert isinstance(backend_url, str)

    # Try not to shove JSON into the database.
    if isinstance(response, dict) and response.get('results'):
        response = response['results'][0]['text']
    # Best-effort unwrapping of a JSON-encoded payload. Only the failures that
    # mean "this is not the JSON shape we know" are tolerated, so interpreter
    # level signals (KeyboardInterrupt, SystemExit) are no longer swallowed.
    if isinstance(response, (str, bytes, bytearray)):
        try:
            parsed = json.loads(response)
        except (ValueError, RecursionError):
            # Plain text (or unparseable JSON): keep the response untouched.
            parsed = None
        if isinstance(parsed, dict) and parsed.get('results'):
            try:
                response = parsed['results'][0]['text']
            except (KeyError, IndexError, TypeError):
                # Unexpected payload layout: keep the raw response.
                pass

    # Token counts are taken before the texts are possibly blanked out below.
    prompt_tokens = get_token_count(prompt, backend_url)

    if is_error:
        response_tokens = 0
    elif not response_tokens:
        response_tokens = get_token_count(response, backend_url)

    # Sometimes we may want to insert null into the DB, but
    # usually we want to insert a float.
    if gen_time is not None:
        gen_time = round(gen_time, 3)
    if is_error:
        gen_time = None

    if not GlobalConfig.get().log_prompts:
        prompt = None
        if not is_error:
            # Error responses are kept even when prompt logging is disabled.
            # TODO: test and verify this works as expected
            response = None

    if token:
        increment_token_uses(token)

    backend_info = cluster_config.get_backend(backend_url)
    running_model = backend_info.get('model')
    backend_mode = backend_info['mode']
    timestamp = datetime.now()
    with CursorFromConnectionFromPool() as cursor:
        cursor.execute("""
            INSERT INTO messages
            (ip, token, model, backend_mode, backend_url, request_url, generation_time, prompt, prompt_tokens, response, response_tokens, response_status, parameters, headers, timestamp)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """,
                       (ip, token, running_model, backend_mode, backend_url, request_url, gen_time, prompt, prompt_tokens, response, response_tokens, backend_response_code, json.dumps(parameters), json.dumps(headers), timestamp))
|
|
|
|
|
|
def is_valid_api_key(api_key: str):
    """
    Tell whether `api_key` names a usable row in `token_auth`.

    A key is usable when a row exists for it and all of the following hold:
    its use quota is not exhausted (`uses < max_uses`, or either value is
    NULL), it has not expired (`expire` is NULL or later than `time.time()`),
    and it is not disabled.

    :param api_key: the token to look up.
    :return: True if the key may be used, otherwise False.
    """
    lookup_sql = "SELECT token, uses, max_uses, expire, disabled FROM token_auth WHERE token = %s"
    with CursorFromConnectionFromPool() as cursor:
        cursor.execute(lookup_sql, (api_key,))
        record = cursor.fetchone()
        if record is None:
            # Unknown token.
            return False

        _token, uses, max_uses, expire, disabled_flag = record
        is_disabled = bool(disabled_flag)

        # The checks run in the same order as one short-circuiting condition:
        # quota first, then expiry, then the disabled flag.
        quota_is_unlimited = uses is None or max_uses is None
        if not quota_is_unlimited and not uses < max_uses:
            return False
        if expire is not None and not expire > time.time():
            return False
        return not is_disabled
|
|
|
|
|
|
def is_api_key_moderated(api_key):
    """
    Decide whether OpenAI moderation applies to requests made with `api_key`.

    When a key is given and has a row in `token_auth`, that row's
    `openai_moderation_enabled` column decides. In every other case (no key,
    or a key without a row) the global `openai_moderation_enabled` setting
    is returned.

    :param api_key: the caller's token; may be empty or None.
    :return: the per-token flag as a bool, or the global config value.
    """
    if api_key:
        moderation_sql = "SELECT openai_moderation_enabled FROM token_auth WHERE token = %s"
        with CursorFromConnectionFromPool() as cursor:
            cursor.execute(moderation_sql, (api_key,))
            record = cursor.fetchone()
            if record is not None:
                return bool(record[0])
    # No key supplied, or the key is unknown: use the server-wide default.
    return GlobalConfig.get().openai_moderation_enabled
|
|
|
|
|
|
def get_number_of_rows(table_name):
    """
    Count the rows of `table_name` whose token is NULL or does not match the
    `SYSTEM__` prefix pattern.

    `table_name` is interpolated straight into the SQL text, so it must be a
    trusted identifier and never user input.

    :param table_name: name of the table to count.
    :return: the first column of the single row produced by `COUNT(*)`.
    """
    count_sql = f"SELECT COUNT(*) FROM {table_name} WHERE token NOT LIKE 'SYSTEM__%%' OR token IS NULL"
    with CursorFromConnectionFromPool() as cursor:
        cursor.execute(count_sql)
        count_row = cursor.fetchone()
        return count_row[0]
|
|
|
|
|
|
def average_column(table_name, column_name):
    """
    Average `column_name` over the rows of `table_name` whose token is NULL
    or does not match the `SYSTEM__` prefix pattern.

    Both identifiers are interpolated straight into the SQL text, so they
    must be trusted values and never user input.

    :param table_name: table to read from.
    :param column_name: column to average.
    :return: the first column of the single row produced by `AVG(...)`.
    """
    average_sql = f"SELECT AVG({column_name}) FROM {table_name} WHERE token NOT LIKE 'SYSTEM__%%' OR token IS NULL"
    with CursorFromConnectionFromPool() as cursor:
        cursor.execute(average_sql)
        average_row = cursor.fetchone()
        return average_row[0]
|
|
|
|
|
|
def average_column_for_model(table_name, column_name, model_name):
    """
    Average `column_name` over the rows of `table_name` that belong to
    `model_name`, ignoring rows logged under a `SYSTEM__` token.

    `table_name` and `column_name` are interpolated straight into the SQL
    text, so they must be trusted identifiers; `model_name` is passed as a
    bound parameter.

    :param table_name: table to read from.
    :param column_name: column to average.
    :param model_name: value the `model` column must equal.
    :return: the first column of the single row produced by `AVG(...)`.
    """
    # The token filter must be parenthesised: AND binds tighter than OR, so
    # without the parentheses every row with a NULL token would be included
    # regardless of its model.
    sql = f"SELECT AVG({column_name}) FROM {table_name} WHERE model = %s AND (token NOT LIKE 'SYSTEM__%%' OR token IS NULL)"
    with CursorFromConnectionFromPool() as cursor:
        cursor.execute(sql, (model_name,))
        result = cursor.fetchone()
        return result[0]
|
|
|
|
|
|
def weighted_average_column_for_model(table_name, column_name, model_name, backend_name, backend_url, exclude_zeros: bool = False, include_system_tokens: bool = True):
    """
    Compute a position-weighted average of `column_name` for one
    model/backend combination.

    Rows are fetched ordered by `id DESC`; the row at 0-based position `i`
    in that ordering gets weight `i + 1`. Rows whose value is NULL (and,
    with `exclude_zeros`, rows whose value equals 0) are skipped, but they
    still occupy a position.

    NOTE(review): with this scheme rows with lower ids receive the larger
    weights - confirm that this is the intended direction.

    `table_name` and `column_name` are interpolated straight into the SQL
    text and must be trusted identifiers.

    :param table_name: table to read from.
    :param column_name: column to average.
    :param model_name: value the `model` column must equal.
    :param backend_name: value the `backend_mode` column must equal.
    :param backend_url: value the `backend_url` column must equal.
    :param exclude_zeros: skip rows whose value equals 0.
    :param include_system_tokens: when False, rows whose token matches the
        `SYSTEM__` prefix pattern are filtered out.
    :return: the weighted average, 0 when no row contributed, or None when
        the query raised (the traceback is printed).
    """
    token_filter = "" if include_system_tokens else " AND (token NOT LIKE 'SYSTEM__%%' OR token IS NULL)"
    sql = f"SELECT {column_name}, id FROM {table_name} WHERE model = %s AND backend_mode = %s AND backend_url = %s{token_filter} ORDER BY id DESC"

    with CursorFromConnectionFromPool() as cursor:
        try:
            cursor.execute(sql, (model_name, backend_name, backend_url,))
            rows = cursor.fetchall()
        except Exception:
            traceback.print_exc()
            return None

        weight_total = 0
        weighted_value_total = 0
        for weight, (value, _row_id) in enumerate(rows, start=1):
            if value is None:
                continue
            if exclude_zeros and value == 0:
                continue
            weight_total += weight
            weighted_value_total += weight * value

        # Guard against dividing by zero when no row contributed.
        return weighted_value_total / weight_total if weight_total > 0 else 0
|
|
|
|
|
|
def sum_column(table_name, column_name):
    """
    Sum `column_name` over the rows of `table_name` whose token is NULL or
    does not match the `SYSTEM__` prefix pattern.

    Both identifiers are interpolated straight into the SQL text, so they
    must be trusted values and never user input.

    :param table_name: table to read from.
    :param column_name: column to sum.
    :return: the sum, or 0 when there is nothing to sum.
    """
    with CursorFromConnectionFromPool() as cursor:
        cursor.execute(f"SELECT SUM({column_name}) FROM {table_name} WHERE token NOT LIKE 'SYSTEM__%%' OR token IS NULL")
        result = cursor.fetchone()
        # An aggregate query still yields one row when nothing matched, with
        # SUM() being NULL - so testing the row alone would return None and
        # never reach the 0 fallback.
        if not result or result[0] is None:
            return 0
        return result[0]
|
|
|
|
|
|
def get_distinct_ips_24h():
    """
    Count the distinct client IPs seen in `messages` during the last day.

    Rows whose token matches the `SYSTEM__` prefix pattern are left out;
    rows with a NULL token are counted.

    :return: the distinct-IP count, or 0 if the query produced no row.
    """
    # Lower bound of the window: local "now" minus one day.
    window_start = datetime.now() - timedelta(hours=24)
    distinct_ip_sql = "SELECT COUNT(DISTINCT ip) FROM messages WHERE timestamp >= %s AND (token NOT LIKE 'SYSTEM__%%' OR token IS NULL)"
    with CursorFromConnectionFromPool() as cursor:
        cursor.execute(distinct_ip_sql, (window_start,))
        count_row = cursor.fetchone()
        if not count_row:
            return 0
        return count_row[0]
|
|
|
|
|
|
def increment_token_uses(token):
    """
    Add one to the `uses` counter of the `token_auth` row matching `token`.

    :param token: the token whose usage counter is bumped.
    """
    bump_uses_sql = "UPDATE token_auth SET uses = uses + 1 WHERE token = %s"
    with CursorFromConnectionFromPool() as cursor:
        cursor.execute(bump_uses_sql, (token,))
|
|
|
|
|
|
def get_token_ratelimit(token):
    """
    Look up the queue priority and per-IP concurrency limit for `token`.

    Without a token, or when the token has no row in `token_auth`, the
    defaults are returned: priority 9990 and the configured
    `simultaneous_requests_per_ip`. For a known token the stored values are
    used; a NULL `simultaneous_ip` is replaced by 999999999.

    :param token: the caller's token; may be empty or None.
    :return: a `(priority, simultaneous_ip)` tuple.
    """
    default_priority = 9990
    default_simultaneous_ip = GlobalConfig.get().simultaneous_requests_per_ip
    if not token:
        return default_priority, default_simultaneous_ip

    ratelimit_sql = "SELECT priority, simultaneous_ip FROM token_auth WHERE token = %s"
    with CursorFromConnectionFromPool() as cursor:
        cursor.execute(ratelimit_sql, (token,))
        record = cursor.fetchone()
        if not record:
            # Unknown token: same limits as an anonymous caller.
            return default_priority, default_simultaneous_ip

        priority, simultaneous_ip = record
        if simultaneous_ip is None:
            # No ratelimit for this token if null
            simultaneous_ip = 999999999
        return priority, simultaneous_ip
|