server-personification/pers/langchain/history.py

69 lines
2.3 KiB
Python

import json
import pickle
from datetime import datetime
from typing import Union, Type
import redis
from langchain_core.messages import AIMessage, SystemMessage, FunctionMessage, HumanMessage
class HistoryManager:
def __init__(self, flush: bool, timestamp_messages: bool):
self._redis = redis.Redis(host='localhost', port=6379, db=0)
self._key = 'history'
self._timestamp_messages = timestamp_messages
if flush:
self._redis.flushdb()
self._redis.set('end_my_response', 0)
def _format_message(self, content: str, msg_type: Union[Type[HumanMessage], Type[AIMessage], Type[SystemMessage]]):
if self._timestamp_messages:
msg = msg_type(content=json.dumps({
'content': content,
'timestamp': datetime.now().strftime('%m-%d-%Y %H:%M:%S')
}))
else:
msg = msg_type(content=content)
return pickle.dumps(msg)
def add_human_msg(self, msg: str):
self._redis.rpush(self._key, self._format_message(msg, HumanMessage))
def add_agent_msg(self, msg: str):
self._redis.rpush(self._key, self._format_message(msg, AIMessage))
def add_system_msg(self, msg: str):
self._redis.rpush(self._key, self._format_message(msg, SystemMessage))
def add_function_output(self, name: str, output: str):
if self._timestamp_messages:
content = json.dumps(
{
'output': output,
'timestamp': datetime.now().strftime('%m-%d-%Y %H:%M:%S')
}
)
else:
content = output
self._redis.rpush(self._key, pickle.dumps(FunctionMessage(name=name, content=content)))
def acknowledge_stop(self):
last_item = pickle.loads(self._redis.lrange(self._key, -1, -1)[0])
if hasattr(last_item, 'name') and last_item.name == 'end_my_response':
self._redis.rpop(self._key)
return True
return False
@property
def history(self):
history = []
for item in self._redis.lrange(self._key, 0, -1):
history.append(pickle.loads(item))
return history
def __str__(self):
return str(self.history)
def __repr__(self):
print(self.history)