# -*- coding: utf-8 -*- """ 三层冒烟:端到端模块化新架构(src/new/) Layer 1 service 单元:execute_step1 在各种错误路径返回正确 dict Layer 2 view offscreen:Step1View UI 行为符合契约(不依赖 main_view) Layer 3 end-to-end:MainView 创建 → 切到 step1 → 点击 → 日志验证 运行方式(项目根):: python _smoke_new_arch.py """ from __future__ import annotations import os import sys import tempfile from pathlib import Path # 把项目根加进 sys.path(脚本可能直接从根目录运行) _PROJECT_ROOT = Path(__file__).resolve().parent if str(_PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(_PROJECT_ROOT)) os.environ.setdefault("QT_QPA_PLATFORM", "offscreen") # cmd.exe 默认 GBK 会让中文 / emoji 爆 UnicodeEncodeError # 强制 stdout / stderr 走 utf-8(仅在 stdout 有 reconfigure 接口时生效) for _stream in (sys.stdout, sys.stderr): if hasattr(_stream, "reconfigure"): try: _stream.reconfigure(encoding="utf-8", errors="replace") except Exception: # noqa: BLE001 pass PASS = "✅" FAIL = "❌" results: list[tuple[str, bool, str]] = [] def report(layer: str, name: str, ok: bool, detail: str = ""): tag = PASS if ok else FAIL results.append((layer, name, ok, detail)) print(f"{tag} [{layer}] {name}" + (f" —— {detail}" if detail else "")) # ========================================================================= # Layer 1: service 单元测试 # ========================================================================= def smoke_service(): print("\n" + "=" * 70) print("Layer 1 —— service 单元测试(src.new.services.step1_service)") print("=" * 70) from src.new.services.step1_service import execute_step1, _resolve_mode # ---- _resolve_mode 纯逻辑 ---- report("L1", "_resolve_mode use_ndwi=True", _resolve_mode({"use_ndwi": True, "mask_path": ""}) == "ndwi", "→ ndwi") report("L1", "_resolve_mode mask=.shp", _resolve_mode({"use_ndwi": False, "mask_path": "x.shp"}) == "shp_rasterize", "→ shp_rasterize") report("L1", "_resolve_mode mask=.dat", _resolve_mode({"use_ndwi": False, "mask_path": "x.dat"}) == "existing_mask", "→ existing_mask") report("L1", "_resolve_mode mask=None", _resolve_mode({"use_ndwi": False, "mask_path": None}) == "existing_mask", "→ existing_mask (默认)") # ---- execute_step1 错误路径 ---- # 路径 1:use_ndwi=True 但 img_path 不存在 r = execute_step1({ "use_ndwi": True, "img_path": "D:/__no_such_image__.dat", "ndwi_threshold": 0.4, }) report("L1", "execute_step1 NDWI + img_path 不存在 → status=error", r.get("status") == "error", f"status={r.get('status')}, message={r.get('message')!r}") report("L1", "execute_step1 NDWI + img_path 不存在 → mode=ndwi", r.get("mode") == "ndwi", f"mode={r.get('mode')}") # 路径 2:use_ndwi=False 但 mask_path 不存在 r = execute_step1({ "use_ndwi": False, "mask_path": "D:/__no_such_mask__.dat", "img_path": "D:/__no_such_image__.dat", }) report("L1", "execute_step1 existing mask 不存在 → status=error", r.get("status") == "error", f"status={r.get('status')}, message={r.get('message')!r}") report("L1", "execute_step1 existing mask 不存在 → mode=existing_mask", r.get("mode") == "existing_mask", f"mode={r.get('mode')}") # 路径 3:use_ndwi=False + shp 但 img_path 缺 r = execute_step1({ "use_ndwi": False, "mask_path": "D:/__no_such_mask__.shp", # 故意不传 img_path }) report("L1", "execute_step1 shp 但 img_path 缺 → status=error", r.get("status") == "error", f"status={r.get('status')}, message={r.get('message')!r}") report("L1", "execute_step1 shp 但 img_path 缺 → mode=shp_rasterize", r.get("mode") == "shp_rasterize", f"mode={r.get('mode')}") # 路径 4:work_dir 注入后不会被强制转 str 报错 r = execute_step1({ "use_ndwi": True, "img_path": "D:/__no_such_image__.dat", "work_dir": "D:/workspace", }) report("L1", "execute_step1 接受 work_dir 注入(不会抛)", isinstance(r, dict) and "status" in r, f"status={r.get('status')}") # ========================================================================= # Layer 2: view offscreen 测试 # ========================================================================= def smoke_view(): print("\n" + "=" * 70) print("Layer 2 —— view offscreen 测试(src.new.views.step1_view)") print("=" * 70) from PyQt5.QtWidgets import QApplication app = QApplication.instance() or QApplication(sys.argv) from src.new.views.step1_view import Step1View from src.new.core.base_view import BaseView # ---- 类契约 ---- report("L2", "Step1View 继承 BaseView", issubclass(Step1View, BaseView), "MRO[0] == 'Step1View', MRO[1] == 'BaseView'") # ---- 实例化 ---- view = Step1View() report("L2", "Step1View 实例化成功(无异常)", view is not None, f"view={view!r}") # ---- 9 个子控件非空 ---- attrs = [ "use_existing_radio", "use_ndwi_radio", "mask_file", "img_file", "ndwi_group", "ndwi_threshold", "output_file", "hint" if hasattr(view, "hint") else "enable_checkbox", "enable_checkbox", "run_btn", ] for a in attrs: if a == "hint" and not hasattr(view, a): continue report("L2", f"子控件 {a} 非空", getattr(view, a) is not None) # ---- 默认状态:existing 模式 ---- report("L2", "默认 existing 模式:mask_file.isHidden() == False", view.mask_file.isHidden() is False) report("L2", "默认 existing 模式:ndwi_group.isHidden() == True", view.ndwi_group.isHidden() is True) report("L2", "默认 existing 模式:output_file.isHidden() == True", view.output_file.isHidden() is True) # ---- 切换到 NDWI 模式 ---- view.use_ndwi_radio.setChecked(True) report("L2", "NDWI 模式:mask_file.isHidden() == True", view.mask_file.isHidden() is True) report("L2", "NDWI 模式:ndwi_group.isHidden() == False", view.ndwi_group.isHidden() is False) report("L2", "NDWI 模式:output_file.isHidden() == False", view.output_file.isHidden() is False) # ---- update_work_directory 触发自动填充(NDWI 模式下)---- tmpdir = tempfile.mkdtemp(prefix="wqgui_smoke_").replace("\\", "/") view.update_work_directory(tmpdir) expected = f"{tmpdir}/1_water_mask/water_mask_out.dat" actual = view.output_file.get_path() report("L2", "NDWI 模式 + update_work_directory 自动填输出路径", actual == expected, f"期望={expected!r}, 实际={actual!r}") # ---- get_config 双模式断言 ---- # NDWI 模式 cfg_ndwi = view.get_config() report("L2", "get_config NDWI 模式 use_ndwi=True", cfg_ndwi.get("use_ndwi") is True) report("L2", "get_config NDWI 模式 mask_path is None", cfg_ndwi.get("mask_path") is None) report("L2", "get_config NDWI 模式 output_path 已填", cfg_ndwi.get("output_path") == expected, f"output_path={cfg_ndwi.get('output_path')!r}") # existing 模式 view.use_existing_radio.setChecked(True) cfg_existing = view.get_config() report("L2", "get_config existing 模式 use_ndwi=False", cfg_existing.get("use_ndwi") is False) report("L2", "get_config existing 模式 mask_path 来自 mask_file(默认空串)", cfg_existing.get("mask_path") == "", f"mask_path={cfg_existing.get('mask_path')!r}") report("L2", "get_config existing 模式 output_path is None", cfg_existing.get("output_path") is None) # ---- set_config 回灌 ---- view.set_config({ "use_ndwi": True, "ndwi_threshold": 0.55, "img_path": "D:/test/img.dat", "output_path": "D:/test/mask.dat", }) report("L2", "set_config 后 use_ndwi=True", view.use_ndwi_radio.isChecked() is True) report("L2", "set_config 后 ndwi_threshold=0.55", abs(view.ndwi_threshold.value() - 0.55) < 1e-6, f"实际={view.ndwi_threshold.value()}") report("L2", "set_config 后 img_file 路径", view.img_file.get_path() == "D:/test/img.dat", f"实际={view.img_file.get_path()!r}") # ---- update_from_config 双字段 ---- view.update_from_config(tmpdir, pipeline={"k": "v"}) report("L2", "update_from_config work_dir 已缓存", view.work_dir == tmpdir, f"work_dir={view.work_dir!r}") report("L2", "update_from_config pipeline 已缓存", view.pipeline == {"k": "v"}, f"pipeline={view.pipeline!r}") # ---- _on_run_clicked 父链缺失时 RuntimeError 预期 ---- # 此时 view.parent() 是 None(root 级 QWidget),必然找不到 run_single_step try: view._on_run_clicked() report("L2", "_on_run_clicked 父链缺失应抛 RuntimeError", False, "未抛异常") except RuntimeError as e: report("L2", "_on_run_clicked 父链缺失抛 RuntimeError", True, str(e)[:80]) view.deleteLater() # ========================================================================= # Layer 3: end-to-end 测试 # ========================================================================= def smoke_e2e(): print("\n" + "=" * 70) print("Layer 3 —— end-to-end(MainView 路由 + TaskWorker 调度)") print("=" * 70) from PyQt5.QtCore import QEventLoop, QTimer from PyQt5.QtWidgets import QApplication app = QApplication.instance() or QApplication(sys.argv) from src.new.main_view import MainView, ROUTES win = MainView() report("L3", "MainView 实例化成功", win is not None) report("L3", "ROUTES 共 13 条", len(ROUTES) == 13, f"实际={len(ROUTES)}") report("L3", "nav_list 项数 == 13", win.nav_list.count() == 13, f"count={win.nav_list.count()}") report("L3", "stacked 子部件数 == 13", win.stacked.count() == 13, f"count={win.stacked.count()}") report("L3", "默认 currentRow == 0(step1)", win.nav_list.currentRow() == 0) # ---- 切到 step1 ---- win.nav_list.setCurrentRow(0) view_step1 = win._views.get("step1") report("L3", "_views['step1'] 是真实 Step1View", type(view_step1).__name__ == "Step1View", f"type={type(view_step1).__name__}") # ---- 模拟点击 step1.run_btn(NDWI 模式 + 不存在的 img_path → status=error)---- view_step1.use_ndwi_radio.setChecked(True) view_step1.img_file.set_path("D:/__no_such_img__.dat") view_step1.run_btn.click() # 等 TaskWorker 跑完(offscreen + QThread 信号回到主线程) loop = QEventLoop() QTimer.singleShot(2000, loop.quit) loop.exec_() log_text = win.log_text.toPlainText() report("L3", "日志含 Router 收到 step1 请求", "收到 step1 请求" in log_text, "→ 找 ['Router] 收到 step1 请求']") report("L3", "日志含 Service✗ 错误状态", "[Service✗]" in log_text, "→ 找 ['[Service✗]']") report("L3", "日志含 mode=ndwi 标记", "mode=ndwi" in log_text, "→ 找 ['mode=ndwi']") # ---- 切到 step2 占位 ---- win.nav_list.setCurrentRow(1) view_step2 = win._views.get("step2") report("L3", "_views['step2'] 是占位 PlaceholderView", type(view_step2).__name__ == "PlaceholderView", f"type={type(view_step2).__name__}") # 清掉旧日志,重置 worker win.log_text.clear() view_step2.run_btn.click() loop = QEventLoop() QTimer.singleShot(1500, loop.quit) loop.exec_() log_text = win.log_text.toPlainText() report("L3", "step2 占位日志含 Router 收到 step2 请求", "收到 step2 请求" in log_text) report("L3", "step2 占位日志含 Service… not_implemented", "not_implemented" in log_text, f"log 片段:{log_text[-200:]!r}") win.close() # ========================================================================= # 入口 # ========================================================================= def main(): smoke_service() smoke_view() smoke_e2e() print("\n" + "=" * 70) total = len(results) passed = sum(1 for r in results if r[2]) print(f"汇总:{passed}/{total} 通过") if passed != total: print("\n失败明细:") for layer, name, ok, detail in results: if not ok: print(f" ❌ [{layer}] {name} —— {detail}") sys.exit(1) else: print("🎉 全部通过") if __name__ == "__main__": main()