push federation retry limiter down to matrixfederationclient
rather than having to instrument everywhere we make a federation call, make the MatrixFederationHttpClient manage the retry limiter.
This commit is contained in:
parent
ad8a26e361
commit
4bd597d9fc
|
@ -15,7 +15,6 @@
|
|||
|
||||
from synapse.crypto.keyclient import fetch_server_key
|
||||
from synapse.api.errors import SynapseError, Codes
|
||||
from synapse.util.retryutils import get_retry_limiter
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.async import ObservableDeferred
|
||||
from synapse.util.logcontext import (
|
||||
|
@ -363,12 +362,6 @@ class Keyring(object):
|
|||
def get_keys_from_server(self, server_name_and_key_ids):
|
||||
@defer.inlineCallbacks
|
||||
def get_key(server_name, key_ids):
|
||||
limiter = yield get_retry_limiter(
|
||||
server_name,
|
||||
self.clock,
|
||||
self.store,
|
||||
)
|
||||
with limiter:
|
||||
keys = None
|
||||
try:
|
||||
keys = yield self.get_server_verify_key_v2_direct(
|
||||
|
|
|
@ -29,7 +29,7 @@ from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
|
|||
from synapse.events import FrozenEvent, builder
|
||||
import synapse.metrics
|
||||
|
||||
from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
|
||||
import copy
|
||||
import itertools
|
||||
|
@ -234,13 +234,6 @@ class FederationClient(FederationBase):
|
|||
continue
|
||||
|
||||
try:
|
||||
limiter = yield get_retry_limiter(
|
||||
destination,
|
||||
self._clock,
|
||||
self.store,
|
||||
)
|
||||
|
||||
with limiter:
|
||||
transaction_data = yield self.transport_layer.get_event(
|
||||
destination, event_id, timeout=timeout,
|
||||
)
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import datetime
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
|
@ -22,9 +22,7 @@ from .units import Transaction, Edu
|
|||
from synapse.api.errors import HttpResponseException
|
||||
from synapse.util.async import run_on_reactor
|
||||
from synapse.util.logcontext import preserve_context_over_fn
|
||||
from synapse.util.retryutils import (
|
||||
get_retry_limiter, NotRetryingDestination,
|
||||
)
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.types import get_domain_from_id
|
||||
from synapse.handlers.presence import format_user_presence_state
|
||||
|
@ -312,13 +310,6 @@ class TransactionQueue(object):
|
|||
yield run_on_reactor()
|
||||
|
||||
while True:
|
||||
limiter = yield get_retry_limiter(
|
||||
destination,
|
||||
self.clock,
|
||||
self.store,
|
||||
backoff_on_404=True, # If we get a 404 the other side has gone
|
||||
)
|
||||
|
||||
device_message_edus, device_stream_id, dev_list_id = (
|
||||
yield self._get_new_device_messages(destination)
|
||||
)
|
||||
|
@ -374,7 +365,6 @@ class TransactionQueue(object):
|
|||
|
||||
success = yield self._send_new_transaction(
|
||||
destination, pending_pdus, pending_edus, pending_failures,
|
||||
limiter=limiter,
|
||||
)
|
||||
if success:
|
||||
# Remove the acknowledged device messages from the database
|
||||
|
@ -392,12 +382,24 @@ class TransactionQueue(object):
|
|||
self.last_device_list_stream_id_by_dest[destination] = dev_list_id
|
||||
else:
|
||||
break
|
||||
except NotRetryingDestination:
|
||||
except NotRetryingDestination as e:
|
||||
logger.debug(
|
||||
"TX [%s] not ready for retry yet - "
|
||||
"TX [%s] not ready for retry yet (next retry at %s) - "
|
||||
"dropping transaction for now",
|
||||
destination,
|
||||
datetime.datetime.fromtimestamp(
|
||||
(e.retry_last_ts + e.retry_interval) / 1000.0
|
||||
),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warn(
|
||||
"TX [%s] Failed to send transaction: %s",
|
||||
destination,
|
||||
e,
|
||||
)
|
||||
for p in pending_pdus:
|
||||
logger.info("Failed to send event %s to %s", p.event_id,
|
||||
destination)
|
||||
finally:
|
||||
# We want to be *very* sure we delete this after we stop processing
|
||||
self.pending_transactions.pop(destination, None)
|
||||
|
@ -437,7 +439,7 @@ class TransactionQueue(object):
|
|||
@measure_func("_send_new_transaction")
|
||||
@defer.inlineCallbacks
|
||||
def _send_new_transaction(self, destination, pending_pdus, pending_edus,
|
||||
pending_failures, limiter):
|
||||
pending_failures):
|
||||
|
||||
# Sort based on the order field
|
||||
pending_pdus.sort(key=lambda t: t[1])
|
||||
|
@ -447,7 +449,6 @@ class TransactionQueue(object):
|
|||
|
||||
success = True
|
||||
|
||||
try:
|
||||
logger.debug("TX [%s] _attempt_new_transaction", destination)
|
||||
|
||||
txn_id = str(self._next_txn_id)
|
||||
|
@ -488,7 +489,6 @@ class TransactionQueue(object):
|
|||
len(failures),
|
||||
)
|
||||
|
||||
with limiter:
|
||||
# Actually send the transaction
|
||||
|
||||
# FIXME (erikj): This is a bit of a hack to make the Pdu age
|
||||
|
@ -548,31 +548,5 @@ class TransactionQueue(object):
|
|||
"Failed to send event %s to %s", p.event_id, destination
|
||||
)
|
||||
success = False
|
||||
except RuntimeError as e:
|
||||
# We capture this here as there as nothing actually listens
|
||||
# for this finishing functions deferred.
|
||||
logger.warn(
|
||||
"TX [%s] Problem in _attempt_transaction: %s",
|
||||
destination,
|
||||
e,
|
||||
)
|
||||
|
||||
success = False
|
||||
|
||||
for p in pdus:
|
||||
logger.info("Failed to send event %s to %s", p.event_id, destination)
|
||||
except Exception as e:
|
||||
# We capture this here as there as nothing actually listens
|
||||
# for this finishing functions deferred.
|
||||
logger.warn(
|
||||
"TX [%s] Problem in _attempt_transaction: %s",
|
||||
destination,
|
||||
e,
|
||||
)
|
||||
|
||||
success = False
|
||||
|
||||
for p in pdus:
|
||||
logger.info("Failed to send event %s to %s", p.event_id, destination)
|
||||
|
||||
defer.returnValue(success)
|
||||
|
|
|
@ -163,6 +163,7 @@ class TransportLayerClient(object):
|
|||
data=json_data,
|
||||
json_data_callback=json_data_callback,
|
||||
long_retries=True,
|
||||
backoff_on_404=True, # If we get a 404 the other side has gone
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
|
|
|
@ -22,7 +22,7 @@ from twisted.internet import defer
|
|||
from synapse.api.errors import SynapseError, CodeMessageException
|
||||
from synapse.types import get_domain_from_id
|
||||
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
|
||||
from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -121,10 +121,6 @@ class E2eKeysHandler(object):
|
|||
def do_remote_query(destination):
|
||||
destination_query = remote_queries_not_in_cache[destination]
|
||||
try:
|
||||
limiter = yield get_retry_limiter(
|
||||
destination, self.clock, self.store
|
||||
)
|
||||
with limiter:
|
||||
remote_result = yield self.federation.query_client_keys(
|
||||
destination,
|
||||
{"device_keys": destination_query},
|
||||
|
@ -239,10 +235,6 @@ class E2eKeysHandler(object):
|
|||
def claim_client_keys(destination):
|
||||
device_keys = remote_queries[destination]
|
||||
try:
|
||||
limiter = yield get_retry_limiter(
|
||||
destination, self.clock, self.store
|
||||
)
|
||||
with limiter:
|
||||
remote_result = yield self.federation.claim_client_keys(
|
||||
destination,
|
||||
{"one_time_keys": device_keys},
|
||||
|
|
|
@ -12,8 +12,7 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
import synapse.util.retryutils
|
||||
from twisted.internet import defer, reactor, protocol
|
||||
from twisted.internet.error import DNSLookupError
|
||||
from twisted.web.client import readBody, HTTPConnectionPool, Agent
|
||||
|
@ -94,6 +93,7 @@ class MatrixFederationHttpClient(object):
|
|||
reactor, MatrixFederationEndpointFactory(hs), pool=pool
|
||||
)
|
||||
self.clock = hs.get_clock()
|
||||
self._store = hs.get_datastore()
|
||||
self.version_string = hs.version_string
|
||||
self._next_id = 1
|
||||
|
||||
|
@ -106,22 +106,32 @@ class MatrixFederationHttpClient(object):
|
|||
def _request(self, destination, method, path,
|
||||
body_callback, headers_dict={}, param_bytes=b"",
|
||||
query_bytes=b"", retry_on_dns_fail=True,
|
||||
timeout=None, long_retries=False):
|
||||
timeout=None, long_retries=False, backoff_on_404=False):
|
||||
""" Creates and sends a request to the given server
|
||||
Args:
|
||||
destination (str): The remote server to send the HTTP request to.
|
||||
method (str): HTTP method
|
||||
path (str): The HTTP path
|
||||
backoff_on_404 (bool): Back off if we get a 404
|
||||
|
||||
Returns:
|
||||
Deferred: resolves with the http response object on success.
|
||||
|
||||
Fails with ``HTTPRequestException``: if we get an HTTP response
|
||||
code >= 300.
|
||||
Fails with ``NotRetryingDestination`` if we are not yet ready
|
||||
to retry this server.
|
||||
"""
|
||||
limiter = yield synapse.util.retryutils.get_retry_limiter(
|
||||
destination,
|
||||
self.clock,
|
||||
self._store,
|
||||
backoff_on_404=backoff_on_404,
|
||||
)
|
||||
|
||||
destination = destination.encode("ascii")
|
||||
path_bytes = path.encode("ascii")
|
||||
|
||||
with limiter:
|
||||
headers_dict[b"User-Agent"] = [self.version_string]
|
||||
headers_dict[b"Host"] = [destination]
|
||||
|
||||
|
@ -261,7 +271,7 @@ class MatrixFederationHttpClient(object):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def put_json(self, destination, path, data={}, json_data_callback=None,
|
||||
long_retries=False, timeout=None):
|
||||
long_retries=False, timeout=None, backoff_on_404=False):
|
||||
""" Sends the specifed json data using PUT
|
||||
|
||||
Args:
|
||||
|
@ -276,11 +286,17 @@ class MatrixFederationHttpClient(object):
|
|||
retry for a short or long time.
|
||||
timeout(int): How long to try (in ms) the destination for before
|
||||
giving up. None indicates no timeout.
|
||||
backoff_on_404 (bool): True if we should count a 404 response as
|
||||
a failure of the server (and should therefore back off future
|
||||
requests)
|
||||
|
||||
Returns:
|
||||
Deferred: Succeeds when we get a 2xx HTTP response. The result
|
||||
will be the decoded JSON body. On a 4xx or 5xx error response a
|
||||
CodeMessageException is raised.
|
||||
|
||||
Fails with ``NotRetryingDestination`` if we are not yet ready
|
||||
to retry this server.
|
||||
"""
|
||||
|
||||
if not json_data_callback:
|
||||
|
@ -303,6 +319,7 @@ class MatrixFederationHttpClient(object):
|
|||
headers_dict={"Content-Type": ["application/json"]},
|
||||
long_retries=long_retries,
|
||||
timeout=timeout,
|
||||
backoff_on_404=backoff_on_404,
|
||||
)
|
||||
|
||||
if 200 <= response.code < 300:
|
||||
|
@ -332,6 +349,9 @@ class MatrixFederationHttpClient(object):
|
|||
Deferred: Succeeds when we get a 2xx HTTP response. The result
|
||||
will be the decoded JSON body. On a 4xx or 5xx error response a
|
||||
CodeMessageException is raised.
|
||||
|
||||
Fails with ``NotRetryingDestination`` if we are not yet ready
|
||||
to retry this server.
|
||||
"""
|
||||
|
||||
def body_callback(method, url_bytes, headers_dict):
|
||||
|
@ -377,6 +397,9 @@ class MatrixFederationHttpClient(object):
|
|||
|
||||
The result of the deferred is a tuple of `(code, response)`,
|
||||
where `response` is a dict representing the decoded JSON body.
|
||||
|
||||
Fails with ``NotRetryingDestination`` if we are not yet ready
|
||||
to retry this server.
|
||||
"""
|
||||
logger.debug("get_json args: %s", args)
|
||||
|
||||
|
@ -426,6 +449,9 @@ class MatrixFederationHttpClient(object):
|
|||
|
||||
Fails with ``HTTPRequestException`` if we get an HTTP response code
|
||||
>= 300
|
||||
|
||||
Fails with ``NotRetryingDestination`` if we are not yet ready
|
||||
to retry this server.
|
||||
"""
|
||||
|
||||
encoded_args = {}
|
||||
|
|
|
@ -124,7 +124,13 @@ class RetryDestinationLimiter(object):
|
|||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
valid_err_code = False
|
||||
if exc_type is not None and issubclass(exc_type, CodeMessageException):
|
||||
if exc_type is None:
|
||||
valid_err_code = True
|
||||
elif not issubclass(exc_type, Exception):
|
||||
# avoid treating exceptions which don't derive from Exception as
|
||||
# failures; this is mostly so as not to catch defer._DefGen.
|
||||
valid_err_code = True
|
||||
elif issubclass(exc_type, CodeMessageException):
|
||||
# Some error codes are perfectly fine for some APIs, whereas other
|
||||
# APIs may expect to never received e.g. a 404. It's important to
|
||||
# handle 404 as some remote servers will return a 404 when the HS
|
||||
|
@ -142,11 +148,13 @@ class RetryDestinationLimiter(object):
|
|||
else:
|
||||
valid_err_code = False
|
||||
|
||||
if exc_type is None or valid_err_code:
|
||||
if valid_err_code:
|
||||
# We connected successfully.
|
||||
if not self.retry_interval:
|
||||
return
|
||||
|
||||
logger.debug("Connection to %s was successful; clearing backoff",
|
||||
self.destination)
|
||||
retry_last_ts = 0
|
||||
self.retry_interval = 0
|
||||
else:
|
||||
|
@ -160,6 +168,10 @@ class RetryDestinationLimiter(object):
|
|||
else:
|
||||
self.retry_interval = self.min_retry_interval
|
||||
|
||||
logger.debug(
|
||||
"Connection to %s was unsuccessful (%s(%s)); backoff now %i",
|
||||
self.destination, exc_type, exc_val, self.retry_interval
|
||||
)
|
||||
retry_last_ts = int(self.clock.time_msec())
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
|
@ -192,6 +192,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
|
|||
),
|
||||
json_data_callback=ANY,
|
||||
long_retries=True,
|
||||
backoff_on_404=True,
|
||||
),
|
||||
defer.succeed((200, "OK"))
|
||||
)
|
||||
|
@ -263,6 +264,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
|
|||
),
|
||||
json_data_callback=ANY,
|
||||
long_retries=True,
|
||||
backoff_on_404=True,
|
||||
),
|
||||
defer.succeed((200, "OK"))
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue