Files
2025-07-22 10:29:43 +08:00

151 lines
5.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import numpy as np
from datetime import datetime, timedelta, timezone
from pysolar.solar import get_altitude
import argparse
import sys
def calculate_solar_elevation(lat, lon, dt):
"""计算给定时间和地点的太阳高度角(度)"""
# 将本地时间如北京时间转换为UTC时间东八区
utc_dt = dt - timedelta(hours=8)
utc_dt = utc_dt.replace(tzinfo=timezone.utc)
# 计算太阳高度角
return get_altitude(lat, lon, utc_dt)
def filter_data_by_solar_elevation(input_file, output_file, lat, lon, threshold1, threshold2):
"""
处理高光谱数据文件,根据太阳高度角筛选数据并过滤异常光谱
参数:
input_file: 输入CSV文件路径
output_file: 输出CSV文件路径
lat: 纬度(度)
lon: 经度(度)
threshold1: 主要太阳高度角阈值(度)
threshold2: 次要太阳高度角阈值(度)
"""
# 读取数据文件
df = pd.read_csv(input_file)
# 解析时间列(假设第一列名为'timestamp'
time_column = df.columns[0]
df[time_column] = pd.to_datetime(df[time_column], format='%Y_%m_%d %H:%M:%S')
# 计算每个时间点的太阳高度角
df['solar_elevation'] = df[time_column].apply(
lambda dt: calculate_solar_elevation(lat, lon, dt)
)
# 添加日期列用于分组
df['date'] = df[time_column].dt.date
# 按日期分组处理
filtered_dfs = []
for date, group in df.groupby('date'):
# 筛选当前日期太阳高度角 > 阈值1 的数据
high_elevation = group[group['solar_elevation'] > threshold1]
if not high_elevation.empty:
filtered_dfs.append(high_elevation)
else:
# 如果没有 > 阈值1尝试 > 阈值2
medium_elevation = group[group['solar_elevation'] > threshold2]
if not medium_elevation.empty:
filtered_dfs.append(medium_elevation)
else:
print(f"警告: {date} 没有符合条件的太阳高度角数据 (>{threshold2}°)")
# 合并筛选后的数据
if filtered_dfs:
result_df = pd.concat(filtered_dfs)
else:
print("警告: 没有找到任何符合条件的数据")
result_df = pd.DataFrame(columns=df.columns)
# 删除辅助列
result_df = result_df.drop(columns=['solar_elevation', 'date'])
# 新增:筛选异常光谱(任何波段反射率 > 2
if not result_df.empty:
# 获取反射率列(排除时间列)
reflectance_cols = result_df.columns[1:]
# 创建异常值掩码
abnormal_mask = (result_df[reflectance_cols] > 2).any(axis=1)
# 过滤异常值
clean_df = result_df[~abnormal_mask]
# 统计信息
total_count = len(result_df)
abnormal_count = abnormal_mask.sum()
print(f"过滤异常光谱: 共处理 {total_count} 条数据,"
f"发现 {abnormal_count} 条异常光谱 ({abnormal_count / total_count:.1%})")
else:
clean_df = result_df
print("无有效数据进行异常光谱过滤")
# 保存结果
clean_df.to_csv(output_file, index=False)
print(f"处理完成! 有效数据 {len(clean_df)} 条,已保存到: {output_file}")
return clean_df
def main():
# 创建命令行参数解析器
parser = argparse.ArgumentParser(
description='高光谱数据筛选工具 - 根据太阳高度角和反射率异常值筛选数据',
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
# 添加命令行参数
parser.add_argument('-i', '--input', required=True,
help='输入CSV文件路径')
parser.add_argument('-o', '--output', required=True,
help='输出CSV文件路径')
parser.add_argument('--lat', type=float, required=True,
help='纬度(度)')
parser.add_argument('--lon', type=float, required=True,
help='经度(度)')
parser.add_argument('--threshold1', type=float, default=45.0,
help='主要太阳高度角阈值(度)')
parser.add_argument('--threshold2', type=float, default=30.0,
help='次要太阳高度角阈值(度)')
# 解析命令行参数
args = parser.parse_args()
# 打印参数摘要
print("\n" + "=" * 50)
print("光谱数据筛选参数:")
print(f"输入文件: {args.input}")
print(f"输出文件: {args.output}")
print(f"纬度: {args.lat}°")
print(f"经度: {args.lon}°")
print(f"主高度角阈值: {args.threshold1}°")
print(f"次高度角阈值: {args.threshold2}°")
print("=" * 50 + "\n")
# 执行数据处理
try:
filter_data_by_solar_elevation(
args.input,
args.output,
args.lat,
args.lon,
args.threshold1,
args.threshold2
)
print("数据处理成功完成!")
except Exception as e:
print(f"\n错误: 数据处理失败 - {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()