import os
import shutil
import hashlib
import math
from typing import Tuple, List, Dict
import numpy as np
import pandas as pd
import polars as pl
from scipy.spatial import cKDTree
from pyproj import Geod, Transformer
# Prefer osmium
try:
import osmium as osm # type: ignore
_HAVE_OSMIUM = True
except Exception:
osm = None
_HAVE_OSMIUM = False
# Fallback to osmnx if needed
try:
import osmnx as ox # type: ignore
_HAVE_OSMNX = True
except Exception:
ox = None
_HAVE_OSMNX = False
_GEOD = Geod(ellps="WGS84")
# Overpass cache directory
_OVERPASS_CACHE_DIR = os.path.join(os.getcwd(), "overpass_cache")
os.makedirs(_OVERPASS_CACHE_DIR, exist_ok=True)
# AEQD transformer cache
_transformer_cache: Dict[Tuple[float, float, int], Tuple[Transformer, Transformer]] = {}
def _get_aeqd_transformers(cen_lat: float, cen_lon: float, precision: int = 6):
key = (round(cen_lat, precision), round(cen_lon, precision), precision)
if key in _transformer_cache:
return _transformer_cache[key]
proj = f"+proj=aeqd +lat_0={cen_lat:.9f} +lon_0={cen_lon:.9f} +datum=WGS84 +units=m +no_defs"
fwd = Transformer.from_crs("EPSG:4326", proj, always_xy=True)
inv = Transformer.from_crs(proj, "EPSG:4326", always_xy=True)
_transformer_cache[key] = (fwd, inv)
if len(_transformer_cache) > 256:
_transformer_cache.pop(next(iter(_transformer_cache)))
return fwd, inv
# Driveability checks (close to osmnx semantics)
_HIGHWAY_WHITELIST = {
"motorway", "trunk", "primary", "secondary", "tertiary",
"unclassified", "residential", "living_street",
"motorway_link", "trunk_link", "primary_link",
"secondary_link", "tertiary_link", "service", "road"
}
_HIGHWAY_BLACKLIST = {"footway", "pedestrian", "steps", "path", "cycleway", "bridleway",
"proposed", "construction", "abandoned", "platform"}
_SERVICE_DENY = {"driveway", "parking_aisle", "alley"}
_ACCESS_DENY = {"no", "private", "customers", "permit", "delivery", "destination"}
def _resolve_tag_value(val):
if val is None:
return None
if isinstance(val, (list, tuple)):
v = val[0]
else:
v = val
return str(v).lower()
def _is_driveable_way(tags: Dict[str, str]) -> bool:
if not tags:
return False
hwy = _resolve_tag_value(tags.get("highway"))
if hwy is None:
return False
if hwy in _HIGHWAY_BLACKLIST:
return False
if (hwy not in _HIGHWAY_WHITELIST) and (not hwy.isdigit()) and (hwy != "road"):
return False
if _resolve_tag_value(tags.get("area")) == "yes":
return False
service = _resolve_tag_value(tags.get("service"))
if service in _SERVICE_DENY:
return False
if _resolve_tag_value(tags.get("route")) == "ferry":
return False
for k in ("motor_vehicle", "motorcar", "vehicle", "access"):
v = _resolve_tag_value(tags.get(k))
if v in _ACCESS_DENY:
return False
if _resolve_tag_value(tags.get("proposed")) == "yes" or _resolve_tag_value(tags.get("construction")) == "yes":
return False
return True
if _HAVE_OSMIUM:
class WayNodeCollector(osm.SimpleHandler):
def __init__(self, nodes: Dict[int, Tuple[float, float]], ways: Dict[int, Dict]):
super().__init__()
self.nodes = nodes
self.ways = ways
def node(self, n):
if n.location.valid():
self.nodes[n.id] = (float(n.location.lat), float(n.location.lon))
def way(self, w):
# In newer osmium versions, TagList doesn't have .items()
# Instead, iterate over tags directly where each tag has .k and .v attributes
tags = {tag.k: tag.v for tag in w.tags}
if not _is_driveable_way(tags):
return
node_refs = [nd.ref for nd in w.nodes]
if len(node_refs) < 2:
return
self.ways[w.id] = {"tags": tags, "nodes": node_refs}
import requests # local import allowed
def _bbox_hash(north: float, south: float, east: float, west: float, overlap: float):
key = f"{north:.6f}_{south:.6f}_{east:.6f}_{west:.6f}_{overlap:.6f}"
return hashlib.sha256(key.encode("utf8")).hexdigest()
def _overpass_cache_path(north: float, south: float, east: float, west: float, overlap: float):
h = _bbox_hash(north, south, east, west, overlap)
return os.path.join(_OVERPASS_CACHE_DIR, f"overpass_{h}.osm")
def _download_overpass_bbox(north: float, south: float, east: float, west: float,
timeout: int = 180, force_download: bool = False, overlap: float = 0.0) -> str:
cache_path = _overpass_cache_path(north, south, east, west, overlap)
if os.path.exists(cache_path) and not force_download:
return cache_path
bbox = f"{south},{west},{north},{east}"
q = f"""
[out:xml][timeout:{int(timeout)}];
(
way["highway"]({bbox});
);
(._;>;);
out body;
"""
url = "https://overpass-api.de/api/interpreter"
resp = requests.post(url, data={"data": q}, stream=True, timeout=timeout)
resp.raise_for_status()
with open(cache_path, "wb") as f:
for chunk in resp.iter_content(chunk_size=1 << 20):
if chunk:
f.write(chunk)
return cache_path
def _split_bbox_into_tiles(north: float, south: float, east: float, west: float,
max_tile_deg: float = 0.5, overlap_deg: float = 0.05) -> List[Tuple[float, float, float, float]]:
# Early exit for invalid or single-tile bbox
if east <= west or north <= south:
return [(north, south, east, west)]
lat_spans = max(1, math.ceil((north - south) / max_tile_deg))
lon_spans = max(1, math.ceil((east - west) / max_tile_deg))
# Pre-compute tile edges for latitude and longitude
lat_edges = np.linspace(south, north, lat_spans + 1)
lon_edges = np.linspace(west, east, lon_spans + 1)
tiles = []
for i in range(lat_spans):
s = lat_edges[i]
n = lat_edges[i + 1]
# Calculate expanded boundaries with overlap
n_exp = min(n + overlap_deg, north)
s_exp = max(s - overlap_deg, south)
for j in range(lon_spans):
w = lon_edges[j]
e = lon_edges[j + 1]
e_exp = min(e + overlap_deg, east)
w_exp = max(w - overlap_deg, west)
tiles.append((n_exp, s_exp, e_exp, w_exp))
return tiles
def _build_segment_index(north: float, south: float, east: float, west: float,
tile_max_deg: float = 0.5,
tile_overlap_deg: float = 0.05,
cache_overpass: bool = True,
aeqd_precision: int = 6) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
# Initialize lists to collect segment data
seg_ids_list = []
centroid_x_list = []
centroid_y_list = []
seg_points_list = []
tiles = _split_bbox_into_tiles(north, south, east, west, max_tile_deg=tile_max_deg, overlap_deg=tile_overlap_deg)
for (tn, ts, te, tw) in tiles:
try:
osm_path = _download_overpass_bbox(tn, ts, te, tw, overlap=tile_overlap_deg, force_download=not cache_overpass)
except Exception:
continue
nodes = {}
ways = {}
if _HAVE_OSMIUM:
handler = WayNodeCollector(nodes, ways)
handler.apply_file(osm_path, locations=True)
elif _HAVE_OSMNX:
try:
G = ox.graph_from_bbox(tn, ts, te, tw, network_type="drive", simplify=False)
nodes_gdf, edges_gdf = ox.graph_to_gdfs(G, nodes=True, edges=True)
for nid, row in nodes_gdf.iterrows():
nodes[nid] = (float(row.geometry.y), float(row.geometry.x))
for idx, row in edges_gdf.iterrows():
geom = row.geometry
try:
coords = list(geom.coords)
if len(coords) >= 2:
wid = int(hash((coords[0], coords[-1], idx)) & 0xFFFFFFFF)
node_seq = []
for c in coords:
node_id = int(hash(c) & 0x7FFFFFFF)
nodes.setdefault(node_id, (float(c[1]), float(c[0])))
node_seq.append(node_id)
ways[wid] = {"tags": {}, "nodes": node_seq}
except Exception:
continue
except Exception:
continue
else:
raise ImportError("Neither osmium nor osmnx available to build OSM segments.")
if len(ways) == 0:
continue
# Calculate tile centroid and get AEQD transformer for this tile
cen_lat = (tn + ts) * 0.5
cen_lon = (te + tw) * 0.5
fwd, _ = _get_aeqd_transformers(cen_lat, cen_lon, precision=aeqd_precision)
for way_id, w in ways.items():
node_seq = w["nodes"]
# Process all consecutive node pairs in this way
num_pairs = len(node_seq) - 1
for i in range(num_pairs):
a = node_seq[i]
b = node_seq[i + 1]
if a not in nodes or b not in nodes:
continue
lat_a, lon_a = nodes[a]
lat_b, lon_b = nodes[b]
try:
xa, ya = fwd.transform(lon_a, lat_a)
xb, yb = fwd.transform(lon_b, lat_b)
except Exception:
xa, ya = float(lon_a), float(lat_a)
xb, yb = float(lon_b), float(lat_b)
# Calculate segment centroid
cx = (xa + xb) * 0.5
cy = (ya + yb) * 0.5
seg_ids_list.append(int(way_id))
centroid_x_list.append(float(cx))
centroid_y_list.append(float(cy))
seg_points_list.append((lat_a, lon_a, lat_b, lon_b))
if len(seg_ids_list) == 0:
return (np.array([], dtype=int),
np.array([], dtype=float),
np.array([], dtype=float),
np.empty((0, 4), dtype=float))
return (np.array(seg_ids_list, dtype=int),
np.array(centroid_x_list, dtype=float),
np.array(centroid_y_list, dtype=float),
np.array(seg_points_list, dtype=float))
def __lengths(df_matched: pd.DataFrame | pl.DataFrame,
df_true: pd.DataFrame | pl.DataFrame,
lat_col: str = 'lat',
lon_col: str = 'lon',
delete_cache: bool = True,
tile_max_deg: float = 0.5,
tile_overlap_deg: float = 0.05,
cache_overpass: bool = True,
aeqd_precision: int = 6) -> Tuple[float, float, float]:
geod = _GEOD
# Prepare data based on input type
if isinstance(df_matched, pd.DataFrame) and isinstance(df_true, pd.DataFrame):
_df_matched = df_matched.copy().reset_index(drop=True)
_df_true = df_true.copy().reset_index(drop=True)
input_is_polars = False
combined = pd.concat([_df_matched, _df_true], ignore_index=True)
elif isinstance(df_matched, pl.DataFrame) and isinstance(df_true, pl.DataFrame):
_df_matched = df_matched.clone()
_df_true = df_true.clone()
input_is_polars = True
# Combine coordinates for bounding box calculation
lons = _df_matched[lon_col].to_list() + _df_true[lon_col].to_list()
lats = _df_matched[lat_col].to_list() + _df_true[lat_col].to_list()
combined = pd.DataFrame({lon_col: lons, lat_col: lats})
else:
raise ValueError("df_matched and df_true must be the same type (both pandas or both polars).")
# Early exit for empty data
if len(combined) == 0:
return 0.0, 0.0, 0.0
# Calculate bounding box from all coordinates
combined_lons = combined[lon_col].to_numpy(dtype=float)
combined_lats = combined[lat_col].to_numpy(dtype=float)
west, east = float(combined_lons.min()), float(combined_lons.max())
south, north = float(combined_lats.min()), float(combined_lats.max())
seg_ids, cent_x, cent_y, seg_points = _build_segment_index(north, south, east, west,
tile_max_deg=tile_max_deg,
tile_overlap_deg=tile_overlap_deg,
cache_overpass=cache_overpass,
aeqd_precision=aeqd_precision)
# If no road segments found, fall back to simple geometric trajectory comparison
if seg_ids.size == 0:
def _sum_length_from_df(df):
if isinstance(df, pl.DataFrame):
lon_arr = np.array(df[lon_col].to_list(), dtype=float)
lat_arr = np.array(df[lat_col].to_list(), dtype=float)
else:
lon_arr = df[lon_col].to_numpy(dtype=float)
lat_arr = df[lat_col].to_numpy(dtype=float)
if len(lon_arr) < 2:
return 0.0
# Calculate geodesic distances between consecutive points
_, _, lens = geod.inv(lon_arr[:-1], lat_arr[:-1], lon_arr[1:], lat_arr[1:])
return float(np.nansum(np.abs(lens)))
matched_length = _sum_length_from_df(_df_matched)
true_length = _sum_length_from_df(_df_true)
if matched_length == 0.0:
matched_length = true_length
# Calculate common length by comparing trajectories directly
# When no road data available, use simple coordinate matching
common_length = 0.0
# Extract coordinates for comparison
if input_is_polars:
m_lons = np.array(_df_matched.get_column(lon_col).to_list(), dtype=float)
m_lats = np.array(_df_matched.get_column(lat_col).to_list(), dtype=float)
t_lons = np.array(_df_true.get_column(lon_col).to_list(), dtype=float)
t_lats = np.array(_df_true.get_column(lat_col).to_list(), dtype=float)
else:
m_lons = _df_matched[lon_col].to_numpy(dtype=float)
m_lats = _df_matched[lat_col].to_numpy(dtype=float)
t_lons = _df_true[lon_col].to_numpy(dtype=float)
t_lats = _df_true[lat_col].to_numpy(dtype=float)
# Iterate over segments in the shorter trajectory
min_len = min(len(m_lons), len(t_lons))
if min_len > 1:
for i in range(min_len - 1):
# Check if points are approximately the same (within 1 meter)
# This handles rounding differences and small GPS variations
dist1 = geod.inv(m_lons[i], m_lats[i], t_lons[i], t_lats[i])[2]
dist2 = geod.inv(m_lons[i+1], m_lats[i+1], t_lons[i+1], t_lats[i+1])[2]
# If both endpoints match within 1m, count the segment
if abs(dist1) < 1.0 and abs(dist2) < 1.0:
_, _, seglen = geod.inv(m_lons[i], m_lats[i], m_lons[i+1], m_lats[i+1])
common_length += abs(seglen)
if delete_cache:
if os.path.isdir("cache"):
shutil.rmtree("cache", ignore_errors=True)
if os.path.isdir(_OVERPASS_CACHE_DIR):
shutil.rmtree(_OVERPASS_CACHE_DIR, ignore_errors=True)
return matched_length, true_length, common_length
# Calculate global AEQD projection centered on the bounding box
cen_lat = (north + south) * 0.5
cen_lon = (east + west) * 0.5
fwd_global, _ = _get_aeqd_transformers(cen_lat, cen_lon, precision=aeqd_precision)
# Extract segment endpoint coordinates
lat_a = seg_points[:, 0]
lon_a = seg_points[:, 1]
lat_b = seg_points[:, 2]
lon_b = seg_points[:, 3]
# Transform segment endpoints to AEQD projection
xa, ya = fwd_global.transform(lon_a, lat_a)
xb, yb = fwd_global.transform(lon_b, lat_b)
# Calculate segment centroids in projected space
centroid_x = (xa + xb) * 0.5
centroid_y = (ya + yb) * 0.5
# Build spatial index (KDTree) for fast segment lookup
tree = cKDTree(np.column_stack((centroid_x, centroid_y)))
# Extract coordinate arrays from input dataframes
if input_is_polars:
m_lons = np.array(_df_matched.get_column(lon_col).to_list(), dtype=float)
m_lats = np.array(_df_matched.get_column(lat_col).to_list(), dtype=float)
t_lons = np.array(_df_true.get_column(lon_col).to_list(), dtype=float)
t_lats = np.array(_df_true.get_column(lat_col).to_list(), dtype=float)
else:
m_lons = _df_matched[lon_col].to_numpy(dtype=float)
m_lats = _df_matched[lat_col].to_numpy(dtype=float)
t_lons = _df_true[lon_col].to_numpy(dtype=float)
t_lats = _df_true[lat_col].to_numpy(dtype=float)
# Early exit for empty trajectories
if len(m_lons) == 0 or len(t_lons) == 0:
return 0.0, 0.0, 0.0
# Transform trajectory points to AEQD projection
mx, my = fwd_global.transform(m_lons, m_lats)
tx, ty = fwd_global.transform(t_lons, t_lats)
# Find nearest road segment for each trajectory point
_, idx_matched = tree.query(np.column_stack((mx, my)))
_, idx_true = tree.query(np.column_stack((tx, ty)))
# Map trajectory points to their nearest road segment IDs
matched_seg_ids = seg_ids[idx_matched]
true_seg_ids = seg_ids[idx_true]
# Calculate common length by finding consecutive matching segments
# A segment is "common" if both trajectories are on the same road
common_length = 0.0
if not input_is_polars:
dfm = _df_matched.reset_index(drop=True)
dft = _df_true.reset_index(drop=True)
dfm['road_id'] = matched_seg_ids
dft['road_id'] = true_seg_ids
# Extract arrays for iteration
road_ids_m = dfm['road_id'].to_numpy(dtype=int)
road_ids_t = dft['road_id'].to_numpy(dtype=int)
lons_m = dfm[lon_col].to_numpy(dtype=float)
lats_m = dfm[lat_col].to_numpy(dtype=float)
# Use minimum length to avoid index errors when trajectories have different sizes
npoints = min(len(dfm), len(dft))
for i in range(npoints - 1):
# Current segment goes from point i to point i+1
# Check if both points are on matching roads in both trajectories
cur_m = road_ids_m[i]
cur_t = road_ids_t[i]
next_m = road_ids_m[i + 1]
next_t = road_ids_t[i + 1]
# Segment is common if BOTH endpoints are on matching roads
# This ensures we're counting the same road segment in both trajectories
if (cur_m == cur_t) and (next_m == next_t):
lon1 = lons_m[i]
lat1 = lats_m[i]
lon2 = lons_m[i + 1]
lat2 = lats_m[i + 1]
_, _, seglen = geod.inv(lon1, lat1, lon2, lat2)
common_length += abs(seglen)
else:
dfm = _df_matched.with_column(pl.Series(name='road_id', values=matched_seg_ids.tolist()))
dft = _df_true.with_column(pl.Series(name='road_id', values=true_seg_ids.tolist()))
# Use minimum length to avoid index errors when trajectories have different sizes
npoints = min(dfm.height, dft.height)
for i in range(npoints - 1):
# Current segment goes from point i to point i+1
# Check if both points are on matching roads in both trajectories
cur_m = int(dfm[i, 'road_id'])
cur_t = int(dft[i, 'road_id'])
next_m = int(dfm[i + 1, 'road_id'])
next_t = int(dft[i + 1, 'road_id'])
# Segment is common if BOTH endpoints are on matching roads
# This ensures we're counting the same road segment in both trajectories
if (cur_m == cur_t) and (next_m == next_t):
lon1 = float(dfm[i, lon_col])
lat1 = float(dfm[i, lat_col])
lon2 = float(dfm[i + 1, lon_col])
lat2 = float(dfm[i + 1, lat_col])
_, _, seglen = geod.inv(lon1, lat1, lon2, lat2)
common_length += abs(seglen)
# Helper function to calculate total trajectory length
def _sum_geodesic(lon_arr, lat_arr):
if len(lon_arr) < 2:
return 0.0
_, _, lens = geod.inv(lon_arr[:-1], lat_arr[:-1], lon_arr[1:], lat_arr[1:])
return float(np.nansum(np.abs(lens)))
# Calculate total trajectory lengths
if not input_is_polars:
# Use already extracted arrays for pandas
matched_length = _sum_geodesic(lons_m, lats_m)
true_length = _sum_geodesic(t_lons, t_lats)
else:
matched_length = _sum_geodesic(
np.array(dfm.get_column(lon_col).to_list(), dtype=float),
np.array(dfm.get_column(lat_col).to_list(), dtype=float)
)
true_length = _sum_geodesic(
np.array(dft.get_column(lon_col).to_list(), dtype=float),
np.array(dft.get_column(lat_col).to_list(), dtype=float)
)
if matched_length == 0.0:
matched_length = true_length
if common_length == 0.0:
if delete_cache:
if os.path.isdir("cache"):
shutil.rmtree("cache", ignore_errors=True)
if os.path.isdir(_OVERPASS_CACHE_DIR):
shutil.rmtree(_OVERPASS_CACHE_DIR, ignore_errors=True)
return matched_length, true_length, 0.0
if delete_cache:
if os.path.isdir("cache"):
shutil.rmtree("cache", ignore_errors=True)
if os.path.isdir(_OVERPASS_CACHE_DIR):
shutil.rmtree(_OVERPASS_CACHE_DIR, ignore_errors=True)
return matched_length, true_length, common_length
# Public wrappers with tile_overlap_deg default 0.05
[docs]
def optimized_lengths(df_matched: pd.DataFrame | pl.DataFrame,
df_true: pd.DataFrame | pl.DataFrame,
lat_col: str = 'lat',
lon_col: str = 'lon',
delete_cache: bool = True,
tile_max_deg: float = 0.5,
tile_overlap_deg: float = 0.05,
cache_overpass: bool = True,
aeqd_precision: int = 6) -> Tuple[float, float, float]:
return __lengths(df_matched, df_true, lat_col, lon_col, delete_cache,
tile_max_deg, tile_overlap_deg, cache_overpass, aeqd_precision)
[docs]
def rmf(df_matched: pd.DataFrame | pl.DataFrame,
df_true: pd.DataFrame | pl.DataFrame,
lat_col: str = 'lat',
lon_col: str = 'lon',
delete_cache: bool = True,
tile_max_deg: float = 0.5,
tile_overlap_deg: float = 0.05,
cache_overpass: bool = True,
aeqd_precision: int = 6) -> float:
matched_length, true_length, common_length = __lengths(df_matched, df_true, lat_col, lon_col, delete_cache,
tile_max_deg, tile_overlap_deg, cache_overpass, aeqd_precision)
if true_length == 0.0:
return 0.0
rmf_value = (matched_length + true_length - 2.0 * common_length) / true_length
return float(abs(round(rmf_value, 10)))
[docs]
def recall(df_matched: pd.DataFrame | pl.DataFrame,
df_true: pd.DataFrame | pl.DataFrame,
lat_col: str = 'lat',
lon_col: str = 'lon',
delete_cache: bool = True,
tile_max_deg: float = 0.5,
tile_overlap_deg: float = 0.05,
cache_overpass: bool = True,
aeqd_precision: int = 6) -> float:
matched_length, true_length, common_length = __lengths(df_matched, df_true, lat_col, lon_col, delete_cache,
tile_max_deg, tile_overlap_deg, cache_overpass, aeqd_precision)
if true_length == 0.0:
return 0.0
return float(round(common_length / true_length, 10))
[docs]
def precision(df_matched: pd.DataFrame | pl.DataFrame,
df_true: pd.DataFrame | pl.DataFrame,
lat_col: str = 'lat',
lon_col: str = 'lon',
delete_cache: bool = True,
tile_max_deg: float = 0.5,
tile_overlap_deg: float = 0.05,
cache_overpass: bool = True,
aeqd_precision: int = 6) -> float:
matched_length, true_length, common_length = __lengths(df_matched, df_true, lat_col, lon_col, delete_cache,
tile_max_deg, tile_overlap_deg, cache_overpass, aeqd_precision)
if matched_length == 0.0:
return 0.0
return float(round(common_length / matched_length, 10))
[docs]
def f1(df_matched: pd.DataFrame | pl.DataFrame,
df_true: pd.DataFrame | pl.DataFrame,
lat_col: str = 'lat',
lon_col: str = 'lon',
delete_cache: bool = True,
tile_max_deg: float = 0.5,
tile_overlap_deg: float = 0.05,
cache_overpass: bool = True,
aeqd_precision: int = 6) -> float:
matched_length, true_length, common_length = __lengths(df_matched, df_true, lat_col, lon_col, delete_cache,
tile_max_deg, tile_overlap_deg, cache_overpass, aeqd_precision)
if matched_length == 0.0 or true_length == 0.0:
return 0.0
prec = common_length / matched_length
rec = common_length / true_length
if prec == 0.0 or rec == 0.0:
return 0.0
return float(2.0 * (prec * rec) / (prec + rec))
[docs]
def length_index(df_matched: pd.DataFrame | pl.DataFrame,
df_original: pd.DataFrame | pl.DataFrame,
lat_col: str = 'lat',
lon_col: str = 'lon') -> float:
geod = _GEOD
# Helper function to calculate trajectory length
def _sum_len_df(df):
if isinstance(df, pl.DataFrame):
lon_arr = np.array(df[lon_col].to_list(), dtype=float)
lat_arr = np.array(df[lat_col].to_list(), dtype=float)
else:
lon_arr = df[lon_col].to_numpy(dtype=float)
lat_arr = df[lat_col].to_numpy(dtype=float)
if len(lon_arr) < 2:
return 0.0
# Calculate geodesic distances between consecutive points
_, _, lens = geod.inv(lon_arr[:-1], lat_arr[:-1], lon_arr[1:], lat_arr[1:])
return float(np.nansum(np.abs(lens)))
total_matched = _sum_len_df(df_matched)
total_raw = _sum_len_df(df_original)
return float(total_matched / total_raw) if total_raw != 0.0 else 0.0
[docs]
def clear_overpass_cache():
"""Delete the on-disk Overpass cache directory (overpass_cache/)."""
if os.path.isdir(_OVERPASS_CACHE_DIR):
shutil.rmtree(_OVERPASS_CACHE_DIR, ignore_errors=True)