Make deleting push actions more efficient
There's no index on received_ts, so find the one-month-ago cutoff by manually binary searching over the indexed stream_ordering column, and only recompute that cutoff once an hour.
This commit is contained in:
parent
149fa411e2
commit
d4503e25ed
|
@ -88,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
|
|
||||||
def __init__(self, db_conn, hs):
|
def __init__(self, db_conn, hs):
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
|
self._clock = hs.get_clock()
|
||||||
self.database_engine = hs.database_engine
|
self.database_engine = hs.database_engine
|
||||||
|
|
||||||
self.client_ip_last_seen = Cache(
|
self.client_ip_last_seen = Cache(
|
||||||
|
@ -173,6 +174,14 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
prefilled_cache=push_rules_prefill,
|
prefilled_cache=push_rules_prefill,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
cur = db_conn.cursor()
|
||||||
|
self._find_stream_orderings_for_times_txn(cur)
|
||||||
|
cur.close()
|
||||||
|
|
||||||
|
self.find_stream_orderings_looping_call = self._clock.looping_call(
|
||||||
|
self._find_stream_orderings_for_times, 60 * 60 * 1000
|
||||||
|
)
|
||||||
|
|
||||||
super(DataStore, self).__init__(hs)
|
super(DataStore, self).__init__(hs)
|
||||||
|
|
||||||
def take_presence_startup_info(self):
|
def take_presence_startup_info(self):
|
||||||
|
|
|
@ -153,7 +153,6 @@ class SQLBaseStore(object):
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
self._db_pool = hs.get_db_pool()
|
self._db_pool = hs.get_db_pool()
|
||||||
self._clock = hs.get_clock()
|
|
||||||
|
|
||||||
self._previous_txn_total_time = 0
|
self._previous_txn_total_time = 0
|
||||||
self._current_txn_total_time = 0
|
self._current_txn_total_time = 0
|
||||||
|
|
|
@ -22,10 +22,12 @@ import ujson as json
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
KEEP_PUSH_ACTIONS_FOR_MS = 30 * 24 * 60 * 60 * 1000
|
|
||||||
|
|
||||||
|
|
||||||
class EventPushActionsStore(SQLBaseStore):
|
class EventPushActionsStore(SQLBaseStore):
|
||||||
|
def __init__(self, hs):
    """Set up the push actions store.

    Args:
        hs: the homeserver object, passed straight through to the parent
            constructor.
    """
    # Stream ordering of the first event received after "one month ago",
    # used as the cutoff when deleting old push actions.
    #
    # A subclass constructor may already have populated this (by calling
    # _find_stream_orderings_for_times_txn) before the constructor chain
    # reaches this class, so only fall back to None when nothing has been
    # computed yet rather than discarding that value.
    self.stream_ordering_month_ago = getattr(
        self, "stream_ordering_month_ago", None
    )
    super(EventPushActionsStore, self).__init__(hs)
|
||||||
|
|
||||||
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
|
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
|
@ -237,9 +239,6 @@ class EventPushActionsStore(SQLBaseStore):
|
||||||
user_id: user ID to delete for
|
user_id: user ID to delete for
|
||||||
topological_ordering: The lowest topological ordering which will
|
topological_ordering: The lowest topological ordering which will
|
||||||
not be deleted.
|
not be deleted.
|
||||||
|
|
||||||
Returns:
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
txn.call_after(
|
txn.call_after(
|
||||||
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
|
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
|
||||||
|
@ -259,15 +258,63 @@ class EventPushActionsStore(SQLBaseStore):
|
||||||
txn.execute(
|
txn.execute(
|
||||||
"DELETE FROM event_push_actions "
|
"DELETE FROM event_push_actions "
|
||||||
" WHERE user_id = ? AND room_id = ? AND "
|
" WHERE user_id = ? AND room_id = ? AND "
|
||||||
" topological_ordering < ? AND stream_ordering < ("
|
" topological_ordering < ? AND stream_ordering < ?"
|
||||||
" SELECT stream_ordering FROM events"
|
(user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
|
||||||
" WHERE room_id = ? AND received_ts < ?"
|
|
||||||
" ORDER BY stream_ordering DESC"
|
|
||||||
" LIMIT 1"
|
|
||||||
")",
|
|
||||||
(user_id, room_id, topological_ordering, room_id, threshold)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
def _find_stream_orderings_for_times(self):
    """Recompute the cached stream-ordering cutoff inside a database
    interaction.

    Delegates to _find_stream_orderings_for_times_txn via runInteraction
    and waits for it to finish.
    """
    interaction = self.runInteraction(
        "_find_stream_orderings_for_times",
        self._find_stream_orderings_for_times_txn,
    )
    yield interaction
|
||||||
|
|
||||||
|
def _find_stream_orderings_for_times_txn(self, txn):
    """Look up and cache the stream ordering that corresponds to one month
    ago.

    Stores the result on self.stream_ordering_month_ago.

    Args:
        txn: the database transaction/cursor to run the lookup queries on.
    """
    logger.info("Searching for stream ordering 1 month ago")

    # 30 days, expressed in milliseconds, before the current clock time
    month_ago_ts = self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
    found = self._find_first_stream_ordering_after_ts_txn(txn, month_ago_ts)
    self.stream_ordering_month_ago = found

    logger.info(
        "Found stream ordering 1 month ago: it's %d",
        self.stream_ordering_month_ago
    )
|
||||||
|
|
||||||
|
def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
    """
    Find the stream_ordering of the first event that was received after
    a given timestamp. This is relatively slow as there is no index on
    received_ts but we can then use this to delete push actions before
    this.

    received_ts must necessarily be in the same order as stream_ordering
    and stream_ordering is indexed, so we manually binary search using
    stream_ordering

    Args:
        txn: database transaction/cursor providing execute() and
            fetchone().
        ts (int): the timestamp to search for; it is compared against
            events.received_ts.

    Returns:
        int: the upper bound of the search window once it has narrowed to
        at most one stream ordering, or 0 if the events table is empty.
    """
    txn.execute("SELECT MAX(stream_ordering) FROM events")
    max_stream_ordering = txn.fetchone()[0]

    if max_stream_ordering is None:
        # MAX() over an empty table is NULL: there is nothing to search,
        # and the arithmetic below would fail on None.
        return 0

    range_start = 0
    range_end = max_stream_ordering

    # Fetch the received_ts of the first event strictly after a given
    # stream ordering; this only needs the stream_ordering index.
    sql = (
        "SELECT received_ts FROM events"
        " WHERE stream_ordering > ?"
        " ORDER BY stream_ordering"
        " LIMIT 1"
    )

    while range_end - range_start > 1:
        # Floor division keeps the midpoint an exact integer.
        middle = (range_end + range_start) // 2
        txn.execute(sql, (middle,))
        middle_ts = txn.fetchone()[0]
        if ts > middle_ts:
            range_start = middle
        else:
            range_end = middle

    logger.info("done: picking %d from %d and %d", range_end, range_start, range_end)

    return range_end
|
||||||
|
|
||||||
|
|
||||||
def _action_has_highlight(actions):
|
def _action_has_highlight(actions):
|
||||||
for action in actions:
|
for action in actions:
|
||||||
|
|
Loading…
Reference in New Issue