From ddd48b6851675cd192e67a143e9dfde051afecad Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Dec 2019 14:00:09 +0000 Subject: [PATCH 1/7] Move account validity bg updates to registration store --- synapse/storage/_base.py | 66 ------------------- .../storage/data_stores/main/registration.py | 64 ++++++++++++++++++ 2 files changed, 64 insertions(+), 66 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 459901ac60..7ebab31af0 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -252,8 +252,6 @@ class SQLBaseStore(object): # A set of tables that are not safe to use native upserts in. self._unsafe_to_upsert_tables = set(UNIQUE_INDEX_BACKGROUND_UPDATES.keys()) - self._account_validity = self.hs.config.account_validity - # We add the user_directory_search table to the blacklist on SQLite # because the existing search table does not have an index, making it # unsafe to use native upserts. @@ -272,14 +270,6 @@ class SQLBaseStore(object): self.rand = random.SystemRandom() - if self._account_validity.enabled: - self._clock.call_later( - 0.0, - run_as_background_process, - "account_validity_set_expiration_dates", - self._set_expiration_date_when_missing, - ) - @defer.inlineCallbacks def _check_safe_to_upsert(self): """ @@ -312,62 +302,6 @@ class SQLBaseStore(object): self._check_safe_to_upsert, ) - @defer.inlineCallbacks - def _set_expiration_date_when_missing(self): - """ - Retrieves the list of registered users that don't have an expiration date, and - adds an expiration date for each of them. - """ - - def select_users_with_no_expiration_date_txn(txn): - """Retrieves the list of registered users with no expiration date from the - database, filtering out deactivated users. - """ - sql = ( - "SELECT users.name FROM users" - " LEFT JOIN account_validity ON (users.name = account_validity.user_id)" - " WHERE account_validity.user_id is NULL AND users.deactivated = 0;" - ) - txn.execute(sql, []) - - res = self.cursor_to_dict(txn) - if res: - for user in res: - self.set_expiration_date_for_user_txn( - txn, user["name"], use_delta=True - ) - - yield self.runInteraction( - "get_users_with_no_expiration_date", - select_users_with_no_expiration_date_txn, - ) - - def set_expiration_date_for_user_txn(self, txn, user_id, use_delta=False): - """Sets an expiration date to the account with the given user ID. - - Args: - user_id (str): User ID to set an expiration date for. - use_delta (bool): If set to False, the expiration date for the user will be - now + validity period. If set to True, this expiration date will be a - random value in the [now + period - d ; now + period] range, d being a - delta equal to 10% of the validity period. - """ - now_ms = self._clock.time_msec() - expiration_ts = now_ms + self._account_validity.period - - if use_delta: - expiration_ts = self.rand.randrange( - expiration_ts - self._account_validity.startup_job_max_delta, - expiration_ts, - ) - - self._simple_upsert_txn( - txn, - "account_validity", - keyvalues={"user_id": user_id}, - values={"expiration_ts_ms": expiration_ts, "email_sent": False}, - ) - def start_profiling(self): self._previous_loop_ts = monotonic_time() diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py index 98cf6427c3..653c9318cb 100644 --- a/synapse/storage/data_stores/main/registration.py +++ b/synapse/storage/data_stores/main/registration.py @@ -926,6 +926,14 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): self._account_validity = hs.config.account_validity + if self._account_validity.enabled: + self._clock.call_later( + 0.0, + run_as_background_process, + "account_validity_set_expiration_dates", + self._set_expiration_date_when_missing, + ) + # Create a background job for culling expired 3PID validity tokens def start_cull(): # run as a background process to make sure that the database transactions @@ -1502,3 +1510,59 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): self._invalidate_cache_and_stream( txn, self.get_user_deactivated_status, (user_id,) ) + + @defer.inlineCallbacks + def _set_expiration_date_when_missing(self): + """ + Retrieves the list of registered users that don't have an expiration date, and + adds an expiration date for each of them. + """ + + def select_users_with_no_expiration_date_txn(txn): + """Retrieves the list of registered users with no expiration date from the + database, filtering out deactivated users. + """ + sql = ( + "SELECT users.name FROM users" + " LEFT JOIN account_validity ON (users.name = account_validity.user_id)" + " WHERE account_validity.user_id is NULL AND users.deactivated = 0;" + ) + txn.execute(sql, []) + + res = self.cursor_to_dict(txn) + if res: + for user in res: + self.set_expiration_date_for_user_txn( + txn, user["name"], use_delta=True + ) + + yield self.runInteraction( + "get_users_with_no_expiration_date", + select_users_with_no_expiration_date_txn, + ) + + def set_expiration_date_for_user_txn(self, txn, user_id, use_delta=False): + """Sets an expiration date to the account with the given user ID. + + Args: + user_id (str): User ID to set an expiration date for. + use_delta (bool): If set to False, the expiration date for the user will be + now + validity period. If set to True, this expiration date will be a + random value in the [now + period - d ; now + period] range, d being a + delta equal to 10% of the validity period. + """ + now_ms = self._clock.time_msec() + expiration_ts = now_ms + self._account_validity.period + + if use_delta: + expiration_ts = self.rand.randrange( + expiration_ts - self._account_validity.startup_job_max_delta, + expiration_ts, + ) + + self._simple_upsert_txn( + txn, + "account_validity", + keyvalues={"user_id": user_id}, + values={"expiration_ts_ms": expiration_ts, "email_sent": False}, + ) From 6b2867096b8a2cf8afdb5de2bab93bbf31f76065 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Dec 2019 14:08:48 +0000 Subject: [PATCH 2/7] Move event fetch vars to EventWorkStore --- synapse/storage/_base.py | 12 ------------ synapse/storage/data_stores/main/client_ips.py | 2 +- synapse/storage/data_stores/main/devices.py | 2 +- synapse/storage/data_stores/main/events_worker.py | 13 +++++++++++++ 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7ebab31af0..6b8120a608 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -18,7 +18,6 @@ import itertools import logging import random import sys -import threading import time from typing import Iterable, Tuple @@ -36,7 +35,6 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import get_domain_from_id from synapse.util import batch_iter -from synapse.util.caches.descriptors import Cache from synapse.util.stringutils import exception_to_unicode # import a function which will return a monotonic time, in seconds @@ -237,16 +235,6 @@ class SQLBaseStore(object): # to watch it self._txn_perf_counters = PerformanceCounters() - self._get_event_cache = Cache( - "*getEvent*", keylen=3, max_entries=hs.config.event_cache_size - ) - - self._event_fetch_lock = threading.Condition() - self._event_fetch_list = [] - self._event_fetch_ongoing = 0 - - self._pending_ds = [] - self.database_engine = hs.database_engine # A set of tables that are not safe to use native upserts in. diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py index 706c6a1f3f..7931b876ce 100644 --- a/synapse/storage/data_stores/main/client_ips.py +++ b/synapse/storage/data_stores/main/client_ips.py @@ -21,7 +21,7 @@ from twisted.internet import defer from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage import background_updates -from synapse.storage._base import Cache +from synapse.util.caches.descriptors import Cache from synapse.util.caches import CACHE_SIZE_FACTOR logger = logging.getLogger(__name__) diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index 71f62036c0..b50ee026a2 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -31,11 +31,11 @@ from synapse.logging.opentracing import ( ) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import ( - Cache, SQLBaseStore, db_to_json, make_in_list_sql_clause, ) +from synapse.util.caches.descriptors import Cache from synapse.storage.background_updates import BackgroundUpdateStore from synapse.types import get_verify_key_from_cross_signing_key from synapse.util import batch_iter diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 4c4b76bd93..e782e8f481 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -17,6 +17,7 @@ from __future__ import division import itertools import logging +import threading from collections import namedtuple from canonicaljson import json @@ -34,6 +35,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.types import get_domain_from_id from synapse.util import batch_iter +from synapse.util.caches.descriptors import Cache from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -53,6 +55,17 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) class EventsWorkerStore(SQLBaseStore): + def __init__(self, db_conn, hs): + super(EventsWorkerStore, self).__init__(db_conn, hs) + + self._get_event_cache = Cache( + "*getEvent*", keylen=3, max_entries=hs.config.event_cache_size + ) + + self._event_fetch_lock = threading.Condition() + self._event_fetch_list = [] + self._event_fetch_ongoing = 0 + def get_received_ts(self, event_id): """Get received_ts (when it was persisted) for the event. From 1056d6885a7b96be85c5ff19e26eba2ed3f90dd4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Dec 2019 14:28:46 +0000 Subject: [PATCH 3/7] Move cache invalidation to main data store --- synapse/replication/slave/storage/_base.py | 3 +- synapse/storage/_base.py | 104 -------------- synapse/storage/data_stores/main/__init__.py | 2 + synapse/storage/data_stores/main/cache.py | 131 ++++++++++++++++++ .../storage/data_stores/main/client_ips.py | 2 +- synapse/storage/data_stores/main/devices.py | 14 +- 6 files changed, 143 insertions(+), 113 deletions(-) create mode 100644 synapse/storage/data_stores/main/cache.py diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index 456bc005a0..71e5877aca 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -18,7 +18,8 @@ from typing import Dict import six -from synapse.storage._base import _CURRENT_STATE_CACHE_NAME, SQLBaseStore +from synapse.storage._base import SQLBaseStore +from synapse.storage.data_stores.main.cache import _CURRENT_STATE_CACHE_NAME from synapse.storage.engines import PostgresEngine from ._slaved_id_tracker import SlavedIdTracker diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 6b8120a608..c02248cfe9 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -14,7 +14,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import itertools import logging import random import sys @@ -34,7 +33,6 @@ from synapse.logging.context import LoggingContext, make_deferred_yieldable from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import get_domain_from_id -from synapse.util import batch_iter from synapse.util.stringutils import exception_to_unicode # import a function which will return a monotonic time, in seconds @@ -77,10 +75,6 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = { "event_search": "event_search_event_id_idx", } -# This is a special cache name we use to batch multiple invalidations of caches -# based on the current state when notifying workers over replication. -_CURRENT_STATE_CACHE_NAME = "cs_cache_fake" - class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object @@ -1322,47 +1316,6 @@ class SQLBaseStore(object): return cache, min_val - def _invalidate_cache_and_stream(self, txn, cache_func, keys): - """Invalidates the cache and adds it to the cache stream so slaves - will know to invalidate their caches. - - This should only be used to invalidate caches where slaves won't - otherwise know from other replication streams that the cache should - be invalidated. - """ - txn.call_after(cache_func.invalidate, keys) - self._send_invalidation_to_replication(txn, cache_func.__name__, keys) - - def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed): - """Special case invalidation of caches based on current state. - - We special case this so that we can batch the cache invalidations into a - single replication poke. - - Args: - txn - room_id (str): Room where state changed - members_changed (iterable[str]): The user_ids of members that have changed - """ - txn.call_after(self._invalidate_state_caches, room_id, members_changed) - - if members_changed: - # We need to be careful that the size of the `members_changed` list - # isn't so large that it causes problems sending over replication, so we - # send them in chunks. - # Max line length is 16K, and max user ID length is 255, so 50 should - # be safe. - for chunk in batch_iter(members_changed, 50): - keys = itertools.chain([room_id], chunk) - self._send_invalidation_to_replication( - txn, _CURRENT_STATE_CACHE_NAME, keys - ) - else: - # if no members changed, we still need to invalidate the other caches. - self._send_invalidation_to_replication( - txn, _CURRENT_STATE_CACHE_NAME, [room_id] - ) - def _invalidate_state_caches(self, room_id, members_changed): """Invalidates caches that are based on the current state, but does not stream invalidations down replication. @@ -1396,63 +1349,6 @@ class SQLBaseStore(object): # which is fine. pass - def _send_invalidation_to_replication(self, txn, cache_name, keys): - """Notifies replication that given cache has been invalidated. - - Note that this does *not* invalidate the cache locally. - - Args: - txn - cache_name (str) - keys (iterable[str]) - """ - - if isinstance(self.database_engine, PostgresEngine): - # get_next() returns a context manager which is designed to wrap - # the transaction. However, we want to only get an ID when we want - # to use it, here, so we need to call __enter__ manually, and have - # __exit__ called after the transaction finishes. - ctx = self._cache_id_gen.get_next() - stream_id = ctx.__enter__() - txn.call_on_exception(ctx.__exit__, None, None, None) - txn.call_after(ctx.__exit__, None, None, None) - txn.call_after(self.hs.get_notifier().on_new_replication_data) - - self._simple_insert_txn( - txn, - table="cache_invalidation_stream", - values={ - "stream_id": stream_id, - "cache_func": cache_name, - "keys": list(keys), - "invalidation_ts": self.clock.time_msec(), - }, - ) - - def get_all_updated_caches(self, last_id, current_id, limit): - if last_id == current_id: - return defer.succeed([]) - - def get_all_updated_caches_txn(txn): - # We purposefully don't bound by the current token, as we want to - # send across cache invalidations as quickly as possible. Cache - # invalidations are idempotent, so duplicates are fine. - sql = ( - "SELECT stream_id, cache_func, keys, invalidation_ts" - " FROM cache_invalidation_stream" - " WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?" - ) - txn.execute(sql, (last_id, limit)) - return txn.fetchall() - - return self.runInteraction("get_all_updated_caches", get_all_updated_caches_txn) - - def get_cache_stream_token(self): - if self._cache_id_gen: - return self._cache_id_gen.get_current_token() - else: - return 0 - def _simple_select_list_paginate( self, table, diff --git a/synapse/storage/data_stores/main/__init__.py b/synapse/storage/data_stores/main/__init__.py index 10c940df1e..474924c68f 100644 --- a/synapse/storage/data_stores/main/__init__.py +++ b/synapse/storage/data_stores/main/__init__.py @@ -32,6 +32,7 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache from .account_data import AccountDataStore from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore +from .cache import CacheInvalidationStore from .client_ips import ClientIpStore from .deviceinbox import DeviceInboxStore from .devices import DeviceStore @@ -110,6 +111,7 @@ class DataStore( MonthlyActiveUsersStore, StatsStore, RelationsStore, + CacheInvalidationStore, ): def __init__(self, db_conn, hs): self.hs = hs diff --git a/synapse/storage/data_stores/main/cache.py b/synapse/storage/data_stores/main/cache.py new file mode 100644 index 0000000000..6efcc5f3b0 --- /dev/null +++ b/synapse/storage/data_stores/main/cache.py @@ -0,0 +1,131 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import itertools +import logging + +from twisted.internet import defer + +from synapse.storage._base import SQLBaseStore +from synapse.storage.engines import PostgresEngine +from synapse.util import batch_iter + +logger = logging.getLogger(__name__) + + +# This is a special cache name we use to batch multiple invalidations of caches +# based on the current state when notifying workers over replication. +_CURRENT_STATE_CACHE_NAME = "cs_cache_fake" + + +class CacheInvalidationStore(SQLBaseStore): + def _invalidate_cache_and_stream(self, txn, cache_func, keys): + """Invalidates the cache and adds it to the cache stream so slaves + will know to invalidate their caches. + + This should only be used to invalidate caches where slaves won't + otherwise know from other replication streams that the cache should + be invalidated. + """ + txn.call_after(cache_func.invalidate, keys) + self._send_invalidation_to_replication(txn, cache_func.__name__, keys) + + def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed): + """Special case invalidation of caches based on current state. + + We special case this so that we can batch the cache invalidations into a + single replication poke. + + Args: + txn + room_id (str): Room where state changed + members_changed (iterable[str]): The user_ids of members that have changed + """ + txn.call_after(self._invalidate_state_caches, room_id, members_changed) + + if members_changed: + # We need to be careful that the size of the `members_changed` list + # isn't so large that it causes problems sending over replication, so we + # send them in chunks. + # Max line length is 16K, and max user ID length is 255, so 50 should + # be safe. + for chunk in batch_iter(members_changed, 50): + keys = itertools.chain([room_id], chunk) + self._send_invalidation_to_replication( + txn, _CURRENT_STATE_CACHE_NAME, keys + ) + else: + # if no members changed, we still need to invalidate the other caches. + self._send_invalidation_to_replication( + txn, _CURRENT_STATE_CACHE_NAME, [room_id] + ) + + def _send_invalidation_to_replication(self, txn, cache_name, keys): + """Notifies replication that given cache has been invalidated. + + Note that this does *not* invalidate the cache locally. + + Args: + txn + cache_name (str) + keys (iterable[str]) + """ + + if isinstance(self.database_engine, PostgresEngine): + # get_next() returns a context manager which is designed to wrap + # the transaction. However, we want to only get an ID when we want + # to use it, here, so we need to call __enter__ manually, and have + # __exit__ called after the transaction finishes. + ctx = self._cache_id_gen.get_next() + stream_id = ctx.__enter__() + txn.call_on_exception(ctx.__exit__, None, None, None) + txn.call_after(ctx.__exit__, None, None, None) + txn.call_after(self.hs.get_notifier().on_new_replication_data) + + self._simple_insert_txn( + txn, + table="cache_invalidation_stream", + values={ + "stream_id": stream_id, + "cache_func": cache_name, + "keys": list(keys), + "invalidation_ts": self.clock.time_msec(), + }, + ) + + def get_all_updated_caches(self, last_id, current_id, limit): + if last_id == current_id: + return defer.succeed([]) + + def get_all_updated_caches_txn(txn): + # We purposefully don't bound by the current token, as we want to + # send across cache invalidations as quickly as possible. Cache + # invalidations are idempotent, so duplicates are fine. + sql = ( + "SELECT stream_id, cache_func, keys, invalidation_ts" + " FROM cache_invalidation_stream" + " WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, limit)) + return txn.fetchall() + + return self.runInteraction("get_all_updated_caches", get_all_updated_caches_txn) + + def get_cache_stream_token(self): + if self._cache_id_gen: + return self._cache_id_gen.get_current_token() + else: + return 0 diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py index 7931b876ce..cae93b0e22 100644 --- a/synapse/storage/data_stores/main/client_ips.py +++ b/synapse/storage/data_stores/main/client_ips.py @@ -21,8 +21,8 @@ from twisted.internet import defer from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage import background_updates -from synapse.util.caches.descriptors import Cache from synapse.util.caches import CACHE_SIZE_FACTOR +from synapse.util.caches.descriptors import Cache logger = logging.getLogger(__name__) diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index b50ee026a2..a3ad23e783 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -30,16 +30,16 @@ from synapse.logging.opentracing import ( whitelisted_homeserver, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.storage._base import ( - SQLBaseStore, - db_to_json, - make_in_list_sql_clause, -) -from synapse.util.caches.descriptors import Cache +from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.background_updates import BackgroundUpdateStore from synapse.types import get_verify_key_from_cross_signing_key from synapse.util import batch_iter -from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList +from synapse.util.caches.descriptors import ( + Cache, + cached, + cachedInlineCallbacks, + cachedList, +) logger = logging.getLogger(__name__) From a785a2febe6783bfa800504c6750028bc61c84ea Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Dec 2019 14:32:00 +0000 Subject: [PATCH 4/7] Newsfile --- changelog.d/6454.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6454.misc diff --git a/changelog.d/6454.misc b/changelog.d/6454.misc new file mode 100644 index 0000000000..9e5259157c --- /dev/null +++ b/changelog.d/6454.misc @@ -0,0 +1 @@ +Move data store specific code out of `SQLBaseStore`. From 00f0d67566cdfe8eae44aeae1c982c42a255cfcd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Dec 2019 10:45:59 +0000 Subject: [PATCH 5/7] Move get_user_count_txn out of base store --- synapse/storage/_base.py | 12 ------------ synapse/storage/data_stores/main/__init__.py | 12 ++++++++++++ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c02248cfe9..90019c8b0a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -1440,18 +1440,6 @@ class SQLBaseStore(object): return cls.cursor_to_dict(txn) - def get_user_count_txn(self, txn): - """Get a total number of registered users in the users list. - - Args: - txn : Transaction object - Returns: - int : number of users - """ - sql_count = "SELECT COUNT(*) FROM users WHERE is_guest = 0;" - txn.execute(sql_count) - return txn.fetchone()[0] - def _simple_search_list( self, table, term, col, retcols, desc="_simple_search_list" ): diff --git a/synapse/storage/data_stores/main/__init__.py b/synapse/storage/data_stores/main/__init__.py index 474924c68f..76315935dd 100644 --- a/synapse/storage/data_stores/main/__init__.py +++ b/synapse/storage/data_stores/main/__init__.py @@ -516,6 +516,18 @@ class DataStore( retval = {"users": users, "total": count} return retval + def get_user_count_txn(self, txn): + """Get a total number of registered users in the users list. + + Args: + txn : Transaction object + Returns: + int : number of users + """ + sql_count = "SELECT COUNT(*) FROM users WHERE is_guest = 0;" + txn.execute(sql_count) + return txn.fetchone()[0] + def search_users(self, term): """Function to search users list for one or more users with the matched term. From a7f20500ff39399634d4623e284fb2f9892776ae Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Dec 2019 15:45:42 +0000 Subject: [PATCH 6/7] _CURRENT_STATE_CACHE_NAME is public --- synapse/replication/slave/storage/_base.py | 4 ++-- synapse/storage/data_stores/main/cache.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index 71e5877aca..6ece1d6745 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -19,7 +19,7 @@ from typing import Dict import six from synapse.storage._base import SQLBaseStore -from synapse.storage.data_stores.main.cache import _CURRENT_STATE_CACHE_NAME +from synapse.storage.data_stores.main.cache import CURRENT_STATE_CACHE_NAME from synapse.storage.engines import PostgresEngine from ._slaved_id_tracker import SlavedIdTracker @@ -63,7 +63,7 @@ class BaseSlavedStore(SQLBaseStore): if stream_name == "caches": self._cache_id_gen.advance(token) for row in rows: - if row.cache_func == _CURRENT_STATE_CACHE_NAME: + if row.cache_func == CURRENT_STATE_CACHE_NAME: room_id = row.keys[0] members_changed = set(row.keys[1:]) self._invalidate_state_caches(room_id, members_changed) diff --git a/synapse/storage/data_stores/main/cache.py b/synapse/storage/data_stores/main/cache.py index 6efcc5f3b0..258c08722a 100644 --- a/synapse/storage/data_stores/main/cache.py +++ b/synapse/storage/data_stores/main/cache.py @@ -28,7 +28,7 @@ logger = logging.getLogger(__name__) # This is a special cache name we use to batch multiple invalidations of caches # based on the current state when notifying workers over replication. -_CURRENT_STATE_CACHE_NAME = "cs_cache_fake" +CURRENT_STATE_CACHE_NAME = "cs_cache_fake" class CacheInvalidationStore(SQLBaseStore): @@ -65,12 +65,12 @@ class CacheInvalidationStore(SQLBaseStore): for chunk in batch_iter(members_changed, 50): keys = itertools.chain([room_id], chunk) self._send_invalidation_to_replication( - txn, _CURRENT_STATE_CACHE_NAME, keys + txn, CURRENT_STATE_CACHE_NAME, keys ) else: # if no members changed, we still need to invalidate the other caches. self._send_invalidation_to_replication( - txn, _CURRENT_STATE_CACHE_NAME, [room_id] + txn, CURRENT_STATE_CACHE_NAME, [room_id] ) def _send_invalidation_to_replication(self, txn, cache_name, keys): From 9186c105a0d49c60582ff93cf108e511334c57fd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Dec 2019 15:46:19 +0000 Subject: [PATCH 7/7] Revert "Move get_user_count_txn out of base store" This reverts commit 00f0d67566cdfe8eae44aeae1c982c42a255cfcd. Its going to get removed soon, so lets not make merge conflicts. --- synapse/storage/_base.py | 12 ++++++++++++ synapse/storage/data_stores/main/__init__.py | 12 ------------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 90019c8b0a..c02248cfe9 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -1440,6 +1440,18 @@ class SQLBaseStore(object): return cls.cursor_to_dict(txn) + def get_user_count_txn(self, txn): + """Get a total number of registered users in the users list. + + Args: + txn : Transaction object + Returns: + int : number of users + """ + sql_count = "SELECT COUNT(*) FROM users WHERE is_guest = 0;" + txn.execute(sql_count) + return txn.fetchone()[0] + def _simple_search_list( self, table, term, col, retcols, desc="_simple_search_list" ): diff --git a/synapse/storage/data_stores/main/__init__.py b/synapse/storage/data_stores/main/__init__.py index 76315935dd..474924c68f 100644 --- a/synapse/storage/data_stores/main/__init__.py +++ b/synapse/storage/data_stores/main/__init__.py @@ -516,18 +516,6 @@ class DataStore( retval = {"users": users, "total": count} return retval - def get_user_count_txn(self, txn): - """Get a total number of registered users in the users list. - - Args: - txn : Transaction object - Returns: - int : number of users - """ - sql_count = "SELECT COUNT(*) FROM users WHERE is_guest = 0;" - txn.execute(sql_count) - return txn.fetchone()[0] - def search_users(self, term): """Function to search users list for one or more users with the matched term.