From 19c54319a63da58d44471f92aa673a625757515e Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Mon, 2 Sep 2024 13:05:32 +0200 Subject: [PATCH] Added PTT playback queue and playback lock while recording. Added database operation thread locking. --- sbapp/main.py | 35 +- sbapp/plyer/platforms/android/audio.py | 16 +- sbapp/sideband/core.py | 1329 ++++++++++++------------ 3 files changed, 707 insertions(+), 673 deletions(-) diff --git a/sbapp/main.py b/sbapp/main.py index 9c60b43..1aeaa45 100644 --- a/sbapp/main.py +++ b/sbapp/main.py @@ -1052,12 +1052,10 @@ class SidebandApp(MDApp): elif keycode == 40: self.msg_rec_a_save(None) - elif not self.rec_dialog_is_open: - if not self.messages_view.ids.message_text.focus: - if self.messages_view.ptt_enabled and keycode == 44: - if not self.key_ptt_down: - self.key_ptt_down = True - self.message_ptt_down_action() + elif not self.rec_dialog_is_open and not self.messages_view.ids.message_text.focus and self.messages_view.ptt_enabled and keycode == 44: + if not self.key_ptt_down: + self.key_ptt_down = True + self.message_ptt_down_action() elif len(modifiers) > 1 and "shift" in modifiers and "ctrl" in modifiers: def clear_att(): @@ -1763,6 +1761,9 @@ class SidebandApp(MDApp): RNS.trace_exception(e) def message_ptt_down_action(self, sender=None): + if self.sideband.ui_recording: + return + self.sideband.ui_started_recording() self.audio_msg_mode = LXMF.AM_CODEC2_2400 self.message_attach_action(attach_type="audio", nodialog=True) @@ -1782,7 +1783,9 @@ class SidebandApp(MDApp): def message_ptt_up_action(self, sender=None): - self.sideband.ui_stopped_recording() + if not self.sideband.ui_recording: + return + self.rec_dialog.recording = False el_button = self.messages_view.ids.message_ptt_button el_icon = self.messages_view.ids.message_ptt_button.children[0].children[1] @@ -1792,10 +1795,16 @@ class SidebandApp(MDApp): el_icon.theme_text_color="Custom" el_icon.text_color=mdc("BlueGray","500") def cb_s(dt): - self.msg_audio.stop() - self.message_process_audio() - self.message_send_action() - Clock.schedule_once(cb_s, 0.25) + try: + self.msg_audio.stop() + except Exception as e: + RNS.log("An error occurred while stopping recording: "+str(e), RNS.LOG_ERROR) + RNS.trace_exception(e) + + self.sideband.ui_stopped_recording() + if self.message_process_audio(): + self.message_send_action() + Clock.schedule_once(cb_s, 0.35) def message_process_audio(self): if self.audio_msg_mode == LXMF.AM_OPUS_OGG: @@ -1850,7 +1859,9 @@ class SidebandApp(MDApp): os.unlink(self.msg_audio._file_path) else: self.display_codec2_error() - return + return False + + return True def message_init_rec_dialog(self): ss = int(dp(18)) diff --git a/sbapp/plyer/platforms/android/audio.py b/sbapp/plyer/platforms/android/audio.py index a72d1e3..33a0a0d 100644 --- a/sbapp/plyer/platforms/android/audio.py +++ b/sbapp/plyer/platforms/android/audio.py @@ -68,13 +68,21 @@ class AndroidAudio(Audio): def _stop(self): if self._recorder: - self._recorder.stop() - self._recorder.release() + try: + self._recorder.stop() + self._recorder.release() + except Exception as e: + print("Could not stop recording: "+str(e)) + self._recorder = None if self._player: - self._player.stop() - self._player.release() + try: + self._player.stop() + self._player.release() + except Exception as e: + print("Could not stop playback: "+str(e)) + self._player = None self.is_playing = False diff --git a/sbapp/sideband/core.py b/sbapp/sideband/core.py index 1446b60..84e03fc 100644 --- a/sbapp/sideband/core.py +++ b/sbapp/sideband/core.py @@ -1476,14 +1476,10 @@ class SidebandCore(): pass else: if self.is_service: - # TODO: Remove - RNS.log("Indicating recording in service: "+str(recording)) self.ui_recording = recording return True else: try: - # TODO: Remove - RNS.log("Passing recording indication to service: "+str(recording)) if self.rpc_connection == None: self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) self.rpc_connection.send({"set_ui_recording": recording}) @@ -1496,8 +1492,6 @@ class SidebandCore(): def getstate(self, prop, allow_cache=False): with self.state_lock: if not self.service_stopped: - # TODO: remove - # us = time.time() if not RNS.vendor.platformutils.is_android(): if prop in self.state_db: @@ -1516,8 +1510,6 @@ class SidebandCore(): self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) self.rpc_connection.send({"getstate": prop}) response = self.rpc_connection.recv() - # TODO: Remove - # RNS.log("RPC getstate result for "+str(prop)+"="+str(response)+" in "+RNS.prettytime(time.time()-us), RNS.LOG_WARNING) return response except Exception as e: @@ -1721,247 +1713,255 @@ class SidebandCore(): db.commit() def _db_getpersistent(self, prop): - try: + with self.db_lock: + try: + db = self.__db_connect() + dbc = db.cursor() + + query = "select * from persistent where property=:uprop" + dbc.execute(query, {"uprop": prop.encode("utf-8")}) + result = dbc.fetchall() + + if len(result) < 1: + return None + else: + try: + entry = result[0] + val = msgpack.unpackb(entry[1]) + if val == None: + query = "delete from persistent where (property=:uprop);" + dbc.execute(query, {"uprop": prop.encode("utf-8")}) + db.commit() + + return val + except Exception as e: + RNS.log("Could not unpack persistent value from database for property \""+str(prop)+"\". The contained exception was: "+str(e), RNS.LOG_ERROR) + return None + + except Exception as e: + RNS.log("An error occurred during persistent getstate database operation: "+str(e), RNS.LOG_ERROR) + self.db = None + + def _db_setpersistent(self, prop, val): + existing_prop = self._db_getpersistent(prop) + + with self.db_lock: + try: + db = self.__db_connect() + dbc = db.cursor() + uprop = prop.encode("utf-8") + bval = msgpack.packb(val) + + if existing_prop == None: + try: + query = "INSERT INTO persistent (property, value) values (?, ?)" + data = (uprop, bval) + dbc.execute(query, data) + db.commit() + + except Exception as e: + RNS.log("Error while setting persistent state property "+str(prop)+" in DB: "+str(e), RNS.LOG_ERROR) + RNS.log("Retrying as update query...") + query = "UPDATE state set value=:bval where property=:uprop;" + dbc.execute(query, {"bval": bval, "uprop": uprop}) + db.commit() + + else: + query = "UPDATE persistent set value=:bval where property=:uprop;" + dbc.execute(query, {"bval": bval, "uprop": uprop}) + db.commit() + + except Exception as e: + RNS.log("An error occurred during persistent setstate database operation: "+str(e), RNS.LOG_ERROR) + self.db = None + + def _db_conversation_update_txtime(self, context_dest, is_retry = False): + with self.db_lock: + try: + db = self.__db_connect() + dbc = db.cursor() + + query = "UPDATE conv set last_tx = ? where dest_context = ?" + data = (time.time(), context_dest) + + dbc.execute(query, data) + result = dbc.fetchall() + db.commit() + except Exception as e: + RNS.log("An error occurred while updating conversation TX time: "+str(e), RNS.LOG_ERROR) + self.__db_reconnect() + # if not is_retry: + # RNS.log("Retrying operation...", RNS.LOG_ERROR) + # self._db_conversation_update_txtime(context_dest, is_retry=True) + + def _db_conversation_set_unread(self, context_dest, unread, tx = False, is_retry = False): + with self.db_lock: + try: + db = self.__db_connect() + dbc = db.cursor() + + if unread: + if tx: + query = "UPDATE conv set unread = ?, last_tx = ? where dest_context = ?" + data = (unread, time.time(), context_dest) + else: + query = "UPDATE conv set unread = ?, last_rx = ? where dest_context = ?" + data = (unread, time.time(), context_dest) + else: + query = "UPDATE conv set unread = ? where dest_context = ?" + data = (unread, context_dest) + + dbc.execute(query, data) + result = dbc.fetchall() + db.commit() + except Exception as e: + RNS.log("An error occurred while updating conversation unread flag: "+str(e), RNS.LOG_ERROR) + self.__db_reconnect() + # if not is_retry: + # RNS.log("Retrying operation...", RNS.LOG_ERROR) + # self._db_conversation_set_unread(context_dest, unread, tx, is_retry=True) + + def _db_telemetry(self, context_dest = None, after = None, before = None, limit = None): + with self.db_lock: db = self.__db_connect() dbc = db.cursor() - - query = "select * from persistent where property=:uprop" - dbc.execute(query, {"uprop": prop.encode("utf-8")}) + + limit_part = "" + if limit: + limit_part = " LIMIT "+str(int(limit)) + order_part = " order by ts DESC"+limit_part + if context_dest == None: + if after != None and before == None: + query = "select * from telemetry where ts>:after_ts"+order_part + dbc.execute(query, {"after_ts": after}) + elif after == None and before != None: + query = "select * from telemetry where ts<:before_ts"+order_part + dbc.execute(query, {"before_ts": before}) + elif after != None and before != None: + query = "select * from telemetry where ts<:before_ts and ts>:after_ts"+order_part + dbc.execute(query, {"before_ts": before, "after_ts": after}) + else: + query = query = "select * from telemetry" + dbc.execute(query, {}) + + else: + if after != None and before == None: + query = "select * from telemetry where dest_context=:context_dest and ts>:after_ts"+order_part + dbc.execute(query, {"context_dest": context_dest, "after_ts": after}) + elif after == None and before != None: + query = "select * from telemetry where dest_context=:context_dest and ts<:before_ts"+order_part + dbc.execute(query, {"context_dest": context_dest, "before_ts": before}) + elif after != None and before != None: + query = "select * from telemetry where dest_context=:context_dest and ts<:before_ts and ts>:after_ts"+order_part + dbc.execute(query, {"context_dest": context_dest, "before_ts": before, "after_ts": after}) + else: + query = query = "select * from telemetry where dest_context=:context_dest"+order_part + dbc.execute(query, {"context_dest": context_dest}) + result = dbc.fetchall() if len(result) < 1: return None else: - try: - entry = result[0] - val = msgpack.unpackb(entry[1]) - if val == None: - query = "delete from persistent where (property=:uprop);" - dbc.execute(query, {"uprop": prop.encode("utf-8")}) - db.commit() + results = {} + for entry in result: + telemetry_source = entry[1] + telemetry_timestamp = entry[2] + telemetry_data = entry[3] + + if not telemetry_source in results: + results[telemetry_source] = [] - return val - except Exception as e: - RNS.log("Could not unpack persistent value from database for property \""+str(prop)+"\". The contained exception was: "+str(e), RNS.LOG_ERROR) - return None - - except Exception as e: - RNS.log("An error occurred during persistent getstate database operation: "+str(e), RNS.LOG_ERROR) - self.db = None - - def _db_setpersistent(self, prop, val): - try: - db = self.__db_connect() - dbc = db.cursor() - uprop = prop.encode("utf-8") - bval = msgpack.packb(val) - - if self._db_getpersistent(prop) == None: - try: - query = "INSERT INTO persistent (property, value) values (?, ?)" - data = (uprop, bval) - dbc.execute(query, data) - db.commit() - - except Exception as e: - RNS.log("Error while setting persistent state property "+str(prop)+" in DB: "+str(e), RNS.LOG_ERROR) - RNS.log("Retrying as update query...") - query = "UPDATE state set value=:bval where property=:uprop;" - dbc.execute(query, {"bval": bval, "uprop": uprop}) - db.commit() - - else: - query = "UPDATE persistent set value=:bval where property=:uprop;" - dbc.execute(query, {"bval": bval, "uprop": uprop}) - db.commit() - - except Exception as e: - RNS.log("An error occurred during persistent setstate database operation: "+str(e), RNS.LOG_ERROR) - self.db = None - - def _db_conversation_update_txtime(self, context_dest, is_retry = False): - try: - db = self.__db_connect() - dbc = db.cursor() - - query = "UPDATE conv set last_tx = ? where dest_context = ?" - data = (time.time(), context_dest) - - dbc.execute(query, data) - result = dbc.fetchall() - db.commit() - except Exception as e: - RNS.log("An error occurred while updating conversation TX time: "+str(e), RNS.LOG_ERROR) - self.__db_reconnect() - if not is_retry: - RNS.log("Retrying operation...", RNS.LOG_ERROR) - self._db_conversation_update_txtime(context_dest, is_retry=True) - - def _db_conversation_set_unread(self, context_dest, unread, tx = False, is_retry = False): - try: - db = self.__db_connect() - dbc = db.cursor() - - if unread: - if tx: - query = "UPDATE conv set unread = ?, last_tx = ? where dest_context = ?" - data = (unread, time.time(), context_dest) - else: - query = "UPDATE conv set unread = ?, last_rx = ? where dest_context = ?" - data = (unread, time.time(), context_dest) - else: - query = "UPDATE conv set unread = ? where dest_context = ?" - data = (unread, context_dest) - - dbc.execute(query, data) - result = dbc.fetchall() - db.commit() - except Exception as e: - RNS.log("An error occurred while updating conversation unread flag: "+str(e), RNS.LOG_ERROR) - self.__db_reconnect() - if not is_retry: - RNS.log("Retrying operation...", RNS.LOG_ERROR) - self._db_conversation_set_unread(context_dest, unread, tx, is_retry=True) - - def _db_telemetry(self, context_dest = None, after = None, before = None, limit = None): - db = self.__db_connect() - dbc = db.cursor() - - limit_part = "" - if limit: - limit_part = " LIMIT "+str(int(limit)) - order_part = " order by ts DESC"+limit_part - if context_dest == None: - if after != None and before == None: - query = "select * from telemetry where ts>:after_ts"+order_part - dbc.execute(query, {"after_ts": after}) - elif after == None and before != None: - query = "select * from telemetry where ts<:before_ts"+order_part - dbc.execute(query, {"before_ts": before}) - elif after != None and before != None: - query = "select * from telemetry where ts<:before_ts and ts>:after_ts"+order_part - dbc.execute(query, {"before_ts": before, "after_ts": after}) - else: - query = query = "select * from telemetry" - dbc.execute(query, {}) - - else: - if after != None and before == None: - query = "select * from telemetry where dest_context=:context_dest and ts>:after_ts"+order_part - dbc.execute(query, {"context_dest": context_dest, "after_ts": after}) - elif after == None and before != None: - query = "select * from telemetry where dest_context=:context_dest and ts<:before_ts"+order_part - dbc.execute(query, {"context_dest": context_dest, "before_ts": before}) - elif after != None and before != None: - query = "select * from telemetry where dest_context=:context_dest and ts<:before_ts and ts>:after_ts"+order_part - dbc.execute(query, {"context_dest": context_dest, "before_ts": before, "after_ts": after}) - else: - query = query = "select * from telemetry where dest_context=:context_dest"+order_part - dbc.execute(query, {"context_dest": context_dest}) - - result = dbc.fetchall() - - if len(result) < 1: - return None - else: - results = {} - for entry in result: - telemetry_source = entry[1] - telemetry_timestamp = entry[2] - telemetry_data = entry[3] + results[telemetry_source].append([telemetry_timestamp, telemetry_data]) - if not telemetry_source in results: - results[telemetry_source] = [] - - results[telemetry_source].append([telemetry_timestamp, telemetry_data]) - - return results + return results def _db_save_telemetry(self, context_dest, telemetry, physical_link = None, source_dest = None, via = None, is_retry = False): - try: - remote_telemeter = Telemeter.from_packed(telemetry) - read_telemetry = remote_telemeter.read_all() - telemetry_timestamp = read_telemetry["time"]["utc"] - - db = self.__db_connect() - dbc = db.cursor() - - query = "select * from telemetry where dest_context=:ctx and ts=:tts" - dbc.execute(query, {"ctx": context_dest, "tts": telemetry_timestamp}) - result = dbc.fetchall() - - if len(result) != 0: - RNS.log("Telemetry entry with source "+RNS.prettyhexrep(context_dest)+" and timestamp "+str(telemetry_timestamp)+" already exists, skipping save", RNS.LOG_DEBUG) - return None - - if physical_link != None and len(physical_link) != 0: - remote_telemeter.synthesize("physical_link") - if "rssi" in physical_link: remote_telemeter.sensors["physical_link"].rssi = physical_link["rssi"] - if "snr" in physical_link: remote_telemeter.sensors["physical_link"].snr = physical_link["snr"] - if "q" in physical_link: remote_telemeter.sensors["physical_link"].q = physical_link["q"] - remote_telemeter.sensors["physical_link"].update_data() - telemetry = remote_telemeter.packed() - - if source_dest != None: - remote_telemeter.synthesize("received") - remote_telemeter.sensors["received"].by = self.lxmf_destination.hash - remote_telemeter.sensors["received"].via = source_dest - - rl = remote_telemeter.read("location") - if rl and "latitude" in rl and "longitude" in rl and "altitude" in rl: - if self.latest_telemetry != None and "location" in self.latest_telemetry: - ol = self.latest_telemetry["location"] - if ol != None: - if "latitude" in ol and "longitude" in ol and "altitude" in ol: - olat = ol["latitude"]; olon = ol["longitude"]; oalt = ol["altitude"] - rlat = rl["latitude"]; rlon = rl["longitude"]; ralt = rl["altitude"] - if olat != None and olon != None and oalt != None: - if rlat != None and rlon != None and ralt != None: - remote_telemeter.sensors["received"].set_distance( - (olat, olon, oalt), (rlat, rlon, ralt) - ) - - remote_telemeter.sensors["received"].update_data() - telemetry = remote_telemeter.packed() - - if via != None: - if not "received" in remote_telemeter.sensors: - remote_telemeter.synthesize("received") - - if "by" in remote_telemeter.sensors["received"].data: - remote_telemeter.sensors["received"].by = remote_telemeter.sensors["received"].data["by"] - if "distance" in remote_telemeter.sensors["received"].data: - remote_telemeter.sensors["received"].geodesic_distance = remote_telemeter.sensors["received"].data["distance"]["geodesic"] - remote_telemeter.sensors["received"].euclidian_distance = remote_telemeter.sensors["received"].data["distance"]["euclidian"] - - remote_telemeter.sensors["received"].via = via - remote_telemeter.sensors["received"].update_data() - telemetry = remote_telemeter.packed() - - query = "INSERT INTO telemetry (dest_context, ts, data) values (?, ?, ?)" - data = (context_dest, telemetry_timestamp, telemetry) - dbc.execute(query, data) - + with self.db_lock: try: - db.commit() + remote_telemeter = Telemeter.from_packed(telemetry) + read_telemetry = remote_telemeter.read_all() + telemetry_timestamp = read_telemetry["time"]["utc"] + + db = self.__db_connect() + dbc = db.cursor() + + query = "select * from telemetry where dest_context=:ctx and ts=:tts" + dbc.execute(query, {"ctx": context_dest, "tts": telemetry_timestamp}) + result = dbc.fetchall() + + if len(result) != 0: + RNS.log("Telemetry entry with source "+RNS.prettyhexrep(context_dest)+" and timestamp "+str(telemetry_timestamp)+" already exists, skipping save", RNS.LOG_DEBUG) + return None + + if physical_link != None and len(physical_link) != 0: + remote_telemeter.synthesize("physical_link") + if "rssi" in physical_link: remote_telemeter.sensors["physical_link"].rssi = physical_link["rssi"] + if "snr" in physical_link: remote_telemeter.sensors["physical_link"].snr = physical_link["snr"] + if "q" in physical_link: remote_telemeter.sensors["physical_link"].q = physical_link["q"] + remote_telemeter.sensors["physical_link"].update_data() + telemetry = remote_telemeter.packed() + + if source_dest != None: + remote_telemeter.synthesize("received") + remote_telemeter.sensors["received"].by = self.lxmf_destination.hash + remote_telemeter.sensors["received"].via = source_dest + + rl = remote_telemeter.read("location") + if rl and "latitude" in rl and "longitude" in rl and "altitude" in rl: + if self.latest_telemetry != None and "location" in self.latest_telemetry: + ol = self.latest_telemetry["location"] + if ol != None: + if "latitude" in ol and "longitude" in ol and "altitude" in ol: + olat = ol["latitude"]; olon = ol["longitude"]; oalt = ol["altitude"] + rlat = rl["latitude"]; rlon = rl["longitude"]; ralt = rl["altitude"] + if olat != None and olon != None and oalt != None: + if rlat != None and rlon != None and ralt != None: + remote_telemeter.sensors["received"].set_distance( + (olat, olon, oalt), (rlat, rlon, ralt) + ) + + remote_telemeter.sensors["received"].update_data() + telemetry = remote_telemeter.packed() + + if via != None: + if not "received" in remote_telemeter.sensors: + remote_telemeter.synthesize("received") + + if "by" in remote_telemeter.sensors["received"].data: + remote_telemeter.sensors["received"].by = remote_telemeter.sensors["received"].data["by"] + if "distance" in remote_telemeter.sensors["received"].data: + remote_telemeter.sensors["received"].geodesic_distance = remote_telemeter.sensors["received"].data["distance"]["geodesic"] + remote_telemeter.sensors["received"].euclidian_distance = remote_telemeter.sensors["received"].data["distance"]["euclidian"] + + remote_telemeter.sensors["received"].via = via + remote_telemeter.sensors["received"].update_data() + telemetry = remote_telemeter.packed() + + query = "INSERT INTO telemetry (dest_context, ts, data) values (?, ?, ?)" + data = (context_dest, telemetry_timestamp, telemetry) + dbc.execute(query, data) + + try: + db.commit() + except Exception as e: + RNS.log("An error occurred while commiting telemetry to database: "+str(e), RNS.LOG_ERROR) + self.__db_reconnect() + # if not is_retry: + # RNS.log("Retrying operation...", RNS.LOG_ERROR) + # self._db_save_telemetry(context_dest, telemetry, physical_link, source_dest, via, is_retry = True) + return + + self.setstate("app.flags.last_telemetry", time.time()) + + return telemetry + except Exception as e: - RNS.log("An error occurred while commiting telemetry to database: "+str(e), RNS.LOG_ERROR) - self.__db_reconnect() - if not is_retry: - RNS.log("Retrying operation...", RNS.LOG_ERROR) - self._db_save_telemetry(context_dest, telemetry, physical_link, source_dest, via, is_retry = True) - return - - self.setstate("app.flags.last_telemetry", time.time()) - - return telemetry - - except Exception as e: - import traceback - exception_info = "".join(traceback.TracebackException.from_exception(e).format()) - RNS.log(f"A {str(type(e))} occurred while saving telemetry to database: {str(e)}", RNS.LOG_ERROR) - RNS.log(exception_info, RNS.LOG_ERROR) - self.db = None + import traceback + exception_info = "".join(traceback.TracebackException.from_exception(e).format()) + RNS.log(f"A {str(type(e))} occurred while saving telemetry to database: {str(e)}", RNS.LOG_ERROR) + RNS.log(exception_info, RNS.LOG_ERROR) + self.db = None def _db_update_appearance(self, context_dest, timestamp, appearance, from_bulk_telemetry=False): conv = self._db_conversation(context_dest) @@ -1973,29 +1973,30 @@ class SidebandCore(): self.setpersistent("temp.peer_appearance."+RNS.hexrep(context_dest, delimit=False), ae) else: - data_dict = conv["data"] - if data_dict == None: - data_dict = {} + with self.db_lock: + data_dict = conv["data"] + if data_dict == None: + data_dict = {} - if not "appearance" in data_dict: - data_dict["appearance"] = None + if not "appearance" in data_dict: + data_dict["appearance"] = None - if from_bulk_telemetry and data_dict["appearance"] != SidebandCore.DEFAULT_APPEARANCE: - RNS.log("Aborting appearance update from bulk transfer, since conversation already has appearance set: "+str(appearance)+" / "+str(data_dict["appearance"]), RNS.LOG_DEBUG) - return + if from_bulk_telemetry and data_dict["appearance"] != SidebandCore.DEFAULT_APPEARANCE: + RNS.log("Aborting appearance update from bulk transfer, since conversation already has appearance set: "+str(appearance)+" / "+str(data_dict["appearance"]), RNS.LOG_DEBUG) + return - if data_dict["appearance"] != appearance: - data_dict["appearance"] = appearance - packed_dict = msgpack.packb(data_dict) - - db = self.__db_connect() - dbc = db.cursor() - - query = "UPDATE conv set data = ? where dest_context = ?" - data = (packed_dict, context_dest) - dbc.execute(query, data) - result = dbc.fetchall() - db.commit() + if data_dict["appearance"] != appearance: + data_dict["appearance"] = appearance + packed_dict = msgpack.packb(data_dict) + + db = self.__db_connect() + dbc = db.cursor() + + query = "UPDATE conv set data = ? where dest_context = ?" + data = (packed_dict, context_dest) + dbc.execute(query, data) + result = dbc.fetchall() + db.commit() def _db_get_appearance(self, context_dest, conv = None, raw=False): if context_dest == self.lxmf_destination.hash: @@ -2052,22 +2053,23 @@ class SidebandCore(): data_dict["telemetry"] = send_telemetry packed_dict = msgpack.packb(data_dict) - db = self.__db_connect() - dbc = db.cursor() - - query = "UPDATE conv set data = ? where dest_context = ?" - data = (packed_dict, context_dest) - dbc.execute(query, data) - result = dbc.fetchall() + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() + + query = "UPDATE conv set data = ? where dest_context = ?" + data = (packed_dict, context_dest) + dbc.execute(query, data) + result = dbc.fetchall() - try: - db.commit() - except Exception as e: - RNS.log("An error occurred while updating conversation telemetry options: "+str(e), RNS.LOG_ERROR) - self.__db_reconnect() - if not is_retry: - RNS.log("Retrying operation...", RNS.LOG_ERROR) - self._db_conversation_set_telemetry(context_dest, send_telemetry, is_retry=True) + try: + db.commit() + except Exception as e: + RNS.log("An error occurred while updating conversation telemetry options: "+str(e), RNS.LOG_ERROR) + self.__db_reconnect() + # if not is_retry: + # RNS.log("Retrying operation...", RNS.LOG_ERROR) + # self._db_conversation_set_telemetry(context_dest, send_telemetry, is_retry=True) def _db_conversation_set_requests(self, context_dest, allow_requests=False, is_retry=False): conv = self._db_conversation(context_dest) @@ -2078,22 +2080,23 @@ class SidebandCore(): data_dict["allow_requests"] = allow_requests packed_dict = msgpack.packb(data_dict) - db = self.__db_connect() - dbc = db.cursor() - - query = "UPDATE conv set data = ? where dest_context = ?" - data = (packed_dict, context_dest) - dbc.execute(query, data) - result = dbc.fetchall() + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() + + query = "UPDATE conv set data = ? where dest_context = ?" + data = (packed_dict, context_dest) + dbc.execute(query, data) + result = dbc.fetchall() - try: - db.commit() - except Exception as e: - RNS.log("An error occurred while updating conversation request options: "+str(e), RNS.LOG_ERROR) - self.__db_reconnect() - if not is_retry: - RNS.log("Retrying operation...", RNS.LOG_ERROR) - self._db_conversation_set_requests(context_dest, allow_requests, is_retry=True) + try: + db.commit() + except Exception as e: + RNS.log("An error occurred while updating conversation request options: "+str(e), RNS.LOG_ERROR) + self.__db_reconnect() + if not is_retry: + RNS.log("Retrying operation...", RNS.LOG_ERROR) + self._db_conversation_set_requests(context_dest, allow_requests, is_retry=True) def _db_conversation_set_object(self, context_dest, is_object=False): conv = self._db_conversation(context_dest) @@ -2104,22 +2107,23 @@ class SidebandCore(): data_dict["is_object"] = is_object packed_dict = msgpack.packb(data_dict) - db = self.__db_connect() - dbc = db.cursor() - - query = "UPDATE conv set data = ? where dest_context = ?" - data = (packed_dict, context_dest) - dbc.execute(query, data) - result = dbc.fetchall() + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() + + query = "UPDATE conv set data = ? where dest_context = ?" + data = (packed_dict, context_dest) + dbc.execute(query, data) + result = dbc.fetchall() - try: - db.commit() - except Exception as e: - RNS.log("An error occurred while updating conversation object option: "+str(e), RNS.LOG_ERROR) - self.__db_reconnect() - if not is_retry: - RNS.log("Retrying operation...", RNS.LOG_ERROR) - self._db_conversation_set_object(context_dest, is_object, is_retry=True) + try: + db.commit() + except Exception as e: + RNS.log("An error occurred while updating conversation object option: "+str(e), RNS.LOG_ERROR) + self.__db_reconnect() + # if not is_retry: + # RNS.log("Retrying operation...", RNS.LOG_ERROR) + # self._db_conversation_set_object(context_dest, is_object, is_retry=True) def _db_conversation_set_ptt_enabled(self, context_dest, ptt_enabled=False): conv = self._db_conversation(context_dest) @@ -2130,205 +2134,216 @@ class SidebandCore(): data_dict["ptt_enabled"] = ptt_enabled packed_dict = msgpack.packb(data_dict) - db = self.__db_connect() - dbc = db.cursor() - - query = "UPDATE conv set data = ? where dest_context = ?" - data = (packed_dict, context_dest) - dbc.execute(query, data) - result = dbc.fetchall() + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() + + query = "UPDATE conv set data = ? where dest_context = ?" + data = (packed_dict, context_dest) + dbc.execute(query, data) + result = dbc.fetchall() - try: - db.commit() - except Exception as e: - RNS.log("An error occurred while updating conversation PTT option: "+str(e), RNS.LOG_ERROR) - self.__db_reconnect() - if not is_retry: - RNS.log("Retrying operation...", RNS.LOG_ERROR) - self._db_conversation_set_ptt_enabled(context_dest, ptt_enabled, is_retry=True) + try: + db.commit() + except Exception as e: + RNS.log("An error occurred while updating conversation PTT option: "+str(e), RNS.LOG_ERROR) + self.__db_reconnect() + # if not is_retry: + # RNS.log("Retrying operation...", RNS.LOG_ERROR) + # self._db_conversation_set_ptt_enabled(context_dest, ptt_enabled, is_retry=True) def _db_conversation_set_trusted(self, context_dest, trusted): - db = self.__db_connect() - dbc = db.cursor() - - query = "UPDATE conv set trust = ? where dest_context = ?" - data = (trusted, context_dest) - dbc.execute(query, data) - result = dbc.fetchall() + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() + + query = "UPDATE conv set trust = ? where dest_context = ?" + data = (trusted, context_dest) + dbc.execute(query, data) + result = dbc.fetchall() - try: - db.commit() - except Exception as e: - RNS.log("An error occurred while updating conversation trusted option: "+str(e), RNS.LOG_ERROR) - self.__db_reconnect() - if not is_retry: - RNS.log("Retrying operation...", RNS.LOG_ERROR) - self._db_conversation_set_trusted(context_dest, trusted, is_retry=True) + try: + db.commit() + except Exception as e: + RNS.log("An error occurred while updating conversation trusted option: "+str(e), RNS.LOG_ERROR) + self.__db_reconnect() + # if not is_retry: + # RNS.log("Retrying operation...", RNS.LOG_ERROR) + # self._db_conversation_set_trusted(context_dest, trusted, is_retry=True) def _db_conversation_set_name(self, context_dest, name): - db = self.__db_connect() - dbc = db.cursor() - - query = "UPDATE conv set name=:name_data where dest_context=:ctx;" - dbc.execute(query, {"ctx": context_dest, "name_data": name.encode("utf-8")}) - result = dbc.fetchall() - - try: - db.commit() - except Exception as e: - RNS.log("An error occurred while updating conversation name option: "+str(e), RNS.LOG_ERROR) - self.__db_reconnect() - if not is_retry: - RNS.log("Retrying operation...", RNS.LOG_ERROR) - self._db_conversation_set_name(context_dest, name, is_retry=True) + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() + + query = "UPDATE conv set name=:name_data where dest_context=:ctx;" + dbc.execute(query, {"ctx": context_dest, "name_data": name.encode("utf-8")}) + result = dbc.fetchall() + + try: + db.commit() + except Exception as e: + RNS.log("An error occurred while updating conversation name option: "+str(e), RNS.LOG_ERROR) + self.__db_reconnect() + # if not is_retry: + # RNS.log("Retrying operation...", RNS.LOG_ERROR) + # self._db_conversation_set_name(context_dest, name, is_retry=True) def _db_conversations(self, conversations=True, objects=False): - db = self.__db_connect() - dbc = db.cursor() - - dbc.execute("select * from conv") - result = dbc.fetchall() + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() + + dbc.execute("select * from conv") + result = dbc.fetchall() - if len(result) < 1: - return None - else: - convs = [] - for entry in result: - is_object = False - last_rx = entry[1] - last_tx = entry[2] - last_activity = max(last_rx, last_tx) - data = None - try: - data = msgpack.unpackb(entry[7]) - if "is_object" in data: - is_object = data["is_object"] - except: - pass + if len(result) < 1: + return None + else: + convs = [] + for entry in result: + is_object = False + last_rx = entry[1] + last_tx = entry[2] + last_activity = max(last_rx, last_tx) + data = None + try: + data = msgpack.unpackb(entry[7]) + if "is_object" in data: + is_object = data["is_object"] + except: + pass - conv = { - "dest": entry[0], - "unread": entry[3], - "last_rx": last_rx, - "last_tx": last_tx, - "last_activity": last_activity, - "trust": entry[5], - "data": data, - } - should_add = False - if conversations and not is_object: - should_add = True - if objects and is_object: - should_add = True + conv = { + "dest": entry[0], + "unread": entry[3], + "last_rx": last_rx, + "last_tx": last_tx, + "last_activity": last_activity, + "trust": entry[5], + "data": data, + } + should_add = False + if conversations and not is_object: + should_add = True + if objects and is_object: + should_add = True - if should_add: - convs.append(conv) + if should_add: + convs.append(conv) - return sorted(convs, key=lambda c: c["last_activity"], reverse=True) + return sorted(convs, key=lambda c: c["last_activity"], reverse=True) def _db_announces(self): - db = self.__db_connect() - dbc = db.cursor() - - dbc.execute("select * from announce order by received desc") - result = dbc.fetchall() + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() + + dbc.execute("select * from announce order by received desc") + result = dbc.fetchall() - if len(result) < 1: - return None - else: - announces = [] - added_dests = [] - for entry in result: - try: - if not entry[2] in added_dests: - announce = { - "dest": entry[2], - "data": entry[3].decode("utf-8"), - "time": entry[1], - "type": entry[4] - } - added_dests.append(entry[2]) - announces.append(announce) - except Exception as e: - RNS.log("Exception while fetching announce from DB: "+str(e), RNS.LOG_ERROR) + if len(result) < 1: + return None + else: + announces = [] + added_dests = [] + for entry in result: + try: + if not entry[2] in added_dests: + announce = { + "dest": entry[2], + "data": entry[3].decode("utf-8"), + "time": entry[1], + "type": entry[4] + } + added_dests.append(entry[2]) + announces.append(announce) + except Exception as e: + RNS.log("Exception while fetching announce from DB: "+str(e), RNS.LOG_ERROR) - announces.reverse() - return announces + announces.reverse() + return announces def _db_conversation(self, context_dest): - db = self.__db_connect() - dbc = db.cursor() - - query = "select * from conv where dest_context=:ctx" - dbc.execute(query, {"ctx": context_dest}) - result = dbc.fetchall() + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() + + query = "select * from conv where dest_context=:ctx" + dbc.execute(query, {"ctx": context_dest}) + result = dbc.fetchall() - if len(result) < 1: - return None - else: - c = result[0] - conv = {} - conv["dest"] = c[0] - conv["last_tx"] = c[1] - conv["last_rx"] = c[2] - conv["unread"] = c[3] - conv["type"] = c[4] - conv["trust"] = c[5] - conv["name"] = c[6].decode("utf-8") - conv["data"] = msgpack.unpackb(c[7]) - conv["last_activity"] = max(c[1], c[2]) - return conv + if len(result) < 1: + return None + else: + c = result[0] + conv = {} + conv["dest"] = c[0] + conv["last_tx"] = c[1] + conv["last_rx"] = c[2] + conv["unread"] = c[3] + conv["type"] = c[4] + conv["trust"] = c[5] + conv["name"] = c[6].decode("utf-8") + conv["data"] = msgpack.unpackb(c[7]) + conv["last_activity"] = max(c[1], c[2]) + return conv def _db_clear_conversation(self, context_dest): RNS.log("Clearing conversation with "+RNS.prettyhexrep(context_dest), RNS.LOG_DEBUG) - db = self.__db_connect() - dbc = db.cursor() + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() - query = "delete from lxm where (dest=:ctx_dst or source=:ctx_dst);" - dbc.execute(query, {"ctx_dst": context_dest}) - db.commit() + query = "delete from lxm where (dest=:ctx_dst or source=:ctx_dst);" + dbc.execute(query, {"ctx_dst": context_dest}) + db.commit() def _db_clear_telemetry(self, context_dest): RNS.log("Clearing telemetry for "+RNS.prettyhexrep(context_dest), RNS.LOG_DEBUG) - db = self.__db_connect() - dbc = db.cursor() + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() - query = "delete from telemetry where dest_context=:ctx_dst;" - dbc.execute(query, {"ctx_dst": context_dest}) - db.commit() + query = "delete from telemetry where dest_context=:ctx_dst;" + dbc.execute(query, {"ctx_dst": context_dest}) + db.commit() self.setstate("app.flags.last_telemetry", time.time()) def _db_delete_conversation(self, context_dest): RNS.log("Deleting conversation with "+RNS.prettyhexrep(context_dest), RNS.LOG_DEBUG) - db = self.__db_connect() - dbc = db.cursor() + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() - query = "delete from conv where (dest_context=:ctx_dst);" - dbc.execute(query, {"ctx_dst": context_dest}) - db.commit() + query = "delete from conv where (dest_context=:ctx_dst);" + dbc.execute(query, {"ctx_dst": context_dest}) + db.commit() def _db_delete_announce(self, context_dest): RNS.log("Deleting announce with "+RNS.prettyhexrep(context_dest), RNS.LOG_DEBUG) - db = self.__db_connect() - dbc = db.cursor() + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() - query = "delete from announce where (source=:ctx_dst);" - dbc.execute(query, {"ctx_dst": context_dest}) - db.commit() + query = "delete from announce where (source=:ctx_dst);" + dbc.execute(query, {"ctx_dst": context_dest}) + db.commit() def _db_create_conversation(self, context_dest, name = None, trust = False): RNS.log("Creating conversation for "+RNS.prettyhexrep(context_dest), RNS.LOG_DEBUG) - db = self.__db_connect() - dbc = db.cursor() + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() - def_name = "".encode("utf-8") - query = "INSERT INTO conv (dest_context, last_tx, last_rx, unread, type, trust, name, data) values (?, ?, ?, ?, ?, ?, ?, ?)" - data = (context_dest, 0, time.time(), 0, SidebandCore.CONV_P2P, 0, def_name, msgpack.packb(None)) + def_name = "".encode("utf-8") + query = "INSERT INTO conv (dest_context, last_tx, last_rx, unread, type, trust, name, data) values (?, ?, ?, ?, ?, ?, ?, ?)" + data = (context_dest, 0, time.time(), 0, SidebandCore.CONV_P2P, 0, def_name, msgpack.packb(None)) - dbc.execute(query, data) - db.commit() + dbc.execute(query, data) + db.commit() if trust: self._db_conversation_set_trusted(context_dest, True) @@ -2340,139 +2355,79 @@ class SidebandCore(): def _db_delete_message(self, msg_hash): RNS.log("Deleting message "+RNS.prettyhexrep(msg_hash)) - db = self.__db_connect() - dbc = db.cursor() + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() - query = "delete from lxm where (lxm_hash=:mhash);" - dbc.execute(query, {"mhash": msg_hash}) - db.commit() + query = "delete from lxm where (lxm_hash=:mhash);" + dbc.execute(query, {"mhash": msg_hash}) + db.commit() def _db_clean_messages(self): RNS.log("Purging stale messages... "+str(self.db_path)) - db = self.__db_connect() - dbc = db.cursor() + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() - query = "delete from lxm where (state=:outbound_state or state=:sending_state);" - dbc.execute(query, {"outbound_state": LXMF.LXMessage.OUTBOUND, "sending_state": LXMF.LXMessage.SENDING}) - db.commit() + query = "delete from lxm where (state=:outbound_state or state=:sending_state);" + dbc.execute(query, {"outbound_state": LXMF.LXMessage.OUTBOUND, "sending_state": LXMF.LXMessage.SENDING}) + db.commit() def _db_message_set_state(self, lxm_hash, state, is_retry=False): - db = self.__db_connect() - dbc = db.cursor() - - query = "UPDATE lxm set state = ? where lxm_hash = ?" - data = (state, lxm_hash) - dbc.execute(query, data) + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() + + query = "UPDATE lxm set state = ? where lxm_hash = ?" + data = (state, lxm_hash) + dbc.execute(query, data) - try: - db.commit() - result = dbc.fetchall() - except Exception as e: - RNS.log("An error occurred while updating message state: "+str(e), RNS.LOG_ERROR) - self.__db_reconnect() - if not is_retry: - RNS.log("Retrying operation...", RNS.LOG_ERROR) - self._db_message_set_state(lxm_hash, state, is_retry=True) + try: + db.commit() + result = dbc.fetchall() + except Exception as e: + RNS.log("An error occurred while updating message state: "+str(e), RNS.LOG_ERROR) + self.__db_reconnect() + # if not is_retry: + # RNS.log("Retrying operation...", RNS.LOG_ERROR) + # self._db_message_set_state(lxm_hash, state, is_retry=True) def _db_message_set_method(self, lxm_hash, method): - db = self.__db_connect() - dbc = db.cursor() - - query = "UPDATE lxm set method = ? where lxm_hash = ?" - data = (method, lxm_hash) - dbc.execute(query, data) + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() + + query = "UPDATE lxm set method = ? where lxm_hash = ?" + data = (method, lxm_hash) + dbc.execute(query, data) - try: - db.commit() - result = dbc.fetchall() - except Exception as e: - RNS.log("An error occurred while updating message method: "+str(e), RNS.LOG_ERROR) - self.__db_reconnect() - if not is_retry: - RNS.log("Retrying operation...", RNS.LOG_ERROR) - self._db_message_set_method(lxm_hash, method, is_retry=True) + try: + db.commit() + result = dbc.fetchall() + except Exception as e: + RNS.log("An error occurred while updating message method: "+str(e), RNS.LOG_ERROR) + self.__db_reconnect() + # if not is_retry: + # RNS.log("Retrying operation...", RNS.LOG_ERROR) + # self._db_message_set_method(lxm_hash, method, is_retry=True) def message(self, msg_hash): return self._db_message(msg_hash) def _db_message(self, msg_hash): - db = self.__db_connect() - dbc = db.cursor() - - query = "select * from lxm where lxm_hash=:mhash" - dbc.execute(query, {"mhash": msg_hash}) - result = dbc.fetchall() + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() + + query = "select * from lxm where lxm_hash=:mhash" + dbc.execute(query, {"mhash": msg_hash}) + result = dbc.fetchall() - if len(result) < 1: - return None - else: - entry = result[0] - - lxm_method = entry[7] - if lxm_method == LXMF.LXMessage.PAPER: - lxm_data = msgpack.unpackb(entry[10]) - packed_lxm = lxm_data[0] - paper_packed_lxm = lxm_data[1] + if len(result) < 1: + return None else: - packed_lxm = entry[10] + entry = result[0] - lxm = LXMF.LXMessage.unpack_from_bytes(packed_lxm, original_method = lxm_method) - - if lxm.desired_method == LXMF.LXMessage.PAPER: - lxm.paper_packed = paper_packed_lxm - - message = { - "hash": lxm.hash, - "dest": lxm.destination_hash, - "source": lxm.source_hash, - "title": lxm.title, - "content": lxm.content, - "received": entry[5], - "sent": lxm.timestamp, - "state": entry[6], - "method": entry[7], - "lxm": lxm - } - return message - - def _db_message_count(self, context_dest): - db = self.__db_connect() - dbc = db.cursor() - - query = "select count(*) from lxm where dest=:context_dest or source=:context_dest" - dbc.execute(query, {"context_dest": context_dest}) - - result = dbc.fetchall() - - if len(result) < 1: - return None - else: - return result[0][0] - - def _db_messages(self, context_dest, after = None, before = None, limit = None): - db = self.__db_connect() - dbc = db.cursor() - - if after != None and before == None: - query = "select * from lxm where (dest=:context_dest or source=:context_dest) and rx_ts>:after_ts" - dbc.execute(query, {"context_dest": context_dest, "after_ts": after}) - elif after == None and before != None: - query = "select * from lxm where (dest=:context_dest or source=:context_dest) and rx_ts<:before_ts" - dbc.execute(query, {"context_dest": context_dest, "before_ts": before}) - elif after != None and before != None: - query = "select * from lxm where (dest=:context_dest or source=:context_dest) and rx_ts<:before_ts and rx_ts>:after_ts" - dbc.execute(query, {"context_dest": context_dest, "before_ts": before, "after_ts": after}) - else: - query = "select * from lxm where dest=:context_dest or source=:context_dest" - dbc.execute(query, {"context_dest": context_dest}) - - result = dbc.fetchall() - - if len(result) < 1: - return None - else: - messages = [] - for entry in result: lxm_method = entry[7] if lxm_method == LXMF.LXMessage.PAPER: lxm_data = msgpack.unpackb(entry[10]) @@ -2482,16 +2437,10 @@ class SidebandCore(): packed_lxm = entry[10] lxm = LXMF.LXMessage.unpack_from_bytes(packed_lxm, original_method = lxm_method) - + if lxm.desired_method == LXMF.LXMessage.PAPER: lxm.paper_packed = paper_packed_lxm - extras = None - try: - extras = msgpack.unpackb(entry[11]) - except: - pass - message = { "hash": lxm.hash, "dest": lxm.destination_hash, @@ -2502,14 +2451,87 @@ class SidebandCore(): "sent": lxm.timestamp, "state": entry[6], "method": entry[7], - "lxm": lxm, - "extras": extras, + "lxm": lxm } + return message - messages.append(message) - if len(messages) > limit: - messages = messages[-limit:] - return messages + def _db_message_count(self, context_dest): + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() + + query = "select count(*) from lxm where dest=:context_dest or source=:context_dest" + dbc.execute(query, {"context_dest": context_dest}) + + result = dbc.fetchall() + + if len(result) < 1: + return None + else: + return result[0][0] + + def _db_messages(self, context_dest, after = None, before = None, limit = None): + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() + + if after != None and before == None: + query = "select * from lxm where (dest=:context_dest or source=:context_dest) and rx_ts>:after_ts" + dbc.execute(query, {"context_dest": context_dest, "after_ts": after}) + elif after == None and before != None: + query = "select * from lxm where (dest=:context_dest or source=:context_dest) and rx_ts<:before_ts" + dbc.execute(query, {"context_dest": context_dest, "before_ts": before}) + elif after != None and before != None: + query = "select * from lxm where (dest=:context_dest or source=:context_dest) and rx_ts<:before_ts and rx_ts>:after_ts" + dbc.execute(query, {"context_dest": context_dest, "before_ts": before, "after_ts": after}) + else: + query = "select * from lxm where dest=:context_dest or source=:context_dest" + dbc.execute(query, {"context_dest": context_dest}) + + result = dbc.fetchall() + + if len(result) < 1: + return None + else: + messages = [] + for entry in result: + lxm_method = entry[7] + if lxm_method == LXMF.LXMessage.PAPER: + lxm_data = msgpack.unpackb(entry[10]) + packed_lxm = lxm_data[0] + paper_packed_lxm = lxm_data[1] + else: + packed_lxm = entry[10] + + lxm = LXMF.LXMessage.unpack_from_bytes(packed_lxm, original_method = lxm_method) + + if lxm.desired_method == LXMF.LXMessage.PAPER: + lxm.paper_packed = paper_packed_lxm + + extras = None + try: + extras = msgpack.unpackb(entry[11]) + except: + pass + + message = { + "hash": lxm.hash, + "dest": lxm.destination_hash, + "source": lxm.source_hash, + "title": lxm.title, + "content": lxm.content, + "received": entry[5], + "sent": lxm.timestamp, + "state": entry[6], + "method": entry[7], + "lxm": lxm, + "extras": extras, + } + + messages.append(message) + if len(messages) > limit: + messages = messages[-limit:] + return messages def _db_save_lxm(self, lxm, context_dest, originator = False, own_command = False, is_retry = False): state = lxm.state @@ -2551,83 +2573,85 @@ class SidebandCore(): RNS.log("Received telemetry stream field with no data: "+str(lxm.fields[LXMF.FIELD_TELEMETRY_STREAM]), RNS.LOG_DEBUG) if own_command or len(lxm.content) != 0 or len(lxm.title) != 0: - db = self.__db_connect() - dbc = db.cursor() + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() - if not lxm.packed: - lxm.pack() + if not lxm.packed: + lxm.pack() - if lxm.method == LXMF.LXMessage.PAPER: - packed_lxm = msgpack.packb([lxm.packed, lxm.paper_packed]) - else: - packed_lxm = lxm.packed + if lxm.method == LXMF.LXMessage.PAPER: + packed_lxm = msgpack.packb([lxm.packed, lxm.paper_packed]) + else: + packed_lxm = lxm.packed - extras = {} - if lxm.rssi or lxm.snr or lxm.q: - extras["rssi"] = lxm.rssi - extras["snr"] = lxm.snr - extras["q"] = lxm.q + extras = {} + if lxm.rssi or lxm.snr or lxm.q: + extras["rssi"] = lxm.rssi + extras["snr"] = lxm.snr + extras["q"] = lxm.q - if packed_telemetry != None: - extras["packed_telemetry"] = packed_telemetry + if packed_telemetry != None: + extras["packed_telemetry"] = packed_telemetry - extras = msgpack.packb(extras) + extras = msgpack.packb(extras) - query = "INSERT INTO lxm (lxm_hash, dest, source, title, tx_ts, rx_ts, state, method, t_encrypted, t_encryption, data, extra) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" - data = ( - lxm.hash, - lxm.destination_hash, - lxm.source_hash, - lxm.title, - lxm.timestamp, - time.time(), - state, - lxm.method, - lxm.transport_encrypted, - lxm.transport_encryption, - packed_lxm, - extras - ) + query = "INSERT INTO lxm (lxm_hash, dest, source, title, tx_ts, rx_ts, state, method, t_encrypted, t_encryption, data, extra) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" + data = ( + lxm.hash, + lxm.destination_hash, + lxm.source_hash, + lxm.title, + lxm.timestamp, + time.time(), + state, + lxm.method, + lxm.transport_encrypted, + lxm.transport_encryption, + packed_lxm, + extras + ) - dbc.execute(query, data) + dbc.execute(query, data) - try: - db.commit() - except Exception as e: - RNS.log("An error occurred while saving message to database: "+str(e), RNS.LOG_ERROR) - self.__db_reconnect() - if not is_retry: - RNS.log("Retrying operation...", RNS.LOG_ERROR) - self._db_save_lxm(lxm, context_dest, originator = originator, own_command = own_command, is_retry = True) - return + try: + db.commit() + except Exception as e: + RNS.log("An error occurred while saving message to database: "+str(e), RNS.LOG_ERROR) + self.__db_reconnect() + # if not is_retry: + # RNS.log("Retrying operation...", RNS.LOG_ERROR) + # self._db_save_lxm(lxm, context_dest, originator = originator, own_command = own_command, is_retry = True) + # return self.__event_conversation_changed(context_dest) def _db_save_announce(self, destination_hash, app_data, dest_type="lxmf.delivery"): - db = self.__db_connect() - dbc = db.cursor() + with self.db_lock: + db = self.__db_connect() + dbc = db.cursor() - query = "delete from announce where id is NULL or id not in (select id from announce order by received desc limit "+str(self.MAX_ANNOUNCES)+")" - dbc.execute(query) + query = "delete from announce where id is NULL or id not in (select id from announce order by received desc limit "+str(self.MAX_ANNOUNCES)+")" + dbc.execute(query) - query = "delete from announce where (source=:source);" - dbc.execute(query, {"source": destination_hash}) + query = "delete from announce where (source=:source);" + dbc.execute(query, {"source": destination_hash}) - now = time.time() - hash_material = str(time).encode("utf-8")+destination_hash+app_data+dest_type.encode("utf-8") - announce_hash = RNS.Identity.full_hash(hash_material) + now = time.time() + hash_material = str(time).encode("utf-8")+destination_hash+app_data+dest_type.encode("utf-8") + announce_hash = RNS.Identity.full_hash(hash_material) - query = "INSERT INTO announce (id, received, source, data, dest_type) values (?, ?, ?, ?, ?)" - data = ( - announce_hash, - now, - destination_hash, - app_data, - dest_type, - ) + query = "INSERT INTO announce (id, received, source, data, dest_type) values (?, ?, ?, ?, ?)" + data = ( + announce_hash, + now, + destination_hash, + app_data, + dest_type, + ) - dbc.execute(query, data) - db.commit() + dbc.execute(query, data) + db.commit() def lxmf_announce(self, attached_interface=None): if self.is_standalone or self.is_service: @@ -3895,6 +3919,7 @@ class SidebandCore(): if not originator and LXMF.FIELD_AUDIO in message.fields and ptt_enabled: self.ptt_event(message) + should_notify = False if self.is_client: should_notify = False @@ -3922,8 +3947,9 @@ class SidebandCore(): RNS.log("Could not post notification for received message: "+str(e), RNS.LOG_ERROR) def ptt_playback(self, message): - while hasattr(self, "msg_sound") and self.msg_sound != None and self.msg_sound.playing(): - RNS.log("Waiting for playback to stop") + ptt_timeout = 60 + event_time = time.time() + while hasattr(self, "msg_sound") and self.msg_sound != None and self.msg_sound.playing() and time.time() < event_time+ptt_timeout: time.sleep(0.1) time.sleep(0.5) @@ -3979,7 +4005,6 @@ class SidebandCore(): if self.msg_sound != None: RNS.log("Starting playback", RNS.LOG_DEBUG) self.msg_sound.play() - should_notify = False else: RNS.log("Playback was requested, but no audio data was loaded for playback", RNS.LOG_ERROR) @@ -3990,32 +4015,22 @@ class SidebandCore(): def ptt_event(self, message): def ptt_job(): try: - # TODO: Remove logs - RNS.log("Taking lock") self.ptt_playback_lock.acquire() while self.ui_recording: - RNS.log("Waiting for UI recording to finish") time.sleep(0.5) - - RNS.log("Starting playback") self.ptt_playback(message) except Exception as e: RNS.log("Error while starting playback for PTT-enabled conversation: "+str(e), RNS.LOG_ERROR) finally: - RNS.log("Releasing lock") self.ptt_playback_lock.release() threading.Thread(target=ptt_job, daemon=True).start() def ui_started_recording(self): - # TODO: Remove - RNS.log("Indicating recording started") self.ui_recording = True self.service_rpc_set_ui_recording(True) def ui_stopped_recording(self): - # TODO: Remove - RNS.log("Indicating recording stopped") self.ui_recording = False self.service_rpc_set_ui_recording(False)