# Listing metadata (viewer residue, not part of the program): 639 lines, 22 KiB, Python
import os
|
|
import json
|
|
import re
|
|
from datetime import datetime
|
|
from flask import Blueprint, jsonify, request
|
|
from sqlalchemy import desc, or_
|
|
from extensions import db
|
|
from models import Device, DeviceHistory, MaintenanceLog
|
|
|
|
# =========================================================
|
|
# 模块动态导入 (防止循环引用或缺失报错)
|
|
# =========================================================
|
|
try:
|
|
from services.core import execute_monitor_task
|
|
except ImportError:
|
|
execute_monitor_task = None
|
|
|
|
try:
|
|
from services.iot_api import sync_iot_data_service
|
|
except ImportError:
|
|
sync_iot_data_service = None
|
|
|
|
api_bp = Blueprint('api', __name__, url_prefix='/api')
|
|
|
|
|
|
# =========================================================
|
|
# 0. 核心算法区:数据质量分析与辅助函数
|
|
# =========================================================
|
|
|
|
def calculate_offset(latest_time_str):
    """Describe how many days the latest data lags behind today.

    The result is a display string for the front end.

    Args:
        latest_time_str: Timestamp of the newest data. Only the leading date
            part is used; ``2026_01_13`` and ``2026-01-13`` are both accepted,
            optionally followed by a time separated by whitespace or ``T``.

    Returns:
        ``"从未同步"`` when the value is falsy or ``"N/A"``;
        ``"当天"`` when the date equals today's local date;
        ``"滞后 N 天"`` otherwise (N is negative for a future date);
        ``"时间解析失败"`` when the value cannot be parsed as a date.
    """
    if not latest_time_str or latest_time_str == "N/A":
        return "从未同步"

    try:
        # Keep only the date token, then normalise 2026_01_13 -> 2026-01-13.
        date_token = str(latest_time_str).split()[0].split('T')[0]
        target = datetime.strptime(date_token.replace('_', '-'), "%Y-%m-%d").date()
    except (IndexError, ValueError):
        # IndexError: whitespace-only input; ValueError: not a valid date.
        return "时间解析失败"

    diff = (datetime.now().date() - target).days
    return "当天" if diff == 0 else f"滞后 {diff} 天"
|
|
|
|
|
|
def _parse_data_time(data_time_str):
    """Return the sample time as a ``datetime``, or ``None`` if unparseable."""
    if isinstance(data_time_str, datetime):
        return data_time_str
    clean_time = str(data_time_str).replace('_', '-')
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M"):
        try:
            return datetime.strptime(clean_time, fmt)
        except ValueError:
            continue
    return None


def _check_106_csv(content_data):
    """Grade CSV-style content of a '106' device; return 'ok'/'warning'/'error'."""
    if isinstance(content_data, dict):
        text_content = content_data.get('content', str(content_data))
    else:
        text_content = str(content_data)

    status = 'ok'
    for line in text_content.split('\n'):
        if 'OSIFBeta' not in line:
            continue

        parts = line.split(',')
        if len(parts) < 10:
            continue

        try:
            int_time = int(parts[2])
        except ValueError:
            continue
        # Only rows whose third field reaches this threshold are graded.
        # NOTE(review): 66534 is kept verbatim; confirm it was not meant to be
        # 65534/65535.
        if int_time < 66534:
            continue

        data_points = []
        for field in parts[3:]:
            try:
                data_points.append(float(field))
            except ValueError:
                pass  # non-numeric trailing fields are ignored
        if not data_points:
            continue

        # Any single reading below 100 is an immediate error.
        if any(val < 100 for val in data_points):
            return 'error'

        # Five or more consecutive readings in [100, 500] raise a warning.
        consecutive_warning = 0
        for val in data_points:
            if 100 <= val <= 500:
                consecutive_warning += 1
                if consecutive_warning >= 5:
                    status = 'warning'
            else:
                consecutive_warning = 0
    return status


def _check_spec_json(content_data):
    """Grade JSON content carrying 'downspec'/'upspec'; return 'ok' or 'error'."""
    if not isinstance(content_data, dict):
        return 'ok'

    specs = content_data.get('downspec', []) or content_data.get('upspec', [])
    if not isinstance(specs, list):
        return 'ok'

    consecutive_low = 0
    for val in specs:
        if not isinstance(val, (int, float)):
            continue  # non-numeric entries neither extend nor reset the run
        if val < 500:
            consecutive_low += 1
            if consecutive_low >= 2:
                return 'error'
        else:
            consecutive_low = 0
    return 'ok'


def check_data_quality(content_data, source_type, data_time_str=None):
    """Grade a device's latest payload as 'ok', 'warning' or 'error'.

    Rules, applied in order:

    1. Empty content and sources equal to ``'iot_card'`` are always 'ok'.
    2. If ``data_time_str`` parses and its hour is outside 08:00-17:00
       (``hour < 8`` or ``hour >= 17``), the result is 'ok'.
    3. Sources whose string contains ``'106'`` are graded from CSV text
       (lines containing ``OSIFBeta``); all other sources are graded from the
       ``downspec`` list, falling back to ``upspec`` when it is empty.

    Args:
        content_data: Parsed payload (dict) or raw text.
        source_type: Device source identifier; compared as a string.
        data_time_str: Sample time as ``YYYY-MM-DD HH:MM[:SS]`` (underscores
            are accepted in place of dashes) or a ``datetime``. ``None`` or
            ``'N/A'`` skips rule 2.

    Returns:
        ``'ok'``, ``'warning'`` or ``'error'``. Any unexpected failure while
        grading yields ``'ok'`` (best effort, never raises).
    """
    if not content_data:
        return 'ok'

    if str(source_type) == 'iot_card':
        return 'ok'

    if data_time_str and data_time_str != 'N/A':
        dt = _parse_data_time(data_time_str)
        if dt is not None and (dt.hour < 8 or dt.hour >= 17):
            return 'ok'

    try:
        if '106' in str(source_type):
            return _check_106_csv(content_data)
        return _check_spec_json(content_data)
    except Exception:
        # Malformed payloads must not break the overview; treat as healthy.
        return 'ok'
|
|
|
|
|
|
def save_iot_cards_to_db(card_list):
    """Upsert IoT SIM-card records into the session (incremental merge).

    Each card is stored as a ``Device`` row with ``source='iot_card'`` and the
    ICCID as its name. Fields coming from the API overwrite the same keys in
    the row's stored JSON; any other stored keys (for example
    ``is_whitelist``) are preserved.

    This function only adds/flushes to ``db.session``; it does not commit and
    does not roll back. The caller owns the transaction.

    Args:
        card_list: Iterable of dict-like cards. The ICCID is read from
            ``'iccid'`` or, failing that, ``'card_id'``; cards without either
            are skipped.

    Returns:
        ``(count, None)`` on success, where ``count`` is the number of cards
        written, or ``(0, error_message)`` if any exception occurred.
    """
    if not card_list:
        return 0, None

    update_count = 0
    try:
        for card in card_list:
            iccid = card.get('iccid') or card.get('card_id')
            if not iccid:
                continue

            # 1. Find the existing SIM record, or create one.
            sim_record = Device.query.filter_by(name=iccid, source='iot_card').first()
            merged = {}
            if not sim_record:
                sim_record = Device(name=iccid, source='iot_card', install_site="IoT库")
                db.session.add(sim_record)
                db.session.flush()
            elif sim_record.json_data:
                try:
                    merged = json.loads(sim_record.json_data)
                except (TypeError, ValueError):
                    merged = {}
                if not isinstance(merged, dict):
                    # Stored JSON that is not an object cannot be merged into.
                    merged = {}

            # 2. Overlay the fields supplied by the API.
            merged.update({
                "iccid": iccid,
                "usedTraffic": str(card.get('usedTraffic') or '0'),
                "stopDate": card.get('stopDate', 'N/A'),
                "cardStatus": card.get('cardStatus'),
                "tag": card.get('tag', ''),
                # Human-readable status description supplied with the card.
                "statusDesc": card.get('statusDesc', '未知'),
            })

            # 3. Keep an existing whitelist flag; default to False otherwise.
            merged.setdefault('is_whitelist', False)

            # 4. Write the columns back.
            sim_record.status = str(card.get('cardStatus', ''))
            sim_record.json_data = json.dumps(merged, ensure_ascii=False)
            sim_record.check_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            update_count += 1

        return update_count, None
    except Exception as e:
        return 0, str(e)
|
|
|
|
|
|
# =========================================================
|
|
# 1. 基础接口 (认证 & 概览)
|
|
# =========================================================
|
|
|
|
@api_bp.route('/login', methods=['POST'])
def login():
    """Authenticate against the built-in administrator account.

    Expects a JSON object with ``username`` and ``password``.

    Returns:
        HTTP 200 with ``code`` 200, a token and the user info on success;
        HTTP 401 with ``code`` 401 otherwise. A body that is not a JSON
        object is treated as missing credentials.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        # A null or non-object body would otherwise raise on ``.get``.
        data = {}

    # NOTE(review): the credentials and the token are hard-coded and the
    # token is identical for every login. Move them to configuration/secret
    # storage and issue per-session tokens.
    if data.get('username') == 'admin' and data.get('password') == 'licahk':
        return jsonify({
            'code': 200,
            'message': '登录成功',
            'token': 'super-admin-token-2026',
            'user': {'username': 'admin', 'role': 'administrator'}
        })
    return jsonify({'code': 401, 'message': '用户名或密码错误'}), 401
|
|
|
|
|
|
def _decode_json_object(raw):
    """Decode *raw* as a JSON object; return ``None`` when it is not one."""
    try:
        decoded = json.loads(raw)
    except (TypeError, ValueError):
        return None
    return decoded if isinstance(decoded, dict) else None


@api_bp.route('/devices_overview', methods=['GET'])
def devices_overview():
    """Return every device plus every IoT card in one list.

    The response ``data`` list contains, in order:

    * one entry per device whose source is not ``'iot_card'``, enriched with
      the bound card's traffic / stop date / status description / whitelist
      flag (when ``bound_iccid`` in the device JSON names a known card) and a
      ``data_quality`` grade from ``check_data_quality``;
    * one entry per IoT card record, flagged with ``isOrphanIoT = True``.

    Returns:
        JSON ``{'code': 200, 'data': [...]}``, or
        ``{'code': 500, 'message': ...}`` if anything raises.
    """
    try:
        # A. Load IoT cards and decode each card's JSON exactly once.
        iot_records = Device.query.filter_by(source='iot_card').all()
        decoded_cards = [(rec, _decode_json_object(rec.json_data)) for rec in iot_records]
        # Cards whose JSON cannot be decoded are not available for binding.
        iot_map = {rec.name: payload for rec, payload in decoded_cards if payload is not None}

        # B. Real devices.
        devices = Device.query.filter(Device.source != 'iot_card').all()
        data_list = []

        for d in devices:
            item = d.to_dict()

            # Normalise latest_time to "YYYY-MM-DD HH:MM:SS" where possible.
            raw_time = d.latest_time
            if raw_time:
                if hasattr(raw_time, 'strftime'):
                    item['latest_time'] = raw_time.strftime("%Y-%m-%d %H:%M:%S")
                else:
                    s = str(raw_time).strip()
                    if '_' in s and ':' not in s:
                        # Date-only underscore form, e.g. 2026_01_13.
                        item['latest_time'] = s.replace('_', '-') + " 00:00:00"
                    else:
                        item['latest_time'] = s

            parsed_content = _decode_json_object(d.json_data) or {}

            # Binding defaults, used when no known card is bound.
            bound_iccid = parsed_content.get('bound_iccid')
            item['usedTraffic'] = None
            item['stopDate'] = None
            item['statusDesc'] = None
            item['isBound'] = False
            item['bound_iccid'] = bound_iccid
            item['is_whitelist'] = False

            # Inject the bound card's details.
            if bound_iccid and bound_iccid in iot_map:
                card_info = iot_map[bound_iccid]
                item['usedTraffic'] = card_info.get('usedTraffic')
                item['stopDate'] = card_info.get('stopDate')
                item['is_whitelist'] = card_info.get('is_whitelist', False)
                item['statusDesc'] = card_info.get('statusDesc')
                item['isBound'] = True

            item['data_quality'] = check_data_quality(parsed_content, d.source, d.latest_time)
            data_list.append(item)

        # C. IoT card rows themselves (for the card-pool management view).
        for rec, payload in decoded_cards:
            j = payload or {}
            item = rec.to_dict()
            item['usedTraffic'] = j.get('usedTraffic', '0')
            item['stopDate'] = j.get('stopDate', '')
            item['is_whitelist'] = j.get('is_whitelist', False)
            item['statusDesc'] = j.get('statusDesc', '未知')
            item['isOrphanIoT'] = True
            item['source'] = 'iot_card'
            data_list.append(item)

        return jsonify({'code': 200, 'data': data_list})
    except Exception as e:
        return jsonify({'code': 500, 'message': str(e)})
|
|
|
|
|
|
# =========================================================
|
|
# 2. 历史数据接口
|
|
# =========================================================
|
|
|
|
@api_bp.route('/device_data_by_date', methods=['GET'])
def device_data_by_date():
    """Return a device's stored payload for a given date.

    Query parameters:
        name: Device name.
        date: Date prefix; underscores are accepted in place of dashes.

    The newest history row (highest id) whose ``data_time`` starts with the
    date is preferred. If there is none, the device's current payload is
    used when its ``latest_time`` starts with the date.

    Returns:
        HTTP 200 with ``name``, ``source`` and ``content``; HTTP 400 when a
        parameter is missing; HTTP 404 when the device or the data is absent.
    """
    name = request.args.get('name')
    date_str = request.args.get('date')
    if not name or not date_str:
        return jsonify({'code': 400, 'message': 'Missing name or date'}), 400

    device = Device.query.filter_by(name=name).first()
    if not device:
        return jsonify({'code': 404, 'message': 'Device not found'}), 404

    query_date = date_str.replace('_', '-')
    # Timestamps may be stored with underscores (e.g. 2026_01_13), so match
    # both spellings; the dash-only pattern alone would miss those rows.
    underscore_date = query_date.replace('-', '_')

    history_record = DeviceHistory.query.filter(
        DeviceHistory.device_id == device.id,
        or_(
            DeviceHistory.data_time.like(f"{query_date}%"),
            DeviceHistory.data_time.like(f"{underscore_date}%"),
        )
    ).order_by(desc(DeviceHistory.id)).first()

    content = None
    if history_record:
        content = history_record.json_data
    elif device.latest_time:
        # str() also covers datetime values, which have no startswith().
        latest = str(device.latest_time).strip().replace('_', '-')
        if latest.startswith(query_date):
            content = device.json_data

    if content:
        return jsonify({
            'code': 200,
            'name': device.name,
            'source': device.source,
            'content': content
        })
    return jsonify({'code': 404, 'message': 'No data for this date'}), 404
|
|
|
|
|
|
@api_bp.route('/device_data_by_date_stub', methods=['GET'])
def device_data_by_date_stub():
    """Alias route: answer exactly as ``/device_data_by_date`` does."""
    response = device_data_by_date()
    return response
|
|
|
|
|
|
# =========================================================
|
|
# 3. 核心控制接口 (检测 & 写入)
|
|
# =========================================================
|
|
|
|
# Matches ".../Data/2026_01_13/<prefix>_10_20_30.csv"; groups: date, time.
_CSV_PATH_TIME_RE = re.compile(r'/Data/(\d{4}_\d{2}_\d{2})/\w+_(\d{2}_\d{2}_\d{2})\.csv')


@api_bp.route('/run_monitor', methods=['POST'])
def run_monitor():
    """Run the crawler and the IoT sync, then persist everything at once.

    Step A (only if ``execute_monitor_task`` was importable): each scraped
    item updates or creates its ``Device`` row, merges the item's
    ``raw_json`` into the stored JSON, recomputes ``offset`` and appends a
    ``DeviceHistory`` row. For sources containing ``'106'`` the data time is
    taken from the CSV path in ``raw_json['path']`` when it matches the
    expected layout.

    Step B (only if ``sync_iot_data_service`` was importable): fetched cards
    are stored through ``save_iot_cards_to_db``.

    Both steps share a single commit; any exception rolls the session back.

    Returns:
        JSON ``{'code': 200, 'message': <summary joined by " | ">}`` or
        ``{'code': 500, 'message': <error>}``.
    """
    msg_list = []

    try:
        # --- A. Crawl and store ---
        if execute_monitor_task:
            task_result = execute_monitor_task()
            if task_result:
                scraped_list = task_result.get('device_list', [])
                current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

                count_crawler = 0
                for item in scraped_list:
                    d_name = item.get('name')
                    if not d_name:
                        continue

                    d_raw = item.get('raw_json', {})
                    source = item.get('source', '')
                    target_time = item.get('target_time')

                    # For '106' sources, prefer the timestamp embedded in the
                    # CSV path; keep the scraped one when it is absent.
                    if '106' in str(source) and isinstance(d_raw, dict):
                        path_str = d_raw.get('path', '')
                        match = _CSV_PATH_TIME_RE.search(path_str) if isinstance(path_str, str) else None
                        if match:
                            date_part = match.group(1).replace('_', '-')
                            time_part = match.group(2).replace('_', ':')
                            target_time = f"{date_part} {time_part}"

                    device = Device.query.filter_by(name=d_name).first()
                    if not device:
                        device = Device(name=d_name, source=source, install_site="")
                        db.session.add(device)
                        # Flush so device.id exists for the history row below.
                        db.session.flush()

                    # A row first created as an IoT card takes the real source.
                    if device.source == 'iot_card':
                        device.source = source

                    device.status = item.get('status')
                    device.current_value = item.get('value')
                    device.latest_time = target_time
                    device.check_time = current_time

                    # Merge new raw data over the stored JSON so keys such as
                    # bound_iccid survive.
                    old_json = {}
                    if device.json_data:
                        try:
                            old_json = json.loads(device.json_data)
                        except (TypeError, ValueError):
                            old_json = {}
                        if not isinstance(old_json, dict):
                            old_json = {}
                    if isinstance(d_raw, dict):
                        old_json.update(d_raw)

                    device.json_data = json.dumps(old_json, ensure_ascii=False)
                    device.offset = calculate_offset(device.latest_time)

                    new_history = DeviceHistory(
                        device_id=device.id,
                        status=item.get('status'),
                        result_data=item.get('value'),
                        data_time=target_time,
                        json_data=device.json_data
                    )
                    db.session.add(new_history)
                    count_crawler += 1

                msg_list.append(f"爬虫更新: {count_crawler}")
            else:
                msg_list.append("爬虫无数据")

        # --- B. IoT sync (written to the same session) ---
        if sync_iot_data_service:
            iot_list = sync_iot_data_service()
            c, e = save_iot_cards_to_db(iot_list)
            if e:
                # NOTE(review): cards processed before the failure stay in the
                # session and are committed below; confirm this is intended.
                msg_list.append(f"IoT错: {e}")
            else:
                msg_list.append(f"IoT更新: {c}")

        db.session.commit()
        return jsonify({'code': 200, 'message': " | ".join(msg_list)})

    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
|
|
|
|
|
|
# =========================================================
|
|
# 4. 白名单、绑定与设备管理
|
|
# =========================================================
|
|
|
|
@api_bp.route('/toggle_whitelist', methods=['POST'])
def toggle_whitelist():
    """Set the ``is_whitelist`` flag stored in an IoT card's JSON.

    Expects a JSON object with ``iccid`` and ``is_whitelist``.

    Returns:
        ``{'code': 200, ...}`` on success, ``{'code': 404, ...}`` when the
        card does not exist, ``{'code': 500, ...}`` if saving fails.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        # A null or non-object body would otherwise raise on ``.get``.
        data = {}
    iccid = data.get('iccid')
    is_whitelist = data.get('is_whitelist')

    sim_record = Device.query.filter_by(name=iccid, source='iot_card').first()
    if not sim_record:
        return jsonify({'code': 404, 'message': '未找到该卡片'})

    try:
        # Missing or corrupt stored JSON starts from an empty object instead
        # of failing the request.
        try:
            j = json.loads(sim_record.json_data)
        except (TypeError, ValueError):
            j = {}
        if not isinstance(j, dict):
            j = {}

        j['is_whitelist'] = is_whitelist
        sim_record.json_data = json.dumps(j, ensure_ascii=False)
        db.session.commit()
        return jsonify({'code': 200, 'message': '设置成功'})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
|
|
|
|
|
|
@api_bp.route('/sync_iot_cards', methods=['POST'])
def sync_iot_cards():
    """Fetch IoT cards from the sync service and store them.

    Returns:
        HTTP 200 with the number of updated cards and the fetched list, or
        HTTP 500 when the service is unavailable, storing fails, or anything
        raises. Failures roll the session back so that a partially processed
        batch is not left pending.
    """
    if not sync_iot_data_service:
        return jsonify({'code': 500, 'message': '服务缺失'}), 500

    try:
        iot_list = sync_iot_data_service()
        c, e = save_iot_cards_to_db(iot_list)
        if e:
            # Discard cards added to the session before the failure.
            db.session.rollback()
            return jsonify({'code': 500, 'message': e}), 500
        db.session.commit()
        return jsonify({'code': 200, 'message': f'更新{c}张卡', 'data': iot_list})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)}), 500
|
|
|
|
|
|
@api_bp.route('/bind_device_card', methods=['POST'])
def bind_device_card():
    """Record which ICCID is bound to a device.

    Expects a JSON object with ``device_name`` and ``iccid``. The ICCID is
    written to ``bound_iccid`` inside the device's stored JSON; other stored
    keys are kept.

    Returns:
        ``{'code': 200, ...}`` on success, ``{'code': 404, ...}`` when the
        device does not exist, ``{'code': 500, ...}`` if saving fails.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        # A null or non-object body would otherwise raise on ``.get``.
        data = {}

    target = Device.query.filter_by(name=data.get('device_name')).first()
    if not target:
        return jsonify({'code': 404, 'message': '找不到设备'})

    try:
        # Missing or corrupt stored JSON starts from an empty object.
        try:
            d_json = json.loads(target.json_data)
        except (TypeError, ValueError):
            d_json = {}
        if not isinstance(d_json, dict):
            d_json = {}

        d_json['bound_iccid'] = data.get('iccid')
        target.json_data = json.dumps(d_json, ensure_ascii=False)
        db.session.commit()
        return jsonify({'code': 200, 'message': '绑定成功'})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
|
|
|
|
|
|
@api_bp.route('/add_device', methods=['POST'])
def add_device():
    """Create a manually registered device.

    Expects a JSON object with ``name`` and optionally ``site``. The device
    is created with source ``'manual'``, status ``'offline'``, empty JSON
    data, and is neither hidden nor under maintenance.

    Returns:
        ``{'code': 200}`` on success, ``{'code': 400, ...}`` when the name is
        missing, ``{'code': 500, ...}`` if saving fails.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        # A null or non-object body would otherwise raise on ``.get``.
        data = {}

    name = data.get('name')
    if not name:
        # Devices are looked up by name everywhere else; refuse nameless ones.
        return jsonify({'code': 400, 'message': '设备名称不能为空'})

    try:
        new_device = Device(
            name=name,
            install_site=data.get('site', ''),
            source='manual',
            status='offline',
            check_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            json_data='{}',
            is_hidden=0,
            is_maintaining=0
        )
        db.session.add(new_device)
        db.session.commit()
        return jsonify({'code': 200})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
|
|
|
|
|
|
@api_bp.route('/update_site', methods=['POST'])
def update_site():
    """Set a device's install site.

    Expects a JSON object with ``name`` and ``site``.

    Returns:
        ``{'code': 200}`` when the device was updated, ``{'code': 404}`` when
        no device has that name.
    """
    payload = request.json
    if not isinstance(payload, dict):
        # A null or non-object body would otherwise raise on ``.get``.
        payload = {}

    device = Device.query.filter_by(name=payload.get('name')).first()
    if not device:
        return jsonify({'code': 404})

    device.install_site = payload.get('site')
    db.session.commit()
    return jsonify({'code': 200})
|
|
|
|
|
|
@api_bp.route('/toggle_maintenance', methods=['POST'])
def toggle_maintenance():
    """Set a device's ``is_maintaining`` flag.

    Expects a JSON object with ``name`` and ``is_maintaining``.

    Returns:
        ``{'code': 200}`` when the device was updated, ``{'code': 404}`` when
        no device has that name.
    """
    payload = request.json
    if not isinstance(payload, dict):
        # A null or non-object body would otherwise raise on ``.get``.
        payload = {}

    device = Device.query.filter_by(name=payload.get('name')).first()
    if not device:
        return jsonify({'code': 404})

    device.is_maintaining = payload.get('is_maintaining')
    db.session.commit()
    return jsonify({'code': 200})
|
|
|
|
|
|
@api_bp.route('/toggle_hidden', methods=['POST'])
def toggle_hidden():
    """Set a device's ``is_hidden`` flag.

    Expects a JSON object with ``name`` and ``is_hidden``.

    Returns:
        ``{'code': 200}`` when the device was updated, ``{'code': 404}`` when
        no device has that name.
    """
    payload = request.json
    if not isinstance(payload, dict):
        # A null or non-object body would otherwise raise on ``.get``.
        payload = {}

    device = Device.query.filter_by(name=payload.get('name')).first()
    if not device:
        return jsonify({'code': 404})

    device.is_hidden = payload.get('is_hidden')
    db.session.commit()
    return jsonify({'code': 200})
|
|
|
|
|
|
# =========================================================
|
|
# 5. 日志管理接口 (CRUD)
|
|
# =========================================================
|
|
|
|
@api_bp.route('/logs/list', methods=['GET'])
def get_logs_list():
    """List maintenance logs, newest first, optionally filtered by keyword.

    The optional ``keyword`` query parameter is matched as a substring
    against the device name, the content and the engineer of each log.

    Returns:
        JSON ``{'code': 200, 'data': [...]}`` with one dict per log.
    """
    search_term = request.args.get('keyword', '')
    log_query = MaintenanceLog.query

    if search_term:
        pattern = f"%{search_term}%"
        searchable_columns = (
            MaintenanceLog.device_name,
            MaintenanceLog.content,
            MaintenanceLog.engineer,
        )
        log_query = log_query.filter(
            or_(*[column.like(pattern) for column in searchable_columns])
        )

    ordered = log_query.order_by(MaintenanceLog.timestamp.desc())
    rows = [entry.to_dict() for entry in ordered.all()]
    return jsonify({'code': 200, 'data': rows})
|
|
|
|
|
|
@api_bp.route('/logs/add', methods=['POST'])
def add_log_entry():
    """Create a maintenance log entry.

    Expects a JSON object with optional ``device_name``, ``engineer``,
    ``location`` and ``content``; each defaults to an empty string.

    Returns:
        ``{'code': 200}`` on success, ``{'code': 500, ...}`` if saving fails.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        # A null or non-object body would otherwise raise on ``.get``.
        data = {}

    try:
        new_log = MaintenanceLog(
            device_name=data.get('device_name', ''),
            engineer=data.get('engineer', ''),
            location=data.get('location', ''),
            content=data.get('content', '')
        )
        db.session.add(new_log)
        db.session.commit()
        return jsonify({'code': 200})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
|
|
|
|
|
|
@api_bp.route('/logs/update', methods=['POST'])
def update_log_entry():
    """Update fields of an existing maintenance log entry.

    Expects a JSON object with ``id`` and any of ``device_name``,
    ``content``, ``engineer``, ``location``. Keys that are absent keep their
    current value.

    Returns:
        ``{'code': 200}`` on success, ``{'code': 404}`` when the log does not
        exist, ``{'code': 500, ...}`` if saving fails.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        # A null or non-object body would otherwise raise on ``.get``.
        data = {}

    log = MaintenanceLog.query.get(data.get('id'))
    if not log:
        return jsonify({'code': 404})

    try:
        log.device_name = data.get('device_name', log.device_name)
        log.content = data.get('content', log.content)
        log.engineer = data.get('engineer', log.engineer)
        log.location = data.get('location', log.location)
        db.session.commit()
        return jsonify({'code': 200})
    except Exception as e:
        db.session.rollback()
        # Report the cause, as the other handlers in this file do.
        return jsonify({'code': 500, 'message': str(e)})
|
|
|
|
|
|
@api_bp.route('/logs/delete', methods=['POST'])
def delete_log_entry():
    """Delete a maintenance log entry.

    Expects a JSON object with ``id``.

    Returns:
        ``{'code': 200}`` when the log was deleted, ``{'code': 404}`` when no
        log has that id.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        # A null or non-object body would otherwise raise on ``.get``.
        data = {}

    log = MaintenanceLog.query.get(data.get('id'))
    if not log:
        return jsonify({'code': 404})

    db.session.delete(log)
    db.session.commit()
    return jsonify({'code': 200})