Files
Contract-document-crawling-…/页面.py
2026-01-21 15:24:12 +08:00

965 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import tkinter as tk
from tkinter import ttk, filedialog, messagebox, simpledialog
import os
import numpy as np
import re
from datetime import datetime
# ==========================================
# 第一部分:业务逻辑核心
# ==========================================
class DataProcessor:
    """Business-logic core: turns the exported contract CSV into sheet tables.

    The class owns the standard column layout of every output sheet, the cell
    formatters (money / percent / date), the parsers for the free-text buyer
    and subject cells, and the merge of freshly built rows into the tables
    loaded from a previous Excel workbook.
    """

    # Source CSV columns that are read in more than one place.
    _ORDER_NO_COL = '合同订单编号'
    _SUBJECT_COL = '合同标的(品名/型号/数量/单价/总价)'
    _BUYER_COL = '合同买方(名称/联系人/电话/邮箱)'
    _AGENT_COL = '进口代理(名称/USCI/地址/联系人/电话/邮箱)'

    # "<number><optional unit>" in the quantity part of a slash-separated subject.
    _QTY_UNIT_RE = re.compile(r'^(\d+(\.\d+)?)\s*([\u4e00-\u9fa5a-zA-Z]+)?$')
    # Labelled product name; both the ASCII and the full-width colon are accepted.
    _NAME_LABEL_RE = re.compile(
        r'(?:中文品名|中文名称|名称|Name)[::]\s*(.*?)(?:\n|$)', re.IGNORECASE)
    _NUMBER_RE = re.compile(r'\d+(?:\.\d+)?')

    def __init__(self):
        """Define the standard column lists and the header lookup tables."""
        # ---- 1. Foreign-trade general sheet headers ----
        self.cols_asd_foreign_general = [
            "合同编号", "签署公司", "外贸合同号", "收款情况", "合同签订日期", "销售员",
            "最终用户单位", "最终用户信息\n联系人、电话、邮箱", "最终用户所在地",
            "厂家", "型号/货号", "合同标的", "数量", "单位", "币种", "折扣率",
            "合同额", "总合同额", "外购", "已收款", "未收款", "收款日期",
            "最晚发货期", "付款方式", "发货港", "目的港", "发货日期",
            "买方单位", "买方信息\n联系人、电话、邮箱", "收货人信息",
            "转为美元净合同额", "转为美元总合同额"
        ]
        self.cols_nonasd_foreign_general = [
            "合同编号", "签署公司", "外贸合同号", "收款情况", "合同签订日期", "销售员",
            "最终用户单位", "最终用户信息\n联系人、电话、邮箱", "最终用户所在地",
            "厂家", "型号/货号", "合同标的", "数量", "单位", "币种", "折扣率",
            "合同额", "总合同额", "外购", "已收款", "未收款", "收款日期",
            "最晚发货期", "付款方式", "发货港", "目的港", "发货日期",
            "买方单位", "买方信息\n联系人、电话、邮箱", "收货人信息",
            "合同币种/美元", "转为美元净合同额", "转为美元总合同额"
        ]
        # ---- 2. Domestic-trade general sheet headers ----
        self.cols_domestic_general = [
            "合同编号", "签署公司", "内贸合同号", "收款情况", "签订日期", "销售员",
            "最终用户单位", "最终用户信息\n联系人、电话、邮箱", "最终用户所在地",
            "买方单位", "买方信息\n联系人、电话、邮箱",
            "厂家", "型号", "合同标的", "数量", "单位", "折扣率(%)",
            "合同额", "合同总额", "外购", "付款方式", "最晚发货期",
            "已收款", "未收款", "收款日期",
            "转为美元净合同额", "转为美元总合同额"
        ]
        # ---- 3. Foreign-trade detail sheet headers ----
        self.cols_foreign_detail = [
            "合同编号", "销售员", "合同标的", "厂家", "货号", "产品描述", "数量", "单位",
            "币种", "报价单价", "报价总价", "销售单价", "销售总价", "折扣率",
            "外购", "合同币种/美元", "外购转美元", "报价总价美元", "净合同额美元"
        ]
        # ---- 4. Domestic-trade detail sheet headers ----
        self.cols_domestic_detail = [
            "合同编号", "销售员", "合同标的", "厂家", "货号", "产品描述", "数量", "单位",
            "外币币种", "外币报价单价", "报价RMB单价", "报价RMB总价",
            "售价RMB单价", "售价RMB总价", "折扣率(%)", "外购",
            "计算汇率", "外购转美元", "报价总价美元", "净合同额美元"
        ]
        # ---- 5. OM contract sheet headers ----
        self.cols_om = [
            "合同编号", "签署公司", "内贸合同号", "收款情况", "签订日期", "销售员",
            "最终用户单位", "最终用户信息\n联系人、电话、邮箱", "最终用户所在地",
            "买方单位", "买方信息\n联系人、电话、邮箱", "合同标的",
            "合同总额", "已收款", "未收款", "收款日期"
        ]
        # Columns meant to be written only on the first (highest-priced) row.
        self.header_only_cols = {
            "总合同额", "合同总额", "外购", "付款方式", "最晚发货期",
            "已收款", "未收款", "收款日期", "收款情况",
            "转为美元净合同额", "转为美元总合同额"
        }
        # Money columns (formatted with two decimals).
        self.money_cols = {
            "合同额", "总合同额", "合同总额", "外购", "已收款", "未收款",
            "净合同额美元", "外购转美元", "报价总价美元",
            "外币报价单价", "报价RMB单价", "报价RMB总价",
            "售价RMB单价", "售价RMB总价", "外购产品金额",
            "转为美元净合同额", "转为美元总合同额", "报价单价", "报价总价", "销售单价", "销售总价"
        }
        # Ratio columns (rendered as percentages).
        self.percent_cols = {
            "折扣率", "折扣率(%)", "计算汇率", "合同币种/美元"
        }
        # Date columns (time-of-day is stripped).
        self.date_cols = {
            "合同签订日期", "签订日期", "收款日期", "最晚发货期", "发货日期"
        }
        # Old header -> current header, used when reading previous workbooks.
        self.legacy_map = {
            "外币币种": "币种",
            "汇率": "计算汇率",
            "折扣率(%)": "折扣率",
            "折扣率(%": "折扣率(%)",
            "合同": "合同额"
        }
        # Whitespace-free header -> standard header (which may contain "\n"),
        # so a header matches whether or not the workbook kept the line break.
        self.standard_col_map = {}
        all_lists = [
            self.cols_asd_foreign_general, self.cols_nonasd_foreign_general,
            self.cols_domestic_general, self.cols_foreign_detail,
            self.cols_domestic_detail, self.cols_om
        ]
        for column_list in all_lists:
            for col in column_list:
                self.standard_col_map[self.clean_header_key(col)] = col

    # ------------------------------------------------------------------
    # Cell helpers
    # ------------------------------------------------------------------
    def clean_header_key(self, text):
        """Return the header with all whitespace (incl. line breaks) removed."""
        if not isinstance(text, str):
            return str(text)
        return re.sub(r'\s+', '', text)

    def _cell_text(self, val):
        """Return a cell as stripped text; None, NaN and the text 'nan' give ''."""
        if val is None or (not isinstance(val, str) and pd.isna(val)):
            return ""
        text = str(val).strip()
        return "" if text.lower() == 'nan' else text

    def _parse_number(self, val):
        """Return val as a float, or None when it cannot be read as a number.

        Thousands separators and the currency signs handled by the original
        code are stripped first; empty text and NaN count as 0.0.
        """
        try:
            if isinstance(val, str):
                val = val.replace(',', '').replace('¥', '').replace('$', '').strip()
                if val == '':
                    return 0.0
            if pd.isna(val):
                return 0.0
            return float(val)
        except (TypeError, ValueError, OverflowError):
            return None

    def safe_float(self, val):
        """Convert val to float, returning 0.0 for anything unparseable."""
        number = self._parse_number(val)
        return 0.0 if number is None else number

    def format_money_str(self, val):
        """Format val with two decimals; blank stays blank, non-numbers are kept.

        Text that is not a number is returned unchanged instead of being
        rewritten to "0.00", so free-text cells survive formatting.
        """
        if pd.isna(val) or str(val).strip() == "":
            return ""
        number = self._parse_number(val)
        if number is None:
            return str(val)
        return "{:.2f}".format(number)

    def format_percent_str(self, val):
        """Format a ratio as a percentage string (0.15 -> "15.00%").

        Values that already contain '%' are returned stripped; text that is
        not a number is returned unchanged.
        """
        if pd.isna(val) or str(val).strip() == "":
            return ""
        text = str(val).strip()
        if '%' in text:
            return text
        number = self._parse_number(val)
        if number is None:
            return str(val)
        return "{:.2f}%".format(number * 100)

    def format_date_str(self, val):
        """Format val as YYYY-MM-DD; unparseable values are returned as text."""
        if pd.isna(val) or str(val).strip() == "":
            return ""
        try:
            parsed = pd.to_datetime(val, errors='coerce')
            if pd.isnull(parsed):
                return str(val).strip()  # could not be parsed: keep as-is
            return parsed.strftime('%Y-%m-%d')
        except (TypeError, ValueError, OverflowError):
            return str(val)

    def normalize_for_compare(self, val):
        """Return a canonical string used to decide whether two cells differ.

        Blank-like values give "", numbers (with ',' or '%' removed) are
        rendered with four decimals, timestamps as YYYY-MM-DD, and everything
        else as stripped text.
        """
        if val is None or pd.isna(val):
            return ""
        if isinstance(val, pd.Timestamp):
            # New rows carry date-only strings; compare on the same footing.
            return val.strftime('%Y-%m-%d')
        text = str(val).strip()
        if text.lower() == 'nan':
            return ""
        try:
            return "{:.4f}".format(float(text.replace(',', '').replace('%', '')))
        except ValueError:
            return text

    # ------------------------------------------------------------------
    # Loading and parsing
    # ------------------------------------------------------------------
    def load_csv(self, file_path):
        """Read the source CSV, trying utf-8, gbk and gb18030 in turn.

        Returns:
            (DataFrame, (general_factory_col, detail_factory_col)) on success,
            or (None, error_message) when the file cannot be read or lacks
            the '厂家' / '合同类型' columns.
        """
        df = None
        for enc in ('utf-8', 'gbk', 'gb18030'):
            try:
                df = pd.read_csv(file_path, encoding=enc)
                break
            except (UnicodeDecodeError, pd.errors.ParserError):
                continue
            except (OSError, pd.errors.EmptyDataError) as exc:
                return None, f"无法读取文件: {exc}"
        if df is None:
            # Last resort: decode leniently so one bad byte does not block the import.
            try:
                df = pd.read_csv(file_path, encoding='gb18030', encoding_errors='replace')
            except Exception:
                return None, "无法读取文件,请检查编码。"
        col_factory_general = '厂家'
        missing = [c for c in (col_factory_general, '合同类型') if c not in df.columns]
        if missing:
            return None, "CSV 缺少必要的列: " + "、".join(missing)
        # pandas names a repeated "厂家" header "厂家.1"; that one is the per-product column.
        col_factory_detail = '厂家.1' if '厂家.1' in df.columns else '厂家'
        df[col_factory_general] = df[col_factory_general].fillna('').astype(str)
        df['合同类型'] = df['合同类型'].fillna('').astype(str)
        return df, (col_factory_general, col_factory_detail)

    def parse_buyer_info(self, text):
        """Split a multi-line buyer cell into name (first line) and contact (rest)."""
        info = {'name': '', 'contact_full': ''}
        if not isinstance(text, str) or not text.strip():
            return info
        lines = [part.strip() for part in text.split('\n') if part.strip()]
        if not lines:
            return info
        info['name'] = lines[0]
        info['contact_full'] = " ".join(lines[1:])
        return info

    def _empty_item(self):
        """Return a subject item with every field blank."""
        return {'name': '', 'model': '', 'qty': '', 'unit': '', 'price': '', 'sort_price': 0.0}

    def parse_single_line_subject(self, text):
        """Parse one subject line into name / model / qty / unit / price.

        Slash-separated lines are read positionally (name/model/qty/price).
        Other lines use a labelled name when present, otherwise the first
        line, and take the last number in the text as the price.
        """
        res = self._empty_item()
        if not isinstance(text, str) or not text.strip():
            return res
        text = text.strip()
        if '/' in text:
            parts = [p.strip() for p in text.split('/')]
            res['name'] = parts[0]
            if len(parts) >= 2:
                res['model'] = parts[1]
            if len(parts) >= 3:
                qty_match = self._QTY_UNIT_RE.match(parts[2])
                if qty_match:
                    res['qty'] = qty_match.group(1)
                    res['unit'] = qty_match.group(3) or ""
                else:
                    res['qty'] = parts[2]
            if len(parts) >= 4:
                res['price'] = parts[3]
                res['sort_price'] = self.safe_float(parts[3])
            return res
        name_match = self._NAME_LABEL_RE.search(text)
        if name_match:
            res['name'] = name_match.group(1).strip()
        else:
            res['name'] = text.split('\n')[0]
        nums = self._NUMBER_RE.findall(text.replace(',', ''))
        if nums:
            res['sort_price'] = self.safe_float(nums[-1])
            res['price'] = nums[-1]
        return res

    def _split_order_no(self, raw):
        """Split an order-number cell into (contract id, remaining tokens)."""
        text = self._cell_text(raw)
        tokens = text.split()
        if not tokens:
            return text, ""
        return tokens[0], " ".join(tokens[1:])

    def _best_subject_item(self, raw_cells):
        """Parse every subject line in raw_cells and return the highest-priced item.

        Ties keep the first item encountered; no lines at all give an empty item.
        """
        items = []
        for raw in raw_cells:
            for line in self._cell_text(raw).split('\n'):
                line = line.strip()
                if line:
                    items.append(self.parse_single_line_subject(line))
        if not items:
            return self._empty_item()
        return max(items, key=lambda item: item['sort_price'])

    def _receipt_amounts(self, total_amount, status):
        """Return (received, unreceived); filled only when status contains '已收'."""
        if '已收' in status:
            return total_amount, self.format_money_str(0)
        return "", ""

    def _buyer_text(self, row, trade_type):
        """Return the raw buyer cell; foreign trade prefers the import agent."""
        buyer = self._cell_text(row.get(self._BUYER_COL, ''))
        if trade_type == '内贸':
            return buyer
        return self._cell_text(row.get(self._AGENT_COL, '')) or buyer

    # ------------------------------------------------------------------
    # General sheets
    # ------------------------------------------------------------------
    def _build_general_row(self, row, trade_type, target_cols, col_factory, best_item):
        """Build one general-sheet row from a CSV row and the chosen subject item."""
        new_row = {col: "" for col in target_cols}
        contract_id, trade_no = self._split_order_no(row.get(self._ORDER_NO_COL, ''))
        new_row['合同编号'] = contract_id
        new_row['外贸合同号' if trade_type == '外贸' else '内贸合同号'] = trade_no
        new_row['签署公司'] = row.get('收款账户', '')

        signed_on = self.format_date_str(row.get('签约日期', ''))
        for date_col in ('合同签订日期', '签订日期'):
            if date_col in new_row:
                new_row[date_col] = signed_on
        new_row['销售员'] = row.get('负责人', '')
        new_row['最终用户单位'] = row.get('客户名称', '')

        buyer = self.parse_buyer_info(self._buyer_text(row, trade_type))
        # These headers contain a line break, so they are matched by substring.
        for col in target_cols:
            if "最终用户信息" in col:
                new_row[col] = row.get('联系人姓名', '')
            if "买方信息" in col:
                new_row[col] = buyer['contact_full']
        new_row['厂家'] = row.get(col_factory, '')
        if '币种' in new_row:
            new_row['币种'] = row.get('货币(选完产品再改)', '')
        if '发货港' in new_row:
            new_row['发货港'] = row.get('发货地', '')
        if '目的港' in new_row:
            new_row['目的港'] = row.get('目的港', '')
        new_row['买方单位'] = buyer['name']
        if '收货人信息' in new_row:
            new_row['收货人信息'] = buyer['name']
        discount_col = '折扣率' if '折扣率' in new_row else '折扣率(%)'
        if discount_col in new_row:
            new_row[discount_col] = self.format_percent_str(row.get('折扣率', ''))

        # Subject fields come from the highest-priced line.
        new_row['合同标的'] = best_item['name']
        if '型号/货号' in new_row:
            new_row['型号/货号'] = best_item['model']
        if '型号' in new_row:
            new_row['型号'] = best_item['model']
        new_row['数量'] = best_item['qty']
        new_row['单位'] = best_item['unit']
        if '合同额' in new_row:
            new_row['合同额'] = self.format_money_str(best_item['price'])

        # Whole-contract financial figures.
        total_amount = self.format_money_str(row.get('合同总额', ''))
        status = self._cell_text(row.get('收款状态', ''))
        received, unreceived = self._receipt_amounts(total_amount, status)
        total_col = '总合同额' if '总合同额' in new_row else '合同总额'
        new_row[total_col] = total_amount
        new_row['收款情况'] = status
        new_row['外购'] = self.format_money_str(row.get('外购产品金额', ''))
        new_row['已收款'] = received
        new_row['未收款'] = unreceived
        new_row['收款日期'] = self.format_date_str(row.get('最新收款日期', ''))
        if '最晚发货期' in new_row:
            new_row['最晚发货期'] = self.format_date_str(row.get('最晚发货期', ''))
        if '付款方式' in new_row:
            new_row['付款方式'] = row.get('付款比例及期限', '')
        if '发货日期' in new_row:
            new_row['发货日期'] = ""  # starts empty
        if '合同币种/美元' in new_row:
            new_row['合同币种/美元'] = row.get('合同币种/美元', '')
        new_row['_sort_price'] = best_item['sort_price']
        return new_row

    def process_row_general_expanded(self, row, trade_type, trade_cols, col_factory):
        """Build the general-sheet row for a single CSV row.

        Args:
            row: one CSV record (anything with a dict-like ``get``).
            trade_type: '外贸' or '内贸'; selects the contract-number column
                and the buyer source.
            trade_cols: standard column list of the target sheet.
            col_factory: name of the CSV column holding the manufacturer.

        Returns:
            A one-element list holding the row dict, which also carries the
            helper key '_sort_price'.
        """
        best_item = self._best_subject_item([row.get(self._SUBJECT_COL, '')])
        return [self._build_general_row(row, trade_type, trade_cols, col_factory, best_item)]

    def generate_general_row_aggregated(self, contract_id, group_df, target_cols, trade_type, is_asd, col_factory):
        """Build one general-sheet row for all CSV rows of a contract.

        Contract-level fields are taken from the first row of group_df; the
        subject fields come from the highest-priced subject line found in any
        row of the group. contract_id and is_asd are accepted for interface
        compatibility and are not used.
        """
        best_item = self._best_subject_item(
            row.get(self._SUBJECT_COL, '') for _, row in group_df.iterrows())
        return self._build_general_row(
            group_df.iloc[0], trade_type, target_cols, col_factory, best_item)

    # ------------------------------------------------------------------
    # Detail sheets
    # ------------------------------------------------------------------
    def process_row_detail(self, row, col_factory, trade_type):
        """Build one detail-sheet row (as a Series) from a CSV product row."""
        if trade_type == '外贸':
            target_cols = self.cols_foreign_detail
        else:
            target_cols = self.cols_domestic_detail
        new_row = {col: "" for col in target_cols}
        manufacturer = self._cell_text(row.get(col_factory, ''))
        new_row['合同编号'] = self._split_order_no(row.get(self._ORDER_NO_COL, ''))[0]
        new_row['销售员'] = row.get('负责人', '')
        new_row['厂家'] = manufacturer
        new_row['货号'] = row.get('产品编码', '')
        currency_col = '币种' if trade_type == '外贸' else '外币币种'
        new_row[currency_col] = row.get('原币种', '')

        # Only the product name of the first subject line is shown on detail rows.
        subject = self._cell_text(row.get(self._SUBJECT_COL, ''))
        separator = '/' if '/' in subject else '\n'
        new_row['合同标的'] = subject.split(separator)[0].strip()

        qty = self._cell_text(row.get('数量', ''))
        if qty:
            new_row['数量'] = qty

        if '外购' in manufacturer:
            # Outsourced product: the subtotal is the outsourced amount and the
            # description falls back from the remark to the outsourcing detail.
            new_row['外购'] = self.format_money_str(self.safe_float(row.get('产品小计', 0)))
            remark = self._cell_text(row.get('备注', ''))
            new_row['产品描述'] = remark or self._cell_text(row.get('外购产品明细', ''))
        else:
            new_row['外购'] = ""
            new_row['产品描述'] = row.get('产品名称', '')

        money, percent = self.format_money_str, self.format_percent_str
        # (target column, source column, formatter); applied only where the
        # target column exists in this sheet's layout.
        optional_fields = (
            ('外币报价单价', '美元报价', money),
            ('报价单价', '美元报价', money),
            ('报价RMB总价', '产品小计', money),
            ('报价总价', '产品小计', money),
            ('计算汇率', '汇率', percent),
            ('合同币种/美元', '汇率', percent),
            ('售价RMB单价', '销售单价', money),
            ('销售单价', '销售单价', money),
            ('售价RMB总价', '销售总价', money),
            ('销售总价', '销售总价', money),
            ('报价RMB单价', '报价RMB单价', money),
        )
        for target, source, formatter in optional_fields:
            if target in new_row:
                new_row[target] = formatter(row.get(source, ''))
        discount_col = '折扣率' if '折扣率' in new_row else '折扣率(%)'
        if discount_col in new_row:
            new_row[discount_col] = percent(row.get('折扣率', ''))
        for usd_col in ('外购转美元', '报价总价美元', '净合同额美元'):
            new_row[usd_col] = money(row.get(usd_col, ''))
        return pd.Series(new_row)

    # ------------------------------------------------------------------
    # OM sheet
    # ------------------------------------------------------------------
    def generate_om_row_aggregated(self, contract_id, group_df, target_cols):
        """Build one OM-sheet row for all CSV rows of a contract.

        Contract-level fields come from the first row of group_df and the
        subject name from the highest-priced subject line in the group.
        contract_id is accepted for interface compatibility and is not used.
        """
        first_row = group_df.iloc[0]
        best_item = self._best_subject_item(
            row.get(self._SUBJECT_COL, '') for _, row in group_df.iterrows())

        new_row = {col: "" for col in target_cols}
        contract_no, trade_no = self._split_order_no(first_row.get(self._ORDER_NO_COL, ''))
        new_row['合同编号'] = contract_no
        new_row['内贸合同号'] = trade_no

        total_amount = self.format_money_str(first_row.get('合同总额', ''))
        status = self._cell_text(first_row.get('收款状态', ''))
        received, unreceived = self._receipt_amounts(total_amount, status)

        new_row['签署公司'] = first_row.get('收款账户', '')
        new_row['签订日期'] = self.format_date_str(first_row.get('签约日期', ''))
        new_row['销售员'] = first_row.get('负责人', '')
        new_row['最终用户单位'] = first_row.get('客户名称', '')
        contact_col = '最终用户信息\n联系人、电话、邮箱'
        if contact_col in new_row:
            new_row[contact_col] = first_row.get('联系人姓名', '')
        buyer = self.parse_buyer_info(self._cell_text(first_row.get(self._BUYER_COL, '')))
        new_row['买方单位'] = buyer['name']
        buyer_info_col = '买方信息\n联系人、电话、邮箱'
        if buyer_info_col in new_row:
            new_row[buyer_info_col] = buyer['contact_full']
        new_row['收款日期'] = self.format_date_str(first_row.get('最新收款日期', ''))
        new_row['合同标的'] = best_item['name']
        new_row['_sort_price'] = best_item['sort_price']
        new_row['合同总额'] = total_amount
        new_row['收款情况'] = status
        new_row['已收款'] = received
        new_row['未收款'] = unreceived
        return new_row

    # ------------------------------------------------------------------
    # Merge
    # ------------------------------------------------------------------
    def _merge_sheet(self, old_df, new_rows, unique_col, target_columns):
        """Merge freshly built rows into the sheet loaded from the old workbook.

        Contracts not present in old_df are appended with _status 'new'. For a
        contract that already exists, the first old row is compared column by
        column with the first new row: non-blank new values that differ
        replace the old ones and the row gets _status 'modified'. Blank new
        values (including NaN) never overwrite existing data.
        """
        if old_df is None or old_df.empty:
            if not new_rows:
                return pd.DataFrame(columns=target_columns + ['_status'])
            combined = pd.DataFrame(new_rows)
            combined['_status'] = 'new'
            return combined

        # Object dtype lets formatted strings be written into columns that
        # Excel stored as numbers without relying on implicit upcasting.
        combined = old_df.astype(object)
        for col in target_columns:
            if col not in combined.columns:
                combined[col] = ""
        if '_sort_price' not in combined.columns:
            combined['_sort_price'] = 0.0
        if unique_col in combined.columns:
            combined[unique_col] = combined[unique_col].astype(str)
        if '_status' not in combined.columns:
            combined['_status'] = ''
        if not new_rows:
            return combined

        new_df = pd.DataFrame(new_rows)
        if unique_col in new_df.columns:
            new_df[unique_col] = new_df[unique_col].astype(str)

        appended = []
        for cid in new_df[unique_col].unique():
            incoming = new_df[new_df[unique_col] == cid]
            matches = combined.index[combined[unique_col] == cid]
            if len(matches) == 0:
                fresh = incoming.copy()
                fresh['_status'] = 'new'
                appended.append(fresh)
                continue
            target_idx = matches[0]
            first_new = incoming.iloc[0]
            changed = False
            for col in target_columns:
                if col not in first_new:
                    continue
                new_val = first_new[col]
                new_key = self.normalize_for_compare(new_val)
                if new_key == "":
                    continue  # nothing incoming: keep what the old sheet has
                if self.normalize_for_compare(combined.at[target_idx, col]) != new_key:
                    combined.at[target_idx, col] = new_val
                    changed = True
            if '_sort_price' in first_new:
                combined.at[target_idx, '_sort_price'] = first_new['_sort_price']
            if changed:
                combined.at[target_idx, '_status'] = 'modified'
        if appended:
            combined = pd.concat([combined] + appended, ignore_index=True)
        return combined

    def merge_datasets(self, old_dfs, csv_df, is_asd):
        """Build all five sheets for one output file and merge them with old data.

        Args:
            old_dfs: sheet name -> DataFrame loaded from the previous workbook
                (may be empty).
            csv_df: the source CSV as returned by ``load_csv``.
            is_asd: True keeps only rows whose '厂家' contains 'ASD'
                (case-insensitive); False keeps the others.

        Returns:
            dict with keys '外贸', '外贸明细', '内贸', '内贸明细', 'OM合同'.
        """
        col_gen = '厂家'
        col_det = '厂家.1' if '厂家.1' in csv_df.columns else '厂家'
        asd_mask = csv_df[col_gen].str.contains('ASD', case=False, na=False)
        df_subset = csv_df[asd_mask] if is_asd else csv_df[~asd_mask]
        csv_foreign = df_subset[df_subset['合同类型'] == '外贸'].copy()
        csv_domestic = df_subset[df_subset['合同类型'] == '内贸'].copy()
        # Everything that is neither foreign nor domestic goes to the OM sheet.
        csv_om = df_subset[~df_subset['合同类型'].isin(['外贸', '内贸'])].copy()

        def general_rows(csv_part, trade_type, target_cols):
            """One aggregated general row per contract order number."""
            if csv_part.empty:
                return []
            return [
                self.generate_general_row_aggregated(cid, group, target_cols, trade_type, is_asd, col_gen)
                for cid, group in csv_part.groupby(self._ORDER_NO_COL)
            ]

        def detail_rows(csv_part, trade_type):
            """One detail row per CSV row."""
            if csv_part.empty:
                return []
            frame = csv_part.apply(lambda r: self.process_row_detail(r, col_det, trade_type), axis=1)
            return frame.to_dict('records')

        def old_sheet(name, fallback_name, columns):
            """Old sheet under its current or former name, else an empty frame."""
            empty = pd.DataFrame(columns=columns)
            if fallback_name is None:
                return old_dfs.get(name, empty)
            return old_dfs.get(name, old_dfs.get(fallback_name, empty))

        result_dfs = {}
        foreign_cols = self.cols_asd_foreign_general if is_asd else self.cols_nonasd_foreign_general

        # 1. Foreign-trade general sheet (aggregated per contract).
        result_dfs['外贸'] = self._merge_sheet(
            old_sheet('外贸', '外贸总表', foreign_cols),
            general_rows(csv_foreign, '外贸', foreign_cols),
            '合同编号', foreign_cols)
        # 2. Foreign-trade detail sheet.
        result_dfs['外贸明细'] = self._merge_sheet(
            old_sheet('外贸明细', None, self.cols_foreign_detail),
            detail_rows(csv_foreign, '外贸'),
            '合同编号', self.cols_foreign_detail)
        # 3. Domestic-trade general sheet (aggregated per contract).
        result_dfs['内贸'] = self._merge_sheet(
            old_sheet('内贸', '内贸总表', self.cols_domestic_general),
            general_rows(csv_domestic, '内贸', self.cols_domestic_general),
            '合同编号', self.cols_domestic_general)
        # 4. Domestic-trade detail sheet.
        result_dfs['内贸明细'] = self._merge_sheet(
            old_sheet('内贸明细', None, self.cols_domestic_detail),
            detail_rows(csv_domestic, '内贸'),
            '合同编号', self.cols_domestic_detail)
        # 5. OM sheet (aggregated per contract).
        om_rows = []
        if not csv_om.empty:
            om_rows = [
                self.generate_om_row_aggregated(cid, group, self.cols_om)
                for cid, group in csv_om.groupby(self._ORDER_NO_COL)
            ]
        result_dfs['OM合同'] = self._merge_sheet(
            old_sheet('OM合同', '其他', self.cols_om), om_rows, '合同编号', self.cols_om)
        return result_dfs

    def apply_formatting_to_all(self, data_dict):
        """Apply the money / percent / date formatters to every sheet in place.

        Returns the same dict for convenience.
        """
        formatters = (
            (self.money_cols, self.format_money_str),
            (self.percent_cols, self.format_percent_str),
            (self.date_cols, self.format_date_str),
        )
        for df in data_dict.values():
            if df.empty:
                continue
            for columns, formatter in formatters:
                for col in columns:
                    if col in df.columns:
                        df[col] = df[col].apply(formatter)
        return data_dict
# ==========================================
# 第二部分:GUI 界面
# ==========================================
class ContractApp:
    """Tkinter front end: pick the files, preview the merged sheets, save to Excel."""

    # Sheets shown in the preview and written to the workbook, in this order.
    SHEET_ORDER = ('外贸', '外贸明细', '内贸', '内贸明细', 'OM合同')
    # Bookkeeping columns that are never written to Excel.
    _INTERNAL_COLS = ['_status', '_sort_price']

    def __init__(self, root):
        """Configure the window, the ttk styles and the application state."""
        self.root = root
        self.root.title("合同数据处理系统 V3.8 (换行符修复版)")
        self.root.geometry("1300x850")
        self.style = ttk.Style()
        self.style.theme_use('clam')
        self.colors = {'bg': '#F5F6FA', 'primary': '#409EFF', 'success': '#67C23A', 'warning': '#E6A23C',
                       'text': '#2C3E50', 'panel': '#FFFFFF'}
        self.root.configure(bg=self.colors['bg'])
        self.default_font = ("微软雅黑", 10)
        self.header_font = ("微软雅黑", 11, "bold")

        style = self.style
        style.configure("TFrame", background=self.colors['bg'])
        style.configure("Panel.TFrame", background=self.colors['panel'], relief="flat")
        style.configure("TLabel", background=self.colors['panel'], foreground=self.colors['text'],
                        font=self.default_font)
        style.configure("Header.TLabel", font=("微软雅黑", 16, "bold"), background=self.colors['bg'],
                        foreground=self.colors['text'])
        style.configure("TButton", font=("微软雅黑", 10), borderwidth=0, padding=6)
        style.map("TButton", background=[('active', '#E0E0E0')])
        style.configure("Primary.TButton", background=self.colors['primary'], foreground='white')
        style.map("Primary.TButton", background=[('active', '#66B1FF')])
        style.configure("Success.TButton", background=self.colors['success'], foreground='white')
        style.map("Success.TButton", background=[('active', '#85CE61')])
        style.configure("Treeview", background="white", foreground="black", fieldbackground="white", rowheight=28,
                        font=("微软雅黑", 9))
        style.configure("Treeview.Heading", font=("微软雅黑", 10, "bold"), background="#EBEEF5",
                        foreground="#606266")
        style.map("Treeview", background=[('selected', '#409EFF')])

        self.processor = DataProcessor()
        self.csv_path = tk.StringVar()
        self.asd_path = tk.StringVar()
        self.non_asd_path = tk.StringVar()
        # file type ('ASD' / 'NonASD') -> {sheet name -> DataFrame}
        self.final_data = {}
        self.create_widgets()

    # ------------------------------------------------------------------
    # Layout
    # ------------------------------------------------------------------
    def create_widgets(self):
        """Build the header, the file-selection panel, the preview notebook and the bottom bar."""
        header_frame = ttk.Frame(self.root)
        header_frame.pack(fill="x", padx=20, pady=(20, 10))
        ttk.Label(header_frame, text="📄 合同数据处理工具 (支持 OM合同)", style="Header.TLabel").pack(side="left")

        input_panel = ttk.Frame(self.root, style="Panel.TFrame", padding=20)
        input_panel.pack(fill="x", padx=20, pady=5)
        panel_title = ttk.Label(input_panel, text="文件配置 (若未选择旧文件,将自动生成新文件)",
                                font=self.header_font)
        panel_title.grid(row=0, column=0, columnspan=3, sticky="w", pady=(0, 15))
        self.create_file_row(input_panel, "📂 导入 CSV 源文件:", self.csv_path, 1)
        self.create_file_row(input_panel, "📘 旧 ASD Excel 文件:", self.asd_path, 2)
        self.create_file_row(input_panel, "📗 旧 非ASD Excel 文件:", self.non_asd_path, 3)
        btn_frame = ttk.Frame(input_panel, style="Panel.TFrame")
        btn_frame.grid(row=4, column=0, columnspan=3, pady=(15, 0), sticky="e")
        ttk.Button(btn_frame, text="▶ 开始处理并预览", style="Primary.TButton",
                   command=self.process_files).pack(side="right")

        self.notebook = ttk.Notebook(self.root)
        self.notebook.pack(fill="both", expand=True, padx=20, pady=10)

        bottom_bar = ttk.Frame(self.root, style="Panel.TFrame", padding=15)
        bottom_bar.pack(fill="x", padx=20, pady=(0, 20))
        legend_frame = ttk.Frame(bottom_bar, style="Panel.TFrame")
        legend_frame.pack(side="left")
        self.create_legend(legend_frame, "■ 新增数据", "#FFFFCC", "black")
        self.create_legend(legend_frame, "■ 有修改/变动", "#ECF5FF", "#409EFF")
        self.create_legend(legend_frame, "□ 无变动", "white", "black")
        ttk.Button(bottom_bar, text="💾 保存更改至 Excel", style="Success.TButton",
                   command=self.save_files).pack(side="right")

    def create_file_row(self, parent, label_text, var, row_idx):
        """Add a label / entry / browse-button row bound to the path variable var."""
        ttk.Label(parent, text=label_text, width=20).grid(row=row_idx, column=0, sticky="w", pady=5)
        entry = ttk.Entry(parent, textvariable=var, font=("微软雅黑", 9))
        entry.grid(row=row_idx, column=1, sticky="ew", padx=10, pady=5)
        ttk.Button(parent, text="浏览", command=lambda: self.browse_file(var)).grid(row=row_idx, column=2, padx=5)
        parent.columnconfigure(1, weight=1)

    def create_legend(self, parent, text, bg_color, fg_color):
        """Add one coloured legend label to the bottom bar."""
        label = tk.Label(parent, text=text, bg=bg_color, fg=fg_color, font=("微软雅黑", 9), padx=8, pady=3,
                         borderwidth=1, relief="solid")
        label.pack(side="left", padx=5)

    def browse_file(self, variable):
        """Let the user pick a CSV/Excel file and store its path in variable."""
        # A tuple of patterns is portable; a ';'-joined string is not.
        chosen = filedialog.askopenfilename(filetypes=[("Excel/CSV Files", ("*.csv", "*.xlsx"))])
        if chosen:
            variable.set(chosen)

    # ------------------------------------------------------------------
    # Loading and processing
    # ------------------------------------------------------------------
    def _standardize_header(self, col):
        """Map a workbook header to its standard name, or return it unchanged.

        The whitespace-free form is looked up in the standard headers first,
        then the raw and the whitespace-free form in the legacy map.
        """
        key = self.processor.clean_header_key(str(col))
        standard = self.processor.standard_col_map
        if key in standard:
            return standard[key]
        legacy = self.processor.legacy_map
        if col in legacy:
            return legacy[col]
        if key in legacy:
            return legacy[key]
        return col

    def load_excel_safe(self, path):
        """Read every sheet of an old workbook with headers standardised.

        Returns {} when path is empty, does not exist, or cannot be read (a
        warning dialog is shown in the last case).
        """
        if not path or not os.path.exists(path):
            return {}
        try:
            sheets = pd.read_excel(path, sheet_name=None)
            cleaned = {}
            for sheet_name, frame in sheets.items():
                frame.columns = [self._standardize_header(col) for col in frame.columns]
                # Renaming can produce duplicate headers; keep the first of each.
                frame = frame.loc[:, ~frame.columns.duplicated()].copy()
                if '合同编号' in frame.columns:
                    frame['合同编号'] = frame['合同编号'].astype(str)
                cleaned[sheet_name.strip()] = frame
            return cleaned
        except Exception as e:
            messagebox.showwarning("读取错误", f"读取旧文件失败: {path}\n错误: {str(e)}")
            return {}

    def process_files(self):
        """Load the CSV, merge it with both old workbooks and refresh the preview."""
        csv_path = self.csv_path.get()
        if not csv_path:
            messagebox.showerror("提示", "请先选择 CSV 源文件!")
            return
        try:
            csv_df, headers = self.processor.load_csv(csv_path)
            if csv_df is None:
                # On failure the second element carries the error message.
                messagebox.showerror("错误", headers)
                return
            results = {}
            jobs = (('ASD', self.asd_path, True), ('NonASD', self.non_asd_path, False))
            for file_type, path_var, is_asd in jobs:
                old_sheets = self.load_excel_safe(path_var.get())
                merged = self.processor.merge_datasets(old_sheets, csv_df, is_asd)
                results[file_type] = self.processor.apply_formatting_to_all(merged)
        except Exception as exc:
            # Without this the failure would only reach stderr via Tk's callback handler.
            messagebox.showerror("错误", f"数据处理失败: {exc}")
            return
        self.final_data = results
        self.refresh_preview()
        messagebox.showinfo("完成", "数据处理完成!\n请查看预览,确认无误后点击下方保存。")

    # ------------------------------------------------------------------
    # Preview
    # ------------------------------------------------------------------
    def _prepare_sheet(self, df, sheet_name):
        """Return df ordered for output, leaving df itself untouched.

        Rows are sorted by contract number (as text) and, when available, by
        descending '_sort_price'. On detail sheets the subject is blanked on
        every row after the first of each contract. Index labels are kept, so
        rows can be traced back to df.
        """
        if df.empty or '合同编号' not in df.columns:
            return df
        ordered = df.copy()
        ordered['合同编号'] = ordered['合同编号'].astype(str)
        sort_cols, ascending = ['合同编号'], [True]
        if '_sort_price' in ordered.columns:
            sort_cols.append('_sort_price')
            ascending.append(False)
        ordered = ordered.sort_values(by=sort_cols, ascending=ascending)
        if '明细' in sheet_name and '合同标的' in ordered.columns:
            ordered['合同标的'] = ordered['合同标的'].astype(object)
            repeated = ordered.duplicated(subset=['合同编号'], keep='first')
            ordered.loc[repeated, '合同标的'] = ""
        return ordered

    def _standard_columns(self, sheet_name, is_asd):
        """Return the standard column list for a sheet ([] for unknown names)."""
        proc = self.processor
        layouts = {
            '外贸': proc.cols_asd_foreign_general if is_asd else proc.cols_nonasd_foreign_general,
            '内贸': proc.cols_domestic_general,
            'OM合同': proc.cols_om,
            '外贸明细': proc.cols_foreign_detail,
            '内贸明细': proc.cols_domestic_detail,
        }
        return layouts.get(sheet_name, [])

    def refresh_preview(self):
        """Rebuild the preview tabs from self.final_data."""
        for tab_id in self.notebook.tabs():
            self.notebook.forget(tab_id)
            # forget() only removes the tab; destroy the page so it is not kept around.
            self.notebook.nametowidget(tab_id).destroy()
        for file_type in ('ASD', 'NonASD'):
            if file_type not in self.final_data:
                continue
            data_dict = self.final_data[file_type]
            main_frame = ttk.Frame(self.notebook, style="Panel.TFrame")
            self.notebook.add(main_frame, text=f" {file_type} 文件预览 ")
            inner_notebook = ttk.Notebook(main_frame)
            inner_notebook.pack(fill="both", expand=True, padx=5, pady=5)
            for sheet_name in self.SHEET_ORDER:
                if sheet_name not in data_dict:
                    continue
                source_df = data_dict[sheet_name]
                shown = self._prepare_sheet(source_df, sheet_name)
                columns = self._standard_columns(sheet_name, file_type == 'ASD')
                self.create_treeview(inner_notebook, shown, sheet_name, columns, source_df=source_df)

    def create_treeview(self, parent, df, title, target_cols, source_df=None):
        """Add a tab showing df restricted to target_cols.

        Args:
            parent: notebook that receives the new tab.
            df: frame to display (already sorted).
            title: tab title; titles containing '明细' get repeated subjects blanked.
            target_cols: columns to show, in order.
            source_df: frame that receives double-click edits; rows are matched
                by index label. Defaults to df.
        """
        frame = ttk.Frame(parent)
        parent.add(frame, text=title)
        scroll_y = ttk.Scrollbar(frame, orient="vertical")
        scroll_x = ttk.Scrollbar(frame, orient="horizontal")
        display_cols = list(target_cols)  # only the standard columns are shown
        tree = ttk.Treeview(frame, columns=display_cols, show='headings',
                            yscrollcommand=scroll_y.set, xscrollcommand=scroll_x.set)
        scroll_y.config(command=tree.yview)
        scroll_x.config(command=tree.xview)
        scroll_y.pack(side="right", fill="y")
        scroll_x.pack(side="bottom", fill="x")
        tree.pack(fill="both", expand=True)
        for col in display_cols:
            # Show line breaks as spaces so the heading stays one line high.
            tree.heading(col, text=col.replace('\n', ' '))
            tree.column(col, width=120, anchor="center")
        tree.tag_configure('new', background='#FFFFCC')
        tree.tag_configure('modified', background='#ECF5FF', foreground='#409EFF')

        row_lookup = {}  # tree item id -> index label of the row in the frame
        if not df.empty:
            is_detail = '明细' in title
            last_contract_id = None
            for idx, row in df.fillna("").iterrows():
                contract_id = row.get('合同编号', '')
                values = []
                for col in display_cols:
                    val = row.get(col, "")
                    if is_detail and col == '合同标的' and contract_id == last_contract_id:
                        val = ""
                    values.append(val)
                if is_detail:
                    last_contract_id = contract_id
                item_id = tree.insert("", "end", values=values, tags=(row.get('_status', ''),))
                row_lookup[item_id] = idx

        edit_target = df if source_df is None else source_df
        tree.bind("<Double-1>",
                  lambda event: self.on_double_click(event, tree, edit_target, row_lookup, display_cols))

    def on_double_click(self, event, tree, df, row_lookup=None, columns=None):
        """Edit the double-clicked cell and persist the new value.

        Args:
            event: the Tk mouse event.
            tree: the Treeview that was clicked.
            df: frame that receives the edit.
            row_lookup: tree item id -> index label in df; without it only the
                displayed value changes.
            columns: displayed column names in order; defaults to the tree's own.
        """
        if tree.identify("region", event.x, event.y) != "cell":
            return
        row_id = tree.identify_row(event.y)
        col_idx = int(tree.identify_column(event.x).replace('#', '')) - 1
        if not row_id or col_idx < 0:
            return
        col_name = columns[col_idx] if columns else tree['columns'][col_idx]
        current_val = tree.item(row_id, "values")[col_idx]
        new_val = simpledialog.askstring("快速编辑", f"修改 [{col_name}]:", initialvalue=current_val,
                                         parent=self.root)
        if new_val is None:
            return
        current_values = list(tree.item(row_id, "values"))
        current_values[col_idx] = new_val
        tree.item(row_id, values=current_values)
        if df is not None and row_lookup and row_id in row_lookup:
            self._write_back(df, row_lookup[row_id], col_name, new_val)

    def _write_back(self, df, row_label, col_name, new_val):
        """Store an edited cell in df so that it is included when saving."""
        if row_label not in df.index:
            return
        if col_name not in df.columns:
            df[col_name] = ""
        if df[col_name].dtype != object:
            # The edit is text; make sure the column can hold it.
            df[col_name] = df[col_name].astype(object)
        df.at[row_label, col_name] = new_val

    # ------------------------------------------------------------------
    # Saving
    # ------------------------------------------------------------------
    def save_files(self):
        """Write every non-empty sheet of each file type to its workbook.

        The target is the selected old workbook, or a default file name next
        to the CSV when none was selected; the target file is rewritten. File
        types with no data at all are skipped, because a workbook without
        sheets cannot be saved.
        """
        if not self.final_data:
            return
        base_dir = os.path.dirname(self.csv_path.get()) if self.csv_path.get() else ""
        targets = {
            'ASD': (self.asd_path, "ASD_Combined.xlsx"),
            'NonASD': (self.non_asd_path, "NonASD_Combined.xlsx"),
        }
        saved_paths = []
        try:
            for file_type, sheets in self.final_data.items():
                if file_type not in targets:
                    continue
                path_var, default_name = targets[file_type]
                target_path = path_var.get() or os.path.join(base_dir, default_name)
                outputs = []
                for sheet_name in self.SHEET_ORDER:
                    if sheet_name not in sheets:
                        continue
                    ordered = self._prepare_sheet(sheets[sheet_name], sheet_name)
                    save_df = ordered.drop(columns=self._INTERNAL_COLS, errors='ignore')
                    if not save_df.empty:
                        outputs.append((sheet_name, save_df))
                if not outputs:
                    continue
                with pd.ExcelWriter(target_path, engine='openpyxl') as writer:
                    for sheet_name, save_df in outputs:
                        save_df.to_excel(writer, sheet_name=sheet_name, index=False)
                saved_paths.append(target_path)
            if saved_paths:
                messagebox.showinfo("成功", "文件保存成功!\n位置:\n" + "\n".join(saved_paths))
            else:
                messagebox.showinfo("提示", "没有可保存的数据。")
        except PermissionError:
            messagebox.showerror("保存失败", "文件被占用!\n请先关闭 Excel 文件后再点击保存。")
        except Exception as e:
            messagebox.showerror("保存失败", str(e))
if __name__ == "__main__":
    # Script entry point: create the top-level window, attach the application
    # to it and hand control to the Tk event loop.
    main_window = tk.Tk()
    ContractApp(main_window)
    main_window.mainloop()