Files
WQ_GUI/src/core/modeling/modeling_batch.py

1137 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
import pandas as pd
import joblib
import os
from pathlib import Path
from typing import List, Dict, Union, Tuple, Optional
import warnings
warnings.filterwarnings('ignore')
# 机器学习模型导入 - 改为回归模型
from sklearn.svm import SVR
from sklearn.ensemble import RandomForestRegressor
from sklearn.neighbors import KNeighborsRegressor
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.model_selection import GridSearchCV, cross_val_score, KFold, train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.cross_decomposition import PLSRegression
from sklearn.ensemble import GradientBoostingRegressor, AdaBoostRegressor, ExtraTreesRegressor
from sklearn.tree import DecisionTreeRegressor
from sklearn.neural_network import MLPRegressor
from joblib import parallel_backend
# 第三方模型导入
# try:
# import lightgbm as lgb
# LGB_AVAILABLE = True
# except ImportError:
# LGB_AVAILABLE = False
LGB_AVAILABLE = False # 注释掉lightgbm
# try:
# import catboost as cb
# CB_AVAILABLE = True
# except ImportError:
# CB_AVAILABLE = False
CB_AVAILABLE = False # 注释掉catboost
# 导入预处理模块
# 动态导入预处理模块
import sys
import os
from src.preprocessing.spectral_Preprocessing import Preprocessing
class WaterQualityModelingBatch:
    """Batch modelling class for water-quality parameter retrieval (regression)."""
def __init__(self, artifacts_dir: str = "models/artifacts"):
    """
    Initialise the batch modeller.

    Creates the artifact directory (including parents) if it does not exist
    and builds the registry of supported regressors with their grid-search
    parameter grids.

    Args:
        artifacts_dir: Directory in which trained models and result
            summaries are stored.
    """
    self.artifacts_dir = Path(artifacts_dir)
    self.artifacts_dir.mkdir(parents=True, exist_ok=True)

    # Registry of supported regressors. Each entry holds the estimator class
    # ('model'), the GridSearchCV parameter grid ('params') and a flag telling
    # whether the backing library is usable ('available').
    self.model_configs = {
        'SVR': {
            'model': SVR,
            'params': {
                'C': [0.1, 1, 10, 100],
                'gamma': ['scale', 'auto', 0.001, 0.01, 0.1, 1],
                'kernel': ['rbf', 'poly', 'sigmoid'],
                'epsilon': [0.01, 0.1, 0.2]
            },
            'available': True
        },
        'RF': {
            'model': RandomForestRegressor,
            'params': {
                'n_estimators': [50, 100, 200],
                'max_depth': [None, 10, 20, 30],
                'min_samples_split': [2, 5, 10],
                'min_samples_leaf': [1, 2, 4]
            },
            'available': True
        },
        'KNN': {
            'model': KNeighborsRegressor,
            'params': {
                'n_neighbors': [3, 5, 7, 9, 11],
                'weights': ['uniform', 'distance'],
                'metric': ['euclidean', 'manhattan', 'minkowski']
            },
            'available': True
        },
        'LinearRegression': {
            'model': LinearRegression,
            'params': {
                'fit_intercept': [True, False]
            },
            'available': True
        },
        'Ridge': {
            'model': Ridge,
            'params': {
                'alpha': [0.01, 0.1, 1, 10, 100],
                'fit_intercept': [True, False]
            },
            'available': True
        },
        'Lasso': {
            'model': Lasso,
            'params': {
                'alpha': [0.01, 0.1, 1, 10, 100],
                'fit_intercept': [True, False],
                'max_iter': [1000, 2000]
            },
            'available': True
        },
        'ElasticNet': {
            'model': ElasticNet,
            'params': {
                'alpha': [0.01, 0.1, 1, 10],
                'l1_ratio': [0.1, 0.3, 0.5, 0.7, 0.9],
                'fit_intercept': [True, False],
                'max_iter': [1000, 2000]
            },
            'available': True
        },
        'XGBoost': {
            # xgboost support was removed: the entry stays registered but disabled.
            'model': None,
            'params': {
                'n_estimators': [50, 100, 200],
                'max_depth': [3, 6, 9],
                'learning_rate': [0.01, 0.1, 0.2],
                'subsample': [0.8, 0.9, 1.0]
            },
            'available': False
        },
        'LightGBM': {
            # `lgb` is only evaluated when LGB_AVAILABLE is truthy.
            'model': lgb.LGBMRegressor if LGB_AVAILABLE else None,
            'params': {
                'n_estimators': [50, 100, 200],
                'max_depth': [3, 6, 9],
                'learning_rate': [0.01, 0.1, 0.2],
                'num_leaves': [31, 50, 100]
            },
            'available': LGB_AVAILABLE
        },
        'CatBoost': {
            # `cb` is only evaluated when CB_AVAILABLE is truthy.
            'model': cb.CatBoostRegressor if CB_AVAILABLE else None,
            'params': {
                'iterations': [50, 100, 200],
                'depth': [3, 6, 9],
                'learning_rate': [0.01, 0.1, 0.2],
                'l2_leaf_reg': [1, 3, 5]
            },
            'available': CB_AVAILABLE
        },
        'PLS': {
            'model': PLSRegression,
            'params': {
                'n_components': [2, 3, 5, 7, 10]
            },
            'available': True
        },
        'GradientBoosting': {
            'model': GradientBoostingRegressor,
            'params': {
                'n_estimators': [50, 100, 200],
                'learning_rate': [0.01, 0.1, 0.2],
                'max_depth': [3, 5, 7],
                'subsample': [0.8, 0.9, 1.0],
                'min_samples_split': [2, 5, 10],
                'min_samples_leaf': [1, 2, 4]
            },
            'available': True
        },
        'AdaBoost': {
            'model': AdaBoostRegressor,
            'params': {
                'n_estimators': [50, 100, 200],
                'learning_rate': [0.01, 0.1, 0.2],
                'loss': ['linear', 'square', 'exponential']
            },
            'available': True
        },
        'DecisionTree': {
            'model': DecisionTreeRegressor,
            'params': {
                'max_depth': [None, 5, 10, 20, 30],
                'min_samples_split': [2, 5, 10],
                'min_samples_leaf': [1, 2, 4],
                # None (= all features) replaces the former 'auto', which newer
                # scikit-learn releases reject for tree regressors.
                'max_features': [None, 'sqrt', 'log2']
            },
            'available': True
        },
        'MLP': {
            'model': MLPRegressor,
            'params': {
                'hidden_layer_sizes': [(50,), (100,), (50, 50), (100, 50)],
                'activation': ['relu', 'tanh', 'logistic'],
                'solver': ['adam', 'sgd'],
                'alpha': [0.0001, 0.001, 0.01],
                'learning_rate': ['constant', 'invscaling', 'adaptive'],
                'max_iter': [1000, 2000]
            },
            'available': True
        },
        'ExtraTrees': {
            'model': ExtraTreesRegressor,
            'params': {
                'n_estimators': [50, 100, 200],
                'max_depth': [None, 10, 20, 30],
                'min_samples_split': [2, 5, 10],
                'min_samples_leaf': [1, 2, 4],
                # None (= all features) replaces the former 'auto', which newer
                # scikit-learn releases reject for tree regressors.
                'max_features': [None, 'sqrt', 'log2']
            },
            'available': True
        }
    }

    # Names of the supported preprocessing methods.
    self.preprocessing_methods = [
        "None", "MMS", "SS", "CT", "SNV", "MA", "SG", "MSC", "D1", "D2", "DT", "WVAE"
    ]
    # Names of the supported train/test split strategies.
    self.split_methods = ["random", "spxy", "ks"]
    # Containers for training results and selected models.
    self.results = {}
    self.best_models = {}
def load_data_batch(self, csv_path: str, feature_start_column: Union[int, str]) -> Tuple[pd.DataFrame, Dict[str, pd.Series]]:
    """
    Load a CSV file and treat every column before the feature start as a target.

    All columns are coerced to numeric values; cells that cannot be parsed
    become NaN. Cleaning never drops rows or columns. Target columns that
    contain no valid value at all are skipped.

    Args:
        csv_path: Path of the CSV file.
        feature_start_column: Position (int) or name (str) of the first
            feature column.

    Returns:
        X: Feature data (columns from the feature start to the end).
        y_dict: Target series keyed by column name.

    Raises:
        ValueError: If the file cannot be read, the named column does not
            exist, or the integer position lies outside the table.
    """
    # Read the CSV, mapping blank strings and common null markers to NaN.
    # The original exception is chained so the root cause stays visible.
    try:
        data = pd.read_csv(csv_path, na_values=['', ' ', 'NaN', 'nan', 'NULL', 'null'])
    except pd.errors.EmptyDataError as e:
        raise ValueError(f"CSV文件 '{csv_path}' 为空或不存在") from e
    except Exception as e:
        raise ValueError(f"读取CSV文件 '{csv_path}' 时出错: {e}") from e

    print("数据清理...")
    # Whitespace-only cells become NaN.
    data = data.replace(r'^\s*$', np.nan, regex=True)
    # Coerce every column to numbers; unparsable cells become NaN.
    for col in data.columns:
        try:
            data[col] = pd.to_numeric(data[col], errors='coerce')
        except (TypeError, ValueError) as e:
            # Best effort: keep the column unchanged, but say so.
            print(f"列 '{col}' 无法转换为数值, 保持原样: {e}")

    n_cols = data.shape[1]
    print(f"数据加载完成,总列数: {n_cols}")
    print(f"所有列名: {list(data.columns)}")

    # Resolve the feature start to a positional index.
    if isinstance(feature_start_column, str):
        if feature_start_column not in data.columns:
            raise ValueError(f"指定的特征开始列 '{feature_start_column}' 不存在于数据中")
        feature_start_index = data.columns.get_loc(feature_start_column)
        print(f"特征开始列 '{feature_start_column}' 对应索引: {feature_start_index}")
    else:
        feature_start_index = feature_start_column
        # A position outside the table would silently give an empty feature
        # matrix; negative positions keep their normal slicing meaning.
        if not -n_cols <= feature_start_index < n_cols:
            raise ValueError(
                f"特征开始列索引 {feature_start_index} 超出范围 (总列数: {n_cols})"
            )
        print(f"特征开始列索引: {feature_start_index}")

    # Features: from the start index to the last column.
    X = data.iloc[:, feature_start_index:]

    # Targets: every column before the start index that has at least one value.
    y_dict = {}
    target_columns = data.columns[:feature_start_index]
    print(f"检测到的目标列: {list(target_columns)}")
    for col_name in target_columns:
        y_series = data[col_name]
        if y_series.isna().all():
            print(f" 跳过目标列 '{col_name}': 所有值为空")
            continue
        y_dict[col_name] = y_series
        print(f" 目标列 '{col_name}': {y_series.count()} 个非空值, 范围: {y_series.min():.4f} ~ {y_series.max():.4f}")

    print(f"特征数据形状: {X.shape}")
    print(f"有效目标列数量: {len(y_dict)}")
    return X, y_dict
def load_data_single(self, csv_path: str, target_column_name: str, feature_start_column: Union[int, str]) -> Tuple[pd.DataFrame, pd.Series]:
    """
    Load the features and a single target column from a CSV file.

    Rows whose target value is missing are dropped from both X and y.

    Args:
        csv_path: Path of the CSV file.
        target_column_name: Name of the target column.
        feature_start_column: Position (int) or name (str) of the first
            feature column.

    Returns:
        X: Feature data restricted to rows with a valid target.
        y: Target values without missing entries.

    Raises:
        ValueError: If the target column or the named feature start column
            does not exist.
    """
    data = pd.read_csv(csv_path)

    if target_column_name not in data.columns:
        raise ValueError(f"目标列 '{target_column_name}' 不存在于数据中")

    # Resolve the feature start to a positional index.
    if isinstance(feature_start_column, str):
        if feature_start_column not in data.columns:
            raise ValueError(f"指定的特征开始列 '{feature_start_column}' 不存在于数据中")
        feature_start_index = data.columns.get_loc(feature_start_column)
    else:
        feature_start_index = feature_start_column

    # Drop rows without a target value, then slice by the resolved position.
    # Slicing with the raw argument would fail when a column name is given.
    data_cleaned = data[data[target_column_name].notna()]
    y = data_cleaned[target_column_name]
    X = data_cleaned.iloc[:, feature_start_index:]

    print(f"目标列 '{target_column_name}' 数据加载完成:")
    print(f" 样本数量: {X.shape[0]}")
    print(f" 特征数量: {X.shape[1]}")
    print(f" 目标值范围: {y.min():.4f} ~ {y.max():.4f}")
    print(f" 目标值均值: {y.mean():.4f}")
    return X, y
def preprocess_data(self, X: pd.DataFrame, method: str) -> np.ndarray:
    """
    Apply a preprocessing method to the feature data.

    Falls back to the raw values when the method is "None"/None or when
    the preprocessing call fails for any reason.

    Args:
        X: Raw feature data.
        method: Name of the preprocessing method.

    Returns:
        The preprocessed data, or the raw values on skip or failure.
    """
    print(f"应用预处理方法: {method}")

    # Nothing to do: hand back the untouched values.
    if method is None or method == "None":
        print("跳过预处理,使用原始数据")
        return X.values

    try:
        transformed = Preprocessing(method, X)
        # Always return a plain array, even if a DataFrame came back.
        result = transformed.values if isinstance(transformed, pd.DataFrame) else transformed
        print(f"预处理完成,数据形状: {result.shape}")
        return result
    except Exception as exc:
        # Best effort: report the failure and continue with the raw data.
        print(f"预处理失败: {exc}")
        print("使用原始数据")
        return X.values
def random(self, data, label, test_ratio=0.2, random_state=123):
    """
    Randomly split a data set into training and test parts.

    Args:
        data: Feature data.
        label: Target values.
        test_ratio: Test-set ratio, default 0.2.
        random_state: Random seed, default 123.

    Returns:
        X_train, X_test, y_train, y_test
    """
    split_parts = train_test_split(
        data,
        label,
        random_state=random_state,
        test_size=test_ratio,
    )
    train_x, test_x, train_y, test_y = split_parts
    return train_x, test_x, train_y, test_y
def spxy(self, data, label, test_size=0.2):
    """
    Split a data set with the SPXY algorithm (distances in X and Y space).

    The two samples that are farthest apart start the training set; after
    that the sample whose nearest already-selected neighbour is farthest
    away is added, until the training set is full. Everything else forms
    the test set.

    Each distance matrix is divided by its own maximum, so the label scale
    does not matter. A matrix that is entirely zero (e.g. constant labels)
    contributes nothing instead of producing NaN.

    Args:
        data: Feature data, one row per sample (array or DataFrame).
        label: Target values, one entry per sample (array or Series).
        test_size: Test-set ratio, default 0.2.

    Returns:
        X_train, X_test, y_train, y_test as NumPy arrays.

    Raises:
        ValueError: If the resulting training set would hold fewer than two
            samples or more samples than are available.
    """
    data = np.asarray(data)
    label = np.asarray(label)

    n_samples = data.shape[0]
    n_train = round((1 - test_size) * n_samples)
    if not 2 <= n_train <= n_samples:
        raise ValueError(
            f"SPXY划分需要 2 <= 训练样本数 <= 总样本数, "
            f"当前训练样本数: {n_train}, 总样本数: {n_samples}"
        )

    def _scaled_distances(values):
        """Return the symmetric pairwise Euclidean distances, scaled to a maximum of 1."""
        rows = np.asarray(values, dtype=float).reshape(n_samples, -1)
        dist = np.zeros((n_samples, n_samples))
        for i in range(n_samples - 1):
            # Distances from sample i to all later samples in one call.
            dist[i, i + 1:] = np.linalg.norm(rows[i + 1:] - rows[i], axis=1)
        dist += dist.T
        largest = dist.max()
        return dist / largest if largest > 0 else dist

    combined = _scaled_distances(data) + _scaled_distances(label)

    # Starting pair: the largest entry of the upper triangle (row < column).
    upper = np.triu(combined, k=1)
    second = int(upper.max(axis=0).argmax())
    first = int(upper[:, second].argmax())
    if first == second:
        # All samples coincide; any two distinct samples will do.
        first, second = 0, 1

    order = np.empty(n_train, dtype=int)
    order[0], order[1] = first, second
    chosen = np.zeros(n_samples, dtype=bool)
    chosen[[first, second]] = True

    # nearest[j] = distance from sample j to its closest selected sample.
    nearest = np.minimum(combined[first], combined[second])
    for pos in range(2, n_train):
        # Mask selected samples, then take the max-min candidate; ties go to
        # the lowest index.
        candidate = int(np.where(chosen, -np.inf, nearest).argmax())
        order[pos] = candidate
        chosen[candidate] = True
        np.minimum(nearest, combined[candidate], out=nearest)

    rest = np.flatnonzero(~chosen)
    X_train = data[order, :]
    y_train = label[order]
    X_test = data[rest, :]
    y_test = label[rest]
    return X_train, X_test, y_train, y_test
def ks(self, data, label, test_size=0.2):
    """
    Split a data set with the Kennard-Stone algorithm (distances in X space).

    The two samples that are farthest apart start the training set; after
    that the sample whose nearest already-selected neighbour is farthest
    away is added, until the training set is full. Everything else forms
    the test set.

    Args:
        data: Feature data, one row per sample (array or DataFrame).
        label: Target values, one entry per sample (array or Series).
        test_size: Test-set ratio, default 0.2.

    Returns:
        X_train, X_test, y_train, y_test as NumPy arrays.

    Raises:
        ValueError: If the resulting training set would hold fewer than two
            samples or more samples than are available.
    """
    data = np.asarray(data)
    label = np.asarray(label)

    n_samples = data.shape[0]
    n_train = round((1 - test_size) * n_samples)
    if not 2 <= n_train <= n_samples:
        raise ValueError(
            f"KS划分需要 2 <= 训练样本数 <= 总样本数, "
            f"当前训练样本数: {n_train}, 总样本数: {n_samples}"
        )

    # Pairwise Euclidean distances, filled as an upper triangle first.
    rows = np.asarray(data, dtype=float).reshape(n_samples, -1)
    dist = np.zeros((n_samples, n_samples))
    for i in range(n_samples - 1):
        # Distances from sample i to all later samples in one call.
        dist[i, i + 1:] = np.linalg.norm(rows[i + 1:] - rows[i], axis=1)

    # Starting pair: the largest entry of the upper triangle (row < column).
    second = int(dist.max(axis=0).argmax())
    first = int(dist[:, second].argmax())
    if first == second:
        # All samples coincide; any two distinct samples will do.
        first, second = 0, 1

    # Make the matrix symmetric so a whole row can be read per sample.
    dist += dist.T

    order = np.empty(n_train, dtype=int)
    order[0], order[1] = first, second
    chosen = np.zeros(n_samples, dtype=bool)
    chosen[[first, second]] = True

    # nearest[j] = distance from sample j to its closest selected sample.
    nearest = np.minimum(dist[first], dist[second])
    for pos in range(2, n_train):
        # Mask selected samples, then take the max-min candidate; ties go to
        # the lowest index.
        candidate = int(np.where(chosen, -np.inf, nearest).argmax())
        order[pos] = candidate
        chosen[candidate] = True
        np.minimum(nearest, dist[candidate], out=nearest)

    rest = np.flatnonzero(~chosen)
    X_train = data[order, :]
    y_train = label[order]
    X_test = data[rest, :]
    y_test = label[rest]
    return X_train, X_test, y_train, y_test
def split_data(self, X: np.ndarray, y: pd.Series, method: str = "random",
test_size: float = 0.2, random_state: int = 42) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
根据指定方法划分数据集
Args:
X: 特征数据
y: 目标值数据
method: 划分方法 ("random", "spxy", "ks")
test_size: 测试集比例
random_state: 随机种子仅对random方法有效
Returns:
X_train, X_test, y_train, y_test
"""
print(f"使用 {method} 方法划分数据集")
if method == "random":
return self.random(X, y, test_ratio=test_size, random_state=random_state)
elif method == "spxy":
return self.spxy(X, y, test_size=test_size)
elif method == "ks":
return self.ks(X, y, test_size=test_size)
else:
raise ValueError(f"不支持的划分方法: {method}. 支持的方法: {self.split_methods}")
def train_single_model(self, X: np.ndarray, y: pd.Series, model_name: str,
cv_folds: int = 5, scoring: str = 'neg_mean_squared_error',
test_size: float = 0.2, random_state: int = 42,
split_method: str = "random") -> Dict:
"""
训练单个回归模型
Args:
X: 特征数据
y: 目标值数据
model_name: 模型名称
cv_folds: 交叉验证折数
scoring: 评分指标
test_size: 测试集比例
random_state: 随机种子
split_method: 数据划分方法
Returns:
训练结果字典
"""
if model_name not in self.model_configs:
raise ValueError(f"不支持的模型: {model_name}")
config = self.model_configs[model_name]
if not config['available']:
print(f"模型 {model_name} 不可用,请安装相应的库")
return None
print(f"开始训练模型: {model_name}")
# 使用指定方法分割训练集和测试集
X_train, X_test, y_train, y_test = self.split_data(
X, y, method=split_method, test_size=test_size, random_state=random_state
)
print(f"数据分割完成:")
print(f" 训练集样本数: {X_train.shape[0]}")
print(f" 测试集样本数: {X_test.shape[0]}")
# 创建模型实例
if callable(config['model']):
base_model = config['model']()
else:
base_model = config['model']
# 特殊处理某些模型
if model_name == 'CatBoost':
base_model.set_params(verbose=False)
elif model_name == 'LightGBM':
base_model.set_params(verbose=-1)
# 网格搜索 - 使用KFold代替StratifiedKFold
cv_strategy = KFold(n_splits=cv_folds, shuffle=True, random_state=random_state)
grid_search = GridSearchCV(
base_model,
config['params'],
cv=cv_strategy,
scoring=scoring,
n_jobs=-1,
verbose=1
)
# 在训练集上训练模型
# with parallel_backend("threading", n_jobs=-1):
# grid_search.fit(X_train, y_train)
grid_search.fit(X_train, y_train)
# 获取最佳模型
best_model = grid_search.best_estimator_
# 交叉验证评估(在训练集上)
cv_scores = cross_val_score(best_model, X_train, y_train, cv=cv_strategy, scoring=scoring)
# 计算训练集上的回归指标
y_train_pred = best_model.predict(X_train)
train_mse = mean_squared_error(y_train, y_train_pred)
train_mae = mean_absolute_error(y_train, y_train_pred)
train_r2 = r2_score(y_train, y_train_pred)
train_rmse = np.sqrt(train_mse)
# 计算测试集上的回归指标
y_test_pred = best_model.predict(X_test)
test_mse = mean_squared_error(y_test, y_test_pred)
test_mae = mean_absolute_error(y_test, y_test_pred)
test_r2 = r2_score(y_test, y_test_pred)
test_rmse = np.sqrt(test_mse)
result = {
'model': best_model,
'best_params': grid_search.best_params_,
'best_score': grid_search.best_score_,
'cv_mean': cv_scores.mean(),
'cv_std': cv_scores.std(),
'cv_scores': cv_scores,
# 训练集指标
'train_mse': train_mse,
'train_mae': train_mae,
'train_rmse': train_rmse,
'train_r2': train_r2,
# 测试集指标
'test_mse': test_mse,
'test_mae': test_mae,
'test_rmse': test_rmse,
'test_r2': test_r2,
# 数据分割信息
'train_size': X_train.shape[0],
'test_size': X_test.shape[0],
'split_method': split_method
}
print(f"模型 {model_name} 训练完成:")
print(f" 最佳参数: {result['best_params']}")
print(f" 最佳得分: {result['best_score']:.4f}")
print(f" CV均值: {result['cv_mean']:.4f} ± {result['cv_std']:.4f}")
print(f" 训练集指标:")
print(f" R²: {result['train_r2']:.4f}")
print(f" RMSE: {result['train_rmse']:.4f}")
print(f" MAE: {result['train_mae']:.4f}")
print(f" 测试集指标:")
print(f" R²: {result['test_r2']:.4f}")
print(f" RMSE: {result['test_rmse']:.4f}")
print(f" MAE: {result['test_mae']:.4f}")
return result
def save_model(self, model, target_column_name: str, preprocess_method: str, model_name: str,
               metadata: Dict = None):
    """
    Persist a trained model, using the target column name in the file name.

    Args:
        model: Trained model.
        target_column_name: Name of the target column.
        preprocess_method: Name of the preprocessing method.
        model_name: Name of the model.
        metadata: Optional model metadata; an empty dict is stored when
            nothing (or an empty value) is given.
    """
    # Keep only alphanumerics, '-' and '_' for the file name.
    kept_chars = [ch for ch in target_column_name if ch.isalnum() or ch in ('-', '_')]
    safe_target_name = "".join(kept_chars).rstrip()
    filepath = self.artifacts_dir / f"{safe_target_name}_{preprocess_method}_{model_name}.joblib"

    # Bundle the model with the information describing it.
    payload = {
        'model': model,
        'target_column_name': target_column_name,
        'preprocess_method': preprocess_method,
        'model_name': model_name,
        'metadata': metadata if metadata else {}
    }
    joblib.dump(payload, filepath)
    print(f"模型已保存: {filepath}")
def train_models_batch(self, csv_path: str, feature_start_column: Union[int, str],
preprocessing_methods: Union[str, List[str]] = "None",
model_names: Union[str, List[str]] = "RF",
split_methods: Union[str, List[str]] = "random",
cv_folds: int = 5,
scoring: str = 'neg_mean_squared_error',
test_size: float = 0.2,
random_state: int = 42) -> Dict:
"""
批量训练多个目标列的模型
Args:
csv_path: 数据文件路径
feature_start_column: 特征开始列索引int或列名str
preprocessing_methods: 预处理方法列表
model_names: 模型名称列表
split_methods: 数据划分方法列表
cv_folds: 交叉验证折数
scoring: 评分指标(回归指标)
test_size: 测试集比例
random_state: 随机种子
Returns:
所有模型的训练结果
"""
# 转换为列表
if isinstance(preprocessing_methods, str):
preprocessing_methods = [preprocessing_methods]
if isinstance(model_names, str):
model_names = [model_names]
if isinstance(split_methods, str):
split_methods = [split_methods]
# 加载数据
X_raw, y_dict = self.load_data_batch(csv_path, feature_start_column)
all_results = {}
# 对每个目标列进行训练
for target_column_name, y in y_dict.items():
print(f"\n{'='*80}")
print(f"开始训练目标列: {target_column_name}")
print(f"{'='*80}")
# 创建该目标列的子目录
target_artifacts_dir = self.artifacts_dir / target_column_name
target_artifacts_dir.mkdir(parents=True, exist_ok=True)
# 临时更改artifacts_dir
original_artifacts_dir = self.artifacts_dir
self.artifacts_dir = target_artifacts_dir
try:
# 去除该目标列的空值
mask = ~y.isna()
if mask.sum() == 0:
print(f"目标列 '{target_column_name}' 无有效数据,跳过")
continue
X_clean = X_raw[mask]
y_clean = y[mask]
print(f"有效样本数: {len(y_clean)}")
# 训练该目标列的所有模型组合
target_results = self.train_models_single_target(
X_clean, y_clean, target_column_name,
preprocessing_methods, model_names, split_methods,
cv_folds, scoring, test_size, random_state
)
all_results[target_column_name] = target_results
except Exception as e:
print(f"训练目标列 '{target_column_name}' 时出错: {e}")
continue
finally:
# 恢复原始artifacts_dir
self.artifacts_dir = original_artifacts_dir
# 保存所有结果的汇总
self._save_batch_results_summary(all_results)
return all_results
def train_models_single_target(self, X_raw: pd.DataFrame, y: pd.Series, target_column_name: str,
preprocessing_methods: List[str], model_names: List[str],
split_methods: List[str], cv_folds: int, scoring: str,
test_size: float, random_state: int) -> Dict:
"""
训练单个目标列的所有模型组合
"""
results = {}
# 遍历所有组合
for split_method in split_methods:
for preprocess_method in preprocessing_methods:
for model_name in model_names:
combo_key = f"{split_method}_{preprocess_method}_{model_name}"
print(f"\n{'-' * 60}")
print(f"训练组合: {combo_key}")
print(f"{'-' * 60}")
try:
# 数据预处理
X_processed = self.preprocess_data(X_raw, preprocess_method)
# 训练模型
result = self.train_single_model(X_processed, y, model_name,
cv_folds, scoring, test_size, random_state, split_method)
if result is not None:
# 保存模型
metadata = {
'target_column_name': target_column_name,
'cv_mean': result['cv_mean'],
'cv_std': result['cv_std'],
'best_params': result['best_params'],
'data_shape': X_processed.shape,
'target_range': [float(y.min()), float(y.max())],
'train_r2': result['train_r2'],
'train_rmse': result['train_rmse'],
'train_mae': result['train_mae'],
'test_r2': result['test_r2'],
'test_rmse': result['test_rmse'],
'test_mae': result['test_mae'],
'train_size': result['train_size'],
'test_size': result['test_size'],
'split_method': result['split_method']
}
self.save_model(result['model'], target_column_name,
f"{split_method}_{preprocess_method}",
model_name, metadata)
results[combo_key] = result
except Exception as e:
print(f"训练组合 {combo_key} 失败: {e}")
continue
# 保存该目标列的结果摘要
self._save_single_target_results_summary(target_column_name, results)
return results
def _save_single_target_results_summary(self, target_column_name: str, results: Dict):
"""保存单个目标列的结果摘要"""
if not results:
print(f"目标列 '{target_column_name}' 没有训练结果")
return
summary_data = []
for combo_key, result in results.items():
# 分离划分方法、预处理方法和建模方法
parts = combo_key.split('_', 2)
split_method = parts[0] if len(parts) > 0 else ''
preprocess_method = parts[1] if len(parts) > 1 else ''
model_method = parts[2] if len(parts) > 2 else ''
summary_data.append({
'划分方法': split_method,
'预处理方法': preprocess_method,
'建模方法': model_method,
'CV均值': result['cv_mean'],
'CV标准差': result['cv_std'],
'最佳得分': result['best_score'],
'训练集R²': result['train_r2'],
'训练集RMSE': result['train_rmse'],
'训练集MAE': result['train_mae'],
'训练集MSE': result['train_mse'],
'测试集R²': result['test_r2'],
'测试集RMSE': result['test_rmse'],
'测试集MAE': result['test_mae'],
'测试集MSE': result['test_mse'],
'训练样本数': result['train_size'],
'测试样本数': result['test_size'],
'最佳参数': str(result['best_params'])
})
summary_df = pd.DataFrame(summary_data)
# 按测试集R²降序排列R²越大越好
summary_df = summary_df.sort_values('测试集R²', ascending=False)
# 清理目标列名,移除可能的特殊字符
safe_target_name = "".join(c for c in target_column_name if c.isalnum() or c in ('-', '_')).rstrip()
# 保存详细结果CSV中文版
detailed_path = self.artifacts_dir / f"{safe_target_name}_detailed_results.csv"
summary_df.to_csv(detailed_path, index=False, encoding='utf-8-sig')
# 保存简化版本用于兼容性(英文版)
summary_data_simple = []
for combo_key, result in results.items():
summary_data_simple.append({
'combination': combo_key,
'cv_mean': result['cv_mean'],
'cv_std': result['cv_std'],
'best_score': result['best_score'],
'train_r2': result['train_r2'],
'train_rmse': result['train_rmse'],
'train_mae': result['train_mae'],
'test_r2': result['test_r2'],
'test_rmse': result['test_rmse'],
'test_mae': result['test_mae'],
'train_size': result['train_size'],
'test_size': result['test_size'],
'split_method': result.get('split_method', 'unknown'),
'best_params': str(result['best_params'])
})
summary_df_simple = pd.DataFrame(summary_data_simple)
summary_df_simple = summary_df_simple.sort_values('test_r2', ascending=False)
simple_summary_path = self.artifacts_dir / f"{safe_target_name}_training_summary.csv"
summary_df_simple.to_csv(simple_summary_path, index=False)
print(f"\n{'-' * 60}")
print(f"目标列 '{target_column_name}' 训练结果摘要:")
print(f"{'-' * 60}")
print(summary_df[
['划分方法', '预处理方法', '建模方法', '训练集R²', '测试集R²', '训练集RMSE', '测试集RMSE', 'CV均值']].to_string(
index=False))
print(f"\n详细结果已保存: {detailed_path}")
print(f"简化结果已保存: {simple_summary_path}")
def _save_batch_results_summary(self, all_results: Dict):
"""保存批量训练结果汇总"""
all_summary_data = []
for target_column_name, target_results in all_results.items():
for combo_key, result in target_results.items():
# 分离划分方法、预处理方法和建模方法
parts = combo_key.split('_', 2)
split_method = parts[0] if len(parts) > 0 else ''
preprocess_method = parts[1] if len(parts) > 1 else ''
model_method = parts[2] if len(parts) > 2 else ''
all_summary_data.append({
'目标列': target_column_name,
'划分方法': split_method,
'预处理方法': preprocess_method,
'建模方法': model_method,
'CV均值': result['cv_mean'],
'CV标准差': result['cv_std'],
'最佳得分': result['best_score'],
'训练集R²': result['train_r2'],
'训练集RMSE': result['train_rmse'],
'训练集MAE': result['train_mae'],
'训练集MSE': result['train_mse'],
'测试集R²': result['test_r2'],
'测试集RMSE': result['test_rmse'],
'测试集MAE': result['test_mae'],
'测试集MSE': result['test_mse'],
'训练样本数': result['train_size'],
'测试样本数': result['test_size'],
'最佳参数': str(result['best_params'])
})
if all_summary_data:
summary_df = pd.DataFrame(all_summary_data)
# 按目标列和测试集R²排序
summary_df = summary_df.sort_values(['目标列', '测试集R²'], ascending=[True, False])
# 保存详细结果CSV中文版
detailed_path = self.artifacts_dir / "batch_detailed_results.csv"
summary_df.to_csv(detailed_path, index=False, encoding='utf-8-sig')
# 保持原有的批量训练汇总结果(中文版)
batch_summary_path = self.artifacts_dir / "batch_training_summary.csv"
summary_df.to_csv(batch_summary_path, index=False, encoding='utf-8-sig')
# 创建简化版本用于兼容性(英文版)
all_summary_data_simple = []
for target_column_name, target_results in all_results.items():
for combo_key, result in target_results.items():
all_summary_data_simple.append({
'target_column': target_column_name,
'combination': combo_key,
'cv_mean': result['cv_mean'],
'cv_std': result['cv_std'],
'best_score': result['best_score'],
'train_r2': result['train_r2'],
'train_rmse': result['train_rmse'],
'train_mae': result['train_mae'],
'test_r2': result['test_r2'],
'test_rmse': result['test_rmse'],
'test_mae': result['test_mae'],
'train_size': result['train_size'],
'test_size': result['test_size'],
'split_method': result.get('split_method', 'unknown'),
'best_params': str(result['best_params'])
})
summary_df_simple = pd.DataFrame(all_summary_data_simple)
summary_df_simple = summary_df_simple.sort_values(['target_column', 'test_r2'], ascending=[True, False])
simple_summary_path = self.artifacts_dir / "batch_training_summary_simple.csv"
summary_df_simple.to_csv(simple_summary_path, index=False)
print(f"\n{'='*80}")
print("批量训练结果汇总:")
print(f"{'='*80}")
# 显示每个目标列的最佳模型
for target_col in summary_df['目标列'].unique():
target_data = summary_df[summary_df['目标列'] == target_col]
best_row = target_data.iloc[0] # 已经按R²降序排列
print(f"\n目标列 '{target_col}' 最佳模型:")
print(f" 组合: {best_row['划分方法']}_{best_row['预处理方法']}_{best_row['建模方法']}")
print(f" 测试集R²: {best_row['测试集R²']:.4f}")
print(f" 测试集RMSE: {best_row['测试集RMSE']:.4f}")
print(f" 最佳参数: {best_row['最佳参数']}")
print(f"\n详细结果已保存: {detailed_path}")
print(f"批量训练汇总结果已保存: {batch_summary_path}")
print(f"简化结果已保存: {simple_summary_path}")
def load_model(self, preprocess_method: str, model_name: str):
"""
加载保存的模型
Args:
preprocess_method: 预处理方法名称
model_name: 模型名称
Returns:
加载的模型数据
"""
filename = f"{preprocess_method}_{model_name}.joblib"
filepath = self.artifacts_dir / filename
if not filepath.exists():
raise FileNotFoundError(f"模型文件不存在: {filepath}")
return joblib.load(filepath)
def get_best_model(self, metric: str = 'test_r2') -> Tuple[str, Dict]:
"""
获取最佳模型
Args:
metric: 评估指标默认使用测试集R²
可选:'test_r2', 'train_r2', 'test_rmse', 'test_mae',
'train_rmse', 'train_mae', 'cv_mean', 'best_score'
Returns:
最佳模型的组合名称和结果
"""
if not self.results:
raise ValueError("没有训练结果,请先训练模型")
# 对于回归指标R²和负MSE需要取最大值RMSE和MAE需要取最小值
if metric in ['test_r2', 'train_r2', 'cv_mean', 'best_score']:
best_combo = max(self.results.keys(),
key=lambda k: self.results[k][metric])
else: # rmse, mae等越小越好
best_combo = min(self.results.keys(),
key=lambda k: self.results[k][metric])
return best_combo, self.results[best_combo]
def main():
    """Example entry point: batch-train models for every target column."""
    # Create the batch modeller with its artifact directory.
    model_dir = r"D:\BaiduNetdiskDownload\yaobao\model"
    modeler = WaterQualityModelingBatch(model_dir)

    # Options of the batch run.
    training_options = dict(
        csv_path=r"D:\BaiduNetdiskDownload\yaobao\csv\yangdian_output.csv",
        # The first feature column is given by name.
        feature_start_column="374.285004",
        preprocessing_methods=['None', 'MMS', 'SS', 'SNV', 'MA', 'SG', 'MSC', 'D1', 'D2', 'DT', 'CT'],
        model_names=['SVR', 'RF', 'Ridge', 'Lasso'],
        split_methods=['spxy', 'ks', 'random'],
        cv_folds=5,
    )
    all_results = modeler.train_models_batch(**training_options)
    print(f"\n批量训练完成,共训练了 {len(all_results)} 个目标列的模型")

    # Show the best model (highest test-set R²) of every target column.
    for target_column_name, target_results in all_results.items():
        if not target_results:
            continue
        best_combo = max(target_results, key=lambda combo: target_results[combo]['test_r2'])
        best_result = target_results[best_combo]
        print(f"\n目标列 '{target_column_name}' 最佳模型:")
        print(f" 组合: {best_combo}")
        print(f" 测试集R²: {best_result['test_r2']:.4f}")
        print(f" 测试集RMSE: {best_result['test_rmse']:.4f}")


if __name__ == "__main__":
    main()