diff --git a/Examples/Ratchets.py b/Examples/Ratchets.py new file mode 100644 index 0000000..9c71c90 --- /dev/null +++ b/Examples/Ratchets.py @@ -0,0 +1,343 @@ +########################################################## +# This RNS example demonstrates a simple client/server # +# echo utility that uses ratchets to rotate encryption # +# keys everytime an announce is sent. # +########################################################## + +import argparse +import RNS + +# Let's define an app name. We'll use this for all +# destinations we create. Since this echo example +# is part of a range of example utilities, we'll put +# them all within the app namespace "example_utilities" +APP_NAME = "example_utilities" + + +########################################################## +#### Server Part ######################################### +########################################################## + +# This initialisation is executed when the users chooses +# to run as a server +def server(configpath): + global reticulum + + # We must first initialise Reticulum + reticulum = RNS.Reticulum(configpath) + + # TODO: Remove + RNS.loglevel = RNS.LOG_DEBUG + + # Randomly create a new identity for our echo server + server_identity = RNS.Identity() + + # We create a destination that clients can query. We want + # to be able to verify echo replies to our clients, so we + # create a "single" destination that can receive encrypted + # messages. This way the client can send a request and be + # certain that no-one else than this destination was able + # to read it. + echo_destination = RNS.Destination( + server_identity, + RNS.Destination.IN, + RNS.Destination.SINGLE, + APP_NAME, + "ratchet", + "echo", + "request" + ) + + # Enable ratchets on the destination by providing a file + # path to store ratchets. In this example, we will just + # use a temporary file, but in real-world applications, + # it's extremely important to keep this file secure, since + # it contains encryption keys for the destination. + destination_hexhash = RNS.hexrep(echo_destination.hash, delimit=False) + echo_destination.enable_ratchets(f"/tmp/{destination_hexhash}.ratchets") + + # We configure the destination to automatically prove all + # packets addressed to it. By doing this, RNS will automatically + # generate a proof for each incoming packet and transmit it + # back to the sender of that packet. + echo_destination.set_proof_strategy(RNS.Destination.PROVE_ALL) + + # Tell the destination which function in our program to + # run when a packet is received. We do this so we can + # print a log message when the server receives a request + echo_destination.set_packet_callback(server_callback) + + # Everything's ready! + # Let's Wait for client requests or user input + announceLoop(echo_destination) + + +def announceLoop(destination): + # Let the user know that everything is ready + RNS.log( + "Ratcheted echo server "+ + RNS.prettyhexrep(destination.hash)+ + " running, hit enter to manually send an announce (Ctrl-C to quit)" + ) + + # We enter a loop that runs until the users exits. + # If the user hits enter, we will announce our server + # destination on the network, which will let clients + # know how to create messages directed towards it. + while True: + entered = input() + destination.announce() + RNS.log("Sent announce from "+RNS.prettyhexrep(destination.hash)) + + +def server_callback(message, packet): + global reticulum + + # Tell the user that we received an echo request, and + # that we are going to send a reply to the requester. + # Sending the proof is handled automatically, since we + # set up the destination to prove all incoming packets. + + reception_stats = "" + if reticulum.is_connected_to_shared_instance: + reception_rssi = reticulum.get_packet_rssi(packet.packet_hash) + reception_snr = reticulum.get_packet_snr(packet.packet_hash) + + if reception_rssi != None: + reception_stats += " [RSSI "+str(reception_rssi)+" dBm]" + + if reception_snr != None: + reception_stats += " [SNR "+str(reception_snr)+" dBm]" + + else: + if packet.rssi != None: + reception_stats += " [RSSI "+str(packet.rssi)+" dBm]" + + if packet.snr != None: + reception_stats += " [SNR "+str(packet.snr)+" dB]" + + RNS.log("Received packet from echo client, proof sent"+reception_stats) + + +########################################################## +#### Client Part ######################################### +########################################################## + +# This initialisation is executed when the users chooses +# to run as a client +def client(destination_hexhash, configpath, timeout=None): + global reticulum + + # We need a binary representation of the destination + # hash that was entered on the command line + try: + dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2 + if len(destination_hexhash) != dest_len: + raise ValueError( + "Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2) + ) + + destination_hash = bytes.fromhex(destination_hexhash) + except Exception as e: + RNS.log("Invalid destination entered. Check your input!") + RNS.log(str(e)+"\n") + exit() + + # We must first initialise Reticulum + reticulum = RNS.Reticulum(configpath) + + # We override the loglevel to provide feedback when + # an announce is received + if RNS.loglevel < RNS.LOG_INFO: + RNS.loglevel = RNS.LOG_INFO + + # Tell the user that the client is ready! + RNS.log( + "Echo client ready, hit enter to send echo request to "+ + destination_hexhash+ + " (Ctrl-C to quit)" + ) + + # We enter a loop that runs until the user exits. + # If the user hits enter, we will try to send an + # echo request to the destination specified on the + # command line. + while True: + input() + + # Let's first check if RNS knows a path to the destination. + # If it does, we'll load the server identity and create a packet + if RNS.Transport.has_path(destination_hash): + + # To address the server, we need to know it's public + # key, so we check if Reticulum knows this destination. + # This is done by calling the "recall" method of the + # Identity module. If the destination is known, it will + # return an Identity instance that can be used in + # outgoing destinations. + server_identity = RNS.Identity.recall(destination_hash) + + # We got the correct identity instance from the + # recall method, so let's create an outgoing + # destination. We use the naming convention: + # example_utilities.ratchet.echo.request + # This matches the naming we specified in the + # server part of the code. + request_destination = RNS.Destination( + server_identity, + RNS.Destination.OUT, + RNS.Destination.SINGLE, + APP_NAME, + "ratchet", + "echo", + "request" + ) + + # The destination is ready, so let's create a packet. + # We set the destination to the request_destination + # that was just created, and the only data we add + # is a random hash. + echo_request = RNS.Packet(request_destination, RNS.Identity.get_random_hash()) + + # Send the packet! If the packet is successfully + # sent, it will return a PacketReceipt instance. + packet_receipt = echo_request.send() + + # If the user specified a timeout, we set this + # timeout on the packet receipt, and configure + # a callback function, that will get called if + # the packet times out. + if timeout != None: + packet_receipt.set_timeout(timeout) + packet_receipt.set_timeout_callback(packet_timed_out) + + # We can then set a delivery callback on the receipt. + # This will get automatically called when a proof for + # this specific packet is received from the destination. + packet_receipt.set_delivery_callback(packet_delivered) + + # Tell the user that the echo request was sent + RNS.log("Sent echo request to "+RNS.prettyhexrep(request_destination.hash)) + else: + # If we do not know this destination, tell the + # user to wait for an announce to arrive. + RNS.log("Destination is not yet known. Requesting path...") + RNS.log("Hit enter to manually retry once an announce is received.") + RNS.Transport.request_path(destination_hash) + +# This function is called when our reply destination +# receives a proof packet. +def packet_delivered(receipt): + global reticulum + + if receipt.status == RNS.PacketReceipt.DELIVERED: + rtt = receipt.get_rtt() + if (rtt >= 1): + rtt = round(rtt, 3) + rttstring = str(rtt)+" seconds" + else: + rtt = round(rtt*1000, 3) + rttstring = str(rtt)+" milliseconds" + + reception_stats = "" + if reticulum.is_connected_to_shared_instance: + reception_rssi = reticulum.get_packet_rssi(receipt.proof_packet.packet_hash) + reception_snr = reticulum.get_packet_snr(receipt.proof_packet.packet_hash) + + if reception_rssi != None: + reception_stats += " [RSSI "+str(reception_rssi)+" dBm]" + + if reception_snr != None: + reception_stats += " [SNR "+str(reception_snr)+" dB]" + + else: + if receipt.proof_packet != None: + if receipt.proof_packet.rssi != None: + reception_stats += " [RSSI "+str(receipt.proof_packet.rssi)+" dBm]" + + if receipt.proof_packet.snr != None: + reception_stats += " [SNR "+str(receipt.proof_packet.snr)+" dB]" + + RNS.log( + "Valid reply received from "+ + RNS.prettyhexrep(receipt.destination.hash)+ + ", round-trip time is "+rttstring+ + reception_stats + ) + +# This function is called if a packet times out. +def packet_timed_out(receipt): + if receipt.status == RNS.PacketReceipt.FAILED: + RNS.log("Packet "+RNS.prettyhexrep(receipt.hash)+" timed out") + + +########################################################## +#### Program Startup ##################################### +########################################################## + +# This part of the program gets run at startup, +# and parses input from the user, and then starts +# the desired program mode. +if __name__ == "__main__": + try: + parser = argparse.ArgumentParser(description="Simple ratcheted echo server and client utility") + + parser.add_argument( + "-s", + "--server", + action="store_true", + help="wait for incoming packets from clients" + ) + + parser.add_argument( + "-t", + "--timeout", + action="store", + metavar="s", + default=None, + help="set a reply timeout in seconds", + type=float + ) + + parser.add_argument("--config", + action="store", + default=None, + help="path to alternative Reticulum config directory", + type=str + ) + + parser.add_argument( + "destination", + nargs="?", + default=None, + help="hexadecimal hash of the server destination", + type=str + ) + + args = parser.parse_args() + + if args.server: + configarg=None + if args.config: + configarg = args.config + server(configarg) + else: + if args.config: + configarg = args.config + else: + configarg = None + + if args.timeout: + timeoutarg = float(args.timeout) + else: + timeoutarg = None + + if (args.destination == None): + print("") + parser.print_help() + print("") + else: + client(args.destination, configarg, timeout=timeoutarg) + except KeyboardInterrupt: + print("") + exit() \ No newline at end of file