From f7f07dc517941d628dcb362a7511712728912141 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 11 Jun 2015 15:48:52 +0100 Subject: [PATCH 01/36] Begin changing the config format --- synapse/config/server.py | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/synapse/config/server.py b/synapse/config/server.py index d0c8fb8f3c..022ebcea94 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -61,14 +61,6 @@ class ServerConfig(Config): # e.g. matrix.org, localhost:8080, etc. server_name: "%(server_name)s" - # The port to listen for HTTPS requests on. - # For when matrix traffic is sent directly to synapse. - bind_port: %(bind_port)s - - # The port to listen for HTTP requests on. - # For when matrix traffic passes through loadbalancer that unwraps TLS. - unsecure_port: %(unsecure_port)s - # Local interface to listen on. # The empty string will cause synapse to listen on all interfaces. bind_host: "" @@ -92,6 +84,30 @@ class ServerConfig(Config): # This should be disabled if running synapse behind a load balancer # that can do automatic compression. gzip_responses: True + + listeners: + # For when matrix traffic is sent directly to synapse. + secure: + # The type of + type: http_resource + + # The port to listen for HTTPS requests on. + port: %(bind_port)s + + # Is this a TLS socket? + tls: true + + # Local interface to listen on. + # The empty string will cause synapse to listen on all interfaces. + bind_address: "" + + # For when matrix traffic passes through loadbalancer that unwraps TLS. + unsecure: + port: %(unsecure_port)s + tls: false + bind_address: "" + + """ % locals() def read_arguments(self, args): From c42ed47660e99026d50de833b0b06c75a3b25d37 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Jun 2015 11:52:52 +0100 Subject: [PATCH 02/36] Fix up create_resource_tree --- synapse/app/homeserver.py | 204 +++++++++++++++++++------------------- 1 file changed, 100 insertions(+), 104 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 65a5dfa84e..79c7a8558d 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -145,109 +145,6 @@ class SynapseHomeServer(HomeServer): **self.db_config.get("args", {}) ) - def create_resource_tree(self, redirect_root_to_web_client): - """Create the resource tree for this Home Server. - - This in unduly complicated because Twisted does not support putting - child resources more than 1 level deep at a time. - - Args: - web_client (bool): True to enable the web client. - redirect_root_to_web_client (bool): True to redirect '/' to the - location of the web client. This does nothing if web_client is not - True. - """ - config = self.get_config() - web_client = config.web_client - - # list containing (path_str, Resource) e.g: - # [ ("/aaa/bbb/cc", Resource1), ("/aaa/dummy", Resource2) ] - desired_tree = [ - (CLIENT_PREFIX, self.get_resource_for_client()), - (CLIENT_V2_ALPHA_PREFIX, self.get_resource_for_client_v2_alpha()), - (FEDERATION_PREFIX, self.get_resource_for_federation()), - (CONTENT_REPO_PREFIX, self.get_resource_for_content_repo()), - (SERVER_KEY_PREFIX, self.get_resource_for_server_key()), - (SERVER_KEY_V2_PREFIX, self.get_resource_for_server_key_v2()), - (MEDIA_PREFIX, self.get_resource_for_media_repository()), - (STATIC_PREFIX, self.get_resource_for_static_content()), - ] - - if web_client: - logger.info("Adding the web client.") - desired_tree.append((WEB_CLIENT_PREFIX, - self.get_resource_for_web_client())) - - if web_client and redirect_root_to_web_client: - self.root_resource = RootRedirect(WEB_CLIENT_PREFIX) - else: - self.root_resource = Resource() - - metrics_resource = self.get_resource_for_metrics() - if config.metrics_port is None and metrics_resource is not None: - desired_tree.append((METRICS_PREFIX, metrics_resource)) - - # ideally we'd just use getChild and putChild but getChild doesn't work - # unless you give it a Request object IN ADDITION to the name :/ So - # instead, we'll store a copy of this mapping so we can actually add - # extra resources to existing nodes. See self._resource_id for the key. - resource_mappings = {} - for full_path, res in desired_tree: - logger.info("Attaching %s to path %s", res, full_path) - last_resource = self.root_resource - for path_seg in full_path.split('/')[1:-1]: - if path_seg not in last_resource.listNames(): - # resource doesn't exist, so make a "dummy resource" - child_resource = Resource() - last_resource.putChild(path_seg, child_resource) - res_id = self._resource_id(last_resource, path_seg) - resource_mappings[res_id] = child_resource - last_resource = child_resource - else: - # we have an existing Resource, use that instead. - res_id = self._resource_id(last_resource, path_seg) - last_resource = resource_mappings[res_id] - - # =========================== - # now attach the actual desired resource - last_path_seg = full_path.split('/')[-1] - - # if there is already a resource here, thieve its children and - # replace it - res_id = self._resource_id(last_resource, last_path_seg) - if res_id in resource_mappings: - # there is a dummy resource at this path already, which needs - # to be replaced with the desired resource. - existing_dummy_resource = resource_mappings[res_id] - for child_name in existing_dummy_resource.listNames(): - child_res_id = self._resource_id(existing_dummy_resource, - child_name) - child_resource = resource_mappings[child_res_id] - # steal the children - res.putChild(child_name, child_resource) - - # finally, insert the desired resource in the right place - last_resource.putChild(last_path_seg, res) - res_id = self._resource_id(last_resource, last_path_seg) - resource_mappings[res_id] = res - - return self.root_resource - - def _resource_id(self, resource, path_seg): - """Construct an arbitrary resource ID so you can retrieve the mapping - later. - - If you want to represent resource A putChild resource B with path C, - the mapping should looks like _resource_id(A,C) = B. - - Args: - resource (Resource): The *parent* Resource - path_seg (str): The name of the child Resource to be attached. - Returns: - str: A unique string which can be a key to the child Resource. - """ - return "%s-%s" % (resource, path_seg) - def start_listening(self): config = self.get_config() @@ -447,7 +344,26 @@ def setup(config_options): database_engine=database_engine, ) - hs.create_resource_tree( + resources = { + CLIENT_PREFIX: hs.get_resource_for_client(), + CLIENT_V2_ALPHA_PREFIX: hs.get_resource_for_client_v2_alpha(), + FEDERATION_PREFIX: hs.get_resource_for_federation(), + CONTENT_REPO_PREFIX: hs.get_resource_for_content_repo(), + SERVER_KEY_PREFIX: hs.get_resource_for_server_key(), + SERVER_KEY_V2_PREFIX: hs.get_resource_for_server_key_v2(), + MEDIA_PREFIX: hs.get_resource_for_media_repository(), + STATIC_PREFIX: hs.get_resource_for_static_content(), + } + + if config.web_client: + resources[WEB_CLIENT_PREFIX] = hs.get_resource_for_web_client() + + metrics_resource = hs.get_resource_for_metrics() + if config.metrics_port is None and metrics_resource is not None: + resources[METRICS_PREFIX] = metrics_resource + + hs.root_resource = create_resource_tree( + resources, redirect_root_to_web_client=True, ) @@ -525,6 +441,86 @@ class SynapseSite(Site): self.access_logger.info(line) +def create_resource_tree(desired_tree, redirect_root_to_web_client=True): + """Create the resource tree for this Home Server. + + This in unduly complicated because Twisted does not support putting + child resources more than 1 level deep at a time. + + Args: + web_client (bool): True to enable the web client. + redirect_root_to_web_client (bool): True to redirect '/' to the + location of the web client. This does nothing if web_client is not + True. + """ + if redirect_root_to_web_client and WEB_CLIENT_PREFIX in desired_tree: + root_resource = RootRedirect(WEB_CLIENT_PREFIX) + else: + root_resource = Resource() + + # ideally we'd just use getChild and putChild but getChild doesn't work + # unless you give it a Request object IN ADDITION to the name :/ So + # instead, we'll store a copy of this mapping so we can actually add + # extra resources to existing nodes. See self._resource_id for the key. + resource_mappings = {} + for full_path, res in desired_tree.items(): + logger.info("Attaching %s to path %s", res, full_path) + last_resource = root_resource + for path_seg in full_path.split('/')[1:-1]: + if path_seg not in last_resource.listNames(): + # resource doesn't exist, so make a "dummy resource" + child_resource = Resource() + last_resource.putChild(path_seg, child_resource) + res_id = _resource_id(last_resource, path_seg) + resource_mappings[res_id] = child_resource + last_resource = child_resource + else: + # we have an existing Resource, use that instead. + res_id = _resource_id(last_resource, path_seg) + last_resource = resource_mappings[res_id] + + # =========================== + # now attach the actual desired resource + last_path_seg = full_path.split('/')[-1] + + # if there is already a resource here, thieve its children and + # replace it + res_id = _resource_id(last_resource, last_path_seg) + if res_id in resource_mappings: + # there is a dummy resource at this path already, which needs + # to be replaced with the desired resource. + existing_dummy_resource = resource_mappings[res_id] + for child_name in existing_dummy_resource.listNames(): + child_res_id = _resource_id(existing_dummy_resource, + child_name) + child_resource = resource_mappings[child_res_id] + # steal the children + res.putChild(child_name, child_resource) + + # finally, insert the desired resource in the right place + last_resource.putChild(last_path_seg, res) + res_id = _resource_id(last_resource, last_path_seg) + resource_mappings[res_id] = res + + return root_resource + + +def _resource_id(resource, path_seg): + """Construct an arbitrary resource ID so you can retrieve the mapping + later. + + If you want to represent resource A putChild resource B with path C, + the mapping should looks like _resource_id(A,C) = B. + + Args: + resource (Resource): The *parent* Resource + path_seg (str): The name of the child Resource to be attached. + Returns: + str: A unique string which can be a key to the child Resource. + """ + return "%s-%s" % (resource, path_seg) + + def run(hs): PROFILE_SYNAPSE = False if PROFILE_SYNAPSE: From fd2c07bfed9182f4fc3ccf50b4390645fe038da4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Jun 2015 15:33:07 +0100 Subject: [PATCH 03/36] Use config.listeners --- synapse/app/homeserver.py | 150 +++++++++++++++++++++----------------- synapse/config/server.py | 108 ++++++++++++++++++++------- 2 files changed, 165 insertions(+), 93 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 79c7a8558d..4228bac673 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -87,16 +87,10 @@ class SynapseHomeServer(HomeServer): return MatrixFederationHttpClient(self) def build_resource_for_client(self): - res = ClientV1RestResource(self) - if self.config.gzip_responses: - res = gz_wrap(res) - return res + return ClientV1RestResource(self) def build_resource_for_client_v2_alpha(self): - res = ClientV2AlphaRestResource(self) - if self.config.gzip_responses: - res = gz_wrap(res) - return res + return ClientV2AlphaRestResource(self) def build_resource_for_federation(self): return JsonResource(self) @@ -145,49 +139,102 @@ class SynapseHomeServer(HomeServer): **self.db_config.get("args", {}) ) - def start_listening(self): - config = self.get_config() + def _listener_http(self, config, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + tls = listener_config.get("tls", False) - if not config.no_tls and config.bind_port is not None: + if tls and config.no_tls: + return + + metrics_resource = self.get_resource_for_metrics() + + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "client": + if res["compress"]: + client_v1 = gz_wrap(self.get_resource_for_client()) + client_v2 = gz_wrap(self.get_resource_for_client_v2_alpha()) + else: + client_v1 = self.get_resource_for_client() + client_v2 = self.get_resource_for_client_v2_alpha() + + resources.update({ + CLIENT_PREFIX: client_v1, + CLIENT_V2_ALPHA_PREFIX: client_v2, + }) + + if name == "federation": + resources.update({ + FEDERATION_PREFIX: self.get_resource_for_federation(), + }) + + if name in ["static", "client"]: + resources.update({ + STATIC_PREFIX: self.get_resource_for_static_content(), + }) + + if name in ["media", "federation", "client"]: + resources.update({ + MEDIA_PREFIX: self.get_resource_for_media_repository(), + CONTENT_REPO_PREFIX: self.get_resource_for_content_repo(), + }) + + if name in ["keys", "federation"]: + resources.update({ + SERVER_KEY_PREFIX: self.get_resource_for_server_key(), + SERVER_KEY_V2_PREFIX: self.get_resource_for_server_key_v2(), + }) + + if name == "webclient": + resources[WEB_CLIENT_PREFIX] = self.get_resource_for_web_client() + + if name == "metrics" and metrics_resource: + resources[METRICS_PREFIX] = metrics_resource + + root_resource = create_resource_tree(resources) + if tls: reactor.listenSSL( - config.bind_port, + port, SynapseSite( "synapse.access.https", config, - self.root_resource, + root_resource, ), self.tls_context_factory, - interface=config.bind_host + interface=bind_address ) - logger.info("Synapse now listening on port %d", config.bind_port) - - if config.unsecure_port is not None: + else: reactor.listenTCP( - config.unsecure_port, + port, SynapseSite( - "synapse.access.http", + "synapse.access.https", config, - self.root_resource, + root_resource, ), - interface=config.bind_host + interface=bind_address ) - logger.info("Synapse now listening on port %d", config.unsecure_port) + logger.info("Synapse now listening on port %d", port) - metrics_resource = self.get_resource_for_metrics() - if metrics_resource and config.metrics_port is not None: - reactor.listenTCP( - config.metrics_port, - SynapseSite( - "synapse.access.metrics", - config, - metrics_resource, - ), - interface=config.metrics_bind_host, - ) - logger.info( - "Metrics now running on %s port %d", - config.metrics_bind_host, config.metrics_port, - ) + def start_listening(self): + config = self.get_config() + + for listener in config.listeners: + if listener["type"] == "http": + self._listener_http(config, listener) + elif listener["type"] == "manhole": + f = twisted.manhole.telnet.ShellFactory() + f.username = "matrix" + f.password = "rabbithole" + f.namespace['hs'] = self + reactor.listenTCP( + listener["port"], + f, + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) def run_startup_checks(self, db_conn, database_engine): all_users_native = are_all_users_on_domain( @@ -322,11 +369,6 @@ def setup(config_options): events.USE_FROZEN_DICTS = config.use_frozen_dicts - if re.search(":[0-9]+$", config.server_name): - domain_with_port = config.server_name - else: - domain_with_port = "%s:%s" % (config.server_name, config.bind_port) - tls_context_factory = context_factory.ServerContextFactory(config) database_engine = create_engine(config.database_config["name"]) @@ -334,7 +376,6 @@ def setup(config_options): hs = SynapseHomeServer( config.server_name, - domain_with_port=domain_with_port, upload_dir=os.path.abspath("uploads"), db_config=config.database_config, tls_context_factory=tls_context_factory, @@ -344,29 +385,6 @@ def setup(config_options): database_engine=database_engine, ) - resources = { - CLIENT_PREFIX: hs.get_resource_for_client(), - CLIENT_V2_ALPHA_PREFIX: hs.get_resource_for_client_v2_alpha(), - FEDERATION_PREFIX: hs.get_resource_for_federation(), - CONTENT_REPO_PREFIX: hs.get_resource_for_content_repo(), - SERVER_KEY_PREFIX: hs.get_resource_for_server_key(), - SERVER_KEY_V2_PREFIX: hs.get_resource_for_server_key_v2(), - MEDIA_PREFIX: hs.get_resource_for_media_repository(), - STATIC_PREFIX: hs.get_resource_for_static_content(), - } - - if config.web_client: - resources[WEB_CLIENT_PREFIX] = hs.get_resource_for_web_client() - - metrics_resource = hs.get_resource_for_metrics() - if config.metrics_port is None and metrics_resource is not None: - resources[METRICS_PREFIX] = metrics_resource - - hs.root_resource = create_resource_tree( - resources, - redirect_root_to_web_client=True, - ) - logger.info("Preparing database: %r...", config.database_config) try: diff --git a/synapse/config/server.py b/synapse/config/server.py index 022ebcea94..26017c7efa 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -20,26 +20,73 @@ class ServerConfig(Config): def read_config(self, config): self.server_name = config["server_name"] - self.bind_port = config["bind_port"] - self.bind_host = config["bind_host"] - self.unsecure_port = config["unsecure_port"] self.manhole = config.get("manhole") self.pid_file = self.abspath(config.get("pid_file")) self.web_client = config["web_client"] self.soft_file_limit = config["soft_file_limit"] self.daemonize = config.get("daemonize") self.use_frozen_dicts = config.get("use_frozen_dicts", True) - self.gzip_responses = config["gzip_responses"] + + self.listeners = config.get("listeners", []) + + bind_port = config.get("bind_port") + if bind_port: + self.listeners = [] + bind_host = config.get("bind_host", "") + gzip_responses = config.get("gzip_responses", True) + + self.listeners.append({ + "port": bind_port, + "bind_address": bind_host, + "tls": True, + "type": "http", + "resources": [ + { + "names": ["client", "webclient"], + "compress": gzip_responses, + }, + { + "names": ["federation"], + "compress": False, + } + ] + }) + + unsecure_port = config.get("unsecure_port", bind_port - 400) + if unsecure_port: + self.listeners.append({ + "port": unsecure_port, + "bind_address": bind_host, + "tls": False, + "type": "http", + "resources": [ + { + "names": ["client", "webclient"], + "compress": gzip_responses, + }, + { + "names": ["federation"], + "compress": False, + } + ] + }) # Attempt to guess the content_addr for the v0 content repostitory content_addr = config.get("content_addr") if not content_addr: + for listener in self.listeners: + if listener["type"] == "http" and not listener.get("tls", False): + unsecure_port = listener["port"] + break + else: + raise RuntimeError("Could not determine 'content_addr'") + host = self.server_name if ':' not in host: - host = "%s:%d" % (host, self.unsecure_port) + host = "%s:%d" % (host, unsecure_port) else: host = host.split(':')[0] - host = "%s:%d" % (host, self.unsecure_port) + host = "%s:%d" % (host, unsecure_port) content_addr = "http://%s" % (host,) self.content_addr = content_addr @@ -61,9 +108,17 @@ class ServerConfig(Config): # e.g. matrix.org, localhost:8080, etc. server_name: "%(server_name)s" + # The port to listen for HTTPS requests on. + # For when matrix traffic is sent directly to synapse. + # bind_port: %(bind_port)s + + # The port to listen for HTTP requests on. + # For when matrix traffic passes through loadbalancer that unwraps TLS. + # unsecure_port: %(unsecure_port)s + # Local interface to listen on. # The empty string will cause synapse to listen on all interfaces. - bind_host: "" + # bind_host: "" # When running as a daemon, the file to store the pid in pid_file: %(pid_file)s @@ -83,31 +138,30 @@ class ServerConfig(Config): # Should synapse compress HTTP responses to clients that support it? # This should be disabled if running synapse behind a load balancer # that can do automatic compression. - gzip_responses: True + # gzip_responses: True listeners: - # For when matrix traffic is sent directly to synapse. - secure: - # The type of - type: http_resource + - port: %(unsecure_port)s + tls: false + bind_address: '' + type: http - # The port to listen for HTTPS requests on. - port: %(bind_port)s - - # Is this a TLS socket? - tls: true - - # Local interface to listen on. - # The empty string will cause synapse to listen on all interfaces. - bind_address: "" - - # For when matrix traffic passes through loadbalancer that unwraps TLS. - unsecure: - port: %(unsecure_port)s - tls: false - bind_address: "" + resources: + - names: [client, webclient] + compress: true + - names: [federation] + compress: false + - port: %(bind_port)s + tls: true + bind_address: '' + type: http + resources: + - names: [client, webclient] + compress: true + - names: [federation] + compress: false """ % locals() def read_arguments(self, args): From 9c5fc81c2ddd29eac62d368e7f8d24972f8894a6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Jun 2015 17:13:23 +0100 Subject: [PATCH 04/36] Correctly handle x_forwaded listener option --- synapse/app/homeserver.py | 31 +++++++++++++++++++++++++++---- synapse/config/server.py | 2 ++ synapse/server.py | 12 ++---------- 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 4228bac673..12da0bc4b5 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -34,7 +34,7 @@ from twisted.application import service from twisted.enterprise import adbapi from twisted.web.resource import Resource, EncodingResourceWrapper from twisted.web.static import File -from twisted.web.server import Site, GzipEncoderFactory +from twisted.web.server import Site, GzipEncoderFactory, Request from twisted.web.http import proxiedLogFormatter, combinedLogFormatter from synapse.http.server import JsonResource, RootRedirect from synapse.rest.media.v0.content_repository import ContentRepoResource @@ -199,7 +199,7 @@ class SynapseHomeServer(HomeServer): port, SynapseSite( "synapse.access.https", - config, + listener_config, root_resource, ), self.tls_context_factory, @@ -210,7 +210,7 @@ class SynapseHomeServer(HomeServer): port, SynapseSite( "synapse.access.https", - config, + listener_config, root_resource, ), interface=bind_address @@ -441,6 +441,28 @@ class SynapseService(service.Service): return self._port.stopListening() +class XForwardedForRequest(Request): + def __init__(self, *args, **kw): + Request.__init__(self, *args, **kw) + + """ + Add a layer on top of another request that only uses the value of an + X-Forwarded-For header as the result of C{getClientIP}. + """ + def getClientIP(self): + """ + @return: The client address (the first address) in the value of the + I{X-Forwarded-For header}. If the header is not present, return + C{b"-"}. + """ + return self.requestHeaders.getRawHeaders( + b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip() + + +def XForwardedFactory(*args, **kwargs): + return XForwardedForRequest(*args, **kwargs) + + class SynapseSite(Site): """ Subclass of a twisted http Site that does access logging with python's @@ -448,7 +470,8 @@ class SynapseSite(Site): """ def __init__(self, logger_name, config, resource, *args, **kwargs): Site.__init__(self, resource, *args, **kwargs) - if config.captcha_ip_origin_is_x_forwarded: + if config.get("x_forwarded", False): + self.requestFactory = XForwardedFactory self._log_formatter = proxiedLogFormatter else: self._log_formatter = combinedLogFormatter diff --git a/synapse/config/server.py b/synapse/config/server.py index 26017c7efa..9dab167b21 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -157,6 +157,8 @@ class ServerConfig(Config): bind_address: '' type: http + x_forwarded: False + resources: - names: [client, webclient] compress: true diff --git a/synapse/server.py b/synapse/server.py index 8b3dc675cc..4d1fb1cbf6 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -132,16 +132,8 @@ class BaseHomeServer(object): setattr(BaseHomeServer, "get_%s" % (depname), _get) def get_ip_from_request(self, request): - # May be an X-Forwarding-For header depending on config - ip_addr = request.getClientIP() - if self.config.captcha_ip_origin_is_x_forwarded: - # use the header - if request.requestHeaders.hasHeader("X-Forwarded-For"): - ip_addr = request.requestHeaders.getRawHeaders( - "X-Forwarded-For" - )[0] - - return ip_addr + # X-Forwarded-For is handled by our custom request type. + return request.getClientIP() def is_mine(self, domain_specific_string): return domain_specific_string.domain == self.hostname From 942e39e87c735a6ead5375681ceea035a945fd7d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Jun 2015 17:13:54 +0100 Subject: [PATCH 05/36] PEP8 --- synapse/app/homeserver.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 12da0bc4b5..91cc06a49e 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -63,7 +63,6 @@ import synapse import logging import os -import re import resource import subprocess @@ -532,8 +531,9 @@ def create_resource_tree(desired_tree, redirect_root_to_web_client=True): # to be replaced with the desired resource. existing_dummy_resource = resource_mappings[res_id] for child_name in existing_dummy_resource.listNames(): - child_res_id = _resource_id(existing_dummy_resource, - child_name) + child_res_id = _resource_id( + existing_dummy_resource, child_name + ) child_resource = resource_mappings[child_res_id] # steal the children res.putChild(child_name, child_resource) From 261ccd7f5f3dddc1c4538c2697ba00918120ddc8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Jun 2015 17:17:29 +0100 Subject: [PATCH 06/36] Fix tests --- tests/utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/utils.py b/tests/utils.py index 3b5c335911..eb035cf48f 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -114,6 +114,8 @@ class MockHttpResource(HttpServer): mock_request.method = http_method mock_request.uri = path + mock_request.getClientIP.return_value = "-" + mock_request.requestHeaders.getRawHeaders.return_value=[ "X-Matrix origin=test,key=,sig=" ] From a005b7269a9c1df517f6a01e72c1eea5f4cb0354 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Jun 2015 17:41:36 +0100 Subject: [PATCH 07/36] Add backwards compat support for metrics, manhole and webclient config options --- synapse/app/homeserver.py | 7 ------- synapse/config/metrics.py | 6 ------ synapse/config/server.py | 30 +++++++++++++++++++++++++++--- 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 91cc06a49e..95e9122d3e 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -408,13 +408,6 @@ def setup(config_options): logger.info("Database prepared in %r.", config.database_config) - if config.manhole: - f = twisted.manhole.telnet.ShellFactory() - f.username = "matrix" - f.password = "rabbithole" - f.namespace['hs'] = hs - reactor.listenTCP(config.manhole, f, interface='127.0.0.1') - hs.start_listening() hs.get_pusherpool().start() diff --git a/synapse/config/metrics.py b/synapse/config/metrics.py index 0cfb30ce7f..ae5a691527 100644 --- a/synapse/config/metrics.py +++ b/synapse/config/metrics.py @@ -28,10 +28,4 @@ class MetricsConfig(Config): # Enable collection and rendering of performance metrics enable_metrics: False - - # Separate port to accept metrics requests on - # metrics_port: 8081 - - # Which host to bind the metric listener to - # metrics_bind_host: 127.0.0.1 """ diff --git a/synapse/config/server.py b/synapse/config/server.py index 9dab167b21..95bc967d0e 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -20,7 +20,6 @@ class ServerConfig(Config): def read_config(self, config): self.server_name = config["server_name"] - self.manhole = config.get("manhole") self.pid_file = self.abspath(config.get("pid_file")) self.web_client = config["web_client"] self.soft_file_limit = config["soft_file_limit"] @@ -35,6 +34,8 @@ class ServerConfig(Config): bind_host = config.get("bind_host", "") gzip_responses = config.get("gzip_responses", True) + names = ["client", "webclient"] if self.web_client else ["client"] + self.listeners.append({ "port": bind_port, "bind_address": bind_host, @@ -42,7 +43,7 @@ class ServerConfig(Config): "type": "http", "resources": [ { - "names": ["client", "webclient"], + "names": names, "compress": gzip_responses, }, { @@ -61,7 +62,7 @@ class ServerConfig(Config): "type": "http", "resources": [ { - "names": ["client", "webclient"], + "names": names, "compress": gzip_responses, }, { @@ -71,6 +72,29 @@ class ServerConfig(Config): ] }) + manhole = config.get("manhole") + if manhole: + self.listeners.append({ + "port": manhole, + "bind_address": "127.0.0.1", + "type": "manhole", + }) + + metrics_port = config.get("metrics_port") + if metrics_port: + self.listeners.append({ + "port": metrics_port, + "bind_address": config.get("metrics_bind_host", "127.0.0.1"), + "tls": False, + "type": "http", + "resources": [ + { + "names": ["metrics"], + "compress": False, + }, + ] + }) + # Attempt to guess the content_addr for the v0 content repostitory content_addr = config.get("content_addr") if not content_addr: From 186f61a3ac0810d991584c479ca4508c91252b87 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Jun 2015 11:25:53 +0100 Subject: [PATCH 08/36] Document listener config. Remove deprecated config options --- synapse/config/captcha.py | 8 ---- synapse/config/server.py | 82 +++++++++++++++++++++++---------------- 2 files changed, 48 insertions(+), 42 deletions(-) diff --git a/synapse/config/captcha.py b/synapse/config/captcha.py index ba221121cb..cf72dc4340 100644 --- a/synapse/config/captcha.py +++ b/synapse/config/captcha.py @@ -21,10 +21,6 @@ class CaptchaConfig(Config): self.recaptcha_private_key = config["recaptcha_private_key"] self.recaptcha_public_key = config["recaptcha_public_key"] self.enable_registration_captcha = config["enable_registration_captcha"] - # XXX: This is used for more than just captcha - self.captcha_ip_origin_is_x_forwarded = ( - config["captcha_ip_origin_is_x_forwarded"] - ) self.captcha_bypass_secret = config.get("captcha_bypass_secret") self.recaptcha_siteverify_api = config["recaptcha_siteverify_api"] @@ -43,10 +39,6 @@ class CaptchaConfig(Config): # public/private key. enable_registration_captcha: False - # When checking captchas, use the X-Forwarded-For (XFF) header - # as the client IP and not the actual client IP. - captcha_ip_origin_is_x_forwarded: False - # A secret key used to bypass the captcha test entirely. #captcha_bypass_secret: "YOUR_SECRET_HERE" diff --git a/synapse/config/server.py b/synapse/config/server.py index 95bc967d0e..b5af387378 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -132,18 +132,6 @@ class ServerConfig(Config): # e.g. matrix.org, localhost:8080, etc. server_name: "%(server_name)s" - # The port to listen for HTTPS requests on. - # For when matrix traffic is sent directly to synapse. - # bind_port: %(bind_port)s - - # The port to listen for HTTP requests on. - # For when matrix traffic passes through loadbalancer that unwraps TLS. - # unsecure_port: %(unsecure_port)s - - # Local interface to listen on. - # The empty string will cause synapse to listen on all interfaces. - # bind_host: "" - # When running as a daemon, the file to store the pid in pid_file: %(pid_file)s @@ -155,39 +143,65 @@ class ServerConfig(Config): # hard limit. soft_file_limit: 0 - # Turn on the twisted telnet manhole service on localhost on the given - # port. - #manhole: 9000 - - # Should synapse compress HTTP responses to clients that support it? - # This should be disabled if running synapse behind a load balancer - # that can do automatic compression. - # gzip_responses: True - + # List of ports that Synapse should listen on, their purpose and their + # configuration. listeners: - - port: %(unsecure_port)s + # Main HTTPS listener + # For when matrix traffic is sent directly to synapse. + - + # The port to listen for HTTPS requests on. + port: %(bind_port)s + + # Local interface to listen on. + # The empty string will cause synapse to listen on all interfaces. + bind_address: '' + + # This is a 'http' listener, allows us to specify 'resources'. + type: http + + tls: true + + # Use the X-Forwarded-For (XFF) header as the client IP and not the + # actual client IP. + x_forwarded: false + + # List of HTTP resources to serve on this listener. + resources: + - + # List of resources to host on this listener. + names: + - client # The client-server APIs, both v1 and v2 + - webclient # The bundled webclient. + + # Should synapse compress HTTP responses to clients that support it? + # This should be disabled if running synapse behind a load balancer + # that can do automatic compression. + compress: true + + - names: [federation] # Federation APIs + compress: false + + # Unsecure HTTP listener, + # For when matrix traffic passes through loadbalancer that unwraps TLS. + - + port: %(unsecure_port)s tls: false bind_address: '' type: http - resources: - - names: [client, webclient] - compress: true - - names: [federation] - compress: false - - - port: %(bind_port)s - tls: true - bind_address: '' - type: http - - x_forwarded: False + x_forwarded: false resources: - names: [client, webclient] compress: true - names: [federation] compress: false + + # Turn on the twisted telnet manhole service on localhost on the given + # port. + # - port: 9000 + # bind_address: 127.0.0.1 + # type: manhole """ % locals() def read_arguments(self, args): From 9d0326baa63f7e982d25f80abfc6fab09d3f25b6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Jun 2015 11:27:29 +0100 Subject: [PATCH 09/36] Remove redundant newline --- synapse/config/server.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/config/server.py b/synapse/config/server.py index b5af387378..f4d4a87103 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -183,8 +183,7 @@ class ServerConfig(Config): # Unsecure HTTP listener, # For when matrix traffic passes through loadbalancer that unwraps TLS. - - - port: %(unsecure_port)s + - port: %(unsecure_port)s tls: false bind_address: '' type: http From 83f119a84a592ccf0d2dc06e699bcf11eb4380b0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Jun 2015 13:05:11 +0100 Subject: [PATCH 10/36] Log requests and responses sent via http.client --- synapse/http/client.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/synapse/http/client.py b/synapse/http/client.py index e746f2416e..9091ae2d38 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -61,21 +61,31 @@ class SimpleHttpClient(object): self.agent = Agent(reactor, pool=pool) self.version_string = hs.version_string - def request(self, method, *args, **kwargs): + def request(self, method, uri, *args, **kwargs): # A small wrapper around self.agent.request() so we can easily attach # counters to it outgoing_requests_counter.inc(method) d = preserve_context_over_fn( self.agent.request, - method, *args, **kwargs + method, uri, *args, **kwargs ) + logger.info("Sending request %s %s", method, uri) + def _cb(response): incoming_responses_counter.inc(method, response.code) + logger.info( + "Received response to %s %s: %s", + method, uri, response.code + ) return response def _eb(failure): incoming_responses_counter.inc(method, "ERR") + logger.info( + "Error sending request to %s %s: %s %s", + method, uri, failure.type, failure.getErrorMessage() + ) return failure d.addCallbacks(_cb, _eb) @@ -84,7 +94,9 @@ class SimpleHttpClient(object): @defer.inlineCallbacks def post_urlencoded_get_json(self, uri, args={}): + # TODO: Do we ever want to log message contents? logger.debug("post_urlencoded_get_json args: %s", args) + query_bytes = urllib.urlencode(args, True) response = yield self.request( @@ -105,7 +117,7 @@ class SimpleHttpClient(object): def post_json_get_json(self, uri, post_json): json_str = encode_canonical_json(post_json) - logger.info("HTTP POST %s -> %s", json_str, uri) + logger.debug("HTTP POST %s -> %s", json_str, uri) response = yield self.request( "POST", From f00f8346f143dc306e184b6d479294ab11a4ff55 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Jun 2015 13:37:58 +0100 Subject: [PATCH 11/36] Make http.server request logging more verbose, but redact access_tokens --- synapse/http/server.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index ae8f3b3972..e6e8a59f6c 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -32,6 +32,7 @@ from twisted.web.util import redirectTo import collections import logging +import re import urllib logger = logging.getLogger(__name__) @@ -82,9 +83,18 @@ def request_handler(request_handler): code = None start = self.clock.time_msec() try: + request_uri = request.uri + + # Don't log access_tokens + request_uri = re.sub( + r'(\?.*access_token=)[^&]*(.*)$', + r'\1\2', + request_uri + ) + logger.info( - "Received request: %s %s", - request.method, request.path + "%s - Received request: %s %s", + request.getClientIP(), request.method, request_uri ) d = request_handler(self, request) with PreserveLoggingContext(): From b5209c57441d9e7bace28a03774d2605a6572514 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Jun 2015 16:36:49 +0100 Subject: [PATCH 12/36] Create SynapseRequest that overrides __repr__ to not print access_token --- synapse/app/homeserver.py | 49 ++++++++++++++++++++++++++++++++++----- synapse/http/server.py | 14 +++-------- 2 files changed, 46 insertions(+), 17 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 95e9122d3e..7c1ad6bc13 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -63,6 +63,7 @@ import synapse import logging import os +import re import resource import subprocess @@ -433,9 +434,34 @@ class SynapseService(service.Service): return self._port.stopListening() -class XForwardedForRequest(Request): - def __init__(self, *args, **kw): +class SynapseRequest(Request): + def __init__(self, site_tag, *args, **kw): Request.__init__(self, *args, **kw) + self.site_tag = site_tag + self.authenticated_entity = None + + def __repr__(self): + # We overwrite this so that we don't log ``access_token`` + return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % ( + self.__class__.__name__, + id(self), + self.method, + self.get_redacted_uri(), + self.clientproto, + self.site_tag, + ) + + def get_redacted_uri(self): + return re.sub( + r'(\?.*access_token=)[^&]*(.*)$', + r'\1\2', + self.uri + ) + + +class XForwardedForRequest(SynapseRequest): + def __init__(self, *args, **kw): + SynapseRequest.__init__(self, *args, **kw) """ Add a layer on top of another request that only uses the value of an @@ -451,8 +477,16 @@ class XForwardedForRequest(Request): b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip() -def XForwardedFactory(*args, **kwargs): - return XForwardedForRequest(*args, **kwargs) +class SynapseRequestFactory(object): + def __init__(self, site_tag, x_forwarded_for): + self.site_tag = site_tag + self.x_forwarded_for = x_forwarded_for + + def __call__(self, *args, **kwargs): + if self.x_forwarded_for: + return XForwardedForRequest(self.site_tag, *args, **kwargs) + else: + return SynapseRequest(self.site_tag, *args, **kwargs) class SynapseSite(Site): @@ -462,8 +496,11 @@ class SynapseSite(Site): """ def __init__(self, logger_name, config, resource, *args, **kwargs): Site.__init__(self, resource, *args, **kwargs) - if config.get("x_forwarded", False): - self.requestFactory = XForwardedFactory + + proxied = config.get("x_forwarded", False) + self.requestFactory = SynapseRequestFactory(None, proxied) + + if proxied: self._log_formatter = proxiedLogFormatter else: self._log_formatter = combinedLogFormatter diff --git a/synapse/http/server.py b/synapse/http/server.py index e6e8a59f6c..7f8b9dbb29 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -32,7 +32,6 @@ from twisted.web.util import redirectTo import collections import logging -import re import urllib logger = logging.getLogger(__name__) @@ -83,18 +82,11 @@ def request_handler(request_handler): code = None start = self.clock.time_msec() try: - request_uri = request.uri - - # Don't log access_tokens - request_uri = re.sub( - r'(\?.*access_token=)[^&]*(.*)$', - r'\1\2', - request_uri - ) - logger.info( "%s - Received request: %s %s", - request.getClientIP(), request.method, request_uri + request.getClientIP(), + request.method, + request.get_redacted_uri() ) d = request_handler(self, request) with PreserveLoggingContext(): From cee69441d3d3b4d966b6ec69c7dbf4eb3b876bb3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Jun 2015 17:11:44 +0100 Subject: [PATCH 13/36] Log more when we have processed the request --- synapse/api/auth.py | 2 ++ synapse/app/homeserver.py | 10 ++++++++-- synapse/federation/transport/server.py | 1 + synapse/http/server.py | 14 ++++++++++++-- 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index d5bf0be85c..4da62e5d8d 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -370,6 +370,8 @@ class Auth(object): user_agent=user_agent ) + request.authenticated_entity = user.to_string() + defer.returnValue((user, ClientInfo(device_id, token_id))) except KeyError: raise AuthError( diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 7c1ad6bc13..fca6f06e3b 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -143,6 +143,7 @@ class SynapseHomeServer(HomeServer): port = listener_config["port"] bind_address = listener_config.get("bind_address", "") tls = listener_config.get("tls", False) + site_tag = listener_config.get("tag", port) if tls and config.no_tls: return @@ -199,6 +200,7 @@ class SynapseHomeServer(HomeServer): port, SynapseSite( "synapse.access.https", + site_tag, listener_config, root_resource, ), @@ -210,6 +212,7 @@ class SynapseHomeServer(HomeServer): port, SynapseSite( "synapse.access.https", + site_tag, listener_config, root_resource, ), @@ -458,6 +461,9 @@ class SynapseRequest(Request): self.uri ) + def get_user_agent(self): + return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1] + class XForwardedForRequest(SynapseRequest): def __init__(self, *args, **kw): @@ -494,11 +500,11 @@ class SynapseSite(Site): Subclass of a twisted http Site that does access logging with python's standard logging """ - def __init__(self, logger_name, config, resource, *args, **kwargs): + def __init__(self, logger_name, tag, config, resource, *args, **kwargs): Site.__init__(self, resource, *args, **kwargs) proxied = config.get("x_forwarded", False) - self.requestFactory = SynapseRequestFactory(None, proxied) + self.requestFactory = SynapseRequestFactory(tag, proxied) if proxied: self._log_formatter = proxiedLogFormatter diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 31190e700a..bad93c6b2f 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -94,6 +94,7 @@ class TransportLayerServer(object): yield self.keyring.verify_json_for_server(origin, json_request) logger.info("Request from %s", origin) + request.authenticated_entity = origin defer.returnValue((origin, content)) diff --git a/synapse/http/server.py b/synapse/http/server.py index 7f8b9dbb29..34645a371a 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -125,8 +125,18 @@ def request_handler(request_handler): code = str(code) if code else "-" end = self.clock.time_msec() logger.info( - "Processed request: %dms %s %s %s", - end-start, code, request.method, request.path + "%s - %s - {%s}" + " Processed request: %dms %sB %s \"%s %s %s\" \"%s\"", + request.getClientIP(), + request.site_tag, + request.authenticated_entity, + end-start, + request.sentLength, + code, + request.method, + request.get_redacted_uri(), + request.clientproto, + request.get_user_agent(), ) return wrapped_request_handler From aaa749d366f768dd164f899c1d8e5eedd44ee5f5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Jun 2015 18:18:05 +0100 Subject: [PATCH 14/36] Disable twisted access logging. Move access logging to SynapseRequest object --- synapse/app/homeserver.py | 64 ++++++++++++++++++++-------- synapse/http/server.py | 90 ++++++++++++++------------------------- 2 files changed, 79 insertions(+), 75 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index fca6f06e3b..7effedf7dc 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -35,7 +35,6 @@ from twisted.enterprise import adbapi from twisted.web.resource import Resource, EncodingResourceWrapper from twisted.web.static import File from twisted.web.server import Site, GzipEncoderFactory, Request -from twisted.web.http import proxiedLogFormatter, combinedLogFormatter from synapse.http.server import JsonResource, RootRedirect from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.media.v1.media_repository import MediaRepositoryResource @@ -61,11 +60,13 @@ import twisted.manhole.telnet import synapse +import contextlib import logging import os import re import resource import subprocess +import time logger = logging.getLogger("synapse.app.homeserver") @@ -438,10 +439,11 @@ class SynapseService(service.Service): class SynapseRequest(Request): - def __init__(self, site_tag, *args, **kw): + def __init__(self, site, *args, **kw): Request.__init__(self, *args, **kw) - self.site_tag = site_tag + self.site = site self.authenticated_entity = None + self.start_time = 0 def __repr__(self): # We overwrite this so that we don't log ``access_token`` @@ -451,7 +453,7 @@ class SynapseRequest(Request): self.method, self.get_redacted_uri(), self.clientproto, - self.site_tag, + self.site.site_tag, ) def get_redacted_uri(self): @@ -464,6 +466,38 @@ class SynapseRequest(Request): def get_user_agent(self): return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1] + def started_processing(self): + self.site.access_logger.info( + "%s - %s - Received request: %s %s", + self.getClientIP(), + self.site.site_tag, + self.method, + self.get_redacted_uri() + ) + self.start_time = int(time.time() * 1000) + + def finished_processing(self): + self.site.access_logger.info( + "%s - %s - {%s}" + " Processed request: %dms %sB %s \"%s %s %s\" \"%s\"", + self.getClientIP(), + self.site.site_tag, + self.authenticated_entity, + int(time.time() * 1000) - self.start_time, + self.sentLength, + self.code, + self.method, + self.get_redacted_uri(), + self.clientproto, + self.get_user_agent(), + ) + + @contextlib.contextmanager + def processing(self): + self.started_processing() + yield + self.finished_processing() + class XForwardedForRequest(SynapseRequest): def __init__(self, *args, **kw): @@ -484,15 +518,15 @@ class XForwardedForRequest(SynapseRequest): class SynapseRequestFactory(object): - def __init__(self, site_tag, x_forwarded_for): - self.site_tag = site_tag + def __init__(self, site, x_forwarded_for): + self.site = site self.x_forwarded_for = x_forwarded_for def __call__(self, *args, **kwargs): if self.x_forwarded_for: - return XForwardedForRequest(self.site_tag, *args, **kwargs) + return XForwardedForRequest(self.site, *args, **kwargs) else: - return SynapseRequest(self.site_tag, *args, **kwargs) + return SynapseRequest(self.site, *args, **kwargs) class SynapseSite(Site): @@ -500,21 +534,17 @@ class SynapseSite(Site): Subclass of a twisted http Site that does access logging with python's standard logging """ - def __init__(self, logger_name, tag, config, resource, *args, **kwargs): + def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs): Site.__init__(self, resource, *args, **kwargs) - proxied = config.get("x_forwarded", False) - self.requestFactory = SynapseRequestFactory(tag, proxied) + self.site_tag = site_tag - if proxied: - self._log_formatter = proxiedLogFormatter - else: - self._log_formatter = combinedLogFormatter + proxied = config.get("x_forwarded", False) + self.requestFactory = SynapseRequestFactory(self, proxied) self.access_logger = logging.getLogger(logger_name) def log(self, request): - line = self._log_formatter(self._logDateTime, request) - self.access_logger.info(line) + pass def create_resource_tree(desired_tree, redirect_root_to_web_client=True): diff --git a/synapse/http/server.py b/synapse/http/server.py index 34645a371a..807ff95c65 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -79,65 +79,39 @@ def request_handler(request_handler): _next_request_id += 1 with LoggingContext(request_id) as request_context: request_context.request = request_id - code = None - start = self.clock.time_msec() - try: - logger.info( - "%s - Received request: %s %s", - request.getClientIP(), - request.method, - request.get_redacted_uri() - ) - d = request_handler(self, request) - with PreserveLoggingContext(): - yield d - code = request.code - except CodeMessageException as e: - code = e.code - if isinstance(e, SynapseError): - logger.info( - "%s SynapseError: %s - %s", request, code, e.msg + with request.processing(): + try: + d = request_handler(self, request) + with PreserveLoggingContext(): + yield d + except CodeMessageException as e: + code = e.code + if isinstance(e, SynapseError): + logger.info( + "%s SynapseError: %s - %s", request, code, e.msg + ) + else: + logger.exception(e) + outgoing_responses_counter.inc(request.method, str(code)) + respond_with_json( + request, code, cs_exception(e), send_cors=True, + pretty_print=_request_user_agent_is_curl(request), + version_string=self.version_string, + ) + except: + logger.exception( + "Failed handle request %s.%s on %r: %r", + request_handler.__module__, + request_handler.__name__, + self, + request + ) + respond_with_json( + request, + 500, + {"error": "Internal server error"}, + send_cors=True ) - else: - logger.exception(e) - outgoing_responses_counter.inc(request.method, str(code)) - respond_with_json( - request, code, cs_exception(e), send_cors=True, - pretty_print=_request_user_agent_is_curl(request), - version_string=self.version_string, - ) - except: - code = 500 - logger.exception( - "Failed handle request %s.%s on %r: %r", - request_handler.__module__, - request_handler.__name__, - self, - request - ) - respond_with_json( - request, - 500, - {"error": "Internal server error"}, - send_cors=True - ) - finally: - code = str(code) if code else "-" - end = self.clock.time_msec() - logger.info( - "%s - %s - {%s}" - " Processed request: %dms %sB %s \"%s %s %s\" \"%s\"", - request.getClientIP(), - request.site_tag, - request.authenticated_entity, - end-start, - request.sentLength, - code, - request.method, - request.get_redacted_uri(), - request.clientproto, - request.get_user_agent(), - ) return wrapped_request_handler From fb7def33446ed8cb01bc6e57cd56a7737b3be8b6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 16 Jun 2015 10:09:43 +0100 Subject: [PATCH 15/36] Remove access_token from synapse.rest.client.v1.transactions {get,store}_response logging --- synapse/rest/client/v1/transactions.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/rest/client/v1/transactions.py b/synapse/rest/client/v1/transactions.py index d933fea18a..b861069b89 100644 --- a/synapse/rest/client/v1/transactions.py +++ b/synapse/rest/client/v1/transactions.py @@ -39,10 +39,10 @@ class HttpTransactionStore(object): A tuple of (HTTP response code, response content) or None. """ try: - logger.debug("get_response Key: %s TxnId: %s", key, txn_id) + logger.debug("get_response TxnId: %s", txn_id) (last_txn_id, response) = self.transactions[key] if txn_id == last_txn_id: - logger.info("get_response: Returning a response for %s", key) + logger.info("get_response: Returning a response for %s", txn_id) return response except KeyError: pass @@ -58,7 +58,7 @@ class HttpTransactionStore(object): txn_id (str): The transaction ID for this request. response (tuple): A tuple of (HTTP response code, response content) """ - logger.debug("store_response Key: %s TxnId: %s", key, txn_id) + logger.debug("store_response TxnId: %s", txn_id) self.transactions[key] = (txn_id, response) def store_client_transaction(self, request, txn_id, response): From 9a3cd1c00db95e894882b7ab4308022a3b014974 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Tue, 16 Jun 2015 16:03:35 +0100 Subject: [PATCH 16/36] Correct -H SERVER_NAME in config-missing complaint message --- synapse/config/_base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/config/_base.py b/synapse/config/_base.py index d4163d6272..d483c67c6a 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -148,7 +148,7 @@ class Config(object): if not config_args.config_path: config_parser.error( "Must supply a config file.\nA config file can be automatically" - " generated using \"--generate-config -h SERVER_NAME" + " generated using \"--generate-config -H SERVER_NAME" " -c CONFIG-FILE\"" ) @@ -209,7 +209,7 @@ class Config(object): if not config_args.config_path: config_parser.error( "Must supply a config file.\nA config file can be automatically" - " generated using \"--generate-config -h SERVER_NAME" + " generated using \"--generate-config -H SERVER_NAME" " -c CONFIG-FILE\"" ) From 04604062984a6dc27174fa42e4c9f91fbe7b0c42 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 16 Jun 2015 16:59:38 +0100 Subject: [PATCH 17/36] Don't do unecessary db ops in presence.get_state --- synapse/handlers/message.py | 1 + synapse/handlers/presence.py | 28 ++++++++++++++-------------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 867fdbefb0..1ed9e961f1 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -403,6 +403,7 @@ class MessageHandler(BaseHandler): target_user=UserID.from_string(m.user_id), auth_user=auth_user, as_event=True, + check_auth=False, ) presence.append(member_presence) except SynapseError: diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 023ad33ab0..7c03198313 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -191,24 +191,24 @@ class PresenceHandler(BaseHandler): defer.returnValue(False) @defer.inlineCallbacks - def get_state(self, target_user, auth_user, as_event=False): + def get_state(self, target_user, auth_user, as_event=False, check_auth=True): if self.hs.is_mine(target_user): - visible = yield self.is_presence_visible( - observer_user=auth_user, - observed_user=target_user - ) + if check_auth: + visible = yield self.is_presence_visible( + observer_user=auth_user, + observed_user=target_user + ) - if not visible: - raise SynapseError(404, "Presence information not visible") - state = yield self.store.get_presence_state(target_user.localpart) - if "mtime" in state: - del state["mtime"] - state["presence"] = state.pop("state") + if not visible: + raise SynapseError(404, "Presence information not visible") if target_user in self._user_cachemap: - cached_state = self._user_cachemap[target_user].get_state() - if "last_active" in cached_state: - state["last_active"] = cached_state["last_active"] + state = self._user_cachemap[target_user].get_state() + else: + state = yield self.store.get_presence_state(target_user.localpart) + if "mtime" in state: + del state["mtime"] + state["presence"] = state.pop("state") else: # TODO(paul): Have remote server send us permissions set state = self._get_or_offline_usercache(target_user).get_state() From b849a64f8d467a7b1159b4e8c4db4f3d73696f78 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 16 Jun 2015 17:03:24 +0100 Subject: [PATCH 18/36] Use DeferredList --- synapse/handlers/message.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 1ed9e961f1..de9c6da9ec 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -396,20 +396,20 @@ class MessageHandler(BaseHandler): ] presence_handler = self.hs.get_handlers().presence_handler - presence = [] - for m in room_members: - try: - member_presence = yield presence_handler.get_state( + presence_defs = yield defer.DeferredList( + [ + presence_handler.get_state( target_user=UserID.from_string(m.user_id), auth_user=auth_user, as_event=True, check_auth=False, ) - presence.append(member_presence) - except SynapseError: - logger.exception( - "Failed to get member presence of %r", m.user_id - ) + for m in room_members + ], + consumeErrors=True, + ) + + presence = [p for success, p in presence_defs if success] time_now = self.clock.time_msec() From eceb554a2f3e5bc3d1eb8b4f87c756dfd4c48340 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 16 Jun 2015 17:12:27 +0100 Subject: [PATCH 19/36] Use another deferred list --- synapse/handlers/message.py | 45 ++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index de9c6da9ec..e324662f18 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -380,15 +380,6 @@ class MessageHandler(BaseHandler): if limit is None: limit = 10 - messages, token = yield self.store.get_recent_events_for_room( - room_id, - limit=limit, - end_token=now_token.room_key, - ) - - start_token = now_token.copy_and_replace("room_key", token[0]) - end_token = now_token.copy_and_replace("room_key", token[1]) - room_members = [ m for m in current_state.values() if m.type == EventTypes.Member @@ -396,20 +387,38 @@ class MessageHandler(BaseHandler): ] presence_handler = self.hs.get_handlers().presence_handler - presence_defs = yield defer.DeferredList( + + @defer.inlineCallbacks + def get_presence(): + presence_defs = yield defer.DeferredList( + [ + presence_handler.get_state( + target_user=UserID.from_string(m.user_id), + auth_user=auth_user, + as_event=True, + check_auth=False, + ) + for m in room_members + ], + consumeErrors=True, + ) + + defer.returnValue([p for success, p in presence_defs if success]) + + presence, (messages, token) = yield defer.gatherResults( [ - presence_handler.get_state( - target_user=UserID.from_string(m.user_id), - auth_user=auth_user, - as_event=True, - check_auth=False, + get_presence(), + self.store.get_recent_events_for_room( + room_id, + limit=limit, + end_token=now_token.room_key, ) - for m in room_members ], consumeErrors=True, - ) + ).addErrback(unwrapFirstError) - presence = [p for success, p in presence_defs if success] + start_token = now_token.copy_and_replace("room_key", token[0]) + end_token = now_token.copy_and_replace("room_key", token[1]) time_now = self.clock.time_msec() From d88e20cdb988a44155ef087ef28cd37c982cfe5d Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 17 Jun 2015 17:26:03 +0100 Subject: [PATCH 20/36] Fix bug where synapse was sending AS user queries incorrectly. Bug introduced in 92b20713d7c6346aeb20dc09963081e472752bb5 which reversed the comparison when checking if a user existed in the users table. Added UTs to prevent this happening again. --- synapse/handlers/appservice.py | 2 +- tests/handlers/test_appservice.py | 43 +++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 8269482e47..1240e51649 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -177,7 +177,7 @@ class ApplicationServicesHandler(object): return user_info = yield self.store.get_user_by_id(user_id) - if not user_info: + if user_info: defer.returnValue(False) return diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 06cb1dd4cf..9e95d1e532 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -57,6 +57,49 @@ class AppServiceHandlerTestCase(unittest.TestCase): interested_service, event ) + @defer.inlineCallbacks + def test_query_user_exists_unknown_user(self): + user_id = "@someone:anywhere" + services = [self._mkservice(is_interested=True)] + services[0].is_interested_in_user = Mock(return_value=True) + self.mock_store.get_app_services = Mock(return_value=services) + self.mock_store.get_user_by_id = Mock(return_value=None) + + event = Mock( + sender=user_id, + type="m.room.message", + room_id="!foo:bar" + ) + self.mock_as_api.push = Mock() + self.mock_as_api.query_user = Mock() + yield self.handler.notify_interested_services(event) + self.mock_as_api.query_user.assert_called_once_with( + services[0], user_id + ) + + @defer.inlineCallbacks + def test_query_user_exists_known_user(self): + user_id = "@someone:anywhere" + services = [self._mkservice(is_interested=True)] + services[0].is_interested_in_user = Mock(return_value=True) + self.mock_store.get_app_services = Mock(return_value=services) + self.mock_store.get_user_by_id = Mock(return_value={ + "name": user_id + }) + + event = Mock( + sender=user_id, + type="m.room.message", + room_id="!foo:bar" + ) + self.mock_as_api.push = Mock() + self.mock_as_api.query_user = Mock() + yield self.handler.notify_interested_services(event) + self.assertFalse( + self.mock_as_api.query_user.called, + "query_user called when it shouldn't have been." + ) + @defer.inlineCallbacks def test_query_room_alias_exists(self): room_alias_str = "#foo:bar" From 050ebccf309916c57c58b67776e66d98fffbff0f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jun 2015 11:36:26 +0100 Subject: [PATCH 21/36] Fix notifier leak --- synapse/notifier.py | 41 ++++++++++++++++++++--------------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 078abfc56d..27c034ed51 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -308,49 +308,48 @@ class Notifier(object): else: current_token = user_stream.current_token - listener = [_NotificationListener(deferred)] - - if timeout and not current_token.is_after(from_token): - user_stream.listeners.add(listener[0]) - + result = None if current_token.is_after(from_token): result = yield callback(from_token, current_token) - else: - result = None - - timer = [None] if result: - user_stream.listeners.discard(listener[0]) defer.returnValue(result) - return if timeout: + timer = [None] + listeners = [] timed_out = [False] + def notify_listeners(): + user_stream.listeners.difference_update(listeners) + for listener in listeners: + listener.notify(current_token) + del listeners[:] + def _timeout_listener(): timed_out[0] = True timer[0] = None - user_stream.listeners.discard(listener[0]) - listener[0].notify(current_token) + notify_listeners() # We create multiple notification listeners so we have to manage # canceling the timeout ourselves. timer[0] = self.clock.call_later(timeout/1000., _timeout_listener) while not result and not timed_out[0]: - new_token = yield deferred deferred = defer.Deferred() - listener[0] = _NotificationListener(deferred) - user_stream.listeners.add(listener[0]) + notify_listeners() + listeners.append(_NotificationListener(deferred)) + user_stream.listeners.update(listeners) + new_token = yield deferred + result = yield callback(current_token, new_token) current_token = new_token - if timer[0] is not None: - try: - self.clock.cancel_call_later(timer[0]) - except: - logger.exception("Failed to cancel notifer timer") + if timer[0] is not None: + try: + self.clock.cancel_call_later(timer[0]) + except: + logger.exception("Failed to cancel notifer timer") defer.returnValue(result) From 22049ea700173017cf2f8e88fb8848e06b82f9b3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jun 2015 15:49:05 +0100 Subject: [PATCH 22/36] Refactor the notifier.wait_for_events code to be clearer. Add _NotifierUserStream.new_listener that accpets a token to avoid races. --- synapse/notifier.py | 120 ++++++++++++++++++--------------------- synapse/util/__init__.py | 8 ++- synapse/util/async.py | 13 ++++- 3 files changed, 72 insertions(+), 69 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 27c034ed51..e441561029 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function -from synapse.util.async import run_on_reactor +from synapse.util.async import run_on_reactor, ObservableDeferred from synapse.types import StreamToken import synapse.metrics @@ -45,20 +45,16 @@ class _NotificationListener(object): The events stream handler will have yielded to the deferred, so to notify the handler it is sufficient to resolve the deferred. """ + __slots__ = ["deferred"] def __init__(self, deferred): - self.deferred = deferred + object.__setattr__(self, "deferred", deferred) - def notified(self): - return self.deferred.called + def __getattr__(self, name): + return getattr(self.deferred, name) - def notify(self, token): - """ Inform whoever is listening about the new events. - """ - try: - self.deferred.callback(token) - except defer.AlreadyCalledError: - pass + def __setattr__(self, name, value): + setattr(self.deferred, name, value) class _NotifierUserStream(object): @@ -75,11 +71,12 @@ class _NotifierUserStream(object): appservice=None): self.user = str(user) self.appservice = appservice - self.listeners = set() self.rooms = set(rooms) self.current_token = current_token self.last_notified_ms = time_now_ms + self.notify_deferred = ObservableDeferred(defer.Deferred()) + def notify(self, stream_key, stream_id, time_now_ms): """Notify any listeners for this user of a new event from an event source. @@ -91,12 +88,10 @@ class _NotifierUserStream(object): self.current_token = self.current_token.copy_and_advance( stream_key, stream_id ) - if self.listeners: - self.last_notified_ms = time_now_ms - listeners = self.listeners - self.listeners = set() - for listener in listeners: - listener.notify(self.current_token) + self.last_notified_ms = time_now_ms + noify_deferred = self.notify_deferred + self.notify_deferred = ObservableDeferred(defer.Deferred()) + noify_deferred.callback(self.current_token) def remove(self, notifier): """ Remove this listener from all the indexes in the Notifier @@ -114,6 +109,18 @@ class _NotifierUserStream(object): self.appservice, set() ).discard(self) + def count_listeners(self): + return len(self.noify_deferred.observers()) + + def new_listener(self, token): + """Returns a deferred that is resolved when there is a new token + greater than the given token. + """ + if self.current_token.is_after(token): + return _NotificationListener(defer.succeed(self.current_token)) + else: + return _NotificationListener(self.notify_deferred.observe()) + class Notifier(object): """ This class is responsible for notifying any listeners when there are @@ -158,7 +165,7 @@ class Notifier(object): for x in self.appservice_to_user_streams.values(): all_user_streams |= x - return sum(len(stream.listeners) for stream in all_user_streams) + return sum(stream.count_listeners() for stream in all_user_streams) metrics.register_callback("listeners", count_listeners) metrics.register_callback( @@ -286,10 +293,6 @@ class Notifier(object): """Wait until the callback returns a non empty response or the timeout fires. """ - - deferred = defer.Deferred() - time_now_ms = self.clock.time_msec() - user = str(user) user_stream = self.user_to_user_stream.get(user) if user_stream is None: @@ -302,54 +305,38 @@ class Notifier(object): rooms=rooms, appservice=appservice, current_token=current_token, - time_now_ms=time_now_ms, + time_now_ms=self.clock.time_msec(), ) self._register_with_keys(user_stream) - else: - current_token = user_stream.current_token result = None - if current_token.is_after(from_token): - result = yield callback(from_token, current_token) - - if result: - defer.returnValue(result) - if timeout: - timer = [None] - listeners = [] - timed_out = [False] + listener = None + timer = self.clock.call_later( + timeout/1000., lambda: listener.cancel() + ) - def notify_listeners(): - user_stream.listeners.difference_update(listeners) - for listener in listeners: - listener.notify(current_token) - del listeners[:] - - def _timeout_listener(): - timed_out[0] = True - timer[0] = None - notify_listeners() - - # We create multiple notification listeners so we have to manage - # canceling the timeout ourselves. - timer[0] = self.clock.call_later(timeout/1000., _timeout_listener) - - while not result and not timed_out[0]: - deferred = defer.Deferred() - notify_listeners() - listeners.append(_NotificationListener(deferred)) - user_stream.listeners.update(listeners) - new_token = yield deferred - - result = yield callback(current_token, new_token) - current_token = new_token - - if timer[0] is not None: + prev_token = from_token + while not result: try: - self.clock.cancel_call_later(timer[0]) - except: - logger.exception("Failed to cancel notifer timer") + # We need to start listening to the streams *before* doing + # the callback, as otherwise we may miss something. + current_token = user_stream.current_token + + result = yield callback(prev_token, current_token) + if result: + break + + prev_token = current_token + listener = user_stream.new_listener(prev_token) + yield listener.deferred + except defer.CancelledError: + break + + self.clock.cancel_call_later(timer, ignore_errs=True) + else: + current_token = user_stream.current_token + result = yield callback(from_token, current_token) defer.returnValue(result) @@ -367,6 +354,9 @@ class Notifier(object): @defer.inlineCallbacks def check_for_updates(before_token, after_token): + if not after_token.is_after(before_token): + defer.returnValue(None) + events = [] end_token = from_token for name, source in self.event_sources.sources.items(): @@ -401,7 +391,7 @@ class Notifier(object): expired_streams = [] expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS for stream in self.user_to_user_stream.values(): - if stream.listeners: + if stream.count_listeners(): continue if stream.last_notified_ms < expire_before_ts: expired_streams.append(stream) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 260714ccc2..07ff25cef3 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -91,8 +91,12 @@ class Clock(object): with PreserveLoggingContext(): return reactor.callLater(delay, wrapped_callback, *args, **kwargs) - def cancel_call_later(self, timer): - timer.cancel() + def cancel_call_later(self, timer, ignore_errs=False): + try: + timer.cancel() + except: + if not ignore_errs: + raise def time_bound_deferred(self, given_deferred, time_out): if given_deferred.called: diff --git a/synapse/util/async.py b/synapse/util/async.py index 1c2044e5b4..6f567bcaa6 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -45,7 +45,7 @@ class ObservableDeferred(object): def __init__(self, deferred, consumeErrors=False): object.__setattr__(self, "_deferred", deferred) object.__setattr__(self, "_result", None) - object.__setattr__(self, "_observers", []) + object.__setattr__(self, "_observers", set()) def callback(r): self._result = (True, r) @@ -74,12 +74,21 @@ class ObservableDeferred(object): def observe(self): if not self._result: d = defer.Deferred() - self._observers.append(d) + + def remove(r): + self._observers.discard(d) + return r + d.addBoth(remove) + + self._observers.add(d) return d else: success, res = self._result return defer.succeed(res) if success else defer.fail(res) + def observers(self): + return self._observers + def __getattr__(self, name): return getattr(self._deferred, name) From 1f24c2e5896c1c9e5c7fded779fdb0ddcf2a3801 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jun 2015 16:09:53 +0100 Subject: [PATCH 23/36] Don't bother proxying lookups on _NotificationListener to underlying deferred --- synapse/notifier.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index e441561029..053475a2f5 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -48,13 +48,7 @@ class _NotificationListener(object): __slots__ = ["deferred"] def __init__(self, deferred): - object.__setattr__(self, "deferred", deferred) - - def __getattr__(self, name): - return getattr(self.deferred, name) - - def __setattr__(self, name, value): - setattr(self.deferred, name, value) + self.deferred = deferred class _NotifierUserStream(object): @@ -313,14 +307,12 @@ class Notifier(object): if timeout: listener = None timer = self.clock.call_later( - timeout/1000., lambda: listener.cancel() + timeout/1000., lambda: listener.deferred.cancel() ) prev_token = from_token while not result: try: - # We need to start listening to the streams *before* doing - # the callback, as otherwise we may miss something. current_token = user_stream.current_token result = yield callback(prev_token, current_token) From 73513ececce51427971d49f0d55bfa76dafc391e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jun 2015 16:15:10 +0100 Subject: [PATCH 24/36] Documentation --- synapse/notifier.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 053475a2f5..5475ee36ca 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -305,10 +305,13 @@ class Notifier(object): result = None if timeout: - listener = None - timer = self.clock.call_later( - timeout/1000., lambda: listener.deferred.cancel() - ) + listener = None # Will be set to a _NotificationListener that + # we'll be waiting on. Allows us to cancel it. + + def timed_out(): + if listener: + listener.deferred.cancel() + timer = self.clock.call_later(timeout/1000., timed_out) prev_token = from_token while not result: @@ -319,6 +322,10 @@ class Notifier(object): if result: break + # Now we wait for the _NotifierUserStream to be told there + # is a new token. + # We need to supply the token we supplied to callback so + # that we don't miss any current_token updates. prev_token = current_token listener = user_stream.new_listener(prev_token) yield listener.deferred From 6f6ebd216d6a84ee72f1becfb31bf40ba960e3e2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jun 2015 17:00:32 +0100 Subject: [PATCH 25/36] PEP8 --- synapse/notifier.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 5475ee36ca..d6655f3f5a 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -305,8 +305,9 @@ class Notifier(object): result = None if timeout: - listener = None # Will be set to a _NotificationListener that - # we'll be waiting on. Allows us to cancel it. + # Will be set to a _NotificationListener that we'll be waiting on. + # Allows us to cancel it. + listener = None def timed_out(): if listener: From 9e5a353663c111cfe34b727f75eed3616201ef72 Mon Sep 17 00:00:00 2001 From: Eric Myhre Date: Thu, 18 Jun 2015 23:38:20 -0500 Subject: [PATCH 26/36] Make upload dir a configurable path. Fixes SYN-425. Signed-off-by: Eric Myhre --- synapse/app/homeserver.py | 3 +-- synapse/config/repository.py | 5 +++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 95e9122d3e..9aec23d065 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -112,7 +112,7 @@ class SynapseHomeServer(HomeServer): def build_resource_for_content_repo(self): return ContentRepoResource( - self, self.upload_dir, self.auth, self.content_addr + self, self.config.uploads_path, self.auth, self.content_addr ) def build_resource_for_media_repository(self): @@ -375,7 +375,6 @@ def setup(config_options): hs = SynapseHomeServer( config.server_name, - upload_dir=os.path.abspath("uploads"), db_config=config.database_config, tls_context_factory=tls_context_factory, config=config, diff --git a/synapse/config/repository.py b/synapse/config/repository.py index adaf4e4bb2..6891abd71d 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -21,13 +21,18 @@ class ContentRepositoryConfig(Config): self.max_upload_size = self.parse_size(config["max_upload_size"]) self.max_image_pixels = self.parse_size(config["max_image_pixels"]) self.media_store_path = self.ensure_directory(config["media_store_path"]) + self.uploads_path = self.ensure_directory(config["uploads_path"]) def default_config(self, config_dir_path, server_name): media_store = self.default_path("media_store") + uploads_path = self.default_path("uploads") return """ # Directory where uploaded images and attachments are stored. media_store_path: "%(media_store)s" + # Directory where in-progress uploads are stored. + uploads_path: "%(uploads_path)s" + # The largest allowed upload size in bytes max_upload_size: "10M" From ad460a83151d3c3d7c4980e6312537871a3234ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Jun 2015 09:23:26 +0100 Subject: [PATCH 27/36] Add Eric Myhre to AUTHORS --- AUTHORS.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/AUTHORS.rst b/AUTHORS.rst index 3a457cd9fc..d7224ff5de 100644 --- a/AUTHORS.rst +++ b/AUTHORS.rst @@ -38,3 +38,7 @@ Brabo Ivan Shapovalov * contrib/systemd: a sample systemd unit file and a logger configuration + +Eric Myhre + * Fix bug where ``media_store_path`` config option was ignored by v0 content + repository API. From 9d112f444036eec00bfc44f8947f60fd48f9c7e1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Jun 2015 10:13:03 +0100 Subject: [PATCH 28/36] Add IDs to outbound transactions --- synapse/http/matrixfederationclient.py | 124 ++++++++++++++----------- 1 file changed, 71 insertions(+), 53 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 7f3d8fc884..902b278419 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -35,11 +35,13 @@ from syutil.crypto.jsonsign import sign_json import simplejson as json import logging +import sys import urllib import urlparse logger = logging.getLogger(__name__) +outbound_logger = logging.getLogger("synapse.http.outbound") metrics = synapse.metrics.get_metrics_for(__name__) @@ -109,6 +111,8 @@ class MatrixFederationHttpClient(object): self.clock = hs.get_clock() self.version_string = hs.version_string + self._next_id = 1 + @defer.inlineCallbacks def _create_request(self, destination, method, path_bytes, body_callback, headers_dict={}, param_bytes=b"", @@ -123,8 +127,13 @@ class MatrixFederationHttpClient(object): ("", "", path_bytes, param_bytes, query_bytes, "",) ) - logger.info("Sending request to %s: %s %s", - destination, method, url_bytes) + txn_id = "%s-%s" % (method, self._next_id) + self._next_id = (self._next_id + 1) % (sys.maxint - 1) + + outbound_logger.info( + "{%s} [%s] Sending request: %s %s", + txn_id, destination, method, url_bytes + ) logger.debug( "Types: %s", @@ -141,63 +150,72 @@ class MatrixFederationHttpClient(object): endpoint = self._getEndpoint(reactor, destination) - while True: - producer = None - if body_callback: - producer = body_callback(method, url_bytes, headers_dict) + log_result = None + try: + while True: + producer = None + if body_callback: + producer = body_callback(method, url_bytes, headers_dict) - try: - request_deferred = preserve_context_over_fn( - self.agent.request, - destination, - endpoint, - method, - path_bytes, - param_bytes, - query_bytes, - Headers(headers_dict), - producer - ) - - response = yield self.clock.time_bound_deferred( - request_deferred, - time_out=timeout/1000. if timeout else 60, - ) - - logger.debug("Got response to %s", method) - break - except Exception as e: - if not retry_on_dns_fail and isinstance(e, DNSLookupError): - logger.warn( - "DNS Lookup failed to %s with %s", + try: + request_deferred = preserve_context_over_fn( + self.agent.request, destination, - e + endpoint, + method, + path_bytes, + param_bytes, + query_bytes, + Headers(headers_dict), + producer ) - raise - logger.warn( - "Sending request failed to %s: %s %s: %s - %s", - destination, - method, - url_bytes, - type(e).__name__, - _flatten_response_never_received(e), - ) + response = yield self.clock.time_bound_deferred( + request_deferred, + time_out=timeout/1000. if timeout else 60, + ) - if retries_left and not timeout: - yield sleep(2 ** (5 - retries_left)) - retries_left -= 1 - else: - raise + logger.debug("{%s} Got response to %s", txn_id, method) + log_result = "%d %s" % (response.code, response.phrase,) + break + except Exception as e: + if not retry_on_dns_fail and isinstance(e, DNSLookupError): + logger.warn( + "DNS Lookup failed to %s with %s", + destination, + e + ) + log_result = "DNS Lookup failed to %s with %s" % ( + destination, e + ) + raise - logger.info( - "Received response %d %s for %s: %s %s", - response.code, - response.phrase, - destination, - method, - url_bytes - ) + logger.warn( + "{%s} Sending request failed to %s: %s %s: %s - %s", + txn_id, + destination, + method, + url_bytes, + type(e).__name__, + _flatten_response_never_received(e), + ) + + log_result = "%s - %s" % ( + type(e).__name__, _flatten_response_never_received(e), + ) + + if retries_left and not timeout: + yield sleep(2 ** (5 - retries_left)) + retries_left -= 1 + else: + raise + finally: + outbound_logger.info( + "{%s} [%s] Result: %s", + txn_id, + destination, + log_result, + ) if 200 <= response.code < 300: pass From eb928c9f52697bef99b982a1aa5229a20e65c447 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Jun 2015 10:16:48 +0100 Subject: [PATCH 29/36] Add site_tag to logger --- synapse/app/homeserver.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 7effedf7dc..c02fc889c8 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -200,7 +200,7 @@ class SynapseHomeServer(HomeServer): reactor.listenSSL( port, SynapseSite( - "synapse.access.https", + "synapse.access.https.%s" % (site_tag,), site_tag, listener_config, root_resource, @@ -212,7 +212,7 @@ class SynapseHomeServer(HomeServer): reactor.listenTCP( port, SynapseSite( - "synapse.access.https", + "synapse.access.http.%s" % (site_tag,), site_tag, listener_config, root_resource, From 18968efa0afacc72b91d626d1a6adc2d5476b130 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Jun 2015 10:18:02 +0100 Subject: [PATCH 30/36] Remove stale debug lines --- synapse/http/matrixfederationclient.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 902b278419..1b90692731 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -135,15 +135,6 @@ class MatrixFederationHttpClient(object): txn_id, destination, method, url_bytes ) - logger.debug( - "Types: %s", - [ - type(destination), type(method), type(path_bytes), - type(param_bytes), - type(query_bytes) - ] - ) - # XXX: Would be much nicer to retry only at the transaction-layer # (once we have reliable transactions in place) retries_left = 5 @@ -175,7 +166,6 @@ class MatrixFederationHttpClient(object): time_out=timeout/1000. if timeout else 60, ) - logger.debug("{%s} Got response to %s", txn_id, method) log_result = "%d %s" % (response.code, response.phrase,) break except Exception as e: From 653533a3dac1790f218aa4978f775f8098656b11 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Jun 2015 11:45:55 +0100 Subject: [PATCH 31/36] Fix log context when sending requests --- synapse/http/client.py | 10 ++--- synapse/http/matrixfederationclient.py | 58 +++++++++++++------------- synapse/util/logcontext.py | 52 ++++++++++++++--------- 3 files changed, 68 insertions(+), 52 deletions(-) diff --git a/synapse/http/client.py b/synapse/http/client.py index 9091ae2d38..49737d55da 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -109,7 +109,7 @@ class SimpleHttpClient(object): bodyProducer=FileBodyProducer(StringIO(query_bytes)) ) - body = yield readBody(response) + body = yield preserve_context_over_fn(readBody, response) defer.returnValue(json.loads(body)) @@ -128,7 +128,7 @@ class SimpleHttpClient(object): bodyProducer=FileBodyProducer(StringIO(json_str)) ) - body = yield readBody(response) + body = yield preserve_context_over_fn(readBody, response) defer.returnValue(json.loads(body)) @@ -161,7 +161,7 @@ class SimpleHttpClient(object): }) ) - body = yield readBody(response) + body = yield preserve_context_over_fn(readBody, response) if 200 <= response.code < 300: defer.returnValue(json.loads(body)) @@ -204,7 +204,7 @@ class SimpleHttpClient(object): bodyProducer=FileBodyProducer(StringIO(json_str)) ) - body = yield readBody(response) + body = yield preserve_context_over_fn(readBody, response) if 200 <= response.code < 300: defer.returnValue(json.loads(body)) @@ -238,7 +238,7 @@ class CaptchaServerHttpClient(SimpleHttpClient): ) try: - body = yield readBody(response) + body = yield preserve_context_over_fn(readBody, response) defer.returnValue(body) except PartialDownloadError as e: # twisted dislikes google's response, no content length. diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 1b90692731..ed47e701e7 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -127,7 +127,7 @@ class MatrixFederationHttpClient(object): ("", "", path_bytes, param_bytes, query_bytes, "",) ) - txn_id = "%s-%s" % (method, self._next_id) + txn_id = "%s-O-%s" % (method, self._next_id) self._next_id = (self._next_id + 1) % (sys.maxint - 1) outbound_logger.info( @@ -139,7 +139,9 @@ class MatrixFederationHttpClient(object): # (once we have reliable transactions in place) retries_left = 5 - endpoint = self._getEndpoint(reactor, destination) + endpoint = preserve_context_over_fn( + self._getEndpoint, reactor, destination + ) log_result = None try: @@ -149,21 +151,25 @@ class MatrixFederationHttpClient(object): producer = body_callback(method, url_bytes, headers_dict) try: - request_deferred = preserve_context_over_fn( - self.agent.request, - destination, - endpoint, - method, - path_bytes, - param_bytes, - query_bytes, - Headers(headers_dict), - producer - ) + def send_request(): + request_deferred = self.agent.request( + destination, + endpoint, + method, + path_bytes, + param_bytes, + query_bytes, + Headers(headers_dict), + producer + ) - response = yield self.clock.time_bound_deferred( - request_deferred, - time_out=timeout/1000. if timeout else 60, + return self.clock.time_bound_deferred( + request_deferred, + time_out=timeout/1000. if timeout else 60, + ) + + response = yield preserve_context_over_fn( + send_request, ) log_result = "%d %s" % (response.code, response.phrase,) @@ -212,7 +218,7 @@ class MatrixFederationHttpClient(object): else: # :'( # Update transactions table? - body = yield readBody(response) + body = yield preserve_context_over_fn(readBody, response) raise HttpResponseException( response.code, response.phrase, body ) @@ -292,10 +298,7 @@ class MatrixFederationHttpClient(object): "Content-Type not application/json" ) - logger.debug("Getting resp body") - body = yield readBody(response) - logger.debug("Got resp body") - + body = yield preserve_context_over_fn(readBody, response) defer.returnValue(json.loads(body)) @defer.inlineCallbacks @@ -338,9 +341,7 @@ class MatrixFederationHttpClient(object): "Content-Type not application/json" ) - logger.debug("Getting resp body") - body = yield readBody(response) - logger.debug("Got resp body") + body = yield preserve_context_over_fn(readBody, response) defer.returnValue(json.loads(body)) @@ -398,9 +399,7 @@ class MatrixFederationHttpClient(object): "Content-Type not application/json" ) - logger.debug("Getting resp body") - body = yield readBody(response) - logger.debug("Got resp body") + body = yield preserve_context_over_fn(readBody, response) defer.returnValue(json.loads(body)) @@ -443,7 +442,10 @@ class MatrixFederationHttpClient(object): headers = dict(response.headers.getAllRawHeaders()) try: - length = yield _readBodyToFile(response, output_stream, max_size) + length = yield preserve_context_over_fn( + _readBodyToFile, + response, output_stream, max_size + ) except: logger.exception("Failed to download body") raise diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index a92d518b43..7e6062c1b8 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -140,6 +140,37 @@ class PreserveLoggingContext(object): ) +class _PreservingContextDeferred(defer.Deferred): + """A deferred that ensures that all callbacks and errbacks are called with + the given logging context. + """ + def __init__(self, context): + self._log_context = context + defer.Deferred.__init__(self) + + def addCallbacks(self, callback, errback=None, + callbackArgs=None, callbackKeywords=None, + errbackArgs=None, errbackKeywords=None): + callback = self._wrap_callback(callback) + errback = self._wrap_callback(errback) + return defer.Deferred.addCallbacks( + self, callback, + errback=errback, + callbackArgs=callbackArgs, + callbackKeywords=callbackKeywords, + errbackArgs=errbackArgs, + errbackKeywords=errbackKeywords, + ) + + def _wrap_callback(self, f): + def g(res, *args, **kwargs): + with PreserveLoggingContext(): + LoggingContext.thread_local.current_context = self._log_context + res = f(res, *args, **kwargs) + return res + return g + + def preserve_context_over_fn(fn, *args, **kwargs): """Takes a function and invokes it with the given arguments, but removes and restores the current logging context while doing so. @@ -160,24 +191,7 @@ def preserve_context_over_deferred(deferred): """Given a deferred wrap it such that any callbacks added later to it will be invoked with the current context. """ - d = defer.Deferred() - current_context = LoggingContext.current_context() - - def cb(res): - with PreserveLoggingContext(): - LoggingContext.thread_local.current_context = current_context - res = d.callback(res) - return res - - def eb(failure): - with PreserveLoggingContext(): - LoggingContext.thread_local.current_context = current_context - res = d.errback(failure) - return res - - if deferred.called: - return deferred - - deferred.addCallbacks(cb, eb) + d = _PreservingContextDeferred(current_context) + deferred.chainDeferred(d) return d From a68abc79fd90465aed6ead3eec1a5704c64a1682 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Jun 2015 11:48:55 +0100 Subject: [PATCH 32/36] Add comment on cancellation of observers --- synapse/util/async.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/util/async.py b/synapse/util/async.py index 6f567bcaa6..5a1d545c96 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -38,6 +38,9 @@ class ObservableDeferred(object): deferred. If consumeErrors is true errors will be captured from the origin deferred. + + Cancelling or otherwise resolving an observer will not affect the original + ObservableDeferred. """ __slots__ = ["_deferred", "_observers", "_result"] From 2f556e0c55c816b1344c88b964e6f86e397367d7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Jun 2015 16:22:53 +0100 Subject: [PATCH 33/36] Fix typo --- synapse/notifier.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index d6655f3f5a..bdd03dcbe8 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -104,7 +104,7 @@ class _NotifierUserStream(object): ).discard(self) def count_listeners(self): - return len(self.noify_deferred.observers()) + return len(self.notify_deferred.observers()) def new_listener(self, token): """Returns a deferred that is resolved when there is a new token From 9c72011fd72099f02b7b9ec0f55b83f472bd11a7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 Jun 2015 10:12:19 +0100 Subject: [PATCH 34/36] Bumb version --- synapse/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/__init__.py b/synapse/__init__.py index d111335a1a..1f09df79cd 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.9.2-r2" +__version__ = "0.9.3-rc1" From f043b14bc024e5b05cf4356b301f3ea75a4c9674 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 Jun 2015 10:22:42 +0100 Subject: [PATCH 35/36] Update change log --- CHANGES.rst | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index 62aecda27a..caebde3341 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,21 @@ +Changes in synapse v0.9.3 (2015-XX-XX) +====================================== + +General: + +* Fix a memory leak in the notifier. (SYN-412) +* Improve performance of room initial sync. (SYN-418) +* General improvements to logging. +* Remove ``access_token`` query params from ``INFO`` level logging. + +Configuration: + +* Add support for specifying and configuring multiple listeners. (SYN-389) + +Application services: + +* Fix bug where synapse failed to send user queries to application services. + Changes in synapse v0.9.2-r2 (2015-06-15) ========================================= From 480d7203882c71fdee15d464766ba459ac065133 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Jul 2015 15:12:32 +0100 Subject: [PATCH 36/36] Bump changelog and version to v0.9.3 --- CHANGES.rst | 7 ++++++- synapse/__init__.py | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index caebde3341..6a5fce899a 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,11 @@ -Changes in synapse v0.9.3 (2015-XX-XX) +Changes in synapse v0.9.3 (2015-07-01) ====================================== +No changes from v0.9.3 Release Candidate 1. + +Changes in synapse v0.9.3-rc1 (2015-06-23) +========================================== + General: * Fix a memory leak in the notifier. (SYN-412) diff --git a/synapse/__init__.py b/synapse/__init__.py index 1f09df79cd..96e37308d6 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.9.3-rc1" +__version__ = "0.9.3"