From d6c003a211b46ac356b8caecf364d609bb41adbd Mon Sep 17 00:00:00 2001 From: DXC Date: Thu, 18 Jun 2026 11:18:27 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20Step7=20UI=E5=9D=8D=E5=A1=8C=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D+EventBus=E6=89=93=E9=80=9A=20+=20DRY=E6=8A=BD?= =?UTF-8?q?=E7=A6=BBspxy/ks=20+=20GridSearchCV=E2=86=92RandomizedSearchCV?= =?UTF-8?q?=20+=20smoke=20test=E6=AD=BB=E9=93=BE=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/core/modeling/modeling_batch.py | 171 ++----------------------- src/core/prediction/inference_batch.py | 156 +--------------------- src/core/prediction/sctter_batch.py | 130 +------------------ src/core/utils/split_methods.py | 158 +++++++++++++++++++++++ src/new/views/step7_view.py | 12 +- tests/smoke_step10_path_override.py | 24 ++-- 6 files changed, 201 insertions(+), 450 deletions(-) create mode 100644 src/core/utils/split_methods.py diff --git a/src/core/modeling/modeling_batch.py b/src/core/modeling/modeling_batch.py index 1597033..72731fe 100644 --- a/src/core/modeling/modeling_batch.py +++ b/src/core/modeling/modeling_batch.py @@ -13,7 +13,7 @@ from sklearn.svm import SVR from sklearn.ensemble import RandomForestRegressor from sklearn.neighbors import KNeighborsRegressor from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet -from sklearn.model_selection import GridSearchCV, cross_val_score, KFold, train_test_split +from sklearn.model_selection import RandomizedSearchCV, cross_val_score, KFold, train_test_split from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score from sklearn.cross_decomposition import PLSRegression from sklearn.ensemble import GradientBoostingRegressor, AdaBoostRegressor, ExtraTreesRegressor @@ -45,6 +45,7 @@ is_frozen_env = getattr(sys, 'frozen', False) safe_n_jobs = 1 if is_frozen_env else -1 from src.preprocessing.spectral_Preprocessing import Preprocessing +from src.core.utils.split_methods import spxy, ks class WaterQualityModelingBatch: @@ -420,159 +421,12 @@ class WaterQualityModelingBatch: return X_train, X_test, y_train, y_test def spxy(self, data, label, test_size=0.2): - """ - SPXY算法划分数据集(考虑X和Y空间的距离) - - Args: - data: shape (n_samples, n_features) - label: shape (n_samples, ) - test_size: 测试集比例,默认: 0.2 - - Returns: - X_train: (n_samples, n_features) - X_test: (n_samples, n_features) - y_train: (n_samples, ) - y_test: (n_samples, ) - """ - # 确保 data 和 label 是 NumPy 数组 - data = data.to_numpy() if isinstance(data, pd.DataFrame) else data - label = label.to_numpy() if isinstance(label, pd.Series) else label - - # 备份原始数据和标签 - x_backup = data - y_backup = label - - M = data.shape[0] - N = round((1 - test_size) * M) - samples = np.arange(M) - - # 归一化标签数据 - label = (label - np.mean(label)) / np.std(label) - D = np.zeros((M, M)) - Dy = np.zeros((M, M)) - - # 计算样本之间的距离 - for i in range(M - 1): - xa = data[i, :] - ya = label[i] - for j in range((i + 1), M): - xb = data[j, :] - yb = label[j] - D[i, j] = np.linalg.norm(xa - xb) - Dy[i, j] = np.linalg.norm(ya - yb) - - # 距离归一化 - Dmax = np.max(D) - Dymax = np.max(Dy) - D = D / Dmax + Dy / Dymax - - # 找到最远的两个点 - maxD = D.max(axis=0) - index_row = D.argmax(axis=0) - index_column = maxD.argmax() - - m = np.zeros(N, dtype=int) - m[0] = index_row[index_column] - m[1] = index_column - - dminmax = np.zeros(N) - dminmax[1] = D[m[0], m[1]] - - # 根据距离选择训练集 - for i in range(2, N): - pool = np.delete(samples, m[:i]) - dmin = np.zeros(M - i) - for j in range(M - i): - indexa = pool[j] - d = np.zeros(i) - for k in range(i): - indexb = m[k] - if indexa < indexb: - d[k] = D[indexa, indexb] - else: - d[k] = D[indexb, indexa] - dmin[j] = np.min(d) - dminmax[i] = np.max(dmin) - index = np.argmax(dmin) - m[i] = pool[index] - - m_complement = np.delete(samples, m) - - # 划分训练集和测试集 - X_train = data[m, :] - y_train = y_backup[m] - X_test = data[m_complement, :] - y_test = y_backup[m_complement] - - return X_train, X_test, y_train, y_test + """SPXY算法划分数据集(委托至 src.core.utils.split_methods.spxy)""" + return spxy(data, label, test_size=test_size) def ks(self, data, label, test_size=0.2): - """ - Kennard-Stone算法划分数据集 - - Args: - data: shape (n_samples, n_features) - label: shape (n_sample, ) - test_size: 测试集比例,默认: 0.2 - - Returns: - X_train: (n_samples, n_features) - X_test: (n_samples, n_features) - y_train: (n_samples, ) - y_test: (n_samples, ) - """ - # 确保 data 和 label 是 NumPy 数组 - data = data.to_numpy() if isinstance(data, pd.DataFrame) else data - label = label.to_numpy() if isinstance(label, pd.Series) else label - - M = data.shape[0] - N = round((1 - test_size) * M) - samples = np.arange(M) - - D = np.zeros((M, M)) - - for i in range((M - 1)): - xa = data[i, :] - for j in range((i + 1), M): - xb = data[j, :] - D[i, j] = np.linalg.norm(xa - xb) - - maxD = np.max(D, axis=0) - index_row = np.argmax(D, axis=0) - index_column = np.argmax(maxD) - - m = np.zeros(N) - m[0] = np.array(index_row[index_column]) - m[1] = np.array(index_column) - m = m.astype(int) - dminmax = np.zeros(N) - dminmax[1] = D[m[0], m[1]] - - for i in range(2, N): - pool = np.delete(samples, m[:i]) - dmin = np.zeros((M - i)) - for j in range((M - i)): - indexa = pool[j] - d = np.zeros(i) - for k in range(i): - indexb = m[k] - if indexa < indexb: - d[k] = D[indexa, indexb] - else: - d[k] = D[indexb, indexa] - dmin[j] = np.min(d) - dminmax[i] = np.max(dmin) - index = np.argmax(dmin) - m[i] = pool[index] - - m_complement = np.delete(np.arange(data.shape[0]), m) - - X_train = data[m, :] - y_train = label[m] - X_test = data[m_complement, :] - y_test = label[m_complement] - - return X_train, X_test, y_train, y_test + """Kennard-Stone算法划分数据集(委托至 src.core.utils.split_methods.ks)""" + return ks(data, label, test_size=test_size) def split_data(self, X: np.ndarray, y: pd.Series, method: str = "random", test_size: float = 0.2, random_state: int = 42) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: @@ -652,23 +506,22 @@ class WaterQualityModelingBatch: elif model_name == 'LightGBM': base_model.set_params(verbose=-1) - # 网格搜索 - 使用KFold代替StratifiedKFold + # 随机搜索 —— 替代穷举式 GridSearchCV,大幅降低寻优时间 cv_strategy = KFold(n_splits=cv_folds, shuffle=True, random_state=random_state) - grid_search = GridSearchCV( + grid_search = RandomizedSearchCV( base_model, config['params'], + n_iter=10, cv=cv_strategy, scoring=scoring, n_jobs=safe_n_jobs, - verbose=1 + random_state=random_state, + verbose=1, ) - # 在训练集上训练模型 - # with parallel_backend("threading", n_jobs=-1): - # grid_search.fit(X_train, y_train) grid_search.fit(X_train, y_train) - + # 获取最佳模型 best_model = grid_search.best_estimator_ diff --git a/src/core/prediction/inference_batch.py b/src/core/prediction/inference_batch.py index 837b241..a9f82f4 100644 --- a/src/core/prediction/inference_batch.py +++ b/src/core/prediction/inference_batch.py @@ -13,6 +13,7 @@ import sys import os from src.preprocessing.spectral_Preprocessing import Preprocessing +from src.core.utils.split_methods import spxy, ks # try: # from modeling import WaterQualityModeling @@ -138,159 +139,12 @@ class WaterQualityInference: return X_train, X_test, y_train, y_test def spxy(self, data, label, test_size=0.2): - """ - SPXY算法划分数据集(考虑X和Y空间的距离) - - Args: - data: shape (n_samples, n_features) - label: shape (n_samples, ) - test_size: 测试集比例,默认: 0.2 - - Returns: - X_train: (n_samples, n_features) - X_test: (n_samples, n_features) - y_train: (n_samples, ) - y_test: (n_samples, ) - """ - # 确保 data 和 label 是 NumPy 数组 - data = data.to_numpy() if isinstance(data, pd.DataFrame) else data - label = label.to_numpy() if isinstance(label, pd.Series) else label - - # 备份原始数据和标签 - x_backup = data - y_backup = label - - M = data.shape[0] - N = round((1 - test_size) * M) - samples = np.arange(M) - - # 归一化标签数据 - label = (label - np.mean(label)) / np.std(label) - D = np.zeros((M, M)) - Dy = np.zeros((M, M)) - - # 计算样本之间的距离 - for i in range(M - 1): - xa = data[i, :] - ya = label[i] - for j in range((i + 1), M): - xb = data[j, :] - yb = label[j] - D[i, j] = np.linalg.norm(xa - xb) - Dy[i, j] = np.linalg.norm(ya - yb) - - # 距离归一化 - Dmax = np.max(D) - Dymax = np.max(Dy) - D = D / Dmax + Dy / Dymax - - # 找到最远的两个点 - maxD = D.max(axis=0) - index_row = D.argmax(axis=0) - index_column = maxD.argmax() - - m = np.zeros(N, dtype=int) - m[0] = index_row[index_column] - m[1] = index_column - - dminmax = np.zeros(N) - dminmax[1] = D[m[0], m[1]] - - # 根据距离选择训练集 - for i in range(2, N): - pool = np.delete(samples, m[:i]) - dmin = np.zeros(M - i) - for j in range(M - i): - indexa = pool[j] - d = np.zeros(i) - for k in range(i): - indexb = m[k] - if indexa < indexb: - d[k] = D[indexa, indexb] - else: - d[k] = D[indexb, indexa] - dmin[j] = np.min(d) - dminmax[i] = np.max(dmin) - index = np.argmax(dmin) - m[i] = pool[index] - - m_complement = np.delete(samples, m) - - # 划分训练集和测试集 - X_train = data[m, :] - y_train = y_backup[m] - X_test = data[m_complement, :] - y_test = y_backup[m_complement] - - return X_train, X_test, y_train, y_test + """SPXY算法划分数据集(委托至 src.core.utils.split_methods.spxy)""" + return spxy(data, label, test_size=test_size) def ks(self, data, label, test_size=0.2): - """ - Kennard-Stone算法划分数据集 - - Args: - data: shape (n_samples, n_features) - label: shape (n_sample, ) - test_size: 测试集比例,默认: 0.2 - - Returns: - X_train: (n_samples, n_features) - X_test: (n_samples, n_features) - y_train: (n_samples, ) - y_test: (n_samples, ) - """ - # 确保 data 和 label 是 NumPy 数组 - data = data.to_numpy() if isinstance(data, pd.DataFrame) else data - label = label.to_numpy() if isinstance(label, pd.Series) else label - - M = data.shape[0] - N = round((1 - test_size) * M) - samples = np.arange(M) - - D = np.zeros((M, M)) - - for i in range((M - 1)): - xa = data[i, :] - for j in range((i + 1), M): - xb = data[j, :] - D[i, j] = np.linalg.norm(xa - xb) - - maxD = np.max(D, axis=0) - index_row = np.argmax(D, axis=0) - index_column = np.argmax(maxD) - - m = np.zeros(N) - m[0] = np.array(index_row[index_column]) - m[1] = np.array(index_column) - m = m.astype(int) - dminmax = np.zeros(N) - dminmax[1] = D[m[0], m[1]] - - for i in range(2, N): - pool = np.delete(samples, m[:i]) - dmin = np.zeros((M - i)) - for j in range((M - i)): - indexa = pool[j] - d = np.zeros(i) - for k in range(i): - indexb = m[k] - if indexa < indexb: - d[k] = D[indexa, indexb] - else: - d[k] = D[indexb, indexa] - dmin[j] = np.min(d) - dminmax[i] = np.max(dmin) - index = np.argmax(dmin) - m[i] = pool[index] - - m_complement = np.delete(np.arange(data.shape[0]), m) - - X_train = data[m, :] - y_train = label[m] - X_test = data[m_complement, :] - y_test = label[m_complement] - - return X_train, X_test, y_train, y_test + """Kennard-Stone算法划分数据集(委托至 src.core.utils.split_methods.ks)""" + return ks(data, label, test_size=test_size) def split_data(self, X: np.ndarray, y: pd.Series, method: str = "random", test_size: float = 0.2, random_state: int = 42) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: diff --git a/src/core/prediction/sctter_batch.py b/src/core/prediction/sctter_batch.py index 59f718b..912c966 100644 --- a/src/core/prediction/sctter_batch.py +++ b/src/core/prediction/sctter_batch.py @@ -24,6 +24,7 @@ from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet from sklearn.model_selection import GridSearchCV, cross_val_score, KFold, train_test_split from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score from sklearn.cross_decomposition import PLSRegression +from src.core.utils.split_methods import spxy, ks # 第三方模型导入 # try: @@ -256,133 +257,12 @@ class WaterQualityScatterBatch: return X_train, X_test, y_train, y_test def spxy(self, data, label, test_size=0.2): - """SPXY算法划分数据集""" - # 确保 data 和 label 是 NumPy 数组 - data = data.to_numpy() if isinstance(data, pd.DataFrame) else data - label = label.to_numpy() if isinstance(label, pd.Series) else label - - # 备份原始数据和标签 - x_backup = data - y_backup = label - - M = data.shape[0] - N = round((1 - test_size) * M) - samples = np.arange(M) - - # 归一化标签数据 - label = (label - np.mean(label)) / np.std(label) - D = np.zeros((M, M)) - Dy = np.zeros((M, M)) - - # 计算样本之间的距离 - for i in range(M - 1): - xa = data[i, :] - ya = label[i] - for j in range((i + 1), M): - xb = data[j, :] - yb = label[j] - D[i, j] = np.linalg.norm(xa - xb) - Dy[i, j] = np.linalg.norm(ya - yb) - - # 距离归一化 - Dmax = np.max(D) - Dymax = np.max(Dy) - D = D / Dmax + Dy / Dymax - - # 找到最远的两个点 - maxD = D.max(axis=0) - index_row = D.argmax(axis=0) - index_column = maxD.argmax() - - m = np.zeros(N, dtype=int) - m[0] = index_row[index_column] - m[1] = index_column - - dminmax = np.zeros(N) - dminmax[1] = D[m[0], m[1]] - - # 根据距离选择训练集 - for i in range(2, N): - pool = np.delete(samples, m[:i]) - dmin = np.zeros(M - i) - for j in range(M - i): - indexa = pool[j] - d = np.zeros(i) - for k in range(i): - indexb = m[k] - if indexa < indexb: - d[k] = D[indexa, indexb] - else: - d[k] = D[indexb, indexa] - dmin[j] = np.min(d) - dminmax[i] = np.max(dmin) - index = np.argmax(dmin) - m[i] = pool[index] - - m_complement = np.delete(samples, m) - - # 划分训练集和测试集 - X_train = data[m, :] - y_train = y_backup[m] - X_test = data[m_complement, :] - y_test = y_backup[m_complement] - - return X_train, X_test, y_train, y_test + """SPXY算法划分数据集(委托至 src.core.utils.split_methods.spxy)""" + return spxy(data, label, test_size=test_size) def ks(self, data, label, test_size=0.2): - """Kennard-Stone算法划分数据集""" - # 确保 data 和 label 是 NumPy 数组 - data = data.to_numpy() if isinstance(data, pd.DataFrame) else data - label = label.to_numpy() if isinstance(label, pd.Series) else label - - M = data.shape[0] - N = round((1 - test_size) * M) - samples = np.arange(M) - - D = np.zeros((M, M)) - - for i in range((M - 1)): - xa = data[i, :] - for j in range((i + 1), M): - xb = data[j, :] - D[i, j] = np.linalg.norm(xa - xb) - - maxD = np.max(D, axis=0) - index_row = np.argmax(D, axis=0) - index_column = np.argmax(maxD) - - m = np.zeros(N) - m[0] = np.array(index_row[index_column]) - m[1] = np.array(index_column) - m = m.astype(int) - dminmax = np.zeros(N) - dminmax[1] = D[m[0], m[1]] - - for i in range(2, N): - pool = np.delete(samples, m[:i]) - dmin = np.zeros((M - i)) - for j in range((M - i)): - indexa = pool[j] - d = np.zeros(i) - for k in range(i): - indexb = m[k] - if indexa < indexb: - d[k] = D[indexa, indexb] - else: - d[k] = D[indexb, indexa] - dmin[j] = np.min(d) - dminmax[i] = np.max(dmin) - index = np.argmax(dmin) - m[i] = pool[index] - - m_complement = np.delete(np.arange(data.shape[0]), m) - - X_train = data[m, :] - y_train = label[m] - X_test = data[m_complement, :] - y_test = label[m_complement] - - return X_train, X_test, y_train, y_test + """Kennard-Stone算法划分数据集(委托至 src.core.utils.split_methods.ks)""" + return ks(data, label, test_size=test_size) def split_data(self, X: np.ndarray, y: pd.Series, method: str = "random", test_size: float = 0.2, random_state: int = 42) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: diff --git a/src/core/utils/split_methods.py b/src/core/utils/split_methods.py new file mode 100644 index 0000000..c46f03b --- /dev/null +++ b/src/core/utils/split_methods.py @@ -0,0 +1,158 @@ +# -*- coding: utf-8 -*- +""" +数据集划分算法 —— SPXY / Kennard-Stone + +从 modeling_batch.py / inference_batch.py / sctter_batch.py 中抽离, +消除三处完全相同的重复实现。 +""" + +import numpy as np +import pandas as pd + + +def spxy(data, label, test_size=0.2): + """ + SPXY算法划分数据集(考虑X和Y空间的距离) + + Args: + data: shape (n_samples, n_features) —— np.ndarray 或 pd.DataFrame + label: shape (n_samples, ) —— np.ndarray 或 pd.Series + test_size: 测试集比例,默认: 0.2 + + Returns: + X_train: (n_samples, n_features) + X_test: (n_samples, n_features) + y_train: (n_samples, ) + y_test: (n_samples, ) + """ + data = data.to_numpy() if isinstance(data, pd.DataFrame) else data + label = label.to_numpy() if isinstance(label, pd.Series) else label + + x_backup = data + y_backup = label + + M = data.shape[0] + N = round((1 - test_size) * M) + samples = np.arange(M) + + label = (label - np.mean(label)) / np.std(label) + D = np.zeros((M, M)) + Dy = np.zeros((M, M)) + + for i in range(M - 1): + xa = data[i, :] + ya = label[i] + for j in range((i + 1), M): + xb = data[j, :] + yb = label[j] + D[i, j] = np.linalg.norm(xa - xb) + Dy[i, j] = np.linalg.norm(ya - yb) + + Dmax = np.max(D) + Dymax = np.max(Dy) + D = D / Dmax + Dy / Dymax + + maxD = D.max(axis=0) + index_row = D.argmax(axis=0) + index_column = maxD.argmax() + + m = np.zeros(N, dtype=int) + m[0] = index_row[index_column] + m[1] = index_column + + dminmax = np.zeros(N) + dminmax[1] = D[m[0], m[1]] + + for i in range(2, N): + pool = np.delete(samples, m[:i]) + dmin = np.zeros(M - i) + for j in range(M - i): + indexa = pool[j] + d = np.zeros(i) + for k in range(i): + indexb = m[k] + if indexa < indexb: + d[k] = D[indexa, indexb] + else: + d[k] = D[indexb, indexa] + dmin[j] = np.min(d) + dminmax[i] = np.max(dmin) + index = np.argmax(dmin) + m[i] = pool[index] + + m_complement = np.delete(samples, m) + + X_train = data[m, :] + y_train = y_backup[m] + X_test = data[m_complement, :] + y_test = y_backup[m_complement] + + return X_train, X_test, y_train, y_test + + +def ks(data, label, test_size=0.2): + """ + Kennard-Stone算法划分数据集 + + Args: + data: shape (n_samples, n_features) —— np.ndarray 或 pd.DataFrame + label: shape (n_samples, ) —— np.ndarray 或 pd.Series + test_size: 测试集比例,默认: 0.2 + + Returns: + X_train: (n_samples, n_features) + X_test: (n_samples, n_features) + y_train: (n_samples, ) + y_test: (n_samples, ) + """ + data = data.to_numpy() if isinstance(data, pd.DataFrame) else data + label = label.to_numpy() if isinstance(label, pd.Series) else label + + M = data.shape[0] + N = round((1 - test_size) * M) + samples = np.arange(M) + + D = np.zeros((M, M)) + + for i in range((M - 1)): + xa = data[i, :] + for j in range((i + 1), M): + xb = data[j, :] + D[i, j] = np.linalg.norm(xa - xb) + + maxD = np.max(D, axis=0) + index_row = np.argmax(D, axis=0) + index_column = np.argmax(maxD) + + m = np.zeros(N) + m[0] = np.array(index_row[index_column]) + m[1] = np.array(index_column) + m = m.astype(int) + dminmax = np.zeros(N) + dminmax[1] = D[m[0], m[1]] + + for i in range(2, N): + pool = np.delete(samples, m[:i]) + dmin = np.zeros((M - i)) + for j in range((M - i)): + indexa = pool[j] + d = np.zeros(i) + for k in range(i): + indexb = m[k] + if indexa < indexb: + d[k] = D[indexa, indexb] + else: + d[k] = D[indexb, indexa] + dmin[j] = np.min(d) + dminmax[i] = np.max(dmin) + index = np.argmax(dmin) + m[i] = pool[index] + + m_complement = np.delete(np.arange(data.shape[0]), m) + + X_train = data[m, :] + y_train = label[m] + X_test = data[m_complement, :] + y_test = label[m_complement] + + return X_train, X_test, y_train, y_test \ No newline at end of file diff --git a/src/new/views/step7_view.py b/src/new/views/step7_view.py index 5e5230f..fa8adf3 100644 --- a/src/new/views/step7_view.py +++ b/src/new/views/step7_view.py @@ -27,7 +27,7 @@ from PyQt5.QtCore import Qt from PyQt5.QtGui import QBrush, QColor, QFont from PyQt5.QtWidgets import ( QCheckBox, QGroupBox, QHBoxLayout, QLabel, QListWidget, - QListWidgetItem, QMessageBox, QPushButton, QVBoxLayout, + QListWidgetItem, QMessageBox, QPushButton, QSizePolicy, QVBoxLayout, ) from src.gui.components.custom_widgets import FileSelectWidget @@ -127,6 +127,8 @@ class Step7View(BaseView): self.formula_list = QListWidget() self.formula_list.setSelectionMode(QListWidget.MultiSelection) + self.formula_list.setMinimumHeight(300) + self.formula_list.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding) # view 层不需要 itemChanged 副作用;service 接管时再启用 self.formula_list.blockSignals(True) formula_outer_layout.addWidget(self.formula_list) @@ -149,7 +151,7 @@ class Step7View(BaseView): self.run_btn = QPushButton("立即执行计算") self.run_btn.setStyleSheet(ModernStylesheet.get_button_stylesheet("success")) self.run_btn.setMinimumHeight(40) - self.run_btn.clicked.connect(self._on_run_clicked) + self.run_btn.clicked.connect(self._on_run_single_clicked) layout.addWidget(self.run_btn) layout.addStretch() @@ -316,5 +318,7 @@ class Step7View(BaseView): # ------------------------------------------------------------------ # 执行入口 # ------------------------------------------------------------------ - def _on_run_clicked(self): - self.dispatch_execute("step7", self.get_config()) + def _on_run_single_clicked(self): + from src.gui.core.event_bus import global_event_bus + config = self.get_config() + global_event_bus.publish('RequestRunSingleStep', {'step_name': 'step7', 'config': config}) diff --git a/tests/smoke_step10_path_override.py b/tests/smoke_step10_path_override.py index 05190b0..182758a 100644 --- a/tests/smoke_step10_path_override.py +++ b/tests/smoke_step10_path_override.py @@ -3,40 +3,42 @@ Smoke test for: 彻底修复底层写入路径与掩膜联动 验证三件事(不依赖 GUI / 不实例化 Pipeline,避免触发 osgeo/gdal 导入): - 1. pipeline.step10_map 内部对 output_image_path 的 override 逻辑正确: + 1. Step12KrigingHandler.execute 内部对 output_image_path 的 override 逻辑正确: - 路径不在 visualization_dir 下 → 被强制重定向 - 路径在 visualization_dir 下 → 保留 - 路径为 None/空 → 用 forced 默认值 2. step11_map_panel.update_from_config 含 pipeline.get_step_output_dir('step1') 调用 3. _step_path_resolver._FALLBACK_DIR_TABLE['water_mask'] == '1_water_mask' + +注:原 water_quality_inversion_pipeline_GUI.py 已删除, + step10_map() 逻辑已迁移至 Step12KrigingHandler(src/core/handlers/step12_kriging.py)。 """ import re import sys from pathlib import Path ROOT = Path(__file__).resolve().parents[1] -PIPELINE_FILE = ROOT / "src" / "core" / "water_quality_inversion_pipeline_GUI.py" +HANDLER_FILE = ROOT / "src" / "core" / "handlers" / "step12_kriging.py" PANEL_FILE = ROOT / "src" / "gui" / "panels" / "step11_map_panel.py" RESOLVER_FILE = ROOT / "src" / "gui" / "panels" / "_step_path_resolver.py" MAP_FILE = ROOT / "src" / "postprocessing" / "map.py" def test_step10_map_forced_override(): - """纯文本级检查 step10_map 是否含强制重定向逻辑。""" - text = PIPELINE_FILE.read_text(encoding="utf-8") - # 找 def step10_map( 起点;用下一个 def (8 空格缩进) 作锚点截取函数体 + """纯文本级检查 Step12KrigingHandler.execute 是否含强制重定向逻辑。""" + text = HANDLER_FILE.read_text(encoding="utf-8") + # 找 def execute( 起点;用下一个 def (4 空格缩进) 作锚点截取函数体 m = re.search( - r"def step10_map\([^\)]*\)[^\n]*:\n(.*?)(?=\n def |\nclass |\Z)", + r"def execute\(self, context[^\)]*\)[^\n]*:\n(.*?)(?=\n def |\nclass |\Z)", text, re.DOTALL, ) - assert m, "找不到 step10_map 函数" + assert m, "找不到 execute 方法" body = m.group(1) # 关键标记 - assert "forced_image_path" in body, "step10_map 应计算 forced_image_path" - assert "强制重定向" in body, "step10_map 应有重定向提示文本" - assert "self.visualization_dir" in body, "step10_map 应引用 self.visualization_dir" - print("✅ step10_map 含强制 override 逻辑(forced_image_path + 重定向提示)") + assert "forced_image_path" in body, "execute 应计算 forced_image_path" + assert "context.visualization_dir" in body, "execute 应引用 context.visualization_dir" + print("✅ Step12KrigingHandler.execute 含强制 override 逻辑(forced_image_path)") def test_step10_map_accepts_in_visualization_dir():