Merge pull request #58 from matrix-org/get_event_counters

Add performance counters for different stages of loading events
This commit is contained in:
Mark Haines 2015-02-10 15:15:20 +00:00
commit f88d3ee8ae
1 changed files with 61 additions and 25 deletions

View File

@ -77,6 +77,43 @@ class LoggingTransaction(object):
sql_logger.debug("[SQL time] {%s} %f", self.name, end - start) sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)
class PerformanceCounters(object):
def __init__(self):
self.current_counters = {}
self.previous_counters = {}
def update(self, key, start_time, end_time=None):
if end_time is None:
end_time = time.time() * 1000;
duration = end_time - start_time
count, cum_time = self.current_counters.get(key, (0, 0))
count += 1
cum_time += duration
self.current_counters[key] = (count, cum_time)
return end_time
def interval(self, interval_duration, limit=3):
counters = []
for name, (count, cum_time) in self.current_counters.items():
prev_count, prev_time = self.previous_counters.get(name, (0, 0))
counters.append((
(cum_time - prev_time) / interval_duration,
count - prev_count,
name
))
self.previous_counters = dict(self.current_counters)
counters.sort(reverse=True)
top_n_counters = ", ".join(
"%s(%d): %.3f%%" % (name, count, 100 * ratio)
for ratio, count, name in counters[:limit]
)
return top_n_counters
class SQLBaseStore(object): class SQLBaseStore(object):
_TXN_ID = 0 _TXN_ID = 0
@ -88,8 +125,8 @@ class SQLBaseStore(object):
self._previous_txn_total_time = 0 self._previous_txn_total_time = 0
self._current_txn_total_time = 0 self._current_txn_total_time = 0
self._previous_loop_ts = 0 self._previous_loop_ts = 0
self._txn_perf_counters = {} self._txn_perf_counters = PerformanceCounters()
self._previous_txn_perf_counters = {} self._get_event_counters = PerformanceCounters()
def start_profiling(self): def start_profiling(self):
self._previous_loop_ts = self._clock.time_msec() self._previous_loop_ts = self._clock.time_msec()
@ -105,28 +142,17 @@ class SQLBaseStore(object):
ratio = (curr - prev)/(time_now - time_then) ratio = (curr - prev)/(time_now - time_then)
txn_counters = [] top_three_counters = self._txn_perf_counters.interval(
for name, (count, cum_time) in self._txn_perf_counters.items(): time_now - time_then, limit=3
prev_count, prev_time = self._previous_txn_perf_counters.get( )
name, (0,0)
)
txn_counters.append((
(cum_time - prev_time) / (time_now - time_then),
count - prev_count,
name
))
self._previous_txn_perf_counters = dict(self._txn_perf_counters) top_3_event_counters = self._get_event_counters.interval(
time_now - time_then, limit=3
txn_counters.sort(reverse=True)
top_three_counters = ", ".join(
"%s(%d): %.3f%%" % (name, count, 100 * ratio)
for ratio, count, name in txn_counters[:3]
) )
logger.info( logger.info(
"Total database time: %.3f%% {%s}", "Total database time: %.3f%% {%s} {%s}",
ratio * 100, top_three_counters ratio * 100, top_three_counters, top_3_event_counters
) )
self._clock.looping_call(loop, 10000) self._clock.looping_call(loop, 10000)
@ -162,11 +188,7 @@ class SQLBaseStore(object):
) )
self._current_txn_total_time += end - start self._current_txn_total_time += end - start
self._txn_perf_counters.update(desc, start, end)
count, cum_time = self._txn_perf_counters.get(desc, (0,0))
count += 1
cum_time += end - start
self._txn_perf_counters[desc] = (count, cum_time)
with PreserveLoggingContext(): with PreserveLoggingContext():
result = yield self._db_pool.runInteraction( result = yield self._db_pool.runInteraction(
@ -566,6 +588,8 @@ class SQLBaseStore(object):
"LIMIT 1 " "LIMIT 1 "
) )
start_time = time.time() * 1000;
txn.execute(sql, (event_id,)) txn.execute(sql, (event_id,))
res = txn.fetchone() res = txn.fetchone()
@ -575,6 +599,8 @@ class SQLBaseStore(object):
internal_metadata, js, redacted, rejected_reason = res internal_metadata, js, redacted, rejected_reason = res
self._get_event_counters.update("select_event", start_time)
if allow_rejected or not rejected_reason: if allow_rejected or not rejected_reason:
return self._get_event_from_row_txn( return self._get_event_from_row_txn(
txn, internal_metadata, js, redacted, txn, internal_metadata, js, redacted,
@ -586,10 +612,18 @@ class SQLBaseStore(object):
def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
check_redacted=True, get_prev_content=False): check_redacted=True, get_prev_content=False):
start_time = time.time() * 1000;
update_counter = self._get_event_counters.update
d = json.loads(js) d = json.loads(js)
start_time = update_counter("decode_json", start_time)
internal_metadata = json.loads(internal_metadata) internal_metadata = json.loads(internal_metadata)
start_time = update_counter("decode_internal", start_time)
ev = FrozenEvent(d, internal_metadata_dict=internal_metadata) ev = FrozenEvent(d, internal_metadata_dict=internal_metadata)
start_time = update_counter("build_frozen_event", start_time)
if check_redacted and redacted: if check_redacted and redacted:
ev = prune_event(ev) ev = prune_event(ev)
@ -605,6 +639,7 @@ class SQLBaseStore(object):
if because: if because:
ev.unsigned["redacted_because"] = because ev.unsigned["redacted_because"] = because
start_time = update_counter("redact_event", start_time)
if get_prev_content and "replaces_state" in ev.unsigned: if get_prev_content and "replaces_state" in ev.unsigned:
prev = self._get_event_txn( prev = self._get_event_txn(
@ -614,6 +649,7 @@ class SQLBaseStore(object):
) )
if prev: if prev:
ev.unsigned["prev_content"] = prev.get_dict()["content"] ev.unsigned["prev_content"] = prev.get_dict()["content"]
start_time = update_counter("get_prev_content", start_time)
return ev return ev