ha-noaa-space-weather/feeder/mqtt.py

89 lines
2.9 KiB
Python
Raw Normal View History

2024-10-20 14:52:17 -06:00
import gc
2024-08-16 23:20:58 -06:00
import logging
import os
import pickle
2024-08-16 23:20:58 -06:00
import sys
import time
2024-10-20 14:52:17 -06:00
from datetime import datetime, timezone
2024-08-16 23:20:58 -06:00
import numpy as np
import paho.mqtt.client as mqtt
2024-11-06 12:48:45 -07:00
from dateutil.parser import parse
from redis import Redis
2024-08-16 23:20:58 -06:00
2024-11-06 12:51:13 -07:00
from lib.glotec import plot_glotec_map
2024-08-16 23:20:58 -06:00
# Root logger emits INFO-level records and above.
logging.basicConfig(level=logging.INFO)

# MQTT connection settings; each one can be overridden through the environment.
MQTT_BROKER_HOST = os.getenv('MQTT_BROKER_HOST', '')
MQTT_BROKER_PORT = int(os.getenv('MQTT_BROKER_PORT', 1883))
MQTT_CLIENT_ID = os.getenv('MQTT_CLIENT_ID', 'space_weather')
MQTT_USERNAME = os.getenv('MQTT_USERNAME', '')
MQTT_PASSWORD = os.getenv('MQTT_PASSWORD', '')
MQTT_TOPIC_PREFIX = os.getenv('MQTT_TOPIC_PREFIX', 'space-weather')

# Latitude/longitude bounds, kept as the raw environment strings (None when
# unset); they are converted with float() at the point of use.
LAT_RANGE_MIN, LAT_RANGE_MAX, LON_RANGE_MIN, LON_RANGE_MAX = map(
    os.getenv,
    ('LAT_RANGE_MIN', 'LAT_RANGE_MAX', 'LON_RANGE_MIN', 'LON_RANGE_MAX'),
)

# All four bounds are mandatory: an unset or empty value aborts startup.
if not all((LAT_RANGE_MIN, LAT_RANGE_MAX, LON_RANGE_MIN, LON_RANGE_MAX)):
    logging.critical('Must set LAT_RANGE_MIN, LAT_RANGE_MAX, LON_RANGE_MIN, and LON_RANGE_MAX environment variables')
    # Show what was actually read so the missing variable can be identified.
    print(LAT_RANGE_MIN, LAT_RANGE_MAX, LON_RANGE_MIN, LON_RANGE_MAX)
    sys.exit(1)
# Module-level MQTT client shared by publish(). Credentials are applied only
# when both a username and a password were supplied.
client = mqtt.Client(client_id=MQTT_CLIENT_ID)
if MQTT_USERNAME and MQTT_PASSWORD:
    client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)

# Register a retained 'Offline' last-will message on the status topic; this
# must be configured before connect() is called.
client.will_set(f'{MQTT_TOPIC_PREFIX}/status', payload='Offline', qos=1, retain=True)

# Connect to the broker, then start paho's network loop (loop_start() returns
# immediately and services the connection in the background).
client.connect(MQTT_BROKER_HOST, port=MQTT_BROKER_PORT)
client.loop_start()
def publish(topic: str, msg, *, retries: int = 10, retry_delay: float = 10) -> None:
    """Publish ``msg`` under the configured topic prefix, retrying on failure.

    Args:
        topic: Topic suffix; the full topic is ``MQTT_TOPIC_PREFIX + '/' + topic``.
        msg: Payload passed unchanged to ``client.publish``.
        retries: Maximum number of publish attempts (default 10).
        retry_delay: Seconds to wait between failed attempts (default 10).

    Returns:
        None. If every attempt fails, an error is logged and the function
        returns normally; no exception is raised for a failed publish.
    """
    topic_expanded = MQTT_TOPIC_PREFIX + '/' + topic
    for attempt in range(1, retries + 1):
        result = client.publish(topic_expanded, msg)
        # A return code of 0 means the client accepted the message.
        if result.rc == 0:
            logging.info(f'Sent {msg} to topic {topic_expanded}')
            return
        logging.warning(f'Failed to send message to topic {topic_expanded}: {result}. Retry {attempt}/{retries}')
        # Only wait when another attempt will follow; sleeping after the last
        # failure would just delay the final error report.
        if attempt < retries:
            time.sleep(retry_delay)
    logging.error(f'Failed to send message to topic {topic_expanded}.')
2024-08-16 23:20:58 -06:00
def main():
    """Poll Redis for GloTEC data and publish the regional average once a minute.

    Each cycle reads the pickled object stored under the ``glotec`` Redis key
    (waiting in 10 s steps until it exists), passes it to ``plot_glotec_map``
    together with the configured longitude/latitude bounds, averages the
    returned map with ``np.mean``, rounds the result to one decimal place and
    publishes it on the ``glotec`` topic. The loop never returns.

    Raises:
        ValueError: if any of the LAT/LON range variables is not a valid float.
    """
    # The bounds are constants for the life of the process, so parse them once.
    # A malformed value now fails here instead of on the first loop pass.
    lon_min, lon_max = float(LON_RANGE_MIN), float(LON_RANGE_MAX)
    lat_min, lat_max = float(LAT_RANGE_MIN), float(LAT_RANGE_MAX)

    redis = Redis(host='localhost', port=6379, db=0)
    while True:
        data = redis.get('glotec')
        while data is None:
            logging.warning('Redis has not been populated yet. Is cache.py running? Sleeping 10s...')
            time.sleep(10)
            data = redis.get('glotec')

        # NOTE(security): pickle.loads can execute arbitrary code while
        # deserializing. This is only acceptable while the local Redis
        # instance is writable exclusively by trusted processes.
        geojson = pickle.loads(data)

        utc_hr = datetime.now(timezone.utc).hour
        logging.info(f'Using hour {utc_hr}')

        # Fresh list objects are passed on every pass, as the original did, in
        # case plot_glotec_map modifies its range arguments.
        glotec_map_ranged, _ = plot_glotec_map(geojson, [lon_min, lon_max], [lat_min, lat_max])
        avg_tec = np.mean(glotec_map_ranged)
        logging.info(f'Data timestamp: {parse(geojson["time_tag"]).isoformat()}')

        # np.mean returns a NumPy scalar; convert it to a built-in float first
        # so the payload is a plain float whatever the array dtype is.
        latest = round(float(avg_tec), 1)
        publish('glotec', latest)

        # Drop the per-cycle objects and force a collection before sleeping so
        # they are not held for the whole idle minute.
        del data, geojson, glotec_map_ranged, avg_tec, latest
        gc.collect()
        time.sleep(60)
2024-08-16 23:20:58 -06:00
# Run the polling loop only when this file is executed as a script.
if __name__ == '__main__':
    main()