Source code for coincident.search.wesm

"""
Spatial metadata for the 3D Elevation Program (3DEP) is available by work unit in the Work Unit Extent Spatial Metadata (WESM).
https://www.usgs.gov/ngp-standards-and-specifications/wesm-data-dictionary
"""

from __future__ import annotations

import os
from importlib import resources
from typing import Any

# import rustac
import pandas as pd
import pyogrio
import requests  # type: ignore[import-untyped]
from geopandas import GeoDataFrame, read_file
from pandas import Timedelta, Timestamp
from shapely.geometry import box

from coincident.datasets import usgs
from coincident.overlaps import subset_by_temporal_overlap

swath_polygon_csv = resources.files("coincident.search") / "swath_polygons.csv"

defaults = usgs.ThreeDEP()
wesm_gpkg_url = defaults.search


# For some reason credential handling of pyogrio is different in pip environment rather than conda-forge. if installed from pip, it seems rasterio.Env does not have effect, but using os.environ to temporary set GDAL ENV settings works in either environment…


class TemporaryEnvVars:
    def __init__(self, **kwargs: str) -> None:
        self.env_vars: dict[str, str] = kwargs
        self.old_values: dict[str, str | None] = {}

    def __enter__(self) -> None:
        # Save original values and set new ones
        for key, value in self.env_vars.items():
            self.old_values[key] = os.environ.get(key)
            os.environ[key] = str(value)  # Env vars must be strings

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: Any,
    ) -> None:
        # Restore original values
        for key, value in self.old_values.items():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value


def stacify_column_names(gf: GeoDataFrame) -> GeoDataFrame:
    """
    Renames WESM column names to match STAC (SpatioTemporal Asset Catalog) conventions

    Parameters
    ----------
    gf
        The input GeoDataFrame with columns 'collect_start' and 'collect_end'.

    Returns
    -------
        The modified GeoDataFrame with renamed columns and additional 'datetime' and 'duration' columns.

    Notes
    -----
        - 'start_datetime': Renamed from 'collect_start'.
        - 'end_datetime': Renamed from 'collect_end'.
        - 'datetime': Midpoint of 'start_datetime' and 'end_datetime'.
        - 'duration': Duration in days between 'start_datetime' and 'end_datetime'.
    """

    name_map = {
        "collect_start": "start_datetime",
        "collect_end": "end_datetime",
    }
    gf = gf.rename(columns=name_map)
    # Add collection name to match STAC (used to identify geodataframe contents in other functions)
    gf["collection"] = "3DEP"
    gf["start_datetime"] = pd.to_datetime(
        gf["start_datetime"], format="ISO8601", errors="coerce"
    )
    gf["end_datetime"] = pd.to_datetime(
        gf["end_datetime"], format="ISO8601", errors="coerce"
    )
    duration = gf.end_datetime - gf.start_datetime
    gf["datetime"] = gf.start_datetime + duration / 2
    gf["dayofyear"] = gf.datetime.dt.dayofyear
    gf["duration"] = duration.dt.days

    return gf


[docs] def read_wesm_csv(url: str = wesm_gpkg_url) -> GeoDataFrame: """ Read WESM metadata from a remote CSV file. The CSV contains up to date metadata and a mapping of workunit to feature ID (FID) in the main GPKG file. Parameters ---------- url The URL or file path to the WESM index file Returns ------- A GeoDataFrame containing the metadata from the CSV file. """ # TODO: Cache CSV locally, so subsequent reads are faster wesm_csv = url.replace(".gpkg", ".csv") df = pd.read_csv(wesm_csv) df.index += 1 df.index.name = "fid" return df
def search_bboxes( url: str = wesm_gpkg_url, intersects: GeoDataFrame | None = None, search_start: Timestamp | None = None, search_end: Timestamp | None = None, # **kwargs: dict[str, Any] | None, ) -> GeoDataFrame: """ Search WESM.gpkg bounding boxes for spatial and temporal overlap Parameters ---------- url The URL or file path to the GeoPackage (GPKG) file. intersects A GeoDataFrame to spatially intersect with the convex hulls, by default None. search_start The start time for the temporal search, by default None. search_end The end time for the temporal search, by default None. Returns ------- A GeoDataFrame containing the convex hull geometries and FIDs in EPSG:4326 """ # NOTE: much faster to JUST read bboxes, not full geometry or other columns sql = "select * from rtree_WESM_geometry" # # with rasterio.Env( # AWS_NO_SIGN_REQUEST="YES", # CPL_VSIL_CURL_ALLOWED_EXTENSIONS=".gpkg", # ): with TemporaryEnvVars( AWS_NO_SIGN_REQUEST="YES", CPL_VSIL_CURL_ALLOWED_EXTENSIONS=".gpkg", ): df = pyogrio.read_dataframe( url, sql=sql ) # , use_arrow=True... arrow probably doesn;t matter for <10000 rows? bboxes = df.apply(lambda x: box(x.minx, x.miny, x.maxx, x.maxy), axis=1) gf = ( GeoDataFrame(df, geometry=bboxes, crs="EPSG:4326") .rename(columns={"id": "fid"}) .set_index("fid") ) df = read_wesm_csv(url) gf = gf.merge(df, right_index=True, left_index=True) gf = stacify_column_names(gf) if intersects is not None: gf = gf[gf.intersects(intersects.geometry.iloc[0])] if search_start is not None and search_end is not None: gf = subset_by_temporal_overlap(gf, search_start, search_end, temporal_buffer=0) return gf
[docs] def load_by_fid( fids: list[int], url: str = wesm_gpkg_url, **kwargs: dict[str, Any] | None ) -> GeoDataFrame: """ Extracts full geometry from remote WESM GPKG based on Feature ID (FID) Parameters ---------- fids List of Feature IDs (fids) to extract from GPKG url The URL or file path to the GeoPackage (GPKG) file. **kwargs Additional keyword arguments to pass to :func:`geopandas.read_file` Returns ------- A GeoDataFrame containing the data from the GeoPackage file with column names modified by :func:`stacify_column_names`. """ # Format SQL: # special case for (a) not (a,) # Reading a remote WESM by specific FIDs is fast query = f"fid in ({fids[0]})" if len(fids) == 1 else f"fid in {(*fids,)}" with TemporaryEnvVars( AWS_NO_SIGN_REQUEST="YES", CPL_VSIL_CURL_ALLOWED_EXTENSIONS=".gpkg", ): gf = read_file( url, where=query, **kwargs, # mask=mask, # spatial subset intolerably slow for remote GPKG... # NOTE: worth additional dependencies for speed? # Only faster I think if GPKG is local & large https://github.com/geopandas/pyogrio/issues/252 # engine='pyogrio', # pyarrow=True, ) # For the purposes of footprint polygon search, just ignore datum (NAD83) gf = gf.set_crs("EPSG:4326", allow_override=True) return stacify_column_names(gf)
def swathtime_to_datetime( gf: GeoDataFrame, start_col: str = "START_TIME", end_col: str = "END_TIME" ) -> GeoDataFrame: """ Convert swath GPS time columns to datetimes in a GeoDataFrame Parameters ---------- gf : GeoDataFrame The GeoDataFrame containing the swath time columns. start_col : str, optional The name of the column containing the start times, by default 'START_TIME'. end_col : str, optional The name of the column containing the end times, by default 'END_TIME'. Returns ------- GeoDataFrame The GeoDataFrame with additional 'start_ 'dayofyear' column. """ if start_col not in gf.columns or end_col not in gf.columns: available_cols = ", ".join(gf.columns.tolist()) message = f"Columns {start_col} and/or {end_col} not found in GeoDataFrame. Available columns: {available_cols}" raise KeyError(message) gf["collect_start"] = Timestamp("1980-01-06") + gf[start_col].apply( lambda x: Timedelta(seconds=x + 1e9) ) gf["collect_end"] = Timestamp("1980-01-06") + gf[end_col].apply( lambda x: Timedelta(seconds=x + 1e9) ) # Assuming no flights spanning multiple days / happening at midnight :) return stacify_column_names(gf)
[docs] def get_swath_polygons( workunit: str, start_col: str = "START_TIME", end_col: str = "END_TIME", ) -> GeoDataFrame: """ Retrieve swath polygons from a remote URL and convert swath time to datetime. Parameters ---------- workunit The WESM workunit string. start_col The name of the column containing the start times, by default 'START_TIME'. end_col The name of the column containing the end times, by default 'END_TIME'. Returns ------- A GeoDataFrame containing the swath polygons with swath time converted to datetime. Notes ----- - Swath polygons not available for all workunits """ # NOTE: this CSV comes from crawling WESM USGS/spatial_metadata in S3 bucket df = pd.read_csv(swath_polygon_csv) try: url = df[df.workunit == workunit].swathpolygon_link.iloc[0] except Exception as e: message = f"No swath polygons found for workunit={workunit}" raise ValueError(message) from e # Actually read from S3! with TemporaryEnvVars( AWS_NO_SIGN_REQUEST="YES", GDAL_DISABLE_READDIR_ON_OPEN="EMPTY_DIR", ): gf = read_file(url) gf = swathtime_to_datetime(gf, start_col=start_col, end_col=end_col) # Swath polygons likely have different CRS (EPSG:6350), so reproject return gf.to_crs("EPSG:4326")
# NOTE it says that 3DEP elevation products were "lastUpdatedDate": "Sep 11, 2019"? # https://tnmaccess.nationalmap.gov/api/v1/datasets? def query_tnm_api( polygon_str: str, tnmdataset: str = "Digital Elevation Model (DEM) 1 meter", prodFormats: str = "GeoTIFF", ) -> list[dict[str, Any]]: """ Query the TNM API for USGS DEM products that intersect the given polygon. Parameters: polygon_str (str): Polygon coordinates string in the required API format. tnmdataset (str): The dataset name for the TNM API (default is "Digital Elevation Model (DEM) 1 meter"). prodFormats (str): The product format filetype to search (default is "GeoTIFF"). Returns: list: A list of JSON items returned from the TNM API. """ # params is a dict[str, object], but requests expects Mapping[str, Any] items = [] offset = 0 url = "https://tnmaccess.nationalmap.gov/api/v1/products" while True: params = { "datasets": tnmdataset, "polygon": polygon_str, "prodFormats": prodFormats, "outputFormat": "JSON", "max": 200, "offset": offset, } response = requests.get(url, params=params, timeout=60) if response.status_code != 200: msg_status_code = ( f"TNM API request failed with status code {response.status_code}" ) raise RuntimeError(msg_status_code) data = response.json() batch = data.get("items", []) if not batch: # no more products -> exit loop break items.extend(batch) offset += len(batch) # for debugging: # print(response.url) return items