Files

271 lines
9.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
import pandas as pd
from sklearn.cross_decomposition import PLSRegression
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import KFold
from itertools import combinations
import matplotlib.pyplot as plt
def synergy_interval_pls(X, y, n_intervals=20, n_combinations=2, max_components=15, cv_folds=5):
"""
协同区间偏最小二乘法 (Synergy Interval PLS, SiPLS) 进行特征选择
参数:
X: 光谱矩阵 (n_samples, n_wavelengths)
y: 浓度/属性向量 (n_samples,)
n_intervals: 将光谱分成多少个等宽区间
n_combinations: 每次选择的区间组合数量 (通常2-4)
max_components: PLS的最大主成分数
cv_folds: 交叉验证折数
返回:
best_intervals: 最优的区间组合
best_rmsecv: 最优组合的RMSECV
best_n_components: 最优的主成分数
selected_wavelengths: 选择的波长索引
"""
n_samples, n_wavelengths = X.shape
# 将光谱分成等宽的区间
interval_size = n_wavelengths // n_intervals
intervals = []
for i in range(n_intervals):
start_idx = i * interval_size
if i == n_intervals - 1:
# 最后一个区间包含剩余的所有波长
end_idx = n_wavelengths
else:
end_idx = (i + 1) * interval_size
intervals.append((start_idx, end_idx))
print(f"{n_wavelengths} 个波长分成 {n_intervals} 个区间:")
for i, (start, end) in enumerate(intervals):
print(f" 区间 {i+1}: 波长 {start}-{end-1} (宽度: {end-start})")
# 生成所有可能的区间组合
interval_combinations = list(combinations(range(n_intervals), n_combinations))
print(f"\n总共 {len(interval_combinations)}{n_combinations} 区间的组合")
best_rmsecv = float('inf')
best_intervals = None
best_n_components = None
results = []
# 对每个组合进行评估
for combo_idx, combo in enumerate(interval_combinations):
if (combo_idx + 1) % 50 == 0:
print(f"正在处理组合 {combo_idx + 1}/{len(interval_combinations)}")
# 合并选中区间的光谱数据
selected_wavelengths = []
for interval_idx in combo:
start_idx, end_idx = intervals[interval_idx]
selected_wavelengths.extend(range(start_idx, end_idx))
X_selected = X[:, selected_wavelengths]
# 对不同主成分数进行交叉验证
kf = KFold(n_splits=cv_folds, shuffle=True, random_state=42)
rmse_results = []
for n_comp in range(1, min(max_components + 1, X_selected.shape[1] + 1)):
rmse_scores = []
for train_idx, test_idx in kf.split(X_selected):
X_train, X_test = X_selected[train_idx], X_selected[test_idx]
y_train, y_test = y[train_idx], y[test_idx]
pls = PLSRegression(n_components=n_comp)
pls.fit(X_train, y_train)
y_pred = pls.predict(X_test)
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
rmse_scores.append(rmse)
mean_rmse = np.mean(rmse_scores)
rmse_results.append(mean_rmse)
# 找到该组合的最佳主成分数和RMSE
min_rmse_idx = np.argmin(rmse_results)
min_rmse = rmse_results[min_rmse_idx]
best_comp = min_rmse_idx + 1
results.append({
'intervals': combo,
'rmsecv': min_rmse,
'n_components': best_comp,
'wavelengths': selected_wavelengths
})
# 更新全局最优
if min_rmse < best_rmsecv:
best_rmsecv = min_rmse
best_intervals = combo
best_n_components = best_comp
print("最优结果:")
print(f" 区间组合: {best_intervals}")
print(f" RMSECV: {best_rmsecv:.6f}")
print(f" 主成分数: {best_n_components}")
print(f" 选择的波长数: {len(results[-1]['wavelengths'])}")
# 返回最优区间的波长索引
selected_wavelengths = []
for interval_idx in best_intervals:
start_idx, end_idx = intervals[interval_idx]
selected_wavelengths.extend(range(start_idx, end_idx))
return selected_wavelengths, best_rmsecv, best_n_components
def sipls_feature_selection(X, y, n_intervals_list=[10, 15, 20], n_combinations_list=[2, 3, 4],
max_components=15, cv_folds=5):
"""
高级SiPLS特征选择尝试不同的参数组合
参数:
X: 光谱矩阵 (n_samples, n_wavelengths)
y: 浓度/属性向量 (n_samples,)
n_intervals_list: 尝试的区间数量列表
n_combinations_list: 尝试的组合数量列表
max_components: PLS的最大主成分数
cv_folds: 交叉验证折数
返回:
best_result: 包含最优结果的字典
"""
best_overall_rmsecv = float('inf')
best_overall_result = None
print("=== SiPLS 特征选择 ===")
print(f"数据形状: {X.shape}")
print(f"尝试的参数组合: {len(n_intervals_list)} × {len(n_combinations_list)} = {len(n_intervals_list) * len(n_combinations_list)}")
for n_intervals in n_intervals_list:
for n_combinations in n_combinations_list:
print(f"\n--- 测试参数: 区间数={n_intervals}, 组合数={n_combinations} ---")
try:
selected_wavelengths, rmsecv, n_components = synergy_interval_pls(
X, y,
n_intervals=n_intervals,
n_combinations=n_combinations,
max_components=max_components,
cv_folds=cv_folds
)
if rmsecv < best_overall_rmsecv:
best_overall_rmsecv = rmsecv
best_overall_result = {
'selected_wavelengths': selected_wavelengths,
'rmsecv': rmsecv,
'n_components': n_components,
'n_intervals': n_intervals,
'n_combinations': n_combinations,
'selection_ratio': len(selected_wavelengths) / X.shape[1]
}
except Exception as e:
print(f"参数组合 (区间数={n_intervals}, 组合数={n_combinations}) 处理失败: {str(e)}")
continue
if best_overall_result:
print("=== 最终最优结果 ===")
print(f"区间数: {best_overall_result['n_intervals']}")
print(f"组合数: {best_overall_result['n_combinations']}")
print(f"RMSECV: {best_overall_result['rmsecv']:.6f}")
print(f"主成分数: {best_overall_result['n_components']}")
print(f"选择的波长数: {len(best_overall_result['selected_wavelengths'])}")
print(f"选择率: {best_overall_result['selection_ratio']:.3f}")
return best_overall_result
def plot_sipls_results(X, selected_wavelengths, title="SiPLS Selected Wavelengths"):
"""
绘制SiPLS选择结果的可视化图
参数:
X: 原始光谱矩阵
selected_wavelengths: 选择的波长索引
title: 图表标题
"""
n_wavelengths = X.shape[1]
wavelength_indices = np.arange(n_wavelengths)
# 创建选择掩码
selection_mask = np.zeros(n_wavelengths, dtype=bool)
selection_mask[selected_wavelengths] = True
plt.figure(figsize=(12, 6))
# 绘制平均光谱
mean_spectrum = np.mean(X, axis=0)
plt.plot(wavelength_indices, mean_spectrum, 'b-', alpha=0.7, label='Mean Spectrum')
# 高亮选择的波长
plt.scatter(wavelength_indices[selection_mask], mean_spectrum[selection_mask],
color='red', s=50, alpha=0.8, label='Selected Wavelengths')
plt.xlabel('Wavelength Index')
plt.ylabel('Intensity')
plt.title(title)
plt.legend()
plt.grid(True, alpha=0.3)
return plt.gcf()
# 使用示例
if __name__ == "__main__":
# 生成模拟光谱数据
np.random.seed(42)
n_samples = 100
n_wavelengths = 1000
# 模拟光谱数据(高斯峰)
wavelengths = np.linspace(400, 2500, n_wavelengths)
X = np.zeros((n_samples, n_wavelengths))
# 添加一些特征峰
peak_positions = [500, 800, 1200, 1800, 2200] # nm
peak_indices = [np.argmin(np.abs(wavelengths - pos)) for pos in peak_positions]
for i in range(n_samples):
for peak_idx in peak_indices:
# 添加高斯峰
gaussian = np.exp(-0.5 * ((np.arange(n_wavelengths) - peak_idx) / 50)**2)
X[i] += gaussian * np.random.uniform(0.5, 1.5)
# 添加噪声
X[i] += np.random.normal(0, 0.1, n_wavelengths)
# 生成模拟浓度数据(与某些峰相关)
y = (X[:, peak_indices[0]] + X[:, peak_indices[2]] + X[:, peak_indices[4]]) / 3
y += np.random.normal(0, 0.05, n_samples) # 添加噪声
print("模拟数据生成完成")
print(f"数据形状: {X.shape}")
print(".3f")
# 运行SiPLS特征选择
result = sipls_feature_selection(
X, y,
n_intervals_list=[10, 15],
n_combinations_list=[2, 3],
max_components=10,
cv_folds=5
)
if result:
print(f"\n选择的波长索引: {result['selected_wavelengths'][:10]}...") # 只显示前10个
# 绘制结果
fig = plot_sipls_results(X, result['selected_wavelengths'])
plt.show()