Add basic opentracing support (#5544)
* Configure and initialise tracer Includes config options for the tracer and sets up JaegerClient. * Scope manager using LogContexts We piggy-back our tracer scopes by using log context. The current log context gives us the current scope. If new scope is created we create a stack of scopes in the context. * jaeger is a dependency now * Carrier inject and extraction for Twisted Headers * Trace federation requests on the way in and out. The span is created in _started_processing and closed in _finished_processing because we need a meaningful log context. * Create logcontext for new scope. Instead of having a stack of scopes in a logcontext we create a new context for a new scope if the current logcontext already has a scope. * Remove scope from logcontext if logcontext is top level * Disable tracer if not configured * typo * Remove dependence on jaeger internals * bools * Set service name * :Explicitely state that the tracer is disabled * Black is the new black * Newsfile * Code style * Use the new config setup. * Generate config. * Copyright * Rename config to opentracing * Remove user whitelisting * Empty whitelist by default * User ConfigError instead of RuntimeError * Use isinstance * Use tag constants for opentracing. * Remove debug comment and no need to explicitely record error * Two errors a "s(c)entry" * Docstrings! * Remove debugging brainslip * Homeserver Whitlisting * Better opentracing config comment * linting * Inclue worker name in service_name * Make opentracing an optional dependency * Neater config retreival * Clean up dummy tags * Instantiate tracing as object instead of global class * Inlcude opentracing as a homeserver member. * Thread opentracing to the request level * Reference opetnracing through hs * Instantiate dummy opentracin g for tests. * About to revert, just keeping the unfinished changes just in case * Revert back to global state, commit number: 9ce4a3d9067bf9889b86c360c05ac88618b85c4f * Use class level methods in tracerutils * Start and stop requests spans in a place where we have access to the authenticated entity * Seen it, isort it * Make sure to close the active span. * I'm getting black and blue from this. * Logger formatting Co-Authored-By: Erik Johnston <erik@matrix.org> * Outdated comment * Import opentracing at the top * Return a contextmanager * Start tracing client requests from the servlet * Return noop context manager if not tracing * Explicitely say that these are federation requests * Include servlet name in client requests * Use context manager * Move opentracing to logging/ * Seen it, isort it again! * Ignore twisted return exceptions on context exit * Escape the scope * Scopes should be entered to make them useful. * Nicer decorator names * Just one init, init? * Don't need to close something that isn't open * Docs make you smarter
This commit is contained in:
parent
1890cfcf82
commit
38a6d3eea7
|
@ -0,0 +1 @@
|
||||||
|
Added opentracing and configuration options.
|
|
@ -1395,3 +1395,20 @@ password_config:
|
||||||
# module: "my_custom_project.SuperRulesSet"
|
# module: "my_custom_project.SuperRulesSet"
|
||||||
# config:
|
# config:
|
||||||
# example_option: 'things'
|
# example_option: 'things'
|
||||||
|
|
||||||
|
|
||||||
|
## Opentracing ##
|
||||||
|
# These settings enable opentracing which implements distributed tracing
|
||||||
|
# This allows you to observe the causal chain of events across servers
|
||||||
|
# including requests, key lookups etc. across any server running
|
||||||
|
# synapse or any other other services which supports opentracing.
|
||||||
|
# (specifically those implemented with jaeger)
|
||||||
|
|
||||||
|
#opentracing:
|
||||||
|
# # Enable / disable tracer
|
||||||
|
# tracer_enabled: false
|
||||||
|
# # The list of homeservers we wish to expose our current traces to.
|
||||||
|
# # The list is a list of regexes which are matched against the
|
||||||
|
# # servername of the homeserver
|
||||||
|
# homeserver_whitelist:
|
||||||
|
# - ".*"
|
||||||
|
|
|
@ -243,6 +243,9 @@ def start(hs, listeners=None):
|
||||||
# Load the certificate from disk.
|
# Load the certificate from disk.
|
||||||
refresh_certificate(hs)
|
refresh_certificate(hs)
|
||||||
|
|
||||||
|
# Start the tracer
|
||||||
|
synapse.logging.opentracing.init_tracer(hs.config)
|
||||||
|
|
||||||
# It is now safe to start your Synapse.
|
# It is now safe to start your Synapse.
|
||||||
hs.start_listening(listeners)
|
hs.start_listening(listeners)
|
||||||
hs.get_datastore().start_profiling()
|
hs.get_datastore().start_profiling()
|
||||||
|
|
|
@ -40,6 +40,7 @@ from .spam_checker import SpamCheckerConfig
|
||||||
from .stats import StatsConfig
|
from .stats import StatsConfig
|
||||||
from .third_party_event_rules import ThirdPartyRulesConfig
|
from .third_party_event_rules import ThirdPartyRulesConfig
|
||||||
from .tls import TlsConfig
|
from .tls import TlsConfig
|
||||||
|
from .tracer import TracerConfig
|
||||||
from .user_directory import UserDirectoryConfig
|
from .user_directory import UserDirectoryConfig
|
||||||
from .voip import VoipConfig
|
from .voip import VoipConfig
|
||||||
from .workers import WorkerConfig
|
from .workers import WorkerConfig
|
||||||
|
@ -75,5 +76,6 @@ class HomeServerConfig(
|
||||||
ServerNoticesConfig,
|
ServerNoticesConfig,
|
||||||
RoomDirectoryConfig,
|
RoomDirectoryConfig,
|
||||||
ThirdPartyRulesConfig,
|
ThirdPartyRulesConfig,
|
||||||
|
TracerConfig,
|
||||||
):
|
):
|
||||||
pass
|
pass
|
||||||
|
|
|
@ -0,0 +1,50 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2019 The Matrix.org Foundation C.I.C.d
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from ._base import Config, ConfigError
|
||||||
|
|
||||||
|
|
||||||
|
class TracerConfig(Config):
|
||||||
|
def read_config(self, config, **kwargs):
|
||||||
|
self.tracer_config = config.get("opentracing")
|
||||||
|
|
||||||
|
self.tracer_config = config.get("opentracing", {"tracer_enabled": False})
|
||||||
|
|
||||||
|
if self.tracer_config.get("tracer_enabled", False):
|
||||||
|
# The tracer is enabled so sanitize the config
|
||||||
|
# If no whitelists are given
|
||||||
|
self.tracer_config.setdefault("homeserver_whitelist", [])
|
||||||
|
|
||||||
|
if not isinstance(self.tracer_config.get("homeserver_whitelist"), list):
|
||||||
|
raise ConfigError("Tracer homesererver_whitelist config is malformed")
|
||||||
|
|
||||||
|
def generate_config_section(cls, **kwargs):
|
||||||
|
return """\
|
||||||
|
## Opentracing ##
|
||||||
|
# These settings enable opentracing which implements distributed tracing
|
||||||
|
# This allows you to observe the causal chain of events across servers
|
||||||
|
# including requests, key lookups etc. across any server running
|
||||||
|
# synapse or any other other services which supports opentracing.
|
||||||
|
# (specifically those implemented with jaeger)
|
||||||
|
|
||||||
|
#opentracing:
|
||||||
|
# # Enable / disable tracer
|
||||||
|
# tracer_enabled: false
|
||||||
|
# # The list of homeservers we wish to expose our current traces to.
|
||||||
|
# # The list is a list of regexes which are matched against the
|
||||||
|
# # servername of the homeserver
|
||||||
|
# homeserver_whitelist:
|
||||||
|
# - ".*"
|
||||||
|
"""
|
|
@ -21,6 +21,7 @@ import re
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
import synapse
|
import synapse
|
||||||
|
import synapse.logging.opentracing as opentracing
|
||||||
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
|
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
|
||||||
from synapse.api.room_versions import RoomVersions
|
from synapse.api.room_versions import RoomVersions
|
||||||
from synapse.api.urls import (
|
from synapse.api.urls import (
|
||||||
|
@ -288,14 +289,29 @@ class BaseFederationServlet(object):
|
||||||
logger.warn("authenticate_request failed: %s", e)
|
logger.warn("authenticate_request failed: %s", e)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
if origin:
|
# Start an opentracing span
|
||||||
with ratelimiter.ratelimit(origin) as d:
|
with opentracing.start_active_span_from_context(
|
||||||
yield d
|
request.requestHeaders,
|
||||||
|
"incoming-federation-request",
|
||||||
|
tags={
|
||||||
|
"request_id": request.get_request_id(),
|
||||||
|
opentracing.tags.SPAN_KIND: opentracing.tags.SPAN_KIND_RPC_SERVER,
|
||||||
|
opentracing.tags.HTTP_METHOD: request.get_method(),
|
||||||
|
opentracing.tags.HTTP_URL: request.get_redacted_uri(),
|
||||||
|
opentracing.tags.PEER_HOST_IPV6: request.getClientIP(),
|
||||||
|
"authenticated_entity": origin,
|
||||||
|
},
|
||||||
|
):
|
||||||
|
if origin:
|
||||||
|
with ratelimiter.ratelimit(origin) as d:
|
||||||
|
yield d
|
||||||
|
response = yield func(
|
||||||
|
origin, content, request.args, *args, **kwargs
|
||||||
|
)
|
||||||
|
else:
|
||||||
response = yield func(
|
response = yield func(
|
||||||
origin, content, request.args, *args, **kwargs
|
origin, content, request.args, *args, **kwargs
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
response = yield func(origin, content, request.args, *args, **kwargs)
|
|
||||||
|
|
||||||
defer.returnValue(response)
|
defer.returnValue(response)
|
||||||
|
|
||||||
|
|
|
@ -36,6 +36,7 @@ from twisted.internet.task import _EPSILON, Cooperator
|
||||||
from twisted.web._newclient import ResponseDone
|
from twisted.web._newclient import ResponseDone
|
||||||
from twisted.web.http_headers import Headers
|
from twisted.web.http_headers import Headers
|
||||||
|
|
||||||
|
import synapse.logging.opentracing as opentracing
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
import synapse.util.retryutils
|
import synapse.util.retryutils
|
||||||
from synapse.api.errors import (
|
from synapse.api.errors import (
|
||||||
|
@ -339,9 +340,25 @@ class MatrixFederationHttpClient(object):
|
||||||
else:
|
else:
|
||||||
query_bytes = b""
|
query_bytes = b""
|
||||||
|
|
||||||
headers_dict = {b"User-Agent": [self.version_string_bytes]}
|
# Retreive current span
|
||||||
|
scope = opentracing.start_active_span(
|
||||||
|
"outgoing-federation-request",
|
||||||
|
tags={
|
||||||
|
opentracing.tags.SPAN_KIND: opentracing.tags.SPAN_KIND_RPC_CLIENT,
|
||||||
|
opentracing.tags.PEER_ADDRESS: request.destination,
|
||||||
|
opentracing.tags.HTTP_METHOD: request.method,
|
||||||
|
opentracing.tags.HTTP_URL: request.path,
|
||||||
|
},
|
||||||
|
finish_on_close=True,
|
||||||
|
)
|
||||||
|
|
||||||
with limiter:
|
# Inject the span into the headers
|
||||||
|
headers_dict = {}
|
||||||
|
opentracing.inject_active_span_byte_dict(headers_dict, request.destination)
|
||||||
|
|
||||||
|
headers_dict[b"User-Agent"] = [self.version_string_bytes]
|
||||||
|
|
||||||
|
with limiter, scope:
|
||||||
# XXX: Would be much nicer to retry only at the transaction-layer
|
# XXX: Would be much nicer to retry only at the transaction-layer
|
||||||
# (once we have reliable transactions in place)
|
# (once we have reliable transactions in place)
|
||||||
if long_retries:
|
if long_retries:
|
||||||
|
@ -419,6 +436,10 @@ class MatrixFederationHttpClient(object):
|
||||||
response.phrase.decode("ascii", errors="replace"),
|
response.phrase.decode("ascii", errors="replace"),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
opentracing.set_tag(
|
||||||
|
opentracing.tags.HTTP_STATUS_CODE, response.code
|
||||||
|
)
|
||||||
|
|
||||||
if 200 <= response.code < 300:
|
if 200 <= response.code < 300:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
|
@ -499,8 +520,7 @@ class MatrixFederationHttpClient(object):
|
||||||
_flatten_response_never_received(e),
|
_flatten_response_never_received(e),
|
||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
|
defer.returnValue(response)
|
||||||
defer.returnValue(response)
|
|
||||||
|
|
||||||
def build_auth_headers(
|
def build_auth_headers(
|
||||||
self, destination, method, url_bytes, content=None, destination_is=None
|
self, destination, method, url_bytes, content=None, destination_is=None
|
||||||
|
|
|
@ -20,6 +20,7 @@ import logging
|
||||||
from canonicaljson import json
|
from canonicaljson import json
|
||||||
|
|
||||||
from synapse.api.errors import Codes, SynapseError
|
from synapse.api.errors import Codes, SynapseError
|
||||||
|
from synapse.logging.opentracing import trace_servlet
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -290,7 +291,11 @@ class RestServlet(object):
|
||||||
for method in ("GET", "PUT", "POST", "OPTIONS", "DELETE"):
|
for method in ("GET", "PUT", "POST", "OPTIONS", "DELETE"):
|
||||||
if hasattr(self, "on_%s" % (method,)):
|
if hasattr(self, "on_%s" % (method,)):
|
||||||
method_handler = getattr(self, "on_%s" % (method,))
|
method_handler = getattr(self, "on_%s" % (method,))
|
||||||
http_server.register_paths(method, patterns, method_handler)
|
http_server.register_paths(
|
||||||
|
method,
|
||||||
|
patterns,
|
||||||
|
trace_servlet(self.__class__.__name__, method_handler),
|
||||||
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise NotImplementedError("RestServlet must register something.")
|
raise NotImplementedError("RestServlet must register something.")
|
||||||
|
|
|
@ -186,6 +186,7 @@ class LoggingContext(object):
|
||||||
"alive",
|
"alive",
|
||||||
"request",
|
"request",
|
||||||
"tag",
|
"tag",
|
||||||
|
"scope",
|
||||||
]
|
]
|
||||||
|
|
||||||
thread_local = threading.local()
|
thread_local = threading.local()
|
||||||
|
@ -238,6 +239,7 @@ class LoggingContext(object):
|
||||||
self.request = None
|
self.request = None
|
||||||
self.tag = ""
|
self.tag = ""
|
||||||
self.alive = True
|
self.alive = True
|
||||||
|
self.scope = None
|
||||||
|
|
||||||
self.parent_context = parent_context
|
self.parent_context = parent_context
|
||||||
|
|
||||||
|
@ -322,10 +324,12 @@ class LoggingContext(object):
|
||||||
another LoggingContext
|
another LoggingContext
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# 'request' is the only field we currently use in the logger, so that's
|
# we track the current request
|
||||||
# all we need to copy
|
|
||||||
record.request = self.request
|
record.request = self.request
|
||||||
|
|
||||||
|
# we also track the current scope:
|
||||||
|
record.scope = self.scope
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
if get_thread_id() != self.main_thread:
|
if get_thread_id() != self.main_thread:
|
||||||
logger.warning("Started logcontext %s on different thread", self)
|
logger.warning("Started logcontext %s on different thread", self)
|
||||||
|
|
|
@ -0,0 +1,362 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2019 The Matrix.org Foundation C.I.C.d
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.import opentracing
|
||||||
|
|
||||||
|
|
||||||
|
# NOTE
|
||||||
|
# This is a small wrapper around opentracing because opentracing is not currently
|
||||||
|
# packaged downstream (specifically debian). Since opentracing instrumentation is
|
||||||
|
# fairly invasive it was awkward to make it optional. As a result we opted to encapsulate
|
||||||
|
# all opentracing state in these methods which effectively noop if opentracing is
|
||||||
|
# not present. We should strongly consider encouraging the downstream distributers
|
||||||
|
# to package opentracing and making opentracing a full dependency. In order to facilitate
|
||||||
|
# this move the methods have work very similarly to opentracing's and it should only
|
||||||
|
# be a matter of few regexes to move over to opentracing's access patterns proper.
|
||||||
|
|
||||||
|
try:
|
||||||
|
import opentracing
|
||||||
|
except ImportError:
|
||||||
|
opentracing = None
|
||||||
|
try:
|
||||||
|
from jaeger_client import Config as JaegerConfig
|
||||||
|
from synapse.logging.scopecontextmanager import LogContextScopeManager
|
||||||
|
except ImportError:
|
||||||
|
JaegerConfig = None
|
||||||
|
LogContextScopeManager = None
|
||||||
|
|
||||||
|
import contextlib
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
from functools import wraps
|
||||||
|
|
||||||
|
from twisted.internet import defer
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class _DumTagNames(object):
|
||||||
|
"""wrapper of opentracings tags. We need to have them if we
|
||||||
|
want to reference them without opentracing around. Clearly they
|
||||||
|
should never actually show up in a trace. `set_tags` overwrites
|
||||||
|
these with the correct ones."""
|
||||||
|
|
||||||
|
INVALID_TAG = "invalid-tag"
|
||||||
|
COMPONENT = INVALID_TAG
|
||||||
|
DATABASE_INSTANCE = INVALID_TAG
|
||||||
|
DATABASE_STATEMENT = INVALID_TAG
|
||||||
|
DATABASE_TYPE = INVALID_TAG
|
||||||
|
DATABASE_USER = INVALID_TAG
|
||||||
|
ERROR = INVALID_TAG
|
||||||
|
HTTP_METHOD = INVALID_TAG
|
||||||
|
HTTP_STATUS_CODE = INVALID_TAG
|
||||||
|
HTTP_URL = INVALID_TAG
|
||||||
|
MESSAGE_BUS_DESTINATION = INVALID_TAG
|
||||||
|
PEER_ADDRESS = INVALID_TAG
|
||||||
|
PEER_HOSTNAME = INVALID_TAG
|
||||||
|
PEER_HOST_IPV4 = INVALID_TAG
|
||||||
|
PEER_HOST_IPV6 = INVALID_TAG
|
||||||
|
PEER_PORT = INVALID_TAG
|
||||||
|
PEER_SERVICE = INVALID_TAG
|
||||||
|
SAMPLING_PRIORITY = INVALID_TAG
|
||||||
|
SERVICE = INVALID_TAG
|
||||||
|
SPAN_KIND = INVALID_TAG
|
||||||
|
SPAN_KIND_CONSUMER = INVALID_TAG
|
||||||
|
SPAN_KIND_PRODUCER = INVALID_TAG
|
||||||
|
SPAN_KIND_RPC_CLIENT = INVALID_TAG
|
||||||
|
SPAN_KIND_RPC_SERVER = INVALID_TAG
|
||||||
|
|
||||||
|
|
||||||
|
def only_if_tracing(func):
|
||||||
|
"""Executes the function only if we're tracing. Otherwise return.
|
||||||
|
Assumes the function wrapped may return None"""
|
||||||
|
|
||||||
|
@wraps(func)
|
||||||
|
def _only_if_tracing_inner(*args, **kwargs):
|
||||||
|
if opentracing:
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
|
||||||
|
return _only_if_tracing_inner
|
||||||
|
|
||||||
|
|
||||||
|
# Block everything by default
|
||||||
|
_homeserver_whitelist = None
|
||||||
|
|
||||||
|
tags = _DumTagNames
|
||||||
|
|
||||||
|
|
||||||
|
def init_tracer(config):
|
||||||
|
"""Set the whitelists and initialise the JaegerClient tracer
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config (Config)
|
||||||
|
The config used by the homeserver. Here it's used to set the service
|
||||||
|
name to the homeserver's.
|
||||||
|
"""
|
||||||
|
global opentracing
|
||||||
|
if not config.tracer_config.get("tracer_enabled", False):
|
||||||
|
# We don't have a tracer
|
||||||
|
opentracing = None
|
||||||
|
return
|
||||||
|
|
||||||
|
if not opentracing:
|
||||||
|
logger.error(
|
||||||
|
"The server has been configure to use opentracing but opentracing is not installed."
|
||||||
|
)
|
||||||
|
raise ModuleNotFoundError("opentracing")
|
||||||
|
|
||||||
|
if not JaegerConfig:
|
||||||
|
logger.error(
|
||||||
|
"The server has been configure to use opentracing but opentracing is not installed."
|
||||||
|
)
|
||||||
|
|
||||||
|
# Include the worker name
|
||||||
|
name = config.worker_name if config.worker_name else "master"
|
||||||
|
|
||||||
|
set_homeserver_whitelist(config.tracer_config["homeserver_whitelist"])
|
||||||
|
jaeger_config = JaegerConfig(
|
||||||
|
config={"sampler": {"type": "const", "param": 1}, "logging": True},
|
||||||
|
service_name="{} {}".format(config.server_name, name),
|
||||||
|
scope_manager=LogContextScopeManager(config),
|
||||||
|
)
|
||||||
|
jaeger_config.initialize_tracer()
|
||||||
|
|
||||||
|
# Set up tags to be opentracing's tags
|
||||||
|
global tags
|
||||||
|
tags = opentracing.tags
|
||||||
|
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def _noop_context_manager(*args, **kwargs):
|
||||||
|
"""Does absolutely nothing really well. Can be entered and exited arbitrarily.
|
||||||
|
Good substitute for an opentracing scope."""
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
# Could use kwargs but I want these to be explicit
|
||||||
|
def start_active_span(
|
||||||
|
operation_name,
|
||||||
|
child_of=None,
|
||||||
|
references=None,
|
||||||
|
tags=None,
|
||||||
|
start_time=None,
|
||||||
|
ignore_active_span=False,
|
||||||
|
finish_on_close=True,
|
||||||
|
):
|
||||||
|
"""Starts an active opentracing span. Note, the scope doesn't become active
|
||||||
|
until it has been entered, however, the span starts from the time this
|
||||||
|
message is called.
|
||||||
|
Args:
|
||||||
|
See opentracing.tracer
|
||||||
|
Returns:
|
||||||
|
scope (Scope) or noop_context_manager
|
||||||
|
"""
|
||||||
|
if opentracing is None:
|
||||||
|
return _noop_context_manager()
|
||||||
|
else:
|
||||||
|
# We need to enter the scope here for the logcontext to become active
|
||||||
|
return opentracing.tracer.start_active_span(
|
||||||
|
operation_name,
|
||||||
|
child_of=child_of,
|
||||||
|
references=references,
|
||||||
|
tags=tags,
|
||||||
|
start_time=start_time,
|
||||||
|
ignore_active_span=ignore_active_span,
|
||||||
|
finish_on_close=finish_on_close,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@only_if_tracing
|
||||||
|
def close_active_span():
|
||||||
|
"""Closes the active span. This will close it's logcontext if the context
|
||||||
|
was made for the span"""
|
||||||
|
opentracing.tracer.scope_manager.active.__exit__(None, None, None)
|
||||||
|
|
||||||
|
|
||||||
|
@only_if_tracing
|
||||||
|
def set_tag(key, value):
|
||||||
|
"""Set's a tag on the active span"""
|
||||||
|
opentracing.tracer.active_span.set_tag(key, value)
|
||||||
|
|
||||||
|
|
||||||
|
@only_if_tracing
|
||||||
|
def log_kv(key_values, timestamp=None):
|
||||||
|
"""Log to the active span"""
|
||||||
|
opentracing.tracer.active_span.log_kv(key_values, timestamp)
|
||||||
|
|
||||||
|
|
||||||
|
# Note: we don't have a get baggage items because we're trying to hide all
|
||||||
|
# scope and span state from synapse. I think this method may also be useless
|
||||||
|
# as a result
|
||||||
|
@only_if_tracing
|
||||||
|
def set_baggage_item(key, value):
|
||||||
|
"""Attach baggage to the active span"""
|
||||||
|
opentracing.tracer.active_span.set_baggage_item(key, value)
|
||||||
|
|
||||||
|
|
||||||
|
@only_if_tracing
|
||||||
|
def set_operation_name(operation_name):
|
||||||
|
"""Sets the operation name of the active span"""
|
||||||
|
opentracing.tracer.active_span.set_operation_name(operation_name)
|
||||||
|
|
||||||
|
|
||||||
|
@only_if_tracing
|
||||||
|
def set_homeserver_whitelist(homeserver_whitelist):
|
||||||
|
"""Sets the whitelist
|
||||||
|
|
||||||
|
Args:
|
||||||
|
homeserver_whitelist (iterable of strings): regex of whitelisted homeservers
|
||||||
|
"""
|
||||||
|
global _homeserver_whitelist
|
||||||
|
if homeserver_whitelist:
|
||||||
|
# Makes a single regex which accepts all passed in regexes in the list
|
||||||
|
_homeserver_whitelist = re.compile(
|
||||||
|
"({})".format(")|(".join(homeserver_whitelist))
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@only_if_tracing
|
||||||
|
def whitelisted_homeserver(destination):
|
||||||
|
"""Checks if a destination matches the whitelist
|
||||||
|
Args:
|
||||||
|
destination (String)"""
|
||||||
|
global _homeserver_whitelist
|
||||||
|
if _homeserver_whitelist:
|
||||||
|
return _homeserver_whitelist.match(destination)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def start_active_span_from_context(
|
||||||
|
headers,
|
||||||
|
operation_name,
|
||||||
|
references=None,
|
||||||
|
tags=None,
|
||||||
|
start_time=None,
|
||||||
|
ignore_active_span=False,
|
||||||
|
finish_on_close=True,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Extracts a span context from Twisted Headers.
|
||||||
|
args:
|
||||||
|
headers (twisted.web.http_headers.Headers)
|
||||||
|
returns:
|
||||||
|
span_context (opentracing.span.SpanContext)
|
||||||
|
"""
|
||||||
|
# Twisted encodes the values as lists whereas opentracing doesn't.
|
||||||
|
# So, we take the first item in the list.
|
||||||
|
# Also, twisted uses byte arrays while opentracing expects strings.
|
||||||
|
if opentracing is None:
|
||||||
|
return _noop_context_manager()
|
||||||
|
|
||||||
|
header_dict = {k.decode(): v[0].decode() for k, v in headers.getAllRawHeaders()}
|
||||||
|
context = opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict)
|
||||||
|
|
||||||
|
return opentracing.tracer.start_active_span(
|
||||||
|
operation_name,
|
||||||
|
child_of=context,
|
||||||
|
references=references,
|
||||||
|
tags=tags,
|
||||||
|
start_time=start_time,
|
||||||
|
ignore_active_span=ignore_active_span,
|
||||||
|
finish_on_close=finish_on_close,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@only_if_tracing
|
||||||
|
def inject_active_span_twisted_headers(headers, destination):
|
||||||
|
"""
|
||||||
|
Injects a span context into twisted headers inplace
|
||||||
|
|
||||||
|
Args:
|
||||||
|
headers (twisted.web.http_headers.Headers)
|
||||||
|
span (opentracing.Span)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Inplace modification of headers
|
||||||
|
|
||||||
|
Note:
|
||||||
|
The headers set by the tracer are custom to the tracer implementation which
|
||||||
|
should be unique enough that they don't interfere with any headers set by
|
||||||
|
synapse or twisted. If we're still using jaeger these headers would be those
|
||||||
|
here:
|
||||||
|
https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not whitelisted_homeserver(destination):
|
||||||
|
return
|
||||||
|
|
||||||
|
span = opentracing.tracer.active_span
|
||||||
|
carrier = {}
|
||||||
|
opentracing.tracer.inject(span, opentracing.Format.HTTP_HEADERS, carrier)
|
||||||
|
|
||||||
|
for key, value in carrier.items():
|
||||||
|
headers.addRawHeaders(key, value)
|
||||||
|
|
||||||
|
|
||||||
|
@only_if_tracing
|
||||||
|
def inject_active_span_byte_dict(headers, destination):
|
||||||
|
"""
|
||||||
|
Injects a span context into a dict where the headers are encoded as byte
|
||||||
|
strings
|
||||||
|
|
||||||
|
Args:
|
||||||
|
headers (dict)
|
||||||
|
span (opentracing.Span)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Inplace modification of headers
|
||||||
|
|
||||||
|
Note:
|
||||||
|
The headers set by the tracer are custom to the tracer implementation which
|
||||||
|
should be unique enough that they don't interfere with any headers set by
|
||||||
|
synapse or twisted. If we're still using jaeger these headers would be those
|
||||||
|
here:
|
||||||
|
https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
|
||||||
|
"""
|
||||||
|
if not whitelisted_homeserver(destination):
|
||||||
|
return
|
||||||
|
|
||||||
|
span = opentracing.tracer.active_span
|
||||||
|
|
||||||
|
carrier = {}
|
||||||
|
opentracing.tracer.inject(span, opentracing.Format.HTTP_HEADERS, carrier)
|
||||||
|
|
||||||
|
for key, value in carrier.items():
|
||||||
|
headers[key.encode()] = [value.encode()]
|
||||||
|
|
||||||
|
|
||||||
|
def trace_servlet(servlet_name, func):
|
||||||
|
"""Decorator which traces a serlet. It starts a span with some servlet specific
|
||||||
|
tags such as the servlet_name and request information"""
|
||||||
|
|
||||||
|
@wraps(func)
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _trace_servlet_inner(request, *args, **kwargs):
|
||||||
|
with start_active_span_from_context(
|
||||||
|
request.requestHeaders,
|
||||||
|
"incoming-client-request",
|
||||||
|
tags={
|
||||||
|
"request_id": request.get_request_id(),
|
||||||
|
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
|
||||||
|
tags.HTTP_METHOD: request.get_method(),
|
||||||
|
tags.HTTP_URL: request.get_redacted_uri(),
|
||||||
|
tags.PEER_HOST_IPV6: request.getClientIP(),
|
||||||
|
"servlet_name": servlet_name,
|
||||||
|
},
|
||||||
|
):
|
||||||
|
result = yield defer.maybeDeferred(func, request, *args, **kwargs)
|
||||||
|
defer.returnValue(result)
|
||||||
|
|
||||||
|
return _trace_servlet_inner
|
|
@ -0,0 +1,140 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.import logging
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from opentracing import Scope, ScopeManager
|
||||||
|
|
||||||
|
import twisted
|
||||||
|
|
||||||
|
from synapse.logging.context import LoggingContext, nested_logging_context
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class LogContextScopeManager(ScopeManager):
|
||||||
|
"""
|
||||||
|
The LogContextScopeManager tracks the active scope in opentracing
|
||||||
|
by using the log contexts which are native to synapse. This is so
|
||||||
|
that the basic opentracing api can be used across twisted defereds.
|
||||||
|
(I would love to break logcontexts and this into an OS package. but
|
||||||
|
let's wait for twisted's contexts to be released.)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, config):
|
||||||
|
# Set the whitelists
|
||||||
|
logger.info(config.tracer_config)
|
||||||
|
self._homeserver_whitelist = config.tracer_config["homeserver_whitelist"]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def active(self):
|
||||||
|
"""
|
||||||
|
Returns the currently active Scope which can be used to access the
|
||||||
|
currently active Scope.span.
|
||||||
|
If there is a non-null Scope, its wrapped Span
|
||||||
|
becomes an implicit parent of any newly-created Span at
|
||||||
|
Tracer.start_active_span() time.
|
||||||
|
|
||||||
|
Return:
|
||||||
|
(Scope) : the Scope that is active, or None if not
|
||||||
|
available.
|
||||||
|
"""
|
||||||
|
ctx = LoggingContext.current_context()
|
||||||
|
if ctx is LoggingContext.sentinel:
|
||||||
|
return None
|
||||||
|
else:
|
||||||
|
return ctx.scope
|
||||||
|
|
||||||
|
def activate(self, span, finish_on_close):
|
||||||
|
"""
|
||||||
|
Makes a Span active.
|
||||||
|
Args
|
||||||
|
span (Span): the span that should become active.
|
||||||
|
finish_on_close (Boolean): whether Span should be automatically
|
||||||
|
finished when Scope.close() is called.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Scope to control the end of the active period for
|
||||||
|
*span*. It is a programming error to neglect to call
|
||||||
|
Scope.close() on the returned instance.
|
||||||
|
"""
|
||||||
|
|
||||||
|
enter_logcontext = False
|
||||||
|
ctx = LoggingContext.current_context()
|
||||||
|
|
||||||
|
if ctx is LoggingContext.sentinel:
|
||||||
|
# We don't want this scope to affect.
|
||||||
|
logger.error("Tried to activate scope outside of loggingcontext")
|
||||||
|
return Scope(None, span)
|
||||||
|
elif ctx.scope is not None:
|
||||||
|
# We want the logging scope to look exactly the same so we give it
|
||||||
|
# a blank suffix
|
||||||
|
ctx = nested_logging_context("")
|
||||||
|
enter_logcontext = True
|
||||||
|
|
||||||
|
scope = _LogContextScope(self, span, ctx, enter_logcontext, finish_on_close)
|
||||||
|
ctx.scope = scope
|
||||||
|
return scope
|
||||||
|
|
||||||
|
|
||||||
|
class _LogContextScope(Scope):
|
||||||
|
"""
|
||||||
|
A custom opentracing scope. The only significant difference is that it will
|
||||||
|
close the log context it's related to if the logcontext was created specifically
|
||||||
|
for this scope.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, manager, span, logcontext, enter_logcontext, finish_on_close):
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
manager (LogContextScopeManager):
|
||||||
|
the manager that is responsible for this scope.
|
||||||
|
span (Span):
|
||||||
|
the opentracing span which this scope represents the local
|
||||||
|
lifetime for.
|
||||||
|
logcontext (LogContext):
|
||||||
|
the logcontext to which this scope is attached.
|
||||||
|
enter_logcontext (Boolean):
|
||||||
|
if True the logcontext will be entered and exited when the scope
|
||||||
|
is entered and exited respectively
|
||||||
|
finish_on_close (Boolean):
|
||||||
|
if True finish the span when the scope is closed
|
||||||
|
"""
|
||||||
|
super(_LogContextScope, self).__init__(manager, span)
|
||||||
|
self.logcontext = logcontext
|
||||||
|
self._finish_on_close = finish_on_close
|
||||||
|
self._enter_logcontext = enter_logcontext
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
if self._enter_logcontext:
|
||||||
|
self.logcontext.__enter__()
|
||||||
|
|
||||||
|
def __exit__(self, type, value, traceback):
|
||||||
|
if type == twisted.internet.defer._DefGen_Return:
|
||||||
|
super(_LogContextScope, self).__exit__(None, None, None)
|
||||||
|
else:
|
||||||
|
super(_LogContextScope, self).__exit__(type, value, traceback)
|
||||||
|
if self._enter_logcontext:
|
||||||
|
self.logcontext.__exit__(type, value, traceback)
|
||||||
|
else: # the logcontext existed before the creation of the scope
|
||||||
|
self.logcontext.scope = None
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
if self.manager.active is not self:
|
||||||
|
logger.error("Tried to close a none active scope!")
|
||||||
|
return
|
||||||
|
|
||||||
|
if self._finish_on_close:
|
||||||
|
self.span.finish()
|
|
@ -95,6 +95,7 @@ CONDITIONAL_REQUIREMENTS = {
|
||||||
"url_preview": ["lxml>=3.5.0"],
|
"url_preview": ["lxml>=3.5.0"],
|
||||||
"test": ["mock>=2.0", "parameterized"],
|
"test": ["mock>=2.0", "parameterized"],
|
||||||
"sentry": ["sentry-sdk>=0.7.2"],
|
"sentry": ["sentry-sdk>=0.7.2"],
|
||||||
|
"opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"],
|
||||||
"jwt": ["pyjwt>=1.6.4"],
|
"jwt": ["pyjwt>=1.6.4"],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue