Be smarter about which hosts to send presence to when processing room joins (#9402)
This PR attempts to eliminate unnecessary presence sending work when your local server joins a room, or when a remote server joins a room your server is participating in by processing state deltas in chunks rather than individually. --- When your server joins a room for the first time, it requests the historical state as well. This chunk of new state is passed to the presence handler which, after filtering that state down to only membership joins, will send presence updates to homeservers for each join processed. It turns out that we were being a bit naive and processing each event individually, and sending out presence updates for every one of those joins. Even if many different joins were users on the same server (hello IRC bridges), we'd send presence to that same homeserver for every remote user join we saw. This PR attempts to deduplicate all of that by processing the entire batch of state deltas at once, instead of only doing each join individually. We process the joins and note down which servers need which presence: * If it was a local user join, send that user's latest presence to all servers in the room * If it was a remote user join, send the presence for all local users in the room to that homeserver We deduplicate by inserting all of those pending updates into a dictionary of the form: ``` { server_name1: {presence_update1, ...}, server_name2: {presence_update1, presence_update2, ...} } ``` Only after building this dict do we then start sending out presence updates.
This commit is contained in:
parent
13e9029f44
commit
8bcfc2eaad
|
@ -0,0 +1 @@
|
|||
Fix a bug where a lot of unnecessary presence updates were sent when joining a room.
|
|
@ -474,7 +474,7 @@ class FederationSender:
|
|||
self._processing_pending_presence = False
|
||||
|
||||
def send_presence_to_destinations(
|
||||
self, states: List[UserPresenceState], destinations: List[str]
|
||||
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
|
||||
) -> None:
|
||||
"""Send the given presence states to the given destinations.
|
||||
destinations (list[str])
|
||||
|
|
|
@ -849,6 +849,9 @@ class PresenceHandler(BasePresenceHandler):
|
|||
"""Process current state deltas to find new joins that need to be
|
||||
handled.
|
||||
"""
|
||||
# A map of destination to a set of user state that they should receive
|
||||
presence_destinations = {} # type: Dict[str, Set[UserPresenceState]]
|
||||
|
||||
for delta in deltas:
|
||||
typ = delta["type"]
|
||||
state_key = delta["state_key"]
|
||||
|
@ -858,6 +861,7 @@ class PresenceHandler(BasePresenceHandler):
|
|||
|
||||
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
|
||||
|
||||
# Drop any event that isn't a membership join
|
||||
if typ != EventTypes.Member:
|
||||
continue
|
||||
|
||||
|
@ -880,13 +884,38 @@ class PresenceHandler(BasePresenceHandler):
|
|||
# Ignore changes to join events.
|
||||
continue
|
||||
|
||||
await self._on_user_joined_room(room_id, state_key)
|
||||
# Retrieve any user presence state updates that need to be sent as a result,
|
||||
# and the destinations that need to receive it
|
||||
destinations, user_presence_states = await self._on_user_joined_room(
|
||||
room_id, state_key
|
||||
)
|
||||
|
||||
async def _on_user_joined_room(self, room_id: str, user_id: str) -> None:
|
||||
# Insert the destinations and respective updates into our destinations dict
|
||||
for destination in destinations:
|
||||
presence_destinations.setdefault(destination, set()).update(
|
||||
user_presence_states
|
||||
)
|
||||
|
||||
# Send out user presence updates for each destination
|
||||
for destination, user_state_set in presence_destinations.items():
|
||||
self.federation.send_presence_to_destinations(
|
||||
destinations=[destination], states=user_state_set
|
||||
)
|
||||
|
||||
async def _on_user_joined_room(
|
||||
self, room_id: str, user_id: str
|
||||
) -> Tuple[List[str], List[UserPresenceState]]:
|
||||
"""Called when we detect a user joining the room via the current state
|
||||
delta stream.
|
||||
"""
|
||||
delta stream. Returns the destinations that need to be updated and the
|
||||
presence updates to send to them.
|
||||
|
||||
Args:
|
||||
room_id: The ID of the room that the user has joined.
|
||||
user_id: The ID of the user that has joined the room.
|
||||
|
||||
Returns:
|
||||
A tuple of destinations and presence updates to send to them.
|
||||
"""
|
||||
if self.is_mine_id(user_id):
|
||||
# If this is a local user then we need to send their presence
|
||||
# out to hosts in the room (who don't already have it)
|
||||
|
@ -894,15 +923,15 @@ class PresenceHandler(BasePresenceHandler):
|
|||
# TODO: We should be able to filter the hosts down to those that
|
||||
# haven't previously seen the user
|
||||
|
||||
state = await self.current_state_for_user(user_id)
|
||||
hosts = await self.state.get_current_hosts_in_room(room_id)
|
||||
remote_hosts = await self.state.get_current_hosts_in_room(room_id)
|
||||
|
||||
# Filter out ourselves.
|
||||
hosts = {host for host in hosts if host != self.server_name}
|
||||
filtered_remote_hosts = [
|
||||
host for host in remote_hosts if host != self.server_name
|
||||
]
|
||||
|
||||
self.federation.send_presence_to_destinations(
|
||||
states=[state], destinations=hosts
|
||||
)
|
||||
state = await self.current_state_for_user(user_id)
|
||||
return filtered_remote_hosts, [state]
|
||||
else:
|
||||
# A remote user has joined the room, so we need to:
|
||||
# 1. Check if this is a new server in the room
|
||||
|
@ -915,6 +944,8 @@ class PresenceHandler(BasePresenceHandler):
|
|||
# TODO: Check that this is actually a new server joining the
|
||||
# room.
|
||||
|
||||
remote_host = get_domain_from_id(user_id)
|
||||
|
||||
users = await self.state.get_current_users_in_room(room_id)
|
||||
user_ids = list(filter(self.is_mine_id, users))
|
||||
|
||||
|
@ -934,10 +965,7 @@ class PresenceHandler(BasePresenceHandler):
|
|||
or state.status_msg is not None
|
||||
]
|
||||
|
||||
if states:
|
||||
self.federation.send_presence_to_destinations(
|
||||
states=states, destinations=[get_domain_from_id(user_id)]
|
||||
)
|
||||
return [remote_host], states
|
||||
|
||||
|
||||
def should_notify(old_state, new_state):
|
||||
|
|
|
@ -521,7 +521,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
|
|||
)
|
||||
self.assertEqual(expected_state.state, PresenceState.ONLINE)
|
||||
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
|
||||
destinations=["server2"], states=[expected_state]
|
||||
destinations=["server2"], states={expected_state}
|
||||
)
|
||||
|
||||
#
|
||||
|
@ -533,7 +533,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
|
|||
|
||||
self.federation_sender.send_presence.assert_not_called()
|
||||
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
|
||||
destinations=["server3"], states=[expected_state]
|
||||
destinations=["server3"], states={expected_state}
|
||||
)
|
||||
|
||||
def test_remote_gets_presence_when_local_user_joins(self):
|
||||
|
@ -584,8 +584,14 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
|
|||
self.presence_handler.current_state_for_user("@test2:server")
|
||||
)
|
||||
self.assertEqual(expected_state.state, PresenceState.ONLINE)
|
||||
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
|
||||
destinations={"server2", "server3"}, states=[expected_state]
|
||||
self.assertEqual(
|
||||
self.federation_sender.send_presence_to_destinations.call_count, 2
|
||||
)
|
||||
self.federation_sender.send_presence_to_destinations.assert_any_call(
|
||||
destinations=["server3"], states={expected_state}
|
||||
)
|
||||
self.federation_sender.send_presence_to_destinations.assert_any_call(
|
||||
destinations=["server2"], states={expected_state}
|
||||
)
|
||||
|
||||
def _add_new_user(self, room_id, user_id):
|
||||
|
|
Loading…
Reference in New Issue