matrix-room-exporter/exporter/export.py

116 lines
3.3 KiB
Python

import asyncio
import copy
import os
import zipfile
from pathlib import Path
from typing import Union, List
from urllib.parse import urlparse
import aioboto3
import aiofiles
from nio import (
RedactedEvent,
RoomMessageFormatted,
RoomMessageMedia,
Event,
RoomMessagesError,
AsyncClient,
MessageDirection
)
"""
Inspired by https://github.com/russelldavies/matrix-archive/blob/master/matrix-archive.py
"""
class RoomEvents:
def __init__(self, room_id: str):
self._room_id = room_id
self._events: List[Event] = []
def add_event(self, event):
self._events.append(event)
def all(self):
return copy.deepcopy(self._events)
def jsonify(self):
dump = [x.source for x in self._events]
return dump
def is_valid_event(event: Event):
return isinstance(event, (RoomMessageFormatted, RedactedEvent, RoomMessageMedia))
async def download_mxc(client: AsyncClient, url: str) -> bytes:
mxc = urlparse(url)
response = await client.download(mxc.netloc, mxc.path.strip("/"))
if hasattr(response, "body"):
return response.body
else:
return b''
async def zip_directory(directory_path, zip_path):
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, _zip_directory, directory_path, zip_path)
def _zip_directory(directory_path, zip_path):
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
for root, dirs, files in os.walk(directory_path):
for file in files:
file_path = str(os.path.join(root, file))
zip_file.write(file_path, os.path.relpath(file_path, directory_path))
async def upload_to_r2(source_file: Union[str, Path], destination_file: str, bucket_name: str, accountid: str, access_key_id: str, access_key_secret: str):
session = aioboto3.Session(
aws_access_key_id=access_key_id,
aws_secret_access_key=access_key_secret,
)
async with session.resource(
service_name="s3",
endpoint_url=f'https://{accountid}.r2.cloudflarestorage.com',
) as s3:
bucket = await s3.Bucket(bucket_name)
async with aiofiles.open(source_file, 'rb') as f:
return await bucket.upload_fileobj(f, destination_file)
def trim_filename(filename, max_length=230):
extension = os.path.splitext(filename)[1]
max_length -= len(extension)
if len(filename) > max_length:
filename = filename[:max_length]
filename += extension
return filename
async def fetch_events(client: AsyncClient, room_id: str, start_token: str, direction: MessageDirection) -> RoomEvents:
room_events = RoomEvents(room_id)
events = []
while True:
response = await client.room_messages(
room_id,
start=start_token,
limit=1000,
direction=direction
)
if isinstance(response, RoomMessagesError):
raise Exception(f'Failed to read room "{room_id}": {response.message}')
if len(response.chunk) == 0:
break
events.extend(event for event in response.chunk)
start_token = response.end
if direction == MessageDirection.back:
events = reversed(events)
for event in events:
room_events.add_event(event)
return room_events