Brendan Abolivier 2019-11-19 13:22:37 +00:00
parent f03c9d3444
commit 7c24d0f443
GPG Key ID: 1E015C145F1916CD (no known key found for this signature in database)
4 changed files with 77 additions and 101 deletions


@@ -19,7 +19,7 @@ import logging
 import os.path
 import re
 from textwrap import indent
-from typing import List
+from typing import List, Dict, Optional

 import attr
 import yaml
@@ -287,13 +287,17 @@ class ServerConfig(Config):
             self.retention_default_min_lifetime = None
             self.retention_default_max_lifetime = None

-        self.retention_allowed_lifetime_min = retention_config.get("allowed_lifetime_min")
+        self.retention_allowed_lifetime_min = retention_config.get(
+            "allowed_lifetime_min"
+        )
         if self.retention_allowed_lifetime_min is not None:
             self.retention_allowed_lifetime_min = self.parse_duration(
                 self.retention_allowed_lifetime_min
             )

-        self.retention_allowed_lifetime_max = retention_config.get("allowed_lifetime_max")
+        self.retention_allowed_lifetime_max = retention_config.get(
+            "allowed_lifetime_max"
+        )
         if self.retention_allowed_lifetime_max is not None:
             self.retention_allowed_lifetime_max = self.parse_duration(
                 self.retention_allowed_lifetime_max
@@ -302,14 +306,15 @@ class ServerConfig(Config):
         if (
             self.retention_allowed_lifetime_min is not None
             and self.retention_allowed_lifetime_max is not None
-            and self.retention_allowed_lifetime_min > self.retention_allowed_lifetime_max
+            and self.retention_allowed_lifetime_min
+            > self.retention_allowed_lifetime_max
         ):
             raise ConfigError(
                 "Invalid retention policy limits: 'allowed_lifetime_min' can not be"
                 " greater than 'allowed_lifetime_max'"
             )

-        self.retention_purge_jobs = []
+        self.retention_purge_jobs = []  # type: List[Dict[str, Optional[int]]]
         for purge_job_config in retention_config.get("purge_jobs", []):
             interval_config = purge_job_config.get("interval")
@@ -342,18 +347,22 @@ class ServerConfig(Config):
                     " 'longest_max_lifetime' value."
                 )

-            self.retention_purge_jobs.append({
-                "interval": interval,
-                "shortest_max_lifetime": shortest_max_lifetime,
-                "longest_max_lifetime": longest_max_lifetime,
-            })
+            self.retention_purge_jobs.append(
+                {
+                    "interval": interval,
+                    "shortest_max_lifetime": shortest_max_lifetime,
+                    "longest_max_lifetime": longest_max_lifetime,
+                }
+            )

         if not self.retention_purge_jobs:
-            self.retention_purge_jobs = [{
-                "interval": self.parse_duration("1d"),
-                "shortest_max_lifetime": None,
-                "longest_max_lifetime": None,
-            }]
+            self.retention_purge_jobs = [
+                {
+                    "interval": self.parse_duration("1d"),
+                    "shortest_max_lifetime": None,
+                    "longest_max_lifetime": None,
+                }
+            ]

         self.listeners = []  # type: List[dict]
         for listener in config.get("listeners", []):
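
For orientation, here is a minimal sketch (not taken from this codebase) of the data shape the config parsing above builds: retention_purge_jobs ends up as a list of dicts whose durations are in milliseconds, falling back to a single daily catch-all job when no purge_jobs are configured. The helper name and the assumption that intervals are already converted to milliseconds are illustrative only.

    from typing import Dict, List, Optional

    ONE_DAY_MS = 24 * 60 * 60 * 1000  # what parse_duration("1d") evaluates to

    def build_purge_jobs(purge_jobs_config):
        # type: (List[dict]) -> List[Dict[str, Optional[int]]]
        """Hypothetical helper mirroring the fallback above: if no purge jobs
        are configured, use one daily job with no lifetime bounds."""
        jobs = [
            {
                "interval": job["interval"],  # assumed already in milliseconds
                "shortest_max_lifetime": job.get("shortest_max_lifetime"),
                "longest_max_lifetime": job.get("longest_max_lifetime"),
            }
            for job in purge_jobs_config
        ]
        if not jobs:
            jobs = [
                {
                    "interval": ONE_DAY_MS,
                    "shortest_max_lifetime": None,
                    "longest_max_lifetime": None,
                }
            ]
        return jobs

    print(build_purge_jobs([]))  # -> single default job covering every room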


@@ -154,20 +154,17 @@ class PaginationHandler(object):
             # Figure out what token we should start purging at.
             ts = self.clock.time_msec() - max_lifetime

-            stream_ordering = (
-                yield self.store.find_first_stream_ordering_after_ts(ts)
-            )
+            stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)

-            r = (
-                yield self.store.get_room_event_after_stream_ordering(
-                    room_id, stream_ordering,
-                )
-            )
+            r = yield self.store.get_room_event_after_stream_ordering(
+                room_id, stream_ordering,
+            )
             if not r:
                 logger.warning(
                     "[purge] purging events not possible: No event found "
                     "(ts %i => stream_ordering %i)",
-                    ts, stream_ordering,
+                    ts,
+                    stream_ordering,
                 )
                 continue
@@ -186,9 +183,7 @@ class PaginationHandler(object):
             # the background so that it's not blocking any other operation apart from
             # other purges in the same room.
             run_as_background_process(
-                "_purge_history",
-                self._purge_history,
-                purge_id, room_id, token, True,
+                "_purge_history", self._purge_history, purge_id, room_id, token, True,
             )

     def start_purge_history(self, room_id, token, delete_local_events=False):
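
To make the purge cutoff explicit: the handler code above subtracts a room's max_lifetime from the current time and purges history up to the first event after that point. A standalone sketch of just that arithmetic (the function name and defaulting are illustrative, not part of the handler's API):

    import time

    def purge_cutoff_ms(max_lifetime_ms, now_ms=None):
        """Timestamp (ms) before which events have outlived max_lifetime."""
        if now_ms is None:
            now_ms = int(time.time() * 1000)
        return now_ms - max_lifetime_ms

    # e.g. a one-week max_lifetime purges everything older than a week:
    cutoff = purge_cutoff_ms(7 * 24 * 60 * 60 * 1000)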


@@ -334,8 +334,9 @@ class RoomStore(RoomWorkerStore, SearchStore):
                 WHERE state.room_id > ? AND state.type = '%s'
                 ORDER BY state.room_id ASC
                 LIMIT ?;
-                """ % EventTypes.Retention,
-                (last_room, batch_size)
+                """
+                % EventTypes.Retention,
+                (last_room, batch_size),
             )

             rows = self.cursor_to_dict(txn)
@@ -358,15 +359,13 @@ class RoomStore(RoomWorkerStore, SearchStore):
                         "event_id": row["event_id"],
                         "min_lifetime": retention_policy.get("min_lifetime"),
                         "max_lifetime": retention_policy.get("max_lifetime"),
-                    }
+                    },
                 )

             logger.info("Inserted %d rows into room_retention", len(rows))

             self._background_update_progress_txn(
-                txn, "insert_room_retention", {
-                    "room_id": rows[-1]["room_id"],
-                }
+                txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]}
             )

             if batch_size > len(rows):
@@ -375,8 +374,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
                 return False

         end = yield self.runInteraction(
-            "insert_room_retention",
-            _background_insert_retention_txn,
+            "insert_room_retention", _background_insert_retention_txn,
         )

         if end:
@@ -585,17 +583,15 @@ class RoomStore(RoomWorkerStore, SearchStore):
         )

     def _store_retention_policy_for_room_txn(self, txn, event):
-        if (
-            hasattr(event, "content")
-            and ("min_lifetime" in event.content or "max_lifetime" in event.content)
+        if hasattr(event, "content") and (
+            "min_lifetime" in event.content or "max_lifetime" in event.content
         ):
             if (
-                ("min_lifetime" in event.content and not isinstance(
-                    event.content.get("min_lifetime"), integer_types
-                ))
-                or ("max_lifetime" in event.content and not isinstance(
-                    event.content.get("max_lifetime"), integer_types
-                ))
+                "min_lifetime" in event.content
+                and not isinstance(event.content.get("min_lifetime"), integer_types)
+            ) or (
+                "max_lifetime" in event.content
+                and not isinstance(event.content.get("max_lifetime"), integer_types)
             ):
                 # Ignore the event if one of the value isn't an integer.
                 return
@@ -798,7 +794,9 @@ class RoomStore(RoomWorkerStore, SearchStore):
         return local_media_mxcs, remote_media_mxcs

     @defer.inlineCallbacks
-    def get_rooms_for_retention_period_in_range(self, min_ms, max_ms, include_null=False):
+    def get_rooms_for_retention_period_in_range(
+        self, min_ms, max_ms, include_null=False
+    ):
         """Retrieves all of the rooms within the given retention range.

         Optionally includes the rooms which don't have a retention policy.
@@ -904,23 +902,24 @@ class RoomStore(RoomWorkerStore, SearchStore):
                 INNER JOIN current_state_events USING (event_id, room_id)
                 WHERE room_id = ?;
                 """,
-                (room_id,)
+                (room_id,),
            )

             return self.cursor_to_dict(txn)

         ret = yield self.runInteraction(
-            "get_retention_policy_for_room",
-            get_retention_policy_for_room_txn,
+            "get_retention_policy_for_room", get_retention_policy_for_room_txn,
         )

         # If we don't know this room ID, ret will be None, in this case return the default
         # policy.
         if not ret:
-            defer.returnValue({
-                "min_lifetime": self.config.retention_default_min_lifetime,
-                "max_lifetime": self.config.retention_default_max_lifetime,
-            })
+            defer.returnValue(
+                {
+                    "min_lifetime": self.config.retention_default_min_lifetime,
+                    "max_lifetime": self.config.retention_default_max_lifetime,
+                }
+            )

         row = ret[0]
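
The two storage paths above boil down to a validation rule and a fallback: a retention state event is ignored unless the lifetimes it sets are integers, and a room without a stored policy falls back to the server-wide defaults. A rough standalone sketch of that logic (the function name is hypothetical, and plain int stands in for integer_types):

    def effective_retention_policy(content, default_min_lifetime, default_max_lifetime):
        """Return the policy a room would effectively get, or the defaults."""
        has_policy = content is not None and (
            "min_lifetime" in content or "max_lifetime" in content
        )
        if has_policy:
            for key in ("min_lifetime", "max_lifetime"):
                if key in content and not isinstance(content[key], int):
                    # Mirrors the check above: non-integer values invalidate the event.
                    has_policy = False
        if not has_policy:
            return {
                "min_lifetime": default_min_lifetime,
                "max_lifetime": default_max_lifetime,
            }
        return {
            "min_lifetime": content.get("min_lifetime"),
            "max_lifetime": content.get("max_lifetime"),
        }

    # A string lifetime is rejected, so the server defaults apply:
    assert effective_retention_policy({"max_lifetime": "1d"}, None, 86400000) == {
        "min_lifetime": None,
        "max_lifetime": 86400000,
    }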


@@ -61,9 +61,7 @@ class RetentionTestCase(unittest.HomeserverTestCase):
         self.helper.send_state(
             room_id=room_id,
             event_type=EventTypes.Retention,
-            body={
-                "max_lifetime": one_day_ms * 4,
-            },
+            body={"max_lifetime": one_day_ms * 4},
             tok=self.token,
             expect_code=400,
         )
@@ -71,9 +69,7 @@ class RetentionTestCase(unittest.HomeserverTestCase):
         self.helper.send_state(
             room_id=room_id,
             event_type=EventTypes.Retention,
-            body={
-                "max_lifetime": one_hour_ms,
-            },
+            body={"max_lifetime": one_hour_ms},
             tok=self.token,
             expect_code=400,
         )
@@ -89,9 +85,7 @@ class RetentionTestCase(unittest.HomeserverTestCase):
         self.helper.send_state(
             room_id=room_id,
             event_type=EventTypes.Retention,
-            body={
-                "max_lifetime": lifetime,
-            },
+            body={"max_lifetime": lifetime},
             tok=self.token,
         )
@@ -115,20 +109,12 @@ class RetentionTestCase(unittest.HomeserverTestCase):
         events = []

         # Send a first event, which should be filtered out at the end of the test.
-        resp = self.helper.send(
-            room_id=room_id,
-            body="1",
-            tok=self.token,
-        )
+        resp = self.helper.send(room_id=room_id, body="1", tok=self.token)

         # Get the event from the store so that we end up with a FrozenEvent that we can
         # give to filter_events_for_client. We need to do this now because the event won't
         # be in the database anymore after it has expired.
-        events.append(self.get_success(
-            store.get_event(
-                resp.get("event_id")
-            )
-        ))
+        events.append(self.get_success(store.get_event(resp.get("event_id"))))

         # Advance the time by 2 days. We're using the default retention policy, therefore
         # after this the first event will still be valid.
@@ -143,20 +129,16 @@ class RetentionTestCase(unittest.HomeserverTestCase):
         valid_event_id = resp.get("event_id")

-        events.append(self.get_success(
-            store.get_event(
-                valid_event_id
-            )
-        ))
+        events.append(self.get_success(store.get_event(valid_event_id)))

         # Advance the time by anothe 2 days. After this, the first event should be
         # outdated but not the second one.
         self.reactor.advance(one_day_ms * 2 / 1000)

         # Run filter_events_for_client with our list of FrozenEvents.
-        filtered_events = self.get_success(filter_events_for_client(
-            storage, self.user_id, events
-        ))
+        filtered_events = self.get_success(
+            filter_events_for_client(storage, self.user_id, events)
+        )

         # We should only get one event back.
         self.assertEqual(len(filtered_events), 1, filtered_events)
@@ -172,28 +154,22 @@ class RetentionTestCase(unittest.HomeserverTestCase):
         # Send a first event to the room. This is the event we'll want to be purged at the
         # end of the test.
-        resp = self.helper.send(
-            room_id=room_id,
-            body="1",
-            tok=self.token,
-        )
+        resp = self.helper.send(room_id=room_id, body="1", tok=self.token)

         expired_event_id = resp.get("event_id")

         # Check that we can retrieve the event.
         expired_event = self.get_event(room_id, expired_event_id)
-        self.assertEqual(expired_event.get("content", {}).get("body"), "1", expired_event)
+        self.assertEqual(
+            expired_event.get("content", {}).get("body"), "1", expired_event
+        )

         # Advance the time.
         self.reactor.advance(increment / 1000)

         # Send another event. We need this because the purge job won't purge the most
         # recent event in the room.
-        resp = self.helper.send(
-            room_id=room_id,
-            body="2",
-            tok=self.token,
-        )
+        resp = self.helper.send(room_id=room_id, body="2", tok=self.token)

         valid_event_id = resp.get("event_id")
@@ -240,8 +216,7 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase):
         mock_federation_client = Mock(spec=["backfill"])

         self.hs = self.setup_test_homeserver(
-            config=config,
-            federation_client=mock_federation_client,
+            config=config, federation_client=mock_federation_client,
         )

         return self.hs
@@ -268,9 +243,7 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase):
         self.helper.send_state(
             room_id=room_id,
             event_type=EventTypes.Retention,
-            body={
-                "max_lifetime": one_day_ms * 35,
-            },
+            body={"max_lifetime": one_day_ms * 35},
             tok=self.token,
         )
@@ -289,18 +262,16 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase):
         # Check that we can retrieve the event.
         expired_event = self.get_event(room_id, first_event_id)
-        self.assertEqual(expired_event.get("content", {}).get("body"), "1", expired_event)
+        self.assertEqual(
+            expired_event.get("content", {}).get("body"), "1", expired_event
+        )

         # Advance the time by a month.
         self.reactor.advance(one_day_ms * 30 / 1000)

         # Send another event. We need this because the purge job won't purge the most
         # recent event in the room.
-        resp = self.helper.send(
-            room_id=room_id,
-            body="2",
-            tok=self.token,
-        )
+        resp = self.helper.send(room_id=room_id, body="2", tok=self.token)

         second_event_id = resp.get("event_id")
@@ -313,7 +284,9 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase):
         )

         if expected_code_for_first_event == 200:
-            self.assertEqual(first_event.get("content", {}).get("body"), "1", first_event)
+            self.assertEqual(
+                first_event.get("content", {}).get("body"), "1", first_event
+            )

         # Check that the event that hasn't been purged can still be retrieved.
         second_event = self.get_event(room_id, second_event_id)
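
One detail worth calling out in the tests above: retention lifetimes and event timestamps are in milliseconds, while the test reactor's advance() takes seconds, hence the recurring division by 1000. A standalone sketch of that unit conversion (the helper name is illustrative):

    one_hour_ms = 60 * 60 * 1000
    one_day_ms = 24 * one_hour_ms  # 86400000

    def ms_to_seconds(duration_ms):
        """Convert a millisecond duration to the seconds that advance() expects."""
        return duration_ms / 1000

    assert ms_to_seconds(one_day_ms * 2) == 172800.0  # two days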