Purge chain cover tables when purging events. (#9498)
This commit is contained in:
parent
d790d0d314
commit
922788c604
|
@ -0,0 +1 @@
|
||||||
|
Properly purge the event chain cover index when purging history.
|
|
@ -28,7 +28,10 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
|
||||||
async def purge_history(
|
async def purge_history(
|
||||||
self, room_id: str, token: str, delete_local_events: bool
|
self, room_id: str, token: str, delete_local_events: bool
|
||||||
) -> Set[int]:
|
) -> Set[int]:
|
||||||
"""Deletes room history before a certain point
|
"""Deletes room history before a certain point.
|
||||||
|
|
||||||
|
Note that only a single purge can occur at once, this is guaranteed via
|
||||||
|
a higher level (in the PaginationHandler).
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
room_id:
|
room_id:
|
||||||
|
@ -52,7 +55,9 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
|
||||||
delete_local_events,
|
delete_local_events,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _purge_history_txn(self, txn, room_id, token, delete_local_events):
|
def _purge_history_txn(
|
||||||
|
self, txn, room_id: str, token: RoomStreamToken, delete_local_events: bool
|
||||||
|
) -> Set[int]:
|
||||||
# Tables that should be pruned:
|
# Tables that should be pruned:
|
||||||
# event_auth
|
# event_auth
|
||||||
# event_backward_extremities
|
# event_backward_extremities
|
||||||
|
@ -103,7 +108,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
|
||||||
if max_depth < token.topological:
|
if max_depth < token.topological:
|
||||||
# We need to ensure we don't delete all the events from the database
|
# We need to ensure we don't delete all the events from the database
|
||||||
# otherwise we wouldn't be able to send any events (due to not
|
# otherwise we wouldn't be able to send any events (due to not
|
||||||
# having any backwards extremeties)
|
# having any backwards extremities)
|
||||||
raise SynapseError(
|
raise SynapseError(
|
||||||
400, "topological_ordering is greater than forward extremeties"
|
400, "topological_ordering is greater than forward extremeties"
|
||||||
)
|
)
|
||||||
|
@ -154,7 +159,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
|
||||||
|
|
||||||
logger.info("[purge] Finding new backward extremities")
|
logger.info("[purge] Finding new backward extremities")
|
||||||
|
|
||||||
# We calculate the new entries for the backward extremeties by finding
|
# We calculate the new entries for the backward extremities by finding
|
||||||
# events to be purged that are pointed to by events we're not going to
|
# events to be purged that are pointed to by events we're not going to
|
||||||
# purge.
|
# purge.
|
||||||
txn.execute(
|
txn.execute(
|
||||||
|
@ -296,7 +301,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
|
||||||
"purge_room", self._purge_room_txn, room_id
|
"purge_room", self._purge_room_txn, room_id
|
||||||
)
|
)
|
||||||
|
|
||||||
def _purge_room_txn(self, txn, room_id):
|
def _purge_room_txn(self, txn, room_id: str) -> List[int]:
|
||||||
# First we fetch all the state groups that should be deleted, before
|
# First we fetch all the state groups that should be deleted, before
|
||||||
# we delete that information.
|
# we delete that information.
|
||||||
txn.execute(
|
txn.execute(
|
||||||
|
@ -310,6 +315,31 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
|
||||||
|
|
||||||
state_groups = [row[0] for row in txn]
|
state_groups = [row[0] for row in txn]
|
||||||
|
|
||||||
|
# Get all the auth chains that are referenced by events that are to be
|
||||||
|
# deleted.
|
||||||
|
txn.execute(
|
||||||
|
"""
|
||||||
|
SELECT chain_id, sequence_number FROM events
|
||||||
|
LEFT JOIN event_auth_chains USING (event_id)
|
||||||
|
WHERE room_id = ?
|
||||||
|
""",
|
||||||
|
(room_id,),
|
||||||
|
)
|
||||||
|
referenced_chain_id_tuples = list(txn)
|
||||||
|
|
||||||
|
logger.info("[purge] removing events from event_auth_chain_links")
|
||||||
|
txn.executemany(
|
||||||
|
"""
|
||||||
|
DELETE FROM event_auth_chain_links WHERE
|
||||||
|
(origin_chain_id = ? AND origin_sequence_number = ?) OR
|
||||||
|
(target_chain_id = ? AND target_sequence_number = ?)
|
||||||
|
""",
|
||||||
|
(
|
||||||
|
(chain_id, seq_num, chain_id, seq_num)
|
||||||
|
for (chain_id, seq_num) in referenced_chain_id_tuples
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
# Now we delete tables which lack an index on room_id but have one on event_id
|
# Now we delete tables which lack an index on room_id but have one on event_id
|
||||||
for table in (
|
for table in (
|
||||||
"event_auth",
|
"event_auth",
|
||||||
|
@ -319,6 +349,8 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
|
||||||
"event_reference_hashes",
|
"event_reference_hashes",
|
||||||
"event_relations",
|
"event_relations",
|
||||||
"event_to_state_groups",
|
"event_to_state_groups",
|
||||||
|
"event_auth_chains",
|
||||||
|
"event_auth_chain_to_calculate",
|
||||||
"redactions",
|
"redactions",
|
||||||
"rejections",
|
"rejections",
|
||||||
"state_events",
|
"state_events",
|
||||||
|
|
|
@ -73,9 +73,6 @@ class PurgeEventsStorage:
|
||||||
Returns:
|
Returns:
|
||||||
The set of state groups that can be deleted.
|
The set of state groups that can be deleted.
|
||||||
"""
|
"""
|
||||||
# Graph of state group -> previous group
|
|
||||||
graph = {}
|
|
||||||
|
|
||||||
# Set of events that we have found to be referenced by events
|
# Set of events that we have found to be referenced by events
|
||||||
referenced_groups = set()
|
referenced_groups = set()
|
||||||
|
|
||||||
|
@ -111,8 +108,6 @@ class PurgeEventsStorage:
|
||||||
next_to_search |= prevs
|
next_to_search |= prevs
|
||||||
state_groups_seen |= prevs
|
state_groups_seen |= prevs
|
||||||
|
|
||||||
graph.update(edges)
|
|
||||||
|
|
||||||
to_delete = state_groups_seen - referenced_groups
|
to_delete = state_groups_seen - referenced_groups
|
||||||
|
|
||||||
return to_delete
|
return to_delete
|
||||||
|
|
Loading…
Reference in New Issue