From 31104c6e9c0d17087eab736ef162971e35057ddd Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Mon, 18 Apr 2022 16:23:24 +0200 Subject: [PATCH] Implemented bandwidth-based announce propagation calculation --- RNS/Interfaces/Interface.py | 58 ++++++++++++++++++++++++++++ RNS/Reticulum.py | 3 ++ RNS/Transport.py | 77 +++++++++++++++++++++++++++++++++---- 3 files changed, 131 insertions(+), 7 deletions(-) diff --git a/RNS/Interfaces/Interface.py b/RNS/Interfaces/Interface.py index 39fe672..f2637a3 100755 --- a/RNS/Interfaces/Interface.py +++ b/RNS/Interfaces/Interface.py @@ -21,6 +21,8 @@ # SOFTWARE. import RNS +import time +import threading class Interface: IN = False @@ -41,5 +43,61 @@ class Interface: def get_hash(self): return RNS.Identity.full_hash(str(self).encode("utf-8")) + # TODO: Clean + # def bogus_queue(self): + # self.announce_queue = [] + + # import random + # import time + + # now = time.time() + # random.seed(45) + # for i in range(1,32): + # entry = {"time": now+i*3, "hops":random.randint(4,16), "raw": str("bogus_data_"+str(i)).encode("utf-8")} + # self.announce_queue.append(entry) + + def process_announce_queue(self): + if not hasattr(self, "announce_cap"): + self.announce_cap = RNS.Reticulum.ANNOUNCE_CAP + + if hasattr(self, "announce_queue"): + # TODO: Clean + # RNS.log("Processing announce queue on "+str(self), RNS.LOG_DEBUG) + try: + now = time.time() + stale = [] + for a in self.announce_queue: + if now > a["time"]+RNS.Reticulum.QUEUED_ANNOUNCE_LIFE: + stale.append(a) + + for s in stale: + self.announce_queue.remove(s) + + if len(self.announce_queue) > 0: + min_hops = min(entry["hops"] for entry in self.announce_queue) + entries = list(filter(lambda e: e["hops"] == min_hops, self.announce_queue)) + entries.sort(key=lambda e: e["time"]) + selected = entries[0] + + now = time.time() + tx_time = (len(selected["raw"])*8) / self.bitrate + wait_time = (tx_time / self.announce_cap) + self.announce_allowed_at = now + wait_time + + self.processOutgoing(selected["raw"]) + self.announce_queue.remove(selected) + # TODO: Clean debug statements + # RNS.log("Sent queued announce with "+str(selected["hops"])+" hops on "+str(self)) + if len(self.announce_queue) > 0: + # TODO: Clean debug statements + # RNS.log("Still have "+str(len(self.announce_queue))+" announces in queue, scheduling next for tx in "+str(round(wait_time*1000,6))+"ms", RNS.LOG_DEBUG) + timer = threading.Timer(wait_time, self.process_announce_queue) + timer.start() + + except Exception as e: + self.announce_queue = [] + RNS.log("Error while processing announce queue on "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) + RNS.log("The announce queue for this interface has been cleared.", RNS.LOG_ERROR) + def detach(self): pass \ No newline at end of file diff --git a/RNS/Reticulum.py b/RNS/Reticulum.py index 8b83a94..d1c12b5 100755 --- a/RNS/Reticulum.py +++ b/RNS/Reticulum.py @@ -85,6 +85,9 @@ class Reticulum: the default value. """ + MAX_QUEUED_ANNOUNCES = 16384 + QUEUED_ANNOUNCE_LIFE = 60*60*24 + ANNOUNCE_CAP = 2 """ The maximum percentage of interface bandwidth that, at any given time, diff --git a/RNS/Transport.py b/RNS/Transport.py index 25a3b89..74a6f64 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -56,7 +56,7 @@ class Transport: PATHFINDER_D = 10 # Fixed per-hop delay PATHFINDER_R = 1 # Retransmit retries PATHFINDER_T = 10 # Retry grace period - PATHFINDER_RW = 5 # Random window for announce rebroadcast + PATHFINDER_RW = 4 # Random window for announce rebroadcast PATHFINDER_E = 60*60*24*7 # Path expiration of one week AP_PATH_TIME = 60*60*24 # Path expiration of one day for Access Point paths @@ -82,6 +82,7 @@ class Transport: # TODO: "destination_table" should really be renamed to "path_table" # Notes on memory usage: 1 megabyte of memory can store approximately # 55.100 path table entries or approximately 22.300 link table entries. + announce_table = {} # A table for storing announces currently waiting to be retransmitted destination_table = {} # A lookup table containing the next hop to a given destination reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies @@ -302,7 +303,6 @@ class Transport: break else: if time.time() > announce_entry[1]: - # announce_entry[1] = time.time() + math.pow(Transport.PATHFINDER_C, announce_entry[4]) + Transport.PATHFINDER_T + Transport.PATHFINDER_RW announce_entry[1] = time.time() + math.pow(Transport.PATHFINDER_C, announce_entry[4]) + Transport.PATHFINDER_T + Transport.PATHFINDER_RW announce_entry[2] += 1 packet = announce_entry[5] @@ -476,12 +476,14 @@ class Transport: @staticmethod def outbound(packet): while (Transport.jobs_running): + # TODO: Profile actual impact here on faster links sleep(0.01) Transport.jobs_locked = True # TODO: This updateHash call might be redundant packet.update_hash() sent = False + outbound_time = time.time() # Check if we have a known path for the destination in the path table if packet.packet_type != RNS.Packet.ANNOUNCE and packet.destination.type != RNS.Destination.PLAIN and packet.destination.type != RNS.Destination.GROUP and packet.destination_hash in Transport.destination_table: @@ -550,17 +552,75 @@ class Transport: should_transmit = False if packet.packet_type == RNS.Packet.ANNOUNCE: - if packet.attached_interface == None and interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT: - RNS.log("Blocking announce broadcast on "+str(interface)+" due to AP mode", RNS.LOG_DEBUG) - should_transmit = False - # TODO: Add capacity limit based on interface bandwidth + if packet.attached_interface == None: + if interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT: + RNS.log("Blocking announce broadcast on "+str(interface)+" due to AP mode", RNS.LOG_EXTREME) + should_transmit = False + + else: + # TODO: Remove logging statements + if packet.hops > 0: + + if not hasattr(interface, "announce_cap"): + interface.announce_cap = RNS.Reticulum.ANNOUNCE_CAP + + if not hasattr(interface, "announce_allowed_at"): + interface.announce_allowed_at = 0 + + if not hasattr(interface, "announce_queue"): + interface.announce_queue = [] + + queued_announces = True if len(interface.announce_queue) > 0 else False + + if not queued_announces and outbound_time > interface.announce_allowed_at: + tx_time = (len(packet.raw)*8) / interface.bitrate + wait_time = (tx_time / interface.announce_cap) + interface.announce_allowed_at = outbound_time + wait_time + + else: + should_transmit = False + if not len(interface.announce_queue) >= RNS.Reticulum.MAX_QUEUED_ANNOUNCES: + entry = {"time": outbound_time, "hops": packet.hops, "raw": packet.raw} + queued_announces = True if len(interface.announce_queue) > 0 else False + interface.announce_queue.append(entry) + # TODO: Clean + # RNS.log("Added announce to queue on "+str(interface), RNS.LOG_DEBUG) + + if not queued_announces: + # TODO: Clean + # RNS.log("Interface "+str(interface)+" still waiting for announce allowance", RNS.LOG_DEBUG) + wait_time = max(interface.announce_allowed_at - time.time(), 0) + timer = threading.Timer(wait_time, interface.process_announce_queue) + timer.start() + # TODO: Clean + # RNS.log("Triggering run in "+str(wait_time)+" seconds", RNS.LOG_DEBUG) + else: + # TODO: Clean + # RNS.log("Interface "+str(interface)+" has announces in queue, adding directly to it", RNS.LOG_DEBUG) + pass + + else: + # TODO: Clean + # RNS.log("Not retransmitting announce on "+str(interface)+" since the queue is full", RNS.LOG_DEBUG) + pass + + else: + # TODO: Clean + # RNS.log("Skipping announce cap calculations for "+str(packet.hops)+" hop packet", RNS.LOG_DEBUG) + pass if should_transmit: if not stored_hash: Transport.packet_hashlist.append(packet.packet_hash) stored_hash = True - interface.processOutgoing(packet.raw) + def send_packet(): + interface.processOutgoing(packet.raw) + + thread = threading.Thread(target=send_packet) + thread.daemon = True + thread.start() + sent = True if sent: @@ -641,6 +701,9 @@ class Transport: def inbound(raw, interface=None): while (Transport.jobs_running): sleep(0.01) + + if Transport.identity == None: + return Transport.jobs_locked = True