Use callbacks to notify tcp replication rather than deferreds
This commit is contained in:
parent
36d2b66f90
commit
1df7c28661
|
@ -163,6 +163,8 @@ class Notifier(object):
|
||||||
self.store = hs.get_datastore()
|
self.store = hs.get_datastore()
|
||||||
self.pending_new_room_events = []
|
self.pending_new_room_events = []
|
||||||
|
|
||||||
|
self.replication_callbacks = []
|
||||||
|
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.appservice_handler = hs.get_application_service_handler()
|
self.appservice_handler = hs.get_application_service_handler()
|
||||||
|
|
||||||
|
@ -202,6 +204,12 @@ class Notifier(object):
|
||||||
lambda: len(self.user_to_user_stream),
|
lambda: len(self.user_to_user_stream),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def add_replication_callback(self, cb):
|
||||||
|
"""Add a callback that will be called when some new data is available.
|
||||||
|
Callback is not given any arguments.
|
||||||
|
"""
|
||||||
|
self.replication_callbacks.append(cb)
|
||||||
|
|
||||||
@preserve_fn
|
@preserve_fn
|
||||||
def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
|
def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
|
||||||
extra_users=[]):
|
extra_users=[]):
|
||||||
|
@ -510,6 +518,9 @@ class Notifier(object):
|
||||||
self.replication_deferred = ObservableDeferred(defer.Deferred())
|
self.replication_deferred = ObservableDeferred(defer.Deferred())
|
||||||
deferred.callback(None)
|
deferred.callback(None)
|
||||||
|
|
||||||
|
for cb in self.replication_callbacks:
|
||||||
|
preserve_fn(cb)()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def wait_for_replication(self, callback, timeout):
|
def wait_for_replication(self, callback, timeout):
|
||||||
"""Wait for an event to happen.
|
"""Wait for an event to happen.
|
||||||
|
@ -550,9 +561,3 @@ class Notifier(object):
|
||||||
break
|
break
|
||||||
|
|
||||||
defer.returnValue(result)
|
defer.returnValue(result)
|
||||||
|
|
||||||
def wait_once_for_replication(self):
|
|
||||||
"""Returns a deferred which resolves when there is new data for
|
|
||||||
replication to handle.
|
|
||||||
"""
|
|
||||||
return self.replication_deferred.observe()
|
|
||||||
|
|
|
@ -21,7 +21,6 @@ from twisted.internet.protocol import Factory
|
||||||
from streams import STREAMS_MAP, FederationStream
|
from streams import STREAMS_MAP, FederationStream
|
||||||
from protocol import ServerReplicationStreamProtocol
|
from protocol import ServerReplicationStreamProtocol
|
||||||
|
|
||||||
from synapse.util.logcontext import preserve_fn
|
|
||||||
from synapse.util.metrics import Measure, measure_func
|
from synapse.util.metrics import Measure, measure_func
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
@ -66,7 +65,6 @@ class ReplicationStreamer(object):
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
self.store = hs.get_datastore()
|
self.store = hs.get_datastore()
|
||||||
self.notifier = hs.get_notifier()
|
|
||||||
self.presence_handler = hs.get_presence_handler()
|
self.presence_handler = hs.get_presence_handler()
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
|
|
||||||
|
@ -101,8 +99,7 @@ class ReplicationStreamer(object):
|
||||||
if not hs.config.send_federation:
|
if not hs.config.send_federation:
|
||||||
self.federation_sender = hs.get_federation_sender()
|
self.federation_sender = hs.get_federation_sender()
|
||||||
|
|
||||||
# Start listening for updates from the notifier
|
hs.get_notifier().add_replication_callback(self.on_notifier_poke)
|
||||||
preserve_fn(self.notifier_listener)()
|
|
||||||
|
|
||||||
# Keeps track of whether we are currently checking for updates
|
# Keeps track of whether we are currently checking for updates
|
||||||
self.is_looping = False
|
self.is_looping = False
|
||||||
|
@ -115,16 +112,6 @@ class ReplicationStreamer(object):
|
||||||
for conn in self.connections:
|
for conn in self.connections:
|
||||||
conn.send_error("server shutting down")
|
conn.send_error("server shutting down")
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def notifier_listener(self):
|
|
||||||
"""Sits forever looping on the notifier waiting for new data.
|
|
||||||
"""
|
|
||||||
while True:
|
|
||||||
yield self.notifier.wait_once_for_replication()
|
|
||||||
logger.debug("Woken up by notifier")
|
|
||||||
# We need to call this each time we get woken up, as per docstring
|
|
||||||
preserve_fn(self.on_notifier_poke)()
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_notifier_poke(self):
|
def on_notifier_poke(self):
|
||||||
"""Checks if there is actually any new data and sends it to the
|
"""Checks if there is actually any new data and sends it to the
|
||||||
|
|
Loading…
Reference in New Issue