Merge pull request #586 from matrix-org/erikj/presence
Fix presence `currently_active`. Add presence metrics.
This commit is contained in:
commit
ea7786e8ca
|
@ -44,6 +44,12 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
metrics = synapse.metrics.get_metrics_for(__name__)
|
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||||
|
|
||||||
|
notified_presence_counter = metrics.register_counter("notified_presence")
|
||||||
|
presence_updates_counter = metrics.register_counter("presence_updates")
|
||||||
|
presence_updates_counter = metrics.register_counter("presence_updates")
|
||||||
|
timers_fired_counter = metrics.register_counter("timers_fired")
|
||||||
|
federation_presence_counter = metrics.register_counter("federation_presence")
|
||||||
|
|
||||||
|
|
||||||
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
|
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
|
||||||
# "currently_active"
|
# "currently_active"
|
||||||
|
@ -170,6 +176,8 @@ class PresenceHandler(BaseHandler):
|
||||||
5000,
|
5000,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
metrics.register_callback("wheel_timer_size", lambda: len(self.wheel_timer))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _on_shutdown(self):
|
def _on_shutdown(self):
|
||||||
"""Gets called when shutting down. This lets us persist any updates that
|
"""Gets called when shutting down. This lets us persist any updates that
|
||||||
|
@ -233,7 +241,10 @@ class PresenceHandler(BaseHandler):
|
||||||
|
|
||||||
# TODO: We should probably ensure there are no races hereafter
|
# TODO: We should probably ensure there are no races hereafter
|
||||||
|
|
||||||
|
presence_updates_counter.inc_by(len(new_states))
|
||||||
|
|
||||||
if to_notify:
|
if to_notify:
|
||||||
|
notified_presence_counter.inc_by(len(to_notify))
|
||||||
yield self._persist_and_notify(to_notify.values())
|
yield self._persist_and_notify(to_notify.values())
|
||||||
|
|
||||||
self.unpersisted_users_changes |= set(s.user_id for s in new_states)
|
self.unpersisted_users_changes |= set(s.user_id for s in new_states)
|
||||||
|
@ -268,6 +279,8 @@ class PresenceHandler(BaseHandler):
|
||||||
for user_id in set(users_to_check)
|
for user_id in set(users_to_check)
|
||||||
]
|
]
|
||||||
|
|
||||||
|
timers_fired_counter.inc_by(len(states))
|
||||||
|
|
||||||
changes = handle_timeouts(
|
changes = handle_timeouts(
|
||||||
states,
|
states,
|
||||||
is_mine_fn=self.hs.is_mine_id,
|
is_mine_fn=self.hs.is_mine_id,
|
||||||
|
@ -499,6 +512,7 @@ class PresenceHandler(BaseHandler):
|
||||||
updates.append(prev_state.copy_and_replace(**new_fields))
|
updates.append(prev_state.copy_and_replace(**new_fields))
|
||||||
|
|
||||||
if updates:
|
if updates:
|
||||||
|
federation_presence_counter.inc_by(len(updates))
|
||||||
yield self._update_states(updates)
|
yield self._update_states(updates)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@ -981,6 +995,18 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
|
||||||
then=new_state.last_active_ts + IDLE_TIMER
|
then=new_state.last_active_ts + IDLE_TIMER
|
||||||
)
|
)
|
||||||
|
|
||||||
|
active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY
|
||||||
|
new_state = new_state.copy_and_replace(
|
||||||
|
currently_active=active,
|
||||||
|
)
|
||||||
|
|
||||||
|
if active:
|
||||||
|
wheel_timer.insert(
|
||||||
|
now=now,
|
||||||
|
obj=user_id,
|
||||||
|
then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY
|
||||||
|
)
|
||||||
|
|
||||||
if new_state.state != PresenceState.OFFLINE:
|
if new_state.state != PresenceState.OFFLINE:
|
||||||
# User has stopped syncing
|
# User has stopped syncing
|
||||||
wheel_timer.insert(
|
wheel_timer.insert(
|
||||||
|
@ -1004,12 +1030,6 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
|
||||||
then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT
|
then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT
|
||||||
)
|
)
|
||||||
|
|
||||||
if new_state.state == PresenceState.ONLINE:
|
|
||||||
active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY
|
|
||||||
new_state = new_state.copy_and_replace(
|
|
||||||
currently_active=active,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Check whether the change was something worth notifying about
|
# Check whether the change was something worth notifying about
|
||||||
if should_notify(prev_state, new_state):
|
if should_notify(prev_state, new_state):
|
||||||
new_state = new_state.copy_and_replace(
|
new_state = new_state.copy_and_replace(
|
||||||
|
|
|
@ -89,3 +89,9 @@ class WheelTimer(object):
|
||||||
ret.extend(self.entries.pop(0).queue)
|
ret.extend(self.entries.pop(0).queue)
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
l = 0
|
||||||
|
for entry in self.entries:
|
||||||
|
l += len(entry.queue)
|
||||||
|
return l
|
||||||
|
|
|
@ -49,7 +49,7 @@ class PresenceUpdateTestCase(unittest.TestCase):
|
||||||
self.assertEquals(new_state.status_msg, state.status_msg)
|
self.assertEquals(new_state.status_msg, state.status_msg)
|
||||||
self.assertEquals(state.last_federation_update_ts, now)
|
self.assertEquals(state.last_federation_update_ts, now)
|
||||||
|
|
||||||
self.assertEquals(wheel_timer.insert.call_count, 2)
|
self.assertEquals(wheel_timer.insert.call_count, 3)
|
||||||
wheel_timer.insert.assert_has_calls([
|
wheel_timer.insert.assert_has_calls([
|
||||||
call(
|
call(
|
||||||
now=now,
|
now=now,
|
||||||
|
@ -60,7 +60,12 @@ class PresenceUpdateTestCase(unittest.TestCase):
|
||||||
now=now,
|
now=now,
|
||||||
obj=user_id,
|
obj=user_id,
|
||||||
then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT
|
then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT
|
||||||
)
|
),
|
||||||
|
call(
|
||||||
|
now=now,
|
||||||
|
obj=user_id,
|
||||||
|
then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY
|
||||||
|
),
|
||||||
], any_order=True)
|
], any_order=True)
|
||||||
|
|
||||||
def test_online_to_online(self):
|
def test_online_to_online(self):
|
||||||
|
@ -91,7 +96,7 @@ class PresenceUpdateTestCase(unittest.TestCase):
|
||||||
self.assertEquals(new_state.status_msg, state.status_msg)
|
self.assertEquals(new_state.status_msg, state.status_msg)
|
||||||
self.assertEquals(state.last_federation_update_ts, now)
|
self.assertEquals(state.last_federation_update_ts, now)
|
||||||
|
|
||||||
self.assertEquals(wheel_timer.insert.call_count, 2)
|
self.assertEquals(wheel_timer.insert.call_count, 3)
|
||||||
wheel_timer.insert.assert_has_calls([
|
wheel_timer.insert.assert_has_calls([
|
||||||
call(
|
call(
|
||||||
now=now,
|
now=now,
|
||||||
|
@ -102,7 +107,12 @@ class PresenceUpdateTestCase(unittest.TestCase):
|
||||||
now=now,
|
now=now,
|
||||||
obj=user_id,
|
obj=user_id,
|
||||||
then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT
|
then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT
|
||||||
)
|
),
|
||||||
|
call(
|
||||||
|
now=now,
|
||||||
|
obj=user_id,
|
||||||
|
then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY
|
||||||
|
),
|
||||||
], any_order=True)
|
], any_order=True)
|
||||||
|
|
||||||
def test_online_to_online_last_active(self):
|
def test_online_to_online_last_active(self):
|
||||||
|
@ -153,6 +163,7 @@ class PresenceUpdateTestCase(unittest.TestCase):
|
||||||
prev_state = UserPresenceState.default(user_id)
|
prev_state = UserPresenceState.default(user_id)
|
||||||
prev_state = prev_state.copy_and_replace(
|
prev_state = prev_state.copy_and_replace(
|
||||||
state=PresenceState.ONLINE,
|
state=PresenceState.ONLINE,
|
||||||
|
last_active_ts=now,
|
||||||
)
|
)
|
||||||
|
|
||||||
new_state = prev_state.copy_and_replace(
|
new_state = prev_state.copy_and_replace(
|
||||||
|
|
Loading…
Reference in New Issue