From 9e3cbc9d2ecf7817c3771bd6026998f70e048d01 Mon Sep 17 00:00:00 2001
From: Cyberes
Date: Mon, 16 Oct 2023 23:36:25 -0600
Subject: [PATCH] fix streaming slowdown?

---
 llm_server/routes/openai/chat_completions.py |  4 ++--
 llm_server/routes/openai/completions.py      |  2 +-
 llm_server/routes/v1/generate_stream.py      |  4 ++--
 llm_server/workers/inferencer.py             | 13 +++++++------
 4 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/llm_server/routes/openai/chat_completions.py b/llm_server/routes/openai/chat_completions.py
index fcad3a0..fbac971 100644
--- a/llm_server/routes/openai/chat_completions.py
+++ b/llm_server/routes/openai/chat_completions.py
@@ -157,8 +157,8 @@ def openai_chat_completions(model_name=None):
                 traceback.print_exc()
                 yield 'data: [DONE]\n\n'
             finally:
-                # if event:
-                #     redis.lpush(f'notifications:{event.event_id}', 'canceled')
+                if event:
+                    redis.publish(f'notifications:{event.event_id}', 'canceled')
                 stream_redis.delete(stream_name)
 
         return Response(generate(), mimetype='text/event-stream')
diff --git a/llm_server/routes/openai/completions.py b/llm_server/routes/openai/completions.py
index b8efb07..e7b85ea 100644
--- a/llm_server/routes/openai/completions.py
+++ b/llm_server/routes/openai/completions.py
@@ -205,7 +205,7 @@ def openai_completions(model_name=None):
                 yield 'data: [DONE]\n\n'
             finally:
                 if event:
-                    redis.lpush(f'notifications:{event.event_id}', 'canceled')
+                    redis.publish(f'notifications:{event.event_id}', 'canceled')
                 stream_redis.delete(stream_name)
 
         return Response(generate(), mimetype='text/event-stream')
diff --git a/llm_server/routes/v1/generate_stream.py b/llm_server/routes/v1/generate_stream.py
index e329cd8..72f4bad 100644
--- a/llm_server/routes/v1/generate_stream.py
+++ b/llm_server/routes/v1/generate_stream.py
@@ -185,8 +185,8 @@ def do_stream(ws, model_name):
                 backend_url=handler.backend_url
             )
     finally:
-        # if event_id:
-        #     redis.lpush(f'notifications:{event_id}', 'canceled')
+        if event_id:
+            redis.publish(f'notifications:{event_id}', 'canceled')
         try:
             # Must close the connection or greenlets will complain.
             ws.close()
diff --git a/llm_server/workers/inferencer.py b/llm_server/workers/inferencer.py
index 3d05dc2..765ec8b 100644
--- a/llm_server/workers/inferencer.py
+++ b/llm_server/workers/inferencer.py
@@ -23,7 +23,8 @@ def get_stream_name(name: str):
 def inference_do_stream(stream_name: str, msg_to_backend: dict, backend_url: str, event_id: str):
     prompt = msg_to_backend['prompt']
     stream_name = get_stream_name(stream_name)
-    redis.delete(f'notifications:{event_id}')
+    pubsub = redis.pubsub()
+    pubsub.subscribe(f'notifications:{event_id}')
     stream_redis.delete(get_stream_name(stream_name))  # be extra sure
     try:
         response = generator(msg_to_backend, backend_url)
@@ -33,11 +34,11 @@ def inference_do_stream(stream_name: str, msg_to_backend: dict, backend_url: str
             # If there is no more data, break the loop
             if not chunk:
                 break
-            # message = redis.lpop(f'notifications:{event_id}')
-            # if message and message.decode('utf-8') == 'canceled':
-            #     print('Client canceled generation')
-            #     response.close()
-            #     return
+            message = pubsub.get_message()
+            if message and message['data'] == b'canceled':
+                print('Client canceled generation')
+                response.close()
+                return
             partial_response += chunk
             if partial_response.endswith(b'\x00'):
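
The substance of this patch: client-cancel notifications move from a Redis
list (`lpush` in the route handlers, a commented-out `lpop` per chunk in the
worker) to Redis pub/sub (`publish` / `subscribe` / `get_message`). With
redis-py, `get_message()` with a zero timeout is a non-blocking poll on a
connection that is already subscribed, so the per-chunk check no longer
issues a fresh Redis command and waits for its reply on every streamed
chunk; that round trip is presumably the slowdown the subject line is
asking about.

A minimal, self-contained sketch of the pattern, assuming redis-py;
stream_chunks() and EVENT_ID are hypothetical stand-ins for the backend
response and the request's event id, not names from this repo:

    import redis

    r = redis.Redis()
    EVENT_ID = 'example-event'  # hypothetical; the routes use event.event_id


    def stream_chunks():
        # Hypothetical stand-in for response.iter_content(chunk_size=1).
        yield from (b'h', b'e', b'l', b'l', b'o')


    def consume():
        # Worker side: subscribe once, then poll without blocking inside
        # the hot streaming loop.
        pubsub = r.pubsub()
        pubsub.subscribe(f'notifications:{EVENT_ID}')
        for chunk in stream_chunks():
            message = pubsub.get_message(ignore_subscribe_messages=True, timeout=0)
            if message and message['data'] == b'canceled':
                print('Client canceled generation')
                return
            print('got chunk:', chunk)


    def cancel():
        # Route side: what the `finally:` blocks now do when the client
        # disconnects mid-stream.
        r.publish(f'notifications:{EVENT_ID}', 'canceled')


    if __name__ == '__main__':
        consume()

One trade-off worth noting: pub/sub is fire-and-forget. If a route publishes
'canceled' before the worker has subscribed, the message is simply dropped,
whereas a list entry would have persisted. Here the worker subscribes before
it starts reading from the backend, which narrows that window but does not
close it entirely.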