Files
sif_didiaoyuan/upload_file_to_ftp.py
tangchao0503 dcd9c76ddd 1、错误控制:文件不存在、sif计算失败;
2、监控某一目录,并将新生成的csv文件上传到ftp服务器;
2025-10-11 15:11:48 +08:00

131 lines
4.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv, tempfile, os, re
import struct
import time
import numpy as np
import argparse
import paramiko
import shutil
import configparser
from ftplib import FTP
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from datetime import datetime
import ssl
from ftplib import FTP_TLS, error_perm
def load_config(config_path='config.ini'):
    """Load an INI configuration file.

    Args:
        config_path: Path of the INI file to parse. Defaults to
            ``config.ini`` relative to the current working directory.

    Returns:
        A ``configparser.ConfigParser`` populated from the file.

    Raises:
        OSError: If the file cannot be opened (``FileNotFoundError`` when it
            does not exist, ``PermissionError`` when it is not readable).
        configparser.Error: If the file content is not valid INI syntax.
    """
    config = configparser.ConfigParser()
    # ConfigParser.read() silently skips files it cannot open, which would
    # hand back an empty config and defer the failure to the first key
    # lookup. Opening the file explicitly surfaces the real error right away.
    with open(config_path) as config_file:
        config.read_file(config_file)
    return config
class CSVFileHandler(FileSystemEventHandler):
    """File-system event handler that uploads newly created CSV files via FTPS.

    The handler reads its connection settings from the ``FTP`` section of the
    configuration object it is given: ``host``, ``user`` and ``password`` are
    looked up directly, while ``port`` (fallback 21) and ``target_dir``
    (fallback ``.``) are optional.
    """

    def __init__(self, ftp_config):
        """Store the configuration used for every upload.

        Args:
            ftp_config: Mapping-like configuration object that provides an
                ``FTP`` section (for example the result of ``load_config``).
        """
        super().__init__()
        self.ftp_config = ftp_config

    def on_created(self, event):
        """Upload the created file if it is a ``.csv`` file.

        Directory events and files with any other extension are ignored.
        The extension check is case-insensitive.

        Args:
            event: The creation event; ``is_directory`` and ``src_path`` are
                read from it.
        """
        if event.is_directory:
            return
        if event.src_path.lower().endswith('.csv'):
            file_path = os.path.abspath(event.src_path)
            print(f"发现CSV文件: {file_path}----------------------------------------------------------------------------")
            # Handling the file the instant it appears occasionally fails
            # with "permission denied", so wait 100 ms before touching it.
            time.sleep(0.1)
            self.send_via_ftps(file_path)

    def send_via_ftps(self, file_path, max_retries=3, retry_delay=5):
        """Upload one file to the configured FTPS server, retrying on failure.

        Every attempt opens a fresh session, stores the file under its base
        name in the target directory and closes the session again. Both
        connection errors and upload errors consume one attempt.

        Args:
            file_path: Local path of the file to upload.
            max_retries: Maximum number of attempts before giving up.
            retry_delay: Seconds to sleep between two attempts.

        Returns:
            True if the file was stored on the server, False if the file does
            not exist locally or every attempt failed.
        """
        attempt = 0
        while attempt < max_retries:
            # A file that is gone can never be uploaded. Give up immediately
            # rather than reconnecting to the server over and over.
            if not os.path.exists(file_path):
                print(f"❌ 文件不存在,放弃上传: {file_path}")
                return False

            attempt += 1
            try:
                print("正在尝试连接 FTPS 服务器...")
                ftps = self._open_session()
            except Exception as e:
                print(f"❌ FTPS 连接失败(尝试 {attempt}/{max_retries}: {e}")
            else:
                try:
                    filename = os.path.basename(file_path)
                    with open(file_path, "rb") as f:
                        ftps.storbinary(f"STOR {filename}", f)
                    print(f"✅ 文件上传成功: {filename}")
                    return True
                except Exception as e:
                    # Report the failure and fall through to the next
                    # attempt; a failed upload must not be reported as
                    # success.
                    print(f"❌ 文件上传失败 {file_path}: {e}")
                finally:
                    self._close_session(ftps)

            if attempt < max_retries:
                time.sleep(retry_delay)

        print(f"❌ 上传失败(已达最大重试次数 {max_retries}")
        return False

    def _open_session(self):
        """Connect, log in, protect the data channel and enter the target directory.

        Returns:
            A logged-in ``FTP_TLS`` object whose working directory is the
            configured ``target_dir``.

        Raises:
            Exception: Any error raised while reading the configuration or
                talking to the server; the half-open session is closed before
                the error is re-raised.
        """
        ftps = FTP_TLS()
        try:
            ftp_section = self.ftp_config['FTP']
            # NOTE(security): certificate verification is disabled on purpose
            # because the server presents a self-signed certificate. This
            # relies on a private ssl helper and gives no protection against
            # man-in-the-middle attacks.
            ftps.context = ssl._create_unverified_context()
            ftps.connect(
                host=ftp_section['host'],
                port=int(ftp_section.get('port', 21)),  # 21 unless configured otherwise
                timeout=30
            )
            ftps.login(
                user=ftp_section['user'],
                passwd=ftp_section['password']
            )
            print("FTPS 连接成功,准备上传文件...")
            # Switch the data connection to the protected (encrypted) mode.
            ftps.prot_p()
            remote_dir = ftp_section.get('target_dir', '.')
            try:
                ftps.cwd(remote_dir)
            except error_perm:
                # The server refused the directory change; try creating the
                # directory and entering it again.
                print(f"远程目录不存在,尝试创建: {remote_dir}")
                ftps.mkd(remote_dir)
                ftps.cwd(remote_dir)
        except Exception:
            self._close_session(ftps)
            raise
        return ftps

    @staticmethod
    def _close_session(ftps):
        """Say goodbye to the server, falling back to a hard close on error."""
        try:
            ftps.quit()
        except Exception:
            ftps.close()
# Script entry point: parse the command line, load the INI configuration,
# then watch the configured directory (recursively) and hand every newly
# created CSV file to CSVFileHandler until the user presses Ctrl+C.
if __name__ == '__main__':
    cli = argparse.ArgumentParser(
        prog='sif.',
        description="监控文件夹的状态当出现新的csv时提取sif并通过ftp发送。",
    )
    cli.add_argument('-i', '--input_ini', type=str, required=True, help='输入ini配置文件路径。')
    cli.add_argument("-v", "--version", action='version', version='%(prog)s 1.0')
    cli_args = cli.parse_args()

    settings = load_config(cli_args.input_ini)
    handler = CSVFileHandler(settings)

    watcher = Observer()
    watch_dir = settings['monitor']['WATCH_DIR']
    watcher.schedule(handler, watch_dir, recursive=True)
    watcher.start()
    print(f"正在监控目录:{watch_dir}")

    # Keep the main thread alive; the observer does its work in the background.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        watcher.stop()
    # Wait for the observer thread to finish before exiting.
    watcher.join()