"""Contract data processing tool.

Reads a contract CSV export, splits it into ASD / non-ASD workbooks with
foreign-trade, domestic-trade and OM sheets, merges the result with
previously saved Excel workbooks, previews the outcome in a tkinter GUI and
writes it back to Excel.
"""

import os
import re
import tkinter as tk
from datetime import datetime  # noqa: F401  (kept: file-level import)
from tkinter import ttk, filedialog, messagebox, simpledialog

import numpy as np  # noqa: F401  (kept: file-level import)
import pandas as pd


# ==========================================
# Part 1: business logic
# ==========================================
class DataProcessor:
    """Convert CSV contract rows into the standard sheet layouts and merge
    them with rows loaded from an older workbook.

    All methods are pure with respect to the GUI; the only I/O is
    :meth:`load_csv`.
    """

    # CSV source columns that are referenced from several methods.
    COL_ORDER_NO = '合同订单编号'
    COL_SUBJECT = '合同标的(品名/型号/数量/单价/总价)'
    COL_BUYER = '合同买方(名称/联系人/电话/邮箱)'
    COL_AGENT = '进口代理(名称/USCI/地址/联系人/电话/邮箱)'

    def __init__(self):
        # ---------- 1. Foreign-trade general sheet headers ----------
        foreign_common = [
            "合同编号", "签署公司", "外贸合同号", "收款情况", "合同签订日期",
            "销售员", "最终用户单位", "最终用户信息\n联系人、电话、邮箱",
            "最终用户所在地", "厂家", "型号/货号", "合同标的", "数量", "单位",
            "币种", "折扣率", "合同额", "总合同额", "外购", "已收款", "未收款",
            "收款日期", "最晚发货期", "付款方式", "发货港", "目的港", "发货日期",
            "买方单位", "买方信息\n联系人、电话、邮箱", "收货人信息",
        ]
        self.cols_asd_foreign_general = foreign_common + [
            "转为美元净合同额", "转为美元总合同额",
        ]
        # The non-ASD layout carries one extra exchange-rate column.
        self.cols_nonasd_foreign_general = foreign_common + [
            "合同币种/美元", "转为美元净合同额", "转为美元总合同额",
        ]

        # ---------- 2. Domestic-trade general sheet headers ----------
        self.cols_domestic_general = [
            "合同编号", "签署公司", "内贸合同号", "收款情况", "签订日期",
            "销售员", "最终用户单位", "最终用户信息\n联系人、电话、邮箱",
            "最终用户所在地", "买方单位", "买方信息\n联系人、电话、邮箱", "厂家",
            "型号", "合同标的", "数量", "单位", "折扣率(%)", "合同额",
            "合同总额", "外购", "付款方式", "最晚发货期", "已收款", "未收款",
            "收款日期", "转为美元净合同额", "转为美元总合同额",
        ]

        # ---------- 3. Foreign-trade detail sheet headers ----------
        self.cols_foreign_detail = [
            "合同编号", "销售员", "合同标的", "厂家", "货号", "产品描述", "数量",
            "单位", "币种", "报价单价", "报价总价", "销售单价", "销售总价",
            "折扣率", "外购", "合同币种/美元", "外购转美元", "报价总价美元",
            "净合同额美元",
        ]

        # ---------- 4. Domestic-trade detail sheet headers ----------
        self.cols_domestic_detail = [
            "合同编号", "销售员", "合同标的", "厂家", "货号", "产品描述", "数量",
            "单位", "外币币种", "外币报价单价", "报价RMB单价", "报价RMB总价",
            "售价RMB单价", "售价RMB总价", "折扣率(%)", "外购", "计算汇率",
            "外购转美元", "报价总价美元", "净合同额美元",
        ]

        # ---------- 5. OM contract sheet headers ----------
        self.cols_om = [
            "合同编号", "签署公司", "内贸合同号", "收款情况", "签订日期",
            "销售员", "最终用户单位", "最终用户信息\n联系人、电话、邮箱",
            "最终用户所在地", "买方单位", "买方信息\n联系人、电话、邮箱",
            "合同标的", "合同总额", "已收款", "未收款", "收款日期",
        ]

        # Columns that are only written on the "first" row of a contract
        # (the row with the highest unit price).
        self.header_only_cols = {
            "总合同额", "合同总额", "外购", "付款方式", "最晚发货期", "已收款",
            "未收款", "收款日期", "收款情况", "转为美元净合同额",
            "转为美元总合同额",
        }

        # Money columns (formatted with two decimals).
        self.money_cols = {
            "合同额", "总合同额", "合同总额", "外购", "已收款", "未收款",
            "净合同额美元", "外购转美元", "报价总价美元", "外币报价单价",
            "报价RMB单价", "报价RMB总价", "售价RMB单价", "售价RMB总价",
            "外购产品金额", "转为美元净合同额", "转为美元总合同额", "报价单价",
            "报价总价", "销售单价", "销售总价",
        }

        # Ratio columns (rendered as percentages).
        self.percent_cols = {"折扣率", "折扣率(%)", "计算汇率", "合同币种/美元"}

        # Date columns (time-of-day is stripped).
        self.date_cols = {
            "合同签订日期", "签订日期", "收款日期", "最晚发货期", "发货日期",
        }

        # Legacy header mapping used when reading old workbooks.  The
        # original literal listed "折扣率(%)" twice, so its first entry was
        # dead; only the effective entry is kept.
        # NOTE(review): the duplicate suggests one key was meant to use
        # full-width parentheses; that variant is mapped as well - confirm.
        self.legacy_map = {
            "外币币种": "币种",
            "汇率": "计算汇率",
            "折扣率(%)": "折扣率(%)",
            "折扣率(%)": "折扣率(%)",
            "合同": "合同额",
        }

        # Lookup from a whitespace-stripped header to the standard header
        # (which may contain a line break), so that both
        # "最终用户信息联系人..." and "最终用户信息\n联系人..." resolve to the
        # same standard column.
        self.standard_col_map = {}
        for header_list in (
            self.cols_asd_foreign_general,
            self.cols_nonasd_foreign_general,
            self.cols_domestic_general,
            self.cols_foreign_detail,
            self.cols_domestic_detail,
            self.cols_om,
        ):
            for col in header_list:
                self.standard_col_map[self.clean_header_key(col)] = col

    # ------------------------------------------------------------------
    # Small value helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _text(val):
        """Return *val* as stripped text; None, NaN and "nan" become ""."""
        if val is None:
            return ""
        try:
            if pd.isna(val):
                return ""
        except (TypeError, ValueError):
            # Array-like values have no single truth value; fall through.
            pass
        text = str(val).strip()
        return "" if text.lower() == "nan" else text

    def _split_order_no(self, raw):
        """Split "<contract id> <contract no...>" into its two parts."""
        order_no = self._text(raw)
        parts = order_no.split()
        contract_id = parts[0] if parts else order_no
        contract_no = " ".join(parts[1:]) if len(parts) > 1 else ""
        return contract_id, contract_no

    def _payment_state(self, total_amount, status):
        """Return (received, unreceived) strings derived from the status.

        Only a status containing "已收" yields values; otherwise both are "".
        """
        if '已收' in status:
            return total_amount, self.format_money_str(0)
        return "", ""

    def _best_subject_item(self, rows):
        """Parse the subject lines of every row and return the item with the
        highest ``sort_price`` (the first one wins on ties)."""
        items = []
        for row in rows:
            raw = self._text(row.get(self.COL_SUBJECT, ''))
            for line in raw.split('\n'):
                if line.strip():
                    items.append(self.parse_single_line_subject(line))
        if not items:
            return {'name': '', 'model': '', 'qty': '', 'unit': '',
                    'price': '', 'sort_price': 0.0}
        return max(items, key=lambda item: item['sort_price'])

    def clean_header_key(self, text):
        """Strip all whitespace (spaces, tabs, line breaks) from a header."""
        if not isinstance(text, str):
            return str(text)
        return re.sub(r'\s+', '', text)

    def safe_float(self, val):
        """Convert *val* to float; unparseable or missing values give 0.0.

        Thousands separators and the ``¥`` / ``$`` symbols are removed from
        strings first.
        """
        try:
            if isinstance(val, str):
                val = (val.replace(',', '').replace('¥', '')
                       .replace('$', '').strip())
                if val == '':
                    return 0.0
            if pd.isna(val):
                return 0.0
            return float(val)
        except (TypeError, ValueError, OverflowError):
            return 0.0

    def format_money_str(self, val):
        """Format *val* with two decimals; empty/NaN input gives ""."""
        if pd.isna(val) or str(val).strip() == "":
            return ""
        return "{:.2f}".format(self.safe_float(val))

    def format_percent_str(self, val):
        """Format a ratio as a percentage string.

        Values that already contain ``%`` are returned unchanged; other
        values are multiplied by 100. Empty/NaN input gives "".
        """
        if pd.isna(val) or str(val).strip() == "":
            return ""
        text = str(val).strip()
        if '%' in text:
            return text
        return "{:.2f}%".format(self.safe_float(val) * 100)

    def format_date_str(self, val):
        """Format a date as ``YYYY-MM-DD``, dropping any time-of-day.

        Values pandas cannot parse are returned as their stripped text.
        """
        if pd.isna(val) or str(val).strip() == "":
            return ""
        text = str(val).strip()
        try:
            parsed = pd.to_datetime(val, errors='coerce')
        except (TypeError, ValueError, OverflowError):
            return str(val)
        if pd.isnull(parsed):
            return text
        return parsed.strftime('%Y-%m-%d')

    def normalize_for_compare(self, val):
        """Return a canonical string used to detect real value changes.

        Numbers (ignoring ``,`` and ``%``) are rendered with four decimals so
        that ``"1,000"`` and ``1000.0`` compare equal; missing values give "".
        """
        if val is None or pd.isna(val):
            return ""
        text = str(val).strip()
        if text.lower() == 'nan':
            return ""
        try:
            return "{:.4f}".format(float(text.replace(',', '').replace('%', '')))
        except ValueError:
            return text

    # ------------------------------------------------------------------
    # Input
    # ------------------------------------------------------------------
    def load_csv(self, file_path):
        """Read the source CSV, trying several encodings.

        Returns:
            ``(df, (general_factory_col, detail_factory_col))`` on success,
            or ``(None, error_message)`` when the file cannot be read or a
            required column is missing.
        """
        df = None
        for enc in ('utf-8', 'gbk', 'gb18030'):
            try:
                df = pd.read_csv(file_path, encoding=enc)
                break
            except UnicodeDecodeError:
                continue
            except (OSError, pd.errors.ParserError,
                    pd.errors.EmptyDataError) as exc:
                return None, f"无法读取文件: {exc}"
        if df is None:
            # Last resort: decode leniently so a few bad bytes do not block
            # the whole import.
            try:
                df = pd.read_csv(file_path, encoding='gb18030',
                                 encoding_errors='replace')
            except Exception:
                return None, "无法读取文件,请检查编码。"

        col_factory_general = '厂家'
        required = (col_factory_general, '合同类型', self.COL_ORDER_NO)
        missing = [col for col in required if col not in df.columns]
        if missing:
            return None, "CSV 缺少必要的列: " + ", ".join(missing)

        # pandas renames a second "厂家" column to "厂家.1"; that one holds the
        # per-product manufacturer used by the detail sheets.
        col_factory_detail = '厂家.1' if '厂家.1' in df.columns else '厂家'
        df[col_factory_general] = df[col_factory_general].fillna('').astype(str)
        df['合同类型'] = df['合同类型'].fillna('').astype(str)
        return df, (col_factory_general, col_factory_detail)

    # ------------------------------------------------------------------
    # Parsing
    # ------------------------------------------------------------------
    def parse_buyer_info(self, text):
        """Split a multi-line buyer cell into name (first line) and contact
        details (remaining lines joined by spaces)."""
        info = {'name': '', 'contact_full': ''}
        if not isinstance(text, str) or not text.strip():
            return info
        lines = [line.strip() for line in text.split('\n') if line.strip()]
        if not lines:
            return info
        info['name'] = lines[0]
        info['contact_full'] = " ".join(lines[1:])
        return info

    def parse_single_line_subject(self, text):
        """Parse one contract-subject line.

        Slash-separated lines are read as ``name/model/qty[unit]/price``.
        Other lines fall back to a "名称:" style label (or the first line)
        for the name and the last number in the text for the price.
        """
        res = {'name': '', 'model': '', 'qty': '', 'unit': '', 'price': '',
               'sort_price': 0.0}
        if not isinstance(text, str) or not text.strip():
            return res
        text = text.strip()

        if '/' in text:
            parts = [part.strip() for part in text.split('/')]
            res['name'] = parts[0]
            if len(parts) >= 2:
                res['model'] = parts[1]
            if len(parts) >= 3:
                m_qty = re.match(
                    r'^(\d+(\.\d+)?)\s*([\u4e00-\u9fa5a-zA-Z]+)?$', parts[2])
                if m_qty:
                    res['qty'] = m_qty.group(1)
                    res['unit'] = m_qty.group(3) or ""
                else:
                    res['qty'] = parts[2]
            if len(parts) >= 4:
                res['price'] = parts[3]
                res['sort_price'] = self.safe_float(parts[3])
            return res

        # Accept both the full-width and the ASCII colon after the label.
        name_match = re.search(
            r'(?:中文品名|中文名称|名称|Name)[::]\s*(.*?)(?:\n|$)',
            text, re.IGNORECASE)
        if name_match:
            res['name'] = name_match.group(1).strip()
        else:
            res['name'] = text.split('\n')[0]
        nums = re.findall(r'\d+(?:\.\d+)?', text.replace(',', ''))
        if nums:
            res['sort_price'] = self.safe_float(nums[-1])
            res['price'] = nums[-1]
        return res

    # ------------------------------------------------------------------
    # General sheets
    # ------------------------------------------------------------------
    def _build_general_row(self, row, trade_type, target_cols, col_factory,
                           best_item):
        """Map one CSV row plus the chosen subject item to a general-sheet
        row dict (keys: *target_cols* plus ``_sort_price``)."""
        contract_id, contract_no = self._split_order_no(
            row.get(self.COL_ORDER_NO, ''))
        contract_no_col = '外贸合同号' if trade_type == '外贸' else '内贸合同号'

        # Financial data
        total_amount = self.format_money_str(row.get('合同总额', ''))
        status = self._text(row.get('收款状态', ''))
        received, unreceived = self._payment_state(total_amount, status)

        # Buyer: foreign trade prefers the import agent and falls back to
        # the contract buyer when the agent cell is empty.
        buyer_raw = self._text(row.get(self.COL_BUYER, ''))
        if trade_type != '内贸':
            buyer_raw = self._text(row.get(self.COL_AGENT, '')) or buyer_raw
        parsed_buyer = self.parse_buyer_info(buyer_raw)

        new_row = {col: "" for col in target_cols}
        new_row['合同编号'] = contract_id
        new_row[contract_no_col] = contract_no
        new_row['签署公司'] = row.get('收款账户', '')

        date_raw = row.get('签约日期', '')
        for date_col in ('合同签订日期', '签订日期'):
            if date_col in new_row:
                new_row[date_col] = self.format_date_str(date_raw)

        new_row['销售员'] = row.get('负责人', '')
        new_row['最终用户单位'] = row.get('客户名称', '')

        # These headers contain a line break, so match them by substring.
        for col in target_cols:
            if "最终用户信息" in col:
                new_row[col] = row.get('联系人姓名', '')
            if "买方信息" in col:
                new_row[col] = parsed_buyer['contact_full']

        new_row['厂家'] = row.get(col_factory, '')
        if '币种' in new_row:
            new_row['币种'] = row.get('货币(选完产品再改)', '')
        if '发货港' in new_row:
            new_row['发货港'] = row.get('发货地', '')
        if '目的港' in new_row:
            new_row['目的港'] = row.get('目的港', '')
        new_row['买方单位'] = parsed_buyer['name']
        if '收货人信息' in new_row:
            new_row['收货人信息'] = parsed_buyer['name']

        discount_col = '折扣率' if '折扣率' in new_row else '折扣率(%)'
        if discount_col in new_row:
            new_row[discount_col] = self.format_percent_str(
                row.get('折扣率', ''))

        new_row['合同标的'] = best_item['name']
        for model_col in ('型号/货号', '型号'):
            if model_col in new_row:
                new_row[model_col] = best_item['model']
        new_row['数量'] = best_item['qty']
        new_row['单位'] = best_item['unit']
        # Contract amount of the chosen line only.
        if '合同额' in new_row:
            new_row['合同额'] = self.format_money_str(best_item['price'])

        # Whole-contract totals.
        total_col = '总合同额' if '总合同额' in new_row else '合同总额'
        new_row[total_col] = total_amount
        new_row['收款情况'] = status
        new_row['外购'] = self.format_money_str(row.get('外购产品金额', ''))
        new_row['已收款'] = received
        new_row['未收款'] = unreceived
        new_row['收款日期'] = self.format_date_str(row.get('最新收款日期', ''))
        if '最晚发货期' in new_row:
            new_row['最晚发货期'] = self.format_date_str(
                row.get('最晚发货期', ''))
        if '付款方式' in new_row:
            new_row['付款方式'] = row.get('付款比例及期限', '')
        if '发货日期' in new_row:
            new_row['发货日期'] = ""  # filled in manually later
        if '合同币种/美元' in new_row:
            new_row['合同币种/美元'] = row.get('合同币种/美元', '')

        new_row['_sort_price'] = best_item['sort_price']
        return new_row

    def process_row_general_expanded(self, row, trade_type, trade_cols,
                                     col_factory):
        """Build the general-sheet row for a single CSV row.

        Args:
            row: one CSV record (``pd.Series`` or mapping with ``.get``).
            trade_type: ``'外贸'`` or ``'内贸'``.
            trade_cols: standard header list of the target sheet.
            col_factory: name of the CSV manufacturer column.

        Returns:
            A one-element list holding the row dict; the subject fields come
            from the highest-priced subject line of *row*.
        """
        best_item = self._best_subject_item([row])
        return [self._build_general_row(row, trade_type, trade_cols,
                                        col_factory, best_item)]

    def generate_general_row_aggregated(self, contract_id, group_df,
                                        target_cols, trade_type, is_asd,
                                        col_factory):
        """Build one general-sheet row for all CSV rows of a contract.

        Contract-level fields are taken from the first row of *group_df*;
        the subject fields come from the highest-priced subject line found
        in any row. *contract_id* and *is_asd* are accepted for interface
        compatibility and are not used.
        """
        best_item = self._best_subject_item(
            row for _, row in group_df.iterrows())
        return self._build_general_row(group_df.iloc[0], trade_type,
                                       target_cols, col_factory, best_item)

    # ------------------------------------------------------------------
    # Detail sheets
    # ------------------------------------------------------------------
    def process_row_detail(self, row, col_factory, trade_type):
        """Map one CSV row to a detail-sheet row (returned as a Series)."""
        if trade_type == '外贸':
            target_cols = self.cols_foreign_detail
        else:
            target_cols = self.cols_domestic_detail
        new_row = {col: "" for col in target_cols}

        manufacturer = self._text(row.get(col_factory, ''))
        new_row['合同编号'], _ = self._split_order_no(
            row.get(self.COL_ORDER_NO, ''))
        new_row['销售员'] = row.get('负责人', '')
        new_row['厂家'] = manufacturer
        new_row['货号'] = row.get('产品编码', '')
        currency_col = '币种' if trade_type == '外贸' else '外币币种'
        new_row[currency_col] = row.get('原币种', '')

        # Only the product name of the first subject line is kept.
        subject_raw = self._text(row.get(self.COL_SUBJECT, ''))
        separator = '/' if '/' in subject_raw else '\n'
        new_row['合同标的'] = subject_raw.split(separator)[0].strip()

        quantity = self._text(row.get('数量', ''))
        if quantity:
            new_row['数量'] = quantity

        if '外购' in manufacturer:
            # Outsourced product: amount is the product subtotal, and the
            # description prefers the remark over the outsourced detail.
            new_row['外购'] = self.format_money_str(
                self.safe_float(row.get('产品小计', 0)))
            new_row['产品描述'] = (self._text(row.get('备注', ''))
                                or self._text(row.get('外购产品明细', '')))
        else:
            new_row['外购'] = ""
            new_row['产品描述'] = row.get('产品名称', '')

        # (target column, CSV source column, formatter); a target is only
        # filled when the current sheet layout has that column.
        money, percent = self.format_money_str, self.format_percent_str
        optional_fields = (
            ('外币报价单价', '美元报价', money),
            ('报价单价', '美元报价', money),
            ('报价RMB总价', '产品小计', money),
            ('报价总价', '产品小计', money),
            ('计算汇率', '汇率', percent),
            ('合同币种/美元', '汇率', percent),
            ('折扣率' if '折扣率' in new_row else '折扣率(%)', '折扣率',
             percent),
            ('售价RMB单价', '销售单价', money),
            ('销售单价', '销售单价', money),
            ('售价RMB总价', '销售总价', money),
            ('销售总价', '销售总价', money),
        )
        for target, source, formatter in optional_fields:
            if target in new_row:
                new_row[target] = formatter(row.get(source, ''))

        for usd_col in ('外购转美元', '报价总价美元', '净合同额美元'):
            new_row[usd_col] = money(row.get(usd_col, ''))
        if '报价RMB单价' in new_row:
            new_row['报价RMB单价'] = money(row.get('报价RMB单价', ''))
        return pd.Series(new_row)

    # ------------------------------------------------------------------
    # OM sheet
    # ------------------------------------------------------------------
    def generate_om_row_aggregated(self, contract_id, group_df, target_cols):
        """Build one OM-sheet row for all CSV rows of a contract.

        Contract-level fields come from the first row of *group_df*; the
        subject name comes from the highest-priced subject line in the
        group. *contract_id* is accepted for interface compatibility only.
        """
        first_row = group_df.iloc[0]
        best_item = self._best_subject_item(
            row for _, row in group_df.iterrows())

        new_row = {col: "" for col in target_cols}
        new_row['合同编号'], new_row['内贸合同号'] = self._split_order_no(
            first_row.get(self.COL_ORDER_NO, ''))

        total_amount = self.format_money_str(first_row.get('合同总额', ''))
        status = self._text(first_row.get('收款状态', ''))
        received, unreceived = self._payment_state(total_amount, status)

        new_row['签署公司'] = first_row.get('收款账户', '')
        new_row['签订日期'] = self.format_date_str(
            first_row.get('签约日期', ''))
        new_row['销售员'] = first_row.get('负责人', '')
        new_row['最终用户单位'] = first_row.get('客户名称', '')
        contact_col = '最终用户信息\n联系人、电话、邮箱'
        if contact_col in new_row:
            new_row[contact_col] = first_row.get('联系人姓名', '')

        parsed_buyer = self.parse_buyer_info(
            self._text(first_row.get(self.COL_BUYER, '')))
        new_row['买方单位'] = parsed_buyer['name']
        buyer_info_col = '买方信息\n联系人、电话、邮箱'
        if buyer_info_col in new_row:
            new_row[buyer_info_col] = parsed_buyer['contact_full']

        new_row['收款日期'] = self.format_date_str(
            first_row.get('最新收款日期', ''))
        new_row['合同标的'] = best_item['name']
        new_row['_sort_price'] = best_item['sort_price']
        new_row['合同总额'] = total_amount
        new_row['收款情况'] = status
        new_row['已收款'] = received
        new_row['未收款'] = unreceived
        return new_row

    # ------------------------------------------------------------------
    # Merging
    # ------------------------------------------------------------------
    def _merge_rows(self, old_df, new_rows_list, unique_col, target_columns):
        """Merge freshly generated rows into the rows of an old sheet.

        Rows whose *unique_col* value is unknown are appended with
        ``_status == 'new'``. For a known value only the FIRST old row is
        compared with the FIRST new row; differing non-empty new values
        overwrite the old ones and the row is marked ``'modified'``.
        """
        if old_df is None or old_df.empty:
            if not new_rows_list:
                return pd.DataFrame(columns=target_columns + ['_status'])
            combined = pd.DataFrame(new_rows_list)
            combined['_status'] = 'new'
            return combined

        combined = old_df.copy()
        for col in target_columns:
            if col not in combined.columns:
                combined[col] = ""
        if '_sort_price' not in combined.columns:
            combined['_sort_price'] = 0.0
        if unique_col in combined.columns:
            combined[unique_col] = combined[unique_col].astype(str)
        if '_status' not in combined.columns:
            combined['_status'] = ''
        if not new_rows_list:
            return combined

        new_rows_df = pd.DataFrame(new_rows_list)
        if unique_col in new_rows_df.columns:
            new_rows_df[unique_col] = new_rows_df[unique_col].astype(str)

        rows_to_append = []
        for cid in new_rows_df[unique_col].unique():
            new_subset = new_rows_df[new_rows_df[unique_col] == cid]
            old_indices = combined[combined[unique_col] == cid].index
            if len(old_indices) == 0:
                appended = new_subset.copy()
                appended['_status'] = 'new'
                rows_to_append.append(appended)
                continue

            first_old_idx = old_indices[0]
            new_first_row = new_subset.iloc[0]
            has_changed = False
            for col in target_columns:
                if col not in new_first_row:
                    continue
                new_val = new_first_row[col]
                new_norm = self.normalize_for_compare(new_val)
                # Never overwrite existing data with an empty/NaN value.
                if new_norm == "":
                    continue
                old_val = combined.at[first_old_idx, col]
                if self.normalize_for_compare(old_val) != new_norm:
                    # Numeric columns read from Excel cannot hold the
                    # formatted strings produced here; widen them first.
                    if combined[col].dtype != object:
                        combined[col] = combined[col].astype(object)
                    combined.at[first_old_idx, col] = new_val
                    has_changed = True
            if '_sort_price' in new_first_row:
                combined.at[first_old_idx, '_sort_price'] = (
                    new_first_row['_sort_price'])
            if has_changed:
                combined.at[first_old_idx, '_status'] = 'modified'

        if rows_to_append:
            combined = pd.concat([combined] + rows_to_append,
                                 ignore_index=True)
        return combined

    def merge_datasets(self, old_dfs, csv_df, is_asd):
        """Build the five result sheets for the ASD or non-ASD workbook.

        Args:
            old_dfs: mapping sheet name -> DataFrame from the old workbook
                (may be empty).
            csv_df: DataFrame returned by :meth:`load_csv`.
            is_asd: select rows whose ``厂家`` contains "ASD" (True) or all
                other rows (False).

        Returns:
            dict with the keys ``外贸``, ``外贸明细``, ``内贸``, ``内贸明细``
            and ``OM合同``; each value carries a ``_status`` column.
        """
        col_gen = '厂家'
        col_det = '厂家.1' if '厂家.1' in csv_df.columns else '厂家'

        asd_mask = csv_df[col_gen].str.contains('ASD', case=False, na=False)
        df_subset = csv_df[asd_mask] if is_asd else csv_df[~asd_mask]

        csv_foreign = df_subset[df_subset['合同类型'] == '外贸'].copy()
        csv_domestic = df_subset[df_subset['合同类型'] == '内贸'].copy()
        csv_om = df_subset[
            ~df_subset['合同类型'].isin(['外贸', '内贸'])].copy()

        result_dfs = {}

        def general_rows(frame, cols, trade_type):
            # One aggregated row per contract order number.
            return [
                self.generate_general_row_aggregated(
                    cid, group, cols, trade_type, is_asd, col_gen)
                for cid, group in frame.groupby(self.COL_ORDER_NO)
            ] if not frame.empty else []

        def detail_rows(frame, cols, trade_type):
            if frame.empty:
                return pd.DataFrame(columns=cols).to_dict('records')
            detail = frame.apply(
                lambda r: self.process_row_detail(r, col_det, trade_type),
                axis=1)
            return detail.to_dict('records')

        # --- 1. Foreign-trade general sheet (aggregated) ---
        cols_foreign = (self.cols_asd_foreign_general if is_asd
                        else self.cols_nonasd_foreign_general)
        old_gen = old_dfs.get(
            '外贸',
            old_dfs.get('外贸总表', pd.DataFrame(columns=cols_foreign)))
        result_dfs['外贸'] = self._merge_rows(
            old_gen, general_rows(csv_foreign, cols_foreign, '外贸'),
            '合同编号', cols_foreign)

        # --- 2. Foreign-trade detail sheet ---
        old_det = old_dfs.get(
            '外贸明细', pd.DataFrame(columns=self.cols_foreign_detail))
        result_dfs['外贸明细'] = self._merge_rows(
            old_det,
            detail_rows(csv_foreign, self.cols_foreign_detail, '外贸'),
            '合同编号', self.cols_foreign_detail)

        # --- 3. Domestic-trade general sheet (aggregated) ---
        old_dom_gen = old_dfs.get(
            '内贸',
            old_dfs.get('内贸总表',
                        pd.DataFrame(columns=self.cols_domestic_general)))
        result_dfs['内贸'] = self._merge_rows(
            old_dom_gen,
            general_rows(csv_domestic, self.cols_domestic_general, '内贸'),
            '合同编号', self.cols_domestic_general)

        # --- 4. Domestic-trade detail sheet ---
        old_dom_det = old_dfs.get(
            '内贸明细', pd.DataFrame(columns=self.cols_domestic_detail))
        result_dfs['内贸明细'] = self._merge_rows(
            old_dom_det,
            detail_rows(csv_domestic, self.cols_domestic_detail, '内贸'),
            '合同编号', self.cols_domestic_detail)

        # --- 5. OM sheet (aggregated) ---
        new_om_rows = []
        if not csv_om.empty:
            for cid, group in csv_om.groupby(self.COL_ORDER_NO):
                new_om_rows.append(
                    self.generate_om_row_aggregated(cid, group, self.cols_om))
        old_om = old_dfs.get(
            'OM合同', old_dfs.get('其他', pd.DataFrame(columns=self.cols_om)))
        result_dfs['OM合同'] = self._merge_rows(
            old_om, new_om_rows, '合同编号', self.cols_om)

        return result_dfs

    def apply_formatting_to_all(self, data_dict):
        """Apply money / percent / date formatting to every sheet in place
        and return *data_dict*."""
        formatters = (
            (self.money_cols, self.format_money_str),
            (self.percent_cols, self.format_percent_str),
            (self.date_cols, self.format_date_str),
        )
        for df in data_dict.values():
            if df.empty:
                continue
            for columns, formatter in formatters:
                for col in columns:
                    if col in df.columns:
                        df[col] = df[col].apply(formatter)
        return data_dict


# ==========================================
# Part 2: GUI
# ==========================================
class ContractApp:
    """tkinter front end: pick files, run the merge, preview and save."""

    SHEET_ORDER = ['外贸', '外贸明细', '内贸', '内贸明细', 'OM合同']

    def __init__(self, root):
        self.root = root
        self.root.title("合同数据处理系统 V3.8 (换行符修复版)")
        self.root.geometry("1300x850")

        self.style = ttk.Style()
        self.style.theme_use('clam')
        self.colors = {'bg': '#F5F6FA', 'primary': '#409EFF',
                       'success': '#67C23A', 'warning': '#E6A23C',
                       'text': '#2C3E50', 'panel': '#FFFFFF'}
        self.root.configure(bg=self.colors['bg'])
        self.default_font = ("微软雅黑", 10)
        self.header_font = ("微软雅黑", 11, "bold")
        self._configure_styles()

        self.processor = DataProcessor()
        self.csv_path = tk.StringVar()
        self.asd_path = tk.StringVar()
        self.non_asd_path = tk.StringVar()
        # {'ASD': {sheet: DataFrame}, 'NonASD': {...}} after processing.
        self.final_data = {}
        self.create_widgets()

    def _configure_styles(self):
        """Register the ttk styles used by the widgets."""
        style, colors = self.style, self.colors
        style.configure("TFrame", background=colors['bg'])
        style.configure("Panel.TFrame", background=colors['panel'],
                        relief="flat")
        style.configure("TLabel", background=colors['panel'],
                        foreground=colors['text'], font=self.default_font)
        style.configure("Header.TLabel", font=("微软雅黑", 16, "bold"),
                        background=colors['bg'], foreground=colors['text'])
        style.configure("TButton", font=("微软雅黑", 10), borderwidth=0,
                        padding=6)
        style.map("TButton", background=[('active', '#E0E0E0')])
        style.configure("Primary.TButton", background=colors['primary'],
                        foreground='white')
        style.map("Primary.TButton", background=[('active', '#66B1FF')])
        style.configure("Success.TButton", background=colors['success'],
                        foreground='white')
        style.map("Success.TButton", background=[('active', '#85CE61')])
        style.configure("Treeview", background="white", foreground="black",
                        fieldbackground="white", rowheight=28,
                        font=("微软雅黑", 9))
        style.configure("Treeview.Heading", font=("微软雅黑", 10, "bold"),
                        background="#EBEEF5", foreground="#606266")
        style.map("Treeview", background=[('selected', '#409EFF')])

    def create_widgets(self):
        """Build the header, file-selection panel, preview area and bottom
        bar."""
        header_frame = ttk.Frame(self.root)
        header_frame.pack(fill="x", padx=20, pady=(20, 10))
        ttk.Label(header_frame, text="📄 合同数据处理工具 (支持 OM合同)",
                  style="Header.TLabel").pack(side="left")

        input_panel = ttk.Frame(self.root, style="Panel.TFrame", padding=20)
        input_panel.pack(fill="x", padx=20, pady=5)
        ttk.Label(input_panel,
                  text="文件配置 (若未选择旧文件,将自动生成新文件)",
                  font=self.header_font).grid(
            row=0, column=0, columnspan=3, sticky="w", pady=(0, 15))
        self.create_file_row(input_panel, "📂 导入 CSV 源文件:",
                             self.csv_path, 1)
        self.create_file_row(input_panel, "📘 旧 ASD Excel 文件:",
                             self.asd_path, 2)
        self.create_file_row(input_panel, "📗 旧 非ASD Excel 文件:",
                             self.non_asd_path, 3)

        btn_frame = ttk.Frame(input_panel, style="Panel.TFrame")
        btn_frame.grid(row=4, column=0, columnspan=3, pady=(15, 0),
                       sticky="e")
        ttk.Button(btn_frame, text="▶ 开始处理并预览",
                   style="Primary.TButton",
                   command=self.process_files).pack(side="right")

        self.notebook = ttk.Notebook(self.root)
        self.notebook.pack(fill="both", expand=True, padx=20, pady=10)

        bottom_bar = ttk.Frame(self.root, style="Panel.TFrame", padding=15)
        bottom_bar.pack(fill="x", padx=20, pady=(0, 20))
        legend_frame = ttk.Frame(bottom_bar, style="Panel.TFrame")
        legend_frame.pack(side="left")
        self.create_legend(legend_frame, "■ 新增数据", "#FFFFCC", "black")
        self.create_legend(legend_frame, "■ 有修改/变动", "#ECF5FF",
                           "#409EFF")
        self.create_legend(legend_frame, "□ 无变动", "white", "black")
        ttk.Button(bottom_bar, text="💾 保存更改至 Excel",
                   style="Success.TButton",
                   command=self.save_files).pack(side="right")

    def create_file_row(self, parent, label_text, var, row_idx):
        """Add a label / entry / browse-button row bound to *var*."""
        ttk.Label(parent, text=label_text, width=20).grid(
            row=row_idx, column=0, sticky="w", pady=5)
        entry = ttk.Entry(parent, textvariable=var, font=("微软雅黑", 9))
        entry.grid(row=row_idx, column=1, sticky="ew", padx=10, pady=5)
        ttk.Button(parent, text="浏览",
                   command=lambda: self.browse_file(var)).grid(
            row=row_idx, column=2, padx=5)
        parent.columnconfigure(1, weight=1)

    def create_legend(self, parent, text, bg_color, fg_color):
        """Add one coloured legend label to *parent*."""
        lbl = tk.Label(parent, text=text, bg=bg_color, fg=fg_color,
                       font=("微软雅黑", 9), padx=8, pady=3, borderwidth=1,
                       relief="solid")
        lbl.pack(side="left", padx=5)

    def browse_file(self, variable):
        """Ask for a CSV/Excel file and store the chosen path in *variable*."""
        # A tuple of patterns works on every platform; the ";"-joined form
        # is only understood by the native Windows dialog.
        chosen = filedialog.askopenfilename(
            filetypes=[("Excel/CSV Files", ("*.csv", "*.xlsx"))])
        if chosen:
            variable.set(chosen)

    def load_excel_safe(self, path):
        """Read every sheet of an old workbook and normalise its headers.

        Returns a dict sheet name -> DataFrame, or ``{}`` when *path* is
        empty, missing or unreadable (a warning dialog is shown on error).
        """
        if not path or not os.path.exists(path):
            return {}
        try:
            sheets = pd.read_excel(path, sheet_name=None)
            clean_dfs = {}
            for sheet_name, frame in sheets.items():
                new_columns = []
                for col in frame.columns:
                    clean_col = self.processor.clean_header_key(str(col))
                    if clean_col in self.processor.standard_col_map:
                        # Header matches a standard one modulo whitespace.
                        new_columns.append(
                            self.processor.standard_col_map[clean_col])
                    elif col in self.processor.legacy_map:
                        new_columns.append(self.processor.legacy_map[col])
                    else:
                        new_columns.append(col)  # unknown: keep as is
                frame.columns = new_columns
                # Renaming can produce duplicates; keep the first of each.
                frame = frame.loc[:, ~frame.columns.duplicated()].copy()
                if '合同编号' in frame.columns:
                    frame['合同编号'] = frame['合同编号'].astype(str)
                clean_dfs[sheet_name.strip()] = frame
            return clean_dfs
        except Exception as e:
            messagebox.showwarning(
                "读取错误", f"读取旧文件失败: {path}\n错误: {str(e)}")
            return {}

    def process_files(self):
        """Load the inputs, run the merge for both workbooks and refresh
        the preview."""
        if not self.csv_path.get():
            messagebox.showerror("提示", "请先选择 CSV 源文件!")
            return
        csv_df, headers = self.processor.load_csv(self.csv_path.get())
        if csv_df is None:
            messagebox.showerror("错误", headers)
            return

        try:
            results = {}
            for file_type, path_var, is_asd in (
                    ('ASD', self.asd_path, True),
                    ('NonASD', self.non_asd_path, False)):
                old_sheets = self.load_excel_safe(path_var.get())
                merged = self.processor.merge_datasets(
                    old_sheets, csv_df, is_asd)
                results[file_type] = self.processor.apply_formatting_to_all(
                    merged)
        except Exception as exc:
            # GUI boundary: surface the failure instead of a silent
            # traceback on stderr.
            messagebox.showerror("错误", str(exc))
            return

        self.final_data = results
        self.refresh_preview()
        messagebox.showinfo(
            "完成", "数据处理完成!\n请查看预览,确认无误后点击下方保存。")

    def _standard_cols_for(self, sheet_name, is_asd):
        """Return the standard header list for *sheet_name*."""
        proc = self.processor
        if sheet_name == '外贸':
            return (proc.cols_asd_foreign_general if is_asd
                    else proc.cols_nonasd_foreign_general)
        return {
            '内贸': proc.cols_domestic_general,
            'OM合同': proc.cols_om,
            '外贸明细': proc.cols_foreign_detail,
            '内贸明细': proc.cols_domestic_detail,
        }.get(sheet_name, [])

    def refresh_preview(self):
        """Rebuild the preview tabs from ``self.final_data``."""
        for tab in self.notebook.tabs():
            self.notebook.forget(tab)

        for file_type in ('ASD', 'NonASD'):
            if file_type not in self.final_data:
                continue
            data_dict = self.final_data[file_type]
            main_frame = ttk.Frame(self.notebook, style="Panel.TFrame")
            self.notebook.add(main_frame, text=f" {file_type} 文件预览 ")
            inner_notebook = ttk.Notebook(main_frame)
            inner_notebook.pack(fill="both", expand=True, padx=5, pady=5)

            for sheet_name in self.SHEET_ORDER:
                if sheet_name not in data_dict:
                    continue
                df = data_dict[sheet_name]
                if not df.empty and '合同编号' in df.columns:
                    df['合同编号'] = df['合同编号'].astype(str)
                    sort_cols, asc_order = ['合同编号'], [True]
                    if '_sort_price' in df.columns:
                        sort_cols.append('_sort_price')
                        asc_order.append(False)
                    df = df.sort_values(by=sort_cols, ascending=asc_order)
                    if '明细' in sheet_name and '合同标的' in df.columns:
                        # Show the subject only on a contract's first row.
                        mask = df.duplicated(subset=['合同编号'],
                                             keep='first')
                        df.loc[mask, '合同标的'] = ""
                    # Keep the displayed frame as the stored one so that
                    # edits made in the preview reach save_files().
                    data_dict[sheet_name] = df
                self.create_treeview(
                    inner_notebook, df, sheet_name,
                    self._standard_cols_for(sheet_name, file_type == 'ASD'))

    def create_treeview(self, parent, df, title, target_cols):
        """Add a tab showing *df* restricted to *target_cols*.

        Each row is inserted with its position in *df* as item id, which
        :meth:`on_double_click` uses to write edits back.
        """
        frame = ttk.Frame(parent)
        parent.add(frame, text=title)
        scroll_y = ttk.Scrollbar(frame, orient="vertical")
        scroll_x = ttk.Scrollbar(frame, orient="horizontal")

        display_cols = target_cols  # only the standard columns are shown
        tree = ttk.Treeview(frame, columns=display_cols, show='headings',
                            yscrollcommand=scroll_y.set,
                            xscrollcommand=scroll_x.set)
        scroll_y.config(command=tree.yview)
        scroll_x.config(command=tree.xview)
        scroll_y.pack(side="right", fill="y")
        scroll_x.pack(side="bottom", fill="x")
        tree.pack(fill="both", expand=True)

        for col in display_cols:
            # Replace line breaks so the heading stays one line high.
            tree.heading(col, text=col.replace('\n', ' '))
            tree.column(col, width=120, anchor="center")

        tree.tag_configure('new', background='#FFFFCC')
        tree.tag_configure('modified', background='#ECF5FF',
                           foreground='#409EFF')

        if not df.empty:
            is_detail = '明细' in title
            last_contract_id = None
            rows = df.fillna("").iterrows()
            for pos, (_, row) in enumerate(rows):
                values = []
                for col in display_cols:
                    val = row.get(col, "")
                    if (is_detail and col == '合同标的'
                            and row.get('合同编号', '') == last_contract_id):
                        val = ""
                    values.append(val)
                if is_detail:
                    last_contract_id = row.get('合同编号', '')
                tree.insert("", "end", iid=str(pos), values=values,
                            tags=(row.get('_status', ''),))

        # The original bound an empty event sequence, so the editor was
        # never reachable.
        tree.bind("<Double-1>",
                  lambda event: self.on_double_click(event, tree, df))

    def on_double_click(self, event, tree, df):
        """Edit the double-clicked cell and store the value in *df*."""
        if tree.identify("region", event.x, event.y) != "cell":
            return
        column = tree.identify_column(event.x)
        row_id = tree.identify_row(event.y)
        if not row_id:
            return
        col_idx = int(column.replace('#', '')) - 1
        col_name = tree['columns'][col_idx]
        current_val = tree.item(row_id, "values")[col_idx]

        new_val = simpledialog.askstring(
            "快速编辑", f"修改 [{col_name}]:", initialvalue=current_val,
            parent=self.root)
        if new_val is None:
            return

        current_values = list(tree.item(row_id, "values"))
        current_values[col_idx] = new_val
        tree.item(row_id, values=current_values)

        # Persist the edit so that it is written by save_files().
        try:
            pos = int(row_id)
        except ValueError:
            return
        if col_name in df.columns and 0 <= pos < len(df):
            if df[col_name].dtype != object:
                df[col_name] = df[col_name].astype(object)
            df.iloc[pos, df.columns.get_loc(col_name)] = new_val

    def save_files(self):
        """Write both workbooks to Excel.

        Each workbook goes to the selected old file, or to
        ``<type>_Combined.xlsx`` next to the CSV when none was selected.
        The target file is rewritten with the five standard sheets only.
        """
        if not self.final_data:
            return
        csv_path = self.csv_path.get()
        base_dir = os.path.dirname(csv_path) if csv_path else ""
        targets = {
            'ASD': (self.asd_path, "ASD_Combined.xlsx"),
            'NonASD': (self.non_asd_path, "NonASD_Combined.xlsx"),
        }
        saved_paths = []
        try:
            for file_type, sheets in self.final_data.items():
                if file_type not in targets:
                    continue
                path_var, default_name = targets[file_type]
                target_path = (path_var.get()
                               or os.path.join(base_dir, default_name))
                with pd.ExcelWriter(target_path, engine='openpyxl') as writer:
                    for sheet_name in self.SHEET_ORDER:
                        if sheet_name not in sheets:
                            continue
                        df = sheets[sheet_name]
                        has_id = '合同编号' in df.columns
                        if has_id:
                            sort_cols, asc_order = ['合同编号'], [True]
                            if '_sort_price' in df.columns:
                                sort_cols.append('_sort_price')
                                asc_order.append(False)
                            df = df.sort_values(by=sort_cols,
                                                ascending=asc_order)
                        save_df = df.drop(columns=['_status', '_sort_price'],
                                          errors='ignore')
                        if (not save_df.empty and has_id
                                and '明细' in sheet_name
                                and '合同标的' in save_df.columns):
                            mask = save_df.duplicated(subset=['合同编号'],
                                                      keep='first')
                            save_df.loc[mask, '合同标的'] = ""
                        # Empty sheets are still written (headers only) so
                        # the workbook always contains a visible sheet.
                        save_df.to_excel(writer, sheet_name=sheet_name,
                                         index=False)
                saved_paths.append(target_path)
            messagebox.showinfo(
                "成功", "文件保存成功!\n位置:\n" + "\n".join(saved_paths))
        except PermissionError:
            messagebox.showerror(
                "保存失败", "文件被占用!\n请先关闭 Excel 文件后再点击保存。")
        except Exception as e:
            messagebox.showerror("保存失败", str(e))


def main():
    """Start the GUI."""
    root = tk.Tk()
    ContractApp(root)
    root.mainloop()


if __name__ == "__main__":
    main()