Add ReadWriteLock for pagination and history prune
This commit is contained in:
parent
7335f0adda
commit
8f8798bc0d
|
@ -26,7 +26,7 @@ from synapse.types import (
|
||||||
UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id
|
UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id
|
||||||
)
|
)
|
||||||
from synapse.util import unwrapFirstError
|
from synapse.util import unwrapFirstError
|
||||||
from synapse.util.async import concurrently_execute, run_on_reactor
|
from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock
|
||||||
from synapse.util.caches.snapshot_cache import SnapshotCache
|
from synapse.util.caches.snapshot_cache import SnapshotCache
|
||||||
from synapse.util.logcontext import preserve_fn
|
from synapse.util.logcontext import preserve_fn
|
||||||
from synapse.visibility import filter_events_for_client
|
from synapse.visibility import filter_events_for_client
|
||||||
|
@ -50,6 +50,8 @@ class MessageHandler(BaseHandler):
|
||||||
self.validator = EventValidator()
|
self.validator = EventValidator()
|
||||||
self.snapshot_cache = SnapshotCache()
|
self.snapshot_cache = SnapshotCache()
|
||||||
|
|
||||||
|
self.pagination_lock = ReadWriteLock()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def purge_history(self, room_id, event_id):
|
def purge_history(self, room_id, event_id):
|
||||||
event = yield self.store.get_event(event_id)
|
event = yield self.store.get_event(event_id)
|
||||||
|
@ -59,9 +61,8 @@ class MessageHandler(BaseHandler):
|
||||||
|
|
||||||
depth = event.depth
|
depth = event.depth
|
||||||
|
|
||||||
# TODO: Lock.
|
with (yield self.pagination_lock.write(room_id)):
|
||||||
|
yield self.store.delete_old_state(room_id, depth)
|
||||||
yield self.store.delete_old_state(room_id, depth)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_messages(self, requester, room_id=None, pagin_config=None,
|
def get_messages(self, requester, room_id=None, pagin_config=None,
|
||||||
|
@ -98,42 +99,43 @@ class MessageHandler(BaseHandler):
|
||||||
|
|
||||||
source_config = pagin_config.get_source_config("room")
|
source_config = pagin_config.get_source_config("room")
|
||||||
|
|
||||||
membership, member_event_id = yield self._check_in_room_or_world_readable(
|
with (yield self.pagination_lock.read(room_id)):
|
||||||
room_id, user_id
|
membership, member_event_id = yield self._check_in_room_or_world_readable(
|
||||||
)
|
room_id, user_id
|
||||||
|
|
||||||
if source_config.direction == 'b':
|
|
||||||
# if we're going backwards, we might need to backfill. This
|
|
||||||
# requires that we have a topo token.
|
|
||||||
if room_token.topological:
|
|
||||||
max_topo = room_token.topological
|
|
||||||
else:
|
|
||||||
max_topo = yield self.store.get_max_topological_token_for_stream_and_room(
|
|
||||||
room_id, room_token.stream
|
|
||||||
)
|
|
||||||
|
|
||||||
if membership == Membership.LEAVE:
|
|
||||||
# If they have left the room then clamp the token to be before
|
|
||||||
# they left the room, to save the effort of loading from the
|
|
||||||
# database.
|
|
||||||
leave_token = yield self.store.get_topological_token_for_event(
|
|
||||||
member_event_id
|
|
||||||
)
|
|
||||||
leave_token = RoomStreamToken.parse(leave_token)
|
|
||||||
if leave_token.topological < max_topo:
|
|
||||||
source_config.from_key = str(leave_token)
|
|
||||||
|
|
||||||
yield self.hs.get_handlers().federation_handler.maybe_backfill(
|
|
||||||
room_id, max_topo
|
|
||||||
)
|
)
|
||||||
|
|
||||||
events, next_key = yield data_source.get_pagination_rows(
|
if source_config.direction == 'b':
|
||||||
requester.user, source_config, room_id
|
# if we're going backwards, we might need to backfill. This
|
||||||
)
|
# requires that we have a topo token.
|
||||||
|
if room_token.topological:
|
||||||
|
max_topo = room_token.topological
|
||||||
|
else:
|
||||||
|
max_topo = yield self.store.get_max_topological_token(
|
||||||
|
room_id, room_token.stream
|
||||||
|
)
|
||||||
|
|
||||||
next_token = pagin_config.from_token.copy_and_replace(
|
if membership == Membership.LEAVE:
|
||||||
"room_key", next_key
|
# If they have left the room then clamp the token to be before
|
||||||
)
|
# they left the room, to save the effort of loading from the
|
||||||
|
# database.
|
||||||
|
leave_token = yield self.store.get_topological_token_for_event(
|
||||||
|
member_event_id
|
||||||
|
)
|
||||||
|
leave_token = RoomStreamToken.parse(leave_token)
|
||||||
|
if leave_token.topological < max_topo:
|
||||||
|
source_config.from_key = str(leave_token)
|
||||||
|
|
||||||
|
yield self.hs.get_handlers().federation_handler.maybe_backfill(
|
||||||
|
room_id, max_topo
|
||||||
|
)
|
||||||
|
|
||||||
|
events, next_key = yield data_source.get_pagination_rows(
|
||||||
|
requester.user, source_config, room_id
|
||||||
|
)
|
||||||
|
|
||||||
|
next_token = pagin_config.from_token.copy_and_replace(
|
||||||
|
"room_key", next_key
|
||||||
|
)
|
||||||
|
|
||||||
if not events:
|
if not events:
|
||||||
defer.returnValue({
|
defer.returnValue({
|
||||||
|
|
|
@ -487,13 +487,13 @@ class StreamStore(SQLBaseStore):
|
||||||
row["topological_ordering"], row["stream_ordering"],)
|
row["topological_ordering"], row["stream_ordering"],)
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_max_topological_token_for_stream_and_room(self, room_id, stream_key):
|
def get_max_topological_token(self, room_id, stream_key):
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT max(topological_ordering) FROM events"
|
"SELECT max(topological_ordering) FROM events"
|
||||||
" WHERE room_id = ? AND stream_ordering < ?"
|
" WHERE room_id = ? AND stream_ordering < ?"
|
||||||
)
|
)
|
||||||
return self._execute(
|
return self._execute(
|
||||||
"get_max_topological_token_for_stream_and_room", None,
|
"get_max_topological_token", None,
|
||||||
sql, room_id, stream_key,
|
sql, room_id, stream_key,
|
||||||
).addCallback(
|
).addCallback(
|
||||||
lambda r: r[0][0] if r else 0
|
lambda r: r[0][0] if r else 0
|
||||||
|
|
Loading…
Reference in New Issue