上传文件至 /

This commit is contained in:
2025-07-22 10:29:43 +08:00
parent fe1bd426fa
commit 49c170c178
2 changed files with 349 additions and 0 deletions

View File

@ -0,0 +1,198 @@
import csv
import os
import glob
import math
import argparse
import sys
def process_spectral_data(input_file):
# 读取整个CSV文件
with open(input_file, 'r') as f:
reader = csv.reader(f)
all_rows = list(reader)
# 存储最终结果
file_results = []
wavelength_header = None
index = 0
total_rows = len(all_rows)
group_count = 0
valid_group_count = 0
# 遍历所有行
while index < total_rows:
group_count += 1
# 检查是否有足够行构成一个数据组8行
if index + 8 > total_rows:
break
# 提取数据组的8行
group = all_rows[index:index + 8]
# 跳过数据组之间的3行
index += 11
# 验证数据组结构
if len(group) < 6:
continue
try:
# 提取时间戳(第一行第四列)
timestamp = group[0][3] # 索引3 = 第四列
# 提取波长(第四行)
wavelengths = [float(x) for x in group[3][1:-1] if x.strip()]
# 提取上行RAD值第五行
up_rad = [float(x) for x in group[6][1:-1] if x.strip()]
# 提取下行RAD值第六行
down_rad = [float(x) for x in group[7][1:-1] if x.strip()]
# 验证数据长度
if not (len(wavelengths) == len(up_rad) == len(down_rad)):
print(f"警告: 在文件 {os.path.basename(input_file)} 的时间点 {timestamp} 中, "
f"波长({len(wavelengths)})、上行({len(up_rad)})、下行({len(down_rad)})数据长度不一致")
continue
# 处理光谱漂移:
# 1. 下行rad值向前移动一位舍弃第一个值
# 2. 舍弃最后一个波长值
down_rad_shifted = down_rad[1:]
valid_wavelengths = wavelengths[:-1]
valid_up_rad = up_rad[:-1]
# 计算反射率上行DN / 下行DN移动后
# 增加分母为零的保护
reflectance = []
zero_denominator_count = 0
for u, d in zip(valid_up_rad, down_rad_shifted):
# 检查分母是否为零或非常小
if u > 1e-64: # 避免除以零或接近零的值
reflectance.append(d / u)
else:
# 分母无效时使用特殊值NaN或-9999
reflectance.append(float('0'))
zero_denominator_count += 1
# 记录分母为零的警告
if zero_denominator_count > 0:
print(f"警告: 在文件 {os.path.basename(input_file)} 的时间点 {timestamp} 中, "
f"{zero_denominator_count} 个波段的分母为零或接近零")
# 设置波长表头(仅第一次)
if wavelength_header is None:
wavelength_header = valid_wavelengths
# 添加结果行:时间戳 + 反射率数据
file_results.append([timestamp] + reflectance)
valid_group_count += 1
except (IndexError, ValueError) as e:
print(f"处理文件 {os.path.basename(input_file)} 时出错: {e}")
continue
return file_results, wavelength_header, group_count, valid_group_count
def process_spectral_folder(input_folder, output_file):
"""处理整个文件夹内的CSV文件"""
# 查找所有CSV文件
csv_files = glob.glob(os.path.join(input_folder, "*.csv"))
if not csv_files:
print(f"在文件夹 {input_folder} 中未找到CSV文件")
return False
print(f"找到 {len(csv_files)} 个CSV文件待处理")
# 存储所有结果
all_results = []
global_wavelength_header = None
total_groups = 0
total_valid_groups = 0
processed_files = 0
# 处理每个文件
for i, csv_file in enumerate(csv_files):
print(f"处理文件 {i + 1}/{len(csv_files)}: {os.path.basename(csv_file)}")
file_results, wavelength_header, group_count, valid_group_count = process_spectral_data(csv_file)
total_groups += group_count
total_valid_groups += valid_group_count
processed_files += 1
if file_results:
# 如果是第一个有效文件,设置全局波长表头
if global_wavelength_header is None and wavelength_header is not None:
global_wavelength_header = wavelength_header
all_results.append(['Timestamp'] + global_wavelength_header)
# 添加文件结果
all_results.extend(file_results)
else:
print(f"警告: 文件 {os.path.basename(csv_file)} 未提取到有效数据")
# 如果没有有效数据,提前退出
if not all_results:
print("错误: 未提取到任何有效数据,请检查输入文件格式")
return False
# 确保输出目录存在
output_dir = os.path.dirname(output_file)
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir)
# 写入输出文件
try:
with open(output_file, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerows(all_results)
except Exception as e:
print(f"写入输出文件时出错: {e}")
return False
# 打印汇总信息
print(f"\n处理完成! 共处理 {processed_files}/{len(csv_files)} 个文件")
print(f"发现 {total_groups} 个数据组, 成功提取 {total_valid_groups} 个有效数据组")
print(f"反射率数据已保存至: {output_file}")
return True
def main():
# 设置命令行参数解析
parser = argparse.ArgumentParser(
description='高光谱数据处理工具: 从原始传感器数据提取反射率并导出为CSV',
epilog='示例: python hyperspec_export.py -i "C:/input/data" -o "C:/output/reflectance.csv"'
)
parser.add_argument('-i', '--input', required=True,
help='输入文件夹路径包含原始高光谱CSV文件')
parser.add_argument('-o', '--output', required=True,
help='输出文件路径CSV格式')
# 解析参数
args = parser.parse_args()
# 检查输入路径是否存在
if not os.path.exists(args.input):
print(f"错误: 输入路径 '{args.input}' 不存在")
sys.exit(1)
# 检查输入路径是否是目录
if not os.path.isdir(args.input):
print(f"错误: '{args.input}' 不是有效的文件夹路径")
sys.exit(1)
# 处理文件夹
success = process_spectral_folder(args.input, args.output)
if not success:
print("处理过程中出现错误,请检查日志")
sys.exit(1)
if __name__ == "__main__":
main()