Port `EventInternalMetadata` class to Rust (#16782)
There are a couple of things we need to be careful of here: 1. The current python code does no validation when loading from the DB, so we need to be careful to ignore such errors (at least on jki.re there are some old events with internal metadata fields of the wrong type). 2. We want to be memory efficient, as we often have many hundreds of thousands of events in the cache at a time. --------- Co-authored-by: Quentin Gliech <quenting@element.io>
This commit is contained in:
parent
81b1c56288
commit
5d3850b038
|
@ -0,0 +1 @@
|
|||
Port `EventInternalMetadata` class to Rust.
|
|
@ -0,0 +1,430 @@
|
|||
/*
|
||||
* This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
*
|
||||
* Copyright (C) 2024 New Vector, Ltd
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* See the GNU Affero General Public License for more details:
|
||||
* <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
*
|
||||
* Originally licensed under the Apache License, Version 2.0:
|
||||
* <http://www.apache.org/licenses/LICENSE-2.0>.
|
||||
*
|
||||
* [This file includes modifications made by New Vector Limited]
|
||||
*
|
||||
*/
|
||||
|
||||
//! Implements the internal metadata class attached to events.
|
||||
//!
|
||||
//! The internal metadata is a bit like a `TypedDict`, in that it is stored as a
|
||||
//! JSON dict in the DB. Most events have zero, or only a few, of these keys
|
||||
//! set. Therefore, since we care more about memory size than performance here,
|
||||
//! we store these fields in a mapping.
|
||||
//!
|
||||
//! We want to store (most) of the fields as Rust objects, so we implement the
|
||||
//! mapping by using a vec of enums. This is less efficient than using
|
||||
//! attributes, but for small number of keys is actually faster than using a
|
||||
//! hash or btree map.
|
||||
|
||||
use std::{num::NonZeroI64, ops::Deref};
|
||||
|
||||
use anyhow::Context;
|
||||
use log::warn;
|
||||
use pyo3::{
|
||||
exceptions::PyAttributeError,
|
||||
pyclass, pymethods,
|
||||
types::{PyDict, PyString},
|
||||
IntoPy, PyAny, PyObject, PyResult, Python,
|
||||
};
|
||||
|
||||
/// Definitions of the various fields of the internal metadata.
|
||||
#[derive(Clone)]
|
||||
enum EventInternalMetadataData {
|
||||
OutOfBandMembership(bool),
|
||||
SendOnBehalfOf(Box<str>),
|
||||
RecheckRedaction(bool),
|
||||
SoftFailed(bool),
|
||||
ProactivelySend(bool),
|
||||
Redacted(bool),
|
||||
TxnId(Box<str>),
|
||||
TokenId(i64),
|
||||
DeviceId(Box<str>),
|
||||
}
|
||||
|
||||
impl EventInternalMetadataData {
|
||||
/// Convert the field to its name and python object.
|
||||
fn to_python_pair<'a>(&self, py: Python<'a>) -> (&'a PyString, PyObject) {
|
||||
match self {
|
||||
EventInternalMetadataData::OutOfBandMembership(o) => {
|
||||
(pyo3::intern!(py, "out_of_band_membership"), o.into_py(py))
|
||||
}
|
||||
EventInternalMetadataData::SendOnBehalfOf(o) => {
|
||||
(pyo3::intern!(py, "send_on_behalf_of"), o.into_py(py))
|
||||
}
|
||||
EventInternalMetadataData::RecheckRedaction(o) => {
|
||||
(pyo3::intern!(py, "recheck_redaction"), o.into_py(py))
|
||||
}
|
||||
EventInternalMetadataData::SoftFailed(o) => {
|
||||
(pyo3::intern!(py, "soft_failed"), o.into_py(py))
|
||||
}
|
||||
EventInternalMetadataData::ProactivelySend(o) => {
|
||||
(pyo3::intern!(py, "proactively_send"), o.into_py(py))
|
||||
}
|
||||
EventInternalMetadataData::Redacted(o) => {
|
||||
(pyo3::intern!(py, "redacted"), o.into_py(py))
|
||||
}
|
||||
EventInternalMetadataData::TxnId(o) => (pyo3::intern!(py, "txn_id"), o.into_py(py)),
|
||||
EventInternalMetadataData::TokenId(o) => (pyo3::intern!(py, "token_id"), o.into_py(py)),
|
||||
EventInternalMetadataData::DeviceId(o) => {
|
||||
(pyo3::intern!(py, "device_id"), o.into_py(py))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts from python key/values to the field.
|
||||
///
|
||||
/// Returns `None` if the key is a valid but unrecognized string.
|
||||
fn from_python_pair(key: &PyAny, value: &PyAny) -> PyResult<Option<Self>> {
|
||||
let key_str: &str = key.extract()?;
|
||||
|
||||
let e = match key_str {
|
||||
"out_of_band_membership" => EventInternalMetadataData::OutOfBandMembership(
|
||||
value
|
||||
.extract()
|
||||
.with_context(|| format!("'{key_str}' has invalid type"))?,
|
||||
),
|
||||
|
||||
"send_on_behalf_of" => EventInternalMetadataData::SendOnBehalfOf(
|
||||
value
|
||||
.extract()
|
||||
.map(String::into_boxed_str)
|
||||
.with_context(|| format!("'{key_str}' has invalid type"))?,
|
||||
),
|
||||
"recheck_redaction" => EventInternalMetadataData::RecheckRedaction(
|
||||
value
|
||||
.extract()
|
||||
.with_context(|| format!("'{key_str}' has invalid type"))?,
|
||||
),
|
||||
"soft_failed" => EventInternalMetadataData::SoftFailed(
|
||||
value
|
||||
.extract()
|
||||
.with_context(|| format!("'{key_str}' has invalid type"))?,
|
||||
),
|
||||
"proactively_send" => EventInternalMetadataData::ProactivelySend(
|
||||
value
|
||||
.extract()
|
||||
.with_context(|| format!("'{key_str}' has invalid type"))?,
|
||||
),
|
||||
"redacted" => EventInternalMetadataData::Redacted(
|
||||
value
|
||||
.extract()
|
||||
.with_context(|| format!("'{key_str}' has invalid type"))?,
|
||||
),
|
||||
"txn_id" => EventInternalMetadataData::TxnId(
|
||||
value
|
||||
.extract()
|
||||
.map(String::into_boxed_str)
|
||||
.with_context(|| format!("'{key_str}' has invalid type"))?,
|
||||
),
|
||||
"token_id" => EventInternalMetadataData::TokenId(
|
||||
value
|
||||
.extract()
|
||||
.with_context(|| format!("'{key_str}' has invalid type"))?,
|
||||
),
|
||||
"device_id" => EventInternalMetadataData::DeviceId(
|
||||
value
|
||||
.extract()
|
||||
.map(String::into_boxed_str)
|
||||
.with_context(|| format!("'{key_str}' has invalid type"))?,
|
||||
),
|
||||
_ => return Ok(None),
|
||||
};
|
||||
|
||||
Ok(Some(e))
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper macro to find the given field in internal metadata, returning None if
|
||||
/// not found.
|
||||
macro_rules! get_property_opt {
|
||||
($self:expr, $name:ident) => {
|
||||
$self.data.iter().find_map(|entry| {
|
||||
if let EventInternalMetadataData::$name(data) = entry {
|
||||
Some(data)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
};
|
||||
}
|
||||
|
||||
/// Helper macro to find the given field in internal metadata, raising an
|
||||
/// attribute error if not found.
|
||||
macro_rules! get_property {
|
||||
($self:expr, $name:ident) => {
|
||||
get_property_opt!($self, $name).ok_or_else(|| {
|
||||
PyAttributeError::new_err(format!(
|
||||
"'EventInternalMetadata' has no attribute '{}'",
|
||||
stringify!($name),
|
||||
))
|
||||
})
|
||||
};
|
||||
}
|
||||
|
||||
/// Helper macro to set the give field.
|
||||
macro_rules! set_property {
|
||||
($self:expr, $name:ident, $obj:expr) => {
|
||||
for entry in &mut $self.data {
|
||||
if let EventInternalMetadataData::$name(data) = entry {
|
||||
*data = $obj;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
$self.data.push(EventInternalMetadataData::$name($obj))
|
||||
};
|
||||
}
|
||||
|
||||
#[pyclass]
|
||||
#[derive(Clone)]
|
||||
pub struct EventInternalMetadata {
|
||||
/// The fields of internal metadata. This functions as a mapping.
|
||||
data: Vec<EventInternalMetadataData>,
|
||||
|
||||
/// The stream ordering of this event. None, until it has been persisted.
|
||||
#[pyo3(get, set)]
|
||||
stream_ordering: Option<NonZeroI64>,
|
||||
|
||||
/// whether this event is an outlier (ie, whether we have the state at that
|
||||
/// point in the DAG)
|
||||
#[pyo3(get, set)]
|
||||
outlier: bool,
|
||||
}
|
||||
|
||||
#[pymethods]
|
||||
impl EventInternalMetadata {
|
||||
#[new]
|
||||
fn new(dict: &PyDict) -> PyResult<Self> {
|
||||
let mut data = Vec::with_capacity(dict.len());
|
||||
|
||||
for (key, value) in dict.iter() {
|
||||
match EventInternalMetadataData::from_python_pair(key, value) {
|
||||
Ok(Some(entry)) => data.push(entry),
|
||||
Ok(None) => {}
|
||||
Err(err) => {
|
||||
warn!("Ignoring internal metadata field '{key}', as failed to convert to Rust due to {err}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
data.shrink_to_fit();
|
||||
|
||||
Ok(EventInternalMetadata {
|
||||
data,
|
||||
stream_ordering: None,
|
||||
outlier: false,
|
||||
})
|
||||
}
|
||||
|
||||
fn copy(&self) -> Self {
|
||||
self.clone()
|
||||
}
|
||||
|
||||
fn get_dict(&self, py: Python<'_>) -> PyResult<PyObject> {
|
||||
let dict = PyDict::new(py);
|
||||
|
||||
for entry in &self.data {
|
||||
let (key, value) = entry.to_python_pair(py);
|
||||
dict.set_item(key, value)?;
|
||||
}
|
||||
|
||||
Ok(dict.into())
|
||||
}
|
||||
|
||||
fn is_outlier(&self) -> bool {
|
||||
self.outlier
|
||||
}
|
||||
|
||||
/// Whether this event is an out-of-band membership.
|
||||
///
|
||||
/// OOB memberships are a special case of outlier events: they are
|
||||
/// membership events for federated rooms that we aren't full members of.
|
||||
/// Examples include invites received over federation, and rejections for
|
||||
/// such invites.
|
||||
///
|
||||
/// The concept of an OOB membership is needed because these events need to
|
||||
/// be processed as if they're new regular events (e.g. updating membership
|
||||
/// state in the database, relaying to clients via /sync, etc) despite being
|
||||
/// outliers.
|
||||
///
|
||||
/// See also
|
||||
/// https://element-hq.github.io/synapse/develop/development/room-dag-concepts.html#out-of-band-membership-events.
|
||||
///
|
||||
/// (Added in synapse 0.99.0, so may be unreliable for events received
|
||||
/// before that)
|
||||
fn is_out_of_band_membership(&self) -> bool {
|
||||
get_property_opt!(self, OutOfBandMembership)
|
||||
.copied()
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Whether this server should send the event on behalf of another server.
|
||||
/// This is used by the federation "send_join" API to forward the initial
|
||||
/// join event for a server in the room.
|
||||
///
|
||||
/// returns a str with the name of the server this event is sent on behalf
|
||||
/// of.
|
||||
fn get_send_on_behalf_of(&self) -> Option<&str> {
|
||||
let s = get_property_opt!(self, SendOnBehalfOf);
|
||||
s.map(|a| a.deref())
|
||||
}
|
||||
|
||||
/// Whether the redaction event needs to be rechecked when fetching
|
||||
/// from the database.
|
||||
///
|
||||
/// Starting in room v3 redaction events are accepted up front, and later
|
||||
/// checked to see if the redacter and redactee's domains match.
|
||||
///
|
||||
/// If the sender of the redaction event is allowed to redact any event
|
||||
/// due to auth rules, then this will always return false.
|
||||
fn need_to_check_redaction(&self) -> bool {
|
||||
get_property_opt!(self, RecheckRedaction)
|
||||
.copied()
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Whether the event has been soft failed.
|
||||
///
|
||||
/// Soft failed events should be handled as usual, except:
|
||||
/// 1. They should not go down sync or event streams, or generally sent to
|
||||
/// clients.
|
||||
/// 2. They should not be added to the forward extremities (and therefore
|
||||
/// not to current state).
|
||||
fn is_soft_failed(&self) -> bool {
|
||||
get_property_opt!(self, SoftFailed)
|
||||
.copied()
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Whether the event, if ours, should be sent to other clients and servers.
|
||||
///
|
||||
/// This is used for sending dummy events internally. Servers and clients
|
||||
/// can still explicitly fetch the event.
|
||||
fn should_proactively_send(&self) -> bool {
|
||||
get_property_opt!(self, ProactivelySend)
|
||||
.copied()
|
||||
.unwrap_or(true)
|
||||
}
|
||||
|
||||
/// Whether the event has been redacted.
|
||||
///
|
||||
/// This is used for efficiently checking whether an event has been marked
|
||||
/// as redacted without needing to make another database call.
|
||||
fn is_redacted(&self) -> bool {
|
||||
get_property_opt!(self, Redacted).copied().unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Whether this event can trigger a push notification
|
||||
fn is_notifiable(&self) -> bool {
|
||||
!self.outlier || self.is_out_of_band_membership()
|
||||
}
|
||||
|
||||
// ** The following are the getters and setters of the various properties **
|
||||
|
||||
#[getter]
|
||||
fn get_out_of_band_membership(&self) -> PyResult<bool> {
|
||||
let bool = get_property!(self, OutOfBandMembership)?;
|
||||
Ok(*bool)
|
||||
}
|
||||
#[setter]
|
||||
fn set_out_of_band_membership(&mut self, obj: bool) {
|
||||
set_property!(self, OutOfBandMembership, obj);
|
||||
}
|
||||
|
||||
#[getter(send_on_behalf_of)]
|
||||
fn getter_send_on_behalf_of(&self) -> PyResult<&str> {
|
||||
let s = get_property!(self, SendOnBehalfOf)?;
|
||||
Ok(s)
|
||||
}
|
||||
#[setter]
|
||||
fn set_send_on_behalf_of(&mut self, obj: String) {
|
||||
set_property!(self, SendOnBehalfOf, obj.into_boxed_str());
|
||||
}
|
||||
|
||||
#[getter]
|
||||
fn get_recheck_redaction(&self) -> PyResult<bool> {
|
||||
let bool = get_property!(self, RecheckRedaction)?;
|
||||
Ok(*bool)
|
||||
}
|
||||
#[setter]
|
||||
fn set_recheck_redaction(&mut self, obj: bool) {
|
||||
set_property!(self, RecheckRedaction, obj);
|
||||
}
|
||||
|
||||
#[getter]
|
||||
fn get_soft_failed(&self) -> PyResult<bool> {
|
||||
let bool = get_property!(self, SoftFailed)?;
|
||||
Ok(*bool)
|
||||
}
|
||||
#[setter]
|
||||
fn set_soft_failed(&mut self, obj: bool) {
|
||||
set_property!(self, SoftFailed, obj);
|
||||
}
|
||||
|
||||
#[getter]
|
||||
fn get_proactively_send(&self) -> PyResult<bool> {
|
||||
let bool = get_property!(self, ProactivelySend)?;
|
||||
Ok(*bool)
|
||||
}
|
||||
#[setter]
|
||||
fn set_proactively_send(&mut self, obj: bool) {
|
||||
set_property!(self, ProactivelySend, obj);
|
||||
}
|
||||
|
||||
#[getter]
|
||||
fn get_redacted(&self) -> PyResult<bool> {
|
||||
let bool = get_property!(self, Redacted)?;
|
||||
Ok(*bool)
|
||||
}
|
||||
#[setter]
|
||||
fn set_redacted(&mut self, obj: bool) {
|
||||
set_property!(self, Redacted, obj);
|
||||
}
|
||||
|
||||
/// The transaction ID, if it was set when the event was created.
|
||||
#[getter]
|
||||
fn get_txn_id(&self) -> PyResult<&str> {
|
||||
let s = get_property!(self, TxnId)?;
|
||||
Ok(s)
|
||||
}
|
||||
#[setter]
|
||||
fn set_txn_id(&mut self, obj: String) {
|
||||
set_property!(self, TxnId, obj.into_boxed_str());
|
||||
}
|
||||
|
||||
/// The access token ID of the user who sent this event, if any.
|
||||
#[getter]
|
||||
fn get_token_id(&self) -> PyResult<i64> {
|
||||
let r = get_property!(self, TokenId)?;
|
||||
Ok(*r)
|
||||
}
|
||||
#[setter]
|
||||
fn set_token_id(&mut self, obj: i64) {
|
||||
set_property!(self, TokenId, obj);
|
||||
}
|
||||
|
||||
/// The device ID of the user who sent this event, if any.
|
||||
#[getter]
|
||||
fn get_device_id(&self) -> PyResult<&str> {
|
||||
let s = get_property!(self, DeviceId)?;
|
||||
Ok(s)
|
||||
}
|
||||
#[setter]
|
||||
fn set_device_id(&mut self, obj: String) {
|
||||
set_property!(self, DeviceId, obj.into_boxed_str());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
*
|
||||
* Copyright (C) 2024 New Vector, Ltd
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* See the GNU Affero General Public License for more details:
|
||||
* <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
*
|
||||
* Originally licensed under the Apache License, Version 2.0:
|
||||
* <http://www.apache.org/licenses/LICENSE-2.0>.
|
||||
*
|
||||
* [This file includes modifications made by New Vector Limited]
|
||||
*
|
||||
*/
|
||||
|
||||
//! Classes for representing Events.
|
||||
|
||||
use pyo3::{types::PyModule, PyResult, Python};
|
||||
|
||||
mod internal_metadata;
|
||||
|
||||
/// Called when registering modules with python.
|
||||
pub fn register_module(py: Python<'_>, m: &PyModule) -> PyResult<()> {
|
||||
let child_module = PyModule::new(py, "events")?;
|
||||
child_module.add_class::<internal_metadata::EventInternalMetadata>()?;
|
||||
|
||||
m.add_submodule(child_module)?;
|
||||
|
||||
// We need to manually add the module to sys.modules to make `from
|
||||
// synapse.synapse_rust import events` work.
|
||||
py.import("sys")?
|
||||
.getattr("modules")?
|
||||
.set_item("synapse.synapse_rust.events", child_module)?;
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -3,6 +3,7 @@ use pyo3::prelude::*;
|
|||
use pyo3_log::ResetHandle;
|
||||
|
||||
pub mod acl;
|
||||
pub mod events;
|
||||
pub mod push;
|
||||
|
||||
lazy_static! {
|
||||
|
@ -41,6 +42,7 @@ fn synapse_rust(py: Python<'_>, m: &PyModule) -> PyResult<()> {
|
|||
|
||||
acl::register_module(py, m)?;
|
||||
push::register_module(py, m)?;
|
||||
events::register_module(py, m)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -42,6 +42,7 @@ from unpaddedbase64 import encode_base64
|
|||
|
||||
from synapse.api.constants import RelationTypes
|
||||
from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
|
||||
from synapse.synapse_rust.events import EventInternalMetadata
|
||||
from synapse.types import JsonDict, StrCollection
|
||||
from synapse.util.caches import intern_dict
|
||||
from synapse.util.frozenutils import freeze
|
||||
|
@ -74,7 +75,7 @@ T = TypeVar("T")
|
|||
#
|
||||
# Note that DictProperty/DefaultDictProperty cannot actually be used with
|
||||
# EventBuilder as it lacks a _dict property.
|
||||
_DictPropertyInstance = Union["_EventInternalMetadata", "EventBase", "EventBuilder"]
|
||||
_DictPropertyInstance = Union["EventBase", "EventBuilder"]
|
||||
|
||||
|
||||
class DictProperty(Generic[T]):
|
||||
|
@ -111,7 +112,7 @@ class DictProperty(Generic[T]):
|
|||
if instance is None:
|
||||
return self
|
||||
try:
|
||||
assert isinstance(instance, (EventBase, _EventInternalMetadata))
|
||||
assert isinstance(instance, EventBase)
|
||||
return instance._dict[self.key]
|
||||
except KeyError as e1:
|
||||
# We want this to look like a regular attribute error (mostly so that
|
||||
|
@ -127,11 +128,11 @@ class DictProperty(Generic[T]):
|
|||
) from e1.__context__
|
||||
|
||||
def __set__(self, instance: _DictPropertyInstance, v: T) -> None:
|
||||
assert isinstance(instance, (EventBase, _EventInternalMetadata))
|
||||
assert isinstance(instance, EventBase)
|
||||
instance._dict[self.key] = v
|
||||
|
||||
def __delete__(self, instance: _DictPropertyInstance) -> None:
|
||||
assert isinstance(instance, (EventBase, _EventInternalMetadata))
|
||||
assert isinstance(instance, EventBase)
|
||||
try:
|
||||
del instance._dict[self.key]
|
||||
except KeyError as e1:
|
||||
|
@ -176,118 +177,10 @@ class DefaultDictProperty(DictProperty, Generic[T]):
|
|||
) -> Union[T, "DefaultDictProperty"]:
|
||||
if instance is None:
|
||||
return self
|
||||
assert isinstance(instance, (EventBase, _EventInternalMetadata))
|
||||
assert isinstance(instance, EventBase)
|
||||
return instance._dict.get(self.key, self.default)
|
||||
|
||||
|
||||
class _EventInternalMetadata:
|
||||
__slots__ = ["_dict", "stream_ordering", "outlier"]
|
||||
|
||||
def __init__(self, internal_metadata_dict: JsonDict):
|
||||
# we have to copy the dict, because it turns out that the same dict is
|
||||
# reused. TODO: fix that
|
||||
self._dict = dict(internal_metadata_dict)
|
||||
|
||||
# the stream ordering of this event. None, until it has been persisted.
|
||||
self.stream_ordering: Optional[int] = None
|
||||
|
||||
# whether this event is an outlier (ie, whether we have the state at that point
|
||||
# in the DAG)
|
||||
self.outlier = False
|
||||
|
||||
out_of_band_membership: DictProperty[bool] = DictProperty("out_of_band_membership")
|
||||
send_on_behalf_of: DictProperty[str] = DictProperty("send_on_behalf_of")
|
||||
recheck_redaction: DictProperty[bool] = DictProperty("recheck_redaction")
|
||||
soft_failed: DictProperty[bool] = DictProperty("soft_failed")
|
||||
proactively_send: DictProperty[bool] = DictProperty("proactively_send")
|
||||
redacted: DictProperty[bool] = DictProperty("redacted")
|
||||
|
||||
txn_id: DictProperty[str] = DictProperty("txn_id")
|
||||
"""The transaction ID, if it was set when the event was created."""
|
||||
|
||||
token_id: DictProperty[int] = DictProperty("token_id")
|
||||
"""The access token ID of the user who sent this event, if any."""
|
||||
|
||||
device_id: DictProperty[str] = DictProperty("device_id")
|
||||
"""The device ID of the user who sent this event, if any."""
|
||||
|
||||
def get_dict(self) -> JsonDict:
|
||||
return dict(self._dict)
|
||||
|
||||
def is_outlier(self) -> bool:
|
||||
return self.outlier
|
||||
|
||||
def is_out_of_band_membership(self) -> bool:
|
||||
"""Whether this event is an out-of-band membership.
|
||||
|
||||
OOB memberships are a special case of outlier events: they are membership events
|
||||
for federated rooms that we aren't full members of. Examples include invites
|
||||
received over federation, and rejections for such invites.
|
||||
|
||||
The concept of an OOB membership is needed because these events need to be
|
||||
processed as if they're new regular events (e.g. updating membership state in
|
||||
the database, relaying to clients via /sync, etc) despite being outliers.
|
||||
|
||||
See also https://element-hq.github.io/synapse/develop/development/room-dag-concepts.html#out-of-band-membership-events.
|
||||
|
||||
(Added in synapse 0.99.0, so may be unreliable for events received before that)
|
||||
"""
|
||||
return self._dict.get("out_of_band_membership", False)
|
||||
|
||||
def get_send_on_behalf_of(self) -> Optional[str]:
|
||||
"""Whether this server should send the event on behalf of another server.
|
||||
This is used by the federation "send_join" API to forward the initial join
|
||||
event for a server in the room.
|
||||
|
||||
returns a str with the name of the server this event is sent on behalf of.
|
||||
"""
|
||||
return self._dict.get("send_on_behalf_of")
|
||||
|
||||
def need_to_check_redaction(self) -> bool:
|
||||
"""Whether the redaction event needs to be rechecked when fetching
|
||||
from the database.
|
||||
|
||||
Starting in room v3 redaction events are accepted up front, and later
|
||||
checked to see if the redacter and redactee's domains match.
|
||||
|
||||
If the sender of the redaction event is allowed to redact any event
|
||||
due to auth rules, then this will always return false.
|
||||
"""
|
||||
return self._dict.get("recheck_redaction", False)
|
||||
|
||||
def is_soft_failed(self) -> bool:
|
||||
"""Whether the event has been soft failed.
|
||||
|
||||
Soft failed events should be handled as usual, except:
|
||||
1. They should not go down sync or event streams, or generally
|
||||
sent to clients.
|
||||
2. They should not be added to the forward extremities (and
|
||||
therefore not to current state).
|
||||
"""
|
||||
return self._dict.get("soft_failed", False)
|
||||
|
||||
def should_proactively_send(self) -> bool:
|
||||
"""Whether the event, if ours, should be sent to other clients and
|
||||
servers.
|
||||
|
||||
This is used for sending dummy events internally. Servers and clients
|
||||
can still explicitly fetch the event.
|
||||
"""
|
||||
return self._dict.get("proactively_send", True)
|
||||
|
||||
def is_redacted(self) -> bool:
|
||||
"""Whether the event has been redacted.
|
||||
|
||||
This is used for efficiently checking whether an event has been
|
||||
marked as redacted without needing to make another database call.
|
||||
"""
|
||||
return self._dict.get("redacted", False)
|
||||
|
||||
def is_notifiable(self) -> bool:
|
||||
"""Whether this event can trigger a push notification"""
|
||||
return not self.is_outlier() or self.is_out_of_band_membership()
|
||||
|
||||
|
||||
class EventBase(metaclass=abc.ABCMeta):
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
|
@ -313,7 +206,7 @@ class EventBase(metaclass=abc.ABCMeta):
|
|||
|
||||
self._dict = event_dict
|
||||
|
||||
self.internal_metadata = _EventInternalMetadata(internal_metadata_dict)
|
||||
self.internal_metadata = EventInternalMetadata(internal_metadata_dict)
|
||||
|
||||
depth: DictProperty[int] = DictProperty("depth")
|
||||
content: DictProperty[JsonDict] = DictProperty("content")
|
||||
|
|
|
@ -31,9 +31,10 @@ from synapse.api.room_versions import (
|
|||
)
|
||||
from synapse.crypto.event_signing import add_hashes_and_signatures
|
||||
from synapse.event_auth import auth_types_for_event
|
||||
from synapse.events import EventBase, _EventInternalMetadata, make_event_from_dict
|
||||
from synapse.events import EventBase, make_event_from_dict
|
||||
from synapse.state import StateHandler
|
||||
from synapse.storage.databases.main import DataStore
|
||||
from synapse.synapse_rust.events import EventInternalMetadata
|
||||
from synapse.types import EventID, JsonDict, StrCollection
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util import Clock
|
||||
|
@ -93,8 +94,8 @@ class EventBuilder:
|
|||
_redacts: Optional[str] = None
|
||||
_origin_server_ts: Optional[int] = None
|
||||
|
||||
internal_metadata: _EventInternalMetadata = attr.Factory(
|
||||
lambda: _EventInternalMetadata({})
|
||||
internal_metadata: EventInternalMetadata = attr.Factory(
|
||||
lambda: EventInternalMetadata({})
|
||||
)
|
||||
|
||||
@property
|
||||
|
|
|
@ -1155,7 +1155,7 @@ class FederationClient(FederationBase):
|
|||
# NB: We *need* to copy to ensure that we don't have multiple
|
||||
# references being passed on, as that causes... issues.
|
||||
for s in signed_state:
|
||||
s.internal_metadata = copy.deepcopy(s.internal_metadata)
|
||||
s.internal_metadata = s.internal_metadata.copy()
|
||||
|
||||
# double-check that the auth chain doesn't include a different create event
|
||||
auth_chain_create_events = [
|
||||
|
|
|
@ -1496,7 +1496,7 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
room_version_id=row[5],
|
||||
rejected_reason=row[6],
|
||||
redactions=[],
|
||||
outlier=row[7],
|
||||
outlier=bool(row[7]), # This is an int in SQLite3
|
||||
)
|
||||
|
||||
# check for redactions
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2024 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from synapse.types import JsonDict
|
||||
|
||||
class EventInternalMetadata:
|
||||
def __init__(self, internal_metadata_dict: JsonDict): ...
|
||||
|
||||
stream_ordering: Optional[int]
|
||||
"""the stream ordering of this event. None, until it has been persisted."""
|
||||
|
||||
outlier: bool
|
||||
"""whether this event is an outlier (ie, whether we have the state at that
|
||||
point in the DAG)"""
|
||||
|
||||
out_of_band_membership: bool
|
||||
send_on_behalf_of: str
|
||||
recheck_redaction: bool
|
||||
soft_failed: bool
|
||||
proactively_send: bool
|
||||
redacted: bool
|
||||
|
||||
txn_id: str
|
||||
"""The transaction ID, if it was set when the event was created."""
|
||||
token_id: int
|
||||
"""The access token ID of the user who sent this event, if any."""
|
||||
device_id: str
|
||||
"""The device ID of the user who sent this event, if any."""
|
||||
|
||||
def get_dict(self) -> JsonDict: ...
|
||||
def is_outlier(self) -> bool: ...
|
||||
def copy(self) -> "EventInternalMetadata": ...
|
||||
def is_out_of_band_membership(self) -> bool:
|
||||
"""Whether this event is an out-of-band membership.
|
||||
|
||||
OOB memberships are a special case of outlier events: they are membership events
|
||||
for federated rooms that we aren't full members of. Examples include invites
|
||||
received over federation, and rejections for such invites.
|
||||
|
||||
The concept of an OOB membership is needed because these events need to be
|
||||
processed as if they're new regular events (e.g. updating membership state in
|
||||
the database, relaying to clients via /sync, etc) despite being outliers.
|
||||
|
||||
See also https://element-hq.github.io/synapse/develop/development/room-dag-concepts.html#out-of-band-membership-events.
|
||||
|
||||
(Added in synapse 0.99.0, so may be unreliable for events received before that)
|
||||
"""
|
||||
...
|
||||
def get_send_on_behalf_of(self) -> Optional[str]:
|
||||
"""Whether this server should send the event on behalf of another server.
|
||||
This is used by the federation "send_join" API to forward the initial join
|
||||
event for a server in the room.
|
||||
|
||||
returns a str with the name of the server this event is sent on behalf of.
|
||||
"""
|
||||
...
|
||||
def need_to_check_redaction(self) -> bool:
|
||||
"""Whether the redaction event needs to be rechecked when fetching
|
||||
from the database.
|
||||
|
||||
Starting in room v3 redaction events are accepted up front, and later
|
||||
checked to see if the redacter and redactee's domains match.
|
||||
|
||||
If the sender of the redaction event is allowed to redact any event
|
||||
due to auth rules, then this will always return false.
|
||||
"""
|
||||
...
|
||||
def is_soft_failed(self) -> bool:
|
||||
"""Whether the event has been soft failed.
|
||||
|
||||
Soft failed events should be handled as usual, except:
|
||||
1. They should not go down sync or event streams, or generally
|
||||
sent to clients.
|
||||
2. They should not be added to the forward extremities (and
|
||||
therefore not to current state).
|
||||
"""
|
||||
...
|
||||
def should_proactively_send(self) -> bool:
|
||||
"""Whether the event, if ours, should be sent to other clients and
|
||||
servers.
|
||||
|
||||
This is used for sending dummy events internally. Servers and clients
|
||||
can still explicitly fetch the event.
|
||||
"""
|
||||
...
|
||||
def is_redacted(self) -> bool:
|
||||
"""Whether the event has been redacted.
|
||||
|
||||
This is used for efficiently checking whether an event has been
|
||||
marked as redacted without needing to make another database call.
|
||||
"""
|
||||
...
|
||||
def is_notifiable(self) -> bool:
|
||||
"""Whether this event can trigger a push notification"""
|
||||
...
|
|
@ -324,7 +324,7 @@ class DatabaseOutageTestCase(unittest.HomeserverTestCase):
|
|||
)
|
||||
|
||||
self.event_ids: List[str] = []
|
||||
for idx in range(20):
|
||||
for idx in range(1, 21): # Stream ordering starts at 1.
|
||||
event_json = {
|
||||
"type": f"test {idx}",
|
||||
"room_id": self.room_id,
|
||||
|
|
|
@ -44,12 +44,13 @@ from synapse.api.room_versions import (
|
|||
EventFormatVersions,
|
||||
RoomVersion,
|
||||
)
|
||||
from synapse.events import EventBase, _EventInternalMetadata
|
||||
from synapse.events import EventBase
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client import login, room
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.database import LoggingTransaction
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.synapse_rust.events import EventInternalMetadata
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import Clock, json_encoder
|
||||
|
||||
|
@ -1209,7 +1210,7 @@ class FakeEvent:
|
|||
type = "foo"
|
||||
state_key = "foo"
|
||||
|
||||
internal_metadata = _EventInternalMetadata({})
|
||||
internal_metadata = EventInternalMetadata({})
|
||||
|
||||
def auth_event_ids(self) -> List[str]:
|
||||
return self.auth_events
|
||||
|
|
|
@ -25,9 +25,10 @@ from twisted.test.proto_helpers import MemoryReactor
|
|||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.room_versions import RoomVersions
|
||||
from synapse.events import EventBase, _EventInternalMetadata
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.builder import EventBuilder
|
||||
from synapse.server import HomeServer
|
||||
from synapse.synapse_rust.events import EventInternalMetadata
|
||||
from synapse.types import JsonDict, RoomID, UserID
|
||||
from synapse.util import Clock
|
||||
|
||||
|
@ -268,7 +269,7 @@ class RedactionTestCase(unittest.HomeserverTestCase):
|
|||
return self._base_builder.type
|
||||
|
||||
@property
|
||||
def internal_metadata(self) -> _EventInternalMetadata:
|
||||
def internal_metadata(self) -> EventInternalMetadata:
|
||||
return self._base_builder.internal_metadata
|
||||
|
||||
event_1, unpersisted_context_1 = self.get_success(
|
||||
|
|
Loading…
Reference in New Issue