Fold federation/handler into handlers/federation
This commit is contained in:
parent
6966971a28
commit
d2798de660
|
@ -1,156 +0,0 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014 matrix.org
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from .pdu_codec import PduCodec
|
||||
|
||||
from synapse.api.errors import AuthError
|
||||
from synapse.util.logutils import log_function
|
||||
|
||||
import logging
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FederationEventHandler(object):
|
||||
""" Responsible for:
|
||||
a) handling received Pdus before handing them on as Events to the rest
|
||||
of the home server (including auth and state conflict resoultion)
|
||||
b) converting events that were produced by local clients that may need
|
||||
to be sent to remote home servers.
|
||||
"""
|
||||
|
||||
def __init__(self, hs):
|
||||
self.store = hs.get_datastore()
|
||||
self.replication_layer = hs.get_replication_layer()
|
||||
self.state_handler = hs.get_state_handler()
|
||||
# self.auth_handler = gs.get_auth_handler()
|
||||
self.event_handler = hs.get_handlers().federation_handler
|
||||
self.server_name = hs.hostname
|
||||
|
||||
self.lock_manager = hs.get_room_lock_manager()
|
||||
|
||||
self.replication_layer.set_handler(self)
|
||||
|
||||
self.pdu_codec = PduCodec(hs)
|
||||
|
||||
@log_function
|
||||
@defer.inlineCallbacks
|
||||
def handle_new_event(self, event, snapshot):
|
||||
""" Takes in an event from the client to server side, that has already
|
||||
been authed and handled by the state module, and sends it to any
|
||||
remote home servers that may be interested.
|
||||
|
||||
Args:
|
||||
event
|
||||
snapshot (.storage.Snapshot): THe snapshot the event happened after
|
||||
|
||||
Returns:
|
||||
Deferred: Resolved when it has successfully been queued for
|
||||
processing.
|
||||
"""
|
||||
yield self.fill_out_prev_events(event, snapshot)
|
||||
|
||||
pdu = self.pdu_codec.pdu_from_event(event)
|
||||
|
||||
if not hasattr(pdu, "destinations") or not pdu.destinations:
|
||||
pdu.destinations = []
|
||||
|
||||
yield self.replication_layer.send_pdu(pdu)
|
||||
|
||||
@log_function
|
||||
@defer.inlineCallbacks
|
||||
def backfill(self, dest, room_id, limit):
|
||||
pdus = yield self.replication_layer.backfill(dest, room_id, limit)
|
||||
|
||||
if not pdus:
|
||||
defer.returnValue([])
|
||||
|
||||
events = [
|
||||
self.pdu_codec.event_from_pdu(pdu)
|
||||
for pdu in pdus
|
||||
]
|
||||
|
||||
defer.returnValue(events)
|
||||
|
||||
@log_function
|
||||
def get_state_for_room(self, destination, room_id):
|
||||
return self.replication_layer.get_state_for_context(
|
||||
destination, room_id
|
||||
)
|
||||
|
||||
@log_function
|
||||
@defer.inlineCallbacks
|
||||
def on_receive_pdu(self, pdu, backfilled):
|
||||
""" Called by the ReplicationLayer when we have a new pdu. We need to
|
||||
do auth checks and put it throught the StateHandler.
|
||||
"""
|
||||
event = self.pdu_codec.event_from_pdu(pdu)
|
||||
|
||||
try:
|
||||
with (yield self.lock_manager.lock(pdu.context)):
|
||||
if event.is_state and not backfilled:
|
||||
is_new_state = yield self.state_handler.handle_new_state(
|
||||
pdu
|
||||
)
|
||||
if not is_new_state:
|
||||
return
|
||||
else:
|
||||
is_new_state = False
|
||||
|
||||
yield self.event_handler.on_receive(event, is_new_state, backfilled)
|
||||
|
||||
except AuthError:
|
||||
# TODO: Implement something in federation that allows us to
|
||||
# respond to PDU.
|
||||
raise
|
||||
|
||||
return
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _on_new_state(self, pdu, new_state_event):
|
||||
# TODO: Do any store stuff here. Notifiy C2S about this new
|
||||
# state.
|
||||
|
||||
yield self.store.update_current_state(
|
||||
pdu_id=pdu.pdu_id,
|
||||
origin=pdu.origin,
|
||||
context=pdu.context,
|
||||
pdu_type=pdu.pdu_type,
|
||||
state_key=pdu.state_key
|
||||
)
|
||||
|
||||
yield self.event_handler.on_receive(new_state_event)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def fill_out_prev_events(self, event, snapshot):
|
||||
if hasattr(event, "prev_events"):
|
||||
return
|
||||
|
||||
results = snapshot.prev_pdus
|
||||
|
||||
es = [
|
||||
"%s@%s" % (p_id, origin) for p_id, origin, _ in results
|
||||
]
|
||||
|
||||
event.prev_events = [e for e in es if e != event.event_id]
|
||||
|
||||
if results:
|
||||
event.depth = max([int(v) for _, _, v in results]) + 1
|
||||
else:
|
||||
event.depth = 0
|
|
@ -43,4 +43,5 @@ class BaseRoomHandler(BaseHandler):
|
|||
|
||||
self.notifier.on_new_room_event(event, store_id)
|
||||
|
||||
yield self.hs.get_federation().handle_new_event(event, snapshot)
|
||||
federation_handler = self.hs.get_handlers().federation_handler
|
||||
yield federation_handler.handle_new_event(event, snapshot)
|
||||
|
|
|
@ -20,6 +20,9 @@ from ._base import BaseHandler
|
|||
from synapse.api.events.room import InviteJoinEvent, RoomMemberEvent
|
||||
from synapse.api.constants import Membership
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.federation.pdu_codec import PduCodec
|
||||
|
||||
from synapse.api.errors import AuthError
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
|
@ -30,8 +33,14 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
|
||||
class FederationHandler(BaseHandler):
|
||||
"""Handles events that originated from federation.
|
||||
Responsible for:
|
||||
a) handling received Pdus before handing them on as Events to the rest
|
||||
of the home server (including auth and state conflict resoultion)
|
||||
b) converting events that were produced by local clients that may need
|
||||
to be sent to remote home servers.
|
||||
"""
|
||||
|
||||
"""Handles events that originated from federation."""
|
||||
def __init__(self, hs):
|
||||
super(FederationHandler, self).__init__(hs)
|
||||
|
||||
|
@ -42,6 +51,112 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
self.waiting_for_join_list = {}
|
||||
|
||||
self.store = hs.get_datastore()
|
||||
self.replication_layer = hs.get_replication_layer()
|
||||
self.state_handler = hs.get_state_handler()
|
||||
# self.auth_handler = gs.get_auth_handler()
|
||||
self.server_name = hs.hostname
|
||||
|
||||
self.lock_manager = hs.get_room_lock_manager()
|
||||
|
||||
self.replication_layer.set_handler(self)
|
||||
|
||||
self.pdu_codec = PduCodec(hs)
|
||||
|
||||
@log_function
|
||||
@defer.inlineCallbacks
|
||||
def handle_new_event(self, event, snapshot):
|
||||
""" Takes in an event from the client to server side, that has already
|
||||
been authed and handled by the state module, and sends it to any
|
||||
remote home servers that may be interested.
|
||||
|
||||
Args:
|
||||
event
|
||||
snapshot (.storage.Snapshot): THe snapshot the event happened after
|
||||
|
||||
Returns:
|
||||
Deferred: Resolved when it has successfully been queued for
|
||||
processing.
|
||||
"""
|
||||
yield self.fill_out_prev_events(event, snapshot)
|
||||
|
||||
pdu = self.pdu_codec.pdu_from_event(event)
|
||||
|
||||
if not hasattr(pdu, "destinations") or not pdu.destinations:
|
||||
pdu.destinations = []
|
||||
|
||||
yield self.replication_layer.send_pdu(pdu)
|
||||
|
||||
|
||||
@log_function
|
||||
def get_state_for_room(self, destination, room_id):
|
||||
return self.replication_layer.get_state_for_context(
|
||||
destination, room_id
|
||||
)
|
||||
|
||||
@log_function
|
||||
@defer.inlineCallbacks
|
||||
def on_receive_pdu(self, pdu, backfilled):
|
||||
""" Called by the ReplicationLayer when we have a new pdu. We need to
|
||||
do auth checks and put it throught the StateHandler.
|
||||
"""
|
||||
event = self.pdu_codec.event_from_pdu(pdu)
|
||||
|
||||
try:
|
||||
with (yield self.lock_manager.lock(pdu.context)):
|
||||
if event.is_state and not backfilled:
|
||||
is_new_state = yield self.state_handler.handle_new_state(
|
||||
pdu
|
||||
)
|
||||
if not is_new_state:
|
||||
return
|
||||
else:
|
||||
is_new_state = False
|
||||
|
||||
yield self.on_receive(event, is_new_state, backfilled)
|
||||
|
||||
except AuthError:
|
||||
# TODO: Implement something in federation that allows us to
|
||||
# respond to PDU.
|
||||
raise
|
||||
|
||||
return
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _on_new_state(self, pdu, new_state_event):
|
||||
# TODO: Do any store stuff here. Notifiy C2S about this new
|
||||
# state.
|
||||
|
||||
yield self.store.update_current_state(
|
||||
pdu_id=pdu.pdu_id,
|
||||
origin=pdu.origin,
|
||||
context=pdu.context,
|
||||
pdu_type=pdu.pdu_type,
|
||||
state_key=pdu.state_key
|
||||
)
|
||||
|
||||
yield self.on_receive(new_state_event)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def fill_out_prev_events(self, event, snapshot):
|
||||
if hasattr(event, "prev_events"):
|
||||
return
|
||||
|
||||
results = snapshot.prev_pdus
|
||||
|
||||
es = [
|
||||
"%s@%s" % (p_id, origin) for p_id, origin, _ in results
|
||||
]
|
||||
|
||||
event.prev_events = [e for e in es if e != event.event_id]
|
||||
|
||||
if results:
|
||||
event.depth = max([int(v) for _, _, v in results]) + 1
|
||||
else:
|
||||
event.depth = 0
|
||||
|
||||
|
||||
|
||||
@log_function
|
||||
@defer.inlineCallbacks
|
||||
def on_receive(self, event, is_new_state, backfilled):
|
||||
|
@ -86,8 +201,7 @@ class FederationHandler(BaseHandler):
|
|||
if not room:
|
||||
# Huh, let's try and get the current state
|
||||
try:
|
||||
federation = self.hs.get_federation()
|
||||
yield federation.get_state_for_room(
|
||||
yield self.get_state_for_room(
|
||||
event.origin, event.room_id
|
||||
)
|
||||
|
||||
|
@ -119,11 +233,10 @@ class FederationHandler(BaseHandler):
|
|||
"user_joined_room", user=user, room_id=event.room_id
|
||||
)
|
||||
|
||||
|
||||
@log_function
|
||||
@defer.inlineCallbacks
|
||||
def backfill(self, dest, room_id, limit):
|
||||
events = yield self.hs.get_federation().backfill(dest, room_id, limit)
|
||||
events = yield self._backfill(dest, room_id, limit)
|
||||
|
||||
for event in events:
|
||||
try:
|
||||
|
@ -133,10 +246,23 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
defer.returnValue(events)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _backfill(self, dest, room_id, limit):
|
||||
pdus = yield self.replication_layer.backfill(dest, room_id, limit)
|
||||
|
||||
if not pdus:
|
||||
defer.returnValue([])
|
||||
|
||||
events = [
|
||||
self.pdu_codec.event_from_pdu(pdu)
|
||||
for pdu in pdus
|
||||
]
|
||||
|
||||
defer.returnValue(events)
|
||||
|
||||
@log_function
|
||||
@defer.inlineCallbacks
|
||||
def do_invite_join(self, target_host, room_id, joinee, content):
|
||||
federation = self.hs.get_federation()
|
||||
|
||||
hosts = yield self.store.get_joined_hosts_for_room(room_id)
|
||||
if self.hs.hostname in hosts:
|
||||
|
@ -146,7 +272,7 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
# First get current state to see if we are already joined.
|
||||
try:
|
||||
yield federation.get_state_for_room(target_host, room_id)
|
||||
yield self.get_state_for_room(target_host, room_id)
|
||||
|
||||
hosts = yield self.store.get_joined_hosts_for_room(room_id)
|
||||
if self.hs.hostname in hosts:
|
||||
|
@ -166,7 +292,7 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
new_event.destinations = [target_host]
|
||||
|
||||
yield federation.handle_new_event(new_event)
|
||||
yield self.handle_new_event(new_event)
|
||||
|
||||
# TODO (erikj): Time out here.
|
||||
d = defer.Deferred()
|
||||
|
|
|
@ -368,7 +368,8 @@ class RoomCreationHandler(BaseRoomHandler):
|
|||
yield self.state_handler.handle_new_event(config_event)
|
||||
# store_id = persist...
|
||||
|
||||
yield self.hs.get_federation().handle_new_event(config_event)
|
||||
federation_handler = self.hs.get_handlers().federation_handler
|
||||
yield federation_handler.handle_new_event(config_event)
|
||||
# self.notifier.on_new_room_event(event, store_id)
|
||||
|
||||
content = {"membership": Membership.JOIN}
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
|
||||
# Imports required for the default HomeServer() implementation
|
||||
from synapse.federation import initialize_http_replication
|
||||
from synapse.federation.handler import FederationEventHandler
|
||||
from synapse.api.events.factory import EventFactory
|
||||
from synapse.api.notifier import Notifier
|
||||
from synapse.api.auth import Auth
|
||||
|
@ -58,7 +57,6 @@ class BaseHomeServer(object):
|
|||
'http_client',
|
||||
'db_pool',
|
||||
'persistence_service',
|
||||
'federation',
|
||||
'replication_layer',
|
||||
'datastore',
|
||||
'event_factory',
|
||||
|
@ -152,9 +150,6 @@ class HomeServer(BaseHomeServer):
|
|||
def build_replication_layer(self):
|
||||
return initialize_http_replication(self)
|
||||
|
||||
def build_federation(self):
|
||||
return FederationEventHandler(self)
|
||||
|
||||
def build_datastore(self):
|
||||
return DataStore(self)
|
||||
|
||||
|
|
|
@ -60,7 +60,9 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
def persist_event(self, event=None, backfilled=False, pdu=None):
|
||||
# FIXME (erikj): This should be removed when we start amalgamating
|
||||
# event and pdu storage
|
||||
yield self.hs.get_federation().fill_out_prev_events(event)
|
||||
if event is not None:
|
||||
federation_handler = self.hs.get_handlers().federation_handler
|
||||
yield federation_handler.fill_out_prev_events(event)
|
||||
|
||||
stream_ordering = None
|
||||
if backfilled:
|
||||
|
|
|
@ -53,23 +53,26 @@ class RoomMemberHandlerTestCase(unittest.TestCase):
|
|||
handlers=NonCallableMock(spec_set=[
|
||||
"room_member_handler",
|
||||
"profile_handler",
|
||||
"federation_handler",
|
||||
]),
|
||||
auth=NonCallableMock(spec_set=["check"]),
|
||||
federation=NonCallableMock(spec_set=[
|
||||
"handle_new_event",
|
||||
"get_state_for_room",
|
||||
]),
|
||||
state_handler=NonCallableMock(spec_set=["handle_new_event"]),
|
||||
)
|
||||
|
||||
self.federation = NonCallableMock(spec_set=[
|
||||
"handle_new_event",
|
||||
"get_state_for_room",
|
||||
])
|
||||
|
||||
self.datastore = hs.get_datastore()
|
||||
self.handlers = hs.get_handlers()
|
||||
self.notifier = hs.get_notifier()
|
||||
self.federation = hs.get_federation()
|
||||
self.state_handler = hs.get_state_handler()
|
||||
self.distributor = hs.get_distributor()
|
||||
self.hs = hs
|
||||
|
||||
self.handlers.federation_handler = self.federation
|
||||
|
||||
self.distributor.declare("collect_presencelike_data")
|
||||
|
||||
self.handlers.room_member_handler = RoomMemberHandler(self.hs)
|
||||
|
@ -333,21 +336,24 @@ class RoomCreationTest(unittest.TestCase):
|
|||
handlers=NonCallableMock(spec_set=[
|
||||
"room_creation_handler",
|
||||
"room_member_handler",
|
||||
"federation_handler",
|
||||
]),
|
||||
auth=NonCallableMock(spec_set=["check"]),
|
||||
federation=NonCallableMock(spec_set=[
|
||||
"handle_new_event",
|
||||
]),
|
||||
state_handler=NonCallableMock(spec_set=["handle_new_event"]),
|
||||
)
|
||||
|
||||
self.federation = NonCallableMock(spec_set=[
|
||||
"handle_new_event",
|
||||
])
|
||||
|
||||
self.datastore = hs.get_datastore()
|
||||
self.handlers = hs.get_handlers()
|
||||
self.notifier = hs.get_notifier()
|
||||
self.federation = hs.get_federation()
|
||||
self.state_handler = hs.get_state_handler()
|
||||
self.hs = hs
|
||||
|
||||
self.handlers.federation_handler = self.federation
|
||||
|
||||
self.handlers.room_creation_handler = RoomCreationHandler(self.hs)
|
||||
self.room_creation_handler = self.handlers.room_creation_handler
|
||||
|
||||
|
|
|
@ -128,9 +128,9 @@ class EventStreamPermissionsTestCase(RestTestCase):
|
|||
"test",
|
||||
db_pool=None,
|
||||
http_client=None,
|
||||
federation=Mock(),
|
||||
replication_layer=Mock(),
|
||||
state_handler=state_handler,
|
||||
datastore=MemoryDataStore(),
|
||||
persistence_service=persistence_service,
|
||||
clock=Mock(spec=[
|
||||
"call_later",
|
||||
|
@ -139,9 +139,10 @@ class EventStreamPermissionsTestCase(RestTestCase):
|
|||
]),
|
||||
)
|
||||
|
||||
hs.get_handlers().federation_handler = Mock()
|
||||
|
||||
hs.get_clock().time_msec.return_value = 1000000
|
||||
|
||||
hs.datastore = MemoryDataStore()
|
||||
synapse.rest.register.register_servlets(hs, self.mock_resource)
|
||||
synapse.rest.events.register_servlets(hs, self.mock_resource)
|
||||
synapse.rest.room.register_servlets(hs, self.mock_resource)
|
||||
|
|
|
@ -54,12 +54,12 @@ class RoomPermissionsTestCase(RestTestCase):
|
|||
"test",
|
||||
db_pool=None,
|
||||
http_client=None,
|
||||
federation=Mock(),
|
||||
datastore=MemoryDataStore(),
|
||||
replication_layer=Mock(),
|
||||
state_handler=state_handler,
|
||||
persistence_service=persistence_service,
|
||||
)
|
||||
hs.get_handlers().federation_handler = Mock()
|
||||
|
||||
def _get_user_by_token(token=None):
|
||||
return hs.parse_userid(self.auth_user_id)
|
||||
|
@ -403,12 +403,12 @@ class RoomsMemberListTestCase(RestTestCase):
|
|||
"test",
|
||||
db_pool=None,
|
||||
http_client=None,
|
||||
federation=Mock(),
|
||||
datastore=MemoryDataStore(),
|
||||
replication_layer=Mock(),
|
||||
state_handler=state_handler,
|
||||
persistence_service=persistence_service,
|
||||
)
|
||||
hs.get_handlers().federation_handler = Mock()
|
||||
|
||||
self.auth_user_id = self.user_id
|
||||
|
||||
|
@ -484,12 +484,12 @@ class RoomsCreateTestCase(RestTestCase):
|
|||
"test",
|
||||
db_pool=None,
|
||||
http_client=None,
|
||||
federation=Mock(),
|
||||
datastore=MemoryDataStore(),
|
||||
replication_layer=Mock(),
|
||||
state_handler=state_handler,
|
||||
persistence_service=persistence_service,
|
||||
)
|
||||
hs.get_handlers().federation_handler = Mock()
|
||||
|
||||
def _get_user_by_token(token=None):
|
||||
return hs.parse_userid(self.auth_user_id)
|
||||
|
@ -626,12 +626,12 @@ class RoomTopicTestCase(RestTestCase):
|
|||
"test",
|
||||
db_pool=None,
|
||||
http_client=None,
|
||||
federation=Mock(),
|
||||
datastore=MemoryDataStore(),
|
||||
replication_layer=Mock(),
|
||||
state_handler=state_handler,
|
||||
persistence_service=persistence_service,
|
||||
)
|
||||
hs.get_handlers().federation_handler = Mock()
|
||||
|
||||
def _get_user_by_token(token=None):
|
||||
return hs.parse_userid(self.auth_user_id)
|
||||
|
@ -729,12 +729,12 @@ class RoomMemberStateTestCase(RestTestCase):
|
|||
"test",
|
||||
db_pool=None,
|
||||
http_client=None,
|
||||
federation=Mock(),
|
||||
datastore=MemoryDataStore(),
|
||||
replication_layer=Mock(),
|
||||
state_handler=state_handler,
|
||||
persistence_service=persistence_service,
|
||||
)
|
||||
hs.get_handlers().federation_handler = Mock()
|
||||
|
||||
def _get_user_by_token(token=None):
|
||||
return hs.parse_userid(self.auth_user_id)
|
||||
|
@ -855,12 +855,12 @@ class RoomMessagesTestCase(RestTestCase):
|
|||
"test",
|
||||
db_pool=None,
|
||||
http_client=None,
|
||||
federation=Mock(),
|
||||
datastore=MemoryDataStore(),
|
||||
replication_layer=Mock(),
|
||||
state_handler=state_handler,
|
||||
persistence_service=persistence_service,
|
||||
)
|
||||
hs.get_handlers().federation_handler = Mock()
|
||||
|
||||
def _get_user_by_token(token=None):
|
||||
return hs.parse_userid(self.auth_user_id)
|
||||
|
|
Loading…
Reference in New Issue