138 lines
4.9 KiB
Python
138 lines
4.9 KiB
Python
import numpy as np
|
|
|
|
from llm_server.cluster.cluster_config import get_a_cluster_backend, cluster_config
|
|
from llm_server.cluster.stores import redis_running_models
|
|
from llm_server.config.global_config import GlobalConfig
|
|
from llm_server.custom_redis import redis
|
|
from llm_server.llm.generator import generator
|
|
from llm_server.llm.info import get_info
|
|
from llm_server.llm.vllm.vllm_backend import VLLMBackend
|
|
from llm_server.routes.queue import priority_queue
|
|
from llm_server.routes.stats import calculate_wait_time, get_active_gen_workers_model
|
|
|
|
|
|
def get_backends_from_model(model_name: str):
    """
    List the backends that are currently running ``model_name``.

    The running-models store keeps one set per model name; its members are
    stored as bytes and are decoded here as UTF-8.

    :param model_name: name of the model to look up; must be a ``str``.
    :return: list of backend URLs (as ``str``) hosting the model.
    """
    assert isinstance(model_name, str)
    raw_members = redis_running_models.smembers(model_name)
    return [member.decode('utf-8') for member in raw_members]
|
|
|
|
|
|
def get_running_models():
    """
    Get all the models that are in the cluster.

    :return: a new list containing every key of the running-models store,
             i.e. every model name currently registered there.
    """
    # keys() is already iterable; a single list() gives a fresh list without
    # the extra identity comprehension over a second copy.
    return list(redis_running_models.keys())
|
|
|
|
|
|
def is_valid_model(model_name: str) -> bool:
    """
    Is this a model that is being hosted in the cluster?

    :param model_name: name of the model to check.
    :return: ``True`` if the running-models store has an entry for the model.
    """
    # Redis EXISTS reports a count of matching keys, not a bool; coerce it so
    # the result matches the annotated return type.
    return bool(redis_running_models.exists(model_name))
|
|
|
|
|
|
def test_backend(backend_url: str, test_prompt: bool = False) -> tuple[bool, dict]:
    """
    Check whether a backend is online, optionally by running a test prompt first.

    When ``test_prompt`` is true, a short non-streaming generation
    (``temperature`` 0, ``max_new_tokens`` 3, ``timeout=10``) is sent to the
    backend. Any exception, unsuccessful result, empty response or reported
    error marks the backend as offline. In every case the backend's info is
    then fetched with ``get_info()``. The backend counts as online only if
    that info contains a ``model``.

    :param backend_url: URL of the backend to check. It is also the key used
                        to look up the backend's entry in ``cluster_config``.
    :param test_prompt: if true, run the test generation before fetching info.
    :return: ``(True, info)`` when the backend is online, otherwise ``(False, {})``.
    """
    backend_info = cluster_config.get_backend(backend_url)
    if test_prompt:
        handler = VLLMBackend(backend_url)
        parameters, _ = handler.get_parameters({
            "stream": False,
            "temperature": 0,
            "max_new_tokens": 3,
        })
        data = {
            'prompt': 'test prompt',
            **parameters
        }
        try:
            success, response, err = generator(data, backend_url, timeout=10)
            if not success or not response or err:
                return False, {}
        except Exception:
            # Any error while talking to the backend means it is not usable.
            # Catch Exception (not a bare except) so KeyboardInterrupt and
            # SystemExit still propagate.
            return False, {}
    i = get_info(backend_url, backend_info['mode'])
    if not i.get('model'):
        return False, {}
    return True, i
|
|
|
|
|
|
def get_model_choices(regen: bool = False) -> 'tuple[dict, str | None]':
    """
    Get the info and stats of the models hosted in the cluster.

    The result is cached in redis under ``model_choices``. Unless ``regen`` is
    true, a cached (truthy) value is returned as-is without recomputing.

    :param regen: if true, ignore the cached value and rebuild it.
    :return: ``(model_choices, default_model)``.
             ``model_choices`` maps each running model name, sorted
             case-insensitively, to a dict of its client endpoints and
             load stats.
             ``default_model`` is the ``model`` of the backend chosen by
             ``get_a_cluster_backend()``. If that backend reports no model,
             ``default_model`` is ``None`` and ``model_choices`` is empty.
    """
    if not regen:
        cached = redis.getp('model_choices')
        if cached:
            return cached

    base_client_api = redis.get('base_client_api', dtype=str)
    # Read the config once rather than twice per model inside the loop.
    global_config = GlobalConfig.get()

    model_choices = {}
    for model in get_running_models():
        backends = get_backends_from_model(model)

        # Aggregate per-backend stats for this model.
        context_size = []
        avg_gen_per_worker = []
        concurrent_gens = 0
        for backend_url in backends:
            backend_info = cluster_config.get_backend(backend_url)
            if backend_info.get('model_config'):
                context_size.append(backend_info['model_config']['max_position_embeddings'])
            if backend_info.get('average_generation_elapsed_sec'):
                avg_gen_per_worker.append(backend_info['average_generation_elapsed_sec'])
            concurrent_gens += backend_info['concurrent_gens']

        active_gen_workers = get_active_gen_workers_model(model)
        proompters_in_queue = priority_queue.len(model)

        # Mean over the backends that reported a generation time; 0 when none did.
        average_generation_elapsed_sec = np.average(avg_gen_per_worker) if avg_gen_per_worker else 0
        estimated_wait_sec = calculate_wait_time(average_generation_elapsed_sec, proompters_in_queue, concurrent_gens, active_gen_workers)

        model_choices[model] = {
            'model': model,
            'client_api': f'https://{base_client_api}/{model}',
            'ws_client_api': f'wss://{base_client_api}/{model}/v1/stream' if global_config.enable_streaming else None,
            'openai_client_api': f'https://{base_client_api}/openai/{model}/v1' if global_config.enable_openi_compatible_backend else 'disabled',
            'backend_count': len(backends),
            'estimated_wait': estimated_wait_sec,
            'queued': proompters_in_queue,
            'processing': active_gen_workers,
            'avg_generation_time': average_generation_elapsed_sec,
            'concurrent_gens': concurrent_gens,
            # Smallest context among the backends -- presumably because a
            # request could be routed to any of them.
            'context_size': min(context_size) if context_size else None,
        }

    # Python wants to sort lowercase vs. uppercase letters differently.
    model_choices = dict(sorted(model_choices.items(), key=lambda item: item[0].upper()))

    default_backend_url = get_a_cluster_backend()
    default_backend_info = cluster_config.get_backend(default_backend_url)
    # NOTE(review): guard against get_backend() returning None/empty when no
    # default backend is available; previously that raised AttributeError.
    if not default_backend_info or not default_backend_info.get('model'):
        # If everything is offline.
        model_choices = {}
        default_model = None
    else:
        default_model = default_backend_info['model']

    redis.setp('model_choices', (model_choices, default_model))
    return model_choices, default_model
|