From a27fb7d5cac3cacc55a1c778f02d074d4115eea6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 1 Oct 2019 11:05:48 +0100 Subject: [PATCH 1/8] Don't repeatedly attempt to censor events we don't have. Currently we don't set `have_censored` column if we don't have the target event of a redaction, which means we repeatedly attempt to censor the same non-existant event. When we persist non-redacted events we unset the `have_censored` column for any redactions that target said event. --- synapse/storage/events.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ddf7ab6479..5d56ceeab3 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1389,6 +1389,22 @@ class EventsStore( ], ) + for event, _ in events_and_contexts: + if not event.internal_metadata.is_redacted(): + # If we're persisting an unredacted event we go and ensure + # that we mark any redactions that reference this event as + # requiring censoring. + self._simple_update_txn( + txn, + table="redactions", + keyvalues={ + "redacts": event.event_id, + }, + updatevalues={ + "have_censored": False, + } + ) + def _store_rejected_events_txn(self, txn, events_and_contexts): """Add rows to the 'rejections' table for received events which were rejected @@ -1589,7 +1605,7 @@ class EventsStore( sql = """ SELECT redact_event.event_id, redacts FROM redactions INNER JOIN events AS redact_event USING (event_id) - INNER JOIN events AS original_event ON ( + LEFT JOIN events AS original_event ON ( redact_event.room_id = original_event.room_id AND redacts = original_event.event_id ) From 898dde981b41a4dfb79b5830f17e6eb9871ef762 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 1 Oct 2019 13:23:34 +0100 Subject: [PATCH 2/8] Add received_ts column to redactions. This will allow us to efficiently search for uncensored redactions in the DB before a given time. --- synapse/storage/events.py | 20 +++--- synapse/storage/events_bg_updates.py | 61 +++++++++++++++++++ .../schema/delta/56/redaction_censor2.sql | 20 ++++++ 3 files changed, 92 insertions(+), 9 deletions(-) create mode 100644 synapse/storage/schema/delta/56/redaction_censor2.sql diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5d56ceeab3..3104815f1a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1397,12 +1397,8 @@ class EventsStore( self._simple_update_txn( txn, table="redactions", - keyvalues={ - "redacts": event.event_id, - }, - updatevalues={ - "have_censored": False, - } + keyvalues={"redacts": event.event_id}, + updatevalues={"have_censored": False}, ) def _store_rejected_events_txn(self, txn, events_and_contexts): @@ -1568,9 +1564,15 @@ class EventsStore( def _store_redaction(self, txn, event): # invalidate the cache for the redacted event txn.call_after(self._invalidate_get_event_cache, event.redacts) - txn.execute( - "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", - (event.event_id, event.redacts), + + self._simple_insert_txn( + txn, + table="redactions", + values={ + "event_id": event.event_id, + "redacts": event.redacts, + "received_ts": self._clock.time_msec(), + }, ) @defer.inlineCallbacks diff --git a/synapse/storage/events_bg_updates.py b/synapse/storage/events_bg_updates.py index 6587f31e2b..5717baf48c 100644 --- a/synapse/storage/events_bg_updates.py +++ b/synapse/storage/events_bg_updates.py @@ -67,6 +67,10 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): self.DELETE_SOFT_FAILED_EXTREMITIES, self._cleanup_extremities_bg_update ) + self.register_background_update_handler( + "redactions_received_ts", self._redactions_received_ts + ) + @defer.inlineCallbacks def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] @@ -397,3 +401,60 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): ) return num_handled + + @defer.inlineCallbacks + def _redactions_received_ts(self, progress, batch_size): + """Handles filling out the `received_ts` column in redactions. + """ + last_event_id = progress.get("last_event_id", "") + + def _redactions_received_ts_txn(txn): + # Fetch the set of event IDs that we want to update + sql = """ + SELECT event_id FROM redactions + WHERE event_id > ? + ORDER BY event_id ASC + LIMIT ? + """ + + txn.execute(sql, (last_event_id, batch_size)) + + rows = txn.fetchall() + if not rows: + return 0 + + upper_event_id, = rows[-1] + + # Update the redactions with the received_ts. + # + # Note: Not all events have an associated received_ts, so we + # fallback to using origin_server_ts. If we for some reason don't + # have an origin_server_ts, lets just use the current timestamp. + # + # We don't want to leave it null, as then we'll never try and + # censor those redactions. + sql = """ + UPDATE redactions + SET received_ts = ( + SELECT COALESCE(received_ts, origin_server_ts, ?) FROM events + WHERE events.event_id = redactions.event_id + ) + WHERE ? <= event_id AND event_id <= ? + """ + + txn.execute(sql, (self._clock.time_msec(), last_event_id, upper_event_id)) + + self._background_update_progress_txn( + txn, "redactions_received_ts", {"last_event_id": upper_event_id} + ) + + return len(rows) + + count = yield self.runInteraction( + "_redactions_received_ts", _redactions_received_ts_txn + ) + + if not count: + yield self._end_background_update("redactions_received_ts") + + return count diff --git a/synapse/storage/schema/delta/56/redaction_censor2.sql b/synapse/storage/schema/delta/56/redaction_censor2.sql new file mode 100644 index 0000000000..77a5eca499 --- /dev/null +++ b/synapse/storage/schema/delta/56/redaction_censor2.sql @@ -0,0 +1,20 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE redactions ADD COLUMN received_ts BIGINT; +CREATE INDEX redactions_have_censored_ts ON redactions(received_ts) WHERE not have_censored; + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('redactions_received_ts', '{}'); From 5e8387af9e771ae42c7c8c4dc186000d862d3787 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 1 Oct 2019 13:28:41 +0100 Subject: [PATCH 3/8] Use `received_ts` to find uncensored redacted events Joining against `events` and ordering by `stream_ordering` is inefficient as it forced scanning the entirety of the redactions table. This isn't the case if we use `redactions.received_ts` column as we can then use an index. --- synapse/storage/events.py | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3104815f1a..2e485c8644 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1589,36 +1589,29 @@ class EventsStore( if self.hs.config.redaction_retention_period is None: return - max_pos = yield self.find_first_stream_ordering_after_ts( - self._clock.time_msec() - self.hs.config.redaction_retention_period - ) + before_ts = self._clock.time_msec() - self.hs.config.redaction_retention_period # We fetch all redactions that: # 1. point to an event we have, - # 2. has a stream ordering from before the cut off, and + # 2. has a received_ts from before the cut off, and # 3. we haven't yet censored. # # This is limited to 100 events to ensure that we don't try and do too # much at once. We'll get called again so this should eventually catch # up. - # - # We use the range [-max_pos, max_pos] to handle backfilled events, - # which are given negative stream ordering. sql = """ - SELECT redact_event.event_id, redacts FROM redactions - INNER JOIN events AS redact_event USING (event_id) + SELECT redactions.event_id, redacts FROM redactions LEFT JOIN events AS original_event ON ( - redact_event.room_id = original_event.room_id - AND redacts = original_event.event_id + redacts = original_event.event_id ) WHERE NOT have_censored - AND ? <= redact_event.stream_ordering AND redact_event.stream_ordering <= ? - ORDER BY redact_event.stream_ordering ASC + AND redactions.received_ts <= ? + ORDER BY redactions.received_ts ASC LIMIT ? """ rows = yield self._execute( - "_censor_redactions_fetch", None, sql, -max_pos, max_pos, 100 + "_censor_redactions_fetch", None, sql, before_ts, 100 ) updates = [] From 2b8352e6387a71f8bea8b512f1a491f1fedf06fc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 1 Oct 2019 13:36:29 +0100 Subject: [PATCH 4/8] Newsfile --- changelog.d/6141.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6141.bugfix diff --git a/changelog.d/6141.bugfix b/changelog.d/6141.bugfix new file mode 100644 index 0000000000..c93920b7b5 --- /dev/null +++ b/changelog.d/6141.bugfix @@ -0,0 +1 @@ +Fix bad performance of censoring redactions background task. From ce7a3e7e27ba4215ed86112d2967b0acca4cfb77 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Oct 2019 10:14:01 +0100 Subject: [PATCH 5/8] Fix fetching censored redactions from DB Fetching a censored redactions caused an exception due to the code expecting redactions to have a `redact` key, which redacted redactions don't have. --- synapse/storage/events_worker.py | 14 +++++++++++++ tests/storage/test_redaction.py | 36 ++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index c6fa7f82fd..57ce0304e9 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -238,6 +238,20 @@ class EventsWorkerStore(SQLBaseStore): # we have to recheck auth now. if not allow_rejected and entry.event.type == EventTypes.Redaction: + if not hasattr(entry.event, "redacts"): + # A redacted redaction doesn't have a `redacts` key, in + # which case lets just withhold the event. + # + # Note: Most of the time if the redactions has been + # redacted we still have the un-redacted event in the DB + # and so we'll still see the `redacts` key. However, this + # isn't always true e.g. if we have censored the event. + logger.debug( + "Withholding redaction event %s as we don't have redacts key", + event_id, + ) + continue + redacted_event_id = entry.event.redacts event_map = yield self._get_events_from_cache_or_db([redacted_event_id]) original_event_entry = event_map.get(redacted_event_id) diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py index deecfad9fb..427d3c49c5 100644 --- a/tests/storage/test_redaction.py +++ b/tests/storage/test_redaction.py @@ -118,6 +118,8 @@ class RedactionTestCase(unittest.HomeserverTestCase): self.get_success(self.store.persist_event(event, context)) + return event + def test_redact(self): self.get_success( self.inject_room_member(self.room1, self.u_alice, Membership.JOIN) @@ -361,3 +363,37 @@ class RedactionTestCase(unittest.HomeserverTestCase): ) self.assert_dict({"content": {}}, json.loads(event_json)) + + def test_redact_redaction(self): + """Tests that we can redact a redaction and can fetch it again. + """ + + self.get_success( + self.inject_room_member(self.room1, self.u_alice, Membership.JOIN) + ) + + msg_event = self.get_success(self.inject_message(self.room1, self.u_alice, "t")) + + first_redact_event = self.get_success( + self.inject_redaction( + self.room1, msg_event.event_id, self.u_alice, "Redacting message" + ) + ) + + self.get_success( + self.inject_redaction( + self.room1, + first_redact_event.event_id, + self.u_alice, + "Redacting redaction", + ) + ) + + # Now lets jump to the future where we have censored the redaction event + # in the DB. + self.reactor.advance(60 * 60 * 24 * 31) + + # We just want to check that fetching the event doesn't raise an exception. + self.get_success( + self.store.get_event(first_redact_event.event_id, allow_none=True) + ) From 33d4ebdf78149705aa4b73cabe593337619ca2a7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Oct 2019 10:18:17 +0100 Subject: [PATCH 6/8] Newsfile --- changelog.d/6145.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6145.bugfix diff --git a/changelog.d/6145.bugfix b/changelog.d/6145.bugfix new file mode 100644 index 0000000000..9e0eb5dd4c --- /dev/null +++ b/changelog.d/6145.bugfix @@ -0,0 +1 @@ +Fix fetching censored redactions from DB, which caused APIs like initial sync to fail if it tried to include the censored redaction. From f44f1d2e8374b7250a8a68cf3a49e6d1ac63b0fb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Oct 2019 10:36:27 +0100 Subject: [PATCH 7/8] Fix errors storing large retry intervals. We have set the max retry interval to a value larger than a postgres or sqlite int can hold, which caused exceptions when updating the destinations table. To fix postgres we need to change the column to a bigint, and for sqlite we lower the max interval to 2**62 (which is still incredibly long). --- ...stinations_retry_interval_type.sql.postgres | 18 ++++++++++++++++++ synapse/util/retryutils.py | 2 +- tests/storage/test_transactions.py | 11 +++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/56/destinations_retry_interval_type.sql.postgres diff --git a/synapse/storage/schema/delta/56/destinations_retry_interval_type.sql.postgres b/synapse/storage/schema/delta/56/destinations_retry_interval_type.sql.postgres new file mode 100644 index 0000000000..b9bbb18a91 --- /dev/null +++ b/synapse/storage/schema/delta/56/destinations_retry_interval_type.sql.postgres @@ -0,0 +1,18 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- We want to store large retry intervals so we upgrade the column from INT +-- to BIGINT. We don't need to do this on SQLite. +ALTER TABLE destinations ALTER retry_interval SET DATA TYPE BIGINT; diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index a5f2fbef5c..af69587196 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -29,7 +29,7 @@ MIN_RETRY_INTERVAL = 10 * 60 * 1000 RETRY_MULTIPLIER = 5 # a cap on the backoff. (Essentially none) -MAX_RETRY_INTERVAL = 2 ** 63 +MAX_RETRY_INTERVAL = 2 ** 62 class NotRetryingDestination(Exception): diff --git a/tests/storage/test_transactions.py b/tests/storage/test_transactions.py index a771d5af29..8e817e2c7f 100644 --- a/tests/storage/test_transactions.py +++ b/tests/storage/test_transactions.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.util.retryutils import MAX_RETRY_INTERVAL + from tests.unittest import HomeserverTestCase @@ -45,3 +47,12 @@ class TransactionStoreTestCase(HomeserverTestCase): """ d = self.store.set_destination_retry_timings("example.com", 1000, 50, 100) self.get_success(d) + + def test_large_destination_retry(self): + d = self.store.set_destination_retry_timings( + "example.com", MAX_RETRY_INTERVAL, MAX_RETRY_INTERVAL, MAX_RETRY_INTERVAL + ) + self.get_success(d) + + d = self.store.get_destination_retry_timings("example.com") + self.get_success(d) From 2bc027ab71c33960e216ec194612a616a4aa11b8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Oct 2019 10:41:29 +0100 Subject: [PATCH 8/8] Newsfile --- changelog.d/6146.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6146.bugfix diff --git a/changelog.d/6146.bugfix b/changelog.d/6146.bugfix new file mode 100644 index 0000000000..1dad801836 --- /dev/null +++ b/changelog.d/6146.bugfix @@ -0,0 +1 @@ +Fix exceptions when storing large retry intervals for down remote servers.