Files
hyperspectral-hongshengrese…/Gen1HyperSpecReflExporter.py
2025-07-22 10:29:43 +08:00

198 lines
6.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv
import os
import glob
import math
import argparse
import sys
def process_spectral_data(input_file):
# 读取整个CSV文件
with open(input_file, 'r') as f:
reader = csv.reader(f)
all_rows = list(reader)
# 存储最终结果
file_results = []
wavelength_header = None
index = 0
total_rows = len(all_rows)
group_count = 0
valid_group_count = 0
# 遍历所有行
while index < total_rows:
group_count += 1
# 检查是否有足够行构成一个数据组8行
if index + 8 > total_rows:
break
# 提取数据组的8行
group = all_rows[index:index + 8]
# 跳过数据组之间的3行
index += 11
# 验证数据组结构
if len(group) < 6:
continue
try:
# 提取时间戳(第一行第四列)
timestamp = group[0][3] # 索引3 = 第四列
# 提取波长(第四行)
wavelengths = [float(x) for x in group[3][1:-1] if x.strip()]
# 提取上行RAD值第五行
up_rad = [float(x) for x in group[6][1:-1] if x.strip()]
# 提取下行RAD值第六行
down_rad = [float(x) for x in group[7][1:-1] if x.strip()]
# 验证数据长度
if not (len(wavelengths) == len(up_rad) == len(down_rad)):
print(f"警告: 在文件 {os.path.basename(input_file)} 的时间点 {timestamp} 中, "
f"波长({len(wavelengths)})、上行({len(up_rad)})、下行({len(down_rad)})数据长度不一致")
continue
# 处理光谱漂移:
# 1. 下行rad值向前移动一位舍弃第一个值
# 2. 舍弃最后一个波长值
down_rad_shifted = down_rad[1:]
valid_wavelengths = wavelengths[:-1]
valid_up_rad = up_rad[:-1]
# 计算反射率上行DN / 下行DN移动后
# 增加分母为零的保护
reflectance = []
zero_denominator_count = 0
for u, d in zip(valid_up_rad, down_rad_shifted):
# 检查分母是否为零或非常小
if u > 1e-64: # 避免除以零或接近零的值
reflectance.append(d / u)
else:
# 分母无效时使用特殊值NaN或-9999
reflectance.append(float('0'))
zero_denominator_count += 1
# 记录分母为零的警告
if zero_denominator_count > 0:
print(f"警告: 在文件 {os.path.basename(input_file)} 的时间点 {timestamp} 中, "
f"{zero_denominator_count} 个波段的分母为零或接近零")
# 设置波长表头(仅第一次)
if wavelength_header is None:
wavelength_header = valid_wavelengths
# 添加结果行:时间戳 + 反射率数据
file_results.append([timestamp] + reflectance)
valid_group_count += 1
except (IndexError, ValueError) as e:
print(f"处理文件 {os.path.basename(input_file)} 时出错: {e}")
continue
return file_results, wavelength_header, group_count, valid_group_count
def process_spectral_folder(input_folder, output_file):
"""处理整个文件夹内的CSV文件"""
# 查找所有CSV文件
csv_files = glob.glob(os.path.join(input_folder, "*.csv"))
if not csv_files:
print(f"在文件夹 {input_folder} 中未找到CSV文件")
return False
print(f"找到 {len(csv_files)} 个CSV文件待处理")
# 存储所有结果
all_results = []
global_wavelength_header = None
total_groups = 0
total_valid_groups = 0
processed_files = 0
# 处理每个文件
for i, csv_file in enumerate(csv_files):
print(f"处理文件 {i + 1}/{len(csv_files)}: {os.path.basename(csv_file)}")
file_results, wavelength_header, group_count, valid_group_count = process_spectral_data(csv_file)
total_groups += group_count
total_valid_groups += valid_group_count
processed_files += 1
if file_results:
# 如果是第一个有效文件,设置全局波长表头
if global_wavelength_header is None and wavelength_header is not None:
global_wavelength_header = wavelength_header
all_results.append(['Timestamp'] + global_wavelength_header)
# 添加文件结果
all_results.extend(file_results)
else:
print(f"警告: 文件 {os.path.basename(csv_file)} 未提取到有效数据")
# 如果没有有效数据,提前退出
if not all_results:
print("错误: 未提取到任何有效数据,请检查输入文件格式")
return False
# 确保输出目录存在
output_dir = os.path.dirname(output_file)
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir)
# 写入输出文件
try:
with open(output_file, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerows(all_results)
except Exception as e:
print(f"写入输出文件时出错: {e}")
return False
# 打印汇总信息
print(f"\n处理完成! 共处理 {processed_files}/{len(csv_files)} 个文件")
print(f"发现 {total_groups} 个数据组, 成功提取 {total_valid_groups} 个有效数据组")
print(f"反射率数据已保存至: {output_file}")
return True
def main():
# 设置命令行参数解析
parser = argparse.ArgumentParser(
description='高光谱数据处理工具: 从原始传感器数据提取反射率并导出为CSV',
epilog='示例: python hyperspec_export.py -i "C:/input/data" -o "C:/output/reflectance.csv"'
)
parser.add_argument('-i', '--input', required=True,
help='输入文件夹路径包含原始高光谱CSV文件')
parser.add_argument('-o', '--output', required=True,
help='输出文件路径CSV格式')
# 解析参数
args = parser.parse_args()
# 检查输入路径是否存在
if not os.path.exists(args.input):
print(f"错误: 输入路径 '{args.input}' 不存在")
sys.exit(1)
# 检查输入路径是否是目录
if not os.path.isdir(args.input):
print(f"错误: '{args.input}' 不是有效的文件夹路径")
sys.exit(1)
# 处理文件夹
success = process_spectral_folder(args.input, args.output)
if not success:
print("处理过程中出现错误,请检查日志")
sys.exit(1)
if __name__ == "__main__":
main()