"""
Trajectory interpolation module for pyehicle.
This module provides functions to resample GPS trajectories by interpolating points along
geodesic paths (great-circle routes). Interpolation is useful for:
- Increasing trajectory density for smoother visualization
- Normalizing sampling rates across different data sources
- Preparing trajectories for analysis requiring uniform spacing
- Filling gaps in sparse GPS data
The module implements geodesic interpolation using pyproj's WGS84 ellipsoid calculations,
ensuring accurate distances and bearings across all latitudes. Two interpolation modes:
1. **by_number_of_points()**: Resample to exact point count (spatial uniformity)
2. **by_sampling_rate()**: Resample to temporal interval (temporal uniformity)
"""
import numpy as np
import pandas as pd
import polars as pl
from scipy.interpolate import interp1d
from pyproj import Geod
from pyehicle.preprocessing.sampling_rate import get_sampling_rate
from typing import Union, Tuple
# single Geod instance reused for speed
_GEOD = Geod(ellps="WGS84")
def _interpolate_geo_coord_pyproj(lat1: float, lon1: float,
lat2: float, lon2: float,
fraction: float) -> Tuple[float, float]:
"""
Interpolate geodesically between (lat1, lon1) and (lat2, lon2) by fraction [0..1].
Returns (lat_interp, lon_interp).
"""
# pyproj.Geod.inv expects lon, lat order
az12, az21, s12 = _GEOD.inv(lon1, lat1, lon2, lat2)
s = float(s12) * float(fraction)
lon_i, lat_i, back_az = _GEOD.fwd(lon1, lat1, az12, s)
return float(lat_i), float(lon_i)
def _vectorized_interpolate_segment(lat_a: np.ndarray, lon_a: np.ndarray,
lat_b: np.ndarray, lon_b: np.ndarray,
fractions: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Vectorized interpolation for arrays of segment endpoints and fractions.
- lat_a, lon_a, lat_b, lon_b, fractions are 1D arrays of same length m.
Returns arrays (lat_interp, lon_interp) of shape (m,).
"""
# Compute forward azimuth and geodesic distance s12 using vectorized inv
# pyproj.Geod.inv supports array inputs in lon, lat order
# It returns arrays: az12, az21, s12
az12, az21, s12 = _GEOD.inv(lon_a, lat_a, lon_b, lat_b)
# distance to interpolate
s_interp = s12 * fractions
# Use Geod.fwd vectorized: returns lon2, lat2, back_az
lon_i, lat_i, _ = _GEOD.fwd(lon_a, lat_a, az12, s_interp)
return np.asarray(lat_i, dtype=float), np.asarray(lon_i, dtype=float)
[docs]
def by_number_of_points(df: Union[pd.DataFrame, pl.DataFrame],
num: int,
lat_col: str = "lat",
lon_col: str = "lon",
time_col: str = "time") -> Union[pd.DataFrame, pl.DataFrame]:
"""
Resample trajectory to exact number of points using geodesic interpolation.
This function resamples a GPS trajectory to contain exactly `num` points by interpolating
along geodesic (great-circle) paths between original points. Points are distributed
uniformly by distance along the trajectory, and timestamps are interpolated proportionally.
This is useful for:
- Normalizing trajectory density across datasets
- Preparing trajectories for fixed-size neural network inputs
- Creating smoother visualizations
- Standardizing trajectories for comparison
Parameters
----------
df : pd.DataFrame or pl.DataFrame
Input trajectory with at least latitude, longitude, and optionally time columns.
Minimum 2 points required.
num : int
Target number of points in the output trajectory. Must be > 2.
Start and end points are always preserved.
lat_col : str, default='lat'
Name of the latitude column (WGS84 decimal degrees).
lon_col : str, default='lon'
Name of the longitude column (WGS84 decimal degrees).
time_col : str, default='time'
Name of the time column. If present, timestamps are interpolated linearly
by cumulative distance. If not present, output contains only lat/lon.
Returns
-------
pd.DataFrame or pl.DataFrame
Resampled trajectory with exactly `num` points. Returns same type as input.
Contains columns: lat_col, lon_col, and time_col (if present in input).
Points are distributed uniformly by distance along the geodesic path.
Examples
--------
>>> import pandas as pd
>>> import pyehicle as pye
>>>
>>> # Load trajectory with variable spacing
>>> df = pd.read_csv('trajectory.csv')
>>> print(f"Original: {len(df)} points")
>>>
>>> # Resample to exactly 100 points
>>> resampled = pye.preprocessing.by_number_of_points(df, num=100)
>>> print(f"Resampled: {len(resampled)} points") # Exactly 100
>>>
>>> # Increase density for smoother visualization
>>> dense = pye.preprocessing.by_number_of_points(df, num=500)
>>> pye.utilities.visualization.single(dense, name='Dense Trajectory')
Notes
-----
**Algorithm:**
1. Calculate cumulative geodesic distances along the original trajectory
2. Define `num` target distances uniformly spaced from 0 to total_distance
3. For each target distance, find containing segment and interpolation fraction
4. Interpolate coordinates geodesically using pyproj Geod.fwd()
5. Interpolate timestamps linearly by distance (if time column present)
**Geodesic Interpolation:**
- Uses WGS84 ellipsoid (pyproj Geod)
- Accurate for all distances and latitudes
- Interpolates along great-circle paths (shortest distance on sphere)
- Preserves exact start and end points
**Performance:**
- Time complexity: O(n + m) where n = input points, m = output points
- Vectorized operations for efficiency
- Fast even for large trajectories (>10,000 points)
**Use Cases:**
- Standardizing trajectory density for machine learning
- Creating smooth animations with fixed frame counts
- Upsampling sparse trajectories (num > len(df))
- Downsampling dense trajectories (num < len(df))
See Also
--------
by_sampling_rate : Resample by temporal interval
get_sampling_rate : Calculate current sampling rate
"""
# Validation
if num <= 2:
raise ValueError('"num" must be > 2 (start and end included).')
if isinstance(df, pl.DataFrame):
pdf = df.to_pandas()
else:
pdf = df.copy()
if len(pdf) < 2:
raise ValueError("Input trajectory must contain at least two points")
lats = pdf[lat_col].to_numpy(dtype=float)
lons = pdf[lon_col].to_numpy(dtype=float)
# compute segment distances (vectorized)
# Geod.inv wants lon, lat ordering
az12, az21, s12 = _GEOD.inv(lons[:-1], lats[:-1], lons[1:], lats[1:])
segment_distances = np.asarray(s12, dtype=float) # meters
total_distance = float(segment_distances.sum())
# cumulative distances at segment boundaries: xi[0]=0, xi[1]=d0, xi[2]=d0+d1, ..., xi[len]=total
xi = np.concatenate(([0.0], np.cumsum(segment_distances)))
# target distances along the path for the new points
target_d = np.linspace(0.0, total_distance, num=num, endpoint=True)
# for each target_d determine which segment it falls into:
# segment index idx such that xi[idx] <= target_d < xi[idx+1], except last point
# numpy.searchsorted gives insertion index; subtract 1 to get left index
idx = np.searchsorted(xi, target_d, side="right") - 1
# fix boundary cases: for exact total_distance, idx may equal len(segment_distances)
idx[idx == len(segment_distances)] = len(segment_distances) - 1
# fraction along that segment
seg_left = xi[idx]
seg_len = segment_distances[idx]
# guard against zero-length segments
with np.errstate(divide="ignore", invalid="ignore"):
fraction = (target_d - seg_left) / seg_len
# for points exactly at a node where seg_len == 0, set fraction 0
fraction = np.nan_to_num(fraction, nan=0.0, posinf=0.0, neginf=0.0)
# prepare arrays of segment endpoints for vectorized interpolation
lat_a = lats[idx]
lon_a = lons[idx]
lat_b = lats[idx + 1]
lon_b = lons[idx + 1]
# vectorized geodesic interpolation per-target
lat_interp, lon_interp = _vectorized_interpolate_segment(lat_a, lon_a, lat_b, lon_b, fraction)
# If time column present, interpolate by cumulative distance using linear interpolation
if time_col in pdf.columns:
times = pd.to_datetime(pdf[time_col])
# seconds from start
t_seconds = (times - times.iloc[0]).dt.total_seconds().to_numpy(dtype=float)
# xi aligns with nodes; do interp on xi -> t_seconds
time_interp_seconds = interp1d(xi, t_seconds, kind="linear", fill_value="extrapolate")(target_d)
times_out = times.iloc[0] + pd.to_timedelta(time_interp_seconds, unit="s")
out_pdf = pd.DataFrame({time_col: times_out, lat_col: lat_interp, lon_col: lon_interp})
# ensure first time equals original start exactly (avoid float rounding)
out_pdf[time_col].iat[0] = times.iloc[0]
else:
out_pdf = pd.DataFrame({lat_col: lat_interp, lon_col: lon_interp})
if isinstance(df, pl.DataFrame):
return pl.from_pandas(out_pdf)
return out_pdf
[docs]
def by_sampling_rate(df: Union[pd.DataFrame, pl.DataFrame],
target_sampling_rate: float,
lat_col: str = "lat",
lon_col: str = "lon",
time_col: str = "time") -> Union[pd.DataFrame, pl.DataFrame]:
"""
Resample trajectory to uniform temporal interval using geodesic interpolation.
This function resamples a GPS trajectory to a constant time interval (sampling rate)
by interpolating coordinates at regular temporal intervals. This creates trajectories
with uniform time spacing, essential for time-series analysis and temporal modeling.
Coordinates are interpolated geodesically along great-circle paths between original
points, maintaining geographic accuracy across all latitudes and distances.
This is useful for:
- Normalizing temporal resolution across datasets
- Preparing trajectories for time-series analysis
- Synchronizing trajectories from different devices
- Creating animations with consistent frame timing
Parameters
----------
df : pd.DataFrame or pl.DataFrame
Input trajectory with latitude, longitude, and time columns. Minimum 2 points.
target_sampling_rate : float
Target time interval in seconds between consecutive points. Must be positive
and smaller than the current sampling rate. Typical values:
- 1.0s: High-frequency (1 Hz)
- 5.0s: Standard GPS logging
- 10.0s: Low-frequency tracking
lat_col : str, default='lat'
Name of the latitude column (WGS84 decimal degrees).
lon_col : str, default='lon'
Name of the longitude column (WGS84 decimal degrees).
time_col : str, default='time'
Name of the time column. Must be parseable by pandas.to_datetime().
Returns
-------
pd.DataFrame or pl.DataFrame
Resampled trajectory with uniform temporal spacing. Returns same type as input.
Contains columns: lat_col, lon_col, time_col.
Points are spaced at exact `target_sampling_rate` intervals.
Duplicates removed if present.
Raises
------
ValueError
- If target_sampling_rate <= 0
- If target_sampling_rate > current sampling rate (would require downsampling)
Examples
--------
>>> import pandas as pd
>>> import pyehicle as pye
>>>
>>> # Load trajectory with variable sampling rate
>>> df = pd.read_csv('trajectory.csv')
>>> current_rate = pye.preprocessing.get_sampling_rate(df)
>>> print(f"Current sampling rate: {current_rate}s")
>>>
>>> # Resample to 5-second intervals
>>> resampled = pye.preprocessing.by_sampling_rate(df, target_sampling_rate=5.0)
>>> new_rate = pye.preprocessing.get_sampling_rate(resampled)
>>> print(f"New sampling rate: {new_rate}s") # Should be ~5.0
>>>
>>> # Synchronize multiple trajectories to same temporal resolution
>>> traj1 = pye.preprocessing.by_sampling_rate(df1, target_sampling_rate=1.0)
>>> traj2 = pye.preprocessing.by_sampling_rate(df2, target_sampling_rate=1.0)
>>> # Now both have 1-second intervals for comparison
Notes
-----
**Algorithm:**
1. Convert timestamps to seconds from start
2. Generate target times at uniform intervals: 0, Δt, 2Δt, 3Δt, ...
3. For each target time, find containing time segment
4. Calculate interpolation fraction within segment
5. Interpolate coordinates geodesically at that time
6. Convert back to datetime format
**Geodesic Interpolation:**
- Uses WGS84 ellipsoid for accurate great-circle paths
- Coordinates interpolated along shortest distance on sphere
- Maintains geographic accuracy at all latitudes
- Vectorized operations for performance
**Temporal Requirements:**
- `target_sampling_rate` must be smaller than current rate
- This function increases temporal resolution (more frequent sampling)
- For decreasing resolution (downsampling), use data selection instead
**Performance:**
- Time complexity: O(n + m) where n = input points, m = output points
- Vectorized numpy operations throughout
- Fast for typical trajectories (<10,000 points)
**Use Cases:**
- Preparing data for LSTM/RNN models (uniform time steps)
- Creating smooth real-time visualizations
- Synchronizing multi-sensor data
- Generating training data with consistent temporal resolution
See Also
--------
by_number_of_points : Resample by fixed point count
get_sampling_rate : Calculate current sampling rate
"""
if isinstance(df, pl.DataFrame):
pdf = df.to_pandas()
else:
pdf = df.copy()
if target_sampling_rate <= 0:
raise ValueError("target_sampling_rate must be positive")
if len(pdf) < 2:
return df.copy()
# check sampling_rate validity using pyehicle helper (keeps your original behavior)
sr = get_sampling_rate(df, time_col)
if target_sampling_rate > sr:
raise ValueError('"target_sampling_rate" must be less than the sampling rate of the dataframe')
pdf[time_col] = pd.to_datetime(pdf[time_col])
times = pdf[time_col].to_numpy(dtype="datetime64[ns]")
t0 = times[0]
# seconds from start
time_seconds = (times - t0).astype("timedelta64[ns]").astype("int64") / 1e9
time_seconds = time_seconds.astype(float)
total_time = float(time_seconds[-1] - time_seconds[0])
if total_time <= 0:
# degenerate time series
if isinstance(df, pl.DataFrame):
return df.clone()
return df.copy()
# target times in seconds from start
target_times = np.arange(0.0, total_time + 1e-9, target_sampling_rate, dtype=float)
# precompute coordinates arrays
lats = pdf[lat_col].to_numpy(dtype=float)
lons = pdf[lon_col].to_numpy(dtype=float)
# For each target_time we need to find containing segment index and fraction
# Using vectorized searchsorted:
idx = np.searchsorted(time_seconds, target_times, side="right") - 1
# clamp
idx[idx < 0] = 0
idx[idx >= (len(time_seconds) - 1)] = len(time_seconds) - 2
t_left = time_seconds[idx]
t_right = time_seconds[idx + 1]
with np.errstate(divide="ignore", invalid="ignore"):
fraction = (target_times - t_left) / (t_right - t_left)
fraction = np.nan_to_num(fraction, nan=0.0, posinf=0.0, neginf=0.0)
# segment endpoints arrays
lat_a = lats[idx]
lon_a = lons[idx]
lat_b = lats[idx + 1]
lon_b = lons[idx + 1]
# interpolate geodesically
lat_interp, lon_interp = _vectorized_interpolate_segment(lat_a, lon_a, lat_b, lon_b, fraction)
# timestamps back to pandas times (add to t0)
times_out = pd.to_datetime(t0) + pd.to_timedelta(target_times, unit="s")
out_pdf = pd.DataFrame({lat_col: lat_interp, lon_col: lon_interp, time_col: times_out})
# Remove duplicates if any
out_pdf = out_pdf.drop_duplicates(subset=[lat_col, lon_col, time_col], keep="first").reset_index(drop=True)
if isinstance(df, pl.DataFrame):
return pl.from_pandas(out_pdf)
return out_pdf