Merge pull request #1088 from matrix-org/erikj/transaction_queue_check

Correctly guard against multiple concurrent transactions
This commit is contained in:
Erik Johnston 2016-09-09 11:49:06 +01:00 committed by GitHub
commit a15ba15e64
1 changed files with 42 additions and 39 deletions

View File

@ -170,44 +170,53 @@ class TransactionQueue(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _attempt_new_transaction(self, destination): def _attempt_new_transaction(self, destination):
yield run_on_reactor() # list of (pending_pdu, deferred, order)
while True: if destination in self.pending_transactions:
# list of (pending_pdu, deferred, order) # XXX: pending_transactions can get stuck on by a never-ending
if destination in self.pending_transactions: # request at which point pending_pdus_by_dest just keeps growing.
# XXX: pending_transactions can get stuck on by a never-ending # we need application-layer timeouts of some flavour of these
# request at which point pending_pdus_by_dest just keeps growing. # requests
# we need application-layer timeouts of some flavour of these logger.debug(
# requests "TX [%s] Transaction already in progress",
logger.debug( destination
"TX [%s] Transaction already in progress",
destination
)
return
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
pending_edus = self.pending_edus_by_dest.pop(destination, [])
pending_failures = self.pending_failures_by_dest.pop(destination, [])
device_message_edus, device_stream_id = (
yield self._get_new_device_messages(destination)
) )
return
pending_edus.extend(device_message_edus) try:
self.pending_transactions[destination] = 1
if pending_pdus: yield run_on_reactor()
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
destination, len(pending_pdus))
if not pending_pdus and not pending_edus and not pending_failures: while True:
logger.debug("TX [%s] Nothing to send", destination) pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
self.last_device_stream_id_by_dest[destination] = device_stream_id pending_edus = self.pending_edus_by_dest.pop(destination, [])
return pending_failures = self.pending_failures_by_dest.pop(destination, [])
yield self._send_new_transaction( device_message_edus, device_stream_id = (
destination, pending_pdus, pending_edus, pending_failures, yield self._get_new_device_messages(destination)
device_stream_id, )
should_delete_from_device_stream=bool(device_message_edus)
) pending_edus.extend(device_message_edus)
if pending_pdus:
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
destination, len(pending_pdus))
if not pending_pdus and not pending_edus and not pending_failures:
logger.debug("TX [%s] Nothing to send", destination)
self.last_device_stream_id_by_dest[destination] = (
device_stream_id
)
return
yield self._send_new_transaction(
destination, pending_pdus, pending_edus, pending_failures,
device_stream_id,
should_delete_from_device_stream=bool(device_message_edus)
)
finally:
# We want to be *very* sure we delete this after we stop processing
self.pending_transactions.pop(destination, None)
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_new_device_messages(self, destination): def _get_new_device_messages(self, destination):
@ -240,8 +249,6 @@ class TransactionQueue(object):
failures = [x.get_dict() for x in pending_failures] failures = [x.get_dict() for x in pending_failures]
try: try:
self.pending_transactions[destination] = 1
logger.debug("TX [%s] _attempt_new_transaction", destination) logger.debug("TX [%s] _attempt_new_transaction", destination)
txn_id = str(self._next_txn_id) txn_id = str(self._next_txn_id)
@ -375,7 +382,3 @@ class TransactionQueue(object):
for p in pdus: for p in pdus:
logger.info("Failed to send event %s to %s", p.event_id, destination) logger.info("Failed to send event %s to %s", p.event_id, destination)
finally:
# We want to be *very* sure we delete this after we stop processing
self.pending_transactions.pop(destination, None)