From 8ee69f299cb3360de5c88f0c6b07525d35247fbd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 1 Jul 2019 17:55:11 +0100 Subject: [PATCH 1/8] Add basic function to get all data for a user out of synapse --- synapse/handlers/admin.py | 247 ++++++++++++++++++++++++++++++++++ synapse/storage/roommember.py | 20 +++ tests/handlers/test_admin.py | 210 +++++++++++++++++++++++++++++ tests/unittest.py | 2 +- 4 files changed, 478 insertions(+), 1 deletion(-) create mode 100644 tests/handlers/test_admin.py diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 941ebfa107..e424fc46bd 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -14,9 +14,17 @@ # limitations under the License. import logging +import os +import tempfile + +from canonicaljson import json from twisted.internet import defer +from synapse.api.constants import Membership +from synapse.types import RoomStreamToken +from synapse.visibility import filter_events_for_client + from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -89,3 +97,242 @@ class AdminHandler(BaseHandler): ret = yield self.store.search_users(term) defer.returnValue(ret) + + @defer.inlineCallbacks + def exfiltrate_user_data(self, user_id, writer): + """Write all data we have of the user to the specified directory. + + Args: + user_id (str) + writer (ExfiltrationWriter) + + Returns: + defer.Deferred + """ + # Get all rooms the user is in or has been in + rooms = yield self.store.get_rooms_for_user_where_membership_is( + user_id, + membership_list=( + Membership.JOIN, + Membership.LEAVE, + Membership.BAN, + Membership.INVITE, + ), + ) + + # We only try and fetch events for rooms the user has been in. If + # they've been e.g. invited to a room without joining then we handle + # those seperately. + rooms_user_has_been_in = yield self.store.get_rooms_user_has_been_in(user_id) + + for index, room in enumerate(rooms): + room_id = room.room_id + + logger.info( + "[%s] Handling room %s, %d/%d", user_id, room_id, index + 1, len(rooms) + ) + + forgotten = yield self.store.did_forget(user_id, room_id) + if forgotten: + logger.info("[%s] User forgot room %d, ignoring", room_id) + continue + + if room_id not in rooms_user_has_been_in: + # If we haven't been in the rooms then the filtering code below + # won't return anything, so we need to handle these cases + # explicitly. + + if room.membership == Membership.INVITE: + event_id = room.event_id + invite = yield self.store.get_event(event_id, allow_none=True) + if invite: + invited_state = invite.unsigned["invite_room_state"] + writer.write_invite(room_id, invite, invited_state) + + continue + + # We only want to bother fetching events up to the last time they + # were joined. We estimate that point by looking at the + # stream_ordering of the last membership if it wasn't a join. + if room.membership == Membership.JOIN: + stream_ordering = yield self.store.get_room_max_stream_ordering() + else: + stream_ordering = room.stream_ordering + + from_key = str(RoomStreamToken(0, 0)) + to_key = str(RoomStreamToken(None, stream_ordering)) + + written_events = set() # Events that we've processed in this room + + # We need to track gaps in the events stream so that we can then + # write out the state at those events. We do this by keeping track + # of events whose prev events we haven't seen. + + # Map from event ID to prev events that haven't been processed, + # dict[str, set[str]]. + event_to_unseen_prevs = {} + + # The reverse mapping to above, i.e. map from unseen event to parent + # events. dict[str, set[str]] + unseen_event_to_parents = {} + + # We fetch events in the room the user could see by fetching *all* + # events that we have and then filtering, this isn't the most + # efficient method perhaps but it does guarentee we get everything. + while True: + events, _ = yield self.store.paginate_room_events( + room_id, from_key, to_key, limit=100, direction="f" + ) + if not events: + break + + from_key = events[-1].internal_metadata.after + + events = yield filter_events_for_client(self.store, user_id, events) + + writer.write_events(room_id, events) + + # Update the extremity tracking dicts + for event in events: + # Check if we have any prev events that haven't been + # processed yet, and add those to the appropriate dicts. + unseen_events = set(event.prev_event_ids()) - written_events + if unseen_events: + event_to_unseen_prevs[event.event_id] = unseen_events + for unseen in unseen_events: + unseen_event_to_parents.setdefault(unseen, set()).add( + event.event_id + ) + + # Now check if this event is an unseen prev event, if so + # then we remove this event from the appropriate dicts. + for event_id in unseen_event_to_parents.pop(event.event_id, []): + event_to_unseen_prevs.get(event_id, set()).discard( + event.event_id + ) + + written_events.add(event.event_id) + + logger.info( + "Written %d events in room %s", len(written_events), room_id + ) + + # Extremities are the events who have at least one unseen prev event. + extremities = ( + event_id + for event_id, unseen_prevs in event_to_unseen_prevs.items() + if unseen_prevs + ) + for event_id in extremities: + if not event_to_unseen_prevs[event_id]: + continue + state = yield self.store.get_state_for_event(event_id) + writer.write_state(room_id, event_id, state) + + defer.returnValue(writer.finished()) + + +class ExfiltrationWriter(object): + """Interfaced used to specify how to write exfilrated data. + """ + + def write_events(self, room_id, events): + """Write a batch of events for a room. + + Args: + room_id (str) + events (list[FrozenEvent]) + """ + pass + + def write_state(self, room_id, event_id, state): + """Write the state at the given event in the room. + + This only gets called for backward extremities rather than for each + event. + + Args: + room_id (str) + event_id (str) + state (list[FrozenEvent]) + """ + pass + + def write_invite(self, room_id, event, state): + """Write an invite for the room, with associated invite state. + + Args: + room_id (str) + invite (FrozenEvent) + state (list[dict]): A subset of the state at the invite, with a + subset of the event keys (type, state_key, content and sender) + """ + + def finished(self): + """Called when exfiltration is complete, and the return valus is passed + to the requester. + """ + pass + + +class FileExfiltrationWriter(ExfiltrationWriter): + """An ExfiltrationWriter that writes the users data to a directory. + + Returns the directory location on completion. + + Args: + user_id (str): The user whose data is being exfiltrated. + directory (str|None): The directory to write the data to, if None then + will write to a temporary directory. + """ + + def __init__(self, user_id, directory=None): + self.user_id = user_id + + if directory: + self.base_directory = directory + else: + self.base_directory = tempfile.mkdtemp( + prefix="synapse-exfiltrate__%s__" % (user_id,) + ) + + os.makedirs(self.base_directory, exist_ok=True) + if list(os.listdir(self.base_directory)): + raise Exception("Directory must be empty") + + def write_events(self, room_id, events): + room_directory = os.path.join(self.base_directory, "rooms", room_id) + os.makedirs(room_directory, exist_ok=True) + events_file = os.path.join(room_directory, "events") + + with open(events_file, "a") as f: + for event in events: + print(json.dumps(event.get_pdu_json()), file=f) + + def write_state(self, room_id, event_id, state): + room_directory = os.path.join(self.base_directory, "rooms", room_id) + state_directory = os.path.join(room_directory, "state") + os.makedirs(state_directory, exist_ok=True) + + event_file = os.path.join(state_directory, event_id) + + with open(event_file, "a") as f: + for event in state.values(): + print(json.dumps(event.get_pdu_json()), file=f) + + def write_invite(self, room_id, event, state): + self.write_events(room_id, [event]) + + # We write the invite state somewhere else as they aren't full events + # and are only a subset of the state at the event. + room_directory = os.path.join(self.base_directory, "rooms", room_id) + os.makedirs(room_directory, exist_ok=True) + + invite_state = os.path.join(room_directory, "invite_state") + + with open(invite_state, "a") as f: + for event in state.values(): + print(json.dumps(event), file=f) + + def finished(self): + return self.base_directory diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 8004aeb909..32cfd010a5 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -575,6 +575,26 @@ class RoomMemberWorkerStore(EventsWorkerStore): count = yield self.runInteraction("did_forget_membership", f) defer.returnValue(count == 0) + @defer.inlineCallbacks + def get_rooms_user_has_been_in(self, user_id): + """Get all rooms that the user has ever been in. + + Args: + user_id (str) + + Returns: + Deferred[set[str]]: Set of room IDs. + """ + + room_ids = yield self._simple_select_onecol( + table="room_memberships", + keyvalues={"membership": Membership.JOIN, "user_id": user_id}, + retcol="room_id", + desc="get_rooms_user_has_been_in", + ) + + return set(room_ids) + class RoomMemberStore(RoomMemberWorkerStore): def __init__(self, db_conn, hs): diff --git a/tests/handlers/test_admin.py b/tests/handlers/test_admin.py new file mode 100644 index 0000000000..5e7d2d3361 --- /dev/null +++ b/tests/handlers/test_admin.py @@ -0,0 +1,210 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import Counter + +from mock import Mock + +import synapse.api.errors +import synapse.handlers.admin +import synapse.rest.admin +import synapse.storage +from synapse.api.constants import EventTypes +from synapse.rest.client.v1 import login, room + +from tests import unittest + + +class ExfiltrateData(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets_for_client_rest_resource, + login.register_servlets, + room.register_servlets, + ] + + def prepare(self, reactor, clock, hs): + self.admin_handler = hs.get_handlers().admin_handler + + self.user1 = self.register_user("user1", "password") + self.token1 = self.login("user1", "password") + + self.user2 = self.register_user("user2", "password") + self.token2 = self.login("user2", "password") + + def test_single_public_joined_room(self): + """Test that we write *all* events for a public room + """ + room_id = self.helper.create_room_as( + self.user1, tok=self.token1, is_public=True + ) + self.helper.send(room_id, body="Hello!", tok=self.token1) + self.helper.join(room_id, self.user2, tok=self.token2) + self.helper.send(room_id, body="Hello again!", tok=self.token1) + + writer = Mock() + + self.get_success(self.admin_handler.exfiltrate_user_data(self.user2, writer)) + + writer.write_events.assert_called() + + # Since we can see all events there shouldn't be any extremities, so no + # state should be written + writer.write_state.assert_not_called() + + # Collect all events that were written + written_events = [] + for (called_room_id, events), _ in writer.write_events.call_args_list: + self.assertEqual(called_room_id, room_id) + written_events.extend(events) + + # Check that the right number of events were written + counter = Counter( + (event.type, getattr(event, "state_key", None)) for event in written_events + ) + self.assertEqual(counter[(EventTypes.Message, None)], 2) + self.assertEqual(counter[(EventTypes.Member, self.user1)], 1) + self.assertEqual(counter[(EventTypes.Member, self.user2)], 1) + + def test_single_private_joined_room(self): + """Tests that we correctly write state when we can't see all events in + a room. + """ + room_id = self.helper.create_room_as(self.user1, tok=self.token1) + self.helper.send_state( + room_id, + EventTypes.RoomHistoryVisibility, + body={"history_visibility": "joined"}, + tok=self.token1, + ) + self.helper.send(room_id, body="Hello!", tok=self.token1) + self.helper.join(room_id, self.user2, tok=self.token2) + self.helper.send(room_id, body="Hello again!", tok=self.token1) + + writer = Mock() + + self.get_success(self.admin_handler.exfiltrate_user_data(self.user2, writer)) + + writer.write_events.assert_called() + + # Since we can't see all events there should be one extremity. + writer.write_state.assert_called_once() + + # Collect all events that were written + written_events = [] + for (called_room_id, events), _ in writer.write_events.call_args_list: + self.assertEqual(called_room_id, room_id) + written_events.extend(events) + + # Check that the right number of events were written + counter = Counter( + (event.type, getattr(event, "state_key", None)) for event in written_events + ) + self.assertEqual(counter[(EventTypes.Message, None)], 1) + self.assertEqual(counter[(EventTypes.Member, self.user1)], 1) + self.assertEqual(counter[(EventTypes.Member, self.user2)], 1) + + def test_single_left_room(self): + """Tests that we don't see events in the room after we leave. + """ + room_id = self.helper.create_room_as(self.user1, tok=self.token1) + self.helper.send(room_id, body="Hello!", tok=self.token1) + self.helper.join(room_id, self.user2, tok=self.token2) + self.helper.send(room_id, body="Hello again!", tok=self.token1) + self.helper.leave(room_id, self.user2, tok=self.token2) + self.helper.send(room_id, body="Helloooooo!", tok=self.token1) + + writer = Mock() + + self.get_success(self.admin_handler.exfiltrate_user_data(self.user2, writer)) + + writer.write_events.assert_called() + + # Since we can see all events there shouldn't be any extremities, so no + # state should be written + writer.write_state.assert_not_called() + + written_events = [] + for (called_room_id, events), _ in writer.write_events.call_args_list: + self.assertEqual(called_room_id, room_id) + written_events.extend(events) + + # Check that the right number of events were written + counter = Counter( + (event.type, getattr(event, "state_key", None)) for event in written_events + ) + self.assertEqual(counter[(EventTypes.Message, None)], 2) + self.assertEqual(counter[(EventTypes.Member, self.user1)], 1) + self.assertEqual(counter[(EventTypes.Member, self.user2)], 2) + + def test_single_left_rejoined_private_room(self): + """Tests that see the correct events in private rooms when we + repeatedly join and leave. + """ + room_id = self.helper.create_room_as(self.user1, tok=self.token1) + self.helper.send_state( + room_id, + EventTypes.RoomHistoryVisibility, + body={"history_visibility": "joined"}, + tok=self.token1, + ) + self.helper.send(room_id, body="Hello!", tok=self.token1) + self.helper.join(room_id, self.user2, tok=self.token2) + self.helper.send(room_id, body="Hello again!", tok=self.token1) + self.helper.leave(room_id, self.user2, tok=self.token2) + self.helper.send(room_id, body="Helloooooo!", tok=self.token1) + self.helper.join(room_id, self.user2, tok=self.token2) + self.helper.send(room_id, body="Helloooooo!!", tok=self.token1) + + writer = Mock() + + self.get_success(self.admin_handler.exfiltrate_user_data(self.user2, writer)) + + writer.write_events.assert_called_once() + + # Since we joined/left/joined again we expect there to be two gaps. + self.assertEqual(writer.write_state.call_count, 2) + + written_events = [] + for (called_room_id, events), _ in writer.write_events.call_args_list: + self.assertEqual(called_room_id, room_id) + written_events.extend(events) + + # Check that the right number of events were written + counter = Counter( + (event.type, getattr(event, "state_key", None)) for event in written_events + ) + self.assertEqual(counter[(EventTypes.Message, None)], 2) + self.assertEqual(counter[(EventTypes.Member, self.user1)], 1) + self.assertEqual(counter[(EventTypes.Member, self.user2)], 3) + + def test_invite(self): + """Tests that pending invites get handled correctly. + """ + room_id = self.helper.create_room_as(self.user1, tok=self.token1) + self.helper.send(room_id, body="Hello!", tok=self.token1) + self.helper.invite(room_id, self.user1, self.user2, tok=self.token1) + + writer = Mock() + + self.get_success(self.admin_handler.exfiltrate_user_data(self.user2, writer)) + + writer.write_events.assert_not_called() + writer.write_state.assert_not_called() + writer.write_invite.assert_called_once() + + args = writer.write_invite.call_args[0] + self.assertEqual(args[0], room_id) + self.assertEqual(args[1].content["membership"], "invite") + self.assertTrue(args[2]) # Assert there is at least one bit of state diff --git a/tests/unittest.py b/tests/unittest.py index d26804b5b5..684d5cb1cf 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -443,7 +443,7 @@ class HomeserverTestCase(TestCase): "POST", "/_matrix/client/r0/admin/register", body.encode("utf8") ) self.render(request) - self.assertEqual(channel.code, 200) + self.assertEqual(channel.code, 200, channel.json_body) user_id = channel.json_body["user_id"] return user_id From 65dd5543f60bcd6c5137fb03ac297c97b9b16426 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Jul 2019 12:10:23 +0100 Subject: [PATCH 2/8] Newsfile --- changelog.d/5589.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/5589.feature diff --git a/changelog.d/5589.feature b/changelog.d/5589.feature new file mode 100644 index 0000000000..a87e669dd4 --- /dev/null +++ b/changelog.d/5589.feature @@ -0,0 +1 @@ +Add ability to pull all locally stored events out of synapse that a particular user can see. From d0b849c86d93ace21bdf7f73e1411f33a9e1b2fe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Jul 2019 15:03:38 +0100 Subject: [PATCH 3/8] Apply comment fixups from code review Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/handlers/admin.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index e424fc46bd..6c905e97a7 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -100,7 +100,7 @@ class AdminHandler(BaseHandler): @defer.inlineCallbacks def exfiltrate_user_data(self, user_id, writer): - """Write all data we have of the user to the specified directory. + """Write all data we have on the user to the given writer. Args: user_id (str) @@ -178,7 +178,7 @@ class AdminHandler(BaseHandler): # We fetch events in the room the user could see by fetching *all* # events that we have and then filtering, this isn't the most - # efficient method perhaps but it does guarentee we get everything. + # efficient method perhaps but it does guarantee we get everything. while True: events, _ = yield self.store.paginate_room_events( room_id, from_key, to_key, limit=100, direction="f" @@ -233,7 +233,7 @@ class AdminHandler(BaseHandler): class ExfiltrationWriter(object): - """Interfaced used to specify how to write exfilrated data. + """Interface used to specify how to write exfiltrated data. """ def write_events(self, room_id, events): @@ -263,7 +263,7 @@ class ExfiltrationWriter(object): Args: room_id (str) - invite (FrozenEvent) + event (FrozenEvent) state (list[dict]): A subset of the state at the invite, with a subset of the event keys (type, state_key, content and sender) """ @@ -276,13 +276,13 @@ class ExfiltrationWriter(object): class FileExfiltrationWriter(ExfiltrationWriter): - """An ExfiltrationWriter that writes the users data to a directory. + """An ExfiltrationWriter that writes the user's data to a directory. Returns the directory location on completion. Args: user_id (str): The user whose data is being exfiltrated. - directory (str|None): The directory to write the data to, if None then + directory (str|None): The directory to write the data to. If None then will write to a temporary directory. """ From c061d4f237273f3400dc8e62aa7421f02caec3dd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Jul 2019 11:07:09 +0100 Subject: [PATCH 4/8] Fixup from review comments. --- synapse/handlers/admin.py | 39 ++++++++++++++++++++---------------- tests/handlers/test_admin.py | 10 ++++----- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 6c905e97a7..69d2c8c36f 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -99,7 +99,7 @@ class AdminHandler(BaseHandler): defer.returnValue(ret) @defer.inlineCallbacks - def exfiltrate_user_data(self, user_id, writer): + def export_user_data(self, user_id, writer): """Write all data we have on the user to the given writer. Args: @@ -107,7 +107,8 @@ class AdminHandler(BaseHandler): writer (ExfiltrationWriter) Returns: - defer.Deferred + defer.Deferred: Resolves when all data for a user has been written. + The returned value is that returned by `writer.finished()`. """ # Get all rooms the user is in or has been in rooms = yield self.store.get_rooms_for_user_where_membership_is( @@ -134,7 +135,7 @@ class AdminHandler(BaseHandler): forgotten = yield self.store.did_forget(user_id, room_id) if forgotten: - logger.info("[%s] User forgot room %d, ignoring", room_id) + logger.info("[%s] User forgot room %d, ignoring", user_id, room_id) continue if room_id not in rooms_user_has_been_in: @@ -172,9 +173,10 @@ class AdminHandler(BaseHandler): # dict[str, set[str]]. event_to_unseen_prevs = {} - # The reverse mapping to above, i.e. map from unseen event to parent - # events. dict[str, set[str]] - unseen_event_to_parents = {} + # The reverse mapping to above, i.e. map from unseen event to events + # that have the unseen event in their prev_events, i.e. the unseen + # events "children". dict[str, set[str]] + unseen_to_child_events = {} # We fetch events in the room the user could see by fetching *all* # events that we have and then filtering, this isn't the most @@ -200,14 +202,14 @@ class AdminHandler(BaseHandler): if unseen_events: event_to_unseen_prevs[event.event_id] = unseen_events for unseen in unseen_events: - unseen_event_to_parents.setdefault(unseen, set()).add( + unseen_to_child_events.setdefault(unseen, set()).add( event.event_id ) # Now check if this event is an unseen prev event, if so # then we remove this event from the appropriate dicts. - for event_id in unseen_event_to_parents.pop(event.event_id, []): - event_to_unseen_prevs.get(event_id, set()).discard( + for child_id in unseen_to_child_events.pop(event.event_id, []): + event_to_unseen_prevs.get(child_id, set()).discard( event.event_id ) @@ -233,7 +235,7 @@ class AdminHandler(BaseHandler): class ExfiltrationWriter(object): - """Interface used to specify how to write exfiltrated data. + """Interface used to specify how to write exported data. """ def write_events(self, room_id, events): @@ -254,7 +256,7 @@ class ExfiltrationWriter(object): Args: room_id (str) event_id (str) - state (list[FrozenEvent]) + state (dict[tuple[str, str], FrozenEvent]) """ pass @@ -264,13 +266,16 @@ class ExfiltrationWriter(object): Args: room_id (str) event (FrozenEvent) - state (list[dict]): A subset of the state at the invite, with a - subset of the event keys (type, state_key, content and sender) + state (dict[tuple[str, str], dict]): A subset of the state at the + invite, with a subset of the event keys (type, state_key + content and sender) """ def finished(self): - """Called when exfiltration is complete, and the return valus is passed - to the requester. + """Called when all data has succesfully been exported and written. + + This functions return value is passed to the caller of + `export_user_data`. """ pass @@ -281,7 +286,7 @@ class FileExfiltrationWriter(ExfiltrationWriter): Returns the directory location on completion. Args: - user_id (str): The user whose data is being exfiltrated. + user_id (str): The user whose data is being exported. directory (str|None): The directory to write the data to. If None then will write to a temporary directory. """ @@ -293,7 +298,7 @@ class FileExfiltrationWriter(ExfiltrationWriter): self.base_directory = directory else: self.base_directory = tempfile.mkdtemp( - prefix="synapse-exfiltrate__%s__" % (user_id,) + prefix="synapse-exported__%s__" % (user_id,) ) os.makedirs(self.base_directory, exist_ok=True) diff --git a/tests/handlers/test_admin.py b/tests/handlers/test_admin.py index 5e7d2d3361..fc37c4328c 100644 --- a/tests/handlers/test_admin.py +++ b/tests/handlers/test_admin.py @@ -55,7 +55,7 @@ class ExfiltrateData(unittest.HomeserverTestCase): writer = Mock() - self.get_success(self.admin_handler.exfiltrate_user_data(self.user2, writer)) + self.get_success(self.admin_handler.export_user_data(self.user2, writer)) writer.write_events.assert_called() @@ -94,7 +94,7 @@ class ExfiltrateData(unittest.HomeserverTestCase): writer = Mock() - self.get_success(self.admin_handler.exfiltrate_user_data(self.user2, writer)) + self.get_success(self.admin_handler.export_user_data(self.user2, writer)) writer.write_events.assert_called() @@ -127,7 +127,7 @@ class ExfiltrateData(unittest.HomeserverTestCase): writer = Mock() - self.get_success(self.admin_handler.exfiltrate_user_data(self.user2, writer)) + self.get_success(self.admin_handler.export_user_data(self.user2, writer)) writer.write_events.assert_called() @@ -169,7 +169,7 @@ class ExfiltrateData(unittest.HomeserverTestCase): writer = Mock() - self.get_success(self.admin_handler.exfiltrate_user_data(self.user2, writer)) + self.get_success(self.admin_handler.export_user_data(self.user2, writer)) writer.write_events.assert_called_once() @@ -198,7 +198,7 @@ class ExfiltrateData(unittest.HomeserverTestCase): writer = Mock() - self.get_success(self.admin_handler.exfiltrate_user_data(self.user2, writer)) + self.get_success(self.admin_handler.export_user_data(self.user2, writer)) writer.write_events.assert_not_called() writer.write_state.assert_not_called() From 9ccea16d45416397b37fa407709ff455bca415e3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Jul 2019 14:07:56 +0100 Subject: [PATCH 5/8] Assume key existence. Update docstrings --- synapse/handlers/admin.py | 4 +--- synapse/storage/stream.py | 16 ++++++++++------ 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 69d2c8c36f..f06914a378 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -209,9 +209,7 @@ class AdminHandler(BaseHandler): # Now check if this event is an unseen prev event, if so # then we remove this event from the appropriate dicts. for child_id in unseen_to_child_events.pop(event.event_id, []): - event_to_unseen_prevs.get(child_id, set()).discard( - event.event_id - ) + event_to_unseen_prevs[child_id].discard(event.event_id) written_events.add(event.event_id) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index d9482a3843..7b5b3b8c8d 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -833,7 +833,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Returns: Deferred[tuple[list[_EventDictReturn], str]]: Returns the results as a list of _EventDictReturn and a token that points to the end - of the result set. + of the result set. If no events are returned then the end of the + stream has been reached (i.e. there are no events between + `from_token` and `to_token`). """ assert int(limit) >= 0 @@ -905,15 +907,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): only those before direction(char): Either 'b' or 'f' to indicate whether we are paginating forwards or backwards from `from_key`. - limit (int): The maximum number of events to return. Zero or less - means no limit. + limit (int): The maximum number of events to return. event_filter (Filter|None): If provided filters the events to those that match the filter. Returns: - tuple[list[dict], str]: Returns the results as a list of dicts and - a token that points to the end of the result set. The dicts have - the keys "event_id", "topological_ordering" and "stream_orderign". + tuple[list[FrozenEvents], str]: Returns the results as a list of + dicts and a token that points to the end of the result set. The + dicts have the keys "event_id", "topological_ordering" and + "stream_ordering". If no events are returned then the end of the + stream has been reached (i.e. there are no events between + `from_key` and `to_key`). """ from_key = RoomStreamToken.parse(from_key) From eadb13d2e9caaa391f4efe2609a7d54d1723d311 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Jul 2019 14:15:00 +0100 Subject: [PATCH 6/8] Remove FileExfiltrationWriter --- synapse/handlers/admin.py | 63 --------------------------------------- 1 file changed, 63 deletions(-) diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index f06914a378..5ff02c12bf 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -276,66 +276,3 @@ class ExfiltrationWriter(object): `export_user_data`. """ pass - - -class FileExfiltrationWriter(ExfiltrationWriter): - """An ExfiltrationWriter that writes the user's data to a directory. - - Returns the directory location on completion. - - Args: - user_id (str): The user whose data is being exported. - directory (str|None): The directory to write the data to. If None then - will write to a temporary directory. - """ - - def __init__(self, user_id, directory=None): - self.user_id = user_id - - if directory: - self.base_directory = directory - else: - self.base_directory = tempfile.mkdtemp( - prefix="synapse-exported__%s__" % (user_id,) - ) - - os.makedirs(self.base_directory, exist_ok=True) - if list(os.listdir(self.base_directory)): - raise Exception("Directory must be empty") - - def write_events(self, room_id, events): - room_directory = os.path.join(self.base_directory, "rooms", room_id) - os.makedirs(room_directory, exist_ok=True) - events_file = os.path.join(room_directory, "events") - - with open(events_file, "a") as f: - for event in events: - print(json.dumps(event.get_pdu_json()), file=f) - - def write_state(self, room_id, event_id, state): - room_directory = os.path.join(self.base_directory, "rooms", room_id) - state_directory = os.path.join(room_directory, "state") - os.makedirs(state_directory, exist_ok=True) - - event_file = os.path.join(state_directory, event_id) - - with open(event_file, "a") as f: - for event in state.values(): - print(json.dumps(event.get_pdu_json()), file=f) - - def write_invite(self, room_id, event, state): - self.write_events(room_id, [event]) - - # We write the invite state somewhere else as they aren't full events - # and are only a subset of the state at the event. - room_directory = os.path.join(self.base_directory, "rooms", room_id) - os.makedirs(room_directory, exist_ok=True) - - invite_state = os.path.join(room_directory, "invite_state") - - with open(invite_state, "a") as f: - for event in state.values(): - print(json.dumps(event), file=f) - - def finished(self): - return self.base_directory From b4f5416dd9bd7635a4b859e3d13eaee992096ef7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Jul 2019 14:41:29 +0100 Subject: [PATCH 7/8] pep8 --- synapse/handlers/admin.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 5ff02c12bf..e8a651e231 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -14,10 +14,6 @@ # limitations under the License. import logging -import os -import tempfile - -from canonicaljson import json from twisted.internet import defer From db0a50bc40e4aca4a03fa3a09d33497f57f95c2d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Jul 2019 16:59:59 +0100 Subject: [PATCH 8/8] Fixup docstrings --- synapse/storage/stream.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index f8e3007d67..a0465484df 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -835,7 +835,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): as a list of _EventDictReturn and a token that points to the end of the result set. If no events are returned then the end of the stream has been reached (i.e. there are no events between - `from_token` and `to_token`). + `from_token` and `to_token`), or `limit` is zero. """ assert int(limit) >= 0 @@ -912,12 +912,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): those that match the filter. Returns: - tuple[list[FrozenEvents], str]: Returns the results as a list of - dicts and a token that points to the end of the result set. The - dicts have the keys "event_id", "topological_ordering" and - "stream_ordering". If no events are returned then the end of the - stream has been reached (i.e. there are no events between - `from_key` and `to_key`). + tuple[list[FrozenEvent], str]: Returns the results as a list of + events and a token that points to the end of the result set. If no + events are returned then the end of the stream has been reached + (i.e. there are no events between `from_key` and `to_key`). """ from_key = RoomStreamToken.parse(from_key)