diff --git a/2.1版本/app.py b/2.1版本/app.py index 2a567c2..398a7e0 100644 --- a/2.1版本/app.py +++ b/2.1版本/app.py @@ -1,41 +1,128 @@ -# app.py import os import sys +import json +from datetime import datetime from flask import Flask -from extensions import db, cors # ✅ 从 extensions 导入 -from models import Device, DeviceHistory, MaintenanceLog # 导入模型以便 SQLAlchemy 识别 +from flask_apscheduler import APScheduler # ✅ 新增:定时任务控制器 +from extensions import db, cors +from models import Device, DeviceHistory, MaintenanceLog from routes.api import api_bp +# 尝试导入爬虫核心逻辑,以便定时任务调用 +try: + from services.core import execute_monitor_task +except ImportError: + execute_monitor_task = None + # 解决 Windows 下控制台输出乱码问题 sys.stdout.reconfigure(encoding='utf-8') +# 初始化调度器 +scheduler = APScheduler() + + +def auto_monitor_job(app): + """定时任务执行逻辑""" + with app.app_context(): + print(f"⏰ [定时任务] 启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + + if not execute_monitor_task: + print("❌ 错误: 找不到 services.core.execute_monitor_task 模块") + return + + try: + # 1. 执行爬取任务 + task_result = execute_monitor_task() + if not task_result: + print("⚠️ [定时任务] 爬虫未返回数据,跳过更新") + return + + scraped_list = task_result.get('device_list', []) + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + # 2. 这里的逻辑需要和 api.py 保持高度一致(复用更新逻辑) + # 为了防止代码重复,建议以后将此逻辑封装在 services 里的一个独立函数中 + from routes.api import calculate_offset + + count = 0 + for item in scraped_list: + d_name = item.get('name') + if not d_name: continue + + device = Device.query.filter_by(name=d_name).first() + if not device: + device = Device(name=d_name, source=item.get('source'), install_site="") + db.session.add(device) + db.session.flush() + + # 更新动态字段 + device.status = item.get('status') + device.current_value = item.get('value') + device.latest_time = item.get('target_time') + device.check_time = current_time + device.json_data = json.dumps(item.get('raw_json', {}), ensure_ascii=False) + device.offset = calculate_offset(item.get('target_time')) + + # 记录历史 + db.session.add(DeviceHistory( + device_id=device.id, + status=device.status, + result_data=device.current_value, + data_time=item.get('target_time'), + json_data=device.json_data + )) + count += 1 + + db.session.commit() + print(f"✅ [定时任务] 成功自动更新 {count} 台设备数据") + + except Exception as e: + db.session.rollback() + print(f"❌ [定时任务] 运行出错: {str(e)}") + def create_app(): app = Flask(__name__) - # 1. 配置路径 + # 1. 配置路径与数据库 basedir = os.path.abspath(os.path.dirname(__file__)) instance_path = os.path.join(basedir, 'instance') if not os.path.exists(instance_path): os.makedirs(instance_path) - print(f"📁 检测到目录不存在,已自动创建: {instance_path}") db_path = os.path.join(instance_path, 'devices.db') - - # 配置 app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{db_path}' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['JSON_AS_ASCII'] = False - # 2. 初始化插件 (使用 init_app 模式) - cors.init_app(app) # ✅ - db.init_app(app) # ✅ + # 定时任务配置 + app.config['SCHEDULER_API_ENABLED'] = True + app.config['SCHEDULER_TIMEZONE'] = "Asia/Shanghai" # 设置时区 - # 3. 注册蓝图 - # 注意:api.py 里已经写了 url_prefix='/api',这里不要再写,否则变 /api/api/... + # 2. 初始化插件 + cors.init_app(app) + db.init_app(app) + + # 3. 初始化并启动调度器 + scheduler.init_app(app) + + # 4. 注册定时任务:每天 10:00 运行 + # 如果你想测试是否生效,可以暂时把 hour=10 改为每隔一分钟运行一次:trigger='interval', minutes=1 + scheduler.add_job( + id='daily_crawl_task', + func=auto_monitor_job, + args=[app], + trigger='cron', + hour=10, + minute=0 + ) + + scheduler.start() + + # 5. 注册蓝图 app.register_blueprint(api_bp) - # 4. 初始化数据库表 + # 6. 初始化数据库表 with app.app_context(): db.create_all() @@ -44,6 +131,7 @@ def create_app(): app = create_app() + @app.shell_context_processor def make_shell_context(): return { @@ -53,6 +141,9 @@ def make_shell_context(): 'MaintenanceLog': MaintenanceLog } + if __name__ == '__main__': print("🚀 服务正在启动: http://127.0.0.1:5000") - app.run(debug=True, host='0.0.0.0', port=5000) \ No newline at end of file + print("⏰ 定时任务已就绪:每天 10:00 自动执行爬取") + # 注意:在生产环境中使用 debug=True 会导致调度器运行两次,建议生产环境设为 False + app.run(debug=False, host='0.0.0.0', port=5000) \ No newline at end of file diff --git a/2.1版本/models.py b/2.1版本/models.py index f20fc5d..37695ee 100644 --- a/2.1版本/models.py +++ b/2.1版本/models.py @@ -1,4 +1,3 @@ -# models.py from datetime import datetime import json from extensions import db @@ -10,24 +9,23 @@ class Device(db.Model): name = db.Column(db.String(100), unique=True, index=True) source = db.Column(db.String(50)) - # 快照字段 + # 快照字段(爬虫更新) status = db.Column(db.String(50)) current_value = db.Column(db.String(200)) - latest_time = db.Column(db.String(50)) # 原始抓取时间字符串 - json_data = db.Column(db.Text) # 完整数据 - - check_time = db.Column(db.String(50)) # 系统检查时间 + latest_time = db.Column(db.String(50)) + json_data = db.Column(db.Text) + check_time = db.Column(db.String(50)) reason = db.Column(db.String(255)) - offset = db.Column(db.String(50)) # 滞后描述 + offset = db.Column(db.String(50)) + # 手动录入字段(受保护,run_monitor 不主动覆盖) install_site = db.Column(db.String(100), default="") is_maintaining = db.Column(db.Boolean, default=False) is_hidden = db.Column(db.Boolean, default=False) def to_dict(self): - # 兼容处理 API 返回 + # 统一状态映射逻辑 api_status = 'offline' if self.status in ['离线', '异常', '已离线'] else 'online' - # 注意:列表接口不返回 json_data 以提高性能 return { 'id': self.id, 'name': self.name, @@ -37,7 +35,7 @@ class Device(db.Model): 'status_text': self.status, 'value': self.current_value, 'reason': self.reason, - 'install_site': self.install_site, + 'install_site': self.install_site or '', 'is_maintaining': self.is_maintaining, 'is_hidden': self.is_hidden, 'offset': self.offset @@ -61,10 +59,10 @@ class MaintenanceLog(db.Model): __tablename__ = 'maintenance_logs' id = db.Column(db.Integer, primary_key=True) device_name = db.Column(db.String(100), nullable=False) - engineer = db.Column(db.String(50)) # 工程师 - location = db.Column(db.String(100)) # 地点 - content = db.Column(db.Text) # 事件内容 - timestamp = db.Column(db.DateTime, default=datetime.now) # 自动记录时间 + engineer = db.Column(db.String(50)) + location = db.Column(db.String(100)) + content = db.Column(db.Text) + timestamp = db.Column(db.DateTime, default=datetime.now) def to_dict(self): return { diff --git a/2.1版本/routes/api.py b/2.1版本/routes/api.py index 1346f3b..6e9f6ee 100644 --- a/2.1版本/routes/api.py +++ b/2.1版本/routes/api.py @@ -1,7 +1,7 @@ import os import shutil import json -import re # [新增] 引入正则模块用于解析路径 +import re from datetime import datetime from flask import Blueprint, jsonify, request from sqlalchemy import desc, or_ @@ -17,6 +17,27 @@ except ImportError: api_bp = Blueprint('api', __name__, url_prefix='/api') +# ======================= +# 0. 认证接口 +# ======================= + +@api_bp.route('/login', methods=['POST']) +def login(): + data = request.get_json() + username = data.get('username') + password = data.get('password') + + if username == 'admin' and password == 'licahk': + return jsonify({ + 'code': 200, + 'message': '登录成功', + 'token': 'super-admin-token-2026', + 'user': {'username': 'admin', 'role': 'administrator'} + }) + + return jsonify({'code': 401, 'message': '用户名或密码错误'}), 401 + + # ======================= # 1. 设备概览与详情接口 # ======================= @@ -34,7 +55,7 @@ def devices_overview(): @api_bp.route('/device_data_by_date', methods=['GET']) def device_data_by_date(): name = request.args.get('name') - date_str = request.args.get('date') # 前端传来的格式通常是 YYYY-MM-DD + date_str = request.args.get('date') if not name or not date_str: return jsonify({'code': 400, 'message': 'Missing name or date'}), 400 @@ -44,9 +65,6 @@ def device_data_by_date(): return jsonify({'code': 404, 'message': 'Device not found'}), 404 content = None - - # 1. 查历史表 - # 注意:如果数据是通过 run_monitor 经过处理保存的,data_time 应该是标准的 YYYY-MM-DD 格式 history_record = DeviceHistory.query.filter( DeviceHistory.device_id == device.id, DeviceHistory.data_time.like(f"{date_str}%") @@ -54,8 +72,6 @@ def device_data_by_date(): if history_record: content = history_record.json_data - - # 2. 查当前状态 (如果历史表没查到,且当前状态的时间也匹配) elif device.latest_time and device.latest_time.startswith(date_str): content = device.json_data @@ -66,8 +82,7 @@ def device_data_by_date(): 'source': device.source, 'content': content }) - else: - return jsonify({'code': 404, 'message': 'No data for this date'}), 404 + return jsonify({'code': 404, 'message': 'No data for this date'}), 404 # ======================= @@ -81,7 +96,6 @@ def get_logs(): end_date = request.args.get('end_date') query = MaintenanceLog.query - if keyword: kw = f"%{keyword}%" query = query.filter(or_( @@ -125,20 +139,14 @@ def add_log(): def update_log(): data = request.get_json() log_id = data.get('id') - - if not log_id: - return jsonify({'code': 400, 'message': 'Missing Log ID'}), 400 - log = MaintenanceLog.query.get(log_id) - if not log: - return jsonify({'code': 404, 'message': 'Log not found'}), 404 + if not log: return jsonify({'code': 404, 'message': 'Not found'}), 404 try: log.device_name = data.get('device_name', log.device_name) log.engineer = data.get('engineer', log.engineer) log.location = data.get('location', log.location) log.content = data.get('content', log.content) - db.session.commit() return jsonify({'code': 200, 'message': 'Log updated'}) except Exception as e: @@ -158,15 +166,13 @@ def delete_log(): # ======================= -# 3. 辅助与控制接口 (核心修改逻辑在 run_monitor) +# 3. 辅助与控制接口 (核心修复逻辑) # ======================= def calculate_offset(latest_time_str): if not latest_time_str or latest_time_str == "N/A": return "从未同步" try: - # 处理可能包含 _ 或 - 的日期 clean = str(latest_time_str).split()[0].replace('_', '-') - if len(clean) < 8: return latest_time_str target = datetime.strptime(clean, "%Y-%m-%d").date() diff = (datetime.now().date() - target).days return "当天已同步" if diff == 0 else f"滞后 {diff} 天" @@ -195,40 +201,30 @@ def run_monitor(): source = item.get('source', '') target_time = item.get('target_time') - # ================= [核心修改部分 START] ================= - # 针对 106 网站的数据,从 path 中解析标准时间 - # 格式示例: "path": "/Data/2026_01_08/xiwuzhu_16_29_28.csv" - # 目标格式: "2026-01-08 16:29:28" + # 处理 106 路径时间 if '106' in str(source): try: path_str = d_raw.get('path', '') - # 正则匹配日期 (YYYY_MM_DD) 和 时间 (HH_MM_SS) - # 解释:/Data/(年_月_日)/任意字符_(时_分_秒).csv match = re.search(r'/Data/(\d{4}_\d{2}_\d{2})/\w+_(\d{2}_\d{2}_\d{2})\.csv', path_str) - if match: - date_part = match.group(1).replace('_', '-') # 2026_01_08 -> 2026-01-08 - time_part = match.group(2).replace('_', ':') # 16_29_28 -> 16:29:28 - extracted_time = f"{date_part} {time_part}" - - # 覆盖爬虫原本获取的可能不准确的时间 - target_time = extracted_time - item['target_time'] = extracted_time - except Exception as parse_err: - print(f"Error parsing 106 path: {parse_err}") - # ================= [核心修改部分 END] ================= + target_time = f"{match.group(1).replace('_', '-')} {match.group(2).replace('_', ':')}" + except: + pass json_str = json.dumps(d_raw, ensure_ascii=False) if isinstance(d_raw, (dict, list)) else str(d_raw) + # --- 关键修改:先查询,后更新 --- device = Device.query.filter_by(name=d_name).first() if not device: - device = Device(name=d_name, source=source) + # 只有新设备才初始化静态字段 + device = Device(name=d_name, source=source, install_site="") db.session.add(device) - db.session.flush() # 获取ID + db.session.flush() # 获取 ID 供 History 使用 + # 仅更新动态抓取的字段,保留手动填写的 install_site, is_maintaining, is_hidden device.status = item.get('status') device.current_value = item.get('value') - device.latest_time = target_time # 使用解析后的时间 + device.latest_time = target_time device.check_time = current_check_time device.json_data = json_str device.offset = calculate_offset(target_time) @@ -237,14 +233,14 @@ def run_monitor(): device_id=device.id, status=item.get('status'), result_data=item.get('value'), - data_time=target_time, # 存入标准时间,方便查询 + data_time=target_time, json_data=json_str ) db.session.add(new_history) count += 1 db.session.commit() - return jsonify({'code': 200, 'message': f'更新 {count} 台设备'}) + return jsonify({'code': 200, 'message': f'成功更新 {count} 台设备,资料已保留'}) except Exception as e: db.session.rollback() return jsonify({'code': 500, 'message': str(e)})