matrix-room-exporter/exporter/export.py

116 lines
3.3 KiB
Python
Raw Normal View History

2024-04-03 22:57:54 -06:00
import asyncio
import copy
import os
import zipfile
from pathlib import Path
from typing import Union, List
from urllib.parse import urlparse
import aioboto3
import aiofiles
from nio import (
RedactedEvent,
RoomMessageFormatted,
RoomMessageMedia,
Event,
RoomMessagesError,
AsyncClient,
MessageDirection
)
"""
Inspired by https://github.com/russelldavies/matrix-archive/blob/master/matrix-archive.py
"""
class RoomEvents:
    """Ordered, in-memory collection of the events gathered for one room."""

    def __init__(self, room_id: str):
        """Create an empty collection tagged with ``room_id``."""
        self._events: List[Event] = []
        self._room_id = room_id

    def add_event(self, event):
        """Append ``event`` to the end of the collection."""
        self._events.append(event)

    def all(self):
        """Return a deep copy of the stored events, in insertion order.

        Because the result is a deep copy, mutating it (or the events in it)
        does not affect the events held by this object.
        """
        snapshot = copy.deepcopy(self._events)
        return snapshot

    def jsonify(self):
        """Return a list holding the ``source`` attribute of every stored event."""
        return [stored.source for stored in self._events]
def is_valid_event(event: Event):
    """Tell whether ``event`` is one of the event types this module accepts.

    Returns True when ``event`` is an instance of ``RoomMessageFormatted``,
    ``RedactedEvent`` or ``RoomMessageMedia``; False otherwise.
    """
    accepted_types = (RoomMessageFormatted, RedactedEvent, RoomMessageMedia)
    return isinstance(event, accepted_types)
async def download_mxc(client: AsyncClient, url: str) -> bytes:
    """Download the content referenced by ``url`` through ``client``.

    The URL is split with ``urlparse``; its network location and its path
    (with surrounding slashes stripped) are handed to ``client.download``.

    Returns the response's ``body`` attribute when the response has one,
    otherwise an empty ``bytes`` object.
    """
    parsed = urlparse(url)
    host = parsed.netloc
    media_path = parsed.path.strip("/")
    # NOTE(review): the two values are passed positionally, presumably as
    # (server name, media id) -- confirm this matches the `download`
    # signature of the installed matrix-nio version.
    response = await client.download(host, media_path)
    # Responses without a `body` attribute fall back to empty bytes.
    return getattr(response, "body", b'')
async def zip_directory(directory_path, zip_path):
    """Zip the contents of ``directory_path`` into ``zip_path`` without blocking the loop.

    The actual compression is synchronous, so it is run in the event loop's
    default executor.

    Args:
        directory_path: Directory whose files are archived (recursively).
        zip_path: Destination path of the zip archive; overwritten if present.
    """
    # get_running_loop() is the documented way to obtain the loop from inside
    # a coroutine.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _zip_directory, directory_path, zip_path)


def _zip_directory(directory_path, zip_path):
    """Synchronously write every file under ``directory_path`` into ``zip_path``.

    Entries are stored with paths relative to ``directory_path`` and are
    deflate-compressed. Only files are added; a directory that contains no
    files produces no entry.
    """
    # Resolve the archive's own location up front so it can be recognised
    # during the walk even if it is reached through a different spelling of
    # the path (relative vs. absolute, symlinked parent, ...).
    archive_real = os.path.realpath(zip_path)
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        for root, _dirs, files in os.walk(directory_path):
            for file in files:
                file_path = os.path.join(root, file)
                # When the archive lives inside the directory being zipped,
                # os.walk would hand us the half-written archive itself.
                if os.path.realpath(file_path) == archive_real:
                    continue
                zip_file.write(file_path, os.path.relpath(file_path, directory_path))
async def upload_to_r2(source_file: Union[str, Path], destination_file: str, bucket_name: str, accountid: str, access_key_id: str, access_key_secret: str):
    """Upload a local file to a bucket through the account's R2 S3 endpoint.

    Args:
        source_file: Path of the local file to read (opened in binary mode).
        destination_file: Key passed to the bucket's ``upload_fileobj``.
        bucket_name: Name of the target bucket.
        accountid: Account identifier used to build the endpoint URL
            ``https://<accountid>.r2.cloudflarestorage.com``.
        access_key_id: Access key id given to the aioboto3 session.
        access_key_secret: Secret access key given to the aioboto3 session.

    Returns:
        Whatever the bucket's ``upload_fileobj`` call returns.
    """
    endpoint = f'https://{accountid}.r2.cloudflarestorage.com'
    credentials = {
        "aws_access_key_id": access_key_id,
        "aws_secret_access_key": access_key_secret,
    }
    session = aioboto3.Session(**credentials)
    async with session.resource(service_name="s3", endpoint_url=endpoint) as s3:
        bucket = await s3.Bucket(bucket_name)
        async with aiofiles.open(source_file, 'rb') as source:
            outcome = await bucket.upload_fileobj(source, destination_file)
            return outcome
def trim_filename(filename, max_length=255):
    """Shorten ``filename`` to at most ``max_length`` characters, keeping its extension.

    A name that already fits is returned unchanged. Otherwise the stem (the
    part before the extension, as split by ``os.path.splitext``) is cut so
    that stem + extension is exactly ``max_length`` characters long.

    Args:
        filename: The file name to shorten.
        max_length: Maximum allowed length of the result (default 255).

    Returns:
        The possibly shortened file name.
    """
    # Compare against the real limit: checking against the extension-reduced
    # budget would wrongly truncate names that already fit and glue a second
    # copy of the extension onto them.
    if len(filename) <= max_length:
        return filename

    stem, extension = os.path.splitext(filename)
    stem_budget = max_length - len(extension)
    if stem_budget <= 0:
        # The extension alone does not fit; preserving it is impossible, so
        # fall back to a plain cut (also avoids a negative slice bound).
        return filename[:max(max_length, 0)]
    return stem[:stem_budget] + extension
async def fetch_events(client: AsyncClient, room_id: str, start_token: str, direction: MessageDirection) -> RoomEvents:
    """Page through a room's messages and collect them into a ``RoomEvents``.

    Starting at ``start_token``, ``client.room_messages`` is called repeatedly
    (up to 1000 events per request) in the given ``direction`` until a page
    comes back empty or the pagination token stops advancing.

    Args:
        client: Client used to issue the ``room_messages`` requests.
        room_id: Room whose messages are fetched.
        start_token: Pagination token to start from.
        direction: Direction in which to paginate.

    Returns:
        A ``RoomEvents`` holding every fetched event. When paginating with
        ``MessageDirection.back`` the fetched order is reversed before the
        events are stored.

    Raises:
        Exception: If any request returns a ``RoomMessagesError``.
    """
    room_events = RoomEvents(room_id)
    events = []
    while True:
        response = await client.room_messages(
            room_id,
            start=start_token,
            limit=1000,
            direction=direction,
        )
        if isinstance(response, RoomMessagesError):
            raise Exception(f'Failed to read room "{room_id}": {response.message}')
        if not response.chunk:
            break
        events.extend(response.chunk)

        # Stop when there is no usable next token. A missing token means the
        # server has nothing further to return; an unchanged one would make
        # the next request fetch the same page again. Either way, continuing
        # could loop forever or duplicate events.
        next_token = response.end
        if not next_token or next_token == start_token:
            break
        start_token = next_token

    if direction == MessageDirection.back:
        # Pages were fetched in backwards order; flip the whole list once.
        events.reverse()
    for event in events:
        room_events.add_event(event)
    return room_events