Merge pull request #982 from matrix-org/erikj/fix_port_script
Port script: Handle the fact that some tables have negative rowid rows
This commit is contained in:
commit
ea0b767114
|
@ -34,7 +34,7 @@ logger = logging.getLogger("synapse_port_db")
|
|||
|
||||
|
||||
BOOLEAN_COLUMNS = {
|
||||
"events": ["processed", "outlier"],
|
||||
"events": ["processed", "outlier", "contains_url"],
|
||||
"rooms": ["is_public"],
|
||||
"event_edges": ["is_state"],
|
||||
"presence_list": ["accepted"],
|
||||
|
@ -92,8 +92,12 @@ class Store(object):
|
|||
|
||||
_simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"]
|
||||
_simple_select_onecol = SQLBaseStore.__dict__["_simple_select_onecol"]
|
||||
_simple_select_one = SQLBaseStore.__dict__["_simple_select_one"]
|
||||
_simple_select_one_txn = SQLBaseStore.__dict__["_simple_select_one_txn"]
|
||||
_simple_select_one_onecol = SQLBaseStore.__dict__["_simple_select_one_onecol"]
|
||||
_simple_select_one_onecol_txn = SQLBaseStore.__dict__["_simple_select_one_onecol_txn"]
|
||||
_simple_select_one_onecol_txn = SQLBaseStore.__dict__[
|
||||
"_simple_select_one_onecol_txn"
|
||||
]
|
||||
|
||||
_simple_update_one = SQLBaseStore.__dict__["_simple_update_one"]
|
||||
_simple_update_one_txn = SQLBaseStore.__dict__["_simple_update_one_txn"]
|
||||
|
@ -158,31 +162,40 @@ class Porter(object):
|
|||
def setup_table(self, table):
|
||||
if table in APPEND_ONLY_TABLES:
|
||||
# It's safe to just carry on inserting.
|
||||
next_chunk = yield self.postgres_store._simple_select_one_onecol(
|
||||
row = yield self.postgres_store._simple_select_one(
|
||||
table="port_from_sqlite3",
|
||||
keyvalues={"table_name": table},
|
||||
retcol="rowid",
|
||||
retcols=("forward_rowid", "backward_rowid"),
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
total_to_port = None
|
||||
if next_chunk is None:
|
||||
if row is None:
|
||||
if table == "sent_transactions":
|
||||
next_chunk, already_ported, total_to_port = (
|
||||
forward_chunk, already_ported, total_to_port = (
|
||||
yield self._setup_sent_transactions()
|
||||
)
|
||||
backward_chunk = 0
|
||||
else:
|
||||
yield self.postgres_store._simple_insert(
|
||||
table="port_from_sqlite3",
|
||||
values={"table_name": table, "rowid": 1}
|
||||
values={
|
||||
"table_name": table,
|
||||
"forward_rowid": 1,
|
||||
"backward_rowid": 0,
|
||||
}
|
||||
)
|
||||
|
||||
next_chunk = 1
|
||||
forward_chunk = 1
|
||||
backward_chunk = 0
|
||||
already_ported = 0
|
||||
else:
|
||||
forward_chunk = row["forward_rowid"]
|
||||
backward_chunk = row["backward_rowid"]
|
||||
|
||||
if total_to_port is None:
|
||||
already_ported, total_to_port = yield self._get_total_count_to_port(
|
||||
table, next_chunk
|
||||
table, forward_chunk, backward_chunk
|
||||
)
|
||||
else:
|
||||
def delete_all(txn):
|
||||
|
@ -196,46 +209,85 @@ class Porter(object):
|
|||
|
||||
yield self.postgres_store._simple_insert(
|
||||
table="port_from_sqlite3",
|
||||
values={"table_name": table, "rowid": 0}
|
||||
values={
|
||||
"table_name": table,
|
||||
"forward_rowid": 1,
|
||||
"backward_rowid": 0,
|
||||
}
|
||||
)
|
||||
|
||||
next_chunk = 1
|
||||
forward_chunk = 1
|
||||
backward_chunk = 0
|
||||
|
||||
already_ported, total_to_port = yield self._get_total_count_to_port(
|
||||
table, next_chunk
|
||||
table, forward_chunk, backward_chunk
|
||||
)
|
||||
|
||||
defer.returnValue((table, already_ported, total_to_port, next_chunk))
|
||||
defer.returnValue(
|
||||
(table, already_ported, total_to_port, forward_chunk, backward_chunk)
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def handle_table(self, table, postgres_size, table_size, next_chunk):
|
||||
def handle_table(self, table, postgres_size, table_size, forward_chunk,
|
||||
backward_chunk):
|
||||
if not table_size:
|
||||
return
|
||||
|
||||
self.progress.add_table(table, postgres_size, table_size)
|
||||
|
||||
if table == "event_search":
|
||||
yield self.handle_search_table(postgres_size, table_size, next_chunk)
|
||||
yield self.handle_search_table(
|
||||
postgres_size, table_size, forward_chunk, backward_chunk
|
||||
)
|
||||
return
|
||||
|
||||
select = (
|
||||
forward_select = (
|
||||
"SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?"
|
||||
% (table,)
|
||||
)
|
||||
|
||||
backward_select = (
|
||||
"SELECT rowid, * FROM %s WHERE rowid <= ? ORDER BY rowid LIMIT ?"
|
||||
% (table,)
|
||||
)
|
||||
|
||||
do_forward = [True]
|
||||
do_backward = [True]
|
||||
|
||||
while True:
|
||||
def r(txn):
|
||||
txn.execute(select, (next_chunk, self.batch_size,))
|
||||
rows = txn.fetchall()
|
||||
forward_rows = []
|
||||
backward_rows = []
|
||||
if do_forward[0]:
|
||||
txn.execute(forward_select, (forward_chunk, self.batch_size,))
|
||||
forward_rows = txn.fetchall()
|
||||
if not forward_rows:
|
||||
do_forward[0] = False
|
||||
|
||||
if do_backward[0]:
|
||||
txn.execute(backward_select, (backward_chunk, self.batch_size,))
|
||||
backward_rows = txn.fetchall()
|
||||
if not backward_rows:
|
||||
do_backward[0] = False
|
||||
|
||||
if forward_rows or backward_rows:
|
||||
headers = [column[0] for column in txn.description]
|
||||
else:
|
||||
headers = None
|
||||
|
||||
return headers, rows
|
||||
return headers, forward_rows, backward_rows
|
||||
|
||||
headers, rows = yield self.sqlite_store.runInteraction("select", r)
|
||||
headers, frows, brows = yield self.sqlite_store.runInteraction(
|
||||
"select", r
|
||||
)
|
||||
|
||||
if rows:
|
||||
next_chunk = rows[-1][0] + 1
|
||||
if frows or brows:
|
||||
if frows:
|
||||
forward_chunk = max(row[0] for row in frows) + 1
|
||||
if brows:
|
||||
backward_chunk = min(row[0] for row in brows) - 1
|
||||
|
||||
rows = frows + brows
|
||||
self._convert_rows(table, headers, rows)
|
||||
|
||||
def insert(txn):
|
||||
|
@ -247,7 +299,10 @@ class Porter(object):
|
|||
txn,
|
||||
table="port_from_sqlite3",
|
||||
keyvalues={"table_name": table},
|
||||
updatevalues={"rowid": next_chunk},
|
||||
updatevalues={
|
||||
"forward_rowid": forward_chunk,
|
||||
"backward_rowid": backward_chunk,
|
||||
},
|
||||
)
|
||||
|
||||
yield self.postgres_store.execute(insert)
|
||||
|
@ -259,7 +314,8 @@ class Porter(object):
|
|||
return
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def handle_search_table(self, postgres_size, table_size, next_chunk):
|
||||
def handle_search_table(self, postgres_size, table_size, forward_chunk,
|
||||
backward_chunk):
|
||||
select = (
|
||||
"SELECT es.rowid, es.*, e.origin_server_ts, e.stream_ordering"
|
||||
" FROM event_search as es"
|
||||
|
@ -270,7 +326,7 @@ class Porter(object):
|
|||
|
||||
while True:
|
||||
def r(txn):
|
||||
txn.execute(select, (next_chunk, self.batch_size,))
|
||||
txn.execute(select, (forward_chunk, self.batch_size,))
|
||||
rows = txn.fetchall()
|
||||
headers = [column[0] for column in txn.description]
|
||||
|
||||
|
@ -279,7 +335,7 @@ class Porter(object):
|
|||
headers, rows = yield self.sqlite_store.runInteraction("select", r)
|
||||
|
||||
if rows:
|
||||
next_chunk = rows[-1][0] + 1
|
||||
forward_chunk = rows[-1][0] + 1
|
||||
|
||||
# We have to treat event_search differently since it has a
|
||||
# different structure in the two different databases.
|
||||
|
@ -312,7 +368,10 @@ class Porter(object):
|
|||
txn,
|
||||
table="port_from_sqlite3",
|
||||
keyvalues={"table_name": "event_search"},
|
||||
updatevalues={"rowid": next_chunk},
|
||||
updatevalues={
|
||||
"forward_rowid": forward_chunk,
|
||||
"backward_rowid": backward_chunk,
|
||||
},
|
||||
)
|
||||
|
||||
yield self.postgres_store.execute(insert)
|
||||
|
@ -324,7 +383,6 @@ class Porter(object):
|
|||
else:
|
||||
return
|
||||
|
||||
|
||||
def setup_db(self, db_config, database_engine):
|
||||
db_conn = database_engine.module.connect(
|
||||
**{
|
||||
|
@ -395,10 +453,32 @@ class Porter(object):
|
|||
txn.execute(
|
||||
"CREATE TABLE port_from_sqlite3 ("
|
||||
" table_name varchar(100) NOT NULL UNIQUE,"
|
||||
" rowid bigint NOT NULL"
|
||||
" forward_rowid bigint NOT NULL,"
|
||||
" backward_rowid bigint NOT NULL"
|
||||
")"
|
||||
)
|
||||
|
||||
# The old port script created a table with just a "rowid" column.
|
||||
# We want people to be able to rerun this script from an old port
|
||||
# so that they can pick up any missing events that were not
|
||||
# ported across.
|
||||
def alter_table(txn):
|
||||
txn.execute(
|
||||
"ALTER TABLE IF EXISTS port_from_sqlite3"
|
||||
" RENAME rowid TO forward_rowid"
|
||||
)
|
||||
txn.execute(
|
||||
"ALTER TABLE IF EXISTS port_from_sqlite3"
|
||||
" ADD backward_rowid bigint NOT NULL DEFAULT 0"
|
||||
)
|
||||
|
||||
try:
|
||||
yield self.postgres_store.runInteraction(
|
||||
"alter_table", alter_table
|
||||
)
|
||||
except Exception as e:
|
||||
logger.info("Failed to create port table: %s", e)
|
||||
|
||||
try:
|
||||
yield self.postgres_store.runInteraction(
|
||||
"create_port_table", create_port_table
|
||||
|
@ -514,7 +594,11 @@ class Porter(object):
|
|||
|
||||
yield self.postgres_store._simple_insert(
|
||||
table="port_from_sqlite3",
|
||||
values={"table_name": "sent_transactions", "rowid": next_chunk}
|
||||
values={
|
||||
"table_name": "sent_transactions",
|
||||
"forward_rowid": next_chunk,
|
||||
"backward_rowid": 0,
|
||||
}
|
||||
)
|
||||
|
||||
def get_sent_table_size(txn):
|
||||
|
@ -535,13 +619,18 @@ class Porter(object):
|
|||
defer.returnValue((next_chunk, inserted_rows, total_count))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_remaining_count_to_port(self, table, next_chunk):
|
||||
rows = yield self.sqlite_store.execute_sql(
|
||||
def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk):
|
||||
frows = yield self.sqlite_store.execute_sql(
|
||||
"SELECT count(*) FROM %s WHERE rowid >= ?" % (table,),
|
||||
next_chunk,
|
||||
forward_chunk,
|
||||
)
|
||||
|
||||
defer.returnValue(rows[0][0])
|
||||
brows = yield self.sqlite_store.execute_sql(
|
||||
"SELECT count(*) FROM %s WHERE rowid <= ?" % (table,),
|
||||
backward_chunk,
|
||||
)
|
||||
|
||||
defer.returnValue(frows[0][0] + brows[0][0])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_already_ported_count(self, table):
|
||||
|
@ -552,10 +641,10 @@ class Porter(object):
|
|||
defer.returnValue(rows[0][0])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_total_count_to_port(self, table, next_chunk):
|
||||
def _get_total_count_to_port(self, table, forward_chunk, backward_chunk):
|
||||
remaining, done = yield defer.gatherResults(
|
||||
[
|
||||
self._get_remaining_count_to_port(table, next_chunk),
|
||||
self._get_remaining_count_to_port(table, forward_chunk, backward_chunk),
|
||||
self._get_already_ported_count(table),
|
||||
],
|
||||
consumeErrors=True,
|
||||
|
|
Loading…
Reference in New Issue