matrix-room-exporter/exporter/callbacks.py

203 lines
8.9 KiB
Python

import copy
import json
import logging
import shutil
import tempfile
import time
import traceback
from datetime import datetime, timedelta
from pathlib import Path
import aiofiles
import aiofiles.os as aos
from nio import (AsyncClient, InviteMemberEvent, JoinError, MatrixRoom, RoomMessageText, SyncResponse, RoomMessageMedia, MessageDirection)
from exporter.export import download_mxc, zip_directory, upload_to_r2, trim_filename, fetch_events
class RoomExport:
def __init__(self, room_id: str, allowed_again: datetime):
self.room_id = room_id
self.allowed_again = allowed_again
class ExportTracker:
def __init__(self):
self._exported_rooms = {}
def add_export(self, room_id: str, seconds_elapsed_allowed: int) -> None:
allowed_again = datetime.now() + timedelta(seconds=seconds_elapsed_allowed)
self._exported_rooms[room_id] = RoomExport(room_id, allowed_again)
def check_allowed(self, room_id):
if self.get_export(room_id):
return datetime.now() >= self._exported_rooms[room_id].allowed_again
return True
def get_export(self, room_id: str) -> RoomExport | None:
return copy.deepcopy(self._exported_rooms.get(room_id))
class MatrixBotCallbacks:
def __init__(self, client: AsyncClient, config: dict):
self.client = client
self.config = config
self.logger = logging.getLogger('MatrixGPT').getChild('MatrixBotCallbacks')
self.startup_ts = time.time() * 1000
self.seen_events = {}
self.exports = ExportTracker()
self.zip_temp_dir = Path(tempfile.mkdtemp())
async def handle_invite(self, room: MatrixRoom, event: InviteMemberEvent) -> None:
"""
Since the InviteMemberEvent is fired for every m.room.member state received
in a sync response's `rooms.invite` section, we will receive some that are
not actually our own invite event (such as the inviter's membership).
This makes sure we only call `callbacks.invite` with our own invite events.
"""
# event.source.get('origin_server_ts') > self.startup_time
if event.state_key == self.client.user_id:
self.logger.info(f"Got invite to {room.room_id} from {event.sender}.")
# Attempt to join 3 times before giving up
for attempt in range(3):
result = await self.client.join(room.room_id)
if isinstance(result, JoinError):
self.logger.error(f"Error joining room {room.room_id} (attempt {attempt}): {result.message}")
else:
self.logger.info(f"Joined via invite: {room.room_id}")
return
else:
self.logger.error("Unable to join room: %s", room.room_id)
async def handle_message(self, room: MatrixRoom, event: RoomMessageText) -> None:
# Extract the message text
await self.client.room_read_markers(room.room_id, event.event_id, event.event_id)
# Ignore messages from ourselves
if event.sender == self.client.user_id:
return
if event.server_timestamp < self.startup_ts:
self.logger.debug(f'Skipping event as it was sent before startup time: {event.event_id}')
return
if event.event_id in list(self.seen_events.keys()):
self.logger.debug(f'Skipping seen event: {event.event_id}')
return
msg = event.body.strip().strip('\n')
if msg == self.config['command_prefix']:
self.logger.info(f"Export for {room.room_id} requested by {event.sender}")
if not self.exports.check_allowed(room.room_id):
last_export = self.exports.get_export(room.room_id)
time_diff = last_export.allowed_again - datetime.now()
minutes_until_future = time_diff.total_seconds() // 60
content = {
"body": f'Cannot export again for {minutes_until_future} minutes.',
"m.relates_to": {
"m.in_reply_to": {
"event_id": event.event_id
}
},
"msgtype": "m.text"
}
await self.client.room_send(
room.room_id,
'm.room.message',
content
)
self.logger.info(f"Rejected export in {room.room_id}, {minutes_until_future} minutes remaining.")
return
zipfile_name = f'{room.room_id.replace(":", "_").replace("!", "").replace(".", "")}-{int(time.time())}.zip'
start_msg = await self.client.room_send(
room_id=room.room_id,
message_type="m.room.message",
content={"msgtype": "m.text", "format": "org.matrix.custom.html", "body": 'Exporting room...', 'formatted_body': '<i>Exporting room...</i>'},
)
try:
sync_response = await self.client.sync(full_state=True, sync_filter={"room": {"timeline": {"limit": 1}}})
if not isinstance(sync_response, SyncResponse):
self.logger.error(f'Failed to sync room "{room.room_id}": {sync_response}')
raise
start_token = sync_response.rooms.join[room.room_id].timeline.prev_batch
room_events = [
await fetch_events(self.client, room.room_id, start_token, MessageDirection.back),
await fetch_events(self.client, room.room_id, start_token, MessageDirection.front),
]
temp_dir = Path(tempfile.mkdtemp())
export_data = []
self.logger.debug(f'Writing export for {room.room_id} to {temp_dir}')
for direction in room_events:
for event in direction.all():
if isinstance(event, RoomMessageMedia):
media_data = await download_mxc(self.client, event.url)
filename = trim_filename(f'{int(event.server_timestamp / 1000)} -- {event.body}')
event.source["_file_path"] = filename
async with aiofiles.open(temp_dir / filename, "wb") as f_media:
await f_media.write(media_data)
export_data.extend(direction.jsonify())
async with aiofiles.open(temp_dir / 'data.json', 'w', encoding='utf-8') as f:
await f.write(json.dumps(export_data, ensure_ascii=False, indent=4))
zipfile_path = self.zip_temp_dir / zipfile_name
self.logger.debug(f'Creating zip: {zipfile_path}')
await zip_directory(temp_dir, zipfile_path)
shutil.rmtree(temp_dir)
self.logger.debug(f'Uploading: {zipfile_path}')
r2_upload = await upload_to_r2(
zipfile_path, zipfile_name,
self.config['r2']['bucket_name'],
self.config['r2']['accountid'],
self.config['r2']['access_key_id'],
self.config['r2']['access_key_secret']
)
await aos.remove(zipfile_path)
self.logger.info(f'Export for {room.room_id} completed.')
self.exports.add_export(room.room_id, self.config['export_interval'])
pub_url = f'{self.config["r2"]["pub_url"]}/{zipfile_name}'
formatted_body = f'<strong>Export complete!</strong><br><a href="{pub_url}">{pub_url}</a><br>This file will be deleted after 24 hours.'
content = {
"body": f'Export complete!\n{pub_url}\nThis file will be deleted after 24 hours.',
"formatted_body": formatted_body,
"format": "org.matrix.custom.html",
"m.relates_to": {
"m.in_reply_to": {
"event_id": start_msg.event_id
}
},
"msgtype": "m.text"
}
await self.client.room_send(
room.room_id,
'm.room.message',
content
)
except Exception as e:
self.logger.error(f'Export failed for {room.room_id}: {traceback.format_exc()}')
self.exports.add_export(room.room_id, self.config['export_interval'] // 2)
content = {
"body": f'❌ Failed to export room!',
"m.relates_to": {
"m.in_reply_to": {
"event_id": start_msg.event_id
}
},
"msgtype": "m.text"
}
await self.client.room_send(
room.room_id,
'm.room.message',
content
)