pyhOn/pyhon/connection/api.py

157 lines
5.9 KiB
Python
Raw Normal View History

2023-02-12 17:41:38 -07:00
import json
import logging
from datetime import datetime
2023-02-12 19:36:09 -07:00
from pyhon import const
2023-04-09 10:13:50 -06:00
from pyhon.appliance import HonAppliance
2023-04-09 12:50:28 -06:00
from pyhon.connection.handler import HonConnectionHandler, HonAnonymousConnectionHandler
2023-02-12 17:41:38 -07:00
# Module-scoped logger: named after this module so its records can be filtered
# and attributed, instead of being emitted through the root logger.
_LOGGER = logging.getLogger(__name__)
2023-04-09 10:13:50 -06:00
class HonAPI:
    """Async client for the REST endpoints rooted at ``const.API_URL``.

    Two connection handlers are used: an anonymous one (always created) for
    the configuration/translation endpoints, and an authenticated one
    (created unless ``anonymous=True``) for the appliance endpoints.

    Can be used as an async context manager; entering calls :meth:`create`
    and leaving calls :meth:`close`.
    """

    def __init__(self, email="", password="", anonymous=False, session=None) -> None:
        """Store credentials and options; no connection is opened here.

        Args:
            email: Account e-mail passed to the authenticated handler.
            password: Account password passed to the authenticated handler.
            anonymous: When true, :meth:`create` skips the authenticated
                handler, so only the anonymous endpoints are usable.
            session: Optional session object handed to both handlers.
        """
        super().__init__()
        self._email = email
        self._password = password
        self._anonymous = anonymous
        self._hon = None
        self._hon_anonymous = None
        self._session = session

    async def __aenter__(self):
        """Open the connections and return this instance."""
        return await self.create()

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the connections when leaving the ``async with`` block."""
        await self.close()

    async def create(self):
        """Create the connection handlers and return ``self``.

        The anonymous handler is always created; the authenticated one only
        when the instance was not constructed with ``anonymous=True``.
        """
        self._hon_anonymous = await HonAnonymousConnectionHandler(
            self._session
        ).create()
        if not self._anonymous:
            try:
                self._hon = await HonConnectionHandler(
                    self._email, self._password, self._session
                ).create()
            except BaseException:
                # __aexit__ is not invoked when __aenter__ raises, so release
                # the already-created anonymous handler here to avoid a leak.
                await self._hon_anonymous.close()
                self._hon_anonymous = None
                raise
        return self

    async def load_appliances(self):
        """Return the decoded JSON body of the appliance listing endpoint.

        Requires the authenticated handler (``anonymous=False``).
        """
        async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp:
            return await resp.json()

    async def load_commands(self, appliance: HonAppliance):
        """Retrieve the command definitions for ``appliance``.

        Returns:
            The response ``payload`` with its ``resultCode`` entry removed,
            or ``{}`` when the payload is empty/missing or the result code
            is absent or not ``"0"``.
        """
        params = {
            "applianceType": appliance.appliance_type,
            "code": appliance.info["code"],
            "applianceModelId": appliance.appliance_model_id,
            "firmwareId": appliance.info["eepromId"],
            "macAddress": appliance.mac_address,
            "fwVersion": appliance.info["fwVersion"],
            "os": const.OS,
            "appVersion": const.APP_VERSION,
            "series": appliance.info["series"],
        }
        url = f"{const.API_URL}/commands/v1/retrieve"
        async with self._hon.get(url, params=params) as response:
            result = (await response.json()).get("payload", {})
            # A payload without "resultCode" is treated as a failed lookup
            # rather than raising KeyError.
            if not result or result.pop("resultCode", None) != "0":
                return {}
            return result

    async def command_history(self, appliance: HonAppliance):
        """Return the ``history`` entry of the appliance's command history.

        Returns ``{}`` when the response, its ``payload`` or the ``history``
        entry is missing or empty-valued.
        """
        url = f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history"
        async with self._hon.get(url) as response:
            result = await response.json()
            if not result or not result.get("payload"):
                return {}
            # Fall back to {} (same as above) instead of raising KeyError
            # when the payload carries no "history" entry.
            return result["payload"].get("history", {})

    async def last_activity(self, appliance: HonAppliance):
        """Return the ``attributes`` of the appliance's last activity, or ``{}``."""
        url = f"{const.API_URL}/commands/v1/retrieve-last-activity"
        params = {"macAddress": appliance.mac_address}
        async with self._hon.get(url, params=params) as response:
            result = await response.json()
            if result and (activity := result.get("attributes")):
                return activity
        return {}

    async def load_attributes(self, appliance: HonAppliance):
        """Return the ``payload`` of the context endpoint (category ``CYCLE``).

        Returns ``{}`` when the response has no ``payload`` key.
        """
        params = {
            "macAddress": appliance.mac_address,
            "applianceType": appliance.appliance_type,
            "category": "CYCLE",
        }
        url = f"{const.API_URL}/commands/v1/context"
        async with self._hon.get(url, params=params) as response:
            return (await response.json()).get("payload", {})

    async def load_statistics(self, appliance: HonAppliance):
        """Return the ``payload`` of the statistics endpoint, or ``{}``."""
        params = {
            "macAddress": appliance.mac_address,
            "applianceType": appliance.appliance_type,
        }
        url = f"{const.API_URL}/commands/v1/statistics"
        async with self._hon.get(url, params=params) as response:
            return (await response.json()).get("payload", {})

    async def send_command(self, appliance, command, parameters, ancillary_parameters):
        """Send ``command`` to ``appliance``.

        Args:
            appliance: Object providing ``mac_address``, ``commands_options``
                and ``appliance_type``.
            command: Value sent as ``commandName``.
            parameters: Value sent as ``parameters``.
            ancillary_parameters: Value sent as ``ancillaryParameters``.

        Returns:
            ``True`` when the response payload's ``resultCode`` is ``"0"``,
            otherwise ``False``.
        """
        # Always emit millisecond precision. A plain isoformat() drops the
        # fractional part when microsecond == 0, and slicing off the last
        # three characters would then cut into the seconds field.
        now = datetime.utcnow().isoformat(timespec="milliseconds")
        data = {
            "macAddress": appliance.mac_address,
            "timestamp": f"{now}Z",
            "commandName": command,
            "transactionId": f"{appliance.mac_address}_{now}Z",
            "applianceOptions": appliance.commands_options,
            "appliance": self._hon.device.get(),
            "attributes": {
                "channel": "mobileApp",
                "origin": "standardProgram",
                "energyLabel": "0",
            },
            "ancillaryParameters": ancillary_parameters,
            "parameters": parameters,
            "applianceType": appliance.appliance_type,
        }
        url = f"{const.API_URL}/commands/v1/send"
        async with self._hon.post(url, json=data) as resp:
            json_data = await resp.json()
            if json_data.get("payload", {}).get("resultCode") == "0":
                return True
        return False

    async def appliance_configuration(self):
        """Return the appliance-configuration ``payload``, or ``{}``.

        Uses the anonymous handler.
        """
        url = f"{const.API_URL}/config/v1/appliance-configuration"
        async with self._hon_anonymous.get(url) as response:
            result = await response.json()
            if result and (data := result.get("payload")):
                return data
        return {}

    async def app_config(self, language="en", beta=True):
        """Return the app-config ``payload`` for ``language``, or ``{}``.

        The request body is JSON serialized with compact separators and
        posted through the anonymous handler.
        """
        url = f"{const.API_URL}/app-config"
        payload = {
            "languageCode": language,
            "beta": beta,
            "appVersion": const.APP_VERSION,
            "os": const.OS,
        }
        payload = json.dumps(payload, separators=(",", ":"))
        async with self._hon_anonymous.post(url, data=payload) as response:
            if (result := await response.json()) and (data := result.get("payload")):
                return data
        return {}

    async def translation_keys(self, language="en"):
        """Download the translation JSON referenced by the app config.

        The URL is read from ``language.jsonPath`` of :meth:`app_config`;
        returns ``{}`` when that path is missing or the document is empty.
        """
        config = await self.app_config(language=language)
        if url := config.get("language", {}).get("jsonPath"):
            async with self._hon_anonymous.get(url) as response:
                if result := await response.json():
                    return result
        return {}

    async def close(self):
        """Close whichever connection handlers have been created."""
        if self._hon:
            await self._hon.close()
        if self._hon_anonymous:
            await self._hon_anonymous.close()