Source code for pyehicle.preprocessing.compression

"""
Trajectory compression module for pyehicle.

This module provides spatio-temporal compression functionality to reduce the size
of GPS trajectories while preserving their essential shape and characteristics.
It uses DBSCAN-style clustering with KD-trees for efficient neighbor searches.
"""

import warnings
from typing import Optional, Union, Literal

import numpy as np
import pandas as pd
import polars as pl
from pyproj import Geod, Transformer

# Try to import cKDTree for fast spatial queries (10-100x faster than brute force)
try:
    from scipy.spatial import cKDTree as KDTree  # type: ignore
    _have_kdtree = True
except Exception:
    KDTree = None  # type: ignore
    _have_kdtree = False


# ======================== Helper Functions ========================


def _mean_timestamp_ns(arr: np.ndarray) -> np.datetime64:
    """
    Calculate the mean timestamp from an array of datetime64 values.

    This helper computes the average timestamp by converting to nanosecond integers,
    averaging, and converting back. This approach handles datetime arithmetic correctly.

    Parameters
    ----------
    arr : np.ndarray
        Array of datetime64 values.

    Returns
    -------
    np.datetime64
        The mean timestamp as a datetime64[ns] value.
    """
    # Convert to nanosecond integers for averaging
    ints = arr.astype("datetime64[ns]").astype("int64")
    avg = int(np.round(ints.mean()))
    # Convert back to datetime64
    return np.datetime64(avg, "ns")


def _to_ns_array(series_or_array) -> np.ndarray:
    """
    Convert various time representations to a numpy datetime64[ns] array.

    Handles pandas Series, numpy arrays, datetime strings, and other common
    time formats. Ensures consistent datetime64[ns] output for time calculations.

    Parameters
    ----------
    series_or_array : Series, array-like, or datetime-like
        Input time data in any common format.

    Returns
    -------
    np.ndarray
        Array of datetime64[ns] values.
    """
    arr = np.asarray(series_or_array)

    # Handle empty arrays
    if arr.size == 0:
        return np.array([], dtype="datetime64[ns]")

    # If already datetime64, just convert to nanosecond precision
    if np.issubdtype(arr.dtype, np.datetime64):
        return arr.astype("datetime64[ns]")

    # Try direct pandas conversion
    try:
        return pd.to_datetime(arr).to_numpy(dtype="datetime64[ns]")
    except Exception:
        # Fallback: wrap in Series first (handles more edge cases)
        return pd.to_datetime(pd.Series(arr)).to_numpy(dtype="datetime64[ns]")


def _aggregate_group(
    df_group: pd.DataFrame,
    lat_col: str,
    lon_col: str,
    time_col: Optional[str],
    agg_other: str
) -> pd.Series:
    """
    Aggregate a cluster of trajectory points into a single representative point.

    This helper function takes a group of spatially/temporally close points and
    combines them based on the specified aggregation strategy. The geographic
    centroid (mean lat/lon) is always used for position, while other columns
    are handled according to the agg_other parameter.

    Parameters
    ----------
    df_group : pd.DataFrame
        DataFrame containing the points in this cluster.
    lat_col : str
        Name of the latitude column.
    lon_col : str
        Name of the longitude column.
    time_col : str or None
        Name of the time column (if present).
    agg_other : str
        Aggregation strategy for non-spatial columns: "first", "last", or "mean".

    Returns
    -------
    pd.Series
        A Series representing the aggregated point with all original columns.

    Notes
    -----
    The geographic centroid is computed as the simple arithmetic mean of lat/lon.
    For very large clusters spanning significant distances, this is an approximation.
    The mean timestamp is computed if time_col is provided.
    """
    # Always compute the geographic centroid (mean of lat/lon)
    vals = {
        lat_col: float(df_group[lat_col].astype(float).mean()),
        lon_col: float(df_group[lon_col].astype(float).mean())
    }

    # Handle timestamp aggregation by computing the mean time
    if (time_col is not None) and (time_col in df_group.columns):
        times = _to_ns_array(df_group[time_col])
        vals[time_col] = pd.to_datetime(str(np.datetime_as_string(_mean_timestamp_ns(times))))

    # Process remaining columns (those that aren't lat, lon, or time)
    skip_cols = {lat_col, lon_col, time_col}
    other_cols = [c for c in df_group.columns if c not in skip_cols]

    # Apply the aggregation strategy to other columns
    if agg_other == "first":
        # Use values from the first point in the cluster
        first_row = df_group.iloc[0]
        for c in other_cols:
            vals[c] = first_row[c]
    elif agg_other == "last":
        # Use values from the last point in the cluster
        last_row = df_group.iloc[-1]
        for c in other_cols:
            vals[c] = last_row[c]
    elif agg_other == "mean":
        # Average numeric columns, use first value for non-numeric
        first_row = df_group.iloc[0]
        for c in other_cols:
            if pd.api.types.is_numeric_dtype(df_group[c].dtype):
                vals[c] = df_group[c].astype(float).mean()
            else:
                vals[c] = first_row[c]
    else:
        # Default to 'first' strategy for unknown values
        first_row = df_group.iloc[0]
        for c in other_cols:
            vals[c] = first_row[c]

    return pd.Series(vals)


# ======================== Main Compression Function ========================


[docs] def spatio_temporal_compress( df: Union[pd.DataFrame, pl.DataFrame], spatial_radius_km: float = 0.1, lat_col: str = "lat", lon_col: str = "lon", time_col: Optional[str] = "time", collapse_clusters: bool = True, time_threshold_s: Optional[float] = None, agg_other: Literal["first", "last", "mean"] = "first", min_samples: int = 1, drop_noise: bool = False, use_aeqd: bool = True, final_geodetic_check: bool = False, geodetic_check_tol_m: float = 0.0, ) -> Union[pd.DataFrame, pl.DataFrame]: """ Compress GPS trajectory by clustering nearby points using DBSCAN-style spatial clustering. This function reduces the number of points in a trajectory while preserving its essential shape by identifying and merging clusters of points that are close together in space (and optionally time). It uses a KD-tree for efficient O(n log n) neighbor searches and can optionally use an Azimuthal Equidistant (AEQD) projection for sub-meter distance accuracy. The compression algorithm works in several stages: 1. Project coordinates to a local metric system (AEQD centered on trajectory centroid) 2. Build a KD-tree for fast spatial range queries 3. Find clusters of nearby points using DBSCAN 4. Optionally filter clusters by temporal proximity 5. Aggregate or select representatives from each cluster Parameters ---------- df : pd.DataFrame or pl.DataFrame Input trajectory with latitude, longitude, and optionally time columns. Must contain at least lat_col and lon_col columns. spatial_radius_km : float, default=0.1 Clustering radius in kilometers (converted to meters internally). Points within this distance are considered spatial neighbors. Typical values: 0.01-0.5 km for urban trajectories. lat_col : str, default="lat" Name of the latitude column (WGS84 decimal degrees). lon_col : str, default="lon" Name of the longitude column (WGS84 decimal degrees). time_col : str or None, default="time" Name of the time column. If None, no temporal filtering is applied. If provided, must be parseable by pandas.to_datetime(). collapse_clusters : bool, default=True How to represent each cluster in the output: - True: Merge all cluster points into a single point (geographic centroid) - False: Keep one representative point from each cluster (the one with lowest index) time_threshold_s : float or None, default=None Temporal clustering threshold in seconds. If provided, points must be within this time window AND spatial_radius_km to be clustered together. Useful for separating overlapping paths taken at different times. agg_other : {"first", "last", "mean"}, default="first" How to aggregate non-spatial columns when collapse_clusters=True: - "first": Use values from the first point in the cluster - "last": Use values from the last point in the cluster - "mean": Average numeric columns, use first value for non-numeric min_samples : int, default=1 Minimum number of neighbors (including self) required to form a core point. Similar to DBSCAN's min_samples parameter. Higher values filter out isolated points more aggressively. drop_noise : bool, default=False If True, drop points that don't belong to any cluster (noise points). If False, keep noise points as singleton clusters. use_aeqd : bool, default=True If True, use Azimuthal Equidistant projection centered on trajectory centroid for accurate distance calculations. If False or if AEQD fails, fallback to Web Mercator (EPSG:3857). AEQD is recommended for accurate compression. final_geodetic_check : bool, default=False If True, verify each cluster using geodesic distances (Geod.inv) after the initial projection-based clustering. Slower but provides additional accuracy verification. Usually not needed with use_aeqd=True. geodetic_check_tol_m : float, default=0.0 Tolerance in meters for the final geodetic check. Cluster members must be within spatial_radius_km + geodetic_check_tol_m of the cluster centroid. Only used if final_geodetic_check=True. Returns ------- pd.DataFrame or pl.DataFrame Compressed trajectory with the same type and column structure as input. The order of rows may differ from the input. The number of rows will be less than or equal to the input (depending on clustering results). Raises ------ ValueError If lat_col or lon_col are not present in the DataFrame. Examples -------- >>> import pandas as pd >>> import pyehicle as pye >>> >>> # Load a dense GPS trajectory >>> df = pd.read_csv('dense_trajectory.csv') # 1000 points >>> print(f"Original points: {len(df)}") >>> >>> # Simple compression with 100m radius >>> compressed = pye.preprocessing.spatio_temporal_compress( ... df, ... spatial_radius_km=0.1, # 100 meters ... collapse_clusters=True ... ) >>> print(f"Compressed points: {len(compressed)}") # ~200 points >>> >>> # Spatio-temporal compression (separate paths at different times) >>> compressed_st = pye.preprocessing.spatio_temporal_compress( ... df, ... spatial_radius_km=0.05, # 50 meters ... time_threshold_s=300, # 5 minutes ... collapse_clusters=True ... ) >>> >>> # Aggressive compression: drop isolated points, larger radius >>> aggressive = pye.preprocessing.spatio_temporal_compress( ... df, ... spatial_radius_km=0.2, # 200 meters ... min_samples=3, # Need at least 3 neighbors ... drop_noise=True, # Remove isolated points ... agg_other="mean" # Average other columns ... ) Notes ----- **Algorithm Details:** The function implements a DBSCAN-like clustering algorithm optimized for GPS data: 1. **Projection**: Coordinates are projected to a local metric system for accurate distance calculations. By default, uses AEQD (Azimuthal Equidistant) projection centered on the trajectory's geographic centroid. This ensures distances are accurate within a few meters across the entire trajectory. 2. **Spatial Indexing**: A KD-tree is built on the projected coordinates for O(log n) neighbor queries. Without scipy, falls back to O(n²) brute force. 3. **Clustering**: DBSCAN algorithm identifies clusters of spatially connected points: - Core points: Points with ≥ min_samples neighbors within spatial_radius_km - Cluster formation: Core points and their neighbors form clusters - Noise: Points that don't belong to any cluster (if not dropped) 4. **Temporal Filtering**: If time_threshold_s is provided, neighbor relationships are filtered to only include points within the time window. This prevents clustering of points from different trips on the same path. 5. **Aggregation**: Clusters are represented either by their geographic centroid (collapse_clusters=True) or by selecting one representative point. **Performance:** - With scipy (cKDTree): O(n log n) - fast even for large trajectories - Without scipy: O(n²) - acceptable for <10,000 points, slow for larger datasets - AEQD projection: ~0.1ms per trajectory (one-time cost) - Geodetic check: O(n) extra cost per cluster (usually not needed) **Accuracy:** - AEQD projection: Sub-meter accuracy for distances up to ~1000 km from center - Web Mercator: Acceptable for small spatial_radius (<1 km) at mid-latitudes - Geodetic check: Verifies clusters using true geodesic distances (slowest but most accurate) **Typical Use Cases:** - Reduce storage size of dense GPS logs (e.g., 1 Hz sampling → 0.1 Hz effective) - Preprocessing for map-matching (remove redundant points) - Noise reduction in stationary periods (e.g., at traffic lights) - Simplify trajectory for visualization **Warnings:** - If scipy is not installed, a warning is issued and O(n²) fallback is used - If AEQD projection fails, a warning is issued and Web Mercator is used - Very large spatial_radius_km (>5 km) may produce unexpected results - Very small min_samples (<1) will be clamped to 1 """ # Initialize geodesic calculator for WGS84 ellipsoid geod = Geod(ellps="WGS84") radius_m = float(spatial_radius_km) * 1000.0 # Convert km to meters # ========== Input Validation and Setup ========== # Handle empty DataFrame (return early to avoid unnecessary processing) if isinstance(df, pd.DataFrame): if df.shape[0] == 0: return df.copy() else: # polars DataFrame if len(df) == 0: return df.clone() # Convert to pandas for processing (convert back at the end if needed) input_was_polars = isinstance(df, pl.DataFrame) pdf = df.to_pandas() if input_was_polars else df.copy() # Verify required columns exist if lat_col not in pdf.columns or lon_col not in pdf.columns: raise ValueError("lat_col and lon_col must exist in the DataFrame") # Extract coordinate arrays for processing lats = pdf[lat_col].to_numpy(dtype=float) lons = pdf[lon_col].to_numpy(dtype=float) n = len(lats) # ========== Temporal Data Setup ========== # Convert time column to nanoseconds if temporal filtering is enabled if (time_col is not None) and (time_col in pdf.columns) and (time_threshold_s is not None): times = _to_ns_array(pdf[time_col]) time_thresh_ns = np.timedelta64(int(round(time_threshold_s * 1e9)), "ns") t_ints = times.astype("datetime64[ns]").astype("int64") else: # No temporal filtering times = None time_thresh_ns = None t_ints = None # ========== Coordinate Projection ========== # Project lat/lon to a local metric coordinate system for accurate distance calculations # AEQD (Azimuthal Equidistant) centered on trajectory centroid is optimal for this use_local_aeqd = bool(use_aeqd) transformer = None xs = ys = None if use_local_aeqd: try: # Calculate trajectory centroid as projection center cen_lat = float(np.mean(lats)) cen_lon = float(np.mean(lons)) # Construct AEQD projection string centered at centroid # +lat_0, +lon_0: center point # +datum=WGS84: use WGS84 ellipsoid # +units=m: distances in meters aeqd_proj = f"+proj=aeqd +lat_0={cen_lat:.9f} +lon_0={cen_lon:.9f} +datum=WGS84 +units=m +no_defs" # Create transformer from WGS84 (EPSG:4326) to AEQD # always_xy=True ensures (lon, lat) order for transform() transformer = Transformer.from_crs("EPSG:4326", aeqd_proj, always_xy=True) # Transform all coordinates to AEQD projected space xs, ys = transformer.transform(lons, lats) except Exception: # AEQD failed (rare), fallback to Web Mercator warnings.warn("AEQD projection failed; falling back to EPSG:3857 projection.") transformer = Transformer.from_crs("EPSG:4326", "EPSG:3857", always_xy=True) xs, ys = transformer.transform(lons, lats) else: # User disabled AEQD, use Web Mercator (less accurate at high latitudes) transformer = Transformer.from_crs("EPSG:4326", "EPSG:3857", always_xy=True) xs, ys = transformer.transform(lons, lats) # ========== Spatial Neighbor Search ========== # Build neighbor lists using KD-tree for O(n log n) performance if _have_kdtree: # Fast path: scipy's cKDTree (C implementation) # Build tree on projected coordinates tree = KDTree(np.column_stack([xs, ys])) # Find all neighbors within radius_m for each point # Returns list of arrays, where neighbors[i] contains indices of points # within radius_m of point i (including i itself) neighbors = tree.query_ball_point(np.column_stack([xs, ys]), r=radius_m) else: # Slow path: O(n²) brute force neighbor search # Issue warning since this is significantly slower for large datasets warnings.warn( "scipy.spatial.cKDTree not available: falling back to slower O(n^2) neighbor search. " "Install scipy for much better performance." ) neighbors = [] r2 = radius_m * radius_m # Use squared distance for faster comparisons # For each point, find all neighbors by checking distances for i in range(n): dx = xs - xs[i] # X-distance to all points dy = ys - ys[i] # Y-distance to all points d2 = dx * dx + dy * dy # Squared Euclidean distance # Find indices where squared distance <= radius² neighbors.append(list(np.nonzero(d2 <= r2)[0])) # ========== Temporal Filtering (Optional) ========== # If time threshold specified, filter neighbor lists by temporal proximity if (times is not None) and (time_thresh_ns is not None): time_thresh_int = int(time_thresh_ns.astype("timedelta64[ns]").astype("int64")) t_ints_int64 = t_ints.astype("int64") # For each point, keep only neighbors within time window for i in range(n): neigh_idx = np.array(neighbors[i], dtype=int) if neigh_idx.size == 0: continue # Calculate absolute time differences (in nanoseconds) diffs_ns = np.abs(t_ints_int64[neigh_idx] - t_ints_int64[i]) # Keep only neighbors within time threshold mask = diffs_ns <= time_thresh_int neighbors[i] = neigh_idx[mask].tolist() # ========== DBSCAN Clustering ========== # Run DBSCAN clustering algorithm to identify groups of connected points visited = np.zeros(n, dtype=bool) # Track which points have been visited labels = -np.ones(n, dtype=int) # Cluster labels (-1 = noise/unclustered) cluster_id = 0 # Current cluster ID min_samples_thresh = max(1, min_samples) # Ensure at least 1 sample required # Iterate through each point and expand clusters for i in range(n): if visited[i]: continue # Skip already-visited points visited[i] = True neigh = np.array(neighbors[i], dtype=int) # Check if this is a core point (has enough neighbors) if neigh.size < min_samples_thresh: continue # Not a core point, leave as noise (-1) # Start a new cluster with this core point labels[i] = cluster_id # Initialize cluster expansion queue with this point's neighbors seed = [int(j) for j in neigh if j != i] # Expand cluster by visiting all connected neighbors (breadth-first search) while seed: j = seed.pop() if not visited[j]: visited[j] = True neigh_j = np.array(neighbors[j], dtype=int) # If this neighbor is also a core point, add its neighbors to expansion queue if neigh_j.size >= min_samples_thresh: for nb in neigh_j: if not visited[nb]: seed.append(int(nb)) # Add this point to current cluster if it's not already assigned if labels[j] == -1: labels[j] = cluster_id # Move to next cluster ID cluster_id += 1 # ========== Group Formation ========== # Organize points into groups based on cluster labels groups = {} for idx, lab in enumerate(labels): if lab == -1: # Noise point (not in any cluster) if drop_noise: continue # Skip noise points if drop_noise=True else: # Keep noise as singleton clusters with unique keys key = f"noise_{idx}" groups.setdefault(key, []).append(idx) else: # Regular cluster point groups.setdefault(int(lab), []).append(idx) # ========== Geodetic Distance Verification (Optional) ========== # Optionally verify clusters using true geodesic distances # This is slower but provides additional accuracy verification # Usually not needed with AEQD projection if final_geodetic_check and len(groups) > 0: tol = float(geodetic_check_tol_m) radius_tol = radius_m + tol # Allow small tolerance for numerical errors new_groups = {} for lab_key, idxs in groups.items(): idxs_arr = np.array(idxs, dtype=int) # Get coordinates for this cluster group_lats = lats[idxs_arr] group_lons = lons[idxs_arr] # Calculate cluster centroid in geographic coordinates cen_lat = float(group_lats.mean()) cen_lon = float(group_lons.mean()) # Use pyproj Geod to calculate geodesic distances from centroid # Returns: forward azimuth, back azimuth, distance (meters) _, _, dists = geod.inv( np.full(len(idxs_arr), cen_lon), # From centroid lon np.full(len(idxs_arr), cen_lat), # From centroid lat group_lons, # To each point lon group_lats, # To each point lat ) # Keep only points within tolerance keep_mask = dists <= radius_tol kept = idxs_arr[keep_mask] removed = idxs_arr[~keep_mask] # Reorganize clusters based on geodetic check results if kept.size == 0: # All points failed check - make them noise if not dropping if not drop_noise: for rm in removed: new_groups.setdefault(f"noise_{rm}", []).append(int(rm)) else: # Keep the valid points in this cluster new_groups.setdefault(lab_key, []).extend(kept.tolist()) # Handle removed points as noise if removed.size > 0 and not drop_noise: for rm in removed: new_groups.setdefault(f"noise_{rm}", []).append(int(rm)) groups = new_groups # ========== Create Compressed Trajectory ========== # Generate final compressed trajectory from clusters rows = [] for lab_key, idxs in groups.items(): if not idxs: continue # Skip empty groups idxs_arr = np.array(idxs, dtype=int) if collapse_clusters: # Aggregate all points in cluster into a single representative point # Uses geographic centroid for lat/lon, mean time, and agg_other strategy for rest group_df = pdf.iloc[idxs_arr] row = _aggregate_group(group_df, lat_col, lon_col, time_col, agg_other) rows.append(row) else: # Keep one representative point from each cluster (the one with lowest index) # This preserves original point data without aggregation rep_idx = int(idxs_arr.min()) rows.append(pdf.iloc[rep_idx]) # Build result DataFrame from aggregated/selected rows result_pdf = pd.DataFrame(rows).reset_index(drop=True) if rows else pd.DataFrame(columns=pdf.columns) # ========== Finalization ========== # Ensure all original columns are present with proper types for c in pdf.columns: if c not in result_pdf.columns: result_pdf[c] = pd.NA # Ensure time column has correct datetime type if (time_col is not None) and (time_col in pdf.columns) and (time_col in result_pdf.columns): result_pdf[time_col] = pd.to_datetime(result_pdf[time_col]) # Reorder columns to match input DataFrame result_pdf = result_pdf[pdf.columns] # Return in the same DataFrame type as the input return pl.from_pandas(result_pdf) if input_was_polars else result_pdf