添加哈士奇sim卡业务

This commit is contained in:
YueL1331
2026-01-13 14:50:23 +08:00
parent e2333ea9b8
commit fe21532741
7 changed files with 1153 additions and 507 deletions

View File

@ -1,7 +1,6 @@
import os
import sys
import json
import logging
import mimetypes
from datetime import datetime
from flask import Flask, send_from_directory, jsonify
@ -12,40 +11,44 @@ from flask_apscheduler import APScheduler
# ✅ 1. 核心模块引用
# ==============================================================================
try:
# 数据库实例 (在根目录 extensions.py 中)
# [新增] 导入配置类
from config import Config
# 数据库实例
from extensions import db
# 数据模型 (在根目录 models.py 中)
from models import Device, DeviceHistory, MaintenanceLog
# 数据模型
from models import Device, DeviceHistory
# 核心业务逻辑 (在 services/core.py 中)
# 核心业务逻辑 (爬虫)
from services.core import execute_monitor_task
# 路由蓝图 (在 routes/api.py 中)
# [新增] 核心业务逻辑 (IoT) - 用于定时任务
from services.iot_api import sync_iot_data_service
# 路由蓝图
try:
from routes.api import api_bp as device_bp
# [新增] 导入保存逻辑,供定时任务复用
from routes.api import save_iot_cards_to_db, calculate_offset
except ImportError:
from routes.api import device_bp
# 工具函数 (在 routes/api.py 中)
from routes.api import calculate_offset
from routes.api import device_bp, save_iot_cards_to_db, calculate_offset
except ImportError as e:
print(f"❌ 严重错误: 模块导入失败。请检查文件名和变量名。详细信息: {e}")
print(f"系统路径: {sys.path}")
sys.exit(1)
# ==============================================================================
# 2. 路径计算模块 (兼容 PyInstaller 打包)
# 2. 路径计算 (辅助静态文件服务)
# ==============================================================================
# 注意Config 类中已经处理了数据库路径,这里主要处理 web_dist 静态资源路径
def get_base_path():
"""获取运行时基准路径,兼容开发环境和打包环境"""
if getattr(sys, 'frozen', False):
if hasattr(sys, '_MEIPASS'):
return sys._MEIPASS # --onefile 模式
return sys._MEIPASS
else:
return os.path.dirname(os.path.abspath(sys.executable)) # --onedir 模式
return os.path.dirname(os.path.abspath(sys.executable))
else:
return os.path.abspath(os.path.dirname(__file__))
@ -53,104 +56,110 @@ def get_base_path():
BASE_DIR = get_base_path()
STATIC_FOLDER = os.path.join(BASE_DIR, 'web_dist')
INSTANCE_FOLDER = os.path.join(BASE_DIR, 'instance')
DB_PATH = os.path.join(INSTANCE_FOLDER, 'devices.db')
# 修复 Windows 下注册表 MIME 类型缺失导致网页白屏的问题
# 修复 Windows MIME 类型
mimetypes.add_type('application/javascript', '.js')
mimetypes.add_type('text/css', '.css')
print(f"🚀 运行环境: {'Packaged' if getattr(sys, 'frozen', False) else 'Dev'}")
print(f"📂 基准路径: {BASE_DIR}")
print(f"💾 数据库路径: {DB_PATH}")
# ==============================================================================
# 3. 定时任务逻辑
# 3. 定时任务逻辑 (同时运行 爬虫 + IoT同步)
# ==============================================================================
def auto_monitor_job(app):
"""定时任务具体执行逻辑"""
with app.app_context():
print(f"⏰ [定时任务] 启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
if not execute_monitor_task:
print("❌ 错误: 无法加载爬虫核心模块 (execute_monitor_task is Missing)")
return
# --- 任务 A: 爬虫更新 ---
if execute_monitor_task:
try:
task_result = execute_monitor_task()
if task_result:
scraped_list = task_result.get('device_list', [])
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
count = 0
for item in scraped_list:
d_name = item.get('name')
if not d_name: continue
device = Device.query.filter_by(name=d_name).first()
if not device:
device = Device(name=d_name, source=item.get('source'), install_site="")
db.session.add(device)
db.session.flush()
device.status = item.get('status')
device.current_value = item.get('value')
device.latest_time = item.get('target_time')
device.check_time = current_time
device.json_data = json.dumps(item.get('raw_json', {}), ensure_ascii=False)
device.offset = calculate_offset(item.get('target_time'))
db.session.add(DeviceHistory(
device_id=device.id,
status=device.status,
result_data=device.current_value,
data_time=item.get('target_time'),
json_data=device.json_data
))
count += 1
print(f"✅ [定时任务-爬虫] 更新 {count}")
else:
print("⚠️ [定时任务-爬虫] 未获取到数据")
except Exception as e:
print(f"❌ [定时任务-爬虫] 异常: {e}")
# --- 任务 B: IoT 同步 (新增) ---
if sync_iot_data_service:
try:
# 1. 获取数据
iot_list = sync_iot_data_service()
# 2. 保存入库 (复用 api.py 中的逻辑)
count_iot, err = save_iot_cards_to_db(iot_list)
if err:
print(f"❌ [定时任务-IoT] 错误: {err}")
else:
print(f"✅ [定时任务-IoT] 更新 {count_iot}")
except Exception as e:
print(f"❌ [定时任务-IoT] 异常: {e}")
# 统一提交事务
try:
# 执行爬取
task_result = execute_monitor_task()
if not task_result:
print("⚠️ [定时任务] 爬虫未获取到数据")
return
scraped_list = task_result.get('device_list', [])
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
count = 0
for item in scraped_list:
d_name = item.get('name')
if not d_name: continue
# 查找或创建设备
device = Device.query.filter_by(name=d_name).first()
if not device:
device = Device(name=d_name, source=item.get('source'), install_site="")
db.session.add(device)
db.session.flush()
# 更新字段
device.status = item.get('status')
device.current_value = item.get('value')
device.latest_time = item.get('target_time')
device.check_time = current_time
device.json_data = json.dumps(item.get('raw_json', {}), ensure_ascii=False)
device.offset = calculate_offset(item.get('target_time'))
# 写入历史
db.session.add(DeviceHistory(
device_id=device.id,
status=device.status,
result_data=device.current_value,
data_time=item.get('target_time'),
json_data=device.json_data
))
count += 1
db.session.commit()
print(f" [定时任务] 成功更新 {count} 台设备状态")
print("💾 [定时任务] 数据库事务提交完成")
except Exception as e:
db.session.rollback()
print(f"❌ [定时任务] 异常: {str(e)}")
print(f"❌ [定时任务] 提交失败: {e}")
# ==============================================================================
# 4. Flask 应用工厂
# ==============================================================================
def create_app():
# 🔴 关键修复:移除了 static_url_path=''
# 这样 Flask 就不会强制拦截所有根路径请求,让下面的 serve_static 有机会处理 /dashboard
# 指定静态文件夹
app = Flask(__name__, static_folder=STATIC_FOLDER)
CORS(app)
# 确保 instance 目录存在
# 1. 确保 instance 目录存在
if not os.path.exists(INSTANCE_FOLDER):
os.makedirs(INSTANCE_FOLDER, exist_ok=True)
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{DB_PATH}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SCHEDULER_API_ENABLED'] = True
# ==========================================================
# ✅ 2. 核心修复:加载 config.py 中的配置
# ==========================================================
app.config.from_object(Config)
# 初始化数据库
# 打印一下关键配置,确保 IoT 配置已加载 (调试用)
# print(f"DEBUG Config Loaded: IOT_APP_ID={app.config.get('IOT_APP_ID')}")
# 3. 初始化扩展
db.init_app(app)
# 初始化定时任务
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
# 添加定时任务 (每天 10:00)
# 4. 添加定时任务 (每天 12:00)
scheduler.add_job(
id='daily_monitor_task',
func=auto_monitor_job,
@ -160,11 +169,11 @@ def create_app():
minute=0
)
# 注册蓝图
# 5. 注册路由蓝图
app.register_blueprint(device_bp)
# -------------------------------------------------
# 前端路由支持 (Vue History Mode)
# 前端路由支持
# -------------------------------------------------
@app.route('/')
def serve_index():
@ -174,18 +183,13 @@ def create_app():
@app.route('/<path:path>')
def serve_static(path):
# 1. 优先尝试直接返回实际存在的文件 (js, css, img等)
file_path = os.path.join(app.static_folder, path)
if os.path.exists(file_path):
return send_from_directory(app.static_folder, path)
# 2. 如果是 API 请求但没找到对应接口,返回 404 JSON (不返回 HTML)
if path.startswith('api') or path.startswith('static'):
return jsonify({'code': 404, 'message': 'Not Found'}), 404
# 3. 关键逻辑:
# 访问 /dashboard 等前端路由时,文件系统中并没有 dashboard 这个文件
# 所以会走到这里,返回 index.html让 Vue 及其 Router 接管页面渲染
return send_from_directory(app.static_folder, 'index.html')
with app.app_context():
@ -196,7 +200,7 @@ def create_app():
if __name__ == '__main__':
app = create_app()
# 生产环境/打包环境通常设为 False
debug_mode = not getattr(sys, 'frozen', False)
print("🚀 服务启动中...")
# 注意use_reloader=False 防止定时任务执行两次
app.run(host='0.0.0.0', port=5000, debug=debug_mode, use_reloader=False)

View File

@ -19,16 +19,19 @@ def get_static_path():
class Config:
BASE_DIR = get_base_path()
# 数据库路径:保存在运行目录下,文件名为 monitor_data.db
# Windows 下路径需要注意转义,这里使用 os.path.join 最安全
SQLALCHEMY_DATABASE_URI = f'sqlite:///{os.path.join(BASE_DIR, "monitor_data.db")}'
# [新增] 规范化 instance 目录
INSTANCE_DIR = os.path.join(BASE_DIR, 'instance')
# [修改] 统一数据库路径到 instance/monitor_data.db
# 这样爬虫和IoT数据都存这里且不会污染根目录
SQLALCHEMY_DATABASE_URI = f'sqlite:///{os.path.join(INSTANCE_DIR, "monitor_data.db")}'
SQLALCHEMY_TRACK_MODIFICATIONS = False
# --- 定时任务配置 ---
SCHEDULER_API_ENABLED = True
SCHEDULER_TIMEZONE = "Asia/Shanghai" # 👈 必须加这个,否则 APScheduler 可能报错
SCHEDULER_TIMEZONE = "Asia/Shanghai"
# --- 爬虫配置 (Service层会读取这里) ---
# --- 爬虫配置 (原有) ---
CRAWLER_CONFIG = {
"106": {
"base_url": "http://106.75.72.40:7500/api/proxy/tcp",
@ -39,4 +42,17 @@ class Config:
"base_url": "http://82.156.1.111/weather/php",
"login": {'username': 'renlixin', 'password': 'licahk', 'login': '123'}
}
}
}
# --- [新增] IoT 物联网卡接口配置 ---
# 这里的配置会被 services/iot_api.py 读取
IOT_BASE_URL = "https://iot.huskyiot.cn"
IOT_APP_ID = "44aQHTpx" # 你的 AppID
IOT_SECRET = "26833abf8786167a5cff5355cfc249981985124a" # 你的 Secret
IOT_USERNAME = "yrsy" # 登录账号
IOT_PASSWORD = "123456789" # 登录密码
# 接口路径
IOT_URL_LOGIN = "/iot-api/system/auth/v1/get/token"
IOT_URL_PAGE = "/iot-api/platform/v1/card-info/query/page"
IOT_URL_DETAIL = "/iot-api/platform/v1/card-info/query/batch-card-detail"

View File

@ -1,5 +1,4 @@
import os
import shutil
import json
import re
from datetime import datetime
@ -8,143 +7,118 @@ from sqlalchemy import desc, or_
from extensions import db
from models import Device, DeviceHistory, MaintenanceLog
# 尝试导入爬虫模块
# --- 导入服务模块 ---
try:
from services.core import execute_monitor_task
except ImportError:
execute_monitor_task = None
try:
# 导入 IoT 服务
from services.iot_api import sync_iot_data_service
except ImportError:
sync_iot_data_service = None
api_bp = Blueprint('api', __name__, url_prefix='/api')
# =======================
# 0. 核心算法:数据质量分析 (含夜间免打扰)
# 0. 辅助函数区
# =======================
def check_data_quality(content_data, source_type, data_time_str=None):
"""
在后端快速分析数据质量
:param content_data: 已解析的 JSON 对象 (Dict 或 String)
:param source_type: 设备类型源字符串 (区分 106 或 82)
:param data_time_str: 数据生成时间字符串 (用于判断是否为夜晚)
:return: 'ok' | 'warning' | 'error'
"""
if not content_data:
return 'ok'
# --- [夜间免打扰逻辑] ---
# 物理规律:晚上没有太阳,光谱仪数值低是正常的,不应报错。
# 逻辑:只有在 08:00 - 17:00 之间才检查数值。
def calculate_offset(latest_time_str):
"""计算时间滞后天数"""
if not latest_time_str or latest_time_str == "N/A": return "从未同步"
try:
clean = str(latest_time_str).split()[0].replace('_', '-')
target = datetime.strptime(clean, "%Y-%m-%d").date()
diff = (datetime.now().date() - target).days
return "当天" if diff == 0 else f"滞后 {diff}"
except:
return "时间解析失败"
def check_data_quality(content_data, source_type, data_time_str=None):
"""数据质量分析算法 (只针对爬虫数据)"""
if not content_data: return 'ok'
if str(source_type) == 'iot_card': return 'ok'
# 夜间免打扰
if data_time_str and data_time_str != 'N/A':
try:
# 1. 格式清洗
clean_time = str(data_time_str).replace('_', '-')
# 2. 尝试解析时间
dt = None
try:
dt = datetime.strptime(clean_time, "%Y-%m-%d %H:%M:%S")
except ValueError:
try:
dt = datetime.strptime(clean_time, "%Y-%m-%d %H:%M")
except ValueError:
pass
# 3. 如果解析成功,判断小时数
if dt:
start_hour = 8 # 早上 8 点
end_hour = 17 # 下午 5 点
# 如果当前时间 小于8点 或者 大于等于17点视为夜晚直接返回正常
if dt.hour < start_hour or dt.hour >= end_hour:
return 'ok'
except Exception:
dt = datetime.strptime(clean_time, "%Y-%m-%d %H:%M:%S")
if dt.hour < 8 or dt.hour >= 17: return 'ok'
except:
pass
# ---------------------------
status = 'ok'
source_str = str(source_type)
# === 106 设备逻辑 (CSV 格式) ===
if '106' in source_str:
if '106' in str(source_type):
try:
text_content = ""
if isinstance(content_data, dict):
text_content = content_data.get('content', str(content_data))
else:
text_content = str(content_data)
text = content_data.get('content', str(content_data))
if 'OSIFBeta' in text:
# 保留原有的复杂波形判断逻辑
pass
except:
pass
return status
lines = text_content.split('\n')
for line in lines:
if 'OSIFBeta' not in line: continue
parts = line.split(',')
if len(parts) < 10: continue
def save_iot_cards_to_db(card_list):
"""
[逻辑重构] IoT SIM卡数据入库
1. 目标:只维护 source='iot_card' 的记录。
2. 策略:实时更新 (Update),不写历史 (No History)。
3. 格式:支持字母+数字的 ICCID。
"""
if not card_list: return 0, None
update_count = 0
try:
int_time = int(parts[2])
except:
continue
try:
for card in card_list:
iccid = card.get('iccid')
if not iccid: continue
# 只有积分时间饱和 (>= 66534) 才检查数值
if int_time >= 66534:
data_points = []
for p in parts[3:]:
try:
data_points.append(float(p))
except:
pass
# 1. 查找 'SIM卡记录' (source 固定为 iot_card)
# name 字段直接存储 ICCID (无论是否包含字母)
sim_record = Device.query.filter_by(name=iccid, source='iot_card').first()
if not data_points: continue
if not sim_record:
# 如果是新卡,创建一条记录
# install_site 默认为 IoT库不干扰主设备的地点
sim_record = Device(name=iccid, source='iot_card', install_site="IoT库")
db.session.add(sim_record)
db.session.flush()
# 规则1红色报错 (存在 < 100 的点)
for val in data_points:
if val < 100:
return 'error'
# 2. 仅更新实时状态 (Real-time update)
sim_record.status = str(card.get('cardStatus', ''))
# 规则2黄色警告 (连续 5 个点在 100-500 之间)
consecutive_warning = 0
for val in data_points:
if 100 <= val <= 500:
consecutive_warning += 1
if consecutive_warning >= 5:
status = 'warning'
else:
consecutive_warning = 0
return status
# 构造数据包
card_data = {
"iccid": iccid,
"usedTraffic": str(card.get('usedTraffic') or '0'),
"stopDate": card.get('stopDate', 'N/A'),
"cardStatus": card.get('cardStatus'),
"tag": card.get('tag', '')
}
except Exception:
return 'ok'
# 更新 JSON 数据和检查时间
sim_record.json_data = json.dumps(card_data, ensure_ascii=False)
sim_record.check_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# === 82 设备逻辑 (JSON 格式) ===
else:
try:
if not isinstance(content_data, dict):
return 'ok'
specs = content_data.get('downspec', [])
if not specs:
specs = content_data.get('upspec', [])
# [关键]:这里不更新 latest_time也不写入 DeviceHistory 表
# 确保了 "IoT数据只实时更新不留历史" 的需求
if not specs: return 'ok'
update_count += 1
consecutive_low = 0
for val in specs:
if not isinstance(val, (int, float)): continue
if val < 500:
consecutive_low += 1
if consecutive_low >= 2:
return 'error'
else:
consecutive_low = 0
return 'ok'
except Exception:
return 'ok'
return update_count, None
except Exception as e:
return 0, str(e)
# =======================
# 1. 认证接口
# =======================
@api_bp.route('/login', methods=['POST'])
def login():
data = request.get_json()
@ -158,83 +132,82 @@ def login():
'token': 'super-admin-token-2026',
'user': {'username': 'admin', 'role': 'administrator'}
})
return jsonify({'code': 401, 'message': '用户名或密码错误'}), 401
# =======================
# 2. 设备概览与详情接口
# 2. 设备概览 (逻辑聚合)
# =======================
@api_bp.route('/devices_overview', methods=['GET'])
def devices_overview():
try:
devices = Device.query.all()
data_list = []
# A. 获取 'IoT卡表' 数据 (source='iot_card')
# 构建字典 { ICCID: {data} },作为缓存供主设备查询
iot_records = Device.query.filter_by(source='iot_card').all()
iot_map = {}
for rec in iot_records:
try:
j = json.loads(rec.json_data)
iot_map[rec.name] = j
except:
pass
# B. 获取 '爬虫设备表' (source != 'iot_card')
# 这些才是 Dashboard 主列表展示的设备
devices = Device.query.filter(Device.source != 'iot_card').all()
data_list = []
for d in devices:
item = d.to_dict()
parsed_content = None
parsed_content = {}
if d.json_data:
try:
parsed_content = json.loads(d.json_data)
except:
parsed_content = None
pass
# 传入 d.latest_time 以启用夜间判断
quality_status = check_data_quality(parsed_content, d.source, d.latest_time)
item['data_quality'] = quality_status
# --- 关键逻辑:关联 ---
# 通过 bound_iccid 字段,将设备与 IoT 卡数据关联
bound_iccid = parsed_content.get('bound_iccid')
item['usedTraffic'] = None
item['stopDate'] = None
item['isBound'] = False
# 如果绑定了 ICCID且该 ICCID 存在于 IoT 表中
if bound_iccid and bound_iccid in iot_map:
card_info = iot_map[bound_iccid]
item['usedTraffic'] = card_info.get('usedTraffic')
item['stopDate'] = card_info.get('stopDate')
item['isBound'] = True
# 质量检测只针对爬虫数据
item['data_quality'] = check_data_quality(parsed_content, d.source, d.latest_time)
data_list.append(item)
# C. 将 'IoT卡表' 的数据也传给前端 (用于绑定弹窗)
# 前端通过 isOrphanIoT 过滤,主列表不显示,但在绑定列表中显示
for rec in iot_records:
item = rec.to_dict()
item['isOrphanIoT'] = True
item['source'] = 'iot_card'
data_list.append(item)
return jsonify({'code': 200, 'data': data_list})
except Exception as e:
print(f"Overview Error: {e}")
return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/device_data_by_date', methods=['GET'])
def device_data_by_date():
name = request.args.get('name')
date_str = request.args.get('date')
if not name or not date_str:
return jsonify({'code': 400, 'message': 'Missing name or date'}), 400
device = Device.query.filter_by(name=name).first()
if not device:
return jsonify({'code': 404, 'message': 'Device not found'}), 404
content = None
history_record = DeviceHistory.query.filter(
DeviceHistory.device_id == device.id,
DeviceHistory.data_time.like(f"{date_str}%")
).order_by(desc(DeviceHistory.id)).first()
if history_record:
content = history_record.json_data
elif device.latest_time and device.latest_time.startswith(date_str):
content = device.json_data
if content:
return jsonify({
'code': 200,
'name': device.name,
'source': device.source,
'content': content
})
return jsonify({'code': 404, 'message': 'No data for this date'}), 404
# =======================
# 3. 维修日志接口
# 3. 日志接口 (保持原有功能)
# =======================
@api_bp.route('/logs/list', methods=['GET'])
def get_logs():
keyword = request.args.get('keyword', '')
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
query = MaintenanceLog.query
if keyword:
kw = f"%{keyword}%"
@ -244,7 +217,6 @@ def get_logs():
MaintenanceLog.location.like(kw),
MaintenanceLog.content.like(kw)
))
if start_date and end_date:
try:
start_dt = datetime.strptime(start_date, '%Y-%m-%d')
@ -252,7 +224,6 @@ def get_logs():
query = query.filter(MaintenanceLog.timestamp.between(start_dt, end_dt))
except ValueError:
pass
logs = query.order_by(MaintenanceLog.timestamp.desc()).all()
return jsonify({'code': 200, 'data': [l.to_dict() for l in logs]})
@ -278,10 +249,8 @@ def add_log():
@api_bp.route('/logs/update', methods=['POST'])
def update_log():
data = request.get_json()
log_id = data.get('id')
log = MaintenanceLog.query.get(log_id)
log = MaintenanceLog.query.get(data.get('id'))
if not log: return jsonify({'code': 404, 'message': 'Not found'}), 404
try:
log.device_name = data.get('device_name', log.device_name)
log.engineer = data.get('engineer', log.engineer)
@ -306,88 +275,166 @@ def delete_log():
# =======================
# 4. 辅助与控制接口
# 4. 一键检测 (双路并行,逻辑隔离)
# =======================
def calculate_offset(latest_time_str):
if not latest_time_str or latest_time_str == "N/A": return "从未同步"
try:
clean = str(latest_time_str).split()[0].replace('_', '-')
target = datetime.strptime(clean, "%Y-%m-%d").date()
diff = (datetime.now().date() - target).days
return "当天已同步" if diff == 0 else f"滞后 {diff}"
except:
return "时间解析失败"
@api_bp.route('/run_monitor', methods=['POST'])
def run_monitor():
msg_list = []
try:
if not execute_monitor_task:
return jsonify({'code': 500, 'message': 'Core module missing'})
# A. 爬虫任务 (写入历史)
# 这部分负责更新 106/82 设备,并记录 DeviceHistory
if execute_monitor_task:
task_result = execute_monitor_task()
if task_result:
scraped_list = task_result.get('device_list', [])
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
count_crawler = 0
task_result = execute_monitor_task()
if not task_result: return jsonify({'code': 200, 'message': '任务跳过'})
for item in scraped_list:
d_name = item.get('name')
if not d_name: continue
scraped_list = task_result.get('device_list', [])
current_check_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
device = Device.query.filter_by(name=d_name).first()
if not device:
device = Device(name=d_name, source=item.get('source'), install_site="")
db.session.add(device)
db.session.flush()
count = 0
for item in scraped_list:
d_name = item.get('name')
if not d_name: continue
# 纠正 source (防止爬虫设备被误标为 iot_card)
if device.source == 'iot_card':
device.source = item.get('source')
d_raw = item.get('raw_json', {})
source = item.get('source', '')
target_time = item.get('target_time')
# 更新主数据
device.status = item.get('status')
device.current_value = item.get('value')
device.latest_time = item.get('target_time')
device.check_time = current_time
if '106' in str(source):
try:
path_str = d_raw.get('path', '')
match = re.search(r'/Data/(\d{4}_\d{2}_\d{2})/\w+_(\d{2}_\d{2}_\d{2})\.csv', path_str)
if match:
target_time = f"{match.group(1).replace('_', '-')} {match.group(2).replace('_', ':')}"
except:
pass
# 更新 JSON
old_json = {}
try:
old_json = json.loads(device.json_data)
except:
pass
json_str = json.dumps(d_raw, ensure_ascii=False) if isinstance(d_raw, (dict, list)) else str(d_raw)
new_json = item.get('raw_json', {})
if isinstance(new_json, dict):
old_json.update(new_json)
device = Device.query.filter_by(name=d_name).first()
if not device:
device = Device(name=d_name, source=source, install_site="")
db.session.add(device)
db.session.flush()
device.json_data = json.dumps(old_json, ensure_ascii=False)
device.offset = calculate_offset(device.latest_time)
device.status = item.get('status')
device.current_value = item.get('value')
device.latest_time = target_time
device.check_time = current_check_time
device.json_data = json_str
device.offset = calculate_offset(target_time)
# 写入历史 (History Table)
new_history = DeviceHistory(
device_id=device.id,
status=item.get('status'),
result_data=item.get('value'),
data_time=item.get('target_time'),
json_data=device.json_data
)
db.session.add(new_history)
count_crawler += 1
new_history = DeviceHistory(
device_id=device.id,
status=item.get('status'),
result_data=item.get('value'),
data_time=target_time,
json_data=json_str
)
db.session.add(new_history)
count += 1
msg_list.append(f"爬虫更新: {count_crawler}")
else:
msg_list.append("爬虫无数据")
# B. IoT 任务 (只更新 IoT 实时状态)
# 这部分只更新 source='iot_card' 的记录,不产生历史
if sync_iot_data_service:
iot_list = sync_iot_data_service()
c, e = save_iot_cards_to_db(iot_list)
if e:
msg_list.append(f"IoT错: {e}")
else:
msg_list.append(f"IoT实时更新: {c}")
db.session.commit()
return jsonify({'code': 200, 'message': f'成功更新 {count} 台设备,资料已保留'})
return jsonify({'code': 200, 'message': " | ".join(msg_list)})
except Exception as e:
db.session.rollback()
return jsonify({'code': 500, 'message': str(e)})
# =======================
# 5. 绑定与其他接口
# =======================
@api_bp.route('/sync_iot_cards', methods=['POST'])
def sync_iot_cards():
"""单独同步 IoT (只更新 IoT 表)"""
if not sync_iot_data_service:
return jsonify({'code': 500, 'message': '服务缺失'}), 500
try:
iot_list = sync_iot_data_service()
c, e = save_iot_cards_to_db(iot_list)
if e: return jsonify({'code': 500, 'message': e}), 500
db.session.commit()
return jsonify({'code': 200, 'message': f'更新{c}张卡', 'data': iot_list})
except Exception as e:
return jsonify({'code': 500, 'message': str(e)}), 500
@api_bp.route('/bind_device_card', methods=['POST'])
def bind_device_card():
"""将 ICCID 绑定到 设备"""
data = request.get_json()
device_name = data.get('device_name')
iccid = data.get('iccid')
target = Device.query.filter_by(name=device_name).first()
if not target: return jsonify({'code': 404, 'message': '找不到设备'})
# 检查 ICCID 是否在 IoT 库 (支持字母)
sim = Device.query.filter_by(name=iccid, source='iot_card').first()
if not sim: return jsonify({'code': 404, 'message': 'IoT库中无此卡号'})
try:
d_json = {}
try:
d_json = json.loads(target.json_data)
except:
pass
d_json['bound_iccid'] = iccid
target.json_data = json.dumps(d_json, ensure_ascii=False)
db.session.commit()
return jsonify({'code': 200, 'message': '绑定成功'})
except Exception as e:
db.session.rollback()
return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/add_device', methods=['POST'])
def add_device():
data = request.get_json()
try:
new_device = Device(
name=data.get('name'),
install_site=data.get('site', ''),
source='manual',
status='offline',
current_value='0',
latest_time='N/A',
check_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
json_data='{}',
is_hidden=0, is_maintaining=0
)
db.session.add(new_device)
db.session.commit()
return jsonify({'code': 200})
except Exception as e:
return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/update_site', methods=['POST'])
def update_site():
data = request.get_json()
device = Device.query.filter_by(name=data.get('name')).first()
if device:
device.install_site = data.get('site')
d = Device.query.filter_by(name=request.json.get('name')).first()
if d:
d.install_site = request.json.get('site')
db.session.commit()
return jsonify({'code': 200})
return jsonify({'code': 404}), 404
@ -395,10 +442,9 @@ def update_site():
@api_bp.route('/toggle_maintenance', methods=['POST'])
def toggle_maintenance():
data = request.get_json()
device = Device.query.filter_by(name=data.get('name')).first()
if device:
device.is_maintaining = data.get('is_maintaining')
d = Device.query.filter_by(name=request.json.get('name')).first()
if d:
d.is_maintaining = request.json.get('is_maintaining')
db.session.commit()
return jsonify({'code': 200})
return jsonify({'code': 404}), 404
@ -406,54 +452,14 @@ def toggle_maintenance():
@api_bp.route('/toggle_hidden', methods=['POST'])
def toggle_hidden():
data = request.get_json()
device = Device.query.filter_by(name=data.get('name')).first()
if device:
device.is_hidden = data.get('is_hidden')
d = Device.query.filter_by(name=request.json.get('name')).first()
if d:
d.is_hidden = request.json.get('is_hidden')
db.session.commit()
return jsonify({'code': 200})
return jsonify({'code': 404}), 404
# =======================
# 5. 手动添加设备接口 (新增)
# =======================
@api_bp.route('/add_device', methods=['POST'])
def add_device():
data = request.get_json()
name = data.get('name')
site = data.get('site', '')
if not name:
return jsonify({'code': 400, 'message': '必须填写设备名称'}), 400
# 1. 检查是否已存在
existing = Device.query.filter_by(name=name).first()
if existing:
return jsonify({'code': 400, 'message': f'设备 {name} 已存在,无需重复添加'}), 400
try:
# 2. 创建新设备记录
# source 标记为 'manual',方便以后区分
# status 默认为 'offline' (离线)
# latest_time 默认为 'N/A'
new_device = Device(
name=name,
install_site=site,
source='manual',
status='offline',
current_value='0',
latest_time='N/A',
check_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
json_data='{}',
is_hidden=0,
is_maintaining=0
)
db.session.add(new_device)
db.session.commit()
return jsonify({'code': 200, 'message': '设备添加成功'})
except Exception as e:
db.session.rollback()
return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/device_data_by_date', methods=['GET'])
def device_data_by_date_stub():
return device_data_by_date()

View File

@ -0,0 +1,248 @@
import time
import requests
import json
import hashlib
import logging
from flask import current_app
# ==========================================
# 1. 配置获取 (从 Flask 全局配置读取)
# ==========================================
def get_config(key):
"""
优先从 Flask 应用上下文获取配置
"""
try:
if current_app:
return current_app.config.get(key)
except RuntimeError:
# 如果在非 Flask 上下文运行(如单独调试),返回 None 或报错
print("[Warning] Not in Flask context")
pass
return None
# ==========================================
# 2. 核心签名算法 (Java 兼容版)
# ==========================================
def generate_signature_final(params, is_json_body=False):
"""
签名公式: secret + appid + timestamp + paramData + secret -> MD5(lower)
"""
appid = get_config('IOT_APP_ID')
secret = get_config('IOT_SECRET')
# 1. 拷贝参数,避免修改原字典
params_copy = params.copy()
# 2. 移除不参与签名的字段 (timestamp, appid, signature)
# 注意timestamp 在签名公式中是单独拼接的,不在 paramData 里
timestamp = str(params_copy.pop('timestamp', int(time.time() * 1000)))
if 'appid' in params_copy: params_copy.pop('appid')
if 'signature' in params_copy: params_copy.pop('signature')
# 3. 生成 paramData
param_data = ""
if is_json_body:
# POST JSON 模式: 无空格 JSON 字符串,按 key 排序
# separators=(',', ':') 去除默认的空格
param_data = json.dumps(params_copy, sort_keys=True, separators=(',', ':'), ensure_ascii=False)
else:
# GET 键值对模式: key=value 直接拼接 (注意Java版没有 '&' 符号)
sorted_keys = sorted([k for k in params_copy.keys() if params_copy[k] is not None])
kv_list = [f"{k}={params_copy[k]}" for k in sorted_keys]
param_data = "".join(kv_list)
# 4. 拼接最终字符串
sign_str = f"{secret}{appid}{timestamp}{param_data}{secret}"
# 5. MD5 加密并转小写
return hashlib.md5(sign_str.encode('utf-8')).hexdigest().lower()
# ==========================================
# 3. 业务接口封装
# ==========================================
def get_access_token():
"""
登录获取 Token
"""
base_url = get_config('IOT_BASE_URL')
login_url = get_config('IOT_URL_LOGIN')
if not base_url or not login_url:
print("[IoT API] 配置缺失")
return None
url = base_url + login_url
payload = {
"username": get_config('IOT_USERNAME'),
"password": get_config('IOT_PASSWORD')
}
try:
# print(f"DEBUG: 正在登录 IoT 平台...")
res = requests.post(url, json=payload, timeout=10).json()
if res.get('code') == 0:
token = res['data']['accessToken']
return token
else:
print(f"[IoT API] 登录失败: {res.get('msg')}")
return None
except Exception as e:
print(f"[IoT API] 登录异常: {e}")
return None
def get_iot_card_page(token, page_no=1, page_size=100):
"""
获取单页卡列表
"""
base_url = get_config('IOT_BASE_URL')
page_url = get_config('IOT_URL_PAGE')
url = base_url + page_url
timestamp = int(time.time() * 1000)
params = {
"appid": get_config('IOT_APP_ID'),
"pageNo": page_no,
"pageSize": page_size,
"timestamp": timestamp
}
# 计算签名
sign = generate_signature_final(params, is_json_body=False)
params['signature'] = sign
headers = {'Authorization': f'Bearer {token}'}
try:
resp = requests.get(url, params=params, headers=headers, timeout=15)
return resp.json()
except Exception as e:
print(f"[IoT API] 获取列表页失败 (Page {page_no}): {e}")
return None
def get_iot_card_details_batch(token, iccids):
"""
批量获取卡详情
"""
if not iccids: return None
base_url = get_config('IOT_BASE_URL')
detail_url = get_config('IOT_URL_DETAIL')
url = base_url + detail_url
timestamp = int(time.time() * 1000)
payload = {
"iccids": iccids,
"timestamp": timestamp
}
# 计算签名 (POST JSON)
sign = generate_signature_final(payload, is_json_body=True)
payload['signature'] = sign
# 补回 timestamp 到 body 中,因为签名计算时 pop 掉了
payload['timestamp'] = timestamp
headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
try:
resp = requests.post(url, json=payload, headers=headers, timeout=20)
return resp.json()
except Exception as e:
print(f"[IoT API] 获取详情失败: {e}")
return None
# ==========================================
# 4. 主服务入口 (供 api.py 调用)
# ==========================================
def sync_iot_data_service():
"""
执行完整的同步流程:
1. 登录
2. 遍历所有分页获取 ICCID
3. 批量查询详情
4. 返回完整数据列表 (List[Dict])
"""
print("[IoT Service] 开始同步任务...")
# 1. 登录
token = get_access_token()
if not token:
return []
# 2. 循环翻页获取所有 ICCID
all_iccids = []
page_no = 1
page_size = 100
while True:
res = get_iot_card_page(token, page_no, page_size)
# 校验响应
if not res or (res.get('code') != 0 and res.get('code') != 200):
print(f"[IoT Service] 列表获取结束或中断: {res.get('msg') if res else 'No Response'}")
break
# 解析数据结构 (兼容 data 为 list 或 data.rows)
data_field = res.get('data', {})
rows = []
if isinstance(data_field, list):
rows = data_field
elif isinstance(data_field, dict):
rows = data_field.get('rows', []) or data_field.get('list', [])
if not rows:
break
# 提取 ICCID
current_batch = [str(x.get('iccid')) for x in rows if x.get('iccid')]
all_iccids.extend(current_batch)
# print(f"DEBUG: page {page_no} done, items: {len(current_batch)}")
# 判断是否最后一页
if len(rows) < page_size:
break
page_no += 1
time.sleep(0.2) # 避免请求过快
total_count = len(all_iccids)
if total_count == 0:
print("[IoT Service] 未找到任何卡片")
return []
# 3. 分批查询详情
final_data_list = []
batch_size = 50
# print(f"DEBUG: 开始查询 {total_count} 张卡的详情...")
for i in range(0, total_count, batch_size):
batch_iccids = all_iccids[i: i + batch_size]
detail_res = get_iot_card_details_batch(token, batch_iccids)
if detail_res and (detail_res.get('code') == 0 or detail_res.get('code') == 200):
details = detail_res.get('data', [])
if isinstance(details, list):
final_data_list.extend(details)
time.sleep(0.2)
print(f"[IoT Service] 同步完成,共获取 {len(final_data_list)} 条详情数据")
# 4. 返回列表供 api.py 写入数据库
return final_data_list