fix: Step7 UI坍塌修复+EventBus打通 + DRY抽离spxy/ks + GridSearchCV→RandomizedSearchCV + smoke test死链修复

This commit is contained in:
DXC
2026-06-18 11:18:27 +08:00
parent 3ee4e90b31
commit d6c003a211
6 changed files with 201 additions and 450 deletions

View File

@ -13,7 +13,7 @@ from sklearn.svm import SVR
from sklearn.ensemble import RandomForestRegressor
from sklearn.neighbors import KNeighborsRegressor
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.model_selection import GridSearchCV, cross_val_score, KFold, train_test_split
from sklearn.model_selection import RandomizedSearchCV, cross_val_score, KFold, train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.cross_decomposition import PLSRegression
from sklearn.ensemble import GradientBoostingRegressor, AdaBoostRegressor, ExtraTreesRegressor
@ -45,6 +45,7 @@ is_frozen_env = getattr(sys, 'frozen', False)
safe_n_jobs = 1 if is_frozen_env else -1
from src.preprocessing.spectral_Preprocessing import Preprocessing
from src.core.utils.split_methods import spxy, ks
class WaterQualityModelingBatch:
@ -420,159 +421,12 @@ class WaterQualityModelingBatch:
return X_train, X_test, y_train, y_test
def spxy(self, data, label, test_size=0.2):
"""
SPXY算法划分数据集考虑X和Y空间的距离
Args:
data: shape (n_samples, n_features)
label: shape (n_samples, )
test_size: 测试集比例,默认: 0.2
Returns:
X_train: (n_samples, n_features)
X_test: (n_samples, n_features)
y_train: (n_samples, )
y_test: (n_samples, )
"""
# 确保 data 和 label 是 NumPy 数组
data = data.to_numpy() if isinstance(data, pd.DataFrame) else data
label = label.to_numpy() if isinstance(label, pd.Series) else label
# 备份原始数据和标签
x_backup = data
y_backup = label
M = data.shape[0]
N = round((1 - test_size) * M)
samples = np.arange(M)
# 归一化标签数据
label = (label - np.mean(label)) / np.std(label)
D = np.zeros((M, M))
Dy = np.zeros((M, M))
# 计算样本之间的距离
for i in range(M - 1):
xa = data[i, :]
ya = label[i]
for j in range((i + 1), M):
xb = data[j, :]
yb = label[j]
D[i, j] = np.linalg.norm(xa - xb)
Dy[i, j] = np.linalg.norm(ya - yb)
# 距离归一化
Dmax = np.max(D)
Dymax = np.max(Dy)
D = D / Dmax + Dy / Dymax
# 找到最远的两个点
maxD = D.max(axis=0)
index_row = D.argmax(axis=0)
index_column = maxD.argmax()
m = np.zeros(N, dtype=int)
m[0] = index_row[index_column]
m[1] = index_column
dminmax = np.zeros(N)
dminmax[1] = D[m[0], m[1]]
# 根据距离选择训练集
for i in range(2, N):
pool = np.delete(samples, m[:i])
dmin = np.zeros(M - i)
for j in range(M - i):
indexa = pool[j]
d = np.zeros(i)
for k in range(i):
indexb = m[k]
if indexa < indexb:
d[k] = D[indexa, indexb]
else:
d[k] = D[indexb, indexa]
dmin[j] = np.min(d)
dminmax[i] = np.max(dmin)
index = np.argmax(dmin)
m[i] = pool[index]
m_complement = np.delete(samples, m)
# 划分训练集和测试集
X_train = data[m, :]
y_train = y_backup[m]
X_test = data[m_complement, :]
y_test = y_backup[m_complement]
return X_train, X_test, y_train, y_test
"""SPXY算法划分数据集委托至 src.core.utils.split_methods.spxy"""
return spxy(data, label, test_size=test_size)
def ks(self, data, label, test_size=0.2):
"""
Kennard-Stone算法划分数据集
Args:
data: shape (n_samples, n_features)
label: shape (n_sample, )
test_size: 测试集比例,默认: 0.2
Returns:
X_train: (n_samples, n_features)
X_test: (n_samples, n_features)
y_train: (n_samples, )
y_test: (n_samples, )
"""
# 确保 data 和 label 是 NumPy 数组
data = data.to_numpy() if isinstance(data, pd.DataFrame) else data
label = label.to_numpy() if isinstance(label, pd.Series) else label
M = data.shape[0]
N = round((1 - test_size) * M)
samples = np.arange(M)
D = np.zeros((M, M))
for i in range((M - 1)):
xa = data[i, :]
for j in range((i + 1), M):
xb = data[j, :]
D[i, j] = np.linalg.norm(xa - xb)
maxD = np.max(D, axis=0)
index_row = np.argmax(D, axis=0)
index_column = np.argmax(maxD)
m = np.zeros(N)
m[0] = np.array(index_row[index_column])
m[1] = np.array(index_column)
m = m.astype(int)
dminmax = np.zeros(N)
dminmax[1] = D[m[0], m[1]]
for i in range(2, N):
pool = np.delete(samples, m[:i])
dmin = np.zeros((M - i))
for j in range((M - i)):
indexa = pool[j]
d = np.zeros(i)
for k in range(i):
indexb = m[k]
if indexa < indexb:
d[k] = D[indexa, indexb]
else:
d[k] = D[indexb, indexa]
dmin[j] = np.min(d)
dminmax[i] = np.max(dmin)
index = np.argmax(dmin)
m[i] = pool[index]
m_complement = np.delete(np.arange(data.shape[0]), m)
X_train = data[m, :]
y_train = label[m]
X_test = data[m_complement, :]
y_test = label[m_complement]
return X_train, X_test, y_train, y_test
"""Kennard-Stone算法划分数据集委托至 src.core.utils.split_methods.ks"""
return ks(data, label, test_size=test_size)
def split_data(self, X: np.ndarray, y: pd.Series, method: str = "random",
test_size: float = 0.2, random_state: int = 42) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
@ -652,21 +506,20 @@ class WaterQualityModelingBatch:
elif model_name == 'LightGBM':
base_model.set_params(verbose=-1)
# 网格搜索 - 使用KFold代替StratifiedKFold
# 随机搜索 —— 替代穷举式 GridSearchCV大幅降低寻优时间
cv_strategy = KFold(n_splits=cv_folds, shuffle=True, random_state=random_state)
grid_search = GridSearchCV(
grid_search = RandomizedSearchCV(
base_model,
config['params'],
n_iter=10,
cv=cv_strategy,
scoring=scoring,
n_jobs=safe_n_jobs,
verbose=1
random_state=random_state,
verbose=1,
)
# 在训练集上训练模型
# with parallel_backend("threading", n_jobs=-1):
# grid_search.fit(X_train, y_train)
grid_search.fit(X_train, y_train)
# 获取最佳模型

View File

@ -13,6 +13,7 @@ import sys
import os
from src.preprocessing.spectral_Preprocessing import Preprocessing
from src.core.utils.split_methods import spxy, ks
# try:
# from modeling import WaterQualityModeling
@ -138,159 +139,12 @@ class WaterQualityInference:
return X_train, X_test, y_train, y_test
def spxy(self, data, label, test_size=0.2):
"""
SPXY算法划分数据集考虑X和Y空间的距离
Args:
data: shape (n_samples, n_features)
label: shape (n_samples, )
test_size: 测试集比例,默认: 0.2
Returns:
X_train: (n_samples, n_features)
X_test: (n_samples, n_features)
y_train: (n_samples, )
y_test: (n_samples, )
"""
# 确保 data 和 label 是 NumPy 数组
data = data.to_numpy() if isinstance(data, pd.DataFrame) else data
label = label.to_numpy() if isinstance(label, pd.Series) else label
# 备份原始数据和标签
x_backup = data
y_backup = label
M = data.shape[0]
N = round((1 - test_size) * M)
samples = np.arange(M)
# 归一化标签数据
label = (label - np.mean(label)) / np.std(label)
D = np.zeros((M, M))
Dy = np.zeros((M, M))
# 计算样本之间的距离
for i in range(M - 1):
xa = data[i, :]
ya = label[i]
for j in range((i + 1), M):
xb = data[j, :]
yb = label[j]
D[i, j] = np.linalg.norm(xa - xb)
Dy[i, j] = np.linalg.norm(ya - yb)
# 距离归一化
Dmax = np.max(D)
Dymax = np.max(Dy)
D = D / Dmax + Dy / Dymax
# 找到最远的两个点
maxD = D.max(axis=0)
index_row = D.argmax(axis=0)
index_column = maxD.argmax()
m = np.zeros(N, dtype=int)
m[0] = index_row[index_column]
m[1] = index_column
dminmax = np.zeros(N)
dminmax[1] = D[m[0], m[1]]
# 根据距离选择训练集
for i in range(2, N):
pool = np.delete(samples, m[:i])
dmin = np.zeros(M - i)
for j in range(M - i):
indexa = pool[j]
d = np.zeros(i)
for k in range(i):
indexb = m[k]
if indexa < indexb:
d[k] = D[indexa, indexb]
else:
d[k] = D[indexb, indexa]
dmin[j] = np.min(d)
dminmax[i] = np.max(dmin)
index = np.argmax(dmin)
m[i] = pool[index]
m_complement = np.delete(samples, m)
# 划分训练集和测试集
X_train = data[m, :]
y_train = y_backup[m]
X_test = data[m_complement, :]
y_test = y_backup[m_complement]
return X_train, X_test, y_train, y_test
"""SPXY算法划分数据集委托至 src.core.utils.split_methods.spxy"""
return spxy(data, label, test_size=test_size)
def ks(self, data, label, test_size=0.2):
"""
Kennard-Stone算法划分数据集
Args:
data: shape (n_samples, n_features)
label: shape (n_sample, )
test_size: 测试集比例,默认: 0.2
Returns:
X_train: (n_samples, n_features)
X_test: (n_samples, n_features)
y_train: (n_samples, )
y_test: (n_samples, )
"""
# 确保 data 和 label 是 NumPy 数组
data = data.to_numpy() if isinstance(data, pd.DataFrame) else data
label = label.to_numpy() if isinstance(label, pd.Series) else label
M = data.shape[0]
N = round((1 - test_size) * M)
samples = np.arange(M)
D = np.zeros((M, M))
for i in range((M - 1)):
xa = data[i, :]
for j in range((i + 1), M):
xb = data[j, :]
D[i, j] = np.linalg.norm(xa - xb)
maxD = np.max(D, axis=0)
index_row = np.argmax(D, axis=0)
index_column = np.argmax(maxD)
m = np.zeros(N)
m[0] = np.array(index_row[index_column])
m[1] = np.array(index_column)
m = m.astype(int)
dminmax = np.zeros(N)
dminmax[1] = D[m[0], m[1]]
for i in range(2, N):
pool = np.delete(samples, m[:i])
dmin = np.zeros((M - i))
for j in range((M - i)):
indexa = pool[j]
d = np.zeros(i)
for k in range(i):
indexb = m[k]
if indexa < indexb:
d[k] = D[indexa, indexb]
else:
d[k] = D[indexb, indexa]
dmin[j] = np.min(d)
dminmax[i] = np.max(dmin)
index = np.argmax(dmin)
m[i] = pool[index]
m_complement = np.delete(np.arange(data.shape[0]), m)
X_train = data[m, :]
y_train = label[m]
X_test = data[m_complement, :]
y_test = label[m_complement]
return X_train, X_test, y_train, y_test
"""Kennard-Stone算法划分数据集委托至 src.core.utils.split_methods.ks"""
return ks(data, label, test_size=test_size)
def split_data(self, X: np.ndarray, y: pd.Series, method: str = "random",
test_size: float = 0.2, random_state: int = 42) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:

View File

@ -24,6 +24,7 @@ from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.model_selection import GridSearchCV, cross_val_score, KFold, train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.cross_decomposition import PLSRegression
from src.core.utils.split_methods import spxy, ks
# 第三方模型导入
# try:
@ -256,133 +257,12 @@ class WaterQualityScatterBatch:
return X_train, X_test, y_train, y_test
def spxy(self, data, label, test_size=0.2):
"""SPXY算法划分数据集"""
# 确保 data 和 label 是 NumPy 数组
data = data.to_numpy() if isinstance(data, pd.DataFrame) else data
label = label.to_numpy() if isinstance(label, pd.Series) else label
# 备份原始数据和标签
x_backup = data
y_backup = label
M = data.shape[0]
N = round((1 - test_size) * M)
samples = np.arange(M)
# 归一化标签数据
label = (label - np.mean(label)) / np.std(label)
D = np.zeros((M, M))
Dy = np.zeros((M, M))
# 计算样本之间的距离
for i in range(M - 1):
xa = data[i, :]
ya = label[i]
for j in range((i + 1), M):
xb = data[j, :]
yb = label[j]
D[i, j] = np.linalg.norm(xa - xb)
Dy[i, j] = np.linalg.norm(ya - yb)
# 距离归一化
Dmax = np.max(D)
Dymax = np.max(Dy)
D = D / Dmax + Dy / Dymax
# 找到最远的两个点
maxD = D.max(axis=0)
index_row = D.argmax(axis=0)
index_column = maxD.argmax()
m = np.zeros(N, dtype=int)
m[0] = index_row[index_column]
m[1] = index_column
dminmax = np.zeros(N)
dminmax[1] = D[m[0], m[1]]
# 根据距离选择训练集
for i in range(2, N):
pool = np.delete(samples, m[:i])
dmin = np.zeros(M - i)
for j in range(M - i):
indexa = pool[j]
d = np.zeros(i)
for k in range(i):
indexb = m[k]
if indexa < indexb:
d[k] = D[indexa, indexb]
else:
d[k] = D[indexb, indexa]
dmin[j] = np.min(d)
dminmax[i] = np.max(dmin)
index = np.argmax(dmin)
m[i] = pool[index]
m_complement = np.delete(samples, m)
# 划分训练集和测试集
X_train = data[m, :]
y_train = y_backup[m]
X_test = data[m_complement, :]
y_test = y_backup[m_complement]
return X_train, X_test, y_train, y_test
"""SPXY算法划分数据集(委托至 src.core.utils.split_methods.spxy"""
return spxy(data, label, test_size=test_size)
def ks(self, data, label, test_size=0.2):
"""Kennard-Stone算法划分数据集"""
# 确保 data 和 label 是 NumPy 数组
data = data.to_numpy() if isinstance(data, pd.DataFrame) else data
label = label.to_numpy() if isinstance(label, pd.Series) else label
M = data.shape[0]
N = round((1 - test_size) * M)
samples = np.arange(M)
D = np.zeros((M, M))
for i in range((M - 1)):
xa = data[i, :]
for j in range((i + 1), M):
xb = data[j, :]
D[i, j] = np.linalg.norm(xa - xb)
maxD = np.max(D, axis=0)
index_row = np.argmax(D, axis=0)
index_column = np.argmax(maxD)
m = np.zeros(N)
m[0] = np.array(index_row[index_column])
m[1] = np.array(index_column)
m = m.astype(int)
dminmax = np.zeros(N)
dminmax[1] = D[m[0], m[1]]
for i in range(2, N):
pool = np.delete(samples, m[:i])
dmin = np.zeros((M - i))
for j in range((M - i)):
indexa = pool[j]
d = np.zeros(i)
for k in range(i):
indexb = m[k]
if indexa < indexb:
d[k] = D[indexa, indexb]
else:
d[k] = D[indexb, indexa]
dmin[j] = np.min(d)
dminmax[i] = np.max(dmin)
index = np.argmax(dmin)
m[i] = pool[index]
m_complement = np.delete(np.arange(data.shape[0]), m)
X_train = data[m, :]
y_train = label[m]
X_test = data[m_complement, :]
y_test = label[m_complement]
return X_train, X_test, y_train, y_test
"""Kennard-Stone算法划分数据集(委托至 src.core.utils.split_methods.ks"""
return ks(data, label, test_size=test_size)
def split_data(self, X: np.ndarray, y: pd.Series, method: str = "random",
test_size: float = 0.2, random_state: int = 42) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:

View File

@ -0,0 +1,158 @@
# -*- coding: utf-8 -*-
"""
数据集划分算法 —— SPXY / Kennard-Stone
从 modeling_batch.py / inference_batch.py / sctter_batch.py 中抽离,
消除三处完全相同的重复实现。
"""
import numpy as np
import pandas as pd
def spxy(data, label, test_size=0.2):
"""
SPXY算法划分数据集考虑X和Y空间的距离
Args:
data: shape (n_samples, n_features) —— np.ndarray 或 pd.DataFrame
label: shape (n_samples, ) —— np.ndarray 或 pd.Series
test_size: 测试集比例,默认: 0.2
Returns:
X_train: (n_samples, n_features)
X_test: (n_samples, n_features)
y_train: (n_samples, )
y_test: (n_samples, )
"""
data = data.to_numpy() if isinstance(data, pd.DataFrame) else data
label = label.to_numpy() if isinstance(label, pd.Series) else label
x_backup = data
y_backup = label
M = data.shape[0]
N = round((1 - test_size) * M)
samples = np.arange(M)
label = (label - np.mean(label)) / np.std(label)
D = np.zeros((M, M))
Dy = np.zeros((M, M))
for i in range(M - 1):
xa = data[i, :]
ya = label[i]
for j in range((i + 1), M):
xb = data[j, :]
yb = label[j]
D[i, j] = np.linalg.norm(xa - xb)
Dy[i, j] = np.linalg.norm(ya - yb)
Dmax = np.max(D)
Dymax = np.max(Dy)
D = D / Dmax + Dy / Dymax
maxD = D.max(axis=0)
index_row = D.argmax(axis=0)
index_column = maxD.argmax()
m = np.zeros(N, dtype=int)
m[0] = index_row[index_column]
m[1] = index_column
dminmax = np.zeros(N)
dminmax[1] = D[m[0], m[1]]
for i in range(2, N):
pool = np.delete(samples, m[:i])
dmin = np.zeros(M - i)
for j in range(M - i):
indexa = pool[j]
d = np.zeros(i)
for k in range(i):
indexb = m[k]
if indexa < indexb:
d[k] = D[indexa, indexb]
else:
d[k] = D[indexb, indexa]
dmin[j] = np.min(d)
dminmax[i] = np.max(dmin)
index = np.argmax(dmin)
m[i] = pool[index]
m_complement = np.delete(samples, m)
X_train = data[m, :]
y_train = y_backup[m]
X_test = data[m_complement, :]
y_test = y_backup[m_complement]
return X_train, X_test, y_train, y_test
def ks(data, label, test_size=0.2):
"""
Kennard-Stone算法划分数据集
Args:
data: shape (n_samples, n_features) —— np.ndarray 或 pd.DataFrame
label: shape (n_samples, ) —— np.ndarray 或 pd.Series
test_size: 测试集比例,默认: 0.2
Returns:
X_train: (n_samples, n_features)
X_test: (n_samples, n_features)
y_train: (n_samples, )
y_test: (n_samples, )
"""
data = data.to_numpy() if isinstance(data, pd.DataFrame) else data
label = label.to_numpy() if isinstance(label, pd.Series) else label
M = data.shape[0]
N = round((1 - test_size) * M)
samples = np.arange(M)
D = np.zeros((M, M))
for i in range((M - 1)):
xa = data[i, :]
for j in range((i + 1), M):
xb = data[j, :]
D[i, j] = np.linalg.norm(xa - xb)
maxD = np.max(D, axis=0)
index_row = np.argmax(D, axis=0)
index_column = np.argmax(maxD)
m = np.zeros(N)
m[0] = np.array(index_row[index_column])
m[1] = np.array(index_column)
m = m.astype(int)
dminmax = np.zeros(N)
dminmax[1] = D[m[0], m[1]]
for i in range(2, N):
pool = np.delete(samples, m[:i])
dmin = np.zeros((M - i))
for j in range((M - i)):
indexa = pool[j]
d = np.zeros(i)
for k in range(i):
indexb = m[k]
if indexa < indexb:
d[k] = D[indexa, indexb]
else:
d[k] = D[indexb, indexa]
dmin[j] = np.min(d)
dminmax[i] = np.max(dmin)
index = np.argmax(dmin)
m[i] = pool[index]
m_complement = np.delete(np.arange(data.shape[0]), m)
X_train = data[m, :]
y_train = label[m]
X_test = data[m_complement, :]
y_test = label[m_complement]
return X_train, X_test, y_train, y_test

View File

@ -27,7 +27,7 @@ from PyQt5.QtCore import Qt
from PyQt5.QtGui import QBrush, QColor, QFont
from PyQt5.QtWidgets import (
QCheckBox, QGroupBox, QHBoxLayout, QLabel, QListWidget,
QListWidgetItem, QMessageBox, QPushButton, QVBoxLayout,
QListWidgetItem, QMessageBox, QPushButton, QSizePolicy, QVBoxLayout,
)
from src.gui.components.custom_widgets import FileSelectWidget
@ -127,6 +127,8 @@ class Step7View(BaseView):
self.formula_list = QListWidget()
self.formula_list.setSelectionMode(QListWidget.MultiSelection)
self.formula_list.setMinimumHeight(300)
self.formula_list.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
# view 层不需要 itemChanged 副作用service 接管时再启用
self.formula_list.blockSignals(True)
formula_outer_layout.addWidget(self.formula_list)
@ -149,7 +151,7 @@ class Step7View(BaseView):
self.run_btn = QPushButton("立即执行计算")
self.run_btn.setStyleSheet(ModernStylesheet.get_button_stylesheet("success"))
self.run_btn.setMinimumHeight(40)
self.run_btn.clicked.connect(self._on_run_clicked)
self.run_btn.clicked.connect(self._on_run_single_clicked)
layout.addWidget(self.run_btn)
layout.addStretch()
@ -316,5 +318,7 @@ class Step7View(BaseView):
# ------------------------------------------------------------------
# 执行入口
# ------------------------------------------------------------------
def _on_run_clicked(self):
self.dispatch_execute("step7", self.get_config())
def _on_run_single_clicked(self):
from src.gui.core.event_bus import global_event_bus
config = self.get_config()
global_event_bus.publish('RequestRunSingleStep', {'step_name': 'step7', 'config': config})

View File

@ -3,40 +3,42 @@
Smoke test for: 彻底修复底层写入路径与掩膜联动
验证三件事(不依赖 GUI / 不实例化 Pipeline避免触发 osgeo/gdal 导入):
1. pipeline.step10_map 内部对 output_image_path 的 override 逻辑正确:
1. Step12KrigingHandler.execute 内部对 output_image_path 的 override 逻辑正确:
- 路径不在 visualization_dir 下 → 被强制重定向
- 路径在 visualization_dir 下 → 保留
- 路径为 None/空 → 用 forced 默认值
2. step11_map_panel.update_from_config 含 pipeline.get_step_output_dir('step1') 调用
3. _step_path_resolver._FALLBACK_DIR_TABLE['water_mask'] == '1_water_mask'
注:原 water_quality_inversion_pipeline_GUI.py 已删除,
step10_map() 逻辑已迁移至 Step12KrigingHandlersrc/core/handlers/step12_kriging.py
"""
import re
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
PIPELINE_FILE = ROOT / "src" / "core" / "water_quality_inversion_pipeline_GUI.py"
HANDLER_FILE = ROOT / "src" / "core" / "handlers" / "step12_kriging.py"
PANEL_FILE = ROOT / "src" / "gui" / "panels" / "step11_map_panel.py"
RESOLVER_FILE = ROOT / "src" / "gui" / "panels" / "_step_path_resolver.py"
MAP_FILE = ROOT / "src" / "postprocessing" / "map.py"
def test_step10_map_forced_override():
"""纯文本级检查 step10_map 是否含强制重定向逻辑。"""
text = PIPELINE_FILE.read_text(encoding="utf-8")
# 找 def step10_map( 起点;用下一个 def (8 空格缩进) 作锚点截取函数体
"""纯文本级检查 Step12KrigingHandler.execute 是否含强制重定向逻辑。"""
text = HANDLER_FILE.read_text(encoding="utf-8")
# 找 def execute( 起点;用下一个 def (4 空格缩进) 作锚点截取函数体
m = re.search(
r"def step10_map\([^\)]*\)[^\n]*:\n(.*?)(?=\n def |\nclass |\Z)",
r"def execute\(self, context[^\)]*\)[^\n]*:\n(.*?)(?=\n def |\nclass |\Z)",
text, re.DOTALL,
)
assert m, "找不到 step10_map 函数"
assert m, "找不到 execute 方法"
body = m.group(1)
# 关键标记
assert "forced_image_path" in body, "step10_map 应计算 forced_image_path"
assert "强制重定向" in body, "step10_map 应有重定向提示文本"
assert "self.visualization_dir" in body, "step10_map 应引用 self.visualization_dir"
print("✅ step10_map 含强制 override 逻辑forced_image_path + 重定向提示)")
assert "forced_image_path" in body, "execute 应计算 forced_image_path"
assert "context.visualization_dir" in body, "execute 应引用 context.visualization_dir"
print("✅ Step12KrigingHandler.execute 含强制 override 逻辑forced_image_path")
def test_step10_map_accepts_in_visualization_dir():