Files
ZDXX/1.1/test1.py
2026-01-06 15:37:21 +08:00

151 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import threading
import requests
from datetime import datetime
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS
from lxml import etree
# --- Application setup ---
app = Flask(__name__)
CORS(app)  # default flask_cors settings, no per-route restrictions configured

# --- Database configuration ---
app.config.update(
    SQLALCHEMY_DATABASE_URI='sqlite:///monitor_data.db',
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
)
db = SQLAlchemy(app)
class ErrorLog(db.Model):
    """One stored result of a monitoring check.

    Despite the name, rows are written for successful checks as well as for
    failures (see ``run_106_logic`` / ``run_82_logic``). Times and the lag
    description are stored as plain strings.
    """

    id = db.Column(db.Integer, primary_key=True)

    # What was checked.
    source = db.Column(db.String(50))   # label of the monitored site
    name = db.Column(db.String(100))    # proxy name, station id or an error label

    # Outcome of the check.
    reason = db.Column(db.String(255))  # result / failure description
    offset = db.Column(db.String(50))   # lag text derived from latest_time

    # Timing information, kept as text.
    latest_time = db.Column(db.String(50))  # date of the newest data, or "N/A"
    check_time = db.Column(db.String(50))   # when the row was created

    # Optional raw payload captured during the check.
    content = db.Column(db.Text, nullable=True)
# Create the tables for the declared models at import time; an application
# context is required for the database extension to locate the app.
with app.app_context():
    db.create_all()
# Endpoints and credentials for the two monitored sites, keyed by site label.
#
# Every value that is deployment-specific or secret can be overridden through
# an environment variable, so real deployments do not have to rely on the
# credentials committed in this file. The literals below are kept only as
# backward-compatible defaults and should eventually be removed from source.
CONFIG = {
    "106": {
        "base_url": os.environ.get(
            "MONITOR_106_BASE_URL", "http://106.75.72.40:7500/api/proxy/tcp"
        ),
        # Complete value of the HTTP ``Authorization`` header.
        "primary_auth": os.environ.get(
            "MONITOR_106_AUTH", "Basic YWRtaW46bGljYWhr"
        ),
        "login_payload": {
            "username": os.environ.get("MONITOR_106_USERNAME", "admin"),
            "password": os.environ.get("MONITOR_106_PASSWORD", "licahk"),
            "recaptcha": "",
        },
    },
    "82": {
        "base_url": os.environ.get(
            "MONITOR_82_BASE_URL", "http://82.156.1.111/weather/php"
        ),
        "login": {
            'username': os.environ.get("MONITOR_82_USERNAME", "renlixin"),
            'password': os.environ.get("MONITOR_82_PASSWORD", "licahk"),
            'login': '123',
        },
    },
}

# True while a background monitoring pass is in progress.
is_running = False
def add_error_to_db(source, name, reason, latest_time="N/A", content=None):
    """Queue one ``ErrorLog`` row on the current database session.

    The row is only added to ``db.session``; committing is left to the caller.

    Args:
        source: Label of the monitored site.
        name: Identifier of the checked item (proxy name, station id, ...).
        reason: Description of the check result.
        latest_time: Date of the newest data, optionally followed by a time.
            Both ``YYYY-MM-DD`` and ``YYYY_MM_DD`` are accepted. ``"N/A"`` or
            an empty value means "unknown" and skips the lag calculation.
        content: Optional raw payload text stored with the row.
    """
    days_diff = "N/A"
    if latest_time and latest_time != "N/A":
        try:
            # Keep only the date part and normalise "_" separators to "-".
            clean_date_str = str(latest_time).split()[0].replace('_', '-')
            target_date = datetime.strptime(clean_date_str, "%Y-%m-%d").date()
        except (ValueError, IndexError):
            # ValueError: not a YYYY-MM-DD date.
            # IndexError: the value contained nothing but whitespace.
            days_diff = "解析失败"
        else:
            diff = (datetime.now().date() - target_date).days
            days_diff = f"滞后 {diff}" if diff > 0 else "当天已同步"

    log = ErrorLog(
        source=source,
        name=name,
        reason=reason,
        offset=days_diff,
        latest_time=latest_time,
        content=content,
        check_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    )
    db.session.add(log)
# --- 106 site logic ---
def get_106_token(port):
    """Log in to the 106 host on ``port`` and return the token text.

    Args:
        port: TCP port of the service whose ``/api/login`` endpoint is called.

    Returns:
        The response body with surrounding whitespace and all double quotes
        removed when the server answers with status 200; ``None`` when the
        request fails or any other status is returned.
    """
    login_url = f"http://106.75.72.40:{port}/api/login"
    try:
        resp = requests.post(login_url, json=CONFIG["106"]["login_payload"], timeout=5)
    except requests.RequestException:
        # Connection problems and timeouts are expected here; anything else
        # (e.g. a programming error) should not be hidden.
        return None

    if resp.status_code != 200:
        return None
    return resp.text.strip().replace('"', '')
def run_106_logic():
    """Check the proxies listed by the 106 site and queue one log row each.

    Only proxies whose name ends with ``_data`` (case-insensitive) are
    considered. A proxy whose status is not ``online`` is recorded as offline;
    every other one is recorded as synchronised today. Any failure of the
    listing request itself is recorded as a single global error row.
    """
    c = CONFIG["106"]
    today_str = datetime.now().strftime("%Y_%m_%d")
    try:
        resp = requests.get(c["base_url"], headers={"Authorization": c["primary_auth"]}, timeout=10)
        # Surface HTTP failures (e.g. rejected credentials) as a global error
        # instead of treating the error body as an empty proxy list.
        resp.raise_for_status()

        for item in resp.json().get('proxies', []):
            # A missing or null name is skipped rather than aborting the loop.
            name = item.get('name') or ''
            if not name.lower().endswith('_data'):
                continue

            status = item.get('status')
            if str(status).lower() != 'online':
                add_error_to_db("106网站", name, f"离线({status})")
                continue

            # TODO: the 106-specific JSON scraping belongs here; it should
            # produce the raw content string, e.g.
            #   add_error_to_db("106网站", name, "运行正常", today_str, content=raw_json_str)
            # Until then a fixed sample payload is stored.
            add_error_to_db("106网站", name, "同步成功", today_str, content='{"info": "106数据报文示例"}')
    except Exception as e:
        # Top-level boundary of this check: record the failure instead of
        # letting it stop the whole monitoring pass.
        add_error_to_db("106网站", "全局错误", str(e))
# --- 82 site logic ---
def run_82_logic():
    """Fetch the latest weather data of every station on the 82 site.

    Logs in, reads the station list from the ``<option value=...>`` entries of
    ``GetStationList.php`` and requests the latest data of each station. A
    station that returns data gets a success row carrying the last entry of
    its ``date`` list; a station whose request or parsing fails gets an error
    row. A station that returns an empty payload produces no row. Failures
    before the per-station loop are recorded as a single initialisation error.
    """
    c = CONFIG["82"]
    try:
        # The context manager closes the session's connections when done.
        with requests.Session() as session:
            session.post(f"{c['base_url']}/login.php", data=c["login"], timeout=10)

            resp = session.post(f"{c['base_url']}/GetStationList.php", timeout=10)
            # Do not try to parse an HTTP error page as the station list.
            resp.raise_for_status()
            stations = etree.HTML(resp.content).xpath('//option/@value')

            for raw_sid in stations:
                if not raw_sid:
                    continue
                sid = str(raw_sid)
                try:
                    r = session.post(
                        f"{c['base_url']}/getLastWeatherData.php",
                        data=sid,
                        headers={'Content-Type': 'text/plain'},
                        timeout=10,
                    )
                    data = r.json()
                    if data:
                        d_list = data.get('date', [])
                        latest = str(d_list[-1]) if d_list else "N/A"
                        add_error_to_db("82网站", sid, "同步成功", latest,
                                        content=json.dumps(data, ensure_ascii=False))
                except Exception:
                    # Best effort per station: one bad station must not stop
                    # the others. ``Exception`` (not a bare ``except``) keeps
                    # KeyboardInterrupt / SystemExit propagating.
                    add_error_to_db("82网站", sid, "采集异常")
    except Exception as e:
        # Top-level boundary of this check: record instead of propagating.
        add_error_to_db("82网站", "初始化错误", str(e))
def background_task():
    """Run one full monitoring pass and replace the stored log rows.

    Inside an application context: deletes all existing ``ErrorLog`` rows,
    runs both site checks and commits the result. If anything fails, the
    session is rolled back and the exception is re-raised. ``is_running`` is
    always reset, so a failed pass cannot leave the service reporting "busy"
    forever.
    """
    global is_running
    try:
        with app.app_context():
            try:
                ErrorLog.query.delete()
                run_106_logic()
                run_82_logic()
                db.session.commit()
            except Exception:
                # Leave the previous rows intact rather than a half-written set.
                db.session.rollback()
                raise
    finally:
        is_running = False
# Makes the "check is_running, then set it" step in ``start`` atomic, so two
# simultaneous requests cannot both launch a monitoring pass.
_start_lock = threading.Lock()


@app.route('/api/run', methods=['POST'])
def start():
    """Start a monitoring pass in a background thread.

    Returns:
        ``{"status": "busy"}`` with HTTP 400 when a pass is already running,
        otherwise ``{"status": "started"}``.
    """
    global is_running
    with _start_lock:
        if is_running:
            return jsonify({"status": "busy"}), 400
        is_running = True

    try:
        threading.Thread(target=background_task).start()
    except RuntimeError:
        # The thread could not be started, so nothing will reset the flag.
        is_running = False
        raise
    return jsonify({"status": "started"})
@app.route('/api/status')
def status():
    """Report whether a monitoring pass is currently in progress."""
    payload = {"is_running": is_running}
    return jsonify(payload)
@app.route('/api/logs')
def logs():
    """Return every stored ``ErrorLog`` row as a JSON array of objects."""
    exported_fields = (
        "source", "name", "reason", "offset",
        "latest_time", "check_time", "content",
    )
    rows = []
    for entry in ErrorLog.query.all():
        rows.append({field: getattr(entry, field) for field in exported_fields})
    return jsonify(rows)
#
if __name__ == "__main__":
    # Debug mode enables the Werkzeug reloader and interactive debugger, which
    # must not be reachable outside development. It stays on by default to
    # preserve the existing behaviour; set FLASK_DEBUG=0 (or false/no/off) to
    # turn it off.
    debug_flag = os.environ.get("FLASK_DEBUG", "1").strip().lower()
    debug_enabled = debug_flag not in ("", "0", "false", "no", "off")
    app.run(debug=debug_enabled, port=5000)