Allow bigger responses to `/federation/v1/state` (#12877)
* Refactor HTTP response size limits Rather than passing a separate `max_response_size` down the stack, make it an attribute of the `parser`. * Allow bigger responses on `federation/v1/state` `/state` can return huge responses, so we need to handle that.
This commit is contained in:
parent
4660d9fdcf
commit
1b338476af
|
@ -0,0 +1 @@
|
|||
Fix a bug introduced in Synapse 1.54 which could sometimes cause exceptions when handling federated traffic.
|
|
@ -49,11 +49,6 @@ from synapse.types import JsonDict
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Send join responses can be huge, so we set a separate limit here. The response
|
||||
# is parsed in a streaming manner, which helps alleviate the issue of memory
|
||||
# usage a bit.
|
||||
MAX_RESPONSE_SIZE_SEND_JOIN = 500 * 1024 * 1024
|
||||
|
||||
|
||||
class TransportLayerClient:
|
||||
"""Sends federation HTTP requests to other servers"""
|
||||
|
@ -349,7 +344,6 @@ class TransportLayerClient:
|
|||
path=path,
|
||||
data=content,
|
||||
parser=SendJoinParser(room_version, v1_api=True),
|
||||
max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
|
||||
)
|
||||
|
||||
async def send_join_v2(
|
||||
|
@ -372,7 +366,6 @@ class TransportLayerClient:
|
|||
args=query_params,
|
||||
data=content,
|
||||
parser=SendJoinParser(room_version, v1_api=False),
|
||||
max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
|
||||
)
|
||||
|
||||
async def send_leave_v1(
|
||||
|
@ -1360,6 +1353,11 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
|
|||
|
||||
CONTENT_TYPE = "application/json"
|
||||
|
||||
# /send_join responses can be huge, so we override the size limit here. The response
|
||||
# is parsed in a streaming manner, which helps alleviate the issue of memory
|
||||
# usage a bit.
|
||||
MAX_RESPONSE_SIZE = 500 * 1024 * 1024
|
||||
|
||||
def __init__(self, room_version: RoomVersion, v1_api: bool):
|
||||
self._response = SendJoinResponse([], [], event_dict={})
|
||||
self._room_version = room_version
|
||||
|
@ -1427,6 +1425,9 @@ class _StateParser(ByteParser[StateRequestResponse]):
|
|||
|
||||
CONTENT_TYPE = "application/json"
|
||||
|
||||
# As with /send_join, /state responses can be huge.
|
||||
MAX_RESPONSE_SIZE = 500 * 1024 * 1024
|
||||
|
||||
def __init__(self, room_version: RoomVersion):
|
||||
self._response = StateRequestResponse([], [])
|
||||
self._room_version = room_version
|
||||
|
|
|
@ -92,9 +92,6 @@ incoming_responses_counter = Counter(
|
|||
"synapse_http_matrixfederationclient_responses", "", ["method", "code"]
|
||||
)
|
||||
|
||||
# a federation response can be rather large (eg a big state_ids is 50M or so), so we
|
||||
# need a generous limit here.
|
||||
MAX_RESPONSE_SIZE = 100 * 1024 * 1024
|
||||
|
||||
MAX_LONG_RETRIES = 10
|
||||
MAX_SHORT_RETRIES = 3
|
||||
|
@ -116,6 +113,11 @@ class ByteParser(ByteWriteable, Generic[T], abc.ABC):
|
|||
the content type doesn't match we fail the request.
|
||||
"""
|
||||
|
||||
# a federation response can be rather large (eg a big state_ids is 50M or so), so we
|
||||
# need a generous limit here.
|
||||
MAX_RESPONSE_SIZE: int = 100 * 1024 * 1024
|
||||
"""The largest response this parser will accept."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def finish(self) -> T:
|
||||
"""Called when response has finished streaming and the parser should
|
||||
|
@ -203,7 +205,6 @@ async def _handle_response(
|
|||
response: IResponse,
|
||||
start_ms: int,
|
||||
parser: ByteParser[T],
|
||||
max_response_size: Optional[int] = None,
|
||||
) -> T:
|
||||
"""
|
||||
Reads the body of a response with a timeout and sends it to a parser
|
||||
|
@ -215,15 +216,12 @@ async def _handle_response(
|
|||
response: response to the request
|
||||
start_ms: Timestamp when request was made
|
||||
parser: The parser for the response
|
||||
max_response_size: The maximum size to read from the response, if None
|
||||
uses the default.
|
||||
|
||||
Returns:
|
||||
The parsed response
|
||||
"""
|
||||
|
||||
if max_response_size is None:
|
||||
max_response_size = MAX_RESPONSE_SIZE
|
||||
max_response_size = parser.MAX_RESPONSE_SIZE
|
||||
|
||||
try:
|
||||
check_content_type_is(response.headers, parser.CONTENT_TYPE)
|
||||
|
@ -240,7 +238,7 @@ async def _handle_response(
|
|||
"{%s} [%s] JSON response exceeded max size %i - %s %s",
|
||||
request.txn_id,
|
||||
request.destination,
|
||||
MAX_RESPONSE_SIZE,
|
||||
max_response_size,
|
||||
request.method,
|
||||
request.uri.decode("ascii"),
|
||||
)
|
||||
|
@ -772,7 +770,6 @@ class MatrixFederationHttpClient:
|
|||
backoff_on_404: bool = False,
|
||||
try_trailing_slash_on_400: bool = False,
|
||||
parser: Literal[None] = None,
|
||||
max_response_size: Optional[int] = None,
|
||||
) -> Union[JsonDict, list]:
|
||||
...
|
||||
|
||||
|
@ -790,7 +787,6 @@ class MatrixFederationHttpClient:
|
|||
backoff_on_404: bool = False,
|
||||
try_trailing_slash_on_400: bool = False,
|
||||
parser: Optional[ByteParser[T]] = None,
|
||||
max_response_size: Optional[int] = None,
|
||||
) -> T:
|
||||
...
|
||||
|
||||
|
@ -807,7 +803,6 @@ class MatrixFederationHttpClient:
|
|||
backoff_on_404: bool = False,
|
||||
try_trailing_slash_on_400: bool = False,
|
||||
parser: Optional[ByteParser] = None,
|
||||
max_response_size: Optional[int] = None,
|
||||
):
|
||||
"""Sends the specified json data using PUT
|
||||
|
||||
|
@ -843,8 +838,6 @@ class MatrixFederationHttpClient:
|
|||
enabled.
|
||||
parser: The parser to use to decode the response. Defaults to
|
||||
parsing as JSON.
|
||||
max_response_size: The maximum size to read from the response, if None
|
||||
uses the default.
|
||||
|
||||
Returns:
|
||||
Succeeds when we get a 2xx HTTP response. The
|
||||
|
@ -895,7 +888,6 @@ class MatrixFederationHttpClient:
|
|||
response,
|
||||
start_ms,
|
||||
parser=parser,
|
||||
max_response_size=max_response_size,
|
||||
)
|
||||
|
||||
return body
|
||||
|
@ -984,7 +976,6 @@ class MatrixFederationHttpClient:
|
|||
ignore_backoff: bool = False,
|
||||
try_trailing_slash_on_400: bool = False,
|
||||
parser: Literal[None] = None,
|
||||
max_response_size: Optional[int] = None,
|
||||
) -> Union[JsonDict, list]:
|
||||
...
|
||||
|
||||
|
@ -999,7 +990,6 @@ class MatrixFederationHttpClient:
|
|||
ignore_backoff: bool = ...,
|
||||
try_trailing_slash_on_400: bool = ...,
|
||||
parser: ByteParser[T] = ...,
|
||||
max_response_size: Optional[int] = ...,
|
||||
) -> T:
|
||||
...
|
||||
|
||||
|
@ -1013,7 +1003,6 @@ class MatrixFederationHttpClient:
|
|||
ignore_backoff: bool = False,
|
||||
try_trailing_slash_on_400: bool = False,
|
||||
parser: Optional[ByteParser] = None,
|
||||
max_response_size: Optional[int] = None,
|
||||
):
|
||||
"""GETs some json from the given host homeserver and path
|
||||
|
||||
|
@ -1043,9 +1032,6 @@ class MatrixFederationHttpClient:
|
|||
parser: The parser to use to decode the response. Defaults to
|
||||
parsing as JSON.
|
||||
|
||||
max_response_size: The maximum size to read from the response. If None,
|
||||
uses the default.
|
||||
|
||||
Returns:
|
||||
Succeeds when we get a 2xx HTTP response. The
|
||||
result will be the decoded JSON body.
|
||||
|
@ -1090,7 +1076,6 @@ class MatrixFederationHttpClient:
|
|||
response,
|
||||
start_ms,
|
||||
parser=parser,
|
||||
max_response_size=max_response_size,
|
||||
)
|
||||
|
||||
return body
|
||||
|
|
|
@ -26,7 +26,7 @@ from twisted.web.http import HTTPChannel
|
|||
|
||||
from synapse.api.errors import RequestSendFailed
|
||||
from synapse.http.matrixfederationclient import (
|
||||
MAX_RESPONSE_SIZE,
|
||||
JsonParser,
|
||||
MatrixFederationHttpClient,
|
||||
MatrixFederationRequest,
|
||||
)
|
||||
|
@ -609,9 +609,9 @@ class FederationClientTests(HomeserverTestCase):
|
|||
while not test_d.called:
|
||||
protocol.dataReceived(b"a" * chunk_size)
|
||||
sent += chunk_size
|
||||
self.assertLessEqual(sent, MAX_RESPONSE_SIZE)
|
||||
self.assertLessEqual(sent, JsonParser.MAX_RESPONSE_SIZE)
|
||||
|
||||
self.assertEqual(sent, MAX_RESPONSE_SIZE)
|
||||
self.assertEqual(sent, JsonParser.MAX_RESPONSE_SIZE)
|
||||
|
||||
f = self.failureResultOf(test_d)
|
||||
self.assertIsInstance(f.value, RequestSendFailed)
|
||||
|
|
Loading…
Reference in New Issue