"""
Helpers for archiving Matrix rooms: collecting room events, downloading
``mxc://`` media, zipping a directory and uploading a file to Cloudflare R2.

Inspired by https://github.com/russelldavies/matrix-archive/blob/master/matrix-archive.py
"""
import asyncio
import copy
import os
import zipfile
from pathlib import Path
from typing import Union, List
from urllib.parse import urlparse

import aioboto3
import aiofiles
from nio import (
    RedactedEvent,
    RoomMessageFormatted,
    RoomMessageMedia,
    Event,
    RoomMessagesError,
    AsyncClient,
    MessageDirection
)


class RoomEvents:
    """Ordered collection of the events gathered for one room."""

    def __init__(self, room_id: str):
        self._room_id = room_id
        self._events: List[Event] = []

    def add_event(self, event: Event) -> None:
        """Append ``event`` to the end of the collection."""
        self._events.append(event)

    def all(self) -> List[Event]:
        """Return a deep copy of the stored events.

        A copy is returned so callers cannot mutate the internal list or
        the events held in it.
        """
        return copy.deepcopy(self._events)

    def jsonify(self) -> list:
        """Return the ``source`` attribute of every stored event, in order."""
        return [event.source for event in self._events]


def is_valid_event(event: Event) -> bool:
    """Return True if ``event`` is a formatted message, a redaction or media."""
    return isinstance(event, (RoomMessageFormatted, RedactedEvent, RoomMessageMedia))


async def download_mxc(client: AsyncClient, url: str) -> bytes:
    """Download the content behind an ``mxc://server/media_id`` URL.

    The URL's network location is passed to ``client.download`` as the
    server name and its path (without slashes) as the media id.

    Returns:
        The ``body`` of the download response, or ``b''`` when the response
        has no ``body`` attribute.
    """
    mxc = urlparse(url)
    response = await client.download(mxc.netloc, mxc.path.strip("/"))
    return getattr(response, "body", b'')


async def zip_directory(directory_path, zip_path):
    """Zip ``directory_path`` into ``zip_path`` without blocking the event loop.

    The blocking work is delegated to ``_zip_directory`` on the loop's
    default executor.
    """
    # Inside a coroutine there is always a running loop; get_running_loop()
    # is the recommended way to obtain it.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _zip_directory, directory_path, zip_path)


def _zip_directory(directory_path, zip_path):
    """Write every file under ``directory_path`` into a deflated zip archive.

    Entries are stored under their path relative to ``directory_path``.
    When ``zip_path`` is a filesystem path located inside ``directory_path``,
    the archive itself is skipped so it is not written into itself.
    """
    # zipfile also accepts file-like objects; only real paths can be
    # compared against the files found by os.walk.
    if isinstance(zip_path, (str, os.PathLike)):
        archive_abs = os.path.abspath(zip_path)
    else:
        archive_abs = None

    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        for root, _dirs, files in os.walk(directory_path):
            for file in files:
                file_path = os.path.join(root, file)
                if archive_abs is not None and os.path.abspath(file_path) == archive_abs:
                    continue
                zip_file.write(file_path, os.path.relpath(file_path, directory_path))


async def upload_to_r2(source_file: Union[str, Path], destination_file: str, bucket_name: str,
                       accountid: str, access_key_id: str, access_key_secret: str):
    """Upload ``source_file`` to a Cloudflare R2 bucket as ``destination_file``.

    Args:
        source_file: Local file to upload; opened in binary mode.
        destination_file: Object key to store the file under.
        bucket_name: Name of the target bucket.
        accountid: Account id used to build the R2 endpoint URL.
        access_key_id: Access key id for the S3-compatible API.
        access_key_secret: Secret belonging to ``access_key_id``.

    Returns:
        Whatever ``bucket.upload_fileobj`` returns.
    """
    session = aioboto3.Session(
        aws_access_key_id=access_key_id,
        aws_secret_access_key=access_key_secret,
    )
    async with session.resource(
            service_name="s3",
            endpoint_url=f'https://{accountid}.r2.cloudflarestorage.com',
    ) as s3:
        bucket = await s3.Bucket(bucket_name)
        async with aiofiles.open(source_file, 'rb') as f:
            return await bucket.upload_fileobj(f, destination_file)
def trim_filename(filename, max_length=200):
    """Shorten ``filename`` to at most ``max_length`` characters.

    The extension (as split by ``os.path.splitext``) is preserved and only
    the stem is cut. Names that already fit are returned unchanged.

    Args:
        filename: The file name to shorten.
        max_length: Maximum allowed length of the result, extension included.

    Returns:
        ``filename`` itself when it fits; otherwise the truncated stem plus
        the extension. If the extension alone leaves no room for any stem,
        the name is hard-truncated to ``max_length`` characters.
    """
    # Compare against the full budget: a name that fits must not be touched,
    # otherwise the cut lands inside the extension and it gets appended twice.
    if len(filename) <= max_length:
        return filename

    stem, extension = os.path.splitext(filename)
    stem_budget = max_length - len(extension)
    if stem_budget <= 0:
        # No room for any stem; avoid slicing with a negative index.
        return filename[:max(max_length, 0)]
    return stem[:stem_budget] + extension


async def fetch_events(client: "AsyncClient", room_id: str, start_token: str,
                       direction: "MessageDirection") -> "RoomEvents":
    """Page through a room's messages and collect them into a ``RoomEvents``.

    ``client.room_messages`` is called repeatedly, 1000 events at a time,
    starting at ``start_token`` and following each response's ``end`` token.

    Args:
        client: Client used to query the room.
        room_id: Id of the room to read.
        start_token: Pagination token to start from.
        direction: Direction in which to page through the timeline.

    Returns:
        A ``RoomEvents`` holding every event received. When paging with
        ``MessageDirection.back`` the received order is reversed before
        storing (presumably yielding oldest-first, assuming the server
        returns events newest-first in that direction).

    Raises:
        Exception: If the server answers with a ``RoomMessagesError``.
    """
    room_events = RoomEvents(room_id)
    events = []
    while True:
        response = await client.room_messages(
            room_id,
            start=start_token,
            limit=1000,
            direction=direction
        )
        if isinstance(response, RoomMessagesError):
            raise Exception(f'Failed to read room "{room_id}": {response.message}')
        if not response.chunk:
            break
        events.extend(response.chunk)
        # A missing or unchanged token means there is nothing further to
        # fetch; continuing would re-request the same page forever.
        if not response.end or response.end == start_token:
            break
        start_token = response.end

    if direction == MessageDirection.back:
        events.reverse()

    for event in events:
        room_events.add_event(event)
    return room_events