Files
2026-01-21 15:24:12 +08:00

1054 lines
51 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import tkinter as tk
from tkinter import ttk, filedialog, messagebox, simpledialog
import os
import re
import numpy as np
from datetime import datetime
import traceback
# ==========================================
# 第一部分:业务逻辑核心
# ==========================================
class DataProcessor:
def __init__(self):
# ==================== 表头定义 ====================
self.cols_asd_foreign_general = [
"合同编号", "签署公司", "外贸合同号", "收款情况", "合同签订日期", "销售员",
"最终用户单位", "最终用户信息\n联系人、电话、邮箱", "最终用户所在地",
"厂家", "型号/货号", "合同标的", "数量", "单位", "币种", "折扣率",
"合同额", "总合同额", "外购", "已收款", "未收款", "收款日期",
"最晚发货期", "付款方式", "发货港", "目的港", "发货日期",
"买方单位", "买方信息\n联系人、电话、邮箱", "收货人信息",
"转为美元净合同额", "转为美元总合同额"
]
self.cols_nonasd_foreign_general = [
"合同编号", "签署公司", "外贸合同号", "收款情况", "合同签订日期", "销售员",
"最终用户单位", "最终用户信息\n联系人、电话、邮箱", "最终用户所在地",
"厂家", "型号/货号", "合同标的", "数量", "单位", "币种", "折扣率",
"合同额", "总合同额", "外购", "已收款", "未收款", "收款日期",
"最晚发货期", "付款方式", "发货港", "目的港", "发货日期",
"买方单位", "买方信息\n联系人、电话、邮箱", "收货人信息",
"合同币种/美元", "转为美元净合同额", "转为美元总合同额"
]
self.cols_domestic_general = [
"合同编号", "签署公司", "内贸合同号", "收款情况", "签订日期", "销售员",
"最终用户单位", "最终用户信息\n联系人、电话、邮箱", "最终用户所在地",
"买方单位", "买方信息\n联系人、电话、邮箱",
"厂家", "型号", "合同标的", "数量", "单位", "折扣率(%)",
"合同额", "合同总额", "外购", "付款方式", "最晚发货期",
"已收款", "未收款", "收款日期",
"转为美元净合同额", "转为美元总合同额"
]
self.cols_foreign_detail = [
"合同编号", "销售员", "合同标的", "厂家", "货号", "产品描述", "数量", "单位",
"币种", "报价单价", "报价总价", "销售单价", "销售总价", "折扣率",
"外购", "合同币种/美元", "外购转美元", "报价总价美元", "净合同额美元"
]
self.cols_domestic_detail = [
"合同编号", "销售员", "合同标的", "厂家", "货号", "产品描述", "数量", "单位",
"外币币种", "外币报价单价", "报价RMB单价", "报价RMB总价",
"售价RMB单价", "售价RMB总价", "折扣率(%)", "外购",
"计算汇率", "外购转美元", "报价总价美元", "净合同额美元"
]
self.cols_om = [
"合同编号", "签署公司", "内贸合同号", "收款情况", "签订日期", "销售员",
"最终用户单位", "最终用户信息\n联系人、电话、邮箱", "最终用户所在地",
"买方单位", "买方信息\n联系人、电话、邮箱", "合同标的",
"合同总额", "已收款", "未收款", "收款日期"
]
# 辅助集合
self.money_cols = set([
"合同额", "总合同额", "合同总额", "外购", "已收款", "未收款",
"净合同额美元", "外购转美元", "报价总价美元",
"外币报价单价", "报价RMB单价", "报价RMB总价",
"售价RMB单价", "售价RMB总价", "外购产品金额",
"转为美元净合同额", "转为美元总合同额", "报价单价", "报价总价", "销售单价", "销售总价"
])
# [修改点] 从这里移除了 "计算汇率" 和 "合同币种/美元",不再强制进行百分比转换
self.percent_cols = set(["折扣率", "折扣率(%)"])
self.date_cols = set(["合同签订日期", "签订日期", "收款日期", "最晚发货期", "发货日期"])
self.legacy_map = {
"外币币种": "币种", "汇率": "计算汇率", "折扣率(%)": "折扣率",
"折扣率(%": "折扣率(%)", "合同": "合同额"
}
self.standard_col_map = {}
all_lists = [
self.cols_asd_foreign_general, self.cols_nonasd_foreign_general,
self.cols_domestic_general, self.cols_foreign_detail,
self.cols_domestic_detail, self.cols_om
]
for lst in all_lists:
for col in lst:
clean_key = self.clean_header_key(col)
self.standard_col_map[clean_key] = col
# --- 工具方法 ---
def clean_header_key(self, text):
if not isinstance(text, str): return str(text)
return re.sub(r'[\s\n\r]+', '', text)
def safe_float(self, val):
try:
if isinstance(val, (int, float)): return float(val)
if isinstance(val, str):
val = val.replace(',', '').replace('¥', '').replace('$', '').strip()
if val == '': return 0.0
if pd.isna(val): return 0.0
return float(val)
except:
return 0.0
def format_money_str(self, val):
if pd.isna(val) or str(val).strip() == "": return ""
try:
f_val = self.safe_float(val)
return "{:.2f}".format(f_val)
except:
return str(val)
def format_percent_str(self, val):
if pd.isna(val) or str(val).strip() == "": return ""
try:
s_val = str(val).strip()
if '%' in s_val: return s_val
f_val = self.safe_float(val)
return "{:.2f}%".format(f_val * 100)
except:
return str(val)
def format_date_str(self, val):
if pd.isna(val) or str(val).strip() == "": return ""
try:
s_val = str(val).strip()
if len(s_val) == 10 and s_val[4] == '-' and s_val[7] == '-': return s_val
dt = pd.to_datetime(val, errors='coerce')
if pd.isnull(dt): return s_val
return dt.strftime('%Y-%m-%d')
except:
return str(val)
def load_multiple_csvs(self, file_paths):
"""支持多 CSV 导入"""
if isinstance(file_paths, str):
paths = [p.strip() for p in file_paths.split(';') if p.strip()]
else:
paths = list(file_paths)
if not paths: return None, "未选择文件"
all_dfs = []
error_msgs = []
col_factory_general = '厂家'
col_factory_detail = '厂家'
for path in paths:
if not os.path.exists(path): continue
df = None
for enc in ['utf-8', 'gbk', 'gb18030']:
try:
df = pd.read_csv(path, encoding=enc)
break
except UnicodeDecodeError:
continue
if df is None:
try:
df = pd.read_csv(path, encoding='gb18030', encoding_errors='replace')
except:
error_msgs.append(f"无法读取: {os.path.basename(path)}")
continue
df['合同类型'] = df['合同类型'].fillna('').astype(str)
if '厂家.1' in df.columns: col_factory_detail = '厂家.1'
all_dfs.append(df)
if not all_dfs: return None, "\n".join(error_msgs) if error_msgs else "没有有效的数据文件"
try:
final_df = pd.concat(all_dfs, ignore_index=True)
except Exception as e:
return None, f"文件合并失败: {str(e)}"
col_factory_detail = '厂家.1' if '厂家.1' in final_df.columns else '厂家'
final_df[col_factory_general] = final_df[col_factory_general].fillna('').astype(str)
if '合同订单编号' in final_df.columns:
final_df['原始_合同订单编号'] = final_df['合同订单编号'].astype(str).str.strip()
final_df['Clean_ID'] = final_df['原始_合同订单编号'].apply(lambda x: x.split()[0] if x else "")
return final_df, (col_factory_general, col_factory_detail)
def parse_buyer_info(self, text):
info = {'name': '', 'contact_full': ''}
if not isinstance(text, str) or not text.strip(): return info
lines = [l.strip() for l in text.split('\n') if l.strip()]
if not lines: return info
info['name'] = lines[0]
info['contact_full'] = " ".join(lines[1:])
return info
def parse_single_line_subject(self, text):
res = {'name': '', 'model': '', 'qty': '', 'unit': '', 'price': '', 'sort_price': 0.0}
if not isinstance(text, str) or not text.strip(): return res
text = text.strip()
# Model 格式解析
if re.match(r'^Model[:]', text, re.IGNORECASE):
m = re.match(
r'Model[:]\s*(.+?)\s+([a-zA-Z\u4e00-\u9fa5]+)\s+(\d+(?:\.\d+)?)\s+(\d+(?:\.\d+)?)\s+(\d+(?:\.\d+)?)',
text, re.IGNORECASE)
if m:
res['name'] = m.group(1).strip()
res['model'] = m.group(1).strip()
res['unit'] = m.group(2)
res['qty'] = m.group(3)
res['price'] = m.group(4)
res['sort_price'] = self.safe_float(m.group(4))
return res
else:
m2 = re.match(r'Model[:]\s*(.+?)\s+(\d+(?:\.\d+)?)\s+(\d+(?:\.\d+)?)\s+(\d+(?:\.\d+)?)', text,
re.IGNORECASE)
if m2:
res['name'] = m2.group(1).strip()
res['model'] = m2.group(1).strip()
res['qty'] = m2.group(2)
res['price'] = m2.group(3)
res['sort_price'] = self.safe_float(m2.group(3))
return res
if '/' in text:
parts = [p.strip() for p in text.split('/')]
if len(parts) >= 1: res['name'] = parts[0]
if len(parts) >= 2: res['model'] = parts[1]
if len(parts) >= 3:
m_qty = re.match(r'^(\d+(\.\d+)?)\s*([\u4e00-\u9fa5a-zA-Z]+)?$', parts[2])
if m_qty:
res['qty'] = m_qty.group(1)
res['unit'] = m_qty.group(3) if m_qty.group(3) else ""
else:
res['qty'] = parts[2]
if len(parts) >= 4:
res['price'] = parts[3]
res['sort_price'] = self.safe_float(parts[3])
return res
name_match = re.search(r'(?:中文品名|中文名称|名称|Name)[:]\s*(.*?)(?:\n|$)', text, re.IGNORECASE)
if name_match:
res['name'] = name_match.group(1).strip()
else:
res['name'] = text.split('\n')[0]
nums = re.findall(r'\d+(?:\.\d+)?', text.replace(',', ''))
if nums:
res['sort_price'] = self.safe_float(nums[-1])
res['price'] = nums[-1]
return res
# ==========================================
# 数据准备函数 (Prepare Functions)
# ==========================================
def prepare_new_data_general(self, csv_df, trade_type, target_cols, col_factory):
if csv_df.empty: return pd.DataFrame(columns=target_cols)
def extract_items(row):
target_raw = str(row.get('合同标的(品名/型号/数量/单价/总价)', ''))
lines = [line.strip() for line in target_raw.split('\n') if line.strip()]
items = []
if not lines:
items.append(self.parse_single_line_subject(""))
else:
for line in lines: items.append(self.parse_single_line_subject(line))
return items
parsed_series = csv_df.apply(extract_items, axis=1)
expanded_data = []
for idx, row in enumerate(csv_df.itertuples(index=False)):
items = parsed_series.iloc[idx]
r_dict = csv_df.iloc[idx].to_dict()
for item in items:
row_base = {
'Clean_ID': r_dict.get('Clean_ID', ''),
'原始_合同订单编号': r_dict.get('原始_合同订单编号', ''),
'收款账户': r_dict.get('收款账户', ''),
'签约日期': r_dict.get('签约日期', ''),
'负责人': r_dict.get('负责人', ''),
'客户名称': r_dict.get('客户名称', ''),
'联系人姓名': r_dict.get('联系人姓名', ''),
'合同买方_Raw': r_dict.get('合同买方(名称/联系人/电话/邮箱)', ''),
'进口代理_Raw': r_dict.get('进口代理(名称/USCI/地址/联系人/电话/邮箱)', ''),
'厂家_Val': r_dict.get(col_factory, ''),
'货币': r_dict.get('货币(选完产品再改)', ''),
'发货地': r_dict.get('发货地', ''),
'目的港': r_dict.get('目的港', ''),
'折扣率': r_dict.get('折扣率', ''),
'合同总额': r_dict.get('合同总额', ''),
'收款状态': r_dict.get('收款状态', ''),
'外购产品金额': r_dict.get('外购产品金额', ''),
'最新收款日期': r_dict.get('最新收款日期', ''),
'最晚发货期': r_dict.get('最晚发货期', ''),
'付款比例及期限': r_dict.get('付款比例及期限', ''),
'合同币种/美元': r_dict.get('合同币种/美元', ''),
'_item_name': item['name'],
'_item_model': item['model'],
'_item_qty': item['qty'],
'_item_unit': item['unit'],
'_item_price': item['price'],
'_sort_price': item['sort_price']
}
expanded_data.append(row_base)
df_expanded = pd.DataFrame(expanded_data)
if df_expanded.empty: return pd.DataFrame(columns=target_cols)
# 排序去重:取金额最大
df_expanded.sort_values(by=['Clean_ID', '_sort_price'], ascending=[True, False], inplace=True)
df_unique = df_expanded.drop_duplicates(subset=['Clean_ID'], keep='first').copy()
result = pd.DataFrame(index=df_unique.index)
parts = df_unique['原始_合同订单编号'].str.split(n=1, expand=True)
result['合同编号'] = parts[0]
contract_no_col = '外贸合同号' if trade_type == '外贸' else '内贸合同号'
result[contract_no_col] = parts[1] if parts.shape[1] > 1 else ""
result['签署公司'] = df_unique['收款账户']
result['合同签订日期'] = df_unique['签约日期'].apply(self.format_date_str)
if '签订日期' in target_cols: result['签订日期'] = result['合同签订日期']
result['销售员'] = df_unique['负责人']
result['最终用户单位'] = df_unique['客户名称']
if any("最终用户信息" in c for c in target_cols):
col_name = next(c for c in target_cols if "最终用户信息" in c)
result[col_name] = df_unique['联系人姓名']
def get_buyer_info(row):
raw = row['进口代理_Raw'] if trade_type == '外贸' and str(row['进口代理_Raw']) not in ['', 'nan'] else row[
'合同买方_Raw']
return self.parse_buyer_info(str(raw))
buyer_infos = df_unique.apply(get_buyer_info, axis=1)
result['买方单位'] = [x['name'] for x in buyer_infos]
if any("买方信息" in c for c in target_cols):
col_name = next(c for c in target_cols if "买方信息" in c)
result[col_name] = [x['contact_full'] for x in buyer_infos]
if '收货人信息' in target_cols: result['收货人信息'] = result['买方单位']
result['厂家'] = df_unique['厂家_Val']
if '币种' in target_cols: result['币种'] = df_unique['货币']
if '发货港' in target_cols: result['发货港'] = df_unique['发货地']
if '目的港' in target_cols: result['目的港'] = df_unique['目的港']
discount_col = '折扣率' if '折扣率' in target_cols else '折扣率(%)'
result[discount_col] = df_unique['折扣率'].apply(self.format_percent_str)
result['合同标的'] = df_unique['_item_name']
if '型号/货号' in target_cols: result['型号/货号'] = df_unique['_item_model']
if '型号' in target_cols: result['型号'] = df_unique['_item_model']
result['数量'] = df_unique['_item_qty']
result['单位'] = df_unique['_item_unit']
if '合同额' in target_cols: result['合同额'] = df_unique['_item_price'].apply(self.format_money_str)
total_col_name = '总合同额' if '总合同额' in target_cols else '合同总额'
result[total_col_name] = df_unique['合同总额'].apply(self.format_money_str)
result['收款情况'] = df_unique['收款状态'].fillna('').astype(str).str.strip()
is_received = result['收款情况'].str.contains('已收')
result['已收款'] = ""
result['未收款'] = ""
result.loc[is_received, '已收款'] = result.loc[is_received, total_col_name]
result.loc[is_received, '未收款'] = "0.00"
result['外购'] = df_unique['外购产品金额'].apply(self.format_money_str)
result['收款日期'] = df_unique['最新收款日期'].apply(self.format_date_str)
if '最晚发货期' in target_cols: result['最晚发货期'] = df_unique['最晚发货期'].apply(self.format_date_str)
if '付款方式' in target_cols: result['付款方式'] = df_unique['付款比例及期限']
if '发货日期' in target_cols: result['发货日期'] = ""
# [修改点] 这里只取 raw value后续也不格式化
if '合同币种/美元' in target_cols:
# 确保转为字符串,避免 float 警告
result['合同币种/美元'] = df_unique['合同币种/美元'].fillna("").astype(str)
result['_sort_price'] = df_unique['_sort_price']
for col in target_cols:
if col not in result.columns: result[col] = ""
return result[target_cols + ['_sort_price']]
def prepare_new_data_detail(self, csv_df, trade_type, target_cols, col_factory):
if csv_df.empty: return pd.DataFrame(columns=target_cols)
new_rows = csv_df.apply(lambda r: self.process_row_detail_single(r, col_factory, trade_type), axis=1)
if isinstance(new_rows, pd.Series): new_rows = pd.DataFrame([new_rows])
for col in target_cols:
if col not in new_rows.columns: new_rows[col] = ""
return new_rows[target_cols]
def process_row_detail_single(self, row, col_factory, trade_type):
if trade_type == '外贸':
target_cols = self.cols_foreign_detail
else:
target_cols = self.cols_domestic_detail
new_row = {col: "" for col in target_cols}
detail_manuf_val = str(row.get(col_factory, ''))
raw_order_no = str(row.get('原始_合同订单编号', row.get('合同订单编号', ''))).strip()
parts_no = raw_order_no.split()
new_row['合同编号'] = parts_no[0] if len(parts_no) > 0 else raw_order_no
new_row['销售员'] = row.get('负责人', '')
new_row['厂家'] = detail_manuf_val
new_row['货号'] = row.get('产品编码', '')
if trade_type == '外贸':
new_row['币种'] = row.get('原币种', '')
else:
new_row['外币币种'] = row.get('原币种', '')
target_raw = str(row.get('合同标的(品名/型号/数量/单价/总价)', ''))
if '/' in target_raw:
new_row['合同标的'] = target_raw.split('/')[0].strip()
else:
new_row['合同标的'] = target_raw.split('\n')[0].strip()
csv_qty = str(row.get('数量', '')).strip()
if csv_qty and csv_qty.lower() != 'nan': new_row['数量'] = csv_qty
val_product_subtotal = self.safe_float(row.get('产品小计', 0))
if '外购' in detail_manuf_val:
new_row['外购'] = self.format_money_str(val_product_subtotal)
remark = str(row.get('备注', '')).strip()
if not remark or remark.lower() == 'nan':
outsourced = str(row.get('外购产品明细', '')).strip()
new_row['产品描述'] = outsourced if outsourced.lower() != 'nan' else ""
else:
new_row['产品描述'] = remark
else:
new_row['外购'] = ""
new_row['产品描述'] = row.get('产品名称', '')
if '外币报价单价' in new_row: new_row['外币报价单价'] = self.format_money_str(row.get('美元报价', ''))
if '报价单价' in new_row: new_row['报价单价'] = self.format_money_str(row.get('美元报价', ''))
if '报价RMB总价' in new_row: new_row['报价RMB总价'] = self.format_money_str(row.get('产品小计', ''))
if '报价总价' in new_row: new_row['报价总价'] = self.format_money_str(row.get('产品小计', ''))
# [修改点] 汇率不格式化,只转字符串
rate_val = str(row.get('汇率', '')).strip()
if rate_val.lower() == 'nan': rate_val = ""
if '计算汇率' in new_row: new_row['计算汇率'] = rate_val
if '合同币种/美元' in new_row: new_row['合同币种/美元'] = rate_val
discount_col = '折扣率' if '折扣率' in new_row else '折扣率(%)'
if discount_col in new_row: new_row[discount_col] = self.format_percent_str(row.get('折扣率', ''))
if '售价RMB单价' in new_row: new_row['售价RMB单价'] = self.format_money_str(row.get('销售单价', ''))
if '销售单价' in new_row: new_row['销售单价'] = self.format_money_str(row.get('销售单价', ''))
if '售价RMB总价' in new_row: new_row['售价RMB总价'] = self.format_money_str(row.get('销售总价', ''))
if '销售总价' in new_row: new_row['销售总价'] = self.format_money_str(row.get('销售总价', ''))
new_row['外购转美元'] = self.format_money_str(row.get('外购转美元', ''))
new_row['报价总价美元'] = self.format_money_str(row.get('报价总价美元', ''))
new_row['净合同额美元'] = self.format_money_str(row.get('净合同额美元', ''))
if '报价RMB单价' in new_row: new_row['报价RMB单价'] = self.format_money_str(row.get('报价RMB单价', ''))
return pd.Series(new_row)
def prepare_new_data_om(self, csv_df, target_cols):
if csv_df.empty: return pd.DataFrame(columns=target_cols)
def extract_items(row):
target_raw = str(row.get('合同标的(品名/型号/数量/单价/总价)', ''))
lines = [line.strip() for line in target_raw.split('\n') if line.strip()]
items = []
if not lines:
items.append(self.parse_single_line_subject(""))
else:
for line in lines: items.append(self.parse_single_line_subject(line))
return items
parsed_series = csv_df.apply(extract_items, axis=1)
expanded_data = []
for idx, row in enumerate(csv_df.itertuples(index=False)):
r_dict = csv_df.iloc[idx].to_dict()
items = parsed_series.iloc[idx]
for item in items:
row_base = {
'Clean_ID': r_dict.get('Clean_ID', ''),
'原始_合同订单编号': r_dict.get('原始_合同订单编号', ''),
'收款账户': r_dict.get('收款账户', ''),
'签约日期': r_dict.get('签约日期', ''),
'负责人': r_dict.get('负责人', ''),
'客户名称': r_dict.get('客户名称', ''),
'联系人姓名': r_dict.get('联系人姓名', ''),
'合同买方_Raw': r_dict.get('合同买方(名称/联系人/电话/邮箱)', ''),
'合同总额': r_dict.get('合同总额', ''),
'收款状态': r_dict.get('收款状态', ''),
'最新收款日期': r_dict.get('最新收款日期', ''),
'_item_name': item['name'],
'_sort_price': item['sort_price']
}
expanded_data.append(row_base)
df_expanded = pd.DataFrame(expanded_data)
if df_expanded.empty: return pd.DataFrame(columns=target_cols)
df_expanded.sort_values(by=['Clean_ID', '_sort_price'], ascending=[True, False], inplace=True)
df_unique = df_expanded.drop_duplicates(subset=['Clean_ID'], keep='first').copy()
result = pd.DataFrame(index=df_unique.index)
parts = df_unique['原始_合同订单编号'].str.split(n=1, expand=True)
result['合同编号'] = parts[0]
result['内贸合同号'] = parts[1] if parts.shape[1] > 1 else ""
result['合同总额'] = df_unique['合同总额'].apply(self.format_money_str)
result['收款情况'] = df_unique['收款状态'].fillna('').astype(str).str.strip()
is_received = result['收款情况'].str.contains('已收')
result['已收款'] = ""
result['未收款'] = ""
result.loc[is_received, '已收款'] = result.loc[is_received, '合同总额']
result.loc[is_received, '未收款'] = "0.00"
result['签署公司'] = df_unique['收款账户']
result['签订日期'] = df_unique['签约日期'].apply(self.format_date_str)
result['销售员'] = df_unique['负责人']
result['最终用户单位'] = df_unique['客户名称']
if '最终用户信息\n联系人、电话、邮箱' in target_cols:
result['最终用户信息\n联系人、电话、邮箱'] = df_unique['联系人姓名']
buyer_infos = df_unique['合同买方_Raw'].astype(str).apply(self.parse_buyer_info)
result['买方单位'] = [x['name'] for x in buyer_infos]
if '买方信息\n联系人、电话、邮箱' in target_cols:
result['买方信息\n联系人、电话、邮箱'] = [x['contact_full'] for x in buyer_infos]
result['收款日期'] = df_unique['最新收款日期'].apply(self.format_date_str)
result['合同标的'] = df_unique['_item_name']
result['_sort_price'] = df_unique['_sort_price']
return result[target_cols + ['_sort_price']]
# ==========================================
# 核心优化:智能防覆盖 + 消除类型警告
# ==========================================
def merge_datasets(self, old_dfs, csv_df, is_asd):
col_gen = '厂家'
col_det = '厂家.1' if '厂家.1' in csv_df.columns else '厂家'
if is_asd:
df_subset = csv_df[csv_df[col_gen].str.contains('ASD', case=False, na=False)]
else:
df_subset = csv_df[~csv_df[col_gen].str.contains('ASD', case=False, na=False)]
csv_foreign = df_subset[df_subset['合同类型'] == '外贸'].copy()
csv_domestic = df_subset[df_subset['合同类型'] == '内贸'].copy()
csv_om = df_subset[~df_subset['合同类型'].isin(['外贸', '内贸'])].copy()
result_dfs = {}
def vectorized_merge(old_df, new_df, unique_col, target_columns):
if new_df.empty:
if old_df is None or old_df.empty: return pd.DataFrame(columns=target_columns + ['_status'])
old_df['_status'] = ''
return old_df
for col in target_columns:
if col not in new_df.columns: new_df[col] = ""
if '_sort_price' not in new_df.columns: new_df['_sort_price'] = 0.0
if old_df is None or old_df.empty:
combined = new_df.copy()
combined['_status'] = 'new'
return combined
old_df = old_df.copy()
if unique_col not in old_df.columns: old_df[unique_col] = ""
if '_status' not in old_df.columns: old_df['_status'] = ''
# === 消除 FutureWarning 核心 ===
# 将旧数据中所有目标列强制转换为 object (字符串/混合),防止 float/int 写入 str 报错
for col in target_columns:
if col in old_df.columns:
old_df[col] = old_df[col].astype(object)
# === 总表 (ID 唯一) - 智能字段级更新 ===
is_unique_index = (old_df[unique_col].duplicated().sum() == 0) and (
new_df[unique_col].duplicated().sum() == 0)
if is_unique_index:
old_df.set_index(unique_col, inplace=True, drop=False)
new_df.set_index(unique_col, inplace=True, drop=False)
# 1. 纯新增行
new_ids = new_df.index.difference(old_df.index)
rows_new = new_df.loc[new_ids].copy()
rows_new['_status'] = 'new'
# 2. 共有行:逐列智能检查
common_ids = new_df.index.intersection(old_df.index)
if not common_ids.empty:
for col in target_columns:
new_vals = new_df.loc[common_ids, col].astype(str).str.strip()
old_vals = old_df.loc[common_ids, col].fillna("").astype(str).str.strip()
# 防覆盖核心:
# 1. 新数据非空
# 2. 新旧不一致
valid_new_mask = (new_vals != "") & (new_vals != "nan") & (new_vals != "None")
diff_mask = valid_new_mask & (new_vals != old_vals)
ids_to_update = diff_mask[diff_mask].index
if not ids_to_update.empty:
old_df.loc[ids_to_update, col] = new_df.loc[ids_to_update, col]
old_df.loc[ids_to_update, '_status'] = 'modified'
old_df.loc[common_ids, '_sort_price'] = new_df.loc[common_ids, '_sort_price']
old_df.reset_index(drop=True, inplace=True)
rows_new.reset_index(drop=True, inplace=True)
final_df = pd.concat([old_df, rows_new], ignore_index=True)
return final_df
else:
# === 明细表 (ID 不唯一) - 增量追加 ===
new_ids = set(new_df[unique_col]) - set(old_df[unique_col])
rows_to_add = new_df[new_df[unique_col].isin(new_ids)].copy()
rows_to_add['_status'] = 'new'
final_df = pd.concat([old_df, rows_to_add], ignore_index=True)
return final_df
target_cols_foreign = self.cols_asd_foreign_general if is_asd else self.cols_nonasd_foreign_general
old_gen = old_dfs.get('外贸', pd.DataFrame(columns=target_cols_foreign))
new_gen_df = self.prepare_new_data_general(csv_foreign, '外贸', target_cols_foreign, col_gen)
result_dfs['外贸'] = vectorized_merge(old_gen, new_gen_df, '合同编号', target_cols_foreign)
old_det = old_dfs.get('外贸明细', pd.DataFrame(columns=self.cols_foreign_detail))
new_det_df = self.prepare_new_data_detail(csv_foreign, '外贸', self.cols_foreign_detail, col_det)
result_dfs['外贸明细'] = vectorized_merge(old_det, new_det_df, '合同编号', self.cols_foreign_detail)
old_dom_gen = old_dfs.get('内贸', pd.DataFrame(columns=self.cols_domestic_general))
new_dom_df = self.prepare_new_data_general(csv_domestic, '内贸', self.cols_domestic_general, col_gen)
result_dfs['内贸'] = vectorized_merge(old_dom_gen, new_dom_df, '合同编号', self.cols_domestic_general)
old_dom_det = old_dfs.get('内贸明细', pd.DataFrame(columns=self.cols_domestic_detail))
new_dom_det_df = self.prepare_new_data_detail(csv_domestic, '内贸', self.cols_domestic_detail, col_det)
result_dfs['内贸明细'] = vectorized_merge(old_dom_det, new_dom_det_df, '合同编号', self.cols_domestic_detail)
old_om = old_dfs.get('OM合同', pd.DataFrame(columns=self.cols_om))
new_om_df = self.prepare_new_data_om(csv_om, self.cols_om)
result_dfs['OM合同'] = vectorized_merge(old_om, new_om_df, '合同编号', self.cols_om)
return result_dfs
def apply_formatting_to_all(self, data_dict):
for sheet_name, df in data_dict.items():
if df.empty: continue
for col in self.money_cols:
if col in df.columns: df[col] = df[col].apply(self.format_money_str)
for col in self.percent_cols:
if col in df.columns: df[col] = df[col].apply(self.format_percent_str)
for col in self.date_cols:
if col in df.columns: df[col] = df[col].apply(self.format_date_str)
return data_dict
# ==========================================
# 第二部分GUI 界面 (布局修复 + 逻辑修复)
# ==========================================
class ContractApp:
def __init__(self, root):
self.root = root
self.root.title("合同数据处理系统 V4.6 (布局与类型警告修复版)")
self.root.geometry("1300x850")
# 允许窗口调整大小,但最小尺寸有限制
self.root.minsize(1000, 700)
self.colors = {
'bg': '#F0F2F5',
'panel': '#FFFFFF',
'primary': '#1890FF',
'primary_hover': '#40A9FF',
'success': '#52C41A',
'success_hover': '#73D13D',
'text_main': '#262626',
'text_sub': '#8C8C8C',
'border': '#D9D9D9',
'tag_new': '#FFFBE6',
'tag_mod': '#E6F7FF'
}
self.setup_styles()
self.processor = DataProcessor()
self.csv_paths = tk.StringVar()
self.asd_path = tk.StringVar()
self.non_asd_path = tk.StringVar()
self.status_var = tk.StringVar(value="准备就绪")
self.final_data = {}
self.create_widgets()
def setup_styles(self):
self.style = ttk.Style()
self.style.theme_use('clam')
self.style.configure("TFrame", background=self.colors['bg'])
self.style.configure("Panel.TFrame", background=self.colors['panel'], relief="flat")
self.style.configure("TLabel", background=self.colors['panel'], foreground=self.colors['text_main'],
font=("Microsoft YaHei UI", 10))
self.style.configure("Header.TLabel", font=("Microsoft YaHei UI", 18, "bold"), background=self.colors['bg'],
foreground=self.colors['text_main'])
self.style.configure("SubHeader.TLabel", font=("Microsoft YaHei UI", 12, "bold"),
background=self.colors['panel'], foreground=self.colors['text_main'])
self.style.configure("Status.TLabel", background=self.colors['bg'], foreground=self.colors['text_sub'],
font=("Microsoft YaHei UI", 9))
self.style.configure("TButton", font=("Microsoft YaHei UI", 10), borderwidth=0, padding=8)
self.style.map("TButton", background=[('active', '#E0E0E0')], relief=[('pressed', 'sunken')])
self.style.configure("Primary.TButton", background=self.colors['primary'], foreground='white')
self.style.map("Primary.TButton", background=[('active', self.colors['primary_hover'])])
self.style.configure("Success.TButton", background=self.colors['success'], foreground='white')
self.style.map("Success.TButton", background=[('active', self.colors['success_hover'])])
self.style.configure("TEntry", fieldbackground="white", padding=5)
self.style.configure("Treeview", background="white", foreground=self.colors['text_main'], rowheight=30,
font=("Microsoft YaHei UI", 9), fieldbackground="white")
self.style.configure("Treeview.Heading", font=("Microsoft YaHei UI", 10, "bold"), background="#FAFAFA",
foreground=self.colors['text_main'], relief="flat")
self.style.map("Treeview", background=[('selected', self.colors['primary_hover'])],
foreground=[('selected', 'white')])
def create_widgets(self):
# 1. 顶部 Header (Pack Top)
header_frame = ttk.Frame(self.root)
header_frame.pack(side="top", fill="x", padx=25, pady=(25, 10))
ttk.Label(header_frame, text="🚀 合同数据智能处理系统", style="Header.TLabel").pack(side="left")
# 2. 底部按钮栏 (Pack Bottom 优先! 确保永远可见)
bottom_bar = ttk.Frame(self.root, style="Panel.TFrame", padding=15)
bottom_bar.pack(side="bottom", fill="x", padx=25, pady=(0, 25))
legend_frame = ttk.Frame(bottom_bar, style="Panel.TFrame")
legend_frame.pack(side="left")
self.create_legend(legend_frame, "● 新增数据", self.colors['tag_new'], "#D48806")
self.create_legend(legend_frame, "● 发生变更", self.colors['tag_mod'], self.colors['primary'])
ttk.Label(bottom_bar, textvariable=self.status_var, style="Status.TLabel").pack(side="left", padx=20)
ttk.Button(bottom_bar, text="💾 确认无误,保存写入", style="Success.TButton", command=self.save_files).pack(
side="right")
# 3. 输入面板 (Pack Top, under header)
input_panel = ttk.Frame(self.root, style="Panel.TFrame", padding=25)
input_panel.pack(side="top", fill="x", padx=25, pady=5)
ttk.Label(input_panel, text="文件配置与导入", style="SubHeader.TLabel").grid(row=0, column=0, columnspan=3,
sticky="w", pady=(0, 20))
self.create_file_row(input_panel, "📂 导入 CSV 源文件 (支持多选):", self.csv_paths, 1, is_multiple=True)
self.create_file_row(input_panel, "📘 旧 ASD Excel 文件:", self.asd_path, 2)
self.create_file_row(input_panel, "📗 旧 非ASD Excel 文件:", self.non_asd_path, 3)
btn_frame = ttk.Frame(input_panel, style="Panel.TFrame")
btn_frame.grid(row=4, column=0, columnspan=3, pady=(20, 0), sticky="e")
ttk.Button(btn_frame, text="▶ 开始极速处理 (仅预览)", style="Primary.TButton",
command=self.process_files).pack(side="right")
# 4. 中间预览区域 (Pack Fill Both, Expand True) - 填充剩余所有空间
self.notebook = ttk.Notebook(self.root)
self.notebook.pack(side="top", fill="both", expand=True, padx=25, pady=15)
def create_file_row(self, parent, label_text, var, row_idx, is_multiple=False):
lbl = ttk.Label(parent, text=label_text, width=28)
lbl.grid(row=row_idx, column=0, sticky="w", pady=8)
entry = ttk.Entry(parent, textvariable=var, font=("Microsoft YaHei UI", 9))
entry.grid(row=row_idx, column=1, sticky="ew", padx=10, pady=8)
btn = ttk.Button(parent, text="浏览...", width=8, command=lambda: self.browse_file(var, is_multiple))
btn.grid(row=row_idx, column=2, padx=5)
parent.columnconfigure(1, weight=1)
def create_legend(self, parent, text, bg_color, fg_color):
f = tk.Frame(parent, bg=bg_color, padx=10, pady=4)
f.pack(side="left", padx=5)
tk.Label(f, text=text, bg=bg_color, fg=fg_color, font=("Microsoft YaHei UI", 9, "bold")).pack()
def browse_file(self, variable, is_multiple=False):
if is_multiple:
files = filedialog.askopenfilenames(filetypes=[("CSV Files", "*.csv")])
if files: variable.set("; ".join(files))
else:
f = filedialog.askopenfilename(filetypes=[("Excel/CSV Files", "*.csv;*.xlsx")])
if f: variable.set(f)
def load_excel_safe(self, path):
if not path or not os.path.exists(path): return {}
try:
dfs = pd.read_excel(path, sheet_name=None)
clean_dfs = {}
for k, v in dfs.items():
new_columns = []
for col in v.columns:
clean_col = self.processor.clean_header_key(str(col))
if clean_col in self.processor.standard_col_map:
new_columns.append(self.processor.standard_col_map[clean_col])
elif col in self.processor.legacy_map:
new_columns.append(self.processor.legacy_map[col])
else:
new_columns.append(col)
v.columns = new_columns
v = v.loc[:, ~v.columns.duplicated()]
if '合同编号' in v.columns: v['合同编号'] = v['合同编号'].astype(str)
clean_dfs[k.strip()] = v
return clean_dfs
except Exception as e:
messagebox.showwarning("读取错误", f"读取旧文件失败: {path}\n错误: {str(e)}")
return {}
def process_files(self):
csv_paths_str = self.csv_paths.get()
if not csv_paths_str:
messagebox.showerror("提示", "请先选择 CSV 源文件!")
return
self.status_var.set("⏳ 正在读取多个数据源...")
self.root.update()
try:
csv_df, headers_or_msg = self.processor.load_multiple_csvs(csv_paths_str)
if csv_df is None:
messagebox.showerror("读取错误", headers_or_msg)
return
self.status_var.set("🚀 正在极速合并数据...")
self.root.update()
self.final_data = {}
path_asd = self.asd_path.get()
asd_old = self.load_excel_safe(path_asd)
self.final_data['ASD'] = self.processor.merge_datasets(asd_old, csv_df, True)
path_non = self.non_asd_path.get()
non_old = self.load_excel_safe(path_non)
self.final_data['NonASD'] = self.processor.merge_datasets(non_old, csv_df, False)
self.final_data['ASD'] = self.processor.apply_formatting_to_all(self.final_data['ASD'])
self.final_data['NonASD'] = self.processor.apply_formatting_to_all(self.final_data['NonASD'])
self.refresh_preview()
self.status_var.set("✅ 预览已生成。确认无误后请点击右下角保存!")
messagebox.showinfo("完成",
"数据预览已生成!\n\n注意:此时尚未写入文件。\n请在下方检查数据,确认无误后点击 [保存] 按钮。")
except Exception as e:
self.status_var.set("❌ 发生错误")
traceback.print_exc()
messagebox.showerror("运行错误", str(e))
def refresh_preview(self):
for tab in self.notebook.tabs():
self.notebook.forget(tab)
for file_type in ['ASD', 'NonASD']:
if file_type not in self.final_data: continue
data_dict = self.final_data[file_type]
main_frame = ttk.Frame(self.notebook, style="Panel.TFrame", padding=10)
self.notebook.add(main_frame, text=f" {file_type} 预览 ")
inner_notebook = ttk.Notebook(main_frame)
inner_notebook.pack(fill="both", expand=True)
sheet_order = ['外贸', '外贸明细', '内贸', '内贸明细', 'OM合同']
for sheet_name in sheet_order:
if sheet_name in data_dict:
df = data_dict[sheet_name]
if not df.empty:
if '合同编号' in df.columns:
df['合同编号'] = df['合同编号'].astype(str)
sort_cols = ['合同编号']
asc_order = [True]
if '_sort_price' in df.columns:
sort_cols.append('_sort_price')
asc_order.append(False)
df = df.sort_values(by=sort_cols, ascending=asc_order)
if '明细' in sheet_name:
mask = df.duplicated(subset=['合同编号'], keep='first')
df.loc[mask, '合同标的'] = ""
standard_cols = []
is_asd = (file_type == 'ASD')
if sheet_name == '外贸':
standard_cols = self.processor.cols_asd_foreign_general if is_asd else self.processor.cols_nonasd_foreign_general
elif sheet_name == '内贸':
standard_cols = self.processor.cols_domestic_general
elif sheet_name == 'OM合同':
standard_cols = self.processor.cols_om
elif sheet_name == '外贸明细':
standard_cols = self.processor.cols_foreign_detail
elif sheet_name == '内贸明细':
standard_cols = self.processor.cols_domestic_detail
self.create_treeview(inner_notebook, df, sheet_name, standard_cols)
def create_treeview(self, parent, df, title, target_cols):
frame = ttk.Frame(parent)
parent.add(frame, text=f" {title} ")
scroll_y = ttk.Scrollbar(frame, orient="vertical")
scroll_x = ttk.Scrollbar(frame, orient="horizontal")
tree = ttk.Treeview(frame, columns=target_cols, show='headings',
yscrollcommand=scroll_y.set, xscrollcommand=scroll_x.set)
scroll_y.config(command=tree.yview)
scroll_x.config(command=tree.xview)
scroll_y.pack(side="right", fill="y")
scroll_x.pack(side="bottom", fill="x")
tree.pack(fill="both", expand=True)
for col in target_cols:
clean_header = col.replace('\n', ' ')
tree.heading(col, text=clean_header)
tree.column(col, width=130, anchor="center")
tree.tag_configure('new', background=self.colors['tag_new'])
tree.tag_configure('modified', background=self.colors['tag_mod'], foreground=self.colors['primary'])
tree.tag_configure('odd', background='white')
tree.tag_configure('even', background='#FAFAFA')
if not df.empty:
df_display = df.fillna("")
last_contract_id = None
count = 0
for _, row in df_display.iterrows():
values = []
for col in target_cols:
val = row.get(col, "")
if '明细' in title and col == '合同标的':
current_id = row.get('合同编号', '')
if current_id == last_contract_id: val = ""
values.append(val)
if '明细' in title: last_contract_id = row.get('合同编号', '')
status = row.get('_status', '')
tags = [status] if status else [('even' if count % 2 == 0 else 'odd')]
tree.insert("", "end", values=values, tags=tags)
count += 1
tree.bind("<Double-1>", lambda event: self.on_double_click(event, tree))
def on_double_click(self, event, tree):
region = tree.identify("region", event.x, event.y)
if region != "cell": return
column = tree.identify_column(event.x)
row_id = tree.identify_row(event.y)
col_idx = int(column.replace('#', '')) - 1
col_name = tree['columns'][col_idx]
current_val = tree.item(row_id, "values")[col_idx]
new_val = simpledialog.askstring("快速编辑", f"修改 [{col_name}]:", initialvalue=current_val, parent=self.root)
if new_val is not None:
current_values = list(tree.item(row_id, "values"))
current_values[col_idx] = new_val
tree.item(row_id, values=current_values)
def save_files(self):
if not self.final_data:
messagebox.showwarning("提示", "没有可保存的数据,请先处理文件!")
return
csv_path_str = self.csv_paths.get()
first_path = csv_path_str.split(';')[0].strip() if csv_path_str else ""
base_dir = os.path.dirname(first_path) if first_path else ""
confirm = messagebox.askyesno("确认写入",
"您确定要将预览的数据写入到 Excel 文件吗?\n\n此操作将覆盖目标文件中的数据。")
if not confirm:
self.status_var.set("已取消写入")
return
self.status_var.set("💾 正在写入文件...")
self.root.update()
try:
for file_type, sheets in self.final_data.items():
target_path = ""
if file_type == 'ASD':
target_path = self.asd_path.get()
if not target_path: target_path = os.path.join(base_dir, "ASD_Combined.xlsx")
elif file_type == 'NonASD':
target_path = self.non_asd_path.get()
if not target_path: target_path = os.path.join(base_dir, "NonASD_Combined.xlsx")
with pd.ExcelWriter(target_path, engine='openpyxl') as writer:
valid_sheets = ['外贸', '外贸明细', '内贸', '内贸明细', 'OM合同']
for sheet_name in valid_sheets:
if sheet_name in sheets:
df = sheets[sheet_name]
if '合同编号' in df.columns:
sort_cols = ['合同编号']
asc_order = [True]
if '_sort_price' in df.columns:
sort_cols.append('_sort_price')
asc_order.append(False)
df = df.sort_values(by=sort_cols, ascending=asc_order)
save_df = df.drop(columns=['_status', '_sort_price'], errors='ignore')
if not save_df.empty:
if '明细' in sheet_name:
mask = save_df.duplicated(subset=['合同编号'], keep='first')
save_df.loc[mask, '合同标的'] = ""
save_df.to_excel(writer, sheet_name=sheet_name, index=False)
self.status_var.set("✅ 写入成功!")
messagebox.showinfo("成功", f"文件已成功写入!\n位置: {base_dir or '当前目录'}")
except PermissionError:
messagebox.showerror("保存失败", "文件被占用!\n请先关闭 Excel 文件后再点击保存。")
except Exception as e:
messagebox.showerror("保存失败", str(e))
finally:
if self.status_var.get() != "已取消写入":
self.status_var.set("准备就绪")
if __name__ == "__main__":
root = tk.Tk()
try:
from ctypes import windll
windll.shcore.SetProcessDpiAwareness(1)
except:
pass
app = ContractApp(root)
root.mainloop()