From aa99ef058acac9d8f9fef7c2125e8bbe4cc72546 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sun, 29 Oct 2023 23:03:51 +0100 Subject: [PATCH] Implemented automated and manual telemetry updates to collector --- sbapp/sideband/core.py | 82 +++++++++++++++++++++++++++++++++------ sbapp/ui/objectdetails.py | 12 ++++-- sbapp/ui/telemetry.py | 34 ++++++++-------- 3 files changed, 95 insertions(+), 33 deletions(-) diff --git a/sbapp/sideband/core.py b/sbapp/sideband/core.py index 4c1d6f8..4194703 100644 --- a/sbapp/sideband/core.py +++ b/sbapp/sideband/core.py @@ -120,6 +120,11 @@ class SidebandCore(): self.telemetry_running = False self.latest_telemetry = None self.telemetry_changes = 0 + self.pending_telemetry_send = False + self.pending_telemetry_send_try = 0 + self.pending_telemetry_send_maxtries = 2 + self.telemetry_send_blocked_until = 0 + self.pending_telemetry_request = False self.state_db = {} self.rpc_connection = None @@ -868,24 +873,29 @@ class SidebandCore(): self.message_router.handle_outbound(message) else: if message.state == LXMF.LXMessage.DELIVERED: - self.setpersistent("telemetry.collector_last_send_success_timebase", message.telemetry_timebase) - self.telemetry_update_sending = False + self.setpersistent(f"telemetry.{RNS.hexrep(message.destination_hash, delimit=False)}.last_send_success_timebase", message.telemetry_timebase) + self.setstate(f"telemetry.{RNS.hexrep(message.destination_hash, delimit=False)}.update_sending", False) + if message.destination_hash == self.config["telemetry_collector"]: + self.pending_telemetry_send = False + self.pending_telemetry_send_try = 0 + self.telemetry_send_blocked_until = 0 else: - self.telemetry_update_sending = False + self.setstate(f"telemetry.{RNS.hexrep(message.destination_hash, delimit=False)}.update_sending", False) + def request_latest_telemetry(self, from_addr=None): pass def send_latest_telemetry(self, to_addr=None): - if hasattr(self, "telemetry_update_sending") and self.telemetry_update_sending == True: - RNS.log("Not sending new telemetry update, since an earlier transfer is already in progress", RNS.LOG_VERBOSE) + if self.getstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending") == True: + RNS.log("Not sending new telemetry update, since an earlier transfer is already in progress", RNS.LOG_DEBUG) return "in_progress" if to_addr != None and self.latest_packed_telemetry != None and self.latest_telemetry != None: dest_identity = RNS.Identity.recall(to_addr) if dest_identity == None: - RNS.log("The identity for "+RNS.prettyhexrep(to_addr)+" could not be recalled. Requesting identity from network...", RNS.LOG_VERBOSE) + RNS.log("The identity for "+RNS.prettyhexrep(to_addr)+" could not be recalled. Requesting identity from network...", RNS.LOG_DEBUG) RNS.Transport.request_path(to_addr) return "destination_unknown" @@ -902,7 +912,7 @@ class SidebandCore(): if lxm_fields != None and LXMF.FIELD_TELEMETRY in lxm_fields: telemeter = Telemeter.from_packed(lxm_fields[LXMF.FIELD_TELEMETRY]) telemetry_timebase = telemeter.read_all()["time"]["utc"] - if telemetry_timebase > (self.getpersistent("telemetry.collector_last_send_success_timebase") or 0): + if telemetry_timebase > (self.getpersistent(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.last_send_success_timebase") or 0): lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields) lxm.telemetry_timebase = telemetry_timebase lxm.register_delivery_callback(self.outbound_telemetry_finished) @@ -912,16 +922,18 @@ class SidebandCore(): if self.config["telemetry_try_propagation_on_fail"]: lxm.try_propagation_on_fail = True - RNS.log(f"Sending telemetry update with timebase {telemetry_timebase}", RNS.LOG_VERBOSE) + RNS.log(f"Sending telemetry update with timebase {telemetry_timebase}", RNS.LOG_DEBUG) - self.setpersistent("telemetry.collector_last_send_attempt", time.time()) - self.telemetry_update_sending = True + self.setpersistent(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.last_send_attempt", time.time()) + self.setstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending", True) self.message_router.handle_outbound(lxm) return "sent" else: - RNS.log(f"Telemetry update with timebase {telemetry_timebase} was already successfully sent", RNS.LOG_VERBOSE) + RNS.log(f"Telemetry update with timebase {telemetry_timebase} was already successfully sent", RNS.LOG_DEBUG) return "already_sent" + else: + return "not_sent" else: RNS.log("A telemetry update was requested, but there was nothing to send.", RNS.LOG_WARNING) @@ -2339,7 +2351,53 @@ class SidebandCore(): if self.config["telemetry_enabled"]: if self.config["telemetry_send_to_collector"]: - pass + if self.config["telemetry_collector"] != None: + try: + now = time.time() + collector_address = self.config["telemetry_collector"] + last_send_timebase = self.getpersistent(f"telemetry.{RNS.hexrep(collector_address, delimit=False)}.last_send_success_timebase") or 0 + send_interval = self.config["telemetry_send_interval"] + next_send = last_send_timebase+send_interval + + scheduled = next_send-now; blocked = self.telemetry_send_blocked_until-now + next_send_in = max(scheduled, blocked) + RNS.log("Last telemetry send was "+RNS.prettytime(now-last_send_timebase)+" ago", RNS.LOG_DEBUG) + RNS.log("Next telemetry send is "+("in "+RNS.prettytime(next_send_in) if next_send_in > 0 else "now"), RNS.LOG_DEBUG) + + if now > last_send_timebase+send_interval and now > self.telemetry_send_blocked_until: + RNS.log("Initiating telemetry send to collector", RNS.LOG_DEBUG) + if not self.pending_telemetry_send_try >= self.pending_telemetry_send_maxtries: + self.pending_telemetry_send = True + self.pending_telemetry_send_try += 1 + self.send_latest_telemetry(to_addr=collector_address) + else: + if self.telemetry_send_blocked_until < now: + next_slot = now+send_interval + self.telemetry_send_blocked_until = next_slot + RNS.log(f"Sending telemetry to collector failed after {self.pending_telemetry_send_try} tries.", RNS.LOG_WARNING) + RNS.log(f"Not scheduling further retries until next send slot in {RNS.prettytime(next_slot-now)}.", RNS.LOG_WARNING) + self.pending_telemetry_send_try = 0 + + except Exception as e: + RNS.log("An error occurred while sending scheduled telemetry to collector: "+str(e), RNS.LOG_ERROR) + + if self.config["telemetry_request_from_collector"]: + if self.config["telemetry_collector"] != None: + try: + now = time.time() + collector_address = self.config["telemetry_collector"] + last_request_timebase = self.getpersistent(f"telemetry.{RNS.hexrep(collector_address, delimit=False)}.last_request_success_timebase") or 0 + request_interval = self.config["telemetry_request_interval"] + next_request = last_request_timebase+request_interval + + RNS.log("Last telemetry request was "+RNS.prettytime(now-last_request_timebase)+" ago", RNS.LOG_DEBUG) + RNS.log("Next telemetry request is "+("in "+RNS.prettytime(next_request-now) if next_request-now > 0 else "now"), RNS.LOG_DEBUG) + + if now > last_request_timebase+request_interval: + RNS.log("Initiating telemetry request to collector", RNS.LOG_DEBUG) + + except Exception as e: + RNS.log("An error occurred while requesting scheduled telemetry from collector: "+str(e), RNS.LOG_ERROR) def __start_jobs_deferred(self): if self.is_service: diff --git a/sbapp/ui/objectdetails.py b/sbapp/ui/objectdetails.py index 0270cbd..70ea55e 100644 --- a/sbapp/ui/objectdetails.py +++ b/sbapp/ui/objectdetails.py @@ -119,6 +119,7 @@ class ObjectDetails(): def set_source(self, source_dest, from_conv=False, from_telemetry=False, prefetched=None): self.object_hash = source_dest own_address = self.app.sideband.lxmf_destination.hash + telemetry_allowed = self.app.sideband.should_send_telemetry(source_dest) if source_dest == own_address: self.viewing_self = True else: @@ -143,16 +144,16 @@ class ObjectDetails(): self.screen.ids.object_appearance.icon = appearance[0] self.screen.ids.object_appearance.icon_color = appearance[1] self.screen.ids.object_appearance.md_bg_color = appearance[2] - # self.screen.ids.delete_button.line_color = self.app.color_reject - # self.screen.ids.delete_button.text_color = self.app.color_reject - # self.screen.ids.delete_button.icon_color = self.app.color_reject def djob(dt): if self.viewing_self: self.screen.ids.request_button.disabled = True self.screen.ids.send_button.disabled = True else: self.screen.ids.request_button.disabled = False - self.screen.ids.send_button.disabled = False + if telemetry_allowed: + self.screen.ids.send_button.disabled = False + else: + self.screen.ids.send_button.disabled = True if prefetched != None: latest_telemetry = prefetched @@ -210,6 +211,9 @@ class ObjectDetails(): elif result == "sent": title_str = "Update Sent" info_str = "A telemetry update was sent to the peer." + elif result == "not_sent": + title_str = "Not Sent" + info_str = "A telemetry update could not be sent." else: title_str = "Unknown Status" info_str = "The status of the telemetry update is unknown." diff --git a/sbapp/ui/telemetry.py b/sbapp/ui/telemetry.py index 6f98a14..ef0c305 100644 --- a/sbapp/ui/telemetry.py +++ b/sbapp/ui/telemetry.py @@ -532,7 +532,22 @@ MDScreen: height: dp(48) MDLabel: - text: "Only display from trusted" + text: "Send display style to everyone" + font_style: "H6" + + MDSwitch: + id: telemetry_send_appearance + pos_hint: {"center_y": 0.3} + active: False + + MDBoxLayout: + orientation: "horizontal" + size_hint_y: None + padding: [0,0,dp(24),dp(0)] + height: dp(48) + + MDLabel: + text: "Only display trusted on map" font_style: "H6" MDSwitch: @@ -562,7 +577,7 @@ MDScreen: height: dp(48) MDLabel: - text: "Send to all trusted peers" + text: "Send telemetry to all trusted" font_style: "H6" MDSwitch: @@ -570,21 +585,6 @@ MDScreen: pos_hint: {"center_y": 0.3} active: False - MDBoxLayout: - orientation: "horizontal" - size_hint_y: None - padding: [0,0,dp(24),dp(0)] - height: dp(48) - - MDLabel: - text: "Send display style to everyone" - font_style: "H6" - - MDSwitch: - id: telemetry_send_appearance - pos_hint: {"center_y": 0.3} - active: False - MDBoxLayout: orientation: "horizontal" size_hint_y: None