Split `FederationHandler` in half (#10692)
The idea here is to take anything to do with incoming events and move it out to a separate handler, as a way of making FederationHandler smaller.
This commit is contained in:
parent
96715d7633
commit
1800aabfc2
|
@ -0,0 +1 @@
|
||||||
|
Split the event-processing methods in `FederationHandler` into a separate `FederationEventHandler`.
|
|
@ -110,6 +110,7 @@ class FederationServer(FederationBase):
|
||||||
super().__init__(hs)
|
super().__init__(hs)
|
||||||
|
|
||||||
self.handler = hs.get_federation_handler()
|
self.handler = hs.get_federation_handler()
|
||||||
|
self._federation_event_handler = hs.get_federation_event_handler()
|
||||||
self.state = hs.get_state_handler()
|
self.state = hs.get_state_handler()
|
||||||
self._event_auth_handler = hs.get_event_auth_handler()
|
self._event_auth_handler = hs.get_event_auth_handler()
|
||||||
|
|
||||||
|
@ -787,7 +788,9 @@ class FederationServer(FederationBase):
|
||||||
|
|
||||||
event = await self._check_sigs_and_hash(room_version, event)
|
event = await self._check_sigs_and_hash(room_version, event)
|
||||||
|
|
||||||
return await self.handler.on_send_membership_event(origin, event)
|
return await self._federation_event_handler.on_send_membership_event(
|
||||||
|
origin, event
|
||||||
|
)
|
||||||
|
|
||||||
async def on_event_auth(
|
async def on_event_auth(
|
||||||
self, origin: str, room_id: str, event_id: str
|
self, origin: str, room_id: str, event_id: str
|
||||||
|
@ -1005,7 +1008,7 @@ class FederationServer(FederationBase):
|
||||||
async with lock:
|
async with lock:
|
||||||
logger.info("handling received PDU: %s", event)
|
logger.info("handling received PDU: %s", event)
|
||||||
try:
|
try:
|
||||||
await self.handler.on_receive_pdu(origin, event)
|
await self._federation_event_handler.on_receive_pdu(origin, event)
|
||||||
except FederationError as e:
|
except FederationError as e:
|
||||||
# XXX: Ideally we'd inform the remote we failed to process
|
# XXX: Ideally we'd inform the remote we failed to process
|
||||||
# the event, but we can't return an error in the transaction
|
# the event, but we can't return an error in the transaction
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -62,7 +62,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||||
self.store = hs.get_datastore()
|
self.store = hs.get_datastore()
|
||||||
self.storage = hs.get_storage()
|
self.storage = hs.get_storage()
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.federation_handler = hs.get_federation_handler()
|
self.federation_event_handler = hs.get_federation_event_handler()
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
|
async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
|
||||||
|
@ -127,7 +127,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||||
|
|
||||||
logger.info("Got %d events from federation", len(event_and_contexts))
|
logger.info("Got %d events from federation", len(event_and_contexts))
|
||||||
|
|
||||||
max_stream_id = await self.federation_handler.persist_events_and_notify(
|
max_stream_id = await self.federation_event_handler.persist_events_and_notify(
|
||||||
room_id, event_and_contexts, backfilled
|
room_id, event_and_contexts, backfilled
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -76,6 +76,7 @@ from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler
|
||||||
from synapse.handlers.event_auth import EventAuthHandler
|
from synapse.handlers.event_auth import EventAuthHandler
|
||||||
from synapse.handlers.events import EventHandler, EventStreamHandler
|
from synapse.handlers.events import EventHandler, EventStreamHandler
|
||||||
from synapse.handlers.federation import FederationHandler
|
from synapse.handlers.federation import FederationHandler
|
||||||
|
from synapse.handlers.federation_event import FederationEventHandler
|
||||||
from synapse.handlers.groups_local import GroupsLocalHandler, GroupsLocalWorkerHandler
|
from synapse.handlers.groups_local import GroupsLocalHandler, GroupsLocalWorkerHandler
|
||||||
from synapse.handlers.identity import IdentityHandler
|
from synapse.handlers.identity import IdentityHandler
|
||||||
from synapse.handlers.initial_sync import InitialSyncHandler
|
from synapse.handlers.initial_sync import InitialSyncHandler
|
||||||
|
@ -546,6 +547,10 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||||
def get_federation_handler(self) -> FederationHandler:
|
def get_federation_handler(self) -> FederationHandler:
|
||||||
return FederationHandler(self)
|
return FederationHandler(self)
|
||||||
|
|
||||||
|
@cache_in_self
|
||||||
|
def get_federation_event_handler(self) -> FederationEventHandler:
|
||||||
|
return FederationEventHandler(self)
|
||||||
|
|
||||||
@cache_in_self
|
@cache_in_self
|
||||||
def get_identity_handler(self) -> IdentityHandler:
|
def get_identity_handler(self) -> IdentityHandler:
|
||||||
return IdentityHandler(self)
|
return IdentityHandler(self)
|
||||||
|
|
|
@ -208,7 +208,7 @@ class FederationKnockingTestCase(
|
||||||
async def _check_event_auth(origin, event, context, *args, **kwargs):
|
async def _check_event_auth(origin, event, context, *args, **kwargs):
|
||||||
return context
|
return context
|
||||||
|
|
||||||
homeserver.get_federation_handler()._check_event_auth = _check_event_auth
|
homeserver.get_federation_event_handler()._check_event_auth = _check_event_auth
|
||||||
|
|
||||||
return super().prepare(reactor, clock, homeserver)
|
return super().prepare(reactor, clock, homeserver)
|
||||||
|
|
||||||
|
|
|
@ -130,7 +130,9 @@ class FederationTestCase(unittest.HomeserverTestCase):
|
||||||
)
|
)
|
||||||
|
|
||||||
with LoggingContext("send_rejected"):
|
with LoggingContext("send_rejected"):
|
||||||
d = run_in_background(self.handler.on_receive_pdu, OTHER_SERVER, ev)
|
d = run_in_background(
|
||||||
|
self.hs.get_federation_event_handler().on_receive_pdu, OTHER_SERVER, ev
|
||||||
|
)
|
||||||
self.get_success(d)
|
self.get_success(d)
|
||||||
|
|
||||||
# that should have been rejected
|
# that should have been rejected
|
||||||
|
@ -182,7 +184,9 @@ class FederationTestCase(unittest.HomeserverTestCase):
|
||||||
)
|
)
|
||||||
|
|
||||||
with LoggingContext("send_rejected"):
|
with LoggingContext("send_rejected"):
|
||||||
d = run_in_background(self.handler.on_receive_pdu, OTHER_SERVER, ev)
|
d = run_in_background(
|
||||||
|
self.hs.get_federation_event_handler().on_receive_pdu, OTHER_SERVER, ev
|
||||||
|
)
|
||||||
self.get_success(d)
|
self.get_success(d)
|
||||||
|
|
||||||
# that should have been rejected
|
# that should have been rejected
|
||||||
|
@ -311,7 +315,9 @@ class FederationTestCase(unittest.HomeserverTestCase):
|
||||||
with LoggingContext("receive_pdu"):
|
with LoggingContext("receive_pdu"):
|
||||||
# Fake the OTHER_SERVER federating the message event over to our local homeserver
|
# Fake the OTHER_SERVER federating the message event over to our local homeserver
|
||||||
d = run_in_background(
|
d = run_in_background(
|
||||||
self.handler.on_receive_pdu, OTHER_SERVER, message_event
|
self.hs.get_federation_event_handler().on_receive_pdu,
|
||||||
|
OTHER_SERVER,
|
||||||
|
message_event,
|
||||||
)
|
)
|
||||||
self.get_success(d)
|
self.get_success(d)
|
||||||
|
|
||||||
|
@ -382,7 +388,9 @@ class FederationTestCase(unittest.HomeserverTestCase):
|
||||||
join_event.signatures[other_server] = {"x": "y"}
|
join_event.signatures[other_server] = {"x": "y"}
|
||||||
with LoggingContext("send_join"):
|
with LoggingContext("send_join"):
|
||||||
d = run_in_background(
|
d = run_in_background(
|
||||||
self.handler.on_send_membership_event, other_server, join_event
|
self.hs.get_federation_event_handler().on_send_membership_event,
|
||||||
|
other_server,
|
||||||
|
join_event,
|
||||||
)
|
)
|
||||||
self.get_success(d)
|
self.get_success(d)
|
||||||
|
|
||||||
|
|
|
@ -885,7 +885,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
|
||||||
def prepare(self, reactor, clock, hs):
|
def prepare(self, reactor, clock, hs):
|
||||||
self.federation_sender = hs.get_federation_sender()
|
self.federation_sender = hs.get_federation_sender()
|
||||||
self.event_builder_factory = hs.get_event_builder_factory()
|
self.event_builder_factory = hs.get_event_builder_factory()
|
||||||
self.federation_handler = hs.get_federation_handler()
|
self.federation_event_handler = hs.get_federation_event_handler()
|
||||||
self.presence_handler = hs.get_presence_handler()
|
self.presence_handler = hs.get_presence_handler()
|
||||||
|
|
||||||
# self.event_builder_for_2 = EventBuilderFactory(hs)
|
# self.event_builder_for_2 = EventBuilderFactory(hs)
|
||||||
|
@ -1026,7 +1026,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
|
||||||
builder.build(prev_event_ids=prev_event_ids, auth_event_ids=None)
|
builder.build(prev_event_ids=prev_event_ids, auth_event_ids=None)
|
||||||
)
|
)
|
||||||
|
|
||||||
self.get_success(self.federation_handler.on_receive_pdu(hostname, event))
|
self.get_success(self.federation_event_handler.on_receive_pdu(hostname, event))
|
||||||
|
|
||||||
# Check that it was successfully persisted.
|
# Check that it was successfully persisted.
|
||||||
self.get_success(self.store.get_event(event.event_id))
|
self.get_success(self.store.get_event(event.event_id))
|
||||||
|
|
|
@ -205,7 +205,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
|
||||||
def create_room_with_remote_server(self, user, token, remote_server="other_server"):
|
def create_room_with_remote_server(self, user, token, remote_server="other_server"):
|
||||||
room = self.helper.create_room_as(user, tok=token)
|
room = self.helper.create_room_as(user, tok=token)
|
||||||
store = self.hs.get_datastore()
|
store = self.hs.get_datastore()
|
||||||
federation = self.hs.get_federation_handler()
|
federation = self.hs.get_federation_event_handler()
|
||||||
|
|
||||||
prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room))
|
prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room))
|
||||||
room_version = self.get_success(store.get_room_version(room))
|
room_version = self.get_success(store.get_room_version(room))
|
||||||
|
|
|
@ -75,7 +75,8 @@ class MessageAcceptTests(unittest.HomeserverTestCase):
|
||||||
)
|
)
|
||||||
|
|
||||||
self.handler = self.homeserver.get_federation_handler()
|
self.handler = self.homeserver.get_federation_handler()
|
||||||
self.handler._check_event_auth = lambda origin, event, context, state, claimed_auth_event_map, backfilled: succeed(
|
federation_event_handler = self.homeserver.get_federation_event_handler()
|
||||||
|
federation_event_handler._check_event_auth = lambda origin, event, context, state, claimed_auth_event_map, backfilled: succeed(
|
||||||
context
|
context
|
||||||
)
|
)
|
||||||
self.client = self.homeserver.get_federation_client()
|
self.client = self.homeserver.get_federation_client()
|
||||||
|
@ -85,7 +86,9 @@ class MessageAcceptTests(unittest.HomeserverTestCase):
|
||||||
|
|
||||||
# Send the join, it should return None (which is not an error)
|
# Send the join, it should return None (which is not an error)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
self.get_success(self.handler.on_receive_pdu("test.serv", join_event)),
|
self.get_success(
|
||||||
|
federation_event_handler.on_receive_pdu("test.serv", join_event)
|
||||||
|
),
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -129,9 +132,10 @@ class MessageAcceptTests(unittest.HomeserverTestCase):
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
federation_event_handler = self.homeserver.get_federation_event_handler()
|
||||||
with LoggingContext("test-context"):
|
with LoggingContext("test-context"):
|
||||||
failure = self.get_failure(
|
failure = self.get_failure(
|
||||||
self.handler.on_receive_pdu("test.serv", lying_event),
|
federation_event_handler.on_receive_pdu("test.serv", lying_event),
|
||||||
FederationError,
|
FederationError,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue