初始提交

This commit is contained in:
2026-02-25 09:42:51 +08:00
parent c25276c481
commit d84d886f35
182 changed files with 18438 additions and 0 deletions

View File

@ -0,0 +1,259 @@
import torch.nn.functional as F
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from torch.utils.tensorboard import SummaryWriter
from torch.cuda.amp import GradScaler, autocast
import os
from sklearn.metrics import precision_score, recall_score, f1_score
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
writer = SummaryWriter() # 初始化 TensorBoard
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
# Dataset wrapper with optional additive-noise augmentation.
class MyDataset(Dataset):
    """Pairs of (spectrum, label); training copies can add random noise."""

    def __init__(self, specs, labels, augment=False):
        self.specs = specs
        self.labels = labels
        # When True, a fresh noise sample is drawn on every access.
        self.augment = augment

    def __getitem__(self, index):
        sample, label = self.specs[index], self.labels[index]
        if self.augment:
            # Light Gaussian jitter (sigma = 0.01) as data augmentation.
            sample = sample + 0.01 * torch.randn_like(sample)
        return sample, label

    def __len__(self):
        return len(self.specs)
# Standardize the splits and wrap them as datasets.
def ZspPocess(X_train, X_test, y_train, y_test, need=True):
    """Optionally standardize features, then build train/test datasets.

    Scaling statistics are fitted on the training split only; the test
    split is transformed with the same statistics. Noise augmentation is
    enabled for the training dataset only.
    """
    if need:
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)
    # Insert a channel axis and convert everything to tensors.
    X_train = torch.tensor(X_train[:, np.newaxis, :], dtype=torch.float32)
    X_test = torch.tensor(X_test[:, np.newaxis, :], dtype=torch.float32)
    y_train = torch.tensor(y_train, dtype=torch.long)
    y_test = torch.tensor(y_test, dtype=torch.long)
    return (MyDataset(X_train, y_train, augment=True),
            MyDataset(X_test, y_test, augment=False))
# 1-D CNN classifier: three conv stages plus a two-layer head.
class CNN3Layers(nn.Module):
    """Three 1-D convolutional stages followed by a dropout-regularized
    fully connected classifier producing ``nls`` logits."""

    def __init__(self, nls, dropout_conv=0.3, dropout_fc=0.5):
        super(CNN3Layers, self).__init__()

        def conv_stage(cin, cout, ksize, pad, pool):
            # Conv -> BN -> ReLU -> pooling -> dropout, shared shape.
            return nn.Sequential(
                nn.Conv1d(cin, cout, ksize, 1, padding=pad),
                nn.BatchNorm1d(cout),
                nn.ReLU(),
                pool,
                nn.Dropout(dropout_conv),
            )

        self.CONV1 = conv_stage(1, 64, 5, 2, nn.MaxPool1d(2, 2))
        self.CONV2 = conv_stage(64, 128, 5, 2, nn.MaxPool1d(2, 2))
        # Final stage collapses the time axis to a single feature vector.
        self.CONV3 = conv_stage(128, 256, 3, 1, nn.AdaptiveMaxPool1d(1))
        self.fc = nn.Sequential(
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(dropout_fc),
            nn.Linear(128, nls),
        )

    def forward(self, x):
        for stage in (self.CONV1, self.CONV2, self.CONV3):
            x = stage(x)
        return self.fc(x.view(x.size(0), -1))
# Training loop for CNN3Layers with mixed precision, TensorBoard logging
# and best-accuracy checkpointing.
def CNNTrain(X_train, X_test, y_train, y_test, BATCH_SIZE, n_epochs, nls, model_path):
    """Train CNN3Layers for ``n_epochs`` and evaluate on the test split each epoch.

    The checkpoint with the highest test accuracy is saved to ``model_path``.
    Returns a dict of test metrics (accuracy/precision/recall/F1/confusion
    matrix) computed on the *final* epoch — NOTE(review): not necessarily the
    epoch of the saved best checkpoint.
    """
    data_train, data_test = ZspPocess(X_train, X_test, y_train, y_test, need=True)
    train_loader = torch.utils.data.DataLoader(data_train, batch_size=BATCH_SIZE, shuffle=True)
    test_loader = torch.utils.data.DataLoader(data_test, batch_size=BATCH_SIZE, shuffle=False)
    model = CNN3Layers(nls=nls, dropout_conv=0.3, dropout_fc=0.5).to(device)
    # Adam with L2 regularization; LR is halved after 5 stagnant epochs.
    optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=0.001)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', factor=0.5, patience=5)
    criterion = nn.CrossEntropyLoss().to(device)
    scaler = GradScaler()  # gradient scaler for mixed-precision training
    best_acc = 0.0
    model_save_path = model_path
    for epoch in range(n_epochs):
        model.train()
        train_acc, train_loss = [], []
        for i, data in enumerate(train_loader):
            inputs, labels = data
            inputs = inputs.to(device).float()
            labels = labels.to(device).long()
            optimizer.zero_grad()
            with autocast():  # mixed-precision forward pass
                outputs = model(inputs)
                loss = criterion(outputs, labels)
            # Scaled backward/step avoids fp16 gradient underflow.
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            _, predicted = torch.max(outputs.data, 1)
            acc = accuracy_score(labels.cpu(), predicted.cpu())
            train_acc.append(acc)
            train_loss.append(loss.item())
        avg_train_loss = np.mean(train_loss)
        avg_train_acc = np.mean(train_acc)
        writer.add_scalar('Loss/train', avg_train_loss, epoch)
        writer.add_scalar('Accuracy/train', avg_train_acc, epoch)
        # Per-epoch evaluation on the test split.
        model.eval()
        test_acc, test_loss, test_precision, test_recall, test_f1 = [], [], [], [], []
        y_true, y_pred = [], []
        with torch.no_grad():
            for data in test_loader:
                inputs, labels = data
                inputs = inputs.to(device).float()
                labels = labels.to(device).long()
                with autocast():
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                _, predicted = torch.max(outputs.data, 1)
                # Per-batch metrics; the epoch value is the mean over batches.
                acc = accuracy_score(labels.cpu(), predicted.cpu())
                precision = precision_score(labels.cpu(), predicted.cpu(), average='weighted', zero_division=1)
                recall = recall_score(labels.cpu(), predicted.cpu(), average='weighted', zero_division=1)
                f1 = f1_score(labels.cpu(), predicted.cpu(), average='weighted', zero_division=1)
                y_true.extend(labels.cpu().numpy())
                y_pred.extend(predicted.cpu().numpy())
                test_acc.append(acc)
                test_loss.append(loss.item())
                test_precision.append(precision)
                test_recall.append(recall)
                test_f1.append(f1)
        avg_test_loss = np.mean(test_loss)
        avg_test_acc = np.mean(test_acc)
        avg_test_precision = np.mean(test_precision)
        avg_test_recall = np.mean(test_recall)
        avg_test_f1 = np.mean(test_f1)
        writer.add_scalar('Loss/test', avg_test_loss, epoch)
        writer.add_scalar('Accuracy/test', avg_test_acc, epoch)
        writer.add_scalar('Precision/test', avg_test_precision, epoch)
        writer.add_scalar('Recall/test', avg_test_recall, epoch)
        writer.add_scalar('F1_Score/test', avg_test_f1, epoch)
        # Log this epoch's train and test results.
        print(f"Epoch [{epoch + 1}/{n_epochs}]")
        print(f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {avg_train_acc:.4f}")
        print(f"Test Loss: {avg_test_loss:.4f}, Test Accuracy: {avg_test_acc:.4f}")
        print(f"Test Precision: {avg_test_precision:.4f}, Test Recall: {avg_test_recall:.4f}, Test F1: {avg_test_f1:.4f}")
        # Keep the checkpoint with the best test accuracy seen so far.
        if avg_test_acc > best_acc:
            best_acc = avg_test_acc
            torch.save(model.state_dict(), model_save_path)
        scheduler.step(avg_test_loss)  # plateau-based LR decay
    # y_true/y_pred are reset each epoch, so these come from the last epoch.
    return {
        "accuracy": avg_test_acc,
        "precision": avg_test_precision,
        "recall": avg_test_recall,
        "f1_score": avg_test_f1,
        "confusion_matrix": confusion_matrix(y_true, y_pred)
    }
# Standalone evaluation of a saved CNN3Layers checkpoint.
def CNNtest(X_test, y_test, BATCH_SIZE, nls, model_path):
    """Evaluate the checkpoint at ``model_path`` on (X_test, y_test).

    Returns a dict with accuracy/precision/recall/F1 and the confusion matrix.
    """
    # NOTE(review): the scaler is fitted on the test data itself, so the
    # scaling differs from the training-time statistics used in
    # CNNTrain/ZspPocess — confirm this is intended (data-leakage risk).
    scaler = StandardScaler()
    X_test = scaler.fit_transform(X_test)  # fit+transform on the test set only
    X_test = torch.tensor(X_test[:, np.newaxis, :], dtype=torch.float32)
    y_test = torch.tensor(y_test, dtype=torch.long)
    # Build the evaluation dataset/loader (no augmentation).
    data_test = MyDataset(X_test, y_test, augment=False)
    test_loader = torch.utils.data.DataLoader(data_test, batch_size=BATCH_SIZE, shuffle=False)
    # Rebuild the architecture and load the trained weights.
    model = CNN3Layers(nls=nls).to(device)
    model.load_state_dict(torch.load(model_path))
    # Collect true/predicted labels over the whole test set.
    y_true, y_pred = [], []
    model.eval()
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device).float(), labels.to(device).long()
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())
    # Aggregate metrics ('weighted' averages account for class imbalance).
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average='weighted')
    recall = recall_score(y_true, y_pred, average='weighted')
    f1 = f1_score(y_true, y_pred, average='weighted')
    cm = confusion_matrix(y_true, y_pred)
    # Return all evaluation results in one dict.
    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1_score": f1,
        "confusion_matrix": cm
    }
def CNN(X_train, X_test, y_train, y_test, BATCH_SIZE, n_epochs, nls, model_path):
    """Train the CNN, then evaluate the saved checkpoint on the test split."""
    train_metrics = CNNTrain(X_train, X_test, y_train, y_test,
                             BATCH_SIZE, n_epochs, nls, model_path)
    test_metrics = CNNtest(X_test, y_test, BATCH_SIZE, nls, model_path)
    return train_metrics, test_metrics

View File

@ -0,0 +1,317 @@
import torch.nn.functional as F
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from torch.utils.tensorboard import SummaryWriter
from torch.cuda.amp import GradScaler, autocast
import os
from sklearn.metrics import precision_score, recall_score, f1_score
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
writer = SummaryWriter() # 初始化 TensorBoard
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
# 自定义数据集,包含数据增强(添加噪声)
from skopt import BayesSearchCV
from skopt.space import Real, Integer
import torch
import torch.optim as optim
from torch.utils.data import DataLoader
class MyDataset(Dataset):
    """Pairs of (spectrum, label); training copies can add random noise."""

    def __init__(self, specs, labels, augment=False):
        self.specs = specs
        self.labels = labels
        # When True, a fresh noise sample is drawn on every access.
        self.augment = augment

    def __getitem__(self, index):
        sample, label = self.specs[index], self.labels[index]
        if self.augment:
            # Light Gaussian jitter (sigma = 0.01) as data augmentation.
            sample = sample + 0.01 * torch.randn_like(sample)
        return sample, label

    def __len__(self):
        return len(self.specs)
# Standardize the splits and wrap them as datasets.
def ZspPocess(X_train, X_test, y_train, y_test, need=True):
    """Optionally standardize features, then build train/test datasets.

    Scaling statistics are fitted on the training split only; the test
    split is transformed with the same statistics. Noise augmentation is
    enabled for the training dataset only.
    """
    if need:
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)
    # Insert a channel axis and convert everything to tensors.
    X_train = torch.tensor(X_train[:, np.newaxis, :], dtype=torch.float32)
    X_test = torch.tensor(X_test[:, np.newaxis, :], dtype=torch.float32)
    y_train = torch.tensor(y_train, dtype=torch.long)
    y_test = torch.tensor(y_test, dtype=torch.long)
    return (MyDataset(X_train, y_train, augment=True),
            MyDataset(X_test, y_test, augment=False))
# 1-D CNN classifier: three conv stages plus a two-layer head.
class CNN3Layers(nn.Module):
    """Three 1-D convolutional stages followed by a dropout-regularized
    fully connected classifier producing ``nls`` logits."""

    def __init__(self, nls, dropout_conv=0.3, dropout_fc=0.5):
        super(CNN3Layers, self).__init__()

        def conv_stage(cin, cout, ksize, pad, pool):
            # Conv -> BN -> ReLU -> pooling -> dropout, shared shape.
            return nn.Sequential(
                nn.Conv1d(cin, cout, ksize, 1, padding=pad),
                nn.BatchNorm1d(cout),
                nn.ReLU(),
                pool,
                nn.Dropout(dropout_conv),
            )

        self.CONV1 = conv_stage(1, 64, 5, 2, nn.MaxPool1d(2, 2))
        self.CONV2 = conv_stage(64, 128, 5, 2, nn.MaxPool1d(2, 2))
        # Final stage collapses the time axis to a single feature vector.
        self.CONV3 = conv_stage(128, 256, 3, 1, nn.AdaptiveMaxPool1d(1))
        self.fc = nn.Sequential(
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(dropout_fc),
            nn.Linear(128, nls),
        )

    def forward(self, x):
        for stage in (self.CONV1, self.CONV2, self.CONV3):
            x = stage(x)
        return self.fc(x.view(x.size(0), -1))
# Training loop for CNN3Layers with mixed precision, TensorBoard logging
# and best-accuracy checkpointing.
def CNNTrain(X_train, X_test, y_train, y_test, BATCH_SIZE, n_epochs, nls, model_path):
    """Train CNN3Layers for ``n_epochs`` and evaluate on the test split each epoch.

    The checkpoint with the highest test accuracy is saved to ``model_path``.
    Returns a dict of test metrics (accuracy/precision/recall/F1/confusion
    matrix) computed on the *final* epoch — NOTE(review): not necessarily the
    epoch of the saved best checkpoint.
    """
    data_train, data_test = ZspPocess(X_train, X_test, y_train, y_test, need=True)
    train_loader = torch.utils.data.DataLoader(data_train, batch_size=BATCH_SIZE, shuffle=True)
    test_loader = torch.utils.data.DataLoader(data_test, batch_size=BATCH_SIZE, shuffle=False)
    model = CNN3Layers(nls=nls, dropout_conv=0.3, dropout_fc=0.5).to(device)
    # Adam with L2 regularization; LR is halved after 5 stagnant epochs.
    optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=0.001)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', factor=0.5, patience=5)
    criterion = nn.CrossEntropyLoss().to(device)
    scaler = GradScaler()  # gradient scaler for mixed-precision training
    best_acc = 0.0
    model_save_path = model_path
    for epoch in range(n_epochs):
        model.train()
        train_acc, train_loss = [], []
        for i, data in enumerate(train_loader):
            inputs, labels = data
            inputs = inputs.to(device).float()
            labels = labels.to(device).long()
            optimizer.zero_grad()
            with autocast():  # mixed-precision forward pass
                outputs = model(inputs)
                loss = criterion(outputs, labels)
            # Scaled backward/step avoids fp16 gradient underflow.
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            _, predicted = torch.max(outputs.data, 1)
            acc = accuracy_score(labels.cpu(), predicted.cpu())
            train_acc.append(acc)
            train_loss.append(loss.item())
        avg_train_loss = np.mean(train_loss)
        avg_train_acc = np.mean(train_acc)
        writer.add_scalar('Loss/train', avg_train_loss, epoch)
        writer.add_scalar('Accuracy/train', avg_train_acc, epoch)
        # Per-epoch evaluation on the test split.
        model.eval()
        test_acc, test_loss, test_precision, test_recall, test_f1 = [], [], [], [], []
        y_true, y_pred = [], []
        with torch.no_grad():
            for data in test_loader:
                inputs, labels = data
                inputs = inputs.to(device).float()
                labels = labels.to(device).long()
                with autocast():
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                _, predicted = torch.max(outputs.data, 1)
                # Per-batch metrics; the epoch value is the mean over batches.
                acc = accuracy_score(labels.cpu(), predicted.cpu())
                precision = precision_score(labels.cpu(), predicted.cpu(), average='weighted', zero_division=1)
                recall = recall_score(labels.cpu(), predicted.cpu(), average='weighted', zero_division=1)
                f1 = f1_score(labels.cpu(), predicted.cpu(), average='weighted', zero_division=1)
                y_true.extend(labels.cpu().numpy())
                y_pred.extend(predicted.cpu().numpy())
                test_acc.append(acc)
                test_loss.append(loss.item())
                test_precision.append(precision)
                test_recall.append(recall)
                test_f1.append(f1)
        avg_test_loss = np.mean(test_loss)
        avg_test_acc = np.mean(test_acc)
        avg_test_precision = np.mean(test_precision)
        avg_test_recall = np.mean(test_recall)
        avg_test_f1 = np.mean(test_f1)
        writer.add_scalar('Loss/test', avg_test_loss, epoch)
        writer.add_scalar('Accuracy/test', avg_test_acc, epoch)
        writer.add_scalar('Precision/test', avg_test_precision, epoch)
        writer.add_scalar('Recall/test', avg_test_recall, epoch)
        writer.add_scalar('F1_Score/test', avg_test_f1, epoch)
        # Log this epoch's train and test results.
        print(f"Epoch [{epoch + 1}/{n_epochs}]")
        print(f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {avg_train_acc:.4f}")
        print(f"Test Loss: {avg_test_loss:.4f}, Test Accuracy: {avg_test_acc:.4f}")
        print(f"Test Precision: {avg_test_precision:.4f}, Test Recall: {avg_test_recall:.4f}, Test F1: {avg_test_f1:.4f}")
        # Keep the checkpoint with the best test accuracy seen so far.
        if avg_test_acc > best_acc:
            best_acc = avg_test_acc
            torch.save(model.state_dict(), model_save_path)
        scheduler.step(avg_test_loss)  # plateau-based LR decay
    # y_true/y_pred are reset each epoch, so these come from the last epoch.
    return {
        "accuracy": avg_test_acc,
        "precision": avg_test_precision,
        "recall": avg_test_recall,
        "f1_score": avg_test_f1,
        "confusion_matrix": confusion_matrix(y_true, y_pred)
    }
# Standalone evaluation of a saved CNN3Layers checkpoint.
def CNNtest(X_test, y_test, BATCH_SIZE, nls, model_path):
    """Evaluate the checkpoint at ``model_path`` on (X_test, y_test).

    Returns a dict with accuracy/precision/recall/F1 and the confusion matrix.
    """
    # NOTE(review): the scaler is fitted on the test data itself, so the
    # scaling differs from the training-time statistics used in
    # CNNTrain/ZspPocess — confirm this is intended (data-leakage risk).
    scaler = StandardScaler()
    X_test = scaler.fit_transform(X_test)  # fit+transform on the test set only
    X_test = torch.tensor(X_test[:, np.newaxis, :], dtype=torch.float32)
    y_test = torch.tensor(y_test, dtype=torch.long)
    # Build the evaluation dataset/loader (no augmentation).
    data_test = MyDataset(X_test, y_test, augment=False)
    test_loader = torch.utils.data.DataLoader(data_test, batch_size=BATCH_SIZE, shuffle=False)
    # Rebuild the architecture and load the trained weights.
    model = CNN3Layers(nls=nls).to(device)
    model.load_state_dict(torch.load(model_path))
    # Collect true/predicted labels over the whole test set.
    y_true, y_pred = [], []
    model.eval()
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device).float(), labels.to(device).long()
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())
    # Aggregate metrics ('weighted' averages account for class imbalance).
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average='weighted')
    recall = recall_score(y_true, y_pred, average='weighted')
    f1 = f1_score(y_true, y_pred, average='weighted')
    cm = confusion_matrix(y_true, y_pred)
    # Return all evaluation results in one dict.
    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1_score": f1,
        "confusion_matrix": cm
    }
def optimize_CNN(X_train, X_test, y_train, y_test, model_path):
    """Tune CNN hyper-parameters with Bayesian optimization (scikit-optimize).

    Searches batch size and epoch count — the only hyper-parameters the
    ``CNNTrain``/``CNNtest`` interfaces accept — retrains with the best
    setting, and returns ``(best_params, train_metrics, test_metrics)``.

    BUG FIX: the original passed ``estimator=None`` to ``BayesSearchCV``
    (which requires a real scikit-learn estimator and would raise), never
    used its ``objective`` closure, and unpacked dropout/lr values it could
    not forward. This version drives the objective directly with
    ``skopt.gp_minimize``.
    """
    # Local import so the module stays importable without scikit-optimize.
    from skopt import gp_minimize

    # Search space restricted to parameters the training API consumes.
    # (dropout_conv/dropout_fc/lr are fixed inside CNNTrain and cannot be
    # forwarded without changing its signature.)
    space = [
        Integer(16, 128, name='batch_size'),
        Integer(10, 100, name='n_epochs'),
    ]

    def objective(params):
        """Train/evaluate one configuration; return negated test accuracy."""
        batch_size, n_epochs = int(params[0]), int(params[1])
        CNNTrain(
            X_train, X_test, y_train, y_test,
            BATCH_SIZE=batch_size, n_epochs=n_epochs,
            nls=21, model_path=model_path,
        )
        test_metrics = CNNtest(X_test, y_test, batch_size, nls=21,
                               model_path=model_path)
        # gp_minimize minimizes, so negate the accuracy.
        return -test_metrics["accuracy"]

    result = gp_minimize(
        objective,
        space,
        n_calls=20,        # number of optimization iterations
        random_state=42,   # reproducible search
        verbose=True,
    )
    best_params = {
        'batch_size': int(result.x[0]),
        'n_epochs': int(result.x[1]),
    }
    print("Best hyperparameters:", best_params)

    # Retrain with the best configuration and report final metrics.
    train_metrics = CNNTrain(
        X_train, X_test, y_train, y_test,
        BATCH_SIZE=best_params['batch_size'],
        n_epochs=best_params['n_epochs'],
        nls=21, model_path=model_path,
    )
    test_metrics = CNNtest(X_test, y_test, best_params['batch_size'],
                           nls=21, model_path=model_path)
    return best_params, train_metrics, test_metrics

View File

@ -0,0 +1,330 @@
import torch
import torch.nn as nn
import torch.optim as optim
from torch.cuda.amp import GradScaler, autocast
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.model_selection import train_test_split
import numpy as np
import os
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
# Dataset wrapper with optional additive-noise augmentation.
class MyDataset(Dataset):
    """Pairs of (spectrum, label); training copies can add random noise."""

    def __init__(self, specs, labels, augment=False):
        self.specs = specs
        self.labels = labels
        # When True, a fresh noise sample is drawn on every access.
        self.augment = augment

    def __getitem__(self, index):
        sample, label = self.specs[index], self.labels[index]
        if self.augment:
            # Light Gaussian jitter (sigma = 0.01) as data augmentation.
            sample = sample + 0.01 * torch.randn_like(sample)
        return sample, label

    def __len__(self):
        return len(self.specs)
# Standardize the splits and wrap them as datasets.
def ZspProcess(X_train, X_test, y_train, y_test, need=True):
    """Optionally standardize features, then build train/test datasets.

    Scaling statistics are fitted on the training split only; the test
    split is transformed with the same statistics. Noise augmentation is
    enabled for the training dataset only.
    """
    if need:
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)
    # Insert a channel axis and convert everything to tensors.
    X_train = torch.tensor(X_train[:, np.newaxis, :], dtype=torch.float32)
    X_test = torch.tensor(X_test[:, np.newaxis, :], dtype=torch.float32)
    y_train = torch.tensor(y_train, dtype=torch.long)
    y_test = torch.tensor(y_test, dtype=torch.long)
    return (MyDataset(X_train, y_train, augment=True),
            MyDataset(X_test, y_test, augment=False))
# Focal Loss
class FocalLoss(nn.Module):
    """Multi-class focal loss (Lin et al., 2017).

    Per sample: ``alpha * (1 - p_t) ** gamma * (-log p_t)`` where ``p_t``
    is the softmax probability of the target class.

    Args:
        alpha: global scaling factor.
        gamma: focusing exponent; larger values down-weight easy examples.
        reduction: 'mean', 'sum', or anything else for per-sample losses.
    """

    def __init__(self, alpha=1, gamma=2, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        # BUG FIX: log(softmax(x)) is numerically unstable — when a target
        # probability underflows to 0 it yields -inf/NaN. log_softmax
        # computes the same quantity stably in one fused op.
        log_probs = torch.log_softmax(inputs, dim=1)
        target_log_probs = log_probs[range(len(targets)), targets]
        target_probs = target_log_probs.exp()
        focal_weight = self.alpha * (1 - target_probs) ** self.gamma
        loss = focal_weight * (-target_log_probs)
        if self.reduction == 'mean':
            return loss.mean()
        elif self.reduction == 'sum':
            return loss.sum()
        else:
            return loss
# Sinusoidal positional encoding module.
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding for (seq, batch, dim) inputs."""

    def __init__(self, embed_dim, max_len=5000):
        super(PositionalEncoding, self).__init__()
        positions = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        freqs = torch.exp(
            torch.arange(0, embed_dim, 2).float()
            * (-torch.log(torch.tensor(10000.0)) / embed_dim)
        )
        table = torch.zeros(max_len, embed_dim)
        table[:, 0::2] = torch.sin(positions * freqs)  # even dimensions
        table[:, 1::2] = torch.cos(positions * freqs)  # odd dimensions
        # Stored as (max_len, 1, embed_dim) so it broadcasts over the batch.
        self.register_buffer('pe', table.unsqueeze(0).transpose(0, 1))

    def forward(self, x):
        # Add the encoding for the first seq_len positions.
        return x + self.pe[:x.size(0), :]
# Transformer block with hand-rolled single-head self-attention.
class TransformerBlockWithSAE(nn.Module):
    """Positional encoding + single-head self-attention + feed-forward.

    Expects input of shape (seq_len, batch, embed_dim); the output keeps
    the same shape.
    """

    def __init__(self, embed_dim, ff_dim, dropout=0.1, max_len=5000):
        super(TransformerBlockWithSAE, self).__init__()
        self.query = nn.Linear(embed_dim, embed_dim)
        self.key = nn.Linear(embed_dim, embed_dim)
        self.value = nn.Linear(embed_dim, embed_dim)
        self.scale = embed_dim ** 0.5  # dot-product attention scaling
        self.positional_encoding = PositionalEncoding(embed_dim, max_len)
        self.feed_forward = nn.Sequential(
            nn.Linear(embed_dim, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, embed_dim),
        )
        self.layernorm1 = nn.LayerNorm(embed_dim)
        self.layernorm2 = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.positional_encoding(x)
        # Scaled dot-product self-attention (single head).
        scores = torch.matmul(self.query(x), self.key(x).transpose(-2, -1))
        weights = torch.softmax(scores / self.scale, dim=-1)
        attended = torch.matmul(weights, self.value(x))
        x = self.layernorm1(x + self.dropout(attended))
        # Position-wise feed-forward sub-layer with residual connection.
        return self.layernorm2(x + self.dropout(self.feed_forward(x)))
# Modified CNN + Transformer model.
# NOTE(review): dead code — this class is immediately shadowed by the
# second `CNNWithSAE` definition below, which wins at import time. The
# only difference is that this version accepts and forwards `max_len`.
class CNNWithSAE(nn.Module):
    """CNN feature extractor + single-head transformer block + classifier."""
    def __init__(self, nls, embed_dim=96, ff_dim=192, dropout=0.1, max_len=5000):
        super(CNNWithSAE, self).__init__()
        # Stage 1: stride-2 conv + pooling downsample the sequence 4x.
        self.conv1 = nn.Sequential(
            nn.Conv1d(1, 64, kernel_size=5, stride=2, padding=2),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.MaxPool1d(2, 2)
        )
        # Stage 2: project to embed_dim channels for the transformer.
        self.conv2 = nn.Sequential(
            nn.Conv1d(64, embed_dim, kernel_size=5, stride=2, padding=2),
            nn.BatchNorm1d(embed_dim),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.MaxPool1d(2, 2)
        )
        self.transformer = TransformerBlockWithSAE(embed_dim, ff_dim, dropout, max_len)
        self.fc = nn.Sequential(
            nn.Linear(embed_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, nls)
        )
    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        # (batch, embed, seq) -> (seq, batch, embed) for the transformer.
        x = x.permute(2, 0, 1)
        x = self.transformer(x)
        # Mean-pool over sequence positions, then classify.
        x = x.mean(dim=0)
        x = self.fc(x)
        return x
# CNN + Transformer model (effective definition — shadows the one above).
class CNNWithSAE(nn.Module):
    """1-D CNN feature extractor followed by the SAE transformer block
    and a fully connected classifier producing ``nls`` logits."""

    def __init__(self, nls, embed_dim=96, ff_dim=192, dropout=0.1):
        super(CNNWithSAE, self).__init__()

        def conv_stage(cin, cout):
            # Stride-2 conv + pooling: each stage downsamples the sequence 4x.
            return nn.Sequential(
                nn.Conv1d(cin, cout, kernel_size=5, stride=2, padding=2),
                nn.BatchNorm1d(cout),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.MaxPool1d(2, 2),
            )

        self.conv1 = conv_stage(1, 64)
        self.conv2 = conv_stage(64, embed_dim)
        self.transformer = TransformerBlockWithSAE(embed_dim, ff_dim, dropout)
        self.fc = nn.Sequential(
            nn.Linear(embed_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, nls),
        )

    def forward(self, x):
        x = self.conv2(self.conv1(x))
        # (batch, embed, seq) -> (seq, batch, embed) for the transformer.
        x = self.transformer(x.permute(2, 0, 1))
        # Mean-pool over sequence positions, then classify.
        return self.fc(x.mean(dim=0))
# Training function with early stopping and LR scheduling.
def TransformerTrain(X_train, X_val, y_train, y_val, BATCH_SIZE, n_epochs, nls, model_path, patience=10):
    """Train CNNWithSAE with mixed precision, plateau LR decay and early stopping.

    The model with the lowest validation loss is saved to ``model_path``.
    Returns ``(model, train_metrics)``; the metrics are computed over
    predictions accumulated across *all* epochs (not just the last one).
    """
    data_train, data_val = ZspProcess(X_train, X_val, y_train, y_val, need=True)
    train_loader = DataLoader(data_train, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(data_val, batch_size=BATCH_SIZE, shuffle=False)
    model = CNNWithSAE(nls=nls).to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=0.001)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5)
    criterion = FocalLoss(alpha=1, gamma=2).to(device)
    scaler = GradScaler()  # gradient scaler for mixed-precision training
    best_val_loss = float('inf')
    early_stop_counter = 0
    y_true_train, y_pred_train = [], []
    for epoch in range(n_epochs):
        model.train()
        train_loss, train_acc = [], []
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            with autocast():  # mixed-precision forward pass
                outputs = model(inputs)
                loss = criterion(outputs, labels)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            _, preds = torch.max(outputs, 1)
            y_true_train.extend(labels.cpu().numpy())
            y_pred_train.extend(preds.cpu().numpy())
            acc = accuracy_score(labels.cpu(), preds.cpu())
            train_loss.append(loss.item())
            train_acc.append(acc)
        # Validation pass.
        model.eval()
        val_loss = []
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss.append(loss.item())
        avg_val_loss = np.mean(val_loss)
        avg_train_loss = np.mean(train_loss)
        avg_train_acc = np.mean(train_acc)
        # BUG FIX: the ReduceLROnPlateau scheduler was created but never
        # stepped, so the learning rate never decayed. Step it on the
        # validation loss once per epoch.
        scheduler.step(avg_val_loss)
        print(f"Epoch [{epoch+1}/{n_epochs}] - Train Loss: {avg_train_loss:.4f}, Train Acc: {avg_train_acc:.4f}, Val Loss: {avg_val_loss:.4f}")
        # Early stopping on validation loss; save only improving models.
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            early_stop_counter = 0
            torch.save(model.state_dict(), model_path)
            print("Model improved and saved.")
        else:
            early_stop_counter += 1
            print(f"No improvement. Early stop counter: {early_stop_counter}/{patience}")
            if early_stop_counter >= patience:
                print("Early stopping triggered.")
                break
    # Training metrics over the predictions accumulated across all epochs.
    train_accuracy = accuracy_score(y_true_train, y_pred_train)
    train_precision = precision_score(y_true_train, y_pred_train, average='weighted')
    train_recall = recall_score(y_true_train, y_pred_train, average='weighted')
    train_f1 = f1_score(y_true_train, y_pred_train, average='weighted')
    train_cm = confusion_matrix(y_true_train, y_pred_train)
    train_metrics = {
        "accuracy": train_accuracy,
        "precision": train_precision,
        "recall": train_recall,
        "f1_score": train_f1,
        "confusion_matrix": train_cm
    }
    return model, train_metrics
# Standalone evaluation of a saved CNNWithSAE checkpoint.
def TransformerTest(X_test, y_test, BATCH_SIZE, nls, model_path):
    """Evaluate the checkpoint at ``model_path`` on (X_test, y_test).

    Returns a dict with accuracy/precision/recall/F1 and the confusion matrix.
    """
    # NOTE(review): passing X_test as both train and test makes ZspProcess
    # fit the scaler on the test data itself — different statistics from
    # training time. Confirm this is intended (data-leakage risk).
    data_test = ZspProcess(X_test, X_test, y_test, y_test, need=True)[1]
    test_loader = DataLoader(data_test, batch_size=BATCH_SIZE, shuffle=False)
    # Rebuild the architecture and load the trained weights.
    model = CNNWithSAE(nls=nls).to(device)
    model.load_state_dict(torch.load(model_path))
    model.eval()
    y_true, y_pred = [], []
    test_loss = []
    criterion = FocalLoss(alpha=1, gamma=2).to(device)  # same loss as training
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(preds.cpu().numpy())
            test_loss.append(loss.item())
    # Aggregate metrics ('weighted' averages account for class imbalance).
    test_accuracy = accuracy_score(y_true, y_pred)
    test_precision = precision_score(y_true, y_pred, average='weighted')
    test_recall = recall_score(y_true, y_pred, average='weighted')
    test_f1 = f1_score(y_true, y_pred, average='weighted')
    test_cm = confusion_matrix(y_true, y_pred)
    test_metrics = {
        "accuracy": test_accuracy,
        "precision": test_precision,
        "recall": test_recall,
        "f1_score": test_f1,
        "confusion_matrix": test_cm
    }
    print(f"Accuracy: {test_accuracy:.4f}, Precision: {test_precision:.4f}, Recall: {test_recall:.4f}, F1 Score: {test_f1:.4f}")
    print(f"Confusion Matrix:\n{test_cm}")
    return test_metrics
def SAETrainAndTest(X, X_test, y, y_test, BATCH_SIZE, n_epochs, nls, model_path, val_split=0.2, patience=10):
    """Split off a validation set, train CNNWithSAE, then evaluate the
    saved checkpoint on the held-out test split."""
    # Carve the validation split out of the training data.
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=val_split, random_state=42)
    _, train_metrics = TransformerTrain(
        X_train, X_val, y_train, y_val,
        BATCH_SIZE, n_epochs, nls, model_path, patience)
    test_metrics = TransformerTest(X_test, y_test, BATCH_SIZE, nls, model_path)
    return train_metrics, test_metrics

View File

@ -0,0 +1,268 @@
import torch
import torch.nn as nn
import torch.optim as optim
from torch.cuda.amp import GradScaler, autocast
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.utils.class_weight import compute_class_weight
from sklearn.model_selection import train_test_split
import numpy as np
import os
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
# Dataset wrapper with optional additive-noise augmentation.
class MyDataset(Dataset):
    """Pairs of (spectrum, label); training copies can add random noise."""

    def __init__(self, specs, labels, augment=False):
        self.specs = specs
        self.labels = labels
        # When True, a fresh noise sample is drawn on every access.
        self.augment = augment

    def __getitem__(self, index):
        sample, label = self.specs[index], self.labels[index]
        if self.augment:
            # Light Gaussian jitter (sigma = 0.01) as data augmentation.
            sample = sample + 0.01 * torch.randn_like(sample)
        return sample, label

    def __len__(self):
        return len(self.specs)
# Standardize the splits and wrap them as datasets.
def ZspProcess(X_train, X_test, y_train, y_test, need=True):
    """Optionally standardize features, then build train/test datasets.

    Scaling statistics are fitted on the training split only; the test
    split is transformed with the same statistics. Noise augmentation is
    enabled for the training dataset only.
    """
    if need:
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)
    # Insert a channel axis and convert everything to tensors.
    X_train = torch.tensor(X_train[:, np.newaxis, :], dtype=torch.float32)
    X_test = torch.tensor(X_test[:, np.newaxis, :], dtype=torch.float32)
    y_train = torch.tensor(y_train, dtype=torch.long)
    y_test = torch.tensor(y_test, dtype=torch.long)
    return (MyDataset(X_train, y_train, augment=True),
            MyDataset(X_test, y_test, augment=False))
# Focal Loss
class FocalLoss(nn.Module):
    """Multi-class focal loss (Lin et al., 2017).

    Per sample: ``alpha * (1 - p_t) ** gamma * (-log p_t)`` where ``p_t``
    is the softmax probability of the target class.

    Args:
        alpha: global scaling factor.
        gamma: focusing exponent; larger values down-weight easy examples.
        reduction: 'mean', 'sum', or anything else for per-sample losses.
    """

    def __init__(self, alpha=1, gamma=2, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        # BUG FIX: log(softmax(x)) is numerically unstable — when a target
        # probability underflows to 0 it yields -inf/NaN. log_softmax
        # computes the same quantity stably in one fused op.
        log_probs = torch.log_softmax(inputs, dim=1)
        target_log_probs = log_probs[range(len(targets)), targets]
        target_probs = target_log_probs.exp()
        focal_weight = self.alpha * (1 - target_probs) ** self.gamma
        loss = focal_weight * (-target_log_probs)
        if self.reduction == 'mean':
            return loss.mean()
        elif self.reduction == 'sum':
            return loss.sum()
        else:
            return loss
# Transformer encoder block (multi-head attention + feed-forward).
class TransformerBlock(nn.Module):
    """Standard post-norm transformer encoder layer.

    Expects input of shape (seq_len, batch, embed_dim); the output keeps
    the same shape.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, dropout=0.1):
        super(TransformerBlock, self).__init__()
        self.attention = nn.MultiheadAttention(embed_dim, num_heads, dropout=dropout)
        self.feed_forward = nn.Sequential(
            nn.Linear(embed_dim, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, embed_dim),
        )
        self.layernorm1 = nn.LayerNorm(embed_dim)
        self.layernorm2 = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Self-attention sub-layer with residual + layer norm.
        attended, _ = self.attention(x, x, x)
        x = self.layernorm1(x + self.dropout(attended))
        # Feed-forward sub-layer with residual + layer norm.
        return self.layernorm2(x + self.dropout(self.feed_forward(x)))
# Improved CNN + Transformer model.
class CNNWithTransformer(nn.Module):
    """1-D CNN feature extractor followed by a transformer encoder block
    and a fully connected classifier producing ``nls`` logits."""

    def __init__(self, nls, embed_dim=96, num_heads=2, ff_dim=192, dropout=0.1):
        super(CNNWithTransformer, self).__init__()

        def conv_stage(cin, cout):
            # Stride-2 conv + pooling: each stage downsamples the sequence 4x.
            return nn.Sequential(
                nn.Conv1d(cin, cout, kernel_size=5, stride=2, padding=2),
                nn.BatchNorm1d(cout),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.MaxPool1d(2, 2),
            )

        self.conv1 = conv_stage(1, 64)
        self.conv2 = conv_stage(64, embed_dim)
        self.transformer = TransformerBlock(embed_dim, num_heads, ff_dim, dropout)
        self.fc = nn.Sequential(
            nn.Linear(embed_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, nls),
        )

    def forward(self, x):
        x = self.conv2(self.conv1(x))
        # (batch, embed, seq) -> (seq, batch, embed) for the transformer.
        x = self.transformer(x.permute(2, 0, 1))
        # Mean-pool over sequence positions, then classify.
        return self.fc(x.mean(dim=0))
# Training function with early stopping and LR scheduling.
def TransformerTrain(X_train, X_val, y_train, y_val, BATCH_SIZE, n_epochs, nls, model_path, patience=10):
    """Train CNNWithTransformer with mixed precision, plateau LR decay and early stopping.

    The model with the lowest validation loss is saved to ``model_path``.
    Returns ``(model, train_metrics)``; the metrics are computed over
    predictions accumulated across *all* epochs (not just the last one).
    """
    data_train, data_val = ZspProcess(X_train, X_val, y_train, y_val, need=True)
    train_loader = DataLoader(data_train, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(data_val, batch_size=BATCH_SIZE, shuffle=False)
    model = CNNWithTransformer(nls=nls).to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=0.001)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5)
    criterion = FocalLoss(alpha=1, gamma=2).to(device)
    scaler = GradScaler()  # gradient scaler for mixed-precision training
    best_val_loss = float('inf')
    early_stop_counter = 0
    y_true_train, y_pred_train = [], []
    for epoch in range(n_epochs):
        model.train()
        train_loss, train_acc = [], []
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            with autocast():  # mixed-precision forward pass
                outputs = model(inputs)
                loss = criterion(outputs, labels)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            _, preds = torch.max(outputs, 1)
            y_true_train.extend(labels.cpu().numpy())
            y_pred_train.extend(preds.cpu().numpy())
            acc = accuracy_score(labels.cpu(), preds.cpu())
            train_loss.append(loss.item())
            train_acc.append(acc)
        # Validation pass.
        model.eval()
        val_loss = []
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss.append(loss.item())
        avg_val_loss = np.mean(val_loss)
        avg_train_loss = np.mean(train_loss)
        avg_train_acc = np.mean(train_acc)
        # BUG FIX: the ReduceLROnPlateau scheduler was created but never
        # stepped, so the learning rate never decayed. Step it on the
        # validation loss once per epoch.
        scheduler.step(avg_val_loss)
        print(f"Epoch [{epoch+1}/{n_epochs}] - Train Loss: {avg_train_loss:.4f}, Train Acc: {avg_train_acc:.4f}, Val Loss: {avg_val_loss:.4f}")
        # Early stopping on validation loss; save only improving models.
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            early_stop_counter = 0
            torch.save(model.state_dict(), model_path)
            print("Model improved and saved.")
        else:
            early_stop_counter += 1
            print(f"No improvement. Early stop counter: {early_stop_counter}/{patience}")
            if early_stop_counter >= patience:
                print("Early stopping triggered.")
                break
    # Training metrics over the predictions accumulated across all epochs.
    train_accuracy = accuracy_score(y_true_train, y_pred_train)
    train_precision = precision_score(y_true_train, y_pred_train, average='weighted')
    train_recall = recall_score(y_true_train, y_pred_train, average='weighted')
    train_f1 = f1_score(y_true_train, y_pred_train, average='weighted')
    train_cm = confusion_matrix(y_true_train, y_pred_train)
    train_metrics = {
        "accuracy": train_accuracy,
        "precision": train_precision,
        "recall": train_recall,
        "f1_score": train_f1,
        "confusion_matrix": train_cm
    }
    return model, train_metrics
# 测试函数
def TransformerTest(X_test, y_test, BATCH_SIZE, nls, model_path):
    """Evaluate a saved CNN+Transformer checkpoint on the test set.

    Args:
        X_test: test spectra, shape (N, n_features).
        y_test: integer class labels, shape (N,).
        BATCH_SIZE: evaluation batch size.
        nls: number of output classes for the model head.
        model_path: path of the state_dict saved during training.

    Returns:
        dict with weighted accuracy / precision / recall / f1_score and
        the confusion matrix.
    """
    # NOTE(review): this standardizes the test set with a scaler fitted on
    # the test set itself (X_test is passed as both "train" and "test");
    # for a faithful evaluation the training-time scaler should be
    # persisted and reused — confirm against the training pipeline.
    data_test = ZspProcess(X_test, X_test, y_test, y_test, need=True)[1]
    test_loader = DataLoader(data_test, batch_size=BATCH_SIZE, shuffle=False)
    model = CNNWithTransformer(nls=nls).to(device)
    # BUG FIX: map_location lets a GPU-trained checkpoint load on a
    # CPU-only host (torch.load otherwise tries to restore CUDA tensors).
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    y_true, y_pred = [], []
    test_loss = []
    criterion = FocalLoss(alpha=1, gamma=2).to(device)  # same loss as training
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(preds.cpu().numpy())
            test_loss.append(loss.item())
    # Metrics over the pooled predictions of the whole test set.
    test_accuracy = accuracy_score(y_true, y_pred)
    test_precision = precision_score(y_true, y_pred, average='weighted')
    test_recall = recall_score(y_true, y_pred, average='weighted')
    test_f1 = f1_score(y_true, y_pred, average='weighted')
    test_cm = confusion_matrix(y_true, y_pred)
    test_metrics = {
        "accuracy": test_accuracy,
        "precision": test_precision,
        "recall": test_recall,
        "f1_score": test_f1,
        "confusion_matrix": test_cm
    }
    print(f"Accuracy: {test_accuracy:.4f}, Precision: {test_precision:.4f}, Recall: {test_recall:.4f}, F1 Score: {test_f1:.4f}")
    print(f"Confusion Matrix:\n{test_cm}")
    return test_metrics
def TransformerTrainAndTest(X,X_test, y, y_test, BATCH_SIZE, n_epochs, nls, model_path, val_split=0.2, patience=10):
    """Train with an internal validation split, then evaluate on the test set.

    A `val_split` fraction of (X, y) is held out for early stopping.
    Returns (train_metrics, test_metrics) as produced by TransformerTrain
    and TransformerTest respectively.
    """
    # Carve a validation subset out of the training data for early stopping.
    X_tr, X_val, y_tr, y_val = train_test_split(
        X, y, test_size=val_split, random_state=42)
    # Train; the best checkpoint (by validation loss) is saved to model_path.
    _, train_metrics = TransformerTrain(
        X_tr, X_val, y_tr, y_val, BATCH_SIZE, n_epochs, nls, model_path, patience)
    # Evaluate the saved checkpoint on the held-out test set.
    test_metrics = TransformerTest(X_test, y_test, BATCH_SIZE, nls, model_path)
    return train_metrics, test_metrics

View File

@ -0,0 +1,190 @@
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
import pandas as pd
# 设备配置
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 动态数据增强数据集
class SpectralDataset(Dataset):
def __init__(self, X, y, augment=False, input_length=462):
# 如果 X 是 DataFrame则转换为 numpy
if isinstance(X, pd.DataFrame):
X = X.values # 转换为 numpy 数组
if isinstance(y, pd.Series) or isinstance(y, pd.DataFrame):
y = y.values # 确保 y 也是 numpy 数组
# 确保 X 形状为 (N, L),然后扩展维度到 (N, 1, L)
assert len(X.shape) == 2, f"Expected X to be 2D, got {X.shape}"
self.X = torch.tensor(X[:, np.newaxis, :], dtype=torch.float32) # (N, 1, L)
self.y = torch.tensor(y, dtype=torch.long) # y 应该是一维的
self.augment = augment
self.input_length = input_length
def __getitem__(self, index):
x = self.X[index] # Shape: (1, L)
y = self.y[index]
if self.augment:
# 添加噪声
if torch.rand(1) < 0.7:
noise_level = torch.rand(1) * 0.05
x += noise_level * torch.randn_like(x)
# 光谱平移
if torch.rand(1) < 0.5:
shift = torch.randint(-5, 5, (1,)).item()
x = torch.roll(x, shifts=shift, dims=-1)
# 局部遮挡
if torch.rand(1) < 0.3:
start = torch.randint(0, self.input_length - 10, (1,)).item()
x[0, start:start + 10] = 0.0
return x, y
def __len__(self):
return len(self.X)
# 光谱注意力模块
class SpectralAttention(nn.Module):
    """Squeeze-and-excitation style channel attention for 1-D feature maps.

    Channel descriptors are obtained by global average pooling, passed
    through a bottleneck MLP (reduction-fold smaller), and used as
    per-channel gates in [0, 1].
    """

    def __init__(self, channel, reduction=8):
        super().__init__()
        self.avg_pool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, channel // reduction),
            nn.GELU(),
            nn.Linear(channel // reduction, channel),
            nn.Sigmoid()
        )

    def forward(self, x):
        batch, channels, _ = x.size()
        # Squeeze: (B, C, L) -> (B, C) channel descriptors.
        gates = self.avg_pool(x).view(batch, channels)
        # Excite: per-channel gate, broadcast back over the length axis.
        gates = self.fc(gates).view(batch, channels, 1)
        return x * gates.expand_as(x)
# CNN 模型
class AgroSpecCNN(nn.Module):
    """1-D CNN with spectral attention for spectra classification.

    Input: (B, 1, input_length) float tensor; output: (B, num_classes)
    logits. Adaptive pooling makes the classifier head independent of the
    exact input length.
    """

    def __init__(self, input_length=462, num_classes=21):
        super().__init__()
        self.input_length = input_length
        # Three conv stages; the first two are gated by channel attention.
        self.features = nn.Sequential(
            nn.Conv1d(1, 64, 5, padding=2),   # wide kernel for smooth spectra
            nn.BatchNorm1d(64),
            nn.GELU(),
            SpectralAttention(64),
            nn.MaxPool1d(2),
            nn.Conv1d(64, 128, 5, padding=2),
            nn.BatchNorm1d(128),
            nn.GELU(),
            SpectralAttention(128),
            nn.AdaptiveAvgPool1d(self.input_length // 2),  # length-agnostic resize
            nn.Conv1d(128, 256, 5, padding=2),
            nn.BatchNorm1d(256),
            nn.GELU(),
            nn.AdaptiveAvgPool1d(1)           # collapse to one value per channel
        )
        self.classifier = nn.Sequential(
            nn.Linear(256, 128),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        feats = self.features(x)   # (B, 256, 1)
        feats = feats.flatten(1)   # (B, 256)
        return self.classifier(feats)
# 训练过程
def CNNTrain(X_train, y_train, BATCH_SIZE, n_epochs, input_length, num_classes, model_path):
    """Train AgroSpecCNN on (X_train, y_train) and save the final weights.

    Trains for n_epochs with augmentation enabled, printing loss/accuracy
    per epoch; the final state_dict is written to model_path.

    Returns:
        dict with the last epoch's mean loss and accuracy.
    """
    dataset = SpectralDataset(X_train, y_train, augment=True, input_length=input_length)
    loader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
    model = AgroSpecCNN(input_length, num_classes).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)
    for epoch in range(n_epochs):
        model.train()
        running_loss = 0.0
        n_correct = 0
        n_seen = 0
        for batch_x, batch_y in loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            optimizer.zero_grad()
            logits = model(batch_x)
            loss = criterion(logits, batch_y)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            _, predicted = logits.max(1)
            n_seen += batch_y.size(0)
            n_correct += predicted.eq(batch_y).sum().item()
        print(f"Epoch {epoch+1}/{n_epochs} - Loss: {running_loss / len(loader):.4f}, Accuracy: {n_correct / n_seen:.4f}")
    torch.save(model.state_dict(), model_path)
    return {"train_loss": running_loss / len(loader), "train_accuracy": n_correct / n_seen}
# 测试过程
def CNNTest(X_test, y_test, BATCH_SIZE, input_length, num_classes, model_path):
    """Evaluate a saved AgroSpecCNN checkpoint on the test split.

    Args:
        X_test, y_test: test spectra and integer labels.
        BATCH_SIZE: evaluation batch size.
        input_length, num_classes: model construction parameters (must
            match the values used at training time).
        model_path: path of the state_dict written by CNNTrain.

    Returns:
        dict with mean loss, accuracy, weighted precision/recall/F1 and
        the confusion matrix.
    """
    test_set = SpectralDataset(X_test, y_test, augment=False, input_length=input_length)
    test_loader = DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False)
    model = AgroSpecCNN(input_length, num_classes).to(device)
    # BUG FIX: map_location lets a GPU-trained checkpoint load on a
    # CPU-only host (torch.load otherwise tries to restore CUDA tensors).
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    total_loss, correct, total = 0, 0, 0
    all_preds, all_targets = [], []
    criterion = nn.CrossEntropyLoss()
    with torch.no_grad():
        for x, y in test_loader:
            x, y = x.to(device), y.to(device)
            outputs = model(x)
            loss = criterion(outputs, y)
            total_loss += loss.item()
            _, predicted = outputs.max(1)
            total += y.size(0)
            correct += predicted.eq(y).sum().item()
            all_preds.extend(predicted.cpu().numpy())
            all_targets.extend(y.cpu().numpy())
    # Metrics over the pooled predictions of the whole test set.
    metrics = {
        "test_loss": total_loss / len(test_loader),
        "test_accuracy": correct / total,
        "precision": precision_score(all_targets, all_preds, average='weighted'),
        "recall": recall_score(all_targets, all_preds, average='weighted'),
        "f1": f1_score(all_targets, all_preds, average='weighted'),
        "confusion_matrix": confusion_matrix(all_targets, all_preds)
    }
    return metrics
# 统一的 CNN 训练与测试调用
def CNN_deepseek(X_train, X_test, y_train, y_test, BATCH_SIZE, n_epochs, input_length, num_classes, model_path):
    """Convenience wrapper: train AgroSpecCNN, then evaluate the saved model.

    Returns:
        (train_metrics, test_metrics) dicts from CNNTrain and CNNTest.
    """
    metrics_train = CNNTrain(X_train, y_train, BATCH_SIZE, n_epochs,
                             input_length, num_classes, model_path)
    metrics_test = CNNTest(X_test, y_test, BATCH_SIZE,
                           input_length, num_classes, model_path)
    return metrics_train, metrics_test

View File

@ -0,0 +1,309 @@
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.cuda.amp import GradScaler, autocast
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.preprocessing import StandardScaler
from torch.utils.tensorboard import SummaryWriter
# 设置设备和TensorBoard记录器
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
writer = SummaryWriter() # 初始化 TensorBoard
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
# ---------------------------
# 数据集及数据预处理函数
# ---------------------------
class MyDataset(Dataset):
    """Spectral dataset; optionally perturbs samples with Gaussian noise.

    When augment is True, each fetched spectrum gets fresh additive noise
    (sigma = 0.01); the stored tensors are never modified because the
    addition allocates a new tensor.
    """

    def __init__(self, specs, labels, augment=False):
        self.specs = specs
        self.labels = labels
        self.augment = augment

    def __getitem__(self, index):
        spec = self.specs[index]
        target = self.labels[index]
        if self.augment:
            # `spec + noise` returns a new tensor, so self.specs stays intact.
            spec = spec + 0.01 * torch.randn_like(spec)
        return spec, target

    def __len__(self):
        return len(self.specs)
def ZspProcess(X_train, X_test, y_train, y_test, need=True):
    """Optionally standardize the spectra, convert them to tensors of
    shape (N, 1, L), and wrap them in MyDataset objects.

    The scaler is fitted on the training split only and applied to the
    test split. Returns (train_dataset, test_dataset); only the training
    dataset has noise augmentation enabled.
    """
    if need:
        # Fit on train, transform test — avoids information leakage.
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)
    # Add the channel axis expected by the 1-D CNN: (N, L) -> (N, 1, L).
    X_train = torch.tensor(X_train[:, np.newaxis, :], dtype=torch.float32)
    X_test = torch.tensor(X_test[:, np.newaxis, :], dtype=torch.float32)
    y_train = torch.tensor(y_train, dtype=torch.long)
    y_test = torch.tensor(y_test, dtype=torch.long)
    return (MyDataset(X_train, y_train, augment=True),
            MyDataset(X_test, y_test, augment=False))
# ---------------------------
# 模型定义
# ---------------------------
class CNN3Layers(nn.Module):
    """Three-stage 1-D CNN classifier.

    Each conv stage is Conv1d -> BatchNorm -> ReLU -> pooling -> Dropout;
    the final stage ends in adaptive max pooling so the fully connected
    head always sees a fixed 256-dim vector regardless of input length.

    Args:
        nls: number of output classes.
        dropout_conv: dropout rate applied after each conv stage.
        dropout_fc: dropout rate inside the classifier head.
    """

    def __init__(self, nls, dropout_conv=0.3, dropout_fc=0.5):
        super(CNN3Layers, self).__init__()
        self.CONV1 = nn.Sequential(
            nn.Conv1d(1, 64, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),
            nn.Dropout(dropout_conv)
        )
        self.CONV2 = nn.Sequential(
            nn.Conv1d(64, 128, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),
            nn.Dropout(dropout_conv)
        )
        self.CONV3 = nn.Sequential(
            nn.Conv1d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.AdaptiveMaxPool1d(1),  # fixed-size output for the head
            nn.Dropout(dropout_conv)
        )
        self.fc = nn.Sequential(
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(dropout_fc),
            nn.Linear(128, nls)
        )

    def forward(self, x):
        # Run the three conv stages in order, then flatten for the head.
        for stage in (self.CONV1, self.CONV2, self.CONV3):
            x = stage(x)
        return self.fc(x.view(x.size(0), -1))
# ---------------------------
# 训练与测试函数
# ---------------------------
def CNNTrain(X_train, X_test, y_train, y_test, BATCH_SIZE, n_epochs, nls, model_path, dropout_conv, dropout_fc):
    """
    Train CNN3Layers with mixed precision for n_epochs, evaluating on the
    test split after every epoch and saving the checkpoint with the best
    test accuracy to model_path.

    Args:
        X_train, X_test: spectra arrays of shape (N, n_features).
        y_train, y_test: integer class labels.
        BATCH_SIZE: mini-batch size for both loaders.
        n_epochs: number of training epochs.
        nls: number of output classes.
        model_path: where the best state_dict is written.
        dropout_conv, dropout_fc: dropout rates passed to CNN3Layers.

    Returns:
        (train_metrics, test_metrics): train loss/accuracy of the LAST
        epoch, test metrics of the LAST epoch, and the confusion matrix
        from the best-accuracy epoch.
    """
    # Standardize (scaler fitted on the train split) and build loaders.
    data_train, data_test = ZspProcess(X_train, X_test, y_train, y_test, need=True)
    train_loader = DataLoader(data_train, batch_size=BATCH_SIZE, shuffle=True)
    test_loader = DataLoader(data_test, batch_size=BATCH_SIZE, shuffle=False)
    model = CNN3Layers(nls=nls, dropout_conv=dropout_conv, dropout_fc=dropout_fc).to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=0.001)
    # Halve the learning rate when the test loss plateaus for 5 epochs.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)
    criterion = nn.CrossEntropyLoss().to(device)
    scaler = GradScaler()  # loss scaling for mixed-precision training
    best_acc = 0.0
    # Predictions captured at the best-accuracy epoch (for the confusion matrix).
    final_y_true, final_y_pred = [], []
    for epoch in range(n_epochs):
        model.train()
        train_acc_list, train_loss_list = [], []
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            with autocast():  # forward pass in mixed precision
                outputs = model(inputs)
                loss = criterion(outputs, labels)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            _, predicted = torch.max(outputs.data, 1)
            acc = accuracy_score(labels.cpu(), predicted.cpu())
            train_acc_list.append(acc)
            train_loss_list.append(loss.item())
        avg_train_loss = np.mean(train_loss_list)
        avg_train_acc = np.mean(train_acc_list)
        writer.add_scalar('Loss/train', avg_train_loss, epoch)
        writer.add_scalar('Accuracy/train', avg_train_acc, epoch)
        # --- evaluation on the test split ---
        model.eval()
        test_acc_list, test_loss_list = [], []
        test_precision_list, test_recall_list, test_f1_list = [], [], []
        y_true, y_pred = [], []
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)
                with autocast():
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                _, predicted = torch.max(outputs.data, 1)
                # NOTE(review): these metrics are computed per batch and then
                # averaged, which can differ from metrics over the pooled
                # predictions (especially for the smaller final batch).
                acc = accuracy_score(labels.cpu(), predicted.cpu())
                prec = precision_score(labels.cpu(), predicted.cpu(), average='weighted', zero_division=1)
                rec = recall_score(labels.cpu(), predicted.cpu(), average='weighted', zero_division=1)
                f1 = f1_score(labels.cpu(), predicted.cpu(), average='weighted', zero_division=1)
                y_true.extend(labels.cpu().numpy())
                y_pred.extend(predicted.cpu().numpy())
                test_acc_list.append(acc)
                test_loss_list.append(loss.item())
                test_precision_list.append(prec)
                test_recall_list.append(rec)
                test_f1_list.append(f1)
        avg_test_loss = np.mean(test_loss_list)
        avg_test_acc = np.mean(test_acc_list)
        avg_test_precision = np.mean(test_precision_list)
        avg_test_recall = np.mean(test_recall_list)
        avg_test_f1 = np.mean(test_f1_list)
        writer.add_scalar('Loss/test', avg_test_loss, epoch)
        writer.add_scalar('Accuracy/test', avg_test_acc, epoch)
        writer.add_scalar('Precision/test', avg_test_precision, epoch)
        writer.add_scalar('Recall/test', avg_test_recall, epoch)
        writer.add_scalar('F1_Score/test', avg_test_f1, epoch)
        print(f"Epoch [{epoch + 1}/{n_epochs}]: Train Loss={avg_train_loss:.4f}, Train Acc={avg_train_acc:.4f} | "
              f"Test Loss={avg_test_loss:.4f}, Test Acc={avg_test_acc:.4f}, Precision={avg_test_precision:.4f}, "
              f"Recall={avg_test_recall:.4f}, F1={avg_test_f1:.4f}")
        # Save the checkpoint whenever the test accuracy improves.
        if avg_test_acc > best_acc:
            best_acc = avg_test_acc
            torch.save(model.state_dict(), model_path)
            final_y_true = y_true.copy()
            final_y_pred = y_pred.copy()
        scheduler.step(avg_test_loss)  # plateau scheduler keyed on test loss
    train_metrics = {
        "train_loss": avg_train_loss,
        "train_accuracy": avg_train_acc
    }
    test_metrics = {
        "test_loss": avg_test_loss,
        "test_accuracy": avg_test_acc,
        "precision": avg_test_precision,
        "recall": avg_test_recall,
        "f1_score": avg_test_f1,
        "confusion_matrix": confusion_matrix(final_y_true, final_y_pred)
    }
    return train_metrics, test_metrics
def CNNTest(X_test, y_test, BATCH_SIZE, nls, model_path, dropout_conv, dropout_fc):
    """Load a saved CNN3Layers checkpoint and report test-set metrics.

    Returns a dict with mean loss, accuracy, weighted
    precision/recall/F1 and the confusion matrix.

    NOTE(review): the scaler here is fitted on the test set itself, which
    does not match the train-split standardization used in CNNTrain —
    confirm whether the training scaler should be persisted and reused.
    """
    scaler = StandardScaler()
    X_test = scaler.fit_transform(X_test)
    # (N, L) -> (N, 1, L) tensor, labels to int64.
    X_test = torch.tensor(X_test[:, np.newaxis, :], dtype=torch.float32)
    y_test = torch.tensor(y_test, dtype=torch.long)
    loader = DataLoader(MyDataset(X_test, y_test, augment=False),
                        batch_size=BATCH_SIZE, shuffle=False)
    model = CNN3Layers(nls=nls, dropout_conv=dropout_conv, dropout_fc=dropout_fc).to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    truths, preds, losses = [], [], []
    criterion = nn.CrossEntropyLoss().to(device)
    with torch.no_grad():
        for batch_x, batch_y in loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)
            logits = model(batch_x)
            losses.append(criterion(logits, batch_y).item())
            _, batch_pred = torch.max(logits.data, 1)
            truths.extend(batch_y.cpu().numpy())
            preds.extend(batch_pred.cpu().numpy())
    # Metrics over the pooled predictions of the whole test set.
    return {
        "test_loss": np.mean(losses),
        "test_accuracy": accuracy_score(truths, preds),
        "precision": precision_score(truths, preds, average='weighted', zero_division=1),
        "recall": recall_score(truths, preds, average='weighted', zero_division=1),
        "f1_score": f1_score(truths, preds, average='weighted', zero_division=1),
        "confusion_matrix": confusion_matrix(truths, preds)
    }
# ---------------------------
# 自定义随机搜索超参数优化函数
# ---------------------------
def optimize_hyperparameters(X_train, X_test, y_train, y_test, nls, n_iter=10, BATCH_SIZE=32, n_epochs=10):
    """Random search over (dropout_conv, dropout_fc).

    Runs n_iter train/test cycles with dropout rates sampled uniformly
    from [0.2, 0.7) and [0.3, 0.8) and keeps the configuration with the
    highest test accuracy.

    Returns:
        (best_params, best_train_metrics, best_test_metrics)
    """
    best_acc = -1.0
    best_params = None
    best_train = None
    best_test = None
    model_path = "best_model.pth"  # overwritten on every iteration
    for it in range(n_iter):
        # Sample dropout rates uniformly from their search intervals.
        dropout_conv = np.random.uniform(0.2, 0.7)
        dropout_fc = np.random.uniform(0.3, 0.8)
        print(f"\nIteration {it + 1}/{n_iter}: Testing dropout_conv={dropout_conv:.4f}, dropout_fc={dropout_fc:.4f}")
        train_metrics, _ = CNNTrain(X_train, X_test, y_train, y_test, BATCH_SIZE, n_epochs, nls, model_path,
                                    dropout_conv, dropout_fc)
        # Re-evaluate by loading the checkpoint saved during training.
        test_metrics = CNNTest(X_test, y_test, BATCH_SIZE, nls, model_path, dropout_conv, dropout_fc)
        acc = test_metrics["test_accuracy"]
        print(f"Iteration {it + 1} result: Test Accuracy = {acc:.4f}")
        if acc > best_acc:
            best_acc = acc
            best_params = {"dropout_conv": dropout_conv, "dropout_fc": dropout_fc}
            best_train = train_metrics
            best_test = test_metrics
    return best_params, best_train, best_test

View File

@ -0,0 +1,327 @@
import numpy as np
import pandas as pd
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report, precision_score, recall_score, f1_score
import sklearn.svm as svm
from sklearn.cross_decomposition import PLSRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV, cross_val_score, train_test_split
import xgboost as xgb
import lightgbm as lgb
import catboost as cb
# import torch
# from torch import nn, optim
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import AdaBoostClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
# 固定随机种子
def set_random_seed(seed=42):
    """Seed NumPy's global RNG so experiments are repeatable."""
    np.random.seed(seed)

set_random_seed()
# 交叉验证(多核心支持)
def cross_validate_model(model, X, y, cv=5, n_jobs=-1):
    """Run k-fold cross-validation in parallel and print the mean accuracy.

    Returns:
        The per-fold score array from cross_val_score.
    """
    fold_scores = cross_val_score(model, X, y, cv=cv, n_jobs=n_jobs)
    print(f"Cross-validation accuracy: {fold_scores.mean():.4f} ± {fold_scores.std():.4f}")
    return fold_scores
# 混淆矩阵与分类报告
def evaluate_model(y_true, y_pred, dataset_name="Test"):
    """Print a classification report and return weighted metrics plus the
    confusion matrix for the given predictions."""
    print(f"{dataset_name} Classification Report:")
    print(classification_report(y_true, y_pred))
    return {
        "accuracy": accuracy_score(y_true, y_pred),
        "precision": precision_score(y_true, y_pred, average='weighted'),
        "recall": recall_score(y_true, y_pred, average='weighted'),
        "f1_score": f1_score(y_true, y_pred, average='weighted'),
        "confusion_matrix": confusion_matrix(y_true, y_pred),
    }
# 神经网络模型ANN
# 神经网络模型ANN
# 逻辑回归模型 (Logistic Regression)
def LogisticRegressionModel(X_train, X_test, y_train, y_test, penalty='l2', C=1.0, solver='lbfgs', max_iter=200):
    """Multinomial logistic regression with cross-validation and
    train/test evaluation.

    Args:
        penalty: regularization type ('l1', 'l2', 'elasticnet', 'none').
        C: inverse regularization strength (smaller = stronger).
        solver: optimizer ('lbfgs', 'liblinear', 'saga', ...).
        max_iter: maximum training iterations.

    Returns:
        (train_metrics, test_metrics) dicts from evaluate_model.
    """
    clf = LogisticRegression(penalty=penalty, C=C, solver=solver, max_iter=max_iter,
                             multi_class='multinomial', random_state=1)
    cross_validate_model(clf, X_train, y_train)
    clf.fit(X_train, y_train.ravel())
    train_metrics = evaluate_model(y_train, clf.predict(X_train), dataset_name="Train")
    test_metrics = evaluate_model(y_test, clf.predict(X_test), dataset_name="Test")
    return train_metrics, test_metrics
# SVM 模型
def SVM(X_train, X_test, y_train, y_test, kernel='linear', C=1, gamma=1e-3):
    """Support vector classifier with cross-validation and train/test
    evaluation.

    Returns:
        (train_metrics, test_metrics) dicts from evaluate_model.
    """
    classifier = svm.SVC(C=C, kernel=kernel, gamma=gamma)
    cross_validate_model(classifier, X_train, y_train)
    classifier.fit(X_train, y_train.ravel())
    train_metrics = evaluate_model(y_train, classifier.predict(X_train), dataset_name="Train")
    test_metrics = evaluate_model(y_test, classifier.predict(X_test), dataset_name="Test")
    return train_metrics, test_metrics
# PLS-DA 模型
def PLS_DA(X_train, X_test, y_train, y_test, n_components=40):
    """PLS-DA: PLS regression on one-hot labels, classifying by argmax.

    Returns:
        (train_metrics, test_metrics) dicts from evaluate_model.
    """
    y_train_onehot = pd.get_dummies(y_train)  # one-hot encode for regression
    pls = PLSRegression(n_components=n_components)
    pls.fit(X_train, y_train_onehot)
    # Predicted scores -> class index via argmax over the one-hot columns.
    train_pred = np.argmax(pls.predict(X_train), axis=1)
    train_metrics = evaluate_model(np.argmax(y_train_onehot.values, axis=1), train_pred, dataset_name="Train")
    test_pred = np.argmax(pls.predict(X_test), axis=1)
    test_metrics = evaluate_model(y_test, test_pred, dataset_name="Test")
    return train_metrics, test_metrics
# 随机森林模型RF
def RF(X_train, X_test, y_train, y_test, n_estimators=200, max_depth=15, n_jobs=-1):
    """Random forest classifier with cross-validation and train/test
    evaluation.

    Returns:
        (train_metrics, test_metrics) dicts from evaluate_model.
    """
    forest = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth,
                                    random_state=1, n_jobs=n_jobs)
    cross_validate_model(forest, X_train, y_train, n_jobs=n_jobs)
    forest.fit(X_train, y_train.ravel())
    train_metrics = evaluate_model(y_train, forest.predict(X_train), dataset_name="Train")
    test_metrics = evaluate_model(y_test, forest.predict(X_test), dataset_name="Test")
    return train_metrics, test_metrics
# XGBoost 模型
# 网格搜索超参数优化
# 神经网络模型ANN使用 PyTorch 实现
# def ANN(X_train, X_test, y_train, y_test, hidden_layer_sizes=(50, 30), max_iter=500):
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 检测 GPU
# X_train = torch.tensor(X_train, device=device, dtype=torch.float32)
# X_test = torch.tensor(X_test, device=device, dtype=torch.float32)
# y_train = torch.tensor(y_train, device=device, dtype=torch.long)
# y_test = torch.tensor(y_test, device=device, dtype=torch.long)
#
# # 定义简单的神经网络
# class SimpleNN(nn.Module):
# def __init__(self, input_size, hidden_sizes, output_size):
# super(SimpleNN, self).__init__()
# self.fc1 = nn.Linear(input_size, hidden_sizes[0])
# self.fc2 = nn.Linear(hidden_sizes[0], hidden_sizes[1])
# self.fc3 = nn.Linear(hidden_sizes[1], output_size)
#
# def forward(self, x):
# x = torch.relu(self.fc1(x))
# x = torch.relu(self.fc2(x))
# x = self.fc3(x)
# return x
#
# model = SimpleNN(X_train.shape[1], hidden_layer_sizes, len(torch.unique(y_train))).to(device)
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(model.parameters(), lr=0.001)
#
# # 训练模型
# for epoch in range(max_iter):
# optimizer.zero_grad()
# outputs = model(X_train)
# loss = criterion(outputs, y_train)
# loss.backward()
# optimizer.step()
#
# # 训练集评估
# with torch.no_grad():
# y_train_pred = torch.argmax(model(X_train), dim=1)
# train_metrics = evaluate_model(y_train.cpu(), y_train_pred.cpu(), dataset_name="Train")
#
# y_test_pred = torch.argmax(model(X_test), dim=1)
# test_metrics = evaluate_model(y_test.cpu(), y_test_pred.cpu(), dataset_name="Test")
#
# return train_metrics, test_metrics
# XGBoost 模型
def XGBoost(X_train, X_test, y_train, y_test, n_estimators=100, learning_rate=0.1, max_depth=3):
    """XGBoost classifier with train/test evaluation.

    NOTE(review): gpu_id is passed while tree_method='gpu_hist' is
    commented out; on recent xgboost versions gpu_id is deprecated —
    verify the installed version's GPU configuration.

    Returns:
        (train_metrics, test_metrics) dicts from evaluate_model.
    """
    clf = xgb.XGBClassifier(
        n_estimators=n_estimators,
        learning_rate=learning_rate,
        max_depth=max_depth,
        random_state=1,
        gpu_id=0
    )
    clf.fit(X_train, y_train)
    train_metrics = evaluate_model(y_train, clf.predict(X_train), dataset_name="Train")
    test_metrics = evaluate_model(y_test, clf.predict(X_test), dataset_name="Test")
    return train_metrics, test_metrics
# LightGBM 模型
def LightGBM(X_train, X_test, y_train, y_test, n_estimators=100, learning_rate=0.1, max_depth=-1, num_leaves=31):
    """LightGBM classifier with train/test evaluation.

    Returns:
        (train_metrics, test_metrics) dicts from evaluate_model.
    """
    clf = lgb.LGBMClassifier(
        n_estimators=n_estimators,
        learning_rate=learning_rate,
        max_depth=max_depth,
        num_leaves=num_leaves,
        random_state=1,
    )
    clf.fit(X_train, y_train)
    train_metrics = evaluate_model(y_train, clf.predict(X_train), dataset_name="Train")
    test_metrics = evaluate_model(y_test, clf.predict(X_test), dataset_name="Test")
    return train_metrics, test_metrics
# CatBoost 模型
def CatBoost(X_train, X_test, y_train, y_test, iterations=500, learning_rate=0.1, depth=6):
    """CatBoost classifier (silent mode) with train/test evaluation.

    Returns:
        (train_metrics, test_metrics) dicts from evaluate_model.
    """
    clf = cb.CatBoostClassifier(
        iterations=iterations,
        learning_rate=learning_rate,
        depth=depth,
        random_seed=1,
        verbose=0
    )
    clf.fit(X_train, y_train)
    train_metrics = evaluate_model(y_train, clf.predict(X_train), dataset_name="Train")
    test_metrics = evaluate_model(y_test, clf.predict(X_test), dataset_name="Test")
    return train_metrics, test_metrics
# AdaBoost 模型
def AdaBoost(X_train, X_test, y_train, y_test, n_estimators=50, learning_rate=1.0):
    """AdaBoost multi-class classifier using a depth-1 decision tree
    (a "decision stump") as the base learner.

    Args:
        n_estimators: number of boosting rounds.
        learning_rate: shrinkage applied to each learner's contribution.

    Returns:
        (train_metrics, test_metrics) dicts from evaluate_model.
    """
    stump = DecisionTreeClassifier(max_depth=1)
    # BUG FIX: scikit-learn renamed `base_estimator` to `estimator` in 1.2
    # and removed the old keyword in 1.4; try the new name first so this
    # works on both old and new versions.
    try:
        model = AdaBoostClassifier(
            estimator=stump,
            n_estimators=n_estimators,
            learning_rate=learning_rate,
            random_state=1
        )
    except TypeError:
        model = AdaBoostClassifier(
            base_estimator=stump,
            n_estimators=n_estimators,
            learning_rate=learning_rate,
            random_state=1
        )
    model.fit(X_train, y_train)
    train_metrics = evaluate_model(y_train, model.predict(X_train), dataset_name="Train")
    test_metrics = evaluate_model(y_test, model.predict(X_test), dataset_name="Test")
    return train_metrics, test_metrics
def KNN(X_train, X_test, y_train, y_test, n_neighbors=5, weights='uniform', algorithm='auto'):
    """k-nearest-neighbors classifier with cross-validation and
    train/test evaluation.

    Args:
        n_neighbors: number of neighbors.
        weights: 'uniform' or 'distance' neighbor weighting.
        algorithm: neighbor-search algorithm ('auto', 'ball_tree',
            'kd_tree', 'brute').

    Returns:
        (train_metrics, test_metrics) dicts from evaluate_model.
    """
    knn = KNeighborsClassifier(n_neighbors=n_neighbors, weights=weights, algorithm=algorithm)
    cross_validate_model(knn, X_train, y_train)
    knn.fit(X_train, y_train)
    train_metrics = evaluate_model(y_train, knn.predict(X_train), dataset_name="Train")
    test_metrics = evaluate_model(y_test, knn.predict(X_test), dataset_name="Test")
    return train_metrics, test_metrics

View File

@ -0,0 +1,235 @@
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report, precision_score, recall_score, f1_score
import sklearn.svm as svm
from sklearn.cross_decomposition import PLSRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV, cross_val_score, train_test_split
from skopt import BayesSearchCV
from skopt.space import Real, Integer
from xgboost import XGBClassifier
import lightgbm as lgb
import catboost as cb
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold
from sklearn.neural_network import MLPClassifier
# 固定随机种子
def set_random_seed(seed=42):
    """Fix NumPy's global RNG seed for reproducible runs."""
    np.random.seed(seed)

set_random_seed()
# 交叉验证(多核心支持)
def cross_validate_model(model, X, y, cv=5, n_jobs=-1):
    """Parallel k-fold cross-validation; prints mean ± std accuracy and
    returns the per-fold score array."""
    results = cross_val_score(model, X, y, cv=cv, n_jobs=n_jobs)
    print(f"Cross-validation accuracy: {results.mean():.4f} ± {results.std():.4f}")
    return results
# 混淆矩阵与分类报告
def evaluate_model(y_true, y_pred, dataset_name="Test"):
    """Print the classification report and return weighted
    accuracy/precision/recall/F1 plus the confusion matrix."""
    print(f"{dataset_name} Classification Report:")
    print(classification_report(y_true, y_pred))
    return {
        "accuracy": accuracy_score(y_true, y_pred),
        "precision": precision_score(y_true, y_pred, average='weighted'),
        "recall": recall_score(y_true, y_pred, average='weighted'),
        "f1_score": f1_score(y_true, y_pred, average='weighted'),
        "confusion_matrix": confusion_matrix(y_true, y_pred),
    }
# 逻辑回归模型 (Logistic Regression)
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
import xgboost as xgb
import lightgbm as lgb
import catboost as cb
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
# 1. SVM 贝叶斯优化
def optimize_SVM(X_train, y_train, X_test, y_test):
    """Bayesian hyperparameter search for an SVM classifier.

    Optimizes C, kernel and gamma with BayesSearchCV (weighted F1,
    5-fold CV), then evaluates the best estimator on both splits.

    Returns:
        (best_params, train_metrics, test_metrics)
    """
    search_space = {
        'C': (0.01, 10.0, 'uniform'),
        'kernel': ['linear', 'poly', 'rbf', 'sigmoid'],
        'gamma': (1e-4, 1e-1, 'log-uniform')
    }
    search = BayesSearchCV(SVC(), search_space, n_iter=50, cv=5, n_jobs=-1,
                           verbose=0, scoring='f1_weighted')
    search.fit(X_train, y_train)
    best = search.best_estimator_
    train_metrics = evaluate_model(y_train, best.predict(X_train), dataset_name="Train")
    test_metrics = evaluate_model(y_test, best.predict(X_test), dataset_name="Test")
    return search.best_params_, train_metrics, test_metrics
# 2. KNN 贝叶斯优化
def optimize_KNN(X_train, y_train, X_test, y_test):
    """Bayesian hyperparameter search for KNN (weighted F1, 5-fold CV).

    Returns:
        (best_params, train_metrics, test_metrics)
    """
    search_space = {
        'n_neighbors': (1, 20),
        'weights': ['uniform', 'distance'],
        'algorithm': ['auto', 'ball_tree', 'kd_tree', 'brute']
    }
    search = BayesSearchCV(KNeighborsClassifier(), search_space, n_iter=50, cv=5,
                           n_jobs=-1, verbose=0, scoring='f1_weighted')
    search.fit(X_train, y_train)
    best = search.best_estimator_
    train_metrics = evaluate_model(y_train, best.predict(X_train), dataset_name="Train")
    test_metrics = evaluate_model(y_test, best.predict(X_test), dataset_name="Test")
    return search.best_params_, train_metrics, test_metrics
# 3. XGBoost 贝叶斯优化
def optimize_XGBoost(X_train, y_train, X_test, y_test):
    """Bayesian hyperparameter search for a GPU-enabled XGBoost classifier.

    NOTE(review): tree_method='gpu_hist' / gpu_id are deprecated in
    xgboost >= 2.0 (replaced by device='cuda') — confirm the installed
    version.

    Returns:
        (best_params, train_metrics, test_metrics)
    """
    search_space = {
        'n_estimators': Integer(50, 500),
        'max_depth': Integer(3, 10),
        'learning_rate': Real(1e-4, 1.0, prior='log-uniform'),
        'subsample': Real(0.1, 1.0),
        'colsample_bytree': Real(0.1, 1.0)
    }
    search = BayesSearchCV(XGBClassifier(tree_method='gpu_hist', gpu_id=0), search_space,
                           n_iter=50, cv=5, n_jobs=-1, verbose=0, scoring='f1_weighted')
    search.fit(X_train, y_train)
    best = search.best_estimator_
    train_metrics = evaluate_model(y_train, best.predict(X_train), dataset_name="Train")
    test_metrics = evaluate_model(y_test, best.predict(X_test), dataset_name="Test")
    return search.best_params_, train_metrics, test_metrics
# 4. Random Forest 贝叶斯优化
def optimize_RF(X_train, y_train, X_test, y_test):
    """Bayesian hyperparameter search for a random forest classifier.

    BUG FIX: max_features='auto' was removed for classifiers in
    scikit-learn 1.3 (it was an alias for 'sqrt'), so sampling it made
    every such fit fail; the option is dropped without changing the
    effective search space.

    Returns:
        (best_params, train_metrics, test_metrics)
    """
    param_space = {
        'n_estimators': (50, 500),
        'max_depth': (3, 15),
        'min_samples_split': (2, 20),
        'min_samples_leaf': (1, 20),
        'max_features': ['sqrt', 'log2']
    }
    model = RandomForestClassifier(random_state=42)
    optimizer = BayesSearchCV(model, param_space, n_iter=50, cv=5, n_jobs=-1,
                              verbose=0, scoring='f1_weighted')
    optimizer.fit(X_train, y_train)
    best_params = optimizer.best_params_
    # Evaluate the refitted best estimator on both splits.
    best_model = optimizer.best_estimator_
    train_metrics = evaluate_model(y_train, best_model.predict(X_train), dataset_name="Train")
    test_metrics = evaluate_model(y_test, best_model.predict(X_test), dataset_name="Test")
    return best_params, train_metrics, test_metrics
# 5. CatBoost 贝叶斯优化
def optimize_CatBoost(X_train, y_train, X_test, y_test):
    """Bayesian hyperparameter search for a GPU CatBoost classifier
    (weighted F1, 5-fold CV).

    Returns:
        (best_params, train_metrics, test_metrics)
    """
    search_space = {
        'iterations': (50, 500),
        'learning_rate': (0.01, 0.3, 'uniform'),
        'depth': (3, 10),
        'l2_leaf_reg': (1, 10, 'uniform'),
        'bagging_temperature': (0, 1, 'uniform')
    }
    base = cb.CatBoostClassifier(task_type='GPU', random_seed=42, verbose=0)
    search = BayesSearchCV(base, search_space, n_iter=50, cv=5, n_jobs=-1,
                           verbose=0, scoring='f1_weighted')
    search.fit(X_train, y_train)
    best = search.best_estimator_
    train_metrics = evaluate_model(y_train, best.predict(X_train), dataset_name="Train")
    test_metrics = evaluate_model(y_test, best.predict(X_test), dataset_name="Test")
    return search.best_params_, train_metrics, test_metrics
# 6. Logistic Regression 贝叶斯优化
def optimize_LogisticRegression(X_train, y_train, X_test, y_test):
    """Bayesian hyperparameter search for multinomial logistic regression.

    BUG FIX: the original single search space combined penalty='l1' with
    solvers that do not support it ('lbfgs') and included 'liblinear',
    which rejects multi_class='multinomial'; sampling such combinations
    crashed the search. The space is split into valid (penalty, solver)
    pairs instead.

    Returns:
        (best_params, train_metrics, test_metrics)
    """
    c_range = (1e-5, 1e5, 'log-uniform')
    # Each sub-space only contains mutually compatible options.
    param_space = [
        {'C': c_range, 'penalty': ['l2'], 'solver': ['lbfgs', 'saga']},
        {'C': c_range, 'penalty': ['l1'], 'solver': ['saga']},
    ]
    model = LogisticRegression(multi_class='multinomial', random_state=42)
    optimizer = BayesSearchCV(model, param_space, n_iter=50, cv=5, n_jobs=-1,
                              verbose=0, scoring='f1_weighted')
    optimizer.fit(X_train, y_train)
    best_params = optimizer.best_params_
    # Evaluate the refitted best estimator on both splits.
    best_model = optimizer.best_estimator_
    y_train_pred = best_model.predict(X_train)
    y_test_pred = best_model.predict(X_test)
    train_metrics = evaluate_model(y_train, y_train_pred, dataset_name="Train")
    test_metrics = evaluate_model(y_test, y_test_pred, dataset_name="Test")
    return best_params, train_metrics, test_metrics
# 7. Neural Network (ANN) 贝叶斯优化
def optimize_ANN(X_train, y_train, X_test, y_test):
    """Bayesian search over MLP hyperparameters.

    Returns (best_params, train_metrics, test_metrics), where the metric
    dicts come from ``evaluate_model`` with weighted averaging.
    """
    search_space = {
        'hidden_layer_sizes': [(10,), (50,), (100,), (10, 10), (50, 50)],
        'activation': ['relu', 'tanh', 'logistic'],
        'solver': ['adam', 'sgd'],
        'alpha': (1e-5, 1e-1, 'log-uniform'),
        'learning_rate': ['constant', 'invscaling', 'adaptive'],
    }
    # Weighted F1 is the selection criterion, matching the other optimizers.
    search = BayesSearchCV(
        MLPClassifier(max_iter=500, random_state=42),
        search_space,
        n_iter=50, cv=5, n_jobs=-1, verbose=0, scoring='f1_weighted',
    )
    search.fit(X_train, y_train)
    best_params = search.best_params_
    # Score the refitted best estimator on both splits.
    tuned = search.best_estimator_
    train_metrics = evaluate_model(y_train, tuned.predict(X_train), dataset_name="Train")
    test_metrics = evaluate_model(y_test, tuned.predict(X_test), dataset_name="Test")
    return best_params, train_metrics, test_metrics

View File

@ -0,0 +1,306 @@
import numpy as np
from sklearn.metrics import f1_score, classification_report
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, train_test_split, StratifiedKFold
from scipy.stats import loguniform, randint
from xgboost import XGBClassifier
import lightgbm as lgb
import catboost as cb
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import gc
import os
# 固定随机种子
def set_random_seed(seed=42):
    """Pin the process-wide randomness sources so runs are repeatable."""
    # Hash seed first (affects only freshly spawned interpreters), then NumPy.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)


set_random_seed()
# 性能评估函数
def evaluate_model(y_true, y_pred, dataset_name="Test"):
    """Print a classification report and return weighted summary metrics.

    Returns a dict with accuracy, weighted precision/recall/F1 and the
    confusion matrix for the given predictions.
    """
    print(f"\n{dataset_name} Classification Report:")
    print(classification_report(y_true, y_pred))
    metrics = {
        "accuracy": accuracy_score(y_true, y_pred),
        "precision": precision_score(y_true, y_pred, average='weighted'),
        "recall": recall_score(y_true, y_pred, average='weighted'),
        "f1_score": f1_score(y_true, y_pred, average='weighted'),
        "confusion_matrix": confusion_matrix(y_true, y_pred),
    }
    return metrics
# 优化XGBoost
def optimize_XGBoost(X_train, y_train, X_test, y_test):
    """Randomized search over GPU XGBoost hyperparameters.

    Returns (best_params, train_metrics, test_metrics); metric dicts come
    from ``evaluate_model`` with weighted averaging.
    """
    search_space = {
        'max_depth': randint(3, 10),                # maximum tree depth
        'learning_rate': loguniform(1e-3, 0.2),     # shrinkage per boosting round
        'subsample': [0.6, 0.8, 1.0],               # row sampling per round
        'colsample_bytree': [0.6, 0.8, 1.0],        # feature sampling per tree
        'n_estimators': randint(100, 300),          # number of trees
        'min_child_weight': randint(1, 10),         # min summed instance weight in a leaf
        'gamma': [0, 0.1, 0.2],                     # min loss reduction required to split
    }
    estimator = XGBClassifier(
        tree_method='gpu_hist',
        gpu_id=0,
        use_label_encoder=False,
        eval_metric='mlogloss',
        objective='multi:softmax',
        num_class=len(np.unique(y_train)),
    )
    search = RandomizedSearchCV(
        estimator,
        param_distributions=search_space,
        n_iter=30,
        cv=StratifiedKFold(n_splits=3),
        scoring='f1_weighted',
        n_jobs=-1,
        verbose=1,
    )
    search.fit(X_train, y_train)
    best_params = search.best_params_
    print(f"Best XGBoost Hyperparameters: {best_params}")
    # Score the refitted best estimator on both splits.
    tuned = search.best_estimator_
    train_metrics = evaluate_model(y_train, tuned.predict(X_train), "Train")
    test_metrics = evaluate_model(y_test, tuned.predict(X_test), "Test")
    gc.collect()
    return best_params, train_metrics, test_metrics
# 优化LightGBM
def optimize_LightGBM(X_train, y_train, X_test, y_test):
    """Randomized search over GPU LightGBM hyperparameters.

    Returns (best_params, train_metrics, test_metrics); metric dicts come
    from ``evaluate_model`` with weighted averaging.
    """
    search_space = {
        'num_leaves': randint(20, 50),              # tree complexity
        'learning_rate': loguniform(1e-3, 0.2),     # shrinkage per boosting round
        'subsample': [0.6, 0.8, 1.0],               # row sampling per iteration
        'colsample_bytree': [0.6, 0.8, 1.0],        # feature sampling per tree
        'n_estimators': randint(100, 300),          # number of trees
        'min_child_samples': randint(10, 100),      # min samples per leaf
        'max_depth': [None, 3, 5, 7],               # depth cap (None = unlimited)
    }
    estimator = lgb.LGBMClassifier(
        device_type='gpu',
        objective='multiclass',
        num_class=len(np.unique(y_train)),
    )
    search = RandomizedSearchCV(
        estimator,
        param_distributions=search_space,
        n_iter=30,
        cv=StratifiedKFold(n_splits=3),
        scoring='f1_weighted',
        n_jobs=-1,
        verbose=1,
    )
    search.fit(X_train, y_train)
    best_params = search.best_params_
    print(f"Best LightGBM Hyperparameters: {best_params}")
    # Score the refitted best estimator on both splits.
    tuned = search.best_estimator_
    train_metrics = evaluate_model(y_train, tuned.predict(X_train), "Train")
    test_metrics = evaluate_model(y_test, tuned.predict(X_test), "Test")
    gc.collect()
    return best_params, train_metrics, test_metrics
# 优化CatBoost
def optimize_CatBoost(X_train, y_train, X_test, y_test):
    """Randomized search over GPU CatBoost hyperparameters.

    Returns (best_params, train_metrics, test_metrics); metric dicts come
    from ``evaluate_model`` with weighted averaging.
    """
    search_space = {
        'depth': randint(4, 8),                     # tree depth
        'learning_rate': loguniform(1e-3, 0.2),     # shrinkage per boosting round
        'l2_leaf_reg': randint(1, 10),              # L2 regularisation strength
        'iterations': randint(100, 300),            # number of trees
        'border_count': [32, 64, 128],              # number of split candidates
    }
    estimator = cb.CatBoostClassifier(
        task_type='GPU',
        verbose=0,
        loss_function='MultiClass',
    )
    search = RandomizedSearchCV(
        estimator,
        param_distributions=search_space,
        n_iter=30,
        cv=StratifiedKFold(n_splits=3),
        scoring='f1_weighted',
        n_jobs=-1,
        verbose=1,
    )
    search.fit(X_train, y_train)
    best_params = search.best_params_
    print(f"Best CatBoost Hyperparameters: {best_params}")
    # Score the refitted best estimator on both splits.
    tuned = search.best_estimator_
    train_metrics = evaluate_model(y_train, tuned.predict(X_train), "Train")
    test_metrics = evaluate_model(y_test, tuned.predict(X_test), "Test")
    gc.collect()
    return best_params, train_metrics, test_metrics
# 优化SVM
def optimize_SVM(X_train, y_train, X_test, y_test):
    """Randomized search over SVC hyperparameters.

    Returns (best_params, train_metrics, test_metrics); metric dicts come
    from ``evaluate_model`` with weighted averaging.
    """
    search_space = {
        'C': loguniform(1e-2, 10),                  # regularisation strength
        'kernel': ['linear', 'rbf'],                # kernel choice
        'gamma': loguniform(1e-4, 1e-1),            # kernel coefficient (ignored by 'linear')
    }
    search = RandomizedSearchCV(
        SVC(probability=True),
        param_distributions=search_space,
        n_iter=30,
        cv=StratifiedKFold(n_splits=3),
        scoring='f1_weighted',
        n_jobs=-1,
        verbose=1,
    )
    search.fit(X_train, y_train)
    best_params = search.best_params_
    print(f"Best SVM Hyperparameters: {best_params}")
    # Score the refitted best estimator on both splits.
    tuned = search.best_estimator_
    train_metrics = evaluate_model(y_train, tuned.predict(X_train), "Train")
    test_metrics = evaluate_model(y_test, tuned.predict(X_test), "Test")
    return best_params, train_metrics, test_metrics
# 优化KNN
def optimize_KNN(X_train, y_train, X_test, y_test):
    """Exhaustive grid search over KNN hyperparameters.

    Returns (best_params, train_metrics, test_metrics); metric dicts come
    from ``evaluate_model`` with weighted averaging.
    """
    grid = {
        'n_neighbors': list(range(3, 20, 2)),       # neighbourhood size (odd values)
        'weights': ['uniform', 'distance'],         # vote weighting scheme
        'p': [1, 2],                                # Minkowski power: 1=Manhattan, 2=Euclidean
    }
    search = GridSearchCV(
        KNeighborsClassifier(algorithm='brute'),
        param_grid=grid,
        cv=StratifiedKFold(n_splits=3),
        scoring='f1_weighted',
        n_jobs=-1,
        verbose=1,
    )
    search.fit(X_train, y_train)
    best_params = search.best_params_
    print(f"Best KNN Hyperparameters: {best_params}")
    # Score the refitted best estimator on both splits.
    tuned = search.best_estimator_
    train_metrics = evaluate_model(y_train, tuned.predict(X_train), "Train")
    test_metrics = evaluate_model(y_test, tuned.predict(X_test), "Test")
    return best_params, train_metrics, test_metrics
# 优化LogisticRegression
def optimize_LogisticRegression(X_train, y_train, X_test, y_test):
    """Randomized search over logistic-regression hyperparameters.

    Returns (best_params, train_metrics, test_metrics); metric dicts come
    from ``evaluate_model`` with weighted averaging.
    """
    search_space = {
        'C': loguniform(1e-4, 1e2),                 # inverse regularisation strength
        'penalty': ['l2', None],                    # regularisation type
        'solver': ['lbfgs', 'sag', 'saga'],         # optimisation algorithm
    }
    search = RandomizedSearchCV(
        LogisticRegression(max_iter=1000, random_state=42),
        param_distributions=search_space,
        n_iter=30,
        cv=StratifiedKFold(n_splits=3),
        scoring='f1_weighted',
        n_jobs=-1,
        verbose=1,
    )
    search.fit(X_train, y_train)
    best_params = search.best_params_
    print(f"Best Logistic Regression Hyperparameters: {best_params}")
    # Score the refitted best estimator on both splits.
    tuned = search.best_estimator_
    train_metrics = evaluate_model(y_train, tuned.predict(X_train), "Train")
    test_metrics = evaluate_model(y_test, tuned.predict(X_test), "Test")
    return best_params, train_metrics, test_metrics
# 优化RandomForest
def optimize_RF(X_train, y_train, X_test, y_test):
    """Randomized search over random-forest hyperparameters.

    Returns (best_params, train_metrics, test_metrics); metric dicts come
    from ``evaluate_model`` with weighted averaging.
    """
    search_space = {
        'n_estimators': randint(100, 300),          # number of trees
        'max_depth': [None, 3, 5, 7],               # depth cap (None = unlimited)
        'min_samples_split': randint(2, 10),        # min samples to split an internal node
        'min_samples_leaf': randint(1, 10),         # min samples per leaf
        'bootstrap': [True, False],                 # bootstrap sampling on/off
        'criterion': ['gini', 'entropy'],           # split quality measure
    }
    search = RandomizedSearchCV(
        RandomForestClassifier(random_state=42),
        param_distributions=search_space,
        n_iter=30,
        cv=StratifiedKFold(n_splits=3),
        scoring='f1_weighted',
        n_jobs=-1,
        verbose=1,
    )
    search.fit(X_train, y_train)
    best_params = search.best_params_
    print(f"Best Random Forest Hyperparameters: {best_params}")
    # Score the refitted best estimator on both splits.
    tuned = search.best_estimator_
    train_metrics = evaluate_model(y_train, tuned.predict(X_train), "Train")
    test_metrics = evaluate_model(y_test, tuned.predict(X_test), "Test")
    return best_params, train_metrics, test_metrics

View File

@ -0,0 +1,49 @@
from classification_model.Classification.ClassicCls import SVM, PLS_DA, RF, XGBoost, LightGBM, CatBoost,LogisticRegressionModel,AdaBoost,KNN
# from Classification.CNN import CNN
# from Classification.CNN_Transfomer import TransformerTrainAndTest
# from Classification.CNN_SAE import SAETrainAndTest
# from Classification.SAE import SAE
# from Classification.CNN_deepseek import CNN_deepseek
from multiprocessing import Pool, cpu_count
def QualitativeAnalysis(model, X_train, X_test, y_train, y_test, n_jobs=-1):
    """
    Dispatch to the named classification model and return its metrics.

    Parameters:
    - model: name of the classifier to run
    - X_train, X_test: feature matrices for the train / test split
    - y_train, y_test: label vectors for the train / test split
    - n_jobs: worker count for models that support parallelism

    Returns:
    - train_metrics: dict with train accuracy, precision, recall, f1_score
    - test_metrics: dict with test accuracy, precision, recall, f1_score
    - (None, None) when the model name is not recognised.
    """
    if model == "PLS_DA":
        train_metrics, test_metrics = PLS_DA(X_train, X_test, y_train, y_test)
    elif model == "SVM":
        train_metrics, test_metrics = SVM(X_train, X_test, y_train, y_test)
    elif model == "RF":
        train_metrics, test_metrics = RF(X_train, X_test, y_train, y_test, n_jobs=n_jobs)
    elif model == "LogisticRegression":
        train_metrics, test_metrics = LogisticRegressionModel(X_train, X_test, y_train, y_test, penalty='l2', C=1.0, solver='lbfgs')
    elif model == "XGBoost":
        train_metrics, test_metrics = XGBoost(X_train, X_test, y_train, y_test, n_estimators=100, learning_rate=0.1, max_depth=3)
    elif model == "LightGBM":
        train_metrics, test_metrics = LightGBM(X_train, X_test, y_train, y_test, n_estimators=100, learning_rate=0.1, max_depth=-1, num_leaves=31)
    elif model == "CatBoost":
        train_metrics, test_metrics = CatBoost(X_train, X_test, y_train, y_test, iterations=500, learning_rate=0.1, depth=6)
    elif model == "AdaBoost":
        train_metrics, test_metrics = AdaBoost(X_train, X_test, y_train, y_test, n_estimators=50, learning_rate=1.0)
    elif model == 'KNN':
        train_metrics, test_metrics = KNN(X_train, X_test, y_train, y_test, n_neighbors=5)
    else:
        # Fix: the old "ANN" branch called ANN(), whose import is commented
        # out above, raising NameError at call time. "ANN" and any other
        # unknown name now take this graceful path instead.
        print("No such model for Qualitative Analysis")
        return None, None
    return train_metrics, test_metrics

View File

@ -0,0 +1,47 @@
# from Classification.CNN_HYper import
from classification_model.Classification.CNN_Transfomer import TransformerTrainAndTest
from classification_model.Classification.CNN_SAE import SAETrainAndTest
from classification_model.Classification.SAE import SAE
from classification_model.Classification.CNN_deepseek import CNN_deepseek
from multiprocessing import Pool, cpu_count
# 贝叶斯优化模型调用
from classification_model.Classification.ClassicCls_网格搜索 import optimize_SVM, optimize_KNN, optimize_XGBoost, optimize_RF, optimize_CatBoost, optimize_LogisticRegression
def QualitativeAnalysis(model, X_train, X_test, y_train, y_test, n_jobs=-1):
    """
    Dispatch to the named hyperparameter-optimised classifier.

    Parameters:
    - model: name of the classifier to tune
    - X_train, X_test: feature matrices for the train / test split
    - y_train, y_test: label vectors for the train / test split
    - n_jobs: worker count for models that support parallelism

    Returns:
    - best_params, train_metrics, test_metrics; the metric dicts carry
      accuracy, precision, recall and f1_score.
    - (None, None, None) when the model name is not recognised.
    """
    if model == "SVM":
        best_params, train_metrics, test_metrics = optimize_SVM(X_train, y_train, X_test, y_test)
    elif model == "RF":
        best_params, train_metrics, test_metrics = optimize_RF(X_train, y_train, X_test, y_test)
    elif model == "LogisticRegression":
        best_params, train_metrics, test_metrics = optimize_LogisticRegression(X_train, y_train, X_test, y_test)
    elif model == "XGBoost":
        best_params, train_metrics, test_metrics = optimize_XGBoost(X_train, y_train, X_test, y_test)
    elif model == "CatBoost":
        best_params, train_metrics, test_metrics = optimize_CatBoost(X_train, y_train, X_test, y_test)
    elif model == 'KNN':
        best_params, train_metrics, test_metrics = optimize_KNN(X_train, y_train, X_test, y_test)
    else:
        print("No such model for Qualitative Analysis")
        # Fix: return a 3-tuple to match the success path; the previous
        # 2-tuple broke callers unpacking (best_params, train_metrics,
        # test_metrics).
        return None, None, None
    return best_params, train_metrics, test_metrics

View File

@ -0,0 +1,48 @@
from classification_model.Classification.CNN_HYper import optimize_CNN
from classification_model.Classification.CNN_Transfomer import TransformerTrainAndTest
from classification_model.Classification.CNN_SAE import SAETrainAndTest
from classification_model.Classification.SAE import SAE
from classification_model.Classification.CNN_deepseek import CNN_deepseek
from multiprocessing import Pool, cpu_count
# 贝叶斯优化模型调用
from classification_model.Classification.ClassicClsHY import optimize_SVM, optimize_KNN, optimize_XGBoost, optimize_RF, optimize_CatBoost, optimize_LogisticRegression, optimize_ANN
def QualitativeAnalysis(model, X_train, X_test, y_train, y_test, n_jobs=-1):
    """
    Dispatch to the named hyperparameter-optimised classifier.

    Parameters:
    - model: name of the classifier to tune
    - X_train, X_test: feature matrices for the train / test split
    - y_train, y_test: label vectors for the train / test split
    - n_jobs: worker count for models that support parallelism

    Returns:
    - best_params, train_metrics, test_metrics; the metric dicts carry
      accuracy, precision, recall and f1_score.
    - (None, None, None) when the model name is not recognised.
    """
    if model == "ANN":
        best_params, train_metrics, test_metrics = optimize_ANN(X_train, y_train, X_test, y_test)
    elif model == "SVM":
        best_params, train_metrics, test_metrics = optimize_SVM(X_train, y_train, X_test, y_test)
    elif model == "RF":
        best_params, train_metrics, test_metrics = optimize_RF(X_train, y_train, X_test, y_test)
    elif model == "optimize_CNN":
        best_params, train_metrics, test_metrics = optimize_CNN(X_train, X_test, y_train, y_test, model_path=r'H:\arithmetic\python\opensa-main(local)\opensa-main\OpenSA\tensorboard_logs\model_best.pth')
    elif model == "LogisticRegression":
        best_params, train_metrics, test_metrics = optimize_LogisticRegression(X_train, y_train, X_test, y_test)
    elif model == "XGBoost":
        best_params, train_metrics, test_metrics = optimize_XGBoost(X_train, y_train, X_test, y_test)
    elif model == "CatBoost":
        best_params, train_metrics, test_metrics = optimize_CatBoost(X_train, y_train, X_test, y_test)
    elif model == 'KNN':
        best_params, train_metrics, test_metrics = optimize_KNN(X_train, y_train, X_test, y_test)
    else:
        print("No such model for Qualitative Analysis")
        # Fix: return a 3-tuple to match the success path; the previous
        # 2-tuple broke callers unpacking (best_params, train_metrics,
        # test_metrics).
        return None, None, None
    return best_params, train_metrics, test_metrics

View File

@ -0,0 +1,11 @@
"""
-*- coding: utf-8 -*-
@Time :2022/04/12 17:10
@Author : Pengyou FU
@blogs : https://blog.csdn.net/Echo_Code?spm=1000.2115.3001.5343
@github : https://github.com/FuSiry/OpenSA
@WeChat : Fu_siry
@License : Apache-2.0 license
"""

View File

@ -0,0 +1,190 @@
import torch
from torch import nn
import torch.nn.functional as F
from torch.autograd import Variable
from torch import optim
import torch.utils.data as data
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
# Shared compute handle: prefer the first visible CUDA device, else CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class MyDataset(data.Dataset):
    """Minimal (spectrum, label) pair dataset for DataLoader consumption."""

    def __init__(self, specs, labels):
        self.specs = specs
        self.labels = labels

    def __getitem__(self, index):
        # Return the raw (spectrum, label) pair at this position.
        return self.specs[index], self.labels[index]

    def __len__(self):
        return len(self.specs)
class AutoEncoder(nn.Module):
    """One-layer autoencoder with a ReLU bottleneck.

    ``forward(x, rep=True)`` yields the hidden representation (used when
    stacking encoders); ``rep=False`` yields the reconstruction.
    """

    def __init__(self, inputDim, hiddenDim):
        super().__init__()
        self.inputDim = inputDim
        self.hiddenDim = hiddenDim
        self.encoder = nn.Linear(inputDim, hiddenDim, bias=True)
        self.decoder = nn.Linear(hiddenDim, inputDim, bias=True)
        self.act = F.relu

    def forward(self, x, rep=False):
        hidden = self.act(self.encoder(x))
        return hidden if rep else self.decoder(hidden)
class SAE(nn.Module):
    """Stacked-autoencoder classifier: two pretrained encoders + linear head."""

    def __init__(self, encoderList, output_dim):
        super().__init__()
        self.encoderList = encoderList
        self.en1 = encoderList[0]
        self.en2 = encoderList[1]
        # Fix: the head input size now follows the second encoder's
        # bottleneck instead of the previously hard-coded 128, so non-default
        # hidden sizes work; the default (hidden2_dim=128) is unchanged.
        self.fc = nn.Linear(self.en2.hiddenDim, output_dim, bias=True)

    def forward(self, x):
        out = self.en1(x, rep=True)
        out = self.en2(out, rep=True)
        return self.fc(out)
class SAE_net(object):
    """Two-stage stacked-autoencoder trainer with a supervised head.

    Workflow: greedily pretrain two ``AutoEncoder`` layers on the spectra,
    then fine-tune a stacked classifier on top and report train/test metrics
    (accuracy, weighted precision/recall/F1, confusion matrix).
    """

    def __init__(self, AE_epoch=200, SAE_epoch=200,
                 input_dim=404, hidden1_dim=512,
                 hidden2_dim=128, output_dim=4,  # default 4 classes; pass num_classes to override
                 batch_size=128):
        # AE_epoch: epochs per greedy autoencoder stage.
        # SAE_epoch: epochs for the supervised fine-tuning stage.
        self.AE_epoch = AE_epoch
        self.SAE_epoch = SAE_epoch
        self.input_dim = input_dim
        self.hidden1_dim = hidden1_dim
        self.hidden2_dim = hidden2_dim
        self.output_dim = output_dim
        self.batch_size = batch_size
        self.train_loader = None
        encoder1 = AutoEncoder(self.input_dim, self.hidden1_dim)
        encoder2 = AutoEncoder(self.hidden1_dim, self.hidden2_dim)
        self.encoder_list = [encoder1, encoder2]

    def trainAE(self, x_train, y_train, encoderList, trainLayer, batchSize, epoch, useCuda=False):
        """Greedily pretrain layer ``trainLayer`` as a denoising-free autoencoder."""
        dev = device if useCuda else torch.device("cpu")
        for encoder in encoderList:
            encoder.to(dev)
        optimizer = optim.Adam(encoderList[trainLayer].parameters())
        criterion = nn.MSELoss()
        data_train = MyDataset(x_train, y_train)
        self.train_loader = torch.utils.data.DataLoader(data_train, batch_size=batchSize, shuffle=True)
        for _ in range(epoch):
            for x, _target in self.train_loader:
                optimizer.zero_grad()
                # Fix: the old code moved x to the device and then re-typed it
                # back to a CPU FloatTensor, and compared a .cpu() prediction
                # against a possibly-device target, breaking useCuda=True.
                x = x.float().to(dev).view(x.size(0), -1)
                out = x
                # Frozen forward through the already-trained layers; only the
                # current layer's parameters are in the optimizer.
                with torch.no_grad():
                    for i in range(trainLayer):
                        out = encoderList[i](out, rep=True)
                pred = encoderList[trainLayer](out, rep=False)
                loss = criterion(pred, out)
                loss.backward()
                optimizer.step()

    def trainClassifier(self, model, epoch, useCuda=False):
        """Fine-tune the stacked classifier on the loader built by trainAE."""
        dev = device if useCuda else torch.device("cpu")
        model = model.to(dev)
        for param in model.parameters():
            param.requires_grad = True
        optimizer = optim.Adam(model.parameters())
        criterion = nn.CrossEntropyLoss()
        model.train()
        for _ in range(epoch):
            for x, target in self.train_loader:
                optimizer.zero_grad()
                x = x.float().to(dev).view(-1, self.input_dim)
                # CrossEntropyLoss requires long class indices.
                target = torch.as_tensor(target, dtype=torch.long, device=dev)
                out = model(x)
                loss = criterion(out, target)
                loss.backward()
                optimizer.step()
        self.model = model

    def fit(self, x_train=None, y_train=None, X_test=None, y_test=None):
        """Pretrain, fine-tune, and return (train_metrics, test_metrics) dicts."""
        x_np = x_train  # keep the raw numpy spectra for evaluation
        x_tensor = torch.from_numpy(x_train[:, np.newaxis, :]).float()
        for layer in range(2):
            self.trainAE(x_train=x_tensor, y_train=y_train,
                         encoderList=self.encoder_list, trainLayer=layer,
                         batchSize=self.batch_size, epoch=self.AE_epoch)

        # Fix: the module-level function ``SAE`` defined later in this file
        # rebinds the name away from the SAE class, so calling SAE(...) here
        # resolved to the wrong object; build the classifier locally instead.
        class _SAEClassifier(nn.Module):
            def __init__(self, encoders, output_dim):
                super().__init__()
                self.en1, self.en2 = encoders
                self.fc = nn.Linear(self.en2.hiddenDim, output_dim, bias=True)

            def forward(self, x):
                return self.fc(self.en2(self.en1(x, rep=True), rep=True))

        model = _SAEClassifier(self.encoder_list, self.output_dim)
        # Fix: the old call passed X_train/y_train keywords trainClassifier
        # does not accept (TypeError) and unpacked five metrics it never
        # returned; train-set metrics now come from evaluate() instead.
        self.trainClassifier(model=model, epoch=self.SAE_epoch)
        train_accuracy, train_precision, train_recall, train_f1, train_cm = self.evaluate(self.model, x_np, y_train)
        test_accuracy, test_precision, test_recall, test_f1, test_cm = self.evaluate(self.model, X_test, y_test)
        train_metrics = {
            "accuracy": train_accuracy,
            "precision": train_precision,
            "recall": train_recall,
            "f1_score": train_f1,
            "confusion_matrix": train_cm
        }
        test_metrics = {
            "accuracy": test_accuracy,
            "precision": test_precision,
            "recall": test_recall,
            "f1_score": test_f1,
            "confusion_matrix": test_cm
        }
        return train_metrics, test_metrics

    def evaluate(self, model, X_test, y_test):
        """Score ``model`` on numpy features/labels.

        Returns (accuracy, precision, recall, f1, confusion_matrix) with
        weighted averaging for the per-class scores.
        """
        inputs = torch.from_numpy(X_test).float().view(-1, self.input_dim)
        model.eval()
        with torch.no_grad():  # inference only: no autograd bookkeeping
            out = model(inputs)
        _, y_pred = torch.max(out, 1)
        y_pred = y_pred.cpu().numpy()
        accuracy = accuracy_score(y_test, y_pred)
        precision = precision_score(y_test, y_pred, average='weighted')
        recall = recall_score(y_test, y_pred, average='weighted')
        f1 = f1_score(y_test, y_pred, average='weighted')
        cm = confusion_matrix(y_test, y_pred)
        return accuracy, precision, recall, f1, cm
def SAE(X_train, y_train, X_test, y_test, num_classes=4):
    """Convenience wrapper: fit an SAE_net and return (train_metrics, test_metrics)."""
    # NOTE(review): defining this function rebinds the module-level name
    # ``SAE`` away from the SAE nn.Module class above, so any later lookup of
    # ``SAE`` (e.g. inside SAE_net.fit) resolves to this function and fails —
    # consider renaming one of the two.
    clf = SAE_net(output_dim=num_classes)
    train_metrics, test_metrics = clf.fit(X_train, y_train, X_test, y_test)
    return train_metrics, test_metrics