From a9a74101a4925bd208db682952b5dadf4b157a8d Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Wed, 4 Apr 2018 08:58:53 +0100
Subject: [PATCH 1/2] Document the behaviour of ResponseCache

it looks like everything that uses ResponseCache expects to have to
`make_deferred_yieldable` its results. It's debatable whether that is the
best approach, but let's document it for now to avoid further confusion.
---
 synapse/util/caches/response_cache.py | 32 +++++++++++++++++++++++++++
 1 file changed, 32 insertions(+)

diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 00af539880..4ecd91deb5 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -31,6 +31,18 @@ class ResponseCache(object):
         self.timeout_sec = timeout_ms / 1000.
 
     def get(self, key):
+        """Look up the given key.
+
+        Returns a deferred which doesn't follow the synapse logcontext rules,
+        so you'll probably want to make_deferred_yieldable it.
+
+        Args:
+            key (str):
+
+        Returns:
+            twisted.internet.defer.Deferred|None: None if there is no entry
+            for this key; otherwise a deferred result.
+        """
         result = self.pending_result_cache.get(key)
         if result is not None:
             return result.observe()
@@ -38,6 +50,26 @@
             return None
 
     def set(self, key, deferred):
+        """Set the entry for the given key to the given deferred.
+
+        *deferred* should run its callbacks in the sentinel logcontext (ie,
+        you should wrap normal synapse deferreds with
+        logcontext.run_in_background).
+
+        Returns a new Deferred which also doesn't follow the synapse logcontext
+        rules, so you will want to make_deferred_yieldable it
+
+        (TODO: before using this more widely, it might make sense to refactor
+        it and get() so that they do the necessary wrapping rather than having
+        to do it everywhere ResponseCache is used.)
+
+        Args:
+            key (str):
+            deferred (twisted.internet.defer.Deferred):
+
+        Returns:
+            twisted.internet.defer.Deferred
+        """
         result = ObservableDeferred(deferred, consumeErrors=True)
         self.pending_result_cache[key] = result
 

From 121591568bdc6f9abde0c09cd80e4a3aa74ee109 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 11 Apr 2018 09:56:00 +0100
Subject: [PATCH 2/2] Send events to ASes concurrently

---
 synapse/handlers/appservice.py | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 3dd3fa2a27..69eacff949 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -18,7 +18,9 @@ from twisted.internet import defer
 import synapse
 from synapse.api.constants import EventTypes
 from synapse.util.metrics import Measure
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import (
+    make_deferred_yieldable, preserve_fn, run_in_background,
+)
 
 import logging
 
@@ -84,11 +86,16 @@ class ApplicationServicesHandler(object):
                     if not events:
                         break
 
+                    events_by_room = {}
                     for event in events:
+                        events_by_room.setdefault(event.room_id, []).append(event)
+
+                    @defer.inlineCallbacks
+                    def handle_event(event):
                         # Gather interested services
                         services = yield self._get_services_for_event(event)
                         if len(services) == 0:
-                            continue  # no services need notifying
+                            return  # no services need notifying
 
                         # Do we know this user exists? If not, poke the user
                         # query API for all services which match that user regex.
@@ -108,6 +115,16 @@ class ApplicationServicesHandler(object):
                                 service, event
                             )
 
+                    @defer.inlineCallbacks
+                    def handle_room_events(events):
+                        for event in events:
+                            yield handle_event(event)
+
+                    yield make_deferred_yieldable(defer.gatherResults([
+                        run_in_background(handle_room_events, evs)
+                        for evs in events_by_room.itervalues()
+                    ], consumeErrors=True))
+
                     events_processed_counter.inc_by(len(events))
 
                     yield self.store.set_appservice_last_pos(upper_bound)
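
As an illustration of the calling convention the new ResponseCache docstrings
describe, here is a minimal sketch of a caller. The ExampleHandler class,
_do_request_inner, and the timeout value are hypothetical, and the
ResponseCache(hs, timeout_ms=...) constructor arguments are assumed for this
revision rather than taken from the patch:

    from twisted.internet import defer

    from synapse.util import logcontext
    from synapse.util.caches.response_cache import ResponseCache


    class ExampleHandler(object):
        def __init__(self, hs):
            # hypothetical handler; the timeout is chosen arbitrarily
            self.response_cache = ResponseCache(hs, timeout_ms=30 * 1000)

        @defer.inlineCallbacks
        def handle_request(self, key):
            result = self.response_cache.get(key)
            if result is None:
                # set() wants a deferred whose callbacks run in the sentinel
                # logcontext, so wrap the work with run_in_background
                result = self.response_cache.set(
                    key,
                    logcontext.run_in_background(self._do_request_inner, key),
                )
            # neither get() nor set() follows the logcontext rules, so
            # make_deferred_yieldable the result before yielding on it
            response = yield logcontext.make_deferred_yieldable(result)
            defer.returnValue(response)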
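
The second patch boils down to one concurrency pattern: group the events by
room, handle each room's events strictly in order, and let the per-room
workers run concurrently. A standalone sketch of that pattern follows;
process_events_by_room and the handle_event argument are illustrative names,
not part of the patch:

    from twisted.internet import defer

    from synapse.util.logcontext import make_deferred_yieldable, run_in_background


    @defer.inlineCallbacks
    def process_events_by_room(events, handle_event):
        # group the events by room so that per-room ordering is preserved
        events_by_room = {}
        for event in events:
            events_by_room.setdefault(event.room_id, []).append(event)

        @defer.inlineCallbacks
        def handle_room_events(room_events):
            # events within a single room are handled strictly in order
            for event in room_events:
                yield handle_event(event)

        # run one worker per room concurrently; run_in_background starts each
        # worker off the current logcontext, and make_deferred_yieldable
        # restores the logcontext when we yield on the gathered result
        yield make_deferred_yieldable(defer.gatherResults([
            run_in_background(handle_room_events, room_events)
            for room_events in events_by_room.itervalues()
        ], consumeErrors=True))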