Files
ZDXX/1.1/test1.py
2026-01-06 15:27:10 +08:00

313 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import time
import threading
import requests
from datetime import datetime
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS
from lxml import etree
# --- 初始化 Flask App ---
app = Flask(__name__)
CORS(app) # 解决前端 Vite 5173 端口的跨域问题
# --- 数据库配置 (SQLite) ---
# 数据库文件将生成在 backend 目录下
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///monitor_data.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
# --- 数据库模型 ---
class ErrorLog(db.Model):
id = db.Column(db.Integer, primary_key=True)
source = db.Column(db.String(50)) # 106网站 / 82网站
name = db.Column(db.String(100)) # 设备名称/站点ID
reason = db.Column(db.String(255)) # 状态描述
offset = db.Column(db.String(50)) # 日期偏移 (如: 滞后 2 天)
latest_time = db.Column(db.String(50)) # 最新文件日期
check_time = db.Column(db.String(50)) # 本次检查时间
content = db.Column(db.Text, nullable=True) # 专门存储 Tower_ 站点的 JSON 内容
# 每次启动时确保表结构已建立
with app.app_context():
db.create_all()
# --- 基础配置 ---
DATA_ROOT = "data"
FRPS_DIR = os.path.join(DATA_ROOT, "frps_106")
WEATHER_DIR = os.path.join(DATA_ROOT, "weather_82")
for d in [FRPS_DIR, WEATHER_DIR]:
os.makedirs(d, exist_ok=True)
CONFIG = {
"106": {
"base_url": "http://106.75.72.40:7500/api/proxy/tcp",
"primary_auth": "Basic YWRtaW46bGljYWhr",
"login_payload": {"username": "admin", "password": "licahk", "recaptcha": ""}
},
"82": {
"base_url": "http://82.156.1.111/weather/php",
"login": {'username': 'renlixin', 'password': 'licahk', 'login': '123'}
}
}
is_running = False # 全局任务状态锁
# --- 通用工具函数 ---
def add_error_to_db(source, name, reason, latest_time="N/A", content=None):
"""计算日期偏移并记录到数据库"""
days_diff = "N/A"
if latest_time and latest_time != "N/A":
try:
# 兼容 2024_01_01 和 2024-01-01 格式
clean_date_str = str(latest_time).split()[0].replace('_', '-')
target_date = datetime.strptime(clean_date_str, "%Y-%m-%d").date()
today_date = datetime.now().date()
diff = (today_date - target_date).days
days_diff = f"滞后 {diff}" if diff > 0 else "当天已同步"
except:
days_diff = "解析失败"
log = ErrorLog(
source=source,
name=name,
reason=reason,
offset=days_diff,
latest_time=latest_time,
check_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
content=content
)
db.session.add(log)
def find_closest_item(items, is_date_level=True):
if not items or not isinstance(items, list): return None
today = datetime.now()
scored_items = []
for item in items:
if not isinstance(item, dict): continue
name_val = item.get('name', '')
path_val = item.get('path', '')
target_str = name_val if name_val else path_val.split('/')[-1]
try:
if is_date_level:
current_date = datetime.strptime(target_str, "%Y_%m_%d")
else:
mod_str = item.get('modified', '')
if mod_str:
current_date = datetime.fromisoformat(mod_str.replace('Z', '+00:00'))
else:
continue
diff = abs((today - current_date.replace(tzinfo=None)).total_seconds())
scored_items.append((diff, item, target_str))
except:
continue
if not scored_items: return None
scored_items.sort(key=lambda x: x[0])
return scored_items[0]
def process_text_content(raw_content):
if not raw_content: return ""
lines = str(raw_content).split('\n')
result, current = [], ""
for line in lines:
if " " in line:
current += line.strip()
else:
if current: result.append(current)
current = line.strip()
if current: result.append(current)
return "\n".join(result)
# --- 106 业务逻辑 ---
def get_106_dynamic_token(port):
url = f"http://106.75.72.40:{port}/api/login"
try:
resp = requests.post(url, json=CONFIG["106"]["login_payload"], timeout=10)
if resp.status_code == 200:
return resp.text.strip().replace('"', '')
except:
pass
return None
def run_106_logic():
c = CONFIG["106"]
today_str = datetime.now().strftime("%Y_%m_%d")
try:
main_headers = {"Authorization": c["primary_auth"], "User-Agent": "Mozilla/5.0"}
resp = requests.get(c["base_url"], headers=main_headers, timeout=15)
if resp.status_code != 200:
add_error_to_db("106网站", "主入口API", f"访问失败: HTTP {resp.status_code}")
return
for item in resp.json().get('proxies', []):
name = item.get('name', 'Unknown')
if not name.lower().endswith('_data'): continue
status = str(item.get('status', '')).lower().strip()
if status != 'online':
add_error_to_db("106网站", name, f"设备离线 ({status})")
continue
name_up = name.upper()
is_tower_underscore = "TOWER_" in name_up
is_tower_i = "TOWER" in name_up and not is_tower_underscore
if not (is_tower_underscore or is_tower_i): continue
try:
port = item.get('conf', {}).get('remote_port')
token = get_106_dynamic_token(port)
if not token:
add_error_to_db("106网站", name, "Token获取失败")
continue
headers = {"Authorization": c["primary_auth"], "x-auth": token, "User-Agent": "Mozilla/5.0"}
api_root = "/api/resources/Data/" if is_tower_underscore else "/api/resources/data/"
res2 = requests.get(f"http://106.75.72.40:{port}{api_root}", headers=headers, timeout=10)
best_date = find_closest_item(res2.json().get('items', []), is_date_level=True)
if not best_date or best_date[2] != today_str:
add_error_to_db("106网站", name, "未找到今日文件夹", best_date[2] if best_date else "N/A")
continue
date_path = f"{api_root}{best_date[2]}/"
res3 = requests.get(f"http://106.75.72.40:{port}{date_path}", headers=headers, timeout=10)
best_file = find_closest_item(res3.json().get('items', []), is_date_level=False)
if not best_file:
add_error_to_db("106网站", name, "文件夹内无文件", best_date[2])
continue
file_item = best_file[1]
full_path = file_item.get('path') or f"{date_path}{file_item.get('name')}"
if is_tower_i:
# TowerI 模式:.bin 文件存入磁盘
raw_url = f"http://106.75.72.40:{port}/api/raw{full_path}"
res4 = requests.get(raw_url, headers=headers, timeout=20)
if res4.status_code == 200:
save_path = os.path.join(FRPS_DIR, f"{name}_{today_str}.bin")
with open(save_path, 'wb') as f:
f.write(res4.content)
add_error_to_db("106网站", name, "运行正常 (Bin已存盘)", today_str)
else:
# Tower_ 模式JSON 内容存入数据库
file_api_url = f"http://106.75.72.40:{port}/api/resources{full_path}"
res4 = requests.get(file_api_url, headers=headers, timeout=20)
file_json = res4.json()
raw_content = file_json.get('content', '') if file_json else None
if raw_content:
clean_content = process_text_content(raw_content)
add_error_to_db("106网站", name, "运行正常", today_str, content=clean_content)
else:
add_error_to_db("106网站", name, "JSON内容为空", best_date[2])
except Exception as e:
add_error_to_db("106网站", name, f"站点处理崩溃: {str(e)}")
except Exception as e:
add_error_to_db("106网站", "全局逻辑", str(e))
# --- 82 业务逻辑 ---
def run_82_logic():
c = CONFIG["82"]
session = requests.Session()
today_fmt = datetime.now().strftime("%Y-%m-%d")
try:
session.post(f"{c['base_url']}/login.php", data=c["login"], timeout=10)
resp = session.post(f"{c['base_url']}/GetStationList.php", timeout=10)
stations = etree.HTML(resp.content).xpath('//option/@value')
stations = [s for s in stations if s and str(s).strip()]
for sid in stations:
try:
r = session.post(f"{c['base_url']}/getLastWeatherData.php", data=str(sid),
headers={'Content-Type': 'text/plain'}, timeout=10)
data = r.json()
if not data:
add_error_to_db("82网站", sid, "返回 Null 数据")
continue
latest = str(data.get('date', ['N/A'])[-1])
status_msg = "当天已同步" if latest.startswith(today_fmt) else "数据滞后"
# 82网站数据通常直接存为文件备查记录入库
add_error_to_db("82网站", sid, status_msg, latest)
# 可选将82网站的完整JSON也存入content
# db.session.query(ErrorLog).filter_by(name=sid).update({"content": json.dumps(data, ensure_ascii=False)})
except Exception as e:
add_error_to_db("82网站", sid, f"请求异常: {str(e)}")
except Exception as e:
add_error_to_db("82网站", "初始化模块", str(e))
# --- 任务调度 ---
def background_worker():
global is_running
with app.app_context():
try:
# 1. 覆盖逻辑:清空旧数据
ErrorLog.query.delete()
db.session.commit()
# 2. 执行爬虫
run_106_logic()
run_82_logic()
# 3. 最终提交
db.session.commit()
except Exception as e:
print(f"后台任务出错: {e}")
finally:
is_running = False
# --- API 路由 ---
@app.route('/api/run', methods=['POST'])
def trigger_run():
global is_running
if is_running:
return jsonify({"message": "Task already running"}), 400
is_running = True
threading.Thread(target=background_worker).start()
return jsonify({"message": "Task started"})
@app.route('/api/status', methods=['GET'])
def get_status():
return jsonify({"is_running": is_running})
@app.route('/api/logs', methods=['GET'])
def get_logs():
logs = ErrorLog.query.order_by(ErrorLog.source.desc()).all()
return jsonify([{
"source": l.source,
"name": l.name,
"reason": l.reason,
"offset": l.offset,
"latest_time": l.latest_time,
"check_time": l.check_time,
"content": l.content
} for l in logs])
if __name__ == "__main__":
# 使用 5000 端口,请确保前端 Vite 配置了正确的 Proxy
app.run(host="0.0.0.0", port=5000, debug=True)