import copy
import html
import json
import logging
import math
import os
import shutil
import tempfile
import time
import traceback
from datetime import datetime, timedelta
from pathlib import Path

import aiofiles
import aiofiles.os as aos
from nio import (AsyncClient, InviteMemberEvent, JoinError, MatrixRoom, RoomMessageText, SyncResponse,
                 RoomMessageMedia, MessageDirection)

from exporter.export import download_mxc, zip_directory, upload_to_r2, trim_filename, fetch_events


class RoomExport:
    """Record of the most recent export attempt for a single room."""

    def __init__(self, room_id: str, allowed_again: datetime):
        self.room_id = room_id
        # Earliest (naive, local-time) moment at which this room may be exported again.
        self.allowed_again = allowed_again


class ExportTracker:
    """In-memory, per-room cooldown tracker for exports."""

    def __init__(self):
        # room_id -> RoomExport for the latest export attempt in that room.
        self._exported_rooms: dict[str, RoomExport] = {}

    def add_export(self, room_id: str, seconds_elapsed_allowed: int) -> None:
        """Start (or restart) the cooldown for ``room_id``, lasting ``seconds_elapsed_allowed`` seconds from now."""
        allowed_again = datetime.now() + timedelta(seconds=seconds_elapsed_allowed)
        self._exported_rooms[room_id] = RoomExport(room_id, allowed_again)

    def check_allowed(self, room_id: str) -> bool:
        """Return True if ``room_id`` has no recorded export or its cooldown has expired."""
        export = self._exported_rooms.get(room_id)
        return export is None or datetime.now() >= export.allowed_again

    def get_export(self, room_id: str) -> RoomExport | None:
        """Return a copy of the recorded export for ``room_id``, or None if there is none."""
        # Deep copy so callers cannot mutate the tracker's internal state.
        return copy.deepcopy(self._exported_rooms.get(room_id))


class MatrixBotCallbacks:
    """nio event callbacks: join rooms we are invited to, and export a room on command."""

    def __init__(self, client: AsyncClient, config: dict):
        self.client = client
        # Keys read here: 'command_prefix', 'export_interval' and the 'r2' sub-dict
        # ('bucket_name', 'accountid', 'access_key_id', 'access_key_secret', 'pub_url', 'retention_hrs').
        self.config = config
        self.logger = logging.getLogger('ExportBot').getChild('MatrixBotCallbacks')
        # Milliseconds since the epoch; compared against event.server_timestamp.
        self.startup_ts = time.time() * 1000
        # event_id -> server_timestamp of export commands that have already been handled.
        self.seen_events = {}
        self.exports = ExportTracker()
        # Staging directory for finished zip files before they are uploaded.
        self.zip_temp_dir = Path(tempfile.mkdtemp())

    async def handle_invite(self, room: MatrixRoom, event: InviteMemberEvent) -> None:
        """
        Since the InviteMemberEvent is fired for every m.room.member state received in a sync response's
        `rooms.invite` section, we will receive some that are not actually our own invite event (such as the
        inviter's membership). This makes sure we only call `callbacks.invite` with our own invite events.

        Joining is attempted up to 3 times; each failed attempt is logged, as is giving up.
        """
        if event.state_key != self.client.user_id:
            return
        self.logger.info(f"Got invite to {room.room_id} from {event.sender}.")
        for attempt in range(3):
            result = await self.client.join(room.room_id)
            if not isinstance(result, JoinError):
                self.logger.info(f"Joined via invite: {room.room_id}")
                return
            self.logger.error(f"Error joining room {room.room_id} (attempt {attempt}): {result.message}")
        self.logger.error("Unable to join room: %s", room.room_id)

    async def _reply(self, room_id: str, reply_to: str, body: str, formatted_body: str | None = None) -> None:
        """Send an ``m.text`` message to ``room_id`` as a reply to event ``reply_to``.

        When ``formatted_body`` is given it is sent as ``org.matrix.custom.html`` alongside ``body``.
        """
        content = {
            "body": body,
            "m.relates_to": {"m.in_reply_to": {"event_id": reply_to}},
            "msgtype": "m.text",
        }
        if formatted_body is not None:
            content["format"] = "org.matrix.custom.html"
            content["formatted_body"] = formatted_body
        await self.client.room_send(room_id, 'm.room.message', content)

    async def _fetch_room_events(self, room_id: str) -> list:
        """Return ``[backward_events, forward_events]`` fetched from the room's current timeline position.

        Raises:
            RuntimeError: if the sync used to find the starting token fails.
        """
        sync_response = await self.client.sync(full_state=True, sync_filter={"room": {"timeline": {"limit": 1}}})
        if not isinstance(sync_response, SyncResponse):
            self.logger.error(f'Failed to sync room "{room_id}": {sync_response}')
            raise RuntimeError(f'Failed to sync room "{room_id}"')
        start_token = sync_response.rooms.join[room_id].timeline.prev_batch
        return [
            await fetch_events(self.client, room_id, start_token, MessageDirection.back),
            await fetch_events(self.client, room_id, start_token, MessageDirection.front),
        ]

    async def _write_export(self, room_events: list, temp_dir: Path) -> None:
        """Download media for every media event into ``temp_dir`` and write all events to ``data.json``.

        Each element of ``room_events`` is whatever ``fetch_events`` returns; this relies on it offering
        ``.all()`` (the event objects) and ``.jsonify()`` (a list of JSON-serialisable events).
        """
        export_data = []
        for direction in room_events:
            for room_event in direction.all():
                if isinstance(room_event, RoomMessageMedia):
                    media_data = await download_mxc(self.client, room_event.url)
                    # The body is sender-controlled: path separators would point outside temp_dir
                    # (or into a missing subdirectory) and abort the whole export.
                    safe_body = room_event.body.replace('/', '_').replace('\\', '_')
                    filename = trim_filename(f'{int(room_event.server_timestamp / 1000)} -- {safe_body}')
                    # Record the file name in the event so data.json can reference the media file.
                    room_event.source["_file_path"] = filename
                    async with aiofiles.open(temp_dir / filename, "wb") as f_media:
                        await f_media.write(media_data)
            export_data.extend(direction.jsonify())

        async with aiofiles.open(temp_dir / 'data.json', 'w', encoding='utf-8') as f:
            await f.write(json.dumps(export_data, ensure_ascii=False, indent=4))

    async def handle_message(self, room: MatrixRoom, event: RoomMessageText) -> None:
        """
        Handle a text message and, when it equals ``config['command_prefix']``, export the room.

        The export fetches room history, downloads attached media, zips everything together with a
        ``data.json`` of the events, uploads the zip to R2 and replies with its public URL. Exports are
        rate limited per room through ``self.exports``; a failed export applies half the normal interval.
        """
        await self.client.room_read_markers(room.room_id, event.event_id, event.event_id)

        # Ignore our own messages, anything sent before this process started, and commands
        # that have already been handled.
        if event.sender == self.client.user_id:
            return
        if event.server_timestamp < self.startup_ts:
            return
        if event.event_id in self.seen_events:
            self.logger.debug(f'Skipping seen event: {event.event_id}')
            return

        if event.body.strip() != self.config['command_prefix']:
            return
        self.seen_events[event.event_id] = event.server_timestamp

        self.logger.info(f"Export for {room.room_id} requested by {event.sender}")
        if not self.exports.check_allowed(room.room_id):
            last_export = self.exports.get_export(room.room_id)
            seconds_left = (last_export.allowed_again - datetime.now()).total_seconds()
            # Round up, and never below 1, so users are not told "0.0 minutes".
            minutes_until_future = max(1, math.ceil(seconds_left / 60))
            await self._reply(room.room_id, event.event_id,
                              f'Cannot export again for {minutes_until_future} minutes.')
            self.logger.info(f"Rejected export in {room.room_id}, {minutes_until_future} minutes remaining.")
            return

        safe_room_id = room.room_id.replace(":", "_").replace("!", "").replace(".", "")
        zipfile_name = f'{safe_room_id}-{int(time.time())}.zip'
        zipfile_path = None
        temp_dir = None

        start_msg = await self.client.room_send(
            room_id=room.room_id,
            message_type="m.room.message",
            content={"msgtype": "m.text", "format": "org.matrix.custom.html", "body": 'Exporting room...',
                     'formatted_body': 'Exporting room...'},
        )
        # If sending failed, start_msg may be an error response without an event_id (nio
        # RoomSendError); fall back to replying to the command itself so status replies still work.
        reply_to = getattr(start_msg, 'event_id', None) or event.event_id

        try:
            room_events = await self._fetch_room_events(room.room_id)

            temp_dir = Path(tempfile.mkdtemp())
            self.logger.debug(f'Writing export for {room.room_id} to {temp_dir}')
            await self._write_export(room_events, temp_dir)

            zipfile_path = self.zip_temp_dir / zipfile_name
            self.logger.debug(f'Creating zip: {zipfile_path}')
            await zip_directory(temp_dir, zipfile_path)
            shutil.rmtree(temp_dir)
            temp_dir = None  # Already removed; the cleanup below must not remove it again.

            self.logger.debug(f'Uploading: {zipfile_path}')
            r2 = self.config['r2']
            await upload_to_r2(
                zipfile_path,
                zipfile_name,
                r2['bucket_name'],
                r2['accountid'],
                r2['access_key_id'],
                r2['access_key_secret'],
            )
            await aos.remove(zipfile_path)
            zipfile_path = None  # Already removed; the cleanup below must not remove it again.

            self.logger.info(f'Export for {room.room_id} completed')
            self.exports.add_export(room.room_id, self.config['export_interval'])

            pub_url = f'{r2["pub_url"]}/{zipfile_name}'
            retention_hrs = r2['retention_hrs']
            escaped_url = html.escape(pub_url)
            await self._reply(
                room.room_id,
                reply_to,
                f'Export complete!\n{pub_url}\nThis file will be deleted after {retention_hrs} hours.',
                formatted_body=(
                    f'Export complete!<br><a href="{escaped_url}">{escaped_url}</a><br>'
                    f'This file will be deleted after {retention_hrs} hours.'
                ),
            )
        except Exception:
            self.logger.error(f'Export failed for {room.room_id}: {traceback.format_exc()}')
            self.exports.add_export(room.room_id, self.config['export_interval'] // 2)
            await self._reply(room.room_id, reply_to, '❌ Failed to export room!')
        finally:
            # Best-effort removal of whatever a failed export left behind; tolerate files that
            # were never created (e.g. zip_directory failed before writing anything).
            if zipfile_path is not None:
                zipfile_path.unlink(missing_ok=True)
            if temp_dir is not None:
                shutil.rmtree(temp_dir, ignore_errors=True)