feat(step9): 新增浓度反演模块及 GUI 面板

This commit is contained in:
DXC
2026-06-09 17:55:25 +08:00
parent 4ca90b0e79
commit c3cc2ef77e
6 changed files with 1098 additions and 92 deletions

View File

@ -16,6 +16,15 @@ from src.core.algorithms.glint_detection.detectors import (
remove_shoreline_buffer,
calculate_glint_mask,
)
from src.core.algorithms.qaa.qaas_baseline import QAABaselineSolver
from src.core.algorithms.concentration_inversion import (
ChlorophyllInversion,
CDOMInversion,
TurbidityInversion,
TotalNitrogenInversion,
TotalPhosphorusInversion,
ConcentrationPipeline,
)
__all__ = [
# 插值
@ -33,4 +42,13 @@ __all__ = [
'create_shoreline_buffer',
'remove_shoreline_buffer',
'calculate_glint_mask',
# QAA
'QAABaselineSolver',
# 浓度反演
'ChlorophyllInversion',
'CDOMInversion',
'TurbidityInversion',
'TotalNitrogenInversion',
'TotalPhosphorusInversion',
'ConcentrationPipeline',
]

View File

@ -0,0 +1,662 @@
# -*- coding: utf-8 -*-
"""
水质浓度反演模块
基于 QAA Step 8 输出的光谱吸收/散射系数 (a_lambda, bb_lambda)
通过生物光学模型反演水质参数浓度。
主要反演目标:
- 叶绿素 A (Chl-a)675nm 吸收峰法
- 浊度 (Turbidity):后向散射系数法
- CDOM 吸收系数 a_dg(440):指数衰减法
- 总氮 (TN) / 总磷 (TP):光学代理回归框架
参考:
- Lee, Z.P. et al. (2002/2010/2014) QAA 系列
- Bricaud, A. et al. (1998) Limnol. Oceanogr. — 叶绿素比吸收系数
- Carder, K.L. et al. (1999) Marine Technology Society — CDOM 指数衰减
"""
from __future__ import annotations
import os
from typing import Dict, List, Optional, Tuple, Union
import numpy as np
import pandas as pd
# ------------------------------------------------------------------
# Shared coefficient tables (from Bricaud et al. 1998 and related
# literature; typical values for inland waters)
# ------------------------------------------------------------------
# Chlorophyll-specific absorption coefficient a*_ph(675), unit: m²/mg.
# Varies with the chlorophyll concentration range (Bricaud empirical values).
CHLA_SPECIFIC_ABSORPTION: Dict[str, float] = {
    "low": 0.055,  # oligotrophic water, Chla < 5 mg/m³
    "medium": 0.040,  # mesotrophic, Chla 5-30 mg/m³
    "high": 0.028,  # eutrophic, Chla 30-100 mg/m³
    "bloom": 0.020,  # algal bloom, Chla > 100 mg/m³
}
# CDOM exponential-decay slope S, unit: nm⁻¹ (typical inland-water range
# 0.010-0.025).
CDOM_S_LOOKUP: Dict[str, float] = {
    "low_turbidity": 0.010,  # clear, oligotrophic
    "medium_turbidity": 0.015,  # moderate turbidity
    "high_turbidity": 0.020,  # highly turbid, eutrophic
    "bloom": 0.025,  # bloom-dominated
}
# 纯水吸收系数表400-800nmBabin et al. 2003 简化值单位m⁻¹
PURE_WATER_A: Dict[int, float] = {
400: 0.0064, 410: 0.0066, 420: 0.0068, 430: 0.0072,
440: 0.0080, 450: 0.0092, 460: 0.0105, 470: 0.0120,
480: 0.0135, 490: 0.0155, 500: 0.0175, 510: 0.0200,
520: 0.0230, 530: 0.0270, 540: 0.0315, 550: 0.0370,
560: 0.0435, 570: 0.0510, 580: 0.0600, 590: 0.0710,
600: 0.0830, 610: 0.0960, 620: 0.1110, 630: 0.1280,
640: 0.1470, 650: 0.1680, 660: 0.1920, 670: 0.2180,
675: 0.2450, 680: 0.2750, 690: 0.3100, 700: 0.3500,
710: 0.3950, 720: 0.4450, 730: 0.5000, 740: 0.5600,
750: 0.6250, 760: 0.6950, 770: 0.7700, 780: 0.8500,
790: 0.9300, 800: 1.0100,
}
def _interp_pure_water_a(wavelength: float) -> float:
"""线性插值获取纯水吸收系数"""
wl_int = {k for k in PURE_WATER_A if k <= int(wavelength)}
if not wl_int:
return PURE_WATER_A[min(PURE_WATER_A.keys())]
k_low = max(wl_int)
k_high = min({k for k in PURE_WATER_A if k >= int(wavelength)} or {k_low})
if k_low == k_high:
return float(PURE_WATER_A[k_low])
w = (wavelength - k_low) / (k_high - k_low)
return float(PURE_WATER_A[k_low]) * (1 - w) + float(PURE_WATER_A[k_high]) * w
# ------------------------------------------------------------------
# 叶绿素反演器
# ------------------------------------------------------------------
class ChlorophyllInversion:
    """Retrieve chlorophyll-a from the 675 nm absorption peak (line height).

    Model:
        a(675) = a_w(675) + a_ph(675) + a_dg(675)

    Procedure:
        1. Remove pure-water absorption a_w from a(λ) at 665, 675 and 685 nm.
        2. Estimate the non-algal (CDOM + NAP) background at 675 nm as the
           mean of the water-free absorption at 665 and 685 nm.
        3. a_ph(675) = water-free a(675) - background, clipped at 0.
        4. Chla = a_ph(675) / a*_ph(675).

    Parameters
    ----------
    specific_absorption : float, optional
        Chlorophyll-specific absorption a*_ph(675) in m²/mg. When None, a
        value is chosen from ``CHLA_SPECIFIC_ABSORPTION`` according to the
        magnitude of a_ph(675).
    lake_case : str, optional
        Water-type label; defaults to "medium". It is only stored on the
        instance -- no method of this class reads it.
    """

    def __init__(
        self,
        specific_absorption: Optional[float] = None,
        lake_case: Optional[str] = None
    ):
        self.specific_absorption = specific_absorption
        self.lake_case = lake_case or "medium"

    def run_inversion(
        self,
        wavelengths: np.ndarray,
        a_lambda: np.ndarray,
        bb_lambda: Optional[np.ndarray] = None
    ) -> Dict:
        """Run the chlorophyll-a retrieval for one spectrum.

        Parameters
        ----------
        wavelengths : np.ndarray
            Wavelengths in nm, shape (n_bands,). Any order is accepted.
        a_lambda : np.ndarray
            Total absorption a(λ), shape (n_bands,).
        bb_lambda : np.ndarray, optional
            Backscattering coefficient; currently unused, kept as an
            extension point.

        Returns
        -------
        dict
            - chla_mg_m3   : chlorophyll-a concentration (mg/m³)
            - a_ph_675     : phytoplankton absorption at 675 nm (m⁻¹)
            - baseline_675 : water-free CDOM+NAP baseline at 675 nm (m⁻¹)
            - a_w_675      : pure-water absorption at 675 nm (m⁻¹)
            - warning      : present only when the retrieval could not run

        Raises
        ------
        ValueError
            If the inputs are not 1-D arrays of equal length, or are empty.
        """
        wavelengths = np.asarray(wavelengths, dtype=np.float64)
        a_lambda = np.asarray(a_lambda, dtype=np.float64)
        if wavelengths.ndim != 1 or wavelengths.shape != a_lambda.shape:
            raise ValueError(
                "wavelengths and a_lambda must be 1-D arrays of equal length"
            )

        # np.interp does not verify that the sample points are increasing and
        # returns meaningless values otherwise, so sort defensively.
        order = np.argsort(wavelengths, kind="stable")
        wl_sorted = wavelengths[order]
        a_sorted = a_lambda[order]

        aw_675 = _interp_pure_water_a(675.0)
        water = {
            665.0: _interp_pure_water_a(665.0),
            675.0: aw_675,
            685.0: _interp_pure_water_a(685.0),
        }
        # Water-free absorption at the three bands. Removing water at every
        # band (rather than only at 675 nm after building the baseline from
        # total absorption) avoids subtracting the water signal twice.
        a_nw = {
            band: float(
                np.interp(band, wl_sorted, a_sorted, left=np.nan, right=np.nan)
            ) - aw
            for band, aw in water.items()
        }

        if not all(np.isfinite(value) for value in a_nw.values()):
            return {
                "chla_mg_m3": np.nan,
                "a_ph_675": np.nan,
                "baseline_675": np.nan,
                "a_w_675": aw_675,
                "warning": "675nm 波段缺失,无法进行叶绿素反演",
            }

        baseline_675 = (a_nw[665.0] + a_nw[685.0]) / 2.0
        a_ph_675 = max(a_nw[675.0] - baseline_675, 0.0)

        if self.specific_absorption is not None:
            a_star = self.specific_absorption
        else:
            a_star = self._adaptive_specific_absorption(a_ph_675)

        if a_star <= 0:
            return {
                "chla_mg_m3": np.nan,
                "a_ph_675": a_ph_675,
                "baseline_675": baseline_675,
                "a_w_675": aw_675,
                "warning": "比吸收系数为非正值",
            }

        return {
            "chla_mg_m3": a_ph_675 / a_star,
            "a_ph_675": a_ph_675,
            "baseline_675": baseline_675,
            "a_w_675": aw_675,
        }

    def _adaptive_specific_absorption(self, a_ph_675: float) -> float:
        """Pick a*_ph(675) from the magnitude of a_ph(675)."""
        if a_ph_675 < 0.05:
            return CHLA_SPECIFIC_ABSORPTION["low"]
        if a_ph_675 < 0.2:
            return CHLA_SPECIFIC_ABSORPTION["medium"]
        if a_ph_675 < 0.5:
            return CHLA_SPECIFIC_ABSORPTION["high"]
        return CHLA_SPECIFIC_ABSORPTION["bloom"]

    def invert_to_csv(
        self,
        input_csv: str,
        output_csv: str,
        sample_id_col: str = "sample_id"
    ) -> str:
        """Batch-retrieve chlorophyll-a from a long-format CSV and save it.

        Parameters
        ----------
        input_csv : str
            CSV with at least the columns ``sample_id_col``, "Wavelength"
            and "a_lambda" (one row per sample and band).
        output_csv : str
            Destination path; the parent directory is created if needed.
        sample_id_col : str
            Name of the sample-ID column.

        Returns
        -------
        str
            ``output_csv``.
        """
        df = pd.read_csv(input_csv, encoding="utf-8-sig")
        df = df.sort_values([sample_id_col, "Wavelength"])

        results = []
        for sid, group in df.groupby(sample_id_col, sort=False):
            wl = group["Wavelength"].values.astype(np.float64)
            a = group["a_lambda"].values.astype(np.float64)
            res = self.run_inversion(wl, a)
            res[sample_id_col] = sid
            results.append(res)

        out_df = pd.DataFrame(results)
        # Only the numeric result columns are exported; the optional
        # "warning" text is intentionally left out of the CSV.
        wanted = [sample_id_col, "chla_mg_m3", "a_ph_675", "baseline_675", "a_w_675"]
        out_df = out_df[[c for c in wanted if c in out_df.columns]]

        os.makedirs(os.path.dirname(output_csv) or ".", exist_ok=True)
        out_df.to_csv(output_csv, index=False, float_format="%.6f")
        return output_csv
# ------------------------------------------------------------------
# CDOM 反演器
# ------------------------------------------------------------------
class CDOMInversion:
    """CDOM absorption retrieval based on an exponential-decay model.

    Model:
        a_dg(λ) = a_dg(λ₀) * exp(-S * (λ - λ₀))

    The non-water absorption at 550 nm, a(550) - a_w(550), is treated as
    a_dg(550) and extrapolated back to the reference wavelength λ₀.

    Parameters
    ----------
    S : float, optional
        CDOM spectral slope in nm⁻¹. When None, the "medium_turbidity" entry
        of ``CDOM_S_LOOKUP`` is used.
    reference_wavelength : int
        Reference wavelength λ₀ in nm, default 440. The result key is named
        "a_dg_440" regardless of this value.
    """

    def __init__(
        self,
        S: Optional[float] = None,
        reference_wavelength: int = 440
    ):
        self.S = S
        self.ref_wl = reference_wavelength

    def run_inversion(
        self,
        wavelengths: np.ndarray,
        a_lambda: np.ndarray
    ) -> Dict:
        """Run the CDOM retrieval for one spectrum.

        Parameters
        ----------
        wavelengths : np.ndarray
            Wavelengths in nm, shape (n_bands,). Any order is accepted.
        a_lambda : np.ndarray
            Total absorption a(λ), shape (n_bands,).

        Returns
        -------
        dict
            - a_dg_440 : CDOM absorption at the reference wavelength (m⁻¹)
            - a_dg_550 : non-water absorption at 550 nm, clipped at 0 (m⁻¹)
            - S        : slope that was used

            Both absorption values are NaN when 550 nm lies outside the
            supplied wavelength range.

        Raises
        ------
        ValueError
            If the inputs are not 1-D arrays of equal length, or are empty.
        """
        wavelengths = np.asarray(wavelengths, dtype=np.float64)
        a_lambda = np.asarray(a_lambda, dtype=np.float64)
        if wavelengths.ndim != 1 or wavelengths.shape != a_lambda.shape:
            raise ValueError(
                "wavelengths and a_lambda must be 1-D arrays of equal length"
            )

        # np.interp needs increasing sample points; sort defensively.
        order = np.argsort(wavelengths, kind="stable")
        wl_sorted = wavelengths[order]
        a_sorted = a_lambda[order]

        S = CDOM_S_LOOKUP["medium_turbidity"] if self.S is None else self.S

        a_550 = float(
            np.interp(550, wl_sorted, a_sorted, left=np.nan, right=np.nan)
        )
        if np.isnan(a_550):
            # Propagate the missing band explicitly instead of relying on the
            # argument order of max() to keep the NaN.
            a_dg_550 = float("nan")
        else:
            a_dg_550 = max(a_550 - _interp_pure_water_a(550.0), 0.0)

        # Extrapolate from 550 nm back to the reference wavelength.
        a_dg_440 = float(a_dg_550 * np.exp(S * (550 - self.ref_wl)))

        return {
            "a_dg_440": a_dg_440,
            "a_dg_550": a_dg_550,
            "S": S,
        }

    def invert_to_csv(
        self,
        input_csv: str,
        output_csv: str,
        sample_id_col: str = "sample_id"
    ) -> str:
        """Batch-retrieve CDOM from a long-format CSV and save the result.

        The input needs the columns ``sample_id_col``, "Wavelength" and
        "a_lambda". Returns ``output_csv``; its parent directory is created
        if needed.
        """
        df = pd.read_csv(input_csv, encoding="utf-8-sig")
        df = df.sort_values([sample_id_col, "Wavelength"])

        results = []
        for sid, group in df.groupby(sample_id_col, sort=False):
            wl = group["Wavelength"].values.astype(np.float64)
            a = group["a_lambda"].values.astype(np.float64)
            res = self.run_inversion(wl, a)
            res[sample_id_col] = sid
            results.append(res)

        out_df = pd.DataFrame(results)
        wanted = [sample_id_col, "a_dg_440", "a_dg_550", "S"]
        out_df = out_df[[c for c in wanted if c in out_df.columns]]

        os.makedirs(os.path.dirname(output_csv) or ".", exist_ok=True)
        out_df.to_csv(output_csv, index=False, float_format="%.6f")
        return output_csv
# ------------------------------------------------------------------
# 浊度反演器
# ------------------------------------------------------------------
class TurbidityInversion:
    """Optical turbidity estimate from the backscattering coefficient.

    Simplified model:
        Turbidity (NTU) ≈ k * b_b(λ_ref)

    Parameters
    ----------
    k : float
        Empirical scale factor, default 2.0 (the original author quotes
        1.0-3.0 as typical for inland waters).
    reference_wavelength : int
        Reference band in nm, default 550.
    """

    def __init__(self, k: float = 2.0, reference_wavelength: int = 550):
        self.k = k
        self.ref_wl = reference_wavelength

    def run_inversion(
        self,
        wavelengths: np.ndarray,
        bb_lambda: np.ndarray
    ) -> Dict:
        """Run the turbidity retrieval for one spectrum.

        Parameters
        ----------
        wavelengths : np.ndarray
            Wavelengths in nm, shape (n_bands,). Any order is accepted.
        bb_lambda : np.ndarray
            Backscattering coefficient b_b(λ), shape (n_bands,).

        Returns
        -------
        dict
            - turbidity_ntu : k * b_b(λ_ref)
            - bb_ref        : b_b interpolated at the reference band

            Both are NaN when the reference band lies outside the supplied
            wavelength range.

        Raises
        ------
        ValueError
            If the inputs are not 1-D arrays of equal length, or are empty.
        """
        wavelengths = np.asarray(wavelengths, dtype=np.float64)
        bb_lambda = np.asarray(bb_lambda, dtype=np.float64)
        if wavelengths.ndim != 1 or wavelengths.shape != bb_lambda.shape:
            raise ValueError(
                "wavelengths and bb_lambda must be 1-D arrays of equal length"
            )

        # np.interp does not check that the sample points are increasing and
        # returns meaningless values otherwise, so sort defensively.
        order = np.argsort(wavelengths, kind="stable")
        bb_ref = float(np.interp(
            self.ref_wl,
            wavelengths[order],
            bb_lambda[order],
            left=np.nan,
            right=np.nan,
        ))

        return {
            "turbidity_ntu": self.k * bb_ref,
            "bb_ref": bb_ref,
        }

    def invert_to_csv(
        self,
        input_csv: str,
        output_csv: str,
        sample_id_col: str = "sample_id"
    ) -> str:
        """Batch-retrieve turbidity from a long-format CSV and save the result.

        The input needs the columns ``sample_id_col``, "Wavelength" and
        "bb_lambda". Returns ``output_csv``; its parent directory is created
        if needed.

        Raises
        ------
        ValueError
            If the CSV has no "bb_lambda" column.
        """
        df = pd.read_csv(input_csv, encoding="utf-8-sig")
        if "bb_lambda" not in df.columns:
            raise ValueError("输入 CSV 中缺少 bb_lambda 列")
        df = df.sort_values([sample_id_col, "Wavelength"])

        results = []
        for sid, group in df.groupby(sample_id_col, sort=False):
            wl = group["Wavelength"].values.astype(np.float64)
            bb = group["bb_lambda"].values.astype(np.float64)
            res = self.run_inversion(wl, bb)
            res[sample_id_col] = sid
            results.append(res)

        out_df = pd.DataFrame(results)
        wanted = [sample_id_col, "turbidity_ntu", "bb_ref"]
        out_df = out_df[[c for c in wanted if c in out_df.columns]]

        os.makedirs(os.path.dirname(output_csv) or ".", exist_ok=True)
        out_df.to_csv(output_csv, index=False, float_format="%.6f")
        return output_csv
# ------------------------------------------------------------------
# 总氮 / 总磷反演器(光学代理回归框架)
# ------------------------------------------------------------------
class TotalNitrogenInversion:
    """Optical-proxy linear regression model for total nitrogen (TN).

    TN is estimated from chlorophyll-a and turbidity:

        TN (mg/L) = α * Chla + β * Turbidity + γ

    The defaults are placeholders; real coefficients should come from
    ``calibrate`` with in-situ measurements.

    Parameters
    ----------
    alpha : float
        Chla coefficient, default 0.05.
    beta : float
        Turbidity coefficient, default 0.10.
    gamma : float
        Intercept, default 0.20.
    """

    def __init__(
        self,
        alpha: float = 0.05,
        beta: float = 0.10,
        gamma: float = 0.20
    ):
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma

    def run_inversion(
        self,
        chla_mg_m3: float,
        turbidity_ntu: float
    ) -> Dict:
        """Evaluate the linear proxy model; returns ``{"tn_mg_L": value}``."""
        tn = self.alpha * chla_mg_m3 + self.beta * turbidity_ntu + self.gamma
        return {"tn_mg_L": tn}

    def calibrate(
        self,
        samples: List[Dict]
    ) -> None:
        """Fit alpha, beta and gamma to measured samples by least squares.

        The intercept is fitted jointly with the slopes through a constant
        column in the design matrix. (Fitting the slopes through the origin
        and deriving the intercept from the mean residual afterwards does not
        give the least-squares solution of the model above.)

        Parameters
        ----------
        samples : list[dict]
            Each item provides the keys 'chla', 'turbidity' and 'tn'.

        Raises
        ------
        RuntimeError
            If the samples are empty, malformed, non-finite, or the solver
            fails. The coefficients are left untouched in that case.
        """
        try:
            if not samples:
                raise ValueError("样本列表为空")
            design = np.array(
                [[s["chla"], s["turbidity"], 1.0] for s in samples],
                dtype=np.float64,
            )
            target = np.array([s["tn"] for s in samples], dtype=np.float64)
            if not (np.isfinite(design).all() and np.isfinite(target).all()):
                raise ValueError("样本中包含 NaN 或 Inf")
            coeffs, _, _, _ = np.linalg.lstsq(design, target, rcond=None)
        except Exception as e:
            raise RuntimeError(f"标定失败: {e}") from e

        # Assign only after a successful fit so a failure cannot leave the
        # model half-updated.
        self.alpha, self.beta, self.gamma = (float(c) for c in coeffs)
class TotalPhosphorusInversion:
    """Optical-proxy linear regression model for total phosphorus (TP).

    TP is estimated from chlorophyll-a and turbidity:

        TP (mg/L) = α * Chla + β * Turbidity + γ

    The defaults are placeholders; real coefficients should come from
    ``calibrate`` with in-situ measurements.

    Parameters
    ----------
    alpha : float
        Chla coefficient, default 0.002.
    beta : float
        Turbidity coefficient, default 0.005.
    gamma : float
        Intercept, default 0.010.
    """

    def __init__(
        self,
        alpha: float = 0.002,
        beta: float = 0.005,
        gamma: float = 0.010
    ):
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma

    def run_inversion(
        self,
        chla_mg_m3: float,
        turbidity_ntu: float
    ) -> Dict:
        """Evaluate the linear proxy model; returns ``{"tp_mg_L": value}``."""
        tp = self.alpha * chla_mg_m3 + self.beta * turbidity_ntu + self.gamma
        return {"tp_mg_L": tp}

    def calibrate(
        self,
        samples: List[Dict]
    ) -> None:
        """Fit alpha, beta and gamma to measured samples by least squares.

        The intercept is fitted jointly with the slopes through a constant
        column in the design matrix. (Fitting the slopes through the origin
        and deriving the intercept from the mean residual afterwards does not
        give the least-squares solution of the model above.)

        Parameters
        ----------
        samples : list[dict]
            Each item provides the keys 'chla', 'turbidity' and 'tp'.

        Raises
        ------
        RuntimeError
            If the samples are empty, malformed, non-finite, or the solver
            fails. The coefficients are left untouched in that case.
        """
        try:
            if not samples:
                raise ValueError("样本列表为空")
            design = np.array(
                [[s["chla"], s["turbidity"], 1.0] for s in samples],
                dtype=np.float64,
            )
            target = np.array([s["tp"] for s in samples], dtype=np.float64)
            if not (np.isfinite(design).all() and np.isfinite(target).all()):
                raise ValueError("样本中包含 NaN 或 Inf")
            coeffs, _, _, _ = np.linalg.lstsq(design, target, rcond=None)
        except Exception as e:
            raise RuntimeError(f"标定失败: {e}") from e

        # Assign only after a successful fit so a failure cannot leave the
        # model half-updated.
        self.alpha, self.beta, self.gamma = (float(c) for c in coeffs)
# ------------------------------------------------------------------
# 一站式浓度反演流水线
# ------------------------------------------------------------------
class ConcentrationPipeline:
    """One-stop pipeline chaining the Chl-a, CDOM, turbidity, TN and TP models.

    Reads a long-format CSV (one row per sample and band, as written by
    Step 8) and writes one row per sample with all concentration columns.

    Parameters
    ----------
    lake_case : str, optional
        Water-type label, forwarded to ``ChlorophyllInversion``.
    S_cdom : float, optional
        CDOM slope forwarded to ``CDOMInversion`` (None lets it pick its
        default).
    k_turbidity : float
        Empirical factor forwarded to ``TurbidityInversion``.
    tn_params : dict, optional
        Keyword arguments for ``TotalNitrogenInversion``.
    tp_params : dict, optional
        Keyword arguments for ``TotalPhosphorusInversion``.
    """

    def __init__(
        self,
        lake_case: str = "medium",
        S_cdom: Optional[float] = None,
        k_turbidity: float = 2.0,
        tn_params: Optional[Dict] = None,
        tp_params: Optional[Dict] = None,
    ):
        tn_kwargs = tn_params or {}
        tp_kwargs = tp_params or {}

        self.lake_case = lake_case
        self.chla_inv = ChlorophyllInversion(lake_case=lake_case)
        self.cdom_inv = CDOMInversion(S=S_cdom)
        self.turb_inv = TurbidityInversion(k=k_turbidity)
        self.tn_inv = TotalNitrogenInversion(**tn_kwargs)
        self.tp_inv = TotalPhosphorusInversion(**tp_kwargs)

    def _invert_sample(self, sid, group, sample_id_col: str) -> Dict:
        """Run every retrieval for one sample group and build its output row."""
        wl = group["Wavelength"].values.astype(np.float64)
        absorption = group["a_lambda"].values.astype(np.float64)

        # Backscattering is optional: convert it only when the group holds
        # at least one non-null entry.
        bb_series = group["bb_lambda"]
        backscatter = None
        if bb_series.notna().any():
            backscatter = bb_series.values.astype(np.float64)

        chla_res = self.chla_inv.run_inversion(wl, absorption)
        cdom_res = self.cdom_inv.run_inversion(wl, absorption)

        if backscatter is None or not np.any(np.isfinite(backscatter)):
            turb_res = {"turbidity_ntu": np.nan, "bb_ref": np.nan}
        else:
            turb_res = self.turb_inv.run_inversion(wl, backscatter)

        chla_val = chla_res.get("chla_mg_m3", np.nan)
        turb_val = turb_res.get("turbidity_ntu", np.nan)
        tn_res = self.tn_inv.run_inversion(chla_val, turb_val)
        tp_res = self.tp_inv.run_inversion(chla_val, turb_val)

        return {
            sample_id_col: sid,
            "Chla_mg_m3": chla_val,
            "a_ph_675_m1": chla_res.get("a_ph_675", np.nan),
            "CDOM_a_dg_440_m1": cdom_res.get("a_dg_440", np.nan),
            "Turbidity_NTU": turb_val,
            "TN_mg_L": tn_res.get("tn_mg_L", np.nan),
            "TP_mg_L": tp_res.get("tp_mg_L", np.nan),
        }

    def run_pipeline(
        self,
        input_csv: str,
        output_csv: str,
        sample_id_col: str = "sample_id"
    ) -> str:
        """Run the full concentration pipeline on a CSV file.

        Parameters
        ----------
        input_csv : str
            Long-format CSV with ``sample_id_col``, "Wavelength", "a_lambda"
            and optionally "bb_lambda".
        output_csv : str
            Destination path; the parent directory is created if needed.
        sample_id_col : str
            Name of the sample-ID column.

        Returns
        -------
        str
            ``output_csv``.
        """
        table = pd.read_csv(input_csv, encoding="utf-8-sig")
        if "bb_lambda" not in table.columns:
            # Keep the downstream column access uniform when no
            # backscattering data was supplied.
            table["bb_lambda"] = np.nan
        table = table.sort_values([sample_id_col, "Wavelength"])

        records = [
            self._invert_sample(sid, group, sample_id_col)
            for sid, group in table.groupby(sample_id_col, sort=False)
        ]

        target_dir = os.path.dirname(output_csv) or "."
        os.makedirs(target_dir, exist_ok=True)
        pd.DataFrame(records).to_csv(output_csv, index=False, float_format="%.6f")
        return output_csv

View File

@ -657,7 +657,7 @@ class WaterQualityInversionPipeline:
self._notify("completed", f"训练光谱数据已保存: {result}")
return result
def step8_water_quality_indices(self,
def step6_water_quality_indices(self,
training_csv_path: Optional[str] = None,
formula_csv_file: Optional[str] = None,
formula_names: Optional[List[str]] = None,
@ -743,7 +743,116 @@ class WaterQualityInversionPipeline:
self._record_step_time("步骤6: 训练机器学习模型", 0, 0)
self._notify("completed", f"模型训练完成,结果保存在: {result}")
return result
def step8_qaa_inversion(self, **config):
    """Step 8: QAA physical derivation (non-empirical model).

    Reads settings from ``config['step8_qaa']`` (keys: ``lake_name``,
    ``lambda_0``, ``output_path``, ``spectrum_csv_path``). The spectrum CSV
    falls back to ``config['training_csv_path']`` and then to the first
    ``.csv`` file (sorted by name) in ``<work_dir>/5_Training_Spectra``.

    The first CSV column whose name parses as a float, and every column
    after it, are treated as spectral bands. The solver output is written
    in long format with the columns ``sample_id``, ``Wavelength``,
    ``a_lambda`` and ``bb_lambda``.

    Returns None. When no input CSV or no wavelength column can be found,
    a message is logged and nothing is written.
    """
    import numpy as np
    import pandas as pd
    from src.core.algorithms.qaa import QAABaselineSolver
    from src.utils.water_owt_config import get_lambda_0

    # Use the instance logger when present, otherwise plain print.
    emit = self.logger.info if hasattr(self, 'logger') else print

    settings = config.get('step8_qaa', {})
    lake_name = settings.get('lake_name', 'Unknown')
    lambda_0 = settings.get('lambda_0', get_lambda_0(lake_name))

    output_dir = os.path.join(self.work_dir, "8_QAA_Inversion")
    os.makedirs(output_dir, exist_ok=True)
    output_path = settings.get('output_path') or os.path.join(output_dir, "a_lambda_results.csv")

    spectrum_csv = settings.get('spectrum_csv_path') or config.get('training_csv_path')

    if not spectrum_csv or not os.path.exists(spectrum_csv):
        # Fallback: scan the step-5 output directory and take the first CSV.
        step5_dir = os.path.join(self.work_dir, "5_Training_Spectra")
        candidates = []
        if os.path.isdir(step5_dir):
            candidates = [
                os.path.join(step5_dir, entry)
                for entry in sorted(os.listdir(step5_dir))
                if entry.lower().endswith('.csv')
            ]
        if not candidates:
            emit(f"[Step 8] 训练光谱 CSV 不存在或路径为空: {spectrum_csv}")
            return
        spectrum_csv = candidates[0]
        emit(f"[Step 8] spectrum_csv_path 为空,已自动回退到 step5 产物: {spectrum_csv}")

    df = pd.read_csv(spectrum_csv, encoding="utf-8-sig")
    col_names = df.columns.tolist()

    # Locate the first column whose header is numeric; spectral columns are
    # taken to start there.
    first_band = None
    for position, header in enumerate(col_names):
        try:
            float(header)
        except (ValueError, TypeError):
            continue
        first_band = position
        break

    if first_band is None:
        emit("[Step 8] 无法从 CSV 列名中识别波长信息")
        return

    wavelengths = np.array([float(header) for header in col_names[first_band:]], dtype=np.float64)
    data_matrix = df.iloc[:, first_band:].values.astype(np.float64)
    if data_matrix.ndim == 1:
        data_matrix = data_matrix.reshape(1, -1)

    raw_result = QAABaselineSolver().run_inversion(wavelengths, data_matrix, lambda_0)

    # run_inversion returns a dict for a single sample and a list of dicts
    # for several samples; normalise to a list.
    sample_results = raw_result if isinstance(raw_result, list) else [raw_result]

    rows_out = []
    for sample_index, sample_result in enumerate(sample_results):
        a_values = sample_result['a_lambda']
        bb_values = sample_result['bb_lambda']
        rows_out.extend(
            {
                'sample_id': f"sample_{sample_index}",
                'Wavelength': wl,
                'a_lambda': a_values[band_index],
                'bb_lambda': bb_values[band_index],
            }
            for band_index, wl in enumerate(wavelengths)
        )

    pd.DataFrame(rows_out).to_csv(output_path, index=False, float_format='%.8f')

    emit(f"Step 8: QAA 反演完毕,水域={lake_name},λ₀={lambda_0}nm结果保存于: {output_path}")
def step9_concentration_inversion(self, **config):
    """Step 9: concentration inversion from the Step 8 a_lambda/bb_lambda CSV.

    Reads settings from ``config['step9_concentration']`` (keys:
    ``input_csv``, ``output_csv``, ``lake_case``). When ``output_csv`` is
    not given, the result goes to
    ``<work_dir>/9_Concentration/final_concentrations.csv``.

    Returns None. When the input CSV is missing, a message is logged and
    nothing is written.
    """
    from src.core.algorithms.concentration_inversion import ConcentrationPipeline

    # Use the instance logger when present, otherwise plain print.
    emit = self.logger.info if hasattr(self, 'logger') else print

    settings = config.get('step9_concentration', {})
    input_csv = settings.get('input_csv')
    output_csv = settings.get('output_csv')
    lake_case = settings.get('lake_case', 'medium')

    input_available = bool(input_csv) and os.path.exists(input_csv)
    if not input_available:
        emit(f"[Step 9] QAA 结果文件不存在或路径为空: {input_csv}")
        return

    if not output_csv:
        default_dir = os.path.join(self.work_dir, "9_Concentration")
        os.makedirs(default_dir, exist_ok=True)
        output_csv = os.path.join(default_dir, "final_concentrations.csv")

    result_csv = ConcentrationPipeline(lake_case=lake_case).run_pipeline(input_csv, output_csv)

    emit(f"Step 9: 浓度反演完毕,结果保存于: {result_csv}")
def step10_sampling(self, deglint_img_path: Optional[str] = None,
interval: int = 50,
sample_radius: int = 5,
@ -1521,13 +1630,13 @@ class WaterQualityInversionPipeline:
else:
self._notify("步骤5: 光谱提取", "skipped", "未配置")
# 步骤8: 计算水质指数
if 'step8' in config:
self._notify("步骤8: 水质指数计算", "start")
self.step8_water_quality_indices(**config['step8'])
self._notify("步骤8: 水质指数计算", "completed", f"(输出: {self.indices_path})")
# 步骤6: 计算水质指数
if 'step6' in config:
self._notify("步骤6: 水质光谱指数计算", "start")
self.step6_water_quality_indices(**config['step6'])
self._notify("步骤6: 水质光谱指数计算", "completed", f"(输出: {self.indices_path})")
else:
self._notify("步骤8: 水质指数计算", "skipped", "未配置")
self._notify("步骤6: 水质光谱指数计算", "skipped", "未配置")
# 步骤7: 训练模型
if 'step7' in config:
@ -1713,7 +1822,7 @@ class WaterQualityInversionPipeline:
pipeline_info['step3'] = {'status': 'completed', 'output_file': str(self.deglint_img_path) if self.deglint_img_path else 'N/A'}
pipeline_info['step4'] = {'status': 'completed', 'output_file': str(self.processed_csv_path) if self.processed_csv_path else 'N/A'}
pipeline_info['step5'] = {'status': 'completed', 'output_file': str(self.training_csv_path) if self.training_csv_path else 'N/A'}
pipeline_info['step8'] = {'status': 'completed', 'output_file': str(self.indices_path) if self.indices_path else 'N/A'}
pipeline_info['step6'] = {'status': 'completed', 'output_file': str(self.indices_path) if self.indices_path else 'N/A'}
pipeline_info['step7'] = {'status': 'completed', 'output_file': str(self.models_dir)}
pipeline_info['step9'] = {'status': 'completed', 'output_file': str(self.custom_regression_path) if self.custom_regression_path else 'N/A'}
pipeline_info['training_params'] = config.get('step7', {})
@ -2158,7 +2267,7 @@ def main():
# 单步运行时建议显式指定完整流程中可省略将使用步骤2输出的耀斑掩膜
# 'glint_mask_path': r"path/to/severe_glint_area.dat",
},
'step8': {
'step6': {
'formula_csv_file': 'path/to/water_quality_formulas.csv', # 公式CSV文件路径
'formula_names': ['Al10SABI', 'TurbBe16RedOverViolet'], # 要计算的公式名称列表
'output_filename': 'water_quality_indices.csv',