Sliding Sync: Fix state leaking on incremental sync
This commit is contained in:
parent
f3fd6852ac
commit
4daa533e82
|
@ -39,6 +39,7 @@ from synapse.logging.opentracing import (
|
||||||
trace,
|
trace,
|
||||||
)
|
)
|
||||||
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
|
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
|
||||||
|
from synapse.storage.databases.main.state_deltas import StateDelta
|
||||||
from synapse.storage.databases.main.stream import PaginateFunction
|
from synapse.storage.databases.main.stream import PaginateFunction
|
||||||
from synapse.storage.roommember import (
|
from synapse.storage.roommember import (
|
||||||
MemberSummary,
|
MemberSummary,
|
||||||
|
@ -48,6 +49,7 @@ from synapse.types import (
|
||||||
MutableStateMap,
|
MutableStateMap,
|
||||||
PersistedEventPosition,
|
PersistedEventPosition,
|
||||||
Requester,
|
Requester,
|
||||||
|
RoomStreamToken,
|
||||||
SlidingSyncStreamToken,
|
SlidingSyncStreamToken,
|
||||||
StateMap,
|
StateMap,
|
||||||
StrCollection,
|
StrCollection,
|
||||||
|
@ -470,6 +472,64 @@ class SlidingSyncHandler:
|
||||||
|
|
||||||
return state_map
|
return state_map
|
||||||
|
|
||||||
|
@trace
|
||||||
|
async def get_current_state_deltas_for_room(
|
||||||
|
self,
|
||||||
|
room_id: str,
|
||||||
|
room_membership_for_user_at_to_token: RoomsForUserType,
|
||||||
|
from_token: RoomStreamToken,
|
||||||
|
to_token: RoomStreamToken,
|
||||||
|
) -> List[StateDelta]:
|
||||||
|
"""
|
||||||
|
Get the state deltas between two tokens taking into account the user's
|
||||||
|
membership. If the user is LEAVE/BAN, we will only get the state deltas up to
|
||||||
|
their LEAVE/BAN event (inclusive).
|
||||||
|
|
||||||
|
(> `from_token` and <= `to_token`)
|
||||||
|
"""
|
||||||
|
membership = room_membership_for_user_at_to_token.membership
|
||||||
|
# We don't know how to handle `membership` values other than these. The
|
||||||
|
# code below would need to be updated.
|
||||||
|
assert membership in (
|
||||||
|
Membership.JOIN,
|
||||||
|
Membership.INVITE,
|
||||||
|
Membership.KNOCK,
|
||||||
|
Membership.LEAVE,
|
||||||
|
Membership.BAN,
|
||||||
|
)
|
||||||
|
|
||||||
|
# People shouldn't see past their leave/ban event
|
||||||
|
if membership in (
|
||||||
|
Membership.LEAVE,
|
||||||
|
Membership.BAN,
|
||||||
|
):
|
||||||
|
to_bound = (
|
||||||
|
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
|
||||||
|
)
|
||||||
|
# If we are participating in the room, we can get the latest current state in
|
||||||
|
# the room
|
||||||
|
elif membership == Membership.JOIN:
|
||||||
|
to_bound = to_token
|
||||||
|
# We can only rely on the stripped state included in the invite/knock event
|
||||||
|
# itself so there will never be any state deltas to send down.
|
||||||
|
elif membership in (Membership.INVITE, Membership.KNOCK):
|
||||||
|
return []
|
||||||
|
else:
|
||||||
|
# We don't know how to handle this type of membership yet
|
||||||
|
#
|
||||||
|
# FIXME: We should use `assert_never` here but for some reason
|
||||||
|
# the exhaustive matching doesn't recognize the `Never` here.
|
||||||
|
# assert_never(membership)
|
||||||
|
raise AssertionError(
|
||||||
|
f"Unexpected membership {membership} that we don't know how to handle yet"
|
||||||
|
)
|
||||||
|
|
||||||
|
return await self.store.get_current_state_deltas_for_room(
|
||||||
|
room_id=room_id,
|
||||||
|
from_token=from_token,
|
||||||
|
to_token=to_bound,
|
||||||
|
)
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
async def get_room_sync_data(
|
async def get_room_sync_data(
|
||||||
self,
|
self,
|
||||||
|
@ -790,8 +850,9 @@ class SlidingSyncHandler:
|
||||||
# TODO: Limit the number of state events we're about to send down
|
# TODO: Limit the number of state events we're about to send down
|
||||||
# the room, if its too many we should change this to an
|
# the room, if its too many we should change this to an
|
||||||
# `initial=True`?
|
# `initial=True`?
|
||||||
deltas = await self.store.get_current_state_deltas_for_room(
|
deltas = await self.get_current_state_deltas_for_room(
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
|
room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
|
||||||
from_token=from_bound,
|
from_token=from_bound,
|
||||||
to_token=to_token.room_key,
|
to_token=to_token.room_key,
|
||||||
)
|
)
|
||||||
|
|
|
@ -243,6 +243,13 @@ class StateDeltasStore(SQLBaseStore):
|
||||||
|
|
||||||
(> `from_token` and <= `to_token`)
|
(> `from_token` and <= `to_token`)
|
||||||
"""
|
"""
|
||||||
|
# We can bail early if the `from_token` is after the `to_token`
|
||||||
|
if (
|
||||||
|
to_token is not None
|
||||||
|
and from_token is not None
|
||||||
|
and to_token.is_before_or_eq(from_token)
|
||||||
|
):
|
||||||
|
return []
|
||||||
|
|
||||||
if (
|
if (
|
||||||
from_token is not None
|
from_token is not None
|
||||||
|
|
|
@ -751,9 +751,10 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
|
||||||
self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
|
self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
|
||||||
|
|
||||||
@parameterized.expand([(Membership.LEAVE,), (Membership.BAN,)])
|
@parameterized.expand([(Membership.LEAVE,), (Membership.BAN,)])
|
||||||
def test_rooms_required_state_leave_ban(self, stop_membership: str) -> None:
|
def test_rooms_required_state_leave_ban_initial(self, stop_membership: str) -> None:
|
||||||
"""
|
"""
|
||||||
Test `rooms.required_state` should not return state past a leave/ban event.
|
Test `rooms.required_state` should not return state past a leave/ban event when
|
||||||
|
it's the first "initial" time the room is being sent down the connection.
|
||||||
"""
|
"""
|
||||||
user1_id = self.register_user("user1", "pass")
|
user1_id = self.register_user("user1", "pass")
|
||||||
user1_tok = self.login(user1_id, "pass")
|
user1_tok = self.login(user1_id, "pass")
|
||||||
|
@ -788,6 +789,13 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
|
||||||
body={"foo": "bar"},
|
body={"foo": "bar"},
|
||||||
tok=user2_tok,
|
tok=user2_tok,
|
||||||
)
|
)
|
||||||
|
self.helper.send_state(
|
||||||
|
room_id1,
|
||||||
|
event_type="org.matrix.bar_state",
|
||||||
|
state_key="",
|
||||||
|
body={"bar": "bar"},
|
||||||
|
tok=user2_tok,
|
||||||
|
)
|
||||||
|
|
||||||
if stop_membership == Membership.LEAVE:
|
if stop_membership == Membership.LEAVE:
|
||||||
# User 1 leaves
|
# User 1 leaves
|
||||||
|
@ -796,6 +804,8 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
|
||||||
# User 1 is banned
|
# User 1 is banned
|
||||||
self.helper.ban(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
|
self.helper.ban(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
|
||||||
|
|
||||||
|
# Get the state_map before we change the state as this is the final state we
|
||||||
|
# expect User1 to be able to see
|
||||||
state_map = self.get_success(
|
state_map = self.get_success(
|
||||||
self.storage_controllers.state.get_current_state(room_id1)
|
self.storage_controllers.state.get_current_state(room_id1)
|
||||||
)
|
)
|
||||||
|
@ -808,12 +818,36 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
|
||||||
body={"foo": "qux"},
|
body={"foo": "qux"},
|
||||||
tok=user2_tok,
|
tok=user2_tok,
|
||||||
)
|
)
|
||||||
|
self.helper.send_state(
|
||||||
|
room_id1,
|
||||||
|
event_type="org.matrix.bar_state",
|
||||||
|
state_key="",
|
||||||
|
body={"bar": "qux"},
|
||||||
|
tok=user2_tok,
|
||||||
|
)
|
||||||
self.helper.leave(room_id1, user3_id, tok=user3_tok)
|
self.helper.leave(room_id1, user3_id, tok=user3_tok)
|
||||||
|
|
||||||
# Make an incremental Sliding Sync request
|
# Make an incremental Sliding Sync request
|
||||||
|
#
|
||||||
|
# Also expand the required state to include the `org.matrix.bar_state` event.
|
||||||
|
# This is just an extra complication of the test.
|
||||||
|
sync_body = {
|
||||||
|
"lists": {
|
||||||
|
"foo-list": {
|
||||||
|
"ranges": [[0, 1]],
|
||||||
|
"required_state": [
|
||||||
|
[EventTypes.Create, ""],
|
||||||
|
[EventTypes.Member, "*"],
|
||||||
|
["org.matrix.foo_state", ""],
|
||||||
|
["org.matrix.bar_state", ""],
|
||||||
|
],
|
||||||
|
"timeline_limit": 3,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||||
|
|
||||||
# Only user2 and user3 sent events in the 3 events we see in the `timeline`
|
# We should only see the state up to the leave/ban event
|
||||||
self._assertRequiredStateIncludes(
|
self._assertRequiredStateIncludes(
|
||||||
response_body["rooms"][room_id1]["required_state"],
|
response_body["rooms"][room_id1]["required_state"],
|
||||||
{
|
{
|
||||||
|
@ -822,6 +856,126 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
|
||||||
state_map[(EventTypes.Member, user2_id)],
|
state_map[(EventTypes.Member, user2_id)],
|
||||||
state_map[(EventTypes.Member, user3_id)],
|
state_map[(EventTypes.Member, user3_id)],
|
||||||
state_map[("org.matrix.foo_state", "")],
|
state_map[("org.matrix.foo_state", "")],
|
||||||
|
state_map[("org.matrix.bar_state", "")],
|
||||||
|
},
|
||||||
|
exact=True,
|
||||||
|
)
|
||||||
|
self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
|
||||||
|
|
||||||
|
@parameterized.expand([(Membership.LEAVE,), (Membership.BAN,)])
|
||||||
|
def test_rooms_required_state_leave_ban_incremental(
|
||||||
|
self, stop_membership: str
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Test `rooms.required_state` should not return state past a leave/ban event on
|
||||||
|
incremental sync.
|
||||||
|
"""
|
||||||
|
user1_id = self.register_user("user1", "pass")
|
||||||
|
user1_tok = self.login(user1_id, "pass")
|
||||||
|
user2_id = self.register_user("user2", "pass")
|
||||||
|
user2_tok = self.login(user2_id, "pass")
|
||||||
|
user3_id = self.register_user("user3", "pass")
|
||||||
|
user3_tok = self.login(user3_id, "pass")
|
||||||
|
|
||||||
|
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||||
|
self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||||
|
self.helper.join(room_id1, user3_id, tok=user3_tok)
|
||||||
|
|
||||||
|
self.helper.send_state(
|
||||||
|
room_id1,
|
||||||
|
event_type="org.matrix.foo_state",
|
||||||
|
state_key="",
|
||||||
|
body={"foo": "bar"},
|
||||||
|
tok=user2_tok,
|
||||||
|
)
|
||||||
|
self.helper.send_state(
|
||||||
|
room_id1,
|
||||||
|
event_type="org.matrix.bar_state",
|
||||||
|
state_key="",
|
||||||
|
body={"bar": "bar"},
|
||||||
|
tok=user2_tok,
|
||||||
|
)
|
||||||
|
|
||||||
|
sync_body = {
|
||||||
|
"lists": {
|
||||||
|
"foo-list": {
|
||||||
|
"ranges": [[0, 1]],
|
||||||
|
"required_state": [
|
||||||
|
[EventTypes.Create, ""],
|
||||||
|
[EventTypes.Member, "*"],
|
||||||
|
["org.matrix.foo_state", ""],
|
||||||
|
],
|
||||||
|
"timeline_limit": 3,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||||
|
|
||||||
|
if stop_membership == Membership.LEAVE:
|
||||||
|
# User 1 leaves
|
||||||
|
self.helper.leave(room_id1, user1_id, tok=user1_tok)
|
||||||
|
elif stop_membership == Membership.BAN:
|
||||||
|
# User 1 is banned
|
||||||
|
self.helper.ban(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
|
||||||
|
|
||||||
|
# Get the state_map before we change the state as this is the final state we
|
||||||
|
# expect User1 to be able to see
|
||||||
|
state_map = self.get_success(
|
||||||
|
self.storage_controllers.state.get_current_state(room_id1)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Change the state after user 1 leaves
|
||||||
|
self.helper.send_state(
|
||||||
|
room_id1,
|
||||||
|
event_type="org.matrix.foo_state",
|
||||||
|
state_key="",
|
||||||
|
body={"foo": "qux"},
|
||||||
|
tok=user2_tok,
|
||||||
|
)
|
||||||
|
self.helper.send_state(
|
||||||
|
room_id1,
|
||||||
|
event_type="org.matrix.bar_state",
|
||||||
|
state_key="",
|
||||||
|
body={"bar": "qux"},
|
||||||
|
tok=user2_tok,
|
||||||
|
)
|
||||||
|
self.helper.leave(room_id1, user3_id, tok=user3_tok)
|
||||||
|
|
||||||
|
# Make an incremental Sliding Sync request
|
||||||
|
#
|
||||||
|
# Also expand the required state to include the `org.matrix.bar_state` event.
|
||||||
|
# This is just an extra complication of the test.
|
||||||
|
sync_body = {
|
||||||
|
"lists": {
|
||||||
|
"foo-list": {
|
||||||
|
"ranges": [[0, 1]],
|
||||||
|
"required_state": [
|
||||||
|
[EventTypes.Create, ""],
|
||||||
|
[EventTypes.Member, "*"],
|
||||||
|
["org.matrix.foo_state", ""],
|
||||||
|
["org.matrix.bar_state", ""],
|
||||||
|
],
|
||||||
|
"timeline_limit": 3,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||||
|
|
||||||
|
# User1 should only see the state up to the leave/ban event
|
||||||
|
self._assertRequiredStateIncludes(
|
||||||
|
response_body["rooms"][room_id1]["required_state"],
|
||||||
|
{
|
||||||
|
# User1 should see their leave/ban membership
|
||||||
|
state_map[(EventTypes.Member, user1_id)],
|
||||||
|
state_map[("org.matrix.bar_state", "")],
|
||||||
|
# The commented out state events were already returned in the initial
|
||||||
|
# sync so we shouldn't see them again on the incremental sync. And we
|
||||||
|
# shouldn't see the state events that changed after the leave/ban event.
|
||||||
|
#
|
||||||
|
# state_map[(EventTypes.Create, "")],
|
||||||
|
# state_map[(EventTypes.Member, user2_id)],
|
||||||
|
# state_map[(EventTypes.Member, user3_id)],
|
||||||
|
# state_map[("org.matrix.foo_state", "")],
|
||||||
},
|
},
|
||||||
exact=True,
|
exact=True,
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue