Source code for pyehicle.preprocessing.map_matching

"""
Map-matching module for pyehicle.

This module provides algorithms to align GPS trajectories with road networks by finding
the most likely paths on actual roads. Map-matching is essential for:
- Correcting GPS errors and drift
- Assigning OpenStreetMap (OSM) way IDs to trajectory points
- Preparing trajectories for road-network-based analysis
- Improving trajectory quality before reconstruction

The module implements two map-matching approaches:
1. **Leuven Algorithm (leuven)**: Hidden Markov Model-based matching using local OSM data
   - Fetches road network from Overpass API
   - Uses AEQD projection for accurate metric calculations
   - Best for offline processing and custom road networks

2. **Valhalla Meili (meili)**: Cloud-based matching using Valhalla routing engine
   - Uses Mapbox or custom Valhalla servers
   - Faster for large trajectories
   - Requires internet connection and API key

Both algorithms return trajectories with coordinates snapped to actual road centerlines.
"""

import os
import shutil
import sys
import hashlib
import math
from typing import Dict, Tuple
import numpy as np
import pandas as pd
import polars as pl

# Prefer osmium; if missing, fall back to osmnx (if present)
try:
    import osmium as osm
    _HAVE_OSMIUM = True
except Exception:
    osm = None
    _HAVE_OSMIUM = False

try:
    import osmnx as ox
    _HAVE_OSMNX = True
except Exception:
    ox = None
    _HAVE_OSMNX = False

from leuvenmapmatching.map.inmem import InMemMap
from leuvenmapmatching.matcher.distance import DistanceMatcher
import requests
from tqdm import tqdm

from pyproj import Transformer

# Overpass disk cache dir
_OVERPASS_CACHE_DIR = os.path.join(os.getcwd(), "overpass_cache")
os.makedirs(_OVERPASS_CACHE_DIR, exist_ok=True)

# AEQD transformer cache (simple bounded dict)
_AEQD_CACHE: Dict[Tuple[float, float, int], Tuple[Transformer, Transformer]] = {}


def _get_aeqd_transformers(cen_lat: float, cen_lon: float, precision: int = 6):
    """Return (forward, inverse) AEQD Transformers centered at (cen_lat, cen_lon).
       Cached by rounding centroid to `precision` decimals."""
    key = (round(cen_lat, precision), round(cen_lon, precision), precision)
    if key in _AEQD_CACHE:
        return _AEQD_CACHE[key]
    proj = f"+proj=aeqd +lat_0={cen_lat:.9f} +lon_0={cen_lon:.9f} +datum=WGS84 +units=m +no_defs"
    fwd = Transformer.from_crs("EPSG:4326", proj, always_xy=True)
    inv = Transformer.from_crs(proj, "EPSG:4326", always_xy=True)
    _AEQD_CACHE[key] = (fwd, inv)
    # keep cache bounded
    if len(_AEQD_CACHE) > 256:
        _AEQD_CACHE.pop(next(iter(_AEQD_CACHE)))
    return fwd, inv


# ----------------------------------------
# More precise osmnx-like is_driveable logic
# (close to osmnx.utils.is_driveable)
# ----------------------------------------
_HIGHWAY_WHITELIST = {
    "motorway", "trunk", "primary", "secondary", "tertiary",
    "unclassified", "residential", "living_street",
    "motorway_link", "trunk_link", "primary_link", "secondary_link", "tertiary_link",
    "service", "road", "track"
}
_HIGHWAY_BLACKLIST = {
    "footway", "pedestrian", "steps", "path", "cycleway", "bridleway",
    "proposed", "construction", "abandoned", "platform"
}
_SERVICE_DENY = {"driveway", "parking_aisle"}
_ACCESS_DENY = {"no", "private", "customers", "permit", "delivery", "destination"}


def _resolve_tag_value(v):
    if v is None:
        return None
    if isinstance(v, (list, tuple)):
        v = v[0]
    return str(v).lower()


def _is_driveable_way(tags: dict) -> bool:
    """Return True if a way is considered driveable (osmnx-like heuristics)."""
    if not tags:
        return False
    hwy = _resolve_tag_value(tags.get("highway"))
    if hwy is None:
        return False
    if hwy in _HIGHWAY_BLACKLIST:
        return False
    if (hwy not in _HIGHWAY_WHITELIST) and (not hwy.isdigit()) and (hwy != "road"):
        return False
    # area=yes not a road
    if _resolve_tag_value(tags.get("area")) == "yes":
        return False
    # service restrictions
    service = _resolve_tag_value(tags.get("service"))
    if service in _SERVICE_DENY:
        return False
    # route ferry excluded
    if _resolve_tag_value(tags.get("route")) == "ferry":
        return False
    # access/motor_vehicle/vehicle restrictions
    for k in ("motor_vehicle", "motorcar", "vehicle", "access"):
        v = _resolve_tag_value(tags.get(k))
        if v in _ACCESS_DENY:
            return False
    # exclude proposed/construction
    if _resolve_tag_value(tags.get("proposed")) == "yes" or _resolve_tag_value(tags.get("construction")) == "yes":
        return False
    return True


# ----------------------------------------
# Osmium handler to collect nodes & driveable ways
# ----------------------------------------
if _HAVE_OSMIUM:
    class WayNodeCollector(osm.SimpleHandler):
        def __init__(self, nodes: dict, ways: dict):
            super().__init__()
            self.nodes = nodes
            self.ways = ways

        def node(self, n):
            # store coordinates if available
            if n.location.valid():
                self.nodes[n.id] = (float(n.location.lat), float(n.location.lon))

        def way(self, w):
            tags = {k: v for k, v in w.tags.items()}
            if not _is_driveable_way(tags):
                return
            node_refs = [nd.ref for nd in w.nodes]
            if len(node_refs) < 2:
                return
            self.ways[w.id] = {"tags": tags, "nodes": node_refs}


# ----------------------------------------
# Overpass helpers: tile split, cache path, download
# ----------------------------------------
def _split_bbox_into_tiles(north: float, south: float, east: float, west: float,
                           max_tile_deg: float = 0.5,
                           overlap_deg: float = 0.05):
    """
    Divide a bounding box into smaller overlapping tiles for efficient OSM data fetching.

    This prevents Overpass API timeouts when requesting large areas by breaking them
    into manageable chunks with overlap to ensure no edge cases are missed.
    """
    if east <= west or north <= south:
        return [(north, south, east, west)]

    lat_spans = max(1, math.ceil((north - south) / max_tile_deg))
    lon_spans = max(1, math.ceil((east - west) / max_tile_deg))

    # Create evenly spaced tile boundaries
    lat_edges = np.linspace(south, north, lat_spans + 1)
    lon_edges = np.linspace(west, east, lon_spans + 1)

    tiles = []
    for i in range(lat_spans):
        s = lat_edges[i]
        n = lat_edges[i + 1]
        # Expand tile boundaries with overlap, but clip to original bbox
        n_exp = min(n + overlap_deg, north)
        s_exp = max(s - overlap_deg, south)

        for j in range(lon_spans):
            w = lon_edges[j]
            e = lon_edges[j + 1]
            e_exp = min(e + overlap_deg, east)
            w_exp = max(w - overlap_deg, west)
            tiles.append((n_exp, s_exp, e_exp, w_exp))
    return tiles


def _overpass_cache_path(north: float, south: float, east: float, west: float, overlap: float):
    key = f"{north:.6f}_{south:.6f}_{east:.6f}_{west:.6f}_{overlap:.6f}"
    h = hashlib.sha256(key.encode("utf8")).hexdigest()
    return os.path.join(_OVERPASS_CACHE_DIR, f"overpass_{h}.osm")


def _download_osm_xml_bbox_cached(north: float, south: float, east: float, west: float,
                                  timeout: int = 180, force_download: bool = False, overlap: float = 0.0):
    """Download OSM XML for bbox with caching on disk per tile."""
    cache_path = _overpass_cache_path(north, south, east, west, overlap)
    if os.path.exists(cache_path) and not force_download:
        return cache_path
    bbox = f"{south},{west},{north},{east}"
    q = f"""
    [out:xml][timeout:{int(timeout)}];
    (
      way["highway"]({bbox});
    );
    (._;>;);
    out body;
    """
    url = "https://overpass-api.de/api/interpreter"
    resp = requests.post(url, data={"data": q}, stream=True, timeout=timeout)
    resp.raise_for_status()
    with open(cache_path, "wb") as f:
        for chunk in resp.iter_content(chunk_size=1 << 20):
            if chunk:
                f.write(chunk)
    return cache_path


# ----------------------------------------
# Main upgraded leuven() function
# ----------------------------------------
[docs] def leuven(df: pd.DataFrame | pl.DataFrame, lat_col: str = 'lat', lon_col: str = 'lon', delete_cache: bool = False, max_dist: float = 0.01, tile_max_deg: float = 0.5, tile_overlap_deg: float = 0.05, cache_overpass: bool = True, return_projected: bool = False, aeqd_precision: int = 6) -> pd.DataFrame | pl.DataFrame: """ Map-match GPS trajectory to road network using Hidden Markov Model algorithm (Leuven). This function aligns noisy GPS points to the most likely path on the actual road network by fetching OpenStreetMap data and applying HMM-based map-matching. It corrects GPS drift, snaps points to road centerlines, and assigns OSM way IDs for road-network-based analysis. The algorithm: 1. Fetches driveable roads from Overpass API (tiled for large areas) 2. Projects coordinates to AEQD (Azimuthal Equidistant) for accurate metric calculations 3. Builds in-memory road network graph from OSM data 4. Applies Hidden Markov Model to find most likely road path 5. Returns trajectory with coordinates snapped to roads This is essential preprocessing for: - Trajectory reconstruction (provides OSM IDs for road identification) - Removing GPS noise and drift - Preparing data for road-network-based routing analysis - Generating training data for machine learning models Parameters ---------- df : pd.DataFrame or pl.DataFrame Input GPS trajectory with at least latitude and longitude columns. Can contain additional columns (all will be preserved). Minimum 2 points required. lat_col : str, default='lat' Name of the latitude column (WGS84 decimal degrees). lon_col : str, default='lon' Name of the longitude column (WGS84 decimal degrees). delete_cache : bool, default=False If True, delete cached Overpass API responses after matching. Useful for saving disk space or forcing fresh data download. Recommended: False (reuse cache). max_dist : float, default=0.01 Maximum distance in kilometers for matching GPS points to road nodes. Points farther than this from any road will be excluded from the result. Typical values: - 0.01 km (10m): High-accuracy GPS, urban areas - 0.05 km (50m): Standard GPS accuracy - 0.1 km (100m): Low-accuracy GPS or sparse road networks tile_max_deg : float, default=0.5 Maximum tile size in degrees for splitting large bounding boxes. Overpass API has request size limits, so large trajectories are automatically tiled. Larger values = fewer API calls but longer response times. Typical: 0.3-0.7 degrees. tile_overlap_deg : float, default=0.05 Overlap between adjacent tiles in degrees to ensure roads crossing tile boundaries are captured. Should be >= typical road segment length. Typical: 0.03-0.1 degrees. cache_overpass : bool, default=True If True, cache Overpass API responses to disk in ./overpass_cache/ for reuse. Dramatically speeds up repeated matching in the same area. Recommended: True. return_projected : bool, default=False If True, return coordinates in AEQD projected meters (x, y) instead of WGS84 degrees (lon, lat). Useful for distance calculations. Typical: False (keep lat/lon). aeqd_precision : int, default=6 Decimal precision for rounding AEQD projection center coordinates when caching transformers. Higher precision = more cache entries. Typical: 5-7. Returns ------- pd.DataFrame or pl.DataFrame Map-matched trajectory with coordinates snapped to road network. Returns same type as input (pandas → pandas, polars → polars). Output contains: - Matched coordinates (lat/lon or x/y depending on return_projected) - All original columns preserved - Points filtered: only those within max_dist of roads are kept - Order preserved: points remain in chronological sequence Examples -------- >>> import pandas as pd >>> import pyehicle as pye >>> >>> # Load raw GPS trajectory >>> df = pd.read_csv('raw_gps.csv') >>> print(f"Raw trajectory: {len(df)} points") >>> >>> # Map-match to OSM roads with default settings >>> matched = pye.preprocessing.leuven(df) >>> print(f"Matched trajectory: {len(matched)} points") >>> >>> # Map-match with custom distance threshold (100m) >>> matched = pye.preprocessing.leuven( ... df, ... max_dist=0.1, # 100 meters ... cache_overpass=True # Reuse cached OSM data ... ) >>> >>> # Use in preprocessing pipeline >>> compressed = pye.preprocessing.spatio_temporal_compress(df) >>> matched = pye.preprocessing.leuven(compressed) # Map-match compressed data >>> filtered = pye.preprocessing.kalman_aeqd_filter(matched) # Apply Kalman filter >>> >>> # Visualize before/after >>> pye.utilities.visualization.multiple( ... [df, matched], ... names=['Raw GPS', 'Map-Matched'], ... show_in_browser=True ... ) Notes ----- **Algorithm: Hidden Markov Model (HMM)** - **States**: Road segments (edges) in the OSM network - **Observations**: GPS points with noise - **Transition probability**: Likelihood of traveling from one road to another - **Emission probability**: Likelihood of observing GPS point given road location - **Viterbi algorithm**: Finds most likely sequence of states (road path) **Data Sources:** - **Overpass API**: Public OSM query service (https://overpass-api.de/) - **Fallback**: OSMnx library if Overpass fails - **Caching**: Responses saved to ./overpass_cache/ as XML files **Dependencies:** - leuvenmapmatching: Core HMM matching library - osmium: Fast OSM XML parsing (preferred) - osmnx: Fallback for OSM data fetching and parsing - pyproj: AEQD projection transformations **Projection: AEQD (Azimuthal Equidistant)** - Centered on trajectory bounding box center - Preserves distances from center point - Converts lat/lon (degrees) to x/y (meters) - Essential for accurate distance-based matching **Performance:** - Time complexity: O(n * m * k) where: - n = trajectory points - m = average candidate roads per point - k = HMM Viterbi path length - For 1000-point trajectory: 30-90 seconds (first run), <5 seconds (cached) - Overpass API calls: ~1-5 tiles for typical city-scale trajectories - Cache significantly speeds up repeated matching in same area **Quality Factors:** - **GPS accuracy**: Better GPS → better matching - **Road network density**: Urban >> rural - **Sampling rate**: Higher frequency → better path inference - **max_dist**: Should match expected GPS error (typically 10-50m) **Limitations:** - Requires internet connection (Overpass API or OSMnx) - May fail in areas with sparse OSM coverage - Computationally intensive for very long trajectories (>10,000 points) - Does not handle GPS outages or tunnels well - Assumes trajectory follows driveable roads (not for off-road vehicles) **Troubleshooting:** - **"No OSM data available"**: Check internet connection, try larger tile_max_deg - **Slow performance**: Enable cache_overpass=True, reduce trajectory size with compression - **Poor matching**: Increase max_dist, check OSM coverage in area, improve GPS quality - **Overpass timeout**: Reduce tile_max_deg (split into smaller tiles) See Also -------- meili : Alternative map-matching using Valhalla routing engine spatio_temporal_compress : Reduce trajectory size before map-matching kalman_aeqd_filter : Apply Kalman filtering after map-matching """ input_is_polars = isinstance(df, pl.DataFrame) if input_is_polars: pdf = df.to_pandas() else: pdf = df.copy() if len(pdf) < 2: raise ValueError("Input must contain at least two points for map matching.") # Extract trajectory coordinates lats = pdf[lat_col].to_numpy(dtype=float) lons = pdf[lon_col].to_numpy(dtype=float) points = list(zip(lats.tolist(), lons.tolist())) # Calculate bounding box for the trajectory north = float(lats.max()) south = float(lats.min()) east = float(lons.max()) west = float(lons.min()) # Build nodes & ways by tiling the bbox and parsing each tile nodes: Dict[int, Tuple[float, float]] = {} ways: Dict[int, Dict] = {} tiles = _split_bbox_into_tiles(north, south, east, west, max_tile_deg=tile_max_deg, overlap_deg=tile_overlap_deg) # Set up AEQD projection centered on trajectory bounding box bbox_cen_lat = (north + south) * 0.5 bbox_cen_lon = (east + west) * 0.5 fwd_global, inv_global = _get_aeqd_transformers(bbox_cen_lat, bbox_cen_lon, precision=aeqd_precision) for (tn, ts, te, tw) in tiles: try: osm_path = _download_osm_xml_bbox_cached(tn, ts, te, tw, overlap=tile_overlap_deg, force_download=not cache_overpass) except Exception as e: # try fallback to osmnx tile if available if _HAVE_OSMNX: try: G = ox.graph_from_bbox(tn, ts, te, tw, network_type="drive", simplify=False) nodes_gdf, edges_gdf = ox.graph_to_gdfs(G, nodes=True, edges=True) for nid, row in nodes_gdf.iterrows(): nodes[nid] = (float(row.geometry.y), float(row.geometry.x)) for idx, row in edges_gdf.iterrows(): geom = row.geometry try: coords = list(geom.coords) except Exception: coords = [] if len(coords) >= 2: wid = int(hash((coords[0], coords[-1], idx)) & 0xFFFFFFFF) node_seq = [] for c in coords: node_id = int(hash(c) & 0x7FFFFFFF) nodes.setdefault(node_id, (float(c[1]), float(c[0]))) node_seq.append(node_id) ways[wid] = {"tags": {}, "nodes": node_seq} continue except Exception: # skip tile continue else: continue if _HAVE_OSMIUM: handler = WayNodeCollector(nodes, ways) try: handler.apply_file(osm_path, locations=True) except Exception: # skip tile on parse error continue else: # if no osmium, we already attempted osmnx fallback above when download failed # try a direct osmnx parse of the file (if osmnx present) if _HAVE_OSMNX: try: # osmnx can read xml file into graph via graph_from_xml? not always; fallback: skip # Instead, use ox.graph_from_bbox fallback above pass except Exception: pass if len(nodes) == 0 or len(ways) == 0: # if nothing parsed, fallback to osmnx full bbox if available if _HAVE_OSMNX: return _leuven_with_osmnx(pdf, lat_col, lon_col, delete_cache, max_dist) else: raise RuntimeError("No OSM data available for bbox and no osmnx fallback available.") # Build mapping from node IDs to indices and extract coordinates node_ids = sorted(nodes.keys()) id_to_idx = {nid: i for i, nid in enumerate(node_ids)} node_coords = np.array([nodes[nid] for nid in node_ids], dtype=float) # lat, lon node_lats = node_coords[:, 0] node_lons = node_coords[:, 1] # Project nodes to AEQD for metric coordinates in meters xs, ys = fwd_global.transform(node_lons, node_lats) # Create dictionary mapping node IDs to projected coordinates node_proj = {nid: (float(ys[i]), float(xs[i])) for i, nid in enumerate(node_ids)} # Build InMemMap with projected nodes map_con = InMemMap("map_con", use_latlon=False, index_edges=True, use_rtree=True) for nid in node_ids: lat_m, lon_m = node_proj[nid] map_con.add_node(nid, (lat_m, lon_m)) # Add edges from ways list for way_id, w in ways.items(): node_sequence = w["nodes"] tags = w.get("tags", {}) oneway = _resolve_tag_value(tags.get("oneway")) in ("yes", "true", "1") for a, b in zip(node_sequence[:-1], node_sequence[1:]): if a not in id_to_idx or b not in id_to_idx: continue try: map_con.add_edge(int(a), int(b)) if not oneway: map_con.add_edge(int(b), int(a)) except Exception: continue # Build DistanceMatcher matcher = DistanceMatcher(map_con, max_dist=max_dist, min_prob_norm=0.5, non_emitting_length_factor=0.75, obs_noise=1, obs_noise_ne=50, dist_noise=50, max_lattice_width=3, non_emitting_states=True) # Run HMM-based map matching matcher.match(points, unique=False, tqdm=tqdm) # Extract matched coordinates from the matcher n_points = len(pdf) if n_points == len(matcher.lattice_best): matches = np.array([matcher.lattice_best[i].edge_m.pi for i in range(n_points)], dtype=float) else: raise Exception("The number of points in the dataframe are more than the number of matched points. " "You can try to increase the max_dist parameter.") matched_y = matches[:, 0] # lat_m in AEQD projection matched_x = matches[:, 1] # lon_m in AEQD projection # If user wants projected coords, return them if return_projected: if input_is_polars: return pl.DataFrame({lon_col: matched_x.tolist(), lat_col: matched_y.tolist()}) else: return pd.DataFrame({lon_col: matched_x.tolist(), lat_col: matched_y.tolist()}) # otherwise inverse-transform from AEQD back to lon/lat degrees using the same bbox AEQD try: lon_deg, lat_deg = inv_global.transform(matched_x, matched_y) if input_is_polars: matched_df = pl.DataFrame({lon_col: lon_deg.tolist(), lat_col: lat_deg.tolist()}) else: matched_df = pd.DataFrame({lon_col: lon_deg.tolist(), lat_col: lat_deg.tolist()}) except Exception: # fallback: return projected meters if inverse transform fails if input_is_polars: matched_df = pl.DataFrame({lon_col: matched_x.tolist(), lat_col: matched_y.tolist()}) else: matched_df = pd.DataFrame({lon_col: matched_x.tolist(), lat_col: matched_y.tolist()}) # cleanup caches if requested (preserve previous behavior and also clear overpass cache) if os.path.isdir("cache") and delete_cache: shutil.rmtree("cache", ignore_errors=True) if delete_cache and os.path.isdir(_OVERPASS_CACHE_DIR): shutil.rmtree(_OVERPASS_CACHE_DIR, ignore_errors=True) return matched_df
# ------------------------------ # Fallback helper (unchanged), still returns degrees # ------------------------------ def _leuven_with_osmnx(pdf: pd.DataFrame, lat_col: str, lon_col: str, delete_cache: bool, max_dist: float): """ Fallback map matching using osmnx library when osmium/Overpass is unavailable. This is the original implementation kept for backward compatibility and as a fallback when the preferred osmium-based method fails. """ lats = pdf[lat_col].to_numpy(dtype=float) lons = pdf[lon_col].to_numpy(dtype=float) points = list(zip(lats.tolist(), lons.tolist())) north = float(lats.max()) south = float(lats.min()) east = float(lons.max()) west = float(lons.min()) try: _graph = ox.graph_from_bbox(north, south, east, west, network_type='drive', simplify=False) except Exception as e: print(e) return pdf graph_proj = ox.project_graph(_graph) map_con = InMemMap("map_con", use_latlon=False, index_edges=True, use_rtree=True) nodes, edges = ox.graph_to_gdfs(graph_proj, nodes=True, edges=True) for nid, row in nodes.iterrows(): try: map_con.add_node(nid, (float(row.geometry.y), float(row.geometry.x))) except Exception: continue for u, v, _ in _graph.edges: try: map_con.add_edge(u, v) except Exception: continue matcher = DistanceMatcher(map_con, max_dist=max_dist, min_prob_norm=0.5, non_emitting_length_factor=0.75, obs_noise=1, obs_noise_ne=50, dist_noise=50, max_lattice_width=3, non_emitting_states=True) matcher.match(points, unique=False, tqdm=tqdm) n_points = len(pdf) if n_points == len(matcher.lattice_best): matches = np.array([matcher.lattice_best[i].edge_m.pi for i in range(n_points)], dtype=float) else: raise Exception("The number of points in the dataframe are more than the number of matched points. " "You can try to increase the max_dist parameter.") # Transform matched coordinates back to lat/lon degrees try: inv_t = Transformer.from_crs("EPSG:3857", "EPSG:4326", always_xy=True) lon_m = matches[:, 1] lat_m = matches[:, 0] lon_deg, lat_deg = inv_t.transform(lon_m, lat_m) matched_df = pd.DataFrame({lon_col: lon_deg.tolist(), lat_col: lat_deg.tolist()}) except Exception: matched_df = pd.DataFrame({lon_col: matches[:, 1].tolist(), lat_col: matches[:, 0].tolist()}) if os.path.isdir("cache"): shutil.rmtree("cache", ignore_errors=True) return matched_df # ------------------------------ # Meili function unchanged # ------------------------------
[docs] def meili(df: pd.DataFrame | pl.DataFrame, lat_col: str = 'lat', lon_col: str = 'lon', time_col: str = 'time') -> pd.DataFrame | pl.DataFrame: """ Map-match a GPS trajectory using Valhalla's Meili algorithm. This function requires a Valhalla server running on localhost:8002. Meili uses a Hidden Markov Model to match GPS traces to roads, handling noise and gaps effectively. Parameters ---------- df : pd.DataFrame or pl.DataFrame Input trajectory with latitude, longitude, and time columns. lat_col : str, default='lat' Name of the latitude column. lon_col : str, default='lon' Name of the longitude column. time_col : str, default='time' Name of the time column. Returns ------- pd.DataFrame or pl.DataFrame Map-matched trajectory with coordinates snapped to roads. Includes 'original_lon' and 'original_lat' columns if time_col is present. Notes ----- Requires Valhalla server running on http://localhost:8002 Install and run Valhalla before using this function. See: https://github.com/valhalla/valhalla """ original_lon = df[lon_col].to_list() original_lat = df[lat_col].to_list() # Convert trajectory to JSON format for Valhalla API if isinstance(df, pd.DataFrame): meili_coordinates = df.to_json(orient='records') else: meili_coordinates = df.write_json(row_oriented=True) # Build Meili request body meili_request_body = f'{{"shape":{meili_coordinates},"search_radius": 100, "shape_match":"map_snap", "costing":"auto", "format":"osrm"}}' # Send request to Valhalla server try: r = requests.post( "http://localhost:8002/trace_route", data=meili_request_body, headers={'Content-type': 'application/json'} ) except requests.exceptions.RequestException as e: print(e) sys.exit(1) if r.status_code == 200: # Parse Valhalla response response_text = r.json() tracepoints = response_text.get('tracepoints', []) # Replace None entries with default values (happens when matching fails for a point) default_point = {'matchings_index': '#', 'name': '', 'waypoint_index': '#', 'alternatives_count': 0, 'distance': 0, 'location': [0.0, 0.0]} tracepoints = [tp if tp is not None else default_point for tp in tracepoints] if isinstance(df, pd.DataFrame): # Extract matched coordinates from tracepoints locations = np.array([tp['location'] for tp in tracepoints], dtype=float) lon_vals = locations[:, 0] lat_vals = locations[:, 1] # Build output DataFrame data_dict = {lon_col: lon_vals, lat_col: lat_vals} # Include time and original coordinates if time column is present if time_col in df.columns: data_dict[time_col] = df[time_col].to_numpy() data_dict['original_lon'] = original_lon data_dict['original_lat'] = original_lat matched_df = pd.DataFrame(data_dict) # Filter out failed matches (indicated by 0,0 coordinates) matched_df = matched_df[(matched_df[lat_col] != 0) & (matched_df[lon_col] != 0)] return matched_df else: # polars # Extract matched coordinates for polars locations = [tp['location'] for tp in tracepoints] lon_vals = [loc[0] for loc in locations] lat_vals = [loc[1] for loc in locations] matched_df = pl.DataFrame({ lon_col: lon_vals, lat_col: lat_vals }) # Filter out failed matches matched_df = matched_df.filter( (pl.col(lat_col) != 0) & (pl.col(lon_col) != 0) ) return matched_df else: print(f"Valhalla request failed, status code: {r.status_code}, reason: {r.reason}") return None