import numpy as np

from llm_server.cluster.cluster_config import get_a_cluster_backend, cluster_config
from llm_server.cluster.stores import redis_running_models
from llm_server.config.global_config import GlobalConfig
from llm_server.custom_redis import redis
from llm_server.llm.generator import generator
from llm_server.llm.info import get_info
from llm_server.llm.vllm.vllm_backend import VLLMBackend
from llm_server.routes.queue import priority_queue
from llm_server.routes.stats import calculate_wait_time, get_active_gen_workers_model


def get_backends_from_model(model_name: str) -> list[str]:
    """
    Get the backends that are running a specific model.
    This is the inverse of `get_model_choices()`.

    :param model_name: name of the model to look up.
    :return: list of backend URLs (decoded from the redis byte strings).
    """
    assert isinstance(model_name, str)
    return [x.decode('utf-8') for x in redis_running_models.smembers(model_name)]


def get_running_models() -> list:
    """
    Get all the models that are in the cluster.

    :return: list of model-name keys currently tracked in redis.
    """
    # The keys of the running-models store are the model names themselves.
    return list(redis_running_models.keys())


def is_valid_model(model_name: str) -> bool:
    """
    Is this a model that is being hosted in the cluster?

    :param model_name: model name to check.
    :return: True if at least one backend is running this model.
    """
    # redis EXISTS returns an integer count; coerce so the annotated
    # return type is honest.
    return bool(redis_running_models.exists(model_name))


def test_backend(backend_url: str, test_prompt: bool = False):
    """
    Test (optionally using a test prompt) a backend to check if it is online.

    :param backend_url: URL of the backend to probe.
    :param test_prompt: when True, send a tiny generation request before
        fetching the backend info.
    :return: tuple of (online: bool, info: dict). `info` is empty on failure.
    """
    backend_info = cluster_config.get_backend(backend_url)
    if test_prompt:
        handler = VLLMBackend(backend_url)
        parameters, _ = handler.get_parameters({
            "stream": False,
            "temperature": 0,
            "max_new_tokens": 3,
        })
        data = {
            'prompt': 'test prompt',
            **parameters
        }
        try:
            success, response, err = generator(data, backend_url, timeout=10)
            if not success or not response or err:
                return False, {}
        except Exception:
            # Any network/backend failure means the backend is offline.
            # (Was a bare `except:`, which also swallowed SystemExit and
            # KeyboardInterrupt.)
            return False, {}
    i = get_info(backend_url, backend_info['mode'])
    if not i.get('model'):
        return False, {}
    return True, i


def get_model_choices(regen: bool = False) -> tuple[dict, dict]:
    """
    Get the info and stats of the models hosted in the cluster.

    :param regen: when False, return the cached value from redis if present.
    :return: tuple of (model_choices dict keyed by model name, default model).
    """
    if not regen:
        c = redis.getp('model_choices')
        if c:
            return c

    base_client_api = redis.get('base_client_api', dtype=str)
    running_models = get_running_models()
    model_choices = {}
    for model in running_models:
        b = get_backends_from_model(model)

        context_size = []
        avg_gen_per_worker = []
        concurrent_gens = 0
        for backend_url in b:
            backend_info = cluster_config.get_backend(backend_url)
            if backend_info.get('model_config'):
                context_size.append(backend_info['model_config']['max_position_embeddings'])
            if backend_info.get('average_generation_elapsed_sec'):
                avg_gen_per_worker.append(backend_info['average_generation_elapsed_sec'])
            concurrent_gens += backend_info['concurrent_gens']

        active_gen_workers = get_active_gen_workers_model(model)
        proompters_in_queue = priority_queue.len(model)

        if len(avg_gen_per_worker):
            average_generation_elapsed_sec = np.average(avg_gen_per_worker)
        else:
            average_generation_elapsed_sec = 0
        estimated_wait_sec = calculate_wait_time(average_generation_elapsed_sec, proompters_in_queue, concurrent_gens, active_gen_workers)

        model_choices[model] = {
            'model': model,
            'client_api': f'https://{base_client_api}/{model}',
            'ws_client_api': f'wss://{base_client_api}/{model}/v1/stream' if GlobalConfig.get().enable_streaming else None,
            'openai_client_api': f'https://{base_client_api}/openai/{model}/v1' if GlobalConfig.get().enable_openi_compatible_backend else 'disabled',
            'backend_count': len(b),
            'estimated_wait': estimated_wait_sec,
            'queued': proompters_in_queue,
            'processing': active_gen_workers,
            'avg_generation_time': average_generation_elapsed_sec,
            'concurrent_gens': concurrent_gens,
            'context_size': min(context_size) if len(context_size) else None
        }

    # Python wants to sort lowercase vs. uppercase letters differently.
    model_choices = dict(sorted(model_choices.items(), key=lambda item: item[0].upper()))

    default_backend_url = get_a_cluster_backend()
    default_backend_info = cluster_config.get_backend(default_backend_url)
    if not default_backend_info.get('model'):
        # If everything is offline.
        model_choices = {}
        default_model = None
    else:
        default_model = default_backend_info['model']

    redis.setp('model_choices', (model_choices, default_model))
    return model_choices, default_model