Skip to content

Commit

Permalink
update examples to use kwargs
Browse files Browse the repository at this point in the history
  • Loading branch information
semuadmin committed May 10, 2024
1 parent ad41fca commit 68b8ee5
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 88 deletions.
38 changes: 25 additions & 13 deletions examples/gpxtracker.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""
Simple CLI utility which creates a GPX track file
from a binary NMEA dump.
from a binary NMEA dump. Dump must contain NMEA GGA messages.
Dump must contain NMEA GGA messages.
Usage:
python3 gpxtracker.py infile="pygpsdata.log" outdir="."
There are a number of free online GPX viewers
e.g. https://gpx-viewer.com/view
Expand All @@ -14,11 +16,15 @@
@author: semuadmin
"""

# pylint: disable=consider-using-with

import os
from datetime import datetime
from sys import argv
from time import strftime
from pynmeagps.nmeareader import NMEAReader

import pynmeagps.exceptions as nme
from pynmeagps.nmeareader import NMEAReader

XML_HDR = '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'

Expand Down Expand Up @@ -79,7 +85,7 @@ def reader(self, validate=False):

self.write_gpx_hdr()

for (_, msg) in self._nmeareader: # invokes iterator method
for _, msg in self._nmeareader: # invokes iterator method
try:
if msg.msgID == "GGA":
dat = datetime.now()
Expand Down Expand Up @@ -126,7 +132,7 @@ def write_gpx_hdr(self):

timestamp = strftime("%Y%m%d%H%M%S")
self._trkfname = os.path.join(self._outdir, f"gpxtrack-{timestamp}.gpx")
self._trkfile = open(self._trkfname, "a")
self._trkfile = open(self._trkfname, "a", encoding="utf-8")

date = datetime.now().isoformat() + "Z"
gpxtrack = (
Expand Down Expand Up @@ -187,16 +193,22 @@ def write_gpx_tlr(self):
self._trkfile.close()


if __name__ == "__main__":
def main(**kwargs):
"""
Main routine.
"""

print("NMEA datalog to GPX file converter\n")
infilep = input("Enter input NMEA datalog file: ").strip('"')
outdirp = input("Enter output directory: ").strip('"')
# infilep = "C:\\Users\\username\\Downloads\\pygpsdata-test.log"
# outdirp = "C:\\Users\\username\\Downloads"
tkr = NMEATracker(infilep, outdirp)
print(f"\nProcessing file {infilep}...")
infile = kwargs.get("infile", "pygpsdata-test.log")
outdir = kwargs.get("outdir", ".")
print("NMEA datalog to GPX file converter")
tkr = NMEATracker(infile, outdir)
print(f"\nProcessing file {infile}...")
tkr.open()
tkr.reader()
tkr.close()
print("\nOperation Complete")


if __name__ == "__main__":

main(**dict(arg.split("=") for arg in argv[1:]))
40 changes: 22 additions & 18 deletions examples/nmeafile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@
NMEAMessage logfile reader using the
NMEAReader iterator functions and an external error handler.
Usage:
python3 nmeafile.py filename="nmeadata.log"
Created on 7 Mar 2021
@author: semuadmin
"""

from sys import argv

from pynmeagps.nmeareader import NMEAReader


Expand All @@ -20,29 +26,27 @@ def errhandler(err):
print(f"\nERROR: {err}\n")


def read(stream, errorhandler):
def main(**kwargs):
"""
Reads and parses UBX message data from stream.
Main routine.
"""
# pylint: disable=unused-variable

msgcount = 0

nmr = NMEAReader(
stream, nmeaonly=False, quitonerror=False, errorhandler=errorhandler
)
for raw, parsed_data in nmr:
print(parsed_data)
msgcount += 1
filename = kwargs.get("filename", "nmeadata.log")

print(f"\n{msgcount} messages read.\n")
count = 0
print(f"\nOpening file {filename}...\n")
with open(filename, "rb") as stream:
nmr = NMEAReader(
stream, nmeaonly=False, quitonerror=False, errorhandler=errhandler
)
for raw, parsed_data in nmr:
print(parsed_data)
count += 1

print(f"\n{count} messages read.\n")
print("\nProcessing Complete")


if __name__ == "__main__":
print("\nEnter fully qualified name of file containing raw NMEA data: ", end="")
filename = input().strip('"')

print(f"\nOpening file {filename}...\n")
with open(filename, "rb") as fstream:
read(fstream, errhandler)
print("\nProcessing Complete")
main(**dict(arg.split("=") for arg in argv[1:]))
33 changes: 19 additions & 14 deletions examples/nmeapoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
"concurrently" using threads and queues. This represents a useful
generic pattern for many end user applications.
Usage:
python3 nmeapoller.py port=/dev/ttyACM0 baudrate=38400 timeout=3
It implements two threads which run concurrently:
1) an I/O thread which continuously reads NMEA data from the
receiver and sends any queued outbound command or poll messages.
Expand All @@ -24,11 +28,10 @@
:copyright: SEMU Consulting © 2021
:license: BSD 3-Clause
"""
# pylint: disable=invalid-name

from queue import Queue
from sys import platform
from threading import Event, Lock, Thread
from sys import argv
from threading import Event, Thread
from time import sleep

from serial import Serial
Expand Down Expand Up @@ -83,21 +86,18 @@ def process_data(queue: Queue, stop: Event):
queue.task_done()


if __name__ == "__main__":
# set port, baudrate and timeout to suit your device configuration
if platform == "win32": # Windows
port = "COM13"
elif platform == "darwin": # MacOS
port = "/dev/tty.usbmodem1101"
else: # Linux
port = "/dev/ttyACM1"
baudrate = 38400
timeout = 0.1
def main(**kwargs):
"""
Main routine.
"""

port = kwargs.get("serport", "/dev/ttyACM0")
baudrate = int(kwargs.get("baudrate", 38400))
timeout = float(kwargs.get("timeout", 3))

with Serial(port, baudrate, timeout=timeout) as serial_stream:
nmeareader = NMEAReader(serial_stream)

serial_lock = Lock()
read_queue = Queue()
send_queue = Queue()
stop_event = Event()
Expand Down Expand Up @@ -149,3 +149,8 @@ def process_data(queue: Queue, stop: Event):
io_thread.join()
process_thread.join()
print("\nProcessing complete")


if __name__ == "__main__":

main(**dict(arg.split("=") for arg in argv[1:]))
39 changes: 22 additions & 17 deletions examples/nmeasocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,51 @@
A simple example implementation of a GNSS socket reader
using the pynmeagps.NMEAReader iterator functions.
Usage:
python3 nmeasocket.py ipaddress=127.0.0.1 ipport=50012
Created on 05 May 2022
@author: semuadmin
"""

import socket
from sys import argv
from datetime import datetime

from pynmeagps.nmeareader import NMEAReader


def read(stream: socket.socket):
def main(**kwargs):
"""
Reads and parses NMEA message from socket stream.
"""

msgcount = 0
count = 0
start = datetime.now()

nmr = NMEAReader(
stream,
)
ipaddress = kwargs.get("ipaddress", "localhost")
ipport = int(kwargs.get("ipport", 50012))

try:
for (_, parsed_data) in nmr:
print(parsed_data)
msgcount += 1
print(f"Opening socket {ipaddress}:{ipport}...")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as stream:
stream.connect((ipaddress, ipport))
print("Starting NMEA reader...")
nmr = NMEAReader(stream)
for _, parsed_data in nmr:
print(parsed_data)
count += 1
except KeyboardInterrupt:
dur = datetime.now() - start
secs = dur.seconds + dur.microseconds / 1e6
print("Session terminated by user")
print(
f"{msgcount:,d} messages read in {secs:.2f} seconds:",
f"{msgcount/secs:.2f} msgs per second",
f"{count:,d} messages read in {secs:.2f} seconds:",
f"{count/secs:.2f} msgs per second",
)


if __name__ == "__main__":

SERVER = "localhost"
PORT = 50007

print(f"Opening socket {SERVER}:{PORT}...")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect((SERVER, PORT))
read(sock)
main(**dict(arg.split("=") for arg in argv[1:]))
54 changes: 28 additions & 26 deletions examples/webserver/nmeaserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
This illustrates a simple HTTP wrapper around the
pynmneagps NMEAStreamer streaming and parsing example.
Usage:
python3 nmeaserver.py ipaddress=localhost ipport=8080 serport=/dev/ttyACM1 baudrate=38400 timeout=0.1
It displays selected GPS data on a dynamically updated web page
using the native Python 3 http.server library and a RESTful API
implemented by the pynmeagps streaming and parsing service.
Expand All @@ -20,7 +24,7 @@
:license: (c) SEMU Consulting 2021 - BSD 3-Clause License
"""

from sys import platform
from sys import argv
from io import BufferedReader
from threading import Thread, Event
from time import sleep
Expand All @@ -31,9 +35,9 @@
import pynmeagps.exceptions as nme


class NMEAStreamer:
class GNSSStreamer:
"""
NMEAStreamer class.
GNSSStreamer class.
"""

def __init__(self, port, baudrate, timeout, nmea_only=0, validate=1):
Expand Down Expand Up @@ -156,7 +160,7 @@ def _read_thread(self, stopevent):

def set_data(self, parsed_data):
"""
Set GPS data dictionary from RMC, GGA and GSA sentences.
Set GPS data dictionary from NMEA RMC, GGA and GSA sentences.
"""

# print(parsed_data)
Expand All @@ -167,14 +171,14 @@ def set_data(self, parsed_data):
self.gpsdata["longitude"] = parsed_data.lon
self.gpsdata["speed"] = parsed_data.spd
self.gpsdata["track"] = parsed_data.cog
if parsed_data.msgID == "GGA":
elif parsed_data.msgID == "GGA":
self.gpsdata["time"] = str(parsed_data.time)
self.gpsdata["latitude"] = parsed_data.lat
self.gpsdata["longitude"] = parsed_data.lon
self.gpsdata["elevation"] = parsed_data.alt
self.gpsdata["siv"] = parsed_data.numSV
self.gpsdata["hdop"] = parsed_data.HDOP
if parsed_data.msgID == "GSA":
elif parsed_data.msgID == "GSA":
self.gpsdata["fix"] = parsed_data.navMode
self.gpsdata["pdop"] = parsed_data.PDOP
self.gpsdata["hdop"] = parsed_data.HDOP
Expand All @@ -191,31 +195,24 @@ def get_data(self):
return json.dumps(self.gpsdata)


if __name__ == "__main__":
def main(**kwargs):
"""
Main routine.
"""

ADDRESS = "localhost"
TCPPORT = 8080
# set port, baudrate and timeout to suit your device configuration
if platform == "win32": # Windows
port = "COM13"
elif platform == "darwin": # MacOS
port = "/dev/tty.usbmodem14101"
else: # Linux
port = "/dev/ttyACM1"
baudrate = 9600
timeout = 0.1

gps = NMEAStreamer(port, baudrate, timeout)
httpd = GPSHTTPServer((ADDRESS, TCPPORT), GPSHTTPHandler, gps)
ipaddress = kwargs.get("ipaddress", "localhost")
ipport = int(kwargs.get("ipport", 8080))
serport = kwargs.get("serport", "/dev/ttyACM0")
baudrate = int(kwargs.get("baudrate", 38400))
timeout = float(kwargs.get("timeout", 0.1))

gps = GNSSStreamer(serport, baudrate, timeout)
httpd = GPSHTTPServer((ipaddress, ipport), GPSHTTPHandler, gps)

if gps.connect():
gps.start_read_thread()
print(
"\nStarting HTTP Server on http://"
+ ADDRESS
+ ":"
+ str(TCPPORT)
+ " ...\nPress Ctrl-C to terminate.\n"
f"\nStarting HTTP Server on http://{ipaddress}:{ipport}...\nPress Ctrl-C to terminate.\n"
)
httpd_thread = Thread(target=httpd.serve_forever, daemon=True)
httpd_thread.start()
Expand All @@ -231,3 +228,8 @@ def get_data(self):
sleep(2) # wait for shutdown
gps.disconnect()
print("\nProcessing Complete")


if __name__ == "__main__":

main(**dict(arg.split("=") for arg in argv[1:]))

0 comments on commit 68b8ee5

Please sign in to comment.