Files
2026-02-25 09:42:51 +08:00

92 lines
3.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
-*- coding: utf-8 -*-
@Time :2022/04/12 17:10
@Author : Pengyou FU
@blogs : https://blog.csdn.net/Echo_Code?spm=1000.2115.3001.5343
@github : https://github.com/FuSiry/OpenSA
@WeChat : Fu_siry
@License : Apache-2.0
"""
from sklearn.cross_decomposition import PLSRegression
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import ShuffleSplit, cross_val_score
from numpy.linalg import matrix_rank as rank
import numpy as np
class UVE:
    """Uninformative Variable Elimination (UVE) based on PLS regression.

    Ranks predictor columns by the stability of their PLS coefficients
    across repeated random train/test splits (mean / std of the coefficient
    over resamples), then keeps the prefix of that ranking which maximizes
    cross-validated R².
    """

    def __init__(self, x, y, ncomp=20, nrep=500, testSize=0.2):
        """Set up the UVE selector.

        Args:
            x: 2-D predictor matrix (samples x variables).
            y: target values.
            ncomp: maximum number of PLS latent variables (default 20);
                capped at the rank of ``x``.
            nrep: number of resampling repetitions (default 500).
            testSize: fraction of samples held out in each split (default 0.2).
        """
        self.x = x
        self.y = y
        # Never request more latent variables than the matrix rank supports.
        self.ncomp = min(ncomp, rank(x))
        self.nrep = nrep
        self.testSize = testSize
        self.criteria = None       # stability criterion (mean/std) per variable
        self.featureIndex = None   # variable indices sorted by |criterion|, descending
        self.featureR2 = np.full(self.x.shape[1], np.nan)  # CV R² per prefix size
        self.selFeature = None     # indices of the finally selected variables

    def calcCriteria(self):
        """Compute the stability criterion meanCoef / stdCoef for every variable."""
        n_vars = self.x.shape[1]
        coefs = np.zeros((self.nrep, n_vars))  # one row of PLS coefficients per resample
        splitter = ShuffleSplit(n_splits=self.nrep, test_size=self.testSize)
        for rep, (train_idx, _) in enumerate(splitter.split(self.x, self.y)):
            x_tr, y_tr = self.x[train_idx], self.y[train_idx]
            # Cap components at the rank of this particular training subset.
            model = PLSRegression(n_components=min(self.ncomp, rank(x_tr)))
            model.fit(x_tr, y_tr)
            coefs[rep, :] = model.coef_.flatten()
        mean_c = coefs.mean(axis=0)
        std_c = coefs.std(axis=0)
        # Zero-safe division: variables with zero coefficient spread get criterion 0.
        self.criteria = np.divide(mean_c, std_c, out=np.zeros_like(mean_c), where=std_c != 0)

    def evalCriteria(self, cv=3):
        """Score growing prefixes of the ranked variable list by cross-validated R²."""
        # Rank variables by descending absolute stability criterion.
        self.featureIndex = np.argsort(-np.abs(self.criteria))
        for count in range(self.x.shape[1]):
            subset = self.x[:, self.featureIndex[:count + 1]]
            # Plain OLS while the subset has fewer columns than ncomp, PLS afterwards.
            if count < self.ncomp:
                model = LinearRegression()
            else:
                model = PLSRegression(n_components=min(self.ncomp, rank(subset)))
            scores = cross_val_score(model, subset, self.y, cv=cv, scoring='r2')
            self.featureR2[count] = np.mean(scores)

    def cutFeature(self, *args):
        """Select the prefix with the best R² and return the chosen column indices.

        When extra arrays are passed, any array whose column count matches
        ``x`` is reduced to the selected columns and the (possibly filtered)
        list of arrays is returned instead of the raw indices.
        """
        # nanargmax skips prefixes whose R² was never evaluated (still NaN).
        best = np.nanargmax(self.featureR2)
        self.selFeature = self.featureIndex[:best + 1]
        if args:
            filtered = list(args)
            for pos, arr in enumerate(args):
                if arr.shape[1] == self.x.shape[1]:
                    filtered[pos] = arr[:, self.selFeature]
            return filtered
        return self.selFeature