import functools
import sys

import requests

import nagios


def handle_err(func):
    """Decorate a function returning ``(crit, ret)`` with Nagios exit handling.

    The wrapped function is called with the given arguments and must return
    a 2-tuple ``(crit, ret)``:

    * If the call raises any ``Exception``, ``UNKNOWN: exception '...'`` is
      printed and the process exits with ``nagios.UNKNOWN``.
    * If ``crit`` is truthy, ``CRITICAL: <crit>`` is printed and the process
      exits with ``nagios.CRITICAL``.
    * Otherwise ``ret`` is returned to the caller.
    """
    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        try:
            crit, ret = func(*args, **kwargs)
        except Exception as e:
            print(f"UNKNOWN: exception '{e}'")
            sys.exit(nagios.UNKNOWN)
        if crit:
            print(f"CRITICAL: {crit}")
            sys.exit(nagios.CRITICAL)
        return ret
    return wrapper


def _result(response, action):
    """Turn an HTTP response into the ``(crit, ret)`` pair used by handle_err.

    Returns ``(message, None)`` when the status code is not 200, otherwise
    ``(None, parsed_json_body)``. ``action`` is the human-readable name of
    the request that is embedded in the error message.
    """
    if response.status_code != 200:
        return (
            f'Bad status code on {action}: {response.status_code}\n'
            f'Body: {response.text}',
            None,
        )
    return None, response.json()


@handle_err
def login(user_id: str, passwd: str, homeserver: str):
    """Log in with a password and return the parsed JSON login response.

    Exits via ``handle_err`` on a non-200 status or on any exception.
    """
    data = {'type': 'm.login.password', 'user': user_id, 'password': passwd}
    r = requests.post(f'{homeserver}/_matrix/client/r0/login', json=data)
    return _result(r, f'login for {user_id}')


@handle_err
def create_room(room_name, homeserver, auth_token):
    """
    Creates an unencrypted room.

    Returns the parsed JSON response of the createRoom call; exits via
    ``handle_err`` on a non-200 status or on any exception.
    """
    data = {
        "name": room_name,
        "preset": "private_chat",
        "visibility": "private",
        # "initial_state": [{"type": "m.room.guest_access", "state_key": "", "content": {"guest_access": "can_join"}}]
    }
    r = requests.post(
        f'{homeserver}/_matrix/client/r0/createRoom?access_token={auth_token}',
        json=data,
    )
    return _result(r, f'create room for {room_name}')


@handle_err
def send_invite(room_id, target_user_id, homeserver, auth_token):
    """Invite ``target_user_id`` to ``room_id`` and return the parsed JSON response.

    Exits via ``handle_err`` on a non-200 status or on any exception.
    """
    r = requests.post(
        f'{homeserver}/_matrix/client/r0/rooms/{room_id}/invite?access_token={auth_token}',
        json={'user_id': target_user_id},
    )
    return _result(r, f'send invite for {room_id}')


@handle_err
def join_room(room_id, homeserver, auth_token):
    """Join ``room_id`` via the ``/join/{room_id}`` endpoint.

    Returns the parsed JSON response; exits via ``handle_err`` on a non-200
    status or on any exception.
    """
    r = requests.post(
        f'{homeserver}/_matrix/client/r0/join/{room_id}?access_token={auth_token}',
        data='{}',
    )
    return _result(r, f'join room for {room_id}')


@handle_err
def join_room_invite(room_id, homeserver, auth_token):
    """Join ``room_id`` via the ``/rooms/{room_id}/join`` endpoint.

    Returns the parsed JSON response; exits via ``handle_err`` on a non-200
    status or on any exception.
    """
    r = requests.post(
        f'{homeserver}/_matrix/client/r0/rooms/{room_id}/join?access_token={auth_token}',
        data='{}',
    )
    return _result(r, f'join room via invite for {room_id}')


@handle_err
def send_msg(message, room_id, homeserver, auth_token):
    """Send ``message`` as an ``m.text`` message to ``room_id``.

    Returns the parsed JSON response; exits via ``handle_err`` on a non-200
    status or on any exception.
    """
    r = requests.post(
        f'{homeserver}/_matrix/client/r0/rooms/{room_id}/send/m.room.message?access_token={auth_token}',
        json={'msgtype': 'm.text', 'body': message},
    )
    return _result(r, f'send message for {room_id}')


# errors will be handled in the other script
def get_event(event_id, room_id, homeserver, auth_token):
    """Fetch a single event and return the raw ``requests`` response.

    Unlike the other helpers this one is not wrapped by ``handle_err``: no
    status check is done here and exceptions propagate to the caller.
    """
    return requests.get(
        f'{homeserver}/_matrix/client/v3/rooms/{room_id}/event/{event_id}?access_token={auth_token}'
    )


@handle_err
def get_state(homeserver, auth_token, since=None):
    """Call the sync endpoint and return the parsed JSON response.

    When ``since`` is truthy it is passed as the ``since`` query parameter;
    otherwise the request is made without it. Exits via ``handle_err`` on a
    non-200 status or on any exception.
    """
    if since:
        # The '=' was previously missing, so the token was never sent as the
        # value of the ``since`` parameter.
        url = f'{homeserver}/_matrix/client/r0/sync?since={since}&access_token={auth_token}'
    else:
        url = f'{homeserver}/_matrix/client/r0/sync?access_token={auth_token}'
    r = requests.get(url)
    return _result(r, 'sync')


@handle_err
def forget_room(room_id, homeserver, auth_token):
    """Forget ``room_id`` and return the parsed JSON response.

    Exits via ``handle_err`` on a non-200 status or on any exception.
    """
    r = requests.post(
        f'{homeserver}/_matrix/client/r0/rooms/{room_id}/forget?access_token={auth_token}',
        data='{}',
    )
    return _result(r, f'forget room for {room_id}')


@handle_err
def leave_room(room_id, homeserver, auth_token, forget=False):
    """Leave ``room_id`` and, if ``forget`` is true, forget it afterwards.

    Returns the parsed JSON response of the leave call. Exits via
    ``handle_err`` on a non-200 status or on any exception; a failing forget
    call exits through ``forget_room``'s own ``handle_err`` wrapper.
    """
    r = requests.post(
        f'{homeserver}/_matrix/client/r0/rooms/{room_id}/leave?access_token={auth_token}',
        data='{}',
    )
    crit, ret = _result(r, f'leave room for {room_id}')
    if crit:
        return crit, None
    if forget:
        forget_room(room_id, homeserver, auth_token)
    return None, ret