Skip to content

raster

occulus.raster

DEM / DSM / DTM rasterization and export.

Create regular-grid elevation models from point clouds and export them as GeoTIFF files.

Functions:

Name Description
- :func:`create_dsm` — Digital Surface Model
- :func:`create_dtm` — Digital Terrain Model
- :func:`create_dem` — Alias for :func:`create_dtm`
- :func:`idw_interpolate` — Inverse Distance Weighting interpolation
- :func:`nearest_interpolate` — Nearest-neighbour interpolation
- :func:`export_geotiff` — Write raster to GeoTIFF
Types
  • :class:RasterResult — Container for rasterized elevation data

RasterResult dataclass

Result container for rasterized elevation models.

Attributes:

Name Type Description
data NDArray[float64]

Elevation grid of shape (ny, nx).

x_edges NDArray[float64]

X bin edges of length nx + 1.

y_edges NDArray[float64]

Y bin edges of length ny + 1.

resolution float

Grid cell size in coordinate units.

crs str

Coordinate reference system identifier (EPSG code or WKT), or empty string if unknown.

nodata float

Value used for cells with no data. Default -9999.0.

Source code in src/occulus/raster/dem.py
@dataclass
class RasterResult:
    """Result container for rasterized elevation models.

    Attributes
    ----------
    data : NDArray[np.float64]
        Elevation grid of shape (ny, nx).
    x_edges : NDArray[np.float64]
        X bin edges of length nx + 1.
    y_edges : NDArray[np.float64]
        Y bin edges of length ny + 1.
    resolution : float
        Grid cell size in coordinate units.
    crs : str
        Coordinate reference system identifier (EPSG code or WKT), or
        empty string if unknown.
    nodata : float
        Value used for cells with no data. Default -9999.0.
    """

    data: NDArray[np.float64]
    x_edges: NDArray[np.float64]
    y_edges: NDArray[np.float64]
    resolution: float
    crs: str
    nodata: float = -9999.0

create_dem(cloud, resolution=1.0, *, method='idw', ground_class=2, nodata=-9999.0)

Create a Digital Elevation Model (alias for :func:create_dtm).

This function is a convenience alias. In common usage, DEM and DTM are used interchangeably to mean the bare-earth surface. See :func:create_dtm for full documentation.

Parameters:

Name Type Description Default
cloud PointCloud

Input point cloud with ground classification.

required
resolution float

Grid cell size in coordinate units, by default 1.0.

1.0
method str

Interpolation method: "idw" or "nearest", by default "idw".

'idw'
ground_class int

ASPRS classification code for ground, by default 2.

2
nodata float

Nodata value, by default -9999.0.

-9999.0

Returns:

Type Description
RasterResult

DTM elevation grid with metadata.

Raises:

Type Description
OcculusRasterError

See :func:create_dtm.

Source code in src/occulus/raster/dem.py
def create_dem(
    cloud: PointCloud,
    resolution: float = 1.0,
    *,
    method: str = "idw",
    ground_class: int = 2,
    nodata: float = -9999.0,
) -> RasterResult:
    """Create a Digital Elevation Model (alias for :func:`create_dtm`).

    This function is a convenience alias. In common usage, DEM and DTM are
    used interchangeably to mean the bare-earth surface. See
    :func:`create_dtm` for full documentation.

    Parameters
    ----------
    cloud : PointCloud
        Input point cloud with ground classification.
    resolution : float, optional
        Grid cell size in coordinate units, by default 1.0.
    method : str, optional
        Interpolation method: ``"idw"`` or ``"nearest"``, by default ``"idw"``.
    ground_class : int, optional
        ASPRS classification code for ground, by default 2.
    nodata : float, optional
        Nodata value, by default -9999.0.

    Returns
    -------
    RasterResult
        DTM elevation grid with metadata.

    Raises
    ------
    OcculusRasterError
        See :func:`create_dtm`.
    """
    return create_dtm(
        cloud,
        resolution,
        method=method,
        ground_class=ground_class,
        nodata=nodata,
    )

create_dsm(cloud, resolution=1.0, *, method='idw', nodata=-9999.0)

Create a Digital Surface Model (max-Z of all points).

The DSM represents the highest surface at each grid cell, including buildings, vegetation, and other above-ground features.

For cells that contain at least one point, the maximum Z value is used directly. Empty cells are filled using the specified interpolation method.

Parameters:

Name Type Description Default
cloud PointCloud

Input point cloud.

required
resolution float

Grid cell size in coordinate units, by default 1.0.

1.0
method str

Interpolation method for gap-filling: "idw" or "nearest", by default "idw".

'idw'
nodata float

Value assigned to cells with insufficient data, by default -9999.0.

-9999.0

Returns:

Type Description
RasterResult

DSM elevation grid with metadata.

Raises:

Type Description
OcculusRasterError

If the cloud is empty, resolution is non-positive, or interpolation method is unknown.

Source code in src/occulus/raster/dem.py
def create_dsm(
    cloud: PointCloud,
    resolution: float = 1.0,
    *,
    method: str = "idw",
    nodata: float = -9999.0,
) -> RasterResult:
    """Create a Digital Surface Model (max-Z of all points).

    The DSM represents the highest surface at each grid cell, including
    buildings, vegetation, and other above-ground features.

    For cells that contain at least one point, the maximum Z value is used
    directly. Empty cells are filled using the specified interpolation method.

    Parameters
    ----------
    cloud : PointCloud
        Input point cloud.
    resolution : float, optional
        Grid cell size in coordinate units, by default 1.0.
    method : str, optional
        Interpolation method for gap-filling: ``"idw"`` or ``"nearest"``,
        by default ``"idw"``.
    nodata : float, optional
        Value assigned to cells with insufficient data, by default -9999.0.

    Returns
    -------
    RasterResult
        DSM elevation grid with metadata.

    Raises
    ------
    OcculusRasterError
        If the cloud is empty, resolution is non-positive, or interpolation
        method is unknown.
    """
    _validate_inputs(cloud, resolution, method)

    x_edges, y_edges = _build_grid_edges(cloud, resolution)
    xy = cloud.xyz[:, :2]
    z = cloud.xyz[:, 2]

    # Direct binning for max-Z
    grid = _bin_max_z(xy, z, x_edges, y_edges, nodata)

    # Interpolate empty cells
    empty_mask = grid == nodata
    if empty_mask.any() and (~empty_mask).any():
        grid_cx = _grid_centres(x_edges)
        grid_cy = _grid_centres(y_edges)
        filled = _interpolate_grid(xy, z, grid_cx, grid_cy, method, nodata)

        # Only fill cells that were empty
        grid[empty_mask] = filled[empty_mask]

    crs = cloud.metadata.coordinate_system

    logger.info(
        "create_dsm: %dx%d grid (res=%.2f), z_range=[%.2f, %.2f]",
        grid.shape[1],
        grid.shape[0],
        resolution,
        float(np.min(grid[grid != nodata])) if (grid != nodata).any() else 0.0,
        float(np.max(grid[grid != nodata])) if (grid != nodata).any() else 0.0,
    )

    return RasterResult(
        data=grid,
        x_edges=x_edges,
        y_edges=y_edges,
        resolution=resolution,
        crs=crs,
        nodata=nodata,
    )

create_dtm(cloud, resolution=1.0, *, method='idw', ground_class=2, nodata=-9999.0)

Create a Digital Terrain Model from ground-classified points.

The DTM represents the bare-earth surface using only points classified as ground (ASPRS class 2 by default). Empty cells are filled using the specified interpolation method.

Parameters:

Name Type Description Default
cloud PointCloud

Input point cloud. Must have a classification array with ground points matching ground_class.

required
resolution float

Grid cell size in coordinate units, by default 1.0.

1.0
method str

Interpolation method for gap-filling: "idw" or "nearest", by default "idw".

'idw'
ground_class int

ASPRS classification code for ground points, by default 2.

2
nodata float

Value assigned to cells with no data, by default -9999.0.

-9999.0

Returns:

Type Description
RasterResult

DTM elevation grid with metadata.

Raises:

Type Description
OcculusRasterError

If the cloud is empty, has no classification, contains no ground points, resolution is non-positive, or method is unknown.

Source code in src/occulus/raster/dem.py
def create_dtm(
    cloud: PointCloud,
    resolution: float = 1.0,
    *,
    method: str = "idw",
    ground_class: int = 2,
    nodata: float = -9999.0,
) -> RasterResult:
    """Create a Digital Terrain Model from ground-classified points.

    The DTM represents the bare-earth surface using only points classified
    as ground (ASPRS class 2 by default). Empty cells are filled using the
    specified interpolation method.

    Parameters
    ----------
    cloud : PointCloud
        Input point cloud. Must have a classification array with ground
        points matching ``ground_class``.
    resolution : float, optional
        Grid cell size in coordinate units, by default 1.0.
    method : str, optional
        Interpolation method for gap-filling: ``"idw"`` or ``"nearest"``,
        by default ``"idw"``.
    ground_class : int, optional
        ASPRS classification code for ground points, by default 2.
    nodata : float, optional
        Value assigned to cells with no data, by default -9999.0.

    Returns
    -------
    RasterResult
        DTM elevation grid with metadata.

    Raises
    ------
    OcculusRasterError
        If the cloud is empty, has no classification, contains no ground
        points, resolution is non-positive, or method is unknown.
    """
    _validate_inputs(cloud, resolution, method)

    if cloud.classification is None:
        raise OcculusRasterError(
            "DTM requires a classification array. "
            "Run classify_ground_csf() or classify_ground_pmf() first."
        )

    ground_mask = cloud.classification == ground_class
    if not ground_mask.any():
        raise OcculusRasterError(
            f"No ground points found (class {ground_class}). Run ground classification first."
        )

    ground_xyz = cloud.xyz[ground_mask]
    xy = ground_xyz[:, :2]
    z = ground_xyz[:, 2]

    # Build grid over the full cloud extent, not just ground points
    x_edges, y_edges = _build_grid_edges(cloud, resolution)
    grid_cx = _grid_centres(x_edges)
    grid_cy = _grid_centres(y_edges)

    # Interpolate ground surface
    grid = _interpolate_grid(xy, z, grid_cx, grid_cy, method, nodata)

    crs = cloud.metadata.coordinate_system

    logger.info(
        "create_dtm: %dx%d grid (res=%.2f), %d ground pts, z_range=[%.2f, %.2f]",
        grid.shape[1],
        grid.shape[0],
        resolution,
        int(ground_mask.sum()),
        float(np.min(grid[grid != nodata])) if (grid != nodata).any() else 0.0,
        float(np.max(grid[grid != nodata])) if (grid != nodata).any() else 0.0,
    )

    return RasterResult(
        data=grid,
        x_edges=x_edges,
        y_edges=y_edges,
        resolution=resolution,
        crs=crs,
        nodata=nodata,
    )

export_geotiff(raster, path, *, crs='')

Write a RasterResult to a GeoTIFF file.

Uses rasterio to produce a single-band Float64 GeoTIFF with the appropriate affine transform derived from the raster's edge coordinates and resolution. The CRS can be overridden via the crs parameter; otherwise the raster's own CRS is used.

Parameters:

Name Type Description Default
raster RasterResult

Raster data to export.

required
path str or Path

Output file path. Parent directory must exist.

required
crs str

CRS override as an EPSG string (e.g. "EPSG:4326") or WKT. If empty, uses raster.crs. If both are empty, no CRS is written to the file.

''

Returns:

Type Description
Path

Absolute path to the written GeoTIFF file.

Raises:

Type Description
OcculusRasterError

If the raster data is invalid.

OcculusExportError

If the file cannot be written (I/O error, rasterio failure).

ImportError

If rasterio is not installed.

Source code in src/occulus/raster/export.py
def export_geotiff(
    raster: RasterResult,
    path: str | Path,
    *,
    crs: str = "",
) -> Path:
    """Write a RasterResult to a GeoTIFF file.

    Uses ``rasterio`` to produce a single-band Float64 GeoTIFF with the
    appropriate affine transform derived from the raster's edge coordinates
    and resolution. The CRS can be overridden via the ``crs`` parameter;
    otherwise the raster's own CRS is used.

    Parameters
    ----------
    raster : RasterResult
        Raster data to export.
    path : str or Path
        Output file path. Parent directory must exist.
    crs : str, optional
        CRS override as an EPSG string (e.g. ``"EPSG:4326"``) or WKT.
        If empty, uses ``raster.crs``. If both are empty, no CRS is
        written to the file.

    Returns
    -------
    Path
        Absolute path to the written GeoTIFF file.

    Raises
    ------
    OcculusRasterError
        If the raster data is invalid.
    OcculusExportError
        If the file cannot be written (I/O error, rasterio failure).
    ImportError
        If ``rasterio`` is not installed.
    """
    try:
        import rasterio  # type: ignore[import-not-found]
        from rasterio.transform import from_bounds  # type: ignore[import-not-found]
    except ImportError as exc:
        raise ImportError("rasterio is required for GeoTIFF export: pip install rasterio") from exc

    out_path = Path(path).resolve()

    if raster.data.ndim != 2:
        raise OcculusRasterError(f"Raster data must be 2D, got {raster.data.ndim}D")

    ny, nx = raster.data.shape
    if nx == 0 or ny == 0:
        raise OcculusRasterError("Cannot export an empty raster (zero cells)")

    # Determine CRS to use
    effective_crs = crs or raster.crs or None

    # Compute affine transform from edge coordinates
    x_min = float(raster.x_edges[0])
    x_max = float(raster.x_edges[-1])
    y_min = float(raster.y_edges[0])
    y_max = float(raster.y_edges[-1])

    transform = from_bounds(x_min, y_min, x_max, y_max, nx, ny)

    try:
        if not out_path.parent.exists():
            raise OcculusExportError(f"Parent directory does not exist: {out_path.parent}")

        profile = {
            "driver": "GTiff",
            "dtype": "float64",
            "width": nx,
            "height": ny,
            "count": 1,
            "nodata": raster.nodata,
            "transform": transform,
        }
        if effective_crs:
            profile["crs"] = effective_crs

        with rasterio.open(out_path, "w", **profile) as dst:
            # Rasterio expects (row 0 = north), so flip if y_edges are ascending
            if raster.y_edges[0] < raster.y_edges[-1]:
                dst.write(np.flipud(raster.data).astype(np.float64), 1)
            else:
                dst.write(raster.data.astype(np.float64), 1)

    except OcculusExportError:
        raise
    except Exception as exc:
        raise OcculusExportError(f"Failed to write GeoTIFF to {out_path}: {exc}") from exc

    logger.info(
        "export_geotiff: wrote %dx%d raster to %s (crs=%s)",
        nx,
        ny,
        out_path,
        effective_crs or "none",
    )
    return out_path

idw_interpolate(xy, z, grid_x, grid_y, *, power=2.0, max_dist=None, k=12, nodata=-9999.0)

Inverse Distance Weighting interpolation onto a regular grid.

For each grid cell centre, the k nearest source points are found via KDTree and their Z values are combined using inverse-distance weights raised to power.

Parameters:

Name Type Description Default
xy NDArray[float64]

Source point XY coordinates, shape (N, 2).

required
z NDArray[float64]

Source point Z values, shape (N,).

required
grid_x NDArray[float64]

Grid cell centre X coordinates, shape (nx,).

required
grid_y NDArray[float64]

Grid cell centre Y coordinates, shape (ny,).

required
power float

Distance weighting exponent, by default 2.0.

2.0
max_dist float or None

Maximum search distance. Grid cells with no neighbours within this radius are set to nodata. None disables the distance limit.

None
k int

Number of nearest neighbours to use, by default 12.

12
nodata float

Value assigned to cells with no data, by default -9999.0.

-9999.0

Returns:

Type Description
NDArray[float64]

Interpolated grid of shape (ny, nx).

Raises:

Type Description
OcculusRasterError

If inputs are invalid or interpolation fails.

Source code in src/occulus/raster/interpolation.py
def idw_interpolate(
    xy: NDArray[np.float64],
    z: NDArray[np.float64],
    grid_x: NDArray[np.float64],
    grid_y: NDArray[np.float64],
    *,
    power: float = 2.0,
    max_dist: float | None = None,
    k: int = 12,
    nodata: float = -9999.0,
) -> NDArray[np.float64]:
    """Inverse Distance Weighting interpolation onto a regular grid.

    For each grid cell centre, the ``k`` nearest source points are found via
    KDTree and their Z values are combined using inverse-distance weights
    raised to ``power``.

    Parameters
    ----------
    xy : NDArray[np.float64]
        Source point XY coordinates, shape (N, 2).
    z : NDArray[np.float64]
        Source point Z values, shape (N,).
    grid_x : NDArray[np.float64]
        Grid cell centre X coordinates, shape (nx,).
    grid_y : NDArray[np.float64]
        Grid cell centre Y coordinates, shape (ny,).
    power : float, optional
        Distance weighting exponent, by default 2.0.
    max_dist : float or None, optional
        Maximum search distance. Grid cells with no neighbours within this
        radius are set to ``nodata``. ``None`` disables the distance limit.
    k : int, optional
        Number of nearest neighbours to use, by default 12.
    nodata : float, optional
        Value assigned to cells with no data, by default -9999.0.

    Returns
    -------
    NDArray[np.float64]
        Interpolated grid of shape (ny, nx).

    Raises
    ------
    OcculusRasterError
        If inputs are invalid or interpolation fails.
    """
    try:
        from scipy.spatial import KDTree  # type: ignore[import-untyped]
    except ImportError as exc:
        raise ImportError("scipy is required for IDW interpolation: pip install scipy") from exc

    if xy.ndim != 2 or xy.shape[1] != 2:
        raise OcculusRasterError(f"xy must be (N, 2), got shape {xy.shape}")
    if z.ndim != 1 or z.shape[0] != xy.shape[0]:
        raise OcculusRasterError(
            f"z must be (N,) matching xy rows, got z.shape={z.shape}, xy.shape={xy.shape}"
        )
    if len(xy) == 0:
        raise OcculusRasterError("Cannot interpolate from zero source points")
    if power <= 0:
        raise OcculusRasterError(f"power must be positive, got {power}")

    k_actual = min(k, len(xy))

    # Build KDTree from source points
    tree = KDTree(xy)

    # Create meshgrid of cell centres
    gx, gy = np.meshgrid(grid_x, grid_y)
    query_pts = np.column_stack((gx.ravel(), gy.ravel()))

    # Query k nearest neighbours
    distances, indices = tree.query(query_pts, k=k_actual, workers=-1)

    # Ensure 2D arrays even when k_actual == 1
    if distances.ndim == 1:
        distances = distances[:, np.newaxis]
        indices = indices[:, np.newaxis]

    result = np.full(len(query_pts), nodata, dtype=np.float64)

    # Handle exact matches (distance == 0) — use the coincident point's Z
    exact_mask = distances[:, 0] == 0.0
    if exact_mask.any():
        result[exact_mask] = z[indices[exact_mask, 0]]

    # IDW for non-exact points
    interp_mask = ~exact_mask
    if interp_mask.any():
        d = distances[interp_mask]
        idx = indices[interp_mask]
        z_vals = z[idx]

        # Apply max_dist filter
        if max_dist is not None:
            valid = d <= max_dist
            # If no neighbours within max_dist, cell stays nodata
            has_valid = valid.any(axis=1)
            # For cells with some valid neighbours, zero out invalid ones
            d_masked = np.where(valid, d, np.inf)
        else:
            has_valid = np.ones(d.shape[0], dtype=bool)
            d_masked = d

        # Compute weights: 1 / d^power
        with np.errstate(divide="ignore"):
            weights = 1.0 / np.power(d_masked, power)
        weights = np.where(np.isinf(weights), 0.0, weights)

        w_sum = weights.sum(axis=1)
        safe = (w_sum > 0) & has_valid
        weighted_z = (weights * z_vals).sum(axis=1)

        interp_indices = np.where(interp_mask)[0]
        result[interp_indices[safe]] = weighted_z[safe] / w_sum[safe]

    grid = result.reshape(len(grid_y), len(grid_x))

    n_filled = int((grid != nodata).sum())
    logger.debug(
        "idw_interpolate: %d/%d cells filled (k=%d, power=%.1f)",
        n_filled,
        grid.size,
        k_actual,
        power,
    )
    return grid

nearest_interpolate(xy, z, grid_x, grid_y, *, max_dist=None, nodata=-9999.0)

Nearest-neighbour interpolation onto a regular grid.

Each grid cell centre is assigned the Z value of the closest source point.

Parameters:

Name Type Description Default
xy NDArray[float64]

Source point XY coordinates, shape (N, 2).

required
z NDArray[float64]

Source point Z values, shape (N,).

required
grid_x NDArray[float64]

Grid cell centre X coordinates, shape (nx,).

required
grid_y NDArray[float64]

Grid cell centre Y coordinates, shape (ny,).

required
max_dist float or None

Maximum distance to accept a neighbour. Cells with no point within this radius are set to nodata. None disables the limit.

None
nodata float

Value assigned to cells with no data, by default -9999.0.

-9999.0

Returns:

Type Description
NDArray[float64]

Interpolated grid of shape (ny, nx).

Raises:

Type Description
OcculusRasterError

If inputs are invalid or interpolation fails.

Source code in src/occulus/raster/interpolation.py
def nearest_interpolate(
    xy: NDArray[np.float64],
    z: NDArray[np.float64],
    grid_x: NDArray[np.float64],
    grid_y: NDArray[np.float64],
    *,
    max_dist: float | None = None,
    nodata: float = -9999.0,
) -> NDArray[np.float64]:
    """Nearest-neighbour interpolation onto a regular grid.

    Each grid cell centre is assigned the Z value of the closest source point.

    Parameters
    ----------
    xy : NDArray[np.float64]
        Source point XY coordinates, shape (N, 2).
    z : NDArray[np.float64]
        Source point Z values, shape (N,).
    grid_x : NDArray[np.float64]
        Grid cell centre X coordinates, shape (nx,).
    grid_y : NDArray[np.float64]
        Grid cell centre Y coordinates, shape (ny,).
    max_dist : float or None, optional
        Maximum distance to accept a neighbour. Cells with no point within
        this radius are set to ``nodata``. ``None`` disables the limit.
    nodata : float, optional
        Value assigned to cells with no data, by default -9999.0.

    Returns
    -------
    NDArray[np.float64]
        Interpolated grid of shape (ny, nx).

    Raises
    ------
    OcculusRasterError
        If inputs are invalid or interpolation fails.
    """
    try:
        from scipy.spatial import KDTree  # type: ignore[import-untyped]
    except ImportError as exc:
        raise ImportError(
            "scipy is required for nearest-neighbour interpolation: pip install scipy"
        ) from exc

    if xy.ndim != 2 or xy.shape[1] != 2:
        raise OcculusRasterError(f"xy must be (N, 2), got shape {xy.shape}")
    if z.ndim != 1 or z.shape[0] != xy.shape[0]:
        raise OcculusRasterError(
            f"z must be (N,) matching xy rows, got z.shape={z.shape}, xy.shape={xy.shape}"
        )
    if len(xy) == 0:
        raise OcculusRasterError("Cannot interpolate from zero source points")

    tree = KDTree(xy)

    gx, gy = np.meshgrid(grid_x, grid_y)
    query_pts = np.column_stack((gx.ravel(), gy.ravel()))

    distances, indices = tree.query(query_pts, k=1, workers=-1)

    result = z[indices].copy()

    if max_dist is not None:
        result[distances > max_dist] = nodata

    grid = result.reshape(len(grid_y), len(grid_x))

    n_filled = int((grid != nodata).sum())
    logger.debug(
        "nearest_interpolate: %d/%d cells filled",
        n_filled,
        grid.size,
    )
    return grid