diff --git a/1.1/test1.py b/1.1/test1.py index e12bfa6..62cfd18 100644 --- a/1.1/test1.py +++ b/1.1/test1.py @@ -2,35 +2,46 @@ import os import json import threading import requests +import logging from datetime import datetime from flask import Flask, jsonify from flask_sqlalchemy import SQLAlchemy from flask_cors import CORS +from flask_apscheduler import APScheduler from lxml import etree +# --- 配置日志 --- +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + app = Flask(__name__) CORS(app) # --- 数据库配置 --- app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///monitor_data.db' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False +app.config['SCHEDULER_API_ENABLED'] = True # 允许通过API查看调度任务 + db = SQLAlchemy(app) +scheduler = APScheduler() -class ErrorLog(db.Model): +# --- 模型定义 --- +class MonitorRecord(db.Model): id = db.Column(db.Integer, primary_key=True) source = db.Column(db.String(50)) name = db.Column(db.String(100)) + status = db.Column(db.String(50)) # 正常 / 离线 / 异常 reason = db.Column(db.String(255)) offset = db.Column(db.String(50)) - latest_time = db.Column(db.String(50)) - check_time = db.Column(db.String(50)) + latest_time = db.Column(db.String(50)) # 数据时间 + check_time = db.Column(db.String(50)) # 爬取时间 content = db.Column(db.Text, nullable=True) with app.app_context(): db.create_all() +# --- 爬虫配置 --- CONFIG = { "106": { "base_url": "http://106.75.72.40:7500/api/proxy/tcp", @@ -46,35 +57,61 @@ CONFIG = { is_running = False -# --- 通用工具函数 --- -def add_error_to_db(source, name, reason, latest_time="N/A", content=None): - days_diff = "N/A" - if latest_time and latest_time != "N/A": - try: - # 兼容 2024_05_20 和 2024-05-20 两种格式 - clean_date_str = str(latest_time).split()[0].replace('_', '-') - target_date = datetime.strptime(clean_date_str, "%Y-%m-%d").date() - diff = (datetime.now().date() - target_date).days - days_diff = f"滞后 {diff} 天" if diff > 0 else "当天已同步" - except: - days_diff = "解析失败" - - log = ErrorLog( - source=source, name=name, reason=reason, offset=days_diff, - latest_time=latest_time, content=content, - check_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S") - ) - db.session.add(log) - - -# --- 106 专用辅助函数 --- -def get_106_dynamic_token(port): - login_url = f"http://106.75.72.40:{port}/api/login" +# --- 核心辅助函数 --- +def calculate_offset(latest_time_str): + if not latest_time_str or latest_time_str == "N/A": + return "从未同步" try: + clean_date_str = str(latest_time_str).split()[0].replace('_', '-') + target_date = datetime.strptime(clean_date_str, "%Y-%m-%d").date() + diff = (datetime.now().date() - target_date).days + if diff == 0: return "当天已同步" + return f"滞后 {diff} 天" + except: + return "时间解析失败" + + +def save_record(source, name, status, reason, latest_time="N/A", content=None): + """ + Upsert 逻辑: 有则更新,无则插入 + """ + record = MonitorRecord.query.filter_by(source=source, name=name).first() + now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + current_offset = calculate_offset(latest_time) + + if record: + if content is not None: record.content = content + if latest_time != "N/A": record.latest_time = latest_time + + record.status = status + record.reason = reason + record.check_time = now_str + # 使用当前库里的时间重新计算 offset + time_base = latest_time if latest_time != "N/A" else record.latest_time + record.offset = calculate_offset(time_base) + else: + new_record = MonitorRecord( + source=source, name=name, status=status, reason=reason, + offset=current_offset, latest_time=latest_time, + check_time=now_str, content=content + ) + db.session.add(new_record) + + try: + db.session.commit() + except Exception as e: + db.session.rollback() + logging.error(f"DB Error: {e}") + + return f"{source}_{name}" + + +# --- 106 逻辑 --- +def get_106_dynamic_token(port): + try: + login_url = f"http://106.75.72.40:{port}/api/login" resp = requests.post(login_url, json=CONFIG["106"]["login_payload"], timeout=10) - if resp.status_code == 200: - return resp.text.strip().replace('"', '') - return None + return resp.text.strip().replace('"', '') if resp.status_code == 200 else None except: return None @@ -102,62 +139,60 @@ def find_closest_item(items, is_date_level=True): return scored_items[0] -# --- 106 核心采集逻辑 --- -def run_106_logic(): +def run_106_logic(active_set): c = CONFIG["106"] today_str = datetime.now().strftime("%Y_%m_%d") main_headers = {"Authorization": c["primary_auth"], "User-Agent": "Mozilla/5.0"} - try: - resp = requests.get(c["base_url"], headers=main_headers, timeout=15) + resp = requests.get(c["base_url"], headers=main_headers, timeout=20) proxies = resp.json().get('proxies', []) - for item in proxies: name = item.get('name', '') - # 严格过滤 if not name.lower().endswith('_data'): continue - name_upper = name.upper() - is_tower_underscore = "TOWER_" in name_upper - is_tower_i = "TOWER" in name_upper and not is_tower_underscore - if not (is_tower_underscore or is_tower_i): continue + if "TOWER" not in name.upper(): continue - # 状态检查 if str(item.get('status')).lower() != 'online': - add_error_to_db("106网站", name, f"离线({item.get('status')})") + key = save_record("106网站", name, "离线", f"设备状态: {item.get('status')}") + active_set.add(key) continue try: port = item.get('conf', {}).get('remote_port') token = get_106_dynamic_token(port) if not token: - add_error_to_db("106网站", name, "Token获取失败") + key = save_record("106网站", name, "异常", "Token获取失败") + active_set.add(key) continue - headers = {"Authorization": c["primary_auth"], "x-auth": token, "User-Agent": "Mozilla/5.0"} - api_root = "/api/resources/Data/" if is_tower_underscore else "/api/resources/data/" + headers = {"Authorization": c["primary_auth"], "x-auth": token} + api_root = "/api/resources/Data/" if "TOWER_" in name.upper() else "/api/resources/data/" - # Step 1: 寻找日期目录 + # 寻找日期 res1 = requests.get(f"http://106.75.72.40:{port}{api_root}", headers=headers, timeout=10) - best_date = find_closest_item(res1.json().get('items', []), is_date_level=True) + best_date = find_closest_item(res1.json().get('items', []), True) if not best_date or best_date[2] != today_str: - add_error_to_db("106网站", name, "未找到今日文件夹", best_date[2] if best_date else "无") + key = save_record("106网站", name, "正常", "未找到今日文件夹", + latest_time=best_date[2] if best_date else "N/A") + active_set.add(key) continue - # Step 2: 寻找最新文件 + # 寻找文件 date_path = f"{api_root}{best_date[2]}/" res2 = requests.get(f"http://106.75.72.40:{port}{date_path}", headers=headers, timeout=10) - best_file = find_closest_item(res2.json().get('items', []), is_date_level=False) + best_file = find_closest_item(res2.json().get('items', []), False) if not best_file: - add_error_to_db("106网站", name, "文件夹内无文件", today_str) + key = save_record("106网站", name, "正常", "今日文件夹为空", latest_time=today_str) + active_set.add(key) continue + # 获取内容 file_item = best_file[1] full_path = file_item.get('path') or f"{date_path}{file_item.get('name')}" - # Step 3: 获取内容 - final_content = "" + # 判断下载方式 + is_tower_i = "TOWER" in name.upper() and "TOWER_" not in name.upper() if is_tower_i: download_url = f"http://106.75.72.40:{port}/api/raw{full_path}" res3 = requests.get(download_url, headers=headers, timeout=20) @@ -167,23 +202,25 @@ def run_106_logic(): res3 = requests.get(file_api_url, headers=headers, timeout=20) final_content = res3.json().get('content', '') - add_error_to_db("106网站", name, "同步成功", today_str, content=final_content) + key = save_record("106网站", name, "正常", "同步成功", latest_time=today_str, content=final_content) + active_set.add(key) except Exception as e: - add_error_to_db("106网站", name, f"采集异常: {str(e)}") - + key = save_record("106网站", name, "异常", f"采集错误: {str(e)[:50]}") + active_set.add(key) except Exception as e: - add_error_to_db("106网站", "全局错误", str(e)) + logging.error(f"106 Global Error: {e}") -# --- 82 逻辑 (保持不变) --- -def run_82_logic(): +# --- 82 逻辑 --- +def run_82_logic(active_set): c = CONFIG["82"] session = requests.Session() try: session.post(f"{c['base_url']}/login.php", data=c["login"], timeout=10) resp = session.post(f"{c['base_url']}/GetStationList.php", timeout=10) stations = etree.HTML(resp.content).xpath('//option/@value') + for sid in [s for s in stations if s]: try: r = session.post(f"{c['base_url']}/getLastWeatherData.php", data=str(sid), @@ -192,30 +229,58 @@ def run_82_logic(): if data: d_list = data.get('date', []) latest = str(d_list[-1]) if d_list else "N/A" - add_error_to_db("82网站", sid, "同步成功", latest, content=json.dumps(data, ensure_ascii=False)) + key = save_record("82网站", sid, "正常", "同步成功", latest_time=latest, + content=json.dumps(data, ensure_ascii=False)) + active_set.add(key) + else: + key = save_record("82网站", sid, "异常", "返回空数据") + active_set.add(key) except: - add_error_to_db("82网站", sid, "采集异常") + key = save_record("82网站", sid, "异常", "单个采集失败") + active_set.add(key) except Exception as e: - add_error_to_db("82网站", "初始化错误", str(e)) + logging.error(f"82 Global Error: {e}") -# --- Flask 路由 --- -def background_task(): +# --- 核心任务逻辑 --- +def execute_monitor_task(): global is_running - with app.app_context(): - ErrorLog.query.delete() # 每次开始前清理旧日志 - run_106_logic() - run_82_logic() - db.session.commit() - is_running = False + if is_running: + logging.warning("Task already running, skipping...") + return - -@app.route('/api/run', methods=['POST']) -def start(): - global is_running - if is_running: return jsonify({"status": "busy"}), 400 is_running = True - threading.Thread(target=background_task).start() + logging.info("Starting monitor task...") + + with app.app_context(): + active_set = set() + run_106_logic(active_set) + run_82_logic(active_set) + + # 掉线处理 + all_records = MonitorRecord.query.all() + now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + for record in all_records: + if f"{record.source}_{record.name}" not in active_set: + record.status = "已离线" + record.reason = "设备本次未出现" + record.check_time = now_str + record.offset = calculate_offset(record.latest_time) + + try: + db.session.commit() + except: + db.session.rollback() + + is_running = False + logging.info("Monitor task finished.") + + +# --- 路由 --- +@app.route('/api/run', methods=['POST']) +def manual_start(): + if is_running: return jsonify({"status": "busy"}), 400 + threading.Thread(target=execute_monitor_task).start() return jsonify({"status": "started"}) @@ -225,17 +290,26 @@ def status(): return jsonify({"is_running": is_running}) @app.route('/api/logs') def logs(): - data = ErrorLog.query.all() + data = MonitorRecord.query.all() return jsonify([{ - "source": l.source, - "name": l.name, - "reason": l.reason, - "offset": l.offset, - "latest_time": l.latest_time, - "check_time": l.check_time, - "content": l.content + "source": l.source, "name": l.name, "status": l.status, + "reason": l.reason, "offset": l.offset, "latest_time": l.latest_time, + "check_time": l.check_time, "content": l.content } for l in data]) +# --- 调度器配置 --- +# id: 任务ID, func: 任务函数(字符串路径或引用), trigger: cron(定时), hour/minute: 时间 +@scheduler.task('cron', id='daily_job', hour=10, minute=0) +def auto_run_task(): + with app.app_context(): + logging.info("Auto scheduler triggered.") + # 在新线程中运行,避免阻塞调度器主线程 + threading.Thread(target=execute_monitor_task).start() + + if __name__ == "__main__": - app.run(debug=True, port=5000) \ No newline at end of file + scheduler.init_app(app) + scheduler.start() + # 注意:debug=True 可能会导致调度器在开发模式下运行两次,生产环境建议关闭 debug + app.run(debug=True, port=5000, use_reloader=False) \ No newline at end of file diff --git a/zhandianxinxi/my-vue-app/src/App.vue b/zhandianxinxi/my-vue-app/src/App.vue index c4d92b2..158b49c 100644 --- a/zhandianxinxi/my-vue-app/src/App.vue +++ b/zhandianxinxi/my-vue-app/src/App.vue @@ -4,25 +4,25 @@
- 红色:离线/无数据/滞后>7天 - 橘色:滞后2-7天 - 黄色:滞后1-2天 或 跨天未更新 - 绿色:今日已同步 + 红色:已离线 / 异常 / 滞后>7天 + 橘色:滞后 2-7 天 + 黄色:滞后 1-2 天 + 绿色:正常且今日已同步
@@ -32,44 +32,50 @@ 106 代理 82 气象站 - +
- + 屏蔽选中
- + - + - + - - - + + + @@ -78,131 +84,106 @@
- + {{ formatDisplayName(activeDevice.name) }} - {{ activeDevice.source }} - {{ activeDevice.latest_time }} + {{ getStatusLabel(activeDevice) }} + {{ activeDevice.latest_time }} + {{ activeDevice.check_time }}
-

- {{ is106Site ? '光谱能量分布分析 (自动滤除饱和噪点)' : '气象站光谱数据分析 (Up/Down Spec)' }} + {{ is106Site ? '光谱能量分布 (已滤除饱和值)' : '气象站光谱数据 (Up/Down Spec)' }}

- -
- {{ isContentEmpty(activeDevice) ? '暂无有效的数据内容' : '未检测到符合格式的光谱数据' }} -
- +
型号: {{ module.model }} - 序列号: {{ module.sn }} + SN: {{ module.sn }}
-
@@ -421,18 +406,12 @@ onMounted(() => { .status-idle { display: flex; align-items: center; gap: 5px; } .status-summary { margin: 15px 0; display: flex; gap: 10px; flex-wrap: wrap; } .toolbar { background: #fff; padding: 15px 20px; border-radius: 8px; display: flex; justify-content: space-between; margin-bottom: 20px; border: 1px solid #ebeef5; align-items: center; } +.name-cell { display: flex; align-items: center; } +:deep(.offline-row) { background-color: #fef0f0 !important; } .drawer-content { padding: 0 20px 20px; } .info-banner { margin-bottom: 20px; } -.section-title { border-left: 4px solid #409EFF; padding-left: 10px; margin: 25px 0 15px; font-size: 18px; display: flex; align-items: center; gap: 8px; color: #303133; } .chart-container { margin-bottom: 30px; border: 1px solid #e4e7ed; border-radius: 8px; overflow: hidden; background: #fff; } .chart-header { background: #fafafa; padding: 10px 20px; display: flex; justify-content: space-between; align-items: center; border-bottom: 1px solid #e4e7ed; } -.module-tag { font-weight: bold; color: #409EFF; font-size: 15px; } -.sn-tag { font-size: 12px; color: #606266; background: #ecf5ff; padding: 3px 8px; border-radius: 4px; border: 1px solid #d9ecff; } .echart-box { width: 100%; height: 380px; } .echart-box.no-header { margin-top: 15px; } -.empty-hint { text-align: center; padding: 50px; color: #909399; font-style: italic; background: #fff; border-radius: 8px; } - -:deep(.el-drawer__header) { margin-bottom: 0; padding: 15px 20px; background: #303133; color: #fff !important; } -:deep(.el-drawer__title) { color: #fff; font-weight: bold; } -:deep(.el-drawer__close-btn) { color: #fff; }