GPS NMEA Data Processing
Complete guide to processing GPS NMEA data with PySerial - parse coordinates, track location, and build GPS applications in Python
Build powerful GPS applications with PySerial by parsing NMEA sentences, tracking location, and processing satellite data in real-time.
NMEA Protocol Basics
NMEA 0183 is the standard protocol for GPS communication, using ASCII sentences that are easy to parse and understand.
import serial
import re
from datetime import datetime
from typing import Dict, Optional, Tuple
class NMEAParser:
def __init__(self):
self.sentence_parsers = {
'GPGGA': self._parse_gga,
'GPRMC': self._parse_rmc,
'GPGSA': self._parse_gsa,
'GPGSV': self._parse_gsv,
'GPVTG': self._parse_vtg
}
def parse_sentence(self, sentence: str) -> Optional[Dict]:
"""Parse NMEA sentence"""
sentence = sentence.strip()
# Validate checksum
if not self._validate_checksum(sentence):
return None
# Extract sentence type and data
if sentence.startswith('$'):
parts = sentence[1:].split('*')[0].split(',')
sentence_type = parts[0]
if sentence_type in self.sentence_parsers:
try:
return self.sentence_parsers[sentence_type](parts)
except Exception as e:
print(f"Error parsing {sentence_type}: {e}")
return None
return None
def _validate_checksum(self, sentence: str) -> bool:
"""Validate NMEA checksum"""
if '*' not in sentence:
return False
try:
message, checksum = sentence.split('*')
message = message[1:] # Remove $
calculated_checksum = 0
for char in message:
calculated_checksum ^= ord(char)
return f"{calculated_checksum:02X}" == checksum.upper()
except:
return False
def _safe_float(self, value: str) -> Optional[float]:
"""Safely convert string to float"""
try:
return float(value) if value else None
except ValueError:
return None
def _safe_int(self, value: str) -> Optional[int]:
"""Safely convert string to int"""
try:
return int(value) if value else None
except ValueError:
return None
# Example NMEA sentences:
sample_sentences = [
"$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47",
"$GPRMC,123519,A,4807.038,N,01131.000,E,022.4,084.4,230394,003.1,W*6A",
"$GPGSA,A,3,04,05,,09,12,,,24,,,,,2.5,1.3,2.1*39",
"$GPGSV,2,1,07,07,79,048,42,02,51,062,43,26,36,256,42,27,27,138,42*71"
]
parser = NMEAParser()
print("NMEA Sentence Structure:")
print("$TALKER_SENTENCE_TYPE,field1,field2,...,fieldN*CHECKSUM")
print("\nExamples:")
for sentence in sample_sentences:
print(f"\n📡 {sentence}")
# Show structure
parts = sentence.split(',')
print(f" Talker + Type: {parts[0]}")
print(f" Fields: {len(parts)-1}")
# Validate checksum
is_valid = parser._validate_checksum(sentence)
print(f" Checksum: {'✅ Valid' if is_valid else '❌ Invalid'}")
class NMEASentenceTypes:
"""Documentation of common NMEA sentence types"""
@staticmethod
def describe_sentences():
sentences = {
'GGA': {
'name': 'Global Positioning System Fix Data',
'description': 'Position fix, quality, satellites, altitude',
'fields': [
'UTC Time (HHMMSS)',
'Latitude (DDMM.MMMM)',
'N/S Indicator',
'Longitude (DDDMM.MMMM)',
'E/W Indicator',
'Fix Quality (0=invalid, 1=GPS, 2=DGPS)',
'Number of satellites',
'Horizontal dilution',
'Altitude above sea level',
'Altitude units (M=meters)',
'Geoid height',
'Geoid units (M=meters)',
'DGPS time since last update',
'DGPS station ID'
]
},
'RMC': {
'name': 'Recommended Minimum',
'description': 'Position, velocity, time, date, variation',
'fields': [
'UTC Time (HHMMSS)',
'Status (A=valid, V=invalid)',
'Latitude (DDMM.MMMM)',
'N/S Indicator',
'Longitude (DDDMM.MMMM)',
'E/W Indicator',
'Speed over ground (knots)',
'Course over ground (degrees)',
'UTC Date (DDMMYY)',
'Magnetic variation (degrees)',
'Variation direction (E/W)',
'Mode indicator (A=autonomous, D=DGPS)'
]
},
'GSA': {
'name': 'GPS DOP and Active Satellites',
'description': 'Dilution of precision and active satellites',
'fields': [
'Mode (M=manual, A=automatic)',
'Fix type (1=none, 2=2D, 3=3D)',
'PRN numbers of satellites used (12 fields)',
'PDOP (Position dilution of precision)',
'HDOP (Horizontal dilution)',
'VDOP (Vertical dilution)'
]
},
'GSV': {
'name': 'GPS Satellites in View',
'description': 'Information about satellites in view',
'fields': [
'Number of sentences for full data',
'Sentence number (1-4)',
'Total satellites in view',
'Satellite 1 PRN number',
'Satellite 1 elevation (degrees)',
'Satellite 1 azimuth (degrees)',
'Satellite 1 SNR (dB-Hz)',
'... (repeat for up to 4 satellites per sentence)'
]
},
'VTG': {
'name': 'Track Made Good and Ground Speed',
'description': 'Course and speed information',
'fields': [
'Course over ground (degrees true)',
'Reference (T=true)',
'Course over ground (degrees magnetic)',
'Reference (M=magnetic)',
'Speed over ground (knots)',
'Units (N=knots)',
'Speed over ground (km/h)',
'Units (K=km/h)',
'Mode indicator'
]
}
}
print("Common NMEA 0183 Sentences:")
print("=" * 50)
for code, info in sentences.items():
print(f"\n{code} - {info['name']}")
print(f"Purpose: {info['description']}")
print("Fields:")
for i, field in enumerate(info['fields'], 1):
print(f" {i:2d}. {field}")
return sentences
# Show sentence documentation
NMEASentenceTypes.describe_sentences()
def parse_coordinate(coord_str: str, direction: str) -> Optional[float]:
"""Parse NMEA coordinate format (DDMM.MMMM) to decimal degrees"""
if not coord_str or not direction:
return None
try:
# Handle latitude (DDMM.MMMM) and longitude (DDDMM.MMMM)
if len(coord_str) >= 7: # Minimum length for coordinate
if '.' in coord_str:
# Find decimal point
dot_pos = coord_str.index('.')
if dot_pos >= 4: # Longitude format DDDMM.MMMM
degrees = int(coord_str[:dot_pos-2])
minutes = float(coord_str[dot_pos-2:])
else: # Latitude format DDMM.MMMM
degrees = int(coord_str[:dot_pos-2])
minutes = float(coord_str[dot_pos-2:])
decimal_degrees = degrees + minutes / 60.0
# Apply direction
if direction in ['S', 'W']:
decimal_degrees = -decimal_degrees
return decimal_degrees
except (ValueError, IndexError):
pass
return None
def parse_time(time_str: str) -> Optional[datetime]:
"""Parse NMEA time format (HHMMSS or HHMMSS.SSS)"""
if not time_str or len(time_str) < 6:
return None
try:
# Handle fractional seconds
if '.' in time_str:
time_part = time_str.split('.')[0]
microseconds = int(float('0.' + time_str.split('.')[1]) * 1000000)
else:
time_part = time_str
microseconds = 0
if len(time_part) >= 6:
hours = int(time_part[:2])
minutes = int(time_part[2:4])
seconds = int(time_part[4:6])
return datetime.now().replace(
hour=hours, minute=minutes, second=seconds, microsecond=microseconds
)
except (ValueError, IndexError):
pass
return None
def parse_date(date_str: str) -> Optional[datetime]:
"""Parse NMEA date format (DDMMYY)"""
if not date_str or len(date_str) != 6:
return None
try:
day = int(date_str[:2])
month = int(date_str[2:4])
year = int(date_str[4:6])
# Convert 2-digit year to 4-digit (assuming 20xx)
if year < 80: # Arbitrary cutoff
year += 2000
else:
year += 1900
return datetime(year, month, day)
except (ValueError, IndexError):
pass
return None
# Test coordinate parsing
test_coords = [
("4807.038", "N", "48.117300°N"),
("01131.000", "E", "11.516667°E"),
("3723.2475", "S", "37.387458°S"),
("12158.3416", "W", "121.972360°W")
]
print("Coordinate Parsing Examples:")
print("-" * 40)
for coord, direction, expected in test_coords:
result = parse_coordinate(coord, direction)
print(f"{coord} {direction} → {result:.6f}° (Expected: {expected})")
# Test time parsing
test_times = ["123519", "235959.999", "000000"]
print("\nTime Parsing Examples:")
print("-" * 40)
for time_str in test_times:
result = parse_time(time_str)
print(f"{time_str} → {result}")
# Test date parsing
test_dates = ["230394", "010100", "311299"]
print("\nDate Parsing Examples:")
print("-" * 40)
for date_str in test_dates:
result = parse_date(date_str)
print(f"{date_str} → {result}")
Complete GPS Receiver Class
Build a comprehensive GPS class that handles all common NMEA sentences and provides easy access to position data.
import serial
import threading
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable
import math
class GPSReceiver:
def __init__(self, port: str, baudrate: int = 9600):
self.port = port
self.baudrate = baudrate
self.serial = None
self.running = False
# GPS data storage
self.current_position = {
'latitude': None,
'longitude': None,
'altitude': None,
'speed': None,
'course': None,
'timestamp': None,
'fix_quality': 0,
'satellites_used': 0,
'satellites_in_view': 0,
'hdop': None,
'pdop': None,
'vdop': None
}
self.satellites = {} # Satellite information
self.position_history = [] # Position tracking
self.callbacks = {'position': [], 'satellite': [], 'raw': []}
# NMEA parser
self.parser = NMEAParser()
self._setup_parsers()
def _setup_parsers(self):
"""Setup NMEA sentence parsers"""
self.parser.sentence_parsers.update({
'GPGGA': self._parse_gga,
'GPRMC': self._parse_rmc,
'GPGSA': self._parse_gsa,
'GPGSV': self._parse_gsv,
'GPVTG': self._parse_vtg
})
def connect(self) -> bool:
"""Connect to GPS receiver"""
try:
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=1
)
print(f"✅ Connected to GPS receiver on {self.port}")
return True
except Exception as e:
print(f"❌ Failed to connect to GPS: {e}")
return False
def start_reading(self):
"""Start reading GPS data in background thread"""
if not self.serial:
return False
self.running = True
self.reader_thread = threading.Thread(target=self._reader_worker)
self.reader_thread.daemon = True
self.reader_thread.start()
return True
def _reader_worker(self):
"""Background thread for reading GPS data"""
buffer = b''
while self.running:
try:
if self.serial.in_waiting:
data = self.serial.read(self.serial.in_waiting)
buffer += data
# Process complete sentences
while b'\r\n' in buffer:
line, buffer = buffer.split(b'\r\n', 1)
sentence = line.decode('ascii', errors='ignore')
if sentence:
# Trigger raw data callbacks
self._trigger_callbacks('raw', sentence)
# Parse sentence
parsed_data = self.parser.parse_sentence(sentence)
if parsed_data:
self._process_parsed_data(parsed_data)
time.sleep(0.01)
except Exception as e:
print(f"GPS reader error: {e}")
time.sleep(1)
def _process_parsed_data(self, data: Dict):
"""Process parsed NMEA data"""
data_type = data.get('type')
if data_type == 'position':
# Update position data
for key, value in data.items():
if key in self.current_position:
self.current_position[key] = value
self.current_position['timestamp'] = datetime.utcnow()
# Add to history
self.position_history.append({
**self.current_position,
'timestamp': self.current_position['timestamp']
})
# Keep history limited
if len(self.position_history) > 1000:
self.position_history = self.position_history[-1000:]
# Trigger position callbacks
self._trigger_callbacks('position', self.current_position)
elif data_type == 'satellites':
# Update satellite data
self.satellites.update(data.get('satellites', {}))
self._trigger_callbacks('satellite', self.satellites)
def _parse_gga(self, fields: List[str]) -> Dict:
"""Parse GGA sentence - Global Positioning System Fix Data"""
if len(fields) < 15:
return {}
latitude = parse_coordinate(fields[2], fields[3])
longitude = parse_coordinate(fields[4], fields[5])
return {
'type': 'position',
'latitude': latitude,
'longitude': longitude,
'fix_quality': self.parser._safe_int(fields[6]),
'satellites_used': self.parser._safe_int(fields[7]),
'hdop': self.parser._safe_float(fields[8]),
'altitude': self.parser._safe_float(fields[9]),
'timestamp': parse_time(fields[1])
}
def _parse_rmc(self, fields: List[str]) -> Dict:
"""Parse RMC sentence - Recommended Minimum"""
if len(fields) < 12:
return {}
if fields[2] != 'A': # Only process if data is valid
return {}
latitude = parse_coordinate(fields[3], fields[4])
longitude = parse_coordinate(fields[5], fields[6])
return {
'type': 'position',
'latitude': latitude,
'longitude': longitude,
'speed': self.parser._safe_float(fields[7]), # knots
'course': self.parser._safe_float(fields[8]), # degrees
'timestamp': parse_time(fields[1])
}
def _parse_gsa(self, fields: List[str]) -> Dict:
"""Parse GSA sentence - GPS DOP and Active Satellites"""
if len(fields) < 18:
return {}
return {
'type': 'position',
'fix_type': self.parser._safe_int(fields[2]),
'pdop': self.parser._safe_float(fields[15]),
'hdop': self.parser._safe_float(fields[16]),
'vdop': self.parser._safe_float(fields[17])
}
def _parse_gsv(self, fields: List[str]) -> Dict:
"""Parse GSV sentence - GPS Satellites in View"""
if len(fields) < 8:
return {}
satellites = {}
total_sats = self.parser._safe_int(fields[3])
# Process up to 4 satellites per GSV sentence
for i in range(4):
base_idx = 4 + i * 4
if base_idx + 3 < len(fields):
prn = fields[base_idx]
elevation = self.parser._safe_int(fields[base_idx + 1])
azimuth = self.parser._safe_int(fields[base_idx + 2])
snr = self.parser._safe_int(fields[base_idx + 3])
if prn:
satellites[prn] = {
'elevation': elevation,
'azimuth': azimuth,
'snr': snr
}
return {
'type': 'satellites',
'total_satellites': total_sats,
'satellites': satellites
}
def _parse_vtg(self, fields: List[str]) -> Dict:
"""Parse VTG sentence - Track Made Good and Ground Speed"""
if len(fields) < 9:
return {}
return {
'type': 'position',
'course': self.parser._safe_float(fields[1]), # degrees true
'speed': self.parser._safe_float(fields[7]) # km/h
}
def add_callback(self, callback_type: str, callback: Callable):
"""Add callback for GPS events"""
if callback_type in self.callbacks:
self.callbacks[callback_type].append(callback)
def _trigger_callbacks(self, callback_type: str, data):
"""Trigger callbacks for specific event type"""
for callback in self.callbacks.get(callback_type, []):
try:
callback(data)
except Exception as e:
print(f"Callback error: {e}")
def get_position(self) -> Dict:
"""Get current position"""
return self.current_position.copy()
def get_satellites(self) -> Dict:
"""Get satellite information"""
return self.satellites.copy()
def is_position_valid(self) -> bool:
"""Check if current position is valid"""
return (self.current_position['latitude'] is not None and
self.current_position['longitude'] is not None and
self.current_position['fix_quality'] > 0)
def calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Calculate distance between two GPS coordinates (Haversine formula)"""
R = 6371000 # Earth radius in meters
lat1_rad = math.radians(lat1)
lat2_rad = math.radians(lat2)
delta_lat = math.radians(lat2 - lat1)
delta_lon = math.radians(lon2 - lon1)
a = (math.sin(delta_lat/2) * math.sin(delta_lat/2) +
math.cos(lat1_rad) * math.cos(lat2_rad) *
math.sin(delta_lon/2) * math.sin(delta_lon/2))
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
distance = R * c
return distance
def get_travel_statistics(self) -> Dict:
"""Get travel statistics from position history"""
if len(self.position_history) < 2:
return {'error': 'Insufficient position data'}
stats = {
'total_distance': 0,
'max_speed': 0,
'average_speed': 0,
'duration': 0,
'positions_recorded': len(self.position_history)
}
speeds = []
for i in range(1, len(self.position_history)):
prev_pos = self.position_history[i-1]
curr_pos = self.position_history[i]
if (prev_pos['latitude'] and prev_pos['longitude'] and
curr_pos['latitude'] and curr_pos['longitude']):
# Calculate distance
distance = self.calculate_distance(
prev_pos['latitude'], prev_pos['longitude'],
curr_pos['latitude'], curr_pos['longitude']
)
stats['total_distance'] += distance
# Calculate speed if we have timestamps
if prev_pos['timestamp'] and curr_pos['timestamp']:
time_diff = (curr_pos['timestamp'] - prev_pos['timestamp']).total_seconds()
if time_diff > 0:
speed = distance / time_diff # m/s
speeds.append(speed)
stats['max_speed'] = max(stats['max_speed'], speed)
if speeds:
stats['average_speed'] = sum(speeds) / len(speeds)
# Calculate total duration
if (self.position_history[0]['timestamp'] and
self.position_history[-1]['timestamp']):
stats['duration'] = (self.position_history[-1]['timestamp'] -
self.position_history[0]['timestamp']).total_seconds()
return stats
def stop(self):
"""Stop GPS reading"""
self.running = False
if hasattr(self, 'reader_thread'):
self.reader_thread.join()
if self.serial:
self.serial.close()
# Usage example with callbacks
def position_callback(position_data):
"""Handle position updates"""
if position_data['latitude'] and position_data['longitude']:
print(f"📍 Position: {position_data['latitude']:.6f}, {position_data['longitude']:.6f}")
print(f" Altitude: {position_data['altitude']}m")
print(f" Satellites: {position_data['satellites_used']}")
print(f" Fix Quality: {position_data['fix_quality']}")
def satellite_callback(satellite_data):
"""Handle satellite updates"""
satellites_with_signal = {k: v for k, v in satellite_data.items() if v.get('snr')}
print(f"🛰️ Satellites in view: {len(satellite_data)}, with signal: {len(satellites_with_signal)}")
# Create GPS receiver
gps = GPSReceiver('/dev/ttyUSB0', 9600)
# Add callbacks
gps.add_callback('position', position_callback)
gps.add_callback('satellite', satellite_callback)
# Connect and start reading
if gps.connect():
gps.start_reading()
try:
# Wait for valid position
print("Waiting for GPS fix...")
while not gps.is_position_valid():
time.sleep(1)
print("GPS fix acquired!")
# Run for a while to collect data
time.sleep(60)
# Get travel statistics
stats = gps.get_travel_statistics()
print(f"\n📊 Travel Statistics:")
print(f" Total distance: {stats.get('total_distance', 0):.2f} meters")
print(f" Max speed: {stats.get('max_speed', 0) * 3.6:.2f} km/h")
print(f" Average speed: {stats.get('average_speed', 0) * 3.6:.2f} km/h")
print(f" Duration: {stats.get('duration', 0):.0f} seconds")
except KeyboardInterrupt:
print("Stopping GPS receiver...")
finally:
gps.stop()
Advanced GPS Applications
Real-time Tracking Dashboard
import json
import time
from datetime import datetime
from typing import Dict, List
class GPSTracker:
def __init__(self, gps_receiver: GPSReceiver):
self.gps = gps_receiver
self.track_log = []
self.waypoints = []
self.geofences = []
# Add GPS callbacks
self.gps.add_callback('position', self._log_position)
def _log_position(self, position_data: Dict):
"""Log position to track"""
if position_data['latitude'] and position_data['longitude']:
track_point = {
'timestamp': datetime.utcnow().isoformat(),
'latitude': position_data['latitude'],
'longitude': position_data['longitude'],
'altitude': position_data['altitude'],
'speed': position_data['speed'],
'course': position_data['course'],
'satellites': position_data['satellites_used']
}
self.track_log.append(track_point)
# Check geofences
self._check_geofences(track_point)
def add_waypoint(self, name: str, lat: float, lon: float):
"""Add waypoint"""
waypoint = {
'name': name,
'latitude': lat,
'longitude': lon,
'timestamp': datetime.utcnow().isoformat()
}
self.waypoints.append(waypoint)
print(f"📍 Added waypoint: {name} ({lat:.6f}, {lon:.6f})")
def add_geofence(self, name: str, lat: float, lon: float, radius: float):
"""Add circular geofence"""
geofence = {
'name': name,
'latitude': lat,
'longitude': lon,
'radius': radius, # meters
'active': True
}
self.geofences.append(geofence)
print(f"🚧 Added geofence: {name} (radius: {radius}m)")
def _check_geofences(self, position: Dict):
"""Check if position violates any geofences"""
for geofence in self.geofences:
if not geofence['active']:
continue
distance = self.gps.calculate_distance(
position['latitude'], position['longitude'],
geofence['latitude'], geofence['longitude']
)
if distance <= geofence['radius']:
print(f"🚨 GEOFENCE ALERT: Entered {geofence['name']} (distance: {distance:.1f}m)")
def get_current_status(self) -> Dict:
"""Get current tracking status"""
position = self.gps.get_position()
satellites = self.gps.get_satellites()
status = {
'timestamp': datetime.utcnow().isoformat(),
'position': position,
'satellites_in_view': len(satellites),
'satellites_with_signal': len([s for s in satellites.values() if s.get('snr')]),
'track_points': len(self.track_log),
'waypoints': len(self.waypoints),
'geofences': len([g for g in self.geofences if g['active']])
}
return status
def export_track_gpx(self, filename: str):
"""Export track as GPX file"""
if not self.track_log:
print("No track data to export")
return
gpx_content = """<?xml version="1.0" encoding="UTF-8"?>
<gpx version="1.1" creator="GPS Tracker">
<trk>
<name>GPS Track</name>
<trkseg>
"""
for point in self.track_log:
gpx_content += f""" <trkpt lat="{point['latitude']}" lon="{point['longitude']}">
<ele>{point.get('altitude', 0)}</ele>
<time>{point['timestamp']}</time>
</trkpt>
"""
gpx_content += """ </trkseg>
</trk>
</gpx>"""
with open(filename, 'w') as f:
f.write(gpx_content)
print(f"📄 Track exported to {filename} ({len(self.track_log)} points)")
def export_track_json(self, filename: str):
"""Export track as JSON file"""
track_data = {
'metadata': {
'created': datetime.utcnow().isoformat(),
'points': len(self.track_log),
'waypoints': len(self.waypoints)
},
'track': self.track_log,
'waypoints': self.waypoints,
'geofences': self.geofences
}
with open(filename, 'w') as f:
json.dump(track_data, f, indent=2)
print(f"📄 Track exported to {filename}")
# Usage example
tracker = GPSTracker(gps)
# Add some waypoints
tracker.add_waypoint("Start Point", 40.7128, -74.0060) # New York
tracker.add_waypoint("Destination", 40.7589, -73.9851) # Times Square
# Add geofence
tracker.add_geofence("Home", 40.7128, -74.0060, 100) # 100m radius
# Monitor for a while
try:
while True:
status = tracker.get_current_status()
print(f"\n📊 Status: {status['track_points']} points, "
f"{status['satellites_in_view']} satellites")
time.sleep(5)
except KeyboardInterrupt:
# Export data
tracker.export_track_gpx("track.gpx")
tracker.export_track_json("track.json")
Navigation and Routing
import math
from typing import List, Tuple
class GPSNavigation:
def __init__(self, gps_receiver: GPSReceiver):
self.gps = gps_receiver
self.route = []
self.current_waypoint_index = 0
def set_route(self, waypoints: List[Tuple[float, float, str]]):
"""Set navigation route"""
self.route = []
for lat, lon, name in waypoints:
self.route.append({
'latitude': lat,
'longitude': lon,
'name': name
})
self.current_waypoint_index = 0
print(f"🗺️ Route set with {len(self.route)} waypoints")
def get_navigation_info(self) -> Dict:
"""Get current navigation information"""
position = self.gps.get_position()
if not position['latitude'] or not position['longitude']:
return {'error': 'No GPS fix'}
if not self.route or self.current_waypoint_index >= len(self.route):
return {'error': 'No route set'}
current_waypoint = self.route[self.current_waypoint_index]
# Calculate distance and bearing to current waypoint
distance = self.gps.calculate_distance(
position['latitude'], position['longitude'],
current_waypoint['latitude'], current_waypoint['longitude']
)
bearing = self._calculate_bearing(
position['latitude'], position['longitude'],
current_waypoint['latitude'], current_waypoint['longitude']
)
# Calculate course deviation
current_course = position.get('course', 0) or 0
course_deviation = self._normalize_angle(bearing - current_course)
# Check if we've reached the waypoint
if distance < 50: # 50 meter threshold
self._advance_waypoint()
nav_info = {
'current_position': {
'latitude': position['latitude'],
'longitude': position['longitude']
},
'current_waypoint': current_waypoint,
'waypoint_index': self.current_waypoint_index + 1,
'total_waypoints': len(self.route),
'distance_to_waypoint': distance,
'bearing_to_waypoint': bearing,
'current_course': current_course,
'course_deviation': course_deviation,
'turn_direction': self._get_turn_direction(course_deviation)
}
return nav_info
def _calculate_bearing(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Calculate bearing between two GPS coordinates"""
lat1_rad = math.radians(lat1)
lat2_rad = math.radians(lat2)
delta_lon_rad = math.radians(lon2 - lon1)
x = math.sin(delta_lon_rad) * math.cos(lat2_rad)
y = (math.cos(lat1_rad) * math.sin(lat2_rad) -
math.sin(lat1_rad) * math.cos(lat2_rad) * math.cos(delta_lon_rad))
bearing_rad = math.atan2(x, y)
bearing_deg = math.degrees(bearing_rad)
return (bearing_deg + 360) % 360
def _normalize_angle(self, angle: float) -> float:
"""Normalize angle to -180 to +180 degrees"""
while angle > 180:
angle -= 360
while angle < -180:
angle += 360
return angle
def _get_turn_direction(self, deviation: float) -> str:
"""Get turn direction based on course deviation"""
if abs(deviation) < 10:
return "straight"
elif deviation > 0:
return "right"
else:
return "left"
def _advance_waypoint(self):
"""Advance to next waypoint"""
self.current_waypoint_index += 1
if self.current_waypoint_index < len(self.route):
waypoint = self.route[self.current_waypoint_index]
print(f"🎯 Reached waypoint! Next: {waypoint['name']}")
else:
print("🏁 Route complete!")
def get_route_progress(self) -> Dict:
"""Get overall route progress"""
if not self.route:
return {'error': 'No route set'}
total_distance = 0
completed_distance = 0
# Calculate total route distance
for i in range(len(self.route) - 1):
wp1 = self.route[i]
wp2 = self.route[i + 1]
distance = self.gps.calculate_distance(
wp1['latitude'], wp1['longitude'],
wp2['latitude'], wp2['longitude']
)
total_distance += distance
if i < self.current_waypoint_index:
completed_distance += distance
# Add distance to current waypoint if available
position = self.gps.get_position()
if (position['latitude'] and position['longitude'] and
self.current_waypoint_index > 0):
prev_waypoint = self.route[self.current_waypoint_index - 1]
completed_distance += self.gps.calculate_distance(
prev_waypoint['latitude'], prev_waypoint['longitude'],
position['latitude'], position['longitude']
)
progress_percent = (completed_distance / total_distance * 100) if total_distance > 0 else 0
return {
'total_distance': total_distance,
'completed_distance': completed_distance,
'remaining_distance': total_distance - completed_distance,
'progress_percent': progress_percent,
'current_waypoint': self.current_waypoint_index + 1,
'total_waypoints': len(self.route)
}
# Navigation example
nav = GPSNavigation(gps)
# Set a route
route_waypoints = [
(40.7128, -74.0060, "Start - New York"),
(40.7589, -73.9851, "Times Square"),
(40.7614, -73.9776, "Central Park South"),
(40.7829, -73.9654, "End - Upper East Side")
]
nav.set_route(route_waypoints)
# Navigation loop
try:
while True:
nav_info = nav.get_navigation_info()
if 'error' not in nav_info:
print(f"\n🧭 Navigation:")
print(f" To: {nav_info['current_waypoint']['name']}")
print(f" Distance: {nav_info['distance_to_waypoint']:.0f}m")
print(f" Bearing: {nav_info['bearing_to_waypoint']:.0f}°")
print(f" Turn: {nav_info['turn_direction']}")
progress = nav.get_route_progress()
print(f" Progress: {progress['progress_percent']:.1f}%")
time.sleep(2)
except KeyboardInterrupt:
print("Navigation stopped")
GPS Data Visualization
try:
import matplotlib.pyplot as plt
import numpy as np
class GPSVisualizer:
def __init__(self, gps_receiver: GPSReceiver):
self.gps = gps_receiver
def plot_track(self, track_data: List[Dict]):
"""Plot GPS track"""
if not track_data:
print("No track data to plot")
return
lats = [p['latitude'] for p in track_data if p['latitude']]
lons = [p['longitude'] for p in track_data if p['longitude']]
if not lats or not lons:
print("No valid GPS coordinates to plot")
return
plt.figure(figsize=(12, 8))
# Plot track
plt.plot(lons, lats, 'b-', linewidth=2, label='GPS Track')
plt.plot(lons[0], lats[0], 'go', markersize=10, label='Start')
plt.plot(lons[-1], lats[-1], 'ro', markersize=10, label='End')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('GPS Track')
plt.legend()
plt.grid(True, alpha=0.3)
plt.axis('equal')
plt.tight_layout()
plt.show()
def plot_satellite_sky(self, satellite_data: Dict):
"""Plot satellite positions in sky view"""
if not satellite_data:
print("No satellite data to plot")
return
fig, ax = plt.subplots(figsize=(10, 10), subplot_kw=dict(projection='polar'))
for prn, data in satellite_data.items():
if data.get('elevation') is not None and data.get('azimuth') is not None:
# Convert to polar coordinates
elevation = data['elevation']
azimuth = math.radians(data['azimuth'])
# Plot satellite
radius = 90 - elevation # Invert elevation for polar plot
snr = data.get('snr', 0)
# Color by SNR
if snr > 40:
color = 'green'
elif snr > 25:
color = 'orange'
elif snr > 0:
color = 'red'
else:
color = 'gray'
ax.plot(azimuth, radius, 'o', color=color, markersize=8)
ax.text(azimuth, radius-5, prn, ha='center', va='center', fontsize=8)
# Configure plot
ax.set_ylim(0, 90)
ax.set_yticks([0, 30, 60, 90])
ax.set_yticklabels(['90°', '60°', '30°', '0°']) # Elevation labels
ax.set_theta_zero_location('N')
ax.set_theta_direction(-1)
ax.set_title('Satellite Sky View\n(Green=Strong, Orange=Medium, Red=Weak, Gray=No Signal)')
plt.show()
def plot_signal_strength(self, satellite_data: Dict):
"""Plot satellite signal strength"""
prns = []
snrs = []
for prn, data in satellite_data.items():
snr = data.get('snr')
if snr is not None:
prns.append(prn)
snrs.append(snr)
if not prns:
print("No SNR data to plot")
return
plt.figure(figsize=(12, 6))
bars = plt.bar(prns, snrs)
# Color bars by signal strength
for bar, snr in zip(bars, snrs):
if snr > 40:
bar.set_color('green')
elif snr > 25:
bar.set_color('orange')
else:
bar.set_color('red')
plt.xlabel('Satellite PRN')
plt.ylabel('Signal Strength (dB-Hz)')
plt.title('Satellite Signal Strength')
plt.grid(True, alpha=0.3)
# Add threshold lines
plt.axhline(y=25, color='orange', linestyle='--', alpha=0.7, label='Good Signal')
plt.axhline(y=40, color='green', linestyle='--', alpha=0.7, label='Excellent Signal')
plt.legend()
plt.tight_layout()
plt.show()
# Usage
# visualizer = GPSVisualizer(gps)
# visualizer.plot_track(tracker.track_log)
# visualizer.plot_satellite_sky(gps.get_satellites())
# visualizer.plot_signal_strength(gps.get_satellites())
except ImportError:
print("matplotlib not available for visualization")
NMEA Parsing
Parse GPS NMEA sentences and extract location data
GPS Receiver
Complete GPS class with position tracking and callbacks
Real-time Tracking
Build tracking applications with geofences and waypoints
Navigation
Implement turn-by-turn navigation and route planning
GPS NMEA processing with PySerial enables powerful location-based applications. From basic coordinate parsing to advanced navigation systems, these examples provide a solid foundation for GPS projects.
How is this guide?