fix streaming slowdown?

This commit is contained in:
Cyberes 2023-10-16 23:36:25 -06:00
parent c3c053e071
commit 9e3cbc9d2e
4 changed files with 12 additions and 11 deletions

View File

@ -157,8 +157,8 @@ def openai_chat_completions(model_name=None):
traceback.print_exc() traceback.print_exc()
yield 'data: [DONE]\n\n' yield 'data: [DONE]\n\n'
finally: finally:
# if event: if event:
# redis.lpush(f'notifications:{event.event_id}', 'canceled') redis.publish(f'notifications:{event.event_id}', 'canceled')
stream_redis.delete(stream_name) stream_redis.delete(stream_name)
return Response(generate(), mimetype='text/event-stream') return Response(generate(), mimetype='text/event-stream')

View File

@ -205,7 +205,7 @@ def openai_completions(model_name=None):
yield 'data: [DONE]\n\n' yield 'data: [DONE]\n\n'
finally: finally:
if event: if event:
redis.lpush(f'notifications:{event.event_id}', 'canceled') redis.publish(f'notifications:{event.event_id}', 'canceled')
stream_redis.delete(stream_name) stream_redis.delete(stream_name)
return Response(generate(), mimetype='text/event-stream') return Response(generate(), mimetype='text/event-stream')

View File

@ -185,8 +185,8 @@ def do_stream(ws, model_name):
backend_url=handler.backend_url backend_url=handler.backend_url
) )
finally: finally:
# if event_id: if event_id:
# redis.lpush(f'notifications:{event_id}', 'canceled') redis.publish(f'notifications:{event_id}', 'canceled')
try: try:
# Must close the connection or greenlets will complain. # Must close the connection or greenlets will complain.
ws.close() ws.close()

View File

@ -23,7 +23,8 @@ def get_stream_name(name: str):
def inference_do_stream(stream_name: str, msg_to_backend: dict, backend_url: str, event_id: str): def inference_do_stream(stream_name: str, msg_to_backend: dict, backend_url: str, event_id: str):
prompt = msg_to_backend['prompt'] prompt = msg_to_backend['prompt']
stream_name = get_stream_name(stream_name) stream_name = get_stream_name(stream_name)
redis.delete(f'notifications:{event_id}') pubsub = redis.pubsub()
pubsub.subscribe(f'notifications:{event_id}')
stream_redis.delete(get_stream_name(stream_name)) # be extra sure stream_redis.delete(get_stream_name(stream_name)) # be extra sure
try: try:
response = generator(msg_to_backend, backend_url) response = generator(msg_to_backend, backend_url)
@ -33,11 +34,11 @@ def inference_do_stream(stream_name: str, msg_to_backend: dict, backend_url: str
# If there is no more data, break the loop # If there is no more data, break the loop
if not chunk: if not chunk:
break break
# message = redis.lpop(f'notifications:{event_id}') message = pubsub.get_message()
# if message and message.decode('utf-8') == 'canceled': if message and message['data'] == b'canceled':
# print('Client canceled generation') print('Client canceled generation')
# response.close() response.close()
# return return
partial_response += chunk partial_response += chunk
if partial_response.endswith(b'\x00'): if partial_response.endswith(b'\x00'):