Prevent replication wedging

This commit is contained in:
Andrew Morgan 2019-03-04 13:56:49 +00:00
parent 1beebe916f
commit 5f0c449dd5
1 changed files with 24 additions and 4 deletions

View File

@ -451,7 +451,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
@defer.inlineCallbacks @defer.inlineCallbacks
def subscribe_to_stream(self, stream_name, token): def subscribe_to_stream(self, stream_name, token):
"""Subscribe the remote to a streams. """Subscribe the remote to a stream.
This invloves checking if they've missed anything and sending those This invloves checking if they've missed anything and sending those
updates down if they have. During that time new updates for the stream updates down if they have. During that time new updates for the stream
@ -478,10 +478,30 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
# Now we can send any updates that came in while we were subscribing # Now we can send any updates that came in while we were subscribing
pending_rdata = self.pending_rdata.pop(stream_name, []) pending_rdata = self.pending_rdata.pop(stream_name, [])
batch_updates = []
for token, update in pending_rdata: for token, update in pending_rdata:
# Only send updates newer than the current token # If the token is null, it is part of a batch update. Batches
if token > current_token: # are multiple updates that share a single token. To denote
self.send_command(RdataCommand(stream_name, token, update)) # this, the token is set to None for all tokens in the batch
# except for the last. If we find a None token, we keep looking
# through tokens until we find one that is not None and then
# process all previous updates in the batch as if they had the
# final token.
if not token or len(batch_updates) > 0:
batch_updates.append(update)
if token and not token > current_token:
# This batch is older than current_token, dismiss
batch_updates = []
continue
if token:
# Send all updates that are part of this batch with the
# found token
for update in batch_updates:
self.send_command(RdataCommand(stream_name, token, update))
else:
# Only send updates newer than the current token
if token > current_token:
self.send_command(RdataCommand(stream_name, token, update))
# They're now fully subscribed # They're now fully subscribed
self.replication_streams.add(stream_name) self.replication_streams.add(stream_name)