Try to only back off if we think we failed to connect to the remote
This commit is contained in:
parent
c8436b38a0
commit
9371019133
|
@ -101,63 +101,63 @@ class Keyring(object):
|
||||||
server_name, self.hs.tls_context_factory
|
server_name, self.hs.tls_context_factory
|
||||||
)
|
)
|
||||||
|
|
||||||
# Check the response.
|
# Check the response.
|
||||||
|
|
||||||
x509_certificate_bytes = crypto.dump_certificate(
|
x509_certificate_bytes = crypto.dump_certificate(
|
||||||
crypto.FILETYPE_ASN1, tls_certificate
|
crypto.FILETYPE_ASN1, tls_certificate
|
||||||
)
|
)
|
||||||
|
|
||||||
if ("signatures" not in response
|
if ("signatures" not in response
|
||||||
or server_name not in response["signatures"]):
|
or server_name not in response["signatures"]):
|
||||||
raise ValueError("Key response not signed by remote server")
|
raise ValueError("Key response not signed by remote server")
|
||||||
|
|
||||||
if "tls_certificate" not in response:
|
if "tls_certificate" not in response:
|
||||||
raise ValueError("Key response missing TLS certificate")
|
raise ValueError("Key response missing TLS certificate")
|
||||||
|
|
||||||
tls_certificate_b64 = response["tls_certificate"]
|
tls_certificate_b64 = response["tls_certificate"]
|
||||||
|
|
||||||
if encode_base64(x509_certificate_bytes) != tls_certificate_b64:
|
if encode_base64(x509_certificate_bytes) != tls_certificate_b64:
|
||||||
raise ValueError("TLS certificate doesn't match")
|
raise ValueError("TLS certificate doesn't match")
|
||||||
|
|
||||||
verify_keys = {}
|
verify_keys = {}
|
||||||
for key_id, key_base64 in response["verify_keys"].items():
|
for key_id, key_base64 in response["verify_keys"].items():
|
||||||
if is_signing_algorithm_supported(key_id):
|
if is_signing_algorithm_supported(key_id):
|
||||||
key_bytes = decode_base64(key_base64)
|
key_bytes = decode_base64(key_base64)
|
||||||
verify_key = decode_verify_key_bytes(key_id, key_bytes)
|
verify_key = decode_verify_key_bytes(key_id, key_bytes)
|
||||||
verify_keys[key_id] = verify_key
|
verify_keys[key_id] = verify_key
|
||||||
|
|
||||||
for key_id in response["signatures"][server_name]:
|
for key_id in response["signatures"][server_name]:
|
||||||
if key_id not in response["verify_keys"]:
|
if key_id not in response["verify_keys"]:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
"Key response must include verification keys for all"
|
"Key response must include verification keys for all"
|
||||||
" signatures"
|
" signatures"
|
||||||
)
|
)
|
||||||
if key_id in verify_keys:
|
if key_id in verify_keys:
|
||||||
verify_signed_json(
|
verify_signed_json(
|
||||||
response,
|
response,
|
||||||
server_name,
|
server_name,
|
||||||
verify_keys[key_id]
|
verify_keys[key_id]
|
||||||
)
|
|
||||||
|
|
||||||
# Cache the result in the datastore.
|
|
||||||
|
|
||||||
time_now_ms = self.clock.time_msec()
|
|
||||||
|
|
||||||
yield self.store.store_server_certificate(
|
|
||||||
server_name,
|
|
||||||
server_name,
|
|
||||||
time_now_ms,
|
|
||||||
tls_certificate,
|
|
||||||
)
|
|
||||||
|
|
||||||
for key_id, key in verify_keys.items():
|
|
||||||
yield self.store.store_server_verify_key(
|
|
||||||
server_name, server_name, time_now_ms, key
|
|
||||||
)
|
)
|
||||||
|
|
||||||
for key_id in key_ids:
|
# Cache the result in the datastore.
|
||||||
if key_id in verify_keys:
|
|
||||||
defer.returnValue(verify_keys[key_id])
|
|
||||||
return
|
|
||||||
|
|
||||||
raise ValueError("No verification key found for given key ids")
|
time_now_ms = self.clock.time_msec()
|
||||||
|
|
||||||
|
yield self.store.store_server_certificate(
|
||||||
|
server_name,
|
||||||
|
server_name,
|
||||||
|
time_now_ms,
|
||||||
|
tls_certificate,
|
||||||
|
)
|
||||||
|
|
||||||
|
for key_id, key in verify_keys.items():
|
||||||
|
yield self.store.store_server_verify_key(
|
||||||
|
server_name, server_name, time_now_ms, key
|
||||||
|
)
|
||||||
|
|
||||||
|
for key_id in key_ids:
|
||||||
|
if key_id in verify_keys:
|
||||||
|
defer.returnValue(verify_keys[key_id])
|
||||||
|
return
|
||||||
|
|
||||||
|
raise ValueError("No verification key found for given key ids")
|
||||||
|
|
|
@ -167,15 +167,6 @@ class TransactionQueue(object):
|
||||||
logger.info("TX [%s] Nothing to send", destination)
|
logger.info("TX [%s] Nothing to send", destination)
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.debug(
|
|
||||||
"TX [%s] Attempting new transaction"
|
|
||||||
" (pdus: %d, edus: %d, failures: %d)",
|
|
||||||
destination,
|
|
||||||
len(pending_pdus),
|
|
||||||
len(pending_edus),
|
|
||||||
len(pending_failures)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Sort based on the order field
|
# Sort based on the order field
|
||||||
pending_pdus.sort(key=lambda t: t[2])
|
pending_pdus.sort(key=lambda t: t[2])
|
||||||
|
|
||||||
|
@ -194,32 +185,41 @@ class TransactionQueue(object):
|
||||||
self.store,
|
self.store,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
"TX [%s] Attempting new transaction"
|
||||||
|
" (pdus: %d, edus: %d, failures: %d)",
|
||||||
|
destination,
|
||||||
|
len(pending_pdus),
|
||||||
|
len(pending_edus),
|
||||||
|
len(pending_failures)
|
||||||
|
)
|
||||||
|
|
||||||
|
self.pending_transactions[destination] = 1
|
||||||
|
|
||||||
|
logger.debug("TX [%s] Persisting transaction...", destination)
|
||||||
|
|
||||||
|
transaction = Transaction.create_new(
|
||||||
|
origin_server_ts=int(self._clock.time_msec()),
|
||||||
|
transaction_id=str(self._next_txn_id),
|
||||||
|
origin=self.server_name,
|
||||||
|
destination=destination,
|
||||||
|
pdus=pdus,
|
||||||
|
edus=edus,
|
||||||
|
pdu_failures=failures,
|
||||||
|
)
|
||||||
|
|
||||||
|
self._next_txn_id += 1
|
||||||
|
|
||||||
|
yield self.transaction_actions.prepare_to_send(transaction)
|
||||||
|
|
||||||
|
logger.debug("TX [%s] Persisted transaction", destination)
|
||||||
|
logger.info(
|
||||||
|
"TX [%s] Sending transaction [%s]",
|
||||||
|
destination,
|
||||||
|
transaction.transaction_id,
|
||||||
|
)
|
||||||
|
|
||||||
with limiter:
|
with limiter:
|
||||||
self.pending_transactions[destination] = 1
|
|
||||||
|
|
||||||
logger.debug("TX [%s] Persisting transaction...", destination)
|
|
||||||
|
|
||||||
transaction = Transaction.create_new(
|
|
||||||
origin_server_ts=int(self._clock.time_msec()),
|
|
||||||
transaction_id=str(self._next_txn_id),
|
|
||||||
origin=self.server_name,
|
|
||||||
destination=destination,
|
|
||||||
pdus=pdus,
|
|
||||||
edus=edus,
|
|
||||||
pdu_failures=failures,
|
|
||||||
)
|
|
||||||
|
|
||||||
self._next_txn_id += 1
|
|
||||||
|
|
||||||
yield self.transaction_actions.prepare_to_send(transaction)
|
|
||||||
|
|
||||||
logger.debug("TX [%s] Persisted transaction", destination)
|
|
||||||
logger.info(
|
|
||||||
"TX [%s] Sending transaction [%s]",
|
|
||||||
destination,
|
|
||||||
transaction.transaction_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Actually send the transaction
|
# Actually send the transaction
|
||||||
|
|
||||||
# FIXME (erikj): This is a bit of a hack to make the Pdu age
|
# FIXME (erikj): This is a bit of a hack to make the Pdu age
|
||||||
|
@ -249,11 +249,11 @@ class TransactionQueue(object):
|
||||||
logger.debug("TX [%s] Sent transaction", destination)
|
logger.debug("TX [%s] Sent transaction", destination)
|
||||||
logger.debug("TX [%s] Marking as delivered...", destination)
|
logger.debug("TX [%s] Marking as delivered...", destination)
|
||||||
|
|
||||||
yield self.transaction_actions.delivered(
|
yield self.transaction_actions.delivered(
|
||||||
transaction, code, response
|
transaction, code, response
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.debug("TX [%s] Marked as delivered", destination)
|
logger.debug("TX [%s] Marked as delivered", destination)
|
||||||
|
|
||||||
logger.debug("TX [%s] Yielding to callbacks...", destination)
|
logger.debug("TX [%s] Yielding to callbacks...", destination)
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,8 @@
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
|
from synapse.api.errors import CodeMessageException
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
|
||||||
|
@ -67,7 +69,7 @@ def get_retry_limiter(destination, clock, store, **kwargs):
|
||||||
class RetryDestinationLimiter(object):
|
class RetryDestinationLimiter(object):
|
||||||
def __init__(self, destination, clock, store, retry_interval,
|
def __init__(self, destination, clock, store, retry_interval,
|
||||||
min_retry_interval=20000, max_retry_interval=60 * 60 * 1000,
|
min_retry_interval=20000, max_retry_interval=60 * 60 * 1000,
|
||||||
multiplier_retry_interval=2):
|
multiplier_retry_interval=2,):
|
||||||
self.clock = clock
|
self.clock = clock
|
||||||
self.store = store
|
self.store = store
|
||||||
self.destination = destination
|
self.destination = destination
|
||||||
|
@ -87,7 +89,11 @@ class RetryDestinationLimiter(object):
|
||||||
failure.value
|
failure.value
|
||||||
)
|
)
|
||||||
|
|
||||||
if exc_type is None and exc_val is None and exc_tb is None:
|
valid_err_code = False
|
||||||
|
if exc_type is CodeMessageException:
|
||||||
|
valid_err_code = 0 <= exc_val.code < 500
|
||||||
|
|
||||||
|
if exc_type is None or valid_err_code:
|
||||||
# We connected successfully.
|
# We connected successfully.
|
||||||
if not self.retry_interval:
|
if not self.retry_interval:
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in New Issue