API Reference

Full API reference for all EarthForge packages. Auto-generated from docstrings via mkdocstrings.

earthforge.core

config

earthforge.core.config

EarthForge configuration management.

Provides profile-based configuration backed by a TOML file at ~/.earthforge/config.toml. Each profile bundles a STAC API endpoint, a storage backend selection, and backend-specific options (credentials, regions, endpoints). The default profile is used when no --profile flag is given.

Configuration file format::

[profiles.default]
stac_api = "https://earth-search.aws.element84.com/v1"
storage = "s3"

[profiles.default.storage_options]
region = "us-west-2"

Functions:

Name Description
load_profile

Async loader that reads config and returns a typed profile.

load_profile_sync

Convenience sync wrapper.

init_config

Creates a starter config file with a default profile.

config_dir

Returns the resolved config directory path.

EarthForgeProfile dataclass

A named configuration profile.

Parameters:

Name Type Description Default
name str

Profile identifier (e.g. "default", "planetary").

required
stac_api str | None

Base URL for the STAC API, or None if not configured.

None
storage_backend str

One of "s3", "gcs", "azure", "local".

'local'
storage_options dict[str, str]

Backend-specific key/value pairs (region, credentials, etc.).

dict()

Raises:

Type Description
ConfigError

If storage_backend is not in VALID_BACKENDS.

from_dict(name, data) classmethod

Construct a profile from a parsed TOML dictionary.

Parameters:

Name Type Description Default
name str

The profile name key.

required
data dict[str, object]

The TOML table for this profile.

required

Returns:

Type Description
Self

A validated EarthForgeProfile.

Raises:

Type Description
ConfigError

If required fields are missing or have wrong types.

config_dir()

Return the EarthForge configuration directory.

Returns:

Type Description
Path

Path("~/.earthforge") expanded to an absolute path.

load_profile(name='default') async

Load a named profile from the configuration file.

If no config file exists, returns a built-in default profile (for the "default" name) or raises ConfigError for any other name.
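The fallback branching can be sketched as follows. The Profile dataclass and ConfigError here are simplified stand-ins; the real loader also parses and validates the TOML when the file exists (omitted).

```python
from dataclasses import dataclass, field
from pathlib import Path

class ConfigError(Exception):
    pass

@dataclass
class Profile:
    name: str
    storage_backend: str = "local"
    storage_options: dict = field(default_factory=dict)

def load_profile_fallback(name: str, config_path: Path) -> Profile:
    """Mirror the documented fallback: with no config file, return a
    built-in default for "default" and raise ConfigError for any other name."""
    if not config_path.exists():
        if name == "default":
            return Profile(name="default")
        raise ConfigError(f"unknown profile {name!r} and no config file")
    ...  # parse the TOML and build the profile (omitted)

missing = Path("definitely-missing-earthforge-config.toml")
p = load_profile_fallback("default", missing)
```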

Parameters:

Name Type Description Default
name str

Profile name to load.

'default'

Returns:

Type Description
EarthForgeProfile

The resolved EarthForgeProfile.

Raises:

Type Description
ConfigError

If the config file is malformed, the profile doesn't exist, or field validation fails.

load_profile_sync(name='default')

Synchronous convenience wrapper for :func:load_profile.

Parameters:

Name Type Description Default
name str

Profile name to load.

'default'

Returns:

Type Description
EarthForgeProfile

The resolved EarthForgeProfile.

Raises:

Type Description
ConfigError

Same conditions as :func:load_profile.

init_config(*, overwrite=False) async

Create the default configuration file.

Parameters:

Name Type Description Default
overwrite bool

If True, replace an existing config file. If False and the file already exists, raise ConfigError.

False

Returns:

Type Description
Path

The path to the created config file.

Raises:

Type Description
ConfigError

If the file exists and overwrite is False, or if the directory cannot be created.

formats

earthforge.core.formats

EarthForge centralized format detection.

Identifies geospatial file formats using a three-stage detection chain:

  1. Magic bytes — Read the first 512 bytes and match known signatures.
  2. File extension — Fall back to extension-based lookup.
  3. Content inspection — For ambiguous cases (e.g. GeoTIFF vs COG), perform format-specific structural checks.

Domain packages can register additional content inspectors via :func:register_inspector to extend detection without modifying this module.

The detection chain works on both local paths and remote URLs. For remote files, only the first 512 bytes are fetched via HTTP range request — no full downloads.

Usage::

from earthforge.core.formats import detect, detect_sync, FormatType

fmt = await detect("/path/to/file.tif")
assert fmt == FormatType.GEOTIFF

FormatType

Bases: StrEnum

Known geospatial file format identifiers.

Members map to canonical format names used throughout EarthForge for dispatch, validation, and output labeling.

register_inspector(fn)

Register a content inspector for format disambiguation.

Inspectors are called in registration order. The first non-None return value replaces the candidate format.
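The registration-order, first-non-None semantics can be sketched with a plain list registry. The cog_inspector below is a hypothetical example, not a real EarthForge inspector.

```python
_inspectors = []

def register_inspector(fn):
    """Append to the registry; returning fn allows use as a decorator."""
    _inspectors.append(fn)
    return fn

def run_inspectors(header: bytes, candidate: str, source: str) -> str:
    """Call inspectors in registration order; the first non-None return
    value replaces the candidate format."""
    for fn in _inspectors:
        result = fn(header, candidate, source)
        if result is not None:
            return result
    return candidate

@register_inspector
def cog_inspector(header, candidate, source):
    # Illustrative only: promote a GeoTIFF candidate on a filename hint.
    if candidate == "geotiff" and source.endswith(".cog.tif"):
        return "cog"
    return None
```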

Parameters:

Name Type Description Default
fn InspectorFn

A callable (header_bytes, candidate_format, source) -> FormatType | None.

required

Returns:

Type Description
InspectorFn

The same function (allows use as a decorator).

detect(source, *, profile=None) async

Detect the geospatial format of a file or URL.

Uses a three-stage chain: magic bytes → extension → content inspection. For remote URLs, only the first 512 bytes are fetched.

Parameters:

Name Type Description Default
source str

Local file path or HTTP(S) URL.

required
profile EarthForgeProfile | None

Optional profile for HTTP client configuration (remote URLs).

None

Returns:

Type Description
FormatType

The detected :class:FormatType.

Raises:

Type Description
FormatDetectionError

If the source cannot be read.

detect_sync(source, *, profile=None)

Synchronous convenience wrapper for :func:detect.

Parameters:

Name Type Description Default
source str

Local file path or HTTP(S) URL.

required
profile EarthForgeProfile | None

Optional profile for HTTP client configuration.

None

Returns:

Type Description
FormatType

The detected :class:FormatType.

Raises:

Type Description
FormatDetectionError

If the source cannot be read.

errors

earthforge.core.errors

EarthForge error hierarchy.

All exceptions raised by EarthForge inherit from EarthForgeError. Each domain package defines its own subclasses (e.g. StacSearchError, CogValidationError) so callers can catch at whatever granularity they need. The exit_code attribute maps directly to CLI exit codes, letting the CLI layer translate library exceptions into meaningful shell return values without parsing message strings.

EarthForgeError

Bases: Exception

Base exception for all EarthForge errors.

Parameters:

Name Type Description Default
message str

Human-readable description of the error.

required
exit_code int

CLI exit code to use when this error propagates to the shell. Defaults to 1 (general error).

1

Attributes:

Name Type Description
exit_code int

The numeric exit code for CLI propagation.

ConfigError

Bases: EarthForgeError

Raised when configuration loading, parsing, or validation fails.

Examples: missing config file, invalid TOML, unknown profile name, missing required field in a profile.

StorageError

Bases: EarthForgeError

Raised when a cloud storage operation fails.

Examples: permission denied on S3, object not found, network timeout, invalid storage backend name.

HttpError

Bases: EarthForgeError

Raised when an HTTP request fails after retries.

Parameters:

Name Type Description Default
message str

Human-readable description.

required
status_code int | None

The HTTP status code that triggered the error, if available.

None
exit_code int

CLI exit code (defaults to 4).

4

Attributes:

Name Type Description
status_code int | None

The HTTP status code, or None for connection-level failures.

FormatDetectionError

Bases: EarthForgeError

Raised when format detection cannot determine the file type.

This typically means the file's magic bytes don't match any known format, the extension is unrecognized, and content inspection was inconclusive.

output

earthforge.core.output

EarthForge structured output rendering.

All CLI output flows through this module. Commands return Pydantic models; this module serializes them into the format requested by --output. Domain packages never call print() or rich directly.

Supported formats:

  • table — Human-readable Rich table (default for interactive terminals).
  • json — Machine-readable JSON matching the Pydantic model schema.
  • csv — Comma-separated values for spreadsheet and pipeline consumption.
  • quiet — Suppressed output; only the exit code communicates success/failure.

The contract is simple: if --output json produces valid JSON for one command, it produces valid JSON for every command. The schema is the Pydantic model itself.
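The contract can be sketched with dataclasses standing in for the Pydantic models (the table path via Rich is omitted; the model, fields, and render helper here are illustrative):

```python
import csv
import io
import json
from dataclasses import asdict, dataclass, fields

@dataclass
class SearchRow:  # stand-in for a Pydantic result model
    id: str
    cloud_cover: float

def render(rows: list, fmt: str) -> str:
    """Serialize model instances per the requested output format."""
    if fmt == "json":
        return json.dumps([asdict(r) for r in rows])
    if fmt == "csv":
        buf = io.StringIO()
        writer = csv.DictWriter(buf, fieldnames=[f.name for f in fields(SearchRow)])
        writer.writeheader()
        writer.writerows(asdict(r) for r in rows)
        return buf.getvalue()
    if fmt == "quiet":
        return ""  # only the exit code communicates success/failure
    raise ValueError(fmt)

out = render([SearchRow("S2A_1", 12.5)], "json")
```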

Accessibility (WCAG 2.1 AA):

  • NO_COLOR disables all color (https://no-color.org/).
  • FORCE_COLOR forces color even in non-interactive contexts (https://force-color.org/). NO_COLOR takes precedence.
  • Status indicators always include text markers ([PASS], [FAIL], [WARN]) so information is never conveyed by color alone.
  • High-contrast mode selects styles that meet WCAG 4.5:1 contrast ratios on both dark and light terminal backgrounds.
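The precedence rules above reduce to a small decision function. Checking variable presence (rather than non-empty values) is this sketch's simplification of the linked conventions.

```python
def color_enabled(env: dict, is_tty: bool) -> bool:
    """NO_COLOR wins over FORCE_COLOR; FORCE_COLOR wins over the
    interactive-terminal check."""
    if "NO_COLOR" in env:
        return False
    if "FORCE_COLOR" in env:
        return True
    return is_tty
```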

Usage in CLI commands::

from earthforge.core.output import OutputFormat, render_to_console

result = await some_library_function(...)
render_to_console(result, fmt=OutputFormat.TABLE)

OutputFormat

Bases: StrEnum

Supported output formats for CLI commands.

Members:

  • TABLE — Human-readable Rich table.
  • JSON — Machine-readable JSON.
  • CSV — Comma-separated values.
  • QUIET — No output.

StatusMarker

Bases: StrEnum

Text markers for pass/fail/warn status.

These ensure information is never conveyed by color alone (WCAG 1.4.1 Use of Color). Every status indicator in table output includes both a colored token and a text marker.

format_status(marker, message='')

Format a status marker with an optional message.

Parameters:

Name Type Description Default
marker StatusMarker

The status marker to display.

required
message str

Optional text to append after the marker.

''

Returns:

Type Description
str

A string like "[PASS] All checks passed" suitable for both colored and plain-text rendering.

render(data, fmt, *, high_contrast=False)

Render structured data to a string in the requested format.

Parameters:

Name Type Description Default
data BaseModel | Sequence[BaseModel]

A single Pydantic model or a sequence of models.

required
fmt OutputFormat

The desired output format.

required
high_contrast bool

If True, use high-contrast styling (WCAG 4.5:1).

False

Returns:

Type Description
str

The formatted string. Returns an empty string for QUIET format.

Raises:

Type Description
ValueError

If fmt is not a valid OutputFormat.

render_to_console(data, fmt, *, no_color=False, high_contrast=False)

Render structured data directly to the terminal.

This is the primary function called by CLI command handlers.

Parameters:

Name Type Description Default
data BaseModel | Sequence[BaseModel]

A single Pydantic model or a sequence of models.

required
fmt OutputFormat

The desired output format.

required
no_color bool

If True, disable colored output even when the NO_COLOR environment variable is not set.

False
high_contrast bool

If True, use high-contrast styling (WCAG 4.5:1).

False

expression

earthforge.core.expression

Safe arithmetic expression evaluator for band math and formulas.

Parses mathematical expressions using Python's AST module and evaluates them against a provided variable environment. Only whitelisted operations are permitted — no eval(), exec(), attribute access, subscripts, or arbitrary function calls.

Supported constructs:

  • Arithmetic: +, -, *, /, **
  • Unary: -x, +x
  • Comparison: <, <=, >, >=, ==, !=
  • Safe functions: clip, where, abs, sqrt, log, minimum, maximum
  • Constants: numeric literals (int, float)
  • Variables: names bound in the environment dict

This module is shared infrastructure — domain packages (raster, pipeline) import from here rather than implementing their own expression parsers.
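The AST-walking approach can be sketched as below. This covers only numeric constants, names, binary arithmetic, and unary minus; the comparison operators and safe-function whitelist documented above are omitted for brevity, and the dispatch table is illustrative.

```python
import ast
import operator

# Whitelisted binary operators; any other node type is rejected.
_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Pow: operator.pow,
}

def safe_eval(expr_str: str, env: dict):
    """Evaluate arithmetic over names bound in env by walking the AST.
    No builtins, attribute access, subscripts, or function calls."""
    def walk(node):
        if isinstance(node, ast.Expression):
            return walk(node.body)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.Name):
            if node.id not in env:
                raise ValueError(f"undefined variable: {node.id}")
            return env[node.id]
        if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](walk(node.left), walk(node.right))
        if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.USub):
            return -walk(node.operand)
        raise ValueError(f"unsupported construct: {type(node).__name__}")
    return walk(ast.parse(expr_str, mode="eval"))

ndvi = safe_eval("(B08 - B04) / (B08 + B04)", {"B08": 0.6, "B04": 0.2})
```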

Usage::

from earthforge.core.expression import safe_eval

env = {"B04": red_array, "B08": nir_array}
ndvi = safe_eval("(B08 - B04) / (B08 + B04)", env)

safe_eval(expr_str, env)

Evaluate a mathematical expression safely via AST walking.

Only arithmetic operators, comparisons, whitelisted function calls, numeric constants, and names present in env are permitted. No builtins, attribute access, subscripts, or arbitrary code execution.

Parameters:

Name Type Description Default
expr_str str

Expression string (e.g. "(B08 - B04) / (B08 + B04)").

required
env dict[str, Any]

Variable bindings (name → value, typically numpy arrays).

required

Returns:

Type Description
Any

Result of evaluating the expression.

Raises:

Type Description
ValueError

If the expression contains unsupported constructs or references undefined variables.

extract_variables(expr_str)

Extract variable names referenced in an expression.

Parameters:

Name Type Description Default
expr_str str

Expression string.

required

Returns:

Type Description
set[str]

Set of variable names (excluding safe function names).

Raises:

Type Description
ValueError

If the expression has invalid syntax.

palettes

earthforge.core.palettes

Colorblind-safe palette constants for EarthForge visualizations.

All palettes are verified safe for the three main forms of color vision deficiency (deuteranopia, protanopia, tritanopia). Every visualization produced by EarthForge — CLI preview images, example output maps, and documentation figures — must use one of these palettes.

Palette categories:

  • Sequential — for continuous data with a single direction (elevation, temperature, NDVI magnitude). viridis and cividis are perceptually uniform and safe for all forms of CVD.

  • Diverging — for data centered on a meaningful midpoint (NDVI gain/loss, temperature anomalies). Brown → white → teal, sourced from ColorBrewer BrBG.

  • Categorical — for discrete classes (land cover types, format categories). ColorBrewer Set2 (8 colors) and Paired (12 colors).

VIRIDIS = ['#440154', '#482777', '#3e4a89', '#31688e', '#26838e', '#1f9e89', '#6cce5a', '#b6de2b', '#fee825'] module-attribute

Viridis 9-stop palette — dark purple to bright yellow.

CIVIDIS = ['#00224e', '#123570', '#3b496c', '#575d6d', '#707173', '#8a8678', '#a59c74', '#c3b369', '#e1cc55'] module-attribute

Cividis 9-stop palette — dark blue to warm yellow, optimized for CVD.

DIVERGING_BRBG = ['#8c510a', '#bf812d', '#dfc27d', '#f6e8c3', '#f5f5f5', '#c7eae5', '#80cdc1', '#35978f', '#01665e'] module-attribute

Brown → white → teal diverging palette (9 stops, ColorBrewer BrBG).

SET2 = ['#66c2a5', '#fc8d62', '#8da0cb', '#e78ac3', '#a6d854', '#ffd92f', '#e5c494', '#b3b3b3'] module-attribute

ColorBrewer Set2 — 8 qualitative colors, CVD-safe.

PAIRED = ['#a6cee3', '#1f78b4', '#b2df8a', '#33a02c', '#fb9a99', '#e31a1c', '#fdbf6f', '#ff7f00', '#cab2d6', '#6a3d9a', '#ffff99', '#b15928'] module-attribute

ColorBrewer Paired — 12 qualitative colors, grouped in light/dark pairs.

SEQUENTIAL = {'viridis': VIRIDIS, 'cividis': CIVIDIS} module-attribute

All sequential palettes by name.

DIVERGING = {'brbg': DIVERGING_BRBG} module-attribute

All diverging palettes by name.

CATEGORICAL = {'set2': SET2, 'paired': PAIRED} module-attribute

All categorical palettes by name.
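A common use of the categorical palettes is assigning one color per discrete class. The SET2 values below are copied from the module attribute above; wrapping past the palette end for more classes than colors is this sketch's convention, not necessarily EarthForge's.

```python
# ColorBrewer Set2, as listed in the module attributes above.
SET2 = ['#66c2a5', '#fc8d62', '#8da0cb', '#e78ac3',
        '#a6d854', '#ffd92f', '#e5c494', '#b3b3b3']

def class_colors(n: int, palette: list[str] = SET2) -> list[str]:
    """Assign one color per class index, wrapping past the palette end."""
    return [palette[i % len(palette)] for i in range(n)]

colors = class_colors(10)
```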


earthforge.stac

earthforge.stac.search

STAC catalog search.

Wraps pystac-client's search functionality with EarthForge's profile-aware configuration and returns structured Pydantic models. The search is executed synchronously via pystac-client (which uses requests internally) and wrapped in an async interface for consistency with the rest of EarthForge.

Design note: pystac-client uses requests for HTTP, not httpx. This is an accepted trade-off — pystac-client handles STAC pagination, conformance negotiation, and CQL2 filtering that would be complex to reimplement. Our httpx-based earthforge.core.http is used for non-STAC HTTP operations (range reads, direct asset fetches).
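The sync-in-async wrapping can be sketched with asyncio.to_thread; blocking_search here is a stand-in for the actual pystac-client call, not EarthForge's implementation.

```python
import asyncio

def blocking_search(collections: list[str]) -> dict:
    """Stand-in for the synchronous pystac-client search."""
    return {"collections": collections, "returned": 0}

async def search_catalog(collections: list[str]) -> dict:
    # Run the blocking call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(blocking_search, collections)

result = asyncio.run(search_catalog(["sentinel-2-l2a"]))
```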

Usage::

from earthforge.core.config import load_profile
from earthforge.stac.search import search_catalog

profile = await load_profile("default")
results = await search_catalog(
    profile=profile,
    collections=["sentinel-2-l2a"],
    bbox=[-85.0, 37.0, -84.0, 38.0],
    max_items=10,
)

AssetInfo

Bases: BaseModel

Metadata for a single STAC asset.

Attributes:

Name Type Description
key str

The asset key (e.g. "visual", "B04").

href str

URL to the asset file.

media_type str | None

MIME type if available.

title str | None

Human-readable title if available.

SearchResultItem

Bases: BaseModel

A single STAC item from a search result.

Attributes:

Name Type Description
id str

The STAC item ID.

collection str | None

The collection this item belongs to.

datetime str | None

The item's datetime as ISO string, or None for date ranges.

bbox list[float] | None

Bounding box [west, south, east, north].

properties dict[str, object]

Raw STAC properties dict (eo:cloud_cover, platform, etc.).

asset_count int

Number of assets in this item.

assets list[AssetInfo]

List of asset metadata (populated when detail is requested).

self_link str | None

URL to the item's self link.

SearchResult

Bases: BaseModel

Structured result from a STAC catalog search.

Attributes:

Name Type Description
api_url str

The STAC API endpoint that was searched.

matched int | None

Total number of items matching the query (if reported by API).

returned int

Number of items actually returned.

items list[SearchResultItem]

The search result items.

search_catalog(profile, *, collections=None, bbox=None, datetime_range=None, max_items=10, query=None, filter_expr=None, filter_lang='cql2-json') async

Search a STAC catalog using the profile's configured API endpoint.

Runs the synchronous pystac-client search in a thread executor to avoid blocking the event loop.

Parameters:

Name Type Description Default
profile EarthForgeProfile

Active configuration profile (provides the STAC API URL).

required
collections list[str] | None

Collection IDs to search within (e.g. ["sentinel-2-l2a"]).

None
bbox list[float] | None

Spatial bounding box [west, south, east, north] in WGS84.

None
datetime_range str | None

Temporal filter as ISO 8601 datetime or range string.

None
max_items int

Maximum items to return (default: 10).

10
query dict[str, object] | None

Legacy query parameters (deprecated — use filter_expr instead).

None
filter_expr dict[str, object] | None

CQL2-JSON filter expression, preferred per STAC best practices. Example: {"op": "<=", "args": [{"property": "eo:cloud_cover"}, 20]}

None
filter_lang str | None

Filter language (default: "cql2-json"). Only used when filter_expr is provided.

'cql2-json'

Returns:

Type Description
SearchResult

Structured search results with items and metadata.

Raises:

Type Description
StacError

If pystac-client is not installed.

StacSearchError

If the API connection or search fails.

info

earthforge.stac.info

STAC item and collection metadata inspection.

Fetches and parses STAC items or collections from a URL, returning structured metadata suitable for CLI rendering. Uses httpx via earthforge.core.http for fetching (unlike search, which uses pystac-client).

Usage::

from earthforge.stac.info import inspect_stac_item

profile = await load_profile("default")
info = await inspect_stac_item(profile, "https://earth-search.../items/S2A_...")

StacAssetDetail

Bases: BaseModel

Detailed metadata for a single STAC asset.

Attributes:

Name Type Description
key str

Asset key identifier.

href str

URL to the asset.

media_type str | None

MIME type.

title str | None

Human-readable title.

roles list[str]

Asset roles (e.g. ["data"], ["thumbnail"]).

StacItemInfo

Bases: BaseModel

Structured metadata for a STAC item.

Attributes:

Name Type Description
id str

STAC item ID.

collection str | None

Parent collection ID.

datetime str | None

Item datetime as ISO string.

bbox list[float] | None

Bounding box [west, south, east, north].

geometry_type str | None

GeoJSON geometry type (e.g. "Polygon").

properties dict[str, object]

Selected properties from the item.

asset_count int

Number of assets.

assets list[StacAssetDetail]

Detailed asset metadata.

stac_version str | None

STAC specification version.

stac_extensions list[str]

List of STAC extension URIs.

StacCollectionInfo

Bases: BaseModel

Structured metadata for a STAC collection.

Attributes:

Name Type Description
id str

Collection ID.

title str | None

Human-readable title.

description str | None

Collection description.

license str | None

License identifier.

extent_spatial list[float] | None

Spatial extent as bounding box.

extent_temporal list[str | None]

Temporal extent as [start, end] ISO strings.

item_count int | None

Number of items if reported.

stac_version str | None

STAC specification version.

inspect_stac_item(profile, url) async

Fetch and parse a STAC item from a URL.

Parameters:

Name Type Description Default
profile EarthForgeProfile

Active configuration profile.

required
url str

URL to a STAC item JSON document.

required

Returns:

Type Description
StacItemInfo

Structured item metadata.

Raises:

Type Description
StacError

If the fetch fails or the response is not a valid STAC item.

inspect_stac_collection(profile, url) async

Fetch and parse a STAC collection from a URL.

Parameters:

Name Type Description Default
profile EarthForgeProfile

Active configuration profile.

required
url str

URL to a STAC collection JSON document.

required

Returns:

Type Description
StacCollectionInfo

Structured collection metadata.

Raises:

Type Description
StacError

If the fetch fails or the response is not a valid STAC collection.

fetch

earthforge.stac.fetch

STAC asset download with parallel fetch and resume support.

Downloads assets from a STAC item to a local directory. Fetches the item JSON, filters the requested assets, then downloads them concurrently using asyncio.TaskGroup bounded by a semaphore for configurable parallelism.

Resume support: if a local file already exists with the same byte count as the server's Content-Length, the asset is skipped without re-downloading.
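The resume rule is a straightforward size comparison; this sketch isolates it (the real fetcher obtains content_length from the server's Content-Length header):

```python
import tempfile
from pathlib import Path

def should_skip(local: Path, content_length: int) -> bool:
    """Skip the download when a local file already exists with the same
    byte count as the server's Content-Length."""
    return local.exists() and local.stat().st_size == content_length

with tempfile.TemporaryDirectory() as d:
    f = Path(d) / "B04.tif"
    f.write_bytes(b"x" * 10)
    skip_same = should_skip(f, 10)       # complete file: skip
    skip_partial = should_skip(f, 20)    # size mismatch: re-download
    skip_missing = should_skip(Path(d) / "missing.tif", 10)
```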

Usage::

from earthforge.stac.fetch import fetch_assets

profile = await load_profile("default")
result = await fetch_assets(
    profile,
    item_url="https://earth-search.../items/S2A_...",
    output_dir="./data/sentinel2",
    assets=["red", "green", "blue"],
    parallel=4,
)
print(f"Downloaded {result.assets_fetched} assets ({result.total_bytes_downloaded:,} bytes)")

AssetFetchResult

Bases: BaseModel

Result for a single downloaded asset.

Attributes:

Name Type Description
key str

Asset dictionary key (e.g. "B04", "red").

href str

Original remote URL of the asset.

local_path str

Path to the downloaded local file.

size_bytes int

File size in bytes.

skipped bool

True if the file already existed with the correct size (resumed).

media_type str | None

MIME type from the STAC asset definition.

FetchResult

Bases: BaseModel

Structured result for a STAC asset fetch operation.

Attributes:

Name Type Description
item_id str

STAC item ID.

item_url str

URL the item was fetched from.

output_dir str

Local directory where assets were written.

assets_requested int

Number of assets selected for download.

assets_fetched int

Number of assets actually downloaded.

assets_skipped int

Number of assets skipped (already existed, correct size).

total_bytes_downloaded int

Bytes transferred during this run.

total_size_bytes int

Total size of all files on disk after fetch.

elapsed_seconds float

Wall-clock time for the entire operation.

files list[AssetFetchResult]

Per-asset download results.

fetch_assets(profile, item_url, *, output_dir=None, assets=None, parallel=4) async

Download assets from a STAC item to a local directory.

Fetches the item JSON from item_url, selects the requested assets, then downloads them concurrently. Supports resume: assets that already exist locally with the correct byte count are skipped.

Parameters:

Name Type Description Default
profile EarthForgeProfile

EarthForge config profile (for HTTP client config).

required
item_url str

URL to a STAC item JSON.

required
output_dir str | None

Local directory to write files into. Defaults to <current_dir>/<item_id>/.

None
assets list[str] | None

List of asset keys to download. If None, all data assets (excluding thumbnails and overviews) are downloaded.

None
parallel int

Maximum number of concurrent downloads (default: 4).

4

Returns:

Type Description
FetchResult

:class:FetchResult with per-asset download details.

Raises:

Type Description
StacError

If the item URL cannot be fetched or assets fail to download.

validate

earthforge.stac.validate

STAC item and collection validation against the STAC specification.

Validates STAC documents using pystac's built-in validation (which delegates to JSON Schema validation against the STAC spec schemas). Supports validating both local JSON files and remote STAC API URLs.

Usage::

from earthforge.stac.validate import validate_stac

profile = await load_profile("default")
result = await validate_stac(profile, "https://earth-search.../items/S2A_...")

StacValidationCheck

Bases: BaseModel

Result of a single validation check.

Attributes:

Name Type Description
check str

Name of the validation check.

status str

Pass/fail/warn status with text marker.

message str

Human-readable detail.

StacValidationResult

Bases: BaseModel

Aggregate result of validating a STAC document.

Attributes:

Name Type Description
source str

URL or path that was validated.

stac_type str

Detected type ("Item", "Collection", "Catalog").

stac_version str

STAC version declared in the document.

is_valid bool

Overall pass/fail.

extensions_validated list[str]

List of extension schema IDs that were checked.

checks list[StacValidationCheck]

Individual check results.

summary str

Human-readable one-line summary.

validate_stac(profile, source) async

Validate a STAC item or collection against the specification.

Fetches the STAC document, determines its type, then runs pystac validation including any declared extension schemas.

Parameters:

Name Type Description Default
profile EarthForgeProfile

EarthForge profile (used for HTTP client configuration).

required
source str

URL or local path to a STAC item or collection JSON.

required

Returns:

Type Description
StacValidationResult

A :class:StacValidationResult with detailed check results.

Raises:

Type Description
StacValidationError

If the document cannot be fetched or parsed.

publish

earthforge.stac.publish

STAC item publication to writable STAC APIs.

Pushes STAC items to APIs that support the Transaction Extension (OGC API - Features - Part 4). Checks the /conformance endpoint before attempting to POST/PUT items.
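The conformance check can be sketched over the list returned by the /conformance endpoint. Matching on the "transaction" substring is this sketch's heuristic; the real check may compare full conformance URIs, and the example URIs below are illustrative.

```python
def supports_transactions(conformance: list[str]) -> bool:
    """Return True if any declared conformance class looks like the
    Transaction Extension."""
    return any("transaction" in uri.lower() for uri in conformance)

conf = [
    "https://api.stacspec.org/v1.0.0/core",
    "https://api.stacspec.org/v1.0.0/ogcapi-features/extensions/transaction",
]
ok = supports_transactions(conf)
```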

Usage::

from earthforge.stac.publish import publish_item

profile = await load_profile("default")
result = await publish_item(profile, item_dict, collection_id="my-collection")

PublishResult

Bases: BaseModel

Result of publishing a STAC item.

Attributes:

Name Type Description
item_id str

The published item's ID.

collection_id str

The target collection.

api_url str

The STAC API endpoint.

action str

"created" or "updated".

status_code int

HTTP response status code.

self_link str | None

URL to the published item (if available).

check_transaction_support(api_url) async

Check if a STAC API supports the Transaction Extension.

Parameters:

Name Type Description Default
api_url str

Base URL of the STAC API.

required

Returns:

Type Description
bool

True if the Transaction Extension is supported.

publish_item(profile, item, *, collection_id=None, api_url=None, upsert=True) async

Publish a STAC item to a writable STAC API.

Parameters:

Name Type Description Default
profile EarthForgeProfile

EarthForge profile (provides default STAC API URL).

required
item dict[str, Any]

STAC Item dict to publish.

required
collection_id str | None

Target collection. Defaults to item's collection field.

None
api_url str | None

Override STAC API URL. Defaults to profile's stac_api.

None
upsert bool

If True, attempt PUT to update if POST returns 409 Conflict.

True

Returns:

Type Description
PublishResult

A :class:PublishResult with publication details.

Raises:

Type Description
StacPublishError

If publication fails.


earthforge.raster

info

earthforge.raster.info

Raster file inspection — COG and GeoTIFF metadata extraction.

Reads raster metadata (dimensions, CRS, bands, data types, tiling, overviews) without loading pixel data. For remote files, rasterio uses GDAL's virtual filesystem (vsicurl) which issues HTTP range requests automatically.

Usage::

from earthforge.raster.info import inspect_raster, inspect_raster_sync

info = await inspect_raster("/path/to/file.tif")
print(info.width, info.height, info.crs)

BandInfo

Bases: BaseModel

Metadata for a single raster band.

Attributes:

Name Type Description
index int

1-based band index.

dtype str

Data type (e.g. "uint8", "float32").

nodata float | int | None

NoData value, or None if not set.

description str

Band description, or empty string.

RasterInfo

Bases: BaseModel

Structured metadata for a raster file.

Attributes:

Name Type Description
source str

The file path or URL that was inspected.

driver str

GDAL driver name (e.g. "GTiff").

width int

Raster width in pixels.

height int

Raster height in pixels.

crs str | None

Coordinate reference system as a string (e.g. "EPSG:4326").

bounds list[float]

Bounding box as [west, south, east, north].

transform list[float]

Affine transform as a 6-element list.

band_count int

Number of bands.

bands list[BandInfo]

Per-band metadata.

tile_width int | None

Tile width in pixels, or None if untiled (strip layout).

tile_height int | None

Tile height in pixels, or None if untiled.

is_tiled bool

Whether the raster uses tiled layout.

overview_count int

Number of overview levels.

overview_levels list[int]

List of overview decimation factors.

compression str | None

Compression method (e.g. "deflate", "lzw"), or None.

interleave str | None

Pixel interleaving ("band", "pixel"), or None.

inspect_raster(source) async

Inspect a raster file and return structured metadata.

Runs rasterio in a thread executor since GDAL I/O is blocking.

Parameters:

Name Type Description Default
source str

Local file path or URL.

required

Returns:

Type Description
RasterInfo

Structured raster metadata.

Raises:

Type Description
RasterError

If the file cannot be opened or read.

inspect_raster_sync(source)

Synchronous convenience wrapper for :func:inspect_raster.

Parameters:

Name Type Description Default
source str

Local file path or URL.

required

Returns:

Type Description
RasterInfo

Structured raster metadata.

Raises:

Type Description
RasterError

If the file cannot be opened or read.

validate

earthforge.raster.validate

COG compliance validation backed by rio-cogeo.

Delegates structural validation to rio-cogeo, the community-standard COG validation library, and supplements with rasterio-based checks for compression and format detection.

Checks performed:

  1. geotiff — File is a GeoTIFF (rasterio driver check).
  2. tiled — Data is stored in tiles, not strips (rio-cogeo).
  3. overviews — At least one overview level is present (rio-cogeo strict mode + rasterio fallback).
  4. ifd_order — IFD ordering is correct: overview data precedes full-resolution data in the file (rio-cogeo byte-level check).
  5. compression — Data is compressed (rasterio).

rio-cogeo is the authoritative source for checks 2-4. Its byte-level IFD ordering check catches files that appear valid from rasterio metadata alone but have incorrect internal structure. Using strict=True treats missing overviews as a validation error rather than a warning.
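Aggregating the five checks into an overall verdict can be sketched as a simple conjunction. The ValidationCheck shape mirrors the model documented below; the summary wording is illustrative.

```python
from dataclasses import dataclass

@dataclass
class ValidationCheck:
    name: str
    passed: bool
    message: str = ""

def summarize(checks: list[ValidationCheck]) -> tuple[bool, str]:
    """is_valid is the conjunction of all named checks."""
    is_valid = all(c.passed for c in checks)
    failed = [c.name for c in checks if not c.passed]
    summary = "valid COG" if is_valid else "failed: " + ", ".join(failed)
    return is_valid, summary

checks = [
    ValidationCheck("geotiff", True),
    ValidationCheck("tiled", True),
    ValidationCheck("overviews", False, "no overview levels"),
    ValidationCheck("ifd_order", True),
    ValidationCheck("compression", True),
]
is_valid, summary = summarize(checks)
```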

Usage::

from earthforge.raster.validate import validate_cog

result = await validate_cog("/path/to/file.tif")
assert result.is_valid

ValidationCheck

Bases: BaseModel

Result of a single validation check.

Attributes:

Name Type Description
name str

Check identifier (e.g. "tiled", "overviews").

passed bool

Whether this check passed.

message str

Human-readable result description.

CogValidationResult

Bases: BaseModel

Structured result from COG validation.

Attributes:

Name Type Description
source str

The file that was validated.

is_valid bool

Whether all checks passed.

checks list[ValidationCheck]

Individual check results.

summary str

One-line summary of the validation.

validate_cog(source) async

Validate COG compliance for a raster file.

Delegates to rio-cogeo for byte-level IFD ordering and structural validation, which catches files that appear valid from metadata alone but have incorrect internal structure.

Parameters:

Name Type Description Default
source str

Path or URL to a GeoTIFF file.

required

Returns:

Type Description
CogValidationResult

Structured validation result with named per-check results.

Raises:

Type Description
RasterError

If rio-cogeo is not installed, or the file cannot be opened.

convert

earthforge.raster.convert

GeoTIFF to Cloud-Optimized GeoTIFF (COG) conversion.

Converts plain GeoTIFF files into COG format by applying tiling, compression, and overview generation. Uses GDAL's COG driver (via rasterio) for spec-compliant output with proper IFD ordering.

Usage::

from earthforge.raster.convert import convert_to_cog

result = await convert_to_cog("input.tif", output="output.tif")

CogConvertResult

Bases: BaseModel

Structured result from a COG conversion.

Attributes:

Name Type Description
source str

Input file path.

output str

Output COG file path.

width int

Raster width in pixels.

height int

Raster height in pixels.

band_count int

Number of bands.

dtype str

Data type of the output.

crs str | None

CRS identifier string.

compression str

Compression codec used.

blocksize int

Tile size used.

overview_levels list[int]

Overview decimation levels generated.

overview_resampling str

Resampling method used for overviews.

file_size_bytes int | None

Output file size in bytes.

convert_to_cog(source, *, output=None, compression='deflate', blocksize=512, resampling='average', overview_levels=None) async

Convert a GeoTIFF to Cloud-Optimized GeoTIFF (COG).

Applies tiling, compression, and overview generation. The output follows the COG specification with proper IFD ordering (overview data before the full-resolution data).

Parameters:

Name Type Description Default
source str

Path to the input GeoTIFF.

required
output str | None

Output COG path. If None, appends _cog to the stem.

None
compression str

Compression codec (default: "deflate").

'deflate'
blocksize int

Tile size in pixels (default: 512).

512
resampling str

Resampling for overviews (default: "average").

'average'
overview_levels list[int] | None

Explicit overview levels. None auto-computes.

None

Returns:

Type Description
CogConvertResult

Structured conversion result.

Raises:

Type Description
RasterError

If the conversion fails.

preview

earthforge.raster.preview

Raster preview generation.

Generates PNG quicklook images from raster files by reading overview levels (or downsampling) to avoid loading the full resolution dataset. For remote COGs, this means only the overview bytes are fetched — not the full file.

Usage::

from earthforge.raster.preview import generate_preview

result = await generate_preview("s3://bucket/image.tif", max_size=512)

PreviewResult

Bases: BaseModel

Structured result from preview generation.

Attributes:

Name Type Description
source str

The raster file that was previewed.

output_path str

Path to the generated PNG file.

width int

Preview image width in pixels.

height int

Preview image height in pixels.

bands_used int

Number of bands rendered.

overview_level int | None

Overview level used (None if full resolution was downsampled).

generate_preview(source, *, output_path=None, max_size=512) async

Generate a PNG quicklook from a raster file.

Reads overview levels when available to minimize data transfer for remote files. For local files, downsamples at read time.

Parameters:

Name Type Description Default
source str

Path or URL to a raster file.

required
output_path str | None

Output PNG path. If None, derives from source filename.

None
max_size int

Maximum dimension in pixels (default: 512).

512

Returns:

Type Description
PreviewResult

Structured preview result.

Raises:

Type Description
RasterError

If the file cannot be read or preview cannot be generated.

stats

earthforge.raster.stats

Raster statistics computation — global and zonal.

Computes summary statistics (min, max, mean, std, median, histogram) for raster files. Supports both global statistics (entire raster) and zonal statistics (masked to a WKT/GeoJSON geometry via rasterio.mask).

Usage::

from earthforge.raster.stats import compute_stats

result = await compute_stats("elevation.tif")
result = await compute_stats("elevation.tif", geometry_wkt="POLYGON(...)")

BandStatistics

Bases: BaseModel

Statistics for a single raster band.

Attributes:

Name Type Description
band int

Band index (1-based).

min float

Minimum value.

max float

Maximum value.

mean float

Mean value.

std float

Standard deviation.

median float

Median value.

valid_pixels int

Number of non-nodata pixels.

nodata_pixels int

Number of nodata pixels.

histogram_counts list[int]

Histogram bin counts.

histogram_edges list[float]

Histogram bin edges.

RasterStatsResult

Bases: BaseModel

Aggregate statistics result for a raster file.

Attributes:

Name Type Description
source str

Path or URL of the raster.

width int

Raster width in pixels.

height int

Raster height in pixels.

band_count int

Number of bands.

crs str | None

CRS string.

is_zonal bool

Whether a geometry mask was applied.

bands list[BandStatistics]

Per-band statistics.

compute_stats(source, *, bands=None, geometry_wkt=None, histogram_bins=50) async

Compute raster statistics.

Parameters:

Name Type Description Default
source str

Path or URL to a raster file.

required
bands list[int] | None

Band indices to compute (1-based). Default: all bands.

None
geometry_wkt str | None

Optional WKT geometry for zonal statistics.

None
histogram_bins int

Number of histogram bins (default: 50).

50

Returns:

Type Description
RasterStatsResult

Structured result with per-band statistics.

Raises:

Type Description
RasterError

If the file cannot be opened or processed.

calc

earthforge.raster.calc

Raster band math calculator.

Evaluates mathematical expressions across raster bands using the safe expression evaluator from earthforge.core.expression. Supports multi-file inputs (one file per band variable) and produces a single-band output GeoTIFF.

Usage::

from earthforge.raster.calc import raster_calc

result = await raster_calc(
    expression="(B08 - B04) / (B08 + B04)",
    inputs={"B08": "nir.tif", "B04": "red.tif"},
    output="ndvi.tif",
)
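The safe-evaluation approach can be illustrated with a minimal AST walker over a whitelist of arithmetic operators. This is a sketch of the technique; the actual earthforge.core.expression implementation may differ:

```python
import ast
import operator
import numpy as np

# Whitelisted operators -- any other node type raises, so eval/exec is never used.
_OPS = {
    ast.Add: operator.add, ast.Sub: operator.sub,
    ast.Mult: operator.mul, ast.Div: operator.truediv,
    ast.Pow: operator.pow, ast.USub: operator.neg,
}

def safe_eval(expression: str, variables: dict[str, np.ndarray]) -> np.ndarray:
    """Evaluate an arithmetic expression over named arrays by walking its AST."""
    def _eval(node: ast.AST):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](_eval(node.operand))
        if isinstance(node, ast.Name) and node.id in variables:
            return variables[node.id]
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        raise ValueError(f"disallowed expression node: {ast.dump(node)}")
    return _eval(ast.parse(expression, mode="eval"))
```

Because only Name, Constant, and whitelisted operator nodes are accepted, attribute access, calls, and subscripts are rejected outright.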

RasterCalcResult

Bases: BaseModel

Result of a band math calculation.

Attributes:

Name Type Description
expression str

The expression that was evaluated.

output str

Output file path.

width int

Output raster width.

height int

Output raster height.

dtype str

Output data type.

crs str | None

CRS of the output.

file_size_bytes int

Size of the output file.

raster_calc(expression, inputs, output, *, dtype='float32', nodata=None) async

Evaluate a band math expression across raster inputs.

Parameters:

Name Type Description Default
expression str

Math expression (e.g. "(B08 - B04) / (B08 + B04)").

required
inputs dict[str, str]

Mapping of variable name to file path.

required
output str

Output GeoTIFF path.

required
dtype str

Output data type (default: "float32").

'float32'
nodata float | None

Nodata value for the output (default: None).

None

Returns:

Type Description
RasterCalcResult

Structured result with output metadata.

Raises:

Type Description
RasterError

If inputs can't be read or expression is invalid.

tile

earthforge.raster.tile

XYZ/TMS static tile generation from raster files.

Generates a directory of PNG tiles in {z}/{x}/{y}.png structure from a raster file (typically a COG). Uses windowed reads at tile boundaries and overview levels for lower zoom levels.

Tile math is implemented inline (~40 lines) to avoid adding mercantile as a dependency.

Usage::

from earthforge.raster.tile import generate_tiles

result = await generate_tiles("elevation.tif", output_dir="tiles/", zoom_range=(8, 12))
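The inline tile math amounts to the standard Web Mercator slippy-map formulas. A minimal sketch (lonlat_to_tile is illustrative, not EarthForge's actual function):

```python
import math

def lonlat_to_tile(lon: float, lat: float, zoom: int) -> tuple[int, int]:
    """Convert WGS84 lon/lat to XYZ tile indices in the slippy-map scheme."""
    n = 2 ** zoom
    x = int((lon + 180.0) / 360.0 * n)
    lat_rad = math.radians(lat)
    y = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
    # Clamp so the poles and the antimeridian stay within the tile grid.
    return min(max(x, 0), n - 1), min(max(y, 0), n - 1)
```

Inverting these formulas for a tile's corner coordinates gives the window bounds for the windowed read at each tile.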

TileResult

Bases: BaseModel

Result of tile generation.

Attributes:

Name Type Description
source str

Input raster path.

output_dir str

Directory containing generated tiles.

zoom_min int

Minimum zoom level generated.

zoom_max int

Maximum zoom level generated.

tile_count int

Total number of tiles generated.

tile_size int

Tile size in pixels.

generate_tiles(source, output_dir, *, zoom_range=(0, 5), tile_size=256) async

Generate XYZ tiles from a raster file.

Parameters:

Name Type Description Default
source str

Path or URL to a raster file.

required
output_dir str

Directory to write {z}/{x}/{y}.png tiles into.

required
zoom_range tuple[int, int]

(min_zoom, max_zoom) inclusive.

(0, 5)
tile_size int

Tile size in pixels (default: 256).

256

Returns:

Type Description
TileResult

Structured result with generation summary.

Raises:

Type Description
RasterError

If the raster cannot be read or tiles cannot be written.


earthforge.vector

info

earthforge.vector.info

Deep metadata extraction for vector geospatial formats.

Reads Parquet/GeoParquet file metadata via pyarrow without loading data into memory. Extracts schema, row counts, geometry columns, CRS, bounding box, and encoding information from GeoParquet geo metadata.

For non-Parquet vector formats (GeoJSON, FlatGeobuf), provides basic file-level metadata. Deep inspection of those formats may be added in later milestones.

ColumnInfo

Bases: BaseModel

Metadata for a single column in a vector dataset.

Attributes:

Name Type Description
name str

Column name.

type str

Arrow type string (e.g. "int64", "binary").

is_geometry bool

Whether this column contains geometry data.

VectorInfo

Bases: BaseModel

Structured metadata for a vector geospatial file.

Attributes:

Name Type Description
source str

The file path that was inspected.

format str

Detected vector format (e.g. "geoparquet", "parquet").

row_count int

Total number of rows/features.

num_columns int

Total number of columns.

columns list[ColumnInfo]

Per-column metadata.

geometry_column str | None

Name of the primary geometry column, if any.

geometry_types list[str]

List of geometry types found (e.g. ["Point"]).

crs str | None

CRS string from GeoParquet metadata, if available.

bbox list[float] | None

Bounding box [west, south, east, north], if available.

encoding str | None

Geometry encoding (e.g. "WKB"), if available.

num_row_groups int | None

Number of Parquet row groups.

compression str | None

Parquet compression codec, if applicable.

file_size_bytes int | None

File size in bytes.

inspect_vector(source) async

Inspect a vector file and return structured metadata.

Runs the synchronous pyarrow read in a thread executor to avoid blocking the event loop. Currently supports Parquet and GeoParquet files.

Parameters:

Name Type Description Default
source str

Path to a vector file.

required

Returns:

Type Description
VectorInfo

Structured vector metadata.

Raises:

Type Description
VectorError

If the file cannot be read or format is unsupported.

convert

earthforge.vector.convert

Vector format conversion.

Converts between vector geospatial formats with a focus on producing valid GeoParquet output. Supports Shapefile, GeoJSON, and other OGR-readable formats as input. Writes GeoParquet with proper geo metadata including CRS, geometry types, encoding, and bounding box.

Uses GDAL/OGR for reading source formats and pyarrow for writing Parquet. Falls back to geopandas if available, but does not require it.

Usage::

from earthforge.vector.convert import convert_vector

result = await convert_vector("buildings.shp", output="buildings.parquet")

ConvertResult

Bases: BaseModel

Structured result from a vector format conversion.

Attributes:

Name Type Description
source str

Input file path.

output str

Output file path.

input_format str

Source format name (e.g. "ESRI Shapefile").

output_format str

Target format (e.g. "geoparquet").

feature_count int

Number of features converted.

geometry_type str | None

Geometry type (e.g. "Polygon").

crs str | None

CRS identifier string.

bbox list[float] | None

Bounding box [west, south, east, north].

file_size_bytes int | None

Output file size in bytes.

convert_vector(source, *, output=None, target_format='geoparquet', compression='snappy') async

Convert a vector file to GeoParquet.

Reads the source using GDAL/OGR and writes GeoParquet with proper geo metadata. Supports Shapefile, GeoJSON, GPKG, and any OGR-supported format.

Parameters:

Name Type Description Default
source str

Path to the input vector file.

required
output str | None

Output file path. If None, replaces extension with .parquet.

None
target_format str

Target format (default: "geoparquet").

'geoparquet'
compression str

Parquet compression codec (default: "snappy").

'snappy'

Returns:

Type Description
ConvertResult

Structured conversion result.

Raises:

Type Description
VectorError

If the conversion fails.

query

earthforge.vector.query

Spatial and attribute queries against GeoParquet files.

Leverages pyarrow's row-group-level statistics and predicate pushdown to read only the data that matches the query — critical for large files where reading the full dataset would be impractical.

For bbox queries, the filter is applied against the bbox column covering structure embedded in GeoParquet metadata. If per-row bounding box columns (bbox.xmin, bbox.ymin, etc.) are present, pyarrow can skip entire row groups whose spatial extent doesn't intersect the query box.

Usage::

from earthforge.vector.query import query_features

result = await query_features("buildings.parquet", bbox=[-85, 37, -84, 38])
print(result.feature_count)
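The row-group skip decision reduces to a rectangle-overlap test between each row group's bbox statistics and the query box. A sketch of the pruning logic (helper names are illustrative):

```python
def row_group_intersects(group_bbox: list[float], query_bbox: list[float]) -> bool:
    """True if a row group's [xmin, ymin, xmax, ymax] extent overlaps the query box."""
    gxmin, gymin, gxmax, gymax = group_bbox
    qxmin, qymin, qxmax, qymax = query_bbox
    return not (gxmax < qxmin or gxmin > qxmax or gymax < qymin or gymin > qymax)

def groups_to_scan(group_bboxes: list[list[float]], query_bbox: list[float]) -> list[int]:
    """Indices of row groups whose spatial extent intersects the query bbox."""
    return [i for i, gb in enumerate(group_bboxes) if row_group_intersects(gb, query_bbox)]
```

Every pruned row group is data that never leaves disk (or the object store), which is why the speedup on large files can be dramatic.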

QueryResult

Bases: BaseModel

Structured result from a vector spatial/attribute query.

Attributes:

Name Type Description
source str

The file that was queried.

feature_count int

Number of features matching the query.

columns list[str]

Column names in the result.

bbox_filter list[float] | None

The bounding box filter applied, if any.

features list[dict[str, Any]]

List of feature dicts (geometry as WKT if available).

total_rows int

Total rows in the source file (before filtering).

row_groups_scanned int | None

Number of Parquet row groups actually read.

row_groups_total int | None

Total row groups in the file.

query_features(source, *, bbox=None, columns=None, limit=None, include_geometry=True) async

Query features from a GeoParquet file.

Uses pyarrow predicate pushdown when GeoParquet bbox covering metadata is present, skipping row groups that don't intersect the query bbox. Falls back to post-read geometry filtering via shapely when covering is not available.

Parameters:

Name Type Description Default
source str

Path to a GeoParquet/Parquet file.

required
bbox list[float] | None

Bounding box filter [west, south, east, north].

None
columns list[str] | None

Columns to include. None returns all.

None
limit int | None

Maximum features to return.

None
include_geometry bool

Include geometry as WKT in results.

True

Returns:

Type Description
QueryResult

Structured query result.

Raises:

Type Description
VectorError

If the file cannot be read or query fails.

validate

earthforge.vector.validate

GeoParquet schema compliance validation.

Validates GeoParquet files against the GeoParquet specification by checking:

  • Presence of geo metadata key in Parquet file metadata
  • CRS stored as PROJJSON in the geo metadata
  • Geometry column declared and present in the schema
  • Supported geometry encoding (WKB)
  • Valid bounding box metadata

Usage::

from earthforge.vector.validate import validate_geoparquet

result = await validate_geoparquet("buildings.parquet")
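The first two checks boil down to inspecting the Parquet key/value file metadata for a JSON "geo" entry. A simplified sketch assuming GeoParquet-style metadata (check_geo_metadata is illustrative, not EarthForge's internals):

```python
import json

def check_geo_metadata(file_metadata: dict[bytes, bytes]) -> list[tuple[str, bool, str]]:
    """Run the geo-key and encoding checks against Parquet key/value metadata."""
    checks: list[tuple[str, bool, str]] = []
    raw = file_metadata.get(b"geo")
    checks.append(("geo_key", raw is not None,
                   "'geo' key present" if raw else "missing 'geo' key"))
    if raw is None:
        return checks
    geo = json.loads(raw)
    # The spec names a primary geometry column; its encoding must be WKB.
    col = geo.get("columns", {}).get(geo.get("primary_column", ""), {})
    encoding = col.get("encoding")
    checks.append(("encoding", encoding == "WKB", f"encoding={encoding!r}"))
    return checks
```

In practice the metadata dict comes from pyarrow's ParquetFile schema metadata; the remaining checks (PROJJSON CRS, declared geometry column present in the schema, bbox) follow the same pattern.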

VectorValidationCheck

Bases: BaseModel

Result of a single validation check.

Attributes:

Name Type Description
check str

Name of the validation check.

status str

Pass/fail/warn status with text marker.

message str

Human-readable detail.

VectorValidationResult

Bases: BaseModel

Aggregate result of validating a GeoParquet file.

Attributes:

Name Type Description
source str

Path or URL that was validated.

is_valid bool

Overall pass/fail.

format_version str | None

GeoParquet version if detected.

geometry_column str | None

Name of the primary geometry column.

crs str | None

CRS identifier (e.g. EPSG code) if found.

encoding str | None

Geometry encoding (e.g. WKB).

checks list[VectorValidationCheck]

Individual check results.

summary str

Human-readable one-line summary.

validate_geoparquet(source) async

Validate a GeoParquet file against the specification.

Parameters:

Name Type Description Default
source str

Path or URL to a Parquet file.

required

Returns:

Type Description
VectorValidationResult

Structured result with detailed check results.

Raises:

Type Description
VectorValidationError

If the file cannot be read or is not Parquet.

clip

earthforge.vector.clip

Vector feature clipping by bounding box or geometry.

Clips features from a GeoParquet (or other vector) file to a bounding box or a clipping geometry using shapely.intersection.

Usage::

from earthforge.vector.clip import clip_features

result = await clip_features("buildings.parquet", bbox=(-85.5, 37.0, -84.0, 38.5))

ClipResult

Bases: BaseModel

Result of clipping vector features.

Attributes:

Name Type Description
source str

Input file path.

output str

Output file path.

features_input int

Number of features in the input.

features_output int

Number of features after clipping.

clip_method str

Either 'bbox' or 'geometry'.

output_format str

Output file format.

file_size_bytes int

Size of the output file.

clip_features(source, output=None, *, bbox=None, geometry_wkt=None) async

Clip features to a bounding box or geometry.

Parameters:

Name Type Description Default
source str

Path to a vector file (GeoParquet, GeoJSON, etc.).

required
output str | None

Output path. Defaults to <source_stem>_clipped.parquet.

None
bbox tuple[float, float, float, float] | None

Bounding box as (west, south, east, north).

None
geometry_wkt str | None

WKT geometry to clip to. bbox takes precedence.

None

Returns:

Type Description
ClipResult

Structured result with clipping summary.

Raises:

Type Description
VectorError

If the file cannot be read or no clip region is specified.

tile

earthforge.vector.tile

Vector tile generation — GeoParquet to PMTiles or MBTiles.

Converts vector features to tiled formats suitable for web map display. Uses mapbox-vector-tile for MVT encoding and pmtiles for the PMTiles container. Optionally delegates to tippecanoe subprocess if available on PATH for better simplification.

Usage::

from earthforge.vector.tile import generate_vector_tiles

result = await generate_vector_tiles("buildings.parquet", "buildings.pmtiles")

VectorTileResult

Bases: BaseModel

Result of vector tile generation.

Attributes:

Name Type Description
source str

Input file path.

output str

Output file path.

output_format str

Output format (PMTiles, MBTiles).

feature_count int

Number of input features.

method str

Generation method used (tippecanoe or builtin).

file_size_bytes int

Output file size.

zoom_range str

Min and max zoom levels.

generate_vector_tiles(source, output, *, min_zoom=0, max_zoom=14, layer_name=None) async

Generate vector tiles from a vector file.

Parameters:

Name Type Description Default
source str

Path to a GeoParquet, GeoJSON, or other vector file.

required
output str

Output path (use .pmtiles or .mbtiles suffix).

required
min_zoom int

Minimum zoom level (default: 0).

0
max_zoom int

Maximum zoom level (default: 14).

14
layer_name str | None

Layer name in the vector tiles. Defaults to input stem.

None

Returns:

Type Description
VectorTileResult

Structured result with generation summary.

Raises:

Type Description
VectorError

If generation fails.


earthforge.cube

info

earthforge.cube.info

Metadata inspection for Zarr and NetCDF datacubes.

Opens the store lazily via xarray.open_zarr or xarray.open_dataset (with the h5netcdf engine for NetCDF). No data arrays are loaded into memory — only the coordinate arrays and top-level attributes are read, which for consolidated Zarr stores requires a single HTTP request for the .zmetadata file.

For remote Zarr stores the caller should pass an S3/GCS/Azure URL understood by fsspec, e.g. s3://era5-pds/zarr/. For local paths, a filesystem path is accepted as-is.

Usage::

from earthforge.cube.info import inspect_cube

info = await inspect_cube("s3://era5-pds/zarr/1979/01/data/eastward_wind.zarr")
print(f"Variables: {info.variables}")
print(f"Dimensions: {info.dimensions}")

DimensionInfo

Bases: BaseModel

Metadata for a single datacube dimension.

Attributes:

Name Type Description
name str

Dimension name (e.g. "time", "latitude").

size int

Number of coordinate values along this dimension.

dtype str

NumPy dtype string (e.g. "float64", "datetime64[ns]").

min_value str | None

Minimum coordinate value as a string, if numeric.

max_value str | None

Maximum coordinate value as a string, if numeric.

units str | None

CF-convention units attribute, if present.

VariableInfo

Bases: BaseModel

Metadata for a single datacube variable.

Attributes:

Name Type Description
name str

Variable name.

dims list[str]

Dimension names this variable spans.

dtype str

NumPy dtype string.

shape list[int]

Shape tuple as a list of ints.

chunks list[int] | None

Chunk shape as a list of ints, if chunked.

units str | None

CF-convention units attribute, if present.

long_name str | None

CF-convention long name, if present.

standard_name str | None

CF-convention standard name, if present.

fill_value str | None

Missing data fill value as a string, if set.

CubeInfo

Bases: BaseModel

Structured metadata for a Zarr or NetCDF datacube.

Attributes:

Name Type Description
source str

The store path or URL that was inspected.

format str

Detected format ("zarr" or "netcdf").

dimensions list[DimensionInfo]

Ordered list of dimension metadata.

variables list[VariableInfo]

Data variables (excludes coordinate variables).

global_attrs dict[str, Any]

Top-level dataset attributes (e.g. CF conventions, title, history).

crs str | None

CRS string extracted from crs_wkt or grid_mapping attributes, if available.

spatial_bbox list[float] | None

Bounding box [west, south, east, north] derived from longitude/latitude coordinate extents, if available.

time_range list[str] | None

[start, end] derived from the time coordinate extents as ISO 8601 strings, if available.

inspect_cube(source) async

Inspect a Zarr or NetCDF datacube and return structured metadata.

Opens the store lazily — no data arrays are loaded into memory. For consolidated Zarr stores (which include a .zmetadata file), this requires only a single metadata request regardless of how many variables the store contains.

Runs the synchronous xarray/zarr calls in a thread executor to avoid blocking the event loop.

Parameters:

Name Type Description Default
source str

Zarr store path/URL or NetCDF file path. Remote URLs (s3://, gs://, az://) are passed directly to xarray.open_zarr which delegates to fsspec.

required

Returns:

Type Description
CubeInfo

Structured metadata with dimensions, variables, spatial extent, time range, and global attributes.

Raises:

Type Description
CubeError

If the store cannot be opened or format is unrecognized.

slice

earthforge.cube.slice

Spatiotemporal slicing for Zarr and NetCDF datacubes.

Applies bounding box and time range filters to an open xarray Dataset using label-based indexing (sel) and coordinate-based selection (where). The slice operation is lazy until .load() is called — only the required chunks are fetched from the remote store.

Sliced data can be written to a local Zarr store or NetCDF file.

Usage::

from earthforge.cube.slice import slice_cube

result = await slice_cube(
    source="s3://era5-pds/zarr/",
    variables=["t2m", "u10"],
    bbox=(-85.0, 37.0, -84.0, 38.0),
    time_range="2025-06-01/2025-06-30",
    output="./data/era5_ky_june2025.zarr",
)
print(f"Slice size: {result.output_size_bytes:,} bytes")

SliceResult

Bases: BaseModel

Structured result for a datacube slice operation.

Attributes:

Name Type Description
source str

Input store path or URL.

output str

Path to the written output file or store.

output_format str

Format of the output ("zarr" or "netcdf").

variables_selected list[str]

Variable names included in the slice.

bbox list[float] | None

Spatial bounding box applied, if any.

time_range list[str] | None

Time range applied as [start, end] ISO strings, if any.

shape dict[str, int]

Shape of the output Dataset as {dim: size} mapping.

output_size_bytes int

Size of the output file/directory in bytes.

elapsed_seconds float

Wall-clock time for the operation.

slice_cube(source, *, variables=None, bbox=None, time_range=None, output) async

Slice a Zarr or NetCDF datacube by variables, bbox, and time.

The operation is lazy until the subset is written: xarray defers all data transfer until the .load() call, so only the chunks that intersect the requested slice are fetched from the remote store.

Parameters:

Name Type Description Default
source str

Zarr store path/URL or NetCDF file path.

required
variables list[str] | None

Variable names to include. If None, all data variables are included.

None
bbox tuple[float, float, float, float] | None

Spatial filter as (west, south, east, north) in the coordinate system of the Dataset (usually degrees for global products). If None, no spatial filter is applied.

None
time_range str | None

ISO 8601 date range string (YYYY-MM-DD/YYYY-MM-DD or YYYY-MM/YYYY-MM). If None, no time filter is applied.

None
output str

Output path. Written as Zarr if the path ends in .zarr; otherwise written as NetCDF4 via h5netcdf.

required

Returns:

Type Description
SliceResult

Structured result with output path, shape, size, and timing.

Raises:

Type Description
CubeError

If the store cannot be opened, variables are missing, filter coordinates are absent, or writing fails.
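The time_range strings accepted by slice_cube could be parsed with a helper along these lines (a sketch; the actual parser may differ):

```python
from datetime import datetime

def parse_time_range(spec: str) -> tuple[str, str]:
    """Parse 'YYYY-MM-DD/YYYY-MM-DD' or 'YYYY-MM/YYYY-MM' into ISO start/end strings."""
    start_s, end_s = spec.split("/")

    def _parse(part: str) -> datetime:
        # Two dashes means a full date; one dash means year-month.
        fmt = "%Y-%m-%d" if part.count("-") == 2 else "%Y-%m"
        return datetime.strptime(part, fmt)

    start, end = _parse(start_s), _parse(end_s)
    if end < start:
        raise ValueError(f"end before start in {spec!r}")
    return start.isoformat(), end.isoformat()
```

The resulting ISO strings can be passed directly to xarray's label-based sel on a datetime64 time coordinate.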

validate

earthforge.cube.validate

Zarr and NetCDF datacube structure validation.

Validates datacube files against common conventions and best practices:

  • Chunk structure (present and reasonable sizes)
  • CF-convention compliance (units, standard_name, long_name)
  • CRS presence (grid_mapping or crs_wkt attribute)
  • Coordinate arrays (time, latitude/longitude or x/y)
  • Dimension completeness

Usage::

from earthforge.cube.validate import validate_cube

result = await validate_cube("era5.zarr")

CubeValidationCheck

Bases: BaseModel

Result of a single validation check.

Attributes:

Name Type Description
check str

Name of the validation check.

status str

Pass/fail/warn status with text marker.

message str

Human-readable detail.

CubeValidationResult

Bases: BaseModel

Aggregate result of validating a datacube.

Attributes:

Name Type Description
source str

Path or URL that was validated.

format str

Detected format (zarr or netcdf).

is_valid bool

Overall pass/fail.

dimensions list[str]

List of dimension names.

variables list[str]

List of data variable names.

checks list[CubeValidationCheck]

Individual check results.

summary str

Human-readable one-line summary.

validate_cube(source) async

Validate a Zarr store or NetCDF file for datacube compliance.

Parameters:

Name Type Description Default
source str

Path or URL to a Zarr store or NetCDF file.

required

Returns:

Type Description
CubeValidationResult

Structured result with detailed check results.

Raises:

Type Description
CubeError

If the file cannot be opened.

convert

earthforge.cube.convert

Datacube format conversion — NetCDF to Zarr and vice versa.

Supports rechunking during conversion via xarray's built-in chunking.

Usage::

from earthforge.cube.convert import convert_cube

result = await convert_cube("data.nc", "data.zarr")

CubeConvertResult

Bases: BaseModel

Result of a datacube conversion.

Attributes:

Name Type Description
source str

Input path.

output str

Output path.

source_format str

Input format.

output_format str

Output format.

variables list[str]

Variables in the dataset.

dimensions list[str]

Dimensions in the dataset.

chunks dict[str, int] | None

Chunk sizes if rechunked.

convert_cube(source, output, *, chunks=None) async

Convert between NetCDF and Zarr formats.

Parameters:

Name Type Description Default
source str

Path to input file (NetCDF or Zarr).

required
output str

Path for output (use .zarr suffix for Zarr, .nc for NetCDF).

required
chunks dict[str, int] | None

Optional rechunking spec (dimension name -> chunk size).

None

Returns:

Type Description
CubeConvertResult

Structured result with conversion details.

Raises:

Type Description
CubeError

If the conversion fails.

stats

earthforge.cube.stats

Datacube aggregate statistics along dimensions.

Computes summary statistics (mean, min, max, std, sum) over specified dimensions using xarray's built-in reduction operations.

Usage::

from earthforge.cube.stats import cube_stats

result = await cube_stats("era5.zarr", variable="temperature", reduce_dims=["time"])

CubeStatsResult

Bases: BaseModel

Result of datacube statistics computation.

Attributes:

Name Type Description
source str

Input path.

variable str

Variable that statistics were computed for.

reduce_dims list[str]

Dimensions that were reduced.

remaining_dims list[str]

Dimensions remaining after reduction.

operation str

Statistical operation applied.

min float

Global minimum of the result.

max float

Global maximum of the result.

mean float

Global mean of the result.

output str | None

Output file path if saved.

cube_stats(source, variable, *, reduce_dims=None, operation='mean', output=None) async

Compute aggregate statistics over datacube dimensions.

Parameters:

Name Type Description Default
source str

Path to a Zarr store or NetCDF file.

required
variable str

Name of the data variable to compute stats for.

required
reduce_dims list[str] | None

Dimensions to reduce over. Default: all dimensions.

None
operation str

One of "mean", "min", "max", "std", "sum".

'mean'
output str | None

Optional output path to save the reduced dataset.

None

Returns:

Type Description
CubeStatsResult

Structured result with computed statistics.

Raises:

Type Description
CubeError

If the computation fails.


earthforge.pipeline

schema

earthforge.pipeline.schema

JSON Schema definition and validation for EarthForge pipeline documents.

The schema validates the top-level pipeline key and all nested structures. Validation uses jsonschema for compatibility with the standard ecosystem (unlike Pydantic, jsonschema can validate dict structures loaded from YAML without a full model hierarchy).

The pipeline document structure::

pipeline:
  name: <str>               # Human-readable name
  description: <str>        # Optional description
  output_dir: <str>         # Root output directory (default: ./output)
  parallel: <int>           # Max concurrent item workers (default: 4)
  source:
    stac_search:
      api: <str>            # STAC API URL
      collection: <str>     # Collection ID
      bbox: [W, S, E, N]    # Optional spatial filter
      datetime: <str>       # Optional datetime range
      query: <dict>         # Optional CQL2 filter
      limit: <int>          # Max items (default: 10)
  steps:
    - for_each_item:        # Per-item concurrent step list
        - <step_name>:
            <step_params>
    - <step_name>:          # Top-level (non-per-item) steps
        <step_params>

validate_pipeline_doc(doc)

Validate a parsed pipeline YAML document against the pipeline schema.

Parameters:

Name Type Description Default
doc dict[str, Any]

Parsed YAML as a Python dict.

required

Raises:

Type Description
PipelineValidationError

If the document does not conform to the schema.
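A minimal document matching the structure above, expressed as the parsed dict that validate_pipeline_doc receives (the collection ID and asset keys are placeholders; the required-key check below is a stdlib stand-in for the actual jsonschema validation, not the real logic):

```python
doc = {
    "pipeline": {
        "name": "ndvi-demo",
        "source": {
            "stac_search": {
                "api": "https://earth-search.aws.element84.com/v1",
                "collection": "sentinel-2-l2a",  # placeholder collection ID
                "limit": 2,
            }
        },
        "steps": [
            {"for_each_item": [
                {"stac.fetch": {"assets": ["red", "nir"]}},  # placeholder keys
            ]},
        ],
    }
}

def looks_valid(d: dict) -> bool:
    """Toy check: the top-level pipeline key and its core sub-keys exist."""
    p = d.get("pipeline")
    return isinstance(p, dict) and {"name", "source", "steps"} <= p.keys()
```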

steps

earthforge.pipeline.steps

EarthForge pipeline step registry and built-in step implementations.

Each step is an async callable with the signature::

async def step_fn(ctx: StepContext) -> StepResult

Steps are registered by name (e.g. "raster.convert", "raster.calc") and looked up by the pipeline runner when processing for_each_item blocks.

The StepContext carries the current STAC item, its downloaded asset paths, the pipeline-level output directory, and the step's own parameters dict.

Built-in steps

stac.fetch Download STAC item assets. assets param selects which keys to download; default downloads all non-thumbnail data assets.

raster.calc Evaluate a band math expression over GeoTIFF bands loaded via rasterio. The expression is parsed as a safe arithmetic AST — no eval or exec is used.

raster.convert Convert a GeoTIFF to COG using the GDAL COG driver via earthforge.raster.convert.

vector.convert Convert a vector file to GeoParquet.

StepContext dataclass

Runtime context passed to each step during execution.

Attributes:

Name Type Description
item_id str

STAC item ID being processed.

item_url str

URL of the STAC item JSON.

asset_paths dict[str, str]

Mapping of asset key → local file path (populated by stac.fetch steps that run before other steps).

output_dir Path

Per-item output directory (<pipeline.output_dir>/<item_id>).

params dict[str, Any]

Step-specific parameter dict from the pipeline YAML.

profile str

EarthForge profile name (from the pipeline or global config).

StepResult dataclass

Result from a single step execution.

Attributes:

Name Type Description
step_name str

Registered name of the step (e.g. "raster.calc").

item_id str

STAC item ID that was processed.

outputs dict[str, str]

Mapping of output key → file path (for downstream steps).

elapsed_seconds float

Wall-clock time for this step.

skipped bool

True if the step was skipped (e.g. output already exists).

message str

Human-readable summary of what the step did.

register_step(name)

Decorator to register an async step function under name.

Parameters:

Name Type Description Default
name str

Step name as it appears in the pipeline YAML (e.g. "raster.convert").

required

Returns:

Type Description
Callable[..., Any]

The original function, unchanged.

get_step(name)

Look up a registered step by name.

Parameters:

Name Type Description Default
name str

Step name.

required

Returns:

Type Description
Callable[..., Any]

The registered async step callable.

Raises:

Type Description
KeyError

If the name is not registered.

list_steps()

Return a sorted list of all registered step names and their docstrings.

Returns:

Type Description
list[dict[str, str]]

List of dicts with name and description keys.
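The registry trio above can be sketched with a module-level dict (a simplified stand-in for the real implementation; the dict name and docstring handling are assumptions):

```python
from typing import Any, Callable

_STEPS: dict[str, Callable[..., Any]] = {}

def register_step(name: str) -> Callable[..., Any]:
    """Register an async step callable under `name`; returns it unchanged."""
    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        _STEPS[name] = fn
        return fn
    return decorator

def get_step(name: str) -> Callable[..., Any]:
    """Look up a registered step; raises KeyError for unknown names."""
    return _STEPS[name]

def list_steps() -> list[dict[str, str]]:
    """Sorted list of {name, description} pairs from each step's docstring."""
    return [
        {"name": n, "description": (fn.__doc__ or "").strip()}
        for n, fn in sorted(_STEPS.items())
    ]

@register_step("raster.convert")
async def step_raster_convert(ctx):
    """Convert a raster to COG."""
    ...
```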

step_stac_fetch(ctx) async

Download STAC item assets to the output directory.

Parameters (ctx.params):

assets: List of asset keys to download. Default: all data assets.
parallel: Max concurrent downloads. Default: 4.

step_raster_calc(ctx) async

Evaluate a band math expression over GeoTIFF bands.

Parameters (ctx.params):

expression: Band math expression (e.g. "(B08 - B04) / (B08 + B04)"). Variable names must match asset keys in ctx.asset_paths.
output: Output filename template. {item_id} is replaced with the STAC item ID.
dtype: Output dtype (default: float32).
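The safe-AST evaluation described for raster.calc can be sketched as follows (a minimal stdlib illustration over scalar band values, not the actual earthforge parser; the node whitelist and error handling here are assumptions):

```python
import ast
import operator

# Whitelisted binary operators for band math; anything else is rejected.
_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}

def eval_band_expr(expression: str, bands: dict[str, float]) -> float:
    """Evaluate a band math expression using only arithmetic AST nodes."""
    def _eval(node: ast.AST) -> float:
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.USub):
            return -_eval(node.operand)
        if isinstance(node, ast.Name):
            return bands[node.id]  # variable names map to asset keys
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return float(node.value)
        raise ValueError(f"disallowed expression node: {type(node).__name__}")
    return _eval(ast.parse(expression, mode="eval"))

# NDVI-style expression over scalar band values:
ndvi = eval_band_expr("(B08 - B04) / (B08 + B04)", {"B08": 0.6, "B04": 0.2})
```

Because only arithmetic nodes are walked, function calls, attribute access, and subscripts never evaluate, which is what makes the approach safe without eval.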

step_raster_convert(ctx) async

Convert a raster to COG or another format.

Parameters (ctx.params):

format: Target format — "COG" (default) or "GeoTIFF".
compression: COG compression — "deflate" (default), "lzw", "zstd".
input: Asset key to convert (default: "result" from prior step, or the first asset path if only one exists).
output: Output filename template. {item_id} is replaced.

step_vector_convert(ctx) async

Convert a vector file to GeoParquet.

Parameters (ctx.params):

input: Asset key to convert.
output: Output filename template. {item_id} is replaced.

runner

earthforge.pipeline.runner

EarthForge pipeline runner — YAML pipeline execution engine.

Loads a pipeline YAML document, validates it against the schema, fetches the source STAC items, and executes the step graph. for_each_item blocks run concurrently across items using asyncio.TaskGroup bounded by a semaphore.

Usage::

from earthforge.pipeline.runner import run_pipeline, load_pipeline

doc = load_pipeline("pipeline.yaml")
result = await run_pipeline(doc)
print(f"Processed {result.items_succeeded}/{result.items_total} items")

ItemResult

Bases: BaseModel

Execution result for a single STAC item.

Attributes:

Name Type Description
item_id str

STAC item ID.

item_url str

URL the item was fetched from.

succeeded bool

True if all steps completed without error.

steps list[dict[str, Any]]

Per-step results.

error str | None

Error message if the item failed, otherwise None.

elapsed_seconds float

Wall-clock time for all steps on this item.

PipelineRunResult

Bases: BaseModel

Structured result for a complete pipeline run.

Attributes:

Name Type Description
pipeline_name str

Name field from the pipeline YAML.

items_total int

Total number of source items.

items_succeeded int

Items that completed all steps without error.

items_failed int

Items that encountered at least one step error.

item_results list[ItemResult]

Per-item detailed results.

elapsed_seconds float

Total wall-clock time for the pipeline run.

load_pipeline(path)

Load and parse a pipeline YAML file.

Parameters:

Name Type Description Default
path str

Filesystem path to the pipeline YAML file.

required

Returns:

Type Description
dict[str, Any]

Parsed pipeline document as a Python dict.

Raises:

Type Description
PipelineError

If the file cannot be read.

PipelineValidationError

If the YAML is invalid or malformed.

run_pipeline(doc, *, output_dir=None, profile='default', dry_run=False) async

Execute a validated pipeline document.

Fetches source STAC items, then runs the step graph concurrently across items using asyncio.TaskGroup bounded by the parallel setting.

Parameters:

Name Type Description Default
doc dict[str, Any]

Parsed and validated pipeline document (from load_pipeline).

required
output_dir str | None

Override the pipeline's output_dir setting.

None
profile str

EarthForge profile name for STAC and storage access.

'default'
dry_run bool

If True, validate and plan the pipeline without executing steps.

False

Returns:

Type Description
PipelineRunResult

A PipelineRunResult with per-item results and summary statistics.

Raises:

Type Description
PipelineValidationError

If the document fails schema validation.

PipelineError

If the source fetch fails or no items are found.