diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py index 17c7e64ce7..862c07ef8c 100644 --- a/synapse/config/ratelimiting.py +++ b/synapse/config/ratelimiting.py @@ -22,6 +22,12 @@ class RatelimitConfig(Config): self.rc_messages_per_second = args.rc_messages_per_second self.rc_message_burst_count = args.rc_message_burst_count + self.federation_rc_window_size = args.federation_rc_window_size + self.federation_rc_sleep_limit = args.federation_rc_sleep_limit + self.federation_rc_sleep_delay = args.federation_rc_sleep_delay + self.federation_rc_reject_limit = args.federation_rc_reject_limit + self.federation_rc_concurrent = args.federation_rc_concurrent + @classmethod def add_arguments(cls, parser): super(RatelimitConfig, cls).add_arguments(parser) @@ -34,3 +40,33 @@ class RatelimitConfig(Config): "--rc-message-burst-count", type=float, default=10, help="number of message a client can send before being throttled" ) + + rc_group.add_argument( + "--federation-rc-window-size", type=int, default=10000, + help="The federation window size in milliseconds", + ) + + rc_group.add_argument( + "--federation-rc-sleep-limit", type=int, default=10, + help="The number of federation requests from a single server" + " in a window before the server will delay processing the" + " request.", + ) + + rc_group.add_argument( + "--federation-rc-sleep-delay", type=int, default=500, + help="The duration in milliseconds to delay processing events from" + " remote servers by if they go over the sleep limit.", + ) + + rc_group.add_argument( + "--federation-rc-reject-limit", type=int, default=50, + help="The maximum number of concurrent federation requests allowed" + " from a single server", + ) + + rc_group.add_argument( + "--federation-rc-concurrent", type=int, default=3, + help="The number of federation requests to concurrently process" + " from a single server", + ) diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py index 6800ac46c5..2a671b9aec 100644 --- a/synapse/federation/transport/__init__.py +++ b/synapse/federation/transport/__init__.py @@ -24,6 +24,8 @@ communicate over a different (albeit still reliable) protocol. from .server import TransportLayerServer from .client import TransportLayerClient +from synapse.util.ratelimitutils import FederationRateLimiter + class TransportLayer(TransportLayerServer, TransportLayerClient): """This is a basic implementation of the transport layer that translates @@ -55,8 +57,18 @@ class TransportLayer(TransportLayerServer, TransportLayerClient): send requests """ self.keyring = homeserver.get_keyring() + self.clock = homeserver.get_clock() self.server_name = server_name self.server = server self.client = client self.request_handler = None self.received_handler = None + + self.ratelimiter = FederationRateLimiter( + self.clock, + window_size=homeserver.config.federation_rc_window_size, + sleep_limit=homeserver.config.federation_rc_sleep_limit, + sleep_msec=homeserver.config.federation_rc_sleep_delay, + reject_limit=homeserver.config.federation_rc_reject_limit, + concurrent_requests=homeserver.config.federation_rc_concurrent, + ) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 2ffb37aa18..fce9c0195e 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -98,15 +98,23 @@ class TransportLayerServer(object): def new_handler(request, *args, **kwargs): try: (origin, content) = yield self._authenticate_request(request) - response = yield handler( - origin, content, request.args, *args, **kwargs - ) + with self.ratelimiter.ratelimit(origin) as d: + yield d + response = yield handler( + origin, content, request.args, *args, **kwargs + ) except: logger.exception("_authenticate_request failed") raise defer.returnValue(response) return new_handler + def rate_limit_origin(self, handler): + def new_handler(origin, *args, **kwargs): + response = yield handler(origin, *args, **kwargs) + defer.returnValue(response) + return new_handler() + @log_function def register_received_handler(self, handler): """ Register a handler that will be fired when we receive data. diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py new file mode 100644 index 0000000000..d4457af950 --- /dev/null +++ b/synapse/util/ratelimitutils.py @@ -0,0 +1,216 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.errors import LimitExceededError + +from synapse.util.async import sleep + +import collections +import contextlib +import logging + + +logger = logging.getLogger(__name__) + + +class FederationRateLimiter(object): + def __init__(self, clock, window_size, sleep_limit, sleep_msec, + reject_limit, concurrent_requests): + """ + Args: + clock (Clock) + window_size (int): The window size in milliseconds. + sleep_limit (int): The number of requests received in the last + `window_size` milliseconds before we artificially start + delaying processing of requests. + sleep_msec (int): The number of milliseconds to delay processing + of incoming requests by. + reject_limit (int): The maximum number of requests that are can be + queued for processing before we start rejecting requests with + a 429 Too Many Requests response. + concurrent_requests (int): The number of concurrent requests to + process. + """ + self.clock = clock + + self.window_size = window_size + self.sleep_limit = sleep_limit + self.sleep_msec = sleep_msec + self.reject_limit = reject_limit + self.concurrent_requests = concurrent_requests + + self.ratelimiters = {} + + def ratelimit(self, host): + """Used to ratelimit an incoming request from given host + + Example usage: + + with rate_limiter.ratelimit(origin) as wait_deferred: + yield wait_deferred + # Handle request ... + + Args: + host (str): Origin of incoming request. + + Returns: + _PerHostRatelimiter + """ + return self.ratelimiters.setdefault( + host, + _PerHostRatelimiter( + clock=self.clock, + window_size=self.window_size, + sleep_limit=self.sleep_limit, + sleep_msec=self.sleep_msec, + reject_limit=self.reject_limit, + concurrent_requests=self.concurrent_requests, + ) + ).ratelimit() + + +class _PerHostRatelimiter(object): + def __init__(self, clock, window_size, sleep_limit, sleep_msec, + reject_limit, concurrent_requests): + self.clock = clock + + self.window_size = window_size + self.sleep_limit = sleep_limit + self.sleep_msec = sleep_msec + self.reject_limit = reject_limit + self.concurrent_requests = concurrent_requests + + self.sleeping_requests = set() + self.ready_request_queue = collections.OrderedDict() + self.current_processing = set() + self.request_times = [] + + def is_empty(self): + time_now = self.clock.time_msec() + self.request_times[:] = [ + r for r in self.request_times + if time_now - r < self.window_size + ] + + return not ( + self.ready_request_queue + or self.sleeping_requests + or self.current_processing + or self.request_times + ) + + @contextlib.contextmanager + def ratelimit(self): + # `contextlib.contextmanager` takes a generator and turns it into a + # context manager. The generator should only yield once with a value + # to be returned by manager. + # Exceptions will be reraised at the yield. + + request_id = object() + ret = self._on_enter(request_id) + try: + yield ret + finally: + self._on_exit(request_id) + + def _on_enter(self, request_id): + time_now = self.clock.time_msec() + self.request_times[:] = [ + r for r in self.request_times + if time_now - r < self.window_size + ] + + queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) + if queue_size > self.reject_limit: + raise LimitExceededError( + retry_after_ms=int( + self.window_size / self.sleep_limit + ), + ) + + self.request_times.append(time_now) + + def queue_request(): + if len(self.current_processing) > self.concurrent_requests: + logger.debug("Ratelimit [%s]: Queue req", id(request_id)) + queue_defer = defer.Deferred() + self.ready_request_queue[request_id] = queue_defer + return queue_defer + else: + return defer.succeed(None) + + logger.debug( + "Ratelimit [%s]: len(self.request_times)=%d", + id(request_id), len(self.request_times), + ) + + if len(self.request_times) > self.sleep_limit: + logger.debug( + "Ratelimit [%s]: sleeping req", + id(request_id), + ) + ret_defer = sleep(self.sleep_msec/1000.0) + + self.sleeping_requests.add(request_id) + + def on_wait_finished(_): + logger.debug( + "Ratelimit [%s]: Finished sleeping", + id(request_id), + ) + self.sleeping_requests.discard(request_id) + queue_defer = queue_request() + return queue_defer + + ret_defer.addBoth(on_wait_finished) + else: + ret_defer = queue_request() + + def on_start(r): + logger.debug( + "Ratelimit [%s]: Processing req", + id(request_id), + ) + self.current_processing.add(request_id) + return r + + def on_err(r): + self.current_processing.discard(request_id) + return r + + def on_both(r): + # Ensure that we've properly cleaned up. + self.sleeping_requests.discard(request_id) + self.ready_request_queue.pop(request_id, None) + return r + + ret_defer.addCallbacks(on_start, on_err) + ret_defer.addBoth(on_both) + return ret_defer + + def _on_exit(self, request_id): + logger.debug( + "Ratelimit [%s]: Processed req", + id(request_id), + ) + self.current_processing.discard(request_id) + try: + request_id, deferred = self.ready_request_queue.popitem() + self.current_processing.add(request_id) + deferred.callback(None) + except KeyError: + pass