Files
Contract-document-crawling-…/页面.py

750 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import tkinter as tk
from tkinter import ttk, filedialog, messagebox, simpledialog
import os
import numpy as np
import re
# ==========================================
# 第一部分:业务逻辑核心
# ==========================================
class DataProcessor:
    """Turn a contract CSV export into workbook sheets and merge it with old data.

    The processor knows the column layout of every output sheet (foreign and
    domestic summary sheets, the detail sheet and the OM sheet), converts each
    CSV row into those layouts, merges the converted rows with sheets loaded
    from a previous workbook, and finally formats money / percentage columns
    as display strings.
    """

    def __init__(self):
        # 1. Header of the foreign-trade summary sheet.
        self.columns_general = [
            "合同编号", "签署公司", "外贸合同号", "收款情况", "合同签订日期",
            "销售员", "最终用户单位", "最终用户信息联系人、电话、邮箱", "最终用户所在地",
            "厂家", "型号/货号", "合同标的", "数量", "单位", "币种", "折扣率",
            "合同", "总合同额", "外购", "已收款", "未收款", "收款日期",
            "最晚发货期", "付款方式", "发货港", "目的港", "发货日期",
            "买方单位", "买方信息联系人、电话、邮箱", "收货人信息"
        ]
        # Domestic summary sheet: same layout, only the contract-number column differs.
        self.columns_domestic_general = [
            c if c != "外贸合同号" else "内贸合同号" for c in self.columns_general
        ]
        # 2. Header of the detail sheets (order and names follow the Excel template).
        self.columns_detail = [
            "合同编号", "销售员", "合同标的", "厂家", "货号", "产品描述",
            "净合同额美元", "外购", "计算汇率", "外购转美元", "报价总价美元",
            "数量", "单位", "外币币种", "外币报价单价",
            "报价RMB单价", "报价RMB总价", "售价RMB单价", "售价RMB总价", "折扣率(%)"
        ]
        # Header of the OM contract sheet.
        self.columns_om = [
            "合同编号", "签署公司", "内贸合同号", "收款情况", "签订日期",
            "销售员", "最终用户单位", "最终用户信息联系人、电话、邮箱", "最终用户所在地",
            "买方单位", "买方信息联系人、电话、邮箱", "合同标的", "合同总额",
            "已收款", "未收款", "收款日期"
        ]
        # Columns rendered with two decimals by apply_formatting_to_all().
        self.money_cols = set([
            "合同", "总合同额", "外购", "已收款", "未收款",
            "净合同额美元", "外购转美元", "报价总价美元",
            "外币报价单价", "报价RMB单价", "报价RMB总价",
            "售价RMB单价", "售价RMB总价", "外购产品金额"
        ])
        # Columns rendered as percentages by apply_formatting_to_all().
        self.percent_cols = set([
            "折扣率", "折扣率(%)", "计算汇率", "合同币种/美元"
        ])
        # Old header name -> current header name, used when loading old workbooks.
        self.legacy_map = {
            "外币币种": "币种",
            "汇率": "计算汇率",
            "折扣率(%)": "折扣率"
        }
        self.source_cols_processed = []

    # ------------------------------------------------------------------
    # Value helpers
    # ------------------------------------------------------------------
    def safe_float(self, val):
        """Convert *val* to float, returning 0.0 for blanks, NaN/None and junk.

        Thousands separators and the ``¥`` / ``$`` symbols are removed from
        strings before conversion.
        """
        if isinstance(val, str):
            val = val.replace(',', '').replace('¥', '').replace('$', '').strip()
            if val == '':
                return 0.0
        try:
            if pd.isna(val):
                return 0.0
            return float(val)
        except (TypeError, ValueError, OverflowError):
            # Non-numeric text, containers, etc. are treated as zero.
            return 0.0

    def format_money_str(self, val):
        """Return *val* as a two-decimal string, or "" for blank / NaN input."""
        if pd.isna(val) or str(val).strip() == "":
            return ""
        return "{:.2f}".format(self.safe_float(val))

    def format_percent_str(self, val):
        """Return *val* as a percentage string, or "" for blank / NaN input.

        Values that already contain ``%`` are returned stripped but otherwise
        unchanged; anything else is read as a fraction and multiplied by 100.
        """
        if pd.isna(val) or str(val).strip() == "":
            return ""
        text = str(val).strip()
        if '%' in text:
            return text
        return "{:.2f}%".format(self.safe_float(val) * 100)

    def normalize_for_compare(self, val):
        """Return a canonical string for *val* so old and new cells can be compared.

        Blank, None, NaN and the text ``nan`` all normalize to "". Numeric
        text is rendered with four decimals. A value written as a percentage
        (``"12.00%"``) is converted back to its fraction (``0.12``) so that it
        compares equal to the raw fraction that format_percent_str() produced
        it from. Non-numeric text is returned stripped.
        """
        if val is None:
            return ""
        try:
            if pd.isna(val):
                return ""
        except (TypeError, ValueError):
            # Containers have no scalar NaN answer; fall through to str().
            pass
        text = str(val).strip()
        if text.lower() == 'nan':
            return ""
        cleaned = text.replace(',', '').replace('%', '')
        try:
            number = float(cleaned)
        except ValueError:
            return text
        if '%' in text:
            number /= 100
        return "{:.4f}".format(number)

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------
    def load_csv(self, file_path):
        """Read the source CSV, trying utf-8, gbk and gb18030 in turn.

        Returns:
            ``(df, (general_factory_col, detail_factory_col))`` on success, or
            ``(None, error_message)`` when the file cannot be read or lacks the
            ``厂家`` / ``合同类型`` columns the merge step depends on.
        """
        df = None
        for enc in ('utf-8', 'gbk', 'gb18030'):
            try:
                df = pd.read_csv(file_path, encoding=enc)
                break
            except UnicodeDecodeError:
                continue
            except (OSError, ValueError) as exc:
                # Missing file, empty file or malformed CSV: another encoding will not help.
                return None, f"无法读取文件: {exc}"
        if df is None:
            try:
                # Last resort: decode leniently, replacing undecodable bytes.
                df = pd.read_csv(file_path, encoding='gb18030', encoding_errors='replace')
            except Exception:
                return None, "无法读取文件,请检查编码。"

        missing = [col for col in ('厂家', '合同类型') if col not in df.columns]
        if missing:
            return None, "CSV 缺少必要的列: " + ", ".join(missing)

        col_factory_general = '厂家'
        # pandas renames a second "厂家" header to "厂家.1"; that one belongs to the detail rows.
        col_factory_detail = '厂家.1' if '厂家.1' in df.columns else '厂家'
        df[col_factory_general] = df[col_factory_general].fillna('').astype(str)
        df['合同类型'] = df['合同类型'].fillna('').astype(str)
        return df, (col_factory_general, col_factory_detail)

    # ------------------------------------------------------------------
    # Free-text parsing
    # ------------------------------------------------------------------
    def parse_complex_subject(self, text):
        """Extract name / model / qty / unit / price from a contract-subject cell.

        Two input styles are recognised: labelled lines (``名称: ...``,
        ``型号: ...``, ``品牌: ...``; ASCII or full-width colon) and a
        slash-separated ``name/model/qty/price`` string. Fields that cannot be
        found stay "".
        """
        res = {'name': '', 'model': '', 'qty': '', 'unit': '', 'price': ''}
        if not isinstance(text, str) or not text.strip():
            return res
        text = text.strip()

        # "\uff1a" is the full-width colon commonly typed in Chinese text.
        name_patterns = [
            r'(?:中文品名|中文名称|名称|Name)[:\uff1a]\s*(.*?)(?:\n|$)',
            r'(?:英文名称)[:\uff1a]\s*(.*?)(?:\n|$)',
        ]
        for pattern in name_patterns:
            m = re.search(pattern, text, re.IGNORECASE)
            if m and not res['name']:
                res['name'] = m.group(1).strip()

        m = re.search(r'(?:型号|Model)[:\uff1a]\s*(.*?)(?:\n|$)', text, re.IGNORECASE)
        if m:
            res['model'] = m.group(1).strip()

        brand_match = re.search(r'(?:品牌|Brand)[:\uff1a]\s*(.*?)(?:\n|$)', text, re.IGNORECASE)
        if brand_match:
            brand_str = brand_match.group(1).strip()
            # The brand is prefixed to the model, or stands in for it when no model was found.
            res['model'] = f"{brand_str} {res['model']}" if res['model'] else brand_str

        if not res['name'] and '/' in text:
            res['name'] = text.split('/')[0].strip()

        # Quantity: prefer a number enclosed in slashes, else "<number><unit>" / "<unit><number>".
        qty_slash = re.search(r'/(\d+(\.\d+)?)/', text)
        if qty_slash:
            res['qty'] = qty_slash.group(1)
        else:
            qty_unit_match = re.search(
                r'(\d+)\s*([台个套件支箱组setpc]+)|([setpc]+)\s*(\d+)', text, re.IGNORECASE)
            if qty_unit_match:
                if qty_unit_match.group(1):
                    res['qty'] = qty_unit_match.group(1)
                    res['unit'] = qty_unit_match.group(2)
                else:
                    res['qty'] = qty_unit_match.group(4)
                    res['unit'] = qty_unit_match.group(3)

        # Price: the last number in the text (thousands separators removed),
        # unless it is the quantity itself.
        nums = re.findall(r'\d+(?:\.\d+)?', text.replace(',', '').replace('\uff0c', ''))
        if nums:
            candidate = nums[-1]
            if candidate != res['qty']:
                res['price'] = candidate

        # Purely positional fallback when neither a name nor a model was found.
        if not res['name'] and not res['model'] and '/' in text:
            parts = text.split('/')
            if len(parts) >= 1:
                res['name'] = parts[0]
            if len(parts) >= 2:
                res['model'] = parts[1]
            if len(parts) >= 3:
                res['qty'] = parts[2]
            if len(parts) >= 4:
                res['price'] = parts[3]
        return res

    def parse_buyer_info(self, text):
        """Split a multi-line buyer cell into name (first line) and contact (the rest)."""
        info = {'name': '', 'contact_full': ''}
        if not isinstance(text, str) or not text.strip():
            return info
        lines = [line.strip() for line in text.split('\n') if line.strip()]
        if not lines:
            return info
        info['name'] = lines[0]
        info['contact_full'] = " ".join(lines[1:])
        return info

    # ------------------------------------------------------------------
    # Row conversion
    # ------------------------------------------------------------------
    @staticmethod
    def _split_order_no(row):
        """Return (contract id, remaining contract number) from the order-number cell."""
        raw = str(row.get('合同订单编号', '')).strip()
        parts = raw.split()
        if not parts:
            return raw, ""
        return parts[0], " ".join(parts[1:])

    @staticmethod
    def _first_present_text(row, *keys):
        """Return the first cell among *keys* that is not blank / NaN / 'nan', else ""."""
        for key in keys:
            val = row.get(key, '')
            if val is None:
                continue
            try:
                if pd.isna(val):
                    continue
            except (TypeError, ValueError):
                pass
            text = str(val)
            if text.strip() == '' or text.strip().lower() == 'nan':
                continue
            return text
        return ''

    @staticmethod
    def _fill_payment(new_row, row, total_col):
        """Copy contract total and payment status; derive received / outstanding amounts."""
        total_amount = row.get('合同总额', '')
        status = str(row.get('收款状态', '')).strip()
        new_row[total_col] = total_amount
        new_row['收款情况'] = status
        if '已收' in status:
            # Marked as received: the whole total counts as received.
            new_row['已收款'] = total_amount
            new_row['未收款'] = 0
        else:
            new_row['已收款'] = ""
            new_row['未收款'] = ""

    def process_row_general(self, row, trade_type, col_factory):
        """Convert one CSV row into a summary-sheet row.

        Args:
            row: CSV row (anything with a dict-like ``get``).
            trade_type: ``'外贸'`` selects the foreign layout, anything else the domestic one.
            col_factory: name of the CSV column holding the manufacturer.
        """
        is_foreign = trade_type == '外贸'
        target_cols = self.columns_general if is_foreign else self.columns_domestic_general
        new_row = {col: "" for col in target_cols}

        contract_id, contract_no = self._split_order_no(row)
        new_row['合同编号'] = contract_id
        new_row['外贸合同号' if is_foreign else '内贸合同号'] = contract_no

        parsed_target = self.parse_complex_subject(
            str(row.get('合同标的(品名/型号/数量/单价/总价)', '')))
        new_row['合同标的'] = parsed_target['name']
        new_row['型号/货号'] = parsed_target['model']
        new_row['数量'] = parsed_target['qty']
        new_row['单位'] = parsed_target['unit']
        new_row['合同'] = parsed_target['price']

        # Foreign contracts prefer the import agent and fall back to the buyer column.
        # A missing cell yields an empty buyer rather than the text "nan".
        if is_foreign:
            buyer_raw = self._first_present_text(
                row, '进口代理(名称/USCI/地址/联系人/电话/邮箱)', '合同买方(名称/联系人/电话/邮箱)')
        else:
            buyer_raw = self._first_present_text(row, '合同买方(名称/联系人/电话/邮箱)')
        parsed_buyer = self.parse_buyer_info(buyer_raw)
        new_row['买方单位'] = parsed_buyer['name']
        new_row['买方信息联系人、电话、邮箱'] = parsed_buyer['contact_full']
        new_row['收货人信息'] = parsed_buyer['name']

        self._fill_payment(new_row, row, '总合同额')

        new_row['签署公司'] = row.get('收款账户', '')
        new_row['合同签订日期'] = row.get('签约日期', '')
        new_row['销售员'] = row.get('负责人', '')
        new_row['最终用户单位'] = row.get('客户名称', '')
        new_row['最终用户信息联系人、电话、邮箱'] = row.get('联系人姓名', '')
        new_row['厂家'] = row.get(col_factory, '')
        new_row['币种'] = row.get('货币(选完产品再改)', '')
        new_row['外购'] = row.get('外购产品金额', '')
        new_row['收款日期'] = row.get('最新收款日期', '')
        new_row['最晚发货期'] = row.get('最晚发货期', '')
        new_row['付款方式'] = row.get('付款比例及期限', '')
        new_row['发货港'] = row.get('发货地', '')
        new_row['目的港'] = row.get('目的港', '')
        new_row['折扣率'] = row.get('折扣率', '')
        return pd.Series(new_row)

    def process_row_detail(self, row, col_factory):
        """Convert one CSV row into a detail-sheet row.

        Args:
            row: CSV row (anything with a dict-like ``get``).
            col_factory: name of the CSV column holding the detail-level manufacturer.
        """
        new_row = {col: "" for col in self.columns_detail}
        detail_manuf_val = str(row.get(col_factory, ''))

        new_row['合同编号'] = self._split_order_no(row)[0]
        new_row['销售员'] = row.get('负责人', '')
        new_row['厂家'] = detail_manuf_val
        new_row['货号'] = row.get('产品编码', '')
        new_row['外币币种'] = row.get('原币种', '')

        parsed_target = self.parse_complex_subject(
            str(row.get('合同标的(品名/型号/数量/单价/总价)', '')))
        new_row['合同标的'] = parsed_target['name']

        # An explicit CSV quantity wins over the one parsed from the subject text.
        csv_qty = str(row.get('数量', '')).strip()
        if csv_qty and csv_qty.lower() != 'nan':
            new_row['数量'] = csv_qty
        else:
            new_row['数量'] = parsed_target['qty']
        new_row['单位'] = parsed_target['unit']

        if '外购' in detail_manuf_val:
            # Outsourced line: amount is the product subtotal; description comes
            # from the remark, else from the outsourced-detail column.
            new_row['外购'] = self.safe_float(row.get('产品小计', 0))
            remark = str(row.get('备注', '')).strip()
            if remark and remark.lower() != 'nan':
                new_row['产品描述'] = remark
            else:
                outsourced_detail = str(row.get('外购产品明细', '')).strip()
                if outsourced_detail and outsourced_detail.lower() != 'nan':
                    new_row['产品描述'] = outsourced_detail
                else:
                    new_row['产品描述'] = ""
        else:
            new_row['外购'] = ""
            new_row['产品描述'] = row.get('产品名称', '')

        new_row['外币报价单价'] = row.get('美元报价', '')
        new_row['报价RMB总价'] = row.get('产品小计', '')
        new_row['计算汇率'] = row.get('汇率', '')
        new_row['折扣率(%)'] = row.get('折扣率', '')
        new_row['售价RMB单价'] = row.get('销售单价', '')
        new_row['售价RMB总价'] = row.get('销售总价', '')
        new_row['外购转美元'] = row.get('外购转美元', '')
        new_row['报价总价美元'] = row.get('报价总价美元', '')
        new_row['净合同额美元'] = row.get('净合同额美元', '')
        # Stays blank when the CSV has no such column.
        new_row['报价RMB单价'] = row.get('报价RMB单价', '')
        return pd.Series(new_row)

    def process_row_om(self, row):
        """Convert one CSV row into an OM-contract-sheet row."""
        new_row = {col: "" for col in self.columns_om}
        contract_id, contract_no = self._split_order_no(row)
        new_row['合同编号'] = contract_id
        new_row['内贸合同号'] = contract_no

        parsed_target = self.parse_complex_subject(
            str(row.get('合同标的(品名/型号/数量/单价/总价)', '')))
        new_row['合同标的'] = parsed_target['name']

        self._fill_payment(new_row, row, '合同总额')

        new_row['签署公司'] = row.get('收款账户', '')
        new_row['签订日期'] = row.get('签约日期', '')
        new_row['销售员'] = row.get('负责人', '')
        new_row['最终用户单位'] = row.get('客户名称', '')
        new_row['最终用户信息联系人、电话、邮箱'] = row.get('联系人姓名', '')
        parsed_buyer = self.parse_buyer_info(
            self._first_present_text(row, '合同买方(名称/联系人/电话/邮箱)'))
        new_row['买方单位'] = parsed_buyer['name']
        new_row['买方信息联系人、电话、邮箱'] = parsed_buyer['contact_full']
        new_row['收款日期'] = row.get('最新收款日期', '')
        return pd.Series(new_row)

    # ------------------------------------------------------------------
    # Merging
    # ------------------------------------------------------------------
    def _merge_sheet(self, old_df, new_rows_df, unique_col, target_columns):
        """Merge freshly converted rows into one previously saved sheet.

        Rows whose *unique_col* value is unknown are appended with
        ``_status == 'new'``. For a known value, non-empty new cells that
        differ from the stored cell overwrite it and the row is marked
        ``'modified'``. Empty or NaN new cells never overwrite stored data.
        """
        if old_df is None or old_df.empty:
            if new_rows_df.empty:
                return pd.DataFrame(columns=target_columns + ['_status'])
            merged = new_rows_df.copy()
            merged['_status'] = 'new'
            return merged

        merged = old_df.copy()
        # Make sure every standard column exists in the old data.
        for col in target_columns:
            if col not in merged.columns:
                merged[col] = ""
        if unique_col in merged.columns:
            merged[unique_col] = merged[unique_col].astype(str)
        if '_status' not in merged.columns:
            merged['_status'] = ''
        if new_rows_df.empty:
            return merged

        new_rows_df = new_rows_df.copy()
        if unique_col in new_rows_df.columns:
            new_rows_df[unique_col] = new_rows_df[unique_col].astype(str)

        # First stored row label for each key.
        first_label = {}
        for label, key in merged[unique_col].items():
            first_label.setdefault(key, label)

        rows_to_append = []
        for cid in new_rows_df[unique_col].unique():
            new_subset = new_rows_df[new_rows_df[unique_col] == cid]
            if cid not in first_label:
                appended = new_subset.copy()
                appended['_status'] = 'new'
                rows_to_append.append(appended)
                continue

            # NOTE(review): only the first new row is compared against the first
            # stored row, so for detail sheets (several rows per contract) the
            # remaining rows of an existing contract are not updated.
            label = first_label[cid]
            incoming = new_subset.iloc[0]
            has_changed = False
            for col in target_columns:
                if col not in incoming:
                    continue
                new_val = incoming[col]
                new_norm = self.normalize_for_compare(new_val)
                # Protection: blank / NaN incoming cells must not wipe stored data.
                if str(new_val).strip() == "" or new_norm == "":
                    continue
                if self.normalize_for_compare(merged.at[label, col]) != new_norm:
                    # Upcast first so a string can be stored in a numeric column.
                    if merged[col].dtype != object:
                        merged[col] = merged[col].astype(object)
                    merged.at[label, col] = new_val
                    has_changed = True
            if has_changed:
                merged.at[label, '_status'] = 'modified'

        if rows_to_append:
            merged = pd.concat([merged] + rows_to_append, ignore_index=True)
        return merged

    def merge_datasets(self, old_dfs, csv_df, is_asd):
        """Build the five output sheets for either the ASD or the non-ASD workbook.

        Args:
            old_dfs: mapping of sheet name -> DataFrame from the previous workbook
                (may be empty).
            csv_df: DataFrame returned by load_csv().
            is_asd: True keeps rows whose ``厂家`` contains "ASD" (case-insensitive),
                False keeps the others.

        Returns:
            dict with keys ``外贸``, ``外贸明细``, ``内贸``, ``内贸明细`` and ``OM合同``;
            every DataFrame carries a ``_status`` column.
        """
        col_gen = '厂家'
        col_det = '厂家.1' if '厂家.1' in csv_df.columns else '厂家'

        asd_mask = csv_df[col_gen].str.contains('ASD', case=False, na=False)
        df_subset = csv_df[asd_mask] if is_asd else csv_df[~asd_mask]
        contract_type = df_subset['合同类型']
        csv_foreign = df_subset[contract_type == '外贸'].copy()
        csv_domestic = df_subset[contract_type == '内贸'].copy()
        # Everything that is neither foreign nor domestic goes to the OM sheet.
        csv_om = df_subset[~contract_type.isin(['外贸', '内贸'])].copy()

        def build(frame, row_fn, columns, dedupe):
            """Convert *frame* row by row; summary sheets keep one row per contract."""
            if frame.empty:
                return pd.DataFrame(columns=columns)
            rows = frame.apply(row_fn, axis=1)
            if dedupe:
                rows = rows.drop_duplicates(subset=['合同编号'], keep='first')
            return rows

        def old_sheet(columns, *names):
            """Return the first old sheet present under *names*, else an empty frame."""
            for name in names:
                if name in old_dfs:
                    return old_dfs[name]
            return pd.DataFrame(columns=columns)

        # (output sheet, CSV subset, row converter, columns, dedupe, old sheet names)
        plan = [
            ('外贸', csv_foreign,
             lambda r: self.process_row_general(r, '外贸', col_gen),
             self.columns_general, True, ('外贸', '外贸总表')),
            ('外贸明细', csv_foreign,
             lambda r: self.process_row_detail(r, col_det),
             self.columns_detail, False, ('外贸明细',)),
            ('内贸', csv_domestic,
             lambda r: self.process_row_general(r, '内贸', col_gen),
             self.columns_domestic_general, True, ('内贸', '内贸总表')),
            ('内贸明细', csv_domestic,
             lambda r: self.process_row_detail(r, col_det),
             self.columns_detail, False, ('内贸明细',)),
            ('OM合同', csv_om,
             lambda r: self.process_row_om(r),
             self.columns_om, True, ('OM合同', '其他')),
        ]

        result_dfs = {}
        for sheet, frame, row_fn, columns, dedupe, old_names in plan:
            new_rows = build(frame, row_fn, columns, dedupe)
            result_dfs[sheet] = self._merge_sheet(
                old_sheet(columns, *old_names), new_rows, '合同编号', columns)
        return result_dfs

    def apply_formatting_to_all(self, data_dict):
        """Format money and percentage columns of every non-empty sheet in place.

        Returns the same *data_dict* for convenience.
        """
        for df in data_dict.values():
            if df.empty:
                continue
            for col in self.money_cols:
                if col in df.columns:
                    df[col] = df[col].apply(self.format_money_str)
            for col in self.percent_cols:
                if col in df.columns:
                    df[col] = df[col].apply(self.format_percent_str)
        return data_dict
# ==========================================
# Part 2: GUI
# ==========================================
class ContractApp:
    """Tkinter front end: pick the files, preview the merged sheets, save to Excel.

    ``final_data`` maps ``'ASD'`` / ``'NonASD'`` to the sheet dictionaries
    produced by ``DataProcessor.merge_datasets``; the preview trees and
    ``save_files`` both read from it.
    """

    # Order in which sheets are previewed and written.
    SHEET_ORDER = ('外贸', '外贸明细', '内贸', '内贸明细', 'OM合同')

    def __init__(self, root):
        self.root = root
        self.root.title("合同数据处理系统 V3.2 (表头修正版)")
        self.root.geometry("1300x850")

        # --- look and feel ---
        self.style = ttk.Style()
        self.style.theme_use('clam')
        self.colors = {'bg': '#F5F6FA', 'primary': '#409EFF', 'success': '#67C23A', 'warning': '#E6A23C',
                       'text': '#2C3E50', 'panel': '#FFFFFF'}
        self.root.configure(bg=self.colors['bg'])
        self.default_font = ("微软雅黑", 10)
        self.header_font = ("微软雅黑", 11, "bold")
        self.style.configure("TFrame", background=self.colors['bg'])
        self.style.configure("Panel.TFrame", background=self.colors['panel'], relief="flat")
        self.style.configure("TLabel", background=self.colors['panel'], foreground=self.colors['text'],
                             font=self.default_font)
        self.style.configure("Header.TLabel", font=("微软雅黑", 16, "bold"), background=self.colors['bg'],
                             foreground=self.colors['text'])
        self.style.configure("TButton", font=("微软雅黑", 10), borderwidth=0, padding=6)
        self.style.map("TButton", background=[('active', '#E0E0E0')])
        self.style.configure("Primary.TButton", background=self.colors['primary'], foreground='white')
        self.style.map("Primary.TButton", background=[('active', '#66B1FF')])
        self.style.configure("Success.TButton", background=self.colors['success'], foreground='white')
        self.style.map("Success.TButton", background=[('active', '#85CE61')])
        self.style.configure("Treeview", background="white", foreground="black", fieldbackground="white",
                             rowheight=28, font=("微软雅黑", 9))
        self.style.configure("Treeview.Heading", font=("微软雅黑", 10, "bold"), background="#EBEEF5",
                             foreground="#606266")
        self.style.map("Treeview", background=[('selected', '#409EFF')])

        # --- state ---
        self.processor = DataProcessor()
        self.csv_path = tk.StringVar()
        self.asd_path = tk.StringVar()
        self.non_asd_path = tk.StringVar()
        self.final_data = {}
        self.create_widgets()

    # ------------------------------------------------------------------
    # Layout
    # ------------------------------------------------------------------
    def create_widgets(self):
        """Build the header, the file-selection panel, the preview notebook and the bottom bar."""
        header_frame = ttk.Frame(self.root)
        header_frame.pack(fill="x", padx=20, pady=(20, 10))
        ttk.Label(header_frame, text="📄 合同数据处理工具 (支持 OM合同)", style="Header.TLabel").pack(side="left")

        input_panel = ttk.Frame(self.root, style="Panel.TFrame", padding=20)
        input_panel.pack(fill="x", padx=20, pady=5)
        ttk.Label(
            input_panel,
            text="文件配置 (若未选择旧文件,将自动生成新文件)",
            font=self.header_font,
        ).grid(row=0, column=0, columnspan=3, sticky="w", pady=(0, 15))
        self.create_file_row(input_panel, "📂 导入 CSV 源文件:", self.csv_path, 1)
        self.create_file_row(input_panel, "📘 旧 ASD Excel 文件:", self.asd_path, 2)
        self.create_file_row(input_panel, "📗 旧 非ASD Excel 文件:", self.non_asd_path, 3)
        btn_frame = ttk.Frame(input_panel, style="Panel.TFrame")
        btn_frame.grid(row=4, column=0, columnspan=3, pady=(15, 0), sticky="e")
        ttk.Button(btn_frame, text="▶ 开始处理并预览", style="Primary.TButton",
                   command=self.process_files).pack(side="right")

        self.notebook = ttk.Notebook(self.root)
        self.notebook.pack(fill="both", expand=True, padx=20, pady=10)

        bottom_bar = ttk.Frame(self.root, style="Panel.TFrame", padding=15)
        bottom_bar.pack(fill="x", padx=20, pady=(0, 20))
        legend_frame = ttk.Frame(bottom_bar, style="Panel.TFrame")
        legend_frame.pack(side="left")
        self.create_legend(legend_frame, "■ 新增数据", "#FFFFCC", "black")
        self.create_legend(legend_frame, "■ 有修改/变动", "#ECF5FF", "#409EFF")
        self.create_legend(legend_frame, "□ 无变动", "white", "black")
        ttk.Button(bottom_bar, text="💾 保存更改至 Excel", style="Success.TButton",
                   command=self.save_files).pack(side="right")

    def create_file_row(self, parent, label_text, var, row_idx):
        """Add one "label / path entry / browse button" row bound to *var*."""
        ttk.Label(parent, text=label_text, width=20).grid(row=row_idx, column=0, sticky="w", pady=5)
        entry = ttk.Entry(parent, textvariable=var, font=("微软雅黑", 9))
        entry.grid(row=row_idx, column=1, sticky="ew", padx=10, pady=5)
        ttk.Button(parent, text="浏览", command=lambda: self.browse_file(var)).grid(
            row=row_idx, column=2, padx=5)
        parent.columnconfigure(1, weight=1)

    def create_legend(self, parent, text, bg_color, fg_color):
        """Add one coloured legend chip to *parent*."""
        lbl = tk.Label(parent, text=text, bg=bg_color, fg=fg_color, font=("微软雅黑", 9), padx=8, pady=3,
                       borderwidth=1, relief="solid")
        lbl.pack(side="left", padx=5)

    def browse_file(self, variable):
        """Ask for a CSV/Excel file and store the chosen path in *variable*."""
        # A tuple of patterns is portable; a ";"-joined string only works on Windows.
        f = filedialog.askopenfilename(filetypes=[("Excel/CSV Files", ("*.csv", "*.xlsx"))])
        if f:
            variable.set(f)

    # ------------------------------------------------------------------
    # Loading / processing
    # ------------------------------------------------------------------
    def _standard_columns(self, sheet_name):
        """Return the standard header list for *sheet_name* ([] for unknown sheets)."""
        if sheet_name == '外贸':
            return self.processor.columns_general
        if sheet_name == '内贸':
            return self.processor.columns_domestic_general
        if sheet_name == 'OM合同':
            return self.processor.columns_om
        if '明细' in sheet_name:
            return self.processor.columns_detail
        return []

    def load_excel_safe(self, path):
        """Load every sheet of an old workbook, normalising its headers.

        Returns ``{sheet name: DataFrame}``, or ``{}`` when *path* is empty,
        does not exist, or cannot be read (a warning dialog is shown then).
        """
        if not path or not os.path.exists(path):
            return {}
        try:
            dfs = pd.read_excel(path, sheet_name=None)
            clean_dfs = {}
            for raw_name, sheet_df in dfs.items():
                sheet_name = raw_name.strip()
                # Header cells sometimes carry stray whitespace / line breaks.
                sheet_df.columns = sheet_df.columns.astype(str).str.replace(r'\s+', '', regex=True)
                # Legacy names are mapped to current ones, except where the "legacy"
                # name is itself a standard header of this sheet: the detail sheets
                # really do use "外币币种" and "折扣率(%)".
                standard = set(self._standard_columns(sheet_name))
                renames = {old: new for old, new in self.processor.legacy_map.items()
                           if old not in standard}
                sheet_df = sheet_df.rename(columns=renames)
                sheet_df = sheet_df.loc[:, ~sheet_df.columns.duplicated()].copy()
                if '合同编号' in sheet_df.columns:
                    sheet_df['合同编号'] = sheet_df['合同编号'].astype(str)
                clean_dfs[sheet_name] = sheet_df
            return clean_dfs
        except Exception as e:
            messagebox.showwarning("读取错误", f"读取旧文件失败: {path}\n错误: {str(e)}")
            return {}

    def process_files(self):
        """Load the CSV and both old workbooks, merge, format, and refresh the preview."""
        if not self.csv_path.get():
            messagebox.showerror("提示", "请先选择 CSV 源文件!")
            return
        csv_df, headers = self.processor.load_csv(self.csv_path.get())
        if csv_df is None:
            # On failure the second element is the error message.
            messagebox.showerror("错误", headers)
            return

        self.final_data = {}
        asd_old = self.load_excel_safe(self.asd_path.get())
        self.final_data['ASD'] = self.processor.merge_datasets(asd_old, csv_df, True)
        non_old = self.load_excel_safe(self.non_asd_path.get())
        self.final_data['NonASD'] = self.processor.merge_datasets(non_old, csv_df, False)
        for key in ('ASD', 'NonASD'):
            self.final_data[key] = self.processor.apply_formatting_to_all(self.final_data[key])

        self.refresh_preview()
        messagebox.showinfo("完成", "数据处理完成!\n请查看预览,确认无误后点击下方保存。")

    # ------------------------------------------------------------------
    # Preview
    # ------------------------------------------------------------------
    def refresh_preview(self):
        """Rebuild the preview: one outer tab per workbook, one inner tab per sheet."""
        for tab in self.notebook.tabs():
            self.notebook.forget(tab)

        for file_type in ['ASD', 'NonASD']:
            if file_type not in self.final_data:
                continue
            data_dict = self.final_data[file_type]
            main_frame = ttk.Frame(self.notebook, style="Panel.TFrame")
            self.notebook.add(main_frame, text=f" {file_type} 文件预览 ")
            inner_notebook = ttk.Notebook(main_frame)
            inner_notebook.pack(fill="both", expand=True, padx=5, pady=5)

            for sheet_name in self.SHEET_ORDER:
                if sheet_name not in data_dict:
                    continue
                source_df = data_dict[sheet_name]
                view_df = source_df
                if not source_df.empty and '合同编号' in source_df.columns:
                    source_df['合同编号'] = source_df['合同编号'].astype(str)
                    # sort_values keeps the index labels, so rows of the sorted
                    # view can still be traced back to source_df.
                    view_df = source_df.sort_values(by='合同编号', ascending=True)
                self.create_treeview(inner_notebook, view_df, sheet_name,
                                     self._standard_columns(sheet_name), source_df=source_df)

    def create_treeview(self, parent, df, title, target_cols, source_df=None):
        """Add a tab showing *df* restricted to *target_cols*.

        Args:
            parent: notebook receiving the new tab.
            df: rows to display, in display order.
            title: tab title (the sheet name).
            target_cols: columns to show.
            source_df: DataFrame that cell edits are written back to; rows are
                matched by index label. Defaults to *df*.
        """
        frame = ttk.Frame(parent)
        parent.add(frame, text=title)
        scroll_y = ttk.Scrollbar(frame, orient="vertical")
        scroll_x = ttk.Scrollbar(frame, orient="horizontal")
        display_cols = target_cols
        tree = ttk.Treeview(frame, columns=display_cols, show='headings',
                            yscrollcommand=scroll_y.set, xscrollcommand=scroll_x.set)
        scroll_y.config(command=tree.yview)
        scroll_x.config(command=tree.xview)
        scroll_y.pack(side="right", fill="y")
        scroll_x.pack(side="bottom", fill="x")
        tree.pack(fill="both", expand=True)

        for col in display_cols:
            tree.heading(col, text=col)
            tree.column(col, width=120, anchor="center")
        tree.tag_configure('new', background='#FFFFCC')
        tree.tag_configure('modified', background='#ECF5FF', foreground='#409EFF')

        # Tree item id -> DataFrame index label, used by on_double_click().
        tree._row_labels = {}
        is_detail = '明细' in title
        if not df.empty:
            df_display = df.fillna("")
            last_contract_id = None
            for idx, row in df_display.iterrows():
                values = []
                for col in display_cols:
                    val = row.get(col, "")
                    # On detail sheets show the contract subject only on the
                    # first row of each contract.
                    if is_detail and col == '合同标的' and row.get('合同编号', '') == last_contract_id:
                        val = ""
                    values.append(val)
                if is_detail:
                    last_contract_id = row.get('合同编号', '')
                status = row.get('_status', '')
                item_id = tree.insert("", "end", values=values, tags=(status,))
                tree._row_labels[item_id] = idx

        edit_target = source_df if source_df is not None else df
        tree.bind("<Double-1>", lambda event: self.on_double_click(event, tree, edit_target))

    def on_double_click(self, event, tree, df):
        """Let the user edit the double-clicked cell; update the tree and *df*."""
        if tree.identify("region", event.x, event.y) != "cell":
            return
        column = tree.identify_column(event.x)
        row_id = tree.identify_row(event.y)
        if not column or not row_id:
            return
        col_idx = int(column.replace('#', '')) - 1
        col_name = tree['columns'][col_idx]
        current_values = list(tree.item(row_id, "values"))
        new_val = simpledialog.askstring("快速编辑", f"修改 [{col_name}]:",
                                         initialvalue=current_values[col_idx], parent=self.root)
        if new_val is None:
            return  # dialog cancelled

        current_values[col_idx] = new_val
        tree.item(row_id, values=current_values)

        # Persist the edit so that save_files() writes it out.
        missing = object()
        label = getattr(tree, '_row_labels', {}).get(row_id, missing)
        if df is None or label is missing:
            return
        if col_name in df.columns and label in df.index:
            if df[col_name].dtype != object:
                # The dialog returns text; upcast so it can be stored in a numeric column.
                df[col_name] = df[col_name].astype(object)
            df.at[label, col_name] = new_val

    # ------------------------------------------------------------------
    # Saving
    # ------------------------------------------------------------------
    def save_files(self):
        """Write each workbook in ``final_data`` to its Excel file.

        The target is the selected old workbook, or ``ASD_Combined.xlsx`` /
        ``NonASD_Combined.xlsx`` next to the CSV when none was selected.
        """
        if not self.final_data:
            return
        base_dir = os.path.dirname(self.csv_path.get()) if self.csv_path.get() else ""
        targets = {
            'ASD': (self.asd_path, "ASD_Combined.xlsx"),
            'NonASD': (self.non_asd_path, "NonASD_Combined.xlsx"),
        }
        saved_paths = []
        try:
            for file_type, sheets in self.final_data.items():
                if file_type not in targets:
                    continue
                path_var, default_name = targets[file_type]
                target_path = path_var.get() or os.path.join(base_dir, default_name)
                with pd.ExcelWriter(target_path, engine='openpyxl') as writer:
                    for sheet_name in self.SHEET_ORDER:
                        if sheet_name not in sheets:
                            continue
                        save_df = sheets[sheet_name].drop(columns=['_status'], errors='ignore')
                        if not save_df.empty and '合同编号' in save_df.columns:
                            save_df['合同编号'] = save_df['合同编号'].astype(str)
                            save_df = save_df.sort_values(by='合同编号', ascending=True)
                            if '明细' in sheet_name and '合同标的' in save_df.columns:
                                # Keep the contract subject only on the first row of each contract.
                                mask = save_df.duplicated(subset=['合同编号'], keep='first')
                                if save_df['合同标的'].dtype != object:
                                    save_df['合同标的'] = save_df['合同标的'].astype(object)
                                save_df.loc[mask, '合同标的'] = ""
                        save_df.to_excel(writer, sheet_name=sheet_name, index=False)
                saved_paths.append(target_path)
            # Report the files actually written; they need not be in the CSV's directory.
            messagebox.showinfo("成功", "文件保存成功!\n位置:\n" + "\n".join(saved_paths))
        except PermissionError:
            messagebox.showerror("保存失败", "文件被占用!\n请先关闭 Excel 文件后再点击保存。")
        except Exception as e:
            messagebox.showerror("保存失败", str(e))
if __name__ == "__main__":
    # Script entry point: create the Tk root window, attach the application
    # to it and hand control to the Tk event loop.
    root = tk.Tk()
    app = ContractApp(root)
    root.mainloop()