Files
Contract-document-crawling-…/爬取全量和搜索合并.py
2026-01-18 11:31:40 +08:00

465 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import time
import os
from lxml import html
import re
import urllib.parse
import pandas as pd
import math
from collections import defaultdict, Counter
# ================= 1. Configuration =================
# Base URL of the vtiger-style CRM instance.
base_url = "http://111.198.24.44:88/index.php"
# Login form parameters posted to the Users/Authenticate action.
login_payload = {
    "module": "Users",
    "action": "Authenticate",
    "return_module": "Users",
    "return_action": "Login",
    "user_name": "TEST",  # ★★★ fill in the real user name
    "user_password": "***",  # ★★★ fill in the real password
    "login_theme": "newskin"
}
# Request headers; the XHR marker makes the server answer Ajax-style.
http_headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Referer": "http://111.198.24.44:88/index.php?module=SalesOrder&action=index",
    "X-Requested-With": "XMLHttpRequest",  # key: identifies the request as Ajax
    "Accept": "application/json, text/javascript, */*; q=0.01"
}
# XPath of the main content container on the order detail page.
target_xpath = "/html/body/div[1]/div/div[2]/div[2]/form/div[1]/div[1]/div[2]"
# ================= 2. 核心辅助函数 =================
def get_current_action_id():
    """Return the current time as a millisecond Unix timestamp.

    Used as the ``actionId`` request parameter expected by the CRM Ajax API.
    """
    millis = time.time() * 1000
    return int(millis)
def clean_text_structure(element):
    """Deep-clean an lxml element and return its visible text.

    Works on a deep copy so the caller's tree is untouched: drops
    script/style/noscript subtrees, turns each <br> into an explicit
    newline, then strips non-breaking spaces and blank lines.
    Returns "" when `element` is None.
    """
    if element is None:
        return ""
    import copy
    node = copy.deepcopy(element)
    # Tags whose content must never leak into the extracted text.
    for junk in node.xpath('.//script | .//style | .//noscript'):
        junk.drop_tree()
    # Preserve line breaks: prepend a newline to every <br>'s tail text.
    for br in node.xpath('.//br'):
        br.tail = "\n" + (br.tail or "")
    raw = node.text_content()
    cleaned = []
    for line in raw.splitlines():
        stripped = line.replace('\xa0', ' ').strip()
        if stripped:
            cleaned.append(stripped)
    return "\n".join(cleaned)
def extract_html_content(html_content, xpath):
    """Parse an HTML document and extract cleaned text from one node.

    Returns the cleaned text of the first element matching `xpath`,
    or "" when nothing matches or parsing fails for any reason.
    """
    try:
        tree = html.fromstring(html_content)
        matches = tree.xpath(xpath)
        return clean_text_structure(matches[0]) if matches else ""
    except Exception:
        # Malformed markup or a bad XPath simply yields no text.
        return ""
def fetch_html_detail(session, record_id):
    """Download a SalesOrder detail page and return its cleaned text.

    Fetches the DetailView page for `record_id` with the shared session,
    then extracts the configured container via `extract_html_content`.
    Returns "" on any HTTP error, timeout, or non-200 status.
    """
    url = (
        "http://111.198.24.44:88/index.php"
        f"?module=SalesOrder&action=DetailView&record={record_id}"
    )
    try:
        resp = session.get(url, headers=http_headers, timeout=10)
        if resp.status_code != 200:
            return ""
        return extract_html_content(resp.content, target_xpath)
    except Exception as e:
        print(f" ❌ 获取详情失败 ID {record_id}: {e}")
        return ""
# ================= 3. ID 获取逻辑 (搜索 vs 全量) =================
def perform_search(session, query_string):
    """Mode 1: unified search.

    Runs the CRM's UnifiedSearch for `query_string` (already URL-encoded)
    and scrapes SalesOrder record ids out of the result links' onclick
    handlers. Returns a de-duplicated list of ids, or [] on any failure.
    """
    search_url = (
        "http://111.198.24.44:88/index.php?module=Home&action=UnifiedSearch"
        f"&selectedmodule=undefined&query_string={query_string}"
    )
    try:
        resp = session.get(search_url, headers=http_headers)
        if resp.status_code != 200:
            return []
        tree = html.fromstring(resp.content)
        # Prefer links inside the SalesOrder result panel; fall back to any
        # SalesOrder link anywhere on the page.
        links = tree.xpath('//div[@id="collapse-SalesOrder"]//a[contains(@onclick, "record=")]')
        if not links:
            links = tree.xpath('//a[contains(@onclick, "module=SalesOrder") and contains(@onclick, "record=")]')
        found = []
        for anchor in links:
            m = re.search(r"record=(\d+)", anchor.get('onclick', ''))
            if m and m.group(1) not in found:
                found.append(m.group(1))
        return found
    except Exception:
        return []
def perform_full_crawl(session):
    """Mode 2: crawl every SalesOrder id by paging through the Ajax list view.

    The server ignores out-of-range page numbers and keeps returning the
    last page, so the loop also stops when a page repeats the previous
    page's ids verbatim (the "anti-infinite-loop" check).

    Args:
        session: an authenticated requests.Session.
    Returns:
        List of crm ids, de-duplicated while preserving first-seen order.
    """
    all_crmids = []
    page_size = 100
    page = 1
    # Ids of the previously fetched page, kept to detect the server's
    # repeated-last-page behaviour described above.
    last_page_ids = []
    print(" 📡 开始全量爬取 (忽略 recTotal检测到页面内容重复时停止)...")

    def build_url(page_num):
        # Build the query string by hand so the parameter order matches the
        # browser exactly, including the crucial viewname=476 filter.
        action_id = get_current_action_id()
        query_str = (
            f"module=SalesOrder&"
            f"action=SalesOrderAjax&"
            f"file=ListViewData&"
            f"sorder=&"
            f"start={page_num}&"
            f"order_by=&"
            f"pagesize={page_size}&"
            f"actionId={action_id}&"
            f"isFilter=true&"
            f"search%5Bviewscope%5D=all_to_me&"
            f"search%5Bviewname%5D=476"
        )
        return f"{base_url}?{query_str}"

    while True:
        current_url = build_url(page)
        try:
            resp = session.get(current_url, headers=http_headers)
            # 1. Parse the JSON payload. Catch ValueError rather than
            # json.JSONDecodeError: depending on the requests version a
            # non-JSON body raises json.JSONDecodeError or a simplejson
            # error, and both subclass ValueError.
            try:
                data = resp.json()
            except ValueError:
                print(f" ❌ 第 {page} 页解析失败:服务器未返回 JSON (可能是Session失效)")
                break
            # 2. The record list may live under 'data' or 'entries'.
            entries = data.get('data', [])
            if not entries and 'entries' in data:
                entries = data['entries']
            # 3. An empty page means we are past the end of the data.
            if not entries:
                print(f" 🏁 第 {page} 页为空 (数据抓取结束)。")
                break
            # 4. Extract this page's ids ('crmid' preferred over 'id').
            current_page_ids = []
            if isinstance(entries, list):
                for item in entries:
                    if isinstance(item, dict):
                        if 'crmid' in item:
                            current_page_ids.append(item['crmid'])
                        elif 'id' in item:
                            current_page_ids.append(item['id'])
            elif isinstance(entries, dict):
                current_page_ids = list(entries.keys())
            count = len(current_page_ids)
            # 5. Core anti-loop check: stop when the server repeats the
            # previous page verbatim (never triggers on page 1).
            if page > 1 and current_page_ids == last_page_ids:
                print(f" 🛑 第 {page} 页数据与第 {page - 1} 页完全一致,判定为最后一页重复,爬取结束!")
                break
            last_page_ids = current_page_ids
            # 6. Accumulate and advance to the next page.
            all_crmids.extend(current_page_ids)
            print(f" ✅ 第 {page} 页获取成功 (本页 {count} 条)")
            page += 1
            time.sleep(0.5)  # be polite to the server
        except Exception as e:
            print(f" ❌ 请求第 {page} 页发生异常: {e}")
            break
    # De-duplicate while keeping first-seen order; list(set(...)) would
    # shuffle the ids and make successive runs non-deterministic.
    all_crmids = list(dict.fromkeys(all_crmids))
    print(f" 🎉 ID列表获取完毕去重后共: {len(all_crmids)}")
    return all_crmids
# ================= 4. 文本解析逻辑 =================
def parse_order_text(text):
    """Parse the cleaned detail-page text into a flat record dict.

    The page text is a sequence of "label line, value line" pairs plus a
    few free-form buyer lines that are matched by regex over the whole
    text. Returns {} for empty input.

    Args:
        text: newline-separated text produced by `extract_html_content`.
    Returns:
        Dict with all expected output columns (missing values stay ""),
        plus the helper keys IS_ASD (bool) and _temp_second_code.
    """
    if not text:
        return {}
    data = {
        "合同编号": "", "内贸合同号": "", "外贸合同号": "",
        "签署公司": "", "收款情况": "", "签订日期": "", "销售员": "",
        "最终用户单位": "", "最终用户信息联系人": "", "最终用户信息电话": "", "最终用户信息邮箱": "",
        "最终用户所在地": "",
        "买方单位": "", "买方信息联系人": "", "买方信息电话": "", "买方信息邮箱": "",
        "厂家": "", "厂家型号": "", "合同标的": "", "数量": "", "单位": "台/套",
        "折扣率(%)": "", "合同额": "", "合同总额": "",
        "外购付款方式": "", "最晚发货期": "", "已收款": "", "未收款": "", "收款日期": "",
        "IS_ASD": False,
        "_temp_second_code": ""
    }
    lines = [line.strip() for line in text.split('\n') if line.strip()]
    # Page label -> output field name.
    key_map = {
        "收款账户": "签署公司", "收款状态": "收款情况", "签约日期": "签订日期",
        "负责人": "销售员", "客户名称": "最终用户单位", "联系人姓名": "最终用户信息联系人",
        "合同总额": "合同总额", "最新收款日期": "收款日期", "最晚发货期": "最晚发货期",
        "付款比例及期限": "外购付款方式", "地址": "最终用户所在地", "厂家": "厂家"
    }
    for i, line in enumerate(lines):
        if line == "合同订单编号" and i + 1 < len(lines):
            # Value line holds "main-number [secondary-number]".
            parts = lines[i + 1].strip().split()
            if len(parts) >= 1:
                data["合同编号"] = parts[0]
            if len(parts) >= 2:
                data["_temp_second_code"] = parts[1]
        elif line in key_map and i + 1 < len(lines):
            target = key_map[line]
            if not data[target]:  # first occurrence wins
                data[target] = lines[i + 1]
        elif "合同标的" in line and "品名/型号" in line and i + 1 < len(lines):
            # Value line is "subject/model/qty/.../amount", '/'-separated.
            parts = lines[i + 1].split('/')
            if len(parts) >= 1:
                data["合同标的"] = parts[0]
            if len(parts) >= 2:
                data["厂家型号"] = parts[1]
            if len(parts) >= 3:
                data["数量"] = parts[2]
            if len(parts) >= 5:
                data["合同额"] = parts[4]
    # Buyer details appear as free text. Accept both the ASCII ':' and the
    # full-width ':' commonly used in Chinese documents.
    buyer_match = re.search(r"(?:买方|The Buyer)[::]\s*(.*?)(?:\n|$)", text)
    if buyer_match and len(buyer_match.group(1)) > 1:
        data["买方单位"] = buyer_match.group(1).strip()
    buyer_ct = re.search(r"联系人Contact person[::]\s*(.*?)(?:\n|$)", text)
    if buyer_ct:
        data["买方信息联系人"] = buyer_ct.group(1).strip()
    buyer_tel = re.search(r"电话\(Tel\)[::]\s*(.*?)(?:\s+|$|传真)", text)
    if buyer_tel:
        data["买方信息电话"] = buyer_tel.group(1).strip()
    # Derive paid/unpaid amounts from the total and the payment status.
    try:
        total = float(data["合同总额"]) if data["合同总额"] else 0
        if "已收" in data["收款情况"]:
            data["已收款"] = str(total)
            data["未收款"] = "0"
        elif data["收款情况"]:
            # Bug fix: the old test was `"" in status`, which is always true
            # and marked even status-less records as unpaid. Now only records
            # with an explicit non-"已收" status are treated as unpaid.
            data["已收款"] = "0"
            data["未收款"] = str(total)
    except (ValueError, TypeError):
        # A non-numeric total simply leaves the amount fields blank.
        pass
    # Flag ASD-manufactured orders (case-insensitive substring match).
    factory_val = data.get("厂家", "")
    data["IS_ASD"] = bool(factory_val and "ASD" in factory_val.upper())
    return data
# ================= 5. 逻辑冲突检查函数 =================
def check_and_print_conflicts(all_records):
    """Print a data-anomaly report for the parsed records.

    Two checks are performed:
    1. duplicate contract numbers (primary-key conflicts);
    2. domestic/foreign contract numbers shared by several different
       contracts (logical conflicts).
    Output goes to stdout only; nothing is returned.
    """
    print("\n" + "=" * 25 + " 数据异常检测报告 " + "=" * 25)
    # --- Check 1: contract-number uniqueness ---
    counts = Counter(
        rec.get("合同编号", "").strip() for rec in all_records if rec.get("合同编号")
    )
    duplicated = {num: n for num, n in counts.items() if n > 1}
    print(f"\n📋 [检查1] 合同编号唯一性检查:")
    if duplicated:
        print(f" ❌ 警告:发现 {len(duplicated)} 个重复的合同编号 (可能存在完全重复的记录):")
        for num, n in duplicated.items():
            print(f" 🔸 {num} (出现了 {n} 次)")
    else:
        print(" ✅ 通过:没有发现完全重复的合同编号。")

    # --- Check 2: one secondary number shared by several contracts ---
    def report_shared(field_name):
        # Map each secondary number to the set of contracts that carry it.
        owners = defaultdict(set)
        for rec in all_records:
            value = rec.get(field_name, "").strip()
            contract = rec.get("合同编号", "").strip()
            if value and contract:
                owners[value].add(contract)
        shared = {v: ids for v, ids in owners.items() if len(ids) > 1}
        print(f"\n📋 [检查2] {field_name} 冲突检查 (是否存在多个合同共用一个号):")
        if shared:
            print(f" ❌ 严重警告:发现 {len(shared)} 个冲突!以下号码被多个合同共用:")
            for value, ids in shared.items():
                print(f" 🔴 号码 [{value}] 同时出现在以下合同中: {list(ids)}")
        else:
            print(f" ✅ 通过:每个{field_name}都只对应唯一的合同编号。")

    report_shared("内贸合同号")
    report_shared("外贸合同号")
    print("\n" + "=" * 66 + "\n")
# ================= 6. 导出 Excel =================
def export_excel_files(all_records, output_dir, file_prefix):
    """Write the parsed records to per-category Excel workbooks.

    Records are split into an ASD and a non-ASD workbook; inside each
    workbook contracts whose number starts with 'N' go to the 内贸
    (domestic) sheet, 'W' to 外贸 (foreign), everything else to 其他.

    Args:
        all_records: list of dicts produced by `parse_order_text`.
        output_dir: existing directory the .xlsx files are written into.
        file_prefix: tag appended to each workbook's file name.
    """
    cols_common = [
        "合同编号", "签署公司", "收款情况", "签订日期", "销售员", "厂家",
        "最终用户单位", "最终用户信息联系人", "最终用户信息电话", "最终用户信息邮箱", "最终用户所在地",
        "买方单位", "买方信息联系人", "买方信息电话", "买方信息邮箱",
        "厂家型号", "合同标的", "数量", "单位", "折扣率(%)", "合同额", "合同总额",
        "外购付款方式", "最晚发货期", "已收款", "未收款", "收款日期"
    ]
    cols_domestic = cols_common[:2] + ["内贸合同号"] + cols_common[2:]
    cols_foreign = cols_common[:2] + ["外贸合同号"] + cols_common[2:]
    datasets = {
        "ASD": {"Domestic": [], "Foreign": [], "Other": []},
        "Non_ASD": {"Domestic": [], "Foreign": [], "Other": []}
    }
    for record in all_records:
        # .get() keeps records without an IS_ASD flag from raising KeyError
        # (they are treated as non-ASD).
        main_key = "ASD" if record.get("IS_ASD") else "Non_ASD"
        c_no = record.get("合同编号", "").strip().upper()
        if c_no.startswith('N'):
            datasets[main_key]["Domestic"].append(record)
        elif c_no.startswith('W'):
            datasets[main_key]["Foreign"].append(record)
        else:
            datasets[main_key]["Other"].append(record)
    for type_name in ["ASD", "Non_ASD"]:
        filename = f"{type_name}_产品表_{file_prefix}.xlsx"
        filepath = os.path.join(output_dir, filename)
        subset = datasets[type_name]
        df_dom = pd.DataFrame(subset["Domestic"])
        df_for = pd.DataFrame(subset["Foreign"])
        df_oth = pd.DataFrame(subset["Other"])
        # Sort each sheet by contract number for stable, readable output.
        if not df_dom.empty and "合同编号" in df_dom.columns:
            df_dom.sort_values(by="合同编号", ascending=True, inplace=True)
        if not df_for.empty and "合同编号" in df_for.columns:
            df_for.sort_values(by="合同编号", ascending=True, inplace=True)
        has_data = False
        try:
            with pd.ExcelWriter(filepath, engine='openpyxl') as writer:
                if not df_dom.empty:
                    df_dom.reindex(columns=cols_domestic).to_excel(writer, sheet_name='内贸', index=False)
                    has_data = True
                if not df_for.empty:
                    df_for.reindex(columns=cols_foreign).to_excel(writer, sheet_name='外贸', index=False)
                    has_data = True
                if not df_oth.empty:
                    # The "other" sheet reuses the domestic column layout.
                    df_oth.reindex(columns=cols_domestic).to_excel(writer, sheet_name='其他', index=False)
                    has_data = True
            if has_data:
                # Bug fix: report the real path (the old message printed the
                # literal placeholder "(unknown)").
                print(f" 💾 已生成: {filepath}")
        except Exception as e:
            print(f" ❌ 写入 {filename} 失败: {e}")
# ================= 7. 主程序 =================
def main():
    """Interactive entry point.

    Logs in to the CRM, lets the user pick search or full-crawl mode,
    fetches and parses every order detail page, runs the conflict checks,
    and exports the results to a timestamped Excel directory.
    """
    session = requests.Session()
    print("================ CRM 爬取助手 (智能防循环版) ================")
    print("1. 正在尝试自动登录 CRM...")
    # Visit the home page first so the server issues its base cookies.
    session.get(base_url, headers=http_headers)
    session.post(base_url, data=login_payload, headers=http_headers)
    # A PHPSESSID cookie is the only observable login-success signal.
    if 'PHPSESSID' not in session.cookies:
        print(" ❌ 登录失败: 未检测到 Cookie请检查账号密码。")
        return
    print(" ✅ 登录成功")
    print("\n请选择运行模式:")
    print(" [1] 搜索模式 (输入关键词)")
    print(" [2] 全量爬取 (自动翻页爬取所有)")
    mode = input("请输入数字 (1/2): ").strip()
    crmids = []
    file_tag = ""
    if mode == '1':
        query = input("\n请输入搜索关键词: ").strip()
        if not query: return
        print(f" 🔍 正在搜索: {query}")
        # Keyword must be URL-encoded before being put in the query string.
        crmids = perform_search(session, urllib.parse.quote(query))
        file_tag = f"搜索_{query}"
    elif mode == '2':
        print("\n 🚀 开始全量爬取流程...")
        crmids = perform_full_crawl(session)
        file_tag = "全量爬取"
    else:
        print(" ❌ 输入无效")
        return
    if not crmids:
        print(" ❌ 未获取到 CRM ID")
        return
    print(f"\n3. 开始获取 {len(crmids)} 条数据详情...")
    all_parsed_data = []
    success_count = 0
    for i, cid in enumerate(crmids):
        # Progress line every 10 records.
        if i % 10 == 0:
            print(f" ⏳ 进度: {i}/{len(crmids)} ...")
        text = fetch_html_detail(session, cid)
        data = parse_order_text(text)
        contract_no = data.get("合同编号", "").strip().upper()
        # Records without a contract number are unusable; skip them.
        if not contract_no:
            continue
        data["系统ID"] = cid
        # Assign the secondary number to the domestic/foreign field based on
        # the contract-number prefix ('W' = foreign, otherwise domestic).
        second_code = data.pop("_temp_second_code", "")
        if contract_no.startswith('W'):
            data["外贸合同号"] = second_code
        elif contract_no.startswith('N'):
            data["内贸合同号"] = second_code
        else:
            data["内贸合同号"] = second_code
        all_parsed_data.append(data)
        success_count += 1
        time.sleep(0.2)  # polite delay between detail requests
    print(f"\n ✅ 详情抓取完成。有效记录: {success_count}")
    # 4. Logical conflict checks (duplicate / shared contract numbers).
    check_and_print_conflicts(all_parsed_data)
    # 5. Export to a timestamped result directory.
    print("5. 正在导出 Excel...")
    ts = time.strftime("%Y%m%d_%H%M%S")
    out_dir = f"Result_{ts}"
    os.makedirs(out_dir, exist_ok=True)
    export_excel_files(all_parsed_data, out_dir, f"{file_tag}_{ts}")
    print(f"\n🎉 全部完成!结果保存在: {os.path.abspath(out_dir)}")
# Run the interactive crawler only when executed as a script.
if __name__ == "__main__":
    main()