Files
ZDXX/2_1banben/routes/api.py

627 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import re
from datetime import datetime
from flask import Blueprint, jsonify, request
from sqlalchemy import desc, or_
from extensions import db
from models import Device, DeviceHistory, MaintenanceLog
# --- Optional service modules: each name falls back to None when its import fails ---
try:
    from services.core import execute_monitor_task
except ImportError:
    execute_monitor_task = None
try:
    from services.iot_api import sync_iot_data_service
except ImportError:
    sync_iot_data_service = None
# Blueprint for the routes in this module; every route is mounted under /api.
api_bp = Blueprint('api', __name__, url_prefix='/api')
# =========================================================
# 0. 核心算法区:数据质量分析与辅助函数
# =========================================================
def calculate_offset(latest_time_str):
    """Describe how many days a timestamp lags behind today's date.

    Args:
        latest_time_str: Timestamp text whose first whitespace-separated
            token is a date written as ``YYYY-MM-DD`` or ``YYYY_MM_DD``.
            Anything after that token (a time of day) is ignored.

    Returns:
        "从未同步" when the value is empty or "N/A", "当天" when the date is
        today, "滞后 <days>" for any other date, and "时间解析失败" when
        the date token cannot be parsed.
    """
    if not latest_time_str or latest_time_str == "N/A":
        return "从未同步"
    try:
        # Keep only the date token; accept both 2026_01_13 and 2026-01-13.
        date_token = str(latest_time_str).split()[0].replace('_', '-')
        target = datetime.strptime(date_token, "%Y-%m-%d").date()
    except (IndexError, ValueError):
        # IndexError: whitespace-only input; ValueError: not a valid date.
        return "时间解析失败"
    diff = (datetime.now().date() - target).days
    return "当天" if diff == 0 else f"滞后 {diff}"
def _parse_quality_timestamp(data_time_str):
    """Parse a data timestamp; return None when it matches no known format."""
    normalized = str(data_time_str).replace('_', '-')
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M"):
        try:
            return datetime.strptime(normalized, fmt)
        except ValueError:
            continue
    return None


def _check_106_quality(content_data):
    """Grade the CSV text of a "106" device as 'ok', 'warning' or 'error'."""
    # A dict payload carries the CSV text under 'content'; when that key is
    # missing the dict's own string form is scanned instead.
    if isinstance(content_data, dict):
        text = content_data.get('content', str(content_data))
    else:
        text = str(content_data)
    if not isinstance(text, str):
        # Nothing that can be scanned line by line: treat as healthy.
        return 'ok'

    status = 'ok'
    for line in text.split('\n'):
        if 'OSIFBeta' not in line:
            continue
        parts = line.split(',')
        # Cheap sanity check on the column count.
        if len(parts) < 10:
            continue
        # Column index 2 holds the integration time.
        try:
            int_time = int(parts[2])
        except ValueError:
            continue
        # Legacy rule: values are only inspected when the integration time
        # is saturated (>= 66534).
        if int_time < 66534:
            continue
        # Data points start at column index 3; non-numeric cells are skipped.
        data_points = []
        for cell in parts[3:]:
            try:
                data_points.append(float(cell))
            except ValueError:
                pass
        if not data_points:
            continue
        # Rule A (error): any point below 100.
        if any(val < 100 for val in data_points):
            return 'error'
        # Rule B (warning): 5 consecutive points within [100, 500].
        # Do not return here: a later line may still raise an error.
        run_length = 0
        for val in data_points:
            if 100 <= val <= 500:
                run_length += 1
                if run_length >= 5:
                    status = 'warning'
                    break
            else:
                run_length = 0
    return status


def _check_82_quality(content_data):
    """Grade the JSON payload of a non-"106" device as 'ok' or 'error'."""
    if not isinstance(content_data, dict):
        return 'ok'
    # Prefer 'downspec'; fall back to 'upspec' when it is missing or empty.
    specs = content_data.get('downspec', [])
    if not specs:
        specs = content_data.get('upspec', [])
    if not specs or not isinstance(specs, list):
        return 'ok'
    low_run = 0
    for val in specs:
        # Non-numeric entries are skipped and do not break a run.
        if not isinstance(val, (int, float)):
            continue
        # Legacy rule: 2 consecutive points below 500 -> error.
        if val < 500:
            low_run += 1
            if low_run >= 2:
                return 'error'
        else:
            low_run = 0
    return 'ok'


def check_data_quality(content_data, source_type, data_time_str=None):
    """Classify a device's latest payload as 'ok', 'warning' or 'error'.

    Checks run in this order:
      1. Empty payloads and IoT cards (source 'iot_card') are always 'ok'.
      2. If ``data_time_str`` parses and its hour is outside 08:00-17:00
         the result is 'ok': per the original rule there is no sunlight at
         night, so low spectrometer readings are expected.
      3. Sources containing '106' are graded from their CSV text; every
         other source is graded from its 'downspec'/'upspec' list.

    Args:
        content_data: Parsed payload (dict) or raw text.
        source_type: Device source identifier; compared via ``str()``.
        data_time_str: Optional timestamp, ``YYYY-MM-DD HH:MM[:SS]`` with
            '-' or '_' as the date separator. Unparseable values simply
            skip the night-time shortcut.

    Returns:
        One of 'ok', 'warning', 'error'. 'warning' is only produced by the
        '106' rules.
    """
    if not content_data:
        return 'ok'
    source_str = str(source_type)
    if source_str == 'iot_card':
        return 'ok'

    if data_time_str and data_time_str != 'N/A':
        parsed_time = _parse_quality_timestamp(data_time_str)
        if parsed_time is not None and (parsed_time.hour < 8 or parsed_time.hour >= 17):
            return 'ok'

    if '106' in source_str:
        return _check_106_quality(content_data)
    return _check_82_quality(content_data)
def save_iot_cards_to_db(card_list):
    """Stage IoT SIM-card records in the current DB session.

    Only rows with source='iot_card' are read or written. For a card that
    already exists, the stored ``is_whitelist`` flag is carried over so a
    sync never overwrites it. This function adds and modifies session
    objects but does not commit; committing is left to the caller.

    Args:
        card_list: Iterable of dicts with at least an 'iccid' key; entries
            without an ICCID are skipped.

    Returns:
        ``(count, None)`` on success, where count is the number of cards
        written, or ``(0, error_message)`` if any step raised.
    """
    if not card_list:
        return 0, None
    update_count = 0
    try:
        for card in card_list:
            iccid = card.get('iccid')
            if not iccid:
                continue

            # 1. Look the card up by ICCID.
            sim_record = Device.query.filter_by(name=iccid, source='iot_card').first()
            current_whitelist = False
            if not sim_record:
                # New card: insert a row for it.
                sim_record = Device(name=iccid, source='iot_card', install_site="IoT库")
                db.session.add(sim_record)
                db.session.flush()
            else:
                # Existing card: keep the whitelist flag that is already stored.
                try:
                    old_json = json.loads(sim_record.json_data)
                    current_whitelist = old_json.get('is_whitelist', False)
                except (TypeError, ValueError, AttributeError):
                    # json_data is NULL, not valid JSON, or not a JSON object.
                    current_whitelist = False

            # 2. Overwrite the card fields with the freshly synced values.
            sim_record.status = str(card.get('cardStatus', ''))
            card_data = {
                "iccid": iccid,
                "usedTraffic": str(card.get('usedTraffic') or '0'),
                "stopDate": card.get('stopDate', 'N/A'),
                "cardStatus": card.get('cardStatus'),
                "tag": card.get('tag', ''),
                "is_whitelist": current_whitelist,  # write the preserved flag back
            }
            sim_record.json_data = json.dumps(card_data, ensure_ascii=False)
            sim_record.check_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            update_count += 1
        return update_count, None
    except Exception as e:
        # Callers expect failures as the second tuple element, not as an exception.
        return 0, str(e)
# =========================================================
# 1. 基础接口 (认证 & 概览)
# =========================================================
@api_bp.route('/login', methods=['POST'])
def login():
    """Authenticate against the single built-in admin account.

    Expects a JSON object with 'username' and 'password'. Returns code 200
    with a token and user info on success, otherwise code 401 with HTTP
    status 401. A missing, malformed or non-object body is treated as
    wrong credentials.
    """
    # silent=True yields None for a missing/invalid JSON body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')
    # SECURITY(review): the credentials and the token below are hard-coded and
    # identical for every login; move them to configuration and issue
    # per-session tokens.
    if username == 'admin' and password == 'licahk':
        return jsonify({
            'code': 200,
            'message': '登录成功',
            'token': 'super-admin-token-2026',
            'user': {'username': 'admin', 'role': 'administrator'}
        })
    return jsonify({'code': 401, 'message': '用户名或密码错误'}), 401
@api_bp.route('/devices_overview', methods=['GET'])
def devices_overview():
    """Return every device plus every IoT card in one list.

    Real devices (source != 'iot_card') come first. Each carries the
    traffic, stop date and whitelist flag of the card named by its stored
    'bound_iccid' (when that card exists) and a 'data_quality' grade. The
    IoT card rows follow, flagged with 'isOrphanIoT', so the front end can
    total traffic and fill the binding dialog.

    Returns:
        JSON ``{'code': 200, 'data': [...]}``, or ``{'code': 500,
        'message': ...}`` if anything raises (no explicit HTTP status is set).
    """
    try:
        # A. IoT card rows (source='iot_card'). Each card's JSON is parsed once
        #    and reused for both the lookup map and the card rows in step C.
        iot_records = Device.query.filter_by(source='iot_card').all()
        card_payloads = []
        iot_map = {}
        for rec in iot_records:
            try:
                payload = json.loads(rec.json_data)
            except (TypeError, ValueError):
                payload = None
            if not isinstance(payload, dict):
                # NULL, invalid or non-object JSON: no usable card info.
                payload = None
            card_payloads.append(payload)
            if payload is not None:
                iot_map[rec.name] = payload

        # B. Real devices (source != 'iot_card').
        devices = Device.query.filter(Device.source != 'iot_card').all()
        data_list = []
        for d in devices:
            item = d.to_dict()
            parsed_content = {}
            if d.json_data:
                try:
                    parsed_content = json.loads(d.json_data)
                except (TypeError, ValueError):
                    parsed_content = {}
                if not isinstance(parsed_content, dict):
                    parsed_content = {}

            # --- Binding: inject the bound card's info into the device ---
            bound_iccid = parsed_content.get('bound_iccid')
            item['usedTraffic'] = None
            item['stopDate'] = None
            item['isBound'] = False
            item['bound_iccid'] = bound_iccid
            item['is_whitelist'] = False  # not whitelisted unless a bound card says so
            if bound_iccid and bound_iccid in iot_map:
                card_info = iot_map[bound_iccid]
                item['usedTraffic'] = card_info.get('usedTraffic')
                item['stopDate'] = card_info.get('stopDate')
                item['is_whitelist'] = card_info.get('is_whitelist', False)
                item['isBound'] = True

            # Grade the payload with the anomaly-detection rules.
            item['data_quality'] = check_data_quality(parsed_content, d.source, d.latest_time)
            data_list.append(item)

        # C. The IoT card rows themselves must also reach the front end
        #    (used for the traffic total and the binding dialog).
        for rec, payload in zip(iot_records, card_payloads):
            item = rec.to_dict()
            card_json = payload or {}
            item['usedTraffic'] = card_json.get('usedTraffic', '0')
            item['stopDate'] = card_json.get('stopDate', '')
            item['is_whitelist'] = card_json.get('is_whitelist', False)
            item['isOrphanIoT'] = True
            item['source'] = 'iot_card'
            data_list.append(item)
        return jsonify({'code': 200, 'data': data_list})
    except Exception as e:
        return jsonify({'code': 500, 'message': str(e)})
# =========================================================
# 2. 历史数据接口
# =========================================================
@api_bp.route('/device_data_by_date', methods=['GET'])
def device_data_by_date():
    """Return a device's stored payload for one calendar date.

    Query parameters:
        name: Device name.
        date: Date as ``2026_01_13`` or ``2026-01-13``.

    The newest history row whose data_time starts with the date wins; if
    there is none, the device's current payload is used when its
    latest_time falls on that date.

    Returns:
        Code 200 with name, source and content; 400 when a parameter is
        missing; 404 when the device or the data does not exist.
    """
    name = request.args.get('name')
    date_str = request.args.get('date')
    if not name or not date_str:
        return jsonify({'code': 400, 'message': 'Missing name or date'}), 400
    device = Device.query.filter_by(name=name).first()
    if not device:
        return jsonify({'code': 404, 'message': 'Device not found'}), 404

    # Stored timestamps may use either separator (calculate_offset accepts
    # both for latest_time), so look for the date in both spellings.
    dash_date = date_str.replace('_', '-')
    underscore_date = date_str.replace('-', '_')

    # 1. Newest matching history row. autoescape=True makes '%' and '_' in
    #    the user-supplied date match literally instead of as LIKE wildcards.
    history_record = DeviceHistory.query.filter(
        DeviceHistory.device_id == device.id,
        or_(
            DeviceHistory.data_time.startswith(dash_date, autoescape=True),
            DeviceHistory.data_time.startswith(underscore_date, autoescape=True),
        ),
    ).order_by(desc(DeviceHistory.id)).first()

    content = None
    if history_record:
        content = history_record.json_data
    # 2. No history row: fall back to the device's current payload.
    elif device.latest_time and str(device.latest_time).replace('_', '-').startswith(dash_date):
        content = device.json_data

    if content:
        return jsonify({
            'code': 200,
            'name': device.name,
            'source': device.source,
            'content': content
        })
    return jsonify({'code': 404, 'message': 'No data for this date'}), 404
# Stub kept for compatibility with old callers.
@api_bp.route('/device_data_by_date_stub', methods=['GET'])
def device_data_by_date_stub():
    """Delegate to device_data_by_date() and return its response unchanged."""
    return device_data_by_date()
# =========================================================
# 3. 核心控制接口 (检测 & 写入)
# =========================================================
# Matches paths such as /Data/2026_01_13/xxx_15_30_00.csv
_MONITOR_CSV_PATH_RE = re.compile(r'/Data/(\d{4}_\d{2}_\d{2})/\w+_(\d{2}_\d{2}_\d{2})\.csv')


def _monitor_target_time(item):
    """Return the data timestamp for one scraped item.

    Sources containing '106' get their timestamp rebuilt from the CSV path
    in raw_json (``/Data/2026_01_13/xxx_15_30_00.csv`` becomes
    ``2026-01-13 15:30:00``); anything else, or a path that does not
    match, keeps the scraper-supplied 'target_time'.
    """
    target_time = item.get('target_time')
    if '106' not in str(item.get('source', '')):
        return target_time
    raw = item.get('raw_json', {})
    path_str = raw.get('path', '') if isinstance(raw, dict) else ''
    match = _MONITOR_CSV_PATH_RE.search(path_str) if isinstance(path_str, str) else None
    if not match:
        return target_time
    date_part = match.group(1).replace('_', '-')  # 2026-01-13
    time_part = match.group(2).replace('_', ':')  # 15:30:00
    return f"{date_part} {time_part}"


def _monitor_merge_json(stored_text, scraped_json):
    """Overlay scraped fields on the stored JSON and return the merged text.

    Merging (instead of replacing) keeps keys that only exist in the
    stored JSON, such as bound_iccid. Stored text that is NULL, invalid or
    not a JSON object is treated as empty.
    """
    try:
        merged = json.loads(stored_text)
    except (TypeError, ValueError):
        merged = {}
    if not isinstance(merged, dict):
        merged = {}
    if isinstance(scraped_json, dict):
        merged.update(scraped_json)
    return json.dumps(merged, ensure_ascii=False)


@api_bp.route('/run_monitor', methods=['POST'])
def run_monitor():
    """Run the crawler and the IoT sync, then persist both in one commit.

    Phase A runs execute_monitor_task (when available), creates or
    refreshes a Device row for every scraped item and appends a
    DeviceHistory row. Phase B runs sync_iot_data_service (when available)
    and stages the cards through save_iot_cards_to_db.

    Returns:
        JSON ``{'code': 200, 'message': ...}`` where message joins the
        per-phase summaries with " | ". On any exception the session is
        rolled back and ``{'code': 500, 'message': ...}`` is returned (no
        explicit HTTP status is set).
    """
    msg_list = []
    try:
        # --- A. Run the crawler and store its results ---
        if execute_monitor_task:
            task_result = execute_monitor_task()
            if task_result:
                # Tolerate an explicit None under 'device_list'.
                scraped_list = task_result.get('device_list') or []
                current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                count_crawler = 0
                for item in scraped_list:
                    d_name = item.get('name')
                    if not d_name:
                        continue
                    source = item.get('source', '')
                    target_time = _monitor_target_time(item)

                    # Find the device by name, or create it.
                    device = Device.query.filter_by(name=d_name).first()
                    if not device:
                        device = Device(name=d_name, source=source, install_site="")
                        db.session.add(device)
                        db.session.flush()  # assigns device.id for the history row
                    if device.source == 'iot_card':
                        device.source = source

                    # Refresh the scalar fields.
                    device.status = item.get('status')
                    device.current_value = item.get('value')
                    device.latest_time = target_time
                    device.check_time = current_time
                    device.json_data = _monitor_merge_json(
                        device.json_data, item.get('raw_json', {})
                    )
                    device.offset = calculate_offset(device.latest_time)

                    # Append a history row holding the merged payload.
                    db.session.add(DeviceHistory(
                        device_id=device.id,
                        status=item.get('status'),
                        result_data=item.get('value'),
                        data_time=target_time,
                        json_data=device.json_data
                    ))
                    count_crawler += 1
                msg_list.append(f"爬虫更新: {count_crawler}")
            else:
                msg_list.append("爬虫无数据")

        # --- B. Run the IoT sync and stage its results ---
        if sync_iot_data_service:
            iot_list = sync_iot_data_service()
            iot_count, iot_error = save_iot_cards_to_db(iot_list)
            if iot_error:
                msg_list.append(f"IoT错: {iot_error}")
            else:
                msg_list.append(f"IoT更新: {iot_count}")

        db.session.commit()
        return jsonify({'code': 200, 'message': " | ".join(msg_list)})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
# =========================================================
# 4. 白名单、绑定与设备管理 (新功能)
# =========================================================
@api_bp.route('/toggle_whitelist', methods=['POST'])
def toggle_whitelist():
    """Set the is_whitelist flag stored in an IoT card's JSON.

    Expects a JSON object with 'iccid' and 'is_whitelist'. Returns code
    404 when no IoT card has that ICCID, code 200 on success, and code 500
    (after a rollback) when saving fails.
    """
    # silent=True yields None for a missing/invalid JSON body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    iccid = data.get('iccid')
    is_whitelist = data.get('is_whitelist')
    sim_record = Device.query.filter_by(name=iccid, source='iot_card').first()
    if not sim_record:
        return jsonify({'code': 404, 'message': '未找到该卡片'})
    try:
        # Start from an empty object when the stored JSON is unusable, the
        # same way bind_device_card does, so the flag can still be saved.
        try:
            card_json = json.loads(sim_record.json_data)
        except (TypeError, ValueError):
            card_json = {}
        if not isinstance(card_json, dict):
            card_json = {}
        card_json['is_whitelist'] = is_whitelist
        sim_record.json_data = json.dumps(card_json, ensure_ascii=False)
        db.session.commit()
        return jsonify({'code': 200, 'message': '设置成功'})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/sync_iot_cards', methods=['POST'])
def sync_iot_cards():
    """Fetch the IoT card list from the sync service and store it.

    Returns code 200 with the number of cards written and the raw card
    list, or code 500 (HTTP 500) when the service is unavailable, staging
    the cards fails, or the commit fails. Every failure after the fetch
    rolls the session back so partially staged cards are discarded.
    """
    if not sync_iot_data_service:
        return jsonify({'code': 500, 'message': '服务缺失'}), 500
    try:
        iot_list = sync_iot_data_service()
        count, error = save_iot_cards_to_db(iot_list)
        if error:
            # save_iot_cards_to_db may have staged some cards before failing.
            db.session.rollback()
            return jsonify({'code': 500, 'message': error}), 500
        db.session.commit()
        return jsonify({'code': 200, 'message': f'更新{count}张卡', 'data': iot_list})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)}), 500
@api_bp.route('/bind_device_card', methods=['POST'])
def bind_device_card():
    """Record which IoT card (ICCID) is bound to a device.

    Expects a JSON object with 'device_name' and 'iccid'. The ICCID is
    written to the 'bound_iccid' key of the device's JSON; the other keys
    are kept. Returns code 404 when the device does not exist, code 200 on
    success, and code 500 (after a rollback) when saving fails.
    """
    # silent=True yields None for a missing/invalid JSON body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    target = Device.query.filter_by(name=data.get('device_name')).first()
    if not target:
        return jsonify({'code': 404, 'message': '找不到设备'})
    try:
        # Unusable stored JSON (NULL, invalid, not an object) starts empty.
        try:
            d_json = json.loads(target.json_data)
        except (TypeError, ValueError):
            d_json = {}
        if not isinstance(d_json, dict):
            d_json = {}
        d_json['bound_iccid'] = data.get('iccid')
        target.json_data = json.dumps(d_json, ensure_ascii=False)
        db.session.commit()
        return jsonify({'code': 200, 'message': '绑定成功'})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/add_device', methods=['POST'])
def add_device():
    """Create a manually registered device.

    Expects a JSON object with 'name' and an optional 'site'. The device
    starts as source 'manual', status 'offline', with empty JSON data and
    the hidden/maintaining flags cleared. Returns code 400 (HTTP 400) when
    the name is missing, code 200 on success, and code 500 (after a
    rollback) when saving fails.
    """
    # silent=True yields None for a missing/invalid JSON body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    name = data.get('name')
    if not name:
        # Every other endpoint looks devices up by name, so a nameless
        # device could never be addressed again.
        return jsonify({'code': 400, 'message': 'Missing name'}), 400
    try:
        new_device = Device(
            name=name,
            install_site=data.get('site', ''),
            source='manual',
            status='offline',
            check_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            json_data='{}',
            is_hidden=0,
            is_maintaining=0
        )
        db.session.add(new_device)
        db.session.commit()
        return jsonify({'code': 200})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/update_site', methods=['POST'])
def update_site():
    """Change a device's install site.

    Expects a JSON object with 'name' and 'site'. Returns code 404 when no
    device has that name, code 200 on success, and code 500 (after a
    rollback) when saving fails.
    """
    # silent=True yields None for a missing/invalid JSON body instead of raising.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    d = Device.query.filter_by(name=payload.get('name')).first()
    if not d:
        return jsonify({'code': 404})
    try:
        d.install_site = payload.get('site')
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
    return jsonify({'code': 200})
@api_bp.route('/toggle_maintenance', methods=['POST'])
def toggle_maintenance():
    """Set a device's is_maintaining flag.

    Expects a JSON object with 'name' and 'is_maintaining'. Returns code
    404 when no device has that name, code 200 on success, and code 500
    (after a rollback) when saving fails.
    """
    # silent=True yields None for a missing/invalid JSON body instead of raising.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    d = Device.query.filter_by(name=payload.get('name')).first()
    if not d:
        return jsonify({'code': 404})
    try:
        d.is_maintaining = payload.get('is_maintaining')
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
    return jsonify({'code': 200})
@api_bp.route('/toggle_hidden', methods=['POST'])
def toggle_hidden():
    """Set a device's is_hidden flag.

    Expects a JSON object with 'name' and 'is_hidden'. Returns code 404
    when no device has that name, code 200 on success, and code 500 (after
    a rollback) when saving fails.
    """
    # silent=True yields None for a missing/invalid JSON body instead of raising.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    d = Device.query.filter_by(name=payload.get('name')).first()
    if not d:
        return jsonify({'code': 404})
    try:
        d.is_hidden = payload.get('is_hidden')
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
    return jsonify({'code': 200})
# =========================================================
# 5. 日志管理接口 (CRUD)
# =========================================================
@api_bp.route('/logs/list', methods=['GET'])
def get_logs_list():
    """List maintenance logs, newest first, optionally filtered by keyword.

    When the 'keyword' query parameter is non-empty, only logs whose
    device name, content or engineer contains it (SQL LIKE) are returned.
    """
    search_term = request.args.get('keyword', '')
    log_query = MaintenanceLog.query
    if search_term:
        pattern = f"%{search_term}%"
        searchable_columns = (
            MaintenanceLog.device_name,
            MaintenanceLog.content,
            MaintenanceLog.engineer,
        )
        conditions = [column.like(pattern) for column in searchable_columns]
        log_query = log_query.filter(or_(*conditions))
    rows = log_query.order_by(MaintenanceLog.timestamp.desc()).all()
    serialized = [row.to_dict() for row in rows]
    return jsonify({'code': 200, 'data': serialized})
@api_bp.route('/logs/add', methods=['POST'])
def add_log_entry():
    """Create a maintenance log entry.

    Expects a JSON object with optional 'device_name', 'engineer',
    'location' and 'content'; missing fields default to ''. Returns code
    200 on success and code 500 (after a rollback) when saving fails.
    """
    # silent=True yields None for a missing/invalid JSON body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    try:
        new_log = MaintenanceLog(
            device_name=data.get('device_name', ''),
            engineer=data.get('engineer', ''),
            location=data.get('location', ''),
            content=data.get('content', '')
        )
        db.session.add(new_log)
        db.session.commit()
        return jsonify({'code': 200})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/logs/update', methods=['POST'])
def update_log_entry():
    """Update fields of an existing maintenance log entry.

    Expects a JSON object with 'id' plus any of 'device_name', 'content',
    'engineer', 'location'; fields that are absent keep their current
    value. Returns code 404 when the log does not exist, code 200 on
    success, and code 500 (after a rollback) when saving fails.
    """
    # silent=True yields None for a missing/invalid JSON body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    log = MaintenanceLog.query.get(data.get('id'))
    if not log:
        return jsonify({'code': 404})
    try:
        log.device_name = data.get('device_name', log.device_name)
        log.content = data.get('content', log.content)
        log.engineer = data.get('engineer', log.engineer)
        log.location = data.get('location', log.location)
        db.session.commit()
        return jsonify({'code': 200})
    except Exception as e:
        db.session.rollback()
        # Report the reason, as the other endpoints in this module do.
        return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/logs/delete', methods=['POST'])
def delete_log_entry():
    """Delete a maintenance log entry by id.

    Expects a JSON object with 'id'. Returns code 404 when the log does
    not exist, code 200 on success, and code 500 (after a rollback) when
    the delete fails.
    """
    # silent=True yields None for a missing/invalid JSON body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    log = MaintenanceLog.query.get(data.get('id'))
    if not log:
        return jsonify({'code': 404})
    try:
        db.session.delete(log)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
    return jsonify({'code': 200})