147 lines
6.1 KiB
Python
147 lines
6.1 KiB
Python
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
import yaml
|
|
from dateparser import parse
|
|
from sqlalchemy import create_engine, func
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
from lib.dawarich import send_to_dawarich
|
|
from lib.models import HALocations, Base
|
|
from lib.strings import mysql_trim_float
|
|
|
|
logging.basicConfig()
|
|
_logger = logging.getLogger('MAIN')
|
|
|
|
CONFIG_DATA = {}
|
|
|
|
|
|
def get_battery_level(entity_id, timestamp):
|
|
battery_entity_id = f"sensor.{entity_id.split('.')[1]}_battery_level"
|
|
start_time = timestamp - timedelta(minutes=1)
|
|
|
|
response = requests.get(
|
|
f"{CONFIG_DATA['ha_url']}/api/history/period/{start_time.isoformat()}?filter_entity_id={battery_entity_id}",
|
|
headers={"Authorization": f"Bearer {CONFIG_DATA['access_token']}"}
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
battery_history = response.json()[0]
|
|
if battery_history:
|
|
for entry in reversed(battery_history):
|
|
entry_timestamp = parse(entry["last_changed"])
|
|
if entry_timestamp <= timestamp:
|
|
state = entry.get("state")
|
|
return float(state) if state is not None else None
|
|
_logger.warning(f'No battery level data found for entity "{battery_entity_id}" at or before timestamp {timestamp}')
|
|
return None
|
|
else:
|
|
_logger.warning(f'No battery level data found for entity "{battery_entity_id}" at or before timestamp {timestamp}')
|
|
return None
|
|
else:
|
|
_logger.error(f'Failed to fetch battery level for entity "{battery_entity_id}" at or before timestamp {timestamp}: {response.status_code} -- {response.text}')
|
|
return None
|
|
|
|
|
|
def main():
|
|
global CONFIG_DATA
|
|
_logger.setLevel(logging.DEBUG)
|
|
|
|
config_path = Path(os.path.dirname(os.path.realpath(__file__)), 'config.yml')
|
|
if not config_path.exists():
|
|
_logger.critical(f'Config file not found: {config_path}')
|
|
sys.exit(1)
|
|
|
|
with open(config_path) as stream:
|
|
try:
|
|
CONFIG_DATA = yaml.safe_load(stream)
|
|
except yaml.YAMLError as e:
|
|
_logger.critical(f'Failed to load config file: {e}')
|
|
sys.exit(1)
|
|
|
|
_logger.info('Initalizing database...')
|
|
engine = create_engine(f'mysql://{CONFIG_DATA["database"]["username"]}:{CONFIG_DATA["database"]["password"]}@{CONFIG_DATA["database"]["host"]}{":" + str(CONFIG_DATA["database"]["port"]) if CONFIG_DATA["database"].get("port") else ""}/{CONFIG_DATA["database"]["database"]}')
|
|
Session = sessionmaker(bind=engine)
|
|
Base.metadata.create_all(engine)
|
|
|
|
while True:
|
|
end_time = datetime.now()
|
|
start_time = end_time - timedelta(hours=24)
|
|
|
|
for entity_id in CONFIG_DATA['entities']:
|
|
_logger.info(f'Fetching entity "{entity_id}"')
|
|
response = requests.get(
|
|
f"{CONFIG_DATA['ha_url']}/api/history/period/{start_time.isoformat()}?filter_entity_id={entity_id}",
|
|
headers={"Authorization": f"Bearer {CONFIG_DATA['access_token']}"},
|
|
)
|
|
|
|
new_rows = 0
|
|
skipped_rows = 0
|
|
|
|
if response.status_code == 200:
|
|
location_history = response.json()[0]
|
|
session = Session()
|
|
|
|
for entry in location_history:
|
|
timestamp = parse(entry["last_changed"])
|
|
state = entry["state"]
|
|
attributes = entry["attributes"]
|
|
latitude = mysql_trim_float(attributes.get("latitude"))
|
|
longitude = mysql_trim_float(attributes.get("longitude"))
|
|
|
|
existing_entry = session.query(HALocations).filter(
|
|
func.round(HALocations.lat, 6) == round(latitude, 6),
|
|
func.round(HALocations.lon, 6) == round(longitude, 6),
|
|
func.date(HALocations.timestamp) == datetime.strptime(str(timestamp).strip('+00:00'), '%Y-%m-%d %H:%M:%S.%f').date(),
|
|
func.hour(HALocations.timestamp) == datetime.strptime(str(timestamp).strip('+00:00'), '%Y-%m-%d %H:%M:%S.%f').hour,
|
|
func.minute(HALocations.timestamp) == datetime.strptime(str(timestamp).strip('+00:00'), '%Y-%m-%d %H:%M:%S.%f').minute,
|
|
HALocations.device == entity_id
|
|
).first()
|
|
|
|
if existing_entry is None:
|
|
batt_level = get_battery_level(entity_id, timestamp)
|
|
if batt_level is not None:
|
|
attributes['battery'] = batt_level
|
|
else:
|
|
attributes['battery'] = -1
|
|
|
|
new_entry = HALocations(
|
|
device=entity_id,
|
|
lat=latitude,
|
|
lon=longitude,
|
|
state=state,
|
|
attributes=attributes,
|
|
timestamp=timestamp
|
|
)
|
|
session.add(new_entry)
|
|
|
|
if CONFIG_DATA.get('dawarich') and entity_id == CONFIG_DATA['dawarich']['entity_id']:
|
|
send_to_dawarich(
|
|
entity_id,
|
|
timestamp,
|
|
latitude,
|
|
longitude,
|
|
attributes['gps_accuracy'], attributes['vertical_accuracy'], attributes['altitude'], attributes['battery'], attributes['speed'], CONFIG_DATA)
|
|
|
|
new_rows += 1
|
|
else:
|
|
skipped_rows += 1
|
|
|
|
_logger.info(f'Committing changes to database for entity "{entity_id}". New: {new_rows}. Skipped: {skipped_rows}')
|
|
session.commit()
|
|
session.close()
|
|
else:
|
|
_logger.error(f'Failed to fetch the location history: {response.status_code} -- {response.text}')
|
|
|
|
_logger.info('History fetch complete')
|
|
time.sleep(CONFIG_DATA['interval'])
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|