Properly update the threads table when thread events are redacted. (#14248)

When the last event in a thread is redacted we need to update
the threads table:

* Find the new latest event in the thread and store it into the table; or
* Remove the thread from the table if it is no longer a thread (i.e. all
  events in the thread were redacted).
This commit is contained in:
Patrick Cloke 2022-10-21 09:11:19 -04:00 committed by GitHub
parent 7fe3b908a5
commit 4dd7aa371b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 131 additions and 45 deletions

1
changelog.d/14248.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.70.0rc1 where the information returned from the `/threads` API could be stale when threaded events are redacted.

View File

@ -2028,25 +2028,37 @@ class PersistEventsStore:
redacted_event_id: The event that was redacted. redacted_event_id: The event that was redacted.
""" """
# Fetch the current relation of the event being redacted. # Fetch the relation of the event being redacted.
redacted_relates_to = self.db_pool.simple_select_one_onecol_txn( row = self.db_pool.simple_select_one_txn(
txn, txn,
table="event_relations", table="event_relations",
keyvalues={"event_id": redacted_event_id}, keyvalues={"event_id": redacted_event_id},
retcol="relates_to_id", retcols=("relates_to_id", "relation_type"),
allow_none=True, allow_none=True,
) )
# Nothing to do if no relation is found.
if row is None:
return
redacted_relates_to = row["relates_to_id"]
rel_type = row["relation_type"]
self.db_pool.simple_delete_txn(
txn, table="event_relations", keyvalues={"event_id": redacted_event_id}
)
# Any relation information for the related event must be cleared. # Any relation information for the related event must be cleared.
if redacted_relates_to is not None: self.store._invalidate_cache_and_stream(
self.store._invalidate_cache_and_stream( txn, self.store.get_relations_for_event, (redacted_relates_to,)
txn, self.store.get_relations_for_event, (redacted_relates_to,) )
) if rel_type == RelationTypes.ANNOTATION:
self.store._invalidate_cache_and_stream( self.store._invalidate_cache_and_stream(
txn, self.store.get_aggregation_groups_for_event, (redacted_relates_to,) txn, self.store.get_aggregation_groups_for_event, (redacted_relates_to,)
) )
if rel_type == RelationTypes.REPLACE:
self.store._invalidate_cache_and_stream( self.store._invalidate_cache_and_stream(
txn, self.store.get_applicable_edit, (redacted_relates_to,) txn, self.store.get_applicable_edit, (redacted_relates_to,)
) )
if rel_type == RelationTypes.THREAD:
self.store._invalidate_cache_and_stream( self.store._invalidate_cache_and_stream(
txn, self.store.get_thread_summary, (redacted_relates_to,) txn, self.store.get_thread_summary, (redacted_relates_to,)
) )
@ -2057,9 +2069,38 @@ class PersistEventsStore:
txn, self.store.get_threads, (room_id,) txn, self.store.get_threads, (room_id,)
) )
self.db_pool.simple_delete_txn( # Find the new latest event in the thread.
txn, table="event_relations", keyvalues={"event_id": redacted_event_id} sql = """
) SELECT event_id, topological_ordering, stream_ordering
FROM event_relations
INNER JOIN events USING (event_id)
WHERE relates_to_id = ? AND relation_type = ?
ORDER BY topological_ordering DESC, stream_ordering DESC
LIMIT 1
"""
txn.execute(sql, (redacted_relates_to, RelationTypes.THREAD))
# If a latest event is found, update the threads table, this might
# be the same current latest event (if an earlier event in the thread
# was redacted).
latest_event_row = txn.fetchone()
if latest_event_row:
self.db_pool.simple_upsert_txn(
txn,
table="threads",
keyvalues={"room_id": room_id, "thread_id": redacted_relates_to},
values={
"latest_event_id": latest_event_row[0],
"topological_ordering": latest_event_row[1],
"stream_ordering": latest_event_row[2],
},
)
# Otherwise, delete the thread: it no longer exists.
else:
self.db_pool.simple_delete_one_txn(
txn, table="threads", keyvalues={"thread_id": redacted_relates_to}
)
def _store_room_topic_txn(self, txn: LoggingTransaction, event: EventBase) -> None: def _store_room_topic_txn(self, txn: LoggingTransaction, event: EventBase) -> None:
if isinstance(event.content.get("topic"), str): if isinstance(event.content.get("topic"), str):

View File

@ -1523,6 +1523,26 @@ class RelationRedactionTestCase(BaseRelationsTestCase):
) )
self.assertEqual(200, channel.code, channel.json_body) self.assertEqual(200, channel.code, channel.json_body)
def _get_threads(self) -> List[Tuple[str, str]]:
"""Request the threads in the room and returns a list of thread ID and latest event ID."""
# Request the threads in the room.
channel = self.make_request(
"GET",
f"/_matrix/client/v1/rooms/{self.room}/threads",
access_token=self.user_token,
)
self.assertEquals(200, channel.code, channel.json_body)
threads = channel.json_body["chunk"]
return [
(
t["event_id"],
t["unsigned"]["m.relations"][RelationTypes.THREAD]["latest_event"][
"event_id"
],
)
for t in threads
]
def test_redact_relation_annotation(self) -> None: def test_redact_relation_annotation(self) -> None:
""" """
Test that annotations of an event are properly handled after the Test that annotations of an event are properly handled after the
@ -1567,58 +1587,82 @@ class RelationRedactionTestCase(BaseRelationsTestCase):
The redacted event should not be included in bundled aggregations or The redacted event should not be included in bundled aggregations or
the response to relations. the response to relations.
""" """
channel = self._send_relation( # Create a thread with a few events in it.
RelationTypes.THREAD, thread_replies = []
EventTypes.Message, for i in range(3):
content={"body": "reply 1", "msgtype": "m.text"}, channel = self._send_relation(
) RelationTypes.THREAD,
unredacted_event_id = channel.json_body["event_id"] EventTypes.Message,
content={"body": f"reply {i}", "msgtype": "m.text"},
)
thread_replies.append(channel.json_body["event_id"])
# Note that the *last* event in the thread is redacted, as that gets ##################################################
# included in the bundled aggregation. # Check the test data is configured as expected. #
channel = self._send_relation( ##################################################
RelationTypes.THREAD, self.assertEquals(self._get_related_events(), list(reversed(thread_replies)))
EventTypes.Message,
content={"body": "reply 2", "msgtype": "m.text"},
)
to_redact_event_id = channel.json_body["event_id"]
# Both relations exist.
event_ids = self._get_related_events()
relations = self._get_bundled_aggregations() relations = self._get_bundled_aggregations()
self.assertEquals(event_ids, [to_redact_event_id, unredacted_event_id])
self.assertDictContainsSubset( self.assertDictContainsSubset(
{ {"count": 3, "current_user_participated": True},
"count": 2,
"current_user_participated": True,
},
relations[RelationTypes.THREAD], relations[RelationTypes.THREAD],
) )
# And the latest event returned is the event that will be redacted. # The latest event is the last sent event.
self.assertEqual( self.assertEqual(
relations[RelationTypes.THREAD]["latest_event"]["event_id"], relations[RelationTypes.THREAD]["latest_event"]["event_id"],
to_redact_event_id, thread_replies[-1],
) )
# Redact one of the reactions. # There should be one thread, the latest event is the event that will be redacted.
self._redact(to_redact_event_id) self.assertEqual(self._get_threads(), [(self.parent_id, thread_replies[-1])])
# The unredacted relation should still exist. ##########################
event_ids = self._get_related_events() # Redact the last event. #
##########################
self._redact(thread_replies.pop())
# The thread should still exist, but the latest event should be updated.
self.assertEquals(self._get_related_events(), list(reversed(thread_replies)))
relations = self._get_bundled_aggregations() relations = self._get_bundled_aggregations()
self.assertEquals(event_ids, [unredacted_event_id])
self.assertDictContainsSubset( self.assertDictContainsSubset(
{ {"count": 2, "current_user_participated": True},
"count": 1,
"current_user_participated": True,
},
relations[RelationTypes.THREAD], relations[RelationTypes.THREAD],
) )
# And the latest event is now the unredacted event. # And the latest event is the last unredacted event.
self.assertEqual( self.assertEqual(
relations[RelationTypes.THREAD]["latest_event"]["event_id"], relations[RelationTypes.THREAD]["latest_event"]["event_id"],
unredacted_event_id, thread_replies[-1],
) )
self.assertEqual(self._get_threads(), [(self.parent_id, thread_replies[-1])])
###########################################
# Redact the *first* event in the thread. #
###########################################
self._redact(thread_replies.pop(0))
# Nothing should have changed (except the thread count).
self.assertEquals(self._get_related_events(), thread_replies)
relations = self._get_bundled_aggregations()
self.assertDictContainsSubset(
{"count": 1, "current_user_participated": True},
relations[RelationTypes.THREAD],
)
# And the latest event is the last unredacted event.
self.assertEqual(
relations[RelationTypes.THREAD]["latest_event"]["event_id"],
thread_replies[-1],
)
self.assertEqual(self._get_threads(), [(self.parent_id, thread_replies[-1])])
####################################
# Redact the last remaining event. #
####################################
self._redact(thread_replies.pop(0))
self.assertEquals(thread_replies, [])
# The event should no longer be considered a thread.
self.assertEquals(self._get_related_events(), [])
self.assertEquals(self._get_bundled_aggregations(), {})
self.assertEqual(self._get_threads(), [])
def test_redact_parent_edit(self) -> None: def test_redact_parent_edit(self) -> None:
"""Test that edits of an event are redacted when the original event """Test that edits of an event are redacted when the original event