Files
sif_didiaoyuan/main.py
tangchao0503 7666368d18 通过配置文件监控目录,当出现csv时做以下操作:
1、转辐亮度;
2、调用丰算法计算sif,输出成csv;
3、将2中的csv上传到ftp服务器;
2025-08-20 14:47:19 +08:00

363 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv, tempfile, os, re
import struct
import time
import numpy as np
import argparse
import paramiko
import shutil
import configparser
from ftplib import FTP
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from datetime import datetime
def parse_sif_csv(_file_path):
    """Parse a SIF spectrometer CSV file.

    Layout: rows 1 and 3 hold alternating name/value metadata cells after
    the first column, row 2 is a single name/value pair, row 4 lists the
    wavelengths, row 5 is ignored, and rows 6 onward carry the spectra.

    Returns a tuple ``(metadata dict, wavelength ndarray, spectra list)``
    where each spectrum is a dict with Location/Valid/Integration/DN keys.
    """
    with open(_file_path, encoding='utf-8') as handle:
        rows = list(csv.reader(handle))

    meta = {}

    def _absorb_pairs(cells):
        # Cells alternate name, value; a trailing unpaired name is dropped.
        for name, value in zip(cells[0::2], cells[1::2]):
            meta[name] = value

    _absorb_pairs(rows[0][1:])            # row 1: pairs after first column
    if len(rows) > 1 and len(rows[1]) >= 2:
        meta[rows[1][0]] = rows[1][1]     # row 2: one name/value pair
    _absorb_pairs(rows[2][1:])            # row 3: pairs after first column

    # Row 4 is the wavelength axis; row 5 is deliberately skipped.
    waves = np.array([float(cell) for cell in rows[3][1:]])

    spectra = []
    for record in rows[5:]:
        if len(record) < 4 or record[1].lower() != "valid":
            continue  # header row or measurement flagged invalid
        try:
            spectra.append({
                "Location": record[0],
                "Valid": record[1],
                "Integration": int(record[2]),
                "DN": np.array([float(cell) for cell in record[3:]]),
            })
        except ValueError:
            continue  # numeric fields could not be parsed
    return meta, waves, spectra
def read_cal(_file_path):
    """Load a binary calibration file.

    Layout (little-endian): uint32 exposure time in ms, float32 temperature,
    int32 pixel count, followed by three fixed 4096-element tables — float32
    wavelengths, float64 gains, float64 offsets.  Each table is truncated to
    the pixel count on return.
    """
    layout = struct.Struct('<I f i 4096f 4096d 4096d')
    with open(_file_path, 'rb') as handle:
        fields = layout.unpack(handle.read(layout.size))

    exposure_ms = fields[0]
    temperature = fields[1]
    pixels = fields[2]
    tables = fields[3:]
    wavelengths = np.array(tables[:4096])[:pixels]
    gains = np.array(tables[4096:8192])[:pixels]
    offsets = np.array(tables[8192:])[:pixels]
    return exposure_ms, temperature, pixels, wavelengths, gains, offsets
def write_file(_in_path, _out_path, _spectra_data):
    """Write the radiance CSV.

    The first five rows of the source file (metadata, wavelengths and the
    fifth row) are copied verbatim, then one row per spectrum is appended
    using its Location/Valid/Integration fields plus the calibrated "RAD"
    values.
    """
    with open(_in_path, encoding='utf-8') as src:
        source_rows = list(csv.reader(src))
    with open(_out_path, 'w', newline='', encoding='utf-8') as dst:
        writer = csv.writer(dst)
        # Rows 1-5 are copied unchanged (explicit indexing keeps the
        # original IndexError behaviour on truncated inputs).
        for idx in range(5):
            writer.writerow(source_rows[idx])
        for record in _spectra_data:
            prefix = [record["Location"], record["Valid"], record["Integration"]]
            writer.writerow(prefix + list(record["RAD"]))
def get_sorted_files_by_number(folder_path):
    """Return absolute paths of the files directly inside *folder_path*,
    ordered by the sequence of integers embedded in each file name
    (e.g. ``cal_2.bin`` sorts before ``cal_10.bin``)."""

    def numeric_key(path):
        # Names without digits yield [] and therefore sort first.
        return [int(tok) for tok in re.findall(r'\d+', os.path.basename(path))]

    candidates = (os.path.join(folder_path, entry)
                  for entry in os.listdir(folder_path))
    # Keep plain files only (sub-directories are excluded).
    file_paths = [os.path.abspath(p) for p in candidates if os.path.isfile(p)]
    return sorted(file_paths, key=numeric_key)
# Configuration loader.
def load_config(config_path='config.ini'):
    """Read an INI file and return the populated ConfigParser object."""
    parsed = configparser.ConfigParser()
    parsed.read(config_path)
    return parsed
class CSVFileHandler(FileSystemEventHandler):
    """Watchdog handler: when a new CSV appears under the watched tree,
    convert it to radiance, run the three SIF retrieval algorithms and
    upload the result CSVs over SFTP."""

    def __init__(self, ftp_config):
        super().__init__()
        # Parsed config.ini; provides the [monitor] and [FTP] sections.
        self.ftp_config = ftp_config

    def on_created(self, event):
        """React to a newly created file inside the watched directory."""
        if event.is_directory:
            return
        if not event.src_path.lower().endswith('.csv'):
            return
        file_path = os.path.abspath(event.src_path)
        print(f"发现CSV文件: {file_path}")
        # Pick the calibration folder matching the instrument id in the path.
        cal_dir = self.ftp_config['monitor']['cal_dir']
        if "towersif20" in file_path:
            cal_dir = os.path.join(cal_dir, "20")
        elif "towersif21" in file_path:
            cal_dir = os.path.join(cal_dir, "21")
        # A freshly created file may still be locked by its writer, which
        # surfaced as intermittent PermissionError; wait 100 ms.
        time.sleep(0.1)
        output_files = self.process_csv(file_path, cal_dir)
        # Append the validity flag column expected downstream.
        for output_file in output_files:
            self.add_validity_column_to_file(output_file)
        self.send_via_sftp(output_files)

    def send_via_sftp(self, file_paths, max_retries=3, retry_delay=5):
        """Upload *file_paths* to the configured SFTP server.

        Transient SSH failures are retried up to *max_retries* times with
        *retry_delay* seconds between attempts.  Returns True only when
        every file uploaded successfully, False otherwise.
        """
        retries = 0
        ssh = None
        sftp = None
        while retries < max_retries:
            try:
                print("正在尝试连接 SFTP 服务器...")
                ssh = paramiko.SSHClient()
                # NOTE(review): AutoAddPolicy skips host-key verification;
                # use a known-hosts policy in production.
                ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                ssh.connect(
                    hostname=self.ftp_config['FTP']['host'],
                    port=int(self.ftp_config['FTP'].get('port', 22)),
                    username=self.ftp_config['FTP']['user'],
                    password=self.ftp_config['FTP']['password'],
                    timeout=30,
                    allow_agent=False,    # avoid 'lost ssh-agent' errors
                    look_for_keys=False,  # force password authentication
                )
                print("SFTP 连接成功,准备上传文件...")
                sftp = ssh.open_sftp()
                # Enter the target directory, creating it when missing.
                remote_dir = self.ftp_config['FTP'].get('target_dir', '.')
                try:
                    sftp.chdir(remote_dir)
                except IOError:
                    print(f"远程目录不存在,尝试创建: {remote_dir}")
                    sftp.mkdir(remote_dir)
                    sftp.chdir(remote_dir)
                success_count = 0
                for file_path in file_paths:
                    try:
                        filename = os.path.basename(file_path)
                        sftp.put(file_path, filename)
                        # Fixed: the message previously contained no
                        # placeholder, so the uploaded name was never logged.
                        print(f"✅ 文件上传成功: {filename}")
                        success_count += 1
                    except Exception as e:
                        print(f"❌ 文件上传失败 {file_path}: {e}")
                return success_count == len(file_paths)
            except paramiko.AuthenticationException as e:
                print(f"❌ 认证失败: {e}")
                return False  # credentials are wrong; retrying cannot help
            except paramiko.SSHException as e:
                retries += 1
                print(f"❌ SFTP 连接失败(尝试 {retries}/{max_retries}: {e}")
                if retries < max_retries:
                    time.sleep(retry_delay)
            except Exception as e:
                print(f"❌ 未知错误: {e}")
                return False
            finally:
                # Close per attempt; a fresh client is built on retry.
                if sftp:
                    sftp.close()
                if ssh:
                    ssh.close()
        print(f"❌ 上传失败(已达最大重试次数 {max_retries}")
        return False

    def process_csv(self, input_csv, input_cal):
        """Convert a raw-DN CSV to radiance and run the three SIF
        retrieval algorithms (3FLD, sFLD, SFM) as external processes.

        Returns the paths of the three result CSVs.
        """
        base_name = os.path.basename(input_csv)
        name_part, ext = os.path.splitext(base_name)
        parts = name_part.split('_', 1)  # instrument id, remainder
        formatted_date = datetime.now().strftime("%Y_%m_%d")
        new_name = f"{formatted_date}_{parts[1]}{ext}"
        # Work area lives next to the watched directory.
        # Fixed: previously read the module-global `ftp_config` instead of
        # the instance's config, breaking any non-script use of this class.
        tmp_folder = os.path.join(
            Path(self.ftp_config['monitor']['WATCH_DIR']).parent, "tmp")
        if not os.path.exists(tmp_folder):
            os.makedirs(tmp_folder)
            print(f"文件夹已创建: {tmp_folder}")
        else:
            print(f"文件夹已存在: {tmp_folder}")
        # The radiance folder is wiped so the retrieval sees exactly one
        # input file per run.
        rad_folder = os.path.join(tmp_folder, "rad")
        if os.path.exists(rad_folder):
            shutil.rmtree(rad_folder)
        os.makedirs(rad_folder)
        sif_folder = os.path.join(tmp_folder, "sif")
        if not os.path.exists(sif_folder):
            os.makedirs(sif_folder)
            print(f"文件夹已创建: {sif_folder}")
        else:
            print(f"文件夹已存在: {sif_folder}")
        rad_path = os.path.join(rad_folder, new_name)
        metadata, wavelengths, spectra_data = parse_sif_csv(input_csv)
        sorted_cal_files_path = get_sorted_files_by_number(input_cal)
        for i in range(len(spectra_data)):
            (uiExposureTimeInMS, fTemperature, iPixels, fWaveLength,
             dCal_Gain, dCal_Offset) = read_cal(sorted_cal_files_path[i])
            # Rescale the gain table for this capture's integration time.
            gain_scale = uiExposureTimeInMS / spectra_data[i]['Integration']
            data_gain_adjust = dCal_Gain * gain_scale
            spectra_data[i]['RAD'] = spectra_data[i]['DN'] * data_gain_adjust
        write_file(input_csv, rad_path, spectra_data)
        # Invoke Feng's retrieval script (external interpreter).
        if os.name == "nt":  # Windows
            program_path = r"python D:\PycharmProjects\sif\sif_retrieval.py"
            standard_sif_path = r"C:\EasySIF\standard_sif.csv"
        elif os.name == "posix":  # Linux/macOS/Unix-like
            program_path = r"python3 /root/sif/feng/sif_retrieval.py"
            standard_sif_path = r"/root/sif/feng/standard_sif.csv"
        else:
            # Fixed: previously fell through with both names undefined,
            # raising an opaque NameError below.
            raise OSError(f"unsupported platform: {os.name}")
        input_path = rad_folder
        file_name_tmp = parts[0] + "_" + new_name.split('.')[0]
        output_path_3fld = os.path.join(sif_folder, file_name_tmp + "_3fld.csv")
        param_3fld = r"[740,780],[756,759],[761,762] P1 3fld"
        output_path_sfld = os.path.join(sif_folder, file_name_tmp + "_sfld.csv")
        param_sfld = r"[740,780],[756,759] P1 sfld"
        output_path_sfm = os.path.join(sif_folder, file_name_tmp + "_sfm.csv")
        param_sfm = r" [759,770],760 P1 sfm"
        command_str_3fld = program_path + " " + standard_sif_path + " " + input_path + " " + output_path_3fld + " " + param_3fld
        command_str_sfld = program_path + " " + standard_sif_path + " " + input_path + " " + output_path_sfld + " " + param_sfld
        command_str_sfm = program_path + " " + standard_sif_path + " " + input_path + " " + output_path_sfm + " " + param_sfm
        # SECURITY NOTE: these strings are passed to a shell; keep the watch
        # directory free of untrusted file names.
        # Fixed: previously two of the three exit codes were silently
        # discarded; report each run's status.
        for command_str in (command_str_3fld, command_str_sfld, command_str_sfm):
            return_code = os.system(command_str)
            print(f"命令返回状态码: {return_code}")
        return output_path_3fld, output_path_sfld, output_path_sfm

    def add_validity_column_to_file(self, file_path):
        """Append a 'validity' header cell and a constant '1' to the first
        data row of *file_path*, rewriting the file via a temp file."""
        temp_file = tempfile.NamedTemporaryFile(mode='w', delete=False, newline='')
        try:
            with open(file_path, 'r', newline='') as csvfile, temp_file:
                rows = list(csv.reader(csvfile))
                annotated = len(rows) >= 2
                if annotated:
                    rows[0].append('validity')
                    rows[1].append('1')
                    csv.writer(temp_file).writerows(rows)
            if annotated:
                # Replace the original file with the annotated copy.
                shutil.move(temp_file.name, file_path)
            else:
                # Fixed: too few rows previously early-returned and leaked
                # the temp file on disk.
                os.unlink(temp_file.name)
        except Exception:
            # Clean up the temp file before propagating the error.
            os.unlink(temp_file.name)
            raise
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
    # CLI: -i/--input_ini is the path of the INI file holding the [monitor]
    # and [FTP] sections used throughout this script.
    parser = argparse.ArgumentParser(description="监控文件夹的状态当出现新的csv时提取sif并通过ftp发送。", prog='sif.')
    parser.add_argument('-i', '--input_ini', required=True, type=str, help='输入ini配置文件路径。')
    parser.add_argument("-v", "--version", action='version', version='%(prog)s 1.0')
    # parser.add_argument('-v', '--verbose', action='store_true', help='启用详细模式')
    args = parser.parse_args()
    # NOTE(review): this module-level name is also read as a global by
    # CSVFileHandler.process_csv -- do not rename or move it into a function.
    ftp_config = load_config(args.input_ini)
    event_handler = CSVFileHandler(ftp_config)
    observer = Observer()
    # Watch WATCH_DIR and all subdirectories for newly created files.
    observer.schedule(event_handler, ftp_config['monitor']['WATCH_DIR'], recursive=True)
    observer.start()
    print(f"正在监控目录:{ftp_config['monitor']['WATCH_DIR']}")
    try:
        # Keep the main thread alive; the observer runs on its own thread.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()