Resource transfer and receipt management optimisation

This commit is contained in:
Mark Qvist 2020-05-13 13:08:48 +02:00
parent 9a00bd2704
commit be1ff8ec21
5 changed files with 90 additions and 40 deletions

View File

@ -55,7 +55,8 @@ def server(configpath, path):
def announceLoop(destination):
# Let the user know that everything is ready
RNS.log("File server "+RNS.prettyhexrep(destination.hash)+" running, hit enter to manually send an announce (Ctrl-C to quit)")
RNS.log("File server "+RNS.prettyhexrep(destination.hash)+" running")
RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)")
# We enter a loop that runs until the users exits.
# If the user hits enter, we will announce our server
@ -238,8 +239,9 @@ def download(filename):
# We just create a packet containing the
# requested filename, and send it down the
# link.
request_packet = RNS.Packet(server_link, filename.encode("utf-8"))
# link. We also specify we don't need a
# packet receipt.
request_packet = RNS.Packet(server_link, filename.encode("utf-8"), create_receipt=False)
request_packet.send()
print("")

View File

@ -30,9 +30,6 @@ class TCPClientInterface(Interface):
self.parent_interface = None
self.name = name
# TODO: Optimise so this is not needed
self.transmit_delay = 0.001
if connected_socket != None:
self.receives = True
self.target_ip = None
@ -50,6 +47,7 @@ class TCPClientInterface(Interface):
self.owner = owner
self.online = True
self.writing = False
if connected_socket == None:
thread = threading.Thread(target=self.read_loop)
@ -61,10 +59,14 @@ class TCPClientInterface(Interface):
def processOutgoing(self, data):
if self.online:
while self.writing:
time.sleep(0.01)
try:
time.sleep(self.transmit_delay)
self.writing = True
data = bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG])
self.socket.sendall(data)
self.writing = False
except Exception as e:
RNS.log("Exception occurred while transmitting via "+str(self)+", tearing down interface", RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
@ -78,8 +80,7 @@ class TCPClientInterface(Interface):
data_buffer = b""
while True:
data_in = self.socket.recv(1024)
data_in = self.socket.recv(4096)
if len(data_in) > 0:
pointer = 0
while pointer < len(data_in):

View File

@ -1,3 +1,4 @@
import threading
import struct
import math
import time
@ -55,7 +56,7 @@ class Packet:
# Default packet timeout
TIMEOUT = 60
def __init__(self, destination, data, packet_type = DATA, context = NONE, transport_type = RNS.Transport.BROADCAST, header_type = HEADER_1, transport_id = None, attached_interface = None):
def __init__(self, destination, data, packet_type = DATA, context = NONE, transport_type = RNS.Transport.BROADCAST, header_type = HEADER_1, transport_id = None, attached_interface = None, create_receipt = True):
if destination != None:
if transport_type == None:
transport_type = RNS.Transport.BROADCAST
@ -74,6 +75,7 @@ class Packet:
self.raw = None
self.packed = False
self.sent = False
self.create_receipt = create_receipt
self.receipt = None
self.fromPacked = False
else:
@ -257,6 +259,7 @@ class PacketReceipt:
FAILED = 0x00
SENT = 0x01
DELIVERED = 0x02
CULLED = 0xFF
EXPL_LENGTH = RNS.Identity.HASHLENGTH//8+RNS.Identity.SIGLENGTH//8
@ -366,10 +369,18 @@ class PacketReceipt:
def check_timeout(self):
if self.is_timed_out():
if self.timeout == -1:
self.status = PacketReceipt.CULLED
else:
self.status = PacketReceipt.FAILED
self.concluded_at = time.time()
if self.callbacks.timeout:
self.callbacks.timeout(self)
thread = threading.Thread(target=self.callbacks.timeout, args=(self,))
thread.setDaemon(True)
thread.start()
#self.callbacks.timeout(self)
# Set the timeout in seconds

View File

@ -549,7 +549,9 @@ class Resource:
last_map_hash = request_data[1:Resource.MAPHASH_LEN+1]
part_index = self.receiver_min_consecutive_height
for part in self.parts[self.receiver_min_consecutive_height:]:
search_start = part_index
search_end = self.receiver_min_consecutive_height+ResourceAdvertisement.COLLISION_GUARD_SIZE
for part in self.parts[search_start:search_end]:
part_index += 1
if part.map_hash == last_map_hash:
break

View File

@ -41,6 +41,7 @@ class Transport:
LINK_TIMEOUT = RNS.Link.KEEPALIVE * 2
REVERSE_TIMEOUT = 30*60 # Reverse table entries are removed after max 30 minutes
DESTINATION_TIMEOUT = 60*60*24*7 # Destination table entries are removed if unused for one week
MAX_RECEIPTS = 1024 # Maximum number of receipts to keep track of
interfaces = [] # All active interfaces
destinations = [] # All active destinations
@ -49,6 +50,7 @@ class Transport:
packet_hashlist = [] # A list of packet hashes for duplicate detection
receipts = [] # Receipts of all outgoing packets for proof processing
# TODO: "destination_table" should really be renamed to "path_table"
announce_table = {} # A table for storing announces currently waiting to be retransmitted
destination_table = {} # A lookup table containing the next hop to a given destination
reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies
@ -154,10 +156,13 @@ class Transport:
if not Transport.jobs_locked:
# Process receipts list for timed-out packets
if time.time() > Transport.receipts_last_checked+Transport.receipts_check_interval:
while len(Transport.receipts) > Transport.MAX_RECEIPTS:
culled_receipt = Transport.receipts.pop(0)
culled_receipt.timeout = -1
receipt.check_timeout()
for receipt in Transport.receipts:
thread = threading.Thread(target=receipt.check_timeout)
thread.setDaemon(True)
thread.start()
receipt.check_timeout()
if receipt.status != RNS.PacketReceipt.SENT:
Transport.receipts.remove(receipt)
@ -211,11 +216,30 @@ class Transport:
if time.time() > link_entry[0] + Transport.LINK_TIMEOUT:
Transport.link_table.pop(link_id)
# Cull the destination table
# Cull the path table
stale_paths = []
for destination_hash in Transport.destination_table:
destination_entry = Transport.destination_table[destination_hash]
attached_interface = destination_entry[5]
if time.time() > destination_entry[0] + Transport.DESTINATION_TIMEOUT:
stale_paths.append(destination_hash)
RNS.log("Path to "+RNS.prettyhexrep(destination_hash)+" timed out and was removed", RNS.LOG_DEBUG)
if not attached_interface in Transport.interfaces:
stale_paths.append(destination_hash)
RNS.log("Path to "+RNS.prettyhexrep(destination_hash)+" was removed since the attached interface no longer exists", RNS.LOG_DEBUG)
i = 0
for destination_hash in stale_paths:
Transport.destination_table.pop(destination_hash)
i += 1
if i > 0:
if i == 1:
RNS.log("Removed "+str(i)+" path", RNS.LOG_DEBUG)
else:
RNS.log("Removed "+str(i)+" paths", RNS.LOG_DEBUG)
Transport.tables_last_culled = time.time()
@ -266,7 +290,7 @@ class Transport:
else:
# Broadcast packet on all outgoing interfaces, or relevant
# interface, if packet is for a link or has an attachede interface
# interface, if packet is for a link or has an attached interface
for interface in Transport.interfaces:
if interface.OUT:
should_transmit = True
@ -288,7 +312,17 @@ class Transport:
packet.sent = True
packet.sent_at = time.time()
if (packet.packet_type == RNS.Packet.DATA and packet.destination.type != RNS.Destination.PLAIN):
# Don't generate receipt if it has been explicitly disabled
if (packet.create_receipt == True and
# Only generate receipts for DATA packets
packet.packet_type == RNS.Packet.DATA and
# Don't generate receipts for PLAIN destinations
packet.destination.type != RNS.Destination.PLAIN and
# Don't generate receipts for link-related packets
not (packet.context >= RNS.Packet.KEEPALIVE and packet.context <= RNS.Packet.LRPROOF) and
# Don't generate receipts for resource packets
not (packet.context >= RNS.Packet.RESOURCE and packet.context <= RNS.Packet.RESOURCE_RCL)):
packet.receipt = RNS.PacketReceipt(packet)
Transport.receipts.append(packet.receipt)