Sliding sync: various fixes to background update (#17636)
Follows on from #17512. Other fixes include: #17633, #17634, #17635
This commit is contained in:
parent
ca69d0f571
commit
d52c17ce01
|
@ -0,0 +1 @@
|
|||
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
|
|
@ -1861,7 +1861,7 @@ class PersistEventsStore:
|
|||
VALUES (
|
||||
?, ?, ?, ?, ?,
|
||||
(SELECT stream_ordering FROM events WHERE event_id = ?),
|
||||
(SELECT instance_name FROM events WHERE event_id = ?)
|
||||
(SELECT COALESCE(instance_name, 'master') FROM events WHERE event_id = ?)
|
||||
{("," + ", ".join("?" for _ in sliding_sync_snapshot_values)) if sliding_sync_snapshot_values else ""}
|
||||
)
|
||||
ON CONFLICT (room_id, user_id)
|
||||
|
|
|
@ -41,6 +41,7 @@ from synapse.storage.databases.main.events import (
|
|||
SlidingSyncMembershipSnapshotSharedInsertValues,
|
||||
SlidingSyncStateInsertValues,
|
||||
)
|
||||
from synapse.storage.databases.main.events_worker import DatabaseCorruptionError
|
||||
from synapse.storage.databases.main.state_deltas import StateDeltasStore
|
||||
from synapse.storage.databases.main.stream import StreamWorkerStore
|
||||
from synapse.storage.types import Cursor
|
||||
|
@ -1857,6 +1858,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
initial_phase = True
|
||||
|
||||
last_room_id = progress.get("last_room_id", "")
|
||||
last_user_id = progress.get("last_user_id", "")
|
||||
last_event_stream_ordering = progress["last_event_stream_ordering"]
|
||||
|
||||
def _find_memberships_to_update_txn(
|
||||
|
@ -1887,11 +1889,11 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
FROM local_current_membership AS c
|
||||
INNER JOIN events AS e USING (event_id)
|
||||
LEFT JOIN rooms AS r ON (c.room_id = r.room_id)
|
||||
WHERE c.room_id > ?
|
||||
ORDER BY c.room_id ASC
|
||||
WHERE (c.room_id, c.user_id) > (?, ?)
|
||||
ORDER BY c.room_id ASC, c.user_id ASC
|
||||
LIMIT ?
|
||||
""",
|
||||
(last_room_id, batch_size),
|
||||
(last_room_id, last_user_id, batch_size),
|
||||
)
|
||||
elif last_event_stream_ordering is not None:
|
||||
# It's important to sort by `event_stream_ordering` *ascending* (oldest to
|
||||
|
@ -1993,6 +1995,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
WHERE
|
||||
room_id = ?
|
||||
AND m.user_id = ?
|
||||
AND (m.membership = ? OR m.membership = ?)
|
||||
AND e.event_id != ?
|
||||
ORDER BY e.topological_ordering DESC
|
||||
LIMIT 1
|
||||
|
@ -2000,6 +2003,8 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
(
|
||||
room_id,
|
||||
user_id,
|
||||
Membership.INVITE,
|
||||
Membership.KNOCK,
|
||||
event_id,
|
||||
),
|
||||
)
|
||||
|
@ -2081,9 +2086,17 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
# have `current_state_events` and we should have some current state
|
||||
# for each room
|
||||
if current_state_ids_map:
|
||||
fetched_events = await self.get_events(
|
||||
current_state_ids_map.values()
|
||||
)
|
||||
try:
|
||||
fetched_events = await self.get_events(
|
||||
current_state_ids_map.values()
|
||||
)
|
||||
except DatabaseCorruptionError as e:
|
||||
logger.warning(
|
||||
"Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s",
|
||||
room_id,
|
||||
e,
|
||||
)
|
||||
continue
|
||||
|
||||
current_state_map: StateMap[EventBase] = {
|
||||
state_key: fetched_events[event_id]
|
||||
|
@ -2124,7 +2137,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
False
|
||||
)
|
||||
elif membership in (Membership.INVITE, Membership.KNOCK) or (
|
||||
membership == Membership.LEAVE and is_outlier
|
||||
membership in (Membership.LEAVE, Membership.BAN) and is_outlier
|
||||
):
|
||||
invite_or_knock_event_id = membership_event_id
|
||||
invite_or_knock_membership = membership
|
||||
|
@ -2135,7 +2148,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
# us a consistent view of the room state regardless of your
|
||||
# membership (i.e. the room shouldn't disappear if you're using the
|
||||
# `is_encrypted` filter and you leave).
|
||||
if membership == Membership.LEAVE and is_outlier:
|
||||
if membership in (Membership.LEAVE, Membership.BAN) and is_outlier:
|
||||
invite_or_knock_event_id, invite_or_knock_membership = (
|
||||
await self.db_pool.runInteraction(
|
||||
"sliding_sync_membership_snapshots_bg_update._find_previous_membership",
|
||||
|
@ -2182,7 +2195,15 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
await_full_state=False,
|
||||
)
|
||||
|
||||
fetched_events = await self.get_events(state_ids_map.values())
|
||||
try:
|
||||
fetched_events = await self.get_events(state_ids_map.values())
|
||||
except DatabaseCorruptionError as e:
|
||||
logger.warning(
|
||||
"Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s",
|
||||
room_id,
|
||||
e,
|
||||
)
|
||||
continue
|
||||
|
||||
state_map: StateMap[EventBase] = {
|
||||
state_key: fetched_events[event_id]
|
||||
|
@ -2296,7 +2317,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
(
|
||||
room_id,
|
||||
_room_id_from_rooms_table,
|
||||
_user_id,
|
||||
user_id,
|
||||
_sender,
|
||||
_membership_event_id,
|
||||
_membership,
|
||||
|
@ -2308,8 +2329,11 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
|||
progress = {
|
||||
"initial_phase": initial_phase,
|
||||
"last_room_id": room_id,
|
||||
"last_event_stream_ordering": membership_event_stream_ordering,
|
||||
"last_user_id": user_id,
|
||||
"last_event_stream_ordering": last_event_stream_ordering,
|
||||
}
|
||||
if not initial_phase:
|
||||
progress["last_event_stream_ordering"] = membership_event_stream_ordering
|
||||
|
||||
await self.db_pool.updates._background_update_progress(
|
||||
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
|
||||
|
|
|
@ -98,6 +98,26 @@ if TYPE_CHECKING:
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DatabaseCorruptionError(RuntimeError):
    """We found an event in the DB that has a persisted event ID that doesn't
    match its computed event ID.

    Attributes:
        room_id: The room the mismatching event was stored under.
        persisted_event_id: The event ID the row was persisted with.
        computed_event_id: The event ID calculated from the stored event.
    """

    def __init__(
        self, room_id: str, persisted_event_id: str, computed_event_id: str
    ) -> None:
        """Record the identifiers and build the human-readable message.

        Args:
            room_id: The room the event belongs to.
            persisted_event_id: The event ID found in the database.
            computed_event_id: The event ID recomputed from the event.
        """
        self.room_id = room_id
        self.persisted_event_id = persisted_event_id
        self.computed_event_id = computed_event_id

        message = (
            f"Database corruption: Event {persisted_event_id} in room {room_id} "
            f"from the database appears to have been modified (calculated "
            f"event id {computed_event_id})"
        )

        super().__init__(message)

    def __reduce__(self) -> "tuple[type, tuple[str, str, str]]":
        """Support `copy`/`pickle` by replaying the constructor arguments.

        The default `BaseException.__reduce__` rebuilds the exception from
        `self.args`, which here holds only the formatted message. That would
        call `__init__` with a single argument and fail with `TypeError`, so
        we return the original three arguments instead.
        """
        return (
            type(self),
            (self.room_id, self.persisted_event_id, self.computed_event_id),
        )
|
||||
|
||||
|
||||
# These values are used in the `enqueue_event` and `_fetch_loop` methods to
|
||||
# control how we batch/bulk fetch events from the database.
|
||||
# The values are plucked out of thin air to make initial sync run faster
|
||||
|
@ -1364,10 +1384,8 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
if original_ev.event_id != event_id:
|
||||
# it's difficult to see what to do here. Pretty much all bets are off
|
||||
# if Synapse cannot rely on the consistency of its database.
|
||||
raise RuntimeError(
|
||||
f"Database corruption: Event {event_id} in room {d['room_id']} "
|
||||
f"from the database appears to have been modified (calculated "
|
||||
f"event id {original_ev.event_id})"
|
||||
raise DatabaseCorruptionError(
|
||||
d["room_id"], event_id, original_ev.event_id
|
||||
)
|
||||
|
||||
event_map[event_id] = original_ev
|
||||
|
|
Loading…
Reference in New Issue