Add tests for `last_successful_stream_ordering` (#8258)
This commit is contained in:
parent
77b4711bc2
commit
765437df54
|
@ -0,0 +1 @@
|
||||||
|
Track the `stream_ordering` of the last successfully-sent event to every destination, so we can use this information to 'catch up' a remote server after an outage.
|
|
@ -28,6 +28,24 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
|
||||||
return_value=make_awaitable(["test", "host2"])
|
return_value=make_awaitable(["test", "host2"])
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# whenever send_transaction is called, record the pdu data
|
||||||
|
self.pdus = []
|
||||||
|
self.failed_pdus = []
|
||||||
|
self.is_online = True
|
||||||
|
self.hs.get_federation_transport_client().send_transaction.side_effect = (
|
||||||
|
self.record_transaction
|
||||||
|
)
|
||||||
|
|
||||||
|
async def record_transaction(self, txn, json_cb):
|
||||||
|
if self.is_online:
|
||||||
|
data = json_cb()
|
||||||
|
self.pdus.extend(data["pdus"])
|
||||||
|
return {}
|
||||||
|
else:
|
||||||
|
data = json_cb()
|
||||||
|
self.failed_pdus.extend(data["pdus"])
|
||||||
|
raise IOError("Failed to connect because this is a test!")
|
||||||
|
|
||||||
def get_destination_room(self, room: str, destination: str = "host2") -> dict:
|
def get_destination_room(self, room: str, destination: str = "host2") -> dict:
|
||||||
"""
|
"""
|
||||||
Gets the destination_rooms entry for a (destination, room_id) pair.
|
Gets the destination_rooms entry for a (destination, room_id) pair.
|
||||||
|
@ -80,3 +98,61 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
|
||||||
self.assertEqual(row_1["event_id"], event_id_1)
|
self.assertEqual(row_1["event_id"], event_id_1)
|
||||||
self.assertEqual(row_2["event_id"], event_id_2)
|
self.assertEqual(row_2["event_id"], event_id_2)
|
||||||
self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1)
|
self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1)
|
||||||
|
|
||||||
|
@override_config({"send_federation": True})
|
||||||
|
def test_catch_up_last_successful_stream_ordering_tracking(self):
|
||||||
|
"""
|
||||||
|
Tests that we populate the `destination_rooms` table as needed.
|
||||||
|
"""
|
||||||
|
self.register_user("u1", "you the one")
|
||||||
|
u1_token = self.login("u1", "you the one")
|
||||||
|
room = self.helper.create_room_as("u1", tok=u1_token)
|
||||||
|
|
||||||
|
# take the remote offline
|
||||||
|
self.is_online = False
|
||||||
|
|
||||||
|
self.get_success(
|
||||||
|
event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
|
||||||
|
)
|
||||||
|
|
||||||
|
self.helper.send(room, "wombats!", tok=u1_token)
|
||||||
|
self.pump()
|
||||||
|
|
||||||
|
lsso_1 = self.get_success(
|
||||||
|
self.hs.get_datastore().get_destination_last_successful_stream_ordering(
|
||||||
|
"host2"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertIsNone(
|
||||||
|
lsso_1,
|
||||||
|
"There should be no last successful stream ordering for an always-offline destination",
|
||||||
|
)
|
||||||
|
|
||||||
|
# bring the remote online
|
||||||
|
self.is_online = True
|
||||||
|
|
||||||
|
event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"]
|
||||||
|
|
||||||
|
lsso_2 = self.get_success(
|
||||||
|
self.hs.get_datastore().get_destination_last_successful_stream_ordering(
|
||||||
|
"host2"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
row_2 = self.get_destination_room(room)
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
self.pdus[0]["content"]["body"],
|
||||||
|
"rabbits!",
|
||||||
|
"Test fault: didn't receive the right PDU",
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
row_2["event_id"],
|
||||||
|
event_id_2,
|
||||||
|
"Test fault: destination_rooms not updated correctly",
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
lsso_2,
|
||||||
|
row_2["stream_ordering"],
|
||||||
|
"Send succeeded but not marked as last_successful_stream_ordering",
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in New Issue