Actually don't store any PDUs
This commit is contained in:
parent
d84f5b30b8
commit
21fe249d62
|
@ -106,7 +106,6 @@ class ReplicationLayer(object):
|
||||||
|
|
||||||
self.query_handlers[query_type] = handler
|
self.query_handlers[query_type] = handler
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
@log_function
|
@log_function
|
||||||
def send_pdu(self, pdu):
|
def send_pdu(self, pdu):
|
||||||
"""Informs the replication layer about a new PDU generated within the
|
"""Informs the replication layer about a new PDU generated within the
|
||||||
|
@ -135,7 +134,7 @@ class ReplicationLayer(object):
|
||||||
logger.debug("[%s] Persisting PDU", pdu.pdu_id)
|
logger.debug("[%s] Persisting PDU", pdu.pdu_id)
|
||||||
|
|
||||||
# Save *before* trying to send
|
# Save *before* trying to send
|
||||||
yield self.store.persist_event(pdu=pdu)
|
# yield self.store.persist_event(pdu=pdu)
|
||||||
|
|
||||||
logger.debug("[%s] Persisted PDU", pdu.pdu_id)
|
logger.debug("[%s] Persisted PDU", pdu.pdu_id)
|
||||||
logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.pdu_id)
|
logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.pdu_id)
|
||||||
|
@ -359,12 +358,13 @@ class ReplicationLayer(object):
|
||||||
pdu_id, pdu_origin
|
pdu_id, pdu_origin
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
results = yield self.store.get_current_state_for_context(
|
raise NotImplementedError("Specify an event")
|
||||||
context
|
# results = yield self.store.get_current_state_for_context(
|
||||||
)
|
# context
|
||||||
pdus = [Pdu.from_pdu_tuple(p) for p in results]
|
# )
|
||||||
|
# pdus = [Pdu.from_pdu_tuple(p) for p in results]
|
||||||
logger.debug("Context returning %d results", len(pdus))
|
#
|
||||||
|
# logger.debug("Context returning %d results", len(pdus))
|
||||||
|
|
||||||
defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict()))
|
defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict()))
|
||||||
|
|
||||||
|
@ -456,7 +456,6 @@ class ReplicationLayer(object):
|
||||||
|
|
||||||
defer.returnValue(pdus)
|
defer.returnValue(pdus)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
@log_function
|
@log_function
|
||||||
def _get_persisted_pdu(self, pdu_id, pdu_origin):
|
def _get_persisted_pdu(self, pdu_id, pdu_origin):
|
||||||
""" Get a PDU from the database with given origin and id.
|
""" Get a PDU from the database with given origin and id.
|
||||||
|
@ -464,9 +463,7 @@ class ReplicationLayer(object):
|
||||||
Returns:
|
Returns:
|
||||||
Deferred: Results in a `Pdu`.
|
Deferred: Results in a `Pdu`.
|
||||||
"""
|
"""
|
||||||
pdu_tuple = yield self.store.get_pdu(pdu_id, pdu_origin)
|
return self.handler.get_persisted_pdu(pdu_id, pdu_origin)
|
||||||
|
|
||||||
defer.returnValue(Pdu.from_pdu_tuple(pdu_tuple))
|
|
||||||
|
|
||||||
def _transaction_from_pdus(self, pdu_list):
|
def _transaction_from_pdus(self, pdu_list):
|
||||||
"""Returns a new Transaction containing the given PDUs suitable for
|
"""Returns a new Transaction containing the given PDUs suitable for
|
||||||
|
@ -502,7 +499,9 @@ class ReplicationLayer(object):
|
||||||
# Get missing pdus if necessary.
|
# Get missing pdus if necessary.
|
||||||
if not pdu.outlier:
|
if not pdu.outlier:
|
||||||
# We only backfill backwards to the min depth.
|
# We only backfill backwards to the min depth.
|
||||||
min_depth = yield self.store.get_min_depth_for_context(pdu.context)
|
min_depth = yield self.handler.get_min_depth_for_context(
|
||||||
|
pdu.context
|
||||||
|
)
|
||||||
|
|
||||||
if min_depth and pdu.depth > min_depth:
|
if min_depth and pdu.depth > min_depth:
|
||||||
for pdu_id, origin, hashes in pdu.prev_pdus:
|
for pdu_id, origin, hashes in pdu.prev_pdus:
|
||||||
|
@ -529,7 +528,7 @@ class ReplicationLayer(object):
|
||||||
)
|
)
|
||||||
|
|
||||||
# Persist the Pdu, but don't mark it as processed yet.
|
# Persist the Pdu, but don't mark it as processed yet.
|
||||||
yield self.store.persist_event(pdu=pdu)
|
# yield self.store.persist_event(pdu=pdu)
|
||||||
|
|
||||||
if not backfilled:
|
if not backfilled:
|
||||||
ret = yield self.handler.on_receive_pdu(
|
ret = yield self.handler.on_receive_pdu(
|
||||||
|
|
|
@ -415,6 +415,28 @@ class FederationHandler(BaseHandler):
|
||||||
for e in events
|
for e in events
|
||||||
])
|
])
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
@log_function
|
||||||
|
def get_persisted_pdu(self, pdu_id, origin):
|
||||||
|
""" Get a PDU from the database with given origin and id.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred: Results in a `Pdu`.
|
||||||
|
"""
|
||||||
|
event = yield self.store.get_event(
|
||||||
|
self.pdu_codec.encode_event_id(pdu_id, origin),
|
||||||
|
allow_none=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
if event:
|
||||||
|
defer.returnValue(self.pdu_codec.pdu_from_event(event))
|
||||||
|
else:
|
||||||
|
defer.returnValue(None)
|
||||||
|
|
||||||
|
@log_function
|
||||||
|
def get_min_depth_for_context(self, context):
|
||||||
|
return self.store.get_min_depth(context)
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
def _on_user_joined(self, user, room_id):
|
def _on_user_joined(self, user, room_id):
|
||||||
waiters = self.waiting_for_join_list.get((user.to_string(), room_id), [])
|
waiters = self.waiting_for_join_list.get((user.to_string(), room_id), [])
|
||||||
|
|
|
@ -78,6 +78,13 @@ class EventFederationStore(SQLBaseStore):
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
def get_min_depth(self, room_id):
|
||||||
|
return self.runInteraction(
|
||||||
|
"get_min_depth",
|
||||||
|
self._get_min_depth_interaction,
|
||||||
|
room_id,
|
||||||
|
)
|
||||||
|
|
||||||
def _get_min_depth_interaction(self, txn, room_id):
|
def _get_min_depth_interaction(self, txn, room_id):
|
||||||
min_depth = self._simple_select_one_onecol_txn(
|
min_depth = self._simple_select_one_onecol_txn(
|
||||||
txn,
|
txn,
|
||||||
|
|
Loading…
Reference in New Issue