Source code for craterpy.helper

"""This file contains various helper functions for craterpy"""

import contextlib
import warnings
from pathlib import Path

import antimeridian
import joblib
import numpy as np
import pandas as pd
import pyproj
import rasterio as rio
from joblib import Parallel, delayed
from planetarypy.crs import local_crs
from planetarypy.geo import split_at_antimeridian
from rasterstats import zonal_stats
from scipy.spatial import cKDTree
from shapely.geometry import MultiPolygon, Point, Polygon
from shapely.ops import transform
from tqdm.autonotebook import tqdm

# Suppress warnings
warnings.filterwarnings("ignore", category=antimeridian.FixWindingWarning)
warnings.filterwarnings("ignore", "Setting nodata.*", module=r".*rasterstats")
warnings.filterwarnings("ignore", "Setting masked.*", module=r".*rasterstats")
warnings.filterwarnings("ignore", ".*lose important proj.*", module=r".*pyproj")


# Geospatial helpers
[docs]def bbox2extent(bbox): """Convert rasterio/geopandas bounding box to matplotlib extent. Parameters ---------- bbox : array-like of float Bounding box in the form (minx, miny, maxx, maxy). Returns ------- tuple of float Extent in the form (xmin, xmax, ymin, ymax). """ return bbox[0], bbox[2], bbox[1], bbox[3]
[docs]def lon360(lon): """Return longitude in range [0, 360). Parameters ---------- lon : float or array-like Longitude values in degrees. Returns ------- float or array-like Corresponding longitude values in the [0, 360] range. """ return (lon + 360) % 360
[docs]def lon180(lon): """Return longitude in range (-180, 180]. Parameters ---------- lon : float or array-like Longitude values in degrees. Returns ------- float or array-like Corresponding longitude values in the (-180, 180] range. """ return ((lon + 180) % 360) - 180
[docs]def deg2pix(degrees, ppd): """Return degrees converted to pixels at ppd pixels/degree. Parameters ---------- degrees : float Angular distance in degrees. ppd : float Pixels per degree resolution. Returns ------- int Equivalent number of pixels. """ return int(degrees * ppd)
[docs]def get_ind(value, array): """Return closest index of a value from array. Parameters ---------- value : float Target value to find. array : array-like Array of values. Returns ------- int Index of the nearest array element to `value`. """ ind = np.abs(array - value).argmin() return int(ind)
[docs]def km2deg(dist, mpp, ppd): """Return dist converted from kilometers to degrees. Parameters ---------- dist : float Distance in kilometers. mpp : float Meters per pixel. ppd : float Pixels per degree. Returns ------- float Equivalent angular distance in degrees. """ return 1000 * dist / (mpp * ppd)
[docs]def km2pix(dist, mpp): """Return dist converted from kilometers to pixels Parameters ---------- dist : float Distance in kilometers. mpp : float Meters per pixel. Returns ------- int Equivalent number of pixels. """ return int(1000 * dist / mpp)
[docs]def greatcircdist(lat1, lon1, lat2, lon2, radius): """Return great circle distance between two points on a spherical body. Uses Haversine formula for great circle distances. Parameters ---------- lat1 : float Latitude of the first point in degrees. lon1 : float Longitude of the first point in degrees. lat2 : float Latitude of the second point in degrees. lon2 : float Longitude of the second point in degrees. radius : float Radius of the sphere (same units for output). Returns ------- float Great circle distance between the points. Examples -------- >>> greatcircdist(36.12, -86.67, 33.94, -118.40, 6372.8) 2887.259950607111 """ # Convert degrees to radians lat1, lon1, lat2, lon2 = np.radians([lat1, lon1, lat2, lon2]) # Haversine dlat, dlon = abs(lat2 - lat1), abs(lon2 - lon1) a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2 theta = 2 * np.arcsin(np.sqrt(a)) return radius * theta
[docs]def inglobal(lat, lon): """True if lat and lon within global coordinates. Default coords: lat in (-90, 90) and lon in (-180, 180). mode='pos': lat in (-90, 90) and lon in (0, 360). Parameters ---------- lat : float Latitude in degrees. lon : float Longitude in degrees. Returns ------- bool True if latitude is between -90 and 90 and longitude normalized to 0-360. Examples -------- >>> inglobal(0, 0) True >>> inglobal(91, 0) False >>> inglobal(0, -50) True """ return (-90 <= lat <= 90) and (0 <= lon360(lon) <= 360)
# Shape geometry helpers
[docs]def gen_annuli(centers: list, rads: list, inner: float, outer: float, crs, **kwargs): """ Generate annular polygons for a set of crater centers using a local azimuthal equidistant projection. Each annulus is generated by projecting to a local coordinate system, buffering, and then transforming back to geodetic coordinates. Processes in parallel with joblib. Parameters ---------- centers : list of shapely.geometry.Point List of crater center points (latitude/longitude). rads : list of float List of crater radii (in projection units). inner : float Inner radius scaling factor (relative to crater radius). outer : float Outer radius scaling factor (relative to crater radius). crs : pyproj.CRS Geodetic coordinate reference system for output polygons. **kwargs Additional keyword arguments passed to annulus creation (e.g., n_jobs, nvert). Returns ------- list of shapely.geometry.Polygon List of annular polygons in geodetic coordinates. """ # Use joblib to run _create_single_annulus on all available CPU cores minus 1. n_jobs = kwargs.get("n_jobs", -2) tasks = ( delayed(create_single_annulus)(center, rad, inner, outer, crs, **kwargs) for center, rad in zip(centers, rads, strict=True) ) with tqdm_joblib(tqdm(desc="Generating polygons", total=len(centers))): return Parallel(n_jobs=n_jobs, return_as="list")(tasks)
[docs]def create_single_annulus( center: Point, rad: float, inner: float, outer: float, geodetic_crs, **kwargs ) -> Polygon | MultiPolygon: """ Create a geodetic annulus polygon for a single crater center. Projects the center to a local azimuthal equidistant CRS, generates an annular buffer, and transforms the result back to the geodetic CRS. Handles antimeridian wrapping if needed. Parameters ---------- center : shapely.geometry.Point Crater center point (latitude/longitude). rad : float Crater radius in projection units. inner : float Inner radius scaling factor (relative to crater radius). outer : float Outer radius scaling factor (relative to crater radius). geodetic_crs : pyproj.CRS Geodetic coordinate reference system for output polygon. **kwargs Additional keyword arguments for buffer creation. Returns ------- shapely.geometry.Polygon Annulus polygon in geodetic coordinates. """ # Build a local azimuthal equidistant projection centered on the crater, # delegating CRS construction to planetarypy (single IAU source of truth). local = local_crs(center.x, center.y, _naif_from_crs(geodetic_crs)) # Generate the buffer in the local, flat projection buf = get_annular_buffer(Point(0, 0), rad, inner, outer, **kwargs) # Create transformer and unproject the buffer to the geodetic CRS to_geodetic = pyproj.Transformer.from_crs( local, geodetic_crs, always_xy=True ).transform annulus = transform(to_geodetic, buf) # Split/fix geometry that wraps the antimeridian or covers a pole. The span # gate keeps the common non-crossing case (including annuli with an inner # hole) on the untouched path; split_at_antimeridian rebuilds the exterior # only, so it must not run on geometry that doesn't actually wrap. if annulus.bounds[2] - annulus.bounds[0] >= 300: parts = split_at_antimeridian(list(annulus.exterior.coords)[:-1]) if parts: annulus = parts[0] if len(parts) == 1 else MultiPolygon(parts) return annulus
def _naif_from_crs(crs) -> int: """Return the NAIF id encoded in an IAU geodetic CRS (code = naif * 100).""" authority = pyproj.CRS(crs).to_authority() return int(authority[1]) // 100
[docs]def get_annular_buffer(point: Point, rad: float, inner: float, outer: float, **kwargs): """ Generate an annular buffer (ring-shaped polygon) around a point. The annulus is defined by an outer and inner radius, both scaled by the crater radius. The polygon is created in a projected (flat) coordinate system. Parameters ---------- point : shapely.geometry.Point Center point for the buffer (usually (0, 0) in local projection). rad : float Crater radius in projection units. inner : float Inner radius scaling factor (relative to crater radius). outer : float Outer radius scaling factor (relative to crater radius). **kwargs nvert : int, optional Number of vertices for the circular polygon (default: 32). Returns ------- shapely.geometry.Polygon Annular buffer polygon. """ # Num vertices to make each circular poly nvert = kwargs.get("nvert", 32) outer_scaled = rad * outer inner_scaled = rad * inner outer_poly = point.buffer(outer_scaled, quad_segs=nvert // 4) if inner_scaled <= 0: return outer_poly inner_poly = point.buffer(inner_scaled, quad_segs=nvert // 4) return outer_poly.difference(inner_poly)
[docs]def reproject_to_raster(geometries, raster): """Return geometries reprojected to the raster's CRS. rasterstats samples a raster using the geometry coordinates as-is and does not reproject, so geometries must share the raster's CRS. This is a no-op when the CRS already match or when either CRS is unknown. Parameters ---------- geometries : geopandas.GeoSeries GeoSeries with a defined ``.crs``. raster : str or rasterio.DatasetReader Raster file path or open raster dataset. Returns ------- geopandas.GeoSeries Geometries in the raster CRS (or unchanged if reprojection isn't needed). """ if hasattr(raster, "crs"): rcrs = raster.crs else: with rio.open(raster) as src: rcrs = src.crs if rcrs is None or geometries.crs is None: return geometries if pyproj.CRS.from_user_input(rcrs) == pyproj.CRS.from_user_input(geometries.crs): return geometries return geometries.to_crs(rcrs)
[docs]def compute_zonal_stats(geometries, raster, suffix: str = "", **kwargs) -> pd.DataFrame: """ Compute zonal stats (e.g., mean, std, count) for each geometry in the raster. See rasterstats.zonal_stats. Parameters ---------- geometries : geopandas.GeoSeries GeoSeries of geometry objects (e.g., shapely.Polygons). raster : str or rasterio.DatasetReader Raster file path or open raster dataset. suffix : str, optional Suffix to append to output column names (default: ""). **kwargs Additional keyword arguments to pass to rasterstats.zonal_stats (e.g., stats, nodata). Returns ------- pandas.DataFrame DataFrame containing computed statistics for each geometry. """ # rasterstats doesn't reproject; match the raster CRS first (e.g. projected rasters) geometries = reproject_to_raster(geometries, raster) # Perform the core calculation zstats = zonal_stats(geometries, raster, all_touched=True, **kwargs) df = pd.DataFrame(zstats, index=geometries.index) # Return only requested stat columns with suffix, if given if suffix and not suffix.startswith("_"): suffix = f"_{suffix}" return df.add_suffix(suffix)
def _prep_rasters_regions(rasters, regions, nodata): """Normalize rasters/regions/nodata into a {name: raster} dict, region list, nodata dict.""" def _name_raster(i, raster): return Path(raster).name if isinstance(raster, str) else f"raster{i}" if isinstance(rasters, dict): rdict = rasters elif isinstance(rasters, (tuple, list)): # Set up dict for column naming. Choose filename if str, else "raster{i}" (e.g., if given file descriptors) rdict = { _name_raster(i, raster): Path(raster).as_posix() for i, raster in enumerate(rasters) } else: rdict = {"_": rasters} if isinstance(regions, str): regions = [regions] if not isinstance(nodata, dict): nodata = dict.fromkeys(rdict, nodata) return rdict, regions, nodata
[docs]def get_stats( gdf, rasters, regions, stats=("mean", "std", "count"), nodata=None, n_jobs=-2, ): """ Compute zonal stats on polygons in a GeoDataFrame for all rasters and regions in parallel. Return a single DataFrame with columns <stat_raster_region> for all combinations. Parameters ---------- gdf : geopandas.GeoDataFrame GeoDataFrame containing polygon regions as columns. rasters : str, rasterio.DatasetReader, or dict of {str: str or rasterio.DatasetReader} Single raster path or open raster dataset, or a mapping of names to rasters. regions : str or list of str Name or list of names of geometry columns in gdf to use as regions. stats : tuple of str, optional Statistics to compute for each raster-region combination. Default: ("mean", "std", "count"). nodata : number, None, or dict of {str: number}, optional Nodata value to use for masking, or mapping of raster names to their nodata values. If None, no nodata masking is applied. n_jobs : int, optional Number of parallel worker processes to use. Default is 1. Returns ------- pandas.DataFrame DataFrame containing the original crater columns along with appended statistics columns for each raster and region combination. """ rdict, regions, nodata = _prep_rasters_regions(rasters, regions, nodata) total_tasks = len(rdict) * len(regions) # Create a list of tasks, where each task has all the info for one worker call tasks = [] for region_name in regions: for raster_name, raster_file in rdict.items(): tasks.append( delayed(compute_zonal_stats)( geometries=gdf[region_name], raster=raster_file, stats=stats, nodata=nodata.get(raster_name), # Safely get nodata value suffix=f"{raster_name}_{region_name}".strip("_"), ) ) # Compute in parallel with tqdm_joblib(tqdm(desc="Computing Zonal Stats", total=total_tasks)): all_stats = Parallel(n_jobs=n_jobs)(tasks) return pd.concat(all_stats, axis=1)
[docs]def compute_arrays(geometries, raster, suffix: str = "", **kwargs) -> pd.Series: """ Return the masked pixel array clipped to each geometry from the raster. See rasterstats.zonal_stats (raster_out=True). Parameters ---------- geometries : geopandas.GeoSeries GeoSeries of geometry objects (e.g., shapely.Polygons). raster : str or rasterio.DatasetReader Raster file path or open raster dataset. suffix : str, optional Name to give the returned Series (default: ""). **kwargs Additional keyword arguments to pass to rasterstats.zonal_stats (e.g., nodata). Returns ------- pandas.Series Series of numpy.ma.MaskedArray, one clipped raster window per geometry. """ # rasterstats doesn't reproject; match the raster CRS first (e.g. projected rasters) geometries = reproject_to_raster(geometries, raster) zstats = zonal_stats( geometries, raster, all_touched=True, raster_out=True, **kwargs ) arrays = [z["mini_raster_array"] for z in zstats] return pd.Series(arrays, index=geometries.index, name=suffix or None)
[docs]def get_arrays(gdf, rasters, regions, nodata=None, n_jobs=-2): """ Return the masked pixel arrays underlying zonal stats for all rasters and regions. Each cell holds the numpy.ma.MaskedArray clipped to one geometry, so callers can compute anything beyond the built-in zonal statistics. Parameters ---------- gdf : geopandas.GeoDataFrame GeoDataFrame containing polygon regions as columns. rasters : str, rasterio.DatasetReader, or dict of {str: str or rasterio.DatasetReader} Single raster path or open raster dataset, or a mapping of names to rasters. regions : str or list of str Name or list of names of geometry columns in gdf to use as regions. nodata : number, None, or dict of {str: number}, optional Nodata value to use for masking, or mapping of raster names to their nodata values. n_jobs : int, optional Number of parallel worker processes to use. Returns ------- pandas.DataFrame DataFrame with one column per raster-region combination; each cell is the masked pixel array clipped to that geometry. """ rdict, regions, nodata = _prep_rasters_regions(rasters, regions, nodata) total_tasks = len(rdict) * len(regions) tasks = [] for region_name in regions: for raster_name, raster_file in rdict.items(): tasks.append( delayed(compute_arrays)( geometries=gdf[region_name], raster=raster_file, nodata=nodata.get(raster_name), suffix=f"{raster_name}_{region_name}".strip("_"), ) ) with tqdm_joblib(tqdm(desc="Extracting Arrays", total=total_tasks)): all_arrays = Parallel(n_jobs=n_jobs)(tasks) return pd.concat(all_arrays, axis=1)
# Misc
[docs]@contextlib.contextmanager def tqdm_joblib(tqdm_object): """ Context manager to patch joblib to report into tqdm progress bar given as argument. See: https://stackoverflow.com/a/58936697 """ class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack): def __call__(self, *args, **kwargs): tqdm_object.update(n=self.batch_size) return super().__call__(*args, **kwargs) old_batch_callback = joblib.parallel.BatchCompletionCallBack joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback try: yield tqdm_object finally: joblib.parallel.BatchCompletionCallBack = old_batch_callback tqdm_object.close()
# DataFrame helpers
[docs]def findcol(df, names, exact=False): """Return first matching column in df matching a string given in names. Case insensitive, ignores whitespace. Raises ValueError if none found. Parameters ---------- df : pandas.DataFrame Dataframe object. names : str or list of str Names to check against columns in df. exact : bool Exact matches only (case-insensitive). Otherwise match on column substring (default: False). Returns ------- str Name of the first matching column. Raises ------ ValueError If no matching column is found. Examples -------- >>> df = pd.DataFrame({'Lat': [10, -20., 80.0], 'Lon': [14, -40.1, 317.2], 'Diam': [2, 12., 23.7]}) >>> findcol(df, ['Latitude', 'Lat']) 'Lat' >>> findcol(df, ['Radius']) Traceback (most recent call last): ... ValueError: No column containing ['Radius'] found. >>> findcol(df, 'diam') 'Diam' """ if isinstance(names, str): names = [names] # Ignore spaces, convert "(xyz)" to "_xyz" so that Ex. d_m will match d (m) # Note: can match with regex but then calling func needs to escape chars like () cols = df.columns.str.replace(" ", "") cols = cols.str.replace("(", "_", regex=False).str.replace(")", " ", regex=False) for name in names: if exact: matches = df.columns[cols.str.fullmatch(name, case=False)] else: matches = df.columns[cols.str.contains(name, case=False)] if any(matches): # Exit early at first match found return matches[0] raise ValueError(f"No column containing {names} found.")
[docs]def find_rad_or_diam_col(df): """Return the name of the radius or diameter column. Parameters ---------- df : pandas.DataFrame DataFrame to search for radius/diameter column names. Returns ------- str Name of the column representing radius or diameter. Raises ------ ValueError If no suitable column is found. """ # First search these names (case-insensitive) in order for exact matches names = "radius,diameter,rad,diam,r_km,d_km,r_m,d_m,r,d" try: col = findcol(df, names.split(","), exact=True) except ValueError: try: # Unlikely that inexact matches for r,d are correct so exclude names_contains = names.replace(",r,d", "") col = findcol(df, names_contains.split(","), exact=False) except ValueError: raise ValueError("Could not identify radius or diameter column.") from None return col
[docs]def latlon_to_cartesian(lat, lon, radius): """Convert lat, lon to cartesian (x, y, z) Parameters ---------- lat : float or array-like Latitude in degrees. lon : float or array-like Longitude in degrees. radius : float Radius of the sphere. Returns ------- tuple of array-like or float Cartesian coordinates (x, y, z). Examples -------- >>> latlon_to_cartesian(0, 0, 1) (1.0, 0.0, 0.0) """ lat, lon = np.radians(lat), np.radians(lon) x = radius * np.cos(lat) * np.cos(lon) y = radius * np.cos(lat) * np.sin(lon) z = radius * np.sin(lat) return x, y, z
[docs]def merge( df1, df2, rbody=1737.4e3, rtol=0.5, latcol="_lat", loncol="_lon", radcol="_radius_m", ): """Spatial merge of craters in df1 and df2. If a crater matches, keep lat, lon, rad and other col values from df1. If not a match, append to the end with lat, lon, rad from df2. Retains all columns from both dfs and fills in values if present. Parameters ---------- df1 : pandas.DataFrame Left dataframe whose values will be preserved for any matches df2 : pandas.DataFrame Right dataframe to merge rbody : float Radius of the body in meters (default: Moon's radius) latcol : str Name of latitude column (default: 'lat') loncol : str Name of longitude column (default: 'lon') radcol : str Name of radius column (default: 'rad') rtol : float Relative tolerance for radius matching (default: 0.5) Returns ------- pandas.DataFrame Merged dataframe with lat/lon/rad from df1 """ def match(result, df2_rad): """Return matching index if radii differ by less than rtol.""" if result: for match in result: df1_rad = df1[radcol].iloc[match] if (df2_rad - df1_rad) / df1_rad < rtol: return match return np.nan coords1 = list( zip(*latlon_to_cartesian(df1[latcol], df1[loncol], rbody), strict=True) ) coords2 = list( zip(*latlon_to_cartesian(df2[latcol], df2[loncol], rbody), strict=True) ) tree = cKDTree(coords1) results = tree.query_ball_point(coords2, r=df2[radcol]) df2["df1_idx"] = np.nan for (i, row), result in zip(df2.iterrows(), results, strict=True): df2.at[i, "df1_idx"] = match(result, row[radcol]) # Append non-matching rows (includes columns from both df1 and df2) merged = pd.concat([df1, df2[df2.df1_idx.isna()]], ignore_index=True) # Find and add missing data from columns unique to df2 to the matched rows df2_uniq_cols = list(set(df2.columns) - set(df1.columns)) df2_match_idx = df2.df1_idx.dropna().values merged.loc[df2_match_idx, df2_uniq_cols] = df2.loc[ ~df2.df1_idx.isna(), df2_uniq_cols ].values merged.drop(columns="df1_idx", inplace=True) return merged.reset_index(drop=True)