1、错误控制:文件不存在、sif计算失败;

2、监控某一目录,并将新生成的csv文件上传到ftp服务器;
This commit is contained in:
tangchao0503
2025-10-11 15:11:48 +08:00
parent d15edc0f8b
commit dcd9c76ddd
2 changed files with 136 additions and 0 deletions

View File

@ -202,6 +202,8 @@ class CSVFileHandler(FileSystemEventHandler):
# 上传多个文件
success_count = 0
for file_path in file_paths:
if not os.path.exists(file_path):
continue
try:
filename = os.path.basename(file_path)
with open(file_path, "rb") as f:
@ -451,6 +453,7 @@ class CSVFileHandler(FileSystemEventHandler):
command_str_sfm = program_path + " " + standard_sif_path + " " + input_path + " " + output_path_sfm + " " + param_sfm
return_code = os.system(command_str_3fld)
print(command_str_3fld)
return_code = os.system(command_str_sfld)
return_code = os.system(command_str_sfm)
print(f"命令返回状态码: {return_code}")
@ -458,6 +461,9 @@ class CSVFileHandler(FileSystemEventHandler):
return output_path_3fld, output_path_sfld, output_path_sfm
def add_validity_column_to_file(self, file_path, quality_level):
if not os.path.exists(file_path):
return
# 创建临时文件
temp_file = tempfile.NamedTemporaryFile(mode='w', delete=False, newline='')

130
upload_file_to_ftp.py Normal file
View File

@ -0,0 +1,130 @@
import csv, tempfile, os, re
import struct
import time
import numpy as np
import argparse
import paramiko
import shutil
import configparser
from ftplib import FTP
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from datetime import datetime
import ssl
from ftplib import FTP_TLS, error_perm
def load_config(config_path='config.ini'):
config = configparser.ConfigParser()
config.read(config_path)
return config
class CSVFileHandler(FileSystemEventHandler):
def __init__(self, ftp_config):
super().__init__()
self.ftp_config = ftp_config
def on_created(self, event):
if event.is_directory:
return
if event.src_path.lower().endswith('.csv'):
file_path = os.path.abspath(event.src_path)
print(f"发现CSV文件: {file_path}----------------------------------------------------------------------------")
time.sleep(0.1) # 文件一出现就处理文件偶发permission deny所以等待100ms
self.send_via_ftps(file_path)
def send_via_ftps(self, file_path, max_retries=3, retry_delay=5):
retries = 0
ftps = None
while retries < max_retries:
try:
print("正在尝试连接 FTPS 服务器...")
# 建立 FTPS 连接
ftps = FTP_TLS()
# 忽略自签名证书(你的服务器证书是 self-signed
ftps.context = ssl._create_unverified_context()
ftps.connect(
host=self.ftp_config['FTP']['host'],
port=int(self.ftp_config['FTP'].get('port', 21)), # 默认 21, 你的可能是 65521
timeout=30
)
# 登录
ftps.login(
user=self.ftp_config['FTP']['user'],
passwd=self.ftp_config['FTP']['password']
)
print("FTPS 连接成功,准备上传文件...")
# 切换到安全数据通道
ftps.prot_p()
# 检查并切换到目标目录
remote_dir = self.ftp_config['FTP'].get('target_dir', '.')
try:
ftps.cwd(remote_dir)
except error_perm:
print(f"远程目录不存在,尝试创建: {remote_dir}")
ftps.mkd(remote_dir)
ftps.cwd(remote_dir)
# 上传多个文件
success_count = 0
if not os.path.exists(file_path):
continue
try:
filename = os.path.basename(file_path)
with open(file_path, "rb") as f:
ftps.storbinary(f"STOR {filename}", f)
print(f"✅ 文件上传成功: {filename}")
success_count += 1
except Exception as e:
print(f"❌ 文件上传失败 {file_path}: {e}")
return True
except Exception as e:
retries += 1
print(f"❌ FTPS 连接失败(尝试 {retries}/{max_retries}: {e}")
if retries < max_retries:
time.sleep(retry_delay)
finally:
if ftps:
try:
ftps.quit()
except Exception:
ftps.close()
print(f"❌ 上传失败(已达最大重试次数 {max_retries}")
return False
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="监控文件夹的状态当出现新的csv时提取sif并通过ftp发送。", prog='sif.')
parser.add_argument('-i', '--input_ini', required=True, type=str, help='输入ini配置文件路径。')
parser.add_argument("-v", "--version", action='version', version='%(prog)s 1.0')
# parser.add_argument('-v', '--verbose', action='store_true', help='启用详细模式')
args = parser.parse_args()
ftp_config = load_config(args.input_ini)
event_handler = CSVFileHandler(ftp_config)
observer = Observer()
observer.schedule(event_handler, ftp_config['monitor']['WATCH_DIR'], recursive=True)
observer.start()
print(f"正在监控目录:{ftp_config['monitor']['WATCH_DIR']}")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()