Implement background update for chunks

This commit is contained in:
parent 25ae0bf3ab
commit 34240a8d18
@@ -202,6 +202,7 @@ def _retry_on_integrity_error(func):
 class EventsStore(EventsWorkerStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
     EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
+    EVENT_FIELDS_CHUNK = "event_fields_chunk_id"
 
     def __init__(self, db_conn, hs):
         super(EventsStore, self).__init__(db_conn, hs)
@@ -242,6 +243,11 @@ class EventsStore(EventsWorkerStore):
             psql_only=True,
         )
 
+        self.register_background_update_handler(
+            self.EVENT_FIELDS_CHUNK,
+            self._background_compute_chunks,
+        )
+
         self._event_persist_queue = _EventPeristenceQueue()
 
         self._state_resolution_handler = hs.get_state_resolution_handler()
@@ -1834,6 +1840,69 @@ class EventsStore(EventsWorkerStore):
 
         defer.returnValue(result)
 
+    @defer.inlineCallbacks
+    def _background_compute_chunks(self, progress, batch_size):
+        up_to_stream_id = progress.get("up_to_stream_id")
+        if up_to_stream_id is None:
+            up_to_stream_id = self.get_current_events_token() + 1
+
+        rows_inserted = progress.get("rows_inserted", 0)
+
+        def reindex_chunks_txn(txn):
+            txn.execute("""
+                SELECT stream_ordering, room_id, event_id FROM events
+                WHERE stream_ordering < ? AND outlier = ? AND chunk_id IS NULL
+                ORDER BY stream_ordering DESC
+                LIMIT ?
+            """, (up_to_stream_id, False, batch_size))
+
+            rows = txn.fetchall()
+
+            stream_ordering = up_to_stream_id
+            for stream_ordering, room_id, event_id in rows:
+                prev_events = self._simple_select_onecol_txn(
+                    txn,
+                    table="event_edges",
+                    keyvalues={
+                        "event_id": event_id,
+                    },
+                    retcol="prev_event_id",
+                )
+
+                chunk_id, topo = self._compute_chunk_id_txn(
+                    txn, room_id, event_id, prev_events,
+                )
+
+                self._simple_update_txn(
+                    txn,
+                    table="events",
+                    keyvalues={"event_id": event_id},
+                    updatevalues={
+                        "chunk_id": chunk_id,
+                        "topological_ordering": topo,
+                    },
+                )
+
+            progress = {
+                "up_to_stream_id": stream_ordering,
+                "rows_inserted": rows_inserted + len(rows),
+            }
+
+            self._background_update_progress_txn(
+                txn, self.EVENT_FIELDS_CHUNK, progress
+            )
+
+            return len(rows)
+
+        result = yield self.runInteraction(
+            self.EVENT_FIELDS_CHUNK, reindex_chunks_txn
+        )
+
+        if not result:
+            yield self._end_background_update(self.EVENT_FIELDS_CHUNK)
+
+        defer.returnValue(result)
+
     def get_current_backfill_token(self):
         """The current minimum token that backfilled events have reached"""
         return -self._backfill_id_gen.get_current_token()
@@ -15,6 +15,8 @@
 
 ALTER TABLE events ADD COLUMN chunk_id BIGINT;
 
 -- FIXME: Add index on contains_url
 
+INSERT INTO background_updates (update_name, progress_json) VALUES
+    ('events_chunk_index', '{}');
 
@@ -80,3 +82,7 @@ INSERT INTO chunk_linearized (chunk_id, room_id, ordering)
 SELECT chunk_id, room_id, stream_ordering
 FROM event_forward_extremities
 INNER JOIN events USING (room_id, event_id);
+
+
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('event_fields_chunk_id', '{}');
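The update_name seeded here must match EventsStore.EVENT_FIELDS_CHUNK ("event_fields_chunk_id") exactly, since the updater looks the registered handler up by that key. The empty progress_json is what makes the handler's first run start from the current stream position; a minimal sketch of that first-run behaviour (plain json, no Synapse APIs):

import json

progress = json.loads('{}')  # the seeded progress_json
assert progress.get("up_to_stream_id") is None
# ...so _background_compute_chunks falls back to
# get_current_events_token() + 1 on its first invocation.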