diff --git a/changelog.d/12744.feature b/changelog.d/12744.feature new file mode 100644 index 0000000000..9836d94f8c --- /dev/null +++ b/changelog.d/12744.feature @@ -0,0 +1 @@ +Add a `drop_federated_event` callback to `SpamChecker` to disregard inbound federated events before they take up much processing power, in an emergency. diff --git a/docs/modules/spam_checker_callbacks.md b/docs/modules/spam_checker_callbacks.md index 472d957180..27c5a0ed5c 100644 --- a/docs/modules/spam_checker_callbacks.md +++ b/docs/modules/spam_checker_callbacks.md @@ -249,6 +249,24 @@ callback returns `False`, Synapse falls through to the next one. The value of th callback that does not return `False` will be used. If this happens, Synapse will not call any of the subsequent implementations of this callback. +### `should_drop_federated_event` + +_First introduced in Synapse v1.60.0_ + +```python +async def should_drop_federated_event(event: "synapse.events.EventBase") -> bool +``` + +Called when checking whether a remote server can federate an event with us. **Returning +`True` from this function will silently drop a federated event and split-brain our view +of a room's DAG, and thus you shouldn't use this callback unless you know what you are +doing.** + +If multiple modules implement this callback, they will be considered in order. If a +callback returns `False`, Synapse falls through to the next one. The value of the first +callback that does not return `False` will be used. If this happens, Synapse will not call +any of the subsequent implementations of this callback. + ## Example The example below is a module that implements the spam checker callback diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index f30207376a..61bcbe2abe 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -44,6 +44,10 @@ CHECK_EVENT_FOR_SPAM_CALLBACK = Callable[ ["synapse.events.EventBase"], Awaitable[Union[bool, str]], ] +SHOULD_DROP_FEDERATED_EVENT_CALLBACK = Callable[ + ["synapse.events.EventBase"], + Awaitable[Union[bool, str]], +] USER_MAY_JOIN_ROOM_CALLBACK = Callable[[str, str, bool], Awaitable[bool]] USER_MAY_INVITE_CALLBACK = Callable[[str, str, str], Awaitable[bool]] USER_MAY_SEND_3PID_INVITE_CALLBACK = Callable[[str, str, str, str], Awaitable[bool]] @@ -168,6 +172,9 @@ class SpamChecker: self.clock = hs.get_clock() self._check_event_for_spam_callbacks: List[CHECK_EVENT_FOR_SPAM_CALLBACK] = [] + self._should_drop_federated_event_callbacks: List[ + SHOULD_DROP_FEDERATED_EVENT_CALLBACK + ] = [] self._user_may_join_room_callbacks: List[USER_MAY_JOIN_ROOM_CALLBACK] = [] self._user_may_invite_callbacks: List[USER_MAY_INVITE_CALLBACK] = [] self._user_may_send_3pid_invite_callbacks: List[ @@ -191,6 +198,9 @@ class SpamChecker: def register_callbacks( self, check_event_for_spam: Optional[CHECK_EVENT_FOR_SPAM_CALLBACK] = None, + should_drop_federated_event: Optional[ + SHOULD_DROP_FEDERATED_EVENT_CALLBACK + ] = None, user_may_join_room: Optional[USER_MAY_JOIN_ROOM_CALLBACK] = None, user_may_invite: Optional[USER_MAY_INVITE_CALLBACK] = None, user_may_send_3pid_invite: Optional[USER_MAY_SEND_3PID_INVITE_CALLBACK] = None, @@ -209,6 +219,11 @@ class SpamChecker: if check_event_for_spam is not None: self._check_event_for_spam_callbacks.append(check_event_for_spam) + if should_drop_federated_event is not None: + self._should_drop_federated_event_callbacks.append( + should_drop_federated_event + ) + if user_may_join_room is not None: self._user_may_join_room_callbacks.append(user_may_join_room) @@ -268,6 +283,31 @@ class SpamChecker: return False + async def should_drop_federated_event( + self, event: "synapse.events.EventBase" + ) -> Union[bool, str]: + """Checks if a given federated event is considered "spammy" by this + server. + + If the server considers an event spammy, it will be silently dropped, + and in doing so will split-brain our view of the room's DAG. + + Args: + event: the event to be checked + + Returns: + True if the event should be silently dropped + """ + for callback in self._should_drop_federated_event_callbacks: + with Measure( + self.clock, "{}.{}".format(callback.__module__, callback.__qualname__) + ): + res: Union[bool, str] = await delay_cancellation(callback(event)) + if res: + return res + + return False + async def user_may_join_room( self, user_id: str, room_id: str, is_invited: bool ) -> bool: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 884b5d60b4..b8232e5257 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -110,6 +110,7 @@ class FederationServer(FederationBase): self.handler = hs.get_federation_handler() self.storage = hs.get_storage() + self._spam_checker = hs.get_spam_checker() self._federation_event_handler = hs.get_federation_event_handler() self.state = hs.get_state_handler() self._event_auth_handler = hs.get_event_auth_handler() @@ -1019,6 +1020,12 @@ class FederationServer(FederationBase): except SynapseError as e: raise FederationError("ERROR", e.code, e.msg, affected=pdu.event_id) + if await self._spam_checker.should_drop_federated_event(pdu): + logger.warning( + "Unstaged federated event contains spam, dropping %s", pdu.event_id + ) + return + # Add the event to our staging area await self.store.insert_received_event_to_staging(origin, pdu) @@ -1032,6 +1039,41 @@ class FederationServer(FederationBase): pdu.room_id, room_version, lock, origin, pdu ) + async def _get_next_nonspam_staged_event_for_room( + self, room_id: str, room_version: RoomVersion + ) -> Optional[Tuple[str, EventBase]]: + """Fetch the first non-spam event from staging queue. + + Args: + room_id: the room to fetch the first non-spam event in. + room_version: the version of the room. + + Returns: + The first non-spam event in that room. + """ + + while True: + # We need to do this check outside the lock to avoid a race between + # a new event being inserted by another instance and it attempting + # to acquire the lock. + next = await self.store.get_next_staged_event_for_room( + room_id, room_version + ) + + if next is None: + return None + + origin, event = next + + if await self._spam_checker.should_drop_federated_event(event): + logger.warning( + "Staged federated event contains spam, dropping %s", + event.event_id, + ) + continue + + return next + @wrap_as_background_process("_process_incoming_pdus_in_room_inner") async def _process_incoming_pdus_in_room_inner( self, @@ -1109,12 +1151,10 @@ class FederationServer(FederationBase): (self._clock.time_msec() - received_ts) / 1000 ) - # We need to do this check outside the lock to avoid a race between - # a new event being inserted by another instance and it attempting - # to acquire the lock. - next = await self.store.get_next_staged_event_for_room( + next = await self._get_next_nonspam_staged_event_for_room( room_id, room_version ) + if not next: break diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 73f92d2df8..c4f661bb93 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -47,6 +47,7 @@ from synapse.events.spamcheck import ( CHECK_MEDIA_FILE_FOR_SPAM_CALLBACK, CHECK_REGISTRATION_FOR_SPAM_CALLBACK, CHECK_USERNAME_FOR_SPAM_CALLBACK, + SHOULD_DROP_FEDERATED_EVENT_CALLBACK, USER_MAY_CREATE_ROOM_ALIAS_CALLBACK, USER_MAY_CREATE_ROOM_CALLBACK, USER_MAY_INVITE_CALLBACK, @@ -234,6 +235,9 @@ class ModuleApi: self, *, check_event_for_spam: Optional[CHECK_EVENT_FOR_SPAM_CALLBACK] = None, + should_drop_federated_event: Optional[ + SHOULD_DROP_FEDERATED_EVENT_CALLBACK + ] = None, user_may_join_room: Optional[USER_MAY_JOIN_ROOM_CALLBACK] = None, user_may_invite: Optional[USER_MAY_INVITE_CALLBACK] = None, user_may_send_3pid_invite: Optional[USER_MAY_SEND_3PID_INVITE_CALLBACK] = None, @@ -254,6 +258,7 @@ class ModuleApi: """ return self._spam_checker.register_callbacks( check_event_for_spam=check_event_for_spam, + should_drop_federated_event=should_drop_federated_event, user_may_join_room=user_may_join_room, user_may_invite=user_may_invite, user_may_send_3pid_invite=user_may_send_3pid_invite,