Move pdu and event persistence into a single persist_event function
This commit is contained in:
parent
66a4d33524
commit
64e2a5d58e
|
@ -25,7 +25,6 @@ from .units import Pdu
|
||||||
|
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
|
|
||||||
import copy
|
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
@ -48,9 +47,8 @@ class PduActions(object):
|
||||||
Returns:
|
Returns:
|
||||||
Deferred
|
Deferred
|
||||||
"""
|
"""
|
||||||
return self._persist(pdu)
|
return self.store.persist_event(pdu=pdu)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
@log_function
|
@log_function
|
||||||
def persist_outgoing(self, pdu):
|
def persist_outgoing(self, pdu):
|
||||||
""" Persists the given `Pdu` that this home server created.
|
""" Persists the given `Pdu` that this home server created.
|
||||||
|
@ -58,9 +56,7 @@ class PduActions(object):
|
||||||
Returns:
|
Returns:
|
||||||
Deferred
|
Deferred
|
||||||
"""
|
"""
|
||||||
ret = yield self._persist(pdu)
|
return self.store.persist_event(pdu=pdu)
|
||||||
|
|
||||||
defer.returnValue(ret)
|
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
def mark_as_processed(self, pdu):
|
def mark_as_processed(self, pdu):
|
||||||
|
@ -143,28 +139,6 @@ class PduActions(object):
|
||||||
depth=pdu.depth
|
depth=pdu.depth
|
||||||
)
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
@log_function
|
|
||||||
def _persist(self, pdu):
|
|
||||||
kwargs = copy.copy(pdu.__dict__)
|
|
||||||
unrec_keys = copy.copy(pdu.unrecognized_keys)
|
|
||||||
del kwargs["content"]
|
|
||||||
kwargs["content_json"] = json.dumps(pdu.content)
|
|
||||||
kwargs["unrecognized_keys"] = json.dumps(unrec_keys)
|
|
||||||
|
|
||||||
logger.debug("Persisting: %s", repr(kwargs))
|
|
||||||
|
|
||||||
if pdu.is_state:
|
|
||||||
ret = yield self.store.persist_state(**kwargs)
|
|
||||||
else:
|
|
||||||
ret = yield self.store.persist_pdu(**kwargs)
|
|
||||||
|
|
||||||
yield self.store.update_min_depth_for_context(
|
|
||||||
pdu.context, pdu.depth
|
|
||||||
)
|
|
||||||
|
|
||||||
defer.returnValue(ret)
|
|
||||||
|
|
||||||
|
|
||||||
class TransactionActions(object):
|
class TransactionActions(object):
|
||||||
""" Defines persistence actions that relate to handling Transactions.
|
""" Defines persistence actions that relate to handling Transactions.
|
||||||
|
|
|
@ -57,7 +57,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def persist_event(self, event, backfilled=False):
|
def persist_event(self, event=None, backfilled=False, pdu=None):
|
||||||
# FIXME (erikj): This should be removed when we start amalgamating
|
# FIXME (erikj): This should be removed when we start amalgamating
|
||||||
# event and pdu storage
|
# event and pdu storage
|
||||||
yield self.hs.get_federation().fill_out_prev_events(event)
|
yield self.hs.get_federation().fill_out_prev_events(event)
|
||||||
|
@ -70,7 +70,11 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
stream_ordering = self.min_token
|
stream_ordering = self.min_token
|
||||||
|
|
||||||
latest = yield self._db_pool.runInteraction(
|
latest = yield self._db_pool.runInteraction(
|
||||||
self._persist_event_txn, event, backfilled, stream_ordering
|
self._persist_pdu_event_txn,
|
||||||
|
pdu=pdu,
|
||||||
|
event=event,
|
||||||
|
backfilled=backfilled,
|
||||||
|
stream_ordering=stream_ordering,
|
||||||
)
|
)
|
||||||
defer.returnValue(latest)
|
defer.returnValue(latest)
|
||||||
|
|
||||||
|
@ -92,6 +96,30 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
event = self._parse_event_from_row(events_dict)
|
event = self._parse_event_from_row(events_dict)
|
||||||
defer.returnValue(event)
|
defer.returnValue(event)
|
||||||
|
|
||||||
|
def _persist_pdu_event_txn(self, txn, pdu=None, event=None,
|
||||||
|
backfilled=False, stream_ordering=None):
|
||||||
|
if pdu is not None:
|
||||||
|
self._persist_pdu_txn(txn, pdu)
|
||||||
|
if event is not None:
|
||||||
|
self._persist_event_txn(txn, event, backfilled, stream_ordering)
|
||||||
|
|
||||||
|
def _persist_pdu_txn(self, txn, pdu):
|
||||||
|
cols = dict(pdu.__dict__)
|
||||||
|
unrec_keys = dict(pdu.unrecognized_keys)
|
||||||
|
del cols["content"]
|
||||||
|
del cols["prev_pdus"]
|
||||||
|
cols["content_json"] = json.dumps(pdu.content)
|
||||||
|
cols["unrecognized_keys"] = json.dumps(unrec_keys)
|
||||||
|
|
||||||
|
logger.debug("Persisting: %s", repr(cols))
|
||||||
|
|
||||||
|
if pdu.is_state:
|
||||||
|
self._persist_state_txn(txn, pdu.prev_pdus, cols)
|
||||||
|
else:
|
||||||
|
self._persist_pdu_txn(txn, pdu.prev_pdus, cols)
|
||||||
|
|
||||||
|
self._update_min_depth_for_context_txn(txn, pdu.context, pdu.depth)
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None):
|
def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None):
|
||||||
if event.type == RoomMemberEvent.TYPE:
|
if event.type == RoomMemberEvent.TYPE:
|
||||||
|
|
|
@ -114,7 +114,7 @@ class PduStore(SQLBaseStore):
|
||||||
|
|
||||||
return self._get_pdu_tuples(txn, res)
|
return self._get_pdu_tuples(txn, res)
|
||||||
|
|
||||||
def persist_pdu(self, prev_pdus, **cols):
|
def _persist_pdu_txn(self, txn, prev_pdus, cols):
|
||||||
"""Inserts a (non-state) PDU into the database.
|
"""Inserts a (non-state) PDU into the database.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
@ -122,11 +122,6 @@ class PduStore(SQLBaseStore):
|
||||||
prev_pdus (list)
|
prev_pdus (list)
|
||||||
**cols: The columns to insert into the PdusTable.
|
**cols: The columns to insert into the PdusTable.
|
||||||
"""
|
"""
|
||||||
return self._db_pool.runInteraction(
|
|
||||||
self._persist_pdu, prev_pdus, cols
|
|
||||||
)
|
|
||||||
|
|
||||||
def _persist_pdu(self, txn, prev_pdus, cols):
|
|
||||||
entry = PdusTable.EntryType(
|
entry = PdusTable.EntryType(
|
||||||
**{k: cols.get(k, None) for k in PdusTable.fields}
|
**{k: cols.get(k, None) for k in PdusTable.fields}
|
||||||
)
|
)
|
||||||
|
@ -262,7 +257,7 @@ class PduStore(SQLBaseStore):
|
||||||
|
|
||||||
return row[0] if row else None
|
return row[0] if row else None
|
||||||
|
|
||||||
def update_min_depth_for_context(self, context, depth):
|
def _update_min_depth_for_context_txn(self, txn, context, depth):
|
||||||
"""Update the minimum `depth` of the given context, which is the line
|
"""Update the minimum `depth` of the given context, which is the line
|
||||||
on which we stop backfilling backwards.
|
on which we stop backfilling backwards.
|
||||||
|
|
||||||
|
@ -270,11 +265,6 @@ class PduStore(SQLBaseStore):
|
||||||
context (str)
|
context (str)
|
||||||
depth (int)
|
depth (int)
|
||||||
"""
|
"""
|
||||||
return self._db_pool.runInteraction(
|
|
||||||
self._update_min_depth_for_context, context, depth
|
|
||||||
)
|
|
||||||
|
|
||||||
def _update_min_depth_for_context(self, txn, context, depth):
|
|
||||||
min_depth = self._get_min_depth_interaction(txn, context)
|
min_depth = self._get_min_depth_interaction(txn, context)
|
||||||
|
|
||||||
do_insert = depth < min_depth if min_depth else True
|
do_insert = depth < min_depth if min_depth else True
|
||||||
|
@ -485,7 +475,7 @@ class StatePduStore(SQLBaseStore):
|
||||||
"""A collection of queries for handling state PDUs.
|
"""A collection of queries for handling state PDUs.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def persist_state(self, prev_pdus, **cols):
|
def _persist_state_txn(self, txn, prev_pdus, cols):
|
||||||
"""Inserts a state PDU into the database
|
"""Inserts a state PDU into the database
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
@ -493,12 +483,6 @@ class StatePduStore(SQLBaseStore):
|
||||||
prev_pdus (list)
|
prev_pdus (list)
|
||||||
**cols: The columns to insert into the PdusTable and StatePdusTable
|
**cols: The columns to insert into the PdusTable and StatePdusTable
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return self._db_pool.runInteraction(
|
|
||||||
self._persist_state, prev_pdus, cols
|
|
||||||
)
|
|
||||||
|
|
||||||
def _persist_state(self, txn, prev_pdus, cols):
|
|
||||||
pdu_entry = PdusTable.EntryType(
|
pdu_entry = PdusTable.EntryType(
|
||||||
**{k: cols.get(k, None) for k in PdusTable.fields}
|
**{k: cols.get(k, None) for k in PdusTable.fields}
|
||||||
)
|
)
|
||||||
|
|
|
@ -58,7 +58,7 @@ class FederationTestCase(unittest.TestCase):
|
||||||
self.mock_persistence = Mock(spec=[
|
self.mock_persistence = Mock(spec=[
|
||||||
"get_current_state_for_context",
|
"get_current_state_for_context",
|
||||||
"get_pdu",
|
"get_pdu",
|
||||||
"persist_pdu",
|
"persist_event",
|
||||||
"update_min_depth_for_context",
|
"update_min_depth_for_context",
|
||||||
"prep_send_transaction",
|
"prep_send_transaction",
|
||||||
"delivered_txn",
|
"delivered_txn",
|
||||||
|
|
Loading…
Reference in New Issue