Files
WQ_GUI/_smoke_new_arch.py

343 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
三层冒烟端到端模块化新架构src/new/
Layer 1 service 单元execute_step1 在各种错误路径返回正确 dict
Layer 2 view offscreenStep1View UI 行为符合契约(不依赖 main_router
Layer 3 end-to-endMainRouter 创建 → 切到 step1 → 点击 → 日志验证
运行方式(项目根)::
python _smoke_new_arch.py
"""
from __future__ import annotations
import os
import sys
import tempfile
from pathlib import Path
# Add the project root to sys.path: the script may be launched directly from
# the root directory, in which case the ``src`` package must be importable.
_PROJECT_ROOT = Path(__file__).resolve().parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

# Default Qt to the offscreen platform; an already-set value is left alone.
os.environ.setdefault("QT_QPA_PLATFORM", "offscreen")

# cmd.exe defaults to GBK, which makes Chinese text / emoji raise
# UnicodeEncodeError. Force stdout / stderr to UTF-8 (only for streams that
# expose a ``reconfigure`` method).
for _stream in (sys.stdout, sys.stderr):
    if hasattr(_stream, "reconfigure"):
        try:
            _stream.reconfigure(encoding="utf-8", errors="replace")
        except Exception:  # noqa: BLE001
            # Deliberate best-effort: a stream that refuses reconfiguration
            # must not prevent the smoke run from starting.
            pass

# Per-line result markers. They must differ, otherwise a passing and a failing
# check print identically; FAIL matches the marker used in the failure summary.
PASS = "✅"
FAIL = "❌"

# One entry per check: (layer, name, ok, detail) -- the 4-tuple that report()
# appends and main() unpacks.
results: list[tuple[str, str, bool, str]] = []
def report(layer: str, name: str, ok: bool, detail: str = ""):
    """Record one check outcome and echo it to stdout.

    Args:
        layer: Short layer label printed in brackets (callers pass "L1"/"L2"/"L3").
        name: Human-readable name of the check.
        ok: Whether the check passed; selects the PASS or FAIL marker.
        detail: Optional extra text, printed after the name when non-empty.
    """
    results.append((layer, name, ok, detail))
    marker = PASS if ok else FAIL
    suffix = f" —— {detail}" if detail else ""
    print(f"{marker} [{layer}] {name}{suffix}")
# =========================================================================
# Layer 1: service 单元测试
# =========================================================================
def smoke_service():
    """Layer 1: call the step-1 service functions directly and report results.

    First checks ``_resolve_mode`` for four configurations, then drives
    ``execute_step1`` through three configurations expected to yield
    ``status == "error"`` with a specific ``mode``, and finally makes one call
    that carries a ``work_dir`` key and only checks that a dict with a
    ``status`` key comes back. Every outcome is recorded through ``report``.
    """
    rule = "=" * 70
    print("\n" + rule)
    print("Layer 1 —— service 单元测试src.new.services.step1_service")
    print(rule)

    from src.new.services.step1_service import execute_step1, _resolve_mode

    # ---- _resolve_mode: pure logic ----
    # (check name, config, expected mode, detail text)
    mode_cases = [
        ("_resolve_mode use_ndwi=True",
         {"use_ndwi": True, "mask_path": ""}, "ndwi", "→ ndwi"),
        ("_resolve_mode mask=.shp",
         {"use_ndwi": False, "mask_path": "x.shp"}, "shp_rasterize", "→ shp_rasterize"),
        ("_resolve_mode mask=.dat",
         {"use_ndwi": False, "mask_path": "x.dat"}, "existing_mask", "→ existing_mask"),
        ("_resolve_mode mask=None",
         {"use_ndwi": False, "mask_path": None}, "existing_mask", "→ existing_mask (默认)"),
    ]
    for title, config, wanted, note in mode_cases:
        report("L1", title, _resolve_mode(config) == wanted, note)

    # ---- execute_step1: error paths ----
    # (check-name prefix, config, expected mode)
    error_cases = [
        # Path 1: use_ndwi=True but img_path does not exist.
        ("execute_step1 NDWI + img_path 不存在",
         {
             "use_ndwi": True,
             "img_path": "D:/__no_such_image__.dat",
             "ndwi_threshold": 0.4,
         },
         "ndwi"),
        # Path 2: use_ndwi=False but mask_path does not exist.
        ("execute_step1 existing mask 不存在",
         {
             "use_ndwi": False,
             "mask_path": "D:/__no_such_mask__.dat",
             "img_path": "D:/__no_such_image__.dat",
         },
         "existing_mask"),
        # Path 3: use_ndwi=False with a .shp mask; img_path deliberately omitted.
        ("execute_step1 shp 但 img_path 缺",
         {
             "use_ndwi": False,
             "mask_path": "D:/__no_such_mask__.shp",
         },
         "shp_rasterize"),
    ]
    for prefix, config, wanted_mode in error_cases:
        outcome = execute_step1(config)
        report("L1", f"{prefix} → status=error",
               outcome.get("status") == "error",
               f"status={outcome.get('status')}, message={outcome.get('message')!r}")
        report("L1", f"{prefix} → mode={wanted_mode}",
               outcome.get("mode") == wanted_mode,
               f"mode={outcome.get('mode')}")

    # Path 4: an injected work_dir must not make the call raise.
    outcome = execute_step1({
        "use_ndwi": True,
        "img_path": "D:/__no_such_image__.dat",
        "work_dir": "D:/workspace",
    })
    report("L1", "execute_step1 接受 work_dir 注入(不会抛)",
           isinstance(outcome, dict) and "status" in outcome,
           f"status={outcome.get('status')}")
# =========================================================================
# Layer 2: view offscreen 测试
# =========================================================================
def smoke_view():
    """Layer 2: exercise ``Step1View`` on its own and report results.

    Reuses the running ``QApplication`` or creates one, instantiates the view
    without a parent, and runs the widget / mode / config checks in
    ``_check_step1_view``. The temporary work directory created for the checks
    is removed afterwards and the view is scheduled for deletion, even when a
    check raises.
    """
    # Local import: only this function needs shutil, for temp-dir cleanup.
    import shutil

    print("\n" + "=" * 70)
    print("Layer 2 —— view offscreen 测试src.new.views.step1_view")
    print("=" * 70)

    from PyQt5.QtWidgets import QApplication
    # Held in a local for the duration of the checks.
    app = QApplication.instance() or QApplication(sys.argv)  # noqa: F841
    from src.new.views.step1_view import Step1View
    from src.new.core.base_view import BaseView

    # ---- class contract ----
    report("L2", "Step1View 继承 BaseView",
           issubclass(Step1View, BaseView),
           "MRO[0] == 'Step1View', MRO[1] == 'BaseView'")

    # ---- instantiation ----
    view = Step1View()
    report("L2", "Step1View 实例化成功(无异常)",
           view is not None,
           f"view={view!r}")

    # mkdtemp leaves cleanup to the caller, so remove the directory ourselves.
    tmpdir = tempfile.mkdtemp(prefix="wqgui_smoke_").replace("\\", "/")
    try:
        _check_step1_view(view, tmpdir)
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
        view.deleteLater()


def _check_step1_view(view, tmpdir: str) -> None:
    """Run the Step1View behaviour checks on an already-constructed view.

    Args:
        view: The ``Step1View`` instance under test (created without a parent).
        tmpdir: Forward-slash path of an existing scratch directory, used as
            the work directory handed to the view.
    """
    # ---- child widgets must be present ----
    # Nine required widgets; "hint" is checked only when the view defines it,
    # so no name is ever reported twice.
    widget_names = [
        "use_existing_radio", "use_ndwi_radio",
        "mask_file", "img_file",
        "ndwi_group", "ndwi_threshold",
        "output_file", "enable_checkbox", "run_btn",
    ]
    if hasattr(view, "hint"):
        widget_names.insert(widget_names.index("enable_checkbox"), "hint")
    for widget_name in widget_names:
        # getattr with a default: a missing widget becomes a failed check
        # instead of an AttributeError that aborts the whole run.
        report("L2", f"子控件 {widget_name} 非空",
               getattr(view, widget_name, None) is not None)

    # ---- default state: existing-mask mode ----
    report("L2", "默认 existing 模式mask_file.isHidden() == False",
           view.mask_file.isHidden() is False)
    report("L2", "默认 existing 模式ndwi_group.isHidden() == True",
           view.ndwi_group.isHidden() is True)
    report("L2", "默认 existing 模式output_file.isHidden() == True",
           view.output_file.isHidden() is True)

    # ---- switch to NDWI mode ----
    view.use_ndwi_radio.setChecked(True)
    report("L2", "NDWI 模式mask_file.isHidden() == True",
           view.mask_file.isHidden() is True)
    report("L2", "NDWI 模式ndwi_group.isHidden() == False",
           view.ndwi_group.isHidden() is False)
    report("L2", "NDWI 模式output_file.isHidden() == False",
           view.output_file.isHidden() is False)

    # ---- update_work_directory auto-fills the output path (NDWI mode) ----
    view.update_work_directory(tmpdir)
    expected = f"{tmpdir}/1_water_mask/water_mask_out.dat"
    actual = view.output_file.get_path()
    report("L2", "NDWI 模式 + update_work_directory 自动填输出路径",
           actual == expected,
           f"期望={expected!r}, 实际={actual!r}")

    # ---- get_config in both modes ----
    # NDWI mode
    cfg_ndwi = view.get_config()
    report("L2", "get_config NDWI 模式 use_ndwi=True",
           cfg_ndwi.get("use_ndwi") is True)
    report("L2", "get_config NDWI 模式 mask_path is None",
           cfg_ndwi.get("mask_path") is None)
    report("L2", "get_config NDWI 模式 output_path 已填",
           cfg_ndwi.get("output_path") == expected,
           f"output_path={cfg_ndwi.get('output_path')!r}")
    # existing-mask mode
    view.use_existing_radio.setChecked(True)
    cfg_existing = view.get_config()
    report("L2", "get_config existing 模式 use_ndwi=False",
           cfg_existing.get("use_ndwi") is False)
    report("L2", "get_config existing 模式 mask_path 来自 mask_file默认空串",
           cfg_existing.get("mask_path") == "",
           f"mask_path={cfg_existing.get('mask_path')!r}")
    report("L2", "get_config existing 模式 output_path is None",
           cfg_existing.get("output_path") is None)

    # ---- set_config writes values back into the widgets ----
    view.set_config({
        "use_ndwi": True,
        "ndwi_threshold": 0.55,
        "img_path": "D:/test/img.dat",
        "output_path": "D:/test/mask.dat",
    })
    report("L2", "set_config 后 use_ndwi=True",
           view.use_ndwi_radio.isChecked() is True)
    report("L2", "set_config 后 ndwi_threshold=0.55",
           abs(view.ndwi_threshold.value() - 0.55) < 1e-6,
           f"实际={view.ndwi_threshold.value()}")
    report("L2", "set_config 后 img_file 路径",
           view.img_file.get_path() == "D:/test/img.dat",
           f"实际={view.img_file.get_path()!r}")

    # ---- update_from_config stores both fields ----
    view.update_from_config(tmpdir, pipeline={"k": "v"})
    report("L2", "update_from_config work_dir 已缓存",
           view.work_dir == tmpdir,
           f"work_dir={view.work_dir!r}")
    report("L2", "update_from_config pipeline 已缓存",
           view.pipeline == {"k": "v"},
           f"pipeline={view.pipeline!r}")

    # ---- _on_run_clicked is expected to raise RuntimeError here ----
    # The view was created without a parent, so the check expects the click
    # handler to fail to find run_single_step up the parent chain.
    try:
        view._on_run_clicked()
        report("L2", "_on_run_clicked 父链缺失应抛 RuntimeError", False, "未抛异常")
    except RuntimeError as e:
        report("L2", "_on_run_clicked 父链缺失抛 RuntimeError", True, str(e)[:80])
# =========================================================================
# Layer 3: end-to-end 测试
# =========================================================================
def smoke_e2e():
    """Layer 3: drive ``MainRouter`` end to end and report results.

    Builds the router window, checks the route / navigation / stacked-widget
    counts, clicks the step1 run button in NDWI mode with a non-existent image
    path, waits a fixed time with a local event loop, and inspects the log
    text. Then repeats a shorter click-and-wait cycle for the step2
    placeholder view before closing the window.
    """
    rule = "=" * 70
    print("\n" + rule)
    print("Layer 3 —— end-to-endMainRouter 路由 + TaskWorker 调度)")
    print(rule)

    from PyQt5.QtCore import QEventLoop, QTimer
    from PyQt5.QtWidgets import QApplication
    qt_app = QApplication.instance() or QApplication(sys.argv)  # noqa: F841
    from src.new.main_router import MainRouter, ROUTES

    def spin_events(timeout_ms):
        """Run a local Qt event loop until the single-shot timer quits it."""
        waiter = QEventLoop()
        QTimer.singleShot(timeout_ms, waiter.quit)
        waiter.exec_()

    router = MainRouter()
    report("L3", "MainRouter 实例化成功", router is not None)

    route_total = len(ROUTES)
    report("L3", "ROUTES 共 13 条", route_total == 13, f"实际={route_total}")
    nav_total = router.nav_list.count()
    report("L3", "nav_list 项数 == 13", nav_total == 13,
           f"count={nav_total}")
    page_total = router.stacked.count()
    report("L3", "stacked 子部件数 == 13", page_total == 13,
           f"count={page_total}")
    report("L3", "默认 currentRow == 0step1", router.nav_list.currentRow() == 0)

    # ---- switch to step1 ----
    router.nav_list.setCurrentRow(0)
    step1 = router._views.get("step1")
    step1_type = type(step1).__name__
    report("L3", "_views['step1'] 是真实 Step1View",
           step1_type == "Step1View",
           f"type={step1_type}")

    # ---- simulate a click on step1.run_btn ----
    # NDWI mode with a non-existent img_path; the log checks below look for
    # the error marker and the mode tag.
    step1.use_ndwi_radio.setChecked(True)
    step1.img_file.set_path("D:/__no_such_img__.dat")
    step1.run_btn.click()
    # Give the TaskWorker time to finish and signal back to the main thread.
    spin_events(2000)

    captured = router.log_text.toPlainText()
    report("L3", "日志含 Router 收到 step1 请求",
           "收到 step1 请求" in captured,
           "→ 找 ['Router] 收到 step1 请求']")
    report("L3", "日志含 Service✗ 错误状态",
           "[Service✗]" in captured,
           "→ 找 ['[Service✗]']")
    report("L3", "日志含 mode=ndwi 标记",
           "mode=ndwi" in captured,
           "→ 找 ['mode=ndwi']")

    # ---- switch to the step2 placeholder ----
    router.nav_list.setCurrentRow(1)
    step2 = router._views.get("step2")
    step2_type = type(step2).__name__
    report("L3", "_views['step2'] 是占位 PlaceholderView",
           step2_type == "PlaceholderView",
           f"type={step2_type}")

    # Drop the earlier log output before triggering step2.
    router.log_text.clear()
    step2.run_btn.click()
    spin_events(1500)

    captured = router.log_text.toPlainText()
    report("L3", "step2 占位日志含 Router 收到 step2 请求",
           "收到 step2 请求" in captured)
    report("L3", "step2 占位日志含 Service… not_implemented",
           "not_implemented" in captured,
           f"log 片段:{captured[-200:]!r}")

    router.close()
# =========================================================================
# 入口
# =========================================================================
def main():
    """Run the three smoke layers in order and print a summary.

    Exits the process with status 1 when at least one recorded check failed;
    otherwise prints the all-passed line and returns normally.
    """
    smoke_service()
    smoke_view()
    smoke_e2e()

    print("\n" + "=" * 70)
    failures = [entry for entry in results if not entry[2]]
    total = len(results)
    passed = total - len(failures)
    print(f"汇总:{passed}/{total} 通过")

    if not failures:
        print("🎉 全部通过")
        return

    print("\n失败明细:")
    for layer, name, _ok, detail in failures:
        print(f" ❌ [{layer}] {name} —— {detail}")
    sys.exit(1)


# Run the smoke suite only when this file is executed as a script.
if __name__ == "__main__":
    main()