Fix logcontexts for running pushers
First of all, avoid resetting the logcontext before running the pushers, to fix the "Starting db txn 'get_all_updated_receipts' from sentinel context" warning. Instead, give them their own "background process" logcontexts.
This commit is contained in:
parent
b4d6db5c4a
commit
66f7dc8c87
|
@ -162,11 +162,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
|
|||
else:
|
||||
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
|
||||
elif stream_name == "events":
|
||||
yield self.pusher_pool.on_new_notifications(
|
||||
self.pusher_pool.on_new_notifications(
|
||||
token, token,
|
||||
)
|
||||
elif stream_name == "receipts":
|
||||
yield self.pusher_pool.on_new_receipts(
|
||||
self.pusher_pool.on_new_receipts(
|
||||
token, token, set(row.room_id for row in rows)
|
||||
)
|
||||
except Exception:
|
||||
|
|
|
@ -2386,8 +2386,7 @@ class FederationHandler(BaseHandler):
|
|||
extra_users=extra_users
|
||||
)
|
||||
|
||||
logcontext.run_in_background(
|
||||
self.pusher_pool.on_new_notifications,
|
||||
self.pusher_pool.on_new_notifications(
|
||||
event_stream_id, max_stream_id,
|
||||
)
|
||||
|
||||
|
|
|
@ -774,11 +774,8 @@ class EventCreationHandler(object):
|
|||
event, context=context
|
||||
)
|
||||
|
||||
# this intentionally does not yield: we don't care about the result
|
||||
# and don't need to wait for it.
|
||||
run_in_background(
|
||||
self.pusher_pool.on_new_notifications,
|
||||
event_stream_id, max_stream_id
|
||||
self.pusher_pool.on_new_notifications(
|
||||
event_stream_id, max_stream_id,
|
||||
)
|
||||
|
||||
def _notify():
|
||||
|
|
|
@ -18,7 +18,6 @@ from twisted.internet import defer
|
|||
|
||||
from synapse.types import get_domain_from_id
|
||||
from synapse.util import logcontext
|
||||
from synapse.util.logcontext import PreserveLoggingContext
|
||||
|
||||
from ._base import BaseHandler
|
||||
|
||||
|
@ -116,16 +115,15 @@ class ReceiptsHandler(BaseHandler):
|
|||
|
||||
affected_room_ids = list(set([r["room_id"] for r in receipts]))
|
||||
|
||||
with PreserveLoggingContext():
|
||||
self.notifier.on_new_event(
|
||||
"receipt_key", max_batch_id, rooms=affected_room_ids
|
||||
)
|
||||
# Note that the min here shouldn't be relied upon to be accurate.
|
||||
self.hs.get_pusherpool().on_new_receipts(
|
||||
min_batch_id, max_batch_id, affected_room_ids
|
||||
)
|
||||
self.notifier.on_new_event(
|
||||
"receipt_key", max_batch_id, rooms=affected_room_ids
|
||||
)
|
||||
# Note that the min here shouldn't be relied upon to be accurate.
|
||||
self.hs.get_pusherpool().on_new_receipts(
|
||||
min_batch_id, max_batch_id, affected_room_ids,
|
||||
)
|
||||
|
||||
defer.returnValue(True)
|
||||
defer.returnValue(True)
|
||||
|
||||
@logcontext.preserve_fn # caller should not yield on this
|
||||
@defer.inlineCallbacks
|
||||
|
|
|
@ -18,6 +18,7 @@ import logging
|
|||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.push.pusher import PusherFactory
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
|
||||
|
@ -122,8 +123,14 @@ class PusherPool:
|
|||
p['app_id'], p['pushkey'], p['user_name'],
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_new_notifications(self, min_stream_id, max_stream_id):
|
||||
run_as_background_process(
|
||||
"on_new_notifications",
|
||||
self._on_new_notifications, min_stream_id, max_stream_id,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _on_new_notifications(self, min_stream_id, max_stream_id):
|
||||
try:
|
||||
users_affected = yield self.store.get_push_action_users_in_range(
|
||||
min_stream_id, max_stream_id
|
||||
|
@ -147,8 +154,14 @@ class PusherPool:
|
|||
except Exception:
|
||||
logger.exception("Exception in pusher on_new_notifications")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
|
||||
run_as_background_process(
|
||||
"on_new_receipts",
|
||||
self._on_new_receipts, min_stream_id, max_stream_id, affected_room_ids,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
|
||||
try:
|
||||
# Need to subtract 1 from the minimum because the lower bound here
|
||||
# is not inclusive
|
||||
|
|
Loading…
Reference in New Issue