Add insertion_event_extremities table

This commit is contained in:
Eric Eastwood 2021-07-17 03:59:39 -05:00
parent d63c34c7e5
commit 2196ba527b
4 changed files with 28 additions and 6 deletions

View File

@ -944,7 +944,7 @@ class FederationHandler(BaseHandler):
marker_event,
)
await self.store.insert_backward_extremity(
await self.store.insert_insertion_extremity(
insertion_event_id, marker_event.room_id
)

View File

@ -692,7 +692,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
sql = """
SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
/* We only want insertion events that are also marked as backwards extremities */
INNER JOIN event_backward_extremities as b USING (event_id)
INNER JOIN insertion_event_extremities as b USING (event_id)
/* Get the depth of the insertion event from the events table */
INNER JOIN events AS e USING (event_id)
WHERE b.room_id = ?
@ -1144,11 +1144,11 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
_delete_old_forward_extrem_cache_txn,
)
async def insert_backward_extremity(self, event_id: str, room_id: str) -> None:
def _insert_backward_extremity_txn(txn):
async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
def _insert_insertion_extremity_txn(txn):
self.db_pool.simple_insert_txn(
txn,
table="event_backward_extremities",
table="insertion_event_extremities",
values={
"event_id": event_id,
"room_id": room_id,
@ -1156,7 +1156,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
)
await self.db_pool.runInteraction(
"_insert_backward_extremity_txn", _insert_backward_extremity_txn
"_insert_insertion_extremity_txn", _insert_insertion_extremity_txn
)
async def insert_received_event_to_staging(

View File

@ -1831,6 +1831,18 @@ class PersistEventsStore:
},
)
# When we receive an event with a `chunk_id` referencing the
# `next_chunk_id` of the insertion event, we can remove it from the
# `insertion_event_extremities` table.
sql = """
DELETE FROM insertion_event_extremities WHERE event_id IN (
SELECT event_id FROM insertion_events
WHERE next_chunk_id = ?
)
"""
txn.execute(sql, (chunk_id,))
def _handle_redaction(self, txn, redacted_event_id):
"""Handles receiving a redaction and checking whether we need to remove
any redacted relations from the database.

View File

@ -40,6 +40,16 @@ CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_room_id ON insertion_
CREATE INDEX IF NOT EXISTS insertion_event_edges_event_id ON insertion_event_edges(event_id);
CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_prev_event_id ON insertion_event_edges(insertion_prev_event_id);
-- Add a table that keeps track of which "insertion" events need to be backfilled
CREATE TABLE IF NOT EXISTS insertion_event_extremities(
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
UNIQUE (event_id)
);
CREATE INDEX IF NOT EXISTS insertion_event_extremities_event_id ON insertion_event_extremities(event_id);
CREATE INDEX IF NOT EXISTS insertion_event_extremities_room_id ON insertion_event_extremities(room_id);
-- Add a table that keeps track of how each chunk is labeled. The chunks are
-- connected together based insertion points `next_chunk_id`.
CREATE TABLE IF NOT EXISTS chunk_edges(