# coding: utf-8 # Shape Feature Analysis Module # 用于分析二值化图像的形状特征 import numpy as np import cv2 import pandas as pd from skimage import measure, morphology from skimage.segmentation import watershed from skimage.feature import peak_local_max from scipy import ndimage import spectral import os import warnings from shapely.geometry import shape import geopandas as gpd import json warnings.filterwarnings('ignore') class ShapeFeatureConfig: """ 形状特征分析配置类 """ def __init__(self): # 输入数据设置 self.input_type = 'dat' # 'dat' 或 'shp' self.dat_file_path = None # dat文件路径 self.hdr_file_path = None # 对应的hdr文件路径 self.shp_file_path = None # shp文件路径 self.band_index = 0 # 要处理的波段索引(对于dat文件) # 连通域处理参数 self.connectivity = 8 # 连通性:4或8 self.min_area = 10 # 最小区域面积阈值 self.use_watershed = False # 是否使用分水岭算法分离相邻物体 # 分水岭算法参数 self.watershed_min_distance = 10 # 局部最大值的最小距离 # 输出设置 self.output_dir = "output" self.output_csv = True # 是否输出CSV文件 self.save_labeled_image = False # 是否保存标记后的图像 def validate(self): """验证配置参数的有效性""" if self.input_type not in ['dat', 'shp']: raise ValueError("input_type必须是'dat'或'shp'") if self.input_type == 'dat': if not self.dat_file_path or not os.path.exists(self.dat_file_path): raise FileNotFoundError(f"dat文件不存在: {self.dat_file_path}") if self.hdr_file_path and not os.path.exists(self.hdr_file_path): raise FileNotFoundError(f"hdr文件不存在: {self.hdr_file_path}") elif self.input_type == 'shp': if not self.shp_file_path or not os.path.exists(self.shp_file_path): raise FileNotFoundError(f"shp文件不存在: {self.shp_file_path}") if self.connectivity not in [4, 8]: raise ValueError("connectivity必须是4或8") if self.min_area < 1: raise ValueError("min_area必须大于等于1") return True def load_binary_image_from_dat(dat_path, hdr_path=None, band_index=0): """ 从dat文件加载二值化图像数据 参数: - dat_path: dat文件路径 - hdr_path: 对应的hdr文件路径(可选) - band_index: 要读取的波段索引 返回: - binary_image: 二值化图像数组 - metadata: 元数据字典 """ try: # 如果提供了hdr文件路径,使用spectral库读取ENVI格式 if hdr_path and os.path.exists(hdr_path): print(f"使用hdr文件: {hdr_path}") img_obj = spectral.open_image(hdr_path) image_data = img_obj.load() print(f"通过spectral库读取成功,形状: {image_data.shape}") # 如果没有hdr文件或读取失败,直接读取dat文件 else: print(f"直接读取dat文件: {dat_path}") # 从dat文件路径推断hdr文件路径 if not hdr_path: hdr_path = dat_path.replace('.dat', '.hdr') if os.path.exists(hdr_path): # 读取hdr文件获取元数据 with open(hdr_path, 'r') as f: hdr_content = f.read() # 解析hdr文件 lines = hdr_content.split('\n') metadata = {} for line in lines: line = line.strip() if '=' in line: key, value = line.split('=', 1) key = key.strip() value = value.strip() if key == 'samples': metadata['samples'] = int(value) elif key == 'lines': metadata['lines'] = int(value) elif key == 'bands': metadata['bands'] = int(value) elif key == 'data type': metadata['data_type'] = int(value) print(f"从hdr文件解析的元数据: {metadata}") # 读取dat文件 with open(dat_path, 'rb') as f: if metadata['data_type'] == 1: # 8-bit unsigned integer image_data = np.frombuffer(f.read(), dtype=np.uint8) else: raise ValueError(f"不支持的数据类型: {metadata['data_type']}") # 重塑为正确的形状 (lines, samples, bands) image_data = image_data.reshape(metadata['lines'], metadata['samples'], metadata['bands']) print(f"重塑后的数据形状: {image_data.shape}") else: # 如果没有hdr文件,使用spectral库尝试读取 try: img_obj = spectral.open_image(dat_path) image_data = img_obj.load() print(f"通过spectral库读取成功,形状: {image_data.shape}") except: raise FileNotFoundError(f"找不到对应的hdr文件: {hdr_path}") # 处理波段选择 if len(image_data.shape) == 3: if band_index >= image_data.shape[2]: raise ValueError(f"波段索引 {band_index} 超出范围 [0, {image_data.shape[2]-1}]") binary_image = image_data[:, :, band_index] else: binary_image = image_data # 确保是二值图像 binary_image = binary_image.astype(np.uint8) # 对于分割结果,通常已经是二值的,但确保值范围正确 unique_values = np.unique(binary_image) print(f"图像中的唯一值: {unique_values}") # 如果图像值范围不是0-1,转换为二值(假设非零值是前景) if binary_image.max() > 1: binary_image = (binary_image > 0).astype(np.uint8) print("已转换为二值图像") metadata = { 'shape': binary_image.shape, 'dtype': binary_image.dtype, 'source': 'dat', 'band_index': band_index, 'unique_values': unique_values.tolist() } print(f"最终图像形状: {binary_image.shape}, 数据类型: {binary_image.dtype}") return binary_image, metadata except Exception as e: print(f"读取dat文件失败: {e}") import traceback traceback.print_exc() raise def load_regions_from_shp(shp_path, image_shape=None): """ 从shp文件加载区域信息并转换为二值图像 参数: - shp_path: shp文件路径 - image_shape: 目标图像形状 (height, width),用于创建二值图像 返回: - binary_image: 二值化图像数组 - metadata: 元数据字典 """ try: # 读取shp文件 gdf = gpd.read_file(shp_path) print(f"成功读取shp文件,包含 {len(gdf)} 个几何对象") if image_shape is None: # 如果没有指定图像形状,尝试从bounds推断 bounds = gdf.total_bounds # [minx, miny, maxx, maxy] # 这里需要根据实际坐标系进行转换,暂时使用简单的假设 height = int(bounds[3] - bounds[1]) width = int(bounds[2] - bounds[0]) if height <= 0 or width <= 0: raise ValueError("无法从shp文件推断图像尺寸,请提供image_shape参数") image_shape = (height, width) # 创建二值图像 binary_image = np.zeros(image_shape, dtype=np.uint8) # 将每个几何对象转换为像素坐标并填充 for idx, geom in enumerate(gdf.geometry): if geom is not None: # 这里需要坐标转换逻辑,暂时使用简化的实现 # 实际实现需要根据shp文件的坐标系进行准确转换 try: # 简化的几何转换(实际使用时需要更复杂的坐标变换) coords = np.array(geom.exterior.coords) if hasattr(geom, 'exterior') else np.array(geom.coords) # 归一化坐标到图像尺寸(这只是示例,实际需要正确的地理坐标转换) coords_normalized = coords.copy() if len(coords) > 0: # 创建标签图像,每个区域用不同的值标记 label_value = idx + 1 # 这里应该实现正确的几何到像素的转换 # 暂时用占位符实现 pass except Exception as e: print(f"处理几何对象 {idx} 时出错: {e}") continue # 如果没有有效的几何转换,创建一个示例图像 if binary_image.max() == 0: print("警告:未能从shp文件转换为有效的二值图像,使用示例数据") binary_image[100:200, 100:200] = 1 # 示例矩形区域 metadata = { 'shape': binary_image.shape, 'dtype': binary_image.dtype, 'source': 'shp', 'num_features': len(gdf) } return binary_image, metadata except Exception as e: print(f"读取shp文件失败: {e}") raise def apply_watershed_segmentation(binary_image, config): """ 使用分水岭算法分离相邻物体 参数: - binary_image: 二值化图像 - config: 配置对象 返回: - labeled_image: 分水岭分割后的标记图像 - num_objects: 物体数量 """ try: # 计算距离变换 distance = ndimage.distance_transform_edt(binary_image) # 找到局部最大值作为种子点 local_maxi = peak_local_max(distance, indices=False, footprint=np.ones((3, 3)), labels=binary_image, min_distance=config.watershed_min_distance) # 标记种子点 markers = ndimage.label(local_maxi)[0] # 应用分水岭算法 labels = watershed(-distance, markers, mask=binary_image) # 重新标记以确保连续的标签 unique_labels = np.unique(labels) unique_labels = unique_labels[unique_labels > 0] # 排除背景 relabeled_image = np.zeros_like(labels) for new_label, old_label in enumerate(unique_labels, 1): relabeled_image[labels == old_label] = new_label num_objects = len(unique_labels) print(f"分水岭分割完成,识别出 {num_objects} 个物体") return relabeled_image.astype(np.int32), num_objects except Exception as e: print(f"分水岭分割失败: {e}") raise def label_connected_components(binary_image, config): """ 标记连通域,可选择使用分水岭算法 参数: - binary_image: 二值化图像 - config: 配置对象 返回: - labeled_image: 标记后的图像 - num_objects: 物体数量 """ try: if config.use_watershed: # 使用分水岭算法 labeled_image, num_objects = apply_watershed_segmentation(binary_image, config) else: # 使用标准连通域标记 if config.connectivity == 8: structure = np.ones((3, 3), dtype=int) else: # connectivity == 4 structure = np.array([[0, 1, 0], [1, 1, 1], [0, 1, 0]], dtype=int) labeled_image, num_objects = ndimage.label(binary_image, structure=structure) # 过滤小区域 if config.min_area > 1: component_sizes = np.bincount(labeled_image.ravel()) too_small = component_sizes < config.min_area too_small_mask = too_small[labeled_image] labeled_image[too_small_mask] = 0 # 重新标记以保持连续性 unique_labels = np.unique(labeled_image) unique_labels = unique_labels[unique_labels > 0] relabeled_image = np.zeros_like(labeled_image) for new_label, old_label in enumerate(unique_labels, 1): relabeled_image[labeled_image == old_label] = new_label labeled_image = relabeled_image num_objects = len(unique_labels) print(f"连通域标记完成,找到 {num_objects} 个有效区域") return labeled_image, num_objects except Exception as e: print(f"连通域标记失败: {e}") raise def calculate_shape_features(labeled_image, config): """ 计算所有形状特征指标 参数: - labeled_image: 标记后的图像 - config: 配置对象 返回: - features_df: 包含所有特征的DataFrame - contour_coords_dict: 轮廓坐标字典 """ try: # 获取区域属性 regions = measure.regionprops(labeled_image) if len(regions) == 0: print("警告:没有找到有效的区域") return pd.DataFrame(), {} features_list = [] contour_coords_dict = {} for region in regions: try: # 基本属性 label = region.label area = region.area perimeter = region.perimeter # 边界框和最小外接矩形 min_row, min_col, max_row, max_col = region.bbox bbox_width = max_col - min_col bbox_height = max_row - min_row bbox_area = bbox_width * bbox_height # 获取轮廓坐标 coords = region.coords # 像素坐标 # 计算各种形状特征 feature_dict = { 'label': label, 'area': area, 'perimeter': perimeter, 'bbox_width': bbox_width, 'bbox_height': bbox_height, 'bbox_area': bbox_area } # 1. 圆形度 (Circularity) = (4 * π * 面积) / (周长²) if perimeter > 0: circularity = (4 * np.pi * area) / (perimeter ** 2) else: circularity = 0 feature_dict['circularity'] = circularity # 2. 矩形度 (Rectangularity) = 面积 / 外接矩形面积 rectangularity = area / bbox_area if bbox_area > 0 else 0 feature_dict['rectangularity'] = rectangularity # 3. 长宽比 (Aspect Ratio) = 宽度 / 高度 aspect_ratio = bbox_width / bbox_height if bbox_height > 0 else 0 feature_dict['aspect_ratio'] = aspect_ratio # 4. 紧密度 (Compactness) = (周长²) / 面积 compactness = (perimeter ** 2) / area if area > 0 else 0 feature_dict['compactness'] = compactness # 5. 偏心率 (Eccentricity) - 使用regionprops的eccentricity eccentricity = region.eccentricity feature_dict['eccentricity'] = eccentricity # 6. 形状矩 (Shape Moments) - Hu矩 if hasattr(region, 'moments_hu'): hu_moments = region.moments_hu for i, hu_moment in enumerate(hu_moments): feature_dict[f'hu_moment_{i+1}'] = hu_moment else: # 如果没有Hu矩,计算为0 for i in range(7): feature_dict[f'hu_moment_{i+1}'] = 0 # 7. 凸性 (Convexity) = 面积 / 凸包面积 convex_area = region.convex_area convexity = area / convex_area if convex_area > 0 else 0 feature_dict['convexity'] = convexity # 8. 固体度 (Solidity) = 面积 / 凸包面积 (与凸性相同) solidity = convexity # 在scikit-image中,solidity就是这个定义 feature_dict['solidity'] = solidity # 9. 轮廓矩 (Contour Moments) - 使用中心矩 if hasattr(region, 'moments_central'): central_moments = region.moments_central feature_dict['central_moment_02'] = central_moments[0, 2] # μ02 feature_dict['central_moment_20'] = central_moments[2, 0] # μ20 feature_dict['central_moment_11'] = central_moments[1, 1] # μ11 else: feature_dict['central_moment_02'] = 0 feature_dict['central_moment_20'] = 0 feature_dict['central_moment_11'] = 0 # 10. 质心坐标 centroid_row, centroid_col = region.centroid feature_dict['centroid_row'] = centroid_row feature_dict['centroid_col'] = centroid_col # 11. 主要轴长度和角度 if hasattr(region, 'major_axis_length'): feature_dict['major_axis_length'] = region.major_axis_length feature_dict['minor_axis_length'] = region.minor_axis_length feature_dict['orientation'] = region.orientation else: feature_dict['major_axis_length'] = 0 feature_dict['minor_axis_length'] = 0 feature_dict['orientation'] = 0 # 12. 边界坐标序列(将保存到单独的JSON文件中) coords_list = coords.tolist() coord_key = f"region_{region.label}_coords" feature_dict['contour_coords'] = coord_key # 引用标识符 # 将轮廓坐标添加到字典中 contour_coords_dict[coord_key] = coords_list # 13. 其他有用的特征 feature_dict['equivalent_diameter'] = region.equivalent_diameter feature_dict['extent'] = region.extent # 面积与边界框面积之比 features_list.append(feature_dict) except Exception as e: print(f"计算区域 {region.label} 的特征时出错: {e}") continue # 创建DataFrame features_df = pd.DataFrame(features_list) print(f"成功计算 {len(features_df)} 个区域的形状特征") return features_df, contour_coords_dict except Exception as e: print(f"计算形状特征失败: {e}") raise def save_features_to_csv(features_df, output_path, config, contour_coords_dict=None): """ 将形状特征保存为CSV文件 参数: - features_df: 特征DataFrame - output_path: 输出文件路径(不含扩展名) - config: 配置对象 - contour_coords_dict: 轮廓坐标字典(可选) """ try: # 确保输出目录存在 output_dir = os.path.dirname(output_path) if output_dir and not os.path.exists(output_dir): os.makedirs(output_dir) # 如果有轮廓坐标数据,保存到单独的JSON文件 if contour_coords_dict is not None and len(contour_coords_dict) > 0: coords_json_path = f"{output_path}_contour_coords.json" with open(coords_json_path, 'w', encoding='utf-8') as f: json.dump(contour_coords_dict, f, indent=2, ensure_ascii=False) print(f"轮廓坐标已保存到: {coords_json_path}") # 保存为CSV文件 csv_path = f"{output_path}.csv" features_df.to_csv(csv_path, index=False, encoding='utf-8') print(f"形状特征已保存到: {csv_path}") print(f"共保存 {len(features_df)} 个区域的特征数据") # 打印特征统计信息 if len(features_df) > 0: numeric_columns = features_df.select_dtypes(include=[np.number]).columns print(f"数值特征列数: {len(numeric_columns)}") print(f"特征列: {list(features_df.columns)}") return csv_path except Exception as e: print(f"保存CSV文件失败: {e}") raise def save_labeled_image(labeled_image, output_path, config): """ 保存标记后的图像(可选) 参数: - labeled_image: 标记后的图像 - output_path: 输出文件路径(不含扩展名) - config: 配置对象 """ try: # 保存为PNG图像用于可视化 png_path = f"{output_path}_labeled.png" # 归一化标签图像以便可视化 if labeled_image.max() > 0: normalized_image = (labeled_image / labeled_image.max() * 255).astype(np.uint8) else: normalized_image = labeled_image.astype(np.uint8) # 创建彩色标签图像 colored_image = cv2.applyColorMap(normalized_image, cv2.COLORMAP_JET) # 保存图像 cv2.imwrite(png_path, colored_image) print(f"标记图像已保存到: {png_path}") # 同时保存为dat文件(ENVI格式) dat_path = f"{output_path}_labeled.dat" with open(dat_path, 'wb') as f: labeled_image.astype(np.int32).tofile(f) # 保存对应的hdr文件 hdr_path = f"{output_path}_labeled.hdr" header_content = f"""ENVI description = {{Labeled Image from Shape Analysis [Watershed: {config.use_watershed}]}} samples = {labeled_image.shape[1]} lines = {labeled_image.shape[0]} bands = 1 header offset = 0 file type = ENVI Standard data type = 3 interleave = bsq byte order = 0 band names = {{Labeled_Result}} classes = {labeled_image.max() + 1} class names = {{Background{' , '.join([f'Region_{i}' for i in range(1, labeled_image.max() + 1)])}}} """ with open(hdr_path, 'w', encoding='utf-8') as f: f.write(header_content) print(f"ENVI格式标记数据已保存到: {dat_path}") except Exception as e: print(f"保存标记图像失败: {e}") def analyze_shape_features(config): """ 执行形状特征分析的主函数 参数: - config: 配置对象 返回: - features_df: 特征DataFrame - labeled_image: 标记后的图像 """ try: print("=" * 60) print("开始形状特征分析") print("=" * 60) # 1. 加载数据 print("1. 加载输入数据...") if config.input_type == 'dat': binary_image, metadata = load_binary_image_from_dat( config.dat_file_path, config.hdr_file_path, config.band_index ) elif config.input_type == 'shp': binary_image, metadata = load_regions_from_shp( config.shp_file_path, image_shape=None ) else: raise ValueError(f"不支持的输入类型: {config.input_type}") print(f"数据加载完成,图像形状: {binary_image.shape}") binary_image = binary_image.squeeze() # 2. 连通域标记 print("2. 执行连通域标记...") labeled_image, num_objects = label_connected_components(binary_image, config) print(f"找到 {num_objects} 个连通域") if num_objects == 0: print("警告:没有找到有效的连通域") return pd.DataFrame(), labeled_image # 3. 计算形状特征 print("3. 计算形状特征...") features_df, contour_coords_dict = calculate_shape_features(labeled_image, config) if len(features_df) == 0: print("警告:未能计算出有效的形状特征") return features_df, labeled_image # 4. 保存结果 print("4. 保存分析结果...") # 生成输出文件名 timestamp = pd.Timestamp.now().strftime("%Y%m%d_%H%M%S") base_name = f"shape_features_{config.input_type}" if config.input_type == 'dat': base_name += f"_band{config.band_index}" if config.use_watershed: base_name += "_watershed" base_name += f"_{timestamp}" output_path = os.path.join(config.output_dir, base_name) # 保存CSV文件 if config.output_csv: csv_path = save_features_to_csv(features_df, output_path, config, contour_coords_dict) # 保存标记图像 if config.save_labeled_image: save_labeled_image(labeled_image, output_path, config) print("=" * 60) print("形状特征分析完成!") print(f"分析了 {len(features_df)} 个区域") print(f"计算了 {len(features_df.columns)} 个特征指标") if config.output_csv: print(f"结果已保存到: {csv_path}") print("=" * 60) return features_df, labeled_image except Exception as e: print(f"形状特征分析失败: {e}") raise def main(): """主函数:命令行接口""" import argparse parser = argparse.ArgumentParser( description='形状特征分析工具 - 从分割结果提取形状特征', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 使用示例: 1. 从dat文件分析形状特征: python shape_feature.py input.dat -t dat -b 0 -o shape_features 2. 从shp文件分析形状特征: python shape_feature.py input.shp -t shp -o shape_features 3. 启用分水岭算法分离物体: python shape_feature.py input.dat -t dat --use-watershed --watershed-min-distance 15 -o features 4. 自定义连通性和面积阈值: python shape_feature.py input.dat -t dat -c 4 --min-area 100 --save-labeled-image -o results 输入格式: - dat: ENVI格式的分割结果文件 (.dat + .hdr) - shp: Shapefile格式的矢量数据 形状特征包括: - 面积、周长、紧密度 - 圆度、矩形度、伸长度 - 质心坐标、边界框 - 方向角、主轴长度 - Hu矩不变量等几何特征 """ ) parser.add_argument('input_path', help='输入文件路径 (.dat分割结果 或 .shp矢量文件)') parser.add_argument('-t', '--input-type', choices=['dat', 'shp'], default='dat', help='输入文件类型 (默认: dat)') parser.add_argument('-o', '--output', required=True, help='输出文件路径前缀') # dat文件参数 parser.add_argument('-b', '--band-index', type=int, default=0, help='dat文件的波段索引 (默认: 0)') parser.add_argument('--hdr-path', help='dat文件对应的hdr文件路径 (默认自动查找)') # 连通域参数 parser.add_argument('-c', '--connectivity', type=int, choices=[4, 8], default=8, help='连通性,4或8 (默认: 8)') parser.add_argument('--min-area', type=int, default=10, help='最小区域面积阈值 (默认: 10)') # 分水岭参数 parser.add_argument('--use-watershed', action='store_true', help='启用分水岭算法分离相邻物体') parser.add_argument('--watershed-min-distance', type=int, default=10, help='分水岭局部最大值的最小距离 (默认: 10)') # 输出选项 parser.add_argument('--output-dir', default='output', help='输出目录 (默认: output)') parser.add_argument('--save-labeled-image', action='store_true', help='保存标记后的图像文件') args = parser.parse_args() try: print("=" * 70) print("形状特征分析工具") print("=" * 70) print(f"输入文件: {args.input_path}") print(f"输入类型: {args.input_type}") print(f"输出路径: {args.output}") print(f"输出目录: {args.output_dir}") if args.input_type == 'dat': print(f"波段索引: {args.band_index}") print(f"HDR文件: {args.hdr_path or '自动查找'}") print(f"连通性: {args.connectivity}") print(f"最小面积: {args.min_area}") print(f"使用分水岭: {args.use_watershed}") if args.use_watershed: print(f"分水岭最小距离: {args.watershed_min_distance}") print() # 创建配置 config = ShapeFeatureConfig() config.input_type = args.input_type config.output_dir = args.output_dir config.connectivity = args.connectivity config.min_area = args.min_area config.use_watershed = args.use_watershed config.watershed_min_distance = args.watershed_min_distance config.save_labeled_image = args.save_labeled_image # 设置输入文件路径 if args.input_type == 'dat': config.dat_file_path = args.input_path config.hdr_file_path = args.hdr_path config.band_index = args.band_index elif args.input_type == 'shp': config.shp_file_path = args.input_path # 验证配置 try: config.validate() print("配置验证通过") except Exception as e: print(f"✗ 配置验证失败: {e}") return 1 # 执行形状特征分析 print("\n开始形状特征分析...") features_df, labeled_image = analyze_shape_features(config) # 显示结果统计 if len(features_df) > 0: print("\n✓ 分析完成!") print(f"检测到的物体数量: {len(features_df)}") print(f"提取的特征数量: {len(features_df.columns)}") # 显示特征名称 feature_names = [col for col in features_df.columns if col not in ['label', 'centroid_row', 'centroid_col']] print(f"形状特征: {', '.join(feature_names[:10])}{'...' if len(feature_names) > 10 else ''}") # 显示统计信息 print("\n特征统计:") numeric_cols = features_df.select_dtypes(include=[np.number]).columns if len(numeric_cols) > 0: stats = features_df[numeric_cols].describe() print(f" 面积范围: {features_df['area'].min():.0f} - {features_df['area'].max():.0f} 像素") print(f" 平均面积: {features_df['area'].mean():.1f} 像素") print(f" 周长范围: {features_df['perimeter'].min():.1f} - {features_df['perimeter'].max():.1f}") print(f" 圆度范围: {features_df['circularity'].min():.3f} - {features_df['circularity'].max():.3f}") else: print("\n⚠ 未检测到任何物体,请检查输入数据或调整参数") print("\n" + "=" * 70) print("形状特征分析完成!") print("=" * 70) except Exception as e: print(f"✗ 处理失败: {e}") import traceback traceback.print_exc() return 1 return 0 if __name__ == '__main__': main()