MatrixGPT/matrix_gpt/bot/chat_functions.py

253 lines
10 KiB
Python
Raw Normal View History

import asyncio
import functools
2023-03-18 02:14:45 -06:00
import logging
2023-03-19 16:01:36 -06:00
import time
2023-03-19 14:46:42 -06:00
from types import ModuleType
2023-03-18 02:14:45 -06:00
from typing import List, Optional, Union
2023-03-19 15:37:55 -06:00
import stopit
2023-03-18 02:14:45 -06:00
from markdown import markdown
2023-03-18 16:57:15 -06:00
from nio import (AsyncClient, ErrorResponse, Event, MatrixRoom, MegolmEvent, Response, RoomMessageText, RoomSendResponse, SendRetryError, )
2023-03-18 02:14:45 -06:00
logger = logging.getLogger('MatrixGPT')
2023-03-19 15:22:05 -06:00
async def send_text_to_room(client: AsyncClient, room_id: str, message: str, notice: bool = False, markdown_convert: bool = True, reply_to_event_id: Optional[str] = None, thread: bool = False, thread_root_id: Optional[str] = None) -> Optional[Union[RoomSendResponse, ErrorResponse]]:
    """Send text to a matrix room.

    Args:
        client: The client to communicate to matrix with.
        room_id: The ID of the room to send the message to.
        message: The message content.
        notice: Whether the message should be sent with an "m.notice" message type
            (will not ping users).
        markdown_convert: Whether to render the message content from markdown to
            HTML and attach it as the formatted body. Defaults to true.
        reply_to_event_id: The ID of the event this message is a reply to, if any.
        thread: Whether the reply should be sent as part of a thread. Only has an
            effect when ``reply_to_event_id`` is given.
        thread_root_id: The ID of the event at the root of the thread. When omitted
            for a threaded reply, ``reply_to_event_id`` is used as the root.

    Returns:
        A RoomSendResponse if the request was successful, else an ErrorResponse.
        None if sending failed with a SendRetryError (the failure is logged).
    """
    # Determine whether to ping room members or not
    msgtype = "m.notice" if notice else "m.text"

    content = {"msgtype": msgtype, "body": message}

    if markdown_convert:
        # "format" is only meaningful together with "formatted_body", so the two
        # are always set as a pair.
        content["format"] = "org.matrix.custom.html"
        content["formatted_body"] = markdown(message)

    if reply_to_event_id:
        in_reply_to = {"event_id": reply_to_event_id}
        if thread:
            content["m.relates_to"] = {
                # Fall back to the replied-to event so a null thread root is never sent.
                'event_id': thread_root_id or reply_to_event_id,
                'is_falling_back': True,
                "m.in_reply_to": in_reply_to,
                'rel_type': "m.thread",
            }
        else:
            content["m.relates_to"] = {"m.in_reply_to": in_reply_to}

    try:
        return await client.room_send(room_id, "m.room.message", content, ignore_unverified_devices=True)
    except SendRetryError:
        logger.exception(f"Unable to send message response to {room_id}")
        return None
def make_pill(user_id: str, displayname: Optional[str] = None) -> str:
    """Convert a user ID (and optionally a display name) to a formatted user 'pill'.

    Both values are HTML-escaped before being placed in the markup, so a display
    name containing ``<``, ``>`` or ``&`` cannot inject tags into the message.

    Args:
        user_id: The MXID of the user.
        displayname: An optional displayname. Clients like Element will figure out the
            correct display name no matter what, but other clients may not. If not
            provided (or empty), the MXID will be used instead.

    Returns:
        The formatted user pill.
    """
    # Imported locally so this function does not depend on a module-level import.
    import html

    label = displayname if displayname else user_id
    # The href sits inside a double-quoted attribute, so quotes are escaped there;
    # the label is element text, where only &, < and > need escaping.
    href_target = html.escape(user_id, quote=True)
    safe_label = html.escape(label, quote=False)
    return f'<a href="https://matrix.to/#/{href_target}">{safe_label}</a>'
2023-03-18 16:57:15 -06:00
async def react_to_event(client: AsyncClient, room_id: str, event_id: str, reaction_text: str, ) -> Union[Response, ErrorResponse]:
    """React to an event in a room with the given reaction text.

    Args:
        client: The client to communicate to matrix with.
        room_id: The ID of the room containing the event.
        event_id: The ID of the event to react to.
        reaction_text: The string to react with. Can also be (one or more) emoji characters.

    Returns:
        A nio.Response or nio.ErrorResponse if an error occurred.

    Raises:
        SendRetryError: If the reaction was unable to be sent.
    """
    # A reaction is an "m.annotation" relation pointing at the target event,
    # with the reaction text carried in "key".
    annotation = {
        "rel_type": "m.annotation",
        "event_id": event_id,
        "key": reaction_text,
    }
    payload = {"m.relates_to": annotation}
    response = await client.room_send(room_id, "m.reaction", payload, ignore_unverified_devices=True)
    return response
2023-03-18 02:14:45 -06:00
async def decryption_failure(self, room: MatrixRoom, event: MegolmEvent) -> None:
    """Callback for when an event fails to decrypt; tells the sender about it.

    Replies to the undecryptable event in its room, using ``self.client`` to send
    the message.

    Args:
        self: The object holding the matrix client in its ``client`` attribute.
        room: The room the undecryptable event was received in.
        event: The event that could not be decrypted.
    """
    # Server-side error logging for this case is currently disabled:
    # logger.error(
    #     f"Failed to decrypt event '{event.event_id}' in room '{room.room_id}'!"
    #     f"\n\n"
    #     f"Tip: try using a different device ID in your config file and restart."
    #     f"\n\n"
    #     f"If all else fails, delete your store directory and let the bot recreate "
    #     f"it (your reminders will NOT be deleted, but the bot may respond to existing "
    #     f"commands a second time)."
    # )
    await send_text_to_room(
        self.client,
        room.room_id,
        "Unable to decrypt this message. Check whether you've chosen to only encrypt to trusted devices.",
        reply_to_event_id=event.event_id,
    )
2023-03-18 02:14:45 -06:00
def is_thread(event: RoomMessageText):
    """Return True if the event's content carries an ``m.thread`` relation.

    Args:
        event: The message event to inspect; its raw ``source`` dict is read.
    """
    # Events without any relation are treated as having an empty one.
    relation = event.source['content'].get('m.relates_to', {})
    relation_type = relation.get('rel_type')
    return relation_type == 'm.thread'
2023-03-18 16:57:15 -06:00
async def is_this_our_thread(client: AsyncClient, room: MatrixRoom, event: RoomMessageText, command_flag: str) -> bool:
    """Check whether the event this one relates to starts with the command flag.

    The related event's ID is read from this event's ``m.relates_to`` content and
    that event is fetched from the room; its body must begin with
    ``command_flag`` followed by a space.

    Args:
        client: The client to communicate to matrix with.
        room: The room the event was received in.
        event: The event whose relation is inspected.
        command_flag: The prefix the related event's body must start with.

    Returns:
        True if the related event's body starts with the command flag. False if
        the event has no relation, the related event could not be fetched, or it
        has no text body.
    """
    base_event_id = event.source['content'].get('m.relates_to', {}).get('event_id')
    if not base_event_id:
        return False

    response = await client.room_get_event(room.room_id, base_event_id)
    # A failed fetch yields a response without an ``event``, and not every event
    # type has a ``body``; treat both as "not ours" instead of crashing.
    base_event = getattr(response, 'event', None)
    body = getattr(base_event, 'body', None)
    if not isinstance(body, str):
        return False
    return body.startswith(f'{command_flag} ')
2023-03-18 02:14:45 -06:00
async def get_thread_content(client: AsyncClient, room: MatrixRoom, base_event: RoomMessageText) -> List[Event]:
    """Collect the events of the thread that ``base_event`` belongs to.

    Starting from ``base_event``, the chain of ``m.in_reply_to`` references is
    followed backwards for as long as the events carry an ``m.thread`` relation.
    The thread root (the ``event_id`` of ``base_event``'s relation) is then added
    and the list is reversed so that it runs from the root to ``base_event``.

    Args:
        client: The client to communicate to matrix with.
        room: The room the thread lives in.
        base_event: The most recent event of the thread.

    Returns:
        The thread's events, root first. Events that could not be fetched are
        left out; the walk stops at the first event that cannot be fetched or
        that has no ``m.in_reply_to`` reference.
    """
    messages = []

    response = await client.room_get_event(room.room_id, base_event.event_id)
    # A failed fetch yields a response without an ``event`` attribute.
    current = getattr(response, 'event', None)
    while current is not None:
        relation = current.source['content'].get('m.relates_to', {})
        if relation.get('rel_type') != 'm.thread':
            break
        messages.append(current)
        previous_id = relation.get('m.in_reply_to', {}).get('event_id')
        if not previous_id:
            break
        response = await client.room_get_event(room.room_id, previous_id)
        current = getattr(response, 'event', None)

    # put the root event in the array
    root_id = base_event.source['content'].get('m.relates_to', {}).get('event_id')
    if root_id:
        response = await client.room_get_event(room.room_id, root_id)
        root_event = getattr(response, 'event', None)
        if root_event is not None:
            messages.append(root_event)
        else:
            logger.warning(f'Could not fetch thread root event {root_id} in room {room.room_id}')

    messages.reverse()
    return messages
async def process_chat(
        client,
        room,
        event,
        command,
        store,
        openai_obj: ModuleType,
        openai_model: str,
        openai_temperature: float,
        openai_retries: int = 3,
        thread_root_id: Optional[str] = None,
        system_prompt: Optional[str] = None,
        log_full_response: bool = False,
        injected_system_prompt: Union[str, bool] = False
):
    """Generate a chat completion for an event and send it to the room as a threaded reply.

    Does nothing if the store reports the event as already seen. Otherwise the
    typing indicator is turned on, the prompt is assembled, the completion is
    requested (with retries) and the answer is sent as a reply in a thread. The
    handled event and the sent reply are both recorded in the store.

    Args:
        client: The client to communicate to matrix with.
        room: The room the event was received in.
        event: The event being answered.
        command: Either the user's prompt, or a list of chat message dicts
            (``{'role': ..., 'content': ...}``). A list is copied, not modified.
        store: Object providing ``check_seen_event`` and ``add_event_id``.
        openai_obj: Module exposing ``ChatCompletion.create``.
        openai_model: Model name passed to the completion call.
        openai_temperature: Temperature passed to the completion call.
        openai_retries: Maximum number of completion attempts.
        thread_root_id: Root event of the thread to reply in. Defaults to the
            answered event itself.
        system_prompt: Optional system message placed at the start of the prompt.
        log_full_response: Whether to debug-log the full prompt and raw response.
        injected_system_prompt: Optional system message inserted just before the
            last message once the conversation has at least three messages, or
            placed first when no ``system_prompt`` is set.
    """
    if store.check_seen_event(event.event_id):
        return

    await client.room_typing(room.room_id, typing_state=True, timeout=90000)
    # if self.reply_in_thread:
    #     thread_content = await get_thread_content(self.client, self.room, self.event)

    if isinstance(command, list):
        # Copy so the prompts inserted below do not leak into the caller's list.
        messages = list(command)
    else:
        messages = [{'role': 'user', 'content': command}]

    if system_prompt:
        messages.insert(0, {"role": "system", "content": system_prompt})
    if injected_system_prompt:
        if messages and messages[-1]['role'] == 'system':
            del messages[-1]
        index = None
        if len(messages) >= 3:  # only inject the system prompt if this isn't the first reply
            index = -1
        elif not system_prompt:
            index = 0
        if index is not None:
            messages.insert(index, {"role": "system", "content": injected_system_prompt})

    logger.debug(f'Generating reply to event {event.event_id}')

    loop = asyncio.get_running_loop()
    request_timeout = 20  # seconds
    generate = functools.partial(openai_obj.ChatCompletion.create, model=openai_model, messages=messages, temperature=openai_temperature, timeout=request_timeout)

    response = None
    for _ in range(openai_retries):
        try:
            # The blocking API call runs in the default executor; wait_for bounds
            # how long we wait for it. On timeout we stop waiting and retry (the
            # worker thread itself is not interrupted).
            response = await asyncio.wait_for(loop.run_in_executor(None, generate), timeout=request_timeout)
        except Exception as e:  # (asyncio.TimeoutError, openai.error.APIConnectionError, ...)
            # Some exceptions (e.g. timeouts) have an empty message; log their type instead.
            reason = str(e) or type(e).__name__
            logger.warning(f'Got exception when generating response to event {event.event_id}, retrying: {reason}')
            await client.room_typing(room.room_id, typing_state=True, timeout=15000)
            # Non-blocking pause so the event loop keeps serving other rooms.
            await asyncio.sleep(2)
            continue
        if response is not None:
            break

    if response is None:
        logger.critical(f'Could not generate response to event {event.event_id} in room {room.room_id}.')
        await client.room_typing(room.room_id, typing_state=False, timeout=15000)
        # NOTE(review): the reaction key is an empty string here; presumably an
        # emoji was intended. Confirm against the upstream file.
        await react_to_event(client, room.room_id, event.event_id, '')
        return

    text_response = response["choices"][0]["message"]["content"].strip()

    # Logging stuff
    if log_full_response:
        logger.debug({'event_id': event.event_id, 'room': room.room_id, 'messages': messages, 'response': response})
    z = text_response.replace("\n", "\\n")
    if isinstance(command, str):
        x = command.replace("\n", "\\n")
    elif isinstance(command, list):
        # The last prompt message is the one being answered.
        x = messages[-1]['content'].replace("\n", "\\n")
    else:
        x = command
    logger.info(f'Reply to {event.event_id} --> "{x}" and bot responded with "{z}"')

    resp = await send_text_to_room(client, room.room_id, text_response, reply_to_event_id=event.event_id, thread=True, thread_root_id=thread_root_id if thread_root_id else event.event_id)
    await client.room_typing(room.room_id, typing_state=False, timeout=3000)

    store.add_event_id(event.event_id)
    if not isinstance(resp, RoomSendResponse):
        # resp may be None (send gave up after retries), which vars() cannot handle.
        details = vars(resp) if hasattr(resp, '__dict__') else resp
        logger.critical(f'Failed to respond to event {event.event_id} in room {room.room_id}:\n{details}')
        # NOTE(review): empty reaction key, as above. Confirm the intended emoji.
        await react_to_event(client, room.room_id, event.event_id, '')
    else:
        store.add_event_id(resp.event_id)
2023-03-18 02:30:12 -06:00
def check_authorized(string, to_check):
    """Check whether a user ID is covered by an allow-list.

    Each allow-list entry is one of:
      * ``'all'``: matches every user;
      * a homeserver name (contains neither ``@`` nor ``:``): matches users whose
        ID has exactly that server part;
      * anything else: treated as a full user ID and must equal ``string``.

    Entries are compared for equality, not as substrings, so ``matrix.org`` does
    not authorize users of ``x.org``.

    Args:
        string: The user ID to check, e.g. ``@alice:example.org``.
        to_check: A single allow-list entry, or a list of entries.

    Returns:
        True if any entry matches the user ID, else False.

    Raises:
        TypeError: If ``to_check`` is neither a str nor a list.
    """

    def check_str(s, c):
        if c == 'all':
            return True
        if '@' not in c and ':' not in c:
            # Homeserver: compare against everything after the first ':' of the ID.
            return s.split(':', 1)[-1] == c
        # By username
        return s == c

    if isinstance(to_check, str):
        return check_str(string, to_check)
    if isinstance(to_check, list):
        return any(check_str(string, item) for item in to_check)
    raise TypeError(f'to_check must be a str or a list, not {type(to_check).__name__}')