From 5e3b254dc810c5f17f635005253a977af65e3a53 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 11 May 2015 14:37:33 +0100 Subject: [PATCH 1/3] Use wait_for_events to implement 'get_events' --- synapse/notifier.py | 115 +++++++++++++------------------------------- 1 file changed, 33 insertions(+), 82 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 78eb28e4b2..e16a4608e9 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -305,14 +305,16 @@ class Notifier(object): ) @defer.inlineCallbacks - def wait_for_events(self, user, rooms, filter, timeout, callback): + def wait_for_events(self, user, rooms, timeout, callback, + from_token=StreamToken("s0", "0", "0")): """Wait until the callback returns a non empty response or the timeout fires. """ deferred = defer.Deferred() - - from_token = StreamToken("s0", "0", "0") + appservice = yield self.hs.get_datastore().get_app_service_by_user_id( + user.to_string() + ) listener = [_NotificationListener( user=user, @@ -321,6 +323,7 @@ class Notifier(object): limit=1, timeout=timeout, deferred=deferred, + appservice=appservice, )] if timeout: @@ -363,65 +366,43 @@ class Notifier(object): defer.returnValue(result) + @defer.inlineCallbacks def get_events_for(self, user, rooms, pagination_config, timeout): """ For the given user and rooms, return any new events for them. If there are no new events wait for up to `timeout` milliseconds for any new events to happen before returning. """ - deferred = defer.Deferred() - - self._get_events( - deferred, user, rooms, pagination_config.from_token, - pagination_config.limit, timeout - ).addErrback(deferred.errback) - - return deferred - - @defer.inlineCallbacks - def _get_events(self, deferred, user, rooms, from_token, limit, timeout): + from_token = pagination_config.from_token if not from_token: from_token = yield self.event_sources.get_current_token() - appservice = yield self.hs.get_datastore().get_app_service_by_user_id( - user.to_string() - ) + limit = pagination_config.limit - listener = _NotificationListener( - user, - rooms, - from_token, - limit, - timeout, - deferred, - appservice=appservice - ) - - def _timeout_listener(): - # TODO (erikj): We should probably set to_token to the current - # max rather than reusing from_token. - # Remove the timer from the listener so we don't try to cancel it. - listener.timer = None - listener.notify( - self, - [], - listener.from_token, - listener.from_token, - ) - - if timeout: - self._register_with_keys(listener) - - yield self._check_for_updates(listener) - - if not timeout: - _timeout_listener() - else: - # Only add the timer if the listener hasn't been notified - if not listener.notified(): - listener.timer = self.clock.call_later( - timeout/1000.0, _timeout_listener + @defer.inlineCallbacks + def check_for_updates(): + events = [] + end_token = from_token + for name, source in self.event_sources.sources.items(): + keyname = "%s_key" % name + stuff, new_key = yield source.get_new_events_for_user( + user, getattr(from_token, keyname), limit, ) - return + events.extend(stuff) + end_token = from_token.copy_and_replace(keyname, new_key) + + if events: + defer.returnValue((events, (from_token, end_token))) + else: + defer.returnValue(None) + + result = yield self.wait_for_events( + user, rooms, timeout, check_for_updates, from_token=from_token + ) + + if result is None: + result = ([], (from_token, from_token)) + + defer.returnValue(result) @log_function def _register_with_keys(self, listener): @@ -436,36 +417,6 @@ class Notifier(object): listener.appservice, set() ).add(listener) - @defer.inlineCallbacks - @log_function - def _check_for_updates(self, listener): - # TODO (erikj): We need to think about limits across multiple sources - events = [] - - from_token = listener.from_token - limit = listener.limit - - # TODO (erikj): DeferredList? - for name, source in self.event_sources.sources.items(): - keyname = "%s_key" % name - - stuff, new_key = yield source.get_new_events_for_user( - listener.user, - getattr(from_token, keyname), - limit, - ) - - events.extend(stuff) - - from_token = from_token.copy_and_replace(keyname, new_key) - - end_token = from_token - - if events: - listener.notify(self, events, listener.from_token, end_token) - - defer.returnValue(listener) - def _user_joined_room(self, user, room_id): new_listeners = self.user_to_listeners.get(user, set()) From e269c511f61100bfd96bb0201db320aa6d59925c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 11 May 2015 15:01:51 +0100 Subject: [PATCH 2/3] Don't bother passing the events to the notifier since it isn't using them --- synapse/notifier.py | 113 +++++++------------------------------------- 1 file changed, 18 insertions(+), 95 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index e16a4608e9..abe12b1434 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.async import run_on_reactor from synapse.types import StreamToken import synapse.metrics @@ -50,13 +51,9 @@ class _NotificationListener(object): so that it can remove itself from the indexes in the Notifier class. """ - def __init__(self, user, rooms, from_token, limit, timeout, deferred, - appservice=None): + def __init__(self, user, rooms, deferred, appservice=None): self.user = user self.appservice = appservice - self.from_token = from_token - self.limit = limit - self.timeout = timeout self.deferred = deferred self.rooms = rooms self.timer = None @@ -64,17 +61,14 @@ class _NotificationListener(object): def notified(self): return self.deferred.called - def notify(self, notifier, events, start_token, end_token): + def notify(self, notifier): """ Inform whoever is listening about the new events. This will also remove this listener from all the indexes in the Notifier it knows about. """ - result = (events, (start_token, end_token)) - try: - self.deferred.callback(result) - notified_events_counter.inc_by(len(events)) + self.deferred.callback(None) except defer.AlreadyCalledError: pass @@ -161,6 +155,7 @@ class Notifier(object): listening to the room, and any listeners for the users in the `extra_users` param. """ + yield run_on_reactor() # poke any interested application service. self.hs.get_handlers().appservice_handler.notify_interested_services( event @@ -168,8 +163,6 @@ class Notifier(object): room_id = event.room_id - room_source = self.event_sources.sources["room"] - room_listeners = self.room_to_listeners.get(room_id, set()) _discard_if_notified(room_listeners) @@ -200,34 +193,12 @@ class Notifier(object): logger.debug("on_new_room_event listeners %s", listeners) - # TODO (erikj): Can we make this more efficient by hitting the - # db once? - - @defer.inlineCallbacks - def notify(listener): - events, end_key = yield room_source.get_new_events_for_user( - listener.user, - listener.from_token.room_key, - listener.limit, - ) - - if events: - end_token = listener.from_token.copy_and_replace( - "room_key", end_key - ) - - listener.notify( - self, events, listener.from_token, end_token - ) - - def eb(failure): - logger.exception("Failed to notify listener", failure) - with PreserveLoggingContext(): - yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners], - consumeErrors=True, - ) + for listener in listeners: + try: + listener.notify(self) + except: + logger.exception("Failed to notify listener") @defer.inlineCallbacks @log_function @@ -237,11 +208,7 @@ class Notifier(object): Will wake up all listeners for the given users and rooms. """ - # TODO(paul): This is horrible, having to manually list every event - # source here individually - presence_source = self.event_sources.sources["presence"] - typing_source = self.event_sources.sources["typing"] - + yield run_on_reactor() listeners = set() for user in users: @@ -258,51 +225,12 @@ class Notifier(object): listeners |= room_listeners - @defer.inlineCallbacks - def notify(listener): - presence_events, presence_end_key = ( - yield presence_source.get_new_events_for_user( - listener.user, - listener.from_token.presence_key, - listener.limit, - ) - ) - typing_events, typing_end_key = ( - yield typing_source.get_new_events_for_user( - listener.user, - listener.from_token.typing_key, - listener.limit, - ) - ) - - if presence_events or typing_events: - end_token = listener.from_token.copy_and_replace( - "presence_key", presence_end_key - ).copy_and_replace( - "typing_key", typing_end_key - ) - - listener.notify( - self, - presence_events + typing_events, - listener.from_token, - end_token - ) - - def eb(failure): - logger.error( - "Failed to notify listener", - exc_info=( - failure.type, - failure.value, - failure.getTracebackObject()) - ) - with PreserveLoggingContext(): - yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners], - consumeErrors=True, - ) + for listener in listeners: + try: + listener.notify(self) + except: + logger.exception("Failed to notify listener") @defer.inlineCallbacks def wait_for_events(self, user, rooms, timeout, callback, @@ -319,9 +247,6 @@ class Notifier(object): listener = [_NotificationListener( user=user, rooms=rooms, - from_token=from_token, - limit=1, - timeout=timeout, deferred=deferred, appservice=appservice, )] @@ -338,7 +263,7 @@ class Notifier(object): def _timeout_listener(): timed_out[0] = True timer[0] = None - listener[0].notify(self, [], from_token, from_token) + listener[0].notify(self) # We create multiple notification listeners so we have to manage # canceling the timeout ourselves. @@ -350,10 +275,8 @@ class Notifier(object): listener[0] = _NotificationListener( user=user, rooms=rooms, - from_token=from_token, - limit=1, - timeout=timeout, deferred=deferred, + appservice=appservice, ) self._register_with_keys(listener[0]) result = yield callback() From 2551b6645d5d0855f72638d718ceaf365bbb5938 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 12 May 2015 11:54:18 +0100 Subject: [PATCH 3/3] Update the end_token correctly, otherwise the token doesn't advance and the client gets duplicate events --- synapse/notifier.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index abe12b1434..ef7d15671f 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -311,7 +311,7 @@ class Notifier(object): user, getattr(from_token, keyname), limit, ) events.extend(stuff) - end_token = from_token.copy_and_replace(keyname, new_key) + end_token = end_token.copy_and_replace(keyname, new_key) if events: defer.returnValue((events, (from_token, end_token)))