# synapse/tests/rest/admin/test_background_updates.py
# (366 lines, 12 KiB, Python)

# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from http import HTTPStatus
from typing import Collection
from parameterized import parameterized
import synapse.rest.admin
from synapse.api.errors import Codes
from synapse.rest.client import login
from synapse.server import HomeServer
from synapse.storage.background_updates import BackgroundUpdater
from tests import unittest
class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
    """Tests for the ``/_synapse/admin/v1/background_updates`` admin API.

    Exercises the ``enabled``, ``status`` and ``start_job`` endpoints, using a
    server-admin access token that is created in ``prepare``.
    """

    servlets = [
        synapse.rest.admin.register_servlets,
        login.register_servlets,
    ]

    def prepare(self, reactor, clock, hs: HomeServer) -> None:
        """Keep a handle on the datastore and log in as a server admin."""
        self.store = hs.get_datastore()

        self.admin_user = self.register_user("admin", "pass", admin=True)
        self.admin_user_tok = self.login("admin", "pass")

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _fetch_status(self) -> dict:
        """GET the ``status`` endpoint as the admin user.

        Asserts that the request succeeds with 200 OK.

        Returns:
            The decoded JSON body of the response.
        """
        channel = self.make_request(
            "GET",
            "/_synapse/admin/v1/background_updates/status",
            access_token=self.admin_user_tok,
        )
        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
        return channel.json_body

    def _post_enabled(self, enabled: bool) -> None:
        """POST to the ``enabled`` endpoint and check the echoed state.

        Args:
            enabled: the value sent in the request body; the response is
                asserted to be 200 OK with ``{"enabled": enabled}``.
        """
        channel = self.make_request(
            "POST",
            "/_synapse/admin/v1/background_updates/enabled",
            content={"enabled": enabled},
            access_token=self.admin_user_tok,
        )
        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
        self.assertDictEqual(channel.json_body, {"enabled": enabled})

    @staticmethod
    def _expected_running_status(enabled: bool, rounds: int = 1) -> dict:
        """Build the ``status`` body expected while ``test_update`` is running.

        Args:
            enabled: expected value of the top-level ``enabled`` flag.
            rounds: multiplier applied to the expected totals. The tests
                expect 1000 ms and ``MINIMUM_BACKGROUND_BATCH_SIZE`` items
                per round, so both totals scale linearly with this value.

        Returns:
            The dict to compare against the response's JSON body.
        """
        return {
            "current_updates": {
                "master": {
                    "name": "test_update",
                    "average_items_per_ms": 0.001,
                    "total_duration_ms": 1000.0 * rounds,
                    "total_item_count": (
                        rounds * BackgroundUpdater.MINIMUM_BACKGROUND_BATCH_SIZE
                    ),
                }
            },
            "enabled": enabled,
        }

    def _register_bg_update(self) -> None:
        """Add a background update to the database without starting it.

        Registers a handler named ``test_update`` and inserts the matching
        row into the ``background_updates`` table.
        """

        async def _fake_update(progress, batch_size) -> int:
            # Sleep on the test clock, then report the whole batch as done.
            await self.clock.sleep(0.2)
            return batch_size

        self.store.db_pool.updates.register_background_update_handler(
            "test_update",
            _fake_update,
        )

        self.get_success(
            self.store.db_pool.simple_insert(
                table="background_updates",
                values={
                    "update_name": "test_update",
                    "progress_json": "{}",
                },
            )
        )

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    @parameterized.expand(
        [
            ("GET", "/_synapse/admin/v1/background_updates/enabled"),
            ("POST", "/_synapse/admin/v1/background_updates/enabled"),
            ("GET", "/_synapse/admin/v1/background_updates/status"),
            ("POST", "/_synapse/admin/v1/background_updates/start_job"),
        ]
    )
    def test_requester_is_no_admin(self, method: str, url: str) -> None:
        """
        If the user is not a server admin, an error 403 is returned.
        """
        self.register_user("user", "pass", admin=False)
        other_user_tok = self.login("user", "pass")

        channel = self.make_request(
            method,
            url,
            content={},
            access_token=other_user_tok,
        )

        self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])

    def test_invalid_parameter(self) -> None:
        """
        If parameters are invalid, an error is returned.
        """
        url = "/_synapse/admin/v1/background_updates/start_job"

        # Empty content: the required parameter is missing.
        channel = self.make_request(
            "POST",
            url,
            content={},
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"])

        # An unrecognised job_name is rejected.
        channel = self.make_request(
            "POST",
            url,
            content={"job_name": "unknown"},
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])

    def test_status_empty(self) -> None:
        """Test the status API works."""
        status = self._fetch_status()

        # Background updates should be enabled, but none should be running.
        self.assertDictEqual(status, {"current_updates": {}, "enabled": True})

    def test_status_bg_update(self) -> None:
        """Test the status API works with a background update."""
        # Create a new background update and let it make some progress.
        self._register_bg_update()

        self.store.db_pool.updates.start_doing_background_updates()
        self.reactor.pump([1.0, 1.0])

        status = self._fetch_status()

        # Background updates should be enabled, and one should be running.
        self.assertDictEqual(status, self._expected_running_status(enabled=True))

    def test_enabled(self) -> None:
        """Test the enabled API works."""
        # Create a new background update.
        self._register_bg_update()
        self.store.db_pool.updates.start_doing_background_updates()

        # Test that GET works and returns enabled is True.
        channel = self.make_request(
            "GET",
            "/_synapse/admin/v1/background_updates/enabled",
            access_token=self.admin_user_tok,
        )
        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
        self.assertDictEqual(channel.json_body, {"enabled": True})

        # Disable the background updates.
        self._post_enabled(False)

        # Advance a bit and get the current status. Note that this will finish
        # the in-flight background update, so we call the status API twice and
        # check that there was no change between the two calls.
        self.reactor.pump([1.0, 1.0])
        self.assertDictEqual(
            self._fetch_status(),
            self._expected_running_status(enabled=False),
        )

        # Run the reactor for a bit so the background updates would have a
        # chance to run if they were (wrongly) still enabled.
        self.reactor.pump([1.0, 1.0])

        # There should be no change from the previous /status response.
        self.assertDictEqual(
            self._fetch_status(),
            self._expected_running_status(enabled=False),
        )

        # Re-enable the background updates.
        self._post_enabled(True)

        self.reactor.pump([1.0, 1.0])

        # Background updates should be enabled and making progress.
        self.assertDictEqual(
            self._fetch_status(),
            self._expected_running_status(enabled=True, rounds=2),
        )

    @parameterized.expand(
        [
            ("populate_stats_process_rooms", ["populate_stats_process_rooms"]),
            (
                "regenerate_directory",
                [
                    "populate_user_directory_createtables",
                    "populate_user_directory_process_rooms",
                    "populate_user_directory_process_users",
                    "populate_user_directory_cleanup",
                ],
            ),
        ]
    )
    def test_start_backround_job(
        self, job_name: str, updates: Collection[str]
    ) -> None:
        """
        Test that background updates are added to the database and processed.

        Args:
            job_name: name of the job to call with API
            updates: collection of background updates to be started
        """
        # No background update is waiting.
        self.assertTrue(
            self.get_success(
                self.store.db_pool.updates.has_completed_background_updates()
            )
        )

        channel = self.make_request(
            "POST",
            "/_synapse/admin/v1/background_updates/start_job",
            content={"job_name": job_name},
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)

        # Each background update belonging to the job is waiting now.
        for update in updates:
            self.assertFalse(
                self.get_success(
                    self.store.db_pool.updates.has_completed_background_update(update)
                )
            )

        self.wait_for_background_updates()

        # All background updates are done.
        self.assertTrue(
            self.get_success(
                self.store.db_pool.updates.has_completed_background_updates()
            )
        )

    def test_start_backround_job_twice(self) -> None:
        """Test that adding a background update twice returns an error."""
        # Add the job to the database up front, so the API call is a duplicate.
        self.get_success(
            self.store.db_pool.simple_insert(
                table="background_updates",
                values={
                    "update_name": "populate_stats_process_rooms",
                    "progress_json": "{}",
                },
            )
        )

        channel = self.make_request(
            "POST",
            "/_synapse/admin/v1/background_updates/start_job",
            content={"job_name": "populate_stats_process_rooms"},
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)