services/step2-5:打通前四个预处理步骤的真实后端独立服务

新增 src/new/services/{step2,step3,step4,step5}_service.py 四个独立后端服务:
This commit is contained in:
DXC
2026-06-17 09:15:22 +08:00
parent ef3de632d3
commit f8d5ea2eb8
6 changed files with 672 additions and 13 deletions

View File

@ -293,7 +293,7 @@ def smoke_e2e():
"mode=ndwi" in log_text,
"→ 找 ['mode=ndwi']")
# ---- 切到 step2真实 viewservice 仍走占位 ----
# ---- 切到 step2真实 view + 真实 service ----
win.nav_list.setCurrentRow(1)
view_step2 = win._views.get("step2")
report("L3", "_views['step2'] 是真实 Step2View已迁移",
@ -310,8 +310,8 @@ def smoke_e2e():
log_text = win.log_text.toPlainText()
report("L3", "step2 真实 view 派发后日志含 Router 收到 step2 请求",
"收到 step2 请求" in log_text)
report("L3", "step2 service 走占位:日志含 Service… not_implemented",
"not_implemented" in log_text,
report("L3", "step2 真实 service 已迁移:空 img_path 触发 Service✗ 错误分支",
"[Service✗]" in log_text and "execute_step2" in log_text,
f"log 片段:{log_text[-200:]!r}")
win.close()

View File

@ -68,38 +68,38 @@ ROUTES = [
"service_module": "src.new.services.step1_service",
"service_func": "execute_step1",
},
# -------- 业务路由(view 全部已迁移;service 仍走占位 --------
# -------- 业务路由(前 4 个预处理步骤已迁移到真实 service --------
{
"id": "step2",
"name": "2. 耀斑检测",
"view_module": "src.new.views.step2_view",
"view_class": "Step2View",
"service_module": "src.new.services.placeholder_service",
"service_func": "execute_placeholder",
"service_module": "src.new.services.step2_service",
"service_func": "execute_step2",
},
{
"id": "step3",
"name": "3. 耀斑去除",
"view_module": "src.new.views.step3_view",
"view_class": "Step3View",
"service_module": "src.new.services.placeholder_service",
"service_func": "execute_placeholder",
"service_module": "src.new.services.step3_service",
"service_func": "execute_step3",
},
{
"id": "step4",
"name": "4. 采样点布设",
"view_module": "src.new.views.step4_view",
"view_class": "Step4View",
"service_module": "src.new.services.placeholder_service",
"service_func": "execute_placeholder",
"service_module": "src.new.services.step4_service",
"service_func": "execute_step4",
},
{
"id": "step5",
"name": "5. 数据清洗",
"view_module": "src.new.views.step5_view",
"view_class": "Step5View",
"service_module": "src.new.services.placeholder_service",
"service_func": "execute_placeholder",
"service_module": "src.new.services.step5_service",
"service_func": "execute_step5",
},
{
"id": "step6",
@ -233,7 +233,7 @@ class MainView(QMainWindow):
self._build_routes()
self._log("[Boot] MainView 初始化完成")
self._log(f"[Boot] 已注册 {len(ROUTES)} 条路由13 个 view 全部真实service 仍走占位")
self._log(f"[Boot] 已注册 {len(ROUTES)} 条路由13 个 view 全部真实,前 5 个步骤 service 已迁移")
# ------------------------------------------------------------------
# UI 布局:左侧导航 + 右侧 stacked + 底部日志

View File

@ -0,0 +1,170 @@
# -*- coding: utf-8 -*-
"""
Step2 后端计算服务(耀斑区域识别)
====================================
纯计算函数——绝对不引用 PyQt、绝对不引用 main_view、绝对不读写全局变量。它只
1. 从 ``config`` 字典读取参数;
2. 调用旧版 ``GlintDetectionStep.run`` 执行耀斑检测;
3. 返回结果字典 ``{status, output_path, message, mode}``。
调用入口(由 main_view 在后台 QThread 中调用):
execute_step2({
"img_path": "D:/ref.bsq", # 输入影像
"glint_wave": 750.0, # 耀斑检测波长 (nm)
"method": "otsu", # otsu/zscore/percentile/iqr/adaptive/multi_band
"max_area": 50, # 最大连通域面积可选0 表示不过滤)
"buffer_size": 10, # 岸边缓冲区大小可选0 表示不设置)
"water_mask_path": "D:/mask.dat", # 水域掩膜(可选)
"output_path": "D:/severe_glint_area.dat", # 耀斑掩膜输出
"work_dir": "D:/workspace", # 工作目录main_view 注入)
})
返回字典字段:
* ``status`` : "completed" | "skipped" | "error"
* ``output_path`` : 生成的 .dat 耀斑掩膜文件路径(失败时为 None
* ``message`` : 人类可读说明
* ``mode`` : "ndwi" | "otsu" | "zscore" | ...(实际调用的方法,便于 UI 提示)
注意:与 step1_service 一致,``status`` 沿用 "completed" 语义main_view._on_step_done
据此走 Service✓ 成功分支),对应用户契约中的 "success"
"""
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict
from src.core.steps.glint_detection_step import GlintDetectionStep
def _resolve_glint_dir(output_path: str | None, work_dir: str) -> Path:
"""根据 output_path / work_dir 计算耀斑检测输出目录"""
if output_path:
return Path(output_path).parent
return Path(work_dir) / "2_Glint_Detection"
def _clean_int_param(value: Any, default: int = 0) -> int | None:
"""SpinBox 的 0 值表示"不设置",转 None 透传给底层"""
if value is None:
return default
try:
v = int(value)
except (TypeError, ValueError):
return default
return v if v > 0 else None
def execute_step2(config: Dict[str, Any]) -> Dict[str, Any]:
"""Step 2 后端计算入口——纯函数
Args:
config: 由前端 view.get_config() 序列化、再经 main_view 注入 work_dir 的字典
Returns:
标准结果字典 ``{status, output_path, message, mode}``
"""
# ---------- 入参规整 ----------
img_path = config.get("img_path")
glint_wave = float(config.get("glint_wave", 750.0))
method = str(config.get("method", "otsu")).lower()
enabled = bool(config.get("enabled", True))
water_mask_path = config.get("water_mask_path")
output_path = config.get("output_path")
work_dir = config.get("work_dir") or "."
glint_dir = _resolve_glint_dir(output_path, work_dir)
# ---------- 提前失败检查 ----------
if not enabled:
return {
"status": "skipped",
"output_path": None,
"message": "用户禁用此步骤enabled=False",
"mode": method,
}
if not img_path:
return {
"status": "error",
"output_path": None,
"message": "未提供输入影像路径img_path",
"mode": method,
}
if not Path(img_path).exists():
return {
"status": "error",
"output_path": None,
"message": f"输入影像不存在: {img_path}",
"mode": method,
}
# ---------- 构建底层 kwargs可选字段None 一律不传,让底层走默认) ----------
kwargs: Dict[str, Any] = {
"img_path": img_path,
"glint_wave": glint_wave,
"method": method,
"water_mask_path": water_mask_path,
"glint_dir": glint_dir,
"output_path": output_path,
"callback": None, # 日志由 main_view 统一接管
}
max_area = _clean_int_param(config.get("max_area"))
if max_area is not None:
kwargs["max_area"] = max_area
buffer_size = _clean_int_param(config.get("buffer_size"))
if buffer_size is not None:
kwargs["buffer_size"] = buffer_size
# multi_band 专用参数(如有)
if method == "multi_band":
for key in ("multi_band_waves", "sub_method", "weights"):
if config.get(key) is not None:
kwargs[key] = config[key]
# ---------- 执行(包一层 try/except 把异常转 dict避免炸线程 ----------
try:
result_path = GlintDetectionStep.run(**kwargs)
except FileNotFoundError as e:
return {
"status": "error",
"output_path": None,
"message": f"文件不存在: {e}",
"mode": method,
}
except ValueError as e:
return {
"status": "error",
"output_path": None,
"message": f"参数错误: {e}",
"mode": method,
}
except Exception as e: # noqa: BLE001 —— service 层兜底捕获所有
return {
"status": "error",
"output_path": None,
"message": f"{type(e).__name__}: {e}",
"mode": method,
}
# ---------- 成功路径 ----------
p = Path(result_path)
if not p.exists():
return {
"status": "error",
"output_path": None,
"message": f"GlintDetectionStep.run 未生成文件: {result_path}",
"mode": method,
}
return {
"status": "completed",
"output_path": str(p).replace("\\", "/"),
"message": f"耀斑掩膜已生成: {p.name}",
"mode": method,
}

View File

@ -0,0 +1,207 @@
# -*- coding: utf-8 -*-
"""
Step3 后端计算服务(耀斑去除)
====================================
纯计算函数——绝对不引用 PyQt、绝对不引用 main_view、绝对不读写全局变量。它只
1. 从 ``config`` 字典读取参数;
2. 调用旧版 ``GlintRemovalStep.run`` 执行去耀斑4 种方法Goodman/Kutser/Hedley/SUGAR
3. 返回结果字典 ``{status, output_path, message, mode}``。
调用入口(由 main_view 在后台 QThread 中调用):
execute_step3({
"img_path": "D:/ref.bsq", # 输入影像(去耀斑前)
"method": "goodman", # goodman/kutser/hedley/sugar
"enabled": True,
"interpolate_zeros": False, # 是否先做 0 值像素插值
"interpolation_method": "bilinear",
"water_mask_path": "D:/mask.dat", # 水域掩膜(可选)
"output_path": "D:/deglint_image.bsq",
# 方法专属参数(按 method 任选一组)
"nir_lower": 65, "nir_upper": 91, "goodman_A": 1.9e-5, "goodman_B": 0.1, # goodman
"oxy_band": 38, "lower_oxy": 36, "upper_oxy": 49, "nir_band": 47, # kutser
"hedley_nir_band": 47, # hedley
"sugar_iter": 3, "sugar_sigma": 1.0, "sugar_estimate_background": True,
"sugar_glint_mask_method": "cdf", "sugar_termination_thresh": 20.0,
"sugar_bounds": [(1, 2)], # sugar
"work_dir": "D:/workspace", # 工作目录main_view 注入)
})
返回字典字段:
* ``status`` : "completed" | "skipped" | "error"
* ``output_path`` : 生成的 .bsq 去耀斑影像路径(失败时为 None
* ``message`` : 人类可读说明
* ``mode`` : 实际调用的去耀斑方法名,便于 UI 提示
"""
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict
from src.core.steps.glint_removal_step import GlintRemovalStep
def _resolve_dirs(output_path: str | None, work_dir: str) -> tuple[Path, Path]:
"""根据 output_path / work_dir 计算 (deglint_dir, water_mask_dir)"""
if output_path:
deglint_dir = Path(output_path).parent
else:
deglint_dir = Path(work_dir) / "3_Deglint"
water_mask_dir = Path(work_dir) / "1_water_mask"
return deglint_dir, water_mask_dir
def _normalize_method(method: str) -> str:
"""方法名标准化(与 GlintRemovalStep.run 内部规则保持一致)"""
raw = str(method).lower()
if "kutser" in raw:
return "kutser"
if "goodman" in raw:
return "goodman"
if "hedley" in raw:
return "hedley"
if "sugar" in raw:
return "sugar"
return raw
def _build_method_kwargs(method: str, config: Dict[str, Any]) -> Dict[str, Any]:
"""按 method 从 config 中抽取对应的方法专属参数"""
if method == "goodman":
return {
"nir_lower": int(config.get("nir_lower", 65)),
"nir_upper": int(config.get("nir_upper", 91)),
"goodman_A": float(config.get("goodman_A", 0.000019)),
"goodman_B": float(config.get("goodman_B", 0.1)),
}
if method == "kutser":
return {
"oxy_band": int(config.get("oxy_band", 38)),
"lower_oxy": int(config.get("lower_oxy", 36)),
"upper_oxy": int(config.get("upper_oxy", 49)),
"nir_band": int(config.get("nir_band", 47)),
}
if method == "hedley":
return {
"hedley_nir_band": int(config.get("hedley_nir_band", 47)),
}
if method == "sugar":
bounds = config.get("sugar_bounds")
if bounds is None or not isinstance(bounds, (list, tuple)):
bounds = [(1, 2)]
return {
"sugar_iter": int(config.get("sugar_iter", 3)),
"sugar_sigma": float(config.get("sugar_sigma", 1.0)),
"sugar_estimate_background": bool(config.get("sugar_estimate_background", True)),
"sugar_glint_mask_method": str(config.get("sugar_glint_mask_method", "cdf")),
"sugar_termination_thresh": float(config.get("sugar_termination_thresh", 20.0)),
"sugar_bounds": bounds,
}
return {}
def execute_step3(config: Dict[str, Any]) -> Dict[str, Any]:
"""Step 3 后端计算入口——纯函数
Args:
config: 由前端 view.get_config() 序列化、再经 main_view 注入 work_dir 的字典
Returns:
标准结果字典 ``{status, output_path, message, mode}``
"""
# ---------- 入参规整 ----------
img_path = config.get("img_path")
raw_method = config.get("method", "goodman")
method = _normalize_method(raw_method)
enabled = bool(config.get("enabled", True))
interpolate_zeros = bool(config.get("interpolate_zeros", False))
interpolation_method = str(config.get("interpolation_method", "bilinear"))
water_mask_path = config.get("water_mask_path")
output_path = config.get("output_path")
work_dir = config.get("work_dir") or "."
deglint_dir, water_mask_dir = _resolve_dirs(output_path, work_dir)
# ---------- 提前失败检查 ----------
if not enabled:
return {
"status": "skipped",
"output_path": img_path,
"message": "用户禁用此步骤enabled=False保留原始影像",
"mode": method,
}
if not img_path:
return {
"status": "error",
"output_path": None,
"message": "未提供输入影像路径img_path",
"mode": method,
}
if not Path(img_path).exists():
return {
"status": "error",
"output_path": None,
"message": f"输入影像不存在: {img_path}",
"mode": method,
}
# ---------- 构建底层 kwargs ----------
method_kwargs = _build_method_kwargs(method, config)
kwargs: Dict[str, Any] = {
"img_path": img_path,
"method": method,
"water_mask": water_mask_path,
"interpolate_zeros": interpolate_zeros,
"interpolation_method": interpolation_method,
"deglint_dir": deglint_dir,
"water_mask_dir": water_mask_dir,
"output_path": output_path,
"callback": None, # 日志由 main_view 统一接管
}
kwargs.update(method_kwargs)
# ---------- 执行(包一层 try/except 把异常转 dict避免炸线程 ----------
try:
result_path = GlintRemovalStep.run(**kwargs)
except FileNotFoundError as e:
return {
"status": "error",
"output_path": None,
"message": f"文件不存在: {e}",
"mode": method,
}
except ValueError as e:
return {
"status": "error",
"output_path": None,
"message": f"参数错误: {e}",
"mode": method,
}
except Exception as e: # noqa: BLE001 —— service 层兜底捕获所有
return {
"status": "error",
"output_path": None,
"message": f"{type(e).__name__}: {e}",
"mode": method,
}
# ---------- 成功路径 ----------
p = Path(result_path)
if not p.exists():
return {
"status": "error",
"output_path": None,
"message": f"GlintRemovalStep.run 未生成文件: {result_path}",
"mode": method,
}
return {
"status": "completed",
"output_path": str(p).replace("\\", "/"),
"message": f"去耀斑影像已生成: {p.name}",
"mode": method,
}

View File

@ -0,0 +1,146 @@
# -*- coding: utf-8 -*-
"""
Step4 后端计算服务(采样点布设)
====================================
纯计算函数——绝对不引用 PyQt、绝对不引用 main_view、绝对不读写全局变量。它只
1. 从 ``config`` 字典读取参数;
2. 调用旧版 ``PredictionStep.generate_sampling_points`` 在去耀斑影像上按规则
生成采样点并提取平均光谱(自适应 / 固定间隔);
3. 返回结果字典 ``{status, output_path, message, mode}``。
调用入口(由 main_view 在后台 QThread 中调用):
execute_step4({
"deglint_img_path": "D:/deglint_image.bsq", # 去耀斑影像
"water_mask_path": "D:/water_mask.dat", # 水域掩膜(可选)
"interval": 50, # 采样点间隔 (像素)
"sample_radius": 5, # 采样半径 (像素)
"chunk_size": 1000, # 分块大小(控制内存)
"use_adaptive_sampling": True, # 是否按水体宽度自适应
"enabled": True,
"output_path": "D:/sampling_spectra.csv", # 输出采样点 CSV
"work_dir": "D:/workspace", # 工作目录main_view 注入)
})
返回字典字段:
* ``status`` : "completed" | "skipped" | "error"
* ``output_path`` : 生成的 .csv 采样点光谱文件路径(失败时为 None
* ``message`` : 人类可读说明
* ``mode`` : "adaptive" | "fixed"(采样模式,便于 UI 提示)
"""
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict
from src.core.steps.prediction_step import PredictionStep
def _resolve_output_dir(output_path: str | None, work_dir: str) -> Path:
"""根据 output_path / work_dir 计算采样点输出目录"""
if output_path:
return Path(output_path).parent
return Path(work_dir) / "4_Sampling"
def execute_step4(config: Dict[str, Any]) -> Dict[str, Any]:
"""Step 4 后端计算入口——纯函数
Args:
config: 由前端 view.get_config() 序列化、再经 main_view 注入 work_dir 的字典
Returns:
标准结果字典 ``{status, output_path, message, mode}``
"""
# ---------- 入参规整 ----------
deglint_img_path = config.get("deglint_img_path")
water_mask_path = config.get("water_mask_path")
interval = int(config.get("interval", 50))
sample_radius = int(config.get("sample_radius", 5))
chunk_size = int(config.get("chunk_size", 1000))
use_adaptive_sampling = bool(config.get("use_adaptive_sampling", True))
enabled = bool(config.get("enabled", True))
output_path = config.get("output_path")
work_dir = config.get("work_dir") or "."
output_dir = _resolve_output_dir(output_path, work_dir)
mode = "adaptive" if use_adaptive_sampling else "fixed"
# ---------- 提前失败检查 ----------
if not enabled:
return {
"status": "skipped",
"output_path": None,
"message": "用户禁用此步骤enabled=False",
"mode": mode,
}
if not deglint_img_path:
return {
"status": "error",
"output_path": None,
"message": "未提供去耀斑影像路径deglint_img_path",
"mode": mode,
}
if not Path(deglint_img_path).exists():
return {
"status": "error",
"output_path": None,
"message": f"去耀斑影像不存在: {deglint_img_path}",
"mode": mode,
}
# ---------- 执行(包一层 try/except 把异常转 dict避免炸线程 ----------
try:
result_path = PredictionStep.generate_sampling_points(
deglint_img_path=deglint_img_path,
interval=interval,
sample_radius=sample_radius,
chunk_size=chunk_size,
water_mask_path=water_mask_path,
glint_mask_path=None, # 耀斑掩膜由 step2 产出,新架构未在 view 暴露
output_dir=output_dir,
use_adaptive_sampling=use_adaptive_sampling,
callback=None, # 日志由 main_view 统一接管
)
except FileNotFoundError as e:
return {
"status": "error",
"output_path": None,
"message": f"文件不存在: {e}",
"mode": mode,
}
except ValueError as e:
return {
"status": "error",
"output_path": None,
"message": f"参数错误: {e}",
"mode": mode,
}
except Exception as e: # noqa: BLE001 —— service 层兜底捕获所有
return {
"status": "error",
"output_path": None,
"message": f"{type(e).__name__}: {e}",
"mode": mode,
}
# ---------- 成功路径 ----------
p = Path(result_path)
if not p.exists():
return {
"status": "error",
"output_path": None,
"message": f"PredictionStep.generate_sampling_points 未生成文件: {result_path}",
"mode": mode,
}
return {
"status": "completed",
"output_path": str(p).replace("\\", "/"),
"message": f"采样点光谱数据已保存: {p.name}",
"mode": mode,
}

View File

@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
"""
Step5 后端计算服务(数据清洗)
====================================
纯计算函数——绝对不引用 PyQt、绝对不引用 main_view、绝对不读写全局变量。它只
1. 从 ``config`` 字典读取参数;
2. 调用旧版 ``DataPreparationStep.process_csv`` 读取水质 CSV
筛选剔除异常值后写出新的 CSV
3. 返回结果字典 ``{status, output_path, message, mode}``。
调用入口(由 main_view 在后台 QThread 中调用):
execute_step5({
"csv_path": "D:/water_quality.csv", # 输入水质参数 CSV
"enabled": True,
"output_path": "D:/processed_data.csv", # 处理后输出 CSV
"work_dir": "D:/workspace", # 工作目录main_view 注入)
})
返回字典字段:
* ``status`` : "completed" | "skipped" | "error"
* ``output_path`` : 生成的 .csv 清洗后文件路径(失败时为 None
* ``message`` : 人类可读说明
* ``mode`` : "csv_clean"(便于 UI 提示)
"""
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict
from src.core.steps.data_preparation_step import DataPreparationStep
def _resolve_output_dir(output_path: str | None, work_dir: str) -> Path:
"""根据 output_path / work_dir 计算清洗后 CSV 输出目录"""
if output_path:
return Path(output_path).parent
return Path(work_dir) / "5_Data_Cleaning"
def execute_step5(config: Dict[str, Any]) -> Dict[str, Any]:
"""Step 5 后端计算入口——纯函数
Args:
config: 由前端 view.get_config() 序列化、再经 main_view 注入 work_dir 的字典
Returns:
标准结果字典 ``{status, output_path, message, mode}``
"""
# ---------- 入参规整 ----------
csv_path = config.get("csv_path")
enabled = bool(config.get("enabled", True))
output_path = config.get("output_path")
work_dir = config.get("work_dir") or "."
output_dir = _resolve_output_dir(output_path, work_dir)
mode = "csv_clean"
# ---------- 提前失败检查 ----------
if not enabled:
return {
"status": "skipped",
"output_path": None,
"message": "用户禁用此步骤enabled=False",
"mode": mode,
}
if not csv_path:
return {
"status": "error",
"output_path": None,
"message": "未提供水质参数 CSV 路径csv_path",
"mode": mode,
}
if not Path(csv_path).exists():
return {
"status": "error",
"output_path": None,
"message": f"水质参数 CSV 不存在: {csv_path}",
"mode": mode,
}
# ---------- 显式构造 output_path如未指定----------
# process_csv 内部会用 output_dir/processed_data.csv 自行拼接,
# 此处显式指定 output_path 以保持契约一致。
if not output_path:
output_path = str(output_dir / "processed_data.csv").replace("\\", "/")
# ---------- 执行(包一层 try/except 把异常转 dict避免炸线程 ----------
try:
result_path = DataPreparationStep.process_csv(
csv_path=csv_path,
output_dir=output_dir,
callback=None, # 日志由 main_view 统一接管
)
except FileNotFoundError as e:
return {
"status": "error",
"output_path": None,
"message": f"文件不存在: {e}",
"mode": mode,
}
except ValueError as e:
return {
"status": "error",
"output_path": None,
"message": f"参数错误: {e}",
"mode": mode,
}
except Exception as e: # noqa: BLE001 —— service 层兜底捕获所有
return {
"status": "error",
"output_path": None,
"message": f"{type(e).__name__}: {e}",
"mode": mode,
}
# ---------- 成功路径 ----------
p = Path(result_path)
if not p.exists():
return {
"status": "error",
"output_path": None,
"message": f"DataPreparationStep.process_csv 未生成文件: {result_path}",
"mode": mode,
}
return {
"status": "completed",
"output_path": str(p).replace("\\", "/"),
"message": f"清洗后 CSV 已保存: {p.name}",
"mode": mode,
}