Catch-up after Federation Outage (split, 1) (#8230)

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>

parent e351298444
commit 58f61f10f7

@@ -0,0 +1 @@
+Track the latest event for every destination and room for catch-up after federation outage.
@@ -209,7 +209,7 @@ class FederationSender:
             logger.debug("Sending %s to %r", event, destinations)
 
             if destinations:
-                self._send_pdu(event, destinations)
+                await self._send_pdu(event, destinations)
 
                 now = self.clock.time_msec()
                 ts = await self.store.get_received_ts(event.event_id)
@@ -265,7 +265,7 @@ class FederationSender:
         finally:
             self._is_processing = False
 
-    def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
+    async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
         # We loop through all destinations to see whether we already have
         # a transaction in progress. If we do, stick it in the pending_pdus
         # table and we'll get back to it later.
@@ -280,6 +280,13 @@ class FederationSender:
         sent_pdus_destination_dist_total.inc(len(destinations))
         sent_pdus_destination_dist_count.inc()
 
+        # track the fact that we have a PDU for these destinations,
+        # to allow us to perform catch-up later on if the remote is unreachable
+        # for a while.
+        await self.store.store_destination_rooms_entries(
+            destinations, pdu.room_id, pdu.internal_metadata.stream_ordering,
+        )
+
         for destination in destinations:
             self._get_per_destination_queue(destination).send_pdu(pdu)
 
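The rows written above exist so that a later catch-up pass can work out, per room, the single newest PDU that a lagging destination still needs. A minimal sketch of such a consumer, assuming a hypothetical catch_up_destination method and _resend_latest_event helper on the sender (neither is part of this commit):

    # Hypothetical sketch only: this commit just records the data; a consumer
    # along these lines belongs to the follow-up catch-up work.
    async def catch_up_destination(self, destination: str) -> None:
        # One row per room: the stream_ordering of the newest PDU owed to the remote.
        rows = await self.store.db_pool.simple_select_list(
            table="destination_rooms",
            keyvalues={"destination": destination},
            retcols=("room_id", "stream_ordering"),
            desc="catch_up_destination",
        )
        for row in rows:
            # Resend only the most recent event per room, not the whole backlog.
            await self._resend_latest_event(
                destination, row["room_id"], row["stream_ordering"]
            )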
@@ -952,7 +952,7 @@ class DatabasePool:
         key_names: Collection[str],
         key_values: Collection[Iterable[Any]],
         value_names: Collection[str],
-        value_values: Iterable[Iterable[str]],
+        value_values: Iterable[Iterable[Any]],
     ) -> None:
         """
         Upsert, many times.
@@ -981,7 +981,7 @@ class DatabasePool:
         key_names: Iterable[str],
         key_values: Collection[Iterable[Any]],
         value_names: Collection[str],
-        value_values: Iterable[Iterable[str]],
+        value_values: Iterable[Iterable[Any]],
     ) -> None:
         """
         Upsert, many times, but without native UPSERT support or batching.
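Widening value_values to Iterable[Iterable[Any]] matters because the new destination_rooms upsert passes integer stream orderings, not strings. The call shape (as used by the transactions store later in this commit, shown here with hypothetical example values):

    # Values are now typed Iterable[Iterable[Any]], so non-string values such as
    # integer stream orderings type-check.
    self.db_pool.simple_upsert_many_txn(
        txn,
        "destination_rooms",                      # table
        ["destination", "room_id"],               # key_names
        [("example.org", "!room:example.org")],   # key_values (hypothetical)
        ["stream_ordering"],                      # value_names
        [(42,)],                                  # value_values: ints, not strings
    )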
@@ -69,6 +69,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
         #     room_depth
         #     state_groups
         #     state_groups_state
+        #     destination_rooms
 
         # we will build a temporary table listing the events so that we don't
         # have to keep shovelling the list back and forth across the
@@ -336,6 +337,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
         # and finally, the tables with an index on room_id (or no useful index)
         for table in (
             "current_state_events",
+            "destination_rooms",
             "event_backward_extremities",
             "event_forward_extremities",
             "event_json",
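For context, the table names in that tuple feed a per-table delete when a room is purged. A simplified sketch of how the loop consumes the tuple (tables_indexed_on_room_id is a placeholder name; the exact logging and ordering in purge_events.py may differ):

    # Simplified sketch: purging a room deletes its rows from each listed table,
    # so destination_rooms rows for the room are now dropped as well.
    for table in tables_indexed_on_room_id:
        txn.execute("DELETE FROM %s WHERE room_id = ?" % (table,), (room_id,))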
@@ -0,0 +1,42 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- This schema delta alters the schema to enable 'catching up' remote homeservers
+-- after there has been a connectivity problem for any reason.
+
+-- This stores, for each (destination, room) pair, the stream_ordering of the
+-- latest event for that destination.
+CREATE TABLE IF NOT EXISTS destination_rooms (
+  -- the destination in question.
+  destination TEXT NOT NULL REFERENCES destinations (destination),
+  -- the ID of the room in question
+  room_id TEXT NOT NULL REFERENCES rooms (room_id),
+  -- the stream_ordering of the event
+  stream_ordering INTEGER NOT NULL,
+  PRIMARY KEY (destination, room_id)
+  -- We don't declare a foreign key on stream_ordering here because that'd mean
+  -- we'd need to either maintain an index (expensive) or do a table scan of
+  -- destination_rooms whenever we delete an event (also potentially expensive).
+  -- In addition to that, a foreign key on stream_ordering would be redundant
+  -- as this row doesn't need to refer to a specific event; if the event gets
+  -- deleted then it doesn't affect the validity of the stream_ordering here.
+);
+
+-- This index is needed to make it so that a deletion of a room (in the rooms
+-- table) can be efficient, as otherwise a table scan would need to be performed
+-- to check that no destination_rooms rows point to the room to be deleted.
+-- Also: it makes it efficient to delete all the entries for a given room ID,
+-- such as when purging a room.
+CREATE INDEX IF NOT EXISTS destination_rooms_room_id
+    ON destination_rooms (room_id);
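Because (destination, room_id) is the primary key and writes go through an upsert, the table only ever holds the newest stream_ordering per pair. An illustration with hypothetical values, using the store method added later in this commit:

    # Hypothetical values: after both calls, destination_rooms holds a single row
    # ("host2", "!room:example.org") with stream_ordering 11; the second upsert
    # overwrites the first rather than adding a row.
    await store.store_destination_rooms_entries(["host2"], "!room:example.org", 10)
    await store.store_destination_rooms_entries(["host2"], "!room:example.org", 11)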
@@ -15,13 +15,14 @@
 
 import logging
 from collections import namedtuple
-from typing import Optional, Tuple
+from typing import Iterable, Optional, Tuple
 
 from canonicaljson import encode_canonical_json
 
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import SQLBaseStore, db_to_json
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, LoggingTransaction
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import JsonDict
 from synapse.util.caches.expiringcache import ExpiringCache
 
@@ -164,7 +165,9 @@ class TransactionStore(SQLBaseStore):
             allow_none=True,
         )
 
-        if result and result["retry_last_ts"] > 0:
+        # check we have a row and retry_last_ts is not null or zero
+        # (retry_last_ts can't be negative)
+        if result and result["retry_last_ts"]:
             return result
         else:
             return None
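The truthiness test covers both NULL and zero (and, as the new comment notes, retry_last_ts can't be negative), so it agrees with the old "> 0" comparison everywhere that comparison was well-defined. A quick illustration:

    # retry_last_ts is either absent/NULL or a non-negative integer, so truthiness
    # and the guarded "> 0" comparison give the same answer for every such value.
    for retry_last_ts in (None, 0, 1598000000000):
        assert bool(retry_last_ts) == (retry_last_ts is not None and retry_last_ts > 0)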
@@ -273,3 +276,60 @@ class TransactionStore(SQLBaseStore):
         await self.db_pool.runInteraction(
             "_cleanup_transactions", _cleanup_transactions_txn
         )
+
+    async def store_destination_rooms_entries(
+        self, destinations: Iterable[str], room_id: str, stream_ordering: int,
+    ) -> None:
+        """
+        Updates or creates `destination_rooms` entries in batch for a single event.
+
+        Args:
+            destinations: list of destinations
+            room_id: the room_id of the event
+            stream_ordering: the stream_ordering of the event
+        """
+
+        return await self.db_pool.runInteraction(
+            "store_destination_rooms_entries",
+            self._store_destination_rooms_entries_txn,
+            destinations,
+            room_id,
+            stream_ordering,
+        )
+
+    def _store_destination_rooms_entries_txn(
+        self,
+        txn: LoggingTransaction,
+        destinations: Iterable[str],
+        room_id: str,
+        stream_ordering: int,
+    ) -> None:
+
+        # ensure we have a `destinations` row for this destination, as there is
+        # a foreign key constraint.
+        if isinstance(self.database_engine, PostgresEngine):
+            q = """
+                INSERT INTO destinations (destination)
+                    VALUES (?)
+                    ON CONFLICT DO NOTHING;
+            """
+        elif isinstance(self.database_engine, Sqlite3Engine):
+            q = """
+                INSERT OR IGNORE INTO destinations (destination)
+                    VALUES (?);
+            """
+        else:
+            raise RuntimeError("Unknown database engine")
+
+        txn.execute_batch(q, ((destination,) for destination in destinations))
+
+        rows = [(destination, room_id) for destination in destinations]
+
+        self.db_pool.simple_upsert_many_txn(
+            txn,
+            "destination_rooms",
+            ["destination", "room_id"],
+            rows,
+            ["stream_ordering"],
+            [(stream_ordering,)] * len(rows),
+        )
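A note on the destinations pre-insert above: txn.execute_batch runs the statement once per parameter tuple, so it is roughly equivalent to the loop below (a sketch, not the literal implementation). The conflict clause turns re-inserting an already-known destination into a no-op, which is what keeps the subsequent destination_rooms upsert from violating its foreign key.

    # Roughly what txn.execute_batch(q, ((d,) for d in destinations)) amounts to:
    # the INSERT runs once per destination, and ON CONFLICT DO NOTHING /
    # INSERT OR IGNORE makes duplicates harmless.
    for destination in destinations:
        txn.execute(q, (destination,))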
@@ -0,0 +1,82 @@
+from mock import Mock
+
+from synapse.rest import admin
+from synapse.rest.client.v1 import login, room
+
+from tests.test_utils import event_injection, make_awaitable
+from tests.unittest import FederatingHomeserverTestCase, override_config
+
+
+class FederationCatchUpTestCases(FederatingHomeserverTestCase):
+    servlets = [
+        admin.register_servlets,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
+    def make_homeserver(self, reactor, clock):
+        return self.setup_test_homeserver(
+            federation_transport_client=Mock(spec=["send_transaction"]),
+        )
+
+    def prepare(self, reactor, clock, hs):
+        # stub out get_current_hosts_in_room
+        state_handler = hs.get_state_handler()
+
+        # This mock is crucial for destination_rooms to be populated.
+        state_handler.get_current_hosts_in_room = Mock(
+            return_value=make_awaitable(["test", "host2"])
+        )
+
+    def get_destination_room(self, room: str, destination: str = "host2") -> dict:
+        """
+        Gets the destination_rooms entry for a (destination, room_id) pair.
+
+        Args:
+            room: room ID
+            destination: what destination, default is "host2"
+
+        Returns:
+            Dictionary of { event_id: str, stream_ordering: int }
+        """
+        event_id, stream_ordering = self.get_success(
+            self.hs.get_datastore().db_pool.execute(
+                "test:get_destination_rooms",
+                None,
+                """
+                SELECT event_id, stream_ordering
+                    FROM destination_rooms dr
+                    JOIN events USING (stream_ordering)
+                    WHERE dr.destination = ? AND dr.room_id = ?
+                """,
+                destination,
+                room,
+            )
+        )[0]
+        return {"event_id": event_id, "stream_ordering": stream_ordering}
+
+    @override_config({"send_federation": True})
+    def test_catch_up_destination_rooms_tracking(self):
+        """
+        Tests that we populate the `destination_rooms` table as needed.
+        """
+        self.register_user("u1", "you the one")
+        u1_token = self.login("u1", "you the one")
+        room = self.helper.create_room_as("u1", tok=u1_token)
+
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
+        )
+
+        event_id_1 = self.helper.send(room, "wombats!", tok=u1_token)["event_id"]
+
+        row_1 = self.get_destination_room(room)
+
+        event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"]
+
+        row_2 = self.get_destination_room(room)
+
+        # check: events correctly registered in order
+        self.assertEqual(row_1["event_id"], event_id_1)
+        self.assertEqual(row_2["event_id"], event_id_2)
+        self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1)