Optimize how we calculate `likely_domains` during backfill (#13575)
Optimize how we calculate `likely_domains` during backfill because I've seen this take 17s in production just to `get_current_state` which is used to `get_domains_from_state` (see case [*2. Loading tons of events* in the `/messages` investigation issue](https://github.com/matrix-org/synapse/issues/13356)). There are 3 ways we currently calculate hosts that are in the room: 1. `get_current_state` -> `get_domains_from_state` - Used in `backfill` to calculate `likely_domains` and `/timestamp_to_event` because it was cargo-culted from `backfill` - This one is being eliminated in favor of `get_current_hosts_in_room` in this PR 🕳 1. `get_current_hosts_in_room` - Used for other federation things like sending read receipts and typing indicators 1. `get_hosts_in_room_at_events` - Used when pushing out events over federation to other servers in the `_process_event_queue_loop` Fix https://github.com/matrix-org/synapse/issues/13626 Part of https://github.com/matrix-org/synapse/issues/13356 Mentioned in [internal doc](https://docs.google.com/document/d/1lvUoVfYUiy6UaHB6Rb4HicjaJAU40-APue9Q4vzuW3c/edit#bookmark=id.2tvwz3yhcafh) ### Query performance #### Before The query from `get_current_state` sucks just because we have to get all 80k events. And we see almost the exact same performance locally trying to get all of these events (16s vs 17s): ``` synapse=# SELECT type, state_key, event_id FROM current_state_events WHERE room_id = '!OGEhHVWSdvArJzumhm:matrix.org'; Time: 16035.612 ms (00:16.036) synapse=# SELECT type, state_key, event_id FROM current_state_events WHERE room_id = '!OGEhHVWSdvArJzumhm:matrix.org'; Time: 4243.237 ms (00:04.243) ``` But what about `get_current_hosts_in_room`: When there is 8M rows in the `current_state_events` table, the previous query in `get_current_hosts_in_room` took 13s from complete freshness (when the events were first added). But takes 930ms after a Postgres restart or 390ms if running back to back to back. ```sh $ psql synapse synapse=# \timing on synapse=# SELECT COUNT(DISTINCT substring(state_key FROM '@[^:]*:(.*)$')) FROM current_state_events WHERE type = 'm.room.member' AND membership = 'join' AND room_id = '!OGEhHVWSdvArJzumhm:matrix.org'; count ------- 4130 (1 row) Time: 13181.598 ms (00:13.182) synapse=# SELECT COUNT(*) from current_state_events where room_id = '!OGEhHVWSdvArJzumhm:matrix.org'; count ------- 80814 synapse=# SELECT COUNT(*) from current_state_events; count --------- 8162847 synapse=# SELECT pg_size_pretty( pg_total_relation_size('current_state_events') ); pg_size_pretty ---------------- 4702 MB ``` #### After I'm not sure how long it takes from complete freshness as I only really get that opportunity once (maybe restarting computer but that's cumbersome) and it's not really relevant to normal operating times. Maybe you get closer to the fresh times the more access variability there is so that Postgres caches aren't as exact. Update: The longest I've seen this run for is 6.4s and 4.5s after a computer restart. After a Postgres restart, it takes 330ms and running back to back takes 260ms. ```sh $ psql synapse synapse=# \timing on Timing is on. synapse=# SELECT substring(c.state_key FROM '@[^:]*:(.*)$') as host FROM current_state_events c /* Get the depth of the event from the events table */ INNER JOIN events AS e USING (event_id) WHERE c.type = 'm.room.member' AND c.membership = 'join' AND c.room_id = '!OGEhHVWSdvArJzumhm:matrix.org' GROUP BY host ORDER BY min(e.depth) ASC; Time: 333.800 ms ``` #### Going further To improve things further we could add a `limit` parameter to `get_current_hosts_in_room`. Realistically, we don't need 4k domains to choose from because there is no way we're going to query that many before we a) probably get an answer or b) we give up. Another thing we can do is optimize the query to use a index skip scan: - https://wiki.postgresql.org/wiki/Loose_indexscan - Index Skip Scan, https://commitfest.postgresql.org/37/1741/ - https://www.timescale.com/blog/how-we-made-distinct-queries-up-to-8000x-faster-on-postgresql/
This commit is contained in:
parent
4f6de33f41
commit
51d732db3b
|
@ -0,0 +1 @@
|
|||
Optimize how Synapse calculates domains to fetch from during backfill.
|
|
@ -70,7 +70,7 @@ from synapse.replication.http.federation import (
|
|||
from synapse.storage.databases.main.events import PartialStateConflictError
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import JsonDict, StateMap, get_domain_from_id
|
||||
from synapse.types import JsonDict, get_domain_from_id
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
from synapse.visibility import filter_events_for_server
|
||||
|
@ -104,37 +104,6 @@ backfill_processing_before_timer = Histogram(
|
|||
)
|
||||
|
||||
|
||||
def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
|
||||
"""Get joined domains from state
|
||||
|
||||
Args:
|
||||
state: State map from type/state key to event.
|
||||
|
||||
Returns:
|
||||
Returns a list of servers with the lowest depth of their joins.
|
||||
Sorted by lowest depth first.
|
||||
"""
|
||||
joined_users = [
|
||||
(state_key, int(event.depth))
|
||||
for (e_type, state_key), event in state.items()
|
||||
if e_type == EventTypes.Member and event.membership == Membership.JOIN
|
||||
]
|
||||
|
||||
joined_domains: Dict[str, int] = {}
|
||||
for u, d in joined_users:
|
||||
try:
|
||||
dom = get_domain_from_id(u)
|
||||
old_d = joined_domains.get(dom)
|
||||
if old_d:
|
||||
joined_domains[dom] = min(d, old_d)
|
||||
else:
|
||||
joined_domains[dom] = d
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return sorted(joined_domains.items(), key=lambda d: d[1])
|
||||
|
||||
|
||||
class _BackfillPointType(Enum):
|
||||
# a regular backwards extremity (ie, an event which we don't yet have, but which
|
||||
# is referred to by other events in the DAG)
|
||||
|
@ -432,21 +401,19 @@ class FederationHandler:
|
|||
)
|
||||
|
||||
# Now we need to decide which hosts to hit first.
|
||||
|
||||
# First we try hosts that are already in the room
|
||||
# First we try hosts that are already in the room.
|
||||
# TODO: HEURISTIC ALERT.
|
||||
likely_domains = (
|
||||
await self._storage_controllers.state.get_current_hosts_in_room(room_id)
|
||||
)
|
||||
|
||||
curr_state = await self._storage_controllers.state.get_current_state(room_id)
|
||||
|
||||
curr_domains = get_domains_from_state(curr_state)
|
||||
|
||||
likely_domains = [
|
||||
domain for domain, depth in curr_domains if domain != self.server_name
|
||||
]
|
||||
|
||||
async def try_backfill(domains: List[str]) -> bool:
|
||||
async def try_backfill(domains: Collection[str]) -> bool:
|
||||
# TODO: Should we try multiple of these at a time?
|
||||
for dom in domains:
|
||||
# We don't want to ask our own server for information we don't have
|
||||
if dom == self.server_name:
|
||||
continue
|
||||
|
||||
try:
|
||||
await self._federation_event_handler.backfill(
|
||||
dom, room_id, limit=100, extremities=extremities_to_request
|
||||
|
|
|
@ -60,7 +60,6 @@ from synapse.event_auth import validate_event_for_room_version
|
|||
from synapse.events import EventBase
|
||||
from synapse.events.utils import copy_and_fixup_power_levels_contents
|
||||
from synapse.federation.federation_client import InvalidResponseError
|
||||
from synapse.handlers.federation import get_domains_from_state
|
||||
from synapse.handlers.relations import BundledAggregations
|
||||
from synapse.module_api import NOT_SPAM
|
||||
from synapse.rest.admin._base import assert_user_is_admin
|
||||
|
@ -1462,17 +1461,16 @@ class TimestampLookupHandler:
|
|||
timestamp,
|
||||
)
|
||||
|
||||
# Find other homeservers from the given state in the room
|
||||
curr_state = await self._storage_controllers.state.get_current_state(
|
||||
room_id
|
||||
likely_domains = (
|
||||
await self._storage_controllers.state.get_current_hosts_in_room(room_id)
|
||||
)
|
||||
curr_domains = get_domains_from_state(curr_state)
|
||||
likely_domains = [
|
||||
domain for domain, depth in curr_domains if domain != self.server_name
|
||||
]
|
||||
|
||||
# Loop through each homeserver candidate until we get a succesful response
|
||||
for domain in likely_domains:
|
||||
# We don't want to ask our own server for information we don't have
|
||||
if domain == self.server_name:
|
||||
continue
|
||||
|
||||
try:
|
||||
remote_response = await self.federation_client.timestamp_to_event(
|
||||
domain, room_id, timestamp, direction
|
||||
|
|
|
@ -23,7 +23,6 @@ from typing import (
|
|||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Set,
|
||||
Tuple,
|
||||
)
|
||||
|
||||
|
@ -520,7 +519,7 @@ class StateStorageController:
|
|||
)
|
||||
return state_map.get(key)
|
||||
|
||||
async def get_current_hosts_in_room(self, room_id: str) -> Set[str]:
|
||||
async def get_current_hosts_in_room(self, room_id: str) -> List[str]:
|
||||
"""Get current hosts in room based on current state."""
|
||||
|
||||
await self._partial_state_room_tracker.await_full_state(room_id)
|
||||
|
|
|
@ -187,27 +187,48 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
|
||||
@cached(max_entries=100000, iterable=True)
|
||||
async def get_users_in_room(self, room_id: str) -> List[str]:
|
||||
"""
|
||||
Returns a list of users in the room sorted by longest in the room first
|
||||
(aka. with the lowest depth). This is done to match the sort in
|
||||
`get_current_hosts_in_room()` and so we can re-use the cache but it's
|
||||
not horrible to have here either.
|
||||
"""
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_users_in_room", self.get_users_in_room_txn, room_id
|
||||
)
|
||||
|
||||
def get_users_in_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[str]:
|
||||
"""
|
||||
Returns a list of users in the room sorted by longest in the room first
|
||||
(aka. with the lowest depth). This is done to match the sort in
|
||||
`get_current_hosts_in_room()` and so we can re-use the cache but it's
|
||||
not horrible to have here either.
|
||||
"""
|
||||
# If we can assume current_state_events.membership is up to date
|
||||
# then we can avoid a join, which is a Very Good Thing given how
|
||||
# frequently this function gets called.
|
||||
if self._current_state_events_membership_up_to_date:
|
||||
sql = """
|
||||
SELECT state_key FROM current_state_events
|
||||
WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
|
||||
SELECT c.state_key FROM current_state_events as c
|
||||
/* Get the depth of the event from the events table */
|
||||
INNER JOIN events AS e USING (event_id)
|
||||
WHERE c.type = 'm.room.member' AND c.room_id = ? AND membership = ?
|
||||
/* Sorted by lowest depth first */
|
||||
ORDER BY e.depth ASC;
|
||||
"""
|
||||
else:
|
||||
sql = """
|
||||
SELECT state_key FROM room_memberships as m
|
||||
SELECT c.state_key FROM room_memberships as m
|
||||
/* Get the depth of the event from the events table */
|
||||
INNER JOIN events AS e USING (event_id)
|
||||
INNER JOIN current_state_events as c
|
||||
ON m.event_id = c.event_id
|
||||
AND m.room_id = c.room_id
|
||||
AND m.user_id = c.state_key
|
||||
WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
|
||||
/* Sorted by lowest depth first */
|
||||
ORDER BY e.depth ASC;
|
||||
"""
|
||||
|
||||
txn.execute(sql, (room_id, Membership.JOIN))
|
||||
|
@ -1037,37 +1058,70 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
return True
|
||||
|
||||
@cached(iterable=True, max_entries=10000)
|
||||
async def get_current_hosts_in_room(self, room_id: str) -> Set[str]:
|
||||
"""Get current hosts in room based on current state."""
|
||||
async def get_current_hosts_in_room(self, room_id: str) -> List[str]:
|
||||
"""
|
||||
Get current hosts in room based on current state.
|
||||
|
||||
The heuristic of sorting by servers who have been in the room the
|
||||
longest is good because they're most likely to have anything we ask
|
||||
about.
|
||||
|
||||
Returns:
|
||||
Returns a list of servers sorted by longest in the room first. (aka.
|
||||
sorted by join with the lowest depth first).
|
||||
"""
|
||||
|
||||
# First we check if we already have `get_users_in_room` in the cache, as
|
||||
# we can just calculate result from that
|
||||
users = self.get_users_in_room.cache.get_immediate(
|
||||
(room_id,), None, update_metrics=False
|
||||
)
|
||||
if users is not None:
|
||||
return {get_domain_from_id(u) for u in users}
|
||||
|
||||
if isinstance(self.database_engine, Sqlite3Engine):
|
||||
if users is None and isinstance(self.database_engine, Sqlite3Engine):
|
||||
# If we're using SQLite then let's just always use
|
||||
# `get_users_in_room` rather than funky SQL.
|
||||
users = await self.get_users_in_room(room_id)
|
||||
return {get_domain_from_id(u) for u in users}
|
||||
|
||||
if users is not None:
|
||||
# Because `users` is sorted from lowest -> highest depth, the list
|
||||
# of domains will also be sorted that way.
|
||||
domains: List[str] = []
|
||||
# We use a `Set` just for fast lookups
|
||||
domain_set: Set[str] = set()
|
||||
for u in users:
|
||||
domain = get_domain_from_id(u)
|
||||
if domain not in domain_set:
|
||||
domain_set.add(domain)
|
||||
domains.append(domain)
|
||||
return domains
|
||||
|
||||
# For PostgreSQL we can use a regex to pull out the domains from the
|
||||
# joined users in `current_state_events` via regex.
|
||||
|
||||
def get_current_hosts_in_room_txn(txn: LoggingTransaction) -> Set[str]:
|
||||
def get_current_hosts_in_room_txn(txn: LoggingTransaction) -> List[str]:
|
||||
# Returns a list of servers currently joined in the room sorted by
|
||||
# longest in the room first (aka. with the lowest depth). The
|
||||
# heuristic of sorting by servers who have been in the room the
|
||||
# longest is good because they're most likely to have anything we
|
||||
# ask about.
|
||||
sql = """
|
||||
SELECT DISTINCT substring(state_key FROM '@[^:]*:(.*)$')
|
||||
FROM current_state_events
|
||||
SELECT
|
||||
/* Match the domain part of the MXID */
|
||||
substring(c.state_key FROM '@[^:]*:(.*)$') as server_domain
|
||||
FROM current_state_events c
|
||||
/* Get the depth of the event from the events table */
|
||||
INNER JOIN events AS e USING (event_id)
|
||||
WHERE
|
||||
type = 'm.room.member'
|
||||
AND membership = 'join'
|
||||
AND room_id = ?
|
||||
/* Find any join state events in the room */
|
||||
c.type = 'm.room.member'
|
||||
AND c.membership = 'join'
|
||||
AND c.room_id = ?
|
||||
/* Group all state events from the same domain into their own buckets (groups) */
|
||||
GROUP BY server_domain
|
||||
/* Sorted by lowest depth first */
|
||||
ORDER BY min(e.depth) ASC;
|
||||
"""
|
||||
txn.execute(sql, (room_id,))
|
||||
return {d for d, in txn}
|
||||
return [d for d, in txn]
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_current_hosts_in_room", get_current_hosts_in_room_txn
|
||||
|
|
Loading…
Reference in New Issue