Make federation send queue take the current position
This commit is contained in:
parent
7984708a55
commit
11880103b1
|
@ -220,10 +220,15 @@ class FederationRemoteSendQueue(object):
|
||||||
def get_current_token(self):
|
def get_current_token(self):
|
||||||
return self.pos - 1
|
return self.pos - 1
|
||||||
|
|
||||||
def get_replication_rows(self, token, limit, federation_ack=None):
|
def federation_ack(self, token):
|
||||||
"""
|
self._clear_queue_before_pos(token)
|
||||||
|
|
||||||
|
def get_replication_rows(self, from_token, to_token, limit, federation_ack=None):
|
||||||
|
"""Get rows to be sent over federation between the two tokens
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
token (int)
|
from_token (int)
|
||||||
|
to_token(int)
|
||||||
limit (int)
|
limit (int)
|
||||||
federation_ack (int): Optional. The position where the worker is
|
federation_ack (int): Optional. The position where the worker is
|
||||||
explicitly acknowledged it has handled. Allows us to drop
|
explicitly acknowledged it has handled. Allows us to drop
|
||||||
|
@ -232,8 +237,8 @@ class FederationRemoteSendQueue(object):
|
||||||
# TODO: Handle limit.
|
# TODO: Handle limit.
|
||||||
|
|
||||||
# To handle restarts where we wrap around
|
# To handle restarts where we wrap around
|
||||||
if token > self.pos:
|
if from_token > self.pos:
|
||||||
token = -1
|
from_token = -1
|
||||||
|
|
||||||
rows = []
|
rows = []
|
||||||
|
|
||||||
|
@ -244,10 +249,11 @@ class FederationRemoteSendQueue(object):
|
||||||
|
|
||||||
# Fetch changed presence
|
# Fetch changed presence
|
||||||
keys = self.presence_changed.keys()
|
keys = self.presence_changed.keys()
|
||||||
i = keys.bisect_right(token)
|
i = keys.bisect_right(from_token)
|
||||||
|
j = keys.bisect_right(to_token) + 1
|
||||||
dest_user_ids = set(
|
dest_user_ids = set(
|
||||||
(pos, dest_user_id)
|
(pos, dest_user_id)
|
||||||
for pos in keys[i:]
|
for pos in keys[i:j]
|
||||||
for dest_user_id in self.presence_changed[pos]
|
for dest_user_id in self.presence_changed[pos]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -259,8 +265,9 @@ class FederationRemoteSendQueue(object):
|
||||||
|
|
||||||
# Fetch changes keyed edus
|
# Fetch changes keyed edus
|
||||||
keys = self.keyed_edu_changed.keys()
|
keys = self.keyed_edu_changed.keys()
|
||||||
i = keys.bisect_right(token)
|
i = keys.bisect_right(from_token)
|
||||||
keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:])
|
j = keys.bisect_right(to_token) + 1
|
||||||
|
keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j])
|
||||||
|
|
||||||
for (pos, (destination, edu_key)) in keyed_edus:
|
for (pos, (destination, edu_key)) in keyed_edus:
|
||||||
rows.append(
|
rows.append(
|
||||||
|
@ -272,16 +279,18 @@ class FederationRemoteSendQueue(object):
|
||||||
|
|
||||||
# Fetch changed edus
|
# Fetch changed edus
|
||||||
keys = self.edus.keys()
|
keys = self.edus.keys()
|
||||||
i = keys.bisect_right(token)
|
i = keys.bisect_right(from_token)
|
||||||
edus = set((k, self.edus[k]) for k in keys[i:])
|
j = keys.bisect_right(to_token) + 1
|
||||||
|
edus = set((k, self.edus[k]) for k in keys[i:j])
|
||||||
|
|
||||||
for (pos, edu) in edus:
|
for (pos, edu) in edus:
|
||||||
rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict())))
|
rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict())))
|
||||||
|
|
||||||
# Fetch changed failures
|
# Fetch changed failures
|
||||||
keys = self.failures.keys()
|
keys = self.failures.keys()
|
||||||
i = keys.bisect_right(token)
|
i = keys.bisect_right(from_token)
|
||||||
failures = set((k, self.failures[k]) for k in keys[i:])
|
j = keys.bisect_right(to_token) + 1
|
||||||
|
failures = set((k, self.failures[k]) for k in keys[i:j])
|
||||||
|
|
||||||
for (pos, (destination, failure)) in failures:
|
for (pos, (destination, failure)) in failures:
|
||||||
rows.append((pos, FAILURE_TYPE, ujson.dumps({
|
rows.append((pos, FAILURE_TYPE, ujson.dumps({
|
||||||
|
@ -291,8 +300,9 @@ class FederationRemoteSendQueue(object):
|
||||||
|
|
||||||
# Fetch changed device messages
|
# Fetch changed device messages
|
||||||
keys = self.device_messages.keys()
|
keys = self.device_messages.keys()
|
||||||
i = keys.bisect_right(token)
|
i = keys.bisect_right(from_token)
|
||||||
device_messages = set((k, self.device_messages[k]) for k in keys[i:])
|
j = keys.bisect_right(to_token) + 1
|
||||||
|
device_messages = set((k, self.device_messages[k]) for k in keys[i:j])
|
||||||
|
|
||||||
for (pos, destination) in device_messages:
|
for (pos, destination) in device_messages:
|
||||||
rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({
|
rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({
|
||||||
|
|
|
@ -489,7 +489,7 @@ class ReplicationResource(Resource):
|
||||||
|
|
||||||
if federation is not None and federation != current_position:
|
if federation is not None and federation != current_position:
|
||||||
federation_rows = self.federation_sender.get_replication_rows(
|
federation_rows = self.federation_sender.get_replication_rows(
|
||||||
federation, limit, federation_ack=federation_ack,
|
federation, current_position, limit, federation_ack=federation_ack,
|
||||||
)
|
)
|
||||||
upto_token = _position_from_rows(federation_rows, current_position)
|
upto_token = _position_from_rows(federation_rows, current_position)
|
||||||
writer.write_header_and_rows("federation", federation_rows, (
|
writer.write_header_and_rows("federation", federation_rows, (
|
||||||
|
|
Loading…
Reference in New Issue