343 lines
13 KiB
Python
343 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
三层冒烟:端到端模块化新架构(src/new/)
|
||
|
||
Layer 1 service 单元:execute_step1 在各种错误路径返回正确 dict
|
||
Layer 2 view offscreen:Step1View UI 行为符合契约(不依赖 main_view)
|
||
Layer 3 end-to-end:MainView 创建 → 切到 step1 → 点击 → 日志验证
|
||
|
||
运行方式(项目根)::
|
||
|
||
python _smoke_new_arch.py
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
import sys
|
||
import tempfile
|
||
from pathlib import Path
|
||
|
||
# 把项目根加进 sys.path(脚本可能直接从根目录运行)
|
||
_PROJECT_ROOT = Path(__file__).resolve().parent
|
||
if str(_PROJECT_ROOT) not in sys.path:
|
||
sys.path.insert(0, str(_PROJECT_ROOT))
|
||
|
||
os.environ.setdefault("QT_QPA_PLATFORM", "offscreen")
|
||
|
||
# cmd.exe 默认 GBK 会让中文 / emoji 爆 UnicodeEncodeError
|
||
# 强制 stdout / stderr 走 utf-8(仅在 stdout 有 reconfigure 接口时生效)
|
||
for _stream in (sys.stdout, sys.stderr):
|
||
if hasattr(_stream, "reconfigure"):
|
||
try:
|
||
_stream.reconfigure(encoding="utf-8", errors="replace")
|
||
except Exception: # noqa: BLE001
|
||
pass
|
||
|
||
PASS = "✅"
|
||
FAIL = "❌"
|
||
results: list[tuple[str, bool, str]] = []
|
||
|
||
|
||
def report(layer: str, name: str, ok: bool, detail: str = ""):
|
||
tag = PASS if ok else FAIL
|
||
results.append((layer, name, ok, detail))
|
||
print(f"{tag} [{layer}] {name}" + (f" —— {detail}" if detail else ""))
|
||
|
||
|
||
# =========================================================================
|
||
# Layer 1: service 单元测试
|
||
# =========================================================================
|
||
def smoke_service():
|
||
print("\n" + "=" * 70)
|
||
print("Layer 1 —— service 单元测试(src.new.services.step1_service)")
|
||
print("=" * 70)
|
||
|
||
from src.new.services.step1_service import execute_step1, _resolve_mode
|
||
|
||
# ---- _resolve_mode 纯逻辑 ----
|
||
report("L1", "_resolve_mode use_ndwi=True",
|
||
_resolve_mode({"use_ndwi": True, "mask_path": ""}) == "ndwi",
|
||
"→ ndwi")
|
||
report("L1", "_resolve_mode mask=.shp",
|
||
_resolve_mode({"use_ndwi": False, "mask_path": "x.shp"}) == "shp_rasterize",
|
||
"→ shp_rasterize")
|
||
report("L1", "_resolve_mode mask=.dat",
|
||
_resolve_mode({"use_ndwi": False, "mask_path": "x.dat"}) == "existing_mask",
|
||
"→ existing_mask")
|
||
report("L1", "_resolve_mode mask=None",
|
||
_resolve_mode({"use_ndwi": False, "mask_path": None}) == "existing_mask",
|
||
"→ existing_mask (默认)")
|
||
|
||
# ---- execute_step1 错误路径 ----
|
||
# 路径 1:use_ndwi=True 但 img_path 不存在
|
||
r = execute_step1({
|
||
"use_ndwi": True,
|
||
"img_path": "D:/__no_such_image__.dat",
|
||
"ndwi_threshold": 0.4,
|
||
})
|
||
report("L1", "execute_step1 NDWI + img_path 不存在 → status=error",
|
||
r.get("status") == "error",
|
||
f"status={r.get('status')}, message={r.get('message')!r}")
|
||
report("L1", "execute_step1 NDWI + img_path 不存在 → mode=ndwi",
|
||
r.get("mode") == "ndwi",
|
||
f"mode={r.get('mode')}")
|
||
|
||
# 路径 2:use_ndwi=False 但 mask_path 不存在
|
||
r = execute_step1({
|
||
"use_ndwi": False,
|
||
"mask_path": "D:/__no_such_mask__.dat",
|
||
"img_path": "D:/__no_such_image__.dat",
|
||
})
|
||
report("L1", "execute_step1 existing mask 不存在 → status=error",
|
||
r.get("status") == "error",
|
||
f"status={r.get('status')}, message={r.get('message')!r}")
|
||
report("L1", "execute_step1 existing mask 不存在 → mode=existing_mask",
|
||
r.get("mode") == "existing_mask",
|
||
f"mode={r.get('mode')}")
|
||
|
||
# 路径 3:use_ndwi=False + shp 但 img_path 缺
|
||
r = execute_step1({
|
||
"use_ndwi": False,
|
||
"mask_path": "D:/__no_such_mask__.shp",
|
||
# 故意不传 img_path
|
||
})
|
||
report("L1", "execute_step1 shp 但 img_path 缺 → status=error",
|
||
r.get("status") == "error",
|
||
f"status={r.get('status')}, message={r.get('message')!r}")
|
||
report("L1", "execute_step1 shp 但 img_path 缺 → mode=shp_rasterize",
|
||
r.get("mode") == "shp_rasterize",
|
||
f"mode={r.get('mode')}")
|
||
|
||
# 路径 4:work_dir 注入后不会被强制转 str 报错
|
||
r = execute_step1({
|
||
"use_ndwi": True,
|
||
"img_path": "D:/__no_such_image__.dat",
|
||
"work_dir": "D:/workspace",
|
||
})
|
||
report("L1", "execute_step1 接受 work_dir 注入(不会抛)",
|
||
isinstance(r, dict) and "status" in r,
|
||
f"status={r.get('status')}")
|
||
|
||
|
||
# =========================================================================
|
||
# Layer 2: view offscreen 测试
|
||
# =========================================================================
|
||
def smoke_view():
|
||
print("\n" + "=" * 70)
|
||
print("Layer 2 —— view offscreen 测试(src.new.views.step1_view)")
|
||
print("=" * 70)
|
||
|
||
from PyQt5.QtWidgets import QApplication
|
||
app = QApplication.instance() or QApplication(sys.argv)
|
||
|
||
from src.new.views.step1_view import Step1View
|
||
from src.new.core.base_view import BaseView
|
||
|
||
# ---- 类契约 ----
|
||
report("L2", "Step1View 继承 BaseView",
|
||
issubclass(Step1View, BaseView),
|
||
"MRO[0] == 'Step1View', MRO[1] == 'BaseView'")
|
||
|
||
# ---- 实例化 ----
|
||
view = Step1View()
|
||
report("L2", "Step1View 实例化成功(无异常)",
|
||
view is not None,
|
||
f"view={view!r}")
|
||
|
||
# ---- 9 个子控件非空 ----
|
||
attrs = [
|
||
"use_existing_radio", "use_ndwi_radio",
|
||
"mask_file", "img_file",
|
||
"ndwi_group", "ndwi_threshold",
|
||
"output_file", "hint" if hasattr(view, "hint") else "enable_checkbox",
|
||
"enable_checkbox", "run_btn",
|
||
]
|
||
for a in attrs:
|
||
if a == "hint" and not hasattr(view, a):
|
||
continue
|
||
report("L2", f"子控件 {a} 非空", getattr(view, a) is not None)
|
||
|
||
# ---- 默认状态:existing 模式 ----
|
||
report("L2", "默认 existing 模式:mask_file.isHidden() == False",
|
||
view.mask_file.isHidden() is False)
|
||
report("L2", "默认 existing 模式:ndwi_group.isHidden() == True",
|
||
view.ndwi_group.isHidden() is True)
|
||
report("L2", "默认 existing 模式:output_file.isHidden() == True",
|
||
view.output_file.isHidden() is True)
|
||
|
||
# ---- 切换到 NDWI 模式 ----
|
||
view.use_ndwi_radio.setChecked(True)
|
||
report("L2", "NDWI 模式:mask_file.isHidden() == True",
|
||
view.mask_file.isHidden() is True)
|
||
report("L2", "NDWI 模式:ndwi_group.isHidden() == False",
|
||
view.ndwi_group.isHidden() is False)
|
||
report("L2", "NDWI 模式:output_file.isHidden() == False",
|
||
view.output_file.isHidden() is False)
|
||
|
||
# ---- update_work_directory 触发自动填充(NDWI 模式下)----
|
||
tmpdir = tempfile.mkdtemp(prefix="wqgui_smoke_").replace("\\", "/")
|
||
view.update_work_directory(tmpdir)
|
||
expected = f"{tmpdir}/1_water_mask/water_mask_out.dat"
|
||
actual = view.output_file.get_path()
|
||
report("L2", "NDWI 模式 + update_work_directory 自动填输出路径",
|
||
actual == expected,
|
||
f"期望={expected!r}, 实际={actual!r}")
|
||
|
||
# ---- get_config 双模式断言 ----
|
||
# NDWI 模式
|
||
cfg_ndwi = view.get_config()
|
||
report("L2", "get_config NDWI 模式 use_ndwi=True",
|
||
cfg_ndwi.get("use_ndwi") is True)
|
||
report("L2", "get_config NDWI 模式 mask_path is None",
|
||
cfg_ndwi.get("mask_path") is None)
|
||
report("L2", "get_config NDWI 模式 output_path 已填",
|
||
cfg_ndwi.get("output_path") == expected,
|
||
f"output_path={cfg_ndwi.get('output_path')!r}")
|
||
|
||
# existing 模式
|
||
view.use_existing_radio.setChecked(True)
|
||
cfg_existing = view.get_config()
|
||
report("L2", "get_config existing 模式 use_ndwi=False",
|
||
cfg_existing.get("use_ndwi") is False)
|
||
report("L2", "get_config existing 模式 mask_path 来自 mask_file(默认空串)",
|
||
cfg_existing.get("mask_path") == "",
|
||
f"mask_path={cfg_existing.get('mask_path')!r}")
|
||
report("L2", "get_config existing 模式 output_path is None",
|
||
cfg_existing.get("output_path") is None)
|
||
|
||
# ---- set_config 回灌 ----
|
||
view.set_config({
|
||
"use_ndwi": True,
|
||
"ndwi_threshold": 0.55,
|
||
"img_path": "D:/test/img.dat",
|
||
"output_path": "D:/test/mask.dat",
|
||
})
|
||
report("L2", "set_config 后 use_ndwi=True",
|
||
view.use_ndwi_radio.isChecked() is True)
|
||
report("L2", "set_config 后 ndwi_threshold=0.55",
|
||
abs(view.ndwi_threshold.value() - 0.55) < 1e-6,
|
||
f"实际={view.ndwi_threshold.value()}")
|
||
report("L2", "set_config 后 img_file 路径",
|
||
view.img_file.get_path() == "D:/test/img.dat",
|
||
f"实际={view.img_file.get_path()!r}")
|
||
|
||
# ---- update_from_config 双字段 ----
|
||
view.update_from_config(tmpdir, pipeline={"k": "v"})
|
||
report("L2", "update_from_config work_dir 已缓存",
|
||
view.work_dir == tmpdir,
|
||
f"work_dir={view.work_dir!r}")
|
||
report("L2", "update_from_config pipeline 已缓存",
|
||
view.pipeline == {"k": "v"},
|
||
f"pipeline={view.pipeline!r}")
|
||
|
||
# ---- _on_run_clicked 父链缺失时 RuntimeError 预期 ----
|
||
# 此时 view.parent() 是 None(root 级 QWidget),必然找不到 run_single_step
|
||
try:
|
||
view._on_run_clicked()
|
||
report("L2", "_on_run_clicked 父链缺失应抛 RuntimeError", False, "未抛异常")
|
||
except RuntimeError as e:
|
||
report("L2", "_on_run_clicked 父链缺失抛 RuntimeError", True, str(e)[:80])
|
||
|
||
view.deleteLater()
|
||
|
||
|
||
# =========================================================================
|
||
# Layer 3: end-to-end 测试
|
||
# =========================================================================
|
||
def smoke_e2e():
|
||
print("\n" + "=" * 70)
|
||
print("Layer 3 —— end-to-end(MainView 路由 + TaskWorker 调度)")
|
||
print("=" * 70)
|
||
|
||
from PyQt5.QtCore import QEventLoop, QTimer
|
||
from PyQt5.QtWidgets import QApplication
|
||
app = QApplication.instance() or QApplication(sys.argv)
|
||
|
||
from src.new.main_view import MainView, ROUTES
|
||
|
||
win = MainView()
|
||
report("L3", "MainView 实例化成功", win is not None)
|
||
report("L3", "ROUTES 共 13 条", len(ROUTES) == 13, f"实际={len(ROUTES)}")
|
||
report("L3", "nav_list 项数 == 13", win.nav_list.count() == 13,
|
||
f"count={win.nav_list.count()}")
|
||
report("L3", "stacked 子部件数 == 13", win.stacked.count() == 13,
|
||
f"count={win.stacked.count()}")
|
||
report("L3", "默认 currentRow == 0(step1)", win.nav_list.currentRow() == 0)
|
||
|
||
# ---- 切到 step1 ----
|
||
win.nav_list.setCurrentRow(0)
|
||
view_step1 = win._views.get("step1")
|
||
report("L3", "_views['step1'] 是真实 Step1View",
|
||
type(view_step1).__name__ == "Step1View",
|
||
f"type={type(view_step1).__name__}")
|
||
|
||
# ---- 模拟点击 step1.run_btn(NDWI 模式 + 不存在的 img_path → status=error)----
|
||
view_step1.use_ndwi_radio.setChecked(True)
|
||
view_step1.img_file.set_path("D:/__no_such_img__.dat")
|
||
view_step1.run_btn.click()
|
||
|
||
# 等 TaskWorker 跑完(offscreen + QThread 信号回到主线程)
|
||
loop = QEventLoop()
|
||
QTimer.singleShot(2000, loop.quit)
|
||
loop.exec_()
|
||
|
||
log_text = win.log_text.toPlainText()
|
||
report("L3", "日志含 Router 收到 step1 请求",
|
||
"收到 step1 请求" in log_text,
|
||
"→ 找 ['Router] 收到 step1 请求']")
|
||
report("L3", "日志含 Service✗ 错误状态",
|
||
"[Service✗]" in log_text,
|
||
"→ 找 ['[Service✗]']")
|
||
report("L3", "日志含 mode=ndwi 标记",
|
||
"mode=ndwi" in log_text,
|
||
"→ 找 ['mode=ndwi']")
|
||
|
||
# ---- 切到 step2(真实 view,service 仍走占位) ----
|
||
win.nav_list.setCurrentRow(1)
|
||
view_step2 = win._views.get("step2")
|
||
report("L3", "_views['step2'] 是真实 Step2View(已迁移)",
|
||
type(view_step2).__name__ == "Step2View",
|
||
f"type={type(view_step2).__name__}")
|
||
|
||
# 清掉旧日志,重置 worker
|
||
win.log_text.clear()
|
||
view_step2.run_btn.click()
|
||
loop = QEventLoop()
|
||
QTimer.singleShot(1500, loop.quit)
|
||
loop.exec_()
|
||
|
||
log_text = win.log_text.toPlainText()
|
||
report("L3", "step2 真实 view 派发后日志含 Router 收到 step2 请求",
|
||
"收到 step2 请求" in log_text)
|
||
report("L3", "step2 service 走占位:日志含 Service… not_implemented",
|
||
"not_implemented" in log_text,
|
||
f"log 片段:{log_text[-200:]!r}")
|
||
|
||
win.close()
|
||
|
||
|
||
# =========================================================================
|
||
# 入口
|
||
# =========================================================================
|
||
def main():
|
||
smoke_service()
|
||
smoke_view()
|
||
smoke_e2e()
|
||
|
||
print("\n" + "=" * 70)
|
||
total = len(results)
|
||
passed = sum(1 for r in results if r[2])
|
||
print(f"汇总:{passed}/{total} 通过")
|
||
if passed != total:
|
||
print("\n失败明细:")
|
||
for layer, name, ok, detail in results:
|
||
if not ok:
|
||
print(f" ❌ [{layer}] {name} —— {detail}")
|
||
sys.exit(1)
|
||
else:
|
||
print("🎉 全部通过")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main() |