diff --git a/changelog.d/17765.misc b/changelog.d/17765.misc new file mode 100644 index 0000000000..af4e5c85ea --- /dev/null +++ b/changelog.d/17765.misc @@ -0,0 +1 @@ +Increase performance of the notifier when there are many syncing users. diff --git a/synapse/notifier.py b/synapse/notifier.py index 7a2b54036c..744cbddfa3 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -63,6 +63,7 @@ from synapse.types import ( ) from synapse.util.async_helpers import ObservableDeferred, timeout_deferred from synapse.util.metrics import Measure +from synapse.util.stringutils import shortstr from synapse.visibility import filter_events_for_client if TYPE_CHECKING: @@ -120,14 +121,13 @@ class _NotifierUserStream: ): self.user_id = user_id self.rooms = set(rooms) - self.current_token = current_token # The last token for which we should wake up any streams that have a # token that comes before it. This gets updated every time we get poked. # We start it at the current token since if we get any streams # that have a token from before we have no idea whether they should be # woken up or not, so lets just wake them up. - self.last_notified_token = current_token + self.current_token = current_token self.last_notified_ms = time_now_ms self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred( @@ -136,33 +136,19 @@ class _NotifierUserStream: def notify( self, - stream_key: StreamKeyType, - stream_id: Union[int, RoomStreamToken, MultiWriterStreamToken], + current_token: StreamToken, time_now_ms: int, ) -> None: """Notify any listeners for this user of a new event from an event source. Args: - stream_key: The stream the event came from. - stream_id: The new id for the stream the event came from. + current_token: The new current token. time_now_ms: The current time in milliseconds. """ - self.current_token = self.current_token.copy_and_advance(stream_key, stream_id) - self.last_notified_token = self.current_token + self.current_token = current_token self.last_notified_ms = time_now_ms notify_deferred = self.notify_deferred - log_kv( - { - "notify": self.user_id, - "stream": stream_key, - "stream_id": stream_id, - "listeners": self.count_listeners(), - } - ) - - users_woken_by_stream_counter.labels(stream_key).inc() - with PreserveLoggingContext(): self.notify_deferred = ObservableDeferred(defer.Deferred()) notify_deferred.callback(self.current_token) @@ -191,7 +177,7 @@ class _NotifierUserStream: """ # Immediately wake up stream if something has already since happened # since their last token. - if self.last_notified_token != token: + if self.current_token != token: return _NotificationListener(defer.succeed(self.current_token)) else: return _NotificationListener(self.notify_deferred.observe()) @@ -342,14 +328,17 @@ class Notifier: # Wake up all related user stream notifiers user_streams = self.room_to_user_streams.get(room_id, set()) time_now_ms = self.clock.time_msec() + current_token = self.event_sources.get_current_token() for user_stream in user_streams: try: - user_stream.notify( - StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms - ) + user_stream.notify(current_token, time_now_ms) except Exception: logger.exception("Failed to notify listener") + users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc( + len(user_streams) + ) + # Poke the replication so that other workers also see the write to # the un-partial-stated rooms stream. self.notify_replication() @@ -519,12 +508,16 @@ class Notifier: rooms = rooms or [] with Measure(self.clock, "on_new_event"): - user_streams = set() + user_streams: Set[_NotifierUserStream] = set() log_kv( { "waking_up_explicit_users": len(users), "waking_up_explicit_rooms": len(rooms), + "users": shortstr(users), + "rooms": shortstr(rooms), + "stream": stream_key, + "stream_id": new_token, } ) @@ -544,12 +537,15 @@ class Notifier: ) time_now_ms = self.clock.time_msec() + current_token = self.event_sources.get_current_token() for user_stream in user_streams: try: - user_stream.notify(stream_key, new_token, time_now_ms) + user_stream.notify(current_token, time_now_ms) except Exception: logger.exception("Failed to notify listener") + users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams)) + self.notify_replication() # Notify appservices. diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 63df31ec75..c52a5b2e79 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -282,22 +282,33 @@ class SyncTypingTests(unittest.HomeserverTestCase): self.assertEqual(200, channel.code) next_batch = channel.json_body["next_batch"] - # This should time out! But it does not, because our stream token is - # ahead, and therefore it's saying the typing (that we've actually - # already seen) is new, since it's got a token above our new, now-reset - # stream token. - channel = self.make_request("GET", sync_url % (access_token, next_batch)) - self.assertEqual(200, channel.code) - next_batch = channel.json_body["next_batch"] - # Clear the typing information, so that it doesn't think everything is - # in the future. + # in the future. This happens automatically when the typing stream + # resets. typing._reset() - # Now it SHOULD fail as it never completes! + # Nothing new, so we time out. with self.assertRaises(TimedOutException): self.make_request("GET", sync_url % (access_token, next_batch)) + # Sync and start typing again. + sync_channel = self.make_request( + "GET", sync_url % (access_token, next_batch), await_result=False + ) + self.assertFalse(sync_channel.is_finished()) + + channel = self.make_request( + "PUT", + typing_url % (room, other_user_id, other_access_token), + b'{"typing": true, "timeout": 30000}', + ) + self.assertEqual(200, channel.code) + + # Sync should now return. + sync_channel.await_result() + self.assertEqual(200, sync_channel.code) + next_batch = sync_channel.json_body["next_batch"] + class SyncKnockTestCase(KnockingStrippedStateEventHelperMixin): servlets = [