From 35e9847b27248f8741cddade39797b81ed18a25e Mon Sep 17 00:00:00 2001
From: Cyberes
Date: Wed, 27 Sep 2023 18:36:51 -0600
Subject: [PATCH] set inference workers to daemon, add finally to inference
 worker, hide estimated avg tps

---
 llm_server/routes/helpers/http.py       | 10 +++++++++
 llm_server/routes/queue.py              | 29 ++++++++++++++-----------
 llm_server/routes/v1/generate_stats.py  |  6 +++--
 llm_server/routes/v1/generate_stream.py |  8 +++++++
 server.py                               |  7 ++++++
 5 files changed, 45 insertions(+), 15 deletions(-)

diff --git a/llm_server/routes/helpers/http.py b/llm_server/routes/helpers/http.py
index 20b37b6..acc7447 100644
--- a/llm_server/routes/helpers/http.py
+++ b/llm_server/routes/helpers/http.py
@@ -29,6 +29,16 @@ def cache_control(seconds):
     return decorator
 
 
+# TODO:
+#   File "/srv/server/local-llm-server/llm_server/routes/request_handler.py", line 240, in before_request
+#     response = require_api_key()
+#                ^^^^^^^^^^^^^^^^^
+#   File "/srv/server/local-llm-server/llm_server/routes/helpers/http.py", line 50, in require_api_key
+#     if token.startswith('SYSTEM__') or opts.auth_required:
+#        ^^^^^^^^^^^^^^^^
+# AttributeError: 'NoneType' object has no attribute 'startswith'
+
+
 def require_api_key(json_body: dict = None):
     if json_body:
         request_json = json_body
diff --git a/llm_server/routes/queue.py b/llm_server/routes/queue.py
index 102c40a..39b4c3e 100644
--- a/llm_server/routes/queue.py
+++ b/llm_server/routes/queue.py
@@ -107,23 +107,26 @@ def worker():
         # TODO: only increment if not valid SYSTEM__ token
         redis.incr('active_gen_workers')
 
-        start_time = time.time()
-        success, response, error_msg = generator(request_json_body)
-        end_time = time.time()
+        try:
+            start_time = time.time()
+            success, response, error_msg = generator(request_json_body)
+            end_time = time.time()
 
-        elapsed_time = end_time - start_time
-        with generation_elapsed_lock:
-            generation_elapsed.append((end_time, elapsed_time))
+            elapsed_time = end_time - start_time
+            with generation_elapsed_lock:
+                generation_elapsed.append((end_time, elapsed_time))
 
-        event = DataEvent(event_id)
-        event.set((success, response, error_msg))
+            event = DataEvent(event_id)
+            event.set((success, response, error_msg))
+        finally:
+            decrement_ip_count(client_ip, 'processing_ips')
 
-        decrement_ip_count(client_ip, 'processing_ips')
-
-        # TODO: only decrement if not valid SYSTEM__ token
-        redis.decr('active_gen_workers')
+            # TODO: only decrement if not valid SYSTEM__ token
+            redis.decr('active_gen_workers')
 
 
 def start_workers(num_workers: int):
     for _ in range(num_workers):
-        threading.Thread(target=worker).start()
+        t = threading.Thread(target=worker)
+        t.daemon = True
+        t.start()
diff --git a/llm_server/routes/v1/generate_stats.py b/llm_server/routes/v1/generate_stats.py
index 9fe69f9..ac5f7d4 100644
--- a/llm_server/routes/v1/generate_stats.py
+++ b/llm_server/routes/v1/generate_stats.py
@@ -57,7 +57,9 @@ def generate_stats(regen: bool = False):
 
     active_gen_workers = get_active_gen_workers()
     proompters_in_queue = len(priority_queue)
-    estimated_avg_tps = redis.get('estimated_avg_tps', float, default=0)
+
+    # This is so wildly inaccurate it's disabled until I implement stats reporting into VLLM.
+    # estimated_avg_tps = redis.get('estimated_avg_tps', float, default=0)
 
     if opts.average_generation_time_mode == 'database':
         average_generation_time = redis.get('average_generation_elapsed_sec', float, default=0)
@@ -99,7 +101,7 @@ def generate_stats(regen: bool = False):
             'proompts_total': get_total_proompts() if opts.show_num_prompts else None,
             'uptime': int((datetime.now() - server_start_time).total_seconds()) if opts.show_uptime else None,
             'average_generation_elapsed_sec': int(gen_time_calc),
-            'estimated_avg_tps': estimated_avg_tps,
+            # 'estimated_avg_tps': estimated_avg_tps,
             'tokens_generated': sum_column('prompts', 'response_tokens') if opts.show_total_output_tokens else None,
             'nvidia': netdata_stats
         },
diff --git a/llm_server/routes/v1/generate_stream.py b/llm_server/routes/v1/generate_stream.py
index aa73120..288b1a2 100644
--- a/llm_server/routes/v1/generate_stream.py
+++ b/llm_server/routes/v1/generate_stream.py
@@ -88,6 +88,14 @@ def stream(ws):
 
             partial_response = b''
 
+# TODO: handle when the backend is offline
+# Traceback (most recent call last):
+#   File "/srv/server/local-llm-server/llm_server/routes/v1/generate_stream.py", line 91, in stream
+#     for chunk in response.iter_content(chunk_size=1):
+#                  ^^^^^^^^^^^^^^^^^^^^^
+# AttributeError: 'NoneType' object has no attribute 'iter_content'
+
+
             for chunk in response.iter_content(chunk_size=1):
                 partial_response += chunk
                 if partial_response.endswith(b'\x00'):
diff --git a/server.py b/server.py
index 98aaffa..f020caa 100644
--- a/server.py
+++ b/server.py
@@ -33,10 +33,15 @@ from llm_server.stream import init_socketio
 
 # TODO: set the max tokens to that of the lowest backend
 # TODO: implement RRD backend loadbalancer option
+# Lower priority
+# TODO: the processing stat showed -1 and I had to restart the server
 # TODO: simulate OpenAI error messages regardless of endpoint
 # TODO: send extra headers when ratelimited?
 # TODO: make sure log_prompt() is used everywhere, including errors and invalid requests
 # TODO: unify logging thread in a function and use async/await instead
+# TODO: move the netdata stats to a seperate part of the stats and have it set to the currently selected backend
+# TODO: have VLLM reply with stats (TPS, generated token count, processing time)
+# TODO: add config reloading via stored redis variables
 
 # Done, but need to verify
 # TODO: add more excluding to SYSTEM__ tokens
@@ -166,6 +171,8 @@ def pre_fork(server):
 
     # Start background processes
     start_workers(opts.concurrent_gens)
+    print(f'Started {opts.concurrent_gens} inference workers.')
+
     start_moderation_workers(opts.openai_moderation_workers)
     process_avg_gen_time_background_thread = Thread(target=process_avg_gen_time)
     process_avg_gen_time_background_thread.daemon = True
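
For reference, below is a minimal, self-contained sketch of the worker pattern this patch moves to: the in-flight gauge is decremented inside a finally: block so a crashed generation can no longer leak a "processing" slot, and the worker threads are daemonized so they do not block interpreter shutdown. The job_queue, counters dict, and fake_generator used here are simplified stand-ins for the project's Redis gauges, priority queue, and backend generator, not the real objects.

import queue
import threading
import time

job_queue = queue.Queue()                     # stand-in for the real priority queue
counters = {'active_gen_workers': 0}          # stand-in for the Redis gauge
counters_lock = threading.Lock()


def fake_generator(request_json_body):
    # Stand-in for the real backend call; it may raise on failure.
    time.sleep(0.1)
    return True, {'text': 'ok'}, None


def worker():
    while True:
        request_json_body = job_queue.get()
        with counters_lock:
            counters['active_gen_workers'] += 1
        try:
            start_time = time.time()
            success, response, error_msg = fake_generator(request_json_body)
            elapsed_time = time.time() - start_time
            # ...record elapsed_time and publish (success, response, error_msg) here...
        finally:
            # Runs even if fake_generator() raises, so the gauge cannot drift.
            with counters_lock:
                counters['active_gen_workers'] -= 1
            job_queue.task_done()


def start_workers(num_workers: int):
    for _ in range(num_workers):
        t = threading.Thread(target=worker)
        t.daemon = True  # daemon threads do not keep the process alive on shutdown
        t.start()


if __name__ == '__main__':
    start_workers(2)
    for i in range(4):
        job_queue.put({'prompt': f'request {i}'})
    job_queue.join()

Daemonizing the workers matters because they block forever on job_queue.get(); without daemon=True, an otherwise-finished process would hang on exit waiting for those idle threads.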