Clean-up presence code (#16092)
Misc. clean-ups to: * Use keyword arguments. * Return early (reducing indentation) of some functions. * Removing duplicated / unused code. * Use wrap_as_background_process.
This commit is contained in:
parent
dac97642e4
commit
efd4d06d76
|
@ -0,0 +1 @@
|
||||||
|
Clean-up the presence code.
|
|
@ -30,7 +30,6 @@ from types import TracebackType
|
||||||
from typing import (
|
from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Any,
|
Any,
|
||||||
Awaitable,
|
|
||||||
Callable,
|
Callable,
|
||||||
Collection,
|
Collection,
|
||||||
Dict,
|
Dict,
|
||||||
|
@ -54,7 +53,10 @@ from synapse.appservice import ApplicationService
|
||||||
from synapse.events.presence_router import PresenceRouter
|
from synapse.events.presence_router import PresenceRouter
|
||||||
from synapse.logging.context import run_in_background
|
from synapse.logging.context import run_in_background
|
||||||
from synapse.metrics import LaterGauge
|
from synapse.metrics import LaterGauge
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import (
|
||||||
|
run_as_background_process,
|
||||||
|
wrap_as_background_process,
|
||||||
|
)
|
||||||
from synapse.replication.http.presence import (
|
from synapse.replication.http.presence import (
|
||||||
ReplicationBumpPresenceActiveTime,
|
ReplicationBumpPresenceActiveTime,
|
||||||
ReplicationPresenceSetState,
|
ReplicationPresenceSetState,
|
||||||
|
@ -141,6 +143,8 @@ class BasePresenceHandler(abc.ABC):
|
||||||
self.state = hs.get_state_handler()
|
self.state = hs.get_state_handler()
|
||||||
self.is_mine_id = hs.is_mine_id
|
self.is_mine_id = hs.is_mine_id
|
||||||
|
|
||||||
|
self._presence_enabled = hs.config.server.use_presence
|
||||||
|
|
||||||
self._federation = None
|
self._federation = None
|
||||||
if hs.should_send_federation():
|
if hs.should_send_federation():
|
||||||
self._federation = hs.get_federation_sender()
|
self._federation = hs.get_federation_sender()
|
||||||
|
@ -149,6 +153,15 @@ class BasePresenceHandler(abc.ABC):
|
||||||
|
|
||||||
self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
|
self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
|
||||||
|
|
||||||
|
self.VALID_PRESENCE: Tuple[str, ...] = (
|
||||||
|
PresenceState.ONLINE,
|
||||||
|
PresenceState.UNAVAILABLE,
|
||||||
|
PresenceState.OFFLINE,
|
||||||
|
)
|
||||||
|
|
||||||
|
if self._busy_presence_enabled:
|
||||||
|
self.VALID_PRESENCE += (PresenceState.BUSY,)
|
||||||
|
|
||||||
active_presence = self.store.take_presence_startup_info()
|
active_presence = self.store.take_presence_startup_info()
|
||||||
self.user_to_current_state = {state.user_id: state for state in active_presence}
|
self.user_to_current_state = {state.user_id: state for state in active_presence}
|
||||||
|
|
||||||
|
@ -395,8 +408,6 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
||||||
|
|
||||||
self._presence_writer_instance = hs.config.worker.writers.presence[0]
|
self._presence_writer_instance = hs.config.worker.writers.presence[0]
|
||||||
|
|
||||||
self._presence_enabled = hs.config.server.use_presence
|
|
||||||
|
|
||||||
# Route presence EDUs to the right worker
|
# Route presence EDUs to the right worker
|
||||||
hs.get_federation_registry().register_instances_for_edu(
|
hs.get_federation_registry().register_instances_for_edu(
|
||||||
EduTypes.PRESENCE,
|
EduTypes.PRESENCE,
|
||||||
|
@ -421,8 +432,6 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
||||||
self.send_stop_syncing, UPDATE_SYNCING_USERS_MS
|
self.send_stop_syncing, UPDATE_SYNCING_USERS_MS
|
||||||
)
|
)
|
||||||
|
|
||||||
self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
|
|
||||||
|
|
||||||
hs.get_reactor().addSystemEventTrigger(
|
hs.get_reactor().addSystemEventTrigger(
|
||||||
"before",
|
"before",
|
||||||
"shutdown",
|
"shutdown",
|
||||||
|
@ -490,7 +499,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
||||||
# what the spec wants: see comment in the BasePresenceHandler version
|
# what the spec wants: see comment in the BasePresenceHandler version
|
||||||
# of this function.
|
# of this function.
|
||||||
await self.set_state(
|
await self.set_state(
|
||||||
UserID.from_string(user_id), {"presence": presence_state}, True
|
UserID.from_string(user_id),
|
||||||
|
{"presence": presence_state},
|
||||||
|
ignore_status_msg=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
|
curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
|
||||||
|
@ -601,22 +612,13 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
||||||
"""
|
"""
|
||||||
presence = state["presence"]
|
presence = state["presence"]
|
||||||
|
|
||||||
valid_presence = (
|
if presence not in self.VALID_PRESENCE:
|
||||||
PresenceState.ONLINE,
|
|
||||||
PresenceState.UNAVAILABLE,
|
|
||||||
PresenceState.OFFLINE,
|
|
||||||
PresenceState.BUSY,
|
|
||||||
)
|
|
||||||
|
|
||||||
if presence not in valid_presence or (
|
|
||||||
presence == PresenceState.BUSY and not self._busy_presence_enabled
|
|
||||||
):
|
|
||||||
raise SynapseError(400, "Invalid presence state")
|
raise SynapseError(400, "Invalid presence state")
|
||||||
|
|
||||||
user_id = target_user.to_string()
|
user_id = target_user.to_string()
|
||||||
|
|
||||||
# If presence is disabled, no-op
|
# If presence is disabled, no-op
|
||||||
if not self.hs.config.server.use_presence:
|
if not self._presence_enabled:
|
||||||
return
|
return
|
||||||
|
|
||||||
# Proxy request to instance that writes presence
|
# Proxy request to instance that writes presence
|
||||||
|
@ -633,7 +635,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
||||||
with the app.
|
with the app.
|
||||||
"""
|
"""
|
||||||
# If presence is disabled, no-op
|
# If presence is disabled, no-op
|
||||||
if not self.hs.config.server.use_presence:
|
if not self._presence_enabled:
|
||||||
return
|
return
|
||||||
|
|
||||||
# Proxy request to instance that writes presence
|
# Proxy request to instance that writes presence
|
||||||
|
@ -649,7 +651,6 @@ class PresenceHandler(BasePresenceHandler):
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
self.wheel_timer: WheelTimer[str] = WheelTimer()
|
self.wheel_timer: WheelTimer[str] = WheelTimer()
|
||||||
self.notifier = hs.get_notifier()
|
self.notifier = hs.get_notifier()
|
||||||
self._presence_enabled = hs.config.server.use_presence
|
|
||||||
|
|
||||||
federation_registry = hs.get_federation_registry()
|
federation_registry = hs.get_federation_registry()
|
||||||
|
|
||||||
|
@ -700,8 +701,6 @@ class PresenceHandler(BasePresenceHandler):
|
||||||
self._on_shutdown,
|
self._on_shutdown,
|
||||||
)
|
)
|
||||||
|
|
||||||
self._next_serial = 1
|
|
||||||
|
|
||||||
# Keeps track of the number of *ongoing* syncs on this process. While
|
# Keeps track of the number of *ongoing* syncs on this process. While
|
||||||
# this is non zero a user will never go offline.
|
# this is non zero a user will never go offline.
|
||||||
self.user_to_num_current_syncs: Dict[str, int] = {}
|
self.user_to_num_current_syncs: Dict[str, int] = {}
|
||||||
|
@ -723,21 +722,16 @@ class PresenceHandler(BasePresenceHandler):
|
||||||
# Start a LoopingCall in 30s that fires every 5s.
|
# Start a LoopingCall in 30s that fires every 5s.
|
||||||
# The initial delay is to allow disconnected clients a chance to
|
# The initial delay is to allow disconnected clients a chance to
|
||||||
# reconnect before we treat them as offline.
|
# reconnect before we treat them as offline.
|
||||||
def run_timeout_handler() -> Awaitable[None]:
|
|
||||||
return run_as_background_process(
|
|
||||||
"handle_presence_timeouts", self._handle_timeouts
|
|
||||||
)
|
|
||||||
|
|
||||||
self.clock.call_later(
|
self.clock.call_later(
|
||||||
30, self.clock.looping_call, run_timeout_handler, 5000
|
30, self.clock.looping_call, self._handle_timeouts, 5000
|
||||||
)
|
)
|
||||||
|
|
||||||
def run_persister() -> Awaitable[None]:
|
self.clock.call_later(
|
||||||
return run_as_background_process(
|
60,
|
||||||
"persist_presence_changes", self._persist_unpersisted_changes
|
self.clock.looping_call,
|
||||||
)
|
self._persist_unpersisted_changes,
|
||||||
|
60 * 1000,
|
||||||
self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000)
|
)
|
||||||
|
|
||||||
LaterGauge(
|
LaterGauge(
|
||||||
"synapse_handlers_presence_wheel_timer_size",
|
"synapse_handlers_presence_wheel_timer_size",
|
||||||
|
@ -783,6 +777,7 @@ class PresenceHandler(BasePresenceHandler):
|
||||||
)
|
)
|
||||||
logger.info("Finished _on_shutdown")
|
logger.info("Finished _on_shutdown")
|
||||||
|
|
||||||
|
@wrap_as_background_process("persist_presence_changes")
|
||||||
async def _persist_unpersisted_changes(self) -> None:
|
async def _persist_unpersisted_changes(self) -> None:
|
||||||
"""We periodically persist the unpersisted changes, as otherwise they
|
"""We periodically persist the unpersisted changes, as otherwise they
|
||||||
may stack up and slow down shutdown times.
|
may stack up and slow down shutdown times.
|
||||||
|
@ -898,6 +893,7 @@ class PresenceHandler(BasePresenceHandler):
|
||||||
states, [destination]
|
states, [destination]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@wrap_as_background_process("handle_presence_timeouts")
|
||||||
async def _handle_timeouts(self) -> None:
|
async def _handle_timeouts(self) -> None:
|
||||||
"""Checks the presence of users that have timed out and updates as
|
"""Checks the presence of users that have timed out and updates as
|
||||||
appropriate.
|
appropriate.
|
||||||
|
@ -955,7 +951,7 @@ class PresenceHandler(BasePresenceHandler):
|
||||||
with the app.
|
with the app.
|
||||||
"""
|
"""
|
||||||
# If presence is disabled, no-op
|
# If presence is disabled, no-op
|
||||||
if not self.hs.config.server.use_presence:
|
if not self._presence_enabled:
|
||||||
return
|
return
|
||||||
|
|
||||||
user_id = user.to_string()
|
user_id = user.to_string()
|
||||||
|
@ -990,56 +986,51 @@ class PresenceHandler(BasePresenceHandler):
|
||||||
client that is being used by a user.
|
client that is being used by a user.
|
||||||
presence_state: The presence state indicated in the sync request
|
presence_state: The presence state indicated in the sync request
|
||||||
"""
|
"""
|
||||||
# Override if it should affect the user's presence, if presence is
|
if not affect_presence or not self._presence_enabled:
|
||||||
# disabled.
|
return _NullContextManager()
|
||||||
if not self.hs.config.server.use_presence:
|
|
||||||
affect_presence = False
|
|
||||||
|
|
||||||
if affect_presence:
|
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
|
||||||
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
|
self.user_to_num_current_syncs[user_id] = curr_sync + 1
|
||||||
self.user_to_num_current_syncs[user_id] = curr_sync + 1
|
|
||||||
|
|
||||||
|
prev_state = await self.current_state_for_user(user_id)
|
||||||
|
|
||||||
|
# If they're busy then they don't stop being busy just by syncing,
|
||||||
|
# so just update the last sync time.
|
||||||
|
if prev_state.state != PresenceState.BUSY:
|
||||||
|
# XXX: We set_state separately here and just update the last_active_ts above
|
||||||
|
# This keeps the logic as similar as possible between the worker and single
|
||||||
|
# process modes. Using set_state will actually cause last_active_ts to be
|
||||||
|
# updated always, which is not what the spec calls for, but synapse has done
|
||||||
|
# this for... forever, I think.
|
||||||
|
await self.set_state(
|
||||||
|
UserID.from_string(user_id),
|
||||||
|
{"presence": presence_state},
|
||||||
|
ignore_status_msg=True,
|
||||||
|
)
|
||||||
|
# Retrieve the new state for the logic below. This should come from the
|
||||||
|
# in-memory cache.
|
||||||
prev_state = await self.current_state_for_user(user_id)
|
prev_state = await self.current_state_for_user(user_id)
|
||||||
|
|
||||||
# If they're busy then they don't stop being busy just by syncing,
|
# To keep the single process behaviour consistent with worker mode, run the
|
||||||
# so just update the last sync time.
|
# same logic as `update_external_syncs_row`, even though it looks weird.
|
||||||
if prev_state.state != PresenceState.BUSY:
|
if prev_state.state == PresenceState.OFFLINE:
|
||||||
# XXX: We set_state separately here and just update the last_active_ts above
|
await self._update_states(
|
||||||
# This keeps the logic as similar as possible between the worker and single
|
[
|
||||||
# process modes. Using set_state will actually cause last_active_ts to be
|
prev_state.copy_and_replace(
|
||||||
# updated always, which is not what the spec calls for, but synapse has done
|
state=PresenceState.ONLINE,
|
||||||
# this for... forever, I think.
|
last_active_ts=self.clock.time_msec(),
|
||||||
await self.set_state(
|
last_user_sync_ts=self.clock.time_msec(),
|
||||||
UserID.from_string(user_id), {"presence": presence_state}, True
|
)
|
||||||
)
|
]
|
||||||
# Retrieve the new state for the logic below. This should come from the
|
)
|
||||||
# in-memory cache.
|
# otherwise, set the new presence state & update the last sync time,
|
||||||
prev_state = await self.current_state_for_user(user_id)
|
# but don't update last_active_ts as this isn't an indication that
|
||||||
|
# they've been active (even though it's probably been updated by
|
||||||
# To keep the single process behaviour consistent with worker mode, run the
|
# set_state above)
|
||||||
# same logic as `update_external_syncs_row`, even though it looks weird.
|
else:
|
||||||
if prev_state.state == PresenceState.OFFLINE:
|
await self._update_states(
|
||||||
await self._update_states(
|
[prev_state.copy_and_replace(last_user_sync_ts=self.clock.time_msec())]
|
||||||
[
|
)
|
||||||
prev_state.copy_and_replace(
|
|
||||||
state=PresenceState.ONLINE,
|
|
||||||
last_active_ts=self.clock.time_msec(),
|
|
||||||
last_user_sync_ts=self.clock.time_msec(),
|
|
||||||
)
|
|
||||||
]
|
|
||||||
)
|
|
||||||
# otherwise, set the new presence state & update the last sync time,
|
|
||||||
# but don't update last_active_ts as this isn't an indication that
|
|
||||||
# they've been active (even though it's probably been updated by
|
|
||||||
# set_state above)
|
|
||||||
else:
|
|
||||||
await self._update_states(
|
|
||||||
[
|
|
||||||
prev_state.copy_and_replace(
|
|
||||||
last_user_sync_ts=self.clock.time_msec()
|
|
||||||
)
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
async def _end() -> None:
|
async def _end() -> None:
|
||||||
try:
|
try:
|
||||||
|
@ -1061,8 +1052,7 @@ class PresenceHandler(BasePresenceHandler):
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
finally:
|
finally:
|
||||||
if affect_presence:
|
run_in_background(_end)
|
||||||
run_in_background(_end)
|
|
||||||
|
|
||||||
return _user_syncing()
|
return _user_syncing()
|
||||||
|
|
||||||
|
@ -1229,20 +1219,11 @@ class PresenceHandler(BasePresenceHandler):
|
||||||
status_msg = state.get("status_msg", None)
|
status_msg = state.get("status_msg", None)
|
||||||
presence = state["presence"]
|
presence = state["presence"]
|
||||||
|
|
||||||
valid_presence = (
|
if presence not in self.VALID_PRESENCE:
|
||||||
PresenceState.ONLINE,
|
|
||||||
PresenceState.UNAVAILABLE,
|
|
||||||
PresenceState.OFFLINE,
|
|
||||||
PresenceState.BUSY,
|
|
||||||
)
|
|
||||||
|
|
||||||
if presence not in valid_presence or (
|
|
||||||
presence == PresenceState.BUSY and not self._busy_presence_enabled
|
|
||||||
):
|
|
||||||
raise SynapseError(400, "Invalid presence state")
|
raise SynapseError(400, "Invalid presence state")
|
||||||
|
|
||||||
# If presence is disabled, no-op
|
# If presence is disabled, no-op
|
||||||
if not self.hs.config.server.use_presence:
|
if not self._presence_enabled:
|
||||||
return
|
return
|
||||||
|
|
||||||
user_id = target_user.to_string()
|
user_id = target_user.to_string()
|
||||||
|
|
Loading…
Reference in New Issue