Rate limit joins per-room (#13276)
This commit is contained in:
parent
2ee0b6ef4b
commit
b977867358
|
@ -0,0 +1 @@
|
|||
Add per-room rate limiting for room joins. For each room, Synapse now monitors the rate of join events in that room, and throttle additional joins if that rate grows too large.
|
|
@ -67,6 +67,10 @@ rc_joins:
|
|||
per_second: 9999
|
||||
burst_count: 9999
|
||||
|
||||
rc_joins_per_room:
|
||||
per_second: 9999
|
||||
burst_count: 9999
|
||||
|
||||
rc_3pid_validation:
|
||||
per_second: 1000
|
||||
burst_count: 1000
|
||||
|
|
|
@ -104,6 +104,16 @@ minimum, a `notif_from` setting.)
|
|||
Specifying an `email` setting under `account_threepid_delegates` will now cause
|
||||
an error at startup.
|
||||
|
||||
## Changes to the event replication streams
|
||||
|
||||
Synapse now includes a flag indicating if an event is an outlier when
|
||||
replicating it to other workers. This is a forwards- and backwards-incompatible
|
||||
change: v1.63 and workers cannot process events replicated by v1.64 workers, and
|
||||
vice versa.
|
||||
|
||||
Once all workers are upgraded to v1.64 (or downgraded to v1.63), event
|
||||
replication will resume as normal.
|
||||
|
||||
# Upgrading to v1.62.0
|
||||
|
||||
## New signatures for spam checker callbacks
|
||||
|
|
|
@ -1471,6 +1471,25 @@ rc_joins:
|
|||
per_second: 0.03
|
||||
burst_count: 12
|
||||
```
|
||||
---
|
||||
### `rc_joins_per_room`
|
||||
|
||||
This option allows admins to ratelimit joins to a room based on the number of recent
|
||||
joins (local or remote) to that room. It is intended to mitigate mass-join spam
|
||||
waves which target multiple homeservers.
|
||||
|
||||
By default, one join is permitted to a room every second, with an accumulating
|
||||
buffer of up to ten instantaneous joins.
|
||||
|
||||
Example configuration (default values):
|
||||
```yaml
|
||||
rc_joins_per_room:
|
||||
per_second: 1
|
||||
burst_count: 10
|
||||
```
|
||||
|
||||
_Added in Synapse 1.64.0._
|
||||
|
||||
---
|
||||
### `rc_3pid_validation`
|
||||
|
||||
|
|
|
@ -112,6 +112,13 @@ class RatelimitConfig(Config):
|
|||
defaults={"per_second": 0.01, "burst_count": 10},
|
||||
)
|
||||
|
||||
# Track the rate of joins to a given room. If there are too many, temporarily
|
||||
# prevent local joins and remote joins via this server.
|
||||
self.rc_joins_per_room = RateLimitConfig(
|
||||
config.get("rc_joins_per_room", {}),
|
||||
defaults={"per_second": 1, "burst_count": 10},
|
||||
)
|
||||
|
||||
# Ratelimit cross-user key requests:
|
||||
# * For local requests this is keyed by the sending device.
|
||||
# * For requests received over federation this is keyed by the origin.
|
||||
|
|
|
@ -118,6 +118,7 @@ class FederationServer(FederationBase):
|
|||
self._federation_event_handler = hs.get_federation_event_handler()
|
||||
self.state = hs.get_state_handler()
|
||||
self._event_auth_handler = hs.get_event_auth_handler()
|
||||
self._room_member_handler = hs.get_room_member_handler()
|
||||
|
||||
self._state_storage_controller = hs.get_storage_controllers().state
|
||||
|
||||
|
@ -621,6 +622,15 @@ class FederationServer(FederationBase):
|
|||
)
|
||||
raise IncompatibleRoomVersionError(room_version=room_version)
|
||||
|
||||
# Refuse the request if that room has seen too many joins recently.
|
||||
# This is in addition to the HS-level rate limiting applied by
|
||||
# BaseFederationServlet.
|
||||
# type-ignore: mypy doesn't seem able to deduce the type of the limiter(!?)
|
||||
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
|
||||
requester=None,
|
||||
key=room_id,
|
||||
update=False,
|
||||
)
|
||||
pdu = await self.handler.on_make_join_request(origin, room_id, user_id)
|
||||
return {"event": pdu.get_templated_pdu_json(), "room_version": room_version}
|
||||
|
||||
|
@ -655,6 +665,12 @@ class FederationServer(FederationBase):
|
|||
room_id: str,
|
||||
caller_supports_partial_state: bool = False,
|
||||
) -> Dict[str, Any]:
|
||||
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
|
||||
requester=None,
|
||||
key=room_id,
|
||||
update=False,
|
||||
)
|
||||
|
||||
event, context = await self._on_send_membership_event(
|
||||
origin, content, Membership.JOIN, room_id
|
||||
)
|
||||
|
|
|
@ -1980,6 +1980,10 @@ class FederationEventHandler:
|
|||
event, event_pos, max_stream_token, extra_users=extra_users
|
||||
)
|
||||
|
||||
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
|
||||
# TODO retrieve the previous state, and exclude join -> join transitions
|
||||
self._notifier.notify_user_joined_room(event.event_id, event.room_id)
|
||||
|
||||
def _sanity_check_event(self, ev: EventBase) -> None:
|
||||
"""
|
||||
Do some early sanity checks of a received event
|
||||
|
|
|
@ -463,6 +463,7 @@ class EventCreationHandler:
|
|||
)
|
||||
self._events_shard_config = self.config.worker.events_shard_config
|
||||
self._instance_name = hs.get_instance_name()
|
||||
self._notifier = hs.get_notifier()
|
||||
|
||||
self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state
|
||||
|
||||
|
@ -1550,6 +1551,16 @@ class EventCreationHandler:
|
|||
requester, is_admin_redaction=is_admin_redaction
|
||||
)
|
||||
|
||||
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
|
||||
(
|
||||
current_membership,
|
||||
_,
|
||||
) = await self.store.get_local_current_membership_for_user_in_room(
|
||||
event.state_key, event.room_id
|
||||
)
|
||||
if current_membership != Membership.JOIN:
|
||||
self._notifier.notify_user_joined_room(event.event_id, event.room_id)
|
||||
|
||||
await self._maybe_kick_guest_users(event, context)
|
||||
|
||||
if event.type == EventTypes.CanonicalAlias:
|
||||
|
|
|
@ -94,12 +94,29 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
|||
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
|
||||
burst_count=hs.config.ratelimiting.rc_joins_local.burst_count,
|
||||
)
|
||||
# Tracks joins from local users to rooms this server isn't a member of.
|
||||
# I.e. joins this server makes by requesting /make_join /send_join from
|
||||
# another server.
|
||||
self._join_rate_limiter_remote = Ratelimiter(
|
||||
store=self.store,
|
||||
clock=self.clock,
|
||||
rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second,
|
||||
burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count,
|
||||
)
|
||||
# TODO: find a better place to keep this Ratelimiter.
|
||||
# It needs to be
|
||||
# - written to by event persistence code
|
||||
# - written to by something which can snoop on replication streams
|
||||
# - read by the RoomMemberHandler to rate limit joins from local users
|
||||
# - read by the FederationServer to rate limit make_joins and send_joins from
|
||||
# other homeservers
|
||||
# I wonder if a homeserver-wide collection of rate limiters might be cleaner?
|
||||
self._join_rate_per_room_limiter = Ratelimiter(
|
||||
store=self.store,
|
||||
clock=self.clock,
|
||||
rate_hz=hs.config.ratelimiting.rc_joins_per_room.per_second,
|
||||
burst_count=hs.config.ratelimiting.rc_joins_per_room.burst_count,
|
||||
)
|
||||
|
||||
# Ratelimiter for invites, keyed by room (across all issuers, all
|
||||
# recipients).
|
||||
|
@ -136,6 +153,18 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
|||
)
|
||||
|
||||
self.request_ratelimiter = hs.get_request_ratelimiter()
|
||||
hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room)
|
||||
|
||||
def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
|
||||
"""Notify the rate limiter that a room join has occurred.
|
||||
|
||||
Use this to inform the RoomMemberHandler about joins that have either
|
||||
- taken place on another homeserver, or
|
||||
- on another worker in this homeserver.
|
||||
Joins actioned by this worker should use the usual `ratelimit` method, which
|
||||
checks the limit and increments the counter in one go.
|
||||
"""
|
||||
self._join_rate_per_room_limiter.record_action(requester=None, key=room_id)
|
||||
|
||||
@abc.abstractmethod
|
||||
async def _remote_join(
|
||||
|
@ -396,6 +425,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
|||
# up blocking profile updates.
|
||||
if newly_joined and ratelimit:
|
||||
await self._join_rate_limiter_local.ratelimit(requester)
|
||||
await self._join_rate_per_room_limiter.ratelimit(
|
||||
requester, key=room_id, update=False
|
||||
)
|
||||
|
||||
result_event = await self.event_creation_handler.handle_new_client_event(
|
||||
requester,
|
||||
|
@ -867,6 +899,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
|||
await self._join_rate_limiter_remote.ratelimit(
|
||||
requester,
|
||||
)
|
||||
await self._join_rate_per_room_limiter.ratelimit(
|
||||
requester,
|
||||
key=room_id,
|
||||
update=False,
|
||||
)
|
||||
|
||||
inviter = await self._get_inviter(target.to_string(), room_id)
|
||||
if inviter and not self.hs.is_mine(inviter):
|
||||
|
|
|
@ -21,7 +21,7 @@ from twisted.internet.interfaces import IAddress, IConnector
|
|||
from twisted.internet.protocol import ReconnectingClientFactory
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
from synapse.api.constants import EventTypes, ReceiptTypes
|
||||
from synapse.api.constants import EventTypes, Membership, ReceiptTypes
|
||||
from synapse.federation import send_queue
|
||||
from synapse.federation.sender import FederationSender
|
||||
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
|
||||
|
@ -219,6 +219,21 @@ class ReplicationDataHandler:
|
|||
membership=row.data.membership,
|
||||
)
|
||||
|
||||
# If this event is a join, make a note of it so we have an accurate
|
||||
# cross-worker room rate limit.
|
||||
# TODO: Erik said we should exclude rows that came from ex_outliers
|
||||
# here, but I don't see how we can determine that. I guess we could
|
||||
# add a flag to row.data?
|
||||
if (
|
||||
row.data.type == EventTypes.Member
|
||||
and row.data.membership == Membership.JOIN
|
||||
and not row.data.outlier
|
||||
):
|
||||
# TODO retrieve the previous state, and exclude join -> join transitions
|
||||
self.notifier.notify_user_joined_room(
|
||||
row.data.event_id, row.data.room_id
|
||||
)
|
||||
|
||||
await self._presence_handler.process_replication_rows(
|
||||
stream_name, instance_name, token, rows
|
||||
)
|
||||
|
|
|
@ -98,6 +98,7 @@ class EventsStreamEventRow(BaseEventsStreamRow):
|
|||
relates_to: Optional[str]
|
||||
membership: Optional[str]
|
||||
rejected: bool
|
||||
outlier: bool
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
|
|
|
@ -1490,7 +1490,7 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
|
||||
async def get_all_new_forward_event_rows(
|
||||
self, instance_name: str, last_id: int, current_id: int, limit: int
|
||||
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
|
||||
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
|
||||
"""Returns new events, for the Events replication stream
|
||||
|
||||
Args:
|
||||
|
@ -1506,10 +1506,11 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
|
||||
def get_all_new_forward_event_rows(
|
||||
txn: LoggingTransaction,
|
||||
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
|
||||
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
|
||||
sql = (
|
||||
"SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
|
||||
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
|
||||
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
|
||||
" e.outlier"
|
||||
" FROM events AS e"
|
||||
" LEFT JOIN redactions USING (event_id)"
|
||||
" LEFT JOIN state_events AS se USING (event_id)"
|
||||
|
@ -1523,7 +1524,8 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
)
|
||||
txn.execute(sql, (last_id, current_id, instance_name, limit))
|
||||
return cast(
|
||||
List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
|
||||
List[Tuple[int, str, str, str, str, str, str, str, bool, bool]],
|
||||
txn.fetchall(),
|
||||
)
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
|
@ -1532,7 +1534,7 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
|
||||
async def get_ex_outlier_stream_rows(
|
||||
self, instance_name: str, last_id: int, current_id: int
|
||||
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
|
||||
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
|
||||
"""Returns de-outliered events, for the Events replication stream
|
||||
|
||||
Args:
|
||||
|
@ -1547,11 +1549,14 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
|
||||
def get_ex_outlier_stream_rows_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
|
||||
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
|
||||
sql = (
|
||||
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
|
||||
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
|
||||
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
|
||||
" e.outlier"
|
||||
" FROM events AS e"
|
||||
# NB: the next line (inner join) is what makes this query different from
|
||||
# get_all_new_forward_event_rows.
|
||||
" INNER JOIN ex_outlier_stream AS out USING (event_id)"
|
||||
" LEFT JOIN redactions USING (event_id)"
|
||||
" LEFT JOIN state_events AS se USING (event_id)"
|
||||
|
@ -1566,7 +1571,8 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
|
||||
txn.execute(sql, (last_id, current_id, instance_name))
|
||||
return cast(
|
||||
List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
|
||||
List[Tuple[int, str, str, str, str, str, str, str, bool, bool]],
|
||||
txn.fetchall(),
|
||||
)
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
|
|
|
@ -148,7 +148,7 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
|
|||
tok2 = self.login("fozzie", "bear")
|
||||
self.helper.join(self._room_id, second_member_user_id, tok=tok2)
|
||||
|
||||
def _make_join(self, user_id) -> JsonDict:
|
||||
def _make_join(self, user_id: str) -> JsonDict:
|
||||
channel = self.make_signed_federation_request(
|
||||
"GET",
|
||||
f"/_matrix/federation/v1/make_join/{self._room_id}/{user_id}"
|
||||
|
@ -260,6 +260,67 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
|
|||
)
|
||||
self.assertEqual(r[("m.room.member", joining_user)].membership, "join")
|
||||
|
||||
@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 3}})
|
||||
def test_make_join_respects_room_join_rate_limit(self) -> None:
|
||||
# In the test setup, two users join the room. Since the rate limiter burst
|
||||
# count is 3, a new make_join request to the room should be accepted.
|
||||
|
||||
joining_user = "@ronniecorbett:" + self.OTHER_SERVER_NAME
|
||||
self._make_join(joining_user)
|
||||
|
||||
# Now have a new local user join the room. This saturates the rate limiter
|
||||
# bucket, so the next make_join should be denied.
|
||||
new_local_user = self.register_user("animal", "animal")
|
||||
token = self.login("animal", "animal")
|
||||
self.helper.join(self._room_id, new_local_user, tok=token)
|
||||
|
||||
joining_user = "@ronniebarker:" + self.OTHER_SERVER_NAME
|
||||
channel = self.make_signed_federation_request(
|
||||
"GET",
|
||||
f"/_matrix/federation/v1/make_join/{self._room_id}/{joining_user}"
|
||||
f"?ver={DEFAULT_ROOM_VERSION}",
|
||||
)
|
||||
self.assertEqual(channel.code, HTTPStatus.TOO_MANY_REQUESTS, channel.json_body)
|
||||
|
||||
@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 3}})
|
||||
def test_send_join_contributes_to_room_join_rate_limit_and_is_limited(self) -> None:
|
||||
# Make two make_join requests up front. (These are rate limited, but do not
|
||||
# contribute to the rate limit.)
|
||||
join_event_dicts = []
|
||||
for i in range(2):
|
||||
joining_user = f"@misspiggy{i}:{self.OTHER_SERVER_NAME}"
|
||||
join_result = self._make_join(joining_user)
|
||||
join_event_dict = join_result["event"]
|
||||
self.add_hashes_and_signatures_from_other_server(
|
||||
join_event_dict,
|
||||
KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],
|
||||
)
|
||||
join_event_dicts.append(join_event_dict)
|
||||
|
||||
# In the test setup, two users join the room. Since the rate limiter burst
|
||||
# count is 3, the first send_join should be accepted...
|
||||
channel = self.make_signed_federation_request(
|
||||
"PUT",
|
||||
f"/_matrix/federation/v2/send_join/{self._room_id}/join0",
|
||||
content=join_event_dicts[0],
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
|
||||
# ... but the second should be denied.
|
||||
channel = self.make_signed_federation_request(
|
||||
"PUT",
|
||||
f"/_matrix/federation/v2/send_join/{self._room_id}/join1",
|
||||
content=join_event_dicts[1],
|
||||
)
|
||||
self.assertEqual(channel.code, HTTPStatus.TOO_MANY_REQUESTS, channel.json_body)
|
||||
|
||||
# NB: we could write a test which checks that the send_join event is seen
|
||||
# by other workers over replication, and that they update their rate limit
|
||||
# buckets accordingly. I'm going to assume that the join event gets sent over
|
||||
# replication, at which point the tests.handlers.room_member test
|
||||
# test_local_users_joining_on_another_worker_contribute_to_rate_limit
|
||||
# is probably sufficient to reassure that the bucket is updated.
|
||||
|
||||
|
||||
def _create_acl_event(content):
|
||||
return make_event_from_dict(
|
||||
|
|
|
@ -0,0 +1,290 @@
|
|||
from http import HTTPStatus
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
import synapse.rest.admin
|
||||
import synapse.rest.client.login
|
||||
import synapse.rest.client.room
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import LimitExceededError
|
||||
from synapse.crypto.event_signing import add_hashes_and_signatures
|
||||
from synapse.events import FrozenEventV3
|
||||
from synapse.federation.federation_client import SendJoinResult
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import UserID, create_requester
|
||||
from synapse.util import Clock
|
||||
|
||||
from tests.replication._base import RedisMultiWorkerStreamTestCase
|
||||
from tests.server import make_request
|
||||
from tests.test_utils import make_awaitable
|
||||
from tests.unittest import FederatingHomeserverTestCase, override_config
|
||||
|
||||
|
||||
class TestJoinsLimitedByPerRoomRateLimiter(FederatingHomeserverTestCase):
|
||||
servlets = [
|
||||
synapse.rest.admin.register_servlets,
|
||||
synapse.rest.client.login.register_servlets,
|
||||
synapse.rest.client.room.register_servlets,
|
||||
]
|
||||
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
self.handler = hs.get_room_member_handler()
|
||||
|
||||
# Create three users.
|
||||
self.alice = self.register_user("alice", "pass")
|
||||
self.alice_token = self.login("alice", "pass")
|
||||
self.bob = self.register_user("bob", "pass")
|
||||
self.bob_token = self.login("bob", "pass")
|
||||
self.chris = self.register_user("chris", "pass")
|
||||
self.chris_token = self.login("chris", "pass")
|
||||
|
||||
# Create a room on this homeserver. Note that this counts as a join: it
|
||||
# contributes to the rate limter's count of actions
|
||||
self.room_id = self.helper.create_room_as(self.alice, tok=self.alice_token)
|
||||
|
||||
self.intially_unjoined_room_id = f"!example:{self.OTHER_SERVER_NAME}"
|
||||
|
||||
@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}})
|
||||
def test_local_user_local_joins_contribute_to_limit_and_are_limited(self) -> None:
|
||||
# The rate limiter has accumulated one token from Alice's join after the create
|
||||
# event.
|
||||
# Try joining the room as Bob.
|
||||
self.get_success(
|
||||
self.handler.update_membership(
|
||||
requester=create_requester(self.bob),
|
||||
target=UserID.from_string(self.bob),
|
||||
room_id=self.room_id,
|
||||
action=Membership.JOIN,
|
||||
)
|
||||
)
|
||||
|
||||
# The rate limiter bucket is full. A second join should be denied.
|
||||
self.get_failure(
|
||||
self.handler.update_membership(
|
||||
requester=create_requester(self.chris),
|
||||
target=UserID.from_string(self.chris),
|
||||
room_id=self.room_id,
|
||||
action=Membership.JOIN,
|
||||
),
|
||||
LimitExceededError,
|
||||
)
|
||||
|
||||
@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}})
|
||||
def test_local_user_profile_edits_dont_contribute_to_limit(self) -> None:
|
||||
# The rate limiter has accumulated one token from Alice's join after the create
|
||||
# event. Alice should still be able to change her displayname.
|
||||
self.get_success(
|
||||
self.handler.update_membership(
|
||||
requester=create_requester(self.alice),
|
||||
target=UserID.from_string(self.alice),
|
||||
room_id=self.room_id,
|
||||
action=Membership.JOIN,
|
||||
content={"displayname": "Alice Cooper"},
|
||||
)
|
||||
)
|
||||
|
||||
# Still room in the limiter bucket. Chris's join should be accepted.
|
||||
self.get_success(
|
||||
self.handler.update_membership(
|
||||
requester=create_requester(self.chris),
|
||||
target=UserID.from_string(self.chris),
|
||||
room_id=self.room_id,
|
||||
action=Membership.JOIN,
|
||||
)
|
||||
)
|
||||
|
||||
@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 1}})
|
||||
def test_remote_joins_contribute_to_rate_limit(self) -> None:
|
||||
# Join once, to fill the rate limiter bucket.
|
||||
#
|
||||
# To do this we have to mock the responses from the remote homeserver.
|
||||
# We also patch out a bunch of event checks on our end. All we're really
|
||||
# trying to check here is that remote joins will bump the rate limter when
|
||||
# they are persisted.
|
||||
create_event_source = {
|
||||
"auth_events": [],
|
||||
"content": {
|
||||
"creator": f"@creator:{self.OTHER_SERVER_NAME}",
|
||||
"room_version": self.hs.config.server.default_room_version.identifier,
|
||||
},
|
||||
"depth": 0,
|
||||
"origin_server_ts": 0,
|
||||
"prev_events": [],
|
||||
"room_id": self.intially_unjoined_room_id,
|
||||
"sender": f"@creator:{self.OTHER_SERVER_NAME}",
|
||||
"state_key": "",
|
||||
"type": EventTypes.Create,
|
||||
}
|
||||
self.add_hashes_and_signatures_from_other_server(
|
||||
create_event_source,
|
||||
self.hs.config.server.default_room_version,
|
||||
)
|
||||
create_event = FrozenEventV3(
|
||||
create_event_source,
|
||||
self.hs.config.server.default_room_version,
|
||||
{},
|
||||
None,
|
||||
)
|
||||
|
||||
join_event_source = {
|
||||
"auth_events": [create_event.event_id],
|
||||
"content": {"membership": "join"},
|
||||
"depth": 1,
|
||||
"origin_server_ts": 100,
|
||||
"prev_events": [create_event.event_id],
|
||||
"sender": self.bob,
|
||||
"state_key": self.bob,
|
||||
"room_id": self.intially_unjoined_room_id,
|
||||
"type": EventTypes.Member,
|
||||
}
|
||||
add_hashes_and_signatures(
|
||||
self.hs.config.server.default_room_version,
|
||||
join_event_source,
|
||||
self.hs.hostname,
|
||||
self.hs.signing_key,
|
||||
)
|
||||
join_event = FrozenEventV3(
|
||||
join_event_source,
|
||||
self.hs.config.server.default_room_version,
|
||||
{},
|
||||
None,
|
||||
)
|
||||
|
||||
mock_make_membership_event = Mock(
|
||||
return_value=make_awaitable(
|
||||
(
|
||||
self.OTHER_SERVER_NAME,
|
||||
join_event,
|
||||
self.hs.config.server.default_room_version,
|
||||
)
|
||||
)
|
||||
)
|
||||
mock_send_join = Mock(
|
||||
return_value=make_awaitable(
|
||||
SendJoinResult(
|
||||
join_event,
|
||||
self.OTHER_SERVER_NAME,
|
||||
state=[create_event],
|
||||
auth_chain=[create_event],
|
||||
partial_state=False,
|
||||
servers_in_room=[],
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
with patch.object(
|
||||
self.handler.federation_handler.federation_client,
|
||||
"make_membership_event",
|
||||
mock_make_membership_event,
|
||||
), patch.object(
|
||||
self.handler.federation_handler.federation_client,
|
||||
"send_join",
|
||||
mock_send_join,
|
||||
), patch(
|
||||
"synapse.event_auth._is_membership_change_allowed",
|
||||
return_value=None,
|
||||
), patch(
|
||||
"synapse.handlers.federation_event.check_state_dependent_auth_rules",
|
||||
return_value=None,
|
||||
):
|
||||
self.get_success(
|
||||
self.handler.update_membership(
|
||||
requester=create_requester(self.bob),
|
||||
target=UserID.from_string(self.bob),
|
||||
room_id=self.intially_unjoined_room_id,
|
||||
action=Membership.JOIN,
|
||||
remote_room_hosts=[self.OTHER_SERVER_NAME],
|
||||
)
|
||||
)
|
||||
|
||||
# Try to join as Chris. Should get denied.
|
||||
self.get_failure(
|
||||
self.handler.update_membership(
|
||||
requester=create_requester(self.chris),
|
||||
target=UserID.from_string(self.chris),
|
||||
room_id=self.intially_unjoined_room_id,
|
||||
action=Membership.JOIN,
|
||||
remote_room_hosts=[self.OTHER_SERVER_NAME],
|
||||
),
|
||||
LimitExceededError,
|
||||
)
|
||||
|
||||
# TODO: test that remote joins to a room are rate limited.
|
||||
# Could do this by setting the burst count to 1, then:
|
||||
# - remote-joining a room
|
||||
# - immediately leaving
|
||||
# - trying to remote-join again.
|
||||
|
||||
|
||||
class TestReplicatedJoinsLimitedByPerRoomRateLimiter(RedisMultiWorkerStreamTestCase):
|
||||
servlets = [
|
||||
synapse.rest.admin.register_servlets,
|
||||
synapse.rest.client.login.register_servlets,
|
||||
synapse.rest.client.room.register_servlets,
|
||||
]
|
||||
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
self.handler = hs.get_room_member_handler()
|
||||
|
||||
# Create three users.
|
||||
self.alice = self.register_user("alice", "pass")
|
||||
self.alice_token = self.login("alice", "pass")
|
||||
self.bob = self.register_user("bob", "pass")
|
||||
self.bob_token = self.login("bob", "pass")
|
||||
self.chris = self.register_user("chris", "pass")
|
||||
self.chris_token = self.login("chris", "pass")
|
||||
|
||||
# Create a room on this homeserver.
|
||||
# Note that this counts as a
|
||||
self.room_id = self.helper.create_room_as(self.alice, tok=self.alice_token)
|
||||
self.intially_unjoined_room_id = "!example:otherhs"
|
||||
|
||||
@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}})
|
||||
def test_local_users_joining_on_another_worker_contribute_to_rate_limit(
|
||||
self,
|
||||
) -> None:
|
||||
# The rate limiter has accumulated one token from Alice's join after the create
|
||||
# event.
|
||||
self.replicate()
|
||||
|
||||
# Spawn another worker and have bob join via it.
|
||||
worker_app = self.make_worker_hs(
|
||||
"synapse.app.generic_worker", extra_config={"worker_name": "other worker"}
|
||||
)
|
||||
worker_site = self._hs_to_site[worker_app]
|
||||
channel = make_request(
|
||||
self.reactor,
|
||||
worker_site,
|
||||
"POST",
|
||||
f"/_matrix/client/v3/rooms/{self.room_id}/join",
|
||||
access_token=self.bob_token,
|
||||
)
|
||||
self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
|
||||
|
||||
# wait for join to arrive over replication
|
||||
self.replicate()
|
||||
|
||||
# Try to join as Chris on the worker. Should get denied because Alice
|
||||
# and Bob have both joined the room.
|
||||
self.get_failure(
|
||||
worker_app.get_room_member_handler().update_membership(
|
||||
requester=create_requester(self.chris),
|
||||
target=UserID.from_string(self.chris),
|
||||
room_id=self.room_id,
|
||||
action=Membership.JOIN,
|
||||
),
|
||||
LimitExceededError,
|
||||
)
|
||||
|
||||
# Try to join as Chris on the original worker. Should get denied because Alice
|
||||
# and Bob have both joined the room.
|
||||
self.get_failure(
|
||||
self.handler.update_membership(
|
||||
requester=create_requester(self.chris),
|
||||
target=UserID.from_string(self.chris),
|
||||
room_id=self.room_id,
|
||||
action=Membership.JOIN,
|
||||
),
|
||||
LimitExceededError,
|
||||
)
|
|
@ -710,7 +710,7 @@ class RoomsCreateTestCase(RoomBase):
|
|||
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
|
||||
self.assertTrue("room_id" in channel.json_body)
|
||||
assert channel.resource_usage is not None
|
||||
self.assertEqual(43, channel.resource_usage.db_txn_count)
|
||||
self.assertEqual(44, channel.resource_usage.db_txn_count)
|
||||
|
||||
def test_post_room_initial_state(self) -> None:
|
||||
# POST with initial_state config key, expect new room id
|
||||
|
@ -723,7 +723,7 @@ class RoomsCreateTestCase(RoomBase):
|
|||
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
|
||||
self.assertTrue("room_id" in channel.json_body)
|
||||
assert channel.resource_usage is not None
|
||||
self.assertEqual(49, channel.resource_usage.db_txn_count)
|
||||
self.assertEqual(50, channel.resource_usage.db_txn_count)
|
||||
|
||||
def test_post_room_visibility_key(self) -> None:
|
||||
# POST with visibility config key, expect new room id
|
||||
|
|
|
@ -231,7 +231,7 @@ class OptionsResourceTests(unittest.TestCase):
|
|||
parse_listener_def({"type": "http", "port": 0}),
|
||||
self.resource,
|
||||
"1.0",
|
||||
max_request_body_size=1234,
|
||||
max_request_body_size=4096,
|
||||
reactor=self.reactor,
|
||||
)
|
||||
|
||||
|
|
|
@ -284,7 +284,7 @@ class HomeserverTestCase(TestCase):
|
|||
config=self.hs.config.server.listeners[0],
|
||||
resource=self.resource,
|
||||
server_version_string="1",
|
||||
max_request_body_size=1234,
|
||||
max_request_body_size=4096,
|
||||
reactor=self.reactor,
|
||||
)
|
||||
|
||||
|
@ -773,7 +773,7 @@ class FederatingHomeserverTestCase(HomeserverTestCase):
|
|||
verify_key_id,
|
||||
FetchKeyResult(
|
||||
verify_key=verify_key,
|
||||
valid_until_ts=clock.time_msec() + 1000,
|
||||
valid_until_ts=clock.time_msec() + 10000,
|
||||
),
|
||||
)
|
||||
],
|
||||
|
|
|
@ -167,6 +167,7 @@ def default_config(
|
|||
"local": {"per_second": 10000, "burst_count": 10000},
|
||||
"remote": {"per_second": 10000, "burst_count": 10000},
|
||||
},
|
||||
"rc_joins_per_room": {"per_second": 10000, "burst_count": 10000},
|
||||
"rc_invites": {
|
||||
"per_room": {"per_second": 10000, "burst_count": 10000},
|
||||
"per_user": {"per_second": 10000, "burst_count": 10000},
|
||||
|
|
Loading…
Reference in New Issue