Merge cluster to master #3
@@ -110,7 +110,7 @@ def get_model_choices(regen: bool = False):
     default_backend_url = get_a_cluster_backend()
     default_backend_info = cluster_config.get_backend(default_backend_url)
     if not default_backend_info.get('model'):
-        return None, None
+        return {}, None
     default_model = default_backend_info['model']
 
     redis.setp('model_choices', (model_choices, default_model))
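Note: redis.setp above stores a whole (model_choices, default_model) tuple under one key, so the project's custom Redis wrapper presumably pickles values on write and unpickles them on read. A minimal sketch of such setp/getp helpers written against plain redis-py; the method names come from the calls in the diff, the implementation is an assumption:

import pickle
import redis

class PickleRedis(redis.Redis):
    # Assumed behaviour of the custom wrapper: store arbitrary Python objects
    # by pickling them, and unpickle them on the way back out.
    def setp(self, name, value):
        return self.set(name, pickle.dumps(value))

    def getp(self, name):
        raw = self.get(name)
        return pickle.loads(raw) if raw is not None else None

# Usage mirroring the caching pattern above:
#   r = PickleRedis()
#   r.setp('model_choices', (model_choices, default_model))
#   model_choices, default_model = r.getp('model_choices')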
@@ -118,6 +118,7 @@ class DataEvent:
 
     def wait(self):
         for item in self.pubsub.listen():
+            print(item)
             if item['type'] == 'message':
                 return pickle.loads(item['data'])
 
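Note: DataEvent.wait blocks on a Redis pub/sub channel and unpickles the first real message; the worker further down pairs it with event.set(...). A self-contained sketch of how such an event pair can be built on redis-py pub/sub, assuming the channel name is simply the event_id (the actual channel naming is not shown in this diff):

import pickle
import redis

r = redis.Redis()

class DataEvent:
    # Sketch only; the real class lives in the project.
    def __init__(self, event_id):
        self.event_id = event_id
        self.pubsub = r.pubsub()
        self.pubsub.subscribe(event_id)

    def set(self, data):
        # Publish a pickled payload to whoever is blocked in wait().
        r.publish(self.event_id, pickle.dumps(data))

    def wait(self):
        for item in self.pubsub.listen():
            if item['type'] == 'message':
                return pickle.loads(item['data'])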
@@ -17,8 +17,6 @@ def generate_stats(regen: bool = False):
             return c
 
     model_choices, default_model = get_model_choices(regen=True)
-    if not model_choices or not default_model:
-        return 'Please wait for Redis to be populated...'
 
     base_client_api = redis.get('base_client_api', dtype=str)
     proompters_5_min = len(redis.zrangebyscore('recent_prompters', time.time() - 5 * 60, '+inf'))
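Note: the proompters_5_min line counts members of a sorted set whose score (a timestamp) falls inside the last five minutes. A standalone illustration of that sliding-window pattern with plain redis-py, assuming prompters are recorded with the current time as the score; the key name is taken from the diff, the helper names are made up:

import time
import redis

r = redis.Redis()

def record_prompter(client_ip: str):
    # Score each prompter with the time of their latest request.
    r.zadd('recent_prompters', {client_ip: time.time()})

def prompters_last_5_min() -> int:
    # Trim entries that fell out of the window, then count what is left.
    cutoff = time.time() - 5 * 60
    r.zremrangebyscore('recent_prompters', '-inf', cutoff)
    return r.zcount('recent_prompters', cutoff, '+inf')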
@@ -86,22 +86,21 @@ def worker(backend_url):
     redis_queue = RedisPriorityQueue(backend_url)
     while True:
         (request_json_body, client_ip, token, parameters), event_id, selected_model, timestamp, do_stream = redis_queue.get()
-        backend_info = cluster_config.get_backend(backend_url)
-
-        if not backend_info['online']:
-            # TODO: communicate to caller
-            # redis.publish(event_id, 'offline')
-            return
-
-        if not selected_model:
-            selected_model = backend_info['model']
-
-        stream_redis.delete(get_stream_name(worker_id))  # clean up any old streams
-        increment_ip_count(client_ip, 'processing_ips')
-        incr_active_workers(selected_model, backend_url)
-        status_redis.setp(str(worker_id), ('generating', client_ip))
-
         try:
+            backend_info = cluster_config.get_backend(backend_url)
+
+            if not backend_info['online']:
+                redis.publish(event_id, 'canceled')
+                return
+
+            if not selected_model:
+                selected_model = backend_info['model']
+
+            stream_redis.delete(get_stream_name(worker_id))  # clean up any old streams
+            increment_ip_count(client_ip, 'processing_ips')
+            incr_active_workers(selected_model, backend_url)
+            status_redis.setp(str(worker_id), ('generating', client_ip))
+
             if do_stream:
                 # Return the name of the stream that the slave should connect to.
                 event = DataEvent(event_id)
@@ -120,6 +119,7 @@ def worker(backend_url):
             event.set((success, response, error_msg))
     except:
         traceback.print_exc()
+        redis.publish(event_id, 'canceled')
     finally:
         decrement_ip_count(client_ip, 'processing_ips')
         decr_active_workers(selected_model, backend_url)
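Note: this change moves the per-request setup (backend lookup, counter increments, stream cleanup) inside the try block, while the matching decrements already sit in finally, so the counters are released whether generation succeeds, raises, or returns early. A compact, generic illustration of that bracket pattern; the counter helpers here are stand-ins, not the project's Redis-backed ones:

def handle_one_request(counters: dict, client_ip: str) -> None:
    # Take the slots up front...
    counters['processing_ips'] = counters.get('processing_ips', 0) + 1
    counters['active_workers'] = counters.get('active_workers', 0) + 1
    try:
        pass  # generate, stream, publish the result, etc.
    finally:
        # ...and always give them back, even if the body raised.
        counters['processing_ips'] -= 1
        counters['active_workers'] -= 1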
@@ -1,10 +1,12 @@
 import logging
 import time
+import traceback
 
-from llm_server.cluster.backend import get_running_models
+from llm_server.cluster.backend import get_model_choices, get_running_models
 from llm_server.cluster.cluster_config import cluster_config
 from llm_server.custom_redis import redis
 from llm_server.routes.queue import priority_queue
+from llm_server.routes.v1.generate_stats import generate_stats
 
 logger = logging.getLogger('console_printer')
 if not logger.handlers:
@@ -19,20 +21,33 @@ if not logger.handlers:
 def console_printer():
     time.sleep(3)
     while True:
-        processing = redis.keys('active_gen_workers:http*')  # backends always start with http
-        processing_count = 0
-        if len(processing):
-            for k in processing:
-                processing_count += redis.get(k, default=0, dtype=int)
-        backends = [k for k, v in cluster_config.all().items() if v['online']]
-        activity = priority_queue.activity()
+        try:
+            stats = generate_stats()
+            model_choices, default_model = get_model_choices()
 
-        # Calculate the queue size the same way it's done on the stats.
-        queue_size = 0
-        running_models = get_running_models()
-        for model in running_models:
-            queue_size += priority_queue.len(model)
+            processing_count = 0
+            backend_count = len(stats['backends'])
 
-        # Active Workers and Processing should read the same. If not, that's an issue.
-        logger.info(f'REQUEST QUEUE -> Active Workers: {len([i for i in activity if i[1]])} | Processing: {processing_count} | Queued: {queue_size} | Backends Online: {len(backends)}')
+            if model_choices and default_model:
+                for model, info in model_choices.items():
+                    processing_count += info['processing']
+
+            # processing = redis.keys('active_gen_workers:http*')  # backends always start with http
+            # processing_count = 0
+            # if len(processing):
+            #     for k in processing:
+            #         processing_count += redis.get(k, default=0, dtype=int)
+            # backends = [k for k, v in cluster_config.all().items() if v['online']]
+            activity = priority_queue.activity()
+
+            # Calculate the queue size the same way it's done on the stats.
+            queue_size = 0
+            running_models = get_running_models()
+            for model in running_models:
+                queue_size += priority_queue.len(model)
+
+            # Active Workers and Processing should read the same. If not, that's an issue.
+            logger.info(f'REQUEST QUEUE -> Active Workers: {len([i for i in activity if i[1]])} | Processing: {processing_count} | Queued: {queue_size} | Backends Online: {backend_count}')
+        except:
+            traceback.print_exc()
         time.sleep(10)
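Note: console_printer now derives Processing by summing info['processing'] across model_choices and takes Backends Online from generate_stats(), with the whole pass wrapped in try/except so one bad iteration cannot kill the loop. Since the function loops forever, it presumably runs off the main thread; a minimal sketch of launching it that way (whether the project uses a thread, a process, or something else is an assumption here):

import threading
import time

def console_printer():
    time.sleep(3)
    while True:
        # ...gather and log the stats, as in the diff above...
        time.sleep(10)

printer_thread = threading.Thread(target=console_printer, daemon=True)
printer_thread.start()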
server.py (26 changed lines)
@@ -105,17 +105,27 @@ def home():
     stats = generate_stats()
     model_choices, default_model = get_model_choices()
 
-    if not model_choices.get(default_model):
-        return 'The server is still starting up. Please wait...'
+    if default_model:
+        if not model_choices.get(default_model):
+            return 'The server is still starting up. Please wait...'
 
-    default_model_info = model_choices[default_model]
+        default_model_info = model_choices[default_model]
 
-    if default_model_info['queued'] == 0 and default_model_info['queued'] >= default_model_info['concurrent_gens']:
-        # There will be a wait if the queue is empty but prompts are processing, but we don't
-        # know how long.
-        default_estimated_wait_sec = f"less than {int(default_model_info['estimated_wait'])} seconds"
+        if default_model_info['queued'] == 0 and default_model_info['queued'] >= default_model_info['concurrent_gens']:
+            # There will be a wait if the queue is empty but prompts are processing, but we don't
+            # know how long.
+            default_estimated_wait_sec = f"less than {int(default_model_info['estimated_wait'])} seconds"
-    else:
-        default_estimated_wait_sec = f"{int(default_model_info['estimated_wait'])} seconds"
+        else:
+            default_estimated_wait_sec = f"{int(default_model_info['estimated_wait'])} seconds"
+    else:
+        default_model_info = {
+            'model': 'OFFLINE',
+            'processing': 0,
+            'queued': 0,
+
+        }
+        default_estimated_wait_sec = 'OFFLINE'
+
 
     if len(config['analytics_tracking_code']):
         analytics_tracking_code = f"<script>\n{config['analytics_tracking_code']}\n</script>"
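Note: instead of bailing out when no default backend is available, home() now renders with placeholder values, so the page stays reachable while the cluster is offline. A small sketch of that fallback pattern pulled out as a helper; the function and variable names here are illustrative, not from the project:

def build_default_model_context(model_choices: dict, default_model: str):
    # Use live info when a default model is known, otherwise placeholders
    # so the template always has something to render.
    if default_model and model_choices.get(default_model):
        info = model_choices[default_model]
        wait_sec = f"{int(info['estimated_wait'])} seconds"
    else:
        info = {'model': 'OFFLINE', 'processing': 0, 'queued': 0}
        wait_sec = 'OFFLINE'
    return info, wait_sec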