"""Monitor a directory for new SIF spectrometer CSV files.

When a new CSV appears, the pipeline converts its DN spectra to radiance
using per-spectrum binary calibration files, runs an external SIF retrieval
script with three algorithms (3FLD, sFLD, SFM), tags the outputs with a
quality level, and uploads the results over FTP(S)/SFTP.

NOTE(review): the source arrived whitespace-mangled, and a middle segment
(the tail of ``read_cal`` through the ``CSVFileHandler`` class header and
its event handler) was truncated.  Every definition marked "reconstructed"
below was rebuilt from its own comments and call sites — confirm each one
against the original file before relying on it.
"""

import argparse
import configparser
import csv
import os
import re
import shutil
import ssl                      # used by the lost FTP_TLS upload code - TODO confirm
import struct
import tempfile
import time
from datetime import datetime
from ftplib import FTP, FTP_TLS, error_perm   # used by the lost upload code - TODO confirm
from pathlib import Path

import numpy as np
import paramiko                 # used by the lost SFTP upload code - TODO confirm
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


def parse_sif_csv(file_path):
    """Parse a SIF CSV file.

    Layout (1-based rows):
      row 1: first cell ignored, then alternating metadata name/value pairs
      row 2: one metadata name/value pair
      row 3: first cell ignored, then alternating metadata name/value pairs
      row 4: wavelengths (first cell ignored)
      row 5: ignored
      row 6+: spectra rows: Location, Valid flag, integration time, DN values

    Returns:
        (metadata dict, wavelengths ndarray, list of spectrum dicts with
        keys "Location", "Valid", "Integration", "DN")
    """
    metadata = {}
    with open(file_path, encoding='utf-8') as f:
        rows = list(csv.reader(f))

    # Row 1: after the first column, names and values alternate.
    row1 = rows[0][1:]
    for i in range(0, len(row1) - 1, 2):
        metadata[row1[i]] = row1[i + 1]

    # Row 2: a single name/value pair.
    if len(rows) > 1 and len(rows[1]) >= 2:
        metadata[rows[1][0]] = rows[1][1]

    # Row 3: alternating name/value pairs again.
    row3 = rows[2][1:]
    for i in range(0, len(row3) - 1, 2):
        metadata[row3[i]] = row3[i + 1]

    # Row 4: wavelengths.  Row 5 is ignored.
    wavelengths = np.array([float(w) for w in rows[3][1:]])

    # Rows 6+: one spectrum per row; only rows whose flag is literally
    # "valid" (case-insensitive) are kept.
    spectra_data = []
    for row in rows[5:]:
        if len(row) < 4 or row[1].lower() != "valid":
            continue  # skip header or invalid rows
        try:
            spectra_data.append({
                "Location": row[0],
                "Valid": row[1],
                "Integration": int(row[2]),
                "DN": np.array([float(val) for val in row[3:]]),
            })
        except ValueError:
            continue  # skip rows that cannot be parsed

    return metadata, wavelengths, spectra_data


def read_cal(file_path):
    """Read one binary calibration record.

    Record layout (native order, standard sizes — the original comment):
      unsigned int (4)  exposure time in ms
      float (4)         detector temperature
      int (4)           pixel count
      4096 floats       wavelengths
      4096 doubles      calibration gain
      4096 doubles      calibration offset

    NOTE(review): the body of this function was truncated in the provided
    chunk; it was reconstructed from the surviving layout comment, the
    surviving ``fmt = '=`` fragment, and the caller's 6-value unpacking.
    Confirm against the original source.
    """
    fmt = '=I f i 4096f 4096d 4096d'
    with open(file_path, 'rb') as f:
        values = struct.unpack(fmt, f.read(struct.calcsize(fmt)))

    exposure_time_ms, temperature, pixels = values[0], values[1], values[2]
    wavelengths = np.array(values[3:3 + 4096])
    cal_gain = np.array(values[3 + 4096:3 + 2 * 4096])
    cal_offset = np.array(values[3 + 2 * 4096:])
    return exposure_time_ms, temperature, pixels, wavelengths, cal_gain, cal_offset


def get_sorted_files_by_number(folder):
    """Return full paths of the files in *folder*, ordered by the first
    integer embedded in each file name.

    NOTE(review): reconstructed from its call site (one calibration file per
    spectrum, consumed in order) — confirm against the original source.
    """
    def first_number(name):
        match = re.search(r'\d+', name)
        return int(match.group()) if match else -1

    names = sorted(os.listdir(folder), key=first_number)
    return [os.path.join(folder, name) for name in names]


def write_file(input_csv, output_path, spectra_data):
    """Write a radiance CSV mirroring the layout of *input_csv*: the first
    five header rows are copied verbatim, then one row per spectrum with the
    computed 'RAD' values in place of the DN values.

    NOTE(review): reconstructed — the output must stay parseable by
    ``parse_sif_csv`` because ``compute_quality_level`` re-reads it.
    Confirm against the original source.
    """
    with open(input_csv, encoding='utf-8') as f:
        rows = list(csv.reader(f))

    with open(output_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerows(rows[:5])  # metadata, wavelengths, ignored row
        for entry in spectra_data:
            writer.writerow([entry["Location"], entry["Valid"],
                             entry["Integration"], *entry["RAD"].tolist()])


def load_config(ini_path):
    """Load the INI configuration file.

    Sections/keys are accessed mapping-style by the caller, e.g.
    ``config['monitor']['WATCH_DIR']``.

    NOTE(review): reconstructed from the call sites — confirm.
    """
    config = configparser.ConfigParser()
    config.read(ini_path, encoding='utf-8')
    return config


class CSVFileHandler(FileSystemEventHandler):
    """Watchdog handler driving the SIF processing pipeline.

    For each new CSV: DN -> radiance (``compute_rad``), SIF retrieval
    (``compute_sif``), quality tagging (``add_validity_column_to_file``),
    and — in the lost portion of the original file — an FTP(S)/SFTP upload.
    """

    def __init__(self, ftp_config):
        # NOTE(review): reconstructed — the original __init__ was lost.
        super().__init__()
        self.ftp_config = ftp_config  # parsed INI config (ftp + monitor sections)

    def on_created(self, event):
        """React to a newly created file under the watched directory.

        NOTE(review): the original handler body (including the upload code
        that used paramiko / FTP_TLS) was lost in the truncated chunk.  The
        expected flow, inferred from the surviving methods:
          1. ignore directories and non-CSV files
          2. compute_rad -> (rad_folder_tmp, sif_folder, quality_level)
          3. compute_sif over the temporary rad folder
          4. add_validity_column_to_file on each SIF output
          5. upload the results per self.ftp_config
        TODO(review): restore from the original source.
        """
        raise NotImplementedError(
            "on_created was lost in the truncated source; restore it from the original file")

    def compute_rad(self, input_csv, input_cal, result_folder, new_name):
        """Convert the DN spectra of *input_csv* to radiance.

        Writes the radiance CSV as *new_name* into a per-station "rad_tmp"
        working folder (recreated each call) and archives a copy under
        "rad"; also prepares the sibling "sif" output folder.

        Returns:
            (rad_folder_tmp, sif_folder, quality_level)

        NOTE(review): the original method header was lost; the name and
        parameter list were reconstructed from the surviving body — confirm.
        """
        folder_path = os.path.dirname(input_csv)

        def get_last_two_levels(path):
            # Keep only the last two path components so different stations
            # do not collide inside result_folder.
            parts = Path(path).parts
            if len(parts) >= 2:
                return Path(parts[-2]) / parts[-1]
            return Path(path)  # fewer than two levels: return unchanged

        folder_name = get_last_two_levels(folder_path)

        rad_folder = os.path.join(result_folder, folder_name, "rad")
        if not os.path.exists(rad_folder):
            os.makedirs(rad_folder)
            print(f"文件夹已创建: {rad_folder}")
        else:
            print(f"文件夹已存在: {rad_folder}")

        sif_folder = os.path.join(result_folder, folder_name, "sif")
        if not os.path.exists(sif_folder):
            os.makedirs(sif_folder)
            print(f"文件夹已创建: {sif_folder}")
        else:
            print(f"文件夹已存在: {sif_folder}")

        # Rebuild the temporary rad folder from scratch for this file.
        rad_folder_tmp = os.path.join(result_folder, folder_name, "rad_tmp")
        if os.path.exists(rad_folder_tmp):
            shutil.rmtree(rad_folder_tmp)
        os.makedirs(rad_folder_tmp)
        rad_path = os.path.join(rad_folder_tmp, new_name)

        metadata, wavelengths, spectra_data = parse_sif_csv(input_csv)
        sorted_cal_files_path = get_sorted_files_by_number(input_cal)
        for i, spectrum in enumerate(spectra_data):
            (exposure_time_ms, _temperature, _pixels, _wavelengths,
             cal_gain, _cal_offset) = read_cal(sorted_cal_files_path[i])
            # Scale the calibration gain from its reference exposure time to
            # this spectrum's actual integration time.
            gain_scale = exposure_time_ms / spectrum['Integration']
            spectrum['RAD'] = spectrum['DN'] * (cal_gain * gain_scale)

        write_file(input_csv, rad_path, spectra_data)
        shutil.copy(rad_path, rad_folder)

        # Quality is judged against the chronologically previous rad file.
        rad_file_name = os.path.basename(rad_path)
        quality_level = self.compute_quality_level(rad_file_name, rad_folder)
        return rad_folder_tmp, sif_folder, quality_level

    def compute_quality_level(self, file_name, folder_path, center=753, window=3):
        """Rate *file_name* against the chronologically previous CSV in
        *folder_path* by comparing the mean DN inside the
        [center - window, center + window] wavelength band.

        Returns:
            0 (best) to 3 (worst); 2 when no previous file exists.
        """
        all_files = [f for f in os.listdir(folder_path) if f.endswith('.csv')]
        # Empty folder, or the given file absent: no reference available.
        if not all_files or file_name not in all_files:
            return 2

        def parse_time_from_filename(filename):
            # File names look like "2024_05_01_12_30_00.csv".
            time_str = filename.rsplit('.', 1)[0]
            try:
                return datetime.strptime(time_str, '%Y_%m_%d_%H_%M_%S')
            except ValueError:
                # Non-conforming names sort last (far-future timestamp).
                return datetime.max

        sorted_files = sorted(all_files, key=parse_time_from_filename)
        index_of_given = sorted_files.index(file_name)
        if index_of_given == 0:
            return 2  # oldest file: nothing to compare against

        current_file = os.path.join(folder_path, file_name)
        _meta1, _wl1, spectra_data1 = parse_sif_csv(current_file)
        pre_file = os.path.join(folder_path, sorted_files[index_of_given - 1])
        _meta2, wavelengths2, spectra_data2 = parse_sif_csv(pre_file)

        lower_bound = center - window
        upper_bound = center + window
        indices = np.where((wavelengths2 >= lower_bound) & (wavelengths2 <= upper_bound))[0]

        pos = 1  # spectrum row used for the confidence check (original choice)
        a1 = spectra_data1[pos]['DN'][indices].mean()
        a2 = spectra_data2[pos]['DN'][indices].mean()
        quality_number = abs(a2 - a1) / a1 * 100  # percent change vs. previous file

        if quality_number <= 2:
            return 0
        elif quality_number <= 5:
            return 1
        elif quality_number <= 10:
            return 2
        else:
            return 3

    def compute_sif(self, input_csv_dn, rad_folder, sif_folder):
        """Run the external SIF retrieval script over *rad_folder* with three
        algorithms and return the three output CSV paths.

        Returns:
            (output_path_3fld, output_path_sfld, output_path_sfm)

        Raises:
            RuntimeError: on a platform that is neither Windows nor POSIX
                (the original code left the script path unbound there).
        """
        base_name = os.path.basename(input_csv_dn)
        name_part, ext = os.path.splitext(base_name)
        parts = name_part.split('_', 1)  # station prefix, remainder

        formatted_date = datetime.now().strftime("%Y_%m_%d")
        new_name = f"{formatted_date}_{parts[1]}{ext}"

        # External retrieval script and the standard SIF spectrum it needs.
        if os.name == "nt":        # Windows
            program_path = r"python D:\PycharmProjects\sif\sif_retrieval.py"
            standard_sif_path = r"C:\EasySIF\standard_sif.csv"
        elif os.name == "posix":   # Linux/macOS/Unix-like
            program_path = r"python3 /root/sif/feng/sif_retrieval.py"
            standard_sif_path = r"/root/sif/feng/standard_sif.csv"
        else:
            raise RuntimeError(f"unsupported platform: {os.name}")

        input_path = rad_folder
        file_name_tmp = parts[0] + "_" + new_name.split('.')[0]

        # One (suffix, parameter-string) pair per retrieval algorithm; the
        # parameter strings are passed verbatim to the external script.
        jobs = (
            ("_3fld.csv", r"[740,780],[756,759],[761,762] P1 3fld"),
            ("_sfld.csv", r"[740,780],[756,759] P1 sfld"),
            ("_sfm.csv", r" [759,770],760 P1 sfm"),
        )
        output_paths = []
        for suffix, params in jobs:
            output_path = os.path.join(sif_folder, file_name_tmp + suffix)
            # NOTE(review): command built as a shell string from configured
            # paths; consider subprocess.run with an argument list.
            command_str = (program_path + " " + standard_sif_path + " "
                           + input_path + " " + output_path + " " + params)
            # Fix: the original discarded the return codes of the first two
            # invocations; report every one.
            return_code = os.system(command_str)
            print(f"命令返回状态码: {return_code}")
            output_paths.append(output_path)

        return tuple(output_paths)

    def add_validity_column_to_file(self, file_path, quality_level):
        """Append a 'validity' column: the header goes on row 1 and
        *quality_level* on row 2, rewriting *file_path* via a temp file.

        Files with fewer than two rows are left untouched.
        """
        temp_file = tempfile.NamedTemporaryFile(mode='w', delete=False, newline='')
        try:
            with open(file_path, 'r', newline='') as csvfile, temp_file:
                rows = list(csv.reader(csvfile))
                if len(rows) >= 2:
                    rows[0].append('validity')
                    rows[1].append(quality_level)
                    csv.writer(temp_file).writerows(rows)
            if len(rows) < 2:
                # Fix: the original returned early here and leaked the
                # delete=False temp file on disk.
                os.unlink(temp_file.name)
                return
            # Replace the original file only after the temp file is closed.
            shutil.move(temp_file.name, file_path)
        except Exception:
            if os.path.exists(temp_file.name):
                os.unlink(temp_file.name)
            raise


# Press the green button in the gutter to run the script.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="监控文件夹的状态,当出现新的csv时,提取sif,并通过ftp发送。",
        prog='sif.')
    parser.add_argument('-i', '--input_ini', required=True, type=str,
                        help='输入ini配置文件路径。')
    parser.add_argument("-v", "--version", action='version', version='%(prog)s 1.0')
    args = parser.parse_args()

    ftp_config = load_config(args.input_ini)
    event_handler = CSVFileHandler(ftp_config)
    observer = Observer()
    observer.schedule(event_handler, ftp_config['monitor']['WATCH_DIR'], recursive=True)
    observer.start()
    print(f"正在监控目录:{ftp_config['monitor']['WATCH_DIR']}")
    try:
        # Idle loop: the observer thread does the work until Ctrl-C.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()