PySerial
PySerialDocs

GPS NMEA Data Processing

Complete guide to processing GPS NMEA data with PySerial - parse coordinates, track location, and build GPS applications in Python

Build powerful GPS applications with PySerial by parsing NMEA sentences, tracking location, and processing satellite data in real-time.

NMEA Protocol Basics

NMEA 0183 is the standard protocol for GPS communication, using ASCII sentences that are easy to parse and understand.

import serial
import re
from datetime import datetime
from typing import Dict, Optional, Tuple

class NMEAParser:
    """Validate NMEA 0183 sentences and dispatch them to per-type handlers.

    ``sentence_parsers`` maps a sentence identifier (for example ``'GPGGA'``)
    to a callable. The callable receives the comma-separated fields of the
    sentence, identifier first, and returns a dict. This class defines no
    handlers of its own: subclasses may provide ``_parse_gga`` and friends,
    or callers may register handlers in ``sentence_parsers`` directly.
    """

    # Sentence identifier -> name of the optional handler method.
    _DEFAULT_HANDLERS = (
        ('GPGGA', '_parse_gga'),
        ('GPRMC', '_parse_rmc'),
        ('GPGSA', '_parse_gsa'),
        ('GPGSV', '_parse_gsv'),
        ('GPVTG', '_parse_vtg'),
    )

    def __init__(self):
        # Bind only the handlers that actually exist on this instance, so a
        # plain NMEAParser() can be constructed without AttributeError.
        self.sentence_parsers = {}
        for sentence_type, method_name in self._DEFAULT_HANDLERS:
            handler = getattr(self, method_name, None)
            if callable(handler):
                self.sentence_parsers[sentence_type] = handler

    def parse_sentence(self, sentence: str) -> Optional[Dict]:
        """Parse one NMEA sentence.

        Returns the registered handler's result, or ``None`` when the
        checksum is invalid, the sentence does not start with ``$``, no
        handler is registered for its type, or the handler raises.
        """
        sentence = sentence.strip()

        # Reject corrupted data before looking at the fields.
        if not self._validate_checksum(sentence):
            return None

        if not sentence.startswith('$'):
            return None

        # Drop the leading '$' and the '*HH' checksum suffix, then split.
        parts = sentence[1:].partition('*')[0].split(',')
        sentence_type = parts[0]

        handler = self.sentence_parsers.get(sentence_type)
        if handler is None:
            return None

        try:
            return handler(parts)
        except Exception as e:
            # Handlers are caller-supplied; one bad sentence must not stop
            # the caller's read loop.
            print(f"Error parsing {sentence_type}: {e}")
            return None

    def _validate_checksum(self, sentence: str) -> bool:
        """Return True when the ``*HH`` suffix matches the sentence body.

        The checksum is the XOR of every character between the start
        delimiter (``$`` or ``!``) and the ``*``, written as two hex digits.
        """
        if not isinstance(sentence, str) or '*' not in sentence:
            return False

        # Without a start delimiter the XOR range is undefined.
        if not sentence.startswith(('$', '!')):
            return False

        message, _, checksum = sentence[1:].partition('*')

        calculated_checksum = 0
        for char in message:
            calculated_checksum ^= ord(char)

        return f"{calculated_checksum:02X}" == checksum.strip().upper()

    def _safe_float(self, value: str) -> Optional[float]:
        """Convert *value* to float; return None for empty or invalid text."""
        try:
            return float(value) if value else None
        except (TypeError, ValueError):
            return None

    def _safe_int(self, value: str) -> Optional[int]:
        """Convert *value* to int; return None for empty or invalid text."""
        try:
            return int(value) if value else None
        except (TypeError, ValueError):
            return None

# A handful of reference sentences, one per common sentence type
sample_sentences = [
    "$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47",
    "$GPRMC,123519,A,4807.038,N,01131.000,E,022.4,084.4,230394,003.1,W*6A",
    "$GPGSA,A,3,04,05,,09,12,,,24,,,,,2.5,1.3,2.1*39",
    "$GPGSV,2,1,07,07,79,048,42,02,51,062,43,26,36,256,42,27,27,138,42*71"
]

parser = NMEAParser()

# Describe the general layout before walking through the samples
for banner_line in ("NMEA Sentence Structure:",
                    "$TALKER_SENTENCE_TYPE,field1,field2,...,fieldN*CHECKSUM",
                    "\nExamples:"):
    print(banner_line)

for sentence in sample_sentences:
    # Split once; the first piece is "$" + talker + sentence type
    parts = sentence.split(',')
    is_valid = parser._validate_checksum(sentence)

    if is_valid:
        verdict = '✅ Valid'
    else:
        verdict = '❌ Invalid'

    print(f"\n📡 {sentence}")
    print(f"   Talker + Type: {parts[0]}")
    print(f"   Fields: {len(parts)-1}")
    print(f"   Checksum: {verdict}")
class NMEASentenceTypes:
    """Reference table for the NMEA sentence types used in this guide."""

    @staticmethod
    def describe_sentences():
        """Print a field-by-field summary of each sentence type.

        Returns a dict keyed by the three-letter sentence code; each value
        holds ``name``, ``description`` and an ordered ``fields`` list.
        """
        def entry(name, description, *fields):
            # One table row; the field order follows the sentence layout.
            return {
                'name': name,
                'description': description,
                'fields': list(fields),
            }

        sentences = {
            'GGA': entry(
                'Global Positioning System Fix Data',
                'Position fix, quality, satellites, altitude',
                'UTC Time (HHMMSS)',
                'Latitude (DDMM.MMMM)',
                'N/S Indicator',
                'Longitude (DDDMM.MMMM)',
                'E/W Indicator',
                'Fix Quality (0=invalid, 1=GPS, 2=DGPS)',
                'Number of satellites',
                'Horizontal dilution',
                'Altitude above sea level',
                'Altitude units (M=meters)',
                'Geoid height',
                'Geoid units (M=meters)',
                'DGPS time since last update',
                'DGPS station ID',
            ),
            'RMC': entry(
                'Recommended Minimum',
                'Position, velocity, time, date, variation',
                'UTC Time (HHMMSS)',
                'Status (A=valid, V=invalid)',
                'Latitude (DDMM.MMMM)',
                'N/S Indicator',
                'Longitude (DDDMM.MMMM)',
                'E/W Indicator',
                'Speed over ground (knots)',
                'Course over ground (degrees)',
                'UTC Date (DDMMYY)',
                'Magnetic variation (degrees)',
                'Variation direction (E/W)',
                'Mode indicator (A=autonomous, D=DGPS)',
            ),
            'GSA': entry(
                'GPS DOP and Active Satellites',
                'Dilution of precision and active satellites',
                'Mode (M=manual, A=automatic)',
                'Fix type (1=none, 2=2D, 3=3D)',
                'PRN numbers of satellites used (12 fields)',
                'PDOP (Position dilution of precision)',
                'HDOP (Horizontal dilution)',
                'VDOP (Vertical dilution)',
            ),
            'GSV': entry(
                'GPS Satellites in View',
                'Information about satellites in view',
                'Number of sentences for full data',
                'Sentence number (1-4)',
                'Total satellites in view',
                'Satellite 1 PRN number',
                'Satellite 1 elevation (degrees)',
                'Satellite 1 azimuth (degrees)',
                'Satellite 1 SNR (dB-Hz)',
                '... (repeat for up to 4 satellites per sentence)',
            ),
            'VTG': entry(
                'Track Made Good and Ground Speed',
                'Course and speed information',
                'Course over ground (degrees true)',
                'Reference (T=true)',
                'Course over ground (degrees magnetic)',
                'Reference (M=magnetic)',
                'Speed over ground (knots)',
                'Units (N=knots)',
                'Speed over ground (km/h)',
                'Units (K=km/h)',
                'Mode indicator',
            ),
        }

        # Assemble the whole report first, then emit it in one write.
        report = ["Common NMEA 0183 Sentences:", "=" * 50]
        for code, info in sentences.items():
            report.append(f"\n{code} - {info['name']}")
            report.append(f"Purpose: {info['description']}")
            report.append("Fields:")
            report.extend(
                f"  {i:2d}. {field}"
                for i, field in enumerate(info['fields'], 1)
            )
        print("\n".join(report))

        return sentences

# Show sentence documentation
NMEASentenceTypes.describe_sentences()
def parse_coordinate(coord_str: str, direction: str) -> Optional[float]:
    """Convert an NMEA coordinate to signed decimal degrees.

    Args:
        coord_str: Latitude as ``DDMM[.MMMM]`` or longitude as
            ``DDDMM[.MMMM]``; the last two integer digits are whole minutes.
        direction: Hemisphere letter ``N``, ``S``, ``E`` or ``W`` (case and
            surrounding whitespace are ignored).

    Returns:
        Decimal degrees, negative for ``S`` and ``W``. ``None`` when either
        argument is empty, the hemisphere letter is unknown, the text is
        malformed, minutes are not in ``[0, 60)``, or the value exceeds
        90 degrees (N/S) or 180 degrees (E/W).
    """
    if not coord_str or not direction:
        return None

    hemisphere = direction.strip().upper()
    if hemisphere not in ('N', 'S', 'E', 'W'):
        return None

    text = coord_str.strip()
    whole = text.partition('.')[0]

    # Need at least one degree digit in front of the two minute digits.
    if len(whole) < 3 or not whole.isdigit():
        return None

    try:
        degrees = int(whole[:-2])
        # Minutes are the last two integer digits plus any fraction.
        minutes = float(text[len(whole) - 2:])
    except ValueError:
        return None

    if not 0 <= minutes < 60:
        return None

    decimal_degrees = degrees + minutes / 60.0

    # The hemisphere tells us which axis this is, hence which bound applies.
    limit = 90.0 if hemisphere in ('N', 'S') else 180.0
    if decimal_degrees > limit:
        return None

    if hemisphere in ('S', 'W'):
        decimal_degrees = -decimal_degrees

    return decimal_degrees

def parse_time(time_str: str) -> Optional[datetime]:
    """Parse an NMEA time field (``HHMMSS`` or ``HHMMSS.SSS``).

    NMEA time fields carry a UTC time of day and no date, so the result
    combines that time with the current UTC calendar date.

    Returns:
        A naive ``datetime`` expressed in UTC, or ``None`` when the field is
        empty, too short, non-numeric, or out of range.
    """
    if not time_str or len(time_str) < 6:
        return None

    # Local import: the file-level import only brings in ``datetime``.
    from datetime import timezone

    time_part, _, fraction = time_str.partition('.')
    clock = time_part[:6]
    if len(clock) < 6 or not clock.isdigit():
        return None
    if fraction and not fraction.isdigit():
        return None

    try:
        # Pad or cut the fraction to six digits instead of multiplying a
        # float, which can truncate one microsecond low.
        microseconds = int(fraction.ljust(6, '0')[:6]) if fraction else 0

        # Use today's UTC date rather than the local date so the date and
        # the time of day refer to the same clock.
        utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
        return utc_now.replace(
            hour=int(clock[:2]),
            minute=int(clock[2:4]),
            second=int(clock[4:6]),
            microsecond=microseconds,
        )
    except ValueError:
        # Out-of-range components such as hour 25 or second 60.
        return None

def parse_date(date_str: str) -> Optional[datetime]:
    """Turn a six-character NMEA date (``DDMMYY``) into a ``datetime``.

    Two-digit years below 80 are placed in the 2000s, the rest in the
    1900s. Returns ``None`` for anything that is not exactly six characters
    or does not form a real calendar date.
    """
    if not date_str or len(date_str) != 6:
        return None

    try:
        # Three consecutive two-character slices: day, month, year.
        dd, mm, yy = (int(date_str[pos:pos + 2]) for pos in (0, 2, 4))

        # Pivot year 80 decides the century.
        century = 2000 if yy < 80 else 1900

        return datetime(century + yy, mm, dd)
    except (ValueError, IndexError):
        return None

# Test coordinate parsing: (raw field, hemisphere, expected magnitude)
test_coords = [
    ("4807.038", "N", "48.117300°N"),
    ("01131.000", "E", "11.516667°E"),
    ("3723.2475", "S", "37.387458°S"),
    ("12158.3416", "W", "121.972360°W")
]

print("Coordinate Parsing Examples:")
print("-" * 40)

for coord, direction, expected in test_coords:
    result = parse_coordinate(coord, direction)
    # parse_coordinate returns None on bad input; formatting None with
    # ':.6f' would raise, so show a placeholder instead.
    shown = f"{result:.6f}°" if result is not None else "unparseable"
    print(f"{coord} {direction} -> {shown} (Expected: {expected})")

# Test time parsing
test_times = ["123519", "235959.999", "000000"]

print("\nTime Parsing Examples:")
print("-" * 40)

for time_str in test_times:
    result = parse_time(time_str)
    # Separate input from output so the two values do not run together.
    print(f"{time_str} -> {result}")

# Test date parsing
test_dates = ["230394", "010100", "311299"]

print("\nDate Parsing Examples:")
print("-" * 40)

for date_str in test_dates:
    result = parse_date(date_str)
    print(f"{date_str} -> {result}")

Complete GPS Receiver Class

Build a comprehensive GPS class that handles all common NMEA sentences and provides easy access to position data.

import serial
import threading
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable
import math

class GPSReceiver:
    """Read NMEA sentences from a serial port and keep the latest GPS state.

    A background thread reads the port, validates each sentence through
    ``NMEAParser`` and merges the parsed values into ``current_position``
    and ``satellites``. Callbacks can be registered for ``'position'``,
    ``'satellite'`` and ``'raw'`` events; they run on the reader thread.

    ``current_position['speed']`` is in knots for both RMC and VTG input.
    ``current_position['timestamp']`` is the UTC time at which the sentence
    was processed, not the time carried inside the sentence.
    """

    def __init__(self, port: str, baudrate: int = 9600):
        """Store connection settings; no I/O happens until ``connect()``."""
        self.port = port
        self.baudrate = baudrate
        self.serial = None
        self.running = False

        # Latest known values, merged from every position-type sentence.
        self.current_position = {
            'latitude': None,
            'longitude': None,
            'altitude': None,
            'speed': None,
            'course': None,
            'timestamp': None,
            'fix_quality': 0,
            'satellites_used': 0,
            'satellites_in_view': 0,
            'hdop': None,
            'pdop': None,
            'vdop': None
        }

        self.satellites = {}  # PRN -> {'elevation', 'azimuth', 'snr'}
        self.position_history = []  # Snapshots of sentences carrying a fix
        self.callbacks = {'position': [], 'satellite': [], 'raw': []}

        # NMEA parser
        self.parser = NMEAParser()
        self._setup_parsers()

    def _setup_parsers(self):
        """Register this receiver's handlers with the NMEA parser."""
        self.parser.sentence_parsers.update({
            'GPGGA': self._parse_gga,
            'GPRMC': self._parse_rmc,
            'GPGSA': self._parse_gsa,
            'GPGSV': self._parse_gsv,
            'GPVTG': self._parse_vtg
        })

    def connect(self) -> bool:
        """Open the serial port. Returns True on success, False otherwise."""
        try:
            self.serial = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                timeout=1
            )
            print(f"✅ Connected to GPS receiver on {self.port}")
            return True
        except Exception as e:
            print(f"❌ Failed to connect to GPS: {e}")
            return False

    def start_reading(self):
        """Start the background reader thread.

        Returns False when ``connect()`` has not succeeded yet, True
        otherwise. Calling it again while the reader is alive is a no-op.
        """
        if not self.serial:
            return False

        # A second reader on the same port would split the byte stream.
        existing = getattr(self, 'reader_thread', None)
        if existing is not None and existing.is_alive():
            return True

        self.running = True
        self.reader_thread = threading.Thread(target=self._reader_worker)
        self.reader_thread.daemon = True
        self.reader_thread.start()
        return True

    def _reader_worker(self):
        """Reader loop: split the byte stream on CRLF and parse each line."""
        buffer = b''

        while self.running:
            try:
                if self.serial.in_waiting:
                    data = self.serial.read(self.serial.in_waiting)
                    buffer += data

                    # Everything before a CRLF is a complete sentence; the
                    # remainder stays buffered until more bytes arrive.
                    while b'\r\n' in buffer:
                        line, buffer = buffer.split(b'\r\n', 1)
                        sentence = line.decode('ascii', errors='ignore')

                        if sentence:
                            # Raw callbacks see every line, valid or not.
                            self._trigger_callbacks('raw', sentence)

                            parsed_data = self.parser.parse_sentence(sentence)
                            if parsed_data:
                                self._process_parsed_data(parsed_data)

                time.sleep(0.01)

            except Exception as e:
                # Keep the thread alive across transient port errors.
                print(f"GPS reader error: {e}")
                time.sleep(1)

    def _process_parsed_data(self, data: Dict):
        """Merge one parsed sentence into the receiver state.

        ``data['type']`` selects the path: ``'position'`` updates
        ``current_position`` and ``'satellites'`` updates ``satellites``.
        """
        data_type = data.get('type')

        if data_type == 'position':
            for key, value in data.items():
                if key in self.current_position:
                    self.current_position[key] = value

            # Stamp with processing time (naive UTC).
            self.current_position['timestamp'] = datetime.utcnow()

            # Only sentences that carry coordinates add a track point.
            # GSA and VTG updates would otherwise duplicate the previous
            # point and dilute the speed statistics.
            if (data.get('latitude') is not None and
                    data.get('longitude') is not None):
                self.position_history.append(dict(self.current_position))

                # Keep history limited
                if len(self.position_history) > 1000:
                    self.position_history = self.position_history[-1000:]

            self._trigger_callbacks('position', self.current_position)

        elif data_type == 'satellites':
            self.satellites.update(data.get('satellites', {}))

            # GSV is the only sentence that reports the in-view count.
            total = data.get('total_satellites')
            if total is not None:
                self.current_position['satellites_in_view'] = total

            self._trigger_callbacks('satellite', self.satellites)

    def _parse_gga(self, fields: List[str]) -> Dict:
        """Parse GGA sentence - Global Positioning System Fix Data"""
        if len(fields) < 15:
            return {}

        latitude = parse_coordinate(fields[2], fields[3])
        longitude = parse_coordinate(fields[4], fields[5])

        return {
            'type': 'position',
            'latitude': latitude,
            'longitude': longitude,
            'fix_quality': self.parser._safe_int(fields[6]),
            'satellites_used': self.parser._safe_int(fields[7]),
            'hdop': self.parser._safe_float(fields[8]),
            'altitude': self.parser._safe_float(fields[9]),
            'timestamp': parse_time(fields[1])
        }

    def _parse_rmc(self, fields: List[str]) -> Dict:
        """Parse RMC sentence - Recommended Minimum"""
        if len(fields) < 12:
            return {}

        if fields[2] != 'A':  # Only process if data is valid
            return {}

        latitude = parse_coordinate(fields[3], fields[4])
        longitude = parse_coordinate(fields[5], fields[6])

        return {
            'type': 'position',
            'latitude': latitude,
            'longitude': longitude,
            'speed': self.parser._safe_float(fields[7]),  # knots
            'course': self.parser._safe_float(fields[8]),  # degrees
            'timestamp': parse_time(fields[1])
        }

    def _parse_gsa(self, fields: List[str]) -> Dict:
        """Parse GSA sentence - GPS DOP and Active Satellites"""
        if len(fields) < 18:
            return {}

        return {
            'type': 'position',
            'fix_type': self.parser._safe_int(fields[2]),
            'pdop': self.parser._safe_float(fields[15]),
            'hdop': self.parser._safe_float(fields[16]),
            'vdop': self.parser._safe_float(fields[17])
        }

    def _parse_gsv(self, fields: List[str]) -> Dict:
        """Parse GSV sentence - GPS Satellites in View"""
        if len(fields) < 8:
            return {}

        satellites = {}
        total_sats = self.parser._safe_int(fields[3])

        # Each satellite occupies four fields, starting at index 4.
        for i in range(4):
            base_idx = 4 + i * 4
            if base_idx + 3 < len(fields):
                prn = fields[base_idx]
                elevation = self.parser._safe_int(fields[base_idx + 1])
                azimuth = self.parser._safe_int(fields[base_idx + 2])
                snr = self.parser._safe_int(fields[base_idx + 3])

                if prn:
                    satellites[prn] = {
                        'elevation': elevation,
                        'azimuth': azimuth,
                        'snr': snr
                    }

        return {
            'type': 'satellites',
            'total_satellites': total_sats,
            'satellites': satellites
        }

    def _parse_vtg(self, fields: List[str]) -> Dict:
        """Parse VTG sentence - Track Made Good and Ground Speed"""
        if len(fields) < 9:
            return {}

        return {
            'type': 'position',
            'course': self.parser._safe_float(fields[1]),  # degrees true
            # Field 5 is the knots value. RMC also reports knots, so
            # 'speed' keeps one unit regardless of which sentence set it.
            'speed': self.parser._safe_float(fields[5])
        }

    def add_callback(self, callback_type: str, callback: Callable):
        """Register *callback* for 'position', 'satellite' or 'raw' events.

        Unknown event names are ignored.
        """
        if callback_type in self.callbacks:
            self.callbacks[callback_type].append(callback)

    def _trigger_callbacks(self, callback_type: str, data):
        """Call every callback of one type; a failing callback is logged."""
        for callback in self.callbacks.get(callback_type, []):
            try:
                callback(data)
            except Exception as e:
                print(f"Callback error: {e}")

    def get_position(self) -> Dict:
        """Return a shallow copy of the current position dict."""
        return self.current_position.copy()

    def get_satellites(self) -> Dict:
        """Return a shallow copy of the satellite table."""
        return self.satellites.copy()

    def is_position_valid(self) -> bool:
        """Return True when coordinates are known and fix quality is > 0."""
        position = self.current_position
        fix_quality = position['fix_quality']
        # fix_quality is None when the GGA field was empty; treat as no fix
        # instead of comparing None with 0.
        return (position['latitude'] is not None and
                position['longitude'] is not None and
                fix_quality is not None and
                fix_quality > 0)

    def calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
        """Return the great-circle distance in meters (Haversine formula).

        All four arguments are decimal degrees.
        """
        R = 6371000  # Earth radius in meters

        lat1_rad = math.radians(lat1)
        lat2_rad = math.radians(lat2)
        half_dlat = math.radians(lat2 - lat1) / 2
        half_dlon = math.radians(lon2 - lon1) / 2

        a = (math.sin(half_dlat) ** 2 +
             math.cos(lat1_rad) * math.cos(lat2_rad) *
             math.sin(half_dlon) ** 2)

        # Rounding can push 'a' marginally outside [0, 1], which would make
        # sqrt(1 - a) fail.
        a = min(1.0, max(0.0, a))

        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
        return R * c

    def get_travel_statistics(self) -> Dict:
        """Summarize the recorded track.

        Returns a dict with ``total_distance`` (meters), ``max_speed`` and
        ``average_speed`` (meters per second, averaged per segment),
        ``duration`` (seconds) and ``positions_recorded``; or
        ``{'error': ...}`` when fewer than two points are recorded.
        """
        if len(self.position_history) < 2:
            return {'error': 'Insufficient position data'}

        stats = {
            'total_distance': 0,
            'max_speed': 0,
            'average_speed': 0,
            'duration': 0,
            'positions_recorded': len(self.position_history)
        }

        speeds = []

        for prev_pos, curr_pos in zip(self.position_history,
                                      self.position_history[1:]):
            # Compare against None: 0.0 is a valid latitude or longitude.
            endpoints = (prev_pos['latitude'], prev_pos['longitude'],
                         curr_pos['latitude'], curr_pos['longitude'])
            if any(value is None for value in endpoints):
                continue

            distance = self.calculate_distance(*endpoints)
            stats['total_distance'] += distance

            # Calculate speed if we have timestamps
            if (prev_pos['timestamp'] is not None and
                    curr_pos['timestamp'] is not None):
                time_diff = (curr_pos['timestamp'] - prev_pos['timestamp']).total_seconds()
                if time_diff > 0:
                    speed = distance / time_diff  # m/s
                    speeds.append(speed)
                    stats['max_speed'] = max(stats['max_speed'], speed)

        if speeds:
            stats['average_speed'] = sum(speeds) / len(speeds)

        # Calculate total duration
        first_stamp = self.position_history[0]['timestamp']
        last_stamp = self.position_history[-1]['timestamp']
        if first_stamp is not None and last_stamp is not None:
            stats['duration'] = (last_stamp - first_stamp).total_seconds()

        return stats

    def stop(self):
        """Stop the reader thread and close the serial port."""
        self.running = False
        reader = getattr(self, 'reader_thread', None)
        # A callback may call stop() from the reader thread itself, and a
        # thread cannot join itself.
        if reader is not None and reader is not threading.current_thread():
            reader.join()
        if self.serial:
            self.serial.close()

# Usage example with callbacks
def position_callback(position_data):
    """Print a short summary of a position update.

    Nothing is printed while latitude or longitude is still unknown.
    """
    # Compare against None: 0.0 (equator or prime meridian) is a valid
    # coordinate and must not be treated as "no fix".
    if position_data['latitude'] is None or position_data['longitude'] is None:
        return

    print(f"📍 Position: {position_data['latitude']:.6f}, {position_data['longitude']:.6f}")
    print(f"   Altitude: {position_data['altitude']}m")
    print(f"   Satellites: {position_data['satellites_used']}")
    print(f"   Fix Quality: {position_data['fix_quality']}")

def satellite_callback(satellite_data):
    """Print how many satellites are tracked and how many report a signal."""
    # A satellite counts as "with signal" when its 'snr' entry is truthy.
    in_view = len(satellite_data)
    with_signal = sum(1 for info in satellite_data.values() if info.get('snr'))
    print(f"🛰️  Satellites in view: {in_view}, with signal: {with_signal}")

# Build the receiver and hook up both handlers before any data arrives
gps = GPSReceiver('/dev/ttyUSB0', 9600)
for event_name, handler in (('position', position_callback),
                            ('satellite', satellite_callback)):
    gps.add_callback(event_name, handler)

# The reader is only started once the port has been opened
if gps.connect():
    gps.start_reading()

    try:
        # Poll once per second until the receiver reports a usable fix
        print("Waiting for GPS fix...")
        while True:
            if gps.is_position_valid():
                break
            time.sleep(1)

        print("GPS fix acquired!")

        # Let the background reader collect a minute of positions
        time.sleep(60)

        # Pull the summary; speeds come back in m/s and are shown in km/h
        stats = gps.get_travel_statistics()
        total_distance = stats.get('total_distance', 0)
        max_speed_kmh = stats.get('max_speed', 0) * 3.6
        average_speed_kmh = stats.get('average_speed', 0) * 3.6
        duration = stats.get('duration', 0)

        print(f"\n📊 Travel Statistics:")
        print(f"   Total distance: {total_distance:.2f} meters")
        print(f"   Max speed: {max_speed_kmh:.2f} km/h")
        print(f"   Average speed: {average_speed_kmh:.2f} km/h")
        print(f"   Duration: {duration:.0f} seconds")

    except KeyboardInterrupt:
        print("Stopping GPS receiver...")

    finally:
        # Always stop the thread and release the port
        gps.stop()

Advanced GPS Applications

Real-time Tracking Dashboard

import json
import time
from datetime import datetime
from typing import Dict, List

class GPSTracker:
    """Record a track from a GPS receiver, with waypoints and geofences.

    The tracker registers itself for the receiver's ``'position'`` events
    and appends one track point per update that carries coordinates. The
    track can be exported as GPX or JSON.
    """

    def __init__(self, gps_receiver: "GPSReceiver"):
        """Attach to *gps_receiver* and start logging its position events."""
        self.gps = gps_receiver
        self.track_log = []
        self.waypoints = []
        self.geofences = []

        # Add GPS callbacks
        self.gps.add_callback('position', self._log_position)

    def _log_position(self, position_data: Dict):
        """Append a track point for *position_data* and check geofences.

        Updates without latitude or longitude are ignored.
        """
        # Compare against None: 0.0 is a valid latitude or longitude.
        if position_data['latitude'] is None or position_data['longitude'] is None:
            return

        track_point = {
            'timestamp': datetime.utcnow().isoformat(),
            'latitude': position_data['latitude'],
            'longitude': position_data['longitude'],
            'altitude': position_data['altitude'],
            'speed': position_data['speed'],
            'course': position_data['course'],
            'satellites': position_data['satellites_used']
        }

        self.track_log.append(track_point)

        # Check geofences
        self._check_geofences(track_point)

    def add_waypoint(self, name: str, lat: float, lon: float):
        """Store a named waypoint at the given decimal-degree coordinates."""
        waypoint = {
            'name': name,
            'latitude': lat,
            'longitude': lon,
            'timestamp': datetime.utcnow().isoformat()
        }
        self.waypoints.append(waypoint)
        print(f"📍 Added waypoint: {name} ({lat:.6f}, {lon:.6f})")

    def add_geofence(self, name: str, lat: float, lon: float, radius: float):
        """Add an active circular geofence; *radius* is in meters."""
        geofence = {
            'name': name,
            'latitude': lat,
            'longitude': lon,
            'radius': radius,  # meters
            'active': True
        }
        self.geofences.append(geofence)
        print(f"🚧 Added geofence: {name} (radius: {radius}m)")

    def _check_geofences(self, position: Dict):
        """Print an alert for every active geofence containing *position*.

        The alert is printed on each logged point inside the radius, not
        only on the first one.
        """
        for geofence in self.geofences:
            if not geofence['active']:
                continue

            distance = self.gps.calculate_distance(
                position['latitude'], position['longitude'],
                geofence['latitude'], geofence['longitude']
            )

            if distance <= geofence['radius']:
                print(f"🚨 GEOFENCE ALERT: Entered {geofence['name']} (distance: {distance:.1f}m)")

    def get_current_status(self) -> Dict:
        """Return a snapshot of receiver state and tracker counters."""
        position = self.gps.get_position()
        satellites = self.gps.get_satellites()

        status = {
            'timestamp': datetime.utcnow().isoformat(),
            'position': position,
            'satellites_in_view': len(satellites),
            'satellites_with_signal': len([s for s in satellites.values() if s.get('snr')]),
            'track_points': len(self.track_log),
            'waypoints': len(self.waypoints),
            'geofences': len([g for g in self.geofences if g['active']])
        }

        return status

    def export_track_gpx(self, filename: str):
        """Write the track to *filename* as a GPX 1.1 document.

        Prints a message and writes nothing when the track is empty.
        """
        if not self.track_log:
            print("No track data to export")
            return

        lines = [
            '<?xml version="1.0" encoding="UTF-8"?>',
            # GPX 1.1 elements live in this namespace; readers that
            # validate the file need it on the root element.
            '<gpx version="1.1" creator="GPS Tracker" '
            'xmlns="http://www.topografix.com/GPX/1/1">',
            '  <trk>',
            '    <name>GPS Track</name>',
            '    <trkseg>',
        ]

        for point in self.track_log:
            lat = point['latitude']
            lon = point['longitude']
            lines.append(f'      <trkpt lat="{lat}" lon="{lon}">')

            # <ele> is optional; leave it out when altitude is unknown
            # instead of writing the text "None".
            altitude = point.get('altitude')
            if altitude is not None:
                lines.append(f'        <ele>{altitude}</ele>')

            lines.append(f"        <time>{point['timestamp']}</time>")
            lines.append('      </trkpt>')

        lines.extend(['    </trkseg>', '  </trk>', '</gpx>'])

        # The XML declaration promises UTF-8, so write UTF-8 explicitly.
        with open(filename, 'w', encoding='utf-8') as f:
            f.write("\n".join(lines))

        print(f"📄 Track exported to {filename} ({len(self.track_log)} points)")

    def export_track_json(self, filename: str):
        """Write track, waypoints and geofences to *filename* as JSON."""
        track_data = {
            'metadata': {
                'created': datetime.utcnow().isoformat(),
                'points': len(self.track_log),
                'waypoints': len(self.waypoints)
            },
            'track': self.track_log,
            'waypoints': self.waypoints,
            'geofences': self.geofences
        }

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(track_data, f, indent=2)

        print(f"📄 Track exported to {filename}")

# Usage example: attach a tracker to the receiver created earlier
tracker = GPSTracker(gps)

# Register the two waypoints (New York, then Times Square)
for waypoint_name, waypoint_lat, waypoint_lon in (
        ("Start Point", 40.7128, -74.0060),
        ("Destination", 40.7589, -73.9851)):
    tracker.add_waypoint(waypoint_name, waypoint_lat, waypoint_lon)

# One geofence with a 100 m radius around the start point
tracker.add_geofence("Home", 40.7128, -74.0060, 100)

# Report every five seconds until interrupted, then write both exports
try:
    while True:
        status = tracker.get_current_status()
        point_count = status['track_points']
        satellite_count = status['satellites_in_view']
        print(f"\n📊 Status: {point_count} points, "
              f"{satellite_count} satellites")
        time.sleep(5)

except KeyboardInterrupt:
    for export, target in ((tracker.export_track_gpx, "track.gpx"),
                           (tracker.export_track_json, "track.json")):
        export(target)
import math
from typing import List, Tuple

class GPSNavigation:
    """Guide a GPS receiver along an ordered list of waypoints.

    The receiver object must provide ``get_position()``, returning a dict
    with ``'latitude'`` and ``'longitude'`` (and optionally ``'course'``),
    and ``calculate_distance(lat1, lon1, lat2, lon2)``.  Distances are
    compared with ``arrival_radius`` in whatever unit the receiver returns
    (presumably meters -- confirm against GPSReceiver).
    """

    def __init__(self, gps_receiver: "GPSReceiver", arrival_radius: float = 50.0):
        """Create a navigator.

        Args:
            gps_receiver: Source of positions and distance calculations.
            arrival_radius: A waypoint counts as reached when the distance
                to it drops below this value (default 50, as before).
        """
        self.gps = gps_receiver
        self.route = []
        self.current_waypoint_index = 0
        self.arrival_radius = arrival_radius

    @staticmethod
    def _has_fix(position: Dict) -> bool:
        """Return True when both coordinates are present.

        Tests for ``None`` rather than truthiness so that a latitude or
        longitude of exactly 0.0 (equator / prime meridian) is still a fix.
        Assumes the receiver reports a missing fix as ``None`` -- verify
        against GPSReceiver.get_position().
        """
        return (position.get('latitude') is not None
                and position.get('longitude') is not None)

    def set_route(self, waypoints: List[Tuple[float, float, str]]):
        """Replace the route and restart navigation at its first waypoint.

        Args:
            waypoints: ``(latitude, longitude, name)`` tuples in visit order.
        """
        self.route = [
            {'latitude': lat, 'longitude': lon, 'name': name}
            for lat, lon, name in waypoints
        ]
        self.current_waypoint_index = 0
        print(f"🗺️  Route set with {len(self.route)} waypoints")

    def get_navigation_info(self) -> Dict:
        """Describe how to get from the current position to the target waypoint.

        Returns:
            ``{'error': ...}`` when there is no fix, no route, or the route
            has been completed.  Otherwise a dict with the current position,
            the target waypoint and its 1-based index, distance and bearing
            to it, the current course, the signed course deviation and a
            turn hint.  When the target is within ``arrival_radius`` the
            navigator advances to the next waypoint as a side effect; the
            returned dict still describes the waypoint that was just reached.
        """
        position = self.gps.get_position()

        if not self._has_fix(position):
            return {'error': 'No GPS fix'}

        if not self.route or self.current_waypoint_index >= len(self.route):
            return {'error': 'No route set'}

        # Remember the target before a possible advance so the reported
        # waypoint and its index always refer to the same route entry.
        target_index = self.current_waypoint_index
        current_waypoint = self.route[target_index]

        # Calculate distance and bearing to current waypoint
        distance = self.gps.calculate_distance(
            position['latitude'], position['longitude'],
            current_waypoint['latitude'], current_waypoint['longitude']
        )

        bearing = self._calculate_bearing(
            position['latitude'], position['longitude'],
            current_waypoint['latitude'], current_waypoint['longitude']
        )

        # A missing or None course is treated as heading 0
        current_course = position.get('course', 0) or 0
        course_deviation = self._normalize_angle(bearing - current_course)

        # Check if we've reached the waypoint
        if distance < self.arrival_radius:
            self._advance_waypoint()

        return {
            'current_position': {
                'latitude': position['latitude'],
                'longitude': position['longitude']
            },
            'current_waypoint': current_waypoint,
            'waypoint_index': target_index + 1,
            'total_waypoints': len(self.route),
            'distance_to_waypoint': distance,
            'bearing_to_waypoint': bearing,
            'current_course': current_course,
            'course_deviation': course_deviation,
            'turn_direction': self._get_turn_direction(course_deviation)
        }

    def _calculate_bearing(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
        """Return the initial bearing from point 1 to point 2 in [0, 360) degrees.

        Inputs are in decimal degrees.
        """
        lat1_rad = math.radians(lat1)
        lat2_rad = math.radians(lat2)
        delta_lon_rad = math.radians(lon2 - lon1)

        x = math.sin(delta_lon_rad) * math.cos(lat2_rad)
        y = (math.cos(lat1_rad) * math.sin(lat2_rad) -
             math.sin(lat1_rad) * math.cos(lat2_rad) * math.cos(delta_lon_rad))

        bearing_deg = math.degrees(math.atan2(x, y))

        # atan2 yields (-180, 180]; shift into compass range
        return (bearing_deg + 360) % 360

    def _normalize_angle(self, angle: float) -> float:
        """Normalize angle to -180 to +180 degrees"""
        while angle > 180:
            angle -= 360
        while angle < -180:
            angle += 360
        return angle

    def _get_turn_direction(self, deviation: float) -> str:
        """Map a signed course deviation to "straight", "right" or "left".

        Deviations smaller than 10 degrees either way count as straight;
        positive deviations mean the target lies to the right.
        """
        if abs(deviation) < 10:
            return "straight"
        return "right" if deviation > 0 else "left"

    def _advance_waypoint(self):
        """Advance to next waypoint and announce it (or route completion)."""
        self.current_waypoint_index += 1
        if self.current_waypoint_index < len(self.route):
            waypoint = self.route[self.current_waypoint_index]
            print(f"🎯 Reached waypoint! Next: {waypoint['name']}")
        else:
            print("🏁 Route complete!")

    def get_route_progress(self) -> Dict:
        """Summarize progress along the route.

        Returns:
            ``{'error': 'No route set'}`` for an empty route, otherwise a
            dict with total, completed and remaining distance, the progress
            percentage, the 1-based number of the current target waypoint
            (capped at the route length) and the waypoint count.
        """
        if not self.route:
            return {'error': 'No route set'}

        # leg_lengths[i] is the distance from waypoint i to waypoint i + 1
        leg_lengths = [
            self.gps.calculate_distance(
                start['latitude'], start['longitude'],
                end['latitude'], end['longitude']
            )
            for start, end in zip(self.route, self.route[1:])
        ]
        total_distance = sum(leg_lengths)
        waypoint_count = len(self.route)
        index = self.current_waypoint_index
        route_complete = index >= waypoint_count

        if route_complete:
            completed_distance = total_distance
        else:
            # While heading for waypoint `index`, the leg ending there is
            # still in progress; only the legs before it are finished.
            completed_distance = sum(leg_lengths[:max(index - 1, 0)])

            if index > 0:
                position = self.gps.get_position()
                if self._has_fix(position):
                    prev_waypoint = self.route[index - 1]
                    travelled = self.gps.calculate_distance(
                        prev_waypoint['latitude'], prev_waypoint['longitude'],
                        position['latitude'], position['longitude']
                    )
                    # Cap at the leg length so a detour cannot push the
                    # progress beyond the end of the current leg.
                    completed_distance += min(travelled, leg_lengths[index - 1])

        if total_distance > 0:
            progress_percent = completed_distance / total_distance * 100
        else:
            # Zero-length route (single waypoint): done once it is reached
            progress_percent = 100.0 if route_complete else 0

        return {
            'total_distance': total_distance,
            'completed_distance': completed_distance,
            'remaining_distance': total_distance - completed_distance,
            'progress_percent': progress_percent,
            'current_waypoint': min(index + 1, waypoint_count),
            'total_waypoints': waypoint_count
        }

# Navigation example: follow a fixed route and print guidance until Ctrl+C
nav = GPSNavigation(gps)

# Route entries are (latitude, longitude, name), visited in order
route_waypoints = [
    (40.7128, -74.0060, "Start - New York"),
    (40.7589, -73.9851, "Times Square"),
    (40.7614, -73.9776, "Central Park South"),
    (40.7829, -73.9654, "End - Upper East Side"),
]
nav.set_route(route_waypoints)

# Poll the navigator every two seconds
try:
    while True:
        nav_info = nav.get_navigation_info()

        # No fix / no route: nothing to show this round, just wait
        if 'error' in nav_info:
            time.sleep(2)
            continue

        print(f"\n🧭 Navigation:")
        print(f"   To: {nav_info['current_waypoint']['name']}")
        print(f"   Distance: {nav_info['distance_to_waypoint']:.0f}m")
        print(f"   Bearing: {nav_info['bearing_to_waypoint']:.0f}°")
        print(f"   Turn: {nav_info['turn_direction']}")

        progress = nav.get_route_progress()
        print(f"   Progress: {progress['progress_percent']:.1f}%")

        time.sleep(2)

except KeyboardInterrupt:
    print("Navigation stopped")

GPS Data Visualization

try:
    import matplotlib.pyplot as plt
    import numpy as np

    class GPSVisualizer:
        """Matplotlib plots for GPS tracks and satellite data.

        Only defined when matplotlib (and numpy) can be imported.
        """

        def __init__(self, gps_receiver: "GPSReceiver"):
            """Keep a reference to the receiver the plotted data comes from."""
            self.gps = gps_receiver

        @staticmethod
        def _snr_color(snr, no_signal_color: str = 'gray') -> str:
            """Map an SNR value to a color: >40 green, >25 orange, >0 red.

            Values of 0 or below get ``no_signal_color``.
            """
            if snr > 40:
                return 'green'
            if snr > 25:
                return 'orange'
            if snr > 0:
                return 'red'
            return no_signal_color

        def plot_track(self, track_data: List[Dict]):
            """Plot GPS track as a longitude/latitude line with start/end markers.

            Args:
                track_data: Dicts with ``'latitude'`` and ``'longitude'``.
                    Points missing either coordinate are skipped.
            """
            if not track_data:
                print("No track data to plot")
                return

            # Filter per point, not per coordinate list: filtering latitudes
            # and longitudes separately can leave lists of different length
            # (or silently misaligned), which breaks plt.plot.  ``None`` is
            # assumed to mark a missing value -- 0.0 is a valid coordinate.
            fixes = [
                (p['longitude'], p['latitude'])
                for p in track_data
                if p.get('latitude') is not None and p.get('longitude') is not None
            ]

            if not fixes:
                print("No valid GPS coordinates to plot")
                return

            lons, lats = zip(*fixes)

            plt.figure(figsize=(12, 8))

            # Plot track
            plt.plot(lons, lats, 'b-', linewidth=2, label='GPS Track')
            plt.plot(lons[0], lats[0], 'go', markersize=10, label='Start')
            plt.plot(lons[-1], lats[-1], 'ro', markersize=10, label='End')

            plt.xlabel('Longitude')
            plt.ylabel('Latitude')
            plt.title('GPS Track')
            plt.legend()
            plt.grid(True, alpha=0.3)
            plt.axis('equal')

            plt.tight_layout()
            plt.show()

        def plot_satellite_sky(self, satellite_data: Dict):
            """Plot satellite positions in sky view (polar plot).

            Args:
                satellite_data: Mapping of PRN to a dict with ``'elevation'``
                    and ``'azimuth'`` in degrees and optionally ``'snr'``.
                    Satellites without elevation or azimuth are skipped.
            """
            if not satellite_data:
                print("No satellite data to plot")
                return

            fig, ax = plt.subplots(figsize=(10, 10), subplot_kw=dict(projection='polar'))

            for prn, data in satellite_data.items():
                if data.get('elevation') is None or data.get('azimuth') is None:
                    continue

                azimuth = math.radians(data['azimuth'])
                # Zenith (90 deg elevation) sits at the center of the plot
                radius = 90 - data['elevation']
                # 'snr' may be present but None for untracked satellites;
                # treat that like a missing value instead of comparing None
                snr = data.get('snr') or 0

                ax.plot(azimuth, radius, 'o', color=self._snr_color(snr), markersize=8)
                ax.text(azimuth, radius-5, str(prn), ha='center', va='center', fontsize=8)

            # Configure plot
            ax.set_ylim(0, 90)
            ax.set_yticks([0, 30, 60, 90])
            ax.set_yticklabels(['90°', '60°', '30°', '0°'])  # Elevation labels
            ax.set_theta_zero_location('N')
            ax.set_theta_direction(-1)  # Clockwise, like a compass
            ax.set_title('Satellite Sky View\n(Green=Strong, Orange=Medium, Red=Weak, Gray=No Signal)')

            plt.show()

        def plot_signal_strength(self, satellite_data: Dict):
            """Plot satellite signal strength as a bar chart, one bar per PRN.

            Args:
                satellite_data: Mapping of PRN to a dict with ``'snr'``.
                    Satellites whose SNR is missing or None are left out.
            """
            # Same guard as plot_satellite_sky: None/empty input has nothing
            # to iterate over
            if not satellite_data:
                print("No SNR data to plot")
                return

            readings = [
                (str(prn), data.get('snr'))
                for prn, data in satellite_data.items()
                if data.get('snr') is not None
            ]

            if not readings:
                print("No SNR data to plot")
                return

            # PRNs are identifiers, so plot them as category labels
            prns, snrs = zip(*readings)

            plt.figure(figsize=(12, 6))
            bars = plt.bar(prns, snrs)

            # Color bars by signal strength (no gray here: weakest is red)
            for bar, snr in zip(bars, snrs):
                bar.set_color(self._snr_color(snr, no_signal_color='red'))

            plt.xlabel('Satellite PRN')
            plt.ylabel('Signal Strength (dB-Hz)')
            plt.title('Satellite Signal Strength')
            plt.grid(True, alpha=0.3)

            # Add threshold lines
            plt.axhline(y=25, color='orange', linestyle='--', alpha=0.7, label='Good Signal')
            plt.axhline(y=40, color='green', linestyle='--', alpha=0.7, label='Excellent Signal')
            plt.legend()

            plt.tight_layout()
            plt.show()

    # Usage
    # visualizer = GPSVisualizer(gps)
    # visualizer.plot_track(tracker.track_log)
    # visualizer.plot_satellite_sky(gps.get_satellites())
    # visualizer.plot_signal_strength(gps.get_satellites())

except ImportError:
    print("matplotlib not available for visualization")

GPS NMEA processing with PySerial enables powerful location-based applications. From basic coordinate parsing to advanced navigation systems, these examples provide a solid foundation for GPS projects.

How is this guide?