Add type hints to `synapse/tests/rest/admin` (#11590)

This commit is contained in:
Dirk Klimpel 2021-12-16 20:59:56 +01:00 committed by GitHub
parent 1847d027e6
commit 8428ef66c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 74 additions and 57 deletions

1
changelog.d/11590.misc Normal file
View File

@ -0,0 +1 @@
Add type hints to `synapse/tests/rest/admin`.

View File

@ -242,6 +242,9 @@ disallow_untyped_defs = True
[mypy-tests.storage.test_user_directory]
disallow_untyped_defs = True
[mypy-tests.rest.admin.*]
disallow_untyped_defs = True
[mypy-tests.rest.client.test_directory]
disallow_untyped_defs = True

View File

@ -23,6 +23,7 @@ from synapse.api.errors import Codes
from synapse.rest.client import login
from synapse.server import HomeServer
from synapse.storage.background_updates import BackgroundUpdater
from synapse.types import JsonDict
from synapse.util import Clock
from tests import unittest
@ -96,7 +97,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
def _register_bg_update(self) -> None:
"Adds a bg update but doesn't start it"
async def _fake_update(progress, batch_size) -> int:
async def _fake_update(progress: JsonDict, batch_size: int) -> int:
await self.clock.sleep(0.2)
return batch_size

View File

@ -16,11 +16,14 @@ from typing import List, Optional
from parameterized import parameterized
from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.errors import Codes
from synapse.rest.client import login
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock
from tests import unittest
@ -31,7 +34,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
def prepare(self, reactor, clock, hs: HomeServer):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
@ -44,7 +47,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
("/_synapse/admin/v1/federation/destinations/dummy",),
]
)
def test_requester_is_no_admin(self, url: str):
def test_requester_is_no_admin(self, url: str) -> None:
"""
If the user is not a server admin, an error 403 is returned.
"""
@ -62,7 +65,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_invalid_parameter(self):
def test_invalid_parameter(self) -> None:
"""
If parameters are invalid, an error is returned.
"""
@ -117,7 +120,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
def test_limit(self):
def test_limit(self) -> None:
"""
Testing list of destinations with limit
"""
@ -137,7 +140,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self.assertEqual(channel.json_body["next_token"], "5")
self._check_fields(channel.json_body["destinations"])
def test_from(self):
def test_from(self) -> None:
"""
Testing list of destinations with a defined starting point (from)
"""
@ -157,7 +160,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self.assertNotIn("next_token", channel.json_body)
self._check_fields(channel.json_body["destinations"])
def test_limit_and_from(self):
def test_limit_and_from(self) -> None:
"""
Testing list of destinations with a defined starting point and limit
"""
@ -177,7 +180,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self.assertEqual(len(channel.json_body["destinations"]), 10)
self._check_fields(channel.json_body["destinations"])
def test_next_token(self):
def test_next_token(self) -> None:
"""
Testing that `next_token` appears at the right place
"""
@ -238,7 +241,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self.assertEqual(len(channel.json_body["destinations"]), 1)
self.assertNotIn("next_token", channel.json_body)
def test_list_all_destinations(self):
def test_list_all_destinations(self) -> None:
"""
List all destinations.
"""
@ -259,7 +262,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
# Check that all fields are available
self._check_fields(channel.json_body["destinations"])
def test_order_by(self):
def test_order_by(self) -> None:
"""
Testing order list with parameter `order_by`
"""
@ -268,7 +271,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
expected_destination_list: List[str],
order_by: Optional[str],
dir: Optional[str] = None,
):
) -> None:
"""Request the list of destinations in a certain order.
Assert that order is what we expect
@ -358,13 +361,13 @@ class FederationTestCase(unittest.HomeserverTestCase):
[dest[0][0], dest[2][0], dest[1][0]], "last_successful_stream_ordering", "b"
)
def test_search_term(self):
def test_search_term(self) -> None:
"""Test that searching for a destination works correctly"""
def _search_test(
expected_destination: Optional[str],
search_term: str,
):
) -> None:
"""Search for a destination and check that the returned destinationis a match
Args:
@ -410,7 +413,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
_search_test(None, "foo")
_search_test(None, "bar")
def test_get_single_destination(self):
def test_get_single_destination(self) -> None:
"""
Get one specific destinations.
"""
@ -429,7 +432,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
# convert channel.json_body into a List
self._check_fields([channel.json_body])
def _create_destinations(self, number_destinations: int):
def _create_destinations(self, number_destinations: int) -> None:
"""Create a number of destinations
Args:
@ -442,7 +445,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self.store.set_destination_last_successful_stream_ordering(dest, 100)
)
def _check_fields(self, content: List[JsonDict]):
def _check_fields(self, content: List[JsonDict]) -> None:
"""Checks that the expected destination attributes are present in content
Args:

View File

@ -580,7 +580,9 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
return server_and_media_id
def _access_media(self, server_and_media_id, expect_success=True) -> None:
def _access_media(
self, server_and_media_id: str, expect_success: bool = True
) -> None:
"""
Try to access a media and check the result
"""

View File

@ -14,6 +14,7 @@
import random
import string
from http import HTTPStatus
from typing import Optional
from twisted.test.proto_helpers import MemoryReactor
@ -42,21 +43,27 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
self.url = "/_synapse/admin/v1/registration_tokens"
def _new_token(self, **kwargs) -> str:
def _new_token(
self,
token: Optional[str] = None,
uses_allowed: Optional[int] = None,
pending: int = 0,
completed: int = 0,
expiry_time: Optional[int] = None,
) -> str:
"""Helper function to create a token."""
token = kwargs.get(
"token",
"".join(random.choices(string.ascii_letters, k=8)),
)
if token is None:
token = "".join(random.choices(string.ascii_letters, k=8))
self.get_success(
self.store.db_pool.simple_insert(
"registration_tokens",
{
"token": token,
"uses_allowed": kwargs.get("uses_allowed", None),
"pending": kwargs.get("pending", 0),
"completed": kwargs.get("completed", 0),
"expiry_time": kwargs.get("expiry_time", None),
"uses_allowed": uses_allowed,
"pending": pending,
"completed": completed,
"expiry_time": expiry_time,
},
)
)

View File

@ -66,7 +66,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
)
self.url = "/_synapse/admin/v1/rooms/%s" % self.room_id
def test_requester_is_no_admin(self):
def test_requester_is_no_admin(self) -> None:
"""
If the user is not a server admin, an error HTTPStatus.FORBIDDEN is returned.
"""
@ -81,7 +81,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_room_does_not_exist(self):
def test_room_does_not_exist(self) -> None:
"""
Check that unknown rooms/server return 200
"""
@ -96,7 +96,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
def test_room_is_not_valid(self):
def test_room_is_not_valid(self) -> None:
"""
Check that invalid room names, return an error HTTPStatus.BAD_REQUEST.
"""
@ -115,7 +115,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
channel.json_body["error"],
)
def test_new_room_user_does_not_exist(self):
def test_new_room_user_does_not_exist(self) -> None:
"""
Tests that the user ID must be from local server but it does not have to exist.
"""
@ -133,7 +133,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
self.assertIn("failed_to_kick_users", channel.json_body)
self.assertIn("local_aliases", channel.json_body)
def test_new_room_user_is_not_local(self):
def test_new_room_user_is_not_local(self) -> None:
"""
Check that only local users can create new room to move members.
"""
@ -151,7 +151,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
channel.json_body["error"],
)
def test_block_is_not_bool(self):
def test_block_is_not_bool(self) -> None:
"""
If parameter `block` is not boolean, return an error
"""
@ -166,7 +166,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.BAD_JSON, channel.json_body["errcode"])
def test_purge_is_not_bool(self):
def test_purge_is_not_bool(self) -> None:
"""
If parameter `purge` is not boolean, return an error
"""
@ -181,7 +181,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.BAD_JSON, channel.json_body["errcode"])
def test_purge_room_and_block(self):
def test_purge_room_and_block(self) -> None:
"""Test to purge a room and block it.
Members will not be moved to a new room and will not receive a message.
"""
@ -212,7 +212,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
self._is_blocked(self.room_id, expect=True)
self._has_no_members(self.room_id)
def test_purge_room_and_not_block(self):
def test_purge_room_and_not_block(self) -> None:
"""Test to purge a room and do not block it.
Members will not be moved to a new room and will not receive a message.
"""
@ -243,7 +243,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
self._is_blocked(self.room_id, expect=False)
self._has_no_members(self.room_id)
def test_block_room_and_not_purge(self):
def test_block_room_and_not_purge(self) -> None:
"""Test to block a room without purging it.
Members will not be moved to a new room and will not receive a message.
The room will not be purged.
@ -299,7 +299,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self._is_blocked(room_id)
def test_shutdown_room_consent(self):
def test_shutdown_room_consent(self) -> None:
"""Test that we can shutdown rooms with local users who have not
yet accepted the privacy policy. This used to fail when we tried to
force part the user from the old room.
@ -351,7 +351,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
self._is_purged(self.room_id)
self._has_no_members(self.room_id)
def test_shutdown_room_block_peek(self):
def test_shutdown_room_block_peek(self) -> None:
"""Test that a world_readable room can no longer be peeked into after
it has been shut down.
Members will be moved to a new room and will receive a message.
@ -400,7 +400,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
# Assert we can no longer peek into the room
self._assert_peek(self.room_id, expect_code=HTTPStatus.FORBIDDEN)
def _is_blocked(self, room_id, expect=True):
def _is_blocked(self, room_id: str, expect: bool = True) -> None:
"""Assert that the room is blocked or not"""
d = self.store.is_room_blocked(room_id)
if expect:
@ -408,17 +408,17 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
else:
self.assertIsNone(self.get_success(d))
def _has_no_members(self, room_id):
def _has_no_members(self, room_id: str) -> None:
"""Assert there is now no longer anyone in the room"""
users_in_room = self.get_success(self.store.get_users_in_room(room_id))
self.assertEqual([], users_in_room)
def _is_member(self, room_id, user_id):
def _is_member(self, room_id: str, user_id: str) -> None:
"""Test that user is member of the room"""
users_in_room = self.get_success(self.store.get_users_in_room(room_id))
self.assertIn(user_id, users_in_room)
def _is_purged(self, room_id):
def _is_purged(self, room_id: str) -> None:
"""Test that the following tables have been purged of all rows related to the room."""
for table in PURGE_TABLES:
count = self.get_success(
@ -432,7 +432,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
self.assertEqual(count, 0, msg=f"Rows not purged in {table}")
def _assert_peek(self, room_id, expect_code):
def _assert_peek(self, room_id: str, expect_code: int) -> None:
"""Assert that the admin user can (or cannot) peek into the room."""
url = "rooms/%s/initialSync" % (room_id,)
@ -492,7 +492,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
("GET", "/_synapse/admin/v2/rooms/delete_status/%s"),
]
)
def test_requester_is_no_admin(self, method: str, url: str):
def test_requester_is_no_admin(self, method: str, url: str) -> None:
"""
If the user is not a server admin, an error HTTPStatus.FORBIDDEN is returned.
"""
@ -507,7 +507,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_room_does_not_exist(self):
def test_room_does_not_exist(self) -> None:
"""
Check that unknown rooms/server return 200
@ -544,7 +544,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
("GET", "/_synapse/admin/v2/rooms/%s/delete_status"),
]
)
def test_room_is_not_valid(self, method: str, url: str):
def test_room_is_not_valid(self, method: str, url: str) -> None:
"""
Check that invalid room names, return an error HTTPStatus.BAD_REQUEST.
"""
@ -562,7 +562,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
channel.json_body["error"],
)
def test_new_room_user_does_not_exist(self):
def test_new_room_user_does_not_exist(self) -> None:
"""
Tests that the user ID must be from local server but it does not have to exist.
"""
@ -580,7 +580,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self._test_result(delete_id, self.other_user, expect_new_room=True)
def test_new_room_user_is_not_local(self):
def test_new_room_user_is_not_local(self) -> None:
"""
Check that only local users can create new room to move members.
"""
@ -598,7 +598,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
channel.json_body["error"],
)
def test_block_is_not_bool(self):
def test_block_is_not_bool(self) -> None:
"""
If parameter `block` is not boolean, return an error
"""
@ -613,7 +613,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.BAD_JSON, channel.json_body["errcode"])
def test_purge_is_not_bool(self):
def test_purge_is_not_bool(self) -> None:
"""
If parameter `purge` is not boolean, return an error
"""
@ -628,7 +628,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.BAD_JSON, channel.json_body["errcode"])
def test_delete_expired_status(self):
def test_delete_expired_status(self) -> None:
"""Test that the task status is removed after expiration."""
# first task, do not purge, that we can create a second task
@ -699,7 +699,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
def test_delete_same_room_twice(self):
def test_delete_same_room_twice(self) -> None:
"""Test that the call for delete a room at second time gives an exception."""
body = {"new_room_user_id": self.admin_user}
@ -743,7 +743,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
expect_new_room=True,
)
def test_purge_room_and_block(self):
def test_purge_room_and_block(self) -> None:
"""Test to purge a room and block it.
Members will not be moved to a new room and will not receive a message.
"""
@ -774,7 +774,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self._is_blocked(self.room_id, expect=True)
self._has_no_members(self.room_id)
def test_purge_room_and_not_block(self):
def test_purge_room_and_not_block(self) -> None:
"""Test to purge a room and do not block it.
Members will not be moved to a new room and will not receive a message.
"""
@ -805,7 +805,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self._is_blocked(self.room_id, expect=False)
self._has_no_members(self.room_id)
def test_block_room_and_not_purge(self):
def test_block_room_and_not_purge(self) -> None:
"""Test to block a room without purging it.
Members will not be moved to a new room and will not receive a message.
The room will not be purged.
@ -838,7 +838,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self._is_blocked(self.room_id, expect=True)
self._has_no_members(self.room_id)
def test_shutdown_room_consent(self):
def test_shutdown_room_consent(self) -> None:
"""Test that we can shutdown rooms with local users who have not
yet accepted the privacy policy. This used to fail when we tried to
force part the user from the old room.
@ -899,7 +899,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self._is_purged(self.room_id)
self._has_no_members(self.room_id)
def test_shutdown_room_block_peek(self):
def test_shutdown_room_block_peek(self) -> None:
"""Test that a world_readable room can no longer be peeked into after
it has been shut down.
Members will be moved to a new room and will receive a message.