Merge pull request #3591 from matrix-org/rav/logcontext_fixes

Logcontext fixes
This commit is contained in:
Richard van der Hoff 2018-07-24 11:11:27 +01:00 committed by GitHub
commit 69292de6cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 30 additions and 21 deletions

1
changelog.d/3591.misc Normal file
View File

@ -0,0 +1 @@
Fix some random logcontext leaks.

View File

@ -23,6 +23,7 @@ from twisted.internet import defer
import synapse import synapse
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
@ -106,7 +107,9 @@ class ApplicationServicesHandler(object):
yield self._check_user_exists(event.state_key) yield self._check_user_exists(event.state_key)
if not self.started_scheduler: if not self.started_scheduler:
self.scheduler.start().addErrback(log_failure) def start_scheduler():
return self.scheduler.start().addErrback(log_failure)
run_as_background_process("as_scheduler", start_scheduler)
self.started_scheduler = True self.started_scheduler = True
# Fork off pushes to these services # Fork off pushes to these services

View File

@ -148,13 +148,15 @@ class InitialSyncHandler(BaseHandler):
try: try:
if event.membership == Membership.JOIN: if event.membership == Membership.JOIN:
room_end_token = now_token.room_key room_end_token = now_token.room_key
deferred_room_state = self.state_handler.get_current_state( deferred_room_state = run_in_background(
event.room_id self.state_handler.get_current_state,
event.room_id,
) )
elif event.membership == Membership.LEAVE: elif event.membership == Membership.LEAVE:
room_end_token = "s%d" % (event.stream_ordering,) room_end_token = "s%d" % (event.stream_ordering,)
deferred_room_state = self.store.get_state_for_events( deferred_room_state = run_in_background(
[event.event_id], None self.store.get_state_for_events,
[event.event_id], None,
) )
deferred_room_state.addCallback( deferred_room_state.addCallback(
lambda states: states[event.event_id] lambda states: states[event.event_id]
@ -387,7 +389,8 @@ class InitialSyncHandler(BaseHandler):
receipts = [] receipts = []
defer.returnValue(receipts) defer.returnValue(receipts)
presence, receipts, (messages, token) = yield defer.gatherResults( presence, receipts, (messages, token) = yield make_deferred_yieldable(
defer.gatherResults(
[ [
run_in_background(get_presence), run_in_background(get_presence),
run_in_background(get_receipts), run_in_background(get_receipts),
@ -399,7 +402,8 @@ class InitialSyncHandler(BaseHandler):
) )
], ],
consumeErrors=True, consumeErrors=True,
).addErrback(unwrapFirstError) ).addErrback(unwrapFirstError),
)
messages = yield filter_events_for_client( messages = yield filter_events_for_client(
self.store, user_id, messages, is_peeking=is_peeking, self.store, user_id, messages, is_peeking=is_peeking,

View File

@ -39,7 +39,7 @@ from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util.async import ObservableDeferred from synapse.util.async import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import make_deferred_yieldable from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
@ -147,6 +147,7 @@ class _EventPeristenceQueue(object):
# callbacks on the deferred. # callbacks on the deferred.
try: try:
ret = yield per_item_callback(item) ret = yield per_item_callback(item)
with PreserveLoggingContext():
item.deferred.callback(ret) item.deferred.callback(ret)
except Exception: except Exception:
item.deferred.errback() item.deferred.errback()

View File

@ -233,7 +233,7 @@ class PusherStore(PusherWorkerStore):
) )
if newly_inserted: if newly_inserted:
self.runInteraction( yield self.runInteraction(
"add_pusher", "add_pusher",
self._invalidate_cache_and_stream, self._invalidate_cache_and_stream,
self.get_if_user_has_pusher, (user_id,) self.get_if_user_has_pusher, (user_id,)