Fix tightloop on sending transaction

This commit is contained in:
Erik Johnston 2016-09-09 13:12:53 +01:00
parent a15ba15e64
commit 4598682b43
1 changed files with 132 additions and 120 deletions

View File

@ -209,11 +209,13 @@ class TransactionQueue(object):
) )
return return
yield self._send_new_transaction( success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus, pending_failures, destination, pending_pdus, pending_edus, pending_failures,
device_stream_id, device_stream_id,
should_delete_from_device_stream=bool(device_message_edus) should_delete_from_device_stream=bool(device_message_edus)
) )
if not success:
break
finally: finally:
# We want to be *very* sure we delete this after we stop processing # We want to be *very* sure we delete this after we stop processing
self.pending_transactions.pop(destination, None) self.pending_transactions.pop(destination, None)
@ -242,143 +244,153 @@ class TransactionQueue(object):
pending_failures, device_stream_id, pending_failures, device_stream_id,
should_delete_from_device_stream): should_delete_from_device_stream):
# Sort based on the order field # Sort based on the order field
pending_pdus.sort(key=lambda t: t[1]) pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus] pdus = [x[0] for x in pending_pdus]
edus = pending_edus edus = pending_edus
failures = [x.get_dict() for x in pending_failures] failures = [x.get_dict() for x in pending_failures]
try: success = True
logger.debug("TX [%s] _attempt_new_transaction", destination)
txn_id = str(self._next_txn_id) try:
logger.debug("TX [%s] _attempt_new_transaction", destination)
limiter = yield get_retry_limiter( txn_id = str(self._next_txn_id)
destination,
self.clock,
self.store,
)
logger.debug( limiter = yield get_retry_limiter(
"TX [%s] {%s} Attempting new transaction" destination,
" (pdus: %d, edus: %d, failures: %d)", self.clock,
destination, txn_id, self.store,
len(pdus), )
len(edus),
len(failures)
)
logger.debug("TX [%s] Persisting transaction...", destination) logger.debug(
"TX [%s] {%s} Attempting new transaction"
" (pdus: %d, edus: %d, failures: %d)",
destination, txn_id,
len(pdus),
len(edus),
len(failures)
)
transaction = Transaction.create_new( logger.debug("TX [%s] Persisting transaction...", destination)
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self.server_name,
destination=destination,
pdus=pdus,
edus=edus,
pdu_failures=failures,
)
self._next_txn_id += 1 transaction = Transaction.create_new(
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self.server_name,
destination=destination,
pdus=pdus,
edus=edus,
pdu_failures=failures,
)
yield self.transaction_actions.prepare_to_send(transaction) self._next_txn_id += 1
logger.debug("TX [%s] Persisted transaction", destination) yield self.transaction_actions.prepare_to_send(transaction)
logger.info(
"TX [%s] {%s} Sending transaction [%s],"
" (PDUs: %d, EDUs: %d, failures: %d)",
destination, txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
len(failures),
)
with limiter: logger.debug("TX [%s] Persisted transaction", destination)
# Actually send the transaction logger.info(
"TX [%s] {%s} Sending transaction [%s],"
" (PDUs: %d, EDUs: %d, failures: %d)",
destination, txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
len(failures),
)
# FIXME (erikj): This is a bit of a hack to make the Pdu age with limiter:
# keys work # Actually send the transaction
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data
try: # FIXME (erikj): This is a bit of a hack to make the Pdu age
response = yield self.transport_layer.send_transaction( # keys work
transaction, json_data_cb def json_data_cb():
) data = transaction.get_dict()
code = 200 now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data
if response: try:
for e_id, r in response.get("pdus", {}).items(): response = yield self.transport_layer.send_transaction(
if "error" in r: transaction, json_data_cb
logger.warn(
"Transaction returned error for %s: %s",
e_id, r,
)
except HttpResponseException as e:
code = e.code
response = e.response
logger.info(
"TX [%s] {%s} got %d response",
destination, txn_id, code
) )
code = 200
logger.debug("TX [%s] Sent transaction", destination) if response:
logger.debug("TX [%s] Marking as delivered...", destination) for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"Transaction returned error for %s: %s",
e_id, r,
)
except HttpResponseException as e:
code = e.code
response = e.response
yield self.transaction_actions.delivered(
transaction, code, response
)
logger.debug("TX [%s] Marked as delivered", destination)
if code != 200:
for p in pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, destination
)
else:
# Remove the acknowledged device messages from the database
if should_delete_from_device_stream:
yield self.store.delete_device_msgs_for_remote(
destination, device_stream_id
)
self.last_device_stream_id_by_dest[destination] = device_stream_id
except NotRetryingDestination:
logger.info( logger.info(
"TX [%s] not ready for retry yet - " "TX [%s] {%s} got %d response",
"dropping transaction for now", destination, txn_id, code
destination,
)
except RuntimeError as e:
# We capture this here as there as nothing actually listens
# for this finishing functions deferred.
logger.warn(
"TX [%s] Problem in _attempt_transaction: %s",
destination,
e,
) )
for p in pdus: logger.debug("TX [%s] Sent transaction", destination)
logger.info("Failed to send event %s to %s", p.event_id, destination) logger.debug("TX [%s] Marking as delivered...", destination)
except Exception as e:
# We capture this here as there as nothing actually listens
# for this finishing functions deferred.
logger.warn(
"TX [%s] Problem in _attempt_transaction: %s",
destination,
e,
)
yield self.transaction_actions.delivered(
transaction, code, response
)
logger.debug("TX [%s] Marked as delivered", destination)
if code != 200:
for p in pdus: for p in pdus:
logger.info("Failed to send event %s to %s", p.event_id, destination) logger.info(
"Failed to send event %s to %s", p.event_id, destination
)
success = False
else:
# Remove the acknowledged device messages from the database
if should_delete_from_device_stream:
yield self.store.delete_device_msgs_for_remote(
destination, device_stream_id
)
self.last_device_stream_id_by_dest[destination] = device_stream_id
except NotRetryingDestination:
logger.info(
"TX [%s] not ready for retry yet - "
"dropping transaction for now",
destination,
)
success = False
except RuntimeError as e:
# We capture this here as there as nothing actually listens
# for this finishing functions deferred.
logger.warn(
"TX [%s] Problem in _attempt_transaction: %s",
destination,
e,
)
success = False
for p in pdus:
logger.info("Failed to send event %s to %s", p.event_id, destination)
except Exception as e:
# We capture this here as there as nothing actually listens
# for this finishing functions deferred.
logger.warn(
"TX [%s] Problem in _attempt_transaction: %s",
destination,
e,
)
success = False
for p in pdus:
logger.info("Failed to send event %s to %s", p.event_id, destination)
defer.returnValue(success)