Add a 'backfill room' button
This commit is contained in:
parent
598a1d8ff9
commit
75b6d982a0
|
@ -74,10 +74,18 @@ class FederationEventHandler(object):
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def backfill(self, room_id, limit):
|
def backfill(self, dest, room_id, limit):
|
||||||
# TODO: Work out which destinations to ask for backfill
|
pdus = yield self.replication_layer.backfill(dest, room_id, limit)
|
||||||
# self.replication_layer.backfill(dest, room_id, limit)
|
|
||||||
pass
|
if not pdus:
|
||||||
|
defer.returnValue([])
|
||||||
|
|
||||||
|
events = [
|
||||||
|
self.pdu_codec.event_from_pdu(pdu)
|
||||||
|
for pdu in pdus
|
||||||
|
]
|
||||||
|
|
||||||
|
defer.returnValue(events)
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
def get_state_for_room(self, destination, room_id):
|
def get_state_for_room(self, destination, room_id):
|
||||||
|
@ -87,7 +95,7 @@ class FederationEventHandler(object):
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_receive_pdu(self, pdu):
|
def on_receive_pdu(self, pdu, backfilled):
|
||||||
""" Called by the ReplicationLayer when we have a new pdu. We need to
|
""" Called by the ReplicationLayer when we have a new pdu. We need to
|
||||||
do auth checks and put it throught the StateHandler.
|
do auth checks and put it throught the StateHandler.
|
||||||
"""
|
"""
|
||||||
|
@ -95,7 +103,7 @@ class FederationEventHandler(object):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with (yield self.lock_manager.lock(pdu.context)):
|
with (yield self.lock_manager.lock(pdu.context)):
|
||||||
if event.is_state:
|
if event.is_state and not backfilled:
|
||||||
is_new_state = yield self.state_handler.handle_new_state(
|
is_new_state = yield self.state_handler.handle_new_state(
|
||||||
pdu
|
pdu
|
||||||
)
|
)
|
||||||
|
@ -104,7 +112,7 @@ class FederationEventHandler(object):
|
||||||
else:
|
else:
|
||||||
is_new_state = False
|
is_new_state = False
|
||||||
|
|
||||||
yield self.event_handler.on_receive(event, is_new_state)
|
yield self.event_handler.on_receive(event, is_new_state, backfilled)
|
||||||
|
|
||||||
except AuthError:
|
except AuthError:
|
||||||
# TODO: Implement something in federation that allows us to
|
# TODO: Implement something in federation that allows us to
|
||||||
|
|
|
@ -208,7 +208,7 @@ class ReplicationLayer(object):
|
||||||
|
|
||||||
pdus = [Pdu(outlier=False, **p) for p in transaction.pdus]
|
pdus = [Pdu(outlier=False, **p) for p in transaction.pdus]
|
||||||
for pdu in pdus:
|
for pdu in pdus:
|
||||||
yield self._handle_new_pdu(pdu)
|
yield self._handle_new_pdu(pdu, backfilled=True)
|
||||||
|
|
||||||
defer.returnValue(pdus)
|
defer.returnValue(pdus)
|
||||||
|
|
||||||
|
@ -415,7 +415,7 @@ class ReplicationLayer(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def _handle_new_pdu(self, pdu):
|
def _handle_new_pdu(self, pdu, backfilled=False):
|
||||||
# We reprocess pdus when we have seen them only as outliers
|
# We reprocess pdus when we have seen them only as outliers
|
||||||
existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin)
|
existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin)
|
||||||
|
|
||||||
|
@ -451,7 +451,10 @@ class ReplicationLayer(object):
|
||||||
# Persist the Pdu, but don't mark it as processed yet.
|
# Persist the Pdu, but don't mark it as processed yet.
|
||||||
yield self.pdu_actions.persist_received(pdu)
|
yield self.pdu_actions.persist_received(pdu)
|
||||||
|
|
||||||
ret = yield self.handler.on_receive_pdu(pdu)
|
if not backfilled:
|
||||||
|
ret = yield self.handler.on_receive_pdu(pdu, backfilled=backfilled)
|
||||||
|
else:
|
||||||
|
ret = None
|
||||||
|
|
||||||
yield self.pdu_actions.mark_as_processed(pdu)
|
yield self.pdu_actions.mark_as_processed(pdu)
|
||||||
|
|
||||||
|
|
|
@ -35,7 +35,7 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_receive(self, event, is_new_state):
|
def on_receive(self, event, is_new_state, backfilled):
|
||||||
if hasattr(event, "state_key") and not is_new_state:
|
if hasattr(event, "state_key") and not is_new_state:
|
||||||
logger.debug("Ignoring old state.")
|
logger.debug("Ignoring old state.")
|
||||||
return
|
return
|
||||||
|
@ -70,6 +70,21 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
else:
|
else:
|
||||||
with (yield self.room_lock.lock(event.room_id)):
|
with (yield self.room_lock.lock(event.room_id)):
|
||||||
store_id = yield self.store.persist_event(event)
|
store_id = yield self.store.persist_event(event, backfilled)
|
||||||
|
|
||||||
yield self.notifier.on_new_room_event(event, store_id)
|
if not backfilled:
|
||||||
|
yield self.notifier.on_new_room_event(event, store_id)
|
||||||
|
|
||||||
|
|
||||||
|
@log_function
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def backfill(self, dest, room_id, limit):
|
||||||
|
events = yield self.hs.get_federation().backfill(dest, room_id, limit)
|
||||||
|
|
||||||
|
for event in events:
|
||||||
|
try:
|
||||||
|
yield self.store.persist_event(event, backfilled=True)
|
||||||
|
except:
|
||||||
|
logger.debug("Failed to persiste event: %s", event)
|
||||||
|
|
||||||
|
defer.returnValue(events)
|
||||||
|
|
|
@ -383,6 +383,21 @@ class RoomMessageListRestServlet(RestServlet):
|
||||||
defer.returnValue((200, msgs))
|
defer.returnValue((200, msgs))
|
||||||
|
|
||||||
|
|
||||||
|
class RoomTriggerBackfill(RestServlet):
|
||||||
|
PATTERN = client_path_pattern("/rooms/(?P<room_id>[^/]*)/backfill$")
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def on_GET(self, request, room_id):
|
||||||
|
remote_server = urllib.unquote(request.args["remote"][0])
|
||||||
|
room_id = urllib.unquote(room_id)
|
||||||
|
limit = int(request.args["limit"][0])
|
||||||
|
|
||||||
|
handler = self.handlers.federation_handler
|
||||||
|
events = yield handler.backfill(remote_server, room_id, limit)
|
||||||
|
|
||||||
|
res = [event.get_dict() for event in events]
|
||||||
|
defer.returnValue((200, res))
|
||||||
|
|
||||||
def _parse_json(request):
|
def _parse_json(request):
|
||||||
try:
|
try:
|
||||||
content = json.loads(request.content.read())
|
content = json.loads(request.content.read())
|
||||||
|
@ -403,3 +418,4 @@ def register_servlets(hs, http_server):
|
||||||
RoomMemberListRestServlet(hs).register(http_server)
|
RoomMemberListRestServlet(hs).register(http_server)
|
||||||
RoomMessageListRestServlet(hs).register(http_server)
|
RoomMessageListRestServlet(hs).register(http_server)
|
||||||
JoinRoomAliasServlet(hs).register(http_server)
|
JoinRoomAliasServlet(hs).register(http_server)
|
||||||
|
RoomTriggerBackfill(hs).register(http_server)
|
||||||
|
|
|
@ -20,6 +20,8 @@ from synapse.api.events.room import (
|
||||||
RoomConfigEvent, RoomNameEvent,
|
RoomConfigEvent, RoomNameEvent,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from synapse.util.logutils import log_function
|
||||||
|
|
||||||
from .directory import DirectoryStore
|
from .directory import DirectoryStore
|
||||||
from .feedback import FeedbackStore
|
from .feedback import FeedbackStore
|
||||||
from .presence import PresenceStore
|
from .presence import PresenceStore
|
||||||
|
@ -32,9 +34,13 @@ from .pdu import StatePduStore, PduStore
|
||||||
from .transactions import TransactionStore
|
from .transactions import TransactionStore
|
||||||
|
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class DataStore(RoomMemberStore, RoomStore,
|
class DataStore(RoomMemberStore, RoomStore,
|
||||||
RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
|
RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
|
||||||
PresenceStore, PduStore, StatePduStore, TransactionStore,
|
PresenceStore, PduStore, StatePduStore, TransactionStore,
|
||||||
|
@ -49,6 +55,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
self.min_token = None
|
self.min_token = None
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@log_function
|
||||||
def persist_event(self, event, backfilled=False):
|
def persist_event(self, event, backfilled=False):
|
||||||
if event.type == RoomMemberEvent.TYPE:
|
if event.type == RoomMemberEvent.TYPE:
|
||||||
yield self._store_room_member(event)
|
yield self._store_room_member(event)
|
||||||
|
@ -83,6 +90,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
defer.returnValue(event)
|
defer.returnValue(event)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@log_function
|
||||||
def _store_event(self, event, backfilled):
|
def _store_event(self, event, backfilled):
|
||||||
# FIXME (erikj): This should be removed when we start amalgamating
|
# FIXME (erikj): This should be removed when we start amalgamating
|
||||||
# event and pdu storage
|
# event and pdu storage
|
||||||
|
@ -101,7 +109,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
if not self.min_token_deferred.called:
|
if not self.min_token_deferred.called:
|
||||||
yield self.min_token_deferred
|
yield self.min_token_deferred
|
||||||
self.min_token -= 1
|
self.min_token -= 1
|
||||||
vals["token_ordering"] = self.min_token
|
vals["stream_ordering"] = self.min_token
|
||||||
|
|
||||||
unrec = {
|
unrec = {
|
||||||
k: v
|
k: v
|
||||||
|
@ -110,7 +118,11 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
}
|
}
|
||||||
vals["unrecognized_keys"] = json.dumps(unrec)
|
vals["unrecognized_keys"] = json.dumps(unrec)
|
||||||
|
|
||||||
yield self._simple_insert("events", vals)
|
try:
|
||||||
|
yield self._simple_insert("events", vals)
|
||||||
|
except:
|
||||||
|
logger.exception("Failed to persist, probably duplicate")
|
||||||
|
return
|
||||||
|
|
||||||
if not backfilled and hasattr(event, "state_key"):
|
if not backfilled and hasattr(event, "state_key"):
|
||||||
vals = {
|
vals = {
|
||||||
|
@ -161,10 +173,12 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
def _get_min_token(self):
|
def _get_min_token(self):
|
||||||
row = yield self._execute(
|
row = yield self._execute(
|
||||||
None,
|
None,
|
||||||
"SELECT MIN(token_ordering) FROM events"
|
"SELECT MIN(stream_ordering) FROM events"
|
||||||
)
|
)
|
||||||
|
|
||||||
self.min_token = rows[0][0] if rows and rows[0] else 0
|
self.min_token = min(row[0][0], -1) if row and row[0] else -1
|
||||||
|
|
||||||
|
logger.debug("min_token is: %s", self.min_token)
|
||||||
|
|
||||||
defer.returnValue(self.min_token)
|
defer.returnValue(self.min_token)
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,8 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
from twisted.internet import defer
|
||||||
|
|
||||||
from ._base import SQLBaseStore, Table, JoinHelper
|
from ._base import SQLBaseStore, Table, JoinHelper
|
||||||
|
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
|
@ -319,6 +321,7 @@ class PduStore(SQLBaseStore):
|
||||||
|
|
||||||
return [(row[0], row[1], row[2]) for row in results]
|
return [(row[0], row[1], row[2]) for row in results]
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def get_oldest_pdus_in_context(self, context):
|
def get_oldest_pdus_in_context(self, context):
|
||||||
"""Get a list of Pdus that we haven't backfilled beyond yet (and haven't
|
"""Get a list of Pdus that we haven't backfilled beyond yet (and haven't
|
||||||
seen). This list is used when we want to backfill backwards and is the
|
seen). This list is used when we want to backfill backwards and is the
|
||||||
|
@ -331,17 +334,14 @@ class PduStore(SQLBaseStore):
|
||||||
Returns:
|
Returns:
|
||||||
list: A list of PduIdTuple.
|
list: A list of PduIdTuple.
|
||||||
"""
|
"""
|
||||||
return self._db_pool.runInteraction(
|
results = yield self._execute(
|
||||||
self._get_oldest_pdus_in_context, context
|
None,
|
||||||
)
|
|
||||||
|
|
||||||
def _get_oldest_pdus_in_context(self, txn, context):
|
|
||||||
txn.execute(
|
|
||||||
"SELECT pdu_id, origin FROM %(back)s WHERE context = ?"
|
"SELECT pdu_id, origin FROM %(back)s WHERE context = ?"
|
||||||
% {"back": PduBackwardExtremitiesTable.table_name, },
|
% {"back": PduBackwardExtremitiesTable.table_name, },
|
||||||
(context,)
|
context
|
||||||
)
|
)
|
||||||
return [PduIdTuple(i, o) for i, o in txn.fetchall()]
|
|
||||||
|
defer.returnValue([PduIdTuple(i, o) for i, o in results])
|
||||||
|
|
||||||
def is_pdu_new(self, pdu_id, origin, context, depth):
|
def is_pdu_new(self, pdu_id, origin, context, depth):
|
||||||
"""For a given Pdu, try and figure out if it's 'new', i.e., if it's
|
"""For a given Pdu, try and figure out if it's 'new', i.e., if it's
|
||||||
|
|
Loading…
Reference in New Issue