Skip to content

segmentation

occulus.segmentation

Point cloud segmentation — ground classification and object extraction.

Primary interface::

from occulus.segmentation import (
    classify_ground_csf,
    classify_ground_pmf,
    cluster_dbscan,
    detect_powerlines,
    segment_trees,
    PowerlineResult,
    SegmentationResult,
)
Ground classification
  • :func:classify_ground_csf — Cloth Simulation Filter (aerial/UAV preferred)
  • :func:classify_ground_pmf — Progressive Morphological Filter
Object segmentation
  • :func:cluster_dbscan — density-based clustering (general purpose)
  • :func:segment_trees — CHM-based individual tree delineation (aerial/UAV)
Infrastructure detection
  • :func:detect_powerlines — wire and pylon extraction from classified clouds

PowerlineResult dataclass

Result of powerline detection.

Attributes:

Name Type Description
wire_mask NDArray[bool_]

Per-point boolean mask identifying wire points.

pylon_mask NDArray[bool_]

Per-point boolean mask identifying pylon/tower points.

wire_segments list[CatenarySegment]

List of detected wire segments with optional catenary fits.

pylon_positions NDArray[float64]

Centroid positions of detected pylons as (M, 3) array.

clearance_violations list[ClearanceViolation]

Locations where wires are below the minimum clearance threshold.

Source code in src/occulus/segmentation/powerlines.py
@dataclass
class PowerlineResult:
    """Result of powerline detection.

    Attributes
    ----------
    wire_mask : NDArray[np.bool_]
        Per-point boolean mask identifying wire points.
    pylon_mask : NDArray[np.bool_]
        Per-point boolean mask identifying pylon/tower points.
    wire_segments : list[CatenarySegment]
        List of detected wire segments with optional catenary fits.
    pylon_positions : NDArray[np.float64]
        Centroid positions of detected pylons as (M, 3) array.
    clearance_violations : list[ClearanceViolation]
        Locations where wires are below the minimum clearance threshold.
    """

    wire_mask: NDArray[np.bool_]
    pylon_mask: NDArray[np.bool_]
    wire_segments: list[CatenarySegment] = field(default_factory=list)
    pylon_positions: NDArray[np.float64] = field(
        default_factory=lambda: np.empty((0, 3), dtype=np.float64)
    )
    clearance_violations: list[ClearanceViolation] = field(default_factory=list)

SegmentationResult dataclass

Result of object segmentation.

Attributes:

Name Type Description
labels NDArray[int32]

Per-point segment labels. -1 = noise / unassigned.

n_segments int

Number of unique segments (excluding noise label -1).

segment_sizes dict[int, int]

Mapping of label → point count (noise label excluded).

Source code in src/occulus/segmentation/objects.py
@dataclass
class SegmentationResult:
    """Result of object segmentation.

    Attributes
    ----------
    labels : NDArray[np.int32]
        Per-point segment labels. ``-1`` = noise / unassigned.
    n_segments : int
        Number of unique segments (excluding noise label -1).
    segment_sizes : dict[int, int]
        Mapping of label → point count (noise label excluded).
    """

    labels: NDArray[np.int32]
    n_segments: int
    segment_sizes: dict[int, int] = field(default_factory=dict)

classify_ground_csf(cloud, *, cloth_resolution=None, rigidness=3, iterations=500, class_threshold=None)

Classify ground points using the Cloth Simulation Filter (CSF).

The cloud is inverted along Z, a regular cloth grid is draped over it under gravity, and the final cloth height is compared to the cloud to assign ground labels (ASPRS class 2).

Parameters:

Name Type Description Default
cloud PointCloud

Input point cloud. Must have at least 10 points.

required
cloth_resolution float | None

Spacing between cloth grid nodes in coordinate units. Smaller values produce finer results at higher computational cost. Defaults to a platform-aware value (2.0 for aerial, 0.5 for terrestrial, 1.5 for UAV).

None
rigidness int

Cloth rigidness: 1 (mountain), 2 (complex terrain), 3 (flat terrain), by default 3. Higher values pull the cloth tighter to the surface.

3
iterations int

Maximum cloth simulation iterations, by default 500.

500
class_threshold float | None

Maximum distance between a point and the cloth surface for the point to be classified as ground. Defaults to cloth_resolution / 2.

None

Returns:

Type Description
PointCloud

Copy of the input cloud with classification array set. Ground points have class 2; non-ground points retain their original classification (or 1 = unassigned if none was present).

Raises:

Type Description
OcculusSegmentationError

If the cloud has fewer than 10 points or the simulation produces no ground points.

Source code in src/occulus/segmentation/ground.py
def classify_ground_csf(
    cloud: PointCloud,
    *,
    cloth_resolution: float | None = None,
    rigidness: int = 3,
    iterations: int = 500,
    class_threshold: float | None = None,
) -> PointCloud:
    """Classify ground points using the Cloth Simulation Filter (CSF).

    The cloud is inverted along Z, a regular cloth grid is draped over it
    under gravity, and the final cloth height is compared to the cloud to
    assign ground labels (ASPRS class 2).

    Parameters
    ----------
    cloud : PointCloud
        Input point cloud. Must have at least 10 points.
    cloth_resolution : float | None, optional
        Spacing between cloth grid nodes in coordinate units. Smaller values
        produce finer results at higher computational cost. Defaults to a
        platform-aware value (2.0 for aerial, 0.5 for terrestrial, 1.5 for UAV).
    rigidness : int, optional
        Cloth rigidness: 1 (mountain), 2 (complex terrain), 3 (flat terrain),
        by default 3. Higher values pull the cloth tighter to the surface.
    iterations : int, optional
        Maximum cloth simulation iterations, by default 500.
    class_threshold : float | None, optional
        Maximum distance between a point and the cloth surface for the point
        to be classified as ground. Defaults to ``cloth_resolution / 2``.

    Returns
    -------
    PointCloud
        Copy of the input cloud with ``classification`` array set.
        Ground points have class 2; non-ground points retain their original
        classification (or 1 = unassigned if none was present).

    Raises
    ------
    OcculusSegmentationError
        If the cloud has fewer than 10 points or the simulation produces no
        ground points.
    """
    if cloud.n_points < 10:
        raise OcculusSegmentationError(f"CSF requires at least 10 points, got {cloud.n_points}")

    # Platform-aware defaults
    if cloth_resolution is None:
        cloth_resolution = _default_cloth_resolution(cloud.platform)
    if class_threshold is None:
        class_threshold = cloth_resolution / 2.0

    logger.debug(
        "classify_ground_csf: n=%d res=%.3f rigidness=%d iters=%d",
        cloud.n_points,
        cloth_resolution,
        rigidness,
        iterations,
    )

    xyz = cloud.xyz
    # --- Step 1: Invert Z (CSF works on upside-down cloud) ---
    z_max = xyz[:, 2].max()
    inv_xyz = xyz.copy()
    inv_xyz[:, 2] = z_max - xyz[:, 2]

    # --- Step 2: Build cloth grid ---
    x_min, y_min = inv_xyz[:, 0].min(), inv_xyz[:, 1].min()
    x_max, y_max = inv_xyz[:, 0].max(), inv_xyz[:, 1].max()

    nx = max(2, int(np.ceil((x_max - x_min) / cloth_resolution)) + 1)
    ny = max(2, int(np.ceil((y_max - y_min) / cloth_resolution)) + 1)

    xs = np.linspace(x_min, x_max, nx)
    ys = np.linspace(y_min, y_max, ny)
    grid_x, grid_y = np.meshgrid(xs, ys)  # (ny, nx)
    cloth_z = np.full_like(grid_x, fill_value=np.inf)  # start above all points

    # --- Step 3: Project cloud onto grid (lowest Z per cell) ---
    tree_2d = KDTree(inv_xyz[:, :2])
    cloth_pts = np.column_stack((grid_x.ravel(), grid_y.ravel()))
    _, nn_idx = tree_2d.query(cloth_pts, k=1, workers=-1)
    nn_idx = nn_idx.ravel()
    projected_z = inv_xyz[nn_idx, 2].reshape(ny, nx)
    cloth_z = projected_z.copy()

    # --- Step 4: Cloth simulation ---
    damping = {1: 0.75, 2: 0.5, 3: 0.3}.get(rigidness, 0.5)

    for _ in range(iterations):
        z_prev = cloth_z.copy()

        # Gravity: move cloth downward toward projected terrain
        gravity_force = projected_z - cloth_z
        cloth_z += gravity_force * 0.1

        # Internal spring forces (smooth cloth between neighbours)
        # Average of 4 cardinal neighbours
        z_padded = np.pad(cloth_z, 1, mode="edge")
        z_avg = (
            (
                z_padded[:-2, 1:-1]  # up
                + z_padded[2:, 1:-1]  # down
                + z_padded[1:-1, :-2]  # left
                + z_padded[1:-1, 2:]  # right
            )
            / 4.0
        )
        cloth_z += (z_avg - cloth_z) * (1.0 - damping)

        # Hard constraint: cloth cannot go below terrain
        cloth_z = np.maximum(cloth_z, projected_z)

        # Convergence check
        if np.max(np.abs(cloth_z - z_prev)) < 1e-4:
            break

    # --- Step 5: Classify points ---
    # For each point find nearest cloth node and compare Z distance
    cloth_xy = np.column_stack((grid_x.ravel(), grid_y.ravel()))
    cloth_z_flat = cloth_z.ravel()

    cloth_tree = KDTree(cloth_xy)
    _, cloth_nn = cloth_tree.query(inv_xyz[:, :2], k=1, workers=-1)
    cloth_nn = cloth_nn.ravel()

    # Distance in original (non-inverted) Z
    cloth_heights = cloth_z_flat[cloth_nn]  # cloth Z in inverted space
    # Convert cloth Z back to original space: cloth_Z_orig = z_max - cloth_Z_inv
    cloth_z_original = z_max - cloth_heights
    z_diff = np.abs(xyz[:, 2] - cloth_z_original)
    ground_mask = z_diff <= class_threshold

    n_ground = int(ground_mask.sum())
    if n_ground == 0:
        raise OcculusSegmentationError(
            "CSF produced no ground points. Try increasing cloth_resolution or class_threshold."
        )

    # Build output classification array
    base_class = (
        cloud.classification.copy()
        if cloud.classification is not None
        else np.ones(cloud.n_points, dtype=np.uint8)
    )
    classification = base_class.astype(np.uint8)
    classification[ground_mask] = 2

    logger.info(
        "classify_ground_csf: %d ground / %d non-ground points",
        n_ground,
        cloud.n_points - n_ground,
    )

    return _copy_with_classification(cloud, classification)

classify_ground_pmf(cloud, *, cell_size=1.0, slope=0.3, initial_distance=0.15, max_distance=2.5, max_window_size=20.0)

Classify ground points using the Progressive Morphological Filter (PMF).

Creates a series of morphological opening operations with progressively growing window sizes. Points that are too far above the morphologically opened surface are classified as non-ground.

Parameters:

Name Type Description Default
cloud PointCloud

Input point cloud. Must have at least 10 points.

required
cell_size float

Grid cell size for the terrain surface, by default 1.0.

1.0
slope float

Slope tolerance in metres per metre (e.g. 0.3 = 30% grade), by default 0.3.

0.3
initial_distance float

Initial maximum distance threshold in metres, by default 0.15.

0.15
max_distance float

Maximum allowed height difference above the filtered surface, by default 2.5.

2.5
max_window_size float

Maximum morphological window radius in metres, by default 20.0.

20.0

Returns:

Type Description
PointCloud

Copy of the input cloud with classification array set. Ground points have class 2.

Raises:

Type Description
OcculusSegmentationError

If the cloud has fewer than 10 points or no ground points are found.

Source code in src/occulus/segmentation/ground.py
def classify_ground_pmf(
    cloud: PointCloud,
    *,
    cell_size: float = 1.0,
    slope: float = 0.3,
    initial_distance: float = 0.15,
    max_distance: float = 2.5,
    max_window_size: float = 20.0,
) -> PointCloud:
    """Classify ground points using the Progressive Morphological Filter (PMF).

    Creates a series of morphological opening operations with progressively
    growing window sizes. Points that are too far above the morphologically
    opened surface are classified as non-ground.

    Parameters
    ----------
    cloud : PointCloud
        Input point cloud. Must have at least 10 points.
    cell_size : float, optional
        Grid cell size for the terrain surface, by default 1.0.
    slope : float, optional
        Slope tolerance in metres per metre (e.g. 0.3 = 30% grade), by default 0.3.
    initial_distance : float, optional
        Initial maximum distance threshold in metres, by default 0.15.
    max_distance : float, optional
        Maximum allowed height difference above the filtered surface,
        by default 2.5.
    max_window_size : float, optional
        Maximum morphological window radius in metres, by default 20.0.

    Returns
    -------
    PointCloud
        Copy of the input cloud with ``classification`` array set.
        Ground points have class 2.

    Raises
    ------
    OcculusSegmentationError
        If the cloud has fewer than 10 points or no ground points are found.
    """
    if cloud.n_points < 10:
        raise OcculusSegmentationError(f"PMF requires at least 10 points, got {cloud.n_points}")

    xyz = cloud.xyz

    # Build 2D minimum elevation grid
    x_min, y_min = xyz[:, 0].min(), xyz[:, 1].min()
    x_max, y_max = xyz[:, 0].max(), xyz[:, 1].max()

    nx = max(2, int(np.ceil((x_max - x_min) / cell_size)) + 1)
    ny = max(2, int(np.ceil((y_max - y_min) / cell_size)) + 1)

    # Assign each point to a grid cell; keep minimum Z per cell
    col_idx = np.clip(((xyz[:, 0] - x_min) / cell_size).astype(int), 0, nx - 1)
    row_idx = np.clip(((xyz[:, 1] - y_min) / cell_size).astype(int), 0, ny - 1)
    cell_flat = row_idx * nx + col_idx

    z_min_grid = np.full(ny * nx, np.inf)
    for i in range(cloud.n_points):
        z_min_grid[cell_flat[i]] = min(z_min_grid[cell_flat[i]], xyz[i, 2])

    # Replace inf cells with nearest neighbour Z
    valid = np.isfinite(z_min_grid)
    if valid.sum() < 3:
        raise OcculusSegmentationError("PMF: insufficient valid cells in grid")

    cells = np.column_stack(
        (
            np.arange(ny * nx) % nx,
            np.arange(ny * nx) // nx,
        )
    )
    tree_grid = KDTree(cells[valid])
    _, nn_grid = tree_grid.query(cells[~valid], k=1, workers=-1)
    z_min_grid[~valid] = z_min_grid[valid][nn_grid.ravel()]

    z_surface = z_min_grid.reshape(ny, nx)

    # Progressively apply morphological opening
    ground_mask = np.ones(cloud.n_points, dtype=bool)
    w = 1
    z_surface.copy()

    while w * cell_size <= max_window_size:
        # Morphological erosion then dilation (= opening) with window w
        from scipy.ndimage import maximum_filter, minimum_filter  # type: ignore[import-untyped]

        eroded = minimum_filter(z_surface, size=2 * w + 1)
        opened = maximum_filter(eroded, size=2 * w + 1)

        # Distance threshold grows with window size
        dh = initial_distance + slope * (w * cell_size)
        dh = min(dh, max_distance)

        # A point is non-ground if it exceeds opened surface by more than dh
        z_surface.ravel()[cell_flat]
        opened_pt = opened.ravel()[cell_flat]
        ground_mask &= (xyz[:, 2] - opened_pt) <= dh

        z_surface = opened
        w = min(w * 2, w + 1)  # exponential-ish growth

    n_ground = int(ground_mask.sum())
    if n_ground == 0:
        raise OcculusSegmentationError(
            "PMF produced no ground points. Try adjusting cell_size, slope, or max_distance."
        )

    base_class = (
        cloud.classification.copy()
        if cloud.classification is not None
        else np.ones(cloud.n_points, dtype=np.uint8)
    )
    classification = base_class.astype(np.uint8)
    classification[ground_mask] = 2

    logger.info(
        "classify_ground_pmf: %d ground / %d non-ground points",
        n_ground,
        cloud.n_points - n_ground,
    )

    return _copy_with_classification(cloud, classification)

cluster_dbscan(cloud, eps, min_samples=10, *, use_2d=False)

Cluster a point cloud using DBSCAN.

Density-Based Spatial Clustering of Applications with Noise (DBSCAN) groups points that are closely packed together and marks outliers as noise (label -1). No prior knowledge of the number of clusters is needed.

Parameters:

Name Type Description Default
cloud PointCloud

Input point cloud.

required
eps float

Neighbourhood radius: maximum distance between two points for them to be considered neighbours. In the same units as the cloud coordinates.

required
min_samples int

Minimum number of points in a neighbourhood for a core point, by default 10.

10
use_2d bool

If True, clustering is performed in the XY plane only (useful for separating vertical objects like trees above ground), by default False.

False

Returns:

Type Description
SegmentationResult

Per-point labels and cluster statistics.

Raises:

Type Description
OcculusSegmentationError

If eps or min_samples are invalid, or the cloud is empty.

Source code in src/occulus/segmentation/objects.py
def cluster_dbscan(
    cloud: PointCloud,
    eps: float,
    min_samples: int = 10,
    *,
    use_2d: bool = False,
) -> SegmentationResult:
    """Cluster a point cloud using DBSCAN.

    Density-Based Spatial Clustering of Applications with Noise (DBSCAN)
    groups points that are closely packed together and marks outliers as
    noise (label -1). No prior knowledge of the number of clusters is needed.

    Parameters
    ----------
    cloud : PointCloud
        Input point cloud.
    eps : float
        Neighbourhood radius: maximum distance between two points for them
        to be considered neighbours. In the same units as the cloud coordinates.
    min_samples : int, optional
        Minimum number of points in a neighbourhood for a core point,
        by default 10.
    use_2d : bool, optional
        If ``True``, clustering is performed in the XY plane only (useful
        for separating vertical objects like trees above ground), by default
        ``False``.

    Returns
    -------
    SegmentationResult
        Per-point labels and cluster statistics.

    Raises
    ------
    OcculusSegmentationError
        If eps or min_samples are invalid, or the cloud is empty.
    """
    if eps <= 0:
        raise OcculusSegmentationError(f"eps must be positive, got {eps}")
    if min_samples <= 0:
        raise OcculusSegmentationError(f"min_samples must be positive, got {min_samples}")
    if cloud.n_points == 0:
        raise OcculusSegmentationError("Cannot cluster an empty cloud")

    pts = cloud.xyz[:, :2] if use_2d else cloud.xyz
    labels = _dbscan(pts, eps=eps, min_samples=min_samples)

    unique_labels = set(labels) - {-1}
    n_segments = len(unique_labels)
    segment_sizes = {int(lbl): int((labels == lbl).sum()) for lbl in unique_labels}

    logger.info(
        "cluster_dbscan: %d clusters, %d noise points (eps=%.3f, min_samples=%d)",
        n_segments,
        int((labels == -1).sum()),
        eps,
        min_samples,
    )

    return SegmentationResult(
        labels=labels.astype(np.int32),
        n_segments=n_segments,
        segment_sizes=segment_sizes,
    )

detect_powerlines(cloud, *, min_height_above_ground=3.0, max_height_above_ground=50.0, linearity_threshold=0.7, ground_class=2, catenary_fit=True, min_clearance=None, strict=True, min_wire_span=50.0, max_pylon_xy_extent=5.0, min_pylon_z_extent=8.0, max_wire_height_std=3.0, pylon_association_radius=10.0)

Detect powerline wires and pylons in a classified point cloud.

The algorithm:

  1. Separate ground (ground_class) from non-ground using the cloud's classification attribute.
  2. Interpolate a ground surface and compute height-above-ground (HAG) for every non-ground point.
  3. Filter candidates by the HAG height band.
  4. Compute per-point geometric features (linearity, planarity, verticality) via PCA on local KDTree neighbourhoods.
  5. Classify wire candidates (high linearity) and pylon candidates (high verticality, low linearity, spatially clustered).
  6. Cluster wire candidates with DBSCAN into individual wire segments.
  7. Optionally fit catenary curves to each wire segment.
  8. Optionally flag clearance violations where wires are below min_clearance.

When strict=True (the default), additional false-positive reduction filters are applied:

  • Wire segment length: DBSCAN clusters whose 3D extent is shorter than min_wire_span are rejected (building edges, fences).
  • Pylon geometry: Pylon clusters whose XY bounding box exceeds max_pylon_xy_extent or whose Z extent is less than min_pylon_z_extent are rejected (buildings, tree crowns).
  • Height band consistency: Wire clusters whose height standard deviation exceeds max_wire_height_std are rejected (tree crowns with scattered points).
  • Wire–pylon association: Wire segments whose endpoints are not within pylon_association_radius of a detected pylon are downgraded (removed from the confirmed wire mask but retained as segments with rmse=inf).

Parameters:

Name Type Description Default
cloud PointCloud

Input point cloud. Must have a classification array with at least some points labelled as ground_class.

required
min_height_above_ground float

Minimum height above the interpolated ground surface for a point to be considered a powerline candidate, by default 3.0.

3.0
max_height_above_ground float

Maximum height above ground, by default 50.0.

50.0
linearity_threshold float

Minimum linearity value (0--1) for a point to be classified as a wire candidate, by default 0.7.

0.7
ground_class int

ASPRS classification code for ground points, by default 2.

2
catenary_fit bool

If True, fit a catenary curve to each wire segment, by default True.

True
min_clearance float | None

If provided, flag wire points whose HAG is below this value as clearance violations, by default None (no clearance check).

None
strict bool

If True, enable false-positive reduction filters for wire span length, pylon geometry, height consistency, and wire–pylon association. Set to False to use the original permissive behaviour, by default True.

True
min_wire_span float

Minimum horizontal extent (metres) for a wire DBSCAN cluster to be accepted. Only used when strict=True, by default 50.0.

50.0
max_pylon_xy_extent float

Maximum XY bounding box size (metres) for a pylon cluster. Clusters wider than this are rejected as buildings. Only used when strict=True, by default 5.0.

5.0
min_pylon_z_extent float

Minimum Z extent (metres) for a pylon cluster. Clusters shorter than this are rejected. Only used when strict=True, by default 8.0.

8.0
max_wire_height_std float

Maximum standard deviation (metres) of Z values within a wire cluster. Clusters exceeding this are rejected. Only used when strict=True, by default 3.0.

3.0
pylon_association_radius float

Maximum distance (metres) from a wire segment endpoint to the nearest pylon centroid for the segment to be considered connected. Only used when strict=True, by default 10.0.

10.0

Returns:

Type Description
PowerlineResult

Detection results including wire/pylon masks, segments, pylon positions, and optional clearance violations.

Raises:

Type Description
OcculusSegmentationError

If the cloud has no classification array, too few ground points for interpolation, or no powerline candidates are found.

Source code in src/occulus/segmentation/powerlines.py
def detect_powerlines(
    cloud: PointCloud,
    *,
    min_height_above_ground: float = 3.0,
    max_height_above_ground: float = 50.0,
    linearity_threshold: float = 0.7,
    ground_class: int = 2,
    catenary_fit: bool = True,
    min_clearance: float | None = None,
    strict: bool = True,
    min_wire_span: float = 50.0,
    max_pylon_xy_extent: float = 5.0,
    min_pylon_z_extent: float = 8.0,
    max_wire_height_std: float = 3.0,
    pylon_association_radius: float = 10.0,
) -> PowerlineResult:
    """Detect powerline wires and pylons in a classified point cloud.

    The algorithm:

    1. Separate ground (``ground_class``) from non-ground using the
       cloud's ``classification`` attribute.
    2. Interpolate a ground surface and compute height-above-ground (HAG)
       for every non-ground point.
    3. Filter candidates by the HAG height band.
    4. Compute per-point geometric features (linearity, planarity,
       verticality) via PCA on local KDTree neighbourhoods.
    5. Classify wire candidates (high linearity) and pylon candidates
       (high verticality, low linearity, spatially clustered).
    6. Cluster wire candidates with DBSCAN into individual wire segments.
    7. Optionally fit catenary curves to each wire segment.
    8. Optionally flag clearance violations where wires are below
       ``min_clearance``.

    When ``strict=True`` (the default), additional false-positive
    reduction filters are applied:

    - **Wire segment length**: DBSCAN clusters whose 3D extent is
      shorter than ``min_wire_span`` are rejected (building edges,
      fences).
    - **Pylon geometry**: Pylon clusters whose XY bounding box exceeds
      ``max_pylon_xy_extent`` or whose Z extent is less than
      ``min_pylon_z_extent`` are rejected (buildings, tree crowns).
    - **Height band consistency**: Wire clusters whose height standard
      deviation exceeds ``max_wire_height_std`` are rejected (tree
      crowns with scattered points).
    - **Wire–pylon association**: Wire segments whose endpoints are
      not within ``pylon_association_radius`` of a detected pylon are
      downgraded (removed from the confirmed wire mask but retained
      as segments with ``rmse=inf``).

    Parameters
    ----------
    cloud : PointCloud
        Input point cloud. Must have a ``classification`` array with at
        least some points labelled as ``ground_class``.
    min_height_above_ground : float, optional
        Minimum height above the interpolated ground surface for a point
        to be considered a powerline candidate, by default 3.0.
    max_height_above_ground : float, optional
        Maximum height above ground, by default 50.0.
    linearity_threshold : float, optional
        Minimum linearity value (0--1) for a point to be classified as a
        wire candidate, by default 0.7.
    ground_class : int, optional
        ASPRS classification code for ground points, by default 2.
    catenary_fit : bool, optional
        If ``True``, fit a catenary curve to each wire segment,
        by default ``True``.
    min_clearance : float | None, optional
        If provided, flag wire points whose HAG is below this value as
        clearance violations, by default ``None`` (no clearance check).
    strict : bool, optional
        If ``True``, enable false-positive reduction filters for wire
        span length, pylon geometry, height consistency, and wire–pylon
        association.  Set to ``False`` to use the original permissive
        behaviour, by default ``True``.
    min_wire_span : float, optional
        Minimum horizontal extent (metres) for a wire DBSCAN cluster to
        be accepted.  Only used when ``strict=True``, by default 50.0.
    max_pylon_xy_extent : float, optional
        Maximum XY bounding box size (metres) for a pylon cluster.
        Clusters wider than this are rejected as buildings.  Only used
        when ``strict=True``, by default 5.0.
    min_pylon_z_extent : float, optional
        Minimum Z extent (metres) for a pylon cluster.  Clusters shorter
        than this are rejected.  Only used when ``strict=True``,
        by default 8.0.
    max_wire_height_std : float, optional
        Maximum standard deviation (metres) of Z values within a wire
        cluster.  Clusters exceeding this are rejected.  Only used when
        ``strict=True``, by default 3.0.
    pylon_association_radius : float, optional
        Maximum distance (metres) from a wire segment endpoint to the
        nearest pylon centroid for the segment to be considered
        connected.  Only used when ``strict=True``, by default 10.0.

    Returns
    -------
    PowerlineResult
        Detection results including wire/pylon masks, segments, pylon
        positions, and optional clearance violations.

    Raises
    ------
    OcculusSegmentationError
        If the cloud has no classification array, too few ground points
        for interpolation, or no powerline candidates are found.
    """
    # --- Input validation ---------------------------------------------------
    if cloud.n_points == 0:
        raise OcculusSegmentationError("Cannot detect powerlines in an empty cloud")

    if cloud.classification is None:
        raise OcculusSegmentationError(
            "Cloud must have a classification array. "
            "Run classify_ground_csf() or classify_ground_pmf() first."
        )

    if min_height_above_ground < 0:
        raise OcculusSegmentationError(
            f"min_height_above_ground must be >= 0, got {min_height_above_ground}"
        )
    if max_height_above_ground <= min_height_above_ground:
        raise OcculusSegmentationError(
            f"max_height_above_ground ({max_height_above_ground}) must be greater "
            f"than min_height_above_ground ({min_height_above_ground})"
        )
    if not 0.0 < linearity_threshold <= 1.0:
        raise OcculusSegmentationError(
            f"linearity_threshold must be in (0, 1], got {linearity_threshold}"
        )

    # --- Step 1: Separate ground / non-ground --------------------------------
    ground_mask = cloud.classification == ground_class
    n_ground = int(ground_mask.sum())

    if n_ground < 3:
        raise OcculusSegmentationError(
            f"Need at least 3 ground points for surface interpolation, "
            f"found {n_ground} with class={ground_class}"
        )

    non_ground_mask = ~ground_mask
    n_non_ground = int(non_ground_mask.sum())

    if n_non_ground == 0:
        raise OcculusSegmentationError(
            "All points are classified as ground; no candidates for powerline detection"
        )

    logger.debug(
        "Ground separation: %d ground, %d non-ground points",
        n_ground,
        n_non_ground,
    )

    # --- Step 2: Compute height above ground ---------------------------------
    ground_xyz = cloud.xyz[ground_mask]
    hag = _compute_height_above_ground(cloud.xyz, ground_xyz)

    # --- Step 3: Filter by height band ---------------------------------------
    non_ground_indices = np.where(non_ground_mask)[0]
    height_band = (hag[non_ground_indices] >= min_height_above_ground) & (
        hag[non_ground_indices] <= max_height_above_ground
    )
    candidate_indices = non_ground_indices[height_band]

    if len(candidate_indices) == 0:
        raise OcculusSegmentationError(
            f"No points found in height band [{min_height_above_ground}, "
            f"{max_height_above_ground}] above ground"
        )

    logger.debug(
        "Height filter: %d candidates in [%.1f, %.1f] m above ground",
        len(candidate_indices),
        min_height_above_ground,
        max_height_above_ground,
    )

    # --- Step 4: Compute geometric features ----------------------------------
    candidate_xyz = cloud.xyz[candidate_indices]
    linearity, planarity, verticality = _compute_geometric_features(
        candidate_xyz, k=min(20, len(candidate_indices))
    )

    # --- Step 5: Wire and pylon classification -------------------------------
    wire_local = linearity >= linearity_threshold
    pylon_local = (verticality >= 0.6) & (linearity < linearity_threshold * 0.5)

    wire_indices = candidate_indices[wire_local]
    pylon_candidate_indices = candidate_indices[pylon_local]

    logger.debug(
        "Feature classification: %d wire candidates, %d pylon candidates",
        len(wire_indices),
        len(pylon_candidate_indices),
    )

    # --- Step 6: Cluster wires with DBSCAN -----------------------------------
    wire_segments: list[CatenarySegment] = []
    wire_mask = np.zeros(cloud.n_points, dtype=bool)

    if len(wire_indices) > 0:
        wire_labels = _dbscan_cluster(cloud.xyz[wire_indices], eps=3.0, min_samples=5)

        unique_labels = set(wire_labels) - {-1}
        n_rejected_span = 0
        n_rejected_height = 0
        for lbl in sorted(unique_labels):
            seg_local = np.where(wire_labels == lbl)[0]
            seg_global = wire_indices[seg_local]
            seg_xyz = cloud.xyz[seg_global]

            # --- Strict filter: minimum wire span length ---
            if strict:
                xy_extent = seg_xyz[:, :2].max(axis=0) - seg_xyz[:, :2].min(axis=0)
                span = float(np.linalg.norm(xy_extent))
                if span < min_wire_span:
                    n_rejected_span += 1
                    continue

            # --- Strict filter: height band consistency ---
            if strict:
                z_std = float(seg_xyz[:, 2].std())
                if z_std > max_wire_height_std:
                    n_rejected_height += 1
                    continue

            wire_mask[seg_global] = True
            seg = CatenarySegment(
                indices=seg_global,
                a=0.0,
                x0=0.0,
                z0=0.0,
                rmse=float("inf"),
            )
            wire_segments.append(seg)

        if strict and (n_rejected_span > 0 or n_rejected_height > 0):
            logger.debug(
                "Strict wire filtering: rejected %d short segments, %d high-variance segments",
                n_rejected_span,
                n_rejected_height,
            )

        if not strict:
            # In permissive mode, mark all wire candidates (original behaviour)
            wire_mask[wire_indices] = True

        logger.info(
            "Wire clustering: %d wire segments from %d wire points",
            len(wire_segments),
            int(wire_mask.sum()),
        )

    # --- Step 6b: Cluster pylons ---------------------------------------------
    pylon_mask = np.zeros(cloud.n_points, dtype=bool)
    pylon_positions = np.empty((0, 3), dtype=np.float64)

    if len(pylon_candidate_indices) > 0:
        pylon_labels = _dbscan_cluster(cloud.xyz[pylon_candidate_indices], eps=5.0, min_samples=3)

        pylon_unique = set(pylon_labels) - {-1}
        if pylon_unique:
            centroids = []
            accepted_indices: list[NDArray[np.intp]] = []
            n_rejected_wide = 0
            n_rejected_short = 0
            for lbl in sorted(pylon_unique):
                seg_local = np.where(pylon_labels == lbl)[0]
                seg_global = pylon_candidate_indices[seg_local]
                seg_xyz = cloud.xyz[seg_global]

                if strict:
                    # Reject clusters with wide XY footprint (buildings)
                    xy_min = seg_xyz[:, :2].min(axis=0)
                    xy_max = seg_xyz[:, :2].max(axis=0)
                    xy_extent = xy_max - xy_min
                    if float(xy_extent.max()) > max_pylon_xy_extent:
                        n_rejected_wide += 1
                        continue

                    # Reject clusters that are not tall enough
                    z_extent = float(seg_xyz[:, 2].max() - seg_xyz[:, 2].min())
                    if z_extent < min_pylon_z_extent:
                        n_rejected_short += 1
                        continue

                centroid = seg_xyz.mean(axis=0)
                centroids.append(centroid)
                accepted_indices.append(seg_global)

            if centroids:
                pylon_positions = np.array(centroids, dtype=np.float64)
                for idx_arr in accepted_indices:
                    pylon_mask[idx_arr] = True

            if strict and (n_rejected_wide > 0 or n_rejected_short > 0):
                logger.debug(
                    "Strict pylon filtering: rejected %d wide clusters, %d short clusters",
                    n_rejected_wide,
                    n_rejected_short,
                )
        else:
            # No pylon clusters found — mark all candidates in permissive mode
            if not strict:
                pylon_mask[pylon_candidate_indices] = True

        if not strict:
            # In permissive mode, mark all pylon candidates (original behaviour)
            pylon_mask[pylon_candidate_indices] = True

        logger.info(
            "Pylon clustering: %d pylons detected",
            len(pylon_positions),
        )

    # --- Step 6c: Wire-pylon association (strict mode) -----------------------
    if strict and len(pylon_positions) >= 2 and wire_segments:
        wire_segments, wire_mask = _filter_orphan_wires(
            cloud.xyz,
            wire_segments,
            wire_mask,
            pylon_positions,
            pylon_association_radius,
        )

    # --- Step 7: Catenary fitting --------------------------------------------
    if catenary_fit and wire_segments:
        wire_segments = _fit_catenaries(cloud.xyz, wire_segments)

    # --- Step 8: Clearance analysis ------------------------------------------
    clearance_violations: list[ClearanceViolation] = []
    confirmed_wire_indices = np.where(wire_mask)[0]
    if min_clearance is not None and len(confirmed_wire_indices) > 0:
        for idx in confirmed_wire_indices:
            h = hag[idx]
            if h < min_clearance:
                clearance_violations.append(
                    ClearanceViolation(
                        point_index=int(idx),
                        height_above_ground=float(h),
                        min_clearance=min_clearance,
                    )
                )
        if clearance_violations:
            logger.warning(
                "Clearance analysis: %d violations (min_clearance=%.1f)",
                len(clearance_violations),
                min_clearance,
            )

    return PowerlineResult(
        wire_mask=wire_mask,
        pylon_mask=pylon_mask,
        wire_segments=wire_segments,
        pylon_positions=pylon_positions,
        clearance_violations=clearance_violations,
    )

segment_trees(cloud, *, resolution=1.0, min_height=2.0, min_crown_area=4.0, max_raster_size=5000)

Segment individual trees using a CHM-based watershed approach.

Builds a 2D Canopy Height Model (CHM) raster from the input cloud, detects local maxima as tree tops, then performs a marker-controlled watershed segmentation to delineate individual tree crowns.

This function is intended for aerial or UAV clouds. Terrestrial clouds will raise an error.

Parameters:

Name Type Description Default
cloud PointCloud

Input cloud. Should be non-ground points (vegetation only) for best results. Must be aerial or UAV platform.

required
resolution float

CHM raster resolution in coordinate units, by default 1.0.

1.0
min_height float

Minimum tree height above the minimum Z of the cloud, by default 2.0.

2.0
min_crown_area float

Minimum crown area in square coordinate units to retain a tree segment, by default 4.0.

4.0
max_raster_size int

Maximum allowed dimension (width or height) for the CHM raster. If the computed raster would exceed this in either dimension, the resolution is automatically coarsened to fit. By default 5000.

5000

Returns:

Type Description
SegmentationResult

Per-point tree labels. Label -1 = not assigned to any tree.

Raises:

Type Description
UnsupportedPlatformError

If the cloud platform is TERRESTRIAL.

OcculusSegmentationError

If no trees are detected.

Source code in src/occulus/segmentation/objects.py
def segment_trees(
    cloud: PointCloud,
    *,
    resolution: float = 1.0,
    min_height: float = 2.0,
    min_crown_area: float = 4.0,
    max_raster_size: int = 5000,
) -> SegmentationResult:
    """Segment individual trees using a CHM-based watershed approach.

    Builds a 2D Canopy Height Model (CHM) raster from the input cloud,
    detects local maxima as tree tops, then performs a marker-controlled
    watershed segmentation to delineate individual tree crowns.

    This function is intended for aerial or UAV clouds. Terrestrial clouds
    will raise an error.

    Parameters
    ----------
    cloud : PointCloud
        Input cloud. Should be non-ground points (vegetation only) for
        best results. Must be aerial or UAV platform.
    resolution : float, optional
        CHM raster resolution in coordinate units, by default 1.0.
    min_height : float, optional
        Minimum tree height above the minimum Z of the cloud, by default 2.0.
    min_crown_area : float, optional
        Minimum crown area in square coordinate units to retain a tree
        segment, by default 4.0.
    max_raster_size : int, optional
        Maximum allowed dimension (width or height) for the CHM raster.
        If the computed raster would exceed this in either dimension, the
        resolution is automatically coarsened to fit. By default 5000.

    Returns
    -------
    SegmentationResult
        Per-point tree labels. Label -1 = not assigned to any tree.

    Raises
    ------
    UnsupportedPlatformError
        If the cloud platform is ``TERRESTRIAL``.
    OcculusSegmentationError
        If no trees are detected.
    """
    if cloud.platform == Platform.TERRESTRIAL:
        raise UnsupportedPlatformError(
            "segment_trees is intended for aerial/UAV clouds, not terrestrial scans. "
            "Use cluster_dbscan() for TLS object segmentation."
        )
    if cloud.n_points == 0:
        raise OcculusSegmentationError("Cannot segment trees from an empty cloud")

    from scipy.ndimage import label as nd_label  # type: ignore[import-untyped]
    from scipy.ndimage import maximum_filter, watershed_ift

    xyz = cloud.xyz
    x_min, y_min = xyz[:, 0].min(), xyz[:, 1].min()
    x_max, y_max = xyz[:, 0].max(), xyz[:, 1].max()
    z_min = xyz[:, 2].min()

    nx = max(2, int(np.ceil((x_max - x_min) / resolution)) + 1)
    ny = max(2, int(np.ceil((y_max - y_min) / resolution)) + 1)

    # Auto-coarsen resolution if raster would exceed max_raster_size
    max_dim = max(nx, ny)
    if max_dim > max_raster_size:
        old_resolution = resolution
        resolution = resolution * (max_dim / max_raster_size)
        nx = max(2, int(np.ceil((x_max - x_min) / resolution)) + 1)
        ny = max(2, int(np.ceil((y_max - y_min) / resolution)) + 1)
        logger.warning(
            "CHM raster %dx%d exceeds max_raster_size=%d, coarsening resolution from %.4f to %.4f",
            max(2, int(np.ceil((x_max - x_min) / old_resolution)) + 1),
            max(2, int(np.ceil((y_max - y_min) / old_resolution)) + 1),
            max_raster_size,
            old_resolution,
            resolution,
        )

    col_idx = np.clip(((xyz[:, 0] - x_min) / resolution).astype(int), 0, nx - 1)
    row_idx = np.clip(((xyz[:, 1] - y_min) / resolution).astype(int), 0, ny - 1)

    # Build max-Z raster (CHM) — vectorised via np.maximum.at
    heights = xyz[:, 2] - z_min
    chm = np.zeros((ny, nx), dtype=np.float64)
    flat_idx = row_idx * nx + col_idx
    np.maximum.at(chm.ravel(), flat_idx, heights)

    # Detect local maxima (tree tops) via morphological maximum filter
    neighborhood = max(3, int(np.ceil(min_crown_area**0.5 / resolution)) | 1)
    local_max = maximum_filter(chm, size=neighborhood)
    treetop_mask = (chm == local_max) & (chm >= min_height)

    # Label connected components of tree tops
    treetop_labels, n_tops = nd_label(treetop_mask)

    if n_tops == 0:
        raise OcculusSegmentationError(
            f"No trees detected (min_height={min_height}). Try lowering min_height or resolution."
        )

    # Marker-controlled watershed via scipy watershed_ift.
    # watershed_ift expects a uint8/uint16 cost surface and int32 markers.
    # We invert the CHM so that high canopy = low cost (seeds expand from peaks).
    chm_max = chm.max()
    if chm_max > 0:
        cost = ((1.0 - chm / chm_max) * 65534).astype(np.uint16)
    else:
        cost = np.zeros_like(chm, dtype=np.uint16)
    labels_grid = watershed_ift(cost, treetop_labels.astype(np.int32))

    # Map each point to its tree label — vectorised
    height_mask = heights >= min_height
    grid_labels = labels_grid[row_idx, col_idx] - 1  # 0 in grid → -1 (unlabelled)
    pt_labels = np.where(height_mask, grid_labels, np.int32(-1)).astype(np.int32)

    # Prune tiny segments
    unique_labels, counts = np.unique(pt_labels[pt_labels >= 0], return_counts=True)
    min_count = min_crown_area / (resolution**2)
    segment_sizes: dict[int, int] = {}
    small_labels = set()
    for lbl, count in zip(unique_labels, counts, strict=False):
        if count >= min_count:
            segment_sizes[int(lbl)] = int(count)
        else:
            small_labels.add(int(lbl))
    if small_labels:
        small_mask = np.isin(pt_labels, list(small_labels))
        pt_labels[small_mask] = -1

    n_segments = len(segment_sizes)
    logger.info(
        "segment_trees: %d trees detected from %d treetops (res=%.2f)",
        n_segments,
        n_tops,
        resolution,
    )

    return SegmentationResult(
        labels=pt_labels,
        n_segments=n_segments,
        segment_sizes=segment_sizes,
    )