Make historical events discoverable from backfill for servers without any scrollback history (MSC2716) (#10245)
* Make historical messages available to federated servers Part of MSC2716: https://github.com/matrix-org/matrix-doc/pull/2716 Follow-up to https://github.com/matrix-org/synapse/pull/9247 * Debug message not available on federation * Add base starting insertion point when no chunk ID is provided * Fix messages from multiple senders in historical chunk Follow-up to https://github.com/matrix-org/synapse/pull/9247 Part of MSC2716: https://github.com/matrix-org/matrix-doc/pull/2716 --- Previously, Synapse would throw a 403, `Cannot force another user to join.`, because we were trying to use `?user_id` from a single virtual user which did not match with messages from other users in the chunk. * Remove debug lines * Messing with selecting insertion event extremeties * Move db schema change to new version * Add more better comments * Make a fake requester with just what we need See https://github.com/matrix-org/synapse/pull/10276#discussion_r660999080 * Store insertion events in table * Make base insertion event float off on its own See https://github.com/matrix-org/synapse/pull/10250#issuecomment-875711889 Conflicts: synapse/rest/client/v1/room.py * Validate that the app service can actually control the given user See https://github.com/matrix-org/synapse/pull/10276#issuecomment-876316455 Conflicts: synapse/rest/client/v1/room.py * Add some better comments on what we're trying to check for * Continue debugging * Share validation logic * Add inserted historical messages to /backfill response * Remove debug sql queries * Some marker event implemntation trials * Clean up PR * Rename insertion_event_id to just event_id * Add some better sql comments * More accurate description * Add changelog * Make it clear what MSC the change is part of * Add more detail on which insertion event came through * Address review and improve sql queries * Only use event_id as unique constraint * Fix test case where insertion event is already in the normal DAG * Remove debug changes * Switch to chunk events so we can auth via power_levels Previously, we were using `content.chunk_id` to connect one chunk to another. But these events can be from any `sender` and we can't tell who should be able to send historical events. We know we only want the application service to do it but these events have the sender of a real historical message, not the application service user ID as the sender. Other federated homeservers also have no indicator which senders are an application service on the originating homeserver. So we want to auth all of the MSC2716 events via power_levels and have them be sent by the application service with proper PL levels in the room. * Switch to chunk events for federation * Add unstable room version to support new historical PL * Fix federated events being rejected for no state_groups Add fix from https://github.com/matrix-org/synapse/pull/10439 until it merges. * Only connect base insertion event to prev_event_ids Per discussion with @erikjohnston, https://matrix.to/#/!UytJQHLQYfvYWsGrGY:jki.re/$12bTUiObDFdHLAYtT7E-BvYRp3k_xv8w0dUQHibasJk?via=jki.re&via=matrix.org * Make it possible to get the room_version with txn * Allow but ignore historical events in unsupported room version See https://github.com/matrix-org/synapse/pull/10245#discussion_r675592489 We can't reject historical events on unsupported room versions because homeservers without knowledge of MSC2716 or the new room version don't reject historical events either. Since we can't rely on the auth check here to stop historical events on unsupported room versions, I've added some additional checks in the processing/persisting code (`synapse/storage/databases/main/events.py` -> `_handle_insertion_event` and `_handle_chunk_event`). I've had to do some refactoring so there is method to fetch the room version by `txn`. * Move to unique index syntax See https://github.com/matrix-org/synapse/pull/10245#discussion_r675638509 * High-level document how the insertion->chunk lookup works * Remove create_event fallback for room_versions See https://github.com/matrix-org/synapse/pull/10245/files#r677641879 * Use updated method name
This commit is contained in:
parent
8c201c97ec
commit
d0b294ad97
|
@ -0,0 +1 @@
|
||||||
|
Make historical events discoverable from backfill for servers without any scrollback history (part of MSC2716).
|
|
@ -206,9 +206,6 @@ class EventContentFields:
|
||||||
MSC2716_CHUNK_ID = "org.matrix.msc2716.chunk_id"
|
MSC2716_CHUNK_ID = "org.matrix.msc2716.chunk_id"
|
||||||
# For "marker" events
|
# For "marker" events
|
||||||
MSC2716_MARKER_INSERTION = "org.matrix.msc2716.marker.insertion"
|
MSC2716_MARKER_INSERTION = "org.matrix.msc2716.marker.insertion"
|
||||||
MSC2716_MARKER_INSERTION_PREV_EVENTS = (
|
|
||||||
"org.matrix.msc2716.marker.insertion_prev_events"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class RoomTypes:
|
class RoomTypes:
|
||||||
|
|
|
@ -73,6 +73,9 @@ class RoomVersion:
|
||||||
# MSC2403: Allows join_rules to be set to 'knock', changes auth rules to allow sending
|
# MSC2403: Allows join_rules to be set to 'knock', changes auth rules to allow sending
|
||||||
# m.room.membership event with membership 'knock'.
|
# m.room.membership event with membership 'knock'.
|
||||||
msc2403_knocking = attr.ib(type=bool)
|
msc2403_knocking = attr.ib(type=bool)
|
||||||
|
# MSC2716: Adds m.room.power_levels -> content.historical field to control
|
||||||
|
# whether "insertion", "chunk", "marker" events can be sent
|
||||||
|
msc2716_historical = attr.ib(type=bool)
|
||||||
|
|
||||||
|
|
||||||
class RoomVersions:
|
class RoomVersions:
|
||||||
|
@ -88,6 +91,7 @@ class RoomVersions:
|
||||||
msc2176_redaction_rules=False,
|
msc2176_redaction_rules=False,
|
||||||
msc3083_join_rules=False,
|
msc3083_join_rules=False,
|
||||||
msc2403_knocking=False,
|
msc2403_knocking=False,
|
||||||
|
msc2716_historical=False,
|
||||||
)
|
)
|
||||||
V2 = RoomVersion(
|
V2 = RoomVersion(
|
||||||
"2",
|
"2",
|
||||||
|
@ -101,6 +105,7 @@ class RoomVersions:
|
||||||
msc2176_redaction_rules=False,
|
msc2176_redaction_rules=False,
|
||||||
msc3083_join_rules=False,
|
msc3083_join_rules=False,
|
||||||
msc2403_knocking=False,
|
msc2403_knocking=False,
|
||||||
|
msc2716_historical=False,
|
||||||
)
|
)
|
||||||
V3 = RoomVersion(
|
V3 = RoomVersion(
|
||||||
"3",
|
"3",
|
||||||
|
@ -114,6 +119,7 @@ class RoomVersions:
|
||||||
msc2176_redaction_rules=False,
|
msc2176_redaction_rules=False,
|
||||||
msc3083_join_rules=False,
|
msc3083_join_rules=False,
|
||||||
msc2403_knocking=False,
|
msc2403_knocking=False,
|
||||||
|
msc2716_historical=False,
|
||||||
)
|
)
|
||||||
V4 = RoomVersion(
|
V4 = RoomVersion(
|
||||||
"4",
|
"4",
|
||||||
|
@ -127,6 +133,7 @@ class RoomVersions:
|
||||||
msc2176_redaction_rules=False,
|
msc2176_redaction_rules=False,
|
||||||
msc3083_join_rules=False,
|
msc3083_join_rules=False,
|
||||||
msc2403_knocking=False,
|
msc2403_knocking=False,
|
||||||
|
msc2716_historical=False,
|
||||||
)
|
)
|
||||||
V5 = RoomVersion(
|
V5 = RoomVersion(
|
||||||
"5",
|
"5",
|
||||||
|
@ -140,6 +147,7 @@ class RoomVersions:
|
||||||
msc2176_redaction_rules=False,
|
msc2176_redaction_rules=False,
|
||||||
msc3083_join_rules=False,
|
msc3083_join_rules=False,
|
||||||
msc2403_knocking=False,
|
msc2403_knocking=False,
|
||||||
|
msc2716_historical=False,
|
||||||
)
|
)
|
||||||
V6 = RoomVersion(
|
V6 = RoomVersion(
|
||||||
"6",
|
"6",
|
||||||
|
@ -153,6 +161,7 @@ class RoomVersions:
|
||||||
msc2176_redaction_rules=False,
|
msc2176_redaction_rules=False,
|
||||||
msc3083_join_rules=False,
|
msc3083_join_rules=False,
|
||||||
msc2403_knocking=False,
|
msc2403_knocking=False,
|
||||||
|
msc2716_historical=False,
|
||||||
)
|
)
|
||||||
MSC2176 = RoomVersion(
|
MSC2176 = RoomVersion(
|
||||||
"org.matrix.msc2176",
|
"org.matrix.msc2176",
|
||||||
|
@ -166,6 +175,7 @@ class RoomVersions:
|
||||||
msc2176_redaction_rules=True,
|
msc2176_redaction_rules=True,
|
||||||
msc3083_join_rules=False,
|
msc3083_join_rules=False,
|
||||||
msc2403_knocking=False,
|
msc2403_knocking=False,
|
||||||
|
msc2716_historical=False,
|
||||||
)
|
)
|
||||||
MSC3083 = RoomVersion(
|
MSC3083 = RoomVersion(
|
||||||
"org.matrix.msc3083.v2",
|
"org.matrix.msc3083.v2",
|
||||||
|
@ -179,6 +189,7 @@ class RoomVersions:
|
||||||
msc2176_redaction_rules=False,
|
msc2176_redaction_rules=False,
|
||||||
msc3083_join_rules=True,
|
msc3083_join_rules=True,
|
||||||
msc2403_knocking=False,
|
msc2403_knocking=False,
|
||||||
|
msc2716_historical=False,
|
||||||
)
|
)
|
||||||
V7 = RoomVersion(
|
V7 = RoomVersion(
|
||||||
"7",
|
"7",
|
||||||
|
@ -192,6 +203,21 @@ class RoomVersions:
|
||||||
msc2176_redaction_rules=False,
|
msc2176_redaction_rules=False,
|
||||||
msc3083_join_rules=False,
|
msc3083_join_rules=False,
|
||||||
msc2403_knocking=True,
|
msc2403_knocking=True,
|
||||||
|
msc2716_historical=False,
|
||||||
|
)
|
||||||
|
MSC2716 = RoomVersion(
|
||||||
|
"org.matrix.msc2716",
|
||||||
|
RoomDisposition.STABLE,
|
||||||
|
EventFormatVersions.V3,
|
||||||
|
StateResolutionVersions.V2,
|
||||||
|
enforce_key_validity=True,
|
||||||
|
special_case_aliases_auth=False,
|
||||||
|
strict_canonicaljson=True,
|
||||||
|
limit_notifications_power_levels=True,
|
||||||
|
msc2176_redaction_rules=False,
|
||||||
|
msc3083_join_rules=False,
|
||||||
|
msc2403_knocking=True,
|
||||||
|
msc2716_historical=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -207,6 +233,7 @@ KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = {
|
||||||
RoomVersions.MSC2176,
|
RoomVersions.MSC2176,
|
||||||
RoomVersions.MSC3083,
|
RoomVersions.MSC3083,
|
||||||
RoomVersions.V7,
|
RoomVersions.V7,
|
||||||
|
RoomVersions.MSC2716,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -205,6 +205,13 @@ def check(
|
||||||
if event.type == EventTypes.Redaction:
|
if event.type == EventTypes.Redaction:
|
||||||
check_redaction(room_version_obj, event, auth_events)
|
check_redaction(room_version_obj, event, auth_events)
|
||||||
|
|
||||||
|
if (
|
||||||
|
event.type == EventTypes.MSC2716_INSERTION
|
||||||
|
or event.type == EventTypes.MSC2716_CHUNK
|
||||||
|
or event.type == EventTypes.MSC2716_MARKER
|
||||||
|
):
|
||||||
|
check_historical(room_version_obj, event, auth_events)
|
||||||
|
|
||||||
logger.debug("Allowing! %s", event)
|
logger.debug("Allowing! %s", event)
|
||||||
|
|
||||||
|
|
||||||
|
@ -539,6 +546,37 @@ def check_redaction(
|
||||||
raise AuthError(403, "You don't have permission to redact events")
|
raise AuthError(403, "You don't have permission to redact events")
|
||||||
|
|
||||||
|
|
||||||
|
def check_historical(
|
||||||
|
room_version_obj: RoomVersion,
|
||||||
|
event: EventBase,
|
||||||
|
auth_events: StateMap[EventBase],
|
||||||
|
) -> None:
|
||||||
|
"""Check whether the event sender is allowed to send historical related
|
||||||
|
events like "insertion", "chunk", and "marker".
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
None
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
AuthError if the event sender is not allowed to send historical related events
|
||||||
|
("insertion", "chunk", and "marker").
|
||||||
|
"""
|
||||||
|
# Ignore the auth checks in room versions that do not support historical
|
||||||
|
# events
|
||||||
|
if not room_version_obj.msc2716_historical:
|
||||||
|
return
|
||||||
|
|
||||||
|
user_level = get_user_power_level(event.user_id, auth_events)
|
||||||
|
|
||||||
|
historical_level = get_named_level(auth_events, "historical", 100)
|
||||||
|
|
||||||
|
if user_level < historical_level:
|
||||||
|
raise AuthError(
|
||||||
|
403,
|
||||||
|
'You don\'t have permission to send send historical related events ("insertion", "chunk", and "marker")',
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _check_power_levels(
|
def _check_power_levels(
|
||||||
room_version_obj: RoomVersion,
|
room_version_obj: RoomVersion,
|
||||||
event: EventBase,
|
event: EventBase,
|
||||||
|
|
|
@ -126,6 +126,9 @@ def prune_event_dict(room_version: RoomVersion, event_dict: dict) -> dict:
|
||||||
if room_version.msc2176_redaction_rules:
|
if room_version.msc2176_redaction_rules:
|
||||||
add_fields("invite")
|
add_fields("invite")
|
||||||
|
|
||||||
|
if room_version.msc2716_historical:
|
||||||
|
add_fields("historical")
|
||||||
|
|
||||||
elif event_type == EventTypes.Aliases and room_version.special_case_aliases_auth:
|
elif event_type == EventTypes.Aliases and room_version.special_case_aliases_auth:
|
||||||
add_fields("aliases")
|
add_fields("aliases")
|
||||||
elif event_type == EventTypes.RoomHistoryVisibility:
|
elif event_type == EventTypes.RoomHistoryVisibility:
|
||||||
|
|
|
@ -2748,9 +2748,11 @@ class FederationHandler(BaseHandler):
|
||||||
event.event_id,
|
event.event_id,
|
||||||
e.event_id,
|
e.event_id,
|
||||||
)
|
)
|
||||||
context = await self.state_handler.compute_event_context(e)
|
missing_auth_event_context = (
|
||||||
|
await self.state_handler.compute_event_context(e)
|
||||||
|
)
|
||||||
await self._auth_and_persist_event(
|
await self._auth_and_persist_event(
|
||||||
origin, e, context, auth_events=auth
|
origin, e, missing_auth_event_context, auth_events=auth
|
||||||
)
|
)
|
||||||
|
|
||||||
if e.event_id in event_auth_events:
|
if e.event_id in event_auth_events:
|
||||||
|
|
|
@ -951,6 +951,7 @@ class RoomCreationHandler(BaseHandler):
|
||||||
"kick": 50,
|
"kick": 50,
|
||||||
"redact": 50,
|
"redact": 50,
|
||||||
"invite": 50,
|
"invite": 50,
|
||||||
|
"historical": 100,
|
||||||
}
|
}
|
||||||
|
|
||||||
if config["original_invitees_have_ops"]:
|
if config["original_invitees_have_ops"]:
|
||||||
|
|
|
@ -504,7 +504,6 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
|
||||||
|
|
||||||
events_to_create = body["events"]
|
events_to_create = body["events"]
|
||||||
|
|
||||||
prev_event_ids = prev_events_from_query
|
|
||||||
inherited_depth = await self._inherit_depth_from_prev_ids(
|
inherited_depth = await self._inherit_depth_from_prev_ids(
|
||||||
prev_events_from_query
|
prev_events_from_query
|
||||||
)
|
)
|
||||||
|
@ -516,6 +515,10 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
|
||||||
chunk_id_to_connect_to = chunk_id_from_query
|
chunk_id_to_connect_to = chunk_id_from_query
|
||||||
base_insertion_event = None
|
base_insertion_event = None
|
||||||
if chunk_id_from_query:
|
if chunk_id_from_query:
|
||||||
|
# All but the first base insertion event should point at a fake
|
||||||
|
# event, which causes the HS to ask for the state at the start of
|
||||||
|
# the chunk later.
|
||||||
|
prev_event_ids = [fake_prev_event_id]
|
||||||
# TODO: Verify the chunk_id_from_query corresponds to an insertion event
|
# TODO: Verify the chunk_id_from_query corresponds to an insertion event
|
||||||
pass
|
pass
|
||||||
# Otherwise, create an insertion event to act as a starting point.
|
# Otherwise, create an insertion event to act as a starting point.
|
||||||
|
@ -526,6 +529,8 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
|
||||||
# an insertion event), in which case we just create a new insertion event
|
# an insertion event), in which case we just create a new insertion event
|
||||||
# that can then get pointed to by a "marker" event later.
|
# that can then get pointed to by a "marker" event later.
|
||||||
else:
|
else:
|
||||||
|
prev_event_ids = prev_events_from_query
|
||||||
|
|
||||||
base_insertion_event_dict = self._create_insertion_event_dict(
|
base_insertion_event_dict = self._create_insertion_event_dict(
|
||||||
sender=requester.user.to_string(),
|
sender=requester.user.to_string(),
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
|
|
|
@ -936,15 +936,46 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||||
# We want to make sure that we do a breadth-first, "depth" ordered
|
# We want to make sure that we do a breadth-first, "depth" ordered
|
||||||
# search.
|
# search.
|
||||||
|
|
||||||
query = (
|
# Look for the prev_event_id connected to the given event_id
|
||||||
"SELECT depth, prev_event_id FROM event_edges"
|
query = """
|
||||||
" INNER JOIN events"
|
SELECT depth, prev_event_id FROM event_edges
|
||||||
" ON prev_event_id = events.event_id"
|
/* Get the depth of the prev_event_id from the events table */
|
||||||
" WHERE event_edges.event_id = ?"
|
INNER JOIN events
|
||||||
" AND event_edges.is_state = ?"
|
ON prev_event_id = events.event_id
|
||||||
" LIMIT ?"
|
/* Find an event which matches the given event_id */
|
||||||
)
|
WHERE event_edges.event_id = ?
|
||||||
|
AND event_edges.is_state = ?
|
||||||
|
LIMIT ?
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Look for the "insertion" events connected to the given event_id
|
||||||
|
connected_insertion_event_query = """
|
||||||
|
SELECT e.depth, i.event_id FROM insertion_event_edges AS i
|
||||||
|
/* Get the depth of the insertion event from the events table */
|
||||||
|
INNER JOIN events AS e USING (event_id)
|
||||||
|
/* Find an insertion event which points via prev_events to the given event_id */
|
||||||
|
WHERE i.insertion_prev_event_id = ?
|
||||||
|
LIMIT ?
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Find any chunk connections of a given insertion event
|
||||||
|
chunk_connection_query = """
|
||||||
|
SELECT e.depth, c.event_id FROM insertion_events AS i
|
||||||
|
/* Find the chunk that connects to the given insertion event */
|
||||||
|
INNER JOIN chunk_events AS c
|
||||||
|
ON i.next_chunk_id = c.chunk_id
|
||||||
|
/* Get the depth of the chunk start event from the events table */
|
||||||
|
INNER JOIN events AS e USING (event_id)
|
||||||
|
/* Find an insertion event which matches the given event_id */
|
||||||
|
WHERE i.event_id = ?
|
||||||
|
LIMIT ?
|
||||||
|
"""
|
||||||
|
|
||||||
|
# In a PriorityQueue, the lowest valued entries are retrieved first.
|
||||||
|
# We're using depth as the priority in the queue.
|
||||||
|
# Depth is lowest at the oldest-in-time message and highest and
|
||||||
|
# newest-in-time message. We add events to the queue with a negative depth so that
|
||||||
|
# we process the newest-in-time messages first going backwards in time.
|
||||||
queue = PriorityQueue()
|
queue = PriorityQueue()
|
||||||
|
|
||||||
for event_id in event_list:
|
for event_id in event_list:
|
||||||
|
@ -970,9 +1001,48 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||||
|
|
||||||
event_results.add(event_id)
|
event_results.add(event_id)
|
||||||
|
|
||||||
txn.execute(query, (event_id, False, limit - len(event_results)))
|
# Try and find any potential historical chunks of message history.
|
||||||
|
#
|
||||||
|
# First we look for an insertion event connected to the current
|
||||||
|
# event (by prev_event). If we find any, we need to go and try to
|
||||||
|
# find any chunk events connected to the insertion event (by
|
||||||
|
# chunk_id). If we find any, we'll add them to the queue and
|
||||||
|
# navigate up the DAG like normal in the next iteration of the loop.
|
||||||
|
txn.execute(
|
||||||
|
connected_insertion_event_query, (event_id, limit - len(event_results))
|
||||||
|
)
|
||||||
|
connected_insertion_event_id_results = txn.fetchall()
|
||||||
|
logger.debug(
|
||||||
|
"_get_backfill_events: connected_insertion_event_query %s",
|
||||||
|
connected_insertion_event_id_results,
|
||||||
|
)
|
||||||
|
for row in connected_insertion_event_id_results:
|
||||||
|
connected_insertion_event_depth = row[0]
|
||||||
|
connected_insertion_event = row[1]
|
||||||
|
queue.put((-connected_insertion_event_depth, connected_insertion_event))
|
||||||
|
|
||||||
for row in txn:
|
# Find any chunk connections for the given insertion event
|
||||||
|
txn.execute(
|
||||||
|
chunk_connection_query,
|
||||||
|
(connected_insertion_event, limit - len(event_results)),
|
||||||
|
)
|
||||||
|
chunk_start_event_id_results = txn.fetchall()
|
||||||
|
logger.debug(
|
||||||
|
"_get_backfill_events: chunk_start_event_id_results %s",
|
||||||
|
chunk_start_event_id_results,
|
||||||
|
)
|
||||||
|
for row in chunk_start_event_id_results:
|
||||||
|
if row[1] not in event_results:
|
||||||
|
queue.put((-row[0], row[1]))
|
||||||
|
|
||||||
|
# Navigate up the DAG by prev_event
|
||||||
|
txn.execute(query, (event_id, False, limit - len(event_results)))
|
||||||
|
prev_event_id_results = txn.fetchall()
|
||||||
|
logger.debug(
|
||||||
|
"_get_backfill_events: prev_event_ids %s", prev_event_id_results
|
||||||
|
)
|
||||||
|
|
||||||
|
for row in prev_event_id_results:
|
||||||
if row[1] not in event_results:
|
if row[1] not in event_results:
|
||||||
queue.put((-row[0], row[1]))
|
queue.put((-row[0], row[1]))
|
||||||
|
|
||||||
|
|
|
@ -1502,6 +1502,9 @@ class PersistEventsStore:
|
||||||
|
|
||||||
self._handle_event_relations(txn, event)
|
self._handle_event_relations(txn, event)
|
||||||
|
|
||||||
|
self._handle_insertion_event(txn, event)
|
||||||
|
self._handle_chunk_event(txn, event)
|
||||||
|
|
||||||
# Store the labels for this event.
|
# Store the labels for this event.
|
||||||
labels = event.content.get(EventContentFields.LABELS)
|
labels = event.content.get(EventContentFields.LABELS)
|
||||||
if labels:
|
if labels:
|
||||||
|
@ -1754,6 +1757,94 @@ class PersistEventsStore:
|
||||||
if rel_type == RelationTypes.REPLACE:
|
if rel_type == RelationTypes.REPLACE:
|
||||||
txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
|
txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
|
||||||
|
|
||||||
|
def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase):
|
||||||
|
"""Handles keeping track of insertion events and edges/connections.
|
||||||
|
Part of MSC2716.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
txn: The database transaction object
|
||||||
|
event: The event to process
|
||||||
|
"""
|
||||||
|
|
||||||
|
if event.type != EventTypes.MSC2716_INSERTION:
|
||||||
|
# Not a insertion event
|
||||||
|
return
|
||||||
|
|
||||||
|
# Skip processing a insertion event if the room version doesn't
|
||||||
|
# support it.
|
||||||
|
room_version = self.store.get_room_version_txn(txn, event.room_id)
|
||||||
|
if not room_version.msc2716_historical:
|
||||||
|
return
|
||||||
|
|
||||||
|
next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
|
||||||
|
if next_chunk_id is None:
|
||||||
|
# Invalid insertion event without next chunk ID
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
"_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event
|
||||||
|
)
|
||||||
|
|
||||||
|
# Keep track of the insertion event and the chunk ID
|
||||||
|
self.db_pool.simple_insert_txn(
|
||||||
|
txn,
|
||||||
|
table="insertion_events",
|
||||||
|
values={
|
||||||
|
"event_id": event.event_id,
|
||||||
|
"room_id": event.room_id,
|
||||||
|
"next_chunk_id": next_chunk_id,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Insert an edge for every prev_event connection
|
||||||
|
for prev_event_id in event.prev_events:
|
||||||
|
self.db_pool.simple_insert_txn(
|
||||||
|
txn,
|
||||||
|
table="insertion_event_edges",
|
||||||
|
values={
|
||||||
|
"event_id": event.event_id,
|
||||||
|
"room_id": event.room_id,
|
||||||
|
"insertion_prev_event_id": prev_event_id,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase):
|
||||||
|
"""Handles inserting the chunk edges/connections between the chunk event
|
||||||
|
and an insertion event. Part of MSC2716.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
txn: The database transaction object
|
||||||
|
event: The event to process
|
||||||
|
"""
|
||||||
|
|
||||||
|
if event.type != EventTypes.MSC2716_CHUNK:
|
||||||
|
# Not a chunk event
|
||||||
|
return
|
||||||
|
|
||||||
|
# Skip processing a chunk event if the room version doesn't
|
||||||
|
# support it.
|
||||||
|
room_version = self.store.get_room_version_txn(txn, event.room_id)
|
||||||
|
if not room_version.msc2716_historical:
|
||||||
|
return
|
||||||
|
|
||||||
|
chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
|
||||||
|
if chunk_id is None:
|
||||||
|
# Invalid chunk event without a chunk ID
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event)
|
||||||
|
|
||||||
|
# Keep track of the insertion event and the chunk ID
|
||||||
|
self.db_pool.simple_insert_txn(
|
||||||
|
txn,
|
||||||
|
table="chunk_events",
|
||||||
|
values={
|
||||||
|
"event_id": event.event_id,
|
||||||
|
"room_id": event.room_id,
|
||||||
|
"chunk_id": chunk_id,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
def _handle_redaction(self, txn, redacted_event_id):
|
def _handle_redaction(self, txn, redacted_event_id):
|
||||||
"""Handles receiving a redaction and checking whether we need to remove
|
"""Handles receiving a redaction and checking whether we need to remove
|
||||||
any redacted relations from the database.
|
any redacted relations from the database.
|
||||||
|
|
|
@ -22,7 +22,7 @@ from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
|
||||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
|
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
|
||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
from synapse.storage._base import SQLBaseStore
|
from synapse.storage._base import SQLBaseStore
|
||||||
from synapse.storage.database import DatabasePool
|
from synapse.storage.database import DatabasePool, LoggingTransaction
|
||||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||||
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
|
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
|
||||||
from synapse.storage.state import StateFilter
|
from synapse.storage.state import StateFilter
|
||||||
|
@ -58,15 +58,32 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
|
|
||||||
async def get_room_version(self, room_id: str) -> RoomVersion:
|
async def get_room_version(self, room_id: str) -> RoomVersion:
|
||||||
"""Get the room_version of a given room
|
"""Get the room_version of a given room
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
NotFoundError: if the room is unknown
|
NotFoundError: if the room is unknown
|
||||||
|
|
||||||
UnsupportedRoomVersionError: if the room uses an unknown room version.
|
UnsupportedRoomVersionError: if the room uses an unknown room version.
|
||||||
Typically this happens if support for the room's version has been
|
Typically this happens if support for the room's version has been
|
||||||
removed from Synapse.
|
removed from Synapse.
|
||||||
"""
|
"""
|
||||||
room_version_id = await self.get_room_version_id(room_id)
|
return await self.db_pool.runInteraction(
|
||||||
|
"get_room_version_txn",
|
||||||
|
self.get_room_version_txn,
|
||||||
|
room_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_room_version_txn(
|
||||||
|
self, txn: LoggingTransaction, room_id: str
|
||||||
|
) -> RoomVersion:
|
||||||
|
"""Get the room_version of a given room
|
||||||
|
Args:
|
||||||
|
txn: Transaction object
|
||||||
|
room_id: The room_id of the room you are trying to get the version for
|
||||||
|
Raises:
|
||||||
|
NotFoundError: if the room is unknown
|
||||||
|
UnsupportedRoomVersionError: if the room uses an unknown room version.
|
||||||
|
Typically this happens if support for the room's version has been
|
||||||
|
removed from Synapse.
|
||||||
|
"""
|
||||||
|
room_version_id = self.get_room_version_id_txn(txn, room_id)
|
||||||
v = KNOWN_ROOM_VERSIONS.get(room_version_id)
|
v = KNOWN_ROOM_VERSIONS.get(room_version_id)
|
||||||
|
|
||||||
if not v:
|
if not v:
|
||||||
|
@ -80,7 +97,20 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
@cached(max_entries=10000)
|
@cached(max_entries=10000)
|
||||||
async def get_room_version_id(self, room_id: str) -> str:
|
async def get_room_version_id(self, room_id: str) -> str:
|
||||||
"""Get the room_version of a given room
|
"""Get the room_version of a given room
|
||||||
|
Raises:
|
||||||
|
NotFoundError: if the room is unknown
|
||||||
|
"""
|
||||||
|
return await self.db_pool.runInteraction(
|
||||||
|
"get_room_version_id_txn",
|
||||||
|
self.get_room_version_id_txn,
|
||||||
|
room_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_room_version_id_txn(self, txn: LoggingTransaction, room_id: str) -> str:
|
||||||
|
"""Get the room_version of a given room
|
||||||
|
Args:
|
||||||
|
txn: Transaction object
|
||||||
|
room_id: The room_id of the room you are trying to get the version for
|
||||||
Raises:
|
Raises:
|
||||||
NotFoundError: if the room is unknown
|
NotFoundError: if the room is unknown
|
||||||
"""
|
"""
|
||||||
|
@ -88,24 +118,22 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
# First we try looking up room version from the database, but for old
|
# First we try looking up room version from the database, but for old
|
||||||
# rooms we might not have added the room version to it yet so we fall
|
# rooms we might not have added the room version to it yet so we fall
|
||||||
# back to previous behaviour and look in current state events.
|
# back to previous behaviour and look in current state events.
|
||||||
|
#
|
||||||
# We really should have an entry in the rooms table for every room we
|
# We really should have an entry in the rooms table for every room we
|
||||||
# care about, but let's be a bit paranoid (at least while the background
|
# care about, but let's be a bit paranoid (at least while the background
|
||||||
# update is happening) to avoid breaking existing rooms.
|
# update is happening) to avoid breaking existing rooms.
|
||||||
version = await self.db_pool.simple_select_one_onecol(
|
room_version = self.db_pool.simple_select_one_onecol_txn(
|
||||||
|
txn,
|
||||||
table="rooms",
|
table="rooms",
|
||||||
keyvalues={"room_id": room_id},
|
keyvalues={"room_id": room_id},
|
||||||
retcol="room_version",
|
retcol="room_version",
|
||||||
desc="get_room_version",
|
|
||||||
allow_none=True,
|
allow_none=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
if version is not None:
|
if room_version is None:
|
||||||
return version
|
raise NotFoundError("Could not room_version for %s" % (room_id,))
|
||||||
|
|
||||||
# Retrieve the room's create event
|
return room_version
|
||||||
create_event = await self.get_create_event_for_room(room_id)
|
|
||||||
return create_event.content.get("room_version", "1")
|
|
||||||
|
|
||||||
async def get_room_predecessor(self, room_id: str) -> Optional[dict]:
|
async def get_room_predecessor(self, room_id: str) -> Optional[dict]:
|
||||||
"""Get the predecessor of an upgraded room if it exists.
|
"""Get the predecessor of an upgraded room if it exists.
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
/* Copyright 2021 The Matrix.org Foundation C.I.C
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
-- Add a table that keeps track of "insertion" events and
|
||||||
|
-- their next_chunk_id's so we can navigate to the next chunk of history.
|
||||||
|
CREATE TABLE IF NOT EXISTS insertion_events(
|
||||||
|
event_id TEXT NOT NULL,
|
||||||
|
room_id TEXT NOT NULL,
|
||||||
|
next_chunk_id TEXT NOT NULL
|
||||||
|
);
|
||||||
|
CREATE UNIQUE INDEX IF NOT EXISTS insertion_events_event_id ON insertion_events(event_id);
|
||||||
|
CREATE INDEX IF NOT EXISTS insertion_events_next_chunk_id ON insertion_events(next_chunk_id);
|
||||||
|
|
||||||
|
-- Add a table that keeps track of all of the events we are inserting between.
|
||||||
|
-- We use this when navigating the DAG and when we hit an event which matches
|
||||||
|
-- `insertion_prev_event_id`, it should backfill from the "insertion" event and
|
||||||
|
-- navigate the historical messages from there.
|
||||||
|
CREATE TABLE IF NOT EXISTS insertion_event_edges(
|
||||||
|
event_id TEXT NOT NULL,
|
||||||
|
room_id TEXT NOT NULL,
|
||||||
|
insertion_prev_event_id TEXT NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE UNIQUE INDEX IF NOT EXISTS insertion_event_edges_event_id ON insertion_event_edges(event_id);
|
||||||
|
CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_room_id ON insertion_event_edges(room_id);
|
||||||
|
CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_prev_event_id ON insertion_event_edges(insertion_prev_event_id);
|
||||||
|
|
||||||
|
-- Add a table that keeps track of how each chunk is labeled. The chunks are
|
||||||
|
-- connected together based on an insertion events `next_chunk_id`.
|
||||||
|
CREATE TABLE IF NOT EXISTS chunk_events(
|
||||||
|
event_id TEXT NOT NULL,
|
||||||
|
room_id TEXT NOT NULL,
|
||||||
|
chunk_id TEXT NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE UNIQUE INDEX IF NOT EXISTS chunk_events_event_id ON chunk_events(event_id);
|
||||||
|
CREATE INDEX IF NOT EXISTS chunk_events_chunk_id ON chunk_events(chunk_id);
|
Loading…
Reference in New Issue