matrix-room-exporter/exporter/run.py

148 lines
5.6 KiB
Python

import json
import logging
import os
import re
import shutil
import tempfile
import time
import traceback
from asyncio import Semaphore
from io import BytesIO
from pathlib import Path
import aiofiles
import aiofiles.os as aos
import redis
from PIL import Image
from nio import MatrixRoom, SyncResponse, RoomMessageMedia, MessageDirection, AsyncClient, RoomMessageText, RoomSendResponse
from exporter.export import download_mxc, zip_directory, upload_to_r2, trim_filename, fetch_events
from exporter.export_tracker import ExportTracker
# Matches every character that is not an ASCII letter or digit. Substituting
# matches with '' reduces a room display name to a plain alphanumeric fragment
# that is safe to embed in the export's zip filename and public URL.
CLEAN_ROOM_NAME_RE = re.compile(r"[^a-zA-Z0-9]")
async def run_export_thread(room: MatrixRoom, requestor_event: RoomMessageText, client: AsyncClient, config: dict, start_msg: RoomSendResponse, sem: Semaphore) -> None:
    """Export a room's history to a zip on R2 and reply in the room with the link.

    Syncs once to obtain a pagination token, fetches the room's events backwards
    and forwards from it, downloads media attachments (and the room avatar, if the
    room has one) into a temporary directory together with a ``data.json``
    describing the export, zips that directory, uploads the zip to R2 and replies
    to ``start_msg`` with the public URL. If anything fails, a failure notice is
    sent as a reply instead.

    Whatever the outcome, temporary files are removed, the room ID is removed from
    the ``in_progress`` Redis list, and ``sem`` is released.

    Args:
        room: The room to export.
        requestor_event: The message that requested the export; its sender is
            recorded as ``requesting_user`` in ``data.json``.
        client: Client used to sync, fetch events, download media and send replies.
        config: Bot configuration. Reads ``config['export_interval']`` and the
            ``config['r2']`` keys ``bucket_name``, ``accountid``,
            ``access_key_id``, ``access_key_secret``, ``pub_url`` and
            ``retention_hrs``.
        start_msg: The message that the completion/failure reply is attached to
            (via ``m.in_reply_to``).
        sem: Released exactly once when this coroutine finishes. Presumably the
            caller acquires it before scheduling this coroutine; confirm at the
            call site.

    Exceptions raised during the export are logged and reported in the room
    rather than propagated.
    """
    logger = logging.getLogger('ExportBot').getChild('RunExportThread')
    export_created = int(time.time())

    clean_room_name_str = ''
    if room.name:
        clean_room_name = CLEAN_ROOM_NAME_RE.sub('', room.name)
        clean_room_name_str = f'{clean_room_name}-'
    clean_room_id = room.room_id.replace(":", "_").replace("!", "").replace(".", "")
    # Reuse export_created so the filename and the 'created' field in data.json agree.
    zipfile_name = f'{clean_room_name_str}{clean_room_id}-{export_created}.zip'

    temp_dir = None
    zip_temp_dir = None
    r = redis.Redis(host='localhost', port=6379, db=3)
    exports = ExportTracker()
    try:
        zip_temp_dir = Path(tempfile.mkdtemp())

        sync_response = await client.sync(full_state=True, sync_filter={"room": {"timeline": {"limit": 1}}})
        if not isinstance(sync_response, SyncResponse):
            logger.error(f'Failed to sync room "{room.room_id}": {sync_response}')
            # A bare `raise` here has no active exception to re-raise; raise a descriptive error instead.
            raise RuntimeError(f'Failed to sync room "{room.room_id}": {sync_response}')
        start_token = sync_response.rooms.join[room.room_id].timeline.prev_batch
        # Page through history in both directions from the latest sync position.
        room_events = [
            await fetch_events(client, room.room_id, start_token, MessageDirection.back),
            await fetch_events(client, room.room_id, start_token, MessageDirection.front),
        ]

        temp_dir = Path(tempfile.mkdtemp())
        exported_events = []
        logger.debug(f'Writing export for {room.room_id} to {temp_dir}')
        for direction in room_events:
            for event in direction.all():
                if isinstance(event, RoomMessageMedia):
                    media_data = await download_mxc(client, event.url)
                    filename = trim_filename(f'{int(event.server_timestamp / 1000)} -- {event.body}')
                    # Record where the attachment is stored inside the archive.
                    event.source["exported_file_path"] = filename
                    async with aiofiles.open(temp_dir / filename, "wb") as f_media:
                        await f_media.write(media_data)
                exported_events.append(event.source)

        # Rooms without an avatar have no avatar URL; skip the download rather
        # than failing the whole export.
        room_avatar_filename = None
        if room.room_avatar_url:
            room_avatar_data = await download_mxc(client, room.room_avatar_url)
            with Image.open(BytesIO(room_avatar_data)) as image:
                room_avatar_filename = 'room_avatar.' + str(image.format).lower()
            async with aiofiles.open(temp_dir / room_avatar_filename, "wb") as f_media:
                await f_media.write(room_avatar_data)

        exported_data = {
            'room': {
                'id': room.room_id,
                'name': room.name,
                'avatar_path': room_avatar_filename
            },
            'users': room.names,
            'requesting_user': requestor_event.sender,
            'exporting_user': client.user_id,
            'created': export_created,
            'events': sorted(exported_events, key=lambda x: x['origin_server_ts'])
        }
        async with aiofiles.open(temp_dir / 'data.json', 'w', encoding='utf-8') as f:
            await f.write(json.dumps(exported_data, ensure_ascii=False, indent=4))

        zipfile_path = zip_temp_dir / zipfile_name
        logger.debug(f'Creating zip: {zipfile_path}')
        await zip_directory(temp_dir, zipfile_path)
        # Free disk space early; the finally block tolerates the directory being gone.
        shutil.rmtree(temp_dir)
        logger.debug(f'Uploading: {zipfile_path}')
        await upload_to_r2(
            zipfile_path, zipfile_name,
            config['r2']['bucket_name'],
            config['r2']['accountid'],
            config['r2']['access_key_id'],
            config['r2']['access_key_secret']
        )
        await aos.remove(zipfile_path)
        logger.info(f'Export for {room.room_id} completed')
        exports.add_export(room.room_id, config['export_interval'])

        retention_hrs = config["r2"]["retention_hrs"]
        pub_url = f'{config["r2"]["pub_url"]}/{zipfile_name}'
        body = f'Export complete!\n{pub_url}\nThis file will be deleted after {retention_hrs} hours.'
        # Use the configured retention in the HTML variant too (it was hard-coded to 24 hours).
        formatted_body = (
            f'<strong>Export complete!</strong><br><a href="{pub_url}">{pub_url}</a>'
            f'<br>This file will be deleted after {retention_hrs} hours.'
        )
        await client.room_send(
            room.room_id,
            'm.room.message',
            _reply_content(start_msg.event_id, body, formatted_body)
        )
    except Exception:
        logger.error(f'Export failed for {room.room_id}: {traceback.format_exc()}')
        # Half the usual interval after a failure (semantics live in ExportTracker).
        exports.add_export(room.room_id, config['export_interval'] // 2)
        try:
            await client.room_send(
                room.room_id,
                'm.room.message',
                _reply_content(start_msg.event_id, '❌ Failed to export room!')
            )
        except Exception:
            # Failing to notify must not prevent the cleanup/release in `finally`.
            logger.error(f'Failed to send export failure notice to {room.room_id}: {traceback.format_exc()}')
    finally:
        # Best-effort cleanup: the success path may already have removed some of
        # these files, so missing paths must not abort the release below.
        for leftover_dir in (temp_dir, zip_temp_dir):
            if leftover_dir is not None:
                shutil.rmtree(leftover_dir, ignore_errors=True)
        try:
            r.lrem('in_progress', 0, room.room_id)
        finally:
            # Always hand the slot back, even on cancellation or a Redis error.
            sem.release()


def _reply_content(reply_to_event_id: str, body: str, formatted_body=None) -> dict:
    """Build ``m.room.message`` content for an ``m.text`` reply to ``reply_to_event_id``.

    If ``formatted_body`` (a string) is given, it is attached with format
    ``org.matrix.custom.html``.
    """
    content = {
        "body": body,
        "m.relates_to": {
            "m.in_reply_to": {
                "event_id": reply_to_event_id
            }
        },
        "msgtype": "m.text"
    }
    if formatted_body is not None:
        content["formatted_body"] = formatted_body
        content["format"] = "org.matrix.custom.html"
    return content