From dcdcc15b53e0d9f4d2dffec7ffa262c93187ef40 Mon Sep 17 00:00:00 2001
From: Cyberes
Date: Wed, 3 Apr 2024 22:57:54 -0600
Subject: [PATCH] add files

---
 .gitignore                   |   4 +
 README.md                    |  26 ++++-
 config.yml.sample            |  20 ++++
 exporter/__init__.py         |   0
 exporter/callbacks.py        | 220 +++++++++++++++++++++++++++++++++++
 exporter/config.py           |  19 +++
 exporter/export.py           | 138 ++++++++++++++++++++++
 exporter/matrix.py           |  67 +++++++++++
 main.py                      |  68 +++++++++++
 matrix-room-exporter.service |   0
 requirements.txt             |   3 +
 11 files changed, 564 insertions(+), 1 deletion(-)
 create mode 100644 config.yml.sample
 create mode 100644 exporter/__init__.py
 create mode 100644 exporter/callbacks.py
 create mode 100644 exporter/config.py
 create mode 100644 exporter/export.py
 create mode 100644 exporter/matrix.py
 create mode 100644 main.py
 create mode 100644 matrix-room-exporter.service
 create mode 100644 requirements.txt

diff --git a/.gitignore b/.gitignore
index 5d381cc..5bb197a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,7 @@
+.idea
+config.yml
+bots/
+
 # ---> Python
 # Byte-compiled / optimized / DLL files
 __pycache__/
diff --git a/README.md b/README.md
index 5e2980c..b01bcc0 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,27 @@
 # matrix-room-exporter
 
-Script to export Matrix a room
\ No newline at end of file
+_Export a Matrix room._
+
+This is a simple bot that can export a Matrix room and upload the compressed archive to a Cloudflare R2 bucket for the
+room to download.
+
+### Install
+
+1. `pip install -r requirements.txt`
+2. `cp config.yml.sample config.yml`
+3. [Create a Cloudflare R2 bucket](https://developers.cloudflare.com/r2/get-started/#2-create-a-bucket).
+4. Enable [public access](https://developers.cloudflare.com/r2/buckets/public-buckets/).
+5. Create an [access key](https://developers.cloudflare.com/r2/api/s3/tokens/).
+6. Set up an [object lifecycle rule](https://developers.cloudflare.com/r2/buckets/object-lifecycles/) to auto-delete old
+   exports from your bucket. 24 hours is a good choice.
+7. Configure `config.yml` with your R2 credentials.
+8. Set up a [matrix-org/pantalaimon](https://github.com/matrix-org/pantalaimon) for encrypted sync.
+9. Create a new Matrix user and configure it in `config.yml`.
+10. Start the bot with `python3 main.py`
+
+### Use
+
+1. Invite the new bot to the room you want to export.
+2. Send the message `!export` to start the export process.
+
+The bot will upload the exported room to your R2 bucket and share the link with the room.
\ No newline at end of file
diff --git a/config.yml.sample b/config.yml.sample
new file mode 100644
index 0000000..82d7e62
--- /dev/null
+++ b/config.yml.sample
@@ -0,0 +1,20 @@
+auth:
+  username: exportbot
+  password: password1234
+  homeserver: https://pantalaimon.example.com
+
+# Where to cache the bot's login info.
+store_path: ./bots
+
+command_prefix: '!export'
+
+# How often a room is allowed to be exported.
+# If an export failed, the interval will be half of this value.
+export_interval: 3600
+
+r2:
+  bucket_name: export-bot
+  accountid: 12345
+  access_key_id: 67890
+  access_key_secret: abc1234
+  pub_url: https://pub-example12345.r2.dev
\ No newline at end of file
diff --git a/exporter/__init__.py b/exporter/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/exporter/callbacks.py b/exporter/callbacks.py
new file mode 100644
index 0000000..d6c6817
--- /dev/null
+++ b/exporter/callbacks.py
@@ -0,0 +1,220 @@
+import copy
+import json
+import logging
+import shutil
+import tempfile
+import time
+import traceback
+from datetime import datetime, timedelta
+from pathlib import Path
+
+import aiofiles
+import aiofiles.os as aos
+from nio import (AsyncClient, InviteMemberEvent, JoinError, MatrixRoom, RoomMessageText, SyncResponse, RoomMessageMedia, MessageDirection)
+
+from exporter.export import download_mxc, zip_directory, upload_to_r2, trim_filename, fetch_events
+
+
+class RoomExport:
+    """Cooldown record: the time at which a room may be exported again."""
+
+    def __init__(self, room_id: str, allowed_again: datetime):
+        self.room_id = room_id
+        self.allowed_again = allowed_again
+
+
+class ExportTracker:
+    """In-memory, per-room export rate limiter."""
+
+    def __init__(self):
+        self._exported_rooms = {}
+
+    def add_export(self, room_id: str, seconds_elapsed_allowed: int) -> None:
+        """Disallow exporting `room_id` for the next `seconds_elapsed_allowed` seconds."""
+        allowed_again = datetime.now() + timedelta(seconds=seconds_elapsed_allowed)
+        self._exported_rooms[room_id] = RoomExport(room_id, allowed_again)
+
+    def check_allowed(self, room_id):
+        """Return True when `room_id` has no record or its cooldown has elapsed."""
+        last_export = self._exported_rooms.get(room_id)
+        return last_export is None or datetime.now() >= last_export.allowed_again
+
+    def get_export(self, room_id: str) -> RoomExport | None:
+        """Return a copy of the record for `room_id`, or None when there is none."""
+        return copy.deepcopy(self._exported_rooms.get(room_id))
+
+
+class MatrixBotCallbacks:
+    """nio event callbacks: join rooms on invite and export a room on command."""
+
+    def __init__(self, client: AsyncClient, config: dict):
+        self.client = client
+        self.config = config
+        self.logger = logging.getLogger('MatrixGPT').getChild('MatrixBotCallbacks')
+        self.startup_ts = time.time() * 1000
+        self.seen_events = {}
+        self.exports = ExportTracker()
+        self.zip_temp_dir = Path(tempfile.mkdtemp())
+
+    async def handle_invite(self, room: MatrixRoom, event: InviteMemberEvent) -> None:
+        """
+        Since the InviteMemberEvent is fired for every m.room.member state received
+        in a sync response's `rooms.invite` section, we will receive some that are
+        not actually our own invite event (such as the inviter's membership).
+        This makes sure we only call `callbacks.invite` with our own invite events.
+        """
+        # event.source.get('origin_server_ts') > self.startup_time
+        if event.state_key == self.client.user_id:
+            self.logger.info(f"Got invite to {room.room_id} from {event.sender}.")
+
+            # Attempt to join 3 times before giving up
+            for attempt in range(3):
+                result = await self.client.join(room.room_id)
+                if isinstance(result, JoinError):
+                    self.logger.error(f"Error joining room {room.room_id} (attempt {attempt}): {result.message}")
+                else:
+                    self.logger.info(f"Joined via invite: {room.room_id}")
+                    return
+            else:
+                self.logger.error("Unable to join room: %s", room.room_id)
+
+    async def handle_message(self, room: MatrixRoom, event: RoomMessageText) -> None:
+        """Export the room and post a download link when the export command is received."""
+        # Mark the message as read.
+        await self.client.room_read_markers(room.room_id, event.event_id, event.event_id)
+
+        # Ignore messages from ourselves
+        if event.sender == self.client.user_id:
+            return
+
+        if event.server_timestamp < self.startup_ts:
+            self.logger.debug(f'Skipping event as it was sent before startup time: {event.event_id}')
+            return
+        if event.event_id in self.seen_events:
+            self.logger.debug(f'Skipping seen event: {event.event_id}')
+            return
+
+        msg = event.body.strip().strip('\n')
+
+        if msg == self.config['command_prefix']:
+            # Record the command so the check above rejects it if it is delivered again.
+            self.seen_events[event.event_id] = True
+            self.logger.info(f"Export for {room.room_id} requested by {event.sender}")
+
+            if not self.exports.check_allowed(room.room_id):
+                last_export = self.exports.get_export(room.room_id)
+                time_diff = last_export.allowed_again - datetime.now()
+                # Whole minutes, so the reply reads "59 minutes" rather than "59.0 minutes".
+                minutes_until_future = int(time_diff.total_seconds() // 60)
+                content = {
+                    "body": f'Cannot export again for {minutes_until_future} minutes.',
+                    "m.relates_to": {
+                        "m.in_reply_to": {
+                            "event_id": event.event_id
+                        }
+                    },
+                    "msgtype": "m.text"
+                }
+                await self.client.room_send(
+                    room.room_id,
+                    'm.room.message',
+                    content
+                )
+                self.logger.info(f"Rejected export in {room.room_id}, {minutes_until_future} minutes remaining.")
+                return
+
+            zipfile_name = f'{room.room_id.replace(":", "_").replace("!", "").replace(".", "")}-{int(time.time())}.zip'
+            start_msg = await self.client.room_send(
+                room_id=room.room_id,
+                message_type="m.room.message",
+                content={"msgtype": "m.text", "format": "org.matrix.custom.html", "body": 'Exporting room...', 'formatted_body': 'Exporting room...'},
+            )
+
+            try:
+                sync_response = await self.client.sync(full_state=True, sync_filter={"room": {"timeline": {"limit": 1}}})
+                if not isinstance(sync_response, SyncResponse):
+                    # Raise an error that carries the response; the handler below logs it and reports the failure.
+                    raise RuntimeError(f'Failed to sync room "{room.room_id}": {sync_response}')
+                start_token = sync_response.rooms.join[room.room_id].timeline.prev_batch
+
+                room_events = [
+                    await fetch_events(self.client, room.room_id, start_token, MessageDirection.back),
+                    await fetch_events(self.client, room.room_id, start_token, MessageDirection.front),
+                ]
+
+                export_data = []
+                zipfile_path = self.zip_temp_dir / zipfile_name
+                # The staging directory is removed on exit, including when the export fails midway.
+                with tempfile.TemporaryDirectory() as staging_dir:
+                    temp_dir = Path(staging_dir)
+                    self.logger.debug(f'Writing export for {room.room_id} to {temp_dir}')
+
+                    for direction in room_events:
+                        for room_event in direction.all():
+                            if isinstance(room_event, RoomMessageMedia):
+                                media_data = await download_mxc(self.client, room_event.url)
+                                filename = trim_filename(f'{int(room_event.server_timestamp / 1000)} -- {room_event.body}')
+                                room_event.source["_file_path"] = filename
+                                async with aiofiles.open(temp_dir / filename, "wb") as f_media:
+                                    await f_media.write(media_data)
+                            # all() returns copies, so dump the annotated copy itself;
+                            # direction.jsonify() would not contain "_file_path".
+                            export_data.append(room_event.source)
+
+                    async with aiofiles.open(temp_dir / 'data.json', 'w', encoding='utf-8') as f:
+                        await f.write(json.dumps(export_data, ensure_ascii=False, indent=4))
+
+                    self.logger.debug(f'Creating zip: {zipfile_path}')
+                    await zip_directory(temp_dir, zipfile_path)
+
+                self.logger.debug(f'Uploading: {zipfile_path}')
+                try:
+                    await upload_to_r2(
+                        zipfile_path, zipfile_name,
+                        self.config['r2']['bucket_name'],
+                        self.config['r2']['accountid'],
+                        self.config['r2']['access_key_id'],
+                        self.config['r2']['access_key_secret']
+                    )
+                finally:
+                    # Remove the local archive whether or not the upload succeeded.
+                    await aos.remove(zipfile_path)
+
+                self.logger.info(f'Export for {room.room_id} completed.')
+                self.exports.add_export(room.room_id, self.config['export_interval'])
+
+                pub_url = f'{self.config["r2"]["pub_url"]}/{zipfile_name}'
+                formatted_body = f'Export complete!<br>{pub_url}<br>This file will be deleted after 24 hours.'
+                content = {
+                    "body": f'Export complete!\n{pub_url}\nThis file will be deleted after 24 hours.',
+                    "formatted_body": formatted_body,
+                    "format": "org.matrix.custom.html",
+                    "m.relates_to": {
+                        "m.in_reply_to": {
+                            "event_id": start_msg.event_id
+                        }
+                    },
+                    "msgtype": "m.text"
+                }
+                await self.client.room_send(
+                    room.room_id,
+                    'm.room.message',
+                    content
+                )
+            except Exception:
+                self.logger.error(f'Export failed for {room.room_id}: {traceback.format_exc()}')
+                self.exports.add_export(room.room_id, self.config['export_interval'] // 2)
+                content = {
+                    "body": '❌ Failed to export room!',
+                    "m.relates_to": {
+                        "m.in_reply_to": {
+                            "event_id": start_msg.event_id
+                        }
+                    },
+                    "msgtype": "m.text"
+                }
+                await self.client.room_send(
+                    room.room_id,
+                    'm.room.message',
+                    content
+                )
diff --git a/exporter/config.py b/exporter/config.py
new file mode 100644
index 0000000..826cdfe
--- /dev/null
+++ b/exporter/config.py
@@ -0,0 +1,19 @@
+# Default configuration; main.py overlays the values loaded from config.yml on top of it.
+DEFAULT_CONFIG = {
+    'auth': {
+        'username': None,
+        'password': None,
+        'homeserver': None,
+    },
+    # Where to cache the bot's login info.
+    'store_path': './bots',
+    'command_prefix': '!export',
+    'export_interval': 3600,
+    'r2': {
+        'bucket_name': None,
+        'accountid': None,
+        'access_key_id': None,
+        'access_key_secret': None,
+        'pub_url': None,
+    }
+}
diff --git a/exporter/export.py b/exporter/export.py
new file mode 100644
index 0000000..72435b1
--- /dev/null
+++ b/exporter/export.py
@@ -0,0 +1,138 @@
+import asyncio
+import copy
+import os
+import zipfile
+from pathlib import Path
+from typing import Union, List
+from urllib.parse import urlparse
+
+import aioboto3
+import aiofiles
+from nio import (
+    RedactedEvent,
+    RoomMessageFormatted,
+    RoomMessageMedia,
+    Event,
+    RoomMessagesError,
+    AsyncClient,
+    MessageDirection
+)
+
+"""
+Inspired by https://github.com/russelldavies/matrix-archive/blob/master/matrix-archive.py
+"""
+
+
+class RoomEvents:
+    """Ordered list of the events fetched from one room."""
+
+    def __init__(self, room_id: str):
+        self._room_id = room_id
+        self._events: List[Event] = []
+
+    def add_event(self, event):
+        self._events.append(event)
+
+    def all(self):
+        """Return deep copies of the stored events; changes made to them are not stored."""
+        return copy.deepcopy(self._events)
+
+    def jsonify(self):
+        """Return the raw `source` of every stored event, in insertion order."""
+        return [x.source for x in self._events]
+
+
+def is_valid_event(event: Event):
+    """Return True for formatted messages, media messages and redacted events."""
+    return isinstance(event, (RoomMessageFormatted, RedactedEvent, RoomMessageMedia))
+
+
+async def download_mxc(client: AsyncClient, url: str) -> bytes:
+    """Download the media behind an mxc:// URL; return b'' when the response has no `body`."""
+    mxc = urlparse(url)
+    response = await client.download(mxc.netloc, mxc.path.strip("/"))
+    return getattr(response, "body", b'')
+
+
+async def zip_directory(directory_path, zip_path):
+    """Zip `directory_path` into `zip_path` in the default executor instead of on the event loop."""
+    loop = asyncio.get_running_loop()
+    await loop.run_in_executor(None, _zip_directory, directory_path, zip_path)
+
+
+def _zip_directory(directory_path, zip_path):
+    """Write every file under `directory_path` to a deflated archive, using paths relative to it."""
+    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
+        for root, dirs, files in os.walk(directory_path):
+            for file in files:
+                file_path = str(os.path.join(root, file))
+                zip_file.write(file_path, os.path.relpath(file_path, directory_path))
+
+
+async def upload_to_r2(source_file: Union[str, Path], destination_file: str, bucket_name: str, accountid: str, access_key_id: str, access_key_secret: str):
+    """Upload `source_file` to the R2 bucket `bucket_name` under the key `destination_file`."""
+    session = aioboto3.Session(
+        aws_access_key_id=access_key_id,
+        aws_secret_access_key=access_key_secret,
+    )
+    async with session.resource(
+        service_name="s3",
+        endpoint_url=f'https://{accountid}.r2.cloudflarestorage.com',
+    ) as s3:
+        bucket = await s3.Bucket(bucket_name)
+        async with aiofiles.open(source_file, 'rb') as f:
+            return await bucket.upload_fileobj(f, destination_file)
+
+
+def trim_filename(filename, max_length=255):
+    """
+    Return `filename` as a single path component of at most `max_length` characters.
+
+    Path separators are replaced because the name is built from an untrusted event body
+    and is joined onto the export directory. The extension is kept when truncating.
+    """
+    filename = filename.replace('/', '_').replace('\\', '_')
+    if len(filename) > max_length:
+        stem, extension = os.path.splitext(filename)
+        if len(extension) >= max_length:
+            # Not a usable extension (e.g. a long caption containing a dot): truncate plainly.
+            filename = filename[:max_length]
+        else:
+            filename = stem[:max_length - len(extension)] + extension
+
+    return filename
+
+
+async def fetch_events(client: AsyncClient, room_id: str, start_token: str, direction: MessageDirection) -> RoomEvents:
+    """
+    Page through the history of `room_id` from `start_token` in `direction`.
+
+    Events fetched backwards are reversed before being stored.
+    Raises Exception when the homeserver returns an error response.
+    """
+    room_events = RoomEvents(room_id)
+    events = []
+    while True:
+        response = await client.room_messages(
+            room_id,
+            start=start_token,
+            limit=1000,
+            direction=direction
+        )
+        if isinstance(response, RoomMessagesError):
+            raise Exception(f'Failed to read room "{room_id}": {response.message}')
+        if not response.chunk:
+            break
+        events.extend(response.chunk)
+        # Without an `end` token there is no next page to request.
+        if not response.end:
+            break
+        start_token = response.end
+
+    if direction == MessageDirection.back:
+        events.reverse()
+
+    for event in events:
+        room_events.add_event(event)
+
+    return room_events
diff --git a/exporter/matrix.py b/exporter/matrix.py
new file mode 100644
index 0000000..b516816
--- /dev/null
+++ b/exporter/matrix.py
@@ -0,0 +1,67 @@
+import json
+import logging
+import os
+from pathlib import Path
+
+from nio import AsyncClient, AsyncClientConfig, LoginError
+from nio import LoginResponse
+
+
+class MatrixClientHelper:
+    """
+    A simple wrapper class for common matrix-nio actions.
+    """
+
+    # Encryption is disabled because it's handled by Pantalaimon.
+    client_config = AsyncClientConfig(max_limit_exceeded=0, max_timeouts=0, store_sync_tokens=True, encryption_enabled=False)
+
+    def __init__(self, user_id: str, passwd: str, homeserver: str, store_path: str, device_name: str = 'MatrixGPT'):
+        self.user_id = user_id
+        self.passwd = passwd
+
+        self.homeserver = homeserver
+        if not self.homeserver.startswith(("https://", "http://")):
+            self.homeserver = "https://" + self.homeserver
+
+        # Expand "~" first: once the path is made absolute, "~" is no longer its leading component.
+        self.store_path = Path(store_path).expanduser().resolve()
+        self.store_path.mkdir(parents=True, exist_ok=True)
+        self.auth_file = self.store_path / (device_name.lower() + '.json')
+
+        self.device_name = device_name
+        self.client = AsyncClient(homeserver=self.homeserver, user=self.user_id, config=self.client_config, device_id=device_name)
+        self.logger = logging.getLogger('MatrixGPT').getChild('MatrixClientHelper')
+
+    async def login(self) -> tuple[bool, LoginError] | tuple[bool, LoginResponse | None]:
+        """
+        Log in with the configured password and cache the resulting session details.
+
+        Returns (True, response) on success, (False, response) when the homeserver
+        rejects the login and (False, None) when the attempt raises.
+        """
+        try:
+            # NOTE: the cached access token is not restored; a password login is performed either
+            # way and the cache file is rewritten. Its presence only selects the log message.
+            if os.path.exists(self.auth_file):
+                self.logger.info('Cached credentials found; refreshing them with a password login.')
+            else:
+                self.logger.info('Using username/password.')
+            resp = await self.client.login(self.passwd, device_name=self.device_name)
+
+            # Check that we logged in successfully.
+            if isinstance(resp, LoginResponse):
+                self.write_details_to_disk(resp)
+                return True, resp
+            return False, resp
+        except Exception:
+            self.logger.exception('Login failed.')
+            return False, None
+
+    def write_details_to_disk(self, resp: LoginResponse) -> None:
+        """Write the homeserver, user ID, device ID and access token to `self.auth_file` as JSON."""
+        with open(self.auth_file, "w") as f:
+            json.dump({"homeserver": self.homeserver,
+                       "user_id": resp.user_id,
+                       "device_id": resp.device_id,
+                       "access_token": resp.access_token,
+                       }, f)
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..22cf81b
--- /dev/null
+++ b/main.py
@@ -0,0 +1,68 @@
+import argparse
+import asyncio
+import logging
+import os
+from pathlib import Path
+
+import yaml
+from nio import InviteMemberEvent, RoomMessageText
+
+from exporter.callbacks import MatrixBotCallbacks
+from exporter.config import DEFAULT_CONFIG
+from exporter.matrix import MatrixClientHelper
+
+SCRIPT_PATH = os.path.dirname(os.path.realpath(__file__))
+
+
+async def main(args):
+    """Load the config, log in and dispatch Matrix events to the bot callbacks until stopped."""
+    logging.basicConfig()
+    logger = logging.getLogger('MatrixGPT')
+
+    with open(args.config) as stream:
+        try:
+            # An empty file parses to None; treat it as "no overrides".
+            yaml_config = yaml.safe_load(stream) or {}
+        except yaml.YAMLError as exc:
+            logger.critical(f'Failed to load config: {exc}')
+            raise SystemExit(1)
+
+    config = DEFAULT_CONFIG | yaml_config
+    # Merge one level deep so a partial `auth:` or `r2:` section keeps the remaining defaults.
+    for key, default in DEFAULT_CONFIG.items():
+        if isinstance(default, dict):
+            config[key] = default | (yaml_config.get(key) or {})
+
+    logger.setLevel(logging.DEBUG)
+
+    matrix_helper = MatrixClientHelper(
+        user_id=config['auth']['username'],
+        passwd=config['auth']['password'],
+        homeserver=config['auth']['homeserver'],
+        store_path=config['store_path'],
+    )
+    client = matrix_helper.client
+
+    login_success, login_response = await matrix_helper.login()
+    if not login_success:
+        logger.critical(f'Login failed: {login_response}')
+        raise SystemExit(1)
+    logger.info(f'Logged in as {client.user_id}')
+
+    callbacks = MatrixBotCallbacks(client, config)
+
+    client.add_event_callback(callbacks.handle_invite, InviteMemberEvent)
+    client.add_event_callback(callbacks.handle_message, RoomMessageText)
+
+    try:
+        await client.sync_forever(timeout=30000)
+    finally:
+        # Close the client's HTTP session when the sync loop stops.
+        await client.close()
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description='Export a Matrix room.')
+    parser.add_argument('--config', default=Path(SCRIPT_PATH) / 'config.yml')
+    args = parser.parse_args()
+    asyncio.run(main(args))
diff --git a/matrix-room-exporter.service b/matrix-room-exporter.service
new file mode 100644
index 0000000..e69de29
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..7b2af93
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+matrix-nio==0.24.0
+aioboto3==12.3.0
+pyyaml==6.0.1
\ No newline at end of file