commit
fe672a04f7
|
@ -114,7 +114,15 @@ class FederationServer(FederationBase):
|
|||
with PreserveLoggingContext():
|
||||
dl = []
|
||||
for pdu in pdu_list:
|
||||
dl.append(self._handle_new_pdu(transaction.origin, pdu))
|
||||
d = self._handle_new_pdu(transaction.origin, pdu)
|
||||
|
||||
def handle_failure(failure):
|
||||
failure.trap(FederationError)
|
||||
self.send_failure(failure.value, transaction.origin)
|
||||
|
||||
d.addErrback(handle_failure)
|
||||
|
||||
dl.append(d)
|
||||
|
||||
if hasattr(transaction, "edus"):
|
||||
for edu in [Edu(**x) for x in transaction.edus]:
|
||||
|
@ -124,6 +132,9 @@ class FederationServer(FederationBase):
|
|||
edu.content
|
||||
)
|
||||
|
||||
for failure in getattr(transaction, "pdu_failures", []):
|
||||
logger.info("Got failure %r", failure)
|
||||
|
||||
results = yield defer.DeferredList(dl, consumeErrors=True)
|
||||
|
||||
ret = []
|
||||
|
@ -132,10 +143,16 @@ class FederationServer(FederationBase):
|
|||
ret.append({})
|
||||
else:
|
||||
logger.exception(r[1])
|
||||
ret.append({"error": str(r[1])})
|
||||
ret.append({"error": str(r[1].value)})
|
||||
|
||||
logger.debug("Returning: %s", str(ret))
|
||||
|
||||
response = {
|
||||
"pdus": dict(zip(
|
||||
(p.event_id for p in pdu_list), ret
|
||||
)),
|
||||
}
|
||||
|
||||
yield self.transaction_actions.set_response(
|
||||
transaction,
|
||||
200, response
|
||||
|
|
|
@ -91,7 +91,7 @@ class TransactionQueue(object):
|
|||
if not deferred.called:
|
||||
deferred.errback(failure)
|
||||
else:
|
||||
logger.warn("Failed to send pdu", failure)
|
||||
logger.warn("Failed to send pdu", failure.value)
|
||||
|
||||
with PreserveLoggingContext():
|
||||
self._attempt_new_transaction(destination).addErrback(eb)
|
||||
|
@ -116,7 +116,7 @@ class TransactionQueue(object):
|
|||
if not deferred.called:
|
||||
deferred.errback(failure)
|
||||
else:
|
||||
logger.warn("Failed to send edu", failure)
|
||||
logger.warn("Failed to send edu", failure.value)
|
||||
|
||||
with PreserveLoggingContext():
|
||||
self._attempt_new_transaction(destination).addErrback(eb)
|
||||
|
@ -133,6 +133,15 @@ class TransactionQueue(object):
|
|||
(failure, deferred)
|
||||
)
|
||||
|
||||
def eb(failure):
|
||||
if not deferred.called:
|
||||
deferred.errback(failure)
|
||||
else:
|
||||
logger.warn("Failed to send failure", failure.value)
|
||||
|
||||
with PreserveLoggingContext():
|
||||
self._attempt_new_transaction(destination).addErrback(eb)
|
||||
|
||||
yield deferred
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -249,6 +258,15 @@ class TransactionQueue(object):
|
|||
transaction, json_data_cb
|
||||
)
|
||||
code = 200
|
||||
|
||||
if response:
|
||||
for e_id, r in getattr(response, "pdus", {}).items():
|
||||
if "error" in r:
|
||||
logger.warn(
|
||||
"Transaction returned error for %s: %s",
|
||||
e_id, r,
|
||||
)
|
||||
|
||||
except HttpResponseException as e:
|
||||
code = e.code
|
||||
response = e.response
|
||||
|
|
|
@ -146,14 +146,22 @@ class MatrixFederationHttpClient(object):
|
|||
)
|
||||
raise SynapseError(400, "Domain specified not found.")
|
||||
|
||||
if hasattr(e, "reasons"):
|
||||
reasons = ", ".join(
|
||||
f.value.message
|
||||
for f in e.reasons
|
||||
)
|
||||
else:
|
||||
reasons = e.message
|
||||
|
||||
logger.warn(
|
||||
"Sending request failed to %s: %s %s : %s",
|
||||
"Sending request failed to %s: %s %s: %s - %s",
|
||||
destination,
|
||||
method,
|
||||
url_bytes,
|
||||
e
|
||||
type(e). __name__,
|
||||
reasons,
|
||||
)
|
||||
_print_ex(e)
|
||||
|
||||
if retries_left:
|
||||
yield sleep(2 ** (5 - retries_left))
|
||||
|
@ -447,14 +455,6 @@ def _readBodyToFile(response, stream, max_size):
|
|||
return d
|
||||
|
||||
|
||||
def _print_ex(e):
|
||||
if hasattr(e, "reasons") and e.reasons:
|
||||
for ex in e.reasons:
|
||||
_print_ex(ex)
|
||||
else:
|
||||
logger.warn(e)
|
||||
|
||||
|
||||
class _JsonProducer(object):
|
||||
""" Used by the twisted http client to create the HTTP body from json
|
||||
"""
|
||||
|
|
|
@ -99,8 +99,6 @@ class Clock(object):
|
|||
except:
|
||||
pass
|
||||
|
||||
return res
|
||||
|
||||
given_deferred.addCallbacks(callback=sucess, errback=err)
|
||||
|
||||
timer = self.call_later(time_out, timed_out_fn)
|
||||
|
|
Loading…
Reference in New Issue