Merge pull request #2366 from matrix-org/erikj/push_metrics
Add more metrics to push rule evaluation
This commit is contained in:
commit
d3862812ff
|
@ -20,6 +20,8 @@ from twisted.internet import defer
|
||||||
from .push_rule_evaluator import PushRuleEvaluatorForEvent
|
from .push_rule_evaluator import PushRuleEvaluatorForEvent
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes, Membership
|
from synapse.api.constants import EventTypes, Membership
|
||||||
|
from synapse.metrics import get_metrics_for
|
||||||
|
from synapse.util.caches import metrics as cache_metrics
|
||||||
from synapse.util.caches.descriptors import cached
|
from synapse.util.caches.descriptors import cached
|
||||||
from synapse.util.async import Linearizer
|
from synapse.util.async import Linearizer
|
||||||
|
|
||||||
|
@ -31,6 +33,23 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
rules_by_room = {}
|
rules_by_room = {}
|
||||||
|
|
||||||
|
push_metrics = get_metrics_for(__name__)
|
||||||
|
|
||||||
|
push_rules_invalidation_counter = push_metrics.register_counter(
|
||||||
|
"push_rules_invalidation_counter"
|
||||||
|
)
|
||||||
|
push_rules_state_size_counter = push_metrics.register_counter(
|
||||||
|
"push_rules_state_size_counter"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Measures whether we use the fast path of using state deltas, or if we have to
|
||||||
|
# recalculate from scratch
|
||||||
|
push_rules_delta_state_cache_metric = cache_metrics.register_cache(
|
||||||
|
"cache",
|
||||||
|
size_callback=lambda: 0, # Meaningless size, as this isn't a cache that stores values
|
||||||
|
cache_name="push_rules_delta_state_cache_metric",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class BulkPushRuleEvaluator(object):
|
class BulkPushRuleEvaluator(object):
|
||||||
"""Calculates the outcome of push rules for an event for all users in the
|
"""Calculates the outcome of push rules for an event for all users in the
|
||||||
|
@ -41,6 +60,12 @@ class BulkPushRuleEvaluator(object):
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
self.store = hs.get_datastore()
|
self.store = hs.get_datastore()
|
||||||
|
|
||||||
|
self.room_push_rule_cache_metrics = cache_metrics.register_cache(
|
||||||
|
"cache",
|
||||||
|
size_callback=lambda: 0, # There's not good value for this
|
||||||
|
cache_name="room_push_rule_cache",
|
||||||
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _get_rules_for_event(self, event, context):
|
def _get_rules_for_event(self, event, context):
|
||||||
"""This gets the rules for all users in the room at the time of the event,
|
"""This gets the rules for all users in the room at the time of the event,
|
||||||
|
@ -78,7 +103,10 @@ class BulkPushRuleEvaluator(object):
|
||||||
# It's important that RulesForRoom gets added to self._get_rules_for_room.cache
|
# It's important that RulesForRoom gets added to self._get_rules_for_room.cache
|
||||||
# before any lookup methods get called on it as otherwise there may be
|
# before any lookup methods get called on it as otherwise there may be
|
||||||
# a race if invalidate_all gets called (which assumes its in the cache)
|
# a race if invalidate_all gets called (which assumes its in the cache)
|
||||||
return RulesForRoom(self.hs, room_id, self._get_rules_for_room.cache)
|
return RulesForRoom(
|
||||||
|
self.hs, room_id, self._get_rules_for_room.cache,
|
||||||
|
self.room_push_rule_cache_metrics,
|
||||||
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def action_for_event_by_user(self, event, context):
|
def action_for_event_by_user(self, event, context):
|
||||||
|
@ -161,17 +189,19 @@ class RulesForRoom(object):
|
||||||
the entire cache for the room.
|
the entire cache for the room.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, hs, room_id, rules_for_room_cache):
|
def __init__(self, hs, room_id, rules_for_room_cache, room_push_rule_cache_metrics):
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
hs (HomeServer)
|
hs (HomeServer)
|
||||||
room_id (str)
|
room_id (str)
|
||||||
rules_for_room_cache(Cache): The cache object that caches these
|
rules_for_room_cache(Cache): The cache object that caches these
|
||||||
RoomsForUser objects.
|
RoomsForUser objects.
|
||||||
|
room_push_rule_cache_metrics (CacheMetric)
|
||||||
"""
|
"""
|
||||||
self.room_id = room_id
|
self.room_id = room_id
|
||||||
self.is_mine_id = hs.is_mine_id
|
self.is_mine_id = hs.is_mine_id
|
||||||
self.store = hs.get_datastore()
|
self.store = hs.get_datastore()
|
||||||
|
self.room_push_rule_cache_metrics = room_push_rule_cache_metrics
|
||||||
|
|
||||||
self.linearizer = Linearizer(name="rules_for_room")
|
self.linearizer = Linearizer(name="rules_for_room")
|
||||||
|
|
||||||
|
@ -215,13 +245,17 @@ class RulesForRoom(object):
|
||||||
|
|
||||||
if state_group and self.state_group == state_group:
|
if state_group and self.state_group == state_group:
|
||||||
logger.debug("Using cached rules for %r", self.room_id)
|
logger.debug("Using cached rules for %r", self.room_id)
|
||||||
|
self.room_push_rule_cache_metrics.inc_hits()
|
||||||
defer.returnValue(self.rules_by_user)
|
defer.returnValue(self.rules_by_user)
|
||||||
|
|
||||||
with (yield self.linearizer.queue(())):
|
with (yield self.linearizer.queue(())):
|
||||||
if state_group and self.state_group == state_group:
|
if state_group and self.state_group == state_group:
|
||||||
logger.debug("Using cached rules for %r", self.room_id)
|
logger.debug("Using cached rules for %r", self.room_id)
|
||||||
|
self.room_push_rule_cache_metrics.inc_hits()
|
||||||
defer.returnValue(self.rules_by_user)
|
defer.returnValue(self.rules_by_user)
|
||||||
|
|
||||||
|
self.room_push_rule_cache_metrics.inc_misses()
|
||||||
|
|
||||||
ret_rules_by_user = {}
|
ret_rules_by_user = {}
|
||||||
missing_member_event_ids = {}
|
missing_member_event_ids = {}
|
||||||
if state_group and self.state_group == context.prev_group:
|
if state_group and self.state_group == context.prev_group:
|
||||||
|
@ -229,8 +263,13 @@ class RulesForRoom(object):
|
||||||
# results.
|
# results.
|
||||||
ret_rules_by_user = self.rules_by_user
|
ret_rules_by_user = self.rules_by_user
|
||||||
current_state_ids = context.delta_ids
|
current_state_ids = context.delta_ids
|
||||||
|
|
||||||
|
push_rules_delta_state_cache_metric.inc_hits()
|
||||||
else:
|
else:
|
||||||
current_state_ids = context.current_state_ids
|
current_state_ids = context.current_state_ids
|
||||||
|
push_rules_delta_state_cache_metric.inc_misses()
|
||||||
|
|
||||||
|
push_rules_state_size_counter.inc_by(len(current_state_ids))
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Looking for member changes in %r %r", state_group, current_state_ids
|
"Looking for member changes in %r %r", state_group, current_state_ids
|
||||||
|
@ -375,6 +414,7 @@ class RulesForRoom(object):
|
||||||
self.state_group = object()
|
self.state_group = object()
|
||||||
self.member_map = {}
|
self.member_map = {}
|
||||||
self.rules_by_user = {}
|
self.rules_by_user = {}
|
||||||
|
push_rules_invalidation_counter.inc()
|
||||||
|
|
||||||
def update_cache(self, sequence, members, rules_by_user, state_group):
|
def update_cache(self, sequence, members, rules_by_user, state_group):
|
||||||
if sequence == self.sequence:
|
if sequence == self.sequence:
|
||||||
|
|
Loading…
Reference in New Issue