Track if current_state_events.membership is up to date
This commit is contained in:
parent
c618a5d348
commit
059d8c1a4e
|
@ -24,6 +24,8 @@ from canonicaljson import json
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes, Membership
|
from synapse.api.constants import EventTypes, Membership
|
||||||
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
|
from synapse.storage._base import LoggingTransaction
|
||||||
from synapse.storage.events_worker import EventsWorkerStore
|
from synapse.storage.events_worker import EventsWorkerStore
|
||||||
from synapse.types import get_domain_from_id
|
from synapse.types import get_domain_from_id
|
||||||
from synapse.util.async_helpers import Linearizer
|
from synapse.util.async_helpers import Linearizer
|
||||||
|
@ -57,6 +59,49 @@ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
|
||||||
|
|
||||||
|
|
||||||
class RoomMemberWorkerStore(EventsWorkerStore):
|
class RoomMemberWorkerStore(EventsWorkerStore):
|
||||||
|
def __init__(self, db_conn, hs):
|
||||||
|
super(RoomMemberWorkerStore, self).__init__(db_conn, hs)
|
||||||
|
|
||||||
|
# Is the current_state_events.membership up to date? Or is the
|
||||||
|
# background update still running?
|
||||||
|
self._current_state_events_membership_up_to_date = False
|
||||||
|
|
||||||
|
txn = LoggingTransaction(
|
||||||
|
db_conn.cursor(),
|
||||||
|
name="_check_safe_current_state_events_membership_updated",
|
||||||
|
database_engine=self.database_engine,
|
||||||
|
after_callbacks=[],
|
||||||
|
exception_callbacks=[],
|
||||||
|
)
|
||||||
|
self._check_safe_current_state_events_membership_updated_txn(txn)
|
||||||
|
txn.close()
|
||||||
|
|
||||||
|
def _check_safe_current_state_events_membership_updated_txn(self, txn):
|
||||||
|
"""Checks if it is safe to assume the new current_state_events
|
||||||
|
membership column is up to date
|
||||||
|
"""
|
||||||
|
|
||||||
|
pending_update = self._simple_select_one_txn(
|
||||||
|
txn,
|
||||||
|
table="background_updates",
|
||||||
|
keyvalues={"update_name": _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME},
|
||||||
|
retcols=["update_name"],
|
||||||
|
allow_none=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
self._current_state_events_membership_up_to_date = not pending_update
|
||||||
|
|
||||||
|
# If the update is still running, reschedule to run.
|
||||||
|
if pending_update:
|
||||||
|
self._clock.call_later(
|
||||||
|
15.0,
|
||||||
|
run_as_background_process,
|
||||||
|
"_check_safe_current_state_events_membership_updated",
|
||||||
|
self.runInteraction,
|
||||||
|
"_check_safe_current_state_events_membership_updated",
|
||||||
|
self._check_safe_current_state_events_membership_updated_txn,
|
||||||
|
)
|
||||||
|
|
||||||
@cachedInlineCallbacks(max_entries=100000, iterable=True, cache_context=True)
|
@cachedInlineCallbacks(max_entries=100000, iterable=True, cache_context=True)
|
||||||
def get_hosts_in_room(self, room_id, cache_context):
|
def get_hosts_in_room(self, room_id, cache_context):
|
||||||
"""Returns the set of all hosts currently in the room
|
"""Returns the set of all hosts currently in the room
|
||||||
|
|
Loading…
Reference in New Issue