fed server: refactor on_incoming_transaction

Move as much as possible to after the have_responded check, and reduce the
number of times we iterate over the pdu list.
This commit is contained in:
Richard van der Hoff 2017-10-06 15:18:58 +01:00
parent c7b0678356
commit ba5b9b80a5
1 changed files with 29 additions and 24 deletions

View File

@ -109,23 +109,12 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def on_incoming_transaction(self, transaction_data): def on_incoming_transaction(self, transaction_data):
# keep this as early as possible to make the calculated origin ts as
# accurate as possible.
request_time = int(self._clock.time_msec())
transaction = Transaction(**transaction_data) transaction = Transaction(**transaction_data)
received_pdus_counter.inc_by(len(transaction.pdus))
for p in transaction.pdus:
if "unsigned" in p:
unsigned = p["unsigned"]
if "age" in unsigned:
p["age"] = unsigned["age"]
if "age" in p:
p["age_ts"] = int(self._clock.time_msec()) - int(p["age"])
del p["age"]
pdu_list = [
self.event_from_pdu_json(p) for p in transaction.pdus
]
logger.debug("[%s] Got transaction", transaction.transaction_id) logger.debug("[%s] Got transaction", transaction.transaction_id)
response = yield self.transaction_actions.have_responded(transaction) response = yield self.transaction_actions.have_responded(transaction)
@ -140,17 +129,35 @@ class FederationServer(FederationBase):
logger.debug("[%s] Transaction is new", transaction.transaction_id) logger.debug("[%s] Transaction is new", transaction.transaction_id)
results = [] received_pdus_counter.inc_by(len(transaction.pdus))
pdu_list = []
for p in transaction.pdus:
if "unsigned" in p:
unsigned = p["unsigned"]
if "age" in unsigned:
p["age"] = unsigned["age"]
if "age" in p:
p["age_ts"] = request_time - int(p["age"])
del p["age"]
event = self.event_from_pdu_json(p)
pdu_list.append(event)
pdu_results = {}
for pdu in pdu_list: for pdu in pdu_list:
event_id = pdu.event_id
try: try:
yield self._handle_received_pdu(transaction.origin, pdu) yield self._handle_received_pdu(transaction.origin, pdu)
results.append({}) pdu_results[event_id] = {}
except FederationError as e: except FederationError as e:
logger.warn("Error handling PDU %s: %s", event_id, e)
self.send_failure(e, transaction.origin) self.send_failure(e, transaction.origin)
results.append({"error": str(e)}) pdu_results[event_id] = {"error": str(e)}
except Exception as e: except Exception as e:
results.append({"error": str(e)}) pdu_results[event_id] = {"error": str(e)}
logger.exception("Failed to handle PDU") logger.exception("Failed to handle PDU")
if hasattr(transaction, "edus"): if hasattr(transaction, "edus"):
@ -164,14 +171,12 @@ class FederationServer(FederationBase):
for failure in getattr(transaction, "pdu_failures", []): for failure in getattr(transaction, "pdu_failures", []):
logger.info("Got failure %r", failure) logger.info("Got failure %r", failure)
logger.debug("Returning: %s", str(results))
response = { response = {
"pdus": dict(zip( "pdus": pdu_results,
(p.event_id for p in pdu_list), results
)),
} }
logger.debug("Returning: %s", str(response))
yield self.transaction_actions.set_response( yield self.transaction_actions.set_response(
transaction, transaction,
200, response 200, response