import pandas as pd import numpy as np from datetime import datetime, timedelta, timezone from pysolar.solar import get_altitude import argparse import sys def calculate_solar_elevation(lat, lon, dt): """计算给定时间和地点的太阳高度角(度)""" # 将本地时间(如北京时间)转换为UTC时间(东八区) utc_dt = dt - timedelta(hours=8) utc_dt = utc_dt.replace(tzinfo=timezone.utc) # 计算太阳高度角 return get_altitude(lat, lon, utc_dt) def filter_data_by_solar_elevation(input_file, output_file, lat, lon, threshold1, threshold2): """ 处理高光谱数据文件,根据太阳高度角筛选数据并过滤异常光谱 参数: input_file: 输入CSV文件路径 output_file: 输出CSV文件路径 lat: 纬度(度) lon: 经度(度) threshold1: 主要太阳高度角阈值(度) threshold2: 次要太阳高度角阈值(度) """ # 读取数据文件 df = pd.read_csv(input_file) # 解析时间列(假设第一列名为'timestamp') time_column = df.columns[0] df[time_column] = pd.to_datetime(df[time_column], format='%Y_%m_%d %H:%M:%S') # 计算每个时间点的太阳高度角 df['solar_elevation'] = df[time_column].apply( lambda dt: calculate_solar_elevation(lat, lon, dt) ) # 添加日期列用于分组 df['date'] = df[time_column].dt.date # 按日期分组处理 filtered_dfs = [] for date, group in df.groupby('date'): # 筛选当前日期太阳高度角 > 阈值1 的数据 high_elevation = group[group['solar_elevation'] > threshold1] if not high_elevation.empty: filtered_dfs.append(high_elevation) else: # 如果没有 > 阈值1,尝试 > 阈值2 medium_elevation = group[group['solar_elevation'] > threshold2] if not medium_elevation.empty: filtered_dfs.append(medium_elevation) else: print(f"警告: {date} 没有符合条件的太阳高度角数据 (>{threshold2}°)") # 合并筛选后的数据 if filtered_dfs: result_df = pd.concat(filtered_dfs) else: print("警告: 没有找到任何符合条件的数据") result_df = pd.DataFrame(columns=df.columns) # 删除辅助列 result_df = result_df.drop(columns=['solar_elevation', 'date']) # 新增:筛选异常光谱(任何波段反射率 > 2) if not result_df.empty: # 获取反射率列(排除时间列) reflectance_cols = result_df.columns[1:] # 创建异常值掩码 abnormal_mask = (result_df[reflectance_cols] > 2).any(axis=1) # 过滤异常值 clean_df = result_df[~abnormal_mask] # 统计信息 total_count = len(result_df) abnormal_count = abnormal_mask.sum() print(f"过滤异常光谱: 共处理 {total_count} 条数据," f"发现 {abnormal_count} 条异常光谱 ({abnormal_count / total_count:.1%})") else: clean_df = result_df print("无有效数据进行异常光谱过滤") # 保存结果 clean_df.to_csv(output_file, index=False) print(f"处理完成! 有效数据 {len(clean_df)} 条,已保存到: {output_file}") return clean_df def main(): # 创建命令行参数解析器 parser = argparse.ArgumentParser( description='高光谱数据筛选工具 - 根据太阳高度角和反射率异常值筛选数据', formatter_class=argparse.ArgumentDefaultsHelpFormatter ) # 添加命令行参数 parser.add_argument('-i', '--input', required=True, help='输入CSV文件路径') parser.add_argument('-o', '--output', required=True, help='输出CSV文件路径') parser.add_argument('--lat', type=float, required=True, help='纬度(度)') parser.add_argument('--lon', type=float, required=True, help='经度(度)') parser.add_argument('--threshold1', type=float, default=45.0, help='主要太阳高度角阈值(度)') parser.add_argument('--threshold2', type=float, default=30.0, help='次要太阳高度角阈值(度)') # 解析命令行参数 args = parser.parse_args() # 打印参数摘要 print("\n" + "=" * 50) print("光谱数据筛选参数:") print(f"输入文件: {args.input}") print(f"输出文件: {args.output}") print(f"纬度: {args.lat}°") print(f"经度: {args.lon}°") print(f"主高度角阈值: {args.threshold1}°") print(f"次高度角阈值: {args.threshold2}°") print("=" * 50 + "\n") # 执行数据处理 try: filter_data_by_solar_elevation( args.input, args.output, args.lat, args.lon, args.threshold1, args.threshold2 ) print("数据处理成功完成!") except Exception as e: print(f"\n错误: 数据处理失败 - {str(e)}") sys.exit(1) if __name__ == "__main__": main()