Fix opentracing and Prometheus metrics for replication requests (#10996)
This commit fixes two bugs to do with decorators not instrumenting `ReplicationEndpoint`'s `send_request` correctly. There are two decorators on `send_request`: Prometheus' `Gauge.track_inprogress()` and Synapse's `opentracing.trace`.

`Gauge.track_inprogress()` does not have any support for async functions when used as a decorator. Since async functions behave like regular functions that return coroutines, only the creation of the coroutine was covered by the metric and none of the actual body of `send_request`. `Gauge.track_inprogress()` returns a regular, non-async function wrapping `send_request`, which is the source of the next bug.

The `opentracing.trace` decorator would normally handle async functions correctly, but since the wrapped `send_request` is a non-async function, the decorator suffers from the same issue as `Gauge.track_inprogress()`: the opentracing span only measures the creation of the coroutine and none of the actual function body.

Using `Gauge.track_inprogress()` as a context manager instead of a decorator resolves both bugs.
parent 406f7bfa17
commit 6b18eb4430
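The first bug is easy to reproduce outside Synapse. The sketch below is a minimal, self-contained illustration (the metric name and the fake workload are invented for the example, and behaviour may differ in newer `prometheus_client` releases): used as a decorator, `Gauge.track_inprogress()` only brackets the creation of the coroutine, while the context-manager form brackets the whole awaited body.

```python
import asyncio

from prometheus_client import REGISTRY, Gauge

# Hypothetical metric for this sketch only; not one of Synapse's metrics.
gauge = Gauge("example_requests_in_flight", "Requests currently in flight")


@gauge.track_inprogress()  # buggy: the wrapper is a plain (non-async) function
async def decorated_request():
    # By the time this body runs, the gauge has already been incremented and
    # decremented around the mere creation of the coroutine.
    await asyncio.sleep(0.1)


async def fixed_request():
    # The fix from this commit: hold the gauge for the whole awaited body.
    with gauge.track_inprogress():
        await asyncio.sleep(0.1)


async def main() -> None:
    coro = decorated_request()  # gauge went 0 -> 1 -> 0 during this call
    print(REGISTRY.get_sample_value("example_requests_in_flight"))  # 0.0
    await coro

    task = asyncio.ensure_future(fixed_request())
    await asyncio.sleep(0.01)  # let the task reach its await
    print(REGISTRY.get_sample_value("example_requests_in_flight"))  # 1.0
    await task


asyncio.run(main())
```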
```diff
@@ -0,0 +1 @@
+Fix a bug introduced in Synapse 1.21.0 that causes opentracing and Prometheus metrics for replication requests to be measured incorrectly.
```
```diff
@@ -807,6 +807,14 @@ def trace(func=None, opname=None):
                         result.addCallbacks(call_back, err_back)
 
                     else:
+                        if inspect.isawaitable(result):
+                            logger.error(
+                                "@trace may not have wrapped %s correctly! "
+                                "The function is not async but returned a %s.",
+                                func.__qualname__,
+                                type(result).__name__,
+                            )
+
                         scope.__exit__(None, None, None)
 
                     return result
```
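The new `inspect.isawaitable(result)` branch reports exactly the failure mode described in the commit message: `@trace` decided the function was synchronous, yet the call produced an awaitable, so the span was closed before any real work ran. Here is a minimal sketch of how that situation arises; `hide_async` and `do_work` are hypothetical stand-ins for what `Gauge.track_inprogress()`'s wrapper did to `send_request`.

```python
import asyncio
import functools
import inspect


def hide_async(f):
    # A synchronous wrapper around an async function, analogous to the wrapper
    # returned by Gauge.track_inprogress() when used as a decorator.
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return f(*args, **kwargs)  # returns a coroutine without awaiting it

    return wrapper


@hide_async
async def do_work() -> str:
    await asyncio.sleep(0)
    return "done"


# @trace checks this to pick its code path; the wrapper makes it False.
print(inspect.iscoroutinefunction(do_work))  # False

result = do_work()
# This is the condition the new logger.error() call fires on.
print(inspect.isawaitable(result))  # True

print(asyncio.run(result))  # "done" -- the work still runs, just outside the span
```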
@@ -182,85 +182,87 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):

The `@outgoing_gauge.track_inprogress()` decorator on `send_request` is removed and the whole body is instead wrapped in `with outgoing_gauge.track_inprogress():`, so the function now reads:

```python
        )

        @trace(opname="outgoing_replication_request")
        async def send_request(*, instance_name="master", **kwargs):
            with outgoing_gauge.track_inprogress():
                if instance_name == local_instance_name:
                    raise Exception("Trying to send HTTP request to self")
                if instance_name == "master":
                    host = master_host
                    port = master_port
                elif instance_name in instance_map:
                    host = instance_map[instance_name].host
                    port = instance_map[instance_name].port
                else:
                    raise Exception(
                        "Instance %r not in 'instance_map' config" % (instance_name,)
                    )

                data = await cls._serialize_payload(**kwargs)

                url_args = [
                    urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS
                ]

                if cls.CACHE:
                    txn_id = random_string(10)
                    url_args.append(txn_id)

                if cls.METHOD == "POST":
                    request_func = client.post_json_get_json
                elif cls.METHOD == "PUT":
                    request_func = client.put_json
                elif cls.METHOD == "GET":
                    request_func = client.get_json
                else:
                    # We have already asserted in the constructor that a
                    # compatible was picked, but lets be paranoid.
                    raise Exception(
                        "Unknown METHOD on %s replication endpoint" % (cls.NAME,)
                    )

                uri = "http://%s:%s/_synapse/replication/%s/%s" % (
                    host,
                    port,
                    cls.NAME,
                    "/".join(url_args),
                )

                try:
                    # We keep retrying the same request for timeouts. This is so that we
                    # have a good idea that the request has either succeeded or failed
                    # on the master, and so whether we should clean up or not.
                    while True:
                        headers: Dict[bytes, List[bytes]] = {}
                        # Add an authorization header, if configured.
                        if replication_secret:
                            headers[b"Authorization"] = [
                                b"Bearer " + replication_secret
                            ]
                        opentracing.inject_header_dict(headers, check_destination=False)
                        try:
                            result = await request_func(uri, data, headers=headers)
                            break
                        except RequestTimedOutError:
                            if not cls.RETRY_ON_TIMEOUT:
                                raise

                            logger.warning("%s request timed out; retrying", cls.NAME)

                            # If we timed out we probably don't need to worry about backing
                            # off too much, but lets just wait a little anyway.
                            await clock.sleep(1)
                except HttpResponseException as e:
                    # We convert to SynapseError as we know that it was a SynapseError
                    # on the main process that we should send to the client. (And
                    # importantly, not stack traces everywhere)
                    _outgoing_request_counter.labels(cls.NAME, e.code).inc()
                    raise e.to_synapse_error()
                except Exception as e:
                    _outgoing_request_counter.labels(cls.NAME, "ERR").inc()
                    raise SynapseError(502, "Failed to talk to main process") from e

                _outgoing_request_counter.labels(cls.NAME, 200).inc()
                return result

        return send_request
```
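Separate from the instrumentation change, the inner loop above encodes a retry policy that is easy to miss: a timeout does not tell us whether the main process handled the request, so the same request is retried after a short pause until it definitively succeeds or fails. A stripped-down, self-contained sketch of just that loop, with hypothetical names standing in for Synapse's client and exception types:

```python
import asyncio
import random
from typing import Any


class RequestTimedOutError(Exception):
    """Stand-in for Synapse's RequestTimedOutError."""


async def flaky_request(uri: str, data: dict) -> Any:
    # Pretend transport that times out most of the time.
    if random.random() < 0.7:
        raise RequestTimedOutError(uri)
    return {"ok": True}


async def send_with_retry(uri: str, data: dict, *, retry_on_timeout: bool = True) -> Any:
    # Mirrors the while-True loop in send_request: keep retrying the same
    # request on timeouts so we eventually learn whether it succeeded or
    # failed, backing off only a little between attempts.
    while True:
        try:
            return await flaky_request(uri, data)
        except RequestTimedOutError:
            if not retry_on_timeout:
                raise
            await asyncio.sleep(1)


print(asyncio.run(send_with_retry("http://master:8080/_synapse/replication/example/1", {})))
```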