HTTP Replication Client (#15470)
Separate out a HTTP client for replication in preparation for also supporting using UNIX sockets. The major difference from the base class is that this does not use treq to handle HTTP requests.
This commit is contained in:
parent
ab4535b608
commit
d3bd03559b
|
@ -0,0 +1 @@
|
|||
Create new `Client` for use with HTTP Replication between workers. Contributed by Jason Little.
|
|
@ -74,8 +74,9 @@ from twisted.web.iweb import (
|
|||
from synapse.api.errors import Codes, HttpResponseException, SynapseError
|
||||
from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri
|
||||
from synapse.http.proxyagent import ProxyAgent
|
||||
from synapse.http.replicationagent import ReplicationAgent
|
||||
from synapse.http.types import QueryParams
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.logging.opentracing import set_tag, start_active_span, tags
|
||||
from synapse.types import ISynapseReactor
|
||||
from synapse.util import json_decoder
|
||||
|
@ -819,6 +820,136 @@ class SimpleHttpClient(BaseHttpClient):
|
|||
)
|
||||
|
||||
|
||||
class ReplicationClient(BaseHttpClient):
|
||||
"""Client for connecting to replication endpoints via HTTP and HTTPS.
|
||||
|
||||
Attributes:
|
||||
agent: The custom Twisted Agent used for constructing the connection.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hs: "HomeServer",
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
hs: The HomeServer instance to pass in
|
||||
"""
|
||||
super().__init__(hs)
|
||||
|
||||
# Use a pool, but a very small one.
|
||||
pool = HTTPConnectionPool(self.reactor)
|
||||
pool.maxPersistentPerHost = 5
|
||||
pool.cachedConnectionTimeout = 2 * 60
|
||||
|
||||
self.agent: IAgent = ReplicationAgent(
|
||||
hs.get_reactor(),
|
||||
contextFactory=hs.get_http_client_context_factory(),
|
||||
pool=pool,
|
||||
)
|
||||
|
||||
async def request(
|
||||
self,
|
||||
method: str,
|
||||
uri: str,
|
||||
data: Optional[bytes] = None,
|
||||
headers: Optional[Headers] = None,
|
||||
) -> IResponse:
|
||||
"""
|
||||
Make a request, differs from BaseHttpClient.request in that it does not use treq.
|
||||
|
||||
Args:
|
||||
method: HTTP method to use.
|
||||
uri: URI to query.
|
||||
data: Data to send in the request body, if applicable.
|
||||
headers: Request headers.
|
||||
|
||||
Returns:
|
||||
Response object, once the headers have been read.
|
||||
|
||||
Raises:
|
||||
RequestTimedOutError if the request times out before the headers are read
|
||||
|
||||
"""
|
||||
outgoing_requests_counter.labels(method).inc()
|
||||
|
||||
logger.debug("Sending request %s %s", method, uri)
|
||||
|
||||
with start_active_span(
|
||||
"outgoing-replication-request",
|
||||
tags={
|
||||
tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
|
||||
tags.HTTP_METHOD: method,
|
||||
tags.HTTP_URL: uri,
|
||||
},
|
||||
finish_on_close=True,
|
||||
):
|
||||
try:
|
||||
body_producer = None
|
||||
if data is not None:
|
||||
body_producer = QuieterFileBodyProducer(
|
||||
BytesIO(data),
|
||||
cooperator=self._cooperator,
|
||||
)
|
||||
|
||||
# Skip the fancy treq stuff, we don't need cookie handling, redirects,
|
||||
# or buffered response bodies.
|
||||
method_bytes = method.encode("ascii")
|
||||
uri_bytes = uri.encode("ascii")
|
||||
|
||||
# To preserve the logging context, the timeout is treated
|
||||
# in a similar way to `defer.gatherResults`:
|
||||
# * Each logging context-preserving fork is wrapped in
|
||||
# `run_in_background`. In this case there is only one,
|
||||
# since the timeout fork is not logging-context aware.
|
||||
# * The `Deferred` that joins the forks back together is
|
||||
# wrapped in `make_deferred_yieldable` to restore the
|
||||
# logging context regardless of the path taken.
|
||||
# (The logic/comments for this came from MatrixFederationHttpClient)
|
||||
request_deferred = run_in_background(
|
||||
self.agent.request,
|
||||
method_bytes,
|
||||
uri_bytes,
|
||||
headers,
|
||||
bodyProducer=body_producer,
|
||||
)
|
||||
|
||||
# we use our own timeout mechanism rather than twisted's as a workaround
|
||||
# for https://twistedmatrix.com/trac/ticket/9534.
|
||||
# (Updated url https://github.com/twisted/twisted/issues/9534)
|
||||
request_deferred = timeout_deferred(
|
||||
request_deferred,
|
||||
60,
|
||||
self.hs.get_reactor(),
|
||||
)
|
||||
|
||||
# turn timeouts into RequestTimedOutErrors
|
||||
request_deferred.addErrback(_timeout_to_request_timed_out_error)
|
||||
|
||||
response = await make_deferred_yieldable(request_deferred)
|
||||
|
||||
incoming_responses_counter.labels(method, response.code).inc()
|
||||
logger.info(
|
||||
"Received response to %s %s: %s",
|
||||
method,
|
||||
uri,
|
||||
response.code,
|
||||
)
|
||||
return response
|
||||
except Exception as e:
|
||||
incoming_responses_counter.labels(method, "ERR").inc()
|
||||
logger.info(
|
||||
"Error sending request to %s %s: %s %s",
|
||||
method,
|
||||
uri,
|
||||
type(e).__name__,
|
||||
e.args[0],
|
||||
)
|
||||
set_tag(tags.ERROR, True)
|
||||
set_tag("error_reason", e.args[0])
|
||||
raise
|
||||
|
||||
|
||||
def _timeout_to_request_timed_out_error(f: Failure) -> Failure:
|
||||
if f.check(twisted_error.TimeoutError, twisted_error.ConnectingCancelledError):
|
||||
# The TCP connection has its own timeout (set by the 'connectTimeout' param
|
||||
|
|
|
@ -0,0 +1,150 @@
|
|||
# Copyright 2023 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from zope.interface import implementer
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
|
||||
from twisted.internet.interfaces import IStreamClientEndpoint
|
||||
from twisted.python.failure import Failure
|
||||
from twisted.web.client import URI, HTTPConnectionPool, _AgentBase
|
||||
from twisted.web.error import SchemeNotSupported
|
||||
from twisted.web.http_headers import Headers
|
||||
from twisted.web.iweb import (
|
||||
IAgent,
|
||||
IAgentEndpointFactory,
|
||||
IBodyProducer,
|
||||
IPolicyForHTTPS,
|
||||
IResponse,
|
||||
)
|
||||
|
||||
from synapse.types import ISynapseReactor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@implementer(IAgentEndpointFactory)
|
||||
class ReplicationEndpointFactory:
|
||||
"""Connect to a given TCP socket"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
reactor: ISynapseReactor,
|
||||
context_factory: IPolicyForHTTPS,
|
||||
) -> None:
|
||||
self.reactor = reactor
|
||||
self.context_factory = context_factory
|
||||
|
||||
def endpointForURI(self, uri: URI) -> IStreamClientEndpoint:
|
||||
"""
|
||||
This part of the factory decides what kind of endpoint is being connected to.
|
||||
|
||||
Args:
|
||||
uri: The pre-parsed URI object containing all the uri data
|
||||
|
||||
Returns: The correct client endpoint object
|
||||
"""
|
||||
if uri.scheme in (b"http", b"https"):
|
||||
endpoint = HostnameEndpoint(self.reactor, uri.host, uri.port)
|
||||
if uri.scheme == b"https":
|
||||
endpoint = wrapClientTLS(
|
||||
self.context_factory.creatorForNetloc(uri.host, uri.port), endpoint
|
||||
)
|
||||
return endpoint
|
||||
else:
|
||||
raise SchemeNotSupported(f"Unsupported scheme: {uri.scheme!r}")
|
||||
|
||||
|
||||
@implementer(IAgent)
|
||||
class ReplicationAgent(_AgentBase):
|
||||
"""
|
||||
Client for connecting to replication endpoints via HTTP and HTTPS.
|
||||
|
||||
Much of this code is copied from Twisted's twisted.web.client.Agent.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
reactor: ISynapseReactor,
|
||||
contextFactory: IPolicyForHTTPS,
|
||||
connectTimeout: Optional[float] = None,
|
||||
bindAddress: Optional[bytes] = None,
|
||||
pool: Optional[HTTPConnectionPool] = None,
|
||||
):
|
||||
"""
|
||||
Create a ReplicationAgent.
|
||||
|
||||
Args:
|
||||
reactor: A reactor for this Agent to place outgoing connections.
|
||||
contextFactory: A factory for TLS contexts, to control the
|
||||
verification parameters of OpenSSL. The default is to use a
|
||||
BrowserLikePolicyForHTTPS, so unless you have special
|
||||
requirements you can leave this as-is.
|
||||
connectTimeout: The amount of time that this Agent will wait
|
||||
for the peer to accept a connection.
|
||||
bindAddress: The local address for client sockets to bind to.
|
||||
pool: An HTTPConnectionPool instance, or None, in which
|
||||
case a non-persistent HTTPConnectionPool instance will be
|
||||
created.
|
||||
"""
|
||||
_AgentBase.__init__(self, reactor, pool)
|
||||
endpoint_factory = ReplicationEndpointFactory(reactor, contextFactory)
|
||||
self._endpointFactory = endpoint_factory
|
||||
|
||||
def request(
|
||||
self,
|
||||
method: bytes,
|
||||
uri: bytes,
|
||||
headers: Optional[Headers] = None,
|
||||
bodyProducer: Optional[IBodyProducer] = None,
|
||||
) -> "defer.Deferred[IResponse]":
|
||||
"""
|
||||
Issue a request to the server indicated by the given uri.
|
||||
|
||||
An existing connection from the connection pool may be used or a new
|
||||
one may be created.
|
||||
|
||||
Currently, HTTP and HTTPS schemes are supported in uri.
|
||||
|
||||
This is copied from twisted.web.client.Agent, except:
|
||||
|
||||
* It uses a different pool key (combining the host & port).
|
||||
* It does not call _ensureValidURI(...) since it breaks on some
|
||||
UNIX paths.
|
||||
|
||||
See: twisted.web.iweb.IAgent.request
|
||||
"""
|
||||
parsedURI = URI.fromBytes(uri)
|
||||
try:
|
||||
endpoint = self._endpointFactory.endpointForURI(parsedURI)
|
||||
except SchemeNotSupported:
|
||||
return defer.fail(Failure())
|
||||
|
||||
# This sets the Pool key to be:
|
||||
# (http(s), <host:ip>)
|
||||
key = (parsedURI.scheme, parsedURI.netloc)
|
||||
|
||||
# _requestWithEndpoint comes from _AgentBase class
|
||||
return self._requestWithEndpoint(
|
||||
key,
|
||||
endpoint,
|
||||
method,
|
||||
parsedURI,
|
||||
headers,
|
||||
bodyProducer,
|
||||
parsedURI.originForm,
|
||||
)
|
|
@ -194,7 +194,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
|
|||
the `instance_map` config).
|
||||
"""
|
||||
clock = hs.get_clock()
|
||||
client = hs.get_simple_http_client()
|
||||
client = hs.get_replication_client()
|
||||
local_instance_name = hs.get_instance_name()
|
||||
|
||||
# The value of these option should match the replication listener settings
|
||||
|
|
|
@ -107,7 +107,11 @@ from synapse.handlers.stats import StatsHandler
|
|||
from synapse.handlers.sync import SyncHandler
|
||||
from synapse.handlers.typing import FollowerTypingHandler, TypingWriterHandler
|
||||
from synapse.handlers.user_directory import UserDirectoryHandler
|
||||
from synapse.http.client import InsecureInterceptableContextFactory, SimpleHttpClient
|
||||
from synapse.http.client import (
|
||||
InsecureInterceptableContextFactory,
|
||||
ReplicationClient,
|
||||
SimpleHttpClient,
|
||||
)
|
||||
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
|
||||
from synapse.media.media_repository import MediaRepository
|
||||
from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager
|
||||
|
@ -471,6 +475,13 @@ class HomeServer(metaclass=abc.ABCMeta):
|
|||
)
|
||||
return MatrixFederationHttpClient(self, tls_client_options_factory)
|
||||
|
||||
@cache_in_self
|
||||
def get_replication_client(self) -> ReplicationClient:
|
||||
"""
|
||||
An HTTP client for HTTP replication.
|
||||
"""
|
||||
return ReplicationClient(self)
|
||||
|
||||
@cache_in_self
|
||||
def get_room_creation_handler(self) -> RoomCreationHandler:
|
||||
return RoomCreationHandler(self)
|
||||
|
|
|
@ -228,6 +228,7 @@ class StateTestCase(unittest.TestCase):
|
|||
"get_macaroon_generator",
|
||||
"get_instance_name",
|
||||
"get_simple_http_client",
|
||||
"get_replication_client",
|
||||
"hostname",
|
||||
]
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue