Merge pull request #4671 from matrix-org/erikj/state_cache_invalidation

Batch cache invalidation over replication
This commit is contained in:
Erik Johnston 2019-02-19 13:14:30 +00:00 committed by GitHub
commit c003450057
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 96 additions and 33 deletions

1
changelog.d/4671.misc Normal file
View File

@ -0,0 +1 @@
Improve replication performance by reducing cache invalidation traffic.

View File

@ -137,7 +137,6 @@ for each stream so that on reconneciton it can start streaming from the correct
place. Note: not all RDATA have valid tokens due to batching. See place. Note: not all RDATA have valid tokens due to batching. See
``RdataCommand`` for more details. ``RdataCommand`` for more details.
Example Example
~~~~~~~ ~~~~~~~
@ -221,3 +220,28 @@ SYNC (S, C)
See ``synapse/replication/tcp/commands.py`` for a detailed description and the See ``synapse/replication/tcp/commands.py`` for a detailed description and the
format of each command. format of each command.
Cache Invalidation Stream
~~~~~~~~~~~~~~~~~~~~~~~~~
The cache invalidation stream is used to inform workers when they need to
invalidate any of their caches in the data store. This is done by streaming all
cache invalidations done on master down to the workers, assuming that any caches
on the workers also exist on the master.
Each individual cache invalidation results in a row being sent down replication,
which includes the cache name (the name of the function) and they key to
invalidate. For example::
> RDATA caches 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]
However, there are times when a number of caches need to be invalidated at the
same time with the same key. To reduce traffic we batch those invalidations into
a single poke by defining a special cache name that workers understand to mean
to expand to invalidate the correct caches.
Currently the special cache names are declared in ``synapse/storage/_base.py``
and are:
1. ``cs_cache_fake`` ─ invalidates caches that depend on the current state

View File

@ -17,7 +17,7 @@ import logging
import six import six
from synapse.storage._base import SQLBaseStore from synapse.storage._base import _CURRENT_STATE_CACHE_NAME, SQLBaseStore
from synapse.storage.engines import PostgresEngine from synapse.storage.engines import PostgresEngine
from ._slaved_id_tracker import SlavedIdTracker from ._slaved_id_tracker import SlavedIdTracker
@ -54,6 +54,11 @@ class BaseSlavedStore(SQLBaseStore):
if stream_name == "caches": if stream_name == "caches":
self._cache_id_gen.advance(token) self._cache_id_gen.advance(token)
for row in rows: for row in rows:
if row.cache_func == _CURRENT_STATE_CACHE_NAME:
room_id = row.keys[0]
members_changed = set(row.keys[1:])
self._invalidate_state_caches(room_id, members_changed)
else:
try: try:
getattr(self, row.cache_func).invalidate(tuple(row.keys)) getattr(self, row.cache_func).invalidate(tuple(row.keys))
except AttributeError: except AttributeError:

View File

@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import itertools
import logging import logging
import sys import sys
import threading import threading
@ -28,6 +29,7 @@ from twisted.internet import defer
from synapse.api.errors import StoreError from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import get_domain_from_id
from synapse.util.caches.descriptors import Cache from synapse.util.caches.descriptors import Cache
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.stringutils import exception_to_unicode from synapse.util.stringutils import exception_to_unicode
@ -64,6 +66,10 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = {
"event_search": "event_search_event_id_idx", "event_search": "event_search_event_id_idx",
} }
# This is a special cache name we use to batch multiple invalidations of caches
# based on the current state when notifying workers over replication.
_CURRENT_STATE_CACHE_NAME = "cs_cache_fake"
class LoggingTransaction(object): class LoggingTransaction(object):
"""An object that almost-transparently proxies for the 'txn' object """An object that almost-transparently proxies for the 'txn' object
@ -1184,6 +1190,56 @@ class SQLBaseStore(object):
be invalidated. be invalidated.
""" """
txn.call_after(cache_func.invalidate, keys) txn.call_after(cache_func.invalidate, keys)
self._send_invalidation_to_replication(txn, cache_func.__name__, keys)
def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed):
"""Special case invalidation of caches based on current state.
We special case this so that we can batch the cache invalidations into a
single replication poke.
Args:
txn
room_id (str): Room where state changed
members_changed (iterable[str]): The user_ids of members that have changed
"""
txn.call_after(self._invalidate_state_caches, room_id, members_changed)
keys = itertools.chain([room_id], members_changed)
self._send_invalidation_to_replication(
txn, _CURRENT_STATE_CACHE_NAME, keys,
)
def _invalidate_state_caches(self, room_id, members_changed):
"""Invalidates caches that are based on the current state, but does
not stream invalidations down replication.
Args:
room_id (str): Room where state changed
members_changed (iterable[str]): The user_ids of members that have
changed
"""
for member in members_changed:
self.get_rooms_for_user_with_stream_ordering.invalidate((member,))
for host in set(get_domain_from_id(u) for u in members_changed):
self.is_host_joined.invalidate((room_id, host))
self.was_host_joined.invalidate((room_id, host))
self.get_users_in_room.invalidate((room_id,))
self.get_room_summary.invalidate((room_id,))
self.get_current_state_ids.invalidate((room_id,))
def _send_invalidation_to_replication(self, txn, cache_name, keys):
"""Notifies replication that given cache has been invalidated.
Note that this does *not* invalidate the cache locally.
Args:
txn
cache_name (str)
keys (iterable[str])
"""
if isinstance(self.database_engine, PostgresEngine): if isinstance(self.database_engine, PostgresEngine):
# get_next() returns a context manager which is designed to wrap # get_next() returns a context manager which is designed to wrap
@ -1201,7 +1257,7 @@ class SQLBaseStore(object):
table="cache_invalidation_stream", table="cache_invalidation_stream",
values={ values={
"stream_id": stream_id, "stream_id": stream_id,
"cache_func": cache_func.__name__, "cache_func": cache_name,
"keys": list(keys), "keys": list(keys),
"invalidation_ts": self.clock.time_msec(), "invalidation_ts": self.clock.time_msec(),
} }

View File

@ -979,30 +979,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
if ev_type == EventTypes.Member if ev_type == EventTypes.Member
) )
for member in members_changed: self._invalidate_state_caches_and_stream(txn, room_id, members_changed)
self._invalidate_cache_and_stream(
txn, self.get_rooms_for_user_with_stream_ordering, (member,)
)
for host in set(get_domain_from_id(u) for u in members_changed):
self._invalidate_cache_and_stream(
txn, self.is_host_joined, (room_id, host)
)
self._invalidate_cache_and_stream(
txn, self.was_host_joined, (room_id, host)
)
self._invalidate_cache_and_stream(
txn, self.get_users_in_room, (room_id,)
)
self._invalidate_cache_and_stream(
txn, self.get_room_summary, (room_id,)
)
self._invalidate_cache_and_stream(
txn, self.get_current_state_ids, (room_id,)
)
def _update_forward_extremities_txn(self, txn, new_forward_extremities, def _update_forward_extremities_txn(self, txn, new_forward_extremities,
max_stream_order): max_stream_order):