Fixup pusher pool notifications
This commit is contained in:
parent
453dfe210b
commit
e7fd336a53
|
@ -2970,7 +2970,7 @@ class FederationHandler(BaseHandler):
|
||||||
event, event_stream_id, max_stream_id, extra_users=extra_users
|
event, event_stream_id, max_stream_id, extra_users=extra_users
|
||||||
)
|
)
|
||||||
|
|
||||||
await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
|
await self.pusher_pool.on_new_notifications(max_stream_id)
|
||||||
|
|
||||||
async def _clean_room_for_join(self, room_id: str) -> None:
|
async def _clean_room_for_join(self, room_id: str) -> None:
|
||||||
"""Called to clean up any data in DB for a given room, ready for the
|
"""Called to clean up any data in DB for a given room, ready for the
|
||||||
|
|
|
@ -1145,7 +1145,7 @@ class EventCreationHandler:
|
||||||
# If there's an expiry timestamp on the event, schedule its expiry.
|
# If there's an expiry timestamp on the event, schedule its expiry.
|
||||||
self._message_handler.maybe_schedule_expiry(event)
|
self._message_handler.maybe_schedule_expiry(event)
|
||||||
|
|
||||||
await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
|
await self.pusher_pool.on_new_notifications(max_stream_id)
|
||||||
|
|
||||||
def _notify():
|
def _notify():
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -91,7 +91,7 @@ class EmailPusher:
|
||||||
pass
|
pass
|
||||||
self.timed_call = None
|
self.timed_call = None
|
||||||
|
|
||||||
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
|
def on_new_notifications(self, max_stream_ordering):
|
||||||
if self.max_stream_ordering:
|
if self.max_stream_ordering:
|
||||||
self.max_stream_ordering = max(
|
self.max_stream_ordering = max(
|
||||||
max_stream_ordering, self.max_stream_ordering
|
max_stream_ordering, self.max_stream_ordering
|
||||||
|
|
|
@ -114,7 +114,7 @@ class HttpPusher:
|
||||||
if should_check_for_notifs:
|
if should_check_for_notifs:
|
||||||
self._start_processing()
|
self._start_processing()
|
||||||
|
|
||||||
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
|
def on_new_notifications(self, max_stream_ordering):
|
||||||
self.max_stream_ordering = max(
|
self.max_stream_ordering = max(
|
||||||
max_stream_ordering, self.max_stream_ordering or 0
|
max_stream_ordering, self.max_stream_ordering or 0
|
||||||
)
|
)
|
||||||
|
|
|
@ -64,6 +64,12 @@ class PusherPool:
|
||||||
self._pusher_shard_config = hs.config.push.pusher_shard_config
|
self._pusher_shard_config = hs.config.push.pusher_shard_config
|
||||||
self._instance_name = hs.get_instance_name()
|
self._instance_name = hs.get_instance_name()
|
||||||
|
|
||||||
|
# Record the last stream ID that we were poked about so we can get
|
||||||
|
# changes since then. We set this to the current max stream ID on
|
||||||
|
# startup as every individual pusher will have checked for changes on
|
||||||
|
# startup.
|
||||||
|
self._last_room_stream_id_seen = self.store.get_room_max_stream_ordering()
|
||||||
|
|
||||||
# map from user id to app_id:pushkey to pusher
|
# map from user id to app_id:pushkey to pusher
|
||||||
self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
|
self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
|
||||||
|
|
||||||
|
@ -178,20 +184,27 @@ class PusherPool:
|
||||||
)
|
)
|
||||||
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
|
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
|
||||||
|
|
||||||
async def on_new_notifications(self, min_stream_id, max_stream_id):
|
async def on_new_notifications(self, max_stream_id):
|
||||||
if not self.pushers:
|
if not self.pushers:
|
||||||
# nothing to do here.
|
# nothing to do here.
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if max_stream_id < self._last_room_stream_id_seen:
|
||||||
|
# Nothing to do
|
||||||
|
return
|
||||||
|
|
||||||
|
prev_stream_id = self._last_room_stream_id_seen
|
||||||
|
self._last_room_stream_id_seen = max_stream_id
|
||||||
|
|
||||||
try:
|
try:
|
||||||
users_affected = await self.store.get_push_action_users_in_range(
|
users_affected = await self.store.get_push_action_users_in_range(
|
||||||
min_stream_id, max_stream_id
|
prev_stream_id, max_stream_id
|
||||||
)
|
)
|
||||||
|
|
||||||
for u in users_affected:
|
for u in users_affected:
|
||||||
if u in self.pushers:
|
if u in self.pushers:
|
||||||
for p in self.pushers[u].values():
|
for p in self.pushers[u].values():
|
||||||
p.on_new_notifications(min_stream_id, max_stream_id)
|
p.on_new_notifications(max_stream_id)
|
||||||
|
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Exception in pusher on_new_notifications")
|
logger.exception("Exception in pusher on_new_notifications")
|
||||||
|
|
|
@ -154,7 +154,8 @@ class ReplicationDataHandler:
|
||||||
max_token = self.store.get_room_max_stream_ordering()
|
max_token = self.store.get_room_max_stream_ordering()
|
||||||
self.notifier.on_new_room_event(event, token, max_token, extra_users)
|
self.notifier.on_new_room_event(event, token, max_token, extra_users)
|
||||||
|
|
||||||
await self.pusher_pool.on_new_notifications(token, token)
|
max_token = self.store.get_room_max_stream_ordering()
|
||||||
|
await self.pusher_pool.on_new_notifications(max_token)
|
||||||
|
|
||||||
# Notify any waiting deferreds. The list is ordered by position so we
|
# Notify any waiting deferreds. The list is ordered by position so we
|
||||||
# just iterate through the list until we reach a position that is
|
# just iterate through the list until we reach a position that is
|
||||||
|
|
|
@ -80,6 +80,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
|
||||||
"get_user_directory_stream_pos",
|
"get_user_directory_stream_pos",
|
||||||
"get_current_state_deltas",
|
"get_current_state_deltas",
|
||||||
"get_device_updates_by_remote",
|
"get_device_updates_by_remote",
|
||||||
|
"get_room_max_stream_ordering",
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue