diff --git a/RNS/Bundle.py b/RNS/Bundle.py index 76e51ff..0b7e17a 100644 --- a/RNS/Bundle.py +++ b/RNS/Bundle.py @@ -5,30 +5,41 @@ import os.path from .vendor import umsgpack as umsgpack class Bundle: + APP_NAME = "rnsbundle" + NO_CUSTODY = 0x00; TAKING_CUSTODY = 0x01; FULL_CUSTODY = 0x02; + REMOVED = 0xFF; CHUNK_SIZE = RNS.Resource.MAX_EFFICIENT_SIZE / 4 - def __init__(self, destination = None, data = None, filepath = None, advertised_id = None): - self.destination = destination - self.state = None - self.data_file = None - self.meta_file = None - self.id = None - self.storagepath = None - self.size = None - self.chunks = 0 - self.heartbeat = time.time() - self.resources = [] + def __init__(self, destination_hash = None, data = None, filepath = None, advertisement_data = None): + self.destination_hash = None + self.is_originator = False + self.state = None + self.data_file = None + self.meta_file = None + self.data_hash = None + self.id = None + self.storagepath = None + self.size = None + self.chunks = 0 + self.created = time.time() + self.heartbeat = created + self.transferring = False + + self.chunk_request_destination = None try: if data != None or filepath != None: + self.destination_hash = destination_hash + self.is_originator = True + self.id = RNS.Identity.getRandomHash() if filepath == None and data != None: try: - self.id = RNS.Identity.fullHash(data) + self.data_hash = RNS.Identity.fullHash(data) self.storagepath = Reticulum.bundlepath+"/"+self.id.hex() self.datapath = self.storagepath+"/data" self.metadatapath = self.storagepath+"/metadata" @@ -50,7 +61,7 @@ class Bundle: elif data == None and filepath != None: try: input_file = open(filepath, "rb") - self.id = RNS.Identity.fullHash(input_file.read()) + self.data_hash = RNS.Identity.fullHash(input_file.read()) input_file.seek(0) self.storagepath = RNS.Reticulum.bundlepath+"/"+self.id.hex() @@ -76,42 +87,141 @@ class Bundle: else: raise ValueError("Bundle cannot be created from data and file path at the same time") - elif advertised_id != None: + # Prepare file handles and metadata + self.size = os.stat(self.datapath).st_size + if self.size < 1: + raise IOError("Bundle data is empty") + self.data_file = open(self.datapath, "rb") + + elif advertisement_data != None: # Incoming bundle transfer + self.id = advertisement_data[1] + self.destination_hash = advertisement_data[0] self.state = Bundle.TAKING_CUSTODY + + self.storagepath = Reticulum.bundlepath+"/"+self.id.hex() + self.datapath = self.storagepath+"/data" + self.metadatapath = self.storagepath+"/metadata" + + if not os.path.isdir(self.storagepath): + os.makedirs(self.storagepath) + else: + RNS.log("Warning, bundle already exists in storage location, recreating", RNS.LOG_DEBUG) + + self.data_file = open(self.datapath, "wb") + self.data_file.close() + + self.size = advertisement_data[2] + self.data_file = open(self.datapath, "wb") + else: raise ValueError("No source of data specified for bundle initialisation") - # Prepare file handles and metadata - self.size = os.stat(self.datapath).st_size - if self.size < 1: - raise IOError("Bundle data is empty") - self.chunks = ((self.size-1)//Bundle.CHUNK_SIZE)+1 - self.data_file = open(self.datapath, "rb") self.flush_metadata() + RNS.Transport.register_bundle(self) + except Exception as e: RNS.log("Error while initialising bundle. The contained exception was:", RNS.LOG_ERROR) RNS.log(str(e), RNS.LOG_ERROR) + # TODO: Remove raise e + def get_packed_metadata(self): + metadata = { + "destination": self.destination, + "heartbeat": self.heartbeat, + "size": self.size, + "is_originator": self.is_originator + "state": self.state} + + return umsgpack.packb(metadata) + def flush_metadata(self): try: - metadata = { - "destination": self.destination, - "heartbeat": self.heartbeat, - "size": self.size, - "chunks": self.chunks, - "state": self.state} - self.meta_file = open(self.metadatapath, "wb") - self.meta_file.write(umsgpack.packb(metadata)) + self.meta_file.write(self.get_packed_metadata()) self.meta_file.close() except Exception as e: RNS.log("Error while flushing metadata for bundle "+RNS.prettyhexrep(self.id), RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) + def register_destinations(self, destination): + self.chunk_request_destination = RNS.Destination(None, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "chunk", "request") + self.chunk_request_destination.link_established_callback(requester_connected) + + def advertise(self, advertise_to): + advertisement = [ + self.destination, + self.id, + self.size, + self.chunks] + + advertisement_data = umsgpack.packb(advertisement) + advertisement_packet = RNS.Packet(advertise_to, advertisement_data) + advertisement.packet.send() + + def requester_connected(self, link): + RNS.log("Requester connected to bundle "+RNS.prettyhexrep(self.id), RNS.LOG_DEBUG) + link.packet_callback(chunk_request) + + def chunk_request(self, data, packet): + chunk_index = data[0] + RNS.log("Request for chunk "+str(chunk_index)+"/"+str(self.chunks)+" of bundle "+RNS.prettyhexrep(self.id), RNS.LOG_DEBUG) + if chunk_index < self.chunks: + self.emit_resource(packet.link, chunk_index) + else: + RNS.log("Bundle transfer client requested chunk index out of range, tearing down link.", RNS.LOG_ERROR) + packet.link.teardown() + + def emit_resource(self, link, chunk_index): + if not self.transferring: + chunk_max = self.size-1 + chunk_start = chunk_index*CHUNK_SIZE + chunk_end = (chunk_index+1)*CHUNK_SIZE-1 + if chunk_end > chunk_max: + chunk_end = chunk_max + read_size = chunk_end - chunk_start + + try: + file = open(self.datapath, "rb") + file.seek(chunk_start) + data = file.read(read_size) + chunk_resource = RNS.Resource(data, link, callback=resource_concluded) + chunk_resource.chunk_index = chunk_index + except Exception as e: + RNS.log("Could not read bundle data from storage, the contained exception was:", RNS.LOG_ERROR) + RNS.log(str(e)) + link.teardown() + else: + RNS.log("Bundle chunk "+str(chunk_index)+" for "+RNS.prettyhexrep(self.id)+" was requested while a transfer was already in progress", RNS.LOG_ERROR) + + def resource_concluded(self, resource): + RNS.log("Concluded transferring chunk "+str(resource.chunk_index)+"/"+str(self.chunks)+" of bundle "+RNS.prettyhexrep(self.id), RNS.LOG_DEBUG) + self.transferring = False + + def resign_custody(self): + self.state = Bundle.NO_CUSTODY + self.heartbeat = time.time() + + def custody_proof(self, proof): + pass + + def remove(self): + try: + self.state = Bundle.REMOVED + RNS.Transport.deregister_destination(self.chunk_request_destination) + os.unlink(self.datapath) + os.unlink(self.metadatapath) + os.rmdir(self.storagepath) + except Exception as e: + RNS.log("Error while removing bundle from storage, the contained exception was:", RNS.LOG_ERROR) + RNS.log(str(e), RNS.LOG_ERROR) + + + + class BundleAdvertisement: pass \ No newline at end of file diff --git a/RNS/Resource.py b/RNS/Resource.py index 7e62ec2..3455005 100644 --- a/RNS/Resource.py +++ b/RNS/Resource.py @@ -697,4 +697,4 @@ class ResourceAdvertisement: adv.e = True if (adv.f & 0x01) == 0x01 else False adv.c = True if ((adv.f >> 1) & 0x01) == 0x01 else False - return adv + return adv \ No newline at end of file diff --git a/RNS/Transport.py b/RNS/Transport.py index fde1cd0..8a6bc0c 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -33,28 +33,32 @@ class Transport: # TODO: Calculate an optimal number for this in # various situations - LOCAL_REBROADCASTS_MAX = 2 # How many local rebroadcasts of an announce is allowed + LOCAL_REBROADCASTS_MAX = 2 # How many local rebroadcasts of an announce is allowed - PATH_REQUEST_GRACE = 0.35 # Grace time before a path announcement is made, allows directly reachable peers to respond first - PATH_REQUEST_RW = 2 # Path request random window + PATH_REQUEST_GRACE = 0.35 # Grace time before a path announcement is made, allows directly reachable peers to respond first + PATH_REQUEST_RW = 2 # Path request random window LINK_TIMEOUT = RNS.Link.KEEPALIVE * 2 - REVERSE_TIMEOUT = 30*60 # Reverse table entries are removed after max 30 minutes + REVERSE_TIMEOUT = 30*60 # Reverse table entries are removed after max 30 minutes DESTINATION_TIMEOUT = 60*60*24*7 # Destination table entries are removed if unused for one week MAX_RECEIPTS = 1024 # Maximum number of receipts to keep track of - interfaces = [] # All active interfaces - destinations = [] # All active destinations - pending_links = [] # Links that are being established - active_links = [] # Links that are active - packet_hashlist = [] # A list of packet hashes for duplicate detection - receipts = [] # Receipts of all outgoing packets for proof processing + BUNDLE_TIMEOUT = 60*60*24*7 # Bundles time out after 7 days + BUNDLE_INTERVAL = 180 # How often we should attempt to transfer bundles to their next hop + + interfaces = [] # All active interfaces + destinations = [] # All active destinations + pending_links = [] # Links that are being established + active_links = [] # Links that are active + packet_hashlist = [] # A list of packet hashes for duplicate detection + receipts = [] # Receipts of all outgoing packets for proof processing # TODO: "destination_table" should really be renamed to "path_table" announce_table = {} # A table for storing announces currently waiting to be retransmitted destination_table = {} # A lookup table containing the next hop to a given destination reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies link_table = {} # A lookup table containing hops for links + bundle_table = {} # A table for holding references to bundles in transport held_announces = {} # A table containing temporarily held announce-table entries # Transport control destinations are used @@ -107,9 +111,16 @@ class Transport: # Create transport-specific destinations Transport.path_request_destination = RNS.Destination(None, RNS.Destination.IN, RNS.Destination.PLAIN, Transport.APP_NAME, "path", "request") - Transport.path_request_destination.packet_callback(Transport.pathRequestHandler) + Transport.path_request_destination.packet_callback(Transport.path_request_handler) + + Transport.bundle_advertisement_destination = RNS.Destination(None, RNS.Destination.IN, RNS.Destination.PLAIN, Transport.APP_NAME, "bundle", "advertisement", ) + Transport.bundle_advertisement_destination.packet_callback(Transport.bundle_advertisement_handler) + Transport.control_destinations.append(Transport.path_request_destination) Transport.control_hashes.append(Transport.path_request_destination.hash) + + Transport.control_destinations.append(Transport.bundle_advertisement_destination) + Transport.control_hashes.append(Transport.bundle_advertisement_destination.hash) thread = threading.Thread(target=Transport.jobloop) thread.setDaemon(True) @@ -300,6 +311,8 @@ class Transport: Transport.tables_last_culled = time.time() + Transport.bundle_jobs() + except Exception as e: RNS.log("An exception occurred while running Transport jobs.", RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -310,6 +323,35 @@ class Transport: for packet in outgoing: packet.send() + @staticmethod + def bundle_jobs(): + removed_bundles = [] + for bundle in Transport.bundle_table: + # The bundle could not be passed on within the allowed + # time, and should be removed from storage + if bundle.heartbeat+Transport.BUNDLE_TIMEOUT < time.time(): + RNS.log("Removing stale bundle "+RNS.prettyhexrep(bundle.id)+" from storage", RNS.LOG_VERBOSE) + removed_bundles.append(bundle) + bundle.remove() + + # Custody was transferred to another node, we'll remove the bundle + if bundle.state == RNS.Bundle.NO_CUSTODY: + RNS.log("Removing bundle "+RNS.prettyhexrep(bundle.id)+" from storage since custody was transferred", RNS.LOG_VERBOSE) + removed_bundles.append(bundle) + bundle.remove() + + # This is an incoming bundle, attempt to retrieve it + if bundle.state == RNS.Bundle.TAKING_CUSTODY: + pass + + # We have custody over this bundle, and we should attempt + # to deliver it to it's next hop. + if bundle.state == RNS.Bundle.FULL_CUSTODY: + pass + + for bundle in removed_bundles: + Transport.bundle_table.remove(bundle) + @staticmethod def outbound(packet): while (Transport.jobs_running): @@ -852,6 +894,11 @@ class Transport: if destination.direction == RNS.Destination.IN: Transport.destinations.append(destination) + @staticmethod + def deregister_destination(destination): + if destination in Transport.destinations: + Transport.destinations.remove(destination) + @staticmethod def registerLink(link): RNS.log("Registering link "+str(link), RNS.LOG_DEBUG) @@ -870,6 +917,11 @@ class Transport: else: RNS.log("Attempted to activate a link that was not in the pending table", RNS.LOG_ERROR) + @staticmethod + def register_bundle(bundle): + RNS.log("Transport instance registered bundle "+RNS.prettyhexrep(bundle.id), RNS.LOG_DEBUG) + self.bundle_table.append(bundle) + @staticmethod def find_interface_from_hash(interface_hash): for interface in Transport.interfaces: @@ -983,7 +1035,7 @@ class Transport: packet.send() @staticmethod - def pathRequestHandler(data, packet): + def path_request_handler(data, packet): if len(data) >= RNS.Identity.TRUNCATED_HASHLENGTH//8: Transport.pathRequest( data[:RNS.Identity.TRUNCATED_HASHLENGTH//8], @@ -991,6 +1043,10 @@ class Transport: packet.receiving_interface ) + @staticmethod + def bundle_advertisement_handler(data, packet): + pass + @staticmethod def pathRequest(destination_hash, is_from_local_client, attached_interface): RNS.log("Path request for "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG)