"""
|
||
-*- coding: utf-8 -*-
|
||
@Time :2022/04/12 17:10
|
||
@Author : Pengyou FU
|
||
@blogs : https://blog.csdn.net/Echo_Code?spm=1000.2115.3001.5343
|
||
@github : https://github.com/FuSiry/OpenSA
|
||
@WeChat : Fu_siry
|
||
@License:Apache-2.0 license
|
||
|
||
"""
|
||
from imblearn.over_sampling import SMOTE
import pandas as pd
from classification_model.DataLoad.DataLoad import SetSplit, LoadNirtest
from classification_model.Preprocessing.Preprocessing import Preprocessing
from classification_model.WaveSelect.WaveSelcet import SpctrumFeatureSelcet
from classification_model.Classification.ClassicCls import (
    LogisticRegressionModel, SVM as SVM_Classic, PLS_DA, RF,
    XGBoost, LightGBM, CatBoost, AdaBoost, KNN
)
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
import sklearn.svm as svm
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, cross_val_score, train_test_split
from sklearn.metrics import (
    accuracy_score, confusion_matrix, classification_report,
    precision_score, recall_score, f1_score
)
import numpy as np
import joblib
import os
import matplotlib.pyplot as plt


def cross_validate_model(model, X, y, cv=5):
    """
    Run k-fold cross-validation and print the mean accuracy.

    :param model: estimator to evaluate
    :param X: feature matrix
    :param y: label vector
    :param cv: number of folds
    :return: array of per-fold scores
    """
    scores = cross_val_score(model, X, y, cv=cv)
    print(f"Cross-validation accuracy: {scores.mean():.4f} ± {scores.std():.4f}")
    return scores


# Confusion matrix and classification report
def evaluate_model(y_true, y_pred, dataset_name="Test", title="Confusion Matrix", cmap='Blues'):
    """
    Performance evaluation: prints the classification report, computes the
    confusion matrix, and returns a dictionary of metrics (labels are assumed
    to start from 1).

    Parameters:
    y_true -- ground-truth labels
    y_pred -- predicted labels
    dataset_name -- dataset name (e.g. "Train" or "Test")
    title -- plot title
    cmap -- heatmap colormap
    """
    print(f"{dataset_name} Classification Report:")
    print(classification_report(y_true, y_pred))

    # Compute the confusion matrix
    cm = confusion_matrix(y_true, y_pred)

    # Plot a heatmap (optionally uncomment to display; requires `import seaborn as sns`)
    # plt.figure(figsize=(8, 6))
    # ax = sns.heatmap(cm, annot=True, fmt='g', cmap=cmap, cbar=True,
    #                  linewidths=0.5, linecolor='black', square=True,
    #                  annot_kws={"size": 12})
    # ax.set_title(f"{dataset_name} {title}", fontsize=16)
    # ax.set_xlabel('Predicted Label', fontsize=14)
    # ax.set_ylabel('True Label', fontsize=14)
    # plt.tight_layout()
    # plt.show()

    # Return a dictionary of metrics, including the confusion matrix
    return {
        "accuracy": accuracy_score(y_true, y_pred),
        "precision": precision_score(y_true, y_pred, average='weighted'),
        "recall": recall_score(y_true, y_pred, average='weighted'),
        "f1_score": f1_score(y_true, y_pred, average='weighted'),
        "confusion_matrix": cm
    }

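# Usage sketch (illustrative only; `clf`, `X_test` and `y_test` are assumed to
# already exist): the returned dict can be collected and compared across models.
#
#   metrics = evaluate_model(y_test, clf.predict(X_test), dataset_name="Test")
#   print(metrics["accuracy"], metrics["f1_score"])
#   print(metrics["confusion_matrix"])

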
# Spectral qualitative analysis
def SpectralQualitativeAnalysis(data, label, ProcessMethods, ProcessMethods2, FslecetedMethods, SetSplitMethods, use_smote=False):
    # Preprocessing (two stages)
    ProcesedData = Preprocessing(ProcessMethods, data)
    ProcesedData2 = Preprocessing(ProcessMethods2, ProcesedData)

    # Feature (wavelength) selection
    FeatrueData, labels, selected_columns = SpctrumFeatureSelcet(FslecetedMethods, ProcesedData2, label)

    # Train/test split
    X_train, X_test, y_train, y_test = SetSplit(SetSplitMethods, FeatrueData, labels, test_size=0.3, randomseed=42)

    # Oversample the minority classes with SMOTE
    if use_smote:
        smote = SMOTE(random_state=42)
        X_train, y_train = smote.fit_resample(X_train, y_train)
        print("SMOTE applied: Training set size after resampling:", len(y_train))

    # Model training and evaluation are done by the caller
    return X_train, X_test, y_train, y_test

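# Illustrative call (a sketch; the method strings are examples drawn from the
# options mentioned elsewhere in this file, e.g. 'SS' standardisation followed
# by 'SG' Savitzky-Golay smoothing, no wavelength selection, random split):
#
#   X_train, X_test, y_train, y_test = SpectralQualitativeAnalysis(
#       x, y, 'SS', 'SG', 'None', 'random', use_smote=True)

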
def Procesed(data, ProcessMethods1, ProcessMethods2, model_path):
    """
    Preprocess the data for prediction, supporting two preprocessing stages.

    :param data: input data
    :param ProcessMethods1: first preprocessing method (e.g. 'SS', 'MMS', 'None')
    :param ProcessMethods2: second preprocessing method (e.g. 'SG', 'D1', 'None')
    :param model_path: model path (used to locate scaler_params.pkl)
    :return: preprocessed data
    """
    import os
    from classification_model.Preprocessing.Preprocessing import Preprocessing

    # Stage 1 preprocessing
    if ProcessMethods1 == 'SS':
        # When the first method is SS, load the scaler saved at training time
        model_dir = os.path.dirname(model_path)
        scaler_path = os.path.join(model_dir, 'scaler_params.pkl')
        if not os.path.exists(scaler_path):
            raise FileNotFoundError(f"Scaler file not found at {scaler_path}. Please ensure the model was trained with SS preprocessing.")
        loaded_scaler = joblib.load(scaler_path)
        transformed_data = loaded_scaler.transform(data)
        # Convert to a DataFrame for downstream processing
        transformed_data_layout = pd.DataFrame(transformed_data)
    elif ProcessMethods1 == 'None' or ProcessMethods1 is None:
        # If the first method is None, use the raw data directly
        transformed_data_layout = pd.DataFrame(data) if not isinstance(data, pd.DataFrame) else data
    else:
        # Other methods go straight through the Preprocessing function
        transformed_data_layout = Preprocessing(ProcessMethods1, data)
        if isinstance(transformed_data_layout, np.ndarray):
            transformed_data_layout = pd.DataFrame(transformed_data_layout)

    # Stage 2 preprocessing
    if ProcessMethods2 == 'None' or ProcessMethods2 is None:
        ProcesedData2 = transformed_data_layout
    else:
        ProcesedData2 = Preprocessing(ProcessMethods2, transformed_data_layout)
        if isinstance(ProcesedData2, np.ndarray):
            ProcesedData2 = pd.DataFrame(ProcesedData2)

    return ProcesedData2

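# A minimal sketch (an assumption, not part of this module) of how
# scaler_params.pkl is expected to have been produced at training time when
# ProcessMethods1 == 'SS': a StandardScaler fitted on the raw training spectra
# and dumped next to the model file so Procesed() can reuse it at prediction time.
#
#   from sklearn.preprocessing import StandardScaler
#   scaler = StandardScaler().fit(X_train_raw)   # X_train_raw: hypothetical raw training spectra
#   joblib.dump(scaler, os.path.join(model_dir, 'scaler_params.pkl'))

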
def SVM_with_kernels_visualization(X_train, X_test, y_train, y_test, param_grid=None):
    """
    Tune SVM hyperparameters for different kernels with grid search, validate
    the best model on the test set, and draw a 3D grid plot per kernel.

    :param X_train: training features
    :param X_test: test features
    :param y_train: training labels
    :param y_test: test labels
    :param param_grid: hyperparameter grid; if None, a default grid is used
    """
    if param_grid is None:
        # Default parameter grid
        param_grid = {
            'C': [0.1, 1, 10, 100],
            'gamma': [1, 0.1, 0.01, 0.001],
            'kernel': ['linear', 'rbf', 'poly']
        }

    # Initialise the SVM model
    svc = SVC()

    # Grid search
    grid_search = GridSearchCV(estimator=svc, param_grid=param_grid, cv=5, scoring='accuracy', verbose=1, n_jobs=-1)
    grid_search.fit(X_train, y_train)

    # Report the best parameters and their cross-validation score
    print("Best Parameters:", grid_search.best_params_)
    print("Best Cross-Validation Score:", grid_search.best_score_)

    # Validate the best model on the test set
    best_model = grid_search.best_estimator_
    y_test_pred = best_model.predict(X_test)

    print("\nTest Set Evaluation:")
    print(classification_report(y_test, y_test_pred))
    print("Confusion Matrix:\n", confusion_matrix(y_test, y_test_pred))
    print(f"Test Accuracy: {accuracy_score(y_test, y_test_pred):.4f}")

    # Collect the search results
    results = grid_search.cv_results_

    # Draw one grid plot per kernel type
    kernels = np.unique(param_grid['kernel'])
    for kernel in kernels:
        kernel_indices = [i for i, params in enumerate(results['params']) if params['kernel'] == kernel]
        C_values = [results['params'][i]['C'] for i in kernel_indices]
        gamma_values = [results['params'][i]['gamma'] for i in kernel_indices if 'gamma' in results['params'][i]]
        scores = results['mean_test_score'][kernel_indices]

        # The linear kernel has no gamma parameter to plot
        if kernel == 'linear':
            plot_linear_kernel(C_values, scores, kernel)
        else:
            plot_3D_grid(C_values, gamma_values, scores, kernel)

    return best_model

def plot_3D_grid(C_values, gamma_values, scores, kernel):
    """
    Draw a 3D hyperparameter surface (for the RBF and polynomial kernels)
    with a colour gradient.

    :param C_values: list of C values
    :param gamma_values: list of gamma values
    :param scores: corresponding cross-validation scores
    :param kernel: kernel name
    """
    # Arrange the data on a grid
    C_unique = np.unique(C_values)
    gamma_unique = np.unique(gamma_values)
    C_grid, gamma_grid = np.meshgrid(C_unique, gamma_unique)

    # Build the Z axis (cross-validation scores); force float dtype so scores are not truncated
    Z = np.zeros_like(C_grid, dtype=float)
    for i, c in enumerate(C_unique):
        for j, gamma in enumerate(gamma_unique):
            indices = [k for k, val in enumerate(C_values) if val == c and gamma_values[k] == gamma]
            if indices:
                Z[j, i] = scores[indices[0]]

    # Put C and gamma on a log scale
    log_C_grid = np.log10(C_grid)
    log_gamma_grid = np.log10(gamma_grid)

    # Draw the 3D surface with a colour gradient
    fig = plt.figure(figsize=(12, 8))
    ax = fig.add_subplot(111, projection='3d')
    surface = ax.plot_surface(
        log_C_grid, log_gamma_grid, Z, cmap='viridis', edgecolor='k', alpha=0.8
    )

    # Add a colour bar
    cbar = fig.colorbar(surface, pad=0.1, shrink=0.5, aspect=10)
    cbar.set_label('Mean Accuracy', fontsize=12)

    # Axis labels and title
    ax.set_title(f'3D Hyperparameter Grid ({kernel} kernel)', fontsize=16)
    ax.set_xlabel('Log10(C)', fontsize=12)
    ax.set_ylabel('Log10(Gamma)', fontsize=12)
    ax.set_zlabel('Mean Accuracy', fontsize=12)

    # Show the figure
    plt.show()

def plot_linear_kernel(C_values, scores, kernel):
    """
    Plot the hyperparameter curve for the linear kernel (C only).

    :param C_values: list of C values
    :param scores: corresponding cross-validation scores
    :param kernel: kernel name
    """
    # Put C on a log scale
    C_values = np.log10(C_values)

    # Create a 2D line plot
    plt.figure(figsize=(8, 6))
    plt.plot(C_values, scores, marker='o', label='Mean Accuracy')
    plt.xlabel('Log10(C)', fontsize=12)
    plt.ylabel('Mean Accuracy', fontsize=12)
    plt.title(f'Hyperparameter Tuning ({kernel} kernel)', fontsize=16)
    plt.grid(True)
    plt.legend()
    plt.show()

# Classify each superpixel and fill the result into the label array
def classify_and_fill(segments, superpixel_features, model, label_array):
    """
    Predict a class for every superpixel and write it into the label array.

    :param segments: superpixel segmentation map (same spatial shape as label_array)
    :param superpixel_features: dict mapping segment id -> mean spectral feature vector
    :param model: trained classifier
    :param label_array: array to be filled with the predicted classes
    :return: filled label array
    """
    for segment, feature in superpixel_features.items():
        # Feed the mean hyperspectral feature of the superpixel to the model
        label = model.predict([feature])[0]
        # Fill the corresponding positions of the label array
        label_array[segments == segment] = label
    return label_array

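# A minimal sketch (illustrative assumption) of how `segments` and
# `superpixel_features` could be built for classify_and_fill(): `cube` is a
# hypothetical hyperspectral image of shape (H, W, bands) and `segments` a
# precomputed (H, W) superpixel map; each superpixel is summarised by its mean spectrum.
#
#   superpixel_features = {
#       seg_id: cube[segments == seg_id].mean(axis=0)
#       for seg_id in np.unique(segments)
#   }
#   label_array = np.zeros(segments.shape, dtype=int)
#   label_array = classify_and_fill(segments, superpixel_features, model, label_array)

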
def save_model(model, model_path, model_type='SVM'):
    """
    Save a model to the given path.

    :param model: trained model object
    :param model_path: path to save the model to
    :param model_type: model type (used in the log message)
    :return: the full save path
    """
    os.makedirs(os.path.dirname(model_path), exist_ok=True)
    joblib.dump(model, model_path)
    print(f"{model_type} model saved to: {model_path}")
    return model_path


def load_model(model_path):
    """
    Load a model (works for all model types).

    :param model_path: model path
    :return: the loaded model
    """
    return joblib.load(model_path)

def predict_and_save(df, model_path, model_type='SVM', ProcessMethods1='SS', ProcessMethods2='SG'):
    """
    Predict with a saved model (works for all model types).

    :param df: dataframe containing the reflectance and shape features
    :param model_path: path to the model
    :param model_type: model type (optional, for special handling)
    :param ProcessMethods1: first preprocessing method, default 'SS' (when 'SS', scaler_params.pkl is loaded)
    :param ProcessMethods2: second preprocessing method, default 'SG'
    :return: dataframe with an added prediction column
    """
    model = load_model(model_path)

    # Find the index of the contour column, if present
    contour_col_idx = None
    if 'contour' in df.columns:
        contour_col_idx = df.columns.get_loc('contour')

    # Select all numeric columns (excluding the contour column)
    numeric_cols = []
    for i in range(1, df.shape[1]):  # skip the first column (likely a class or ID column)
        if i != contour_col_idx:
            col_name = df.columns[i]
            # Keep only numeric columns
            if df[col_name].dtype in ['int64', 'float64']:
                numeric_cols.append(col_name)

    # Load the feature data
    x = df[numeric_cols]

    # Preprocess (two preprocessing stages supported)
    Procesed_features = Procesed(x, ProcessMethods1, ProcessMethods2, model_path)

    # Make sure Procesed_features is a numpy array before prediction
    if isinstance(Procesed_features, pd.DataFrame):
        Procesed_features = Procesed_features.values

    # Predict
    predictions = model.predict(Procesed_features)
    df['Predictions'] = predictions

    return df

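# Usage sketch (paths and column layout are assumptions, not part of this module):
# load a feature table whose first column is an ID/class column, run a saved
# model on it with the same preprocessing as at training time, and inspect the
# added 'Predictions' column.
#
#   df = pd.read_csv("features.csv")                      # hypothetical input file
#   df = predict_and_save(df, r"D:\models\svm.m", model_type='SVM',
#                         ProcessMethods1='SS', ProcessMethods2='SG')
#   print(df['Predictions'].value_counts())

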
def SVM(X_train, X_test, y_train, y_test, kernel='linear', C=1, gamma=1e-3):
    clf = svm.SVC(C=C, kernel=kernel, gamma=gamma)

    # Cross-validation
    cross_validate_model(clf, X_train, y_train)

    # Fit the model
    clf.fit(X_train, y_train.ravel())

    # Evaluate on the training set
    y_train_pred = clf.predict(X_train)
    train_metrics = evaluate_model(y_train, y_train_pred, dataset_name="Train")

    # Evaluate on the test set
    y_test_pred = clf.predict(X_test)
    test_metrics = evaluate_model(y_test, y_test_pred, dataset_name="Test")

    return clf


# ==================== Training functions for all models (each returns the model object) ====================

def train_LogisticRegression(X_train, X_test, y_train, y_test, penalty='l2', C=1.0, solver='lbfgs', max_iter=200):
    """Train a logistic regression model and return the model object"""
    from sklearn.linear_model import LogisticRegression
    model = LogisticRegression(penalty=penalty, C=C, solver=solver, max_iter=max_iter, multi_class='multinomial', random_state=1)

    cross_validate_model(model, X_train, y_train)
    model.fit(X_train, y_train.ravel())

    y_train_pred = model.predict(X_train)
    train_metrics = evaluate_model(y_train, y_train_pred, dataset_name="Train")

    y_test_pred = model.predict(X_test)
    test_metrics = evaluate_model(y_test, y_test_pred, dataset_name="Test")

    return model

def train_PLS_DA(X_train, X_test, y_train, y_test, n_components=40):
    """Train a PLS-DA model and return the model object"""
    from sklearn.cross_decomposition import PLSRegression
    y_train_encoded = pd.get_dummies(y_train)
    model = PLSRegression(n_components=n_components)

    model.fit(X_train, y_train_encoded)

    # Decode the one-hot predictions back to the original class labels so they
    # can be compared directly with y_train / y_test
    class_labels = y_train_encoded.columns.to_numpy()

    y_train_pred = class_labels[np.argmax(model.predict(X_train), axis=1)]
    train_metrics = evaluate_model(y_train, y_train_pred, dataset_name="Train")

    y_test_pred = class_labels[np.argmax(model.predict(X_test), axis=1)]
    test_metrics = evaluate_model(y_test, y_test_pred, dataset_name="Test")

    return model

def train_RF(X_train, X_test, y_train, y_test, n_estimators=200, max_depth=15, n_jobs=-1):
    """Train a random forest model and return the model object"""
    from sklearn.ensemble import RandomForestClassifier
    model = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth, random_state=1, n_jobs=n_jobs)

    # cross_validate_model() does not take an n_jobs argument; parallelism is set on the estimator above
    cross_validate_model(model, X_train, y_train)
    model.fit(X_train, y_train.ravel())

    y_train_pred = model.predict(X_train)
    train_metrics = evaluate_model(y_train, y_train_pred, dataset_name="Train")

    y_test_pred = model.predict(X_test)
    test_metrics = evaluate_model(y_test, y_test_pred, dataset_name="Test")

    return model

def train_XGBoost(X_train, X_test, y_train, y_test, n_estimators=100, learning_rate=0.1, max_depth=3):
    """Train an XGBoost model and return the model object"""
    import xgboost as xgb
    model = xgb.XGBClassifier(
        n_estimators=n_estimators,
        learning_rate=learning_rate,
        max_depth=max_depth,
        random_state=1,
        gpu_id=0  # accepted by older xgboost versions; xgboost >= 2.0 uses the `device` parameter instead
    )

    model.fit(X_train, y_train)

    y_train_pred = model.predict(X_train)
    train_metrics = evaluate_model(y_train, y_train_pred, dataset_name="Train")

    y_test_pred = model.predict(X_test)
    test_metrics = evaluate_model(y_test, y_test_pred, dataset_name="Test")

    return model

def train_LightGBM(X_train, X_test, y_train, y_test, n_estimators=100, learning_rate=0.1, max_depth=-1, num_leaves=31):
    """Train a LightGBM model and return the model object"""
    import lightgbm as lgb
    model = lgb.LGBMClassifier(
        n_estimators=n_estimators,
        learning_rate=learning_rate,
        max_depth=max_depth,
        num_leaves=num_leaves,
        random_state=1
    )

    model.fit(X_train, y_train)

    y_train_pred = model.predict(X_train)
    train_metrics = evaluate_model(y_train, y_train_pred, dataset_name="Train")

    y_test_pred = model.predict(X_test)
    test_metrics = evaluate_model(y_test, y_test_pred, dataset_name="Test")

    return model

def train_CatBoost(X_train, X_test, y_train, y_test, iterations=500, learning_rate=0.1, depth=6):
    """Train a CatBoost model and return the model object"""
    import catboost as cb
    model = cb.CatBoostClassifier(
        iterations=iterations,
        learning_rate=learning_rate,
        depth=depth,
        random_seed=1,
        verbose=0
    )

    model.fit(X_train, y_train)

    y_train_pred = model.predict(X_train)
    train_metrics = evaluate_model(y_train, y_train_pred, dataset_name="Train")

    y_test_pred = model.predict(X_test)
    test_metrics = evaluate_model(y_test, y_test_pred, dataset_name="Test")

    return model

def train_AdaBoost(X_train, X_test, y_train, y_test, n_estimators=50, learning_rate=1.0):
    """Train an AdaBoost model and return the model object"""
    from sklearn.ensemble import AdaBoostClassifier
    from sklearn.tree import DecisionTreeClassifier

    base_estimator = DecisionTreeClassifier(max_depth=1)
    model = AdaBoostClassifier(
        base_estimator=base_estimator,  # newer scikit-learn versions rename this parameter to `estimator`
        n_estimators=n_estimators,
        learning_rate=learning_rate,
        random_state=1
    )

    model.fit(X_train, y_train)

    y_train_pred = model.predict(X_train)
    train_metrics = evaluate_model(y_train, y_train_pred, dataset_name="Train")

    y_test_pred = model.predict(X_test)
    test_metrics = evaluate_model(y_test, y_test_pred, dataset_name="Test")

    return model

def train_KNN(X_train, X_test, y_train, y_test, n_neighbors=5, weights='uniform', algorithm='auto'):
    """Train a KNN model and return the model object"""
    from sklearn.neighbors import KNeighborsClassifier
    model = KNeighborsClassifier(n_neighbors=n_neighbors, weights=weights, algorithm=algorithm)

    cross_validate_model(model, X_train, y_train)
    model.fit(X_train, y_train)

    y_train_pred = model.predict(X_train)
    train_metrics = evaluate_model(y_train, y_train_pred, dataset_name="Train")

    y_test_pred = model.predict(X_test)
    test_metrics = evaluate_model(y_test, y_test_pred, dataset_name="Test")

    return model


# ==================== Unified model training and saving ====================

def train_and_save_model(model_name, X_train, X_test, y_train, y_test, model_save_dir, **kwargs):
    """
    Train the specified model and save it.

    :param model_name: model name ('SVM', 'LogisticRegression', 'PLS_DA', 'RF', 'XGBoost', 'LightGBM', 'CatBoost', 'AdaBoost', 'KNN')
    :param X_train: training features
    :param X_test: test features
    :param y_train: training labels
    :param y_test: test labels
    :param model_save_dir: directory to save the model to
    :param kwargs: model-specific hyperparameters
    :return: the trained model and the save path
    """
    model_trainers = {
        'SVM': SVM,
        'LogisticRegression': train_LogisticRegression,
        'PLS_DA': train_PLS_DA,
        'RF': train_RF,
        'XGBoost': train_XGBoost,
        'LightGBM': train_LightGBM,
        'CatBoost': train_CatBoost,
        'AdaBoost': train_AdaBoost,
        'KNN': train_KNN
    }

    if model_name not in model_trainers:
        raise ValueError(f"Unsupported model: {model_name}. Supported models: {list(model_trainers.keys())}")

    print(f"\n{'='*60}")
    print(f"Training {model_name} model...")
    print(f"{'='*60}")

    # Train the model
    trainer = model_trainers[model_name]
    model = trainer(X_train, X_test, y_train, y_test, **kwargs)

    # Save the model
    os.makedirs(model_save_dir, exist_ok=True)
    model_path = os.path.join(model_save_dir, f"{model_name.lower()}.m")
    save_model(model, model_path, model_type=model_name)

    return model, model_path

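# Illustrative call (a sketch; the hyperparameter values are arbitrary examples):
# model-specific keyword arguments are forwarded to the matching training function.
#
#   model, path = train_and_save_model(
#       'RF', X_train, X_test, y_train, y_test,
#       model_save_dir=r".\modelsave", n_estimators=300, max_depth=20)

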
# ==================== Prediction helpers for the different models ====================

def predict_with_model(df, model_path, model_type='SVM', ProcessMethods1='SS', ProcessMethods2='SG'):
    """
    Predict with the specified model.

    :param df: dataframe containing the reflectance and shape features
    :param model_path: model path
    :param model_type: model type
    :param ProcessMethods1: first preprocessing method, default 'SS' (when 'SS', scaler_params.pkl is loaded)
    :param ProcessMethods2: second preprocessing method, default 'SG'
    :return: dataframe with the prediction results
    """
    return predict_and_save(df, model_path, model_type=model_type, ProcessMethods1=ProcessMethods1, ProcessMethods2=ProcessMethods2)


# Main entry point, used for training
if __name__ == "__main__":
    # Read the CSV file with pandas
    file_path = r"D:\Data2\traindata1\all\isf0303.csv"
    df = pd.read_csv(
        file_path,
        encoding='utf-8',   # specify the encoding; if this fails, try 'gbk' or 'gb18030'
        low_memory=False    # avoid dtype-inference problems
    )

    # # Select the columns to drop (columns 93 to 117, zero-based index)
    # cols_to_remove = df.columns[87:110]
    #
    # # Drop the selected columns
    # df_filtered = df.drop(columns=cols_to_remove)

    # Extract the feature data (from the 2nd column to the last, excluding the first label column)
    # x = df_filtered.iloc[:, 1:]
    x = df.iloc[:, 1:]
    # Extract the labels (first column)
    y = df.iloc[:, 0]
    X_train, X_test, y_train, y_test = SpectralQualitativeAnalysis(x, y, 'SS', 'None', 'None', 'random', use_smote=True)

    # # Grid-search the SVM model and draw a 3D visualisation per kernel
    # param_grid = {
    #     'C': np.logspace(-3, 3, 13),      # 13 values between 10^(-3) and 10^3
    #     'gamma': np.logspace(-4, 1, 13),  # 13 values between 10^(-4) and 10^1
    #     'kernel': ['rbf']                 # RBF kernel only
    # }
    # clf = SVM_with_kernels_visualization(X_train, X_test, y_train, y_test, param_grid)
    # joblib.dump(clf, "./classification_model/model_save/pre_salinas_MODEL.m")
    # clf1 = joblib.load("./classification_model/model_save/pre_salinas_MODEL.m")

    # Example 1: train and save an SVM model (old approach, still supported)
    # clf = SVM(X_train, X_test, y_train, y_test)
    # save_model(clf, r"D:\WQ\plastic\classification_model\modelsave\svm.m", model_type='SVM')

    # Example 2: use the unified train-and-save function (recommended)
    save_dir = r"D:\plastic\plastic\modelsave\240model\new\0303"

    # Train and save one or more models
    models_to_train = ['SVM']  # e.g. 'SVM', 'RF', 'XGBoost', 'LogisticRegression'
    for model_name in models_to_train:
        model, model_path = train_and_save_model(
            model_name=model_name,
            X_train=X_train,
            X_test=X_test,
            y_train=y_train,
            y_test=y_test,
            model_save_dir=save_dir
        )
        print(f"{model_name} model saved at: {model_path}")

    # Example 3: load a model and predict
    # model_path = r"D:\WQ\plastic\classification_model\modelsave\svm.m"
    # loaded_model = load_model(model_path)
    # # Use the same preprocessing methods as at training time
    # # ProcessMethods1='SS' automatically loads scaler_params.pkl
    # # ProcessMethods2='SG' applies Savitzky-Golay filtering
    # predictions_df = predict_with_model(df, model_path, model_type='SVM', ProcessMethods1='SS', ProcessMethods2='SG')
    # print(f"Predictions completed. Results shape: {predictions_df.shape}")