Files
ZDXX/2_1banben/routes/api.py
2026-01-12 15:57:34 +08:00

459 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import re
import shutil
from datetime import datetime

from flask import Blueprint, jsonify, request
from sqlalchemy import desc, or_

from extensions import db
from models import Device, DeviceHistory, MaintenanceLog

# The crawler module is optional: fall back to None when it cannot be
# imported so the rest of the API can still be served.
try:
    from services.core import execute_monitor_task
except ImportError:
    execute_monitor_task = None
# Blueprint that groups every endpoint of this module under the /api prefix.
api_bp = Blueprint('api', __name__, url_prefix='/api')
# =======================
# 0. 核心算法:数据质量分析 (含夜间免打扰)
# =======================
# Values are only inspected for samples taken in [_DAY_START_HOUR, _DAY_END_HOUR).
_DAY_START_HOUR = 8
_DAY_END_HOUR = 17
# Timestamp layouts accepted for the night-time check (after '_' -> '-').
_DATA_TIME_FORMATS = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M")
# Integration time at or above which a 106 row is treated as saturated.
# NOTE(review): 66534 is kept from the original code; confirm it is not
# meant to be 65534/65535.
_SATURATED_INT_TIME = 66534


def _parse_data_time(data_time_str):
    """Return the parsed timestamp, or None when no known layout matches."""
    clean_time = str(data_time_str).replace('_', '-')
    for fmt in _DATA_TIME_FORMATS:
        try:
            return datetime.strptime(clean_time, fmt)
        except ValueError:
            continue
    return None


def _is_outside_daylight(data_time_str):
    """Tell whether the sample time falls outside the inspected hours.

    An absent, 'N/A' or unparseable time returns False, so the value checks
    still run for it.
    """
    if not data_time_str or data_time_str == 'N/A':
        return False
    parsed = _parse_data_time(data_time_str)
    if parsed is None:
        return False
    return parsed.hour < _DAY_START_HOUR or parsed.hour >= _DAY_END_HOUR


def _check_csv_quality(content_data):
    """Apply the 106-device rules to CSV text and return the status."""
    if isinstance(content_data, dict):
        text_content = content_data.get('content', str(content_data))
    else:
        text_content = str(content_data)

    status = 'ok'
    for line in text_content.split('\n'):
        if 'OSIFBeta' not in line:
            continue
        parts = line.split(',')
        if len(parts) < 10:
            continue
        try:
            int_time = int(parts[2])
        except ValueError:
            continue
        # Only rows whose integration time is saturated are inspected.
        if int_time < _SATURATED_INT_TIME:
            continue

        data_points = []
        for raw_value in parts[3:]:
            try:
                data_points.append(float(raw_value))
            except ValueError:
                # Non-numeric cells are ignored.
                pass
        if not data_points:
            continue

        # Rule 1 (error): any point below 100.
        if any(val < 100 for val in data_points):
            return 'error'

        # Rule 2 (warning): five consecutive points within 100-500.
        run_length = 0
        for val in data_points:
            if 100 <= val <= 500:
                run_length += 1
                if run_length >= 5:
                    status = 'warning'
                    break
            else:
                run_length = 0
    return status


def _check_spectrum_quality(content_data):
    """Apply the JSON-device rule: two consecutive values below 500 is an error."""
    if not isinstance(content_data, dict):
        return 'ok'
    # 'upspec' is only consulted when 'downspec' is missing or empty.
    specs = content_data.get('downspec', []) or content_data.get('upspec', [])
    if not specs:
        return 'ok'

    consecutive_low = 0
    for val in specs:
        if not isinstance(val, (int, float)):
            # Non-numeric entries are skipped without resetting the run.
            continue
        if val < 500:
            consecutive_low += 1
            if consecutive_low >= 2:
                return 'error'
        else:
            consecutive_low = 0
    return 'ok'


def check_data_quality(content_data, source_type, data_time_str=None):
    """Classify the quality of a device's latest payload.

    :param content_data: parsed JSON payload (dict) or raw text.
    :param source_type: device source; a value containing '106' selects the
        CSV rules, anything else the JSON spectrum rule.
    :param data_time_str: time the data was produced; samples outside
        08:00-17:00 are reported as 'ok' because low readings are expected
        without sunlight.
    :return: 'ok', 'warning' or 'error'.
    """
    if not content_data:
        return 'ok'
    if _is_outside_daylight(data_time_str):
        return 'ok'

    source_str = str(source_type)
    try:
        if '106' in source_str:
            return _check_csv_quality(content_data)
        return _check_spectrum_quality(content_data)
    except Exception:
        # Deliberate best effort: a malformed payload must never break the
        # caller, so it is reported as 'ok'.
        return 'ok'
# =======================
# 1. 认证接口
# =======================
@api_bp.route('/login', methods=['POST'])
def login():
    """Check the posted username/password and return a token on success.

    Expects a JSON object with 'username' and 'password'. Any other body
    (for example JSON null or a list) is treated as failed credentials
    instead of raising AttributeError.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')
    # SECURITY(review): the credentials and the token below are hard-coded
    # and identical for every session. Move them to configuration / a real
    # credential store and issue per-session tokens.
    if username == 'admin' and password == 'licahk':
        return jsonify({
            'code': 200,
            'message': '登录成功',
            'token': 'super-admin-token-2026',
            'user': {'username': 'admin', 'role': 'administrator'}
        })
    return jsonify({'code': 401, 'message': '用户名或密码错误'}), 401
# =======================
# 2. 设备概览与详情接口
# =======================
@api_bp.route('/devices_overview', methods=['GET'])
def devices_overview():
    """Return every device with an added 'data_quality' field.

    Each device's stored JSON payload is parsed and passed to
    check_data_quality together with the device source and latest data time
    (the time enables the night-time exemption). A payload that cannot be
    parsed is analysed as missing data.
    """
    try:
        data_list = []
        for d in Device.query.all():
            item = d.to_dict()
            parsed_content = None
            if d.json_data:
                try:
                    parsed_content = json.loads(d.json_data)
                except (TypeError, ValueError):
                    # json.JSONDecodeError is a ValueError; TypeError covers
                    # a stored value that is not str/bytes.
                    parsed_content = None
            item['data_quality'] = check_data_quality(
                parsed_content, d.source, d.latest_time
            )
            data_list.append(item)
        return jsonify({'code': 200, 'data': data_list})
    except Exception as e:
        print(f"Overview Error: {e}")
        return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/device_data_by_date', methods=['GET'])
def device_data_by_date():
    """Return the stored payload of one device for a given date prefix.

    Query parameters: 'name' (device name) and 'date' (prefix matched against
    the stored data time). The newest matching history row wins; otherwise
    the device's current payload is used when its latest time starts with
    the requested date. Responds 400 on missing parameters and 404 when the
    device or the data cannot be found.
    """
    device_name = request.args.get('name')
    day_prefix = request.args.get('date')
    if not (device_name and day_prefix):
        return jsonify({'code': 400, 'message': 'Missing name or date'}), 400

    device = Device.query.filter_by(name=device_name).first()
    if not device:
        return jsonify({'code': 404, 'message': 'Device not found'}), 404

    # Newest history row whose data time starts with the requested date.
    history_query = DeviceHistory.query.filter(
        DeviceHistory.device_id == device.id,
        DeviceHistory.data_time.like(f"{day_prefix}%")
    )
    newest_match = history_query.order_by(desc(DeviceHistory.id)).first()

    if newest_match:
        payload = newest_match.json_data
    elif device.latest_time and device.latest_time.startswith(day_prefix):
        # No history row: fall back to the device's current payload.
        payload = device.json_data
    else:
        payload = None

    if not payload:
        return jsonify({'code': 404, 'message': 'No data for this date'}), 404
    return jsonify({
        'code': 200,
        'name': device.name,
        'source': device.source,
        'content': payload
    })
# =======================
# 3. 维修日志接口
# =======================
@api_bp.route('/logs/list', methods=['GET'])
def get_logs():
    """List maintenance logs, newest first, with optional filters.

    Query parameters:
      keyword    - substring searched in device name, engineer, location
                   and content.
      start_date - 'YYYY-MM-DD'; only applied together with end_date.
      end_date   - 'YYYY-MM-DD'; the whole end day is included.
    A date pair that does not parse is ignored.
    """
    params = request.args
    keyword = params.get('keyword', '')
    start_date = params.get('start_date')
    end_date = params.get('end_date')

    query = MaintenanceLog.query

    if keyword:
        pattern = f"%{keyword}%"
        searchable_columns = (
            MaintenanceLog.device_name,
            MaintenanceLog.engineer,
            MaintenanceLog.location,
            MaintenanceLog.content,
        )
        query = query.filter(
            or_(*[column.like(pattern) for column in searchable_columns])
        )

    if start_date and end_date:
        try:
            window_start = datetime.strptime(start_date, '%Y-%m-%d')
            # Extend the upper bound to the last second of the end day.
            window_end = datetime.strptime(end_date, '%Y-%m-%d').replace(
                hour=23, minute=59, second=59
            )
            query = query.filter(
                MaintenanceLog.timestamp.between(window_start, window_end)
            )
        except ValueError:
            pass

    entries = query.order_by(MaintenanceLog.timestamp.desc()).all()
    payload = [entry.to_dict() for entry in entries]
    return jsonify({'code': 200, 'data': payload})
@api_bp.route('/logs/add', methods=['POST'])
def add_log():
    """Create a maintenance log from the posted JSON body.

    Reads 'device_name', 'engineer', 'location' and 'content'; missing keys
    fall back to a placeholder device name and empty strings. Any failure
    rolls the session back and is reported as code 500 in the JSON body.
    """
    payload = request.get_json()
    try:
        entry = MaintenanceLog(
            device_name=payload.get('device_name', '未知设备'),
            engineer=payload.get('engineer', ''),
            location=payload.get('location', ''),
            content=payload.get('content', ''),
        )
        session = db.session
        session.add(entry)
        session.commit()
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
    else:
        return jsonify({'code': 200, 'message': 'Log saved'})
@api_bp.route('/logs/update', methods=['POST'])
def update_log():
    """Update the editable fields of an existing maintenance log.

    Expects a JSON object with 'id' and any of 'device_name', 'engineer',
    'location', 'content'; fields that are not sent keep their current
    value. Responds 400 when the body is not a JSON object and 404 when the
    id is missing or unknown. A failed commit is rolled back and reported as
    code 500 in the JSON body.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        # A null/list/string body used to raise AttributeError below.
        return jsonify({'code': 400, 'message': 'Invalid request body'}), 400

    log_id = data.get('id')
    # A missing id cannot match any row, so skip the lookup entirely.
    log = MaintenanceLog.query.get(log_id) if log_id is not None else None
    if not log:
        return jsonify({'code': 404, 'message': 'Not found'}), 404

    try:
        log.device_name = data.get('device_name', log.device_name)
        log.engineer = data.get('engineer', log.engineer)
        log.location = data.get('location', log.location)
        log.content = data.get('content', log.content)
        db.session.commit()
        return jsonify({'code': 200, 'message': 'Log updated'})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/logs/delete', methods=['POST'])
def delete_log():
    """Delete the maintenance log whose 'id' is given in the JSON body.

    Responds 400 when the body is not a JSON object and 404 when the id is
    missing or unknown. A failed delete is rolled back and reported as a
    JSON error with HTTP status 500.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        # A null/list/string body used to raise AttributeError below.
        return jsonify({'code': 400, 'message': 'Invalid request body'}), 400

    log_id = data.get('id')
    # A missing id cannot match any row, so skip the lookup entirely.
    log = MaintenanceLog.query.get(log_id) if log_id is not None else None
    if not log:
        return jsonify({'code': 404, 'message': 'Not found'}), 404

    try:
        db.session.delete(log)
        db.session.commit()
    except Exception as e:
        # Leave the session clean after a failed commit. HTTP 500 is kept so
        # clients see the same status an unhandled error produced before.
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)}), 500
    return jsonify({'code': 200, 'message': 'Deleted'})
# =======================
# 4. 辅助与控制接口
# =======================
def calculate_offset(latest_time_str):
    """Describe how many days the latest data lags behind today.

    :param latest_time_str: latest data time; only the first
        whitespace-separated token is used, as a date with '-' or '_'
        separators (e.g. '2026-01-12 10:00:00' or '2026_01_12').
    :return: a status string: never synced (empty or 'N/A' input), synced
        today, the lag in days, or a parse-failure message.
    """
    if not latest_time_str or latest_time_str == "N/A":
        return "从未同步"
    try:
        # split() yields no token for a whitespace-only value (IndexError);
        # strptime rejects anything that is not YYYY-MM-DD (ValueError).
        date_part = str(latest_time_str).split()[0].replace('_', '-')
        target = datetime.strptime(date_part, "%Y-%m-%d").date()
    except (IndexError, ValueError):
        return "时间解析失败"
    diff = (datetime.now().date() - target).days
    return "当天已同步" if diff == 0 else f"滞后 {diff}"
# Matches '/Data/YYYY_MM_DD/<prefix>_HH_MM_SS.csv' inside a 106 device's file path.
_CSV_PATH_TIME_RE = re.compile(
    r'/Data/(\d{4}_\d{2}_\d{2})/\w+_(\d{2}_\d{2}_\d{2})\.csv'
)


def _timestamp_from_csv_path(raw):
    """Return 'YYYY-MM-DD HH:MM:SS' taken from raw['path'], or None."""
    if not isinstance(raw, dict):
        return None
    path_str = raw.get('path', '')
    if not isinstance(path_str, str):
        return None
    match = _CSV_PATH_TIME_RE.search(path_str)
    if not match:
        return None
    date_part = match.group(1).replace('_', '-')
    time_part = match.group(2).replace('_', ':')
    return f"{date_part} {time_part}"


@api_bp.route('/run_monitor', methods=['POST'])
def run_monitor():
    """Run the crawler task and store its results.

    For every scraped item with a name, the matching Device row is created
    if needed and updated, and a DeviceHistory row is appended. Everything
    is committed in one transaction; any error rolls it back and is
    reported as code 500 in the JSON body.
    """
    try:
        if not execute_monitor_task:
            return jsonify({'code': 500, 'message': 'Core module missing'})

        task_result = execute_monitor_task()
        if not task_result:
            return jsonify({'code': 200, 'message': '任务跳过'})

        scraped_list = task_result.get('device_list', [])
        current_check_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        count = 0

        for item in scraped_list:
            d_name = item.get('name')
            if not d_name:
                continue

            d_raw = item.get('raw_json', {})
            source = item.get('source', '')
            target_time = item.get('target_time')

            if '106' in str(source):
                # For 106 devices, prefer the time in the CSV file path;
                # keep the scraped time when the path does not match.
                target_time = _timestamp_from_csv_path(d_raw) or target_time

            if isinstance(d_raw, (dict, list)):
                json_str = json.dumps(d_raw, ensure_ascii=False)
            else:
                json_str = str(d_raw)

            device = Device.query.filter_by(name=d_name).first()
            if not device:
                device = Device(name=d_name, source=source, install_site="")
                db.session.add(device)
                # Flush so the new device has an id for the history row.
                db.session.flush()

            device.status = item.get('status')
            device.current_value = item.get('value')
            device.latest_time = target_time
            device.check_time = current_check_time
            device.json_data = json_str
            device.offset = calculate_offset(target_time)

            new_history = DeviceHistory(
                device_id=device.id,
                status=item.get('status'),
                result_data=item.get('value'),
                data_time=target_time,
                json_data=json_str
            )
            db.session.add(new_history)
            count += 1

        db.session.commit()
        return jsonify({'code': 200, 'message': f'成功更新 {count} 台设备,资料已保留'})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/update_site', methods=['POST'])
def update_site():
    """Set the install site of the device named in the JSON body.

    Expects a JSON object with 'name' and 'site'. Responds 400 when the body
    is not a JSON object and 404 when no device has that name.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        # A null/list/string body used to raise AttributeError below.
        return jsonify({'code': 400, 'message': 'Invalid request body'}), 400

    device = Device.query.filter_by(name=data.get('name')).first()
    if device:
        device.install_site = data.get('site')
        db.session.commit()
        return jsonify({'code': 200})
    return jsonify({'code': 404}), 404
@api_bp.route('/toggle_maintenance', methods=['POST'])
def toggle_maintenance():
    """Store the maintenance flag of the device named in the JSON body.

    Expects a JSON object with 'name' and 'is_maintaining'; the flag is
    stored as sent. Responds 400 when the body is not a JSON object and 404
    when no device has that name.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        # A null/list/string body used to raise AttributeError below.
        return jsonify({'code': 400, 'message': 'Invalid request body'}), 400

    device = Device.query.filter_by(name=data.get('name')).first()
    if device:
        device.is_maintaining = data.get('is_maintaining')
        db.session.commit()
        return jsonify({'code': 200})
    return jsonify({'code': 404}), 404
@api_bp.route('/toggle_hidden', methods=['POST'])
def toggle_hidden():
    """Store the hidden flag of the device named in the JSON body.

    Expects a JSON object with 'name' and 'is_hidden'; the flag is stored as
    sent. Responds 400 when the body is not a JSON object and 404 when no
    device has that name.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        # A null/list/string body used to raise AttributeError below.
        return jsonify({'code': 400, 'message': 'Invalid request body'}), 400

    device = Device.query.filter_by(name=data.get('name')).first()
    if device:
        device.is_hidden = data.get('is_hidden')
        db.session.commit()
        return jsonify({'code': 200})
    return jsonify({'code': 404}), 404
# =======================
# 5. 手动添加设备接口 (新增)
# =======================
@api_bp.route('/add_device', methods=['POST'])
def add_device():
    """Manually register a device from the JSON body.

    Expects a JSON object with 'name' (required) and 'site' (optional).
    Responds 400 when the name is missing or already used; a body that is
    not a JSON object is handled like a missing name. A failed insert is
    rolled back and reported as code 500 in the JSON body.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        # A null/list/string body used to raise AttributeError below; let it
        # fall through to the "name required" response instead.
        data = {}

    name = data.get('name')
    site = data.get('site', '')
    if not name:
        return jsonify({'code': 400, 'message': '必须填写设备名称'}), 400

    # 1. Refuse duplicates.
    existing = Device.query.filter_by(name=name).first()
    if existing:
        return jsonify({'code': 400, 'message': f'设备 {name} 已存在,无需重复添加'}), 400

    try:
        # 2. Create the record. source is 'manual' so such devices can be
        # told apart later; the device starts 'offline' with no data time.
        new_device = Device(
            name=name,
            install_site=site,
            source='manual',
            status='offline',
            current_value='0',
            latest_time='N/A',
            check_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            json_data='{}',
            is_hidden=0,
            is_maintaining=0
        )
        db.session.add(new_device)
        db.session.commit()
        return jsonify({'code': 200, 'message': '设备添加成功'})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})