From e0e1bd05505ee159c396a1749d90bfcbe06ada93 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Sat, 17 Jul 2021 03:00:10 -0500 Subject: [PATCH] WIP: make hs2 backfill historical messages after marker event --- synapse/handlers/federation.py | 24 +++++-- .../databases/main/event_federation.py | 67 ++++++++++++++----- synapse/storage/databases/main/events.py | 13 ++-- synapse/visibility.py | 13 +++- 4 files changed, 93 insertions(+), 24 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 41ecd8cebc..c6b3a1bf3a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -926,7 +926,9 @@ class FederationHandler(BaseHandler): [insertion_event_id], ) - insertion_event = await self.store.get_event(insertion_event_id, allow_none=True) + insertion_event = await self.store.get_event( + insertion_event_id, allow_none=True + ) if insertion_event is None: logger.warning( "_handle_marker_event: server %s didn't return insertion event %s for marker %s", @@ -942,6 +944,10 @@ class FederationHandler(BaseHandler): marker_event, ) + await self.store.insert_backward_extremity( + insertion_event_id, marker_event.room_id + ) + async def _resync_device(self, sender: str) -> None: """We have detected that the device list for the given user may be out of sync, so we try and resync them. @@ -1110,7 +1116,12 @@ class FederationHandler(BaseHandler): async def _maybe_backfill_inner( self, room_id: str, current_depth: int, limit: int ) -> bool: - extremities = await self.store.get_oldest_events_with_depth_in_room(room_id) + oldest_events = await self.store.get_oldest_events_with_depth_in_room(room_id) + insertion_events_to_be_backfilled = ( + await self.store.get_insertion_event_backwards_extremities_in_room(room_id) + ) + extremities = {**oldest_events, **insertion_events_to_be_backfilled} + logger.info("_maybe_backfill_inner: extremities %s", extremities) if not extremities: logger.debug("Not backfilling as no extremeties found.") @@ -1143,12 +1154,14 @@ class FederationHandler(BaseHandler): # types have. forward_events = await self.store.get_successor_events(list(extremities)) + logger.info("_maybe_backfill_inner: forward_events %s", forward_events) extremities_events = await self.store.get_events( forward_events, redact_behaviour=EventRedactBehaviour.AS_IS, get_prev_content=False, ) + logger.info("_maybe_backfill_inner: extremities_events %s", extremities_events) # We set `check_history_visibility_only` as we might otherwise get false # positives from users having been erased. @@ -1159,6 +1172,9 @@ class FederationHandler(BaseHandler): redact=False, check_history_visibility_only=True, ) + logger.info( + "_maybe_backfill_inner: filtered_extremities %s", filtered_extremities + ) if not filtered_extremities: return False @@ -1177,7 +1193,7 @@ class FederationHandler(BaseHandler): # much larger factor will result in triggering a backfill request much # earlier than necessary. if current_depth - 2 * limit > max_depth: - logger.debug( + logger.info( "Not backfilling as we don't need to. %d < %d - 2 * %d", max_depth, current_depth, @@ -1185,7 +1201,7 @@ class FederationHandler(BaseHandler): ) return False - logger.debug( + logger.info( "room_id: %s, backfill: current_depth: %s, max_depth: %s, extrems: %s", room_id, current_depth, diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index a2460fb7ca..8bbdb06c74 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -666,27 +666,49 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas return {eid for eid, n in event_to_missing_sets.items() if n} async def get_oldest_events_with_depth_in_room(self, room_id): + def get_oldest_events_with_depth_in_room_txn(txn, room_id): + sql = ( + "SELECT b.event_id, MAX(e.depth) FROM events as e" + " INNER JOIN event_edges as g" + " ON g.event_id = e.event_id" + " INNER JOIN event_backward_extremities as b" + " ON g.prev_event_id = b.event_id" + " WHERE b.room_id = ? AND g.is_state is ?" + " GROUP BY b.event_id" + ) + + txn.execute(sql, (room_id, False)) + + return dict(txn) + return await self.db_pool.runInteraction( "get_oldest_events_with_depth_in_room", - self.get_oldest_events_with_depth_in_room_txn, + get_oldest_events_with_depth_in_room_txn, room_id, ) - def get_oldest_events_with_depth_in_room_txn(self, txn, room_id): - sql = ( - "SELECT b.event_id, MAX(e.depth) FROM events as e" - " INNER JOIN event_edges as g" - " ON g.event_id = e.event_id" - " INNER JOIN event_backward_extremities as b" - " ON g.prev_event_id = b.event_id" - " WHERE b.room_id = ? AND g.is_state is ?" - " GROUP BY b.event_id" + async def get_insertion_event_backwards_extremities_in_room(self, room_id): + def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id): + sql = """ + SELECT b.event_id, MAX(e.depth) FROM insertion_events as i + /* We only want insertion events that are also marked as backwards extremities */ + INNER JOIN event_backward_extremities as b USING (event_id) + /* Get the depth of the insertion event from the events table */ + INNER JOIN events AS e USING (event_id) + WHERE b.room_id = ? + GROUP BY b.event_id + """ + + txn.execute(sql, (room_id,)) + + return dict(txn) + + return await self.db_pool.runInteraction( + "get_insertion_event_backwards_extremities_in_room", + get_insertion_event_backwards_extremities_in_room_txn, + room_id, ) - txn.execute(sql, (room_id, False)) - - return dict(txn) - async def get_max_depth_of(self, event_ids: List[str]) -> Tuple[str, int]: """Returns the event ID and depth for the event that has the max depth from a set of event IDs @@ -929,7 +951,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas return sorted(events, key=lambda e: -e.depth) def _get_backfill_events(self, txn, room_id, event_list, limit): - logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit) + logger.info("_get_backfill_events: %s, %r, %s", room_id, event_list, limit) event_results = set() @@ -1122,6 +1144,21 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas _delete_old_forward_extrem_cache_txn, ) + async def insert_backward_extremity(self, event_id: str, room_id: str) -> None: + def _insert_backward_extremity_txn(txn): + self.db_pool.simple_insert_txn( + txn, + table="event_backward_extremities", + values={ + "event_id": event_id, + "room_id": room_id, + }, + ) + + await self.db_pool.runInteraction( + "_insert_backward_extremity_txn", _insert_backward_extremity_txn + ) + async def insert_received_event_to_staging( self, origin: str, event: EventBase ) -> None: diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 201301440f..2041d49a10 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -2091,18 +2091,21 @@ class PersistEventsStore: for ev in events: events_by_room.setdefault(ev.room_id, []).append(ev) + # From the events passed in, add all of the prev events as backwards extremities. + # Ignore any events that are already backwards extrems or outliers. query = ( "INSERT INTO event_backward_extremities (event_id, room_id)" " SELECT ?, ? WHERE NOT EXISTS (" - " SELECT 1 FROM event_backward_extremities" - " WHERE event_id = ? AND room_id = ?" + " SELECT 1 FROM event_backward_extremities" + " WHERE event_id = ? AND room_id = ?" " )" " AND NOT EXISTS (" - " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? " - " AND outlier = ?" + " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? " + " AND outlier = ?" " )" ) + logger.info("_update_backward_extremeties %s", events) txn.execute_batch( query, [ @@ -2113,6 +2116,8 @@ class PersistEventsStore: ], ) + # Delete all these events that we've already fetched and now know that their + # prev events are the new outliers. query = ( "DELETE FROM event_backward_extremities" " WHERE event_id = ? AND room_id = ?" diff --git a/synapse/visibility.py b/synapse/visibility.py index 490fb26e81..e4bd4b077d 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -366,13 +366,21 @@ async def filter_events_for_server( if erased_senders: to_return = [] for e in events: - if not is_sender_erased(e, erased_senders): + erased = is_sender_erased(e, erased_senders) + logger.info( + "filter_events_for_server: (all_open) %s erased=%s", e, erased + ) + if not erased: to_return.append(e) elif redact: to_return.append(prune_event(e)) + logger.info("filter_events_for_server: (all_open) to_return=%s", to_return) return to_return + logger.info( + "filter_events_for_server: all_open and no erased senders %s", events + ) # If there are no erased users then we can just return the given list # of events without having to copy it. return events @@ -429,6 +437,9 @@ async def filter_events_for_server( for e in events: erased = is_sender_erased(e, erased_senders) visible = check_event_is_visible(e, event_to_state[e.event_id]) + logger.info( + "filter_events_for_server: %s erased=%s visible=%s", e, erased, visible + ) if visible and not erased: to_return.append(e) elif redact: