Files
ZDXX/2_1banben/services/iot_api.py

260 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import requests
import json
import hashlib
import logging
from flask import current_app
# ==========================================
# 1. 配置获取 (从 Flask 全局配置读取)
# ==========================================
def get_config(key):
"""
优先从 Flask 应用上下文获取配置
"""
try:
if current_app:
return current_app.config.get(key)
except RuntimeError:
# 如果在非 Flask 上下文运行(如单独调试),返回 None 或报错
print("[Warning] Not in Flask context")
pass
return None
# ==========================================
# 2. 核心签名算法 (Java 兼容版)
# ==========================================
def generate_signature_final(params, is_json_body=False):
"""
签名公式: secret + appid + timestamp + paramData + secret -> MD5(lower)
"""
appid = get_config('IOT_APP_ID')
secret = get_config('IOT_SECRET')
# 1. 拷贝参数,避免修改原字典
params_copy = params.copy()
# 2. 移除不参与签名的字段 (timestamp, appid, signature)
# 注意timestamp 在签名公式中是单独拼接的,不在 paramData 里
timestamp = str(params_copy.pop('timestamp', int(time.time() * 1000)))
if 'appid' in params_copy: params_copy.pop('appid')
if 'signature' in params_copy: params_copy.pop('signature')
# 3. 生成 paramData
param_data = ""
if is_json_body:
# POST JSON 模式: 无空格 JSON 字符串,按 key 排序
# separators=(',', ':') 去除默认的空格
param_data = json.dumps(params_copy, sort_keys=True, separators=(',', ':'), ensure_ascii=False)
else:
# GET 键值对模式: key=value 直接拼接 (注意Java版没有 '&' 符号)
sorted_keys = sorted([k for k in params_copy.keys() if params_copy[k] is not None])
kv_list = [f"{k}={params_copy[k]}" for k in sorted_keys]
param_data = "".join(kv_list)
# 4. 拼接最终字符串
sign_str = f"{secret}{appid}{timestamp}{param_data}{secret}"
# 5. MD5 加密并转小写
return hashlib.md5(sign_str.encode('utf-8')).hexdigest().lower()
# ==========================================
# 3. 业务接口封装
# ==========================================
def get_access_token():
"""
登录获取 Token
"""
base_url = get_config('IOT_BASE_URL')
login_url = get_config('IOT_URL_LOGIN')
if not base_url or not login_url:
print("[IoT API] 配置缺失")
return None
url = base_url + login_url
payload = {
"username": get_config('IOT_USERNAME'),
"password": get_config('IOT_PASSWORD')
}
try:
# print(f"DEBUG: 正在登录 IoT 平台...")
res = requests.post(url, json=payload, timeout=10).json()
if res.get('code') == 0:
token = res['data']['accessToken']
return token
else:
print(f"[IoT API] 登录失败: {res.get('msg')}")
return None
except Exception as e:
print(f"[IoT API] 登录异常: {e}")
return None
def get_iot_card_page(token, page_no=1, page_size=100):
"""
获取单页卡列表
"""
base_url = get_config('IOT_BASE_URL')
page_url = get_config('IOT_URL_PAGE')
url = base_url + page_url
timestamp = int(time.time() * 1000)
params = {
"appid": get_config('IOT_APP_ID'),
"pageNo": page_no,
"pageSize": page_size,
"timestamp": timestamp
}
# 计算签名
sign = generate_signature_final(params, is_json_body=False)
params['signature'] = sign
headers = {'Authorization': f'Bearer {token}'}
try:
resp = requests.get(url, params=params, headers=headers, timeout=15)
return resp.json()
except Exception as e:
print(f"[IoT API] 获取列表页失败 (Page {page_no}): {e}")
return None
def get_iot_card_details_batch(token, iccids):
"""
批量获取卡详情
"""
if not iccids: return None
base_url = get_config('IOT_BASE_URL')
detail_url = get_config('IOT_URL_DETAIL')
url = base_url + detail_url
timestamp = int(time.time() * 1000)
payload = {
"iccids": iccids,
"timestamp": timestamp
}
# 计算签名 (POST JSON)
sign = generate_signature_final(payload, is_json_body=True)
payload['signature'] = sign
# 补回 timestamp 到 body 中,因为签名计算时 pop 掉了
payload['timestamp'] = timestamp
headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
try:
resp = requests.post(url, json=payload, headers=headers, timeout=20)
return resp.json()
except Exception as e:
print(f"[IoT API] 获取详情失败: {e}")
return None
# ==========================================
# 4. 主服务入口 (供 api.py 调用)
# ==========================================
def sync_iot_data_service():
"""
执行完整的同步流程:
1. 登录
2. 遍历所有分页获取 ICCID
3. 批量查询详情
4. 解析 cardStatus 状态码
5. 返回完整数据列表 (List[Dict])
"""
print("[IoT Service] 开始同步任务...")
# ✅ 1. 定义状态码映射表 (根据提供的需求文档)
STATUS_MAP = {
"1": "测试期",
"2": "沉默期",
"3": "在使用",
"4": "停机",
"5": "停机保号",
"6": "销户"
}
token = get_access_token()
if not token:
return []
all_iccids = []
page_no = 1
page_size = 100
# ✅ 2. 循环翻页获取所有 ICCID
while True:
res = get_iot_card_page(token, page_no, page_size)
if not res or (res.get('code') != 0 and res.get('code') != 200):
print(f"[IoT Service] 列表获取结束或中断: {res.get('msg') if res else 'No Response'}")
break
data_field = res.get('data', {})
rows = []
if isinstance(data_field, list):
rows = data_field
elif isinstance(data_field, dict):
rows = data_field.get('rows', []) or data_field.get('list', [])
if not rows:
break
current_batch = [str(x.get('iccid')) for x in rows if x.get('iccid')]
all_iccids.extend(current_batch)
if len(rows) < page_size:
break
page_no += 1
time.sleep(0.2)
total_count = len(all_iccids)
if total_count == 0:
print("[IoT Service] 未找到任何卡片")
return []
# ✅ 3. 分批查询详情并处理状态
final_data_list = []
batch_size = 50
for i in range(0, total_count, batch_size):
batch_iccids = all_iccids[i: i + batch_size]
detail_res = get_iot_card_details_batch(token, batch_iccids)
if detail_res and (detail_res.get('code') == 0 or detail_res.get('code') == 200):
details = detail_res.get('data', [])
if isinstance(details, list):
# === 核心修改:增加状态解析逻辑 ===
for card in details:
# 获取原始状态码 (如 "3")
raw_status = str(card.get('cardStatus', ''))
# 匹配中文描述 (如 "在使用")
status_desc = STATUS_MAP.get(raw_status, "未知状态")
# 将描述写入新字段,前端可直接取用 card.statusDesc
card['statusDesc'] = status_desc
final_data_list.append(card)
# =================================
time.sleep(0.2)
print(f"[IoT Service] 同步完成,共获取 {len(final_data_list)} 条详情数据")
return final_data_list