Improve opentracing for incoming HTTP requests (#11618)
* remove `start_active_span_from_request` Instead, pull out a separate function, `span_context_from_request`, to extract the parent span, which we can then pass into `start_active_span` as normal. This seems to be clearer all round. * Remove redundant tags from `incoming-federation-request` These are all wrapped up inside a parent span generated in AsyncResource, so there's no point duplicating all the tags that are set there. * Leave request spans open until the request completes It may take some time for the response to be encoded into JSON, and that JSON to be streamed back to the client, and really we want that inside the top-level span, so let's hand responsibility for closure to the SynapseRequest. * opentracing logs for HTTP request events * changelog
This commit is contained in:
parent
8e4083e2f6
commit
60fa4935b5
|
@ -0,0 +1 @@
|
|||
Improve opentracing support for incoming HTTP requests.
|
|
@ -22,13 +22,11 @@ from synapse.api.urls import FEDERATION_V1_PREFIX
|
|||
from synapse.http.server import HttpServer, ServletCallback
|
||||
from synapse.http.servlet import parse_json_object_from_request
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging import opentracing
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.logging.opentracing import (
|
||||
SynapseTags,
|
||||
start_active_span,
|
||||
start_active_span_from_request,
|
||||
tags,
|
||||
set_tag,
|
||||
span_context_from_request,
|
||||
start_active_span_follows_from,
|
||||
whitelisted_homeserver,
|
||||
)
|
||||
from synapse.server import HomeServer
|
||||
|
@ -279,30 +277,19 @@ class BaseFederationServlet:
|
|||
logger.warning("authenticate_request failed: %s", e)
|
||||
raise
|
||||
|
||||
request_tags = {
|
||||
SynapseTags.REQUEST_ID: request.get_request_id(),
|
||||
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
|
||||
tags.HTTP_METHOD: request.get_method(),
|
||||
tags.HTTP_URL: request.get_redacted_uri(),
|
||||
tags.PEER_HOST_IPV6: request.getClientIP(),
|
||||
"authenticated_entity": origin,
|
||||
"servlet_name": request.request_metrics.name,
|
||||
}
|
||||
# update the active opentracing span with the authenticated entity
|
||||
set_tag("authenticated_entity", origin)
|
||||
|
||||
# Only accept the span context if the origin is authenticated
|
||||
# and whitelisted
|
||||
# if the origin is authenticated and whitelisted, link to its span context
|
||||
context = None
|
||||
if origin and whitelisted_homeserver(origin):
|
||||
scope = start_active_span_from_request(
|
||||
request, "incoming-federation-request", tags=request_tags
|
||||
)
|
||||
else:
|
||||
scope = start_active_span(
|
||||
"incoming-federation-request", tags=request_tags
|
||||
)
|
||||
context = span_context_from_request(request)
|
||||
|
||||
scope = start_active_span_follows_from(
|
||||
"incoming-federation-request", contexts=(context,) if context else ()
|
||||
)
|
||||
|
||||
with scope:
|
||||
opentracing.inject_response_headers(request.responseHeaders)
|
||||
|
||||
if origin and self.RATELIMIT:
|
||||
with ratelimiter.ratelimit(origin) as d:
|
||||
await d
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
import contextlib
|
||||
import logging
|
||||
import time
|
||||
from typing import Any, Generator, Optional, Tuple, Union
|
||||
from typing import TYPE_CHECKING, Any, Generator, Optional, Tuple, Union
|
||||
|
||||
import attr
|
||||
from zope.interface import implementer
|
||||
|
@ -35,6 +35,9 @@ from synapse.logging.context import (
|
|||
)
|
||||
from synapse.types import Requester
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import opentracing
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_next_request_seq = 0
|
||||
|
@ -81,6 +84,10 @@ class SynapseRequest(Request):
|
|||
# server name, for client requests this is the Requester object.
|
||||
self._requester: Optional[Union[Requester, str]] = None
|
||||
|
||||
# An opentracing span for this request. Will be closed when the request is
|
||||
# completely processed.
|
||||
self._opentracing_span: "Optional[opentracing.Span]" = None
|
||||
|
||||
# we can't yet create the logcontext, as we don't know the method.
|
||||
self.logcontext: Optional[LoggingContext] = None
|
||||
|
||||
|
@ -148,6 +155,13 @@ class SynapseRequest(Request):
|
|||
# If there's no authenticated entity, it was the requester.
|
||||
self.logcontext.request.authenticated_entity = authenticated_entity or requester
|
||||
|
||||
def set_opentracing_span(self, span: "opentracing.Span") -> None:
|
||||
"""attach an opentracing span to this request
|
||||
|
||||
Doing so will cause the span to be closed when we finish processing the request
|
||||
"""
|
||||
self._opentracing_span = span
|
||||
|
||||
def get_request_id(self) -> str:
|
||||
return "%s-%i" % (self.get_method(), self.request_seq)
|
||||
|
||||
|
@ -286,6 +300,9 @@ class SynapseRequest(Request):
|
|||
self._processing_finished_time = time.time()
|
||||
self._is_processing = False
|
||||
|
||||
if self._opentracing_span:
|
||||
self._opentracing_span.log_kv({"event": "finished processing"})
|
||||
|
||||
# if we've already sent the response, log it now; otherwise, we wait for the
|
||||
# response to be sent.
|
||||
if self.finish_time is not None:
|
||||
|
@ -299,6 +316,8 @@ class SynapseRequest(Request):
|
|||
"""
|
||||
self.finish_time = time.time()
|
||||
Request.finish(self)
|
||||
if self._opentracing_span:
|
||||
self._opentracing_span.log_kv({"event": "response sent"})
|
||||
if not self._is_processing:
|
||||
assert self.logcontext is not None
|
||||
with PreserveLoggingContext(self.logcontext):
|
||||
|
@ -333,6 +352,11 @@ class SynapseRequest(Request):
|
|||
with PreserveLoggingContext(self.logcontext):
|
||||
logger.info("Connection from client lost before response was sent")
|
||||
|
||||
if self._opentracing_span:
|
||||
self._opentracing_span.log_kv(
|
||||
{"event": "client connection lost", "reason": str(reason.value)}
|
||||
)
|
||||
|
||||
if not self._is_processing:
|
||||
self._finished_processing()
|
||||
|
||||
|
@ -421,6 +445,10 @@ class SynapseRequest(Request):
|
|||
usage.evt_db_fetch_count,
|
||||
)
|
||||
|
||||
# complete the opentracing span, if any.
|
||||
if self._opentracing_span:
|
||||
self._opentracing_span.finish()
|
||||
|
||||
try:
|
||||
self.request_metrics.stop(self.finish_time, self.code, self.sentLength)
|
||||
except Exception as e:
|
||||
|
|
|
@ -173,6 +173,7 @@ from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Pattern, Typ
|
|||
import attr
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.web.http import Request
|
||||
from twisted.web.http_headers import Headers
|
||||
|
||||
from synapse.config import ConfigError
|
||||
|
@ -490,48 +491,6 @@ def start_active_span_follows_from(
|
|||
return scope
|
||||
|
||||
|
||||
def start_active_span_from_request(
|
||||
request,
|
||||
operation_name,
|
||||
references=None,
|
||||
tags=None,
|
||||
start_time=None,
|
||||
ignore_active_span=False,
|
||||
finish_on_close=True,
|
||||
):
|
||||
"""
|
||||
Extracts a span context from a Twisted Request.
|
||||
args:
|
||||
headers (twisted.web.http.Request)
|
||||
|
||||
For the other args see opentracing.tracer
|
||||
|
||||
returns:
|
||||
span_context (opentracing.span.SpanContext)
|
||||
"""
|
||||
# Twisted encodes the values as lists whereas opentracing doesn't.
|
||||
# So, we take the first item in the list.
|
||||
# Also, twisted uses byte arrays while opentracing expects strings.
|
||||
|
||||
if opentracing is None:
|
||||
return noop_context_manager() # type: ignore[unreachable]
|
||||
|
||||
header_dict = {
|
||||
k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
|
||||
}
|
||||
context = opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict)
|
||||
|
||||
return opentracing.tracer.start_active_span(
|
||||
operation_name,
|
||||
child_of=context,
|
||||
references=references,
|
||||
tags=tags,
|
||||
start_time=start_time,
|
||||
ignore_active_span=ignore_active_span,
|
||||
finish_on_close=finish_on_close,
|
||||
)
|
||||
|
||||
|
||||
def start_active_span_from_edu(
|
||||
edu_content,
|
||||
operation_name,
|
||||
|
@ -743,6 +702,20 @@ def active_span_context_as_string():
|
|||
return json_encoder.encode(carrier)
|
||||
|
||||
|
||||
def span_context_from_request(request: Request) -> "Optional[opentracing.SpanContext]":
|
||||
"""Extract an opentracing context from the headers on an HTTP request
|
||||
|
||||
This is useful when we have received an HTTP request from another part of our
|
||||
system, and want to link our spans to those of the remote system.
|
||||
"""
|
||||
if not opentracing:
|
||||
return None
|
||||
header_dict = {
|
||||
k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
|
||||
}
|
||||
return opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def span_context_from_string(carrier):
|
||||
"""
|
||||
|
@ -882,10 +855,13 @@ def trace_servlet(request: "SynapseRequest", extract_context: bool = False):
|
|||
}
|
||||
|
||||
request_name = request.request_metrics.name
|
||||
if extract_context:
|
||||
scope = start_active_span_from_request(request, request_name)
|
||||
else:
|
||||
scope = start_active_span(request_name)
|
||||
context = span_context_from_request(request) if extract_context else None
|
||||
|
||||
# we configure the scope not to finish the span immediately on exit, and instead
|
||||
# pass the span into the SynapseRequest, which will finish it once we've finished
|
||||
# sending the response to the client.
|
||||
scope = start_active_span(request_name, child_of=context, finish_on_close=False)
|
||||
request.set_opentracing_span(scope.span)
|
||||
|
||||
with scope:
|
||||
inject_response_headers(request.responseHeaders)
|
||||
|
|
Loading…
Reference in New Issue