"""Application entry point: Flask app factory plus the scheduled monitor job.

The module wires together the Flask application, the SQLAlchemy session,
the APScheduler cron job that runs the scraper once a day, and the routes
that serve the pre-built front-end bundle.
"""
import json
import logging
import mimetypes
import os
import sys
import traceback
from datetime import datetime

import pytz
from flask import Flask, send_from_directory, jsonify
from flask_cors import CORS
from flask_apscheduler import APScheduler

# ==============================================================================
# 1. Core module imports
# ==============================================================================
try:
    from config import Config
    from extensions import db
    from models import Device, DeviceHistory
    # Core scraper entry point.
    from services.core import execute_monitor_task

    try:
        from services.iot_api import sync_iot_data_service
    except ImportError:
        sync_iot_data_service = None

    try:
        from routes.api import api_bp as device_bp
        from routes.api import calculate_offset
    except ImportError:
        # Fallback so that a missing calculate_offset does not crash startup.
        def calculate_offset(target_time):
            return 0

        from routes.api import device_bp
except ImportError as e:
    print(f"❌ [启动错误] 模块导入失败: {e}")
    sys.exit(1)

# ==============================================================================
# 2. Static path resolution
# ==============================================================================
RESOURCE_BASE = Config.BASE_DIR
INSTANCE_PATH = Config.INSTANCE_DIR

# Single timezone object shared by the job banner, the stored check time
# and the scheduler trigger.
SHANGHAI_TZ = pytz.timezone('Asia/Shanghai')


def find_static_folder(base_path):
    """Locate the ``web_dist`` directory that contains ``index.html``.

    Candidates are probed in order; the first one holding an ``index.html``
    wins. In a frozen (PyInstaller) build the ``sys._MEIPASS`` bundle dir
    and ``<base_path>/_internal`` are probed first, then
    ``<base_path>/web_dist`` and finally ``web_dist`` next to the parent of
    ``base_path``.

    Args:
        base_path: Directory the search starts from.

    Returns:
        The first matching directory, or ``<base_path>/web_dist`` when none
        of the candidates contains ``index.html``.
    """
    default_path = os.path.join(base_path, 'web_dist')

    candidates = []
    if getattr(sys, 'frozen', False):
        if hasattr(sys, '_MEIPASS'):
            candidates.append(os.path.join(sys._MEIPASS, 'web_dist'))
        candidates.append(os.path.join(base_path, '_internal', 'web_dist'))
    candidates.append(default_path)
    candidates.append(os.path.join(os.path.dirname(base_path), 'web_dist'))

    for candidate in candidates:
        if os.path.exists(os.path.join(candidate, 'index.html')):
            return candidate
    return default_path


STATIC_FOLDER = find_static_folder(RESOURCE_BASE)

mimetypes.add_type('application/javascript', '.js')
mimetypes.add_type('text/css', '.css')


# ==============================================================================
# 3. Scheduled monitor job
# ==============================================================================
def _merge_json_payload(existing_text, incoming):
    """Merge scraped JSON fields into the stored JSON text.

    Args:
        existing_text: JSON text currently stored on the device (may be
            empty, ``None`` or malformed).
        incoming: Value scraped for this run; merged only when it is a dict.

    Returns:
        The merged payload serialised with ``ensure_ascii=False``.
    """
    try:
        stored = json.loads(existing_text) if existing_text else {}
    except (TypeError, ValueError):
        # Malformed or non-text stored payload: start from an empty object.
        stored = {}

    if isinstance(incoming, dict):
        if not isinstance(stored, dict):
            # Keys cannot be merged into a stored list/scalar; calling
            # .update() on it used to abort the whole batch.
            stored = {}
        stored.update(incoming)
    return json.dumps(stored, ensure_ascii=False)


def auto_monitor_job(app):
    """Run the scraper once and persist its results.

    For every scraped item with a name, the matching ``Device`` row is
    created if missing and updated, and one ``DeviceHistory`` row is
    appended. Everything is committed in a single transaction; on any
    exception the transaction is rolled back and the traceback is printed.

    Args:
        app: The Flask application; its app context is pushed so the job
            can use ``db.session`` from a scheduler thread.

    Returns:
        None. Progress and errors are reported on stdout only.
    """
    with app.app_context():
        # A. Drop any session left over from a previous run.
        db.session.remove()

        now_str = datetime.now(SHANGHAI_TZ).strftime('%Y-%m-%d %H:%M:%S')
        print(f"\n{'=' * 50}")
        print(f"⏰ [定时任务启动] {now_str}")

        if not execute_monitor_task:
            print("❌ 错误: execute_monitor_task 未定义")
            return

        try:
            # B. Run the scraper.
            task_result = execute_monitor_task()
            if not task_result:
                print("⚠️ [警告] 爬虫执行完毕,但返回空数据")
                return

            # `or []` also covers an explicit None under 'device_list'.
            scraped_list = task_result.get('device_list') or []
            print(f"📦 [数据获取] 爬取到 {len(scraped_list)} 条设备数据")

            # Use the same timezone as the banner and the scheduler so the
            # stored check time does not depend on the host's local zone.
            current_time = datetime.now(SHANGHAI_TZ).strftime("%Y-%m-%d %H:%M:%S")
            stats = {'updated': 0, 'history': 0}

            for item in scraped_list:
                d_name = item.get('name')
                if not d_name:
                    continue

                # --- 1. Unpack the item, applying defaults ---
                raw_status = item.get('status', '未知')
                raw_value = item.get('value', '')
                f_count = item.get('num_files', 0)
                # A data time is mandatory; fall back to the check time.
                target_date = item.get('target_time') or current_time
                raw_json = item.get('raw_json', {})

                # --- 2. Upsert the device row ---
                device = Device.query.filter_by(name=d_name).first()
                if device is None:
                    device = Device(
                        name=d_name,
                        source=item.get('source', '自动爬虫'),
                        install_site="",
                    )
                    db.session.add(device)
                    # Flush so device.id is available for the history row.
                    db.session.flush()

                device.status = raw_status
                device.current_value = raw_value
                device.latest_time = target_date
                device.check_time = current_time
                device.file_count = f_count

                try:
                    device.offset = calculate_offset(target_date)
                except Exception:
                    # Best effort: an unparseable time must not abort the batch.
                    device.offset = 0

                device.json_data = _merge_json_payload(device.json_data, raw_json)

                # Kept from the original implementation. The instance was
                # either queried or added above, so this is presumably a
                # no-op -- NOTE(review): confirm before removing.
                db.session.merge(device)
                stats['updated'] += 1

                # --- 3. Append the history record ---
                history = DeviceHistory(
                    device_id=device.id,
                    status=raw_status,
                    result_data=raw_value,
                    data_time=target_date,
                    file_count=f_count,
                    json_data=device.json_data,
                )
                db.session.add(history)
                stats['history'] += 1

            # C. Commit everything in one transaction.
            db.session.commit()
            print(f"✅ [入库成功] 设备更新: {stats['updated']} | 历史追加: {stats['history']}")

        except Exception as e:
            db.session.rollback()
            print(f"❌ [严重异常] 数据写入失败: {e}")
            # Print the stack trace to help diagnosis.
            traceback.print_exc()
        finally:
            # D. Release the session so state does not leak into the next run.
            db.session.remove()
            print(f"{'=' * 50}\n")


# ==============================================================================
# 4. Flask application factory
# ==============================================================================
def create_app():
    """Build and configure the Flask application.

    Initialises the database, starts the scheduler with the daily 17:00
    (Asia/Shanghai) monitor job, registers the API blueprint and the routes
    that serve the front-end bundle, and creates the database tables.

    Returns:
        The configured Flask application.
    """
    print(f"🔍 [前端路径锁定] {STATIC_FOLDER}")

    app = Flask(__name__, static_folder=STATIC_FOLDER, instance_path=INSTANCE_PATH)
    CORS(app)

    os.makedirs(app.instance_path, exist_ok=True)

    app.config.from_object(Config)

    # Database.
    db.init_app(app)

    # Scheduler.
    scheduler = APScheduler()
    scheduler.init_app(app)
    scheduler.start()

    # The app is passed as an argument so the job can push an app context.
    scheduler.add_job(
        id='daily_monitor_task',
        func=auto_monitor_job,
        args=[app],
        trigger='cron',
        hour=17,
        minute=0,
        second=0,
        misfire_grace_time=3600,
        timezone=SHANGHAI_TZ,
    )
    print("📅 定时任务已锁定: 每天北京时间 17:00 执行")

    app.register_blueprint(device_bp)

    @app.route('/api/force_run')
    def force_run_task():
        """Manually trigger the same job the scheduler runs.

        NOTE(review): this runs on a plain GET with no authentication and
        reports success even when the job logged an error -- confirm that
        is intended.
        """
        auto_monitor_job(app)
        return jsonify({'code': 200, 'msg': '手动触发成功,请查看服务器日志'})

    @app.route('/')
    def serve_index():
        """Serve the front-end entry page."""
        try:
            return send_from_directory(app.static_folder, 'index.html')
        except Exception:
            return "Frontend Error", 404

    # The rule must capture the remainder of the URL. Registered on '/',
    # this view was shadowed by serve_index and never received `path`.
    @app.route('/<path:path>')
    def serve_static(path):
        """Serve a static asset, falling back to index.html for SPA routes."""
        if path.startswith('api'):
            return jsonify({'code': 404, 'message': 'API endpoint not found'}), 404

        file_path = os.path.join(app.static_folder, path)
        # isfile rather than exists: a directory should fall through to the
        # SPA entry page instead of producing a 404.
        if os.path.isfile(file_path):
            return send_from_directory(app.static_folder, path)
        return send_from_directory(app.static_folder, 'index.html')

    with app.app_context():
        db.create_all()

    return app


if __name__ == '__main__':
    app = create_app()
    debug_mode = not getattr(sys, 'frozen', False)
    print(f"🚀 服务启动中... 数据库: {app.config['SQLALCHEMY_DATABASE_URI']}")
    # use_reloader=False keeps the scheduler from starting twice in debug mode.
    # NOTE(review): debug mode together with host 0.0.0.0 exposes the
    # interactive debugger on every interface -- confirm this is only used
    # on trusted networks.
    app.run(host='0.0.0.0', port=5000, debug=debug_mode, use_reloader=False)