From 54eb0253856060a60229f0e65fd655e8f00c07ee Mon Sep 17 00:00:00 2001 From: Cyberes Date: Thu, 4 Apr 2024 15:04:13 -0600 Subject: [PATCH] fix concurrancy, add whitelist --- README.md | 21 ++-- config.yml.sample | 7 ++ exporter/callbacks.py | 211 ++++++++----------------------------- exporter/config.py | 2 + exporter/export_tracker.py | 52 +++++++++ exporter/matrix.py | 18 +++- exporter/run.py | 147 ++++++++++++++++++++++++++ main.py | 9 +- requirements.txt | 4 +- 9 files changed, 291 insertions(+), 180 deletions(-) create mode 100644 exporter/export_tracker.py create mode 100644 exporter/run.py diff --git a/README.md b/README.md index 2b88b65..269b498 100644 --- a/README.md +++ b/README.md @@ -8,16 +8,17 @@ download. ### Install 1. `pip install -r requirements.txt` -2. `cp config.yml.sample config.yml` -3. [Create a Cloudflare R2 bucket](https://developers.cloudflare.com/r2/get-started/#2-create-a-bucket). -4. Enable [public access](https://developers.cloudflare.com/r2/buckets/public-buckets/). -5. Create an [access key](https://developers.cloudflare.com/r2/api/s3/tokens/). -6. Configure `config.yml` with your R2 credentials. -7. Set up an [object lifecycle rule](https://developers.cloudflare.com/r2/buckets/object-lifecycles/) to auto-delete old - exports from your bucket. 24 hours is a good choice. -8. Set up a [matrix-org/pantalaimon](https://github.com/matrix-org/pantalaimon) for encrypted sync. -9. Create a new Matrix user and configure it in `config.yml`. -10. Start the bot with `python3 main.py` +2. `sudo apt install redis-server && sudo systemctl enable --now redis-server` +3`cp config.yml.sample config.yml` +4. [Create a Cloudflare R2 bucket](https://developers.cloudflare.com/r2/get-started/#2-create-a-bucket). +5. Enable [public access](https://developers.cloudflare.com/r2/buckets/public-buckets/). +6. Create an [access key](https://developers.cloudflare.com/r2/api/s3/tokens/). +7. Configure `config.yml` with your R2 credentials. +8. Set up an [object lifecycle rule](https://developers.cloudflare.com/r2/buckets/object-lifecycles) to auto-delete old exports from your bucket. 24 hours is a good choice. +9. Set up a [matrix-org/pantalaimon](https://github.com/matrix-org/pantalaimon) for encrypted sync. +10. Create a new Matrix user and configure it in `config.yml` +11. Add your username to `allowed_to_export` in `config.yml` +12. Start the bot with `python3 main.py` ### Use diff --git a/config.yml.sample b/config.yml.sample index 82d7e62..82971a4 100644 --- a/config.yml.sample +++ b/config.yml.sample @@ -12,6 +12,13 @@ command_prefix: '!export' # If an export failed, the interval will be half of this value. export_interval: 3600 +# How many rooms will be exported concurrently. +concurrent_exports: 3 + +# Whitelist of usernames that are allowed to use the export bot. +allowed_to_export: + - '@bob:example.com' + r2: bucket_name: export-bot accountid: 12345 diff --git a/exporter/callbacks.py b/exporter/callbacks.py index 114b8f6..5528b37 100644 --- a/exporter/callbacks.py +++ b/exporter/callbacks.py @@ -1,55 +1,26 @@ -import copy -import json +import asyncio import logging -import os -import re -import shutil -import tempfile import time -import traceback -from datetime import datetime, timedelta -from pathlib import Path +from datetime import datetime -import aiofiles -import aiofiles.os as aos -from nio import (AsyncClient, InviteMemberEvent, JoinError, MatrixRoom, RoomMessageText, SyncResponse, RoomMessageMedia, MessageDirection) +import redis +from nio import InviteMemberEvent, JoinError, MatrixRoom, RoomMessageText, RoomSendResponse -from exporter.export import download_mxc, zip_directory, upload_to_r2, trim_filename, fetch_events - -CLEAN_ROOM_NAME_RE = re.compile('[^a-zA-Z0-9]') - - -class RoomExport: - def __init__(self, room_id: str, allowed_again: datetime): - self.room_id = room_id - self.allowed_again = allowed_again - - -class ExportTracker: - def __init__(self): - self._exported_rooms = {} - - def add_export(self, room_id: str, seconds_elapsed_allowed: int) -> None: - allowed_again = datetime.now() + timedelta(seconds=seconds_elapsed_allowed) - self._exported_rooms[room_id] = RoomExport(room_id, allowed_again) - - def check_allowed(self, room_id): - if self.get_export(room_id): - return datetime.now() >= self._exported_rooms[room_id].allowed_again - return True - - def get_export(self, room_id: str) -> RoomExport | None: - return copy.deepcopy(self._exported_rooms.get(room_id)) +from exporter.export_tracker import ExportTracker +from exporter.matrix import MatrixClientHelper +from exporter.run import run_export_thread class MatrixBotCallbacks: - def __init__(self, client: AsyncClient, config: dict): - self.client = client + def __init__(self, client: MatrixClientHelper, config: dict): + self.client = client.client + self.client_helper = client self.config = config self.logger = logging.getLogger('ExportBot').getChild('MatrixBotCallbacks') self.startup_ts = time.time() * 1000 self.exports = ExportTracker() - self.zip_temp_dir = Path(tempfile.mkdtemp()) + self.sem = asyncio.Semaphore(config['concurrent_exports']) + self.redis = redis.Redis(host='localhost', port=6379, db=3) async def handle_invite(self, room: MatrixRoom, event: InviteMemberEvent): """ @@ -58,46 +29,52 @@ class MatrixBotCallbacks: not actually our own invite event (such as the inviter's membership). This makes sure we only call `callbacks.invite` with our own invite events. """ - # event.source.get('origin_server_ts') > self.startup_time + + if event.sender not in self.config['allowed_to_export']: + self.logger.debug(f'Rejected invite from {event.sender}') + return + if event.state_key == self.client.user_id: self.logger.info(f"Got invite to {room.room_id} from {event.sender}.") - # Attempt to join 3 times before giving up - for attempt in range(3): - result = await self.client.join(room.room_id) - if isinstance(result, JoinError): - self.logger.error(f"Error joining room {room.room_id} (attempt {attempt}): {result.message}") - else: - self.logger.info(f"Joined via invite: {room.room_id}") - return + # Attempt to join 3 times before giving up + for attempt in range(3): + result = await self.client.join(room.room_id) + if isinstance(result, JoinError): + self.logger.error(f"Error joining room {room.room_id} (attempt {attempt}): {result.message}") else: - self.logger.error("Unable to join room: %s", room.room_id) + self.logger.info(f"Joined via invite: {room.room_id}") + return + else: + self.logger.error("Unable to join room: %s", room.room_id) - async def handle_message(self, room: MatrixRoom, event: RoomMessageText): - msg = event.body.strip().strip('\n') + async def handle_message(self, room: MatrixRoom, requestor_event: RoomMessageText): + msg = requestor_event.body.strip().strip('\n') if msg == "** Unable to decrypt: The sender's device has not sent us the keys for this message. **": - self.logger.debug(f'Unable to decrypt event "{event.event_id} in room {room.room_id}') + self.logger.debug(f'Unable to decrypt event "{requestor_event.event_id} in room {room.room_id}') return if msg != self.config['command_prefix']: return - if event.server_timestamp < self.startup_ts: + if requestor_event.server_timestamp < self.startup_ts: return - if event.sender == self.client.user_id: + if requestor_event.sender == self.client.user_id: + return + if requestor_event.sender not in self.config['allowed_to_export']: + await self.client_helper.react_to_event(room.room_id, requestor_event.event_id, '❌') return - requestor = event.sender - self.logger.info(f'Export for "{room.name}" ({room.room_id}) requested by {requestor}') - await self.client.room_read_markers(room.room_id, event.event_id, event.event_id) + self.logger.info(f'Export for "{room.name}" ({room.room_id}) requested by {requestor_event.sender}') + await self.client.room_read_markers(room.room_id, requestor_event.event_id, requestor_event.event_id) if not self.exports.check_allowed(room.room_id): last_export = self.exports.get_export(room.room_id) time_diff = last_export.allowed_again - datetime.now() - minutes_until_future = time_diff.total_seconds() // 60 + minutes_until_future = int(time_diff.total_seconds() / 60) content = { "body": f'Cannot export again for {minutes_until_future} minutes.', "m.relates_to": { "m.in_reply_to": { - "event_id": event.event_id + "event_id": requestor_event.event_id } }, "msgtype": "m.text" @@ -110,114 +87,18 @@ class MatrixBotCallbacks: self.logger.info(f'Rejected export in {room.room_id}, {minutes_until_future} minutes remaining.') return - export_created = int(time.time()) - clean_room_name_str = '' - if room.name: - clean_room_name = CLEAN_ROOM_NAME_RE.sub('', room.name) - clean_room_name_str = f'{clean_room_name}-' - zipfile_name = f'{clean_room_name_str}{room.room_id.replace(":", "_").replace("!", "").replace(".", "")}-{int(time.time())}.zip' - zipfile_path = None - temp_dir = None + if room.room_id.encode() in self.redis.lrange('in_progress', 0, -1): + return + self.redis.lpush('in_progress', room.room_id) start_msg = await self.client.room_send( room_id=room.room_id, message_type="m.room.message", content={"msgtype": "m.text", "format": "org.matrix.custom.html", "body": 'Exporting room...', 'formatted_body': 'Exporting room...'}, ) + if not isinstance(start_msg, RoomSendResponse): + self.logger.error(f'Failed to export room {room.room_id}: {start_msg}') + return - try: - sync_response = await self.client.sync(full_state=True, sync_filter={"room": {"timeline": {"limit": 1}}}) - if not isinstance(sync_response, SyncResponse): - self.logger.error(f'Failed to sync room "{room.room_id}": {sync_response}') - raise - start_token = sync_response.rooms.join[room.room_id].timeline.prev_batch - - room_events = [ - await fetch_events(self.client, room.room_id, start_token, MessageDirection.back), - await fetch_events(self.client, room.room_id, start_token, MessageDirection.front), - ] - - temp_dir = Path(tempfile.mkdtemp()) - exported_events = [] - self.logger.debug(f'Writing export for {room.room_id} to {temp_dir}') - - for direction in room_events: - for event in direction.all(): - if isinstance(event, RoomMessageMedia): - media_data = await download_mxc(self.client, event.url) - filename = trim_filename(f'{int(event.server_timestamp / 1000)} -- {event.body}') - event.source["exported_file_path"] = filename - async with aiofiles.open(temp_dir / filename, "wb") as f_media: - await f_media.write(media_data) - exported_events.append(event.source) - - exported_data = { - 'room': { - 'id': room.room_id, - 'name': room.name - }, - 'requesting_user': requestor, - 'exporting_user': self.client.user_id, - 'created': export_created, - 'events': sorted(exported_events, key=lambda x: x['origin_server_ts']) - } - async with aiofiles.open(temp_dir / 'data.json', 'w', encoding='utf-8') as f: - await f.write(json.dumps(exported_data, ensure_ascii=False, indent=4)) - - zipfile_path = self.zip_temp_dir / zipfile_name - self.logger.debug(f'Creating zip: {zipfile_path}') - await zip_directory(temp_dir, zipfile_path) - shutil.rmtree(temp_dir) - - self.logger.debug(f'Uploading: {zipfile_path}') - r2_upload = await upload_to_r2( - zipfile_path, zipfile_name, - self.config['r2']['bucket_name'], - self.config['r2']['accountid'], - self.config['r2']['access_key_id'], - self.config['r2']['access_key_secret'] - ) - await aos.remove(zipfile_path) - - self.logger.info(f'Export for {room.room_id} completed') - self.exports.add_export(room.room_id, self.config['export_interval']) - - pub_url = f'{self.config["r2"]["pub_url"]}/{zipfile_name}' - formatted_body = f'Export complete!
{pub_url}
This file will be deleted after 24 hours.' - content = { - "body": f'Export complete!\n{pub_url}\nThis file will be deleted after {self.config["r2"]["retention_hrs"]} hours.', - "formatted_body": formatted_body, - "format": "org.matrix.custom.html", - "m.relates_to": { - "m.in_reply_to": { - "event_id": start_msg.event_id - } - }, - "msgtype": "m.text" - } - await self.client.room_send( - room.room_id, - 'm.room.message', - content - ) - except Exception as e: - self.logger.error(f'Export failed for {room.room_id}: {traceback.format_exc()}') - self.exports.add_export(room.room_id, self.config['export_interval'] // 2) - content = { - "body": f'❌ Failed to export room!', - "m.relates_to": { - "m.in_reply_to": { - "event_id": start_msg.event_id - } - }, - "msgtype": "m.text" - } - await self.client.room_send( - room.room_id, - 'm.room.message', - content - ) - if zipfile_path: - os.remove(zipfile_path) - if temp_dir: - shutil.rmtree(temp_dir) + await self.sem.acquire() + task = asyncio.create_task(run_export_thread(room, requestor_event, self.client, self.config, start_msg, self.sem)) diff --git a/exporter/config.py b/exporter/config.py index cca9e00..fec8f6e 100644 --- a/exporter/config.py +++ b/exporter/config.py @@ -6,6 +6,8 @@ DEFAULT_CONFIG = { }, 'command_prefix': '!export', 'export_interval': 3600, + 'concurrent_exports': 3, + 'allowed_to_export': [], 'r2': { 'bucket_name': None, 'accountid': None, diff --git a/exporter/export_tracker.py b/exporter/export_tracker.py new file mode 100644 index 0000000..6698cd7 --- /dev/null +++ b/exporter/export_tracker.py @@ -0,0 +1,52 @@ +import json +from datetime import datetime, timedelta + +import redis + + +class RoomExport: + def __init__(self, room_id: str, allowed_again: datetime): + self.room_id = room_id + self.allowed_again = allowed_again + + def to_json(self): + return json.dumps({ + 'room_id': self.room_id, + 'allowed_again': self.allowed_again.isoformat() + }) + + @classmethod + def from_json(cls, data): + data = json.loads(data) + return cls(data['room_id'], datetime.fromisoformat(data['allowed_again'])) + + +class ExportTracker: + def __init__(self): + self._redis = redis.Redis(host='localhost', port=6379, db=3) + + def add_export(self, room_id: str, seconds_elapsed_allowed: int) -> None: + allowed_again = datetime.now() + timedelta(seconds=seconds_elapsed_allowed) + room_export = RoomExport(room_id, allowed_again) + self._redis.set(room_id, room_export.to_json()) + + def check_allowed(self, room_id): + room_export = self.get_export(room_id) + if room_export: + return datetime.now() >= room_export.allowed_again + return True + + def get_export(self, room_id: str) -> RoomExport | None: + data = self._redis.get(room_id) + if data: + return RoomExport.from_json(data.decode('utf-8')) + return None + + def all(self): + keys = self._redis.keys('*') + exports = [] + for key in keys: + export = self.get_export(key.decode('utf-8')) + if export: + exports.append(export) + return exports diff --git a/exporter/matrix.py b/exporter/matrix.py index 908f141..32280a1 100644 --- a/exporter/matrix.py +++ b/exporter/matrix.py @@ -2,8 +2,9 @@ import json import logging import os from pathlib import Path +from typing import Union -from nio import AsyncClient, AsyncClientConfig, LoginError +from nio import AsyncClient, AsyncClientConfig, LoginError, Response, ErrorResponse from nio import LoginResponse @@ -69,3 +70,18 @@ class MatrixClientHelper: "device_id": resp.device_id, "access_token": resp.access_token, }, f) + + async def react_to_event(self, room_id: str, event_id: str, reaction_text: str, extra_error: str = False, extra_msg: str = False) -> Union[Response, ErrorResponse]: + content = { + "m.relates_to": { + "rel_type": "m.annotation", + "event_id": event_id, + "key": reaction_text + }, + "m.matrixbot": {} + } + if extra_error: + content["m.matrixbot"]["error"] = str(extra_error) + if extra_msg: + content["m.matrixbot"]["msg"] = str(extra_msg) + return await self.client.room_send(room_id, "m.reaction", content, ignore_unverified_devices=True) diff --git a/exporter/run.py b/exporter/run.py new file mode 100644 index 0000000..b6e89dd --- /dev/null +++ b/exporter/run.py @@ -0,0 +1,147 @@ +import json +import logging +import os +import re +import shutil +import tempfile +import time +import traceback +from asyncio import Semaphore +from io import BytesIO +from pathlib import Path + +import aiofiles +import aiofiles.os as aos +import redis +from PIL import Image +from nio import MatrixRoom, SyncResponse, RoomMessageMedia, MessageDirection, AsyncClient, RoomMessageText, RoomSendResponse + +from exporter.export import download_mxc, zip_directory, upload_to_r2, trim_filename, fetch_events +from exporter.export_tracker import ExportTracker + +CLEAN_ROOM_NAME_RE = re.compile('[^a-zA-Z0-9]') + + +async def run_export_thread(room: MatrixRoom, requestor_event: RoomMessageText, client: AsyncClient, config: dict, start_msg: RoomSendResponse, sem: Semaphore): + logger = logging.getLogger('ExportBot').getChild('RunExportThread') + + export_created = int(time.time()) + clean_room_name_str = '' + if room.name: + clean_room_name = CLEAN_ROOM_NAME_RE.sub('', room.name) + clean_room_name_str = f'{clean_room_name}-' + zipfile_name = f'{clean_room_name_str}{room.room_id.replace(":", "_").replace("!", "").replace(".", "")}-{int(time.time())}.zip' + zipfile_path = None + temp_dir = None + + r = redis.Redis(host='localhost', port=6379, db=3) + exports = ExportTracker() + zip_temp_dir = Path(tempfile.mkdtemp()) + + try: + sync_response = await client.sync(full_state=True, sync_filter={"room": {"timeline": {"limit": 1}}}) + if not isinstance(sync_response, SyncResponse): + logger.error(f'Failed to sync room "{room.room_id}": {sync_response}') + raise + start_token = sync_response.rooms.join[room.room_id].timeline.prev_batch + + room_events = [ + await fetch_events(client, room.room_id, start_token, MessageDirection.back), + await fetch_events(client, room.room_id, start_token, MessageDirection.front), + ] + + temp_dir = Path(tempfile.mkdtemp()) + exported_events = [] + logger.debug(f'Writing export for {room.room_id} to {temp_dir}') + + for direction in room_events: + for event in direction.all(): + if isinstance(event, RoomMessageMedia): + media_data = await download_mxc(client, event.url) + filename = trim_filename(f'{int(event.server_timestamp / 1000)} -- {event.body}') + event.source["exported_file_path"] = filename + async with aiofiles.open(temp_dir / filename, "wb") as f_media: + await f_media.write(media_data) + exported_events.append(event.source) + + room_avatar_data = await download_mxc(client, room.room_avatar_url) + image = Image.open(BytesIO(room_avatar_data)) + room_avatar_filename = 'room_avatar.' + str(image.format).lower() + async with aiofiles.open(temp_dir / room_avatar_filename, "wb") as f_media: + await f_media.write(room_avatar_data) + + exported_data = { + 'room': { + 'id': room.room_id, + 'name': room.name, + 'avatar_path': room_avatar_filename + }, + 'users': room.names, + 'requesting_user': requestor_event.sender, + 'exporting_user': client.user_id, + 'created': export_created, + 'events': sorted(exported_events, key=lambda x: x['origin_server_ts']) + } + async with aiofiles.open(temp_dir / 'data.json', 'w', encoding='utf-8') as f: + await f.write(json.dumps(exported_data, ensure_ascii=False, indent=4)) + + zipfile_path = zip_temp_dir / zipfile_name + logger.debug(f'Creating zip: {zipfile_path}') + await zip_directory(temp_dir, zipfile_path) + shutil.rmtree(temp_dir) + + logger.debug(f'Uploading: {zipfile_path}') + r2_upload = await upload_to_r2( + zipfile_path, zipfile_name, + config['r2']['bucket_name'], + config['r2']['accountid'], + config['r2']['access_key_id'], + config['r2']['access_key_secret'] + ) + await aos.remove(zipfile_path) + + logger.info(f'Export for {room.room_id} completed') + exports.add_export(room.room_id, config['export_interval']) + + pub_url = f'{config["r2"]["pub_url"]}/{zipfile_name}' + formatted_body = f'Export complete!
{pub_url}
This file will be deleted after 24 hours.' + content = { + "body": f'Export complete!\n{pub_url}\nThis file will be deleted after {config["r2"]["retention_hrs"]} hours.', + "formatted_body": formatted_body, + "format": "org.matrix.custom.html", + "m.relates_to": { + "m.in_reply_to": { + "event_id": start_msg.event_id + } + }, + "msgtype": "m.text" + } + await client.room_send( + room.room_id, + 'm.room.message', + content + ) + except Exception as e: + logger.error(f'Export failed for {room.room_id}: {traceback.format_exc()}') + exports.add_export(room.room_id, config['export_interval'] // 2) + content = { + "body": f'❌ Failed to export room!', + "m.relates_to": { + "m.in_reply_to": { + "event_id": start_msg.event_id + } + }, + "msgtype": "m.text" + } + await client.room_send( + room.room_id, + 'm.room.message', + content + ) + if zipfile_path: + os.remove(zipfile_path) + if temp_dir: + shutil.rmtree(temp_dir) + + r.lrem('in_progress', 0, room.room_id) + sem.release() diff --git a/main.py b/main.py index 845b2cc..9349f03 100644 --- a/main.py +++ b/main.py @@ -4,6 +4,7 @@ import logging import os from pathlib import Path +import redis import yaml from nio import InviteMemberEvent, RoomMessageText @@ -28,12 +29,14 @@ async def main(args): # Really lazy way to do this config = yaml_config config['r2'] = DEFAULT_CONFIG['r2'] | yaml_config.get('r2', {}) - - print(config["r2"]["retention_hrs"]) + config['allowed_to_export'] = yaml_config.get('allowed_to_export', []) logger.setLevel(logging.DEBUG if args.debug else logging.INFO) logger.debug('Debug logging enabled!') + r = redis.Redis(host='localhost', port=6379, db=3) + r.flushdb() + matrix_helper = MatrixClientHelper( user_id=config['auth']['username'], passwd=config['auth']['password'], @@ -48,7 +51,7 @@ async def main(args): quit(1) logger.info(f'Logged in as {client.user_id}') - callbacks = MatrixBotCallbacks(client, config) + callbacks = MatrixBotCallbacks(matrix_helper, config) client.add_event_callback(callbacks.handle_invite, InviteMemberEvent) client.add_event_callback(callbacks.handle_message, RoomMessageText) diff --git a/requirements.txt b/requirements.txt index 7b2af93..facbfe4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ matrix-nio==0.24.0 aioboto3==12.3.0 -pyyaml==6.0.1 \ No newline at end of file +pyyaml==6.0.1 +redis==5.0.3 +pillow==10.3.0 \ No newline at end of file