Use special UPDATE syntax
This commit is contained in:
parent
b57dcb4b51
commit
8fae3d7b1e
|
@ -22,8 +22,8 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
ALTER_TABLE = """
|
ALTER_TABLE = """
|
||||||
ALTER TABLE event_search ADD COLUMN origin_server_ts BIGINT;
|
ALTER TABLE event_search ADD COLUMN origin_server_ts BIGINT DEFAULT 0;
|
||||||
ALTER TABLE event_search ADD COLUMN stream_ordering BIGINT;
|
ALTER TABLE event_search ADD COLUMN stream_ordering BIGINT DEFAULT 0;
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -169,33 +169,25 @@ class SearchStore(BackgroundUpdateStore):
|
||||||
self.EVENT_SEARCH_ORDER_UPDATE_NAME, progress,
|
self.EVENT_SEARCH_ORDER_UPDATE_NAME, progress,
|
||||||
)
|
)
|
||||||
|
|
||||||
INSERT_CLUMP_SIZE = 1000
|
|
||||||
|
|
||||||
def reindex_search_txn(txn):
|
def reindex_search_txn(txn):
|
||||||
sql = (
|
events_sql = (
|
||||||
"SELECT e.stream_ordering, e.origin_server_ts, event_id FROM events as e"
|
"SELECT stream_ordering, origin_server_ts, event_id FROM events"
|
||||||
" INNER JOIN event_search USING (room_id, event_id)"
|
" WHERE ? <= stream_ordering AND stream_ordering < ?"
|
||||||
" WHERE ? <= e.stream_ordering AND e.stream_ordering < ?"
|
" ORDER BY stream_ordering DESC"
|
||||||
" ORDER BY e.stream_ordering DESC"
|
|
||||||
" LIMIT ?"
|
" LIMIT ?"
|
||||||
)
|
)
|
||||||
|
|
||||||
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
|
|
||||||
|
|
||||||
rows = txn.fetchall()
|
|
||||||
if not rows:
|
|
||||||
return 0
|
|
||||||
|
|
||||||
min_stream_id = rows[-1][0]
|
|
||||||
|
|
||||||
sql = (
|
sql = (
|
||||||
"UPDATE event_search SET stream_ordering = ?, origin_server_ts = ?"
|
"UPDATE event_search AS es SET es.stream_ordering = e.stream_ordering,"
|
||||||
" WHERE event_id = ?"
|
" es.origin_server_ts = e.origin_server_ts"
|
||||||
)
|
" FROM (%s) AS e"
|
||||||
|
" WHERE e.event_id = es.event_id"
|
||||||
|
" RETURNING es.stream_ordering"
|
||||||
|
) % (events_sql,)
|
||||||
|
|
||||||
for index in range(0, len(rows), INSERT_CLUMP_SIZE):
|
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
|
||||||
clump = rows[index:index + INSERT_CLUMP_SIZE]
|
rows = txn.fetchall()
|
||||||
txn.executemany(sql, clump)
|
min_stream_id = rows[-1][0]
|
||||||
|
|
||||||
progress = {
|
progress = {
|
||||||
"target_min_stream_id_inclusive": target_min_stream_id,
|
"target_min_stream_id_inclusive": target_min_stream_id,
|
||||||
|
|
Loading…
Reference in New Issue