2.3权限管理

This commit is contained in:
YueL1331
2026-01-09 13:38:51 +08:00
parent 43f049112f
commit 9c73e25937
41 changed files with 0 additions and 60670 deletions

279
2_3banben/routes/api.py Normal file
View File

@ -0,0 +1,279 @@
import os
import shutil
import json
import re
from datetime import datetime
from flask import Blueprint, jsonify, request
from sqlalchemy import desc, or_
from extensions import db
from models import Device, DeviceHistory, MaintenanceLog
# 尝试导入爬虫模块
try:
from services.core import execute_monitor_task
except ImportError:
execute_monitor_task = None
api_bp = Blueprint('api', __name__, url_prefix='/api')
# =======================
# 0. 认证接口
# =======================
@api_bp.route('/login', methods=['POST'])
def login():
data = request.get_json()
username = data.get('username')
password = data.get('password')
if username == 'admin' and password == 'licahk':
return jsonify({
'code': 200,
'message': '登录成功',
'token': 'super-admin-token-2026',
'user': {'username': 'admin', 'role': 'administrator'}
})
return jsonify({'code': 401, 'message': '用户名或密码错误'}), 401
# =======================
# 1. 设备概览与详情接口
# =======================
@api_bp.route('/devices_overview', methods=['GET'])
def devices_overview():
try:
devices = Device.query.all()
data_list = [d.to_dict() for d in devices]
return jsonify({'code': 200, 'data': data_list})
except Exception as e:
return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/device_data_by_date', methods=['GET'])
def device_data_by_date():
name = request.args.get('name')
date_str = request.args.get('date')
if not name or not date_str:
return jsonify({'code': 400, 'message': 'Missing name or date'}), 400
device = Device.query.filter_by(name=name).first()
if not device:
return jsonify({'code': 404, 'message': 'Device not found'}), 404
content = None
history_record = DeviceHistory.query.filter(
DeviceHistory.device_id == device.id,
DeviceHistory.data_time.like(f"{date_str}%")
).order_by(desc(DeviceHistory.id)).first()
if history_record:
content = history_record.json_data
elif device.latest_time and device.latest_time.startswith(date_str):
content = device.json_data
if content:
return jsonify({
'code': 200,
'name': device.name,
'source': device.source,
'content': content
})
return jsonify({'code': 404, 'message': 'No data for this date'}), 404
# =======================
# 2. 维修日志接口
# =======================
@api_bp.route('/logs/list', methods=['GET'])
def get_logs():
keyword = request.args.get('keyword', '')
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
query = MaintenanceLog.query
if keyword:
kw = f"%{keyword}%"
query = query.filter(or_(
MaintenanceLog.device_name.like(kw),
MaintenanceLog.engineer.like(kw),
MaintenanceLog.location.like(kw),
MaintenanceLog.content.like(kw)
))
if start_date and end_date:
try:
start_dt = datetime.strptime(start_date, '%Y-%m-%d')
end_dt = datetime.strptime(end_date, '%Y-%m-%d').replace(hour=23, minute=59, second=59)
query = query.filter(MaintenanceLog.timestamp.between(start_dt, end_dt))
except ValueError:
pass
logs = query.order_by(MaintenanceLog.timestamp.desc()).all()
return jsonify({'code': 200, 'data': [l.to_dict() for l in logs]})
@api_bp.route('/logs/add', methods=['POST'])
def add_log():
data = request.get_json()
try:
new_log = MaintenanceLog(
device_name=data.get('device_name', '未知设备'),
engineer=data.get('engineer', ''),
location=data.get('location', ''),
content=data.get('content', '')
)
db.session.add(new_log)
db.session.commit()
return jsonify({'code': 200, 'message': 'Log saved'})
except Exception as e:
db.session.rollback()
return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/logs/update', methods=['POST'])
def update_log():
data = request.get_json()
log_id = data.get('id')
log = MaintenanceLog.query.get(log_id)
if not log: return jsonify({'code': 404, 'message': 'Not found'}), 404
try:
log.device_name = data.get('device_name', log.device_name)
log.engineer = data.get('engineer', log.engineer)
log.location = data.get('location', log.location)
log.content = data.get('content', log.content)
db.session.commit()
return jsonify({'code': 200, 'message': 'Log updated'})
except Exception as e:
db.session.rollback()
return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/logs/delete', methods=['POST'])
def delete_log():
data = request.get_json()
log = MaintenanceLog.query.get(data.get('id'))
if log:
db.session.delete(log)
db.session.commit()
return jsonify({'code': 200, 'message': 'Deleted'})
return jsonify({'code': 404, 'message': 'Not found'}), 404
# =======================
# 3. 辅助与控制接口 (核心修复逻辑)
# =======================
def calculate_offset(latest_time_str):
if not latest_time_str or latest_time_str == "N/A": return "从未同步"
try:
clean = str(latest_time_str).split()[0].replace('_', '-')
target = datetime.strptime(clean, "%Y-%m-%d").date()
diff = (datetime.now().date() - target).days
return "当天已同步" if diff == 0 else f"滞后 {diff}"
except:
return "时间解析失败"
@api_bp.route('/run_monitor', methods=['POST'])
def run_monitor():
try:
if not execute_monitor_task:
return jsonify({'code': 500, 'message': 'Core module missing'})
task_result = execute_monitor_task()
if not task_result: return jsonify({'code': 200, 'message': '任务跳过'})
scraped_list = task_result.get('device_list', [])
current_check_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
count = 0
for item in scraped_list:
d_name = item.get('name')
if not d_name: continue
d_raw = item.get('raw_json', {})
source = item.get('source', '')
target_time = item.get('target_time')
# 处理 106 路径时间
if '106' in str(source):
try:
path_str = d_raw.get('path', '')
match = re.search(r'/Data/(\d{4}_\d{2}_\d{2})/\w+_(\d{2}_\d{2}_\d{2})\.csv', path_str)
if match:
target_time = f"{match.group(1).replace('_', '-')} {match.group(2).replace('_', ':')}"
except:
pass
json_str = json.dumps(d_raw, ensure_ascii=False) if isinstance(d_raw, (dict, list)) else str(d_raw)
# --- 关键修改:先查询,后更新 ---
device = Device.query.filter_by(name=d_name).first()
if not device:
# 只有新设备才初始化静态字段
device = Device(name=d_name, source=source, install_site="")
db.session.add(device)
db.session.flush() # 获取 ID 供 History 使用
# 仅更新动态抓取的字段,保留手动填写的 install_site, is_maintaining, is_hidden
device.status = item.get('status')
device.current_value = item.get('value')
device.latest_time = target_time
device.check_time = current_check_time
device.json_data = json_str
device.offset = calculate_offset(target_time)
new_history = DeviceHistory(
device_id=device.id,
status=item.get('status'),
result_data=item.get('value'),
data_time=target_time,
json_data=json_str
)
db.session.add(new_history)
count += 1
db.session.commit()
return jsonify({'code': 200, 'message': f'成功更新 {count} 台设备,资料已保留'})
except Exception as e:
db.session.rollback()
return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/update_site', methods=['POST'])
def update_site():
data = request.get_json()
device = Device.query.filter_by(name=data.get('name')).first()
if device:
device.install_site = data.get('site')
db.session.commit()
return jsonify({'code': 200})
return jsonify({'code': 404}), 404
@api_bp.route('/toggle_maintenance', methods=['POST'])
def toggle_maintenance():
data = request.get_json()
device = Device.query.filter_by(name=data.get('name')).first()
if device:
device.is_maintaining = data.get('is_maintaining')
db.session.commit()
return jsonify({'code': 200})
return jsonify({'code': 404}), 404
@api_bp.route('/toggle_hidden', methods=['POST'])
def toggle_hidden():
data = request.get_json()
device = Device.query.filter_by(name=data.get('name')).first()
if device:
device.is_hidden = data.get('is_hidden')
db.session.commit()
return jsonify({'code': 200})
return jsonify({'code': 404}), 404