From d15edc0f8b89d5f0f4f8327d977428debdb048ed Mon Sep 17 00:00:00 2001 From: tangchao0503 <735056338@qq.com> Date: Mon, 29 Sep 2025 16:15:49 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AE=A1=E7=AE=97sif=E5=80=BC=EF=BC=8C?= =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E5=88=B0=E5=9C=B0=E8=B0=83=E9=99=A2=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E5=99=A8=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + config.ini | 10 +-- main.py | 194 +++++++++++++++++++++++++++++++++++++++++++++++------ 3 files changed, 179 insertions(+), 26 deletions(-) diff --git a/.gitignore b/.gitignore index 6116fde..b66c384 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ /.idea test_data tmp +result # Byte-compiled / optimized / DLL files diff --git a/config.ini b/config.ini index ecafd76..771a81f 100644 --- a/config.ini +++ b/config.ini @@ -1,9 +1,9 @@ [FTP] -host = 172.16.0.73 -port = 22 -user = ftpuser -password = 123 -target_dir = /home/ftpuser/ +host = 183.207.7.90 +port = 65521 +user = sif +password = groundiot@2025 +target_dir = /eco_monitoring/ground_iot/sif [monitor] WATCH_DIR = D:\PycharmProjects\sif_data_parse\test_data cal_dir = D:\PycharmProjects\sif_data_parse\test_data\cal \ No newline at end of file diff --git a/main.py b/main.py index b0d7ad8..5936f0b 100644 --- a/main.py +++ b/main.py @@ -12,6 +12,10 @@ from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler from datetime import datetime +import ssl +from ftplib import FTP_TLS, error_perm + + def parse_sif_csv(_file_path): _metadata = {} @@ -140,24 +144,89 @@ class CSVFileHandler(FileSystemEventHandler): return if event.src_path.lower().endswith('.csv'): file_path = os.path.abspath(event.src_path) - print(f"发现CSV文件: {file_path}") + print(f"发现CSV文件: {file_path}----------------------------------------------------------------------------") # 选择定标文件夹 cal_dir = self.ftp_config['monitor']['cal_dir'] - a=1 if "towersif20" in file_path: cal_dir = os.path.join(cal_dir,"20") elif "towersif21" in file_path: cal_dir = os.path.join(cal_dir,"21") time.sleep(0.1) # 文件一出现就处理文件,偶发permission deny,所以等待100ms - _ = self.process_csv(file_path, cal_dir) - # 为csv添加有效性字段 + rad_folder_tmp, sif_folder, quality_level = self.rad_conversion(file_path, cal_dir) + _ = self.compute_sif(file_path, rad_folder_tmp, sif_folder) for i in _: - self.add_validity_column_to_file(i) + self.add_validity_column_to_file(i, quality_level) - self.send_via_sftp(_) + self.send_via_ftps(_) + + def send_via_ftps(self, file_paths, max_retries=3, retry_delay=5): + retries = 0 + ftps = None + + while retries < max_retries: + try: + print("正在尝试连接 FTPS 服务器...") + + # 建立 FTPS 连接 + ftps = FTP_TLS() + # 忽略自签名证书(你的服务器证书是 self-signed) + ftps.context = ssl._create_unverified_context() + ftps.connect( + host=self.ftp_config['FTP']['host'], + port=int(self.ftp_config['FTP'].get('port', 21)), # 默认 21, 你的可能是 65521 + timeout=30 + ) + + # 登录 + ftps.login( + user=self.ftp_config['FTP']['user'], + passwd=self.ftp_config['FTP']['password'] + ) + print("FTPS 连接成功,准备上传文件...") + + # 切换到安全数据通道 + ftps.prot_p() + + # 检查并切换到目标目录 + remote_dir = self.ftp_config['FTP'].get('target_dir', '.') + try: + ftps.cwd(remote_dir) + except error_perm: + print(f"远程目录不存在,尝试创建: {remote_dir}") + ftps.mkd(remote_dir) + ftps.cwd(remote_dir) + + # 上传多个文件 + success_count = 0 + for file_path in file_paths: + try: + filename = os.path.basename(file_path) + with open(file_path, "rb") as f: + ftps.storbinary(f"STOR {filename}", f) + print(f"✅ 文件上传成功: {filename}") + success_count += 1 + except Exception as e: + print(f"❌ 文件上传失败 {file_path}: {e}") + + return success_count == len(file_paths) # 全部成功返回 True,否则 False + + except Exception as e: + retries += 1 + print(f"❌ FTPS 连接失败(尝试 {retries}/{max_retries}): {e}") + if retries < max_retries: + time.sleep(retry_delay) + finally: + if ftps: + try: + ftps.quit() + except Exception: + ftps.close() + + print(f"❌ 上传失败(已达最大重试次数 {max_retries})") + return False def send_via_sftp(self, file_paths, max_retries=3, retry_delay=5): retries = 0 @@ -228,7 +297,7 @@ class CSVFileHandler(FileSystemEventHandler): print(f"❌ 上传失败(已达最大重试次数 {max_retries})") return False - def process_csv(self, input_csv, input_cal): + def rad_conversion(self, input_csv, input_cal): # 提取文件夹路径 folder_path = os.path.dirname(input_csv) base_name = os.path.basename(input_csv) # 获取文件名(含扩展名) @@ -238,27 +307,44 @@ class CSVFileHandler(FileSystemEventHandler): formatted_date = today.strftime("%Y_%m_%d") new_name = f"{formatted_date}_{parts[1]}{ext}" # 组合新文件名 - # tmp_folder = os.path.join(os.path.dirname(ftp_config['monitor']['WATCH_DIR']), "tmp") - tmp_folder = os.path.join(Path(ftp_config['monitor']['WATCH_DIR']).parent, "tmp") - if not os.path.exists(tmp_folder): - os.makedirs(tmp_folder) - print(f"文件夹已创建: {tmp_folder}") + result_folder = os.path.join(Path(ftp_config['monitor']['WATCH_DIR']).parent, "result") + if not os.path.exists(result_folder): + os.makedirs(result_folder) + print(f"文件夹已创建: {result_folder}") else: - print(f"文件夹已存在: {tmp_folder}") + print(f"文件夹已存在: {result_folder}") - rad_folder = os.path.join(tmp_folder, "rad") - if os.path.exists(rad_folder): - shutil.rmtree(rad_folder) - os.makedirs(rad_folder) + def get_last_two_levels(path): + path_obj = Path(path) + parts = path_obj.parts + # 获取最后两层 + if len(parts) >= 2: + return Path(parts[-2]) / parts[-1] + else: + return path_obj # 如果路径少于两层,返回原路径 - sif_folder = os.path.join(tmp_folder, "sif") + # folder_name = os.path.basename(folder_path) + folder_name = get_last_two_levels(folder_path) + + rad_folder = os.path.join(result_folder, folder_name, "rad") + if not os.path.exists(rad_folder): + os.makedirs(rad_folder) + print(f"文件夹已创建: {rad_folder}") + else: + print(f"文件夹已存在: {rad_folder}") + + sif_folder = os.path.join(result_folder, folder_name, "sif") if not os.path.exists(sif_folder): os.makedirs(sif_folder) print(f"文件夹已创建: {sif_folder}") else: print(f"文件夹已存在: {sif_folder}") - rad_path = os.path.join(rad_folder, new_name) + rad_folder_tmp = os.path.join(result_folder, folder_name, "rad_tmp") + if os.path.exists(rad_folder_tmp): + shutil.rmtree(rad_folder_tmp) + os.makedirs(rad_folder_tmp) + rad_path = os.path.join(rad_folder_tmp, new_name) metadata, wavelengths, spectra_data = parse_sif_csv(input_csv) @@ -272,6 +358,72 @@ class CSVFileHandler(FileSystemEventHandler): spectra_data[i]['RAD'] = spectra_data[i]['DN'] * data_gain_adjust write_file(input_csv, rad_path, spectra_data) + shutil.copy(rad_path, rad_folder) + + # 计算有效性 + rad_file_name = os.path.basename(rad_path) + quality_level = self.compute_quality_level(rad_file_name, rad_folder) + + return rad_folder_tmp, sif_folder, quality_level + + def compute_quality_level(self, file_name, folder_path, center=753, window=3): + all_files = [f for f in os.listdir(folder_path) if f.endswith('.csv')] + + # 处理文件夹为空或只有给定文件的情况 + if not all_files or file_name not in all_files: + return 2 + + def parse_time_from_filename(filename): + # 去掉.csv后缀,然后用下划线分割 + time_str = filename.rsplit('.', 1)[0] + try: + # 将字符串转换为datetime对象 + return datetime.strptime(time_str, '%Y_%m_%d_%H_%M_%S') + except ValueError: + # 如果文件名不符合格式,返回一个遥远的未来时间,确保它被排到最后 + return datetime.max + + sorted_files = sorted(all_files, key=parse_time_from_filename) + + index_of_given = sorted_files.index(file_name) + + if index_of_given==0: + return 2 + + current_file = os.path.join(folder_path, file_name) + metadata1, wavelengths1, spectra_data1 = parse_sif_csv(current_file) + + pre_file = os.path.join(folder_path, sorted_files[index_of_given-1]) + metadata2, wavelengths2, spectra_data2 = parse_sif_csv(pre_file) + + lower_bound = center - window + upper_bound = center + window + + indices = np.where((wavelengths2 >= lower_bound) & (wavelengths2 <= upper_bound))[0] + + pos = 1 # 以csv文件中那个位置计算可信度 + a1 = spectra_data1[pos]['DN'][indices].mean() + a2 = spectra_data2[pos]['DN'][indices].mean() + + quality_number = abs(a2 - a1) / a1 * 100 + + if quality_number <= 2: + return 0 + elif quality_number <= 5: + return 1 + elif quality_number <= 10: + return 2 + else: + return 3 + + def compute_sif(self, input_csv_dn, rad_folder, sif_folder): + folder_path = os.path.dirname(input_csv_dn) + base_name = os.path.basename(input_csv_dn) # 获取文件名(含扩展名) + name_part, ext = os.path.splitext(base_name) # 拆分文件名和扩展名 + parts = name_part.split('_', 1) # 在第一个 _ 处分割 + today = datetime.now() + formatted_date = today.strftime("%Y_%m_%d") + new_name = f"{formatted_date}_{parts[1]}{ext}" # 组合新文件名 # 调用丰算法 if os.name == "nt": # Windows @@ -305,7 +457,7 @@ class CSVFileHandler(FileSystemEventHandler): return output_path_3fld, output_path_sfld, output_path_sfm - def add_validity_column_to_file(self, file_path): + def add_validity_column_to_file(self, file_path, quality_level): # 创建临时文件 temp_file = tempfile.NamedTemporaryFile(mode='w', delete=False, newline='') @@ -322,7 +474,7 @@ class CSVFileHandler(FileSystemEventHandler): # 添加validity列 rows[0].append('validity') - rows[1].append('1') + rows[1].append(quality_level) # 写入临时文件 writer.writerows(rows)