Add admin API to get a list of federated rooms (#11658)

This commit is contained in:
Dirk Klimpel 2022-01-25 17:11:40 +01:00 committed by GitHub
parent 0938f32e93
commit 6a72c910f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 444 additions and 25 deletions

View File

@ -0,0 +1 @@
Add an admin API to get a list of rooms that federate with a given remote homeserver.

View File

@ -119,6 +119,66 @@ The following parameters should be set in the URL:
The response fields are the same like in the `destinations` array in
[List of destinations](#list-of-destinations) response.
## Destination rooms
This API gets the rooms that federate with a specific remote server.
The API is:
```
GET /_synapse/admin/v1/federation/destinations/<destination>/rooms
```
A response body like the following is returned:
```json
{
"rooms":[
{
"room_id": "!OGEhHVWSdvArJzumhm:matrix.org",
"stream_ordering": 8326
},
{
"room_id": "!xYvNcQPhnkrdUmYczI:matrix.org",
"stream_ordering": 93534
}
],
"total": 2
}
```
To paginate, check for `next_token` and if present, call the endpoint again
with `from` set to the value of `next_token`. This will return a new page.
If the endpoint does not return a `next_token` then there are no more destinations
to paginate through.
**Parameters**
The following parameters should be set in the URL:
- `destination` - Name of the remote server.
The following query parameters are available:
- `from` - Offset in the returned list. Defaults to `0`.
- `limit` - Maximum amount of destinations to return. Defaults to `100`.
- `dir` - Direction of room order by `room_id`. Either `f` for forwards or `b` for
backwards. Defaults to `f`.
**Response**
The following fields are returned in the JSON response body:
- `rooms` - An array of objects, each containing information about a room.
Room objects contain the following fields:
- `room_id` - string - The ID of the room.
- `stream_ordering` - integer - The stream ordering of the most recent
successfully-sent [PDU](understanding_synapse_through_grafana_graphs.md#federation)
to this destination in this room.
- `next_token`: string representing a positive integer - Indication for pagination. See above.
- `total` - integer - Total number of destinations.
## Reset connection timeout
Synapse makes federation requests to other homeservers. If a federation request fails,

View File

@ -41,6 +41,7 @@ from synapse.rest.admin.event_reports import (
EventReportsRestServlet,
)
from synapse.rest.admin.federation import (
DestinationMembershipRestServlet,
DestinationResetConnectionRestServlet,
DestinationRestServlet,
ListDestinationsRestServlet,
@ -268,6 +269,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ListRegistrationTokensRestServlet(hs).register(http_server)
NewRegistrationTokenRestServlet(hs).register(http_server)
RegistrationTokenRestServlet(hs).register(http_server)
DestinationMembershipRestServlet(hs).register(http_server)
DestinationResetConnectionRestServlet(hs).register(http_server)
DestinationRestServlet(hs).register(http_server)
ListDestinationsRestServlet(hs).register(http_server)

View File

@ -148,6 +148,62 @@ class DestinationRestServlet(RestServlet):
return HTTPStatus.OK, response
class DestinationMembershipRestServlet(RestServlet):
"""Get list of rooms of a destination.
This needs user to have administrator access in Synapse.
GET /_synapse/admin/v1/federation/destinations/<destination>/rooms?from=0&limit=10
returns:
200 OK with a list of rooms if success otherwise an error.
The parameters `from` and `limit` are required only for pagination.
By default, a `limit` of 100 is used.
"""
PATTERNS = admin_patterns("/federation/destinations/(?P<destination>[^/]*)/rooms$")
def __init__(self, hs: "HomeServer"):
self._auth = hs.get_auth()
self._store = hs.get_datastore()
async def on_GET(
self, request: SynapseRequest, destination: str
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self._auth, request)
if not await self._store.is_destination_known(destination):
raise NotFoundError("Unknown destination")
start = parse_integer(request, "from", default=0)
limit = parse_integer(request, "limit", default=100)
if start < 0:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Query parameter from must be a string representing a positive integer.",
errcode=Codes.INVALID_PARAM,
)
if limit < 0:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Query parameter limit must be a string representing a positive integer.",
errcode=Codes.INVALID_PARAM,
)
direction = parse_string(request, "dir", default="f", allowed_values=("f", "b"))
rooms, total = await self._store.get_destination_rooms_paginate(
destination, start, limit, direction
)
response = {"rooms": rooms, "total": total}
if (start + limit) < total:
response["next_token"] = str(start + len(rooms))
return HTTPStatus.OK, response
class DestinationResetConnectionRestServlet(RestServlet):
"""Reset destinations' connection timeouts and wake it up.
This needs user to have administrator access in Synapse.

View File

@ -561,6 +561,54 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
"get_destinations_paginate_txn", get_destinations_paginate_txn
)
async def get_destination_rooms_paginate(
self, destination: str, start: int, limit: int, direction: str = "f"
) -> Tuple[List[JsonDict], int]:
"""Function to retrieve a paginated list of destination's rooms.
This will return a json list of rooms and the
total number of rooms.
Args:
destination: the destination to query
start: start number to begin the query from
limit: number of rows to retrieve
direction: sort ascending or descending by room_id
Returns:
A tuple of a dict of rooms and a count of total rooms.
"""
def get_destination_rooms_paginate_txn(
txn: LoggingTransaction,
) -> Tuple[List[JsonDict], int]:
if direction == "b":
order = "DESC"
else:
order = "ASC"
sql = """
SELECT COUNT(*) as total_rooms
FROM destination_rooms
WHERE destination = ?
"""
txn.execute(sql, [destination])
count = cast(Tuple[int], txn.fetchone())[0]
rooms = self.db_pool.simple_select_list_paginate_txn(
txn=txn,
table="destination_rooms",
orderby="room_id",
start=start,
limit=limit,
retcols=("room_id", "stream_ordering"),
order_direction=order,
)
return rooms, count
return await self.db_pool.runInteraction(
"get_destination_rooms_paginate_txn", get_destination_rooms_paginate_txn
)
async def is_destination_known(self, destination: str) -> bool:
"""Check if a destination is known to the server."""
result = await self.db_pool.simple_select_one_onecol(

View File

@ -20,7 +20,7 @@ from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.errors import Codes
from synapse.rest.client import login
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock
@ -52,9 +52,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
]
)
def test_requester_is_no_admin(self, method: str, url: str) -> None:
"""
If the user is not a server admin, an error 403 is returned.
"""
"""If the user is not a server admin, an error 403 is returned."""
self.register_user("user", "pass", admin=False)
other_user_tok = self.login("user", "pass")
@ -70,9 +68,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_invalid_parameter(self) -> None:
"""
If parameters are invalid, an error is returned.
"""
"""If parameters are invalid, an error is returned."""
# negative limit
channel = self.make_request(
@ -135,9 +131,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
def test_limit(self) -> None:
"""
Testing list of destinations with limit
"""
"""Testing list of destinations with limit"""
number_destinations = 20
self._create_destinations(number_destinations)
@ -155,9 +149,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self._check_fields(channel.json_body["destinations"])
def test_from(self) -> None:
"""
Testing list of destinations with a defined starting point (from)
"""
"""Testing list of destinations with a defined starting point (from)"""
number_destinations = 20
self._create_destinations(number_destinations)
@ -175,9 +167,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self._check_fields(channel.json_body["destinations"])
def test_limit_and_from(self) -> None:
"""
Testing list of destinations with a defined starting point and limit
"""
"""Testing list of destinations with a defined starting point and limit"""
number_destinations = 20
self._create_destinations(number_destinations)
@ -195,9 +185,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self._check_fields(channel.json_body["destinations"])
def test_next_token(self) -> None:
"""
Testing that `next_token` appears at the right place
"""
"""Testing that `next_token` appears at the right place"""
number_destinations = 20
self._create_destinations(number_destinations)
@ -256,9 +244,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self.assertNotIn("next_token", channel.json_body)
def test_list_all_destinations(self) -> None:
"""
List all destinations.
"""
"""List all destinations."""
number_destinations = 5
self._create_destinations(number_destinations)
@ -277,9 +263,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self._check_fields(channel.json_body["destinations"])
def test_order_by(self) -> None:
"""
Testing order list with parameter `order_by`
"""
"""Testing order list with parameter `order_by`"""
def _order_test(
expected_destination_list: List[str],
@ -543,3 +527,271 @@ class FederationTestCase(unittest.HomeserverTestCase):
self.assertIn("retry_interval", c)
self.assertIn("failure_ts", c)
self.assertIn("last_successful_stream_ordering", c)
class DestinationMembershipTestCase(unittest.HomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
room.register_servlets,
]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
self.dest = "sub0.example.com"
self.url = f"/_synapse/admin/v1/federation/destinations/{self.dest}/rooms"
# Record that we successfully contacted a destination in the DB.
self.get_success(
self.store.set_destination_retry_timings(self.dest, None, 0, 0)
)
def test_requester_is_no_admin(self) -> None:
"""If the user is not a server admin, an error 403 is returned."""
self.register_user("user", "pass", admin=False)
other_user_tok = self.login("user", "pass")
channel = self.make_request(
"GET",
self.url,
access_token=other_user_tok,
)
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_invalid_parameter(self) -> None:
"""If parameters are invalid, an error is returned."""
# negative limit
channel = self.make_request(
"GET",
self.url + "?limit=-5",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
# negative from
channel = self.make_request(
"GET",
self.url + "?from=-5",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
# invalid search order
channel = self.make_request(
"GET",
self.url + "?dir=bar",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
# invalid destination
channel = self.make_request(
"GET",
"/_synapse/admin/v1/federation/destinations/%s/rooms" % ("invalid",),
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
def test_limit(self) -> None:
"""Testing list of destinations with limit"""
number_rooms = 5
self._create_destination_rooms(number_rooms)
channel = self.make_request(
"GET",
self.url + "?limit=3",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_rooms)
self.assertEqual(len(channel.json_body["rooms"]), 3)
self.assertEqual(channel.json_body["next_token"], "3")
self._check_fields(channel.json_body["rooms"])
def test_from(self) -> None:
"""Testing list of rooms with a defined starting point (from)"""
number_rooms = 10
self._create_destination_rooms(number_rooms)
channel = self.make_request(
"GET",
self.url + "?from=5",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_rooms)
self.assertEqual(len(channel.json_body["rooms"]), 5)
self.assertNotIn("next_token", channel.json_body)
self._check_fields(channel.json_body["rooms"])
def test_limit_and_from(self) -> None:
"""Testing list of rooms with a defined starting point and limit"""
number_rooms = 10
self._create_destination_rooms(number_rooms)
channel = self.make_request(
"GET",
self.url + "?from=3&limit=5",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_rooms)
self.assertEqual(channel.json_body["next_token"], "8")
self.assertEqual(len(channel.json_body["rooms"]), 5)
self._check_fields(channel.json_body["rooms"])
def test_order_direction(self) -> None:
"""Testing order list with parameter `dir`"""
number_rooms = 4
self._create_destination_rooms(number_rooms)
# get list in forward direction
channel_asc = self.make_request(
"GET",
self.url + "?dir=f",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel_asc.code, msg=channel_asc.json_body)
self.assertEqual(channel_asc.json_body["total"], number_rooms)
self.assertEqual(number_rooms, len(channel_asc.json_body["rooms"]))
self._check_fields(channel_asc.json_body["rooms"])
# get list in backward direction
channel_desc = self.make_request(
"GET",
self.url + "?dir=b",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel_desc.code, msg=channel_desc.json_body)
self.assertEqual(channel_desc.json_body["total"], number_rooms)
self.assertEqual(number_rooms, len(channel_desc.json_body["rooms"]))
self._check_fields(channel_desc.json_body["rooms"])
# test that both lists have different directions
for i in range(0, number_rooms):
self.assertEqual(
channel_asc.json_body["rooms"][i]["room_id"],
channel_desc.json_body["rooms"][number_rooms - 1 - i]["room_id"],
)
def test_next_token(self) -> None:
"""Testing that `next_token` appears at the right place"""
number_rooms = 5
self._create_destination_rooms(number_rooms)
# `next_token` does not appear
# Number of results is the number of entries
channel = self.make_request(
"GET",
self.url + "?limit=5",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_rooms)
self.assertEqual(len(channel.json_body["rooms"]), number_rooms)
self.assertNotIn("next_token", channel.json_body)
# `next_token` does not appear
# Number of max results is larger than the number of entries
channel = self.make_request(
"GET",
self.url + "?limit=6",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_rooms)
self.assertEqual(len(channel.json_body["rooms"]), number_rooms)
self.assertNotIn("next_token", channel.json_body)
# `next_token` does appear
# Number of max results is smaller than the number of entries
channel = self.make_request(
"GET",
self.url + "?limit=4",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_rooms)
self.assertEqual(len(channel.json_body["rooms"]), 4)
self.assertEqual(channel.json_body["next_token"], "4")
# Check
# Set `from` to value of `next_token` for request remaining entries
# `next_token` does not appear
channel = self.make_request(
"GET",
self.url + "?from=4",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_rooms)
self.assertEqual(len(channel.json_body["rooms"]), 1)
self.assertNotIn("next_token", channel.json_body)
def test_destination_rooms(self) -> None:
"""Testing that request the list of rooms is successfully."""
number_rooms = 3
self._create_destination_rooms(number_rooms)
channel = self.make_request(
"GET",
self.url,
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_rooms)
self.assertEqual(number_rooms, len(channel.json_body["rooms"]))
self._check_fields(channel.json_body["rooms"])
def _create_destination_rooms(self, number_rooms: int) -> None:
"""Create a number rooms for destination
Args:
number_rooms: Number of rooms to be created
"""
for _ in range(0, number_rooms):
room_id = self.helper.create_room_as(
self.admin_user, tok=self.admin_user_tok
)
self.get_success(
self.store.store_destination_rooms_entries((self.dest,), room_id, 1234)
)
def _check_fields(self, content: List[JsonDict]) -> None:
"""Checks that the expected room attributes are present in content
Args:
content: List that is checked for content
"""
for c in content:
self.assertIn("room_id", c)
self.assertIn("stream_ordering", c)