Fix providing a `RoomStreamToken` instance to `_notify_app_services_ephemeral` (#11137)
Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
This commit is contained in:
parent
7537201840
commit
c9c3aea9b1
|
@ -0,0 +1 @@
|
||||||
|
Remove and document unnecessary `RoomStreamToken` checks in application service ephemeral event code.
|
|
@ -182,7 +182,7 @@ class ApplicationServicesHandler:
|
||||||
def notify_interested_services_ephemeral(
|
def notify_interested_services_ephemeral(
|
||||||
self,
|
self,
|
||||||
stream_key: str,
|
stream_key: str,
|
||||||
new_token: Optional[int],
|
new_token: Union[int, RoomStreamToken],
|
||||||
users: Optional[Collection[Union[str, UserID]]] = None,
|
users: Optional[Collection[Union[str, UserID]]] = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
|
@ -203,7 +203,7 @@ class ApplicationServicesHandler:
|
||||||
Appservices will only receive ephemeral events that fall within their
|
Appservices will only receive ephemeral events that fall within their
|
||||||
registered user and room namespaces.
|
registered user and room namespaces.
|
||||||
|
|
||||||
new_token: The latest stream token.
|
new_token: The stream token of the event.
|
||||||
users: The users that should be informed of the new event, if any.
|
users: The users that should be informed of the new event, if any.
|
||||||
"""
|
"""
|
||||||
if not self.notify_appservices:
|
if not self.notify_appservices:
|
||||||
|
@ -212,6 +212,19 @@ class ApplicationServicesHandler:
|
||||||
if stream_key not in ("typing_key", "receipt_key", "presence_key"):
|
if stream_key not in ("typing_key", "receipt_key", "presence_key"):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Assert that new_token is an integer (and not a RoomStreamToken).
|
||||||
|
# All of the supported streams that this function handles use an
|
||||||
|
# integer to track progress (rather than a RoomStreamToken - a
|
||||||
|
# vector clock implementation) as they don't support multiple
|
||||||
|
# stream writers.
|
||||||
|
#
|
||||||
|
# As a result, we simply assert that new_token is an integer.
|
||||||
|
# If we do end up needing to pass a RoomStreamToken down here
|
||||||
|
# in the future, using RoomStreamToken.stream (the minimum stream
|
||||||
|
# position) to convert to an ascending integer value should work.
|
||||||
|
# Additional context: https://github.com/matrix-org/synapse/pull/11137
|
||||||
|
assert isinstance(new_token, int)
|
||||||
|
|
||||||
services = [
|
services = [
|
||||||
service
|
service
|
||||||
for service in self.store.get_app_services()
|
for service in self.store.get_app_services()
|
||||||
|
@ -231,14 +244,13 @@ class ApplicationServicesHandler:
|
||||||
self,
|
self,
|
||||||
services: List[ApplicationService],
|
services: List[ApplicationService],
|
||||||
stream_key: str,
|
stream_key: str,
|
||||||
new_token: Optional[int],
|
new_token: int,
|
||||||
users: Collection[Union[str, UserID]],
|
users: Collection[Union[str, UserID]],
|
||||||
) -> None:
|
) -> None:
|
||||||
logger.debug("Checking interested services for %s" % (stream_key))
|
logger.debug("Checking interested services for %s" % (stream_key))
|
||||||
with Measure(self.clock, "notify_interested_services_ephemeral"):
|
with Measure(self.clock, "notify_interested_services_ephemeral"):
|
||||||
for service in services:
|
for service in services:
|
||||||
# Only handle typing if we have the latest token
|
if stream_key == "typing_key":
|
||||||
if stream_key == "typing_key" and new_token is not None:
|
|
||||||
# Note that we don't persist the token (via set_type_stream_id_for_appservice)
|
# Note that we don't persist the token (via set_type_stream_id_for_appservice)
|
||||||
# for typing_key due to performance reasons and due to their highly
|
# for typing_key due to performance reasons and due to their highly
|
||||||
# ephemeral nature.
|
# ephemeral nature.
|
||||||
|
|
|
@ -383,29 +383,6 @@ class Notifier:
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Error notifying application services of event")
|
logger.exception("Error notifying application services of event")
|
||||||
|
|
||||||
def _notify_app_services_ephemeral(
|
|
||||||
self,
|
|
||||||
stream_key: str,
|
|
||||||
new_token: Union[int, RoomStreamToken],
|
|
||||||
users: Optional[Collection[Union[str, UserID]]] = None,
|
|
||||||
) -> None:
|
|
||||||
"""Notify application services of ephemeral event activity.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
stream_key: The stream the event came from.
|
|
||||||
new_token: The value of the new stream token.
|
|
||||||
users: The users that should be informed of the new event, if any.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
stream_token = None
|
|
||||||
if isinstance(new_token, int):
|
|
||||||
stream_token = new_token
|
|
||||||
self.appservice_handler.notify_interested_services_ephemeral(
|
|
||||||
stream_key, stream_token, users or []
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
logger.exception("Error notifying application services of event")
|
|
||||||
|
|
||||||
def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
|
def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
|
||||||
try:
|
try:
|
||||||
self._pusher_pool.on_new_notifications(max_room_stream_token)
|
self._pusher_pool.on_new_notifications(max_room_stream_token)
|
||||||
|
@ -467,12 +444,15 @@ class Notifier:
|
||||||
|
|
||||||
self.notify_replication()
|
self.notify_replication()
|
||||||
|
|
||||||
# Notify appservices
|
# Notify appservices.
|
||||||
self._notify_app_services_ephemeral(
|
try:
|
||||||
stream_key,
|
self.appservice_handler.notify_interested_services_ephemeral(
|
||||||
new_token,
|
stream_key,
|
||||||
users,
|
new_token,
|
||||||
)
|
users,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Error notifying application services of event")
|
||||||
|
|
||||||
def on_new_replication_data(self) -> None:
|
def on_new_replication_data(self) -> None:
|
||||||
"""Used to inform replication listeners that something has happened
|
"""Used to inform replication listeners that something has happened
|
||||||
|
|
|
@ -427,7 +427,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||||
user_ids: the users who were signed
|
user_ids: the users who were signed
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
THe new stream ID.
|
The new stream ID.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
async with self._device_list_id_gen.get_next() as stream_id:
|
async with self._device_list_id_gen.get_next() as stream_id:
|
||||||
|
@ -1322,7 +1322,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||||
|
|
||||||
async def add_device_change_to_streams(
|
async def add_device_change_to_streams(
|
||||||
self, user_id: str, device_ids: Collection[str], hosts: List[str]
|
self, user_id: str, device_ids: Collection[str], hosts: List[str]
|
||||||
):
|
) -> int:
|
||||||
"""Persist that a user's devices have been updated, and which hosts
|
"""Persist that a user's devices have been updated, and which hosts
|
||||||
(if any) should be poked.
|
(if any) should be poked.
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -92,7 +92,7 @@ class PresenceStore(PresenceBackgroundUpdateStore):
|
||||||
prefilled_cache=presence_cache_prefill,
|
prefilled_cache=presence_cache_prefill,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def update_presence(self, presence_states):
|
async def update_presence(self, presence_states) -> Tuple[int, int]:
|
||||||
assert self._can_persist_presence
|
assert self._can_persist_presence
|
||||||
|
|
||||||
stream_ordering_manager = self._presence_id_gen.get_next_mult(
|
stream_ordering_manager = self._presence_id_gen.get_next_mult(
|
||||||
|
|
Loading…
Reference in New Issue