Skip to content

Commit

Permalink
Moved get_layer_data to plugin manager base. Created erosion filter.
Browse files Browse the repository at this point in the history
  • Loading branch information
mktk1117 committed Feb 24, 2024
1 parent 084405f commit cfb3fc7
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 156 deletions.
12 changes: 11 additions & 1 deletion elevation_mapping_cupy/config/core/plugin_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,14 @@ inpainting:
layer_name: "inpaint"
extra_params:
method: "telea" # telea or ns
# Apply smoothing for inpainted layer
# Apply smoothing for inpainted layer
erosion:
enable: True
fill_nan: False
is_height_layer: False
layer_name: "erosion"
extra_params:
input_layer_name: "traversability"
dilation_size: 3
iteration_n: 20
reverse: True
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ publishers:
fps: 5.0

filtered_elevation_map:
layers: ['inpaint', 'smooth', 'min_filter']
layers: ['inpaint', 'smooth', 'min_filter', 'erosion']
basic_layers: ['inpaint']
fps: 5.0

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#
# Copyright (c) 2024, Takahiro Miki. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for details.
#
import cv2 as cv
import cupy as cp
import numpy as np

from typing import List

from .plugin_manager import PluginBase


class Erosion(PluginBase):
"""
This class is used for applying erosion to an elevation map or specific layers within it.
Erosion is a morphological operation that is used to remove small-scale details from a binary image.
Args:
kernel_size (int): Size of the erosion kernel. Default is 3, which means a 3x3 square kernel.
iterations (int): Number of times erosion is applied. Default is 1.
**kwargs (): Additional keyword arguments.
"""

def __init__(
self,
input_layer_name="traversability",
kernel_size: int = 3,
iterations: int = 1,
reverse: bool = False,
**kwargs,
):
super().__init__()
self.input_layer_name = input_layer_name
self.kernel_size = kernel_size
self.iterations = iterations
self.reverse = reverse

def __call__(
self,
elevation_map: cp.ndarray,
layer_names: List[str],
plugin_layers: cp.ndarray,
plugin_layer_names: List[str],
semantic_map: cp.ndarray,
semantic_layer_names: List[str],
*args,
) -> cp.ndarray:
"""
Applies erosion to the given elevation map.
Args:
elevation_map (cupy._core.core.ndarray): The elevation map to be eroded.
layer_names (List[str]): Names of the layers in the elevation map.
plugin_layers (cupy._core.core.ndarray): Layers provided by other plugins.
plugin_layer_names (List[str]): Names of the layers provided by other plugins.
*args (): Additional arguments.
Returns:
cupy._core.core.ndarray: The eroded elevation map.
"""
# Convert the elevation map to a format suitable for erosion (if necessary)
layer_data = self.get_layer_data(
elevation_map,
layer_names,
plugin_layers,
plugin_layer_names,
semantic_map,
semantic_layer_names,
self.input_layer_name,
)
layer_np = cp.asnumpy(layer_data)

# Define the erosion kernel
kernel = np.ones((self.kernel_size, self.kernel_size), np.uint8)

if self.reverse:
layer_np = 1 - layer_np
# Apply erosion
layer_min = float(layer_np.min())
layer_max = float(layer_np.max())
layer_np_normalized = ((layer_np - layer_min) * 255 / (layer_max - layer_min)).astype("uint8")
eroded_map_np = cv.erode(layer_np_normalized, kernel, iterations=self.iterations)
eroded_map_np = eroded_map_np.astype(np.float32) * (layer_max - layer_min) / 255 + layer_min
if self.reverse:
eroded_map_np = 1 - eroded_map_np

# Convert back to cupy array and return
return cp.asarray(eroded_map_np)
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ class MaxLayerFilter(PluginBase):
Args:
cell_n (int): The width and height of the elevation map.
reverse (list): A list of boolean values indicating whether to reverse the filter operation for each layer. Default is [True].
min_or_max (str): A string indicating whether to apply a minimum or maximum filter. Accepts "min" or "max". Default is "max".
layers (list): List of layers for semantic traversability. Default is ["traversability"].
thresholds (list): List of thresholds for each layer. Default is [0.5].
type (list): List of types for each layer. Default is ["traversability"].
thresholds (list): List of thresholds for each layer. If the value is bigger than a threshold, assign 1.0 otherwise 0.0. If it is False, it does not apply. Default is [False].
**kwargs: Additional keyword arguments.
"""

Expand All @@ -29,7 +30,6 @@ def __init__(
min_or_max: str = "max",
thresholds: list = [False],
default_value: float = 0.0,
apply_sqrt: bool = True,
**kwargs,
):
super().__init__()
Expand All @@ -38,31 +38,30 @@ def __init__(
self.min_or_max = min_or_max
self.thresholds = thresholds
self.default_value = default_value
self.apply_sqrt = apply_sqrt

def get_layer_data(
self,
elevation_map,
layer_names,
plugin_layers,
plugin_layer_names,
semantic_map,
semantic_layer_names,
name,
):
if name in layer_names:
idx = layer_names.index(name)
layer = elevation_map[idx].copy()
elif name in plugin_layer_names:
idx = plugin_layer_names.index(name)
layer = plugin_layers[idx].copy()
elif name in semantic_layer_names:
idx = semantic_layer_names.index(name)
layer = semantic_map[idx].copy()
else:
print(f"Could not find layer {name}!")
layer = None
return layer
# def get_layer_data(
# self,
# elevation_map,
# layer_names,
# plugin_layers,
# plugin_layer_names,
# semantic_map,
# semantic_layer_names,
# name,
# ):
# if name in layer_names:
# idx = layer_names.index(name)
# layer = elevation_map[idx].copy()
# elif name in plugin_layer_names:
# idx = plugin_layer_names.index(name)
# layer = plugin_layers[idx].copy()
# elif name in semantic_layer_names:
# idx = semantic_layer_names.index(name)
# layer = semantic_map[idx].copy()
# else:
# print(f"Could not find layer {name}!")
# layer = None
# return layer

def __call__(
self,
Expand Down Expand Up @@ -121,6 +120,4 @@ def __call__(
result = cp.min(result, axis=0)
else:
result = cp.max(result, axis=0)
if self.apply_sqrt:
result = cp.square(result)
return result
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#
from abc import ABC
import cupy as cp
from typing import List, Dict
from typing import List, Dict, Optional
import importlib
import inspect
from dataclasses import dataclass
Expand Down Expand Up @@ -39,6 +39,8 @@ def __call__(
layer_names: List[str],
plugin_layers: cp.ndarray,
plugin_layer_names: List[str],
semantic_map: cp.ndarray,
semantic_layer_names: List[str],
*args,
**kwargs,
) -> cp.ndarray:
Expand All @@ -50,6 +52,8 @@ def __call__(
layer_names ():
plugin_layers ():
plugin_layer_names ():
semantic_map ():
semantic_layer_names ():
Run your processing here and return the result.
layer of elevation_map 0: elevation
Expand All @@ -64,6 +68,45 @@ def __call__(
"""
pass

def get_layer_data(
self,
elevation_map: cp.ndarray,
layer_names: List[str],
plugin_layers: cp.ndarray,
plugin_layer_names: List[str],
semantic_map: cp.ndarray,
semantic_layer_names: List[str],
name: str,
) -> Optional[cp.ndarray]:
"""
Retrieve a copy of the layer data from the elevation, plugin, or semantic maps based on the layer name.
Args:
elevation_map (cp.ndarray): The elevation map containing various layers.
layer_names (List[str]): A list of names for each layer in the elevation map.
plugin_layers (cp.ndarray): The plugin layers containing additional data.
plugin_layer_names (List[str]): A list of names for each layer in the plugin layers.
semantic_map (cp.ndarray): The semantic map containing various layers.
semantic_layer_names (List[str]): A list of names for each layer in the semantic map.
name (str): The name of the layer to retrieve.
Returns:
Optional[cp.ndarray]: A copy of the requested layer as a cupy ndarray if found, otherwise None.
"""
if name in layer_names:
idx = layer_names.index(name)
layer = elevation_map[idx].copy()
elif name in plugin_layer_names:
idx = plugin_layer_names.index(name)
layer = plugin_layers[idx].copy()
elif name in semantic_layer_names:
idx = semantic_layer_names.index(name)
layer = semantic_map[idx].copy()
else:
print(f"Could not find layer {name}!")
layer = None
return layer


class PluginManager(object):
"""
Expand Down Expand Up @@ -152,11 +195,22 @@ def update_with_name(
self.layers[idx] = self.plugins[idx](elevation_map, layer_names, self.layers, self.layer_names)
elif n_param == 7:
self.layers[idx] = self.plugins[idx](
elevation_map, layer_names, self.layers, self.layer_names, semantic_map, semantic_params,
elevation_map,
layer_names,
self.layers,
self.layer_names,
semantic_map,
semantic_params,
)
elif n_param == 8:
self.layers[idx] = self.plugins[idx](
elevation_map, layer_names, self.layers, self.layer_names, semantic_map, semantic_params, rotation,
elevation_map,
layer_names,
self.layers,
self.layer_names,
semantic_map,
semantic_params,
rotation,
)
else:
self.layers[idx] = self.plugins[idx](
Expand Down
Loading

0 comments on commit cfb3fc7

Please sign in to comment.