Review comments

Erik Johnston 2019-10-30 13:33:38 +00:00
parent 9fb96889a4
commit a8d16f6c00
6 changed files with 37 additions and 19 deletions

View File

@@ -226,7 +226,6 @@ class HomeServer(object):
         self.admin_redaction_ratelimiter = Ratelimiter()
         self.registration_ratelimiter = Ratelimiter()
 
-        self.datastore = None
         self.datastores = None
 
         # Other kwargs are explicit dependencies
@@ -236,8 +235,8 @@ class HomeServer(object):
     def setup(self):
         logger.info("Setting up.")
         with self.get_db_conn() as conn:
-            self.datastore = self.DATASTORE_CLASS(conn, self)
-            self.datastores = DataStores(self.datastore, conn, self)
+            datastore = self.DATASTORE_CLASS(conn, self)
+            self.datastores = DataStores(datastore, conn, self)
             conn.commit()
 
         logger.info("Finished setting up.")

View File

@@ -29,7 +29,7 @@ stored in `synapse.storage.schema`.
 
 from synapse.storage.data_stores import DataStores
 from synapse.storage.data_stores.main import DataStore
-from synapse.storage.persist_events import EventsPersistenceStore
+from synapse.storage.persist_events import EventsPersistenceStorage
 
 __all__ = ["DataStores", "DataStore"]
@@ -44,7 +44,7 @@ class Storage(object):
         # interfaces.
         self.main = stores.main
 
-        self.persistence = EventsPersistenceStore(hs, stores)
+        self.persistence = EventsPersistenceStorage(hs, stores)
 
 
 def are_all_users_on_domain(txn, database_engine, domain):
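
The `Storage` object above groups the per-concern storage interfaces behind one entry point. A hedged sketch of how it might be wired up and consumed; the `build_storage` helper is hypothetical and only the `Storage` class itself appears in this diff:

# Hedged sketch: constructing and consuming the Storage grouping. The helper
# below is invented for illustration; only the Storage class is in this diff.
from synapse.storage import DataStores, Storage

def build_storage(hs, stores: DataStores) -> Storage:
    # Storage.__init__ appears to take the homeserver and the DataStores
    # collection (see the hunk above).
    return Storage(hs, stores)

# storage.main        -> low-level access to the main data store
# storage.persistence -> EventsPersistenceStorage: high-level event persistence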

View File

@@ -146,7 +146,7 @@ class EventsStore(
 
     @_retry_on_integrity_error
     @defer.inlineCallbacks
-    def _persist_events(
+    def _persist_events_and_state_updates(
         self,
         events_and_contexts,
         current_state_for_room,
@@ -155,18 +155,27 @@ class EventsStore(
         backfilled=False,
         delete_existing=False,
     ):
-        """Persist events to db
+        """Persist a set of events alongside updates to the current state and
+        forward extremities tables.
 
         Args:
             events_and_contexts (list[(EventBase, EventContext)]):
-            backfilled (bool):
+            current_state_for_room (dict[str, dict]): Map from room_id to the
+                current state of the room based on forward extremities
+            state_delta_for_room (dict[str, tuple]): Map from room_id to tuple
+                of `(to_delete, to_insert)` where to_delete is a list
+                of type/state keys to remove from current state, and to_insert
+                is a map (type,key)->event_id giving the state delta in each
+                room.
+            new_forward_extremities (dict[str, list[str]]): Map from room_id
+                to list of event IDs that are the new forward extremities of
+                the room.
+            backfilled (bool)
             delete_existing (bool):
 
         Returns:
             Deferred: resolves when the events have been persisted
         """
+        if not events_and_contexts:
+            return
+
         # We want to calculate the stream orderings as late as possible, as
         # we only notify after all events with a lesser stream ordering have
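
The new docstring spells out the shape of the state-related arguments. A purely illustrative sketch of those shapes, with made-up room and event IDs; in practice `EventsPersistenceStorage` (below) computes these values before calling the method:

# Illustrative argument shapes only -- all IDs and state keys here are
# invented examples matching the docstring above.
room_id = "!room:example.com"

# Map from room_id to the full current state implied by the room's new
# forward extremities.
current_state_for_room = {
    room_id: {("m.room.member", "@alice:example.com"): "$membership_event_id"},
}

# Map from room_id to (to_delete, to_insert): (type, state_key) pairs to drop
# from the current state, and (type, state_key) -> event_id entries to add.
state_delta_for_room = {
    room_id: (
        [("m.room.member", "@bob:example.com")],
        {("m.room.topic", ""): "$new_topic_event_id"},
    ),
}

# Map from room_id to the event IDs that become the room's new forward
# extremities once the batch is persisted.
new_forward_extremities = {
    room_id: ["$latest_event_id"],
}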

View File

@@ -171,7 +171,13 @@ class _EventPeristenceQueue(object):
         pass
 
 
-class EventsPersistenceStore(object):
+class EventsPersistenceStorage(object):
+    """High level interface for handling persisting newly received events.
+
+    Takes care of batching up events by room, and calculating the necessary
+    current state and forward extremity changes.
+    """
+
     def __init__(self, hs, stores: DataStores):
         # We ultimately want to split out the state store from the main store,
         # so we use separate variables here even though they point to the same
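
A hedged usage sketch for the renamed class: callers would normally reach it through the storage grouping rather than instantiating it directly. The `get_storage()` accessor and the public `persist_events` coroutine are assumptions here; this diff only shows the private `_persist_events` helper below.

# Hedged usage sketch -- `get_storage()` and `persist_events` are assumptions,
# not part of this diff.
from twisted.internet import defer

@defer.inlineCallbacks
def handle_new_events(hs, events_and_contexts):
    persistence = hs.get_storage().persistence
    # Batches the events per room, works out the current-state and
    # forward-extremity changes, then hands everything to the main store.
    yield persistence.persist_events(events_and_contexts, backfilled=False)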
@@ -257,7 +263,8 @@ class EventsPersistenceStore(object):
     def _persist_events(
         self, events_and_contexts, backfilled=False, delete_existing=False
     ):
-        """Persist events to db
+        """Calculates the change to current state and forward extremities, and
+        persists the given events and with those updates.
 
         Args:
             events_and_contexts (list[(EventBase, EventContext)]):
@@ -399,7 +406,7 @@ class EventsPersistenceStore(object):
                 if current_state is not None:
                     current_state_for_room[room_id] = current_state
 
-            yield self.main_store._persist_events(
+            yield self.main_store._persist_events_and_state_updates(
                 chunk,
                 current_state_for_room=current_state_for_room,
                 state_delta_for_room=state_delta_for_room,

View File

@@ -178,7 +178,7 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         kr = keyring.Keyring(self.hs)
 
         key1 = signedjson.key.generate_signing_key(1)
-        r = self.hs.datastore.store_server_verify_keys(
+        r = self.hs.get_datastore().store_server_verify_keys(
             "server9",
             time.time() * 1000,
             [("server9", get_key_id(key1), FetchKeyResult(get_verify_key(key1), 1000))],
@@ -209,7 +209,7 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         )
 
         key1 = signedjson.key.generate_signing_key(1)
-        r = self.hs.datastore.store_server_verify_keys(
+        r = self.hs.get_datastore().store_server_verify_keys(
             "server9",
             time.time() * 1000,
             [("server9", get_key_id(key1), FetchKeyResult(get_verify_key(key1), None))],

View File

@@ -36,7 +36,8 @@ class MessageAcceptTests(unittest.TestCase):
         # Figure out what the most recent event is
         most_recent = self.successResultOf(
             maybeDeferred(
-                self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
+                self.homeserver.get_datastore().get_latest_event_ids_in_room,
+                self.room_id,
             )
         )[0]
@@ -75,7 +76,8 @@ class MessageAcceptTests(unittest.TestCase):
         self.assertEqual(
             self.successResultOf(
                 maybeDeferred(
-                    self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
+                    self.homeserver.get_datastore().get_latest_event_ids_in_room,
+                    self.room_id,
                 )
             )[0],
             "$join:test.serv",
@@ -97,7 +99,8 @@ class MessageAcceptTests(unittest.TestCase):
         # Figure out what the most recent event is
         most_recent = self.successResultOf(
             maybeDeferred(
-                self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
+                self.homeserver.get_datastore().get_latest_event_ids_in_room,
+                self.room_id,
             )
         )[0]
@@ -137,6 +140,6 @@ class MessageAcceptTests(unittest.TestCase):
         # Make sure the invalid event isn't there
         extrem = maybeDeferred(
-            self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
+            self.homeserver.get_datastore().get_latest_event_ids_in_room, self.room_id
         )
         self.assertEqual(self.successResultOf(extrem)[0], "$join:test.serv")