"""Redis-backed store of cluster backend configuration, plus backend selection helpers."""
import hashlib
import pickle
import traceback

from llm_server import opts
from llm_server.cluster.redis_cycle import add_backend_cycler, redis_cycle
from llm_server.cluster.stores import redis_running_models
from llm_server.custom_redis import RedisCustom
from llm_server.routes.helpers.model import estimate_model_size


class RedisClusterStore:
    """
    Stores one Redis hash per backend; every field value is pickled before it is written.
    """

    def __init__(self, name: str, **kwargs):
        """
        :param name: name of the store; also passed to `RedisCustom` as its first argument.
        :param kwargs: forwarded unchanged to `RedisCustom`.
        """
        self.name = name
        self.config_redis = RedisCustom(name, **kwargs)

    def clear(self) -> None:
        """Flush the underlying `RedisCustom` store."""
        self.config_redis.flush()

    def load(self, config: dict) -> None:
        """
        Register every backend found in `config`.
        :param config: mapping of backend name -> dict of backend values.
        """
        for backend_name, backend_values in config.items():
            self.add_backend(backend_name, backend_values)

    def add_backend(self, name: str, values: dict) -> None:
        """
        Store a backend's values and initialise its `online` and `hash` fields.

        `online` always starts as False and `hash` is derived from the backend name;
        both override same-named entries in `values`.
        :param name: the backend's key.
        :param values: the backend's settings; each value is pickled individually.
        """
        digest = hashlib.sha256(name.encode('utf-8')).hexdigest()
        fields = {k: pickle.dumps(v) for k, v in values.items()}
        fields['online'] = pickle.dumps(False)
        fields['hash'] = pickle.dumps(f'{digest[:8]}-{digest[-8:]}')
        # A single write means the backend never exists without its `online`/`hash` fields.
        self.config_redis.hset(name, mapping=fields)

    def set_backend_value(self, backend: str, key: str, value) -> None:
        """
        Set a single field on a backend.
        :param backend: the backend's key.
        :param key: the field to set.
        :param value: any picklable object.
        """
        # By storing the value as a pickle we don't have to cast anything when getting the value from Redis.
        self.config_redis.hset(backend, key, pickle.dumps(value))

    def get_backend(self, name: str) -> dict:
        """
        Fetch every field of a backend, with the values unpickled.
        :param name: the backend's key.
        :return: dict of field name -> value.
        """
        # NOTE(review): pickle.loads can execute arbitrary code, so this is only safe
        # while the Redis instance is trusted and not writable by outside parties.
        raw = self.config_redis.hgetall(name)
        return {field.decode('utf8'): pickle.loads(blob) for field, blob in raw.items()}

    def all(self) -> dict:
        """
        Fetch every backend in the store.
        :return: dict of key -> backend dict (see `get_backend`); empty if there are no keys.
        """
        # Presumably an internal bookkeeping key of RedisCustom -- TODO confirm.
        skipped_key = f'{self.name}:____'
        found = self.config_redis.keys('*') or ()
        return {key: self.get_backend(key) for key in found if key != skipped_key}

    def validate_backend(self, backend_url: str):
        """
        Returns the backend URL that was given, or a new one if that was offline.

        A backend with no `online` field is treated as offline.
        :param backend_url: the backend the caller would like to use.
        :return: `backend_url` if it is online, otherwise whatever `get_a_cluster_backend()`
                 returns (which may be None).
        """
        backend_info = self.get_backend(backend_url)
        if backend_info.get('online', False):
            return backend_url
        replacement = get_a_cluster_backend()
        print(f'Backend {backend_url} offline. Request was redirected to {replacement}')
        return replacement


cluster_config = RedisClusterStore('cluster_config')


def get_backends():
    """
    List the online and offline backend URLs, each in preference order.

    Online backends are ordered by ascending `priority`, or by descending
    `estimate_model_size()` of their `model_config` when `opts.prioritize_by_size` is set.
    Offline backends are always ordered by ascending `priority`.
    :return: tuple `(online_urls, offline_urls)`. If a required key is missing the error is
             printed and `([], [])` is returned so that callers can still unpack the result.
    """
    backends = cluster_config.all()

    def priority_of(item):
        return item[1]['priority']

    try:
        online = [(url, info) for url, info in backends.items() if info.get('online', False)]
        offline = [(url, info) for url, info in backends.items() if not info.get('online', False)]
        if opts.prioritize_by_size:
            online.sort(key=lambda item: estimate_model_size(item[1]['model_config']), reverse=True)
        else:
            # Same ordering as sorting on the negated priority with reverse=True
            # (sorts are stable either way), just without the double negation.
            online.sort(key=priority_of)
        offline.sort(key=priority_of)
    except KeyError:
        traceback.print_exc()
        print(backends)
        return [], []
    return [url for url, _ in online], [url for url, _ in offline]


def get_a_cluster_backend(model=None):
    """
    Get a backend from Redis. If there are no online backends, return None.
    If `model` is not supplied, we will pick one ourself.
    :param model: optional model name; when given, only backends running it are considered.
    :return: a backend URL, or None if nothing suitable was found.
    """
    if model:
        # First, determine which backends are hosting this model.
        hosts = [member.decode('utf-8') for member in redis_running_models.smembers(model)]
        if not hosts:
            # No backend hosting that model
            return None
        # Register the hosts with the cycler, then take the first entry of what it hands back.
        add_backend_cycler(model, hosts)
        cycled = redis_cycle(model)
        return cycled[0] if cycled else None

    online, _ = get_backends()
    return online[0] if online else None