151 lines
5.2 KiB
Python
151 lines
5.2 KiB
Python
import pandas as pd
|
||
import numpy as np
|
||
from datetime import datetime, timedelta, timezone
|
||
from pysolar.solar import get_altitude
|
||
import argparse
|
||
import sys
|
||
|
||
|
||
def calculate_solar_elevation(lat, lon, dt):
|
||
"""计算给定时间和地点的太阳高度角(度)"""
|
||
# 将本地时间(如北京时间)转换为UTC时间(东八区)
|
||
utc_dt = dt - timedelta(hours=8)
|
||
utc_dt = utc_dt.replace(tzinfo=timezone.utc)
|
||
|
||
# 计算太阳高度角
|
||
return get_altitude(lat, lon, utc_dt)
|
||
|
||
|
||
def filter_data_by_solar_elevation(input_file, output_file, lat, lon, threshold1, threshold2):
|
||
"""
|
||
处理高光谱数据文件,根据太阳高度角筛选数据并过滤异常光谱
|
||
|
||
参数:
|
||
input_file: 输入CSV文件路径
|
||
output_file: 输出CSV文件路径
|
||
lat: 纬度(度)
|
||
lon: 经度(度)
|
||
threshold1: 主要太阳高度角阈值(度)
|
||
threshold2: 次要太阳高度角阈值(度)
|
||
"""
|
||
# 读取数据文件
|
||
df = pd.read_csv(input_file)
|
||
|
||
# 解析时间列(假设第一列名为'timestamp')
|
||
time_column = df.columns[0]
|
||
df[time_column] = pd.to_datetime(df[time_column], format='%Y_%m_%d %H:%M:%S')
|
||
|
||
# 计算每个时间点的太阳高度角
|
||
df['solar_elevation'] = df[time_column].apply(
|
||
lambda dt: calculate_solar_elevation(lat, lon, dt)
|
||
)
|
||
|
||
# 添加日期列用于分组
|
||
df['date'] = df[time_column].dt.date
|
||
|
||
# 按日期分组处理
|
||
filtered_dfs = []
|
||
for date, group in df.groupby('date'):
|
||
# 筛选当前日期太阳高度角 > 阈值1 的数据
|
||
high_elevation = group[group['solar_elevation'] > threshold1]
|
||
|
||
if not high_elevation.empty:
|
||
filtered_dfs.append(high_elevation)
|
||
else:
|
||
# 如果没有 > 阈值1,尝试 > 阈值2
|
||
medium_elevation = group[group['solar_elevation'] > threshold2]
|
||
if not medium_elevation.empty:
|
||
filtered_dfs.append(medium_elevation)
|
||
else:
|
||
print(f"警告: {date} 没有符合条件的太阳高度角数据 (>{threshold2}°)")
|
||
|
||
# 合并筛选后的数据
|
||
if filtered_dfs:
|
||
result_df = pd.concat(filtered_dfs)
|
||
else:
|
||
print("警告: 没有找到任何符合条件的数据")
|
||
result_df = pd.DataFrame(columns=df.columns)
|
||
|
||
# 删除辅助列
|
||
result_df = result_df.drop(columns=['solar_elevation', 'date'])
|
||
|
||
# 新增:筛选异常光谱(任何波段反射率 > 2)
|
||
if not result_df.empty:
|
||
# 获取反射率列(排除时间列)
|
||
reflectance_cols = result_df.columns[1:]
|
||
|
||
# 创建异常值掩码
|
||
abnormal_mask = (result_df[reflectance_cols] > 2).any(axis=1)
|
||
|
||
# 过滤异常值
|
||
clean_df = result_df[~abnormal_mask]
|
||
|
||
# 统计信息
|
||
total_count = len(result_df)
|
||
abnormal_count = abnormal_mask.sum()
|
||
print(f"过滤异常光谱: 共处理 {total_count} 条数据,"
|
||
f"发现 {abnormal_count} 条异常光谱 ({abnormal_count / total_count:.1%})")
|
||
else:
|
||
clean_df = result_df
|
||
print("无有效数据进行异常光谱过滤")
|
||
|
||
# 保存结果
|
||
clean_df.to_csv(output_file, index=False)
|
||
print(f"处理完成! 有效数据 {len(clean_df)} 条,已保存到: {output_file}")
|
||
|
||
return clean_df
|
||
|
||
|
||
def main():
|
||
# 创建命令行参数解析器
|
||
parser = argparse.ArgumentParser(
|
||
description='高光谱数据筛选工具 - 根据太阳高度角和反射率异常值筛选数据',
|
||
formatter_class=argparse.ArgumentDefaultsHelpFormatter
|
||
)
|
||
|
||
# 添加命令行参数
|
||
parser.add_argument('-i', '--input', required=True,
|
||
help='输入CSV文件路径')
|
||
parser.add_argument('-o', '--output', required=True,
|
||
help='输出CSV文件路径')
|
||
parser.add_argument('--lat', type=float, required=True,
|
||
help='纬度(度)')
|
||
parser.add_argument('--lon', type=float, required=True,
|
||
help='经度(度)')
|
||
parser.add_argument('--threshold1', type=float, default=45.0,
|
||
help='主要太阳高度角阈值(度)')
|
||
parser.add_argument('--threshold2', type=float, default=30.0,
|
||
help='次要太阳高度角阈值(度)')
|
||
|
||
# 解析命令行参数
|
||
args = parser.parse_args()
|
||
|
||
# 打印参数摘要
|
||
print("\n" + "=" * 50)
|
||
print("光谱数据筛选参数:")
|
||
print(f"输入文件: {args.input}")
|
||
print(f"输出文件: {args.output}")
|
||
print(f"纬度: {args.lat}°")
|
||
print(f"经度: {args.lon}°")
|
||
print(f"主高度角阈值: {args.threshold1}°")
|
||
print(f"次高度角阈值: {args.threshold2}°")
|
||
print("=" * 50 + "\n")
|
||
|
||
# 执行数据处理
|
||
try:
|
||
filter_data_by_solar_elevation(
|
||
args.input,
|
||
args.output,
|
||
args.lat,
|
||
args.lon,
|
||
args.threshold1,
|
||
args.threshold2
|
||
)
|
||
print("数据处理成功完成!")
|
||
except Exception as e:
|
||
print(f"\n错误: 数据处理失败 - {str(e)}")
|
||
sys.exit(1)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main() |