From d94f8e1b907c3dbe00fd80249949b73f5ce79ccd Mon Sep 17 00:00:00 2001 From: DXC Date: Thu, 22 Jan 2026 14:30:39 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=88=A4=E6=96=AD=E6=9D=A1?= =?UTF-8?q?=E4=BB=B6=EF=BC=8C=E5=A2=9E=E5=8A=A0=E5=87=BA=E5=85=A5=E5=BA=93?= =?UTF-8?q?=E5=8D=95=EF=BC=8C=E8=BF=9B=E8=B4=A7=E5=8D=95=E7=AD=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 导出数据.py | 219 ++++++++++++++++++++++++++++-------------------- 1 file changed, 129 insertions(+), 90 deletions(-) diff --git a/导出数据.py b/导出数据.py index 6a8ebd4..d045855 100644 --- a/导出数据.py +++ b/导出数据.py @@ -6,6 +6,7 @@ import os import pandas as pd from concurrent.futures import ThreadPoolExecutor, as_completed from requests.adapters import HTTPAdapter +import threading # ================= 配置区域 ================= BASE_URL = "http://111.198.24.44:88/index.php" @@ -13,46 +14,58 @@ USERNAME = "TEST" PASSWORD = "test" # <--- 请在此填入真实密码 # --- 调试配置 --- -# True: 开启调试模式,只获取前 200 条数据进行测试 -# False: 关闭调试模式,处理所有数据 (2万条+) +# True: 开启调试模式,只处理前 200 条 +# False: 关闭调试模式,跑全量 DEBUG_MODE = False -DEBUG_LIMIT = 1000 +DEBUG_LIMIT = 200 + +# --- 并发配置 --- +MAX_WORKERS = 10 # --- 文件配置 --- -TEMPLATE_FILE = "产品-导入模板.csv" # 你的 CSV 模板文件 -OUTPUT_FILE = "最终导出数据.xlsx" # 生成的 Excel 文件 -MAX_WORKERS = 10 # 并发线程数 - +TEMPLATE_FILE = "产品-导入模板.csv" +OUTPUT_FILE = "最终导出数据.xlsx" # =========================================== +# 统计计数器 +STATS = { + "total_processed": 0, + "skipped_no_id": 0, + "skipped_has_sales": 0, # 销量不为0 + "skipped_has_relations": 0, # 关联 Key (36/37/325/523/561) 任意一个不为0 + "skipped_has_history": 0, # 【恢复】有仓库历史记录 + "skipped_api_error": 0, + "success": 0 +} +STATS_LOCK = threading.Lock() + + class CRMFetcher: def __init__(self): self.session = requests.Session() - # 优化连接池 + # 优化连接池,防止高并发报错 adapter = HTTPAdapter(pool_connections=MAX_WORKERS, pool_maxsize=MAX_WORKERS) self.session.mount('http://', adapter) - self.headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "X-Requested-With": "XMLHttpRequest" } def login(self): - """执行登录""" print("[*] 正在登录系统...") - payload = { - "module": "Users", "action": "Authenticate", "return_module": "Users", - "return_action": "Login", "user_name": USERNAME, "user_password": PASSWORD, - "login_theme": "newskin" - } try: + payload = { + "module": "Users", "action": "Authenticate", "return_module": "Users", + "return_action": "Login", "user_name": USERNAME, "user_password": PASSWORD, + "login_theme": "newskin" + } resp = self.session.post(BASE_URL, data=payload, headers=self.headers) if "logout" in resp.text.lower() or "退出" in resp.text: print("[+] 登录成功!") return True else: - print("[-] 登录失败,请检查账号密码。") + print(f"[-] 登录失败: {resp.status_code}") return False except Exception as e: print(f"[-] 登录异常: {e}") @@ -63,92 +76,122 @@ class CRMFetcher: all_products = [] page = 1 page_size = 100 + last_page_ids = [] - print(f"\n[*] 第一阶段:开始获取产品列表 (调试模式: {'开启' if DEBUG_MODE else '关闭'})...") + print(f"\n[*] 第一阶段:开始获取产品列表 (viewname=397)...") + if DEBUG_MODE: + print(f" [提示] 调试模式开启,仅获取前 {DEBUG_LIMIT} 条。") while True: - # 调试模式限制 + # 调试限制 if DEBUG_MODE and len(all_products) >= DEBUG_LIMIT: - print(f" [调试] 已达到 {DEBUG_LIMIT} 条限制,停止获取列表。") + print(f" [调试] 已达到 {DEBUG_LIMIT} 条限制,停止获取。") all_products = all_products[:DEBUG_LIMIT] break payload = { "module": "Products", "action": "ProductsAjax", "file": "ListViewData", - "start": str(page), "pagesize": str(page_size), - "isFilter": "true", "search[viewname]": "28", - "filter[Fields0]": "cf_2318", "filter[Condition0]": "is", "filter[Srch_value0]": "否", - "filter[type0]": "opts", "filter[search_cnt]": "1", "filter[matchtype]": "all" + "sorder": "", "start": str(page), "order_by": "", "pagesize": str(page_size), + "actionId": "1769042712624", "isFilter": "true", "search[viewname]": "397" } try: resp = self.session.post(BASE_URL, data=payload, headers=self.headers) data = resp.json() - page_items = data.get("data", []) if isinstance(data, dict) else data - if not page_items or len(page_items) == 0: - print(f" 第 {page} 页为空,列表获取结束。") + if not page_items: + print(f" 第 {page} 页为空,结束。") break - all_products.extend(page_items) - print(f" 已获取第 {page} 页 - 总计: {len(all_products)}条") + # 死循环检测 + current_page_ids = [item.get('crmid') for item in page_items] + if current_page_ids == last_page_ids: + print(f" 第 {page} 页重复,停止。") + break + last_page_ids = current_page_ids + all_products.extend(page_items) + print(f" 已获取第 {page} 页 (本页{len(page_items)}条) - 总计: {len(all_products)}条") page += 1 time.sleep(0.2) except Exception as e: - print(f"[-] 获取第 {page} 页时出错: {e}") + print(f"[-] 获取第 {page} 页出错: {e}") break return all_products def check_single_product(self, item): """ - 核心检查逻辑 - 返回:{'产品名称': name, '产品编码': code} 如果符合条件 - 返回:None 如果不符合 + 核心筛选逻辑: + 1. 检查销量 (SalesNum) -> 必须为0 + 2. 检查关联 (Key 36, 37, 325, 523, 561) -> 必须全为0 + 3. 【恢复】检查历史 (CangkuHistory) -> 必须为空 """ - crm_id = item.get("crmid") + with STATS_LOCK: + STATS["total_processed"] += 1 + + # 1. 获取基础信息 + crm_id = item.get("crmid") or item.get("productid") raw_name = item.get("productname", "") product_code = item.get("productcode", "") - # === 优化步骤 0: 检查 salesnum (销量) === - # 获取销量,处理可能的逗号 (如 "1,000.00") 和空值 + if not crm_id: + with STATS_LOCK: STATS["skipped_no_id"] += 1 + return None + + # 2. 筛选第一步:检查销量 (必须为0) sales_str = str(item.get("salesnum", "0")).replace(",", "") try: sales_num = float(sales_str) except ValueError: sales_num = 0.0 - # 如果销量不为0,说明是“保留”产品,不需要进行后续检查,直接跳过(返回 None) - # 从而极大减少 API 请求 if sales_num != 0: - return None - - # ------------------------------------------------------- - # 下面是销量为 0 时,进行的严格验证 (验证是否为废弃/空闲数据) - # ------------------------------------------------------- - - if not crm_id: + with STATS_LOCK: STATS["skipped_has_sales"] += 1 return None try: - # === 步骤 1: 检查关联列表 (Key 36 是否为 0) === + # 3. 筛选第二步:检查关联列表 + # 获取所有关联模块的计数值 check1_params = { "module": "Users", "action": "UsersAjax", "file": "setRelatedListCount", "modulename": "Products", "record": crm_id } resp1 = self.session.post(BASE_URL, data=check1_params, headers=self.headers, timeout=10) - data1 = resp1.json() - - val_36 = data1.get("36") or data1.get(36) - - # 如果不等于0,跳过 - if str(val_36) != "0": + if not resp1.text: + with STATS_LOCK: STATS["skipped_api_error"] += 1 return None - # === 步骤 2: 检查仓库历史 (是否为空) === + data1 = resp1.json() # 拿到完整的 JSON 字典,例如 {'36': '0', '37': '5', ...} + + # === 修改核心逻辑 === + # 定义需要检查的 Key 列表 + target_keys = ["36", "37", "325", "523", "561"] + + # 只要这些 Key 中有一个值不为 "0",就直接判定为“有关联”,立即跳过 + for key in target_keys: + # 获取值(兼容 key 可能是 int 或 str 的情况) + val = data1.get(key) + if val is None: + # 尝试用 int 获取(防止 json 解析自动转 int) + try: + val = data1.get(int(key)) + except: + pass + + # 如果获取不到,默认为 0 (即 None 视为 0) + # 否则转字符串进行比较 + val_str = str(val) if val is not None else "0" + + if val_str != "0": + # 只要有一个不为0,命中规则,直接跳过,无需检查后续 Key + with STATS_LOCK: STATS["skipped_has_relations"] += 1 + return None + # === 逻辑结束 === + + # 4. 【恢复】筛选第三步:检查仓库历史 (必须为空) check2_params = { "module": "Products", "action": "ProductsAjax", "file": "getCangkuHistoryInfo", "productid": crm_id, "currpage": "1" @@ -156,13 +199,17 @@ class CRMFetcher: resp2 = self.session.post(BASE_URL, data=check2_params, headers=self.headers, timeout=10) data2 = resp2.json() + # 获取 entity -> value 列表 entity_value = data2.get("entity", {}).get("value") - # 如果有历史记录,跳过 + # 如果列表存在且长度大于0,说明有历史记录,跳过 if entity_value and len(entity_value) > 0: + with STATS_LOCK: STATS["skipped_has_history"] += 1 return None - # === 步骤 3: 所有条件满足(销量0 + 无关联 + 无历史),写入 Excel === + # === 全部通过 === + with STATS_LOCK: + STATS["success"] += 1 clean_name = re.sub(r'<[^>]+>', '', raw_name).strip() return { @@ -171,23 +218,20 @@ class CRMFetcher: } except Exception as e: - # 网络超时或其他错误,跳过 + with STATS_LOCK: + STATS["skipped_api_error"] += 1 return None def get_template_columns(filename): - """读取 CSV 模板的表头""" if not os.path.exists(filename): print(f"[-] 错误:找不到模板文件 '{filename}'") return None - try: - # 兼容 utf-8 和 gbk try: df = pd.read_csv(filename, encoding='utf-8-sig', nrows=0) except UnicodeDecodeError: df = pd.read_csv(filename, encoding='gbk', nrows=0) - return df.columns.tolist() except Exception as e: print(f"[-] 读取模板表头失败: {e}") @@ -195,17 +239,14 @@ def get_template_columns(filename): def main(): - # 1. 读取模板表头 columns = get_template_columns(TEMPLATE_FILE) if not columns: return - print(f"[*] 成功读取模板表头,目标 Excel 将包含这 {len(columns)} 列。") fetcher = CRMFetcher() if not fetcher.login(): return - # 2. 获取数据列表 all_data = fetcher.fetch_all_products() total_count = len(all_data) @@ -213,14 +254,12 @@ def main(): print("[-] 未获取到数据。") return - print(f"\n[*] 第二阶段:智能筛选 {total_count} 条数据 (利用销量数据加速)...") + print(f"\n[*] 第二阶段:并发筛选 {total_count} 条数据 (含多重关联与历史记录验证)...") valid_rows = [] processed_count = 0 - skipped_by_sales = 0 # 统计优化了多少条 start_time = time.time() - # 3. 开启线程池 with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: future_to_item = {executor.submit(fetcher.check_single_product, item): item for item in all_data} @@ -228,41 +267,41 @@ def main(): processed_count += 1 result_dict = future.result() - # 这里的统计逻辑稍微模糊,因为 result_dict 为 None 可能是因为销量不为0,也可能是因为 API 检查不通过 - # 但不影响核心功能 if result_dict: row_data = {col: None for col in columns} - if "产品名称" in columns: - row_data["产品名称"] = result_dict["产品名称"] - if "产品编码" in columns: - row_data["产品编码"] = result_dict["产品编码"] + if "产品名称" in columns: row_data["产品名称"] = result_dict["产品名称"] + if "产品编码" in columns: row_data["产品编码"] = result_dict["产品编码"] valid_rows.append(row_data) - # 进度条 - if processed_count % 50 == 0 or processed_count == total_count: + if processed_count % 20 == 0 or processed_count == total_count: percent = (processed_count / total_count) * 100 - elapsed = time.time() - start_time - speed = processed_count / elapsed if elapsed > 0 else 0 + speed = processed_count / (time.time() - start_time + 0.01) print( f"\r进度: {processed_count}/{total_count} ({percent:.1f}%) - 选中: {len(valid_rows)} - 速度: {speed:.1f}条/秒", end="") - print("\n\n[*] 筛选完成!") + print("\n\n" + "=" * 40) + print(" 筛选结果统计") + print("=" * 40) + print(f"总处理条数 : {STATS['total_processed']}") + print(f"[-] 因缺失ID跳过 : {STATS['skipped_no_id']}") + print(f"[-] 因有销量跳过 : {STATS['skipped_has_sales']}") + print(f"[-] 因有关联跳过 : {STATS['skipped_has_relations']} (Key 36/37/325/523/561 != 0)") + print(f"[-] 因有历史跳过 : {STATS['skipped_has_history']} (Has History)") + print(f"[-] 因API错误跳过 : {STATS['skipped_api_error']}") + print(f"[+] 最终成功保留 : {STATS['success']}") + print("=" * 40) - # 4. 生成 Excel - try: - if not valid_rows: - print("[!] 警告:没有筛选出符合条件的数据,生成的 Excel 将为空。") - - df_output = pd.DataFrame(valid_rows, columns=columns) - print(f"[*] 正在保存为 Excel 文件 '{OUTPUT_FILE}'...") - df_output.to_excel(OUTPUT_FILE, index=False) - - print(f"[+] 成功!结果已写入 '{OUTPUT_FILE}'") - print(f"[+] 提示:请务必检查 '调试模式' (DEBUG_MODE) 是否已根据需要关闭。") - - except Exception as e: - print(f"[-] 写入 Excel 失败: {e}") + if valid_rows: + try: + df_output = pd.DataFrame(valid_rows, columns=columns) + print(f"[*] 正在写入 Excel '{OUTPUT_FILE}'...") + df_output.to_excel(OUTPUT_FILE, index=False) + print(f"[+] 成功!") + except Exception as e: + print(f"[-] 写入失败: {e}") + else: + print("[-] 没有数据被选中。") if __name__ == "__main__":