Fix phone home stats

This commit is contained in:
Erik Johnston 2017-06-14 19:37:17 +01:00
parent 3accee1a8c
commit 617304b2cf
3 changed files with 56 additions and 107 deletions

View File

@ -35,7 +35,7 @@ from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_d
from synapse.server import HomeServer from synapse.server import HomeServer
from twisted.internet import reactor, task, defer from twisted.internet import reactor, defer
from twisted.application import service from twisted.application import service
from twisted.web.resource import Resource, EncodingResourceWrapper from twisted.web.resource import Resource, EncodingResourceWrapper
from twisted.web.static import File from twisted.web.static import File
@ -53,7 +53,7 @@ from synapse.api.urls import (
from synapse.config.homeserver import HomeServerConfig from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory from synapse.crypto import context_factory
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.metrics import register_memory_metrics, get_metrics_for from synapse.metrics import register_memory_metrics
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.federation.transport.server import TransportLayerServer from synapse.federation.transport.server import TransportLayerServer
@ -398,7 +398,8 @@ def run(hs):
ThreadPool._worker = profile(ThreadPool._worker) ThreadPool._worker = profile(ThreadPool._worker)
reactor.run = profile(reactor.run) reactor.run = profile(reactor.run)
start_time = hs.get_clock().time() clock = hs.get_clock()
start_time = clock.time()
stats = {} stats = {}
@ -410,41 +411,14 @@ def run(hs):
if uptime < 0: if uptime < 0:
uptime = 0 uptime = 0
# If the stats directory is empty then this is the first time we've
# reported stats.
first_time = not stats
stats["homeserver"] = hs.config.server_name stats["homeserver"] = hs.config.server_name
stats["timestamp"] = now stats["timestamp"] = now
stats["uptime_seconds"] = uptime stats["uptime_seconds"] = uptime
stats["total_users"] = yield hs.get_datastore().count_all_users() stats["total_users"] = yield hs.get_datastore().count_all_users()
room_count = yield hs.get_datastore().get_room_count()
stats["total_room_count"] = room_count
stats["daily_active_users"] = yield hs.get_datastore().count_daily_users() stats["daily_active_users"] = yield hs.get_datastore().count_daily_users()
daily_messages = yield hs.get_datastore().count_daily_messages() stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
if daily_messages is not None: daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
stats["daily_messages"] = daily_messages stats["daily_sent_messages"] = daily_sent_messages
else:
stats.pop("daily_messages", None)
if first_time:
# Add callbacks to report the synapse stats as metrics whenever
# prometheus requests them, typically every 30s.
# As some of the stats are expensive to calculate we only update
# them when synapse phones home to matrix.org every 24 hours.
metrics = get_metrics_for("synapse.usage")
metrics.add_callback("timestamp", lambda: stats["timestamp"])
metrics.add_callback("uptime_seconds", lambda: stats["uptime_seconds"])
metrics.add_callback("total_users", lambda: stats["total_users"])
metrics.add_callback("total_room_count", lambda: stats["total_room_count"])
metrics.add_callback(
"daily_active_users", lambda: stats["daily_active_users"]
)
metrics.add_callback(
"daily_messages", lambda: stats.get("daily_messages", 0)
)
logger.info("Reporting stats to matrix.org: %s" % (stats,)) logger.info("Reporting stats to matrix.org: %s" % (stats,))
try: try:
@ -456,9 +430,12 @@ def run(hs):
logger.warn("Error reporting stats: %s", e) logger.warn("Error reporting stats: %s", e)
if hs.config.report_stats: if hs.config.report_stats:
phone_home_task = task.LoopingCall(phone_stats_home) logger.info("Scheduling stats reporting for 3 hour intervals")
logger.info("Scheduling stats reporting for 24 hour intervals") clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
phone_home_task.start(60 * 60 * 24, now=False)
# We wait 5 minutes to send the first set of stats as the server can
# be quite busy the first few minutes
clock.call_later(5 * 60, phone_stats_home)
def in_thread(): def in_thread():
# Uncomment to enable tracing of log context changes. # Uncomment to enable tracing of log context changes.

View File

@ -231,7 +231,7 @@ class DataStore(RoomMemberStore, RoomStore,
cur.close() cur.close()
self.find_stream_orderings_looping_call = self._clock.looping_call( self.find_stream_orderings_looping_call = self._clock.looping_call(
self._find_stream_orderings_for_times, 60 * 60 * 1000 self._find_stream_orderings_for_times, 10 * 60 * 1000
) )
self._stream_order_on_start = self.get_room_max_stream_ordering() self._stream_order_on_start = self.get_room_max_stream_ordering()
@ -272,17 +272,19 @@ class DataStore(RoomMemberStore, RoomStore,
Counts the number of users who used this homeserver in the last 24 hours. Counts the number of users who used this homeserver in the last 24 hours.
""" """
def _count_users(txn): def _count_users(txn):
txn.execute( yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),
"SELECT COUNT(DISTINCT user_id) AS users"
" FROM user_ips" sql = """
" WHERE last_seen > ?", SELECT COALESCE(count(*), 0) FROM (
# This is close enough to a day for our purposes. SELECT user_id FROM user_ips
(int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),) WHERE last_seen > ?
) GROUP BY user_id
rows = self.cursor_to_dict(txn) ) u
if rows: """
return rows[0]["users"]
return 0 txn.execute(sql, (yesterday,))
count, = txn.fetchone()
return count
ret = yield self.runInteraction("count_users", _count_users) ret = yield self.runInteraction("count_users", _count_users)
defer.returnValue(ret) defer.returnValue(ret)

View File

@ -38,7 +38,6 @@ from functools import wraps
import synapse.metrics import synapse.metrics
import logging import logging
import math
import ujson as json import ujson as json
# these are only included to make the type annotations work # these are only included to make the type annotations work
@ -1599,68 +1598,39 @@ class EventsStore(SQLBaseStore):
call to this function, it will return None. call to this function, it will return None.
""" """
def _count_messages(txn): def _count_messages(txn):
now = self.hs.get_clock().time() sql = """
SELECT COALESCE(COUNT(*), 0) FROM events
txn.execute( WHERE type = 'm.room.message'
"SELECT reported_stream_token, reported_time FROM stats_reporting" AND stream_ordering > ?
) """
last_reported = self.cursor_to_dict(txn) txn.execute(sql, (self.stream_ordering_day_ago,))
count, = txn.fetchone()
txn.execute( return count
"SELECT stream_ordering"
" FROM events"
" ORDER BY stream_ordering DESC"
" LIMIT 1"
)
now_reporting = self.cursor_to_dict(txn)
if not now_reporting:
logger.info("Calculating daily messages skipped; no now_reporting")
return None
now_reporting = now_reporting[0]["stream_ordering"]
txn.execute("DELETE FROM stats_reporting")
txn.execute(
"INSERT INTO stats_reporting"
" (reported_stream_token, reported_time)"
" VALUES (?, ?)",
(now_reporting, now,)
)
if not last_reported:
logger.info("Calculating daily messages skipped; no last_reported")
return None
# Close enough to correct for our purposes.
yesterday = (now - 24 * 60 * 60)
since_yesterday_seconds = yesterday - last_reported[0]["reported_time"]
any_since_yesterday = math.fabs(since_yesterday_seconds) > 60 * 60
if any_since_yesterday:
logger.info(
"Calculating daily messages skipped; since_yesterday_seconds: %d" %
(since_yesterday_seconds,)
)
return None
txn.execute(
"SELECT COUNT(*) as messages"
" FROM events NATURAL JOIN event_json"
" WHERE json like '%m.room.message%'"
" AND stream_ordering > ?"
" AND stream_ordering <= ?",
(
last_reported[0]["reported_stream_token"],
now_reporting,
)
)
rows = self.cursor_to_dict(txn)
if not rows:
logger.info("Calculating daily messages skipped; messages count missing")
return None
return rows[0]["messages"]
ret = yield self.runInteraction("count_messages", _count_messages) ret = yield self.runInteraction("count_messages", _count_messages)
defer.returnValue(ret) defer.returnValue(ret)
@defer.inlineCallbacks
def count_daily_sent_messages(self):
def _count_messages(txn):
# This is good enough as if you have silly characters in your own
# hostname then thats your own fault.
like_clause = "%:" + self.hs.hostname
sql = """
SELECT COALESCE(COUNT(*), 0) FROM events
WHERE type = 'm.room.message'
AND sender LIKE ?
AND stream_ordering > ?
"""
txn.execute(sql, (like_clause, self.stream_ordering_day_ago,))
count, = txn.fetchone()
return count
ret = yield self.runInteraction("count_daily_sent_messages", _count_messages)
defer.returnValue(ret)
@defer.inlineCallbacks @defer.inlineCallbacks
def _background_reindex_fields_sender(self, progress, batch_size): def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"] target_min_stream_id = progress["target_min_stream_id_inclusive"]