Files
Contract-document-crawling-…/双表合并.py

291 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import os
def process_contracts(file_path):
print(f"正在读取并处理文件: {file_path} ...")
# --- 1. 读取CSV文件 (容错处理) ---
df = None
encodings = ['utf-8', 'gbk', 'gb18030']
for enc in encodings:
try:
df = pd.read_csv(file_path, encoding=enc)
break
except UnicodeDecodeError:
continue
if df is None:
try:
print("注意: 标准编码读取失败,正在尝试忽略乱码强制读取...")
df = pd.read_csv(file_path, encoding='gb18030', encoding_errors='replace')
except Exception as e:
print(f"读取文件彻底失败: {e}")
return
# --- 2. 确认厂家列名 ---
col_factory_general = '厂家'
col_factory_detail = '厂家.1'
if col_factory_detail not in df.columns:
print("警告: 未检测到第二个'厂家'列,明细表将被迫使用第一个'厂家'列。")
col_factory_detail = '厂家'
else:
print(f"厂家列识别成功:总表使用 '{col_factory_general}',明细表使用 '{col_factory_detail}'")
# --- 3. 定义表头 ---
# 3.1 外贸/内贸 总表表头
columns_general = [
"合同编号", "签署公司", "外贸合同号", "收款情况", "合同签订日期",
"销售员", "最终用户单位", "最终用户信息联系人、电话、邮箱", "最终用户所在地",
"厂家", "型号/货号", "合同标的", "数量", "单位", "币种", "折扣率",
"合同", "总合同额", "外购", "已收款", "未收款", "收款日期",
"最晚发货期", "付款方式", "发货港", "目的港", "发货日期",
"买方单位", "买方信息联系人、电话、邮箱", "收货人信息"
]
columns_domestic_general = [c if c != "外贸合同号" else "内贸合同号" for c in columns_general]
# 3.2 明细表表头
columns_detail = [
"合同编号", "销售员", "厂家", "合同标的", "货号", "产品描述", "数量", "单位",
"币种", "报价单价", "报价总价", "销售单价", "销售总价", "折扣率",
"外购", "合同币种/美元", "外购转美元", "报价总价美元", "净合同额美元"
]
# 3.3 其他表表头
columns_other = [
"合同编号", "签署公司", "内贸合同号", "收款情况", "签订日期",
"销售员", "最终用户单位", "最终用户信息联系人、电话、邮箱", "最终用户所在地",
"买方单位", "买方信息联系人、电话、邮箱", "合同标的", "合同总额",
"已收款", "未收款", "收款日期"
]
# --- 4. 辅助函数:安全转数字 ---
def safe_float(val):
try:
if isinstance(val, str):
val = val.replace(',', '').strip()
if val == '': return 0.0
return float(val)
except (ValueError, TypeError):
return 0.0
# --- 5. 数据转换逻辑 ---
# 5.1 外贸/内贸 总表转换逻辑
def transform_general_row(row, trade_type):
target_cols = columns_general if trade_type == '外贸' else columns_domestic_general
new_row = {col: "" for col in target_cols}
# 拆分合同号
order_no_raw = str(row.get('合同订单编号', ''))
parts_no = order_no_raw.split(' ')
new_row['合同编号'] = parts_no[0] if len(parts_no) > 0 else order_no_raw
contract_no_col = '外贸合同号' if trade_type == '外贸' else '内贸合同号'
new_row[contract_no_col] = parts_no[1] if len(parts_no) > 1 else ""
# 拆分合同标的 (不再从这里取总价)
target_raw = str(row.get('合同标的(品名/型号/数量/单价/总价)', ''))
parts_target = target_raw.split('/')
if len(parts_target) >= 1: new_row['合同标的'] = parts_target[0]
if len(parts_target) >= 2: new_row['型号/货号'] = parts_target[1]
if len(parts_target) >= 3: new_row['数量'] = parts_target[2]
if len(parts_target) >= 4: new_row['合同'] = parts_target[3] # 单价
# 【修改点】总合同额:直接读取 CSV 中的“合同总额”列
new_row['总合同额'] = row.get('合同总额', '')
# 映射其他字段
new_row['签署公司'] = row.get('收款账户', '')
new_row['收款情况'] = row.get('收款状态', '')
new_row['合同签订日期'] = row.get('签约日期', '')
new_row['销售员'] = row.get('负责人', '')
new_row['最终用户单位'] = row.get('客户名称', '')
new_row['最终用户信息联系人、电话、邮箱'] = row.get('联系人姓名', '')
new_row['厂家'] = row.get(col_factory_general, '')
new_row['币种'] = row.get('货币(选完产品再改)', '')
new_row['外购'] = row.get('外购产品金额', '')
new_row['收款日期'] = row.get('最新收款日期', '')
new_row['最晚发货期'] = row.get('最晚发货期', '')
new_row['付款方式'] = row.get('付款比例及期限', '')
new_row['发货港'] = row.get('发货地', '')
new_row['目的港'] = row.get('目的港', '')
new_row['买方单位'] = row.get('合同买方(名称/联系人/电话/邮箱)', '')
return pd.Series(new_row)
# 5.2 明细表转换逻辑
def transform_detail_row(row):
new_row = {col: "" for col in columns_detail}
detail_manuf_val = str(row.get(col_factory_detail, ''))
order_no_raw = str(row.get('合同订单编号', ''))
new_row['合同编号'] = order_no_raw.split(' ')[0] if order_no_raw else ""
new_row['销售员'] = row.get('负责人', '')
new_row['厂家'] = detail_manuf_val
new_row['货号'] = row.get('产品编码', '')
new_row['数量'] = row.get('数量', '')
new_row['单位'] = ""
new_row['币种'] = row.get('原币种', '')
new_row['折扣率'] = ""
target_raw = str(row.get('合同标的(品名/型号/数量/单价/总价)', ''))
parts_target = target_raw.split('/')
new_row['合同标的'] = parts_target[0] if len(parts_target) >= 1 else ""
val_outsourcing_raw = safe_float(row.get('外购产品金额', 0))
val_rate = safe_float(row.get('汇率', 1))
if val_rate == 0: val_rate = 1
raw_price_unit = row.get('美元报价', '')
raw_price_total = row.get('产品小计', '')
if '外购' in detail_manuf_val:
new_row['外购'] = val_outsourcing_raw
new_row['产品描述'] = row.get('备注', '')
new_row['报价单价'] = ""
new_row['报价总价'] = ""
new_row['销售单价'] = ""
new_row['销售总价'] = ""
current_outsourcing_cost = val_outsourcing_raw
else:
new_row['外购'] = ""
new_row['产品描述'] = row.get('产品名称', '')
new_row['报价单价'] = raw_price_unit
new_row['报价总价'] = raw_price_total
new_row['销售单价'] = ""
new_row['销售总价'] = ""
current_outsourcing_cost = 0
new_row['合同币种/美元'] = ""
if current_outsourcing_cost > 0:
new_row['外购转美元'] = round(current_outsourcing_cost / val_rate, 2)
else:
new_row['外购转美元'] = ""
new_row['报价总价美元'] = ""
new_row['净合同额美元'] = ""
return pd.Series(new_row)
# 5.3 其他表转换逻辑
def transform_other_row(row):
new_row = {col: "" for col in columns_other}
# 拆分合同号
order_no_raw = str(row.get('合同订单编号', ''))
parts_no = order_no_raw.split(' ')
new_row['合同编号'] = parts_no[0] if len(parts_no) > 0 else order_no_raw
new_row['内贸合同号'] = parts_no[1] if len(parts_no) > 1 else ""
# 合同标的 (取第一部分)
target_raw = str(row.get('合同标的(品名/型号/数量/单价/总价)', ''))
parts_target = target_raw.split('/')
if len(parts_target) >= 1:
new_row['合同标的'] = parts_target[0]
# 【修改点】合同总额直接读取源CSV的“合同总额”列
new_row['合同总额'] = row.get('合同总额', '')
# 映射其他字段
new_row['签署公司'] = row.get('收款账户', '')
new_row['收款情况'] = row.get('收款状态', '')
new_row['签订日期'] = row.get('签约日期', '')
new_row['销售员'] = row.get('负责人', '')
new_row['最终用户单位'] = row.get('客户名称', '')
new_row['最终用户信息联系人、电话、邮箱'] = row.get('联系人姓名', '')
new_row['买方单位'] = row.get('合同买方(名称/联系人/电话/邮箱)', '')
new_row['收款日期'] = row.get('最新收款日期', '')
return pd.Series(new_row)
# --- 6. 主处理流程 ---
df[col_factory_general] = df[col_factory_general].fillna('').astype(str)
df['合同类型'] = df['合同类型'].fillna('').astype(str)
# 文件拆分逻辑
df_asd = df[df[col_factory_general].str.contains('ASD', case=False, na=False)]
df_non_asd = df[~df[col_factory_general].str.contains('ASD', case=False, na=False)]
def create_excel(dataframe, filename):
raw_foreign = dataframe[dataframe['合同类型'] == '外贸'].copy()
raw_domestic = dataframe[dataframe['合同类型'] == '内贸'].copy()
raw_other = dataframe[~dataframe['合同类型'].isin(['外贸', '内贸'])].copy()
# === 1. 生成外贸数据 ===
if not raw_foreign.empty:
df_gen = raw_foreign.apply(lambda row: transform_general_row(row, '外贸'), axis=1)
df_gen = df_gen[columns_general]
df_gen_unique = df_gen.drop_duplicates(subset=['合同编号'], keep='first')
df_gen_unique = df_gen_unique.sort_values(by='合同编号', ascending=True)
df_det = raw_foreign.apply(lambda row: transform_detail_row(row), axis=1)
df_det = df_det[columns_detail]
df_det = df_det.sort_values(by='合同编号', ascending=True)
mask_duplicates = df_det.duplicated(subset=['合同编号'], keep='first')
df_det.loc[mask_duplicates, '合同标的'] = ""
else:
df_gen_unique = pd.DataFrame(columns=columns_general)
df_det = pd.DataFrame(columns=columns_detail)
# === 2. 生成内贸数据 ===
if not raw_domestic.empty:
df_dom_gen = raw_domestic.apply(lambda row: transform_general_row(row, '内贸'), axis=1)
df_dom_gen = df_dom_gen[columns_domestic_general]
df_dom_gen_unique = df_dom_gen.drop_duplicates(subset=['合同编号'], keep='first')
df_dom_gen_unique = df_dom_gen_unique.sort_values(by='合同编号', ascending=True)
df_dom_det = raw_domestic.apply(lambda row: transform_detail_row(row), axis=1)
df_dom_det = df_dom_det[columns_detail]
df_dom_det = df_dom_det.sort_values(by='合同编号', ascending=True)
mask_duplicates_dom = df_dom_det.duplicated(subset=['合同编号'], keep='first')
df_dom_det.loc[mask_duplicates_dom, '合同标的'] = ""
else:
df_dom_gen_unique = pd.DataFrame(columns=columns_domestic_general)
df_dom_det = pd.DataFrame(columns=columns_detail)
# === 3. 生成其他数据 ===
if not raw_other.empty:
df_other = raw_other.apply(lambda row: transform_other_row(row), axis=1)
df_other = df_other[columns_other]
# 去重
df_other_unique = df_other.drop_duplicates(subset=['合同编号'], keep='first')
# 排序
df_other_unique = df_other_unique.sort_values(by='合同编号', ascending=True)
else:
df_other_unique = pd.DataFrame(columns=columns_other)
# === 4. 写入 Excel ===
try:
print(f"[{filename}] 正在写入Excel...")
with pd.ExcelWriter(filename, engine='openpyxl') as writer:
df_gen_unique.to_excel(writer, sheet_name='外贸总表', index=False)
df_det.to_excel(writer, sheet_name='外贸明细', index=False)
df_dom_gen_unique.to_excel(writer, sheet_name='内贸总表', index=False)
df_dom_det.to_excel(writer, sheet_name='内贸明细', index=False)
df_other_unique.to_excel(writer, sheet_name='其他', index=False)
print(f"成功生成文件: {filename}")
except Exception as e:
print(f"生成 {filename} 时发生错误: {e}")
# 执行生成
print("-" * 40)
create_excel(df_asd, 'ASD.xlsx')
print("-" * 40)
create_excel(df_non_asd, '非ASD.xlsx')
print("-" * 40)
print("全部处理完成!")
# --- 运行入口 ---
if __name__ == "__main__":
csv_file = 'test.csv'
if os.path.exists(csv_file):
process_contracts(csv_file)
else:
print(f"找不到文件: {csv_file},请检查路径。")