Merge pull request #437 from matrix-org/markjh/parallel_sync
Do the /sync in parallel across the rooms like /initialSync does
This commit is contained in:
commit
a874c0894a
|
@ -17,6 +17,7 @@ from ._base import BaseHandler
|
||||||
|
|
||||||
from synapse.streams.config import PaginationConfig
|
from synapse.streams.config import PaginationConfig
|
||||||
from synapse.api.constants import Membership, EventTypes
|
from synapse.api.constants import Membership, EventTypes
|
||||||
|
from synapse.util import unwrapFirstError
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
|
@ -209,9 +210,10 @@ class SyncHandler(BaseHandler):
|
||||||
joined = []
|
joined = []
|
||||||
invited = []
|
invited = []
|
||||||
archived = []
|
archived = []
|
||||||
|
deferreds = []
|
||||||
for event in room_list:
|
for event in room_list:
|
||||||
if event.membership == Membership.JOIN:
|
if event.membership == Membership.JOIN:
|
||||||
room_sync = yield self.full_state_sync_for_joined_room(
|
room_sync_deferred = self.full_state_sync_for_joined_room(
|
||||||
room_id=event.room_id,
|
room_id=event.room_id,
|
||||||
sync_config=sync_config,
|
sync_config=sync_config,
|
||||||
now_token=now_token,
|
now_token=now_token,
|
||||||
|
@ -220,7 +222,8 @@ class SyncHandler(BaseHandler):
|
||||||
tags_by_room=tags_by_room,
|
tags_by_room=tags_by_room,
|
||||||
account_data_by_room=account_data_by_room,
|
account_data_by_room=account_data_by_room,
|
||||||
)
|
)
|
||||||
joined.append(room_sync)
|
room_sync_deferred.addCallback(joined.append)
|
||||||
|
deferreds.append(room_sync_deferred)
|
||||||
elif event.membership == Membership.INVITE:
|
elif event.membership == Membership.INVITE:
|
||||||
invite = yield self.store.get_event(event.event_id)
|
invite = yield self.store.get_event(event.event_id)
|
||||||
invited.append(InvitedSyncResult(
|
invited.append(InvitedSyncResult(
|
||||||
|
@ -231,7 +234,7 @@ class SyncHandler(BaseHandler):
|
||||||
leave_token = now_token.copy_and_replace(
|
leave_token = now_token.copy_and_replace(
|
||||||
"room_key", "s%d" % (event.stream_ordering,)
|
"room_key", "s%d" % (event.stream_ordering,)
|
||||||
)
|
)
|
||||||
room_sync = yield self.full_state_sync_for_archived_room(
|
room_sync_deferred = self.full_state_sync_for_archived_room(
|
||||||
sync_config=sync_config,
|
sync_config=sync_config,
|
||||||
room_id=event.room_id,
|
room_id=event.room_id,
|
||||||
leave_event_id=event.event_id,
|
leave_event_id=event.event_id,
|
||||||
|
@ -240,7 +243,12 @@ class SyncHandler(BaseHandler):
|
||||||
tags_by_room=tags_by_room,
|
tags_by_room=tags_by_room,
|
||||||
account_data_by_room=account_data_by_room,
|
account_data_by_room=account_data_by_room,
|
||||||
)
|
)
|
||||||
archived.append(room_sync)
|
room_sync_deferred.addCallback(archived.append)
|
||||||
|
deferreds.append(room_sync_deferred)
|
||||||
|
|
||||||
|
yield defer.gatherResults(
|
||||||
|
deferreds, consumeErrors=True
|
||||||
|
).addErrback(unwrapFirstError)
|
||||||
|
|
||||||
defer.returnValue(SyncResult(
|
defer.returnValue(SyncResult(
|
||||||
presence=presence,
|
presence=presence,
|
||||||
|
|
Loading…
Reference in New Issue