diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 43f2986f89..74d7ac8a67 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2970,7 +2970,7 @@ class FederationHandler(BaseHandler): event, event_stream_id, max_stream_id, extra_users=extra_users ) - await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) + await self.pusher_pool.on_new_notifications(max_stream_id) async def _clean_room_for_join(self, room_id: str) -> None: """Called to clean up any data in DB for a given room, ready for the diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 8a7b4916cd..d1556659e3 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1145,7 +1145,7 @@ class EventCreationHandler: # If there's an expiry timestamp on the event, schedule its expiry. self._message_handler.maybe_schedule_expiry(event) - await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) + await self.pusher_pool.on_new_notifications(max_stream_id) def _notify(): try: diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index b7ea4438e0..28bd8ab748 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -91,7 +91,7 @@ class EmailPusher: pass self.timed_call = None - def on_new_notifications(self, min_stream_ordering, max_stream_ordering): + def on_new_notifications(self, max_stream_ordering): if self.max_stream_ordering: self.max_stream_ordering = max( max_stream_ordering, self.max_stream_ordering diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index f21fa9b659..26706bf3e1 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -114,7 +114,7 @@ class HttpPusher: if should_check_for_notifs: self._start_processing() - def on_new_notifications(self, min_stream_ordering, max_stream_ordering): + def on_new_notifications(self, max_stream_ordering): self.max_stream_ordering = max( max_stream_ordering, self.max_stream_ordering or 0 ) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 3c3262a88c..fa8473bf8d 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -64,6 +64,12 @@ class PusherPool: self._pusher_shard_config = hs.config.push.pusher_shard_config self._instance_name = hs.get_instance_name() + # Record the last stream ID that we were poked about so we can get + # changes since then. We set this to the current max stream ID on + # startup as every individual pusher will have checked for changes on + # startup. + self._last_room_stream_id_seen = self.store.get_room_max_stream_ordering() + # map from user id to app_id:pushkey to pusher self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]] @@ -178,20 +184,27 @@ class PusherPool: ) await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"]) - async def on_new_notifications(self, min_stream_id, max_stream_id): + async def on_new_notifications(self, max_stream_id): if not self.pushers: # nothing to do here. return + if max_stream_id < self._last_room_stream_id_seen: + # Nothing to do + return + + prev_stream_id = self._last_room_stream_id_seen + self._last_room_stream_id_seen = max_stream_id + try: users_affected = await self.store.get_push_action_users_in_range( - min_stream_id, max_stream_id + prev_stream_id, max_stream_id ) for u in users_affected: if u in self.pushers: for p in self.pushers[u].values(): - p.on_new_notifications(min_stream_id, max_stream_id) + p.on_new_notifications(max_stream_id) except Exception: logger.exception("Exception in pusher on_new_notifications") diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index d6ecf5b327..ccd3147dfd 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -154,7 +154,8 @@ class ReplicationDataHandler: max_token = self.store.get_room_max_stream_ordering() self.notifier.on_new_room_event(event, token, max_token, extra_users) - await self.pusher_pool.on_new_notifications(token, token) + max_token = self.store.get_room_max_stream_ordering() + await self.pusher_pool.on_new_notifications(max_token) # Notify any waiting deferreds. The list is ordered by position so we # just iterate through the list until we reach a position that is diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index ae6bc24f4c..f306a09bfa 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -80,6 +80,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): "get_user_directory_stream_pos", "get_current_state_deltas", "get_device_updates_by_remote", + "get_room_max_stream_ordering", ] )