WIP: make hs2 backfill historical messages after marker event

Eric Eastwood 2021-07-17 03:00:10 -05:00
parent 435f074541
commit e0e1bd0550
4 changed files with 93 additions and 24 deletions

synapse/handlers/federation.py

@@ -926,7 +926,9 @@ class FederationHandler(BaseHandler):
             [insertion_event_id],
         )
-        insertion_event = await self.store.get_event(insertion_event_id, allow_none=True)
+        insertion_event = await self.store.get_event(
+            insertion_event_id, allow_none=True
+        )
         if insertion_event is None:
             logger.warning(
                 "_handle_marker_event: server %s didn't return insertion event %s for marker %s",
@@ -942,6 +944,10 @@ class FederationHandler(BaseHandler):
             marker_event,
         )
 
+        await self.store.insert_backward_extremity(
+            insertion_event_id, marker_event.room_id
+        )
+
     async def _resync_device(self, sender: str) -> None:
         """We have detected that the device list for the given user may be out
         of sync, so we try and resync them.
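
The insert_backward_extremity call above is the crux of the commit: backfill only ever walks backwards from a room's backward extremities, so recording the marker's insertion event as one puts the historical batch on backfill's path. A minimal stand-alone sketch of that flow with plain dicts (not Synapse's real types; the content key is assumed from the MSC2716 drafts of the time):

# Hypothetical simplification of _handle_marker_event's final step: the
# marker names an insertion event, and remembering that insertion event as a
# backward extremity lets the next backfill pass fetch the events behind it.
backward_extremities = set()  # {(event_id, room_id)}

def handle_marker(marker_event: dict) -> None:
    insertion_event_id = marker_event["content"][
        "org.matrix.msc2716.marker.insertion"  # assumed MSC2716 field name
    ]
    backward_extremities.add((insertion_event_id, marker_event["room_id"]))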
@@ -1110,7 +1116,12 @@ class FederationHandler(BaseHandler):
     async def _maybe_backfill_inner(
         self, room_id: str, current_depth: int, limit: int
     ) -> bool:
-        extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
+        oldest_events = await self.store.get_oldest_events_with_depth_in_room(room_id)
+        insertion_events_to_be_backfilled = (
+            await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
+        )
+        extremities = {**oldest_events, **insertion_events_to_be_backfilled}
+        logger.info("_maybe_backfill_inner: extremities %s", extremities)
 
         if not extremities:
             logger.debug("Not backfilling as no extremities found.")
@@ -1143,12 +1154,14 @@ class FederationHandler(BaseHandler):
         # types have.
         forward_events = await self.store.get_successor_events(list(extremities))
+        logger.info("_maybe_backfill_inner: forward_events %s", forward_events)
 
         extremities_events = await self.store.get_events(
             forward_events,
             redact_behaviour=EventRedactBehaviour.AS_IS,
             get_prev_content=False,
         )
+        logger.info("_maybe_backfill_inner: extremities_events %s", extremities_events)
 
         # We set `check_history_visibility_only` as we might otherwise get false
         # positives from users having been erased.
@@ -1159,6 +1172,9 @@ class FederationHandler(BaseHandler):
             redact=False,
             check_history_visibility_only=True,
         )
+        logger.info(
+            "_maybe_backfill_inner: filtered_extremities %s", filtered_extremities
+        )
 
         if not filtered_extremities:
             return False
@@ -1177,7 +1193,7 @@ class FederationHandler(BaseHandler):
         # much larger factor will result in triggering a backfill request much
         # earlier than necessary.
         if current_depth - 2 * limit > max_depth:
-            logger.debug(
+            logger.info(
                 "Not backfilling as we don't need to. %d < %d - 2 * %d",
                 max_depth,
                 current_depth,
@@ -1185,7 +1201,7 @@ class FederationHandler(BaseHandler):
             )
             return False
 
-        logger.debug(
+        logger.info(
             "room_id: %s, backfill: current_depth: %s, max_depth: %s, extrems: %s",
             room_id,
             current_depth,

synapse/storage/databases/main/event_federation.py

@@ -666,27 +666,49 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         return {eid for eid, n in event_to_missing_sets.items() if n}
 
     async def get_oldest_events_with_depth_in_room(self, room_id):
+        def get_oldest_events_with_depth_in_room_txn(txn, room_id):
+            sql = (
+                "SELECT b.event_id, MAX(e.depth) FROM events as e"
+                " INNER JOIN event_edges as g"
+                " ON g.event_id = e.event_id"
+                " INNER JOIN event_backward_extremities as b"
+                " ON g.prev_event_id = b.event_id"
+                " WHERE b.room_id = ? AND g.is_state is ?"
+                " GROUP BY b.event_id"
+            )
+
+            txn.execute(sql, (room_id, False))
+
+            return dict(txn)
+
         return await self.db_pool.runInteraction(
             "get_oldest_events_with_depth_in_room",
-            self.get_oldest_events_with_depth_in_room_txn,
+            get_oldest_events_with_depth_in_room_txn,
             room_id,
         )
 
-    def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
-        sql = (
-            "SELECT b.event_id, MAX(e.depth) FROM events as e"
-            " INNER JOIN event_edges as g"
-            " ON g.event_id = e.event_id"
-            " INNER JOIN event_backward_extremities as b"
-            " ON g.prev_event_id = b.event_id"
-            " WHERE b.room_id = ? AND g.is_state is ?"
-            " GROUP BY b.event_id"
-        )
-
-        txn.execute(sql, (room_id, False))
-
-        return dict(txn)
+    async def get_insertion_event_backwards_extremities_in_room(self, room_id):
+        def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id):
+            sql = """
+                SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
+                /* We only want insertion events that are also marked as backwards extremities */
+                INNER JOIN event_backward_extremities as b USING (event_id)
+                /* Get the depth of the insertion event from the events table */
+                INNER JOIN events AS e USING (event_id)
+                WHERE b.room_id = ?
+                GROUP BY b.event_id
+            """
+
+            txn.execute(sql, (room_id,))
+
+            return dict(txn)
+
+        return await self.db_pool.runInteraction(
+            "get_insertion_event_backwards_extremities_in_room",
+            get_insertion_event_backwards_extremities_in_room_txn,
+            room_id,
+        )
 
     async def get_max_depth_of(self, event_ids: List[str]) -> Tuple[str, int]:
         """Returns the event ID and depth for the event that has the max depth from a set of event IDs
@@ -929,7 +951,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         return sorted(events, key=lambda e: -e.depth)
 
     def _get_backfill_events(self, txn, room_id, event_list, limit):
-        logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit)
+        logger.info("_get_backfill_events: %s, %r, %s", room_id, event_list, limit)
 
         event_results = set()
@@ -1122,6 +1144,21 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             _delete_old_forward_extrem_cache_txn,
         )
 
+    async def insert_backward_extremity(self, event_id: str, room_id: str) -> None:
+        def _insert_backward_extremity_txn(txn):
+            self.db_pool.simple_insert_txn(
+                txn,
+                table="event_backward_extremities",
+                values={
+                    "event_id": event_id,
+                    "room_id": room_id,
+                },
+            )
+
+        await self.db_pool.runInteraction(
+            "_insert_backward_extremity_txn", _insert_backward_extremity_txn
+        )
+
     async def insert_received_event_to_staging(
         self, origin: str, event: EventBase
     ) -> None:

synapse/storage/databases/main/events.py

@@ -2091,18 +2091,21 @@ class PersistEventsStore:
         for ev in events:
             events_by_room.setdefault(ev.room_id, []).append(ev)
 
+        # From the events passed in, add all of the prev events as backwards extremities.
+        # Ignore any events that are already backwards extrems or outliers.
         query = (
             "INSERT INTO event_backward_extremities (event_id, room_id)"
             " SELECT ?, ? WHERE NOT EXISTS ("
-            " SELECT 1 FROM event_backward_extremities"
-            " WHERE event_id = ? AND room_id = ?"
+            "     SELECT 1 FROM event_backward_extremities"
+            "     WHERE event_id = ? AND room_id = ?"
             " )"
             " AND NOT EXISTS ("
-            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
-            " AND outlier = ?"
+            "     SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+            "     AND outlier = ?"
             " )"
         )
 
+        logger.info("_update_backward_extremeties %s", events)
         txn.execute_batch(
             query,
             [
@@ -2113,6 +2116,8 @@ class PersistEventsStore:
             ],
         )
 
+        # Delete all these events that we've already fetched and now know that their
+        # prev events are the new outliers.
         query = (
             "DELETE FROM event_backward_extremities"
             " WHERE event_id = ? AND room_id = ?"

synapse/visibility.py

@@ -366,13 +366,21 @@ async def filter_events_for_server(
         if erased_senders:
             to_return = []
             for e in events:
-                if not is_sender_erased(e, erased_senders):
+                erased = is_sender_erased(e, erased_senders)
+                logger.info(
+                    "filter_events_for_server: (all_open) %s erased=%s", e, erased
+                )
+                if not erased:
                     to_return.append(e)
                 elif redact:
                     to_return.append(prune_event(e))
 
+            logger.info("filter_events_for_server: (all_open) to_return=%s", to_return)
             return to_return
 
+        logger.info(
+            "filter_events_for_server: all_open and no erased senders %s", events
+        )
         # If there are no erased users then we can just return the given list
         # of events without having to copy it.
         return events
@@ -429,6 +437,9 @@ async def filter_events_for_server(
     for e in events:
         erased = is_sender_erased(e, erased_senders)
         visible = check_event_is_visible(e, event_to_state[e.event_id])
+        logger.info(
+            "filter_events_for_server: %s erased=%s visible=%s", e, erased, visible
+        )
         if visible and not erased:
             to_return.append(e)
         elif redact:
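
Net effect of the paths being instrumented: an event is returned untouched only when it is visible to the requesting server and its sender is not erased; otherwise it is replaced by its pruned (redacted) form or dropped entirely, depending on redact. A stand-alone sketch of that decision with stand-in predicates (not Synapse's actual signatures):

from typing import Callable, Dict, List

Event = Dict[str, object]  # stand-in for Synapse's EventBase

def filter_for_server(
    events: List[Event],
    is_erased: Callable[[Event], bool],
    is_visible: Callable[[Event], bool],
    prune: Callable[[Event], Event],
    redact: bool,
) -> List[Event]:
    to_return = []
    for e in events:
        if is_visible(e) and not is_erased(e):
            to_return.append(e)  # keep as-is
        elif redact:
            to_return.append(prune(e))  # strip to the redacted skeleton
        # otherwise drop the event entirely
    return to_return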