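"""Crawl sales-order data from the CRM web UI at http://111.198.24.44:88/.

Flow (the numbered sections below): log in, fetch a filtered SalesOrder list,
then for each record pull its product-table JSON and one XPath region of its
detail-page HTML, and save everything under ./crm_data/. Fill in real
credentials in login_payload before running.
"""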
import requests
import json
import time
import os
from lxml import html
import re
# ================= 1. Configuration =================
base_url = "http://111.198.24.44:88/index.php"
# Login parameters
login_payload = {
    "module": "Users",
    "action": "Authenticate",
    "return_module": "Users",
    "return_action": "Login",
    "user_name": "TEST",  # fill in the real username here
    "user_password": "test",  # fill in the real password here
    "login_theme": "newskin"
}
# List query parameters
list_payload = {
    "module": "SalesOrder",
    "action": "SalesOrderAjax",
    "file": "ListViewData",
    "sorder": "",
    "start": "1",
    "pagesize": "100",  # how many records to fetch
    "actionId": "",  # filled in automatically later
    "isFilter": "true",
    "search[viewscope]": "all_to_me",
    "search[viewname]": "324126",
    # Filter conditions
    "filter[Fields0]": "subject",
    "filter[Condition0]": "cts",
    "filter[Srch_value0]": "W25A",
    "filter[type0]": "text",
    "filter[dateCondition1]": "prevfy",
    "filter[Fields1]": "duedate",
    "filter[Condition1]": "btwa",
    "filter[Srch_value1]": "2025-01-01,2025-12-31",
    "filter[type1]": "date",
    "filter[Fields2]": "subject",
    "filter[Condition2]": "dcts",
    "filter[Srch_value2]": "取消",  # literal search value (Chinese for "cancelled"); the server matches it verbatim
    "filter[type2]": "text",
    "filter[search_cnt]": "3",
    "filter[matchtype]": "all"
}
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Referer": "http://111.198.24.44:88/index.php?module=SalesOrder&action=index"
}
# ================= 2. Helper functions =================
def get_current_action_id():
    """Return the current time as a 13-digit millisecond timestamp."""
    return int(time.time() * 1000)
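# Worked example (the timestamp below is an assumption for illustration):
# at 2025-01-01 00:00:00 UTC, time.time() is 1735689600.0, so this function
# returns 1735689600000 -- the 13-digit millisecond actionId format the
# script fills into its request payloads.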
def clean_html_tags(text):
    """Strip HTML tags and keep only the text content."""
    if not text:
        return ""
    # Remove HTML tags
    clean_text = re.sub(r'<[^>]+>', ' ', text)
    # Replace HTML entities
    clean_text = clean_text.replace('&nbsp;', ' ')
    # Collapse runs of whitespace and newlines
    clean_text = re.sub(r'\s+', ' ', clean_text)
    # Trim leading/trailing whitespace
    clean_text = clean_text.strip()
    return clean_text
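# Worked example (hypothetical input, not actual CRM markup):
#   clean_html_tags('<td>Widget&nbsp;A</td>\n<td> 2 </td>') -> 'Widget A 2'
# Tags become spaces, '&nbsp;' becomes a space, and runs of whitespace
# collapse to single spaces before trimming.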
def extract_html_content(html_content, xpath):
    """Extract the content at the given XPath from an HTML document."""
    try:
        # Parse the HTML
        tree = html.fromstring(html_content)
        # Try to extract the content at the given XPath
        elements = tree.xpath(xpath)
        if elements:
            # Get the element's HTML
            element_html = html.tostring(elements[0], encoding='unicode', pretty_print=True)
            # Strip the HTML tags
            cleaned_text = clean_html_tags(element_html)
            # Keep both the raw HTML and the cleaned text
            return {
                "raw_html": element_html,
                "cleaned_text": cleaned_text
            }
        else:
            print(f"   ⚠️ XPath not found: {xpath}")
            return {
                "raw_html": "",
                "cleaned_text": ""
            }
    except Exception as e:
        print(f"   ❌ HTML parsing error: {e}")
        return {
            "raw_html": "",
            "cleaned_text": ""
        }
def fetch_html_detail(session, record_id, xpath):
    """Fetch the HTML detail page and extract the content at the given XPath."""
    try:
        # Build the detail page URL
        html_url = f"http://111.198.24.44:88/index.php?module=SalesOrder&action=DetailView&record={record_id}"
        # Fetch the HTML page
        html_response = session.get(html_url, headers=headers)
        if html_response.status_code == 200:
            # Extract the content at the given XPath
            extracted_content = extract_html_content(html_response.content, xpath)
            return extracted_content
        else:
            print(f"   ❌ HTML page request failed: HTTP {html_response.status_code}")
            return {
                "raw_html": "",
                "cleaned_text": ""
            }
    except Exception as e:
        print(f"   ❌ Failed to fetch HTML detail: {e}")
        return {
            "raw_html": "",
            "cleaned_text": ""
        }
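# Optional hardening (a minimal sketch, not part of the original flow): retry
# the HTML fetch when it comes back empty, since fetch_html_detail returns an
# empty result for both HTTP and parse failures. The attempt count and linear
# back-off below are assumptions, not values taken from the CRM.
def fetch_html_detail_with_retry(session, record_id, xpath, attempts=3, backoff=1.0):
    """Call fetch_html_detail up to `attempts` times, sleeping between tries."""
    result = {"raw_html": "", "cleaned_text": ""}
    for attempt in range(attempts):
        result = fetch_html_detail(session, record_id, xpath)
        if result.get("raw_html"):  # non-empty raw_html means the XPath matched
            break
        time.sleep(backoff * (attempt + 1))  # back off 1x, 2x, 3x ... between tries
    return result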
# ================= 3. Main program logic =================
def main():
    session = requests.Session()
    # The XPath region to extract from each detail page
    target_xpath = "/html/body/div[1]/div/div[2]/div[2]/form/div[1]/div[1]/div[2]"
    try:
        # --- Step 1: log in ---
        print("1. Logging in...")
        login_response = session.post(base_url, data=login_payload, headers=headers)
        # Check whether a session cookie was issued
        if 'PHPSESSID' not in session.cookies:
            print("⚠️ Warning: no PHPSESSID detected; login may have failed and later steps may error out.")
        else:
            print("   ✅ Login succeeded; cookie acquired.")
        # --- Step 2: fetch the order list ---
        print("\n2. Fetching the order list...")
        list_payload['actionId'] = get_current_action_id()
        list_resp = session.post(base_url, data=list_payload, headers=headers)
        try:
            list_data = list_resp.json()
        except json.JSONDecodeError:
            print("❌ Error: the list endpoint did not return JSON.")
            print("Response preview:", list_resp.text[:200])
            return
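        # Illustrative response shapes the parsing below handles (hypothetical
        # examples, not captured server output):
        #   A) a bare list:        [{"crmid": "123", ...}, ...]
        #   B) a keyed dict:       {"rows": [{"crmid": "123", ...}, ...]}
        #      or an indexed dict: {"0": {"crmid": "123", ...}, "1": {...}}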
        # === Parse the list data flexibly ===
        orders = []
        # Strategy A: the response is a bare list
        if isinstance(list_data, list):
            orders = list_data
        # Strategy B: the response is a dict
        elif isinstance(list_data, dict):
            # 1. Try the common key names for the list
            found_key = False
            possible_keys = ['entries', 'rows', 'data', 'records', 'list']
            for key in possible_keys:
                if key in list_data and isinstance(list_data[key], list):
                    orders = list_data[key]
                    print(f"   [system] Found the data list under key '{key}'.")
                    found_key = True
                    break
            # 2. No known key found: scan the dict for order-like objects
            if not found_key:
                print("   [system] No standard key found; trying to extract dict objects heuristically...")
                # Walk every value in the dict and keep the dicts that look like orders
                for val in list_data.values():
                    if isinstance(val, dict) and ('crmid' in val or 'salesorderid' in val or 'id' in val):
                        orders.append(val)
        if not orders:
            print("❌ Error: could not extract any order data.")
            # For debugging: dump the raw response so its structure can be inspected
            with open("debug_list_response.json", "w", encoding="utf-8") as f:
                json.dump(list_data, f, ensure_ascii=False, indent=4)
            return
        print(f"   ✅ Extracted {len(orders)} valid orders.")
        # --- Step 3: fetch details one by one ---
        print("\n3. Fetching order details one by one...")
        success_count = 0
        for index, order in enumerate(orders):
            # Defensive check: make sure order is a dict
            if not isinstance(order, dict):
                continue
            # 1. Get the ID (try several possible field names)
            record_id = order.get('crmid') or order.get('salesorderid') or order.get('id')
            if not record_id:
                print(f"   ⚠️ Record {index + 1} has no ID; skipping.")
                continue
            print(f"\n   [{index + 1}/{len(orders)}] Processing order ID: {record_id}")
            # 2. Fetch the JSON detail (product details)
            json_detail = None
            detail_payload = {
                "module": "Plugins",
                "pluginName": "DetailProductTable",
                "action": "getTableData",
                "moduleName": "SalesOrder",
                "record": record_id,
                "actionId": get_current_action_id(),
                "isTool": "1"
            }
            try:
                # Request the JSON detail
                detail_resp = session.post(base_url, data=detail_payload, headers=headers)
                json_detail = detail_resp.json()
                print("   ✅ JSON detail fetched")
            except Exception as e:
                print(f"   ❌ Failed to fetch JSON detail: {e}")
                json_detail = {"error": str(e)}
            # 3. Fetch the HTML detail and extract the target XPath content
            print("   Fetching HTML detail...")
            html_content = fetch_html_detail(session, record_id, target_xpath)
            # 4. Merge the details back into the original record
            order['json_details'] = json_detail
            order['html_details'] = html_content
            # 5. Build a combined field for easy inspection
            order['combined_data'] = {
                "crmid": record_id,
                "json_data": json_detail,
                "html_extracted_text": html_content.get("cleaned_text", ""),
                "html_raw": html_content.get("raw_html", "")
            }
            success_count += 1
            print(f"   ✅ Order {record_id} processed")
            # Be polite: pause briefly to avoid hammering the server
            time.sleep(0.5)
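        # Each dict in `orders` now carries three added keys: 'json_details',
        # 'html_details', and 'combined_data', whose shape is (values illustrative):
        #   {"crmid": "123", "json_data": {...},
        #    "html_extracted_text": "...", "html_raw": "<div ...>...</div>"}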
        # --- Step 4: save the results ---
        print("\n4. Saving results...")
        # Create the output directory
        output_dir = "crm_data"
        os.makedirs(output_dir, exist_ok=True)
        # Save the complete merged data
        full_filename = os.path.join(output_dir, "all_orders_combined.json")
        with open(full_filename, 'w', encoding='utf-8') as f:
            json.dump(orders, f, ensure_ascii=False, indent=4)
        # Also save each record to its own file, keyed by CRM ID
        print("   Saving one file per CRM ID...")
        for order in orders:
            record_id = order.get('crmid') or order.get('salesorderid') or order.get('id')
            if record_id:
                # Save this crmid's data on its own
                single_filename = os.path.join(output_dir, f"crm_{record_id}.json")
                with open(single_filename, 'w', encoding='utf-8') as f:
                    json.dump(order, f, ensure_ascii=False, indent=4)
        # Save the extracted text to a plain-text file for easy reading
        text_filename = os.path.join(output_dir, "extracted_texts.txt")
        with open(text_filename, 'w', encoding='utf-8') as f:
            f.write("=== Extracted HTML text ===\n\n")
            for order in orders:
                record_id = order.get('crmid') or order.get('salesorderid') or order.get('id')
                if record_id:
                    extracted_text = order.get('html_details', {}).get('cleaned_text', '')
                    if extracted_text:
                        f.write(f"\n--- CRM ID: {record_id} ---\n")
                        f.write(f"{extracted_text}\n")
                        f.write("-" * 50 + "\n")
        print("\n✅ All done!")
        print(f"   Processed successfully: {success_count}/{len(orders)} orders")
        print(f"   Output directory: {os.path.abspath(output_dir)}")
        print("   Main files:")
        print(f"   - {full_filename}")
        print(f"   - {text_filename}")
        print(f"   - {success_count} JSON files stored separately by CRM ID")
    except Exception as e:
        print(f"\n❌ Uncaught error in the program: {e}")
        import traceback
        traceback.print_exc()
if __name__ == "__main__":
    main()