248 lines
7.2 KiB
Python
248 lines
7.2 KiB
Python
import time
|
||
import requests
|
||
import json
|
||
import hashlib
|
||
import logging
|
||
from flask import current_app
|
||
|
||
|
||
# ==========================================
|
||
# 1. 配置获取 (从 Flask 全局配置读取)
|
||
# ==========================================
|
||
def get_config(key):
|
||
"""
|
||
优先从 Flask 应用上下文获取配置
|
||
"""
|
||
try:
|
||
if current_app:
|
||
return current_app.config.get(key)
|
||
except RuntimeError:
|
||
# 如果在非 Flask 上下文运行(如单独调试),返回 None 或报错
|
||
print("[Warning] Not in Flask context")
|
||
pass
|
||
return None
|
||
|
||
|
||
# ==========================================
|
||
# 2. 核心签名算法 (Java 兼容版)
|
||
# ==========================================
|
||
def generate_signature_final(params, is_json_body=False):
|
||
"""
|
||
签名公式: secret + appid + timestamp + paramData + secret -> MD5(lower)
|
||
"""
|
||
appid = get_config('IOT_APP_ID')
|
||
secret = get_config('IOT_SECRET')
|
||
|
||
# 1. 拷贝参数,避免修改原字典
|
||
params_copy = params.copy()
|
||
|
||
# 2. 移除不参与签名的字段 (timestamp, appid, signature)
|
||
# 注意:timestamp 在签名公式中是单独拼接的,不在 paramData 里
|
||
timestamp = str(params_copy.pop('timestamp', int(time.time() * 1000)))
|
||
if 'appid' in params_copy: params_copy.pop('appid')
|
||
if 'signature' in params_copy: params_copy.pop('signature')
|
||
|
||
# 3. 生成 paramData
|
||
param_data = ""
|
||
if is_json_body:
|
||
# POST JSON 模式: 无空格 JSON 字符串,按 key 排序
|
||
# separators=(',', ':') 去除默认的空格
|
||
param_data = json.dumps(params_copy, sort_keys=True, separators=(',', ':'), ensure_ascii=False)
|
||
else:
|
||
# GET 键值对模式: key=value 直接拼接 (注意:Java版没有 '&' 符号)
|
||
sorted_keys = sorted([k for k in params_copy.keys() if params_copy[k] is not None])
|
||
kv_list = [f"{k}={params_copy[k]}" for k in sorted_keys]
|
||
param_data = "".join(kv_list)
|
||
|
||
# 4. 拼接最终字符串
|
||
sign_str = f"{secret}{appid}{timestamp}{param_data}{secret}"
|
||
|
||
# 5. MD5 加密并转小写
|
||
return hashlib.md5(sign_str.encode('utf-8')).hexdigest().lower()
|
||
|
||
|
||
# ==========================================
|
||
# 3. 业务接口封装
|
||
# ==========================================
|
||
|
||
def get_access_token():
|
||
"""
|
||
登录获取 Token
|
||
"""
|
||
base_url = get_config('IOT_BASE_URL')
|
||
login_url = get_config('IOT_URL_LOGIN')
|
||
|
||
if not base_url or not login_url:
|
||
print("[IoT API] 配置缺失")
|
||
return None
|
||
|
||
url = base_url + login_url
|
||
payload = {
|
||
"username": get_config('IOT_USERNAME'),
|
||
"password": get_config('IOT_PASSWORD')
|
||
}
|
||
|
||
try:
|
||
# print(f"DEBUG: 正在登录 IoT 平台...")
|
||
res = requests.post(url, json=payload, timeout=10).json()
|
||
if res.get('code') == 0:
|
||
token = res['data']['accessToken']
|
||
return token
|
||
else:
|
||
print(f"[IoT API] 登录失败: {res.get('msg')}")
|
||
return None
|
||
except Exception as e:
|
||
print(f"[IoT API] 登录异常: {e}")
|
||
return None
|
||
|
||
|
||
def get_iot_card_page(token, page_no=1, page_size=100):
|
||
"""
|
||
获取单页卡列表
|
||
"""
|
||
base_url = get_config('IOT_BASE_URL')
|
||
page_url = get_config('IOT_URL_PAGE')
|
||
url = base_url + page_url
|
||
|
||
timestamp = int(time.time() * 1000)
|
||
|
||
params = {
|
||
"appid": get_config('IOT_APP_ID'),
|
||
"pageNo": page_no,
|
||
"pageSize": page_size,
|
||
"timestamp": timestamp
|
||
}
|
||
|
||
# 计算签名
|
||
sign = generate_signature_final(params, is_json_body=False)
|
||
params['signature'] = sign
|
||
|
||
headers = {'Authorization': f'Bearer {token}'}
|
||
|
||
try:
|
||
resp = requests.get(url, params=params, headers=headers, timeout=15)
|
||
return resp.json()
|
||
except Exception as e:
|
||
print(f"[IoT API] 获取列表页失败 (Page {page_no}): {e}")
|
||
return None
|
||
|
||
|
||
def get_iot_card_details_batch(token, iccids):
|
||
"""
|
||
批量获取卡详情
|
||
"""
|
||
if not iccids: return None
|
||
|
||
base_url = get_config('IOT_BASE_URL')
|
||
detail_url = get_config('IOT_URL_DETAIL')
|
||
url = base_url + detail_url
|
||
|
||
timestamp = int(time.time() * 1000)
|
||
|
||
payload = {
|
||
"iccids": iccids,
|
||
"timestamp": timestamp
|
||
}
|
||
|
||
# 计算签名 (POST JSON)
|
||
sign = generate_signature_final(payload, is_json_body=True)
|
||
payload['signature'] = sign
|
||
# 补回 timestamp 到 body 中,因为签名计算时 pop 掉了
|
||
payload['timestamp'] = timestamp
|
||
|
||
headers = {
|
||
'Authorization': f'Bearer {token}',
|
||
'Content-Type': 'application/json'
|
||
}
|
||
|
||
try:
|
||
resp = requests.post(url, json=payload, headers=headers, timeout=20)
|
||
return resp.json()
|
||
except Exception as e:
|
||
print(f"[IoT API] 获取详情失败: {e}")
|
||
return None
|
||
|
||
|
||
# ==========================================
|
||
# 4. 主服务入口 (供 api.py 调用)
|
||
# ==========================================
|
||
|
||
def sync_iot_data_service():
|
||
"""
|
||
执行完整的同步流程:
|
||
1. 登录
|
||
2. 遍历所有分页获取 ICCID
|
||
3. 批量查询详情
|
||
4. 返回完整数据列表 (List[Dict])
|
||
"""
|
||
print("[IoT Service] 开始同步任务...")
|
||
|
||
# 1. 登录
|
||
token = get_access_token()
|
||
if not token:
|
||
return []
|
||
|
||
# 2. 循环翻页获取所有 ICCID
|
||
all_iccids = []
|
||
page_no = 1
|
||
page_size = 100
|
||
|
||
while True:
|
||
res = get_iot_card_page(token, page_no, page_size)
|
||
|
||
# 校验响应
|
||
if not res or (res.get('code') != 0 and res.get('code') != 200):
|
||
print(f"[IoT Service] 列表获取结束或中断: {res.get('msg') if res else 'No Response'}")
|
||
break
|
||
|
||
# 解析数据结构 (兼容 data 为 list 或 data.rows)
|
||
data_field = res.get('data', {})
|
||
rows = []
|
||
if isinstance(data_field, list):
|
||
rows = data_field
|
||
elif isinstance(data_field, dict):
|
||
rows = data_field.get('rows', []) or data_field.get('list', [])
|
||
|
||
if not rows:
|
||
break
|
||
|
||
# 提取 ICCID
|
||
current_batch = [str(x.get('iccid')) for x in rows if x.get('iccid')]
|
||
all_iccids.extend(current_batch)
|
||
|
||
# print(f"DEBUG: page {page_no} done, items: {len(current_batch)}")
|
||
|
||
# 判断是否最后一页
|
||
if len(rows) < page_size:
|
||
break
|
||
|
||
page_no += 1
|
||
time.sleep(0.2) # 避免请求过快
|
||
|
||
total_count = len(all_iccids)
|
||
if total_count == 0:
|
||
print("[IoT Service] 未找到任何卡片")
|
||
return []
|
||
|
||
# 3. 分批查询详情
|
||
final_data_list = []
|
||
batch_size = 50
|
||
|
||
# print(f"DEBUG: 开始查询 {total_count} 张卡的详情...")
|
||
|
||
for i in range(0, total_count, batch_size):
|
||
batch_iccids = all_iccids[i: i + batch_size]
|
||
|
||
detail_res = get_iot_card_details_batch(token, batch_iccids)
|
||
|
||
if detail_res and (detail_res.get('code') == 0 or detail_res.get('code') == 200):
|
||
details = detail_res.get('data', [])
|
||
if isinstance(details, list):
|
||
final_data_list.extend(details)
|
||
|
||
time.sleep(0.2)
|
||
|
||
print(f"[IoT Service] 同步完成,共获取 {len(final_data_list)} 条详情数据")
|
||
|
||
# 4. 返回列表供 api.py 写入数据库
|
||
return final_data_list |