Fed server: use a linearizer for ongoing transactions

We don't want to process the same transaction multiple times concurrently, so
use a linearizer.
This commit is contained in:
Richard van der Hoff 2017-10-06 15:31:58 +01:00
parent ba5b9b80a5
commit 4c7c4d4061
1 changed files with 29 additions and 1 deletions

View File

@ -53,6 +53,7 @@ class FederationServer(FederationBase):
self.auth = hs.get_auth() self.auth = hs.get_auth()
self._server_linearizer = Linearizer("fed_server") self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")
# We cache responses to state queries, as they take a while and often # We cache responses to state queries, as they take a while and often
# come in waves. # come in waves.
@ -111,12 +112,39 @@ class FederationServer(FederationBase):
def on_incoming_transaction(self, transaction_data): def on_incoming_transaction(self, transaction_data):
# keep this as early as possible to make the calculated origin ts as # keep this as early as possible to make the calculated origin ts as
# accurate as possible. # accurate as possible.
request_time = int(self._clock.time_msec()) request_time = self._clock.time_msec()
transaction = Transaction(**transaction_data) transaction = Transaction(**transaction_data)
if not transaction.transaction_id:
raise Exception("Transaction missing transaction_id")
if not transaction.origin:
raise Exception("Transaction missing origin")
logger.debug("[%s] Got transaction", transaction.transaction_id) logger.debug("[%s] Got transaction", transaction.transaction_id)
# use a linearizer to ensure that we don't process the same transaction
# multiple times in parallel.
with (yield self._transaction_linearizer.queue(
(transaction.origin, transaction.transaction_id),
)):
result = yield self._handle_incoming_transaction(
transaction, request_time,
)
defer.returnValue(result)
@defer.inlineCallbacks
def _handle_incoming_transaction(self, transaction, request_time):
""" Process an incoming transaction and return the HTTP response
Args:
transaction (Transaction): incoming transaction
request_time (int): timestamp that the HTTP request arrived at
Returns:
Deferred[(int, object)]: http response code and body
"""
response = yield self.transaction_actions.have_responded(transaction) response = yield self.transaction_actions.have_responded(transaction)
if response: if response: