Improved link error handling. Fixes #387.

This commit is contained in:
Mark Qvist 2023-10-31 11:44:12 +01:00
parent 3d645ae2f4
commit a4143cfe6d
1 changed files with 113 additions and 102 deletions

View File

@ -392,19 +392,20 @@ class Link:
try: try:
measured_rtt = time.time() - self.request_time measured_rtt = time.time() - self.request_time
plaintext = self.decrypt(packet.data) plaintext = self.decrypt(packet.data)
rtt = umsgpack.unpackb(plaintext) if plaintext != None:
self.rtt = max(measured_rtt, rtt) rtt = umsgpack.unpackb(plaintext)
self.status = Link.ACTIVE self.rtt = max(measured_rtt, rtt)
self.activated_at = time.time() self.status = Link.ACTIVE
self.activated_at = time.time()
if self.rtt != None and self.establishment_cost != None and self.rtt > 0 and self.establishment_cost > 0: if self.rtt != None and self.establishment_cost != None and self.rtt > 0 and self.establishment_cost > 0:
self.establishment_rate = self.establishment_cost/self.rtt self.establishment_rate = self.establishment_cost/self.rtt
try: try:
if self.owner.callbacks.link_established != None: if self.owner.callbacks.link_established != None:
self.owner.callbacks.link_established(self) self.owner.callbacks.link_established(self)
except Exception as e: except Exception as e:
RNS.log("Error occurred in external link establishment callback. The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("Error occurred in external link establishment callback. The contained exception was: "+str(e), RNS.LOG_ERROR)
except Exception as e: except Exception as e:
RNS.log("Error occurred while processing RTT packet, tearing down link. The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("Error occurred while processing RTT packet, tearing down link. The contained exception was: "+str(e), RNS.LOG_ERROR)
@ -748,65 +749,68 @@ class Link:
should_query = False should_query = False
if packet.context == RNS.Packet.NONE: if packet.context == RNS.Packet.NONE:
plaintext = self.decrypt(packet.data) plaintext = self.decrypt(packet.data)
if self.callbacks.packet != None: if plaintext != None:
thread = threading.Thread(target=self.callbacks.packet, args=(plaintext, packet)) if self.callbacks.packet != None:
thread.daemon = True thread = threading.Thread(target=self.callbacks.packet, args=(plaintext, packet))
thread.start() thread.daemon = True
thread.start()
if self.destination.proof_strategy == RNS.Destination.PROVE_ALL: if self.destination.proof_strategy == RNS.Destination.PROVE_ALL:
packet.prove() packet.prove()
should_query = True should_query = True
elif self.destination.proof_strategy == RNS.Destination.PROVE_APP: elif self.destination.proof_strategy == RNS.Destination.PROVE_APP:
if self.destination.callbacks.proof_requested: if self.destination.callbacks.proof_requested:
try: try:
if self.destination.callbacks.proof_requested(packet): if self.destination.callbacks.proof_requested(packet):
packet.prove() packet.prove()
should_query = True should_query = True
except Exception as e: except Exception as e:
RNS.log("Error while executing proof request callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("Error while executing proof request callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
self.__update_phy_stats(packet, query_shared=should_query) self.__update_phy_stats(packet, query_shared=should_query)
elif packet.context == RNS.Packet.LINKIDENTIFY: elif packet.context == RNS.Packet.LINKIDENTIFY:
plaintext = self.decrypt(packet.data) plaintext = self.decrypt(packet.data)
if plaintext != None:
if not self.initiator and len(plaintext) == RNS.Identity.KEYSIZE//8 + RNS.Identity.SIGLENGTH//8:
public_key = plaintext[:RNS.Identity.KEYSIZE//8]
signed_data = self.link_id+public_key
signature = plaintext[RNS.Identity.KEYSIZE//8:RNS.Identity.KEYSIZE//8+RNS.Identity.SIGLENGTH//8]
identity = RNS.Identity(create_keys=False)
identity.load_public_key(public_key)
if not self.initiator and len(plaintext) == RNS.Identity.KEYSIZE//8 + RNS.Identity.SIGLENGTH//8: if identity.validate(signature, signed_data):
public_key = plaintext[:RNS.Identity.KEYSIZE//8] self.__remote_identity = identity
signed_data = self.link_id+public_key if self.callbacks.remote_identified != None:
signature = plaintext[RNS.Identity.KEYSIZE//8:RNS.Identity.KEYSIZE//8+RNS.Identity.SIGLENGTH//8] try:
identity = RNS.Identity(create_keys=False) self.callbacks.remote_identified(self, self.__remote_identity)
identity.load_public_key(public_key) except Exception as e:
RNS.log("Error while executing remote identified callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
if identity.validate(signature, signed_data): self.__update_phy_stats(packet, query_shared=True)
self.__remote_identity = identity
if self.callbacks.remote_identified != None:
try:
self.callbacks.remote_identified(self, self.__remote_identity)
except Exception as e:
RNS.log("Error while executing remote identified callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
self.__update_phy_stats(packet, query_shared=True)
elif packet.context == RNS.Packet.REQUEST: elif packet.context == RNS.Packet.REQUEST:
try: try:
request_id = packet.getTruncatedHash() request_id = packet.getTruncatedHash()
packed_request = self.decrypt(packet.data) packed_request = self.decrypt(packet.data)
unpacked_request = umsgpack.unpackb(packed_request) if packed_request != None:
self.handle_request(request_id, unpacked_request) unpacked_request = umsgpack.unpackb(packed_request)
self.__update_phy_stats(packet, query_shared=True) self.handle_request(request_id, unpacked_request)
self.__update_phy_stats(packet, query_shared=True)
except Exception as e: except Exception as e:
RNS.log("Error occurred while handling request. The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("Error occurred while handling request. The contained exception was: "+str(e), RNS.LOG_ERROR)
elif packet.context == RNS.Packet.RESPONSE: elif packet.context == RNS.Packet.RESPONSE:
try: try:
packed_response = self.decrypt(packet.data) packed_response = self.decrypt(packet.data)
unpacked_response = umsgpack.unpackb(packed_response) if packed_response != None:
request_id = unpacked_response[0] unpacked_response = umsgpack.unpackb(packed_response)
response_data = unpacked_response[1] request_id = unpacked_response[0]
transfer_size = len(umsgpack.packb(response_data))-2 response_data = unpacked_response[1]
self.handle_response(request_id, response_data, transfer_size, transfer_size) transfer_size = len(umsgpack.packb(response_data))-2
self.__update_phy_stats(packet, query_shared=True) self.handle_response(request_id, response_data, transfer_size, transfer_size)
self.__update_phy_stats(packet, query_shared=True)
except Exception as e: except Exception as e:
RNS.log("Error occurred while handling response. The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("Error occurred while handling response. The contained exception was: "+str(e), RNS.LOG_ERROR)
@ -821,63 +825,67 @@ class Link:
elif packet.context == RNS.Packet.RESOURCE_ADV: elif packet.context == RNS.Packet.RESOURCE_ADV:
packet.plaintext = self.decrypt(packet.data) packet.plaintext = self.decrypt(packet.data)
self.__update_phy_stats(packet, query_shared=True) if packet.plaintext != None:
self.__update_phy_stats(packet, query_shared=True)
if RNS.ResourceAdvertisement.is_request(packet): if RNS.ResourceAdvertisement.is_request(packet):
RNS.Resource.accept(packet, callback=self.request_resource_concluded) RNS.Resource.accept(packet, callback=self.request_resource_concluded)
elif RNS.ResourceAdvertisement.is_response(packet): elif RNS.ResourceAdvertisement.is_response(packet):
request_id = RNS.ResourceAdvertisement.read_request_id(packet) request_id = RNS.ResourceAdvertisement.read_request_id(packet)
for pending_request in self.pending_requests: for pending_request in self.pending_requests:
if pending_request.request_id == request_id: if pending_request.request_id == request_id:
RNS.Resource.accept(packet, callback=self.response_resource_concluded, progress_callback=pending_request.response_resource_progress, request_id = request_id) RNS.Resource.accept(packet, callback=self.response_resource_concluded, progress_callback=pending_request.response_resource_progress, request_id = request_id)
pending_request.response_size = RNS.ResourceAdvertisement.read_size(packet) pending_request.response_size = RNS.ResourceAdvertisement.read_size(packet)
pending_request.response_transfer_size = RNS.ResourceAdvertisement.read_transfer_size(packet) pending_request.response_transfer_size = RNS.ResourceAdvertisement.read_transfer_size(packet)
pending_request.started_at = time.time() pending_request.started_at = time.time()
elif self.resource_strategy == Link.ACCEPT_NONE: elif self.resource_strategy == Link.ACCEPT_NONE:
pass pass
elif self.resource_strategy == Link.ACCEPT_APP: elif self.resource_strategy == Link.ACCEPT_APP:
if self.callbacks.resource != None: if self.callbacks.resource != None:
try: try:
resource_advertisement = RNS.ResourceAdvertisement.unpack(packet.plaintext) resource_advertisement = RNS.ResourceAdvertisement.unpack(packet.plaintext)
resource_advertisement.link = self resource_advertisement.link = self
if self.callbacks.resource(resource_advertisement): if self.callbacks.resource(resource_advertisement):
RNS.Resource.accept(packet, self.callbacks.resource_concluded) RNS.Resource.accept(packet, self.callbacks.resource_concluded)
except Exception as e: except Exception as e:
RNS.log("Error while executing resource accept callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("Error while executing resource accept callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
elif self.resource_strategy == Link.ACCEPT_ALL: elif self.resource_strategy == Link.ACCEPT_ALL:
RNS.Resource.accept(packet, self.callbacks.resource_concluded) RNS.Resource.accept(packet, self.callbacks.resource_concluded)
elif packet.context == RNS.Packet.RESOURCE_REQ: elif packet.context == RNS.Packet.RESOURCE_REQ:
plaintext = self.decrypt(packet.data) plaintext = self.decrypt(packet.data)
self.__update_phy_stats(packet, query_shared=True) if plaintext != None:
if ord(plaintext[:1]) == RNS.Resource.HASHMAP_IS_EXHAUSTED: self.__update_phy_stats(packet, query_shared=True)
resource_hash = plaintext[1+RNS.Resource.MAPHASH_LEN:RNS.Identity.HASHLENGTH//8+1+RNS.Resource.MAPHASH_LEN] if ord(plaintext[:1]) == RNS.Resource.HASHMAP_IS_EXHAUSTED:
else: resource_hash = plaintext[1+RNS.Resource.MAPHASH_LEN:RNS.Identity.HASHLENGTH//8+1+RNS.Resource.MAPHASH_LEN]
resource_hash = plaintext[1:RNS.Identity.HASHLENGTH//8+1] else:
resource_hash = plaintext[1:RNS.Identity.HASHLENGTH//8+1]
for resource in self.outgoing_resources: for resource in self.outgoing_resources:
if resource.hash == resource_hash: if resource.hash == resource_hash:
# We need to check that this request has not been # We need to check that this request has not been
# received before in order to avoid sequencing errors. # received before in order to avoid sequencing errors.
if not packet.packet_hash in resource.req_hashlist: if not packet.packet_hash in resource.req_hashlist:
resource.req_hashlist.append(packet.packet_hash) resource.req_hashlist.append(packet.packet_hash)
resource.request(plaintext) resource.request(plaintext)
elif packet.context == RNS.Packet.RESOURCE_HMU: elif packet.context == RNS.Packet.RESOURCE_HMU:
plaintext = self.decrypt(packet.data) plaintext = self.decrypt(packet.data)
self.__update_phy_stats(packet, query_shared=True) if plaintext != None:
resource_hash = plaintext[:RNS.Identity.HASHLENGTH//8] self.__update_phy_stats(packet, query_shared=True)
for resource in self.incoming_resources: resource_hash = plaintext[:RNS.Identity.HASHLENGTH//8]
if resource_hash == resource.hash: for resource in self.incoming_resources:
resource.hashmap_update_packet(plaintext) if resource_hash == resource.hash:
resource.hashmap_update_packet(plaintext)
elif packet.context == RNS.Packet.RESOURCE_ICL: elif packet.context == RNS.Packet.RESOURCE_ICL:
plaintext = self.decrypt(packet.data) plaintext = self.decrypt(packet.data)
self.__update_phy_stats(packet) if plaintext != None:
resource_hash = plaintext[:RNS.Identity.HASHLENGTH//8] self.__update_phy_stats(packet)
for resource in self.incoming_resources: resource_hash = plaintext[:RNS.Identity.HASHLENGTH//8]
if resource_hash == resource.hash: for resource in self.incoming_resources:
resource.cancel() if resource_hash == resource.hash:
resource.cancel()
elif packet.context == RNS.Packet.KEEPALIVE: elif packet.context == RNS.Packet.KEEPALIVE:
if not self.initiator and packet.data == bytes([0xFF]): if not self.initiator and packet.data == bytes([0xFF]):
@ -909,13 +917,15 @@ class Link:
# else: # else:
# packet.prove() # packet.prove()
# plaintext = self.decrypt(packet.data) # plaintext = self.decrypt(packet.data)
# self._channel._receive(plaintext) # if plaintext != None:
# self._channel._receive(plaintext)
############################################ ############################################
packet.prove() packet.prove()
plaintext = self.decrypt(packet.data) plaintext = self.decrypt(packet.data)
self.__update_phy_stats(packet) if plaintext != None:
self._channel._receive(plaintext) self.__update_phy_stats(packet)
self._channel._receive(plaintext)
elif packet.packet_type == RNS.Packet.PROOF: elif packet.packet_type == RNS.Packet.PROOF:
if packet.context == RNS.Packet.RESOURCE_PRF: if packet.context == RNS.Packet.RESOURCE_PRF:
@ -953,6 +963,7 @@ class Link:
except Exception as e: except Exception as e:
RNS.log("Decryption failed on link "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("Decryption failed on link "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
return None
def sign(self, message): def sign(self, message):