Merge branch 'develop' of github.com:matrix-org/synapse into erikj/cache_memberships

This commit is contained in:
Erik Johnston 2019-10-03 17:51:06 +01:00
commit 6511071837
5 changed files with 44 additions and 19 deletions

1
changelog.d/6160.misc Normal file
View File

@ -0,0 +1 @@
Add some metrics on the federation sender.

View File

@ -38,7 +38,7 @@ from synapse.metrics import (
events_processed_counter, events_processed_counter,
) )
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.metrics import measure_func from synapse.util.metrics import Measure, measure_func
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -183,8 +183,8 @@ class FederationSender(object):
# Otherwise if the last member on a server in a room is # Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't # banned then it won't receive the event because it won't
# be in the room after the ban. # be in the room after the ban.
destinations = yield self.state.get_current_hosts_in_room( destinations = yield self.state.get_hosts_in_room_at_events(
event.room_id, latest_event_ids=event.prev_event_ids() event.room_id, event_ids=event.prev_event_ids()
) )
except Exception: except Exception:
logger.exception( logger.exception(
@ -207,6 +207,7 @@ class FederationSender(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def handle_room_events(events): def handle_room_events(events):
with Measure(self.clock, "handle_room_events"):
for event in events: for event in events:
yield handle_event(event) yield handle_event(event)

View File

@ -33,7 +33,7 @@ from synapse.state import v1, v2
from synapse.util.async_helpers import Linearizer from synapse.util.async_helpers import Linearizer
from synapse.util.caches import get_cache_factor_for from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import Measure from synapse.util.metrics import Measure, measure_func
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -191,11 +191,22 @@ class StateHandler(object):
return joined_users return joined_users
@defer.inlineCallbacks @defer.inlineCallbacks
def get_current_hosts_in_room(self, room_id, latest_event_ids=None): def get_current_hosts_in_room(self, room_id):
if not latest_event_ids: event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) return (yield self.get_hosts_in_room_at_events(room_id, event_ids))
logger.debug("calling resolve_state_groups from get_current_hosts_in_room")
entry = yield self.resolve_state_groups_for_events(room_id, latest_event_ids) @defer.inlineCallbacks
def get_hosts_in_room_at_events(self, room_id, event_ids):
"""Get the hosts that were in a room at the given event ids
Args:
room_id (str):
event_ids (list[str]):
Returns:
Deferred[list[str]]: the hosts in the room at the given events
"""
entry = yield self.resolve_state_groups_for_events(room_id, event_ids)
joined_hosts = yield self.store.get_joined_hosts(room_id, entry) joined_hosts = yield self.store.get_joined_hosts(room_id, entry)
return joined_hosts return joined_hosts
@ -344,6 +355,7 @@ class StateHandler(object):
return context return context
@measure_func()
@defer.inlineCallbacks @defer.inlineCallbacks
def resolve_state_groups_for_events(self, room_id, event_ids): def resolve_state_groups_for_events(self, room_id, event_ids):
""" Given a list of event_ids this method fetches the state at each """ Given a list of event_ids this method fetches the state at each

View File

@ -33,6 +33,7 @@ from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer from synapse.util.async_helpers import Linearizer
from synapse.util.caches import intern_string from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from synapse.util.metrics import Measure
from synapse.util.stringutils import to_ascii from synapse.util.stringutils import to_ascii
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -483,6 +484,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
) )
return result return result
@defer.inlineCallbacks
def get_joined_users_from_state(self, room_id, state_entry): def get_joined_users_from_state(self, room_id, state_entry):
state_group = state_entry.state_group state_group = state_entry.state_group
if not state_group: if not state_group:
@ -492,9 +494,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# To do this we set the state_group to a new object as object() != object() # To do this we set the state_group to a new object as object() != object()
state_group = object() state_group = object()
return self._get_joined_users_from_context( with Measure(self._clock, "get_joined_users_from_state"):
return (
yield self._get_joined_users_from_context(
room_id, state_group, state_entry.state, context=state_entry room_id, state_group, state_entry.state, context=state_entry
) )
)
@cachedInlineCallbacks( @cachedInlineCallbacks(
num_args=2, cache_context=True, iterable=True, max_entries=100000 num_args=2, cache_context=True, iterable=True, max_entries=100000
@ -695,6 +700,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return True return True
@defer.inlineCallbacks
def get_joined_hosts(self, room_id, state_entry): def get_joined_hosts(self, room_id, state_entry):
state_group = state_entry.state_group state_group = state_entry.state_group
if not state_group: if not state_group:
@ -704,9 +710,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# To do this we set the state_group to a new object as object() != object() # To do this we set the state_group to a new object as object() != object()
state_group = object() state_group = object()
return self._get_joined_hosts( with Measure(self._clock, "get_joined_hosts"):
return (
yield self._get_joined_hosts(
room_id, state_group, state_entry.state, state_entry=state_entry room_id, state_group, state_entry.state, state_entry=state_entry
) )
)
@cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True) @cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
# @defer.inlineCallbacks # @defer.inlineCallbacks

View File

@ -60,12 +60,14 @@ in_flight = InFlightGauge(
) )
def measure_func(name): def measure_func(name=None):
def wrapper(func): def wrapper(func):
block_name = func.__name__ if name is None else name
@wraps(func) @wraps(func)
@defer.inlineCallbacks @defer.inlineCallbacks
def measured_func(self, *args, **kwargs): def measured_func(self, *args, **kwargs):
with Measure(self.clock, name): with Measure(self.clock, block_name):
r = yield func(self, *args, **kwargs) r = yield func(self, *args, **kwargs)
return r return r