import csv import os import glob import math import argparse import sys def process_spectral_data(input_file): # 读取整个CSV文件 with open(input_file, 'r') as f: reader = csv.reader(f) all_rows = list(reader) # 存储最终结果 file_results = [] wavelength_header = None index = 0 total_rows = len(all_rows) group_count = 0 valid_group_count = 0 # 遍历所有行 while index < total_rows: group_count += 1 # 检查是否有足够行构成一个数据组(8行) if index + 8 > total_rows: break # 提取数据组的8行 group = all_rows[index:index + 8] # 跳过数据组之间的3行 index += 11 # 验证数据组结构 if len(group) < 6: continue try: # 提取时间戳(第一行第四列) timestamp = group[0][3] # 索引3 = 第四列 # 提取波长(第四行) wavelengths = [float(x) for x in group[3][1:-1] if x.strip()] # 提取上行RAD值(第五行) up_rad = [float(x) for x in group[6][1:-1] if x.strip()] # 提取下行RAD值(第六行) down_rad = [float(x) for x in group[7][1:-1] if x.strip()] # 验证数据长度 if not (len(wavelengths) == len(up_rad) == len(down_rad)): print(f"警告: 在文件 {os.path.basename(input_file)} 的时间点 {timestamp} 中, " f"波长({len(wavelengths)})、上行({len(up_rad)})、下行({len(down_rad)})数据长度不一致") continue # 处理光谱漂移: # 1. 下行rad值向前移动一位(舍弃第一个值) # 2. 舍弃最后一个波长值 down_rad_shifted = down_rad[1:] valid_wavelengths = wavelengths[:-1] valid_up_rad = up_rad[:-1] # 计算反射率:上行DN / 下行DN(移动后) # 增加分母为零的保护 reflectance = [] zero_denominator_count = 0 for u, d in zip(valid_up_rad, down_rad_shifted): # 检查分母是否为零或非常小 if u > 1e-64: # 避免除以零或接近零的值 reflectance.append(d / u) else: # 分母无效时使用特殊值(NaN或-9999) reflectance.append(float('0')) zero_denominator_count += 1 # 记录分母为零的警告 if zero_denominator_count > 0: print(f"警告: 在文件 {os.path.basename(input_file)} 的时间点 {timestamp} 中, " f"有 {zero_denominator_count} 个波段的分母为零或接近零") # 设置波长表头(仅第一次) if wavelength_header is None: wavelength_header = valid_wavelengths # 添加结果行:时间戳 + 反射率数据 file_results.append([timestamp] + reflectance) valid_group_count += 1 except (IndexError, ValueError) as e: print(f"处理文件 {os.path.basename(input_file)} 时出错: {e}") continue return file_results, wavelength_header, group_count, valid_group_count def process_spectral_folder(input_folder, output_file): """处理整个文件夹内的CSV文件""" # 查找所有CSV文件 csv_files = glob.glob(os.path.join(input_folder, "*.csv")) if not csv_files: print(f"在文件夹 {input_folder} 中未找到CSV文件") return False print(f"找到 {len(csv_files)} 个CSV文件待处理") # 存储所有结果 all_results = [] global_wavelength_header = None total_groups = 0 total_valid_groups = 0 processed_files = 0 # 处理每个文件 for i, csv_file in enumerate(csv_files): print(f"处理文件 {i + 1}/{len(csv_files)}: {os.path.basename(csv_file)}") file_results, wavelength_header, group_count, valid_group_count = process_spectral_data(csv_file) total_groups += group_count total_valid_groups += valid_group_count processed_files += 1 if file_results: # 如果是第一个有效文件,设置全局波长表头 if global_wavelength_header is None and wavelength_header is not None: global_wavelength_header = wavelength_header all_results.append(['Timestamp'] + global_wavelength_header) # 添加文件结果 all_results.extend(file_results) else: print(f"警告: 文件 {os.path.basename(csv_file)} 未提取到有效数据") # 如果没有有效数据,提前退出 if not all_results: print("错误: 未提取到任何有效数据,请检查输入文件格式") return False # 确保输出目录存在 output_dir = os.path.dirname(output_file) if output_dir and not os.path.exists(output_dir): os.makedirs(output_dir) # 写入输出文件 try: with open(output_file, 'w', newline='') as f: writer = csv.writer(f) writer.writerows(all_results) except Exception as e: print(f"写入输出文件时出错: {e}") return False # 打印汇总信息 print(f"\n处理完成! 共处理 {processed_files}/{len(csv_files)} 个文件") print(f"发现 {total_groups} 个数据组, 成功提取 {total_valid_groups} 个有效数据组") print(f"反射率数据已保存至: {output_file}") return True def main(): # 设置命令行参数解析 parser = argparse.ArgumentParser( description='高光谱数据处理工具: 从原始传感器数据提取反射率并导出为CSV', epilog='示例: python hyperspec_export.py -i "C:/input/data" -o "C:/output/reflectance.csv"' ) parser.add_argument('-i', '--input', required=True, help='输入文件夹路径,包含原始高光谱CSV文件') parser.add_argument('-o', '--output', required=True, help='输出文件路径(CSV格式)') # 解析参数 args = parser.parse_args() # 检查输入路径是否存在 if not os.path.exists(args.input): print(f"错误: 输入路径 '{args.input}' 不存在") sys.exit(1) # 检查输入路径是否是目录 if not os.path.isdir(args.input): print(f"错误: '{args.input}' 不是有效的文件夹路径") sys.exit(1) # 处理文件夹 success = process_spectral_folder(args.input, args.output) if not success: print("处理过程中出现错误,请检查日志") sys.exit(1) if __name__ == "__main__": main()