Optimise notifier mk2 (#17766)
Based on #17765. Basically the idea is to reduce the overhead of calling `ObservableDeferred` in a loop. The two gains are: a) just using a list of deferreds rather than the machinery of `ObservableDeferred`, and b) only calling `PreseverLoggingContext` once. `PreseverLoggingContext` in particular is expensive to call a lot as each time it needs to call `get_thread_resource_usage` twice, so that it an update the CPU metrics of the log context.
This commit is contained in:
parent
602956ef64
commit
ae4862c38f
|
@ -0,0 +1 @@
|
||||||
|
Increase performance of the notifier when there are many syncing users.
|
|
@ -41,6 +41,7 @@ import attr
|
||||||
from prometheus_client import Counter
|
from prometheus_client import Counter
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
from twisted.internet.defer import Deferred
|
||||||
|
|
||||||
from synapse.api.constants import EduTypes, EventTypes, HistoryVisibility, Membership
|
from synapse.api.constants import EduTypes, EventTypes, HistoryVisibility, Membership
|
||||||
from synapse.api.errors import AuthError
|
from synapse.api.errors import AuthError
|
||||||
|
@ -52,6 +53,7 @@ from synapse.logging.opentracing import log_kv, start_active_span
|
||||||
from synapse.metrics import LaterGauge
|
from synapse.metrics import LaterGauge
|
||||||
from synapse.streams.config import PaginationConfig
|
from synapse.streams.config import PaginationConfig
|
||||||
from synapse.types import (
|
from synapse.types import (
|
||||||
|
ISynapseReactor,
|
||||||
JsonDict,
|
JsonDict,
|
||||||
MultiWriterStreamToken,
|
MultiWriterStreamToken,
|
||||||
PersistedEventPosition,
|
PersistedEventPosition,
|
||||||
|
@ -61,7 +63,9 @@ from synapse.types import (
|
||||||
StreamToken,
|
StreamToken,
|
||||||
UserID,
|
UserID,
|
||||||
)
|
)
|
||||||
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
|
from synapse.util.async_helpers import (
|
||||||
|
timeout_deferred,
|
||||||
|
)
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
from synapse.util.stringutils import shortstr
|
from synapse.util.stringutils import shortstr
|
||||||
from synapse.visibility import filter_events_for_client
|
from synapse.visibility import filter_events_for_client
|
||||||
|
@ -90,18 +94,6 @@ def count(func: Callable[[T], bool], it: Iterable[T]) -> int:
|
||||||
return n
|
return n
|
||||||
|
|
||||||
|
|
||||||
class _NotificationListener:
|
|
||||||
"""This represents a single client connection to the events stream.
|
|
||||||
The events stream handler will have yielded to the deferred, so to
|
|
||||||
notify the handler it is sufficient to resolve the deferred.
|
|
||||||
"""
|
|
||||||
|
|
||||||
__slots__ = ["deferred"]
|
|
||||||
|
|
||||||
def __init__(self, deferred: "defer.Deferred"):
|
|
||||||
self.deferred = deferred
|
|
||||||
|
|
||||||
|
|
||||||
class _NotifierUserStream:
|
class _NotifierUserStream:
|
||||||
"""This represents a user connected to the event stream.
|
"""This represents a user connected to the event stream.
|
||||||
It tracks the most recent stream token for that user.
|
It tracks the most recent stream token for that user.
|
||||||
|
@ -114,11 +106,13 @@ class _NotifierUserStream:
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
|
reactor: ISynapseReactor,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
rooms: StrCollection,
|
rooms: StrCollection,
|
||||||
current_token: StreamToken,
|
current_token: StreamToken,
|
||||||
time_now_ms: int,
|
time_now_ms: int,
|
||||||
):
|
):
|
||||||
|
self.reactor = reactor
|
||||||
self.user_id = user_id
|
self.user_id = user_id
|
||||||
self.rooms = set(rooms)
|
self.rooms = set(rooms)
|
||||||
|
|
||||||
|
@ -130,28 +124,31 @@ class _NotifierUserStream:
|
||||||
self.current_token = current_token
|
self.current_token = current_token
|
||||||
self.last_notified_ms = time_now_ms
|
self.last_notified_ms = time_now_ms
|
||||||
|
|
||||||
self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred(
|
# Set of listeners that we need to wake up when there has been a change.
|
||||||
defer.Deferred()
|
self.listeners: Set[Deferred[StreamToken]] = set()
|
||||||
)
|
|
||||||
|
|
||||||
def notify(
|
def update_and_fetch_deferreds(
|
||||||
self,
|
self,
|
||||||
current_token: StreamToken,
|
current_token: StreamToken,
|
||||||
time_now_ms: int,
|
time_now_ms: int,
|
||||||
) -> None:
|
) -> Collection["Deferred[StreamToken]"]:
|
||||||
"""Notify any listeners for this user of a new event from an
|
"""Update the stream for this user because of a new event from an
|
||||||
event source.
|
event source, and return the set of deferreds to wake up.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
current_token: The new current token.
|
current_token: The new current token.
|
||||||
time_now_ms: The current time in milliseconds.
|
time_now_ms: The current time in milliseconds.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The set of deferreds that need to be called.
|
||||||
"""
|
"""
|
||||||
self.current_token = current_token
|
self.current_token = current_token
|
||||||
self.last_notified_ms = time_now_ms
|
self.last_notified_ms = time_now_ms
|
||||||
notify_deferred = self.notify_deferred
|
|
||||||
|
|
||||||
with PreserveLoggingContext():
|
listeners = self.listeners
|
||||||
self.notify_deferred = ObservableDeferred(defer.Deferred())
|
self.listeners = set()
|
||||||
notify_deferred.callback(self.current_token)
|
|
||||||
|
return listeners
|
||||||
|
|
||||||
def remove(self, notifier: "Notifier") -> None:
|
def remove(self, notifier: "Notifier") -> None:
|
||||||
"""Remove this listener from all the indexes in the Notifier
|
"""Remove this listener from all the indexes in the Notifier
|
||||||
|
@ -165,9 +162,9 @@ class _NotifierUserStream:
|
||||||
notifier.user_to_user_stream.pop(self.user_id)
|
notifier.user_to_user_stream.pop(self.user_id)
|
||||||
|
|
||||||
def count_listeners(self) -> int:
|
def count_listeners(self) -> int:
|
||||||
return len(self.notify_deferred.observers())
|
return len(self.listeners)
|
||||||
|
|
||||||
def new_listener(self, token: StreamToken) -> _NotificationListener:
|
def new_listener(self, token: StreamToken) -> "Deferred[StreamToken]":
|
||||||
"""Returns a deferred that is resolved when there is a new token
|
"""Returns a deferred that is resolved when there is a new token
|
||||||
greater than the given token.
|
greater than the given token.
|
||||||
|
|
||||||
|
@ -177,10 +174,17 @@ class _NotifierUserStream:
|
||||||
"""
|
"""
|
||||||
# Immediately wake up stream if something has already since happened
|
# Immediately wake up stream if something has already since happened
|
||||||
# since their last token.
|
# since their last token.
|
||||||
if self.current_token != token:
|
if token != self.current_token:
|
||||||
return _NotificationListener(defer.succeed(self.current_token))
|
return defer.succeed(self.current_token)
|
||||||
else:
|
|
||||||
return _NotificationListener(self.notify_deferred.observe())
|
# Create a new deferred and add it to the set of listeners. We add a
|
||||||
|
# cancel handler to remove it from the set again, to handle timeouts.
|
||||||
|
deferred: "Deferred[StreamToken]" = Deferred(
|
||||||
|
canceller=lambda d: self.listeners.discard(d)
|
||||||
|
)
|
||||||
|
self.listeners.add(deferred)
|
||||||
|
|
||||||
|
return deferred
|
||||||
|
|
||||||
|
|
||||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||||
|
@ -233,6 +237,7 @@ class Notifier:
|
||||||
# List of callbacks to be notified when a lock is released
|
# List of callbacks to be notified when a lock is released
|
||||||
self._lock_released_callback: List[Callable[[str, str, str], None]] = []
|
self._lock_released_callback: List[Callable[[str, str, str], None]] = []
|
||||||
|
|
||||||
|
self.reactor = hs.get_reactor()
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.appservice_handler = hs.get_application_service_handler()
|
self.appservice_handler = hs.get_application_service_handler()
|
||||||
self._pusher_pool = hs.get_pusherpool()
|
self._pusher_pool = hs.get_pusherpool()
|
||||||
|
@ -329,12 +334,20 @@ class Notifier:
|
||||||
user_streams = self.room_to_user_streams.get(room_id, set())
|
user_streams = self.room_to_user_streams.get(room_id, set())
|
||||||
time_now_ms = self.clock.time_msec()
|
time_now_ms = self.clock.time_msec()
|
||||||
current_token = self.event_sources.get_current_token()
|
current_token = self.event_sources.get_current_token()
|
||||||
|
|
||||||
|
listeners: List["Deferred[StreamToken]"] = []
|
||||||
for user_stream in user_streams:
|
for user_stream in user_streams:
|
||||||
try:
|
try:
|
||||||
user_stream.notify(current_token, time_now_ms)
|
listeners.extend(
|
||||||
|
user_stream.update_and_fetch_deferreds(current_token, time_now_ms)
|
||||||
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to notify listener")
|
logger.exception("Failed to notify listener")
|
||||||
|
|
||||||
|
with PreserveLoggingContext():
|
||||||
|
for listener in listeners:
|
||||||
|
listener.callback(current_token)
|
||||||
|
|
||||||
users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc(
|
users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc(
|
||||||
len(user_streams)
|
len(user_streams)
|
||||||
)
|
)
|
||||||
|
@ -538,12 +551,24 @@ class Notifier:
|
||||||
|
|
||||||
time_now_ms = self.clock.time_msec()
|
time_now_ms = self.clock.time_msec()
|
||||||
current_token = self.event_sources.get_current_token()
|
current_token = self.event_sources.get_current_token()
|
||||||
|
listeners: List["Deferred[StreamToken]"] = []
|
||||||
for user_stream in user_streams:
|
for user_stream in user_streams:
|
||||||
try:
|
try:
|
||||||
user_stream.notify(current_token, time_now_ms)
|
listeners.extend(
|
||||||
|
user_stream.update_and_fetch_deferreds(
|
||||||
|
current_token, time_now_ms
|
||||||
|
)
|
||||||
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to notify listener")
|
logger.exception("Failed to notify listener")
|
||||||
|
|
||||||
|
# We resolve all these deferreds in one go so that we only need to
|
||||||
|
# call `PreserveLoggingContext` once, as it has a bunch of overhead
|
||||||
|
# (to calculate performance stats)
|
||||||
|
with PreserveLoggingContext():
|
||||||
|
for listener in listeners:
|
||||||
|
listener.callback(current_token)
|
||||||
|
|
||||||
users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
|
users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
|
||||||
|
|
||||||
self.notify_replication()
|
self.notify_replication()
|
||||||
|
@ -582,6 +607,7 @@ class Notifier:
|
||||||
if room_ids is None:
|
if room_ids is None:
|
||||||
room_ids = await self.store.get_rooms_for_user(user_id)
|
room_ids = await self.store.get_rooms_for_user(user_id)
|
||||||
user_stream = _NotifierUserStream(
|
user_stream = _NotifierUserStream(
|
||||||
|
reactor=self.reactor,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
rooms=room_ids,
|
rooms=room_ids,
|
||||||
current_token=current_token,
|
current_token=current_token,
|
||||||
|
@ -604,8 +630,8 @@ class Notifier:
|
||||||
# Now we wait for the _NotifierUserStream to be told there
|
# Now we wait for the _NotifierUserStream to be told there
|
||||||
# is a new token.
|
# is a new token.
|
||||||
listener = user_stream.new_listener(prev_token)
|
listener = user_stream.new_listener(prev_token)
|
||||||
listener.deferred = timeout_deferred(
|
listener = timeout_deferred(
|
||||||
listener.deferred,
|
listener,
|
||||||
(end_time - now) / 1000.0,
|
(end_time - now) / 1000.0,
|
||||||
self.hs.get_reactor(),
|
self.hs.get_reactor(),
|
||||||
)
|
)
|
||||||
|
@ -618,7 +644,7 @@ class Notifier:
|
||||||
)
|
)
|
||||||
|
|
||||||
with PreserveLoggingContext():
|
with PreserveLoggingContext():
|
||||||
await listener.deferred
|
await listener
|
||||||
|
|
||||||
log_kv(
|
log_kv(
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue