Added thread lock to service RPC calls

This commit is contained in:
Mark Qvist 2024-10-01 16:22:47 +02:00
parent 6b97fd8e4b
commit 2cf7d8ad84
1 changed files with 31 additions and 68 deletions

View File

@ -153,6 +153,7 @@ class SidebandCore():
self.state_lock = Lock()
self.message_router = None
self.rpc_connection = None
self.rpc_lock = Lock()
self.service_stopped = False
self.service_context = service_context
self.owner_service = owner_service
@ -1223,13 +1224,8 @@ class SidebandCore():
else:
if self.is_client:
try:
if self.rpc_connection == None:
self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)
self.rpc_connection.send({"request_latest_telemetry": {"from_addr": from_addr}})
response = self.rpc_connection.recv()
return response
return self.service_rpc_request({"request_latest_telemetry": {"from_addr": from_addr}})
except Exception as e:
RNS.log("Error while requesting latest telemetry over RPC: "+str(e), RNS.LOG_DEBUG)
RNS.trace_exception(e)
@ -1302,17 +1298,12 @@ class SidebandCore():
else:
if self.is_client:
try:
if self.rpc_connection == None:
self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)
self.rpc_connection.send({"send_latest_telemetry": {
return self.service_rpc_request({"send_latest_telemetry": {
"to_addr": to_addr,
"stream": stream,
"is_authorized_telemetry_request": is_authorized_telemetry_request}
})
response = self.rpc_connection.recv()
return response
except Exception as e:
RNS.log("Error while sending latest telemetry over RPC: "+str(e), RNS.LOG_DEBUG)
RNS.trace_exception(e)
@ -1520,11 +1511,7 @@ class SidebandCore():
return True
else:
def set():
if self.rpc_connection == None:
self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)
self.rpc_connection.send({"setstate": (prop, val)})
response = self.rpc_connection.recv()
return response
return self.service_rpc_request({"setstate": (prop, val)})
try:
set()
@ -1546,11 +1533,7 @@ class SidebandCore():
return True
else:
try:
if self.rpc_connection == None:
self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)
self.rpc_connection.send({"latest_telemetry": (latest_telemetry, latest_packed_telemetry)})
response = self.rpc_connection.recv()
return response
return self.service_rpc_request({"latest_telemetry": (latest_telemetry, latest_packed_telemetry)})
except Exception as e:
RNS.log("Error while setting telemetry over RPC: "+str(e), RNS.LOG_DEBUG)
return False
@ -1567,11 +1550,7 @@ class SidebandCore():
return True
else:
try:
if self.rpc_connection == None:
self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)
self.rpc_connection.send({"set_debug": debug})
response = self.rpc_connection.recv()
return response
return self.service_rpc_request({"set_debug": debug})
except Exception as e:
RNS.log("Error while setting log level over RPC: "+str(e), RNS.LOG_DEBUG)
return False
@ -1585,15 +1564,25 @@ class SidebandCore():
return True
else:
try:
if self.rpc_connection == None:
self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)
self.rpc_connection.send({"set_ui_recording": recording})
response = self.rpc_connection.recv()
return response
return self.service_rpc_request({"set_ui_recording": recording})
except Exception as e:
RNS.log("Error while setting UI recording status over RPC: "+str(e), RNS.LOG_DEBUG)
return False
def service_rpc_request(self, request):
# RNS.log("Running service RPC call: "+str(request), RNS.LOG_DEBUG)
try:
with self.rpc_lock:
if self.rpc_connection == None:
self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)
self.rpc_connection.send(request)
response = self.rpc_connection.recv()
return response
except Exception as e:
RNS.log(f"An error occurred while executing the service RPC request: {request}", RNS.LOG_ERROR)
RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
def getstate(self, prop, allow_cache=False):
with self.state_lock:
if not self.service_stopped:
@ -1611,11 +1600,7 @@ class SidebandCore():
return None
else:
try:
if self.rpc_connection == None:
self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)
self.rpc_connection.send({"getstate": prop})
response = self.rpc_connection.recv()
return response
return self.service_rpc_request({"getstate": prop})
except Exception as e:
RNS.log("Error while retrieving state "+str(prop)+" over RPC: "+str(e), RNS.LOG_DEBUG)
@ -1650,11 +1635,7 @@ class SidebandCore():
return self._get_plugins_info()
else:
try:
if self.rpc_connection == None:
self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)
self.rpc_connection.send({"get_plugins_info": True})
response = self.rpc_connection.recv()
return response
return self.service_rpc_request({"get_plugins_info": True})
except Exception as e:
ed = "Error while getting plugins info over RPC: "+str(e)
RNS.log(ed, RNS.LOG_DEBUG)
@ -1689,11 +1670,7 @@ class SidebandCore():
return self._get_destination_establishment_rate(destination_hash)
else:
try:
if self.rpc_connection == None:
self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)
self.rpc_connection.send({"get_destination_establishment_rate": destination_hash})
response = self.rpc_connection.recv()
return response
return self.service_rpc_request({"get_destination_establishment_rate": destination_hash})
except Exception as e:
ed = "Error while getting destination link etablishment rate over RPC: "+str(e)
RNS.log(ed, RNS.LOG_DEBUG)
@ -1782,6 +1759,7 @@ class SidebandCore():
except Exception as e:
RNS.log("Error on client RPC connection: "+str(e), RNS.LOG_ERROR)
RNS.trace_exception(e)
try:
connection.close()
except:
@ -4001,12 +3979,7 @@ class SidebandCore():
else:
if self.is_client:
try:
if self.rpc_connection == None:
self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)
self.rpc_connection.send({"get_lxm_progress": {"lxm_hash": lxm_hash}})
response = self.rpc_connection.recv()
return response
return self.service_rpc_request({"get_lxm_progress": {"lxm_hash": lxm_hash}})
except Exception as e:
RNS.log("Error while getting LXM progress over RPC: "+str(e), RNS.LOG_DEBUG)
@ -4040,10 +4013,7 @@ class SidebandCore():
else:
if self.is_client:
try:
if self.rpc_connection == None:
self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)
self.rpc_connection.send({"send_message": {
return self.service_rpc_request({"send_message": {
"content": content,
"destination_hash": destination_hash,
"propagation": propagation,
@ -4053,8 +4023,6 @@ class SidebandCore():
"image": image,
"audio": audio}
})
response = self.rpc_connection.recv()
return response
except Exception as e:
RNS.log("Error while sending message over RPC: "+str(e), RNS.LOG_DEBUG)
@ -4069,17 +4037,12 @@ class SidebandCore():
else:
if self.is_client:
try:
if self.rpc_connection == None:
self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)
self.rpc_connection.send({"send_command": {
return self.service_rpc_request({"send_command": {
"content": content,
"destination_hash": destination_hash,
"propagation": propagation}
})
response = self.rpc_connection.recv()
return response
except Exception as e:
RNS.log("Error while sending command over RPC: "+str(e), RNS.LOG_DEBUG)
RNS.trace_exception(e)