Spread out sending device lists to remote hosts (#12132)

This commit is contained in:
Erik Johnston 2022-03-04 11:48:15 +00:00 committed by GitHub
parent 87c230c27c
commit 423cca9efe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 79 additions and 16 deletions

View File

@ -0,0 +1 @@
Improve performance of logging in for large accounts.

View File

@ -244,7 +244,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
self.notifier.on_new_replication_data() self.notifier.on_new_replication_data()
def send_device_messages(self, destination: str) -> None: def send_device_messages(self, destination: str, immediate: bool = False) -> None:
"""As per FederationSender""" """As per FederationSender"""
# We don't need to replicate this as it gets sent down a different # We don't need to replicate this as it gets sent down a different
# stream. # stream.

View File

@ -118,7 +118,12 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def send_device_messages(self, destination: str) -> None: def send_device_messages(self, destination: str, immediate: bool = True) -> None:
"""Tells the sender that a new device message is ready to be sent to the
destination. The `immediate` flag specifies whether the messages should
be tried to be sent immediately, or whether it can be delayed for a
short while (to aid performance).
"""
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
@ -146,9 +151,8 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
@attr.s @attr.s
class _PresenceQueue: class _DestinationWakeupQueue:
"""A queue of destinations that need to be woken up due to new presence """A queue of destinations that need to be woken up due to new updates.
updates.
Staggers waking up of per destination queues to ensure that we don't attempt Staggers waking up of per destination queues to ensure that we don't attempt
to start TLS connections with many hosts all at once, leading to pinned CPU. to start TLS connections with many hosts all at once, leading to pinned CPU.
@ -175,7 +179,7 @@ class _PresenceQueue:
if not self.processing: if not self.processing:
self._handle() self._handle()
@wrap_as_background_process("_PresenceQueue.handle") @wrap_as_background_process("_DestinationWakeupQueue.handle")
async def _handle(self) -> None: async def _handle(self) -> None:
"""Background process to drain the queue.""" """Background process to drain the queue."""
@ -297,7 +301,7 @@ class FederationSender(AbstractFederationSender):
self._external_cache = hs.get_external_cache() self._external_cache = hs.get_external_cache()
self._presence_queue = _PresenceQueue(self, self.clock) self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock)
def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue: def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
"""Get or create a PerDestinationQueue for the given destination """Get or create a PerDestinationQueue for the given destination
@ -614,7 +618,7 @@ class FederationSender(AbstractFederationSender):
states, start_loop=False states, start_loop=False
) )
self._presence_queue.add_to_queue(destination) self._destination_wakeup_queue.add_to_queue(destination)
def build_and_send_edu( def build_and_send_edu(
self, self,
@ -667,7 +671,7 @@ class FederationSender(AbstractFederationSender):
else: else:
queue.send_edu(edu) queue.send_edu(edu)
def send_device_messages(self, destination: str) -> None: def send_device_messages(self, destination: str, immediate: bool = False) -> None:
if destination == self.server_name: if destination == self.server_name:
logger.warning("Not sending device update to ourselves") logger.warning("Not sending device update to ourselves")
return return
@ -677,7 +681,11 @@ class FederationSender(AbstractFederationSender):
): ):
return return
self._get_per_destination_queue(destination).attempt_new_transaction() if immediate:
self._get_per_destination_queue(destination).attempt_new_transaction()
else:
self._get_per_destination_queue(destination).mark_new_data()
self._destination_wakeup_queue.add_to_queue(destination)
def wake_destination(self, destination: str) -> None: def wake_destination(self, destination: str) -> None:
"""Called when we want to retry sending transactions to a remote. """Called when we want to retry sending transactions to a remote.

View File

@ -219,6 +219,16 @@ class PerDestinationQueue:
self._pending_edus.append(edu) self._pending_edus.append(edu)
self.attempt_new_transaction() self.attempt_new_transaction()
def mark_new_data(self) -> None:
"""Marks that the destination has new data to send, without starting a
new transaction.
If a transaction loop is already in progress then a new transcation will
be attempted when the current one finishes.
"""
self._new_data_to_send = True
def attempt_new_transaction(self) -> None: def attempt_new_transaction(self) -> None:
"""Try to start a new transaction to this destination """Try to start a new transaction to this destination

View File

@ -506,7 +506,7 @@ class DeviceHandler(DeviceWorkerHandler):
"Sending device list update notif for %r to: %r", user_id, hosts "Sending device list update notif for %r to: %r", user_id, hosts
) )
for host in hosts: for host in hosts:
self.federation_sender.send_device_messages(host) self.federation_sender.send_device_messages(host, immediate=False)
log_kv({"message": "sent device update to host", "host": host}) log_kv({"message": "sent device update to host", "host": host})
async def notify_user_signature_update( async def notify_user_signature_update(

View File

@ -380,7 +380,7 @@ class FederationSenderHandler:
# changes. # changes.
hosts = {row.entity for row in rows if not row.entity.startswith("@")} hosts = {row.entity for row in rows if not row.entity.startswith("@")}
for host in hosts: for host in hosts:
self.federation_sender.send_device_messages(host) self.federation_sender.send_device_messages(host, immediate=False)
elif stream_name == ToDeviceStream.NAME: elif stream_name == ToDeviceStream.NAME:
# The to_device stream includes stuff to be pushed to both local # The to_device stream includes stuff to be pushed to both local

View File

@ -201,9 +201,12 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
self.assertEqual(len(self.edus), 1) self.assertEqual(len(self.edus), 1)
stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D1", None) stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D1", None)
# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)
# a second call should produce no new device EDUs # a second call should produce no new device EDUs
self.hs.get_federation_sender().send_device_messages("host2") self.hs.get_federation_sender().send_device_messages("host2")
self.pump()
self.assertEqual(self.edus, []) self.assertEqual(self.edus, [])
# a second device # a second device
@ -232,6 +235,10 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
device1_signing_key = self.generate_and_upload_device_signing_key(u1, "D1") device1_signing_key = self.generate_and_upload_device_signing_key(u1, "D1")
device2_signing_key = self.generate_and_upload_device_signing_key(u1, "D2") device2_signing_key = self.generate_and_upload_device_signing_key(u1, "D2")
# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)
# expect two more edus # expect two more edus
self.assertEqual(len(self.edus), 2) self.assertEqual(len(self.edus), 2)
stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D1", stream_id) stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D1", stream_id)
@ -265,6 +272,10 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
e2e_handler.upload_signing_keys_for_user(u1, cross_signing_keys) e2e_handler.upload_signing_keys_for_user(u1, cross_signing_keys)
) )
# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)
# expect signing key update edu # expect signing key update edu
self.assertEqual(len(self.edus), 2) self.assertEqual(len(self.edus), 2)
self.assertEqual(self.edus.pop(0)["edu_type"], "m.signing_key_update") self.assertEqual(self.edus.pop(0)["edu_type"], "m.signing_key_update")
@ -284,6 +295,10 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
) )
self.assertEqual(ret["failures"], {}) self.assertEqual(ret["failures"], {})
# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)
# expect two edus, in one or two transactions. We don't know what order the # expect two edus, in one or two transactions. We don't know what order the
# devices will be updated. # devices will be updated.
self.assertEqual(len(self.edus), 2) self.assertEqual(len(self.edus), 2)
@ -307,6 +322,10 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
self.login("user", "pass", device_id="D2") self.login("user", "pass", device_id="D2")
self.login("user", "pass", device_id="D3") self.login("user", "pass", device_id="D3")
# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)
# expect three edus # expect three edus
self.assertEqual(len(self.edus), 3) self.assertEqual(len(self.edus), 3)
stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D1", None) stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D1", None)
@ -318,6 +337,10 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"]) self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"])
) )
# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)
# expect three edus, in an unknown order # expect three edus, in an unknown order
self.assertEqual(len(self.edus), 3) self.assertEqual(len(self.edus), 3)
for edu in self.edus: for edu in self.edus:
@ -350,12 +373,19 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"]) self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"])
) )
# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)
self.assertGreaterEqual(mock_send_txn.call_count, 4) self.assertGreaterEqual(mock_send_txn.call_count, 4)
# recover the server # recover the server
mock_send_txn.side_effect = self.record_transaction mock_send_txn.side_effect = self.record_transaction
self.hs.get_federation_sender().send_device_messages("host2") self.hs.get_federation_sender().send_device_messages("host2")
self.pump()
# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)
# for each device, there should be a single update # for each device, there should be a single update
self.assertEqual(len(self.edus), 3) self.assertEqual(len(self.edus), 3)
@ -390,6 +420,10 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"]) self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"])
) )
# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)
self.assertGreaterEqual(mock_send_txn.call_count, 4) self.assertGreaterEqual(mock_send_txn.call_count, 4)
# run the prune job # run the prune job
@ -401,7 +435,10 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
# recover the server # recover the server
mock_send_txn.side_effect = self.record_transaction mock_send_txn.side_effect = self.record_transaction
self.hs.get_federation_sender().send_device_messages("host2") self.hs.get_federation_sender().send_device_messages("host2")
self.pump()
# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)
# there should be a single update for this user. # there should be a single update for this user.
self.assertEqual(len(self.edus), 1) self.assertEqual(len(self.edus), 1)
@ -435,6 +472,10 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
self.login("user", "pass", device_id="D2") self.login("user", "pass", device_id="D2")
self.login("user", "pass", device_id="D3") self.login("user", "pass", device_id="D3")
# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)
# delete them again # delete them again
self.get_success( self.get_success(
self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"]) self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"])
@ -451,7 +492,10 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
# recover the server # recover the server
mock_send_txn.side_effect = self.record_transaction mock_send_txn.side_effect = self.record_transaction
self.hs.get_federation_sender().send_device_messages("host2") self.hs.get_federation_sender().send_device_messages("host2")
self.pump()
# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)
# ... and we should get a single update for this user. # ... and we should get a single update for this user.
self.assertEqual(len(self.edus), 1) self.assertEqual(len(self.edus), 1)