more command-specific options, better login, more error handling

This commit is contained in:
Cyberes 2023-04-08 15:13:09 -06:00
parent 312cdc5694
commit a627fcd6bd
5 changed files with 184 additions and 113 deletions

View File

@ -13,7 +13,8 @@ pip install -r requirements.txt
Copy `config.sample.yaml` to `config.yaml` and fill it out with your bot's auth and your OpenAI API key. Copy `config.sample.yaml` to `config.yaml` and fill it out with your bot's auth and your OpenAI API key.
Then invite your bot and start a chat by prefixing your message with `!c`. The bot will create a thread (you don't need to use `!c` in the thread). Then invite your bot and start a chat by prefixing your message with `!c`. The bot will create a thread (you don't need
to use `!c` in the thread).
I included a sample Systemd service. I included a sample Systemd service.
@ -23,7 +24,13 @@ Invite the bot to your room and query it with the command `!c` (this can be chan
Don't try to use two bots in the same thread. Don't try to use two bots in the same thread.
The bot can give helpful reactions:
- 🚫 means that the user is not allowed to chat with the bot.
- ❌ means the bot encountered an exception. The bot restarts when it encounters an exception which means it will not be able to respond
for a short time after this reaction.
- ❌ 🔐 means there was a decryption failure.
## Encryption ## Encryption
This bot supports encryption. I recommend using [Pantalaimon](https://github.com/matrix-org/pantalaimon/) to manage encryption keys as the This bot supports encryption. I recommend using [Pantalaimon](https://github.com/matrix-org/pantalaimon/) to manage encryption keys as the built-in solution is a little janky and may be unreliable.
built-in solution is a little janky and may be unreliable. If you want a private DM with the bot I recommend creating a new room without encryption.

69
main.py
View File

@ -12,7 +12,7 @@ from uuid import uuid4
import openai import openai
import yaml import yaml
from aiohttp import ClientConnectionError, ServerDisconnectedError from aiohttp import ClientConnectionError, ServerDisconnectedError
from nio import InviteMemberEvent, JoinResponse, LocalProtocolError, MegolmEvent, RoomMessageText from nio import InviteMemberEvent, JoinResponse, MegolmEvent, RoomMessageText
from matrix_gpt import MatrixNioGPTHelper from matrix_gpt import MatrixNioGPTHelper
from matrix_gpt.bot.callbacks import Callbacks from matrix_gpt.bot.callbacks import Callbacks
@ -58,11 +58,14 @@ check_config_value_exists(config_data, 'openai')
check_config_value_exists(config_data['openai'], 'api_key') check_config_value_exists(config_data['openai'], 'api_key')
check_config_value_exists(config_data['openai'], 'model') check_config_value_exists(config_data['openai'], 'model')
gpt4_enabled = True if config_data['command'].get('gpt4_prefix') else False # gpt4_enabled = True if config_data['command'].get('gpt4_prefix') else False
logger.info(f'GPT4 enabled? {gpt4_enabled}') # logger.info(f'GPT4 enabled? {gpt4_enabled}')
command_prefixes = {} command_prefixes = {}
for k, v in config_data['command'].items(): for k, v in config_data['command'].items():
if 'allowed_to_chat' not in v.keys():
# Set default value
v['allowed_to_chat'] = 'all'
command_prefixes[k] = v command_prefixes[k] = v
@ -89,6 +92,23 @@ async def main():
log_level = logging.INFO log_level = logging.INFO
logger.setLevel(log_level) logger.setLevel(log_level)
l = logger.getEffectiveLevel()
if l == 10:
logger.debug('Log level is DEBUG')
elif l == 20:
logger.info('Log level is INFO')
elif l == 30:
logger.warning('Log level is WARNING')
elif l == 40:
logger.error('Log level is ERROR')
elif l == 50:
logger.critical('Log level is CRITICAL')
else:
logger.info(f'Log level is {l}')
del l
logger.info(f'Command Prefixes: {[k for k, v in command_prefixes.items()]}')
# Logging in with a new device each time seems to fix encryption errors # Logging in with a new device each time seems to fix encryption errors
device_id = config_data['bot_auth'].get('device_id', str(uuid4())) device_id = config_data['bot_auth'].get('device_id', str(uuid4()))
@ -118,7 +138,7 @@ async def main():
system_prompt=config_data['openai'].get('system_prompt'), system_prompt=config_data['openai'].get('system_prompt'),
injected_system_prompt=config_data['openai'].get('injected_system_prompt', False), injected_system_prompt=config_data['openai'].get('injected_system_prompt', False),
openai_temperature=config_data['openai'].get('temperature', 0), openai_temperature=config_data['openai'].get('temperature', 0),
gpt4_enabled=gpt4_enabled, # gpt4_enabled=gpt4_enabled,
log_level=log_level log_level=log_level
) )
client.add_event_callback(callbacks.message, RoomMessageText) client.add_event_callback(callbacks.message, RoomMessageText)
@ -129,21 +149,22 @@ async def main():
# Keep trying to reconnect on failure (with some time in-between) # Keep trying to reconnect on failure (with some time in-between)
while True: while True:
try: try:
# Try to login with the configured username/password logger.info('Logging in...')
try: while True:
login_response = await matrix_helper.login() login_success, login_response = await matrix_helper.login()
if not login_success:
# Check if login failed if 'M_LIMIT_EXCEEDED' in str(login_response):
if not login_response[0]: try:
logger.error(f'Failed to login: {login_response[1].message}\n{vars(login_response[1])}') wait = int((int(str(login_response).split(' ')[-1][:-2]) / 1000) / 2) # only wait half the ratelimited time
retry() logger.error(f'Ratelimited, sleeping {wait}s...')
return False time.sleep(wait)
except LocalProtocolError as e: except:
# There's an edge case here where the user hasn't installed the correct C logger.error('Could not parse M_LIMIT_EXCEEDED')
# dependencies. In that case, a LocalProtocolError is raised on login. else:
logger.fatal(f'Failed to login:\n{e}') logger.error(f'Failed to login, retrying: {login_response}')
retry() time.sleep(5)
return False else:
break
# Login succeeded! # Login succeeded!
logger.info(f"Logged in as {client.user_id} using device {device_id}.") logger.info(f"Logged in as {client.user_id} using device {device_id}.")
@ -161,10 +182,18 @@ async def main():
device_list = [x.id for x in devices] device_list = [x.id for x in devices]
if device_id in device_list: if device_id in device_list:
device_list.remove(device_id) device_list.remove(device_id)
x = await client.delete_devices(device_list, {"type": "m.login.password", "user": config_data['bot_auth']['username'], "password": config_data['bot_auth']['password']}) x = await client.delete_devices(device_list, {
"type": "m.login.password",
"user": config_data['bot_auth']['username'],
"password": config_data['bot_auth']['password']
}
)
logger.info(f'Logged out: {device_list}') logger.info(f'Logged out: {device_list}')
await client.sync_forever(timeout=10000, full_state=True) await client.sync_forever(timeout=10000, full_state=True)
# except LocalProtocolError:
# logger.error(f'Failed to login, retrying in 5s...')
# time.sleep(5)
except (ClientConnectionError, ServerDisconnectedError): except (ClientConnectionError, ServerDisconnectedError):
logger.warning("Unable to connect to homeserver, retrying in 15s...") logger.warning("Unable to connect to homeserver, retrying in 15s...")
time.sleep(15) time.sleep(15)

View File

@ -3,6 +3,7 @@ import asyncio
import json import json
import logging import logging
import time import time
import traceback
from types import ModuleType from types import ModuleType
from nio import (AsyncClient, InviteMemberEvent, JoinError, MatrixRoom, MegolmEvent, RoomMessageText, UnknownEvent) from nio import (AsyncClient, InviteMemberEvent, JoinError, MatrixRoom, MegolmEvent, RoomMessageText, UnknownEvent)
@ -29,8 +30,8 @@ class Callbacks:
log_full_response: bool = False, log_full_response: bool = False,
injected_system_prompt: str = False, injected_system_prompt: str = False,
openai_temperature: float = 0, openai_temperature: float = 0,
gpt4_enabled: bool = False,
log_level=logging.INFO log_level=logging.INFO
# gpt4_enabled: bool = False,
): ):
""" """
Args: Args:
@ -52,7 +53,7 @@ class Callbacks:
self.injected_system_prompt = injected_system_prompt self.injected_system_prompt = injected_system_prompt
self.openai_obj = openai_obj self.openai_obj = openai_obj
self.openai_temperature = openai_temperature self.openai_temperature = openai_temperature
self.gpt4_enabled = gpt4_enabled # self.gpt4_enabled = gpt4_enabled
self.log_level = log_level self.log_level = log_level
async def message(self, room: MatrixRoom, event: RoomMessageText) -> None: async def message(self, room: MatrixRoom, event: RoomMessageText) -> None:
@ -70,6 +71,7 @@ class Callbacks:
return return
if not check_authorized(event.sender, self.allowed_to_chat): if not check_authorized(event.sender, self.allowed_to_chat):
await react_to_event(self.client, room.room_id, event.event_id, "🚫")
return return
if event.server_timestamp < self.startup_ts: if event.server_timestamp < self.startup_ts:
@ -88,81 +90,107 @@ class Callbacks:
# else: # else:
# has_command_prefix = False # has_command_prefix = False
command_activated, selected_model, sent_command_prefix = check_command_prefix(msg, self.command_prefixes) command_activated, sent_command_prefix, command_info = check_command_prefix(msg, self.command_prefixes)
if not command_activated and is_thread(event): # Threaded messages
is_our_thread, sent_command_prefix, command_info = await is_this_our_thread(self.client, room, event, self.command_prefixes)
# General message listener
if not command_activated and is_thread(event):
is_our_thread, selected_model, sent_command_prefix = await is_this_our_thread(self.client, room, event, self.command_prefixes)
if is_our_thread or room.member_count == 2: if is_our_thread or room.member_count == 2:
await self.client.room_typing(room.room_id, typing_state=True, timeout=3000) # Wrap this in a try/catch so we can add reaction on failure.
thread_content = await get_thread_content(self.client, room, event) # But don't want to spam the chat with errors.
api_data = [] try:
for event in thread_content: if not check_authorized(event.sender, command_info['allowed_to_chat']):
if isinstance(event, MegolmEvent): await react_to_event(self.client, room.room_id, event.event_id, "🚫")
resp = await send_text_to_room(self.client, room.room_id, '❌ 🔐 Decryption Failure', reply_to_event_id=event.event_id, thread=True, thread_root_id=thread_content[0].event_id)
logger.critical(f'Decryption failure for event {event.event_id} in room {room.room_id}')
await self.client.room_typing(room.room_id, typing_state=False, timeout=3000)
self.store.add_event_id(resp.event_id)
return return
else:
thread_msg = event.body.strip().strip('\n')
api_data.append(
{
'role': 'assistant' if event.sender == self.client.user_id else 'user',
'content': thread_msg if not check_command_prefix(thread_msg, self.command_prefixes) else thread_msg[len(self.command_prefixes):].strip()
}
) # if len(thread_content) >= 2 and thread_content[0].body.startswith(self.command_prefix): # if thread_content[len(thread_content) - 2].sender == self.client.user
# TODO: process_chat() will set typing as false after generating. await self.client.room_typing(room.room_id, typing_state=True, timeout=3000)
# TODO: If there is still another query in-progress that typing state will be overwritten by the one that just finished. thread_content = await get_thread_content(self.client, room, event)
async def inner(): api_data = []
await process_chat( for event in thread_content:
self.client, if isinstance(event, MegolmEvent):
room, resp = await send_text_to_room(self.client,
event, room.room_id,
api_data, '❌ 🔐 Decryption Failure',
self.store, reply_to_event_id=event.event_id,
openai_obj=self.openai_obj, thread=True,
openai_model=selected_model, thread_root_id=thread_content[0].event_id
openai_temperature=self.openai_temperature, )
thread_root_id=thread_content[0].event_id, logger.critical(f'Decryption failure for event {event.event_id} in room {room.room_id}')
system_prompt=self.system_prompt, await self.client.room_typing(room.room_id, typing_state=False, timeout=3000)
log_full_response=self.log_full_response, self.store.add_event_id(resp.event_id)
injected_system_prompt=self.injected_system_prompt return
) else:
thread_msg = event.body.strip().strip('\n')
api_data.append(
{
'role': 'assistant' if event.sender == self.client.user_id else 'user',
'content': thread_msg if not check_command_prefix(thread_msg, self.command_prefixes)[0] else thread_msg[len(sent_command_prefix):].strip()
}
) # if len(thread_content) >= 2 and thread_content[0].body.startswith(self.command_prefix): # if thread_content[len(thread_content) - 2].sender == self.client.user
asyncio.get_event_loop().create_task(inner()) # TODO: process_chat() will set typing as false after generating.
# TODO: If there is still another query in-progress that typing state will be overwritten by the one that just finished.
async def inner():
await process_chat(
self.client,
room,
event,
api_data,
self.store,
openai_obj=self.openai_obj,
openai_model=command_info['model'],
openai_temperature=self.openai_temperature,
thread_root_id=thread_content[0].event_id,
system_prompt=self.system_prompt,
log_full_response=self.log_full_response,
injected_system_prompt=self.injected_system_prompt
)
asyncio.get_event_loop().create_task(inner())
except:
await react_to_event(self.client, room.room_id, event.event_id, '')
raise
return return
elif (command_activated or room.member_count == 2) and not is_thread(event): elif (command_activated or room.member_count == 2) and not is_thread(event): # Everything else
# Otherwise if this is in a 1-1 with the bot or features a command prefix, treat it as a command. if not check_authorized(event.sender, command_info['allowed_to_chat']):
msg = msg if not command_activated else msg[len(self.command_prefixes):].strip() # Remove the command prefix await react_to_event(self.client, room.room_id, event.event_id, "🚫")
command = Command( return
self.client, try:
self.store, msg = msg if not command_activated else msg[len(sent_command_prefix):].strip() # Remove the command prefix
msg, command = Command(
room, self.client,
event, self.store,
openai_obj=self.openai_obj, msg,
openai_model=selected_model, room,
openai_temperature=self.openai_temperature, event,
reply_in_thread=self.reply_in_thread, openai_obj=self.openai_obj,
system_prompt=self.system_prompt, openai_model=command_info['model'],
injected_system_prompt=self.injected_system_prompt, openai_temperature=self.openai_temperature,
log_full_response=self.log_full_response reply_in_thread=self.reply_in_thread,
) system_prompt=self.system_prompt,
await command.process() injected_system_prompt=self.injected_system_prompt,
log_full_response=self.log_full_response
)
await command.process()
except:
await react_to_event(self.client, room.room_id, event.event_id, '')
raise
else: else:
if self.log_level == logging.DEBUG: # We don't want this debug info to crash the entire process if an error is encountered
# This may be a little slow try:
debug = { if self.log_level == logging.DEBUG:
'command_prefix': sent_command_prefix, # This may be a little slow
'are_we_activated': command_activated, debug = {
'is_dm': room.member_count == 2, 'command_prefix': sent_command_prefix,
'is_thread': is_thread(event), 'are_we_activated': command_activated,
'is_our_thread': await is_this_our_thread(self.client, room, event, self.command_prefixes)[0] 'is_dm': room.member_count == 2,
'is_thread': is_thread(event),
'is_our_thread': await is_this_our_thread(self.client, room, event, self.command_prefixes)[0]
} }
logger.debug(f"Bot not reacting to event {event.event_id}: {json.dumps(debug)}") logger.debug(f"Bot not reacting to event {event.event_id}: {json.dumps(debug)}")
except Exception:
logger.critical(traceback.format_exc())
async def invite(self, room: MatrixRoom, event: InviteMemberEvent) -> None: async def invite(self, room: MatrixRoom, event: InviteMemberEvent) -> None:
"""Callback for when an invite is received. Join the room specified in the invite. """Callback for when an invite is received. Join the room specified in the invite.

View File

@ -114,7 +114,7 @@ def is_thread(event: RoomMessageText):
def check_command_prefix(string: str, prefixes: dict): def check_command_prefix(string: str, prefixes: dict):
for k, v in prefixes.items(): for k, v in prefixes.items():
if string.startswith(f'{v} '): if string.startswith(f'{k} '):
return True, k, v return True, k, v
return False, None, None return False, None, None
@ -225,7 +225,7 @@ async def process_chat(
x = command[-1]['content'].replace("\n", "\\n") x = command[-1]['content'].replace("\n", "\\n")
else: else:
x = command x = command
logger.info(f'Reply to {event.event_id} --> "{x}" and bot responded with "{z}"') logger.info(f'Reply to {event.event_id} --> "{x}" and bot ({openai_model}) responded with "{z}"')
resp = await send_text_to_room(client, room.room_id, text_response, reply_to_event_id=event.event_id, thread=True, thread_root_id=thread_root_id if thread_root_id else event.event_id) resp = await send_text_to_room(client, room.room_id, text_response, reply_to_event_id=event.event_id, thread=True, thread_root_id=thread_root_id if thread_root_id else event.event_id)
await client.room_typing(room.room_id, typing_state=False, timeout=3000) await client.room_typing(room.room_id, typing_state=False, timeout=3000)

View File

@ -31,26 +31,33 @@ class MatrixNioGPTHelper:
self.client = AsyncClient(self.homeserver, self.user_id, config=self.client_config, store_path=self.store_path, device_id=device_id) self.client = AsyncClient(self.homeserver, self.user_id, config=self.client_config, store_path=self.store_path, device_id=device_id)
async def login(self) -> tuple[bool, LoginError] | tuple[bool, LoginResponse | None]: async def login(self) -> tuple[bool, LoginError] | tuple[bool, LoginResponse | None]:
# If there are no previously-saved credentials, we'll use the password try:
if not os.path.exists(self.auth_file): # If there are no previously-saved credentials, we'll use the password
resp = await self.client.login(self.passwd, device_name=self.device_name) if not os.path.exists(self.auth_file):
resp = await self.client.login(self.passwd, device_name=self.device_name)
# check that we logged in succesfully # check that we logged in succesfully
if isinstance(resp, LoginResponse): if isinstance(resp, LoginResponse):
self.write_details_to_disk(resp) self.write_details_to_disk(resp)
return True, resp
else:
return False, resp
else: else:
# raise Exception(f'Failed to log in!\n{resp}') # Otherwise the config file exists, so we'll use the stored credentials
return False, resp with open(self.auth_file, "r") as f:
else: config = json.load(f)
# Otherwise the config file exists, so we'll use the stored credentials client = AsyncClient(config["homeserver"])
with open(self.auth_file, "r") as f: client.access_token = config["access_token"]
config = json.load(f) client.user_id = config["user_id"]
client = AsyncClient(config["homeserver"]) client.device_id = config["device_id"]
client.access_token = config["access_token"] resp = await self.client.login(self.passwd, device_name=self.device_name)
client.user_id = config["user_id"] if isinstance(resp, LoginResponse):
client.device_id = config["device_id"] self.write_details_to_disk(resp)
resp = await self.client.login(self.passwd, device_name=self.device_name) return True, resp
return True, resp else:
return False, resp
except Exception:
return False, None
def write_details_to_disk(self, resp: LoginResponse) -> None: def write_details_to_disk(self, resp: LoginResponse) -> None:
"""Writes the required login details to disk so we can log in later without """Writes the required login details to disk so we can log in later without