Run the AS senders as background processes (#4189)

This should fix some "Starting db connection from sentinel context" warnings,
and will mean we get metrics for these processes.
This commit is contained in:
Richard van der Hoff 2018-12-04 10:53:49 +01:00 committed by GitHub
parent c03324294d
commit 52e87fbfbe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 26 additions and 12 deletions

2
changelog.d/4189.misc Normal file
View File

@ -0,0 +1,2 @@
Run the AS senders as background processes to fix warnings

View File

@ -53,8 +53,8 @@ import logging
from twisted.internet import defer from twisted.internet import defer
from synapse.appservice import ApplicationServiceState from synapse.appservice import ApplicationServiceState
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import run_in_background from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -104,14 +104,23 @@ class _ServiceQueuer(object):
self.clock = clock self.clock = clock
def enqueue(self, service, event): def enqueue(self, service, event):
# if this service isn't being sent something
self.queued_events.setdefault(service.id, []).append(event) self.queued_events.setdefault(service.id, []).append(event)
run_in_background(self._send_request, service)
# start a sender for this appservice if we don't already have one
if service.id in self.requests_in_flight:
return
run_as_background_process(
"as-sender-%s" % (service.id, ),
self._send_request, service,
)
@defer.inlineCallbacks @defer.inlineCallbacks
def _send_request(self, service): def _send_request(self, service):
if service.id in self.requests_in_flight: # sanity-check: we shouldn't get here if this service already has a sender
return # running.
assert(service.id not in self.requests_in_flight)
self.requests_in_flight.add(service.id) self.requests_in_flight.add(service.id)
try: try:
@ -119,8 +128,6 @@ class _ServiceQueuer(object):
events = self.queued_events.pop(service.id, []) events = self.queued_events.pop(service.id, [])
if not events: if not events:
return return
with Measure(self.clock, "servicequeuer.send"):
try: try:
yield self.txn_ctrl.send(service, events) yield self.txn_ctrl.send(service, events)
except Exception: except Exception:
@ -223,7 +230,12 @@ class _Recoverer(object):
self.backoff_counter = 1 self.backoff_counter = 1
def recover(self): def recover(self):
self.clock.call_later((2 ** self.backoff_counter), self.retry) def _retry():
run_as_background_process(
"as-recoverer-%s" % (self.service.id,),
self.retry,
)
self.clock.call_later((2 ** self.backoff_counter), _retry)
def _backoff(self): def _backoff(self):
# cap the backoff to be around 8.5min => (2^9) = 512 secs # cap the backoff to be around 8.5min => (2^9) = 512 secs