Perform telemetry requests and updates via service on Android. Update service heartbeat monitoring.

This commit is contained in:
Mark Qvist 2024-09-16 09:59:21 +02:00
parent 6038bc541c
commit 548663655b
1 changed files with 196 additions and 114 deletions

View File

@ -1205,130 +1205,192 @@ class SidebandCore():
else:
self.setstate(f"telemetry.{RNS.hexrep(message.destination_hash, delimit=False)}.request_sending", False)
def _service_request_latest_telemetry(self, from_addr=None):
if not RNS.vendor.platformutils.is_android():
return False
else:
if self.is_client:
try:
if self.rpc_connection == None:
self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)
self.rpc_connection.send({"request_latest_telemetry": {"from_addr": from_addr}})
response = self.rpc_connection.recv()
return response
except Exception as e:
RNS.log("Error while requesting latest telemetry over RPC: "+str(e), RNS.LOG_DEBUG)
RNS.trace_exception(e)
return False
else:
return False
def request_latest_telemetry(self, from_addr=None):
if from_addr == None or from_addr == self.lxmf_destination.hash:
return "no_address"
else:
if self.getstate(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.request_sending") == True:
RNS.log("Not sending new telemetry request, since an earlier transfer is already in progress", RNS.LOG_DEBUG)
return "in_progress"
if self.allow_service_dispatch and self.is_client:
try:
return self._service_request_latest_telemetry(from_addr)
if from_addr != None:
dest_identity = RNS.Identity.recall(from_addr)
if dest_identity == None:
RNS.log("The identity for "+RNS.prettyhexrep(from_addr)+" could not be recalled. Requesting identity from network...", RNS.LOG_DEBUG)
RNS.Transport.request_path(from_addr)
return "destination_unknown"
else:
now = time.time()
dest = RNS.Destination(dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
source = self.lxmf_destination
if self.config["telemetry_use_propagation_only"] == True:
desired_method = LXMF.LXMessage.PROPAGATED
else:
desired_method = LXMF.LXMessage.DIRECT
request_timebase = self.getpersistent(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.timebase") or now - self.telemetry_request_max_history
lxm_fields = { LXMF.FIELD_COMMANDS: [
{Commands.TELEMETRY_REQUEST: request_timebase},
]}
lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields, include_ticket=True)
lxm.request_timebase = request_timebase
lxm.register_delivery_callback(self.telemetry_request_finished)
lxm.register_failed_callback(self.telemetry_request_finished)
if self.message_router.get_outbound_propagation_node() != None:
if self.config["telemetry_try_propagation_on_fail"]:
lxm.try_propagation_on_fail = True
RNS.log(f"Sending telemetry request with timebase {request_timebase}", RNS.LOG_DEBUG)
self.setpersistent(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.last_request_attempt", time.time())
self.setstate(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.request_sending", True)
self.message_router.handle_outbound(lxm)
return "sent"
else:
except Exception as e:
RNS.log("Error requesting latest telemetry: "+str(e), RNS.LOG_ERROR)
RNS.trace_exception(e)
return "not_sent"
else:
if from_addr == None or from_addr == self.lxmf_destination.hash:
return "no_address"
else:
if self.getstate(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.request_sending") == True:
RNS.log("Not sending new telemetry request, since an earlier transfer is already in progress", RNS.LOG_DEBUG)
return "in_progress"
def send_latest_telemetry(self, to_addr=None, stream=None, is_authorized_telemetry_request=False):
if to_addr == None or to_addr == self.lxmf_destination.hash:
return "no_address"
else:
if to_addr == self.config["telemetry_collector"]:
is_authorized_telemetry_request = True
if from_addr != None:
dest_identity = RNS.Identity.recall(from_addr)
if dest_identity == None:
RNS.log("The identity for "+RNS.prettyhexrep(from_addr)+" could not be recalled. Requesting identity from network...", RNS.LOG_DEBUG)
RNS.Transport.request_path(from_addr)
return "destination_unknown"
if self.getstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending") == True:
RNS.log("Not sending new telemetry update, since an earlier transfer is already in progress", RNS.LOG_DEBUG)
return "in_progress"
else:
now = time.time()
dest = RNS.Destination(dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
source = self.lxmf_destination
if self.config["telemetry_use_propagation_only"] == True:
desired_method = LXMF.LXMessage.PROPAGATED
else:
desired_method = LXMF.LXMessage.DIRECT
if (self.latest_packed_telemetry != None and self.latest_telemetry != None) or stream != None:
dest_identity = RNS.Identity.recall(to_addr)
if dest_identity == None:
RNS.log("The identity for "+RNS.prettyhexrep(to_addr)+" could not be recalled. Requesting identity from network...", RNS.LOG_DEBUG)
RNS.Transport.request_path(to_addr)
return "destination_unknown"
request_timebase = self.getpersistent(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.timebase") or now - self.telemetry_request_max_history
lxm_fields = { LXMF.FIELD_COMMANDS: [
{Commands.TELEMETRY_REQUEST: request_timebase},
]}
lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields, include_ticket=True)
lxm.request_timebase = request_timebase
lxm.register_delivery_callback(self.telemetry_request_finished)
lxm.register_failed_callback(self.telemetry_request_finished)
if self.message_router.get_outbound_propagation_node() != None:
if self.config["telemetry_try_propagation_on_fail"]:
lxm.try_propagation_on_fail = True
RNS.log(f"Sending telemetry request with timebase {request_timebase}", RNS.LOG_DEBUG)
self.setpersistent(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.last_request_attempt", time.time())
self.setstate(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.request_sending", True)
self.message_router.handle_outbound(lxm)
return "sent"
else:
dest = RNS.Destination(dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
source = self.lxmf_destination
if self.config["telemetry_use_propagation_only"] == True:
desired_method = LXMF.LXMessage.PROPAGATED
else:
desired_method = LXMF.LXMessage.DIRECT
return "not_sent"
lxm_fields = self.get_message_fields(to_addr, is_authorized_telemetry_request=is_authorized_telemetry_request, signal_already_sent=True)
if lxm_fields == False and stream == None:
return "already_sent"
if stream != None and len(stream) > 0:
if lxm_fields == False:
lxm_fields = {}
lxm_fields[LXMF.FIELD_TELEMETRY_STREAM] = stream
if lxm_fields != None and lxm_fields != False and (LXMF.FIELD_TELEMETRY in lxm_fields or LXMF.FIELD_TELEMETRY_STREAM in lxm_fields):
if LXMF.FIELD_TELEMETRY in lxm_fields:
telemeter = Telemeter.from_packed(lxm_fields[LXMF.FIELD_TELEMETRY])
telemetry_timebase = telemeter.read_all()["time"]["utc"]
elif LXMF.FIELD_TELEMETRY_STREAM in lxm_fields:
telemetry_timebase = 0
for te in lxm_fields[LXMF.FIELD_TELEMETRY_STREAM]:
ts = te[1]
telemetry_timebase = max(telemetry_timebase, ts)
if telemetry_timebase > (self.getpersistent(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.last_send_success_timebase") or 0):
lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields, include_ticket=self.is_trusted(to_addr))
lxm.telemetry_timebase = telemetry_timebase
lxm.register_delivery_callback(self.outbound_telemetry_finished)
lxm.register_failed_callback(self.outbound_telemetry_finished)
if self.message_router.get_outbound_propagation_node() != None:
if self.config["telemetry_try_propagation_on_fail"]:
lxm.try_propagation_on_fail = True
RNS.log(f"Sending telemetry update with timebase {telemetry_timebase}", RNS.LOG_DEBUG)
self.setpersistent(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.last_send_attempt", time.time())
self.setstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending", True)
self.message_router.handle_outbound(lxm)
return "sent"
else:
RNS.log(f"Telemetry update with timebase {telemetry_timebase} was already successfully sent", RNS.LOG_DEBUG)
return "already_sent"
else:
return "nothing_to_send"
def _service_send_latest_telemetry(self, to_addr=None, stream=None, is_authorized_telemetry_request=False):
if not RNS.vendor.platformutils.is_android():
return False
else:
if self.is_client:
try:
if self.rpc_connection == None:
self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)
self.rpc_connection.send({"send_latest_telemetry": {
"to_addr": to_addr,
"stream": stream,
"is_authorized_telemetry_request": is_authorized_telemetry_request}
})
response = self.rpc_connection.recv()
return response
except Exception as e:
RNS.log("Error while sending latest telemetry over RPC: "+str(e), RNS.LOG_DEBUG)
RNS.trace_exception(e)
return False
else:
RNS.log("A telemetry update was requested, but there was nothing to send.", RNS.LOG_WARNING)
return "nothing_to_send"
return False
def send_latest_telemetry(self, to_addr=None, stream=None, is_authorized_telemetry_request=False):
if self.allow_service_dispatch and self.is_client:
try:
return self._service_send_latest_telemetry(to_addr, stream, is_authorized_telemetry_request)
except Exception as e:
RNS.log("Error requesting latest telemetry: "+str(e), RNS.LOG_ERROR)
RNS.trace_exception(e)
return "not_sent"
else:
if to_addr == None or to_addr == self.lxmf_destination.hash:
return "no_address"
else:
if to_addr == self.config["telemetry_collector"]:
is_authorized_telemetry_request = True
if self.getstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending") == True:
RNS.log("Not sending new telemetry update, since an earlier transfer is already in progress", RNS.LOG_DEBUG)
return "in_progress"
if (self.latest_packed_telemetry != None and self.latest_telemetry != None) or stream != None:
dest_identity = RNS.Identity.recall(to_addr)
if dest_identity == None:
RNS.log("The identity for "+RNS.prettyhexrep(to_addr)+" could not be recalled. Requesting identity from network...", RNS.LOG_DEBUG)
RNS.Transport.request_path(to_addr)
return "destination_unknown"
else:
dest = RNS.Destination(dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
source = self.lxmf_destination
if self.config["telemetry_use_propagation_only"] == True:
desired_method = LXMF.LXMessage.PROPAGATED
else:
desired_method = LXMF.LXMessage.DIRECT
lxm_fields = self.get_message_fields(to_addr, is_authorized_telemetry_request=is_authorized_telemetry_request, signal_already_sent=True)
if lxm_fields == False and stream == None:
return "already_sent"
if stream != None and len(stream) > 0:
if lxm_fields == False:
lxm_fields = {}
lxm_fields[LXMF.FIELD_TELEMETRY_STREAM] = stream
if lxm_fields != None and lxm_fields != False and (LXMF.FIELD_TELEMETRY in lxm_fields or LXMF.FIELD_TELEMETRY_STREAM in lxm_fields):
if LXMF.FIELD_TELEMETRY in lxm_fields:
telemeter = Telemeter.from_packed(lxm_fields[LXMF.FIELD_TELEMETRY])
telemetry_timebase = telemeter.read_all()["time"]["utc"]
elif LXMF.FIELD_TELEMETRY_STREAM in lxm_fields:
telemetry_timebase = 0
for te in lxm_fields[LXMF.FIELD_TELEMETRY_STREAM]:
ts = te[1]
telemetry_timebase = max(telemetry_timebase, ts)
if telemetry_timebase > (self.getpersistent(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.last_send_success_timebase") or 0):
lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields, include_ticket=self.is_trusted(to_addr))
lxm.telemetry_timebase = telemetry_timebase
lxm.register_delivery_callback(self.outbound_telemetry_finished)
lxm.register_failed_callback(self.outbound_telemetry_finished)
if self.message_router.get_outbound_propagation_node() != None:
if self.config["telemetry_try_propagation_on_fail"]:
lxm.try_propagation_on_fail = True
RNS.log(f"Sending telemetry update with timebase {telemetry_timebase}", RNS.LOG_DEBUG)
self.setpersistent(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.last_send_attempt", time.time())
self.setstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending", True)
self.message_router.handle_outbound(lxm)
return "sent"
else:
RNS.log(f"Telemetry update with timebase {telemetry_timebase} was already successfully sent", RNS.LOG_DEBUG)
return "already_sent"
else:
return "nothing_to_send"
else:
RNS.log("A telemetry update was requested, but there was nothing to send.", RNS.LOG_WARNING)
return "nothing_to_send"
def list_telemetry(self, context_dest = None, after = None, before = None, limit = None):
@ -1400,6 +1462,7 @@ class SidebandCore():
return []
def service_available(self):
heartbeat_stale_time = 7.5
now = time.time()
service_heartbeat = self.getstate("service.heartbeat")
if not service_heartbeat:
@ -1407,9 +1470,16 @@ class SidebandCore():
return False
else:
try:
if now - service_heartbeat > 4.0:
RNS.log("Stale service heartbeat at "+str(now), RNS.LOG_DEBUG)
return False
if now - service_heartbeat > heartbeat_stale_time:
RNS.log("Stale service heartbeat at "+str(now)+", retrying...", RNS.LOG_DEBUG)
now = time.time()
service_heartbeat = self.getstate("service.heartbeat")
if now - service_heartbeat > heartbeat_stale_time:
RNS.log("Service heartbeat did not recover after retry", RNS.LOG_DEBUG)
return False
else:
RNS.log("Service heartbeat recovered at"+str(time), RNS.LOG_DEBUG)
return True
else:
return True
except Exception as e:
@ -1680,6 +1750,18 @@ class SidebandCore():
args["destination_hash"],
args["propagation"])
connection.send(send_result)
elif "request_latest_telemetry" in call:
args = call["request_latest_telemetry"]
send_result = self.request_latest_telemetry(args["from_addr"])
connection.send(send_result)
elif "send_latest_telemetry" in call:
args = call["send_latest_telemetry"]
send_result = self.send_latest_telemetry(
to_addr=args["to_addr"],
stream=args["stream"],
is_authorized_telemetry_request=args["is_authorized_telemetry_request"]
)
connection.send(send_result)
elif "get_lxm_progress" in call:
args = call["get_lxm_progress"]
connection.send(self.get_lxm_progress(args["lxm_hash"]))