Use task scheduler

This commit is contained in:
Mathieu Velten 2023-07-17 14:42:23 +02:00 committed by Mathieu Velten
parent c8b8c96b6e
commit 5065d7df75
10 changed files with 213 additions and 575 deletions

View File

@ -12,9 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set
from typing import TYPE_CHECKING, List, Optional, Set, Tuple
from twisted.python.failure import Failure
@ -22,12 +21,19 @@ from synapse.api.constants import Direction, EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import SerializeEventConfig
from synapse.handlers.room import DeleteStatus, ShutdownRoomParams, ShutdownRoomResponse
from synapse.handlers.room import ShutdownRoomParams
from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.admin._base import assert_user_is_admin
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester, StreamKeyType
from synapse.types import (
JsonDict,
JsonMapping,
Requester,
ScheduledTask,
StreamKeyType,
TaskStatus,
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string
@ -52,12 +58,6 @@ class PaginationHandler:
paginating during a purge.
"""
# when to remove a completed deletion/purge from the results map
CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24 # 24 hours
# how often to run the purge rooms loop
PURGE_ROOMS_INTERVAL_MS = 1000 * 3600 # 1 hour
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
@ -68,6 +68,7 @@ class PaginationHandler:
self._server_name = hs.hostname
self._room_shutdown_handler = hs.get_room_shutdown_handler()
self._relations_handler = hs.get_relations_handler()
self._task_scheduler = hs.get_task_scheduler()
self.pagination_lock = ReadWriteLock()
# IDs of rooms in which there is currently an active purge *or delete* operation.
@ -101,95 +102,11 @@ class PaginationHandler:
job.longest_max_lifetime,
)
if self._is_master:
self.clock.looping_call(
run_as_background_process,
PaginationHandler.PURGE_ROOMS_INTERVAL_MS,
"purge_rooms",
self.purge_rooms,
)
async def purge_rooms(self) -> None:
    """Resume or launch the room delete jobs recorded in the database.

    This restores purge/shutdown jobs that were left unfinished, and
    launches scheduled ones (e.g. rooms that have been fully forgotten)
    once their scheduled time has been reached. Completed or failed jobs
    are removed from the table once they are older than
    `CLEAR_PURGE_AFTER_MS`.

    It should be run regularly.
    """
    rooms_to_delete = await self.store.get_rooms_to_delete()
    for r in rooms_to_delete:
        room_id = r["room_id"]
        delete_id = r["delete_id"]
        status = r["status"]
        action = r["action"]
        timestamp = r["timestamp"]

        if (
            status == DeleteStatus.STATUS_COMPLETE
            or status == DeleteStatus.STATUS_FAILED
        ):
            # remove the delete from the list 24 hours after it completes or fails
            ms_since_completed = self.clock.time_msec() - timestamp
            if ms_since_completed >= PaginationHandler.CLEAR_PURGE_AFTER_MS:
                await self.store.delete_room_to_delete(room_id, delete_id)

            continue

        if room_id in self._purges_in_progress_by_room:
            # a delete background task is already running (or has run)
            # for this room id, let's ignore it for now
            continue

        # If the database says we were last in the middle of shutting down the room,
        # let's continue the shutdown process.
        shutdown_response = None
        if (
            action == DeleteStatus.ACTION_SHUTDOWN
            and status == DeleteStatus.STATUS_SHUTTING_DOWN
        ):
            shutdown_params = json.loads(r["params"])
            if r["response"]:
                shutdown_response = json.loads(r["response"])
            await self._shutdown_and_purge_room(
                room_id,
                delete_id,
                shutdown_params=shutdown_params,
                shutdown_response=shutdown_response,
            )
            continue

        # Purge now if the database says we were last in the middle of purging
        # the room, or if we're at or past the scheduled purge time.
        #
        # This is computed afresh for every row: leaving it unset when neither
        # condition holds would either raise `UnboundLocalError` or reuse the
        # decision taken for a previous row.
        purge_now = status == DeleteStatus.STATUS_PURGING or (
            status == DeleteStatus.STATUS_SCHEDULED
            and (timestamp is None or self.clock.time_msec() >= timestamp)
        )

        # TODO 2 stages purge, keep memberships for a while so we don't "break" sync
        if not purge_now:
            continue

        params = json.loads(r["params"]) if r["params"] else {}

        if action == DeleteStatus.ACTION_PURGE_HISTORY:
            # A history purge cannot be resumed without its token.
            if "token" in params:
                await self._purge_history(
                    delete_id,
                    room_id,
                    params["token"],
                    params.get("delete_local_events", False),
                    True,
                )
        elif action == DeleteStatus.ACTION_PURGE:
            await self.purge_room(
                room_id,
                delete_id,
                params.get("force", False),
                shutdown_response=shutdown_response,
            )
self._task_scheduler.register_action(self._purge_history, "purge_history")
self._task_scheduler.register_action(self._purge_room, "purge_room")
self._task_scheduler.register_action(
self._shutdown_and_purge_room, "shutdown_and_purge_room"
)
async def purge_history_for_rooms_in_range(
self, min_ms: Optional[int], max_ms: Optional[int]
@ -241,14 +158,6 @@ class PaginationHandler:
for room_id, retention_policy in rooms.items():
logger.info("[purge] Attempting to purge messages in room %s", room_id)
if room_id in self._purges_in_progress_by_room:
logger.warning(
"[purge] not purging room %s as there's an ongoing purge running"
" for this room",
room_id,
)
continue
# If max_lifetime is None, it means that the room has no retention policy.
# Given we only retrieve such rooms when there's a default retention policy
# defined in the server's configuration, we can safely assume that's the
@ -330,46 +239,49 @@ class PaginationHandler:
Returns:
unique ID for this purge transaction.
"""
if room_id in self._purges_in_progress_by_room:
raise SynapseError(
400, "History purge already in progress for %s" % (room_id,)
)
purge_id = random_string(16)
purge_id = await self._task_scheduler.schedule_task(
"purge_history",
resource_id=room_id,
params={"token": token, "delete_local_events": delete_local_events},
)
# we log the purge_id here so that it can be tied back to the
# request id in the log lines.
logger.info("[purge] starting purge_id %s", purge_id)
await self.store.upsert_room_to_delete(
room_id,
purge_id,
DeleteStatus.ACTION_PURGE_HISTORY,
DeleteStatus.STATUS_PURGING,
params=json.dumps(
{"token": token, "delete_local_events": delete_local_events}
),
)
run_as_background_process(
"purge_history",
self._purge_history,
purge_id,
room_id,
token,
delete_local_events,
True,
)
return purge_id
async def _purge_history(
self,
purge_id: str,
task: ScheduledTask,
first_launch: bool,
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
if (
task.resource_id is None
or task.params is None
or "token" not in task.params
or "delete_local_events" not in task.params
):
return (
TaskStatus.FAILED,
None,
"Not enough parameters passed to _purge_history",
)
err = await self.purge_history(
task.resource_id,
task.params["token"],
task.params["delete_local_events"],
)
if err is not None:
return TaskStatus.FAILED, None, err
return TaskStatus.COMPLETE, None, None
async def purge_history(
self,
room_id: str,
token: str,
delete_local_events: bool,
update_rooms_to_delete_table: bool,
) -> None:
) -> Optional[str]:
"""Carry out a history purge on a room.
Args:
@ -382,88 +294,54 @@ class PaginationHandler:
functionality since we don't need to explicitly restore those, they
will be relaunched by the retention logic.
"""
self._purges_in_progress_by_room.add(room_id)
try:
async with self.pagination_lock.write(room_id):
await self._storage_controllers.purge_events.purge_history(
room_id, token, delete_local_events
)
logger.info("[purge] complete")
if update_rooms_to_delete_table:
await self.store.upsert_room_to_delete(
room_id,
purge_id,
DeleteStatus.ACTION_PURGE_HISTORY,
DeleteStatus.STATUS_COMPLETE,
timestamp=self.clock.time_msec(),
)
return None
except Exception:
f = Failure()
logger.error(
"[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())
)
if update_rooms_to_delete_table:
await self.store.upsert_room_to_delete(
room_id,
purge_id,
DeleteStatus.ACTION_PURGE_HISTORY,
DeleteStatus.STATUS_FAILED,
error=f.getErrorMessage(),
timestamp=self.clock.time_msec(),
)
finally:
self._purges_in_progress_by_room.discard(room_id)
return f.getErrorMessage()
if update_rooms_to_delete_table:
# remove the purge from the list 24 hours after it completes
async def clear_purge() -> None:
await self.store.delete_room_to_delete(room_id, purge_id)
self.hs.get_reactor().callLater(
PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge
)
@staticmethod
def _convert_to_delete_status(res: Dict[str, Any]) -> DeleteStatus:
    """Build a `DeleteStatus` from a delete job row.

    Args:
        res: the row, as returned by `get_room_to_delete` or
            `get_rooms_to_delete`.

    Returns:
        The populated `DeleteStatus`. `shutdown_room` is only filled in
        for shutdown jobs that have a non-empty JSON `response`.
    """
    delete_status = DeleteStatus()
    delete_status.delete_id = res["delete_id"]
    delete_status.action = res["action"]
    delete_status.status = res["status"]

    if "error" in res:
        delete_status.error = res["error"]

    is_shutdown = delete_status.action == DeleteStatus.ACTION_SHUTDOWN
    if is_shutdown and res["response"]:
        delete_status.shutdown_room = json.loads(res["response"])

    return delete_status
async def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
async def get_delete_task(self, delete_id: str) -> Optional[ScheduledTask]:
"""Get the current status of an active deleting
Args:
delete_id: delete_id returned by start_shutdown_and_purge_room
or start_purge_history.
"""
res = await self.store.get_room_to_delete(delete_id)
if res:
return PaginationHandler._convert_to_delete_status(res)
return None
return await self._task_scheduler.get_task(delete_id)
async def get_delete_statuses_by_room(self, room_id: str) -> List[DeleteStatus]:
async def get_delete_tasks_by_room(self, room_id: str) -> List[ScheduledTask]:
"""Get all active delete statuses by room
Args:
room_id: room_id that is deleted
"""
res = await self.store.get_rooms_to_delete(room_id)
return [PaginationHandler._convert_to_delete_status(r) for r in res]
return await self._task_scheduler.get_tasks(
actions=["purge_room", "shutdown_and_purge_room"], resource_ids=[room_id]
)
async def _purge_room(
    self,
    task: ScheduledTask,
    first_launch: bool,
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
    """Task scheduler action purging the room designated by the task.

    Args:
        task: the scheduled task; `resource_id` holds the room ID and
            `params` may carry a `force` boolean.
        first_launch: not used by this action.

    Returns:
        A `(status, result, error)` tuple, always
        `(TaskStatus.COMPLETE, None, None)` when the purge succeeds.

    Raises:
        Exception: if the task carries no room ID. Errors raised by
            `purge_room` are not caught here.
    """
    room_id = task.resource_id
    if not room_id:
        raise Exception("No room id passed to purge_room task")

    # Tasks may be scheduled without any parameters at all.
    force = (task.params or {}).get("force", False)
    await self.purge_room(room_id, force)

    return TaskStatus.COMPLETE, None, None
async def purge_room(
self,
room_id: str,
delete_id: str,
force: bool = False,
shutdown_response: Optional[ShutdownRoomResponse] = None,
force: bool,
) -> None:
"""Purge the given room from the database.
@ -475,10 +353,6 @@ class PaginationHandler:
"""
logger.info("starting purge room_id=%s force=%s", room_id, force)
action = DeleteStatus.ACTION_PURGE
if shutdown_response:
action = DeleteStatus.ACTION_SHUTDOWN
async with self.pagination_lock.write(room_id):
# first check that we have no users in this room
joined = await self.store.is_host_joined(room_id, self._server_name)
@ -491,25 +365,8 @@ class PaginationHandler:
else:
raise SynapseError(400, "Users are still joined to this room")
await self.store.upsert_room_to_delete(
room_id,
delete_id,
action,
DeleteStatus.STATUS_PURGING,
response=json.dumps(shutdown_response),
)
await self._storage_controllers.purge_events.purge_room(room_id)
await self.store.upsert_room_to_delete(
room_id,
delete_id,
action,
DeleteStatus.STATUS_COMPLETE,
timestamp=self.clock.time_msec(),
response=json.dumps(shutdown_response),
)
logger.info("purge complete for room_id %s", room_id)
@trace
@ -789,11 +646,9 @@ class PaginationHandler:
async def _shutdown_and_purge_room(
self,
room_id: str,
delete_id: str,
shutdown_params: ShutdownRoomParams,
shutdown_response: Optional[ShutdownRoomResponse] = None,
) -> None:
task: ScheduledTask,
first_launch: bool,
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
"""
Shuts down and purges a room.
@ -807,50 +662,36 @@ class PaginationHandler:
Keeps track of the `DeleteStatus` (and `ShutdownRoomResponse`) in `self._delete_by_id` and persisted in DB
"""
self._purges_in_progress_by_room.add(room_id)
try:
shutdown_response = await self._room_shutdown_handler.shutdown_room(
room_id=room_id,
delete_id=delete_id,
shutdown_params=shutdown_params,
shutdown_response=shutdown_response,
if task.resource_id is None or task.params is None:
raise Exception(
"No room id and/or no parameters passed to shutdown_and_purge_room task"
)
if shutdown_params["purge"]:
await self.purge_room(
room_id,
delete_id,
shutdown_params["force_purge"],
shutdown_response=shutdown_response,
)
room_id = task.resource_id
await self.store.upsert_room_to_delete(
async def update_result(result: Optional[JsonMapping]) -> None:
await self._task_scheduler.update_task(task.id, result=result)
shutdown_result = await self._room_shutdown_handler.shutdown_room(
room_id, task.params, task.result, update_result
)
if task.params.get("purge", False):
await self.purge_room(
room_id,
delete_id,
DeleteStatus.ACTION_SHUTDOWN,
DeleteStatus.STATUS_COMPLETE,
timestamp=self.clock.time_msec(),
response=json.dumps(shutdown_response),
task.params.get("force_purge", False),
)
except Exception:
f = Failure()
logger.error(
"failed",
exc_info=(f.type, f.value, f.getTracebackObject()),
)
await self.store.upsert_room_to_delete(
room_id,
delete_id,
DeleteStatus.ACTION_SHUTDOWN,
DeleteStatus.STATUS_FAILED,
timestamp=self.clock.time_msec(),
error=f.getErrorMessage(),
)
finally:
self._purges_in_progress_by_room.discard(room_id)
def start_shutdown_and_purge_room(
return (TaskStatus.COMPLETE, shutdown_result, None)
async def get_current_delete_tasks(self, room_id: str) -> List[ScheduledTask]:
    """Get the purge/delete tasks of a room that have not finished yet.

    Args:
        room_id: the room to look up tasks for.

    Returns:
        The `purge_history`, `purge_room` and `shutdown_and_purge_room`
        tasks for this room whose status is active or scheduled.
    """
    delete_actions = ["purge_history", "purge_room", "shutdown_and_purge_room"]
    unfinished_statuses = [TaskStatus.ACTIVE, TaskStatus.SCHEDULED]

    return await self._task_scheduler.get_tasks(
        actions=delete_actions,
        resource_ids=[room_id],
        statuses=unfinished_statuses,
    )
async def start_shutdown_and_purge_room(
self,
room_id: str,
shutdown_params: ShutdownRoomParams,
@ -864,7 +705,7 @@ class PaginationHandler:
Returns:
unique ID for this delete transaction.
"""
if room_id in self._purges_in_progress_by_room:
if len(await self.get_current_delete_tasks(room_id)) > 0:
raise SynapseError(400, "Purge already in progress for %s" % (room_id,))
# This check is double to `RoomShutdownHandler.shutdown_room`
@ -877,7 +718,11 @@ class PaginationHandler:
400, "User must be our own: %s" % (new_room_user_id,)
)
delete_id = random_string(16)
delete_id = await self._task_scheduler.schedule_task(
"shutdown_and_purge_room",
resource_id=room_id,
params=shutdown_params,
)
# we log the delete_id here so that it can be tied back to the
# request id in the log lines.
@ -887,11 +732,4 @@ class PaginationHandler:
delete_id,
)
run_as_background_process(
"shutdown_and_purge_room",
self._shutdown_and_purge_room,
room_id,
delete_id,
shutdown_params,
)
return delete_id

View File

@ -14,14 +14,13 @@
"""Contains functions for performing actions on rooms."""
import itertools
import json
import logging
import math
import random
import string
from collections import OrderedDict
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional, Tuple
import attr
from typing_extensions import TypedDict
@ -59,6 +58,7 @@ from synapse.rest.admin._base import assert_user_is_admin
from synapse.streams import EventSource
from synapse.types import (
JsonDict,
JsonMapping,
MutableStateMap,
Requester,
RoomAlias,
@ -1883,10 +1883,12 @@ class RoomShutdownHandler:
async def shutdown_room(
self,
room_id: str,
delete_id: str,
shutdown_params: ShutdownRoomParams,
shutdown_response: Optional[ShutdownRoomResponse] = None,
) -> ShutdownRoomResponse:
params: JsonMapping,
result: Optional[JsonMapping] = None,
update_result_fct: Optional[
Callable[[Optional[JsonMapping]], Awaitable[None]]
] = None,
) -> Optional[JsonMapping]:
"""
Shuts down a room. Moves all local users and room aliases automatically
to a new room if `new_room_user_id` is set. Otherwise local users only
@ -1908,21 +1910,16 @@ class RoomShutdownHandler:
Returns: a dict matching `ShutdownRoomResponse`.
"""
requester_user_id = shutdown_params["requester_user_id"]
new_room_user_id = shutdown_params["new_room_user_id"]
block = shutdown_params["block"]
requester_user_id = params["requester_user_id"]
new_room_user_id = params["new_room_user_id"]
block = params["block"]
new_room_name = (
shutdown_params["new_room_name"]
if shutdown_params["new_room_name"]
params["new_room_name"]
if params["new_room_name"]
else self.DEFAULT_ROOM_NAME
)
message = (
shutdown_params["message"]
if shutdown_params["message"]
else self.DEFAULT_MESSAGE
)
message = params["message"] if params["message"] else self.DEFAULT_MESSAGE
if not RoomID.is_valid(room_id):
raise SynapseError(400, "%s is not a legal room ID" % (room_id,))
@ -1934,21 +1931,15 @@ class RoomShutdownHandler:
403, "Shutdown of this room is forbidden", Codes.FORBIDDEN
)
if not shutdown_response:
shutdown_response = {
result = (
dict(result)
if result
else {
"kicked_users": [],
"failed_to_kick_users": [],
"local_aliases": [],
"new_room_id": None,
}
await self.store.upsert_room_to_delete(
room_id,
delete_id,
DeleteStatus.ACTION_SHUTDOWN,
DeleteStatus.STATUS_SHUTTING_DOWN,
params=json.dumps(shutdown_params),
response=json.dumps(shutdown_response),
)
# Action the block first (even if the room doesn't exist yet)
@ -1959,9 +1950,9 @@ class RoomShutdownHandler:
if not await self.store.get_room(room_id):
# if we don't know about the room, there is nothing left to do.
return shutdown_response
return result
new_room_id = shutdown_response.get("new_room_id")
new_room_id = result.get("new_room_id")
if new_room_user_id is not None and new_room_id is None:
if not self.hs.is_mine_id(new_room_user_id):
raise SynapseError(
@ -1982,15 +1973,9 @@ class RoomShutdownHandler:
ratelimit=False,
)
shutdown_response["new_room_id"] = new_room_id
await self.store.upsert_room_to_delete(
room_id,
delete_id,
DeleteStatus.ACTION_SHUTDOWN,
DeleteStatus.STATUS_SHUTTING_DOWN,
params=json.dumps(shutdown_params),
response=json.dumps(shutdown_response),
)
result["new_room_id"] = new_room_id
if update_result_fct:
await update_result_fct(result)
logger.info(
"Shutting down room %r, joining to new room: %r", room_id, new_room_id
@ -2053,28 +2038,16 @@ class RoomShutdownHandler:
require_consent=False,
)
shutdown_response["kicked_users"].append(user_id)
await self.store.upsert_room_to_delete(
room_id,
delete_id,
DeleteStatus.ACTION_SHUTDOWN,
DeleteStatus.STATUS_SHUTTING_DOWN,
params=json.dumps(shutdown_params),
response=json.dumps(shutdown_response),
)
result["kicked_users"].append(user_id)
if update_result_fct:
await update_result_fct(result)
except Exception:
logger.exception(
"Failed to leave old room and join new room for %r", user_id
)
shutdown_response["failed_to_kick_users"].append(user_id)
await self.store.upsert_room_to_delete(
room_id,
delete_id,
DeleteStatus.ACTION_SHUTDOWN,
DeleteStatus.STATUS_SHUTTING_DOWN,
params=json.dumps(shutdown_params),
response=json.dumps(shutdown_response),
)
result["failed_to_kick_users"].append(user_id)
if update_result_fct:
await update_result_fct(result)
# Send message in new room and move aliases
if new_room_user_id:
@ -2093,7 +2066,7 @@ class RoomShutdownHandler:
ratelimit=False,
)
shutdown_response["local_aliases"] = list(
result["local_aliases"] = list(
await self.store.get_aliases_for_room(room_id)
)
@ -2102,6 +2075,6 @@ class RoomShutdownHandler:
room_id, new_room_id, requester_user_id
)
else:
shutdown_response["local_aliases"] = []
result["local_aliases"] = []
return shutdown_response
return result

View File

@ -38,7 +38,6 @@ from synapse.event_auth import get_named_level, get_power_level_event
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.handlers.room import DeleteStatus
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
from synapse.logging import opentracing
from synapse.metrics import event_processing_positions
@ -57,7 +56,6 @@ from synapse.types import (
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_left_room
from synapse.util.stringutils import random_string
if TYPE_CHECKING:
from synapse.server import HomeServer
@ -96,6 +94,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self.event_creation_handler = hs.get_event_creation_handler()
self.account_data_handler = hs.get_account_data_handler()
self.event_auth_handler = hs.get_event_auth_handler()
self.task_scheduler = hs.get_task_scheduler()
self.member_linearizer: Linearizer = Linearizer(name="member")
self.member_as_limiter = Linearizer(max_count=10, name="member_as_limiter")
@ -318,12 +317,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
and self._purge_retention_period
and await self.store.is_locally_forgotten_room(room_id)
):
delete_id = random_string(16)
await self.store.upsert_room_to_delete(
room_id,
delete_id,
DeleteStatus.ACTION_PURGE,
DeleteStatus.STATUS_SCHEDULED,
await self.task_scheduler.schedule_task(
"purge_room",
resource_id=room_id,
timestamp=self.clock.time_msec() + self._purge_retention_period,
)

View File

@ -93,7 +93,7 @@ from synapse.rest.admin.users import (
UserTokenRestServlet,
WhoisRestServlet,
)
from synapse.types import JsonDict, RoomStreamToken
from synapse.types import JsonDict, RoomStreamToken, TaskStatus
from synapse.util import SYNAPSE_VERSION
if TYPE_CHECKING:
@ -215,12 +215,21 @@ class PurgeHistoryStatusRestServlet(RestServlet):
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self.auth, request)
purge_status = await self.pagination_handler.get_delete_status(purge_id)
if purge_status is None:
purge_task = await self.pagination_handler.get_delete_task(purge_id)
if purge_task is None or purge_task.action != "purge_history":
raise NotFoundError("purge id '%s' not found" % purge_id)
result = {
"status": purge_task.status
if purge_task.status == TaskStatus.COMPLETE
or purge_task.status == TaskStatus.FAILED
else "active",
}
if purge_task.error:
result["error"] = purge_task.error
# TODO active vs purging etc
return HTTPStatus.OK, purge_status.asdict(use_purge_history_format=True)
return HTTPStatus.OK, result
########################################################################################

View File

@ -19,7 +19,6 @@ from urllib import parse as urlparse
from synapse.api.constants import Direction, EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
from synapse.api.filtering import Filter
from synapse.handlers.room import DeleteStatus
from synapse.http.servlet import (
ResolveRoomIdMixin,
RestServlet,
@ -37,10 +36,16 @@ from synapse.rest.admin._base import (
)
from synapse.storage.databases.main.room import RoomSortOrder
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, RoomID, UserID, create_requester
from synapse.types import (
JsonDict,
RoomID,
ScheduledTask,
TaskStatus,
UserID,
create_requester,
)
from synapse.types.state import StateFilter
from synapse.util import json_decoder
from synapse.util.stringutils import random_string
if TYPE_CHECKING:
from synapse.api.auth import Auth
@ -119,7 +124,7 @@ class RoomRestV2Servlet(RestServlet):
403, "Shutdown of this room is forbidden", Codes.FORBIDDEN
)
delete_id = self._pagination_handler.start_shutdown_and_purge_room(
delete_id = await self._pagination_handler.start_shutdown_and_purge_room(
room_id=room_id,
shutdown_params={
"new_room_user_id": content.get("new_room_user_id"),
@ -135,6 +140,14 @@ class RoomRestV2Servlet(RestServlet):
return HTTPStatus.OK, {"delete_id": delete_id}
def _convert_delete_task_to_response(task: ScheduledTask) -> JsonDict:
    """Convert a delete task into the JSON body of the delete status API.

    Args:
        task: the scheduled task describing the room delete.

    Returns:
        A dict with the task ID as `delete_id`, its `status`, and its
        result as `shutdown_room`. An `error` key is added only when the
        task has an error message, so that failures stay visible to the
        caller (the purge history status endpoint reports errors the
        same way).
    """
    response: JsonDict = {
        "delete_id": task.id,
        "status": task.status,
        "shutdown_room": task.result,
    }
    if task.error:
        response["error"] = task.error
    return response
class DeleteRoomStatusByRoomIdRestServlet(RestServlet):
"""Get the status of the delete room background task."""
@ -154,16 +167,14 @@ class DeleteRoomStatusByRoomIdRestServlet(RestServlet):
HTTPStatus.BAD_REQUEST, "%s is not a legal room ID" % (room_id,)
)
delete_statuses = await self._pagination_handler.get_delete_statuses_by_room(
room_id
)
delete_tasks = await self._pagination_handler.get_delete_tasks_by_room(room_id)
response = []
for delete_status in delete_statuses:
for delete_task in delete_tasks:
# We ignore scheduled deletes because currently they are only used
# for automatically purging forgotten room after X time.
if delete_status.status != DeleteStatus.STATUS_SCHEDULED:
response += [delete_status.asdict()]
if delete_task.status != TaskStatus.SCHEDULED:
response += [_convert_delete_task_to_response(delete_task)]
if response:
return HTTPStatus.OK, {"results": cast(JsonDict, response)}
@ -185,11 +196,14 @@ class DeleteRoomStatusByDeleteIdRestServlet(RestServlet):
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self._auth, request)
delete_status = await self._pagination_handler.get_delete_status(delete_id)
if delete_status is None:
delete_task = await self._pagination_handler.get_delete_task(delete_id)
if delete_task is None or (
delete_task.action != "purge_room"
and delete_task.action != "shutdown_and_purge_room"
):
raise NotFoundError("delete id '%s' not found" % delete_id)
return HTTPStatus.OK, cast(JsonDict, delete_status.asdict())
return HTTPStatus.OK, _convert_delete_task_to_response(delete_task)
class ListRoomRestServlet(RestServlet):
@ -351,12 +365,9 @@ class RoomRestServlet(RestServlet):
Codes.BAD_JSON,
)
delete_id = random_string(16)
ret = await room_shutdown_handler.shutdown_room(
room_id=room_id,
delete_id=delete_id,
shutdown_params={
params={
"new_room_user_id": content.get("new_room_user_id"),
"new_room_name": content.get("room_name"),
"message": content.get("message"),
@ -370,9 +381,7 @@ class RoomRestServlet(RestServlet):
# Purge room
if purge:
try:
await pagination_handler.purge_room(
room_id, delete_id, force=force_purge
)
await pagination_handler.purge_room(room_id, force=force_purge)
except NotFoundError:
if block:
# We can block unknown rooms with this endpoint, in which case

View File

@ -17,7 +17,6 @@ from itertools import chain
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Collection,
Dict,
FrozenSet,
@ -1284,113 +1283,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
# If any rows still exist it means someone has not forgotten this room yet
return not rows[0][0]
async def upsert_room_to_delete(
    self,
    room_id: str,
    delete_id: str,
    action: str,
    status: str,
    timestamp: Optional[int] = None,
    params: Optional[str] = None,
    response: Optional[str] = None,
    error: Optional[str] = None,
) -> None:
    """Insert or update a shutdown/purge job in `rooms_to_delete`.

    The row is keyed on `(room_id, delete_id)`; every other column is
    overwritten with the value given here, including `None` for the
    optional arguments left out.

    Args:
        room_id: The room ID to shutdown/purge
        delete_id: The delete ID identifying this action
        action: the type of job, mainly `shutdown` `purge` or `purge_history`
        status: Current status of the delete. Cf `DeleteStatus` for possible values
        timestamp: Time of the last update. For a scheduled purge it specifies
            when to do the purge, with an empty value specifying ASAP
        params: JSON representation of delete job parameters
        response: JSON representation of delete current status
        error: Error message to return, if any
    """
    row_key = {
        "room_id": room_id,
        "delete_id": delete_id,
    }
    row_values = {
        "action": action,
        "status": status,
        "timestamp": timestamp,
        "params": params,
        "response": response,
        "error": error,
    }
    await self.db_pool.simple_upsert(
        "rooms_to_delete",
        row_key,
        row_values,
        desc="upsert_room_to_delete",
    )
async def delete_room_to_delete(self, room_id: str, delete_id: str) -> None:
    """Remove a delete job from the `rooms_to_delete` table.

    Args:
        room_id: The room ID matching the delete to remove
        delete_id: The delete ID identifying the delete to remove
    """
    job_key = {
        "room_id": room_id,
        "delete_id": delete_id,
    }
    await self.db_pool.simple_delete(
        "rooms_to_delete",
        keyvalues=job_key,
        desc="delete_room_to_delete",
    )
async def get_rooms_to_delete(
    self, room_id: Optional[str] = None
) -> List[Dict[str, Any]]:
    """Fetch delete jobs from the `rooms_to_delete` table.

    With no argument this returns every job, which includes those that
    were interrupted by a stop/restart of synapse as well as scheduled
    ones such as locally forgotten rooms.

    Args:
        room_id: if specified, will only return the delete jobs for a specific room

    Returns:
        One dict per job, holding all the columns of the table.
    """
    job_columns = (
        "room_id",
        "delete_id",
        "action",
        "status",
        "timestamp",
        "params",
        "response",
        "error",
    )
    # An empty filter selects the jobs of every room.
    room_filter = {} if room_id is None else {"room_id": room_id}

    return await self.db_pool.simple_select_list(
        table="rooms_to_delete",
        keyvalues=room_filter,
        retcols=job_columns,
        desc="rooms_to_delete_fetch",
    )
async def get_room_to_delete(self, delete_id: str) -> Optional[Dict[str, Any]]:
    """Return the delete job identified by delete_id.

    Args:
        delete_id: The delete ID identifying the job to fetch

    Returns:
        A dict holding all the columns of the matching `rooms_to_delete`
        row, or None if there is no job with this delete ID.
    """
    return await self.db_pool.simple_select_one(
        table="rooms_to_delete",
        keyvalues={"delete_id": delete_id},
        retcols=(
            "room_id",
            "delete_id",
            "action",
            "status",
            "timestamp",
            "params",
            "response",
            "error",
        ),
        # Callers treat a missing job as a falsy result; without this the
        # lookup raises instead of returning None when no row matches.
        allow_none=True,
        desc="rooms_to_delete_fetch",
    )
async def get_rooms_user_has_been_in(self, user_id: str) -> Set[str]:
"""Get all rooms that the user has ever been in.

View File

@ -1,27 +0,0 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- cf upsert_room_to_delete docstring for the meaning of the fields.
CREATE TABLE IF NOT EXISTS rooms_to_delete(
room_id text NOT NULL,
delete_id text NOT NULL,
action text NOT NULL,
status text NOT NULL,
timestamp bigint,
params text,
response text,
error text,
UNIQUE(room_id, delete_id)
);

View File

@ -24,12 +24,11 @@ from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.constants import EventTypes, Membership, RoomTypes
from synapse.api.errors import Codes
from synapse.handlers.pagination import DeleteStatus, PaginationHandler
from synapse.rest.client import directory, events, login, room
from synapse.server import HomeServer
from synapse.types import UserID
from synapse.util import Clock
from synapse.util.stringutils import random_string
from synapse.util.task_scheduler import TaskScheduler
from tests import unittest
@ -50,6 +49,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.event_creation_handler = hs.get_event_creation_handler()
self.task_scheduler = hs.get_task_scheduler()
hs.config.consent.user_consent_version = "1"
consent_uri_builder = Mock()
@ -480,6 +480,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.event_creation_handler = hs.get_event_creation_handler()
self.task_scheduler = hs.get_task_scheduler()
hs.config.consent.user_consent_version = "1"
consent_uri_builder = Mock()
@ -668,7 +669,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
delete_id1 = channel.json_body["delete_id"]
# go ahead
self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2)
self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2)
# second task
channel = self.make_request(
@ -700,7 +701,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
# get status after more than clearing time for first task
# second task is not cleared
self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2)
self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2)
channel = self.make_request(
"GET",
@ -714,7 +715,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self.assertEqual(delete_id2, channel.json_body["results"][0]["delete_id"])
# get status after more than clearing time for all tasks
self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2)
self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2)
channel = self.make_request(
"GET",
@ -725,48 +726,6 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self.assertEqual(404, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
def test_delete_same_room_twice(self) -> None:
"""Test that the call for delete a room at second time gives an exception."""
body = {"new_room_user_id": self.admin_user}
# first call to delete room
# and do not wait for finish the task
first_channel = self.make_request(
"DELETE",
self.url.encode("ascii"),
content=body,
access_token=self.admin_user_tok,
await_result=False,
)
# second call to delete room
second_channel = self.make_request(
"DELETE",
self.url.encode("ascii"),
content=body,
access_token=self.admin_user_tok,
)
self.assertEqual(400, second_channel.code, msg=second_channel.json_body)
self.assertEqual(Codes.UNKNOWN, second_channel.json_body["errcode"])
self.assertEqual(
f"Purge already in progress for {self.room_id}",
second_channel.json_body["error"],
)
# get result of first call
first_channel.await_result()
self.assertEqual(200, first_channel.code, msg=first_channel.json_body)
self.assertIn("delete_id", first_channel.json_body)
# check status after finish the task
self._test_result(
first_channel.json_body["delete_id"],
self.other_user,
expect_new_room=True,
)
def test_purge_room_and_block(self) -> None:
"""Test to purge a room and block it.
Members will not be moved to a new room and will not receive a message.
@ -1005,7 +964,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self._is_purged(room_id)
def test_resume_purge_room(self) -> None:
def test_scheduled_purge_room(self) -> None:
# Create a test room
room_id = self.helper.create_room_as(
self.admin_user,
@ -1013,12 +972,12 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
)
self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok)
# Schedule a purge 10 seconds in the future
self.get_success(
self.store.upsert_room_to_delete(
room_id,
random_string(16),
DeleteStatus.ACTION_PURGE,
DeleteStatus.STATUS_PURGING,
self.task_scheduler.schedule_task(
"purge_room",
resource_id=room_id,
timestamp=self.clock.time_msec() + 10 * 1000,
)
)
@ -1026,38 +985,34 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
with self.assertRaises(AssertionError):
self._is_purged(room_id)
# Advance one hour in the future past `PURGE_ROOMS_INTERVAL_MS` so that
# the automatic purging takes place and resumes the purge
# Advance one hour in the future past `TaskScheduler.SCHEDULE_INTERVAL_MS` so that
# the automatic purging takes place and launches the purge
self.reactor.advance(ONE_HOUR_IN_S)
self._is_purged(room_id)
def test_resume_shutdown_room(self) -> None:
def test_schedule_shutdown_room(self) -> None:
# Create a test room
room_id = self.helper.create_room_as(
self.other_user,
tok=self.other_user_tok,
)
delete_id = random_string(16)
self.get_success(
self.store.upsert_room_to_delete(
room_id,
delete_id,
DeleteStatus.ACTION_SHUTDOWN,
DeleteStatus.STATUS_SHUTTING_DOWN,
params=json.dumps(
{
"requester_user_id": self.admin_user,
"new_room_user_id": self.admin_user,
"new_room_name": None,
"message": None,
"block": False,
"purge": True,
"force_purge": True,
}
),
# Schedule a shutdown 10 seconds in the future
delete_id = self.get_success(
self.task_scheduler.schedule_task(
"shutdown_and_purge_room",
resource_id=room_id,
params={
"requester_user_id": self.admin_user,
"new_room_user_id": self.admin_user,
"new_room_name": None,
"message": None,
"block": False,
"purge": True,
"force_purge": True,
},
timestamp=self.clock.time_msec() + 10 * 1000,
)
)
@ -1068,7 +1023,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
with self.assertRaises(AssertionError):
self._is_purged(room_id)
# Advance one hour in the future past `PURGE_ROOMS_INTERVAL_MS` so that
# Advance one hour in the future past `TaskScheduler.SCHEDULE_INTERVAL_MS` so that
# the scheduled task is picked up and launches the shutdown and purge
self.reactor.advance(ONE_HOUR_IN_S)
@ -2081,14 +2036,11 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
self.assertEqual(len(chunk), 2, [event["content"] for event in chunk])
# Purge every event before the second event.
purge_id = random_string(16)
self.get_success(
pagination_handler._purge_history(
purge_id=purge_id,
pagination_handler.purge_history(
room_id=self.room_id,
token=second_token_str,
delete_local_events=True,
update_rooms_to_delete_table=True,
)
)

View File

@ -414,13 +414,12 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
self.assertEqual(messages[0]["content"]["body"], "test msg one")
self.assertEqual(messages[0]["sender"], "@notices:test")
delete_id = random_string(16)
random_string(16)
# shut down and purge room
self.get_success(
self.room_shutdown_handler.shutdown_room(
first_room_id,
delete_id,
{
"requester_user_id": self.admin_user,
"new_room_user_id": None,
@ -432,7 +431,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
},
)
)
self.get_success(self.pagination_handler.purge_room(first_room_id, "delete_id"))
self.get_success(self.pagination_handler.purge_room(first_room_id, force=False))
# user is not member anymore
self._check_invite_and_join_status(self.other_user, 0, 0)

View File

@ -2088,14 +2088,11 @@ class RoomMessageListTestCase(RoomBase):
self.assertEqual(len(chunk), 2, [event["content"] for event in chunk])
# Purge every event before the second event.
purge_id = random_string(16)
self.get_success(
pagination_handler._purge_history(
purge_id=purge_id,
pagination_handler.purge_history(
room_id=self.room_id,
token=second_token_str,
delete_local_events=True,
update_rooms_to_delete_table=True,
)
)