Don't keep old stream_ordering_to_exterm around (#15382)
This commit is contained in:
parent
72b43bec8b
commit
485b9fdefb
|
@ -0,0 +1 @@
|
||||||
|
Improve DB performance of clearing out old data from `stream_ordering_to_exterm`.
|
|
@ -215,6 +215,16 @@ class DeviceWorkerHandler:
|
||||||
possibly_changed = set(changed)
|
possibly_changed = set(changed)
|
||||||
possibly_left = set()
|
possibly_left = set()
|
||||||
for room_id in rooms_changed:
|
for room_id in rooms_changed:
|
||||||
|
# Check if the forward extremities have changed. If not then we know
|
||||||
|
# the current state won't have changed, and so we can skip this room.
|
||||||
|
try:
|
||||||
|
if not await self.store.have_room_forward_extremities_changed_since(
|
||||||
|
room_id, stream_ordering
|
||||||
|
):
|
||||||
|
continue
|
||||||
|
except errors.StoreError:
|
||||||
|
pass
|
||||||
|
|
||||||
current_state_ids = await self._state_storage.get_current_state_ids(
|
current_state_ids = await self._state_storage.get_current_state_ids(
|
||||||
room_id, await_full_state=False
|
room_id, await_full_state=False
|
||||||
)
|
)
|
||||||
|
|
|
@ -1171,6 +1171,38 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
|
||||||
|
|
||||||
return int(min_depth) if min_depth is not None else None
|
return int(min_depth) if min_depth is not None else None
|
||||||
|
|
||||||
|
async def have_room_forward_extremities_changed_since(
|
||||||
|
self,
|
||||||
|
room_id: str,
|
||||||
|
stream_ordering: int,
|
||||||
|
) -> bool:
|
||||||
|
"""Check if the forward extremities in a room have changed since the
|
||||||
|
given stream ordering
|
||||||
|
|
||||||
|
Throws a StoreError if we have since purged the index for
|
||||||
|
stream_orderings from that point.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if stream_ordering <= self.stream_ordering_month_ago: # type: ignore[attr-defined]
|
||||||
|
raise StoreError(400, f"stream_ordering too old {stream_ordering}")
|
||||||
|
|
||||||
|
sql = """
|
||||||
|
SELECT 1 FROM stream_ordering_to_exterm
|
||||||
|
WHERE stream_ordering > ? AND room_id = ?
|
||||||
|
LIMIT 1
|
||||||
|
"""
|
||||||
|
|
||||||
|
def have_room_forward_extremities_changed_since_txn(
|
||||||
|
txn: LoggingTransaction,
|
||||||
|
) -> bool:
|
||||||
|
txn.execute(sql, (stream_ordering, room_id))
|
||||||
|
return txn.fetchone() is not None
|
||||||
|
|
||||||
|
return await self.db_pool.runInteraction(
|
||||||
|
"have_room_forward_extremities_changed_since",
|
||||||
|
have_room_forward_extremities_changed_since_txn,
|
||||||
|
)
|
||||||
|
|
||||||
@cancellable
|
@cancellable
|
||||||
async def get_forward_extremities_for_room_at_stream_ordering(
|
async def get_forward_extremities_for_room_at_stream_ordering(
|
||||||
self, room_id: str, stream_ordering: int
|
self, room_id: str, stream_ordering: int
|
||||||
|
@ -1232,10 +1264,17 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
|
||||||
txn.execute(sql, (stream_ordering, room_id))
|
txn.execute(sql, (stream_ordering, room_id))
|
||||||
return [event_id for event_id, in txn]
|
return [event_id for event_id, in txn]
|
||||||
|
|
||||||
return await self.db_pool.runInteraction(
|
event_ids = await self.db_pool.runInteraction(
|
||||||
"get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn
|
"get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# If we didn't find any IDs, then we must have cleared out the
|
||||||
|
# associated `stream_ordering_to_exterm`.
|
||||||
|
if not event_ids:
|
||||||
|
raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,))
|
||||||
|
|
||||||
|
return event_ids
|
||||||
|
|
||||||
def _get_connected_batch_event_backfill_results_txn(
|
def _get_connected_batch_event_backfill_results_txn(
|
||||||
self, txn: LoggingTransaction, insertion_event_id: str, limit: int
|
self, txn: LoggingTransaction, insertion_event_id: str, limit: int
|
||||||
) -> List[BackfillQueueNavigationItem]:
|
) -> List[BackfillQueueNavigationItem]:
|
||||||
|
@ -1664,19 +1703,12 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
|
||||||
@wrap_as_background_process("delete_old_forward_extrem_cache")
|
@wrap_as_background_process("delete_old_forward_extrem_cache")
|
||||||
async def _delete_old_forward_extrem_cache(self) -> None:
|
async def _delete_old_forward_extrem_cache(self) -> None:
|
||||||
def _delete_old_forward_extrem_cache_txn(txn: LoggingTransaction) -> None:
|
def _delete_old_forward_extrem_cache_txn(txn: LoggingTransaction) -> None:
|
||||||
# Delete entries older than a month, while making sure we don't delete
|
|
||||||
# the only entries for a room.
|
|
||||||
sql = """
|
sql = """
|
||||||
DELETE FROM stream_ordering_to_exterm
|
DELETE FROM stream_ordering_to_exterm
|
||||||
WHERE
|
WHERE stream_ordering < ?
|
||||||
room_id IN (
|
|
||||||
SELECT room_id
|
|
||||||
FROM stream_ordering_to_exterm
|
|
||||||
WHERE stream_ordering > ?
|
|
||||||
) AND stream_ordering < ?
|
|
||||||
"""
|
"""
|
||||||
txn.execute(
|
txn.execute(
|
||||||
sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago) # type: ignore[attr-defined]
|
sql, (self.stream_ordering_month_ago) # type: ignore[attr-defined]
|
||||||
)
|
)
|
||||||
|
|
||||||
await self.db_pool.runInteraction(
|
await self.db_pool.runInteraction(
|
||||||
|
|
Loading…
Reference in New Issue