Files
ZDXX/2_1banben/app.py

260 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import json
import mimetypes
import logging
from datetime import datetime
import pytz # ✅ 必须引入:用于强制指定北京时间
from flask import Flask, send_from_directory, jsonify
from flask_cors import CORS
from flask_apscheduler import APScheduler
# ==============================================================================
# ✅ 1. 核心模块引用
# ==============================================================================
try:
from config import Config
from extensions import db
# 引入两个模型Device(存最新状态), DeviceHistory(存所有历史日志)
from models import Device, DeviceHistory
from services.core import execute_monitor_task
# from services.iot_api import sync_iot_data_service # 如不需要可注释
try:
from routes.api import api_bp as device_bp
from routes.api import calculate_offset
except ImportError:
from routes.api import device_bp, calculate_offset
except ImportError as e:
print(f"❌ 严重错误: 模块导入失败。详细信息: {e}")
sys.exit(1)
# ==============================================================================
# 2. 路径配置
# ==============================================================================
def get_paths():
if getattr(sys, 'frozen', False):
resource_base = sys._MEIPASS
data_base = os.path.dirname(sys.executable)
else:
base = os.path.abspath(os.path.dirname(__file__))
resource_base = base
data_base = base
return resource_base, data_base
RESOURCE_BASE, DATA_BASE = get_paths()
STATIC_FOLDER = os.path.join(RESOURCE_BASE, 'web_dist')
INSTANCE_PATH = os.path.join(DATA_BASE, 'instance')
mimetypes.add_type('application/javascript', '.js')
mimetypes.add_type('text/css', '.css')
# ==============================================================================
# 3. 核心定时任务逻辑 (已修正:历史记录改为“永久新增”模式)
# ==============================================================================
def auto_monitor_job(app):
"""
每天 17:00 触发的爬虫任务。
逻辑说明:
1. Device 表:始终更新为最新状态(覆盖旧值),保证首页看到的是最新的。
2. DeviceHistory 表:始终新增一条记录(追加模式),保留每一次爬取的历史痕迹。
"""
# ✅ 强制使用应用上下文,确保数据库连接有效
with app.app_context():
# 获取当前北京时间用于日志
tz = pytz.timezone('Asia/Shanghai')
now_str = datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')
print(f"\n{'=' * 50}")
print(f"⏰ [定时任务触发] 北京时间: {now_str}")
print(f"🚀 正在开始执行爬虫逻辑...")
if not execute_monitor_task:
print("❌ 错误: 未找到爬虫执行函数 (execute_monitor_task)")
return
try:
# 1. 执行爬取
task_result = execute_monitor_task()
if not task_result:
print("⚠️ [警告] 爬虫执行完毕,但返回空数据 (None)")
return
scraped_list = task_result.get('device_list', [])
print(f"📦 [数据获取] 爬虫返回了 {len(scraped_list)} 条设备数据")
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 统计计数
stats = {'new_device': 0, 'history_added': 0}
# 2. 遍历每一条爬取到的数据
for item in scraped_list:
d_name = item.get('name')
if not d_name: continue
# 获取关键数据字段
f_count = item.get('num_files', 0)
target_date = item.get('target_time') # 例如 "2026-02-04"
# =========================================================
# ✅ A. 处理 Device 表 (始终更新最新状态)
# =========================================================
device = Device.query.filter_by(name=d_name).first()
if not device:
print(f"🆕 发现新设备,正在注册: {d_name}")
device = Device(name=d_name, source=item.get('source'), install_site="")
db.session.add(device)
db.session.flush() # 立即获取 ID
stats['new_device'] += 1
# 无论之前状态如何,强制更新 Device 表的实时字段
# 这样前端首页卡片才能显示最新的 72 条
device.status = item.get('status')
device.current_value = item.get('value')
device.latest_time = target_date
device.check_time = current_time # 证明刚刚检查过
device.file_count = f_count
device.offset = calculate_offset(target_date)
# JSON 数据合并逻辑
old_json = {}
try:
if device.json_data:
old_json = json.loads(device.json_data)
except:
old_json = {}
new_json = item.get('raw_json', {})
if isinstance(new_json, dict):
old_json.update(new_json)
final_json_str = json.dumps(old_json, ensure_ascii=False)
device.json_data = final_json_str
# =========================================================
# ✅ B. 处理 DeviceHistory 表 (纯新增逻辑)
# =========================================================
# 这里不进行任何查询判断,直接将当次爬取的结果作为一条新历史写入。
# 哪怕 target_date 是一样的create_time 也会不同,从而记录数据变化过程。
history_entry = DeviceHistory(
device_id=device.id,
status=item.get('status'),
result_data=item.get('value'),
data_time=target_date, # 数据原本的日期 (如 2026-02-04)
json_data=final_json_str, # 当时的详细JSON
file_count=f_count, # 当时的文件数 (如 72)
create_time=datetime.now() # 记录这条日志的物理时间 (如 17:00:05)
)
db.session.add(history_entry)
stats['history_added'] += 1
# ✅ 3. 提交事务
print(f"💾 正在提交事务到数据库...")
db.session.commit()
print(f"✅ [入库完成] 统计结果:")
print(f" - 新增设备: {stats['new_device']}")
print(f" - 新增历史记录: {stats['history_added']} (所有爬取数据均已追加)")
print(f"{'=' * 50}\n")
except Exception as e:
db.session.rollback() # 出错回滚
print(f"❌ [严重异常] 数据写入失败: {e}")
import traceback
traceback.print_exc()
# ==============================================================================
# 4. Flask 应用工厂
# ==============================================================================
def create_app():
app = Flask(__name__, static_folder=STATIC_FOLDER, instance_path=INSTANCE_PATH)
CORS(app)
# 数据库路径配置
if not os.path.exists(app.instance_path):
os.makedirs(app.instance_path, exist_ok=True)
app.config.from_object(Config)
db_name = 'monitor_data.db'
db_path = os.path.join(app.instance_path, db_name)
if sys.platform.startswith('win'):
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{db_path}'
else:
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:////{db_path}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# ✅ APScheduler 配置
app.config['SCHEDULER_API_ENABLED'] = True
app.config['SCHEDULER_TIMEZONE'] = "Asia/Shanghai" # 全局时区设置
db.init_app(app)
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
# ✅ 添加定时任务
scheduler.add_job(
id='daily_monitor_task',
func=auto_monitor_job,
args=[app],
trigger='cron',
hour=17, # 每天 17:00
minute=0,
second=0,
misfire_grace_time=3600,
timezone=pytz.timezone('Asia/Shanghai')
)
print(f"📅 定时任务已锁定: 每天北京时间 17:00 执行")
app.register_blueprint(device_bp)
# 手动触发测试接口
@app.route('/api/force_run')
def force_run_task():
auto_monitor_job(app)
return jsonify({'code': 200, 'msg': '手动触发成功,历史记录已追加'})
# 前端路由
@app.route('/')
def serve_index():
return send_from_directory(app.static_folder, 'index.html')
@app.route('/<path:path>')
def serve_static(path):
file_path = os.path.join(app.static_folder, path)
if os.path.exists(file_path):
return send_from_directory(app.static_folder, path)
if path.startswith('api'):
return jsonify({'code': 404, 'message': 'Not Found'}), 404
return send_from_directory(app.static_folder, 'index.html')
with app.app_context():
db.create_all()
return app
if __name__ == '__main__':
app = create_app()
debug_mode = not getattr(sys, 'frozen', False)
print("🚀 服务启动中 (24小时常驻模式)...")
# use_reloader=False 避免定时任务重复执行
app.run(host='0.0.0.0', port=5000, debug=debug_mode, use_reloader=False)