Source code for craterpy.helper

"""This file contains various helper functions for craterpy"""

import contextlib
import warnings
from pathlib import Path

import antimeridian
import joblib
import numpy as np
import pandas as pd
import pyproj
from joblib import Parallel, delayed
from pyproj.crs import ProjectedCRS
from pyproj.crs.coordinate_operation import AzimuthalEquidistantConversion
from rasterstats import zonal_stats
from scipy.spatial import cKDTree
from shapely.geometry import MultiPolygon, Point, Polygon
from shapely.ops import transform
from tqdm.autonotebook import tqdm

# Suppress warnings
warnings.filterwarnings("ignore", category=antimeridian.FixWindingWarning)
warnings.filterwarnings("ignore", "Setting nodata.*", module=r".*rasterstats")
warnings.filterwarnings("ignore", "Setting masked.*", module=r".*rasterstats")
warnings.filterwarnings("ignore", ".*lose important proj.*", module=r".*pyproj")


# Geospatial helpers
[docs]def bbox2extent(bbox): """Convert rasterio/geopandas bounding box to matplotlib extent. Parameters ---------- bbox : array-like of float Bounding box in the form (minx, miny, maxx, maxy). Returns ------- tuple of float Extent in the form (xmin, xmax, ymin, ymax). """ return bbox[0], bbox[2], bbox[1], bbox[3]
[docs]def lon360(lon): """Return longitude in range [0, 360). Parameters ---------- lon : float or array-like Longitude values in degrees. Returns ------- float or array-like Corresponding longitude values in the [0, 360] range. """ return (lon + 360) % 360
[docs]def lon180(lon): """Return longitude in range (-180, 180]. Parameters ---------- lon : float or array-like Longitude values in degrees. Returns ------- float or array-like Corresponding longitude values in the (-180, 180] range. """ return ((lon + 180) % 360) - 180
[docs]def deg2pix(degrees, ppd): """Return degrees converted to pixels at ppd pixels/degree. Parameters ---------- degrees : float Angular distance in degrees. ppd : float Pixels per degree resolution. Returns ------- int Equivalent number of pixels. """ return int(degrees * ppd)
[docs]def get_ind(value, array): """Return closest index of a value from array. Parameters ---------- value : float Target value to find. array : array-like Array of values. Returns ------- int Index of the nearest array element to `value`. """ ind = np.abs(array - value).argmin() return int(ind)
[docs]def km2deg(dist, mpp, ppd): """Return dist converted from kilometers to degrees. Parameters ---------- dist : float Distance in kilometers. mpp : float Meters per pixel. ppd : float Pixels per degree. Returns ------- float Equivalent angular distance in degrees. """ return 1000 * dist / (mpp * ppd)
[docs]def km2pix(dist, mpp): """Return dist converted from kilometers to pixels Parameters ---------- dist : float Distance in kilometers. mpp : float Meters per pixel. Returns ------- int Equivalent number of pixels. """ return int(1000 * dist / mpp)
[docs]def greatcircdist(lat1, lon1, lat2, lon2, radius): """Return great circle distance between two points on a spherical body. Uses Haversine formula for great circle distances. Parameters ---------- lat1 : float Latitude of the first point in degrees. lon1 : float Longitude of the first point in degrees. lat2 : float Latitude of the second point in degrees. lon2 : float Longitude of the second point in degrees. radius : float Radius of the sphere (same units for output). Returns ------- float Great circle distance between the points. Examples -------- >>> greatcircdist(36.12, -86.67, 33.94, -118.40, 6372.8) 2887.259950607111 """ # Convert degrees to radians lat1, lon1, lat2, lon2 = np.radians([lat1, lon1, lat2, lon2]) # Haversine dlat, dlon = abs(lat2 - lat1), abs(lon2 - lon1) a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2 theta = 2 * np.arcsin(np.sqrt(a)) return radius * theta
[docs]def inglobal(lat, lon): """True if lat and lon within global coordinates. Default coords: lat in (-90, 90) and lon in (-180, 180). mode='pos': lat in (-90, 90) and lon in (0, 360). Parameters ---------- lat : float Latitude in degrees. lon : float Longitude in degrees. Returns ------- bool True if latitude is between -90 and 90 and longitude normalized to 0-360. Examples -------- >>> inglobal(0, 0) True >>> inglobal(91, 0) False >>> inglobal(0, -50) True """ return (-90 <= lat <= 90) and (0 <= lon360(lon) <= 360)
# Shape geometry helpers
[docs]def gen_annuli(centers: list, rads: list, inner: float, outer: float, crs, **kwargs): """ Generate annular polygons for a set of crater centers using a local azimuthal equidistant projection. Each annulus is generated by projecting to a local coordinate system, buffering, and then transforming back to geodetic coordinates. Processes in parallel with joblib. Parameters ---------- centers : list of shapely.geometry.Point List of crater center points (latitude/longitude). rads : list of float List of crater radii (in projection units). inner : float Inner radius scaling factor (relative to crater radius). outer : float Outer radius scaling factor (relative to crater radius). crs : pyproj.CRS Geodetic coordinate reference system for output polygons. **kwargs Additional keyword arguments passed to annulus creation (e.g., n_jobs, nvert). Returns ------- list of shapely.geometry.Polygon List of annular polygons in geodetic coordinates. """ # Use joblib to run _create_single_annulus on all available CPU cores minus 1. n_jobs = kwargs.get("n_jobs", -2) tasks = ( delayed(create_single_annulus)(center, rad, inner, outer, crs, **kwargs) for center, rad in zip(centers, rads, strict=True) ) with tqdm_joblib(tqdm(desc="Generating polygons", total=len(centers))): return Parallel(n_jobs=n_jobs, return_as="list")(tasks)
[docs]def create_single_annulus( center: Point, rad: float, inner: float, outer: float, geodetic_crs, **kwargs ) -> Polygon | MultiPolygon: """ Create a geodetic annulus polygon for a single crater center. Projects the center to a local azimuthal equidistant CRS, generates an annular buffer, and transforms the result back to the geodetic CRS. Handles antimeridian wrapping if needed. Parameters ---------- center : shapely.geometry.Point Crater center point (latitude/longitude). rad : float Crater radius in projection units. inner : float Inner radius scaling factor (relative to crater radius). outer : float Outer radius scaling factor (relative to crater radius). geodetic_crs : pyproj.CRS Geodetic coordinate reference system for output polygon. **kwargs Additional keyword arguments for buffer creation. Returns ------- shapely.geometry.Polygon Annulus polygon in geodetic coordinates. """ # Create a local azimuthal equidistant projection for the crater local_crs = ProjectedCRS( name=f"AzimuthalEquidistant({center.y:.2f}N, {center.x:.2f}E)", conversion=AzimuthalEquidistantConversion(center.y, center.x), geodetic_crs=geodetic_crs, ) # Generate the buffer in the local, flat projection # Assumes self._get_annular_buffer is available or moved here buf = get_annular_buffer(Point(0, 0), rad, inner, outer, **kwargs) # Create transformer and unproject the buffer to the geodetic CRS to_geodetic = pyproj.Transformer.from_crs( local_crs, geodetic_crs, always_xy=True ).transform annulus = transform(to_geodetic, buf) # Fix antimeridian wrapping if necessary if annulus.bounds[2] - annulus.bounds[0] >= 300: annulus = fix_antimeridian_wrap( annulus, rad, outer, geodetic_crs, local_crs, **kwargs ) return annulus
[docs]def get_annular_buffer(point: Point, rad: float, inner: float, outer: float, **kwargs): """ Generate an annular buffer (ring-shaped polygon) around a point. The annulus is defined by an outer and inner radius, both scaled by the crater radius. The polygon is created in a projected (flat) coordinate system. Parameters ---------- point : shapely.geometry.Point Center point for the buffer (usually (0, 0) in local projection). rad : float Crater radius in projection units. inner : float Inner radius scaling factor (relative to crater radius). outer : float Outer radius scaling factor (relative to crater radius). **kwargs nvert : int, optional Number of vertices for the circular polygon (default: 32). Returns ------- shapely.geometry.Polygon Annular buffer polygon. """ # Num vertices to make each circular poly nvert = kwargs.get("nvert", 32) outer_scaled = rad * outer inner_scaled = rad * inner outer_poly = point.buffer(outer_scaled, quad_segs=nvert // 4) if inner_scaled <= 0: return outer_poly inner_poly = point.buffer(inner_scaled, quad_segs=nvert // 4) return outer_poly.difference(inner_poly)
[docs]def fix_antimeridian_wrap( annulus, rad: float, outer: float, geodetic_crs, local_crs, **kwargs ) -> Polygon | MultiPolygon: """ Corrects polygons that wrap incorrectly across the antimeridian. Checks if the generated annulus covers the North or South pole and applies antimeridian fixing logic to ensure valid geometry. Parameters ---------- annulus : shapely.geometry.Polygon Annulus polygon in geodetic coordinates. rad : float Crater radius in projection units. outer : float Outer radius scaling factor (relative to crater radius). geodetic_crs : pyproj.CRS Geodetic coordinate reference system. local_crs : pyproj.CRS Local projected coordinate reference system. **kwargs Additional keyword arguments for buffer creation (e.g. nvert). Returns ------- shapely.geometry.Polygon Corrected polygon with proper antimeridian handling. """ to_projected = pyproj.Transformer.from_crs( geodetic_crs, local_crs, always_xy=True ).transform # Create a solid buffer (from radius 0 to outer) for pole checking solid_buffer = get_annular_buffer(Point(0, 0), rad, 0, outer, **kwargs) # Check if North or South pole is contained within the buffer covers_north_pole = solid_buffer.contains(transform(to_projected, Point(0, 90))) covers_south_pole = solid_buffer.contains(transform(to_projected, Point(0, -90))) return antimeridian.fix_polygon( annulus, force_north_pole=covers_north_pole, force_south_pole=covers_south_pole, )
[docs]def compute_zonal_stats(geometries, raster, suffix: str = "", **kwargs) -> pd.DataFrame: """ Compute zonal stats (e.g., mean, std, count) for each geometry in the raster. See rasterstats.zonal_stats. Parameters ---------- geometries : geopandas.GeoSeries GeoSeries of geometry objects (e.g., shapely.Polygons). raster : str or rasterio.DatasetReader Raster file path or open raster dataset. suffix : str, optional Suffix to append to output column names (default: ""). **kwargs Additional keyword arguments to pass to rasterstats.zonal_stats (e.g., stats, nodata). Returns ------- pandas.DataFrame DataFrame containing computed statistics for each geometry. """ # Perform the core calculation zstats = zonal_stats(geometries, raster, all_touched=True, **kwargs) df = pd.DataFrame(zstats, index=geometries.index) # Return only requested stat columns with suffix, if given if suffix and not suffix.startswith("_"): suffix = f"_{suffix}" return df.add_suffix(suffix)
[docs]def get_stats( gdf, rasters, regions, stats=("mean", "std", "count"), nodata=None, n_jobs=-2, ): """ Compute zonal stats on polygons in a GeoDataFrame for all rasters and regions in parallel. Return a single DataFrame with columns <stat_raster_region> for all combinations. Parameters ---------- gdf : geopandas.GeoDataFrame GeoDataFrame containing polygon regions as columns. rasters : str, rasterio.DatasetReader, or dict of {str: str or rasterio.DatasetReader} Single raster path or open raster dataset, or a mapping of names to rasters. regions : str or list of str Name or list of names of geometry columns in gdf to use as regions. stats : tuple of str, optional Statistics to compute for each raster-region combination. Default: ("mean", "std", "count"). nodata : number, None, or dict of {str: number}, optional Nodata value to use for masking, or mapping of raster names to their nodata values. If None, no nodata masking is applied. n_jobs : int, optional Number of parallel worker processes to use. Default is 1. Returns ------- pandas.DataFrame DataFrame containing the original crater columns along with appended statistics columns for each raster and region combination. """ def _name_raster(i, raster): return Path(raster).name if isinstance(raster, str) else f"raster{i}" if isinstance(rasters, dict): rdict = rasters elif isinstance(rasters, (tuple, list)): # Set up dict for column naming. Choose filename if str, else "raster{i}" (e.g., if given file descriptors) rdict = { _name_raster(i, raster): Path(raster).as_posix() for i, raster in enumerate(rasters) } else: rdict = {"_": rasters} if isinstance(regions, str): regions = [regions] if not isinstance(nodata, dict): nodata = dict.fromkeys(rdict, nodata) total_tasks = len(rdict) * len(regions) # Create a list of tasks, where each task has all the info for one worker call tasks = [] for region_name in regions: for raster_name, raster_file in rdict.items(): tasks.append( delayed(compute_zonal_stats)( geometries=gdf[region_name], raster=raster_file, stats=stats, nodata=nodata.get(raster_name), # Safely get nodata value suffix=f"{raster_name}_{region_name}".strip("_"), ) ) # Compute in parallel with tqdm_joblib(tqdm(desc="Computing Zonal Stats", total=total_tasks)): all_stats = Parallel(n_jobs=n_jobs)(tasks) return pd.concat(all_stats, axis=1)
# Misc
[docs]@contextlib.contextmanager def tqdm_joblib(tqdm_object): """ Context manager to patch joblib to report into tqdm progress bar given as argument. See: https://stackoverflow.com/a/58936697 """ class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack): def __call__(self, *args, **kwargs): tqdm_object.update(n=self.batch_size) return super().__call__(*args, **kwargs) old_batch_callback = joblib.parallel.BatchCompletionCallBack joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback try: yield tqdm_object finally: joblib.parallel.BatchCompletionCallBack = old_batch_callback tqdm_object.close()
# DataFrame helpers
[docs]def findcol(df, names, exact=False): """Return first matching column in df matching a string given in names. Case insensitive, ignores whitespace. Raises ValueError if none found. Parameters ---------- df : pandas.DataFrame Dataframe object. names : str or list of str Names to check against columns in df. exact : bool Exact matches only (case-insensitive). Otherwise match on column substring (default: False). Returns ------- str Name of the first matching column. Raises ------ ValueError If no matching column is found. Examples -------- >>> df = pd.DataFrame({'Lat': [10, -20., 80.0], 'Lon': [14, -40.1, 317.2], 'Diam': [2, 12., 23.7]}) >>> findcol(df, ['Latitude', 'Lat']) 'Lat' >>> findcol(df, ['Radius']) Traceback (most recent call last): ... ValueError: No column containing ['Radius'] found. >>> findcol(df, 'diam') 'Diam' """ if isinstance(names, str): names = [names] # Ignore spaces, convert "(xyz)" to "_xyz" so that Ex. d_m will match d (m) # Note: can match with regex but then calling func needs to escape chars like () cols = df.columns.str.replace(" ", "") cols = cols.str.replace("(", "_", regex=False).str.replace(")", " ", regex=False) for name in names: if exact: matches = df.columns[cols.str.fullmatch(name, case=False)] else: matches = df.columns[cols.str.contains(name, case=False)] if any(matches): # Exit early at first match found return matches[0] raise ValueError(f"No column containing {names} found.")
[docs]def find_rad_or_diam_col(df): """Return the name of the radius or diameter column. Parameters ---------- df : pandas.DataFrame DataFrame to search for radius/diameter column names. Returns ------- str Name of the column representing radius or diameter. Raises ------ ValueError If no suitable column is found. """ # First search these names (case-insensitive) in order for exact matches names = "radius,diameter,rad,diam,r_km,d_km,r_m,d_m,r,d" try: col = findcol(df, names.split(","), exact=True) except ValueError: try: # Unlikely that inexact matches for r,d are correct so exclude names_contains = names.replace(",r,d", "") col = findcol(df, names_contains.split(","), exact=False) except ValueError: raise ValueError("Could not identify radius or diameter column.") from None return col
[docs]def latlon_to_cartesian(lat, lon, radius): """Convert lat, lon to cartesian (x, y, z) Parameters ---------- lat : float or array-like Latitude in degrees. lon : float or array-like Longitude in degrees. radius : float Radius of the sphere. Returns ------- tuple of array-like or float Cartesian coordinates (x, y, z). Examples -------- >>> latlon_to_cartesian(0, 0, 1) (1.0, 0.0, 0.0) """ lat, lon = np.radians(lat), np.radians(lon) x = radius * np.cos(lat) * np.cos(lon) y = radius * np.cos(lat) * np.sin(lon) z = radius * np.sin(lat) return x, y, z
[docs]def merge( df1, df2, rbody=1737.4e3, rtol=0.5, latcol="_lat", loncol="_lon", radcol="_radius_m", ): """Spatial merge of craters in df1 and df2. If a crater matches, keep lat, lon, rad and other col values from df1. If not a match, append to the end with lat, lon, rad from df2. Retains all columns from both dfs and fills in values if present. Parameters ---------- df1 : pandas.DataFrame Left dataframe whose values will be preserved for any matches df2 : pandas.DataFrame Right dataframe to merge rbody : float Radius of the body in meters (default: Moon's radius) latcol : str Name of latitude column (default: 'lat') loncol : str Name of longitude column (default: 'lon') radcol : str Name of radius column (default: 'rad') rtol : float Relative tolerance for radius matching (default: 0.5) Returns ------- pandas.DataFrame Merged dataframe with lat/lon/rad from df1 """ def match(result, df2_rad): """Return matching index if radii differ by less than rtol.""" if result: for match in result: df1_rad = df1[radcol].iloc[match] if (df2_rad - df1_rad) / df1_rad < rtol: return match return np.nan coords1 = list( zip(*latlon_to_cartesian(df1[latcol], df1[loncol], rbody), strict=True) ) coords2 = list( zip(*latlon_to_cartesian(df2[latcol], df2[loncol], rbody), strict=True) ) tree = cKDTree(coords1) results = tree.query_ball_point(coords2, r=df2[radcol]) df2["df1_idx"] = np.nan for (i, row), result in zip(df2.iterrows(), results, strict=True): df2.at[i, "df1_idx"] = match(result, row[radcol]) # Append non-matching rows (includes columns from both df1 and df2) merged = pd.concat([df1, df2[df2.df1_idx.isna()]], ignore_index=True) # Find and add missing data from columns unique to df2 to the matched rows df2_uniq_cols = list(set(df2.columns) - set(df1.columns)) df2_match_idx = df2.df1_idx.dropna().values merged.loc[df2_match_idx, df2_uniq_cols] = df2.loc[ ~df2.df1_idx.isna(), df2_uniq_cols ].values merged.drop(columns="df1_idx", inplace=True) return merged.reset_index(drop=True)