import time import requests import json import hashlib import logging from flask import current_app # ========================================== # 1. 配置获取 (从 Flask 全局配置读取) # ========================================== def get_config(key): """ 优先从 Flask 应用上下文获取配置 """ try: if current_app: return current_app.config.get(key) except RuntimeError: # 如果在非 Flask 上下文运行(如单独调试),返回 None 或报错 print("[Warning] Not in Flask context") pass return None # ========================================== # 2. 核心签名算法 (Java 兼容版) # ========================================== def generate_signature_final(params, is_json_body=False): """ 签名公式: secret + appid + timestamp + paramData + secret -> MD5(lower) """ appid = get_config('IOT_APP_ID') secret = get_config('IOT_SECRET') # 1. 拷贝参数,避免修改原字典 params_copy = params.copy() # 2. 移除不参与签名的字段 (timestamp, appid, signature) # 注意:timestamp 在签名公式中是单独拼接的,不在 paramData 里 timestamp = str(params_copy.pop('timestamp', int(time.time() * 1000))) if 'appid' in params_copy: params_copy.pop('appid') if 'signature' in params_copy: params_copy.pop('signature') # 3. 生成 paramData param_data = "" if is_json_body: # POST JSON 模式: 无空格 JSON 字符串,按 key 排序 # separators=(',', ':') 去除默认的空格 param_data = json.dumps(params_copy, sort_keys=True, separators=(',', ':'), ensure_ascii=False) else: # GET 键值对模式: key=value 直接拼接 (注意:Java版没有 '&' 符号) sorted_keys = sorted([k for k in params_copy.keys() if params_copy[k] is not None]) kv_list = [f"{k}={params_copy[k]}" for k in sorted_keys] param_data = "".join(kv_list) # 4. 拼接最终字符串 sign_str = f"{secret}{appid}{timestamp}{param_data}{secret}" # 5. MD5 加密并转小写 return hashlib.md5(sign_str.encode('utf-8')).hexdigest().lower() # ========================================== # 3. 业务接口封装 # ========================================== def get_access_token(): """ 登录获取 Token """ base_url = get_config('IOT_BASE_URL') login_url = get_config('IOT_URL_LOGIN') if not base_url or not login_url: print("[IoT API] 配置缺失") return None url = base_url + login_url payload = { "username": get_config('IOT_USERNAME'), "password": get_config('IOT_PASSWORD') } try: # print(f"DEBUG: 正在登录 IoT 平台...") res = requests.post(url, json=payload, timeout=10).json() if res.get('code') == 0: token = res['data']['accessToken'] return token else: print(f"[IoT API] 登录失败: {res.get('msg')}") return None except Exception as e: print(f"[IoT API] 登录异常: {e}") return None def get_iot_card_page(token, page_no=1, page_size=100): """ 获取单页卡列表 """ base_url = get_config('IOT_BASE_URL') page_url = get_config('IOT_URL_PAGE') url = base_url + page_url timestamp = int(time.time() * 1000) params = { "appid": get_config('IOT_APP_ID'), "pageNo": page_no, "pageSize": page_size, "timestamp": timestamp } # 计算签名 sign = generate_signature_final(params, is_json_body=False) params['signature'] = sign headers = {'Authorization': f'Bearer {token}'} try: resp = requests.get(url, params=params, headers=headers, timeout=15) return resp.json() except Exception as e: print(f"[IoT API] 获取列表页失败 (Page {page_no}): {e}") return None def get_iot_card_details_batch(token, iccids): """ 批量获取卡详情 """ if not iccids: return None base_url = get_config('IOT_BASE_URL') detail_url = get_config('IOT_URL_DETAIL') url = base_url + detail_url timestamp = int(time.time() * 1000) payload = { "iccids": iccids, "timestamp": timestamp } # 计算签名 (POST JSON) sign = generate_signature_final(payload, is_json_body=True) payload['signature'] = sign # 补回 timestamp 到 body 中,因为签名计算时 pop 掉了 payload['timestamp'] = timestamp headers = { 'Authorization': f'Bearer {token}', 'Content-Type': 'application/json' } try: resp = requests.post(url, json=payload, headers=headers, timeout=20) return resp.json() except Exception as e: print(f"[IoT API] 获取详情失败: {e}") return None # ========================================== # 4. 主服务入口 (供 api.py 调用) # ========================================== def sync_iot_data_service(): """ 执行完整的同步流程: 1. 登录 2. 遍历所有分页获取 ICCID 3. 批量查询详情 4. 返回完整数据列表 (List[Dict]) """ print("[IoT Service] 开始同步任务...") # 1. 登录 token = get_access_token() if not token: return [] # 2. 循环翻页获取所有 ICCID all_iccids = [] page_no = 1 page_size = 100 while True: res = get_iot_card_page(token, page_no, page_size) # 校验响应 if not res or (res.get('code') != 0 and res.get('code') != 200): print(f"[IoT Service] 列表获取结束或中断: {res.get('msg') if res else 'No Response'}") break # 解析数据结构 (兼容 data 为 list 或 data.rows) data_field = res.get('data', {}) rows = [] if isinstance(data_field, list): rows = data_field elif isinstance(data_field, dict): rows = data_field.get('rows', []) or data_field.get('list', []) if not rows: break # 提取 ICCID current_batch = [str(x.get('iccid')) for x in rows if x.get('iccid')] all_iccids.extend(current_batch) # print(f"DEBUG: page {page_no} done, items: {len(current_batch)}") # 判断是否最后一页 if len(rows) < page_size: break page_no += 1 time.sleep(0.2) # 避免请求过快 total_count = len(all_iccids) if total_count == 0: print("[IoT Service] 未找到任何卡片") return [] # 3. 分批查询详情 final_data_list = [] batch_size = 50 # print(f"DEBUG: 开始查询 {total_count} 张卡的详情...") for i in range(0, total_count, batch_size): batch_iccids = all_iccids[i: i + batch_size] detail_res = get_iot_card_details_batch(token, batch_iccids) if detail_res and (detail_res.get('code') == 0 or detail_res.get('code') == 200): details = detail_res.get('data', []) if isinstance(details, list): final_data_list.extend(details) time.sleep(0.2) print(f"[IoT Service] 同步完成,共获取 {len(final_data_list)} 条详情数据") # 4. 返回列表供 api.py 写入数据库 return final_data_list