Refactor request sending to have better excpetions (#4358)
* Correctly retry and back off if we get a HTTPerror response * Refactor request sending to have better excpetions MatrixFederationHttpClient blindly reraised exceptions to the caller without differentiating "expected" failures (e.g. connection timeouts etc) versus more severe problems (e.g. programming errors). This commit adds a RequestSendFailed exception that is raised when "expected" failures happen, allowing the TransactionQueue to log them as warnings while allowing us to log other exceptions as actual exceptions.
This commit is contained in:
parent
1dcb086f33
commit
b970cb0e96
|
@ -0,0 +1 @@
|
||||||
|
Add better logging for unexpected errors while sending transactions
|
|
@ -348,6 +348,24 @@ class IncompatibleRoomVersionError(SynapseError):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class RequestSendFailed(RuntimeError):
|
||||||
|
"""Sending a HTTP request over federation failed due to not being able to
|
||||||
|
talk to the remote server for some reason.
|
||||||
|
|
||||||
|
This exception is used to differentiate "expected" errors that arise due to
|
||||||
|
networking (e.g. DNS failures, connection timeouts etc), versus unexpected
|
||||||
|
errors (like programming errors).
|
||||||
|
"""
|
||||||
|
def __init__(self, inner_exception, can_retry):
|
||||||
|
super(RequestSendFailed, self).__init__(
|
||||||
|
"Failed to send request: %s: %s" % (
|
||||||
|
type(inner_exception).__name__, inner_exception,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.inner_exception = inner_exception
|
||||||
|
self.can_retry = can_retry
|
||||||
|
|
||||||
|
|
||||||
def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
|
def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
|
||||||
""" Utility method for constructing an error response for client-server
|
""" Utility method for constructing an error response for client-server
|
||||||
interactions.
|
interactions.
|
||||||
|
|
|
@ -22,7 +22,11 @@ from prometheus_client import Counter
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
from synapse.api.errors import FederationDeniedError, HttpResponseException
|
from synapse.api.errors import (
|
||||||
|
FederationDeniedError,
|
||||||
|
HttpResponseException,
|
||||||
|
RequestSendFailed,
|
||||||
|
)
|
||||||
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
|
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
|
||||||
from synapse.metrics import (
|
from synapse.metrics import (
|
||||||
LaterGauge,
|
LaterGauge,
|
||||||
|
@ -518,11 +522,16 @@ class TransactionQueue(object):
|
||||||
)
|
)
|
||||||
except FederationDeniedError as e:
|
except FederationDeniedError as e:
|
||||||
logger.info(e)
|
logger.info(e)
|
||||||
except Exception as e:
|
except RequestSendFailed as e:
|
||||||
logger.warn(
|
logger.warning("(TX [%s] Failed to send transaction: %s", destination, e)
|
||||||
"TX [%s] Failed to send transaction: %s",
|
|
||||||
|
for p, _ in pending_pdus:
|
||||||
|
logger.info("Failed to send event %s to %s", p.event_id,
|
||||||
|
destination)
|
||||||
|
except Exception:
|
||||||
|
logger.exception(
|
||||||
|
"TX [%s] Failed to send transaction",
|
||||||
destination,
|
destination,
|
||||||
e,
|
|
||||||
)
|
)
|
||||||
for p, _ in pending_pdus:
|
for p, _ in pending_pdus:
|
||||||
logger.info("Failed to send event %s to %s", p.event_id,
|
logger.info("Failed to send event %s to %s", p.event_id,
|
||||||
|
|
|
@ -19,7 +19,7 @@ import random
|
||||||
import sys
|
import sys
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
|
||||||
from six import PY3, string_types
|
from six import PY3, raise_from, string_types
|
||||||
from six.moves import urllib
|
from six.moves import urllib
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
|
@ -41,6 +41,7 @@ from synapse.api.errors import (
|
||||||
Codes,
|
Codes,
|
||||||
FederationDeniedError,
|
FederationDeniedError,
|
||||||
HttpResponseException,
|
HttpResponseException,
|
||||||
|
RequestSendFailed,
|
||||||
SynapseError,
|
SynapseError,
|
||||||
)
|
)
|
||||||
from synapse.http.endpoint import matrix_federation_endpoint
|
from synapse.http.endpoint import matrix_federation_endpoint
|
||||||
|
@ -231,7 +232,7 @@ class MatrixFederationHttpClient(object):
|
||||||
Deferred: resolves with the http response object on success.
|
Deferred: resolves with the http response object on success.
|
||||||
|
|
||||||
Fails with ``HttpResponseException``: if we get an HTTP response
|
Fails with ``HttpResponseException``: if we get an HTTP response
|
||||||
code >= 300.
|
code >= 300 (except 429).
|
||||||
|
|
||||||
Fails with ``NotRetryingDestination`` if we are not yet ready
|
Fails with ``NotRetryingDestination`` if we are not yet ready
|
||||||
to retry this server.
|
to retry this server.
|
||||||
|
@ -239,8 +240,8 @@ class MatrixFederationHttpClient(object):
|
||||||
Fails with ``FederationDeniedError`` if this destination
|
Fails with ``FederationDeniedError`` if this destination
|
||||||
is not on our federation whitelist
|
is not on our federation whitelist
|
||||||
|
|
||||||
(May also fail with plenty of other Exceptions for things like DNS
|
Fails with ``RequestSendFailed`` if there were problems connecting to
|
||||||
failures, connection failures, SSL failures.)
|
the remote, due to e.g. DNS failures, connection timeouts etc.
|
||||||
"""
|
"""
|
||||||
if timeout:
|
if timeout:
|
||||||
_sec_timeout = timeout / 1000
|
_sec_timeout = timeout / 1000
|
||||||
|
@ -335,23 +336,74 @@ class MatrixFederationHttpClient(object):
|
||||||
reactor=self.hs.get_reactor(),
|
reactor=self.hs.get_reactor(),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
with Measure(self.clock, "outbound_request"):
|
with Measure(self.clock, "outbound_request"):
|
||||||
response = yield make_deferred_yieldable(
|
response = yield make_deferred_yieldable(
|
||||||
request_deferred,
|
request_deferred,
|
||||||
)
|
)
|
||||||
|
except DNSLookupError as e:
|
||||||
break
|
raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
raise_from(RequestSendFailed(e, can_retry=True), e)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"{%s} [%s] Got response headers: %d %s",
|
||||||
|
request.txn_id,
|
||||||
|
request.destination,
|
||||||
|
response.code,
|
||||||
|
response.phrase.decode('ascii', errors='replace'),
|
||||||
|
)
|
||||||
|
|
||||||
|
if 200 <= response.code < 300:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
# :'(
|
||||||
|
# Update transactions table?
|
||||||
|
d = treq.content(response)
|
||||||
|
d = timeout_deferred(
|
||||||
|
d,
|
||||||
|
timeout=_sec_timeout,
|
||||||
|
reactor=self.hs.get_reactor(),
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
body = yield make_deferred_yieldable(d)
|
||||||
|
except Exception as e:
|
||||||
|
# Eh, we're already going to raise an exception so lets
|
||||||
|
# ignore if this fails.
|
||||||
logger.warn(
|
logger.warn(
|
||||||
"{%s} [%s] Request failed: %s %s: %s",
|
"{%s} [%s] Failed to get error response: %s %s: %s",
|
||||||
request.txn_id,
|
request.txn_id,
|
||||||
request.destination,
|
request.destination,
|
||||||
request.method,
|
request.method,
|
||||||
url_str,
|
url_str,
|
||||||
_flatten_response_never_received(e),
|
_flatten_response_never_received(e),
|
||||||
)
|
)
|
||||||
|
body = None
|
||||||
|
|
||||||
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
|
e = HttpResponseException(
|
||||||
|
response.code, response.phrase, body
|
||||||
|
)
|
||||||
|
|
||||||
|
# Retry if the error is a 429 (Too Many Requests),
|
||||||
|
# otherwise just raise a standard HttpResponseException
|
||||||
|
if response.code == 429:
|
||||||
|
raise_from(RequestSendFailed(e, can_retry=True), e)
|
||||||
|
else:
|
||||||
|
raise e
|
||||||
|
|
||||||
|
break
|
||||||
|
except RequestSendFailed as e:
|
||||||
|
logger.warn(
|
||||||
|
"{%s} [%s] Request failed: %s %s: %s",
|
||||||
|
request.txn_id,
|
||||||
|
request.destination,
|
||||||
|
request.method,
|
||||||
|
url_str,
|
||||||
|
_flatten_response_never_received(e.inner_exception),
|
||||||
|
)
|
||||||
|
|
||||||
|
if not e.can_retry:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
if retries_left and not timeout:
|
if retries_left and not timeout:
|
||||||
|
@ -376,29 +428,16 @@ class MatrixFederationHttpClient(object):
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
logger.info(
|
except Exception as e:
|
||||||
"{%s} [%s] Got response headers: %d %s",
|
logger.warn(
|
||||||
|
"{%s} [%s] Request failed: %s %s: %s",
|
||||||
request.txn_id,
|
request.txn_id,
|
||||||
request.destination,
|
request.destination,
|
||||||
response.code,
|
request.method,
|
||||||
response.phrase.decode('ascii', errors='replace'),
|
url_str,
|
||||||
)
|
_flatten_response_never_received(e),
|
||||||
|
|
||||||
if 200 <= response.code < 300:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
# :'(
|
|
||||||
# Update transactions table?
|
|
||||||
d = treq.content(response)
|
|
||||||
d = timeout_deferred(
|
|
||||||
d,
|
|
||||||
timeout=_sec_timeout,
|
|
||||||
reactor=self.hs.get_reactor(),
|
|
||||||
)
|
|
||||||
body = yield make_deferred_yieldable(d)
|
|
||||||
raise HttpResponseException(
|
|
||||||
response.code, response.phrase, body
|
|
||||||
)
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
defer.returnValue(response)
|
defer.returnValue(response)
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,7 @@ from synapse.api.errors import (
|
||||||
FederationDeniedError,
|
FederationDeniedError,
|
||||||
HttpResponseException,
|
HttpResponseException,
|
||||||
NotFoundError,
|
NotFoundError,
|
||||||
|
RequestSendFailed,
|
||||||
SynapseError,
|
SynapseError,
|
||||||
)
|
)
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
|
@ -372,10 +373,10 @@ class MediaRepository(object):
|
||||||
"allow_remote": "false",
|
"allow_remote": "false",
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
except twisted.internet.error.DNSLookupError as e:
|
except RequestSendFailed as e:
|
||||||
logger.warn("HTTP error fetching remote media %s/%s: %r",
|
logger.warn("Request failed fetching remote media %s/%s: %r",
|
||||||
server_name, media_id, e)
|
server_name, media_id, e)
|
||||||
raise NotFoundError()
|
raise SynapseError(502, "Failed to fetch remote media")
|
||||||
|
|
||||||
except HttpResponseException as e:
|
except HttpResponseException as e:
|
||||||
logger.warn("HTTP error fetching remote media %s/%s: %s",
|
logger.warn("HTTP error fetching remote media %s/%s: %s",
|
||||||
|
|
|
@ -20,6 +20,7 @@ from twisted.internet.error import ConnectingCancelledError, DNSLookupError
|
||||||
from twisted.web.client import ResponseNeverReceived
|
from twisted.web.client import ResponseNeverReceived
|
||||||
from twisted.web.http import HTTPChannel
|
from twisted.web.http import HTTPChannel
|
||||||
|
|
||||||
|
from synapse.api.errors import RequestSendFailed
|
||||||
from synapse.http.matrixfederationclient import (
|
from synapse.http.matrixfederationclient import (
|
||||||
MatrixFederationHttpClient,
|
MatrixFederationHttpClient,
|
||||||
MatrixFederationRequest,
|
MatrixFederationRequest,
|
||||||
|
@ -49,7 +50,8 @@ class FederationClientTests(HomeserverTestCase):
|
||||||
self.pump()
|
self.pump()
|
||||||
|
|
||||||
f = self.failureResultOf(d)
|
f = self.failureResultOf(d)
|
||||||
self.assertIsInstance(f.value, DNSLookupError)
|
self.assertIsInstance(f.value, RequestSendFailed)
|
||||||
|
self.assertIsInstance(f.value.inner_exception, DNSLookupError)
|
||||||
|
|
||||||
def test_client_never_connect(self):
|
def test_client_never_connect(self):
|
||||||
"""
|
"""
|
||||||
|
@ -76,7 +78,11 @@ class FederationClientTests(HomeserverTestCase):
|
||||||
self.reactor.advance(10.5)
|
self.reactor.advance(10.5)
|
||||||
f = self.failureResultOf(d)
|
f = self.failureResultOf(d)
|
||||||
|
|
||||||
self.assertIsInstance(f.value, (ConnectingCancelledError, TimeoutError))
|
self.assertIsInstance(f.value, RequestSendFailed)
|
||||||
|
self.assertIsInstance(
|
||||||
|
f.value.inner_exception,
|
||||||
|
(ConnectingCancelledError, TimeoutError),
|
||||||
|
)
|
||||||
|
|
||||||
def test_client_connect_no_response(self):
|
def test_client_connect_no_response(self):
|
||||||
"""
|
"""
|
||||||
|
@ -107,7 +113,8 @@ class FederationClientTests(HomeserverTestCase):
|
||||||
self.reactor.advance(10.5)
|
self.reactor.advance(10.5)
|
||||||
f = self.failureResultOf(d)
|
f = self.failureResultOf(d)
|
||||||
|
|
||||||
self.assertIsInstance(f.value, ResponseNeverReceived)
|
self.assertIsInstance(f.value, RequestSendFailed)
|
||||||
|
self.assertIsInstance(f.value.inner_exception, ResponseNeverReceived)
|
||||||
|
|
||||||
def test_client_gets_headers(self):
|
def test_client_gets_headers(self):
|
||||||
"""
|
"""
|
||||||
|
|
Loading…
Reference in New Issue