Files
CRM-chanpin/导出数据.py

269 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import re
import time
import os
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter
# ================= Configuration =================
# NOTE(review): credentials are hard-coded and BASE_URL is plain HTTP; consider
# loading them from the environment before sharing this script.
BASE_URL = "http://111.198.24.44:88/index.php"
USERNAME = "TEST"
PASSWORD = "test" # <--- fill in the real password here
# --- Debug settings ---
# True: debug mode on; list fetching stops once DEBUG_LIMIT records are collected
# False: debug mode off; every record is processed (the original note expects 20,000+)
DEBUG_MODE = False
DEBUG_LIMIT = 1000
# --- File settings ---
TEMPLATE_FILE = "产品-导入模板.csv" # CSV template whose header row defines the output columns
OUTPUT_FILE = "最终导出数据.xlsx" # Excel file that gets generated
MAX_WORKERS = 10 # number of concurrent worker threads
# ===========================================
class CRMFetcher:
    """Session-based client for the CRM's ``index.php`` AJAX endpoints.

    One ``requests.Session`` is created per instance and reused by every
    request so the login cookie is shared by the listing and check calls.
    """

    # Seconds to wait for the login and list-page requests. Without a timeout
    # ``requests`` waits indefinitely and one stalled response hangs the run.
    DEFAULT_TIMEOUT = 30
    # Seconds to wait for each per-product check request.
    CHECK_TIMEOUT = 10
    # Key of the counter read from the ``setRelatedListCount`` response.
    # NOTE(review): which related list "36" identifies is not visible here.
    RELATED_COUNT_KEY = "36"
    # Strips HTML tags from product names returned by the list endpoint.
    _TAG_RE = re.compile(r'<[^>]+>')

    def __init__(self):
        """Create the HTTP session, its connection pool and default headers."""
        self.session = requests.Session()
        # Size the connection pool to MAX_WORKERS, the module's thread setting.
        adapter = HTTPAdapter(pool_connections=MAX_WORKERS, pool_maxsize=MAX_WORKERS)
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "X-Requested-With": "XMLHttpRequest"
        }

    def login(self):
        """Authenticate with USERNAME/PASSWORD.

        Returns:
            True when the response page contains a logout marker, otherwise
            False (wrong credentials or any request error).
        """
        print("[*] 正在登录系统...")
        payload = {
            "module": "Users", "action": "Authenticate", "return_module": "Users",
            "return_action": "Login", "user_name": USERNAME, "user_password": PASSWORD,
            "login_theme": "newskin"
        }
        try:
            resp = self.session.post(
                BASE_URL, data=payload, headers=self.headers,
                timeout=self.DEFAULT_TIMEOUT,
            )
        except Exception as e:
            print(f"[-] 登录异常: {e}")
            return False
        # A logged-in page is recognised by the presence of a logout link.
        if "logout" in resp.text.lower() or "退出" in resp.text:
            print("[+] 登录成功!")
            return True
        print("[-] 登录失败,请检查账号密码。")
        return False

    def fetch_all_products(self):
        """Page through the product list view and return every item.

        Pages of 100 items are requested until an empty page is returned, a
        request fails, or (in DEBUG_MODE) DEBUG_LIMIT items were collected.

        Returns:
            list: the raw items as decoded from the JSON responses. On a
            request error the items collected so far are returned.
        """
        all_products = []
        page = 1
        page_size = 100
        print(f"\n[*] 第一阶段:开始获取产品列表 (调试模式: {'开启' if DEBUG_MODE else '关闭'})...")
        while True:
            # Debug mode caps the amount of data pulled from the server.
            if DEBUG_MODE and len(all_products) >= DEBUG_LIMIT:
                print(f" [调试] 已达到 {DEBUG_LIMIT} 条限制,停止获取列表。")
                all_products = all_products[:DEBUG_LIMIT]
                break
            payload = {
                "module": "Products", "action": "ProductsAjax", "file": "ListViewData",
                "start": str(page), "pagesize": str(page_size),
                "isFilter": "true", "search[viewname]": "28",
                "filter[Fields0]": "cf_2318", "filter[Condition0]": "is", "filter[Srch_value0]": "",
                "filter[type0]": "opts", "filter[search_cnt]": "1", "filter[matchtype]": "all"
            }
            try:
                resp = self.session.post(
                    BASE_URL, data=payload, headers=self.headers,
                    timeout=self.DEFAULT_TIMEOUT,
                )
                data = resp.json()
                # The endpoint may answer with {"data": [...]} or a bare list.
                page_items = data.get("data", []) if isinstance(data, dict) else data
                if not page_items:
                    print(f"{page} 页为空,列表获取结束。")
                    break
                all_products.extend(page_items)
                print(f" 已获取第 {page} 页 - 总计: {len(all_products)}")
                page += 1
                # Short pause between pages to go easy on the server.
                time.sleep(0.2)
            except Exception as e:
                print(f"[-] 获取第 {page} 页时出错: {e}")
                break
        return all_products

    @staticmethod
    def _parse_sales(raw):
        """Convert a sales figure such as ``"1,000.00"`` to float; 0.0 if unparsable."""
        text = str(raw).replace(",", "")
        try:
            return float(text)
        except ValueError:
            return 0.0

    @staticmethod
    def _related_count_is_zero(counts):
        """Return True when the related-list counter in *counts* equals zero.

        The counter is looked up under the string key first and the integer
        key second. Presence is tested explicitly (not with ``or``) so that an
        integer ``0`` is recognised as zero instead of being treated as missing.
        """
        value = counts.get(CRMFetcher.RELATED_COUNT_KEY)
        if value is None or value == "":
            value = counts.get(int(CRMFetcher.RELATED_COUNT_KEY))
        return str(value) == "0"

    @staticmethod
    def _clean_name(raw_name):
        """Strip HTML tags and surrounding whitespace; a null name becomes ''."""
        return CRMFetcher._TAG_RE.sub('', str(raw_name or "")).strip()

    def check_single_product(self, item):
        """Decide whether one list item qualifies for export.

        An item qualifies only when all of these hold:
          * its ``salesnum`` parses to 0 (checked locally, no request needed),
          * the ``setRelatedListCount`` response reports 0 for key "36",
          * the ``getCangkuHistoryInfo`` response has an empty ``entity.value``.

        Args:
            item: one raw item dict from ``fetch_all_products``.

        Returns:
            ``{'产品名称': name, '产品编码': code}`` when the item qualifies,
            otherwise ``None``. Request or decoding errors also yield ``None``
            so that a single failure does not abort the whole batch.
        """
        crm_id = item.get("crmid")
        product_code = item.get("productcode", "")

        # Step 0: a non-zero sales figure means the product is kept; returning
        # here avoids both per-product requests.
        if self._parse_sales(item.get("salesnum", "0")) != 0:
            return None
        if not crm_id:
            return None

        try:
            # Step 1: the related-list counter must be zero.
            check1_params = {
                "module": "Users", "action": "UsersAjax", "file": "setRelatedListCount",
                "modulename": "Products", "record": crm_id
            }
            resp1 = self.session.post(
                BASE_URL, data=check1_params, headers=self.headers,
                timeout=self.CHECK_TIMEOUT,
            )
            if not self._related_count_is_zero(resp1.json()):
                return None

            # Step 2: the warehouse history must be empty.
            check2_params = {
                "module": "Products", "action": "ProductsAjax", "file": "getCangkuHistoryInfo",
                "productid": crm_id, "currpage": "1"
            }
            resp2 = self.session.post(
                BASE_URL, data=check2_params, headers=self.headers,
                timeout=self.CHECK_TIMEOUT,
            )
            entity_value = resp2.json().get("entity", {}).get("value")
            if entity_value:
                return None

            # Step 3: zero sales, no related records, no history -> export it.
            return {
                "产品名称": self._clean_name(item.get("productname", "")),
                "产品编码": product_code
            }
        except Exception:
            # Deliberate best effort: timeouts, malformed JSON or unexpected
            # response shapes make the item count as "not eligible".
            return None
def get_template_columns(filename, encodings=("utf-8-sig", "gbk")):
    """Read the header row of the CSV template.

    Only the header is parsed (``nrows=0``). Each encoding in *encodings* is
    tried in order; the next one is used only when the previous attempt fails
    with ``UnicodeDecodeError``.

    Args:
        filename: path of the CSV template file.
        encodings: candidate text encodings, tried in order. The default
            matches the original behaviour (UTF-8 with optional BOM, then GBK).

    Returns:
        The column names as a list, or ``None`` when the file is missing or
        cannot be read (an error message is printed in both cases).
    """
    if not os.path.exists(filename):
        print(f"[-] 错误:找不到模板文件 '{filename}'")
        return None

    last_error = None
    for encoding in encodings:
        try:
            header = pd.read_csv(filename, encoding=encoding, nrows=0)
        except UnicodeDecodeError as e:
            # Wrong encoding guess: remember the error and try the next one.
            last_error = e
            continue
        except Exception as e:
            print(f"[-] 读取模板表头失败: {e}")
            return None
        return header.columns.tolist()

    # Every candidate encoding failed to decode the file.
    print(f"[-] 读取模板表头失败: {last_error}")
    return None
def main():
    """Run the export: read template, log in, list, filter, write Excel.

    Steps:
      1. Read the output column names from TEMPLATE_FILE.
      2. Log in and download the product list.
      3. Check every product concurrently with MAX_WORKERS threads.
      4. Write the qualifying products to OUTPUT_FILE.

    The function returns early (after printing a message) when the template
    cannot be read, the login fails, or the product list is empty.
    """
    # 1. Read the template header.
    columns = get_template_columns(TEMPLATE_FILE)
    if not columns:
        return
    print(f"[*] 成功读取模板表头,目标 Excel 将包含这 {len(columns)} 列。")

    fetcher = CRMFetcher()
    if not fetcher.login():
        return

    # 2. Fetch the product list.
    all_data = fetcher.fetch_all_products()
    total_count = len(all_data)
    if total_count == 0:
        print("[-] 未获取到数据。")
        return
    print(f"\n[*] 第二阶段:智能筛选 {total_count} 条数据 (利用销量数据加速)...")

    # Qualifying results keyed by the item's position in the list, so the
    # export keeps the list order even though checks finish in arbitrary order.
    selected = {}
    processed_count = 0
    start_time = time.time()

    # 3. Run the per-product checks in a thread pool.
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_index = {
            executor.submit(fetcher.check_single_product, item): index
            for index, item in enumerate(all_data)
        }
        for future in as_completed(future_to_index):
            processed_count += 1
            # None means "not eligible"; the reason (sales, related records,
            # history or a request error) is not distinguished here.
            result_dict = future.result()
            if result_dict:
                selected[future_to_index[future]] = result_dict
            # Progress line, refreshed every 50 items and on the last one.
            if processed_count % 50 == 0 or processed_count == total_count:
                percent = (processed_count / total_count) * 100
                elapsed = time.time() - start_time
                speed = processed_count / elapsed if elapsed > 0 else 0
                print(
                    f"\r进度: {processed_count}/{total_count} ({percent:.1f}%) - 选中: {len(selected)} - 速度: {speed:.1f}条/秒",
                    end="")
    print("\n\n[*] 筛选完成!")

    # Build one row per qualifying product; only the name and code columns
    # are filled, every other template column stays empty.
    valid_rows = []
    for index in sorted(selected):
        result_dict = selected[index]
        row_data = dict.fromkeys(columns)
        if "产品名称" in columns:
            row_data["产品名称"] = result_dict["产品名称"]
        if "产品编码" in columns:
            row_data["产品编码"] = result_dict["产品编码"]
        valid_rows.append(row_data)

    # 4. Write the Excel file.
    try:
        if not valid_rows:
            print("[!] 警告:没有筛选出符合条件的数据,生成的 Excel 将为空。")
        df_output = pd.DataFrame(valid_rows, columns=columns)
        print(f"[*] 正在保存为 Excel 文件 '{OUTPUT_FILE}'...")
        df_output.to_excel(OUTPUT_FILE, index=False)
        print(f"[+] 成功!结果已写入 '{OUTPUT_FILE}'")
        print("[+] 提示:请务必检查 '调试模式' (DEBUG_MODE) 是否已根据需要关闭。")
    except Exception as e:
        print(f"[-] 写入 Excel 失败: {e}")


if __name__ == "__main__":
    main()