When restarting a partial join resync, prioritise the server which actioned a partial join (#14126)

This commit is contained in:
David Robertson 2022-10-18 12:33:18 +01:00 committed by GitHub
parent 4af93bd7f6
commit c3a4780080
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 95 additions and 31 deletions

1
changelog.d/14126.misc Normal file
View File

@ -0,0 +1 @@
Faster joins: prioritise the server we joined by when restarting a partial join resync.

View File

@ -937,7 +937,10 @@ class DeviceListUpdater:
# Check if we are partially joining any rooms. If so we need to store # Check if we are partially joining any rooms. If so we need to store
# all device list updates so that we can handle them correctly once we # all device list updates so that we can handle them correctly once we
# know who is in the room. # know who is in the room.
partial_rooms = await self.store.get_partial_state_rooms_and_servers() # TODO(faster joins): this fetches and processes a bunch of data that we don't
# use. Could be replaced by a tighter query e.g.
# SELECT EXISTS(SELECT 1 FROM partial_state_rooms)
partial_rooms = await self.store.get_partial_state_room_resync_info()
if partial_rooms: if partial_rooms:
await self.store.add_remote_device_list_to_pending( await self.store.add_remote_device_list_to_pending(
user_id, user_id,

View File

@ -632,6 +632,7 @@ class FederationHandler:
room_id=room_id, room_id=room_id,
servers=ret.servers_in_room, servers=ret.servers_in_room,
device_lists_stream_id=self.store.get_device_stream_token(), device_lists_stream_id=self.store.get_device_stream_token(),
joined_via=origin,
) )
try: try:
@ -1615,13 +1616,13 @@ class FederationHandler:
"""Resumes resyncing of all partial-state rooms after a restart.""" """Resumes resyncing of all partial-state rooms after a restart."""
assert not self.config.worker.worker_app assert not self.config.worker.worker_app
partial_state_rooms = await self.store.get_partial_state_rooms_and_servers() partial_state_rooms = await self.store.get_partial_state_room_resync_info()
for room_id, servers_in_room in partial_state_rooms.items(): for room_id, resync_info in partial_state_rooms.items():
run_as_background_process( run_as_background_process(
desc="sync_partial_state_room", desc="sync_partial_state_room",
func=self._sync_partial_state_room, func=self._sync_partial_state_room,
initial_destination=None, initial_destination=resync_info.joined_via,
other_destinations=servers_in_room, other_destinations=resync_info.servers_in_room,
room_id=room_id, room_id=room_id,
) )
@ -1650,28 +1651,12 @@ class FederationHandler:
# really leave, that might mean we have difficulty getting the room state over # really leave, that might mean we have difficulty getting the room state over
# federation. # federation.
# https://github.com/matrix-org/synapse/issues/12802 # https://github.com/matrix-org/synapse/issues/12802
#
# TODO(faster_joins): we need some way of prioritising which homeservers in
# `other_destinations` to try first, otherwise we'll spend ages trying dead
# homeservers for large rooms.
# https://github.com/matrix-org/synapse/issues/12999
if initial_destination is None and len(other_destinations) == 0:
raise ValueError(
f"Cannot resync state of {room_id}: no destinations provided"
)
# Make an infinite iterator of destinations to try. Once we find a working # Make an infinite iterator of destinations to try. Once we find a working
# destination, we'll stick with it until it flakes. # destination, we'll stick with it until it flakes.
destinations: Collection[str] destinations = _prioritise_destinations_for_partial_state_resync(
if initial_destination is not None: initial_destination, other_destinations, room_id
# Move `initial_destination` to the front of the list. )
destinations = list(other_destinations)
if initial_destination in destinations:
destinations.remove(initial_destination)
destinations = [initial_destination] + destinations
else:
destinations = other_destinations
destination_iter = itertools.cycle(destinations) destination_iter = itertools.cycle(destinations)
# `destination` is the current remote homeserver we're pulling from. # `destination` is the current remote homeserver we're pulling from.
@ -1769,3 +1754,29 @@ class FederationHandler:
room_id, room_id,
destination, destination,
) )
def _prioritise_destinations_for_partial_state_resync(
initial_destination: Optional[str],
other_destinations: Collection[str],
room_id: str,
) -> Collection[str]:
"""Work out the order in which we should ask servers to resync events.
If an `initial_destination` is given, it takes top priority. Otherwise
all servers are treated equally.
:raises ValueError: if no destination is provided at all.
"""
if initial_destination is None and len(other_destinations) == 0:
raise ValueError(f"Cannot resync state of {room_id}: no destinations provided")
if initial_destination is None:
return other_destinations
# Move `initial_destination` to the front of the list.
destinations = list(other_destinations)
if initial_destination in destinations:
destinations.remove(initial_destination)
destinations = [initial_destination] + destinations
return destinations

View File

@ -1658,7 +1658,7 @@ class DatabasePool:
table: string giving the table name table: string giving the table name
keyvalues: dict of column names and values to select the row with keyvalues: dict of column names and values to select the row with
retcol: string giving the name of the column to return retcol: string giving the name of the column to return
allow_none: If true, return None instead of failing if the SELECT allow_none: If true, return None instead of raising StoreError if the SELECT
statement returns no rows statement returns no rows
desc: description of the transaction, for logging and metrics desc: description of the transaction, for logging and metrics
""" """

View File

@ -97,6 +97,12 @@ class RoomSortOrder(Enum):
STATE_EVENTS = "state_events" STATE_EVENTS = "state_events"
@attr.s(slots=True, frozen=True, auto_attribs=True)
class PartialStateResyncInfo:
joined_via: Optional[str]
servers_in_room: List[str] = attr.ib(factory=list)
class RoomWorkerStore(CacheInvalidationWorkerStore): class RoomWorkerStore(CacheInvalidationWorkerStore):
def __init__( def __init__(
self, self,
@ -1160,17 +1166,29 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
desc="get_partial_state_servers_at_join", desc="get_partial_state_servers_at_join",
) )
async def get_partial_state_rooms_and_servers( async def get_partial_state_room_resync_info(
self, self,
) -> Mapping[str, Collection[str]]: ) -> Mapping[str, PartialStateResyncInfo]:
"""Get all rooms containing events with partial state, and the servers known """Get all rooms containing events with partial state, and the information
to be in the room. needed to restart a "resync" of those rooms.
Returns: Returns:
A dictionary of rooms with partial state, with room IDs as keys and A dictionary of rooms with partial state, with room IDs as keys and
lists of servers in rooms as values. lists of servers in rooms as values.
""" """
room_servers: Dict[str, List[str]] = {} room_servers: Dict[str, PartialStateResyncInfo] = {}
rows = await self.db_pool.simple_select_list(
table="partial_state_rooms",
keyvalues={},
retcols=("room_id", "joined_via"),
desc="get_server_which_served_partial_join",
)
for row in rows:
room_id = row["room_id"]
joined_via = row["joined_via"]
room_servers[room_id] = PartialStateResyncInfo(joined_via=joined_via)
rows = await self.db_pool.simple_select_list( rows = await self.db_pool.simple_select_list(
"partial_state_rooms_servers", "partial_state_rooms_servers",
@ -1182,7 +1200,15 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
for row in rows: for row in rows:
room_id = row["room_id"] room_id = row["room_id"]
server_name = row["server_name"] server_name = row["server_name"]
room_servers.setdefault(room_id, []).append(server_name) entry = room_servers.get(room_id)
if entry is None:
# There is a foreign key constraint which enforces that every room_id in
# partial_state_rooms_servers appears in partial_state_rooms. So we
# expect `entry` to be non-null. (This reasoning fails if we've
# partial-joined between the two SELECTs, but this is unlikely to happen
# in practice.)
continue
entry.servers_in_room.append(server_name)
return room_servers return room_servers
@ -1827,6 +1853,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
room_id: str, room_id: str,
servers: Collection[str], servers: Collection[str],
device_lists_stream_id: int, device_lists_stream_id: int,
joined_via: str,
) -> None: ) -> None:
"""Mark the given room as containing events with partial state. """Mark the given room as containing events with partial state.
@ -1842,6 +1869,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
servers: other servers known to be in the room servers: other servers known to be in the room
device_lists_stream_id: the device_lists stream ID at the time when we first device_lists_stream_id: the device_lists stream ID at the time when we first
joined the room. joined the room.
joined_via: the server name we requested a partial join from.
""" """
await self.db_pool.runInteraction( await self.db_pool.runInteraction(
"store_partial_state_room", "store_partial_state_room",
@ -1849,6 +1877,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
room_id, room_id,
servers, servers,
device_lists_stream_id, device_lists_stream_id,
joined_via,
) )
def _store_partial_state_room_txn( def _store_partial_state_room_txn(
@ -1857,6 +1886,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
room_id: str, room_id: str,
servers: Collection[str], servers: Collection[str],
device_lists_stream_id: int, device_lists_stream_id: int,
joined_via: str,
) -> None: ) -> None:
DatabasePool.simple_insert_txn( DatabasePool.simple_insert_txn(
txn, txn,
@ -1866,6 +1896,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
"device_lists_stream_id": device_lists_stream_id, "device_lists_stream_id": device_lists_stream_id,
# To be updated later once the join event is persisted. # To be updated later once the join event is persisted.
"join_event_id": None, "join_event_id": None,
"joined_via": joined_via,
}, },
) )
DatabasePool.simple_insert_many_txn( DatabasePool.simple_insert_many_txn(

View File

@ -0,0 +1,18 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- When we resync partial state, we prioritise doing so using the server we
-- partial-joined from. To do this we need to record which server that was!
ALTER TABLE partial_state_rooms ADD COLUMN joined_via TEXT;