import requests import json import re import time import os import pandas as pd from concurrent.futures import ThreadPoolExecutor, as_completed from requests.adapters import HTTPAdapter import threading # ================= 配置区域 ================= BASE_URL = "http://111.198.24.44:88/index.php" USERNAME = "TEST" PASSWORD = "test" # <--- 请在此填入真实密码 # --- 调试配置 --- # True: 开启调试模式,只处理前 200 条 # False: 关闭调试模式,跑全量 DEBUG_MODE = False DEBUG_LIMIT = 200 # --- 并发配置 --- MAX_WORKERS = 10 # --- 文件配置 --- TEMPLATE_FILE = "产品-导入模板.csv" OUTPUT_FILE = "最终导出数据_含供应商厂家.xlsx" # =========================================== # 统计计数器 STATS = { "total_processed": 0, "skipped_no_id": 0, "skipped_has_sales": 0, # 销量不为0 "skipped_has_relations": 0, # 关联 Key (36/37/325/523/561) 任意一个不为0 "skipped_has_history": 0, # 【恢复】有仓库历史记录 "skipped_api_error": 0, "success": 0 } STATS_LOCK = threading.Lock() class CRMFetcher: def __init__(self): self.session = requests.Session() # 优化连接池,防止高并发报错 adapter = HTTPAdapter(pool_connections=MAX_WORKERS, pool_maxsize=MAX_WORKERS) self.session.mount('http://', adapter) self.headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "X-Requested-With": "XMLHttpRequest" } def login(self): print("[*] 正在登录系统...") try: payload = { "module": "Users", "action": "Authenticate", "return_module": "Users", "return_action": "Login", "user_name": USERNAME, "user_password": PASSWORD, "login_theme": "newskin" } resp = self.session.post(BASE_URL, data=payload, headers=self.headers) if "logout" in resp.text.lower() or "退出" in resp.text: print("[+] 登录成功!") return True else: print(f"[-] 登录失败: {resp.status_code}") return False except Exception as e: print(f"[-] 登录异常: {e}") return False def fetch_all_products(self): """自动翻页获取产品列表""" all_products = [] page = 1 page_size = 100 last_page_ids = [] print(f"\n[*] 第一阶段:开始获取产品列表 (viewname=397)...") if DEBUG_MODE: print(f" [提示] 调试模式开启,仅获取前 {DEBUG_LIMIT} 条。") while True: # 调试限制 if DEBUG_MODE and len(all_products) >= DEBUG_LIMIT: print(f" [调试] 已达到 {DEBUG_LIMIT} 条限制,停止获取。") all_products = all_products[:DEBUG_LIMIT] break payload = { "module": "Products", "action": "ProductsAjax", "file": "ListViewData", "sorder": "", "start": str(page), "order_by": "", "pagesize": str(page_size), "actionId": "1769042712624", "isFilter": "true", "search[viewname]": "397" } try: resp = self.session.post(BASE_URL, data=payload, headers=self.headers) data = resp.json() page_items = data.get("data", []) if isinstance(data, dict) else data if not page_items: print(f" 第 {page} 页为空,结束。") break # 死循环检测 current_page_ids = [item.get('crmid') for item in page_items] if current_page_ids == last_page_ids: print(f" 第 {page} 页重复,停止。") break last_page_ids = current_page_ids all_products.extend(page_items) print(f" 已获取第 {page} 页 (本页{len(page_items)}条) - 总计: {len(all_products)}条") page += 1 time.sleep(0.2) except Exception as e: print(f"[-] 获取第 {page} 页出错: {e}") break return all_products def check_single_product(self, item): """ 核心筛选逻辑: 1. 检查销量 (SalesNum) -> 必须为0 2. 检查关联 (Key 36, 37, 325, 523, 561) -> 必须全为0 3. 【恢复】检查历史 (CangkuHistory) -> 必须为空 """ with STATS_LOCK: STATS["total_processed"] += 1 # 1. 获取基础信息 crm_id = item.get("crmid") or item.get("productid") raw_name = item.get("productname", "") product_code = item.get("productcode", "") if not crm_id: with STATS_LOCK: STATS["skipped_no_id"] += 1 return None # 2. 筛选第一步:检查销量 (必须为0) sales_str = str(item.get("salesnum", "0")).replace(",", "") try: sales_num = float(sales_str) except ValueError: sales_num = 0.0 if sales_num != 0: with STATS_LOCK: STATS["skipped_has_sales"] += 1 return None try: # 3. 筛选第二步:检查关联列表 # 获取所有关联模块的计数值 check1_params = { "module": "Users", "action": "UsersAjax", "file": "setRelatedListCount", "modulename": "Products", "record": crm_id } resp1 = self.session.post(BASE_URL, data=check1_params, headers=self.headers, timeout=10) if not resp1.text: with STATS_LOCK: STATS["skipped_api_error"] += 1 return None data1 = resp1.json() # 拿到完整的 JSON 字典 # 定义需要检查的 Key 列表 target_keys = ["36", "37", "325", "523", "561"] # 只要这些 Key 中有一个值不为 "0",就直接判定为“有关联”,立即跳过 for key in target_keys: val = data1.get(key) if val is None: try: val = data1.get(int(key)) except: pass val_str = str(val) if val is not None else "0" if val_str != "0": with STATS_LOCK: STATS["skipped_has_relations"] += 1 return None # 4. 【恢复】筛选第三步:检查仓库历史 (必须为空) check2_params = { "module": "Products", "action": "ProductsAjax", "file": "getCangkuHistoryInfo", "productid": crm_id, "currpage": "1" } resp2 = self.session.post(BASE_URL, data=check2_params, headers=self.headers, timeout=10) data2 = resp2.json() # 获取 entity -> value 列表 entity_value = data2.get("entity", {}).get("value") # 如果列表存在且长度大于0,说明有历史记录,跳过 if entity_value and len(entity_value) > 0: with STATS_LOCK: STATS["skipped_has_history"] += 1 return None # === 全部通过,提取详细数据 === with STATS_LOCK: STATS["success"] += 1 # --- 数据清洗与提取 --- # 1. 产品名称 (去除 HTML) clean_name = re.sub(r'<[^>]+>', '', str(raw_name)).strip() # 2. 厂家 (cf_2128) - 直接获取文本 manufacturer = str(item.get("cf_2128", "")).strip() # 3. 供应商名称 (vendorid) - 去除 HTML 标签,提取文本 raw_vendor = item.get("vendorid", "") clean_vendor = re.sub(r'<[^>]+>', '', str(raw_vendor)).strip() # 4. 产品类别 (catalogid) - 根据要求,不含HTML,直接获取文本 clean_catalog = str(item.get("catalogid", "")).strip() return { "产品名称": clean_name, "产品编码": product_code, "厂家": manufacturer, "供应商名称": clean_vendor, "产品类别": clean_catalog } except Exception as e: with STATS_LOCK: STATS["skipped_api_error"] += 1 return None def get_template_columns(filename): if not os.path.exists(filename): print(f"[-] 错误:找不到模板文件 '{filename}'") return None try: try: df = pd.read_csv(filename, encoding='utf-8-sig', nrows=0) except UnicodeDecodeError: df = pd.read_csv(filename, encoding='gbk', nrows=0) return df.columns.tolist() except Exception as e: print(f"[-] 读取模板表头失败: {e}") return None def main(): columns = get_template_columns(TEMPLATE_FILE) if not columns: return fetcher = CRMFetcher() if not fetcher.login(): return all_data = fetcher.fetch_all_products() total_count = len(all_data) if total_count == 0: print("[-] 未获取到数据。") return print(f"\n[*] 第二阶段:并发筛选 {total_count} 条数据 (含多重关联与历史记录验证)...") valid_rows = [] processed_count = 0 start_time = time.time() with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: future_to_item = {executor.submit(fetcher.check_single_product, item): item for item in all_data} for future in as_completed(future_to_item): processed_count += 1 result_dict = future.result() if result_dict: # 动态映射:只有模板里有的列,才会被写入 row_data = {col: None for col in columns} # 映射关系配置 (Excel列名 : 数据字典Key) # 请确保您的CSV模板中包含 "厂家", "供应商名称", "产品类别" 这几列,否则不会写入 mapping = { "产品名称": "产品名称", "产品编码": "产品编码", "厂家": "厂家", "供应商名称": "供应商名称", "产品类别": "产品类别" } for col_name in columns: if col_name in mapping: row_data[col_name] = result_dict.get(mapping[col_name]) valid_rows.append(row_data) if processed_count % 20 == 0 or processed_count == total_count: percent = (processed_count / total_count) * 100 speed = processed_count / (time.time() - start_time + 0.01) print( f"\r进度: {processed_count}/{total_count} ({percent:.1f}%) - 选中: {len(valid_rows)} - 速度: {speed:.1f}条/秒", end="") print("\n\n" + "=" * 40) print(" 筛选结果统计") print("=" * 40) print(f"总处理条数 : {STATS['total_processed']}") print(f"[-] 因缺失ID跳过 : {STATS['skipped_no_id']}") print(f"[-] 因有销量跳过 : {STATS['skipped_has_sales']}") print(f"[-] 因有关联跳过 : {STATS['skipped_has_relations']} (Key 36/37/325/523/561 != 0)") print(f"[-] 因有历史跳过 : {STATS['skipped_has_history']} (Has History)") print(f"[-] 因API错误跳过 : {STATS['skipped_api_error']}") print(f"[+] 最终成功保留 : {STATS['success']}") print("=" * 40) if valid_rows: try: df_output = pd.DataFrame(valid_rows, columns=columns) print(f"[*] 正在写入 Excel '{OUTPUT_FILE}'...") df_output.to_excel(OUTPUT_FILE, index=False) print(f"[+] 成功!") except Exception as e: print(f"[-] 写入失败: {e}") else: print("[-] 没有数据被选中。") if __name__ == "__main__": main()