get gps working

This commit is contained in:
Cyberes 2024-06-29 04:50:52 -06:00
parent 4aa4654b71
commit 7fabaea4f4
16 changed files with 1114 additions and 84 deletions

View File

@ -13,3 +13,5 @@ WIFI_CONNECT_TIMEOUT = 10
# All in seconds.
INTERVAL_WIFI_SCAN = 10
INTERVAL_NTP_SYNC = 86400
INTERVAL_ACTIVE_POSITION_SEND = 120 # only when active and moving
INTERVAL_CLOCK_DRIFT_COMP = 1800

1
src/lib/consts.py Normal file
View File

@ -0,0 +1 @@
PIN_LED = 4

0
src/lib/gps/__init__.py Normal file
View File

830
src/lib/gps/micropyGPS.py Normal file
View File

@ -0,0 +1,830 @@
"""
# MicropyGPS - a GPS NMEA sentence parser for Micropython/Python 3.X
# Copyright (c) 2017 Michael Calvin McCoy (calvin.mccoy@protonmail.com)
# The MIT License (MIT) - see LICENSE file
"""
# TODO:
# Time Since First Fix
# Distance/Time to Target
# More Helper Functions
# Dynamically limit sentences types to parse
from math import floor, modf
# Import utime or time for fix time handling
try:
# Assume running on MicroPython
import utime
except ImportError:
# Otherwise default to time module for non-embedded implementations
# Should still support millisecond resolution.
import time
class MicropyGPS(object):
"""GPS NMEA Sentence Parser. Creates object that stores all relevant GPS data and statistics.
Parses sentences one character at a time using update(). """
# Max Number of Characters a valid sentence can be (based on GGA sentence)
SENTENCE_LIMIT = 90
__HEMISPHERES = ('N', 'S', 'E', 'W')
__NO_FIX = 1
__FIX_2D = 2
__FIX_3D = 3
__DIRECTIONS = ('N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W',
'WNW', 'NW', 'NNW')
__MONTHS = ('January', 'February', 'March', 'April', 'May',
'June', 'July', 'August', 'September', 'October',
'November', 'December')
def __init__(self, local_offset=0, location_formatting='ddm'):
"""
Setup GPS Object Status Flags, Internal Data Registers, etc
local_offset (int): Timzone Difference to UTC
location_formatting (str): Style For Presenting Longitude/Latitude:
Decimal Degree Minute (ddm) - 40° 26.767 N
Degrees Minutes Seconds (dms) - 40° 26 46 N
Decimal Degrees (dd) - 40.446° N
"""
#####################
# Object Status Flags
self.sentence_active = False
self.active_segment = 0
self.process_crc = False
self.gps_segments = []
self.crc_xor = 0
self.char_count = 0
self.fix_time = 0
#####################
# Sentence Statistics
self.crc_fails = 0
self.clean_sentences = 0
self.parsed_sentences = 0
#####################
# Logging Related
self.log_handle = None
self.log_en = False
#####################
# Data From Sentences
# Time
self.timestamp = [0, 0, 0.0]
self.date = [0, 0, 0]
self.local_offset = local_offset
# Position/Motion
self._latitude = [0, 0.0, 'N']
self._longitude = [0, 0.0, 'W']
self.coord_format = location_formatting
self.speed = [0.0, 0.0, 0.0]
self.course = 0.0
self.altitude = 0.0
self.geoid_height = 0.0
# GPS Info
self.satellites_in_view = 0
self.satellites_in_use = 0
self.satellites_used = []
self.last_sv_sentence = 0
self.total_sv_sentences = 0
self.satellite_data = dict()
self.hdop = 0.0
self.pdop = 0.0
self.vdop = 0.0
self.valid = False
self.fix_stat = 0
self.fix_type = 1
########################################
# Coordinates Translation Functions
########################################
@property
def latitude(self):
"""Format Latitude Data Correctly"""
if self.coord_format == 'dd':
decimal_degrees = self._latitude[0] + (self._latitude[1] / 60)
return [decimal_degrees, self._latitude[2]]
elif self.coord_format == 'dms':
minute_parts = modf(self._latitude[1])
seconds = round(minute_parts[0] * 60)
return [self._latitude[0], int(minute_parts[1]), seconds, self._latitude[2]]
else:
return self._latitude
@property
def longitude(self):
"""Format Longitude Data Correctly"""
if self.coord_format == 'dd':
decimal_degrees = self._longitude[0] + (self._longitude[1] / 60)
return [decimal_degrees, self._longitude[2]]
elif self.coord_format == 'dms':
minute_parts = modf(self._longitude[1])
seconds = round(minute_parts[0] * 60)
return [self._longitude[0], int(minute_parts[1]), seconds, self._longitude[2]]
else:
return self._longitude
########################################
# Logging Related Functions
########################################
def start_logging(self, target_file, mode="append"):
"""
Create GPS data log object
"""
# Set Write Mode Overwrite or Append
mode_code = 'w' if mode == 'new' else 'a'
try:
self.log_handle = open(target_file, mode_code)
except AttributeError:
print("Invalid FileName")
return False
self.log_en = True
return True
def stop_logging(self):
"""
Closes the log file handler and disables further logging
"""
try:
self.log_handle.close()
except AttributeError:
print("Invalid Handle")
return False
self.log_en = False
return True
def write_log(self, log_string):
"""Attempts to write the last valid NMEA sentence character to the active file handler
"""
try:
self.log_handle.write(log_string)
except TypeError:
return False
return True
########################################
# Sentence Parsers
########################################
def gprmc(self):
"""Parse Recommended Minimum Specific GPS/Transit data (RMC)Sentence.
Updates UTC timestamp, latitude, longitude, Course, Speed, Date, and fix status
"""
# UTC Timestamp
try:
utc_string = self.gps_segments[1]
if utc_string: # Possible timestamp found
hours = (int(utc_string[0:2]) + self.local_offset) % 24
minutes = int(utc_string[2:4])
seconds = float(utc_string[4:])
self.timestamp = [hours, minutes, seconds]
else: # No Time stamp yet
self.timestamp = [0, 0, 0.0]
except ValueError: # Bad Timestamp value present
return False
# Date stamp
try:
date_string = self.gps_segments[9]
# Date string printer function assumes to be year >=2000,
# date_string() must be supplied with the correct century argument to display correctly
if date_string: # Possible date stamp found
day = int(date_string[0:2])
month = int(date_string[2:4])
year = int(date_string[4:6])
self.date = (day, month, year)
else: # No Date stamp yet
self.date = (0, 0, 0)
except ValueError: # Bad Date stamp value present
return False
# Check Receiver Data Valid Flag
if self.gps_segments[2] == 'A': # Data from Receiver is Valid/Has Fix
# Longitude / Latitude
try:
# Latitude
l_string = self.gps_segments[3]
lat_degs = int(l_string[0:2])
lat_mins = float(l_string[2:])
lat_hemi = self.gps_segments[4]
# Longitude
l_string = self.gps_segments[5]
lon_degs = int(l_string[0:3])
lon_mins = float(l_string[3:])
lon_hemi = self.gps_segments[6]
except ValueError:
return False
if lat_hemi not in self.__HEMISPHERES:
return False
if lon_hemi not in self.__HEMISPHERES:
return False
# Speed
try:
spd_knt = float(self.gps_segments[7])
except ValueError:
return False
# Course
try:
if self.gps_segments[8]:
course = float(self.gps_segments[8])
else:
course = 0.0
except ValueError:
return False
# TODO - Add Magnetic Variation
# Update Object Data
self._latitude = [lat_degs, lat_mins, lat_hemi]
self._longitude = [lon_degs, lon_mins, lon_hemi]
# Include mph and hm/h
self.speed = [spd_knt, spd_knt * 1.151, spd_knt * 1.852]
self.course = course
self.valid = True
# Update Last Fix Time
self.new_fix_time()
else: # Clear Position Data if Sentence is 'Invalid'
self._latitude = [0, 0.0, 'N']
self._longitude = [0, 0.0, 'W']
self.speed = [0.0, 0.0, 0.0]
self.course = 0.0
self.valid = False
return True
def gpgll(self):
"""Parse Geographic Latitude and Longitude (GLL)Sentence. Updates UTC timestamp, latitude,
longitude, and fix status"""
# UTC Timestamp
try:
utc_string = self.gps_segments[5]
if utc_string: # Possible timestamp found
hours = (int(utc_string[0:2]) + self.local_offset) % 24
minutes = int(utc_string[2:4])
seconds = float(utc_string[4:])
self.timestamp = [hours, minutes, seconds]
else: # No Time stamp yet
self.timestamp = [0, 0, 0.0]
except ValueError: # Bad Timestamp value present
return False
# Check Receiver Data Valid Flag
if self.gps_segments[6] == 'A': # Data from Receiver is Valid/Has Fix
# Longitude / Latitude
try:
# Latitude
l_string = self.gps_segments[1]
lat_degs = int(l_string[0:2])
lat_mins = float(l_string[2:])
lat_hemi = self.gps_segments[2]
# Longitude
l_string = self.gps_segments[3]
lon_degs = int(l_string[0:3])
lon_mins = float(l_string[3:])
lon_hemi = self.gps_segments[4]
except ValueError:
return False
if lat_hemi not in self.__HEMISPHERES:
return False
if lon_hemi not in self.__HEMISPHERES:
return False
# Update Object Data
self._latitude = [lat_degs, lat_mins, lat_hemi]
self._longitude = [lon_degs, lon_mins, lon_hemi]
self.valid = True
# Update Last Fix Time
self.new_fix_time()
else: # Clear Position Data if Sentence is 'Invalid'
self._latitude = [0, 0.0, 'N']
self._longitude = [0, 0.0, 'W']
self.valid = False
return True
def gpvtg(self):
"""Parse Track Made Good and Ground Speed (VTG) Sentence. Updates speed and course"""
try:
course = float(self.gps_segments[1]) if self.gps_segments[1] else 0.0
spd_knt = float(self.gps_segments[5]) if self.gps_segments[5] else 0.0
except ValueError:
return False
# Include mph and km/h
self.speed = (spd_knt, spd_knt * 1.151, spd_knt * 1.852)
self.course = course
return True
def gpgga(self):
"""Parse Global Positioning System Fix Data (GGA) Sentence. Updates UTC timestamp, latitude, longitude,
fix status, satellites in use, Horizontal Dilution of Precision (HDOP), altitude, geoid height and fix status"""
try:
# UTC Timestamp
utc_string = self.gps_segments[1]
# Skip timestamp if receiver doesn't have on yet
if utc_string:
hours = (int(utc_string[0:2]) + self.local_offset) % 24
minutes = int(utc_string[2:4])
seconds = float(utc_string[4:])
else:
hours = 0
minutes = 0
seconds = 0.0
# Number of Satellites in Use
satellites_in_use = int(self.gps_segments[7])
# Get Fix Status
fix_stat = int(self.gps_segments[6])
except (ValueError, IndexError):
return False
try:
# Horizontal Dilution of Precision
hdop = float(self.gps_segments[8])
except (ValueError, IndexError):
hdop = 0.0
# Process Location and Speed Data if Fix is GOOD
if fix_stat:
# Longitude / Latitude
try:
# Latitude
l_string = self.gps_segments[2]
lat_degs = int(l_string[0:2])
lat_mins = float(l_string[2:])
lat_hemi = self.gps_segments[3]
# Longitude
l_string = self.gps_segments[4]
lon_degs = int(l_string[0:3])
lon_mins = float(l_string[3:])
lon_hemi = self.gps_segments[5]
except ValueError:
return False
if lat_hemi not in self.__HEMISPHERES:
return False
if lon_hemi not in self.__HEMISPHERES:
return False
# Altitude / Height Above Geoid
try:
altitude = float(self.gps_segments[9])
geoid_height = float(self.gps_segments[11])
except ValueError:
altitude = 0
geoid_height = 0
# Update Object Data
self._latitude = [lat_degs, lat_mins, lat_hemi]
self._longitude = [lon_degs, lon_mins, lon_hemi]
self.altitude = altitude
self.geoid_height = geoid_height
# Update Object Data
self.timestamp = [hours, minutes, seconds]
self.satellites_in_use = satellites_in_use
self.hdop = hdop
self.fix_stat = fix_stat
# If Fix is GOOD, update fix timestamp
if fix_stat:
self.new_fix_time()
return True
def gpgsa(self):
"""Parse GNSS DOP and Active Satellites (GSA) sentence. Updates GPS fix type, list of satellites used in
fix calculation, Position Dilution of Precision (PDOP), Horizontal Dilution of Precision (HDOP), Vertical
Dilution of Precision, and fix status"""
# Fix Type (None,2D or 3D)
try:
fix_type = int(self.gps_segments[2])
except ValueError:
return False
# Read All (up to 12) Available PRN Satellite Numbers
sats_used = []
for sats in range(12):
sat_number_str = self.gps_segments[3 + sats]
if sat_number_str:
try:
sat_number = int(sat_number_str)
sats_used.append(sat_number)
except ValueError:
return False
else:
break
# PDOP,HDOP,VDOP
try:
pdop = float(self.gps_segments[15])
hdop = float(self.gps_segments[16])
vdop = float(self.gps_segments[17])
except ValueError:
return False
# Update Object Data
self.fix_type = fix_type
# If Fix is GOOD, update fix timestamp
if fix_type > self.__NO_FIX:
self.new_fix_time()
self.satellites_used = sats_used
self.hdop = hdop
self.vdop = vdop
self.pdop = pdop
return True
def gpgsv(self):
"""Parse Satellites in View (GSV) sentence. Updates number of SV Sentences,the number of the last SV sentence
parsed, and data on each satellite present in the sentence"""
try:
num_sv_sentences = int(self.gps_segments[1])
current_sv_sentence = int(self.gps_segments[2])
sats_in_view = int(self.gps_segments[3])
except ValueError:
return False
# Create a blank dict to store all the satellite data from this sentence in:
# satellite PRN is key, tuple containing telemetry is value
satellite_dict = dict()
# Calculate Number of Satelites to pull data for and thus how many segment positions to read
if num_sv_sentences == current_sv_sentence:
# Last sentence may have 1-4 satellites; 5 - 20 positions
sat_segment_limit = (sats_in_view - ((num_sv_sentences - 1) * 4)) * 5
else:
sat_segment_limit = 20 # Non-last sentences have 4 satellites and thus read up to position 20
# Try to recover data for up to 4 satellites in sentence
for sats in range(4, sat_segment_limit, 4):
# If a PRN is present, grab satellite data
if self.gps_segments[sats]:
try:
sat_id = int(self.gps_segments[sats])
except (ValueError,IndexError):
return False
try: # elevation can be null (no value) when not tracking
elevation = int(self.gps_segments[sats+1])
except (ValueError,IndexError):
elevation = None
try: # azimuth can be null (no value) when not tracking
azimuth = int(self.gps_segments[sats+2])
except (ValueError,IndexError):
azimuth = None
try: # SNR can be null (no value) when not tracking
snr = int(self.gps_segments[sats+3])
except (ValueError,IndexError):
snr = None
# If no PRN is found, then the sentence has no more satellites to read
else:
break
# Add Satellite Data to Sentence Dict
satellite_dict[sat_id] = (elevation, azimuth, snr)
# Update Object Data
self.total_sv_sentences = num_sv_sentences
self.last_sv_sentence = current_sv_sentence
self.satellites_in_view = sats_in_view
# For a new set of sentences, we either clear out the existing sat data or
# update it as additional SV sentences are parsed
if current_sv_sentence == 1:
self.satellite_data = satellite_dict
else:
self.satellite_data.update(satellite_dict)
return True
##########################################
# Data Stream Handler Functions
##########################################
def new_sentence(self):
"""Adjust Object Flags in Preparation for a New Sentence"""
self.gps_segments = ['']
self.active_segment = 0
self.crc_xor = 0
self.sentence_active = True
self.process_crc = True
self.char_count = 0
def update(self, new_char):
"""Process a new input char and updates GPS object if necessary based on special characters ('$', ',', '*')
Function builds a list of received string that are validate by CRC prior to parsing by the appropriate
sentence function. Returns sentence type on successful parse, None otherwise"""
valid_sentence = False
# Validate new_char is a printable char
ascii_char = ord(new_char)
if 10 <= ascii_char <= 126:
self.char_count += 1
# Write Character to log file if enabled
if self.log_en:
self.write_log(new_char)
# Check if a new string is starting ($)
if new_char == '$':
self.new_sentence()
return None
elif self.sentence_active:
# Check if sentence is ending (*)
if new_char == '*':
self.process_crc = False
self.active_segment += 1
self.gps_segments.append('')
return None
# Check if a section is ended (,), Create a new substring to feed
# characters to
elif new_char == ',':
self.active_segment += 1
self.gps_segments.append('')
# Store All Other printable character and check CRC when ready
else:
self.gps_segments[self.active_segment] += new_char
# When CRC input is disabled, sentence is nearly complete
if not self.process_crc:
if len(self.gps_segments[self.active_segment]) == 2:
try:
final_crc = int(self.gps_segments[self.active_segment], 16)
if self.crc_xor == final_crc:
valid_sentence = True
else:
self.crc_fails += 1
except ValueError:
pass # CRC Value was deformed and could not have been correct
# Update CRC
if self.process_crc:
self.crc_xor ^= ascii_char
# If a Valid Sentence Was received and it's a supported sentence, then parse it!!
if valid_sentence:
self.clean_sentences += 1 # Increment clean sentences received
self.sentence_active = False # Clear Active Processing Flag
if self.gps_segments[0] in self.supported_sentences:
# parse the Sentence Based on the message type, return True if parse is clean
if self.supported_sentences[self.gps_segments[0]](self):
# Let host know that the GPS object was updated by returning parsed sentence type
self.parsed_sentences += 1
return self.gps_segments[0]
# Check that the sentence buffer isn't filling up with Garage waiting for the sentence to complete
if self.char_count > self.SENTENCE_LIMIT:
self.sentence_active = False
# Tell Host no new sentence was parsed
return None
def new_fix_time(self):
"""Updates a high resolution counter with current time when fix is updated. Currently only triggered from
GGA, GSA and RMC sentences"""
try:
self.fix_time = utime.ticks_ms()
except NameError:
self.fix_time = time.time()
#########################################
# User Helper Functions
# These functions make working with the GPS object data easier
#########################################
def satellite_data_updated(self):
"""
Checks if the all the GSV sentences in a group have been read, making satellite data complete
:return: boolean
"""
if self.total_sv_sentences > 0 and self.total_sv_sentences == self.last_sv_sentence:
return True
else:
return False
def unset_satellite_data_updated(self):
"""
Mark GSV sentences as read indicating the data has been used and future updates are fresh
"""
self.last_sv_sentence = 0
def satellites_visible(self):
"""
Returns a list of of the satellite PRNs currently visible to the receiver
:return: list
"""
return list(self.satellite_data.keys())
def time_since_fix(self):
"""Returns number of millisecond since the last sentence with a valid fix was parsed. Returns 0 if
no fix has been found"""
# Test if a Fix has been found
if self.fix_time == 0:
return -1
# Try calculating fix time using utime; if not running MicroPython
# time.time() returns a floating point value in secs
try:
current = utime.ticks_diff(utime.ticks_ms(), self.fix_time)
except NameError:
current = (time.time() - self.fix_time) * 1000 # ms
return current
def compass_direction(self):
"""
Determine a cardinal or inter-cardinal direction based on current course.
:return: string
"""
# Calculate the offset for a rotated compass
if self.course >= 348.75:
offset_course = 360 - self.course
else:
offset_course = self.course + 11.25
# Each compass point is separated by 22.5 degrees, divide to find lookup value
dir_index = floor(offset_course / 22.5)
final_dir = self.__DIRECTIONS[dir_index]
return final_dir
def latitude_string(self):
"""
Create a readable string of the current latitude data
:return: string
"""
if self.coord_format == 'dd':
formatted_latitude = self.latitude
lat_string = str(formatted_latitude[0]) + '° ' + str(self._latitude[2])
elif self.coord_format == 'dms':
formatted_latitude = self.latitude
lat_string = str(formatted_latitude[0]) + '° ' + str(formatted_latitude[1]) + "' " + str(formatted_latitude[2]) + '" ' + str(formatted_latitude[3])
else:
lat_string = str(self._latitude[0]) + '° ' + str(self._latitude[1]) + "' " + str(self._latitude[2])
return lat_string
def longitude_string(self):
"""
Create a readable string of the current longitude data
:return: string
"""
if self.coord_format == 'dd':
formatted_longitude = self.longitude
lon_string = str(formatted_longitude[0]) + '° ' + str(self._longitude[2])
elif self.coord_format == 'dms':
formatted_longitude = self.longitude
lon_string = str(formatted_longitude[0]) + '° ' + str(formatted_longitude[1]) + "' " + str(formatted_longitude[2]) + '" ' + str(formatted_longitude[3])
else:
lon_string = str(self._longitude[0]) + '° ' + str(self._longitude[1]) + "' " + str(self._longitude[2])
return lon_string
def speed_string(self, unit='kph'):
"""
Creates a readable string of the current speed data in one of three units
:param unit: string of 'kph','mph, or 'knot'
:return:
"""
if unit == 'mph':
speed_string = str(self.speed[1]) + ' mph'
elif unit == 'knot':
if self.speed[0] == 1:
unit_str = ' knot'
else:
unit_str = ' knots'
speed_string = str(self.speed[0]) + unit_str
else:
speed_string = str(self.speed[2]) + ' km/h'
return speed_string
def date_string(self, formatting='s_mdy', century='20'):
"""
Creates a readable string of the current date.
Can select between long format: Januray 1st, 2014
or two short formats:
11/01/2014 (MM/DD/YYYY)
01/11/2014 (DD/MM/YYYY)
:param formatting: string 's_mdy', 's_dmy', or 'long'
:param century: int delineating the century the GPS data is from (19 for 19XX, 20 for 20XX)
:return: date_string string with long or short format date
"""
# Long Format Januray 1st, 2014
if formatting == 'long':
# Retrieve Month string from private set
month = self.__MONTHS[self.date[1] - 1]
# Determine Date Suffix
if self.date[0] in (1, 21, 31):
suffix = 'st'
elif self.date[0] in (2, 22):
suffix = 'nd'
elif self.date[0] == (3, 23):
suffix = 'rd'
else:
suffix = 'th'
day = str(self.date[0]) + suffix # Create Day String
year = century + str(self.date[2]) # Create Year String
date_string = month + ' ' + day + ', ' + year # Put it all together
else:
# Add leading zeros to day string if necessary
if self.date[0] < 10:
day = '0' + str(self.date[0])
else:
day = str(self.date[0])
# Add leading zeros to month string if necessary
if self.date[1] < 10:
month = '0' + str(self.date[1])
else:
month = str(self.date[1])
# Add leading zeros to year string if necessary
if self.date[2] < 10:
year = '0' + str(self.date[2])
else:
year = str(self.date[2])
# Build final string based on desired formatting
if formatting == 's_dmy':
date_string = day + '/' + month + '/' + year
else: # Default date format
date_string = month + '/' + day + '/' + year
return date_string
# All the currently supported NMEA sentences
supported_sentences = {'GPRMC': gprmc, 'GLRMC': gprmc,
'GPGGA': gpgga, 'GLGGA': gpgga,
'GPVTG': gpvtg, 'GLVTG': gpvtg,
'GPGSA': gpgsa, 'GLGSA': gpgsa,
'GPGSV': gpgsv, 'GLGSV': gpgsv,
'GPGLL': gpgll, 'GLGLL': gpgll,
'GNGGA': gpgga, 'GNRMC': gprmc,
'GNVTG': gpvtg, 'GNGLL': gpgll,
'GNGSA': gpgsa,
}
if __name__ == "__main__":
pass

76
src/lib/gps/read.py Normal file
View File

@ -0,0 +1,76 @@
from time import sleep
from machine import Pin, UART
from lib.gps.micropyGPS import MicropyGPS
from lib.interval_tracker import interval_tracker
from lib.logging import logger
from lib.ttime import initialize_rtc, unix_timestamp
PIN_GPS_POWER = 12
PIN_GPS_UART_RXD = 34
PIN_GPS_UART_TXD = 26
GPS_BAUDRATE = 115200
uart = UART(1, baudrate=GPS_BAUDRATE, rx=PIN_GPS_UART_RXD, tx=PIN_GPS_UART_TXD)
gps_power_pin = Pin(PIN_GPS_POWER, Pin.OUT)
gps_power_pin.value(0) # Turn off GPS power initially
def read_gps_uart(want: str):
# TODO: add timeout
m = MicropyGPS(location_formatting='dd')
buffer = b''
decoded = ''
while True:
if uart.any():
c = uart.read(1)
if c == b'\r':
continue
if c == b'\n':
decoded = buffer.decode()
buffer = b''
else:
buffer += c
if decoded.startswith(want):
for x in decoded:
m.update(x)
if m.latitude[0] > 0 and m.longitude[0] > 0:
return m
decoded = ''
sleep(0.05)
class Position:
def __init__(self, valid: bool, latitude: tuple, longitude: tuple, altitude: float, speed: float, satellites_in_use: int, hdop: float, timestamp: int, timedata: tuple, course: float):
self.valid = valid
self.altitude = altitude
self.speed = speed
self.satellites_in_use = satellites_in_use
self.hdop = hdop
self.timestamp = timestamp
self.timedata = timedata # real GPS timestamp in the format (timestamp, date)
self.course = course
self.latitude = latitude[0]
if latitude[1] == 'S':
self.latitude = -latitude[0]
self.longitude = longitude[0]
if longitude[1] == 'W':
self.longitude = -longitude[0]
def get_position():
gps_power_pin.value(1) # Always make sure the GPS is on.
timestamp = unix_timestamp()
gnrmc = read_gps_uart('$GNRMC')
gngga = read_gps_uart('$GNGGA')
p = Position(gnrmc.valid, gnrmc.latitude, gnrmc.longitude, gnrmc.altitude, gnrmc.speed[0], gngga.satellites_in_use, gngga.hdop, timestamp, (gnrmc.timestamp, gnrmc.date), gnrmc.course)
# Set the clock if it's time.
if interval_tracker.check('ntp_sync'):
initialize_rtc(p)
logger('Updated time', source='GPS')
return p

View File

@ -1,4 +1,4 @@
def int_or_none(value):
def type_or_none(value, type_cls):
if value is None:
return None
return int(value)
return type_cls(value)

View File

@ -18,5 +18,7 @@ class IntervalTracker:
interval_tracker = IntervalTracker(
wifi_scan=INTERVAL_WIFI_SCAN,
ntp_sync=INTERVAL_NTP_SYNC
ntp_sync=INTERVAL_NTP_SYNC,
active_position_send=INTERVAL_ACTIVE_POSITION_SEND,
clock_drift_comp=INTERVAL_CLOCK_DRIFT_COMP
)

14
src/lib/led.py Normal file
View File

@ -0,0 +1,14 @@
from machine import Pin
from lib.consts import PIN_LED
led = Pin(PIN_LED, Pin.OUT)
def led_on():
led_off()
led.value(1)
def led_off():
led.value(0)

View File

@ -10,13 +10,17 @@ class WifiMananger:
_wlan = None
def activate(self):
assert self._wlan is None
if self._wlan is not None:
raise Exception("Already activated")
self._wlan = network.WLAN(network.STA_IF)
self._wlan.active(True)
def connect(self):
assert not self._wlan.isconnected()
def disconnect(self):
self._wlan.disconnect()
def connect(self):
if self._wlan.isconnected():
raise Exception("Already connected")
logger(f'Scanning', source='WIFI')
found = False
for item in self.scan():
@ -49,11 +53,20 @@ class WifiMananger:
def ifconfig(self):
return self._wlan.ifconfig()
def signal_strength(self):
for item in self.scan():
if item[0].decode() == WIFI_SSID:
return item[3]
def config(self, value: str):
return self._wlan.config(value)
def scan(self):
return self._wlan.scan()
def mac_addr(self):
m = self._wlan.config('mac')
return ':'.join('%02x' % b for b in m)
wifi = WifiMananger()

View File

@ -1,20 +1,20 @@
from config import DEVICE_ID, TRACCAR_HOST
from lib.helpers import int_or_none
from lib.ttime import timestamp
from lib.helpers import type_or_none
from lib.ttime import unix_timestamp
class CellTowerInfo:
class CellularInfo:
def __init__(self, mcc: int, mnc: int, lac: int, cell_id: int, signal_strength: int):
if not isinstance(mcc, int):
raise ValueError("mcc must be an integer")
raise ValueError(f"mcc must be an integer, not {type(mcc)}")
if not isinstance(mnc, int):
raise ValueError("mnc must be an integer")
raise ValueError(f"mnc must be an integer, not {type(mnc)}")
if not isinstance(lac, int):
raise ValueError("lac must be an integer")
raise ValueError(f"lac must be an integer, not {type(lac)}")
if not isinstance(cell_id, int):
raise ValueError("cell_id must be an integer")
raise ValueError(f"cell_id must be an integer, not {type(cell_id)}")
if not isinstance(signal_strength, int):
raise ValueError("signal_strength must be an integer")
raise ValueError(f"signal_strength must be an integer, not {type(signal_strength)}")
self.mcc = mcc
self.mnc = mnc
@ -26,56 +26,59 @@ class CellTowerInfo:
class WifiInfo:
def __init__(self, mac_addr: str, signal_strength: int):
if not isinstance(mac_addr, str):
raise ValueError("mac_addr must be a string")
raise ValueError(f"mac_addr must be a string, not {type(mac_addr)}")
if not isinstance(signal_strength, int):
raise ValueError("signal_strength must be an integer")
raise ValueError(f"signal_strength must be an integer, not {type(signal_strength)}")
self.mac_addr = mac_addr
self.signal_strength = signal_strength
class TraccarGetRequest:
# https://www.traccar.org/osmand/
"""
https://www.traccar.org/osmand/
"""
def __init__(self, timestamp: int, lat: float, lon: float, loc_valid: bool = True,
cell: CellTowerInfo = None, wifi: WifiInfo = None, speed: int = None, heading: int = None, altitude: int = None,
accuracy: int = None, hdop: int = None, custom: dict = None):
cell: CellularInfo = None, wifi: WifiInfo = None, speed: int = None, heading: int = None, altitude: int = None,
accuracy: int = None, hdop: float = None, custom: dict = None):
if not isinstance(timestamp, (int, float)):
raise ValueError("timestamp must be an integer")
raise ValueError(f"timestamp must be an integer, not {type(timestamp)}")
if not isinstance(lat, (float, int)):
raise ValueError("lat must be a float")
raise ValueError(f"lat must be a float, not {type(lat)}")
if not isinstance(lon, (float, int)):
raise ValueError("lon must be a float")
raise ValueError(f"lon must be a float, not {type(lon)}")
if not isinstance(loc_valid, bool):
raise ValueError("loc_valid must be a boolean")
if cell is not None and not isinstance(cell, CellTowerInfo):
raise ValueError("cell must be an instance of CellTowerInfo")
raise ValueError(f"loc_valid must be a boolean, not {type(loc_valid)}")
if cell is not None and not isinstance(cell, CellularInfo):
raise ValueError(f"cell must be an instance of CellularInfo, not {type(cell)}")
if wifi is not None and not isinstance(wifi, WifiInfo):
raise ValueError("wifi must be an instance of WifiInfo")
raise ValueError(f"wifi must be an instance of WifiInfo, not {type(wifi)}")
if speed is not None and not isinstance(speed, (int, float)):
raise ValueError("speed must be an integer")
raise ValueError(f"speed must be an integer, not {type(speed)}")
if heading is not None and not isinstance(heading, (int, float)):
raise ValueError("heading must be an integer")
raise ValueError(f"heading must be an integer, not {type(heading)}")
if altitude is not None and not isinstance(altitude, (int, float)):
raise ValueError("altitude must be an integer")
raise ValueError(f"altitude must be an integer, not {type(altitude)}")
if accuracy is not None and not isinstance(accuracy, (int, float)):
raise ValueError("accuracy must be an integer")
if hdop is not None and not isinstance(hdop, (int, float)):
raise ValueError("hdop must be an integer")
raise ValueError(f"accuracy must be an integer, not {type(accuracy)}")
if hdop is not None and not isinstance(hdop, float):
raise ValueError(f"hdop must be an integer, not {type(hdop)}")
if custom is not None and not isinstance(custom, dict):
raise ValueError("custom must be a dict")
raise ValueError(f"custom must be a dict, not {type(custom)}")
self.deviceid = DEVICE_ID
self.timestamp = int(timestamp)
self.lat = float(lat)
self.lon = float(lon)
self.loc_valid = loc_valid
self.cell = cell
self.wifi = wifi
self.speed = int_or_none(speed)
self.heading = int_or_none(heading)
self.altitude = int_or_none(altitude)
self.accuracy = int_or_none(accuracy)
self.hdop = int_or_none(hdop)
self.cell = cell # TODO: serialize
self.wifi = f'{wifi.mac_addr},{wifi.signal_strength}'
self.speed = type_or_none(speed, int)
self.heading = type_or_none(heading, int)
self.altitude = type_or_none(altitude, int)
self.accuracy = type_or_none(accuracy, int)
self.hdop = type_or_none(hdop, float)
self.custom = custom
self.query = self.build_query()
@ -100,6 +103,6 @@ class TraccarGetRequest:
return parameters
class TraccarPingRequest(TraccarGetRequest):
class TraccarPingGetRequest(TraccarGetRequest):
def __init__(self):
super().__init__(timestamp=timestamp(), lat=0, lon=0, loc_valid=False, custom={'ping': True})
super().__init__(timestamp=unix_timestamp(), lat=0, lon=0, loc_valid=False, custom={'ping': True})

48
src/lib/traccar/send.py Normal file
View File

@ -0,0 +1,48 @@
import gc
import urequests as requests
from lib.gps.read import get_position
from lib.led import led_on, led_off
from lib.logging import LogLevel, logger
from lib.networking.wifi import wifi
from lib.traccar.request import TraccarGetRequest, WifiInfo
def send_to_traccar(event: TraccarGetRequest):
led_on()
params = ' '.join(['='.join((x, str(y))) for x, y in event.query])
# TODO: determine cell or wifi here
gc.collect()
r = requests.post(event.request_url)
if r.status_code != 200:
logger(f'{params} - Failed to send request to traccar: "{r.text}" - Status code: {r.status_code}', level=LogLevel.error, source='NET')
else:
logger(params, source='NET')
led_off()
gc.collect()
def assemble_position_message():
# Get the GPS fix.
position = get_position()
# Gather connection info
conn_info = WifiInfo(mac_addr=wifi.mac_addr(), signal_strength=wifi.signal_strength())
# cell_info = CellInfo()
# Startup ping
return TraccarGetRequest(
timestamp=position.timestamp,
lat=position.latitude,
lon=position.longitude,
loc_valid=position.valid,
wifi=conn_info,
speed=position.speed,
heading=position.course,
altitude=position.altitude,
hdop=position.hdop,
custom={
'satellites': position.satellites_in_use
}
)

View File

@ -1,16 +0,0 @@
import gc
import urequests as requests
from lib.logging import LogLevel, logger
from lib.traccar.request import TraccarGetRequest
def send_to_traccar(event: TraccarGetRequest):
params = ' '.join(['='.join((x, str(y))) for x, y in event.query])
r = requests.post(event.request_url)
gc.collect()
if r.status_code != 200:
logger(f'{params} - Failed to send request to traccar: {r.text} - Status code: {r.status_code}', level=LogLevel.error, source='NET')
else:
logger(params, source='NET')

View File

@ -1,5 +1,42 @@
import time
from machine import RTC
from lib.ntp import Ntp
_rtc = RTC()
Ntp.set_datetime_callback(_rtc.datetime)
def timestamp():
def initialize_rtc(first_fix):
hour, minute, second = first_fix.timedata[0]
day, month, year = first_fix.timedata[1]
# Convert the GPS date and time to a struct_time
gps_time = time.mktime((year + 2000, month, day, hour, minute, int(second), 0, 0))
# Convert the struct_time to a timestamp in microseconds since the device's epoch
gps_us = gps_time * 1000_000
# Get the current time in microseconds since the device's epoch
now_us = time.ticks_us()
# Block NTP.
Ntp.set_hosts(('0.0.0.0',))
# Pass the GPS time and the current time to the rtc_sync function
Ntp.rtc_sync(new_time=(gps_us, now_us))
def unix_timestamp():
return Ntp.time_s(epoch=Ntp.EPOCH_1970, utc=True)
def gps_to_unix(gps_time, gps_date):
"""
Literally who knows how to do this
"""
hour, minute, second = gps_time
day, month, year = gps_date
gps_datetime = time.mktime((year - 12, month + 6, day - 12, hour + 6, minute, int(second), 0, 0))
return int(gps_datetime)

View File

@ -1,19 +1,22 @@
import gc
import time
from lib.interval_tracker import interval_tracker
from lib.logging import logger
from lib.networking.wifi import wifi
from lib.ntp import Ntp
from lib.traccar.send import send_to_traccar, assemble_position_message
from startup import startup
startup()
while True:
gc.collect()
if interval_tracker.check('wifi_scan'):
if not wifi.isconnected():
wifi.connect()
if interval_tracker.check('ntp_sync'):
logger('Syncing time', source='NET')
Ntp.rtc_sync()
if interval_tracker.check('active_position_send'):
send_to_traccar(assemble_position_message())
if interval_tracker.check('clock_drift_comp'):
Ntp.drift_compensate(Ntp.drift_us())
gc.collect()
time.sleep(1)

View File

@ -1,44 +1,50 @@
from machine import RTC
from config import *
from lib.gps.read import gps_power_pin, get_position
from lib.led import led_on, led_off
from lib.logging import logger
from lib.networking.wifi import wifi
from lib.ntp import Ntp
from lib.traccar.request import TraccarPingRequest
from lib.traccar.traccar import send_to_traccar
from lib.ttime import timestamp
from lib.traccar.send import send_to_traccar, assemble_position_message
from lib.ttime import unix_timestamp, initialize_rtc
def startup():
led_on()
print('Freematics Micropython Edition')
print('https://git.evulid.cc/cyberes/freematics-firmware_v5-micropython')
# Device info
print('==========')
validate_config()
logger(f'Server: {TRACCAR_HOST}', source='TRACCAR')
logger(f'Device ID: {DEVICE_ID}', source='TRACCAR')
logger(f'Server: {TRACCAR_HOST}', source='TRACCAR')
print('==========')
# WIFI
# Start the GPS and let it get itself sorted out while we do other things.
gps_power_pin.value(1)
# Activate wifi but don't connect yet.
wifi.activate()
if wifi.isconnected():
wifi.disconnect()
logger('Disconnected from existing network', source='WIFI')
wifi.connect()
# NTP/time
logger('Syncing time', source='NET')
_initialize_rtc()
logger(f'Current time: {timestamp()}')
# GPS
logger('Getting initial fix', source='GPS')
position = get_position()
# Startup ping
logger('Sending startup ping', source='NET')
send_to_traccar(TraccarPingRequest())
# Time.
initialize_rtc(position)
logger(f'Current time: {unix_timestamp()}')
# We are fully initalized so we can turn off the LED.
led_off()
def _initialize_rtc():
_rtc = RTC()
Ntp.set_datetime_callback(_rtc.datetime)
Ntp.set_hosts(('0.pool.ntp.org', '1.pool.ntp.org', '2.pool.ntp.org'))
Ntp.rtc_sync()
logger('Startup complete!')
print('====================')
# Send the first message
send_to_traccar(assemble_position_message())
def validate_config():

11
test.py Normal file
View File

@ -0,0 +1,11 @@
import time
def gps_to_unix(gps_time, gps_date):
hour, minute, second = gps_time
day, month, year = gps_date
gps_datetime = time.mktime((year - 12, month + 6, day - 12, hour, minute, int(second), 0, 0))
return gps_datetime
print(gps_to_unix([8, 43, 15.4], (29, 6, 20)))