Files
CRM-chanpin/导出数据.py

332 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import re
import time
import os
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter
import threading
# ================= 配置区域 =================
BASE_URL = "http://111.198.24.44:88/index.php"
USERNAME = "TEST"
PASSWORD = "test" # <--- 请在此填入真实密码
# --- 调试配置 ---
# True: 开启调试模式,只处理前 200 条
# False: 关闭调试模式,跑全量
DEBUG_MODE = False
DEBUG_LIMIT = 200
# --- 并发配置 ---
MAX_WORKERS = 10
# --- 文件配置 ---
TEMPLATE_FILE = "产品-导入模板.csv"
OUTPUT_FILE = "最终导出数据_含供应商厂家.xlsx"
# ===========================================
# 统计计数器
STATS = {
"total_processed": 0,
"skipped_no_id": 0,
"skipped_has_sales": 0, # 销量不为0
"skipped_has_relations": 0, # 关联 Key (36/37/325/523/561) 任意一个不为0
"skipped_has_history": 0, # 【恢复】有仓库历史记录
"skipped_api_error": 0,
"success": 0
}
STATS_LOCK = threading.Lock()
class CRMFetcher:
def __init__(self):
self.session = requests.Session()
# 优化连接池,防止高并发报错
adapter = HTTPAdapter(pool_connections=MAX_WORKERS, pool_maxsize=MAX_WORKERS)
self.session.mount('http://', adapter)
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"X-Requested-With": "XMLHttpRequest"
}
def login(self):
print("[*] 正在登录系统...")
try:
payload = {
"module": "Users", "action": "Authenticate", "return_module": "Users",
"return_action": "Login", "user_name": USERNAME, "user_password": PASSWORD,
"login_theme": "newskin"
}
resp = self.session.post(BASE_URL, data=payload, headers=self.headers)
if "logout" in resp.text.lower() or "退出" in resp.text:
print("[+] 登录成功!")
return True
else:
print(f"[-] 登录失败: {resp.status_code}")
return False
except Exception as e:
print(f"[-] 登录异常: {e}")
return False
def fetch_all_products(self):
"""自动翻页获取产品列表"""
all_products = []
page = 1
page_size = 100
last_page_ids = []
print(f"\n[*] 第一阶段:开始获取产品列表 (viewname=397)...")
if DEBUG_MODE:
print(f" [提示] 调试模式开启,仅获取前 {DEBUG_LIMIT} 条。")
while True:
# 调试限制
if DEBUG_MODE and len(all_products) >= DEBUG_LIMIT:
print(f" [调试] 已达到 {DEBUG_LIMIT} 条限制,停止获取。")
all_products = all_products[:DEBUG_LIMIT]
break
payload = {
"module": "Products", "action": "ProductsAjax", "file": "ListViewData",
"sorder": "", "start": str(page), "order_by": "", "pagesize": str(page_size),
"actionId": "1769042712624", "isFilter": "true", "search[viewname]": "397"
}
try:
resp = self.session.post(BASE_URL, data=payload, headers=self.headers)
data = resp.json()
page_items = data.get("data", []) if isinstance(data, dict) else data
if not page_items:
print(f"{page} 页为空,结束。")
break
# 死循环检测
current_page_ids = [item.get('crmid') for item in page_items]
if current_page_ids == last_page_ids:
print(f"{page} 页重复,停止。")
break
last_page_ids = current_page_ids
all_products.extend(page_items)
print(f" 已获取第 {page} 页 (本页{len(page_items)}条) - 总计: {len(all_products)}")
page += 1
time.sleep(0.2)
except Exception as e:
print(f"[-] 获取第 {page} 页出错: {e}")
break
return all_products
def check_single_product(self, item):
"""
核心筛选逻辑:
1. 检查销量 (SalesNum) -> 必须为0
2. 检查关联 (Key 36, 37, 325, 523, 561) -> 必须全为0
3. 【恢复】检查历史 (CangkuHistory) -> 必须为空
"""
with STATS_LOCK:
STATS["total_processed"] += 1
# 1. 获取基础信息
crm_id = item.get("crmid") or item.get("productid")
raw_name = item.get("productname", "")
product_code = item.get("productcode", "")
if not crm_id:
with STATS_LOCK: STATS["skipped_no_id"] += 1
return None
# 2. 筛选第一步:检查销量 (必须为0)
sales_str = str(item.get("salesnum", "0")).replace(",", "")
try:
sales_num = float(sales_str)
except ValueError:
sales_num = 0.0
if sales_num != 0:
with STATS_LOCK: STATS["skipped_has_sales"] += 1
return None
try:
# 3. 筛选第二步:检查关联列表
# 获取所有关联模块的计数值
check1_params = {
"module": "Users", "action": "UsersAjax", "file": "setRelatedListCount",
"modulename": "Products", "record": crm_id
}
resp1 = self.session.post(BASE_URL, data=check1_params, headers=self.headers, timeout=10)
if not resp1.text:
with STATS_LOCK: STATS["skipped_api_error"] += 1
return None
data1 = resp1.json() # 拿到完整的 JSON 字典
# 定义需要检查的 Key 列表
target_keys = ["36", "37", "325", "523", "561"]
# 只要这些 Key 中有一个值不为 "0",就直接判定为“有关联”,立即跳过
for key in target_keys:
val = data1.get(key)
if val is None:
try:
val = data1.get(int(key))
except:
pass
val_str = str(val) if val is not None else "0"
if val_str != "0":
with STATS_LOCK: STATS["skipped_has_relations"] += 1
return None
# 4. 【恢复】筛选第三步:检查仓库历史 (必须为空)
check2_params = {
"module": "Products", "action": "ProductsAjax", "file": "getCangkuHistoryInfo",
"productid": crm_id, "currpage": "1"
}
resp2 = self.session.post(BASE_URL, data=check2_params, headers=self.headers, timeout=10)
data2 = resp2.json()
# 获取 entity -> value 列表
entity_value = data2.get("entity", {}).get("value")
# 如果列表存在且长度大于0说明有历史记录跳过
if entity_value and len(entity_value) > 0:
with STATS_LOCK: STATS["skipped_has_history"] += 1
return None
# === 全部通过,提取详细数据 ===
with STATS_LOCK:
STATS["success"] += 1
# --- 数据清洗与提取 ---
# 1. 产品名称 (去除 HTML)
clean_name = re.sub(r'<[^>]+>', '', str(raw_name)).strip()
# 2. 厂家 (cf_2128) - 直接获取文本
manufacturer = str(item.get("cf_2128", "")).strip()
# 3. 供应商名称 (vendorid) - 去除 HTML 标签,提取文本
raw_vendor = item.get("vendorid", "")
clean_vendor = re.sub(r'<[^>]+>', '', str(raw_vendor)).strip()
# 4. 产品类别 (catalogid) - 根据要求不含HTML直接获取文本
clean_catalog = str(item.get("catalogid", "")).strip()
return {
"产品名称": clean_name,
"产品编码": product_code,
"厂家": manufacturer,
"供应商名称": clean_vendor,
"产品类别": clean_catalog
}
except Exception as e:
with STATS_LOCK:
STATS["skipped_api_error"] += 1
return None
def get_template_columns(filename):
if not os.path.exists(filename):
print(f"[-] 错误:找不到模板文件 '{filename}'")
return None
try:
try:
df = pd.read_csv(filename, encoding='utf-8-sig', nrows=0)
except UnicodeDecodeError:
df = pd.read_csv(filename, encoding='gbk', nrows=0)
return df.columns.tolist()
except Exception as e:
print(f"[-] 读取模板表头失败: {e}")
return None
def main():
columns = get_template_columns(TEMPLATE_FILE)
if not columns:
return
fetcher = CRMFetcher()
if not fetcher.login():
return
all_data = fetcher.fetch_all_products()
total_count = len(all_data)
if total_count == 0:
print("[-] 未获取到数据。")
return
print(f"\n[*] 第二阶段:并发筛选 {total_count} 条数据 (含多重关联与历史记录验证)...")
valid_rows = []
processed_count = 0
start_time = time.time()
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_item = {executor.submit(fetcher.check_single_product, item): item for item in all_data}
for future in as_completed(future_to_item):
processed_count += 1
result_dict = future.result()
if result_dict:
# 动态映射:只有模板里有的列,才会被写入
row_data = {col: None for col in columns}
# 映射关系配置 (Excel列名 : 数据字典Key)
# 请确保您的CSV模板中包含 "厂家", "供应商名称", "产品类别" 这几列,否则不会写入
mapping = {
"产品名称": "产品名称",
"产品编码": "产品编码",
"厂家": "厂家",
"供应商名称": "供应商名称",
"产品类别": "产品类别"
}
for col_name in columns:
if col_name in mapping:
row_data[col_name] = result_dict.get(mapping[col_name])
valid_rows.append(row_data)
if processed_count % 20 == 0 or processed_count == total_count:
percent = (processed_count / total_count) * 100
speed = processed_count / (time.time() - start_time + 0.01)
print(
f"\r进度: {processed_count}/{total_count} ({percent:.1f}%) - 选中: {len(valid_rows)} - 速度: {speed:.1f}条/秒",
end="")
print("\n\n" + "=" * 40)
print(" 筛选结果统计")
print("=" * 40)
print(f"总处理条数 : {STATS['total_processed']}")
print(f"[-] 因缺失ID跳过 : {STATS['skipped_no_id']}")
print(f"[-] 因有销量跳过 : {STATS['skipped_has_sales']}")
print(f"[-] 因有关联跳过 : {STATS['skipped_has_relations']} (Key 36/37/325/523/561 != 0)")
print(f"[-] 因有历史跳过 : {STATS['skipped_has_history']} (Has History)")
print(f"[-] 因API错误跳过 : {STATS['skipped_api_error']}")
print(f"[+] 最终成功保留 : {STATS['success']}")
print("=" * 40)
if valid_rows:
try:
df_output = pd.DataFrame(valid_rows, columns=columns)
print(f"[*] 正在写入 Excel '{OUTPUT_FILE}'...")
df_output.to_excel(OUTPUT_FILE, index=False)
print(f"[+] 成功!")
except Exception as e:
print(f"[-] 写入失败: {e}")
else:
print("[-] 没有数据被选中。")
if __name__ == "__main__":
main()