This commit is contained in:
Cyberes 2024-06-28 22:38:32 -06:00
parent d3cc20a7a5
commit 4aa4654b71
21 changed files with 1585 additions and 3 deletions

6
.gitignore vendored
View File

@ -1,3 +1,7 @@
ESP32_GENERIC-20240602-v1.23.0.bin
config.py
.idea
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
@ -15,7 +19,6 @@ dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
@ -159,4 +162,3 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

8
Doc/Install.md Normal file
View File

@ -0,0 +1,8 @@
https://micropython.org/resources/firmware/ESP32_GENERIC-20240602-v1.23.0.bin
Install firmware:
```shell
esptool.py --port /dev/ttyUSB0 erase_flash
esptool.py --chip esp32 --port /dev/ttyUSB0 write_flash -z 0x1000 ESP32_GENERIC-20240602-v1.23.0.bin
```

2
Doc/Libraries.md Normal file
View File

@ -0,0 +1,2 @@
https://github.com/jposada202020/MicroPython_ICM20948
https://github.com/ekondayan/micropython-ntp

View File

@ -1,3 +1,3 @@
# freematics-firmware_v5-micropython
Freematics firmware v5 ported to Micropython
_Freematics firmware v5 ported to Micropython._

3
requirements.txt Normal file
View File

@ -0,0 +1,3 @@
esptool==4.7.0
adafruit-ampy==1.1.0
micropython-esp32-stubs==1.23.0

15
src/config.sample.py Normal file
View File

@ -0,0 +1,15 @@
DEVICE_ID = 'test-device'
# Traccar Server
TRACCAR_CONN_MODE = 'http'
TRACCAR_HOST = 'https://traccar.example.com'
# WiFi
WIFI_SSID = 'homenetwork'
WIFI_PASSWORD = 'password123'
WIFI_CONNECT_TIMEOUT = 10
# Intervals
# All in seconds.
INTERVAL_WIFI_SCAN = 10
INTERVAL_NTP_SYNC = 86400

0
src/lib/__init__.py Normal file
View File

4
src/lib/helpers.py Normal file
View File

@ -0,0 +1,4 @@
def int_or_none(value):
if value is None:
return None
return int(value)

View File

@ -0,0 +1,22 @@
import time
from config import *
class IntervalTracker:
def __init__(self, **kwargs):
self.intervals = kwargs
self.last_trigger_times = {name: time.time() for name in self.intervals}
def check(self, name):
if time.time() - self.last_trigger_times[name] >= self.intervals[name]:
self.last_trigger_times[name] = time.time()
return True
else:
return False
interval_tracker = IntervalTracker(
wifi_scan=INTERVAL_WIFI_SCAN,
ntp_sync=INTERVAL_NTP_SYNC
)

12
src/lib/logging.py Normal file
View File

@ -0,0 +1,12 @@
class LogLevel:
debug = 'DEBUG'
info = 'INFO'
warning = 'WARN'
error = 'ERR'
def logger(msg: str, level: LogLevel = LogLevel.info, source: str = None):
s = ''
if source:
s = f'[{source}] - '
print(f'{s}{level} -- {msg}')

View File

View File

@ -0,0 +1,59 @@
import time
import network
from config import WIFI_SSID, WIFI_PASSWORD, WIFI_CONNECT_TIMEOUT
from lib.logging import logger
class WifiMananger:
_wlan = None
def activate(self):
assert self._wlan is None
self._wlan = network.WLAN(network.STA_IF)
self._wlan.active(True)
def connect(self):
assert not self._wlan.isconnected()
logger(f'Scanning', source='WIFI')
found = False
for item in self.scan():
if item[0].decode() == WIFI_SSID:
found = True
if not found:
logger(f'SSID not found: "{WIFI_SSID}"', source='WIFI')
return False
logger(f'Connecting to "{WIFI_SSID}"', source='WIFI')
self._wlan.connect(WIFI_SSID, WIFI_PASSWORD)
for _ in range(WIFI_CONNECT_TIMEOUT):
time.sleep(1)
if wifi.address() != '0.0.0.0':
break
time.sleep(1)
if wifi.address() == '0.0.0.0':
logger(f'Failed to connect to "{WIFI_SSID}"', source='WIFI')
return False
logger(f'Connected to "{wifi.config("ssid")}" with IP {wifi.address()}', source='WIFI')
return True
def isconnected(self):
return self._wlan.isconnected()
def address(self):
ip, netmask, gw, dns = self._wlan.ifconfig()
return ip
def ifconfig(self):
return self._wlan.ifconfig()
def config(self, value: str):
return self._wlan.config(value)
def scan(self):
return self._wlan.scan()
wifi = WifiMananger()

1247
src/lib/ntp.py Normal file

File diff suppressed because it is too large Load Diff

View File

105
src/lib/traccar/request.py Normal file
View File

@ -0,0 +1,105 @@
from config import DEVICE_ID, TRACCAR_HOST
from lib.helpers import int_or_none
from lib.ttime import timestamp
class CellTowerInfo:
def __init__(self, mcc: int, mnc: int, lac: int, cell_id: int, signal_strength: int):
if not isinstance(mcc, int):
raise ValueError("mcc must be an integer")
if not isinstance(mnc, int):
raise ValueError("mnc must be an integer")
if not isinstance(lac, int):
raise ValueError("lac must be an integer")
if not isinstance(cell_id, int):
raise ValueError("cell_id must be an integer")
if not isinstance(signal_strength, int):
raise ValueError("signal_strength must be an integer")
self.mcc = mcc
self.mnc = mnc
self.lac = lac
self.cell_id = cell_id
self.signal_strength = signal_strength
class WifiInfo:
def __init__(self, mac_addr: str, signal_strength: int):
if not isinstance(mac_addr, str):
raise ValueError("mac_addr must be a string")
if not isinstance(signal_strength, int):
raise ValueError("signal_strength must be an integer")
self.mac_addr = mac_addr
self.signal_strength = signal_strength
class TraccarGetRequest:
# https://www.traccar.org/osmand/
def __init__(self, timestamp: int, lat: float, lon: float, loc_valid: bool = True,
cell: CellTowerInfo = None, wifi: WifiInfo = None, speed: int = None, heading: int = None, altitude: int = None,
accuracy: int = None, hdop: int = None, custom: dict = None):
if not isinstance(timestamp, (int, float)):
raise ValueError("timestamp must be an integer")
if not isinstance(lat, (float, int)):
raise ValueError("lat must be a float")
if not isinstance(lon, (float, int)):
raise ValueError("lon must be a float")
if not isinstance(loc_valid, bool):
raise ValueError("loc_valid must be a boolean")
if cell is not None and not isinstance(cell, CellTowerInfo):
raise ValueError("cell must be an instance of CellTowerInfo")
if wifi is not None and not isinstance(wifi, WifiInfo):
raise ValueError("wifi must be an instance of WifiInfo")
if speed is not None and not isinstance(speed, (int, float)):
raise ValueError("speed must be an integer")
if heading is not None and not isinstance(heading, (int, float)):
raise ValueError("heading must be an integer")
if altitude is not None and not isinstance(altitude, (int, float)):
raise ValueError("altitude must be an integer")
if accuracy is not None and not isinstance(accuracy, (int, float)):
raise ValueError("accuracy must be an integer")
if hdop is not None and not isinstance(hdop, (int, float)):
raise ValueError("hdop must be an integer")
if custom is not None and not isinstance(custom, dict):
raise ValueError("custom must be a dict")
self.deviceid = DEVICE_ID
self.timestamp = int(timestamp)
self.lat = float(lat)
self.lon = float(lon)
self.loc_valid = loc_valid
self.cell = cell
self.wifi = wifi
self.speed = int_or_none(speed)
self.heading = int_or_none(heading)
self.altitude = int_or_none(altitude)
self.accuracy = int_or_none(accuracy)
self.hdop = int_or_none(hdop)
self.custom = custom
self.query = self.build_query()
self.request_url = TRACCAR_HOST.strip('/') + '/?' + '&'.join(['='.join((str(x), str(y))) for x, y in self.query])
def build_query(self):
def append_item(key, value):
nonlocal parameters
if value is not None:
if isinstance(value, bool):
value = str(value).lower()
parameters.append((key, value))
parameters = []
stuff = self.__dict__.copy()
stuff.pop('custom')
for k, v in stuff.items():
append_item(k, v)
if self.custom is not None:
for k, v in self.custom.items():
append_item(k, v)
return parameters
class TraccarPingRequest(TraccarGetRequest):
def __init__(self):
super().__init__(timestamp=timestamp(), lat=0, lon=0, loc_valid=False, custom={'ping': True})

View File

@ -0,0 +1,16 @@
import gc
import urequests as requests
from lib.logging import LogLevel, logger
from lib.traccar.request import TraccarGetRequest
def send_to_traccar(event: TraccarGetRequest):
params = ' '.join(['='.join((x, str(y))) for x, y in event.query])
r = requests.post(event.request_url)
gc.collect()
if r.status_code != 200:
logger(f'{params} - Failed to send request to traccar: {r.text} - Status code: {r.status_code}', level=LogLevel.error, source='NET')
else:
logger(params, source='NET')

5
src/lib/ttime.py Normal file
View File

@ -0,0 +1,5 @@
from lib.ntp import Ntp
def timestamp():
return Ntp.time_s(epoch=Ntp.EPOCH_1970, utc=True)

19
src/main.py Normal file
View File

@ -0,0 +1,19 @@
import time
from lib.interval_tracker import interval_tracker
from lib.logging import logger
from lib.networking.wifi import wifi
from lib.ntp import Ntp
from startup import startup
startup()
while True:
if interval_tracker.check('wifi_scan'):
if not wifi.isconnected():
wifi.connect()
if interval_tracker.check('ntp_sync'):
logger('Syncing time', source='NET')
Ntp.rtc_sync()
time.sleep(1)

2
src/reset.py Normal file
View File

@ -0,0 +1,2 @@
import machine
machine.reset()

45
src/startup.py Normal file
View File

@ -0,0 +1,45 @@
from machine import RTC
from config import *
from lib.logging import logger
from lib.networking.wifi import wifi
from lib.ntp import Ntp
from lib.traccar.request import TraccarPingRequest
from lib.traccar.traccar import send_to_traccar
from lib.ttime import timestamp
def startup():
print('Freematics Micropython Edition')
print('https://git.evulid.cc/cyberes/freematics-firmware_v5-micropython')
# Device info
print('==========')
validate_config()
logger(f'Server: {TRACCAR_HOST}', source='TRACCAR')
logger(f'Device ID: {DEVICE_ID}', source='TRACCAR')
print('==========')
# WIFI
wifi.activate()
wifi.connect()
# NTP/time
logger('Syncing time', source='NET')
_initialize_rtc()
logger(f'Current time: {timestamp()}')
# Startup ping
logger('Sending startup ping', source='NET')
send_to_traccar(TraccarPingRequest())
def _initialize_rtc():
_rtc = RTC()
Ntp.set_datetime_callback(_rtc.datetime)
Ntp.set_hosts(('0.pool.ntp.org', '1.pool.ntp.org', '2.pool.ntp.org'))
Ntp.rtc_sync()
def validate_config():
assert TRACCAR_CONN_MODE in ['udp', 'udp_chacha', 'http']

16
upload.sh Executable file
View File

@ -0,0 +1,16 @@
#!/bin/bash
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
echo "Erasing files..."
# TODO: check that /lib exists first
# TODO: read /src and delete files that exist
"$SCRIPT_DIR"/venv/bin/ampy --port /dev/ttyUSB0 rmdir /lib
echo "Uploading..."
"$SCRIPT_DIR"/venv/bin/ampy --port /dev/ttyUSB0 put "$SCRIPT_DIR/src/" / || exit
echo "Resetting..."
"$SCRIPT_DIR"/venv/bin/ampy --port /dev/ttyUSB0 run --no-output "$SCRIPT_DIR/src/reset.py" || exit
#echo "Listing files..."
#"$SCRIPT_DIR"/venv/bin/ampy --port /dev/ttyUSB0 ls