diff --git a/llm_server/cluster/model_choices.py b/llm_server/cluster/model_choices.py
index 3df3aea..1aaefca 100644
--- a/llm_server/cluster/model_choices.py
+++ b/llm_server/cluster/model_choices.py
@@ -23,12 +23,14 @@ def get_model_choices(regen: bool = False):
         context_size = []
         avg_gen_per_worker = []
+        concurrent_gens = 0
         for backend_url in b:
             backend_info = cluster_config.get_backend(backend_url)
             if backend_info.get('model_config'):
                 context_size.append(backend_info['model_config']['max_position_embeddings'])
             if backend_info.get('average_generation_elapsed_sec'):
                 avg_gen_per_worker.append(backend_info['average_generation_elapsed_sec'])
+            concurrent_gens += backend_info['concurrent_gens']
 
         active_gen_workers = get_active_gen_workers_model(model)
         proompters_in_queue = priority_queue.len(model)
@@ -37,16 +39,10 @@ def get_model_choices(regen: bool = False):
             average_generation_elapsed_sec = np.average(avg_gen_per_worker)
         else:
             average_generation_elapsed_sec = 0
-        estimated_wait_sec = calculate_wait_time(average_generation_elapsed_sec, proompters_in_queue, opts.concurrent_gens, active_gen_workers)
-
-        if proompters_in_queue == 0 and active_gen_workers >= opts.concurrent_gens:
-            # There will be a wait if the queue is empty but prompts are processing, but we don't
-            # know how long.
-            estimated_wait_sec = f"less than {estimated_wait_sec} seconds"
-        else:
-            estimated_wait_sec = f"{estimated_wait_sec} seconds"
+        estimated_wait_sec = calculate_wait_time(average_generation_elapsed_sec, proompters_in_queue, concurrent_gens, active_gen_workers)
 
         model_choices[model] = {
+            'model': model,
            'client_api': f'https://{base_client_api}/{model}',
            'ws_client_api': f'wss://{base_client_api}/{model}/v1/stream' if opts.enable_streaming else None,
            'openai_client_api': f'https://{base_client_api}/openai/{model}' if opts.enable_openi_compatible_backend else 'disabled',
@@ -55,6 +51,7 @@ def get_model_choices(regen: bool = False):
            'queued': proompters_in_queue,
            'processing': active_gen_workers,
            'avg_generation_time': average_generation_elapsed_sec,
+            'concurrent_gens': concurrent_gens
         }
 
         if len(context_size):
@@ -63,30 +60,8 @@ def get_model_choices(regen: bool = False):
 
     # Python wants to sort lowercase vs. uppercase letters differently.
     model_choices = dict(sorted(model_choices.items(), key=lambda item: item[0].upper()))
-    default_backend = get_a_cluster_backend()
-    default_backend_dict = {}
-    if default_backend:
-        default_backend_info = cluster_config.get_backend(default_backend)
-        default_context_size = default_backend_info['model_config']['max_position_embeddings']
-        default_average_generation_elapsed_sec = default_backend_info.get('average_generation_elapsed_sec')
-        default_active_gen_workers = redis.get(f'active_gen_workers:{default_backend}', dtype=int, default=0)
-        default_proompters_in_queue = priority_queue.len(default_backend_info['model'])
-        default_estimated_wait_sec = calculate_wait_time(default_average_generation_elapsed_sec, default_proompters_in_queue, default_backend_info['concurrent_gens'], default_active_gen_workers)
+    default_backend_url = get_a_cluster_backend()
+    default_model = cluster_config.get_backend(default_backend_url)['model']
 
-        default_backend_dict = {
-            'client_api': f'https://{base_client_api}',
-            'ws_client_api': f'wss://{base_client_api}/v1/stream' if opts.enable_streaming else None,
-            'openai_client_api': f'https://{base_client_api}/openai' if opts.enable_openi_compatible_backend else 'disabled',
-            'estimated_wait': default_estimated_wait_sec,
-            'queued': default_proompters_in_queue,
-            'processing': default_active_gen_workers,
-            'context_size': default_context_size,
-            'hash': default_backend_info['hash'],
-            'model': default_backend_info['model'],
-            'avg_generation_time': default_average_generation_elapsed_sec,
-            'online': True
-        }
-
-    redis.setp('model_choices', (model_choices, default_backend_dict))
-
-    return model_choices, default_backend_dict
+    redis.setp('model_choices', (model_choices, default_model))
+    return model_choices, default_model
diff --git a/llm_server/config/config.py b/llm_server/config/config.py
index 5308827..b33a9f2 100644
--- a/llm_server/config/config.py
+++ b/llm_server/config/config.py
@@ -1,6 +1,7 @@
 import yaml
 
 config_default_vars = {
+    'frontend_api_mode': 'ooba',
     'log_prompts': False,
     'database_path': './proxy-server.db',
     'auth_required': False,
@@ -28,19 +29,19 @@ config_default_vars = {
     'openai_force_no_hashes': True,
     'include_system_tokens_in_stats': True,
     'openai_moderation_scan_last_n': 5,
-    'openai_moderation_workers': 10,
     'openai_org_name': 'OpenAI',
     'openai_silent_trim': False,
     'openai_moderation_enabled': True,
     'netdata_root': None,
     'show_backends': True,
     'cluster_workers': 30,
-    'background_homepage_cacher': True
+    'background_homepage_cacher': True,
+    'openai_moderation_timeout': 5
 }
 
-config_required_vars = ['cluster', 'mode', 'llm_middleware_name']
+config_required_vars = ['cluster', 'llm_middleware_name']
 
 mode_ui_names = {
-    'oobabooga': ('Text Gen WebUI (ooba)', 'Blocking API url', 'Streaming API url'),
+    'ooba': ('Text Gen WebUI (ooba)', 'Blocking API url', 'Streaming API url'),
     'vllm': ('Text Gen WebUI (ooba)', 'Blocking API url', 'Streaming API url'),
 }
diff --git a/llm_server/config/load.py b/llm_server/config/load.py
index edc5991..6f9db8d 100644
--- a/llm_server/config/load.py
+++ b/llm_server/config/load.py
@@ -16,15 +16,9 @@ def load_config(config_path):
     if not success:
         return success, config, msg
 
-    if config['mode'] not in ['oobabooga', 'vllm']:
-        print('Unknown mode:', config['mode'])
-        sys.exit(1)
-
     # TODO: this is atrocious
-    opts.mode = config['mode']
     opts.auth_required = config['auth_required']
     opts.log_prompts = config['log_prompts']
-    opts.concurrent_gens = config['concurrent_gens']
     opts.frontend_api_client = config['frontend_api_client']
     opts.show_num_prompts = config['show_num_prompts']
     opts.show_uptime = config['show_uptime']
@@ -47,13 +41,14 @@ def load_config(config_path):
     opts.openai_force_no_hashes = config['openai_force_no_hashes']
     opts.include_system_tokens_in_stats = config['include_system_tokens_in_stats']
     opts.openai_moderation_scan_last_n = config['openai_moderation_scan_last_n']
-    opts.openai_moderation_workers = config['openai_moderation_workers']
     opts.openai_org_name = config['openai_org_name']
     opts.openai_silent_trim = config['openai_silent_trim']
     opts.openai_moderation_enabled = config['openai_moderation_enabled']
     opts.show_backends = config['show_backends']
     opts.cluster_workers = config['cluster_workers']
     opts.background_homepage_cacher = config['background_homepage_cacher']
+    opts.openai_moderation_timeout = config['openai_moderation_timeout']
+    opts.frontend_api_mode = config['frontend_api_mode']
 
     if opts.openai_expose_our_model and not opts.openai_api_key:
         print('If you set openai_epose_our_model to false, you must set your OpenAI key in openai_api_key.')
@@ -75,8 +70,6 @@ def load_config(config_path):
 
     if config['load_num_prompts']:
         redis.set('proompts', get_number_of_rows('prompts'))
 
-    redis.set('backend_mode', opts.mode)
-
     return success, config, msg
 
diff --git a/llm_server/opts.py b/llm_server/opts.py
index ae07ca4..38542a8 100644
--- a/llm_server/opts.py
+++ b/llm_server/opts.py
@@ -2,9 +2,7 @@
 
 # TODO: rewrite the config system so I don't have to add every single config default here
 
-concurrent_gens = 3
-mode = 'oobabooga'
-backend_url = None
+frontend_api_mode = 'ooba'
 max_new_tokens = 500
 auth_required = False
 log_prompts = False
@@ -31,7 +29,6 @@ openai_expose_our_model = False
 openai_force_no_hashes = True
 include_system_tokens_in_stats = True
 openai_moderation_scan_last_n = 5
-openai_moderation_workers = 10
 openai_org_name = 'OpenAI'
 openai_silent_trim = False
 openai_moderation_enabled = True
@@ -39,3 +36,4 @@ cluster = {}
 show_backends = True
 cluster_workers = 30
 background_homepage_cacher = True
+openai_moderation_timeout = 5
diff --git a/llm_server/routes/openai_request_handler.py b/llm_server/routes/openai_request_handler.py
index 780e179..835b575 100644
--- a/llm_server/routes/openai_request_handler.py
+++ b/llm_server/routes/openai_request_handler.py
@@ -41,7 +41,7 @@ class OpenAIRequestHandler(RequestHandler):
         if opts.openai_moderation_enabled and opts.openai_api_key and is_api_key_moderated(self.token):
             print('moderating', self.token)
             try:
-                # Gather the last message from the user and all preceeding system messages
+                # Gather the last message from the user and all preceding system messages
                 msg_l = self.request.json['messages'].copy()
                 msg_l.reverse()
                 tag = uuid4()
@@ -78,8 +78,8 @@ class OpenAIRequestHandler(RequestHandler):
 
     def handle_ratelimited(self, do_log: bool = True):
         print('OAI ratelimited:', self.client_ip, self.request.headers)
-        _, default_backend_info = get_model_choices()
-        w = int(default_backend_info['estimated_wait']) if default_backend_info['estimated_wait'] > 0 else 2
+        model_choices, default_model = get_model_choices()
+        w = int(model_choices[default_model]['estimated_wait']) if model_choices[default_model]['estimated_wait'] > 0 else 2
         response = jsonify({
             "error": {
                 "message": "Rate limit reached on tokens per min. Limit: 10000 / min. Please try again in 6s. Contact us through our help center at help.openai.com if you continue to have issues.",
diff --git a/llm_server/routes/request_handler.py b/llm_server/routes/request_handler.py
index 4e8b8e4..53be442 100644
--- a/llm_server/routes/request_handler.py
+++ b/llm_server/routes/request_handler.py
@@ -44,7 +44,10 @@ class RequestHandler:
         self.backend = get_backend_handler(self.cluster_backend_info['mode'], self.backend_url)
         self.parameters = None
         self.used = False
-        redis.zadd('recent_prompters', {self.client_ip: time.time()})
+
+        if not self.token.startswith('SYSTEM__'):
+            # "recent_prompters" is only used for stats.
+            redis.zadd('recent_prompters', {self.client_ip: time.time()})
 
     def get_auth_token(self):
         if self.request_json_body.get('X-API-KEY'):
diff --git a/llm_server/routes/v1/generate_stats.py b/llm_server/routes/v1/generate_stats.py
index 500f015..c4bc2c0 100644
--- a/llm_server/routes/v1/generate_stats.py
+++ b/llm_server/routes/v1/generate_stats.py
@@ -47,8 +47,8 @@ def generate_stats(regen: bool = False):
         'timestamp': int(time.time()),
         'config': {
             'gatekeeper': 'none' if opts.auth_required is False else 'token',
-            'concurrent': opts.concurrent_gens,
             'simultaneous_requests_per_ip': opts.simultaneous_requests_per_ip,
+            'api_mode': opts.frontend_api_mode
         },
         'keys': {
             'openaiKeys': '∞',
@@ -76,7 +76,7 @@ def generate_stats(regen: bool = False):
     else:
         output['backend_info'] = {}
 
-    output['default'] = get_model_choices(regen=True)[1]
+    output['default_model'] = get_model_choices(regen=True)[1]
 
     result = deep_sort(output)
 
diff --git a/llm_server/workers/moderator.py b/llm_server/workers/moderator.py
index 27ccb28..86c2da5 100644
--- a/llm_server/workers/moderator.py
+++ b/llm_server/workers/moderator.py
@@ -1,9 +1,11 @@
 import json
 import threading
+import time
 import traceback
 
 import redis as redis_redis
 
+from llm_server import opts
 from llm_server.llm.openai.moderation import check_moderation_endpoint
 
 redis_moderation = redis_redis.Redis()
@@ -19,10 +21,32 @@ def start_moderation_workers(num_workers):
     print(f'Started {i} moderation workers.')
 
 
+# TODO: don't use UUID tags to identify items. Use native redis
+
+def get_results(tag, num_tasks):
+    tag = str(tag)  # Cast a UUID4 to a string.
+    flagged_categories = set()
+    num_results = 0
+    start_time = time.time()
+    while num_results < num_tasks:
+        result = redis_moderation.blpop(['queue:flagged_categories'], timeout=opts.openai_moderation_timeout)
+        if result is None:
+            break  # Timeout occurred, break the loop.
+        result_tag, categories = json.loads(result[1])
+        if result_tag == tag:
+            if categories:
+                for item in categories:
+                    flagged_categories.add(item)
+            num_results += 1
+        if time.time() - start_time > opts.openai_moderation_timeout:
+            print('Timed out waiting for result from moderator.')
+            break
+    return list(flagged_categories)
+
+
 def moderation_worker():
-    print('moderator started')
     while True:
-        result = redis_moderation.blpop('queue:msgs_to_check')
+        result = redis_moderation.blpop(['queue:msgs_to_check'])
         try:
             msg, tag = json.loads(result[1])
             print(tag)
@@ -36,18 +60,3 @@
 
 def add_moderation_task(msg, tag):
     redis_moderation.rpush('queue:msgs_to_check', json.dumps((msg, str(tag))))
-
-
-def get_results(tag, num_tasks):
-    tag = str(tag)  # Required for comparison with Redis results.
-    flagged_categories = set()
-    num_results = 0
-    while num_results < num_tasks:
-        result = redis_moderation.blpop('queue:flagged_categories')
-        result_tag, categories = json.loads(result[1])
-        if result_tag == tag:
-            if categories:
-                for item in categories:
-                    flagged_categories.add(item)
-            num_results += 1
-    return list(flagged_categories)
diff --git a/llm_server/workers/threader.py b/llm_server/workers/threader.py
index 0c82559..fa6c252 100644
--- a/llm_server/workers/threader.py
+++ b/llm_server/workers/threader.py
@@ -2,6 +2,7 @@ import time
 from threading import Thread
 
 from llm_server import opts
+from llm_server.cluster.cluster_config import cluster_config
 from llm_server.cluster.stores import redis_running_models
 from llm_server.cluster.worker import cluster_worker
 from llm_server.routes.v1.generate_stats import generate_stats
@@ -26,7 +27,7 @@ def start_background():
     t.start()
     print('Started the main background thread.')
 
-    start_moderation_workers(opts.openai_moderation_workers)
+    start_moderation_workers(opts.cluster_workers * 3)
 
     t = Thread(target=cache_stats)
     t.daemon = True
diff --git a/server.py b/server.py
index 382c7ff..0eb0f6c 100644
--- a/server.py
+++ b/server.py
@@ -24,12 +24,14 @@ from llm_server.routes.server_error import handle_server_error
 from llm_server.routes.v1 import bp
 from llm_server.sock import init_socketio
 
+# TODO: make sure system tokens are excluded from 5/24 hr proompters
 # TODO: implement blind RRD controlled via header and only used when there is a queue on the primary backend(s)
 # TODO: is frequency penalty the same as ooba repetition penalty???
 # TODO: make sure openai_moderation_enabled works on websockets, completions, and chat completions
 # TODO: if a backend is at its limit of concurrent requests, choose a different one
 
 # Lower priority
+# TODO: fix moderation freezing after a while
 # TODO: support logit_bias on OpenAI and Ooba endpoints.
 # TODO: add a way to cancel VLLM gens. Maybe use websockets?
 # TODO: validate openai_silent_trim works as expected and only when enabled
@@ -94,14 +96,15 @@ create_db()
 def home():
     base_client_api = redis.get('base_client_api', dtype=str)
     stats = generate_stats()
-    model_choices, default_backend_info = get_model_choices()
+    model_choices, default_model = get_model_choices()
+    default_model_info = model_choices[default_model]
 
-    if default_backend_info['queued'] == 0 and default_backend_info['queued'] >= opts.concurrent_gens:
+    if default_model_info['queued'] == 0 and default_model_info['processing'] >= default_model_info['concurrent_gens']:
         # There will be a wait if the queue is empty but prompts are processing, but we don't
         # know how long.
- default_estimated_wait_sec = f"less than {int(default_backend_info['estimated_wait'])} seconds" + default_estimated_wait_sec = f"less than {int(default_model_info['estimated_wait'])} seconds" else: - default_estimated_wait_sec = f"{int(default_backend_info['estimated_wait'])} seconds" + default_estimated_wait_sec = f"{int(default_model_info['estimated_wait'])} seconds" if len(config['analytics_tracking_code']): analytics_tracking_code = f"" @@ -123,17 +126,17 @@ def home(): llm_middleware_name=opts.llm_middleware_name, analytics_tracking_code=analytics_tracking_code, info_html=info_html, - default_model=default_backend_info['model'], - default_active_gen_workers=default_backend_info['processing'], - default_proompters_in_queue=default_backend_info['queued'], + default_model=default_model_info['model'], + default_active_gen_workers=default_model_info['processing'], + default_proompters_in_queue=default_model_info['queued'], current_model=opts.manual_model_name if opts.manual_model_name else None, # else running_model, client_api=f'https://{base_client_api}', ws_client_api=f'wss://{base_client_api}/v1/stream' if opts.enable_streaming else 'disabled', default_estimated_wait=default_estimated_wait_sec, - mode_name=mode_ui_names[opts.mode][0], - api_input_textbox=mode_ui_names[opts.mode][1], - streaming_input_textbox=mode_ui_names[opts.mode][2], - default_context_size=default_backend_info['context_size'], + mode_name=mode_ui_names[opts.frontend_api_mode][0], + api_input_textbox=mode_ui_names[opts.frontend_api_mode][1], + streaming_input_textbox=mode_ui_names[opts.frontend_api_mode][2], + default_context_size=default_model_info['context_size'], stats_json=json.dumps(stats, indent=4, ensure_ascii=False), extra_info=mode_info, openai_client_api=f'https://{base_client_api}/openai/v1' if opts.enable_openi_compatible_backend else 'disabled', diff --git a/templates/home.html b/templates/home.html index 3a020a4..66340a6 100644 --- a/templates/home.html +++ b/templates/home.html @@ -152,8 +152,16 @@ {% for key, value in model_choices.items() %}

{{ key }} - {{ value.backend_count }} {% if value.backend_count == 1 %}worker{% else %}workers{% endif %}

+
+            {% if value.queued == 0 and value.processing >= value.concurrent_gens %}
+                {# There will be a wait if the queue is empty but prompts are processing, but we don't know how long. #}
+                {% set estimated_wait_sec = "less than " + value.estimated_wait|int|string + " seconds" %}
+            {% else %}
+                {% set estimated_wait_sec = value.estimated_wait|int|string + " seconds" %}
+            {% endif %}
+

- Estimated Wait Time: {{ value.estimated_wait }}
+ Estimated Wait Time: {{ estimated_wait_sec }}
Processing: {{ value.processing }}
Queued: {{ value.queued }}
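
Reviewer note, not part of the patch: a minimal sketch of the wait-time label rule that server.py and templates/home.html now apply per model. The field names mirror the model_choices entries built in get_model_choices(); the helper name and the example values are made up for illustration.

    # Illustrative only. `entry` stands in for one value of the model_choices dict.
    def format_estimated_wait(entry: dict) -> str:
        wait = int(entry['estimated_wait'])
        if entry['queued'] == 0 and entry['processing'] >= entry['concurrent_gens']:
            # Queue is empty but every worker slot is busy: there will be some
            # wait, we just can't say how long.
            return f"less than {wait} seconds"
        return f"{wait} seconds"

    print(format_estimated_wait({'estimated_wait': 18, 'queued': 0, 'processing': 3, 'concurrent_gens': 3}))
    # -> less than 18 seconds
    print(format_estimated_wait({'estimated_wait': 18, 'queued': 2, 'processing': 3, 'concurrent_gens': 3}))
    # -> 18 seconds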