From a627fcd6bda52ee6e0504b4231097645576e5829 Mon Sep 17 00:00:00 2001 From: Cyberes Date: Sat, 8 Apr 2023 15:13:09 -0600 Subject: [PATCH] more command-specific options, better login, more error handling --- README.md | 13 ++- main.py | 69 +++++++++---- matrix_gpt/bot/callbacks.py | 166 ++++++++++++++++++------------- matrix_gpt/bot/chat_functions.py | 6 +- matrix_gpt/matrix.py | 43 ++++---- 5 files changed, 184 insertions(+), 113 deletions(-) diff --git a/README.md b/README.md index 26b8085..92003d9 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,8 @@ pip install -r requirements.txt Copy `config.sample.yaml` to `config.yaml` and fill it out with your bot's auth and your OpenAI API key. -Then invite your bot and start a chat by prefixing your message with `!c`. The bot will create a thread (you don't need to use `!c` in the thread). +Then invite your bot and start a chat by prefixing your message with `!c`. The bot will create a thread (you don't need +to use `!c` in the thread). I included a sample Systemd service. @@ -23,7 +24,13 @@ Invite the bot to your room and query it with the command `!c` (this can be chan Don't try to use two bots in the same thread. +The bot can give helpful reactions: + +- 🚫 means that the user is not allowed to chat with the bot. +- ❌ means the bot encountered an exception. The bot restarts when it encounters an exception which means it will not be able to respond + for a short time after this reaction. +- ❌ 🔐 means there was a decryption failure. + ## Encryption -This bot supports encryption. I recommend using [Pantalaimon](https://github.com/matrix-org/pantalaimon/) to manage encryption keys as the -built-in solution is a little janky and may be unreliable. If you want a private DM with the bot I recommend creating a new room without encryption. \ No newline at end of file +This bot supports encryption. I recommend using [Pantalaimon](https://github.com/matrix-org/pantalaimon/) to manage encryption keys as the built-in solution is a little janky and may be unreliable. diff --git a/main.py b/main.py index f0c4701..edaf73c 100755 --- a/main.py +++ b/main.py @@ -12,7 +12,7 @@ from uuid import uuid4 import openai import yaml from aiohttp import ClientConnectionError, ServerDisconnectedError -from nio import InviteMemberEvent, JoinResponse, LocalProtocolError, MegolmEvent, RoomMessageText +from nio import InviteMemberEvent, JoinResponse, MegolmEvent, RoomMessageText from matrix_gpt import MatrixNioGPTHelper from matrix_gpt.bot.callbacks import Callbacks @@ -58,11 +58,14 @@ check_config_value_exists(config_data, 'openai') check_config_value_exists(config_data['openai'], 'api_key') check_config_value_exists(config_data['openai'], 'model') -gpt4_enabled = True if config_data['command'].get('gpt4_prefix') else False -logger.info(f'GPT4 enabled? {gpt4_enabled}') +# gpt4_enabled = True if config_data['command'].get('gpt4_prefix') else False +# logger.info(f'GPT4 enabled? {gpt4_enabled}') command_prefixes = {} for k, v in config_data['command'].items(): + if 'allowed_to_chat' not in v.keys(): + # Set default value + v['allowed_to_chat'] = 'all' command_prefixes[k] = v @@ -89,6 +92,23 @@ async def main(): log_level = logging.INFO logger.setLevel(log_level) + l = logger.getEffectiveLevel() + if l == 10: + logger.debug('Log level is DEBUG') + elif l == 20: + logger.info('Log level is INFO') + elif l == 30: + logger.warning('Log level is WARNING') + elif l == 40: + logger.error('Log level is ERROR') + elif l == 50: + logger.critical('Log level is CRITICAL') + else: + logger.info(f'Log level is {l}') + del l + + logger.info(f'Command Prefixes: {[k for k, v in command_prefixes.items()]}') + # Logging in with a new device each time seems to fix encryption errors device_id = config_data['bot_auth'].get('device_id', str(uuid4())) @@ -118,7 +138,7 @@ async def main(): system_prompt=config_data['openai'].get('system_prompt'), injected_system_prompt=config_data['openai'].get('injected_system_prompt', False), openai_temperature=config_data['openai'].get('temperature', 0), - gpt4_enabled=gpt4_enabled, + # gpt4_enabled=gpt4_enabled, log_level=log_level ) client.add_event_callback(callbacks.message, RoomMessageText) @@ -129,21 +149,22 @@ async def main(): # Keep trying to reconnect on failure (with some time in-between) while True: try: - # Try to login with the configured username/password - try: - login_response = await matrix_helper.login() - - # Check if login failed - if not login_response[0]: - logger.error(f'Failed to login: {login_response[1].message}\n{vars(login_response[1])}') - retry() - return False - except LocalProtocolError as e: - # There's an edge case here where the user hasn't installed the correct C - # dependencies. In that case, a LocalProtocolError is raised on login. - logger.fatal(f'Failed to login:\n{e}') - retry() - return False + logger.info('Logging in...') + while True: + login_success, login_response = await matrix_helper.login() + if not login_success: + if 'M_LIMIT_EXCEEDED' in str(login_response): + try: + wait = int((int(str(login_response).split(' ')[-1][:-2]) / 1000) / 2) # only wait half the ratelimited time + logger.error(f'Ratelimited, sleeping {wait}s...') + time.sleep(wait) + except: + logger.error('Could not parse M_LIMIT_EXCEEDED') + else: + logger.error(f'Failed to login, retrying: {login_response}') + time.sleep(5) + else: + break # Login succeeded! logger.info(f"Logged in as {client.user_id} using device {device_id}.") @@ -161,10 +182,18 @@ async def main(): device_list = [x.id for x in devices] if device_id in device_list: device_list.remove(device_id) - x = await client.delete_devices(device_list, {"type": "m.login.password", "user": config_data['bot_auth']['username'], "password": config_data['bot_auth']['password']}) + x = await client.delete_devices(device_list, { + "type": "m.login.password", + "user": config_data['bot_auth']['username'], + "password": config_data['bot_auth']['password'] + } + ) logger.info(f'Logged out: {device_list}') await client.sync_forever(timeout=10000, full_state=True) + # except LocalProtocolError: + # logger.error(f'Failed to login, retrying in 5s...') + # time.sleep(5) except (ClientConnectionError, ServerDisconnectedError): logger.warning("Unable to connect to homeserver, retrying in 15s...") time.sleep(15) diff --git a/matrix_gpt/bot/callbacks.py b/matrix_gpt/bot/callbacks.py index 313289e..b7739b8 100644 --- a/matrix_gpt/bot/callbacks.py +++ b/matrix_gpt/bot/callbacks.py @@ -3,6 +3,7 @@ import asyncio import json import logging import time +import traceback from types import ModuleType from nio import (AsyncClient, InviteMemberEvent, JoinError, MatrixRoom, MegolmEvent, RoomMessageText, UnknownEvent) @@ -29,8 +30,8 @@ class Callbacks: log_full_response: bool = False, injected_system_prompt: str = False, openai_temperature: float = 0, - gpt4_enabled: bool = False, log_level=logging.INFO + # gpt4_enabled: bool = False, ): """ Args: @@ -52,7 +53,7 @@ class Callbacks: self.injected_system_prompt = injected_system_prompt self.openai_obj = openai_obj self.openai_temperature = openai_temperature - self.gpt4_enabled = gpt4_enabled + # self.gpt4_enabled = gpt4_enabled self.log_level = log_level async def message(self, room: MatrixRoom, event: RoomMessageText) -> None: @@ -70,6 +71,7 @@ class Callbacks: return if not check_authorized(event.sender, self.allowed_to_chat): + await react_to_event(self.client, room.room_id, event.event_id, "🚫") return if event.server_timestamp < self.startup_ts: @@ -88,81 +90,107 @@ class Callbacks: # else: # has_command_prefix = False - command_activated, selected_model, sent_command_prefix = check_command_prefix(msg, self.command_prefixes) + command_activated, sent_command_prefix, command_info = check_command_prefix(msg, self.command_prefixes) + + if not command_activated and is_thread(event): # Threaded messages + is_our_thread, sent_command_prefix, command_info = await is_this_our_thread(self.client, room, event, self.command_prefixes) - # General message listener - if not command_activated and is_thread(event): - is_our_thread, selected_model, sent_command_prefix = await is_this_our_thread(self.client, room, event, self.command_prefixes) if is_our_thread or room.member_count == 2: - await self.client.room_typing(room.room_id, typing_state=True, timeout=3000) - thread_content = await get_thread_content(self.client, room, event) - api_data = [] - for event in thread_content: - if isinstance(event, MegolmEvent): - resp = await send_text_to_room(self.client, room.room_id, '❌ 🔐 Decryption Failure', reply_to_event_id=event.event_id, thread=True, thread_root_id=thread_content[0].event_id) - logger.critical(f'Decryption failure for event {event.event_id} in room {room.room_id}') - await self.client.room_typing(room.room_id, typing_state=False, timeout=3000) - self.store.add_event_id(resp.event_id) + # Wrap this in a try/catch so we can add reaction on failure. + # But don't want to spam the chat with errors. + try: + if not check_authorized(event.sender, command_info['allowed_to_chat']): + await react_to_event(self.client, room.room_id, event.event_id, "🚫") return - else: - thread_msg = event.body.strip().strip('\n') - api_data.append( - { - 'role': 'assistant' if event.sender == self.client.user_id else 'user', - 'content': thread_msg if not check_command_prefix(thread_msg, self.command_prefixes) else thread_msg[len(self.command_prefixes):].strip() - } - ) # if len(thread_content) >= 2 and thread_content[0].body.startswith(self.command_prefix): # if thread_content[len(thread_content) - 2].sender == self.client.user - # TODO: process_chat() will set typing as false after generating. - # TODO: If there is still another query in-progress that typing state will be overwritten by the one that just finished. - async def inner(): - await process_chat( - self.client, - room, - event, - api_data, - self.store, - openai_obj=self.openai_obj, - openai_model=selected_model, - openai_temperature=self.openai_temperature, - thread_root_id=thread_content[0].event_id, - system_prompt=self.system_prompt, - log_full_response=self.log_full_response, - injected_system_prompt=self.injected_system_prompt - ) + await self.client.room_typing(room.room_id, typing_state=True, timeout=3000) + thread_content = await get_thread_content(self.client, room, event) + api_data = [] + for event in thread_content: + if isinstance(event, MegolmEvent): + resp = await send_text_to_room(self.client, + room.room_id, + '❌ 🔐 Decryption Failure', + reply_to_event_id=event.event_id, + thread=True, + thread_root_id=thread_content[0].event_id + ) + logger.critical(f'Decryption failure for event {event.event_id} in room {room.room_id}') + await self.client.room_typing(room.room_id, typing_state=False, timeout=3000) + self.store.add_event_id(resp.event_id) + return + else: + thread_msg = event.body.strip().strip('\n') + api_data.append( + { + 'role': 'assistant' if event.sender == self.client.user_id else 'user', + 'content': thread_msg if not check_command_prefix(thread_msg, self.command_prefixes)[0] else thread_msg[len(sent_command_prefix):].strip() + } + ) # if len(thread_content) >= 2 and thread_content[0].body.startswith(self.command_prefix): # if thread_content[len(thread_content) - 2].sender == self.client.user - asyncio.get_event_loop().create_task(inner()) + # TODO: process_chat() will set typing as false after generating. + # TODO: If there is still another query in-progress that typing state will be overwritten by the one that just finished. + async def inner(): + await process_chat( + self.client, + room, + event, + api_data, + self.store, + openai_obj=self.openai_obj, + openai_model=command_info['model'], + openai_temperature=self.openai_temperature, + thread_root_id=thread_content[0].event_id, + system_prompt=self.system_prompt, + log_full_response=self.log_full_response, + injected_system_prompt=self.injected_system_prompt + ) + + asyncio.get_event_loop().create_task(inner()) + except: + await react_to_event(self.client, room.room_id, event.event_id, '❌') + raise return - elif (command_activated or room.member_count == 2) and not is_thread(event): - # Otherwise if this is in a 1-1 with the bot or features a command prefix, treat it as a command. - msg = msg if not command_activated else msg[len(self.command_prefixes):].strip() # Remove the command prefix - command = Command( - self.client, - self.store, - msg, - room, - event, - openai_obj=self.openai_obj, - openai_model=selected_model, - openai_temperature=self.openai_temperature, - reply_in_thread=self.reply_in_thread, - system_prompt=self.system_prompt, - injected_system_prompt=self.injected_system_prompt, - log_full_response=self.log_full_response - ) - await command.process() + elif (command_activated or room.member_count == 2) and not is_thread(event): # Everything else + if not check_authorized(event.sender, command_info['allowed_to_chat']): + await react_to_event(self.client, room.room_id, event.event_id, "🚫") + return + try: + msg = msg if not command_activated else msg[len(sent_command_prefix):].strip() # Remove the command prefix + command = Command( + self.client, + self.store, + msg, + room, + event, + openai_obj=self.openai_obj, + openai_model=command_info['model'], + openai_temperature=self.openai_temperature, + reply_in_thread=self.reply_in_thread, + system_prompt=self.system_prompt, + injected_system_prompt=self.injected_system_prompt, + log_full_response=self.log_full_response + ) + await command.process() + except: + await react_to_event(self.client, room.room_id, event.event_id, '❌') + raise else: - if self.log_level == logging.DEBUG: - # This may be a little slow - debug = { - 'command_prefix': sent_command_prefix, - 'are_we_activated': command_activated, - 'is_dm': room.member_count == 2, - 'is_thread': is_thread(event), - 'is_our_thread': await is_this_our_thread(self.client, room, event, self.command_prefixes)[0] + # We don't want this debug info to crash the entire process if an error is encountered + try: + if self.log_level == logging.DEBUG: + # This may be a little slow + debug = { + 'command_prefix': sent_command_prefix, + 'are_we_activated': command_activated, + 'is_dm': room.member_count == 2, + 'is_thread': is_thread(event), + 'is_our_thread': await is_this_our_thread(self.client, room, event, self.command_prefixes)[0] - } - logger.debug(f"Bot not reacting to event {event.event_id}: {json.dumps(debug)}") + } + logger.debug(f"Bot not reacting to event {event.event_id}: {json.dumps(debug)}") + except Exception: + logger.critical(traceback.format_exc()) async def invite(self, room: MatrixRoom, event: InviteMemberEvent) -> None: """Callback for when an invite is received. Join the room specified in the invite. diff --git a/matrix_gpt/bot/chat_functions.py b/matrix_gpt/bot/chat_functions.py index 82d21ca..83714dc 100644 --- a/matrix_gpt/bot/chat_functions.py +++ b/matrix_gpt/bot/chat_functions.py @@ -114,7 +114,7 @@ def is_thread(event: RoomMessageText): def check_command_prefix(string: str, prefixes: dict): for k, v in prefixes.items(): - if string.startswith(f'{v} '): + if string.startswith(f'{k} '): return True, k, v return False, None, None @@ -225,7 +225,7 @@ async def process_chat( x = command[-1]['content'].replace("\n", "\\n") else: x = command - logger.info(f'Reply to {event.event_id} --> "{x}" and bot responded with "{z}"') + logger.info(f'Reply to {event.event_id} --> "{x}" and bot ({openai_model}) responded with "{z}"') resp = await send_text_to_room(client, room.room_id, text_response, reply_to_event_id=event.event_id, thread=True, thread_root_id=thread_root_id if thread_root_id else event.event_id) await client.room_typing(room.room_id, typing_state=False, timeout=3000) @@ -261,4 +261,4 @@ def check_authorized(string, to_check): output = True return output else: - raise Exception + raise Exception \ No newline at end of file diff --git a/matrix_gpt/matrix.py b/matrix_gpt/matrix.py index 377e2d8..4902cf2 100644 --- a/matrix_gpt/matrix.py +++ b/matrix_gpt/matrix.py @@ -31,26 +31,33 @@ class MatrixNioGPTHelper: self.client = AsyncClient(self.homeserver, self.user_id, config=self.client_config, store_path=self.store_path, device_id=device_id) async def login(self) -> tuple[bool, LoginError] | tuple[bool, LoginResponse | None]: - # If there are no previously-saved credentials, we'll use the password - if not os.path.exists(self.auth_file): - resp = await self.client.login(self.passwd, device_name=self.device_name) + try: + # If there are no previously-saved credentials, we'll use the password + if not os.path.exists(self.auth_file): + resp = await self.client.login(self.passwd, device_name=self.device_name) - # check that we logged in succesfully - if isinstance(resp, LoginResponse): - self.write_details_to_disk(resp) + # check that we logged in succesfully + if isinstance(resp, LoginResponse): + self.write_details_to_disk(resp) + return True, resp + else: + return False, resp else: - # raise Exception(f'Failed to log in!\n{resp}') - return False, resp - else: - # Otherwise the config file exists, so we'll use the stored credentials - with open(self.auth_file, "r") as f: - config = json.load(f) - client = AsyncClient(config["homeserver"]) - client.access_token = config["access_token"] - client.user_id = config["user_id"] - client.device_id = config["device_id"] - resp = await self.client.login(self.passwd, device_name=self.device_name) - return True, resp + # Otherwise the config file exists, so we'll use the stored credentials + with open(self.auth_file, "r") as f: + config = json.load(f) + client = AsyncClient(config["homeserver"]) + client.access_token = config["access_token"] + client.user_id = config["user_id"] + client.device_id = config["device_id"] + resp = await self.client.login(self.passwd, device_name=self.device_name) + if isinstance(resp, LoginResponse): + self.write_details_to_disk(resp) + return True, resp + else: + return False, resp + except Exception: + return False, None def write_details_to_disk(self, resp: LoginResponse) -> None: """Writes the required login details to disk so we can log in later without