Change to not require a state_groups.room_id index.
This does mean that we won't clean up orphaned state groups (i.e. state groups that were persisted but the associated event wasn't).
This commit is contained in:
parent
6a0092d371
commit
7134ca7daa
|
@ -1624,7 +1624,10 @@ class EventsStore(
|
||||||
"""Deletes all record of a room
|
"""Deletes all record of a room
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
room_id (str):
|
room_id (str)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[List[int]]: The list of state groups to delete.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return self.runInteraction("purge_room", self._purge_room_txn, room_id)
|
return self.runInteraction("purge_room", self._purge_room_txn, room_id)
|
||||||
|
@ -1714,10 +1717,24 @@ class EventsStore(
|
||||||
# index on them. In any case we should be clearing out 'stream' tables
|
# index on them. In any case we should be clearing out 'stream' tables
|
||||||
# periodically anyway (#5888)
|
# periodically anyway (#5888)
|
||||||
|
|
||||||
|
# Now we fetch all the state groups that should be deleted.
|
||||||
|
txn.execute(
|
||||||
|
"""
|
||||||
|
SELECT DISTINCT state_group FROM events
|
||||||
|
INNER JOIN event_to_state_groups USING(event_id)
|
||||||
|
WHERE events.room_id = ?
|
||||||
|
""",
|
||||||
|
(room_id,),
|
||||||
|
)
|
||||||
|
|
||||||
|
state_groups = [row[0] for row in txn]
|
||||||
|
|
||||||
# TODO: we could probably usefully do a bunch of cache invalidation here
|
# TODO: we could probably usefully do a bunch of cache invalidation here
|
||||||
|
|
||||||
logger.info("[purge] done")
|
logger.info("[purge] done")
|
||||||
|
|
||||||
|
return state_groups
|
||||||
|
|
||||||
def purge_unreferenced_state_groups(
|
def purge_unreferenced_state_groups(
|
||||||
self, room_id: str, state_groups_to_delete: Set[int]
|
self, room_id: str, state_groups_to_delete: Set[int]
|
||||||
) -> defer.Deferred:
|
) -> defer.Deferred:
|
||||||
|
@ -1825,54 +1842,53 @@ class EventsStore(
|
||||||
|
|
||||||
return {row["state_group"]: row["prev_state_group"] for row in rows}
|
return {row["state_group"]: row["prev_state_group"] for row in rows}
|
||||||
|
|
||||||
def purge_room_state(self, room_id):
|
def purge_room_state(self, room_id, state_groups_to_delete):
|
||||||
"""Deletes all record of a room from state tables
|
"""Deletes all record of a room from state tables
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
room_id (str):
|
room_id (str):
|
||||||
|
state_groups_to_delete (list[int]): State groups to delete
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return self.runInteraction(
|
return self.runInteraction(
|
||||||
"purge_room_state", self._purge_room_state_txn, room_id
|
"purge_room_state",
|
||||||
|
self._purge_room_state_txn,
|
||||||
|
room_id,
|
||||||
|
state_groups_to_delete,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _purge_room_state_txn(self, txn, room_id):
|
def _purge_room_state_txn(self, txn, room_id, state_groups_to_delete):
|
||||||
# first we have to delete the state groups states
|
# first we have to delete the state groups states
|
||||||
logger.info("[purge] removing %s from state_groups_state", room_id)
|
logger.info("[purge] removing %s from state_groups_state", room_id)
|
||||||
|
|
||||||
txn.execute(
|
self._simple_delete_many_txn(
|
||||||
"""
|
txn,
|
||||||
DELETE FROM state_groups_state
|
table="state_groups_state",
|
||||||
WHERE state_group IN (
|
column="state_group",
|
||||||
SELECT state_group FROM state_groups
|
iterable=state_groups_to_delete,
|
||||||
WHERE room_id = ?
|
keyvalues={},
|
||||||
)
|
|
||||||
""",
|
|
||||||
(room_id,),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# ... and the state group edges
|
# ... and the state group edges
|
||||||
logger.info("[purge] removing %s from state_group_edges", room_id)
|
logger.info("[purge] removing %s from state_group_edges", room_id)
|
||||||
|
|
||||||
txn.execute(
|
self._simple_delete_many_txn(
|
||||||
"""
|
txn,
|
||||||
DELETE FROM state_group_edges
|
table="state_group_edges",
|
||||||
WHERE state_group IN (
|
column="state_group",
|
||||||
SELECT state_group FROM state_groups
|
iterable=state_groups_to_delete,
|
||||||
WHERE room_id = ?
|
keyvalues={},
|
||||||
)
|
|
||||||
""",
|
|
||||||
(room_id,),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# ... and the state groups
|
# ... and the state groups
|
||||||
logger.info("[purge] removing %s from state_groups", room_id)
|
logger.info("[purge] removing %s from state_groups", room_id)
|
||||||
|
|
||||||
txn.execute(
|
self._simple_delete_many_txn(
|
||||||
"""
|
txn,
|
||||||
DELETE FROM state_groups WHERE room_id = ?
|
table="state_groups",
|
||||||
""",
|
column="id",
|
||||||
(room_id,),
|
iterable=state_groups_to_delete,
|
||||||
|
keyvalues={},
|
||||||
)
|
)
|
||||||
|
|
||||||
async def is_event_after(self, event_id1, event_id2):
|
async def is_event_after(self, event_id1, event_id2):
|
||||||
|
|
|
@ -1,17 +0,0 @@
|
||||||
/* Copyright 2019 The Matrix.org Foundation C.I.C.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
INSERT INTO background_updates (update_name, progress_json) VALUES
|
|
||||||
('state_groups_room_id_idx', '{}');
|
|
|
@ -1023,7 +1023,6 @@ class StateBackgroundUpdateStore(
|
||||||
STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
|
STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
|
||||||
CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
|
CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
|
||||||
EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
|
EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
|
||||||
STATE_GROUPS_ROOM_INDEX_UPDATE_NAME = "state_groups_room_id_idx"
|
|
||||||
|
|
||||||
def __init__(self, db_conn, hs):
|
def __init__(self, db_conn, hs):
|
||||||
super(StateBackgroundUpdateStore, self).__init__(db_conn, hs)
|
super(StateBackgroundUpdateStore, self).__init__(db_conn, hs)
|
||||||
|
@ -1047,12 +1046,6 @@ class StateBackgroundUpdateStore(
|
||||||
table="event_to_state_groups",
|
table="event_to_state_groups",
|
||||||
columns=["state_group"],
|
columns=["state_group"],
|
||||||
)
|
)
|
||||||
self.register_background_index_update(
|
|
||||||
self.STATE_GROUPS_ROOM_INDEX_UPDATE_NAME,
|
|
||||||
index_name="state_groups_room_id_idx",
|
|
||||||
table="state_groups",
|
|
||||||
columns=["room_id"],
|
|
||||||
)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _background_deduplicate_state(self, progress, batch_size):
|
def _background_deduplicate_state(self, progress, batch_size):
|
||||||
|
|
|
@ -33,8 +33,8 @@ class PurgeEventsStorage(object):
|
||||||
"""Deletes all record of a room
|
"""Deletes all record of a room
|
||||||
"""
|
"""
|
||||||
|
|
||||||
yield self.stores.main.purge_room(room_id)
|
state_groups_to_delete = yield self.stores.main.purge_room(room_id)
|
||||||
yield self.stores.main.purge_room_state(room_id)
|
yield self.stores.main.purge_room_state(room_id, state_groups_to_delete)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def purge_history(self, room_id, token, delete_local_events):
|
def purge_history(self, room_id, token, delete_local_events):
|
||||||
|
|
Loading…
Reference in New Issue