Merge pull request #5333 from matrix-org/rav/server_keys/09_improve_notary_server
Fixes for the key-notary server
This commit is contained in:
commit
cb683d3e3c
|
@ -0,0 +1 @@
|
||||||
|
Fix various problems which made the signing-key notary server time out for some requests.
|
|
@ -46,6 +46,7 @@ from synapse.api.errors import (
|
||||||
)
|
)
|
||||||
from synapse.storage.keys import FetchKeyResult
|
from synapse.storage.keys import FetchKeyResult
|
||||||
from synapse.util import logcontext, unwrapFirstError
|
from synapse.util import logcontext, unwrapFirstError
|
||||||
|
from synapse.util.async_helpers import yieldable_gather_results
|
||||||
from synapse.util.logcontext import (
|
from synapse.util.logcontext import (
|
||||||
LoggingContext,
|
LoggingContext,
|
||||||
PreserveLoggingContext,
|
PreserveLoggingContext,
|
||||||
|
@ -169,7 +170,12 @@ class Keyring(object):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.debug("Verifying for %s with key_ids %s", server_name, key_ids)
|
logger.debug(
|
||||||
|
"Verifying for %s with key_ids %s, min_validity %i",
|
||||||
|
server_name,
|
||||||
|
key_ids,
|
||||||
|
validity_time,
|
||||||
|
)
|
||||||
|
|
||||||
# add the key request to the queue, but don't start it off yet.
|
# add the key request to the queue, but don't start it off yet.
|
||||||
verify_request = VerifyKeyRequest(
|
verify_request = VerifyKeyRequest(
|
||||||
|
@ -744,34 +750,50 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.client = hs.get_http_client()
|
self.client = hs.get_http_client()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def get_keys(self, keys_to_fetch):
|
def get_keys(self, keys_to_fetch):
|
||||||
"""see KeyFetcher.get_keys"""
|
"""
|
||||||
# TODO make this more resilient
|
Args:
|
||||||
results = yield logcontext.make_deferred_yieldable(
|
keys_to_fetch (dict[str, iterable[str]]):
|
||||||
defer.gatherResults(
|
the keys to be fetched. server_name -> key_ids
|
||||||
[
|
|
||||||
run_in_background(
|
|
||||||
self.get_server_verify_key_v2_direct,
|
|
||||||
server_name,
|
|
||||||
server_keys.keys(),
|
|
||||||
)
|
|
||||||
for server_name, server_keys in keys_to_fetch.items()
|
|
||||||
],
|
|
||||||
consumeErrors=True,
|
|
||||||
).addErrback(unwrapFirstError)
|
|
||||||
)
|
|
||||||
|
|
||||||
merged = {}
|
Returns:
|
||||||
for result in results:
|
Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult|None]]]:
|
||||||
merged.update(result)
|
map from server_name -> key_id -> FetchKeyResult
|
||||||
|
"""
|
||||||
|
|
||||||
defer.returnValue(
|
results = {}
|
||||||
{server_name: keys for server_name, keys in merged.items() if keys}
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_key(key_to_fetch_item):
|
||||||
|
server_name, key_ids = key_to_fetch_item
|
||||||
|
try:
|
||||||
|
keys = yield self.get_server_verify_key_v2_direct(server_name, key_ids)
|
||||||
|
results[server_name] = keys
|
||||||
|
except KeyLookupError as e:
|
||||||
|
logger.warning(
|
||||||
|
"Error looking up keys %s from %s: %s", key_ids, server_name, e
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Error getting keys %s from %s", key_ids, server_name)
|
||||||
|
|
||||||
|
return yieldable_gather_results(get_key, keys_to_fetch.items()).addCallback(
|
||||||
|
lambda _: results
|
||||||
)
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_server_verify_key_v2_direct(self, server_name, key_ids):
|
def get_server_verify_key_v2_direct(self, server_name, key_ids):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
server_name (str):
|
||||||
|
key_ids (iterable[str]):
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[dict[str, FetchKeyResult]]: map from key ID to lookup result
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
KeyLookupError if there was a problem making the lookup
|
||||||
|
"""
|
||||||
keys = {} # type: dict[str, FetchKeyResult]
|
keys = {} # type: dict[str, FetchKeyResult]
|
||||||
|
|
||||||
for requested_key_id in key_ids:
|
for requested_key_id in key_ids:
|
||||||
|
@ -786,6 +808,19 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
|
||||||
path="/_matrix/key/v2/server/"
|
path="/_matrix/key/v2/server/"
|
||||||
+ urllib.parse.quote(requested_key_id),
|
+ urllib.parse.quote(requested_key_id),
|
||||||
ignore_backoff=True,
|
ignore_backoff=True,
|
||||||
|
|
||||||
|
# we only give the remote server 10s to respond. It should be an
|
||||||
|
# easy request to handle, so if it doesn't reply within 10s, it's
|
||||||
|
# probably not going to.
|
||||||
|
#
|
||||||
|
# Furthermore, when we are acting as a notary server, we cannot
|
||||||
|
# wait all day for all of the origin servers, as the requesting
|
||||||
|
# server will otherwise time out before we can respond.
|
||||||
|
#
|
||||||
|
# (Note that get_json may make 4 attempts, so this can still take
|
||||||
|
# almost 45 seconds to fetch the headers, plus up to another 60s to
|
||||||
|
# read the response).
|
||||||
|
timeout=10000,
|
||||||
)
|
)
|
||||||
except (NotRetryingDestination, RequestSendFailed) as e:
|
except (NotRetryingDestination, RequestSendFailed) as e:
|
||||||
raise_from(KeyLookupError("Failed to connect to remote server"), e)
|
raise_from(KeyLookupError("Failed to connect to remote server"), e)
|
||||||
|
@ -810,7 +845,7 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
|
||||||
)
|
)
|
||||||
keys.update(response_keys)
|
keys.update(response_keys)
|
||||||
|
|
||||||
defer.returnValue({server_name: keys})
|
defer.returnValue(keys)
|
||||||
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
|
|
@ -20,7 +20,7 @@ from twisted.web.resource import Resource
|
||||||
from twisted.web.server import NOT_DONE_YET
|
from twisted.web.server import NOT_DONE_YET
|
||||||
|
|
||||||
from synapse.api.errors import Codes, SynapseError
|
from synapse.api.errors import Codes, SynapseError
|
||||||
from synapse.crypto.keyring import KeyLookupError, ServerKeyFetcher
|
from synapse.crypto.keyring import ServerKeyFetcher
|
||||||
from synapse.http.server import respond_with_json_bytes, wrap_json_request_handler
|
from synapse.http.server import respond_with_json_bytes, wrap_json_request_handler
|
||||||
from synapse.http.servlet import parse_integer, parse_json_object_from_request
|
from synapse.http.servlet import parse_integer, parse_json_object_from_request
|
||||||
|
|
||||||
|
@ -215,15 +215,7 @@ class RemoteKey(Resource):
|
||||||
json_results.add(bytes(result["key_json"]))
|
json_results.add(bytes(result["key_json"]))
|
||||||
|
|
||||||
if cache_misses and query_remote_on_cache_miss:
|
if cache_misses and query_remote_on_cache_miss:
|
||||||
for server_name, key_ids in cache_misses.items():
|
yield self.fetcher.get_keys(cache_misses)
|
||||||
try:
|
|
||||||
yield self.fetcher.get_server_verify_key_v2_direct(
|
|
||||||
server_name, key_ids
|
|
||||||
)
|
|
||||||
except KeyLookupError as e:
|
|
||||||
logger.info("Failed to fetch key: %s", e)
|
|
||||||
except Exception:
|
|
||||||
logger.exception("Failed to get key for %r", server_name)
|
|
||||||
yield self.query_keys(
|
yield self.query_keys(
|
||||||
request, query, query_remote_on_cache_miss=False
|
request, query, query_remote_on_cache_miss=False
|
||||||
)
|
)
|
||||||
|
|
|
@ -25,11 +25,7 @@ from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.errors import SynapseError
|
from synapse.api.errors import SynapseError
|
||||||
from synapse.crypto import keyring
|
from synapse.crypto import keyring
|
||||||
from synapse.crypto.keyring import (
|
from synapse.crypto.keyring import PerspectivesKeyFetcher, ServerKeyFetcher
|
||||||
KeyLookupError,
|
|
||||||
PerspectivesKeyFetcher,
|
|
||||||
ServerKeyFetcher,
|
|
||||||
)
|
|
||||||
from synapse.storage.keys import FetchKeyResult
|
from synapse.storage.keys import FetchKeyResult
|
||||||
from synapse.util import logcontext
|
from synapse.util import logcontext
|
||||||
from synapse.util.logcontext import LoggingContext
|
from synapse.util.logcontext import LoggingContext
|
||||||
|
@ -364,9 +360,11 @@ class ServerKeyFetcherTestCase(unittest.HomeserverTestCase):
|
||||||
bytes(res["key_json"]), canonicaljson.encode_canonical_json(response)
|
bytes(res["key_json"]), canonicaljson.encode_canonical_json(response)
|
||||||
)
|
)
|
||||||
|
|
||||||
# change the server name: it should cause a rejection
|
# change the server name: the result should be ignored
|
||||||
response["server_name"] = "OTHER_SERVER"
|
response["server_name"] = "OTHER_SERVER"
|
||||||
self.get_failure(fetcher.get_keys(keys_to_fetch), KeyLookupError)
|
|
||||||
|
keys = self.get_success(fetcher.get_keys(keys_to_fetch))
|
||||||
|
self.assertEqual(keys, {})
|
||||||
|
|
||||||
|
|
||||||
class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
|
class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
|
||||||
|
|
Loading…
Reference in New Issue