Compare commits

...

3 Commits

Author SHA1 Message Date
DXC
170d347e21 内容部分修改 2026-05-11 17:38:29 +08:00
DXC
bf4237b160 feat: keygen_gui 增加永久授权选项 2026-05-11 09:53:02 +08:00
DXC
cf387c40ab feat: 新增离线一机一码授权系统 2026-05-11 09:48:07 +08:00
11 changed files with 1108 additions and 41 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.0 MiB

View File

@ -5,11 +5,12 @@ import sys
def _safe_add(path: str) -> None:
if not path or not os.path.isdir(path):
return
try:
if hasattr(os, "add_dll_directory"):
if hasattr(os, "add_dll_directory"):
try:
os.add_dll_directory(path)
except Exception:
pass
return
except Exception:
pass
try:
os.environ["PATH"] = path + os.pathsep + os.environ.get("PATH", "")
except Exception:
@ -21,5 +22,4 @@ base = getattr(sys, "_MEIPASS", None)
if base:
_safe_add(base)
_safe_add(os.path.join(base, "lib-dynload"))
_safe_add(os.path.join(base, "DLLs"))
_safe_add(os.path.join(base, "DLLs"))

4
src/auth/__init__.py Normal file
View File

@ -0,0 +1,4 @@
# -*- coding: utf-8 -*-
"""
授权认证模块
"""

223
src/auth/keygen_gui.py Normal file
View File

@ -0,0 +1,223 @@
# -*- coding: utf-8 -*-
"""
Mega Water - 离线授权发卡器 (开发者专用)
生成绑定特定机器码的 .lic 授权文件
"""
import os
import sys
# 确保 src.auth 在 path 中
_current_dir = os.path.dirname(os.path.abspath(__file__))
_project_root = os.path.abspath(os.path.join(_current_dir, "..", ".."))
if _project_root not in sys.path:
sys.path.insert(0, _project_root)
from PyQt5.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit,
QPushButton, QFileDialog, QMessageBox, QApplication, QDateEdit, QCheckBox
)
from PyQt5.QtCore import Qt, QDate
from src.auth.license_manager import generate_license
# 永久授权的标识日期
PERMANENT_EXPIRY = "2099-12-31"
class LicenseKeygenWindow(QWidget):
"""授权发卡器主窗口"""
def __init__(self):
super().__init__()
self.setWindowTitle("Mega Water - 离线授权发卡器 (开发者专用)")
self.setMinimumSize(640, 360)
self.move(400, 280)
self._default_save_path = os.path.join(_project_root, "license.lic")
self._setup_ui()
def _setup_ui(self):
# ── 全局字体:无衬线,清晰 ──
font_family = "Microsoft YaHei" if sys.platform == "win32" else "Segoe UI"
self.setStyleSheet(f"""
* {{
font-family: {font_family}, 'Segoe UI', sans-serif;
font-size: 11pt;
}}
QLabel#titleLabel {{
font-size: 16pt;
font-weight: bold;
color: #2c3e50;
}}
QLabel#tipLabel {{
font-size: 10pt;
color: #95a5a6;
}}
""")
main_layout = QVBoxLayout()
main_layout.setContentsMargins(45, 40, 45, 40)
main_layout.setSpacing(18)
# ── 标题 ──
title_label = QLabel("离线授权发卡器 (开发者专用)")
title_label.setObjectName("titleLabel")
title_label.setAlignment(Qt.AlignCenter)
main_layout.addWidget(title_label)
# ── 机器码输入行 ──
mc_layout = QHBoxLayout()
mc_layout.setSpacing(12)
mc_label = QLabel("机器码:")
mc_label.setFixedWidth(90)
self.mc_input = QLineEdit()
self.mc_input.setPlaceholderText("粘贴用户发来的 32 位机器码")
self.mc_input.setMinimumHeight(36)
self.mc_input.setMinimumWidth(400)
mc_layout.addWidget(mc_label, 0)
mc_layout.addWidget(self.mc_input, 1)
main_layout.addLayout(mc_layout)
# ── 到期时间选择行 ──
exp_layout = QHBoxLayout()
exp_layout.setSpacing(14)
exp_label = QLabel("到期时间:")
exp_label.setFixedWidth(90)
self.exp_edit = QDateEdit()
self.exp_edit.setCalendarPopup(True)
self.exp_edit.setMinimumHeight(36)
self.exp_edit.setMinimumWidth(160)
self.exp_edit.setDate(QDate.currentDate().addYears(1))
self.perm_check = QCheckBox("永久授权 (不限时)")
self.perm_check.setMinimumHeight(36)
self.perm_check.stateChanged.connect(self._on_perm_changed)
exp_layout.addWidget(exp_label, 0)
exp_layout.addWidget(self.exp_edit, 0)
exp_layout.addWidget(self.perm_check, 0)
exp_layout.addStretch(1)
main_layout.addLayout(exp_layout)
# ── 保存路径行 ──
path_layout = QHBoxLayout()
path_layout.setSpacing(12)
path_label = QLabel("保存路径:")
path_label.setFixedWidth(90)
self.path_input = QLineEdit()
self.path_input.setReadOnly(True)
self.path_input.setMinimumHeight(36)
self.browse_btn = QPushButton("浏览...")
self.browse_btn.setMinimumHeight(36)
self.browse_btn.setFixedWidth(80)
self.browse_btn.clicked.connect(self._on_browse)
path_layout.addWidget(path_label, 0)
path_layout.addWidget(self.path_input, 1)
path_layout.addWidget(self.browse_btn, 0)
main_layout.addLayout(path_layout)
# ── 弹性空间 ──
main_layout.addSpacing(10)
# ── 生成按钮 ──
self.gen_btn = QPushButton("生成授权文件 (.lic)")
self.gen_btn.setMinimumHeight(48)
self.gen_btn.setStyleSheet("""
QPushButton {
background-color: #27ae60;
color: white;
font-size: 13pt;
font-weight: bold;
border: none;
border-radius: 8px;
}
QPushButton:hover {
background-color: #2ecc71;
}
QPushButton:pressed {
background-color: #1e8449;
}
""")
self.gen_btn.clicked.connect(self._on_generate)
main_layout.addWidget(self.gen_btn)
# ── 底部提示 ──
tip_label = QLabel("生成后请将 license.lic 文件发给用户,放置到软件安装目录下即可。")
tip_label.setObjectName("tipLabel")
tip_label.setAlignment(Qt.AlignCenter)
main_layout.addWidget(tip_label)
self.setLayout(main_layout)
def _on_perm_changed(self, state):
"""永久授权复选框状态变化时,联动日期选择器"""
if state == Qt.Checked:
self.exp_edit.setEnabled(False)
else:
self.exp_edit.setEnabled(True)
def _on_browse(self):
"""打开文件对话框选择保存路径"""
path, _ = QFileDialog.getSaveFileName(
self,
"选择授权文件保存位置",
self._default_save_path,
"授权文件 (*.lic)"
)
if path:
if not path.lower().endswith(".lic"):
path += ".lic"
self.path_input.setText(path)
def _on_generate(self):
"""点击生成按钮,调用授权管理器"""
machine_code = self.mc_input.text().strip()
if not machine_code:
QMessageBox.warning(self, "输入错误", "请输入机器码")
return
output_path = self.path_input.text().strip()
if not output_path:
QMessageBox.warning(self, "输入错误", "请设置保存路径")
return
# 根据是否勾选永久授权决定日期
if self.perm_check.isChecked():
expiry_date = PERMANENT_EXPIRY
else:
expiry_date = self.exp_edit.date().toString("yyyy-MM-dd")
ok, msg = generate_license(
machine_code=machine_code,
output_path=output_path,
expiry_date=expiry_date
)
if ok:
QMessageBox.information(
self,
"生成成功",
f"✅ 授权文件已成功生成!\n\n保存路径:\n{output_path}\n\n请将此文件发给用户即可。",
QMessageBox.Ok
)
else:
QMessageBox.critical(
self,
"生成失败",
f"{msg}",
QMessageBox.Ok
)
if __name__ == "__main__":
# ── 高 DPI 自适应(必须放在 QApplication 实例化之前)──
from PyQt5.QtCore import Qt
QApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True)
QApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True)
app = QApplication(sys.argv)
app.setApplicationName("LicenseKeygen")
window = LicenseKeygenWindow()
window.show()
sys.exit(app.exec_())

255
src/auth/license_dialog.py Normal file
View File

@ -0,0 +1,255 @@
# -*- coding: utf-8 -*-
"""
LicenseDialog - PyQt5 授权拦截弹窗
当授权验证失败时弹出,提示用户导入授权文件。
"""
import os
import sys
import shutil
from pathlib import Path
from PyQt5.QtWidgets import (
QDialog, QVBoxLayout, QHBoxLayout, QLabel, QPushButton,
QTextEdit, QFileDialog, QMessageBox, QApplication
)
from PyQt5.QtCore import Qt, QTimer
from PyQt5.QtGui import QFont, QIcon, QGuiApplication
# 导入授权管理器
from src.auth.license_manager import get_machine_code, verify_license, get_license_path
class LicenseDialog(QDialog):
"""
授权验证弹窗
- 显示本机机器码(只读文本框)
- 提供"一键复制"功能
- 提供"导入授权文件"按钮
- 导入成功后提示重启
"""
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("授权验证")
self.setWindowFlags(
Qt.Dialog |
Qt.WindowTitleHint |
Qt.WindowCloseButtonHint
)
self.setModal(True)
self.setMinimumWidth(540)
self._init_ui()
self._load_machine_code()
# 窗口居中
QTimer.singleShot(0, self._center_on_screen)
def _center_on_screen(self):
"""将窗口居中到屏幕"""
screen = QGuiApplication.primaryScreen()
if screen:
geo = screen.geometry()
self.move(
(geo.width() - self.width()) // 2,
(geo.height() - self.height()) // 2
)
def _init_ui(self):
main_layout = QVBoxLayout(self)
main_layout.setContentsMargins(30, 30, 30, 20)
main_layout.setSpacing(16)
# ── 标题区 ──
title_font = QFont("Microsoft YaHei", 14, QFont.Bold)
title_label = QLabel("本软件需要授权方可运行")
title_label.setFont(title_font)
title_label.setAlignment(Qt.AlignCenter)
title_label.setStyleSheet("color: #2c3e50;")
main_layout.addWidget(title_label)
# ── 说明文字 ──
hint_label = QLabel(
"请获取授权文件license.lic后导入"
"或联系技术支持获取授权。"
)
hint_label.setAlignment(Qt.AlignCenter)
hint_label.setStyleSheet("color: #7f8c8d; font-size: 12px;")
main_layout.addWidget(hint_label)
# ── 机器码标签 ──
code_label = QLabel("本机机器码(用于申请授权):")
code_label.setStyleSheet("font-weight: bold; color: #34495e;")
main_layout.addWidget(code_label)
# ── 机器码文本框 + 复制按钮 ──
code_layout = QHBoxLayout()
code_layout.setSpacing(8)
self.code_edit = QTextEdit()
self.code_edit.setReadOnly(True)
self.code_edit.setMaximumHeight(72)
self.code_edit.setFont(QFont("Consolas", 13))
self.code_edit.setStyleSheet(
"QTextEdit {"
" background-color: #ecf0f1;"
" border: 1px solid #bdc3c7;"
" border-radius: 4px;"
" padding: 8px;"
" color: #2c3e50;"
"}"
)
code_layout.addWidget(self.code_edit, 1)
copy_btn = QPushButton("复制")
copy_btn.setFixedWidth(72)
copy_btn.setCursor(Qt.PointingHandCursor)
copy_btn.setStyleSheet(
"QPushButton {"
" background-color: #3498db;"
" color: white;"
" border: none;"
" border-radius: 4px;"
" padding: 8px 4px;"
" font-weight: bold;"
"}"
"QPushButton:hover { background-color: #2980b9; }"
"QPushButton:pressed { background-color: #21618c; }"
)
copy_btn.clicked.connect(self._copy_code)
code_layout.addWidget(copy_btn)
main_layout.addLayout(code_layout)
# ── 导入授权文件按钮 ──
import_btn = QPushButton("导入授权文件 (.lic)")
import_btn.setCursor(Qt.PointingHandCursor)
import_btn.setStyleSheet(
"QPushButton {"
" background-color: #27ae60;"
" color: white;"
" border: none;"
" border-radius: 6px;"
" padding: 12px;"
" font-size: 14px;"
" font-weight: bold;"
"}"
"QPushButton:hover { background-color: #229954; }"
"QPushButton:pressed { background-color: #1e8449; }"
)
import_btn.clicked.connect(self._import_license)
main_layout.addWidget(import_btn)
# ── 提示文字 ──
tip_label = QLabel(
"导入后软件将自动重启生效。"
)
tip_label.setAlignment(Qt.AlignCenter)
tip_label.setStyleSheet("color: #95a5a6; font-size: 11px;")
main_layout.addWidget(tip_label)
# ── 取消按钮(退出程序)──
cancel_btn = QPushButton("退出")
cancel_btn.setCursor(Qt.PointingHandCursor)
cancel_btn.setStyleSheet(
"QPushButton {"
" background-color: #95a5a6;"
" color: white;"
" border: none;"
" border-radius: 4px;"
" padding: 8px 20px;"
"}"
"QPushButton:hover { background-color: #7f8c8d; }"
)
cancel_btn.clicked.connect(self._quit_app)
main_layout.addWidget(cancel_btn, 0, Qt.AlignRight)
main_layout.addStretch()
def _load_machine_code(self):
"""读取并显示本机机器码"""
try:
code = get_machine_code(32)
self.code_edit.setPlainText(code)
except Exception as e:
self.code_edit.setPlainText(f"读取失败: {e}")
def _copy_code(self):
"""复制机器码到剪贴板"""
clipboard = QApplication.clipboard()
clipboard.setText(self.code_edit.toPlainText().strip())
# 显示反馈
QMessageBox.information(self, "已复制", "机器码已复制到剪贴板。")
def _import_license(self):
"""打开文件选择对话框,导入 .lic 授权文件"""
file_path, _ = QFileDialog.getOpenFileName(
self,
"选择授权文件",
"",
"授权文件 (*.lic);;所有文件 (*.*)"
)
if not file_path:
return
# 验证授权文件
ok, msg = verify_license(file_path)
if not ok:
QMessageBox.warning(
self,
"授权文件无效",
f"验证失败: {msg}\n\n请确认选择了正确的授权文件。"
)
return
# 复制授权文件到标准路径
dest_path = get_license_path()
try:
dest_dir = os.path.dirname(dest_path)
if dest_dir:
os.makedirs(dest_dir, exist_ok=True)
shutil.copy2(file_path, dest_path)
except OSError as e:
QMessageBox.critical(
self,
"保存失败",
f"无法保存授权文件: {e}"
)
return
# 成功提示,重启程序
reply = QMessageBox.information(
self,
"导入成功",
"授权文件已成功导入。\n\n软件将自动重启以应用授权。"
if False else # 占位,维持下面的逻辑
"授权文件已成功导入。\n软件将自动重启以应用授权。",
QMessageBox.Ok
)
self.accept()
self._restart_app()
def _quit_app(self):
"""退出程序"""
self.reject()
sys.exit(0)
def _restart_app(self):
"""重启程序"""
self.close()
QApplication.quit()
# 延迟重启(确保 QApplication 完全退出)
import subprocess
import sys as _sys
executable = _sys.executable
if getattr(_sys, 'frozen', False):
# PyInstaller 打包环境下
subprocess.Popen([executable] + _sys.argv[1:])
else:
# 开发环境
subprocess.Popen([executable, __file__])

328
src/auth/license_manager.py Normal file
View File

@ -0,0 +1,328 @@
# -*- coding: utf-8 -*-
"""
License Manager - 离线授权管理模块
使用 HMAC-SHA256 + 盐值签名防止篡改
"""
import os
import json
import hmac
import hashlib
import base64
import uuid
import hashlib as _hashlib
import subprocess
import re
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Tuple
# ============================================================
# 第一部分:硬件指纹提取(内嵌 get_machine_code
# ============================================================
def get_cpu_id() -> Optional[str]:
"""读取 CPU 序列号Processor ID"""
try:
if sys.platform == "win32":
result = subprocess.run(
["wmic", "cpu", "get", "ProcessorId"],
capture_output=True,
text=True,
timeout=5,
creationflags=subprocess.CREATE_NO_WINDOW,
stdin=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
cpu_id = result.stdout.strip().split("\n")[-1].strip()
if cpu_id:
return cpu_id
else:
with open("/proc/cpuinfo", "r") as f:
for line in f:
if "Serial" in line or "processor" in line:
cpu_id = line.split(":")[-1].strip()
if cpu_id:
return cpu_id
except Exception:
pass
return None
def get_motherboard_uuid() -> Optional[str]:
"""读取主板 UUIDBaseBoard Serial Number"""
try:
if sys.platform == "win32":
result = subprocess.run(
["wmic", "baseboard", "get", "SerialNumber"],
capture_output=True,
text=True,
timeout=5,
creationflags=subprocess.CREATE_NO_WINDOW,
stdin=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
board_uuid = result.stdout.strip().split("\n")[-1].strip()
board_uuid = re.sub(r'[^a-zA-Z0-9\-]', '', board_uuid)
if board_uuid and board_uuid not in ("To be filled", "None"):
return board_uuid
else:
result = subprocess.run(
["cat", "/sys/class/dmi/id/product_uuid"],
capture_output=True,
text=True,
timeout=5,
stdin=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
if result.returncode == 0:
return result.stdout.strip()
except Exception:
pass
return None
def get_machine_code(code_length: int = 32) -> str:
"""
生成唯一的机器码(硬件指纹)
参数:
code_length: 机器码长度,支持 16/24/32/48/64 位,默认 32 位
返回:
全大写字母+数字的机器码字符串
"""
cpu_id = get_cpu_id() or ""
board_uuid = get_motherboard_uuid() or ""
raw_hardware = f"{cpu_id}-{board_uuid}"
if not raw_hardware.strip("-") or len(raw_hardware) < 8:
try:
machine_name = uuid.gethostname() or ""
mac = ':'.join(re.findall('..', '%012x' % uuid.getnode()))
raw_hardware = f"{machine_name}-{mac}"
except Exception:
raw_hardware = str(uuid.getnode())
raw_hardware = re.sub(r'[^a-zA-Z0-9]', '', raw_hardware)
hash_hex = hashlib.sha256(raw_hardware.encode('utf-8')).hexdigest().upper()
hash_hex = hash_hex.replace('O', 'X').replace('L', 'Y').replace('I', 'Z')
valid_lengths = [16, 24, 32, 48, 64]
if code_length not in valid_lengths:
code_length = 32
return hash_hex[:code_length]
# ============================================================
# 第二部分:授权文件格式与签名机制
# ============================================================
# 开发者密钥(硬编码在软件中,用于验证授权文件)
# 注意:实际部署时建议对密钥进行简单混淆或从外部文件加载
DEVELOPER_SECRET = b"WaterQuality_v1_2025_SecretKey"
LICENSE_VERSION = "1.0"
def _compute_signature(payload_json: str) -> str:
"""
计算 HMAC-SHA256 签名
payload_json: JSON 序列化后的字符串(不含 signature 字段)
"""
sig = hmac.new(
DEVELOPER_SECRET,
payload_json.encode('utf-8'),
hashlib.sha256
).hexdigest().upper()
return sig
def _clean_hash(s: str) -> str:
"""清洗哈希字符串,避免混淆字符"""
return s.replace('O', 'X').replace('L', 'Y').replace('I', 'Z')
# ============================================================
# 第三部分:核心 API
# ============================================================
def get_license_path() -> str:
"""获取授权文件的标准存放路径(程序根目录)"""
if getattr(sys, 'frozen', False):
base_dir = os.path.dirname(sys.executable)
else:
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
return os.path.join(base_dir, "license.lic")
def verify_license(license_path: Optional[str] = None) -> Tuple[bool, str]:
"""
校验授权文件是否匹配本机硬件指纹。
参数:
license_path: 授权文件路径,默认使用标准路径
返回:
(is_valid, message)
- is_valid=True 表示授权有效
- is_valid=False 表示授权无效message 为具体原因
"""
if license_path is None:
license_path = get_license_path()
# Step 1: 文件是否存在
if not os.path.isfile(license_path):
return False, "授权文件不存在"
# Step 2: 读取并解析 JSON
try:
with open(license_path, 'r', encoding='utf-8') as f:
content = f.read().strip()
lic_data = json.loads(content)
except (json.JSONDecodeError, OSError) as e:
return False, f"授权文件格式错误: {e}"
# Step 3: 校验版本号
version = lic_data.get("version", "")
if version != LICENSE_VERSION:
return False, f"授权文件版本不匹配 (期望 {LICENSE_VERSION})"
# Step 4: 校验过期时间
expiry_str = lic_data.get("expiry", "")
if expiry_str:
try:
expiry_dt = datetime.strptime(expiry_str, "%Y-%m-%d")
if datetime.now() > expiry_dt:
return False, "授权已过期"
except ValueError:
return False, "授权文件日期格式错误"
# Step 5: 提取 payload不含 signature
payload_for_verify = {k: v for k, v in lic_data.items() if k != "signature"}
payload_json = json.dumps(payload_for_verify, sort_keys=True, ensure_ascii=False)
# Step 6: 校验签名完整性(防篡改)
expected_sig = _compute_signature(payload_json)
stored_sig = lic_data.get("signature", "").upper()
if not hmac.compare_digest(expected_sig, stored_sig):
return False, "授权文件签名校验失败(可能被篡改)"
# Step 7: 校验机器码绑定
bound_machine = lic_data.get("machine_code", "")
current_machine = get_machine_code(32)
if not hmac.compare_digest(bound_machine, current_machine):
return False, "机器码不匹配(授权文件与本机不兼容)"
return True, "授权验证通过"
def generate_license(
machine_code: str,
output_path: str,
expiry_date: Optional[str] = None,
product_name: str = "WaterQualityInversion",
max_uses: Optional[int] = None
) -> Tuple[bool, str]:
"""
为指定机器码生成合法的授权文件(供开发者使用)。
参数:
machine_code: 目标机器的机器码32位
output_path: 授权文件输出路径(含文件名,如 "D:/license.lic"
expiry_date: 有效期截止日期,格式 "YYYY-MM-DD",默认永久
product_name: 产品名称
max_uses: 最大使用次数(可选,默认不限制)
返回:
(success, message)
"""
if len(machine_code) not in (16, 24, 32, 48, 64):
return False, f"机器码长度无效(期望 16/24/32/48/64实际 {len(machine_code)}"
# 构建 payload
payload = {
"version": LICENSE_VERSION,
"product": product_name,
"machine_code": machine_code.upper(),
"generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"expiry": expiry_date or "",
}
if max_uses is not None:
payload["max_uses"] = max_uses
# 计算签名
payload_json = json.dumps(payload, sort_keys=True, ensure_ascii=False)
signature = _compute_signature(payload_json)
# 完整授权文件内容
lic_content = json.dumps({**payload, "signature": signature}, indent=2, ensure_ascii=False)
# 写入文件
try:
os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(lic_content)
return True, f"授权文件已生成: {output_path}"
except OSError as e:
return False, f"写入授权文件失败: {e}"
def get_machine_info() -> dict:
"""获取完整机器信息(调试用)"""
return {
"cpu_id": get_cpu_id(),
"motherboard_uuid": get_motherboard_uuid(),
"machine_code_16": get_machine_code(16),
"machine_code_32": get_machine_code(32),
"license_path": get_license_path(),
}
# ============================================================
# 第四部分:便捷入口(支持直接运行)
# ============================================================
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="WaterQuality 授权管理工具")
subparsers = parser.add_subparsers(dest="command", help="子命令")
# 子命令verify
p_verify = subparsers.add_parser("verify", help="验证本机授权")
p_verify.add_argument("-f", "--file", default=None, help="授权文件路径")
# 子命令gen / generate
p_gen = subparsers.add_parser("generate", help="为指定机器码生成授权文件")
p_gen.add_argument("-m", "--machine", required=True, help="目标机器的机器码")
p_gen.add_argument("-o", "--output", required=True, help="输出文件路径")
p_gen.add_argument("-e", "--expiry", default=None, help="有效期截止日期 YYYY-MM-DD")
p_gen.add_argument("-n", "--name", default="WaterQualityInversion", help="产品名称")
# 子命令info
subparsers.add_parser("info", help="显示本机机器信息")
args = parser.parse_args()
if args.command == "verify":
ok, msg = verify_license(args.file)
print(f"[{'OK' if ok else 'FAIL'}] {msg}")
elif args.command == "generate":
ok, msg = generate_license(args.machine, args.output, args.expiry, args.name)
print(f"[{'OK' if ok else 'FAIL'}] {msg}")
elif args.command == "info":
info = get_machine_info()
print("=" * 50)
print("硬件指纹信息")
print("=" * 50)
for k, v in info.items():
print(f" {k}: {v or '(读取失败)'}")
print("=" * 50)
# 同时演示验证
ok, msg = verify_license()
print(f"\n授权验证: [{'OK' if ok else 'FAIL'}] {msg}")
else:
parser.print_help()

View File

@ -40,6 +40,10 @@ CB_AVAILABLE = False # 注释掉catboost
import sys
import os
# PyInstaller 打包环境感知EXE 模式下强制单核,防止 Windows 派生无限重启
is_frozen_env = getattr(sys, 'frozen', False)
safe_n_jobs = 1 if is_frozen_env else -1
from src.preprocessing.spectral_Preprocessing import Preprocessing
@ -643,7 +647,7 @@ class WaterQualityModelingBatch:
config['params'],
cv=cv_strategy,
scoring=scoring,
n_jobs=-1,
n_jobs=safe_n_jobs,
verbose=1
)

View File

@ -40,16 +40,19 @@ class WaterQualityInference:
self.best_model_info = None
self.loaded_model_data = None
def load_sampling_data(self, csv_path: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
def load_sampling_data(self, csv_path: str) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""
加载sampling生成的CSV数据
加载sampling生成的CSV数据(兼容 WQI 增强版 CSV
Args:
csv_path: CSV文件路径,前两列为经纬度,其余列为光谱数据
csv_path: CSV文件路径
旧版x_coord,y_coord,pixel_x,pixel_y,波长...
新版x_coord,y_coord,WQI_...,波长...
Returns:
coords: 经纬度数据 (DataFrame)
spectra: 光谱数据 (DataFrame)
coords: 经纬度数据 (DataFrame, 2列)
spectra: 光谱数据 (DataFrame, 跳过 WQI 列)
wqi_df: WQI 指数列 (DataFrame, 0或45列)
"""
print(f"正在加载采样数据: {csv_path}")
@ -71,15 +74,35 @@ class WaterQualityInference:
coords = data.iloc[:, :2].copy()
coords.columns = ['longitude', 'latitude']
# 从第5列开始为光谱数据跳过第2、3、4列的其他信息
spectra = data.iloc[:, 4:].copy()
# 动态识别光谱列(兼容 sampling_spectra.csv 列顺序变更
# 列名约定:波长为纯数字字符串如 "374.285004"WQI 为 "WQI_xxx" 前缀
# 旧版 CSV无WQIx_coord,y_coord,pixel_x,pixel_y,波长... → 取 [4:]
# 新版 CSV有WQIx_coord,y_coord,WQI_...,波长... → 过滤 WQI 列后取光谱
all_cols = list(data.columns)
spectral_col_indices = []
wqi_col_indices = []
for i, col in enumerate(all_cols):
col_str = str(col)
if col_str.startswith('WQI_'):
wqi_col_indices.append(i)
elif col_str.replace('.', '').lstrip('-').isdigit():
# 波长列:纯数字字符串
spectral_col_indices.append(i)
else:
# 其他元数据列x_coord/y_coord/pixel_x/pixel_y由 coords 接收
pass
# 光谱列 = 纯数字列WQI 已被排除)
spectra = data.iloc[:, spectral_col_indices].copy() if spectral_col_indices else data.iloc[:, 4:].copy()
# WQI 列(用于追加到预测结果输出)
wqi_df = data.iloc[:, wqi_col_indices].copy() if wqi_col_indices else pd.DataFrame()
print(f" 经纬度数据形状: {coords.shape}")
print(f" 光谱数据形状: {spectra.shape}")
print(f" 光谱数据形状: {spectra.shape} (自动识别波长列,排除 {len(wqi_col_indices)} 个WQI列)")
print(f" 经纬度范围: 经度[{coords['longitude'].min():.6f}, {coords['longitude'].max():.6f}], "
f"纬度[{coords['latitude'].min():.6f}, {coords['latitude'].max():.6f}]")
return coords, spectra
return coords, spectra, wqi_df
def random(self, data, label, test_ratio=0.2, random_state=123):
"""
@ -519,6 +542,69 @@ class WaterQualityInference:
print(f"正在应用预处理方法: {actual_preprocess_method}")
print(f"原始光谱数据形状: {spectra.shape}")
# ---- 自动特征补全50 光谱 → 补全至模型训练时的 95 维WQI 指数) ----
# 触发条件:模型期望 n_features_in_ 个特征,但当前 spectra 列数不足
# 原因training_spectra.csv 含 50 光谱 + 45 WQIsampling_spectra.csv 只有 50 光谱
# 做法与训练端calculate_all_indices完全一致的算法列表实时补全缺失的 45 个 WQI 列
model = self.loaded_model_data['model']
expected_features = getattr(model, 'n_features_in_', None)
# ---- 自动特征补全50 光谱 → 补全至模型训练时的 n_features_in_ 维WQI 指数) ----
if expected_features is not None and spectra.shape[1] < expected_features:
print(f"[特征补全] 检测到特征缺口:当前 {spectra.shape[1]} 列 < 模型期望 {expected_features} 列,"
f"正在从光谱数据实时计算 WQI 指数...")
try:
from src.utils.water_index import WaterQualityIndexCalculator
calc = WaterQualityIndexCalculator()
# 提取纯计算方法(排除 find_closest_wavelength 和 calculate_all_indices
# 以及不返回 Series 的辅助方法)
algorithm_methods = []
for m in dir(calc):
if m.startswith('_'):
continue
if m in ['find_closest_wavelength', 'calculate_all_indices']:
continue
attr = getattr(calc, m)
if callable(attr):
algorithm_methods.append(m)
original_col_count = spectra.shape[1]
for algo_name in algorithm_methods:
try:
algo_func = getattr(calc, algo_name)
result = algo_func(spectra)
# 只追加返回 Series 且长度为样本数的合法结果
if isinstance(result, pd.Series) and len(result) == len(spectra):
spectra[algo_name] = result.values
else:
spectra[algo_name] = np.nan
except Exception:
spectra[algo_name] = np.nan
print(f"[特征补全] 完成!光谱列已扩充至 {spectra.shape[1]}"
f"(追加了 {spectra.shape[1] - original_col_count} 个 WQI 指数)")
except Exception as e:
print(f"[特征补全] 失败,将使用原始光谱特征: {e}")
# ---- 防线 1强制维度对齐物理截断----
if expected_features is not None and spectra.shape[1] > expected_features:
print(f"[精准对齐] 正在将 {spectra.shape[1]} 维特征截断为模型要求的 {expected_features}")
spectra = spectra.iloc[:, :expected_features]
elif expected_features is not None and spectra.shape[1] < expected_features:
# 维度不足时填充 0
padding_cols = expected_features - spectra.shape[1]
for i in range(padding_cols):
spectra[f'_padding_{i}'] = 0.0
print(f"[精准对齐] 特征不足,填充 {padding_cols} 列 0")
# ---- 防线 2彻底清洗无穷大数值----
# 防止 WQI 计算中除零/溢出产生 np.inf / -np.inf 导致预处理崩溃
spectra = spectra.replace([np.inf, -np.inf], np.nan)
spectra = spectra.fillna(0)
print(f"[特征对齐] 最终输入维度: {spectra.shape}")
try:
# 应用预处理
spectra_processed = Preprocessing(actual_preprocess_method, spectra)
@ -573,7 +659,8 @@ class WaterQualityInference:
raise
def save_predictions(self, coords: pd.DataFrame, predictions: np.ndarray,
output_path: str, prediction_column: str = 'prediction'):
output_path: str, prediction_column: str = 'prediction',
wqi_columns: Optional[pd.DataFrame] = None):
"""
保存预测结果
@ -582,11 +669,15 @@ class WaterQualityInference:
predictions: 预测结果
output_path: 输出文件路径
prediction_column: 预测列名称
wqi_columns: Optional[pd.DataFrame] = None
"""
print(f"正在保存预测结果到: {output_path}")
# 创建结果DataFrame
result_df = coords.copy()
# 追加 WQI 水质指数列(如 sampling_spectra.csv 注入了 45 列指数)
if wqi_columns is not None and not wqi_columns.empty:
result_df = pd.concat([result_df, wqi_columns.reset_index(drop=True)], axis=1)
result_df[prediction_column] = predictions
# 确保输出目录存在
@ -659,10 +750,10 @@ class WaterQualityInference:
else:
self.load_best_model(metric=metric)
# 2. 加载采样数据
# 2. 加载采样数据coords=坐标, spectra=纯光谱, wqi_df=45个WQI指数列
print("\n步骤2: 加载采样数据")
print("-" * 40)
coords, spectra = self.load_sampling_data(sampling_csv_path)
coords, spectra, wqi_df = self.load_sampling_data(sampling_csv_path)
# 3. 数据预处理
print("\n步骤3: 数据预处理")
@ -674,10 +765,11 @@ class WaterQualityInference:
print("-" * 40)
predictions = self.predict(spectra_processed)
# 5. 保存预测结果
# 5. 保存预测结果(透传 WQI 列至最终输出文件)
print("\n步骤5: 保存预测结果")
print("-" * 40)
result_df = self.save_predictions(coords, predictions, output_csv_path, prediction_column)
result_df = self.save_predictions(coords, predictions, output_csv_path,
prediction_column, wqi_df)
print("\n" + "=" * 80)
print("推理流程完成!")
@ -747,10 +839,11 @@ class WaterQualityInference:
output_file = output_path / f"prediction_{csv_file.name}"
# 执行推理
coords, spectra = self.load_sampling_data(str(csv_file))
coords, spectra, wqi_df = self.load_sampling_data(str(csv_file))
spectra_processed = self.preprocess_spectra(spectra)
predictions = self.predict(spectra_processed)
result_df = self.save_predictions(coords, predictions, str(output_file), prediction_column)
result_df = self.save_predictions(coords, predictions, str(output_file),
prediction_column, wqi_df)
results[csv_file.name] = {
'output_file': str(output_file),
@ -908,10 +1001,11 @@ class WaterQualityInference:
output_file = output_path / f"{file_stem}{file_ext}"
# 执行推理
coords, spectra = self.load_sampling_data(str(csv_file))
coords, spectra, wqi_df = self.load_sampling_data(str(csv_file))
spectra_processed = self.preprocess_spectra(spectra)
predictions = self.predict(spectra_processed)
result_df = self.save_predictions(coords, predictions, str(output_file), prediction_column)
result_df = self.save_predictions(coords, predictions, str(output_file),
prediction_column, wqi_df)
results[file_stem] = {
'input_file': str(csv_file),

View File

@ -13,11 +13,24 @@ from src.gui.components.custom_widgets import FileSelectWidget
from src.gui.styles import ModernStylesheet
def get_resource_path(relative_path: str) -> str:
"""适配开发与 PyInstaller 环境的路径获取逻辑"""
"""适配开发与 PyInstaller 环境的路径获取逻辑
支持两种打包模式:
1. --onedir 模式:文件在 exe_root/_internal/ 下 → 检查 _internal 目录
2. --onefile 模式:文件在 sys._MEIPASS 平铺目录
"""
# 优先检查 PyInstaller onefile 模式(文件平铺在 _MEIPASS 下)
if hasattr(sys, '_MEIPASS'):
# 打包后,文件会被平铺或按 tree 结构放入临时目录
internal_path = os.path.join(sys._MEIPASS, '_internal', relative_path)
if os.path.exists(internal_path):
return internal_path
return os.path.join(sys._MEIPASS, relative_path)
# 兼容 PyInstaller onedir 模式的 _internal 目录exe 同级目录下)
exe_dir = os.path.dirname(sys.executable)
internal_path = os.path.join(exe_dir, '_internal', relative_path)
if os.path.exists(internal_path):
return internal_path
# 开发环境下:基于当前文件 (step5_5_panel.py) 的绝对路径进行回溯
# 当前在 src/gui/panels/,目标在 src/gui/model/
base_dir = Path(__file__).resolve().parent.parent / "model"

View File

@ -5,6 +5,12 @@
GUI for Water Quality Inversion Pipeline
"""
# ==============================================================================
# 🚀 终极防御:必须在全宇宙第一行强制载入 GDAL 底层 DLL绝对杜绝 0xC0000005 内存崩溃
# ==============================================================================
import osgeo
from osgeo import gdal, ogr
import os
import json
import copy
@ -31,8 +37,23 @@ from PyQt5.QtGui import QIcon, QFont, QTextCursor, QPalette, QColor, QPixmap
import sys
import traceback
import multiprocessing
import ctypes
# ==============================================================================
# 🚀 终极防御置顶:在载入任何自定义面板、样式或子模块之前,强制提前创建 QApplication
# 彻底杜绝 import 时期载入类属性 (如 QFont/QIcon/QPixmap) 触发的 QWidget 崩溃
# ==============================================================================
if multiprocessing.current_process().name == 'MainProcess':
if not QApplication.instance():
try:
QApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True)
QApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True)
except Exception:
pass
_global_app = QApplication(sys.argv)
# 👇 全局异常钩子(保持不变)
def get_resource_path(relative_path: str) -> str:
"""获取资源的绝对路径,适配 PyInstaller 打包环境。
打包后资源位于 sys._MEIPASS解压临时目录开发环境则基于 __file__ 向上三级。
@ -45,10 +66,31 @@ def get_resource_path(relative_path: str) -> str:
def global_exception_handler(exc_type, exc_value, exc_traceback):
print("\n" + "="*50)
print("【严重错误拦截 - PyQt 崩溃死因】")
traceback.print_exception(exc_type, exc_value, exc_traceback)
print("="*50 + "\n")
err_lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
err_msg = "".join(err_lines)
dump_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "crash_dump.txt")
try:
with open(dump_path, "a", encoding="utf-8") as f:
f.write(f"\n{'='*60}\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]\n")
f.write(err_msg)
except Exception:
pass
try:
from PyQt5.QtWidgets import QMessageBox
from PyQt5.QtCore import Qt
msg = (
"【严重错误 - 程序即将退出】\n\n"
"错误类型: {}\n\n"
"错误信息: {}\n\n"
"详细信息已写入:\n{}".format(
exc_type.__name__,
str(exc_value),
dump_path,
)
)
QMessageBox.critical(None, "程序崩溃", msg)
except Exception:
pass
# 挂载全局异常钩子,阻止 PyQt 静默闪退
sys.excepthook = global_exception_handler
@ -89,9 +131,13 @@ from src.gui.panels.step9_panel import Step9Panel
from src.gui.panels.visualization_panel import VisualizationPanel
from src.gui.panels.report_generation_panel import ReportGenerationPanel
# Matplotlib相关导入
# Matplotlib相关导入 (推迟并加入底层防爆保护)
import matplotlib
matplotlib.use('Qt5Agg')
try:
# 确保只在主线程且安全的环境下绑定后端
matplotlib.use('Qt5Agg', force=False)
except Exception:
pass
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar
from matplotlib.figure import Figure
@ -1278,7 +1324,17 @@ class WaterQualityGUI(QMainWindow):
"""水质参数反演分析系统主窗口"""
def __init__(self):
# 1. 🚀 强制设置任务栏图标(解决任务栏图标默认是 Python 黄蓝图标的问题)
# 为当前进程设置独立的 AppUserModelID
my_appid = u'mycompany.megacube.waterquality.v1'
ctypes.windll.shell32.SetCurrentProcessExplicitAppUserModelID(my_appid)
super().__init__()
# 2. 设置窗口图标(指向你的 .ico 文件)
icon_path = get_resource_path("data/icons-1/uitubiao.ico")
self.setWindowIcon(QIcon(icon_path))
self.pipeline = None
self.worker = None
self.config_file = None
@ -2996,23 +3052,73 @@ class WaterQualityGUI(QMainWindow):
item.setForeground(QColor(ModernStylesheet.COLORS.get('text_secondary', '#666666'))) # 原始颜色
# ============================================================
# 主启动逻辑
# ============================================================
def main():
"""主函数"""
app = QApplication(sys.argv)
# 设置应用信息
import sys
import multiprocessing
from PyQt5.QtWidgets import QApplication
# 1. 多进程 Fork 环境隔离
if multiprocessing.current_process().name != 'MainProcess':
sys.exit(0)
# 2. 🚀 终极防御:必须在全宇宙第一行强制创建 QApplication 实例!
# 绝对杜绝任何后续验签模块或弹窗过早调用 QWidget 导致的崩溃
app = QApplication.instance()
if not app:
app = QApplication(sys.argv)
app.setApplicationName("Mega Water")
app.setOrganizationName("WaterQuality")
# 创建主窗口
# 3. 安全载入离线授权验证拦截
try:
from src.auth.license_manager import verify_license
from src.auth.license_dialog import LicenseDialog
except ImportError:
import os
_current_dir = os.path.dirname(os.path.abspath(__file__))
_project_root = os.path.abspath(os.path.join(_current_dir, '..', '..'))
if _project_root not in sys.path:
sys.path.insert(0, _project_root)
from src.auth.license_manager import verify_license
from src.auth.license_dialog import LicenseDialog
_is_license_valid, _license_msg = verify_license()
if not _is_license_valid:
_dialog = LicenseDialog()
_dialog.exec_()
sys.exit(0)
# 4. 授权通过,正常载入主程序主界面
window = WaterQualityGUI()
window.show()
sys.exit(app.exec_())
# ==============================================================================
# 全宇宙最底部程序入口
# ==============================================================================
if __name__ == "__main__":
#冻结只显示1个exe
# multiprocessing.freeze_support()
import sys
import multiprocessing
# 1. 极其强硬的底层防御:
# 如果当前进程明确是 PyInstaller 派生的后台计算子进程,强行静默退出!
# 彻底绕过多进程钩子在尝试解包 sys.argv 时引发的 ValueError 崩溃
if multiprocessing.current_process().name != 'MainProcess':
sys.exit(0)
# 2. 安全调用 freeze_support带防爆气囊
try:
multiprocessing.freeze_support()
except Exception:
pass # 哪怕底层钩子参数解包失败,也强行保住主进程平稳过关
# 3. 正常拉起主业务逻辑
main()

View File

@ -11,6 +11,7 @@ os.environ['GDAL_FILENAME_IS_UTF8'] = 'YES'
os.environ['SHAPE_ENCODING'] = 'UTF-8'
import numpy as np
import pandas as pd
from osgeo import gdal, ogr
import spectral
from scipy import ndimage
@ -354,6 +355,45 @@ def get_spectral_sampling_points_chunked(bil_file, water_mask_shp, severe_glint=
if f:
f.close()
# ==============================================================================
# 🚀 终极手术植入点:带强行环境净化的特征引擎挂载
# ==============================================================================
# 2. 安全校验路径落盘状态
if output_csvpath and os.path.exists(str(output_csvpath)):
try:
from src.utils.water_index import WaterQualityIndexCalculator
print("\n[特征引擎挂载] 正在为采样点自动追加 45 个水质指数衍生特征...")
# 读取基础底座50列光谱
base_df = pd.read_csv(output_csvpath)
# 实例化计算器
calc = WaterQualityIndexCalculator()
# 提取有效算法
algorithm_methods = [
m for m in dir(calc)
if not m.startswith('_') and m not in ['find_closest_wavelength', 'calculate_all_indices']
]
# 就地追加 45 列衍生指数
for algo_name in algorithm_methods:
try:
algo_func = getattr(calc, algo_name)
base_df[algo_name] = algo_func(base_df)
except Exception:
base_df[algo_name] = np.nan
# 覆盖重写最终结果!
base_df.to_csv(output_csvpath, index=False, encoding='utf-8-sig')
print(f"✓ 特征扩充大功告成!当前文件总维度完美适配模型: {base_df.shape}")
except Exception as e:
print(f"⚠ 警告:追加特征失败,保留原基础光谱。死因: {e}")
# ==============================================================================
return x_out, y_out, np.array(spectral_out)
except Exception as e: