This commit is contained in:
Cyberes 2023-03-18 02:14:45 -06:00
parent c40bcf3d1a
commit 9579c5261d
15 changed files with 883 additions and 2 deletions

5
.gitignore vendored
View File

@ -1,3 +1,7 @@
.idea
bot-store/
config.yaml
# ---> Python # ---> Python
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
@ -137,4 +141,3 @@ dmypy.json
# Cython debug symbols # Cython debug symbols
cython_debug/ cython_debug/

View File

@ -1,3 +1,14 @@
# MatrixGPT # MatrixGPT
ChatGPT bot for Matrix _ChatGPT bot for Matrix._
## Install
```bash
sudo apt install libolm-dev
pip install -r requirements.txt
```
### Is Encryption Supported?
No, not right now. Encryption is pointless because messages with the bot have to be processed and sent to OpenAI.

37
config.sample.yaml Normal file
View File

@ -0,0 +1,37 @@
# Make sure to quote any string with @ or ! characters.
data_storage: bot-store
bot_auth:
username: chatgpt
password: password1234
homeserver: matrix.example.com
store_path: 'bot-store/'
device_id: ABCDEFGHIJ
openai_api_key: sk-J12J3O12U3J1LK2J310283JIJ1L2K3J
openai_model: gpt-3.5-turbo
# Who is the bot allowed to respond to?
# Possible values: "all", an array of usernames, or a homeserver
allowed_to_chat: 'all'
# Who can invite the bot to a DM for a private chat?
# Possible values: "all", an array of usernames, or a homeserver
allowed_to_invite:
- '@bobjoe@example.com'
# Room IDs to auto-join.
autojoin_rooms:
- '!kjllkjlkj321123:example.com'
#whitelist_rooms:
#blacklist_rooms:
# Should the bot set its avatar on login?
#set_avatar: true
command_prefix: '!c'
reply_in_thread: true

136
main.py Executable file
View File

@ -0,0 +1,136 @@
#!/usr/bin/env python3
import argparse
import asyncio
import logging
import os
import sys
import time
import traceback
from pathlib import Path
import openai
import yaml
from aiohttp import ClientConnectionError, ServerDisconnectedError
from nio import InviteMemberEvent, JoinResponse, LocalProtocolError, MegolmEvent, RoomMessageText
from matrix_gpt import MatrixNioGPTHelper
from matrix_gpt.bot.callbacks import Callbacks
from matrix_gpt.bot.storage import Storage
from matrix_gpt.config import check_config_value_exists
script_directory = os.path.abspath(os.path.dirname(__file__))
logging.basicConfig()
logger = logging.getLogger('MatrixGPT')
logger.setLevel(logging.INFO)
parser = argparse.ArgumentParser(description='MatrixGPT Bot')
parser.add_argument('--config', default=Path(script_directory, 'config.yaml'), help='Path to config.yaml if it is not located next to this executable.')
args = parser.parse_args()
# Load config
if not Path(args.config).exists():
print('Config file does not exist:', args.config)
sys.exit(1)
else:
try:
with open(args.config, 'r') as file:
config_data = yaml.safe_load(file)
except Exception as e:
print(f'Failed to load config file: {e}')
sys.exit(1)
# Test config
check_config_value_exists(config_data, 'bot_auth', dict)
check_config_value_exists(config_data['bot_auth'], 'username')
check_config_value_exists(config_data['bot_auth'], 'password')
check_config_value_exists(config_data['bot_auth'], 'homeserver')
check_config_value_exists(config_data['bot_auth'], 'store_path')
check_config_value_exists(config_data, 'allowed_to_chat')
check_config_value_exists(config_data, 'allowed_to_invite', allow_empty=True)
check_config_value_exists(config_data, 'command_prefix')
check_config_value_exists(config_data, 'openai_api_key')
check_config_value_exists(config_data, 'openai_model')
check_config_value_exists(config_data, 'data_storage')
# check_config_value_exists(config_data, 'autojoin_rooms')
def retry(msg=None):
if msg:
logger.warning(f'{msg}, retrying in 15s...')
else:
logger.warning(f'Retrying in 15s...')
time.sleep(15)
async def main():
matrix_helper = MatrixNioGPTHelper(
auth_file=Path(config_data['bot_auth']['store_path'], 'bot_auth.json'),
user_id=config_data['bot_auth']['username'],
passwd=config_data['bot_auth']['password'],
homeserver=config_data['bot_auth']['homeserver'],
store_path=config_data['bot_auth']['store_path'],
device_id=config_data['bot_auth'].get('device_id')
)
client = matrix_helper.client
openai.api_key = config_data['openai_api_key']
openai_config = {
'model': config_data['openai_model'],
'openai': openai
}
storage = Storage(Path(config_data['data_storage'], 'matrixgpt.db'))
# Set up event callbacks
callbacks = Callbacks(client, storage, config_data['command_prefix'], openai_config, config_data.get('reply_in_thread', False), config_data['allowed_to_invite'], config_data['allowed_to_chat'])
client.add_event_callback(callbacks.message, RoomMessageText)
client.add_event_callback(callbacks.invite_event_filtered_callback, InviteMemberEvent)
client.add_event_callback(callbacks.decryption_failure, MegolmEvent)
# client.add_event_callback(callbacks.unknown, UnknownEvent)
# Keep trying to reconnect on failure (with some time in-between)
while True:
try:
# Try to login with the configured username/password
try:
login_response = await matrix_helper.login()
# Check if login failed
if not login_response[0]:
logger.error(f'Failed to login: {login_response[1].message}\n{vars(login_response[1])}')
retry()
return False
except LocalProtocolError as e:
# There's an edge case here where the user hasn't installed the correct C
# dependencies. In that case, a LocalProtocolError is raised on login.
logger.fatal(f'Failed to login:\n{e}')
retry()
return False
# Login succeeded!
logger.info(f"Logged in as {client.user_id}")
if config_data.get('autojoin_rooms'):
for room in config_data.get('autojoin_rooms'):
r = await client.join(room)
if not isinstance(r, JoinResponse):
logger.critical(f'Failed to join room {room}: {vars(r)}')
await client.sync_forever(timeout=10000, full_state=True)
except (ClientConnectionError, ServerDisconnectedError):
logger.warning("Unable to connect to homeserver, retrying in 15s...")
time.sleep(15)
finally:
await client.close()
sys.exit()
if __name__ == "__main__":
while True:
try:
asyncio.run(main())
except Exception:
logger.critical(traceback.format_exc())
time.sleep(5) # don't exit, keep going

1
matrix_gpt/__init__.py Normal file
View File

@ -0,0 +1 @@
from .matrix import MatrixNioGPTHelper

View File

View File

@ -0,0 +1,92 @@
import logging
from nio import AsyncClient, MatrixRoom, RoomMessageText
from .chat_functions import get_thread_content, process_chat, send_text_to_room
# from .config import Config
from .storage import Storage
logger = logging.getLogger('MatrixGPT')
class Command:
def __init__(
self,
client: AsyncClient,
store: Storage,
# config: Config,
command: str,
room: MatrixRoom,
event: RoomMessageText,
openai,
reply_in_thread
):
"""A command made by a user.
Args:
client: The client to communicate to matrix with.
store: Bot storage.
config: Bot configuration parameters.
command: The command and arguments.
room: The room the command was sent in.
event: The event describing the command.
"""
self.client = client
self.store = store
# self.config = config
self.command = command
self.room = room
self.event = event
self.args = self.command.split()[1:]
self.openai = openai
self.reply_in_thread = reply_in_thread
async def process(self):
"""Process the command"""
# await self.client.room_read_markers(self.room.room_id, self.event.event_id, self.event.event_id)
self.command = self.command.strip()
# if self.command.startswith("echo"):
# await self._echo()
# elif self.command.startswith("react"):
# await self._react()
if self.command.startswith("help"):
await self._show_help()
else:
await self._process_chat()
async def _process_chat(self):
await process_chat(self.client, self.room, self.event, self.command, self.store, self.openai)
async def _show_help(self):
"""Show the help text"""
# if not self.args:
# text = (
# "Hello, I am a bot made with matrix-nio! Use `help commands` to view "
# "available commands."
# )
# await send_text_to_room(self.client, self.room.room_id, text)
# return
# topic = self.args[0]
# if topic == "rules":
# text = "These are the rules!"
# elif topic == "commands":
# text = """Available commands:"""
# else:
# text = "Unknown help topic!"
text = 'Send your message to ChatGPT like this: `!c Hi ChatGPT, how are you?`'
await send_text_to_room(self.client, self.room.room_id, text)
async def _unknown_command(self):
await send_text_to_room(
self.client,
self.room.room_id,
f"Unknown command '{self.command}'. Try the 'help' command for more information.",
)

213
matrix_gpt/bot/callbacks.py Normal file
View File

@ -0,0 +1,213 @@
# https://github.com/anoadragon453/nio-template
import logging
import time
from nio import (AsyncClient, InviteMemberEvent, JoinError, MatrixRoom, MegolmEvent, RoomMessageText, UnknownEvent, )
from .bot_commands import Command
from .chat_functions import get_thread_content, is_thread, process_chat, react_to_event
# from .config import Config
from .storage import Storage
logger = logging.getLogger('MatrixGPT')
class Callbacks:
def __init__(self, client: AsyncClient, store: Storage, command_prefix: str, openai, reply_in_thread, allowed_to_invite, allowed_to_chat='all'):
"""
Args:
client: nio client used to interact with matrix.
store: Bot storage.
config: Bot configuration parameters.
"""
self.client = client
self.store = store
# self.config = config
self.command_prefix = command_prefix
self.openai = openai
self.startup_ts = time.time_ns() // 1_000_000
self.reply_in_thread = reply_in_thread
self.allowed_to_invite = allowed_to_invite if allowed_to_invite else []
self.allowed_to_chat = allowed_to_chat
async def message(self, room: MatrixRoom, event: RoomMessageText) -> None:
"""Callback for when a message event is received
Args:
room: The room the event came from.
event: The event defining the message.
"""
# Extract the message text
msg = event.body
logger.debug(f"Bot message received for room {room.display_name} | "
f"{room.user_name(event.sender)}: {msg}")
await self.client.room_read_markers(room.room_id, event.event_id, event.event_id)
# Ignore messages from ourselves
if event.sender == self.client.user_id:
return
if self.allowed_to_chat != 'all' and event.sender not in self.allowed_to_chat:
return
if event.server_timestamp < self.startup_ts:
logger.info(f'Skipping event as it was sent before startup time: {event.event_id}')
return
# if room.member_count > 2:
# has_command_prefix =
# else:
# has_command_prefix = False
# room.is_group is often a DM, but not always.
# room.is_group does not allow room aliases
# room.member_count > 2 ... we assume a public room
# room.member_count <= 2 ... we assume a DM
# General message listener
if not msg.startswith(self.command_prefix) and is_thread(event) and not self.store.check_seen_event(event.event_id):
await self.client.room_typing(room.room_id, typing_state=True, timeout=3000)
thread_content = await get_thread_content(self.client, room, event)
api_data = []
for event in thread_content:
api_data.append({'role': 'assistant' if event.sender == self.client.user_id else 'user', 'content': event.body if not event.body.startswith(self.command_prefix) else event.body[
len(self.command_prefix):].strip()}) # if len(thread_content) >= 2 and thread_content[0].body.startswith(self.command_prefix): # if thread_content[len(thread_content) - 2].sender == self.client.user
# message = Message(self.client, self.store, msg, room, event, self.reply_in_thread)
# await message.process()
api_data.append({'role': 'user', 'content': event.body})
print(thread_content)
await process_chat(self.client, room, event, api_data, self.store, self.openai, thread_root_id=thread_content[0].event_id)
return
elif msg.startswith(self.command_prefix) or room.member_count == 2:
# Otherwise if this is in a 1-1 with the bot or features a command prefix,
# treat it as a command
msg = event.body if not event.body.startswith(self.command_prefix) else event.body[len(self.command_prefix):].strip() # Remove the command prefix
command = Command(self.client, self.store, msg, room, event, self.openai, self.reply_in_thread)
await command.process()
async def invite(self, room: MatrixRoom, event: InviteMemberEvent) -> None:
"""Callback for when an invite is received. Join the room specified in the invite.
Args:
room: The room that we are invited to.
event: The invite event.
"""
if event.sender not in self.allowed_to_invite:
logger.info(f"Got invite to {room.room_id} from {event.sender} but rejected.")
return
logger.debug(f"Got invite to {room.room_id} from {event.sender}.")
# Attempt to join 3 times before giving up
for attempt in range(3):
result = await self.client.join(room.room_id)
if type(result) == JoinError:
logger.error(f"Error joining room {room.room_id} (attempt %d): %s", attempt, result.message, )
else:
break
else:
logger.error("Unable to join room: %s", room.room_id)
# Successfully joined room
logger.info(f"Joined via invite: {room.room_id}")
async def invite_event_filtered_callback(self, room: MatrixRoom, event: InviteMemberEvent) -> None:
"""
Since the InviteMemberEvent is fired for every m.room.member state received
in a sync response's `rooms.invite` section, we will receive some that are
not actually our own invite event (such as the inviter's membership).
This makes sure we only call `callbacks.invite` with our own invite events.
"""
if event.state_key == self.client.user_id:
# This is our own membership (invite) event
await self.invite(room, event)
# async def _reaction(
# self, room: MatrixRoom, event: UnknownEvent, reacted_to_id: str
# ) -> None:
# """A reaction was sent to one of our messages. Let's send a reply acknowledging it.
#
# Args:
# room: The room the reaction was sent in.
#
# event: The reaction event.
#
# reacted_to_id: The event ID that the reaction points to.
# """
# logger.debug(f"Got reaction to {room.room_id} from {event.sender}.")
#
# # Get the original event that was reacted to
# event_response = await self.client.room_get_event(room.room_id, reacted_to_id)
# if isinstance(event_response, RoomGetEventError):
# logger.warning(
# "Error getting event that was reacted to (%s)", reacted_to_id
# )
# return
# reacted_to_event = event_response.event
#
# # Only acknowledge reactions to events that we sent
# if reacted_to_event.sender != self.config.user_id:
# return
#
# # Send a message acknowledging the reaction
# reaction_sender_pill = make_pill(event.sender)
# reaction_content = (
# event.source.get("content", {}).get("m.relates_to", {}).get("key")
# )
# message = (
# f"{reaction_sender_pill} reacted to this event with `{reaction_content}`!"
# )
# await send_text_to_room(
# self.client,
# room.room_id,
# message,
# reply_to_event_id=reacted_to_id,
# )
async def decryption_failure(self, room: MatrixRoom, event: MegolmEvent) -> None:
"""Callback for when an event fails to decrypt. Inform the user.
Args:
room: The room that the event that we were unable to decrypt is in.
event: The encrypted event that we were unable to decrypt.
"""
logger.error(f"Failed to decrypt event '{event.event_id}' in room '{room.room_id}'!"
f"\n\n"
f"Tip: try using a different device ID in your config file and restart."
f"\n\n"
f"If all else fails, delete your store directory and let the bot recreate "
f"it (your reminders will NOT be deleted, but the bot may respond to existing "
f"commands a second time).")
red_x_and_lock_emoji = "❌ 🔐"
# React to the undecryptable event with some emoji
await react_to_event(self.client, room.room_id, event.event_id, red_x_and_lock_emoji, )
async def unknown(self, room: MatrixRoom, event: UnknownEvent) -> None:
"""Callback for when an event with a type that is unknown to matrix-nio is received.
Currently this is used for reaction events, which are not yet part of a released
matrix spec (and are thus unknown to nio).
Args:
room: The room the reaction was sent in.
event: The event itself.
"""
# if event.type == "m.reaction":
# # Get the ID of the event this was a reaction to
# relation_dict = event.source.get("content", {}).get("m.relates_to", {})
#
# reacted_to = relation_dict.get("event_id")
# if reacted_to and relation_dict.get("rel_type") == "m.annotation":
# await self._reaction(room, event, reacted_to)
# return
logger.debug(f"Got unknown event with type to {event.type} from {event.sender} in {room.room_id}.")

View File

@ -0,0 +1,213 @@
import logging
from typing import List, Optional, Union
from markdown import markdown
from nio import (
AsyncClient,
ErrorResponse,
Event, MatrixRoom,
MegolmEvent,
Response,
RoomMessageText, RoomSendResponse,
SendRetryError,
)
logger = logging.getLogger('MatrixGPT')
async def send_text_to_room(
client: AsyncClient,
room_id: str,
message: str,
notice: bool = False,
markdown_convert: bool = True,
reply_to_event_id: Optional[str] = None,
thread: bool = False,
thread_root_id: str = None
) -> Union[RoomSendResponse, ErrorResponse]:
"""Send text to a matrix room.
Args:
client: The client to communicate to matrix with.
room_id: The ID of the room to send the message to.
message: The message content.
notice: Whether the message should be sent with an "m.notice" message type
(will not ping users).
markdown_convert: Whether to convert the message content to markdown.
Defaults to true.
reply_to_event_id: Whether this message is a reply to another event. The event
ID this is message is a reply to.
Returns:
A RoomSendResponse if the request was successful, else an ErrorResponse.
"""
# Determine whether to ping room members or not
msgtype = "m.notice" if notice else "m.text"
content = {
"msgtype": msgtype,
"format": "org.matrix.custom.html",
"body": message,
}
if markdown_convert:
content["formatted_body"] = markdown(message)
if reply_to_event_id:
if thread:
content["m.relates_to"] = {
'event_id': thread_root_id,
'is_falling_back': True,
"m.in_reply_to": {
"event_id": reply_to_event_id
},
'rel_type': "m.thread"
}
else:
content["m.relates_to"] = {"m.in_reply_to": {"event_id": reply_to_event_id}}
try:
return await client.room_send(
room_id,
"m.room.message",
content,
ignore_unverified_devices=True,
)
except SendRetryError:
logger.exception(f"Unable to send message response to {room_id}")
def make_pill(user_id: str, displayname: str = None) -> str:
"""Convert a user ID (and optionally a display name) to a formatted user 'pill'
Args:
user_id: The MXID of the user.
displayname: An optional displayname. Clients like Element will figure out the
correct display name no matter what, but other clients may not. If not
provided, the MXID will be used instead.
Returns:
The formatted user pill.
"""
if not displayname:
# Use the user ID as the displayname if not provided
displayname = user_id
return f'<a href="https://matrix.to/#/{user_id}">{displayname}</a>'
async def react_to_event(
client: AsyncClient,
room_id: str,
event_id: str,
reaction_text: str,
) -> Union[Response, ErrorResponse]:
"""Reacts to a given event in a room with the given reaction text
Args:
client: The client to communicate to matrix with.
room_id: The ID of the room to send the message to.
event_id: The ID of the event to react to.
reaction_text: The string to react with. Can also be (one or more) emoji characters.
Returns:
A nio.Response or nio.ErrorResponse if an error occurred.
Raises:
SendRetryError: If the reaction was unable to be sent.
"""
content = {
"m.relates_to": {
"rel_type": "m.annotation",
"event_id": event_id,
"key": reaction_text,
}
}
return await client.room_send(
room_id,
"m.reaction",
content,
ignore_unverified_devices=True,
)
async def decryption_failure(self, room: MatrixRoom, event: MegolmEvent) -> None:
"""Callback for when an event fails to decrypt. Inform the user"""
logger.error(
f"Failed to decrypt event '{event.event_id}' in room '{room.room_id}'!"
f"\n\n"
f"Tip: try using a different device ID in your config file and restart."
f"\n\n"
f"If all else fails, delete your store directory and let the bot recreate "
f"it (your reminders will NOT be deleted, but the bot may respond to existing "
f"commands a second time)."
)
user_msg = (
"Unable to decrypt this message. "
"Check whether you've chosen to only encrypt to trusted devices."
)
await send_text_to_room(
self.client,
room.room_id,
user_msg,
reply_to_event_id=event.event_id,
)
def is_thread(event: RoomMessageText):
return event.source['content'].get('m.relates_to', {}).get('rel_type') == 'm.thread'
async def get_thread_content(client: AsyncClient, room: MatrixRoom, base_event: RoomMessageText) -> List[Event]:
messages = []
new_event = (await client.room_get_event(room.room_id, base_event.event_id)).event
while True:
if new_event.source['content'].get('m.relates_to', {}).get('rel_type') == 'm.thread':
messages.append(new_event)
else:
break
new_event = (await client.room_get_event(room.room_id, new_event.source['content']['m.relates_to']['m.in_reply_to']['event_id'])).event
messages.append((await client.room_get_event(room.room_id, base_event.source['content']['m.relates_to']['event_id'])).event) # put the root event in the array
messages.reverse()
return messages
async def process_chat(client, room, event, command, store, openai, thread_root_id: str = None):
if not store.check_seen_event(event.event_id):
await client.room_typing(room.room_id, typing_state=True, timeout=3000)
# if self.reply_in_thread:
# thread_content = await get_thread_content(self.client, self.room, self.event)
if isinstance(command, list):
messages = command
else:
messages = [
{'role': 'user', 'content': command},
]
response = openai['openai'].ChatCompletion.create(
model=openai['model'],
messages=messages,
temperature=0,
)
logger.debug(response)
text_response = response["choices"][0]["message"]["content"].strip().strip('\n')
logger.info(f'Reply to {event.event_id} --> "{command}" and bot responded with "{text_response}"')
resp = await send_text_to_room(client, room.room_id, text_response, reply_to_event_id=event.event_id, thread=True, thread_root_id=thread_root_id if thread_root_id else event.event_id)
await client.room_typing(room.room_id, typing_state=False, timeout=3000)
store.add_event_id(event.event_id)
store.add_event_id(resp.event_id)
# else:
# logger.info(f'Not responding to seen event {event.event_id}')

View File

@ -0,0 +1,55 @@
import logging
from nio import AsyncClient, MatrixRoom, RoomMessageText
from .chat_functions import send_text_to_room
# from .config import Config
from .storage import Storage
logger = logging.getLogger('MatrixGPT')
class Message:
def __init__(
self,
client: AsyncClient,
store: Storage,
# config: Config,
message_content: str,
room: MatrixRoom,
event: RoomMessageText,
openai,
):
"""Initialize a new Message
Args:
client: nio client used to interact with matrix.
store: Bot storage.
config: Bot configuration parameters.
message_content: The body of the message.
room: The room the event came from.
event: The event defining the message.
"""
self.client = client
self.store = store
# self.config = config
self.message_content = message_content
self.room = room
self.event = event
async def process(self) -> None:
"""Process and possibly respond to the message"""
if self.message_content.lower() == "hello world":
await self._hello_world()
async def _hello_world(self) -> None:
"""Say hello"""
text = "Hello, world!"
await send_text_to_room(self.client, self.room.room_id, text)

32
matrix_gpt/bot/storage.py Normal file
View File

@ -0,0 +1,32 @@
import logging
import sqlite3
from pathlib import Path
from typing import Union
logger = logging.getLogger('MatrixGPT')
class Storage:
insert_event = "INSERT INTO `seen_events` (`event_id`) VALUES (?);"
seen_events = set()
def __init__(self, database_file: Union[str, Path]):
self.conn = sqlite3.connect(database_file)
self.cursor = self.conn.cursor()
table_exists = self.cursor.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='seen_events';").fetchall()[0][0]
if table_exists == 0:
self.cursor.execute("CREATE TABLE `seen_events` (`event_id` text NOT NULL);")
logger.info('Created new database file.')
# This does not work
# db_seen_events = self.cursor.execute("SELECT `event_id` FROM `seen_events`;").fetchall()
def add_event_id(self, event_id):
self.seen_events.add(event_id)
# This makes the program exit???
# self.cursor.execute(self.insert_event, (event_id))
def check_seen_event(self, event_id):
return event_id in self.seen_events

14
matrix_gpt/config.py Normal file
View File

@ -0,0 +1,14 @@
import sys
def check_config_value_exists(config_part, key, check_type=None, allow_empty=False) -> bool:
if key not in config_part.keys():
print(f'Config key not found: "{key}"')
sys.exit(1)
if not allow_empty and config_part[key] is None or config_part[key] == '':
print(f'Config key "{key}" must not be empty.')
sys.exit(1)
if check_type and not isinstance(config_part[key], check_type):
print(f'Config key "{key}" must be type "{check_type}", not "{type(config_part[key])}".')
sys.exit(1)
return True

68
matrix_gpt/matrix.py Normal file
View File

@ -0,0 +1,68 @@
import json
import os
from pathlib import Path
from typing import Union
from nio import AsyncClient, AsyncClientConfig, LoginError
from nio import LoginResponse
class MatrixNioGPTHelper:
"""
A simple wrapper class for common matrix-nio actions.
"""
client = None
client_config = AsyncClientConfig(max_limit_exceeded=0, max_timeouts=0, store_sync_tokens=True, encryption_enabled=True, )
def __init__(self, auth_file: Union[Path, str], user_id: str, passwd: str, homeserver: str, store_path: str, device_name: str = 'MatrixGPT', device_id: str = None):
self.auth_file = auth_file
self.user_id = user_id
self.passwd = passwd
self.homeserver = homeserver
if not (self.homeserver.startswith("https://") or self.homeserver.startswith("http://")):
self.homeserver = "https://" + self.homeserver
self.store_path = store_path
Path(self.store_path).mkdir(parents=True, exist_ok=True)
self.device_name = device_name
self.client = AsyncClient(self.homeserver, self.user_id, config=self.client_config, store_path=self.store_path, device_id=device_id)
async def login(self) -> tuple[bool, LoginError] | tuple[bool, LoginResponse | None]:
# If there are no previously-saved credentials, we'll use the password
if not os.path.exists(self.auth_file):
resp = await self.client.login(self.passwd, device_name=self.device_name)
# check that we logged in succesfully
if isinstance(resp, LoginResponse):
self.write_details_to_disk(resp)
else:
# raise Exception(f'Failed to log in!\n{resp}')
return False, resp
else:
# Otherwise the config file exists, so we'll use the stored credentials
with open(self.auth_file, "r") as f:
config = json.load(f)
client = AsyncClient(config["homeserver"])
client.access_token = config["access_token"]
client.user_id = config["user_id"]
client.device_id = config["device_id"]
resp = await self.client.login(self.passwd, device_name=self.device_name)
return True, resp
def write_details_to_disk(self, resp: LoginResponse) -> None:
"""Writes the required login details to disk so we can log in later without
using a password.
Arguments:
resp {LoginResponse} -- the successful client login response.
homeserver -- URL of homeserver, e.g. "https://matrix.example.org"
"""
with open(self.auth_file, "w") as f:
json.dump({"homeserver": self.homeserver, # e.g. "https://matrix.example.org"
"user_id": resp.user_id, # e.g. "@user:example.org"
"device_id": resp.device_id, # device ID, 10 uppercase letters
"access_token": resp.access_token, # cryptogr. access token
}, f, )

1
nio-template Submodule

@ -0,0 +1 @@
Subproject commit 3d8fbf142b71e5e9da3fbffb177fb44542de1200

5
requirements.txt Normal file
View File

@ -0,0 +1,5 @@
matrix-nio[e2e]
pyyaml
markdown
python-olm
openai