From 8f5a980bf1dee6ac9d3f7abbcc45ea8f55eb62c7 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Mon, 30 Oct 2023 03:09:57 +0100 Subject: [PATCH] Implemented telemetry collector requests and responses --- sbapp/sideband/core.py | 49 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 9 deletions(-) diff --git a/sbapp/sideband/core.py b/sbapp/sideband/core.py index cf09b34..e19adc8 100644 --- a/sbapp/sideband/core.py +++ b/sbapp/sideband/core.py @@ -533,6 +533,8 @@ class SidebandCore(): self.config["telemetry_use_propagation_only"] = False if not "telemetry_try_propagation_on_fail" in self.config: self.config["telemetry_try_propagation_on_fail"] = True + if not "telemetry_requests_only_send_latest" in self.config: + self.config["telemetry_requests_only_send_latest"] = True if not "telemetry_s_location" in self.config: self.config["telemetry_s_location"] = False @@ -952,7 +954,7 @@ class SidebandCore(): return "not_sent" - def send_latest_telemetry(self, to_addr=None): + def send_latest_telemetry(self, to_addr=None, stream=None): if self.getstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending") == True: RNS.log("Not sending new telemetry update, since an earlier transfer is already in progress", RNS.LOG_DEBUG) return "in_progress" @@ -975,9 +977,19 @@ class SidebandCore(): desired_method = LXMF.LXMessage.DIRECT lxm_fields = self.get_message_fields(to_addr) - if lxm_fields != None and LXMF.FIELD_TELEMETRY in lxm_fields: - telemeter = Telemeter.from_packed(lxm_fields[LXMF.FIELD_TELEMETRY]) - telemetry_timebase = telemeter.read_all()["time"]["utc"] + if stream != None and len(stream) > 0: + lxm_fields[LXMF.FIELD_TELEMETRY_STREAM] = stream + + if lxm_fields != None and (LXMF.FIELD_TELEMETRY in lxm_fields or LXMF.FIELD_TELEMETRY_STREAM in lxm_fields): + if LXMF.FIELD_TELEMETRY in lxm_fields: + telemeter = Telemeter.from_packed(lxm_fields[LXMF.FIELD_TELEMETRY]) + telemetry_timebase = telemeter.read_all()["time"]["utc"] + elif LXMF.FIELD_TELEMETRY_STREAM in lxm_fields: + telemetry_timebase = 0 + for te in lxm_fields[LXMF.FIELD_TELEMETRY_STREAM]: + ts = te[1] + telemetry_timebase = max(telemetry_timebase, ts) + if telemetry_timebase > (self.getpersistent(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.last_send_success_timebase") or 0): lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields) lxm.telemetry_timebase = telemetry_timebase @@ -1471,7 +1483,6 @@ class SidebandCore(): db = self.__db_connect() dbc = db.cursor() - # TODO: Implement limit limit_part = "" if limit: limit_part = " LIMIT "+str(int(limit)) @@ -1996,7 +2007,7 @@ class SidebandCore(): if LXMF.FIELD_TELEMETRY_STREAM in lxm.fields: for telemetry_entry in lxm_fields[LXMF.FIELD_TELEMETRY_STREAM]: # TODO: Implement - pass + RNS.log("TODO: Save this telemetry stream entry: "+str(telemetry_entry), RNS.LOG_WARNING) if own_command or len(lxm.content) != 0 or len(lxm.title) != 0: db = self.__db_connect() @@ -2934,11 +2945,10 @@ class SidebandCore(): self.lxm_ingest(message, originator=True) def get_message_fields(self, context_dest, telemetry_update=False): - fields = None + fields = {} send_telemetry = (telemetry_update == True) or self.should_send_telemetry(context_dest) send_appearance = self.config["telemetry_send_appearance"] or send_telemetry if send_telemetry or send_appearance: - fields = {} if send_appearance: def fth(c): r = c[0]; g = c[1]; b = c[2] @@ -3307,11 +3317,12 @@ class SidebandCore(): def handle_commands(self, commands, message): try: context_dest = message.source_hash - RNS.log("Handling commands from "+str(context_dest), RNS.LOG_DEBUG) + RNS.log("Handling commands from "+RNS.prettyhexrep(context_dest), RNS.LOG_DEBUG) for command in commands: if Commands.TELEMETRY_REQUEST in command: timebase = int(command[Commands.TELEMETRY_REQUEST]) RNS.log("Handling telemetry request with timebase "+str(timebase), RNS.LOG_DEBUG) + self.create_telemetry_response(to_addr=context_dest, timebase=timebase) elif Commands.ECHO in command: msg_content = "Echo reply: "+command[Commands.ECHO].decode("utf-8") @@ -3337,6 +3348,26 @@ class SidebandCore(): except Exception as e: RNS.log("Error while handling commands: "+str(e), RNS.LOG_ERROR) + def create_telemetry_response(self, to_addr, timebase): + sources = {} + sources = self.list_telemetry(after=timebase) + only_latest = self.config["telemetry_requests_only_send_latest"] + + telemetry_stream = [] + for source in sources: + if source != to_addr: + for entry in sources[source]: + timestamp = entry[0]; packed_telemetry = entry[1] + te = [source, timestamp, packed_telemetry] + if only_latest: + if not source in sources: + sources[source] = True + telemetry_stream.append(te) + else: + telemetry_stream.append(te) + + self.send_latest_telemetry(to_addr=to_addr, stream=telemetry_stream) + def get_display_name_bytes(self): return self.config["display_name"].encode("utf-8")