diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 24141f97ba..85d7fd0910 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -202,6 +202,7 @@ def _retry_on_integrity_error(func):
 class EventsStore(EventsWorkerStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
     EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
+    EVENT_FIELDS_CHUNK = "event_fields_chunk_id"
 
     def __init__(self, db_conn, hs):
         super(EventsStore, self).__init__(db_conn, hs)
@@ -242,6 +243,11 @@ class EventsStore(EventsWorkerStore):
             psql_only=True,
         )
 
+        self.register_background_update_handler(
+            self.EVENT_FIELDS_CHUNK,
+            self._background_compute_chunks,
+        )
+
         self._event_persist_queue = _EventPeristenceQueue()
         self._state_resolution_handler = hs.get_state_resolution_handler()
 
@@ -1834,6 +1840,69 @@ class EventsStore(EventsWorkerStore):
 
         defer.returnValue(result)
 
+    @defer.inlineCallbacks
+    def _background_compute_chunks(self, progress, batch_size):
+        up_to_stream_id = progress.get("up_to_stream_id")
+        if up_to_stream_id is None:
+            up_to_stream_id = self.get_current_events_token() + 1
+
+        rows_inserted = progress.get("rows_inserted", 0)
+
+        def reindex_chunks_txn(txn):
+            txn.execute("""
+                SELECT stream_ordering, room_id, event_id FROM events
+                WHERE stream_ordering < ? AND outlier = ? AND chunk_id IS NULL
+                ORDER BY stream_ordering DESC
+                LIMIT ?
+            """, (up_to_stream_id, False, batch_size))
+
+            rows = txn.fetchall()
+
+            stream_ordering = up_to_stream_id
+            for stream_ordering, room_id, event_id in rows:
+                prev_events = self._simple_select_onecol_txn(
+                    txn,
+                    table="event_edges",
+                    keyvalues={
+                        "event_id": event_id,
+                    },
+                    retcol="prev_event_id",
+                )
+
+                chunk_id, topo = self._compute_chunk_id_txn(
+                    txn, room_id, event_id, prev_events,
+                )
+
+                self._simple_update_txn(
+                    txn,
+                    table="events",
+                    keyvalues={"event_id": event_id},
+                    updatevalues={
+                        "chunk_id": chunk_id,
+                        "topological_ordering": topo,
+                    },
+                )
+
+            progress = {
+                "up_to_stream_id": stream_ordering,
+                "rows_inserted": rows_inserted + len(rows)
+            }
+
+            self._background_update_progress_txn(
+                txn, self.EVENT_FIELDS_CHUNK, progress
+            )
+
+            return len(rows)
+
+        result = yield self.runInteraction(
+            self.EVENT_FIELDS_CHUNK, reindex_chunks_txn
+        )
+
+        if not result:
+            yield self._end_background_update(self.EVENT_FIELDS_CHUNK)
+
+        defer.returnValue(result)
+
     def get_current_backfill_token(self):
         """The current minimum token that backfilled events have reached"""
         return -self._backfill_id_gen.get_current_token()
diff --git a/synapse/storage/schema/delta/49/event_chunks.sql b/synapse/storage/schema/delta/49/event_chunks.sql
index e995a949ce..65ec19180b 100644
--- a/synapse/storage/schema/delta/49/event_chunks.sql
+++ b/synapse/storage/schema/delta/49/event_chunks.sql
@@ -15,6 +15,8 @@
 
 ALTER TABLE events ADD COLUMN chunk_id BIGINT;
 
+-- FIXME: Add index on contains_url
+
 INSERT INTO background_updates (update_name, progress_json) VALUES
     ('events_chunk_index', '{}');
 
@@ -80,3 +82,7 @@ INSERT INTO chunk_linearized (chunk_id, room_id, ordering)
     SELECT chunk_id, room_id, stream_ordering
     FROM event_forward_extremities
     INNER JOIN events USING (room_id, event_id);
+
+
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('event_fields_chunk_id', '{}');
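
A note on the handler contract the new method follows: a handler registered via register_background_update_handler is invoked with the progress dict deserialised from background_updates.progress_json plus a batch_size, does one batch of work, records its own progress, and returns the number of items processed; a falsy return means nothing is left, which is when _background_compute_chunks calls _end_background_update to deregister itself. Below is a minimal sketch of that loop — the _PROGRESS dict, run_update driver, and count_to_ten handler are illustrative stand-ins, not Synapse's real BackgroundUpdateStore internals:

    # In-memory stand-in for the background_updates table:
    # update_name -> progress dict.
    _PROGRESS = {}

    def _background_update_progress(update_name, progress):
        _PROGRESS[update_name] = progress

    def run_update(update_name, handler, batch_size=100):
        while True:
            progress = _PROGRESS.get(update_name, {})
            if not handler(progress, batch_size):
                # Equivalent of _end_background_update: forget the update.
                _PROGRESS.pop(update_name, None)
                return

    def count_to_ten(progress, batch_size):
        # Toy handler: "processes" the numbers 0..9 in batches,
        # persisting how far it has got after each batch.
        done = progress.get("done", 0)
        todo = min(batch_size, 10 - done)
        _background_update_progress("count_to_ten", {"done": done + todo})
        return todo

    run_update("count_to_ten", count_to_ten, batch_size=3)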
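
The query in reindex_chunks_txn is resumable high-to-low pagination: each batch selects the un-chunked events just below the previous watermark, and the smallest stream_ordering seen becomes the next watermark stored as progress["up_to_stream_id"], so an interrupted run picks up where it left off. A self-contained sketch of the same pattern against a toy sqlite table (the three-column schema is illustrative, not Synapse's real events table):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE events"
        " (stream_ordering INTEGER, event_id TEXT, chunk_id INTEGER)"
    )
    conn.executemany(
        "INSERT INTO events VALUES (?, ?, NULL)",
        [(i, "$event_%d" % i) for i in range(10)],
    )

    watermark = 10 + 1  # one past the current maximum stream ordering
    batch_size = 4
    while True:
        rows = conn.execute(
            "SELECT stream_ordering, event_id FROM events"
            " WHERE stream_ordering < ? AND chunk_id IS NULL"
            " ORDER BY stream_ordering DESC LIMIT ?",
            (watermark, batch_size),
        ).fetchall()
        if not rows:
            break  # no un-chunked events left; real code ends the update here
        for stream_ordering, event_id in rows:
            pass  # real code computes and stores a chunk_id for each event
        watermark = rows[-1][0]  # resume point for the next batch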