synapse-old/synapse/http/client.py

575 lines
20 KiB
Python
Raw Normal View History

2014-08-12 08:10:52 -06:00
# -*- coding: utf-8 -*-
2016-01-06 21:26:29 -07:00
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
2014-08-12 08:10:52 -06:00
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
2018-07-09 00:09:20 -06:00
import logging
import urllib
2014-08-12 08:10:52 -06:00
2018-07-09 00:09:20 -06:00
from six import StringIO
2018-07-09 00:09:20 -06:00
from canonicaljson import encode_canonical_json, json
from prometheus_client import Counter
2018-07-09 00:09:20 -06:00
from OpenSSL import SSL
from OpenSSL.SSL import VERIFY_NONE
from twisted.internet import defer, protocol, reactor, ssl, task
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
2018-07-09 00:09:20 -06:00
from twisted.web._newclient import ResponseDone
2014-10-29 19:21:33 -06:00
from twisted.web.client import (
Agent,
BrowserLikeRedirectAgent,
ContentDecoderAgent,
FileBodyProducer as TwistedFileBodyProducer,
2018-07-09 00:09:20 -06:00
GzipDecoder,
HTTPConnectionPool,
2018-07-09 00:09:20 -06:00
PartialDownloadError,
readBody,
2014-10-29 19:21:33 -06:00
)
from twisted.web.http import PotentialDataLoss
2014-08-12 08:10:52 -06:00
from twisted.web.http_headers import Headers
2018-07-09 00:09:20 -06:00
from synapse.api.errors import (
CodeMessageException,
Codes,
MatrixCodeMessageException,
SynapseError,
)
from synapse.http import cancelled_to_request_timed_out_error, redact_uri
from synapse.http.endpoint import SpiderEndpoint
from synapse.util.async import add_timeout_to_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable
2014-08-12 08:10:52 -06:00
logger = logging.getLogger(__name__)

# Prometheus counters for traffic sent through the clients in this module:
# requests are labelled by HTTP method, responses by method and status code.
outgoing_requests_counter = Counter(
    "synapse_http_client_requests", "", ["method"],
)
incoming_responses_counter = Counter(
    "synapse_http_client_responses", "", ["method", "code"],
)
2014-08-12 08:10:52 -06:00
class SimpleHttpClient(object):
    """
    A simple, no-frills HTTP client with methods that wrap up common ways of
    using HTTP in Matrix
    """

    def __init__(self, hs):
        """
        Args:
            hs: the homeserver object. This constructor reads its
                ``version_string`` and ``config.user_agent_suffix`` and calls
                ``get_clock()`` and ``get_http_client_context_factory()``.
        """
        self.hs = hs

        pool = HTTPConnectionPool(reactor)

        # the pusher makes lots of concurrent SSL connections to sygnal, and
        # tends to do so in batches, so we need to allow the pool to keep lots
        # of idle connections around.
        pool.maxPersistentPerHost = max((100 * CACHE_SIZE_FACTOR, 5))
        pool.cachedConnectionTimeout = 2 * 60

        # The default context factory in Twisted 14.0.0 (which we require) is
        # BrowserLikePolicyForHTTPS which will do regular cert validation
        # 'like a browser'
        self.agent = Agent(
            reactor,
            connectTimeout=15,
            contextFactory=hs.get_http_client_context_factory(),
            pool=pool,
        )
        self.user_agent = hs.version_string
        self.clock = hs.get_clock()
        if hs.config.user_agent_suffix:
            self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix,)

    def _build_headers(self, headers, content_type=None):
        """Builds the Headers object for an outgoing request.

        Args:
            headers (dict[str, List[str]]|None): caller-supplied headers. These
                are applied last, so they override the defaults set here.
            content_type (bytes|None): if not None, sent as the Content-Type.

        Returns:
            twisted.web.http_headers.Headers
        """
        actual_headers = {
            b"User-Agent": [self.user_agent],
        }
        if content_type is not None:
            actual_headers[b"Content-Type"] = [content_type]
        if headers:
            actual_headers.update(headers)
        return Headers(actual_headers)

    @defer.inlineCallbacks
    def request(self, method, uri, *args, **kwargs):
        """Sends a request through self.agent, counting and logging the outcome.

        Args:
            method (str): HTTP method; also used as the metrics label
            uri (str): URI to request
            *args, **kwargs: passed straight through to self.agent.request()

        Returns:
            Deferred: resolves to whatever self.agent.request() produces. Any
            exception is logged, counted under the "ERR" code and re-raised.
        """
        # A small wrapper around self.agent.request() so we can easily attach
        # counters to it
        outgoing_requests_counter.labels(method).inc()

        # log request but strip `access_token` (AS requests for example include this)
        logger.info("Sending request %s %s", method, redact_uri(uri))

        try:
            request_deferred = self.agent.request(
                method, uri, *args, **kwargs
            )
            add_timeout_to_deferred(
                request_deferred, 60, self.hs.get_reactor(),
                cancelled_to_request_timed_out_error,
            )
            response = yield make_deferred_yieldable(request_deferred)

            incoming_responses_counter.labels(method, response.code).inc()
            logger.info(
                "Received response to %s %s: %s",
                method, redact_uri(uri), response.code
            )
            defer.returnValue(response)
        except Exception as e:
            incoming_responses_counter.labels(method, "ERR").inc()
            # Not every exception has a `.message` attribute; reading it here
            # could raise AttributeError and hide the original failure.
            logger.info(
                "Error sending request to %s %s: %s %s",
                method, redact_uri(uri), type(e).__name__,
                e.args[0] if e.args else "",
            )
            raise

    @defer.inlineCallbacks
    def post_urlencoded_get_json(self, uri, args=None, headers=None):
        """POSTs url-encoded form data and parses the response body as JSON.

        Args:
            uri (str):
            args (dict[str, str|List[str]]|None): form fields, sent url-encoded
                in the request body. None is treated as no fields.
            headers (dict[str, List[str]]|None): If not None, a map from
               header name to a list of values for that header

        Returns:
            Deferred[object]: parsed json (the response code is not checked)
        """
        if args is None:
            args = {}

        # TODO: Do we ever want to log message contents?
        logger.debug("post_urlencoded_get_json args: %s", args)

        query_bytes = urllib.urlencode(encode_urlencode_args(args), True)

        response = yield self.request(
            "POST",
            uri.encode("ascii"),
            headers=self._build_headers(
                headers, b"application/x-www-form-urlencoded",
            ),
            bodyProducer=FileBodyProducer(StringIO(query_bytes))
        )

        body = yield make_deferred_yieldable(readBody(response))

        defer.returnValue(json.loads(body))

    @defer.inlineCallbacks
    def post_json_get_json(self, uri, post_json, headers=None):
        """POSTs a JSON body and parses the response body as JSON.

        Args:
            uri (str):
            post_json (object):
            headers (dict[str, List[str]]|None): If not None, a map from
               header name to a list of values for that header

        Returns:
            Deferred[object]: parsed json

        Raises:
            On a non-2xx HTTP response: MatrixCodeMessageException if the body
            is a JSON object with 'errcode' and 'error', otherwise
            CodeMessageException.
        """
        json_str = encode_canonical_json(post_json)

        logger.debug("HTTP POST %s -> %s", json_str, uri)

        response = yield self.request(
            "POST",
            uri.encode("ascii"),
            headers=self._build_headers(headers, b"application/json"),
            bodyProducer=FileBodyProducer(StringIO(json_str))
        )

        body = yield make_deferred_yieldable(readBody(response))

        if 200 <= response.code < 300:
            defer.returnValue(json.loads(body))
        else:
            raise self._exceptionFromFailedRequest(response, body)

    @defer.inlineCallbacks
    def get_json(self, uri, args=None, headers=None):
        """ Gets some json from the given URI.

        Args:
            uri (str): The URI to request, not including query parameters
            args (dict): A dictionary used to create query strings, defaults to
                None.
                **Note**: The value of each key is assumed to be an iterable
                and *not* a string.
            headers (dict[str, List[str]]|None): If not None, a map from
               header name to a list of values for that header
        Returns:
            Deferred: Succeeds when we get *any* 2xx HTTP response, with the
            HTTP body as JSON.
        Raises:
            On a non-2xx HTTP response. The response body will be used as the
            error message.
        """
        try:
            body = yield self.get_raw(uri, args, headers=headers)
            defer.returnValue(json.loads(body))
        except CodeMessageException as e:
            # get_raw only gives us the status code and body, not a response
            # object; _exceptionFromFailedRequest accepts a bare code.
            raise self._exceptionFromFailedRequest(e.code, e.msg)

    @defer.inlineCallbacks
    def put_json(self, uri, json_body, args=None, headers=None):
        """ Puts some json to the given URI.

        Args:
            uri (str): The URI to request, not including query parameters
            json_body (dict): The JSON to put in the HTTP body,
            args (dict): A dictionary used to create query strings, defaults to
                None.
                **Note**: The value of each key is assumed to be an iterable
                and *not* a string.
            headers (dict[str, List[str]]|None): If not None, a map from
               header name to a list of values for that header
        Returns:
            Deferred: Succeeds when we get *any* 2xx HTTP response, with the
            HTTP body as JSON.
        Raises:
            On a non-2xx HTTP response.
        """
        if args:
            query_bytes = urllib.urlencode(args, True)
            uri = "%s?%s" % (uri, query_bytes)

        json_str = encode_canonical_json(json_body)

        response = yield self.request(
            "PUT",
            uri.encode("ascii"),
            headers=self._build_headers(headers, b"application/json"),
            bodyProducer=FileBodyProducer(StringIO(json_str))
        )

        body = yield make_deferred_yieldable(readBody(response))

        if 200 <= response.code < 300:
            defer.returnValue(json.loads(body))
        else:
            # NB: This is explicitly not json.loads(body)'d because the contract
            # of CodeMessageException is a *string* message. Callers can always
            # load it into JSON if they want.
            raise CodeMessageException(response.code, body)

    @defer.inlineCallbacks
    def get_raw(self, uri, args=None, headers=None):
        """ Gets raw text from the given URI.

        Args:
            uri (str): The URI to request, not including query parameters
            args (dict): A dictionary used to create query strings, defaults to
                None.
                **Note**: The value of each key is assumed to be an iterable
                and *not* a string.
            headers (dict[str, List[str]]|None): If not None, a map from
               header name to a list of values for that header
        Returns:
            Deferred: Succeeds when we get *any* 2xx HTTP response, with the
            HTTP body as text.
        Raises:
            On a non-2xx HTTP response. The response body will be used as the
            error message.
        """
        if args:
            query_bytes = urllib.urlencode(args, True)
            uri = "%s?%s" % (uri, query_bytes)

        response = yield self.request(
            "GET",
            uri.encode("ascii"),
            headers=self._build_headers(headers),
        )

        body = yield make_deferred_yieldable(readBody(response))

        if 200 <= response.code < 300:
            defer.returnValue(body)
        else:
            raise CodeMessageException(response.code, body)

    def _exceptionFromFailedRequest(self, response, body):
        """Builds the exception to raise for a failed request.

        Args:
            response: either a response object with a `code` attribute, or the
                HTTP status code itself.
            body (str): the response body

        Returns:
            MatrixCodeMessageException if the body is a JSON object containing
            'errcode' and 'error'; otherwise a CodeMessageException carrying
            the raw body.
        """
        code = getattr(response, "code", response)
        try:
            jsonBody = json.loads(body)
            errcode = jsonBody['errcode']
            error = jsonBody['error']
        except (ValueError, KeyError, TypeError):
            # TypeError: the body was valid JSON but not an object
            return CodeMessageException(code, body)
        return MatrixCodeMessageException(code, error, errcode)

    # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
    # The two should be factored out.

    @defer.inlineCallbacks
    def get_file(self, url, output_stream, max_size=None, headers=None):
        """GETs a file from a given URL

        Args:
            url (str): The URL to GET
            output_stream (file): File to write the response body to.
            max_size (int|None): if not None, the download is rejected when the
                declared Content-Length exceeds it, and aborted by the body
                reader once that many bytes have been received.
            headers (dict[str, List[str]]|None): If not None, a map from
               header name to a list of values for that header
        Returns:
            A (int,dict,string,int) tuple of the file length, dict of the response
            headers, absolute URI of the response and HTTP response code.
        Raises:
            SynapseError (502) if the declared size is too large, the response
            code is above 299, or reading the body fails.
        """
        response = yield self.request(
            "GET",
            url.encode("ascii"),
            headers=self._build_headers(headers),
        )

        resp_headers = dict(response.headers.getAllRawHeaders())

        # Header values are lists of raw strings, so the first value has to be
        # parsed before it can be compared with max_size.
        declared_length = None
        length_values = resp_headers.get(b"Content-Length")
        if length_values:
            try:
                declared_length = int(length_values[0])
            except (TypeError, ValueError):
                # unparseable header: fall back to the limit enforced while
                # reading the body
                declared_length = None

        if (
            max_size is not None
            and declared_length is not None
            and declared_length > max_size
        ):
            logger.warning("Requested URL is too large > %r bytes", max_size)
            raise SynapseError(
                502,
                "Requested file is too large > %r bytes" % (max_size,),
                Codes.TOO_LARGE,
            )

        if response.code > 299:
            logger.warning("Got %d when downloading %s", response.code, url)
            raise SynapseError(
                502,
                "Got error %d" % (response.code,),
                Codes.UNKNOWN,
            )

        # TODO: if our Content-Type is HTML or something, just read the first
        # N bytes into RAM rather than saving it all to disk only to read it
        # straight back in again

        try:
            length = yield make_deferred_yieldable(_readBodyToFile(
                response, output_stream, max_size,
            ))
        except Exception as e:
            logger.exception("Failed to download body")
            raise SynapseError(
                502,
                ("Failed to download remote body: %s" % e),
                Codes.UNKNOWN,
            )

        defer.returnValue(
            (length, resp_headers, response.request.absoluteURI, response.code),
        )
# XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
# The two should be factored out.
class _ReadBodyToFileProtocol(protocol.Protocol):
    """Protocol which streams a response body into a file-like object.

    `deferred` is fired exactly once: with the number of bytes written when
    the body finishes, or with a failure if `max_size` is reached or the
    connection is lost for any other reason.
    """

    def __init__(self, stream, deferred, max_size):
        """
        Args:
            stream: object with a `write` method; receives each chunk
            deferred (Deferred): fired with the outcome of the download
            max_size (int|None): byte count at which the download is aborted,
                or None for no limit
        """
        self.stream = stream
        self.deferred = deferred
        self.length = 0
        self.max_size = max_size

    def dataReceived(self, data):
        # Once the outcome has been reported (i.e. the size limit was hit),
        # drop any data that still arrives before the connection closes.
        if self.deferred.called:
            return

        self.stream.write(data)
        self.length += len(data)
        if self.max_size is not None and self.length >= self.max_size:
            self.deferred.errback(SynapseError(
                502,
                "Requested file is too large > %r bytes" % (self.max_size,),
                Codes.TOO_LARGE,
            ))
            self.transport.loseConnection()

    def connectionLost(self, reason):
        # The size-limit path has already fired the deferred; firing it again
        # would raise AlreadyCalledError.
        if self.deferred.called:
            return

        if reason.check(ResponseDone):
            self.deferred.callback(self.length)
        elif reason.check(PotentialDataLoss):
            # stolen from https://github.com/twisted/treq/pull/49/files
            # http://twistedmatrix.com/trac/ticket/4840
            self.deferred.callback(self.length)
        else:
            self.deferred.errback(reason)
# XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
# The two should be factored out.
def _readBodyToFile(response, stream, max_size):
    """Delivers the body of `response` into `stream`.

    Args:
        response: object with a `deliverBody` method
        stream: passed to _ReadBodyToFileProtocol as the write target
        max_size (int|None): passed to _ReadBodyToFileProtocol as its limit

    Returns:
        Deferred: the deferred handed to the protocol, which the protocol
        fires with the outcome of the download.
    """
    finished = defer.Deferred()
    body_protocol = _ReadBodyToFileProtocol(stream, finished, max_size)
    response.deliverBody(body_protocol)
    return finished
2014-11-20 06:53:34 -07:00
2016-04-03 05:56:29 -06:00
class CaptchaServerHttpClient(SimpleHttpClient):
    """
    Separate HTTP client for talking to google's captcha servers
    Only slightly special because accepts partial download responses

    used only by c/s api v1
    """

    @defer.inlineCallbacks
    def post_urlencoded_get_raw(self, url, args=None):
        """POSTs url-encoded form data and returns the raw response body.

        Args:
            url (str): the URL to POST to
            args (dict|None): form fields, sent url-encoded in the request
                body. None is treated as no fields.

        Returns:
            Deferred: the response body, or the `response` attribute of the
            PartialDownloadError if reading the body raised one.
        """
        if args is None:
            args = {}

        query_bytes = urllib.urlencode(encode_urlencode_args(args), True)

        response = yield self.request(
            "POST",
            url.encode("ascii"),
            bodyProducer=FileBodyProducer(StringIO(query_bytes)),
            headers=Headers({
                b"Content-Type": [b"application/x-www-form-urlencoded"],
                b"User-Agent": [self.user_agent],
            })
        )

        try:
            body = yield make_deferred_yieldable(readBody(response))
            defer.returnValue(body)
        except PartialDownloadError as e:
            # twisted dislikes google's response, no content length.
            defer.returnValue(e.response)
2014-08-12 08:10:52 -06:00
2016-04-03 05:56:29 -06:00
class SpiderEndpointFactory(object):
    """Endpoint factory for Agent.usingEndpointFactory which wraps every
    connection in a SpiderEndpoint, passing it the URL-preview IP range
    blacklist and whitelist from the homeserver config.
    """

    def __init__(self, hs):
        """
        Args:
            hs: the homeserver object; its config supplies the IP range
                blacklist/whitelist and it supplies the TLS policy.
        """
        self.blacklist = hs.config.url_preview_ip_range_blacklist
        self.whitelist = hs.config.url_preview_ip_range_whitelist
        self.policyForHTTPS = hs.get_http_client_context_factory()

    def endpointForURI(self, uri):
        """Builds the endpoint used to connect to `uri`.

        Args:
            uri: parsed URI with `scheme`, `host`, `port` and `toBytes()`

        Returns:
            SpiderEndpoint: plain TCP for http, TLS-wrapped for https

        Raises:
            SchemeNotSupported: if the scheme is neither http nor https
        """
        # Imported locally so this block does not depend on the module-level
        # import list.
        from twisted.web.error import SchemeNotSupported

        logger.info("Getting endpoint for %s", uri.toBytes())

        if uri.scheme == "http":
            endpoint_factory = HostnameEndpoint
        elif uri.scheme == "https":
            tlsCreator = self.policyForHTTPS.creatorForNetloc(uri.host, uri.port)

            def endpoint_factory(reactor, host, port, **kw):
                return wrapClientTLS(
                    tlsCreator,
                    HostnameEndpoint(reactor, host, port, **kw))
        else:
            logger.warning(
                "Can't get endpoint for unrecognised scheme %s", uri.scheme,
            )
            # Returning None here would make the Agent blow up later with an
            # AttributeError; SchemeNotSupported is the error that
            # Agent.request() turns into a failed Deferred.
            raise SchemeNotSupported("Unsupported scheme: %r" % (uri.scheme,))

        return SpiderEndpoint(
            reactor, uri.host, uri.port, self.blacklist, self.whitelist,
            endpoint=endpoint_factory, endpoint_kw_args=dict(timeout=15),
        )
class SpiderHttpClient(SimpleHttpClient):
    """
    Separate HTTP client for spidering arbitrary URLs.

    Its agent follows redirects the way a browser would, decodes gzip
    responses, and connects through SpiderEndpointFactory. The browser-like
    User-Agent is currently commented out, so the base class's value is used.

    used by the preview_url endpoint in the content repo.
    """

    def __init__(self, hs):
        SimpleHttpClient.__init__(self, hs)

        # clobber the base class's agent and UA:
        spider_agent = Agent.usingEndpointFactory(
            reactor,
            SpiderEndpointFactory(hs),
        )
        redirecting_agent = BrowserLikeRedirectAgent(spider_agent)
        self.agent = ContentDecoderAgent(
            redirecting_agent, [(b'gzip', GzipDecoder)],
        )

        # We could look like Chrome:
        # self.user_agent = ("Mozilla/5.0 (%s) (KHTML, like Gecko)
        #                   Chrome Safari" % hs.version_string)
2014-10-30 05:10:17 -06:00
def encode_urlencode_args(args):
    """Returns a copy of `args` with every value passed through
    encode_urlencode_arg; keys are left untouched.
    """
    encoded = {}
    for key, value in args.items():
        encoded[key] = encode_urlencode_arg(value)
    return encoded
def encode_urlencode_arg(arg):
    """Encodes text values to UTF-8 bytes, recursing into lists.

    Args:
        arg: a text string, a (possibly nested) list, or any other value

    Returns:
        UTF-8 encoded bytes for a text string, a new list with each item
        encoded for a list, and `arg` itself for anything else.
    """
    # `unicode` on Python 2 and `str` on Python 3; the bare `unicode` builtin
    # does not exist on Python 3.
    text_type = type(u"")

    if isinstance(arg, text_type):
        return arg.encode('utf-8')
    elif isinstance(arg, list):
        return [encode_urlencode_arg(i) for i in arg]
    else:
        return arg
2014-10-30 05:10:17 -06:00
2016-03-08 03:09:07 -07:00
2014-08-12 08:10:52 -06:00
def _print_ex(e):
    """Logs `e` via logger.exception, or, if it has a non-empty `reasons`
    attribute, recursively logs each of those instead.
    """
    nested = e.reasons if hasattr(e, "reasons") else None
    if not nested:
        logger.exception(e)
        return

    for inner in nested:
        _print_ex(inner)
class InsecureInterceptableContextFactory(ssl.ContextFactory):
    """
    Factory for PyOpenSSL SSL contexts which accepts any certificate for any domain.

    Do not use this since it allows an attacker to intercept your communications.
    """

    def __init__(self):
        # A single context, created with verification switched off, is shared
        # by every connection.
        context = SSL.Context(SSL.SSLv23_METHOD)
        context.set_verify(VERIFY_NONE, lambda *_: None)
        self._context = context

    def getContext(self, hostname=None, port=None):
        # hostname and port are accepted but ignored
        return self._context

    def creatorForNetloc(self, hostname, port):
        # this object acts as its own creator for every host/port
        return self
class FileBodyProducer(TwistedFileBodyProducer):
    """Workaround for https://twistedmatrix.com/trac/ticket/8473

    Twisted's FileBodyProducer raises from pauseProducing and resumeProducing
    when the underlying task has already completed; these overrides swallow
    those specific exceptions.
    """

    def pauseProducing(self):
        parent = super(FileBodyProducer, self)
        try:
            parent.pauseProducing()
        except task.TaskDone:
            # task has already completed
            return

    def resumeProducing(self):
        parent = super(FileBodyProducer, self)
        try:
            parent.resumeProducing()
        except task.NotPaused:
            # task was not paused (probably because it had already completed)
            return