Files
sif_didiaoyuan/main.py
tangchao0503 dcd9c76ddd 1、错误控制:文件不存在、sif计算失败;
2、监控某一目录,并将新生成的csv文件上传到ftp服务器;
2025-10-11 15:11:48 +08:00

521 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv, tempfile, os, re
import struct
import time
import numpy as np
import argparse
import paramiko
import shutil
import configparser
from ftplib import FTP
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from datetime import datetime
import ssl
from ftplib import FTP_TLS, error_perm
def parse_sif_csv(_file_path):
    """Parse an instrument CSV export into metadata, wavelengths and spectra.

    Layout: row 1 and row 3 hold alternating name/value pairs after the
    first column; row 2 is a single name/value pair; row 4 holds the
    wavelengths; row 5 is ignored; spectra start at row 6.

    Returns:
        (metadata: dict[str, str],
         wavelengths: np.ndarray of float,
         spectra_data: list of dicts with keys Location/Valid/Integration/DN)
    """
    meta = {}
    with open(_file_path, encoding='utf-8') as fh:
        rows = list(csv.reader(fh))
    # Row 1: skip the first column, then pair up name/value columns.
    header1 = rows[0][1:]
    meta.update(zip(header1[0::2], header1[1::2]))
    # Row 2: one name/value pair, if present.
    if len(rows) > 1 and len(rows[1]) >= 2:
        meta[rows[1][0]] = rows[1][1]
    # Row 3: alternating name/value pairs again.
    header3 = rows[2][1:]
    meta.update(zip(header3[0::2], header3[1::2]))
    # Row 4: wavelengths (first column is a label).
    wavelengths = np.asarray(rows[3][1:], dtype=float)
    # Row 5 is ignored; spectra start at row 6.
    spectra = []
    for row in rows[5:]:
        # Skip header rows / rows not flagged "Valid".
        if len(row) < 4 or row[1].lower() != "valid":
            continue
        try:
            spectra.append({
                "Location": row[0],
                "Valid": row[1],
                "Integration": int(row[2]),
                "DN": np.asarray(row[3:], dtype=float),
            })
        except ValueError:
            continue  # unparseable numeric fields: drop the row
    return meta, wavelengths, spectra
def read_cal(_file_path):
    """Read a binary calibration file.

    Little-endian layout: uint32 exposure time (ms), float temperature,
    int32 pixel count, then 4096 floats (wavelengths), 4096 doubles
    (gain) and 4096 doubles (offset).  The three arrays are truncated
    to the stored pixel count.

    Returns:
        (exposure_ms, temperature, n_pixels, wavelengths, gain, offset)
    """
    layout = '<I f i 4096f 4096d 4096d'  # little-endian
    nbytes = struct.calcsize(layout)
    with open(_file_path, 'rb') as fh:
        fields = struct.unpack(layout, fh.read(nbytes))
    exposure_ms, temperature, n_pixels = fields[0], fields[1], fields[2]
    wl_start = 3
    gain_start = wl_start + 4096
    offset_start = gain_start + 4096
    wavelengths = np.array(fields[wl_start:gain_start])[:n_pixels]
    gain = np.array(fields[gain_start:offset_start])[:n_pixels]
    offset = np.array(fields[offset_start:])[:n_pixels]
    return exposure_ms, temperature, n_pixels, wavelengths, gain, offset
def write_file(_in_path, _out_path, _spectra_data):
    """Write a radiance CSV to _out_path.

    The first five rows (metadata, wavelengths, and the fifth row) are
    copied verbatim from _in_path; after that, one row per spectra entry:
    Location, Valid, Integration, followed by the RAD values.
    """
    with open(_in_path, encoding='utf-8') as src:
        original_rows = list(csv.reader(src))
    with open(_out_path, 'w', newline='', encoding='utf-8') as dst:
        writer = csv.writer(dst)
        # Copy the five header rows unchanged.
        for idx in range(5):
            writer.writerow(original_rows[idx])
        # Then the processed spectra.
        for entry in _spectra_data:
            writer.writerow(
                [entry["Location"], entry["Valid"], entry["Integration"]]
                + list(entry["RAD"])
            )
def get_sorted_files_by_number(folder_path):
    """Return absolute paths of the files directly inside folder_path,
    ordered by the sequence of integers embedded in each file name."""
    candidates = (os.path.join(folder_path, name) for name in os.listdir(folder_path))
    # Files only (sub-folders excluded), as absolute paths.
    files = [os.path.abspath(p) for p in candidates if os.path.isfile(p)]

    def numeric_key(path):
        # Every run of digits in the bare file name, compared as integers.
        return [int(tok) for tok in re.findall(r'\d+', os.path.basename(path))]

    files.sort(key=numeric_key)
    return files
# Configuration loader.
def load_config(config_path='config.ini'):
    """Load and return the ini configuration.

    Args:
        config_path: path to the ini file (default 'config.ini').

    Raises:
        FileNotFoundError: if the file cannot be read.  Previously a
        missing file was silently ignored, producing an empty config and
        confusing KeyErrors much later at the first config access.
    """
    config = configparser.ConfigParser()
    # ConfigParser.read returns the list of files it actually read.
    if not config.read(config_path):
        raise FileNotFoundError(f"配置文件不存在或无法读取: {config_path}")
    return config
class CSVFileHandler(FileSystemEventHandler):
    """Watchdog handler for the monitored directory.

    When a new CSV appears it: (1) converts raw DN spectra to radiance
    using per-instrument calibration files, (2) runs the external SIF
    retrieval script with three methods (3FLD / sFLD / SFM), (3) appends
    a data-quality level to each result file, and (4) uploads the
    results over FTPS.
    """

    def __init__(self, ftp_config):
        """ftp_config: parsed config.ini with [monitor] and [FTP] sections."""
        super().__init__()
        self.ftp_config = ftp_config

    def on_created(self, event):
        """Process a newly created file; only .csv files are handled."""
        if event.is_directory:
            return
        if event.src_path.lower().endswith('.csv'):
            file_path = os.path.abspath(event.src_path)
            print(f"发现CSV文件: {file_path}----------------------------------------------------------------------------")
            # Pick the calibration folder matching the instrument id in the path.
            cal_dir = self.ftp_config['monitor']['cal_dir']
            if "towersif20" in file_path:
                cal_dir = os.path.join(cal_dir, "20")
            elif "towersif21" in file_path:
                cal_dir = os.path.join(cal_dir, "21")
            # Touching the file immediately after creation occasionally
            # raises "permission denied", so wait 100 ms first.
            time.sleep(0.1)
            rad_folder_tmp, sif_folder, quality_level = self.rad_conversion(file_path, cal_dir)
            _ = self.compute_sif(file_path, rad_folder_tmp, sif_folder)
            for i in _:
                self.add_validity_column_to_file(i, quality_level)
            self.send_via_ftps(_)

    def send_via_ftps(self, file_paths, max_retries=3, retry_delay=5):
        """Upload file_paths to the configured FTPS server.

        Retries the connection up to max_retries times, sleeping
        retry_delay seconds between attempts.  Returns True only when
        every file uploads successfully.
        """
        retries = 0
        ftps = None
        while retries < max_retries:
            try:
                print("正在尝试连接 FTPS 服务器...")
                ftps = FTP_TLS()
                # The server uses a self-signed certificate, so skip verification.
                ftps.context = ssl._create_unverified_context()
                ftps.connect(
                    host=self.ftp_config['FTP']['host'],
                    port=int(self.ftp_config['FTP'].get('port', 21)),  # default 21; may be e.g. 65521
                    timeout=30
                )
                ftps.login(
                    user=self.ftp_config['FTP']['user'],
                    passwd=self.ftp_config['FTP']['password']
                )
                print("FTPS 连接成功,准备上传文件...")
                # Switch the data channel to TLS as well.
                ftps.prot_p()
                # Enter the target directory, creating it if necessary.
                remote_dir = self.ftp_config['FTP'].get('target_dir', '.')
                try:
                    ftps.cwd(remote_dir)
                except error_perm:
                    print(f"远程目录不存在,尝试创建: {remote_dir}")
                    ftps.mkd(remote_dir)
                    ftps.cwd(remote_dir)
                success_count = 0
                for file_path in file_paths:
                    if not os.path.exists(file_path):
                        continue
                    try:
                        filename = os.path.basename(file_path)
                        with open(file_path, "rb") as f:
                            # BUG FIX: the STOR command must carry the remote
                            # file name (the format placeholder was lost).
                            ftps.storbinary(f"STOR {filename}", f)
                        print(f"✅ 文件上传成功: {filename}")
                        success_count += 1
                    except Exception as e:
                        print(f"❌ 文件上传失败 {file_path}: {e}")
                # True only when every requested file was stored.
                return success_count == len(file_paths)
            except Exception as e:
                retries += 1
                print(f"❌ FTPS 连接失败(尝试 {retries}/{max_retries}: {e}")
                if retries < max_retries:
                    time.sleep(retry_delay)
            finally:
                if ftps:
                    try:
                        ftps.quit()
                    except Exception:
                        ftps.close()
        print(f"❌ 上传失败(已达最大重试次数 {max_retries}")
        return False

    def send_via_sftp(self, file_paths, max_retries=3, retry_delay=5):
        """Upload file_paths over SFTP (alternative to send_via_ftps).

        Same retry/return contract as send_via_ftps; authentication
        failures abort immediately.
        """
        retries = 0
        ssh = None
        sftp = None
        while retries < max_retries:
            try:
                print("正在尝试连接 SFTP 服务器...")
                ssh = paramiko.SSHClient()
                # NOTE(review): AutoAddPolicy trusts any host key; use a
                # stricter policy in production.
                ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                ssh.connect(
                    hostname=self.ftp_config['FTP']['host'],
                    port=int(self.ftp_config['FTP'].get('port', 22)),
                    username=self.ftp_config['FTP']['user'],
                    password=self.ftp_config['FTP']['password'],
                    timeout=30,
                    allow_agent=False,   # disable ssh-agent (avoids 'lost ssh-agent' errors)
                    look_for_keys=False, # force password auth, skip key discovery
                )
                print("SFTP 连接成功,准备上传文件...")
                sftp = ssh.open_sftp()
                # Enter the target directory, creating it if necessary.
                remote_dir = self.ftp_config['FTP'].get('target_dir', '.')
                try:
                    sftp.chdir(remote_dir)
                except IOError:
                    print(f"远程目录不存在,尝试创建: {remote_dir}")
                    sftp.mkdir(remote_dir)
                    sftp.chdir(remote_dir)
                success_count = 0
                for file_path in file_paths:
                    try:
                        filename = os.path.basename(file_path)
                        sftp.put(file_path, filename)
                        # BUG FIX: restore the file-name placeholder in the message.
                        print(f"✅ 文件上传成功: {filename}")
                        success_count += 1
                    except Exception as e:
                        print(f"❌ 文件上传失败 {file_path}: {e}")
                # True only when every requested file was uploaded.
                return success_count == len(file_paths)
            except paramiko.AuthenticationException as e:
                print(f"❌ 认证失败: {e}")
                return False
            except paramiko.SSHException as e:
                retries += 1
                print(f"❌ SFTP 连接失败(尝试 {retries}/{max_retries}: {e}")
                if retries < max_retries:
                    time.sleep(retry_delay)
            except Exception as e:
                print(f"❌ 未知错误: {e}")
                return False
            finally:
                if sftp:
                    sftp.close()
                if ssh:
                    ssh.close()
        print(f"❌ 上传失败(已达最大重试次数 {max_retries}")
        return False

    def rad_conversion(self, input_csv, input_cal):
        """Convert a DN CSV to radiance and stage the result folders.

        Args:
            input_csv: path of the newly created DN csv.
            input_cal: folder holding the calibration files, one per spectrum.

        Returns:
            (rad_folder_tmp, sif_folder, quality_level)
        """
        folder_path = os.path.dirname(input_csv)
        base_name = os.path.basename(input_csv)          # file name with extension
        name_part, ext = os.path.splitext(base_name)
        parts = name_part.split('_', 1)                  # split at the first underscore
        today = datetime.now()
        formatted_date = today.strftime("%Y_%m_%d")
        new_name = f"{formatted_date}_{parts[1]}{ext}"   # date-prefixed output name
        # BUG FIX: use the handler's own config instead of the
        # module-level global 'ftp_config'.
        result_folder = os.path.join(Path(self.ftp_config['monitor']['WATCH_DIR']).parent, "result")
        if not os.path.exists(result_folder):
            os.makedirs(result_folder)
            print(f"文件夹已创建: {result_folder}")
        else:
            print(f"文件夹已存在: {result_folder}")

        def get_last_two_levels(path):
            # Keep only the last two path components (e.g. site/sensor).
            path_obj = Path(path)
            parts = path_obj.parts
            if len(parts) >= 2:
                return Path(parts[-2]) / parts[-1]
            else:
                return path_obj  # fewer than two levels: return as-is

        folder_name = get_last_two_levels(folder_path)
        rad_folder = os.path.join(result_folder, folder_name, "rad")
        if not os.path.exists(rad_folder):
            os.makedirs(rad_folder)
            print(f"文件夹已创建: {rad_folder}")
        else:
            print(f"文件夹已存在: {rad_folder}")
        sif_folder = os.path.join(result_folder, folder_name, "sif")
        if not os.path.exists(sif_folder):
            os.makedirs(sif_folder)
            print(f"文件夹已创建: {sif_folder}")
        else:
            print(f"文件夹已存在: {sif_folder}")
        # rad_tmp holds only the current file, so the retrieval script
        # sees exactly one input; recreate it from scratch each time.
        rad_folder_tmp = os.path.join(result_folder, folder_name, "rad_tmp")
        if os.path.exists(rad_folder_tmp):
            shutil.rmtree(rad_folder_tmp)
        os.makedirs(rad_folder_tmp)
        rad_path = os.path.join(rad_folder_tmp, new_name)
        metadata, wavelengths, spectra_data = parse_sif_csv(input_csv)
        sorted_cal_files_path = get_sorted_files_by_number(input_cal)
        for i in range(len(spectra_data)):
            uiExposureTimeInMS, fTemperature, iPixels, fWaveLength, dCal_Gain, dCal_Offset = read_cal(
                sorted_cal_files_path[i])
            # Scale the calibration gain to the actual integration time.
            gain_scale = uiExposureTimeInMS / spectra_data[i]['Integration']
            data_gain_adjust = dCal_Gain * gain_scale
            spectra_data[i]['RAD'] = spectra_data[i]['DN'] * data_gain_adjust
        write_file(input_csv, rad_path, spectra_data)
        shutil.copy(rad_path, rad_folder)
        # Compute the quality level against the previous radiance file.
        rad_file_name = os.path.basename(rad_path)
        quality_level = self.compute_quality_level(rad_file_name, rad_folder)
        return rad_folder_tmp, sif_folder, quality_level

    def compute_quality_level(self, file_name, folder_path, center=753, window=3):
        """Grade file_name from 0 (best) to 3 (worst).

        Compares the mean DN in the band [center-window, center+window]
        against the chronologically previous file in folder_path.
        Returns 2 when no previous file exists.
        """
        all_files = [f for f in os.listdir(folder_path) if f.endswith('.csv')]
        # Empty folder, or the given file is not present: neutral grade.
        if not all_files or file_name not in all_files:
            return 2

        def parse_time_from_filename(filename):
            # File names look like YYYY_MM_DD_HH_MM_SS.csv.
            time_str = filename.rsplit('.', 1)[0]
            try:
                return datetime.strptime(time_str, '%Y_%m_%d_%H_%M_%S')
            except ValueError:
                # Unparseable names sort last.
                return datetime.max

        sorted_files = sorted(all_files, key=parse_time_from_filename)
        index_of_given = sorted_files.index(file_name)
        if index_of_given == 0:
            return 2  # no earlier file to compare with
        current_file = os.path.join(folder_path, file_name)
        metadata1, wavelengths1, spectra_data1 = parse_sif_csv(current_file)
        pre_file = os.path.join(folder_path, sorted_files[index_of_given - 1])
        metadata2, wavelengths2, spectra_data2 = parse_sif_csv(pre_file)
        lower_bound = center - window
        upper_bound = center + window
        indices = np.where((wavelengths2 >= lower_bound) & (wavelengths2 <= upper_bound))[0]
        pos = 1  # spectrum row used for the confidence calculation
        a1 = spectra_data1[pos]['DN'][indices].mean()
        a2 = spectra_data2[pos]['DN'][indices].mean()
        # Percent change between consecutive acquisitions.
        quality_number = abs(a2 - a1) / a1 * 100
        if quality_number <= 2:
            return 0
        elif quality_number <= 5:
            return 1
        elif quality_number <= 10:
            return 2
        else:
            return 3

    def compute_sif(self, input_csv_dn, rad_folder, sif_folder):
        """Run the external SIF retrieval script for three methods.

        Returns the three output csv paths (3fld, sfld, sfm); the script
        is expected to have written them.
        """
        folder_path = os.path.dirname(input_csv_dn)
        base_name = os.path.basename(input_csv_dn)
        name_part, ext = os.path.splitext(base_name)
        parts = name_part.split('_', 1)
        today = datetime.now()
        formatted_date = today.strftime("%Y_%m_%d")
        new_name = f"{formatted_date}_{parts[1]}{ext}"
        # Locate Feng's retrieval script per platform.
        if os.name == "nt":  # Windows
            program_path = r"python D:\PycharmProjects\sif\sif_retrieval.py"
            standard_sif_path = r"C:\EasySIF\standard_sif.csv"
        elif os.name == "posix":  # Linux/macOS/Unix-like
            program_path = r"python3 /root/sif/feng/sif_retrieval.py"
            standard_sif_path = r"/root/sif/feng/standard_sif.csv"
        input_path = rad_folder
        file_name_tmp = parts[0] + "_" + new_name.split('.')[0]
        output_path_3fld = os.path.join(sif_folder, file_name_tmp + "_3fld.csv")
        param_3fld = r"[740,780],[756,759],[761,762] P1 3fld"
        output_path_sfld = os.path.join(sif_folder, file_name_tmp + "_sfld.csv")
        param_sfld = r"[740,780],[756,759] P1 sfld"
        output_path_sfm = os.path.join(sif_folder, file_name_tmp + "_sfm.csv")
        param_sfm = r" [759,770],760 P1 sfm"
        command_str_3fld = program_path + " " + standard_sif_path + " " + input_path + " " + output_path_3fld + " " + param_3fld
        command_str_sfld = program_path + " " + standard_sif_path + " " + input_path + " " + output_path_sfld + " " + param_sfld
        command_str_sfm = program_path + " " + standard_sif_path + " " + input_path + " " + output_path_sfm + " " + param_sfm
        # NOTE(review): os.system with concatenated paths breaks on paths
        # containing spaces and is shell-injection prone; consider
        # subprocess.run([...], shell=False) with an argument list.
        return_code = os.system(command_str_3fld)
        print(command_str_3fld)
        return_code = os.system(command_str_sfld)
        return_code = os.system(command_str_sfm)
        print(f"命令返回状态码: {return_code}")
        return output_path_3fld, output_path_sfld, output_path_sfm

    def add_validity_column_to_file(self, file_path, quality_level):
        """Append a 'validity' header and quality_level value to the
        first two rows of file_path, rewriting it via a temp file.

        Does nothing if the file is missing or has fewer than two rows.
        """
        if not os.path.exists(file_path):
            return
        temp_file = tempfile.NamedTemporaryFile(mode='w', delete=False, newline='')
        try:
            with open(file_path, 'r', newline='') as csvfile, temp_file:
                reader = csv.reader(csvfile)
                writer = csv.writer(temp_file)
                rows = list(reader)
                if len(rows) >= 2:
                    rows[0].append('validity')
                    rows[1].append(quality_level)
                    writer.writerows(rows)
            if len(rows) < 2:
                # BUG FIX: the original early-returned here and leaked the
                # temp file; remove it before bailing out.
                os.unlink(temp_file.name)
                return
            # Replace the original file with the rewritten one.
            shutil.move(temp_file.name, file_path)
        except Exception:
            # On any failure, clean up the temp file and re-raise.
            os.unlink(temp_file.name)
            raise
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
    cli = argparse.ArgumentParser(
        description="监控文件夹的状态当出现新的csv时提取sif并通过ftp发送。", prog='sif.')
    cli.add_argument('-i', '--input_ini', required=True, type=str, help='输入ini配置文件路径。')
    cli.add_argument("-v", "--version", action='version', version='%(prog)s 1.0')
    # cli.add_argument('-v', '--verbose', action='store_true', help='启用详细模式')
    args = cli.parse_args()

    # NOTE: keep the module-level name 'ftp_config' — it is read globally.
    ftp_config = load_config(args.input_ini)

    watcher = Observer()
    watcher.schedule(CSVFileHandler(ftp_config),
                     ftp_config['monitor']['WATCH_DIR'], recursive=True)
    watcher.start()
    print(f"正在监控目录:{ftp_config['monitor']['WATCH_DIR']}")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        watcher.stop()
    watcher.join()