Track presence state per-device and combine to a user state. (#16066)

Tracks presence on an individual per-device basis and combines
the per-device state into a per-user state. This should help in
situations where a user has multiple devices with conflicting status
(e.g. one is syncing with unavailable and one is syncing with online).

The tie-breaking is done by priority:

    BUSY > ONLINE > UNAVAILABLE > OFFLINE
This commit is contained in:
Patrick Cloke 2023-09-05 09:58:51 -04:00 committed by GitHub
parent 36ae8611fe
commit ea75346f6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 764 additions and 63 deletions

1
changelog.d/16066.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a long-standing bug where multi-device accounts could cause high load due to presence.

1
changelog.d/16170.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a long-standing bug where multi-device accounts could cause high load due to presence.

View File

@ -1 +0,0 @@
Simplify presence code when using workers.

1
changelog.d/16171.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a long-standing bug where multi-device accounts could cause high load due to presence.

View File

@ -1 +0,0 @@
Track per-device information in the presence code.

1
changelog.d/16172.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a long-standing bug where multi-device accounts could cause high load due to presence.

View File

@ -1 +0,0 @@
Track per-device information in the presence code.

View File

@ -20,18 +20,53 @@ from synapse.api.constants import PresenceState
from synapse.types import JsonDict from synapse.types import JsonDict
@attr.s(auto_attribs=True, slots=True)
class UserDevicePresenceState:
    """
    The presence state tracked for a single device belonging to a user.

    Instances are mutable (the class is not frozen), so the state and the
    timestamps can be updated in place as the device syncs or is active.

    Attributes:
        user_id: The user ID.
        device_id: The user's device ID; may be None.
        state: The presence state, see PresenceState.
        last_active_ts: Time in msec that the device last interacted with
            the server.
        last_sync_ts: Time in msec that the device last *completed* a sync
            (or event stream).
    """

    user_id: str
    device_id: Optional[str]
    state: str
    last_active_ts: int
    last_sync_ts: int

    @classmethod
    def default(
        cls, user_id: str, device_id: Optional[str]
    ) -> "UserDevicePresenceState":
        """Build the initial state for a device: offline, with zeroed timestamps."""
        # Arguments follow the field declaration order above:
        # user_id, device_id, state, last_active_ts, last_sync_ts.
        return cls(user_id, device_id, PresenceState.OFFLINE, 0, 0)
@attr.s(slots=True, frozen=True, auto_attribs=True) @attr.s(slots=True, frozen=True, auto_attribs=True)
class UserPresenceState: class UserPresenceState:
"""Represents the current presence state of the user. """Represents the current presence state of the user.
user_id user_id: The user ID.
last_active: Time in msec that the user last interacted with server. state: The presence state, see PresenceState.
last_federation_update: Time in msec since either a) we sent a presence last_active_ts: Time in msec that the user last interacted with server.
last_federation_update_ts: Time in msec since either a) we sent a presence
update to other servers or b) we received a presence update, depending update to other servers or b) we received a presence update, depending
on if is a local user or not. on if is a local user or not.
last_user_sync: Time in msec that the user last *completed* a sync last_user_sync_ts: Time in msec that the user last *completed* a sync
(or event stream). (or event stream).
status_msg: User set status message. status_msg: User set status message.
currently_active: True if the user is currently syncing.
""" """
user_id: str user_id: str

View File

@ -13,13 +13,56 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
"""This module is responsible for keeping track of presence status of local """
This module is responsible for keeping track of presence status of local
and remote users. and remote users.
The methods that define policy are: The methods that define policy are:
- PresenceHandler._update_states - PresenceHandler._update_states
- PresenceHandler._handle_timeouts - PresenceHandler._handle_timeouts
- should_notify - should_notify
# Tracking local presence
For local users, presence is tracked on a per-device basis. When a user has multiple
devices the user presence state is derived by coalescing the presence from each
device:
BUSY > ONLINE > UNAVAILABLE > OFFLINE
The time that each device was last active and last synced is tracked in order to
automatically downgrade a device's presence state:
A device may move from ONLINE -> UNAVAILABLE, if it has not been active for
a period of time.
A device may go from any state -> OFFLINE, if it is not active and has not
synced for a period of time.
The timeouts are handled using a wheel timer, which has coarse buckets. Timings
do not need to be exact.
Generally a device's presence state is updated whenever a user syncs (via the
set_presence parameter), when the presence API is called, or if "pro-active"
events occur, including:
* Sending an event, receipt, read marker.
* Updating typing status.
The busy state has special status in that it is not downgraded by a call to
sync with a lower priority state *and* it takes a long period of time to transition
to offline.
# Persisting (and restoring) presence
For all users, presence is persisted on a per-user basis. Data is kept in-memory
and persisted periodically. When Synapse starts each worker loads the current
presence state and then tracks the presence stream to keep itself up-to-date.
When restoring presence for local users a pseudo-device is created to match the
user state; this device follows the normal timeout logic (see above) and will
automatically be replaced with any information from currently available devices.
""" """
import abc import abc
import contextlib import contextlib
@ -30,6 +73,7 @@ from contextlib import contextmanager
from types import TracebackType from types import TracebackType
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
AbstractSet,
Any, Any,
Callable, Callable,
Collection, Collection,
@ -49,7 +93,7 @@ from prometheus_client import Counter
import synapse.metrics import synapse.metrics
from synapse.api.constants import EduTypes, EventTypes, Membership, PresenceState from synapse.api.constants import EduTypes, EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.api.presence import UserPresenceState from synapse.api.presence import UserDevicePresenceState, UserPresenceState
from synapse.appservice import ApplicationService from synapse.appservice import ApplicationService
from synapse.events.presence_router import PresenceRouter from synapse.events.presence_router import PresenceRouter
from synapse.logging.context import run_in_background from synapse.logging.context import run_in_background
@ -162,6 +206,7 @@ class BasePresenceHandler(abc.ABC):
self.VALID_PRESENCE += (PresenceState.BUSY,) self.VALID_PRESENCE += (PresenceState.BUSY,)
active_presence = self.store.take_presence_startup_info() active_presence = self.store.take_presence_startup_info()
# The combined status across all user devices.
self.user_to_current_state = {state.user_id: state for state in active_presence} self.user_to_current_state = {state.user_id: state for state in active_presence}
@abc.abstractmethod @abc.abstractmethod
@ -708,9 +753,27 @@ class PresenceHandler(BasePresenceHandler):
lambda: len(self.user_to_current_state), lambda: len(self.user_to_current_state),
) )
# The per-device presence state, maps user to devices to per-device presence state.
self._user_to_device_to_current_state: Dict[
str, Dict[Optional[str], UserDevicePresenceState]
] = {}
now = self.clock.time_msec() now = self.clock.time_msec()
if self._presence_enabled: if self._presence_enabled:
for state in self.user_to_current_state.values(): for state in self.user_to_current_state.values():
# Create a pseudo-device to properly handle timeouts. This will
# be overridden by any "real" devices within SYNC_ONLINE_TIMEOUT.
pseudo_device_id = None
self._user_to_device_to_current_state[state.user_id] = {
pseudo_device_id: UserDevicePresenceState(
user_id=state.user_id,
device_id=pseudo_device_id,
state=state.state,
last_active_ts=state.last_active_ts,
last_sync_ts=state.last_user_sync_ts,
)
}
self.wheel_timer.insert( self.wheel_timer.insert(
now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER
) )
@ -752,7 +815,7 @@ class PresenceHandler(BasePresenceHandler):
# Keeps track of the number of *ongoing* syncs on other processes. # Keeps track of the number of *ongoing* syncs on other processes.
# #
# While any sync is ongoing on another process the user will never # While any sync is ongoing on another process the user's device will never
# go offline. # go offline.
# #
# Each process has a unique identifier and an update frequency. If # Each process has a unique identifier and an update frequency. If
@ -981,22 +1044,21 @@ class PresenceHandler(BasePresenceHandler):
timers_fired_counter.inc(len(states)) timers_fired_counter.inc(len(states))
syncing_user_ids = { # Set of user ID & device IDs which are currently syncing.
user_id syncing_user_devices = {
for (user_id, _), count in self._user_device_to_num_current_syncs.items() user_id_device_id
for user_id_device_id, count in self._user_device_to_num_current_syncs.items()
if count if count
} }
syncing_user_ids.update( syncing_user_devices.update(
user_id itertools.chain(*self.external_process_to_current_syncs.values())
for user_id, _ in itertools.chain(
*self.external_process_to_current_syncs.values()
)
) )
changes = handle_timeouts( changes = handle_timeouts(
states, states,
is_mine_fn=self.is_mine_id, is_mine_fn=self.is_mine_id,
syncing_user_ids=syncing_user_ids, syncing_user_devices=syncing_user_devices,
user_to_devices=self._user_to_device_to_current_state,
now=now, now=now,
) )
@ -1016,11 +1078,26 @@ class PresenceHandler(BasePresenceHandler):
bump_active_time_counter.inc() bump_active_time_counter.inc()
prev_state = await self.current_state_for_user(user_id) now = self.clock.time_msec()
new_fields: Dict[str, Any] = {"last_active_ts": self.clock.time_msec()} # Update the device information & mark the device as online if it was
if prev_state.state == PresenceState.UNAVAILABLE: # unavailable.
new_fields["state"] = PresenceState.ONLINE devices = self._user_to_device_to_current_state.setdefault(user_id, {})
device_state = devices.setdefault(
device_id,
UserDevicePresenceState.default(user_id, device_id),
)
device_state.last_active_ts = now
if device_state.state == PresenceState.UNAVAILABLE:
device_state.state = PresenceState.ONLINE
# Update the user state, this will always update last_active_ts and
# might update the presence state.
prev_state = await self.current_state_for_user(user_id)
new_fields: Dict[str, Any] = {
"last_active_ts": now,
"state": _combine_device_states(devices.values()),
}
await self._update_states([prev_state.copy_and_replace(**new_fields)]) await self._update_states([prev_state.copy_and_replace(**new_fields)])
@ -1132,6 +1209,12 @@ class PresenceHandler(BasePresenceHandler):
if is_syncing and (user_id, device_id) not in process_presence: if is_syncing and (user_id, device_id) not in process_presence:
process_presence.add((user_id, device_id)) process_presence.add((user_id, device_id))
elif not is_syncing and (user_id, device_id) in process_presence: elif not is_syncing and (user_id, device_id) in process_presence:
devices = self._user_to_device_to_current_state.setdefault(user_id, {})
device_state = devices.setdefault(
device_id, UserDevicePresenceState.default(user_id, device_id)
)
device_state.last_sync_ts = sync_time_msec
new_state = prev_state.copy_and_replace( new_state = prev_state.copy_and_replace(
last_user_sync_ts=sync_time_msec last_user_sync_ts=sync_time_msec
) )
@ -1151,11 +1234,24 @@ class PresenceHandler(BasePresenceHandler):
process_presence = self.external_process_to_current_syncs.pop( process_presence = self.external_process_to_current_syncs.pop(
process_id, set() process_id, set()
) )
prev_states = await self.current_state_for_users(
{user_id for user_id, device_id in process_presence}
)
time_now_ms = self.clock.time_msec() time_now_ms = self.clock.time_msec()
# Mark each device as having a last sync time.
updated_users = set()
for user_id, device_id in process_presence:
device_state = self._user_to_device_to_current_state.setdefault(
user_id, {}
).setdefault(
device_id, UserDevicePresenceState.default(user_id, device_id)
)
device_state.last_sync_ts = time_now_ms
updated_users.add(user_id)
# Update each user (and insert into the appropriate timers to check if
# they've gone offline).
prev_states = await self.current_state_for_users(updated_users)
await self._update_states( await self._update_states(
[ [
prev_state.copy_and_replace(last_user_sync_ts=time_now_ms) prev_state.copy_and_replace(last_user_sync_ts=time_now_ms)
@ -1277,6 +1373,20 @@ class PresenceHandler(BasePresenceHandler):
if prev_state.state == PresenceState.BUSY and is_sync: if prev_state.state == PresenceState.BUSY and is_sync:
presence = PresenceState.BUSY presence = PresenceState.BUSY
# Update the device specific information.
devices = self._user_to_device_to_current_state.setdefault(user_id, {})
device_state = devices.setdefault(
device_id,
UserDevicePresenceState.default(user_id, device_id),
)
device_state.state = presence
device_state.last_active_ts = now
if is_sync:
device_state.last_sync_ts = now
# Based on the state of each user's device calculate the new presence state.
presence = _combine_device_states(devices.values())
new_fields = {"state": presence} new_fields = {"state": presence}
if presence == PresenceState.ONLINE or presence == PresenceState.BUSY: if presence == PresenceState.ONLINE or presence == PresenceState.BUSY:
@ -1873,7 +1983,8 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
def handle_timeouts( def handle_timeouts(
user_states: List[UserPresenceState], user_states: List[UserPresenceState],
is_mine_fn: Callable[[str], bool], is_mine_fn: Callable[[str], bool],
syncing_user_ids: Set[str], syncing_user_devices: AbstractSet[Tuple[str, Optional[str]]],
user_to_devices: Dict[str, Dict[Optional[str], UserDevicePresenceState]],
now: int, now: int,
) -> List[UserPresenceState]: ) -> List[UserPresenceState]:
"""Checks the presence of users that have timed out and updates as """Checks the presence of users that have timed out and updates as
@ -1882,7 +1993,8 @@ def handle_timeouts(
Args: Args:
user_states: List of UserPresenceState's to check. user_states: List of UserPresenceState's to check.
is_mine_fn: Function that returns if a user_id is ours is_mine_fn: Function that returns if a user_id is ours
syncing_user_ids: Set of user_ids with active syncs. syncing_user_devices: A set of (user ID, device ID) tuples with active syncs.
user_to_devices: A map of user ID to device ID to UserDevicePresenceState.
now: Current time in ms. now: Current time in ms.
Returns: Returns:
@ -1891,9 +2003,16 @@ def handle_timeouts(
changes = {} # Actual changes we need to notify people about changes = {} # Actual changes we need to notify people about
for state in user_states: for state in user_states:
is_mine = is_mine_fn(state.user_id) user_id = state.user_id
is_mine = is_mine_fn(user_id)
new_state = handle_timeout(state, is_mine, syncing_user_ids, now) new_state = handle_timeout(
state,
is_mine,
syncing_user_devices,
user_to_devices.get(user_id, {}),
now,
)
if new_state: if new_state:
changes[state.user_id] = new_state changes[state.user_id] = new_state
@ -1901,14 +2020,19 @@ def handle_timeouts(
def handle_timeout( def handle_timeout(
state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int state: UserPresenceState,
is_mine: bool,
syncing_device_ids: AbstractSet[Tuple[str, Optional[str]]],
user_devices: Dict[Optional[str], UserDevicePresenceState],
now: int,
) -> Optional[UserPresenceState]: ) -> Optional[UserPresenceState]:
"""Checks the presence of the user to see if any of the timers have elapsed """Checks the presence of the user to see if any of the timers have elapsed
Args: Args:
state state: UserPresenceState to check.
is_mine: Whether the user is ours is_mine: Whether the user is ours
syncing_user_ids: Set of user_ids with active syncs. syncing_device_ids: A set of (user ID, device ID) tuples with active syncs.
user_devices: A map of device ID to UserDevicePresenceState.
now: Current time in ms. now: Current time in ms.
Returns: Returns:
@ -1919,34 +2043,55 @@ def handle_timeout(
return None return None
changed = False changed = False
user_id = state.user_id
if is_mine: if is_mine:
if state.state == PresenceState.ONLINE: # Check per-device whether the device should be considered idle or offline
if now - state.last_active_ts > IDLE_TIMER: # due to timeouts.
# Currently online, but last activity ages ago so auto device_changed = False
# idle offline_devices = []
state = state.copy_and_replace(state=PresenceState.UNAVAILABLE) for device_id, device_state in user_devices.items():
changed = True if device_state.state == PresenceState.ONLINE:
elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY: if now - device_state.last_active_ts > IDLE_TIMER:
# So that we send down a notification that we've # Currently online, but last activity ages ago so auto
# stopped updating. # idle
device_state.state = PresenceState.UNAVAILABLE
device_changed = True
# If there has been no sync for a while (and none ongoing),
# set presence to offline.
if (state.user_id, device_id) not in syncing_device_ids:
# If the user has done something recently but hasn't synced,
# don't set them as offline.
sync_or_active = max(
device_state.last_sync_ts, device_state.last_active_ts
)
if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
# Mark the device as going offline.
offline_devices.append(device_id)
device_changed = True
# Offline devices are not needed and do not add information.
for device_id in offline_devices:
user_devices.pop(device_id)
# If the presence state of the devices changed, then (maybe) update
# the user's overall presence state.
if device_changed:
new_presence = _combine_device_states(user_devices.values())
if new_presence != state.state:
state = state.copy_and_replace(state=new_presence)
changed = True changed = True
if now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# So that we send down a notification that we've
# stopped updating.
changed = True
if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL: if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL:
# Need to send ping to other servers to ensure they don't # Need to send ping to other servers to ensure they don't
# timeout and set us to offline # timeout and set us to offline
changed = True changed = True
# If there are have been no sync for a while (and none ongoing),
# set presence to offline
if user_id not in syncing_user_ids:
# If the user has done something recently but hasn't synced,
# don't set them as offline.
sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
state = state.copy_and_replace(state=PresenceState.OFFLINE)
changed = True
else: else:
# We expect to be poked occasionally by the other side. # We expect to be poked occasionally by the other side.
# This is to protect against forgetful/buggy servers, so that # This is to protect against forgetful/buggy servers, so that
@ -2036,6 +2181,46 @@ def handle_update(
return new_state, persist_and_notify, federation_ping return new_state, persist_and_notify, federation_ping
# Relative priority of each presence state when coalescing the states of a
# user's devices; the larger number wins:
#
#     BUSY > ONLINE > UNAVAILABLE > OFFLINE
PRESENCE_BY_PRIORITY = {
    PresenceState.BUSY: 4,
    PresenceState.ONLINE: 3,
    PresenceState.UNAVAILABLE: 2,
    PresenceState.OFFLINE: 1,
}


def _combine_device_states(
    device_states: Iterable[UserDevicePresenceState],
) -> str:
    """
    Coalesce per-device presence states into a single presence state.

    The state with the highest priority in PRESENCE_BY_PRIORITY wins. Only the
    state is returned, so which of several equal-priority devices "wins" makes
    no difference to the result.

    Args:
        device_states: An iterable of device presence states.

    Returns:
        The combined presence state, or PresenceState.OFFLINE if there are no
        device states.

    Raises:
        KeyError: If a device has a state that is not in PRESENCE_BY_PRIORITY.
    """
    return max(
        (device_state.state for device_state in device_states),
        key=PRESENCE_BY_PRIORITY.__getitem__,
        # No devices means that the user is offline.
        default=PresenceState.OFFLINE,
    )
async def get_interested_parties( async def get_interested_parties(
store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState] store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState]
) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]: ) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]:

View File

@ -21,7 +21,7 @@ from signedjson.key import generate_signing_key
from twisted.test.proto_helpers import MemoryReactor from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EventTypes, Membership, PresenceState from synapse.api.constants import EventTypes, Membership, PresenceState
from synapse.api.presence import UserPresenceState from synapse.api.presence import UserDevicePresenceState, UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events.builder import EventBuilder from synapse.events.builder import EventBuilder
from synapse.federation.sender import FederationSender from synapse.federation.sender import FederationSender
@ -352,6 +352,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
def test_idle_timer(self) -> None: def test_idle_timer(self) -> None:
user_id = "@foo:bar" user_id = "@foo:bar"
device_id = "dev-1"
status_msg = "I'm here!" status_msg = "I'm here!"
now = 5000000 now = 5000000
@ -362,8 +363,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
last_user_sync_ts=now, last_user_sync_ts=now,
status_msg=status_msg, status_msg=status_msg,
) )
device_state = UserDevicePresenceState(
user_id=user_id,
device_id=device_id,
state=state.state,
last_active_ts=state.last_active_ts,
last_sync_ts=state.last_user_sync_ts,
)
new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now) new_state = handle_timeout(
state,
is_mine=True,
syncing_device_ids=set(),
user_devices={device_id: device_state},
now=now,
)
self.assertIsNotNone(new_state) self.assertIsNotNone(new_state)
assert new_state is not None assert new_state is not None
@ -376,6 +390,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
presence state into unavailable. presence state into unavailable.
""" """
user_id = "@foo:bar" user_id = "@foo:bar"
device_id = "dev-1"
status_msg = "I'm here!" status_msg = "I'm here!"
now = 5000000 now = 5000000
@ -386,8 +401,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
last_user_sync_ts=now, last_user_sync_ts=now,
status_msg=status_msg, status_msg=status_msg,
) )
device_state = UserDevicePresenceState(
user_id=user_id,
device_id=device_id,
state=state.state,
last_active_ts=state.last_active_ts,
last_sync_ts=state.last_user_sync_ts,
)
new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now) new_state = handle_timeout(
state,
is_mine=True,
syncing_device_ids=set(),
user_devices={device_id: device_state},
now=now,
)
self.assertIsNotNone(new_state) self.assertIsNotNone(new_state)
assert new_state is not None assert new_state is not None
@ -396,6 +424,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
def test_sync_timeout(self) -> None: def test_sync_timeout(self) -> None:
user_id = "@foo:bar" user_id = "@foo:bar"
device_id = "dev-1"
status_msg = "I'm here!" status_msg = "I'm here!"
now = 5000000 now = 5000000
@ -406,8 +435,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1, last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
status_msg=status_msg, status_msg=status_msg,
) )
device_state = UserDevicePresenceState(
user_id=user_id,
device_id=device_id,
state=state.state,
last_active_ts=state.last_active_ts,
last_sync_ts=state.last_user_sync_ts,
)
new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now) new_state = handle_timeout(
state,
is_mine=True,
syncing_device_ids=set(),
user_devices={device_id: device_state},
now=now,
)
self.assertIsNotNone(new_state) self.assertIsNotNone(new_state)
assert new_state is not None assert new_state is not None
@ -416,6 +458,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
def test_sync_online(self) -> None: def test_sync_online(self) -> None:
user_id = "@foo:bar" user_id = "@foo:bar"
device_id = "dev-1"
status_msg = "I'm here!" status_msg = "I'm here!"
now = 5000000 now = 5000000
@ -426,9 +469,20 @@ class PresenceTimeoutTestCase(unittest.TestCase):
last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1, last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
status_msg=status_msg, status_msg=status_msg,
) )
device_state = UserDevicePresenceState(
user_id=user_id,
device_id=device_id,
state=state.state,
last_active_ts=state.last_active_ts,
last_sync_ts=state.last_user_sync_ts,
)
new_state = handle_timeout( new_state = handle_timeout(
state, is_mine=True, syncing_user_ids={user_id}, now=now state,
is_mine=True,
syncing_device_ids={(user_id, device_id)},
user_devices={device_id: device_state},
now=now,
) )
self.assertIsNotNone(new_state) self.assertIsNotNone(new_state)
@ -438,6 +492,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
def test_federation_ping(self) -> None: def test_federation_ping(self) -> None:
user_id = "@foo:bar" user_id = "@foo:bar"
device_id = "dev-1"
status_msg = "I'm here!" status_msg = "I'm here!"
now = 5000000 now = 5000000
@ -449,14 +504,28 @@ class PresenceTimeoutTestCase(unittest.TestCase):
last_federation_update_ts=now - FEDERATION_PING_INTERVAL - 1, last_federation_update_ts=now - FEDERATION_PING_INTERVAL - 1,
status_msg=status_msg, status_msg=status_msg,
) )
device_state = UserDevicePresenceState(
user_id=user_id,
device_id=device_id,
state=state.state,
last_active_ts=state.last_active_ts,
last_sync_ts=state.last_user_sync_ts,
)
new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now) new_state = handle_timeout(
state,
is_mine=True,
syncing_device_ids=set(),
user_devices={device_id: device_state},
now=now,
)
self.assertIsNotNone(new_state) self.assertIsNotNone(new_state)
self.assertEqual(state, new_state) self.assertEqual(state, new_state)
def test_no_timeout(self) -> None: def test_no_timeout(self) -> None:
user_id = "@foo:bar" user_id = "@foo:bar"
device_id = "dev-1"
now = 5000000 now = 5000000
state = UserPresenceState.default(user_id) state = UserPresenceState.default(user_id)
@ -466,8 +535,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
last_user_sync_ts=now, last_user_sync_ts=now,
last_federation_update_ts=now, last_federation_update_ts=now,
) )
device_state = UserDevicePresenceState(
user_id=user_id,
device_id=device_id,
state=state.state,
last_active_ts=state.last_active_ts,
last_sync_ts=state.last_user_sync_ts,
)
new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now) new_state = handle_timeout(
state,
is_mine=True,
syncing_device_ids=set(),
user_devices={device_id: device_state},
now=now,
)
self.assertIsNone(new_state) self.assertIsNone(new_state)
@ -485,8 +567,9 @@ class PresenceTimeoutTestCase(unittest.TestCase):
status_msg=status_msg, status_msg=status_msg,
) )
# Note that this is a remote user so we do not have their device information.
new_state = handle_timeout( new_state = handle_timeout(
state, is_mine=False, syncing_user_ids=set(), now=now state, is_mine=False, syncing_device_ids=set(), user_devices={}, now=now
) )
self.assertIsNotNone(new_state) self.assertIsNotNone(new_state)
@ -496,6 +579,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
def test_last_active(self) -> None: def test_last_active(self) -> None:
user_id = "@foo:bar" user_id = "@foo:bar"
device_id = "dev-1"
status_msg = "I'm here!" status_msg = "I'm here!"
now = 5000000 now = 5000000
@ -507,8 +591,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
last_federation_update_ts=now, last_federation_update_ts=now,
status_msg=status_msg, status_msg=status_msg,
) )
device_state = UserDevicePresenceState(
user_id=user_id,
device_id=device_id,
state=state.state,
last_active_ts=state.last_active_ts,
last_sync_ts=state.last_user_sync_ts,
)
new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now) new_state = handle_timeout(
state,
is_mine=True,
syncing_device_ids=set(),
user_devices={device_id: device_state},
now=now,
)
self.assertIsNotNone(new_state) self.assertIsNotNone(new_state)
self.assertEqual(state, new_state) self.assertEqual(state, new_state)
@ -579,7 +676,7 @@ class PresenceHandlerInitTestCase(unittest.HomeserverTestCase):
[ [
(PresenceState.BUSY, PresenceState.BUSY), (PresenceState.BUSY, PresenceState.BUSY),
(PresenceState.ONLINE, PresenceState.ONLINE), (PresenceState.ONLINE, PresenceState.ONLINE),
(PresenceState.UNAVAILABLE, PresenceState.UNAVAILABLE), (PresenceState.UNAVAILABLE, PresenceState.ONLINE),
# Offline syncs don't update the state. # Offline syncs don't update the state.
(PresenceState.OFFLINE, PresenceState.ONLINE), (PresenceState.OFFLINE, PresenceState.ONLINE),
] ]
@ -800,6 +897,389 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
# we should now be online # we should now be online
self.assertEqual(state.state, PresenceState.ONLINE) self.assertEqual(state.state, PresenceState.ONLINE)
@parameterized.expand(
# A list of tuples of 5 strings and a boolean:
#
# * The presence state of device 1.
# * The presence state of device 2.
# * The expected user presence state after both devices have synced.
# * The expected user presence state after device 1 has idled.
# * The expected user presence state after device 2 has idled.
# * True to use workers, False a monolith.
[
(*cases, workers)
for workers in (False, True)
for cases in [
# If both devices have the same state, online should eventually idle.
# Otherwise, the state doesn't change.
(
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
),
(
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
),
(
PresenceState.OFFLINE,
PresenceState.OFFLINE,
PresenceState.OFFLINE,
PresenceState.OFFLINE,
PresenceState.OFFLINE,
),
# If the second device has a "lower" state it should fallback to it.
(
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
),
(
PresenceState.ONLINE,
PresenceState.OFFLINE,
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
),
(
PresenceState.UNAVAILABLE,
PresenceState.OFFLINE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
),
# If the second device has a "higher" state it should override.
(
PresenceState.UNAVAILABLE,
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
),
(
PresenceState.OFFLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
),
(
PresenceState.OFFLINE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
),
]
],
name_func=lambda testcase_func, param_num, params: f"{testcase_func.__name__}_{param_num}_{'workers' if params.args[5] else 'monolith'}",
)
@unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
def test_set_presence_from_syncing_multi_device(
    self,
    dev_1_state: str,
    dev_2_state: str,
    expected_state_1: str,
    expected_state_2: str,
    expected_state_3: str,
    test_with_workers: bool,
) -> None:
    """
    Exercise the combined presence of a user with two concurrently syncing devices.

    The user's presence is expected to follow the "highest" priority state
    among the devices; once a device's state is discarded, the next highest
    one should win.

    The sync context managers are deliberately never closed, so the changes
    asserted here are driven by the idle timer. A *single* sync is unlikely
    to last that long in practice, but this approximates a device that keeps
    syncing with the same state.

    Args:
        dev_1_state: Presence state the first device syncs with.
        dev_2_state: Presence state the second device syncs with.
        expected_state_1: Expected user state once both devices have synced.
        expected_state_2: Expected user state after the first device has idled.
        expected_state_3: Expected user state after the second device has idled.
        test_with_workers: True to send the syncs to a worker, False to use
            the main process.
    """
    user_id = f"@test:{self.hs.config.server.server_name}"
    target_user = UserID.from_string(user_id)
    # Half of the idle timer, expressed in seconds for the reactor.
    half_idle_secs = IDLE_TIMER / 1000 / 2

    if test_with_workers:
        # Route /sync traffic through a worker, so that replication of
        # presence changes from the worker to the main process is covered.
        syncing_handler = self.make_worker_hs(
            "synapse.app.generic_worker", {"worker_name": "synchrotron"}
        ).get_presence_handler()
    else:
        # By default, /sync is handled by the main process.
        syncing_handler = self.presence_handler

    # The main process is always checked; the worker only when one is in use.
    handlers_to_check = [self.presence_handler]
    if test_with_workers:
        handlers_to_check.append(syncing_handler)

    def start_sync(device_id: str, device_state: str) -> None:
        # Begin a sync for the device; the returned context manager is
        # intentionally dropped so the sync is never closed.
        self.get_success(
            syncing_handler.user_syncing(
                user_id,
                device_id,
                affect_presence=device_state != PresenceState.OFFLINE,
                presence_state=device_state,
            ),
            by=0.01,
        )

    def assert_user_state(expected: str) -> None:
        # Every relevant process must report the same user-level state.
        for handler in handlers_to_check:
            current = self.get_success(handler.get_state(target_user))
            self.assertEqual(current.state, expected)

    # 1. Sync with the first device.
    start_sync("dev-1", dev_1_state)

    # 2. Wait half the idle timer.
    self.reactor.advance(half_idle_secs)
    self.reactor.pump([0.1])

    # 3. Sync with the second device.
    start_sync("dev-2", dev_2_state)

    # 4. Assert the expected presence state.
    assert_user_state(expected_state_1)

    # When testing with workers, make another sync (for a *different* user)
    # to keep the process information from expiring.
    #
    # This is needed because EXTERNAL_PROCESS_EXPIRY is equivalent to
    # IDLE_TIMER.
    if test_with_workers:
        with self.get_success(
            syncing_handler.user_syncing(
                f"@other-user:{self.hs.config.server.server_name}",
                "dev-3",
                affect_presence=True,
                presence_state=PresenceState.ONLINE,
            ),
            by=0.01,
        ):
            pass

    # 5. Advance so that the first device reaches the idle timer, then pump
    #    so that the _handle_timeouts function gets called.
    self.reactor.advance(half_idle_secs)
    self.reactor.pump([0.01])

    # 6. Assert the expected presence state.
    assert_user_state(expected_state_2)

    # 7. Advance another half idle timer so that the second device reaches
    #    it too, then pump so that _handle_timeouts gets called.
    self.reactor.advance(half_idle_secs)
    self.reactor.pump([0.1])

    # 8. The devices are still "syncing" (the sync context managers were
    #    never closed), so they might idle.
    assert_user_state(expected_state_3)
@parameterized.expand(
# A list of tuples of 4 strings and a boolean:
#
# * The presence state of device 1.
# * The presence state of device 2.
# * The expected user presence state after both devices have synced.
# * The expected user presence state after device 1 has stopped syncing.
# * True to use workers, False a monolith.
[
(*cases, workers)
for workers in (False, True)
for cases in [
# If both devices have the same state, nothing exciting should happen.
(
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
),
(
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
),
(
PresenceState.OFFLINE,
PresenceState.OFFLINE,
PresenceState.OFFLINE,
PresenceState.OFFLINE,
),
# If the second device has a "lower" state it should fallback to it.
(
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
),
(
PresenceState.ONLINE,
PresenceState.OFFLINE,
PresenceState.ONLINE,
PresenceState.OFFLINE,
),
(
PresenceState.UNAVAILABLE,
PresenceState.OFFLINE,
PresenceState.UNAVAILABLE,
PresenceState.OFFLINE,
),
# If the second device has a "higher" state it should override.
(
PresenceState.UNAVAILABLE,
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
),
(
PresenceState.OFFLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
),
(
PresenceState.OFFLINE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
),
]
],
name_func=lambda testcase_func, param_num, params: f"{testcase_func.__name__}_{param_num}_{'workers' if params.args[4] else 'monolith'}",
)
@unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
def test_set_presence_from_non_syncing_multi_device(
    self,
    dev_1_state: str,
    dev_2_state: str,
    expected_state_1: str,
    expected_state_2: str,
    test_with_workers: bool,
) -> None:
    """
    Test the behaviour of multiple devices which sync and then stop syncing.

    Roughly the user's presence state should be set to the "highest" priority
    of all the devices. When a device stops syncing its state should be
    discarded and the next highest should win.

    This test closes each device's sync context manager and then advances
    the clock by SYNC_ONLINE_TIMEOUT. Once both devices have stopped syncing
    the user is expected to be offline.

    Args:
        dev_1_state: Presence state the first device syncs with.
        dev_2_state: Presence state the second device syncs with.
        expected_state_1: Expected user state once both devices have synced.
        expected_state_2: Expected user state after the first device has
            stopped syncing.
        test_with_workers: True to send the syncs to a worker, False to use
            the main process.
    """
    user_id = f"@test:{self.hs.config.server.server_name}"
    target_user = UserID.from_string(user_id)

    if test_with_workers:
        # Create a worker and use it to handle /sync traffic instead.
        # This is used to test that presence changes get replicated from
        # workers to the main process correctly.
        syncing_handler = self.make_worker_hs(
            "synapse.app.generic_worker", {"worker_name": "synchrotron"}
        ).get_presence_handler()
    else:
        # By default, we call /sync against the main process.
        syncing_handler = self.presence_handler

    # The main process is always checked; the worker only when one is in use.
    handlers_to_check = [self.presence_handler]
    if test_with_workers:
        handlers_to_check.append(syncing_handler)

    def assert_user_state(expected: str) -> None:
        # Every relevant process must report the same user-level state.
        for handler in handlers_to_check:
            current = self.get_success(handler.get_state(target_user))
            self.assertEqual(current.state, expected)

    # 1. & 2. Sync with the first device, then with the second one. The
    #    returned context managers are kept so each sync can be closed later.
    open_syncs = [
        self.get_success(
            syncing_handler.user_syncing(
                user_id,
                device_id,
                affect_presence=device_state != PresenceState.OFFLINE,
                presence_state=device_state,
            ),
            by=0.1,
        )
        for device_id, device_state in (
            ("dev-1", dev_1_state),
            ("dev-2", dev_2_state),
        )
    ]

    # 3. Assert the expected presence state.
    assert_user_state(expected_state_1)

    # 4. - 9. Disconnect the devices one at a time, first device first. After
    #    the second device is gone there are no more devices, so the user
    #    should be offline.
    for open_sync, expected_after in zip(
        open_syncs, (expected_state_2, PresenceState.OFFLINE)
    ):
        # Entering and leaving the context manager ends the device's sync.
        with open_sync:
            pass

        # Advance by the sync timeout so that the device should be discarded,
        # then pump so that the _handle_timeouts function gets called.
        self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000)
        self.reactor.pump([5])

        assert_user_state(expected_after)
def test_set_presence_from_syncing_keeps_status(self) -> None: def test_set_presence_from_syncing_keeps_status(self) -> None:
"""Test that presence set by syncing retains status message""" """Test that presence set by syncing retains status message"""
status_msg = "I'm here!" status_msg = "I'm here!"