Correctly handle total/remaining counts in the presence of sent_transasctions table

This commit is contained in:
Erik Johnston 2015-04-28 11:16:44 +01:00
parent ce8b0b2868
commit 4a13ae7201
1 changed files with 168 additions and 124 deletions

View File

@ -125,6 +125,12 @@ class Store(object):
def execute(self, f, *args, **kwargs): def execute(self, f, *args, **kwargs):
return self.runInteraction(f.__name__, f, *args, **kwargs) return self.runInteraction(f.__name__, f, *args, **kwargs)
def execute_sql(self, sql, *args):
def r(txn):
txn.execute(sql, args)
return txn.fetchall()
return self.runInteraction("execute_sql", r)
def insert_many_txn(self, txn, table, headers, rows): def insert_many_txn(self, txn, table, headers, rows):
sql = "INSERT INTO %s (%s) VALUES (%s)" % ( sql = "INSERT INTO %s (%s) VALUES (%s)" % (
table, table,
@ -146,119 +152,9 @@ class Porter(object):
def __init__(self, **kwargs): def __init__(self, **kwargs):
self.__dict__.update(kwargs) self.__dict__.update(kwargs)
def convert_rows(self, table, headers, rows):
bool_col_names = BOOLEAN_COLUMNS.get(table, [])
bool_cols = [
i for i, h in enumerate(headers) if h in bool_col_names
]
def conv(j, col):
if j in bool_cols:
return bool(col)
return col
for i, row in enumerate(rows):
rows[i] = tuple(
self.postgres_store.database_engine.encode_parameter(
conv(j, col)
)
for j, col in enumerate(row)
if j > 0
)
@defer.inlineCallbacks @defer.inlineCallbacks
def setup_table(self, table): def setup_table(self, table):
def delete_all(txn): if table in APPEND_ONLY_TABLES:
txn.execute(
"DELETE FROM port_from_sqlite3 WHERE table_name = %s",
(table,)
)
txn.execute("TRUNCATE %s CASCADE" % (table,))
def get_table_size(txn):
txn.execute("SELECT count(*) FROM %s" % (table,))
size, = txn.fetchone()
return int(size)
if table == "sent_transactions":
# This is a big table, and we really only need some of the recent
# data
yield self.postgres_store.execute(delete_all)
# Only save things from the last day
yesterday = int(time.time()*1000) - 86400000
# And save the max transaction id from each destination
select = (
"SELECT rowid, * FROM sent_transactions WHERE rowid IN ("
"SELECT max(rowid) FROM sent_transactions"
" GROUP BY destination"
")"
)
def r(txn):
txn.execute(select)
rows = txn.fetchall()
headers = [column[0] for column in txn.description]
ts_ind = headers.index('ts')
return headers, [r for r in rows if r[ts_ind] < yesterday]
headers, rows = yield self.sqlite_store.runInteraction(
"select", r,
)
self.convert_rows(table, headers, rows)
inserted_rows = len(rows)
max_inserted_rowid = max(r[0] for r in rows)
def insert(txn):
self.postgres_store.insert_many_txn(
txn, table, headers[1:], rows
)
yield self.postgres_store.execute(insert)
def get_start_id(txn):
txn.execute(
"SELECT rowid FROM sent_transactions WHERE ts >= ?"
" ORDER BY rowid ASC LIMIT 1",
(yesterday,)
)
rows = txn.fetchall()
if rows:
return rows[0][0]
else:
return 1
next_chunk = yield self.sqlite_store.execute(get_start_id)
next_chunk = max(max_inserted_rowid + 1, next_chunk)
yield self.postgres_store._simple_insert(
table="port_from_sqlite3",
values={"table_name": table, "rowid": next_chunk}
)
def get_sent_table_size(txn):
txn.execute(
"SELECT count(*) FROM sent_transactions"
" WHERE ts >= ?",
(yesterday,)
)
size, = txn.fetchone()
return int(size)
table_size = yield self.sqlite_store.execute(
get_sent_table_size
)
table_size += inserted_rows
elif table in APPEND_ONLY_TABLES:
# It's safe to just carry on inserting. # It's safe to just carry on inserting.
next_chunk = yield self.postgres_store._simple_select_one_onecol( next_chunk = yield self.postgres_store._simple_select_one_onecol(
table="port_from_sqlite3", table="port_from_sqlite3",
@ -267,28 +163,47 @@ class Porter(object):
allow_none=True, allow_none=True,
) )
total_to_port = None
if next_chunk is None: if next_chunk is None:
if table == "sent_transactions":
next_chunk, already_ported, total_to_port = (
yield self._setup_sent_transactions()
)
else:
yield self.postgres_store._simple_insert( yield self.postgres_store._simple_insert(
table="port_from_sqlite3", table="port_from_sqlite3",
values={"table_name": table, "rowid": 1} values={"table_name": table, "rowid": 1}
) )
next_chunk = 1 next_chunk = 1
already_ported = 0
table_size = yield self.sqlite_store.execute(get_table_size) if total_to_port is None:
already_ported, total_to_port = yield self._get_total_count_to_port(
table, next_chunk
)
else: else:
def delete_all(txn):
txn.execute(
"DELETE FROM port_from_sqlite3 WHERE table_name = %s",
(table,)
)
txn.execute("TRUNCATE %s CASCADE" % (table,))
yield self.postgres_store.execute(delete_all) yield self.postgres_store.execute(delete_all)
self.postgres_store._simple_insert(
yield self.postgres_store._simple_insert(
table="port_from_sqlite3", table="port_from_sqlite3",
values={"table_name": table, "rowid": 0} values={"table_name": table, "rowid": 0}
) )
table_size = yield self.sqlite_store.execute(get_table_size)
next_chunk = 1 next_chunk = 1
postgres_size = yield self.postgres_store.execute(get_table_size) already_ported, total_to_port = yield self._get_total_count_to_port(
table, next_chunk
)
defer.returnValue((table, postgres_size, table_size, next_chunk)) defer.returnValue((table, already_ported, total_to_port, next_chunk))
@defer.inlineCallbacks @defer.inlineCallbacks
def handle_table(self, table, postgres_size, table_size, next_chunk): def handle_table(self, table, postgres_size, table_size, next_chunk):
@ -315,7 +230,7 @@ class Porter(object):
if rows: if rows:
next_chunk = rows[-1][0] + 1 next_chunk = rows[-1][0] + 1
self.convert_rows(table, headers, rows) self._convert_rows(table, headers, rows)
def insert(txn): def insert(txn):
self.postgres_store.insert_many_txn( self.postgres_store.insert_many_txn(
@ -414,7 +329,7 @@ class Porter(object):
except Exception as e: except Exception as e:
logger.info("Failed to create port table: %s", e) logger.info("Failed to create port table: %s", e)
self.progress.set_state("Preparing tables") self.progress.set_state("Setting up")
# Set up tables. # Set up tables.
setup_res = yield defer.gatherResults( setup_res = yield defer.gatherResults(
@ -444,6 +359,135 @@ class Porter(object):
finally: finally:
reactor.stop() reactor.stop()
def _convert_rows(self, table, headers, rows):
bool_col_names = BOOLEAN_COLUMNS.get(table, [])
bool_cols = [
i for i, h in enumerate(headers) if h in bool_col_names
]
def conv(j, col):
if j in bool_cols:
return bool(col)
return col
for i, row in enumerate(rows):
rows[i] = tuple(
self.postgres_store.database_engine.encode_parameter(
conv(j, col)
)
for j, col in enumerate(row)
if j > 0
)
@defer.inlineCallbacks
def _setup_sent_transactions(self):
# Only save things from the last day
yesterday = int(time.time()*1000) - 86400000
# And save the max transaction id from each destination
select = (
"SELECT rowid, * FROM sent_transactions WHERE rowid IN ("
"SELECT max(rowid) FROM sent_transactions"
" GROUP BY destination"
")"
)
def r(txn):
txn.execute(select)
rows = txn.fetchall()
headers = [column[0] for column in txn.description]
ts_ind = headers.index('ts')
return headers, [r for r in rows if r[ts_ind] < yesterday]
headers, rows = yield self.sqlite_store.runInteraction(
"select", r,
)
self._convert_rows("sent_transactions", headers, rows)
inserted_rows = len(rows)
max_inserted_rowid = max(r[0] for r in rows)
def insert(txn):
self.postgres_store.insert_many_txn(
txn, "sent_transactions", headers[1:], rows
)
yield self.postgres_store.execute(insert)
def get_start_id(txn):
txn.execute(
"SELECT rowid FROM sent_transactions WHERE ts >= ?"
" ORDER BY rowid ASC LIMIT 1",
(yesterday,)
)
rows = txn.fetchall()
if rows:
return rows[0][0]
else:
return 1
next_chunk = yield self.sqlite_store.execute(get_start_id)
next_chunk = max(max_inserted_rowid + 1, next_chunk)
yield self.postgres_store._simple_insert(
table="port_from_sqlite3",
values={"table_name": "sent_transactions", "rowid": next_chunk}
)
def get_sent_table_size(txn):
txn.execute(
"SELECT count(*) FROM sent_transactions"
" WHERE ts >= ?",
(yesterday,)
)
size, = txn.fetchone()
return int(size)
remaining_count = yield self.sqlite_store.execute(
get_sent_table_size
)
total_count = remaining_count + inserted_rows
defer.returnValue((next_chunk, remaining_count, total_count))
@defer.inlineCallbacks
def _get_remaining_count_to_port(self, table, next_chunk):
rows = yield self.sqlite_store.execute_sql(
"SELECT count(*) FROM %s WHERE rowid >= ?" % (table,),
next_chunk,
)
defer.returnValue(rows[0][0])
@defer.inlineCallbacks
def _get_already_ported_count(self, table):
rows = yield self.postgres_store.execute_sql(
"SELECT count(*) FROM %s" % (table,),
)
defer.returnValue(rows[0][0])
@defer.inlineCallbacks
def _get_total_count_to_port(self, table, next_chunk):
remaining, done = yield defer.gatherResults(
[
self._get_remaining_count_to_port(table, next_chunk),
self._get_already_ported_count(table),
],
consumeErrors=True,
)
remaining = int(remaining) if remaining else 0
done = int(done) if done else 0
defer.returnValue((done, remaining + done))
############################################## ##############################################
###### The following is simply UI stuff ###### ###### The following is simply UI stuff ######