Merge pull request #2979 from matrix-org/erikj/no_handlers

Don't build handlers on workers unnecessarily
This commit is contained in:
Erik Johnston 2018-03-13 13:46:38 +00:00 committed by GitHub
commit 56e709857c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 41 additions and 112 deletions

View File

@ -156,7 +156,6 @@ def start(config_options):
) )
ss.setup() ss.setup()
ss.get_handlers()
ss.start_listening(config.worker_listeners) ss.start_listening(config.worker_listeners)
def start(): def start():

View File

@ -161,7 +161,6 @@ def start(config_options):
) )
ss.setup() ss.setup()
ss.get_handlers()
ss.start_listening(config.worker_listeners) ss.start_listening(config.worker_listeners)
def start(): def start():

View File

@ -144,7 +144,6 @@ def start(config_options):
) )
ss.setup() ss.setup()
ss.get_handlers()
ss.start_listening(config.worker_listeners) ss.start_listening(config.worker_listeners)
def start(): def start():

View File

@ -211,7 +211,6 @@ def start(config_options):
) )
ss.setup() ss.setup()
ss.get_handlers()
ss.start_listening(config.worker_listeners) ss.start_listening(config.worker_listeners)
def start(): def start():

View File

@ -348,7 +348,7 @@ def setup(config_options):
hs.get_state_handler().start_caching() hs.get_state_handler().start_caching()
hs.get_datastore().start_profiling() hs.get_datastore().start_profiling()
hs.get_datastore().start_doing_background_updates() hs.get_datastore().start_doing_background_updates()
hs.get_replication_layer().start_get_pdu_cache() hs.get_replication_client().start_get_pdu_cache()
register_memory_metrics(hs) register_memory_metrics(hs)

View File

@ -158,7 +158,6 @@ def start(config_options):
) )
ss.setup() ss.setup()
ss.get_handlers()
ss.start_listening(config.worker_listeners) ss.start_listening(config.worker_listeners)
def start(): def start():

View File

@ -15,11 +15,3 @@
""" This package includes all the federation specific logic. """ This package includes all the federation specific logic.
""" """
from .replication import ReplicationLayer
def initialize_http_replication(hs):
transport = hs.get_federation_transport_client()
return ReplicationLayer(hs, transport)

View File

@ -54,27 +54,19 @@ class FederationServer(FederationBase):
super(FederationServer, self).__init__(hs) super(FederationServer, self).__init__(hs)
self.auth = hs.get_auth() self.auth = hs.get_auth()
self.handler = hs.get_handlers().federation_handler
self._server_linearizer = async.Linearizer("fed_server") self._server_linearizer = async.Linearizer("fed_server")
self._transaction_linearizer = async.Linearizer("fed_txn_handler") self._transaction_linearizer = async.Linearizer("fed_txn_handler")
self.transaction_actions = TransactionActions(self.store) self.transaction_actions = TransactionActions(self.store)
self.handler = None
self.registry = hs.get_federation_registry() self.registry = hs.get_federation_registry()
# We cache responses to state queries, as they take a while and often # We cache responses to state queries, as they take a while and often
# come in waves. # come in waves.
self._state_resp_cache = ResponseCache(hs, timeout_ms=30000) self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
def set_handler(self, handler):
"""Sets the handler that the replication layer will use to communicate
receipt of new PDUs from other home servers. The required methods are
documented on :py:class:`.ReplicationHandler`.
"""
self.handler = handler
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def on_backfill_request(self, origin, room_id, versions, limit): def on_backfill_request(self, origin, room_id, versions, limit):

View File

@ -1,51 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This layer is responsible for replicating with remote home servers using
a given transport.
"""
from .federation_client import FederationClient
from .federation_server import FederationServer
import logging
logger = logging.getLogger(__name__)
class ReplicationLayer(FederationClient, FederationServer):
"""This layer is responsible for replicating with remote home servers over
the given transport. I.e., does the sending and receiving of PDUs to
remote home servers.
The layer communicates with the rest of the server via a registered
ReplicationHandler.
In more detail, the layer:
* Receives incoming data and processes it into transactions and pdus.
* Fetches any PDUs it thinks it might have missed.
* Keeps the current state for contexts up to date by applying the
suitable conflict resolution.
* Sends outgoing pdus wrapped in transactions.
* Fills out the references to previous pdus/transactions appropriately
for outgoing data.
"""
def __init__(self, hs, transport_layer):
super(ReplicationLayer, self).__init__(hs)
def __str__(self):
return "<ReplicationLayer(%s)>" % self.server_name

View File

@ -1190,7 +1190,7 @@ GROUP_ATTESTATION_SERVLET_CLASSES = (
def register_servlets(hs, resource, authenticator, ratelimiter): def register_servlets(hs, resource, authenticator, ratelimiter):
for servletclass in FEDERATION_SERVLET_CLASSES: for servletclass in FEDERATION_SERVLET_CLASSES:
servletclass( servletclass(
handler=hs.get_replication_layer(), handler=hs.get_replication_server(),
authenticator=authenticator, authenticator=authenticator,
ratelimiter=ratelimiter, ratelimiter=ratelimiter,
server_name=hs.hostname, server_name=hs.hostname,

View File

@ -37,7 +37,6 @@ class DeviceHandler(BaseHandler):
self.state = hs.get_state_handler() self.state = hs.get_state_handler()
self._auth_handler = hs.get_auth_handler() self._auth_handler = hs.get_auth_handler()
self.federation_sender = hs.get_federation_sender() self.federation_sender = hs.get_federation_sender()
self.federation = hs.get_replication_layer()
self._edu_updater = DeviceListEduUpdater(hs, self) self._edu_updater = DeviceListEduUpdater(hs, self)
@ -432,7 +431,7 @@ class DeviceListEduUpdater(object):
def __init__(self, hs, device_handler): def __init__(self, hs, device_handler):
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.federation = hs.get_replication_layer() self.federation = hs.get_replication_client()
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.device_handler = device_handler self.device_handler = device_handler

View File

@ -36,7 +36,7 @@ class DirectoryHandler(BaseHandler):
self.appservice_handler = hs.get_application_service_handler() self.appservice_handler = hs.get_application_service_handler()
self.event_creation_handler = hs.get_event_creation_handler() self.event_creation_handler = hs.get_event_creation_handler()
self.federation = hs.get_replication_layer() self.federation = hs.get_replication_client()
hs.get_federation_registry().register_query_handler( hs.get_federation_registry().register_query_handler(
"directory", self.on_directory_query "directory", self.on_directory_query
) )

View File

@ -32,7 +32,7 @@ logger = logging.getLogger(__name__)
class E2eKeysHandler(object): class E2eKeysHandler(object):
def __init__(self, hs): def __init__(self, hs):
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.federation = hs.get_replication_layer() self.federation = hs.get_replication_client()
self.device_handler = hs.get_device_handler() self.device_handler = hs.get_device_handler()
self.is_mine = hs.is_mine self.is_mine = hs.is_mine
self.clock = hs.get_clock() self.clock = hs.get_clock()

View File

@ -68,7 +68,7 @@ class FederationHandler(BaseHandler):
self.hs = hs self.hs = hs
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.replication_layer = hs.get_replication_layer() self.replication_layer = hs.get_replication_client()
self.state_handler = hs.get_state_handler() self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname self.server_name = hs.hostname
self.keyring = hs.get_keyring() self.keyring = hs.get_keyring()
@ -78,8 +78,6 @@ class FederationHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker() self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler() self.event_creation_handler = hs.get_event_creation_handler()
self.replication_layer.set_handler(self)
# When joining a room we need to queue any events for that room up # When joining a room we need to queue any events for that room up
self.room_queues = {} self.room_queues = {}
self._room_pdu_linearizer = Linearizer("fed_room_pdu") self._room_pdu_linearizer = Linearizer("fed_room_pdu")

View File

@ -93,7 +93,6 @@ class PresenceHandler(object):
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.wheel_timer = WheelTimer() self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
self.replication = hs.get_replication_layer()
self.federation = hs.get_federation_sender() self.federation = hs.get_federation_sender()
self.state = hs.get_state_handler() self.state = hs.get_state_handler()

View File

@ -31,7 +31,7 @@ class ProfileHandler(BaseHandler):
def __init__(self, hs): def __init__(self, hs):
super(ProfileHandler, self).__init__(hs) super(ProfileHandler, self).__init__(hs)
self.federation = hs.get_replication_layer() self.federation = hs.get_replication_client()
hs.get_federation_registry().register_query_handler( hs.get_federation_registry().register_query_handler(
"profile", self.on_profile_query "profile", self.on_profile_query
) )

View File

@ -409,7 +409,7 @@ class RoomListHandler(BaseHandler):
def _get_remote_list_cached(self, server_name, limit=None, since_token=None, def _get_remote_list_cached(self, server_name, limit=None, since_token=None,
search_filter=None, include_all_networks=False, search_filter=None, include_all_networks=False,
third_party_instance_id=None,): third_party_instance_id=None,):
repl_layer = self.hs.get_replication_layer() repl_layer = self.hs.get_replication_client()
if search_filter: if search_filter:
# We can't cache when asking for search # We can't cache when asking for search
return repl_layer.get_public_rooms( return repl_layer.get_public_rooms(

View File

@ -55,7 +55,6 @@ class RoomMemberHandler(object):
self.registration_handler = hs.get_handlers().registration_handler self.registration_handler = hs.get_handlers().registration_handler
self.profile_handler = hs.get_profile_handler() self.profile_handler = hs.get_profile_handler()
self.event_creation_hander = hs.get_event_creation_handler() self.event_creation_hander = hs.get_event_creation_handler()
self.replication_layer = hs.get_replication_layer()
self.member_linearizer = Linearizer(name="member") self.member_linearizer = Linearizer(name="member")
@ -212,7 +211,7 @@ class RoomMemberHandler(object):
# if this is a join with a 3pid signature, we may need to turn a 3pid # if this is a join with a 3pid signature, we may need to turn a 3pid
# invite into a normal invite before we can handle the join. # invite into a normal invite before we can handle the join.
if third_party_signed is not None: if third_party_signed is not None:
yield self.replication_layer.exchange_third_party_invite( yield self.federation_handler.exchange_third_party_invite(
third_party_signed["sender"], third_party_signed["sender"],
target.to_string(), target.to_string(),
room_id, room_id,

View File

@ -32,7 +32,8 @@ from synapse.appservice.scheduler import ApplicationServiceScheduler
from synapse.crypto.keyring import Keyring from synapse.crypto.keyring import Keyring
from synapse.events.builder import EventBuilderFactory from synapse.events.builder import EventBuilderFactory
from synapse.events.spamcheck import SpamChecker from synapse.events.spamcheck import SpamChecker
from synapse.federation import initialize_http_replication from synapse.federation.federation_client import FederationClient
from synapse.federation.federation_server import FederationServer
from synapse.federation.send_queue import FederationRemoteSendQueue from synapse.federation.send_queue import FederationRemoteSendQueue
from synapse.federation.federation_server import FederationHandlerRegistry from synapse.federation.federation_server import FederationHandlerRegistry
from synapse.federation.transport.client import TransportLayerClient from synapse.federation.transport.client import TransportLayerClient
@ -100,7 +101,8 @@ class HomeServer(object):
DEPENDENCIES = [ DEPENDENCIES = [
'http_client', 'http_client',
'db_pool', 'db_pool',
'replication_layer', 'replication_client',
'replication_server',
'handlers', 'handlers',
'v1auth', 'v1auth',
'auth', 'auth',
@ -197,8 +199,11 @@ class HomeServer(object):
def get_ratelimiter(self): def get_ratelimiter(self):
return self.ratelimiter return self.ratelimiter
def build_replication_layer(self): def build_replication_client(self):
return initialize_http_replication(self) return FederationClient(self)
def build_replication_server(self):
return FederationServer(self)
def build_handlers(self): def build_handlers(self):
return Handlers(self) return Handlers(self)

View File

@ -47,7 +47,7 @@ class DirectoryTestCase(unittest.TestCase):
hs = yield setup_test_homeserver( hs = yield setup_test_homeserver(
http_client=None, http_client=None,
resource_for_federation=Mock(), resource_for_federation=Mock(),
replication_layer=self.mock_federation, replication_client=self.mock_federation,
federation_registry=self.mock_registry, federation_registry=self.mock_registry,
) )
hs.handlers = DirectoryHandlers(hs) hs.handlers = DirectoryHandlers(hs)

View File

@ -34,7 +34,7 @@ class E2eKeysHandlerTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self.hs = yield utils.setup_test_homeserver( self.hs = yield utils.setup_test_homeserver(
handlers=None, handlers=None,
replication_layer=mock.Mock(), replication_client=mock.Mock(),
) )
self.handler = synapse.handlers.e2e_keys.E2eKeysHandler(self.hs) self.handler = synapse.handlers.e2e_keys.E2eKeysHandler(self.hs)

View File

@ -51,7 +51,8 @@ class ProfileTestCase(unittest.TestCase):
http_client=None, http_client=None,
handlers=None, handlers=None,
resource_for_federation=Mock(), resource_for_federation=Mock(),
replication_layer=self.mock_federation, replication_client=self.mock_federation,
replication_server=Mock(),
federation_registry=self.mock_registry, federation_registry=self.mock_registry,
ratelimiter=NonCallableMock(spec_set=[ ratelimiter=NonCallableMock(spec_set=[
"send_message", "send_message",

View File

@ -81,7 +81,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
"get_current_state_deltas", "get_current_state_deltas",
]), ]),
state_handler=self.state_handler, state_handler=self.state_handler,
handlers=None, handlers=Mock(),
notifier=mock_notifier, notifier=mock_notifier,
resource_for_client=Mock(), resource_for_client=Mock(),
resource_for_federation=self.mock_federation_resource, resource_for_federation=self.mock_federation_resource,

View File

@ -31,7 +31,7 @@ class BaseSlavedStoreTestCase(unittest.TestCase):
self.hs = yield setup_test_homeserver( self.hs = yield setup_test_homeserver(
"blue", "blue",
http_client=None, http_client=None,
replication_layer=Mock(), replication_client=Mock(),
ratelimiter=NonCallableMock(spec_set=[ ratelimiter=NonCallableMock(spec_set=[
"send_message", "send_message",
]), ]),

View File

@ -114,7 +114,7 @@ class EventStreamPermissionsTestCase(RestTestCase):
hs = yield setup_test_homeserver( hs = yield setup_test_homeserver(
http_client=None, http_client=None,
replication_layer=Mock(), replication_client=Mock(),
ratelimiter=NonCallableMock(spec_set=[ ratelimiter=NonCallableMock(spec_set=[
"send_message", "send_message",
]), ]),

View File

@ -45,7 +45,7 @@ class ProfileTestCase(unittest.TestCase):
http_client=None, http_client=None,
resource_for_client=self.mock_resource, resource_for_client=self.mock_resource,
federation=Mock(), federation=Mock(),
replication_layer=Mock(), replication_client=Mock(),
profile_handler=self.mock_handler profile_handler=self.mock_handler
) )

View File

@ -46,7 +46,7 @@ class RoomPermissionsTestCase(RestTestCase):
hs = yield setup_test_homeserver( hs = yield setup_test_homeserver(
"red", "red",
http_client=None, http_client=None,
replication_layer=Mock(), replication_client=Mock(),
ratelimiter=NonCallableMock(spec_set=["send_message"]), ratelimiter=NonCallableMock(spec_set=["send_message"]),
) )
self.ratelimiter = hs.get_ratelimiter() self.ratelimiter = hs.get_ratelimiter()
@ -409,7 +409,7 @@ class RoomsMemberListTestCase(RestTestCase):
hs = yield setup_test_homeserver( hs = yield setup_test_homeserver(
"red", "red",
http_client=None, http_client=None,
replication_layer=Mock(), replication_client=Mock(),
ratelimiter=NonCallableMock(spec_set=["send_message"]), ratelimiter=NonCallableMock(spec_set=["send_message"]),
) )
self.ratelimiter = hs.get_ratelimiter() self.ratelimiter = hs.get_ratelimiter()
@ -493,7 +493,7 @@ class RoomsCreateTestCase(RestTestCase):
hs = yield setup_test_homeserver( hs = yield setup_test_homeserver(
"red", "red",
http_client=None, http_client=None,
replication_layer=Mock(), replication_client=Mock(),
ratelimiter=NonCallableMock(spec_set=["send_message"]), ratelimiter=NonCallableMock(spec_set=["send_message"]),
) )
self.ratelimiter = hs.get_ratelimiter() self.ratelimiter = hs.get_ratelimiter()
@ -582,7 +582,7 @@ class RoomTopicTestCase(RestTestCase):
hs = yield setup_test_homeserver( hs = yield setup_test_homeserver(
"red", "red",
http_client=None, http_client=None,
replication_layer=Mock(), replication_client=Mock(),
ratelimiter=NonCallableMock(spec_set=["send_message"]), ratelimiter=NonCallableMock(spec_set=["send_message"]),
) )
self.ratelimiter = hs.get_ratelimiter() self.ratelimiter = hs.get_ratelimiter()
@ -697,7 +697,7 @@ class RoomMemberStateTestCase(RestTestCase):
hs = yield setup_test_homeserver( hs = yield setup_test_homeserver(
"red", "red",
http_client=None, http_client=None,
replication_layer=Mock(), replication_client=Mock(),
ratelimiter=NonCallableMock(spec_set=["send_message"]), ratelimiter=NonCallableMock(spec_set=["send_message"]),
) )
self.ratelimiter = hs.get_ratelimiter() self.ratelimiter = hs.get_ratelimiter()
@ -829,7 +829,7 @@ class RoomMessagesTestCase(RestTestCase):
hs = yield setup_test_homeserver( hs = yield setup_test_homeserver(
"red", "red",
http_client=None, http_client=None,
replication_layer=Mock(), replication_client=Mock(),
ratelimiter=NonCallableMock(spec_set=["send_message"]), ratelimiter=NonCallableMock(spec_set=["send_message"]),
) )
self.ratelimiter = hs.get_ratelimiter() self.ratelimiter = hs.get_ratelimiter()
@ -929,7 +929,7 @@ class RoomInitialSyncTestCase(RestTestCase):
hs = yield setup_test_homeserver( hs = yield setup_test_homeserver(
"red", "red",
http_client=None, http_client=None,
replication_layer=Mock(), replication_client=Mock(),
ratelimiter=NonCallableMock(spec_set=[ ratelimiter=NonCallableMock(spec_set=[
"send_message", "send_message",
]), ]),
@ -1003,7 +1003,7 @@ class RoomMessageListTestCase(RestTestCase):
hs = yield setup_test_homeserver( hs = yield setup_test_homeserver(
"red", "red",
http_client=None, http_client=None,
replication_layer=Mock(), replication_client=Mock(),
ratelimiter=NonCallableMock(spec_set=["send_message"]), ratelimiter=NonCallableMock(spec_set=["send_message"]),
) )
self.ratelimiter = hs.get_ratelimiter() self.ratelimiter = hs.get_ratelimiter()

View File

@ -47,7 +47,7 @@ class RoomTypingTestCase(RestTestCase):
"red", "red",
clock=self.clock, clock=self.clock,
http_client=None, http_client=None,
replication_layer=Mock(), replication_client=Mock(),
ratelimiter=NonCallableMock(spec_set=[ ratelimiter=NonCallableMock(spec_set=[
"send_message", "send_message",
]), ]),

View File

@ -42,7 +42,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase):
hs = yield setup_test_homeserver( hs = yield setup_test_homeserver(
config=config, config=config,
federation_sender=Mock(), federation_sender=Mock(),
replication_layer=Mock(), replication_client=Mock(),
) )
self.as_token = "token1" self.as_token = "token1"
@ -119,7 +119,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
hs = yield setup_test_homeserver( hs = yield setup_test_homeserver(
config=config, config=config,
federation_sender=Mock(), federation_sender=Mock(),
replication_layer=Mock(), replication_client=Mock(),
) )
self.db_pool = hs.get_db_pool() self.db_pool = hs.get_db_pool()
@ -455,7 +455,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
config=config, config=config,
datastore=Mock(), datastore=Mock(),
federation_sender=Mock(), federation_sender=Mock(),
replication_layer=Mock(), replication_client=Mock(),
) )
ApplicationServiceStore(None, hs) ApplicationServiceStore(None, hs)
@ -473,7 +473,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
config=config, config=config,
datastore=Mock(), datastore=Mock(),
federation_sender=Mock(), federation_sender=Mock(),
replication_layer=Mock(), replication_client=Mock(),
) )
with self.assertRaises(ConfigError) as cm: with self.assertRaises(ConfigError) as cm:
@ -497,7 +497,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
config=config, config=config,
datastore=Mock(), datastore=Mock(),
federation_sender=Mock(), federation_sender=Mock(),
replication_layer=Mock(), replication_client=Mock(),
) )
with self.assertRaises(ConfigError) as cm: with self.assertRaises(ConfigError) as cm: