Merge pull request #3240 from matrix-org/erikj/events_chunks
Compute new chunks for new events
This commit is contained in:
commit
867132f28c
|
@ -131,6 +131,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
self._group_updates_id_gen = StreamIdGenerator(
|
||||
db_conn, "local_group_updates", "stream_id",
|
||||
)
|
||||
self._chunk_id_gen = IdGenerator(db_conn, "events", "chunk_id")
|
||||
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
self._cache_id_gen = StreamIdGenerator(
|
||||
|
|
|
@ -23,6 +23,7 @@ import simplejson as json
|
|||
from twisted.internet import defer
|
||||
|
||||
from synapse.storage.events_worker import EventsWorkerStore
|
||||
from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
|
||||
from synapse.util.async import ObservableDeferred
|
||||
from synapse.util.frozenutils import frozendict_json_encoder
|
||||
from synapse.util.logcontext import (
|
||||
|
@ -1019,13 +1020,19 @@ class EventsStore(EventsWorkerStore):
|
|||
}
|
||||
)
|
||||
|
||||
sql = (
|
||||
"UPDATE events SET outlier = ?"
|
||||
" WHERE event_id = ?"
|
||||
chunk_id, _ = self._insert_into_chunk_txn(
|
||||
txn, event.room_id, event.event_id,
|
||||
[eid for eid, _ in event.prev_events],
|
||||
)
|
||||
txn.execute(
|
||||
sql,
|
||||
(False, event.event_id,)
|
||||
|
||||
self._simple_update_txn(
|
||||
txn,
|
||||
table="events",
|
||||
keyvalues={"event_id": event.event_id},
|
||||
updatevalues={
|
||||
"outlier": False,
|
||||
"chunk_id": chunk_id,
|
||||
},
|
||||
)
|
||||
|
||||
# Update the event_backward_extremities table now that this
|
||||
|
@ -1108,12 +1115,21 @@ class EventsStore(EventsWorkerStore):
|
|||
],
|
||||
)
|
||||
|
||||
self._simple_insert_many_txn(
|
||||
for event, _ in events_and_contexts:
|
||||
if event.internal_metadata.is_outlier():
|
||||
chunk_id, _topo = None, 0
|
||||
else:
|
||||
chunk_id, _topo = self._insert_into_chunk_txn(
|
||||
txn, event.room_id, event.event_id,
|
||||
[eid for eid, _ in event.prev_events],
|
||||
)
|
||||
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
table="events",
|
||||
values=[
|
||||
{
|
||||
values={
|
||||
"stream_ordering": event.internal_metadata.stream_ordering,
|
||||
"chunk_id": chunk_id,
|
||||
"topological_ordering": event.depth,
|
||||
"depth": event.depth,
|
||||
"event_id": event.event_id,
|
||||
|
@ -1129,9 +1145,7 @@ class EventsStore(EventsWorkerStore):
|
|||
"url" in event.content
|
||||
and isinstance(event.content["url"], basestring)
|
||||
),
|
||||
}
|
||||
for event, _ in events_and_contexts
|
||||
],
|
||||
},
|
||||
)
|
||||
|
||||
def _store_rejected_events_txn(self, txn, events_and_contexts):
|
||||
|
@ -1344,6 +1358,177 @@ class EventsStore(EventsWorkerStore):
|
|||
(event.event_id, event.redacts)
|
||||
)
|
||||
|
||||
def _insert_into_chunk_txn(self, txn, room_id, event_id, prev_event_ids):
|
||||
"""Computes the chunk ID and topological ordering for an event and
|
||||
handles updating chunk_graph table.
|
||||
|
||||
Args:
|
||||
txn,
|
||||
room_id (str)
|
||||
event_id (str)
|
||||
prev_event_ids (list[str])
|
||||
|
||||
Returns:
|
||||
tuple[int, int]: Returns the chunk_id, topological_ordering for
|
||||
the event
|
||||
"""
|
||||
|
||||
# We calculate the chunk for an event using the following rules:
|
||||
#
|
||||
# 1. If all prev events have the same chunk ID then use that chunk ID
|
||||
# 2. If we have none of the prev events but do have events pointing to
|
||||
# the event, then we use their chunk ID if:
|
||||
# - They're all in the same chunk, and
|
||||
# - All their prev events match the events being inserted
|
||||
# 3. Otherwise, create a new chunk and use that
|
||||
|
||||
# Set of chunks that the event refers to. Includes None if there were
|
||||
# prev events that we don't have (or don't have a chunk for)
|
||||
prev_chunk_ids = set()
|
||||
|
||||
for eid in prev_event_ids:
|
||||
chunk_id = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="events",
|
||||
keyvalues={"event_id": eid},
|
||||
retcol="chunk_id",
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
prev_chunk_ids.add(chunk_id)
|
||||
|
||||
forward_events = self._simple_select_onecol_txn(
|
||||
txn,
|
||||
table="event_edges",
|
||||
keyvalues={
|
||||
"prev_event_id": event_id,
|
||||
"is_state": False,
|
||||
},
|
||||
retcol="event_id",
|
||||
)
|
||||
|
||||
# Set of chunks that refer to this event.
|
||||
forward_chunk_ids = set()
|
||||
|
||||
# All the prev_events of events in `forward_events`.
|
||||
# Note that this will include the current event_id.
|
||||
sibling_events = set()
|
||||
for eid in forward_events:
|
||||
chunk_id = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="events",
|
||||
keyvalues={"event_id": eid},
|
||||
retcol="chunk_id",
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
if chunk_id is not None:
|
||||
# chunk_id can be None if it's an outlier
|
||||
forward_chunk_ids.add(chunk_id)
|
||||
|
||||
pes = self._simple_select_onecol_txn(
|
||||
txn,
|
||||
table="event_edges",
|
||||
keyvalues={
|
||||
"event_id": eid,
|
||||
"is_state": False,
|
||||
},
|
||||
retcol="prev_event_id",
|
||||
)
|
||||
|
||||
sibling_events.update(pes)
|
||||
|
||||
table = ChunkDBOrderedListStore(
|
||||
txn, room_id, self.clock,
|
||||
)
|
||||
|
||||
# If there is only one previous chunk (and that isn't None), then this
|
||||
# satisfies condition one.
|
||||
if len(prev_chunk_ids) == 1 and None not in prev_chunk_ids:
|
||||
chunk_id = list(prev_chunk_ids)[0]
|
||||
|
||||
# This event is being inserted at the end of the chunk
|
||||
new_topo = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="events",
|
||||
keyvalues={
|
||||
"room_id": room_id,
|
||||
"chunk_id": chunk_id,
|
||||
},
|
||||
retcol="MAX(topological_ordering)",
|
||||
)
|
||||
new_topo += 1
|
||||
|
||||
# If there is only one forward chunk and only one sibling event (which
|
||||
# would be the given event), then this satisfies condition two.
|
||||
elif len(forward_chunk_ids) == 1 and len(sibling_events) == 1:
|
||||
chunk_id = list(forward_chunk_ids)[0]
|
||||
|
||||
# This event is being inserted at the start of the chunk
|
||||
new_topo = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="events",
|
||||
keyvalues={
|
||||
"room_id": room_id,
|
||||
"chunk_id": chunk_id,
|
||||
},
|
||||
retcol="MIN(topological_ordering)",
|
||||
)
|
||||
new_topo -= 1
|
||||
else:
|
||||
chunk_id = self._chunk_id_gen.get_next()
|
||||
new_topo = 0
|
||||
|
||||
# We've generated a new chunk, so we have to tell the
|
||||
# ChunkDBOrderedListStore about that.
|
||||
table.add_node(chunk_id)
|
||||
|
||||
# We need to now update the database with any new edges between chunks
|
||||
current_prev_ids = self._simple_select_onecol_txn(
|
||||
txn,
|
||||
table="chunk_graph",
|
||||
keyvalues={
|
||||
"chunk_id": chunk_id,
|
||||
},
|
||||
retcol="prev_id",
|
||||
)
|
||||
|
||||
current_forward_ids = self._simple_select_onecol_txn(
|
||||
txn,
|
||||
table="chunk_graph",
|
||||
keyvalues={
|
||||
"prev_id": chunk_id,
|
||||
},
|
||||
retcol="chunk_id",
|
||||
)
|
||||
|
||||
for pid in prev_chunk_ids:
|
||||
if pid is not None and pid not in current_prev_ids and pid != chunk_id:
|
||||
# Note that the edge direction is reversed than what you might
|
||||
# expect. See ChunkDBOrderedListStore for more details.
|
||||
table.add_edge(pid, chunk_id)
|
||||
|
||||
for fid in forward_chunk_ids:
|
||||
# Note that the edge direction is reversed than what you might
|
||||
# expect. See ChunkDBOrderedListStore for more details.
|
||||
if fid not in current_forward_ids and fid != chunk_id:
|
||||
table.add_edge(chunk_id, fid)
|
||||
|
||||
# We now need to update the backwards extremities for the chunks.
|
||||
|
||||
txn.executemany("""
|
||||
INSERT INTO chunk_backwards_extremities (chunk_id, event_id)
|
||||
SELECT ?, ? WHERE ? NOT IN (SELECT event_id FROM events)
|
||||
""", [(chunk_id, eid, eid) for eid in prev_event_ids])
|
||||
|
||||
self._simple_delete_txn(
|
||||
txn,
|
||||
table="chunk_backwards_extremities",
|
||||
keyvalues={"event_id": event_id},
|
||||
)
|
||||
|
||||
return chunk_id, new_topo
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def have_events_in_timeline(self, event_ids):
|
||||
"""Given a list of event ids, check if we have already processed and
|
||||
|
|
Loading…
Reference in New Issue