284 lines
10 KiB
Python
284 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
import asyncio
import copy
import json
import os
import re
import sqlite3
import time

from nio import (AsyncClient, CallEvent, JoinError, MatrixRoom, MegolmEvent, PowerLevelsEvent, RedactionEvent,
                 RoomAvatarEvent, RoomCreateEvent, RoomEncryptedAudio, RoomEncryptedFile, RoomEncryptedImage,
                 RoomEncryptedVideo, RoomEncryptionEvent, RoomGuestAccessEvent, RoomHistoryVisibilityEvent,
                 RoomJoinRulesEvent, RoomMemberEvent, RoomMessageAudio, RoomMessageEmote, RoomMessageFile,
                 RoomMessageImage, RoomMessageNotice, RoomMessageText, RoomMessageUnknown, RoomMessageVideo,
                 RoomMessagesError, RoomNameEvent, RoomTopicEvent, RoomUpgradeEvent, StickerEvent,
                 UnknownEncryptedEvent, UnknownEvent)
|
|
|
|
# SQLite database setup.
# A single module-level connection/cursor pair is shared by every function in
# this file; the three tables are created on first run and left untouched
# afterwards (CREATE TABLE IF NOT EXISTS).
conn = sqlite3.connect("matrix_rooms.db")
c = conn.cursor()

for _ddl in (
    # One row per discovered room.
    '''CREATE TABLE IF NOT EXISTS rooms (
    room_id TEXT PRIMARY KEY,
    server_hostname TEXT,
    room_name TEXT,
    topic TEXT,
    snapshot_timestamp INTEGER
)''',
    # One row per crawled event; event_id is UNIQUE so re-crawls are rejected.
    '''CREATE TABLE IF NOT EXISTS messages (
    id INTEGER PRIMARY KEY,
    event_id TEXT UNIQUE,
    room_id TEXT,
    sender TEXT,
    event_type TEXT,
    content TEXT,
    timestamp INTEGER
)''',
    # One row per (room, user) pair.
    '''CREATE TABLE IF NOT EXISTS room_members (
    id INTEGER PRIMARY KEY,
    room_id TEXT,
    user_id TEXT,
    server_hostname TEXT,
    snapshot_timestamp INTEGER,
    UNIQUE (room_id, user_id)
)''',
):
    c.execute(_ddl)

conn.commit()
|
|
|
|
# Matrix account credentials.
# Each value can be overridden through the environment so real credentials do
# not have to be committed to the source tree; the literals below are only the
# fallback defaults.
USERNAME = os.environ.get("MATRIX_USERNAME", "user1")
PASSWORD = os.environ.get("MATRIX_PASSWORD", "jkldskjldsajklsda")
HOMESERVER = os.environ.get("MATRIX_HOMESERVER", "https://matrix-client.matrix.org")

# Regular expression to match room IDs ("!localpart:server") and room aliases
# ("#localpart:server") embedded in message text.  The single capturing group
# wraps the whole match, so findall() yields plain strings.
# NOTE(review): the localpart only accepts ASCII letters/digits and the final
# domain label is matched as 2-3 word characters, so longer TLDs are cut short
# (e.g. "example.info" is captured as "example.inf") - confirm this is intended.
ROOM_ID_REGEX = re.compile(r"([!#][A-Za-z0-9]+:(?:[\w-]+\.)*[\w-]{1,63}(?:\.(?:\w{3}|\w{2})))")
|
|
|
|
|
|
# Strong references to in-flight join tasks.  The event loop only keeps weak
# references to tasks, so a task nobody else holds may be garbage-collected
# before it finishes.
_pending_join_tasks = set()


def _classify_event(event):
    """Return ``(event_type, content)`` for a nio event.

    The checks run in a fixed order and the first matching class wins.
    ``content`` is either the raw event source, a single attribute of the
    event, or a small dict built from a few attributes; events of an
    unrecognised class are labelled ``'event_type_not_found'``.
    """
    if isinstance(event, UnknownEvent):
        return 'unknown', event.source
    if isinstance(event, UnknownEncryptedEvent):
        return 'unknown_encrypted', event.source
    if isinstance(event, MegolmEvent):
        return 'megolm', event.source
    if isinstance(event, CallEvent):
        return 'call', event.source
    if isinstance(event, RoomEncryptionEvent):
        return 'encryption_enabled', event.source
    if isinstance(event, RoomCreateEvent):
        return 'room_create', event.source
    if isinstance(event, RoomGuestAccessEvent):
        return 'guest_access', event.guest_access
    if isinstance(event, RoomJoinRulesEvent):
        return 'join_rules', event.join_rule
    if isinstance(event, RoomHistoryVisibilityEvent):
        return 'history_visibility', event.history_visibility
    if isinstance(event, RoomNameEvent):
        return 'room_name', event.source
    if isinstance(event, RoomTopicEvent):
        # Previously stored as 'room_name' (copy-paste slip), which made topic
        # changes indistinguishable from name changes in the messages table.
        return 'room_topic', event.source
    if isinstance(event, RoomAvatarEvent):
        return 'room_avatar', event.avatar_url
    if isinstance(event, RoomMemberEvent):
        return 'member', event.source
    if isinstance(event, RoomMessageText):
        return 'message', event.source
    if isinstance(event, RoomMessageEmote):
        return 'emote', event.body
    if isinstance(event, RoomMessageNotice):
        return 'notice', event.body
    if isinstance(event, RoomMessageUnknown):
        return 'unknown_msg', event.source
    if isinstance(event, PowerLevelsEvent):
        return 'power_levels', str(event.power_levels)
    if isinstance(event, RedactionEvent):
        return 'redaction', {'redacts': event.redacts, 'reason': event.reason}
    if isinstance(event, StickerEvent):
        return 'sticker', event.source
    if isinstance(event, RoomUpgradeEvent):
        return 'room_upgrade', {'body': event.body, 'replacement_room': event.replacement_room}
    if isinstance(event, RoomMessageImage):
        return 'message_image', {'url': event.url, 'body': event.body}
    if isinstance(event, RoomMessageAudio):
        return 'message_audio', {'url': event.url, 'body': event.body}
    if isinstance(event, RoomMessageVideo):
        return 'message_video', {'url': event.url, 'body': event.body}
    if isinstance(event, RoomMessageFile):
        return 'message_file', {'url': event.url, 'body': event.body}
    if isinstance(event, RoomEncryptedImage):
        return 'message_enc_image', event.source
    if isinstance(event, RoomEncryptedAudio):
        return 'message_enc_audio', event.source
    if isinstance(event, RoomEncryptedVideo):
        return 'message_enc_video', event.source
    if isinstance(event, RoomEncryptedFile):
        return 'message_enc_file', event.source
    return 'event_type_not_found', event.source


def _schedule_room_joins(body, client):
    """Start a background join for every room ID in *body* not already in ``client.rooms``."""
    # dict.fromkeys() drops repeated IDs while keeping first-seen order, so a
    # message mentioning the same room twice triggers a single join attempt.
    for new_room_id in dict.fromkeys(ROOM_ID_REGEX.findall(body)):
        if new_room_id in client.rooms:
            continue
        print(f'Found a new room: {new_room_id}')
        task = asyncio.create_task(join_room(client, new_room_id))
        _pending_join_tasks.add(task)
        task.add_done_callback(_pending_join_tasks.discard)


def handle_room_message(event, room_id, client):
    """Store one room event in the ``messages`` table.

    The event is classified by its nio class, its content is serialised (dicts
    become JSON text) and a row is inserted.  For plain text messages the body
    is additionally scanned with ``ROOM_ID_REGEX`` and a background
    ``join_room`` task is started for every room that is not yet in
    ``client.rooms``; because that uses ``asyncio.create_task`` this function
    has to be called while an event loop is running.

    Args:
        event: nio event object taken from a room history chunk.
        room_id: ID of the room the event belongs to.
        client: nio client; used for the ``rooms`` lookup and for joining.
    """
    event_type, content = _classify_event(event)

    if event_type == 'message':
        # Add any room IDs we find in the message to our crawl targets.
        _schedule_room_joins(event.body, client)

    if isinstance(content, dict):
        content = json.dumps(content)

    room_id, _ = sanitize_room_id(room_id)

    try:
        c.execute(
            "INSERT INTO messages (event_id, room_id, sender, event_type, content, timestamp) VALUES (?, ?, ?, ?, ?, ?)",
            (event.event_id, room_id, event.sender, event_type, content, event.server_timestamp))
        conn.commit()
        print(f"Added message: {event.event_id}")
    except sqlite3.IntegrityError:
        # event_id is UNIQUE: the event was already stored by an earlier crawl.
        pass
|
|
|
|
|
|
def store_room_members(room_id, room):
    """Record every user of *room* in the ``room_members`` table.

    Each user in ``room.users`` gets one row holding the room ID, the user ID,
    the server part of the user ID and the current Unix time.  Pairs that are
    already present (UNIQUE (room_id, user_id)) are skipped silently.

    Args:
        room_id: ID of the room, stored as given.
        room: object exposing a ``users`` iterable of user ID strings.

    Raises:
        IndexError: if a user ID contains no ``:`` separator.
    """
    snapshot_timestamp = int(time.time())
    for user_id in room.users:
        # Segment between the first and second colon, i.e. the host without
        # any port suffix.
        server_hostname = user_id.split(":")[1]
        try:
            c.execute(
                "INSERT INTO room_members (room_id, user_id, server_hostname, snapshot_timestamp) VALUES (?, ?, ?, ?)",
                (room_id, user_id, server_hostname, snapshot_timestamp))
        except sqlite3.IntegrityError:
            # Member already recorded for this room.
            continue
        print(f"Added member {user_id} to room {room_id}")
    # Commit once for the whole batch instead of once per member: a commit is
    # the expensive part of each insert and large rooms have many members.
    conn.commit()
|
|
|
|
|
|
async def join_room(client, room_id):
    """Try to join *room_id* and record the room in the ``rooms`` table.

    A ``JoinError`` response and any raised exception are reported on stdout
    and otherwise ignored, so a failed join never propagates to the caller.
    After a successful join the client is synced once.  The room is then
    inserted with no name and no topic.

    NOTE(review): the source this was recovered from had lost its indentation.
    The sync is placed in the success branch and the insert after the
    try/except (i.e. it runs for every outcome) - confirm both placements.

    Args:
        client: nio client used for ``join`` and ``sync``.
        room_id: room ID or alias to join.
    """
    try:
        response = await client.join(room_id)
        if isinstance(response, JoinError):
            print(f"Error while joining room {room_id}: {response.message}")
        else:
            print(f"Joined room {room_id}")
            await client.sync(timeout=30000)
    except Exception as e:
        # repr() always carries the exception type and message; vars(e) is an
        # empty dict for most exceptions and hid the actual failure reason.
        print(f'Failed to join room {room_id}:', repr(e))
    insert_room(room_id, topic=None, room_name=None)
|
|
|
|
|
|
def sanitize_room_id(room_id: "str | tuple"):
    """Normalise a room identifier and extract its server hostname.

    Accepted inputs:

    * a string - returned unchanged;
    * a tuple of strings (e.g. regex groups) - empty entries are dropped.  If
      exactly one entry remains it becomes the room ID and a leading ``#/``
      left over from a ``#/#alias`` or ``#/!id`` fragment is stripped.  If
      several remain, the room ID is the JSON encoding of the list and the
      hostname is taken from the first entry.

    The hostname is the text between the first and the second ``:`` of the
    identifier, so a port suffix is not included.

    Returns:
        ``(room_id, server_hostname)``.

    Raises:
        IndexError: if the identifier has no ``:`` separator, or a tuple
            contains no non-empty entry.
    """
    if not isinstance(room_id, tuple):
        return room_id, room_id.split(":")[1]

    candidates = [part for part in room_id if part]
    if len(candidates) == 1:
        single = candidates[0]
        if single.startswith(('#/#', '#/!')):
            # Strings are immutable: the previous code called str.replace()
            # and discarded the result, so the prefix was never removed.
            single = single[2:]
        return single, single.split(":")[1]

    return json.dumps(candidates), candidates[0].split(":")[1]
|
|
|
|
|
|
def insert_room(room_id: str, room_name, topic):
    """Record a room in the ``rooms`` table.

    The room ID is normalised with ``sanitize_room_id`` and inserted together
    with its server hostname, name, topic and the current Unix time.  If the
    room is already present, its existing values and timestamp are kept, but
    a name or topic that is still NULL is filled in from the arguments.
    Without this, a placeholder row written with no name/topic would
    permanently block the real values from being stored later.

    Args:
        room_id: room ID or alias (anything ``sanitize_room_id`` accepts).
        room_name: display name of the room, or None if unknown.
        topic: room topic, or None if unknown.
    """
    snapshot_timestamp = int(time.time())
    room_id, server_hostname = sanitize_room_id(room_id)
    try:
        c.execute(
            "INSERT INTO rooms (room_id, server_hostname, room_name, topic, snapshot_timestamp) VALUES (?, ?, ?, ?, ?)",
            (room_id, server_hostname, room_name, topic, snapshot_timestamp))
    except sqlite3.IntegrityError:
        # Room already known (room_id is the primary key).
        if room_name is None and topic is None:
            return
        # COALESCE keeps a stored value and only replaces NULLs.
        c.execute(
            "UPDATE rooms SET room_name = COALESCE(room_name, ?), topic = COALESCE(topic, ?) WHERE room_id = ?",
            (room_name, topic, room_id))
    conn.commit()
|
|
|
|
|
|
async def crawl_room_history(client, room_id):
    """Page through the message history of *room_id* and store every event.

    History is fetched 100 events at a time with ``client.room_messages``,
    starting from an empty token and continuing from each response's ``end``
    token.  Crawling stops on an error response, on an empty page, or when
    the server returns no new pagination token.  Once the first non-empty
    page arrives, the room's members and its name/topic are recorded.

    Args:
        client: nio client used to fetch messages and look up room state.
        room_id: ID of a room present in ``client.rooms``.
    """
    prev_batch = ''
    snapshot_taken = False
    while True:
        response = await client.room_messages(room_id=room_id, start=prev_batch, limit=100)

        if isinstance(response, RoomMessagesError):
            print(f"Error while fetching room messages: {response.message}")
            break

        if not response.chunk:
            break

        if not snapshot_taken:
            # Room state does not depend on the history page, so record it
            # once per crawl rather than once per 100-message page.
            snapshot_taken = True
            # .get(): the room may have dropped out of client.rooms since the
            # caller listed it; skip the snapshot instead of raising KeyError.
            room = client.rooms.get(room_id)
            if room is not None:
                store_room_members(room_id, room)
                insert_room(room_id, topic=room.topic, room_name=room.name)

        for event in response.chunk:
            handle_room_message(event, room_id, client)

        next_batch = response.end
        # A missing or unchanged token means there is nothing further to
        # fetch; requesting it again would re-read the same page forever.
        if not next_batch or next_batch == prev_batch:
            break
        prev_batch = next_batch
|
|
|
|
|
|
async def main():
    """Log in and crawl every joined room in an endless loop.

    Each iteration syncs with the homeserver, crawls the history of every
    room joined at that moment, then pauses for one minute.  The client
    connection is closed when the loop is left for any reason.
    """
    client = AsyncClient(HOMESERVER, USERNAME)
    try:
        login_response = await client.login(PASSWORD)
        if not client.logged_in:
            # login() reports failure through its return value rather than by
            # raising; without this check the crawler would spin unauthenticated.
            print(f"Login failed for {USERNAME}: {login_response}")
            return
        print(f"Logged in as {USERNAME}")

        while True:
            # Sync with the server to get the joined rooms
            await client.sync(timeout=30000)

            # We are going to be joining rooms between crawl iterations.
            # Snapshotting the room IDs prevents "RuntimeError: dictionary
            # changed size during iteration"; only the keys are needed, so a
            # plain list replaces the former deep copy of every room object.
            joined_room_ids = list(client.rooms)

            # Crawl through the history of each joined room
            for room_id in joined_room_ids:
                print(f"Crawling room history: {room_id}")
                await crawl_room_history(client, room_id)

            # TODO: parse https://{HOMESERVER}/_matrix/client/r0/publicRooms?limit=1000 to get more rooms to crawl
            # TODO: thread the crawlers for each room

            print('Crawl complete!')
            print('===============================')
            print('Sleeping 1 minute...')
            # asyncio.sleep yields to the event loop so background join tasks
            # keep running; time.sleep froze the whole loop for the minute.
            await asyncio.sleep(60)
    finally:
        await client.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: start an event loop and run the crawler in it.
    asyncio.run(main())
|