diff --git a/2_1banben/app.py b/2_1banben/app.py
index 072def4..4692552 100644
--- a/2_1banben/app.py
+++ b/2_1banben/app.py
@@ -17,6 +17,7 @@ try:
     from config import Config
     from extensions import db
     from models import Device, DeviceHistory
+    # Import the core crawler scheduling entry point
     from services.core import execute_monitor_task
 
     try:
@@ -28,14 +29,19 @@
         from routes.api import api_bp as device_bp
         from routes.api import calculate_offset
     except ImportError:
-        from routes.api import device_bp, calculate_offset
+        # Fallback: keep a missing calculate_offset from crashing the app
+        def calculate_offset(target_time):
+            return 0
+
+
+        from routes.api import device_bp
 
 except ImportError as e:
-    print(f"❌ Fatal error: module import failed. Details: {e}")
+    print(f"❌ [Startup error] Module import failed: {e}")
     sys.exit(1)
 
 # ==============================================================================
-# 2. Smart path configuration (handles PyInstaller's _internal and _MEIPASS)
+# 2. Smart path configuration
 # ==============================================================================
 RESOURCE_BASE = Config.BASE_DIR
 INSTANCE_PATH = Config.INSTANCE_DIR
@@ -43,57 +49,59 @@ INSTANCE_PATH = Config.INSTANCE_DIR
 
 def find_static_folder(base_path):
     """
-    Exhaustive web_dist lookup, searched in priority order
+    Exhaustive web_dist lookup, adapted to PyInstaller bundle layouts
     """
-    # 1. Special paths inside a PyInstaller bundle
     if getattr(sys, 'frozen', False):
         if hasattr(sys, '_MEIPASS'):
             mei_path = os.path.join(sys._MEIPASS, 'web_dist')
             if os.path.exists(os.path.join(mei_path, 'index.html')):
                 return mei_path
-
         internal_path = os.path.join(base_path, '_internal', 'web_dist')
         if os.path.exists(os.path.join(internal_path, 'index.html')):
             return internal_path
 
-    # 2. Current directory (next to the exe)
     path = os.path.join(base_path, 'web_dist')
     if os.path.exists(os.path.join(path, 'index.html')):
         return path
 
-    # 3. One level up, for the dev environment
     parent_path = os.path.join(os.path.dirname(base_path), 'web_dist')
     if os.path.exists(os.path.join(parent_path, 'index.html')):
         return parent_path
-
     return path
 
 
 STATIC_FOLDER = find_static_folder(RESOURCE_BASE)
-
 mimetypes.add_type('application/javascript', '.js')
 mimetypes.add_type('text/css', '.css')
 
 # ==============================================================================
-# 3. Core scheduled-task logic
+# 3. Core scheduled-task logic (heavily reworked)
 # ==============================================================================
 def auto_monitor_job(app):
     """
-    Crawler task fired daily at 17:00.
-    Fix: drop the mismatched create_time field and make sure the session gets cleaned up.
+    [Key fixes]
+    1. app.app_context() guarantees this thread has a Flask application context
+    2. db.session.remove() forcibly discards stale database connections
+    3. db.session.merge() makes sure object state is tracked correctly
+    4. Verbose logging to compare what the crawler returned with what gets written
     """
     with app.app_context():
+        # A. Clear the session so this thread starts on a fresh database connection
+        db.session.remove()
+
         tz = pytz.timezone('Asia/Shanghai')
         now_str = datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')
         print(f"\n{'=' * 50}")
-        print(f"⏰ [Scheduled task fired] Beijing time: {now_str}")
+        print(f"⏰ [Scheduled task started] {now_str}")
 
         if not execute_monitor_task:
+            print("❌ Error: execute_monitor_task is not defined")
             return
 
         try:
+            # B. Run the crawlers
            task_result = execute_monitor_task()
 
            if not task_result:
@@ -104,62 +112,95 @@ def auto_monitor_job(app):
             print(f"📦 [Fetch] Scraped data for {len(scraped_list)} devices")
 
             current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-            stats = {'new_device': 0, 'history_added': 0}
+            stats = {'updated': 0, 'history': 0}
 
             for item in scraped_list:
                 d_name = item.get('name')
                 if not d_name:
                     continue
 
+                # --- 1. Unpack fields and apply defaults ---
+                # Extract explicitly; the assumption is that None or an empty string
+                # from the crawler is meant to be written as-is
+                raw_status = item.get('status', '未知')
+                raw_value = item.get('value', '')
                 f_count = item.get('num_files', 0)
+
+                # Time handling: a timestamp is mandatory, so fall back to the current time
                 target_date = item.get('target_time')
+                if not target_date:
+                    target_date = current_time
 
-                # A. Update the Device table
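+                # (Unpacked once up front so the Device update and the history row
+                # below reuse identical values)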
+                raw_json = item.get('raw_json', {})
+
+                # [Debug] Print only a specific device to avoid flooding the log while
+                # still letting you confirm whether the scraped values are empty
+                # if '0025' in d_name:
+                #     print(f"   >>> [Pre-write check] {d_name}: Value='{raw_value}' | Files={f_count}")
+
+                # --- 2. Database writes (via merge) ---
+                # Look the device up first
                 device = Device.query.filter_by(name=d_name).first()
-                if not device:
-                    device = Device(name=d_name, source=item.get('source'), install_site="")
-                    db.session.add(device)
-                    db.session.flush()
-                    stats['new_device'] += 1
 
-                device.status = item.get('status')
-                device.current_value = item.get('value')
+                if not device:
+                    # Create it if it doesn't exist yet
+                    device = Device(name=d_name, source=item.get('source', '自动爬虫'), install_site="")
+                    db.session.add(device)
+                    db.session.flush()  # flush to obtain the ID immediately
+
+                # Update the scalar fields
+                device.status = raw_status
+                device.current_value = raw_value
                 device.latest_time = target_date
                 device.check_time = current_time
                 device.file_count = f_count
-                device.offset = calculate_offset(target_date)
 
-                # JSON handling
+                # Compute the offset
+                try:
+                    device.offset = calculate_offset(target_date)
+                except Exception:
+                    device.offset = 0
+
+                # Merge the JSON payloads
                 old_json = {}
                 try:
-                    if device.json_data: old_json = json.loads(device.json_data)
+                    if device.json_data:
+                        old_json = json.loads(device.json_data)
                 except:
                     old_json = {}
-                new_json = item.get('raw_json', {})
-                if isinstance(new_json, dict): old_json.update(new_json)
+
+                if isinstance(raw_json, dict):
+                    old_json.update(raw_json)
+
                 device.json_data = json.dumps(old_json, ensure_ascii=False)
 
-                # B. Append a History record
-                # [Fix] Dropped the create_time argument to avoid the error
-                history_entry = DeviceHistory(
-                    device_id=device.id,
-                    status=item.get('status'),
-                    result_data=item.get('value'),
-                    data_time=target_date,
-                    json_data=device.json_data,
-                    file_count=f_count
-                    # create_time=datetime.now()  # removed: your models.py does not define this field
-                )
-                db.session.add(history_entry)
-                stats['history_added'] += 1
+                # [Core fix] merge() tells the session "this object is yours, track and
+                # update it", which resolves DetachedInstanceError and lost updates in
+                # background threads
+                db.session.merge(device)
+                stats['updated'] += 1
 
-            db.session.flush()
+                # --- 3. Append the history record ---
+                history = DeviceHistory(
+                    device_id=device.id,
+                    status=raw_status,
+                    result_data=raw_value,
+                    data_time=target_date,
+                    file_count=f_count,
+                    json_data=device.json_data
+                )
+                db.session.add(history)
+                stats['history'] += 1
+
+            # C. Commit the transaction
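+            # NB: the whole batch rides on this single commit, so a failure in any
+            # row rolls back every update from this run (see the except branch below)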
             db.session.commit()
-            print(f"✅ [Commit OK] New devices: {stats['new_device']} | History rows: {stats['history_added']}")
+            print(f"✅ [Commit OK] Devices updated: {stats['updated']} | History rows appended: {stats['history']}")
 
         except Exception as e:
             db.session.rollback()
             print(f"❌ [Fatal] Database write failed: {e}")
+            # Print the stack trace to make triage easier
+            import traceback
+            traceback.print_exc()
         finally:
+            # D. Clear the session again so nothing leaks into the next run
             db.session.remove()
             print(f"{'=' * 50}\n")
@@ -168,10 +209,7 @@
 # 4. Flask application factory
 # ==============================================================================
 def create_app():
-    # Path debugging
     print(f"🔍 [Frontend path resolved] {STATIC_FOLDER}")
-    if not os.path.exists(os.path.join(STATIC_FOLDER, 'index.html')):
-        print(f"❌ [Critical warning] Still cannot find index.html; check whether PyInstaller bundled web_dist into the _internal directory.")
 
     app = Flask(__name__, static_folder=STATIC_FOLDER, instance_path=INSTANCE_PATH)
     CORS(app)
@@ -180,20 +218,25 @@ def create_app():
     os.makedirs(app.instance_path, exist_ok=True)
     app.config.from_object(Config)
+
+    # Initialize the DB
     db.init_app(app)
 
+    # Initialize the scheduler
     scheduler = APScheduler()
     scheduler.init_app(app)
     scheduler.start()
 
+    # --- Register the scheduled job ---
+    # Note: app is passed via args so the job body can establish an app context
     scheduler.add_job(
         id='daily_monitor_task',
         func=auto_monitor_job,
         args=[app],
         trigger='cron',
         hour=17,
         minute=0,
         second=0,
         misfire_grace_time=3600,
         timezone=pytz.timezone('Asia/Shanghai')
     )
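+    # (misfire_grace_time=3600: a run missed by up to an hour, e.g. while the
+    # process was down, still fires once instead of being skipped)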
@@ -203,29 +246,27 @@ def create_app():
     @app.route('/api/force_run')
     def force_run_task():
+        """Manual trigger endpoint: reuses auto_monitor_job so the logic stays identical"""
         auto_monitor_job(app)
-        return jsonify({'code': 200, 'msg': '手动触发成功,历史记录已追加'})
+        return jsonify({'code': 200, 'msg': '手动触发成功,请查看服务器日志'})
 
     @app.route('/')
     def serve_index():
         try:
             return send_from_directory(app.static_folder, 'index.html')
         except Exception:
-            return "错误:找不到前端文件", 404
+            return "Frontend Error", 404
 
     @app.route('/<path:path>')
     def serve_static(path):
+        if path.startswith('api'):
+            return jsonify({'code': 404, 'message': 'API endpoint not found'}), 404
+
         file_path = os.path.join(app.static_folder, path)
         if os.path.exists(file_path):
             return send_from_directory(app.static_folder, path)
 
-        if path.startswith('api'):
-            return jsonify({'code': 404, 'message': 'API endpoint not found'}), 404
-
-        try:
-            return send_from_directory(app.static_folder, 'index.html')
-        except Exception:
-            return "Frontend not found", 404
+        return send_from_directory(app.static_folder, 'index.html')
 
     with app.app_context():
         db.create_all()
@@ -238,4 +279,5 @@ if __name__ == '__main__':
     debug_mode = not getattr(sys, 'frozen', False)
 
     print(f"🚀 Server starting... Database: {app.config['SQLALCHEMY_DATABASE_URI']}")
+    # Note: use_reloader=False keeps the scheduler from being started twice in debug mode
     app.run(host='0.0.0.0', port=5000, debug=debug_mode, use_reloader=False)
\ No newline at end of file
diff --git a/2_1banben/services/core.py b/2_1banben/services/core.py
index 8eeae7e..2c89b61 100644
--- a/2_1banben/services/core.py
+++ b/2_1banben/services/core.py
@@ -4,11 +4,13 @@
 import threading
 import traceback
 from datetime import datetime
 
-# Dynamic imports, so a missing file cannot take down the whole startup
+# ==============================================================================
+# 1. Dynamic module imports
+# ==============================================================================
 try:
     from .crawler_106 import run_106_logic
-except ImportError:
-    print("⚠️ Warning: crawler_106 module not found")
+except ImportError as e:
+    print(f"⚠️ [System warning] Could not import crawler_106: {e}")
 
     def run_106_logic():
@@ -16,13 +18,14 @@ except ImportError:
 
 try:
     from .crawler_82 import run_82_logic
-except ImportError:
-    print("⚠️ Warning: crawler_82 module not found")
+except ImportError as e:
+    print(f"⚠️ [System warning] Could not import crawler_82: {e}")
 
     def run_82_logic():
         return []
 
+# Global task lock
 task_lock = threading.Lock()
@@ -34,12 +37,13 @@ def execute_monitor_task():
     # 1. Lock: keep the task from running twice at once
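+    # (Note: locked() followed by "with task_lock" is check-then-act rather than an
+    # atomic acquire; fine for a single APScheduler worker, but a stricter guard
+    # would use task_lock.acquire(blocking=False))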
     if task_lock.locked():
         logging.warning(">>> Task already running, skipping")
-        print(">>> ⚠️ Task already running, skipping this request")
+        print(">>> ⚠️ [Scheduler] Task already running, skipping this request")
         return None
 
     with task_lock:
+        start_time = datetime.now()
         logging.info(">>> Starting monitor task...")
-        print(f"--- [Task start] {datetime.now().strftime('%H:%M:%S')} ---")
+        print(f"--- [Task start] {start_time.strftime('%H:%M:%S')} ---")
 
         all_results = []
@@ -47,28 +51,27 @@
         # 2. Run the 106 crawler
         # ==========================
         try:
+            print(">>> [106 crawler] starting...")
             list_106 = run_106_logic()
+
             if list_106:
                 count = len(list_106)
                 print(f"✅ 106 crawler returned {count} records")
-
-                # 🔍 [Debug] Print the first record to confirm num_files is present
-                if count > 0:
-                    first = list_106[0]
-                    print(f"   [Debug check] 106 sample: {first.get('name')} | num_files={first.get('num_files')}")
-
                 all_results.extend(list_106)
             else:
-                print("⚠️ 106 crawler returned no data")
+                print("⚠️ 106 crawler finished, but returned no data (empty list)")
+
         except Exception as e:
-            print(f"❌ 106 crawler failed: {e}")
+            print(f"❌ 106 crawler failed badly: {e}")
             traceback.print_exc()
 
         # ==========================
         # 3. Run the 82 crawler
         # ==========================
         try:
+            print(">>> [82 crawler] starting...")
             list_82 = run_82_logic()
+
             if list_82:
                 print(f"✅ 82 crawler returned {len(list_82)} records")
@@ -76,20 +79,26 @@
                 for item in list_82:
                     if 'num_files' not in item:
                         item['num_files'] = 0
+                    if 'status' not in item:
+                        item['status'] = 'Unknown'
 
                 all_results.extend(list_82)
+            else:
+                print("⚠️ 82 crawler finished, but returned no data")
+
         except Exception as e:
-            print(f"❌ 82 crawler failed: {e}")
+            print(f"❌ 82 crawler failed badly: {e}")
             traceback.print_exc()
 
         # ==========================
         # 4. Aggregate and return
         # ==========================
+        duration = (datetime.now() - start_time).total_seconds()
         logging.info(f">>> Task finished, {len(all_results)} records in total")
-        print(f"--- [Task end] Total fetched: {len(all_results)} devices ---")
+        print(f"--- [Task end] Duration: {duration:.2f}s | Total fetched: {len(all_results)} devices ---")
 
         return {
             'device_list': all_results,
             'target_time': None,  # the per-item timestamp lives in item['target_time']
-            'temp_file_path': None  # legacy field; the file path lives in item['temp_file']
+            'temp_file_path': None  # legacy field, deprecated
         }
\ No newline at end of file
diff --git a/2_1banben/services/crawler_106.py b/2_1banben/services/crawler_106.py
index de81da9..451ed0e 100644
--- a/2_1banben/services/crawler_106.py
+++ b/2_1banben/services/crawler_106.py
@@ -9,6 +9,7 @@ CONFIG = Config.CRAWLER_CONFIG["106"]
 
 def get_temp_dir():
+    """Return the temp-file storage directory"""
     base_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
     temp_dir = os.path.join(base_dir, 'instance', 'temp')
     if not os.path.exists(temp_dir):
@@ -17,6 +18,7 @@
 
 def get_106_dynamic_token(port):
+    """Fetch a dynamic login token"""
     try:
         login_url = f"http://106.75.72.40:{port}/api/login"
         resp = requests.post(login_url, json=CONFIG["login_payload"], timeout=10)
@@ -26,59 +28,82 @@
 
 def find_closest_item(items, is_date_level=True):
+    """
+    Find the folder or file in the list whose date is closest to today
+    """
     if not items or not isinstance(items, list):
         return None
 
     today = datetime.now()
     scored_items = []
+
     for item in items:
         name_val = item.get('name', '')
         path_val = item.get('path', '')
+        # At the date level, names usually look like 2026_02_08
         target_str = name_val if name_val else path_val.split('/')[-1]
+
         try:
             if is_date_level:
+                # Parse the folder-name date format: YYYY_MM_DD
                 current_date = datetime.strptime(target_str, "%Y_%m_%d")
             else:
+                # Parse the file's modified timestamp
                 mod_str = item.get('modified', '')
                 current_date = datetime.fromisoformat(mod_str.replace('Z', '+00:00'))
+
+            # Score by distance from the current time
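+            # (tzinfo is stripped so aware "modified" timestamps can be compared
+            # against the naive local "today"; the small timezone skew is
+            # acceptable for ranking purposes)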
             diff = abs((today - current_date.replace(tzinfo=None)).total_seconds())
             scored_items.append((diff, item, target_str))
         except:
             continue
+
     if not scored_items:
         return None
+
+    # Sort by time difference and take the smallest
     scored_items.sort(key=lambda x: x[0])
     return scored_items[0]
 
 
 def run_106_logic():
-    """Returns result_list; each element is a dict"""
+    """
+    Main logic for the 106 crawler.
+    Returns result_list; each element is a dict.
+    """
     results = []
     print(">>> [106 crawler] starting...")
-    # today_str = datetime.now().strftime("%Y_%m_%d")  # ❌ removed the strict "today-only" check
 
     main_headers = {"Authorization": CONFIG["primary_auth"], "User-Agent": "Mozilla/5.0"}
 
     try:
+        # 0. Fetch the proxy list (i.e. the device list)
         resp = requests.get(CONFIG["base_url"], headers=main_headers, timeout=20)
         proxies = resp.json().get('proxies', [])
 
         for item in proxies:
             name = item.get('name', '')
+            # Filter rule: the name must end with _data
             if not name.lower().endswith('_data'):
                 continue
+
             name_upper = name.upper()
             is_tower_underscore = "TOWER_" in name_upper
             is_tower_i = "TOWER" in name_upper and not is_tower_underscore
+
+            # Filter rule: the name must carry a TOWER marker
            if not (is_tower_underscore or is_tower_i):
                 continue
 
-            # Build the base data packet
+            # --- Build the base data packet ---
+            # Default to the standard current time so target_time is never empty
+            # if a later step fails
+            current_standard_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+
             data_packet = {
                 'source': '106网站',
                 'name': name,
                 'status': '正常',
                 'value': '',
-                'target_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
+                'target_time': current_standard_time,
                 'raw_json': {},
                 'temp_file': None,
-                'num_files': 0  # ✅ default value
+                'num_files': 0
             }
 
+            # Check the online status
             if str(item.get('status')).lower() != 'online':
                 data_packet['status'] = '离线'
                 data_packet['value'] = f"状态: {item.get('status')}"
@@ -86,6 +111,7 @@
                 continue
 
             try:
+                # Fetch the port and token
                 port = item.get('conf', {}).get('remote_port')
                 token = get_106_dynamic_token(port)
                 if not token:
@@ -97,42 +123,51 @@
                 headers = {"Authorization": CONFIG["primary_auth"], "x-auth": token}
                 api_root = "/api/resources/Data/" if is_tower_underscore else "/api/resources/data/"
 
-                # 1. Fetch the date list
+                # --- 1. Fetch the list of date folders ---
                 res1 = requests.get(f"http://106.75.72.40:{port}{api_root}", headers=headers, timeout=10)
                 best_date = find_closest_item(res1.json().get('items', []), True)
 
-                # ✅ Change: only fail when no date folder exists at all; otherwise keep going, even with an old date.
                 if not best_date:
                     data_packet['value'] = "未找到任何日期文件夹"
                     results.append(data_packet)
                     continue
 
-                data_packet['target_time'] = best_date[2]  # record the matched date (e.g. 2026_02_02)
-                date_path = f"{api_root}{best_date[2]}/"
+                # ==============================================================================
+                # ✅ [Core fix] Normalize the time format
+                # Old logic: data_packet['target_time'] = best_date[2]  (yields "2026_02_08")
+                # New logic: convert "2026_02_08" into "2026-02-08 HH:MM:SS"
+                # ==============================================================================
+                raw_folder_name = best_date[2]  # e.g. "2026_02_08"
+                formatted_date_part = raw_folder_name.replace('_', '-')  # becomes "2026-02-08"
+                current_time_part = datetime.now().strftime("%H:%M:%S")
+
+                # Overwrite the default so the database stores a standard timestamp
+                data_packet['target_time'] = f"{formatted_date_part} {current_time_part}"
+
+                date_path = f"{api_root}{raw_folder_name}/"
 
-                # 2. Fetch that date folder's contents (this step yields numFiles)
+                # --- 2. Fetch that date folder's contents (yields numFiles) ---
                 res2 = requests.get(f"http://106.75.72.40:{port}{date_path}", headers=headers, timeout=10)
-                folder_data = res2.json()  # full JSON payload
+                folder_data = res2.json()
 
-                # ✅ Key step: extract numFiles (always present when the request succeeds)
                 file_count = folder_data.get('numFiles', 0)
                 data_packet['num_files'] = file_count
-                print(f"   -> {name}: matched date {best_date[2]}, file count: {file_count}")
+                print(f"   -> {name}: matched date {formatted_date_part}, file count: {file_count}")
 
-                # 3. Find the newest file in that folder
+                # --- 3. Find the newest file in that folder ---
                 best_file = find_closest_item(folder_data.get('items', []), False)
                 if not best_file:
-                    data_packet['value'] = "文件夹为空"  # numFiles should be 0 in this case
+                    data_packet['value'] = "文件夹为空"
                     results.append(data_packet)
                     continue
 
                 file_item = best_file[1]
                 full_path = file_item.get('path') or f"{date_path}{file_item.get('name')}"
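+                # (Prefer the server-reported path; fall back to joining the folder
+                # path with the file name)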
 
-                # 4. Download / read the content
+                # --- 4. Download / read the content ---
                 if is_tower_i:
-                    # Download the binary file
+                    # [Binary file] download path
                     download_url = f"http://106.75.72.40:{port}/api/raw{full_path}"
                     res3 = requests.get(download_url, headers=headers, timeout=20, stream=True)
                     if res3.status_code == 200:
@@ -143,18 +178,19 @@
                         data_packet['temp_file'] = temp_path
                         data_packet['value'] = f"Binary Downloaded: {len(res3.content)} bytes"
                         data_packet['raw_json'] = file_item  # reuse file_item as the raw_json payload
                     else:
                         data_packet['status'] = '异常'
                         data_packet['value'] = f"下载失败: {res3.status_code}"
                 else:
-                    # JSON content
+                    # [Text file] JSON parsing path
                     file_api_url = f"http://106.75.72.40:{port}/api/resources{full_path}"
                     res3 = requests.get(file_api_url, headers=headers, timeout=20)
                     try:
                         json_content = res3.json()
                         data_packet['raw_json'] = json_content
-                        data_packet['value'] = json_content.get('content', '')
+                        # Pull the content field; fall back to a truncated JSON string
+                        data_packet['value'] = json_content.get('content', str(json_content)[:100])
                     except:
                         data_packet['value'] = "JSON解析失败"
 
                 results.append(data_packet)
 
             except Exception as e:
                 data_packet['status'] = '异常'
-                data_packet['value'] = str(e)[:50]
+                data_packet['value'] = str(e)[:100]
                 results.append(data_packet)
 
     except Exception as e: