diff --git a/synapse/notifier.py b/synapse/notifier.py index 744cbddfa3..88f531182a 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -41,6 +41,7 @@ import attr from prometheus_client import Counter from twisted.internet import defer +from twisted.internet.defer import Deferred from synapse.api.constants import EduTypes, EventTypes, HistoryVisibility, Membership from synapse.api.errors import AuthError @@ -52,6 +53,7 @@ from synapse.logging.opentracing import log_kv, start_active_span from synapse.metrics import LaterGauge from synapse.streams.config import PaginationConfig from synapse.types import ( + ISynapseReactor, JsonDict, MultiWriterStreamToken, PersistedEventPosition, @@ -61,7 +63,9 @@ from synapse.types import ( StreamToken, UserID, ) -from synapse.util.async_helpers import ObservableDeferred, timeout_deferred +from synapse.util.async_helpers import ( + timeout_deferred, +) from synapse.util.metrics import Measure from synapse.util.stringutils import shortstr from synapse.visibility import filter_events_for_client @@ -90,18 +94,6 @@ def count(func: Callable[[T], bool], it: Iterable[T]) -> int: return n -class _NotificationListener: - """This represents a single client connection to the events stream. - The events stream handler will have yielded to the deferred, so to - notify the handler it is sufficient to resolve the deferred. - """ - - __slots__ = ["deferred"] - - def __init__(self, deferred: "defer.Deferred"): - self.deferred = deferred - - class _NotifierUserStream: """This represents a user connected to the event stream. It tracks the most recent stream token for that user. @@ -114,11 +106,13 @@ class _NotifierUserStream: def __init__( self, + reactor: ISynapseReactor, user_id: str, rooms: StrCollection, current_token: StreamToken, time_now_ms: int, ): + self.reactor = reactor self.user_id = user_id self.rooms = set(rooms) @@ -130,28 +124,31 @@ class _NotifierUserStream: self.current_token = current_token self.last_notified_ms = time_now_ms - self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred( - defer.Deferred() - ) + # Set of listeners that we need to wake up when there has been a change. + self.listeners: Set[Deferred[StreamToken]] = set() - def notify( + def update_and_fetch_deferreds( self, current_token: StreamToken, time_now_ms: int, - ) -> None: - """Notify any listeners for this user of a new event from an - event source. + ) -> Collection["Deferred[StreamToken]"]: + """Update the stream for this user because of a new event from an + event source, and return the set of deferreds to wake up. + Args: current_token: The new current token. time_now_ms: The current time in milliseconds. + + Returns: + The set of deferreds that need to be called. """ self.current_token = current_token self.last_notified_ms = time_now_ms - notify_deferred = self.notify_deferred - with PreserveLoggingContext(): - self.notify_deferred = ObservableDeferred(defer.Deferred()) - notify_deferred.callback(self.current_token) + listeners = self.listeners + self.listeners = set() + + return listeners def remove(self, notifier: "Notifier") -> None: """Remove this listener from all the indexes in the Notifier @@ -165,9 +162,9 @@ class _NotifierUserStream: notifier.user_to_user_stream.pop(self.user_id) def count_listeners(self) -> int: - return len(self.notify_deferred.observers()) + return len(self.listeners) - def new_listener(self, token: StreamToken) -> _NotificationListener: + def new_listener(self, token: StreamToken) -> "Deferred[StreamToken]": """Returns a deferred that is resolved when there is a new token greater than the given token. @@ -177,10 +174,17 @@ class _NotifierUserStream: """ # Immediately wake up stream if something has already since happened # since their last token. - if self.current_token != token: - return _NotificationListener(defer.succeed(self.current_token)) - else: - return _NotificationListener(self.notify_deferred.observe()) + if token != self.current_token: + return defer.succeed(self.current_token) + + # Create a new deferred and add it to the set of listeners. We add a + # cancel handler to remove it from the set again, to handle timeouts. + deferred: "Deferred[StreamToken]" = Deferred( + canceller=lambda d: self.listeners.discard(d) + ) + self.listeners.add(deferred) + + return deferred @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -233,6 +237,7 @@ class Notifier: # List of callbacks to be notified when a lock is released self._lock_released_callback: List[Callable[[str, str, str], None]] = [] + self.reactor = hs.get_reactor() self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() self._pusher_pool = hs.get_pusherpool() @@ -329,12 +334,20 @@ class Notifier: user_streams = self.room_to_user_streams.get(room_id, set()) time_now_ms = self.clock.time_msec() current_token = self.event_sources.get_current_token() + + listeners: List["Deferred[StreamToken]"] = [] for user_stream in user_streams: try: - user_stream.notify(current_token, time_now_ms) + listeners.extend( + user_stream.update_and_fetch_deferreds(current_token, time_now_ms) + ) except Exception: logger.exception("Failed to notify listener") + with PreserveLoggingContext(): + for listener in listeners: + listener.callback(current_token) + users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc( len(user_streams) ) @@ -538,12 +551,24 @@ class Notifier: time_now_ms = self.clock.time_msec() current_token = self.event_sources.get_current_token() + listeners: List["Deferred[StreamToken]"] = [] for user_stream in user_streams: try: - user_stream.notify(current_token, time_now_ms) + listeners.extend( + user_stream.update_and_fetch_deferreds( + current_token, time_now_ms + ) + ) except Exception: logger.exception("Failed to notify listener") + # We resolve all these deferreds in one go so that we only need to + # call `PreserveLoggingContext` once, as it has a bunch of overhead + # (to calculate performance stats) + with PreserveLoggingContext(): + for listener in listeners: + listener.callback(current_token) + users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams)) self.notify_replication() @@ -582,6 +607,7 @@ class Notifier: if room_ids is None: room_ids = await self.store.get_rooms_for_user(user_id) user_stream = _NotifierUserStream( + reactor=self.reactor, user_id=user_id, rooms=room_ids, current_token=current_token, @@ -604,8 +630,8 @@ class Notifier: # Now we wait for the _NotifierUserStream to be told there # is a new token. listener = user_stream.new_listener(prev_token) - listener.deferred = timeout_deferred( - listener.deferred, + listener = timeout_deferred( + listener, (end_time - now) / 1000.0, self.hs.get_reactor(), ) @@ -618,7 +644,7 @@ class Notifier: ) with PreserveLoggingContext(): - await listener.deferred + await listener log_kv( {