Merge pull request #2962 from matrix-org/rav/purge_history_txns
Add transactional API to history purge
This commit is contained in:
commit
d65ceb4b48
|
@ -32,3 +32,30 @@ specified by including an event_id in the URI, or by setting a
|
||||||
id is given, that event (and others at the same graph depth) will be retained.
|
id is given, that event (and others at the same graph depth) will be retained.
|
||||||
If ``purge_up_to_ts`` is given, it should be a timestamp since the unix epoch,
|
If ``purge_up_to_ts`` is given, it should be a timestamp since the unix epoch,
|
||||||
in milliseconds.
|
in milliseconds.
|
||||||
|
|
||||||
|
The API starts the purge running, and returns immediately with a JSON body with
|
||||||
|
a purge id:
|
||||||
|
|
||||||
|
.. code:: json
|
||||||
|
|
||||||
|
{
|
||||||
|
"purge_id": "<opaque id>"
|
||||||
|
}
|
||||||
|
|
||||||
|
Purge status query
|
||||||
|
------------------
|
||||||
|
|
||||||
|
It is possible to poll for updates on recent purges with a second API;
|
||||||
|
|
||||||
|
``GET /_matrix/client/r0/admin/purge_history_status/<purge_id>``
|
||||||
|
|
||||||
|
(again, with a suitable ``access_token``). This API returns a JSON body like
|
||||||
|
the following:
|
||||||
|
|
||||||
|
.. code:: json
|
||||||
|
|
||||||
|
{
|
||||||
|
"status": "active"
|
||||||
|
}
|
||||||
|
|
||||||
|
The status will be one of ``active``, ``complete``, or ``failed``.
|
||||||
|
|
|
@ -13,7 +13,8 @@
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer, reactor
|
||||||
|
from twisted.python.failure import Failure
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes, Membership
|
from synapse.api.constants import EventTypes, Membership
|
||||||
from synapse.api.errors import AuthError, Codes, SynapseError
|
from synapse.api.errors import AuthError, Codes, SynapseError
|
||||||
|
@ -24,9 +25,10 @@ from synapse.types import (
|
||||||
UserID, RoomAlias, RoomStreamToken,
|
UserID, RoomAlias, RoomStreamToken,
|
||||||
)
|
)
|
||||||
from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
|
from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
|
||||||
from synapse.util.logcontext import preserve_fn
|
from synapse.util.logcontext import preserve_fn, run_in_background
|
||||||
from synapse.util.metrics import measure_func
|
from synapse.util.metrics import measure_func
|
||||||
from synapse.util.frozenutils import unfreeze
|
from synapse.util.frozenutils import unfreeze
|
||||||
|
from synapse.util.stringutils import random_string
|
||||||
from synapse.visibility import filter_events_for_client
|
from synapse.visibility import filter_events_for_client
|
||||||
from synapse.replication.http.send_event import send_event_to_master
|
from synapse.replication.http.send_event import send_event_to_master
|
||||||
|
|
||||||
|
@ -41,6 +43,36 @@ import ujson
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class PurgeStatus(object):
|
||||||
|
"""Object tracking the status of a purge request
|
||||||
|
|
||||||
|
This class contains information on the progress of a purge request, for
|
||||||
|
return by get_purge_status.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
status (int): Tracks whether this request has completed. One of
|
||||||
|
STATUS_{ACTIVE,COMPLETE,FAILED}
|
||||||
|
"""
|
||||||
|
|
||||||
|
STATUS_ACTIVE = 0
|
||||||
|
STATUS_COMPLETE = 1
|
||||||
|
STATUS_FAILED = 2
|
||||||
|
|
||||||
|
STATUS_TEXT = {
|
||||||
|
STATUS_ACTIVE: "active",
|
||||||
|
STATUS_COMPLETE: "complete",
|
||||||
|
STATUS_FAILED: "failed",
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.status = PurgeStatus.STATUS_ACTIVE
|
||||||
|
|
||||||
|
def asdict(self):
|
||||||
|
return {
|
||||||
|
"status": PurgeStatus.STATUS_TEXT[self.status]
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class MessageHandler(BaseHandler):
|
class MessageHandler(BaseHandler):
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
|
@ -50,14 +82,87 @@ class MessageHandler(BaseHandler):
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
|
|
||||||
self.pagination_lock = ReadWriteLock()
|
self.pagination_lock = ReadWriteLock()
|
||||||
|
self._purges_in_progress_by_room = set()
|
||||||
|
# map from purge id to PurgeStatus
|
||||||
|
self._purges_by_id = {}
|
||||||
|
|
||||||
|
def start_purge_history(self, room_id, topological_ordering,
|
||||||
|
delete_local_events=False):
|
||||||
|
"""Start off a history purge on a room.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id (str): The room to purge from
|
||||||
|
|
||||||
|
topological_ordering (int): minimum topo ordering to preserve
|
||||||
|
delete_local_events (bool): True to delete local events as well as
|
||||||
|
remote ones
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: unique ID for this purge transaction.
|
||||||
|
"""
|
||||||
|
if room_id in self._purges_in_progress_by_room:
|
||||||
|
raise SynapseError(
|
||||||
|
400,
|
||||||
|
"History purge already in progress for %s" % (room_id, ),
|
||||||
|
)
|
||||||
|
|
||||||
|
purge_id = random_string(16)
|
||||||
|
|
||||||
|
# we log the purge_id here so that it can be tied back to the
|
||||||
|
# request id in the log lines.
|
||||||
|
logger.info("[purge] starting purge_id %s", purge_id)
|
||||||
|
|
||||||
|
self._purges_by_id[purge_id] = PurgeStatus()
|
||||||
|
run_in_background(
|
||||||
|
self._purge_history,
|
||||||
|
purge_id, room_id, topological_ordering, delete_local_events,
|
||||||
|
)
|
||||||
|
return purge_id
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def purge_history(self, room_id, topological_ordering,
|
def _purge_history(self, purge_id, room_id, topological_ordering,
|
||||||
delete_local_events=False):
|
delete_local_events):
|
||||||
|
"""Carry out a history purge on a room.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
purge_id (str): The id for this purge
|
||||||
|
room_id (str): The room to purge from
|
||||||
|
topological_ordering (int): minimum topo ordering to preserve
|
||||||
|
delete_local_events (bool): True to delete local events as well as
|
||||||
|
remote ones
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred
|
||||||
|
"""
|
||||||
|
self._purges_in_progress_by_room.add(room_id)
|
||||||
|
try:
|
||||||
with (yield self.pagination_lock.write(room_id)):
|
with (yield self.pagination_lock.write(room_id)):
|
||||||
yield self.store.purge_history(
|
yield self.store.purge_history(
|
||||||
room_id, topological_ordering, delete_local_events,
|
room_id, topological_ordering, delete_local_events,
|
||||||
)
|
)
|
||||||
|
logger.info("[purge] complete")
|
||||||
|
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
|
||||||
|
except Exception:
|
||||||
|
logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
|
||||||
|
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
|
||||||
|
finally:
|
||||||
|
self._purges_in_progress_by_room.discard(room_id)
|
||||||
|
|
||||||
|
# remove the purge from the list 24 hours after it completes
|
||||||
|
def clear_purge():
|
||||||
|
del self._purges_by_id[purge_id]
|
||||||
|
reactor.callLater(24 * 3600, clear_purge)
|
||||||
|
|
||||||
|
def get_purge_status(self, purge_id):
|
||||||
|
"""Get the current status of an active purge
|
||||||
|
|
||||||
|
Args:
|
||||||
|
purge_id (str): purge_id returned by start_purge_history
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
PurgeStatus|None
|
||||||
|
"""
|
||||||
|
return self._purges_by_id.get(purge_id)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_messages(self, requester, room_id=None, pagin_config=None,
|
def get_messages(self, requester, room_id=None, pagin_config=None,
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.constants import Membership
|
from synapse.api.constants import Membership
|
||||||
from synapse.api.errors import AuthError, SynapseError, Codes
|
from synapse.api.errors import AuthError, SynapseError, Codes, NotFoundError
|
||||||
from synapse.types import UserID, create_requester
|
from synapse.types import UserID, create_requester
|
||||||
from synapse.http.servlet import parse_json_object_from_request
|
from synapse.http.servlet import parse_json_object_from_request
|
||||||
|
|
||||||
|
@ -185,12 +185,43 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
|
||||||
errcode=Codes.BAD_JSON,
|
errcode=Codes.BAD_JSON,
|
||||||
)
|
)
|
||||||
|
|
||||||
yield self.handlers.message_handler.purge_history(
|
purge_id = yield self.handlers.message_handler.start_purge_history(
|
||||||
room_id, depth,
|
room_id, depth,
|
||||||
delete_local_events=delete_local_events,
|
delete_local_events=delete_local_events,
|
||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue((200, {}))
|
defer.returnValue((200, {
|
||||||
|
"purge_id": purge_id,
|
||||||
|
}))
|
||||||
|
|
||||||
|
|
||||||
|
class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
|
||||||
|
PATTERNS = client_path_patterns(
|
||||||
|
"/admin/purge_history_status/(?P<purge_id>[^/]+)"
|
||||||
|
)
|
||||||
|
|
||||||
|
def __init__(self, hs):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
hs (synapse.server.HomeServer)
|
||||||
|
"""
|
||||||
|
super(PurgeHistoryStatusRestServlet, self).__init__(hs)
|
||||||
|
self.handlers = hs.get_handlers()
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def on_GET(self, request, purge_id):
|
||||||
|
requester = yield self.auth.get_user_by_req(request)
|
||||||
|
is_admin = yield self.auth.is_server_admin(requester.user)
|
||||||
|
|
||||||
|
if not is_admin:
|
||||||
|
raise AuthError(403, "You are not a server admin")
|
||||||
|
|
||||||
|
purge_status = self.handlers.message_handler.get_purge_status(purge_id)
|
||||||
|
if purge_status is None:
|
||||||
|
raise NotFoundError("purge id '%s' not found" % purge_id)
|
||||||
|
|
||||||
|
defer.returnValue((200, purge_status.asdict()))
|
||||||
|
|
||||||
|
|
||||||
class DeactivateAccountRestServlet(ClientV1RestServlet):
|
class DeactivateAccountRestServlet(ClientV1RestServlet):
|
||||||
|
@ -561,6 +592,7 @@ class SearchUsersRestServlet(ClientV1RestServlet):
|
||||||
def register_servlets(hs, http_server):
|
def register_servlets(hs, http_server):
|
||||||
WhoisRestServlet(hs).register(http_server)
|
WhoisRestServlet(hs).register(http_server)
|
||||||
PurgeMediaCacheRestServlet(hs).register(http_server)
|
PurgeMediaCacheRestServlet(hs).register(http_server)
|
||||||
|
PurgeHistoryStatusRestServlet(hs).register(http_server)
|
||||||
DeactivateAccountRestServlet(hs).register(http_server)
|
DeactivateAccountRestServlet(hs).register(http_server)
|
||||||
PurgeHistoryRestServlet(hs).register(http_server)
|
PurgeHistoryRestServlet(hs).register(http_server)
|
||||||
UsersRestServlet(hs).register(http_server)
|
UsersRestServlet(hs).register(http_server)
|
||||||
|
|
Loading…
Reference in New Issue