Don't port over all of the sent_transactions table
This commit is contained in:
parent
42b7139dec
commit
4e49f52375
|
@ -122,8 +122,8 @@ class Store(object):
|
||||||
|
|
||||||
return self.db_pool.runWithConnection(r)
|
return self.db_pool.runWithConnection(r)
|
||||||
|
|
||||||
def execute(self, f):
|
def execute(self, f, *args, **kwargs):
|
||||||
return self.runInteraction(f.__name__, f)
|
return self.runInteraction(f.__name__, f, *args, **kwargs)
|
||||||
|
|
||||||
def insert_many_txn(self, txn, table, headers, rows):
|
def insert_many_txn(self, txn, table, headers, rows):
|
||||||
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
|
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
|
||||||
|
@ -347,9 +347,118 @@ class Porter(object):
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
self.__dict__.update(kwargs)
|
self.__dict__.update(kwargs)
|
||||||
|
|
||||||
|
def convert_rows(self, table, headers, rows):
|
||||||
|
bool_col_names = BOOLEAN_COLUMNS.get(table, [])
|
||||||
|
|
||||||
|
bool_cols = [
|
||||||
|
i for i, h in enumerate(headers) if h in bool_col_names
|
||||||
|
]
|
||||||
|
|
||||||
|
def conv(j, col):
|
||||||
|
if j in bool_cols:
|
||||||
|
return bool(col)
|
||||||
|
return col
|
||||||
|
|
||||||
|
for i, row in enumerate(rows):
|
||||||
|
rows[i] = tuple(
|
||||||
|
self.postgres_store.database_engine.encode_parameter(
|
||||||
|
conv(j, col)
|
||||||
|
)
|
||||||
|
for j, col in enumerate(row)
|
||||||
|
if j > 0
|
||||||
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def handle_table(self, table):
|
def handle_table(self, table):
|
||||||
if table in APPEND_ONLY_TABLES:
|
def delete_all(txn):
|
||||||
|
txn.execute(
|
||||||
|
"DELETE FROM port_from_sqlite3 WHERE table_name = %s",
|
||||||
|
(table,)
|
||||||
|
)
|
||||||
|
txn.execute("TRUNCATE %s CASCADE" % (table,))
|
||||||
|
|
||||||
|
def get_table_size(txn):
|
||||||
|
txn.execute("SELECT count(*) FROM %s" % (table,))
|
||||||
|
size, = txn.fetchone()
|
||||||
|
return int(size)
|
||||||
|
|
||||||
|
if table == "sent_transactions":
|
||||||
|
# This is a big table, and we really only need some of the recent
|
||||||
|
# data
|
||||||
|
yield self.postgres_store.execute(delete_all)
|
||||||
|
|
||||||
|
# Only save things from the last day
|
||||||
|
yesterday = 1429114568820 #int(time.time()*1000) - 86400000
|
||||||
|
|
||||||
|
# And save the max transaction id from each destination
|
||||||
|
select = (
|
||||||
|
"SELECT rowid, * FROM sent_transactions WHERE rowid IN ("
|
||||||
|
"SELECT max(rowid) FROM sent_transactions"
|
||||||
|
" GROUP BY destination"
|
||||||
|
")"
|
||||||
|
)
|
||||||
|
|
||||||
|
def r(txn):
|
||||||
|
txn.execute(select)
|
||||||
|
rows = txn.fetchall()
|
||||||
|
headers = [column[0] for column in txn.description]
|
||||||
|
|
||||||
|
ts_ind = headers.index('ts')
|
||||||
|
|
||||||
|
return headers, [r for r in rows if r[ts_ind] < yesterday]
|
||||||
|
|
||||||
|
headers, rows = yield self.sqlite_store.runInteraction(
|
||||||
|
"select", r,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.convert_rows(table, headers, rows)
|
||||||
|
|
||||||
|
inserted_rows = len(rows)
|
||||||
|
max_inserted_rowid = max(r[0] for r in rows)
|
||||||
|
|
||||||
|
def insert(txn):
|
||||||
|
self.postgres_store.insert_many_txn(
|
||||||
|
txn, table, headers[1:], rows
|
||||||
|
)
|
||||||
|
|
||||||
|
yield self.postgres_store.execute(insert)
|
||||||
|
|
||||||
|
def get_start_id(txn):
|
||||||
|
txn.execute(
|
||||||
|
"SELECT rowid FROM sent_transactions WHERE ts >= ?"
|
||||||
|
" ORDER BY rowid ASC LIMIT 1",
|
||||||
|
(yesterday,)
|
||||||
|
)
|
||||||
|
|
||||||
|
rows = txn.fetchall()
|
||||||
|
if rows:
|
||||||
|
return rows[0][0]
|
||||||
|
else:
|
||||||
|
return 1
|
||||||
|
|
||||||
|
next_chunk = yield self.sqlite_store.execute(get_start_id)
|
||||||
|
next_chunk = max(max_inserted_rowid + 1, next_chunk)
|
||||||
|
|
||||||
|
yield self.postgres_store._simple_insert(
|
||||||
|
table="port_from_sqlite3",
|
||||||
|
values={"table_name": table, "rowid": next_chunk}
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_sent_table_size(txn):
|
||||||
|
txn.execute(
|
||||||
|
"SELECT count(*) FROM sent_transactions"
|
||||||
|
" WHERE ts >= ?",
|
||||||
|
(yesterday,)
|
||||||
|
)
|
||||||
|
size, = txn.fetchone()
|
||||||
|
return int(size)
|
||||||
|
|
||||||
|
table_size = yield self.sqlite_store.execute(
|
||||||
|
get_sent_table_size
|
||||||
|
)
|
||||||
|
|
||||||
|
table_size += inserted_rows
|
||||||
|
elif table in APPEND_ONLY_TABLES:
|
||||||
# It's safe to just carry on inserting.
|
# It's safe to just carry on inserting.
|
||||||
next_chunk = yield self.postgres_store._simple_select_one_onecol(
|
next_chunk = yield self.postgres_store._simple_select_one_onecol(
|
||||||
table="port_from_sqlite3",
|
table="port_from_sqlite3",
|
||||||
|
@ -365,28 +474,18 @@ class Porter(object):
|
||||||
)
|
)
|
||||||
|
|
||||||
next_chunk = 1
|
next_chunk = 1
|
||||||
|
|
||||||
|
table_size = yield self.sqlite_store.execute(get_table_size)
|
||||||
else:
|
else:
|
||||||
def delete_all(txn):
|
yield self.postgres_store.execute(delete_all)
|
||||||
txn.execute(
|
self.postgres_store._simple_insert(
|
||||||
"DELETE FROM port_from_sqlite3 WHERE table_name = %s",
|
|
||||||
(table,)
|
|
||||||
)
|
|
||||||
txn.execute("TRUNCATE %s CASCADE" % (table,))
|
|
||||||
self.postgres_store._simple_insert_txn(
|
|
||||||
txn,
|
|
||||||
table="port_from_sqlite3",
|
table="port_from_sqlite3",
|
||||||
values={"table_name": table, "rowid": 0}
|
values={"table_name": table, "rowid": 0}
|
||||||
)
|
)
|
||||||
yield self.postgres_store.execute(delete_all)
|
|
||||||
|
|
||||||
next_chunk = 1
|
|
||||||
|
|
||||||
def get_table_size(txn):
|
|
||||||
txn.execute("SELECT count(*) FROM %s" % (table,))
|
|
||||||
size, = txn.fetchone()
|
|
||||||
return int(size)
|
|
||||||
|
|
||||||
table_size = yield self.sqlite_store.execute(get_table_size)
|
table_size = yield self.sqlite_store.execute(get_table_size)
|
||||||
|
next_chunk = 1
|
||||||
|
|
||||||
postgres_size = yield self.postgres_store.execute(get_table_size)
|
postgres_size = yield self.postgres_store.execute(get_table_size)
|
||||||
|
|
||||||
if not table_size:
|
if not table_size:
|
||||||
|
@ -399,8 +498,6 @@ class Porter(object):
|
||||||
% (table,)
|
% (table,)
|
||||||
)
|
)
|
||||||
|
|
||||||
bool_col_names = BOOLEAN_COLUMNS.get(table, [])
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
def r(txn):
|
def r(txn):
|
||||||
txn.execute(select, (next_chunk, self.batch_size,))
|
txn.execute(select, (next_chunk, self.batch_size,))
|
||||||
|
@ -412,24 +509,9 @@ class Porter(object):
|
||||||
headers, rows = yield self.sqlite_store.runInteraction("select", r)
|
headers, rows = yield self.sqlite_store.runInteraction("select", r)
|
||||||
|
|
||||||
if rows:
|
if rows:
|
||||||
bool_cols = [
|
|
||||||
i for i, h in enumerate(headers) if h in bool_col_names
|
|
||||||
]
|
|
||||||
next_chunk = rows[-1][0] + 1
|
next_chunk = rows[-1][0] + 1
|
||||||
|
|
||||||
def conv(j, col):
|
self.convert_rows(table, headers, rows)
|
||||||
if j in bool_cols:
|
|
||||||
return bool(col)
|
|
||||||
return col
|
|
||||||
|
|
||||||
for i, row in enumerate(rows):
|
|
||||||
rows[i] = tuple(
|
|
||||||
self.postgres_store.database_engine.encode_parameter(
|
|
||||||
conv(j, col)
|
|
||||||
)
|
|
||||||
for j, col in enumerate(row)
|
|
||||||
if j > 0
|
|
||||||
)
|
|
||||||
|
|
||||||
def insert(txn):
|
def insert(txn):
|
||||||
self.postgres_store.insert_many_txn(
|
self.postgres_store.insert_many_txn(
|
||||||
|
|
Loading…
Reference in New Issue