Move roommember's bg updates to a dedicated store
This commit is contained in:
parent
e106a0e4db
commit
0496eafbf4
|
@ -27,6 +27,7 @@ from synapse.api.constants import EventTypes, Membership
|
||||||
from synapse.metrics import LaterGauge
|
from synapse.metrics import LaterGauge
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.storage._base import LoggingTransaction
|
from synapse.storage._base import LoggingTransaction
|
||||||
|
from synapse.storage.background_updates import BackgroundUpdateStore
|
||||||
from synapse.storage.engines import Sqlite3Engine
|
from synapse.storage.engines import Sqlite3Engine
|
||||||
from synapse.storage.events_worker import EventsWorkerStore
|
from synapse.storage.events_worker import EventsWorkerStore
|
||||||
from synapse.types import get_domain_from_id
|
from synapse.types import get_domain_from_id
|
||||||
|
@ -820,9 +821,9 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||||
return set(room_ids)
|
return set(room_ids)
|
||||||
|
|
||||||
|
|
||||||
class RoomMemberStore(RoomMemberWorkerStore):
|
class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore):
|
||||||
def __init__(self, db_conn, hs):
|
def __init__(self, db_conn, hs):
|
||||||
super(RoomMemberStore, self).__init__(db_conn, hs)
|
super(RoomMemberBackgroundUpdateStore, self).__init__(db_conn, hs)
|
||||||
self.register_background_update_handler(
|
self.register_background_update_handler(
|
||||||
_MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile
|
_MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile
|
||||||
)
|
)
|
||||||
|
@ -838,112 +839,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
|
||||||
where_clause="forgotten = 1",
|
where_clause="forgotten = 1",
|
||||||
)
|
)
|
||||||
|
|
||||||
def _store_room_members_txn(self, txn, events, backfilled):
|
|
||||||
"""Store a room member in the database.
|
|
||||||
"""
|
|
||||||
self._simple_insert_many_txn(
|
|
||||||
txn,
|
|
||||||
table="room_memberships",
|
|
||||||
values=[
|
|
||||||
{
|
|
||||||
"event_id": event.event_id,
|
|
||||||
"user_id": event.state_key,
|
|
||||||
"sender": event.user_id,
|
|
||||||
"room_id": event.room_id,
|
|
||||||
"membership": event.membership,
|
|
||||||
"display_name": event.content.get("displayname", None),
|
|
||||||
"avatar_url": event.content.get("avatar_url", None),
|
|
||||||
}
|
|
||||||
for event in events
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
for event in events:
|
|
||||||
txn.call_after(
|
|
||||||
self._membership_stream_cache.entity_has_changed,
|
|
||||||
event.state_key,
|
|
||||||
event.internal_metadata.stream_ordering,
|
|
||||||
)
|
|
||||||
txn.call_after(
|
|
||||||
self.get_invited_rooms_for_user.invalidate, (event.state_key,)
|
|
||||||
)
|
|
||||||
|
|
||||||
# We update the local_invites table only if the event is "current",
|
|
||||||
# i.e., its something that has just happened. If the event is an
|
|
||||||
# outlier it is only current if its an "out of band membership",
|
|
||||||
# like a remote invite or a rejection of a remote invite.
|
|
||||||
is_new_state = not backfilled and (
|
|
||||||
not event.internal_metadata.is_outlier()
|
|
||||||
or event.internal_metadata.is_out_of_band_membership()
|
|
||||||
)
|
|
||||||
is_mine = self.hs.is_mine_id(event.state_key)
|
|
||||||
if is_new_state and is_mine:
|
|
||||||
if event.membership == Membership.INVITE:
|
|
||||||
self._simple_insert_txn(
|
|
||||||
txn,
|
|
||||||
table="local_invites",
|
|
||||||
values={
|
|
||||||
"event_id": event.event_id,
|
|
||||||
"invitee": event.state_key,
|
|
||||||
"inviter": event.sender,
|
|
||||||
"room_id": event.room_id,
|
|
||||||
"stream_id": event.internal_metadata.stream_ordering,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
sql = (
|
|
||||||
"UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE"
|
|
||||||
" room_id = ? AND invitee = ? AND locally_rejected is NULL"
|
|
||||||
" AND replaced_by is NULL"
|
|
||||||
)
|
|
||||||
|
|
||||||
txn.execute(
|
|
||||||
sql,
|
|
||||||
(
|
|
||||||
event.internal_metadata.stream_ordering,
|
|
||||||
event.event_id,
|
|
||||||
event.room_id,
|
|
||||||
event.state_key,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def locally_reject_invite(self, user_id, room_id):
|
|
||||||
sql = (
|
|
||||||
"UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
|
|
||||||
" room_id = ? AND invitee = ? AND locally_rejected is NULL"
|
|
||||||
" AND replaced_by is NULL"
|
|
||||||
)
|
|
||||||
|
|
||||||
def f(txn, stream_ordering):
|
|
||||||
txn.execute(sql, (stream_ordering, True, room_id, user_id))
|
|
||||||
|
|
||||||
with self._stream_id_gen.get_next() as stream_ordering:
|
|
||||||
yield self.runInteraction("locally_reject_invite", f, stream_ordering)
|
|
||||||
|
|
||||||
def forget(self, user_id, room_id):
|
|
||||||
"""Indicate that user_id wishes to discard history for room_id."""
|
|
||||||
|
|
||||||
def f(txn):
|
|
||||||
sql = (
|
|
||||||
"UPDATE"
|
|
||||||
" room_memberships"
|
|
||||||
" SET"
|
|
||||||
" forgotten = 1"
|
|
||||||
" WHERE"
|
|
||||||
" user_id = ?"
|
|
||||||
" AND"
|
|
||||||
" room_id = ?"
|
|
||||||
)
|
|
||||||
txn.execute(sql, (user_id, room_id))
|
|
||||||
|
|
||||||
self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
|
|
||||||
self._invalidate_cache_and_stream(
|
|
||||||
txn, self.get_forgotten_rooms_for_user, (user_id,)
|
|
||||||
)
|
|
||||||
|
|
||||||
return self.runInteraction("forget_membership", f)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _background_add_membership_profile(self, progress, batch_size):
|
def _background_add_membership_profile(self, progress, batch_size):
|
||||||
target_min_stream_id = progress.get(
|
target_min_stream_id = progress.get(
|
||||||
|
@ -1078,6 +973,117 @@ class RoomMemberStore(RoomMemberWorkerStore):
|
||||||
return row_count
|
return row_count
|
||||||
|
|
||||||
|
|
||||||
|
class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore):
|
||||||
|
def __init__(self, db_conn, hs):
|
||||||
|
super(RoomMemberStore, self).__init__(db_conn, hs)
|
||||||
|
|
||||||
|
def _store_room_members_txn(self, txn, events, backfilled):
|
||||||
|
"""Store a room member in the database.
|
||||||
|
"""
|
||||||
|
self._simple_insert_many_txn(
|
||||||
|
txn,
|
||||||
|
table="room_memberships",
|
||||||
|
values=[
|
||||||
|
{
|
||||||
|
"event_id": event.event_id,
|
||||||
|
"user_id": event.state_key,
|
||||||
|
"sender": event.user_id,
|
||||||
|
"room_id": event.room_id,
|
||||||
|
"membership": event.membership,
|
||||||
|
"display_name": event.content.get("displayname", None),
|
||||||
|
"avatar_url": event.content.get("avatar_url", None),
|
||||||
|
}
|
||||||
|
for event in events
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
for event in events:
|
||||||
|
txn.call_after(
|
||||||
|
self._membership_stream_cache.entity_has_changed,
|
||||||
|
event.state_key,
|
||||||
|
event.internal_metadata.stream_ordering,
|
||||||
|
)
|
||||||
|
txn.call_after(
|
||||||
|
self.get_invited_rooms_for_user.invalidate, (event.state_key,)
|
||||||
|
)
|
||||||
|
|
||||||
|
# We update the local_invites table only if the event is "current",
|
||||||
|
# i.e., its something that has just happened. If the event is an
|
||||||
|
# outlier it is only current if its an "out of band membership",
|
||||||
|
# like a remote invite or a rejection of a remote invite.
|
||||||
|
is_new_state = not backfilled and (
|
||||||
|
not event.internal_metadata.is_outlier()
|
||||||
|
or event.internal_metadata.is_out_of_band_membership()
|
||||||
|
)
|
||||||
|
is_mine = self.hs.is_mine_id(event.state_key)
|
||||||
|
if is_new_state and is_mine:
|
||||||
|
if event.membership == Membership.INVITE:
|
||||||
|
self._simple_insert_txn(
|
||||||
|
txn,
|
||||||
|
table="local_invites",
|
||||||
|
values={
|
||||||
|
"event_id": event.event_id,
|
||||||
|
"invitee": event.state_key,
|
||||||
|
"inviter": event.sender,
|
||||||
|
"room_id": event.room_id,
|
||||||
|
"stream_id": event.internal_metadata.stream_ordering,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
sql = (
|
||||||
|
"UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE"
|
||||||
|
" room_id = ? AND invitee = ? AND locally_rejected is NULL"
|
||||||
|
" AND replaced_by is NULL"
|
||||||
|
)
|
||||||
|
|
||||||
|
txn.execute(
|
||||||
|
sql,
|
||||||
|
(
|
||||||
|
event.internal_metadata.stream_ordering,
|
||||||
|
event.event_id,
|
||||||
|
event.room_id,
|
||||||
|
event.state_key,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def locally_reject_invite(self, user_id, room_id):
|
||||||
|
sql = (
|
||||||
|
"UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
|
||||||
|
" room_id = ? AND invitee = ? AND locally_rejected is NULL"
|
||||||
|
" AND replaced_by is NULL"
|
||||||
|
)
|
||||||
|
|
||||||
|
def f(txn, stream_ordering):
|
||||||
|
txn.execute(sql, (stream_ordering, True, room_id, user_id))
|
||||||
|
|
||||||
|
with self._stream_id_gen.get_next() as stream_ordering:
|
||||||
|
yield self.runInteraction("locally_reject_invite", f, stream_ordering)
|
||||||
|
|
||||||
|
def forget(self, user_id, room_id):
|
||||||
|
"""Indicate that user_id wishes to discard history for room_id."""
|
||||||
|
|
||||||
|
def f(txn):
|
||||||
|
sql = (
|
||||||
|
"UPDATE"
|
||||||
|
" room_memberships"
|
||||||
|
" SET"
|
||||||
|
" forgotten = 1"
|
||||||
|
" WHERE"
|
||||||
|
" user_id = ?"
|
||||||
|
" AND"
|
||||||
|
" room_id = ?"
|
||||||
|
)
|
||||||
|
txn.execute(sql, (user_id, room_id))
|
||||||
|
|
||||||
|
self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
|
||||||
|
self._invalidate_cache_and_stream(
|
||||||
|
txn, self.get_forgotten_rooms_for_user, (user_id,)
|
||||||
|
)
|
||||||
|
|
||||||
|
return self.runInteraction("forget_membership", f)
|
||||||
|
|
||||||
|
|
||||||
class _JoinedHostsCache(object):
|
class _JoinedHostsCache(object):
|
||||||
"""Cache for joined hosts in a room that is optimised to handle updates
|
"""Cache for joined hosts in a room that is optimised to handle updates
|
||||||
via state deltas.
|
via state deltas.
|
||||||
|
|
Loading…
Reference in New Issue