diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4664cfe6d9..468a259a6a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -27,6 +27,7 @@ from synapse.api.errors import SynapseError from canonicaljson import encode_canonical_json from collections import deque, namedtuple, OrderedDict +from functools import wraps import synapse import synapse.metrics @@ -150,6 +151,27 @@ class _EventPeristenceQueue(object): _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) +def _retry_on_integrity_error(func): + """Wraps a database function so that it gets retried on IntegrityError, + with `delete_existing=True` passed in. + + Args: + func: function that returns a Deferred and accepts a `delete_existing` arg + """ + @wraps(func) + @defer.inlineCallbacks + def f(self, *args, **kwargs): + try: + res = yield func(self, *args, **kwargs) + defer.returnValue(res) + except self.database_engine.module.IntegrityError: + logger.exception("IntegrityError, retrying.") + res = yield func(self, *args, delete_existing=True, **kwargs) + defer.returnValue(res) + + return f + + class EventsStore(SQLBaseStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" @@ -229,8 +251,10 @@ class EventsStore(SQLBaseStore): self._event_persist_queue.handle_queue(room_id, persisting_queue) + @_retry_on_integrity_error @defer.inlineCallbacks - def _persist_events(self, events_and_contexts, backfilled=False): + def _persist_events(self, events_and_contexts, backfilled=False, + delete_existing=False): if not events_and_contexts: return @@ -273,12 +297,15 @@ class EventsStore(SQLBaseStore): self._persist_events_txn, events_and_contexts=chunk, backfilled=backfilled, + delete_existing=delete_existing, ) persist_event_counter.inc_by(len(chunk)) + @_retry_on_integrity_error @defer.inlineCallbacks @log_function - def _persist_event(self, event, context, current_state=None, backfilled=False): + def _persist_event(self, event, context, current_state=None, backfilled=False, + delete_existing=False): try: with self._stream_id_gen.get_next() as stream_ordering: with self._state_groups_id_gen.get_next() as state_group_id: @@ -291,6 +318,7 @@ class EventsStore(SQLBaseStore): context=context, current_state=current_state, backfilled=backfilled, + delete_existing=delete_existing, ) persist_event_counter.inc() except _RollbackButIsFineException: @@ -353,7 +381,8 @@ class EventsStore(SQLBaseStore): defer.returnValue({e.event_id: e for e in events}) @log_function - def _persist_event_txn(self, txn, event, context, current_state, backfilled=False): + def _persist_event_txn(self, txn, event, context, current_state, backfilled=False, + delete_existing=False): # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table if current_state: @@ -393,15 +422,20 @@ class EventsStore(SQLBaseStore): txn, [(event, context)], backfilled=backfilled, + delete_existing=delete_existing, ) @log_function - def _persist_events_txn(self, txn, events_and_contexts, backfilled): + def _persist_events_txn(self, txn, events_and_contexts, backfilled, + delete_existing=False): """Insert some number of room events into the necessary database tables. Rejected events are only inserted into the events table, the events_json table, and the rejections table. Things reading from those table will need to check whether the event was rejected. + + If delete_existing is True then existing events will be purged from the + database before insertion. This is useful when retrying due to IntegrityError. """ # Ensure that we don't have the same event twice. # Pick the earliest non-outlier if there is one, else the earliest one. @@ -537,6 +571,33 @@ class EventsStore(SQLBaseStore): ] } + if delete_existing: + # For paranoia reasons, we go and delete all the existing entries + # for these events so we can reinsert them. + # This gets around any problems with some tables already having + # entries. + + for table in ( + "events", + "event_json", + "event_content_hashes", + "event_destinations", + "event_edge_hashes", + "event_edges", + "event_forward_extremities", + "event_push_actions", + "event_reference_hashes", + "event_search", + "event_signatures", + "event_to_state_groups", + "rejections", + "redactions", + ): + txn.executemany( + "DELETE FROM %s WHERE event_id = ?" % (table,), + [ev for ev, _ in events_and_contexts] + ) + self._simple_insert_many_txn( txn, table="event_json",