Correctly guard against multiple concurrent transactions
This commit is contained in:
parent
1fe7ca1362
commit
d2688d7f03
|
@ -170,44 +170,53 @@ class TransactionQueue(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _attempt_new_transaction(self, destination):
|
def _attempt_new_transaction(self, destination):
|
||||||
yield run_on_reactor()
|
# list of (pending_pdu, deferred, order)
|
||||||
while True:
|
if destination in self.pending_transactions:
|
||||||
# list of (pending_pdu, deferred, order)
|
# XXX: pending_transactions can get stuck on by a never-ending
|
||||||
if destination in self.pending_transactions:
|
# request at which point pending_pdus_by_dest just keeps growing.
|
||||||
# XXX: pending_transactions can get stuck on by a never-ending
|
# we need application-layer timeouts of some flavour of these
|
||||||
# request at which point pending_pdus_by_dest just keeps growing.
|
# requests
|
||||||
# we need application-layer timeouts of some flavour of these
|
logger.debug(
|
||||||
# requests
|
"TX [%s] Transaction already in progress",
|
||||||
logger.debug(
|
destination
|
||||||
"TX [%s] Transaction already in progress",
|
|
||||||
destination
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
|
|
||||||
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
|
||||||
pending_failures = self.pending_failures_by_dest.pop(destination, [])
|
|
||||||
|
|
||||||
device_message_edus, device_stream_id = (
|
|
||||||
yield self._get_new_device_messages(destination)
|
|
||||||
)
|
)
|
||||||
|
return
|
||||||
|
|
||||||
pending_edus.extend(device_message_edus)
|
try:
|
||||||
|
self.pending_transactions[destination] = 1
|
||||||
|
|
||||||
if pending_pdus:
|
yield run_on_reactor()
|
||||||
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
|
|
||||||
destination, len(pending_pdus))
|
|
||||||
|
|
||||||
if not pending_pdus and not pending_edus and not pending_failures:
|
while True:
|
||||||
logger.debug("TX [%s] Nothing to send", destination)
|
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
|
||||||
self.last_device_stream_id_by_dest[destination] = device_stream_id
|
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
||||||
return
|
pending_failures = self.pending_failures_by_dest.pop(destination, [])
|
||||||
|
|
||||||
yield self._send_new_transaction(
|
device_message_edus, device_stream_id = (
|
||||||
destination, pending_pdus, pending_edus, pending_failures,
|
yield self._get_new_device_messages(destination)
|
||||||
device_stream_id,
|
)
|
||||||
should_delete_from_device_stream=bool(device_message_edus)
|
|
||||||
)
|
pending_edus.extend(device_message_edus)
|
||||||
|
|
||||||
|
if pending_pdus:
|
||||||
|
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
|
||||||
|
destination, len(pending_pdus))
|
||||||
|
|
||||||
|
if not pending_pdus and not pending_edus and not pending_failures:
|
||||||
|
logger.debug("TX [%s] Nothing to send", destination)
|
||||||
|
self.last_device_stream_id_by_dest[destination] = (
|
||||||
|
device_stream_id
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
yield self._send_new_transaction(
|
||||||
|
destination, pending_pdus, pending_edus, pending_failures,
|
||||||
|
device_stream_id,
|
||||||
|
should_delete_from_device_stream=bool(device_message_edus)
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
# We want to be *very* sure we delete this after we stop processing
|
||||||
|
self.pending_transactions.pop(destination, None)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _get_new_device_messages(self, destination):
|
def _get_new_device_messages(self, destination):
|
||||||
|
@ -240,8 +249,6 @@ class TransactionQueue(object):
|
||||||
failures = [x.get_dict() for x in pending_failures]
|
failures = [x.get_dict() for x in pending_failures]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.pending_transactions[destination] = 1
|
|
||||||
|
|
||||||
logger.debug("TX [%s] _attempt_new_transaction", destination)
|
logger.debug("TX [%s] _attempt_new_transaction", destination)
|
||||||
|
|
||||||
txn_id = str(self._next_txn_id)
|
txn_id = str(self._next_txn_id)
|
||||||
|
@ -375,7 +382,3 @@ class TransactionQueue(object):
|
||||||
|
|
||||||
for p in pdus:
|
for p in pdus:
|
||||||
logger.info("Failed to send event %s to %s", p.event_id, destination)
|
logger.info("Failed to send event %s to %s", p.event_id, destination)
|
||||||
|
|
||||||
finally:
|
|
||||||
# We want to be *very* sure we delete this after we stop processing
|
|
||||||
self.pending_transactions.pop(destination, None)
|
|
||||||
|
|
Loading…
Reference in New Issue