Opentracing Utils (#5722)
* Add decerators for tracing functions * Use the new clean contexts * Context and edu utils * Move opentracing setters * Move whitelisting * Sectioning comments * Better args wrapper * Docstrings Co-Authored-By: Erik Johnston <erik@matrix.org> * Remove unused methods. * Don't use global * One tracing decorator to rule them all.
This commit is contained in:
parent
841b12867e
commit
18a466b84e
|
@ -0,0 +1 @@
|
|||
Add a set of opentracing utils.
|
|
@ -11,7 +11,7 @@
|
|||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.import opentracing
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
# NOTE
|
||||
|
@ -150,10 +150,13 @@ Gotchas
|
|||
"""
|
||||
|
||||
import contextlib
|
||||
import inspect
|
||||
import logging
|
||||
import re
|
||||
from functools import wraps
|
||||
|
||||
from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.config import ConfigError
|
||||
|
@ -173,36 +176,12 @@ except ImportError:
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class _DumTagNames(object):
|
||||
"""wrapper of opentracings tags. We need to have them if we
|
||||
want to reference them without opentracing around. Clearly they
|
||||
should never actually show up in a trace. `set_tags` overwrites
|
||||
these with the correct ones."""
|
||||
# Block everything by default
|
||||
# A regex which matches the server_names to expose traces for.
|
||||
# None means 'block everything'.
|
||||
_homeserver_whitelist = None
|
||||
|
||||
INVALID_TAG = "invalid-tag"
|
||||
COMPONENT = INVALID_TAG
|
||||
DATABASE_INSTANCE = INVALID_TAG
|
||||
DATABASE_STATEMENT = INVALID_TAG
|
||||
DATABASE_TYPE = INVALID_TAG
|
||||
DATABASE_USER = INVALID_TAG
|
||||
ERROR = INVALID_TAG
|
||||
HTTP_METHOD = INVALID_TAG
|
||||
HTTP_STATUS_CODE = INVALID_TAG
|
||||
HTTP_URL = INVALID_TAG
|
||||
MESSAGE_BUS_DESTINATION = INVALID_TAG
|
||||
PEER_ADDRESS = INVALID_TAG
|
||||
PEER_HOSTNAME = INVALID_TAG
|
||||
PEER_HOST_IPV4 = INVALID_TAG
|
||||
PEER_HOST_IPV6 = INVALID_TAG
|
||||
PEER_PORT = INVALID_TAG
|
||||
PEER_SERVICE = INVALID_TAG
|
||||
SAMPLING_PRIORITY = INVALID_TAG
|
||||
SERVICE = INVALID_TAG
|
||||
SPAN_KIND = INVALID_TAG
|
||||
SPAN_KIND_CONSUMER = INVALID_TAG
|
||||
SPAN_KIND_PRODUCER = INVALID_TAG
|
||||
SPAN_KIND_RPC_CLIENT = INVALID_TAG
|
||||
SPAN_KIND_RPC_SERVER = INVALID_TAG
|
||||
# Util methods
|
||||
|
||||
|
||||
def only_if_tracing(func):
|
||||
|
@ -219,11 +198,13 @@ def only_if_tracing(func):
|
|||
return _only_if_tracing_inner
|
||||
|
||||
|
||||
# A regex which matches the server_names to expose traces for.
|
||||
# None means 'block everything'.
|
||||
_homeserver_whitelist = None
|
||||
@contextlib.contextmanager
|
||||
def _noop_context_manager(*args, **kwargs):
|
||||
"""Does exactly what it says on the tin"""
|
||||
yield
|
||||
|
||||
tags = _DumTagNames
|
||||
|
||||
# Setup
|
||||
|
||||
|
||||
def init_tracer(config):
|
||||
|
@ -260,13 +241,39 @@ def init_tracer(config):
|
|||
tags = opentracing.tags
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _noop_context_manager(*args, **kwargs):
|
||||
"""Does absolutely nothing really well. Can be entered and exited arbitrarily.
|
||||
Good substitute for an opentracing scope."""
|
||||
yield
|
||||
# Whitelisting
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def set_homeserver_whitelist(homeserver_whitelist):
|
||||
"""Sets the homeserver whitelist
|
||||
|
||||
Args:
|
||||
homeserver_whitelist (Iterable[str]): regex of whitelisted homeservers
|
||||
"""
|
||||
global _homeserver_whitelist
|
||||
if homeserver_whitelist:
|
||||
# Makes a single regex which accepts all passed in regexes in the list
|
||||
_homeserver_whitelist = re.compile(
|
||||
"({})".format(")|(".join(homeserver_whitelist))
|
||||
)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def whitelisted_homeserver(destination):
|
||||
"""Checks if a destination matches the whitelist
|
||||
|
||||
Args:
|
||||
destination (str)
|
||||
"""
|
||||
_homeserver_whitelist
|
||||
if _homeserver_whitelist:
|
||||
return _homeserver_whitelist.match(destination)
|
||||
return False
|
||||
|
||||
|
||||
# Start spans and scopes
|
||||
|
||||
# Could use kwargs but I want these to be explicit
|
||||
def start_active_span(
|
||||
operation_name,
|
||||
|
@ -285,8 +292,10 @@ def start_active_span(
|
|||
Returns:
|
||||
scope (Scope) or noop_context_manager
|
||||
"""
|
||||
|
||||
if opentracing is None:
|
||||
return _noop_context_manager()
|
||||
|
||||
else:
|
||||
# We need to enter the scope here for the logcontext to become active
|
||||
return opentracing.tracer.start_active_span(
|
||||
|
@ -300,63 +309,13 @@ def start_active_span(
|
|||
)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def close_active_span():
|
||||
"""Closes the active span. This will close it's logcontext if the context
|
||||
was made for the span"""
|
||||
opentracing.tracer.scope_manager.active.__exit__(None, None, None)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def set_tag(key, value):
|
||||
"""Set's a tag on the active span"""
|
||||
opentracing.tracer.active_span.set_tag(key, value)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def log_kv(key_values, timestamp=None):
|
||||
"""Log to the active span"""
|
||||
opentracing.tracer.active_span.log_kv(key_values, timestamp)
|
||||
|
||||
|
||||
# Note: we don't have a get baggage items because we're trying to hide all
|
||||
# scope and span state from synapse. I think this method may also be useless
|
||||
# as a result
|
||||
@only_if_tracing
|
||||
def set_baggage_item(key, value):
|
||||
"""Attach baggage to the active span"""
|
||||
opentracing.tracer.active_span.set_baggage_item(key, value)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def set_operation_name(operation_name):
|
||||
"""Sets the operation name of the active span"""
|
||||
opentracing.tracer.active_span.set_operation_name(operation_name)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def set_homeserver_whitelist(homeserver_whitelist):
|
||||
"""Sets the whitelist
|
||||
|
||||
Args:
|
||||
homeserver_whitelist (iterable of strings): regex of whitelisted homeservers
|
||||
"""
|
||||
global _homeserver_whitelist
|
||||
if homeserver_whitelist:
|
||||
# Makes a single regex which accepts all passed in regexes in the list
|
||||
_homeserver_whitelist = re.compile(
|
||||
"({})".format(")|(".join(homeserver_whitelist))
|
||||
)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def whitelisted_homeserver(destination):
|
||||
"""Checks if a destination matches the whitelist
|
||||
Args:
|
||||
destination (String)"""
|
||||
if _homeserver_whitelist:
|
||||
return _homeserver_whitelist.match(destination)
|
||||
return False
|
||||
def start_active_span_follows_from(operation_name, contexts):
|
||||
if opentracing is None:
|
||||
return _noop_context_manager()
|
||||
else:
|
||||
references = [opentracing.follows_from(context) for context in contexts]
|
||||
scope = start_active_span(operation_name, references=references)
|
||||
return scope
|
||||
|
||||
|
||||
def start_active_span_from_context(
|
||||
|
@ -372,12 +331,16 @@ def start_active_span_from_context(
|
|||
Extracts a span context from Twisted Headers.
|
||||
args:
|
||||
headers (twisted.web.http_headers.Headers)
|
||||
|
||||
For the other args see opentracing.tracer
|
||||
|
||||
returns:
|
||||
span_context (opentracing.span.SpanContext)
|
||||
"""
|
||||
# Twisted encodes the values as lists whereas opentracing doesn't.
|
||||
# So, we take the first item in the list.
|
||||
# Also, twisted uses byte arrays while opentracing expects strings.
|
||||
|
||||
if opentracing is None:
|
||||
return _noop_context_manager()
|
||||
|
||||
|
@ -395,17 +358,90 @@ def start_active_span_from_context(
|
|||
)
|
||||
|
||||
|
||||
def start_active_span_from_edu(
|
||||
edu_content,
|
||||
operation_name,
|
||||
references=[],
|
||||
tags=None,
|
||||
start_time=None,
|
||||
ignore_active_span=False,
|
||||
finish_on_close=True,
|
||||
):
|
||||
"""
|
||||
Extracts a span context from an edu and uses it to start a new active span
|
||||
|
||||
Args:
|
||||
edu_content (dict): and edu_content with a `context` field whose value is
|
||||
canonical json for a dict which contains opentracing information.
|
||||
|
||||
For the other args see opentracing.tracer
|
||||
"""
|
||||
|
||||
if opentracing is None:
|
||||
return _noop_context_manager()
|
||||
|
||||
carrier = json.loads(edu_content.get("context", "{}")).get("opentracing", {})
|
||||
context = opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier)
|
||||
_references = [
|
||||
opentracing.child_of(span_context_from_string(x))
|
||||
for x in carrier.get("references", [])
|
||||
]
|
||||
|
||||
# For some reason jaeger decided not to support the visualization of multiple parent
|
||||
# spans or explicitely show references. I include the span context as a tag here as
|
||||
# an aid to people debugging but it's really not an ideal solution.
|
||||
|
||||
references += _references
|
||||
|
||||
scope = opentracing.tracer.start_active_span(
|
||||
operation_name,
|
||||
child_of=context,
|
||||
references=references,
|
||||
tags=tags,
|
||||
start_time=start_time,
|
||||
ignore_active_span=ignore_active_span,
|
||||
finish_on_close=finish_on_close,
|
||||
)
|
||||
|
||||
scope.span.set_tag("references", carrier.get("references", []))
|
||||
return scope
|
||||
|
||||
|
||||
# Opentracing setters for tags, logs, etc
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def set_tag(key, value):
|
||||
"""Sets a tag on the active span"""
|
||||
opentracing.tracer.active_span.set_tag(key, value)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def log_kv(key_values, timestamp=None):
|
||||
"""Log to the active span"""
|
||||
opentracing.tracer.active_span.log_kv(key_values, timestamp)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def set_operation_name(operation_name):
|
||||
"""Sets the operation name of the active span"""
|
||||
opentracing.tracer.active_span.set_operation_name(operation_name)
|
||||
|
||||
|
||||
# Injection and extraction
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def inject_active_span_twisted_headers(headers, destination):
|
||||
"""
|
||||
Injects a span context into twisted headers inplace
|
||||
Injects a span context into twisted headers in-place
|
||||
|
||||
Args:
|
||||
headers (twisted.web.http_headers.Headers)
|
||||
span (opentracing.Span)
|
||||
|
||||
Returns:
|
||||
Inplace modification of headers
|
||||
In-place modification of headers
|
||||
|
||||
Note:
|
||||
The headers set by the tracer are custom to the tracer implementation which
|
||||
|
@ -437,7 +473,7 @@ def inject_active_span_byte_dict(headers, destination):
|
|||
span (opentracing.Span)
|
||||
|
||||
Returns:
|
||||
Inplace modification of headers
|
||||
In-place modification of headers
|
||||
|
||||
Note:
|
||||
The headers set by the tracer are custom to the tracer implementation which
|
||||
|
@ -458,9 +494,190 @@ def inject_active_span_byte_dict(headers, destination):
|
|||
headers[key.encode()] = [value.encode()]
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def inject_active_span_text_map(carrier, destination=None):
|
||||
"""
|
||||
Injects a span context into a dict
|
||||
|
||||
Args:
|
||||
carrier (dict)
|
||||
destination (str): the name of the remote server. The span context
|
||||
will only be injected if the destination matches the homeserver_whitelist
|
||||
or destination is None.
|
||||
|
||||
Returns:
|
||||
In-place modification of carrier
|
||||
|
||||
Note:
|
||||
The headers set by the tracer are custom to the tracer implementation which
|
||||
should be unique enough that they don't interfere with any headers set by
|
||||
synapse or twisted. If we're still using jaeger these headers would be those
|
||||
here:
|
||||
https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
|
||||
"""
|
||||
|
||||
if destination and not whitelisted_homeserver(destination):
|
||||
return
|
||||
|
||||
opentracing.tracer.inject(
|
||||
opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
|
||||
)
|
||||
|
||||
|
||||
def active_span_context_as_string():
|
||||
"""
|
||||
Returns:
|
||||
The active span context encoded as a string.
|
||||
"""
|
||||
carrier = {}
|
||||
if opentracing:
|
||||
opentracing.tracer.inject(
|
||||
opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
|
||||
)
|
||||
return json.dumps(carrier)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def span_context_from_string(carrier):
|
||||
"""
|
||||
Returns:
|
||||
The active span context decoded from a string.
|
||||
"""
|
||||
carrier = json.loads(carrier)
|
||||
return opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def extract_text_map(carrier):
|
||||
"""
|
||||
Wrapper method for opentracing's tracer.extract for TEXT_MAP.
|
||||
Args:
|
||||
carrier (dict): a dict possibly containing a span context.
|
||||
|
||||
Returns:
|
||||
The active span context extracted from carrier.
|
||||
"""
|
||||
return opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier)
|
||||
|
||||
|
||||
# Tracing decorators
|
||||
|
||||
|
||||
def trace(func):
|
||||
"""
|
||||
Decorator to trace a function.
|
||||
Sets the operation name to that of the function's.
|
||||
"""
|
||||
if opentracing is None:
|
||||
return func
|
||||
|
||||
@wraps(func)
|
||||
def _trace_inner(self, *args, **kwargs):
|
||||
if opentracing is None:
|
||||
return func(self, *args, **kwargs)
|
||||
|
||||
scope = start_active_span(func.__name__)
|
||||
scope.__enter__()
|
||||
|
||||
try:
|
||||
result = func(self, *args, **kwargs)
|
||||
if isinstance(result, defer.Deferred):
|
||||
|
||||
def call_back(result):
|
||||
scope.__exit__(None, None, None)
|
||||
return result
|
||||
|
||||
def err_back(result):
|
||||
scope.span.set_tag(tags.ERROR, True)
|
||||
scope.__exit__(None, None, None)
|
||||
return result
|
||||
|
||||
result.addCallbacks(call_back, err_back)
|
||||
|
||||
else:
|
||||
scope.__exit__(None, None, None)
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
scope.__exit__(type(e), None, e.__traceback__)
|
||||
raise
|
||||
|
||||
return _trace_inner
|
||||
|
||||
|
||||
def trace_using_operation_name(operation_name):
|
||||
"""Decorator to trace a function. Explicitely sets the operation_name."""
|
||||
|
||||
def trace(func):
|
||||
"""
|
||||
Decorator to trace a function.
|
||||
Sets the operation name to that of the function's.
|
||||
"""
|
||||
if opentracing is None:
|
||||
return func
|
||||
|
||||
@wraps(func)
|
||||
def _trace_inner(self, *args, **kwargs):
|
||||
if opentracing is None:
|
||||
return func(self, *args, **kwargs)
|
||||
|
||||
scope = start_active_span(operation_name)
|
||||
scope.__enter__()
|
||||
|
||||
try:
|
||||
result = func(self, *args, **kwargs)
|
||||
if isinstance(result, defer.Deferred):
|
||||
|
||||
def call_back(result):
|
||||
scope.__exit__(None, None, None)
|
||||
return result
|
||||
|
||||
def err_back(result):
|
||||
scope.span.set_tag(tags.ERROR, True)
|
||||
scope.__exit__(None, None, None)
|
||||
return result
|
||||
|
||||
result.addCallbacks(call_back, err_back)
|
||||
else:
|
||||
scope.__exit__(None, None, None)
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
scope.__exit__(type(e), None, e.__traceback__)
|
||||
raise
|
||||
|
||||
return _trace_inner
|
||||
|
||||
return trace
|
||||
|
||||
|
||||
def tag_args(func):
|
||||
"""
|
||||
Tags all of the args to the active span.
|
||||
"""
|
||||
|
||||
if not opentracing:
|
||||
return func
|
||||
|
||||
@wraps(func)
|
||||
def _tag_args_inner(self, *args, **kwargs):
|
||||
argspec = inspect.getargspec(func)
|
||||
for i, arg in enumerate(argspec.args[1:]):
|
||||
set_tag("ARG_" + arg, args[i])
|
||||
set_tag("args", args[len(argspec.args) :])
|
||||
set_tag("kwargs", kwargs)
|
||||
return func(self, *args, **kwargs)
|
||||
|
||||
return _tag_args_inner
|
||||
|
||||
|
||||
def trace_servlet(servlet_name, func):
|
||||
"""Decorator which traces a serlet. It starts a span with some servlet specific
|
||||
tags such as the servlet_name and request information"""
|
||||
if not opentracing:
|
||||
return func
|
||||
|
||||
@wraps(func)
|
||||
@defer.inlineCallbacks
|
||||
|
@ -477,6 +694,44 @@ def trace_servlet(servlet_name, func):
|
|||
},
|
||||
):
|
||||
result = yield defer.maybeDeferred(func, request, *args, **kwargs)
|
||||
defer.returnValue(result)
|
||||
defer.returnValue(result)
|
||||
|
||||
return _trace_servlet_inner
|
||||
|
||||
|
||||
# Helper class
|
||||
|
||||
|
||||
class _DummyTagNames(object):
|
||||
"""wrapper of opentracings tags. We need to have them if we
|
||||
want to reference them without opentracing around. Clearly they
|
||||
should never actually show up in a trace. `set_tags` overwrites
|
||||
these with the correct ones."""
|
||||
|
||||
INVALID_TAG = "invalid-tag"
|
||||
COMPONENT = INVALID_TAG
|
||||
DATABASE_INSTANCE = INVALID_TAG
|
||||
DATABASE_STATEMENT = INVALID_TAG
|
||||
DATABASE_TYPE = INVALID_TAG
|
||||
DATABASE_USER = INVALID_TAG
|
||||
ERROR = INVALID_TAG
|
||||
HTTP_METHOD = INVALID_TAG
|
||||
HTTP_STATUS_CODE = INVALID_TAG
|
||||
HTTP_URL = INVALID_TAG
|
||||
MESSAGE_BUS_DESTINATION = INVALID_TAG
|
||||
PEER_ADDRESS = INVALID_TAG
|
||||
PEER_HOSTNAME = INVALID_TAG
|
||||
PEER_HOST_IPV4 = INVALID_TAG
|
||||
PEER_HOST_IPV6 = INVALID_TAG
|
||||
PEER_PORT = INVALID_TAG
|
||||
PEER_SERVICE = INVALID_TAG
|
||||
SAMPLING_PRIORITY = INVALID_TAG
|
||||
SERVICE = INVALID_TAG
|
||||
SPAN_KIND = INVALID_TAG
|
||||
SPAN_KIND_CONSUMER = INVALID_TAG
|
||||
SPAN_KIND_PRODUCER = INVALID_TAG
|
||||
SPAN_KIND_RPC_CLIENT = INVALID_TAG
|
||||
SPAN_KIND_RPC_SERVER = INVALID_TAG
|
||||
|
||||
|
||||
tags = _DummyTagNames
|
||||
|
|
|
@ -131,7 +131,7 @@ class _LogContextScope(Scope):
|
|||
|
||||
def close(self):
|
||||
if self.manager.active is not self:
|
||||
logger.error("Tried to close a none active scope!")
|
||||
logger.error("Tried to close a non-active scope!")
|
||||
return
|
||||
|
||||
if self._finish_on_close:
|
||||
|
|
Loading…
Reference in New Issue