import os import sys import json from datetime import datetime from flask import Flask from flask_apscheduler import APScheduler # ✅ 新增:定时任务控制器 from extensions import db, cors from models import Device, DeviceHistory, MaintenanceLog from routes.api import api_bp # 尝试导入爬虫核心逻辑,以便定时任务调用 try: from services.core import execute_monitor_task except ImportError: execute_monitor_task = None # 解决 Windows 下控制台输出乱码问题 sys.stdout.reconfigure(encoding='utf-8') # 初始化调度器 scheduler = APScheduler() def auto_monitor_job(app): """定时任务执行逻辑""" with app.app_context(): print(f"⏰ [定时任务] 启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") if not execute_monitor_task: print("❌ 错误: 找不到 services.core.execute_monitor_task 模块") return try: # 1. 执行爬取任务 task_result = execute_monitor_task() if not task_result: print("⚠️ [定时任务] 爬虫未返回数据,跳过更新") return scraped_list = task_result.get('device_list', []) current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 2. 这里的逻辑需要和 api.py 保持高度一致(复用更新逻辑) # 为了防止代码重复,建议以后将此逻辑封装在 services 里的一个独立函数中 from routes.api import calculate_offset count = 0 for item in scraped_list: d_name = item.get('name') if not d_name: continue device = Device.query.filter_by(name=d_name).first() if not device: device = Device(name=d_name, source=item.get('source'), install_site="") db.session.add(device) db.session.flush() # 更新动态字段 device.status = item.get('status') device.current_value = item.get('value') device.latest_time = item.get('target_time') device.check_time = current_time device.json_data = json.dumps(item.get('raw_json', {}), ensure_ascii=False) device.offset = calculate_offset(item.get('target_time')) # 记录历史 db.session.add(DeviceHistory( device_id=device.id, status=device.status, result_data=device.current_value, data_time=item.get('target_time'), json_data=device.json_data )) count += 1 db.session.commit() print(f"✅ [定时任务] 成功自动更新 {count} 台设备数据") except Exception as e: db.session.rollback() print(f"❌ [定时任务] 运行出错: {str(e)}") def create_app(): app = Flask(__name__) # 1. 配置路径与数据库 basedir = os.path.abspath(os.path.dirname(__file__)) instance_path = os.path.join(basedir, 'instance') if not os.path.exists(instance_path): os.makedirs(instance_path) db_path = os.path.join(instance_path, 'devices.db') app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{db_path}' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['JSON_AS_ASCII'] = False # 定时任务配置 app.config['SCHEDULER_API_ENABLED'] = True app.config['SCHEDULER_TIMEZONE'] = "Asia/Shanghai" # 设置时区 # 2. 初始化插件 cors.init_app(app) db.init_app(app) # 3. 初始化并启动调度器 scheduler.init_app(app) # 4. 注册定时任务:每天 10:00 运行 # 如果你想测试是否生效,可以暂时把 hour=10 改为每隔一分钟运行一次:trigger='interval', minutes=1 scheduler.add_job( id='daily_crawl_task', func=auto_monitor_job, args=[app], trigger='cron', hour=10, minute=0 ) scheduler.start() # 5. 注册蓝图 app.register_blueprint(api_bp) # 6. 初始化数据库表 with app.app_context(): db.create_all() return app app = create_app() @app.shell_context_processor def make_shell_context(): return { 'db': db, 'Device': Device, 'DeviceHistory': DeviceHistory, 'MaintenanceLog': MaintenanceLog } if __name__ == '__main__': print("🚀 服务正在启动: http://127.0.0.1:5000") print("⏰ 定时任务已就绪:每天 10:00 自动执行爬取") # 注意:在生产环境中使用 debug=True 会导致调度器运行两次,建议生产环境设为 False app.run(debug=False, host='0.0.0.0', port=5000)