diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 979fa22438..c443f7d2b6 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -131,6 +131,7 @@ class DataStore(RoomMemberStore, RoomStore, self._group_updates_id_gen = StreamIdGenerator( db_conn, "local_group_updates", "stream_id", ) + self._chunk_id_gen = IdGenerator(db_conn, "events", "chunk_id") if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = StreamIdGenerator( diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3cb1abd2ab..5194c4a48d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -23,6 +23,7 @@ import simplejson as json from twisted.internet import defer from synapse.storage.events_worker import EventsWorkerStore +from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore from synapse.util.async import ObservableDeferred from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.logcontext import ( @@ -1019,13 +1020,19 @@ class EventsStore(EventsWorkerStore): } ) - sql = ( - "UPDATE events SET outlier = ?" - " WHERE event_id = ?" + chunk_id, _ = self._insert_into_chunk_txn( + txn, event.room_id, event.event_id, + [eid for eid, _ in event.prev_events], ) - txn.execute( - sql, - (False, event.event_id,) + + self._simple_update_txn( + txn, + table="events", + keyvalues={"event_id": event.event_id}, + updatevalues={ + "outlier": False, + "chunk_id": chunk_id, + }, ) # Update the event_backward_extremities table now that this @@ -1108,12 +1115,21 @@ class EventsStore(EventsWorkerStore): ], ) - self._simple_insert_many_txn( - txn, - table="events", - values=[ - { + for event, _ in events_and_contexts: + if event.internal_metadata.is_outlier(): + chunk_id, _topo = None, 0 + else: + chunk_id, _topo = self._insert_into_chunk_txn( + txn, event.room_id, event.event_id, + [eid for eid, _ in event.prev_events], + ) + + self._simple_insert_txn( + txn, + table="events", + values={ "stream_ordering": event.internal_metadata.stream_ordering, + "chunk_id": chunk_id, "topological_ordering": event.depth, "depth": event.depth, "event_id": event.event_id, @@ -1129,10 +1145,8 @@ class EventsStore(EventsWorkerStore): "url" in event.content and isinstance(event.content["url"], basestring) ), - } - for event, _ in events_and_contexts - ], - ) + }, + ) def _store_rejected_events_txn(self, txn, events_and_contexts): """Add rows to the 'rejections' table for received events which were @@ -1344,6 +1358,177 @@ class EventsStore(EventsWorkerStore): (event.event_id, event.redacts) ) + def _insert_into_chunk_txn(self, txn, room_id, event_id, prev_event_ids): + """Computes the chunk ID and topological ordering for an event and + handles updating chunk_graph table. + + Args: + txn, + room_id (str) + event_id (str) + prev_event_ids (list[str]) + + Returns: + tuple[int, int]: Returns the chunk_id, topological_ordering for + the event + """ + + # We calculate the chunk for an event using the following rules: + # + # 1. If all prev events have the same chunk ID then use that chunk ID + # 2. If we have none of the prev events but do have events pointing to + # the event, then we use their chunk ID if: + # - They're all in the same chunk, and + # - All their prev events match the events being inserted + # 3. Otherwise, create a new chunk and use that + + # Set of chunks that the event refers to. Includes None if there were + # prev events that we don't have (or don't have a chunk for) + prev_chunk_ids = set() + + for eid in prev_event_ids: + chunk_id = self._simple_select_one_onecol_txn( + txn, + table="events", + keyvalues={"event_id": eid}, + retcol="chunk_id", + allow_none=True, + ) + + prev_chunk_ids.add(chunk_id) + + forward_events = self._simple_select_onecol_txn( + txn, + table="event_edges", + keyvalues={ + "prev_event_id": event_id, + "is_state": False, + }, + retcol="event_id", + ) + + # Set of chunks that refer to this event. + forward_chunk_ids = set() + + # All the prev_events of events in `forward_events`. + # Note that this will include the current event_id. + sibling_events = set() + for eid in forward_events: + chunk_id = self._simple_select_one_onecol_txn( + txn, + table="events", + keyvalues={"event_id": eid}, + retcol="chunk_id", + allow_none=True, + ) + + if chunk_id is not None: + # chunk_id can be None if it's an outlier + forward_chunk_ids.add(chunk_id) + + pes = self._simple_select_onecol_txn( + txn, + table="event_edges", + keyvalues={ + "event_id": eid, + "is_state": False, + }, + retcol="prev_event_id", + ) + + sibling_events.update(pes) + + table = ChunkDBOrderedListStore( + txn, room_id, self.clock, + ) + + # If there is only one previous chunk (and that isn't None), then this + # satisfies condition one. + if len(prev_chunk_ids) == 1 and None not in prev_chunk_ids: + chunk_id = list(prev_chunk_ids)[0] + + # This event is being inserted at the end of the chunk + new_topo = self._simple_select_one_onecol_txn( + txn, + table="events", + keyvalues={ + "room_id": room_id, + "chunk_id": chunk_id, + }, + retcol="MAX(topological_ordering)", + ) + new_topo += 1 + + # If there is only one forward chunk and only one sibling event (which + # would be the given event), then this satisfies condition two. + elif len(forward_chunk_ids) == 1 and len(sibling_events) == 1: + chunk_id = list(forward_chunk_ids)[0] + + # This event is being inserted at the start of the chunk + new_topo = self._simple_select_one_onecol_txn( + txn, + table="events", + keyvalues={ + "room_id": room_id, + "chunk_id": chunk_id, + }, + retcol="MIN(topological_ordering)", + ) + new_topo -= 1 + else: + chunk_id = self._chunk_id_gen.get_next() + new_topo = 0 + + # We've generated a new chunk, so we have to tell the + # ChunkDBOrderedListStore about that. + table.add_node(chunk_id) + + # We need to now update the database with any new edges between chunks + current_prev_ids = self._simple_select_onecol_txn( + txn, + table="chunk_graph", + keyvalues={ + "chunk_id": chunk_id, + }, + retcol="prev_id", + ) + + current_forward_ids = self._simple_select_onecol_txn( + txn, + table="chunk_graph", + keyvalues={ + "prev_id": chunk_id, + }, + retcol="chunk_id", + ) + + for pid in prev_chunk_ids: + if pid is not None and pid not in current_prev_ids and pid != chunk_id: + # Note that the edge direction is reversed than what you might + # expect. See ChunkDBOrderedListStore for more details. + table.add_edge(pid, chunk_id) + + for fid in forward_chunk_ids: + # Note that the edge direction is reversed than what you might + # expect. See ChunkDBOrderedListStore for more details. + if fid not in current_forward_ids and fid != chunk_id: + table.add_edge(chunk_id, fid) + + # We now need to update the backwards extremities for the chunks. + + txn.executemany(""" + INSERT INTO chunk_backwards_extremities (chunk_id, event_id) + SELECT ?, ? WHERE ? NOT IN (SELECT event_id FROM events) + """, [(chunk_id, eid, eid) for eid in prev_event_ids]) + + self._simple_delete_txn( + txn, + table="chunk_backwards_extremities", + keyvalues={"event_id": event_id}, + ) + + return chunk_id, new_topo + @defer.inlineCallbacks def have_events_in_timeline(self, event_ids): """Given a list of event ids, check if we have already processed and