From 6d0053a8c671625d4c9d20f54380a46521b99b19 Mon Sep 17 00:00:00 2001 From: DXC Date: Mon, 19 Jan 2026 16:25:13 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E9=80=BB=E8=BE=91=EF=BC=8C?= =?UTF-8?q?=E4=B8=8D=E5=86=8D=E6=8B=BF=E5=8F=96=E6=95=B0=E6=8D=AE=EF=BC=8C?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=B8=BA=E5=8F=8C=E8=A1=A8=E8=BF=9B=E8=A1=8C?= =?UTF-8?q?=E5=90=88=E5=B9=B6=E6=93=8D=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 双表合并.py | 291 ++++++++++++++++++++++ 拿取内容测试.py | 91 ------- 登录测试.py | 56 ----- 页面.py | 558 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 849 insertions(+), 147 deletions(-) create mode 100644 双表合并.py delete mode 100644 拿取内容测试.py delete mode 100644 登录测试.py create mode 100644 页面.py diff --git a/双表合并.py b/双表合并.py new file mode 100644 index 0000000..158fd75 --- /dev/null +++ b/双表合并.py @@ -0,0 +1,291 @@ +import pandas as pd +import os + + +def process_contracts(file_path): + print(f"正在读取并处理文件: {file_path} ...") + + # --- 1. 读取CSV文件 (容错处理) --- + df = None + encodings = ['utf-8', 'gbk', 'gb18030'] + for enc in encodings: + try: + df = pd.read_csv(file_path, encoding=enc) + break + except UnicodeDecodeError: + continue + + if df is None: + try: + print("注意: 标准编码读取失败,正在尝试忽略乱码强制读取...") + df = pd.read_csv(file_path, encoding='gb18030', encoding_errors='replace') + except Exception as e: + print(f"读取文件彻底失败: {e}") + return + + # --- 2. 确认厂家列名 --- + col_factory_general = '厂家' + col_factory_detail = '厂家.1' + + if col_factory_detail not in df.columns: + print("警告: 未检测到第二个'厂家'列,明细表将被迫使用第一个'厂家'列。") + col_factory_detail = '厂家' + else: + print(f"厂家列识别成功:总表使用 '{col_factory_general}',明细表使用 '{col_factory_detail}'") + + # --- 3. 定义表头 --- + + # 3.1 外贸/内贸 总表表头 + columns_general = [ + "合同编号", "签署公司", "外贸合同号", "收款情况", "合同签订日期", + "销售员", "最终用户单位", "最终用户信息联系人、电话、邮箱", "最终用户所在地", + "厂家", "型号/货号", "合同标的", "数量", "单位", "币种", "折扣率", + "合同", "总合同额", "外购", "已收款", "未收款", "收款日期", + "最晚发货期", "付款方式", "发货港", "目的港", "发货日期", + "买方单位", "买方信息联系人、电话、邮箱", "收货人信息" + ] + columns_domestic_general = [c if c != "外贸合同号" else "内贸合同号" for c in columns_general] + + # 3.2 明细表表头 + columns_detail = [ + "合同编号", "销售员", "厂家", "合同标的", "货号", "产品描述", "数量", "单位", + "币种", "报价单价", "报价总价", "销售单价", "销售总价", "折扣率", + "外购", "合同币种/美元", "外购转美元", "报价总价美元", "净合同额美元" + ] + + # 3.3 其他表表头 + columns_other = [ + "合同编号", "签署公司", "内贸合同号", "收款情况", "签订日期", + "销售员", "最终用户单位", "最终用户信息联系人、电话、邮箱", "最终用户所在地", + "买方单位", "买方信息联系人、电话、邮箱", "合同标的", "合同总额", + "已收款", "未收款", "收款日期" + ] + + # --- 4. 辅助函数:安全转数字 --- + def safe_float(val): + try: + if isinstance(val, str): + val = val.replace(',', '').strip() + if val == '': return 0.0 + return float(val) + except (ValueError, TypeError): + return 0.0 + + # --- 5. 数据转换逻辑 --- + + # 5.1 外贸/内贸 总表转换逻辑 + def transform_general_row(row, trade_type): + target_cols = columns_general if trade_type == '外贸' else columns_domestic_general + new_row = {col: "" for col in target_cols} + + # 拆分合同号 + order_no_raw = str(row.get('合同订单编号', '')) + parts_no = order_no_raw.split(' ') + new_row['合同编号'] = parts_no[0] if len(parts_no) > 0 else order_no_raw + contract_no_col = '外贸合同号' if trade_type == '外贸' else '内贸合同号' + new_row[contract_no_col] = parts_no[1] if len(parts_no) > 1 else "" + + # 拆分合同标的 (不再从这里取总价) + target_raw = str(row.get('合同标的(品名/型号/数量/单价/总价)', '')) + parts_target = target_raw.split('/') + if len(parts_target) >= 1: new_row['合同标的'] = parts_target[0] + if len(parts_target) >= 2: new_row['型号/货号'] = parts_target[1] + if len(parts_target) >= 3: new_row['数量'] = parts_target[2] + if len(parts_target) >= 4: new_row['合同'] = parts_target[3] # 单价 + + # 【修改点】总合同额:直接读取 CSV 中的“合同总额”列 + new_row['总合同额'] = row.get('合同总额', '') + + # 映射其他字段 + new_row['签署公司'] = row.get('收款账户', '') + new_row['收款情况'] = row.get('收款状态', '') + new_row['合同签订日期'] = row.get('签约日期', '') + new_row['销售员'] = row.get('负责人', '') + new_row['最终用户单位'] = row.get('客户名称', '') + new_row['最终用户信息联系人、电话、邮箱'] = row.get('联系人姓名', '') + new_row['厂家'] = row.get(col_factory_general, '') + new_row['币种'] = row.get('货币(选完产品再改)', '') + new_row['外购'] = row.get('外购产品金额', '') + new_row['收款日期'] = row.get('最新收款日期', '') + new_row['最晚发货期'] = row.get('最晚发货期', '') + new_row['付款方式'] = row.get('付款比例及期限', '') + new_row['发货港'] = row.get('发货地', '') + new_row['目的港'] = row.get('目的港', '') + new_row['买方单位'] = row.get('合同买方(名称/联系人/电话/邮箱)', '') + + return pd.Series(new_row) + + # 5.2 明细表转换逻辑 + def transform_detail_row(row): + new_row = {col: "" for col in columns_detail} + + detail_manuf_val = str(row.get(col_factory_detail, '')) + order_no_raw = str(row.get('合同订单编号', '')) + new_row['合同编号'] = order_no_raw.split(' ')[0] if order_no_raw else "" + new_row['销售员'] = row.get('负责人', '') + new_row['厂家'] = detail_manuf_val + new_row['货号'] = row.get('产品编码', '') + new_row['数量'] = row.get('数量', '') + new_row['单位'] = "" + new_row['币种'] = row.get('原币种', '') + new_row['折扣率'] = "" + + target_raw = str(row.get('合同标的(品名/型号/数量/单价/总价)', '')) + parts_target = target_raw.split('/') + new_row['合同标的'] = parts_target[0] if len(parts_target) >= 1 else "" + + val_outsourcing_raw = safe_float(row.get('外购产品金额', 0)) + val_rate = safe_float(row.get('汇率', 1)) + if val_rate == 0: val_rate = 1 + + raw_price_unit = row.get('美元报价', '') + raw_price_total = row.get('产品小计', '') + + if '外购' in detail_manuf_val: + new_row['外购'] = val_outsourcing_raw + new_row['产品描述'] = row.get('备注', '') + new_row['报价单价'] = "" + new_row['报价总价'] = "" + new_row['销售单价'] = "" + new_row['销售总价'] = "" + current_outsourcing_cost = val_outsourcing_raw + else: + new_row['外购'] = "" + new_row['产品描述'] = row.get('产品名称', '') + new_row['报价单价'] = raw_price_unit + new_row['报价总价'] = raw_price_total + new_row['销售单价'] = "" + new_row['销售总价'] = "" + current_outsourcing_cost = 0 + + new_row['合同币种/美元'] = "" + + if current_outsourcing_cost > 0: + new_row['外购转美元'] = round(current_outsourcing_cost / val_rate, 2) + else: + new_row['外购转美元'] = "" + + new_row['报价总价美元'] = "" + new_row['净合同额美元'] = "" + + return pd.Series(new_row) + + # 5.3 其他表转换逻辑 + def transform_other_row(row): + new_row = {col: "" for col in columns_other} + + # 拆分合同号 + order_no_raw = str(row.get('合同订单编号', '')) + parts_no = order_no_raw.split(' ') + new_row['合同编号'] = parts_no[0] if len(parts_no) > 0 else order_no_raw + new_row['内贸合同号'] = parts_no[1] if len(parts_no) > 1 else "" + + # 合同标的 (取第一部分) + target_raw = str(row.get('合同标的(品名/型号/数量/单价/总价)', '')) + parts_target = target_raw.split('/') + if len(parts_target) >= 1: + new_row['合同标的'] = parts_target[0] + + # 【修改点】合同总额:直接读取源CSV的“合同总额”列 + new_row['合同总额'] = row.get('合同总额', '') + + # 映射其他字段 + new_row['签署公司'] = row.get('收款账户', '') + new_row['收款情况'] = row.get('收款状态', '') + new_row['签订日期'] = row.get('签约日期', '') + new_row['销售员'] = row.get('负责人', '') + new_row['最终用户单位'] = row.get('客户名称', '') + new_row['最终用户信息联系人、电话、邮箱'] = row.get('联系人姓名', '') + new_row['买方单位'] = row.get('合同买方(名称/联系人/电话/邮箱)', '') + new_row['收款日期'] = row.get('最新收款日期', '') + + return pd.Series(new_row) + + # --- 6. 主处理流程 --- + + df[col_factory_general] = df[col_factory_general].fillna('').astype(str) + df['合同类型'] = df['合同类型'].fillna('').astype(str) + + # 文件拆分逻辑 + df_asd = df[df[col_factory_general].str.contains('ASD', case=False, na=False)] + df_non_asd = df[~df[col_factory_general].str.contains('ASD', case=False, na=False)] + + def create_excel(dataframe, filename): + raw_foreign = dataframe[dataframe['合同类型'] == '外贸'].copy() + raw_domestic = dataframe[dataframe['合同类型'] == '内贸'].copy() + raw_other = dataframe[~dataframe['合同类型'].isin(['外贸', '内贸'])].copy() + + # === 1. 生成外贸数据 === + if not raw_foreign.empty: + df_gen = raw_foreign.apply(lambda row: transform_general_row(row, '外贸'), axis=1) + df_gen = df_gen[columns_general] + df_gen_unique = df_gen.drop_duplicates(subset=['合同编号'], keep='first') + df_gen_unique = df_gen_unique.sort_values(by='合同编号', ascending=True) + + df_det = raw_foreign.apply(lambda row: transform_detail_row(row), axis=1) + df_det = df_det[columns_detail] + df_det = df_det.sort_values(by='合同编号', ascending=True) + mask_duplicates = df_det.duplicated(subset=['合同编号'], keep='first') + df_det.loc[mask_duplicates, '合同标的'] = "" + + else: + df_gen_unique = pd.DataFrame(columns=columns_general) + df_det = pd.DataFrame(columns=columns_detail) + + # === 2. 生成内贸数据 === + if not raw_domestic.empty: + df_dom_gen = raw_domestic.apply(lambda row: transform_general_row(row, '内贸'), axis=1) + df_dom_gen = df_dom_gen[columns_domestic_general] + df_dom_gen_unique = df_dom_gen.drop_duplicates(subset=['合同编号'], keep='first') + df_dom_gen_unique = df_dom_gen_unique.sort_values(by='合同编号', ascending=True) + + df_dom_det = raw_domestic.apply(lambda row: transform_detail_row(row), axis=1) + df_dom_det = df_dom_det[columns_detail] + df_dom_det = df_dom_det.sort_values(by='合同编号', ascending=True) + mask_duplicates_dom = df_dom_det.duplicated(subset=['合同编号'], keep='first') + df_dom_det.loc[mask_duplicates_dom, '合同标的'] = "" + + else: + df_dom_gen_unique = pd.DataFrame(columns=columns_domestic_general) + df_dom_det = pd.DataFrame(columns=columns_detail) + + # === 3. 生成其他数据 === + if not raw_other.empty: + df_other = raw_other.apply(lambda row: transform_other_row(row), axis=1) + df_other = df_other[columns_other] + # 去重 + df_other_unique = df_other.drop_duplicates(subset=['合同编号'], keep='first') + # 排序 + df_other_unique = df_other_unique.sort_values(by='合同编号', ascending=True) + else: + df_other_unique = pd.DataFrame(columns=columns_other) + + # === 4. 写入 Excel === + try: + print(f"[{filename}] 正在写入Excel...") + with pd.ExcelWriter(filename, engine='openpyxl') as writer: + df_gen_unique.to_excel(writer, sheet_name='外贸总表', index=False) + df_det.to_excel(writer, sheet_name='外贸明细', index=False) + df_dom_gen_unique.to_excel(writer, sheet_name='内贸总表', index=False) + df_dom_det.to_excel(writer, sheet_name='内贸明细', index=False) + df_other_unique.to_excel(writer, sheet_name='其他', index=False) + print(f"成功生成文件: {filename}") + except Exception as e: + print(f"生成 {filename} 时发生错误: {e}") + + # 执行生成 + print("-" * 40) + create_excel(df_asd, 'ASD.xlsx') + print("-" * 40) + create_excel(df_non_asd, '非ASD.xlsx') + print("-" * 40) + print("全部处理完成!") + + +# --- 运行入口 --- +if __name__ == "__main__": + csv_file = 'test.csv' + if os.path.exists(csv_file): + process_contracts(csv_file) + else: + print(f"找不到文件: {csv_file},请检查路径。") \ No newline at end of file diff --git a/拿取内容测试.py b/拿取内容测试.py deleted file mode 100644 index 01a21e3..0000000 --- a/拿取内容测试.py +++ /dev/null @@ -1,91 +0,0 @@ -import requests -import json -import os - -# ================= 配置区域 ================= -base_url = "http://111.198.24.44:88/index.php" - -# 1. 登录信息 -login_payload = { - "module": "Users", - "action": "Authenticate", - "return_module": "Users", - "return_action": "Login", - "user_name": "你的用户名", # <--- 记得填 - "user_password": "你的密码", # <--- 记得填 - "login_theme": "newskin" -} - -# 2. 抓取数据参数 (保留了你之前的筛选条件) -data_payload = { - "module": "SalesOrder", - "action": "SalesOrderAjax", - "file": "ListViewData", - "sorder": "", - "start": "1", - "pagesize": "100", - "actionId": "1768546984243", - "isFilter": "true", - "search[viewscope]": "all_to_me", - "search[viewname]": "324126", - "filter[Fields0]": "subject", - "filter[Condition0]": "cts", - "filter[Srch_value0]": "W25A", - "filter[type0]": "text", - "filter[dateCondition1]": "prevfy", - "filter[Fields1]": "duedate", - "filter[Condition1]": "btwa", - "filter[Srch_value1]": "2025-01-01,2025-12-31", - "filter[type1]": "date", - "filter[Fields2]": "subject", - "filter[Condition2]": "dcts", - "filter[Srch_value2]": "取消", - "filter[type2]": "text", - "filter[search_cnt]": "3", - "filter[matchtype]": "all" -} - -headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", - "Referer": "http://111.198.24.44:88/index.php?module=SalesOrder&action=index" -} - -# ================= 执行逻辑 ================= -session = requests.Session() - -try: - print("1. 正在登录...") - session.post(base_url, data=login_payload, headers=headers) - - if 'PHPSESSID' in session.cookies: - print(" 登录成功,Cookie已获取。") - else: - print(" ⚠️ 警告:可能登录失败 (未检测到PHPSESSID)。") - - print("2. 正在获取数据并导出...") - resp = session.post(base_url, data=data_payload, headers=headers) - - # === 关键修改:保存文件 === - try: - # 尝试解析 JSON - json_data = resp.json() - - # 定义文件名 - filename = "result.json" - - # 写入文件 - # ensure_ascii=False 保证中文能正常显示,而不是显示成 \u53d6\u6d88 - with open(filename, 'w', encoding='utf-8') as f: - json.dump(json_data, f, ensure_ascii=False, indent=4) - - print(f"\n✅ 成功!数据已保存到当前目录下的: 【{filename}】") - print(f" 文件路径: {os.path.abspath(filename)}") - - except json.JSONDecodeError: - print("\n❌ 失败:服务器返回的不是 JSON 格式。") - print("可能是 HTML 页面,已保存为 'error_page.html' 供检查。") - with open("error_page.html", "w", encoding="utf-8") as f: - f.write(resp.text) - -except Exception as e: - print(f"发生错误: {e}") \ No newline at end of file diff --git a/登录测试.py b/登录测试.py deleted file mode 100644 index b53e7f3..0000000 --- a/登录测试.py +++ /dev/null @@ -1,56 +0,0 @@ -import requests - -# 1. 准备登录信息 -login_url = "http://111.198.24.44:88/index.php" - -# 这是你刚刚抓到的 Payload 数据 -payload = { - "error": "", - "login_theme": "newskin", - "module": "Users", - "action": "Authenticate", - "return_module": "Users", - "return_action": "Login", - "user_name": "TEST", # 在这里填入真实的用户名 - "user_password": "test", # 在这里填入真实的密码 - "code": "", - "user_validate": "" -} - -# 伪装成浏览器(这很重要,防止被反爬虫拦截) -headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" -} - -# 2. 创建一个 Session (会话) -# Session 的作用就像一个浏览器窗口,它会自动保存 Cookie -session = requests.Session() - -try: - # 3. 发送登录请求 - # allow_redirects=True 会自动跟随 301 跳转到主页,就像浏览器一样 - response = session.post(login_url, data=payload, headers=headers, allow_redirects=True) - - # 4. 检查结果 - print(f"状态码: {response.status_code}") - - # 获取到的 Cookie - print("获取到的 Cookies:") - print(session.cookies.get_dict()) - - # 简单的验证:如果返回的网页里包含了'退出'或用户名的字样,说明登录成功了 - if "logout" in response.text.lower() or "退出" in response.text: - print("\n==> 登录成功! <==") - - # 【进阶】: 登录成功后,你可以直接用这个 session 访问其他页面 - # 比如访问主页获取数据,它会自动带上刚才拿到的 cookie - # home_page = session.get("http://111.198.24.44:88/index.php?module=Home&action=index") - # print(home_page.text[:200]) - - else: - print("\n可能登录失败,请检查用户名密码。") - # 如果失败,打印一部分返回内容看看原因 - print("返回内容预览:", response.text[:500]) - -except Exception as e: - print(f"发生错误: {e}") \ No newline at end of file diff --git a/页面.py b/页面.py new file mode 100644 index 0000000..1779a82 --- /dev/null +++ b/页面.py @@ -0,0 +1,558 @@ +import pandas as pd +import tkinter as tk +from tkinter import ttk, filedialog, messagebox, simpledialog +import os +import numpy as np + + +# ========================================== +# 第一部分:业务逻辑核心 (保持不变) +# ========================================== + +class DataProcessor: + def __init__(self): + # 定义表头配置 + self.columns_general = [ + "合同编号", "签署公司", "外贸合同号", "收款情况", "合同签订日期", + "销售员", "最终用户单位", "最终用户信息联系人、电话、邮箱", "最终用户所在地", + "厂家", "型号/货号", "合同标的", "数量", "单位", "币种", "折扣率", + "合同", "总合同额", "外购", "已收款", "未收款", "收款日期", + "最晚发货期", "付款方式", "发货港", "目的港", "发货日期", + "买方单位", "买方信息联系人、电话、邮箱", "收货人信息" + ] + self.columns_domestic_general = [c if c != "外贸合同号" else "内贸合同号" for c in self.columns_general] + + self.columns_detail = [ + "合同编号", "销售员", "厂家", "合同标的", "货号", "产品描述", "数量", "单位", + "币种", "报价单价", "报价总价", "销售单价", "销售总价", "折扣率", + "外购", "合同币种/美元", "外购转美元", "报价总价美元", "净合同额美元" + ] + + self.columns_other = [ + "合同编号", "签署公司", "内贸合同号", "收款情况", "签订日期", + "销售员", "最终用户单位", "最终用户信息联系人、电话、邮箱", "最终用户所在地", + "买方单位", "买方信息联系人、电话、邮箱", "合同标的", "合同总额", + "已收款", "未收款", "收款日期" + ] + + def safe_float(self, val): + try: + if isinstance(val, str): + val = val.replace(',', '').strip() + if val == '': return 0.0 + return float(val) + except: + return 0.0 + + def normalize_for_compare(self, val): + if pd.isna(val) or val is None: + return "" + s_val = str(val).strip() + if s_val.lower() == 'nan': + return "" + try: + f_val = float(s_val) + if f_val.is_integer(): + return str(int(f_val)) + return str(f_val) + except: + return s_val + + def load_csv(self, file_path): + df = None + encodings = ['utf-8', 'gbk', 'gb18030'] + for enc in encodings: + try: + df = pd.read_csv(file_path, encoding=enc) + break + except UnicodeDecodeError: + continue + if df is None: + try: + df = pd.read_csv(file_path, encoding='gb18030', encoding_errors='replace') + except: + return None, "无法读取文件,请检查编码。" + + col_factory_general = '厂家' + col_factory_detail = '厂家.1' if '厂家.1' in df.columns else '厂家' + df[col_factory_general] = df[col_factory_general].fillna('').astype(str) + df['合同类型'] = df['合同类型'].fillna('').astype(str) + return df, (col_factory_general, col_factory_detail) + + def process_row_general(self, row, trade_type, col_factory): + target_cols = self.columns_general if trade_type == '外贸' else self.columns_domestic_general + new_row = {col: "" for col in target_cols} + order_no_raw = str(row.get('合同订单编号', '')) + parts_no = order_no_raw.split(' ') + new_row['合同编号'] = parts_no[0] if len(parts_no) > 0 else order_no_raw + contract_no_col = '外贸合同号' if trade_type == '外贸' else '内贸合同号' + new_row[contract_no_col] = parts_no[1] if len(parts_no) > 1 else "" + target_raw = str(row.get('合同标的(品名/型号/数量/单价/总价)', '')) + parts_target = target_raw.split('/') + if len(parts_target) >= 1: new_row['合同标的'] = parts_target[0] + if len(parts_target) >= 2: new_row['型号/货号'] = parts_target[1] + if len(parts_target) >= 3: new_row['数量'] = parts_target[2] + if len(parts_target) >= 4: new_row['合同'] = parts_target[3] + new_row['总合同额'] = row.get('合同总额', '') + new_row['签署公司'] = row.get('收款账户', '') + new_row['收款情况'] = row.get('收款状态', '') + new_row['合同签订日期'] = row.get('签约日期', '') + new_row['销售员'] = row.get('负责人', '') + new_row['最终用户单位'] = row.get('客户名称', '') + new_row['最终用户信息联系人、电话、邮箱'] = row.get('联系人姓名', '') + new_row['厂家'] = row.get(col_factory, '') + new_row['币种'] = row.get('货币(选完产品再改)', '') + new_row['外购'] = row.get('外购产品金额', '') + new_row['收款日期'] = row.get('最新收款日期', '') + new_row['最晚发货期'] = row.get('最晚发货期', '') + new_row['付款方式'] = row.get('付款比例及期限', '') + new_row['发货港'] = row.get('发货地', '') + new_row['目的港'] = row.get('目的港', '') + new_row['买方单位'] = row.get('合同买方(名称/联系人/电话/邮箱)', '') + return pd.Series(new_row) + + def process_row_detail(self, row, col_factory): + new_row = {col: "" for col in self.columns_detail} + detail_manuf_val = str(row.get(col_factory, '')) + order_no_raw = str(row.get('合同订单编号', '')) + new_row['合同编号'] = order_no_raw.split(' ')[0] if order_no_raw else "" + new_row['销售员'] = row.get('负责人', '') + new_row['厂家'] = detail_manuf_val + new_row['货号'] = row.get('产品编码', '') + new_row['数量'] = row.get('数量', '') + new_row['币种'] = row.get('原币种', '') + new_row['单位'] = "" + new_row['折扣率'] = "" + target_raw = str(row.get('合同标的(品名/型号/数量/单价/总价)', '')) + parts_target = target_raw.split('/') + new_row['合同标的'] = parts_target[0] if len(parts_target) >= 1 else "" + val_product_subtotal = self.safe_float(row.get('产品小计', 0)) + + if '外购' in detail_manuf_val: + new_row['外购'] = val_product_subtotal + new_row['产品描述'] = row.get('备注', '') + new_row['报价单价'] = "" + new_row['报价总价'] = "" + new_row['销售单价'] = "" + new_row['销售总价'] = "" + else: + new_row['外购'] = "" + new_row['产品描述'] = row.get('产品名称', '') + new_row['报价单价'] = row.get('美元报价', '') + new_row['报价总价'] = row.get('产品小计', '') + new_row['销售单价'] = "" + new_row['销售总价'] = "" + + new_row['合同币种/美元'] = row.get('汇率', '') + new_row['外购转美元'] = "" + new_row['报价总价美元'] = "" + new_row['净合同额美元'] = "" + return pd.Series(new_row) + + def process_row_other(self, row): + new_row = {col: "" for col in self.columns_other} + order_no_raw = str(row.get('合同订单编号', '')) + parts_no = order_no_raw.split(' ') + new_row['合同编号'] = parts_no[0] if len(parts_no) > 0 else order_no_raw + new_row['内贸合同号'] = parts_no[1] if len(parts_no) > 1 else "" + target_raw = str(row.get('合同标的(品名/型号/数量/单价/总价)', '')) + parts_target = target_raw.split('/') + if len(parts_target) >= 1: new_row['合同标的'] = parts_target[0] + new_row['合同总额'] = row.get('合同总额', '') + new_row['签署公司'] = row.get('收款账户', '') + new_row['收款情况'] = row.get('收款状态', '') + new_row['签订日期'] = row.get('签约日期', '') + new_row['销售员'] = row.get('负责人', '') + new_row['最终用户单位'] = row.get('客户名称', '') + new_row['最终用户信息联系人、电话、邮箱'] = row.get('联系人姓名', '') + new_row['买方单位'] = row.get('合同买方(名称/联系人/电话/邮箱)', '') + new_row['收款日期'] = row.get('最新收款日期', '') + return pd.Series(new_row) + + def merge_datasets(self, old_dfs, csv_df, is_asd): + col_gen = '厂家' + col_det = '厂家.1' if '厂家.1' in csv_df.columns else '厂家' + if is_asd: + df_subset = csv_df[csv_df[col_gen].str.contains('ASD', case=False, na=False)] + else: + df_subset = csv_df[~csv_df[col_gen].str.contains('ASD', case=False, na=False)] + + csv_foreign = df_subset[df_subset['合同类型'] == '外贸'].copy() + csv_domestic = df_subset[df_subset['合同类型'] == '内贸'].copy() + csv_other = df_subset[~df_subset['合同类型'].isin(['外贸', '内贸'])].copy() + result_dfs = {} + + def is_row_different(old_row, new_row, columns): + for col in columns: + if col == '_status': continue + v1 = old_row.get(col) + v2 = new_row.get(col) + if self.normalize_for_compare(v1) != self.normalize_for_compare(v2): + return True + return False + + def merge_logic(old_df, new_rows_df, unique_col, sheet_type='general'): + if old_df is None or old_df.empty: + if new_rows_df.empty: return pd.DataFrame() + combined = new_rows_df.copy() + combined['_status'] = 'new' + return combined + + combined = old_df.copy() + # 明细表填充逻辑修复 + if sheet_type == 'detail' and '合同标的' in combined.columns and '合同编号' in combined.columns: + combined['合同标的'] = combined['合同标的'].replace(r'^\s*$', np.nan, regex=True) + combined['合同标的'] = combined.groupby('合同编号')['合同标的'].ffill() + combined['合同标的'] = combined['合同标的'].fillna("") + + if '_status' not in combined.columns: + combined['_status'] = '' + + if new_rows_df.empty: + return combined + + new_contract_ids = new_rows_df[unique_col].unique() + for cid in new_contract_ids: + new_subset = new_rows_df[new_rows_df[unique_col] == cid].copy() + old_indices = combined[combined[unique_col] == cid].index + + if len(old_indices) > 0: + old_subset = combined.loc[old_indices] + has_changed = False + if len(old_subset) != len(new_subset): + has_changed = True + else: + old_comp = old_subset.reset_index(drop=True) + new_comp = new_subset.reset_index(drop=True) + cols = [c for c in new_subset.columns if c != '_status'] + for i in range(len(old_comp)): + if is_row_different(old_comp.iloc[i], new_comp.iloc[i], cols): + has_changed = True + break + if has_changed: + combined.drop(old_indices, inplace=True) + new_subset['_status'] = 'modified' + combined = pd.concat([combined, new_subset], ignore_index=True) + else: + combined.drop(old_indices, inplace=True) + new_subset['_status'] = '' + combined = pd.concat([combined, new_subset], ignore_index=True) + else: + new_subset['_status'] = 'new' + combined = pd.concat([combined, new_subset], ignore_index=True) + return combined + + if not csv_foreign.empty: + new_gen = csv_foreign.apply(lambda r: self.process_row_general(r, '外贸', col_gen), axis=1) + new_gen = new_gen.drop_duplicates(subset=['合同编号'], keep='first') + else: + new_gen = pd.DataFrame(columns=self.columns_general) + old_gen = old_dfs.get('外贸总表', pd.DataFrame(columns=self.columns_general)) + result_dfs['外贸总表'] = merge_logic(old_gen, new_gen, '合同编号', 'general') + + if not csv_foreign.empty: + new_det = csv_foreign.apply(lambda r: self.process_row_detail(r, col_det), axis=1) + else: + new_det = pd.DataFrame(columns=self.columns_detail) + old_det = old_dfs.get('外贸明细', pd.DataFrame(columns=self.columns_detail)) + result_dfs['外贸明细'] = merge_logic(old_det, new_det, '合同编号', 'detail') + + if not csv_domestic.empty: + new_dom_gen = csv_domestic.apply(lambda r: self.process_row_general(r, '内贸', col_gen), axis=1) + new_dom_gen = new_dom_gen.drop_duplicates(subset=['合同编号'], keep='first') + else: + new_dom_gen = pd.DataFrame(columns=self.columns_domestic_general) + old_dom_gen = old_dfs.get('内贸总表', pd.DataFrame(columns=self.columns_domestic_general)) + result_dfs['内贸总表'] = merge_logic(old_dom_gen, new_dom_gen, '合同编号', 'general') + + if not csv_domestic.empty: + new_dom_det = csv_domestic.apply(lambda r: self.process_row_detail(r, col_det), axis=1) + else: + new_dom_det = pd.DataFrame(columns=self.columns_detail) + old_dom_det = old_dfs.get('内贸明细', pd.DataFrame(columns=self.columns_detail)) + result_dfs['内贸明细'] = merge_logic(old_dom_det, new_dom_det, '合同编号', 'detail') + + if not csv_other.empty: + new_other = csv_other.apply(lambda r: self.process_row_other(r), axis=1) + new_other = new_other.drop_duplicates(subset=['合同编号'], keep='first') + else: + new_other = pd.DataFrame(columns=self.columns_other) + old_other = old_dfs.get('其他', pd.DataFrame(columns=self.columns_other)) + result_dfs['其他'] = merge_logic(old_other, new_other, '合同编号', 'general') + + return result_dfs + + +# ========================================== +# 第二部分:GUI 界面 (美化版) +# ========================================== + +class ContractApp: + def __init__(self, root): + self.root = root + self.root.title("合同数据处理系统 V2.0") + self.root.geometry("1300x850") + + # === 样式配置 === + self.style = ttk.Style() + self.style.theme_use('clam') # 使用 clam 主题作为基础,更易定制 + + # 颜色定义 + self.colors = { + 'bg': '#F5F6FA', # 整体背景灰白 + 'primary': '#409EFF', # 主色蓝 + 'success': '#67C23A', # 成功绿 + 'warning': '#E6A23C', # 警告黄 + 'text': '#2C3E50', # 文字深灰 + 'panel': '#FFFFFF' # 面板白 + } + + self.root.configure(bg=self.colors['bg']) + + # 配置字体和通用控件样式 + self.default_font = ("微软雅黑", 10) + self.header_font = ("微软雅黑", 11, "bold") + + self.style.configure("TFrame", background=self.colors['bg']) + self.style.configure("Panel.TFrame", background=self.colors['panel'], relief="flat") + self.style.configure("TLabel", background=self.colors['panel'], foreground=self.colors['text'], + font=self.default_font) + self.style.configure("Header.TLabel", font=("微软雅黑", 16, "bold"), background=self.colors['bg'], + foreground=self.colors['text']) + + # 按钮样式 + self.style.configure("TButton", font=("微软雅黑", 10), borderwidth=0, padding=6) + self.style.map("TButton", background=[('active', '#E0E0E0')]) + + # 主要按钮 (Primary) + self.style.configure("Primary.TButton", background=self.colors['primary'], foreground='white') + self.style.map("Primary.TButton", background=[('active', '#66B1FF')]) + + # 成功按钮 (Success) + self.style.configure("Success.TButton", background=self.colors['success'], foreground='white') + self.style.map("Success.TButton", background=[('active', '#85CE61')]) + + # 表格样式 (Treeview) + self.style.configure("Treeview", + background="white", + foreground="black", + fieldbackground="white", + rowheight=28, # 增加行高 + font=("微软雅黑", 9)) + self.style.configure("Treeview.Heading", + font=("微软雅黑", 10, "bold"), + background="#EBEEF5", + foreground="#606266") + self.style.map("Treeview", background=[('selected', '#409EFF')]) + + # 逻辑处理器 + self.processor = DataProcessor() + self.csv_path = tk.StringVar() + self.asd_path = tk.StringVar() + self.non_asd_path = tk.StringVar() + + self.final_data = {} + self.create_widgets() + + def create_widgets(self): + # --- 顶部标题 --- + header_frame = ttk.Frame(self.root) + header_frame.pack(fill="x", padx=20, pady=(20, 10)) + ttk.Label(header_frame, text="📄 合同数据智能合并与处理工具", style="Header.TLabel").pack(side="left") + + # --- 文件选择区 (卡片式) --- + input_panel = ttk.Frame(self.root, style="Panel.TFrame", padding=20) + input_panel.pack(fill="x", padx=20, pady=5) + + # 标题提示 + ttk.Label(input_panel, text="文件配置 (若未选择旧文件,将自动生成新文件)", font=self.header_font).grid(row=0, + column=0, + columnspan=3, + sticky="w", + pady=(0, + 15)) + + self.create_file_row(input_panel, "📂 导入 CSV 源文件:", self.csv_path, 1) + self.create_file_row(input_panel, "📘 旧 ASD Excel 文件:", self.asd_path, 2) + self.create_file_row(input_panel, "📗 旧 非ASD Excel 文件:", self.non_asd_path, 3) + + # 处理按钮 + btn_frame = ttk.Frame(input_panel, style="Panel.TFrame") + btn_frame.grid(row=4, column=0, columnspan=3, pady=(15, 0), sticky="e") + + ttk.Button(btn_frame, text="▶ 开始处理并预览", style="Primary.TButton", command=self.process_files).pack( + side="right") + + # --- 数据展示区 --- + self.notebook = ttk.Notebook(self.root) + self.notebook.pack(fill="both", expand=True, padx=20, pady=10) + + # --- 底部操作栏 --- + bottom_bar = ttk.Frame(self.root, style="Panel.TFrame", padding=15) + bottom_bar.pack(fill="x", padx=20, pady=(0, 20)) + + # 图例 + legend_frame = ttk.Frame(bottom_bar, style="Panel.TFrame") + legend_frame.pack(side="left") + self.create_legend(legend_frame, "■ 新增数据", "#FFFFCC", "black") + self.create_legend(legend_frame, "■ 有修改/变动", "#ECF5FF", "#409EFF") + self.create_legend(legend_frame, "□ 无变动", "white", "black") + + ttk.Button(bottom_bar, text="💾 保存更改至 Excel", style="Success.TButton", command=self.save_files).pack( + side="right") + + def create_file_row(self, parent, label_text, var, row_idx): + ttk.Label(parent, text=label_text, width=20).grid(row=row_idx, column=0, sticky="w", pady=5) + entry = ttk.Entry(parent, textvariable=var, font=("微软雅黑", 9)) + entry.grid(row=row_idx, column=1, sticky="ew", padx=10, pady=5) + ttk.Button(parent, text="浏览", command=lambda: self.browse_file(var)).grid(row=row_idx, column=2, padx=5) + parent.columnconfigure(1, weight=1) + + def create_legend(self, parent, text, bg_color, fg_color): + lbl = tk.Label(parent, text=text, bg=bg_color, fg=fg_color, font=("微软雅黑", 9), padx=8, pady=3, borderwidth=1, + relief="solid") + lbl.pack(side="left", padx=5) + + def browse_file(self, variable): + f = filedialog.askopenfilename(filetypes=[("Excel/CSV Files", "*.csv;*.xlsx")]) + if f: variable.set(f) + + def process_files(self): + if not self.csv_path.get(): + messagebox.showerror("提示", "请先选择 CSV 源文件!") + return + + csv_df, headers = self.processor.load_csv(self.csv_path.get()) + if csv_df is None: + messagebox.showerror("错误", headers) + return + + self.final_data = {} + + # ASD 处理 + path_asd = self.asd_path.get() + asd_old = pd.read_excel(path_asd, sheet_name=None) if path_asd and os.path.exists(path_asd) else {} + self.final_data['ASD'] = self.processor.merge_datasets(asd_old, csv_df, True) + + # 非ASD 处理 + path_non = self.non_asd_path.get() + non_old = pd.read_excel(path_non, sheet_name=None) if path_non and os.path.exists(path_non) else {} + self.final_data['NonASD'] = self.processor.merge_datasets(non_old, csv_df, False) + + self.refresh_preview() + messagebox.showinfo("完成", "数据处理完成!\n请查看预览,确认无误后点击下方保存。") + + def refresh_preview(self): + for tab in self.notebook.tabs(): + self.notebook.forget(tab) + + for file_type in ['ASD', 'NonASD']: + if file_type not in self.final_data: continue + + data_dict = self.final_data[file_type] + main_frame = ttk.Frame(self.notebook, style="Panel.TFrame") + self.notebook.add(main_frame, text=f" {file_type} 文件预览 ") + + inner_notebook = ttk.Notebook(main_frame) + inner_notebook.pack(fill="both", expand=True, padx=5, pady=5) + + sheet_order = ['外贸总表', '外贸明细', '内贸总表', '内贸明细', '其他'] + for sheet_name in sheet_order: + if sheet_name in data_dict: + df = data_dict[sheet_name] + if not df.empty and '合同编号' in df.columns: + df = df.sort_values(by='合同编号', ascending=True) + if '明细' in sheet_name: + mask = df.duplicated(subset=['合同编号'], keep='first') + df.loc[mask, '合同标的'] = "" + self.create_treeview(inner_notebook, df, sheet_name) + + def create_treeview(self, parent, df, title): + frame = ttk.Frame(parent) + parent.add(frame, text=title) + + # 滚动条容器 + scroll_y = ttk.Scrollbar(frame, orient="vertical") + scroll_x = ttk.Scrollbar(frame, orient="horizontal") + + display_cols = [c for c in df.columns if c != '_status'] + + tree = ttk.Treeview(frame, columns=display_cols, show='headings', + yscrollcommand=scroll_y.set, xscrollcommand=scroll_x.set) + + scroll_y.config(command=tree.yview) + scroll_x.config(command=tree.xview) + + scroll_y.pack(side="right", fill="y") + scroll_x.pack(side="bottom", fill="x") + tree.pack(fill="both", expand=True) + + for col in display_cols: + tree.heading(col, text=col) + tree.column(col, width=130, anchor="center") # 居中对齐 + + # 颜色标签 + tree.tag_configure('new', background='#FFFFCC') # 浅黄 + # 使用淡蓝色标记有修改的行 + tree.tag_configure('modified', background='#ECF5FF', foreground='#409EFF') + + if not df.empty: + df_display = df.fillna("") + for idx, row in df_display.iterrows(): + values = [row[c] for c in display_cols] + status = row.get('_status', '') + tree.insert("", "end", values=values, tags=(status,)) + + tree.bind("", lambda event: self.on_double_click(event, tree, df)) + + def on_double_click(self, event, tree, df): + region = tree.identify("region", event.x, event.y) + if region != "cell": return + column = tree.identify_column(event.x) + row_id = tree.identify_row(event.y) + col_idx = int(column.replace('#', '')) - 1 + col_name = tree['columns'][col_idx] + current_val = tree.item(row_id, "values")[col_idx] + + new_val = simpledialog.askstring("快速编辑", f"修改 [{col_name}]:", initialvalue=current_val, parent=self.root) + if new_val is not None: + current_values = list(tree.item(row_id, "values")) + current_values[col_idx] = new_val + tree.item(row_id, values=current_values) + + def save_files(self): + if not self.final_data: + return + + base_dir = os.path.dirname(self.csv_path.get()) if self.csv_path.get() else "" + + try: + for file_type, sheets in self.final_data.items(): + target_path = "" + if file_type == 'ASD': + target_path = self.asd_path.get() + if not target_path: target_path = os.path.join(base_dir, "ASD_Combined.xlsx") + elif file_type == 'NonASD': + target_path = self.non_asd_path.get() + if not target_path: target_path = os.path.join(base_dir, "NonASD_Combined.xlsx") + + with pd.ExcelWriter(target_path, engine='openpyxl') as writer: + for sheet_name, df in sheets.items(): + save_df = df.drop(columns=['_status'], errors='ignore') + if not save_df.empty and '合同编号' in save_df.columns: + save_df = save_df.sort_values(by='合同编号', ascending=True) + if '明细' in sheet_name: + mask = save_df.duplicated(subset=['合同编号'], keep='first') + save_df.loc[mask, '合同标的'] = "" + save_df.to_excel(writer, sheet_name=sheet_name, index=False) + + messagebox.showinfo("成功", f"文件保存成功!\n位置: {base_dir or '当前目录'}") + + except Exception as e: + messagebox.showerror("保存失败", str(e)) + + +if __name__ == "__main__": + root = tk.Tk() + app = ContractApp(root) + root.mainloop() \ No newline at end of file