diff --git a/src/config.sample.py b/src/config.sample.py index d3a3316..53cf490 100644 --- a/src/config.sample.py +++ b/src/config.sample.py @@ -13,3 +13,5 @@ WIFI_CONNECT_TIMEOUT = 10 # All in seconds. INTERVAL_WIFI_SCAN = 10 INTERVAL_NTP_SYNC = 86400 +INTERVAL_ACTIVE_POSITION_SEND = 120 # only when active and moving +INTERVAL_CLOCK_DRIFT_COMP = 1800 diff --git a/src/lib/consts.py b/src/lib/consts.py new file mode 100644 index 0000000..4646da2 --- /dev/null +++ b/src/lib/consts.py @@ -0,0 +1 @@ +PIN_LED = 4 diff --git a/src/lib/gps/__init__.py b/src/lib/gps/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/lib/gps/micropyGPS.py b/src/lib/gps/micropyGPS.py new file mode 100644 index 0000000..61b19ed --- /dev/null +++ b/src/lib/gps/micropyGPS.py @@ -0,0 +1,830 @@ +""" +# MicropyGPS - a GPS NMEA sentence parser for Micropython/Python 3.X +# Copyright (c) 2017 Michael Calvin McCoy (calvin.mccoy@protonmail.com) +# The MIT License (MIT) - see LICENSE file +""" + +# TODO: +# Time Since First Fix +# Distance/Time to Target +# More Helper Functions +# Dynamically limit sentences types to parse + +from math import floor, modf + +# Import utime or time for fix time handling +try: + # Assume running on MicroPython + import utime +except ImportError: + # Otherwise default to time module for non-embedded implementations + # Should still support millisecond resolution. + import time + + +class MicropyGPS(object): + """GPS NMEA Sentence Parser. Creates object that stores all relevant GPS data and statistics. + Parses sentences one character at a time using update(). """ + + # Max Number of Characters a valid sentence can be (based on GGA sentence) + SENTENCE_LIMIT = 90 + __HEMISPHERES = ('N', 'S', 'E', 'W') + __NO_FIX = 1 + __FIX_2D = 2 + __FIX_3D = 3 + __DIRECTIONS = ('N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W', + 'WNW', 'NW', 'NNW') + __MONTHS = ('January', 'February', 'March', 'April', 'May', + 'June', 'July', 'August', 'September', 'October', + 'November', 'December') + + def __init__(self, local_offset=0, location_formatting='ddm'): + """ + Setup GPS Object Status Flags, Internal Data Registers, etc + local_offset (int): Timzone Difference to UTC + location_formatting (str): Style For Presenting Longitude/Latitude: + Decimal Degree Minute (ddm) - 40° 26.767′ N + Degrees Minutes Seconds (dms) - 40° 26′ 46″ N + Decimal Degrees (dd) - 40.446° N + """ + + ##################### + # Object Status Flags + self.sentence_active = False + self.active_segment = 0 + self.process_crc = False + self.gps_segments = [] + self.crc_xor = 0 + self.char_count = 0 + self.fix_time = 0 + + ##################### + # Sentence Statistics + self.crc_fails = 0 + self.clean_sentences = 0 + self.parsed_sentences = 0 + + ##################### + # Logging Related + self.log_handle = None + self.log_en = False + + ##################### + # Data From Sentences + # Time + self.timestamp = [0, 0, 0.0] + self.date = [0, 0, 0] + self.local_offset = local_offset + + # Position/Motion + self._latitude = [0, 0.0, 'N'] + self._longitude = [0, 0.0, 'W'] + self.coord_format = location_formatting + self.speed = [0.0, 0.0, 0.0] + self.course = 0.0 + self.altitude = 0.0 + self.geoid_height = 0.0 + + # GPS Info + self.satellites_in_view = 0 + self.satellites_in_use = 0 + self.satellites_used = [] + self.last_sv_sentence = 0 + self.total_sv_sentences = 0 + self.satellite_data = dict() + self.hdop = 0.0 + self.pdop = 0.0 + self.vdop = 0.0 + self.valid = False + self.fix_stat = 0 + self.fix_type = 1 + + ######################################## + # Coordinates Translation Functions + ######################################## + @property + def latitude(self): + """Format Latitude Data Correctly""" + if self.coord_format == 'dd': + decimal_degrees = self._latitude[0] + (self._latitude[1] / 60) + return [decimal_degrees, self._latitude[2]] + elif self.coord_format == 'dms': + minute_parts = modf(self._latitude[1]) + seconds = round(minute_parts[0] * 60) + return [self._latitude[0], int(minute_parts[1]), seconds, self._latitude[2]] + else: + return self._latitude + + @property + def longitude(self): + """Format Longitude Data Correctly""" + if self.coord_format == 'dd': + decimal_degrees = self._longitude[0] + (self._longitude[1] / 60) + return [decimal_degrees, self._longitude[2]] + elif self.coord_format == 'dms': + minute_parts = modf(self._longitude[1]) + seconds = round(minute_parts[0] * 60) + return [self._longitude[0], int(minute_parts[1]), seconds, self._longitude[2]] + else: + return self._longitude + + ######################################## + # Logging Related Functions + ######################################## + def start_logging(self, target_file, mode="append"): + """ + Create GPS data log object + """ + # Set Write Mode Overwrite or Append + mode_code = 'w' if mode == 'new' else 'a' + + try: + self.log_handle = open(target_file, mode_code) + except AttributeError: + print("Invalid FileName") + return False + + self.log_en = True + return True + + def stop_logging(self): + """ + Closes the log file handler and disables further logging + """ + try: + self.log_handle.close() + except AttributeError: + print("Invalid Handle") + return False + + self.log_en = False + return True + + def write_log(self, log_string): + """Attempts to write the last valid NMEA sentence character to the active file handler + """ + try: + self.log_handle.write(log_string) + except TypeError: + return False + return True + + ######################################## + # Sentence Parsers + ######################################## + def gprmc(self): + """Parse Recommended Minimum Specific GPS/Transit data (RMC)Sentence. + Updates UTC timestamp, latitude, longitude, Course, Speed, Date, and fix status + """ + + # UTC Timestamp + try: + utc_string = self.gps_segments[1] + + if utc_string: # Possible timestamp found + hours = (int(utc_string[0:2]) + self.local_offset) % 24 + minutes = int(utc_string[2:4]) + seconds = float(utc_string[4:]) + self.timestamp = [hours, minutes, seconds] + else: # No Time stamp yet + self.timestamp = [0, 0, 0.0] + + except ValueError: # Bad Timestamp value present + return False + + # Date stamp + try: + date_string = self.gps_segments[9] + + # Date string printer function assumes to be year >=2000, + # date_string() must be supplied with the correct century argument to display correctly + if date_string: # Possible date stamp found + day = int(date_string[0:2]) + month = int(date_string[2:4]) + year = int(date_string[4:6]) + self.date = (day, month, year) + else: # No Date stamp yet + self.date = (0, 0, 0) + + except ValueError: # Bad Date stamp value present + return False + + # Check Receiver Data Valid Flag + if self.gps_segments[2] == 'A': # Data from Receiver is Valid/Has Fix + + # Longitude / Latitude + try: + # Latitude + l_string = self.gps_segments[3] + lat_degs = int(l_string[0:2]) + lat_mins = float(l_string[2:]) + lat_hemi = self.gps_segments[4] + + # Longitude + l_string = self.gps_segments[5] + lon_degs = int(l_string[0:3]) + lon_mins = float(l_string[3:]) + lon_hemi = self.gps_segments[6] + except ValueError: + return False + + if lat_hemi not in self.__HEMISPHERES: + return False + + if lon_hemi not in self.__HEMISPHERES: + return False + + # Speed + try: + spd_knt = float(self.gps_segments[7]) + except ValueError: + return False + + # Course + try: + if self.gps_segments[8]: + course = float(self.gps_segments[8]) + else: + course = 0.0 + except ValueError: + return False + + # TODO - Add Magnetic Variation + + # Update Object Data + self._latitude = [lat_degs, lat_mins, lat_hemi] + self._longitude = [lon_degs, lon_mins, lon_hemi] + # Include mph and hm/h + self.speed = [spd_knt, spd_knt * 1.151, spd_knt * 1.852] + self.course = course + self.valid = True + + # Update Last Fix Time + self.new_fix_time() + + else: # Clear Position Data if Sentence is 'Invalid' + self._latitude = [0, 0.0, 'N'] + self._longitude = [0, 0.0, 'W'] + self.speed = [0.0, 0.0, 0.0] + self.course = 0.0 + self.valid = False + + return True + + def gpgll(self): + """Parse Geographic Latitude and Longitude (GLL)Sentence. Updates UTC timestamp, latitude, + longitude, and fix status""" + + # UTC Timestamp + try: + utc_string = self.gps_segments[5] + + if utc_string: # Possible timestamp found + hours = (int(utc_string[0:2]) + self.local_offset) % 24 + minutes = int(utc_string[2:4]) + seconds = float(utc_string[4:]) + self.timestamp = [hours, minutes, seconds] + else: # No Time stamp yet + self.timestamp = [0, 0, 0.0] + + except ValueError: # Bad Timestamp value present + return False + + # Check Receiver Data Valid Flag + if self.gps_segments[6] == 'A': # Data from Receiver is Valid/Has Fix + + # Longitude / Latitude + try: + # Latitude + l_string = self.gps_segments[1] + lat_degs = int(l_string[0:2]) + lat_mins = float(l_string[2:]) + lat_hemi = self.gps_segments[2] + + # Longitude + l_string = self.gps_segments[3] + lon_degs = int(l_string[0:3]) + lon_mins = float(l_string[3:]) + lon_hemi = self.gps_segments[4] + except ValueError: + return False + + if lat_hemi not in self.__HEMISPHERES: + return False + + if lon_hemi not in self.__HEMISPHERES: + return False + + # Update Object Data + self._latitude = [lat_degs, lat_mins, lat_hemi] + self._longitude = [lon_degs, lon_mins, lon_hemi] + self.valid = True + + # Update Last Fix Time + self.new_fix_time() + + else: # Clear Position Data if Sentence is 'Invalid' + self._latitude = [0, 0.0, 'N'] + self._longitude = [0, 0.0, 'W'] + self.valid = False + + return True + + def gpvtg(self): + """Parse Track Made Good and Ground Speed (VTG) Sentence. Updates speed and course""" + try: + course = float(self.gps_segments[1]) if self.gps_segments[1] else 0.0 + spd_knt = float(self.gps_segments[5]) if self.gps_segments[5] else 0.0 + except ValueError: + return False + + # Include mph and km/h + self.speed = (spd_knt, spd_knt * 1.151, spd_knt * 1.852) + self.course = course + return True + + def gpgga(self): + """Parse Global Positioning System Fix Data (GGA) Sentence. Updates UTC timestamp, latitude, longitude, + fix status, satellites in use, Horizontal Dilution of Precision (HDOP), altitude, geoid height and fix status""" + + try: + # UTC Timestamp + utc_string = self.gps_segments[1] + + # Skip timestamp if receiver doesn't have on yet + if utc_string: + hours = (int(utc_string[0:2]) + self.local_offset) % 24 + minutes = int(utc_string[2:4]) + seconds = float(utc_string[4:]) + else: + hours = 0 + minutes = 0 + seconds = 0.0 + + # Number of Satellites in Use + satellites_in_use = int(self.gps_segments[7]) + + # Get Fix Status + fix_stat = int(self.gps_segments[6]) + + except (ValueError, IndexError): + return False + + try: + # Horizontal Dilution of Precision + hdop = float(self.gps_segments[8]) + except (ValueError, IndexError): + hdop = 0.0 + + # Process Location and Speed Data if Fix is GOOD + if fix_stat: + + # Longitude / Latitude + try: + # Latitude + l_string = self.gps_segments[2] + lat_degs = int(l_string[0:2]) + lat_mins = float(l_string[2:]) + lat_hemi = self.gps_segments[3] + + # Longitude + l_string = self.gps_segments[4] + lon_degs = int(l_string[0:3]) + lon_mins = float(l_string[3:]) + lon_hemi = self.gps_segments[5] + except ValueError: + return False + + if lat_hemi not in self.__HEMISPHERES: + return False + + if lon_hemi not in self.__HEMISPHERES: + return False + + # Altitude / Height Above Geoid + try: + altitude = float(self.gps_segments[9]) + geoid_height = float(self.gps_segments[11]) + except ValueError: + altitude = 0 + geoid_height = 0 + + # Update Object Data + self._latitude = [lat_degs, lat_mins, lat_hemi] + self._longitude = [lon_degs, lon_mins, lon_hemi] + self.altitude = altitude + self.geoid_height = geoid_height + + # Update Object Data + self.timestamp = [hours, minutes, seconds] + self.satellites_in_use = satellites_in_use + self.hdop = hdop + self.fix_stat = fix_stat + + # If Fix is GOOD, update fix timestamp + if fix_stat: + self.new_fix_time() + + return True + + def gpgsa(self): + """Parse GNSS DOP and Active Satellites (GSA) sentence. Updates GPS fix type, list of satellites used in + fix calculation, Position Dilution of Precision (PDOP), Horizontal Dilution of Precision (HDOP), Vertical + Dilution of Precision, and fix status""" + + # Fix Type (None,2D or 3D) + try: + fix_type = int(self.gps_segments[2]) + except ValueError: + return False + + # Read All (up to 12) Available PRN Satellite Numbers + sats_used = [] + for sats in range(12): + sat_number_str = self.gps_segments[3 + sats] + if sat_number_str: + try: + sat_number = int(sat_number_str) + sats_used.append(sat_number) + except ValueError: + return False + else: + break + + # PDOP,HDOP,VDOP + try: + pdop = float(self.gps_segments[15]) + hdop = float(self.gps_segments[16]) + vdop = float(self.gps_segments[17]) + except ValueError: + return False + + # Update Object Data + self.fix_type = fix_type + + # If Fix is GOOD, update fix timestamp + if fix_type > self.__NO_FIX: + self.new_fix_time() + + self.satellites_used = sats_used + self.hdop = hdop + self.vdop = vdop + self.pdop = pdop + + return True + + def gpgsv(self): + """Parse Satellites in View (GSV) sentence. Updates number of SV Sentences,the number of the last SV sentence + parsed, and data on each satellite present in the sentence""" + try: + num_sv_sentences = int(self.gps_segments[1]) + current_sv_sentence = int(self.gps_segments[2]) + sats_in_view = int(self.gps_segments[3]) + except ValueError: + return False + + # Create a blank dict to store all the satellite data from this sentence in: + # satellite PRN is key, tuple containing telemetry is value + satellite_dict = dict() + + # Calculate Number of Satelites to pull data for and thus how many segment positions to read + if num_sv_sentences == current_sv_sentence: + # Last sentence may have 1-4 satellites; 5 - 20 positions + sat_segment_limit = (sats_in_view - ((num_sv_sentences - 1) * 4)) * 5 + else: + sat_segment_limit = 20 # Non-last sentences have 4 satellites and thus read up to position 20 + + # Try to recover data for up to 4 satellites in sentence + for sats in range(4, sat_segment_limit, 4): + + # If a PRN is present, grab satellite data + if self.gps_segments[sats]: + try: + sat_id = int(self.gps_segments[sats]) + except (ValueError,IndexError): + return False + + try: # elevation can be null (no value) when not tracking + elevation = int(self.gps_segments[sats+1]) + except (ValueError,IndexError): + elevation = None + + try: # azimuth can be null (no value) when not tracking + azimuth = int(self.gps_segments[sats+2]) + except (ValueError,IndexError): + azimuth = None + + try: # SNR can be null (no value) when not tracking + snr = int(self.gps_segments[sats+3]) + except (ValueError,IndexError): + snr = None + # If no PRN is found, then the sentence has no more satellites to read + else: + break + + # Add Satellite Data to Sentence Dict + satellite_dict[sat_id] = (elevation, azimuth, snr) + + # Update Object Data + self.total_sv_sentences = num_sv_sentences + self.last_sv_sentence = current_sv_sentence + self.satellites_in_view = sats_in_view + + # For a new set of sentences, we either clear out the existing sat data or + # update it as additional SV sentences are parsed + if current_sv_sentence == 1: + self.satellite_data = satellite_dict + else: + self.satellite_data.update(satellite_dict) + + return True + + ########################################## + # Data Stream Handler Functions + ########################################## + + def new_sentence(self): + """Adjust Object Flags in Preparation for a New Sentence""" + self.gps_segments = [''] + self.active_segment = 0 + self.crc_xor = 0 + self.sentence_active = True + self.process_crc = True + self.char_count = 0 + + def update(self, new_char): + """Process a new input char and updates GPS object if necessary based on special characters ('$', ',', '*') + Function builds a list of received string that are validate by CRC prior to parsing by the appropriate + sentence function. Returns sentence type on successful parse, None otherwise""" + + valid_sentence = False + + # Validate new_char is a printable char + ascii_char = ord(new_char) + + if 10 <= ascii_char <= 126: + self.char_count += 1 + + # Write Character to log file if enabled + if self.log_en: + self.write_log(new_char) + + # Check if a new string is starting ($) + if new_char == '$': + self.new_sentence() + return None + + elif self.sentence_active: + + # Check if sentence is ending (*) + if new_char == '*': + self.process_crc = False + self.active_segment += 1 + self.gps_segments.append('') + return None + + # Check if a section is ended (,), Create a new substring to feed + # characters to + elif new_char == ',': + self.active_segment += 1 + self.gps_segments.append('') + + # Store All Other printable character and check CRC when ready + else: + self.gps_segments[self.active_segment] += new_char + + # When CRC input is disabled, sentence is nearly complete + if not self.process_crc: + + if len(self.gps_segments[self.active_segment]) == 2: + try: + final_crc = int(self.gps_segments[self.active_segment], 16) + if self.crc_xor == final_crc: + valid_sentence = True + else: + self.crc_fails += 1 + except ValueError: + pass # CRC Value was deformed and could not have been correct + + # Update CRC + if self.process_crc: + self.crc_xor ^= ascii_char + + # If a Valid Sentence Was received and it's a supported sentence, then parse it!! + if valid_sentence: + self.clean_sentences += 1 # Increment clean sentences received + self.sentence_active = False # Clear Active Processing Flag + + if self.gps_segments[0] in self.supported_sentences: + + # parse the Sentence Based on the message type, return True if parse is clean + if self.supported_sentences[self.gps_segments[0]](self): + + # Let host know that the GPS object was updated by returning parsed sentence type + self.parsed_sentences += 1 + return self.gps_segments[0] + + # Check that the sentence buffer isn't filling up with Garage waiting for the sentence to complete + if self.char_count > self.SENTENCE_LIMIT: + self.sentence_active = False + + # Tell Host no new sentence was parsed + return None + + def new_fix_time(self): + """Updates a high resolution counter with current time when fix is updated. Currently only triggered from + GGA, GSA and RMC sentences""" + try: + self.fix_time = utime.ticks_ms() + except NameError: + self.fix_time = time.time() + + ######################################### + # User Helper Functions + # These functions make working with the GPS object data easier + ######################################### + + def satellite_data_updated(self): + """ + Checks if the all the GSV sentences in a group have been read, making satellite data complete + :return: boolean + """ + if self.total_sv_sentences > 0 and self.total_sv_sentences == self.last_sv_sentence: + return True + else: + return False + + def unset_satellite_data_updated(self): + """ + Mark GSV sentences as read indicating the data has been used and future updates are fresh + """ + self.last_sv_sentence = 0 + + def satellites_visible(self): + """ + Returns a list of of the satellite PRNs currently visible to the receiver + :return: list + """ + return list(self.satellite_data.keys()) + + def time_since_fix(self): + """Returns number of millisecond since the last sentence with a valid fix was parsed. Returns 0 if + no fix has been found""" + + # Test if a Fix has been found + if self.fix_time == 0: + return -1 + + # Try calculating fix time using utime; if not running MicroPython + # time.time() returns a floating point value in secs + try: + current = utime.ticks_diff(utime.ticks_ms(), self.fix_time) + except NameError: + current = (time.time() - self.fix_time) * 1000 # ms + + return current + + def compass_direction(self): + """ + Determine a cardinal or inter-cardinal direction based on current course. + :return: string + """ + # Calculate the offset for a rotated compass + if self.course >= 348.75: + offset_course = 360 - self.course + else: + offset_course = self.course + 11.25 + + # Each compass point is separated by 22.5 degrees, divide to find lookup value + dir_index = floor(offset_course / 22.5) + + final_dir = self.__DIRECTIONS[dir_index] + + return final_dir + + def latitude_string(self): + """ + Create a readable string of the current latitude data + :return: string + """ + if self.coord_format == 'dd': + formatted_latitude = self.latitude + lat_string = str(formatted_latitude[0]) + '° ' + str(self._latitude[2]) + elif self.coord_format == 'dms': + formatted_latitude = self.latitude + lat_string = str(formatted_latitude[0]) + '° ' + str(formatted_latitude[1]) + "' " + str(formatted_latitude[2]) + '" ' + str(formatted_latitude[3]) + else: + lat_string = str(self._latitude[0]) + '° ' + str(self._latitude[1]) + "' " + str(self._latitude[2]) + return lat_string + + def longitude_string(self): + """ + Create a readable string of the current longitude data + :return: string + """ + if self.coord_format == 'dd': + formatted_longitude = self.longitude + lon_string = str(formatted_longitude[0]) + '° ' + str(self._longitude[2]) + elif self.coord_format == 'dms': + formatted_longitude = self.longitude + lon_string = str(formatted_longitude[0]) + '° ' + str(formatted_longitude[1]) + "' " + str(formatted_longitude[2]) + '" ' + str(formatted_longitude[3]) + else: + lon_string = str(self._longitude[0]) + '° ' + str(self._longitude[1]) + "' " + str(self._longitude[2]) + return lon_string + + def speed_string(self, unit='kph'): + """ + Creates a readable string of the current speed data in one of three units + :param unit: string of 'kph','mph, or 'knot' + :return: + """ + if unit == 'mph': + speed_string = str(self.speed[1]) + ' mph' + + elif unit == 'knot': + if self.speed[0] == 1: + unit_str = ' knot' + else: + unit_str = ' knots' + speed_string = str(self.speed[0]) + unit_str + + else: + speed_string = str(self.speed[2]) + ' km/h' + + return speed_string + + def date_string(self, formatting='s_mdy', century='20'): + """ + Creates a readable string of the current date. + Can select between long format: Januray 1st, 2014 + or two short formats: + 11/01/2014 (MM/DD/YYYY) + 01/11/2014 (DD/MM/YYYY) + :param formatting: string 's_mdy', 's_dmy', or 'long' + :param century: int delineating the century the GPS data is from (19 for 19XX, 20 for 20XX) + :return: date_string string with long or short format date + """ + + # Long Format Januray 1st, 2014 + if formatting == 'long': + # Retrieve Month string from private set + month = self.__MONTHS[self.date[1] - 1] + + # Determine Date Suffix + if self.date[0] in (1, 21, 31): + suffix = 'st' + elif self.date[0] in (2, 22): + suffix = 'nd' + elif self.date[0] == (3, 23): + suffix = 'rd' + else: + suffix = 'th' + + day = str(self.date[0]) + suffix # Create Day String + + year = century + str(self.date[2]) # Create Year String + + date_string = month + ' ' + day + ', ' + year # Put it all together + + else: + # Add leading zeros to day string if necessary + if self.date[0] < 10: + day = '0' + str(self.date[0]) + else: + day = str(self.date[0]) + + # Add leading zeros to month string if necessary + if self.date[1] < 10: + month = '0' + str(self.date[1]) + else: + month = str(self.date[1]) + + # Add leading zeros to year string if necessary + if self.date[2] < 10: + year = '0' + str(self.date[2]) + else: + year = str(self.date[2]) + + # Build final string based on desired formatting + if formatting == 's_dmy': + date_string = day + '/' + month + '/' + year + + else: # Default date format + date_string = month + '/' + day + '/' + year + + return date_string + + # All the currently supported NMEA sentences + supported_sentences = {'GPRMC': gprmc, 'GLRMC': gprmc, + 'GPGGA': gpgga, 'GLGGA': gpgga, + 'GPVTG': gpvtg, 'GLVTG': gpvtg, + 'GPGSA': gpgsa, 'GLGSA': gpgsa, + 'GPGSV': gpgsv, 'GLGSV': gpgsv, + 'GPGLL': gpgll, 'GLGLL': gpgll, + 'GNGGA': gpgga, 'GNRMC': gprmc, + 'GNVTG': gpvtg, 'GNGLL': gpgll, + 'GNGSA': gpgsa, + } + +if __name__ == "__main__": + pass diff --git a/src/lib/gps/read.py b/src/lib/gps/read.py new file mode 100644 index 0000000..d6e28c6 --- /dev/null +++ b/src/lib/gps/read.py @@ -0,0 +1,76 @@ +from time import sleep + +from machine import Pin, UART + +from lib.gps.micropyGPS import MicropyGPS +from lib.interval_tracker import interval_tracker +from lib.logging import logger +from lib.ttime import initialize_rtc, unix_timestamp + +PIN_GPS_POWER = 12 +PIN_GPS_UART_RXD = 34 +PIN_GPS_UART_TXD = 26 + +GPS_BAUDRATE = 115200 + +uart = UART(1, baudrate=GPS_BAUDRATE, rx=PIN_GPS_UART_RXD, tx=PIN_GPS_UART_TXD) +gps_power_pin = Pin(PIN_GPS_POWER, Pin.OUT) +gps_power_pin.value(0) # Turn off GPS power initially + + +def read_gps_uart(want: str): + # TODO: add timeout + m = MicropyGPS(location_formatting='dd') + buffer = b'' + decoded = '' + while True: + if uart.any(): + c = uart.read(1) + if c == b'\r': + continue + if c == b'\n': + decoded = buffer.decode() + buffer = b'' + else: + buffer += c + if decoded.startswith(want): + for x in decoded: + m.update(x) + if m.latitude[0] > 0 and m.longitude[0] > 0: + return m + decoded = '' + sleep(0.05) + + +class Position: + def __init__(self, valid: bool, latitude: tuple, longitude: tuple, altitude: float, speed: float, satellites_in_use: int, hdop: float, timestamp: int, timedata: tuple, course: float): + self.valid = valid + self.altitude = altitude + self.speed = speed + self.satellites_in_use = satellites_in_use + self.hdop = hdop + self.timestamp = timestamp + self.timedata = timedata # real GPS timestamp in the format (timestamp, date) + self.course = course + + self.latitude = latitude[0] + if latitude[1] == 'S': + self.latitude = -latitude[0] + self.longitude = longitude[0] + if longitude[1] == 'W': + self.longitude = -longitude[0] + + +def get_position(): + gps_power_pin.value(1) # Always make sure the GPS is on. + timestamp = unix_timestamp() + gnrmc = read_gps_uart('$GNRMC') + gngga = read_gps_uart('$GNGGA') + p = Position(gnrmc.valid, gnrmc.latitude, gnrmc.longitude, gnrmc.altitude, gnrmc.speed[0], gngga.satellites_in_use, gngga.hdop, timestamp, (gnrmc.timestamp, gnrmc.date), gnrmc.course) + + # Set the clock if it's time. + if interval_tracker.check('ntp_sync'): + initialize_rtc(p) + logger('Updated time', source='GPS') + + return p diff --git a/src/lib/helpers.py b/src/lib/helpers.py index 73ca08d..01fcfc3 100644 --- a/src/lib/helpers.py +++ b/src/lib/helpers.py @@ -1,4 +1,4 @@ -def int_or_none(value): +def type_or_none(value, type_cls): if value is None: return None - return int(value) + return type_cls(value) diff --git a/src/lib/interval_tracker.py b/src/lib/interval_tracker.py index 7d07b38..94110e2 100644 --- a/src/lib/interval_tracker.py +++ b/src/lib/interval_tracker.py @@ -18,5 +18,7 @@ class IntervalTracker: interval_tracker = IntervalTracker( wifi_scan=INTERVAL_WIFI_SCAN, - ntp_sync=INTERVAL_NTP_SYNC + ntp_sync=INTERVAL_NTP_SYNC, + active_position_send=INTERVAL_ACTIVE_POSITION_SEND, + clock_drift_comp=INTERVAL_CLOCK_DRIFT_COMP ) diff --git a/src/lib/led.py b/src/lib/led.py new file mode 100644 index 0000000..fda183c --- /dev/null +++ b/src/lib/led.py @@ -0,0 +1,14 @@ +from machine import Pin + +from lib.consts import PIN_LED + +led = Pin(PIN_LED, Pin.OUT) + + +def led_on(): + led_off() + led.value(1) + + +def led_off(): + led.value(0) diff --git a/src/lib/networking/wifi.py b/src/lib/networking/wifi.py index 9a7cd72..95bd8fe 100644 --- a/src/lib/networking/wifi.py +++ b/src/lib/networking/wifi.py @@ -10,13 +10,17 @@ class WifiMananger: _wlan = None def activate(self): - assert self._wlan is None + if self._wlan is not None: + raise Exception("Already activated") self._wlan = network.WLAN(network.STA_IF) self._wlan.active(True) - def connect(self): - assert not self._wlan.isconnected() + def disconnect(self): + self._wlan.disconnect() + def connect(self): + if self._wlan.isconnected(): + raise Exception("Already connected") logger(f'Scanning', source='WIFI') found = False for item in self.scan(): @@ -49,11 +53,20 @@ class WifiMananger: def ifconfig(self): return self._wlan.ifconfig() + def signal_strength(self): + for item in self.scan(): + if item[0].decode() == WIFI_SSID: + return item[3] + def config(self, value: str): return self._wlan.config(value) def scan(self): return self._wlan.scan() + def mac_addr(self): + m = self._wlan.config('mac') + return ':'.join('%02x' % b for b in m) + wifi = WifiMananger() diff --git a/src/lib/traccar/request.py b/src/lib/traccar/request.py index c8931d5..a646463 100644 --- a/src/lib/traccar/request.py +++ b/src/lib/traccar/request.py @@ -1,20 +1,20 @@ from config import DEVICE_ID, TRACCAR_HOST -from lib.helpers import int_or_none -from lib.ttime import timestamp +from lib.helpers import type_or_none +from lib.ttime import unix_timestamp -class CellTowerInfo: +class CellularInfo: def __init__(self, mcc: int, mnc: int, lac: int, cell_id: int, signal_strength: int): if not isinstance(mcc, int): - raise ValueError("mcc must be an integer") + raise ValueError(f"mcc must be an integer, not {type(mcc)}") if not isinstance(mnc, int): - raise ValueError("mnc must be an integer") + raise ValueError(f"mnc must be an integer, not {type(mnc)}") if not isinstance(lac, int): - raise ValueError("lac must be an integer") + raise ValueError(f"lac must be an integer, not {type(lac)}") if not isinstance(cell_id, int): - raise ValueError("cell_id must be an integer") + raise ValueError(f"cell_id must be an integer, not {type(cell_id)}") if not isinstance(signal_strength, int): - raise ValueError("signal_strength must be an integer") + raise ValueError(f"signal_strength must be an integer, not {type(signal_strength)}") self.mcc = mcc self.mnc = mnc @@ -26,56 +26,59 @@ class CellTowerInfo: class WifiInfo: def __init__(self, mac_addr: str, signal_strength: int): if not isinstance(mac_addr, str): - raise ValueError("mac_addr must be a string") + raise ValueError(f"mac_addr must be a string, not {type(mac_addr)}") if not isinstance(signal_strength, int): - raise ValueError("signal_strength must be an integer") + raise ValueError(f"signal_strength must be an integer, not {type(signal_strength)}") self.mac_addr = mac_addr self.signal_strength = signal_strength class TraccarGetRequest: - # https://www.traccar.org/osmand/ + """ + https://www.traccar.org/osmand/ + """ + def __init__(self, timestamp: int, lat: float, lon: float, loc_valid: bool = True, - cell: CellTowerInfo = None, wifi: WifiInfo = None, speed: int = None, heading: int = None, altitude: int = None, - accuracy: int = None, hdop: int = None, custom: dict = None): + cell: CellularInfo = None, wifi: WifiInfo = None, speed: int = None, heading: int = None, altitude: int = None, + accuracy: int = None, hdop: float = None, custom: dict = None): if not isinstance(timestamp, (int, float)): - raise ValueError("timestamp must be an integer") + raise ValueError(f"timestamp must be an integer, not {type(timestamp)}") if not isinstance(lat, (float, int)): - raise ValueError("lat must be a float") + raise ValueError(f"lat must be a float, not {type(lat)}") if not isinstance(lon, (float, int)): - raise ValueError("lon must be a float") + raise ValueError(f"lon must be a float, not {type(lon)}") if not isinstance(loc_valid, bool): - raise ValueError("loc_valid must be a boolean") - if cell is not None and not isinstance(cell, CellTowerInfo): - raise ValueError("cell must be an instance of CellTowerInfo") + raise ValueError(f"loc_valid must be a boolean, not {type(loc_valid)}") + if cell is not None and not isinstance(cell, CellularInfo): + raise ValueError(f"cell must be an instance of CellularInfo, not {type(cell)}") if wifi is not None and not isinstance(wifi, WifiInfo): - raise ValueError("wifi must be an instance of WifiInfo") + raise ValueError(f"wifi must be an instance of WifiInfo, not {type(wifi)}") if speed is not None and not isinstance(speed, (int, float)): - raise ValueError("speed must be an integer") + raise ValueError(f"speed must be an integer, not {type(speed)}") if heading is not None and not isinstance(heading, (int, float)): - raise ValueError("heading must be an integer") + raise ValueError(f"heading must be an integer, not {type(heading)}") if altitude is not None and not isinstance(altitude, (int, float)): - raise ValueError("altitude must be an integer") + raise ValueError(f"altitude must be an integer, not {type(altitude)}") if accuracy is not None and not isinstance(accuracy, (int, float)): - raise ValueError("accuracy must be an integer") - if hdop is not None and not isinstance(hdop, (int, float)): - raise ValueError("hdop must be an integer") + raise ValueError(f"accuracy must be an integer, not {type(accuracy)}") + if hdop is not None and not isinstance(hdop, float): + raise ValueError(f"hdop must be an integer, not {type(hdop)}") if custom is not None and not isinstance(custom, dict): - raise ValueError("custom must be a dict") + raise ValueError(f"custom must be a dict, not {type(custom)}") self.deviceid = DEVICE_ID self.timestamp = int(timestamp) self.lat = float(lat) self.lon = float(lon) self.loc_valid = loc_valid - self.cell = cell - self.wifi = wifi - self.speed = int_or_none(speed) - self.heading = int_or_none(heading) - self.altitude = int_or_none(altitude) - self.accuracy = int_or_none(accuracy) - self.hdop = int_or_none(hdop) + self.cell = cell # TODO: serialize + self.wifi = f'{wifi.mac_addr},{wifi.signal_strength}' + self.speed = type_or_none(speed, int) + self.heading = type_or_none(heading, int) + self.altitude = type_or_none(altitude, int) + self.accuracy = type_or_none(accuracy, int) + self.hdop = type_or_none(hdop, float) self.custom = custom self.query = self.build_query() @@ -100,6 +103,6 @@ class TraccarGetRequest: return parameters -class TraccarPingRequest(TraccarGetRequest): +class TraccarPingGetRequest(TraccarGetRequest): def __init__(self): - super().__init__(timestamp=timestamp(), lat=0, lon=0, loc_valid=False, custom={'ping': True}) + super().__init__(timestamp=unix_timestamp(), lat=0, lon=0, loc_valid=False, custom={'ping': True}) diff --git a/src/lib/traccar/send.py b/src/lib/traccar/send.py new file mode 100644 index 0000000..f55b5ed --- /dev/null +++ b/src/lib/traccar/send.py @@ -0,0 +1,48 @@ +import gc + +import urequests as requests + +from lib.gps.read import get_position +from lib.led import led_on, led_off +from lib.logging import LogLevel, logger +from lib.networking.wifi import wifi +from lib.traccar.request import TraccarGetRequest, WifiInfo + + +def send_to_traccar(event: TraccarGetRequest): + led_on() + params = ' '.join(['='.join((x, str(y))) for x, y in event.query]) + # TODO: determine cell or wifi here + gc.collect() + r = requests.post(event.request_url) + if r.status_code != 200: + logger(f'{params} - Failed to send request to traccar: "{r.text}" - Status code: {r.status_code}', level=LogLevel.error, source='NET') + else: + logger(params, source='NET') + led_off() + gc.collect() + + +def assemble_position_message(): + # Get the GPS fix. + position = get_position() + + # Gather connection info + conn_info = WifiInfo(mac_addr=wifi.mac_addr(), signal_strength=wifi.signal_strength()) + # cell_info = CellInfo() + + # Startup ping + return TraccarGetRequest( + timestamp=position.timestamp, + lat=position.latitude, + lon=position.longitude, + loc_valid=position.valid, + wifi=conn_info, + speed=position.speed, + heading=position.course, + altitude=position.altitude, + hdop=position.hdop, + custom={ + 'satellites': position.satellites_in_use + } + ) diff --git a/src/lib/traccar/traccar.py b/src/lib/traccar/traccar.py deleted file mode 100644 index 13db457..0000000 --- a/src/lib/traccar/traccar.py +++ /dev/null @@ -1,16 +0,0 @@ -import gc - -import urequests as requests - -from lib.logging import LogLevel, logger -from lib.traccar.request import TraccarGetRequest - - -def send_to_traccar(event: TraccarGetRequest): - params = ' '.join(['='.join((x, str(y))) for x, y in event.query]) - r = requests.post(event.request_url) - gc.collect() - if r.status_code != 200: - logger(f'{params} - Failed to send request to traccar: {r.text} - Status code: {r.status_code}', level=LogLevel.error, source='NET') - else: - logger(params, source='NET') diff --git a/src/lib/ttime.py b/src/lib/ttime.py index db435f4..9efc374 100644 --- a/src/lib/ttime.py +++ b/src/lib/ttime.py @@ -1,5 +1,42 @@ +import time + +from machine import RTC + from lib.ntp import Ntp +_rtc = RTC() +Ntp.set_datetime_callback(_rtc.datetime) -def timestamp(): + +def initialize_rtc(first_fix): + hour, minute, second = first_fix.timedata[0] + day, month, year = first_fix.timedata[1] + + # Convert the GPS date and time to a struct_time + gps_time = time.mktime((year + 2000, month, day, hour, minute, int(second), 0, 0)) + + # Convert the struct_time to a timestamp in microseconds since the device's epoch + gps_us = gps_time * 1000_000 + + # Get the current time in microseconds since the device's epoch + now_us = time.ticks_us() + + # Block NTP. + Ntp.set_hosts(('0.0.0.0',)) + + # Pass the GPS time and the current time to the rtc_sync function + Ntp.rtc_sync(new_time=(gps_us, now_us)) + + +def unix_timestamp(): return Ntp.time_s(epoch=Ntp.EPOCH_1970, utc=True) + + +def gps_to_unix(gps_time, gps_date): + """ + Literally who knows how to do this + """ + hour, minute, second = gps_time + day, month, year = gps_date + gps_datetime = time.mktime((year - 12, month + 6, day - 12, hour + 6, minute, int(second), 0, 0)) + return int(gps_datetime) diff --git a/src/main.py b/src/main.py index 5932bd3..bcd47cc 100644 --- a/src/main.py +++ b/src/main.py @@ -1,19 +1,22 @@ +import gc import time from lib.interval_tracker import interval_tracker -from lib.logging import logger from lib.networking.wifi import wifi from lib.ntp import Ntp +from lib.traccar.send import send_to_traccar, assemble_position_message from startup import startup startup() while True: + gc.collect() if interval_tracker.check('wifi_scan'): if not wifi.isconnected(): wifi.connect() - if interval_tracker.check('ntp_sync'): - logger('Syncing time', source='NET') - Ntp.rtc_sync() - + if interval_tracker.check('active_position_send'): + send_to_traccar(assemble_position_message()) + if interval_tracker.check('clock_drift_comp'): + Ntp.drift_compensate(Ntp.drift_us()) + gc.collect() time.sleep(1) diff --git a/src/startup.py b/src/startup.py index 34d9ebc..dde8961 100644 --- a/src/startup.py +++ b/src/startup.py @@ -1,44 +1,50 @@ -from machine import RTC - from config import * +from lib.gps.read import gps_power_pin, get_position +from lib.led import led_on, led_off from lib.logging import logger from lib.networking.wifi import wifi -from lib.ntp import Ntp -from lib.traccar.request import TraccarPingRequest -from lib.traccar.traccar import send_to_traccar -from lib.ttime import timestamp +from lib.traccar.send import send_to_traccar, assemble_position_message +from lib.ttime import unix_timestamp, initialize_rtc def startup(): + led_on() print('Freematics Micropython Edition') print('https://git.evulid.cc/cyberes/freematics-firmware_v5-micropython') # Device info print('==========') validate_config() - logger(f'Server: {TRACCAR_HOST}', source='TRACCAR') logger(f'Device ID: {DEVICE_ID}', source='TRACCAR') + logger(f'Server: {TRACCAR_HOST}', source='TRACCAR') print('==========') - # WIFI + # Start the GPS and let it get itself sorted out while we do other things. + gps_power_pin.value(1) + + # Activate wifi but don't connect yet. wifi.activate() + if wifi.isconnected(): + wifi.disconnect() + logger('Disconnected from existing network', source='WIFI') wifi.connect() - # NTP/time - logger('Syncing time', source='NET') - _initialize_rtc() - logger(f'Current time: {timestamp()}') + # GPS + logger('Getting initial fix', source='GPS') + position = get_position() - # Startup ping - logger('Sending startup ping', source='NET') - send_to_traccar(TraccarPingRequest()) + # Time. + initialize_rtc(position) + logger(f'Current time: {unix_timestamp()}') + # We are fully initalized so we can turn off the LED. + led_off() -def _initialize_rtc(): - _rtc = RTC() - Ntp.set_datetime_callback(_rtc.datetime) - Ntp.set_hosts(('0.pool.ntp.org', '1.pool.ntp.org', '2.pool.ntp.org')) - Ntp.rtc_sync() + logger('Startup complete!') + print('====================') + + # Send the first message + send_to_traccar(assemble_position_message()) def validate_config(): diff --git a/test.py b/test.py new file mode 100644 index 0000000..28c2d0c --- /dev/null +++ b/test.py @@ -0,0 +1,11 @@ +import time + + +def gps_to_unix(gps_time, gps_date): + hour, minute, second = gps_time + day, month, year = gps_date + gps_datetime = time.mktime((year - 12, month + 6, day - 12, hour, minute, int(second), 0, 0)) + return gps_datetime + + +print(gps_to_unix([8, 43, 15.4], (29, 6, 20)))