Move event stream handling out of slave store. (#7491)
This allows us to have the logic on both master and workers, which is necessary to move event persistence off the master process. We also move the instantiation of the ID generators from DataStore and the slave stores into the base worker stores. This lets us select which process writes events independently of the master/worker split.
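The change leans on Python's cooperative multiple inheritance: each store's process_replication_rows handles the streams it cares about and then delegates to super(), so logic that lives in a worker store runs identically on master and on workers. A minimal sketch of the pattern (the class and stream names here are illustrative, not Synapse's real ones):

class BaseStore:
    def process_replication_rows(self, stream_name, instance_name, token, rows):
        pass  # end of the cooperative chain


class EventsMixin(BaseStore):
    def process_replication_rows(self, stream_name, instance_name, token, rows):
        if stream_name == "events":
            print("events stream advanced to", token)
        # Always delegate, so every other mixin also sees the update.
        super().process_replication_rows(stream_name, instance_name, token, rows)


class CachesMixin(BaseStore):
    def process_replication_rows(self, stream_name, instance_name, token, rows):
        if stream_name == "caches":
            print("caches stream advanced to", token)
        super().process_replication_rows(stream_name, instance_name, token, rows)


class Store(EventsMixin, CachesMixin):
    # Master and worker processes compose the same mixins, so moving logic
    # into a worker store automatically runs it everywhere.
    pass


Store().process_replication_rows("events", "master", 1234, [])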
parent 5355421295
commit 1f36ff69e8
@@ -0,0 +1 @@
+Move event stream handling out of slave store.
@@ -15,11 +15,6 @@
 # limitations under the License.
 import logging

-from synapse.api.constants import EventTypes
-from synapse.replication.tcp.streams.events import (
-    EventsStreamCurrentStateRow,
-    EventsStreamEventRow,
-)
 from synapse.storage.data_stores.main.event_federation import EventFederationWorkerStore
 from synapse.storage.data_stores.main.event_push_actions import (
     EventPushActionsWorkerStore,
@@ -35,7 +30,6 @@ from synapse.storage.database import Database
 from synapse.util.caches.stream_change_cache import StreamChangeCache

 from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker

 logger = logging.getLogger(__name__)

@@ -62,11 +56,6 @@ class SlavedEventStore(
     BaseSlavedStore,
 ):
     def __init__(self, database: Database, db_conn, hs):
-        self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
-        self._backfill_id_gen = SlavedIdTracker(
-            db_conn, "events", "stream_ordering", step=-1
-        )
-
         super(SlavedEventStore, self).__init__(database, db_conn, hs)

         events_max = self._stream_id_gen.get_current_token()
@@ -92,81 +81,3 @@ class SlavedEventStore(

     def get_room_min_stream_ordering(self):
         return self._backfill_id_gen.get_current_token()
-
-    def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == "events":
-            self._stream_id_gen.advance(token)
-            for row in rows:
-                self._process_event_stream_row(token, row)
-        elif stream_name == "backfill":
-            self._backfill_id_gen.advance(-token)
-            for row in rows:
-                self.invalidate_caches_for_event(
-                    -token,
-                    row.event_id,
-                    row.room_id,
-                    row.type,
-                    row.state_key,
-                    row.redacts,
-                    row.relates_to,
-                    backfilled=True,
-                )
-        return super().process_replication_rows(stream_name, instance_name, token, rows)
-
-    def _process_event_stream_row(self, token, row):
-        data = row.data
-
-        if row.type == EventsStreamEventRow.TypeId:
-            self.invalidate_caches_for_event(
-                token,
-                data.event_id,
-                data.room_id,
-                data.type,
-                data.state_key,
-                data.redacts,
-                data.relates_to,
-                backfilled=False,
-            )
-        elif row.type == EventsStreamCurrentStateRow.TypeId:
-            self._curr_state_delta_stream_cache.entity_has_changed(
-                row.data.room_id, token
-            )
-
-            if data.type == EventTypes.Member:
-                self.get_rooms_for_user_with_stream_ordering.invalidate(
-                    (data.state_key,)
-                )
-        else:
-            raise Exception("Unknown events stream row type %s" % (row.type,))
-
-    def invalidate_caches_for_event(
-        self,
-        stream_ordering,
-        event_id,
-        room_id,
-        etype,
-        state_key,
-        redacts,
-        relates_to,
-        backfilled,
-    ):
-        self._invalidate_get_event_cache(event_id)
-
-        self.get_latest_event_ids_in_room.invalidate((room_id,))
-
-        self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
-
-        if not backfilled:
-            self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
-
-        if redacts:
-            self._invalidate_get_event_cache(redacts)
-
-        if etype == EventTypes.Member:
-            self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
-            self.get_invited_rooms_for_local_user.invalidate((state_key,))
-
-        if relates_to:
-            self.get_relations_for_event.invalidate_many((relates_to,))
-            self.get_aggregation_groups_for_event.invalidate_many((relates_to,))
-            self.get_applicable_edit.invalidate((relates_to,))
@@ -15,19 +15,11 @@
 # limitations under the License.

 from synapse.storage.data_stores.main.push_rule import PushRulesWorkerStore
-from synapse.storage.database import Database

-from ._slaved_id_tracker import SlavedIdTracker
 from .events import SlavedEventStore


 class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
-    def __init__(self, database: Database, db_conn, hs):
-        self._push_rules_stream_id_gen = SlavedIdTracker(
-            db_conn, "push_rules_stream", "stream_id"
-        )
-        super(SlavedPushRuleStore, self).__init__(database, db_conn, hs)
-
     def get_push_rules_stream_token(self):
         return (
             self._push_rules_stream_id_gen.get_current_token(),
@@ -24,7 +24,6 @@ from synapse.config.homeserver import HomeServerConfig
 from synapse.storage.database import Database
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import (
-    ChainedIdGenerator,
     IdGenerator,
     MultiWriterIdGenerator,
     StreamIdGenerator,
@@ -125,19 +124,6 @@ class DataStore(
         self._clock = hs.get_clock()
         self.database_engine = database.engine

-        self._stream_id_gen = StreamIdGenerator(
-            db_conn,
-            "events",
-            "stream_ordering",
-            extra_tables=[("local_invites", "stream_id")],
-        )
-        self._backfill_id_gen = StreamIdGenerator(
-            db_conn,
-            "events",
-            "stream_ordering",
-            step=-1,
-            extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
-        )
         self._presence_id_gen = StreamIdGenerator(
             db_conn, "presence_stream", "stream_id"
         )
@@ -164,9 +150,6 @@ class DataStore(
         self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
         self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
         self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
-        self._push_rules_stream_id_gen = ChainedIdGenerator(
-            self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
-        )
        self._pushers_id_gen = StreamIdGenerator(
             db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
         )
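For orientation: StreamIdGenerator allocates fresh stream IDs on the process that writes, while SlavedIdTracker merely follows positions announced over replication. A simplified sketch of the tracker side, assuming the essentials only (the real class loads its starting position from the database via a helper rather than taking it as a constructor argument):

class IdTrackerSketch:
    """Follows a stream position from replication instead of allocating IDs."""

    def __init__(self, current_id: int, step: int = 1):
        self._current = current_id  # last position seen
        self._step = step  # step=-1 for backwards streams such as backfill

    def advance(self, token: int):
        # Keep the furthest position seen; "furthest" depends on direction.
        self._current = (max if self._step > 0 else min)(self._current, token)

    def get_current_token(self) -> int:
        return self._current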
@@ -16,8 +16,13 @@

 import itertools
 import logging
-from typing import Any, Iterable, Optional
+from typing import Any, Iterable, Optional, Tuple

+from synapse.api.constants import EventTypes
+from synapse.replication.tcp.streams.events import (
+    EventsStreamCurrentStateRow,
+    EventsStreamEventRow,
+)
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import Database
 from synapse.storage.engines import PostgresEngine
@@ -66,7 +71,22 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         )

     def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == "caches":
+        if stream_name == "events":
+            for row in rows:
+                self._process_event_stream_row(token, row)
+        elif stream_name == "backfill":
+            for row in rows:
+                self._invalidate_caches_for_event(
+                    -token,
+                    row.event_id,
+                    row.room_id,
+                    row.type,
+                    row.state_key,
+                    row.redacts,
+                    row.relates_to,
+                    backfilled=True,
+                )
+        elif stream_name == "caches":
             if self._cache_id_gen:
                 self._cache_id_gen.advance(instance_name, token)

@@ -85,6 +105,84 @@ class CacheInvalidationWorkerStore(SQLBaseStore):

         super().process_replication_rows(stream_name, instance_name, token, rows)

+    def _process_event_stream_row(self, token, row):
+        data = row.data
+
+        if row.type == EventsStreamEventRow.TypeId:
+            self._invalidate_caches_for_event(
+                token,
+                data.event_id,
+                data.room_id,
+                data.type,
+                data.state_key,
+                data.redacts,
+                data.relates_to,
+                backfilled=False,
+            )
+        elif row.type == EventsStreamCurrentStateRow.TypeId:
+            self._curr_state_delta_stream_cache.entity_has_changed(
+                row.data.room_id, token
+            )
+
+            if data.type == EventTypes.Member:
+                self.get_rooms_for_user_with_stream_ordering.invalidate(
+                    (data.state_key,)
+                )
+        else:
+            raise Exception("Unknown events stream row type %s" % (row.type,))
+
+    def _invalidate_caches_for_event(
+        self,
+        stream_ordering,
+        event_id,
+        room_id,
+        etype,
+        state_key,
+        redacts,
+        relates_to,
+        backfilled,
+    ):
+        self._invalidate_get_event_cache(event_id)
+
+        self.get_latest_event_ids_in_room.invalidate((room_id,))
+
+        self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
+
+        if not backfilled:
+            self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
+
+        if redacts:
+            self._invalidate_get_event_cache(redacts)
+
+        if etype == EventTypes.Member:
+            self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
+            self.get_invited_rooms_for_local_user.invalidate((state_key,))
+
+        if relates_to:
+            self.get_relations_for_event.invalidate_many((relates_to,))
+            self.get_aggregation_groups_for_event.invalidate_many((relates_to,))
+            self.get_applicable_edit.invalidate((relates_to,))
+
+    async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):
+        """Invalidates the cache and adds it to the cache stream so slaves
+        will know to invalidate their caches.
+
+        This should only be used to invalidate caches where slaves won't
+        otherwise know from other replication streams that the cache should
+        be invalidated.
+        """
+        cache_func = getattr(self, cache_name, None)
+        if not cache_func:
+            return
+
+        cache_func.invalidate(keys)
+        await self.db.runInteraction(
+            "invalidate_cache_and_stream",
+            self._send_invalidation_to_replication,
+            cache_func.__name__,
+            keys,
+        )
+
     def _invalidate_cache_and_stream(self, txn, cache_func, keys):
         """Invalidates the cache and adds it to the cache stream so slaves
         will know to invalidate their caches.
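The new async invalidate_cache_and_stream looks a cache up by name, invalidates it locally, and writes a row to the caches stream so every other process invalidates too. A hypothetical call site (the wrapper function is invented for illustration; the cache name is one of the cached methods seen in the diff above):

# Hypothetical helper showing how a caller might use the new method after a
# write that no other replication stream would cover:
async def forget_invites(store, user_id):
    await store.invalidate_cache_and_stream(
        "get_invited_rooms_for_local_user", (user_id,)
    )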
@@ -37,8 +37,10 @@ from synapse.events import make_event_from_dict
 from synapse.events.utils import prune_event
 from synapse.logging.context import PreserveLoggingContext, current_context
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
 from synapse.storage.database import Database
+from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.types import get_domain_from_id
 from synapse.util.caches.descriptors import Cache, cached, cachedInlineCallbacks
 from synapse.util.iterutils import batch_iter
@@ -74,6 +76,31 @@ class EventsWorkerStore(SQLBaseStore):
     def __init__(self, database: Database, db_conn, hs):
         super(EventsWorkerStore, self).__init__(database, db_conn, hs)

+        if hs.config.worker_app is None:
+            # We are the process in charge of generating stream ids for events,
+            # so instantiate ID generators based on the database
+            self._stream_id_gen = StreamIdGenerator(
+                db_conn,
+                "events",
+                "stream_ordering",
+                extra_tables=[("local_invites", "stream_id")],
+            )
+            self._backfill_id_gen = StreamIdGenerator(
+                db_conn,
+                "events",
+                "stream_ordering",
+                step=-1,
+                extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
+            )
+        else:
+            # Another process is in charge of persisting events and generating
+            # stream IDs: rely on the replication streams to let us know which
+            # IDs we can process.
+            self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
+            self._backfill_id_gen = SlavedIdTracker(
+                db_conn, "events", "stream_ordering", step=-1
+            )
+
         self._get_event_cache = Cache(
             "*getEvent*",
             keylen=3,
@@ -85,6 +112,14 @@ class EventsWorkerStore(SQLBaseStore):
         self._event_fetch_list = []
         self._event_fetch_ongoing = 0

+    def process_replication_rows(self, stream_name, instance_name, token, rows):
+        if stream_name == "events":
+            self._stream_id_gen.advance(token)
+        elif stream_name == "backfill":
+            self._backfill_id_gen.advance(-token)
+
+        super().process_replication_rows(stream_name, instance_name, token, rows)
+
     def get_received_ts(self, event_id):
         """Get received_ts (when it was persisted) for the event.
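Note the sign handling above: backfilled events take stream orderings -1, -2, -3, ... in the database (step=-1), while the backfill replication stream transports the position as a positive token, hence advance(-token). A worked example using the IdTrackerSketch from earlier:

# Backfill positions move downwards; the wire token arrives positive.
tracker = IdTrackerSketch(current_id=-2, step=-1)
token = 3                 # as received over replication
tracker.advance(-token)   # mirrors self._backfill_id_gen.advance(-token)
assert tracker.get_current_token() == -3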
@@ -16,19 +16,23 @@

 import abc
 import logging
+from typing import Union

 from canonicaljson import json

 from twisted.internet import defer

 from synapse.push.baserules import list_with_base_rules
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.data_stores.main.appservice import ApplicationServiceWorkerStore
+from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
 from synapse.storage.data_stores.main.pusher import PusherWorkerStore
 from synapse.storage.data_stores.main.receipts import ReceiptsWorkerStore
 from synapse.storage.data_stores.main.roommember import RoomMemberWorkerStore
 from synapse.storage.database import Database
 from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
+from synapse.storage.util.id_generators import ChainedIdGenerator
 from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -64,6 +68,7 @@ class PushRulesWorkerStore(
     ReceiptsWorkerStore,
     PusherWorkerStore,
     RoomMemberWorkerStore,
+    EventsWorkerStore,
     SQLBaseStore,
 ):
     """This is an abstract base class where subclasses must implement
@@ -77,6 +82,15 @@ class PushRulesWorkerStore(
     def __init__(self, database: Database, db_conn, hs):
         super(PushRulesWorkerStore, self).__init__(database, db_conn, hs)

+        if hs.config.worker.worker_app is None:
+            self._push_rules_stream_id_gen = ChainedIdGenerator(
+                self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
+            )  # type: Union[ChainedIdGenerator, SlavedIdTracker]
+        else:
+            self._push_rules_stream_id_gen = SlavedIdTracker(
+                db_conn, "push_rules_stream", "stream_id"
+            )
+
         push_rules_prefill, push_rules_id = self.db.get_cache_dict(
             db_conn,
             "push_rules_stream",
@@ -166,6 +166,7 @@ class ChainedIdGenerator(object):

     def __init__(self, chained_generator, db_conn, table, column):
         self.chained_generator = chained_generator
+        self._table = table
         self._lock = threading.Lock()
         self._current_max = _load_current_id(db_conn, table, column)
         self._unfinished_ids = deque()  # type: Deque[Tuple[int, int]]
@@ -204,6 +205,16 @@ class ChainedIdGenerator(object):

         return self._current_max, self.chained_generator.get_current_token()

+    def advance(self, token: int):
+        """Stub implementation for advancing the token when receiving updates
+        over replication; raises an exception as this instance should be the
+        only source of updates.
+        """
+
+        raise Exception(
+            "Attempted to advance token on source for table %r" % (self._table,)
+        )
+

 class MultiWriterIdGenerator:
     """An ID generator that tracks a stream that can have multiple writers.
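The advance stub turns misrouted replication into a loud failure: the process that owns push-rule ID generation must never be asked to follow its own stream, or positions would silently diverge. A minimal standalone sketch of this fail-loud pattern (the class name is invented for illustration):

class WriterSideGenerator:
    """Allocates IDs itself, so it refuses position updates from replication."""

    def __init__(self, table: str):
        self._table = table

    def advance(self, token: int):
        raise Exception(
            "Attempted to advance token on source for table %r" % (self._table,)
        )


gen = WriterSideGenerator("push_rules_stream")
try:
    gen.advance(1)
except Exception as exc:
    print(exc)  # a misconfigured deployment surfaces immediately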