Sliding Sync: Get `bump_stamp` from new sliding sync tables because it's faster (#17658)
Get `bump_stamp` from the [new sliding sync tables](https://github.com/element-hq/synapse/pull/17512), which should be faster than flipping through the latest events in the room.
parent 5c229415c4
commit e1ed959a68
@@ -0,0 +1 @@
Get `bump_stamp` from [new sliding sync tables](https://github.com/element-hq/synapse/pull/17512) which should be faster.
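In practical terms, the change swaps a per-request scan of the room's recent timeline for a single-row read of the pre-computed `bump_stamp` column. Below is a minimal sketch of the new fast path; the store method, table, and column names come from the hunks that follow, while the wrapper function itself is illustrative and not part of the PR:

```python
from typing import Optional


async def fetch_bump_stamp(store, room_id: str) -> Optional[int]:
    # New fast path: a single-row lookup, roughly
    #   SELECT bump_stamp FROM sliding_sync_joined_rooms WHERE room_id = ?
    # instead of walking the room's latest events looking for a "bump" event type
    # (the pre-existing `get_last_event_pos_in_room_before_stream_ordering` path).
    return await store.get_latest_bump_stamp_for_room(room_id)
```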
@@ -1040,11 +1040,51 @@ class SlidingSyncHandler:
            )
        )

        # By default, just choose the membership event position
        # Figure out the last bump event in the room
        #
        # By default, just choose the membership event position for any non-join membership
        bump_stamp = room_membership_for_user_at_to_token.event_pos.stream

        # Figure out the last bump event in the room if we're in the room.
        # If we're joined to the room, we need to find the last bump event before the
        # `to_token`
        if room_membership_for_user_at_to_token.membership == Membership.JOIN:
            # We can quickly query for the latest bump event in the room using the
            # sliding sync tables.
            latest_room_bump_stamp = await self.store.get_latest_bump_stamp_for_room(
                room_id
            )

            min_to_token_position = to_token.room_key.stream

            # If we can rely on the new sliding sync tables and the `bump_stamp` is
            # `None`, just fall back to the membership event position. This can happen
            # when we've just joined a remote room and all the events are backfilled.
            if (
                # FIXME: The background job check can be removed once we bump
                # `SCHEMA_COMPAT_VERSION` and run the foreground update for
                # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots`
                # (tracked by https://github.com/element-hq/synapse/issues/17623)
                await self.store.have_finished_sliding_sync_background_jobs()
                and latest_room_bump_stamp is None
            ):
                pass

            # The `bump_stamp` stored in the database might be ahead of our token. Since
            # `bump_stamp` is only a `stream_ordering` position, we can't be 100% sure
            # that it's before the `to_token` in all scenarios. The only scenario we can
            # be sure of is if the `bump_stamp` is totally before the minimum position
            # from the token.
            #
            # We don't need to check if the background update has finished, as if the
            # returned bump stamp is not None then it must be up to date.
            elif (
                latest_room_bump_stamp is not None
                and latest_room_bump_stamp < min_to_token_position
            ):
                bump_stamp = latest_room_bump_stamp

            # Otherwise, if it's within or after the `to_token`, we need to find the
            # last bump event before the `to_token`.
            else:
                last_bump_event_result = (
                    await self.store.get_last_event_pos_in_room_before_stream_ordering(
                        room_id,
@@ -1052,8 +1092,6 @@ class SlidingSyncHandler:
                        event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
                    )
                )

                # But if we found a bump event, use that instead
                if last_bump_event_result is not None:
                    _, new_bump_event_pos = last_bump_event_result
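Read together, the two handler hunks above amount to a three-way decision: default to the user's membership event position, prefer the table value when it is safely before the requested token, and otherwise fall back to the old event scan. The following is a condensed paraphrase, not the literal diff: the background-update check is dropped, the scan-based fallback is elided, and the parameter names are illustrative.

```python
from typing import Optional


async def resolve_bump_stamp(
    store,
    room_id: str,
    membership: str,
    membership_event_stream_pos: int,
    min_to_token_position: int,
) -> Optional[int]:
    """Condensed paraphrase of the handler hunks above (not the literal diff)."""
    # Default: the user's membership event position (used for any non-join membership).
    bump_stamp: Optional[int] = membership_event_stream_pos

    if membership == "join":
        # Fast path: read the pre-computed value from `sliding_sync_joined_rooms`.
        latest_room_bump_stamp = await store.get_latest_bump_stamp_for_room(room_id)

        if latest_room_bump_stamp is None:
            # Nothing usable in the table (e.g. every bump event was backfilled);
            # keep the membership-position default.
            pass
        elif latest_room_bump_stamp < min_to_token_position:
            # Definitely before the requested token, so it is safe to use directly.
            bump_stamp = latest_room_bump_stamp
        else:
            # It might be ahead of the token, so fall back to scanning the room's
            # recent events for the last bump event before the token (elided here).
            ...

    return bump_stamp
```

The `latest_room_bump_stamp < min_to_token_position` check is what keeps the cached value token-safe: `bump_stamp` is only a `stream_ordering` position, so it can only be trusted when it falls strictly before the minimum position covered by the token.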
@@ -327,6 +327,13 @@ class PersistEventsStore:

        async with stream_ordering_manager as stream_orderings:
            for (event, _), stream in zip(events_and_contexts, stream_orderings):
                # XXX: We can't rely on `stream_ordering`/`instance_name` being correct
                # at this point. We could be working with events that were previously
                # persisted as an `outlier` with one `stream_ordering` but are now being
                # persisted again and de-outliered and are being assigned a different
                # `stream_ordering` here that won't end up being used.
                # `_update_outliers_txn()` will fix this discrepancy (always use the
                # `stream_ordering` from the first time it was persisted).
                event.internal_metadata.stream_ordering = stream
                event.internal_metadata.instance_name = self._instance_name
@@ -470,11 +477,11 @@ class PersistEventsStore:
                membership_infos_to_insert_membership_snapshots.append(
                    # XXX: We don't use `SlidingSyncMembershipInfoWithEventPos` here
                    # because we're sourcing the event from `events_and_contexts`, we
                    # can't rely on `stream_ordering`/`instance_name` being correct. We
                    # could be working with events that were previously persisted as an
                    # `outlier` with one `stream_ordering` but are now being persisted
                    # again and de-outliered and assigned a different `stream_ordering`
                    # that won't end up being used. Since we call
                    # can't rely on `stream_ordering`/`instance_name` being correct at
                    # this point. We could be working with events that were previously
                    # persisted as an `outlier` with one `stream_ordering` but are now
                    # being persisted again and de-outliered and assigned a different
                    # `stream_ordering` that won't end up being used. Since we call
                    # `_calculate_sliding_sync_table_changes()` before
                    # `_update_outliers_txn()` which fixes this discrepancy (always use
                    # the `stream_ordering` from the first time it was persisted), we're
@@ -591,11 +598,17 @@ class PersistEventsStore:
                    event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
                )
            )
            bump_stamp_to_fully_insert = (
                most_recent_bump_event_pos_results[1].stream
                if most_recent_bump_event_pos_results is not None
                else None
            )
            if most_recent_bump_event_pos_results is not None:
                _, new_bump_event_pos = most_recent_bump_event_pos_results

                # If we've just joined a remote room, then the last bump event may
                # have been backfilled (and so have a negative stream ordering).
                # These negative stream orderings can't sensibly be compared, so
                # instead just leave it as `None` in the table and we will use their
                # membership event position as the bump event position in the
                # Sliding Sync API.
                if new_bump_event_pos.stream > 0:
                    bump_stamp_to_fully_insert = new_bump_event_pos.stream

            current_state_ids_map = dict(
                await self.store.get_partial_filtered_current_state_ids(
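The guard above is why a freshly-backfilled room ends up with a `NULL` `bump_stamp`: backfilled events carry negative stream orderings, which can't sensibly be compared with live positions. A toy paraphrase with hypothetical values (the helper function is illustrative, not part of the PR):

```python
from typing import Optional


def bump_stamp_for_insert(new_bump_event_stream: Optional[int]) -> Optional[int]:
    """Toy paraphrase of the guard above; hypothetical helper, not in the PR."""
    # Backfilled events get negative stream orderings, which can't sensibly be
    # compared against live positions, so we leave the column as NULL and let the
    # Sliding Sync API fall back to the user's membership event position instead.
    if new_bump_event_stream is not None and new_bump_event_stream > 0:
        return new_bump_event_stream
    return None


assert bump_stamp_for_insert(42) == 42  # live event: usable as `bump_stamp`
assert bump_stamp_for_insert(-7) is None  # backfilled event: left as NULL
assert bump_stamp_for_insert(None) is None  # no bump event found at all
```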
@@ -2123,31 +2136,26 @@ class PersistEventsStore:
        if len(events_and_contexts) == 0:
            return

        # We only update the sliding sync tables for non-backfilled events.
        #
        # Check if the first event is a backfilled event (with a negative
        # `stream_ordering`). If one event is backfilled, we assume this whole batch was
        # backfilled.
        first_event_stream_ordering = events_and_contexts[0][
            0
        ].internal_metadata.stream_ordering
        # This should exist for persisted events
        assert first_event_stream_ordering is not None
        if first_event_stream_ordering < 0:
            return

        # Since the list is sorted ascending by `stream_ordering`, the last event should
        # have the highest `stream_ordering`.
        max_stream_ordering = events_and_contexts[-1][
            0
        ].internal_metadata.stream_ordering
        # `stream_ordering` should be assigned for persisted events
        assert max_stream_ordering is not None
        # Check if the event is a backfilled event (with a negative `stream_ordering`).
        # If one event is backfilled, we assume this whole batch was backfilled.
        if max_stream_ordering < 0:
            # We only update the sliding sync tables for non-backfilled events.
            return

        max_bump_stamp = None
        for event, _ in reversed(events_and_contexts):
            # Sanity check that all events belong to the same room
            assert event.room_id == room_id

            if event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES:
                # This should exist for persisted events
                # `stream_ordering` should be assigned for persisted events
                assert event.internal_metadata.stream_ordering is not None

                max_bump_stamp = event.internal_metadata.stream_ordering
@@ -2156,11 +2164,6 @@ class PersistEventsStore:
                # matching bump event which should have the highest `stream_ordering`.
                break

        # We should have exited earlier if there were no events
        assert (
            max_stream_ordering is not None
        ), "Expected to have a stream_ordering if we have events"

        # Handle updating the `sliding_sync_joined_rooms` table.
        #
        txn.execute(
@@ -41,6 +41,46 @@ logger = logging.getLogger(__name__)


class SlidingSyncStore(SQLBaseStore):
    async def get_latest_bump_stamp_for_room(
        self,
        room_id: str,
    ) -> Optional[int]:
        """
        Get the `bump_stamp` for the room.

        The `bump_stamp` is the `stream_ordering` of the last event according to the
        `bump_event_types`. This helps clients sort more readily without them needing to
        pull in a bunch of the timeline to determine the last activity.
        `bump_event_types` is a thing because, for example, we don't want display name
        changes to mark the room as unread and bump it to the top. For encrypted rooms,
        we just have to consider any activity as a bump because we can't see the content
        and the client has to figure it out for itself.

        This should only be called where the server is participating
        in the room (someone local is joined).

        Returns:
            The `bump_stamp` for the room (which can be `None`).
        """

        return cast(
            Optional[int],
            await self.db_pool.simple_select_one_onecol(
                table="sliding_sync_joined_rooms",
                keyvalues={"room_id": room_id},
                retcol="bump_stamp",
                # FIXME: This should be `False` once we bump `SCHEMA_COMPAT_VERSION` and run the
                # foreground update for
                # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked
                # by https://github.com/element-hq/synapse/issues/17623)
                #
                # This should be `allow_none=False` in the future because even though
                # `bump_stamp` itself can be `None`, we should have a row in the
                # `sliding_sync_joined_rooms` table for any joined room.
                allow_none=True,
            ),
        )

    async def persist_per_connection_state(
        self,
        user_id: str,
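The docstring above describes `bump_stamp` as a sort key for clients. Purely as an illustration (not code from Synapse or the PR), a consumer of the Sliding Sync API might order rooms like this, with the hypothetical `rooms` mapping standing in for the per-room response data:

```python
from typing import Dict, List, Optional


def sort_rooms_by_recency(rooms: Dict[str, Optional[int]]) -> List[str]:
    """Order room IDs so the most recently "bumped" room comes first.

    `rooms` maps room_id -> bump_stamp, which may be None (e.g. when every bump
    event in the room was backfilled).
    """
    # Rooms without a bump_stamp sort last; a higher bump_stamp means more recent
    # relevant activity.
    return sorted(rooms, key=lambda room_id: rooms[room_id] or 0, reverse=True)


# Example usage with made-up stream positions:
assert sort_rooms_by_recency({"!a:x": 10, "!b:x": None, "!c:x": 25}) == [
    "!c:x",
    "!a:x",
    "!b:x",
]
```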
@@ -106,6 +106,12 @@ class SlidingSyncTablesTestCaseBase(HomeserverTestCase):
        assert persist_events_store is not None
        self.persist_events_store = persist_events_store

        persist_controller = self.hs.get_storage_controllers().persistence
        assert persist_controller is not None
        self.persist_controller = persist_controller

        self.state_handler = self.hs.get_state_handler()

    def _get_sliding_sync_joined_rooms(self) -> Dict[str, _SlidingSyncJoinedRoomResult]:
        """
        Return the rows from the `sliding_sync_joined_rooms` table.
@@ -260,10 +266,8 @@ class SlidingSyncTablesTestCaseBase(HomeserverTestCase):
            )
        )
        context = EventContext.for_outlier(self.hs.get_storage_controllers())
        persist_controller = self.hs.get_storage_controllers().persistence
        assert persist_controller is not None
        persisted_event, _, _ = self.get_success(
            persist_controller.persist_event(invite_event, context)
            self.persist_controller.persist_event(invite_event, context)
        )

        self._remote_invite_count += 1
@@ -316,10 +320,8 @@ class SlidingSyncTablesTestCaseBase(HomeserverTestCase):
            )
        )
        context = EventContext.for_outlier(self.hs.get_storage_controllers())
        persist_controller = self.hs.get_storage_controllers().persistence
        assert persist_controller is not None
        persisted_event, _, _ = self.get_success(
            persist_controller.persist_event(kick_event, context)
            self.persist_controller.persist_event(kick_event, context)
        )

        return persisted_event
@@ -926,6 +928,201 @@ class SlidingSyncTablesTestCase(SlidingSyncTablesTestCaseBase):
            user2_snapshot,
        )

    def test_joined_room_bump_stamp_backfill(self) -> None:
        """
        Test that `bump_stamp` ignores backfilled events, i.e. events with a
        negative stream ordering.
        """
        user1_id = self.register_user("user1", "pass")
        _user1_tok = self.login(user1_id, "pass")

        # Create a remote room
        creator = "@user:other"
        room_id = "!foo:other"
        room_version = RoomVersions.V10
        shared_kwargs = {
            "room_id": room_id,
            "room_version": room_version.identifier,
        }

        create_tuple = self.get_success(
            create_event(
                self.hs,
                prev_event_ids=[],
                type=EventTypes.Create,
                state_key="",
                content={
                    # The `ROOM_CREATOR` field could be removed if we used a room
                    # version > 10 (in favor of relying on `sender`)
                    EventContentFields.ROOM_CREATOR: creator,
                    EventContentFields.ROOM_VERSION: room_version.identifier,
                },
                sender=creator,
                **shared_kwargs,
            )
        )
        creator_tuple = self.get_success(
            create_event(
                self.hs,
                prev_event_ids=[create_tuple[0].event_id],
                auth_event_ids=[create_tuple[0].event_id],
                type=EventTypes.Member,
                state_key=creator,
                content={"membership": Membership.JOIN},
                sender=creator,
                **shared_kwargs,
            )
        )
        room_name_tuple = self.get_success(
            create_event(
                self.hs,
                prev_event_ids=[creator_tuple[0].event_id],
                auth_event_ids=[create_tuple[0].event_id, creator_tuple[0].event_id],
                type=EventTypes.Name,
                state_key="",
                content={
                    EventContentFields.ROOM_NAME: "my super duper room",
                },
                sender=creator,
                **shared_kwargs,
            )
        )
        # We add a message event as a valid "bump type"
        msg_tuple = self.get_success(
            create_event(
                self.hs,
                prev_event_ids=[room_name_tuple[0].event_id],
                auth_event_ids=[create_tuple[0].event_id, creator_tuple[0].event_id],
                type=EventTypes.Message,
                content={"body": "foo", "msgtype": "m.text"},
                sender=creator,
                **shared_kwargs,
            )
        )
        invite_tuple = self.get_success(
            create_event(
                self.hs,
                prev_event_ids=[msg_tuple[0].event_id],
                auth_event_ids=[create_tuple[0].event_id, creator_tuple[0].event_id],
                type=EventTypes.Member,
                state_key=user1_id,
                content={"membership": Membership.INVITE},
                sender=creator,
                **shared_kwargs,
            )
        )

        remote_events_and_contexts = [
            create_tuple,
            creator_tuple,
            room_name_tuple,
            msg_tuple,
            invite_tuple,
        ]

        # Ensure the local HS knows the room version
        self.get_success(self.store.store_room(room_id, creator, False, room_version))

        # Persist these events as backfilled events.
        for event, context in remote_events_and_contexts:
            self.get_success(
                self.persist_controller.persist_event(event, context, backfilled=True)
            )

        # Now we join the local user to the room. We want to make this feel as close to
        # the real `process_remote_join()` as possible but we'd like to avoid some of
        # the auth checks that would be done in the real code.
        #
        # FIXME: The test was originally written using this less-real
        # `persist_event(...)` shortcut but it would be nice to use the real remote join
        # process in a `FederatingHomeserverTestCase`.
        flawed_join_tuple = self.get_success(
            create_event(
                self.hs,
                prev_event_ids=[invite_tuple[0].event_id],
                # This doesn't work correctly to create an `EventContext` that includes
                # both of these state events. I assume it's because we're working on our
                # local homeserver which has the remote state set as `outlier`. We have
                # to create our own EventContext below to get this right.
                auth_event_ids=[create_tuple[0].event_id, invite_tuple[0].event_id],
                type=EventTypes.Member,
                state_key=user1_id,
                content={"membership": Membership.JOIN},
                sender=user1_id,
                **shared_kwargs,
            )
        )
        # We have to create our own context to get the state set correctly. If we use
        # the `EventContext` from the `flawed_join_tuple`, the `current_state_events`
        # table will only have the join event in it which should never happen in our
        # real server.
        join_event = flawed_join_tuple[0]
        join_context = self.get_success(
            self.state_handler.compute_event_context(
                join_event,
                state_ids_before_event={
                    (e.type, e.state_key): e.event_id
                    for e in [create_tuple[0], invite_tuple[0], room_name_tuple[0]]
                },
                partial_state=False,
            )
        )
        join_event, _join_event_pos, _room_token = self.get_success(
            self.persist_controller.persist_event(join_event, join_context)
        )

        # Make sure the tables are populated correctly
        sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms()
        self.assertIncludes(
            set(sliding_sync_joined_rooms_results.keys()),
            {room_id},
            exact=True,
        )
        self.assertEqual(
            sliding_sync_joined_rooms_results[room_id],
            _SlidingSyncJoinedRoomResult(
                room_id=room_id,
                # This should be the last event in the room (the join membership)
                event_stream_ordering=join_event.internal_metadata.stream_ordering,
                # Since all of the bump events are backfilled, the `bump_stamp` should
                # still be `None` (and we will fall back to the user's membership event
                # position in the Sliding Sync API).
                bump_stamp=None,
                room_type=None,
                # We still pick up state of the room even if it's backfilled
                room_name="my super duper room",
                is_encrypted=False,
                tombstone_successor_room_id=None,
            ),
        )

        sliding_sync_membership_snapshots_results = (
            self._get_sliding_sync_membership_snapshots()
        )
        self.assertIncludes(
            set(sliding_sync_membership_snapshots_results.keys()),
            {
                (room_id, user1_id),
            },
            exact=True,
        )
        self.assertEqual(
            sliding_sync_membership_snapshots_results.get((room_id, user1_id)),
            _SlidingSyncMembershipSnapshotResult(
                room_id=room_id,
                user_id=user1_id,
                sender=user1_id,
                membership_event_id=join_event.event_id,
                membership=Membership.JOIN,
                event_stream_ordering=join_event.internal_metadata.stream_ordering,
                has_known_state=True,
                room_type=None,
                room_name="my super duper room",
                is_encrypted=False,
                tombstone_successor_room_id=None,
            ),
        )

    @parameterized.expand(
        # Test both an insert and an upsert into the
        # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` to exercise
@@ -1036,11 +1233,9 @@ class SlidingSyncTablesTestCase(SlidingSyncTablesTestCaseBase):
            context = self.get_success(unpersisted_context.persist(event))
            events_to_persist.append((event, context))

        persist_controller = self.hs.get_storage_controllers().persistence
        assert persist_controller is not None
        for event, context in events_to_persist:
            self.get_success(
                persist_controller.persist_event(
                self.persist_controller.persist_event(
                    event,
                    context,
                )