"""Shark IQ Wrapper.""" import logging from typing import Dict, Optional from sharkiqpy import OperatingModes, PowerModes, Properties, SharkIqVacuum from homeassistant.components.vacuum import ( STATE_CLEANING, STATE_DOCKED, STATE_IDLE, STATE_PAUSED, STATE_RETURNING, SUPPORT_BATTERY, SUPPORT_FAN_SPEED, SUPPORT_LOCATE, SUPPORT_PAUSE, SUPPORT_RETURN_HOME, SUPPORT_START, SUPPORT_STATE, SUPPORT_STATUS, SUPPORT_STOP, StateVacuumEntity, ) from .const import DOMAIN, SHARK from .update_coordinator import SharkIqUpdateCoordinator LOGGER = logging.getLogger(__name__) # Supported features SUPPORT_SHARKIQ = ( SUPPORT_BATTERY | SUPPORT_FAN_SPEED | SUPPORT_PAUSE | SUPPORT_RETURN_HOME | SUPPORT_START | SUPPORT_STATE | SUPPORT_STATUS | SUPPORT_STOP | SUPPORT_LOCATE ) OPERATING_STATE_MAP = { OperatingModes.PAUSE: STATE_PAUSED, OperatingModes.START: STATE_CLEANING, OperatingModes.STOP: STATE_IDLE, OperatingModes.RETURN: STATE_RETURNING, } FAN_SPEEDS_MAP = { "Eco": PowerModes.ECO, "Normal": PowerModes.NORMAL, "Max": PowerModes.MAX, } STATE_RECHARGING_TO_RESUME = "recharging_to_resume" # Attributes to expose ATTR_ERROR_CODE = "last_error_code" ATTR_ERROR_MSG = "last_error_message" ATTR_LOW_LIGHT = "low_light" ATTR_RECHARGE_RESUME = "recharge_and_resume" ATTR_RSSI = "rssi" class SharkVacuumEntity(StateVacuumEntity): """Shark IQ vacuum entity.""" def __init__(self, sharkiq: SharkIqVacuum, coordinator: SharkIqUpdateCoordinator): """Create a new SharkVacuumEntity.""" if sharkiq.serial_number not in coordinator.shark_vacs: raise RuntimeError( f"Shark IQ robot {sharkiq.serial_number} is not known to the coordinator" ) self.coordinator = coordinator self.sharkiq = sharkiq @property def should_poll(self): """Don't poll this entity. Polling is done via the coordinator.""" return False def clean_spot(self, **kwargs): """Clean a spot. Not yet implemented.""" raise NotImplementedError() def send_command(self, command, params=None, **kwargs): """Send a command to the vacuum. Not yet implemented.""" raise NotImplementedError() @property def is_online(self) -> bool: """Tell us if the device is online.""" return self.coordinator.is_online(self.sharkiq.serial_number) @property def name(self) -> str: """Device name.""" return self.sharkiq.name @property def serial_number(self) -> str: """Vacuum API serial number (DSN).""" return self.sharkiq.serial_number @property def model(self) -> str: """Vacuum model number.""" if self.sharkiq.vac_model_number: return self.sharkiq.vac_model_number return self.sharkiq.oem_model_number @property def device_info(self) -> Dict: """Device info dictionary.""" return { "identifiers": {(DOMAIN, self.serial_number)}, "name": self.name, "manufacturer": SHARK, "model": self.model, "sw_version": self.sharkiq.get_property_value( Properties.ROBOT_FIRMWARE_VERSION ), } @property def supported_features(self) -> int: """Flag vacuum cleaner robot features that are supported.""" return SUPPORT_SHARKIQ @property def is_docked(self) -> Optional[bool]: """Is vacuum docked.""" return self.sharkiq.get_property_value(Properties.DOCKED_STATUS) @property def error_code(self) -> Optional[int]: """Return the last observed error code (or None).""" return self.sharkiq.error_code @property def error_message(self) -> Optional[str]: """Return the last observed error message (or None).""" if not self.error_code: return None return self.sharkiq.error_code @property def operating_mode(self) -> Optional[str]: """Operating mode..""" op_mode = self.sharkiq.get_property_value(Properties.OPERATING_MODE) return OPERATING_STATE_MAP.get(op_mode) @property def recharging_to_resume(self) -> Optional[int]: """Return True if vacuum set to recharge and resume cleaning.""" return self.sharkiq.get_property_value(Properties.RECHARGING_TO_RESUME) @property def state(self): """ Get the current vacuum state. NB: Currently, we do not return an error state because they can be very, very stale. In the app, these are (usually) handled by showing the robot as stopped and sending the user a notification. """ if self.recharging_to_resume: return STATE_RECHARGING_TO_RESUME if self.is_docked: return STATE_DOCKED return self.operating_mode @property def unique_id(self) -> str: """Return the unique id of the vacuum cleaner.""" return f"sharkiq-{self.serial_number:s}-vacuum" @property def available(self) -> bool: """Determine if the sensor is available based on API results.""" # If the last update was successful... if self.coordinator.last_update_success and self.is_online: return True # Otherwise, we are not. return False @property def battery_level(self): """Get the current battery level.""" return self.sharkiq.get_property_value(Properties.BATTERY_CAPACITY) async def async_update(self): """Update the known properties asynchronously.""" await self.coordinator.async_request_refresh() async def async_added_to_hass(self) -> None: """Connect to dispatcher listening for entity data notifications.""" self.async_on_remove( self.coordinator.async_add_listener(self.async_write_ha_state) ) async def async_return_to_base(self, **kwargs): """Have the device return to base.""" await self.sharkiq.async_set_operating_mode(OperatingModes.RETURN) await self.coordinator.async_refresh() async def async_pause(self): """Pause the cleaning task.""" await self.sharkiq.async_set_operating_mode(OperatingModes.PAUSE) await self.coordinator.async_refresh() async def async_start(self): """Start the device.""" await self.sharkiq.async_set_operating_mode(OperatingModes.START) await self.coordinator.async_refresh() async def async_stop(self, **kwargs): """Stop the device.""" await self.sharkiq.async_set_operating_mode(OperatingModes.STOP) await self.coordinator.async_refresh() async def async_locate(self, **kwargs): """Cause the device to generate a loud chirp.""" await self.sharkiq.async_find_device() @property def fan_speed(self) -> str: """Return the current fan speed.""" fan_speed = None speed_level = self.sharkiq.get_property_value(Properties.POWER_MODE) for k, val in FAN_SPEEDS_MAP.items(): if val == speed_level: fan_speed = k return fan_speed async def async_set_fan_speed(self, fan_speed: str, **kwargs): """Set the fan speed.""" await self.sharkiq.async_set_property_value( Properties.POWER_MODE, FAN_SPEEDS_MAP.get(fan_speed.capitalize()) ) await self.coordinator.async_refresh() @property def fan_speed_list(self): """Get the list of available fan speed steps of the vacuum cleaner.""" return list(FAN_SPEEDS_MAP.keys()) # Various attributes we want to expose @property def recharge_resume(self) -> Optional[bool]: """Recharge and resume mode active.""" return self.sharkiq.get_property_value(Properties.RECHARGE_RESUME) @property def rssi(self) -> Optional[int]: """Get the WiFi RSSI.""" return self.sharkiq.get_property_value(Properties.RSSI) @property def low_light(self): """Let us know if the robot is operating in low-light mode.""" return self.sharkiq.get_property_value(Properties.LOW_LIGHT_MISSION) @property def shark_state_attributes(self) -> Dict: """Return a dictionary of device state attributes specific to sharkiq.""" data = { ATTR_ERROR_CODE: self.error_code, ATTR_ERROR_MSG: self.sharkiq.error_text, ATTR_LOW_LIGHT: self.low_light, ATTR_RECHARGE_RESUME: self.recharge_resume, ATTR_RSSI: self.rssi, } return data @property def state_attributes(self) -> Dict: """Return a dictionary of device state attributes.""" return dict(**super().state_attributes, **self.shark_state_attributes)