Fix clamp leave and disable backfill
This commit is contained in:
parent
c75f2dda46
commit
76879578ea
|
@ -211,29 +211,17 @@ class MessageHandler(BaseHandler):
|
||||||
)
|
)
|
||||||
|
|
||||||
if source_config.direction == 'b':
|
if source_config.direction == 'b':
|
||||||
# if we're going backwards, we might need to backfill. This
|
|
||||||
# requires that we have a topo token.
|
|
||||||
if room_token.topological:
|
|
||||||
max_topo = room_token.topological
|
|
||||||
else:
|
|
||||||
max_topo = yield self.store.get_max_topological_token(
|
|
||||||
room_id, room_token.stream
|
|
||||||
)
|
|
||||||
|
|
||||||
if membership == Membership.LEAVE:
|
if membership == Membership.LEAVE:
|
||||||
# If they have left the room then clamp the token to be before
|
# If they have left the room then clamp the token to be before
|
||||||
# they left the room, to save the effort of loading from the
|
# they left the room, to save the effort of loading from the
|
||||||
# database.
|
# database.
|
||||||
leave_token = yield self.store.get_topological_token_for_event(
|
|
||||||
member_event_id
|
|
||||||
)
|
|
||||||
leave_token = RoomStreamToken.parse(leave_token)
|
|
||||||
if leave_token.topological < max_topo:
|
|
||||||
source_config.from_key = str(leave_token)
|
|
||||||
|
|
||||||
yield self.hs.get_handlers().federation_handler.maybe_backfill(
|
leave_token = yield self.store.get_topological_token_for_event(
|
||||||
room_id, max_topo
|
member_event_id,
|
||||||
)
|
)
|
||||||
|
source_config.from_key = yield self.store.clamp_token_before(
|
||||||
|
room_id, source_config.from_key, leave_token,
|
||||||
|
)
|
||||||
|
|
||||||
events, next_key, extremities = yield self.store.paginate_room_events(
|
events, next_key, extremities = yield self.store.paginate_room_events(
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
|
|
|
@ -905,6 +905,48 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
|
|
||||||
defer.returnValue((events, token, extremities))
|
defer.returnValue((events, token, extremities))
|
||||||
|
|
||||||
|
def clamp_token_before(self, room_id, token, clamp_to):
|
||||||
|
token = RoomStreamToken.parse(token)
|
||||||
|
clamp_to = RoomStreamToken.parse(clamp_to)
|
||||||
|
|
||||||
|
def clamp_token_before_txn(txn, token):
|
||||||
|
if not token.topological:
|
||||||
|
sql = """
|
||||||
|
SELECT chunk_id, topological_ordering FROM events
|
||||||
|
WHERE room_id = ? AND stream_ordering <= ?
|
||||||
|
ORDER BY stream_ordering DESC
|
||||||
|
"""
|
||||||
|
txn.execute(sql, (room_id, token.stream,))
|
||||||
|
row = txn.fetchone()
|
||||||
|
if not row:
|
||||||
|
return str(token)
|
||||||
|
|
||||||
|
chunk_id, topo = row
|
||||||
|
token = RoomStreamToken(chunk_id, topo, token.stream)
|
||||||
|
|
||||||
|
if token.chunk == clamp_to.chunk:
|
||||||
|
if token.topological < clamp_to.topological:
|
||||||
|
return str(token)
|
||||||
|
else:
|
||||||
|
return str(clamp_to)
|
||||||
|
|
||||||
|
sql = "SELECT rationale FROM chunk_linearized WHERE chunk_id = ?"
|
||||||
|
|
||||||
|
txn.execute(sql, (token.chunk,))
|
||||||
|
token_order, = txn.fetchone()
|
||||||
|
|
||||||
|
txn.execute(sql, (clamp_to.chunk,))
|
||||||
|
clamp_order, = txn.fetchone()
|
||||||
|
|
||||||
|
if token_order < clamp_order:
|
||||||
|
return str(token)
|
||||||
|
else:
|
||||||
|
return str(clamp_to)
|
||||||
|
|
||||||
|
return self.runInteraction(
|
||||||
|
"clamp_token_before", clamp_token_before_txn, token
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class StreamStore(StreamWorkerStore):
|
class StreamStore(StreamWorkerStore):
|
||||||
def get_room_max_stream_ordering(self):
|
def get_room_max_stream_ordering(self):
|
||||||
|
|
Loading…
Reference in New Issue