add files

This commit is contained in:
Cyberes 2024-08-16 23:20:58 -06:00
parent de9bb1bdc2
commit e56b291ee6
7 changed files with 245 additions and 2 deletions

4
.gitignore vendored
View File

@ -1,3 +1,5 @@
.idea
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
@ -15,7 +17,6 @@ dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
@ -159,4 +160,3 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

0
lib/__init__.py Normal file
View File

86
lib/cddis_fetch.py Normal file
View File

@ -0,0 +1,86 @@
import datetime
import subprocess
import sys
import tempfile
from pathlib import Path
import chromedriver_autoinstaller
import requests
from selenium import webdriver
from selenium.webdriver import Keys
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
IONEX_BASE_URL = 'https://cddis.nasa.gov/archive/gnss/products/ionex/'
def fetch_latest_ionex(username: str, password: str):
now = datetime.date.today()
url = IONEX_BASE_URL + str(now.year)
chromedriver_autoinstaller.install()
options = Options()
options.add_argument('--headless=new')
driver = webdriver.Chrome(options=options)
driver.get(url)
# Login
username_field = WebDriverWait(driver, 30).until(EC.presence_of_element_located((By.ID, "username")))
username_field.clear()
username_field.send_keys(username)
password_field = WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "password")))
password_field.clear()
password_field.send_keys(password)
password_field.send_keys(Keys.RETURN)
# Wait until we're redirected to the right page.
WebDriverWait(driver, 30).until(EC.visibility_of_element_located((By.ID, "parDirTextContainer")))
# Get the days in the year.
day_elements = driver.find_elements(By.XPATH, '//div[@class="archiveDir"]/div[@class="archiveDirTextContainer"]/a[@class="archiveDirText"]')
day_urls = [element.get_attribute('href') for element in day_elements]
# Load the latest day.
driver.get(day_urls[-1])
# Find our file.
file_elements = driver.find_elements(By.XPATH, '//a[@class="archiveItemText"]')
file_urls = [element.get_attribute('href') for element in file_elements]
found_url = None
for u in file_urls:
parts = u.split('/')
if parts[-1].startswith('c2pg'):
found_url = u
break
if found_url is None:
print('Did not find c2pg')
sys.exit(1)
# Download our file.
auth_cookie = None
for cookie in driver.get_cookies():
if cookie['name'] == 'ProxyAuth':
auth_cookie = cookie['value']
break
if auth_cookie is None:
print('Did not find ProxyAuth cookie')
sys.exit(1)
driver.close()
del driver
# Download data.
zip_data_r = requests.get(found_url, cookies={'ProxyAuth': auth_cookie})
zip_data_r.raise_for_status()
# Read data.
tmp_file = tempfile.NamedTemporaryFile()
tmp_file.write(zip_data_r.content)
tmp_dir = tempfile.TemporaryDirectory()
subprocess.run(["7z", "e", tmp_file.name, f"-o{tmp_dir.name}"], check=True, stdout=subprocess.PIPE)
p = Path(tmp_dir.name)
target_file = list(p.iterdir())[-1]
data = target_file.read_text()
return data

68
lib/tecmap.py Normal file
View File

@ -0,0 +1,68 @@
import re
from datetime import datetime
import cartopy.crs as ccrs
import matplotlib.pyplot as plt
import numpy as np
"""
https://github.com/daniestevez/jupyter_notebooks/blob/master/IONEX.ipynb
"""
def parse_ionex_datetime(s: str):
match = re.match(r'\s*(\d{4})\s*(\d{1,2})\s*(\d{1,2})\s*(\d{1,2})\s*(\d{1,2})\s*(\d{1,2})', s)
if match:
year, month, day, hour, minute, second = map(int, match.groups())
return datetime(year, month, day, hour, minute, second)
else:
raise ValueError("Invalid date format")
def parse_map(tecmap, exponent=-1):
tecmap = re.split('.*END OF TEC MAP', tecmap)[0]
return np.stack([np.fromstring(l, sep=' ') for l in re.split('.*LAT/LON1/LON2/DLON/H\\n', tecmap)[1:]]) * 10 ** exponent
def get_tecmaps(ionex: str):
for tecmap in ionex.split('START OF TEC MAP')[1:]:
lines = tecmap.split('\n')
epoch = lines[1].strip() if len(lines) > 1 else None
yield parse_map(tecmap), epoch
def plot_tec_map(tecmap, lon_range: list, lat_range: list):
proj = ccrs.PlateCarree()
f, ax = plt.subplots(1, 1, subplot_kw=dict(projection=proj))
# Create arrays of latitudes and longitudes to match the geographical grid of the TEC map data.
# This is hard coded and should never change.
lat = np.arange(-87.5, 87.5, 2.5)
lon = np.arange(-180, 180, 2.5)
# Create a mask for the data in the lat/lon range
lon_mask = (lon >= lon_range[0]) & (lon <= lon_range[1])
lat_mask = (lat >= lat_range[0]) & (lat <= lat_range[1])
mask = np.ix_(lat_mask, lon_mask)
# Select only the data in the lat/lon range
tecmap_ranged = tecmap[mask]
# Plot the TEC map
h = plt.imshow(tecmap_ranged, cmap='viridis', vmin=0, vmax=100, extent=(lon_range[0], lon_range[1], lat_range[0], lat_range[1]), transform=proj)
# Make graph pretty
# ax.coastlines()
# plt.title('VTEC map')
# divider = make_axes_locatable(ax)
# ax_cb = divider.new_horizontal(size='5%', pad=0.1, axes_class=plt.Axes)
# f.add_axes(ax_cb)
# cb = plt.colorbar(h, cax=ax_cb)
# plt.rc('text', usetex=True)
# cb.set_label('TECU ($10^{16} \\mathrm{el}/\\mathrm{m}^2$)')
# plt.show()
# Deallocate
plt.close()
return tecmap_ranged

68
main.py Normal file
View File

@ -0,0 +1,68 @@
import logging
import os
import sys
import time
import numpy as np
import paho.mqtt.client as mqtt
from lib.cddis_fetch import fetch_latest_ionex
from lib.tecmap import get_tecmaps, plot_tec_map
logging.basicConfig(level=logging.INFO)
MQTT_BROKER_HOST = os.getenv('MQTT_BROKER_HOST', "")
MQTT_BROKER_PORT = int(os.getenv('MQTT_BROKER_PORT', 1883))
MQTT_CLIENT_ID = os.getenv('MQTT_CLIENT_ID', "space_weather")
MQTT_USERNAME = os.getenv('MQTT_USERNAME', "")
MQTT_PASSWORD = os.getenv('MQTT_PASSWORD', "")
MQTT_TOPIC_PREFIX = os.getenv('MQTT_TOPIC_PREFIX', "space-weather")
LAT_RANGE_MIN = os.getenv('LAT_RANGE_MIN')
LAT_RANGE_MAX = os.getenv('LAT_RANGE_MAX')
LON_RANGE_MIN = os.getenv('LON_RANGE_MIN')
LON_RANGE_MAX = os.getenv('LON_RANGE_MAX')
if not LAT_RANGE_MIN or not LAT_RANGE_MAX or not LON_RANGE_MIN or not LON_RANGE_MAX:
logging.critical('Must set LAT_RANGE_MIN, LAT_RANGE_MAX, LON_RANGE_MIN, and LON_RANGE_MAX environment variables')
sys.exit(1)
CDDIS_USERNAME = os.getenv('CDDIS_USERNAME')
CDDIS_PASSWORD = os.getenv('CDDIS_PASSWORD')
if not CDDIS_USERNAME or not CDDIS_PASSWORD:
logging.critical('Must set CDDIS_USERNAME and CDDIS_PASSWORD environment variables')
sys.exit(1)
client = mqtt.Client(client_id=MQTT_CLIENT_ID)
if MQTT_USERNAME and MQTT_PASSWORD:
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
logging.info("Username and password set.")
client.will_set(MQTT_TOPIC_PREFIX + "/status", payload="Offline", qos=1, retain=True) # set LWT
client.connect(MQTT_BROKER_HOST, port=MQTT_BROKER_PORT)
client.loop_start()
def publish(topic: str, msg):
topic_expanded = MQTT_TOPIC_PREFIX + '/' + topic
result = client.publish(topic_expanded, msg)
status = result[0]
if status == 0:
logging.info(f"Sent {msg} to topic {topic_expanded}")
else:
logging.error(f"Failed to send message to topic {topic_expanded}")
def main():
while True:
logging.info('Fetching latest IONEX data')
ionex_data = fetch_latest_ionex(CDDIS_USERNAME, CDDIS_PASSWORD)
tec_data = []
for tecmap, epoch in get_tecmaps(ionex_data):
avg_tec = np.mean(plot_tec_map(tecmap, [float(LON_RANGE_MIN), float(LON_RANGE_MAX)], [float(LAT_RANGE_MIN), float(LAT_RANGE_MAX)]))
tec_data.append(avg_tec)
daily_avg = round(np.mean(tec_data), 1)
publish('vtec', daily_avg)
time.sleep(1800)
if __name__ == '__main__':
main()

7
requirements.txt Normal file
View File

@ -0,0 +1,7 @@
paho-mqtt==1.5.0
chromedriver-autoinstaller==0.6.4
selenium==4.23.1
requests==2.32.3
matplotlib==3.9.2
cartopy==0.23.0
numpy==2.0.1

14
service/vtec.service Normal file
View File

@ -0,0 +1,14 @@
[Unit]
Description=Space Weather VTEC
After=network.target
[Service]
Type=simple
User=homeassistant
EnvironmentFile=/etc/secrets/space-weather
ExecStart=/srv/space-weather/venv/bin/python /srv/space-weather/ha-noaa-space-weather-sensor/main.py
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target