From 8e49892b218fa10313b2502b4913ae8416321051 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Feb 2016 11:41:04 +0000 Subject: [PATCH] Only calculate initial sync for 10 rooms at a time This helps to ensure we don't completely starve other requests. --- synapse/handlers/sync.py | 53 ++++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ddeed27965..84f29e3867 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -18,7 +18,7 @@ from ._base import BaseHandler from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes from synapse.util import unwrapFirstError -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn from synapse.util.metrics import Measure from twisted.internet import defer @@ -228,10 +228,14 @@ class SyncHandler(BaseHandler): invited = [] archived = [] deferreds = [] - for event in room_list: - if event.membership == Membership.JOIN: - with PreserveLoggingContext(LoggingContext.current_context()): - room_sync_deferred = self.full_state_sync_for_joined_room( + + room_list_chunks = [room_list[i:i + 10] for i in xrange(0, len(room_list), 10)] + for room_list_chunk in room_list_chunks: + for event in room_list_chunk: + if event.membership == Membership.JOIN: + room_sync_deferred = preserve_fn( + self.full_state_sync_for_joined_room + )( room_id=event.room_id, sync_config=sync_config, now_token=now_token, @@ -240,20 +244,21 @@ class SyncHandler(BaseHandler): tags_by_room=tags_by_room, account_data_by_room=account_data_by_room, ) - room_sync_deferred.addCallback(joined.append) - deferreds.append(room_sync_deferred) - elif event.membership == Membership.INVITE: - invite = yield self.store.get_event(event.event_id) - invited.append(InvitedSyncResult( - room_id=event.room_id, - invite=invite, - )) - elif event.membership in (Membership.LEAVE, Membership.BAN): - leave_token = now_token.copy_and_replace( - "room_key", "s%d" % (event.stream_ordering,) - ) - with PreserveLoggingContext(LoggingContext.current_context()): - room_sync_deferred = self.full_state_sync_for_archived_room( + room_sync_deferred.addCallback(joined.append) + deferreds.append(room_sync_deferred) + elif event.membership == Membership.INVITE: + invite = yield self.store.get_event(event.event_id) + invited.append(InvitedSyncResult( + room_id=event.room_id, + invite=invite, + )) + elif event.membership in (Membership.LEAVE, Membership.BAN): + leave_token = now_token.copy_and_replace( + "room_key", "s%d" % (event.stream_ordering,) + ) + room_sync_deferred = preserve_fn( + self.full_state_sync_for_archived_room + )( sync_config=sync_config, room_id=event.room_id, leave_event_id=event.event_id, @@ -262,12 +267,12 @@ class SyncHandler(BaseHandler): tags_by_room=tags_by_room, account_data_by_room=account_data_by_room, ) - room_sync_deferred.addCallback(archived.append) - deferreds.append(room_sync_deferred) + room_sync_deferred.addCallback(archived.append) + deferreds.append(room_sync_deferred) - yield defer.gatherResults( - deferreds, consumeErrors=True - ).addErrback(unwrapFirstError) + yield defer.gatherResults( + deferreds, consumeErrors=True + ).addErrback(unwrapFirstError) account_data_for_user = sync_config.filter_collection.filter_account_data( self.account_data_for_user(account_data)