Files
ZDXX/2_1banben/routes/api.py

696 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import re
from datetime import datetime
from flask import Blueprint, jsonify, request
from sqlalchemy import desc, or_
from extensions import db
from models import Device, DeviceHistory, MaintenanceLog
# =========================================================
# Optional service imports (original note: guards against circular
# imports or missing modules). When a service module cannot be imported,
# its entry point is set to None and the routes below check for that
# before calling it.
# =========================================================
try:
    from services.core import execute_monitor_task
except ImportError:
    execute_monitor_task = None
try:
    from services.iot_api import sync_iot_data_service
except ImportError:
    sync_iot_data_service = None
# Blueprint carrying every route defined in this file, mounted under /api.
api_bp = Blueprint('api', __name__, url_prefix='/api')
# =========================================================
# 0. 核心算法区:数据质量分析与辅助函数
# =========================================================
def calculate_offset(latest_time_str):
    """
    Describe how many days the latest data timestamp lags behind today.

    Used by the front end to show whether a device's data is stale.

    Args:
        latest_time_str: Value whose first whitespace-separated token is a
            date written as ``YYYY-MM-DD`` or ``YYYY_MM_DD``. Anything after
            that token (e.g. a time of day) is ignored.

    Returns:
        "从未同步" when the value is empty or "N/A"; "当天" when the date is
        today; "滞后 N" where N is the day difference otherwise; and
        "时间解析失败" when the date token cannot be parsed.
    """
    if not latest_time_str or latest_time_str == "N/A":
        return "从未同步"
    try:
        # Accept both the 2026_01_13 and the 2026-01-13 spelling.
        date_token = str(latest_time_str).split()[0].replace('_', '-')
        target = datetime.strptime(date_token, "%Y-%m-%d").date()
    except (ValueError, IndexError):
        # ValueError: token is not a valid date.
        # IndexError: whitespace-only input, so split() produced no tokens.
        return "时间解析失败"
    diff = (datetime.now().date() - target).days
    return "当天" if diff == 0 else f"滞后 {diff}"
def _parse_quality_time(data_time_str):
    """Parse a data timestamp with a time-of-day part; return None if it has none."""
    clean_time = str(data_time_str).replace('_', '-')
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M"):
        try:
            return datetime.strptime(clean_time, fmt)
        except ValueError:
            continue
    return None


def _check_106_quality(content_data):
    """Grade CSV-style '106' content; returns 'ok', 'warning' or 'error'."""
    if isinstance(content_data, dict):
        text_content = content_data.get('content', str(content_data))
    else:
        text_content = str(content_data)
    if 'OSIFBeta' not in text_content:
        return 'ok'

    status = 'ok'
    for line in text_content.split('\n'):
        if 'OSIFBeta' not in line:
            continue
        parts = line.split(',')
        if len(parts) < 10:
            continue
        try:
            int_time = int(parts[2])
        except ValueError:
            continue
        # Only rows whose third column is at least 66534 are graded.
        # NOTE(review): the meaning of this threshold is not visible here.
        if int_time < 66534:
            continue

        data_points = []
        for raw_value in parts[3:]:
            try:
                data_points.append(float(raw_value))
            except ValueError:
                pass  # non-numeric columns are ignored
        if not data_points:
            continue

        # Any single reading below 100 is an immediate error.
        if any(value < 100 for value in data_points):
            return 'error'

        # Five or more consecutive readings in [100, 500] raise a warning;
        # later lines can still upgrade the result to 'error'.
        run_length = 0
        for value in data_points:
            if 100 <= value <= 500:
                run_length += 1
                if run_length >= 5:
                    status = 'warning'
            else:
                run_length = 0
    return status


def _check_82_quality(content_data):
    """Grade JSON-style content via its 'downspec'/'upspec' list; 'ok' or 'error'."""
    if not isinstance(content_data, dict):
        return 'ok'
    # Fall back to 'upspec' when 'downspec' is missing or empty.
    specs = content_data.get('downspec', []) or content_data.get('upspec', [])
    if specs and isinstance(specs, list):
        run_length = 0
        for value in specs:
            if not isinstance(value, (int, float)):
                continue  # skipped without resetting the run
            if value < 500:
                run_length += 1
                if run_length >= 2:
                    return 'error'
            else:
                run_length = 0
    return 'ok'


def check_data_quality(content_data, source_type, data_time_str=None):
    """
    Grade a device's latest data as 'ok', 'warning' or 'error'.

    The result drives the device status colour (green / yellow / red).

    Rules, in order:
      1. Empty content is 'ok'.
      2. Source 'iot_card' is never graded and is always 'ok'.
      3. If ``data_time_str`` parses to a time outside 08:00-17:00
         (hour < 8 or hour >= 17) the result is 'ok'. Timestamps that
         carry no parsable time of day do not trigger this rule.
      4. Sources whose string form contains '106' are graded as CSV text;
         every other source is graded as a JSON dict.

    Args:
        content_data: Parsed device payload (dict) or raw text.
        source_type: Source identifier; compared via ``str()``.
        data_time_str: Optional timestamp, ``YYYY-MM-DD HH:MM[:SS]`` with
            either '-' or '_' separators in the date.

    Returns:
        'ok', 'warning' or 'error'. Unexpected errors while grading are
        treated as 'ok' (best effort).
    """
    if not content_data:
        return 'ok'
    if str(source_type) == 'iot_card':
        return 'ok'

    if data_time_str and data_time_str != 'N/A':
        data_time = _parse_quality_time(data_time_str)
        if data_time is not None and (data_time.hour < 8 or data_time.hour >= 17):
            return 'ok'

    checker = _check_106_quality if '106' in str(source_type) else _check_82_quality
    try:
        return checker(content_data)
    except Exception:
        # Grading is best effort: malformed payloads must not break callers.
        return 'ok'
def save_iot_cards_to_db(card_list):
    """
    Stage IoT SIM-card data into the Device table (incremental merge).

    For every card that has an ``iccid`` (or ``card_id``), the matching
    ``Device`` row with ``source='iot_card'`` is looked up by name, or
    created with install_site "IoT库". The API fields are merged into the
    row's stored JSON so keys that the API does not provide (such as
    ``is_whitelist`` or anything else saved earlier) are preserved.

    This function only adds/flushes on ``db.session``; it never commits.
    The caller decides whether to commit or roll back.

    Args:
        card_list: Iterable of dict-like card records from the IoT service.

    Returns:
        ``(updated_count, None)`` on success, or ``(0, error_message)`` if
        any exception occurred while processing.
    """
    if not card_list:
        return 0, None
    update_count = 0
    try:
        for card in card_list:
            iccid = card.get('iccid') or card.get('card_id')
            if not iccid:
                continue

            # 1. Find the existing SIM record, or create one.
            sim_record = Device.query.filter_by(name=iccid, source='iot_card').first()
            old_json = {}
            if not sim_record:
                sim_record = Device(name=iccid, source='iot_card', install_site="IoT库")
                db.session.add(sim_record)
                db.session.flush()
            elif sim_record.json_data:
                try:
                    loaded = json.loads(sim_record.json_data)
                except (TypeError, ValueError):
                    loaded = None
                # Only a JSON object can be merged into; anything else
                # (invalid text, null, a list...) restarts from empty.
                if isinstance(loaded, dict):
                    old_json = loaded

            # 2. Fields taken from the API response.
            api_updates = {
                "iccid": iccid,
                "usedTraffic": str(card.get('usedTraffic') or '0'),
                "stopDate": card.get('stopDate', 'N/A'),
                "cardStatus": card.get('cardStatus'),
                "tag": card.get('tag', ''),
                # Human-readable status description supplied with the card.
                "statusDesc": card.get('statusDesc', '未知'),
            }

            # 3. Merge, keeping previously stored keys such as is_whitelist.
            old_json.update(api_updates)
            if 'is_whitelist' not in old_json:
                old_json['is_whitelist'] = False

            # 4. Write the columns back.
            sim_record.status = str(card.get('cardStatus', ''))
            sim_record.json_data = json.dumps(old_json, ensure_ascii=False)
            sim_record.check_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            update_count += 1
        return update_count, None
    except Exception as e:
        return 0, str(e)
# =========================================================
# 1. 基础接口 (认证 & 概览)
# =========================================================
@api_bp.route('/login', methods=['POST'])
def login():
    """
    Authenticate the single administrator account.

    Expects a JSON object with ``username`` and ``password``.

    Returns:
        A JSON body with code 200, a token and the user info on success;
        otherwise a JSON body with code 401 and HTTP status 401. A missing,
        malformed or non-object JSON body is treated as failed credentials.
    """
    # silent=True: an absent/invalid body yields None instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')
    # NOTE(review): the credentials and the token are hard-coded in source
    # and the token is identical for every session. Move them to
    # configuration/secret storage and issue per-session tokens.
    if username == 'admin' and password == 'licahk':
        return jsonify({
            'code': 200,
            'message': '登录成功',
            'token': 'super-admin-token-2026',
            'user': {'username': 'admin', 'role': 'administrator'}
        })
    return jsonify({'code': 401, 'message': '用户名或密码错误'}), 401
def _overview_json_dict(raw):
    """Decode stored JSON text; return the dict, or None if absent, invalid or not an object."""
    if not raw:
        return None
    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError):
        return None
    return parsed if isinstance(parsed, dict) else None


@api_bp.route('/devices_overview', methods=['GET'])
def devices_overview():
    """
    Return every device plus every IoT card in one list.

    Real devices (source != 'iot_card') come first. Each carries its
    ``to_dict()`` fields, a normalised ``latest_time``, the fields of the
    IoT card referenced by ``bound_iccid`` in its stored JSON (when that
    card exists), and a ``data_quality`` grade. IoT card rows follow, each
    flagged with ``isOrphanIoT=True`` and ``source='iot_card'``.

    Returns:
        JSON ``{'code': 200, 'data': [...]}``, or ``{'code': 500,
        'message': ...}`` if anything raises.
    """
    try:
        # A. IoT cards. Decode each record's JSON once and reuse it below.
        iot_records = Device.query.filter_by(source='iot_card').all()
        iot_parsed = [(rec, _overview_json_dict(rec.json_data)) for rec in iot_records]
        # Only cards whose JSON decoded to an object can be bound to.
        iot_map = {rec.name: info for rec, info in iot_parsed if info is not None}

        # B. Real devices.
        devices = Device.query.filter(Device.source != 'iot_card').all()
        data_list = []
        for d in devices:
            # to_dict() is defined on the model; it is expected to include file_count.
            item = d.to_dict()

            # Force a uniform timestamp representation.
            raw_time = d.latest_time
            if raw_time:
                if hasattr(raw_time, 'strftime'):
                    item['latest_time'] = raw_time.strftime("%Y-%m-%d %H:%M:%S")
                else:
                    text = str(raw_time).strip()
                    if '_' in text and ':' not in text:
                        # Date-only value such as 2026_01_13.
                        item['latest_time'] = text.replace('_', '-') + " 00:00:00"
                    else:
                        item['latest_time'] = text

            parsed_content = _overview_json_dict(d.json_data) or {}

            # Card binding: start unbound, then inject card info if found.
            bound_iccid = parsed_content.get('bound_iccid')
            item['usedTraffic'] = None
            item['stopDate'] = None
            item['statusDesc'] = None
            item['isBound'] = False
            item['bound_iccid'] = bound_iccid
            item['is_whitelist'] = False
            if bound_iccid and bound_iccid in iot_map:
                card_info = iot_map[bound_iccid]
                item['usedTraffic'] = card_info.get('usedTraffic')
                item['stopDate'] = card_info.get('stopDate')
                item['is_whitelist'] = card_info.get('is_whitelist', False)
                item['statusDesc'] = card_info.get('statusDesc')
                item['isBound'] = True

            item['data_quality'] = check_data_quality(parsed_content, d.source, d.latest_time)
            data_list.append(item)

        # C. IoT card rows (used by the card-pool management view).
        for rec, info in iot_parsed:
            card_json = info or {}
            item = rec.to_dict()
            item['usedTraffic'] = card_json.get('usedTraffic', '0')
            item['stopDate'] = card_json.get('stopDate', '')
            item['is_whitelist'] = card_json.get('is_whitelist', False)
            item['statusDesc'] = card_json.get('statusDesc', '未知')
            item['isOrphanIoT'] = True
            item['source'] = 'iot_card'
            data_list.append(item)

        return jsonify({'code': 200, 'data': data_list})
    except Exception as e:
        return jsonify({'code': 500, 'message': str(e)})
# =========================================================
# 2. 历史数据接口
# =========================================================
@api_bp.route('/device_data_by_date', methods=['GET'])
def device_data_by_date():
    """
    Return a device's stored JSON payload for a given date.

    Query parameters:
        name: Device name.
        date: Date prefix; underscores are treated as hyphens.

    Lookup order: the DeviceHistory row with the highest id whose
    ``data_time`` starts with the date; otherwise the device's current
    ``json_data`` when its ``latest_time`` falls on that date.

    Returns:
        JSON with code 200 and ``name``/``source``/``content``; code 400
        when a parameter is missing; code 404 when the device or the data
        is not found (HTTP status matches the code).
    """
    name = request.args.get('name')
    date_str = request.args.get('date')
    if not name or not date_str:
        return jsonify({'code': 400, 'message': 'Missing name or date'}), 400

    device = Device.query.filter_by(name=name).first()
    if not device:
        return jsonify({'code': 404, 'message': 'Device not found'}), 404

    content = None
    query_date = date_str.replace('_', '-')
    history_record = DeviceHistory.query.filter(
        DeviceHistory.device_id == device.id,
        DeviceHistory.data_time.like(f"{query_date}%")
    ).order_by(desc(DeviceHistory.id)).first()

    if history_record:
        content = history_record.json_data
    elif device.latest_time:
        # Normalise the stored value the same way as the query date:
        # latest_time may use underscores (2026_01_13) or not be a str.
        latest = str(device.latest_time).replace('_', '-')
        if latest.startswith(query_date):
            content = device.json_data

    if content:
        return jsonify({
            'code': 200,
            'name': device.name,
            'source': device.source,
            'content': content
        })
    return jsonify({'code': 404, 'message': 'No data for this date'}), 404
@api_bp.route('/device_data_by_date_stub', methods=['GET'])
def device_data_by_date_stub():
    """Alias route: delegates to device_data_by_date() and returns its response unchanged."""
    return device_data_by_date()
# =========================================================
# 3. 核心控制接口 (检测 & 写入)
# =========================================================
# Matches .../Data/YYYY_MM_DD/<prefix>_HH_MM_SS.csv in a 106 device file path.
_CSV_106_PATH_RE = re.compile(r'/Data/(\d{4}_\d{2}_\d{2})/\w+_(\d{2}_\d{2}_\d{2})\.csv')


def _extract_106_data_time(raw):
    """Return 'YYYY-MM-DD HH:MM:SS' taken from raw['path'], or None if it does not match."""
    if not isinstance(raw, dict):
        return None
    path_str = raw.get('path', '')
    if not isinstance(path_str, str):
        return None
    match = _CSV_106_PATH_RE.search(path_str)
    if not match:
        return None
    date_part = match.group(1).replace('_', '-')
    time_part = match.group(2).replace('_', ':')
    return f"{date_part} {time_part}"


def _store_crawler_item(item, current_time):
    """Upsert one crawled device and add its history row; return False if the item has no name."""
    d_name = item.get('name')
    if not d_name:
        return False
    d_raw = item.get('raw_json', {})
    source = item.get('source', '')
    target_time = item.get('target_time')

    # For 106 devices the data time is encoded in the CSV path; prefer it
    # over the crawler-supplied target_time when it can be extracted.
    if '106' in str(source):
        target_time = _extract_106_data_time(d_raw) or target_time

    device = Device.query.filter_by(name=d_name).first()
    if not device:
        device = Device(name=d_name, source=source, install_site="")
        db.session.add(device)
        db.session.flush()  # assigns device.id for the history row below
    if device.source == 'iot_card':
        device.source = source

    device.status = item.get('status')
    device.current_value = item.get('value')
    device.latest_time = target_time
    device.check_time = current_time
    # Number of files reported by the crawler for this device.
    f_count = item.get('num_files', 0)
    device.file_count = f_count

    # Merge the new payload into the stored JSON so keys written elsewhere
    # (e.g. bound_iccid) survive a crawl.
    old_json = {}
    if device.json_data:
        try:
            loaded = json.loads(device.json_data)
        except (TypeError, ValueError):
            loaded = None
        if isinstance(loaded, dict):
            old_json = loaded
    if isinstance(d_raw, dict):
        old_json.update(d_raw)
    device.json_data = json.dumps(old_json, ensure_ascii=False)
    device.offset = calculate_offset(device.latest_time)

    # The history row also records the file count.
    db.session.add(DeviceHistory(
        device_id=device.id,
        status=item.get('status'),
        result_data=item.get('value'),
        data_time=target_time,
        json_data=device.json_data,
        file_count=f_count
    ))
    return True


@api_bp.route('/run_monitor', methods=['POST'])
def run_monitor():
    """
    Run the crawler and the IoT sync, then commit everything in one transaction.

    Step A calls ``execute_monitor_task`` (if available) and stores every
    named item of its ``device_list``. Step B calls
    ``sync_iot_data_service`` (if available) and stages the cards through
    ``save_iot_cards_to_db``. A single commit follows.

    Returns:
        JSON ``{'code': 200, 'message': ...}`` where the message joins the
        per-step summaries with " | "; on any exception the session is
        rolled back and ``{'code': 500, 'message': ...}`` is returned.
    """
    msg_list = []
    try:
        # --- A. Crawl and store ---
        if execute_monitor_task:
            task_result = execute_monitor_task()
            if task_result:
                scraped_list = task_result.get('device_list', []) or []
                current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                count_crawler = 0
                for item in scraped_list:
                    if _store_crawler_item(item, current_time):
                        count_crawler += 1
                msg_list.append(f"爬虫更新: {count_crawler}")
            else:
                msg_list.append("爬虫无数据")

        # --- B. IoT sync (staged in the same session) ---
        if sync_iot_data_service:
            iot_list = sync_iot_data_service()
            c, e = save_iot_cards_to_db(iot_list)
            # NOTE(review): an IoT error is only reported in the message;
            # the commit below still runs, as in the original behaviour.
            if e:
                msg_list.append(f"IoT错: {e}")
            else:
                msg_list.append(f"IoT更新: {c}")

        db.session.commit()
        return jsonify({'code': 200, 'message': " | ".join(msg_list)})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
# =========================================================
# 4. 白名单、绑定与设备管理
# =========================================================
@api_bp.route('/toggle_whitelist', methods=['POST'])
def toggle_whitelist():
    """
    Set the ``is_whitelist`` flag stored in an IoT card's JSON.

    Expects a JSON object with ``iccid`` and ``is_whitelist``. The flag is
    stored exactly as sent.

    Returns:
        JSON code 200 on success, 404 when no 'iot_card' record has that
        name, 500 (after rollback) when saving fails.
    """
    # silent=True: an absent/invalid body yields None instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    iccid = data.get('iccid')
    is_whitelist = data.get('is_whitelist')

    sim_record = Device.query.filter_by(name=iccid, source='iot_card').first()
    if not sim_record:
        return jsonify({'code': 404, 'message': '未找到该卡片'})
    try:
        # A card whose stored JSON is empty or unreadable starts from {}
        # so the flag can still be set.
        card_json = {}
        if sim_record.json_data:
            try:
                loaded = json.loads(sim_record.json_data)
            except (TypeError, ValueError):
                loaded = None
            if isinstance(loaded, dict):
                card_json = loaded
        card_json['is_whitelist'] = is_whitelist
        sim_record.json_data = json.dumps(card_json, ensure_ascii=False)
        db.session.commit()
        return jsonify({'code': 200, 'message': '设置成功'})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/sync_iot_cards', methods=['POST'])
def sync_iot_cards():
    """
    Fetch the IoT card list and persist it.

    Returns:
        JSON code 200 with the update count and the fetched list; code 500
        (HTTP 500) when the sync service is unavailable, when staging
        reports an error, or when an exception is raised. Staged changes
        are rolled back on every failure path.
    """
    if not sync_iot_data_service:
        return jsonify({'code': 500, 'message': '服务缺失'}), 500
    try:
        iot_list = sync_iot_data_service()
        count, error = save_iot_cards_to_db(iot_list)
        if error:
            # Discard whatever was staged before the failure.
            db.session.rollback()
            return jsonify({'code': 500, 'message': error}), 500
        db.session.commit()
        return jsonify({'code': 200, 'message': f'更新{count}张卡', 'data': iot_list})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)}), 500
@api_bp.route('/bind_device_card', methods=['POST'])
def bind_device_card():
    """
    Record which IoT card is bound to a device.

    Expects a JSON object with ``device_name`` and ``iccid``. The iccid is
    written to the ``bound_iccid`` key of the device's stored JSON; other
    keys are kept.

    Returns:
        JSON code 200 on success, 404 when the device does not exist, 500
        (after rollback) when saving fails.
    """
    # silent=True: an absent/invalid body yields None instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    target = Device.query.filter_by(name=data.get('device_name')).first()
    if not target:
        return jsonify({'code': 404, 'message': '找不到设备'})
    try:
        # Start from {} when the stored JSON is empty, invalid or not an object.
        device_json = {}
        if target.json_data:
            try:
                loaded = json.loads(target.json_data)
            except (TypeError, ValueError):
                loaded = None
            if isinstance(loaded, dict):
                device_json = loaded
        device_json['bound_iccid'] = data.get('iccid')
        target.json_data = json.dumps(device_json, ensure_ascii=False)
        db.session.commit()
        return jsonify({'code': 200, 'message': '绑定成功'})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/add_device', methods=['POST'])
def add_device():
    """
    Create a device entered by hand.

    Expects a JSON object with ``name`` and optionally ``site``. The device
    is created with source 'manual', status 'offline', empty JSON data and
    both the hidden and maintenance flags cleared.

    Returns:
        JSON code 200 on success, 400 when ``name`` is missing or empty,
        500 (after rollback) when saving fails.
    """
    # silent=True: an absent/invalid body yields None instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    name = data.get('name')
    if not name:
        # Other routes look devices up by name, so a nameless row is unusable.
        return jsonify({'code': 400, 'message': '缺少设备名称'})
    try:
        new_device = Device(
            name=name,
            install_site=data.get('site', ''),
            source='manual',
            status='offline',
            check_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            json_data='{}',
            is_hidden=0,
            is_maintaining=0
        )
        db.session.add(new_device)
        db.session.commit()
        return jsonify({'code': 200})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/update_site', methods=['POST'])
def update_site():
    """Set a device's install_site from the JSON body (``name``, ``site``); code 404 if the device is unknown."""
    payload = request.json
    device = Device.query.filter_by(name=payload.get('name')).first()
    if not device:
        return jsonify({'code': 404})
    device.install_site = payload.get('site')
    db.session.commit()
    return jsonify({'code': 200})
@api_bp.route('/toggle_maintenance', methods=['POST'])
def toggle_maintenance():
    """Store the body's ``is_maintaining`` value on the device named ``name``; code 404 if the device is unknown."""
    payload = request.json
    device = Device.query.filter_by(name=payload.get('name')).first()
    if not device:
        return jsonify({'code': 404})
    device.is_maintaining = payload.get('is_maintaining')
    db.session.commit()
    return jsonify({'code': 200})
@api_bp.route('/toggle_hidden', methods=['POST'])
def toggle_hidden():
    """Store the body's ``is_hidden`` value on the device named ``name``; code 404 if the device is unknown."""
    payload = request.json
    device = Device.query.filter_by(name=payload.get('name')).first()
    if not device:
        return jsonify({'code': 404})
    device.is_hidden = payload.get('is_hidden')
    db.session.commit()
    return jsonify({'code': 200})
# =========================================================
# 5. 日志管理接口 (CRUD)
# =========================================================
@api_bp.route('/logs/list', methods=['GET'])
def get_logs_list():
    """
    List maintenance logs, newest first.

    The optional ``keyword`` query parameter keeps only logs whose device
    name, content or engineer contains it (SQL LIKE match).
    """
    keyword = request.args.get('keyword', '')
    log_query = MaintenanceLog.query
    if keyword:
        pattern = f"%{keyword}%"
        searchable = (
            MaintenanceLog.device_name,
            MaintenanceLog.content,
            MaintenanceLog.engineer,
        )
        log_query = log_query.filter(or_(*[column.like(pattern) for column in searchable]))
    ordered = log_query.order_by(MaintenanceLog.timestamp.desc())
    entries = [entry.to_dict() for entry in ordered.all()]
    return jsonify({'code': 200, 'data': entries})
@api_bp.route('/logs/add', methods=['POST'])
def add_log_entry():
    """
    Create a maintenance log entry.

    Expects a JSON object; ``device_name``, ``engineer``, ``location`` and
    ``content`` each default to an empty string when absent.

    Returns:
        JSON code 200 on success, 500 (after rollback) when saving fails.
    """
    # silent=True: an absent/invalid body yields None instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    try:
        new_log = MaintenanceLog(
            device_name=data.get('device_name', ''),
            engineer=data.get('engineer', ''),
            location=data.get('location', ''),
            content=data.get('content', '')
        )
        db.session.add(new_log)
        db.session.commit()
        return jsonify({'code': 200})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/logs/update', methods=['POST'])
def update_log_entry():
    """
    Update fields of an existing maintenance log.

    Expects a JSON object with ``id`` and any of ``device_name``,
    ``content``, ``engineer``, ``location``. Fields that are absent keep
    their current value.

    Returns:
        JSON code 200 on success, 404 when the id is missing or unknown,
        500 with the error message (after rollback) when saving fails.
    """
    # silent=True: an absent/invalid body yields None instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    log_id = data.get('id')
    # Skip the lookup entirely when no id was supplied.
    log = MaintenanceLog.query.get(log_id) if log_id is not None else None
    if not log:
        return jsonify({'code': 404})
    try:
        for field in ('device_name', 'content', 'engineer', 'location'):
            setattr(log, field, data.get(field, getattr(log, field)))
        db.session.commit()
        return jsonify({'code': 200})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/logs/delete', methods=['POST'])
def delete_log_entry():
    """Delete the maintenance log whose ``id`` is given in the JSON body; code 404 if it does not exist."""
    payload = request.get_json()
    entry = MaintenanceLog.query.get(payload.get('id'))
    if not entry:
        return jsonify({'code': 404})
    db.session.delete(entry)
    db.session.commit()
    return jsonify({'code': 200})
@api_bp.route('/device_history_list', methods=['GET'])
def get_device_history_list():
    """
    Return one page of a device's history, newest data time first.

    Query parameters:
        name: Device name (required).
        page: 1-based page number, default 1; values below 1 become 1.
        limit: Page size, default 10; values below 1 become 1.

    Returns:
        JSON code 200 with ``data`` (a list of ``{'date', 'count'}``),
        ``total``, ``page`` and ``limit``; code 400 when ``name`` is missing
        or the paging values are not integers; code 404 when the device
        does not exist; code 500 on any other error.
    """
    name = request.args.get('name')
    if not name:
        return jsonify({'code': 400, 'message': '缺少设备名称'})
    try:
        # Clamp so the computed OFFSET/LIMIT can never be negative.
        page = max(1, int(request.args.get('page', 1)))
        limit = max(1, int(request.args.get('limit', 10)))
    except (TypeError, ValueError):
        return jsonify({'code': 400, 'message': 'page 和 limit 必须为整数'})

    try:
        # 1. Resolve the device id.
        device = Device.query.filter_by(name=name).first()
        if not device:
            return jsonify({'code': 404, 'message': '设备不存在'})

        # 2. History rows for the device, ordered by data time descending.
        query = DeviceHistory.query.filter_by(device_id=device.id).order_by(desc(DeviceHistory.data_time))
        # 3. Total row count, before paging.
        total = query.count()
        # 4. Page slice.
        history_list = query.offset((page - 1) * limit).limit(limit).all()

        # 5. Shape the response rows.
        data = []
        for h in history_list:
            # Prefer the data time; fall back to the record's creation
            # time, then to the literal "未知".
            date_str = h.data_time
            if not date_str:
                date_str = h.recorded_at.strftime("%Y-%m-%d %H:%M:%S") if h.recorded_at else "未知"
            data.append({
                'date': date_str,
                'count': h.file_count or 0
            })
        return jsonify({
            'code': 200,
            'data': data,
            'total': total,
            'page': page,
            'limit': limit
        })
    except Exception as e:
        return jsonify({'code': 500, 'message': str(e)})