#!/usr/bin/env python3
"""
Hyperspectral image geographic transformation tool.

What it does:
- Reads a geo-corrected hyperspectral strip image (11 float32 bands per pixel:
  the first 3 spectral bands, column number, row number, strip number, then
  the last 5 spectral bands).
- Reads the original hyperspectral cube (ENVI ``.bil`` + ``.hdr``).
- Uses the stored column/row numbers to look up, for every geo-corrected
  pixel, the full spectrum in the original cube.
- Saves the re-projected cube as an ENVI file (``.dat`` + ``.hdr``).

File naming (as described by the original author):
- Geo-corrected file: 2025_9_2_3_53_45_202592_35252_0_rad_rgbxyz_geo1.dat
- Original file:      2025_9_2_3_53_45_202592_35252_0_rad.bil

NOTE(review): ``find_matching_files`` globs for
``*_rad_rgbxyz_geo_angles_registered.bip`` by default, which differs from the
geo-corrected name above; pass ``geo_pattern`` if your files use another
suffix.

Dependencies:
- numpy
- spectral (pip install spectral)
- pathlib
- GDAL (optional; a pure-numpy writer is used when it is missing)
"""

import numpy as np
import os
from pathlib import Path
import re
from typing import Tuple, Optional, List

try:
    import spectral
    SPECTRAL_AVAILABLE = True
except ImportError:
    SPECTRAL_AVAILABLE = False
    print("警告: spectral库不可用,请安装: pip install spectral")

# Optional: GDAL is used to write the ENVI output when it is installed.
try:
    from osgeo import gdal
    GDAL_AVAILABLE = True
except ImportError:
    GDAL_AVAILABLE = False
    print("警告: GDAL不可用,将使用numpy保存")


# Layout of the 11-band geo-corrected pixel record.
_GEO_BAND_COUNT = 11
_GEO_COL_IDX = 3
_GEO_ROW_IDX = 4
_GEO_STRIP_IDX = 5
_GEO_BAND_NAMES = ('band1', 'band2', 'band3', 'column', 'row', 'strip',
                   'band7', 'band8', 'band9', 'band10', 'band11')

# (metadata key, ENVI header field) pairs copied verbatim from the geo header.
_GEO_HEADER_FIELDS = (
    ('map_info', 'map info'),
    ('coordinate_system_string', 'coordinate system string'),
    ('projection_info', 'projection info'),
)


def _format_geo_info_for_envi(geo_value):
    """Return ``geo_value`` unchanged, or ``None`` when it is empty/falsy.

    The value is expected to be text copied straight from an ENVI header, so
    no reformatting is applied.
    """
    if not geo_value:
        return None
    return geo_value


def extract_file_key(filename: str) -> str:
    """Return the part of ``filename`` used to pair geo and original files.

    The key is everything before the first ``_rad``; when ``_rad`` does not
    occur, the whole file name is returned.

    Examples
    --------
    "2025_9_2_3_53_45_202592_35252_2_rad_rgbxyz_geo1.dat"
        -> "2025_9_2_3_53_45_202592_35252_2"
    "2025_9_2_3_53_45_202592_35252_9_rad.bil"
        -> "2025_9_2_3_53_45_202592_35252_9"
    """
    key, separator, _ = filename.partition('_rad')
    return key if separator else filename


def _infer_image_shape(num_pixels: int) -> Tuple[int, int]:
    """Guess ``(rows, cols)`` for a headerless file holding ``num_pixels``.

    First tries row counts 1000, 1100, ... that divide ``num_pixels`` exactly
    and give fewer than 10000 columns. Otherwise falls back to the most
    square exact factorisation, so ``rows * cols == num_pixels`` always holds
    and the generated header never describes more data than the file has.
    """
    if num_pixels <= 0:
        raise ValueError("dat文件为空,无法推断图像尺寸")

    root = int(np.sqrt(num_pixels))
    for rows in range(1000, root + 1000, 100):
        if num_pixels % rows == 0:
            cols = num_pixels // rows
            if 0 < cols < 10000:
                return rows, cols

    # Largest divisor not exceeding sqrt(num_pixels); 1 always qualifies.
    candidates = np.arange(1, max(root, 1) + 1)
    divisors = candidates[num_pixels % candidates == 0]
    rows = int(divisors[-1])
    return rows, num_pixels // rows


def _write_inferred_geo_header(dat_file: str, hdr_file: Path) -> None:
    """Create an ENVI header for a headerless 11-band float32 BIP file."""
    # Only the size is needed, so avoid reading the whole file into memory.
    value_count = Path(dat_file).stat().st_size // np.dtype(np.float32).itemsize
    if value_count % _GEO_BAND_COUNT != 0:
        raise ValueError(f"数据大小 {value_count} 不能被11整除,可能不是正确的dat文件格式")

    rows, cols = _infer_image_shape(value_count // _GEO_BAND_COUNT)

    hdr_content = (
        "ENVI\n"
        "description = {\n"
        "  高光谱地理校正数据 - 11波段格式\n"
        "  前3个光谱波段、列号、行号、航带号、后5个光谱波段}\n"
        f"samples = {cols}\n"
        f"lines = {rows}\n"
        "bands = 11\n"
        "header offset = 0\n"
        "file type = ENVI Standard\n"
        "data type = 4\n"
        "interleave = bip\n"
        "byte order = 0\n"
        "band names = {\n"
        " 波段1, 波段2, 波段3, 列号, 行号, 航带号, 波段7, 波段8, 波段9, 波段10, 波段11}\n"
    )
    with open(hdr_file, 'w', encoding='utf-8') as f:
        f.write(hdr_content)


def _read_geo_info_from_header(hdr_file: Path) -> dict:
    """Copy the geo-reference fields of an ENVI header as raw text.

    Only the text on the same line as the field name is captured; failures to
    read the header are reported and yield an empty dict.
    """
    geo_info = {}
    try:
        with open(str(hdr_file), 'r', encoding='utf-8', errors='ignore') as f:
            hdr_content = f.read()
    except OSError as e:
        print(f"警告: 无法读取geo文件的HDR地理信息: {e}")
        return geo_info

    for key, field in _GEO_HEADER_FIELDS:
        match = re.search(rf'^{field}\s*=\s*(.+)$', hdr_content,
                          re.MULTILINE | re.IGNORECASE)
        if match:
            geo_info[key] = match.group(1).strip()
    return geo_info


def _coords_all_zero(pixels: np.ndarray) -> bool:
    """Return True when the column and row bands both have a maximum of 0."""
    return bool(pixels[:, _GEO_COL_IDX].max() == 0
                and pixels[:, _GEO_ROW_IDX].max() == 0)


def _load_geo_with_spectral(dat_file: str) -> Tuple[np.ndarray, dict]:
    """Load the geo-corrected file through its ENVI header using spectral."""
    hdr_file = Path(dat_file).with_suffix('.hdr')
    if not hdr_file.exists():
        _write_inferred_geo_header(dat_file, hdr_file)

    image_data = spectral.open_image(str(hdr_file))
    data = image_data.load()

    # Flatten (lines, samples, bands) to (pixels, bands).
    data_reshaped = data.reshape(-1, data.shape[2]) if data.ndim == 3 else data
    if data_reshaped.shape[1] != _GEO_BAND_COUNT:
        raise ValueError(f"数据波段数 {data_reshaped.shape[1]} 不等于期望的11波段")

    all_coords_zero = _coords_all_zero(data_reshaped)
    if all_coords_zero:
        print("警告: dat文件中的所有行列号都是0,可能需要重新解释数据格式")

    metadata = {
        'file_path': dat_file,
        'hdr_file': str(hdr_file),
        'data_type': str(data_reshaped.dtype),
        'bands': _GEO_BAND_COUNT,
        'pixels': len(data_reshaped),
        'lines': image_data.nrows,
        'samples': image_data.ncols,
        'band_names': list(_GEO_BAND_NAMES),
        'col_idx': _GEO_COL_IDX,
        'row_idx': _GEO_ROW_IDX,
        'strip_idx': _GEO_STRIP_IDX,
        'all_coords_zero': all_coords_zero,
        'wavelengths': getattr(image_data.bands, 'centers', None),
        'interleave': getattr(image_data, 'interleave', 'unknown'),
        'geo_info': _read_geo_info_from_header(hdr_file),
    }
    return data_reshaped, metadata


def _load_geo_with_numpy(dat_file: str) -> Tuple[np.ndarray, dict]:
    """Load the geo-corrected file as raw float32 records of 11 values."""
    data = np.fromfile(dat_file, dtype=np.float32)
    if data.size % _GEO_BAND_COUNT != 0:
        raise ValueError(f"数据大小 {data.size} 不能被11整除,可能不是正确的dat文件格式")

    num_pixels = data.size // _GEO_BAND_COUNT
    data_reshaped = data.reshape(num_pixels, _GEO_BAND_COUNT)
    print(f"numpy直接读取成功: {dat_file}")
    print(f"数据形状: {data_reshaped.shape} (pixels, bands)")

    # No image dimensions are known here, so 'lines'/'samples' are omitted.
    metadata = {
        'file_path': dat_file,
        'data_type': 'float32',
        'bands': _GEO_BAND_COUNT,
        'pixels': num_pixels,
        'band_names': list(_GEO_BAND_NAMES),
        'col_idx': _GEO_COL_IDX,
        'row_idx': _GEO_ROW_IDX,
        'strip_idx': _GEO_STRIP_IDX,
        'all_coords_zero': _coords_all_zero(data_reshaped),
    }
    return data_reshaped, metadata


def load_geo_corrected_dat(dat_file: str) -> Tuple[np.ndarray, dict]:
    """Read a geo-corrected 11-band file.

    Band layout per pixel:
    - bands 0-2:  first three spectral bands
    - band 3:     column number
    - band 4:     row number
    - band 5:     strip number
    - bands 6-10: last five spectral bands

    The file is read with spectral through ``<dat_file stem>.hdr``. When that
    header does not exist, one is written next to the data with an inferred
    image size. If the spectral path fails for any reason, the file is read
    directly with numpy as raw float32 records.

    Parameters
    ----------
    dat_file : str
        Path of the geo-corrected data file.

    Returns
    -------
    data : np.ndarray
        Pixel records with shape (pixels, 11).
    metadata : dict
        Describes the data; ``lines``, ``samples``, ``wavelengths``,
        ``interleave``, ``hdr_file`` and ``geo_info`` are present only when
        the spectral path succeeded.

    Raises
    ------
    RuntimeError
        If spectral is not installed, or if both read paths fail.
    """
    if not SPECTRAL_AVAILABLE:
        raise RuntimeError("需要spectral库来读取dat文件,请安装: pip install spectral")

    try:
        return _load_geo_with_spectral(dat_file)
    except Exception as e:
        # Best-effort: any failure here falls through to the raw numpy read.
        spectral_error = e
        print(f"spectral读取失败: {e}")
        print("回退到numpy直接读取...")

    try:
        return _load_geo_with_numpy(dat_file)
    except Exception as e2:
        raise RuntimeError(
            f"读取dat文件失败: spectral错误: {spectral_error}, numpy错误: {e2}"
        ) from e2


def _find_envi_header(data_path: Path) -> Path:
    """Locate the ENVI header that belongs to ``data_path``.

    Tries ``<stem>.hdr``, then ``<name>.hdr``, then the first (sorted) match
    of ``<stem>*.hdr`` in the same directory.
    """
    for candidate in (data_path.with_suffix('.hdr'),
                      data_path.parent / f"{data_path.name}.hdr"):
        if candidate.exists():
            return candidate

    possible_hdrs = sorted(data_path.parent.glob(f"{data_path.stem}*.hdr"))
    if possible_hdrs:
        return possible_hdrs[0]
    raise FileNotFoundError("未找到对应的头文件")


def load_original_bil(bil_file: str) -> Tuple[np.ndarray, dict]:
    """Read the original hyperspectral cube through its ENVI header.

    Parameters
    ----------
    bil_file : str
        Path of the ``.bil`` data file.

    Returns
    -------
    data : np.ndarray
        Cube as returned by ``spectral``'s ``load()``; downstream code
        indexes it as (lines, samples, bands).
    metadata : dict
        ``file_path``, ``hdr_file``, ``lines``, ``samples``, ``bands``,
        ``wavelengths``, ``data_type`` and ``interleave``.

    Raises
    ------
    RuntimeError
        If spectral is not installed, the header cannot be found, or the
        file cannot be read.
    """
    if not SPECTRAL_AVAILABLE:
        raise RuntimeError("需要spectral库来读取bil文件,请安装: pip install spectral")

    try:
        hdr_file = _find_envi_header(Path(bil_file))

        image_data = spectral.open_image(str(hdr_file))
        data = image_data.load()

        metadata = {
            'file_path': bil_file,
            'hdr_file': str(hdr_file),
            'lines': image_data.nrows,
            'samples': image_data.ncols,
            'bands': image_data.nbands,
            'wavelengths': getattr(image_data.bands, 'centers', None),
            'data_type': str(data.dtype),
            'interleave': getattr(image_data, 'interleave', 'unknown'),
        }
        return data, metadata

    except Exception as e:
        raise RuntimeError(f"读取高光谱文件失败: {bil_file}, 错误: {e}") from e


def perform_geometric_transform(original_data: np.ndarray,
                                geo_data: np.ndarray,
                                geo_metadata: dict,
                                output_shape: Optional[Tuple[int, int]] = None,
                                chunk_size: Optional[int] = None) -> np.ndarray:
    """Re-project the original cube onto the geo-corrected pixel grid.

    For every geo pixel the stored column/row numbers select a spectrum in
    ``original_data``. The stored numbers are treated as 1-based and are
    converted to 0-based indices by subtracting 1. A geo pixel is skipped
    (its output stays 0) when all of its non-coordinate bands are 0 or when
    its source index falls outside the original cube.

    Parameters
    ----------
    original_data : np.ndarray
        Original cube with shape (orig_lines, orig_samples, bands).
    geo_data : np.ndarray
        Geo pixel records with shape (pixels, n_geo_bands), in row-major
        order of the geo image.
    geo_metadata : dict
        May provide ``lines``/``samples`` (output size) and
        ``col_idx``/``row_idx`` (defaults 3 and 4). The three bands starting
        at ``col_idx`` are treated as coordinate bands.
    output_shape : tuple, optional
        (lines, samples) of the output; overrides ``geo_metadata``.
    chunk_size : int, optional
        Number of valid pixels copied per step, to cap peak memory. ``None``
        or a non-positive value copies everything in one step.

    Returns
    -------
    np.ndarray
        Cube with shape (lines_out, samples_out, bands) and the dtype of
        ``original_data``.

    Raises
    ------
    RuntimeError
        Wraps any error raised during the transform.
    """
    try:
        orig_lines, orig_samples, bands = original_data.shape
        total = len(geo_data)

        if output_shape is not None:
            lines_out, samples_out = output_shape
        elif 'lines' in geo_metadata and 'samples' in geo_metadata:
            lines_out = int(geo_metadata['lines'])
            samples_out = int(geo_metadata['samples'])
        else:
            # Fallback: a roughly square grid large enough for all pixels.
            lines_out = max(1, int(np.sqrt(total)))
            samples_out = -(-total // lines_out)

        # The grid must hold exactly the geo pixels; otherwise keep the width
        # and use as many lines as needed.
        expected_pixels = lines_out * samples_out
        if expected_pixels != total:
            print(f"警告: 输出图像尺寸({lines_out}x{samples_out}={expected_pixels})与geo数据像素数({total})不匹配")
            print("将调整输出尺寸以匹配geo数据像素数")
            if samples_out <= 0:
                raise ValueError(f"无效的输出列数: {samples_out}")
            lines_out = -(-total // samples_out)

        # Output is pre-filled with the ignore value 0.
        out = np.zeros((lines_out, samples_out, bands), dtype=original_data.dtype)

        print(f"开始地理变换: 原始bil尺寸 {orig_lines}x{orig_samples}x{bands}")
        print(f"输出geo图像尺寸: {lines_out}x{samples_out}x{bands}")
        print(f"处理 {total} 个geo像素")

        col_idx = geo_metadata.get('col_idx', 3)
        row_idx = geo_metadata.get('row_idx', 4)

        # Condition 1: at least one non-coordinate band is non-zero.
        non_geo_bands = np.concatenate([np.arange(col_idx),
                                        np.arange(col_idx + 3, geo_data.shape[1])])
        non_zero_mask = ~np.all(geo_data[:, non_geo_bands] == 0, axis=1)

        # Condition 2: the (1-based) source index lies inside the original cube.
        src_cols = geo_data[:, col_idx].astype(np.int64) - 1
        src_rows = geo_data[:, row_idx].astype(np.int64) - 1
        in_bounds = ((src_rows >= 0) & (src_rows < orig_lines)
                     & (src_cols >= 0) & (src_cols < orig_samples))

        valid_idx = np.flatnonzero(non_zero_mask & in_bounds)
        valid_count = len(valid_idx)
        if valid_count == 0:
            print("地理变换完成: 没有有效像素需要处理")
            return out

        # Flat source indices into the original cube, one per valid geo pixel.
        src_lin = src_rows[valid_idx] * orig_samples + src_cols[valid_idx]
        orig_flat = original_data.reshape(-1, bands)
        # `out` is freshly allocated and contiguous, so this reshape is a view
        # and geo pixel i maps directly to flat output position i.
        out_flat = out.reshape(-1, bands)

        step = valid_count if not chunk_size or chunk_size <= 0 else int(chunk_size)
        for start in range(0, valid_count, step):
            stop = start + step
            out_flat[valid_idx[start:stop]] = orig_flat[src_lin[start:stop]]

        skipped_count = total - valid_count
        print(f"地理变换完成: 成功处理 {valid_count} 个像素,跳过 {skipped_count} 个像素(无效或全0)")
        return out

    except Exception as e:
        raise RuntimeError(f"地理变换失败: {e}") from e


def fill_nan_with_nearest(data: np.ndarray) -> np.ndarray:
    """Fill NaN values with the nearest valid value in the same row.

    The search is one-dimensional, along the sample axis, separately for
    every line and band. When two valid samples are equally close, the one
    with the lower index is used. Rows without any valid value stay NaN.

    Parameters
    ----------
    data : np.ndarray
        Array with shape (lines, samples, bands); it is not modified.

    Returns
    -------
    np.ndarray
        Filled copy of ``data``.
    """
    filled_data = data.copy()

    for band in range(filled_data.shape[2]):
        nan_mask = np.isnan(filled_data[:, :, band])
        # Only rows that have both NaN and valid samples need work.
        rows_to_fill = np.flatnonzero(nan_mask.any(axis=1) & ~nan_mask.all(axis=1))

        for i in rows_to_fill:
            row = filled_data[i, :, band]  # view into the copy, not the input
            missing = np.flatnonzero(nan_mask[i])
            valid = np.flatnonzero(~nan_mask[i])

            # Neighbouring valid samples on each side of every NaN position.
            pos = np.searchsorted(valid, missing)
            left = valid[np.clip(pos - 1, 0, len(valid) - 1)]
            right = valid[np.clip(pos, 0, len(valid) - 1)]

            # Strict comparison keeps the lower index on ties.
            use_right = np.abs(right - missing) < np.abs(missing - left)
            row[missing] = row[np.where(use_right, right, left)]

    return filled_data


def save_transformed_data(data: np.ndarray, output_file: str,
                          wavelengths: Optional[np.ndarray] = None,
                          geo_info: Optional[dict] = None):
    """Save the transformed cube as an ENVI file pair.

    The data is written to ``<output_file stem>.dat`` and the header to
    ``<output_file stem>.hdr``, whatever suffix ``output_file`` carries.
    GDAL is used when available, otherwise the numpy writer.

    Parameters
    ----------
    data : np.ndarray
        Cube with shape (lines, samples, bands).
    output_file : str
        Output path; its parent directory is created if needed.
    wavelengths : np.ndarray, optional
        Band centre wavelengths written to the header.
    geo_info : dict, optional
        Raw geo-reference header fields (``map_info``,
        ``coordinate_system_string``, ``projection_info``).

    Raises
    ------
    RuntimeError
        If writing fails.
    """
    lines, samples, bands = data.shape

    output_path = Path(output_file)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    bil_file = str(output_path.with_suffix('.dat'))
    hdr_file = str(output_path.with_suffix('.hdr'))

    try:
        if GDAL_AVAILABLE:
            save_with_gdal_envi(data, bil_file, wavelengths, geo_info)
        else:
            save_with_numpy_envi(data, bil_file, hdr_file, wavelengths, geo_info)

        print("✅ 成功保存地理变换结果:")
        print(f"   数据文件: {bil_file}")
        print(f"   头文件: {hdr_file}")
        print(f"   数据尺寸: {lines} x {samples} x {bands}")
        print(f"   保存方式: {'GDAL' if GDAL_AVAILABLE else 'NumPy'}")

    except Exception as e:
        raise RuntimeError(f"保存文件失败: {output_file}, 错误: {e}") from e


def save_with_gdal_envi(data: np.ndarray, bil_file: str,
                        wavelengths: Optional[np.ndarray] = None,
                        geo_info: Optional[dict] = None):
    """Write the cube as a BSQ, uint16 ENVI file using GDAL.

    After GDAL has closed the dataset, the header next to ``bil_file``
    (same stem, ``.hdr`` suffix) is overwritten by ``create_envi_header`` so
    that it carries the copied geo-reference and wavelength fields.

    Raises
    ------
    RuntimeError
        If GDAL is not installed or the dataset cannot be created.
    """
    if not GDAL_AVAILABLE:
        raise RuntimeError("GDAL不可用,无法使用GDAL保存")

    lines, samples, bands = data.shape
    # Derive the header from the suffix only; a plain str.replace would also
    # rewrite '.dat' inside directory names, or leave the path unchanged.
    hdr_file = str(Path(bil_file).with_suffix('.hdr'))

    driver = gdal.GetDriverByName('ENVI')
    dataset = driver.Create(bil_file, samples, lines, bands, gdal.GDT_UInt16,
                            options=['INTERLEAVE=BSQ'])
    if dataset is None:
        raise RuntimeError(f"无法创建ENVI数据集: {bil_file}")

    band = None
    try:
        metadata = dataset.GetMetadata()
        metadata['DESCRIPTION'] = 'Geometrically transformed hyperspectral data'
        metadata['SENSOR_TYPE'] = 'Hyperspectral'
        metadata['DATA_UNITS'] = 'Reflectance'
        metadata['PROCESSING_ALGORITHM'] = 'Geometric Transformation'
        metadata['CREATION_DATE'] = str(np.datetime64('now'))

        if wavelengths is not None and len(wavelengths) == bands:
            metadata['wavelength_units'] = 'nm'
            for i, wl in enumerate(wavelengths):
                metadata[f'wavelength_{i+1}'] = str(wl)

        dataset.SetMetadata(metadata)

        for band_idx in range(bands):
            band = dataset.GetRasterBand(band_idx + 1)
            band.WriteArray(data[:, :, band_idx].astype(np.float32))
            band.SetNoDataValue(0.0)

            if wavelengths is not None and band_idx < len(wavelengths):
                band.SetDescription(f'{wavelengths[band_idx]:.1f} nm')
            else:
                band.SetDescription(f'Band {band_idx + 1}')

        dataset.FlushCache()
    finally:
        # Release the band handle before the dataset so the dataset is really
        # closed here. NOTE(review): presumably GDAL writes its own header on
        # close; ours must be written afterwards - confirm with your GDAL.
        band = None
        dataset = None

    create_envi_header(hdr_file, lines, samples, bands, wavelengths, geo_info)


def save_with_numpy_envi(data: np.ndarray, bil_file: str, hdr_file: str,
                         wavelengths: Optional[np.ndarray] = None,
                         geo_info: Optional[dict] = None):
    """Write the cube as a BSQ, uint16 ENVI file using numpy only.

    Used when GDAL is unavailable. Values are clipped to 0..65535; NaN is
    written as 0, which the header declares as the ignore value.
    """
    lines, samples, bands = data.shape

    with open(bil_file, 'wb') as f:
        # BSQ: one complete band after another.
        for band_idx in range(bands):
            band_data = np.nan_to_num(data[:, :, band_idx], nan=0.0)
            np.clip(band_data, 0, 65535).astype(np.uint16).tofile(f)

    create_envi_header(hdr_file, lines, samples, bands, wavelengths, geo_info)


def create_envi_header(hdr_file: str, lines: int, samples: int, bands: int,
                       wavelengths: Optional[np.ndarray] = None,
                       geo_info: Optional[dict] = None):
    """Write an ENVI header for a BSQ, uint16, little-endian data file.

    Geo-reference fields from ``geo_info`` are written verbatim when present.
    Wavelengths are written only when their count equals ``bands``.
    """
    header_lines = [
        "ENVI",
        "description = {",
        "  Geometrically transformed hyperspectral data",
        "  Processed with Python geometric transformation}",
        f"samples = {samples}",
        f"lines = {lines}",
        f"bands = {bands}",
        "header offset = 0",
        "file type = ENVI Standard",
        "data type = 12",       # uint16
        "interleave = bsq",
        "sensor type = Hyperspectral",
        "byte order = 0",       # little-endian
        "data ignore value = 0",
    ]

    if geo_info:
        for key, field in _GEO_HEADER_FIELDS:
            if geo_info.get(key):
                header_lines.append(f"{field} = {geo_info[key]}")

    if wavelengths is not None and len(wavelengths) == bands:
        header_lines.append("wavelength units = nm")
        joined = ",".join(f"{wl}" for wl in wavelengths)
        header_lines.append(f"wavelength = {{ {joined} }}")

    with open(hdr_file, 'w', encoding='utf-8') as f:
        f.write("\n".join(header_lines) + "\n")


def process_file_pair(geo_dat_file: str, bil_file: str, output_dir: str) -> bool:
    """Transform one geo-corrected/original file pair and save the result.

    The output is named ``<bil stem>_geo_corrected.dat`` inside
    ``output_dir``.

    Parameters
    ----------
    geo_dat_file : str
        Path of the geo-corrected file.
    bil_file : str
        Path of the original ``.bil`` file.
    output_dir : str
        Output directory.

    Returns
    -------
    bool
        True on success; on any error the message is printed and False is
        returned.
    """
    try:
        geo_data, geo_metadata = load_geo_corrected_dat(geo_dat_file)
        original_data, orig_metadata = load_original_bil(bil_file)

        transformed_data = perform_geometric_transform(original_data, geo_data, geo_metadata)

        bil_name = Path(bil_file).stem
        output_file = Path(output_dir) / f"{bil_name}_geo_corrected.dat"

        wavelengths = orig_metadata.get('wavelengths')
        geo_info = geo_metadata.get('geo_info')
        save_transformed_data(transformed_data, str(output_file), wavelengths, geo_info)

        return True

    except Exception as e:
        print(f"处理失败: {e}")
        return False


def find_matching_files(geo_dir: str, bil_dir: str,
                        geo_pattern: str = '*_rad_rgbxyz_geo_angles_registered.bip',
                        bil_pattern: str = '*_rad.bil') -> List[Tuple[str, str]]:
    """Pair geo-corrected files with original files that share a key.

    Files are paired when ``extract_file_key`` returns the same key for both
    names.

    Parameters
    ----------
    geo_dir : str
        Directory with the geo-corrected files.
    bil_dir : str
        Directory with the original ``.bil`` files.
    geo_pattern : str, optional
        Glob pattern selecting geo-corrected files in ``geo_dir``.
    bil_pattern : str, optional
        Glob pattern selecting original files in ``bil_dir``.

    Returns
    -------
    List[Tuple[str, str]]
        ``[(geo_file, bil_file), ...]`` sorted by key.
    """
    geo_files = {extract_file_key(p.name): str(p)
                 for p in Path(geo_dir).glob(geo_pattern)}
    bil_files = {extract_file_key(p.name): str(p)
                 for p in Path(bil_dir).glob(bil_pattern)}

    # Sorted so the processing order does not depend on the file system.
    shared_keys = sorted(geo_files.keys() & bil_files.keys())
    return [(geo_files[key], bil_files[key]) for key in shared_keys]


def batch_process(geo_dir: str, bil_dir: str, output_dir: str) -> dict:
    """Process every matching file pair found in the two directories.

    Parameters
    ----------
    geo_dir : str
        Directory with the geo-corrected files.
    bil_dir : str
        Directory with the original ``.bil`` files.
    output_dir : str
        Output directory; created if it does not exist.

    Returns
    -------
    dict
        Counts under the keys ``total``, ``success`` and ``failed``.
    """
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    file_pairs = find_matching_files(geo_dir, bil_dir)

    results = {'total': len(file_pairs), 'success': 0, 'failed': 0}
    for geo_file, bil_file in file_pairs:
        if process_file_pair(geo_file, bil_file, output_dir):
            results['success'] += 1
        else:
            results['failed'] += 1

    return results


def main():
    """Example entry point: batch-process the hard-coded directories."""
    # Example paths; adjust them to your own data layout.
    geo_corrected_dir = r"D:\BaiduNetdiskDownload\20250902\_3_52_52\316\jiaozhen"  # geo-corrected files
    original_bil_dir = r"D:\BaiduNetdiskDownload\20250902\_3_52_52\Geoout\Radout"  # original bil files
    output_dir = r"D:\BaiduNetdiskDownload\20250902\_3_52_52\316\cube"  # output directory

    if not SPECTRAL_AVAILABLE:
        print("错误: 需要安装spectral库")
        return

    results = batch_process(geo_corrected_dir, original_bil_dir, output_dir)

    if results['success'] > 0:
        print(f"处理完成!成功处理了 {results['success']} 对文件")
    else:
        print("未成功处理任何文件,请检查文件路径和格式")


if __name__ == "__main__":
    main()