Ignore backoff history for invites, aliases, and roomdirs
Add a param to the federation client which lets us ignore historical backoff data for federation queries, and set it for a handful of operations.
This commit is contained in:
parent
4bd597d9fc
commit
5a16cb4bf0
|
@ -88,7 +88,7 @@ class FederationClient(FederationBase):
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
def make_query(self, destination, query_type, args,
|
def make_query(self, destination, query_type, args,
|
||||||
retry_on_dns_fail=False):
|
retry_on_dns_fail=False, ignore_backoff=False):
|
||||||
"""Sends a federation Query to a remote homeserver of the given type
|
"""Sends a federation Query to a remote homeserver of the given type
|
||||||
and arguments.
|
and arguments.
|
||||||
|
|
||||||
|
@ -98,6 +98,8 @@ class FederationClient(FederationBase):
|
||||||
handler name used in register_query_handler().
|
handler name used in register_query_handler().
|
||||||
args (dict): Mapping of strings to strings containing the details
|
args (dict): Mapping of strings to strings containing the details
|
||||||
of the query request.
|
of the query request.
|
||||||
|
ignore_backoff (bool): true to ignore the historical backoff data
|
||||||
|
and try the request anyway.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
a Deferred which will eventually yield a JSON object from the
|
a Deferred which will eventually yield a JSON object from the
|
||||||
|
@ -106,7 +108,8 @@ class FederationClient(FederationBase):
|
||||||
sent_queries_counter.inc(query_type)
|
sent_queries_counter.inc(query_type)
|
||||||
|
|
||||||
return self.transport_layer.make_query(
|
return self.transport_layer.make_query(
|
||||||
destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail
|
destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail,
|
||||||
|
ignore_backoff=ignore_backoff,
|
||||||
)
|
)
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
|
|
|
@ -175,7 +175,8 @@ class TransportLayerClient(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def make_query(self, destination, query_type, args, retry_on_dns_fail):
|
def make_query(self, destination, query_type, args, retry_on_dns_fail,
|
||||||
|
ignore_backoff=False):
|
||||||
path = PREFIX + "/query/%s" % query_type
|
path = PREFIX + "/query/%s" % query_type
|
||||||
|
|
||||||
content = yield self.client.get_json(
|
content = yield self.client.get_json(
|
||||||
|
@ -184,6 +185,7 @@ class TransportLayerClient(object):
|
||||||
args=args,
|
args=args,
|
||||||
retry_on_dns_fail=retry_on_dns_fail,
|
retry_on_dns_fail=retry_on_dns_fail,
|
||||||
timeout=10000,
|
timeout=10000,
|
||||||
|
ignore_backoff=ignore_backoff,
|
||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue(content)
|
defer.returnValue(content)
|
||||||
|
@ -243,6 +245,7 @@ class TransportLayerClient(object):
|
||||||
destination=destination,
|
destination=destination,
|
||||||
path=path,
|
path=path,
|
||||||
data=content,
|
data=content,
|
||||||
|
ignore_backoff=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue(response)
|
defer.returnValue(response)
|
||||||
|
@ -270,6 +273,7 @@ class TransportLayerClient(object):
|
||||||
destination=remote_server,
|
destination=remote_server,
|
||||||
path=path,
|
path=path,
|
||||||
args=args,
|
args=args,
|
||||||
|
ignore_backoff=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue(response)
|
defer.returnValue(response)
|
||||||
|
|
|
@ -175,6 +175,7 @@ class DirectoryHandler(BaseHandler):
|
||||||
"room_alias": room_alias.to_string(),
|
"room_alias": room_alias.to_string(),
|
||||||
},
|
},
|
||||||
retry_on_dns_fail=False,
|
retry_on_dns_fail=False,
|
||||||
|
ignore_backoff=True,
|
||||||
)
|
)
|
||||||
except CodeMessageException as e:
|
except CodeMessageException as e:
|
||||||
logging.warn("Error retrieving alias")
|
logging.warn("Error retrieving alias")
|
||||||
|
|
|
@ -52,7 +52,8 @@ class ProfileHandler(BaseHandler):
|
||||||
args={
|
args={
|
||||||
"user_id": target_user.to_string(),
|
"user_id": target_user.to_string(),
|
||||||
"field": "displayname",
|
"field": "displayname",
|
||||||
}
|
},
|
||||||
|
ignore_backoff=True,
|
||||||
)
|
)
|
||||||
except CodeMessageException as e:
|
except CodeMessageException as e:
|
||||||
if e.code != 404:
|
if e.code != 404:
|
||||||
|
@ -99,7 +100,8 @@ class ProfileHandler(BaseHandler):
|
||||||
args={
|
args={
|
||||||
"user_id": target_user.to_string(),
|
"user_id": target_user.to_string(),
|
||||||
"field": "avatar_url",
|
"field": "avatar_url",
|
||||||
}
|
},
|
||||||
|
ignore_backoff=True,
|
||||||
)
|
)
|
||||||
except CodeMessageException as e:
|
except CodeMessageException as e:
|
||||||
if e.code != 404:
|
if e.code != 404:
|
||||||
|
|
|
@ -106,12 +106,16 @@ class MatrixFederationHttpClient(object):
|
||||||
def _request(self, destination, method, path,
|
def _request(self, destination, method, path,
|
||||||
body_callback, headers_dict={}, param_bytes=b"",
|
body_callback, headers_dict={}, param_bytes=b"",
|
||||||
query_bytes=b"", retry_on_dns_fail=True,
|
query_bytes=b"", retry_on_dns_fail=True,
|
||||||
timeout=None, long_retries=False, backoff_on_404=False):
|
timeout=None, long_retries=False,
|
||||||
|
ignore_backoff=False,
|
||||||
|
backoff_on_404=False):
|
||||||
""" Creates and sends a request to the given server
|
""" Creates and sends a request to the given server
|
||||||
Args:
|
Args:
|
||||||
destination (str): The remote server to send the HTTP request to.
|
destination (str): The remote server to send the HTTP request to.
|
||||||
method (str): HTTP method
|
method (str): HTTP method
|
||||||
path (str): The HTTP path
|
path (str): The HTTP path
|
||||||
|
ignore_backoff (bool): true to ignore the historical backoff data
|
||||||
|
and try the request anyway.
|
||||||
backoff_on_404 (bool): Back off if we get a 404
|
backoff_on_404 (bool): Back off if we get a 404
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
|
@ -127,6 +131,7 @@ class MatrixFederationHttpClient(object):
|
||||||
self.clock,
|
self.clock,
|
||||||
self._store,
|
self._store,
|
||||||
backoff_on_404=backoff_on_404,
|
backoff_on_404=backoff_on_404,
|
||||||
|
ignore_backoff=ignore_backoff,
|
||||||
)
|
)
|
||||||
|
|
||||||
destination = destination.encode("ascii")
|
destination = destination.encode("ascii")
|
||||||
|
@ -271,7 +276,9 @@ class MatrixFederationHttpClient(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def put_json(self, destination, path, data={}, json_data_callback=None,
|
def put_json(self, destination, path, data={}, json_data_callback=None,
|
||||||
long_retries=False, timeout=None, backoff_on_404=False):
|
long_retries=False, timeout=None,
|
||||||
|
ignore_backoff=False,
|
||||||
|
backoff_on_404=False):
|
||||||
""" Sends the specifed json data using PUT
|
""" Sends the specifed json data using PUT
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
@ -286,6 +293,8 @@ class MatrixFederationHttpClient(object):
|
||||||
retry for a short or long time.
|
retry for a short or long time.
|
||||||
timeout(int): How long to try (in ms) the destination for before
|
timeout(int): How long to try (in ms) the destination for before
|
||||||
giving up. None indicates no timeout.
|
giving up. None indicates no timeout.
|
||||||
|
ignore_backoff (bool): true to ignore the historical backoff data
|
||||||
|
and try the request anyway.
|
||||||
backoff_on_404 (bool): True if we should count a 404 response as
|
backoff_on_404 (bool): True if we should count a 404 response as
|
||||||
a failure of the server (and should therefore back off future
|
a failure of the server (and should therefore back off future
|
||||||
requests)
|
requests)
|
||||||
|
@ -319,6 +328,7 @@ class MatrixFederationHttpClient(object):
|
||||||
headers_dict={"Content-Type": ["application/json"]},
|
headers_dict={"Content-Type": ["application/json"]},
|
||||||
long_retries=long_retries,
|
long_retries=long_retries,
|
||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
|
ignore_backoff=ignore_backoff,
|
||||||
backoff_on_404=backoff_on_404,
|
backoff_on_404=backoff_on_404,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -331,7 +341,7 @@ class MatrixFederationHttpClient(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def post_json(self, destination, path, data={}, long_retries=False,
|
def post_json(self, destination, path, data={}, long_retries=False,
|
||||||
timeout=None):
|
timeout=None, ignore_backoff=False):
|
||||||
""" Sends the specifed json data using POST
|
""" Sends the specifed json data using POST
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
@ -344,7 +354,8 @@ class MatrixFederationHttpClient(object):
|
||||||
retry for a short or long time.
|
retry for a short or long time.
|
||||||
timeout(int): How long to try (in ms) the destination for before
|
timeout(int): How long to try (in ms) the destination for before
|
||||||
giving up. None indicates no timeout.
|
giving up. None indicates no timeout.
|
||||||
|
ignore_backoff (bool): true to ignore the historical backoff data and
|
||||||
|
try the request anyway.
|
||||||
Returns:
|
Returns:
|
||||||
Deferred: Succeeds when we get a 2xx HTTP response. The result
|
Deferred: Succeeds when we get a 2xx HTTP response. The result
|
||||||
will be the decoded JSON body. On a 4xx or 5xx error response a
|
will be the decoded JSON body. On a 4xx or 5xx error response a
|
||||||
|
@ -368,6 +379,7 @@ class MatrixFederationHttpClient(object):
|
||||||
headers_dict={"Content-Type": ["application/json"]},
|
headers_dict={"Content-Type": ["application/json"]},
|
||||||
long_retries=long_retries,
|
long_retries=long_retries,
|
||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
|
ignore_backoff=ignore_backoff,
|
||||||
)
|
)
|
||||||
|
|
||||||
if 200 <= response.code < 300:
|
if 200 <= response.code < 300:
|
||||||
|
@ -380,7 +392,7 @@ class MatrixFederationHttpClient(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_json(self, destination, path, args={}, retry_on_dns_fail=True,
|
def get_json(self, destination, path, args={}, retry_on_dns_fail=True,
|
||||||
timeout=None):
|
timeout=None, ignore_backoff=False):
|
||||||
""" GETs some json from the given host homeserver and path
|
""" GETs some json from the given host homeserver and path
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
@ -392,6 +404,8 @@ class MatrixFederationHttpClient(object):
|
||||||
timeout (int): How long to try (in ms) the destination for before
|
timeout (int): How long to try (in ms) the destination for before
|
||||||
giving up. None indicates no timeout and that the request will
|
giving up. None indicates no timeout and that the request will
|
||||||
be retried.
|
be retried.
|
||||||
|
ignore_backoff (bool): true to ignore the historical backoff data
|
||||||
|
and try the request anyway.
|
||||||
Returns:
|
Returns:
|
||||||
Deferred: Succeeds when we get *any* HTTP response.
|
Deferred: Succeeds when we get *any* HTTP response.
|
||||||
|
|
||||||
|
@ -424,6 +438,7 @@ class MatrixFederationHttpClient(object):
|
||||||
body_callback=body_callback,
|
body_callback=body_callback,
|
||||||
retry_on_dns_fail=retry_on_dns_fail,
|
retry_on_dns_fail=retry_on_dns_fail,
|
||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
|
ignore_backoff=ignore_backoff,
|
||||||
)
|
)
|
||||||
|
|
||||||
if 200 <= response.code < 300:
|
if 200 <= response.code < 300:
|
||||||
|
@ -436,13 +451,16 @@ class MatrixFederationHttpClient(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_file(self, destination, path, output_stream, args={},
|
def get_file(self, destination, path, output_stream, args={},
|
||||||
retry_on_dns_fail=True, max_size=None):
|
retry_on_dns_fail=True, max_size=None,
|
||||||
|
ignore_backoff=False):
|
||||||
"""GETs a file from a given homeserver
|
"""GETs a file from a given homeserver
|
||||||
Args:
|
Args:
|
||||||
destination (str): The remote server to send the HTTP request to.
|
destination (str): The remote server to send the HTTP request to.
|
||||||
path (str): The HTTP path to GET.
|
path (str): The HTTP path to GET.
|
||||||
output_stream (file): File to write the response body to.
|
output_stream (file): File to write the response body to.
|
||||||
args (dict): Optional dictionary used to create the query string.
|
args (dict): Optional dictionary used to create the query string.
|
||||||
|
ignore_backoff (bool): true to ignore the historical backoff data
|
||||||
|
and try the request anyway.
|
||||||
Returns:
|
Returns:
|
||||||
Deferred: resolves with an (int,dict) tuple of the file length and
|
Deferred: resolves with an (int,dict) tuple of the file length and
|
||||||
a dict of the response headers.
|
a dict of the response headers.
|
||||||
|
@ -473,7 +491,8 @@ class MatrixFederationHttpClient(object):
|
||||||
path,
|
path,
|
||||||
query_bytes=query_bytes,
|
query_bytes=query_bytes,
|
||||||
body_callback=body_callback,
|
body_callback=body_callback,
|
||||||
retry_on_dns_fail=retry_on_dns_fail
|
retry_on_dns_fail=retry_on_dns_fail,
|
||||||
|
ignore_backoff=ignore_backoff,
|
||||||
)
|
)
|
||||||
|
|
||||||
headers = dict(response.headers.getAllRawHeaders())
|
headers = dict(response.headers.getAllRawHeaders())
|
||||||
|
|
|
@ -35,7 +35,8 @@ class NotRetryingDestination(Exception):
|
||||||
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_retry_limiter(destination, clock, store, **kwargs):
|
def get_retry_limiter(destination, clock, store, ignore_backoff=False,
|
||||||
|
**kwargs):
|
||||||
"""For a given destination check if we have previously failed to
|
"""For a given destination check if we have previously failed to
|
||||||
send a request there and are waiting before retrying the destination.
|
send a request there and are waiting before retrying the destination.
|
||||||
If we are not ready to retry the destination, this will raise a
|
If we are not ready to retry the destination, this will raise a
|
||||||
|
@ -43,6 +44,14 @@ def get_retry_limiter(destination, clock, store, **kwargs):
|
||||||
that will mark the destination as down if an exception is thrown (excluding
|
that will mark the destination as down if an exception is thrown (excluding
|
||||||
CodeMessageException with code < 500)
|
CodeMessageException with code < 500)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
destination (str): name of homeserver
|
||||||
|
clock (synapse.util.clock): timing source
|
||||||
|
store (synapse.storage.transactions.TransactionStore): datastore
|
||||||
|
ignore_backoff (bool): true to ignore the historical backoff data and
|
||||||
|
try the request anyway. We will still update the next
|
||||||
|
retry_interval on success/failure.
|
||||||
|
|
||||||
Example usage:
|
Example usage:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -66,7 +75,7 @@ def get_retry_limiter(destination, clock, store, **kwargs):
|
||||||
|
|
||||||
now = int(clock.time_msec())
|
now = int(clock.time_msec())
|
||||||
|
|
||||||
if retry_last_ts + retry_interval > now:
|
if not ignore_backoff and retry_last_ts + retry_interval > now:
|
||||||
raise NotRetryingDestination(
|
raise NotRetryingDestination(
|
||||||
retry_last_ts=retry_last_ts,
|
retry_last_ts=retry_last_ts,
|
||||||
retry_interval=retry_interval,
|
retry_interval=retry_interval,
|
||||||
|
|
Loading…
Reference in New Issue