instead of inserting user daily visit data at the end of the day, instead insert incrementally through the day
This commit is contained in:
parent
977765bde2
commit
f077e97914
|
@ -17,6 +17,7 @@ import gc
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
import datetime
|
||||||
|
|
||||||
import synapse
|
import synapse
|
||||||
import synapse.config.logger
|
import synapse.config.logger
|
||||||
|
@ -475,9 +476,24 @@ def run(hs):
|
||||||
" changes across releases."
|
" changes across releases."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# def recurring_user_daily_visit_stats():
|
||||||
|
|
||||||
def generate_user_daily_visit_stats():
|
def generate_user_daily_visit_stats():
|
||||||
hs.get_datastore().generate_user_daily_visits()
|
hs.get_datastore().generate_user_daily_visits()
|
||||||
|
|
||||||
|
# Since user daily stats are bucketed at midnight UTC,
|
||||||
|
# and user_ips.last_seen can be updated at any time, it is important to call
|
||||||
|
# generate_user_daily_visit_stats immediately prior to the day end. Assuming
|
||||||
|
# an hourly cadence, the simplist way is to allign all calls to the hour
|
||||||
|
# end
|
||||||
|
end_of_hour = datetime.datetime.now().replace(microsecond=0, second=0, minute=0) \
|
||||||
|
+ datetime.timedelta(hours=1) \
|
||||||
|
- datetime.timedelta(seconds=10) # Ensure method fires before day transistion
|
||||||
|
|
||||||
|
time_to_next_hour = end_of_hour - datetime.datetime.now()
|
||||||
|
clock.call_later(time_to_next_hour.seconds,
|
||||||
|
clock.looping_call(generate_user_daily_visit_stats, 60 * 60 * 1000))
|
||||||
|
|
||||||
if hs.config.report_stats:
|
if hs.config.report_stats:
|
||||||
logger.info("Scheduling stats reporting for 3 hour intervals")
|
logger.info("Scheduling stats reporting for 3 hour intervals")
|
||||||
clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
|
clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
|
||||||
|
@ -490,9 +506,6 @@ def run(hs):
|
||||||
# be quite busy the first few minutes
|
# be quite busy the first few minutes
|
||||||
clock.call_later(5 * 60, phone_stats_home)
|
clock.call_later(5 * 60, phone_stats_home)
|
||||||
|
|
||||||
clock.looping_call(generate_user_daily_visit_stats, 10 * 60 * 1000)
|
|
||||||
clock.call_later(5 * 60, generate_user_daily_visit_stats)
|
|
||||||
|
|
||||||
if hs.config.daemonize and hs.config.print_pidfile:
|
if hs.config.daemonize and hs.config.print_pidfile:
|
||||||
print (hs.config.pid_file)
|
print (hs.config.pid_file)
|
||||||
|
|
||||||
|
|
|
@ -353,48 +353,22 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
Generates daily visit data for use in cohort/ retention analysis
|
Generates daily visit data for use in cohort/ retention analysis
|
||||||
"""
|
"""
|
||||||
def _generate_user_daily_visits(txn):
|
def _generate_user_daily_visits(txn):
|
||||||
logger.info("Calling _generate_user_daily_visits")
|
|
||||||
# determine timestamp of previous days
|
|
||||||
yesterday = datetime.datetime.utcnow() - datetime.timedelta(days=1)
|
|
||||||
yesterday_start = datetime.datetime(yesterday.year, yesterday.month,
|
|
||||||
yesterday.day, tzinfo=tz.tzutc())
|
|
||||||
yesterday_start_time = int(time.mktime(yesterday_start.timetuple())) * 1000
|
|
||||||
|
|
||||||
# Check that this job has not already been completed
|
# determine timestamp of the day start
|
||||||
|
now = datetime.datetime.utcnow()
|
||||||
|
today_start = datetime.datetime(now.year, now.month,
|
||||||
|
now.day, tzinfo=tz.tzutc())
|
||||||
|
today_start_time = int(time.mktime(today_start.timetuple())) * 1000
|
||||||
|
logger.info(today_start_time)
|
||||||
sql = """
|
sql = """
|
||||||
SELECT timestamp
|
INSERT INTO user_daily_visits (user_id, device_id, timestamp)
|
||||||
FROM user_daily_visits
|
SELECT user_id, device_id, ?
|
||||||
ORDER by timestamp desc limit 1
|
FROM user_ips AS u
|
||||||
"""
|
LEFT JOIN user_daily_visits USING (user_id, device_id)
|
||||||
txn.execute(sql)
|
WHERE last_seen > ? AND timestamp IS NULL
|
||||||
row = txn.fetchone()
|
GROUP BY user_id, device_id;
|
||||||
|
"""
|
||||||
# Bail if the most recent time is yesterday
|
txn.execute(sql, (today_start_time, today_start_time))
|
||||||
if row and row[0] == yesterday_start_time:
|
|
||||||
return
|
|
||||||
|
|
||||||
# Not specificying an upper bound means that if the update is run at
|
|
||||||
# 10 mins past midnight and the user is active during a 30 min session
|
|
||||||
# that the user is still included in the previous days stats
|
|
||||||
# This does mean that if the update is run hours late, then it is possible
|
|
||||||
# to overstate the cohort, but this seems a reasonable trade off
|
|
||||||
# The alternative is to insert on every request - but prefer to avoid
|
|
||||||
# for performance reasons
|
|
||||||
sql = """
|
|
||||||
SELECT user_id, device_id
|
|
||||||
FROM user_ips
|
|
||||||
WHERE last_seen > ?
|
|
||||||
"""
|
|
||||||
txn.execute(sql, (yesterday_start_time,))
|
|
||||||
user_visits = txn.fetchall()
|
|
||||||
|
|
||||||
sql = """
|
|
||||||
INSERT INTO user_daily_visits (user_id, device_id, timestamp)
|
|
||||||
VALUES (?, ?, ?)
|
|
||||||
"""
|
|
||||||
|
|
||||||
for visit in user_visits:
|
|
||||||
txn.execute(sql, (visit + (yesterday_start_time,)))
|
|
||||||
|
|
||||||
return self.runInteraction("generate_user_daily_visits",
|
return self.runInteraction("generate_user_daily_visits",
|
||||||
_generate_user_daily_visits)
|
_generate_user_daily_visits)
|
||||||
|
|
Loading…
Reference in New Issue