841 lines
31 KiB
Python
841 lines
31 KiB
Python
# coding: utf-8
|
||
# Shape Feature Analysis Module
|
||
# 用于分析二值化图像的形状特征
|
||
|
||
import numpy as np
|
||
import cv2
|
||
import pandas as pd
|
||
from skimage import measure, morphology
|
||
from skimage.segmentation import watershed
|
||
from skimage.feature import peak_local_max
|
||
from scipy import ndimage
|
||
import spectral
|
||
import os
|
||
import warnings
|
||
from shapely.geometry import shape
|
||
import geopandas as gpd
|
||
import json
|
||
warnings.filterwarnings('ignore')
|
||
|
||
|
||
class ShapeFeatureConfig:
|
||
"""
|
||
形状特征分析配置类
|
||
"""
|
||
def __init__(self):
|
||
# 输入数据设置
|
||
self.input_type = 'dat' # 'dat' 或 'shp'
|
||
self.dat_file_path = None # dat文件路径
|
||
self.hdr_file_path = None # 对应的hdr文件路径
|
||
self.shp_file_path = None # shp文件路径
|
||
self.band_index = 0 # 要处理的波段索引(对于dat文件)
|
||
|
||
# 连通域处理参数
|
||
self.connectivity = 8 # 连通性:4或8
|
||
self.min_area = 10 # 最小区域面积阈值
|
||
self.use_watershed = False # 是否使用分水岭算法分离相邻物体
|
||
|
||
# 分水岭算法参数
|
||
self.watershed_min_distance = 10 # 局部最大值的最小距离
|
||
|
||
# 输出设置
|
||
self.output_dir = "output"
|
||
self.output_csv = True # 是否输出CSV文件
|
||
self.save_labeled_image = False # 是否保存标记后的图像
|
||
|
||
def validate(self):
|
||
"""验证配置参数的有效性"""
|
||
if self.input_type not in ['dat', 'shp']:
|
||
raise ValueError("input_type必须是'dat'或'shp'")
|
||
|
||
if self.input_type == 'dat':
|
||
if not self.dat_file_path or not os.path.exists(self.dat_file_path):
|
||
raise FileNotFoundError(f"dat文件不存在: {self.dat_file_path}")
|
||
if self.hdr_file_path and not os.path.exists(self.hdr_file_path):
|
||
raise FileNotFoundError(f"hdr文件不存在: {self.hdr_file_path}")
|
||
|
||
elif self.input_type == 'shp':
|
||
if not self.shp_file_path or not os.path.exists(self.shp_file_path):
|
||
raise FileNotFoundError(f"shp文件不存在: {self.shp_file_path}")
|
||
|
||
if self.connectivity not in [4, 8]:
|
||
raise ValueError("connectivity必须是4或8")
|
||
|
||
if self.min_area < 1:
|
||
raise ValueError("min_area必须大于等于1")
|
||
|
||
return True
|
||
|
||
|
||
def load_binary_image_from_dat(dat_path, hdr_path=None, band_index=0):
|
||
"""
|
||
从dat文件加载二值化图像数据
|
||
|
||
参数:
|
||
- dat_path: dat文件路径
|
||
- hdr_path: 对应的hdr文件路径(可选)
|
||
- band_index: 要读取的波段索引
|
||
|
||
返回:
|
||
- binary_image: 二值化图像数组
|
||
- metadata: 元数据字典
|
||
"""
|
||
try:
|
||
# 如果提供了hdr文件路径,使用spectral库读取ENVI格式
|
||
if hdr_path and os.path.exists(hdr_path):
|
||
print(f"使用hdr文件: {hdr_path}")
|
||
img_obj = spectral.open_image(hdr_path)
|
||
image_data = img_obj.load()
|
||
print(f"通过spectral库读取成功,形状: {image_data.shape}")
|
||
|
||
# 如果没有hdr文件或读取失败,直接读取dat文件
|
||
else:
|
||
print(f"直接读取dat文件: {dat_path}")
|
||
|
||
# 从dat文件路径推断hdr文件路径
|
||
if not hdr_path:
|
||
hdr_path = dat_path.replace('.dat', '.hdr')
|
||
|
||
if os.path.exists(hdr_path):
|
||
# 读取hdr文件获取元数据
|
||
with open(hdr_path, 'r') as f:
|
||
hdr_content = f.read()
|
||
|
||
# 解析hdr文件
|
||
lines = hdr_content.split('\n')
|
||
metadata = {}
|
||
|
||
for line in lines:
|
||
line = line.strip()
|
||
if '=' in line:
|
||
key, value = line.split('=', 1)
|
||
key = key.strip()
|
||
value = value.strip()
|
||
|
||
if key == 'samples':
|
||
metadata['samples'] = int(value)
|
||
elif key == 'lines':
|
||
metadata['lines'] = int(value)
|
||
elif key == 'bands':
|
||
metadata['bands'] = int(value)
|
||
elif key == 'data type':
|
||
metadata['data_type'] = int(value)
|
||
|
||
print(f"从hdr文件解析的元数据: {metadata}")
|
||
|
||
# 读取dat文件
|
||
with open(dat_path, 'rb') as f:
|
||
if metadata['data_type'] == 1: # 8-bit unsigned integer
|
||
image_data = np.frombuffer(f.read(), dtype=np.uint8)
|
||
else:
|
||
raise ValueError(f"不支持的数据类型: {metadata['data_type']}")
|
||
|
||
# 重塑为正确的形状 (lines, samples, bands)
|
||
image_data = image_data.reshape(metadata['lines'], metadata['samples'], metadata['bands'])
|
||
print(f"重塑后的数据形状: {image_data.shape}")
|
||
|
||
else:
|
||
# 如果没有hdr文件,使用spectral库尝试读取
|
||
try:
|
||
img_obj = spectral.open_image(dat_path)
|
||
image_data = img_obj.load()
|
||
print(f"通过spectral库读取成功,形状: {image_data.shape}")
|
||
except:
|
||
raise FileNotFoundError(f"找不到对应的hdr文件: {hdr_path}")
|
||
|
||
# 处理波段选择
|
||
if len(image_data.shape) == 3:
|
||
if band_index >= image_data.shape[2]:
|
||
raise ValueError(f"波段索引 {band_index} 超出范围 [0, {image_data.shape[2]-1}]")
|
||
binary_image = image_data[:, :, band_index]
|
||
else:
|
||
binary_image = image_data
|
||
|
||
# 确保是二值图像
|
||
binary_image = binary_image.astype(np.uint8)
|
||
|
||
# 对于分割结果,通常已经是二值的,但确保值范围正确
|
||
unique_values = np.unique(binary_image)
|
||
print(f"图像中的唯一值: {unique_values}")
|
||
|
||
# 如果图像值范围不是0-1,转换为二值(假设非零值是前景)
|
||
if binary_image.max() > 1:
|
||
binary_image = (binary_image > 0).astype(np.uint8)
|
||
print("已转换为二值图像")
|
||
|
||
metadata = {
|
||
'shape': binary_image.shape,
|
||
'dtype': binary_image.dtype,
|
||
'source': 'dat',
|
||
'band_index': band_index,
|
||
'unique_values': unique_values.tolist()
|
||
}
|
||
|
||
print(f"最终图像形状: {binary_image.shape}, 数据类型: {binary_image.dtype}")
|
||
return binary_image, metadata
|
||
|
||
except Exception as e:
|
||
print(f"读取dat文件失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
raise
|
||
|
||
|
||
def load_regions_from_shp(shp_path, image_shape=None):
|
||
"""
|
||
从shp文件加载区域信息并转换为二值图像
|
||
|
||
参数:
|
||
- shp_path: shp文件路径
|
||
- image_shape: 目标图像形状 (height, width),用于创建二值图像
|
||
|
||
返回:
|
||
- binary_image: 二值化图像数组
|
||
- metadata: 元数据字典
|
||
"""
|
||
try:
|
||
# 读取shp文件
|
||
gdf = gpd.read_file(shp_path)
|
||
print(f"成功读取shp文件,包含 {len(gdf)} 个几何对象")
|
||
|
||
if image_shape is None:
|
||
# 如果没有指定图像形状,尝试从bounds推断
|
||
bounds = gdf.total_bounds # [minx, miny, maxx, maxy]
|
||
# 这里需要根据实际坐标系进行转换,暂时使用简单的假设
|
||
height = int(bounds[3] - bounds[1])
|
||
width = int(bounds[2] - bounds[0])
|
||
if height <= 0 or width <= 0:
|
||
raise ValueError("无法从shp文件推断图像尺寸,请提供image_shape参数")
|
||
image_shape = (height, width)
|
||
|
||
# 创建二值图像
|
||
binary_image = np.zeros(image_shape, dtype=np.uint8)
|
||
|
||
# 将每个几何对象转换为像素坐标并填充
|
||
for idx, geom in enumerate(gdf.geometry):
|
||
if geom is not None:
|
||
# 这里需要坐标转换逻辑,暂时使用简化的实现
|
||
# 实际实现需要根据shp文件的坐标系进行准确转换
|
||
try:
|
||
# 简化的几何转换(实际使用时需要更复杂的坐标变换)
|
||
coords = np.array(geom.exterior.coords) if hasattr(geom, 'exterior') else np.array(geom.coords)
|
||
|
||
# 归一化坐标到图像尺寸(这只是示例,实际需要正确的地理坐标转换)
|
||
coords_normalized = coords.copy()
|
||
if len(coords) > 0:
|
||
# 创建标签图像,每个区域用不同的值标记
|
||
label_value = idx + 1
|
||
# 这里应该实现正确的几何到像素的转换
|
||
# 暂时用占位符实现
|
||
pass
|
||
|
||
except Exception as e:
|
||
print(f"处理几何对象 {idx} 时出错: {e}")
|
||
continue
|
||
|
||
# 如果没有有效的几何转换,创建一个示例图像
|
||
if binary_image.max() == 0:
|
||
print("警告:未能从shp文件转换为有效的二值图像,使用示例数据")
|
||
binary_image[100:200, 100:200] = 1 # 示例矩形区域
|
||
|
||
metadata = {
|
||
'shape': binary_image.shape,
|
||
'dtype': binary_image.dtype,
|
||
'source': 'shp',
|
||
'num_features': len(gdf)
|
||
}
|
||
|
||
return binary_image, metadata
|
||
|
||
except Exception as e:
|
||
print(f"读取shp文件失败: {e}")
|
||
raise
|
||
|
||
|
||
def apply_watershed_segmentation(binary_image, config):
|
||
"""
|
||
使用分水岭算法分离相邻物体
|
||
|
||
参数:
|
||
- binary_image: 二值化图像
|
||
- config: 配置对象
|
||
|
||
返回:
|
||
- labeled_image: 分水岭分割后的标记图像
|
||
- num_objects: 物体数量
|
||
"""
|
||
try:
|
||
# 计算距离变换
|
||
distance = ndimage.distance_transform_edt(binary_image)
|
||
|
||
# 找到局部最大值作为种子点
|
||
local_maxi = peak_local_max(distance,
|
||
indices=False,
|
||
footprint=np.ones((3, 3)),
|
||
labels=binary_image,
|
||
min_distance=config.watershed_min_distance)
|
||
|
||
# 标记种子点
|
||
markers = ndimage.label(local_maxi)[0]
|
||
|
||
# 应用分水岭算法
|
||
labels = watershed(-distance, markers, mask=binary_image)
|
||
|
||
# 重新标记以确保连续的标签
|
||
unique_labels = np.unique(labels)
|
||
unique_labels = unique_labels[unique_labels > 0] # 排除背景
|
||
|
||
relabeled_image = np.zeros_like(labels)
|
||
for new_label, old_label in enumerate(unique_labels, 1):
|
||
relabeled_image[labels == old_label] = new_label
|
||
|
||
num_objects = len(unique_labels)
|
||
print(f"分水岭分割完成,识别出 {num_objects} 个物体")
|
||
|
||
return relabeled_image.astype(np.int32), num_objects
|
||
|
||
except Exception as e:
|
||
print(f"分水岭分割失败: {e}")
|
||
raise
|
||
|
||
|
||
def label_connected_components(binary_image, config):
|
||
"""
|
||
标记连通域,可选择使用分水岭算法
|
||
|
||
参数:
|
||
- binary_image: 二值化图像
|
||
- config: 配置对象
|
||
|
||
返回:
|
||
- labeled_image: 标记后的图像
|
||
- num_objects: 物体数量
|
||
"""
|
||
try:
|
||
if config.use_watershed:
|
||
# 使用分水岭算法
|
||
labeled_image, num_objects = apply_watershed_segmentation(binary_image, config)
|
||
else:
|
||
# 使用标准连通域标记
|
||
if config.connectivity == 8:
|
||
structure = np.ones((3, 3), dtype=int)
|
||
else: # connectivity == 4
|
||
structure = np.array([[0, 1, 0],
|
||
[1, 1, 1],
|
||
[0, 1, 0]], dtype=int)
|
||
|
||
labeled_image, num_objects = ndimage.label(binary_image, structure=structure)
|
||
|
||
# 过滤小区域
|
||
if config.min_area > 1:
|
||
component_sizes = np.bincount(labeled_image.ravel())
|
||
too_small = component_sizes < config.min_area
|
||
too_small_mask = too_small[labeled_image]
|
||
labeled_image[too_small_mask] = 0
|
||
|
||
# 重新标记以保持连续性
|
||
unique_labels = np.unique(labeled_image)
|
||
unique_labels = unique_labels[unique_labels > 0]
|
||
|
||
relabeled_image = np.zeros_like(labeled_image)
|
||
for new_label, old_label in enumerate(unique_labels, 1):
|
||
relabeled_image[labeled_image == old_label] = new_label
|
||
|
||
labeled_image = relabeled_image
|
||
num_objects = len(unique_labels)
|
||
|
||
print(f"连通域标记完成,找到 {num_objects} 个有效区域")
|
||
return labeled_image, num_objects
|
||
|
||
except Exception as e:
|
||
print(f"连通域标记失败: {e}")
|
||
raise
|
||
|
||
|
||
def calculate_shape_features(labeled_image, config):
|
||
"""
|
||
计算所有形状特征指标
|
||
|
||
参数:
|
||
- labeled_image: 标记后的图像
|
||
- config: 配置对象
|
||
|
||
返回:
|
||
- features_df: 包含所有特征的DataFrame
|
||
- contour_coords_dict: 轮廓坐标字典
|
||
"""
|
||
try:
|
||
# 获取区域属性
|
||
regions = measure.regionprops(labeled_image)
|
||
|
||
if len(regions) == 0:
|
||
print("警告:没有找到有效的区域")
|
||
return pd.DataFrame(), {}
|
||
|
||
features_list = []
|
||
contour_coords_dict = {}
|
||
|
||
for region in regions:
|
||
try:
|
||
# 基本属性
|
||
label = region.label
|
||
area = region.area
|
||
perimeter = region.perimeter
|
||
|
||
# 边界框和最小外接矩形
|
||
min_row, min_col, max_row, max_col = region.bbox
|
||
bbox_width = max_col - min_col
|
||
bbox_height = max_row - min_row
|
||
bbox_area = bbox_width * bbox_height
|
||
|
||
# 获取轮廓坐标
|
||
coords = region.coords # 像素坐标
|
||
|
||
# 计算各种形状特征
|
||
feature_dict = {
|
||
'label': label,
|
||
'area': area,
|
||
'perimeter': perimeter,
|
||
'bbox_width': bbox_width,
|
||
'bbox_height': bbox_height,
|
||
'bbox_area': bbox_area
|
||
}
|
||
|
||
# 1. 圆形度 (Circularity) = (4 * π * 面积) / (周长²)
|
||
if perimeter > 0:
|
||
circularity = (4 * np.pi * area) / (perimeter ** 2)
|
||
else:
|
||
circularity = 0
|
||
feature_dict['circularity'] = circularity
|
||
|
||
# 2. 矩形度 (Rectangularity) = 面积 / 外接矩形面积
|
||
rectangularity = area / bbox_area if bbox_area > 0 else 0
|
||
feature_dict['rectangularity'] = rectangularity
|
||
|
||
# 3. 长宽比 (Aspect Ratio) = 宽度 / 高度
|
||
aspect_ratio = bbox_width / bbox_height if bbox_height > 0 else 0
|
||
feature_dict['aspect_ratio'] = aspect_ratio
|
||
|
||
# 4. 紧密度 (Compactness) = (周长²) / 面积
|
||
compactness = (perimeter ** 2) / area if area > 0 else 0
|
||
feature_dict['compactness'] = compactness
|
||
|
||
# 5. 偏心率 (Eccentricity) - 使用regionprops的eccentricity
|
||
eccentricity = region.eccentricity
|
||
feature_dict['eccentricity'] = eccentricity
|
||
|
||
# 6. 形状矩 (Shape Moments) - Hu矩
|
||
if hasattr(region, 'moments_hu'):
|
||
hu_moments = region.moments_hu
|
||
for i, hu_moment in enumerate(hu_moments):
|
||
feature_dict[f'hu_moment_{i+1}'] = hu_moment
|
||
else:
|
||
# 如果没有Hu矩,计算为0
|
||
for i in range(7):
|
||
feature_dict[f'hu_moment_{i+1}'] = 0
|
||
|
||
# 7. 凸性 (Convexity) = 面积 / 凸包面积
|
||
convex_area = region.convex_area
|
||
convexity = area / convex_area if convex_area > 0 else 0
|
||
feature_dict['convexity'] = convexity
|
||
|
||
# 8. 固体度 (Solidity) = 面积 / 凸包面积 (与凸性相同)
|
||
solidity = convexity # 在scikit-image中,solidity就是这个定义
|
||
feature_dict['solidity'] = solidity
|
||
|
||
# 9. 轮廓矩 (Contour Moments) - 使用中心矩
|
||
if hasattr(region, 'moments_central'):
|
||
central_moments = region.moments_central
|
||
feature_dict['central_moment_02'] = central_moments[0, 2] # μ02
|
||
feature_dict['central_moment_20'] = central_moments[2, 0] # μ20
|
||
feature_dict['central_moment_11'] = central_moments[1, 1] # μ11
|
||
else:
|
||
feature_dict['central_moment_02'] = 0
|
||
feature_dict['central_moment_20'] = 0
|
||
feature_dict['central_moment_11'] = 0
|
||
|
||
# 10. 质心坐标
|
||
centroid_row, centroid_col = region.centroid
|
||
feature_dict['centroid_row'] = centroid_row
|
||
feature_dict['centroid_col'] = centroid_col
|
||
|
||
# 11. 主要轴长度和角度
|
||
if hasattr(region, 'major_axis_length'):
|
||
feature_dict['major_axis_length'] = region.major_axis_length
|
||
feature_dict['minor_axis_length'] = region.minor_axis_length
|
||
feature_dict['orientation'] = region.orientation
|
||
else:
|
||
feature_dict['major_axis_length'] = 0
|
||
feature_dict['minor_axis_length'] = 0
|
||
feature_dict['orientation'] = 0
|
||
|
||
# 12. 边界坐标序列(将保存到单独的JSON文件中)
|
||
coords_list = coords.tolist()
|
||
coord_key = f"region_{region.label}_coords"
|
||
feature_dict['contour_coords'] = coord_key # 引用标识符
|
||
|
||
# 将轮廓坐标添加到字典中
|
||
contour_coords_dict[coord_key] = coords_list
|
||
|
||
# 13. 其他有用的特征
|
||
feature_dict['equivalent_diameter'] = region.equivalent_diameter
|
||
feature_dict['extent'] = region.extent # 面积与边界框面积之比
|
||
|
||
features_list.append(feature_dict)
|
||
|
||
except Exception as e:
|
||
print(f"计算区域 {region.label} 的特征时出错: {e}")
|
||
continue
|
||
|
||
# 创建DataFrame
|
||
features_df = pd.DataFrame(features_list)
|
||
|
||
print(f"成功计算 {len(features_df)} 个区域的形状特征")
|
||
return features_df, contour_coords_dict
|
||
|
||
except Exception as e:
|
||
print(f"计算形状特征失败: {e}")
|
||
raise
|
||
|
||
|
||
def save_features_to_csv(features_df, output_path, config, contour_coords_dict=None):
|
||
"""
|
||
将形状特征保存为CSV文件
|
||
|
||
参数:
|
||
- features_df: 特征DataFrame
|
||
- output_path: 输出文件路径(不含扩展名)
|
||
- config: 配置对象
|
||
- contour_coords_dict: 轮廓坐标字典(可选)
|
||
"""
|
||
try:
|
||
# 确保输出目录存在
|
||
output_dir = os.path.dirname(output_path)
|
||
if output_dir and not os.path.exists(output_dir):
|
||
os.makedirs(output_dir)
|
||
|
||
# 如果有轮廓坐标数据,保存到单独的JSON文件
|
||
if contour_coords_dict is not None and len(contour_coords_dict) > 0:
|
||
coords_json_path = f"{output_path}_contour_coords.json"
|
||
with open(coords_json_path, 'w', encoding='utf-8') as f:
|
||
json.dump(contour_coords_dict, f, indent=2, ensure_ascii=False)
|
||
print(f"轮廓坐标已保存到: {coords_json_path}")
|
||
|
||
# 保存为CSV文件
|
||
csv_path = f"{output_path}.csv"
|
||
features_df.to_csv(csv_path, index=False, encoding='utf-8')
|
||
|
||
print(f"形状特征已保存到: {csv_path}")
|
||
print(f"共保存 {len(features_df)} 个区域的特征数据")
|
||
|
||
# 打印特征统计信息
|
||
if len(features_df) > 0:
|
||
numeric_columns = features_df.select_dtypes(include=[np.number]).columns
|
||
print(f"数值特征列数: {len(numeric_columns)}")
|
||
print(f"特征列: {list(features_df.columns)}")
|
||
|
||
return csv_path
|
||
|
||
except Exception as e:
|
||
print(f"保存CSV文件失败: {e}")
|
||
raise
|
||
|
||
|
||
def save_labeled_image(labeled_image, output_path, config):
|
||
"""
|
||
保存标记后的图像(可选)
|
||
|
||
参数:
|
||
- labeled_image: 标记后的图像
|
||
- output_path: 输出文件路径(不含扩展名)
|
||
- config: 配置对象
|
||
"""
|
||
try:
|
||
# 保存为PNG图像用于可视化
|
||
png_path = f"{output_path}_labeled.png"
|
||
|
||
# 归一化标签图像以便可视化
|
||
if labeled_image.max() > 0:
|
||
normalized_image = (labeled_image / labeled_image.max() * 255).astype(np.uint8)
|
||
else:
|
||
normalized_image = labeled_image.astype(np.uint8)
|
||
|
||
# 创建彩色标签图像
|
||
colored_image = cv2.applyColorMap(normalized_image, cv2.COLORMAP_JET)
|
||
|
||
# 保存图像
|
||
cv2.imwrite(png_path, colored_image)
|
||
print(f"标记图像已保存到: {png_path}")
|
||
|
||
# 同时保存为dat文件(ENVI格式)
|
||
dat_path = f"{output_path}_labeled.dat"
|
||
with open(dat_path, 'wb') as f:
|
||
labeled_image.astype(np.int32).tofile(f)
|
||
|
||
# 保存对应的hdr文件
|
||
hdr_path = f"{output_path}_labeled.hdr"
|
||
header_content = f"""ENVI
|
||
description = {{Labeled Image from Shape Analysis [Watershed: {config.use_watershed}]}}
|
||
samples = {labeled_image.shape[1]}
|
||
lines = {labeled_image.shape[0]}
|
||
bands = 1
|
||
header offset = 0
|
||
file type = ENVI Standard
|
||
data type = 3
|
||
interleave = bsq
|
||
byte order = 0
|
||
band names = {{Labeled_Result}}
|
||
classes = {labeled_image.max() + 1}
|
||
class names = {{Background{' , '.join([f'Region_{i}' for i in range(1, labeled_image.max() + 1)])}}}
|
||
"""
|
||
|
||
with open(hdr_path, 'w', encoding='utf-8') as f:
|
||
f.write(header_content)
|
||
|
||
print(f"ENVI格式标记数据已保存到: {dat_path}")
|
||
|
||
except Exception as e:
|
||
print(f"保存标记图像失败: {e}")
|
||
|
||
|
||
def analyze_shape_features(config):
|
||
"""
|
||
执行形状特征分析的主函数
|
||
|
||
参数:
|
||
- config: 配置对象
|
||
|
||
返回:
|
||
- features_df: 特征DataFrame
|
||
- labeled_image: 标记后的图像
|
||
"""
|
||
try:
|
||
print("=" * 60)
|
||
print("开始形状特征分析")
|
||
print("=" * 60)
|
||
|
||
# 1. 加载数据
|
||
print("1. 加载输入数据...")
|
||
if config.input_type == 'dat':
|
||
binary_image, metadata = load_binary_image_from_dat(
|
||
config.dat_file_path, config.hdr_file_path, config.band_index
|
||
)
|
||
elif config.input_type == 'shp':
|
||
binary_image, metadata = load_regions_from_shp(
|
||
config.shp_file_path, image_shape=None
|
||
)
|
||
else:
|
||
raise ValueError(f"不支持的输入类型: {config.input_type}")
|
||
|
||
print(f"数据加载完成,图像形状: {binary_image.shape}")
|
||
binary_image = binary_image.squeeze()
|
||
# 2. 连通域标记
|
||
print("2. 执行连通域标记...")
|
||
labeled_image, num_objects = label_connected_components(binary_image, config)
|
||
print(f"找到 {num_objects} 个连通域")
|
||
|
||
if num_objects == 0:
|
||
print("警告:没有找到有效的连通域")
|
||
return pd.DataFrame(), labeled_image
|
||
|
||
# 3. 计算形状特征
|
||
print("3. 计算形状特征...")
|
||
features_df, contour_coords_dict = calculate_shape_features(labeled_image, config)
|
||
|
||
if len(features_df) == 0:
|
||
print("警告:未能计算出有效的形状特征")
|
||
return features_df, labeled_image
|
||
|
||
# 4. 保存结果
|
||
print("4. 保存分析结果...")
|
||
|
||
# 生成输出文件名
|
||
timestamp = pd.Timestamp.now().strftime("%Y%m%d_%H%M%S")
|
||
base_name = f"shape_features_{config.input_type}"
|
||
|
||
if config.input_type == 'dat':
|
||
base_name += f"_band{config.band_index}"
|
||
if config.use_watershed:
|
||
base_name += "_watershed"
|
||
|
||
base_name += f"_{timestamp}"
|
||
output_path = os.path.join(config.output_dir, base_name)
|
||
|
||
# 保存CSV文件
|
||
if config.output_csv:
|
||
csv_path = save_features_to_csv(features_df, output_path, config, contour_coords_dict)
|
||
|
||
# 保存标记图像
|
||
if config.save_labeled_image:
|
||
save_labeled_image(labeled_image, output_path, config)
|
||
|
||
print("=" * 60)
|
||
print("形状特征分析完成!")
|
||
print(f"分析了 {len(features_df)} 个区域")
|
||
print(f"计算了 {len(features_df.columns)} 个特征指标")
|
||
if config.output_csv:
|
||
print(f"结果已保存到: {csv_path}")
|
||
print("=" * 60)
|
||
|
||
return features_df, labeled_image
|
||
|
||
except Exception as e:
|
||
print(f"形状特征分析失败: {e}")
|
||
raise
|
||
|
||
|
||
def main():
|
||
"""主函数:命令行接口"""
|
||
import argparse
|
||
|
||
parser = argparse.ArgumentParser(
|
||
description='形状特征分析工具 - 从分割结果提取形状特征',
|
||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
epilog="""
|
||
使用示例:
|
||
1. 从dat文件分析形状特征:
|
||
python shape_feature.py input.dat -t dat -b 0 -o shape_features
|
||
|
||
2. 从shp文件分析形状特征:
|
||
python shape_feature.py input.shp -t shp -o shape_features
|
||
|
||
3. 启用分水岭算法分离物体:
|
||
python shape_feature.py input.dat -t dat --use-watershed --watershed-min-distance 15 -o features
|
||
|
||
4. 自定义连通性和面积阈值:
|
||
python shape_feature.py input.dat -t dat -c 4 --min-area 100 --save-labeled-image -o results
|
||
|
||
输入格式:
|
||
- dat: ENVI格式的分割结果文件 (.dat + .hdr)
|
||
- shp: Shapefile格式的矢量数据
|
||
|
||
形状特征包括:
|
||
- 面积、周长、紧密度
|
||
- 圆度、矩形度、伸长度
|
||
- 质心坐标、边界框
|
||
- 方向角、主轴长度
|
||
- Hu矩不变量等几何特征
|
||
"""
|
||
)
|
||
|
||
parser.add_argument('input_path', help='输入文件路径 (.dat分割结果 或 .shp矢量文件)')
|
||
parser.add_argument('-t', '--input-type', choices=['dat', 'shp'], default='dat',
|
||
help='输入文件类型 (默认: dat)')
|
||
parser.add_argument('-o', '--output', required=True,
|
||
help='输出文件路径前缀')
|
||
|
||
# dat文件参数
|
||
parser.add_argument('-b', '--band-index', type=int, default=0,
|
||
help='dat文件的波段索引 (默认: 0)')
|
||
parser.add_argument('--hdr-path',
|
||
help='dat文件对应的hdr文件路径 (默认自动查找)')
|
||
|
||
# 连通域参数
|
||
parser.add_argument('-c', '--connectivity', type=int, choices=[4, 8], default=8,
|
||
help='连通性,4或8 (默认: 8)')
|
||
parser.add_argument('--min-area', type=int, default=10,
|
||
help='最小区域面积阈值 (默认: 10)')
|
||
|
||
# 分水岭参数
|
||
parser.add_argument('--use-watershed', action='store_true',
|
||
help='启用分水岭算法分离相邻物体')
|
||
parser.add_argument('--watershed-min-distance', type=int, default=10,
|
||
help='分水岭局部最大值的最小距离 (默认: 10)')
|
||
|
||
# 输出选项
|
||
parser.add_argument('--output-dir', default='output',
|
||
help='输出目录 (默认: output)')
|
||
parser.add_argument('--save-labeled-image', action='store_true',
|
||
help='保存标记后的图像文件')
|
||
|
||
args = parser.parse_args()
|
||
|
||
try:
|
||
print("=" * 70)
|
||
print("形状特征分析工具")
|
||
print("=" * 70)
|
||
print(f"输入文件: {args.input_path}")
|
||
print(f"输入类型: {args.input_type}")
|
||
print(f"输出路径: {args.output}")
|
||
print(f"输出目录: {args.output_dir}")
|
||
|
||
if args.input_type == 'dat':
|
||
print(f"波段索引: {args.band_index}")
|
||
print(f"HDR文件: {args.hdr_path or '自动查找'}")
|
||
|
||
print(f"连通性: {args.connectivity}")
|
||
print(f"最小面积: {args.min_area}")
|
||
print(f"使用分水岭: {args.use_watershed}")
|
||
|
||
if args.use_watershed:
|
||
print(f"分水岭最小距离: {args.watershed_min_distance}")
|
||
print()
|
||
|
||
# 创建配置
|
||
config = ShapeFeatureConfig()
|
||
config.input_type = args.input_type
|
||
config.output_dir = args.output_dir
|
||
config.connectivity = args.connectivity
|
||
config.min_area = args.min_area
|
||
config.use_watershed = args.use_watershed
|
||
config.watershed_min_distance = args.watershed_min_distance
|
||
config.save_labeled_image = args.save_labeled_image
|
||
|
||
# 设置输入文件路径
|
||
if args.input_type == 'dat':
|
||
config.dat_file_path = args.input_path
|
||
config.hdr_file_path = args.hdr_path
|
||
config.band_index = args.band_index
|
||
elif args.input_type == 'shp':
|
||
config.shp_file_path = args.input_path
|
||
|
||
# 验证配置
|
||
try:
|
||
config.validate()
|
||
print("配置验证通过")
|
||
except Exception as e:
|
||
print(f"✗ 配置验证失败: {e}")
|
||
return 1
|
||
|
||
# 执行形状特征分析
|
||
print("\n开始形状特征分析...")
|
||
features_df, labeled_image = analyze_shape_features(config)
|
||
|
||
# 显示结果统计
|
||
if len(features_df) > 0:
|
||
print("\n✓ 分析完成!")
|
||
print(f"检测到的物体数量: {len(features_df)}")
|
||
print(f"提取的特征数量: {len(features_df.columns)}")
|
||
|
||
# 显示特征名称
|
||
feature_names = [col for col in features_df.columns if col not in ['label', 'centroid_row', 'centroid_col']]
|
||
print(f"形状特征: {', '.join(feature_names[:10])}{'...' if len(feature_names) > 10 else ''}")
|
||
|
||
# 显示统计信息
|
||
print("\n特征统计:")
|
||
numeric_cols = features_df.select_dtypes(include=[np.number]).columns
|
||
if len(numeric_cols) > 0:
|
||
stats = features_df[numeric_cols].describe()
|
||
print(f" 面积范围: {features_df['area'].min():.0f} - {features_df['area'].max():.0f} 像素")
|
||
print(f" 平均面积: {features_df['area'].mean():.1f} 像素")
|
||
print(f" 周长范围: {features_df['perimeter'].min():.1f} - {features_df['perimeter'].max():.1f}")
|
||
print(f" 圆度范围: {features_df['circularity'].min():.3f} - {features_df['circularity'].max():.3f}")
|
||
|
||
else:
|
||
print("\n⚠ 未检测到任何物体,请检查输入数据或调整参数")
|
||
|
||
print("\n" + "=" * 70)
|
||
print("形状特征分析完成!")
|
||
print("=" * 70)
|
||
|
||
except Exception as e:
|
||
print(f"✗ 处理失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return 1
|
||
|
||
return 0
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main() |