diff --git a/synapse/notifier.py b/synapse/notifier.py index 7282dfd7f3..4ebe1d66de 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -16,6 +16,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function +from synapse.util.async import run_on_reactor from synapse.types import StreamToken import synapse.metrics @@ -49,13 +50,9 @@ class _NotificationListener(object): so that it can remove itself from the indexes in the Notifier class. """ - def __init__(self, user, rooms, from_token, limit, timeout, deferred, - appservice=None): + def __init__(self, user, rooms, deferred, appservice=None): self.user = user self.appservice = appservice - self.from_token = from_token - self.limit = limit - self.timeout = timeout self.deferred = deferred self.rooms = rooms self.timer = None @@ -63,17 +60,14 @@ class _NotificationListener(object): def notified(self): return self.deferred.called - def notify(self, notifier, events, start_token, end_token): + def notify(self, notifier): """ Inform whoever is listening about the new events. This will also remove this listener from all the indexes in the Notifier it knows about. """ - result = (events, (start_token, end_token)) - try: - self.deferred.callback(result) - notified_events_counter.inc_by(len(events)) + self.deferred.callback(None) except defer.AlreadyCalledError: pass @@ -160,6 +154,7 @@ class Notifier(object): listening to the room, and any listeners for the users in the `extra_users` param. """ + yield run_on_reactor() # poke any interested application service. self.hs.get_handlers().appservice_handler.notify_interested_services( event @@ -167,8 +162,6 @@ class Notifier(object): room_id = event.room_id - room_source = self.event_sources.sources["room"] - room_listeners = self.room_to_listeners.get(room_id, set()) _discard_if_notified(room_listeners) @@ -199,33 +192,11 @@ class Notifier(object): logger.debug("on_new_room_event listeners %s", listeners) - # TODO (erikj): Can we make this more efficient by hitting the - # db once? - - @defer.inlineCallbacks - def notify(listener): - events, end_key = yield room_source.get_new_events_for_user( - listener.user, - listener.from_token.room_key, - listener.limit, - ) - - if events: - end_token = listener.from_token.copy_and_replace( - "room_key", end_key - ) - - listener.notify( - self, events, listener.from_token, end_token - ) - - def eb(failure): - logger.exception("Failed to notify listener", failure) - - yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners], - consumeErrors=True, - ) + for listener in listeners: + try: + listener.notify(self) + except: + logger.exception("Failed to notify listener") @defer.inlineCallbacks @log_function @@ -235,11 +206,7 @@ class Notifier(object): Will wake up all listeners for the given users and rooms. """ - # TODO(paul): This is horrible, having to manually list every event - # source here individually - presence_source = self.event_sources.sources["presence"] - typing_source = self.event_sources.sources["typing"] - + yield run_on_reactor() listeners = set() for user in users: @@ -256,68 +223,29 @@ class Notifier(object): listeners |= room_listeners - @defer.inlineCallbacks - def notify(listener): - presence_events, presence_end_key = ( - yield presence_source.get_new_events_for_user( - listener.user, - listener.from_token.presence_key, - listener.limit, - ) - ) - typing_events, typing_end_key = ( - yield typing_source.get_new_events_for_user( - listener.user, - listener.from_token.typing_key, - listener.limit, - ) - ) - - if presence_events or typing_events: - end_token = listener.from_token.copy_and_replace( - "presence_key", presence_end_key - ).copy_and_replace( - "typing_key", typing_end_key - ) - - listener.notify( - self, - presence_events + typing_events, - listener.from_token, - end_token - ) - - def eb(failure): - logger.error( - "Failed to notify listener", - exc_info=( - failure.type, - failure.value, - failure.getTracebackObject()) - ) - - yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners], - consumeErrors=True, - ) + for listener in listeners: + try: + listener.notify(self) + except: + logger.exception("Failed to notify listener") @defer.inlineCallbacks - def wait_for_events(self, user, rooms, filter, timeout, callback): + def wait_for_events(self, user, rooms, timeout, callback, + from_token=StreamToken("s0", "0", "0")): """Wait until the callback returns a non empty response or the timeout fires. """ deferred = defer.Deferred() - - from_token = StreamToken("s0", "0", "0") + appservice = yield self.hs.get_datastore().get_app_service_by_user_id( + user.to_string() + ) listener = [_NotificationListener( user=user, rooms=rooms, - from_token=from_token, - limit=1, - timeout=timeout, deferred=deferred, + appservice=appservice, )] if timeout: @@ -332,7 +260,7 @@ class Notifier(object): def _timeout_listener(): timed_out[0] = True timer[0] = None - listener[0].notify(self, [], from_token, from_token) + listener[0].notify(self) # We create multiple notification listeners so we have to manage # canceling the timeout ourselves. @@ -344,10 +272,8 @@ class Notifier(object): listener[0] = _NotificationListener( user=user, rooms=rooms, - from_token=from_token, - limit=1, - timeout=timeout, deferred=deferred, + appservice=appservice, ) self._register_with_keys(listener[0]) result = yield callback() @@ -360,65 +286,43 @@ class Notifier(object): defer.returnValue(result) + @defer.inlineCallbacks def get_events_for(self, user, rooms, pagination_config, timeout): """ For the given user and rooms, return any new events for them. If there are no new events wait for up to `timeout` milliseconds for any new events to happen before returning. """ - deferred = defer.Deferred() - - self._get_events( - deferred, user, rooms, pagination_config.from_token, - pagination_config.limit, timeout - ).addErrback(deferred.errback) - - return deferred - - @defer.inlineCallbacks - def _get_events(self, deferred, user, rooms, from_token, limit, timeout): + from_token = pagination_config.from_token if not from_token: from_token = yield self.event_sources.get_current_token() - appservice = yield self.hs.get_datastore().get_app_service_by_user_id( - user.to_string() - ) + limit = pagination_config.limit - listener = _NotificationListener( - user, - rooms, - from_token, - limit, - timeout, - deferred, - appservice=appservice - ) - - def _timeout_listener(): - # TODO (erikj): We should probably set to_token to the current - # max rather than reusing from_token. - # Remove the timer from the listener so we don't try to cancel it. - listener.timer = None - listener.notify( - self, - [], - listener.from_token, - listener.from_token, - ) - - if timeout: - self._register_with_keys(listener) - - yield self._check_for_updates(listener) - - if not timeout: - _timeout_listener() - else: - # Only add the timer if the listener hasn't been notified - if not listener.notified(): - listener.timer = self.clock.call_later( - timeout/1000.0, _timeout_listener + @defer.inlineCallbacks + def check_for_updates(): + events = [] + end_token = from_token + for name, source in self.event_sources.sources.items(): + keyname = "%s_key" % name + stuff, new_key = yield source.get_new_events_for_user( + user, getattr(from_token, keyname), limit, ) - return + events.extend(stuff) + end_token = end_token.copy_and_replace(keyname, new_key) + + if events: + defer.returnValue((events, (from_token, end_token))) + else: + defer.returnValue(None) + + result = yield self.wait_for_events( + user, rooms, timeout, check_for_updates, from_token=from_token + ) + + if result is None: + result = ([], (from_token, from_token)) + + defer.returnValue(result) @log_function def _register_with_keys(self, listener): @@ -433,36 +337,6 @@ class Notifier(object): listener.appservice, set() ).add(listener) - @defer.inlineCallbacks - @log_function - def _check_for_updates(self, listener): - # TODO (erikj): We need to think about limits across multiple sources - events = [] - - from_token = listener.from_token - limit = listener.limit - - # TODO (erikj): DeferredList? - for name, source in self.event_sources.sources.items(): - keyname = "%s_key" % name - - stuff, new_key = yield source.get_new_events_for_user( - listener.user, - getattr(from_token, keyname), - limit, - ) - - events.extend(stuff) - - from_token = from_token.copy_and_replace(keyname, new_key) - - end_token = from_token - - if events: - listener.notify(self, events, listener.from_token, end_token) - - defer.returnValue(listener) - def _user_joined_room(self, user, room_id): new_listeners = self.user_to_listeners.get(user, set())