Compare commits

...

5 Commits

Author SHA1 Message Date
DXC
de5797378e 测试版终版 2026-01-19 12:47:03 +08:00
DXC
eb8e1221fe 测试版 2026-01-19 10:46:05 +08:00
659edeba48 1.0带页面内容 2026-01-18 11:31:40 +08:00
b42698fb5c 搜索测试成功 2026-01-17 13:40:52 +08:00
9da92a4489 登录以及获取信息测试 2026-01-16 15:16:35 +08:00
8 changed files with 3324 additions and 2 deletions

761
new_页面内容.py Normal file
View File

@ -0,0 +1,761 @@
import sys
import os
import time
import threading
import re
import urllib.parse
import webbrowser
import json
from datetime import datetime, timedelta
import tkinter as tk
from tkinter import filedialog, messagebox
import requests
import pandas as pd
from lxml import html
# ================= 1. 导入 UI 库 (已修正路径) =================
import ttkbootstrap as ttk
from ttkbootstrap.constants import *
from ttkbootstrap.dialogs import Messagebox
# 修正后的组件导入
try:
from ttkbootstrap.widgets import DateEntry
from ttkbootstrap.scrolled import ScrolledText
from ttkbootstrap.tableview import Tableview
from ttkbootstrap.toast import ToastNotification
except ImportError:
# 兼容性导入
from ttkbootstrap.widgets import DateEntry
from tkinter.scrolledtext import ScrolledText
from ttkbootstrap.tableview import Tableview
from ttkbootstrap.toast import ToastNotification
# ================= 2. 后端核心逻辑 =================
class CRMCrawler:
def __init__(self, log_callback, data_callback):
    """Crawler backend.

    Args:
        log_callback: callable(str) that receives progress/log lines.
        data_callback: callable(dict) invoked once per parsed record.
    """
    self.log = log_callback
    self.on_data = data_callback
    # Cooperative cancellation flag, polled by the long-running crawl loops.
    self.stop_flag = False
    self.session = requests.Session()
    self.base_url = "http://111.198.24.44:88/index.php"
    # Browser-like AJAX headers so the CRM responds with JSON payloads.
    self.http_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "X-Requested-With": "XMLHttpRequest",
        "Accept": "application/json, text/javascript, */*; q=0.01",
    }
def login(self, username, password):
    """Authenticate against the CRM.

    Returns True when the server handed back a PHPSESSID cookie,
    False on bad credentials or any network failure.
    """
    self.log(f"🔑 正在登录... 用户: {username}")
    payload = {
        "module": "Users", "action": "Authenticate", "return_module": "Users",
        "return_action": "Login", "user_name": username, "user_password": password, "login_theme": "newskin"
    }
    try:
        # Warm-up GET so the server establishes a session before the POST.
        self.session.get(self.base_url, headers=self.http_headers)
        self.session.post(self.base_url, data=payload, headers=self.http_headers)
    except Exception as e:
        self.log(f"❌ 网络错误: {str(e)}")
        return False
    if 'PHPSESSID' in self.session.cookies:
        self.log("✅ 登录成功!")
        return True
    self.log("❌ 登录失败:请检查账号密码")
    return False
def get_timestamp(self):
    """Current Unix time in whole milliseconds (used as the CRM actionId)."""
    millis = time.time() * 1000
    return int(millis)
def clean_num(self, val):
    """Normalise a numeric value to a compact string.

    Integral floats lose the trailing ".0" ("5.0" -> "5"); values that
    cannot be parsed as a number are returned via str(); None and ""
    collapse to the empty string.
    """
    if val is None or val == "":
        return ""
    try:
        number = float(val)
    except Exception:
        return str(val)
    return str(int(number)) if number.is_integer() else str(number)
def _safe_float(self, val):
try:
return float(val)
except:
return 0.0
def fetch_product_details(self, record_id, contract_no, sales_person, outsourced_desc_from_html):
    """Fetch the product-line table of one sales order and map each line to
    a flat export row.

    Args:
        record_id: CRM record id of the sales order.
        contract_no: contract number copied onto every row.
        sales_person: salesperson copied onto every row.
        outsourced_desc_from_html: description text scraped from the detail
            page; substituted for the product description on "外购"
            (outsourced) lines.

    Returns:
        list of row dicts; empty on any network/JSON failure (best-effort).
    """
    detail_payload = {
        "module": "Plugins", "pluginName": "DetailProductTable", "action": "getTableData",
        "moduleName": "SalesOrder", "record": record_id, "actionId": self.get_timestamp(), "isTool": "1"
    }
    product_rows = []
    try:
        res = self.session.post(self.base_url, data=detail_payload, headers=self.http_headers)
        try:
            detail_json = res.json()
        except:
            # Non-JSON response (e.g. session expired) — give up quietly.
            return []
        # The endpoint's 'data' shape varies: a list of products, a dict
        # with a 'rows' key, or a dict keyed by row-id with product dicts
        # as values; normalise all three into `products`.
        products = []
        raw_data = detail_json.get('data')
        if isinstance(raw_data, list):
            products = raw_data
        elif isinstance(raw_data, dict):
            if 'rows' in raw_data:
                products = raw_data['rows']
            else:
                for v in raw_data.values():
                    if isinstance(v, dict) and ('productid' in v or 'productname' in v):
                        products.append(v)
        for prod in products:
            # cf_2128 is the custom manufacturer field; fall back to the
            # standard 'manufacturer' key. (Field meanings per this CRM —
            # TODO confirm against the server's field configuration.)
            manufacturer = self._get_nested_val(prod, 'cf_2128') or self._get_nested_val(prod, 'manufacturer')
            prod_desc_text = prod.get('productname', '')
            unit = self._get_nested_val(prod, 'usageunit')
            qty_raw = self._get_nested_val(prod, 'qty')
            discount = self.clean_num(self._get_nested_val(prod, 'discount_percent'))
            currency = self._get_nested_val(prod, 'cf_534')
            list_price_raw = self._get_nested_val(prod, 'listPrice')
            f_qty = self._safe_float(qty_raw)
            f_list_price = self._safe_float(list_price_raw)
            f_total_val = f_list_price * f_qty
            # A line is "outsourced" when the manufacturer mentions 外购.
            is_outsourced = False
            if manufacturer and "外购" in manufacturer:
                is_outsourced = True
            final_desc = prod_desc_text
            if is_outsourced and outsourced_desc_from_html:
                final_desc = outsourced_desc_from_html
            # Outsourced lines fill only the 外购 column; regular lines fill
            # the quote unit/total columns. Sales columns are left blank for
            # manual entry in the exported sheet.
            col_quote_unit = ""
            col_quote_total = ""
            col_sales_unit = ""
            col_sales_total = ""
            col_outsourced = ""
            if is_outsourced:
                col_outsourced = self.clean_num(f_total_val)
            else:
                col_quote_unit = self.clean_num(f_list_price)
                col_quote_total = self.clean_num(f_total_val)
            row = {
                "合同编号": contract_no,
                "销售员": sales_person,
                "厂家": manufacturer,
                "货号": prod.get('productcode', ''),
                "产品描述": final_desc,
                "数量": self.clean_num(qty_raw),
                "单位": unit,
                "币种": currency,
                "报价单价": col_quote_unit,
                "报价总价": col_quote_total,
                "销售单价": col_sales_unit,
                "销售总价": col_sales_total,
                "折扣率": discount,
                "外购": col_outsourced,
                "合同币种/美元": "",
                "外购转美元": "",
                "报价总价美元": "",
                "净合同额美元": ""
            }
            product_rows.append(row)
    except Exception:
        # Best-effort: a failure on one order must not abort the crawl.
        pass
    return product_rows
def _get_nested_val(self, item, key):
if not item or key not in item: return ""
val = item[key]
if isinstance(val, dict) and 'value' in val: return val['value']
return val
def fetch_detail_html(self, record_id):
    """Fetch a SalesOrder detail page and return its main panel as plain text.

    <br> tags become newlines, scripts/styles are dropped, and blank lines
    are stripped, so the result is a clean label/value line stream for
    parse_data(). Returns "" on any failure or non-200 response.
    """
    try:
        url = f"{self.base_url}?module=SalesOrder&action=DetailView&record={record_id}"
        resp = self.session.get(url, headers=self.http_headers, timeout=10)
        if resp.status_code == 200:
            tree = html.fromstring(resp.content)
            # Absolute XPath to the detail panel — brittle: breaks if the
            # CRM skin/layout changes.
            target = tree.xpath("/html/body/div[1]/div/div[2]/div[2]/form/div[1]/div[1]/div[2]")
            if target:
                import copy
                # Deep-copy so drop_tree() does not mutate the parsed page.
                el = copy.deepcopy(target[0])
                for bad in el.xpath('.//script | .//style'): bad.drop_tree()
                # Turn <br> into explicit newlines before text extraction.
                for br in el.xpath('.//br'): br.tail = "\n" + (br.tail if br.tail else "")
                return "\n".join([line.strip() for line in el.text_content().splitlines() if line.strip()])
    except Exception:
        pass
    return ""
def parse_data(self, text, cid):
    """Parse the plain-text detail page of one sales order into a flat record.

    The page (as produced by fetch_detail_html) is a stream of alternating
    label/value lines; labels are mapped onto record keys via ``key_map``.

    Args:
        text: plain text of the detail panel.
        cid: CRM record id, stored under "系统ID".

    Returns:
        dict record, or None when text is empty or no contract number
        could be extracted.
    """
    if not text: return None
    data = {
        "系统ID": cid,
        "合同编号": "", "内贸合同号": "", "外贸合同号": "",
        "签署公司": "", "收款情况": "", "签订日期": "", "销售员": "",
        "最终用户单位": "", "最终用户信息联系人": "", "最终用户信息电话": "", "最终用户信息邮箱": "",
        "最终用户所在地": "",
        "买方单位": "", "买方信息联系人": "", "买方信息电话": "", "买方信息邮箱": "",
        "厂家": "", "厂家型号": "", "合同标的": "", "数量": "", "单位": "台/套",
        "折扣率(%)": "", "合同额": "", "合同总额": "",
        "外购付款方式": "", "最晚发货期": "", "已收款": "", "未收款": "", "收款日期": "",
        "IS_ASD": False, "_temp_second_code": "",
        "OUTSOURCED_DESC_HTML": "",
        "product_list": []
    }
    lines = [line.strip() for line in text.split('\n') if line.strip()]
    # Page label -> record key. Only the first occurrence of each label wins.
    key_map = {
        "收款账户": "签署公司", "收款状态": "收款情况", "签约日期": "签订日期",
        "负责人": "销售员", "客户名称": "最终用户单位", "联系人姓名": "最终用户信息联系人",
        "合同总额": "合同总额", "最新收款日期": "收款日期", "最晚发货期": "最晚发货期",
        "付款比例及期限": "外购付款方式", "地址": "最终用户所在地", "厂家": "厂家",
        "外购产品明细": "OUTSOURCED_DESC_HTML"
    }
    for i, line in enumerate(lines):
        if line == "合同订单编号" and i + 1 < len(lines):
            # Value line may carry two codes: "<contract-no> <secondary-code>".
            parts = lines[i + 1].strip().split()
            if len(parts) >= 1: data["合同编号"] = parts[0]
            if len(parts) >= 2: data["_temp_second_code"] = parts[1]
        elif line in key_map and i + 1 < len(lines):
            target = key_map[line]
            if not data[target]: data[target] = lines[i + 1]
        elif "合同标的" in line and "品名/型号" in line and i + 1 < len(lines):
            # Combined field: subject/model/qty/?/amount separated by '/'.
            parts = lines[i + 1].split('/')
            if len(parts) >= 1: data["合同标的"] = parts[0]
            if len(parts) >= 2: data["厂家型号"] = parts[1]
            if len(parts) >= 3: data["数量"] = self.clean_num(parts[2])
            if len(parts) >= 5: data["合同额"] = parts[4]
    if not data["买方单位"]:
        buyer_match = re.search(r"(?:买方|The Buyer)[:]\s*(.*?)(?:\n|$)", text)
        if buyer_match and len(buyer_match.group(1)) > 1:
            data["买方单位"] = buyer_match.group(1).strip()
    # Derive paid/unpaid amounts from the payment status.
    try:
        total = float(data["合同总额"]) if data["合同总额"] else 0
        if "已收" in data["收款情况"]:
            data["已收款"] = self.clean_num(total)
            data["未收款"] = "0"
        else:
            # BUG FIX: the original tested `"" in data["收款情况"]`, which is
            # always true (the intended substring — presumably "未收" — was
            # lost in transit). `else` reproduces the effective behaviour
            # without the vacuous test.
            data["已收款"] = "0"
            data["未收款"] = self.clean_num(total)
    except Exception:
        pass
    factory_val = data.get("厂家", "")
    # ASD manufacturers are routed to a separate table set in the GUI.
    data["IS_ASD"] = bool(factory_val and "ASD" in factory_val.upper())
    c_no = data.get("合同编号", "").strip().upper()
    sec_code = data.pop("_temp_second_code", "")
    if c_no.startswith('W'):
        data["外贸合同号"] = sec_code
    else:
        # 'N…' prefixes and anything else are both treated as domestic.
        data["内贸合同号"] = sec_code
    if not c_no: return None
    return data
def extract_time(self, text):
    """Return the latest 'YYYY-MM-DD HH:MM:SS' timestamp found in text.

    Returns a datetime, or None when no timestamp is present.
    """
    stamps = re.findall(r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})", text)
    if not stamps:
        return None
    return max(datetime.strptime(s, "%Y-%m-%d %H:%M:%S") for s in stamps)
def run_task(self, mode, **kwargs):
    """Run one crawl task.

    mode 'search': unified keyword search (kwargs['query']), then parse
    every hit. mode 'date': delegate to _process_date_range (handles its
    own paging and emits records itself).
    """
    crmids = []
    if mode == 'search':
        query = kwargs.get('query')
        self.log(f"🔍 正在搜索: {query}")
        url = f"{self.base_url}?module=Home&action=UnifiedSearch&selectedmodule=undefined&query_string={urllib.parse.quote(query)}"
        resp = self.session.get(url, headers=self.http_headers)
        tree = html.fromstring(resp.content)
        # Record ids only appear inside onclick handlers on result links.
        links = tree.xpath('//a[contains(@onclick, "record=")]')
        for link in links:
            match = re.search(r"record=(\d+)", link.get('onclick', ''))
            if match: crmids.append(match.group(1))
        # De-duplicate (a record can match in several columns). NOTE: set()
        # does not preserve the result order.
        crmids = list(set(crmids))
    elif mode == 'date':
        s_date = kwargs.get('start');
        e_date = kwargs.get('end')
        self.log(f"📅 时间筛选: {s_date} ~ {e_date}")
        self._process_date_range(s_date, e_date)
        return
    self.log(f" 共找到 {len(crmids)} 条记录,开始解析详情...")
    for i, cid in enumerate(crmids):
        # Honour the GUI's stop button between records.
        if self.stop_flag: break
        self._process_single_id(cid)
        self.log(f" 进度: {i + 1}/{len(crmids)}")
def _process_date_range(self, s_str, e_str):
    """Crawl list pages newest-first and emit records whose latest page
    timestamp falls inside [s_str, e_str] (YYYY-MM-DD, end inclusive).

    Because the list is ordered by modifiedtime DESC, the first record
    older than the start date ends the whole crawl.
    """
    try:
        t_start = datetime.strptime(s_str, "%Y-%m-%d")
        # Make the end date inclusive by extending to the day's last second.
        t_end = datetime.strptime(e_str, "%Y-%m-%d").replace(hour=23, minute=59, second=59)
    except:
        self.log("❌ 日期格式错误");
        return
    page = 1
    while not self.stop_flag:
        ts = int(time.time() * 1000)
        # viewname=476 pins a server-side saved filter — TODO confirm it
        # exists for every account.
        url = f"{self.base_url}?module=SalesOrder&action=SalesOrderAjax&file=ListViewData&sorder=DESC&order_by=modifiedtime&start={page}&pagesize=50&actionId={ts}&isFilter=true&search%5Bviewscope%5D=all_to_me&search%5Bviewname%5D=476"
        try:
            resp = self.session.get(url, headers=self.http_headers)
            data = resp.json()
            # Payload key differs across CRM versions: 'data' or 'entries'.
            entries = data.get('data', []) or data.get('entries', [])
            if not entries: break
            page_ids = [x.get('crmid') or x.get('id') for x in entries if isinstance(x, dict)]
            self.log(f" 🔎 正在检查第 {page} 页 ({len(page_ids)} 条)...")
            valid_cnt = 0
            for cid in page_ids:
                if self.stop_flag: break
                text_html = self.fetch_detail_html(cid)
                r_time = self.extract_time(text_html)
                if r_time:
                    # Too new: skip; too old: since pages are DESC, all
                    # later records are older too — stop everything.
                    if r_time > t_end: continue
                    if r_time < t_start:
                        self.log(f" 🛑 遇到旧数据 ({r_time}),停止爬取")
                        self.stop_flag = True;
                        break
                # Records with no timestamp fall through and are processed.
                self._process_data_payload(text_html, cid)
                valid_cnt += 1
            if valid_cnt > 0: self.log(f" ✅ 第 {page} 页入库 {valid_cnt}")
            page += 1
            if self.stop_flag: break
        except Exception as e:
            self.log(f"❌ 错误: {e}");
            break
def _process_single_id(self, cid):
    """Fetch one record's detail page and push it through the parse pipeline."""
    page_text = self.fetch_detail_html(cid)
    self._process_data_payload(page_text, cid)
def _process_data_payload(self, text_html, cid):
    """Parse a detail page, attach its product rows, and emit the record.

    Drops the record silently when parse_data() returns None.
    """
    record = self.parse_data(text_html, cid)
    if not record:
        return
    record['product_list'] = self.fetch_product_details(
        cid,
        record.get("合同编号", ""),
        record.get("销售员", ""),
        record.get("OUTSOURCED_DESC_HTML", ""),
    )
    self.on_data(record)
# ================= 3. 界面显示类 =================
class CRMGUI(ttk.Window):
def __init__(self):
    """Main window: wires the crawler callbacks into the GUI and declares
    the column layouts for summary and detail tables."""
    super().__init__(themename="cosmo")
    self.title("CRM 智能数据助手 测试版")
    self.geometry("1400x900")
    self.crawler = CRMCrawler(self.log_msg, self.add_record_to_table)
    self.is_running = False
    # In-memory store: records bucketed by ASD-ness and trade direction.
    self.stored_data = {
        'ASD': {'Domestic': [], 'Foreign': [], 'Other': []},
        'NON_ASD': {'Domestic': [], 'Foreign': [], 'Other': []}
    }
    # Treeview widgets keyed by "<main>_<sub>" (see _create_treeview).
    self.treeviews = {}
    # 1. Summary-table columns (shared by all three summary tabs).
    self.base_cols = [
        "合同编号", "签署公司", "收款情况", "签订日期", "销售员", "厂家",
        "最终用户单位", "最终用户信息联系人", "最终用户信息电话", "买方单位",
        "厂家型号", "合同标的", "数量", "合同额", "合同总额",
        "最晚发货期", "已收款", "未收款", "收款日期"
    ]
    self.cols_domestic = ["内贸合同号"] + self.base_cols + ["系统ID"]
    self.cols_foreign = ["外贸合同号"] + self.base_cols + ["系统ID"]
    self.cols_other = self.base_cols + ["系统ID"]
    # 2. Detail-table (product line) columns.
    self.cols_detail = [
        "合同编号", "销售员", "厂家", "货号", "产品描述",
        "数量", "单位", "币种",
        "报价单价", "报价总价", "销售单价", "销售总价", "折扣率", "外购",
        "合同币种/美元", "外购转美元", "报价总价美元", "净合同额美元"
    ]
    self.create_widgets()
def create_widgets(self):
    """Build the full UI: top control bar, ASD/non-ASD table area, and the
    bottom log + export strip."""
    # --- 1. Top control bar ---
    control_frame = ttk.Frame(self, padding=10, bootstyle="light")
    control_frame.pack(fill=X)
    login_grp = ttk.Labelframe(control_frame, text="身份验证", padding=10)
    login_grp.pack(side=LEFT, padx=5, fill=Y)
    ttk.Label(login_grp, text="用户:").pack(side=LEFT)
    self.user_ent = ttk.Entry(login_grp, width=10);
    self.user_ent.insert(0, "TEST");
    self.user_ent.pack(side=LEFT, padx=5)
    ttk.Label(login_grp, text="密码:").pack(side=LEFT)
    self.pass_ent = ttk.Entry(login_grp, width=10, show="*");
    self.pass_ent.insert(0, "***");
    self.pass_ent.pack(side=LEFT, padx=5)
    mode_grp = ttk.Labelframe(control_frame, text="任务类型", padding=10)
    mode_grp.pack(side=LEFT, padx=10, fill=Y, expand=True)
    self.nb_mode = ttk.Notebook(mode_grp, bootstyle="primary")
    self.nb_mode.pack(fill=BOTH, expand=True)
    # === Date-range picker tab ===
    f_date = ttk.Frame(self.nb_mode, padding=10)
    self.nb_mode.add(f_date, text="📅 按时间范围")
    self.ent_start = DateEntry(f_date, dateformat='%Y-%m-%d', width=11, bootstyle="primary")
    self.ent_start.pack(side=LEFT, padx=5)
    # NOTE(review): label text is empty — a separator glyph ("至"?) was
    # probably lost; confirm against the running UI.
    ttk.Label(f_date, text="").pack(side=LEFT)
    self.ent_end = DateEntry(f_date, dateformat='%Y-%m-%d', width=11, bootstyle="primary")
    self.ent_end.pack(side=LEFT, padx=5)
    # =========================
    f_search = ttk.Frame(self.nb_mode, padding=10)
    self.nb_mode.add(f_search, text="🔍 关键词搜索")
    self.ent_query = ttk.Entry(f_search, width=25);
    self.ent_query.pack(fill=X)
    self.nb_mode.select(f_date)
    btn_grp = ttk.Frame(control_frame, padding=10)
    btn_grp.pack(side=RIGHT, fill=Y)
    self.btn_run = ttk.Button(btn_grp, text="▶ 开始", bootstyle="success", command=self.start_thread, width=10)
    self.btn_run.pack(side=TOP, pady=2)
    self.btn_stop = ttk.Button(btn_grp, text="⏹ 停止", bootstyle="danger", command=self.stop_task, state=DISABLED,
                               width=10)
    self.btn_stop.pack(side=TOP, pady=2)
    # --- 2. Main display area (ASD / non-ASD toggle + stacked frames) ---
    toggle_frame = ttk.Frame(self, padding=(10, 5))
    toggle_frame.pack(fill=X)
    self.curr_view = tk.StringVar(value="ASD")
    self.btn_view_asd = ttk.Button(toggle_frame, text="ASD 产品列表", command=lambda: self.switch_view("ASD"),
                                   width=20)
    self.btn_view_asd.pack(side=LEFT, padx=5)
    self.btn_view_non = ttk.Button(toggle_frame, text="非 ASD 产品列表",
                                   command=lambda: self.switch_view("NON_ASD"), width=20)
    self.btn_view_non.pack(side=LEFT, padx=5)
    self.container = ttk.Frame(self)
    self.container.pack(fill=BOTH, expand=True, padx=10)
    self.frame_asd = ttk.Frame(self.container)
    self.frame_non = ttk.Frame(self.container)
    self._init_inner_tabs(self.frame_asd, "ASD")
    self._init_inner_tabs(self.frame_non, "NON_ASD")
    self.switch_view("ASD")
    # --- 3. Bottom strip: log console + export button ---
    bottom_frame = ttk.Frame(self, padding=5)
    bottom_frame.pack(fill=X, padx=10, pady=5)
    log_frame = ttk.Labelframe(bottom_frame, text="系统日志", padding=5)
    log_frame.pack(side=LEFT, fill=BOTH, expand=True)
    self.txt_log = ScrolledText(log_frame, height=5);
    self.txt_log.text.configure(state=DISABLED);
    self.txt_log.pack(fill=BOTH, expand=True)
    export_frame = ttk.Frame(bottom_frame, padding=10)
    export_frame.pack(side=RIGHT, fill=Y)
    ttk.Button(export_frame, text="📂 导出完整 Excel", bootstyle="primary", command=self.export_data).pack(fill=X,
                                                                                                       pady=10)
def _init_inner_tabs(self, parent_frame, prefix):
    """Populate one main panel with its five tabs: three summary views
    (domestic/foreign/other) and two product-detail views.

    prefix ("ASD" or "NON_ASD") namespaces the treeview registry keys.
    """
    nb = ttk.Notebook(parent_frame, bootstyle="info")
    nb.pack(fill=BOTH, expand=True)
    # Summary tabs
    f_dom = ttk.Frame(nb);
    nb.add(f_dom, text="📜 内贸汇总");
    self._create_treeview(f_dom, self.cols_domestic, f"{prefix}_Domestic")
    f_for = ttk.Frame(nb);
    nb.add(f_for, text="📜 外贸汇总");
    self._create_treeview(f_for, self.cols_foreign, f"{prefix}_Foreign")
    f_oth = ttk.Frame(nb);
    nb.add(f_oth, text="📜 其他汇总");
    self._create_treeview(f_oth, self.cols_other, f"{prefix}_Other")
    # Detail tabs (product lines per contract)
    f_detail_dom = ttk.Frame(nb);
    nb.add(f_detail_dom, text="📦 内贸明细清单")
    self._create_treeview(f_detail_dom, self.cols_detail, f"{prefix}_Detail_Domestic")
    f_detail_for = ttk.Frame(nb);
    nb.add(f_detail_for, text="📦 外贸明细清单")
    self._create_treeview(f_detail_for, self.cols_detail, f"{prefix}_Detail_Foreign")
def _create_treeview(self, parent, cols, key):
    """Build a scrollable Treeview for `cols`, register it under `key` in
    self.treeviews, wire the right-click context menu, and return it."""
    sy = ttk.Scrollbar(parent, orient=VERTICAL)
    sx = ttk.Scrollbar(parent, orient=HORIZONTAL)
    tv = ttk.Treeview(parent, columns=cols, show="headings", selectmode="browse", yscrollcommand=sy.set,
                      xscrollcommand=sx.set)
    sy.config(command=tv.yview);
    sy.pack(side=RIGHT, fill=Y)
    sx.config(command=tv.xview);
    sx.pack(side=BOTTOM, fill=X)
    tv.pack(side=LEFT, fill=BOTH, expand=True)
    for c in cols:
        tv.heading(c, text=c, anchor="center")
        # Column widths by content type.
        # BUG FIX: the original final branch tested `"" in c` (always true —
        # its Chinese substrings were evidently lost in transit), so every
        # column not matched above got width 80; `else` reproduces that
        # effective behaviour without the vacuous test.
        if "描述" in c or "标的" in c or "公司" in c or "单位" in c:
            w = 200
        elif "编号" in c:
            w = 120
        elif "系统ID" in c:
            w = 0  # hidden bookkeeping column used by the context menu
        else:
            w = 80
        tv.column(c, width=w, minwidth=50, anchor="center")
    tv.bind("<Button-3>", lambda e: self.on_right_click(e, tv, key))
    self.treeviews[key] = tv
    return tv
def switch_view(self, view_name):
    """Show the ASD or NON_ASD panel and restyle the toggle buttons so the
    active one is solid primary and the other an outline."""
    self.curr_view.set(view_name)
    showing_asd = view_name == "ASD"
    if showing_asd:
        hidden, shown = self.frame_non, self.frame_asd
    else:
        hidden, shown = self.frame_asd, self.frame_non
    hidden.pack_forget()
    shown.pack(fill=BOTH, expand=True)
    self.btn_view_asd.configure(bootstyle="primary" if showing_asd else "secondary-outline")
    self.btn_view_non.configure(bootstyle="secondary-outline" if showing_asd else "primary")
def start_thread(self):
    """Reset stored data and all tables, then launch the crawl worker in a
    daemon thread. No-op while a run is already in progress."""
    if self.is_running:
        return
    # Fresh accumulators for this run.
    self.stored_data = {
        'ASD': {'Domestic': [], 'Foreign': [], 'Other': []},
        'NON_ASD': {'Domestic': [], 'Foreign': [], 'Other': []},
    }
    # Clear every table view.
    for tv in self.treeviews.values():
        for row_id in tv.get_children():
            tv.delete(row_id)
    self.is_running = True
    self.crawler.stop_flag = False
    self.btn_run.config(state=DISABLED)
    self.btn_stop.config(state=NORMAL)
    worker = threading.Thread(target=self._worker, daemon=True)
    worker.start()
def stop_task(self):
    """Request a cooperative stop; the crawler checks stop_flag between records/pages."""
    self.crawler.stop_flag = True
    self.log_msg("🛑 正在停止...")
def _worker(self):
    """Background thread body: log in, pick the task from the selected
    notebook tab (0 = date range, 1 = keyword search), run it, and always
    restore the buttons afterwards."""
    user = self.user_ent.get();
    pwd = self.pass_ent.get()
    if not self.crawler.login(user, pwd): self._reset_ui(); return
    curr_idx = self.nb_mode.index(self.nb_mode.select())
    mode = "date";
    kwargs = {}
    if curr_idx == 0:
        mode = "date"
        # DateEntry exposes its text field as `.entry`.
        kwargs = {'start': self.ent_start.entry.get(), 'end': self.ent_end.entry.get()}
    elif curr_idx == 1:
        mode = "search"
        kwargs = {'query': self.ent_query.get()}
    try:
        self.crawler.run_task(mode, **kwargs);
        self.log_msg("🎉 完成!")
    except Exception as e:
        self.log_msg(f"❌ 错误: {e}")
    finally:
        self._reset_ui()
def _reset_ui(self):
    """Re-enable Start / disable Stop; marshalled onto the Tk main loop so
    it is safe to call from the worker thread."""
    self.is_running = False
    for widget, new_state in ((self.btn_run, NORMAL), (self.btn_stop, DISABLED)):
        self.after(0, lambda w=widget, s=new_state: w.config(state=s))
def log_msg(self, msg):
    """Thread-safe log append: marshals onto the Tk main loop via after()."""
    self.after(0, lambda: self._append_log(msg))
def _append_log(self, msg):
    """Append a timestamped line to the log widget (Tk thread only).

    The widget is kept read-only; it is unlocked just for the insert.
    """
    self.txt_log.text.configure(state=NORMAL)
    self.txt_log.text.insert(END, f"[{datetime.now().strftime('%H:%M:%S')}] {msg}\n")
    self.txt_log.text.see(END);
    self.txt_log.text.configure(state=DISABLED)
def add_record_to_table(self, record):
    """Crawler data callback: store the record and show it in the matching
    summary table plus (for domestic/foreign) the detail table.

    Runs the actual update on the Tk main loop; row iids encode the
    bucket and list index so on_right_click can recover the record.
    """
    def _update():
        # Bucket by ASD-ness, then by contract-number prefix.
        main_key = 'ASD' if record['IS_ASD'] else 'NON_ASD'
        c_no = str(record.get("合同编号", "")).strip().upper()
        sub_key = "Other"
        if c_no.startswith('N'):
            sub_key = "Domestic"
        elif c_no.startswith('W'):
            sub_key = "Foreign"
        self.stored_data[main_key][sub_key].append(record)
        record_idx = len(self.stored_data[main_key][sub_key]) - 1
        tv_key = f"{main_key}_{sub_key}"
        tv = self.treeviews.get(tv_key)
        if tv:
            cols = list(tv['columns'])
            vals = [record.get(c, "") for c in cols]
            # iid format "main_<main>_<sub>_<idx>" is parsed by on_right_click.
            tv.insert("", END, iid=f"main_{main_key}_{sub_key}_{record_idx}", values=vals)
        # "Other" contracts have no detail tab.
        detail_key_suffix = ""
        if sub_key == "Domestic":
            detail_key_suffix = "Domestic"
        elif sub_key == "Foreign":
            detail_key_suffix = "Foreign"
        if detail_key_suffix:
            tv_detail_key = f"{main_key}_Detail_{detail_key_suffix}"
            tv_detail = self.treeviews.get(tv_detail_key)
            if tv_detail and record.get('product_list'):
                detail_cols = list(tv_detail['columns'])
                for p_idx, prod_row in enumerate(record['product_list']):
                    d_vals = [prod_row.get(c, "") for c in detail_cols]
                    unique_id = f"detail_{main_key}_{sub_key}_{record_idx}_{p_idx}"
                    tv_detail.insert("", END, iid=unique_id, values=d_vals)
    self.after(0, _update)
def on_right_click(self, event, tv, key):
    """Context menu for summary rows: offers "open in browser" for the
    record under the cursor. Detail rows (iid "detail_...") get no menu."""
    item_id = tv.identify_row(event.y)
    if not item_id: return
    tv.selection_set(item_id)
    if item_id.startswith("main_"):
        # iid was written as "main_<main_key>_<sub_key>_<index>".
        parts = item_id.split('_')
        main_key, sub_key, idx = parts[1], parts[2], int(parts[3])
        record = self.stored_data[main_key][sub_key][idx]
        crm_id = record.get("系统ID", "")
        menu = tk.Menu(self, tearoff=0)
        menu.add_command(label="🌐 在浏览器查看", command=lambda: self.open_browser(crm_id))
        menu.post(event.x_root, event.y_root)
def open_browser(self, crm_id):
    """Open the CRM detail page for crm_id in the default browser; no-op
    when crm_id is empty."""
    if not crm_id:
        return
    webbrowser.open(
        f"http://111.198.24.44:88/index.php?module=SalesOrder&action=DetailView&record={crm_id}"
    )
def _prepare_summary_df(self, records, export_cols, code_col):
    """Build one summary-sheet DataFrame: ensure every export column exists,
    splice the trade-specific contract-number column in after the first two
    columns, and sort ascending by 合同编号."""
    df = pd.DataFrame(records)
    for c in export_cols:
        if c not in df.columns: df[c] = ""
    cols = export_cols[:2] + [code_col] + export_cols[2:]
    df = df.reindex(columns=cols)
    df.sort_values(by="合同编号", ascending=True, inplace=True)
    return df

def export_data(self):
    """Export stored records to one Excel workbook per main bucket
    (ASD / non-ASD), each with up to three summary sheets and two
    product-detail sheets. Asks the user for a destination folder first."""
    folder = filedialog.askdirectory()
    if not folder: return
    self.log_msg(f"💾 正在导出...")
    ts = time.strftime("%Y%m%d_%H%M%S")
    export_cols = [
        "合同编号", "签署公司", "收款情况", "签订日期", "销售员", "厂家",
        "最终用户单位", "最终用户信息联系人", "最终用户信息电话", "最终用户信息邮箱", "最终用户所在地",
        "买方单位", "买方信息联系人", "买方信息电话", "买方信息邮箱",
        "厂家型号", "合同标的", "数量", "单位", "折扣率(%)", "合同额", "合同总额",
        "外购付款方式", "最晚发货期", "已收款", "未收款", "收款日期"
    ]
    detail_cols_order = self.cols_detail
    for main_key, prefix in [('ASD', 'ASD_产品表'), ('NON_ASD', 'Non_ASD_产品表')]:
        data_map = self.stored_data[main_key]
        # Skip the whole workbook when this bucket holds nothing.
        if sum(len(v) for v in data_map.values()) == 0: continue
        # Flatten product lines into foreign ('W…') vs domestic detail sets.
        detail_domestic_rows = []
        detail_foreign_rows = []
        for sub_key in data_map:
            for rec in data_map[sub_key]:
                products = rec.get('product_list', [])
                if rec.get('合同编号', '').upper().startswith('W'):
                    detail_foreign_rows.extend(products)
                else:
                    detail_domestic_rows.extend(products)
        # Detail sheets sorted ascending by contract number (summary sheets
        # are sorted inside _prepare_summary_df).
        detail_domestic_rows.sort(key=lambda x: x.get("合同编号", ""))
        detail_foreign_rows.sort(key=lambda x: x.get("合同编号", ""))
        path = os.path.join(folder, f"{prefix}_{ts}.xlsx")
        try:
            with pd.ExcelWriter(path, engine='openpyxl') as writer:
                # Summary sheets — previously three copy-pasted blocks,
                # now driven by one table.
                for records, code_col, sheet in (
                        (data_map['Domestic'], "内贸合同号", '内贸汇总'),
                        (data_map['Foreign'], "外贸合同号", '外贸汇总'),
                        (data_map['Other'], "内贸合同号", '其他汇总'),
                ):
                    if records:
                        self._prepare_summary_df(records, export_cols, code_col).to_excel(
                            writer, sheet_name=sheet, index=False)
                # Detail sheets (already sorted above).
                for rows, sheet in ((detail_domestic_rows, '内贸明细'),
                                    (detail_foreign_rows, '外贸明细')):
                    if rows:
                        pd.DataFrame(rows).reindex(columns=detail_cols_order).to_excel(
                            writer, sheet_name=sheet, index=False)
            self.log_msg(f" ✅ 导出成功: {os.path.basename(path)}")
        except Exception as e:
            self.log_msg(f" ❌ 导出失败: {e}")
    # NOTE(review): shown once after the loop even if a workbook failed —
    # placement matches the most plausible reading of the original layout.
    Messagebox.show_info("导出完成", "Excel文件已生成")
# Script entry point: build the GUI and hand control to the Tk main loop.
if __name__ == "__main__":
    app = CRMGUI()
    app.mainloop()

709
前端页面.py Normal file
View File

@ -0,0 +1,709 @@
import sys
import os
import time
import threading
import re
import urllib.parse
import webbrowser
from datetime import datetime
import tkinter as tk
from tkinter import filedialog, messagebox
import requests
import pandas as pd
from lxml import html
# ================= 1. 导入 UI 库 =================
import ttkbootstrap as ttk
from ttkbootstrap.constants import *
from ttkbootstrap.dialogs import Messagebox
# 兼容导入
try:
from ttkbootstrap.widgets import ScrolledText, Tableview, ToastNotification
except ImportError:
from ttkbootstrap.scrolled import ScrolledText
from ttkbootstrap.tableview import Tableview
from ttkbootstrap.toast import ToastNotification
# ================= 2. 后端核心逻辑 (保持功能完整) =================
class CRMCrawler:
def __init__(self, log_callback, data_callback):
    """Crawler backend (duplicate of the one in new_页面内容.py).

    log_callback(str) receives progress lines; data_callback(dict)
    receives each parsed record.
    """
    self.log = log_callback
    self.on_data = data_callback
    # Cooperative cancel flag polled by the crawl loops.
    self.stop_flag = False
    self.session = requests.Session()
    self.base_url = "http://111.198.24.44:88/index.php"
    # Browser-like AJAX headers so the CRM answers with JSON.
    self.http_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "X-Requested-With": "XMLHttpRequest",
        "Accept": "application/json, text/javascript, */*; q=0.01"
    }
def login(self, username, password):
    """Authenticate against the CRM; True iff a PHPSESSID cookie was set."""
    self.log(f"🔑 正在登录... 用户: {username}")
    login_payload = {
        "module": "Users", "action": "Authenticate", "return_module": "Users",
        "return_action": "Login", "user_name": username, "user_password": password, "login_theme": "newskin"
    }
    try:
        # Warm-up GET so the server establishes a session before the POST.
        self.session.get(self.base_url, headers=self.http_headers)
        self.session.post(self.base_url, data=login_payload, headers=self.http_headers)
        if 'PHPSESSID' in self.session.cookies:
            self.log("✅ 登录成功!")
            return True
        else:
            self.log("❌ 登录失败:请检查账号密码")
            return False
    except Exception as e:
        self.log(f"❌ 网络错误: {str(e)}")
        return False
def fetch_detail(self, record_id):
    """Fetch a SalesOrder detail page and return its main panel as plain
    text (<br> -> newline, scripts/styles dropped, blank lines removed).
    Returns "" on any failure."""
    try:
        url = f"{self.base_url}?module=SalesOrder&action=DetailView&record={record_id}"
        resp = self.session.get(url, headers=self.http_headers, timeout=10)
        if resp.status_code == 200:
            tree = html.fromstring(resp.content)
            # Absolute XPath to the detail panel — brittle against skin changes.
            target = tree.xpath("/html/body/div[1]/div/div[2]/div[2]/form/div[1]/div[1]/div[2]")
            if target:
                import copy
                # Deep-copy so drop_tree() does not mutate the parsed page.
                el = copy.deepcopy(target[0])
                for bad in el.xpath('.//script | .//style'): bad.drop_tree()
                for br in el.xpath('.//br'): br.tail = "\n" + (br.tail if br.tail else "")
                return "\n".join([line.strip() for line in el.text_content().splitlines() if line.strip()])
    except Exception:
        pass
    return ""
def parse_data(self, text, cid):
    """Parse the plain-text detail page of one sales order into a flat record.

    Args:
        text: plain text of the detail panel (label/value line stream).
        cid: CRM record id, stored under "系统ID".

    Returns:
        dict record, or None when text is empty or no contract number
        could be extracted.
    """
    if not text: return None
    data = {
        "系统ID": cid,
        "合同编号": "", "内贸合同号": "", "外贸合同号": "",
        "签署公司": "", "收款情况": "", "签订日期": "", "销售员": "",
        "最终用户单位": "", "最终用户信息联系人": "", "最终用户信息电话": "", "最终用户信息邮箱": "",
        "最终用户所在地": "",
        "买方单位": "", "买方信息联系人": "", "买方信息电话": "", "买方信息邮箱": "",
        "厂家": "", "厂家型号": "", "合同标的": "", "数量": "", "单位": "台/套",
        "折扣率(%)": "", "合同额": "", "合同总额": "",
        "外购付款方式": "", "最晚发货期": "", "已收款": "", "未收款": "", "收款日期": "",
        "IS_ASD": False, "_temp_second_code": ""
    }
    lines = [line.strip() for line in text.split('\n') if line.strip()]
    # Page label -> record key; only the first occurrence of a label wins.
    key_map = {
        "收款账户": "签署公司", "收款状态": "收款情况", "签约日期": "签订日期",
        "负责人": "销售员", "客户名称": "最终用户单位", "联系人姓名": "最终用户信息联系人",
        "合同总额": "合同总额", "最新收款日期": "收款日期", "最晚发货期": "最晚发货期",
        "付款比例及期限": "外购付款方式", "地址": "最终用户所在地", "厂家": "厂家"
    }
    for i, line in enumerate(lines):
        if line == "合同订单编号" and i + 1 < len(lines):
            # Value line may carry "<contract-no> <secondary-code>".
            parts = lines[i + 1].strip().split()
            if len(parts) >= 1: data["合同编号"] = parts[0]
            if len(parts) >= 2: data["_temp_second_code"] = parts[1]
        elif line in key_map and i + 1 < len(lines):
            target = key_map[line]
            if not data[target]: data[target] = lines[i + 1]
        elif "合同标的" in line and "品名/型号" in line and i + 1 < len(lines):
            # Combined field: subject/model/qty/?/amount separated by '/'.
            parts = lines[i + 1].split('/')
            if len(parts) >= 1: data["合同标的"] = parts[0]
            if len(parts) >= 2: data["厂家型号"] = parts[1]
            if len(parts) >= 3: data["数量"] = parts[2]
            if len(parts) >= 5: data["合同额"] = parts[4]
    # Buyer block is free-form text; pull it out with regexes.
    buyer_match = re.search(r"(?:买方|The Buyer)[:]\s*(.*?)(?:\n|$)", text)
    if buyer_match and len(buyer_match.group(1)) > 1: data["买方单位"] = buyer_match.group(1).strip()
    buyer_ct = re.search(r"联系人Contact person[:]\s*(.*?)(?:\n|$)", text)
    if buyer_ct: data["买方信息联系人"] = buyer_ct.group(1).strip()
    buyer_tel = re.search(r"电话\(Tel\)[:]\s*(.*?)(?:\s+|$|传真)", text)
    if buyer_tel: data["买方信息电话"] = buyer_tel.group(1).strip()
    # Derive paid/unpaid amounts from the payment status.
    try:
        total = float(data["合同总额"]) if data["合同总额"] else 0
        if "已收" in data["收款情况"]:
            data["已收款"] = str(total)
            data["未收款"] = "0"
        else:
            # BUG FIX: the original tested `"" in data["收款情况"]`, which is
            # always true (intended substring — presumably "未收" — was lost);
            # `else` keeps the effective behaviour without the vacuous test.
            data["已收款"] = "0"
            data["未收款"] = str(total)
    except Exception:
        pass
    factory_val = data.get("厂家", "")
    # ASD manufacturers go to the dedicated ASD table set in the GUI.
    data["IS_ASD"] = bool(factory_val and "ASD" in factory_val.upper())
    c_no = data.get("合同编号", "").strip().upper()
    sec_code = data.pop("_temp_second_code", "")
    if c_no.startswith('W'):
        data["外贸合同号"] = sec_code
    else:
        # 'N…' prefixes and anything else are both treated as domestic.
        data["内贸合同号"] = sec_code
    if not c_no: return None
    return data
def extract_time(self, text):
    """Latest 'YYYY-MM-DD HH:MM:SS' timestamp in text as datetime, else None."""
    found = re.findall(r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})", text)
    if not found:
        return None
    parsed = (datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S") for stamp in found)
    return max(parsed)
def run_task(self, mode, **kwargs):
    """Run one crawl task.

    mode 'search': unified keyword search (kwargs['query']);
    mode 'full': list-view sweep capped at 5 pages (demo safeguard);
    mode 'date': delegate to _process_date_range (handles its own paging).
    """
    crmids = []
    if mode == 'search':
        query = kwargs.get('query')
        self.log(f"🔍 正在搜索: {query}")
        url = f"{self.base_url}?module=Home&action=UnifiedSearch&selectedmodule=undefined&query_string={urllib.parse.quote(query)}"
        resp = self.session.get(url, headers=self.http_headers)
        tree = html.fromstring(resp.content)
        # Record ids only appear inside onclick handlers on result links.
        links = tree.xpath('//a[contains(@onclick, "record=")]')
        for link in links:
            onclick = link.get('onclick', '')
            match = re.search(r"record=(\d+)", onclick)
            if match: crmids.append(match.group(1))
        # De-duplicate; NOTE: set() does not preserve result order.
        crmids = list(set(crmids))
    elif mode == 'full':
        self.log("🚀 开始全量爬取 (演示限制前5页)")
        crmids = self._get_list_ids(limit_pages=5)
    elif mode == 'date':
        s_date = kwargs.get('start')
        e_date = kwargs.get('end')
        self.log(f"📅 时间筛选: {s_date} ~ {e_date}")
        self._process_date_range(s_date, e_date)
        return
    self.log(f" 共找到 {len(crmids)} 条记录,开始解析详情...")
    for i, cid in enumerate(crmids):
        # Honour the GUI's stop button between records.
        if self.stop_flag: break
        self._process_single_id(cid)
        self.log(f" 进度: {i + 1}/{len(crmids)}")
def _get_list_ids(self, limit_pages=3):
    """Collect CRM record ids from the SalesOrder list view.

    Pages 1..limit_pages are fetched; paging stops early on an empty page,
    a request/JSON error, or when stop_flag is set. Returns de-duplicated
    ids (order not preserved).
    """
    ids = []
    for p in range(1, limit_pages + 1):
        if self.stop_flag: break
        try:
            ts = int(time.time() * 1000)
            url = f"{self.base_url}?module=SalesOrder&action=SalesOrderAjax&file=ListViewData&start={p}&actionId={ts}"
            resp = self.session.get(url, headers=self.http_headers)
            entries = resp.json().get('data', [])
            if not entries: break
            for item in entries:
                if isinstance(item, dict):
                    # Key name differs across CRM versions: crmid or id.
                    ids.append(item.get('crmid') or item.get('id'))
        except:
            # Best-effort: abort paging on any failure.
            break
    return list(set(ids))
def _process_date_range(self, s_str, e_str):
    """Crawl list pages newest-first and emit records whose latest page
    timestamp falls inside [s_str, e_str] (YYYY-MM-DD, end inclusive).

    The list is ordered by modifiedtime DESC, so the first record older
    than the start date terminates the whole crawl.
    """
    try:
        t_start = datetime.strptime(s_str, "%Y-%m-%d")
        # Make the end date inclusive (up to 23:59:59).
        t_end = datetime.strptime(e_str, "%Y-%m-%d").replace(hour=23, minute=59, second=59)
    except:
        self.log("❌ 日期格式错误")
        return
    page = 1
    while not self.stop_flag:
        ts = int(time.time() * 1000)
        # viewname=476 pins a server-side saved filter — TODO confirm it
        # exists for every account.
        url = f"{self.base_url}?module=SalesOrder&action=SalesOrderAjax&file=ListViewData&sorder=DESC&order_by=modifiedtime&start={page}&pagesize=50&actionId={ts}&isFilter=true&search%5Bviewscope%5D=all_to_me&search%5Bviewname%5D=476"
        try:
            resp = self.session.get(url, headers=self.http_headers)
            data = resp.json()
            # Payload key differs across CRM versions: 'data' or 'entries'.
            entries = data.get('data', []) or data.get('entries', [])
            if not entries: break
            page_ids = [x.get('crmid') or x.get('id') for x in entries if isinstance(x, dict)]
            self.log(f" 🔎 正在检查第 {page} 页 ({len(page_ids)} 条)...")
            valid_cnt = 0
            for cid in page_ids:
                if self.stop_flag: break
                text = self.fetch_detail(cid)
                r_time = self.extract_time(text)
                if r_time:
                    # Too new: skip; too old: everything after is older too.
                    if r_time > t_end: continue
                    if r_time < t_start:
                        self.log(f" 🛑 遇到旧数据 ({r_time}),停止爬取")
                        self.stop_flag = True
                        break
                # Records with no timestamp fall through and are processed.
                parsed = self.parse_data(text, cid)
                if parsed:
                    self.on_data(parsed)
                    valid_cnt += 1
            if valid_cnt > 0: self.log(f" ✅ 第 {page} 页入库 {valid_cnt}")
            page += 1
            if self.stop_flag: break
        except Exception as e:
            self.log(f"❌ 错误: {e}")
            break
def _process_single_id(self, cid):
    """Fetch, parse, and emit a single record by CRM id (drops unparseable pages)."""
    detail_text = self.fetch_detail(cid)
    record = self.parse_data(detail_text, cid)
    if record:
        self.on_data(record)
# ================= 3. 界面显示类 (重大升级) =================
class CRMGUI(ttk.Window):
def __init__(self):
    """Main window: wires crawler callbacks into the GUI and declares the
    summary-table column layouts."""
    super().__init__(themename="cosmo")  # use the cosmo theme
    self.title("CRM 智能数据助手 Pro Max")
    self.geometry("1280x850")
    self.crawler = CRMCrawler(self.log_msg, self.add_record_to_table)
    self.is_running = False
    # In-memory store: {'ASD': {'Domestic':[], 'Foreign':[], 'Other':[]}, 'NON_ASD': {...}}
    self.stored_data = {
        'ASD': {'Domestic': [], 'Foreign': [], 'Other': []},
        'NON_ASD': {'Domestic': [], 'Foreign': [], 'Other': []}
    }
    # Treeview registry for later lookups, keyed by "<main>_<sub>".
    self.treeviews = {}
    # Base columns shared by all summary views.
    self.base_cols = [
        "合同编号", "签署公司", "收款情况", "签订日期", "销售员", "厂家",
        "最终用户单位", "最终用户信息联系人", "最终用户信息电话", "买方单位",
        "厂家型号", "合同标的", "数量", "合同额", "合同总额",
        "最晚发货期", "已收款", "未收款", "收款日期"
    ]
    # Per-view headers: prepend the trade-specific contract-number column.
    self.cols_domestic = ["内贸合同号"] + self.base_cols + ["系统ID"]
    self.cols_foreign = ["外贸合同号"] + self.base_cols + ["系统ID"]
    self.cols_other = self.base_cols + ["系统ID"]
    self.create_widgets()
def create_widgets(self):
# --- 1. 顶部:控制区 ---
control_frame = ttk.Frame(self, padding=10, bootstyle="light")
control_frame.pack(fill=X)
# 登录
login_grp = ttk.Labelframe(control_frame, text="身份验证", padding=10)
login_grp.pack(side=LEFT, padx=5, fill=Y)
ttk.Label(login_grp, text="用户:").pack(side=LEFT)
self.user_ent = ttk.Entry(login_grp, width=10);
self.user_ent.insert(0, "TEST");
self.user_ent.pack(side=LEFT, padx=5)
ttk.Label(login_grp, text="密码:").pack(side=LEFT)
self.pass_ent = ttk.Entry(login_grp, width=10, show="*");
self.pass_ent.insert(0, "***");
self.pass_ent.pack(side=LEFT, padx=5)
# 模式
mode_grp = ttk.Labelframe(control_frame, text="任务类型", padding=10)
mode_grp.pack(side=LEFT, padx=10, fill=Y, expand=True)
self.nb_mode = ttk.Notebook(mode_grp, bootstyle="primary")
self.nb_mode.pack(fill=BOTH, expand=True)
f_date = ttk.Frame(self.nb_mode, padding=10)
self.nb_mode.add(f_date, text="📅 按时间范围")
self.ent_start = ttk.Entry(f_date, width=12);
self.ent_start.insert(0, "2026-01-14");
self.ent_start.pack(side=LEFT, padx=5)
ttk.Label(f_date, text="").pack(side=LEFT)
self.ent_end = ttk.Entry(f_date, width=12);
self.ent_end.insert(0, "2026-01-15");
self.ent_end.pack(side=LEFT, padx=5)
f_search = ttk.Frame(self.nb_mode, padding=10)
self.nb_mode.add(f_search, text="🔍 关键词搜索")
self.ent_query = ttk.Entry(f_search, width=25);
self.ent_query.pack(fill=X)
f_full = ttk.Frame(self.nb_mode, padding=10)
self.nb_mode.add(f_full, text="🚀 全量")
ttk.Label(f_full, text="数据量大,慎用").pack()
self.nb_mode.select(f_date)
# 运行按钮
btn_grp = ttk.Frame(control_frame, padding=10)
btn_grp.pack(side=RIGHT, fill=Y)
self.btn_run = ttk.Button(btn_grp, text="▶ 开始", bootstyle="success", command=self.start_thread, width=10)
self.btn_run.pack(side=TOP, pady=2)
self.btn_stop = ttk.Button(btn_grp, text="⏹ 停止", bootstyle="danger", command=self.stop_task, state=DISABLED,
width=10)
self.btn_stop.pack(side=TOP, pady=2)
# --- 2. 核心展示区 (解决颜色问题) ---
# 使用“切换按钮”代替顶层Tab实现 [选中=蓝色实心] [未选中=白色空心]
toggle_frame = ttk.Frame(self, padding=(10, 5))
toggle_frame.pack(fill=X)
self.curr_view = tk.StringVar(value="ASD") # 默认 ASD
self.btn_view_asd = ttk.Button(toggle_frame, text="ASD 产品列表", command=lambda: self.switch_view("ASD"),
width=20)
self.btn_view_asd.pack(side=LEFT, padx=5)
self.btn_view_non = ttk.Button(toggle_frame, text="非 ASD 产品列表",
command=lambda: self.switch_view("NON_ASD"), width=20)
self.btn_view_non.pack(side=LEFT, padx=5)
# 容器 Frame
self.container = ttk.Frame(self)
self.container.pack(fill=BOTH, expand=True, padx=10)
# 创建两个大 Frame分别装 ASD 和 NON_ASD 的内容
self.frame_asd = ttk.Frame(self.container)
self.frame_non = ttk.Frame(self.container)
# 初始化内部结构 (内贸/外贸/其他 分离)
self._init_inner_tabs(self.frame_asd, "ASD")
self._init_inner_tabs(self.frame_non, "NON_ASD")
# 默认显示 ASD
self.switch_view("ASD")
# --- 3. 底部区 ---
bottom_frame = ttk.Frame(self, padding=5)
bottom_frame.pack(fill=X, padx=10, pady=5)
log_frame = ttk.Labelframe(bottom_frame, text="系统日志", padding=5)
log_frame.pack(side=LEFT, fill=BOTH, expand=True)
self.txt_log = ScrolledText(log_frame, height=5)
self.txt_log.text.configure(state=DISABLED)
self.txt_log.pack(fill=BOTH, expand=True)
export_frame = ttk.Frame(bottom_frame, padding=10)
export_frame.pack(side=RIGHT, fill=Y)
ttk.Button(export_frame, text="📂 导出 Excel", bootstyle="primary", command=self.export_data).pack(fill=X,
pady=10)
def _init_inner_tabs(self, parent_frame, prefix):
"""在父Frame中创建 内贸/外贸/其他 的Tab结构"""
nb = ttk.Notebook(parent_frame, bootstyle="info")
nb.pack(fill=BOTH, expand=True)
# 内贸 Tab
f_dom = ttk.Frame(nb);
nb.add(f_dom, text="内贸 (Domestic)")
self._create_treeview(f_dom, self.cols_domestic, f"{prefix}_Domestic")
# 外贸 Tab
f_for = ttk.Frame(nb);
nb.add(f_for, text="外贸 (Foreign)")
self._create_treeview(f_for, self.cols_foreign, f"{prefix}_Foreign")
# 其他 Tab
f_oth = ttk.Frame(nb);
nb.add(f_oth, text="其他 (Other)")
self._create_treeview(f_oth, self.cols_other, f"{prefix}_Other")
def _create_treeview(self, parent, cols, key):
"""创建表格并注册到 self.treeviews"""
# 滚动条
sy = ttk.Scrollbar(parent, orient=VERTICAL)
sx = ttk.Scrollbar(parent, orient=HORIZONTAL)
tv = ttk.Treeview(parent, columns=cols, show="headings", selectmode="browse",
yscrollcommand=sy.set, xscrollcommand=sx.set)
sy.config(command=tv.yview);
sy.pack(side=RIGHT, fill=Y)
sx.config(command=tv.xview);
sx.pack(side=BOTTOM, fill=X)
tv.pack(side=LEFT, fill=BOTH, expand=True)
for c in cols:
tv.heading(c, text=c)
w = 100
if c in ["合同标的", "最终用户单位", "签署公司", "买方单位"]:
w = 200
elif c == "系统ID":
w = 0
tv.column(c, width=w, minwidth=50)
# 绑定双击
tv.bind("<Double-1>", lambda e: self.on_double_click(e, tv, key))
# 绑定右键菜单
tv.bind("<Button-3>", lambda e: self.on_right_click(e, tv, key))
self.treeviews[key] = tv
return tv
def switch_view(self, view_name):
"""切换 ASD / NON_ASD 视图,并处理按钮颜色反转"""
self.curr_view.set(view_name)
if view_name == "ASD":
self.frame_non.pack_forget()
self.frame_asd.pack(fill=BOTH, expand=True)
# ASD选中ASD实心(primary)NON空心(outline)
self.btn_view_asd.configure(bootstyle="primary")
self.btn_view_non.configure(bootstyle="secondary-outline")
else:
self.frame_asd.pack_forget()
self.frame_non.pack(fill=BOTH, expand=True)
# NON选中ASD空心NON实心
self.btn_view_asd.configure(bootstyle="secondary-outline")
self.btn_view_non.configure(bootstyle="primary")
# --- 逻辑控制 ---
def start_thread(self):
if self.is_running: return
# 清空所有数据和表格
self.stored_data = {
'ASD': {'Domestic': [], 'Foreign': [], 'Other': []},
'NON_ASD': {'Domestic': [], 'Foreign': [], 'Other': []}
}
for tv in self.treeviews.values():
for item in tv.get_children(): tv.delete(item)
self.is_running = True
self.crawler.stop_flag = False
self.btn_run.config(state=DISABLED)
self.btn_stop.config(state=NORMAL)
t = threading.Thread(target=self._worker)
t.daemon = True
t.start()
def stop_task(self):
self.crawler.stop_flag = True
self.log_msg("🛑 正在停止...")
def _worker(self):
user = self.user_ent.get()
pwd = self.pass_ent.get()
if not self.crawler.login(user, pwd):
self._reset_ui();
return
curr_idx = self.nb_mode.index(self.nb_mode.select())
mode = "date"
kwargs = {}
if curr_idx == 0:
mode = "date";
kwargs = {'start': self.ent_start.get(), 'end': self.ent_end.get()}
elif curr_idx == 1:
mode = "search";
kwargs = {'query': self.ent_query.get()}
elif curr_idx == 2:
mode = "full"
try:
self.crawler.run_task(mode, **kwargs)
self.log_msg("🎉 完成!")
except Exception as e:
self.log_msg(f"❌ 错误: {e}")
finally:
self._reset_ui()
def _reset_ui(self):
self.is_running = False
self.after(0, lambda: self.btn_run.config(state=NORMAL))
self.after(0, lambda: self.btn_stop.config(state=DISABLED))
def log_msg(self, msg):
self.after(0, lambda: self._append_log(msg))
def _append_log(self, msg):
self.txt_log.text.configure(state=NORMAL)
self.txt_log.text.insert(END, f"[{datetime.now().strftime('%H:%M:%S')}] {msg}\n")
self.txt_log.text.see(END)
self.txt_log.text.configure(state=DISABLED)
# --- 数据分发逻辑 (核心) ---
def add_record_to_table(self, record):
def _update():
# 1. 确定大类
main_key = 'ASD' if record['IS_ASD'] else 'NON_ASD'
# 2. 确定子类 (内贸/外贸/其他)
c_no = str(record.get("合同编号", "")).strip().upper()
sub_key = "Other"
if c_no.startswith('N'):
sub_key = "Domestic"
elif c_no.startswith('W'):
sub_key = "Foreign"
# 3. 存入内存
self.stored_data[main_key][sub_key].append(record)
# 4. 插入对应表格
# 组合 Key 找到对应的 Treeview
tv_key = f"{main_key}_{sub_key}"
tv = self.treeviews.get(tv_key)
if tv:
# 获取该表格对应的列
# 注意columns 是 tuple需要转 list
cols = list(tv['columns'])
vals = [record.get(c, "") for c in cols]
# iid 设为列表索引,方便查找
idx = len(self.stored_data[main_key][sub_key]) - 1
tv.insert("", END, iid=idx, values=vals)
self.after(0, _update)
# --- 编辑与跳转逻辑 ---
def on_right_click(self, event, tv, key):
"""右键菜单"""
item_id = tv.identify_row(event.y)
if not item_id: return
tv.selection_set(item_id)
# 解析 key (例如 "ASD_Domestic")
parts = key.split('_')
main_key = parts[0]
if len(parts) > 2: main_key = f"{parts[0]}_{parts[1]}" # 防止 NON_ASD 这种
sub_key = parts[-1]
record = self.stored_data[main_key][sub_key][int(item_id)]
crm_id = record.get("系统ID", "")
menu = tk.Menu(self, tearoff=0)
menu.add_command(label="🌐 在浏览器查看", command=lambda: self.open_browser(crm_id))
menu.add_command(label="📝 编辑详情", command=lambda: self.show_detail_popup(record, tv, item_id))
menu.post(event.x_root, event.y_root)
def on_double_click(self, event, tv, key):
item_id = tv.selection()
if not item_id: return
idx = int(item_id[0])
parts = key.split('_')
main_key = parts[0]
if len(parts) > 2: main_key = f"{parts[0]}_{parts[1]}"
sub_key = parts[-1]
record = self.stored_data[main_key][sub_key][idx]
self.show_detail_popup(record, tv, item_id)
def open_browser(self, crm_id):
if crm_id:
url = f"http://111.198.24.44:88/index.php?module=SalesOrder&action=DetailView&record={crm_id}"
webbrowser.open(url)
self.log_msg(f"🌐 跳转: {crm_id}")
def show_detail_popup(self, record, tv, item_id):
top = ttk.Toplevel(self)
top.title(f"订单详情: {record.get('合同编号')}")
top.geometry("600x700")
# 滚动容器
canvas = tk.Canvas(top)
sb = ttk.Scrollbar(top, orient="vertical", command=canvas.yview)
f_scroll = ttk.Frame(canvas)
f_scroll.bind("<Configure>", lambda e: canvas.configure(scrollregion=canvas.bbox("all")))
canvas.create_window((0, 0), window=f_scroll, anchor="nw")
canvas.configure(yscrollcommand=sb.set)
canvas.pack(side="left", fill="both", expand=True)
sb.pack(side="right", fill="y")
# 滚轮支持
canvas.bind_all("<MouseWheel>", lambda e: canvas.yview_scroll(int(-1 * (e.delta / 120)), "units"))
# 按钮
crm_id = record.get("系统ID", "")
ttk.Button(f_scroll, text="🌐 浏览器查看原始网页", bootstyle="info-outline",
command=lambda: self.open_browser(crm_id)).grid(row=0, column=0, columnspan=2, pady=10)
# 字段编辑
entries = {}
row = 1
# 显示该表格对应的所有列
cols = list(tv['columns'])
for field in cols:
if field == "系统ID": continue
ttk.Label(f_scroll, text=field + ":").grid(row=row, column=0, sticky=E, padx=5, pady=5)
ent = ttk.Entry(f_scroll, width=40)
ent.insert(0, str(record.get(field, "")))
ent.grid(row=row, column=1, padx=5, pady=5)
entries[field] = ent
row += 1
def save():
for k, e in entries.items(): record[k] = e.get()
new_vals = [record.get(c, "") for c in cols]
tv.item(item_id, values=new_vals)
top.destroy()
ToastNotification("保存成功", "本地数据已更新", 1500).show_toast()
ttk.Button(f_scroll, text="💾 保存修改", bootstyle="success", command=save).grid(row=row, column=0, columnspan=2,
pady=20)
# --- 导出 ---
def export_data(self):
folder = filedialog.askdirectory()
if not folder: return
self.log_msg(f"💾 正在导出...")
ts = time.strftime("%Y%m%d_%H%M%S")
# 完整的字段映射,用于导出时的列顺序(比界面显示的更多更全)
export_cols = [
"合同编号", "签署公司", "收款情况", "签订日期", "销售员", "厂家",
"最终用户单位", "最终用户信息联系人", "最终用户信息电话", "最终用户信息邮箱", "最终用户所在地",
"买方单位", "买方信息联系人", "买方信息电话", "买方信息邮箱",
"厂家型号", "合同标的", "数量", "单位", "折扣率(%)", "合同额", "合同总额",
"外购付款方式", "最晚发货期", "已收款", "未收款", "收款日期"
]
for main_key, prefix in [('ASD', 'ASD_产品表'), ('NON_ASD', 'Non_ASD_产品表')]:
data_map = self.stored_data[main_key]
# data_map 结构: {'Domestic': [records], 'Foreign': [], ...}
# 检查是否为空
total = sum(len(v) for v in data_map.values())
if total == 0: continue
path = os.path.join(folder, f"{prefix}_{ts}.xlsx")
try:
with pd.ExcelWriter(path, engine='openpyxl') as writer:
# 内贸 Sheet
if data_map['Domestic']:
df = pd.DataFrame(data_map['Domestic'])
# 插入内贸号
cols = export_cols[:2] + ["内贸合同号"] + export_cols[2:]
df = df.reindex(columns=cols)
df.to_excel(writer, sheet_name='内贸', index=False)
# 外贸 Sheet
if data_map['Foreign']:
df = pd.DataFrame(data_map['Foreign'])
# 插入外贸号
cols = export_cols[:2] + ["外贸合同号"] + export_cols[2:]
df = df.reindex(columns=cols)
df.to_excel(writer, sheet_name='外贸', index=False)
# 其他 Sheet
if data_map['Other']:
df = pd.DataFrame(data_map['Other'])
cols = export_cols[:2] + ["内贸合同号"] + export_cols[2:] # 默认用内贸结构
df = df.reindex(columns=cols)
df.to_excel(writer, sheet_name='其他', index=False)
self.log_msg(f" ✅ 导出成功: {os.path.basename(path)}")
except Exception as e:
self.log_msg(f" ❌ 导出失败: {e}")
Messagebox.show_info("导出完成", "任务结束")
if __name__ == "__main__":
    # Launch the desktop application's event loop when run as a script.
    CRMGUI().mainloop()

221
商品明细.py Normal file
View File

@ -0,0 +1,221 @@
import requests
import json
import time
import os
import pandas as pd
import re
# ================= 1. Configuration =================
base_url = "http://111.198.24.44:88/index.php"
# Login credentials
login_payload = {
    "module": "Users",
    "action": "Authenticate",
    "return_module": "Users",
    "return_action": "Login",
    "user_name": "TEST",  # <--- fill in the real username here
    "user_password": "****",  # <--- fill in the real password here
    "login_theme": "newskin"
}
# List-query parameters for the SalesOrder Ajax endpoint
list_payload = {
    "module": "SalesOrder",
    "action": "SalesOrderAjax",
    "file": "ListViewData",
    "sorder": "",
    "start": "1",
    "pagesize": "50",
    "actionId": "",
    "isFilter": "true",
    "search[viewscope]": "all_to_me",
    "search[viewname]": "324126",
    # Filter conditions: subject contains "W25A", due date within 2025,
    # and subject does NOT contain "取消" (cancelled).
    "filter[Fields0]": "subject",
    "filter[Condition0]": "cts",
    "filter[Srch_value0]": "W25A",
    "filter[type0]": "text",
    "filter[dateCondition1]": "prevfy",
    "filter[Fields1]": "duedate",
    "filter[Condition1]": "btwa",
    "filter[Srch_value1]": "2025-01-01,2025-12-31",
    "filter[type1]": "date",
    "filter[Fields2]": "subject",
    "filter[Condition2]": "dcts",
    "filter[Srch_value2]": "取消",
    "filter[type2]": "text",
    "filter[search_cnt]": "3",
    "filter[matchtype]": "all"
}
# Shared HTTP headers for every request
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Referer": "http://111.198.24.44:88/index.php?module=SalesOrder&action=index"
}
# ================= 2. 辅助工具 =================
def get_timestamp():
    """Return the current epoch time as a 13-digit millisecond integer (used as actionId)."""
    millis = time.time() * 1000
    return int(millis)
def extract_nested_value(item, key):
    """Return item[key], unwrapping one {'value': ...} layer; '' when item is falsy or key is absent."""
    if item and key in item:
        raw = item[key]
        if isinstance(raw, dict) and 'value' in raw:
            return raw['value']
        return raw
    return ""
def clean_html(text):
    """Strip HTML tags and surrounding whitespace, returning plain text (non-strings are str()-ified)."""
    if not isinstance(text, str):
        return str(text)
    return re.sub(r'<[^>]+>', '', text).strip()
# ================= 3. 主程序 =================
def main():
    """Log in, fetch the filtered sales-order list, expand each order's product rows and export to Excel."""
    session = requests.Session()
    all_rows = []
    try:
        # --- 1. Login ---
        print("1. 正在登录...")
        session.post(base_url, data=login_payload, headers=headers)
        # --- 2. Fetch the order list ---
        print("2. 获取订单列表...")
        list_payload['actionId'] = get_timestamp()
        res = session.post(base_url, data=list_payload, headers=headers)
        raw_data = res.json()
        orders = []
        # Tolerant list parsing: the server may return a bare list or wrap it
        # under one of several keys.
        if isinstance(raw_data, list):
            orders = raw_data
        elif isinstance(raw_data, dict):
            for k in ['entries', 'rows', 'data', 'records']:
                if k in raw_data and isinstance(raw_data[k], list):
                    orders = raw_data[k]
                    break
            if not orders:
                for v in raw_data.values():
                    if isinstance(v, list) and len(v) > 0:
                        orders = v
                        break
        print(f"✅ 找到 {len(orders)} 个订单,开始处理...")
        # --- 3. Expand each order into product rows ---
        for i, order in enumerate(orders):
            rid = order.get('crmid') or order.get('salesorderid') or order.get('id')
            if not rid: continue
            # Basic info available from the list page
            contract_no = clean_html(order.get('subject', ''))
            salesperson = order.get('assigned_user_id', '') or order.get('smownerid', '')
            print(f" [{i + 1}/{len(orders)}] 提取: {contract_no}")
            # Request the product-detail plugin for this order
            detail_payload = {
                "module": "Plugins",
                "pluginName": "DetailProductTable",
                "action": "getTableData",
                "moduleName": "SalesOrder",
                "record": rid,
                "actionId": get_timestamp(),
                "isTool": "1"
            }
            try:
                detail_res = session.post(base_url, data=detail_payload, headers=headers)
                detail_json = detail_res.json()
                # Locate the product list inside the 'data' envelope
                products = []
                raw_data_content = detail_json.get('data')
                if isinstance(raw_data_content, list):
                    products = raw_data_content
                elif isinstance(raw_data_content, dict):
                    if 'rows' in raw_data_content:
                        products = raw_data_content['rows']
                    else:
                        for v in raw_data_content.values():
                            if isinstance(v, dict) and ('productid' in v or 'productname' in v):
                                products.append(v)
                if not products:
                    continue
                # --- Core: fill rows strictly matching the required headers ---
                for prod in products:
                    row_data = {
                        # === Part 1: fields we can determine ===
                        "合同编号": contract_no,
                        "销售员": salesperson,
                        "厂家": prod.get('cf_2128', ''),  # e.g. ASD
                        "货号": prod.get('productcode', ''),  # e.g. 135636
                        "产品描述": prod.get('productname', ''),  # e.g. Full Range...
                        "数量": extract_nested_value(prod, 'qty'),
                        "单位": prod.get('usageunit', ''),  # common field; fine if absent
                        "币种": prod.get('cf_534', ''),  # e.g. USD
                        "报价单价": extract_nested_value(prod, 'listPrice'),  # e.g. 4022.20
                        "报价总价": extract_nested_value(prod, 'subtotal'),  # e.g. 4022.20
                        # === Part 2: fields deliberately left blank as instructed ===
                        "销售单价": "",
                        "销售总价": "",
                        "折扣率": "",
                        "外购": "",
                        "合同币种/美元": "",
                        "外购转美元": "",
                        "报价总价美元": "",
                        "净合同额美元": ""
                    }
                    all_rows.append(row_data)
            except Exception as e:
                print(f" ❌ 解析错误: {e}")
            time.sleep(0.1)
        # --- 4. Write the Excel file ---
        if all_rows:
            # Column order is fixed exactly as specified
            strict_columns = [
                '合同编号', '销售员', '厂家', '货号', '产品描述',
                '数量', '单位', '币种', '报价单价', '报价总价',
                '销售单价', '销售总价', '折扣率', '外购',
                '合同币种/美元', '外购转美元', '报价总价美元', '净合同额美元'
            ]
            df = pd.DataFrame(all_rows)
            # Make sure every column exists
            for col in strict_columns:
                if col not in df.columns:
                    df[col] = ""
            # Force the column order
            df = df[strict_columns]
            filename = "Strict_Format_Export.xlsx"
            df.to_excel(filename, index=False)
            print(f"\n✅ 表格生成成功!已严格留空指定列,保存至: {os.path.abspath(filename)}")
        else:
            print("\n❌ 未提取到数据。")
    except Exception as e:
        print(f"❌ 程序错误: {e}")


if __name__ == "__main__":
    main()

326
拿取内容.py Normal file
View File

@ -0,0 +1,326 @@
import requests
import json
import time
import os
from lxml import html
import re
# ================= 1. Configuration =================
base_url = "http://111.198.24.44:88/index.php"
# Login parameters
login_payload = {
    "module": "Users",
    "action": "Authenticate",
    "return_module": "Users",
    "return_action": "Login",
    "user_name": "TEST",  # put the real username here
    "user_password": "***",  # put the real password here
    "login_theme": "newskin"
}
# List-query parameters
list_payload = {
    "module": "SalesOrder",
    "action": "SalesOrderAjax",
    "file": "ListViewData",
    "sorder": "",
    "start": "1",
    "pagesize": "100",  # number of records to fetch
    "actionId": "",  # filled in automatically later
    "isFilter": "true",
    "search[viewscope]": "all_to_me",
    "search[viewname]": "324126",
    # Filter conditions (same pattern as the other scripts):
    # subject contains "W25A", due date within 2025, subject not containing "取消".
    "filter[Fields0]": "subject",
    "filter[Condition0]": "cts",
    "filter[Srch_value0]": "W25A",
    "filter[type0]": "text",
    "filter[dateCondition1]": "prevfy",
    "filter[Fields1]": "duedate",
    "filter[Condition1]": "btwa",
    "filter[Srch_value1]": "2025-01-01,2025-12-31",
    "filter[type1]": "date",
    "filter[Fields2]": "subject",
    "filter[Condition2]": "dcts",
    "filter[Srch_value2]": "取消",
    "filter[type2]": "text",
    "filter[search_cnt]": "3",
    "filter[matchtype]": "all"
}
# Shared HTTP headers
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Referer": "http://111.198.24.44:88/index.php?module=SalesOrder&action=index"
}
# ================= 2. 辅助函数 =================
def get_current_action_id():
    """Generate a 13-digit millisecond timestamp for the CRM 'actionId' parameter."""
    now_ms = time.time() * 1000
    return int(now_ms)
def clean_html_tags(text):
    """Collapse an HTML fragment to single-spaced plain text.

    Tags become spaces, &nbsp; entities become spaces, runs of whitespace are
    merged and the result is stripped.  Falsy input yields "".
    """
    if not text:
        return ""
    # Replace tags with spaces so adjacent words do not fuse together.
    stripped = re.sub(r'<[^>]+>', ' ', text)
    stripped = stripped.replace('&nbsp;', ' ')
    # Merge whitespace/newlines and trim the ends.
    return re.sub(r'\s+', ' ', stripped).strip()
def extract_html_content(html_content, xpath):
    """Extract the first element matching `xpath` from an HTML document.

    Returns a dict with the element's raw HTML ("raw_html") and its
    tag-stripped text ("cleaned_text"); both are empty strings when the
    XPath matches nothing or parsing fails.
    """
    try:
        # Parse the document (lxml tolerates malformed HTML).
        tree = html.fromstring(html_content)
        # Look up the target element.
        elements = tree.xpath(xpath)
        if elements:
            # Serialize the first match back to HTML.
            element_html = html.tostring(elements[0], encoding='unicode', pretty_print=True)
            # Strip tags for the plain-text variant.
            cleaned_text = clean_html_tags(element_html)
            # Keep both the raw HTML and the cleaned text.
            return {
                "raw_html": element_html,
                "cleaned_text": cleaned_text
            }
        else:
            print(f" ⚠️ 未找到XPath: {xpath}")
            return {
                "raw_html": "",
                "cleaned_text": ""
            }
    except Exception as e:
        print(f" ❌ HTML解析错误: {e}")
        return {
            "raw_html": "",
            "cleaned_text": ""
        }
def fetch_html_detail(session, record_id, xpath):
    """GET the DetailView page for `record_id` and extract `xpath` from it.

    Always returns a {"raw_html", "cleaned_text"} dict; both values are
    empty strings on non-200 responses or exceptions.
    """
    try:
        # Detail-page URL for this record.
        html_url = f"http://111.198.24.44:88/index.php?module=SalesOrder&action=DetailView&record={record_id}"
        # Fetch the page within the logged-in session.
        html_response = session.get(html_url, headers=headers)
        if html_response.status_code == 200:
            # Pull out the requested fragment.
            extracted_content = extract_html_content(html_response.content, xpath)
            return extracted_content
        else:
            print(f" ❌ HTML页面请求失败: HTTP {html_response.status_code}")
            return {
                "raw_html": "",
                "cleaned_text": ""
            }
    except Exception as e:
        print(f" ❌ 获取HTML详情失败: {e}")
        return {
            "raw_html": "",
            "cleaned_text": ""
        }
# ================= 3. 主程序逻辑 =================
def main():
    """Log in, fetch the filtered order list, then save each order's JSON + extracted HTML details to disk."""
    session = requests.Session()
    # XPath of the detail block to extract from every order page.
    target_xpath = "/html/body/div[1]/div/div[2]/div[2]/form/div[1]/div[1]/div[2]"
    try:
        # --- Step 1: login ---
        print("1. 正在登录...")
        login_response = session.post(base_url, data=login_payload, headers=headers)
        # Check whether a session cookie was issued.
        if 'PHPSESSID' not in session.cookies:
            print("⚠️ 警告:未检测到 PHPSESSID登录可能失败后续操作可能会出错。")
        else:
            print(" ✅ 登录成功Cookie 已获取。")
        # --- Step 2: fetch the list ---
        print("\n2. 正在获取订单列表...")
        list_payload['actionId'] = get_current_action_id()
        list_resp = session.post(base_url, data=list_payload, headers=headers)
        try:
            list_data = list_resp.json()
        except json.JSONDecodeError:
            print("❌ 错误:列表接口返回的不是 JSON 数据。")
            print("返回内容预览:", list_resp.text[:200])
            return
        # === Tolerant list parsing ===
        orders = []
        # Strategy A: the response is already a list.
        if isinstance(list_data, list):
            orders = list_data
        # Strategy B: the response is a dict.
        elif isinstance(list_data, dict):
            # 1. Try the common list key names.
            found_key = False
            possible_keys = ['entries', 'rows', 'data', 'records', 'list']
            for key in possible_keys:
                if key in list_data and isinstance(list_data[key], list):
                    orders = list_data[key]
                    print(f" [系统] 自动在键名 '{key}' 下找到数据列表。")
                    found_key = True
                    break
            # 2. Otherwise scan the values for order-shaped dicts.
            if not found_key:
                print(" [系统] 未找到标准键名,正在尝试智能提取字典对象...")
                # Any dict value carrying an id-like key is treated as an order.
                for val in list_data.values():
                    if isinstance(val, dict) and ('crmid' in val or 'salesorderid' in val or 'id' in val):
                        orders.append(val)
        if not orders:
            print("❌ 错误:未能提取到任何订单数据。")
            # Debug aid: dump the raw response so its structure can be inspected.
            with open("debug_list_response.json", "w", encoding="utf-8") as f:
                json.dump(list_data, f, ensure_ascii=False, indent=4)
            return
        print(f" ✅ 成功提取到 {len(orders)} 条有效订单。")
        # --- Step 3: fetch details one by one ---
        print("\n3. 开始逐个获取订单详情...")
        success_count = 0
        for index, order in enumerate(orders):
            # Defensive: make sure the entry is a dict.
            if not isinstance(order, dict):
                continue
            # 1. Resolve the record id (several possible field names).
            record_id = order.get('crmid') or order.get('salesorderid') or order.get('id')
            if not record_id:
                print(f" ⚠️ 第 {index + 1} 条数据没有找到 ID跳过。")
                continue
            print(f"\n [{index + 1}/{len(orders)}] 处理订单 ID: {record_id}")
            # 2. JSON details (product table plugin).
            json_detail = None
            detail_payload = {
                "module": "Plugins",
                "pluginName": "DetailProductTable",
                "action": "getTableData",
                "moduleName": "SalesOrder",
                "record": record_id,
                "actionId": get_current_action_id(),
                "isTool": "1"
            }
            try:
                # Request the JSON details.
                detail_resp = session.post(base_url, data=detail_payload, headers=headers)
                json_detail = detail_resp.json()
                print(f" ✅ JSON详情获取成功")
            except Exception as e:
                print(f" ❌ JSON详情获取失败: {e}")
                json_detail = {"error": str(e)}
            # 3. HTML details, extracting the target XPath.
            print(f" 正在获取HTML详情...")
            html_content = fetch_html_detail(session, record_id, target_xpath)
            # 4. Merge the details into the original order record.
            order['json_details'] = json_detail
            order['html_details'] = html_content
            # 5. A combined field for convenient inspection.
            order['combined_data'] = {
                "crmid": record_id,
                "json_data": json_detail,
                "html_extracted_text": html_content.get("cleaned_text", ""),
                "html_raw": html_content.get("raw_html", "")
            }
            success_count += 1
            print(f" ✅ 订单 {record_id} 处理完成")
            # Polite delay to avoid hammering the server.
            time.sleep(0.5)
        # --- Step 4: save the results ---
        print(f"\n4. 正在保存结果...")
        # Create the output directory.
        output_dir = "crm_data"
        os.makedirs(output_dir, exist_ok=True)
        # Save the full combined dataset.
        full_filename = os.path.join(output_dir, "all_orders_combined.json")
        with open(full_filename, 'w', encoding='utf-8') as f:
            json.dump(orders, f, ensure_ascii=False, indent=4)
        # Also store one file per crmid.
        print(f" 正在按CRM ID分别存储文件...")
        for order in orders:
            record_id = order.get('crmid') or order.get('salesorderid') or order.get('id')
            if record_id:
                # One JSON file per record.
                single_filename = os.path.join(output_dir, f"crm_{record_id}.json")
                with open(single_filename, 'w', encoding='utf-8') as f:
                    json.dump(order, f, ensure_ascii=False, indent=4)
        # Save the extracted text as a plain-text file for easy review.
        text_filename = os.path.join(output_dir, "extracted_texts.txt")
        with open(text_filename, 'w', encoding='utf-8') as f:
            f.write("=== 提取的HTML文本内容 ===\n\n")
            for order in orders:
                record_id = order.get('crmid') or order.get('salesorderid') or order.get('id')
                if record_id:
                    extracted_text = order.get('html_details', {}).get('cleaned_text', '')
                    if extracted_text:
                        f.write(f"\n--- CRM ID: {record_id} ---\n")
                        f.write(f"{extracted_text}\n")
                        f.write("-" * 50 + "\n")
        print(f"\n✅ 全部完成!")
        print(f" 成功处理: {success_count}/{len(orders)} 个订单")
        print(f" 文件保存目录: {os.path.abspath(output_dir)}")
        print(f" 主要文件:")
        print(f" - {full_filename}")
        print(f" - {text_filename}")
        print(f" - 按CRM ID单独存储的 {success_count} 个JSON文件")
    except Exception as e:
        print(f"\n❌ 程序发生未捕获的错误: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()

371
搜索获取数据.py Normal file
View File

@ -0,0 +1,371 @@
import requests
import json
import time
import os
from lxml import html
import re
import urllib.parse
import pandas as pd # ★ 引入pandas用于处理多Sheet Excel
# ================= 1. Configuration =================
base_url = "http://111.198.24.44:88/index.php"
# Login parameters
login_payload = {
    "module": "Users",
    "action": "Authenticate",
    "return_module": "Users",
    "return_action": "Login",
    "user_name": "TEST",  # put the real username here
    "user_password": "****",  # put the real password here
    "login_theme": "newskin"
}
# Global HTTP request headers
http_headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Referer": "http://111.198.24.44:88/index.php?module=SalesOrder&action=index"
}
# ================= 2. 核心辅助函数 =================
def get_current_action_id():
    """Generate the 13-digit millisecond timestamp the CRM expects as 'actionId'."""
    return int(1000 * time.time())
def clean_text_structure(element):
    """Deep-clean an lxml element into newline-separated plain text.

    Scripts/styles/noscript are dropped, <br> becomes a newline,
    non-breaking spaces become regular spaces and blank lines are removed.
    Operates on a deep copy so the caller's tree is untouched.
    """
    if element is None:
        return ""
    import copy
    el = copy.deepcopy(element)
    for bad_tag in el.xpath('.//script | .//style | .//noscript'):
        bad_tag.drop_tree()
    # Preserve line breaks by pushing a newline into each <br>'s tail text.
    for br in el.xpath('.//br'):
        br.tail = "\n" + (br.tail if br.tail else "")
    text_content = el.text_content()
    lines = []
    for line in text_content.splitlines():
        clean_line = line.replace('\xa0', ' ').strip()
        if clean_line:
            lines.append(clean_line)
    return "\n".join(lines)
def extract_html_content(html_content, xpath):
    """Return {"raw_html", "cleaned_text"} for the first node matching `xpath` (empty strings when absent or on error)."""
    try:
        tree = html.fromstring(html_content)
        elements = tree.xpath(xpath)
        if elements:
            target_element = elements[0]
            raw_html = html.tostring(target_element, encoding='unicode', pretty_print=True)
            cleaned_text = clean_text_structure(target_element)
            return {"raw_html": raw_html, "cleaned_text": cleaned_text}
        else:
            return {"raw_html": "", "cleaned_text": ""}
    except Exception as e:
        print(f" ❌ HTML解析错误: {e}")
        return {"raw_html": "", "cleaned_text": ""}
def fetch_html_detail(session, record_id, xpath):
    """Fetch the record's DetailView page and extract `xpath`; empty results on any failure."""
    try:
        url = f"http://111.198.24.44:88/index.php?module=SalesOrder&action=DetailView&record={record_id}"
        resp = session.get(url, headers=http_headers)
        if resp.status_code == 200:
            return extract_html_content(resp.content, xpath)
        return {"raw_html": "", "cleaned_text": ""}
    except Exception:
        return {"raw_html": "", "cleaned_text": ""}
def extract_crmid_from_search_result(html_content):
    """Collect unique SalesOrder record ids from a unified-search result page.

    Scans anchor onclick handlers containing "record=<digits>", first inside
    the SalesOrder collapse section, then anywhere on the page as a fallback.
    Returns ids in first-seen order; [] on parse errors.
    """
    crmids = []
    try:
        tree = html.fromstring(html_content)
        links = tree.xpath('//div[@id="collapse-SalesOrder"]//a[contains(@onclick, "record=")]')
        if not links:
            # Fallback: any SalesOrder link on the page.
            links = tree.xpath('//a[contains(@onclick, "module=SalesOrder") and contains(@onclick, "record=")]')
        for link in links:
            onclick = link.get('onclick', '')
            match = re.search(r"record=(\d+)", onclick)
            if match:
                crmid = match.group(1)
                # De-duplicate while preserving order.
                if crmid not in crmids:
                    crmids.append(crmid)
        return crmids
    except Exception:
        return []
def perform_search(session, query_string):
    """Run the CRM unified search for an already URL-encoded query; return matching SalesOrder ids ([] on failure)."""
    try:
        search_url = f"http://111.198.24.44:88/index.php?module=Home&action=UnifiedSearch&selectedmodule=undefined&query_string={query_string}"
        resp = session.get(search_url, headers=http_headers)
        if resp.status_code == 200:
            return extract_crmid_from_search_result(resp.content)
        return []
    except Exception:
        return []
# ================= 3. 核心解析逻辑 =================
def parse_order_text(text):
    """Parse the cleaned detail-page text of one sales order into a flat field dict.

    The text is expected to consist of "label\\nvalue" line pairs as produced
    by the HTML cleaning step.  Returns {} for empty input.  Besides the
    mapped fields, the result may carry a private "_temp_second_code" entry
    holding the second token of the contract-number line; the caller assigns
    it to 内贸合同号 or 外贸合同号 depending on the N/W prefix.
    """
    if not text:
        return {}
    # Field pool covering both domestic and foreign contracts.
    data = {
        "合同编号": "", "内贸合同号": "", "外贸合同号": "",
        "签署公司": "", "收款情况": "", "签订日期": "", "销售员": "",
        "最终用户单位": "", "最终用户信息联系人": "", "最终用户信息电话": "", "最终用户信息邮箱": "",
        "最终用户所在地": "",
        "买方单位": "", "买方信息联系人": "", "买方信息电话": "", "买方信息邮箱": "",
        "厂家型号": "", "合同标的": "", "数量": "", "单位": "台/套",
        "折扣率(%)": "", "合同额": "", "合同总额": "",
        "外购付款方式": "", "最晚发货期": "",
        "已收款": "", "未收款": "", "收款日期": ""
    }
    lines = [line.strip() for line in text.split('\n') if line.strip()]
    # Label text on the page -> key in the data dict.
    key_map = {
        "收款账户": "签署公司",
        "收款状态": "收款情况",
        "签约日期": "签订日期",
        "负责人": "销售员",
        "客户名称": "最终用户单位",
        "联系人姓名": "最终用户信息联系人",
        "合同总额": "合同总额",
        "最新收款日期": "收款日期",
        "最晚发货期": "最晚发货期",
        "付款比例及期限": "外购付款方式",
        "地址": "最终用户所在地"
    }
    for i, line in enumerate(lines):
        # 1.0 Contract-number line: "<primary no> [<secondary no>]".
        if line == "合同订单编号":
            if i + 1 < len(lines):
                full_val = lines[i + 1].strip()
                parts = full_val.split()
                if len(parts) >= 1:
                    data["合同编号"] = parts[0]
                # The second token is either the domestic or the foreign
                # contract number; stash it and let the caller decide by the
                # primary number's leading W/N.
                if len(parts) >= 2:
                    data["_temp_second_code"] = parts[1]
        # 1.1 Regular label/value pairs (first occurrence wins).
        elif line in key_map:
            if i + 1 < len(lines):
                target_key = key_map[line]
                if not data[target_key]:
                    data[target_key] = lines[i + 1]
        # 1.2 Product row, assumed "subject/model/qty/unit price/total".
        elif "合同标的" in line and "品名/型号" in line:
            if i + 1 < len(lines):
                parts = lines[i + 1].split('/')
                if len(parts) >= 1: data["合同标的"] = parts[0]
                if len(parts) >= 2: data["厂家型号"] = parts[1]
                if len(parts) >= 3: data["数量"] = parts[2]
                if len(parts) >= 5: data["合同额"] = parts[4]
    # 2. Buyer info via regex over the full text.
    buyer_match = re.search(r"(?:买方|The Buyer)[:]\s*(.*?)(?:\n|$)", text)
    if buyer_match and len(buyer_match.group(1)) > 1:
        data["买方单位"] = buyer_match.group(1).strip()
    buyer_contact = re.search(r"联系人Contact person[:]\s*(.*?)(?:\n|$)", text)
    if buyer_contact:
        data["买方信息联系人"] = buyer_contact.group(1).strip()
    buyer_tel = re.search(r"电话\(Tel\)[:]\s*(.*?)(?:\s+|$|传真)", text)
    if buyer_tel:
        data["买方信息电话"] = buyer_tel.group(1).strip()
    # 3. Derive paid / outstanding amounts from the payment status.
    # BUG FIX: the original used `elif "" in status:` which is always true for
    # any string (empty-string membership), i.e. it was a disguised `else`.
    # Written as an explicit `else` now -- behaviorally identical, but no
    # longer misleading. (Possibly "未收" was intended; confirm with the author.)
    try:
        total = float(data["合同总额"]) if data["合同总额"] else 0
        status = data["收款情况"]
        if "已收" in status:
            data["已收款"] = str(total)
            data["未收款"] = "0"
        else:
            data["已收款"] = "0"
            data["未收款"] = str(total)
    except (ValueError, TypeError):
        # Non-numeric totals (e.g. "1,000.00") leave both fields untouched.
        pass
    return data
# ================= 4. 主程序逻辑 =================
def main():
session = requests.Session()
target_xpath = "/html/body/div[1]/div/div[2]/div[2]/form/div[1]/div[1]/div[2]"
try:
# --- 1. 登录 ---
print("1. 正在登录...")
session.post(base_url, data=login_payload, headers=http_headers)
if 'PHPSESSID' in session.cookies:
print(" ✅ 登录成功")
else:
print(" ⚠️ 警告: 未检测到Cookie可能登录失败")
# --- 2. 搜索 ---
print("\n2. 请输入搜索内容:")
query_input = input(" 搜索关键词: ").strip()
if not query_input: return
encoded_query = urllib.parse.quote(query_input)
print(f"\n3. 执行搜索...")
crmids = perform_search(session, encoded_query)
if not crmids:
print(" ❌ 未找到相关订单。")
return
print(f" ✅ 找到 {len(crmids)} 个订单 ID: {crmids}")
# --- 3. 抓取与分类 ---
print(f"\n4. 开始获取详情并分类处理...")
# 定义三个列表用于存储不同类型的数据
list_domestic = [] # 内贸 (N开头)
list_foreign = [] # 外贸 (W开头)
list_other = [] # 其他
valid_count = 0
for i, crmid in enumerate(crmids):
print(f" [{i + 1}/{len(crmids)}] 处理 ID: {crmid}")
html_data = fetch_html_detail(session, crmid, target_xpath)
clean_text = html_data['cleaned_text']
# 解析
data = parse_order_text(clean_text)
contract_no = data.get("合同编号", "").strip().upper() # 转大写处理
# ★ 过滤空数据
if not contract_no:
print(f" ⚠️ 跳过: 未找到合同编号")
continue
# ★ 核心分类逻辑
second_code = data.pop("_temp_second_code", "") # 取出临时存的第二段编号
if contract_no.startswith('W'):
# 外贸
data['外贸合同号'] = second_code
list_foreign.append(data)
print(f" 🌍 归类: [外贸] {contract_no}")
elif contract_no.startswith('N'):
# 内贸
data['内贸合同号'] = second_code
list_domestic.append(data)
print(f" 🏠 归类: [内贸] {contract_no}")
else:
# 其他
data['内贸合同号'] = second_code # 默认存这里
list_other.append(data)
print(f" ❓ 归类: [其他] {contract_no}")
valid_count += 1
time.sleep(0.5)
# --- 4. 导出 Excel (多Sheet) ---
print(f"\n5. 正在导出 Excel 文件...")
if valid_count == 0:
print(" ❌ 无有效数据导出")
return
timestamp = time.strftime("%Y%m%d_%H%M%S")
output_dir = f"Result_{timestamp}"
os.makedirs(output_dir, exist_ok=True)
xlsx_filename = os.path.join(output_dir, f"Export_{query_input}_{timestamp}.xlsx")
# 定义列顺序 (表头)
# 内贸表头
cols_domestic = [
"合同编号", "签署公司", "内贸合同号", "收款情况", "签订日期", "销售员",
"最终用户单位", "最终用户信息联系人", "最终用户信息电话", "最终用户信息邮箱", "最终用户所在地",
"买方单位", "买方信息联系人", "买方信息电话", "买方信息邮箱",
"厂家型号", "合同标的", "数量", "单位", "折扣率(%)", "合同额", "合同总额",
"外购付款方式", "最晚发货期", "已收款", "未收款", "收款日期"
]
# 外贸表头 (参考内贸稍作调整)
cols_foreign = [
"合同编号", "签署公司", "外贸合同号", "收款情况", "签订日期", "销售员",
"最终用户单位", "最终用户信息联系人", "最终用户信息电话", "最终用户信息邮箱", "最终用户所在地",
"买方单位", "买方信息联系人", "买方信息电话", "买方信息邮箱",
"厂家型号", "合同标的", "数量", "单位", "折扣率(%)", "合同额", "合同总额",
"外购付款方式", "最晚发货期", "已收款", "未收款", "收款日期"
]
# 使用 Pandas ExcelWriter 写入多个 Sheet
try:
with pd.ExcelWriter(xlsx_filename, engine='openpyxl') as writer:
# 1. 写入内贸 Sheet
if list_domestic:
df_domestic = pd.DataFrame(list_domestic)
# 按照指定列顺序排列,如果数据里没有该列会自动填空
df_domestic = df_domestic.reindex(columns=cols_domestic)
df_domestic.to_excel(writer, sheet_name='内贸', index=False)
# 2. 写入外贸 Sheet
if list_foreign:
df_foreign = pd.DataFrame(list_foreign)
df_foreign = df_foreign.reindex(columns=cols_foreign)
df_foreign.to_excel(writer, sheet_name='外贸', index=False)
# 3. 写入其他 Sheet
if list_other:
df_other = pd.DataFrame(list_other)
# 其他表也暂用内贸的表头格式
df_other = df_other.reindex(columns=cols_domestic)
df_other.to_excel(writer, sheet_name='其他', index=False)
print(f" ✅ 成功导出多Sheet表格: {os.path.abspath(xlsx_filename)}")
print(f" - 内贸: {len(list_domestic)}")
print(f" - 外贸: {len(list_foreign)}")
print(f" - 其他: {len(list_other)}")
except ImportError:
print(" ❌ 错误: 缺少 pandas 或 openpyxl 库。")
print(" 请在终端运行: pip install pandas openpyxl")
except Exception as e:
print(f" ❌ 写入 Excel 失败: {e}")
except Exception as e:
print(f"\n❌ 程序发生错误: {e}")
import traceback
traceback.print_exc()
# Script entry point.
if __name__ == "__main__":
    main()

469
添加日期选择.py Normal file
View File

@ -0,0 +1,469 @@
import requests
import json
import time
import os
from lxml import html
import re
import urllib.parse
import pandas as pd
from collections import defaultdict, Counter
from datetime import datetime
# ================= 1. Configuration (kept as-is) =================
# CRM dispatcher URL; every request in this script goes through it.
base_url = "http://111.198.24.44:88/index.php"
# Form payload POSTed to the CRM login action.
login_payload = {
    "module": "Users",
    "action": "Authenticate",
    "return_module": "Users",
    "return_action": "Login",
    "user_name": "TEST",  # ★★★ fill in the real user name
    "user_password": "test",  # ★★★ fill in the real password
    "login_theme": "newskin"
}
# Browser-like headers; X-Requested-With marks requests as Ajax for the CRM.
http_headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Referer": "http://111.198.24.44:88/index.php?module=SalesOrder&action=index",
    "X-Requested-With": "XMLHttpRequest",
    "Accept": "application/json, text/javascript, */*; q=0.01"
}
# XPath of the detail-page container holding the order's full text.
target_xpath = "/html/body/div[1]/div/div[2]/div[2]/form/div[1]/div[1]/div[2]"
# ================= 2. 核心辅助函数 =================
def get_current_action_id():
    """Millisecond-resolution timestamp used as the CRM Ajax ``actionId``."""
    now_ms = time.time() * 1000
    return int(now_ms)
def clean_text_structure(element):
    """Deep-clean an lxml element into readable plain text.

    Works on a deep copy so the caller's tree is untouched: drops
    <script>/<style>/<noscript> nodes, turns <br> into explicit newlines,
    then collapses the remaining text into non-empty stripped lines.
    Returns "" for a None element.
    """
    if element is None: return ""
    import copy
    el = copy.deepcopy(element)
    # Remove nodes whose content is never human-readable.
    for bad_tag in el.xpath('.//script | .//style | .//noscript'): bad_tag.drop_tree()
    # <br> carries layout meaning: preserve it as a line break in the output.
    for br in el.xpath('.//br'): br.tail = "\n" + (br.tail if br.tail else "")
    text_content = el.text_content()
    # \xa0 is &nbsp;; drop lines that are blank after stripping.
    lines = [line.replace('\xa0', ' ').strip() for line in text_content.splitlines() if
             line.replace('\xa0', ' ').strip()]
    return "\n".join(lines)
def extract_html_content(html_content, xpath):
    """Parse *html_content* and return the cleaned text of the first node
    matching *xpath*; "" when nothing matches or parsing fails."""
    try:
        matches = html.fromstring(html_content).xpath(xpath)
        return clean_text_structure(matches[0]) if matches else ""
    except Exception:
        return ""
def fetch_html_detail(session, record_id):
    """Fetch a SalesOrder detail page and return its cleaned text.

    GETs the DetailView page for *record_id* using the shared *session*,
    then extracts the order block via the module-level ``target_xpath``.
    Returns "" on non-200 responses or any request/parse error.
    """
    try:
        url = f"http://111.198.24.44:88/index.php?module=SalesOrder&action=DetailView&record={record_id}"
        resp = session.get(url, headers=http_headers, timeout=10)
        if resp.status_code == 200:
            return extract_html_content(resp.content, target_xpath)
        return ""
    except Exception as e:
        print(f" ❌ 获取详情失败 ID {record_id}: {e}")
        return ""
# ================= 3. 辅助:从详情文本中提取时间 =================
def extract_time_from_text(text):
    """Find timestamps like ``2026-01-15 17:19:16`` in detail-page text.

    Strategy: collect every timestamp matching the pattern and return the
    latest one (on these pages the newest time is usually the record's
    modified time). Falls back to bare ``YYYY-MM-DD`` dates (interpreted as
    midnight, losing precision) when no full timestamp is present.

    Fixes vs. the previous version: the date-only fallback now also takes
    the *latest* date (it used to return the first match, contradicting the
    stated strategy), and impossible calendar dates the regex admits
    (e.g. month 13) are skipped instead of raising ValueError.

    Returns a ``datetime`` or ``None`` when nothing parseable is found.
    """
    if not text:
        return None
    # Full timestamps take precedence over bare dates.
    candidates = [
        (raw, "%Y-%m-%d %H:%M:%S")
        for raw in re.findall(r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})", text)
    ]
    if not candidates:
        candidates = [
            (raw, "%Y-%m-%d")
            for raw in re.findall(r"(\d{4}-\d{2}-\d{2})", text)
        ]
    dt_objects = []
    for raw, fmt in candidates:
        try:
            dt_objects.append(datetime.strptime(raw, fmt))
        except ValueError:
            # The regex allows impossible dates; ignore them.
            continue
    # The newest time on the page is assumed to be the modified/created time.
    return max(dt_objects) if dt_objects else None
# ================= 4. 核心逻辑:范围爬取 (JSON列表 -> HTML详情 -> 判读时间) =================
def perform_date_range_crawl(session, start_date_str, end_date_str):
    """Crawl orders whose detail-page timestamp falls inside a date range.

    Designed for a CRM where the timestamp only appears on the HTML detail
    page, not in the JSON list:
      1. request the list sorted server-side by ``modifiedtime`` DESC;
      2. open every row's detail page and pull its text;
      3. extract the timestamp from that text;
      4. circuit-break once a record older than the range appears — with
         descending order every later record is older still.

    Returns a list of parsed record dicts (see ``parse_order_text``).
    """
    final_data_list = []  # parsed results collected here, avoiding a second pass
    page_size = 50  # modest page size: every row costs an extra detail request
    page = 1
    last_page_ids = []  # previous page's ids, used to detect a repeating last page
    try:
        target_start = datetime.strptime(start_date_str, "%Y-%m-%d")
        # Make the range inclusive of the entire end day.
        target_end = datetime.strptime(end_date_str, "%Y-%m-%d").replace(hour=23, minute=59, second=59)
    except ValueError:
        print(" ❌ 日期格式错误")
        return []
    print(f" 📅 目标区间: {target_start}{target_end}")
    print(" 📡 正在执行 [列表->详情->时间判断] 策略...")
    stop_flag = False
    while not stop_flag:
        action_id = get_current_action_id()
        # Still request server-side DESC sort by modifiedtime; the list view
        # does not display it, but we rely on the server honouring the order.
        current_url = (
            f"{base_url}?module=SalesOrder&action=SalesOrderAjax&file=ListViewData&"
            f"sorder=DESC&order_by=modifiedtime&"  # key: blind sort parameters
            f"start={page}&pagesize={page_size}&actionId={action_id}&isFilter=true&"
            f"search%5Bviewscope%5D=all_to_me&search%5Bviewname%5D=476"
        )
        try:
            resp = session.get(current_url, headers=http_headers)
            try:
                data = resp.json()
            except:
                print(f" ❌ 第 {page} 页 JSON 解析失败")
                break
            # Row list field name varies by endpoint version.
            entries = data.get('data', [])
            if not entries and 'entries' in data: entries = data['entries']
            if not entries:
                print(" 🏁 列表数据为空,停止。")
                break
            # Collect this page's record ids.
            current_page_ids = []
            for item in entries:
                if isinstance(item, dict):
                    cid = item.get('crmid') or item.get('id')
                    if cid: current_page_ids.append(cid)
            # Endless-loop guard: the server replays the last page forever.
            if page > 1 and current_page_ids == last_page_ids:
                print(" 🛑 页面ID重复判定为最后一页停止。")
                break
            last_page_ids = current_page_ids
            print(f" 🔎 第 {page} 页: 预加载 {len(current_page_ids)} 条ID正在逐条进入详情页检查时间...")
            # === Visit each record's detail page ===
            page_valid_count = 0
            for cid in current_page_ids:
                # 1. Detail-page text.
                text = fetch_html_detail(session, cid)
                # 2. Timestamp hidden inside that text.
                record_time = extract_time_from_text(text)
                # 3. Range decision.
                if record_time:
                    time_str = record_time.strftime("%Y-%m-%d %H:%M:%S")
                    if record_time > target_end:
                        # Too new: skip this row, keep scanning.
                        continue
                    elif record_time < target_start:
                        # Too old: with DESC order everything after is older,
                        # so trip the circuit breaker.
                        print(f" 🛑 发现 ID {cid} 时间 {time_str} 早于起始日期,触发熔断停止!")
                        stop_flag = True
                        break  # leave the for loop
                    else:
                        # Hit: inside the window.
                        print(f" ✅ 命中: ID {cid} 时间 {time_str}")
                        # Parse now so the page is not fetched a second time.
                        parsed = parse_order_text(text)
                        parsed["系统ID"] = cid
                        # Route the secondary code: W* -> foreign, else domestic.
                        c_no = parsed.get("合同编号", "").strip().upper()
                        sc = parsed.pop("_temp_second_code", "")
                        if c_no.startswith('W'):
                            parsed["外贸合同号"] = sc
                        else:
                            parsed["内贸合同号"] = sc
                        final_data_list.append(parsed)
                        page_valid_count += 1
                else:
                    # No timestamp found (unexpected format or empty page):
                    # skipped silently. NOTE(review): such records are dropped
                    # entirely — confirm this is the intended policy.
                    pass
            print(f" 📊 第 {page} 页处理完毕。有效入库: {page_valid_count}")
            page += 1
            if stop_flag: break
            # fetch_html_detail already spends time per record; no extra sleep.
        except Exception as e:
            print(f" ❌ 异常: {e}")
            break
    return final_data_list
# ================= 5. 其他函数 (保持不变) =================
# 注意parse_order_text, check_and_print_conflicts, export_excel_files
# 这些函数完全不用动,直接用之前的即可。
# ... (为节省篇幅,此处省略,请确保它们存在于你的代码中) ...
def parse_order_text(text):
    """Parse cleaned detail-page text into a flat order dict.

    Mostly line-oriented: fields appear as a label line followed by a value
    line (mapped via ``key_map``); buyer info is pulled with regexes from the
    raw text. Also derives paid/unpaid amounts from the payment status and
    flags records whose manufacturer contains "ASD". Returns {} for empty
    input.
    """
    if not text: return {}
    # All output columns, pre-initialised so reindexing later never KeyErrors.
    data = {
        "合同编号": "", "内贸合同号": "", "外贸合同号": "",
        "签署公司": "", "收款情况": "", "签订日期": "", "销售员": "",
        "最终用户单位": "", "最终用户信息联系人": "", "最终用户信息电话": "", "最终用户信息邮箱": "",
        "最终用户所在地": "",
        "买方单位": "", "买方信息联系人": "", "买方信息电话": "", "买方信息邮箱": "",
        "厂家": "", "厂家型号": "", "合同标的": "", "数量": "", "单位": "台/套",
        "折扣率(%)": "", "合同额": "", "合同总额": "",
        "外购付款方式": "", "最晚发货期": "", "已收款": "", "未收款": "", "收款日期": "",
        "IS_ASD": False, "_temp_second_code": ""
    }
    lines = [line.strip() for line in text.split('\n') if line.strip()]
    # CRM label -> output column for simple "label then value" pairs.
    key_map = {
        "收款账户": "签署公司", "收款状态": "收款情况", "签约日期": "签订日期",
        "负责人": "销售员", "客户名称": "最终用户单位", "联系人姓名": "最终用户信息联系人",
        "合同总额": "合同总额", "最新收款日期": "收款日期", "最晚发货期": "最晚发货期",
        "付款比例及期限": "外购付款方式", "地址": "最终用户所在地", "厂家": "厂家"
    }
    for i, line in enumerate(lines):
        if line == "合同订单编号" and i + 1 < len(lines):
            # Value line holds "<contract no> [<secondary no>]".
            parts = lines[i + 1].strip().split()
            if len(parts) >= 1: data["合同编号"] = parts[0]
            if len(parts) >= 2: data["_temp_second_code"] = parts[1]
        elif line in key_map and i + 1 < len(lines):
            target = key_map[line]
            if not data[target]: data[target] = lines[i + 1]  # first value wins
        elif "合同标的" in line and "品名/型号" in line and i + 1 < len(lines):
            # Slash-separated value line: subject/model/qty/.../amount.
            parts = lines[i + 1].split('/')
            if len(parts) >= 1: data["合同标的"] = parts[0]
            if len(parts) >= 2: data["厂家型号"] = parts[1]
            if len(parts) >= 3: data["数量"] = parts[2]
            if len(parts) >= 5: data["合同额"] = parts[4]
    # Buyer details live in free text; regex them out of the raw block.
    buyer_match = re.search(r"(?:买方|The Buyer)[:]\s*(.*?)(?:\n|$)", text)
    if buyer_match and len(buyer_match.group(1)) > 1: data["买方单位"] = buyer_match.group(1).strip()
    buyer_ct = re.search(r"联系人Contact person[:]\s*(.*?)(?:\n|$)", text)
    if buyer_ct: data["买方信息联系人"] = buyer_ct.group(1).strip()
    buyer_tel = re.search(r"电话\(Tel\)[:]\s*(.*?)(?:\s+|$|传真)", text)
    if buyer_tel: data["买方信息电话"] = buyer_tel.group(1).strip()
    try:
        # Derive paid/unpaid amounts from the total plus the status string.
        total = float(data["合同总额"]) if data["合同总额"] else 0
        if "已收" in data["收款情况"]:
            data["已收款"] = str(total);
            data["未收款"] = "0"
        elif "" in data["收款情况"]:
            # NOTE(review): `"" in s` is always True, so this branch fires for
            # every non-"已收" status — a character (e.g. "未") may have been
            # lost here; confirm the intended substring.
            data["已收款"] = "0";
            data["未收款"] = str(total)
    except:
        pass
    # Manufacturer containing "ASD" routes the record to the ASD workbook.
    factory_val = data.get("厂家", "")
    if factory_val and "ASD" in factory_val.upper():
        data["IS_ASD"] = True
    else:
        data["IS_ASD"] = False
    return data
def check_and_print_conflicts(all_records):
    """Data-conflict report hook; intentionally a no-op in this variant."""
    # (unchanged, body omitted in the original source)
    pass
def export_excel_files(all_records, output_dir, file_prefix):
    """Split records into ASD / non-ASD workbooks with 内贸/外贸/其他 sheets.

    Classification: the IS_ASD flag picks the workbook; the contract-number
    prefix ('N' domestic, 'W' foreign, anything else other) picks the sheet.
    Files are written as ``<type>_产品表_<prefix>.xlsx`` under *output_dir*.
    """
    # Shared column order; domestic/foreign variants insert their own
    # secondary contract-number column after the first two columns.
    cols_common = [
        "合同编号", "签署公司", "收款情况", "签订日期", "销售员", "厂家",
        "最终用户单位", "最终用户信息联系人", "最终用户信息电话", "最终用户信息邮箱", "最终用户所在地",
        "买方单位", "买方信息联系人", "买方信息电话", "买方信息邮箱",
        "厂家型号", "合同标的", "数量", "单位", "折扣率(%)", "合同额", "合同总额",
        "外购付款方式", "最晚发货期", "已收款", "未收款", "收款日期"
    ]
    cols_domestic = cols_common[:2] + ["内贸合同号"] + cols_common[2:]
    cols_foreign = cols_common[:2] + ["外贸合同号"] + cols_common[2:]
    datasets = {"ASD": {"Domestic": [], "Foreign": [], "Other": []},
                "Non_ASD": {"Domestic": [], "Foreign": [], "Other": []}}
    for record in all_records:
        main_key = "ASD" if record["IS_ASD"] else "Non_ASD"
        c_no = record.get("合同编号", "").strip().upper()
        if c_no.startswith('N'):
            datasets[main_key]["Domestic"].append(record)
        elif c_no.startswith('W'):
            datasets[main_key]["Foreign"].append(record)
        else:
            datasets[main_key]["Other"].append(record)
    for type_name in ["ASD", "Non_ASD"]:
        filename = f"{type_name}_产品表_{file_prefix}.xlsx"
        filepath = os.path.join(output_dir, filename)
        subset = datasets[type_name]
        df_dom = pd.DataFrame(subset["Domestic"])
        df_for = pd.DataFrame(subset["Foreign"])
        df_oth = pd.DataFrame(subset["Other"])
        # Sort by contract number where the column exists.
        if not df_dom.empty and "合同编号" in df_dom.columns: df_dom.sort_values(by="合同编号", ascending=True,
                                                                             inplace=True)
        if not df_for.empty and "合同编号" in df_for.columns: df_for.sort_values(by="合同编号", ascending=True,
                                                                             inplace=True)
        has_data = False
        try:
            with pd.ExcelWriter(filepath, engine='openpyxl') as writer:
                if not df_dom.empty:
                    df_dom.reindex(columns=cols_domestic).to_excel(writer, sheet_name='内贸', index=False);
                    has_data = True
                if not df_for.empty:
                    df_for.reindex(columns=cols_foreign).to_excel(writer, sheet_name='外贸', index=False);
                    has_data = True
                if not df_oth.empty:
                    # The "other" sheet reuses the domestic column layout.
                    df_oth.reindex(columns=cols_domestic).to_excel(writer, sheet_name='其他', index=False);
                    has_data = True
            # NOTE(review): the two messages below print the literal
            # "(unknown)" instead of the generated path — presumably an
            # f-string placeholder (e.g. {filepath}) was lost; confirm and
            # restore it.
            if has_data: print(f" 💾 已生成: (unknown)")
        except Exception as e:
            print(f" ❌ 写入 (unknown) 失败: {e}")
# ================= 6. 主程序 =================
def main():
    """Interactive entry point: login, pick a mode, crawl, export to Excel."""
    session = requests.Session()
    print("================ CRM 爬取助手 (深度时间过滤版) ================")
    # --- Login flow: prime cookies on the home page, then authenticate. ---
    print("1. 正在自动登录...")
    session.get(base_url, headers=http_headers)
    session.post(base_url, data=login_payload, headers=http_headers)
    if 'PHPSESSID' not in session.cookies:
        print(" ❌ 登录失败")
        return
    print(" ✅ 登录成功")
    print("\n请选择运行模式:")
    print(" [1] 搜索模式")
    print(" [2] 全量爬取 (慢)")
    print(" [3] 范围爬取 (推荐! 自动进入详情页检查时间)")
    mode = input("请输入 (1/2/3): ").strip()
    final_data = []  # final parsed records
    file_tag = ""
    if mode == '1':
        # Search mode (slightly restructured to reuse the shared parsing).
        query = input("\n请输入搜索关键词: ").strip()
        if query:
            crmids = perform_search(session, urllib.parse.quote(query))  # perform_search must be defined in this file
            # Search mode is simple: fetch each hit's detail page directly.
            for cid in crmids:
                text = fetch_html_detail(session, cid)
                parsed = parse_order_text(text)
                if parsed.get("合同编号"):
                    parsed["系统ID"] = cid
                    c_no = parsed["合同编号"].upper()
                    sc = parsed.pop("_temp_second_code", "")
                    if c_no.startswith('W'):
                        parsed["外贸合同号"] = sc
                    else:
                        parsed["内贸合同号"] = sc
                    final_data.append(parsed)
            file_tag = f"搜索_{query}"
    elif mode == '2':
        # Full-crawl mode: not wired up in this variant — call the companion
        # script's perform_full_crawl and loop the details, or reuse mode 3.
        pass
    elif mode == '3':
        print("\n 📅 请输入时间范围 (格式: YYYY-MM-DD)")
        s_date = input(" 开始日期 (如 2025-12-01): ").strip()
        e_date = input(" 结束日期 (如 2026-01-15): ").strip()
        if s_date and e_date:
            # ★ Returns already-parsed records; no second fetch pass needed.
            final_data = perform_date_range_crawl(session, s_date, e_date)
            file_tag = f"范围_{s_date}_{e_date}"
    if not final_data:
        print(" ❌ 未获取到数据或已退出")
        return
    print(f"\n ✅ 抓取完成。有效记录: {len(final_data)}")
    # Export.
    # check_and_print_conflicts(final_data)  # enable for the conflict report
    ts = time.strftime("%Y%m%d_%H%M%S")
    out_dir = f"Result_{ts}"
    os.makedirs(out_dir, exist_ok=True)
    export_excel_files(final_data, out_dir, f"{file_tag}_{ts}")
    print(f"\n🎉 结果已保存: {out_dir}")
# 需要把之前定义的 perform_search 和 perform_full_crawl 补全在文件里才能运行模式1和2
# 如果只跑模式3上面的代码已经足够
def perform_search(session, query_string):
    """Run the CRM unified search and return de-duplicated SalesOrder ids.

    Ids are scraped from ``record=<id>`` fragments inside the result links'
    onclick handlers, preferring the SalesOrder result panel and falling
    back to a page-wide link scan. Returns [] on HTTP or parse failure.
    """
    search_url = (
        "http://111.198.24.44:88/index.php?module=Home&action=UnifiedSearch"
        f"&selectedmodule=undefined&query_string={query_string}"
    )
    try:
        resp = session.get(search_url, headers=http_headers)
        if resp.status_code != 200:
            return []
        tree = html.fromstring(resp.content)
        anchors = tree.xpath('//div[@id="collapse-SalesOrder"]//a[contains(@onclick, "record=")]')
        if not anchors:
            anchors = tree.xpath('//a[contains(@onclick, "module=SalesOrder") and contains(@onclick, "record=")]')
        found = []
        for anchor in anchors:
            hit = re.search(r"record=(\d+)", anchor.get('onclick', ''))
            if hit and hit.group(1) not in found:
                found.append(hit.group(1))  # preserve order, drop duplicates
        return found
    except:
        return []
# Script entry point.
if __name__ == "__main__":
    main()

View File

@ -0,0 +1,465 @@
import requests
import json
import time
import os
from lxml import html
import re
import urllib.parse
import pandas as pd
import math
from collections import defaultdict, Counter
# ================= 1. Configuration =================
# CRM dispatcher URL; every request in this script goes through it.
base_url = "http://111.198.24.44:88/index.php"
# Login parameters POSTed to the CRM authenticate action.
login_payload = {
    "module": "Users",
    "action": "Authenticate",
    "return_module": "Users",
    "return_action": "Login",
    "user_name": "TEST",  # ★★★ fill in the real user name
    "user_password": "***",  # ★★★ fill in the real password
    "login_theme": "newskin"
}
# Request headers (includes the Ajax marker).
http_headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Referer": "http://111.198.24.44:88/index.php?module=SalesOrder&action=index",
    "X-Requested-With": "XMLHttpRequest",  # key: tells the CRM this is an Ajax call
    "Accept": "application/json, text/javascript, */*; q=0.01"
}
# Detail-page XPath of the container holding the order's full text.
target_xpath = "/html/body/div[1]/div/div[2]/div[2]/form/div[1]/div[1]/div[2]"
# ================= 2. 核心辅助函数 =================
def get_current_action_id():
    # The CRM Ajax endpoints want a fresh millisecond-resolution id per call.
    return int(1000 * time.time())
def clean_text_structure(element):
    """Deep-clean an lxml element into readable plain text.

    Operates on a deep copy so the caller's tree is untouched: removes
    <script>/<style>/<noscript> nodes, converts <br> into newlines, then
    collapses the remaining text into non-empty stripped lines.
    Returns "" for a None element.
    """
    if element is None: return ""
    import copy
    el = copy.deepcopy(element)
    # Strip nodes whose content is never human-readable.
    for bad_tag in el.xpath('.//script | .//style | .//noscript'): bad_tag.drop_tree()
    # Keep <br> line breaks explicit in the extracted text.
    for br in el.xpath('.//br'): br.tail = "\n" + (br.tail if br.tail else "")
    text_content = el.text_content()
    # \xa0 is &nbsp;; blank lines are dropped after stripping.
    lines = [line.replace('\xa0', ' ').strip() for line in text_content.splitlines() if
             line.replace('\xa0', ' ').strip()]
    return "\n".join(lines)
def extract_html_content(html_content, xpath):
    """Return the cleaned text of the first *xpath* match in *html_content*,
    or "" when nothing matches or the document cannot be parsed."""
    try:
        tree = html.fromstring(html_content)
        nodes = tree.xpath(xpath)
        if not nodes:
            return ""
        return clean_text_structure(nodes[0])
    except Exception:
        return ""
def fetch_html_detail(session, record_id):
    """Fetch a SalesOrder detail page and return its cleaned text.

    GETs the DetailView page for *record_id* via the shared *session* and
    extracts the order block using the module-level ``target_xpath``.
    Returns "" on non-200 responses or any request/parse error.
    """
    try:
        url = f"http://111.198.24.44:88/index.php?module=SalesOrder&action=DetailView&record={record_id}"
        resp = session.get(url, headers=http_headers, timeout=10)
        if resp.status_code == 200:
            return extract_html_content(resp.content, target_xpath)
        return ""
    except Exception as e:
        print(f" ❌ 获取详情失败 ID {record_id}: {e}")
        return ""
# ================= 3. ID 获取逻辑 (搜索 vs 全量) =================
def perform_search(session, query_string):
    """Mode 1: unified search — return SalesOrder record ids for *query_string*.

    Scrapes ``record=<id>`` from result-link onclick handlers, preferring
    the SalesOrder result panel, with a page-wide fallback. Returns [] on
    any HTTP or parsing error.
    """
    try:
        search_url = f"http://111.198.24.44:88/index.php?module=Home&action=UnifiedSearch&selectedmodule=undefined&query_string={query_string}"
        resp = session.get(search_url, headers=http_headers)
        if resp.status_code == 200:
            tree = html.fromstring(resp.content)
            crmids = []
            # SalesOrder hits normally sit inside the collapse-SalesOrder panel.
            links = tree.xpath('//div[@id="collapse-SalesOrder"]//a[contains(@onclick, "record=")]')
            if not links:
                # Fallback: scan the whole page for SalesOrder links.
                links = tree.xpath('//a[contains(@onclick, "module=SalesOrder") and contains(@onclick, "record=")]')
            for link in links:
                onclick = link.get('onclick', '')
                match = re.search(r"record=(\d+)", onclick)
                if match:
                    if match.group(1) not in crmids: crmids.append(match.group(1))  # keep order, dedupe
            return crmids
        return []
    except Exception:
        return []
def perform_full_crawl(session):
    """Mode 2: full crawl (fixed version).

    Auto-paginates the SalesOrder list using a hand-built URL whose
    parameter order matches the browser exactly, ignores the server's
    recTotal, and stops when a page returns the same id sequence as the
    previous one (the server replays the last page forever).
    Returns a de-duplicated list of crm ids.
    """
    all_crmids = []
    page_size = 100
    page = 1
    # ★★★ Remember the previous page's ids to detect the repeat-last-page loop ★★★
    last_page_ids = []
    print(" 📡 开始全量爬取 (忽略 recTotal检测到页面内容重复时停止)...")
    # Build the URL manually so parameter order is identical to the browser,
    # including viewname=476.
    def build_url(page_num):
        action_id = get_current_action_id()
        query_str = (
            f"module=SalesOrder&"
            f"action=SalesOrderAjax&"
            f"file=ListViewData&"
            f"sorder=&"
            f"start={page_num}&"
            f"order_by=&"
            f"pagesize={page_size}&"
            f"actionId={action_id}&"
            f"isFilter=true&"
            f"search%5Bviewscope%5D=all_to_me&"
            f"search%5Bviewname%5D=476"  # key parameter
        )
        return f"{base_url}?{query_str}"
    while True:
        current_url = build_url(page)
        try:
            resp = session.get(current_url, headers=http_headers)
            # 1. Parse the JSON body.
            try:
                data = resp.json()
            except json.JSONDecodeError:
                print(f" ❌ 第 {page} 页解析失败:服务器未返回 JSON (可能是Session失效)")
                break
            # 2. Pull the row list (field name varies by endpoint version).
            entries = data.get('data', [])
            if not entries and 'entries' in data:
                entries = data['entries']
            # 3. An empty page means the crawl is finished.
            if not entries or len(entries) == 0:
                print(f" 🏁 第 {page} 页为空 (数据抓取结束)。")
                break
            # 4. Extract this page's ids.
            current_page_ids = []
            if isinstance(entries, list):
                for item in entries:
                    if isinstance(item, dict):
                        if 'crmid' in item:
                            current_page_ids.append(item['crmid'])
                        elif 'id' in item:
                            current_page_ids.append(item['id'])
            elif isinstance(entries, dict):
                current_page_ids = list(entries.keys())
            count = len(current_page_ids)
            # ★★★ 5. Core fix: endless-loop detection ★★★
            # An id sequence identical to the previous page (and not page 1)
            # means the server keeps returning the final page.
            if page > 1 and current_page_ids == last_page_ids:
                print(f" 🛑 第 {page} 页数据与第 {page - 1} 页完全一致,判定为最后一页重复,爬取结束!")
                break
            # Remember this page for the next comparison.
            last_page_ids = current_page_ids
            # 6. Accumulate.
            all_crmids.extend(current_page_ids)
            print(f" ✅ 第 {page} 页获取成功 (本页 {count} 条)")
            # 7. Next page, with a polite pause.
            page += 1
            time.sleep(0.5)
        except Exception as e:
            print(f" ❌ 请求第 {page} 页发生异常: {e}")
            break
    # Final dedupe (rows can shift between pages while crawling).
    all_crmids = list(set(all_crmids))
    print(f" 🎉 ID列表获取完毕去重后共: {len(all_crmids)}")
    return all_crmids
# ================= 4. 文本解析逻辑 =================
def parse_order_text(text):
    """Parse cleaned detail-page text into a flat order dict.

    Line-oriented: most fields are a label line followed by a value line
    (mapped via ``key_map``); buyer details are regexed from the raw text.
    Also derives paid/unpaid amounts from the payment status and flags
    records whose manufacturer contains "ASD". Returns {} for empty input.
    """
    if not text: return {}
    # All output columns, pre-initialised so later reindexing never KeyErrors.
    data = {
        "合同编号": "", "内贸合同号": "", "外贸合同号": "",
        "签署公司": "", "收款情况": "", "签订日期": "", "销售员": "",
        "最终用户单位": "", "最终用户信息联系人": "", "最终用户信息电话": "", "最终用户信息邮箱": "",
        "最终用户所在地": "",
        "买方单位": "", "买方信息联系人": "", "买方信息电话": "", "买方信息邮箱": "",
        "厂家": "", "厂家型号": "", "合同标的": "", "数量": "", "单位": "台/套",
        "折扣率(%)": "", "合同额": "", "合同总额": "",
        "外购付款方式": "", "最晚发货期": "", "已收款": "", "未收款": "", "收款日期": "",
        "IS_ASD": False,
        "_temp_second_code": ""
    }
    lines = [line.strip() for line in text.split('\n') if line.strip()]
    # CRM label -> output column for simple "label then value" pairs.
    key_map = {
        "收款账户": "签署公司", "收款状态": "收款情况", "签约日期": "签订日期",
        "负责人": "销售员", "客户名称": "最终用户单位", "联系人姓名": "最终用户信息联系人",
        "合同总额": "合同总额", "最新收款日期": "收款日期", "最晚发货期": "最晚发货期",
        "付款比例及期限": "外购付款方式", "地址": "最终用户所在地", "厂家": "厂家"
    }
    for i, line in enumerate(lines):
        if line == "合同订单编号" and i + 1 < len(lines):
            # Value line holds "<contract no> [<secondary no>]".
            parts = lines[i + 1].strip().split()
            if len(parts) >= 1: data["合同编号"] = parts[0]
            if len(parts) >= 2: data["_temp_second_code"] = parts[1]
        elif line in key_map and i + 1 < len(lines):
            target = key_map[line]
            if not data[target]: data[target] = lines[i + 1]  # first value wins
        elif "合同标的" in line and "品名/型号" in line and i + 1 < len(lines):
            # Slash-separated value line: subject/model/qty/.../amount.
            parts = lines[i + 1].split('/')
            if len(parts) >= 1: data["合同标的"] = parts[0]
            if len(parts) >= 2: data["厂家型号"] = parts[1]
            if len(parts) >= 3: data["数量"] = parts[2]
            if len(parts) >= 5: data["合同额"] = parts[4]
    # Buyer details live in free text; regex them out of the raw block.
    buyer_match = re.search(r"(?:买方|The Buyer)[:]\s*(.*?)(?:\n|$)", text)
    if buyer_match and len(buyer_match.group(1)) > 1: data["买方单位"] = buyer_match.group(1).strip()
    buyer_ct = re.search(r"联系人Contact person[:]\s*(.*?)(?:\n|$)", text)
    if buyer_ct: data["买方信息联系人"] = buyer_ct.group(1).strip()
    buyer_tel = re.search(r"电话\(Tel\)[:]\s*(.*?)(?:\s+|$|传真)", text)
    if buyer_tel: data["买方信息电话"] = buyer_tel.group(1).strip()
    try:
        # Derive paid/unpaid amounts from the total plus the status string.
        total = float(data["合同总额"]) if data["合同总额"] else 0
        if "已收" in data["收款情况"]:
            data["已收款"] = str(total); data["未收款"] = "0"
        elif "" in data["收款情况"]:
            # NOTE(review): `"" in s` is always True, so this branch fires for
            # every non-"已收" status — a character (e.g. "未") may have been
            # lost here; confirm the intended substring.
            data["已收款"] = "0"; data["未收款"] = str(total)
    except:
        pass
    # Manufacturer containing "ASD" routes the record to the ASD workbook.
    factory_val = data.get("厂家", "")
    if factory_val and "ASD" in factory_val.upper():
        data["IS_ASD"] = True
    else:
        data["IS_ASD"] = False
    return data
# ================= 5. 逻辑冲突检查函数 =================
def check_and_print_conflicts(all_records):
    """Print a data-quality report for the scraped records.

    1. Flags contract numbers (合同编号) that occur more than once.
    2. Flags domestic/foreign contract numbers that are shared by several
       different contracts (logical conflicts).
    """
    print("\n" + "=" * 25 + " 数据异常检测报告 " + "=" * 25)
    # --- Check 1: is every contract number unique? ---
    counts = Counter(r.get("合同编号", "").strip() for r in all_records if r.get("合同编号"))
    duplicated = {number: times for number, times in counts.items() if times > 1}
    print(f"\n📋 [检查1] 合同编号唯一性检查:")
    if duplicated:
        print(f" ❌ 警告:发现 {len(duplicated)} 个重复的合同编号 (可能存在完全重复的记录):")
        for number, times in duplicated.items():
            print(f" 🔸 {number} (出现了 {times} 次)")
    else:
        print(" ✅ 通过:没有发现完全重复的合同编号。")

    # --- Check 2: does any secondary number map to several contracts? ---
    def report_shared_numbers(field_name):
        number_to_contracts = defaultdict(set)
        for rec in all_records:
            secondary = rec.get(field_name, "").strip()
            primary = rec.get("合同编号", "").strip()
            if secondary and primary:
                number_to_contracts[secondary].add(primary)
        shared = {no: ids for no, ids in number_to_contracts.items() if len(ids) > 1}
        print(f"\n📋 [检查2] {field_name} 冲突检查 (是否存在多个合同共用一个号):")
        if shared:
            print(f" ❌ 严重警告:发现 {len(shared)} 个冲突!以下号码被多个合同共用:")
            for no, ids in shared.items():
                print(f" 🔴 号码 [{no}] 同时出现在以下合同中: {list(ids)}")
        else:
            print(f" ✅ 通过:每个{field_name}都只对应唯一的合同编号。")

    report_shared_numbers("内贸合同号")
    report_shared_numbers("外贸合同号")
    print("\n" + "=" * 66 + "\n")
# ================= 6. 导出 Excel =================
def export_excel_files(all_records, output_dir, file_prefix):
    """Split records into ASD / non-ASD workbooks with 内贸/外贸/其他 sheets.

    Classification: the IS_ASD flag picks the workbook; the contract-number
    prefix ('N' domestic, 'W' foreign, anything else other) picks the sheet.
    Files are written as ``<type>_产品表_<prefix>.xlsx`` under *output_dir*.
    """
    # Shared column order; domestic/foreign variants insert their own
    # secondary contract-number column after the first two columns.
    cols_common = [
        "合同编号", "签署公司", "收款情况", "签订日期", "销售员", "厂家",
        "最终用户单位", "最终用户信息联系人", "最终用户信息电话", "最终用户信息邮箱", "最终用户所在地",
        "买方单位", "买方信息联系人", "买方信息电话", "买方信息邮箱",
        "厂家型号", "合同标的", "数量", "单位", "折扣率(%)", "合同额", "合同总额",
        "外购付款方式", "最晚发货期", "已收款", "未收款", "收款日期"
    ]
    cols_domestic = cols_common[:2] + ["内贸合同号"] + cols_common[2:]
    cols_foreign = cols_common[:2] + ["外贸合同号"] + cols_common[2:]
    datasets = {
        "ASD": {"Domestic": [], "Foreign": [], "Other": []},
        "Non_ASD": {"Domestic": [], "Foreign": [], "Other": []}
    }
    for record in all_records:
        main_key = "ASD" if record["IS_ASD"] else "Non_ASD"
        c_no = record.get("合同编号", "").strip().upper()
        if c_no.startswith('N'):
            datasets[main_key]["Domestic"].append(record)
        elif c_no.startswith('W'):
            datasets[main_key]["Foreign"].append(record)
        else:
            datasets[main_key]["Other"].append(record)
    for type_name in ["ASD", "Non_ASD"]:
        filename = f"{type_name}_产品表_{file_prefix}.xlsx"
        filepath = os.path.join(output_dir, filename)
        subset = datasets[type_name]
        df_dom = pd.DataFrame(subset["Domestic"])
        df_for = pd.DataFrame(subset["Foreign"])
        df_oth = pd.DataFrame(subset["Other"])
        # Sort by contract number where the column exists.
        if not df_dom.empty and "合同编号" in df_dom.columns:
            df_dom.sort_values(by="合同编号", ascending=True, inplace=True)
        if not df_for.empty and "合同编号" in df_for.columns:
            df_for.sort_values(by="合同编号", ascending=True, inplace=True)
        has_data = False
        try:
            with pd.ExcelWriter(filepath, engine='openpyxl') as writer:
                if not df_dom.empty:
                    df_dom.reindex(columns=cols_domestic).to_excel(writer, sheet_name='内贸', index=False)
                    has_data = True
                if not df_for.empty:
                    df_for.reindex(columns=cols_foreign).to_excel(writer, sheet_name='外贸', index=False)
                    has_data = True
                if not df_oth.empty:
                    # The "other" sheet reuses the domestic column layout.
                    df_oth.reindex(columns=cols_domestic).to_excel(writer, sheet_name='其他', index=False)
                    has_data = True
            # NOTE(review): the two messages below print the literal
            # "(unknown)" instead of the generated path — presumably an
            # f-string placeholder (e.g. {filepath}) was lost; confirm and
            # restore it.
            if has_data:
                print(f" 💾 已生成: (unknown)")
        except Exception as e:
            print(f" ❌ 写入 (unknown) 失败: {e}")
# ================= 7. 主程序 =================
def main():
    """Interactive entry point: login, choose search or full crawl, fetch
    every hit's detail page, run the conflict report, export to Excel."""
    session = requests.Session()
    print("================ CRM 爬取助手 (智能防循环版) ================")
    print("1. 正在尝试自动登录 CRM...")
    # Hit the home page first to obtain the base cookies, then authenticate.
    session.get(base_url, headers=http_headers)
    session.post(base_url, data=login_payload, headers=http_headers)
    if 'PHPSESSID' not in session.cookies:
        print(" ❌ 登录失败: 未检测到 Cookie请检查账号密码。")
        return
    print(" ✅ 登录成功")
    print("\n请选择运行模式:")
    print(" [1] 搜索模式 (输入关键词)")
    print(" [2] 全量爬取 (自动翻页爬取所有)")
    mode = input("请输入数字 (1/2): ").strip()
    crmids = []
    file_tag = ""
    if mode == '1':
        query = input("\n请输入搜索关键词: ").strip()
        if not query: return
        print(f" 🔍 正在搜索: {query}")
        crmids = perform_search(session, urllib.parse.quote(query))
        file_tag = f"搜索_{query}"
    elif mode == '2':
        print("\n 🚀 开始全量爬取流程...")
        crmids = perform_full_crawl(session)
        file_tag = "全量爬取"
    else:
        print(" ❌ 输入无效")
        return
    if not crmids:
        print(" ❌ 未获取到 CRM ID")
        return
    print(f"\n3. 开始获取 {len(crmids)} 条数据详情...")
    all_parsed_data = []
    success_count = 0
    for i, cid in enumerate(crmids):
        # Lightweight progress indicator every 10 records.
        if i % 10 == 0:
            print(f" ⏳ 进度: {i}/{len(crmids)} ...")
        text = fetch_html_detail(session, cid)
        data = parse_order_text(text)
        contract_no = data.get("合同编号", "").strip().upper()
        if not contract_no:
            continue
        data["系统ID"] = cid
        # Route the secondary code: W* -> foreign, otherwise domestic.
        second_code = data.pop("_temp_second_code", "")
        if contract_no.startswith('W'):
            data["外贸合同号"] = second_code
        elif contract_no.startswith('N'):
            data["内贸合同号"] = second_code
        else:
            data["内贸合同号"] = second_code
        all_parsed_data.append(data)
        success_count += 1
        time.sleep(0.2)  # polite delay between detail requests
    print(f"\n ✅ 详情抓取完成。有效记录: {success_count}")
    # 4. Logical-conflict report.
    check_and_print_conflicts(all_parsed_data)
    # 5. Export.
    print("5. 正在导出 Excel...")
    ts = time.strftime("%Y%m%d_%H%M%S")
    out_dir = f"Result_{ts}"
    os.makedirs(out_dir, exist_ok=True)
    export_excel_files(all_parsed_data, out_dir, f"{file_tag}_{ts}")
    print(f"\n🎉 全部完成!结果保存在: {os.path.abspath(out_dir)}")
# Script entry point.
if __name__ == "__main__":
    main()

View File

@ -11,8 +11,8 @@ login_payload = {
"action": "Authenticate",
"return_module": "Users",
"return_action": "Login",
"user_name": "你的用户名", # <--- 记得填
"user_password": "你的密码", # <--- 记得填
"user_name": "TEST", # 在这里填入真实的用户名
"user_password": "test", # 在这里填入真实的密码
"login_theme": "newskin"
}