Add some invalidations to a cache_stream

This commit is contained in:
Erik Johnston 2016-08-15 10:21:25 +01:00
parent 99bbd90b0d
commit 4d70d1f80e
7 changed files with 117 additions and 31 deletions

View File

@ -122,6 +122,9 @@ class DataStore(RoomMemberStore, RoomStore,
db_conn, "pushers", "id", db_conn, "pushers", "id",
extra_tables=[("deleted_pushers", "stream_id")], extra_tables=[("deleted_pushers", "stream_id")],
) )
self._cache_id_gen = StreamIdGenerator(
db_conn, "cache_stream", "stream_id",
)
events_max = self._stream_id_gen.get_current_token() events_max = self._stream_id_gen.get_current_token()
event_cache_prefill, min_event_val = self._get_cache_dict( event_cache_prefill, min_event_val = self._get_cache_dict(

View File

@ -861,6 +861,24 @@ class SQLBaseStore(object):
return cache, min_val return cache, min_val
def _invalidate_cache_and_stream(self, txn, cache_func, keys):
txn.call_after(cache_func.invalidate, keys)
ctx = self._cache_id_gen.get_next()
stream_id = ctx.__enter__()
txn.call_after(ctx.__exit__, None, None, None)
self._simple_insert_txn(
txn,
table="cache_stream",
values={
"stream_id": stream_id,
"cache_func": cache_func.__name__,
"keys": list(keys),
"invalidation_ts": self.clock.time_msec(),
}
)
class _RollbackButIsFineException(Exception): class _RollbackButIsFineException(Exception):
""" This exception is used to rollback a transaction without implying """ This exception is used to rollback a transaction without implying

View File

@ -82,32 +82,39 @@ class DirectoryStore(SQLBaseStore):
Returns: Returns:
Deferred Deferred
""" """
try: def alias_txn(txn):
yield self._simple_insert( self._simple_insert_txn(
txn,
"room_aliases", "room_aliases",
{ {
"room_alias": room_alias.to_string(), "room_alias": room_alias.to_string(),
"room_id": room_id, "room_id": room_id,
"creator": creator, "creator": creator,
}, },
desc="create_room_alias_association", )
self._simple_insert_many_txn(
txn,
table="room_alias_servers",
values=[{
"room_alias": room_alias.to_string(),
"server": server,
} for server in servers],
)
self._invalidate_cache_and_stream(
txn, self.get_aliases_for_room, (room_id,)
)
try:
ret = yield self.runInteraction(
"create_room_alias_association", alias_txn
) )
except self.database_engine.module.IntegrityError: except self.database_engine.module.IntegrityError:
raise SynapseError( raise SynapseError(
409, "Room alias %s already exists" % room_alias.to_string() 409, "Room alias %s already exists" % room_alias.to_string()
) )
defer.returnValue(ret)
for server in servers:
# TODO(erikj): Fix this to bulk insert
yield self._simple_insert(
"room_alias_servers",
{
"room_alias": room_alias.to_string(),
"server": server,
},
desc="create_room_alias_association",
)
self.get_aliases_for_room.invalidate((room_id,))
def get_room_alias_creator(self, room_alias): def get_room_alias_creator(self, room_alias):
return self._simple_select_one_onecol( return self._simple_select_one_onecol(

View File

@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database # Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts. # schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 33 SCHEMA_VERSION = 34
dir_path = os.path.abspath(os.path.dirname(__file__)) dir_path = os.path.abspath(os.path.dirname(__file__))

View File

@ -189,18 +189,30 @@ class PresenceStore(SQLBaseStore):
desc="add_presence_list_pending", desc="add_presence_list_pending",
) )
@defer.inlineCallbacks
def set_presence_list_accepted(self, observer_localpart, observed_userid): def set_presence_list_accepted(self, observer_localpart, observed_userid):
result = yield self._simple_update_one( def update_presence_list_txn(txn):
result = self._simple_update_one_txn(
txn,
table="presence_list", table="presence_list",
keyvalues={"user_id": observer_localpart, keyvalues={
"observed_user_id": observed_userid}, "user_id": observer_localpart,
"observed_user_id": observed_userid
},
updatevalues={"accepted": True}, updatevalues={"accepted": True},
desc="set_presence_list_accepted",
) )
self.get_presence_list_accepted.invalidate((observer_localpart,))
self.get_presence_list_observers_accepted.invalidate((observed_userid,)) self._invalidate_cache_and_stream(
defer.returnValue(result) txn, self.get_presence_list_accepted, (observer_localpart,)
)
self._invalidate_cache_and_stream(
txn, self.get_presence_list_observers_accepted, (observed_userid,)
)
return result
return self.runInteraction(
"set_presence_list_accepted", update_presence_list_txn,
)
def get_presence_list(self, observer_localpart, accepted=None): def get_presence_list(self, observer_localpart, accepted=None):
if accepted: if accepted:

View File

@ -277,7 +277,6 @@ class RoomMemberStore(SQLBaseStore):
user_id, membership_list=[Membership.JOIN], user_id, membership_list=[Membership.JOIN],
) )
@defer.inlineCallbacks
def forget(self, user_id, room_id): def forget(self, user_id, room_id):
"""Indicate that user_id wishes to discard history for room_id.""" """Indicate that user_id wishes to discard history for room_id."""
def f(txn): def f(txn):
@ -292,10 +291,13 @@ class RoomMemberStore(SQLBaseStore):
" room_id = ?" " room_id = ?"
) )
txn.execute(sql, (user_id, room_id)) txn.execute(sql, (user_id, room_id))
yield self.runInteraction("forget_membership", f)
self.was_forgotten_at.invalidate_all() txn.call_after(self.was_forgotten_at.invalidate_all)
self.who_forgot_in_room.invalidate_all() txn.call_after(self.did_forget.invalidate, (user_id, room_id))
self.did_forget.invalidate((user_id, room_id)) self._invalidate_cache_and_stream(
txn, self.who_forgot_in_room, (room_id,)
)
return self.runInteraction("forget_membership", f)
@cachedInlineCallbacks(num_args=2) @cachedInlineCallbacks(num_args=2)
def did_forget(self, user_id, room_id): def did_forget(self, user_id, room_id):

View File

@ -0,0 +1,44 @@
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage.prepare_database import get_statements
from synapse.storage.engines import PostgresEngine
import logging
logger = logging.getLogger(__name__)
CREATE_TABLE = """
CREATE TABLE cache_stream (
stream_id BIGINT,
cache_func TEXT,
keys TEXT[],
invalidation_ts BIGINT
);
CREATE INDEX cache_stream_id ON cache_stream(stream_id);
"""
def run_create(cur, database_engine, *args, **kwargs):
if not isinstance(database_engine, PostgresEngine):
return
for statement in get_statements(CREATE_TABLE.splitlines()):
cur.execute(statement)
def run_upgrade(cur, database_engine, *args, **kwargs):
pass