From 62d6e4c2a2ccc3f9f3893ce4a1b970aee7bde9cf Mon Sep 17 00:00:00 2001 From: Cyberes Date: Thu, 4 Apr 2024 00:08:36 -0600 Subject: [PATCH] adjust startup and how events are saved --- exporter/callbacks.py | 254 +++++++++++++++++++++--------------------- main.py | 3 +- 2 files changed, 128 insertions(+), 129 deletions(-) diff --git a/exporter/callbacks.py b/exporter/callbacks.py index df5f85e..f4fa216 100644 --- a/exporter/callbacks.py +++ b/exporter/callbacks.py @@ -45,11 +45,10 @@ class MatrixBotCallbacks: self.config = config self.logger = logging.getLogger('ExportBot').getChild('MatrixBotCallbacks') self.startup_ts = time.time() * 1000 - self.seen_events = {} self.exports = ExportTracker() self.zip_temp_dir = Path(tempfile.mkdtemp()) - async def handle_invite(self, room: MatrixRoom, event: InviteMemberEvent) -> None: + async def handle_invite(self, room: MatrixRoom, event: InviteMemberEvent): """ Since the InviteMemberEvent is fired for every m.room.member state received in a sync response's `rooms.invite` section, we will receive some that are @@ -71,140 +70,141 @@ class MatrixBotCallbacks: else: self.logger.error("Unable to join room: %s", room.room_id) - async def handle_message(self, room: MatrixRoom, event: RoomMessageText) -> None: - # Extract the message text - await self.client.room_read_markers(room.room_id, event.event_id, event.event_id) - - # Ignore messages from ourselves + async def handle_message(self, room: MatrixRoom, event: RoomMessageText): + msg = event.body.strip().strip('\n') + if msg == "** Unable to decrypt: The sender's device has not sent us the keys for this message. **": + self.logger.debug(f'Unable to decrypt event "{event.event_id} in room {room.room_id}') + return + if msg != self.config['command_prefix']: + return + if event.server_timestamp < self.startup_ts: + return if event.sender == self.client.user_id: return - if event.server_timestamp < self.startup_ts: - # self.logger.debug(f'Skipping event as it was sent before startup time: {event.event_id}') - return - if event.event_id in list(self.seen_events.keys()): - self.logger.debug(f'Skipping seen event: {event.event_id}') - return + self.logger.info(f"Export for {room.room_id} requested by {event.sender}") + await self.client.room_read_markers(room.room_id, event.event_id, event.event_id) - msg = event.body.strip().strip('\n') - - if msg == self.config['command_prefix']: - self.logger.info(f"Export for {room.room_id} requested by {event.sender}") - - if not self.exports.check_allowed(room.room_id): - last_export = self.exports.get_export(room.room_id) - time_diff = last_export.allowed_again - datetime.now() - minutes_until_future = time_diff.total_seconds() // 60 - content = { - "body": f'Cannot export again for {minutes_until_future} minutes.', - "m.relates_to": { - "m.in_reply_to": { - "event_id": event.event_id - } - }, - "msgtype": "m.text" - } - await self.client.room_send( - room.room_id, - 'm.room.message', - content - ) - self.logger.info(f"Rejected export in {room.room_id}, {minutes_until_future} minutes remaining.") - return - - zipfile_name = f'{room.room_id.replace(":", "_").replace("!", "").replace(".", "")}-{int(time.time())}.zip' - zipfile_path = None - temp_dir = None - - start_msg = await self.client.room_send( - room_id=room.room_id, - message_type="m.room.message", - content={"msgtype": "m.text", "format": "org.matrix.custom.html", "body": 'Exporting room...', 'formatted_body': 'Exporting room...'}, + if not self.exports.check_allowed(room.room_id): + last_export = self.exports.get_export(room.room_id) + time_diff = last_export.allowed_again - datetime.now() + minutes_until_future = time_diff.total_seconds() // 60 + content = { + "body": f'Cannot export again for {minutes_until_future} minutes.', + "m.relates_to": { + "m.in_reply_to": { + "event_id": event.event_id + } + }, + "msgtype": "m.text" + } + await self.client.room_send( + room.room_id, + 'm.room.message', + content ) + self.logger.info(f"Rejected export in {room.room_id}, {minutes_until_future} minutes remaining.") + return - try: - sync_response = await self.client.sync(full_state=True, sync_filter={"room": {"timeline": {"limit": 1}}}) - if not isinstance(sync_response, SyncResponse): - self.logger.error(f'Failed to sync room "{room.room_id}": {sync_response}') - raise - start_token = sync_response.rooms.join[room.room_id].timeline.prev_batch + export_created = int(time.time()) + zipfile_name = f'{room.room_id.replace(":", "_").replace("!", "").replace(".", "")}-{int(time.time())}.zip' + zipfile_path = None + temp_dir = None - room_events = [ - await fetch_events(self.client, room.room_id, start_token, MessageDirection.back), - await fetch_events(self.client, room.room_id, start_token, MessageDirection.front), - ] + start_msg = await self.client.room_send( + room_id=room.room_id, + message_type="m.room.message", + content={"msgtype": "m.text", "format": "org.matrix.custom.html", "body": 'Exporting room...', 'formatted_body': 'Exporting room...'}, + ) - temp_dir = Path(tempfile.mkdtemp()) - export_data = [] - self.logger.debug(f'Writing export for {room.room_id} to {temp_dir}') + try: + sync_response = await self.client.sync(full_state=True, sync_filter={"room": {"timeline": {"limit": 1}}}) + if not isinstance(sync_response, SyncResponse): + self.logger.error(f'Failed to sync room "{room.room_id}": {sync_response}') + raise + start_token = sync_response.rooms.join[room.room_id].timeline.prev_batch - for direction in room_events: - for event in direction.all(): - if isinstance(event, RoomMessageMedia): - media_data = await download_mxc(self.client, event.url) - filename = trim_filename(f'{int(event.server_timestamp / 1000)} -- {event.body}') - event.source["_file_path"] = filename - async with aiofiles.open(temp_dir / filename, "wb") as f_media: - await f_media.write(media_data) - export_data.extend(direction.jsonify()) + room_events = [ + await fetch_events(self.client, room.room_id, start_token, MessageDirection.back), + await fetch_events(self.client, room.room_id, start_token, MessageDirection.front), + ] - async with aiofiles.open(temp_dir / 'data.json', 'w', encoding='utf-8') as f: - await f.write(json.dumps(export_data, ensure_ascii=False, indent=4)) + temp_dir = Path(tempfile.mkdtemp()) + exported_events = [] + self.logger.debug(f'Writing export for {room.room_id} to {temp_dir}') - zipfile_path = self.zip_temp_dir / zipfile_name - self.logger.debug(f'Creating zip: {zipfile_path}') - await zip_directory(temp_dir, zipfile_path) + for direction in room_events: + for event in direction.all(): + if isinstance(event, RoomMessageMedia): + media_data = await download_mxc(self.client, event.url) + filename = trim_filename(f'{int(event.server_timestamp / 1000)} -- {event.body}') + event.source["exported_file_path"] = filename + async with aiofiles.open(temp_dir / filename, "wb") as f_media: + await f_media.write(media_data) + exported_events.append(event.source) + + exported_data = { + 'exporting_user': self.client.user_id, + 'created': export_created, + 'events': sorted(exported_events, key=lambda x: x['origin_server_ts']) + } + async with aiofiles.open(temp_dir / 'data.json', 'w', encoding='utf-8') as f: + await f.write(json.dumps(exported_data, ensure_ascii=False, indent=4)) + + zipfile_path = self.zip_temp_dir / zipfile_name + self.logger.debug(f'Creating zip: {zipfile_path}') + await zip_directory(temp_dir, zipfile_path) + shutil.rmtree(temp_dir) + + self.logger.debug(f'Uploading: {zipfile_path}') + r2_upload = await upload_to_r2( + zipfile_path, zipfile_name, + self.config['r2']['bucket_name'], + self.config['r2']['accountid'], + self.config['r2']['access_key_id'], + self.config['r2']['access_key_secret'] + ) + await aos.remove(zipfile_path) + + self.logger.info(f'Export for {room.room_id} completed') + self.exports.add_export(room.room_id, self.config['export_interval']) + + pub_url = f'{self.config["r2"]["pub_url"]}/{zipfile_name}' + formatted_body = f'Export complete!
{pub_url}
This file will be deleted after 24 hours.' + content = { + "body": f'Export complete!\n{pub_url}\nThis file will be deleted after {self.config["r2"]["retention_hrs"]} hours.', + "formatted_body": formatted_body, + "format": "org.matrix.custom.html", + "m.relates_to": { + "m.in_reply_to": { + "event_id": start_msg.event_id + } + }, + "msgtype": "m.text" + } + await self.client.room_send( + room.room_id, + 'm.room.message', + content + ) + except Exception as e: + self.logger.error(f'Export failed for {room.room_id}: {traceback.format_exc()}') + self.exports.add_export(room.room_id, self.config['export_interval'] // 2) + content = { + "body": f'❌ Failed to export room!', + "m.relates_to": { + "m.in_reply_to": { + "event_id": start_msg.event_id + } + }, + "msgtype": "m.text" + } + await self.client.room_send( + room.room_id, + 'm.room.message', + content + ) + if zipfile_path: + os.remove(zipfile_path) + if temp_dir: shutil.rmtree(temp_dir) - - self.logger.debug(f'Uploading: {zipfile_path}') - r2_upload = await upload_to_r2( - zipfile_path, zipfile_name, - self.config['r2']['bucket_name'], - self.config['r2']['accountid'], - self.config['r2']['access_key_id'], - self.config['r2']['access_key_secret'] - ) - await aos.remove(zipfile_path) - - self.logger.info(f'Export for {room.room_id} completed') - self.exports.add_export(room.room_id, self.config['export_interval']) - - pub_url = f'{self.config["r2"]["pub_url"]}/{zipfile_name}' - formatted_body = f'Export complete!
{pub_url}
This file will be deleted after 24 hours.' - content = { - "body": f'Export complete!\n{pub_url}\nThis file will be deleted after {self.config["r2"]["retention_hrs"]} hours.', - "formatted_body": formatted_body, - "format": "org.matrix.custom.html", - "m.relates_to": { - "m.in_reply_to": { - "event_id": start_msg.event_id - } - }, - "msgtype": "m.text" - } - await self.client.room_send( - room.room_id, - 'm.room.message', - content - ) - except Exception as e: - self.logger.error(f'Export failed for {room.room_id}: {traceback.format_exc()}') - self.exports.add_export(room.room_id, self.config['export_interval'] // 2) - content = { - "body": f'❌ Failed to export room!', - "m.relates_to": { - "m.in_reply_to": { - "event_id": start_msg.event_id - } - }, - "msgtype": "m.text" - } - await self.client.room_send( - room.room_id, - 'm.room.message', - content - ) - if zipfile_path: - os.remove(zipfile_path) - if temp_dir: - shutil.rmtree(temp_dir) diff --git a/main.py b/main.py index 9015575..b3b96ef 100644 --- a/main.py +++ b/main.py @@ -45,11 +45,10 @@ async def main(args): logger.info(f'Logged in as {client.user_id}') callbacks = MatrixBotCallbacks(client, config) - client.add_event_callback(callbacks.handle_invite, InviteMemberEvent) client.add_event_callback(callbacks.handle_message, RoomMessageText) - await client.sync_forever(timeout=30000) + await client.sync_forever(timeout=10000) if __name__ == '__main__':