Files
CRM-chanpin/导出数据.py

308 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import re
import time
import os
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter
import threading
# ================= 配置区域 =================
BASE_URL = "http://111.198.24.44:88/index.php"
USERNAME = "TEST"
PASSWORD = "test" # <--- 请在此填入真实密码
# --- 调试配置 ---
# True: 开启调试模式,只处理前 200 条
# False: 关闭调试模式,跑全量
DEBUG_MODE = False
DEBUG_LIMIT = 200
# --- 并发配置 ---
MAX_WORKERS = 10
# --- 文件配置 ---
TEMPLATE_FILE = "产品-导入模板.csv"
OUTPUT_FILE = "最终导出数据.xlsx"
# ===========================================
# 统计计数器
STATS = {
"total_processed": 0,
"skipped_no_id": 0,
"skipped_has_sales": 0, # 销量不为0
"skipped_has_relations": 0, # 关联 Key (36/37/325/523/561) 任意一个不为0
"skipped_has_history": 0, # 【恢复】有仓库历史记录
"skipped_api_error": 0,
"success": 0
}
STATS_LOCK = threading.Lock()
class CRMFetcher:
def __init__(self):
self.session = requests.Session()
# 优化连接池,防止高并发报错
adapter = HTTPAdapter(pool_connections=MAX_WORKERS, pool_maxsize=MAX_WORKERS)
self.session.mount('http://', adapter)
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"X-Requested-With": "XMLHttpRequest"
}
def login(self):
print("[*] 正在登录系统...")
try:
payload = {
"module": "Users", "action": "Authenticate", "return_module": "Users",
"return_action": "Login", "user_name": USERNAME, "user_password": PASSWORD,
"login_theme": "newskin"
}
resp = self.session.post(BASE_URL, data=payload, headers=self.headers)
if "logout" in resp.text.lower() or "退出" in resp.text:
print("[+] 登录成功!")
return True
else:
print(f"[-] 登录失败: {resp.status_code}")
return False
except Exception as e:
print(f"[-] 登录异常: {e}")
return False
def fetch_all_products(self):
"""自动翻页获取产品列表"""
all_products = []
page = 1
page_size = 100
last_page_ids = []
print(f"\n[*] 第一阶段:开始获取产品列表 (viewname=397)...")
if DEBUG_MODE:
print(f" [提示] 调试模式开启,仅获取前 {DEBUG_LIMIT} 条。")
while True:
# 调试限制
if DEBUG_MODE and len(all_products) >= DEBUG_LIMIT:
print(f" [调试] 已达到 {DEBUG_LIMIT} 条限制,停止获取。")
all_products = all_products[:DEBUG_LIMIT]
break
payload = {
"module": "Products", "action": "ProductsAjax", "file": "ListViewData",
"sorder": "", "start": str(page), "order_by": "", "pagesize": str(page_size),
"actionId": "1769042712624", "isFilter": "true", "search[viewname]": "397"
}
try:
resp = self.session.post(BASE_URL, data=payload, headers=self.headers)
data = resp.json()
page_items = data.get("data", []) if isinstance(data, dict) else data
if not page_items:
print(f"{page} 页为空,结束。")
break
# 死循环检测
current_page_ids = [item.get('crmid') for item in page_items]
if current_page_ids == last_page_ids:
print(f"{page} 页重复,停止。")
break
last_page_ids = current_page_ids
all_products.extend(page_items)
print(f" 已获取第 {page} 页 (本页{len(page_items)}条) - 总计: {len(all_products)}")
page += 1
time.sleep(0.2)
except Exception as e:
print(f"[-] 获取第 {page} 页出错: {e}")
break
return all_products
def check_single_product(self, item):
"""
核心筛选逻辑:
1. 检查销量 (SalesNum) -> 必须为0
2. 检查关联 (Key 36, 37, 325, 523, 561) -> 必须全为0
3. 【恢复】检查历史 (CangkuHistory) -> 必须为空
"""
with STATS_LOCK:
STATS["total_processed"] += 1
# 1. 获取基础信息
crm_id = item.get("crmid") or item.get("productid")
raw_name = item.get("productname", "")
product_code = item.get("productcode", "")
if not crm_id:
with STATS_LOCK: STATS["skipped_no_id"] += 1
return None
# 2. 筛选第一步:检查销量 (必须为0)
sales_str = str(item.get("salesnum", "0")).replace(",", "")
try:
sales_num = float(sales_str)
except ValueError:
sales_num = 0.0
if sales_num != 0:
with STATS_LOCK: STATS["skipped_has_sales"] += 1
return None
try:
# 3. 筛选第二步:检查关联列表
# 获取所有关联模块的计数值
check1_params = {
"module": "Users", "action": "UsersAjax", "file": "setRelatedListCount",
"modulename": "Products", "record": crm_id
}
resp1 = self.session.post(BASE_URL, data=check1_params, headers=self.headers, timeout=10)
if not resp1.text:
with STATS_LOCK: STATS["skipped_api_error"] += 1
return None
data1 = resp1.json() # 拿到完整的 JSON 字典,例如 {'36': '0', '37': '5', ...}
# === 修改核心逻辑 ===
# 定义需要检查的 Key 列表
target_keys = ["36", "37", "325", "523", "561"]
# 只要这些 Key 中有一个值不为 "0",就直接判定为“有关联”,立即跳过
for key in target_keys:
# 获取值(兼容 key 可能是 int 或 str 的情况)
val = data1.get(key)
if val is None:
# 尝试用 int 获取(防止 json 解析自动转 int
try:
val = data1.get(int(key))
except:
pass
# 如果获取不到,默认为 0 (即 None 视为 0)
# 否则转字符串进行比较
val_str = str(val) if val is not None else "0"
if val_str != "0":
# 只要有一个不为0命中规则直接跳过无需检查后续 Key
with STATS_LOCK: STATS["skipped_has_relations"] += 1
return None
# === 逻辑结束 ===
# 4. 【恢复】筛选第三步:检查仓库历史 (必须为空)
check2_params = {
"module": "Products", "action": "ProductsAjax", "file": "getCangkuHistoryInfo",
"productid": crm_id, "currpage": "1"
}
resp2 = self.session.post(BASE_URL, data=check2_params, headers=self.headers, timeout=10)
data2 = resp2.json()
# 获取 entity -> value 列表
entity_value = data2.get("entity", {}).get("value")
# 如果列表存在且长度大于0说明有历史记录跳过
if entity_value and len(entity_value) > 0:
with STATS_LOCK: STATS["skipped_has_history"] += 1
return None
# === 全部通过 ===
with STATS_LOCK:
STATS["success"] += 1
clean_name = re.sub(r'<[^>]+>', '', raw_name).strip()
return {
"产品名称": clean_name,
"产品编码": product_code
}
except Exception as e:
with STATS_LOCK:
STATS["skipped_api_error"] += 1
return None
def get_template_columns(filename):
if not os.path.exists(filename):
print(f"[-] 错误:找不到模板文件 '{filename}'")
return None
try:
try:
df = pd.read_csv(filename, encoding='utf-8-sig', nrows=0)
except UnicodeDecodeError:
df = pd.read_csv(filename, encoding='gbk', nrows=0)
return df.columns.tolist()
except Exception as e:
print(f"[-] 读取模板表头失败: {e}")
return None
def main():
columns = get_template_columns(TEMPLATE_FILE)
if not columns:
return
fetcher = CRMFetcher()
if not fetcher.login():
return
all_data = fetcher.fetch_all_products()
total_count = len(all_data)
if total_count == 0:
print("[-] 未获取到数据。")
return
print(f"\n[*] 第二阶段:并发筛选 {total_count} 条数据 (含多重关联与历史记录验证)...")
valid_rows = []
processed_count = 0
start_time = time.time()
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_item = {executor.submit(fetcher.check_single_product, item): item for item in all_data}
for future in as_completed(future_to_item):
processed_count += 1
result_dict = future.result()
if result_dict:
row_data = {col: None for col in columns}
if "产品名称" in columns: row_data["产品名称"] = result_dict["产品名称"]
if "产品编码" in columns: row_data["产品编码"] = result_dict["产品编码"]
valid_rows.append(row_data)
if processed_count % 20 == 0 or processed_count == total_count:
percent = (processed_count / total_count) * 100
speed = processed_count / (time.time() - start_time + 0.01)
print(
f"\r进度: {processed_count}/{total_count} ({percent:.1f}%) - 选中: {len(valid_rows)} - 速度: {speed:.1f}条/秒",
end="")
print("\n\n" + "=" * 40)
print(" 筛选结果统计")
print("=" * 40)
print(f"总处理条数 : {STATS['total_processed']}")
print(f"[-] 因缺失ID跳过 : {STATS['skipped_no_id']}")
print(f"[-] 因有销量跳过 : {STATS['skipped_has_sales']}")
print(f"[-] 因有关联跳过 : {STATS['skipped_has_relations']} (Key 36/37/325/523/561 != 0)")
print(f"[-] 因有历史跳过 : {STATS['skipped_has_history']} (Has History)")
print(f"[-] 因API错误跳过 : {STATS['skipped_api_error']}")
print(f"[+] 最终成功保留 : {STATS['success']}")
print("=" * 40)
if valid_rows:
try:
df_output = pd.DataFrame(valid_rows, columns=columns)
print(f"[*] 正在写入 Excel '{OUTPUT_FILE}'...")
df_output.to_excel(OUTPUT_FILE, index=False)
print(f"[+] 成功!")
except Exception as e:
print(f"[-] 写入失败: {e}")
else:
print("[-] 没有数据被选中。")
if __name__ == "__main__":
main()