Populate `rooms.creator` field for easy lookup (#10697)
Part of https://github.com/matrix-org/synapse/pull/10566 - Fill in creator whenever we insert into the rooms table - Add background update to backfill any missing creator values
This commit is contained in:
parent
e059094119
commit
dc75fb7f05
|
@ -0,0 +1 @@
|
||||||
|
Ensure `rooms.creator` field is always populated for easy lookup in [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) usage later.
|
|
@ -198,6 +198,9 @@ class EventContentFields:
|
||||||
# cf https://github.com/matrix-org/matrix-doc/pull/1772
|
# cf https://github.com/matrix-org/matrix-doc/pull/1772
|
||||||
ROOM_TYPE = "type"
|
ROOM_TYPE = "type"
|
||||||
|
|
||||||
|
# The creator of the room, as used in `m.room.create` events.
|
||||||
|
ROOM_CREATOR = "creator"
|
||||||
|
|
||||||
# Used on normal messages to indicate they were historically imported after the fact
|
# Used on normal messages to indicate they were historically imported after the fact
|
||||||
MSC2716_HISTORICAL = "org.matrix.msc2716.historical"
|
MSC2716_HISTORICAL = "org.matrix.msc2716.historical"
|
||||||
# For "insertion" events to indicate what the next chunk ID should be in
|
# For "insertion" events to indicate what the next chunk ID should be in
|
||||||
|
|
|
@ -507,6 +507,7 @@ class FederationHandler(BaseHandler):
|
||||||
await self.store.upsert_room_on_join(
|
await self.store.upsert_room_on_join(
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
room_version=room_version_obj,
|
room_version=room_version_obj,
|
||||||
|
auth_events=auth_chain,
|
||||||
)
|
)
|
||||||
|
|
||||||
max_stream_id = await self._persist_auth_tree(
|
max_stream_id = await self._persist_auth_tree(
|
||||||
|
|
|
@ -19,9 +19,10 @@ from abc import abstractmethod
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from typing import Any, Dict, List, Optional, Tuple
|
from typing import Any, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes, JoinRules
|
from synapse.api.constants import EventContentFields, EventTypes, JoinRules
|
||||||
from synapse.api.errors import StoreError
|
from synapse.api.errors import StoreError
|
||||||
from synapse.api.room_versions import RoomVersion, RoomVersions
|
from synapse.api.room_versions import RoomVersion, RoomVersions
|
||||||
|
from synapse.events import EventBase
|
||||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||||
from synapse.storage.database import DatabasePool, LoggingTransaction
|
from synapse.storage.database import DatabasePool, LoggingTransaction
|
||||||
from synapse.storage.databases.main.search import SearchStore
|
from synapse.storage.databases.main.search import SearchStore
|
||||||
|
@ -1013,6 +1014,7 @@ class _BackgroundUpdates:
|
||||||
ADD_ROOMS_ROOM_VERSION_COLUMN = "add_rooms_room_version_column"
|
ADD_ROOMS_ROOM_VERSION_COLUMN = "add_rooms_room_version_column"
|
||||||
POPULATE_ROOM_DEPTH_MIN_DEPTH2 = "populate_room_depth_min_depth2"
|
POPULATE_ROOM_DEPTH_MIN_DEPTH2 = "populate_room_depth_min_depth2"
|
||||||
REPLACE_ROOM_DEPTH_MIN_DEPTH = "replace_room_depth_min_depth"
|
REPLACE_ROOM_DEPTH_MIN_DEPTH = "replace_room_depth_min_depth"
|
||||||
|
POPULATE_ROOMS_CREATOR_COLUMN = "populate_rooms_creator_column"
|
||||||
|
|
||||||
|
|
||||||
_REPLACE_ROOM_DEPTH_SQL_COMMANDS = (
|
_REPLACE_ROOM_DEPTH_SQL_COMMANDS = (
|
||||||
|
@ -1054,6 +1056,11 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
|
||||||
self._background_replace_room_depth_min_depth,
|
self._background_replace_room_depth_min_depth,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self.db_pool.updates.register_background_update_handler(
|
||||||
|
_BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN,
|
||||||
|
self._background_populate_rooms_creator_column,
|
||||||
|
)
|
||||||
|
|
||||||
async def _background_insert_retention(self, progress, batch_size):
|
async def _background_insert_retention(self, progress, batch_size):
|
||||||
"""Retrieves a list of all rooms within a range and inserts an entry for each of
|
"""Retrieves a list of all rooms within a range and inserts an entry for each of
|
||||||
them into the room_retention table.
|
them into the room_retention table.
|
||||||
|
@ -1273,7 +1280,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
|
||||||
keyvalues={"room_id": room_id},
|
keyvalues={"room_id": room_id},
|
||||||
retcol="MAX(stream_ordering)",
|
retcol="MAX(stream_ordering)",
|
||||||
allow_none=True,
|
allow_none=True,
|
||||||
desc="upsert_room_on_join",
|
desc="has_auth_chain_index_fallback",
|
||||||
)
|
)
|
||||||
|
|
||||||
return max_ordering is None
|
return max_ordering is None
|
||||||
|
@ -1343,6 +1350,65 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
|
||||||
|
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
async def _background_populate_rooms_creator_column(
|
||||||
|
self, progress: dict, batch_size: int
|
||||||
|
):
|
||||||
|
"""Background update to go and add creator information to `rooms`
|
||||||
|
table from `current_state_events` table.
|
||||||
|
"""
|
||||||
|
|
||||||
|
last_room_id = progress.get("room_id", "")
|
||||||
|
|
||||||
|
def _background_populate_rooms_creator_column_txn(txn: LoggingTransaction):
|
||||||
|
sql = """
|
||||||
|
SELECT room_id, json FROM event_json
|
||||||
|
INNER JOIN rooms AS room USING (room_id)
|
||||||
|
INNER JOIN current_state_events AS state_event USING (room_id, event_id)
|
||||||
|
WHERE room_id > ? AND (room.creator IS NULL OR room.creator = '') AND state_event.type = 'm.room.create' AND state_event.state_key = ''
|
||||||
|
ORDER BY room_id
|
||||||
|
LIMIT ?
|
||||||
|
"""
|
||||||
|
|
||||||
|
txn.execute(sql, (last_room_id, batch_size))
|
||||||
|
room_id_to_create_event_results = txn.fetchall()
|
||||||
|
|
||||||
|
new_last_room_id = ""
|
||||||
|
for room_id, event_json in room_id_to_create_event_results:
|
||||||
|
event_dict = db_to_json(event_json)
|
||||||
|
|
||||||
|
creator = event_dict.get("content").get(EventContentFields.ROOM_CREATOR)
|
||||||
|
|
||||||
|
self.db_pool.simple_update_txn(
|
||||||
|
txn,
|
||||||
|
table="rooms",
|
||||||
|
keyvalues={"room_id": room_id},
|
||||||
|
updatevalues={"creator": creator},
|
||||||
|
)
|
||||||
|
new_last_room_id = room_id
|
||||||
|
|
||||||
|
if new_last_room_id == "":
|
||||||
|
return True
|
||||||
|
|
||||||
|
self.db_pool.updates._background_update_progress_txn(
|
||||||
|
txn,
|
||||||
|
_BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN,
|
||||||
|
{"room_id": new_last_room_id},
|
||||||
|
)
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
end = await self.db_pool.runInteraction(
|
||||||
|
"_background_populate_rooms_creator_column",
|
||||||
|
_background_populate_rooms_creator_column_txn,
|
||||||
|
)
|
||||||
|
|
||||||
|
if end:
|
||||||
|
await self.db_pool.updates._end_background_update(
|
||||||
|
_BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN
|
||||||
|
)
|
||||||
|
|
||||||
|
return batch_size
|
||||||
|
|
||||||
|
|
||||||
class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
|
class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
|
||||||
def __init__(self, database: DatabasePool, db_conn, hs):
|
def __init__(self, database: DatabasePool, db_conn, hs):
|
||||||
|
@ -1350,7 +1416,9 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
|
||||||
|
|
||||||
self.config = hs.config
|
self.config = hs.config
|
||||||
|
|
||||||
async def upsert_room_on_join(self, room_id: str, room_version: RoomVersion):
|
async def upsert_room_on_join(
|
||||||
|
self, room_id: str, room_version: RoomVersion, auth_events: List[EventBase]
|
||||||
|
):
|
||||||
"""Ensure that the room is stored in the table
|
"""Ensure that the room is stored in the table
|
||||||
|
|
||||||
Called when we join a room over federation, and overwrites any room version
|
Called when we join a room over federation, and overwrites any room version
|
||||||
|
@ -1361,6 +1429,24 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
|
||||||
# mark the room as having an auth chain cover index.
|
# mark the room as having an auth chain cover index.
|
||||||
has_auth_chain_index = await self.has_auth_chain_index(room_id)
|
has_auth_chain_index = await self.has_auth_chain_index(room_id)
|
||||||
|
|
||||||
|
create_event = None
|
||||||
|
for e in auth_events:
|
||||||
|
if (e.type, e.state_key) == (EventTypes.Create, ""):
|
||||||
|
create_event = e
|
||||||
|
break
|
||||||
|
|
||||||
|
if create_event is None:
|
||||||
|
# If the state doesn't have a create event then the room is
|
||||||
|
# invalid, and it would fail auth checks anyway.
|
||||||
|
raise StoreError(400, "No create event in state")
|
||||||
|
|
||||||
|
room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
|
||||||
|
|
||||||
|
if not isinstance(room_creator, str):
|
||||||
|
# If the create event does not have a creator then the room is
|
||||||
|
# invalid, and it would fail auth checks anyway.
|
||||||
|
raise StoreError(400, "No creator defined on the create event")
|
||||||
|
|
||||||
await self.db_pool.simple_upsert(
|
await self.db_pool.simple_upsert(
|
||||||
desc="upsert_room_on_join",
|
desc="upsert_room_on_join",
|
||||||
table="rooms",
|
table="rooms",
|
||||||
|
@ -1368,7 +1454,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
|
||||||
values={"room_version": room_version.identifier},
|
values={"room_version": room_version.identifier},
|
||||||
insertion_values={
|
insertion_values={
|
||||||
"is_public": False,
|
"is_public": False,
|
||||||
"creator": "",
|
"creator": room_creator,
|
||||||
"has_auth_chain_index": has_auth_chain_index,
|
"has_auth_chain_index": has_auth_chain_index,
|
||||||
},
|
},
|
||||||
# rooms has a unique constraint on room_id, so no need to lock when doing an
|
# rooms has a unique constraint on room_id, so no need to lock when doing an
|
||||||
|
@ -1396,6 +1482,9 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
|
||||||
insertion_values={
|
insertion_values={
|
||||||
"room_version": room_version.identifier,
|
"room_version": room_version.identifier,
|
||||||
"is_public": False,
|
"is_public": False,
|
||||||
|
# We don't worry about setting the `creator` here because
|
||||||
|
# we don't process any messages in a room while a user is
|
||||||
|
# invited (only after the join).
|
||||||
"creator": "",
|
"creator": "",
|
||||||
"has_auth_chain_index": has_auth_chain_index,
|
"has_auth_chain_index": has_auth_chain_index,
|
||||||
},
|
},
|
||||||
|
|
|
@ -0,0 +1,17 @@
|
||||||
|
/* Copyright 2021 The Matrix.org Foundation C.I.C
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
INSERT INTO background_updates (ordering, update_name, progress_json)
|
||||||
|
VALUES (6302, 'populate_rooms_creator_column', '{}');
|
|
@ -0,0 +1,98 @@
|
||||||
|
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the 'License');
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an 'AS IS' BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from synapse.rest import admin
|
||||||
|
from synapse.rest.client import login, room
|
||||||
|
from synapse.storage.databases.main.room import _BackgroundUpdates
|
||||||
|
|
||||||
|
from tests.unittest import HomeserverTestCase
|
||||||
|
|
||||||
|
|
||||||
|
class RoomBackgroundUpdateStoreTestCase(HomeserverTestCase):
|
||||||
|
|
||||||
|
servlets = [
|
||||||
|
admin.register_servlets,
|
||||||
|
room.register_servlets,
|
||||||
|
login.register_servlets,
|
||||||
|
]
|
||||||
|
|
||||||
|
def prepare(self, reactor, clock, hs):
|
||||||
|
self.store = hs.get_datastore()
|
||||||
|
self.user_id = self.register_user("foo", "pass")
|
||||||
|
self.token = self.login("foo", "pass")
|
||||||
|
|
||||||
|
def _generate_room(self) -> str:
|
||||||
|
room_id = self.helper.create_room_as(self.user_id, tok=self.token)
|
||||||
|
|
||||||
|
return room_id
|
||||||
|
|
||||||
|
def test_background_populate_rooms_creator_column(self):
|
||||||
|
"""Test that the background update to populate the rooms creator column
|
||||||
|
works properly.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Insert a room without the creator
|
||||||
|
room_id = self._generate_room()
|
||||||
|
self.get_success(
|
||||||
|
self.store.db_pool.simple_update(
|
||||||
|
table="rooms",
|
||||||
|
keyvalues={"room_id": room_id},
|
||||||
|
updatevalues={"creator": None},
|
||||||
|
desc="test",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Make sure the test is starting out with a room without a creator
|
||||||
|
room_creator_before = self.get_success(
|
||||||
|
self.store.db_pool.simple_select_one_onecol(
|
||||||
|
table="rooms",
|
||||||
|
keyvalues={"room_id": room_id},
|
||||||
|
retcol="creator",
|
||||||
|
allow_none=True,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.assertEqual(room_creator_before, None)
|
||||||
|
|
||||||
|
# Insert and run the background update.
|
||||||
|
self.get_success(
|
||||||
|
self.store.db_pool.simple_insert(
|
||||||
|
"background_updates",
|
||||||
|
{
|
||||||
|
"update_name": _BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN,
|
||||||
|
"progress_json": "{}",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# ... and tell the DataStore that it hasn't finished all updates yet
|
||||||
|
self.store.db_pool.updates._all_done = False
|
||||||
|
|
||||||
|
# Now let's actually drive the updates to completion
|
||||||
|
while not self.get_success(
|
||||||
|
self.store.db_pool.updates.has_completed_background_updates()
|
||||||
|
):
|
||||||
|
self.get_success(
|
||||||
|
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
||||||
|
)
|
||||||
|
|
||||||
|
# Make sure the background update filled in the room creator
|
||||||
|
room_creator_after = self.get_success(
|
||||||
|
self.store.db_pool.simple_select_one_onecol(
|
||||||
|
table="rooms",
|
||||||
|
keyvalues={"room_id": room_id},
|
||||||
|
retcol="creator",
|
||||||
|
allow_none=True,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.assertEqual(room_creator_after, self.user_id)
|
Loading…
Reference in New Issue