Reticulum/RNS/Identity.py

507 lines
19 KiB
Python
Raw Normal View History

2018-03-16 04:40:37 -06:00
import base64
import math
2018-03-19 11:11:50 -06:00
import os
import RNS
2018-03-19 13:51:26 -06:00
import time
import atexit
import base64
2020-04-22 04:07:13 -06:00
from .vendor import umsgpack as umsgpack
2018-03-16 04:40:37 -06:00
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
2018-03-16 04:40:37 -06:00
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.fernet import Fernet
2018-03-16 03:50:37 -06:00
# Cryptography backend instance, resolved once at import time and passed to
# the HKDF key derivations in Identity.encrypt and Identity.decrypt.
cio_default_backend = default_backend()
2018-03-16 03:50:37 -06:00
class Identity:
    """
    This class is used to manage identities in Reticulum. It provides methods
    for encryption, decryption, signatures and verification, and is the basis
    for all encrypted communication over Reticulum networks.

    :param create_keys: Specifies whether new encryption and signing keys should be generated.
    """

    CURVE = "Curve25519"
    """
    The curve used for Elliptic Curve DH key exchanges
    """

    KEYSIZE = 256*2
    """
    X25519 key size in bits. A complete key is the concatenation of a 256 bit encryption key, and a 256 bit signing key.
    """

    # Non-configurable constants
    FERNET_VERSION = 0x80
    FERNET_OVERHEAD = 54 # In bytes
    AES128_BLOCKSIZE = 16 # In bytes
    HASHLENGTH = 256 # In bits
    SIGLENGTH = KEYSIZE # In bits

    TRUNCATED_HASHLENGTH = RNS.Reticulum.TRUNCATED_HASHLENGTH
    """
    Constant specifying the truncated hash length (in bits) used by Reticulum
    for addressable hashes and other purposes. Non-configurable.
    """

    # Storage
    # Maps destination_hash -> [timestamp, packet_hash, public_key, app_data]
    known_destinations = {}

    @staticmethod
    def remember(packet_hash, destination_hash, public_key, app_data = None):
        """
        Store the public key (and optional app_data) announced for a destination.

        :param packet_hash: Hash of the packet that carried the announce.
        :param destination_hash: Destination hash as *bytes*.
        :param public_key: The full public key as *bytes*, must be ``KEYSIZE//8`` bytes long.
        :param app_data: Optional app_data carried by the announce.
        :raises: *TypeError* if the public key does not have the expected size.
        """
        if len(public_key) != Identity.KEYSIZE//8:
            # Only the message is passed to the exception. A log level is not
            # a meaningful exception argument and garbles str(exception).
            raise TypeError("Can't remember "+RNS.prettyhexrep(destination_hash)+", the public key size of "+str(len(public_key))+" is not valid.")

        Identity.known_destinations[destination_hash] = [time.time(), packet_hash, public_key, app_data]

    @staticmethod
    def recall(destination_hash):
        """
        Recall identity for a destination hash.

        :param destination_hash: Destination hash as *bytes*.
        :returns: An :ref:`RNS.Identity<api-identity>` instance that can be used to create an outgoing :ref:`RNS.Destination<api-destination>`, or *None* if the destination is unknown.
        """
        identity_data = Identity.known_destinations.get(destination_hash)
        if identity_data is None:
            return None

        identity = Identity(create_keys=False)
        identity.load_public_key(identity_data[2])
        identity.app_data = identity_data[3]
        return identity

    @staticmethod
    def recall_app_data(destination_hash):
        """
        Recall last heard app_data for a destination hash.

        :param destination_hash: Destination hash as *bytes*.
        :returns: *Bytes* containing app_data, or *None* if the destination is unknown.
        """
        identity_data = Identity.known_destinations.get(destination_hash)
        if identity_data is None:
            return None

        return identity_data[3]

    @staticmethod
    def save_known_destinations():
        """
        Persist the known destinations table to the Reticulum storage path.

        Entries already present in the storage file, but missing from the
        in-memory table, are merged into the table before it is written.
        Any error is logged, never raised.
        """
        try:
            storage_file_path = RNS.Reticulum.storagepath+"/known_destinations"
            storage_known_destinations = {}
            if os.path.isfile(storage_file_path):
                try:
                    with open(storage_file_path, "rb") as file:
                        storage_known_destinations = umsgpack.load(file)
                except Exception as e:
                    # Best effort: an unreadable file is simply overwritten
                    # below, but the reason is no longer silently discarded.
                    RNS.log("Could not read existing known destinations from storage, the contained exception was: "+str(e), RNS.LOG_ERROR)

            for destination_hash in storage_known_destinations:
                if destination_hash not in Identity.known_destinations:
                    Identity.known_destinations[destination_hash] = storage_known_destinations[destination_hash]

            RNS.log("Saving known destinations to storage...", RNS.LOG_VERBOSE)
            with open(storage_file_path, "wb") as file:
                umsgpack.dump(Identity.known_destinations, file)
            RNS.log("Done saving known destinations to storage", RNS.LOG_VERBOSE)

        except Exception as e:
            RNS.log("Error while saving known destinations to disk, the contained exception was: "+str(e), RNS.LOG_ERROR)

    @staticmethod
    def load_known_destinations():
        """
        Load the known destinations table from the Reticulum storage path,
        replacing the in-memory table. Errors are logged, never raised.
        """
        storage_file_path = RNS.Reticulum.storagepath+"/known_destinations"
        if os.path.isfile(storage_file_path):
            try:
                with open(storage_file_path, "rb") as file:
                    Identity.known_destinations = umsgpack.load(file)
                RNS.log("Loaded "+str(len(Identity.known_destinations))+" known destination from storage", RNS.LOG_VERBOSE)
            except Exception:
                RNS.log("Error loading known destinations from disk, file will be recreated on exit", RNS.LOG_ERROR)
        else:
            RNS.log("Destinations file does not exist, no known destinations loaded", RNS.LOG_VERBOSE)

    @staticmethod
    def full_hash(data):
        """
        Get a SHA-256 hash of passed data.

        :param data: Data to be hashed as *bytes*.
        :returns: SHA-256 hash as *bytes*
        """
        # Reuse the module-level backend instead of resolving it on every call
        digest = hashes.Hash(hashes.SHA256(), backend=cio_default_backend)
        digest.update(data)
        return digest.finalize()

    @staticmethod
    def truncated_hash(data):
        """
        Get a truncated SHA-256 hash of passed data.

        :param data: Data to be hashed as *bytes*.
        :returns: Truncated SHA-256 hash as *bytes*
        """
        return Identity.full_hash(data)[:(Identity.TRUNCATED_HASHLENGTH//8)]

    @staticmethod
    def get_random_hash():
        """
        Get a random SHA-256 hash.

        :returns: Truncated SHA-256 hash of random data as *bytes*
        """
        return Identity.truncated_hash(os.urandom(10))

    @staticmethod
    def validate_announce(packet):
        """
        Validate the signature of an announce packet and, if it is valid,
        remember the announced public key and app_data.

        The announce payload is laid out as public key, a 10 byte random
        hash, signature and finally optional app_data.

        :param packet: The received packet.
        :returns: True if the packet is a valid announce, otherwise False.
        """
        if packet.packet_type != RNS.Packet.ANNOUNCE:
            return False

        RNS.log("Validating announce from "+RNS.prettyhexrep(packet.destination_hash), RNS.LOG_DEBUG)
        destination_hash = packet.destination_hash

        key_length = Identity.KEYSIZE//8
        random_hash_length = 10
        signature_offset = key_length+random_hash_length
        app_data_offset = signature_offset+Identity.SIGLENGTH//8

        public_key = packet.data[:key_length]
        random_hash = packet.data[key_length:signature_offset]
        signature = packet.data[signature_offset:app_data_offset]

        # An announce without app_data is signed over an empty app_data
        # field, but remembered with app_data set to None.
        if len(packet.data) > app_data_offset:
            app_data = packet.data[app_data_offset:]
            signed_data = destination_hash+public_key+random_hash+app_data
        else:
            app_data = None
            signed_data = destination_hash+public_key+random_hash

        announced_identity = Identity(create_keys=False)
        announced_identity.load_public_key(public_key)

        if announced_identity.pub is not None and announced_identity.validate(signature, signed_data):
            RNS.Identity.remember(packet.get_hash(), destination_hash, public_key, app_data)
            RNS.log("Stored valid announce from "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG)
            return True
        else:
            RNS.log("Received invalid announce", RNS.LOG_DEBUG)
            return False

    @staticmethod
    def exit_handler():
        """
        Persist the known destinations table. Intended to run at shutdown.
        """
        Identity.save_known_destinations()

    @staticmethod
    def from_bytes(prv_bytes):
        """
        Create a new :ref:`RNS.Identity<api-identity>` instance from *bytes* of private key.
        Can be used to load previously created and saved identities into Reticulum.

        :param prv_bytes: The *bytes* of a saved private key. **HAZARD!** Never use this to generate a new key by feeding random data in prv_bytes.
        :returns: A :ref:`RNS.Identity<api-identity>` instance, or *None* if the *bytes* data was invalid.
        """
        identity = Identity(create_keys=False)
        if identity.load_private_key(prv_bytes):
            return identity
        else:
            return None

    @staticmethod
    def from_file(path):
        """
        Create a new :ref:`RNS.Identity<api-identity>` instance from a file.
        Can be used to load previously created and saved identities into Reticulum.

        :param path: The full path to the saved :ref:`RNS.Identity<api-identity>` data
        :returns: A :ref:`RNS.Identity<api-identity>` instance, or *None* if the loaded data was invalid.
        """
        identity = Identity(create_keys=False)
        if identity.load(path):
            return identity
        else:
            return None

    def to_file(self, path):
        """
        Saves the identity to a file. This will write the private key to disk,
        and anyone with access to this file will be able to decrypt all
        communication for the identity. Be very careful with this method.

        :param path: The full path specifying where to save the identity.
        :returns: True if the file was saved, otherwise False.
        """
        try:
            with open(path, "wb") as key_file:
                key_file.write(self.get_private_key())
            return True
        except Exception as e:
            RNS.log("Error while saving identity to "+str(path), RNS.LOG_ERROR)
            RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
            # Explicit False, as documented, rather than an implicit None
            return False

    def __init__(self,create_keys=True):
        # Initialize keys to none
        self.prv = None
        self.prv_bytes = None
        self.sig_prv = None
        self.sig_prv_bytes = None

        self.pub = None
        self.pub_bytes = None
        self.sig_pub = None
        self.sig_pub_bytes = None

        self.hash = None
        self.hexhash = None

        # Set by Identity.recall; initialised here so the attribute always exists
        self.app_data = None

        if create_keys:
            self.create_keys()

    @staticmethod
    def _raw_private_bytes(private_key):
        # Serialise a private key object to its raw, unencrypted bytes
        return private_key.private_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PrivateFormat.Raw,
            encryption_algorithm=serialization.NoEncryption()
        )

    @staticmethod
    def _raw_public_bytes(public_key):
        # Serialise a public key object to its raw bytes
        return public_key.public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw
        )

    def create_keys(self):
        """
        Generate a new X25519 encryption key pair and a new Ed25519 signing
        key pair for this instance, and update the identity hashes.
        """
        self.prv = X25519PrivateKey.generate()
        self.prv_bytes = Identity._raw_private_bytes(self.prv)

        self.sig_prv = Ed25519PrivateKey.generate()
        self.sig_prv_bytes = Identity._raw_private_bytes(self.sig_prv)

        self.pub = self.prv.public_key()
        self.pub_bytes = Identity._raw_public_bytes(self.pub)

        self.sig_pub = self.sig_prv.public_key()
        self.sig_pub_bytes = Identity._raw_public_bytes(self.sig_pub)

        self.update_hashes()

        RNS.log("Identity keys created for "+RNS.prettyhexrep(self.hash), RNS.LOG_VERBOSE)

    def get_private_key(self):
        """
        :returns: The private key as *bytes*
        """
        return self.prv_bytes+self.sig_prv_bytes

    def get_public_key(self):
        """
        :returns: The public key as *bytes*
        """
        return self.pub_bytes+self.sig_pub_bytes

    def load_private_key(self, prv_bytes):
        """
        Load a private key into the instance.

        :param prv_bytes: The private key as *bytes*.
        :returns: True if the key was loaded, otherwise False.
        """
        try:
            half_length = Identity.KEYSIZE//8//2
            encryption_prv_bytes = prv_bytes[:half_length]
            signing_prv_bytes = prv_bytes[half_length:]

            # Parse everything before touching the instance, so that a bad
            # key never leaves the identity half-loaded.
            encryption_prv = X25519PrivateKey.from_private_bytes(encryption_prv_bytes)
            signing_prv = Ed25519PrivateKey.from_private_bytes(signing_prv_bytes)
            encryption_pub = encryption_prv.public_key()
            signing_pub = signing_prv.public_key()
            encryption_pub_bytes = Identity._raw_public_bytes(encryption_pub)
            signing_pub_bytes = Identity._raw_public_bytes(signing_pub)

            self.prv_bytes = encryption_prv_bytes
            self.prv = encryption_prv
            self.sig_prv_bytes = signing_prv_bytes
            self.sig_prv = signing_prv

            self.pub = encryption_pub
            self.pub_bytes = encryption_pub_bytes
            self.sig_pub = signing_pub
            self.sig_pub_bytes = signing_pub_bytes

            self.update_hashes()

            return True

        except Exception as e:
            # Log and report failure as documented. A leftover re-raise
            # previously made this path unreachable.
            RNS.log("Failed to load identity key", RNS.LOG_ERROR)
            RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
            return False

    def load_public_key(self, pub_bytes):
        """
        Load a public key into the instance.

        :param pub_bytes: The public key as *bytes*.
        :returns: True if the key was loaded, otherwise False.
        """
        try:
            half_length = Identity.KEYSIZE//8//2
            encryption_pub_bytes = pub_bytes[:half_length]
            signing_pub_bytes = pub_bytes[half_length:]

            # Parse both halves before assigning anything, so that pub and
            # sig_pub are always either both set or both left untouched.
            encryption_pub = X25519PublicKey.from_public_bytes(encryption_pub_bytes)
            signing_pub = Ed25519PublicKey.from_public_bytes(signing_pub_bytes)

            self.pub_bytes = encryption_pub_bytes
            self.sig_pub_bytes = signing_pub_bytes
            self.pub = encryption_pub
            self.sig_pub = signing_pub

            self.update_hashes()

            return True

        except Exception as e:
            RNS.log("Error while loading public key, the contained exception was: "+str(e), RNS.LOG_ERROR)
            return False

    def update_hashes(self):
        """
        Recompute the identity hash (truncated hash of the full public key)
        and its hexadecimal representation.
        """
        self.hash = Identity.truncated_hash(self.get_public_key())
        self.hexhash = self.hash.hex()

    def load(self, path):
        """
        Load a private key from a file into the instance.

        :param path: The full path to the saved private key data.
        :returns: True if the key was loaded, otherwise False.
        """
        try:
            with open(path, "rb") as key_file:
                prv_bytes = key_file.read()
            return self.load_private_key(prv_bytes)
        except Exception as e:
            RNS.log("Error while loading identity from "+str(path), RNS.LOG_ERROR)
            RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
            return False

    def get_salt(self):
        """
        :returns: The salt used for key derivation, which is the identity hash.
        """
        return self.hash

    def get_context(self):
        """
        :returns: The context info used for key derivation, currently *None*.
        """
        return None

    def _fernet_for(self, shared_key):
        # Derive a Fernet instance from an X25519 shared secret.
        # TODO: Improve this re-allocation of HKDF
        derived_key = HKDF(
            algorithm=hashes.SHA256(),
            length=32,
            salt=self.get_salt(),
            info=self.get_context(),
            backend=cio_default_backend,
        ).derive(shared_key)

        return Fernet(base64.urlsafe_b64encode(derived_key))

    def encrypt(self, plaintext):
        """
        Encrypts information for the identity.

        :param plaintext: The plaintext to be encrypted as *bytes*.
        :returns: Ciphertext token as *bytes*.
        :raises: *KeyError* if the instance does not hold a public key.
        """
        if self.pub is None:
            raise KeyError("Encryption failed because identity does not hold a public key")

        ephemeral_key = X25519PrivateKey.generate()
        ephemeral_pub_bytes = Identity._raw_public_bytes(ephemeral_key.public_key())

        shared_key = ephemeral_key.exchange(self.pub)
        fernet = self._fernet_for(shared_key)

        # Fernet emits a base64 token; it is decoded to raw bytes and
        # prefixed with the ephemeral public key.
        ciphertext = base64.urlsafe_b64decode(fernet.encrypt(plaintext))
        return ephemeral_pub_bytes+ciphertext

    def decrypt(self, ciphertext_token):
        """
        Decrypts information for the identity.

        :param ciphertext_token: The ciphertext token to be decrypted as *bytes*.
        :returns: Plaintext as *bytes*, or *None* if decryption fails.
        :raises: *KeyError* if the instance does not hold a private key.
        """
        if self.prv is None:
            raise KeyError("Decryption failed because identity does not hold a private key")

        peer_pub_length = Identity.KEYSIZE//8//2
        if len(ciphertext_token) <= peer_pub_length:
            RNS.log("Decryption failed because the token size was invalid.", RNS.LOG_DEBUG)
            return None

        try:
            peer_pub_bytes = ciphertext_token[:peer_pub_length]
            peer_pub = X25519PublicKey.from_public_bytes(peer_pub_bytes)

            shared_key = self.prv.exchange(peer_pub)
            fernet = self._fernet_for(shared_key)

            ciphertext = ciphertext_token[peer_pub_length:]
            return fernet.decrypt(base64.urlsafe_b64encode(ciphertext))

        except Exception as e:
            RNS.log("Decryption by "+RNS.prettyhexrep(self.hash)+" failed: "+str(e), RNS.LOG_DEBUG)
            return None

    def sign(self, message):
        """
        Signs information by the identity.

        :param message: The message to be signed as *bytes*.
        :returns: Signature as *bytes*.
        :raises: *KeyError* if the instance does not hold a private key.
        """
        if self.sig_prv is None:
            raise KeyError("Signing failed because identity does not hold a private key")

        try:
            return self.sig_prv.sign(message)
        except Exception as e:
            RNS.log("The identity "+str(self)+" could not sign the requested message. The contained exception was: "+str(e), RNS.LOG_ERROR)
            raise

    def validate(self, signature, message):
        """
        Validates the signature of a signed message.

        :param signature: The signature to be validated as *bytes*.
        :param message: The message to be validated as *bytes*.
        :returns: True if the signature is valid, otherwise False.
        :raises: *KeyError* if the instance does not hold a public key.
        """
        # Guard on the signing key, since that is the key actually used here
        if self.sig_pub is None:
            raise KeyError("Signature validation failed because identity does not hold a public key")

        try:
            self.sig_pub.verify(signature, message)
            return True
        except Exception:
            return False

    def prove(self, packet, destination=None):
        """
        Sign the hash of a packet and send the signature as a proof packet.

        :param packet: The packet to prove.
        :param destination: Destination for the proof. If *None*, it is obtained from ``packet.generate_proof_destination()``.
        """
        signature = self.sign(packet.packet_hash)
        if RNS.Reticulum.should_use_implicit_proof():
            proof_data = signature
        else:
            proof_data = packet.packet_hash + signature

        if destination is None:
            destination = packet.generate_proof_destination()

        proof = RNS.Packet(destination, proof_data, RNS.Packet.PROOF, attached_interface = packet.receiving_interface)
        proof.send()

    def __str__(self):
        return RNS.prettyhexrep(self.hash)