parent
f559119de0
commit
8dff6e0322
|
@ -387,19 +387,21 @@ class InitialSyncHandler(BaseHandler):
|
||||||
receipts = []
|
receipts = []
|
||||||
defer.returnValue(receipts)
|
defer.returnValue(receipts)
|
||||||
|
|
||||||
presence, receipts, (messages, token) = yield defer.gatherResults(
|
presence, receipts, (messages, token) = yield make_deferred_yieldable(
|
||||||
[
|
defer.gatherResults(
|
||||||
run_in_background(get_presence),
|
[
|
||||||
run_in_background(get_receipts),
|
run_in_background(get_presence),
|
||||||
run_in_background(
|
run_in_background(get_receipts),
|
||||||
self.store.get_recent_events_for_room,
|
run_in_background(
|
||||||
room_id,
|
self.store.get_recent_events_for_room,
|
||||||
limit=limit,
|
room_id,
|
||||||
end_token=now_token.room_key,
|
limit=limit,
|
||||||
)
|
end_token=now_token.room_key,
|
||||||
],
|
)
|
||||||
consumeErrors=True,
|
],
|
||||||
).addErrback(unwrapFirstError)
|
consumeErrors=True,
|
||||||
|
).addErrback(unwrapFirstError),
|
||||||
|
)
|
||||||
|
|
||||||
messages = yield filter_events_for_client(
|
messages = yield filter_events_for_client(
|
||||||
self.store, user_id, messages, is_peeking=is_peeking,
|
self.store, user_id, messages, is_peeking=is_peeking,
|
||||||
|
|
|
@ -39,7 +39,7 @@ from synapse.types import RoomStreamToken, get_domain_from_id
|
||||||
from synapse.util.async import ObservableDeferred
|
from synapse.util.async import ObservableDeferred
|
||||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||||
from synapse.util.frozenutils import frozendict_json_encoder
|
from synapse.util.frozenutils import frozendict_json_encoder
|
||||||
from synapse.util.logcontext import make_deferred_yieldable
|
from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
|
|
||||||
|
@ -147,7 +147,8 @@ class _EventPeristenceQueue(object):
|
||||||
# callbacks on the deferred.
|
# callbacks on the deferred.
|
||||||
try:
|
try:
|
||||||
ret = yield per_item_callback(item)
|
ret = yield per_item_callback(item)
|
||||||
item.deferred.callback(ret)
|
with PreserveLoggingContext():
|
||||||
|
item.deferred.callback(ret)
|
||||||
except Exception:
|
except Exception:
|
||||||
item.deferred.errback()
|
item.deferred.errback()
|
||||||
finally:
|
finally:
|
||||||
|
|
|
@ -233,7 +233,7 @@ class PusherStore(PusherWorkerStore):
|
||||||
)
|
)
|
||||||
|
|
||||||
if newly_inserted:
|
if newly_inserted:
|
||||||
self.runInteraction(
|
yield self.runInteraction(
|
||||||
"add_pusher",
|
"add_pusher",
|
||||||
self._invalidate_cache_and_stream,
|
self._invalidate_cache_and_stream,
|
||||||
self.get_if_user_has_pusher, (user_id,)
|
self.get_if_user_has_pusher, (user_id,)
|
||||||
|
|
Loading…
Reference in New Issue