diff --git a/changelog.d/9411.misc b/changelog.d/9411.misc new file mode 100644 index 0000000000..c3e6cfa5f1 --- /dev/null +++ b/changelog.d/9411.misc @@ -0,0 +1 @@ +Preparatory steps for removing redundant `outlier` data from `event_json.internal_metadata` column. diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 3ec4120f85..8f6b955d17 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -98,7 +98,7 @@ class DefaultDictProperty(DictProperty): class _EventInternalMetadata: - __slots__ = ["_dict", "stream_ordering"] + __slots__ = ["_dict", "stream_ordering", "outlier"] def __init__(self, internal_metadata_dict: JsonDict): # we have to copy the dict, because it turns out that the same dict is @@ -108,7 +108,10 @@ class _EventInternalMetadata: # the stream ordering of this event. None, until it has been persisted. self.stream_ordering = None # type: Optional[int] - outlier = DictProperty("outlier") # type: bool + # whether this event is an outlier (ie, whether we have the state at that point + # in the DAG) + self.outlier = False + out_of_band_membership = DictProperty("out_of_band_membership") # type: bool send_on_behalf_of = DictProperty("send_on_behalf_of") # type: str recheck_redaction = DictProperty("recheck_redaction") # type: bool @@ -129,7 +132,7 @@ class _EventInternalMetadata: return dict(self._dict) def is_outlier(self) -> bool: - return self._dict.get("outlier", False) + return self.outlier def is_out_of_band_membership(self) -> bool: """Whether this is an out of band membership, like an invite or an invite diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 7ca5c9940a..5022e0fcb3 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -54,6 +54,8 @@ def prune_event(event: EventBase) -> EventBase: event.internal_metadata.stream_ordering ) + pruned_event.internal_metadata.outlier = event.internal_metadata.outlier + # Mark the event as redacted pruned_event.internal_metadata.redacted = True diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 8af53b4f28..82ea3b895f 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -40,6 +40,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): // containing the event "event_format_version": .., // 1,2,3 etc: the event format version "internal_metadata": { .. serialized internal_metadata .. }, + "outlier": true|false, "rejected_reason": .., // The event.rejected_reason field "context": { .. serialized event context .. }, }], @@ -84,6 +85,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): "room_version": event.room_version.identifier, "event_format_version": event.format_version, "internal_metadata": event.internal_metadata.get_dict(), + "outlier": event.internal_metadata.is_outlier(), "rejected_reason": event.rejected_reason, "context": serialized_context, } @@ -116,6 +118,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): event = make_event_from_dict( event_dict, room_ver, internal_metadata, rejected_reason ) + event.internal_metadata.outlier = event_payload["outlier"] context = EventContext.deserialize( self.storage, event_payload["context"] diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 8fa104c8d3..a4c5b44292 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -40,6 +40,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): // containing the event "event_format_version": .., // 1,2,3 etc: the event format version "internal_metadata": { .. serialized internal_metadata .. }, + "outlier": true|false, "rejected_reason": .., // The event.rejected_reason field "context": { .. serialized event context .. }, "requester": { .. serialized requester .. }, @@ -79,7 +80,6 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): ratelimit (bool) extra_users (list(UserID)): Any extra users to notify about event """ - serialized_context = await context.serialize(event, store) payload = { @@ -87,6 +87,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): "room_version": event.room_version.identifier, "event_format_version": event.format_version, "internal_metadata": event.internal_metadata.get_dict(), + "outlier": event.internal_metadata.is_outlier(), "rejected_reason": event.rejected_reason, "context": serialized_context, "requester": requester.serialize(), @@ -108,6 +109,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): event = make_event_from_dict( event_dict, room_ver, internal_metadata, rejected_reason ) + event.internal_metadata.outlier = content["outlier"] requester = Requester.deserialize(self.store, content["requester"]) context = EventContext.deserialize(self.storage, content["context"]) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index cd1ceac50e..98dac19a95 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1270,8 +1270,10 @@ class PersistEventsStore: logger.exception("") raise + # update the stored internal_metadata to update the "outlier" flag. + # TODO: This is unused as of Synapse 1.31. Remove it once we are happy + # to drop backwards-compatibility with 1.30. metadata_json = json_encoder.encode(event.internal_metadata.get_dict()) - sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?" txn.execute(sql, (metadata_json, event.event_id)) @@ -1319,6 +1321,19 @@ class PersistEventsStore: d.pop("redacted_because", None) return d + def get_internal_metadata(event): + im = event.internal_metadata.get_dict() + + # temporary hack for database compatibility with Synapse 1.30 and earlier: + # store the `outlier` flag inside the internal_metadata json as well as in + # the `events` table, so that if anyone rolls back to an older Synapse, + # things keep working. This can be removed once we are happy to drop support + # for that + if event.internal_metadata.is_outlier(): + im["outlier"] = True + + return im + self.db_pool.simple_insert_many_txn( txn, table="event_json", @@ -1327,7 +1342,7 @@ class PersistEventsStore: "event_id": event.event_id, "room_id": event.room_id, "internal_metadata": json_encoder.encode( - event.internal_metadata.get_dict() + get_internal_metadata(event) ), "json": json_encoder.encode(event_dict(event)), "format_version": event.format_version, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index c04e162ccc..952d4969b2 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -799,6 +799,7 @@ class EventsWorkerStore(SQLBaseStore): rejected_reason=rejected_reason, ) original_ev.internal_metadata.stream_ordering = row["stream_ordering"] + original_ev.internal_metadata.outlier = row["outlier"] event_map[event_id] = original_ev @@ -905,7 +906,8 @@ class EventsWorkerStore(SQLBaseStore): ej.json, ej.format_version, r.room_version, - rej.reason + rej.reason, + e.outlier FROM events AS e JOIN event_json AS ej USING (event_id) LEFT JOIN rooms r ON r.room_id = e.room_id @@ -929,6 +931,7 @@ class EventsWorkerStore(SQLBaseStore): "room_version_id": row[5], "rejected_reason": row[6], "redactions": [], + "outlier": row[7], } # check for redactions