Merge pull request #685 from matrix-org/erikj/sync_leave

Add concurrently_execute function
This commit is contained in:
Erik Johnston 2016-04-01 15:02:59 +01:00
commit a853cdec5b
4 changed files with 83 additions and 70 deletions

View File

@ -21,6 +21,7 @@ from synapse.streams.config import PaginationConfig
from synapse.events.utils import serialize_event from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.types import UserID, RoomStreamToken, StreamToken from synapse.types import UserID, RoomStreamToken, StreamToken
@ -556,14 +557,7 @@ class MessageHandler(BaseHandler):
except: except:
logger.exception("Failed to get snapshot") logger.exception("Failed to get snapshot")
# Only do N rooms at once yield concurrently_execute(handle_room, room_list, 10)
n = 5
d_list = [handle_room(e) for e in room_list]
for i in range(0, len(d_list), n):
yield defer.gatherResults(
d_list[i:i + n],
consumeErrors=True
).addErrback(unwrapFirstError)
account_data_events = [] account_data_events = []
for account_data_type, content in account_data.items(): for account_data_type, content in account_data.items():

View File

@ -23,7 +23,8 @@ from synapse.api.constants import (
EventTypes, JoinRules, RoomCreationPreset, EventTypes, JoinRules, RoomCreationPreset,
) )
from synapse.api.errors import AuthError, StoreError, SynapseError from synapse.api.errors import AuthError, StoreError, SynapseError
from synapse.util import stringutils, unwrapFirstError from synapse.util import stringutils
from synapse.util.async import concurrently_execute
from synapse.util.logcontext import preserve_context_over_fn from synapse.util.logcontext import preserve_context_over_fn
from synapse.util.caches.response_cache import ResponseCache from synapse.util.caches.response_cache import ResponseCache
@ -368,6 +369,8 @@ class RoomListHandler(BaseHandler):
def _get_public_room_list(self): def _get_public_room_list(self):
room_ids = yield self.store.get_public_room_ids() room_ids = yield self.store.get_public_room_ids()
results = []
@defer.inlineCallbacks @defer.inlineCallbacks
def handle_room(room_id): def handle_room(room_id):
aliases = yield self.store.get_aliases_for_room(room_id) aliases = yield self.store.get_aliases_for_room(room_id)
@ -428,18 +431,12 @@ class RoomListHandler(BaseHandler):
joined_users = yield self.store.get_users_in_room(room_id) joined_users = yield self.store.get_users_in_room(room_id)
result["num_joined_members"] = len(joined_users) result["num_joined_members"] = len(joined_users)
defer.returnValue(result) results.append(result)
result = [] yield concurrently_execute(handle_room, room_ids, 10)
for chunk in (room_ids[i:i + 10] for i in xrange(0, len(room_ids), 10)):
chunk_result = yield defer.gatherResults([
handle_room(room_id)
for room_id in chunk
], consumeErrors=True).addErrback(unwrapFirstError)
result.extend(v for v in chunk_result if v)
# FIXME (erikj): START is no longer a valid value # FIXME (erikj): START is no longer a valid value
defer.returnValue({"start": "START", "end": "END", "chunk": result}) defer.returnValue({"start": "START", "end": "END", "chunk": results})
class RoomContextHandler(BaseHandler): class RoomContextHandler(BaseHandler):

View File

@ -17,8 +17,8 @@ from ._base import BaseHandler
from synapse.streams.config import PaginationConfig from synapse.streams.config import PaginationConfig
from synapse.api.constants import Membership, EventTypes from synapse.api.constants import Membership, EventTypes
from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute
from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.util.caches.response_cache import ResponseCache from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user from synapse.push.clientformat import format_push_rules_for_user
@ -250,15 +250,13 @@ class SyncHandler(BaseHandler):
joined = [] joined = []
invited = [] invited = []
archived = [] archived = []
deferreds = []
room_list_chunks = [room_list[i:i + 10] for i in xrange(0, len(room_list), 10)] user_id = sync_config.user.to_string()
for room_list_chunk in room_list_chunks:
for event in room_list_chunk: @defer.inlineCallbacks
def _generate_room_entry(event):
if event.membership == Membership.JOIN: if event.membership == Membership.JOIN:
room_sync_deferred = preserve_fn( room_result = yield self.full_state_sync_for_joined_room(
self.full_state_sync_for_joined_room
)(
room_id=event.room_id, room_id=event.room_id,
sync_config=sync_config, sync_config=sync_config,
now_token=now_token, now_token=now_token,
@ -267,8 +265,7 @@ class SyncHandler(BaseHandler):
tags_by_room=tags_by_room, tags_by_room=tags_by_room,
account_data_by_room=account_data_by_room, account_data_by_room=account_data_by_room,
) )
room_sync_deferred.addCallback(joined.append) joined.append(room_result)
deferreds.append(room_sync_deferred)
elif event.membership == Membership.INVITE: elif event.membership == Membership.INVITE:
invite = yield self.store.get_event(event.event_id) invite = yield self.store.get_event(event.event_id)
invited.append(InvitedSyncResult( invited.append(InvitedSyncResult(
@ -279,15 +276,13 @@ class SyncHandler(BaseHandler):
# Always send down rooms we were banned or kicked from. # Always send down rooms we were banned or kicked from.
if not sync_config.filter_collection.include_leave: if not sync_config.filter_collection.include_leave:
if event.membership == Membership.LEAVE: if event.membership == Membership.LEAVE:
if sync_config.user.to_string() == event.sender: if user_id == event.sender:
continue return
leave_token = now_token.copy_and_replace( leave_token = now_token.copy_and_replace(
"room_key", "s%d" % (event.stream_ordering,) "room_key", "s%d" % (event.stream_ordering,)
) )
room_sync_deferred = preserve_fn( room_result = yield self.full_state_sync_for_archived_room(
self.full_state_sync_for_archived_room
)(
sync_config=sync_config, sync_config=sync_config,
room_id=event.room_id, room_id=event.room_id,
leave_event_id=event.event_id, leave_event_id=event.event_id,
@ -296,12 +291,9 @@ class SyncHandler(BaseHandler):
tags_by_room=tags_by_room, tags_by_room=tags_by_room,
account_data_by_room=account_data_by_room, account_data_by_room=account_data_by_room,
) )
room_sync_deferred.addCallback(archived.append) archived.append(room_result)
deferreds.append(room_sync_deferred)
yield defer.gatherResults( yield concurrently_execute(_generate_room_entry, room_list, 10)
deferreds, consumeErrors=True
).addErrback(unwrapFirstError)
account_data_for_user = sync_config.filter_collection.filter_account_data( account_data_for_user = sync_config.filter_collection.filter_account_data(
self.account_data_for_user(account_data) self.account_data_for_user(account_data)

View File

@ -16,7 +16,8 @@
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from .logcontext import PreserveLoggingContext from .logcontext import PreserveLoggingContext, preserve_fn
from synapse.util import unwrapFirstError
@defer.inlineCallbacks @defer.inlineCallbacks
@ -107,3 +108,32 @@ class ObservableDeferred(object):
return "<ObservableDeferred object at %s, result=%r, _deferred=%r>" % ( return "<ObservableDeferred object at %s, result=%r, _deferred=%r>" % (
id(self), self._result, self._deferred, id(self), self._result, self._deferred,
) )
def concurrently_execute(func, args, limit):
"""Executes the function with each argument conncurrently while limiting
the number of concurrent executions.
Args:
func (func): Function to execute, should return a deferred.
args (list): List of arguments to pass to func, each invocation of func
gets a signle argument.
limit (int): Maximum number of conccurent executions.
Returns:
deferred: Resolved when all function invocations have finished.
"""
it = iter(args)
@defer.inlineCallbacks
def _concurrently_execute_inner():
try:
while True:
yield func(it.next())
except StopIteration:
pass
return defer.gatherResults([
preserve_fn(_concurrently_execute_inner)()
for _ in xrange(limit)
], consumeErrors=True).addErrback(unwrapFirstError)