import requests
import json
import time
import os
import copy
from lxml import html
import re
import urllib.parse
import pandas as pd
from collections import defaultdict, Counter
from datetime import datetime

# ================= 1. Configuration (unchanged) =================
base_url = "http://111.198.24.44:88/index.php"
login_payload = {
    "module": "Users",
    "action": "Authenticate",
    "return_module": "Users",
    "return_action": "Login",
    "user_name": "TEST",      # ★★★ fill in the real username
    "user_password": "test",  # ★★★ fill in the real password
    "login_theme": "newskin"
}
http_headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Referer": "http://111.198.24.44:88/index.php?module=SalesOrder&action=index",
    "X-Requested-With": "XMLHttpRequest",
    "Accept": "application/json, text/javascript, */*; q=0.01"
}
target_xpath = "/html/body/div[1]/div/div[2]/div[2]/form/div[1]/div[1]/div[2]"

# ================= 2. Core helpers =================
def get_current_action_id():
    return int(time.time() * 1000)


def clean_text_structure(element):
    """Deep-clean an element: drop script/style tags, turn <br> into newlines, normalize whitespace."""
    if element is None:
        return ""
    el = copy.deepcopy(element)
    for bad_tag in el.xpath('.//script | .//style | .//noscript'):
        bad_tag.drop_tree()
    for br in el.xpath('.//br'):
        br.tail = "\n" + (br.tail if br.tail else "")
    text_content = el.text_content()
    lines = [line.replace('\xa0', ' ').strip()
             for line in text_content.splitlines()
             if line.replace('\xa0', ' ').strip()]
    return "\n".join(lines)
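
# A quick illustration of the cleaning above, on a hypothetical fragment (not
# taken from the real CRM page): scripts are dropped, <br> becomes a newline,
# and non-breaking spaces are normalized to plain spaces.
#   clean_text_structure(html.fromstring("<div>A<br/>B&nbsp;C<script>x()</script></div>"))
#   -> "A\nB C"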

def extract_html_content(html_content, xpath):
    try:
        tree = html.fromstring(html_content)
        elements = tree.xpath(xpath)
        if elements:
            target_element = elements[0]
            cleaned_text = clean_text_structure(target_element)
            return cleaned_text
        return ""
    except Exception:
        return ""

def fetch_html_detail(session, record_id):
    """Fetch a record's HTML detail page and return the cleaned text of the target block."""
    try:
        url = f"http://111.198.24.44:88/index.php?module=SalesOrder&action=DetailView&record={record_id}"
        resp = session.get(url, headers=http_headers, timeout=10)
        if resp.status_code == 200:
            return extract_html_content(resp.content, target_xpath)
        return ""
    except Exception as e:
        print(f" ❌ Failed to fetch detail for ID {record_id}: {e}")
        return ""

# ================= 3. Helper: extract a timestamp from detail-page text =================
def extract_time_from_text(text):
    """
    Look for timestamps like 2026-01-15 17:19:16 in the detail page's plain text.
    Strategy: find every timestamp in that format and take the latest one
    (usually the modification time).
    """
    if not text:
        return None
    # Match YYYY-MM-DD HH:MM:SS
    matches = re.findall(r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})", text)
    if not matches:
        # No time-of-day present; try bare YYYY-MM-DD instead
        matches = re.findall(r"(\d{4}-\d{2}-\d{2})", text)
        if matches:
            # Pad to midnight of that day so parsing succeeds, at the cost of precision
            return datetime.strptime(matches[0], "%Y-%m-%d")
        return None
    # Parse every match into a datetime object
    dt_objects = []
    for m in matches:
        try:
            dt_objects.append(datetime.strptime(m, "%Y-%m-%d %H:%M:%S"))
        except ValueError:
            pass
    if dt_objects:
        # Assume the newest timestamp on the page is the modified/created time
        # and use it as the reference.
        return max(dt_objects)
    return None
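
# Sanity check of the strategy above, using hypothetical detail-page text that
# carries both a creation and a modification timestamp; the latest one wins:
#   extract_time_from_text("创建时间 2026-01-10 09:00:00 修改时间 2026-01-15 17:19:16")
#   -> datetime(2026, 1, 15, 17, 19, 16)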

# ================= 4. Core logic: range crawl (JSON list -> HTML detail -> timestamp check) =================
def perform_date_range_crawl(session, start_date_str, end_date_str):
    """
    Optimized for the case where the timestamp is hidden in the HTML detail page:
    1. Request the list (blind sort: ask the server for modifiedtime DESC)
    2. Visit every detail page and grab its text
    3. Extract the timestamp from that text
    4. Decide whether to stop
    """
    final_data_list = []  # store parsed rows here directly, avoiding repeat requests
    page_size = 50        # smaller pages: each one triggers deep crawling, so large pages risk timeouts or high memory use
    page = 1
    last_page_ids = []
    try:
        target_start = datetime.strptime(start_date_str, "%Y-%m-%d")
        # Extend the end of the range to 23:59:59 of that day
        target_end = datetime.strptime(end_date_str, "%Y-%m-%d").replace(hour=23, minute=59, second=59)
    except ValueError:
        print(" ❌ Invalid date format")
        return []
    print(f" 📅 Target range: {target_start} to {target_end}")
    print(" 📡 Running the [list -> detail -> timestamp check] strategy...")
    stop_flag = False
    while not stop_flag:
        action_id = get_current_action_id()
        # Still request server-side DESC order: the list view doesn't display it,
        # but we hope the server returns records in that order.
        current_url = (
            f"{base_url}?module=SalesOrder&action=SalesOrderAjax&file=ListViewData&"
            f"sorder=DESC&order_by=modifiedtime&"  # key: blind sort parameters
            f"start={page}&pagesize={page_size}&actionId={action_id}&isFilter=true&"
            f"search%5Bviewscope%5D=all_to_me&search%5Bviewname%5D=476"
        )
        try:
            resp = session.get(current_url, headers=http_headers)
            try:
                data = resp.json()
            except ValueError:
                print(f" ❌ Page {page}: JSON parsing failed")
                break
            entries = data.get('data', [])
            if not entries and 'entries' in data:
                entries = data['entries']
            if not entries:
                print(" 🏁 List data is empty, stopping.")
                break
            # Collect this page's IDs
            current_page_ids = []
            for item in entries:
                if isinstance(item, dict):
                    cid = item.get('crmid') or item.get('id')
                    if cid:
                        current_page_ids.append(cid)
            # Infinite-loop guard: identical IDs mean the server kept serving the last page
            if page > 1 and current_page_ids == last_page_ids:
                print(" 🛑 Page IDs repeated; treating this as the last page and stopping.")
                break
            last_page_ids = current_page_ids
            print(f" 🔎 Page {page}: preloaded {len(current_page_ids)} IDs; visiting each detail page to check its timestamp...")
            # === Visit each record's detail page ===
            page_valid_count = 0
            for cid in current_page_ids:
                # 1. Fetch the detail text
                text = fetch_html_detail(session, cid)
                # 2. Extract a timestamp from it
                record_time = extract_time_from_text(text)
                # 3. Timestamp decision logic
                if record_time:
                    time_str = record_time.strftime("%Y-%m-%d %H:%M:%S")
                    if record_time > target_end:
                        # Too new; skip and move on to the next record
                        # print(f" ⏭ ID {cid} time {time_str} > target range (too new)")
                        continue
                    elif record_time < target_start:
                        # Too old! Because of the DESC order, everything after this is older still.
                        print(f" 🛑 ID {cid} time {time_str} predates the start date; circuit breaker tripped, stopping!")
                        stop_flag = True
                        break  # leave the for loop
                    else:
                        # Hit!
                        print(f" ✅ Hit: ID {cid} at {time_str}")
                        # Parse the record now so it doesn't have to be crawled again later
                        parsed = parse_order_text(text)
                        parsed["系统ID"] = cid
                        # Route the secondary code to the domestic/foreign contract-number column
                        c_no = parsed.get("合同编号", "").strip().upper()
                        sc = parsed.pop("_temp_second_code", "")
                        if c_no.startswith('W'):
                            parsed["外贸合同号"] = sc
                        else:
                            parsed["内贸合同号"] = sc
                        final_data_list.append(parsed)
                        page_valid_count += 1
                else:
                    # No timestamp found on the detail page (wrong format, or an empty page).
                    # Conservative option: keep the record while the stop hasn't tripped
                    # (or you may prefer to skip it). Here we skip and log a warning.
                    # print(f" ⚠️ ID {cid}: no timestamp found, skipped")
                    pass
            print(f" 📊 Page {page} done. Valid records stored: {page_valid_count}")
            page += 1
            if stop_flag:
                break
            # time.sleep(0.5)  # fetch_html_detail already adds latency, so no long extra sleep is needed
        except Exception as e:
            print(f" ❌ Error: {e}")
            break
    return final_data_list
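
# Illustration of the early-stop assumption above, with hypothetical record
# times arriving in the server's DESC order, for target range 2026-01-01..2026-01-15:
#   2026-01-16  -> too new, skipped
#   2026-01-14  -> hit, parsed and kept
#   2026-01-10  -> hit, parsed and kept
#   2025-12-28  -> older than the start date: circuit breaker trips, because in
#                  a DESC stream every later record is guaranteed to be older.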

# ================= 5. Remaining functions (unchanged) =================
# Note: parse_order_text, check_and_print_conflicts, and export_excel_files need
# no changes; they are included below so the file runs on its own.

def parse_order_text(text):
    """(Keeps the original parsing logic.)"""
    if not text:
        return {}
    data = {
        "合同编号": "", "内贸合同号": "", "外贸合同号": "",
        "签署公司": "", "收款情况": "", "签订日期": "", "销售员": "",
        "最终用户单位": "", "最终用户信息联系人": "", "最终用户信息电话": "", "最终用户信息邮箱": "",
        "最终用户所在地": "",
        "买方单位": "", "买方信息联系人": "", "买方信息电话": "", "买方信息邮箱": "",
        "厂家": "", "厂家型号": "", "合同标的": "", "数量": "", "单位": "台/套",
        "折扣率(%)": "", "合同额": "", "合同总额": "",
        "外购付款方式": "", "最晚发货期": "", "已收款": "", "未收款": "", "收款日期": "",
        "IS_ASD": False, "_temp_second_code": ""
    }
    lines = [line.strip() for line in text.split('\n') if line.strip()]
    # Map page labels to output column names
    key_map = {
        "收款账户": "签署公司", "收款状态": "收款情况", "签约日期": "签订日期",
        "负责人": "销售员", "客户名称": "最终用户单位", "联系人姓名": "最终用户信息联系人",
        "合同总额": "合同总额", "最新收款日期": "收款日期", "最晚发货期": "最晚发货期",
        "付款比例及期限": "外购付款方式", "地址": "最终用户所在地", "厂家": "厂家"
    }
    for i, line in enumerate(lines):
        if line == "合同订单编号" and i + 1 < len(lines):
            parts = lines[i + 1].strip().split()
            if len(parts) >= 1:
                data["合同编号"] = parts[0]
            if len(parts) >= 2:
                data["_temp_second_code"] = parts[1]
        elif line in key_map and i + 1 < len(lines):
            target = key_map[line]
            if not data[target]:
                data[target] = lines[i + 1]
        elif "合同标的" in line and "品名/型号" in line and i + 1 < len(lines):
            parts = lines[i + 1].split('/')
            if len(parts) >= 1:
                data["合同标的"] = parts[0]
            if len(parts) >= 2:
                data["厂家型号"] = parts[1]
            if len(parts) >= 3:
                data["数量"] = parts[2]
            if len(parts) >= 5:
                data["合同额"] = parts[4]
    # Accept both the half-width ":" and full-width ":" after these labels
    buyer_match = re.search(r"(?:买方|The Buyer)[::]\s*(.*?)(?:\n|$)", text)
    if buyer_match and len(buyer_match.group(1)) > 1:
        data["买方单位"] = buyer_match.group(1).strip()
    buyer_ct = re.search(r"联系人Contact person[::]\s*(.*?)(?:\n|$)", text)
    if buyer_ct:
        data["买方信息联系人"] = buyer_ct.group(1).strip()
    buyer_tel = re.search(r"电话\(Tel\)[::]\s*(.*?)(?:\s+|$|传真)", text)
    if buyer_tel:
        data["买方信息电话"] = buyer_tel.group(1).strip()
    try:
        total = float(data["合同总额"]) if data["合同总额"] else 0
        if "已收" in data["收款情况"]:    # payment received in full
            data["已收款"] = str(total)
            data["未收款"] = "0"
        elif "未收" in data["收款情况"]:  # nothing received yet
            data["已收款"] = "0"
            data["未收款"] = str(total)
    except ValueError:
        pass
    factory_val = data.get("厂家", "")
    if factory_val and "ASD" in factory_val.upper():
        data["IS_ASD"] = True
    else:
        data["IS_ASD"] = False
    return data
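
# A minimal sketch of the label/value layout parse_order_text expects: each label
# sits on its own line with its value on the next line (hypothetical fragment):
#   合同订单编号
#   N2026001 SC-2026-001
#   负责人
#   张三
# -> 合同编号 = "N2026001", _temp_second_code = "SC-2026-001", 销售员 = "张三"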

def check_and_print_conflicts(all_records):
    # (unchanged, body omitted)
    pass

def export_excel_files(all_records, output_dir, file_prefix):
    # (unchanged apart from formatting)
    cols_common = [
        "合同编号", "签署公司", "收款情况", "签订日期", "销售员", "厂家",
        "最终用户单位", "最终用户信息联系人", "最终用户信息电话", "最终用户信息邮箱", "最终用户所在地",
        "买方单位", "买方信息联系人", "买方信息电话", "买方信息邮箱",
        "厂家型号", "合同标的", "数量", "单位", "折扣率(%)", "合同额", "合同总额",
        "外购付款方式", "最晚发货期", "已收款", "未收款", "收款日期"
    ]
    cols_domestic = cols_common[:2] + ["内贸合同号"] + cols_common[2:]
    cols_foreign = cols_common[:2] + ["外贸合同号"] + cols_common[2:]
    datasets = {"ASD": {"Domestic": [], "Foreign": [], "Other": []},
                "Non_ASD": {"Domestic": [], "Foreign": [], "Other": []}}
    for record in all_records:
        main_key = "ASD" if record["IS_ASD"] else "Non_ASD"
        c_no = record.get("合同编号", "").strip().upper()
        if c_no.startswith('N'):
            datasets[main_key]["Domestic"].append(record)
        elif c_no.startswith('W'):
            datasets[main_key]["Foreign"].append(record)
        else:
            datasets[main_key]["Other"].append(record)
    for type_name in ["ASD", "Non_ASD"]:
        filename = f"{type_name}_产品表_{file_prefix}.xlsx"
        filepath = os.path.join(output_dir, filename)
        subset = datasets[type_name]
        df_dom = pd.DataFrame(subset["Domestic"])
        df_for = pd.DataFrame(subset["Foreign"])
        df_oth = pd.DataFrame(subset["Other"])
        if not df_dom.empty and "合同编号" in df_dom.columns:
            df_dom.sort_values(by="合同编号", ascending=True, inplace=True)
        if not df_for.empty and "合同编号" in df_for.columns:
            df_for.sort_values(by="合同编号", ascending=True, inplace=True)
        if df_dom.empty and df_for.empty and df_oth.empty:
            continue  # skip: openpyxl cannot save a workbook with zero sheets
        try:
            with pd.ExcelWriter(filepath, engine='openpyxl') as writer:
                if not df_dom.empty:
                    df_dom.reindex(columns=cols_domestic).to_excel(writer, sheet_name='内贸', index=False)
                if not df_for.empty:
                    df_for.reindex(columns=cols_foreign).to_excel(writer, sheet_name='外贸', index=False)
                if not df_oth.empty:
                    df_oth.reindex(columns=cols_domestic).to_excel(writer, sheet_name='其他', index=False)
            print(f" 💾 Wrote: {filename}")
        except Exception as e:
            print(f" ❌ Failed to write {filename}: {e}")
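
# Resulting layout, with a hypothetical prefix "范围_2025-12-01_2026-01-15_20260118_113140":
#   <output_dir>/ASD_产品表_范围_2025-12-01_2026-01-15_20260118_113140.xlsx
#   <output_dir>/Non_ASD_产品表_范围_2025-12-01_2026-01-15_20260118_113140.xlsx
# Sheets per file: 内贸 (contract numbers starting with 'N'), 外贸 ('W'), 其他 (everything else).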

# ================= 6. Main program =================
def main():
    session = requests.Session()
    print("================ CRM crawl helper (deep time-filter edition) ================")
    # Login flow (unchanged)
    print("1. Logging in automatically...")
    session.get(base_url, headers=http_headers)
    session.post(base_url, data=login_payload, headers=http_headers)
    if 'PHPSESSID' not in session.cookies:
        print(" ❌ Login failed")
        return
    print(" ✅ Login succeeded")
    print("\nChoose a run mode:")
    print(" [1] Search mode")
    print(" [2] Full crawl (slow)")
    print(" [3] Range crawl (recommended! visits detail pages to check timestamps)")
    mode = input("Enter (1/2/3): ").strip()
    final_data = []  # final results
    file_tag = ""
    if mode == '1':
        # Search mode (unchanged, lightly restructured to reuse the parser)
        query = input("\nEnter a search keyword: ").strip()
        if query:
            crmids = perform_search(session, urllib.parse.quote(query))  # perform_search must be defined
            # Search mode is simple: fetch each hit in a loop
            for cid in crmids:
                text = fetch_html_detail(session, cid)
                parsed = parse_order_text(text)
                if parsed.get("合同编号"):
                    parsed["系统ID"] = cid
                    c_no = parsed["合同编号"].upper()
                    sc = parsed.pop("_temp_second_code", "")
                    if c_no.startswith('W'):
                        parsed["外贸合同号"] = sc
                    else:
                        parsed["内贸合同号"] = sc
                    final_data.append(parsed)
            file_tag = f"搜索_{query}"
    elif mode == '2':
        # Full-crawl mode (unchanged)
        pass  # call the earlier perform_full_crawl and then loop over the details, or reuse the structure above
    elif mode == '3':
        print("\n 📅 Enter the date range (format: YYYY-MM-DD)")
        s_date = input(" Start date (e.g. 2025-12-01): ").strip()
        e_date = input(" End date (e.g. 2026-01-15): ").strip()
        if s_date and e_date:
            # ★ Call the new function directly; it returns the parsed rows
            final_data = perform_date_range_crawl(session, s_date, e_date)
            file_tag = f"范围_{s_date}_{e_date}"
    if not final_data:
        print(" ❌ No data fetched, or the run was aborted")
        return
    print(f"\n ✅ Crawl finished. Valid records: {len(final_data)}")
    # Export (unchanged)
    # check_and_print_conflicts(final_data)  # enable if you need conflict checking
    ts = time.strftime("%Y%m%d_%H%M%S")
    out_dir = f"Result_{ts}"
    os.makedirs(out_dir, exist_ok=True)
    export_excel_files(final_data, out_dir, f"{file_tag}_{ts}")
    print(f"\n🎉 Results saved to: {out_dir}")


# perform_search (and perform_full_crawl) must also be present in this file for
# modes 1 and 2 to run; for mode 3 alone, the code above is sufficient.

def perform_search(session, query_string):
    # (keeps the original search code)
    try:
        search_url = f"http://111.198.24.44:88/index.php?module=Home&action=UnifiedSearch&selectedmodule=undefined&query_string={query_string}"
        resp = session.get(search_url, headers=http_headers)
        if resp.status_code == 200:
            tree = html.fromstring(resp.content)
            crmids = []
            links = tree.xpath('//div[@id="collapse-SalesOrder"]//a[contains(@onclick, "record=")]')
            if not links:
                links = tree.xpath('//a[contains(@onclick, "module=SalesOrder") and contains(@onclick, "record=")]')
            for link in links:
                onclick = link.get('onclick', '')
                match = re.search(r"record=(\d+)", onclick)
                if match and match.group(1) not in crmids:
                    crmids.append(match.group(1))
            return crmids
        return []
    except Exception:
        return []

if __name__ == "__main__":
    main()