gps buffering, add timeouts, other improvements
This commit is contained in:
parent
7fabaea4f4
commit
18be663658
|
@ -1,6 +1,7 @@
|
|||
ESP32_GENERIC-20240602-v1.23.0.bin
|
||||
config.py
|
||||
.idea
|
||||
test.py
|
||||
|
||||
# ---> Python
|
||||
# Byte-compiled / optimized / DLL files
|
||||
|
|
|
@ -1,2 +1,3 @@
|
|||
https://github.com/jposada202020/MicroPython_ICM20948
|
||||
https://github.com/ekondayan/micropython-ntp
|
||||
https://github.com/ekondayan/micropython-ntp
|
||||
https://github.com/micropython/micropython-lib
|
|
@ -0,0 +1,27 @@
|
|||
import gc
|
||||
|
||||
from config import GNSS_OFFLINE_BUFFER_SIZE
|
||||
|
||||
|
||||
class PositionBuffer:
|
||||
def __init__(self):
|
||||
self.stack = []
|
||||
|
||||
def push(self, item):
|
||||
if len(self.stack) >= GNSS_OFFLINE_BUFFER_SIZE:
|
||||
self.stack.pop(0) # remove the oldest item
|
||||
self.stack.append(item)
|
||||
|
||||
def pop(self):
|
||||
if len(self.stack) < 1:
|
||||
return None
|
||||
try:
|
||||
return self.stack.pop()
|
||||
finally:
|
||||
gc.collect()
|
||||
|
||||
def size(self):
|
||||
return len(self.stack)
|
||||
|
||||
|
||||
position_buffer = PositionBuffer()
|
|
@ -0,0 +1,17 @@
|
|||
class Position:
|
||||
def __init__(self, valid: bool, latitude: tuple, longitude: tuple, altitude: float, speed: float, satellites_in_use: int, hdop: float, timestamp: int, timedata: tuple, course: float):
|
||||
self.valid = valid
|
||||
self.altitude = altitude
|
||||
self.speed = speed
|
||||
self.satellites_in_use = satellites_in_use
|
||||
self.hdop = hdop
|
||||
self.timestamp = timestamp
|
||||
self.timedata = timedata # real GPS timestamp in the format (timestamp, date)
|
||||
self.course = course
|
||||
|
||||
self.latitude = latitude[0]
|
||||
if latitude[1] == 'S':
|
||||
self.latitude = -latitude[0]
|
||||
self.longitude = longitude[0]
|
||||
if longitude[1] == 'W':
|
||||
self.longitude = -longitude[0]
|
|
@ -1,10 +1,13 @@
|
|||
from time import sleep
|
||||
import asyncio
|
||||
|
||||
from machine import Pin, UART
|
||||
|
||||
from config import TIMEOUT_GNSS_LOCATION, GNSS_LOCATION_RETRY
|
||||
from lib.gps.micropyGPS import MicropyGPS
|
||||
from lib.gps.position import Position
|
||||
from lib.interval_tracker import interval_tracker
|
||||
from lib.logging import logger
|
||||
from lib.logging import logger, LogLevel
|
||||
from lib.runtime import timeout, uTimeoutError
|
||||
from lib.ttime import initialize_rtc, unix_timestamp
|
||||
|
||||
PIN_GPS_POWER = 12
|
||||
|
@ -18,18 +21,23 @@ gps_power_pin = Pin(PIN_GPS_POWER, Pin.OUT)
|
|||
gps_power_pin.value(0) # Turn off GPS power initially
|
||||
|
||||
|
||||
def read_gps_uart(want: str):
|
||||
# TODO: add timeout
|
||||
async def read_gps_uart(want: str):
|
||||
m = MicropyGPS(location_formatting='dd')
|
||||
buffer = b''
|
||||
decoded = ''
|
||||
sreader = asyncio.StreamReader(uart)
|
||||
|
||||
while True:
|
||||
if uart.any():
|
||||
c = uart.read(1)
|
||||
c = await sreader.read(1)
|
||||
if c == b'\r':
|
||||
continue
|
||||
if c == b'\n':
|
||||
decoded = buffer.decode()
|
||||
try:
|
||||
decoded = buffer.decode()
|
||||
except UnicodeError:
|
||||
# We see sporadic decode errors for some reason, probably on strings we don't care about.
|
||||
continue
|
||||
buffer = b''
|
||||
else:
|
||||
buffer += c
|
||||
|
@ -39,34 +47,30 @@ def read_gps_uart(want: str):
|
|||
if m.latitude[0] > 0 and m.longitude[0] > 0:
|
||||
return m
|
||||
decoded = ''
|
||||
sleep(0.05)
|
||||
await asyncio.sleep(0.05)
|
||||
|
||||
|
||||
class Position:
|
||||
def __init__(self, valid: bool, latitude: tuple, longitude: tuple, altitude: float, speed: float, satellites_in_use: int, hdop: float, timestamp: int, timedata: tuple, course: float):
|
||||
self.valid = valid
|
||||
self.altitude = altitude
|
||||
self.speed = speed
|
||||
self.satellites_in_use = satellites_in_use
|
||||
self.hdop = hdop
|
||||
self.timestamp = timestamp
|
||||
self.timedata = timedata # real GPS timestamp in the format (timestamp, date)
|
||||
self.course = course
|
||||
|
||||
self.latitude = latitude[0]
|
||||
if latitude[1] == 'S':
|
||||
self.latitude = -latitude[0]
|
||||
self.longitude = longitude[0]
|
||||
if longitude[1] == 'W':
|
||||
self.longitude = -longitude[0]
|
||||
|
||||
|
||||
def get_position():
|
||||
async def get_position():
|
||||
gps_power_pin.value(1) # Always make sure the GPS is on.
|
||||
timestamp = unix_timestamp()
|
||||
gnrmc = read_gps_uart('$GNRMC')
|
||||
gngga = read_gps_uart('$GNGGA')
|
||||
p = Position(gnrmc.valid, gnrmc.latitude, gnrmc.longitude, gnrmc.altitude, gnrmc.speed[0], gngga.satellites_in_use, gngga.hdop, timestamp, (gnrmc.timestamp, gnrmc.date), gnrmc.course)
|
||||
|
||||
@timeout(TIMEOUT_GNSS_LOCATION)
|
||||
async def get_loc():
|
||||
timestamp = unix_timestamp()
|
||||
gnrmc = await read_gps_uart('$GNRMC')
|
||||
gngga = await read_gps_uart('$GNGGA')
|
||||
return Position(gnrmc.valid, gnrmc.latitude, gnrmc.longitude, gnrmc.altitude, gnrmc.speed[0], gngga.satellites_in_use, gngga.hdop, timestamp, (gnrmc.timestamp, gnrmc.date), gnrmc.course)
|
||||
|
||||
p = Position(valid=False, latitude=(0.0, 'N'), longitude=(0.0, 'W'), altitude=0, speed=0, satellites_in_use=0, hdop=0, timestamp=unix_timestamp(), timedata=None, course=0)
|
||||
for i in range(GNSS_LOCATION_RETRY):
|
||||
try:
|
||||
p = await get_loc()
|
||||
break
|
||||
except uTimeoutError:
|
||||
logger(f'Failed to determine location within GNSS timeout ({TIMEOUT_GNSS_LOCATION}). Attempt {i + 1}/{GNSS_LOCATION_RETRY}. Restarting GNSS', source='GPS', level=LogLevel.warning)
|
||||
await asyncio.sleep(3)
|
||||
gps_power_pin.value(0)
|
||||
await asyncio.sleep(1)
|
||||
gps_power_pin.value(1)
|
||||
|
||||
# Set the clock if it's time.
|
||||
if interval_tracker.check('ntp_sync'):
|
||||
|
|
|
@ -1,3 +1,6 @@
|
|||
import time
|
||||
|
||||
|
||||
class LogLevel:
|
||||
debug = 'DEBUG'
|
||||
info = 'INFO'
|
||||
|
@ -9,4 +12,4 @@ def logger(msg: str, level: LogLevel = LogLevel.info, source: str = None):
|
|||
s = ''
|
||||
if source:
|
||||
s = f'[{source}] - '
|
||||
print(f'{s}{level} -- {msg}')
|
||||
print(f'{s}{level} -- {time.ticks_ms() / 1000} -- {msg}')
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
from lib.networking.wifi import wifi
|
||||
|
||||
|
||||
class ConnectionTypes:
|
||||
wifi = 'Wifi'
|
||||
cell = 'Cellular'
|
||||
offline = 'Offline'
|
||||
|
||||
|
||||
def select_connection_type():
|
||||
if wifi.is_connected():
|
||||
return ConnectionTypes.wifi
|
||||
# if cellular.is_connected():
|
||||
else:
|
||||
return ConnectionTypes.offline
|
|
@ -1,49 +1,79 @@
|
|||
import time
|
||||
import asyncio
|
||||
|
||||
import network
|
||||
|
||||
from config import WIFI_SSID, WIFI_PASSWORD, WIFI_CONNECT_TIMEOUT
|
||||
from lib.logging import logger
|
||||
from config import WIFI_SSID, WIFI_PASSWORD, TIMEOUT_WIFI_CONNECT
|
||||
from lib.logging import logger, LogLevel
|
||||
from lib.runtime import timeout
|
||||
|
||||
|
||||
class WifiStatus:
|
||||
IDLE = network.STAT_IDLE
|
||||
CONNECTING = network.STAT_CONNECTING
|
||||
BAD_PASS = network.STAT_WRONG_PASSWORD
|
||||
AP_NOT_FOUND = network.STAT_NO_AP_FOUND
|
||||
CONNECTED = network.STAT_GOT_IP
|
||||
|
||||
def __init__(self, status):
|
||||
self.status = status
|
||||
|
||||
@classmethod
|
||||
def from_status_code(cls, status_code):
|
||||
# Pass in a status code and convert it to the object.
|
||||
for attr in dir(cls):
|
||||
if getattr(cls, attr) == status_code:
|
||||
return cls(attr)
|
||||
return None
|
||||
|
||||
def __str__(self):
|
||||
return self.status
|
||||
|
||||
|
||||
class WifiMananger:
|
||||
_wlan = None
|
||||
_wlan = network.WLAN(network.STA_IF)
|
||||
|
||||
def activate(self):
|
||||
if self._wlan is not None:
|
||||
raise Exception("Already activated")
|
||||
self._wlan = network.WLAN(network.STA_IF)
|
||||
self._wlan.active(True)
|
||||
def active(self, active: bool):
|
||||
self._wlan.active(active)
|
||||
|
||||
def disconnect(self):
|
||||
self._wlan.disconnect()
|
||||
|
||||
def connect(self):
|
||||
if self._wlan.isconnected():
|
||||
@timeout(TIMEOUT_WIFI_CONNECT)
|
||||
async def connect(self, ignore_missing: bool = False):
|
||||
if self._wlan.isconnected() or self.status() == WifiStatus.CONNECTED:
|
||||
raise Exception("Already connected")
|
||||
if self.status() == WifiStatus.CONNECTING:
|
||||
logger(f'Already attempting connection, device power cycle may be required to reset driver', source='WIFI')
|
||||
return False
|
||||
|
||||
logger(f'Scanning', source='WIFI')
|
||||
heard = self.scan()
|
||||
found = False
|
||||
for item in self.scan():
|
||||
for item in heard:
|
||||
if item[0].decode() == WIFI_SSID:
|
||||
found = True
|
||||
if not found:
|
||||
logger(f'SSID not found: "{WIFI_SSID}"', source='WIFI')
|
||||
if not ignore_missing:
|
||||
logger(f'SSID not found: "{WIFI_SSID}". Heard {len(heard)} other SSIDs.', source='WIFI', level=LogLevel.warning)
|
||||
return False
|
||||
if self.status() == WifiStatus.AP_NOT_FOUND:
|
||||
logger(f'SSID not found: "{WIFI_SSID}". Heard {len(heard)} other SSIDs.', source='WIFI', level=LogLevel.warning)
|
||||
return False
|
||||
|
||||
logger(f'Connecting to "{WIFI_SSID}"', source='WIFI')
|
||||
self._wlan.connect(WIFI_SSID, WIFI_PASSWORD)
|
||||
for _ in range(WIFI_CONNECT_TIMEOUT):
|
||||
time.sleep(1)
|
||||
for _ in range(TIMEOUT_WIFI_CONNECT):
|
||||
await asyncio.sleep(1)
|
||||
if wifi.address() != '0.0.0.0':
|
||||
break
|
||||
time.sleep(1)
|
||||
await asyncio.sleep(1)
|
||||
if wifi.address() == '0.0.0.0':
|
||||
logger(f'Failed to connect to "{WIFI_SSID}"', source='WIFI')
|
||||
logger(f'Failed to connect to "{WIFI_SSID}". Status: {self.status()}', source='WIFI', level=LogLevel.warning)
|
||||
return False
|
||||
logger(f'Connected to "{wifi.config("ssid")}" with IP {wifi.address()}', source='WIFI')
|
||||
return True
|
||||
|
||||
def isconnected(self):
|
||||
def is_connected(self):
|
||||
return self._wlan.isconnected()
|
||||
|
||||
def address(self):
|
||||
|
@ -54,9 +84,7 @@ class WifiMananger:
|
|||
return self._wlan.ifconfig()
|
||||
|
||||
def signal_strength(self):
|
||||
for item in self.scan():
|
||||
if item[0].decode() == WIFI_SSID:
|
||||
return item[3]
|
||||
return self._wlan.status('rssi')
|
||||
|
||||
def config(self, value: str):
|
||||
return self._wlan.config(value)
|
||||
|
@ -68,5 +96,12 @@ class WifiMananger:
|
|||
m = self._wlan.config('mac')
|
||||
return ':'.join('%02x' % b for b in m)
|
||||
|
||||
def status(self):
|
||||
c = self._wlan.status()
|
||||
s = WifiStatus.from_status_code(c)
|
||||
if s is None:
|
||||
raise Exception(f'Status code not matched: {c}')
|
||||
return s
|
||||
|
||||
|
||||
wifi = WifiMananger()
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
import asyncio
|
||||
|
||||
|
||||
class uTimeoutError(Exception):
|
||||
def __init__(self, message="Timeout occurred"):
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
def timeout(seconds):
|
||||
"""
|
||||
This will not work if your decorated function uses `time.sleep()`!!! Use `asyncio.sleep()` instead.
|
||||
"""
|
||||
|
||||
def decorator(func):
|
||||
async def wrapper(*args, **kwargs):
|
||||
try:
|
||||
return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds)
|
||||
except asyncio.TimeoutError:
|
||||
raise uTimeoutError(f'Function {func.__name__}() timed out after {seconds} seconds')
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
def reboot():
|
||||
import machine
|
||||
machine.reset()
|
|
@ -40,7 +40,7 @@ class TraccarGetRequest:
|
|||
"""
|
||||
|
||||
def __init__(self, timestamp: int, lat: float, lon: float, loc_valid: bool = True,
|
||||
cell: CellularInfo = None, wifi: WifiInfo = None, speed: int = None, heading: int = None, altitude: int = None,
|
||||
cell: CellularInfo = None, wifi: WifiInfo = None, speed: float = None, heading: float = None, altitude: float = None,
|
||||
accuracy: int = None, hdop: float = None, custom: dict = None):
|
||||
if not isinstance(timestamp, (int, float)):
|
||||
raise ValueError(f"timestamp must be an integer, not {type(timestamp)}")
|
||||
|
@ -55,11 +55,11 @@ class TraccarGetRequest:
|
|||
if wifi is not None and not isinstance(wifi, WifiInfo):
|
||||
raise ValueError(f"wifi must be an instance of WifiInfo, not {type(wifi)}")
|
||||
if speed is not None and not isinstance(speed, (int, float)):
|
||||
raise ValueError(f"speed must be an integer, not {type(speed)}")
|
||||
raise ValueError(f"speed must be a float, not {type(speed)}")
|
||||
if heading is not None and not isinstance(heading, (int, float)):
|
||||
raise ValueError(f"heading must be an integer, not {type(heading)}")
|
||||
raise ValueError(f"heading must be a float, not {type(heading)}")
|
||||
if altitude is not None and not isinstance(altitude, (int, float)):
|
||||
raise ValueError(f"altitude must be an integer, not {type(altitude)}")
|
||||
raise ValueError(f"altitude must be a float, not {type(altitude)}")
|
||||
if accuracy is not None and not isinstance(accuracy, (int, float)):
|
||||
raise ValueError(f"accuracy must be an integer, not {type(accuracy)}")
|
||||
if hdop is not None and not isinstance(hdop, float):
|
||||
|
@ -72,8 +72,13 @@ class TraccarGetRequest:
|
|||
self.lat = float(lat)
|
||||
self.lon = float(lon)
|
||||
self.loc_valid = loc_valid
|
||||
|
||||
if wifi:
|
||||
self.wifi = f'{wifi.mac_addr},{wifi.signal_strength}'
|
||||
else:
|
||||
self.wifi = None
|
||||
self.cell = cell # TODO: serialize
|
||||
self.wifi = f'{wifi.mac_addr},{wifi.signal_strength}'
|
||||
|
||||
self.speed = type_or_none(speed, int)
|
||||
self.heading = type_or_none(heading, int)
|
||||
self.altitude = type_or_none(altitude, int)
|
||||
|
@ -94,7 +99,11 @@ class TraccarGetRequest:
|
|||
|
||||
parameters = []
|
||||
stuff = self.__dict__.copy()
|
||||
|
||||
if self.wifi is None:
|
||||
stuff.pop('wifi')
|
||||
stuff.pop('custom')
|
||||
|
||||
for k, v in stuff.items():
|
||||
append_item(k, v)
|
||||
if self.custom is not None:
|
||||
|
|
|
@ -2,47 +2,61 @@ import gc
|
|||
|
||||
import urequests as requests
|
||||
|
||||
from config import GNSS_OFFLINE_BUFFER_SIZE
|
||||
from lib.gps.buffer import position_buffer
|
||||
from lib.gps.position import Position
|
||||
from lib.gps.read import get_position
|
||||
from lib.led import led_on, led_off
|
||||
from lib.logging import LogLevel, logger
|
||||
from lib.networking.select import select_connection_type, ConnectionTypes
|
||||
from lib.networking.wifi import wifi
|
||||
from lib.traccar.request import TraccarGetRequest, WifiInfo
|
||||
|
||||
|
||||
def send_to_traccar(event: TraccarGetRequest):
|
||||
def send_to_traccar(position: Position, wifi_info: WifiInfo, cell_info):
|
||||
led_on()
|
||||
params = ' '.join(['='.join((x, str(y))) for x, y in event.query])
|
||||
# TODO: determine cell or wifi here
|
||||
gc.collect()
|
||||
r = requests.post(event.request_url)
|
||||
if r.status_code != 200:
|
||||
logger(f'{params} - Failed to send request to traccar: "{r.text}" - Status code: {r.status_code}', level=LogLevel.error, source='NET')
|
||||
event = None
|
||||
params = 'None'
|
||||
if select_connection_type() == ConnectionTypes.wifi:
|
||||
event = TraccarGetRequest(
|
||||
timestamp=position.timestamp,
|
||||
lat=position.latitude,
|
||||
lon=position.longitude,
|
||||
loc_valid=position.valid,
|
||||
wifi=wifi_info,
|
||||
speed=position.speed,
|
||||
heading=position.course,
|
||||
altitude=position.altitude,
|
||||
hdop=position.hdop,
|
||||
custom={
|
||||
'satellites': position.satellites_in_use
|
||||
}
|
||||
)
|
||||
params = ' '.join(['='.join((x, str(y))) for x, y in event.query])
|
||||
r = requests.post(event.request_url)
|
||||
if r.status_code != 200:
|
||||
logger(f'{params} - Failed to send request to traccar: "{r.text}" - Status code: {r.status_code}', level=LogLevel.error, source='NET')
|
||||
elif select_connection_type() == ConnectionTypes.cell:
|
||||
print('cellular not implemented')
|
||||
else:
|
||||
position_buffer.push(event)
|
||||
logger(f'Offline buffer: {position_buffer.size()}/{GNSS_OFFLINE_BUFFER_SIZE}', source='STORE')
|
||||
if select_connection_type() != ConnectionTypes.offline:
|
||||
logger(params, source='NET')
|
||||
led_off()
|
||||
gc.collect()
|
||||
|
||||
|
||||
def assemble_position_message():
|
||||
# Get the GPS fix.
|
||||
position = get_position()
|
||||
async def assemble_position_message(position: Position = None):
|
||||
if not position:
|
||||
position = await get_position()
|
||||
|
||||
# Gather connection info
|
||||
conn_info = WifiInfo(mac_addr=wifi.mac_addr(), signal_strength=wifi.signal_strength())
|
||||
wifi_info = None
|
||||
if wifi.is_connected():
|
||||
wifi_info = WifiInfo(mac_addr=wifi.mac_addr(), signal_strength=wifi.signal_strength())
|
||||
|
||||
cell_info = None
|
||||
# cell_info = CellInfo()
|
||||
|
||||
# Startup ping
|
||||
return TraccarGetRequest(
|
||||
timestamp=position.timestamp,
|
||||
lat=position.latitude,
|
||||
lon=position.longitude,
|
||||
loc_valid=position.valid,
|
||||
wifi=conn_info,
|
||||
speed=position.speed,
|
||||
heading=position.course,
|
||||
altitude=position.altitude,
|
||||
hdop=position.hdop,
|
||||
custom={
|
||||
'satellites': position.satellites_in_use
|
||||
}
|
||||
)
|
||||
return position, wifi_info, cell_info
|
||||
|
|
|
@ -8,9 +8,13 @@ _rtc = RTC()
|
|||
Ntp.set_datetime_callback(_rtc.datetime)
|
||||
|
||||
|
||||
def initialize_rtc(first_fix):
|
||||
hour, minute, second = first_fix.timedata[0]
|
||||
day, month, year = first_fix.timedata[1]
|
||||
def initialize_rtc(first_loc):
|
||||
if first_loc.timedata is None:
|
||||
# This can happen when the GPS fix failed.
|
||||
raise Exception('initialize_rtc() got None timedata')
|
||||
|
||||
hour, minute, second = first_loc.timedata[0]
|
||||
day, month, year = first_loc.timedata[1]
|
||||
|
||||
# Convert the GPS date and time to a struct_time
|
||||
gps_time = time.mktime((year + 2000, month, day, hour, minute, int(second), 0, 0))
|
||||
|
|
40
src/main.py
40
src/main.py
|
@ -1,22 +1,38 @@
|
|||
import asyncio
|
||||
import gc
|
||||
import time
|
||||
|
||||
from lib.interval_tracker import interval_tracker
|
||||
from lib.networking.wifi import wifi
|
||||
from lib.ntp import Ntp
|
||||
from lib.runtime import reboot
|
||||
from lib.traccar.send import send_to_traccar, assemble_position_message
|
||||
from startup import startup
|
||||
|
||||
startup()
|
||||
|
||||
while True:
|
||||
gc.collect()
|
||||
if interval_tracker.check('wifi_scan'):
|
||||
if not wifi.isconnected():
|
||||
wifi.connect()
|
||||
if interval_tracker.check('active_position_send'):
|
||||
send_to_traccar(assemble_position_message())
|
||||
if interval_tracker.check('clock_drift_comp'):
|
||||
Ntp.drift_compensate(Ntp.drift_us())
|
||||
gc.collect()
|
||||
async def main():
|
||||
await startup()
|
||||
# TODO: background thread to sync items in the position buffer when we go back online
|
||||
|
||||
while True:
|
||||
gc.collect()
|
||||
if interval_tracker.check('wifi_scan'):
|
||||
if not wifi.is_connected():
|
||||
wifi.connect(ignore_missing=True)
|
||||
if interval_tracker.check('active_position_send'):
|
||||
send_to_traccar(*(await assemble_position_message()))
|
||||
if interval_tracker.check('clock_drift_comp'):
|
||||
Ntp.drift_compensate(Ntp.drift_us())
|
||||
gc.collect()
|
||||
await asyncio.sleep(1)
|
||||
|
||||
|
||||
try:
|
||||
asyncio.run(main())
|
||||
except Exception as e:
|
||||
import sys
|
||||
import time
|
||||
|
||||
sys.print_exception(e)
|
||||
print('\n-- REBOOTING --\n\n')
|
||||
time.sleep(1)
|
||||
reboot()
|
||||
|
|
|
@ -1,2 +1,3 @@
|
|||
import machine
|
||||
machine.reset()
|
||||
from lib.runtime import reboot
|
||||
|
||||
reboot()
|
||||
|
|
|
@ -1,16 +1,25 @@
|
|||
import asyncio
|
||||
import gc
|
||||
import platform
|
||||
import sys
|
||||
|
||||
from config import *
|
||||
from lib.gps.read import gps_power_pin, get_position
|
||||
from lib.led import led_on, led_off
|
||||
from lib.logging import logger
|
||||
from lib.logging import logger, LogLevel
|
||||
from lib.networking.wifi import wifi
|
||||
from lib.runtime import uTimeoutError
|
||||
from lib.traccar.send import send_to_traccar, assemble_position_message
|
||||
from lib.ttime import unix_timestamp, initialize_rtc
|
||||
|
||||
|
||||
def startup():
|
||||
async def startup():
|
||||
led_on()
|
||||
print('Freematics Micropython Edition')
|
||||
print('https://git.evulid.cc/cyberes/freematics-firmware_v5-micropython')
|
||||
print(sys.platform)
|
||||
print(platform.platform())
|
||||
print('Memory size:', gc.mem_free() + gc.mem_alloc())
|
||||
|
||||
# Device info
|
||||
print('==========')
|
||||
|
@ -22,29 +31,32 @@ def startup():
|
|||
# Start the GPS and let it get itself sorted out while we do other things.
|
||||
gps_power_pin.value(1)
|
||||
|
||||
# Activate wifi but don't connect yet.
|
||||
wifi.activate()
|
||||
if wifi.isconnected():
|
||||
# Connect to wifi
|
||||
wifi.active(False)
|
||||
await asyncio.sleep(0.5)
|
||||
wifi.active(True)
|
||||
if wifi.is_connected():
|
||||
wifi.disconnect()
|
||||
logger('Disconnected from existing network', source='WIFI')
|
||||
wifi.connect()
|
||||
try:
|
||||
await wifi.connect()
|
||||
except uTimeoutError:
|
||||
logger(f'Failed to connect to "{WIFI_SSID}". Status: TIMEOUT', source='WIFI', level=LogLevel.error)
|
||||
|
||||
# GPS
|
||||
logger('Getting initial fix', source='GPS')
|
||||
position = get_position()
|
||||
logger('Getting initial location', source='GPS')
|
||||
position = await get_position()
|
||||
|
||||
# Time.
|
||||
# Time
|
||||
initialize_rtc(position)
|
||||
logger(f'Current time: {unix_timestamp()}')
|
||||
|
||||
# We are fully initalized so we can turn off the LED.
|
||||
led_off()
|
||||
|
||||
logger('Startup complete!')
|
||||
print('====================')
|
||||
|
||||
# Send the first message
|
||||
send_to_traccar(assemble_position_message())
|
||||
send_to_traccar(*(await assemble_position_message(position)))
|
||||
|
||||
|
||||
def validate_config():
|
||||
|
|
11
test.py
11
test.py
|
@ -1,11 +0,0 @@
|
|||
import time
|
||||
|
||||
|
||||
def gps_to_unix(gps_time, gps_date):
|
||||
hour, minute, second = gps_time
|
||||
day, month, year = gps_date
|
||||
gps_datetime = time.mktime((year - 12, month + 6, day - 12, hour, minute, int(second), 0, 0))
|
||||
return gps_datetime
|
||||
|
||||
|
||||
print(gps_to_unix([8, 43, 15.4], (29, 6, 20)))
|
|
@ -2,15 +2,15 @@
|
|||
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
|
||||
|
||||
echo "Erasing files..."
|
||||
# TODO: check that /lib exists first
|
||||
# TODO: read /src and delete files that exist
|
||||
"$SCRIPT_DIR"/venv/bin/ampy --port /dev/ttyUSB0 rmdir /lib
|
||||
|
||||
echo "Uploading..."
|
||||
"$SCRIPT_DIR"/venv/bin/ampy --port /dev/ttyUSB0 put "$SCRIPT_DIR/src/" / || exit
|
||||
"$SCRIPT_DIR"/venv/bin/ampy --port /dev/ttyUSB0 rm /config.sample.py
|
||||
|
||||
echo "Resetting..."
|
||||
"$SCRIPT_DIR"/venv/bin/ampy --port /dev/ttyUSB0 run --no-output "$SCRIPT_DIR/src/reset.py" || exit
|
||||
|
||||
#echo "Listing files..."
|
||||
#"$SCRIPT_DIR"/venv/bin/ampy --port /dev/ttyUSB0 ls
|
||||
# Uploading can take a while so make a sound when it's done
|
||||
# https://unix.stackexchange.com/questions/1974/how-do-i-make-my-pc-speaker-beep/163716#163716
|
||||
( speaker-test -t sine -f 1000 > /dev/null )& pid=$! ; sleep 0.1s ; kill -9 $pid
|
Loading…
Reference in New Issue