Add endpoints for backfilling history (MSC2716) (#9247)

Work on https://github.com/matrix-org/matrix-doc/pull/2716
This commit is contained in:
Eric Eastwood 2021-06-22 04:02:53 -05:00 committed by GitHub
parent 756fd513df
commit 96f6293de5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 584 additions and 23 deletions

1
changelog.d/9247.feature Normal file
View File

@ -0,0 +1 @@
Add experimental support for backfilling history into rooms ([MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716)).

View File

@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then
fi fi
# Run the tests! # Run the tests!
go test -v -tags synapse_blacklist,msc2946,msc3083 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests go test -v -tags synapse_blacklist,msc2946,msc3083,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests

View File

@ -92,11 +92,8 @@ class Auth:
async def check_from_context( async def check_from_context(
self, room_version: str, event, context, do_sig_check=True self, room_version: str, event, context, do_sig_check=True
) -> None: ) -> None:
prev_state_ids = await context.get_prev_state_ids() auth_event_ids = event.auth_event_ids()
auth_events_ids = self.compute_auth_events( auth_events_by_id = await self.store.get_events(auth_event_ids)
event, prev_state_ids, for_verification=True
)
auth_events_by_id = await self.store.get_events(auth_events_ids)
auth_events = {(e.type, e.state_key): e for e in auth_events_by_id.values()} auth_events = {(e.type, e.state_key): e for e in auth_events_by_id.values()}
room_version_obj = KNOWN_ROOM_VERSIONS[room_version] room_version_obj = KNOWN_ROOM_VERSIONS[room_version]

View File

@ -119,6 +119,9 @@ class EventTypes:
SpaceChild = "m.space.child" SpaceChild = "m.space.child"
SpaceParent = "m.space.parent" SpaceParent = "m.space.parent"
MSC2716_INSERTION = "org.matrix.msc2716.insertion"
MSC2716_MARKER = "org.matrix.msc2716.marker"
class ToDeviceEventTypes: class ToDeviceEventTypes:
RoomKeyRequest = "m.room_key_request" RoomKeyRequest = "m.room_key_request"
@ -185,6 +188,18 @@ class EventContentFields:
# cf https://github.com/matrix-org/matrix-doc/pull/1772 # cf https://github.com/matrix-org/matrix-doc/pull/1772
ROOM_TYPE = "type" ROOM_TYPE = "type"
# Used on normal messages to indicate they were historically imported after the fact
MSC2716_HISTORICAL = "org.matrix.msc2716.historical"
# For "insertion" events
MSC2716_NEXT_CHUNK_ID = "org.matrix.msc2716.next_chunk_id"
# Used on normal message events to indicate where the chunk connects to
MSC2716_CHUNK_ID = "org.matrix.msc2716.chunk_id"
# For "marker" events
MSC2716_MARKER_INSERTION = "org.matrix.msc2716.marker.insertion"
MSC2716_MARKER_INSERTION_PREV_EVENTS = (
"org.matrix.msc2716.marker.insertion_prev_events"
)
class RoomEncryptionAlgorithms: class RoomEncryptionAlgorithms:
MEGOLM_V1_AES_SHA2 = "m.megolm.v1.aes-sha2" MEGOLM_V1_AES_SHA2 = "m.megolm.v1.aes-sha2"

View File

@ -29,3 +29,6 @@ class ExperimentalConfig(Config):
# MSC3026 (busy presence state) # MSC3026 (busy presence state)
self.msc3026_enabled = experimental.get("msc3026_enabled", False) # type: bool self.msc3026_enabled = experimental.get("msc3026_enabled", False) # type: bool
# MSC2716 (backfill existing history)
self.msc2716_enabled = experimental.get("msc2716_enabled", False) # type: bool

View File

@ -119,6 +119,7 @@ class _EventInternalMetadata:
redacted = DictProperty("redacted") # type: bool redacted = DictProperty("redacted") # type: bool
txn_id = DictProperty("txn_id") # type: str txn_id = DictProperty("txn_id") # type: str
token_id = DictProperty("token_id") # type: str token_id = DictProperty("token_id") # type: str
historical = DictProperty("historical") # type: bool
# XXX: These are set by StreamWorkerStore._set_before_and_after. # XXX: These are set by StreamWorkerStore._set_before_and_after.
# I'm pretty sure that these are never persisted to the database, so shouldn't # I'm pretty sure that these are never persisted to the database, so shouldn't
@ -204,6 +205,14 @@ class _EventInternalMetadata:
""" """
return self._dict.get("redacted", False) return self._dict.get("redacted", False)
def is_historical(self) -> bool:
"""Whether this is a historical message.
This is used by the batchsend historical message endpoint and
is needed to and mark the event as backfilled and skip some checks
like push notifications.
"""
return self._dict.get("historical", False)
class EventBase(metaclass=abc.ABCMeta): class EventBase(metaclass=abc.ABCMeta):
@property @property

View File

@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging
from typing import Any, Dict, List, Optional, Tuple, Union from typing import Any, Dict, List, Optional, Tuple, Union
import attr import attr
@ -33,6 +34,8 @@ from synapse.types import EventID, JsonDict
from synapse.util import Clock from synapse.util import Clock
from synapse.util.stringutils import random_string from synapse.util.stringutils import random_string
logger = logging.getLogger(__name__)
@attr.s(slots=True, cmp=False, frozen=True) @attr.s(slots=True, cmp=False, frozen=True)
class EventBuilder: class EventBuilder:
@ -100,6 +103,7 @@ class EventBuilder:
self, self,
prev_event_ids: List[str], prev_event_ids: List[str],
auth_event_ids: Optional[List[str]], auth_event_ids: Optional[List[str]],
depth: Optional[int] = None,
) -> EventBase: ) -> EventBase:
"""Transform into a fully signed and hashed event """Transform into a fully signed and hashed event
@ -108,6 +112,9 @@ class EventBuilder:
auth_event_ids: The event IDs to use as the auth events. auth_event_ids: The event IDs to use as the auth events.
Should normally be set to None, which will cause them to be calculated Should normally be set to None, which will cause them to be calculated
based on the room state at the prev_events. based on the room state at the prev_events.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
Returns: Returns:
The signed and hashed event. The signed and hashed event.
@ -131,8 +138,14 @@ class EventBuilder:
auth_events = auth_event_ids auth_events = auth_event_ids
prev_events = prev_event_ids prev_events = prev_event_ids
old_depth = await self._store.get_max_depth_of(prev_event_ids) # Otherwise, progress the depth as normal
depth = old_depth + 1 if depth is None:
(
_,
most_recent_prev_event_depth,
) = await self._store.get_max_depth_of(prev_event_ids)
depth = most_recent_prev_event_depth + 1
# we cap depth of generated events, to ensure that they are not # we cap depth of generated events, to ensure that they are not
# rejected by other servers (and so that they can be persisted in # rejected by other servers (and so that they can be persisted in

View File

@ -482,6 +482,9 @@ class EventCreationHandler:
prev_event_ids: Optional[List[str]] = None, prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None, auth_event_ids: Optional[List[str]] = None,
require_consent: bool = True, require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
depth: Optional[int] = None,
) -> Tuple[EventBase, EventContext]: ) -> Tuple[EventBase, EventContext]:
""" """
Given a dict from a client, create a new event. Given a dict from a client, create a new event.
@ -508,6 +511,14 @@ class EventCreationHandler:
require_consent: Whether to check if the requester has require_consent: Whether to check if the requester has
consented to the privacy policy. consented to the privacy policy.
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
Raises: Raises:
ResourceLimitError if server is blocked to some resource being ResourceLimitError if server is blocked to some resource being
exceeded exceeded
@ -563,11 +574,36 @@ class EventCreationHandler:
if txn_id is not None: if txn_id is not None:
builder.internal_metadata.txn_id = txn_id builder.internal_metadata.txn_id = txn_id
builder.internal_metadata.outlier = outlier
builder.internal_metadata.historical = historical
# Strip down the auth_event_ids to only what we need to auth the event.
# For example, we don't need extra m.room.member that don't match event.sender
if auth_event_ids is not None:
temp_event = await builder.build(
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
depth=depth,
)
auth_events = await self.store.get_events_as_list(auth_event_ids)
# Create a StateMap[str]
auth_event_state_map = {
(e.type, e.state_key): e.event_id for e in auth_events
}
# Actually strip down and use the necessary auth events
auth_event_ids = self.auth.compute_auth_events(
event=temp_event,
current_state_ids=auth_event_state_map,
for_verification=False,
)
event, context = await self.create_new_client_event( event, context = await self.create_new_client_event(
builder=builder, builder=builder,
requester=requester, requester=requester,
prev_event_ids=prev_event_ids, prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids, auth_event_ids=auth_event_ids,
depth=depth,
) )
# In an ideal world we wouldn't need the second part of this condition. However, # In an ideal world we wouldn't need the second part of this condition. However,
@ -724,9 +760,13 @@ class EventCreationHandler:
self, self,
requester: Requester, requester: Requester,
event_dict: dict, event_dict: dict,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
ratelimit: bool = True, ratelimit: bool = True,
txn_id: Optional[str] = None, txn_id: Optional[str] = None,
ignore_shadow_ban: bool = False, ignore_shadow_ban: bool = False,
outlier: bool = False,
depth: Optional[int] = None,
) -> Tuple[EventBase, int]: ) -> Tuple[EventBase, int]:
""" """
Creates an event, then sends it. Creates an event, then sends it.
@ -736,10 +776,24 @@ class EventCreationHandler:
Args: Args:
requester: The requester sending the event. requester: The requester sending the event.
event_dict: An entire event. event_dict: An entire event.
prev_event_ids:
The event IDs to use as the prev events.
Should normally be left as None to automatically request them
from the database.
auth_event_ids:
The event ids to use as the auth_events for the new event.
Should normally be left as None, which will cause them to be calculated
based on the room state at the prev_events.
ratelimit: Whether to rate limit this send. ratelimit: Whether to rate limit this send.
txn_id: The transaction ID. txn_id: The transaction ID.
ignore_shadow_ban: True if shadow-banned users should be allowed to ignore_shadow_ban: True if shadow-banned users should be allowed to
send this event. send this event.
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
Returns: Returns:
The event, and its stream ordering (if deduplication happened, The event, and its stream ordering (if deduplication happened,
@ -779,7 +833,13 @@ class EventCreationHandler:
return event, event.internal_metadata.stream_ordering return event, event.internal_metadata.stream_ordering
event, context = await self.create_event( event, context = await self.create_event(
requester, event_dict, txn_id=txn_id requester,
event_dict,
txn_id=txn_id,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
outlier=outlier,
depth=depth,
) )
assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % ( assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
@ -811,6 +871,7 @@ class EventCreationHandler:
requester: Optional[Requester] = None, requester: Optional[Requester] = None,
prev_event_ids: Optional[List[str]] = None, prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None, auth_event_ids: Optional[List[str]] = None,
depth: Optional[int] = None,
) -> Tuple[EventBase, EventContext]: ) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client """Create a new event for a local client
@ -828,6 +889,10 @@ class EventCreationHandler:
Should normally be left as None, which will cause them to be calculated Should normally be left as None, which will cause them to be calculated
based on the room state at the prev_events. based on the room state at the prev_events.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
Returns: Returns:
Tuple of created event, context Tuple of created event, context
""" """
@ -851,9 +916,24 @@ class EventCreationHandler:
), "Attempting to create an event with no prev_events" ), "Attempting to create an event with no prev_events"
event = await builder.build( event = await builder.build(
prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
depth=depth,
) )
context = await self.state.compute_event_context(event)
old_state = None
# Pass on the outlier property from the builder to the event
# after it is created
if builder.internal_metadata.outlier:
event.internal_metadata.outlier = builder.internal_metadata.outlier
# Calculate the state for outliers that pass in their own `auth_event_ids`
if auth_event_ids:
old_state = await self.store.get_events_as_list(auth_event_ids)
context = await self.state.compute_event_context(event, old_state=old_state)
if requester: if requester:
context.app_service = requester.app_service context.app_service = requester.app_service
@ -1018,6 +1098,12 @@ class EventCreationHandler:
the arguments. the arguments.
""" """
# Skip push notification actions for historical messages
# because we don't want to notify people about old history back in time.
# The historical messages also do not have the proper `context.current_state_ids`
# and `state_groups` because they have `prev_events` that aren't persisted yet
# (historical messages persisted in reverse-chronological order).
if not event.internal_metadata.is_historical():
await self.action_generator.handle_push_actions_for_event(event, context) await self.action_generator.handle_push_actions_for_event(event, context)
try: try:
@ -1317,13 +1403,21 @@ class EventCreationHandler:
if prev_state_ids: if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden") raise AuthError(403, "Changing the room create event is forbidden")
# Mark any `m.historical` messages as backfilled so they don't appear
# in `/sync` and have the proper decrementing `stream_ordering` as we import
backfilled = False
if event.internal_metadata.is_historical():
backfilled = True
# Note that this returns the event that was persisted, which may not be # Note that this returns the event that was persisted, which may not be
# the same as we passed in if it was deduplicated due transaction IDs. # the same as we passed in if it was deduplicated due transaction IDs.
( (
event, event,
event_pos, event_pos,
max_stream_token, max_stream_token,
) = await self.storage.persistence.persist_event(event, context=context) ) = await self.storage.persistence.persist_event(
event, context=context, backfilled=backfilled
)
if self._ephemeral_events_enabled: if self._ephemeral_events_enabled:
# If there's an expiry timestamp on the event, schedule its expiry. # If there's an expiry timestamp on the event, schedule its expiry.

View File

@ -257,11 +257,42 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
room_id: str, room_id: str,
membership: str, membership: str,
prev_event_ids: List[str], prev_event_ids: List[str],
auth_event_ids: Optional[List[str]] = None,
txn_id: Optional[str] = None, txn_id: Optional[str] = None,
ratelimit: bool = True, ratelimit: bool = True,
content: Optional[dict] = None, content: Optional[dict] = None,
require_consent: bool = True, require_consent: bool = True,
outlier: bool = False,
) -> Tuple[str, int]: ) -> Tuple[str, int]:
"""
Internal membership update function to get an existing event or create
and persist a new event for the new membership change.
Args:
requester:
target:
room_id:
membership:
prev_event_ids: The event IDs to use as the prev events
auth_event_ids:
The event ids to use as the auth_events for the new event.
Should normally be left as None, which will cause them to be calculated
based on the room state at the prev_events.
txn_id:
ratelimit:
content:
require_consent:
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
Returns:
Tuple of event ID and stream ordering position
"""
user_id = target.to_string() user_id = target.to_string()
if content is None: if content is None:
@ -298,7 +329,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
}, },
txn_id=txn_id, txn_id=txn_id,
prev_event_ids=prev_event_ids, prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
require_consent=require_consent, require_consent=require_consent,
outlier=outlier,
) )
prev_state_ids = await context.get_prev_state_ids() prev_state_ids = await context.get_prev_state_ids()
@ -399,6 +432,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
ratelimit: bool = True, ratelimit: bool = True,
content: Optional[dict] = None, content: Optional[dict] = None,
require_consent: bool = True, require_consent: bool = True,
outlier: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
) -> Tuple[str, int]: ) -> Tuple[str, int]:
"""Update a user's membership in a room. """Update a user's membership in a room.
@ -413,6 +449,14 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
ratelimit: Whether to rate limit the request. ratelimit: Whether to rate limit the request.
content: The content of the created event. content: The content of the created event.
require_consent: Whether consent is required. require_consent: Whether consent is required.
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
prev_event_ids: The event IDs to use as the prev events
auth_event_ids:
The event ids to use as the auth_events for the new event.
Should normally be left as None, which will cause them to be calculated
based on the room state at the prev_events.
Returns: Returns:
A tuple of the new event ID and stream ID. A tuple of the new event ID and stream ID.
@ -439,6 +483,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
ratelimit=ratelimit, ratelimit=ratelimit,
content=content, content=content,
require_consent=require_consent, require_consent=require_consent,
outlier=outlier,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
) )
return result return result
@ -455,10 +502,36 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
ratelimit: bool = True, ratelimit: bool = True,
content: Optional[dict] = None, content: Optional[dict] = None,
require_consent: bool = True, require_consent: bool = True,
outlier: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
) -> Tuple[str, int]: ) -> Tuple[str, int]:
"""Helper for update_membership. """Helper for update_membership.
Assumes that the membership linearizer is already held for the room. Assumes that the membership linearizer is already held for the room.
Args:
requester:
target:
room_id:
action:
txn_id:
remote_room_hosts:
third_party_signed:
ratelimit:
content:
require_consent:
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
prev_event_ids: The event IDs to use as the prev events
auth_event_ids:
The event ids to use as the auth_events for the new event.
Should normally be left as None, which will cause them to be calculated
based on the room state at the prev_events.
Returns:
A tuple of the new event ID and stream ID.
""" """
content_specified = bool(content) content_specified = bool(content)
if content is None: if content is None:
@ -543,6 +616,21 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if block_invite: if block_invite:
raise SynapseError(403, "Invites have been disabled on this server") raise SynapseError(403, "Invites have been disabled on this server")
if prev_event_ids:
return await self._local_membership_update(
requester=requester,
target=target,
room_id=room_id,
membership=effective_membership_state,
txn_id=txn_id,
ratelimit=ratelimit,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
content=content,
require_consent=require_consent,
outlier=outlier,
)
latest_event_ids = await self.store.get_prev_events_for_room(room_id) latest_event_ids = await self.store.get_prev_events_for_room(room_id)
current_state_ids = await self.state_handler.get_current_state_ids( current_state_ids = await self.state_handler.get_current_state_ids(
@ -732,8 +820,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
txn_id=txn_id, txn_id=txn_id,
ratelimit=ratelimit, ratelimit=ratelimit,
prev_event_ids=latest_event_ids, prev_event_ids=latest_event_ids,
auth_event_ids=auth_event_ids,
content=content, content=content,
require_consent=require_consent, require_consent=require_consent,
outlier=outlier,
) )
async def transfer_room_state_on_room_upgrade( async def transfer_room_state_on_room_upgrade(

View File

@ -19,7 +19,7 @@ import re
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from urllib import parse as urlparse from urllib import parse as urlparse
from synapse.api.constants import EventTypes, Membership from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.api.errors import ( from synapse.api.errors import (
AuthError, AuthError,
Codes, Codes,
@ -266,6 +266,288 @@ class RoomSendEventRestServlet(TransactionRestServlet):
) )
class RoomBatchSendEventRestServlet(TransactionRestServlet):
"""
API endpoint which can insert a chunk of events historically back in time
next to the given `prev_event`.
`chunk_id` comes from `next_chunk_id `in the response of the batch send
endpoint and is derived from the "insertion" events added to each chunk.
It's not required for the first batch send.
`state_events_at_start` is used to define the historical state events
needed to auth the events like join events. These events will float
outside of the normal DAG as outlier's and won't be visible in the chat
history which also allows us to insert multiple chunks without having a bunch
of `@mxid joined the room` noise between each chunk.
`events` is chronological chunk/list of events you want to insert.
There is a reverse-chronological constraint on chunks so once you insert
some messages, you can only insert older ones after that.
tldr; Insert chunks from your most recent history -> oldest history.
POST /_matrix/client/unstable/org.matrix.msc2716/rooms/<roomID>/batch_send?prev_event=<eventID>&chunk_id=<chunkID>
{
"events": [ ... ],
"state_events_at_start": [ ... ]
}
"""
PATTERNS = (
re.compile(
"^/_matrix/client/unstable/org.matrix.msc2716"
"/rooms/(?P<room_id>[^/]*)/batch_send$"
),
)
def __init__(self, hs):
super().__init__(hs)
self.hs = hs
self.store = hs.get_datastore()
self.state_store = hs.get_storage().state
self.event_creation_handler = hs.get_event_creation_handler()
self.room_member_handler = hs.get_room_member_handler()
self.auth = hs.get_auth()
async def inherit_depth_from_prev_ids(self, prev_event_ids) -> int:
(
most_recent_prev_event_id,
most_recent_prev_event_depth,
) = await self.store.get_max_depth_of(prev_event_ids)
# We want to insert the historical event after the `prev_event` but before the successor event
#
# We inherit depth from the successor event instead of the `prev_event`
# because events returned from `/messages` are first sorted by `topological_ordering`
# which is just the `depth` and then tie-break with `stream_ordering`.
#
# We mark these inserted historical events as "backfilled" which gives them a
# negative `stream_ordering`. If we use the same depth as the `prev_event`,
# then our historical event will tie-break and be sorted before the `prev_event`
# when it should come after.
#
# We want to use the successor event depth so they appear after `prev_event` because
# it has a larger `depth` but before the successor event because the `stream_ordering`
# is negative before the successor event.
successor_event_ids = await self.store.get_successor_events(
[most_recent_prev_event_id]
)
# If we can't find any successor events, then it's a forward extremity of
# historical messages and we can just inherit from the previous historical
# event which we can already assume has the correct depth where we want
# to insert into.
if not successor_event_ids:
depth = most_recent_prev_event_depth
else:
(
_,
oldest_successor_depth,
) = await self.store.get_min_depth_of(successor_event_ids)
depth = oldest_successor_depth
return depth
async def on_POST(self, request, room_id):
requester = await self.auth.get_user_by_req(request, allow_guest=False)
if not requester.app_service:
raise AuthError(
403,
"Only application services can use the /batchsend endpoint",
)
body = parse_json_object_from_request(request)
assert_params_in_dict(body, ["state_events_at_start", "events"])
prev_events_from_query = parse_strings_from_args(request.args, "prev_event")
chunk_id_from_query = parse_string(request, "chunk_id", default=None)
if prev_events_from_query is None:
raise SynapseError(
400,
"prev_event query parameter is required when inserting historical messages back in time",
errcode=Codes.MISSING_PARAM,
)
# For the event we are inserting next to (`prev_events_from_query`),
# find the most recent auth events (derived from state events) that
# allowed that message to be sent. We will use that as a base
# to auth our historical messages against.
(
most_recent_prev_event_id,
_,
) = await self.store.get_max_depth_of(prev_events_from_query)
# mapping from (type, state_key) -> state_event_id
prev_state_map = await self.state_store.get_state_ids_for_event(
most_recent_prev_event_id
)
# List of state event ID's
prev_state_ids = list(prev_state_map.values())
auth_event_ids = prev_state_ids
for state_event in body["state_events_at_start"]:
assert_params_in_dict(
state_event, ["type", "origin_server_ts", "content", "sender"]
)
logger.debug(
"RoomBatchSendEventRestServlet inserting state_event=%s, auth_event_ids=%s",
state_event,
auth_event_ids,
)
event_dict = {
"type": state_event["type"],
"origin_server_ts": state_event["origin_server_ts"],
"content": state_event["content"],
"room_id": room_id,
"sender": state_event["sender"],
"state_key": state_event["state_key"],
}
# Make the state events float off on their own
fake_prev_event_id = "$" + random_string(43)
# TODO: This is pretty much the same as some other code to handle inserting state in this file
if event_dict["type"] == EventTypes.Member:
membership = event_dict["content"].get("membership", None)
event_id, _ = await self.room_member_handler.update_membership(
requester,
target=UserID.from_string(event_dict["state_key"]),
room_id=room_id,
action=membership,
content=event_dict["content"],
outlier=True,
prev_event_ids=[fake_prev_event_id],
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
# reference and also update in the event when we append later.
auth_event_ids=auth_event_ids.copy(),
)
else:
# TODO: Add some complement tests that adds state that is not member joins
# and will use this code path. Maybe we only want to support join state events
# and can get rid of this `else`?
(
event,
_,
) = await self.event_creation_handler.create_and_send_nonmember_event(
requester,
event_dict,
outlier=True,
prev_event_ids=[fake_prev_event_id],
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
# reference and also update in the event when we append later.
auth_event_ids=auth_event_ids.copy(),
)
event_id = event.event_id
auth_event_ids.append(event_id)
events_to_create = body["events"]
# If provided, connect the chunk to the last insertion point
# The chunk ID passed in comes from the chunk_id in the
# "insertion" event from the previous chunk.
if chunk_id_from_query:
last_event_in_chunk = events_to_create[-1]
last_event_in_chunk["content"][
EventContentFields.MSC2716_CHUNK_ID
] = chunk_id_from_query
# Add an "insertion" event to the start of each chunk (next to the oldest
# event in the chunk) so the next chunk can be connected to this one.
next_chunk_id = random_string(64)
insertion_event = {
"type": EventTypes.MSC2716_INSERTION,
"sender": requester.user.to_string(),
"content": {
EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id,
EventContentFields.MSC2716_HISTORICAL: True,
},
# Since the insertion event is put at the start of the chunk,
# where the oldest event is, copy the origin_server_ts from
# the first event we're inserting
"origin_server_ts": events_to_create[0]["origin_server_ts"],
}
# Prepend the insertion event to the start of the chunk
events_to_create = [insertion_event] + events_to_create
inherited_depth = await self.inherit_depth_from_prev_ids(prev_events_from_query)
event_ids = []
prev_event_ids = prev_events_from_query
events_to_persist = []
for ev in events_to_create:
assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])
# Mark all events as historical
# This has important semantics within the Synapse internals to backfill properly
ev["content"][EventContentFields.MSC2716_HISTORICAL] = True
event_dict = {
"type": ev["type"],
"origin_server_ts": ev["origin_server_ts"],
"content": ev["content"],
"room_id": room_id,
"sender": ev["sender"], # requester.user.to_string(),
"prev_events": prev_event_ids.copy(),
}
event, context = await self.event_creation_handler.create_event(
requester,
event_dict,
prev_event_ids=event_dict.get("prev_events"),
auth_event_ids=auth_event_ids,
historical=True,
depth=inherited_depth,
)
logger.debug(
"RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s, auth_event_ids=%s",
event,
prev_event_ids,
auth_event_ids,
)
assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
event.sender,
)
events_to_persist.append((event, context))
event_id = event.event_id
event_ids.append(event_id)
prev_event_ids = [event_id]
# Persist events in reverse-chronological order so they have the
# correct stream_ordering as they are backfilled (which decrements).
# Events are sorted by (topological_ordering, stream_ordering)
# where topological_ordering is just depth.
for (event, context) in reversed(events_to_persist):
ev = await self.event_creation_handler.handle_new_client_event(
requester=requester,
event=event,
context=context,
)
return 200, {
"state_events": auth_event_ids,
"events": event_ids,
"next_chunk_id": next_chunk_id,
}
def on_GET(self, request, room_id):
return 501, "Not implemented"
def on_PUT(self, request, room_id):
return self.txns.fetch_or_execute_request(
request, self.on_POST, request, room_id
)
# TODO: Needs unit testing for room ID + alias joins # TODO: Needs unit testing for room ID + alias joins
class JoinRoomAliasServlet(TransactionRestServlet): class JoinRoomAliasServlet(TransactionRestServlet):
def __init__(self, hs): def __init__(self, hs):
@ -1054,6 +1336,8 @@ class RoomSpaceSummaryRestServlet(RestServlet):
def register_servlets(hs: "HomeServer", http_server, is_worker=False): def register_servlets(hs: "HomeServer", http_server, is_worker=False):
msc2716_enabled = hs.config.experimental.msc2716_enabled
RoomStateEventRestServlet(hs).register(http_server) RoomStateEventRestServlet(hs).register(http_server)
RoomMemberListRestServlet(hs).register(http_server) RoomMemberListRestServlet(hs).register(http_server)
JoinedRoomMemberListRestServlet(hs).register(http_server) JoinedRoomMemberListRestServlet(hs).register(http_server)
@ -1061,6 +1345,8 @@ def register_servlets(hs: "HomeServer", http_server, is_worker=False):
JoinRoomAliasServlet(hs).register(http_server) JoinRoomAliasServlet(hs).register(http_server)
RoomMembershipRestServlet(hs).register(http_server) RoomMembershipRestServlet(hs).register(http_server)
RoomSendEventRestServlet(hs).register(http_server) RoomSendEventRestServlet(hs).register(http_server)
if msc2716_enabled:
RoomBatchSendEventRestServlet(hs).register(http_server)
PublicRoomListRestServlet(hs).register(http_server) PublicRoomListRestServlet(hs).register(http_server)
RoomStateRestServlet(hs).register(http_server) RoomStateRestServlet(hs).register(http_server)
RoomRedactEventRestServlet(hs).register(http_server) RoomRedactEventRestServlet(hs).register(http_server)

View File

@ -16,6 +16,7 @@ import logging
from queue import Empty, PriorityQueue from queue import Empty, PriorityQueue
from typing import Collection, Dict, Iterable, List, Set, Tuple from typing import Collection, Dict, Iterable, List, Set, Tuple
from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import StoreError from synapse.api.errors import StoreError
from synapse.events import EventBase from synapse.events import EventBase
from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.metrics.background_process_metrics import wrap_as_background_process
@ -670,8 +671,8 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
return dict(txn) return dict(txn)
async def get_max_depth_of(self, event_ids: List[str]) -> int: async def get_max_depth_of(self, event_ids: List[str]) -> Tuple[str, int]:
"""Returns the max depth of a set of event IDs """Returns the event ID and depth for the event that has the max depth from a set of event IDs
Args: Args:
event_ids: The event IDs to calculate the max depth of. event_ids: The event IDs to calculate the max depth of.
@ -680,14 +681,53 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
table="events", table="events",
column="event_id", column="event_id",
iterable=event_ids, iterable=event_ids,
retcols=("depth",), retcols=(
"event_id",
"depth",
),
desc="get_max_depth_of", desc="get_max_depth_of",
) )
if not rows: if not rows:
return 0 return None, 0
else: else:
return max(row["depth"] for row in rows) max_depth_event_id = ""
current_max_depth = 0
for row in rows:
if row["depth"] > current_max_depth:
max_depth_event_id = row["event_id"]
current_max_depth = row["depth"]
return max_depth_event_id, current_max_depth
async def get_min_depth_of(self, event_ids: List[str]) -> Tuple[str, int]:
"""Returns the event ID and depth for the event that has the min depth from a set of event IDs
Args:
event_ids: The event IDs to calculate the max depth of.
"""
rows = await self.db_pool.simple_select_many_batch(
table="events",
column="event_id",
iterable=event_ids,
retcols=(
"event_id",
"depth",
),
desc="get_min_depth_of",
)
if not rows:
return None, 0
else:
min_depth_event_id = ""
current_min_depth = MAX_DEPTH
for row in rows:
if row["depth"] < current_min_depth:
min_depth_event_id = row["event_id"]
current_min_depth = row["depth"]
return min_depth_event_id, current_min_depth
async def get_prev_events_for_room(self, room_id: str) -> List[str]: async def get_prev_events_for_room(self, room_id: str) -> List[str]:
""" """

View File

@ -863,7 +863,9 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
self.store.get_latest_event_ids_in_room(room_id) self.store.get_latest_event_ids_in_room(room_id)
) )
event = self.get_success(builder.build(prev_event_ids, None)) event = self.get_success(
builder.build(prev_event_ids=prev_event_ids, auth_event_ids=None)
)
self.get_success(self.federation_handler.on_receive_pdu(hostname, event)) self.get_success(self.federation_handler.on_receive_pdu(hostname, event))

View File

@ -224,7 +224,9 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
} }
builder = factory.for_room_version(room_version, event_dict) builder = factory.for_room_version(room_version, event_dict)
join_event = self.get_success(builder.build(prev_event_ids, None)) join_event = self.get_success(
builder.build(prev_event_ids=prev_event_ids, auth_event_ids=None)
)
self.get_success(federation.on_send_join_request(remote_server, join_event)) self.get_success(federation.on_send_join_request(remote_server, join_event))
self.replicate() self.replicate()

View File

@ -232,9 +232,14 @@ class RedactionTestCase(unittest.HomeserverTestCase):
self._base_builder = base_builder self._base_builder = base_builder
self._event_id = event_id self._event_id = event_id
async def build(self, prev_event_ids, auth_event_ids): async def build(
self,
prev_event_ids,
auth_event_ids,
depth: Optional[int] = None,
):
built_event = await self._base_builder.build( built_event = await self._base_builder.build(
prev_event_ids, auth_event_ids prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids
) )
built_event._event_id = self._event_id built_event._event_id = self._event_id
@ -251,6 +256,10 @@ class RedactionTestCase(unittest.HomeserverTestCase):
def type(self): def type(self):
return self._base_builder.type return self._base_builder.type
@property
def internal_metadata(self):
return self._base_builder.internal_metadata
event_1, context_1 = self.get_success( event_1, context_1 = self.get_success(
self.event_creation_handler.create_new_client_event( self.event_creation_handler.create_new_client_event(
EventIdManglingBuilder( EventIdManglingBuilder(