Files
ZDXX/2_1banben/app.py
2026-02-03 17:40:36 +08:00

241 lines
8.4 KiB
Python

import os
import sys
import json
import mimetypes
import logging
from datetime import datetime
import pytz # ✅ 必须引入:用于强制指定北京时间
from flask import Flask, send_from_directory, jsonify
from flask_cors import CORS
from flask_apscheduler import APScheduler
# ==============================================================================
# ✅ 1. 核心模块引用
# ==============================================================================
try:
from config import Config
from extensions import db
from models import Device, DeviceHistory
from services.core import execute_monitor_task
# from services.iot_api import sync_iot_data_service # 如果不需要IoT可以注释
try:
from routes.api import api_bp as device_bp
from routes.api import calculate_offset
except ImportError:
from routes.api import device_bp, calculate_offset
except ImportError as e:
print(f"❌ 严重错误: 模块导入失败。详细信息: {e}")
sys.exit(1)
# ==============================================================================
# 2. 路径配置
# ==============================================================================
def get_paths():
if getattr(sys, 'frozen', False):
resource_base = sys._MEIPASS
data_base = os.path.dirname(sys.executable)
else:
base = os.path.abspath(os.path.dirname(__file__))
resource_base = base
data_base = base
return resource_base, data_base
RESOURCE_BASE, DATA_BASE = get_paths()
STATIC_FOLDER = os.path.join(RESOURCE_BASE, 'web_dist')
INSTANCE_PATH = os.path.join(DATA_BASE, 'instance')
mimetypes.add_type('application/javascript', '.js')
mimetypes.add_type('text/css', '.css')
# ==============================================================================
# 3. 核心定时任务逻辑 (加强版)
# ==============================================================================
def auto_monitor_job(app):
"""
每天 17:00 触发的爬虫任务
"""
# ✅ 强制使用应用上下文,确保数据库连接有效
with app.app_context():
# 获取当前北京时间用于日志
tz = pytz.timezone('Asia/Shanghai')
now_str = datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')
print(f"\n{'=' * 50}")
print(f"⏰ [定时任务触发] 北京时间: {now_str}")
print(f"🚀 正在开始执行爬虫逻辑...")
if not execute_monitor_task:
print("❌ 错误: 未找到爬虫执行函数 (execute_monitor_task)")
return
try:
# 1. 执行爬取
task_result = execute_monitor_task()
if not task_result:
print("⚠️ [警告] 爬虫执行完毕,但返回空数据 (None)")
return
scraped_list = task_result.get('device_list', [])
print(f"📦 [数据获取] 爬虫返回了 {len(scraped_list)} 条设备数据")
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
success_count = 0
# 2. 遍历入库
for item in scraped_list:
d_name = item.get('name')
if not d_name: continue
# 查找或新建设备
device = Device.query.filter_by(name=d_name).first()
if not device:
print(f"🆕 发现新设备: {d_name},正在创建...")
device = Device(name=d_name, source=item.get('source'), install_site="")
db.session.add(device)
db.session.flush() # 立即获取 ID
# 更新设备状态表
device.status = item.get('status')
device.current_value = item.get('value')
device.latest_time = item.get('target_time')
device.check_time = current_time # 更新检查时间证明爬过了
f_count = item.get('num_files', 0)
device.file_count = f_count
# 计算 offset
device.offset = calculate_offset(item.get('target_time'))
# JSON 字段合并逻辑
old_json = {}
try:
if device.json_data:
old_json = json.loads(device.json_data)
except:
old_json = {}
new_json = item.get('raw_json', {})
if isinstance(new_json, dict):
old_json.update(new_json)
device.json_data = json.dumps(old_json, ensure_ascii=False)
# ✅ 3. 写入历史记录 (这是数据留存的关键)
history_entry = DeviceHistory(
device_id=device.id,
status=device.status,
result_data=device.current_value,
data_time=item.get('target_time'), # 文件的时间
json_data=device.json_data,
file_count=f_count,
create_time=datetime.now() # 记录入库时的系统时间
)
db.session.add(history_entry)
success_count += 1
# ✅ 4. 显式提交事务
print(f"💾 正在提交事务到数据库...")
db.session.commit()
print(f"✅ [成功] 已更新 {success_count} 台设备,并写入历史记录。")
print(f"{'=' * 50}\n")
except Exception as e:
db.session.rollback() # 出错回滚
print(f"❌ [严重异常] 定时任务执行失败: {e}")
import traceback
traceback.print_exc()
# ==============================================================================
# 4. Flask 应用工厂
# ==============================================================================
def create_app():
app = Flask(__name__, static_folder=STATIC_FOLDER, instance_path=INSTANCE_PATH)
CORS(app)
# 数据库路径配置
if not os.path.exists(app.instance_path):
os.makedirs(app.instance_path, exist_ok=True)
app.config.from_object(Config)
db_name = 'monitor_data.db'
db_path = os.path.join(app.instance_path, db_name)
if sys.platform.startswith('win'):
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{db_path}'
else:
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:////{db_path}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# ✅ APScheduler 配置
app.config['SCHEDULER_API_ENABLED'] = True
app.config['SCHEDULER_TIMEZONE'] = "Asia/Shanghai" # 全局时区设置
db.init_app(app)
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
# ✅ 添加定时任务 (针对常开机环境的最稳配置)
scheduler.add_job(
id='daily_monitor_task',
func=auto_monitor_job,
args=[app],
trigger='cron',
hour=17, # 每天 17 点
minute=0,
second=0,
misfire_grace_time=3600, # 允许延迟1小时执行
timezone=pytz.timezone('Asia/Shanghai') # 再次强制指定时区
)
# 打印一下确认任务已添加
print(f"📅 定时任务已锁定: 每天北京时间 17:00 执行")
app.register_blueprint(device_bp)
# 手动触发测试接口 (保留以备不时之需)
@app.route('/api/force_run')
def force_run_task():
auto_monitor_job(app)
return jsonify({'code': 200, 'msg': '手动触发成功'})
# 前端路由
@app.route('/')
def serve_index():
return send_from_directory(app.static_folder, 'index.html')
@app.route('/<path:path>')
def serve_static(path):
file_path = os.path.join(app.static_folder, path)
if os.path.exists(file_path):
return send_from_directory(app.static_folder, path)
if path.startswith('api'):
return jsonify({'code': 404, 'message': 'Not Found'}), 404
return send_from_directory(app.static_folder, 'index.html')
with app.app_context():
db.create_all()
return app
if __name__ == '__main__':
app = create_app()
debug_mode = not getattr(sys, 'frozen', False)
print("🚀 服务启动中 (24小时常驻模式)...")
# ✅ 关键设置: use_reloader=False
# 防止 Flask 的热重载功能启动两个进程,导致定时任务跑两遍或者被意外杀掉
app.run(host='0.0.0.0', port=5000, debug=debug_mode, use_reloader=False)