Allow paginating backwards from stream token
This commit is contained in:
parent
ba8931829b
commit
4e7948b47a
|
@ -16,7 +16,7 @@
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes, Membership
|
from synapse.api.constants import EventTypes, Membership
|
||||||
from synapse.api.errors import SynapseError, AuthError, Codes
|
from synapse.api.errors import AuthError, Codes
|
||||||
from synapse.streams.config import PaginationConfig
|
from synapse.streams.config import PaginationConfig
|
||||||
from synapse.events.utils import serialize_event
|
from synapse.events.utils import serialize_event
|
||||||
from synapse.events.validator import EventValidator
|
from synapse.events.validator import EventValidator
|
||||||
|
@ -119,9 +119,12 @@ class MessageHandler(BaseHandler):
|
||||||
if source_config.direction == 'b':
|
if source_config.direction == 'b':
|
||||||
# if we're going backwards, we might need to backfill. This
|
# if we're going backwards, we might need to backfill. This
|
||||||
# requires that we have a topo token.
|
# requires that we have a topo token.
|
||||||
if room_token.topological is None:
|
if room_token.topological:
|
||||||
raise SynapseError(400, "Invalid token: cannot paginate "
|
max_topo = room_token.topological
|
||||||
"backwards from a stream token")
|
else:
|
||||||
|
max_topo = yield self.store.get_max_topological_token_for_stream_and_room(
|
||||||
|
room_id, room_token.stream
|
||||||
|
)
|
||||||
|
|
||||||
if membership == Membership.LEAVE:
|
if membership == Membership.LEAVE:
|
||||||
# If they have left the room then clamp the token to be before
|
# If they have left the room then clamp the token to be before
|
||||||
|
@ -131,11 +134,11 @@ class MessageHandler(BaseHandler):
|
||||||
member_event_id
|
member_event_id
|
||||||
)
|
)
|
||||||
leave_token = RoomStreamToken.parse(leave_token)
|
leave_token = RoomStreamToken.parse(leave_token)
|
||||||
if leave_token.topological < room_token.topological:
|
if leave_token.topological < max_topo:
|
||||||
source_config.from_key = str(leave_token)
|
source_config.from_key = str(leave_token)
|
||||||
|
|
||||||
yield self.hs.get_handlers().federation_handler.maybe_backfill(
|
yield self.hs.get_handlers().federation_handler.maybe_backfill(
|
||||||
room_id, room_token.topological
|
room_id, max_topo
|
||||||
)
|
)
|
||||||
|
|
||||||
events, next_key = yield data_source.get_pagination_rows(
|
events, next_key = yield data_source.get_pagination_rows(
|
||||||
|
|
|
@ -234,10 +234,10 @@ class StreamStore(SQLBaseStore):
|
||||||
get_prev_content=True
|
get_prev_content=True
|
||||||
)
|
)
|
||||||
|
|
||||||
ret.reverse()
|
|
||||||
|
|
||||||
self._set_before_and_after(ret, rows, topo_order=False)
|
self._set_before_and_after(ret, rows, topo_order=False)
|
||||||
|
|
||||||
|
ret.reverse()
|
||||||
|
|
||||||
if rows:
|
if rows:
|
||||||
key = "s%d" % min(r["stream_ordering"] for r in rows)
|
key = "s%d" % min(r["stream_ordering"] for r in rows)
|
||||||
else:
|
else:
|
||||||
|
@ -570,6 +570,18 @@ class StreamStore(SQLBaseStore):
|
||||||
row["topological_ordering"], row["stream_ordering"],)
|
row["topological_ordering"], row["stream_ordering"],)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def get_max_topological_token_for_stream_and_room(self, room_id, stream_key):
|
||||||
|
sql = (
|
||||||
|
"SELECT max(topological_ordering) FROM events"
|
||||||
|
" WHERE room_id = ? AND stream_ordering < ?"
|
||||||
|
)
|
||||||
|
return self._execute(
|
||||||
|
"get_max_topological_token_for_stream_and_room", None,
|
||||||
|
sql, room_id, stream_key,
|
||||||
|
).addCallback(
|
||||||
|
lambda r: r[0][0] if r else 0
|
||||||
|
)
|
||||||
|
|
||||||
def _get_max_topological_txn(self, txn):
|
def _get_max_topological_txn(self, txn):
|
||||||
txn.execute(
|
txn.execute(
|
||||||
"SELECT MAX(topological_ordering) FROM events"
|
"SELECT MAX(topological_ordering) FROM events"
|
||||||
|
|
|
@ -1044,13 +1044,6 @@ class RoomMessageListTestCase(RestTestCase):
|
||||||
self.assertTrue("chunk" in response)
|
self.assertTrue("chunk" in response)
|
||||||
self.assertTrue("end" in response)
|
self.assertTrue("end" in response)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def test_stream_token_is_rejected_for_back_pagination(self):
|
|
||||||
(code, response) = yield self.mock_resource.trigger_get(
|
|
||||||
"/rooms/%s/messages?access_token=x&from=s0_0_0_0_0&dir=b" %
|
|
||||||
self.room_id)
|
|
||||||
self.assertEquals(400, code)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_stream_token_is_accepted_for_fwd_pagianation(self):
|
def test_stream_token_is_accepted_for_fwd_pagianation(self):
|
||||||
token = "s0_0_0_0_0"
|
token = "s0_0_0_0_0"
|
||||||
|
|
Loading…
Reference in New Issue