Files
2026-02-25 09:42:51 +08:00

327 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
import pandas as pd
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report, precision_score, recall_score, f1_score
import sklearn.svm as svm
from sklearn.cross_decomposition import PLSRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV, cross_val_score, train_test_split
import xgboost as xgb
import lightgbm as lgb
import catboost as cb
# import torch
# from torch import nn, optim
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import AdaBoostClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
# Fix the random seed for reproducibility
def set_random_seed(seed=42):
    """Seed NumPy's global RNG so repeated runs produce identical results."""
    np.random.seed(seed)

set_random_seed()
# Cross-validation (multi-core capable)
def cross_validate_model(model, X, y, cv=5, n_jobs=-1):
    """
    Run k-fold cross-validation on `model`, print the mean ± std accuracy,
    and return the per-fold scores.
    """
    fold_scores = cross_val_score(model, X, y, cv=cv, n_jobs=n_jobs)
    mean, std = fold_scores.mean(), fold_scores.std()
    print(f"Cross-validation accuracy: {mean:.4f} ± {std:.4f}")
    return fold_scores
# Confusion matrix and classification report
def evaluate_model(y_true, y_pred, dataset_name="Test"):
    """
    Print a classification report and return a dict of weighted metrics
    (accuracy, precision, recall, f1) plus the confusion matrix.
    """
    print(f"{dataset_name} Classification Report:")
    print(classification_report(y_true, y_pred))
    metrics = {
        "accuracy": accuracy_score(y_true, y_pred),
        "precision": precision_score(y_true, y_pred, average='weighted'),
        "recall": recall_score(y_true, y_pred, average='weighted'),
        "f1_score": f1_score(y_true, y_pred, average='weighted'),
        "confusion_matrix": confusion_matrix(y_true, y_pred),
    }
    return metrics
# Neural-network (ANN) model — see the commented-out PyTorch implementation further below.
# Logistic Regression model
def LogisticRegressionModel(X_train, X_test, y_train, y_test, penalty='l2', C=1.0, solver='lbfgs', max_iter=200):
    """
    Logistic-regression classifier for multi-class tasks.

    :param penalty: regularisation type ('l1', 'l2', 'elasticnet', None)
    :param C: inverse regularisation strength (smaller C = stronger regularisation)
    :param solver: optimisation algorithm ('lbfgs', 'liblinear', 'saga', ...)
    :param max_iter: maximum number of training iterations
    :return: (train_metrics, test_metrics) dicts from evaluate_model()
    """
    # FIX: dropped the explicit multi_class='multinomial' argument — it is
    # deprecated since scikit-learn 1.5, and 'lbfgs' already fits a true
    # multinomial model by default on multi-class targets.
    model = LogisticRegression(penalty=penalty, C=C, solver=solver,
                               max_iter=max_iter, random_state=1)
    # Cross-validate on the training split (prints mean ± std accuracy).
    cross_validate_model(model, X_train, y_train)
    # ravel() flattens a column-vector target into the 1-d shape sklearn expects.
    model.fit(X_train, y_train.ravel())
    train_metrics = evaluate_model(y_train, model.predict(X_train), dataset_name="Train")
    test_metrics = evaluate_model(y_test, model.predict(X_test), dataset_name="Test")
    return train_metrics, test_metrics
# SVM model
def SVM(X_train, X_test, y_train, y_test, kernel='linear', C=1, gamma=1e-3):
    """Train an SVC, cross-validate on the training split, and report train/test metrics."""
    classifier = svm.SVC(C=C, kernel=kernel, gamma=gamma)
    # Cross-validation report on the training data
    cross_validate_model(classifier, X_train, y_train)
    # Fit, then score both splits
    classifier.fit(X_train, y_train.ravel())
    predictions_train = classifier.predict(X_train)
    predictions_test = classifier.predict(X_test)
    train_metrics = evaluate_model(y_train, predictions_train, dataset_name="Train")
    test_metrics = evaluate_model(y_test, predictions_test, dataset_name="Test")
    return train_metrics, test_metrics
# PLS-DA model
def PLS_DA(X_train, X_test, y_train, y_test, n_components=40):
    """
    Partial Least Squares Discriminant Analysis: PLS regression against a
    one-hot-encoded target, with class predictions taken as the argmax over
    the class columns.

    :param n_components: number of PLS latent components
    :return: (train_metrics, test_metrics) dicts from evaluate_model()
    """
    # One-hot encode the training labels and remember the column order so
    # argmax indices can be mapped back to the original class labels.
    y_train_onehot = pd.get_dummies(y_train)
    class_labels = y_train_onehot.columns.to_numpy()
    model = PLSRegression(n_components=n_components)
    model.fit(X_train, y_train_onehot)
    # BUG FIX: the original compared raw y_test labels against argmax column
    # *indices*, which is wrong whenever labels are not exactly 0..k-1.
    # Map indices back to class labels before scoring, on both splits.
    y_train_pred = class_labels[np.argmax(model.predict(X_train), axis=1)]
    train_metrics = evaluate_model(y_train, y_train_pred, dataset_name="Train")
    y_test_pred = class_labels[np.argmax(model.predict(X_test), axis=1)]
    test_metrics = evaluate_model(y_test, y_test_pred, dataset_name="Test")
    return train_metrics, test_metrics
# Random Forest model
def RF(X_train, X_test, y_train, y_test, n_estimators=200, max_depth=15, n_jobs=-1):
    """Train a random forest, cross-validate on the training split, and report train/test metrics."""
    forest = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=1,
        n_jobs=n_jobs,
    )
    # Cross-validation report on the training data
    cross_validate_model(forest, X_train, y_train, n_jobs=n_jobs)
    # Fit, then score both splits
    forest.fit(X_train, y_train.ravel())
    train_metrics = evaluate_model(y_train, forest.predict(X_train), dataset_name="Train")
    test_metrics = evaluate_model(y_test, forest.predict(X_test), dataset_name="Test")
    return train_metrics, test_metrics
# (stray headers from an earlier revision: XGBoost model / grid-search hyperparameter tuning)
# Neural-network (ANN) model implemented with PyTorch — kept commented out below.
# def ANN(X_train, X_test, y_train, y_test, hidden_layer_sizes=(50, 30), max_iter=500):
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 检测 GPU
# X_train = torch.tensor(X_train, device=device, dtype=torch.float32)
# X_test = torch.tensor(X_test, device=device, dtype=torch.float32)
# y_train = torch.tensor(y_train, device=device, dtype=torch.long)
# y_test = torch.tensor(y_test, device=device, dtype=torch.long)
#
# # 定义简单的神经网络
# class SimpleNN(nn.Module):
# def __init__(self, input_size, hidden_sizes, output_size):
# super(SimpleNN, self).__init__()
# self.fc1 = nn.Linear(input_size, hidden_sizes[0])
# self.fc2 = nn.Linear(hidden_sizes[0], hidden_sizes[1])
# self.fc3 = nn.Linear(hidden_sizes[1], output_size)
#
# def forward(self, x):
# x = torch.relu(self.fc1(x))
# x = torch.relu(self.fc2(x))
# x = self.fc3(x)
# return x
#
# model = SimpleNN(X_train.shape[1], hidden_layer_sizes, len(torch.unique(y_train))).to(device)
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(model.parameters(), lr=0.001)
#
# # 训练模型
# for epoch in range(max_iter):
# optimizer.zero_grad()
# outputs = model(X_train)
# loss = criterion(outputs, y_train)
# loss.backward()
# optimizer.step()
#
# # 训练集评估
# with torch.no_grad():
# y_train_pred = torch.argmax(model(X_train), dim=1)
# train_metrics = evaluate_model(y_train.cpu(), y_train_pred.cpu(), dataset_name="Train")
#
# y_test_pred = torch.argmax(model(X_test), dim=1)
# test_metrics = evaluate_model(y_test.cpu(), y_test_pred.cpu(), dataset_name="Test")
#
# return train_metrics, test_metrics
# XGBoost model
def XGBoost(X_train, X_test, y_train, y_test, n_estimators=100, learning_rate=0.1, max_depth=3):
    """
    Gradient-boosted tree classifier via XGBoost.

    :param n_estimators: number of boosting rounds
    :param learning_rate: shrinkage applied to each tree's contribution
    :param max_depth: maximum tree depth
    :return: (train_metrics, test_metrics) dicts from evaluate_model()
    """
    # BUG FIX: the original passed gpu_id=0 while the GPU tree_method was
    # commented out — gpu_id alone had no effect, and the parameter was
    # removed in xgboost >= 2.0 (it raises a TypeError there).
    model = xgb.XGBClassifier(
        n_estimators=n_estimators,
        learning_rate=learning_rate,
        max_depth=max_depth,
        random_state=1,
        # device='cuda',  # uncomment to enable GPU training (xgboost >= 2.0)
    )
    model.fit(X_train, y_train)
    train_metrics = evaluate_model(y_train, model.predict(X_train), dataset_name="Train")
    test_metrics = evaluate_model(y_test, model.predict(X_test), dataset_name="Test")
    return train_metrics, test_metrics
# LightGBM model
def LightGBM(X_train, X_test, y_train, y_test, n_estimators=100, learning_rate=0.1, max_depth=-1, num_leaves=31):
    """Train a LightGBM classifier and report train/test metrics."""
    booster = lgb.LGBMClassifier(
        n_estimators=n_estimators,
        learning_rate=learning_rate,
        max_depth=max_depth,
        num_leaves=num_leaves,
        random_state=1,
        # device='gpu'  # uncomment to enable GPU acceleration
    )
    booster.fit(X_train, y_train)
    predicted_train = booster.predict(X_train)
    predicted_test = booster.predict(X_test)
    train_metrics = evaluate_model(y_train, predicted_train, dataset_name="Train")
    test_metrics = evaluate_model(y_test, predicted_test, dataset_name="Test")
    return train_metrics, test_metrics
# CatBoost model
def CatBoost(X_train, X_test, y_train, y_test, iterations=500, learning_rate=0.1, depth=6):
    """Train a CatBoost classifier (silent mode) and report train/test metrics."""
    booster = cb.CatBoostClassifier(
        iterations=iterations,
        learning_rate=learning_rate,
        depth=depth,
        random_seed=1,
        # task_type='GPU',  # uncomment to enable GPU training
        verbose=0,
    )
    booster.fit(X_train, y_train)
    # NOTE(review): CatBoost's predict() may return a 2-D (n, 1) array for
    # multi-class targets; sklearn metrics flatten it with a warning — confirm.
    predicted_train = booster.predict(X_train)
    predicted_test = booster.predict(X_test)
    train_metrics = evaluate_model(y_train, predicted_train, dataset_name="Train")
    test_metrics = evaluate_model(y_test, predicted_test, dataset_name="Test")
    return train_metrics, test_metrics
# AdaBoost model
def AdaBoost(X_train, X_test, y_train, y_test, n_estimators=50, learning_rate=1.0):
    """
    AdaBoost multi-class classifier with decision stumps as weak learners.

    :param n_estimators: number of weak learners (boosting iterations)
    :param learning_rate: shrinkage applied to each weak learner's contribution
    :return: (train_metrics, test_metrics) dicts from evaluate_model()
    """
    # Decision stump (depth-1 tree) as the weak learner
    stump = DecisionTreeClassifier(max_depth=1)
    # BUG FIX: the 'base_estimator' keyword was renamed to 'estimator' in
    # scikit-learn 1.2 and removed in 1.4, so the original call raises a
    # TypeError on current versions. Try the new name first, then fall
    # back to the legacy keyword for old scikit-learn installs.
    try:
        model = AdaBoostClassifier(
            estimator=stump,
            n_estimators=n_estimators,
            learning_rate=learning_rate,
            random_state=1,
        )
    except TypeError:
        model = AdaBoostClassifier(
            base_estimator=stump,
            n_estimators=n_estimators,
            learning_rate=learning_rate,
            random_state=1,
        )
    model.fit(X_train, y_train)
    train_metrics = evaluate_model(y_train, model.predict(X_train), dataset_name="Train")
    test_metrics = evaluate_model(y_test, model.predict(X_test), dataset_name="Test")
    return train_metrics, test_metrics
def KNN(X_train, X_test, y_train, y_test, n_neighbors=5, weights='uniform', algorithm='auto'):
    """
    K-Nearest Neighbours classifier.

    :param n_neighbors: number of neighbours that vote on each prediction
    :param weights: 'uniform' or 'distance' neighbour weighting
    :param algorithm: neighbour-search algorithm ('auto', 'ball_tree', 'kd_tree', 'brute')
    :return: (train_metrics, test_metrics) dicts from evaluate_model()
    """
    knn = KNeighborsClassifier(n_neighbors=n_neighbors, weights=weights, algorithm=algorithm)
    # Cross-validation report on the training data
    cross_validate_model(knn, X_train, y_train)
    # Fit, then score both splits
    knn.fit(X_train, y_train)
    train_metrics = evaluate_model(y_train, knn.predict(X_train), dataset_name="Train")
    test_metrics = evaluate_model(y_test, knn.predict(X_test), dataset_name="Test")
    return train_metrics, test_metrics