# ================= 1. Configuration =================
import requests
import json
import time
import os
import re
import copy  # FIX: hoisted out of clean_text_structure (was re-imported per call)
import urllib.parse

import pandas as pd  # used for the multi-sheet Excel export
from lxml import html

base_url = "http://111.198.24.44:88/index.php"

# Login form parameters
login_payload = {
    "module": "Users",
    "action": "Authenticate",
    "return_module": "Users",
    "return_action": "Login",
    "user_name": "TEST",      # fill in the real user name
    "user_password": "****",  # fill in the real password
    "login_theme": "newskin"
}

# Global HTTP request headers
http_headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Referer": "http://111.198.24.44:88/index.php?module=SalesOrder&action=index"
}

# FIX: timeout (seconds) for every HTTP request so the script cannot hang
# forever on an unresponsive server; timeouts surface as exceptions, which
# the existing best-effort handlers below already turn into empty results.
REQUEST_TIMEOUT = 30


# ================= 2. Core helper functions =================

def get_current_action_id():
    """Return the current time as a 13-digit millisecond timestamp."""
    return int(time.time() * 1000)


def clean_text_structure(element):
    """Deep-clean an lxml element and return its visible text.

    Drops <script>/<style>/<noscript> subtrees, turns <br> tags into real
    newlines, normalizes non-breaking spaces, and removes empty lines.
    Operates on a deep copy so the caller's tree is left untouched.
    """
    if element is None:
        return ""
    el = copy.deepcopy(element)
    for bad_tag in el.xpath('.//script | .//style | .//noscript'):
        bad_tag.drop_tree()
    for br in el.xpath('.//br'):
        # Make the line break survive text extraction.
        br.tail = "\n" + (br.tail if br.tail else "")
    text_content = el.text_content()
    lines = []
    for line in text_content.splitlines():
        clean_line = line.replace('\xa0', ' ').strip()
        if clean_line:
            lines.append(clean_line)
    return "\n".join(lines)


def extract_html_content(html_content, xpath):
    """Parse *html_content* and extract the first node matching *xpath*.

    Returns a dict with 'raw_html' (pretty-printed markup) and
    'cleaned_text' (plain text via clean_text_structure); both are empty
    strings when nothing matches or parsing fails.
    """
    try:
        tree = html.fromstring(html_content)
        elements = tree.xpath(xpath)
        if elements:
            target_element = elements[0]
            raw_html = html.tostring(target_element, encoding='unicode', pretty_print=True)
            cleaned_text = clean_text_structure(target_element)
            return {"raw_html": raw_html, "cleaned_text": cleaned_text}
        return {"raw_html": "", "cleaned_text": ""}
    except Exception as e:
        print(f" ❌ HTML解析错误: {e}")
        return {"raw_html": "", "cleaned_text": ""}


def fetch_html_detail(session, record_id, xpath):
    """Fetch the SalesOrder detail page for *record_id* and extract *xpath*.

    Best-effort: any network or parse failure yields empty fields instead
    of raising, so one bad record does not abort the whole batch.
    """
    try:
        url = f"http://111.198.24.44:88/index.php?module=SalesOrder&action=DetailView&record={record_id}"
        resp = session.get(url, headers=http_headers, timeout=REQUEST_TIMEOUT)
        if resp.status_code == 200:
            return extract_html_content(resp.content, xpath)
        return {"raw_html": "", "cleaned_text": ""}
    except Exception:
        return {"raw_html": "", "cleaned_text": ""}


def extract_crmid_from_search_result(html_content):
    """Pull unique SalesOrder record ids out of a unified-search result page."""
    crmids = []
    try:
        tree = html.fromstring(html_content)
        links = tree.xpath('//div[@id="collapse-SalesOrder"]//a[contains(@onclick, "record=")]')
        if not links:
            # Fallback: any SalesOrder link anywhere on the page.
            links = tree.xpath('//a[contains(@onclick, "module=SalesOrder") and contains(@onclick, "record=")]')
        for link in links:
            onclick = link.get('onclick', '')
            match = re.search(r"record=(\d+)", onclick)
            if match:
                crmid = match.group(1)
                if crmid not in crmids:  # keep order, drop duplicates
                    crmids.append(crmid)
        return crmids
    except Exception:
        return []


def perform_search(session, query_string):
    """Run the CRM unified search; return a list of matching record ids.

    *query_string* must already be URL-encoded by the caller.
    """
    try:
        search_url = f"http://111.198.24.44:88/index.php?module=Home&action=UnifiedSearch&selectedmodule=undefined&query_string={query_string}"
        resp = session.get(search_url, headers=http_headers, timeout=REQUEST_TIMEOUT)
        if resp.status_code == 200:
            return extract_crmid_from_search_result(resp.content)
        return []
    except Exception:
        return []

# ================= 3. Core parsing logic =================
def parse_order_text(text):
    """Parse cleaned detail-page text into a flat field dictionary.

    Returns {} for empty input.  Otherwise returns a dict holding every
    column used by both the domestic ("内贸") and foreign-trade ("外贸")
    sheets; fields absent from *text* stay "".  The temporary key
    "_temp_second_code" carries the second token of the order-number
    line; the caller assigns it to 内贸合同号 or 外贸合同号 depending on
    the contract-number prefix (N/W).
    """
    if not text:
        return {}

    # Generic field pool (every field either sheet may need).
    data = {
        "合同编号": "", "内贸合同号": "", "外贸合同号": "", "签署公司": "",
        "收款情况": "", "签订日期": "", "销售员": "", "最终用户单位": "",
        "最终用户信息联系人": "", "最终用户信息电话": "", "最终用户信息邮箱": "",
        "最终用户所在地": "", "买方单位": "", "买方信息联系人": "",
        "买方信息电话": "", "买方信息邮箱": "", "厂家型号": "", "合同标的": "",
        "数量": "", "单位": "台/套", "折扣率(%)": "", "合同额": "",
        "合同总额": "", "外购付款方式": "", "最晚发货期": "", "已收款": "",
        "未收款": "", "收款日期": ""
    }

    lines = [line.strip() for line in text.split('\n') if line.strip()]

    # Label found in the page text -> key in the data dict.
    key_map = {
        "收款账户": "签署公司",
        "收款状态": "收款情况",
        "签约日期": "签订日期",
        "负责人": "销售员",
        "客户名称": "最终用户单位",
        "联系人姓名": "最终用户信息联系人",
        "合同总额": "合同总额",
        "最新收款日期": "收款日期",
        "最晚发货期": "最晚发货期",
        "付款比例及期限": "外购付款方式",
        "地址": "最终用户所在地"
    }

    for i, line in enumerate(lines):
        # 1.0 Order-number line: "<contract no.> [<second code>]".
        if line == "合同订单编号":
            if i + 1 < len(lines):
                full_val = lines[i + 1].strip()
                parts = full_val.split()
                if len(parts) >= 1:
                    data["合同编号"] = parts[0]
                if len(parts) >= 2:
                    # Stored temporarily; main() decides whether this is
                    # the domestic or the foreign contract number (W/N).
                    data["_temp_second_code"] = parts[1]
        # 1.1 Regular label/value pairs (value is on the following line).
        elif line in key_map:
            if i + 1 < len(lines):
                target_key = key_map[line]
                if not data[target_key]:  # first occurrence wins
                    data[target_key] = lines[i + 1]
        # 1.2 Product row, assumed "subject/model/qty/unit price/total".
        elif "合同标的" in line and "品名/型号" in line:
            if i + 1 < len(lines):
                parts = lines[i + 1].split('/')
                if len(parts) >= 1:
                    data["合同标的"] = parts[0]
                if len(parts) >= 2:
                    data["厂家型号"] = parts[1]
                if len(parts) >= 3:
                    data["数量"] = parts[2]
                if len(parts) >= 5:
                    data["合同额"] = parts[4]
        # 1.3 Discount rate: no dedicated label observed in the page
        # text, so the field is intentionally left empty here.

    # 2. Buyer information via regex over the whole text.
    buyer_match = re.search(r"(?:买方|The Buyer)[::]\s*(.*?)(?:\n|$)", text)
    if buyer_match and len(buyer_match.group(1)) > 1:
        data["买方单位"] = buyer_match.group(1).strip()
    # FIX: the parentheses around "Contact person" must be escaped.
    # Unescaped they formed a capture group, so group(1) held the literal
    # label instead of the name — and the pattern could never match the
    # real page text "联系人(Contact person):" in the first place.
    buyer_contact = re.search(r"联系人\(Contact person\)[::]\s*(.*?)(?:\n|$)", text)
    if buyer_contact:
        data["买方信息联系人"] = buyer_contact.group(1).strip()
    buyer_tel = re.search(r"电话\(Tel\)[::]\s*(.*?)(?:\s+|$|传真)", text)
    if buyer_tel:
        data["买方信息电话"] = buyer_tel.group(1).strip()

    # 3. Derive paid / unpaid amounts from the total and payment status.
    try:
        total = float(data["合同总额"]) if data["合同总额"] else 0
        status = data["收款情况"]
        if "已收" in status:
            data["已收款"] = str(total)
            data["未收款"] = "0"
        elif "未" in status:
            data["已收款"] = "0"
            data["未收款"] = str(total)
    except ValueError:
        # Total is not a plain number (e.g. "1,000.00"); leave fields empty.
        pass

    return data


# ================= 4. Main program logic =================
def main():
    session = requests.Session()
    # XPath of the detail-page block that holds the order text.
    target_xpath = "/html/body/div[1]/div/div[2]/div[2]/form/div[1]/div[1]/div[2]"
    try:
        # --- 1. Login ---
        print("1. 正在登录...")
        session.post(base_url, data=login_payload, headers=http_headers)
        if 'PHPSESSID' in session.cookies:
            print(" ✅ 登录成功")
        else:
            print(" ⚠️ 警告: 未检测到Cookie,可能登录失败")

        # --- 2. Search ---
        print("\n2. 请输入搜索内容:")
        query_input = input(" 搜索关键词: ").strip()
        if not query_input:
            return
        encoded_query = urllib.parse.quote(query_input)
        print(f"\n3. 执行搜索...")
        crmids = perform_search(session, encoded_query)
        if not crmids:
            print(" ❌ 未找到相关订单。")
            return
        print(f" ✅ 找到 {len(crmids)} 个订单 ID: {crmids}")

        # --- 3. Fetch details and classify ---
        print(f"\n4. 开始获取详情并分类处理...")
        list_domestic = []  # domestic trade (contract no. starts with N)
        list_foreign = []   # foreign trade (contract no. starts with W)
        list_other = []     # everything else
        valid_count = 0
        for i, crmid in enumerate(crmids):
            print(f" [{i + 1}/{len(crmids)}] 处理 ID: {crmid}")
            html_data = fetch_html_detail(session, crmid, target_xpath)
            clean_text = html_data['cleaned_text']
            data = parse_order_text(clean_text)
            contract_no = data.get("合同编号", "").strip().upper()
            # Skip records with no contract number at all.
            if not contract_no:
                print(f" ⚠️ 跳过: 未找到合同编号")
                continue
            # Classification: the second code from the order-number line
            # becomes the foreign or domestic contract number.
            second_code = data.pop("_temp_second_code", "")
            if contract_no.startswith('W'):
                data['外贸合同号'] = second_code
                list_foreign.append(data)
                print(f" 🌍 归类: [外贸] {contract_no}")
            elif contract_no.startswith('N'):
                data['内贸合同号'] = second_code
                list_domestic.append(data)
                print(f" 🏠 归类: [内贸] {contract_no}")
            else:
                data['内贸合同号'] = second_code  # default slot
                list_other.append(data)
                print(f" ❓ 归类: [其他] {contract_no}")
            valid_count += 1
            time.sleep(0.5)  # be gentle with the server

        # --- 4. Export to a multi-sheet Excel workbook ---
        print(f"\n5. 正在导出 Excel 文件...")
        if valid_count == 0:
            print(" ❌ 无有效数据导出")
            return
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        output_dir = f"Result_{timestamp}"
        os.makedirs(output_dir, exist_ok=True)
        # FIX: the raw search query may contain characters that are
        # illegal in file names (\ / : * ? " < > |); replace them so the
        # export cannot fail on e.g. a query containing a slash.
        safe_query = re.sub(r'[\\/:*?"<>|]', '_', query_input)
        xlsx_filename = os.path.join(output_dir, f"Export_{safe_query}_{timestamp}.xlsx")

        # Column order for the domestic sheet.
        cols_domestic = [
            "合同编号", "签署公司", "内贸合同号", "收款情况", "签订日期", "销售员",
            "最终用户单位", "最终用户信息联系人", "最终用户信息电话", "最终用户信息邮箱",
            "最终用户所在地", "买方单位", "买方信息联系人", "买方信息电话", "买方信息邮箱",
            "厂家型号", "合同标的", "数量", "单位", "折扣率(%)", "合同额", "合同总额",
            "外购付款方式", "最晚发货期", "已收款", "未收款", "收款日期"
        ]
        # Foreign sheet: identical except 外贸合同号 replaces 内贸合同号.
        cols_foreign = [
            "合同编号", "签署公司", "外贸合同号", "收款情况", "签订日期", "销售员",
            "最终用户单位", "最终用户信息联系人", "最终用户信息电话", "最终用户信息邮箱",
            "最终用户所在地", "买方单位", "买方信息联系人", "买方信息电话", "买方信息邮箱",
            "厂家型号", "合同标的", "数量", "单位", "折扣率(%)", "合同额", "合同总额",
            "外购付款方式", "最晚发货期", "已收款", "未收款", "收款日期"
        ]

        try:
            with pd.ExcelWriter(xlsx_filename, engine='openpyxl') as writer:
                # reindex() enforces column order and fills missing columns
                # with NaN, so each sheet always has the full header.
                if list_domestic:
                    df_domestic = pd.DataFrame(list_domestic)
                    df_domestic = df_domestic.reindex(columns=cols_domestic)
                    df_domestic.to_excel(writer, sheet_name='内贸', index=False)
                if list_foreign:
                    df_foreign = pd.DataFrame(list_foreign)
                    df_foreign = df_foreign.reindex(columns=cols_foreign)
                    df_foreign.to_excel(writer, sheet_name='外贸', index=False)
                if list_other:
                    df_other = pd.DataFrame(list_other)
                    # "Other" records reuse the domestic header layout.
                    df_other = df_other.reindex(columns=cols_domestic)
                    df_other.to_excel(writer, sheet_name='其他', index=False)
            print(f" ✅ 成功导出多Sheet表格: {os.path.abspath(xlsx_filename)}")
            print(f" - 内贸: {len(list_domestic)} 条")
            print(f" - 外贸: {len(list_foreign)} 条")
            print(f" - 其他: {len(list_other)} 条")
        except ImportError:
            # openpyxl is imported lazily by ExcelWriter, so a missing
            # engine only surfaces here.
            print(" ❌ 错误: 缺少 pandas 或 openpyxl 库。")
            print(" 请在终端运行: pip install pandas openpyxl")
        except Exception as e:
            print(f" ❌ 写入 Excel 失败: {e}")

    except Exception as e:
        print(f"\n❌ 程序发生错误: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()