From 43e6bf45efeda583a03e54b5293f8d0a25672315 Mon Sep 17 00:00:00 2001
From: DXC
Date: Wed, 21 Jan 2026 17:28:54 +0800
Subject: [PATCH] =?UTF-8?q?=E9=AA=8C=E8=AF=81=E5=AE=9E=E7=8E=B0=E8=8E=B7?=
 =?UTF-8?q?=E5=8F=96CRM=E4=BA=A7=E5=93=81=E7=AD=9B=E9=80=89=E9=9C=80?=
 =?UTF-8?q?=E8=A6=81=E5=88=A0=E9=99=A4=E5=86=85=E5=AE=B9?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 导出数据.py | 298 ++++++++++++++++++++++++++++++++++++++++++++++++
 获取列表.py | 113 +++++++++++++++++++
 2 files changed, 411 insertions(+)
 create mode 100644 导出数据.py
 create mode 100644 获取列表.py

diff --git a/导出数据.py b/导出数据.py
new file mode 100644
index 0000000..6a8ebd4
--- /dev/null
+++ b/导出数据.py
@@ -0,0 +1,298 @@
+import requests
+import json
+import re
+import time
+import os
+import pandas as pd
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from requests.adapters import HTTPAdapter
+
+# ================= Configuration =================
+# NOTE(review): the endpoint is plain HTTP, so credentials travel unencrypted.
+BASE_URL = "http://111.198.24.44:88/index.php"
+# Credentials can be overridden through environment variables so that a real
+# password never has to be committed; the fallbacks are the original values.
+USERNAME = os.environ.get("CRM_USERNAME", "TEST")
+PASSWORD = os.environ.get("CRM_PASSWORD", "test")
+
+# --- Debug settings ---
+# True:  debug mode, only the first DEBUG_LIMIT rows are fetched for testing
+# False: debug mode off, process all rows (20k+)
+DEBUG_MODE = False
+DEBUG_LIMIT = 1000
+
+# --- File settings ---
+TEMPLATE_FILE = "产品-导入模板.csv"  # CSV template file
+OUTPUT_FILE = "最终导出数据.xlsx"  # generated Excel file
+MAX_WORKERS = 10  # number of concurrent worker threads
+REQUEST_TIMEOUT = 10  # seconds, applied to every HTTP request
+
+
+# =================================================
+
+class CRMFetcher:
+    """Client for the CRM's index.php AJAX endpoints.
+
+    NOTE(review): one requests.Session is shared by every worker thread;
+    confirm that concurrent use is acceptable for this workload.
+    """
+
+    def __init__(self):
+        self.session = requests.Session()
+        # Size the connection pool to match the worker count.
+        adapter = HTTPAdapter(pool_connections=MAX_WORKERS, pool_maxsize=MAX_WORKERS)
+        self.session.mount('http://', adapter)
+
+        self.headers = {
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
+            "X-Requested-With": "XMLHttpRequest"
+        }
+
+    def login(self):
+        """Log in with USERNAME/PASSWORD; return True on success, else False."""
+        print("[*] 正在登录系统...")
+        payload = {
+            "module": "Users", "action": "Authenticate", "return_module": "Users",
+            "return_action": "Login", "user_name": USERNAME, "user_password": PASSWORD,
+            "login_theme": "newskin"
+        }
+        try:
+            # The timeout keeps an unreachable server from hanging the script.
+            resp = self.session.post(BASE_URL, data=payload, headers=self.headers,
+                                     timeout=REQUEST_TIMEOUT)
+            # Success is inferred from "logout"/"退出" appearing in the reply.
+            if "logout" in resp.text.lower() or "退出" in resp.text:
+                print("[+] 登录成功!")
+                return True
+            else:
+                print("[-] 登录失败,请检查账号密码。")
+                return False
+        except Exception as e:
+            print(f"[-] 登录异常: {e}")
+            return False
+
+    def fetch_all_products(self):
+        """Page through the product list and return every row as a list.
+
+        Stops at the first empty page, at DEBUG_LIMIT rows when DEBUG_MODE is
+        on, or at the first request/parse error (rows fetched so far are kept).
+        """
+        all_products = []
+        page = 1
+        page_size = 100
+
+        print(f"\n[*] 第一阶段:开始获取产品列表 (调试模式: {'开启' if DEBUG_MODE else '关闭'})...")
+
+        while True:
+            # Debug-mode cap.
+            if DEBUG_MODE and len(all_products) >= DEBUG_LIMIT:
+                print(f" [调试] 已达到 {DEBUG_LIMIT} 条限制,停止获取列表。")
+                all_products = all_products[:DEBUG_LIMIT]
+                break
+
+            # NOTE(review): assumes `start` is a 1-based page number rather
+            # than a row offset -- confirm against the server.
+            payload = {
+                "module": "Products", "action": "ProductsAjax", "file": "ListViewData",
+                "start": str(page), "pagesize": str(page_size),
+                "isFilter": "true", "search[viewname]": "28",
+                "filter[Fields0]": "cf_2318", "filter[Condition0]": "is", "filter[Srch_value0]": "否",
+                "filter[type0]": "opts", "filter[search_cnt]": "1", "filter[matchtype]": "all"
+            }
+
+            try:
+                resp = self.session.post(BASE_URL, data=payload, headers=self.headers,
+                                         timeout=REQUEST_TIMEOUT)
+                data = resp.json()
+
+                # The reply is either {"data": [...]} or a bare list.
+                page_items = data.get("data", []) if isinstance(data, dict) else data
+
+                if not page_items:
+                    print(f" 第 {page} 页为空,列表获取结束。")
+                    break
+
+                all_products.extend(page_items)
+                print(f" 已获取第 {page} 页 - 总计: {len(all_products)}条")
+
+                page += 1
+                time.sleep(0.2)
+
+            except Exception as e:
+                print(f"[-] 获取第 {page} 页时出错: {e}")
+                break
+
+        return all_products
+
+    def check_single_product(self, item):
+        """Decide whether one list row is a deletion candidate.
+
+        Returns {'产品名称': name, '产品编码': code} when the row has zero
+        sales, a "36" related-list count of 0 and no warehouse history;
+        returns None otherwise, including when a request fails.
+        """
+        crm_id = item.get("crmid")
+        raw_name = item.get("productname", "")
+        product_code = item.get("productcode", "")
+
+        # === Step 0: check salesnum (sales volume) ===
+        # Strip thousands separators (e.g. "1,000.00"); unparsable -> 0.
+        sales_str = str(item.get("salesnum", "0")).replace(",", "")
+        try:
+            sales_num = float(sales_str)
+        except ValueError:
+            sales_num = 0.0
+
+        # Non-zero sales marks a product to keep, so return early and skip
+        # the remote checks; this removes most API requests.
+        if sales_num != 0:
+            return None
+
+        # -------------------------------------------------------
+        # Zero sales: strictly verify that the record is really unused.
+        # -------------------------------------------------------
+
+        if not crm_id:
+            return None
+
+        try:
+            # === Step 1: related-list count (key "36" must be 0) ===
+            check1_params = {
+                "module": "Users", "action": "UsersAjax", "file": "setRelatedListCount",
+                "modulename": "Products", "record": crm_id
+            }
+            resp1 = self.session.post(BASE_URL, data=check1_params, headers=self.headers,
+                                      timeout=REQUEST_TIMEOUT)
+            data1 = resp1.json()
+
+            # JSON object keys are always strings, so only "36" can match.
+            # Do not chain lookups with `or`: a numeric 0 is falsy and would
+            # be replaced by None, wrongly rejecting the product.
+            val_36 = data1.get("36")
+
+            # Anything other than 0 means related records exist: skip.
+            if str(val_36) != "0":
+                return None
+
+            # === Step 2: warehouse history (must be empty) ===
+            check2_params = {
+                "module": "Products", "action": "ProductsAjax", "file": "getCangkuHistoryInfo",
+                "productid": crm_id, "currpage": "1"
+            }
+            resp2 = self.session.post(BASE_URL, data=check2_params, headers=self.headers,
+                                      timeout=REQUEST_TIMEOUT)
+            data2 = resp2.json()
+
+            entity_value = data2.get("entity", {}).get("value")
+
+            # Any history entry means the product was used: skip.
+            if entity_value:
+                return None
+
+            # === Step 3: all checks passed, emit the row for the Excel file ===
+            clean_name = re.sub(r'<[^>]+>', '', raw_name).strip()
+
+            return {
+                "产品名称": clean_name,
+                "产品编码": product_code
+            }
+
+        except Exception as e:
+            # Best effort: a timeout or malformed reply only leaves this
+            # product out of the result, but report it instead of hiding it.
+            print(f"\n[!] 检查产品 {crm_id} 时出错,已跳过: {e}")
+            return None
+
+
+def get_template_columns(filename):
+    """Return the header columns of the CSV template, or None on failure."""
+    if not os.path.exists(filename):
+        print(f"[-] 错误:找不到模板文件 '{filename}'")
+        return None
+
+    try:
+        # Try utf-8 (with optional BOM) first, then fall back to gbk.
+        try:
+            df = pd.read_csv(filename, encoding='utf-8-sig', nrows=0)
+        except UnicodeDecodeError:
+            df = pd.read_csv(filename, encoding='gbk', nrows=0)
+
+        return df.columns.tolist()
+    except Exception as e:
+        print(f"[-] 读取模板表头失败: {e}")
+        return None
+
+
+def main():
+    """Log in, collect deletion candidates and write them to OUTPUT_FILE."""
+    # 1. Read the template header.
+    columns = get_template_columns(TEMPLATE_FILE)
+    if not columns:
+        return
+    print(f"[*] 成功读取模板表头,目标 Excel 将包含这 {len(columns)} 列。")
+
+    fetcher = CRMFetcher()
+    if not fetcher.login():
+        return
+
+    # 2. Fetch the product list.
+    all_data = fetcher.fetch_all_products()
+    total_count = len(all_data)
+
+    if total_count == 0:
+        print("[-] 未获取到数据。")
+        return
+
+    print(f"\n[*] 第二阶段:智能筛选 {total_count} 条数据 (利用销量数据加速)...")
+
+    valid_rows = []
+    processed_count = 0
+    start_time = time.time()
+
+    # 3. Run the per-product checks on a thread pool.
+    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        future_to_item = {executor.submit(fetcher.check_single_product, item): item for item in all_data}
+
+        for future in as_completed(future_to_item):
+            processed_count += 1
+            result_dict = future.result()
+
+            # None means "not a candidate": either non-zero sales or a
+            # failed remote check; the two cases are not distinguished here.
+            if result_dict:
+                row_data = {col: None for col in columns}
+                if "产品名称" in columns:
+                    row_data["产品名称"] = result_dict["产品名称"]
+                if "产品编码" in columns:
+                    row_data["产品编码"] = result_dict["产品编码"]
+                valid_rows.append(row_data)
+
+            # Progress line, rewritten in place via "\r".
+            if processed_count % 50 == 0 or processed_count == total_count:
+                percent = (processed_count / total_count) * 100
+                elapsed = time.time() - start_time
+                speed = processed_count / elapsed if elapsed > 0 else 0
+                print(
+                    f"\r进度: {processed_count}/{total_count} ({percent:.1f}%) - 选中: {len(valid_rows)} - 速度: {speed:.1f}条/秒",
+                    end="")
+
+    print("\n\n[*] 筛选完成!")
+
+    # 4. Write the Excel file.
+    try:
+        if not valid_rows:
+            print("[!] 警告:没有筛选出符合条件的数据,生成的 Excel 将为空。")
+
+        df_output = pd.DataFrame(valid_rows, columns=columns)
+        print(f"[*] 正在保存为 Excel 文件 '{OUTPUT_FILE}'...")
+        df_output.to_excel(OUTPUT_FILE, index=False)
+
+        print(f"[+] 成功!结果已写入 '{OUTPUT_FILE}'")
+        print(f"[+] 提示:请务必检查 '调试模式' (DEBUG_MODE) 是否已根据需要关闭。")
+
+    except Exception as e:
+        print(f"[-] 写入 Excel 失败: {e}")
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/获取列表.py b/获取列表.py
new file mode 100644
index 0000000..dded215
--- /dev/null
+++ b/获取列表.py
@@ -0,0 +1,113 @@
+import os
+import requests
+import json
+import urllib.parse
+
+# ================= Configuration =================
+# Login URL
+BASE_URL = "http://111.198.24.44:88/index.php"
+
+# Credentials; override with the CRM_USERNAME / CRM_PASSWORD environment
+# variables instead of committing real values (fallbacks are the originals).
+USERNAME = os.environ.get("CRM_USERNAME", "TEST")
+PASSWORD = os.environ.get("CRM_PASSWORD", "test")
+REQUEST_TIMEOUT = 10  # seconds, applied to every HTTP request
+
+# Browser-like headers
+headers = {
+    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
+    "X-Requested-With": "XMLHttpRequest"  # AJAX endpoints usually expect this header
+}
+
+
+# =================================================
+
+def main():
+    """Log in, fetch one page of the product list and save it to result.json."""
+    # 1. Create a Session (it manages cookies automatically).
+    session = requests.Session()
+
+    # 2. Build the login form.
+    login_payload = {
+        "error": "",
+        "login_theme": "newskin",
+        "module": "Users",
+        "action": "Authenticate",
+        "return_module": "Users",
+        "return_action": "Login",
+        "user_name": USERNAME,
+        "user_password": PASSWORD,
+        "code": "",
+        "user_validate": ""
+    }
+
+    print("1. 正在尝试登录...")
+    try:
+        # Send the login request; the timeout prevents an indefinite hang.
+        login_resp = session.post(BASE_URL, data=login_payload, headers=headers,
+                                  timeout=REQUEST_TIMEOUT)
+
+        # Login counts as failed unless "logout"/"退出" appears in the reply.
+        if "logout" not in login_resp.text.lower() and "退出" not in login_resp.text:
+            print(f"[-] 登录失败,状态码: {login_resp.status_code}")
+            return
+
+        print("[+] 登录成功!")
+
+        # 3. Build the list-query payload.
+        # requests url-encodes form values itself, so '否' is written
+        # literally instead of as '%E5%90%A6'.
+        data_payload = {
+            "module": "Products",
+            "action": "ProductsAjax",
+            "file": "ListViewData",
+            "sorder": "",
+            "start": "1",
+            "order_by": "",
+            "pagesize": "100",
+            "actionId": "1768981966230",  # NOTE(review): looks like a captured ms timestamp -- confirm it may be static
+            "isFilter": "true",
+            "search[viewname]": "28",
+            "filter[Fields0]": "cf_2318",
+            "filter[Condition0]": "is",
+            "filter[Srch_value0]": "否",  # sent as %E5%90%A6
+            "filter[type0]": "opts",
+            "filter[search_cnt]": "1",
+            "filter[matchtype]": "all"
+        }
+
+        print("2. 正在获取列表数据...")
+
+        # The list query is sent as POST; if it fails, a GET with
+        # params=data_payload may be worth trying.
+        data_resp = session.post(BASE_URL, data=data_payload, headers=headers,
+                                 timeout=REQUEST_TIMEOUT)
+
+        print(f" 状态码: {data_resp.status_code}")
+
+        # 4. Parse the JSON; the try covers only the decode step.
+        try:
+            json_data = data_resp.json()
+        except ValueError:
+            # Every decode error raised by Response.json() is a ValueError,
+            # whichever json backend requests is using.
+            print("[-] 返回的不是有效的 JSON 数据。")
+            print("可能原因:1. Session 过期 2. 参数错误 3. 服务器报错")
+            print("返回内容片段:", data_resp.text[:500])
+            return
+
+        # Pretty-print a preview on the console.
+        print("\n[+] 获取数据成功,JSON 内容预览 (前500字符):")
+        print(json.dumps(json_data, ensure_ascii=False, indent=4)[:500] + "...")
+
+        # Save the complete JSON to a file.
+        with open("result.json", "w", encoding="utf-8") as f:
+            json.dump(json_data, f, ensure_ascii=False, indent=4)
+        print("\n[+]完整数据已保存至当前目录下的 result.json 文件")
+
+    except Exception as e:
+        print(f"发生错误: {e}")
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file