From 0cb169900ebd39b6f46dbff1b1909cc5b3c17493 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 11 Aug 2020 16:08:10 +0100 Subject: [PATCH 1/7] Implement login blocking based on SAML attributes (#8052) Hopefully this mostly speaks for itself. I also did a bit of cleaning up of the error handling. Fixes #8047 --- changelog.d/8052.feature | 1 + docs/sample_config.yaml | 11 ++++++ synapse/config/_util.py | 49 ++++++++++++++++++++++++++ synapse/config/saml2_config.py | 50 +++++++++++++++++++++++++++ synapse/handlers/saml_handler.py | 42 ++++++++++++++++++---- synapse/res/templates/saml_error.html | 17 ++++++--- 6 files changed, 159 insertions(+), 11 deletions(-) create mode 100644 changelog.d/8052.feature create mode 100644 synapse/config/_util.py diff --git a/changelog.d/8052.feature b/changelog.d/8052.feature new file mode 100644 index 0000000000..6aa020c764 --- /dev/null +++ b/changelog.d/8052.feature @@ -0,0 +1 @@ +Allow login to be blocked based on the values of SAML attributes. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index fe85978a1f..9235b89fb1 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -1577,6 +1577,17 @@ saml2_config: # #grandfathered_mxid_source_attribute: upn + # It is possible to configure Synapse to only allow logins if SAML attributes + # match particular values. The requirements can be listed under + # `attribute_requirements` as shown below. All of the listed attributes must + # match for the login to be permitted. + # + #attribute_requirements: + # - attribute: userGroup + # value: "staff" + # - attribute: department + # value: "sales" + # Directory in which Synapse will try to find the template files below. # If not set, default templates from within the Synapse package will be used. # diff --git a/synapse/config/_util.py b/synapse/config/_util.py new file mode 100644 index 0000000000..cd31b1c3c9 --- /dev/null +++ b/synapse/config/_util.py @@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Any, List + +import jsonschema + +from synapse.config._base import ConfigError +from synapse.types import JsonDict + + +def validate_config(json_schema: JsonDict, config: Any, config_path: List[str]) -> None: + """Validates a config setting against a JsonSchema definition + + This can be used to validate a section of the config file against a schema + definition. If the validation fails, a ConfigError is raised with a textual + description of the problem. + + Args: + json_schema: the schema to validate against + config: the configuration value to be validated + config_path: the path within the config file. This will be used as a basis + for the error message. + """ + try: + jsonschema.validate(config, json_schema) + except jsonschema.ValidationError as e: + # copy `config_path` before modifying it. + path = list(config_path) + for p in list(e.path): + if isinstance(p, int): + path.append("" % p) + else: + path.append(str(p)) + + raise ConfigError( + "Unable to parse configuration: %s at %s" % (e.message, ".".join(path)) + ) diff --git a/synapse/config/saml2_config.py b/synapse/config/saml2_config.py index 293643b2de..9277b5f342 100644 --- a/synapse/config/saml2_config.py +++ b/synapse/config/saml2_config.py @@ -15,7 +15,9 @@ # limitations under the License. import logging +from typing import Any, List +import attr import jinja2 import pkg_resources @@ -23,6 +25,7 @@ from synapse.python_dependencies import DependencyException, check_requirements from synapse.util.module_loader import load_module, load_python_module from ._base import Config, ConfigError +from ._util import validate_config logger = logging.getLogger(__name__) @@ -80,6 +83,11 @@ class SAML2Config(Config): self.saml2_enabled = True + attribute_requirements = saml2_config.get("attribute_requirements") or [] + self.attribute_requirements = _parse_attribute_requirements_def( + attribute_requirements + ) + self.saml2_grandfathered_mxid_source_attribute = saml2_config.get( "grandfathered_mxid_source_attribute", "uid" ) @@ -341,6 +349,17 @@ class SAML2Config(Config): # #grandfathered_mxid_source_attribute: upn + # It is possible to configure Synapse to only allow logins if SAML attributes + # match particular values. The requirements can be listed under + # `attribute_requirements` as shown below. All of the listed attributes must + # match for the login to be permitted. + # + #attribute_requirements: + # - attribute: userGroup + # value: "staff" + # - attribute: department + # value: "sales" + # Directory in which Synapse will try to find the template files below. # If not set, default templates from within the Synapse package will be used. # @@ -368,3 +387,34 @@ class SAML2Config(Config): """ % { "config_dir_path": config_dir_path } + + +@attr.s(frozen=True) +class SamlAttributeRequirement: + """Object describing a single requirement for SAML attributes.""" + + attribute = attr.ib(type=str) + value = attr.ib(type=str) + + JSON_SCHEMA = { + "type": "object", + "properties": {"attribute": {"type": "string"}, "value": {"type": "string"}}, + "required": ["attribute", "value"], + } + + +ATTRIBUTE_REQUIREMENTS_SCHEMA = { + "type": "array", + "items": SamlAttributeRequirement.JSON_SCHEMA, +} + + +def _parse_attribute_requirements_def( + attribute_requirements: Any, +) -> List[SamlAttributeRequirement]: + validate_config( + ATTRIBUTE_REQUIREMENTS_SCHEMA, + attribute_requirements, + config_path=["saml2_config", "attribute_requirements"], + ) + return [SamlAttributeRequirement(**x) for x in attribute_requirements] diff --git a/synapse/handlers/saml_handler.py b/synapse/handlers/saml_handler.py index 2d506dc1f2..c1fcb98454 100644 --- a/synapse/handlers/saml_handler.py +++ b/synapse/handlers/saml_handler.py @@ -14,15 +14,16 @@ # limitations under the License. import logging import re -from typing import Callable, Dict, Optional, Set, Tuple +from typing import TYPE_CHECKING, Callable, Dict, Optional, Set, Tuple import attr import saml2 import saml2.response from saml2.client import Saml2Client -from synapse.api.errors import SynapseError +from synapse.api.errors import AuthError, SynapseError from synapse.config import ConfigError +from synapse.config.saml2_config import SamlAttributeRequirement from synapse.http.servlet import parse_string from synapse.http.site import SynapseRequest from synapse.module_api import ModuleApi @@ -34,6 +35,9 @@ from synapse.types import ( from synapse.util.async_helpers import Linearizer from synapse.util.iterutils import chunk_seq +if TYPE_CHECKING: + import synapse.server + logger = logging.getLogger(__name__) @@ -49,7 +53,7 @@ class Saml2SessionData: class SamlHandler: - def __init__(self, hs): + def __init__(self, hs: "synapse.server.HomeServer"): self._saml_client = Saml2Client(hs.config.saml2_sp_config) self._auth = hs.get_auth() self._auth_handler = hs.get_auth_handler() @@ -62,6 +66,7 @@ class SamlHandler: self._grandfathered_mxid_source_attribute = ( hs.config.saml2_grandfathered_mxid_source_attribute ) + self._saml2_attribute_requirements = hs.config.saml2.attribute_requirements # plugin to do custom mapping from saml response to mxid self._user_mapping_provider = hs.config.saml2_user_mapping_provider_class( @@ -73,7 +78,7 @@ class SamlHandler: self._auth_provider_id = "saml" # a map from saml session id to Saml2SessionData object - self._outstanding_requests_dict = {} + self._outstanding_requests_dict = {} # type: Dict[str, Saml2SessionData] # a lock on the mappings self._mapping_lock = Linearizer(name="saml_mapping", clock=self._clock) @@ -165,11 +170,18 @@ class SamlHandler: saml2.BINDING_HTTP_POST, outstanding=self._outstanding_requests_dict, ) + except saml2.response.UnsolicitedResponse as e: + # the pysaml2 library helpfully logs an ERROR here, but neglects to log + # the session ID. I don't really want to put the full text of the exception + # in the (user-visible) exception message, so let's log the exception here + # so we can track down the session IDs later. + logger.warning(str(e)) + raise SynapseError(400, "Unexpected SAML2 login.") except Exception as e: - raise SynapseError(400, "Unable to parse SAML2 response: %s" % (e,)) + raise SynapseError(400, "Unable to parse SAML2 response: %s." % (e,)) if saml2_auth.not_signed: - raise SynapseError(400, "SAML2 response was not signed") + raise SynapseError(400, "SAML2 response was not signed.") logger.debug("SAML2 response: %s", saml2_auth.origxml) for assertion in saml2_auth.assertions: @@ -188,6 +200,9 @@ class SamlHandler: saml2_auth.in_response_to, None ) + for requirement in self._saml2_attribute_requirements: + _check_attribute_requirement(saml2_auth.ava, requirement) + remote_user_id = self._user_mapping_provider.get_remote_user_id( saml2_auth, client_redirect_url ) @@ -294,6 +309,21 @@ class SamlHandler: del self._outstanding_requests_dict[reqid] +def _check_attribute_requirement(ava: dict, req: SamlAttributeRequirement): + values = ava.get(req.attribute, []) + for v in values: + if v == req.value: + return + + logger.info( + "SAML2 attribute %s did not match required value '%s' (was '%s')", + req.attribute, + req.value, + values, + ) + raise AuthError(403, "You are not authorized to log in here.") + + DOT_REPLACE_PATTERN = re.compile( ("[^%s]" % (re.escape("".join(mxid_localpart_allowed_characters)),)) ) diff --git a/synapse/res/templates/saml_error.html b/synapse/res/templates/saml_error.html index bfd6449c5d..01cd9bdaf3 100644 --- a/synapse/res/templates/saml_error.html +++ b/synapse/res/templates/saml_error.html @@ -2,10 +2,17 @@ - SSO error + SSO login error -

Oops! Something went wrong during authentication.

+{# a 403 means we have actively rejected their login #} +{% if code == 403 %} +

You are not allowed to log in here.

+{% else %} +

+ There was an error during authentication: +

+
{{ msg }}

If you are seeing this page after clicking a link sent to you via email, make sure you only click the confirmation link once, and that you open the @@ -37,9 +44,9 @@ // to print one. let errorDesc = new URLSearchParams(searchStr).get("error_description") if (errorDesc) { - - document.getElementById("errormsg").innerText = ` ("${errorDesc}")`; + document.getElementById("errormsg").innerText = errorDesc; } +{% endif %} - \ No newline at end of file + From db131b6b22350ea73128ac69ea82a48a6cf78a20 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Aug 2020 18:09:46 +0100 Subject: [PATCH 2/7] Change the default log config to reduce disk I/O and storage (#8040) * Change default log config to buffer by default. This batches up writes to the filesystem, which is more efficient for disk I/O. This means that it can take some time for logs to get written to disk. Note that ERROR logs (and above) immediately flush the buffer. This only effects new installs, as we only write the log config if started with `--generate-config` (in the same way we do for generating signing keys). * Default to keeping last 4 days of logs. This hopefully reduces the amount of logs kept for new servers. Keeping the last 1GB of logs is likely overkill for new servers, but equally may not be enough for busy ones. Instead, we keep the last four days worth of logs, enough so that admins can investigate any problems that happened over e.g. a long weekend. --- changelog.d/8040.misc | 1 + docs/sample_log_config.yaml | 41 ++++++++++++++++++++++++++++++++----- synapse/config/logger.py | 41 ++++++++++++++++++++++++++++++++----- 3 files changed, 73 insertions(+), 10 deletions(-) create mode 100644 changelog.d/8040.misc diff --git a/changelog.d/8040.misc b/changelog.d/8040.misc new file mode 100644 index 0000000000..a126151392 --- /dev/null +++ b/changelog.d/8040.misc @@ -0,0 +1 @@ +Change the default log config to reduce disk I/O and storage for new servers. diff --git a/docs/sample_log_config.yaml b/docs/sample_log_config.yaml index 1a2739455e..403ac005ee 100644 --- a/docs/sample_log_config.yaml +++ b/docs/sample_log_config.yaml @@ -18,13 +18,29 @@ filters: handlers: file: - class: logging.handlers.RotatingFileHandler + class: logging.handlers.TimedRotatingFileHandler formatter: precise filename: /var/log/matrix-synapse/homeserver.log - maxBytes: 104857600 - backupCount: 10 - filters: [context] + when: midnight + backupCount: 3 # Does not include the current log file. encoding: utf8 + + # Default to buffering writes to log file for efficiency. This means that + # will be a delay for INFO/DEBUG logs to get written, but WARNING/ERROR + # logs will still be flushed immediately. + buffer: + class: logging.handlers.MemoryHandler + filters: [context] + target: file + # The capacity is the number of log lines that are buffered before + # being written to disk. Increasing this will lead to better + # performance, at the expensive of it taking longer for log lines to + # be written to disk. + capacity: 10 + flushLevel: 30 # Flush for WARNING logs as well + + # A handler that writes logs to stderr. Unused by default, but can be used + # instead of "buffer" and "file" in the logger handlers. console: class: logging.StreamHandler formatter: precise @@ -36,8 +52,23 @@ loggers: # information such as access tokens. level: INFO + twisted: + # We send the twisted logging directly to the file handler, + # to work around https://github.com/matrix-org/synapse/issues/3471 + # when using "buffer" logger. Use "console" to log to stderr instead. + handlers: [file] + propagate: false + root: level: INFO - handlers: [file, console] + + # Write logs to the `buffer` handler, which will buffer them together in memory, + # then write them to a file. + # + # Replace "buffer" with "console" to log to stderr instead. (Note that you'll + # also need to update the configuation for the `twisted` logger above, in + # this case.) + # + handlers: [buffer] disable_existing_loggers: false diff --git a/synapse/config/logger.py b/synapse/config/logger.py index dd775a97e8..493e98462d 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -62,13 +62,29 @@ filters: handlers: file: - class: logging.handlers.RotatingFileHandler + class: logging.handlers.TimedRotatingFileHandler formatter: precise filename: ${log_file} - maxBytes: 104857600 - backupCount: 10 - filters: [context] + when: midnight + backupCount: 3 # Does not include the current log file. encoding: utf8 + + # Default to buffering writes to log file for efficiency. This means that + # will be a delay for INFO/DEBUG logs to get written, but WARNING/ERROR + # logs will still be flushed immediately. + buffer: + class: logging.handlers.MemoryHandler + filters: [context] + target: file + # The capacity is the number of log lines that are buffered before + # being written to disk. Increasing this will lead to better + # performance, at the expensive of it taking longer for log lines to + # be written to disk. + capacity: 10 + flushLevel: 30 # Flush for WARNING logs as well + + # A handler that writes logs to stderr. Unused by default, but can be used + # instead of "buffer" and "file" in the logger handlers. console: class: logging.StreamHandler formatter: precise @@ -80,9 +96,24 @@ loggers: # information such as access tokens. level: INFO + twisted: + # We send the twisted logging directly to the file handler, + # to work around https://github.com/matrix-org/synapse/issues/3471 + # when using "buffer" logger. Use "console" to log to stderr instead. + handlers: [file] + propagate: false + root: level: INFO - handlers: [file, console] + + # Write logs to the `buffer` handler, which will buffer them together in memory, + # then write them to a file. + # + # Replace "buffer" with "console" to log to stderr instead. (Note that you'll + # also need to update the configuation for the `twisted` logger above, in + # this case.) + # + handlers: [buffer] disable_existing_loggers: false """ From a0f574f3c291374c11b7b1cac8fc7c4fbb6ef7b2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Aug 2020 18:10:07 +0100 Subject: [PATCH 3/7] Reduce INFO logging (#8050) c.f. #8021 A lot of the code here is to change the `Completed 200 OK` logging to include the request URI so that we can drop the `Sending request...` log line. Some notes: 1. We won't log retries, which may be confusing considering the time taken log line includes retries and sleeps. 2. The `_send_request_with_optional_trailing_slash` will always be logged *without* the forward slash, even if it succeeded only with the forward slash. --- changelog.d/8050.misc | 1 + synapse/http/client.py | 2 +- .../federation/matrix_federation_agent.py | 2 +- synapse/http/matrixfederationclient.py | 94 ++++++++++++++----- 4 files changed, 74 insertions(+), 25 deletions(-) create mode 100644 changelog.d/8050.misc diff --git a/changelog.d/8050.misc b/changelog.d/8050.misc new file mode 100644 index 0000000000..cc8d1af7fa --- /dev/null +++ b/changelog.d/8050.misc @@ -0,0 +1 @@ +Reduce amount of outbound request logging at INFO level. diff --git a/synapse/http/client.py b/synapse/http/client.py index 529532a063..8aeb70cdec 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -297,7 +297,7 @@ class SimpleHttpClient(object): outgoing_requests_counter.labels(method).inc() # log request but strip `access_token` (AS requests for example include this) - logger.info("Sending request %s %s", method, redact_uri(uri)) + logger.debug("Sending request %s %s", method, redact_uri(uri)) with start_active_span( "outgoing-client-request", diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 0c02648015..369bf9c2fc 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -247,7 +247,7 @@ class MatrixHostnameEndpoint(object): port = server.port try: - logger.info("Connecting to %s:%i", host.decode("ascii"), port) + logger.debug("Connecting to %s:%i", host.decode("ascii"), port) endpoint = HostnameEndpoint(self._reactor, host, port) if self._tls_options: endpoint = wrapClientTLS(self._tls_options, endpoint) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 2a6373937a..738be43f46 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -29,10 +29,11 @@ from zope.interface import implementer from twisted.internet import defer, protocol from twisted.internet.error import DNSLookupError -from twisted.internet.interfaces import IReactorPluggableNameResolver +from twisted.internet.interfaces import IReactorPluggableNameResolver, IReactorTime from twisted.internet.task import _EPSILON, Cooperator from twisted.web._newclient import ResponseDone from twisted.web.http_headers import Headers +from twisted.web.iweb import IResponse import synapse.metrics import synapse.util.retryutils @@ -74,7 +75,7 @@ MAXINT = sys.maxsize _next_id = 1 -@attr.s +@attr.s(frozen=True) class MatrixFederationRequest(object): method = attr.ib() """HTTP method @@ -110,26 +111,52 @@ class MatrixFederationRequest(object): :type: str|None """ + uri = attr.ib(init=False, type=bytes) + """The URI of this request + """ + def __attrs_post_init__(self): global _next_id - self.txn_id = "%s-O-%s" % (self.method, _next_id) + txn_id = "%s-O-%s" % (self.method, _next_id) _next_id = (_next_id + 1) % (MAXINT - 1) + object.__setattr__(self, "txn_id", txn_id) + + destination_bytes = self.destination.encode("ascii") + path_bytes = self.path.encode("ascii") + if self.query: + query_bytes = encode_query_args(self.query) + else: + query_bytes = b"" + + # The object is frozen so we can pre-compute this. + uri = urllib.parse.urlunparse( + (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"") + ) + object.__setattr__(self, "uri", uri) + def get_json(self): if self.json_callback: return self.json_callback() return self.json -async def _handle_json_response(reactor, timeout_sec, request, response): +async def _handle_json_response( + reactor: IReactorTime, + timeout_sec: float, + request: MatrixFederationRequest, + response: IResponse, + start_ms: int, +): """ Reads the JSON body of a response, with a timeout Args: - reactor (IReactor): twisted reactor, for the timeout - timeout_sec (float): number of seconds to wait for response to complete - request (MatrixFederationRequest): the request that triggered the response - response (IResponse): response to the request + reactor: twisted reactor, for the timeout + timeout_sec: number of seconds to wait for response to complete + request: the request that triggered the response + response: response to the request + start_ms: Timestamp when request was made Returns: dict: parsed JSON response @@ -143,23 +170,35 @@ async def _handle_json_response(reactor, timeout_sec, request, response): body = await make_deferred_yieldable(d) except TimeoutError as e: logger.warning( - "{%s} [%s] Timed out reading response", request.txn_id, request.destination, + "{%s} [%s] Timed out reading response - %s %s", + request.txn_id, + request.destination, + request.method, + request.uri.decode("ascii"), ) raise RequestSendFailed(e, can_retry=True) from e except Exception as e: logger.warning( - "{%s} [%s] Error reading response: %s", + "{%s} [%s] Error reading response %s %s: %s", request.txn_id, request.destination, + request.method, + request.uri.decode("ascii"), e, ) raise + + time_taken_secs = reactor.seconds() - start_ms / 1000 + logger.info( - "{%s} [%s] Completed: %d %s", + "{%s} [%s] Completed request: %d %s in %.2f secs - %s %s", request.txn_id, request.destination, response.code, response.phrase.decode("ascii", errors="replace"), + time_taken_secs, + request.method, + request.uri.decode("ascii"), ) return body @@ -261,7 +300,9 @@ class MatrixFederationHttpClient(object): # 'M_UNRECOGNIZED' which some endpoints can return when omitting a # trailing slash on Synapse <= v0.99.3. logger.info("Retrying request with trailing slash") - request.path += "/" + + # Request is frozen so we create a new instance + request = attr.evolve(request, path=request.path + "/") response = await self._send_request(request, **send_request_args) @@ -373,9 +414,7 @@ class MatrixFederationHttpClient(object): else: retries_left = MAX_SHORT_RETRIES - url_bytes = urllib.parse.urlunparse( - (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"") - ) + url_bytes = request.uri url_str = url_bytes.decode("ascii") url_to_sign_bytes = urllib.parse.urlunparse( @@ -402,7 +441,7 @@ class MatrixFederationHttpClient(object): headers_dict[b"Authorization"] = auth_headers - logger.info( + logger.debug( "{%s} [%s] Sending request: %s %s; timeout %fs", request.txn_id, request.destination, @@ -436,7 +475,6 @@ class MatrixFederationHttpClient(object): except DNSLookupError as e: raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e except Exception as e: - logger.info("Failed to send request: %s", e) raise RequestSendFailed(e, can_retry=True) from e incoming_responses_counter.labels( @@ -496,7 +534,7 @@ class MatrixFederationHttpClient(object): break except RequestSendFailed as e: - logger.warning( + logger.info( "{%s} [%s] Request failed: %s %s: %s", request.txn_id, request.destination, @@ -654,6 +692,8 @@ class MatrixFederationHttpClient(object): json=data, ) + start_ms = self.clock.time_msec() + response = await self._send_request_with_optional_trailing_slash( request, try_trailing_slash_on_400, @@ -664,7 +704,7 @@ class MatrixFederationHttpClient(object): ) body = await _handle_json_response( - self.reactor, self.default_timeout, request, response + self.reactor, self.default_timeout, request, response, start_ms ) return body @@ -720,6 +760,8 @@ class MatrixFederationHttpClient(object): method="POST", destination=destination, path=path, query=args, json=data ) + start_ms = self.clock.time_msec() + response = await self._send_request( request, long_retries=long_retries, @@ -733,7 +775,7 @@ class MatrixFederationHttpClient(object): _sec_timeout = self.default_timeout body = await _handle_json_response( - self.reactor, _sec_timeout, request, response + self.reactor, _sec_timeout, request, response, start_ms, ) return body @@ -786,6 +828,8 @@ class MatrixFederationHttpClient(object): method="GET", destination=destination, path=path, query=args ) + start_ms = self.clock.time_msec() + response = await self._send_request_with_optional_trailing_slash( request, try_trailing_slash_on_400, @@ -796,7 +840,7 @@ class MatrixFederationHttpClient(object): ) body = await _handle_json_response( - self.reactor, self.default_timeout, request, response + self.reactor, self.default_timeout, request, response, start_ms ) return body @@ -846,6 +890,8 @@ class MatrixFederationHttpClient(object): method="DELETE", destination=destination, path=path, query=args ) + start_ms = self.clock.time_msec() + response = await self._send_request( request, long_retries=long_retries, @@ -854,7 +900,7 @@ class MatrixFederationHttpClient(object): ) body = await _handle_json_response( - self.reactor, self.default_timeout, request, response + self.reactor, self.default_timeout, request, response, start_ms ) return body @@ -914,12 +960,14 @@ class MatrixFederationHttpClient(object): ) raise logger.info( - "{%s} [%s] Completed: %d %s [%d bytes]", + "{%s} [%s] Completed: %d %s [%d bytes] %s %s", request.txn_id, request.destination, response.code, response.phrase.decode("ascii", errors="replace"), length, + request.method, + request.uri.decode("ascii"), ) return (length, headers) From 0304ad0c3d79e44e78f9658e71f1e1533e3aa4e2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Aug 2020 11:39:29 +0100 Subject: [PATCH 4/7] Move setting of Filter into code. We do this to prevent foot guns. The default config uses a MemoryFilter, but users are free to change to logging to files directly. If they do then they have to ensure to set the `filters: [context]` on the right handler, otherwise records get written with the wrong context. Instead we move the logic to happen when we generate a record, which is when we *log* rather than *handle*. (It's possible to add filters to loggers in the config, however they don't apply to descendant loggers and so they have to be manually set on *every* logger used in the code base) --- docker/conf/log.config | 6 ------ docs/sample_log_config.yaml | 7 ------- synapse/config/logger.py | 24 ++++++++++++++++-------- 3 files changed, 16 insertions(+), 21 deletions(-) diff --git a/docker/conf/log.config b/docker/conf/log.config index ed418a57cd..491bbcc87a 100644 --- a/docker/conf/log.config +++ b/docker/conf/log.config @@ -4,16 +4,10 @@ formatters: precise: format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s' -filters: - context: - (): synapse.logging.context.LoggingContextFilter - request: "" - handlers: console: class: logging.StreamHandler formatter: precise - filters: [context] loggers: synapse.storage.SQL: diff --git a/docs/sample_log_config.yaml b/docs/sample_log_config.yaml index 403ac005ee..55a48a9ed6 100644 --- a/docs/sample_log_config.yaml +++ b/docs/sample_log_config.yaml @@ -11,11 +11,6 @@ formatters: precise: format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s' -filters: - context: - (): synapse.logging.context.LoggingContextFilter - request: "" - handlers: file: class: logging.handlers.TimedRotatingFileHandler @@ -30,7 +25,6 @@ handlers: # logs will still be flushed immediately. buffer: class: logging.handlers.MemoryHandler - filters: [context] target: file # The capacity is the number of log lines that are buffered before # being written to disk. Increasing this will lead to better @@ -44,7 +38,6 @@ handlers: console: class: logging.StreamHandler formatter: precise - filters: [context] loggers: synapse.storage.SQL: diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 493e98462d..c96e6ef62a 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -55,11 +55,6 @@ formatters: format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - \ %(request)s - %(message)s' -filters: - context: - (): synapse.logging.context.LoggingContextFilter - request: "" - handlers: file: class: logging.handlers.TimedRotatingFileHandler @@ -74,7 +69,6 @@ handlers: # logs will still be flushed immediately. buffer: class: logging.handlers.MemoryHandler - filters: [context] target: file # The capacity is the number of log lines that are buffered before # being written to disk. Increasing this will lead to better @@ -88,7 +82,6 @@ handlers: console: class: logging.StreamHandler formatter: precise - filters: [context] loggers: synapse.storage.SQL: @@ -199,11 +192,26 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner): handler = logging.StreamHandler() handler.setFormatter(formatter) - handler.addFilter(LoggingContextFilter(request="")) logger.addHandler(handler) else: logging.config.dictConfig(log_config) + # We add a log record factory that runs all messages through the + # LoggingContextFilter so that we get the context *at the time we log* + # rather than when we write to a handler. This can be done in config using + # filter options, but care must when using e.g. MemoryHandler to buffer + # writes. + + log_filter = LoggingContextFilter(request="") + old_factory = logging.getLogRecordFactory() + + def factory(*args, **kwargs): + record = old_factory(*args, **kwargs) + log_filter.filter(record) + return record + + logging.setLogRecordFactory(factory) + # Route Twisted's native logging through to the standard library logging # system. observer = STDLibLogObserver() From e1af09dccbb9d3a87728b8acb3b7425bba35a1fa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Aug 2020 15:18:51 +0100 Subject: [PATCH 5/7] Newsfile --- changelog.d/8051.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8051.misc diff --git a/changelog.d/8051.misc b/changelog.d/8051.misc new file mode 100644 index 0000000000..7657b503c9 --- /dev/null +++ b/changelog.d/8051.misc @@ -0,0 +1 @@ +Remove the need to set `filters` in the logging config. This change is backwards compatible, as setting the filters explicitly still works. From a1e9bb9eae7f90e93e0ba02e84e8216e3c2f447a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Aug 2020 19:40:02 +0100 Subject: [PATCH 6/7] Add typing info to Notifier (#8058) --- changelog.d/8058.misc | 1 + synapse/handlers/events.py | 4 -- synapse/notifier.py | 131 +++++++++++++++++++++++-------------- synapse/server.pyi | 6 ++ tox.ini | 1 + 5 files changed, 91 insertions(+), 52 deletions(-) create mode 100644 changelog.d/8058.misc diff --git a/changelog.d/8058.misc b/changelog.d/8058.misc new file mode 100644 index 0000000000..41a27e5d72 --- /dev/null +++ b/changelog.d/8058.misc @@ -0,0 +1 @@ +Add type hints to `Notifier`. diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 71a89f09c7..1924636c4d 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -57,13 +57,10 @@ class EventStreamHandler(BaseHandler): timeout=0, as_client_event=True, affect_presence=True, - only_keys=None, room_id=None, is_guest=False, ): """Fetches the events stream for a given user. - - If `only_keys` is not None, events from keys will be sent down. """ if room_id: @@ -93,7 +90,6 @@ class EventStreamHandler(BaseHandler): auth_user, pagin_config, timeout, - only_keys=only_keys, is_guest=is_guest, explicit_room_id=room_id, ) diff --git a/synapse/notifier.py b/synapse/notifier.py index 22ab4a9da5..694efe7116 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -15,7 +15,17 @@ import logging from collections import namedtuple -from typing import Callable, Iterable, List, TypeVar +from typing import ( + Awaitable, + Callable, + Dict, + Iterable, + List, + Optional, + Set, + Tuple, + TypeVar, +) from prometheus_client import Counter @@ -24,12 +34,14 @@ from twisted.internet import defer import synapse.server from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError +from synapse.events import EventBase from synapse.handlers.presence import format_user_presence_state from synapse.logging.context import PreserveLoggingContext from synapse.logging.utils import log_function from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import StreamToken +from synapse.streams.config import PaginationConfig +from synapse.types import Collection, StreamToken, UserID from synapse.util.async_helpers import ObservableDeferred, timeout_deferred from synapse.util.metrics import Measure from synapse.visibility import filter_events_for_client @@ -77,7 +89,13 @@ class _NotifierUserStream(object): so that it can remove itself from the indexes in the Notifier class. """ - def __init__(self, user_id, rooms, current_token, time_now_ms): + def __init__( + self, + user_id: str, + rooms: Collection[str], + current_token: StreamToken, + time_now_ms: int, + ): self.user_id = user_id self.rooms = set(rooms) self.current_token = current_token @@ -93,13 +111,13 @@ class _NotifierUserStream(object): with PreserveLoggingContext(): self.notify_deferred = ObservableDeferred(defer.Deferred()) - def notify(self, stream_key, stream_id, time_now_ms): + def notify(self, stream_key: str, stream_id: int, time_now_ms: int): """Notify any listeners for this user of a new event from an event source. Args: - stream_key(str): The stream the event came from. - stream_id(str): The new id for the stream the event came from. - time_now_ms(int): The current time in milliseconds. + stream_key: The stream the event came from. + stream_id: The new id for the stream the event came from. + time_now_ms: The current time in milliseconds. """ self.current_token = self.current_token.copy_and_advance(stream_key, stream_id) self.last_notified_token = self.current_token @@ -112,7 +130,7 @@ class _NotifierUserStream(object): self.notify_deferred = ObservableDeferred(defer.Deferred()) noify_deferred.callback(self.current_token) - def remove(self, notifier): + def remove(self, notifier: "Notifier"): """ Remove this listener from all the indexes in the Notifier it knows about. """ @@ -123,10 +141,10 @@ class _NotifierUserStream(object): notifier.user_to_user_stream.pop(self.user_id) - def count_listeners(self): + def count_listeners(self) -> int: return len(self.notify_deferred.observers()) - def new_listener(self, token): + def new_listener(self, token: StreamToken) -> _NotificationListener: """Returns a deferred that is resolved when there is a new token greater than the given token. @@ -159,14 +177,16 @@ class Notifier(object): UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000 def __init__(self, hs: "synapse.server.HomeServer"): - self.user_to_user_stream = {} - self.room_to_user_streams = {} + self.user_to_user_stream = {} # type: Dict[str, _NotifierUserStream] + self.room_to_user_streams = {} # type: Dict[str, Set[_NotifierUserStream]] self.hs = hs self.storage = hs.get_storage() self.event_sources = hs.get_event_sources() self.store = hs.get_datastore() - self.pending_new_room_events = [] + self.pending_new_room_events = ( + [] + ) # type: List[Tuple[int, EventBase, Collection[str]]] # Called when there are new things to stream over replication self.replication_callbacks = [] # type: List[Callable[[], None]] @@ -178,10 +198,9 @@ class Notifier(object): self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() + self.federation_sender = None if hs.should_send_federation(): self.federation_sender = hs.get_federation_sender() - else: - self.federation_sender = None self.state_handler = hs.get_state_handler() @@ -193,12 +212,12 @@ class Notifier(object): # when rendering the metrics page, which is likely once per minute at # most when scraping it. def count_listeners(): - all_user_streams = set() + all_user_streams = set() # type: Set[_NotifierUserStream] - for x in list(self.room_to_user_streams.values()): - all_user_streams |= x - for x in list(self.user_to_user_stream.values()): - all_user_streams.add(x) + for streams in list(self.room_to_user_streams.values()): + all_user_streams |= streams + for stream in list(self.user_to_user_stream.values()): + all_user_streams.add(stream) return sum(stream.count_listeners() for stream in all_user_streams) @@ -223,7 +242,11 @@ class Notifier(object): self.replication_callbacks.append(cb) def on_new_room_event( - self, event, room_stream_id, max_room_stream_id, extra_users=[] + self, + event: EventBase, + room_stream_id: int, + max_room_stream_id: int, + extra_users: Collection[str] = [], ): """ Used by handlers to inform the notifier something has happened in the room, room event wise. @@ -241,11 +264,11 @@ class Notifier(object): self.notify_replication() - def _notify_pending_new_room_events(self, max_room_stream_id): + def _notify_pending_new_room_events(self, max_room_stream_id: int): """Notify for the room events that were queued waiting for a previous event to be persisted. Args: - max_room_stream_id(int): The highest stream_id below which all + max_room_stream_id: The highest stream_id below which all events have been persisted. """ pending = self.pending_new_room_events @@ -258,7 +281,9 @@ class Notifier(object): else: self._on_new_room_event(event, room_stream_id, extra_users) - def _on_new_room_event(self, event, room_stream_id, extra_users=[]): + def _on_new_room_event( + self, event: EventBase, room_stream_id: int, extra_users: Collection[str] = [] + ): """Notify any user streams that are interested in this room event""" # poke any interested application service. run_as_background_process( @@ -275,13 +300,19 @@ class Notifier(object): "room_key", room_stream_id, users=extra_users, rooms=[event.room_id] ) - async def _notify_app_services(self, room_stream_id): + async def _notify_app_services(self, room_stream_id: int): try: await self.appservice_handler.notify_interested_services(room_stream_id) except Exception: logger.exception("Error notifying application services of event") - def on_new_event(self, stream_key, new_token, users=[], rooms=[]): + def on_new_event( + self, + stream_key: str, + new_token: int, + users: Collection[str] = [], + rooms: Collection[str] = [], + ): """ Used to inform listeners that something has happened event wise. Will wake up all listeners for the given users and rooms. @@ -307,14 +338,19 @@ class Notifier(object): self.notify_replication() - def on_new_replication_data(self): + def on_new_replication_data(self) -> None: """Used to inform replication listeners that something has happend without waking up any of the normal user event streams""" self.notify_replication() async def wait_for_events( - self, user_id, timeout, callback, room_ids=None, from_token=StreamToken.START - ): + self, + user_id: str, + timeout: int, + callback: Callable[[StreamToken, StreamToken], Awaitable[T]], + room_ids=None, + from_token=StreamToken.START, + ) -> T: """Wait until the callback returns a non empty response or the timeout fires. """ @@ -377,19 +413,16 @@ class Notifier(object): async def get_events_for( self, - user, - pagination_config, - timeout, - only_keys=None, - is_guest=False, - explicit_room_id=None, - ): + user: UserID, + pagination_config: PaginationConfig, + timeout: int, + is_guest: bool = False, + explicit_room_id: str = None, + ) -> EventStreamResult: """ For the given user and rooms, return any new events for them. If there are no new events wait for up to `timeout` milliseconds for any new events to happen before returning. - If `only_keys` is not None, events from keys will be sent down. - If explicit_room_id is not set, the user's joined rooms will be polled for events. If explicit_room_id is set, that room will be polled for events only if @@ -404,11 +437,13 @@ class Notifier(object): room_ids, is_joined = await self._get_room_ids(user, explicit_room_id) is_peeking = not is_joined - async def check_for_updates(before_token, after_token): + async def check_for_updates( + before_token: StreamToken, after_token: StreamToken + ) -> EventStreamResult: if not after_token.is_after(before_token): return EventStreamResult([], (from_token, from_token)) - events = [] + events = [] # type: List[EventBase] end_token = from_token for name, source in self.event_sources.sources.items(): @@ -417,8 +452,6 @@ class Notifier(object): after_id = getattr(after_token, keyname) if before_id == after_id: continue - if only_keys and name not in only_keys: - continue new_events, new_key = await source.get_new_events( user=user, @@ -476,7 +509,9 @@ class Notifier(object): return result - async def _get_room_ids(self, user, explicit_room_id): + async def _get_room_ids( + self, user: UserID, explicit_room_id: Optional[str] + ) -> Tuple[Collection[str], bool]: joined_room_ids = await self.store.get_rooms_for_user(user.to_string()) if explicit_room_id: if explicit_room_id in joined_room_ids: @@ -486,7 +521,7 @@ class Notifier(object): raise AuthError(403, "Non-joined access not allowed") return joined_room_ids, True - async def _is_world_readable(self, room_id): + async def _is_world_readable(self, room_id: str) -> bool: state = await self.state_handler.get_current_state( room_id, EventTypes.RoomHistoryVisibility, "" ) @@ -496,7 +531,7 @@ class Notifier(object): return False @log_function - def remove_expired_streams(self): + def remove_expired_streams(self) -> None: time_now_ms = self.clock.time_msec() expired_streams = [] expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS @@ -510,21 +545,21 @@ class Notifier(object): expired_stream.remove(self) @log_function - def _register_with_keys(self, user_stream): + def _register_with_keys(self, user_stream: _NotifierUserStream): self.user_to_user_stream[user_stream.user_id] = user_stream for room in user_stream.rooms: s = self.room_to_user_streams.setdefault(room, set()) s.add(user_stream) - def _user_joined_room(self, user_id, room_id): + def _user_joined_room(self, user_id: str, room_id: str): new_user_stream = self.user_to_user_stream.get(user_id) if new_user_stream is not None: room_streams = self.room_to_user_streams.setdefault(room_id, set()) room_streams.add(new_user_stream) new_user_stream.rooms.add(room_id) - def notify_replication(self): + def notify_replication(self) -> None: """Notify the any replication listeners that there's a new event""" for cb in self.replication_callbacks: cb() diff --git a/synapse/server.pyi b/synapse/server.pyi index 1aba408c21..6a9bbb4713 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -31,8 +31,10 @@ import synapse.server_notices.server_notices_sender import synapse.state import synapse.storage from synapse.events.builder import EventBuilderFactory +from synapse.handlers.appservice import ApplicationServicesHandler from synapse.handlers.typing import FollowerTypingHandler from synapse.replication.tcp.streams import Stream +from synapse.streams.events import EventSources class HomeServer(object): @property @@ -153,3 +155,7 @@ class HomeServer(object): pass def get_typing_handler(self) -> FollowerTypingHandler: pass + def get_event_sources(self) -> EventSources: + pass + def get_application_service_handler(self): + return ApplicationServicesHandler(self) diff --git a/tox.ini b/tox.ini index 9a052c1e33..54f28990ce 100644 --- a/tox.ini +++ b/tox.ini @@ -198,6 +198,7 @@ commands = mypy \ synapse/logging/ \ synapse/metrics \ synapse/module_api \ + synapse/notifier.py \ synapse/push/pusherpool.py \ synapse/push/push_rule_evaluator.py \ synapse/replication \ From 3c796e4159601d93df53000806f95bb42f9275a7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Aug 2020 21:08:43 +0100 Subject: [PATCH 7/7] Update changelog.d/8051.misc Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- changelog.d/8051.misc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/8051.misc b/changelog.d/8051.misc index 7657b503c9..9e472cd481 100644 --- a/changelog.d/8051.misc +++ b/changelog.d/8051.misc @@ -1 +1 @@ -Remove the need to set `filters` in the logging config. This change is backwards compatible, as setting the filters explicitly still works. +It is no longer necessary to explicitly define `filters` in the logging configuration. (Continuing to do so is redundant but harmless.)