diff --git a/llm_server/cluster/backend.py b/llm_server/cluster/backend.py
index 62dc2c5..9e2e19b 100644
--- a/llm_server/cluster/backend.py
+++ b/llm_server/cluster/backend.py
@@ -110,7 +110,7 @@ def get_model_choices(regen: bool = False):
     default_backend_url = get_a_cluster_backend()
     default_backend_info = cluster_config.get_backend(default_backend_url)
     if not default_backend_info.get('model'):
-        return None, None
+        return {}, None
     default_model = default_backend_info['model']
 
     redis.setp('model_choices', (model_choices, default_model))
diff --git a/llm_server/routes/queue.py b/llm_server/routes/queue.py
index 3bbdd1c..6bf66cd 100644
--- a/llm_server/routes/queue.py
+++ b/llm_server/routes/queue.py
@@ -118,6 +118,7 @@ class DataEvent:
 
     def wait(self):
         for item in self.pubsub.listen():
+            print(item)
             if item['type'] == 'message':
                 return pickle.loads(item['data'])
 
diff --git a/llm_server/routes/v1/generate_stats.py b/llm_server/routes/v1/generate_stats.py
index c5fa456..a9148b3 100644
--- a/llm_server/routes/v1/generate_stats.py
+++ b/llm_server/routes/v1/generate_stats.py
@@ -17,8 +17,6 @@ def generate_stats(regen: bool = False):
             return c
 
     model_choices, default_model = get_model_choices(regen=True)
-    if not model_choices or not default_model:
-        return 'Please wait for Redis to be populated...'
 
     base_client_api = redis.get('base_client_api', dtype=str)
     proompters_5_min = len(redis.zrangebyscore('recent_prompters', time.time() - 5 * 60, '+inf'))
diff --git a/llm_server/workers/inferencer.py b/llm_server/workers/inferencer.py
index 0738c1b..879e422 100644
--- a/llm_server/workers/inferencer.py
+++ b/llm_server/workers/inferencer.py
@@ -86,22 +86,21 @@ def worker(backend_url):
     redis_queue = RedisPriorityQueue(backend_url)
     while True:
         (request_json_body, client_ip, token, parameters), event_id, selected_model, timestamp, do_stream = redis_queue.get()
 
-        backend_info = cluster_config.get_backend(backend_url)
-
-        if not backend_info['online']:
-            # TODO: communicate to caller
-            # redis.publish(event_id, 'offline')
-            return
-
-        if not selected_model:
-            selected_model = backend_info['model']
-
-        stream_redis.delete(get_stream_name(worker_id))  # clean up any old streams
-        increment_ip_count(client_ip, 'processing_ips')
-        incr_active_workers(selected_model, backend_url)
-        status_redis.setp(str(worker_id), ('generating', client_ip))
-        try:
+        backend_info = cluster_config.get_backend(backend_url)
+
+        if not backend_info['online']:
+            redis.publish(event_id, 'canceled')
+            return
+
+        if not selected_model:
+            selected_model = backend_info['model']
+
+        stream_redis.delete(get_stream_name(worker_id))  # clean up any old streams
+        increment_ip_count(client_ip, 'processing_ips')
+        incr_active_workers(selected_model, backend_url)
+        status_redis.setp(str(worker_id), ('generating', client_ip))
+
         if do_stream:
             # Return the name of the stream that the slave should connect to.
             event = DataEvent(event_id)
@@ -120,6 +119,7 @@ def worker(backend_url):
             event.set((success, response, error_msg))
         except:
             traceback.print_exc()
+            redis.publish(event_id, 'canceled')
         finally:
             decrement_ip_count(client_ip, 'processing_ips')
             decr_active_workers(selected_model, backend_url)
diff --git a/llm_server/workers/printer.py b/llm_server/workers/printer.py
index 759ae67..bfc62f8 100644
--- a/llm_server/workers/printer.py
+++ b/llm_server/workers/printer.py
@@ -1,10 +1,12 @@
 import logging
 import time
+import traceback
 
-from llm_server.cluster.backend import get_running_models
+from llm_server.cluster.backend import get_model_choices, get_running_models
 from llm_server.cluster.cluster_config import cluster_config
 from llm_server.custom_redis import redis
 from llm_server.routes.queue import priority_queue
+from llm_server.routes.v1.generate_stats import generate_stats
 
 logger = logging.getLogger('console_printer')
 if not logger.handlers:
@@ -19,20 +21,33 @@ if not logger.handlers:
 def console_printer():
     time.sleep(3)
     while True:
-        processing = redis.keys('active_gen_workers:http*')  # backends always start with http
-        processing_count = 0
-        if len(processing):
-            for k in processing:
-                processing_count += redis.get(k, default=0, dtype=int)
-        backends = [k for k, v in cluster_config.all().items() if v['online']]
-        activity = priority_queue.activity()
+        try:
+            stats = generate_stats()
+            model_choices, default_model = get_model_choices()
 
-        # Calculate the queue size the same way it's done on the stats.
-        queue_size = 0
-        running_models = get_running_models()
-        for model in running_models:
-            queue_size += priority_queue.len(model)
+            processing_count = 0
+            backend_count = len(stats['backends'])
 
-        # Active Workers and Processing should read the same. If not, that's an issue.
-        logger.info(f'REQUEST QUEUE -> Active Workers: {len([i for i in activity if i[1]])} | Processing: {processing_count} | Queued: {queue_size} | Backends Online: {len(backends)}')
+            if model_choices and default_model:
+                for model, info in model_choices.items():
+                    processing_count += info['processing']
+
+            # processing = redis.keys('active_gen_workers:http*')  # backends always start with http
+            # processing_count = 0
+            # if len(processing):
+            #     for k in processing:
+            #         processing_count += redis.get(k, default=0, dtype=int)
+            # backends = [k for k, v in cluster_config.all().items() if v['online']]
+            activity = priority_queue.activity()
+
+            # Calculate the queue size the same way it's done on the stats.
+            queue_size = 0
+            running_models = get_running_models()
+            for model in running_models:
+                queue_size += priority_queue.len(model)
+
+            # Active Workers and Processing should read the same. If not, that's an issue.
+            logger.info(f'REQUEST QUEUE -> Active Workers: {len([i for i in activity if i[1]])} | Processing: {processing_count} | Queued: {queue_size} | Backends Online: {backend_count}')
+        except:
+            traceback.print_exc()
         time.sleep(10)
diff --git a/server.py b/server.py
index 43aa9d2..37db35b 100644
--- a/server.py
+++ b/server.py
@@ -105,17 +105,27 @@ def home():
     stats = generate_stats()
     model_choices, default_model = get_model_choices()
 
-    if not model_choices.get(default_model):
-        return 'The server is still starting up. Please wait...'
+    if default_model:
+        if not model_choices.get(default_model):
+            return 'The server is still starting up. Please wait...'
 
-    default_model_info = model_choices[default_model]
+        default_model_info = model_choices[default_model]
 
-    if default_model_info['queued'] == 0 and default_model_info['queued'] >= default_model_info['concurrent_gens']:
-        # There will be a wait if the queue is empty but prompts are processing, but we don't
-        # know how long.
-        default_estimated_wait_sec = f"less than {int(default_model_info['estimated_wait'])} seconds"
+        if default_model_info['queued'] == 0 and default_model_info['queued'] >= default_model_info['concurrent_gens']:
+            # There will be a wait if the queue is empty but prompts are processing, but we don't
+            # know how long.
+            default_estimated_wait_sec = f"less than {int(default_model_info['estimated_wait'])} seconds"
+        else:
+            default_estimated_wait_sec = f"{int(default_model_info['estimated_wait'])} seconds"
     else:
-        default_estimated_wait_sec = f"{int(default_model_info['estimated_wait'])} seconds"
+        default_model_info = {
+            'model': 'OFFLINE',
+            'processing': 0,
+            'queued': 0,
+
+        }
+        default_estimated_wait_sec = 'OFFLINE'
+
 
     if len(config['analytics_tracking_code']):
         analytics_tracking_code = f""