Allow for the configuration of max request retries and min/max retry delays in the matrix federation client (#15783)
This commit is contained in:
parent
1fcefd8f3e
commit
496f73103d
|
@ -0,0 +1 @@
|
||||||
|
Allow for the configuration of the request timeout, max request retries and max retry delays in the matrix federation client.
|
|
@ -1196,6 +1196,32 @@ Example configuration:
|
||||||
allow_device_name_lookup_over_federation: true
|
allow_device_name_lookup_over_federation: true
|
||||||
```
|
```
|
||||||
---
|
---
|
||||||
|
### `federation`
|
||||||
|
|
||||||
|
The federation section defines some sub-options related to federation.
|
||||||
|
|
||||||
|
The following options are related to configuring timeout and retry logic for one request,
|
||||||
|
independently of the others.
|
||||||
|
The short retry algorithm is used when something or someone will wait for the request to have an
|
||||||
|
answer, while long retry is used for requests that happen in the background,
|
||||||
|
like sending a federation transaction.
|
||||||
|
|
||||||
|
* `client_timeout`: timeout for the federation requests. Default to 60s.
|
||||||
|
* `max_short_retry_delay`: maximum delay to be used for the short retry algo. Default to 2s.
|
||||||
|
* `max_long_retry_delay`: maximum delay to be used for the long retry algo. Default to 60s.
|
||||||
|
* `max_short_retries`: maximum number of retries for the short retry algo. Default to 3 attempts.
|
||||||
|
* `max_long_retries`: maximum number of retries for the long retry algo. Default to 10 attempts.
|
||||||
|
|
||||||
|
Example configuration:
|
||||||
|
```yaml
|
||||||
|
federation:
|
||||||
|
client_timeout: 180s
|
||||||
|
max_short_retry_delay: 7s
|
||||||
|
max_long_retry_delay: 100s
|
||||||
|
max_short_retries: 5
|
||||||
|
max_long_retries: 20
|
||||||
|
```
|
||||||
|
---
|
||||||
## Caching
|
## Caching
|
||||||
|
|
||||||
Options related to caching.
|
Options related to caching.
|
||||||
|
|
|
@ -22,6 +22,8 @@ class FederationConfig(Config):
|
||||||
section = "federation"
|
section = "federation"
|
||||||
|
|
||||||
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
|
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
|
||||||
|
federation_config = config.setdefault("federation", {})
|
||||||
|
|
||||||
# FIXME: federation_domain_whitelist needs sytests
|
# FIXME: federation_domain_whitelist needs sytests
|
||||||
self.federation_domain_whitelist: Optional[dict] = None
|
self.federation_domain_whitelist: Optional[dict] = None
|
||||||
federation_domain_whitelist = config.get("federation_domain_whitelist", None)
|
federation_domain_whitelist = config.get("federation_domain_whitelist", None)
|
||||||
|
@ -49,5 +51,19 @@ class FederationConfig(Config):
|
||||||
"allow_device_name_lookup_over_federation", False
|
"allow_device_name_lookup_over_federation", False
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Allow for the configuration of timeout, max request retries
|
||||||
|
# and max retry delays in the matrix federation client.
|
||||||
|
self.client_timeout_ms = Config.parse_duration(
|
||||||
|
federation_config.get("client_timeout", "60s")
|
||||||
|
)
|
||||||
|
self.max_long_retry_delay_ms = Config.parse_duration(
|
||||||
|
federation_config.get("max_long_retry_delay", "60s")
|
||||||
|
)
|
||||||
|
self.max_short_retry_delay_ms = Config.parse_duration(
|
||||||
|
federation_config.get("max_short_retry_delay", "2s")
|
||||||
|
)
|
||||||
|
self.max_long_retries = federation_config.get("max_long_retries", 10)
|
||||||
|
self.max_short_retries = federation_config.get("max_short_retries", 3)
|
||||||
|
|
||||||
|
|
||||||
_METRICS_FOR_DOMAINS_SCHEMA = {"type": "array", "items": {"type": "string"}}
|
_METRICS_FOR_DOMAINS_SCHEMA = {"type": "array", "items": {"type": "string"}}
|
||||||
|
|
|
@ -95,8 +95,6 @@ incoming_responses_counter = Counter(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
MAX_LONG_RETRIES = 10
|
|
||||||
MAX_SHORT_RETRIES = 3
|
|
||||||
MAXINT = sys.maxsize
|
MAXINT = sys.maxsize
|
||||||
|
|
||||||
|
|
||||||
|
@ -413,7 +411,16 @@ class MatrixFederationHttpClient:
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self._store = hs.get_datastores().main
|
self._store = hs.get_datastores().main
|
||||||
self.version_string_bytes = hs.version_string.encode("ascii")
|
self.version_string_bytes = hs.version_string.encode("ascii")
|
||||||
self.default_timeout = 60
|
self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000
|
||||||
|
|
||||||
|
self.max_long_retry_delay_seconds = (
|
||||||
|
hs.config.federation.max_long_retry_delay_ms / 1000
|
||||||
|
)
|
||||||
|
self.max_short_retry_delay_seconds = (
|
||||||
|
hs.config.federation.max_short_retry_delay_ms / 1000
|
||||||
|
)
|
||||||
|
self.max_long_retries = hs.config.federation.max_long_retries
|
||||||
|
self.max_short_retries = hs.config.federation.max_short_retries
|
||||||
|
|
||||||
self._cooperator = Cooperator(scheduler=_make_scheduler(self.reactor))
|
self._cooperator = Cooperator(scheduler=_make_scheduler(self.reactor))
|
||||||
|
|
||||||
|
@ -542,10 +549,10 @@ class MatrixFederationHttpClient:
|
||||||
logger.exception(f"Invalid destination: {request.destination}.")
|
logger.exception(f"Invalid destination: {request.destination}.")
|
||||||
raise FederationDeniedError(request.destination)
|
raise FederationDeniedError(request.destination)
|
||||||
|
|
||||||
if timeout:
|
if timeout is not None:
|
||||||
_sec_timeout = timeout / 1000
|
_sec_timeout = timeout / 1000
|
||||||
else:
|
else:
|
||||||
_sec_timeout = self.default_timeout
|
_sec_timeout = self.default_timeout_seconds
|
||||||
|
|
||||||
if (
|
if (
|
||||||
self.hs.config.federation.federation_domain_whitelist is not None
|
self.hs.config.federation.federation_domain_whitelist is not None
|
||||||
|
@ -590,9 +597,9 @@ class MatrixFederationHttpClient:
|
||||||
# XXX: Would be much nicer to retry only at the transaction-layer
|
# XXX: Would be much nicer to retry only at the transaction-layer
|
||||||
# (once we have reliable transactions in place)
|
# (once we have reliable transactions in place)
|
||||||
if long_retries:
|
if long_retries:
|
||||||
retries_left = MAX_LONG_RETRIES
|
retries_left = self.max_long_retries
|
||||||
else:
|
else:
|
||||||
retries_left = MAX_SHORT_RETRIES
|
retries_left = self.max_short_retries
|
||||||
|
|
||||||
url_bytes = request.uri
|
url_bytes = request.uri
|
||||||
url_str = url_bytes.decode("ascii")
|
url_str = url_bytes.decode("ascii")
|
||||||
|
@ -737,24 +744,34 @@ class MatrixFederationHttpClient:
|
||||||
|
|
||||||
if retries_left and not timeout:
|
if retries_left and not timeout:
|
||||||
if long_retries:
|
if long_retries:
|
||||||
delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
|
delay_seconds = 4 ** (
|
||||||
delay = min(delay, 60)
|
self.max_long_retries + 1 - retries_left
|
||||||
delay *= random.uniform(0.8, 1.4)
|
)
|
||||||
|
delay_seconds = min(
|
||||||
|
delay_seconds, self.max_long_retry_delay_seconds
|
||||||
|
)
|
||||||
|
delay_seconds *= random.uniform(0.8, 1.4)
|
||||||
else:
|
else:
|
||||||
delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
|
delay_seconds = 0.5 * 2 ** (
|
||||||
delay = min(delay, 2)
|
self.max_short_retries - retries_left
|
||||||
delay *= random.uniform(0.8, 1.4)
|
)
|
||||||
|
delay_seconds = min(
|
||||||
|
delay_seconds, self.max_short_retry_delay_seconds
|
||||||
|
)
|
||||||
|
delay_seconds *= random.uniform(0.8, 1.4)
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"{%s} [%s] Waiting %ss before re-sending...",
|
"{%s} [%s] Waiting %ss before re-sending...",
|
||||||
request.txn_id,
|
request.txn_id,
|
||||||
request.destination,
|
request.destination,
|
||||||
delay,
|
delay_seconds,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Sleep for the calculated delay, or wake up immediately
|
# Sleep for the calculated delay, or wake up immediately
|
||||||
# if we get notified that the server is back up.
|
# if we get notified that the server is back up.
|
||||||
await self._sleeper.sleep(request.destination, delay * 1000)
|
await self._sleeper.sleep(
|
||||||
|
request.destination, delay_seconds * 1000
|
||||||
|
)
|
||||||
retries_left -= 1
|
retries_left -= 1
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
@ -953,7 +970,7 @@ class MatrixFederationHttpClient:
|
||||||
if timeout is not None:
|
if timeout is not None:
|
||||||
_sec_timeout = timeout / 1000
|
_sec_timeout = timeout / 1000
|
||||||
else:
|
else:
|
||||||
_sec_timeout = self.default_timeout
|
_sec_timeout = self.default_timeout_seconds
|
||||||
|
|
||||||
if parser is None:
|
if parser is None:
|
||||||
parser = cast(ByteParser[T], JsonParser())
|
parser = cast(ByteParser[T], JsonParser())
|
||||||
|
@ -1031,10 +1048,10 @@ class MatrixFederationHttpClient:
|
||||||
ignore_backoff=ignore_backoff,
|
ignore_backoff=ignore_backoff,
|
||||||
)
|
)
|
||||||
|
|
||||||
if timeout:
|
if timeout is not None:
|
||||||
_sec_timeout = timeout / 1000
|
_sec_timeout = timeout / 1000
|
||||||
else:
|
else:
|
||||||
_sec_timeout = self.default_timeout
|
_sec_timeout = self.default_timeout_seconds
|
||||||
|
|
||||||
body = await _handle_response(
|
body = await _handle_response(
|
||||||
self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
|
self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
|
||||||
|
@ -1142,7 +1159,7 @@ class MatrixFederationHttpClient:
|
||||||
if timeout is not None:
|
if timeout is not None:
|
||||||
_sec_timeout = timeout / 1000
|
_sec_timeout = timeout / 1000
|
||||||
else:
|
else:
|
||||||
_sec_timeout = self.default_timeout
|
_sec_timeout = self.default_timeout_seconds
|
||||||
|
|
||||||
if parser is None:
|
if parser is None:
|
||||||
parser = cast(ByteParser[T], JsonParser())
|
parser = cast(ByteParser[T], JsonParser())
|
||||||
|
@ -1218,7 +1235,7 @@ class MatrixFederationHttpClient:
|
||||||
if timeout is not None:
|
if timeout is not None:
|
||||||
_sec_timeout = timeout / 1000
|
_sec_timeout = timeout / 1000
|
||||||
else:
|
else:
|
||||||
_sec_timeout = self.default_timeout
|
_sec_timeout = self.default_timeout_seconds
|
||||||
|
|
||||||
body = await _handle_response(
|
body = await _handle_response(
|
||||||
self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
|
self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
|
||||||
|
@ -1270,7 +1287,7 @@ class MatrixFederationHttpClient:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
d = read_body_with_max_size(response, output_stream, max_size)
|
d = read_body_with_max_size(response, output_stream, max_size)
|
||||||
d.addTimeout(self.default_timeout, self.reactor)
|
d.addTimeout(self.default_timeout_seconds, self.reactor)
|
||||||
length = await make_deferred_yieldable(d)
|
length = await make_deferred_yieldable(d)
|
||||||
except BodyExceededMaxSize:
|
except BodyExceededMaxSize:
|
||||||
msg = "Requested file is too large > %r bytes" % (max_size,)
|
msg = "Requested file is too large > %r bytes" % (max_size,)
|
||||||
|
|
|
@ -40,7 +40,7 @@ from synapse.server import HomeServer
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
|
|
||||||
from tests.server import FakeTransport
|
from tests.server import FakeTransport
|
||||||
from tests.unittest import HomeserverTestCase
|
from tests.unittest import HomeserverTestCase, override_config
|
||||||
|
|
||||||
|
|
||||||
def check_logcontext(context: LoggingContextOrSentinel) -> None:
|
def check_logcontext(context: LoggingContextOrSentinel) -> None:
|
||||||
|
@ -640,3 +640,21 @@ class FederationClientTests(HomeserverTestCase):
|
||||||
self.cl.build_auth_headers(
|
self.cl.build_auth_headers(
|
||||||
b"", b"GET", b"https://example.com", destination_is=b""
|
b"", b"GET", b"https://example.com", destination_is=b""
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@override_config(
|
||||||
|
{
|
||||||
|
"federation": {
|
||||||
|
"client_timeout": "180s",
|
||||||
|
"max_long_retry_delay": "100s",
|
||||||
|
"max_short_retry_delay": "7s",
|
||||||
|
"max_long_retries": 20,
|
||||||
|
"max_short_retries": 5,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
def test_configurable_retry_and_delay_values(self) -> None:
|
||||||
|
self.assertEqual(self.cl.default_timeout_seconds, 180)
|
||||||
|
self.assertEqual(self.cl.max_long_retry_delay_seconds, 100)
|
||||||
|
self.assertEqual(self.cl.max_short_retry_delay_seconds, 7)
|
||||||
|
self.assertEqual(self.cl.max_long_retries, 20)
|
||||||
|
self.assertEqual(self.cl.max_short_retries, 5)
|
||||||
|
|
Loading…
Reference in New Issue