From a3f8ec284a766883f4fadabbe91bc89ca7ca2a50 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Sep 2024 22:55:45 +0100 Subject: [PATCH 1/5] Move metrics out of hot path We can update the counter once outside of the loop. --- synapse/notifier.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 7a2b54036c..e28c6eddb9 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -63,6 +63,7 @@ from synapse.types import ( ) from synapse.util.async_helpers import ObservableDeferred, timeout_deferred from synapse.util.metrics import Measure +from synapse.util.stringutils import shortstr from synapse.visibility import filter_events_for_client if TYPE_CHECKING: @@ -152,17 +153,6 @@ class _NotifierUserStream: self.last_notified_ms = time_now_ms notify_deferred = self.notify_deferred - log_kv( - { - "notify": self.user_id, - "stream": stream_key, - "stream_id": stream_id, - "listeners": self.count_listeners(), - } - ) - - users_woken_by_stream_counter.labels(stream_key).inc() - with PreserveLoggingContext(): self.notify_deferred = ObservableDeferred(defer.Deferred()) notify_deferred.callback(self.current_token) @@ -350,6 +340,10 @@ class Notifier: except Exception: logger.exception("Failed to notify listener") + users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc( + len(user_streams) + ) + # Poke the replication so that other workers also see the write to # the un-partial-stated rooms stream. self.notify_replication() @@ -519,12 +513,16 @@ class Notifier: rooms = rooms or [] with Measure(self.clock, "on_new_event"): - user_streams = set() + user_streams: Set[_NotifierUserStream] = set() log_kv( { "waking_up_explicit_users": len(users), "waking_up_explicit_rooms": len(rooms), + "users": shortstr(users), + "rooms": shortstr(rooms), + "stream": stream_key, + "stream_id": new_token, } ) @@ -550,6 +548,8 @@ class Notifier: except Exception: logger.exception("Failed to notify listener") + users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams)) + self.notify_replication() # Notify appservices. From 9d3e8d7fcd2fda0cbbe0a85990267fd0a553c9ac Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Sep 2024 09:06:50 +0100 Subject: [PATCH 2/5] Calculate new current token once Turns out doing `.copy_and_advance` can be expensive --- synapse/notifier.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index e28c6eddb9..9d72fd4190 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -121,14 +121,13 @@ class _NotifierUserStream: ): self.user_id = user_id self.rooms = set(rooms) - self.current_token = current_token # The last token for which we should wake up any streams that have a # token that comes before it. This gets updated every time we get poked. # We start it at the current token since if we get any streams # that have a token from before we have no idea whether they should be # woken up or not, so lets just wake them up. - self.last_notified_token = current_token + self.current_token = current_token self.last_notified_ms = time_now_ms self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred( @@ -137,8 +136,7 @@ class _NotifierUserStream: def notify( self, - stream_key: StreamKeyType, - stream_id: Union[int, RoomStreamToken, MultiWriterStreamToken], + current_token: StreamToken, time_now_ms: int, ) -> None: """Notify any listeners for this user of a new event from an @@ -148,8 +146,7 @@ class _NotifierUserStream: stream_id: The new id for the stream the event came from. time_now_ms: The current time in milliseconds. """ - self.current_token = self.current_token.copy_and_advance(stream_key, stream_id) - self.last_notified_token = self.current_token + self.current_token = current_token self.last_notified_ms = time_now_ms notify_deferred = self.notify_deferred @@ -181,7 +178,7 @@ class _NotifierUserStream: """ # Immediately wake up stream if something has already since happened # since their last token. - if self.last_notified_token != token: + if self.current_token != token: return _NotificationListener(defer.succeed(self.current_token)) else: return _NotificationListener(self.notify_deferred.observe()) @@ -332,11 +329,10 @@ class Notifier: # Wake up all related user stream notifiers user_streams = self.room_to_user_streams.get(room_id, set()) time_now_ms = self.clock.time_msec() + current_token = self.event_sources.get_current_token() for user_stream in user_streams: try: - user_stream.notify( - StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms - ) + user_stream.notify(current_token, time_now_ms) except Exception: logger.exception("Failed to notify listener") @@ -542,9 +538,10 @@ class Notifier: ) time_now_ms = self.clock.time_msec() + current_token = self.event_sources.get_current_token() for user_stream in user_streams: try: - user_stream.notify(stream_key, new_token, time_now_ms) + user_stream.notify(current_token, time_now_ms) except Exception: logger.exception("Failed to notify listener") From 6452e22e8ff863ad1add4c04e8cc373439112a6c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Sep 2024 09:53:33 +0100 Subject: [PATCH 3/5] Newsfile --- changelog.d/17765.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17765.misc diff --git a/changelog.d/17765.misc b/changelog.d/17765.misc new file mode 100644 index 0000000000..af4e5c85ea --- /dev/null +++ b/changelog.d/17765.misc @@ -0,0 +1 @@ +Increase performance of the notifier when there are many syncing users. From e85f8f77c2d72e2386a48b2cba4cb0ff4e818c19 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Sep 2024 10:19:42 +0100 Subject: [PATCH 4/5] Fix typing tests --- tests/rest/client/test_sync.py | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 63df31ec75..c52a5b2e79 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -282,22 +282,33 @@ class SyncTypingTests(unittest.HomeserverTestCase): self.assertEqual(200, channel.code) next_batch = channel.json_body["next_batch"] - # This should time out! But it does not, because our stream token is - # ahead, and therefore it's saying the typing (that we've actually - # already seen) is new, since it's got a token above our new, now-reset - # stream token. - channel = self.make_request("GET", sync_url % (access_token, next_batch)) - self.assertEqual(200, channel.code) - next_batch = channel.json_body["next_batch"] - # Clear the typing information, so that it doesn't think everything is - # in the future. + # in the future. This happens automatically when the typing stream + # resets. typing._reset() - # Now it SHOULD fail as it never completes! + # Nothing new, so we time out. with self.assertRaises(TimedOutException): self.make_request("GET", sync_url % (access_token, next_batch)) + # Sync and start typing again. + sync_channel = self.make_request( + "GET", sync_url % (access_token, next_batch), await_result=False + ) + self.assertFalse(sync_channel.is_finished()) + + channel = self.make_request( + "PUT", + typing_url % (room, other_user_id, other_access_token), + b'{"typing": true, "timeout": 30000}', + ) + self.assertEqual(200, channel.code) + + # Sync should now return. + sync_channel.await_result() + self.assertEqual(200, sync_channel.code) + next_batch = sync_channel.json_body["next_batch"] + class SyncKnockTestCase(KnockingStrippedStateEventHelperMixin): servlets = [ From 210f76f4b97b856a9a5c6d79e0ee10b5a0e2b270 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Sep 2024 10:23:37 +0100 Subject: [PATCH 5/5] Fix up doc string --- synapse/notifier.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 9d72fd4190..744cbddfa3 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -142,8 +142,7 @@ class _NotifierUserStream: """Notify any listeners for this user of a new event from an event source. Args: - stream_key: The stream the event came from. - stream_id: The new id for the stream the event came from. + current_token: The new current token. time_now_ms: The current time in milliseconds. """ self.current_token = current_token