Skip to content

Powerline Detection

occulus.segmentation.powerlines

Powerline detection — wire and pylon extraction from point clouds.

Detects power transmission and distribution lines from classified LiDAR point clouds by combining height-above-ground filtering with geometric feature analysis (linearity, planarity, verticality). Wire candidates exhibit high linearity while pylon candidates exhibit high verticality and spatial clustering. Optionally fits catenary curves to wire segments and checks minimum ground clearance.

CatenarySegment dataclass

A single fitted catenary curve representing one wire span.

Attributes:

Name Type Description
indices NDArray[intp]

Indices into the original point cloud for points in this segment.

a float

Catenary shape parameter (lower = more sag).

x0 float

Horizontal offset of the catenary vertex.

z0 float

Vertical offset of the catenary vertex.

rmse float

Root-mean-square error of the catenary fit in coordinate units.

Source code in src/occulus/segmentation/powerlines.py
@dataclass
class CatenarySegment:
    """A single fitted catenary curve representing one wire span.

    Attributes
    ----------
    indices : NDArray[np.intp]
        Indices into the original point cloud for points in this segment.
    a : float
        Catenary shape parameter (lower = more sag).
    x0 : float
        Horizontal offset of the catenary vertex.
    z0 : float
        Vertical offset of the catenary vertex.
    rmse : float
        Root-mean-square error of the catenary fit in coordinate units.
    """

    indices: NDArray[np.intp]
    a: float
    x0: float
    z0: float
    rmse: float

ClearanceViolation dataclass

A location where a wire segment is below the minimum clearance.

Attributes:

Name Type Description
point_index int

Index into the original point cloud.

height_above_ground float

Actual height above ground at this point (coordinate units).

min_clearance float

Required minimum clearance that was violated.

Source code in src/occulus/segmentation/powerlines.py
@dataclass
class ClearanceViolation:
    """A location where a wire segment is below the minimum clearance.

    Attributes
    ----------
    point_index : int
        Index into the original point cloud.
    height_above_ground : float
        Actual height above ground at this point (coordinate units).
    min_clearance : float
        Required minimum clearance that was violated.
    """

    point_index: int
    height_above_ground: float
    min_clearance: float

PowerlineResult dataclass

Result of powerline detection.

Attributes:

Name Type Description
wire_mask NDArray[bool_]

Per-point boolean mask identifying wire points.

pylon_mask NDArray[bool_]

Per-point boolean mask identifying pylon/tower points.

wire_segments list[CatenarySegment]

List of detected wire segments with optional catenary fits.

pylon_positions NDArray[float64]

Centroid positions of detected pylons as (M, 3) array.

clearance_violations list[ClearanceViolation]

Locations where wires are below the minimum clearance threshold.

Source code in src/occulus/segmentation/powerlines.py
@dataclass
class PowerlineResult:
    """Result of powerline detection.

    Attributes
    ----------
    wire_mask : NDArray[np.bool_]
        Per-point boolean mask identifying wire points.
    pylon_mask : NDArray[np.bool_]
        Per-point boolean mask identifying pylon/tower points.
    wire_segments : list[CatenarySegment]
        List of detected wire segments with optional catenary fits.
    pylon_positions : NDArray[np.float64]
        Centroid positions of detected pylons as (M, 3) array.
    clearance_violations : list[ClearanceViolation]
        Locations where wires are below the minimum clearance threshold.
    """

    wire_mask: NDArray[np.bool_]
    pylon_mask: NDArray[np.bool_]
    wire_segments: list[CatenarySegment] = field(default_factory=list)
    pylon_positions: NDArray[np.float64] = field(
        default_factory=lambda: np.empty((0, 3), dtype=np.float64)
    )
    clearance_violations: list[ClearanceViolation] = field(default_factory=list)

detect_powerlines(cloud, *, min_height_above_ground=3.0, max_height_above_ground=50.0, linearity_threshold=0.7, ground_class=2, catenary_fit=True, min_clearance=None, strict=True, min_wire_span=50.0, max_pylon_xy_extent=5.0, min_pylon_z_extent=8.0, max_wire_height_std=3.0, pylon_association_radius=10.0)

Detect powerline wires and pylons in a classified point cloud.

The algorithm:

  1. Separate ground (ground_class) from non-ground using the cloud's classification attribute.
  2. Interpolate a ground surface and compute height-above-ground (HAG) for every non-ground point.
  3. Filter candidates by the HAG height band.
  4. Compute per-point geometric features (linearity, planarity, verticality) via PCA on local KDTree neighbourhoods.
  5. Classify wire candidates (high linearity) and pylon candidates (high verticality, low linearity, spatially clustered).
  6. Cluster wire candidates with DBSCAN into individual wire segments.
  7. Optionally fit catenary curves to each wire segment.
  8. Optionally flag clearance violations where wires are below min_clearance.

When strict=True (the default), additional false-positive reduction filters are applied:

  • Wire segment length: DBSCAN clusters whose 3D extent is shorter than min_wire_span are rejected (building edges, fences).
  • Pylon geometry: Pylon clusters whose XY bounding box exceeds max_pylon_xy_extent or whose Z extent is less than min_pylon_z_extent are rejected (buildings, tree crowns).
  • Height band consistency: Wire clusters whose height standard deviation exceeds max_wire_height_std are rejected (tree crowns with scattered points).
  • Wire–pylon association: Wire segments whose endpoints are not within pylon_association_radius of a detected pylon are downgraded (removed from the confirmed wire mask but retained as segments with rmse=inf).

Parameters:

Name Type Description Default
cloud PointCloud

Input point cloud. Must have a classification array with at least some points labelled as ground_class.

required
min_height_above_ground float

Minimum height above the interpolated ground surface for a point to be considered a powerline candidate, by default 3.0.

3.0
max_height_above_ground float

Maximum height above ground, by default 50.0.

50.0
linearity_threshold float

Minimum linearity value (0--1) for a point to be classified as a wire candidate, by default 0.7.

0.7
ground_class int

ASPRS classification code for ground points, by default 2.

2
catenary_fit bool

If True, fit a catenary curve to each wire segment, by default True.

True
min_clearance float | None

If provided, flag wire points whose HAG is below this value as clearance violations, by default None (no clearance check).

None
strict bool

If True, enable false-positive reduction filters for wire span length, pylon geometry, height consistency, and wire–pylon association. Set to False to use the original permissive behaviour, by default True.

True
min_wire_span float

Minimum horizontal extent (metres) for a wire DBSCAN cluster to be accepted. Only used when strict=True, by default 50.0.

50.0
max_pylon_xy_extent float

Maximum XY bounding box size (metres) for a pylon cluster. Clusters wider than this are rejected as buildings. Only used when strict=True, by default 5.0.

5.0
min_pylon_z_extent float

Minimum Z extent (metres) for a pylon cluster. Clusters shorter than this are rejected. Only used when strict=True, by default 8.0.

8.0
max_wire_height_std float

Maximum standard deviation (metres) of Z values within a wire cluster. Clusters exceeding this are rejected. Only used when strict=True, by default 3.0.

3.0
pylon_association_radius float

Maximum distance (metres) from a wire segment endpoint to the nearest pylon centroid for the segment to be considered connected. Only used when strict=True, by default 10.0.

10.0

Returns:

Type Description
PowerlineResult

Detection results including wire/pylon masks, segments, pylon positions, and optional clearance violations.

Raises:

Type Description
OcculusSegmentationError

If the cloud has no classification array, too few ground points for interpolation, or no powerline candidates are found.

Source code in src/occulus/segmentation/powerlines.py
def detect_powerlines(
    cloud: PointCloud,
    *,
    min_height_above_ground: float = 3.0,
    max_height_above_ground: float = 50.0,
    linearity_threshold: float = 0.7,
    ground_class: int = 2,
    catenary_fit: bool = True,
    min_clearance: float | None = None,
    strict: bool = True,
    min_wire_span: float = 50.0,
    max_pylon_xy_extent: float = 5.0,
    min_pylon_z_extent: float = 8.0,
    max_wire_height_std: float = 3.0,
    pylon_association_radius: float = 10.0,
) -> PowerlineResult:
    """Detect powerline wires and pylons in a classified point cloud.

    The algorithm:

    1. Separate ground (``ground_class``) from non-ground using the
       cloud's ``classification`` attribute.
    2. Interpolate a ground surface and compute height-above-ground (HAG)
       for every non-ground point.
    3. Filter candidates by the HAG height band.
    4. Compute per-point geometric features (linearity, planarity,
       verticality) via PCA on local KDTree neighbourhoods.
    5. Classify wire candidates (high linearity) and pylon candidates
       (high verticality, low linearity, spatially clustered).
    6. Cluster wire candidates with DBSCAN into individual wire segments.
    7. Optionally fit catenary curves to each wire segment.
    8. Optionally flag clearance violations where wires are below
       ``min_clearance``.

    When ``strict=True`` (the default), additional false-positive
    reduction filters are applied:

    - **Wire segment length**: DBSCAN clusters whose 3D extent is
      shorter than ``min_wire_span`` are rejected (building edges,
      fences).
    - **Pylon geometry**: Pylon clusters whose XY bounding box exceeds
      ``max_pylon_xy_extent`` or whose Z extent is less than
      ``min_pylon_z_extent`` are rejected (buildings, tree crowns).
    - **Height band consistency**: Wire clusters whose height standard
      deviation exceeds ``max_wire_height_std`` are rejected (tree
      crowns with scattered points).
    - **Wire–pylon association**: Wire segments whose endpoints are
      not within ``pylon_association_radius`` of a detected pylon are
      downgraded (removed from the confirmed wire mask but retained
      as segments with ``rmse=inf``).

    Parameters
    ----------
    cloud : PointCloud
        Input point cloud. Must have a ``classification`` array with at
        least some points labelled as ``ground_class``.
    min_height_above_ground : float, optional
        Minimum height above the interpolated ground surface for a point
        to be considered a powerline candidate, by default 3.0.
    max_height_above_ground : float, optional
        Maximum height above ground, by default 50.0.
    linearity_threshold : float, optional
        Minimum linearity value (0--1) for a point to be classified as a
        wire candidate, by default 0.7.
    ground_class : int, optional
        ASPRS classification code for ground points, by default 2.
    catenary_fit : bool, optional
        If ``True``, fit a catenary curve to each wire segment,
        by default ``True``.
    min_clearance : float | None, optional
        If provided, flag wire points whose HAG is below this value as
        clearance violations, by default ``None`` (no clearance check).
    strict : bool, optional
        If ``True``, enable false-positive reduction filters for wire
        span length, pylon geometry, height consistency, and wire–pylon
        association.  Set to ``False`` to use the original permissive
        behaviour, by default ``True``.
    min_wire_span : float, optional
        Minimum horizontal extent (metres) for a wire DBSCAN cluster to
        be accepted.  Only used when ``strict=True``, by default 50.0.
    max_pylon_xy_extent : float, optional
        Maximum XY bounding box size (metres) for a pylon cluster.
        Clusters wider than this are rejected as buildings.  Only used
        when ``strict=True``, by default 5.0.
    min_pylon_z_extent : float, optional
        Minimum Z extent (metres) for a pylon cluster.  Clusters shorter
        than this are rejected.  Only used when ``strict=True``,
        by default 8.0.
    max_wire_height_std : float, optional
        Maximum standard deviation (metres) of Z values within a wire
        cluster.  Clusters exceeding this are rejected.  Only used when
        ``strict=True``, by default 3.0.
    pylon_association_radius : float, optional
        Maximum distance (metres) from a wire segment endpoint to the
        nearest pylon centroid for the segment to be considered
        connected.  Only used when ``strict=True``, by default 10.0.

    Returns
    -------
    PowerlineResult
        Detection results including wire/pylon masks, segments, pylon
        positions, and optional clearance violations.

    Raises
    ------
    OcculusSegmentationError
        If the cloud has no classification array, too few ground points
        for interpolation, or no powerline candidates are found.
    """
    # --- Input validation ---------------------------------------------------
    if cloud.n_points == 0:
        raise OcculusSegmentationError("Cannot detect powerlines in an empty cloud")

    if cloud.classification is None:
        raise OcculusSegmentationError(
            "Cloud must have a classification array. "
            "Run classify_ground_csf() or classify_ground_pmf() first."
        )

    if min_height_above_ground < 0:
        raise OcculusSegmentationError(
            f"min_height_above_ground must be >= 0, got {min_height_above_ground}"
        )
    if max_height_above_ground <= min_height_above_ground:
        raise OcculusSegmentationError(
            f"max_height_above_ground ({max_height_above_ground}) must be greater "
            f"than min_height_above_ground ({min_height_above_ground})"
        )
    if not 0.0 < linearity_threshold <= 1.0:
        raise OcculusSegmentationError(
            f"linearity_threshold must be in (0, 1], got {linearity_threshold}"
        )

    # --- Step 1: Separate ground / non-ground --------------------------------
    ground_mask = cloud.classification == ground_class
    n_ground = int(ground_mask.sum())

    if n_ground < 3:
        raise OcculusSegmentationError(
            f"Need at least 3 ground points for surface interpolation, "
            f"found {n_ground} with class={ground_class}"
        )

    non_ground_mask = ~ground_mask
    n_non_ground = int(non_ground_mask.sum())

    if n_non_ground == 0:
        raise OcculusSegmentationError(
            "All points are classified as ground; no candidates for powerline detection"
        )

    logger.debug(
        "Ground separation: %d ground, %d non-ground points",
        n_ground,
        n_non_ground,
    )

    # --- Step 2: Compute height above ground ---------------------------------
    ground_xyz = cloud.xyz[ground_mask]
    hag = _compute_height_above_ground(cloud.xyz, ground_xyz)

    # --- Step 3: Filter by height band ---------------------------------------
    non_ground_indices = np.where(non_ground_mask)[0]
    height_band = (hag[non_ground_indices] >= min_height_above_ground) & (
        hag[non_ground_indices] <= max_height_above_ground
    )
    candidate_indices = non_ground_indices[height_band]

    if len(candidate_indices) == 0:
        raise OcculusSegmentationError(
            f"No points found in height band [{min_height_above_ground}, "
            f"{max_height_above_ground}] above ground"
        )

    logger.debug(
        "Height filter: %d candidates in [%.1f, %.1f] m above ground",
        len(candidate_indices),
        min_height_above_ground,
        max_height_above_ground,
    )

    # --- Step 4: Compute geometric features ----------------------------------
    candidate_xyz = cloud.xyz[candidate_indices]
    linearity, planarity, verticality = _compute_geometric_features(
        candidate_xyz, k=min(20, len(candidate_indices))
    )

    # --- Step 5: Wire and pylon classification -------------------------------
    wire_local = linearity >= linearity_threshold
    pylon_local = (verticality >= 0.6) & (linearity < linearity_threshold * 0.5)

    wire_indices = candidate_indices[wire_local]
    pylon_candidate_indices = candidate_indices[pylon_local]

    logger.debug(
        "Feature classification: %d wire candidates, %d pylon candidates",
        len(wire_indices),
        len(pylon_candidate_indices),
    )

    # --- Step 6: Cluster wires with DBSCAN -----------------------------------
    wire_segments: list[CatenarySegment] = []
    wire_mask = np.zeros(cloud.n_points, dtype=bool)

    if len(wire_indices) > 0:
        wire_labels = _dbscan_cluster(cloud.xyz[wire_indices], eps=3.0, min_samples=5)

        unique_labels = set(wire_labels) - {-1}
        n_rejected_span = 0
        n_rejected_height = 0
        for lbl in sorted(unique_labels):
            seg_local = np.where(wire_labels == lbl)[0]
            seg_global = wire_indices[seg_local]
            seg_xyz = cloud.xyz[seg_global]

            # --- Strict filter: minimum wire span length ---
            if strict:
                xy_extent = seg_xyz[:, :2].max(axis=0) - seg_xyz[:, :2].min(axis=0)
                span = float(np.linalg.norm(xy_extent))
                if span < min_wire_span:
                    n_rejected_span += 1
                    continue

            # --- Strict filter: height band consistency ---
            if strict:
                z_std = float(seg_xyz[:, 2].std())
                if z_std > max_wire_height_std:
                    n_rejected_height += 1
                    continue

            wire_mask[seg_global] = True
            seg = CatenarySegment(
                indices=seg_global,
                a=0.0,
                x0=0.0,
                z0=0.0,
                rmse=float("inf"),
            )
            wire_segments.append(seg)

        if strict and (n_rejected_span > 0 or n_rejected_height > 0):
            logger.debug(
                "Strict wire filtering: rejected %d short segments, %d high-variance segments",
                n_rejected_span,
                n_rejected_height,
            )

        if not strict:
            # In permissive mode, mark all wire candidates (original behaviour)
            wire_mask[wire_indices] = True

        logger.info(
            "Wire clustering: %d wire segments from %d wire points",
            len(wire_segments),
            int(wire_mask.sum()),
        )

    # --- Step 6b: Cluster pylons ---------------------------------------------
    pylon_mask = np.zeros(cloud.n_points, dtype=bool)
    pylon_positions = np.empty((0, 3), dtype=np.float64)

    if len(pylon_candidate_indices) > 0:
        pylon_labels = _dbscan_cluster(cloud.xyz[pylon_candidate_indices], eps=5.0, min_samples=3)

        pylon_unique = set(pylon_labels) - {-1}
        if pylon_unique:
            centroids = []
            accepted_indices: list[NDArray[np.intp]] = []
            n_rejected_wide = 0
            n_rejected_short = 0
            for lbl in sorted(pylon_unique):
                seg_local = np.where(pylon_labels == lbl)[0]
                seg_global = pylon_candidate_indices[seg_local]
                seg_xyz = cloud.xyz[seg_global]

                if strict:
                    # Reject clusters with wide XY footprint (buildings)
                    xy_min = seg_xyz[:, :2].min(axis=0)
                    xy_max = seg_xyz[:, :2].max(axis=0)
                    xy_extent = xy_max - xy_min
                    if float(xy_extent.max()) > max_pylon_xy_extent:
                        n_rejected_wide += 1
                        continue

                    # Reject clusters that are not tall enough
                    z_extent = float(seg_xyz[:, 2].max() - seg_xyz[:, 2].min())
                    if z_extent < min_pylon_z_extent:
                        n_rejected_short += 1
                        continue

                centroid = seg_xyz.mean(axis=0)
                centroids.append(centroid)
                accepted_indices.append(seg_global)

            if centroids:
                pylon_positions = np.array(centroids, dtype=np.float64)
                for idx_arr in accepted_indices:
                    pylon_mask[idx_arr] = True

            if strict and (n_rejected_wide > 0 or n_rejected_short > 0):
                logger.debug(
                    "Strict pylon filtering: rejected %d wide clusters, %d short clusters",
                    n_rejected_wide,
                    n_rejected_short,
                )
        else:
            # No pylon clusters found — mark all candidates in permissive mode
            if not strict:
                pylon_mask[pylon_candidate_indices] = True

        if not strict:
            # In permissive mode, mark all pylon candidates (original behaviour)
            pylon_mask[pylon_candidate_indices] = True

        logger.info(
            "Pylon clustering: %d pylons detected",
            len(pylon_positions),
        )

    # --- Step 6c: Wire-pylon association (strict mode) -----------------------
    if strict and len(pylon_positions) >= 2 and wire_segments:
        wire_segments, wire_mask = _filter_orphan_wires(
            cloud.xyz,
            wire_segments,
            wire_mask,
            pylon_positions,
            pylon_association_radius,
        )

    # --- Step 7: Catenary fitting --------------------------------------------
    if catenary_fit and wire_segments:
        wire_segments = _fit_catenaries(cloud.xyz, wire_segments)

    # --- Step 8: Clearance analysis ------------------------------------------
    clearance_violations: list[ClearanceViolation] = []
    confirmed_wire_indices = np.where(wire_mask)[0]
    if min_clearance is not None and len(confirmed_wire_indices) > 0:
        for idx in confirmed_wire_indices:
            h = hag[idx]
            if h < min_clearance:
                clearance_violations.append(
                    ClearanceViolation(
                        point_index=int(idx),
                        height_above_ground=float(h),
                        min_clearance=min_clearance,
                    )
                )
        if clearance_violations:
            logger.warning(
                "Clearance analysis: %d violations (min_clearance=%.1f)",
                len(clearance_violations),
                min_clearance,
            )

    return PowerlineResult(
        wire_mask=wire_mask,
        pylon_mask=pylon_mask,
        wire_segments=wire_segments,
        pylon_positions=pylon_positions,
        clearance_violations=clearance_violations,
    )