Merge pull request #3628 from turt2live/travis/goodby-pdu-failures
Remove pdu_failures from transactions
This commit is contained in:
commit
5e2ee64660
|
@ -0,0 +1 @@
|
||||||
|
Remove unused field "pdu_failures" from transactions.
|
|
@ -207,10 +207,6 @@ class FederationServer(FederationBase):
|
||||||
edu.content
|
edu.content
|
||||||
)
|
)
|
||||||
|
|
||||||
pdu_failures = getattr(transaction, "pdu_failures", [])
|
|
||||||
for fail in pdu_failures:
|
|
||||||
logger.info("Got failure %r", fail)
|
|
||||||
|
|
||||||
response = {
|
response = {
|
||||||
"pdus": pdu_results,
|
"pdus": pdu_results,
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,8 +62,6 @@ class FederationRemoteSendQueue(object):
|
||||||
|
|
||||||
self.edus = SortedDict() # stream position -> Edu
|
self.edus = SortedDict() # stream position -> Edu
|
||||||
|
|
||||||
self.failures = SortedDict() # stream position -> (destination, Failure)
|
|
||||||
|
|
||||||
self.device_messages = SortedDict() # stream position -> destination
|
self.device_messages = SortedDict() # stream position -> destination
|
||||||
|
|
||||||
self.pos = 1
|
self.pos = 1
|
||||||
|
@ -79,7 +77,7 @@ class FederationRemoteSendQueue(object):
|
||||||
|
|
||||||
for queue_name in [
|
for queue_name in [
|
||||||
"presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
|
"presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
|
||||||
"edus", "failures", "device_messages", "pos_time",
|
"edus", "device_messages", "pos_time",
|
||||||
]:
|
]:
|
||||||
register(queue_name, getattr(self, queue_name))
|
register(queue_name, getattr(self, queue_name))
|
||||||
|
|
||||||
|
@ -149,12 +147,6 @@ class FederationRemoteSendQueue(object):
|
||||||
for key in keys[:i]:
|
for key in keys[:i]:
|
||||||
del self.edus[key]
|
del self.edus[key]
|
||||||
|
|
||||||
# Delete things out of failure map
|
|
||||||
keys = self.failures.keys()
|
|
||||||
i = self.failures.bisect_left(position_to_delete)
|
|
||||||
for key in keys[:i]:
|
|
||||||
del self.failures[key]
|
|
||||||
|
|
||||||
# Delete things out of device map
|
# Delete things out of device map
|
||||||
keys = self.device_messages.keys()
|
keys = self.device_messages.keys()
|
||||||
i = self.device_messages.bisect_left(position_to_delete)
|
i = self.device_messages.bisect_left(position_to_delete)
|
||||||
|
@ -204,13 +196,6 @@ class FederationRemoteSendQueue(object):
|
||||||
|
|
||||||
self.notifier.on_new_replication_data()
|
self.notifier.on_new_replication_data()
|
||||||
|
|
||||||
def send_failure(self, failure, destination):
|
|
||||||
"""As per TransactionQueue"""
|
|
||||||
pos = self._next_pos()
|
|
||||||
|
|
||||||
self.failures[pos] = (destination, str(failure))
|
|
||||||
self.notifier.on_new_replication_data()
|
|
||||||
|
|
||||||
def send_device_messages(self, destination):
|
def send_device_messages(self, destination):
|
||||||
"""As per TransactionQueue"""
|
"""As per TransactionQueue"""
|
||||||
pos = self._next_pos()
|
pos = self._next_pos()
|
||||||
|
@ -285,17 +270,6 @@ class FederationRemoteSendQueue(object):
|
||||||
for (pos, edu) in edus:
|
for (pos, edu) in edus:
|
||||||
rows.append((pos, EduRow(edu)))
|
rows.append((pos, EduRow(edu)))
|
||||||
|
|
||||||
# Fetch changed failures
|
|
||||||
i = self.failures.bisect_right(from_token)
|
|
||||||
j = self.failures.bisect_right(to_token) + 1
|
|
||||||
failures = self.failures.items()[i:j]
|
|
||||||
|
|
||||||
for (pos, (destination, failure)) in failures:
|
|
||||||
rows.append((pos, FailureRow(
|
|
||||||
destination=destination,
|
|
||||||
failure=failure,
|
|
||||||
)))
|
|
||||||
|
|
||||||
# Fetch changed device messages
|
# Fetch changed device messages
|
||||||
i = self.device_messages.bisect_right(from_token)
|
i = self.device_messages.bisect_right(from_token)
|
||||||
j = self.device_messages.bisect_right(to_token) + 1
|
j = self.device_messages.bisect_right(to_token) + 1
|
||||||
|
@ -417,34 +391,6 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", (
|
||||||
buff.edus.setdefault(self.edu.destination, []).append(self.edu)
|
buff.edus.setdefault(self.edu.destination, []).append(self.edu)
|
||||||
|
|
||||||
|
|
||||||
class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
|
|
||||||
"destination", # str
|
|
||||||
"failure",
|
|
||||||
))):
|
|
||||||
"""Streams failures to a remote server. Failures are issued when there was
|
|
||||||
something wrong with a transaction the remote sent us, e.g. it included
|
|
||||||
an event that was invalid.
|
|
||||||
"""
|
|
||||||
|
|
||||||
TypeId = "f"
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_data(data):
|
|
||||||
return FailureRow(
|
|
||||||
destination=data["destination"],
|
|
||||||
failure=data["failure"],
|
|
||||||
)
|
|
||||||
|
|
||||||
def to_data(self):
|
|
||||||
return {
|
|
||||||
"destination": self.destination,
|
|
||||||
"failure": self.failure,
|
|
||||||
}
|
|
||||||
|
|
||||||
def add_to_buffer(self, buff):
|
|
||||||
buff.failures.setdefault(self.destination, []).append(self.failure)
|
|
||||||
|
|
||||||
|
|
||||||
class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
|
class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
|
||||||
"destination", # str
|
"destination", # str
|
||||||
))):
|
))):
|
||||||
|
@ -471,7 +417,6 @@ TypeToRow = {
|
||||||
PresenceRow,
|
PresenceRow,
|
||||||
KeyedEduRow,
|
KeyedEduRow,
|
||||||
EduRow,
|
EduRow,
|
||||||
FailureRow,
|
|
||||||
DeviceRow,
|
DeviceRow,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -481,7 +426,6 @@ ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
|
||||||
"presence", # list(UserPresenceState)
|
"presence", # list(UserPresenceState)
|
||||||
"keyed_edus", # dict of destination -> { key -> Edu }
|
"keyed_edus", # dict of destination -> { key -> Edu }
|
||||||
"edus", # dict of destination -> [Edu]
|
"edus", # dict of destination -> [Edu]
|
||||||
"failures", # dict of destination -> [failures]
|
|
||||||
"device_destinations", # set of destinations
|
"device_destinations", # set of destinations
|
||||||
))
|
))
|
||||||
|
|
||||||
|
@ -503,7 +447,6 @@ def process_rows_for_federation(transaction_queue, rows):
|
||||||
presence=[],
|
presence=[],
|
||||||
keyed_edus={},
|
keyed_edus={},
|
||||||
edus={},
|
edus={},
|
||||||
failures={},
|
|
||||||
device_destinations=set(),
|
device_destinations=set(),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -532,9 +475,5 @@ def process_rows_for_federation(transaction_queue, rows):
|
||||||
edu.destination, edu.edu_type, edu.content, key=None,
|
edu.destination, edu.edu_type, edu.content, key=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
for destination, failure_list in iteritems(buff.failures):
|
|
||||||
for failure in failure_list:
|
|
||||||
transaction_queue.send_failure(destination, failure)
|
|
||||||
|
|
||||||
for destination in buff.device_destinations:
|
for destination in buff.device_destinations:
|
||||||
transaction_queue.send_device_messages(destination)
|
transaction_queue.send_device_messages(destination)
|
||||||
|
|
|
@ -116,9 +116,6 @@ class TransactionQueue(object):
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
# destination -> list of tuple(failure, deferred)
|
|
||||||
self.pending_failures_by_dest = {}
|
|
||||||
|
|
||||||
# destination -> stream_id of last successfully sent to-device message.
|
# destination -> stream_id of last successfully sent to-device message.
|
||||||
# NB: may be a long or an int.
|
# NB: may be a long or an int.
|
||||||
self.last_device_stream_id_by_dest = {}
|
self.last_device_stream_id_by_dest = {}
|
||||||
|
@ -382,19 +379,6 @@ class TransactionQueue(object):
|
||||||
|
|
||||||
self._attempt_new_transaction(destination)
|
self._attempt_new_transaction(destination)
|
||||||
|
|
||||||
def send_failure(self, failure, destination):
|
|
||||||
if destination == self.server_name or destination == "localhost":
|
|
||||||
return
|
|
||||||
|
|
||||||
if not self.can_send_to(destination):
|
|
||||||
return
|
|
||||||
|
|
||||||
self.pending_failures_by_dest.setdefault(
|
|
||||||
destination, []
|
|
||||||
).append(failure)
|
|
||||||
|
|
||||||
self._attempt_new_transaction(destination)
|
|
||||||
|
|
||||||
def send_device_messages(self, destination):
|
def send_device_messages(self, destination):
|
||||||
if destination == self.server_name or destination == "localhost":
|
if destination == self.server_name or destination == "localhost":
|
||||||
return
|
return
|
||||||
|
@ -469,7 +453,6 @@ class TransactionQueue(object):
|
||||||
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
|
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
|
||||||
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
||||||
pending_presence = self.pending_presence_by_dest.pop(destination, {})
|
pending_presence = self.pending_presence_by_dest.pop(destination, {})
|
||||||
pending_failures = self.pending_failures_by_dest.pop(destination, [])
|
|
||||||
|
|
||||||
pending_edus.extend(
|
pending_edus.extend(
|
||||||
self.pending_edus_keyed_by_dest.pop(destination, {}).values()
|
self.pending_edus_keyed_by_dest.pop(destination, {}).values()
|
||||||
|
@ -497,7 +480,7 @@ class TransactionQueue(object):
|
||||||
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
|
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
|
||||||
destination, len(pending_pdus))
|
destination, len(pending_pdus))
|
||||||
|
|
||||||
if not pending_pdus and not pending_edus and not pending_failures:
|
if not pending_pdus and not pending_edus:
|
||||||
logger.debug("TX [%s] Nothing to send", destination)
|
logger.debug("TX [%s] Nothing to send", destination)
|
||||||
self.last_device_stream_id_by_dest[destination] = (
|
self.last_device_stream_id_by_dest[destination] = (
|
||||||
device_stream_id
|
device_stream_id
|
||||||
|
@ -507,7 +490,7 @@ class TransactionQueue(object):
|
||||||
# END CRITICAL SECTION
|
# END CRITICAL SECTION
|
||||||
|
|
||||||
success = yield self._send_new_transaction(
|
success = yield self._send_new_transaction(
|
||||||
destination, pending_pdus, pending_edus, pending_failures,
|
destination, pending_pdus, pending_edus,
|
||||||
)
|
)
|
||||||
if success:
|
if success:
|
||||||
sent_transactions_counter.inc()
|
sent_transactions_counter.inc()
|
||||||
|
@ -584,14 +567,12 @@ class TransactionQueue(object):
|
||||||
|
|
||||||
@measure_func("_send_new_transaction")
|
@measure_func("_send_new_transaction")
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _send_new_transaction(self, destination, pending_pdus, pending_edus,
|
def _send_new_transaction(self, destination, pending_pdus, pending_edus):
|
||||||
pending_failures):
|
|
||||||
|
|
||||||
# Sort based on the order field
|
# Sort based on the order field
|
||||||
pending_pdus.sort(key=lambda t: t[1])
|
pending_pdus.sort(key=lambda t: t[1])
|
||||||
pdus = [x[0] for x in pending_pdus]
|
pdus = [x[0] for x in pending_pdus]
|
||||||
edus = pending_edus
|
edus = pending_edus
|
||||||
failures = [x.get_dict() for x in pending_failures]
|
|
||||||
|
|
||||||
success = True
|
success = True
|
||||||
|
|
||||||
|
@ -601,11 +582,10 @@ class TransactionQueue(object):
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"TX [%s] {%s} Attempting new transaction"
|
"TX [%s] {%s} Attempting new transaction"
|
||||||
" (pdus: %d, edus: %d, failures: %d)",
|
" (pdus: %d, edus: %d)",
|
||||||
destination, txn_id,
|
destination, txn_id,
|
||||||
len(pdus),
|
len(pdus),
|
||||||
len(edus),
|
len(edus),
|
||||||
len(failures)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.debug("TX [%s] Persisting transaction...", destination)
|
logger.debug("TX [%s] Persisting transaction...", destination)
|
||||||
|
@ -617,7 +597,6 @@ class TransactionQueue(object):
|
||||||
destination=destination,
|
destination=destination,
|
||||||
pdus=pdus,
|
pdus=pdus,
|
||||||
edus=edus,
|
edus=edus,
|
||||||
pdu_failures=failures,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
self._next_txn_id += 1
|
self._next_txn_id += 1
|
||||||
|
@ -627,12 +606,11 @@ class TransactionQueue(object):
|
||||||
logger.debug("TX [%s] Persisted transaction", destination)
|
logger.debug("TX [%s] Persisted transaction", destination)
|
||||||
logger.info(
|
logger.info(
|
||||||
"TX [%s] {%s} Sending transaction [%s],"
|
"TX [%s] {%s} Sending transaction [%s],"
|
||||||
" (PDUs: %d, EDUs: %d, failures: %d)",
|
" (PDUs: %d, EDUs: %d)",
|
||||||
destination, txn_id,
|
destination, txn_id,
|
||||||
transaction.transaction_id,
|
transaction.transaction_id,
|
||||||
len(pdus),
|
len(pdus),
|
||||||
len(edus),
|
len(edus),
|
||||||
len(failures),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Actually send the transaction
|
# Actually send the transaction
|
||||||
|
|
|
@ -283,11 +283,10 @@ class FederationSendServlet(BaseFederationServlet):
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Received txn %s from %s. (PDUs: %d, EDUs: %d, failures: %d)",
|
"Received txn %s from %s. (PDUs: %d, EDUs: %d)",
|
||||||
transaction_id, origin,
|
transaction_id, origin,
|
||||||
len(transaction_data.get("pdus", [])),
|
len(transaction_data.get("pdus", [])),
|
||||||
len(transaction_data.get("edus", [])),
|
len(transaction_data.get("edus", [])),
|
||||||
len(transaction_data.get("failures", [])),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# We should ideally be getting this from the security layer.
|
# We should ideally be getting this from the security layer.
|
||||||
|
|
|
@ -73,7 +73,6 @@ class Transaction(JsonEncodedObject):
|
||||||
"previous_ids",
|
"previous_ids",
|
||||||
"pdus",
|
"pdus",
|
||||||
"edus",
|
"edus",
|
||||||
"pdu_failures",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
internal_keys = [
|
internal_keys = [
|
||||||
|
|
|
@ -44,7 +44,6 @@ def _expect_edu(destination, edu_type, content, origin="test"):
|
||||||
"content": content,
|
"content": content,
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"pdu_failures": [],
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue