521 lines
20 KiB
Python
521 lines
20 KiB
Python
import csv, tempfile, os, re
|
||
import struct
|
||
import time
|
||
import numpy as np
|
||
import argparse
|
||
import paramiko
|
||
import shutil
|
||
import configparser
|
||
from ftplib import FTP
|
||
from pathlib import Path
|
||
from watchdog.observers import Observer
|
||
from watchdog.events import FileSystemEventHandler
|
||
from datetime import datetime
|
||
|
||
import ssl
|
||
from ftplib import FTP_TLS, error_perm
|
||
|
||
|
||
|
||
def parse_sif_csv(_file_path):
    """Parse a SIF spectrometer CSV export.

    Layout (1-based rows):
      row 1: first cell ignored, then alternating metadata name/value cells
      row 2: a single metadata name/value pair
      row 3: first cell ignored, then alternating metadata name/value cells
      row 4: first cell ignored, then wavelengths
      row 5: ignored (column header)
      rows 6+: Location, Valid flag, integration time, DN values

    Returns (metadata dict, wavelength ndarray, list of spectrum dicts with
    keys "Location", "Valid", "Integration", "DN").
    """
    with open(_file_path, encoding='utf-8') as f:
        rows = list(csv.reader(f))

    _metadata = {}

    def _pairs(cells):
        # Yield (name, value) pairs from alternating cells; a trailing
        # unmatched name is dropped, as in the original format.
        return zip(cells[::2], cells[1::2])

    # Row 1: metadata pairs after the first cell.
    for name, value in _pairs(rows[0][1:]):
        _metadata[name] = value

    # Row 2: one name/value pair.
    if len(rows) > 1 and len(rows[1]) >= 2:
        _metadata[rows[1][0]] = rows[1][1]

    # Row 3: more metadata pairs after the first cell.
    for name, value in _pairs(rows[2][1:]):
        _metadata[name] = value

    # Row 4: wavelengths (skip the first cell).
    _wavelengths = np.asarray(rows[3][1:], dtype=float)

    # Row 5 is a column header; spectra start at row 6.
    _spectra_data = []
    for row in rows[5:]:
        if len(row) < 4 or row[1].lower() != "valid":
            continue  # skip header or invalid rows
        try:
            _spectra_data.append({
                "Location": row[0],
                "Valid": row[1],
                "Integration": int(row[2]),
                "DN": np.asarray(row[3:], dtype=float),
            })
        except ValueError:
            continue  # skip rows that cannot be parsed

    return _metadata, _wavelengths, _spectra_data
|
||
|
||
def read_cal(_file_path):
    """Read one binary calibration record.

    Little-endian layout: uint32 exposure time (ms), float32 temperature,
    int32 pixel count, then three fixed 4096-element arrays: float32
    wavelengths, float64 gains, float64 offsets.  Each array is truncated
    to the stored pixel count.

    Returns (exposure_ms, temperature, pixels, wavelengths, gains, offsets).
    """
    # Pre-compiled record layout; size covers the whole fixed structure.
    record = struct.Struct('<I f i 4096f 4096d 4096d')

    with open(_file_path, 'rb') as f:
        fields = record.unpack(f.read(record.size))

    _uiExposureTimeInMS, _fTemperature, _iPixels = fields[:3]
    arrays = fields[3:]
    _fWaveLength = np.array(arrays[:4096])[:_iPixels]
    _dCal_Gain = np.array(arrays[4096:8192])[:_iPixels]
    _dCal_Offset = np.array(arrays[8192:])[:_iPixels]

    return _uiExposureTimeInMS, _fTemperature, _iPixels, _fWaveLength, _dCal_Gain, _dCal_Offset
|
||
|
||
def write_file(_in_path, _out_path, _spectra_data):
    """Write a radiance CSV.

    Copies the first five rows (metadata, wavelengths, column header) verbatim
    from *_in_path*, then appends one row per spectrum entry:
    Location, Valid, Integration, followed by the "RAD" values.
    """
    with open(_in_path, encoding='utf-8') as src:
        header_rows = list(csv.reader(src))[:5]

    with open(_out_path, 'w', newline='', encoding='utf-8') as dst:
        writer = csv.writer(dst)
        # Header block copied unchanged from the source file.
        writer.writerows(header_rows)
        # Processed spectra follow.
        for entry in _spectra_data:
            writer.writerow(
                [entry["Location"], entry["Valid"], entry["Integration"], *entry["RAD"]])
|
||
|
||
def get_sorted_files_by_number(folder_path):
    """Return absolute paths of the files (not directories) directly inside
    *folder_path*, sorted by the sequence of integers embedded in each
    file name (e.g. ``cal_12_3.bin`` sorts by ``[12, 3]``)."""

    def numeric_key(path):
        # Extract every digit run from the file name only (path excluded);
        # names without digits sort first (empty list).
        return [int(tok) for tok in re.findall(r'\d+', os.path.basename(path))]

    candidates = (os.path.join(folder_path, name) for name in os.listdir(folder_path))
    files = [os.path.abspath(p) for p in candidates if os.path.isfile(p)]
    return sorted(files, key=numeric_key)
|
||
|
||
# 配置读取函数
|
||
def load_config(config_path='config.ini'):
    """Read an INI configuration file and return the ConfigParser object.

    A missing file is not an error: ConfigParser.read silently skips
    unreadable paths and an empty parser is returned.
    """
    parser = configparser.ConfigParser()
    parser.read(config_path)
    return parser
|
||
|
||
class CSVFileHandler(FileSystemEventHandler):
    """Watchdog handler for a SIF processing pipeline.

    On each newly created ``.csv`` file it: converts DN spectra to radiance
    using per-instrument calibration files, runs the external SIF retrieval
    script (3FLD / sFLD / SFM), appends a quality ("validity") column to the
    results, and uploads them over FTPS.
    """

    def __init__(self, ftp_config):
        super().__init__()
        # Parsed config.ini with [monitor] and [FTP] sections.
        self.ftp_config = ftp_config

    def on_created(self, event):
        """Process a newly created .csv file: DN -> radiance -> SIF -> upload."""
        if event.is_directory:
            return
        if not event.src_path.lower().endswith('.csv'):
            return

        file_path = os.path.abspath(event.src_path)
        print(f"发现CSV文件: {file_path}----------------------------------------------------------------------------")

        # Select the calibration folder matching the instrument in the path.
        cal_dir = self.ftp_config['monitor']['cal_dir']
        if "towersif20" in file_path:
            cal_dir = os.path.join(cal_dir, "20")
        elif "towersif21" in file_path:
            cal_dir = os.path.join(cal_dir, "21")

        # The writer may still hold the file right after creation; a short
        # wait avoids sporadic permission-denied errors.
        time.sleep(0.1)

        rad_folder_tmp, sif_folder, quality_level = self.rad_conversion(file_path, cal_dir)
        sif_files = self.compute_sif(file_path, rad_folder_tmp, sif_folder)
        for sif_file in sif_files:
            self.add_validity_column_to_file(sif_file, quality_level)

        self.send_via_ftps(sif_files)

    def send_via_ftps(self, file_paths, max_retries=3, retry_delay=5):
        """Upload *file_paths* over FTPS.

        Retries the connection up to *max_retries* times with *retry_delay*
        seconds between attempts.  Returns True only if every file uploaded
        successfully, False otherwise.
        """
        retries = 0
        ftps = None

        while retries < max_retries:
            try:
                print("正在尝试连接 FTPS 服务器...")

                ftps = FTP_TLS()
                # The server uses a self-signed certificate, so skip verification.
                ftps.context = ssl._create_unverified_context()
                ftps.connect(
                    host=self.ftp_config['FTP']['host'],
                    port=int(self.ftp_config['FTP'].get('port', 21)),
                    timeout=30
                )

                ftps.login(
                    user=self.ftp_config['FTP']['user'],
                    passwd=self.ftp_config['FTP']['password']
                )
                print("FTPS 连接成功,准备上传文件...")

                # Protect the data channel as well.
                ftps.prot_p()

                # Enter (creating if necessary) the target directory.
                remote_dir = self.ftp_config['FTP'].get('target_dir', '.')
                try:
                    ftps.cwd(remote_dir)
                except error_perm:
                    print(f"远程目录不存在,尝试创建: {remote_dir}")
                    ftps.mkd(remote_dir)
                    ftps.cwd(remote_dir)

                success_count = 0
                for file_path in file_paths:
                    if not os.path.exists(file_path):
                        continue
                    try:
                        filename = os.path.basename(file_path)
                        with open(file_path, "rb") as f:
                            # BUG FIX: the STOR command previously contained a
                            # literal placeholder instead of the file name, so
                            # every upload overwrote the same bogus remote file.
                            ftps.storbinary(f"STOR {filename}", f)
                        print(f"✅ 文件上传成功: {filename}")
                        success_count += 1
                    except Exception as e:
                        print(f"❌ 文件上传失败 {file_path}: {e}")

                return success_count == len(file_paths)  # True only if all succeeded

            except Exception as e:
                retries += 1
                print(f"❌ FTPS 连接失败(尝试 {retries}/{max_retries}): {e}")
                if retries < max_retries:
                    time.sleep(retry_delay)
            finally:
                if ftps:
                    try:
                        ftps.quit()
                    except Exception:
                        ftps.close()

        print(f"❌ 上传失败(已达最大重试次数 {max_retries})")
        return False

    def send_via_sftp(self, file_paths, max_retries=3, retry_delay=5):
        """Upload *file_paths* over SFTP (password authentication).

        Retries transient SSH failures up to *max_retries* times; returns
        True only if every file uploaded successfully, False otherwise.
        """
        retries = 0
        ssh = None
        sftp = None

        while retries < max_retries:
            try:
                print("正在尝试连接 SFTP 服务器...")

                ssh = paramiko.SSHClient()
                # NOTE(review): AutoAddPolicy trusts any host key — use a
                # stricter policy in production.
                ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

                ssh.connect(
                    hostname=self.ftp_config['FTP']['host'],
                    port=int(self.ftp_config['FTP'].get('port', 22)),
                    username=self.ftp_config['FTP']['user'],
                    password=self.ftp_config['FTP']['password'],
                    timeout=30,
                    allow_agent=False,    # avoid 'lost ssh-agent' errors
                    look_for_keys=False,  # force password authentication
                )

                print("SFTP 连接成功,准备上传文件...")
                sftp = ssh.open_sftp()

                # Enter (creating if necessary) the target directory.
                remote_dir = self.ftp_config['FTP'].get('target_dir', '.')
                try:
                    sftp.chdir(remote_dir)
                except IOError:
                    print(f"远程目录不存在,尝试创建: {remote_dir}")
                    sftp.mkdir(remote_dir)
                    sftp.chdir(remote_dir)

                success_count = 0
                for file_path in file_paths:
                    try:
                        filename = os.path.basename(file_path)
                        sftp.put(file_path, filename)
                        # BUG FIX: log the actual file name instead of a placeholder.
                        print(f"✅ 文件上传成功: {filename}")
                        success_count += 1
                    except Exception as e:
                        print(f"❌ 文件上传失败 {file_path}: {e}")

                return success_count == len(file_paths)  # True only if all succeeded

            except paramiko.AuthenticationException as e:
                print(f"❌ 认证失败: {e}")
                return False
            except paramiko.SSHException as e:
                retries += 1
                print(f"❌ SFTP 连接失败(尝试 {retries}/{max_retries}): {e}")
                if retries < max_retries:
                    time.sleep(retry_delay)
            except Exception as e:
                print(f"❌ 未知错误: {e}")
                return False
            finally:
                if sftp:
                    sftp.close()
                if ssh:
                    ssh.close()

        print(f"❌ 上传失败(已达最大重试次数 {max_retries})")
        return False

    def rad_conversion(self, input_csv, input_cal):
        """Convert a DN spectrum CSV into a radiance CSV.

        Builds a ``result/<last two path levels>/{rad,sif,rad_tmp}`` tree next
        to the watch directory, applies per-spectrum calibration gains scaled
        by integration time, and writes the radiance file (date-prefixed name)
        into both ``rad_tmp`` (fresh each call) and ``rad``.

        Returns (rad_folder_tmp, sif_folder, quality_level).
        """
        folder_path = os.path.dirname(input_csv)
        base_name = os.path.basename(input_csv)
        name_part, ext = os.path.splitext(base_name)
        parts = name_part.split('_', 1)  # split at the first underscore
        today = datetime.now()
        formatted_date = today.strftime("%Y_%m_%d")
        new_name = f"{formatted_date}_{parts[1]}{ext}"  # date-prefixed output name

        # BUG FIX: use the handler's config instead of the module-level global
        # `ftp_config`, which only exists when the file runs as a script.
        result_folder = os.path.join(Path(self.ftp_config['monitor']['WATCH_DIR']).parent, "result")
        if not os.path.exists(result_folder):
            os.makedirs(result_folder)
            print(f"文件夹已创建: {result_folder}")
        else:
            print(f"文件夹已存在: {result_folder}")

        def get_last_two_levels(path):
            # Keep only the last two path components (e.g. site/instrument);
            # shorter paths are returned unchanged.
            path_obj = Path(path)
            levels = path_obj.parts
            if len(levels) >= 2:
                return Path(levels[-2]) / levels[-1]
            else:
                return path_obj

        folder_name = get_last_two_levels(folder_path)

        rad_folder = os.path.join(result_folder, folder_name, "rad")
        if not os.path.exists(rad_folder):
            os.makedirs(rad_folder)
            print(f"文件夹已创建: {rad_folder}")
        else:
            print(f"文件夹已存在: {rad_folder}")

        sif_folder = os.path.join(result_folder, folder_name, "sif")
        if not os.path.exists(sif_folder):
            os.makedirs(sif_folder)
            print(f"文件夹已创建: {sif_folder}")
        else:
            print(f"文件夹已存在: {sif_folder}")

        # rad_tmp is recreated empty for every input file.
        rad_folder_tmp = os.path.join(result_folder, folder_name, "rad_tmp")
        if os.path.exists(rad_folder_tmp):
            shutil.rmtree(rad_folder_tmp)
        os.makedirs(rad_folder_tmp)
        rad_path = os.path.join(rad_folder_tmp, new_name)

        metadata, wavelengths, spectra_data = parse_sif_csv(input_csv)

        # One calibration file per spectrum, matched by numeric file order.
        # NOTE(review): assumes at least len(spectra_data) calibration files
        # exist in input_cal — confirm against the instrument setup.
        sorted_cal_files_path = get_sorted_files_by_number(input_cal)
        for i in range(len(spectra_data)):
            uiExposureTimeInMS, fTemperature, iPixels, fWaveLength, dCal_Gain, dCal_Offset = read_cal(
                sorted_cal_files_path[i])

            # Scale the calibration gain to the actual integration time.
            gain_scale = uiExposureTimeInMS / spectra_data[i]['Integration']
            data_gain_adjust = dCal_Gain * gain_scale
            spectra_data[i]['RAD'] = spectra_data[i]['DN'] * data_gain_adjust

        write_file(input_csv, rad_path, spectra_data)
        shutil.copy(rad_path, rad_folder)

        # Quality level is derived from the previous acquisition in rad_folder.
        rad_file_name = os.path.basename(rad_path)
        quality_level = self.compute_quality_level(rad_file_name, rad_folder)

        return rad_folder_tmp, sif_folder, quality_level

    def compute_quality_level(self, file_name, folder_path, center=753, window=3):
        """Grade a radiance file against the chronologically previous one.

        Compares the mean DN of spectrum position 1 inside the band
        [center - window, center + window] (nm) between this file and its
        predecessor.  The relative difference in percent maps to:
        0 (<=2%), 1 (<=5%), 2 (<=10%, or no predecessor), 3 (otherwise).
        """
        all_files = [f for f in os.listdir(folder_path) if f.endswith('.csv')]

        # No history, or the file itself is missing: medium confidence.
        if not all_files or file_name not in all_files:
            return 2

        def parse_time_from_filename(filename):
            # File names encode the acquisition time as Y_m_d_H_M_S.
            time_str = filename.rsplit('.', 1)[0]
            try:
                return datetime.strptime(time_str, '%Y_%m_%d_%H_%M_%S')
            except ValueError:
                # Non-conforming names sort last.
                return datetime.max

        sorted_files = sorted(all_files, key=parse_time_from_filename)

        index_of_given = sorted_files.index(file_name)
        if index_of_given == 0:
            return 2  # earliest file: nothing to compare against

        current_file = os.path.join(folder_path, file_name)
        metadata1, wavelengths1, spectra_data1 = parse_sif_csv(current_file)

        pre_file = os.path.join(folder_path, sorted_files[index_of_given - 1])
        metadata2, wavelengths2, spectra_data2 = parse_sif_csv(pre_file)

        lower_bound = center - window
        upper_bound = center + window
        indices = np.where((wavelengths2 >= lower_bound) & (wavelengths2 <= upper_bound))[0]

        pos = 1  # spectrum position (CSV row) used for the confidence estimate
        a1 = spectra_data1[pos]['DN'][indices].mean()
        a2 = spectra_data2[pos]['DN'][indices].mean()

        quality_number = abs(a2 - a1) / a1 * 100  # relative change in percent

        if quality_number <= 2:
            return 0
        elif quality_number <= 5:
            return 1
        elif quality_number <= 10:
            return 2
        else:
            return 3

    def compute_sif(self, input_csv_dn, rad_folder, sif_folder):
        """Run the external SIF retrieval script on *rad_folder*.

        Invokes the script three times (3FLD, sFLD and SFM methods) via
        ``os.system`` and returns the three output CSV paths
        (output_path_3fld, output_path_sfld, output_path_sfm).
        """
        base_name = os.path.basename(input_csv_dn)
        name_part, ext = os.path.splitext(base_name)
        parts = name_part.split('_', 1)  # split at the first underscore
        formatted_date = datetime.now().strftime("%Y_%m_%d")
        new_name = f"{formatted_date}_{parts[1]}{ext}"  # date-prefixed name

        # Hard-coded locations of the external retrieval script and the
        # reference (standard) SIF spectrum, per platform.
        if os.name == "nt":  # Windows
            program_path = r"python D:\PycharmProjects\sif\sif_retrieval.py"
            standard_sif_path = r"C:\EasySIF\standard_sif.csv"
        elif os.name == "posix":  # Linux/macOS/Unix-like
            program_path = r"python3 /root/sif/feng/sif_retrieval.py"
            standard_sif_path = r"/root/sif/feng/standard_sif.csv"
        else:
            # BUG FIX: previously fell through with undefined variables and
            # crashed with a confusing NameError on exotic platforms.
            raise OSError(f"unsupported platform: {os.name}")

        input_path = rad_folder

        file_name_tmp = parts[0] + "_" + new_name.split('.')[0]

        output_path_3fld = os.path.join(sif_folder, file_name_tmp + "_3fld.csv")
        param_3fld = r"[740,780],[756,759],[761,762] P1 3fld"

        output_path_sfld = os.path.join(sif_folder, file_name_tmp + "_sfld.csv")
        param_sfld = r"[740,780],[756,759] P1 sfld"

        output_path_sfm = os.path.join(sif_folder, file_name_tmp + "_sfm.csv")
        param_sfm = r" [759,770],760 P1 sfm"

        command_str_3fld = program_path + " " + standard_sif_path + " " + input_path + " " + output_path_3fld + " " + param_3fld
        command_str_sfld = program_path + " " + standard_sif_path + " " + input_path + " " + output_path_sfld + " " + param_sfld
        command_str_sfm = program_path + " " + standard_sif_path + " " + input_path + " " + output_path_sfm + " " + param_sfm

        return_code = os.system(command_str_3fld)
        print(command_str_3fld)
        return_code = os.system(command_str_sfld)
        return_code = os.system(command_str_sfm)
        print(f"命令返回状态码: {return_code}")

        return output_path_3fld, output_path_sfld, output_path_sfm

    def add_validity_column_to_file(self, file_path, quality_level):
        """Append a 'validity' column (header + value on the first two rows)
        to the CSV at *file_path*, rewriting it via a temporary file.

        A missing file or one with fewer than two rows is left untouched.
        """
        if not os.path.exists(file_path):
            return

        # delete=False so the file survives close() until we move/unlink it.
        temp_file = tempfile.NamedTemporaryFile(mode='w', delete=False, newline='')

        try:
            with open(file_path, 'r', newline='') as csvfile:
                rows = list(csv.reader(csvfile))

            if len(rows) < 2:
                # BUG FIX: previously this early return leaked the
                # delete=False temporary file on disk.
                temp_file.close()
                os.unlink(temp_file.name)
                return

            rows[0].append('validity')
            rows[1].append(quality_level)

            # Write the augmented rows; the with-block closes the temp file
            # before the move (required on Windows).
            with temp_file:
                csv.writer(temp_file).writerows(rows)

            # Atomically-ish replace the original file.
            shutil.move(temp_file.name, file_path)

        except Exception:
            # Clean up the temp file on failure, then propagate.
            os.unlink(temp_file.name)
            raise
|
||
|
||
|
||
# Press the green button in the gutter to run the script.
|
||
if __name__ == '__main__':
    # Command-line interface: the only required argument is the ini config.
    cli = argparse.ArgumentParser(description="监控文件夹的状态,当出现新的csv时,提取sif,并通过ftp发送。", prog='sif.')
    cli.add_argument('-i', '--input_ini', required=True, type=str, help='输入ini配置文件路径。')
    cli.add_argument("-v", "--version", action='version', version='%(prog)s 1.0')
    args = cli.parse_args()

    # Kept at module level: other parts of the file read this name globally.
    ftp_config = load_config(args.input_ini)

    # Watch the configured directory recursively for newly created CSVs.
    observer = Observer()
    observer.schedule(CSVFileHandler(ftp_config), ftp_config['monitor']['WATCH_DIR'], recursive=True)
    observer.start()
    print(f"正在监控目录:{ftp_config['monitor']['WATCH_DIR']}")

    # Idle until interrupted, then shut the observer down cleanly.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
|