fix concurrency, add whitelist

This commit is contained in:
Cyberes 2024-04-04 15:04:13 -06:00
parent d205622b10
commit 54eb025385
9 changed files with 291 additions and 180 deletions

View File

@ -8,16 +8,17 @@ download.
### Install ### Install
1. `pip install -r requirements.txt` 1. `pip install -r requirements.txt`
2. `cp config.yml.sample config.yml` 2. `sudo apt install redis-server && sudo systemctl enable --now redis-server`
3. [Create a Cloudflare R2 bucket](https://developers.cloudflare.com/r2/get-started/#2-create-a-bucket). 3. `cp config.yml.sample config.yml`
4. Enable [public access](https://developers.cloudflare.com/r2/buckets/public-buckets/). 4. [Create a Cloudflare R2 bucket](https://developers.cloudflare.com/r2/get-started/#2-create-a-bucket).
5. Create an [access key](https://developers.cloudflare.com/r2/api/s3/tokens/). 5. Enable [public access](https://developers.cloudflare.com/r2/buckets/public-buckets/).
6. Configure `config.yml` with your R2 credentials. 6. Create an [access key](https://developers.cloudflare.com/r2/api/s3/tokens/).
7. Set up an [object lifecycle rule](https://developers.cloudflare.com/r2/buckets/object-lifecycles/) to auto-delete old 7. Configure `config.yml` with your R2 credentials.
exports from your bucket. 24 hours is a good choice. 8. Set up an [object lifecycle rule](https://developers.cloudflare.com/r2/buckets/object-lifecycles) to auto-delete old exports from your bucket. 24 hours is a good choice.
8. Set up a [matrix-org/pantalaimon](https://github.com/matrix-org/pantalaimon) for encrypted sync. 9. Set up a [matrix-org/pantalaimon](https://github.com/matrix-org/pantalaimon) for encrypted sync.
9. Create a new Matrix user and configure it in `config.yml`. 10. Create a new Matrix user and configure it in `config.yml`.
10. Start the bot with `python3 main.py` 11. Add your username to `allowed_to_export` in `config.yml`
12. Start the bot with `python3 main.py`
### Use ### Use

View File

@ -12,6 +12,13 @@ command_prefix: '!export'
# If an export failed, the interval will be half of this value. # If an export failed, the interval will be half of this value.
export_interval: 3600 export_interval: 3600
# How many rooms will be exported concurrently.
concurrent_exports: 3
# Whitelist of usernames that are allowed to use the export bot.
allowed_to_export:
- '@bob:example.com'
r2: r2:
bucket_name: export-bot bucket_name: export-bot
accountid: 12345 accountid: 12345

View File

@ -1,55 +1,26 @@
import copy import asyncio
import json
import logging import logging
import os
import re
import shutil
import tempfile
import time import time
import traceback from datetime import datetime
from datetime import datetime, timedelta
from pathlib import Path
import aiofiles import redis
import aiofiles.os as aos from nio import InviteMemberEvent, JoinError, MatrixRoom, RoomMessageText, RoomSendResponse
from nio import (AsyncClient, InviteMemberEvent, JoinError, MatrixRoom, RoomMessageText, SyncResponse, RoomMessageMedia, MessageDirection)
from exporter.export import download_mxc, zip_directory, upload_to_r2, trim_filename, fetch_events from exporter.export_tracker import ExportTracker
from exporter.matrix import MatrixClientHelper
CLEAN_ROOM_NAME_RE = re.compile('[^a-zA-Z0-9]') from exporter.run import run_export_thread
class RoomExport:
def __init__(self, room_id: str, allowed_again: datetime):
self.room_id = room_id
self.allowed_again = allowed_again
class ExportTracker:
def __init__(self):
self._exported_rooms = {}
def add_export(self, room_id: str, seconds_elapsed_allowed: int) -> None:
allowed_again = datetime.now() + timedelta(seconds=seconds_elapsed_allowed)
self._exported_rooms[room_id] = RoomExport(room_id, allowed_again)
def check_allowed(self, room_id):
if self.get_export(room_id):
return datetime.now() >= self._exported_rooms[room_id].allowed_again
return True
def get_export(self, room_id: str) -> RoomExport | None:
return copy.deepcopy(self._exported_rooms.get(room_id))
class MatrixBotCallbacks: class MatrixBotCallbacks:
def __init__(self, client: AsyncClient, config: dict): def __init__(self, client: MatrixClientHelper, config: dict):
self.client = client self.client = client.client
self.client_helper = client
self.config = config self.config = config
self.logger = logging.getLogger('ExportBot').getChild('MatrixBotCallbacks') self.logger = logging.getLogger('ExportBot').getChild('MatrixBotCallbacks')
self.startup_ts = time.time() * 1000 self.startup_ts = time.time() * 1000
self.exports = ExportTracker() self.exports = ExportTracker()
self.zip_temp_dir = Path(tempfile.mkdtemp()) self.sem = asyncio.Semaphore(config['concurrent_exports'])
self.redis = redis.Redis(host='localhost', port=6379, db=3)
async def handle_invite(self, room: MatrixRoom, event: InviteMemberEvent): async def handle_invite(self, room: MatrixRoom, event: InviteMemberEvent):
""" """
@ -58,7 +29,11 @@ class MatrixBotCallbacks:
not actually our own invite event (such as the inviter's membership). not actually our own invite event (such as the inviter's membership).
This makes sure we only call `callbacks.invite` with our own invite events. This makes sure we only call `callbacks.invite` with our own invite events.
""" """
# event.source.get('origin_server_ts') > self.startup_time
if event.sender not in self.config['allowed_to_export']:
self.logger.debug(f'Rejected invite from {event.sender}')
return
if event.state_key == self.client.user_id: if event.state_key == self.client.user_id:
self.logger.info(f"Got invite to {room.room_id} from {event.sender}.") self.logger.info(f"Got invite to {room.room_id} from {event.sender}.")
@ -73,31 +48,33 @@ class MatrixBotCallbacks:
else: else:
self.logger.error("Unable to join room: %s", room.room_id) self.logger.error("Unable to join room: %s", room.room_id)
async def handle_message(self, room: MatrixRoom, event: RoomMessageText): async def handle_message(self, room: MatrixRoom, requestor_event: RoomMessageText):
msg = event.body.strip().strip('\n') msg = requestor_event.body.strip().strip('\n')
if msg == "** Unable to decrypt: The sender's device has not sent us the keys for this message. **": if msg == "** Unable to decrypt: The sender's device has not sent us the keys for this message. **":
self.logger.debug(f'Unable to decrypt event "{event.event_id} in room {room.room_id}') self.logger.debug(f'Unable to decrypt event "{requestor_event.event_id} in room {room.room_id}')
return return
if msg != self.config['command_prefix']: if msg != self.config['command_prefix']:
return return
if event.server_timestamp < self.startup_ts: if requestor_event.server_timestamp < self.startup_ts:
return return
if event.sender == self.client.user_id: if requestor_event.sender == self.client.user_id:
return
if requestor_event.sender not in self.config['allowed_to_export']:
await self.client_helper.react_to_event(room.room_id, requestor_event.event_id, '')
return return
requestor = event.sender self.logger.info(f'Export for "{room.name}" ({room.room_id}) requested by {requestor_event.sender}')
self.logger.info(f'Export for "{room.name}" ({room.room_id}) requested by {requestor}') await self.client.room_read_markers(room.room_id, requestor_event.event_id, requestor_event.event_id)
await self.client.room_read_markers(room.room_id, event.event_id, event.event_id)
if not self.exports.check_allowed(room.room_id): if not self.exports.check_allowed(room.room_id):
last_export = self.exports.get_export(room.room_id) last_export = self.exports.get_export(room.room_id)
time_diff = last_export.allowed_again - datetime.now() time_diff = last_export.allowed_again - datetime.now()
minutes_until_future = time_diff.total_seconds() // 60 minutes_until_future = int(time_diff.total_seconds() / 60)
content = { content = {
"body": f'Cannot export again for {minutes_until_future} minutes.', "body": f'Cannot export again for {minutes_until_future} minutes.',
"m.relates_to": { "m.relates_to": {
"m.in_reply_to": { "m.in_reply_to": {
"event_id": event.event_id "event_id": requestor_event.event_id
} }
}, },
"msgtype": "m.text" "msgtype": "m.text"
@ -110,114 +87,18 @@ class MatrixBotCallbacks:
self.logger.info(f'Rejected export in {room.room_id}, {minutes_until_future} minutes remaining.') self.logger.info(f'Rejected export in {room.room_id}, {minutes_until_future} minutes remaining.')
return return
export_created = int(time.time()) if room.room_id.encode() in self.redis.lrange('in_progress', 0, -1):
clean_room_name_str = '' return
if room.name: self.redis.lpush('in_progress', room.room_id)
clean_room_name = CLEAN_ROOM_NAME_RE.sub('', room.name)
clean_room_name_str = f'{clean_room_name}-'
zipfile_name = f'{clean_room_name_str}{room.room_id.replace(":", "_").replace("!", "").replace(".", "")}-{int(time.time())}.zip'
zipfile_path = None
temp_dir = None
start_msg = await self.client.room_send( start_msg = await self.client.room_send(
room_id=room.room_id, room_id=room.room_id,
message_type="m.room.message", message_type="m.room.message",
content={"msgtype": "m.text", "format": "org.matrix.custom.html", "body": 'Exporting room...', 'formatted_body': '<i>Exporting room...</i>'}, content={"msgtype": "m.text", "format": "org.matrix.custom.html", "body": 'Exporting room...', 'formatted_body': '<i>Exporting room...</i>'},
) )
if not isinstance(start_msg, RoomSendResponse):
self.logger.error(f'Failed to export room {room.room_id}: {start_msg}')
return
try: await self.sem.acquire()
sync_response = await self.client.sync(full_state=True, sync_filter={"room": {"timeline": {"limit": 1}}}) task = asyncio.create_task(run_export_thread(room, requestor_event, self.client, self.config, start_msg, self.sem))
if not isinstance(sync_response, SyncResponse):
self.logger.error(f'Failed to sync room "{room.room_id}": {sync_response}')
raise
start_token = sync_response.rooms.join[room.room_id].timeline.prev_batch
room_events = [
await fetch_events(self.client, room.room_id, start_token, MessageDirection.back),
await fetch_events(self.client, room.room_id, start_token, MessageDirection.front),
]
temp_dir = Path(tempfile.mkdtemp())
exported_events = []
self.logger.debug(f'Writing export for {room.room_id} to {temp_dir}')
for direction in room_events:
for event in direction.all():
if isinstance(event, RoomMessageMedia):
media_data = await download_mxc(self.client, event.url)
filename = trim_filename(f'{int(event.server_timestamp / 1000)} -- {event.body}')
event.source["exported_file_path"] = filename
async with aiofiles.open(temp_dir / filename, "wb") as f_media:
await f_media.write(media_data)
exported_events.append(event.source)
exported_data = {
'room': {
'id': room.room_id,
'name': room.name
},
'requesting_user': requestor,
'exporting_user': self.client.user_id,
'created': export_created,
'events': sorted(exported_events, key=lambda x: x['origin_server_ts'])
}
async with aiofiles.open(temp_dir / 'data.json', 'w', encoding='utf-8') as f:
await f.write(json.dumps(exported_data, ensure_ascii=False, indent=4))
zipfile_path = self.zip_temp_dir / zipfile_name
self.logger.debug(f'Creating zip: {zipfile_path}')
await zip_directory(temp_dir, zipfile_path)
shutil.rmtree(temp_dir)
self.logger.debug(f'Uploading: {zipfile_path}')
r2_upload = await upload_to_r2(
zipfile_path, zipfile_name,
self.config['r2']['bucket_name'],
self.config['r2']['accountid'],
self.config['r2']['access_key_id'],
self.config['r2']['access_key_secret']
)
await aos.remove(zipfile_path)
self.logger.info(f'Export for {room.room_id} completed')
self.exports.add_export(room.room_id, self.config['export_interval'])
pub_url = f'{self.config["r2"]["pub_url"]}/{zipfile_name}'
formatted_body = f'<strong>Export complete!</strong><br><a href="{pub_url}">{pub_url}</a><br>This file will be deleted after 24 hours.'
content = {
"body": f'Export complete!\n{pub_url}\nThis file will be deleted after {self.config["r2"]["retention_hrs"]} hours.',
"formatted_body": formatted_body,
"format": "org.matrix.custom.html",
"m.relates_to": {
"m.in_reply_to": {
"event_id": start_msg.event_id
}
},
"msgtype": "m.text"
}
await self.client.room_send(
room.room_id,
'm.room.message',
content
)
except Exception as e:
self.logger.error(f'Export failed for {room.room_id}: {traceback.format_exc()}')
self.exports.add_export(room.room_id, self.config['export_interval'] // 2)
content = {
"body": f'❌ Failed to export room!',
"m.relates_to": {
"m.in_reply_to": {
"event_id": start_msg.event_id
}
},
"msgtype": "m.text"
}
await self.client.room_send(
room.room_id,
'm.room.message',
content
)
if zipfile_path:
os.remove(zipfile_path)
if temp_dir:
shutil.rmtree(temp_dir)

View File

@ -6,6 +6,8 @@ DEFAULT_CONFIG = {
}, },
'command_prefix': '!export', 'command_prefix': '!export',
'export_interval': 3600, 'export_interval': 3600,
'concurrent_exports': 3,
'allowed_to_export': [],
'r2': { 'r2': {
'bucket_name': None, 'bucket_name': None,
'accountid': None, 'accountid': None,

View File

@ -0,0 +1,52 @@
import json
from datetime import datetime, timedelta
import redis
class RoomExport:
    """Cooldown record for one room: its ID and when it may be exported again."""

    def __init__(self, room_id: str, allowed_again: datetime):
        self.room_id = room_id
        self.allowed_again = allowed_again

    def to_json(self):
        """Serialize the record to a JSON string; the timestamp is ISO 8601."""
        payload = {
            'room_id': self.room_id,
            'allowed_again': self.allowed_again.isoformat(),
        }
        return json.dumps(payload)

    @classmethod
    def from_json(cls, data):
        """Rebuild a record from the JSON text produced by ``to_json``."""
        fields = json.loads(data)
        room_id = fields['room_id']
        allowed_again = datetime.fromisoformat(fields['allowed_again'])
        return cls(room_id, allowed_again)
class ExportTracker:
    """Track per-room export cooldowns in Redis.

    Every room is stored under its room ID as a plain string key whose value
    is the JSON form of a ``RoomExport``. The client connects to database 3
    of the Redis server on localhost:6379.
    """

    def __init__(self):
        self._redis = redis.Redis(host='localhost', port=6379, db=3)

    def add_export(self, room_id: str, seconds_elapsed_allowed: int) -> None:
        """Record that ``room_id`` may be exported again after the given number of seconds."""
        allowed_again = datetime.now() + timedelta(seconds=seconds_elapsed_allowed)
        room_export = RoomExport(room_id, allowed_again)
        self._redis.set(room_id, room_export.to_json())

    def check_allowed(self, room_id: str) -> bool:
        """Return True when the room has no record or its cooldown has passed."""
        room_export = self.get_export(room_id)
        if room_export is None:
            return True
        return datetime.now() >= room_export.allowed_again

    def get_export(self, room_id: str) -> RoomExport | None:
        """Return the stored record for ``room_id``, or None when there is none."""
        data = self._redis.get(room_id)
        if data:
            return RoomExport.from_json(data.decode('utf-8'))
        return None

    def all(self) -> list[RoomExport]:
        """Return every stored export record."""
        exports = []
        for key in self._redis.keys('*'):
            # Other parts of the bot keep an `in_progress` *list* in this same
            # database. GET on a non-string key raises a WRONGTYPE error, so
            # only plain string keys are treated as export records.
            if self._redis.type(key) != b'string':
                continue
            export = self.get_export(key.decode('utf-8'))
            if export:
                exports.append(export)
        return exports

View File

@ -2,8 +2,9 @@ import json
import logging import logging
import os import os
from pathlib import Path from pathlib import Path
from typing import Union
from nio import AsyncClient, AsyncClientConfig, LoginError from nio import AsyncClient, AsyncClientConfig, LoginError, Response, ErrorResponse
from nio import LoginResponse from nio import LoginResponse
@ -69,3 +70,18 @@ class MatrixClientHelper:
"device_id": resp.device_id, "device_id": resp.device_id,
"access_token": resp.access_token, "access_token": resp.access_token,
}, f) }, f)
async def react_to_event(self, room_id: str, event_id: str, reaction_text: str, extra_error: Union[str, None] = None, extra_msg: Union[str, None] = None) -> Union[Response, ErrorResponse]:
    """Send an ``m.reaction`` annotation on an event.

    Args:
        room_id: Room that contains the target event.
        event_id: Event to react to.
        reaction_text: Used as the annotation ``key`` (the reaction shown).
        extra_error: Optional text stored under ``m.matrixbot.error``.
        extra_msg: Optional text stored under ``m.matrixbot.msg``.

    Returns:
        Whatever ``self.client.room_send`` returns for the request.
    """
    content = {
        "m.relates_to": {
            "rel_type": "m.annotation",
            "event_id": event_id,
            "key": reaction_text
        },
        # Always present, even when empty, exactly as before.
        "m.matrixbot": {}
    }
    # Falsy values (None, '', False) are skipped, so callers that relied on
    # the previous `False` defaults behave the same.
    if extra_error:
        content["m.matrixbot"]["error"] = str(extra_error)
    if extra_msg:
        content["m.matrixbot"]["msg"] = str(extra_msg)
    return await self.client.room_send(room_id, "m.reaction", content, ignore_unverified_devices=True)

147
exporter/run.py Normal file
View File

@ -0,0 +1,147 @@
import json
import logging
import os
import re
import shutil
import tempfile
import time
import traceback
from asyncio import Semaphore
from io import BytesIO
from pathlib import Path
import aiofiles
import aiofiles.os as aos
import redis
from PIL import Image
from nio import MatrixRoom, SyncResponse, RoomMessageMedia, MessageDirection, AsyncClient, RoomMessageText, RoomSendResponse
from exporter.export import download_mxc, zip_directory, upload_to_r2, trim_filename, fetch_events
from exporter.export_tracker import ExportTracker
# Matches any single character that is not an ASCII letter or digit; such
# characters are stripped from the room name when building the zip filename.
CLEAN_ROOM_NAME_RE = re.compile('[^a-zA-Z0-9]')
async def run_export_thread(room: MatrixRoom, requestor_event: RoomMessageText, client: AsyncClient, config: dict, start_msg: RoomSendResponse, sem: Semaphore):
    """Export one room to a zip archive, upload it to R2 and reply with the link.

    The events fetched in both directions from the current sync token are
    written to ``data.json``; media events (and the room avatar, when the room
    has one) are downloaded next to it. The directory is zipped, uploaded with
    ``upload_to_r2`` and the public URL is sent as a reply to ``start_msg``.
    If anything fails, a failure notice is sent instead and the room's
    cooldown is set to half of ``export_interval``.

    Whatever the outcome (including cancellation), the temporary directories
    are removed, the room is taken off the Redis ``in_progress`` list and
    ``sem`` is released.

    Args:
        room: Room to export.
        requestor_event: The message that requested the export; its sender is
            recorded in the export metadata.
        client: Logged-in Matrix client used for syncing, downloads and replies.
        config: Bot configuration; reads ``export_interval`` and the ``r2`` section.
        start_msg: Response of the "Exporting room..." message; replies point at it.
        sem: Semaphore already acquired by the caller; released here.
    """
    logger = logging.getLogger('ExportBot').getChild('RunExportThread')
    export_created = int(time.time())

    clean_room_name_str = ''
    if room.name:
        clean_room_name_str = f"{CLEAN_ROOM_NAME_RE.sub('', room.name)}-"
    room_id_slug = room.room_id.replace(':', '_').replace('!', '').replace('.', '')
    zipfile_name = f'{clean_room_name_str}{room_id_slug}-{export_created}.zip'

    temp_dir = None
    zip_temp_dir = None

    r = redis.Redis(host='localhost', port=6379, db=3)
    exports = ExportTracker()

    try:
        zip_temp_dir = Path(tempfile.mkdtemp())

        sync_response = await client.sync(full_state=True, sync_filter={"room": {"timeline": {"limit": 1}}})
        if not isinstance(sync_response, SyncResponse):
            # A bare `raise` here has no active exception to re-raise; raise
            # an explicit error that carries the server response instead.
            raise RuntimeError(f'Failed to sync room "{room.room_id}": {sync_response}')
        start_token = sync_response.rooms.join[room.room_id].timeline.prev_batch

        room_events = [
            await fetch_events(client, room.room_id, start_token, MessageDirection.back),
            await fetch_events(client, room.room_id, start_token, MessageDirection.front),
        ]

        temp_dir = Path(tempfile.mkdtemp())
        exported_events = []
        logger.debug(f'Writing export for {room.room_id} to {temp_dir}')
        for direction in room_events:
            for event in direction.all():
                if isinstance(event, RoomMessageMedia):
                    media_data = await download_mxc(client, event.url)
                    # NOTE(review): event.body is sender-controlled; confirm
                    # trim_filename strips path separators before it is used
                    # as a filename.
                    filename = trim_filename(f'{int(event.server_timestamp / 1000)} -- {event.body}')
                    event.source["exported_file_path"] = filename
                    async with aiofiles.open(temp_dir / filename, "wb") as f_media:
                        await f_media.write(media_data)
                exported_events.append(event.source)

        # Rooms without an avatar have no URL to download; export them with
        # a null avatar path instead of failing the whole export.
        room_avatar_filename = None
        if room.room_avatar_url:
            room_avatar_data = await download_mxc(client, room.room_avatar_url)
            image = Image.open(BytesIO(room_avatar_data))
            room_avatar_filename = 'room_avatar.' + str(image.format).lower()
            async with aiofiles.open(temp_dir / room_avatar_filename, "wb") as f_media:
                await f_media.write(room_avatar_data)

        exported_data = {
            'room': {
                'id': room.room_id,
                'name': room.name,
                'avatar_path': room_avatar_filename
            },
            'users': room.names,
            'requesting_user': requestor_event.sender,
            'exporting_user': client.user_id,
            'created': export_created,
            'events': sorted(exported_events, key=lambda x: x['origin_server_ts'])
        }
        async with aiofiles.open(temp_dir / 'data.json', 'w', encoding='utf-8') as f:
            await f.write(json.dumps(exported_data, ensure_ascii=False, indent=4))

        zipfile_path = zip_temp_dir / zipfile_name
        logger.debug(f'Creating zip: {zipfile_path}')
        await zip_directory(temp_dir, zipfile_path)
        # Free the unpacked copy before the upload; clearing the name keeps
        # the final cleanup from touching the removed directory again.
        shutil.rmtree(temp_dir)
        temp_dir = None

        logger.debug(f'Uploading: {zipfile_path}')
        await upload_to_r2(
            zipfile_path, zipfile_name,
            config['r2']['bucket_name'],
            config['r2']['accountid'],
            config['r2']['access_key_id'],
            config['r2']['access_key_secret']
        )
        await aos.remove(zipfile_path)

        logger.info(f'Export for {room.room_id} completed')
        exports.add_export(room.room_id, config['export_interval'])

        pub_url = f'{config["r2"]["pub_url"]}/{zipfile_name}'
        retention_hrs = config["r2"]["retention_hrs"]
        # Both bodies now report the configured retention period.
        formatted_body = f'<strong>Export complete!</strong><br><a href="{pub_url}">{pub_url}</a><br>This file will be deleted after {retention_hrs} hours.'
        content = {
            "body": f'Export complete!\n{pub_url}\nThis file will be deleted after {retention_hrs} hours.',
            "formatted_body": formatted_body,
            "format": "org.matrix.custom.html",
            "m.relates_to": {
                "m.in_reply_to": {
                    "event_id": start_msg.event_id
                }
            },
            "msgtype": "m.text"
        }
        await client.room_send(
            room.room_id,
            'm.room.message',
            content
        )
    except Exception:
        logger.error(f'Export failed for {room.room_id}: {traceback.format_exc()}')
        try:
            exports.add_export(room.room_id, config['export_interval'] // 2)
            content = {
                "body": '❌ Failed to export room!',
                "m.relates_to": {
                    "m.in_reply_to": {
                        "event_id": start_msg.event_id
                    }
                },
                "msgtype": "m.text"
            }
            await client.room_send(
                room.room_id,
                'm.room.message',
                content
            )
        except Exception:
            # Reporting the failure is best effort; it must not mask the
            # original error or prevent the cleanup below.
            logger.error(f'Could not report failed export for {room.room_id}: {traceback.format_exc()}')
    finally:
        # Removing zip_temp_dir also removes a zip left behind by a failed upload.
        for leftover in (temp_dir, zip_temp_dir):
            if leftover:
                shutil.rmtree(leftover, ignore_errors=True)
        try:
            r.lrem('in_progress', 0, room.room_id)
        except redis.RedisError:
            logger.error(f'Could not clear in_progress for {room.room_id}: {traceback.format_exc()}')
        finally:
            # Always hand the slot back, otherwise later exports block forever.
            sem.release()

View File

@ -4,6 +4,7 @@ import logging
import os import os
from pathlib import Path from pathlib import Path
import redis
import yaml import yaml
from nio import InviteMemberEvent, RoomMessageText from nio import InviteMemberEvent, RoomMessageText
@ -28,12 +29,14 @@ async def main(args):
# Really lazy way to do this # Really lazy way to do this
config = yaml_config config = yaml_config
config['r2'] = DEFAULT_CONFIG['r2'] | yaml_config.get('r2', {}) config['r2'] = DEFAULT_CONFIG['r2'] | yaml_config.get('r2', {})
config['allowed_to_export'] = yaml_config.get('allowed_to_export', [])
print(config["r2"]["retention_hrs"])
logger.setLevel(logging.DEBUG if args.debug else logging.INFO) logger.setLevel(logging.DEBUG if args.debug else logging.INFO)
logger.debug('Debug logging enabled!') logger.debug('Debug logging enabled!')
r = redis.Redis(host='localhost', port=6379, db=3)
r.flushdb()
matrix_helper = MatrixClientHelper( matrix_helper = MatrixClientHelper(
user_id=config['auth']['username'], user_id=config['auth']['username'],
passwd=config['auth']['password'], passwd=config['auth']['password'],
@ -48,7 +51,7 @@ async def main(args):
quit(1) quit(1)
logger.info(f'Logged in as {client.user_id}') logger.info(f'Logged in as {client.user_id}')
callbacks = MatrixBotCallbacks(client, config) callbacks = MatrixBotCallbacks(matrix_helper, config)
client.add_event_callback(callbacks.handle_invite, InviteMemberEvent) client.add_event_callback(callbacks.handle_invite, InviteMemberEvent)
client.add_event_callback(callbacks.handle_message, RoomMessageText) client.add_event_callback(callbacks.handle_message, RoomMessageText)

View File

@ -1,3 +1,5 @@
matrix-nio==0.24.0 matrix-nio==0.24.0
aioboto3==12.3.0 aioboto3==12.3.0
pyyaml==6.0.1 pyyaml==6.0.1
redis==5.0.3
pillow==10.3.0