Explicit federation ack
This commit is contained in:
parent
54fed21c04
commit
4c79a63fd7
|
@ -235,7 +235,10 @@ class FederationSenderHandler(object):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def stream_positions(self):
|
def stream_positions(self):
|
||||||
stream_id = yield self.store.get_federation_out_pos("federation")
|
stream_id = yield self.store.get_federation_out_pos("federation")
|
||||||
defer.returnValue({"federation": stream_id})
|
defer.returnValue({
|
||||||
|
"federation": stream_id,
|
||||||
|
"federation_ack": stream_id,
|
||||||
|
})
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def process_replication(self, result):
|
def process_replication(self, result):
|
||||||
|
|
|
@ -213,7 +213,15 @@ class FederationRemoteSendQueue(object):
|
||||||
def get_current_token(self):
|
def get_current_token(self):
|
||||||
return self.pos - 1
|
return self.pos - 1
|
||||||
|
|
||||||
def get_replication_rows(self, token, limit):
|
def get_replication_rows(self, token, limit, federation_ack=None):
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
token (int)
|
||||||
|
limit (int)
|
||||||
|
federation_ack (int): Optional. The position where the worker is
|
||||||
|
explicitly acknowledged it has handled. Allows us to drop
|
||||||
|
data from before that point
|
||||||
|
"""
|
||||||
# TODO: Handle limit.
|
# TODO: Handle limit.
|
||||||
|
|
||||||
# To handle restarts where we wrap around
|
# To handle restarts where we wrap around
|
||||||
|
@ -224,7 +232,8 @@ class FederationRemoteSendQueue(object):
|
||||||
|
|
||||||
# There should be only one reader, so lets delete everything its
|
# There should be only one reader, so lets delete everything its
|
||||||
# acknowledged its seen.
|
# acknowledged its seen.
|
||||||
self._clear_queue_before_pos(token)
|
if federation_ack:
|
||||||
|
self._clear_queue_before_pos(federation_ack)
|
||||||
|
|
||||||
# Fetch changed presence
|
# Fetch changed presence
|
||||||
keys = self.presence_changed.keys()
|
keys = self.presence_changed.keys()
|
||||||
|
|
|
@ -171,8 +171,13 @@ class ReplicationResource(Resource):
|
||||||
}
|
}
|
||||||
request_streams["streams"] = parse_string(request, "streams")
|
request_streams["streams"] = parse_string(request, "streams")
|
||||||
|
|
||||||
|
federation_ack = parse_integer(request, "federation_ack", None)
|
||||||
|
|
||||||
def replicate():
|
def replicate():
|
||||||
return self.replicate(request_streams, limit)
|
return self.replicate(
|
||||||
|
request_streams, limit,
|
||||||
|
federation_ack=federation_ack
|
||||||
|
)
|
||||||
|
|
||||||
writer = yield self.notifier.wait_for_replication(replicate, timeout)
|
writer = yield self.notifier.wait_for_replication(replicate, timeout)
|
||||||
result = writer.finish()
|
result = writer.finish()
|
||||||
|
@ -190,7 +195,7 @@ class ReplicationResource(Resource):
|
||||||
finish_request(request)
|
finish_request(request)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def replicate(self, request_streams, limit):
|
def replicate(self, request_streams, limit, federation_ack=None):
|
||||||
writer = _Writer()
|
writer = _Writer()
|
||||||
current_token = yield self.current_replication_token()
|
current_token = yield self.current_replication_token()
|
||||||
logger.debug("Replicating up to %r", current_token)
|
logger.debug("Replicating up to %r", current_token)
|
||||||
|
@ -209,7 +214,7 @@ class ReplicationResource(Resource):
|
||||||
yield self.caches(writer, current_token, limit, request_streams)
|
yield self.caches(writer, current_token, limit, request_streams)
|
||||||
yield self.to_device(writer, current_token, limit, request_streams)
|
yield self.to_device(writer, current_token, limit, request_streams)
|
||||||
yield self.public_rooms(writer, current_token, limit, request_streams)
|
yield self.public_rooms(writer, current_token, limit, request_streams)
|
||||||
self.federation(writer, current_token, limit, request_streams)
|
self.federation(writer, current_token, limit, request_streams, federation_ack)
|
||||||
self.streams(writer, current_token, request_streams)
|
self.streams(writer, current_token, request_streams)
|
||||||
|
|
||||||
logger.debug("Replicated %d rows", writer.total)
|
logger.debug("Replicated %d rows", writer.total)
|
||||||
|
@ -473,7 +478,7 @@ class ReplicationResource(Resource):
|
||||||
"position", "room_id", "visibility"
|
"position", "room_id", "visibility"
|
||||||
), position=upto_token)
|
), position=upto_token)
|
||||||
|
|
||||||
def federation(self, writer, current_token, limit, request_streams):
|
def federation(self, writer, current_token, limit, request_streams, federation_ack):
|
||||||
if self.config.send_federation:
|
if self.config.send_federation:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -483,7 +488,7 @@ class ReplicationResource(Resource):
|
||||||
|
|
||||||
if federation is not None and federation != current_position:
|
if federation is not None and federation != current_position:
|
||||||
federation_rows = self.federation_sender.get_replication_rows(
|
federation_rows = self.federation_sender.get_replication_rows(
|
||||||
federation, limit,
|
federation, limit, federation_ack=federation_ack,
|
||||||
)
|
)
|
||||||
upto_token = _position_from_rows(federation_rows, current_position)
|
upto_token = _position_from_rows(federation_rows, current_position)
|
||||||
writer.write_header_and_rows("federation", federation_rows, (
|
writer.write_header_and_rows("federation", federation_rows, (
|
||||||
|
|
Loading…
Reference in New Issue