From 548663655bba0e384ee2d21ee1ea479c85204925 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Mon, 16 Sep 2024 09:59:21 +0200 Subject: [PATCH] Perform telemetry requests and updates via service on Android. Update service heartbeat monitoring. --- sbapp/sideband/core.py | 310 ++++++++++++++++++++++++++--------------- 1 file changed, 196 insertions(+), 114 deletions(-) diff --git a/sbapp/sideband/core.py b/sbapp/sideband/core.py index aa40c04..8c7d11e 100644 --- a/sbapp/sideband/core.py +++ b/sbapp/sideband/core.py @@ -1205,130 +1205,192 @@ class SidebandCore(): else: self.setstate(f"telemetry.{RNS.hexrep(message.destination_hash, delimit=False)}.request_sending", False) + def _service_request_latest_telemetry(self, from_addr=None): + if not RNS.vendor.platformutils.is_android(): + return False + else: + if self.is_client: + try: + if self.rpc_connection == None: + self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) + + self.rpc_connection.send({"request_latest_telemetry": {"from_addr": from_addr}}) + response = self.rpc_connection.recv() + return response + + except Exception as e: + RNS.log("Error while requesting latest telemetry over RPC: "+str(e), RNS.LOG_DEBUG) + RNS.trace_exception(e) + return False + else: + return False def request_latest_telemetry(self, from_addr=None): - if from_addr == None or from_addr == self.lxmf_destination.hash: - return "no_address" - else: - if self.getstate(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.request_sending") == True: - RNS.log("Not sending new telemetry request, since an earlier transfer is already in progress", RNS.LOG_DEBUG) - return "in_progress" + if self.allow_service_dispatch and self.is_client: + try: + return self._service_request_latest_telemetry(from_addr) - if from_addr != None: - dest_identity = RNS.Identity.recall(from_addr) - - if dest_identity == None: - RNS.log("The identity for "+RNS.prettyhexrep(from_addr)+" could not be recalled. Requesting identity from network...", RNS.LOG_DEBUG) - RNS.Transport.request_path(from_addr) - return "destination_unknown" - - else: - now = time.time() - dest = RNS.Destination(dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery") - source = self.lxmf_destination - - if self.config["telemetry_use_propagation_only"] == True: - desired_method = LXMF.LXMessage.PROPAGATED - else: - desired_method = LXMF.LXMessage.DIRECT - - request_timebase = self.getpersistent(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.timebase") or now - self.telemetry_request_max_history - lxm_fields = { LXMF.FIELD_COMMANDS: [ - {Commands.TELEMETRY_REQUEST: request_timebase}, - ]} - - lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields, include_ticket=True) - lxm.request_timebase = request_timebase - lxm.register_delivery_callback(self.telemetry_request_finished) - lxm.register_failed_callback(self.telemetry_request_finished) - - if self.message_router.get_outbound_propagation_node() != None: - if self.config["telemetry_try_propagation_on_fail"]: - lxm.try_propagation_on_fail = True - - RNS.log(f"Sending telemetry request with timebase {request_timebase}", RNS.LOG_DEBUG) - self.setpersistent(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.last_request_attempt", time.time()) - self.setstate(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.request_sending", True) - self.message_router.handle_outbound(lxm) - return "sent" - - else: + except Exception as e: + RNS.log("Error requesting latest telemetry: "+str(e), RNS.LOG_ERROR) + RNS.trace_exception(e) return "not_sent" + else: + if from_addr == None or from_addr == self.lxmf_destination.hash: + return "no_address" + else: + if self.getstate(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.request_sending") == True: + RNS.log("Not sending new telemetry request, since an earlier transfer is already in progress", RNS.LOG_DEBUG) + return "in_progress" - def send_latest_telemetry(self, to_addr=None, stream=None, is_authorized_telemetry_request=False): - if to_addr == None or to_addr == self.lxmf_destination.hash: - return "no_address" - else: - if to_addr == self.config["telemetry_collector"]: - is_authorized_telemetry_request = True + if from_addr != None: + dest_identity = RNS.Identity.recall(from_addr) + + if dest_identity == None: + RNS.log("The identity for "+RNS.prettyhexrep(from_addr)+" could not be recalled. Requesting identity from network...", RNS.LOG_DEBUG) + RNS.Transport.request_path(from_addr) + return "destination_unknown" - if self.getstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending") == True: - RNS.log("Not sending new telemetry update, since an earlier transfer is already in progress", RNS.LOG_DEBUG) - return "in_progress" + else: + now = time.time() + dest = RNS.Destination(dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery") + source = self.lxmf_destination + + if self.config["telemetry_use_propagation_only"] == True: + desired_method = LXMF.LXMessage.PROPAGATED + else: + desired_method = LXMF.LXMessage.DIRECT - if (self.latest_packed_telemetry != None and self.latest_telemetry != None) or stream != None: - dest_identity = RNS.Identity.recall(to_addr) - - if dest_identity == None: - RNS.log("The identity for "+RNS.prettyhexrep(to_addr)+" could not be recalled. Requesting identity from network...", RNS.LOG_DEBUG) - RNS.Transport.request_path(to_addr) - return "destination_unknown" + request_timebase = self.getpersistent(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.timebase") or now - self.telemetry_request_max_history + lxm_fields = { LXMF.FIELD_COMMANDS: [ + {Commands.TELEMETRY_REQUEST: request_timebase}, + ]} + + lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields, include_ticket=True) + lxm.request_timebase = request_timebase + lxm.register_delivery_callback(self.telemetry_request_finished) + lxm.register_failed_callback(self.telemetry_request_finished) + + if self.message_router.get_outbound_propagation_node() != None: + if self.config["telemetry_try_propagation_on_fail"]: + lxm.try_propagation_on_fail = True + + RNS.log(f"Sending telemetry request with timebase {request_timebase}", RNS.LOG_DEBUG) + self.setpersistent(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.last_request_attempt", time.time()) + self.setstate(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.request_sending", True) + self.message_router.handle_outbound(lxm) + return "sent" else: - dest = RNS.Destination(dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery") - source = self.lxmf_destination - - if self.config["telemetry_use_propagation_only"] == True: - desired_method = LXMF.LXMessage.PROPAGATED - else: - desired_method = LXMF.LXMessage.DIRECT + return "not_sent" - lxm_fields = self.get_message_fields(to_addr, is_authorized_telemetry_request=is_authorized_telemetry_request, signal_already_sent=True) - if lxm_fields == False and stream == None: - return "already_sent" - - if stream != None and len(stream) > 0: - if lxm_fields == False: - lxm_fields = {} - lxm_fields[LXMF.FIELD_TELEMETRY_STREAM] = stream - - if lxm_fields != None and lxm_fields != False and (LXMF.FIELD_TELEMETRY in lxm_fields or LXMF.FIELD_TELEMETRY_STREAM in lxm_fields): - if LXMF.FIELD_TELEMETRY in lxm_fields: - telemeter = Telemeter.from_packed(lxm_fields[LXMF.FIELD_TELEMETRY]) - telemetry_timebase = telemeter.read_all()["time"]["utc"] - elif LXMF.FIELD_TELEMETRY_STREAM in lxm_fields: - telemetry_timebase = 0 - for te in lxm_fields[LXMF.FIELD_TELEMETRY_STREAM]: - ts = te[1] - telemetry_timebase = max(telemetry_timebase, ts) - - if telemetry_timebase > (self.getpersistent(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.last_send_success_timebase") or 0): - lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields, include_ticket=self.is_trusted(to_addr)) - lxm.telemetry_timebase = telemetry_timebase - lxm.register_delivery_callback(self.outbound_telemetry_finished) - lxm.register_failed_callback(self.outbound_telemetry_finished) - - if self.message_router.get_outbound_propagation_node() != None: - if self.config["telemetry_try_propagation_on_fail"]: - lxm.try_propagation_on_fail = True - - RNS.log(f"Sending telemetry update with timebase {telemetry_timebase}", RNS.LOG_DEBUG) - - self.setpersistent(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.last_send_attempt", time.time()) - self.setstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending", True) - self.message_router.handle_outbound(lxm) - return "sent" - - else: - RNS.log(f"Telemetry update with timebase {telemetry_timebase} was already successfully sent", RNS.LOG_DEBUG) - return "already_sent" - else: - return "nothing_to_send" + def _service_send_latest_telemetry(self, to_addr=None, stream=None, is_authorized_telemetry_request=False): + if not RNS.vendor.platformutils.is_android(): + return False + else: + if self.is_client: + try: + if self.rpc_connection == None: + self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) + self.rpc_connection.send({"send_latest_telemetry": { + "to_addr": to_addr, + "stream": stream, + "is_authorized_telemetry_request": is_authorized_telemetry_request} + }) + response = self.rpc_connection.recv() + return response + + except Exception as e: + RNS.log("Error while sending latest telemetry over RPC: "+str(e), RNS.LOG_DEBUG) + RNS.trace_exception(e) + return False else: - RNS.log("A telemetry update was requested, but there was nothing to send.", RNS.LOG_WARNING) - return "nothing_to_send" + return False + + def send_latest_telemetry(self, to_addr=None, stream=None, is_authorized_telemetry_request=False): + if self.allow_service_dispatch and self.is_client: + try: + return self._service_send_latest_telemetry(to_addr, stream, is_authorized_telemetry_request) + + except Exception as e: + RNS.log("Error requesting latest telemetry: "+str(e), RNS.LOG_ERROR) + RNS.trace_exception(e) + return "not_sent" + + else: + if to_addr == None or to_addr == self.lxmf_destination.hash: + return "no_address" + else: + if to_addr == self.config["telemetry_collector"]: + is_authorized_telemetry_request = True + + if self.getstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending") == True: + RNS.log("Not sending new telemetry update, since an earlier transfer is already in progress", RNS.LOG_DEBUG) + return "in_progress" + + if (self.latest_packed_telemetry != None and self.latest_telemetry != None) or stream != None: + dest_identity = RNS.Identity.recall(to_addr) + + if dest_identity == None: + RNS.log("The identity for "+RNS.prettyhexrep(to_addr)+" could not be recalled. Requesting identity from network...", RNS.LOG_DEBUG) + RNS.Transport.request_path(to_addr) + return "destination_unknown" + + else: + dest = RNS.Destination(dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery") + source = self.lxmf_destination + + if self.config["telemetry_use_propagation_only"] == True: + desired_method = LXMF.LXMessage.PROPAGATED + else: + desired_method = LXMF.LXMessage.DIRECT + + lxm_fields = self.get_message_fields(to_addr, is_authorized_telemetry_request=is_authorized_telemetry_request, signal_already_sent=True) + if lxm_fields == False and stream == None: + return "already_sent" + + if stream != None and len(stream) > 0: + if lxm_fields == False: + lxm_fields = {} + lxm_fields[LXMF.FIELD_TELEMETRY_STREAM] = stream + + if lxm_fields != None and lxm_fields != False and (LXMF.FIELD_TELEMETRY in lxm_fields or LXMF.FIELD_TELEMETRY_STREAM in lxm_fields): + if LXMF.FIELD_TELEMETRY in lxm_fields: + telemeter = Telemeter.from_packed(lxm_fields[LXMF.FIELD_TELEMETRY]) + telemetry_timebase = telemeter.read_all()["time"]["utc"] + elif LXMF.FIELD_TELEMETRY_STREAM in lxm_fields: + telemetry_timebase = 0 + for te in lxm_fields[LXMF.FIELD_TELEMETRY_STREAM]: + ts = te[1] + telemetry_timebase = max(telemetry_timebase, ts) + + if telemetry_timebase > (self.getpersistent(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.last_send_success_timebase") or 0): + lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields, include_ticket=self.is_trusted(to_addr)) + lxm.telemetry_timebase = telemetry_timebase + lxm.register_delivery_callback(self.outbound_telemetry_finished) + lxm.register_failed_callback(self.outbound_telemetry_finished) + + if self.message_router.get_outbound_propagation_node() != None: + if self.config["telemetry_try_propagation_on_fail"]: + lxm.try_propagation_on_fail = True + + RNS.log(f"Sending telemetry update with timebase {telemetry_timebase}", RNS.LOG_DEBUG) + + self.setpersistent(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.last_send_attempt", time.time()) + self.setstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending", True) + self.message_router.handle_outbound(lxm) + return "sent" + + else: + RNS.log(f"Telemetry update with timebase {telemetry_timebase} was already successfully sent", RNS.LOG_DEBUG) + return "already_sent" + else: + return "nothing_to_send" + + else: + RNS.log("A telemetry update was requested, but there was nothing to send.", RNS.LOG_WARNING) + return "nothing_to_send" def list_telemetry(self, context_dest = None, after = None, before = None, limit = None): @@ -1400,6 +1462,7 @@ class SidebandCore(): return [] def service_available(self): + heartbeat_stale_time = 7.5 now = time.time() service_heartbeat = self.getstate("service.heartbeat") if not service_heartbeat: @@ -1407,9 +1470,16 @@ class SidebandCore(): return False else: try: - if now - service_heartbeat > 4.0: - RNS.log("Stale service heartbeat at "+str(now), RNS.LOG_DEBUG) - return False + if now - service_heartbeat > heartbeat_stale_time: + RNS.log("Stale service heartbeat at "+str(now)+", retrying...", RNS.LOG_DEBUG) + now = time.time() + service_heartbeat = self.getstate("service.heartbeat") + if now - service_heartbeat > heartbeat_stale_time: + RNS.log("Service heartbeat did not recover after retry", RNS.LOG_DEBUG) + return False + else: + RNS.log("Service heartbeat recovered at"+str(time), RNS.LOG_DEBUG) + return True else: return True except Exception as e: @@ -1680,6 +1750,18 @@ class SidebandCore(): args["destination_hash"], args["propagation"]) connection.send(send_result) + elif "request_latest_telemetry" in call: + args = call["request_latest_telemetry"] + send_result = self.request_latest_telemetry(args["from_addr"]) + connection.send(send_result) + elif "send_latest_telemetry" in call: + args = call["send_latest_telemetry"] + send_result = self.send_latest_telemetry( + to_addr=args["to_addr"], + stream=args["stream"], + is_authorized_telemetry_request=args["is_authorized_telemetry_request"] + ) + connection.send(send_result) elif "get_lxm_progress" in call: args = call["get_lxm_progress"] connection.send(self.get_lxm_progress(args["lxm_hash"]))