Move RequestMetrics handling into SynapseRequest.processing()

It fits quite nicely here, and opens the path to getting rid of the
"include_metrics" mess.
This commit is contained in:
Richard van der Hoff 2018-05-09 23:05:14 +01:00
parent 09b29f9c4a
commit c6f730282c
2 changed files with 64 additions and 24 deletions

View File

@ -19,7 +19,7 @@ from synapse.api.errors import (
cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes
) )
from synapse.http.request_metrics import ( from synapse.http.request_metrics import (
RequestMetrics, requests_counter, requests_counter,
outgoing_responses_counter, outgoing_responses_counter,
) )
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
@ -81,19 +81,18 @@ def wrap_request_handler(request_handler, include_metrics=False):
with LoggingContext(request_id) as request_context: with LoggingContext(request_id) as request_context:
request_context.request = request_id request_context.request = request_id
with Measure(self.clock, "wrapped_request_handler"): with Measure(self.clock, "wrapped_request_handler"):
request_metrics = RequestMetrics()
# we start the request metrics timer here with an initial stab # we start the request metrics timer here with an initial stab
# at the servlet name. For most requests that name will be # at the servlet name. For most requests that name will be
# JsonResource (or a subclass), and JsonResource._async_render # JsonResource (or a subclass), and JsonResource._async_render
# will update it once it picks a servlet. # will update it once it picks a servlet.
servlet_name = self.__class__.__name__ servlet_name = self.__class__.__name__
request_metrics.start(self.clock.time_msec(), name=servlet_name) with request.processing(servlet_name):
with request.processing():
try: try:
with PreserveLoggingContext(request_context): with PreserveLoggingContext(request_context):
if include_metrics: if include_metrics:
yield request_handler(self, request, request_metrics) yield request_handler(
self, request, request.request_metrics,
)
else: else:
requests_counter.inc(request.method, servlet_name) requests_counter.inc(request.method, servlet_name)
yield request_handler(self, request) yield request_handler(self, request)
@ -135,13 +134,7 @@ def wrap_request_handler(request_handler, include_metrics=False):
pretty_print=_request_user_agent_is_curl(request), pretty_print=_request_user_agent_is_curl(request),
version_string=self.version_string, version_string=self.version_string,
) )
finally:
try:
request_metrics.stop(
self.clock.time_msec(), request
)
except Exception as e:
logger.warn("Failed to stop metrics: %r", e)
return wrapped_request_handler return wrapped_request_handler

View File

@ -12,20 +12,38 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from synapse.util.logcontext import LoggingContext
from twisted.web.server import Site, Request
import contextlib import contextlib
import logging import logging
import re import re
import time import time
from twisted.web.server import Site, Request
from synapse.http.request_metrics import RequestMetrics
from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__)
ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$') ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
_next_request_seq = 0 _next_request_seq = 0
class SynapseRequest(Request): class SynapseRequest(Request):
"""Class which encapsulates an HTTP request to synapse.
All of the requests processed in synapse are of this type.
It extends twisted's twisted.web.server.Request, and adds:
* Unique request ID
* Redaction of access_token query-params in __repr__
* Logging at start and end
* Metrics to record CPU, wallclock and DB time by endpoint.
It provides a method `processing` which should be called by the Resource
which is handling the request, and returns a context manager.
"""
def __init__(self, site, *args, **kw): def __init__(self, site, *args, **kw):
Request.__init__(self, *args, **kw) Request.__init__(self, *args, **kw)
self.site = site self.site = site
@ -59,7 +77,11 @@ class SynapseRequest(Request):
def get_user_agent(self): def get_user_agent(self):
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1] return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
def started_processing(self): def _started_processing(self, servlet_name):
self.start_time = int(time.time() * 1000)
self.request_metrics = RequestMetrics()
self.request_metrics.start(self.start_time, name=servlet_name)
self.site.access_logger.info( self.site.access_logger.info(
"%s - %s - Received request: %s %s", "%s - %s - Received request: %s %s",
self.getClientIP(), self.getClientIP(),
@ -67,10 +89,8 @@ class SynapseRequest(Request):
self.method, self.method,
self.get_redacted_uri() self.get_redacted_uri()
) )
self.start_time = int(time.time() * 1000)
def finished_processing(self):
def _finished_processing(self):
try: try:
context = LoggingContext.current_context() context = LoggingContext.current_context()
ru_utime, ru_stime = context.get_resource_usage() ru_utime, ru_stime = context.get_resource_usage()
@ -81,6 +101,8 @@ class SynapseRequest(Request):
ru_utime, ru_stime = (0, 0) ru_utime, ru_stime = (0, 0)
db_txn_count, db_txn_duration_ms = (0, 0) db_txn_count, db_txn_duration_ms = (0, 0)
end_time = int(time.time() * 1000)
self.site.access_logger.info( self.site.access_logger.info(
"%s - %s - {%s}" "%s - %s - {%s}"
" Processed request: %dms (%dms, %dms) (%dms/%dms/%d)" " Processed request: %dms (%dms, %dms) (%dms/%dms/%d)"
@ -88,7 +110,7 @@ class SynapseRequest(Request):
self.getClientIP(), self.getClientIP(),
self.site.site_tag, self.site.site_tag,
self.authenticated_entity, self.authenticated_entity,
int(time.time() * 1000) - self.start_time, end_time - self.start_time,
int(ru_utime * 1000), int(ru_utime * 1000),
int(ru_stime * 1000), int(ru_stime * 1000),
db_sched_duration_ms, db_sched_duration_ms,
@ -102,11 +124,36 @@ class SynapseRequest(Request):
self.get_user_agent(), self.get_user_agent(),
) )
try:
self.request_metrics.stop(end_time, self)
except Exception as e:
logger.warn("Failed to stop metrics: %r", e)
@contextlib.contextmanager @contextlib.contextmanager
def processing(self): def processing(self, servlet_name):
self.started_processing() """Record the fact that we are processing this request.
Returns a context manager; the correct way to use this is:
@defer.inlineCallbacks
def handle_request(request):
with request.processing("FooServlet"):
yield really_handle_the_request()
This will log the request's arrival. Once the context manager is
closed, the completion of the request will be logged, and the various
metrics will be updated.
Args:
servlet_name (str): the name of the servlet which will be
processing this request. This is used in the metrics.
It is possible to update this afterwards by updating
self.request_metrics.servlet_name.
"""
self._started_processing(servlet_name)
yield yield
self.finished_processing() self._finished_processing()
class XForwardedForRequest(SynapseRequest): class XForwardedForRequest(SynapseRequest):