From 8d3542a64e2689a00ed87f9bd58fe3e1d3b10ed8 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Wed, 22 May 2019 16:42:00 -0400 Subject: [PATCH 01/15] implement federation parts of cross-signing --- .../sender/per_destination_queue.py | 4 +- synapse/handlers/device.py | 13 +- synapse/handlers/e2e_keys.py | 116 +++++++++++++++++- synapse/storage/devices.py | 56 ++++++++- 4 files changed, 179 insertions(+), 10 deletions(-) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index fad980b893..0486af2dbf 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -366,10 +366,10 @@ class PerDestinationQueue(object): Edu( origin=self._server_name, destination=self._destination, - edu_type="m.device_list_update", + edu_type=edu_type, content=content, ) - for content in results + for (edu_type, content) in results ] assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs" diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 5f23ee4488..cd6eb52316 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -458,7 +458,18 @@ class DeviceHandler(DeviceWorkerHandler): @defer.inlineCallbacks def on_federation_query_user_devices(self, user_id): stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) - return {"user_id": user_id, "stream_id": stream_id, "devices": devices} + master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master") + self_signing_key = yield self.store.get_e2e_cross_signing_key( + user_id, "self_signing" + ) + + return { + "user_id": user_id, + "stream_id": stream_id, + "devices": devices, + "master_key": master_key, + "self_signing_key": self_signing_key + } @defer.inlineCallbacks def user_left_room(self, user, room_id): diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 5ea54f60be..849ee04f93 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -36,6 +36,8 @@ from synapse.types import ( get_verify_key_from_cross_signing_key, ) from synapse.util import unwrapFirstError +from synapse.util.async_helpers import Linearizer +from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) @@ -49,10 +51,17 @@ class E2eKeysHandler(object): self.is_mine = hs.is_mine self.clock = hs.get_clock() + self._edu_updater = SigningKeyEduUpdater(hs, self) + + federation_registry = hs.get_federation_registry() + + federation_registry.register_edu_handler( + "m.signing_key_update", self._edu_updater.incoming_signing_key_update, + ) # doesn't really work as part of the generic query API, because the # query request requires an object POST, but we abuse the # "query handler" interface. - hs.get_federation_registry().register_query_handler( + federation_registry.register_query_handler( "client_keys", self.on_federation_query_client_keys ) @@ -343,7 +352,15 @@ class E2eKeysHandler(object): """ device_keys_query = query_body.get("device_keys", {}) res = yield self.query_local_devices(device_keys_query) - return {"device_keys": res} + ret = {"device_keys": res} + + # add in the cross-signing keys + cross_signing_keys = yield self.query_cross_signing_keys(device_keys_query) + + for key, value in iteritems(cross_signing_keys): + ret[key + "_keys"] = value + + return ret @trace @defer.inlineCallbacks @@ -1047,3 +1064,98 @@ class SignatureListItem: target_user_id = attr.ib() target_device_id = attr.ib() signature = attr.ib() + + +class SigningKeyEduUpdater(object): + "Handles incoming signing key updates from federation and updates the DB" + + def __init__(self, hs, e2e_keys_handler): + self.store = hs.get_datastore() + self.federation = hs.get_federation_client() + self.clock = hs.get_clock() + self.e2e_keys_handler = e2e_keys_handler + + self._remote_edu_linearizer = Linearizer(name="remote_signing_key") + + # user_id -> list of updates waiting to be handled. + self._pending_updates = {} + + # Recently seen stream ids. We don't bother keeping these in the DB, + # but they're useful to have them about to reduce the number of spurious + # resyncs. + self._seen_updates = ExpiringCache( + cache_name="signing_key_update_edu", + clock=self.clock, + max_len=10000, + expiry_ms=30 * 60 * 1000, + iterable=True, + ) + + @defer.inlineCallbacks + def incoming_signing_key_update(self, origin, edu_content): + """Called on incoming signing key update from federation. Responsible for + parsing the EDU and adding to pending updates list. + + Args: + origin (string): the server that sent the EDU + edu_content (dict): the contents of the EDU + """ + + user_id = edu_content.pop("user_id") + master_key = edu_content.pop("master_key", None) + self_signing_key = edu_content.pop("self_signing_key", None) + + if get_domain_from_id(user_id) != origin: + # TODO: Raise? + logger.warning("Got signing key update edu for %r from %r", user_id, origin) + return + + room_ids = yield self.store.get_rooms_for_user(user_id) + if not room_ids: + # We don't share any rooms with this user. Ignore update, as we + # probably won't get any further updates. + return + + self._pending_updates.setdefault(user_id, []).append( + (master_key, self_signing_key, edu_content) + ) + + yield self._handle_signing_key_updates(user_id) + + @defer.inlineCallbacks + def _handle_signing_key_updates(self, user_id): + """Actually handle pending updates. + + Args: + user_id (string): the user whose updates we are processing + """ + + device_handler = self.e2e_keys_handler.device_handler + + with (yield self._remote_edu_linearizer.queue(user_id)): + pending_updates = self._pending_updates.pop(user_id, []) + if not pending_updates: + # This can happen since we batch updates + return + + device_ids = [] + + logger.info("pending updates: %r", pending_updates) + + for master_key, self_signing_key, edu_content in pending_updates: + if master_key: + yield self.store.set_e2e_cross_signing_key( + user_id, "master", master_key + ) + device_id = \ + get_verify_key_from_cross_signing_key(master_key)[1].version + device_ids.append(device_id) + if self_signing_key: + yield self.store.set_e2e_cross_signing_key( + user_id, "self_signing", self_signing_key + ) + device_id = \ + get_verify_key_from_cross_signing_key(self_signing_key)[1].version + device_ids.append(device_id) + + yield device_handler.notify_device_update(user_id, device_ids) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index f7a3542348..182e95fa21 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -94,9 +94,10 @@ class DeviceWorkerStore(SQLBaseStore): """Get stream of updates to send to remote servers Returns: - Deferred[tuple[int, list[dict]]]: + Deferred[tuple[int, list[tuple[string,dict]]]]: current stream id (ie, the stream id of the last update included in the - response), and the list of updates + response), and the list of updates, where each update is a pair of EDU + type and EDU contents """ now_stream_id = self._device_list_id_gen.get_current_token() @@ -129,6 +130,25 @@ class DeviceWorkerStore(SQLBaseStore): if not updates: return now_stream_id, [] + # get the cross-signing keys of the users the list + users = set(r[0] for r in updates) + master_key_by_user = {} + self_signing_key_by_user = {} + for user in users: + cross_signing_key = yield self.get_e2e_cross_signing_key(user, "master") + key_id, verify_key = get_verify_key_from_cross_signing_key(cross_signing_key) + master_key_by_user[user] = { + "key_info": cross_signing_key, + "pubkey": verify_key.version + } + + cross_signing_key = yield self.get_e2e_cross_signing_key(user, "self_signing") + key_id, verify_key = get_verify_key_from_cross_signing_key(cross_signing_key) + self_signing_key_by_user[user] = { + "key_info": cross_signing_key, + "pubkey": verify_key.version + } + # if we have exceeded the limit, we need to exclude any results with the # same stream_id as the last row. if len(updates) > limit: @@ -158,6 +178,10 @@ class DeviceWorkerStore(SQLBaseStore): # Stop processing updates break + if update[1] == master_key_by_user[update[0]]["pubkey"] or \ + update[1] == self_signing_key_by_user[update[0]]["pubkey"]: + continue + key = (update[0], update[1]) update_context = update[3] @@ -172,16 +196,37 @@ class DeviceWorkerStore(SQLBaseStore): # means that there are more than limit updates all of which have the same # steam_id. + # figure out which cross-signing keys were changed by intersecting the + # update list with the master/self-signing key by user maps + cross_signing_keys_by_user = {} + for user_id, device_id, stream in updates: + if device_id == master_key_by_user[user_id]["pubkey"]: + result = cross_signing_keys_by_user.setdefault(user_id, {}) + result["master_key"] = \ + master_key_by_user[user_id]["key_info"] + elif device_id == self_signing_key_by_user[user_id]["pubkey"]: + result = cross_signing_keys_by_user.setdefault(user_id, {}) + result["self_signing_key"] = \ + self_signing_key_by_user[user_id]["key_info"] + + cross_signing_results = [] + + # add the updated cross-signing keys to the results list + for user_id, result in iteritems(cross_signing_keys_by_user): + result["user_id"] = user_id + cross_signing_results.append(("m.signing_key_update", result)) + # That should only happen if a client is spamming the server with new # devices, in which case E2E isn't going to work well anyway. We'll just # skip that stream_id and return an empty list, and continue with the next # stream_id next time. - if not query_map: + if not query_map and not cross_signing_results: return stream_id_cutoff, [] results = yield self._get_device_update_edus_by_remote( destination, from_stream_id, query_map ) + results.extend(cross_signing_results) return now_stream_id, results @@ -200,6 +245,7 @@ class DeviceWorkerStore(SQLBaseStore): Returns: List: List of device updates """ + # get the list of device updates that need to be sent sql = """ SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ? @@ -231,7 +277,7 @@ class DeviceWorkerStore(SQLBaseStore): query_map.keys(), include_all_devices=True, include_deleted_devices=True, - ) + ) if query_map else {} results = [] for user_id, user_devices in iteritems(devices): @@ -262,7 +308,7 @@ class DeviceWorkerStore(SQLBaseStore): else: result["deleted"] = True - results.append(result) + results.append(("m.device_list_update", result)) return results From a1aaf3eea6fabb3d61a28e3f75afdb33b41304ee Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Wed, 22 May 2019 21:24:21 -0400 Subject: [PATCH 02/15] don't crash if the user doesn't have cross-signing keys --- synapse/storage/devices.py | 39 ++++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 182e95fa21..f46978c9c3 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -136,18 +136,24 @@ class DeviceWorkerStore(SQLBaseStore): self_signing_key_by_user = {} for user in users: cross_signing_key = yield self.get_e2e_cross_signing_key(user, "master") - key_id, verify_key = get_verify_key_from_cross_signing_key(cross_signing_key) - master_key_by_user[user] = { - "key_info": cross_signing_key, - "pubkey": verify_key.version - } + if cross_signing_key: + key_id, verify_key = get_verify_key_from_cross_signing_key( + cross_signing_key + ) + master_key_by_user[user] = { + "key_info": cross_signing_key, + "pubkey": verify_key.version + } cross_signing_key = yield self.get_e2e_cross_signing_key(user, "self_signing") - key_id, verify_key = get_verify_key_from_cross_signing_key(cross_signing_key) - self_signing_key_by_user[user] = { - "key_info": cross_signing_key, - "pubkey": verify_key.version - } + if cross_signing_key: + key_id, verify_key = get_verify_key_from_cross_signing_key( + cross_signing_key + ) + self_signing_key_by_user[user] = { + "key_info": cross_signing_key, + "pubkey": verify_key.version + } # if we have exceeded the limit, we need to exclude any results with the # same stream_id as the last row. @@ -178,8 +184,11 @@ class DeviceWorkerStore(SQLBaseStore): # Stop processing updates break - if update[1] == master_key_by_user[update[0]]["pubkey"] or \ - update[1] == self_signing_key_by_user[update[0]]["pubkey"]: + # skip over cross-signing keys + if (update[0] in master_key_by_user + and update[1] == master_key_by_user[update[0]]["pubkey"]) \ + or (update[0] in master_key_by_user + and update[1] == self_signing_key_by_user[update[0]]["pubkey"]): continue key = (update[0], update[1]) @@ -200,11 +209,13 @@ class DeviceWorkerStore(SQLBaseStore): # update list with the master/self-signing key by user maps cross_signing_keys_by_user = {} for user_id, device_id, stream in updates: - if device_id == master_key_by_user[user_id]["pubkey"]: + if device_id == master_key_by_user.get(user_id, {}) \ + .get("pubkey", None): result = cross_signing_keys_by_user.setdefault(user_id, {}) result["master_key"] = \ master_key_by_user[user_id]["key_info"] - elif device_id == self_signing_key_by_user[user_id]["pubkey"]: + elif device_id == self_signing_key_by_user.get(user_id, {}) \ + .get("pubkey", None): result = cross_signing_keys_by_user.setdefault(user_id, {}) result["self_signing_key"] = \ self_signing_key_by_user[user_id]["key_info"] From cfdb84422dba2ca28eacb65aca960aecc5598658 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Mon, 22 Jul 2019 13:04:55 -0400 Subject: [PATCH 03/15] make black happy --- synapse/handlers/e2e_keys.py | 12 ++++---- synapse/storage/devices.py | 54 +++++++++++++++++++++--------------- 2 files changed, 39 insertions(+), 27 deletions(-) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 849ee04f93..f3cfba0bd3 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -56,7 +56,7 @@ class E2eKeysHandler(object): federation_registry = hs.get_federation_registry() federation_registry.register_edu_handler( - "m.signing_key_update", self._edu_updater.incoming_signing_key_update, + "m.signing_key_update", self._edu_updater.incoming_signing_key_update ) # doesn't really work as part of the generic query API, because the # query request requires an object POST, but we abuse the @@ -1147,15 +1147,17 @@ class SigningKeyEduUpdater(object): yield self.store.set_e2e_cross_signing_key( user_id, "master", master_key ) - device_id = \ - get_verify_key_from_cross_signing_key(master_key)[1].version + device_id = get_verify_key_from_cross_signing_key(master_key)[ + 1 + ].version device_ids.append(device_id) if self_signing_key: yield self.store.set_e2e_cross_signing_key( user_id, "self_signing", self_signing_key ) - device_id = \ - get_verify_key_from_cross_signing_key(self_signing_key)[1].version + device_id = get_verify_key_from_cross_signing_key(self_signing_key)[ + 1 + ].version device_ids.append(device_id) yield device_handler.notify_device_update(user_id, device_ids) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index f46978c9c3..60bf6d68ec 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -37,6 +37,7 @@ from synapse.storage._base import ( make_in_list_sql_clause, ) from synapse.storage.background_updates import BackgroundUpdateStore +from synapse.types import get_verify_key_from_cross_signing_key from synapse.util import batch_iter from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList @@ -142,17 +143,19 @@ class DeviceWorkerStore(SQLBaseStore): ) master_key_by_user[user] = { "key_info": cross_signing_key, - "pubkey": verify_key.version + "pubkey": verify_key.version, } - cross_signing_key = yield self.get_e2e_cross_signing_key(user, "self_signing") + cross_signing_key = yield self.get_e2e_cross_signing_key( + user, "self_signing" + ) if cross_signing_key: key_id, verify_key = get_verify_key_from_cross_signing_key( cross_signing_key ) self_signing_key_by_user[user] = { "key_info": cross_signing_key, - "pubkey": verify_key.version + "pubkey": verify_key.version, } # if we have exceeded the limit, we need to exclude any results with the @@ -185,10 +188,13 @@ class DeviceWorkerStore(SQLBaseStore): break # skip over cross-signing keys - if (update[0] in master_key_by_user - and update[1] == master_key_by_user[update[0]]["pubkey"]) \ - or (update[0] in master_key_by_user - and update[1] == self_signing_key_by_user[update[0]]["pubkey"]): + if ( + update[0] in master_key_by_user + and update[1] == master_key_by_user[update[0]]["pubkey"] + ) or ( + update[0] in master_key_by_user + and update[1] == self_signing_key_by_user[update[0]]["pubkey"] + ): continue key = (update[0], update[1]) @@ -209,16 +215,16 @@ class DeviceWorkerStore(SQLBaseStore): # update list with the master/self-signing key by user maps cross_signing_keys_by_user = {} for user_id, device_id, stream in updates: - if device_id == master_key_by_user.get(user_id, {}) \ - .get("pubkey", None): + if device_id == master_key_by_user.get(user_id, {}).get("pubkey", None): result = cross_signing_keys_by_user.setdefault(user_id, {}) - result["master_key"] = \ - master_key_by_user[user_id]["key_info"] - elif device_id == self_signing_key_by_user.get(user_id, {}) \ - .get("pubkey", None): + result["master_key"] = master_key_by_user[user_id]["key_info"] + elif device_id == self_signing_key_by_user.get(user_id, {}).get( + "pubkey", None + ): result = cross_signing_keys_by_user.setdefault(user_id, {}) - result["self_signing_key"] = \ - self_signing_key_by_user[user_id]["key_info"] + result["self_signing_key"] = self_signing_key_by_user[user_id][ + "key_info" + ] cross_signing_results = [] @@ -282,13 +288,17 @@ class DeviceWorkerStore(SQLBaseStore): List[Dict]: List of objects representing an device update EDU """ - devices = yield self.runInteraction( - "_get_e2e_device_keys_txn", - self._get_e2e_device_keys_txn, - query_map.keys(), - include_all_devices=True, - include_deleted_devices=True, - ) if query_map else {} + devices = ( + yield self.runInteraction( + "_get_e2e_device_keys_txn", + self._get_e2e_device_keys_txn, + query_map.keys(), + include_all_devices=True, + include_deleted_devices=True, + ) + if query_map + else {} + ) results = [] for user_id, user_devices in iteritems(devices): From 41ad35b5235ad9ed8a1b8889287ae840ee3373bd Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Fri, 2 Aug 2019 18:03:23 -0400 Subject: [PATCH 04/15] add missing param --- synapse/handlers/e2e_keys.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index f3cfba0bd3..6b65f47d18 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -355,7 +355,7 @@ class E2eKeysHandler(object): ret = {"device_keys": res} # add in the cross-signing keys - cross_signing_keys = yield self.query_cross_signing_keys(device_keys_query) + cross_signing_keys = yield self.query_cross_signing_keys(device_keys_query, None) for key, value in iteritems(cross_signing_keys): ret[key + "_keys"] = value From 1fabf82d50f3db25ce0e4a93f349d90eb2d30a16 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Tue, 22 Oct 2019 21:44:58 -0400 Subject: [PATCH 05/15] update to work with newer code, and fix formatting --- synapse/handlers/device.py | 2 +- synapse/handlers/e2e_keys.py | 9 +++++---- synapse/storage/devices.py | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index cd6eb52316..fd8d14b680 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -468,7 +468,7 @@ class DeviceHandler(DeviceWorkerHandler): "stream_id": stream_id, "devices": devices, "master_key": master_key, - "self_signing_key": self_signing_key + "self_signing_key": self_signing_key, } @defer.inlineCallbacks diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 6b65f47d18..73572f4614 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -260,7 +260,7 @@ class E2eKeysHandler(object): Returns: defer.Deferred[dict[str, dict[str, dict]]]: map from - (master|self_signing|user_signing) -> user_id -> key + (master_keys|self_signing_keys|user_signing_keys) -> user_id -> key """ master_keys = {} self_signing_keys = {} @@ -355,10 +355,11 @@ class E2eKeysHandler(object): ret = {"device_keys": res} # add in the cross-signing keys - cross_signing_keys = yield self.query_cross_signing_keys(device_keys_query, None) + cross_signing_keys = yield self.get_cross_signing_keys_from_cache( + device_keys_query, None + ) - for key, value in iteritems(cross_signing_keys): - ret[key + "_keys"] = value + ret.update(cross_signing_keys) return ret diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 60bf6d68ec..1aaef1fbb0 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -214,7 +214,7 @@ class DeviceWorkerStore(SQLBaseStore): # figure out which cross-signing keys were changed by intersecting the # update list with the master/self-signing key by user maps cross_signing_keys_by_user = {} - for user_id, device_id, stream in updates: + for user_id, device_id, stream, _opentracing_context in updates: if device_id == master_key_by_user.get(user_id, {}).get("pubkey", None): result = cross_signing_keys_by_user.setdefault(user_id, {}) result["master_key"] = master_key_by_user[user_id]["key_info"] From 056383953529132d8df3b352a05106c105bfb917 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Tue, 22 Oct 2019 21:51:01 -0400 Subject: [PATCH 06/15] add news file --- changelog.d/5727.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/5727.feature diff --git a/changelog.d/5727.feature b/changelog.d/5727.feature new file mode 100644 index 0000000000..819bebf2d7 --- /dev/null +++ b/changelog.d/5727.feature @@ -0,0 +1 @@ +Add federation support for cross-signing. From 3e3f9b684e2853217d86349f289780b397afa88a Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Tue, 22 Oct 2019 22:26:30 -0400 Subject: [PATCH 07/15] fix unit test --- tests/storage/test_devices.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/storage/test_devices.py b/tests/storage/test_devices.py index 3cc18f9f1c..039cc79357 100644 --- a/tests/storage/test_devices.py +++ b/tests/storage/test_devices.py @@ -137,7 +137,9 @@ class DeviceStoreTestCase(tests.unittest.TestCase): """Check that an specific device ids exist in a list of device update EDUs""" self.assertEqual(len(device_updates), len(expected_device_ids)) - received_device_ids = {update["device_id"] for update in device_updates} + received_device_ids = { + update["device_id"] for edu_type, update in device_updates + } self.assertEqual(received_device_ids, set(expected_device_ids)) @defer.inlineCallbacks From 404e8c85321b4fe9b9b74e07841367c4cf201551 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Tue, 22 Oct 2019 22:33:23 -0400 Subject: [PATCH 08/15] vendor-prefix the EDU name until MSC1756 is merged into the spec --- synapse/handlers/e2e_keys.py | 3 ++- synapse/storage/devices.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 73572f4614..d25af42f5f 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -55,8 +55,9 @@ class E2eKeysHandler(object): federation_registry = hs.get_federation_registry() + # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec federation_registry.register_edu_handler( - "m.signing_key_update", self._edu_updater.incoming_signing_key_update + "org.matrix.signing_key_update", self._edu_updater.incoming_signing_key_update ) # doesn't really work as part of the generic query API, because the # query request requires an object POST, but we abuse the diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 1aaef1fbb0..6ac165068e 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -231,7 +231,8 @@ class DeviceWorkerStore(SQLBaseStore): # add the updated cross-signing keys to the results list for user_id, result in iteritems(cross_signing_keys_by_user): result["user_id"] = user_id - cross_signing_results.append(("m.signing_key_update", result)) + # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec + cross_signing_results.append(("org.matrix.signing_key_update", result)) # That should only happen if a client is spamming the server with new # devices, in which case E2E isn't going to work well anyway. We'll just From 480eac30eb543ce6947009fa90b8409f153eb3a4 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Tue, 22 Oct 2019 22:37:16 -0400 Subject: [PATCH 09/15] black --- synapse/handlers/e2e_keys.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index d25af42f5f..f6aa9a940b 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -57,7 +57,8 @@ class E2eKeysHandler(object): # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec federation_registry.register_edu_handler( - "org.matrix.signing_key_update", self._edu_updater.incoming_signing_key_update + "org.matrix.signing_key_update", + self._edu_updater.incoming_signing_key_update, ) # doesn't really work as part of the generic query API, because the # query request requires an object POST, but we abuse the From dc2cd6f79d413012688ef89e8999c0c2207e198e Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Wed, 23 Oct 2019 09:13:47 -0400 Subject: [PATCH 10/15] move get_e2e_cross_signing_key to EndToEndKeyWorkerStore so it works with workers --- synapse/storage/end_to_end_keys.py | 134 ++++++++++++++--------------- 1 file changed, 67 insertions(+), 67 deletions(-) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 2effe08592..f9bef14992 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -249,6 +249,73 @@ class EndToEndKeyWorkerStore(SQLBaseStore): return self.runInteraction("count_e2e_one_time_keys", _count_e2e_one_time_keys) + def _get_e2e_cross_signing_key_txn(self, txn, user_id, key_type, from_user_id=None): + """Returns a user's cross-signing key. + + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + user_id (str): the user whose key is being requested + key_type (str): the type of key that is being set: either 'master' + for a master key, 'self_signing' for a self-signing key, or + 'user_signing' for a user-signing key + from_user_id (str): if specified, signatures made by this user on + the key will be included in the result + + Returns: + dict of the key data or None if not found + """ + sql = ( + "SELECT keydata " + " FROM e2e_cross_signing_keys " + " WHERE user_id = ? AND keytype = ? ORDER BY stream_id DESC LIMIT 1" + ) + txn.execute(sql, (user_id, key_type)) + row = txn.fetchone() + if not row: + return None + key = json.loads(row[0]) + + device_id = None + for k in key["keys"].values(): + device_id = k + + if from_user_id is not None: + sql = ( + "SELECT key_id, signature " + " FROM e2e_cross_signing_signatures " + " WHERE user_id = ? " + " AND target_user_id = ? " + " AND target_device_id = ? " + ) + txn.execute(sql, (from_user_id, user_id, device_id)) + row = txn.fetchone() + if row: + key.setdefault("signatures", {}).setdefault(from_user_id, {})[ + row[0] + ] = row[1] + + return key + + def get_e2e_cross_signing_key(self, user_id, key_type, from_user_id=None): + """Returns a user's cross-signing key. + + Args: + user_id (str): the user whose self-signing key is being requested + key_type (str): the type of cross-signing key to get + from_user_id (str): if specified, signatures made by this user on + the self-signing key will be included in the result + + Returns: + dict of the key data or None if not found + """ + return self.runInteraction( + "get_e2e_cross_signing_key", + self._get_e2e_cross_signing_key_txn, + user_id, + key_type, + from_user_id, + ) + class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys): @@ -427,73 +494,6 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): key, ) - def _get_e2e_cross_signing_key_txn(self, txn, user_id, key_type, from_user_id=None): - """Returns a user's cross-signing key. - - Args: - txn (twisted.enterprise.adbapi.Connection): db connection - user_id (str): the user whose key is being requested - key_type (str): the type of key that is being set: either 'master' - for a master key, 'self_signing' for a self-signing key, or - 'user_signing' for a user-signing key - from_user_id (str): if specified, signatures made by this user on - the key will be included in the result - - Returns: - dict of the key data or None if not found - """ - sql = ( - "SELECT keydata " - " FROM e2e_cross_signing_keys " - " WHERE user_id = ? AND keytype = ? ORDER BY stream_id DESC LIMIT 1" - ) - txn.execute(sql, (user_id, key_type)) - row = txn.fetchone() - if not row: - return None - key = json.loads(row[0]) - - device_id = None - for k in key["keys"].values(): - device_id = k - - if from_user_id is not None: - sql = ( - "SELECT key_id, signature " - " FROM e2e_cross_signing_signatures " - " WHERE user_id = ? " - " AND target_user_id = ? " - " AND target_device_id = ? " - ) - txn.execute(sql, (from_user_id, user_id, device_id)) - row = txn.fetchone() - if row: - key.setdefault("signatures", {}).setdefault(from_user_id, {})[ - row[0] - ] = row[1] - - return key - - def get_e2e_cross_signing_key(self, user_id, key_type, from_user_id=None): - """Returns a user's cross-signing key. - - Args: - user_id (str): the user whose self-signing key is being requested - key_type (str): the type of cross-signing key to get - from_user_id (str): if specified, signatures made by this user on - the self-signing key will be included in the result - - Returns: - dict of the key data or None if not found - """ - return self.runInteraction( - "get_e2e_cross_signing_key", - self._get_e2e_cross_signing_key_txn, - user_id, - key_type, - from_user_id, - ) - def store_e2e_cross_signing_signatures(self, user_id, signatures): """Stores cross-signing signatures. From ff05c9b760ba5736a189b320c2e0d4592d0072a4 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Thu, 24 Oct 2019 21:46:11 -0400 Subject: [PATCH 11/15] don't error if federation query doesn't have cross-signing keys --- synapse/handlers/e2e_keys.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index f6aa9a940b..4ab75a351e 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -218,13 +218,15 @@ class E2eKeysHandler(object): if user_id in destination_query: results[user_id] = keys - for user_id, key in remote_result["master_keys"].items(): - if user_id in destination_query: - cross_signing_keys["master_keys"][user_id] = key + if "master_keys" in remote_result: + for user_id, key in remote_result["master_keys"].items(): + if user_id in destination_query: + cross_signing_keys["master_keys"][user_id] = key - for user_id, key in remote_result["self_signing_keys"].items(): - if user_id in destination_query: - cross_signing_keys["self_signing_keys"][user_id] = key + if "self_signing_keys" in remote_result: + for user_id, key in remote_result["self_signing_keys"].items(): + if user_id in destination_query: + cross_signing_keys["self_signing_keys"][user_id] = key except Exception as e: failure = _exception_to_failure(e) From d78b1e339dd813214d8a8316c38a3be31ad8f132 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Wed, 30 Oct 2019 10:01:53 -0400 Subject: [PATCH 12/15] apply changes as a result of PR review --- synapse/handlers/e2e_keys.py | 22 +++--- synapse/storage/data_stores/main/devices.py | 79 ++++++++++----------- 2 files changed, 46 insertions(+), 55 deletions(-) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 4ab75a351e..0f320b3764 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -1072,7 +1072,7 @@ class SignatureListItem: class SigningKeyEduUpdater(object): - "Handles incoming signing key updates from federation and updates the DB" + """Handles incoming signing key updates from federation and updates the DB""" def __init__(self, hs, e2e_keys_handler): self.store = hs.get_datastore() @@ -1111,7 +1111,6 @@ class SigningKeyEduUpdater(object): self_signing_key = edu_content.pop("self_signing_key", None) if get_domain_from_id(user_id) != origin: - # TODO: Raise? logger.warning("Got signing key update edu for %r from %r", user_id, origin) return @@ -1122,7 +1121,7 @@ class SigningKeyEduUpdater(object): return self._pending_updates.setdefault(user_id, []).append( - (master_key, self_signing_key, edu_content) + (master_key, self_signing_key) ) yield self._handle_signing_key_updates(user_id) @@ -1147,22 +1146,21 @@ class SigningKeyEduUpdater(object): logger.info("pending updates: %r", pending_updates) - for master_key, self_signing_key, edu_content in pending_updates: + for master_key, self_signing_key in pending_updates: if master_key: yield self.store.set_e2e_cross_signing_key( user_id, "master", master_key ) - device_id = get_verify_key_from_cross_signing_key(master_key)[ - 1 - ].version - device_ids.append(device_id) + _, verify_key = get_verify_key_from_cross_signing_key(master_key) + # verify_key is a VerifyKey from signedjson, which uses + # .version to denote the portion of the key ID after the + # algorithm and colon, which is the device ID + device_ids.append(verify_key.version) if self_signing_key: yield self.store.set_e2e_cross_signing_key( user_id, "self_signing", self_signing_key ) - device_id = get_verify_key_from_cross_signing_key(self_signing_key)[ - 1 - ].version - device_ids.append(device_id) + _, verify_key = get_verify_key_from_cross_signing_key(self_signing_key) + device_ids.append(verify_key.version) yield device_handler.notify_device_update(user_id, device_ids) diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index 6ac165068e..0b12bc58c4 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -92,8 +92,12 @@ class DeviceWorkerStore(SQLBaseStore): @trace @defer.inlineCallbacks def get_devices_by_remote(self, destination, from_stream_id, limit): - """Get stream of updates to send to remote servers + """Get a stream of device updates to send to the given remote server. + Args: + destination (str): The host the device updates are intended for + from_stream_id (int): The minimum stream_id to filter updates by, exclusive + limit (int): Maximum number of device updates to return Returns: Deferred[tuple[int, list[tuple[string,dict]]]]: current stream id (ie, the stream id of the last update included in the @@ -131,7 +135,8 @@ class DeviceWorkerStore(SQLBaseStore): if not updates: return now_stream_id, [] - # get the cross-signing keys of the users the list + # get the cross-signing keys of the users in the list, so that we can + # determine which of the device changes were cross-signing keys users = set(r[0] for r in updates) master_key_by_user = {} self_signing_key_by_user = {} @@ -141,9 +146,12 @@ class DeviceWorkerStore(SQLBaseStore): key_id, verify_key = get_verify_key_from_cross_signing_key( cross_signing_key ) + # verify_key is a VerifyKey from signedjson, which uses + # .version to denote the portion of the key ID after the + # algorithm and colon, which is the device ID master_key_by_user[user] = { "key_info": cross_signing_key, - "pubkey": verify_key.version, + "device_id": verify_key.version, } cross_signing_key = yield self.get_e2e_cross_signing_key( @@ -155,7 +163,7 @@ class DeviceWorkerStore(SQLBaseStore): ) self_signing_key_by_user[user] = { "key_info": cross_signing_key, - "pubkey": verify_key.version, + "device_id": verify_key.version, } # if we have exceeded the limit, we need to exclude any results with the @@ -182,69 +190,54 @@ class DeviceWorkerStore(SQLBaseStore): # context which created the Edu. query_map = {} - for update in updates: - if stream_id_cutoff is not None and update[2] >= stream_id_cutoff: + cross_signing_keys_by_user = {} + for user_id, device_id, update_stream_id, update_context in updates: + if stream_id_cutoff is not None and update_stream_id >= stream_id_cutoff: # Stop processing updates break - # skip over cross-signing keys if ( - update[0] in master_key_by_user - and update[1] == master_key_by_user[update[0]]["pubkey"] - ) or ( - update[0] in master_key_by_user - and update[1] == self_signing_key_by_user[update[0]]["pubkey"] + user_id in master_key_by_user + and device_id == master_key_by_user[user_id]["device_id"] ): - continue - - key = (update[0], update[1]) - - update_context = update[3] - update_stream_id = update[2] - - previous_update_stream_id, _ = query_map.get(key, (0, None)) - - if update_stream_id > previous_update_stream_id: - query_map[key] = (update_stream_id, update_context) - - # If we didn't find any updates with a stream_id lower than the cutoff, it - # means that there are more than limit updates all of which have the same - # steam_id. - - # figure out which cross-signing keys were changed by intersecting the - # update list with the master/self-signing key by user maps - cross_signing_keys_by_user = {} - for user_id, device_id, stream, _opentracing_context in updates: - if device_id == master_key_by_user.get(user_id, {}).get("pubkey", None): result = cross_signing_keys_by_user.setdefault(user_id, {}) result["master_key"] = master_key_by_user[user_id]["key_info"] - elif device_id == self_signing_key_by_user.get(user_id, {}).get( - "pubkey", None + elif ( + user_id in master_key_by_user + and device_id == self_signing_key_by_user[user_id]["device_id"] ): result = cross_signing_keys_by_user.setdefault(user_id, {}) result["self_signing_key"] = self_signing_key_by_user[user_id][ "key_info" ] + else: + key = (user_id, device_id) - cross_signing_results = [] + previous_update_stream_id, _ = query_map.get(key, (0, None)) - # add the updated cross-signing keys to the results list - for user_id, result in iteritems(cross_signing_keys_by_user): - result["user_id"] = user_id - # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec - cross_signing_results.append(("org.matrix.signing_key_update", result)) + if update_stream_id > previous_update_stream_id: + query_map[key] = (update_stream_id, update_context) + + # If we didn't find any updates with a stream_id lower than the cutoff, it + # means that there are more than limit updates all of which have the same + # steam_id. # That should only happen if a client is spamming the server with new # devices, in which case E2E isn't going to work well anyway. We'll just # skip that stream_id and return an empty list, and continue with the next # stream_id next time. - if not query_map and not cross_signing_results: + if not query_map and not cross_signing_keys_by_user: return stream_id_cutoff, [] results = yield self._get_device_update_edus_by_remote( destination, from_stream_id, query_map ) - results.extend(cross_signing_results) + + # add the updated cross-signing keys to the results list + for user_id, result in iteritems(cross_signing_keys_by_user): + result["user_id"] = user_id + # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec + results.append(("org.matrix.signing_key_update", result)) return now_stream_id, results From bc32f102cd1144923581771b0cc84ead4d99cefb Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Wed, 30 Oct 2019 10:07:36 -0400 Subject: [PATCH 13/15] black --- synapse/handlers/e2e_keys.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 0f320b3764..1ab471b3be 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -1160,7 +1160,9 @@ class SigningKeyEduUpdater(object): yield self.store.set_e2e_cross_signing_key( user_id, "self_signing", self_signing_key ) - _, verify_key = get_verify_key_from_cross_signing_key(self_signing_key) + _, verify_key = get_verify_key_from_cross_signing_key( + self_signing_key + ) device_ids.append(verify_key.version) yield device_handler.notify_device_update(user_id, device_ids) From bb6cec27a5ac6d5d6d5f67df21610a63745ac0a9 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Wed, 30 Oct 2019 14:57:34 -0400 Subject: [PATCH 14/15] rename get_devices_by_remote to get_device_updates_by_remote --- synapse/federation/sender/per_destination_queue.py | 4 ++-- synapse/storage/data_stores/main/devices.py | 8 ++++---- tests/handlers/test_typing.py | 4 ++-- tests/storage/test_devices.py | 12 ++++++------ 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index d5d4a60c88..6e3012cd41 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -359,7 +359,7 @@ class PerDestinationQueue(object): last_device_list = self._last_device_list_stream_id # Retrieve list of new device updates to send to the destination - now_stream_id, results = yield self._store.get_devices_by_remote( + now_stream_id, results = yield self._store.get_device_updates_by_remote( self._destination, last_device_list, limit=limit ) edus = [ @@ -372,7 +372,7 @@ class PerDestinationQueue(object): for (edu_type, content) in results ] - assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs" + assert len(edus) <= limit, "get_device_updates_by_remote returned too many EDUs" return (edus, now_stream_id) diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index 0b12bc58c4..717eab4159 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -91,7 +91,7 @@ class DeviceWorkerStore(SQLBaseStore): @trace @defer.inlineCallbacks - def get_devices_by_remote(self, destination, from_stream_id, limit): + def get_device_updates_by_remote(self, destination, from_stream_id, limit): """Get a stream of device updates to send to the given remote server. Args: @@ -123,8 +123,8 @@ class DeviceWorkerStore(SQLBaseStore): # stream_id; the rationale being that such a large device list update # is likely an error. updates = yield self.runInteraction( - "get_devices_by_remote", - self._get_devices_by_remote_txn, + "get_device_updates_by_remote", + self._get_device_updates_by_remote_txn, destination, from_stream_id, now_stream_id, @@ -241,7 +241,7 @@ class DeviceWorkerStore(SQLBaseStore): return now_stream_id, results - def _get_devices_by_remote_txn( + def _get_device_updates_by_remote_txn( self, txn, destination, from_stream_id, now_stream_id, limit ): """Return device update information for a given remote destination diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index f360c8e965..5ec568f4e6 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -73,7 +73,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): "get_received_txn_response", "set_received_txn_response", "get_destination_retry_timings", - "get_devices_by_remote", + "get_device_updates_by_remote", # Bits that user_directory needs "get_user_directory_stream_pos", "get_current_state_deltas", @@ -109,7 +109,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): retry_timings_res ) - self.datastore.get_devices_by_remote.return_value = (0, []) + self.datastore.get_device_updates_by_remote.return_value = (0, []) def get_received_txn_response(*args): return defer.succeed(None) diff --git a/tests/storage/test_devices.py b/tests/storage/test_devices.py index 039cc79357..6f8d990959 100644 --- a/tests/storage/test_devices.py +++ b/tests/storage/test_devices.py @@ -72,7 +72,7 @@ class DeviceStoreTestCase(tests.unittest.TestCase): ) @defer.inlineCallbacks - def test_get_devices_by_remote(self): + def test_get_device_updates_by_remote(self): device_ids = ["device_id1", "device_id2"] # Add two device updates with a single stream_id @@ -81,7 +81,7 @@ class DeviceStoreTestCase(tests.unittest.TestCase): ) # Get all device updates ever meant for this remote - now_stream_id, device_updates = yield self.store.get_devices_by_remote( + now_stream_id, device_updates = yield self.store.get_device_updates_by_remote( "somehost", -1, limit=100 ) @@ -89,7 +89,7 @@ class DeviceStoreTestCase(tests.unittest.TestCase): self._check_devices_in_updates(device_ids, device_updates) @defer.inlineCallbacks - def test_get_devices_by_remote_limited(self): + def test_get_device_updates_by_remote_limited(self): # Test breaking the update limit in 1, 101, and 1 device_id segments # first add one device @@ -115,20 +115,20 @@ class DeviceStoreTestCase(tests.unittest.TestCase): # # first we should get a single update - now_stream_id, device_updates = yield self.store.get_devices_by_remote( + now_stream_id, device_updates = yield self.store.get_device_updates_by_remote( "someotherhost", -1, limit=100 ) self._check_devices_in_updates(device_ids1, device_updates) # Then we should get an empty list back as the 101 devices broke the limit - now_stream_id, device_updates = yield self.store.get_devices_by_remote( + now_stream_id, device_updates = yield self.store.get_device_updates_by_remote( "someotherhost", now_stream_id, limit=100 ) self.assertEqual(len(device_updates), 0) # The 101 devices should've been cleared, so we should now just get one device # update - now_stream_id, device_updates = yield self.store.get_devices_by_remote( + now_stream_id, device_updates = yield self.store.get_device_updates_by_remote( "someotherhost", now_stream_id, limit=100 ) self._check_devices_in_updates(device_ids3, device_updates) From c3fc176c6047c8194262a64599d000e9cb43f7f8 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Thu, 31 Oct 2019 22:49:48 -0400 Subject: [PATCH 15/15] Update synapse/storage/data_stores/main/devices.py Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/storage/data_stores/main/devices.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index 717eab4159..71f62036c0 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -203,7 +203,7 @@ class DeviceWorkerStore(SQLBaseStore): result = cross_signing_keys_by_user.setdefault(user_id, {}) result["master_key"] = master_key_by_user[user_id]["key_info"] elif ( - user_id in master_key_by_user + user_id in self_signing_key_by_user and device_id == self_signing_key_by_user[user_id]["device_id"] ): result = cross_signing_keys_by_user.setdefault(user_id, {})