Files
Contract-document-crawling-…/页面.py

558 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import tkinter as tk
from tkinter import ttk, filedialog, messagebox, simpledialog
import os
import numpy as np
# ==========================================
# Part 1: core business logic
# ==========================================
class DataProcessor:
    """Map rows of the contract CSV export onto workbook sheets and merge them with saved sheets.

    Every CSV row is converted into one of three layouts (summary, line-item
    detail, "other").  The converted rows are then merged with the sheets of a
    previously saved workbook, and each resulting row carries a ``_status``
    column: ``'new'``, ``'modified'`` or ``''`` (unchanged / untouched).
    """

    # Columns load_csv() must find, because merge_datasets() filters on them.
    REQUIRED_CSV_COLUMNS = ('厂家', '合同类型')

    def __init__(self):
        # Header layout of the foreign-trade summary sheet.
        self.columns_general = [
            "合同编号", "签署公司", "外贸合同号", "收款情况", "合同签订日期",
            "销售员", "最终用户单位", "最终用户信息联系人、电话、邮箱", "最终用户所在地",
            "厂家", "型号/货号", "合同标的", "数量", "单位", "币种", "折扣率",
            "合同", "总合同额", "外购", "已收款", "未收款", "收款日期",
            "最晚发货期", "付款方式", "发货港", "目的港", "发货日期",
            "买方单位", "买方信息联系人、电话、邮箱", "收货人信息",
        ]
        # Domestic summary sheet: same layout, only the contract-number header differs.
        self.columns_domestic_general = [
            "内贸合同号" if c == "外贸合同号" else c for c in self.columns_general
        ]
        # Header layout of the line-item detail sheets.
        self.columns_detail = [
            "合同编号", "销售员", "厂家", "合同标的", "货号", "产品描述", "数量", "单位",
            "币种", "报价单价", "报价总价", "销售单价", "销售总价", "折扣率",
            "外购", "合同币种/美元", "外购转美元", "报价总价美元", "净合同额美元",
        ]
        # Header layout of the sheet for contracts that are neither 外贸 nor 内贸.
        self.columns_other = [
            "合同编号", "签署公司", "内贸合同号", "收款情况", "签订日期",
            "销售员", "最终用户单位", "最终用户信息联系人、电话、邮箱", "最终用户所在地",
            "买方单位", "买方信息联系人、电话、邮箱", "合同标的", "合同总额",
            "已收款", "未收款", "收款日期",
        ]

    @staticmethod
    def _cell_text(val):
        """Return *val* as text, mapping ``None``/NaN to ``""`` instead of ``"None"``/``"nan"``."""
        if val is None:
            return ""
        if pd.api.types.is_scalar(val) and pd.isna(val):
            return ""
        return str(val)

    def safe_float(self, val):
        """Parse *val* as a float, returning 0.0 for blank or unparsable input.

        Strings may contain thousands separators (``,``) and surrounding
        whitespace.  Values ``float()`` cannot handle (e.g. ``None``) yield 0.0.
        """
        try:
            if isinstance(val, str):
                val = val.replace(',', '').strip()
                if val == '':
                    return 0.0
            return float(val)
        except (TypeError, ValueError, OverflowError):
            return 0.0

    def normalize_for_compare(self, val):
        """Return a canonical string for *val* so old and new cells compare by meaning.

        Missing values and the text ``nan`` become ``""``; numeric text is
        reduced to its shortest form (``"12.0"`` -> ``"12"``); anything else is
        returned stripped.
        """
        if val is None or pd.isna(val):
            return ""
        s_val = str(val).strip()
        if s_val.lower() == 'nan':
            return ""
        try:
            f_val = float(s_val)
        except ValueError:
            # Not numeric: compare as plain text.
            return s_val
        if f_val.is_integer():
            return str(int(f_val))
        return str(f_val)

    def load_csv(self, file_path):
        """Read the CSV export, trying UTF-8, GBK and GB18030 in turn.

        Returns:
            ``(df, (summary_factory_col, detail_factory_col))`` on success, or
            ``(None, error_message)`` when the file cannot be read or lacks a
            required column.
        """
        df = None
        try:
            for enc in ('utf-8', 'gbk', 'gb18030'):
                try:
                    df = pd.read_csv(file_path, encoding=enc)
                    break
                except UnicodeDecodeError:
                    continue
        except (OSError, ValueError) as exc:
            # Missing/unreadable file or malformed CSV (pandas parser errors derive from ValueError).
            return None, f"无法读取文件: {exc}"

        if df is None:
            # Last resort: decode leniently, replacing undecodable bytes.
            try:
                df = pd.read_csv(file_path, encoding='gb18030', encoding_errors='replace')
            except (OSError, ValueError):
                return None, "无法读取文件,请检查编码。"

        missing = [c for c in self.REQUIRED_CSV_COLUMNS if c not in df.columns]
        if missing:
            return None, f"CSV 缺少必要的列: {', '.join(missing)}"

        col_factory_general = '厂家'
        # pandas renames a second column called 厂家 to 厂家.1; that one holds the per-item manufacturer.
        col_factory_detail = '厂家.1' if '厂家.1' in df.columns else '厂家'
        # Blank cells become "" so the string filters in merge_datasets() work on every row.
        df[col_factory_general] = df[col_factory_general].map(self._cell_text)
        df['合同类型'] = df['合同类型'].map(self._cell_text)
        return df, (col_factory_general, col_factory_detail)

    def process_row_general(self, row, trade_type, col_factory):
        """Convert one CSV row into a summary-sheet row.

        Args:
            row: CSV row (``pd.Series``).
            trade_type: ``'外贸'`` selects the foreign layout; anything else the domestic one.
            col_factory: name of the CSV column holding the manufacturer.
        """
        is_foreign = trade_type == '外贸'
        target_cols = self.columns_general if is_foreign else self.columns_domestic_general
        new_row = {col: "" for col in target_cols}

        # "合同订单编号" is "<contract id> <trade contract no>", separated by a space.
        parts_no = self._cell_text(row.get('合同订单编号', '')).split(' ')
        new_row['合同编号'] = parts_no[0]
        new_row['外贸合同号' if is_foreign else '内贸合同号'] = parts_no[1] if len(parts_no) > 1 else ""

        # The subject cell packs several fields separated by "/"; only the first four are used here.
        parts_target = self._cell_text(row.get('合同标的(品名/型号/数量/单价/总价)', '')).split('/')
        for key, part in zip(('合同标的', '型号/货号', '数量', '合同'), parts_target):
            new_row[key] = part

        new_row['总合同额'] = row.get('合同总额', '')
        new_row['签署公司'] = row.get('收款账户', '')
        new_row['收款情况'] = row.get('收款状态', '')
        new_row['合同签订日期'] = row.get('签约日期', '')
        new_row['销售员'] = row.get('负责人', '')
        new_row['最终用户单位'] = row.get('客户名称', '')
        new_row['最终用户信息联系人、电话、邮箱'] = row.get('联系人姓名', '')
        new_row['厂家'] = row.get(col_factory, '')
        new_row['币种'] = row.get('货币(选完产品再改)', '')
        new_row['外购'] = row.get('外购产品金额', '')
        new_row['收款日期'] = row.get('最新收款日期', '')
        new_row['最晚发货期'] = row.get('最晚发货期', '')
        new_row['付款方式'] = row.get('付款比例及期限', '')
        new_row['发货港'] = row.get('发货地', '')
        new_row['目的港'] = row.get('目的港', '')
        new_row['买方单位'] = row.get('合同买方(名称/联系人/电话/邮箱)', '')
        return pd.Series(new_row)

    def process_row_detail(self, row, col_factory):
        """Convert one CSV row into a detail-sheet (line item) row.

        Rows whose manufacturer text contains ``外购`` record the product
        subtotal under ``外购``; all other rows fill the quote columns instead.
        Columns not assigned here stay ``""``.
        """
        new_row = {col: "" for col in self.columns_detail}
        manufacturer = self._cell_text(row.get(col_factory, ''))

        new_row['合同编号'] = self._cell_text(row.get('合同订单编号', '')).split(' ')[0]
        new_row['销售员'] = row.get('负责人', '')
        new_row['厂家'] = manufacturer
        new_row['货号'] = row.get('产品编码', '')
        new_row['数量'] = row.get('数量', '')
        new_row['币种'] = row.get('原币种', '')
        new_row['合同标的'] = self._cell_text(
            row.get('合同标的(品名/型号/数量/单价/总价)', '')
        ).split('/')[0]

        if '外购' in manufacturer:
            new_row['外购'] = self.safe_float(row.get('产品小计', 0))
            new_row['产品描述'] = row.get('备注', '')
        else:
            new_row['产品描述'] = row.get('产品名称', '')
            new_row['报价单价'] = row.get('美元报价', '')
            new_row['报价总价'] = row.get('产品小计', '')

        new_row['合同币种/美元'] = row.get('汇率', '')
        return pd.Series(new_row)

    def process_row_other(self, row):
        """Convert one CSV row into a row of the "其他" sheet."""
        new_row = {col: "" for col in self.columns_other}

        parts_no = self._cell_text(row.get('合同订单编号', '')).split(' ')
        new_row['合同编号'] = parts_no[0]
        new_row['内贸合同号'] = parts_no[1] if len(parts_no) > 1 else ""
        new_row['合同标的'] = self._cell_text(
            row.get('合同标的(品名/型号/数量/单价/总价)', '')
        ).split('/')[0]

        new_row['合同总额'] = row.get('合同总额', '')
        new_row['签署公司'] = row.get('收款账户', '')
        new_row['收款情况'] = row.get('收款状态', '')
        new_row['签订日期'] = row.get('签约日期', '')
        new_row['销售员'] = row.get('负责人', '')
        new_row['最终用户单位'] = row.get('客户名称', '')
        new_row['最终用户信息联系人、电话、邮箱'] = row.get('联系人姓名', '')
        new_row['买方单位'] = row.get('合同买方(名称/联系人/电话/邮箱)', '')
        new_row['收款日期'] = row.get('最新收款日期', '')
        return pd.Series(new_row)

    def _rows_differ(self, old_row, new_row, columns):
        """Return True when any of *columns* differs between the rows after normalisation."""
        return any(
            self.normalize_for_compare(old_row.get(col)) != self.normalize_for_compare(new_row.get(col))
            for col in columns
            if col != '_status'
        )

    def _merge_sheet(self, old_df, new_rows_df, unique_col, sheet_type='general'):
        """Merge freshly converted rows into one existing sheet.

        Old rows whose *unique_col* value also occurs in *new_rows_df* are
        replaced by the new rows, tagged ``'modified'`` when the row count or
        any cell differs and ``''`` otherwise.  Contracts unknown to the old
        sheet are appended as ``'new'``.  Untouched old rows come first, then
        the new rows grouped by contract in order of first appearance.
        """
        if old_df is None or old_df.empty:
            if new_rows_df.empty:
                return pd.DataFrame()
            combined = new_rows_df.copy()
            combined['_status'] = 'new'
            return combined

        combined = old_df.copy()
        if sheet_type == 'detail' and '合同标的' in combined.columns and '合同编号' in combined.columns:
            # Saved detail sheets keep the subject only on the first row of a contract;
            # restore it on the following rows so they compare equal to fresh data.
            combined['合同标的'] = combined['合同标的'].replace(r'^\s*$', np.nan, regex=True)
            combined['合同标的'] = combined.groupby('合同编号')['合同标的'].ffill()
            combined['合同标的'] = combined['合同标的'].fillna("")
        if '_status' not in combined.columns:
            combined['_status'] = ''
        if new_rows_df.empty:
            return combined

        compare_cols = [c for c in new_rows_df.columns if c != '_status']
        superseded = pd.Series(False, index=combined.index)
        pieces = []
        for cid in new_rows_df[unique_col].unique():
            new_subset = new_rows_df[new_rows_df[unique_col] == cid].copy()
            old_mask = combined[unique_col] == cid
            if old_mask.any():
                old_comp = combined[old_mask].reset_index(drop=True)
                new_comp = new_subset.reset_index(drop=True)
                changed = len(old_comp) != len(new_comp) or any(
                    self._rows_differ(old_comp.iloc[i], new_comp.iloc[i], compare_cols)
                    for i in range(len(old_comp))
                )
                new_subset['_status'] = 'modified' if changed else ''
                superseded |= old_mask
            else:
                new_subset['_status'] = 'new'
            pieces.append(new_subset)

        # A single concat instead of one per contract keeps the merge linear in the row count.
        return pd.concat([combined[~superseded]] + pieces, ignore_index=True)

    def merge_datasets(self, old_dfs, csv_df, is_asd):
        """Build all five sheets for either the ASD or the non-ASD workbook.

        Args:
            old_dfs: mapping of sheet name -> DataFrame from the previous workbook (may be empty).
            csv_df: DataFrame as returned by :meth:`load_csv`.
            is_asd: keep rows whose ``厂家`` contains "ASD" (case-insensitive) when
                True, all remaining rows when False.

        Returns:
            dict mapping ``外贸总表``, ``外贸明细``, ``内贸总表``, ``内贸明细`` and
            ``其他`` to the merged DataFrames (each with a ``_status`` column
            unless completely empty).
        """
        col_gen = '厂家'
        col_det = '厂家.1' if '厂家.1' in csv_df.columns else '厂家'

        asd_mask = csv_df[col_gen].str.contains('ASD', case=False, na=False)
        df_subset = csv_df[asd_mask] if is_asd else csv_df[~asd_mask]

        contract_type = df_subset['合同类型']
        csv_foreign = df_subset[contract_type == '外贸'].copy()
        csv_domestic = df_subset[contract_type == '内贸'].copy()
        csv_other = df_subset[~contract_type.isin(['外贸', '内贸'])].copy()

        def convert(rows, row_func, columns, one_per_contract):
            # Summary-style sheets keep one row per contract; detail sheets keep every line item.
            if rows.empty:
                return pd.DataFrame(columns=columns)
            converted = rows.apply(row_func, axis=1)
            if one_per_contract:
                converted = converted.drop_duplicates(subset=['合同编号'], keep='first')
            return converted

        # (sheet name, source rows, row converter, header layout, one row per contract?, merge mode)
        plan = [
            ('外贸总表', csv_foreign, lambda r: self.process_row_general(r, '外贸', col_gen),
             self.columns_general, True, 'general'),
            ('外贸明细', csv_foreign, lambda r: self.process_row_detail(r, col_det),
             self.columns_detail, False, 'detail'),
            ('内贸总表', csv_domestic, lambda r: self.process_row_general(r, '内贸', col_gen),
             self.columns_domestic_general, True, 'general'),
            ('内贸明细', csv_domestic, lambda r: self.process_row_detail(r, col_det),
             self.columns_detail, False, 'detail'),
            ('其他', csv_other, self.process_row_other,
             self.columns_other, True, 'general'),
        ]

        result_dfs = {}
        for sheet_name, rows, row_func, columns, one_per_contract, sheet_type in plan:
            new_rows = convert(rows, row_func, columns, one_per_contract)
            old_rows = old_dfs.get(sheet_name, pd.DataFrame(columns=columns))
            result_dfs[sheet_name] = self._merge_sheet(old_rows, new_rows, '合同编号', sheet_type)
        return result_dfs
# ==========================================
# Part 2: GUI (restyled version)
# ==========================================
class ContractApp:
    """Tkinter front end: choose the input files, preview the merged sheets, save them to Excel.

    ``self.final_data`` maps ``'ASD'`` / ``'NonASD'`` to the dict of sheets
    produced by ``DataProcessor.merge_datasets``; it is what the preview
    shows and what :meth:`save_files` writes.
    """

    # Tab order of the sheets inside each workbook preview.
    SHEET_ORDER = ('外贸总表', '外贸明细', '内贸总表', '内贸明细', '其他')

    def __init__(self, root):
        """Configure window, ttk styles and state variables, then build the widgets."""
        self.root = root
        self.root.title("合同数据处理系统 V2.0")
        self.root.geometry("1300x850")

        # === Style configuration ===
        self.style = ttk.Style()
        self.style.theme_use('clam')  # "clam" is used as the base theme for the custom styles below

        # Colour palette
        self.colors = {
            'bg': '#F5F6FA',       # overall background
            'primary': '#409EFF',  # primary blue
            'success': '#67C23A',  # success green
            'warning': '#E6A23C',  # warning yellow
            'text': '#2C3E50',     # dark text
            'panel': '#FFFFFF',    # panel white
        }
        self.root.configure(bg=self.colors['bg'])

        # Fonts and generic widget styles
        self.default_font = ("微软雅黑", 10)
        self.header_font = ("微软雅黑", 11, "bold")
        self.style.configure("TFrame", background=self.colors['bg'])
        self.style.configure("Panel.TFrame", background=self.colors['panel'], relief="flat")
        self.style.configure("TLabel", background=self.colors['panel'], foreground=self.colors['text'],
                             font=self.default_font)
        self.style.configure("Header.TLabel", font=("微软雅黑", 16, "bold"), background=self.colors['bg'],
                             foreground=self.colors['text'])

        # Buttons
        self.style.configure("TButton", font=("微软雅黑", 10), borderwidth=0, padding=6)
        self.style.map("TButton", background=[('active', '#E0E0E0')])
        self.style.configure("Primary.TButton", background=self.colors['primary'], foreground='white')
        self.style.map("Primary.TButton", background=[('active', '#66B1FF')])
        self.style.configure("Success.TButton", background=self.colors['success'], foreground='white')
        self.style.map("Success.TButton", background=[('active', '#85CE61')])

        # Table (Treeview)
        self.style.configure("Treeview",
                             background="white",
                             foreground="black",
                             fieldbackground="white",
                             rowheight=28,  # taller rows than the theme default
                             font=("微软雅黑", 9))
        self.style.configure("Treeview.Heading",
                             font=("微软雅黑", 10, "bold"),
                             background="#EBEEF5",
                             foreground="#606266")
        self.style.map("Treeview", background=[('selected', '#409EFF')])

        # Business logic and UI state
        self.processor = DataProcessor()
        self.csv_path = tk.StringVar()
        self.asd_path = tk.StringVar()
        self.non_asd_path = tk.StringVar()
        self.final_data = {}
        self.create_widgets()

    def create_widgets(self):
        """Build the header, the file-selection panel, the preview notebook and the bottom bar."""
        # --- Title ---
        header_frame = ttk.Frame(self.root)
        header_frame.pack(fill="x", padx=20, pady=(20, 10))
        ttk.Label(header_frame, text="📄 合同数据智能合并与处理工具", style="Header.TLabel").pack(side="left")

        # --- File selection panel ---
        input_panel = ttk.Frame(self.root, style="Panel.TFrame", padding=20)
        input_panel.pack(fill="x", padx=20, pady=5)
        ttk.Label(
            input_panel,
            text="文件配置 (若未选择旧文件,将自动生成新文件)",
            font=self.header_font,
        ).grid(row=0, column=0, columnspan=3, sticky="w", pady=(0, 15))
        self.create_file_row(input_panel, "📂 导入 CSV 源文件:", self.csv_path, 1)
        self.create_file_row(input_panel, "📘 旧 ASD Excel 文件:", self.asd_path, 2)
        self.create_file_row(input_panel, "📗 旧 非ASD Excel 文件:", self.non_asd_path, 3)

        btn_frame = ttk.Frame(input_panel, style="Panel.TFrame")
        btn_frame.grid(row=4, column=0, columnspan=3, pady=(15, 0), sticky="e")
        ttk.Button(btn_frame, text="▶ 开始处理并预览", style="Primary.TButton",
                   command=self.process_files).pack(side="right")

        # --- Data preview ---
        self.notebook = ttk.Notebook(self.root)
        self.notebook.pack(fill="both", expand=True, padx=20, pady=10)

        # --- Bottom bar: colour legend + save button ---
        bottom_bar = ttk.Frame(self.root, style="Panel.TFrame", padding=15)
        bottom_bar.pack(fill="x", padx=20, pady=(0, 20))
        legend_frame = ttk.Frame(bottom_bar, style="Panel.TFrame")
        legend_frame.pack(side="left")
        self.create_legend(legend_frame, "■ 新增数据", "#FFFFCC", "black")
        self.create_legend(legend_frame, "■ 有修改/变动", "#ECF5FF", "#409EFF")
        self.create_legend(legend_frame, "□ 无变动", "white", "black")
        ttk.Button(bottom_bar, text="💾 保存更改至 Excel", style="Success.TButton",
                   command=self.save_files).pack(side="right")

    def create_file_row(self, parent, label_text, var, row_idx):
        """Add one "label / path entry / browse button" row bound to *var* at grid row *row_idx*."""
        ttk.Label(parent, text=label_text, width=20).grid(row=row_idx, column=0, sticky="w", pady=5)
        entry = ttk.Entry(parent, textvariable=var, font=("微软雅黑", 9))
        entry.grid(row=row_idx, column=1, sticky="ew", padx=10, pady=5)
        ttk.Button(parent, text="浏览", command=lambda: self.browse_file(var)).grid(row=row_idx, column=2, padx=5)
        parent.columnconfigure(1, weight=1)  # let the entry column absorb extra width

    def create_legend(self, parent, text, bg_color, fg_color):
        """Add one coloured legend chip to *parent*."""
        lbl = tk.Label(parent, text=text, bg=bg_color, fg=fg_color, font=("微软雅黑", 9),
                       padx=8, pady=3, borderwidth=1, relief="solid")
        lbl.pack(side="left", padx=5)

    def browse_file(self, variable):
        """Ask for a CSV/XLSX file and store the chosen path in *variable* (unchanged on cancel)."""
        # Patterns are passed as a tuple: a ';'-separated string is only understood
        # by the native Windows dialog, not by Tk on other platforms.
        f = filedialog.askopenfilename(filetypes=[("Excel/CSV Files", ("*.csv", "*.xlsx"))])
        if f:
            variable.set(f)

    def process_files(self):
        """Load the CSV, merge it with the optional old workbooks and refresh the preview.

        Any failure while reading a workbook or merging is reported in a dialog
        and leaves the previously processed data untouched.
        """
        if not self.csv_path.get():
            messagebox.showerror("提示", "请先选择 CSV 源文件!")
            return
        csv_df, headers = self.processor.load_csv(self.csv_path.get())
        if csv_df is None:
            # On failure load_csv returns the error text in the second slot.
            messagebox.showerror("错误", headers)
            return

        merged = {}
        try:
            for key, path_var, is_asd in (('ASD', self.asd_path, True),
                                          ('NonASD', self.non_asd_path, False)):
                path = path_var.get()
                # A missing or unset old workbook simply means "start from scratch".
                old_sheets = pd.read_excel(path, sheet_name=None) if path and os.path.exists(path) else {}
                merged[key] = self.processor.merge_datasets(old_sheets, csv_df, is_asd)
        except Exception as exc:  # GUI boundary: show the error instead of dying inside the Tk callback
            messagebox.showerror("处理失败", str(exc))
            return

        self.final_data = merged
        self.refresh_preview()
        messagebox.showinfo("完成", "数据处理完成!\n请查看预览,确认无误后点击下方保存。")

    @staticmethod
    def _sorted_view(df, sheet_name):
        """Return *df* ordered by 合同编号 the way it is previewed and saved.

        The sort is stable so line items of one contract keep their relative
        order.  On detail sheets (name contains ``明细``) the subject is kept
        only on the first row of each contract.  *df* itself is not modified;
        frames that are empty or lack 合同编号 are returned as-is.
        """
        if df.empty or '合同编号' not in df.columns:
            return df
        try:
            view = df.sort_values(by='合同编号', ascending=True, kind='mergesort')
        except TypeError:
            # Mixed int/str contract numbers cannot be ordered directly; compare their text form.
            view = df.sort_values(by='合同编号', ascending=True, kind='mergesort',
                                  key=lambda ids: ids.astype(str))
        if '明细' in sheet_name and '合同标的' in view.columns:
            repeated = view.duplicated(subset=['合同编号'], keep='first')
            view.loc[repeated, '合同标的'] = ""
        return view

    def refresh_preview(self):
        """Rebuild the preview tabs from ``self.final_data``."""
        for tab in self.notebook.tabs():
            self.notebook.forget(tab)
        for file_type in ('ASD', 'NonASD'):
            if file_type not in self.final_data:
                continue
            data_dict = self.final_data[file_type]
            main_frame = ttk.Frame(self.notebook, style="Panel.TFrame")
            self.notebook.add(main_frame, text=f" {file_type} 文件预览 ")
            inner_notebook = ttk.Notebook(main_frame)
            inner_notebook.pack(fill="both", expand=True, padx=5, pady=5)
            for sheet_name in self.SHEET_ORDER:
                if sheet_name not in data_dict:
                    continue
                source_df = data_dict[sheet_name]
                self.create_treeview(inner_notebook, self._sorted_view(source_df, sheet_name),
                                     sheet_name, source_df=source_df)

    def create_treeview(self, parent, df, title, source_df=None):
        """Add a tab named *title* to *parent* showing *df* in a scrollable table.

        Args:
            parent: the ``ttk.Notebook`` receiving the tab.
            df: frame to display; its ``_status`` column selects the row colour and is not shown.
            title: tab caption.
            source_df: frame that receives double-click edits (matched by index
                label); defaults to *df*.
        """
        frame = ttk.Frame(parent)
        parent.add(frame, text=title)

        scroll_y = ttk.Scrollbar(frame, orient="vertical")
        scroll_x = ttk.Scrollbar(frame, orient="horizontal")
        display_cols = [c for c in df.columns if c != '_status']
        tree = ttk.Treeview(frame, columns=display_cols, show='headings',
                            yscrollcommand=scroll_y.set, xscrollcommand=scroll_x.set)
        scroll_y.config(command=tree.yview)
        scroll_x.config(command=tree.xview)
        scroll_y.pack(side="right", fill="y")
        scroll_x.pack(side="bottom", fill="x")
        tree.pack(fill="both", expand=True)

        for col in display_cols:
            tree.heading(col, text=col)
            tree.column(col, width=130, anchor="center")

        # Row colours keyed by the _status value.
        tree.tag_configure('new', background='#FFFFCC')
        tree.tag_configure('modified', background='#ECF5FF', foreground='#409EFF')

        row_labels = {}  # Treeview item id -> DataFrame index label, used to persist edits
        if not df.empty:
            df_display = df.fillna("")
            for idx, row in df_display.iterrows():
                values = [row[c] for c in display_cols]
                status = row.get('_status', '')
                item_id = tree.insert("", "end", values=values, tags=(status,))
                row_labels[item_id] = idx

        edit_target = df if source_df is None else source_df
        tree.bind("<Double-1>",
                  lambda event: self.on_double_click(event, tree, edit_target, row_labels))

    def on_double_click(self, event, tree, df, row_labels=None):
        """Let the user edit the double-clicked cell.

        The new text is shown in the tree and, when *row_labels* maps the item
        to an index label of *df*, also stored in *df* so that it is included
        when the data is saved.
        """
        if tree.identify("region", event.x, event.y) != "cell":
            return
        row_id = tree.identify_row(event.y)
        if not row_id:
            return
        # identify_column returns "#<n>" with n starting at 1.
        col_idx = int(tree.identify_column(event.x).replace('#', '')) - 1
        col_name = tree['columns'][col_idx]
        current_values = list(tree.item(row_id, "values"))
        new_val = simpledialog.askstring("快速编辑", f"修改 [{col_name}]:",
                                         initialvalue=current_values[col_idx], parent=self.root)
        if new_val is None:
            return  # dialog cancelled
        current_values[col_idx] = new_val
        tree.item(row_id, values=current_values)

        if row_labels and row_id in row_labels and col_name in df.columns:
            if df[col_name].dtype != object:
                # The dialog always yields text; make sure the column can hold it.
                df[col_name] = df[col_name].astype(object)
            df.at[row_labels[row_id], col_name] = new_val

    def _target_path(self, file_type, base_dir):
        """Return where *file_type* is saved: the chosen old workbook, else a default name in *base_dir*."""
        if file_type == 'ASD':
            return self.asd_path.get() or os.path.join(base_dir, "ASD_Combined.xlsx")
        if file_type == 'NonASD':
            return self.non_asd_path.get() or os.path.join(base_dir, "NonASD_Combined.xlsx")
        return ""

    def save_files(self):
        """Write every processed workbook to Excel and report the files actually written."""
        if not self.final_data:
            messagebox.showwarning("提示", "没有可保存的数据,请先处理文件。")
            return
        base_dir = os.path.dirname(self.csv_path.get()) if self.csv_path.get() else ""
        saved_paths = []
        try:
            for file_type, sheets in self.final_data.items():
                target_path = self._target_path(file_type, base_dir)
                with pd.ExcelWriter(target_path, engine='openpyxl') as writer:
                    for sheet_name, df in sheets.items():
                        # _status is preview-only bookkeeping and must not reach the workbook.
                        save_df = self._sorted_view(df.drop(columns=['_status'], errors='ignore'),
                                                    sheet_name)
                        save_df.to_excel(writer, sheet_name=sheet_name, index=False)
                saved_paths.append(target_path)
        except Exception as e:
            messagebox.showerror("保存失败", str(e))
            return
        messagebox.showinfo("成功", "文件保存成功!\n位置:\n" + "\n".join(saved_paths))
def main():
    """Create the Tk root window, mount ContractApp on it and run the Tk event loop."""
    window = tk.Tk()
    app = ContractApp(window)  # keep a reference for the lifetime of the event loop
    window.mainloop()


if __name__ == "__main__":
    main()