Move logic into transaction_queue
This commit is contained in:
parent
0e830d3770
commit
daec6fc355
|
@ -106,15 +106,12 @@ class FederationClient(FederationBase):
|
||||||
Deferred: Completes when we have successfully processed the PDU
|
Deferred: Completes when we have successfully processed the PDU
|
||||||
and replicated it to any interested remote home servers.
|
and replicated it to any interested remote home servers.
|
||||||
"""
|
"""
|
||||||
order = self._order
|
|
||||||
self._order += 1
|
|
||||||
|
|
||||||
sent_pdus_destination_dist.inc_by(len(destinations))
|
sent_pdus_destination_dist.inc_by(len(destinations))
|
||||||
|
|
||||||
logger.debug("[%s] transaction_layer.send_pdu... ", pdu.event_id)
|
logger.debug("[%s] transaction_layer.send_pdu... ", pdu.event_id)
|
||||||
|
|
||||||
# TODO, add errback, etc.
|
# TODO, add errback, etc.
|
||||||
self._transaction_queue.send_pdu(pdu, destinations, order)
|
self._transaction_queue.send_pdu(pdu, destinations)
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"[%s] transaction_layer.send_pdu... done",
|
"[%s] transaction_layer.send_pdu... done",
|
||||||
|
@ -127,16 +124,7 @@ class FederationClient(FederationBase):
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
def send_edu(self, destination, edu_type, content, key=None):
|
def send_edu(self, destination, edu_type, content, key=None):
|
||||||
edu = Edu(
|
self._transaction_queue.send_edu(destination, edu_type, content, key=key)
|
||||||
origin=self.server_name,
|
|
||||||
destination=destination,
|
|
||||||
edu_type=edu_type,
|
|
||||||
content=content,
|
|
||||||
)
|
|
||||||
|
|
||||||
sent_edus_counter.inc()
|
|
||||||
|
|
||||||
self._transaction_queue.send_edu(edu, key=key)
|
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
def send_device_messages(self, destination):
|
def send_device_messages(self, destination):
|
||||||
|
|
|
@ -68,8 +68,6 @@ class ReplicationLayer(FederationClient, FederationServer):
|
||||||
self.transaction_actions = TransactionActions(self.store)
|
self.transaction_actions = TransactionActions(self.store)
|
||||||
self._transaction_queue = TransactionQueue(hs, transport_layer)
|
self._transaction_queue = TransactionQueue(hs, transport_layer)
|
||||||
|
|
||||||
self._order = 0
|
|
||||||
|
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
|
|
||||||
super(ReplicationLayer, self).__init__(hs)
|
super(ReplicationLayer, self).__init__(hs)
|
||||||
|
|
|
@ -95,6 +95,8 @@ class TransactionQueue(object):
|
||||||
# HACK to get unique tx id
|
# HACK to get unique tx id
|
||||||
self._next_txn_id = int(self.clock.time_msec())
|
self._next_txn_id = int(self.clock.time_msec())
|
||||||
|
|
||||||
|
self._order = 1
|
||||||
|
|
||||||
def can_send_to(self, destination):
|
def can_send_to(self, destination):
|
||||||
"""Can we send messages to the given server?
|
"""Can we send messages to the given server?
|
||||||
|
|
||||||
|
@ -115,11 +117,14 @@ class TransactionQueue(object):
|
||||||
else:
|
else:
|
||||||
return not destination.startswith("localhost")
|
return not destination.startswith("localhost")
|
||||||
|
|
||||||
def send_pdu(self, pdu, destinations, order):
|
def send_pdu(self, pdu, destinations):
|
||||||
# We loop through all destinations to see whether we already have
|
# We loop through all destinations to see whether we already have
|
||||||
# a transaction in progress. If we do, stick it in the pending_pdus
|
# a transaction in progress. If we do, stick it in the pending_pdus
|
||||||
# table and we'll get back to it later.
|
# table and we'll get back to it later.
|
||||||
|
|
||||||
|
order = self._order
|
||||||
|
self._order += 1
|
||||||
|
|
||||||
destinations = set(destinations)
|
destinations = set(destinations)
|
||||||
destinations = set(
|
destinations = set(
|
||||||
dest for dest in destinations if self.can_send_to(dest)
|
dest for dest in destinations if self.can_send_to(dest)
|
||||||
|
@ -140,6 +145,9 @@ class TransactionQueue(object):
|
||||||
)
|
)
|
||||||
|
|
||||||
def send_presence(self, destination, states):
|
def send_presence(self, destination, states):
|
||||||
|
if not self.can_send_to(destination):
|
||||||
|
return
|
||||||
|
|
||||||
self.pending_presence_by_dest.setdefault(destination, {}).update({
|
self.pending_presence_by_dest.setdefault(destination, {}).update({
|
||||||
state.user_id: state for state in states
|
state.user_id: state for state in states
|
||||||
})
|
})
|
||||||
|
@ -148,8 +156,13 @@ class TransactionQueue(object):
|
||||||
self._attempt_new_transaction, destination
|
self._attempt_new_transaction, destination
|
||||||
)
|
)
|
||||||
|
|
||||||
def send_edu(self, edu, key=None):
|
def send_edu(self, destination, edu_type, content, key=None):
|
||||||
destination = edu.destination
|
edu = Edu(
|
||||||
|
origin=self.server_name,
|
||||||
|
destination=destination,
|
||||||
|
edu_type=edu_type,
|
||||||
|
content=content,
|
||||||
|
)
|
||||||
|
|
||||||
if not self.can_send_to(destination):
|
if not self.can_send_to(destination):
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in New Issue