Files
ZDXX/2_1banben/app.py

283 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import json
import mimetypes
import logging
from datetime import datetime
import pytz
from flask import Flask, send_from_directory, jsonify
from flask_cors import CORS
from flask_apscheduler import APScheduler
# ==============================================================================
# ✅ 1. 核心模块引用
# ==============================================================================
try:
from config import Config
from extensions import db
from models import Device, DeviceHistory
# 引入核心爬虫调度
from services.core import execute_monitor_task
try:
from services.iot_api import sync_iot_data_service
except ImportError:
sync_iot_data_service = None
try:
from routes.api import api_bp as device_bp
from routes.api import calculate_offset
except ImportError:
# 兜底逻辑,防止缺失 calculate_offset 导致崩溃
def calculate_offset(target_time):
return 0
from routes.api import device_bp
except ImportError as e:
print(f"❌ [启动错误] 模块导入失败: {e}")
sys.exit(1)
# ==============================================================================
# 2. 智能路径配置
# ==============================================================================
RESOURCE_BASE = Config.BASE_DIR
INSTANCE_PATH = Config.INSTANCE_DIR
def find_static_folder(base_path):
"""
全能路径搜寻逻辑,适配 PyInstaller 打包环境
"""
if getattr(sys, 'frozen', False):
if hasattr(sys, '_MEIPASS'):
mei_path = os.path.join(sys._MEIPASS, 'web_dist')
if os.path.exists(os.path.join(mei_path, 'index.html')):
return mei_path
internal_path = os.path.join(base_path, '_internal', 'web_dist')
if os.path.exists(os.path.join(internal_path, 'index.html')):
return internal_path
path = os.path.join(base_path, 'web_dist')
if os.path.exists(os.path.join(path, 'index.html')):
return path
parent_path = os.path.join(os.path.dirname(base_path), 'web_dist')
if os.path.exists(os.path.join(parent_path, 'index.html')):
return parent_path
return path
STATIC_FOLDER = find_static_folder(RESOURCE_BASE)
mimetypes.add_type('application/javascript', '.js')
mimetypes.add_type('text/css', '.css')
# ==============================================================================
# 3. 核心定时任务逻辑 (深度优化版)
# ==============================================================================
def auto_monitor_job(app):
"""
[关键修复]
1. 使用 app.app_context() 确保线程中有 Flask 上下文
2. 使用 db.session.remove() 强制清理旧连接
3. 使用 db.session.merge() 确保对象状态被正确追踪
4. 增加详细日志,对比爬虫返回的数据与入库行为
"""
with app.app_context():
# A. 强制清理会话,确保线程获取的是全新的数据库连接
db.session.remove()
tz = pytz.timezone('Asia/Shanghai')
now_str = datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')
print(f"\n{'=' * 50}")
print(f"⏰ [定时任务启动] {now_str}")
if not execute_monitor_task:
print("❌ 错误: execute_monitor_task 未定义")
return
try:
# B. 执行爬虫
task_result = execute_monitor_task()
if not task_result:
print("⚠️ [警告] 爬虫执行完毕,但返回空数据")
return
scraped_list = task_result.get('device_list', [])
print(f"📦 [数据获取] 爬取到 {len(scraped_list)} 条设备数据")
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
stats = {'updated': 0, 'history': 0}
for item in scraped_list:
d_name = item.get('name')
if not d_name: continue
# --- 1. 数据解包与默认值处理 ---
# 显式提取,防止 None 覆盖数据库现有的值(如果业务需要)
# 这里假设爬虫返回 None 就是要写入 None或者空字符串
raw_status = item.get('status', '未知')
raw_value = item.get('value', '')
f_count = item.get('num_files', 0)
# 时间处理:必须有时间,否则用当前时间
target_date = item.get('target_time')
if not target_date:
target_date = current_time
raw_json = item.get('raw_json', {})
# [调试日志] 仅打印第一条或特定的设备,防止刷屏,但能帮你确认数据是否为空
# if '0025' in d_name:
# print(f" >>> [写入前检查] {d_name}: Value='{raw_value}' | Files={f_count}")
# --- 2. 数据库操作 (使用 Merge 机制) ---
# 先尝试查询
device = Device.query.filter_by(name=d_name).first()
if not device:
# 如果不存在,新建对象
device = Device(name=d_name, source=item.get('source', '自动爬虫'), install_site="")
db.session.add(device)
db.session.flush() # 立即获取 ID
# 更新字段
device.status = raw_status
device.current_value = raw_value
device.latest_time = target_date
device.check_time = current_time
device.file_count = f_count
# 计算 Offset
try:
device.offset = calculate_offset(target_date)
except:
device.offset = 0
# JSON 数据合并
old_json = {}
try:
if device.json_data:
old_json = json.loads(device.json_data)
except:
old_json = {}
if isinstance(raw_json, dict):
old_json.update(raw_json)
device.json_data = json.dumps(old_json, ensure_ascii=False)
# [核心修复] 使用 merge 告诉 Session "这个对象归你管,请更新它"
# 这能解决后台线程中 "DetachedInstanceError" 或更新丢失的问题
db.session.merge(device)
stats['updated'] += 1
# --- 3. 写入历史记录 ---
history = DeviceHistory(
device_id=device.id,
status=raw_status,
result_data=raw_value,
data_time=target_date,
file_count=f_count,
json_data=device.json_data
)
db.session.add(history)
stats['history'] += 1
# C. 提交事务
db.session.commit()
print(f"✅ [入库成功] 设备更新: {stats['updated']} | 历史追加: {stats['history']}")
except Exception as e:
db.session.rollback()
print(f"❌ [严重异常] 数据写入失败: {e}")
# 打印堆栈以便排查
import traceback
traceback.print_exc()
finally:
# D. 再次清理 Session防止内存泄漏或污染下一次任务
db.session.remove()
print(f"{'=' * 50}\n")
# ==============================================================================
# 4. Flask 应用工厂
# ==============================================================================
def create_app():
print(f"🔍 [前端路径锁定] {STATIC_FOLDER}")
app = Flask(__name__, static_folder=STATIC_FOLDER, instance_path=INSTANCE_PATH)
CORS(app)
if not os.path.exists(app.instance_path):
os.makedirs(app.instance_path, exist_ok=True)
app.config.from_object(Config)
# 初始化 DB
db.init_app(app)
# 初始化调度器
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
# --- 添加定时任务 ---
# 注意:这里我们传递 [app] 作为参数,确保 job 函数内能获取到 app 上下文
scheduler.add_job(
id='daily_monitor_task',
func=auto_monitor_job,
args=[app],
trigger='cron',
hour=17,
minute=00,
second=00,
misfire_grace_time=3600,
timezone=pytz.timezone('Asia/Shanghai')
)
print(f"📅 定时任务已锁定: 每天北京时间 17:00 执行")
app.register_blueprint(device_bp)
@app.route('/api/force_run')
def force_run_task():
"""手动触发接口:复用同一个 auto_monitor_job 函数,确保逻辑一致"""
auto_monitor_job(app)
return jsonify({'code': 200, 'msg': '手动触发成功,请查看服务器日志'})
@app.route('/')
def serve_index():
try:
return send_from_directory(app.static_folder, 'index.html')
except Exception:
return "Frontend Error", 404
@app.route('/<path:path>')
def serve_static(path):
if path.startswith('api'):
return jsonify({'code': 404, 'message': 'API endpoint not found'}), 404
file_path = os.path.join(app.static_folder, path)
if os.path.exists(file_path):
return send_from_directory(app.static_folder, path)
return send_from_directory(app.static_folder, 'index.html')
with app.app_context():
db.create_all()
return app
if __name__ == '__main__':
app = create_app()
debug_mode = not getattr(sys, 'frozen', False)
print(f"🚀 服务启动中... 数据库: {app.config['SQLALCHEMY_DATABASE_URI']}")
# 注意use_reloader=False 防止调度器在 Debug 模式下运行两次
app.run(host='0.0.0.0', port=5000, debug=debug_mode, use_reloader=False)