Handle the case of remote users leaving a partial join room for device lists (#13885)
This commit is contained in:
parent
85e161631a
commit
e8318a4333
|
@ -0,0 +1 @@
|
||||||
|
Correctly handle a race with device lists when a remote user leaves during a partial join.
|
|
@ -53,9 +53,9 @@ logger = logging.getLogger("synapse.app.admin_cmd")
|
||||||
|
|
||||||
class AdminCmdSlavedStore(
|
class AdminCmdSlavedStore(
|
||||||
SlavedFilteringStore,
|
SlavedFilteringStore,
|
||||||
SlavedDeviceStore,
|
|
||||||
SlavedPushRuleStore,
|
SlavedPushRuleStore,
|
||||||
SlavedEventStore,
|
SlavedEventStore,
|
||||||
|
SlavedDeviceStore,
|
||||||
TagsWorkerStore,
|
TagsWorkerStore,
|
||||||
DeviceInboxWorkerStore,
|
DeviceInboxWorkerStore,
|
||||||
AccountDataWorkerStore,
|
AccountDataWorkerStore,
|
||||||
|
|
|
@ -598,11 +598,6 @@ class EventsPersistenceStorageController:
|
||||||
# room
|
# room
|
||||||
state_delta_for_room: Dict[str, DeltaState] = {}
|
state_delta_for_room: Dict[str, DeltaState] = {}
|
||||||
|
|
||||||
# Set of remote users which were in rooms the server has left or who may
|
|
||||||
# have left rooms the server is in. We should check if we still share any
|
|
||||||
# rooms and if not we mark their device lists as stale.
|
|
||||||
potentially_left_users: Set[str] = set()
|
|
||||||
|
|
||||||
if not backfilled:
|
if not backfilled:
|
||||||
with Measure(self._clock, "_calculate_state_and_extrem"):
|
with Measure(self._clock, "_calculate_state_and_extrem"):
|
||||||
# Work out the new "current state" for each room.
|
# Work out the new "current state" for each room.
|
||||||
|
@ -716,8 +711,6 @@ class EventsPersistenceStorageController:
|
||||||
room_id,
|
room_id,
|
||||||
ev_ctx_rm,
|
ev_ctx_rm,
|
||||||
delta,
|
delta,
|
||||||
current_state,
|
|
||||||
potentially_left_users,
|
|
||||||
)
|
)
|
||||||
if not is_still_joined:
|
if not is_still_joined:
|
||||||
logger.info("Server no longer in room %s", room_id)
|
logger.info("Server no longer in room %s", room_id)
|
||||||
|
@ -725,20 +718,6 @@ class EventsPersistenceStorageController:
|
||||||
current_state = {}
|
current_state = {}
|
||||||
delta.no_longer_in_room = True
|
delta.no_longer_in_room = True
|
||||||
|
|
||||||
# Add all remote users that might have left rooms.
|
|
||||||
potentially_left_users.update(
|
|
||||||
user_id
|
|
||||||
for event_type, user_id in delta.to_delete
|
|
||||||
if event_type == EventTypes.Member
|
|
||||||
and not self.is_mine_id(user_id)
|
|
||||||
)
|
|
||||||
potentially_left_users.update(
|
|
||||||
user_id
|
|
||||||
for event_type, user_id in delta.to_insert.keys()
|
|
||||||
if event_type == EventTypes.Member
|
|
||||||
and not self.is_mine_id(user_id)
|
|
||||||
)
|
|
||||||
|
|
||||||
state_delta_for_room[room_id] = delta
|
state_delta_for_room[room_id] = delta
|
||||||
|
|
||||||
await self.persist_events_store._persist_events_and_state_updates(
|
await self.persist_events_store._persist_events_and_state_updates(
|
||||||
|
@ -749,8 +728,6 @@ class EventsPersistenceStorageController:
|
||||||
inhibit_local_membership_updates=backfilled,
|
inhibit_local_membership_updates=backfilled,
|
||||||
)
|
)
|
||||||
|
|
||||||
await self._handle_potentially_left_users(potentially_left_users)
|
|
||||||
|
|
||||||
return replaced_events
|
return replaced_events
|
||||||
|
|
||||||
async def _calculate_new_extremities(
|
async def _calculate_new_extremities(
|
||||||
|
@ -1126,8 +1103,6 @@ class EventsPersistenceStorageController:
|
||||||
room_id: str,
|
room_id: str,
|
||||||
ev_ctx_rm: List[Tuple[EventBase, EventContext]],
|
ev_ctx_rm: List[Tuple[EventBase, EventContext]],
|
||||||
delta: DeltaState,
|
delta: DeltaState,
|
||||||
current_state: Optional[StateMap[str]],
|
|
||||||
potentially_left_users: Set[str],
|
|
||||||
) -> bool:
|
) -> bool:
|
||||||
"""Check if the server will still be joined after the given events have
|
"""Check if the server will still be joined after the given events have
|
||||||
been persised.
|
been persised.
|
||||||
|
@ -1137,11 +1112,6 @@ class EventsPersistenceStorageController:
|
||||||
ev_ctx_rm
|
ev_ctx_rm
|
||||||
delta: The delta of current state between what is in the database
|
delta: The delta of current state between what is in the database
|
||||||
and what the new current state will be.
|
and what the new current state will be.
|
||||||
current_state: The new current state if it already been calculated,
|
|
||||||
otherwise None.
|
|
||||||
potentially_left_users: If the server has left the room, then joined
|
|
||||||
remote users will be added to this set to indicate that the
|
|
||||||
server may no longer be sharing a room with them.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if not any(
|
if not any(
|
||||||
|
@ -1195,45 +1165,4 @@ class EventsPersistenceStorageController:
|
||||||
):
|
):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# The server will leave the room, so we go and find out which remote
|
|
||||||
# users will still be joined when we leave.
|
|
||||||
if current_state is None:
|
|
||||||
current_state = await self.main_store.get_partial_current_state_ids(room_id)
|
|
||||||
current_state = dict(current_state)
|
|
||||||
for key in delta.to_delete:
|
|
||||||
current_state.pop(key, None)
|
|
||||||
|
|
||||||
current_state.update(delta.to_insert)
|
|
||||||
|
|
||||||
remote_event_ids = [
|
|
||||||
event_id
|
|
||||||
for (
|
|
||||||
typ,
|
|
||||||
state_key,
|
|
||||||
), event_id in current_state.items()
|
|
||||||
if typ == EventTypes.Member and not self.is_mine_id(state_key)
|
|
||||||
]
|
|
||||||
members = await self.main_store.get_membership_from_event_ids(remote_event_ids)
|
|
||||||
potentially_left_users.update(
|
|
||||||
member.user_id
|
|
||||||
for member in members.values()
|
|
||||||
if member and member.membership == Membership.JOIN
|
|
||||||
)
|
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
async def _handle_potentially_left_users(self, user_ids: Set[str]) -> None:
|
|
||||||
"""Given a set of remote users check if the server still shares a room with
|
|
||||||
them. If not then mark those users' device cache as stale.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if not user_ids:
|
|
||||||
return
|
|
||||||
|
|
||||||
joined_users = await self.main_store.get_users_server_still_shares_room_with(
|
|
||||||
user_ids
|
|
||||||
)
|
|
||||||
left_users = user_ids - joined_users
|
|
||||||
|
|
||||||
for user_id in left_users:
|
|
||||||
await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id)
|
|
||||||
|
|
|
@ -83,6 +83,7 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
class DataStore(
|
class DataStore(
|
||||||
EventsBackgroundUpdatesStore,
|
EventsBackgroundUpdatesStore,
|
||||||
|
DeviceStore,
|
||||||
RoomMemberStore,
|
RoomMemberStore,
|
||||||
RoomStore,
|
RoomStore,
|
||||||
RoomBatchStore,
|
RoomBatchStore,
|
||||||
|
@ -114,7 +115,6 @@ class DataStore(
|
||||||
StreamWorkerStore,
|
StreamWorkerStore,
|
||||||
OpenIdStore,
|
OpenIdStore,
|
||||||
ClientIpWorkerStore,
|
ClientIpWorkerStore,
|
||||||
DeviceStore,
|
|
||||||
DeviceInboxStore,
|
DeviceInboxStore,
|
||||||
UserDirectoryStore,
|
UserDirectoryStore,
|
||||||
UserErasureStore,
|
UserErasureStore,
|
||||||
|
|
|
@ -47,6 +47,7 @@ from synapse.storage.database import (
|
||||||
make_tuple_comparison_clause,
|
make_tuple_comparison_clause,
|
||||||
)
|
)
|
||||||
from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore
|
from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore
|
||||||
|
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
|
||||||
from synapse.storage.types import Cursor
|
from synapse.storage.types import Cursor
|
||||||
from synapse.types import JsonDict, get_verify_key_from_cross_signing_key
|
from synapse.types import JsonDict, get_verify_key_from_cross_signing_key
|
||||||
from synapse.util import json_decoder, json_encoder
|
from synapse.util import json_decoder, json_encoder
|
||||||
|
@ -70,7 +71,7 @@ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
|
||||||
BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes"
|
BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes"
|
||||||
|
|
||||||
|
|
||||||
class DeviceWorkerStore(EndToEndKeyWorkerStore):
|
class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
database: DatabasePool,
|
database: DatabasePool,
|
||||||
|
@ -985,24 +986,59 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
|
||||||
desc="mark_remote_user_device_cache_as_valid",
|
desc="mark_remote_user_device_cache_as_valid",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def handle_potentially_left_users(self, user_ids: Set[str]) -> None:
|
||||||
|
"""Given a set of remote users check if the server still shares a room with
|
||||||
|
them. If not then mark those users' device cache as stale.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not user_ids:
|
||||||
|
return
|
||||||
|
|
||||||
|
await self.db_pool.runInteraction(
|
||||||
|
"_handle_potentially_left_users",
|
||||||
|
self.handle_potentially_left_users_txn,
|
||||||
|
user_ids,
|
||||||
|
)
|
||||||
|
|
||||||
|
def handle_potentially_left_users_txn(
|
||||||
|
self,
|
||||||
|
txn: LoggingTransaction,
|
||||||
|
user_ids: Set[str],
|
||||||
|
) -> None:
|
||||||
|
"""Given a set of remote users check if the server still shares a room with
|
||||||
|
them. If not then mark those users' device cache as stale.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not user_ids:
|
||||||
|
return
|
||||||
|
|
||||||
|
joined_users = self.get_users_server_still_shares_room_with_txn(txn, user_ids)
|
||||||
|
left_users = user_ids - joined_users
|
||||||
|
|
||||||
|
for user_id in left_users:
|
||||||
|
self.mark_remote_user_device_list_as_unsubscribed_txn(txn, user_id)
|
||||||
|
|
||||||
async def mark_remote_user_device_list_as_unsubscribed(self, user_id: str) -> None:
|
async def mark_remote_user_device_list_as_unsubscribed(self, user_id: str) -> None:
|
||||||
"""Mark that we no longer track device lists for remote user."""
|
"""Mark that we no longer track device lists for remote user."""
|
||||||
|
|
||||||
def _mark_remote_user_device_list_as_unsubscribed_txn(
|
|
||||||
txn: LoggingTransaction,
|
|
||||||
) -> None:
|
|
||||||
self.db_pool.simple_delete_txn(
|
|
||||||
txn,
|
|
||||||
table="device_lists_remote_extremeties",
|
|
||||||
keyvalues={"user_id": user_id},
|
|
||||||
)
|
|
||||||
self._invalidate_cache_and_stream(
|
|
||||||
txn, self.get_device_list_last_stream_id_for_remote, (user_id,)
|
|
||||||
)
|
|
||||||
|
|
||||||
await self.db_pool.runInteraction(
|
await self.db_pool.runInteraction(
|
||||||
"mark_remote_user_device_list_as_unsubscribed",
|
"mark_remote_user_device_list_as_unsubscribed",
|
||||||
_mark_remote_user_device_list_as_unsubscribed_txn,
|
self.mark_remote_user_device_list_as_unsubscribed_txn,
|
||||||
|
user_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
def mark_remote_user_device_list_as_unsubscribed_txn(
|
||||||
|
self,
|
||||||
|
txn: LoggingTransaction,
|
||||||
|
user_id: str,
|
||||||
|
) -> None:
|
||||||
|
self.db_pool.simple_delete_txn(
|
||||||
|
txn,
|
||||||
|
table="device_lists_remote_extremeties",
|
||||||
|
keyvalues={"user_id": user_id},
|
||||||
|
)
|
||||||
|
self._invalidate_cache_and_stream(
|
||||||
|
txn, self.get_device_list_last_stream_id_for_remote, (user_id,)
|
||||||
)
|
)
|
||||||
|
|
||||||
async def get_dehydrated_device(
|
async def get_dehydrated_device(
|
||||||
|
|
|
@ -1202,6 +1202,12 @@ class PersistEventsStore:
|
||||||
txn, room_id, members_changed
|
txn, room_id, members_changed
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Check if any of the remote membership changes requires us to
|
||||||
|
# unsubscribe from their device lists.
|
||||||
|
self.store.handle_potentially_left_users_txn(
|
||||||
|
txn, {m for m in members_changed if not self.hs.is_mine_id(m)}
|
||||||
|
)
|
||||||
|
|
||||||
def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str) -> None:
|
def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str) -> None:
|
||||||
"""Update the room version in the database based off current state
|
"""Update the room version in the database based off current state
|
||||||
events.
|
events.
|
||||||
|
|
|
@ -662,31 +662,37 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||||
if not user_ids:
|
if not user_ids:
|
||||||
return set()
|
return set()
|
||||||
|
|
||||||
def _get_users_server_still_shares_room_with_txn(
|
|
||||||
txn: LoggingTransaction,
|
|
||||||
) -> Set[str]:
|
|
||||||
sql = """
|
|
||||||
SELECT state_key FROM current_state_events
|
|
||||||
WHERE
|
|
||||||
type = 'm.room.member'
|
|
||||||
AND membership = 'join'
|
|
||||||
AND %s
|
|
||||||
GROUP BY state_key
|
|
||||||
"""
|
|
||||||
|
|
||||||
clause, args = make_in_list_sql_clause(
|
|
||||||
self.database_engine, "state_key", user_ids
|
|
||||||
)
|
|
||||||
|
|
||||||
txn.execute(sql % (clause,), args)
|
|
||||||
|
|
||||||
return {row[0] for row in txn}
|
|
||||||
|
|
||||||
return await self.db_pool.runInteraction(
|
return await self.db_pool.runInteraction(
|
||||||
"get_users_server_still_shares_room_with",
|
"get_users_server_still_shares_room_with",
|
||||||
_get_users_server_still_shares_room_with_txn,
|
self.get_users_server_still_shares_room_with_txn,
|
||||||
|
user_ids,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def get_users_server_still_shares_room_with_txn(
|
||||||
|
self,
|
||||||
|
txn: LoggingTransaction,
|
||||||
|
user_ids: Collection[str],
|
||||||
|
) -> Set[str]:
|
||||||
|
if not user_ids:
|
||||||
|
return set()
|
||||||
|
|
||||||
|
sql = """
|
||||||
|
SELECT state_key FROM current_state_events
|
||||||
|
WHERE
|
||||||
|
type = 'm.room.member'
|
||||||
|
AND membership = 'join'
|
||||||
|
AND %s
|
||||||
|
GROUP BY state_key
|
||||||
|
"""
|
||||||
|
|
||||||
|
clause, args = make_in_list_sql_clause(
|
||||||
|
self.database_engine, "state_key", user_ids
|
||||||
|
)
|
||||||
|
|
||||||
|
txn.execute(sql % (clause,), args)
|
||||||
|
|
||||||
|
return {row[0] for row in txn}
|
||||||
|
|
||||||
@cancellable
|
@cancellable
|
||||||
async def get_rooms_for_user(
|
async def get_rooms_for_user(
|
||||||
self, user_id: str, on_invalidate: Optional[Callable[[], None]] = None
|
self, user_id: str, on_invalidate: Optional[Callable[[], None]] = None
|
||||||
|
|
Loading…
Reference in New Issue