Split out SignatureStore and EventFederationStore
This commit is contained in:
parent
17445e6701
commit
a9a2d66cdd
|
@ -17,13 +17,13 @@ import logging
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes
|
||||||
from synapse.storage import DataStore
|
from synapse.storage import DataStore
|
||||||
from synapse.storage.event_federation import EventFederationStore
|
from synapse.storage.event_federation import EventFederationWorkerStore
|
||||||
from synapse.storage.event_push_actions import EventPushActionsWorkerStore
|
from synapse.storage.event_push_actions import EventPushActionsWorkerStore
|
||||||
from synapse.storage.events_worker import EventsWorkerStore
|
from synapse.storage.events_worker import EventsWorkerStore
|
||||||
from synapse.storage.roommember import RoomMemberWorkerStore
|
from synapse.storage.roommember import RoomMemberWorkerStore
|
||||||
from synapse.storage.state import StateGroupWorkerStore
|
from synapse.storage.state import StateGroupWorkerStore
|
||||||
from synapse.storage.stream import StreamStore
|
from synapse.storage.stream import StreamStore
|
||||||
from synapse.storage.signatures import SignatureStore
|
from synapse.storage.signatures import SignatureWorkerStore
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
from ._base import BaseSlavedStore
|
from ._base import BaseSlavedStore
|
||||||
from ._slaved_id_tracker import SlavedIdTracker
|
from ._slaved_id_tracker import SlavedIdTracker
|
||||||
|
@ -40,8 +40,12 @@ logger = logging.getLogger(__name__)
|
||||||
# the method descriptor on the DataStore and chuck them into our class.
|
# the method descriptor on the DataStore and chuck them into our class.
|
||||||
|
|
||||||
|
|
||||||
class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
|
class SlavedEventStore(EventFederationWorkerStore,
|
||||||
EventsWorkerStore, StateGroupWorkerStore,
|
RoomMemberWorkerStore,
|
||||||
|
EventPushActionsWorkerStore,
|
||||||
|
EventsWorkerStore,
|
||||||
|
StateGroupWorkerStore,
|
||||||
|
SignatureWorkerStore,
|
||||||
BaseSlavedStore):
|
BaseSlavedStore):
|
||||||
|
|
||||||
def __init__(self, db_conn, hs):
|
def __init__(self, db_conn, hs):
|
||||||
|
@ -72,9 +76,6 @@ class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
|
||||||
|
|
||||||
# Cached functions can't be accessed through a class instance so we need
|
# Cached functions can't be accessed through a class instance so we need
|
||||||
# to reach inside the __dict__ to extract them.
|
# to reach inside the __dict__ to extract them.
|
||||||
get_latest_event_ids_in_room = EventFederationStore.__dict__[
|
|
||||||
"get_latest_event_ids_in_room"
|
|
||||||
]
|
|
||||||
|
|
||||||
get_recent_event_ids_for_room = (
|
get_recent_event_ids_for_room = (
|
||||||
StreamStore.__dict__["get_recent_event_ids_for_room"]
|
StreamStore.__dict__["get_recent_event_ids_for_room"]
|
||||||
|
@ -100,48 +101,13 @@ class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
|
||||||
|
|
||||||
_get_events_around_txn = DataStore._get_events_around_txn.__func__
|
_get_events_around_txn = DataStore._get_events_around_txn.__func__
|
||||||
|
|
||||||
get_backfill_events = DataStore.get_backfill_events.__func__
|
|
||||||
_get_backfill_events = DataStore._get_backfill_events.__func__
|
|
||||||
get_missing_events = DataStore.get_missing_events.__func__
|
|
||||||
_get_missing_events = DataStore._get_missing_events.__func__
|
|
||||||
|
|
||||||
get_auth_chain = DataStore.get_auth_chain.__func__
|
|
||||||
get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__
|
|
||||||
_get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__
|
|
||||||
|
|
||||||
get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__
|
get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__
|
||||||
|
|
||||||
get_forward_extremeties_for_room = (
|
|
||||||
DataStore.get_forward_extremeties_for_room.__func__
|
|
||||||
)
|
|
||||||
_get_forward_extremeties_for_room = (
|
|
||||||
EventFederationStore.__dict__["_get_forward_extremeties_for_room"]
|
|
||||||
)
|
|
||||||
|
|
||||||
get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
|
get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
|
||||||
|
|
||||||
get_federation_out_pos = DataStore.get_federation_out_pos.__func__
|
get_federation_out_pos = DataStore.get_federation_out_pos.__func__
|
||||||
update_federation_out_pos = DataStore.update_federation_out_pos.__func__
|
update_federation_out_pos = DataStore.update_federation_out_pos.__func__
|
||||||
|
|
||||||
get_latest_event_ids_and_hashes_in_room = (
|
|
||||||
DataStore.get_latest_event_ids_and_hashes_in_room.__func__
|
|
||||||
)
|
|
||||||
_get_latest_event_ids_and_hashes_in_room = (
|
|
||||||
DataStore._get_latest_event_ids_and_hashes_in_room.__func__
|
|
||||||
)
|
|
||||||
_get_event_reference_hashes_txn = (
|
|
||||||
DataStore._get_event_reference_hashes_txn.__func__
|
|
||||||
)
|
|
||||||
add_event_hashes = (
|
|
||||||
DataStore.add_event_hashes.__func__
|
|
||||||
)
|
|
||||||
get_event_reference_hashes = (
|
|
||||||
SignatureStore.__dict__["get_event_reference_hashes"]
|
|
||||||
)
|
|
||||||
get_event_reference_hash = (
|
|
||||||
SignatureStore.__dict__["get_event_reference_hash"]
|
|
||||||
)
|
|
||||||
|
|
||||||
def stream_positions(self):
|
def stream_positions(self):
|
||||||
result = super(SlavedEventStore, self).stream_positions()
|
result = super(SlavedEventStore, self).stream_positions()
|
||||||
result["events"] = self._stream_id_gen.get_current_token()
|
result["events"] = self._stream_id_gen.get_current_token()
|
||||||
|
|
|
@ -15,7 +15,10 @@
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from ._base import SQLBaseStore
|
from synapse.storage._base import SQLBaseStore
|
||||||
|
from synapse.storage.events import EventsWorkerStore
|
||||||
|
from synapse.storage.signatures import SignatureWorkerStore
|
||||||
|
|
||||||
from synapse.api.errors import StoreError
|
from synapse.api.errors import StoreError
|
||||||
from synapse.util.caches.descriptors import cached
|
from synapse.util.caches.descriptors import cached
|
||||||
from unpaddedbase64 import encode_base64
|
from unpaddedbase64 import encode_base64
|
||||||
|
@ -27,30 +30,8 @@ from Queue import PriorityQueue, Empty
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class EventFederationStore(SQLBaseStore):
|
class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
|
||||||
""" Responsible for storing and serving up the various graphs associated
|
SQLBaseStore):
|
||||||
with an event. Including the main event graph and the auth chains for an
|
|
||||||
event.
|
|
||||||
|
|
||||||
Also has methods for getting the front (latest) and back (oldest) edges
|
|
||||||
of the event graphs. These are used to generate the parents for new events
|
|
||||||
and backfilling from another server respectively.
|
|
||||||
"""
|
|
||||||
|
|
||||||
EVENT_AUTH_STATE_ONLY = "event_auth_state_only"
|
|
||||||
|
|
||||||
def __init__(self, db_conn, hs):
|
|
||||||
super(EventFederationStore, self).__init__(db_conn, hs)
|
|
||||||
|
|
||||||
self.register_background_update_handler(
|
|
||||||
self.EVENT_AUTH_STATE_ONLY,
|
|
||||||
self._background_delete_non_state_event_auth,
|
|
||||||
)
|
|
||||||
|
|
||||||
hs.get_clock().looping_call(
|
|
||||||
self._delete_old_forward_extrem_cache, 60 * 60 * 1000
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_auth_chain(self, event_ids, include_given=False):
|
def get_auth_chain(self, event_ids, include_given=False):
|
||||||
"""Get auth events for given event_ids. The events *must* be state events.
|
"""Get auth events for given event_ids. The events *must* be state events.
|
||||||
|
|
||||||
|
@ -228,88 +209,6 @@ class EventFederationStore(SQLBaseStore):
|
||||||
|
|
||||||
return int(min_depth) if min_depth is not None else None
|
return int(min_depth) if min_depth is not None else None
|
||||||
|
|
||||||
def _update_min_depth_for_room_txn(self, txn, room_id, depth):
|
|
||||||
min_depth = self._get_min_depth_interaction(txn, room_id)
|
|
||||||
|
|
||||||
if min_depth and depth >= min_depth:
|
|
||||||
return
|
|
||||||
|
|
||||||
self._simple_upsert_txn(
|
|
||||||
txn,
|
|
||||||
table="room_depth",
|
|
||||||
keyvalues={
|
|
||||||
"room_id": room_id,
|
|
||||||
},
|
|
||||||
values={
|
|
||||||
"min_depth": depth,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
def _handle_mult_prev_events(self, txn, events):
|
|
||||||
"""
|
|
||||||
For the given event, update the event edges table and forward and
|
|
||||||
backward extremities tables.
|
|
||||||
"""
|
|
||||||
self._simple_insert_many_txn(
|
|
||||||
txn,
|
|
||||||
table="event_edges",
|
|
||||||
values=[
|
|
||||||
{
|
|
||||||
"event_id": ev.event_id,
|
|
||||||
"prev_event_id": e_id,
|
|
||||||
"room_id": ev.room_id,
|
|
||||||
"is_state": False,
|
|
||||||
}
|
|
||||||
for ev in events
|
|
||||||
for e_id, _ in ev.prev_events
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
self._update_backward_extremeties(txn, events)
|
|
||||||
|
|
||||||
def _update_backward_extremeties(self, txn, events):
|
|
||||||
"""Updates the event_backward_extremities tables based on the new/updated
|
|
||||||
events being persisted.
|
|
||||||
|
|
||||||
This is called for new events *and* for events that were outliers, but
|
|
||||||
are now being persisted as non-outliers.
|
|
||||||
|
|
||||||
Forward extremities are handled when we first start persisting the events.
|
|
||||||
"""
|
|
||||||
events_by_room = {}
|
|
||||||
for ev in events:
|
|
||||||
events_by_room.setdefault(ev.room_id, []).append(ev)
|
|
||||||
|
|
||||||
query = (
|
|
||||||
"INSERT INTO event_backward_extremities (event_id, room_id)"
|
|
||||||
" SELECT ?, ? WHERE NOT EXISTS ("
|
|
||||||
" SELECT 1 FROM event_backward_extremities"
|
|
||||||
" WHERE event_id = ? AND room_id = ?"
|
|
||||||
" )"
|
|
||||||
" AND NOT EXISTS ("
|
|
||||||
" SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
|
|
||||||
" AND outlier = ?"
|
|
||||||
" )"
|
|
||||||
)
|
|
||||||
|
|
||||||
txn.executemany(query, [
|
|
||||||
(e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
|
|
||||||
for ev in events for e_id, _ in ev.prev_events
|
|
||||||
if not ev.internal_metadata.is_outlier()
|
|
||||||
])
|
|
||||||
|
|
||||||
query = (
|
|
||||||
"DELETE FROM event_backward_extremities"
|
|
||||||
" WHERE event_id = ? AND room_id = ?"
|
|
||||||
)
|
|
||||||
txn.executemany(
|
|
||||||
query,
|
|
||||||
[
|
|
||||||
(ev.event_id, ev.room_id) for ev in events
|
|
||||||
if not ev.internal_metadata.is_outlier()
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_forward_extremeties_for_room(self, room_id, stream_ordering):
|
def get_forward_extremeties_for_room(self, room_id, stream_ordering):
|
||||||
"""For a given room_id and stream_ordering, return the forward
|
"""For a given room_id and stream_ordering, return the forward
|
||||||
extremeties of the room at that point in "time".
|
extremeties of the room at that point in "time".
|
||||||
|
@ -371,28 +270,6 @@ class EventFederationStore(SQLBaseStore):
|
||||||
get_forward_extremeties_for_room_txn
|
get_forward_extremeties_for_room_txn
|
||||||
)
|
)
|
||||||
|
|
||||||
def _delete_old_forward_extrem_cache(self):
|
|
||||||
def _delete_old_forward_extrem_cache_txn(txn):
|
|
||||||
# Delete entries older than a month, while making sure we don't delete
|
|
||||||
# the only entries for a room.
|
|
||||||
sql = ("""
|
|
||||||
DELETE FROM stream_ordering_to_exterm
|
|
||||||
WHERE
|
|
||||||
room_id IN (
|
|
||||||
SELECT room_id
|
|
||||||
FROM stream_ordering_to_exterm
|
|
||||||
WHERE stream_ordering > ?
|
|
||||||
) AND stream_ordering < ?
|
|
||||||
""")
|
|
||||||
txn.execute(
|
|
||||||
sql,
|
|
||||||
(self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
|
|
||||||
)
|
|
||||||
return self.runInteraction(
|
|
||||||
"_delete_old_forward_extrem_cache",
|
|
||||||
_delete_old_forward_extrem_cache_txn
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_backfill_events(self, room_id, event_list, limit):
|
def get_backfill_events(self, room_id, event_list, limit):
|
||||||
"""Get a list of Events for a given topic that occurred before (and
|
"""Get a list of Events for a given topic that occurred before (and
|
||||||
including) the events in event_list. Return a list of max size `limit`
|
including) the events in event_list. Return a list of max size `limit`
|
||||||
|
@ -522,6 +399,135 @@ class EventFederationStore(SQLBaseStore):
|
||||||
|
|
||||||
return event_results
|
return event_results
|
||||||
|
|
||||||
|
|
||||||
|
class EventFederationStore(EventFederationWorkerStore):
|
||||||
|
""" Responsible for storing and serving up the various graphs associated
|
||||||
|
with an event. Including the main event graph and the auth chains for an
|
||||||
|
event.
|
||||||
|
|
||||||
|
Also has methods for getting the front (latest) and back (oldest) edges
|
||||||
|
of the event graphs. These are used to generate the parents for new events
|
||||||
|
and backfilling from another server respectively.
|
||||||
|
"""
|
||||||
|
|
||||||
|
EVENT_AUTH_STATE_ONLY = "event_auth_state_only"
|
||||||
|
|
||||||
|
def __init__(self, db_conn, hs):
|
||||||
|
super(EventFederationStore, self).__init__(db_conn, hs)
|
||||||
|
|
||||||
|
self.register_background_update_handler(
|
||||||
|
self.EVENT_AUTH_STATE_ONLY,
|
||||||
|
self._background_delete_non_state_event_auth,
|
||||||
|
)
|
||||||
|
|
||||||
|
hs.get_clock().looping_call(
|
||||||
|
self._delete_old_forward_extrem_cache, 60 * 60 * 1000
|
||||||
|
)
|
||||||
|
|
||||||
|
def _update_min_depth_for_room_txn(self, txn, room_id, depth):
|
||||||
|
min_depth = self._get_min_depth_interaction(txn, room_id)
|
||||||
|
|
||||||
|
if min_depth and depth >= min_depth:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._simple_upsert_txn(
|
||||||
|
txn,
|
||||||
|
table="room_depth",
|
||||||
|
keyvalues={
|
||||||
|
"room_id": room_id,
|
||||||
|
},
|
||||||
|
values={
|
||||||
|
"min_depth": depth,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
def _handle_mult_prev_events(self, txn, events):
|
||||||
|
"""
|
||||||
|
For the given event, update the event edges table and forward and
|
||||||
|
backward extremities tables.
|
||||||
|
"""
|
||||||
|
self._simple_insert_many_txn(
|
||||||
|
txn,
|
||||||
|
table="event_edges",
|
||||||
|
values=[
|
||||||
|
{
|
||||||
|
"event_id": ev.event_id,
|
||||||
|
"prev_event_id": e_id,
|
||||||
|
"room_id": ev.room_id,
|
||||||
|
"is_state": False,
|
||||||
|
}
|
||||||
|
for ev in events
|
||||||
|
for e_id, _ in ev.prev_events
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
self._update_backward_extremeties(txn, events)
|
||||||
|
|
||||||
|
def _update_backward_extremeties(self, txn, events):
|
||||||
|
"""Updates the event_backward_extremities tables based on the new/updated
|
||||||
|
events being persisted.
|
||||||
|
|
||||||
|
This is called for new events *and* for events that were outliers, but
|
||||||
|
are now being persisted as non-outliers.
|
||||||
|
|
||||||
|
Forward extremities are handled when we first start persisting the events.
|
||||||
|
"""
|
||||||
|
events_by_room = {}
|
||||||
|
for ev in events:
|
||||||
|
events_by_room.setdefault(ev.room_id, []).append(ev)
|
||||||
|
|
||||||
|
query = (
|
||||||
|
"INSERT INTO event_backward_extremities (event_id, room_id)"
|
||||||
|
" SELECT ?, ? WHERE NOT EXISTS ("
|
||||||
|
" SELECT 1 FROM event_backward_extremities"
|
||||||
|
" WHERE event_id = ? AND room_id = ?"
|
||||||
|
" )"
|
||||||
|
" AND NOT EXISTS ("
|
||||||
|
" SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
|
||||||
|
" AND outlier = ?"
|
||||||
|
" )"
|
||||||
|
)
|
||||||
|
|
||||||
|
txn.executemany(query, [
|
||||||
|
(e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
|
||||||
|
for ev in events for e_id, _ in ev.prev_events
|
||||||
|
if not ev.internal_metadata.is_outlier()
|
||||||
|
])
|
||||||
|
|
||||||
|
query = (
|
||||||
|
"DELETE FROM event_backward_extremities"
|
||||||
|
" WHERE event_id = ? AND room_id = ?"
|
||||||
|
)
|
||||||
|
txn.executemany(
|
||||||
|
query,
|
||||||
|
[
|
||||||
|
(ev.event_id, ev.room_id) for ev in events
|
||||||
|
if not ev.internal_metadata.is_outlier()
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
def _delete_old_forward_extrem_cache(self):
|
||||||
|
def _delete_old_forward_extrem_cache_txn(txn):
|
||||||
|
# Delete entries older than a month, while making sure we don't delete
|
||||||
|
# the only entries for a room.
|
||||||
|
sql = ("""
|
||||||
|
DELETE FROM stream_ordering_to_exterm
|
||||||
|
WHERE
|
||||||
|
room_id IN (
|
||||||
|
SELECT room_id
|
||||||
|
FROM stream_ordering_to_exterm
|
||||||
|
WHERE stream_ordering > ?
|
||||||
|
) AND stream_ordering < ?
|
||||||
|
""")
|
||||||
|
txn.execute(
|
||||||
|
sql,
|
||||||
|
(self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
|
||||||
|
)
|
||||||
|
return self.runInteraction(
|
||||||
|
"_delete_old_forward_extrem_cache",
|
||||||
|
_delete_old_forward_extrem_cache_txn
|
||||||
|
)
|
||||||
|
|
||||||
def clean_room_for_join(self, room_id):
|
def clean_room_for_join(self, room_id):
|
||||||
return self.runInteraction(
|
return self.runInteraction(
|
||||||
"clean_room_for_join",
|
"clean_room_for_join",
|
||||||
|
|
|
@ -22,9 +22,7 @@ from synapse.crypto.event_signing import compute_event_reference_hash
|
||||||
from synapse.util.caches.descriptors import cached, cachedList
|
from synapse.util.caches.descriptors import cached, cachedList
|
||||||
|
|
||||||
|
|
||||||
class SignatureStore(SQLBaseStore):
|
class SignatureWorkerStore(SQLBaseStore):
|
||||||
"""Persistence for event signatures and hashes"""
|
|
||||||
|
|
||||||
@cached()
|
@cached()
|
||||||
def get_event_reference_hash(self, event_id):
|
def get_event_reference_hash(self, event_id):
|
||||||
return self._get_event_reference_hashes_txn(event_id)
|
return self._get_event_reference_hashes_txn(event_id)
|
||||||
|
@ -74,6 +72,10 @@ class SignatureStore(SQLBaseStore):
|
||||||
txn.execute(query, (event_id, ))
|
txn.execute(query, (event_id, ))
|
||||||
return {k: v for k, v in txn}
|
return {k: v for k, v in txn}
|
||||||
|
|
||||||
|
|
||||||
|
class SignatureStore(SignatureWorkerStore):
|
||||||
|
"""Persistence for event signatures and hashes"""
|
||||||
|
|
||||||
def _store_event_reference_hashes_txn(self, txn, events):
|
def _store_event_reference_hashes_txn(self, txn, events):
|
||||||
"""Store a hash for a PDU
|
"""Store a hash for a PDU
|
||||||
Args:
|
Args:
|
||||||
|
|
Loading…
Reference in New Issue