Use fully-qualified `PersistedEventPosition` when returning `RoomsForUser` (#17265)

Use fully-qualified `PersistedEventPosition` (`instance_name` and `stream_ordering`) when returning `RoomsForUser` to facilitate proper comparisons and `RoomStreamToken` generation.

Spawning from https://github.com/element-hq/synapse/pull/17187 where we want to utilize this change
This commit is contained in:
Eric Eastwood 2024-06-04 12:58:03 -05:00 committed by GitHub
parent eab0b548e4
commit 7d8f0ef351
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 85 additions and 75 deletions

1
changelog.d/17265.misc Normal file
View File

@ -0,0 +1 @@
Use fully-qualified `PersistedEventPosition` when returning `RoomsForUser` to facilitate proper comparisons and `RoomStreamToken` generation.

View File

@ -674,7 +674,7 @@ class FederationServer(FederationBase):
# This is in addition to the HS-level rate limiting applied by # This is in addition to the HS-level rate limiting applied by
# BaseFederationServlet. # BaseFederationServlet.
# type-ignore: mypy doesn't seem able to deduce the type of the limiter(!?) # type-ignore: mypy doesn't seem able to deduce the type of the limiter(!?)
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type] await self._room_member_handler._join_rate_per_room_limiter.ratelimit(
requester=None, requester=None,
key=room_id, key=room_id,
update=False, update=False,
@ -717,7 +717,7 @@ class FederationServer(FederationBase):
SynapseTags.SEND_JOIN_RESPONSE_IS_PARTIAL_STATE, SynapseTags.SEND_JOIN_RESPONSE_IS_PARTIAL_STATE,
caller_supports_partial_state, caller_supports_partial_state,
) )
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type] await self._room_member_handler._join_rate_per_room_limiter.ratelimit(
requester=None, requester=None,
key=room_id, key=room_id,
update=False, update=False,

View File

@ -126,13 +126,7 @@ class AdminHandler:
# Get all rooms the user is in or has been in # Get all rooms the user is in or has been in
rooms = await self._store.get_rooms_for_local_user_where_membership_is( rooms = await self._store.get_rooms_for_local_user_where_membership_is(
user_id, user_id,
membership_list=( membership_list=Membership.LIST,
Membership.JOIN,
Membership.LEAVE,
Membership.BAN,
Membership.INVITE,
Membership.KNOCK,
),
) )
# We only try and fetch events for rooms the user has been in. If # We only try and fetch events for rooms the user has been in. If
@ -179,7 +173,7 @@ class AdminHandler:
if room.membership == Membership.JOIN: if room.membership == Membership.JOIN:
stream_ordering = self._store.get_room_max_stream_ordering() stream_ordering = self._store.get_room_max_stream_ordering()
else: else:
stream_ordering = room.stream_ordering stream_ordering = room.event_pos.stream
from_key = RoomStreamToken(topological=0, stream=0) from_key = RoomStreamToken(topological=0, stream=0)
to_key = RoomStreamToken(stream=stream_ordering) to_key = RoomStreamToken(stream=stream_ordering)

View File

@ -199,7 +199,7 @@ class InitialSyncHandler:
) )
elif event.membership == Membership.LEAVE: elif event.membership == Membership.LEAVE:
room_end_token = RoomStreamToken( room_end_token = RoomStreamToken(
stream=event.stream_ordering, stream=event.event_pos.stream,
) )
deferred_room_state = run_in_background( deferred_room_state = run_in_background(
self._state_storage_controller.get_state_for_events, self._state_storage_controller.get_state_for_events,

View File

@ -27,7 +27,6 @@ from synapse.api.constants import Direction, EventTypes, Membership
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter from synapse.api.filtering import Filter
from synapse.events.utils import SerializeEventConfig from synapse.events.utils import SerializeEventConfig
from synapse.handlers.room import ShutdownRoomParams, ShutdownRoomResponse
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging.opentracing import trace from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
@ -38,6 +37,8 @@ from synapse.types import (
JsonMapping, JsonMapping,
Requester, Requester,
ScheduledTask, ScheduledTask,
ShutdownRoomParams,
ShutdownRoomResponse,
StreamKeyType, StreamKeyType,
TaskStatus, TaskStatus,
) )

View File

@ -40,7 +40,6 @@ from typing import (
) )
import attr import attr
from typing_extensions import TypedDict
import synapse.events.snapshot import synapse.events.snapshot
from synapse.api.constants import ( from synapse.api.constants import (
@ -81,6 +80,8 @@ from synapse.types import (
RoomAlias, RoomAlias,
RoomID, RoomID,
RoomStreamToken, RoomStreamToken,
ShutdownRoomParams,
ShutdownRoomResponse,
StateMap, StateMap,
StrCollection, StrCollection,
StreamKeyType, StreamKeyType,
@ -1780,63 +1781,6 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
return self.store.get_current_room_stream_token_for_room_id(room_id) return self.store.get_current_room_stream_token_for_room_id(room_id)
class ShutdownRoomParams(TypedDict):
"""
Attributes:
requester_user_id:
User who requested the action. Will be recorded as putting the room on the
blocking list.
new_room_user_id:
If set, a new room will be created with this user ID
as the creator and admin, and all users in the old room will be
moved into that room. If not set, no new room will be created
and the users will just be removed from the old room.
new_room_name:
A string representing the name of the room that new users will
be invited to. Defaults to `Content Violation Notification`
message:
A string containing the first message that will be sent as
`new_room_user_id` in the new room. Ideally this will clearly
convey why the original room was shut down.
Defaults to `Sharing illegal content on this server is not
permitted and rooms in violation will be blocked.`
block:
If set to `true`, this room will be added to a blocking list,
preventing future attempts to join the room. Defaults to `false`.
purge:
If set to `true`, purge the given room from the database.
force_purge:
If set to `true`, the room will be purged from database
even if there are still users joined to the room.
"""
requester_user_id: Optional[str]
new_room_user_id: Optional[str]
new_room_name: Optional[str]
message: Optional[str]
block: bool
purge: bool
force_purge: bool
class ShutdownRoomResponse(TypedDict):
"""
Attributes:
kicked_users: An array of users (`user_id`) that were kicked.
failed_to_kick_users:
An array of users (`user_id`) that that were not kicked.
local_aliases:
An array of strings representing the local aliases that were
migrated from the old room to the new.
new_room_id: A string representing the room ID of the new room.
"""
kicked_users: List[str]
failed_to_kick_users: List[str]
local_aliases: List[str]
new_room_id: Optional[str]
class RoomShutdownHandler: class RoomShutdownHandler:
DEFAULT_MESSAGE = ( DEFAULT_MESSAGE = (
"Sharing illegal content on this server is not permitted and rooms in" "Sharing illegal content on this server is not permitted and rooms in"

View File

@ -2805,7 +2805,7 @@ class SyncHandler:
continue continue
leave_token = now_token.copy_and_replace( leave_token = now_token.copy_and_replace(
StreamKeyType.ROOM, RoomStreamToken(stream=event.stream_ordering) StreamKeyType.ROOM, RoomStreamToken(stream=event.event_pos.stream)
) )
room_entries.append( room_entries.append(
RoomSyncResultBuilder( RoomSyncResultBuilder(

View File

@ -476,7 +476,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
) )
sql = """ sql = """
SELECT room_id, e.sender, c.membership, event_id, e.stream_ordering, r.room_version SELECT room_id, e.sender, c.membership, event_id, e.instance_name, e.stream_ordering, r.room_version
FROM local_current_membership AS c FROM local_current_membership AS c
INNER JOIN events AS e USING (room_id, event_id) INNER JOIN events AS e USING (room_id, event_id)
INNER JOIN rooms AS r USING (room_id) INNER JOIN rooms AS r USING (room_id)
@ -488,7 +488,17 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
) )
txn.execute(sql, (user_id, *args)) txn.execute(sql, (user_id, *args))
results = [RoomsForUser(*r) for r in txn] results = [
RoomsForUser(
room_id=room_id,
sender=sender,
membership=membership,
event_id=event_id,
event_pos=PersistedEventPosition(instance_name, stream_ordering),
room_version_id=room_version,
)
for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn
]
return results return results

View File

@ -35,7 +35,7 @@ class RoomsForUser:
sender: str sender: str
membership: str membership: str
event_id: str event_id: str
stream_ordering: int event_pos: PersistedEventPosition
room_version_id: str room_version_id: str

View File

@ -1279,3 +1279,60 @@ class ScheduledTask:
result: Optional[JsonMapping] result: Optional[JsonMapping]
# Optional error that should be assigned a value when the status is FAILED # Optional error that should be assigned a value when the status is FAILED
error: Optional[str] error: Optional[str]
class ShutdownRoomParams(TypedDict):
"""
Attributes:
requester_user_id:
User who requested the action. Will be recorded as putting the room on the
blocking list.
new_room_user_id:
If set, a new room will be created with this user ID
as the creator and admin, and all users in the old room will be
moved into that room. If not set, no new room will be created
and the users will just be removed from the old room.
new_room_name:
A string representing the name of the room that new users will
be invited to. Defaults to `Content Violation Notification`
message:
A string containing the first message that will be sent as
`new_room_user_id` in the new room. Ideally this will clearly
convey why the original room was shut down.
Defaults to `Sharing illegal content on this server is not
permitted and rooms in violation will be blocked.`
block:
If set to `true`, this room will be added to a blocking list,
preventing future attempts to join the room. Defaults to `false`.
purge:
If set to `true`, purge the given room from the database.
force_purge:
If set to `true`, the room will be purged from database
even if there are still users joined to the room.
"""
requester_user_id: Optional[str]
new_room_user_id: Optional[str]
new_room_name: Optional[str]
message: Optional[str]
block: bool
purge: bool
force_purge: bool
class ShutdownRoomResponse(TypedDict):
"""
Attributes:
kicked_users: An array of users (`user_id`) that were kicked.
failed_to_kick_users:
An array of users (`user_id`) that that were not kicked.
local_aliases:
An array of strings representing the local aliases that were
migrated from the old room to the new.
new_room_id: A string representing the room ID of the new room.
"""
kicked_users: List[str]
failed_to_kick_users: List[str]
local_aliases: List[str]
new_room_id: Optional[str]

View File

@ -154,7 +154,10 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
USER_ID, USER_ID,
"invite", "invite",
event.event_id, event.event_id,
event.internal_metadata.stream_ordering, PersistedEventPosition(
self.hs.get_instance_name(),
event.internal_metadata.stream_ordering,
),
RoomVersions.V1.identifier, RoomVersions.V1.identifier,
) )
], ],