Speed up sliding sync by avoiding copies (#17670)

We ended up spending ~10% CPU creating a new dictionary and
`_RoomMembershipForUser`, so let's avoid creating new dicts and copying
by returning `newly_joined`, `newly_left` and `is_dm` as sets directly.

---------

Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
This commit is contained in:
Erik Johnston 2024-09-06 11:12:29 +01:00 committed by GitHub
parent de3363ef58
commit d5accec2e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 296 additions and 262 deletions

1
changelog.d/17670.misc Normal file
View File

@ -0,0 +1 @@
Small performance improvement in speeding up sliding sync.

View File

@ -25,8 +25,8 @@ from synapse.events.utils import strip_event
from synapse.handlers.relations import BundledAggregations
from synapse.handlers.sliding_sync.extensions import SlidingSyncExtensionHandler
from synapse.handlers.sliding_sync.room_lists import (
RoomsForUserType,
SlidingSyncRoomLists,
_RoomMembershipForUser,
)
from synapse.handlers.sliding_sync.store import SlidingSyncConnectionStore
from synapse.logging.opentracing import (
@ -39,7 +39,9 @@ from synapse.logging.opentracing import (
)
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.databases.main.stream import PaginateFunction
from synapse.storage.roommember import MemberSummary
from synapse.storage.roommember import (
MemberSummary,
)
from synapse.types import (
JsonDict,
PersistedEventPosition,
@ -255,6 +257,8 @@ class SlidingSyncHandler:
],
from_token=from_token,
to_token=to_token,
newly_joined=room_id in interested_rooms.newly_joined_rooms,
is_dm=room_id in interested_rooms.dm_room_ids,
)
# Filter out empty room results during incremental sync
@ -352,7 +356,7 @@ class SlidingSyncHandler:
async def get_current_state_ids_at(
self,
room_id: str,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
room_membership_for_user_at_to_token: RoomsForUserType,
state_filter: StateFilter,
to_token: StreamToken,
) -> StateMap[str]:
@ -417,7 +421,7 @@ class SlidingSyncHandler:
async def get_current_state_at(
self,
room_id: str,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
room_membership_for_user_at_to_token: RoomsForUserType,
state_filter: StateFilter,
to_token: StreamToken,
) -> StateMap[EventBase]:
@ -457,9 +461,11 @@ class SlidingSyncHandler:
new_connection_state: "MutablePerConnectionState",
room_id: str,
room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
room_membership_for_user_at_to_token: RoomsForUserType,
from_token: Optional[SlidingSyncStreamToken],
to_token: StreamToken,
newly_joined: bool,
is_dm: bool,
) -> SlidingSyncResult.RoomResult:
"""
Fetch room data for the sync response.
@ -475,6 +481,8 @@ class SlidingSyncHandler:
in the room at the time of `to_token`.
from_token: The point in the stream to sync from.
to_token: The point in the stream to sync up to.
newly_joined: If the user has newly joined the room
is_dm: Whether the room is a DM room
"""
user = sync_config.user
@ -519,7 +527,7 @@ class SlidingSyncHandler:
from_bound = None
initial = True
ignore_timeline_bound = False
if from_token and not room_membership_for_user_at_to_token.newly_joined:
if from_token and not newly_joined:
room_status = previous_connection_state.rooms.have_sent_room(room_id)
if room_status.status == HaveSentRoomFlag.LIVE:
from_bound = from_token.stream_token.room_key
@ -1044,7 +1052,7 @@ class SlidingSyncHandler:
name=room_name,
avatar=room_avatar,
heroes=heroes,
is_dm=room_membership_for_user_at_to_token.is_dm,
is_dm=is_dm,
initial=initial,
required_state=list(required_room_state.values()),
timeline_events=timeline_events,

View File

@ -19,7 +19,6 @@ from itertools import chain
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Dict,
List,
Literal,
@ -48,7 +47,11 @@ from synapse.storage.databases.main.state import (
Sentinel as StateSentinel,
)
from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
from synapse.storage.roommember import RoomsForUser, RoomsForUserSlidingSync
from synapse.storage.roommember import (
RoomsForUser,
RoomsForUserSlidingSync,
RoomsForUserStateReset,
)
from synapse.types import (
MutableStateMap,
PersistedEventPosition,
@ -76,6 +79,11 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# Helper definition for the types that we might return. We do this to avoid
# copying data between types (which can be expensive for many rooms).
RoomsForUserType = Union[RoomsForUserStateReset, RoomsForUser, RoomsForUserSlidingSync]
@attr.s(auto_attribs=True, slots=True, frozen=True)
class SlidingSyncInterestedRooms:
"""The set of rooms and metadata a client is interested in based on their
@ -91,13 +99,22 @@ class SlidingSyncInterestedRooms:
includes the rooms that *may* have relevant updates. Rooms not
in this map will definitely not have room updates (though
extensions may have updates in these rooms).
newly_joined_rooms: The set of rooms that were joined in the token range
and the user is still joined to at the end of this range.
newly_left_rooms: The set of rooms that we left in the token range
and are still "leave" at the end of this range.
dm_room_ids: The set of rooms the user consider as direct-message (DM) rooms
"""
lists: Mapping[str, SlidingSyncResult.SlidingWindowList]
relevant_room_map: Mapping[str, RoomSyncConfig]
relevant_rooms_to_send_map: Mapping[str, RoomSyncConfig]
all_rooms: Set[str]
room_membership_for_user_map: Mapping[str, "_RoomMembershipForUser"]
room_membership_for_user_map: Mapping[str, RoomsForUserType]
newly_joined_rooms: AbstractSet[str]
newly_left_rooms: AbstractSet[str]
dm_room_ids: AbstractSet[str]
class Sentinel(enum.Enum):
@ -106,47 +123,10 @@ class Sentinel(enum.Enum):
UNSET_SENTINEL = object()
@attr.s(slots=True, frozen=True, auto_attribs=True)
class _RoomMembershipForUser:
"""
Attributes:
room_id: The room ID of the membership event
event_id: The event ID of the membership event
event_pos: The stream position of the membership event
membership: The membership state of the user in the room
sender: The person who sent the membership event
newly_joined: Whether the user newly joined the room during the given token
range and is still joined to the room at the end of this range.
newly_left: Whether the user newly left (or kicked) the room during the given
token range and is still "leave" at the end of this range.
is_dm: Whether this user considers this room as a direct-message (DM) room
"""
room_id: str
# Optional because state resets can affect room membership without a corresponding event.
event_id: Optional[str]
# Even during a state reset which removes the user from the room, we expect this to
# be set because `current_state_delta_stream` will note the position that the reset
# happened.
event_pos: PersistedEventPosition
# Even during a state reset which removes the user from the room, we expect this to
# be set to `LEAVE` because we can make that assumption based on the situaton (see
# `get_current_state_delta_membership_changes_for_user(...)`)
membership: str
# Optional because state resets can affect room membership without a corresponding event.
sender: Optional[str]
newly_joined: bool
newly_left: bool
is_dm: bool
def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
return attr.evolve(self, **kwds)
def filter_membership_for_sync(
*,
user_id: str,
room_membership_for_user: Union[_RoomMembershipForUser, RoomsForUserSlidingSync],
room_membership_for_user: RoomsForUserType,
newly_left: bool,
) -> bool:
"""
@ -479,22 +459,10 @@ class SlidingSyncRoomLists:
relevant_room_map=relevant_room_map,
relevant_rooms_to_send_map=relevant_rooms_to_send_map,
all_rooms=all_rooms,
room_membership_for_user_map={
# FIXME: Ideally we wouldn't have to create a new
# `_RoomMembershipForUser` here and instead just return
# `newly_joined_room_ids` directly, to save CPU time.
room_id: _RoomMembershipForUser(
room_id=room_id,
event_id=membership_info.event_id,
event_pos=membership_info.event_pos,
sender=membership_info.sender,
membership=membership_info.membership,
newly_joined=room_id in newly_joined_room_ids,
newly_left=room_id in newly_left_room_map,
is_dm=room_id in dm_room_ids,
)
for room_id, membership_info in room_membership_for_user_map.items()
},
room_membership_for_user_map=room_membership_for_user_map,
newly_joined_rooms=newly_joined_room_ids,
newly_left_rooms=set(newly_left_room_map),
dm_room_ids=dm_room_ids,
)
async def _compute_interested_rooms_fallback(
@ -506,11 +474,15 @@ class SlidingSyncRoomLists:
) -> SlidingSyncInterestedRooms:
"""Fallback code when the database background updates haven't completed yet."""
room_membership_for_user_map = (
await self.get_room_membership_for_user_at_to_token(
(
room_membership_for_user_map,
newly_joined_room_ids,
newly_left_room_ids,
) = await self.get_room_membership_for_user_at_to_token(
sync_config.user, to_token, from_token
)
)
dm_room_ids = await self._get_dm_rooms_for_user(sync_config.user.to_string())
# Assemble sliding window lists
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
@ -525,6 +497,7 @@ class SlidingSyncRoomLists:
sync_room_map = await self.filter_rooms_relevant_for_sync(
user=sync_config.user,
room_membership_for_user_map=room_membership_for_user_map,
newly_left_room_ids=newly_left_room_ids,
)
for list_key, list_config in sync_config.lists.items():
@ -536,6 +509,7 @@ class SlidingSyncRoomLists:
sync_room_map,
list_config.filters,
to_token,
dm_room_ids,
)
# Find which rooms are partially stated and may need to be filtered out
@ -679,6 +653,9 @@ class SlidingSyncRoomLists:
relevant_rooms_to_send_map=relevant_rooms_to_send_map,
all_rooms=all_rooms,
room_membership_for_user_map=room_membership_for_user_map,
newly_joined_rooms=newly_joined_room_ids,
newly_left_rooms=newly_left_room_ids,
dm_room_ids=dm_room_ids,
)
async def _filter_relevant_room_to_send(
@ -755,7 +732,7 @@ class SlidingSyncRoomLists:
async def _get_rewind_changes_to_current_membership_to_token(
self,
user: UserID,
rooms_for_user: Mapping[str, Union[RoomsForUser, RoomsForUserSlidingSync]],
rooms_for_user: Mapping[str, RoomsForUserType],
to_token: StreamToken,
) -> Mapping[str, Optional[RoomsForUser]]:
"""
@ -907,7 +884,7 @@ class SlidingSyncRoomLists:
user: UserID,
to_token: StreamToken,
from_token: Optional[StreamToken],
) -> Dict[str, _RoomMembershipForUser]:
) -> Tuple[Dict[str, RoomsForUserType], AbstractSet[str], AbstractSet[str]]:
"""
Fetch room IDs that the user has had membership in (the full room list including
long-lost left rooms that will be filtered, sorted, and sliced).
@ -926,8 +903,11 @@ class SlidingSyncRoomLists:
from_token: The point in the stream to sync from.
Returns:
A dictionary of room IDs that the user has had membership in along with
A 3-tuple of:
- A dictionary of room IDs that the user has had membership in along with
membership information in that room at the time of `to_token`.
- Set of newly joined rooms
- Set of newly left rooms
"""
user_id = user.to_string()
@ -944,12 +924,14 @@ class SlidingSyncRoomLists:
# If the user has never joined any rooms before, we can just return an empty list
if not room_for_user_list:
return {}
return {}, set(), set()
# Since we fetched the users room list at some point in time after the
# tokens, we need to revert/rewind some membership changes to match the point in
# time of the `to_token`.
rooms_for_user = {room.room_id: room for room in room_for_user_list}
rooms_for_user: Dict[str, RoomsForUserType] = {
room.room_id: room for room in room_for_user_list
}
changes = await self._get_rewind_changes_to_current_membership_to_token(
user, rooms_for_user, to_token
)
@ -966,42 +948,23 @@ class SlidingSyncRoomLists:
user_id, to_token=to_token, from_token=from_token
)
dm_room_ids = await self._get_dm_rooms_for_user(user_id)
# Our working list of rooms that can show up in the sync response
sync_room_id_set = {
room_for_user.room_id: _RoomMembershipForUser(
room_id=room_for_user.room_id,
event_id=room_for_user.event_id,
event_pos=room_for_user.event_pos,
membership=room_for_user.membership,
sender=room_for_user.sender,
newly_joined=room_id in newly_joined_room_ids,
newly_left=room_id in newly_left_room_ids,
is_dm=room_id in dm_room_ids,
)
for room_id, room_for_user in rooms_for_user.items()
}
# Ensure we have entries for rooms that the user has been "state reset"
# out of. These are rooms appear in the `newly_left_rooms` map but
# aren't in the `rooms_for_user` map.
for room_id, left_event_pos in newly_left_room_ids.items():
if room_id in sync_room_id_set:
if room_id in rooms_for_user:
continue
sync_room_id_set[room_id] = _RoomMembershipForUser(
rooms_for_user[room_id] = RoomsForUserStateReset(
room_id=room_id,
event_id=None,
event_pos=left_event_pos,
membership=Membership.LEAVE,
sender=None,
newly_joined=False,
newly_left=True,
is_dm=room_id in dm_room_ids,
room_version_id=await self.store.get_room_version_id(room_id),
)
return sync_room_id_set
return rooms_for_user, newly_joined_room_ids, set(newly_left_room_ids)
@trace
async def _get_newly_joined_and_left_rooms(
@ -1009,7 +972,7 @@ class SlidingSyncRoomLists:
user_id: str,
to_token: StreamToken,
from_token: Optional[StreamToken],
) -> Tuple[StrCollection, Mapping[str, PersistedEventPosition]]:
) -> Tuple[AbstractSet[str], Mapping[str, PersistedEventPosition]]:
"""Fetch the sets of rooms that the user newly joined or left in the
given token range.
@ -1162,8 +1125,9 @@ class SlidingSyncRoomLists:
async def filter_rooms_relevant_for_sync(
self,
user: UserID,
room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
) -> Dict[str, _RoomMembershipForUser]:
room_membership_for_user_map: Dict[str, RoomsForUserType],
newly_left_room_ids: AbstractSet[str],
) -> Dict[str, RoomsForUserType]:
"""
Filter room IDs that should/can be listed for this user in the sync response (the
full room list that will be further filtered, sorted, and sliced).
@ -1184,6 +1148,7 @@ class SlidingSyncRoomLists:
Args:
user: User that is syncing
room_membership_for_user_map: Room membership for the user
newly_left_room_ids: The set of room IDs we have newly left
Returns:
A dictionary of room IDs that should be listed in the sync response along
@ -1198,7 +1163,7 @@ class SlidingSyncRoomLists:
if filter_membership_for_sync(
user_id=user_id,
room_membership_for_user=room_membership_for_user,
newly_left=room_membership_for_user.newly_left,
newly_left=room_id in newly_left_room_ids,
)
}
@ -1207,9 +1172,9 @@ class SlidingSyncRoomLists:
async def check_room_subscription_allowed_for_user(
self,
room_id: str,
room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
room_membership_for_user_map: Dict[str, RoomsForUserType],
to_token: StreamToken,
) -> Optional[_RoomMembershipForUser]:
) -> Optional[RoomsForUserType]:
"""
Check whether the user is allowed to see the room based on whether they have
ever had membership in the room or if the room is `world_readable`.
@ -1274,7 +1239,7 @@ class SlidingSyncRoomLists:
async def _bulk_get_stripped_state_for_rooms_from_sync_room_map(
self,
room_ids: StrCollection,
sync_room_map: Dict[str, _RoomMembershipForUser],
sync_room_map: Dict[str, RoomsForUserType],
) -> Dict[str, Optional[StateMap[StrippedStateEvent]]]:
"""
Fetch stripped state for a list of room IDs. Stripped state is only
@ -1371,7 +1336,7 @@ class SlidingSyncRoomLists:
"room_encryption",
],
room_ids: Set[str],
sync_room_map: Dict[str, _RoomMembershipForUser],
sync_room_map: Dict[str, RoomsForUserType],
to_token: StreamToken,
room_id_to_stripped_state_map: Dict[
str, Optional[StateMap[StrippedStateEvent]]
@ -1535,10 +1500,11 @@ class SlidingSyncRoomLists:
async def filter_rooms(
self,
user: UserID,
sync_room_map: Dict[str, _RoomMembershipForUser],
sync_room_map: Dict[str, RoomsForUserType],
filters: SlidingSyncConfig.SlidingSyncList.Filters,
to_token: StreamToken,
) -> Dict[str, _RoomMembershipForUser]:
dm_room_ids: AbstractSet[str],
) -> Dict[str, RoomsForUserType]:
"""
Filter rooms based on the sync request.
@ -1548,6 +1514,7 @@ class SlidingSyncRoomLists:
information in the room at the time of `to_token`.
filters: Filters to apply
to_token: We filter based on the state of the room at this token
dm_room_ids: Set of room IDs that are DMs for the user
Returns:
A filtered dictionary of room IDs along with membership information in the
@ -1567,14 +1534,14 @@ class SlidingSyncRoomLists:
filtered_room_id_set = {
room_id
for room_id in filtered_room_id_set
if sync_room_map[room_id].is_dm
if room_id in dm_room_ids
}
else:
# Only non-DM rooms please
filtered_room_id_set = {
room_id
for room_id in filtered_room_id_set
if not sync_room_map[room_id].is_dm
if room_id not in dm_room_ids
}
if filters.spaces is not None:
@ -1862,9 +1829,9 @@ class SlidingSyncRoomLists:
@trace
async def sort_rooms(
self,
sync_room_map: Dict[str, _RoomMembershipForUser],
sync_room_map: Dict[str, RoomsForUserType],
to_token: StreamToken,
) -> List[_RoomMembershipForUser]:
) -> List[RoomsForUserType]:
"""
Sort by `stream_ordering` of the last event that the user should see in the
room. `stream_ordering` is unique so we get a stable sort.

View File

@ -52,6 +52,20 @@ class RoomsForUserSlidingSync:
is_encrypted: bool
@attr.s(slots=True, frozen=True, weakref_slot=False, auto_attribs=True)
class RoomsForUserStateReset:
"""A version of `RoomsForUser` that supports optional sender and event ID
fields, to handle state resets. State resets can affect room membership
without a corresponding event so that information isn't always available."""
room_id: str
sender: Optional[str]
membership: str
event_id: Optional[str]
event_pos: PersistedEventPosition
room_version_id: str
@attr.s(slots=True, frozen=True, weakref_slot=False, auto_attribs=True)
class GetRoomsForUserWithStreamOrdering:
room_id: str

File diff suppressed because it is too large Load Diff