修改数据获取,确保json文件完整获取

This commit is contained in:
YueL1331
2026-01-08 14:26:34 +08:00
parent a5b0b71d26
commit a8984a156c
9 changed files with 304 additions and 479 deletions

View File

@ -1,9 +1,10 @@
# app.py
import os import os
import sys import sys
from flask import Flask, jsonify from flask import Flask
from flask_cors import CORS from extensions import db, cors # ✅ 从 extensions 导入
from models import db, Device, DeviceHistory, MaintenanceLog from models import Device, DeviceHistory, MaintenanceLog # 导入模型以便 SQLAlchemy 识别
from routes.api import api_bp # 从 api.py 导入蓝图 from routes.api import api_bp
# 解决 Windows 下控制台输出乱码问题 # 解决 Windows 下控制台输出乱码问题
sys.stdout.reconfigure(encoding='utf-8') sys.stdout.reconfigure(encoding='utf-8')
@ -12,43 +13,37 @@ sys.stdout.reconfigure(encoding='utf-8')
def create_app():
    """Application factory: configure SQLite, init extensions, register routes."""
    app = Flask(__name__)

    # Resolve <project>/instance and make sure it exists before SQLite
    # tries to open a database file inside it.
    basedir = os.path.abspath(os.path.dirname(__file__))
    instance_path = os.path.join(basedir, 'instance')
    if not os.path.exists(instance_path):
        os.makedirs(instance_path)
        print(f"📁 检测到目录不存在,已自动创建: {instance_path}")

    db_path = os.path.join(instance_path, 'devices.db')
    app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{db_path}'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    app.config['JSON_AS_ASCII'] = False  # keep CJK characters readable in JSON

    # Extensions live in extensions.py and use the init_app pattern.
    cors.init_app(app)
    db.init_app(app)

    # The blueprint already carries url_prefix='/api'.
    app.register_blueprint(api_bp)

    # Create any missing tables on startup.
    with app.app_context():
        db.create_all()

    return app
# 5. 提供 Flask Shell 上下文(方便命令行调试)
app = create_app() app = create_app()
@app.shell_context_processor @app.shell_context_processor
def make_shell_context(): def make_shell_context():
return { return {
@ -58,8 +53,6 @@ def make_shell_context():
'MaintenanceLog': MaintenanceLog 'MaintenanceLog': MaintenanceLog
} }
if __name__ == '__main__':
    # Development entry point: announce the URL, then serve on all interfaces.
    print("🚀 服务正在启动: http://127.0.0.1:5000")
    app.run(debug=True, host='0.0.0.0', port=5000)

View File

@ -1,3 +1,4 @@
#extensions.py
from flask_sqlalchemy import SQLAlchemy from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS from flask_cors import CORS
from flask_apscheduler import APScheduler from flask_apscheduler import APScheduler

Binary file not shown.

Binary file not shown.

View File

@ -1,14 +1,8 @@
# models.py
from datetime import datetime from datetime import datetime
import json # 用于在模型内部处理序列化(可选,主要在业务逻辑用) import json
from extensions import db from extensions import db # ✅ 必须从 extensions 导入
# -------------------------------------------------------------------------
# 1. 设备主表 (快照表)
# 作用:永远存储所有出现过的设备。
# 逻辑:如果网页上设备消失了,这里的记录不会删,只是不会更新时间,
# 这样你就能知道它“失联”了,但数据还在。
# -------------------------------------------------------------------------
class Device(db.Model): class Device(db.Model):
__tablename__ = 'devices' __tablename__ = 'devices'
@ -16,18 +10,16 @@ class Device(db.Model):
name = db.Column(db.String(100), unique=True, index=True) name = db.Column(db.String(100), unique=True, index=True)
source = db.Column(db.String(50)) source = db.Column(db.String(50))
# --- 快照字段 (用于首页列表) --- # 快照字段
status = db.Column(db.String(50)) status = db.Column(db.String(50))
current_value = db.Column(db.String(200)) # 提取出来的核心值(方便显示) current_value = db.Column(db.String(200))
latest_time = db.Column(db.String(50)) # 数据产生时间 latest_time = db.Column(db.String(50))
# 🔥🔥🔥 【核心新增】存储该设备完整的原始 JSON 数据字符串 🔥🔥🔥
# 这样无论爬虫爬到什么奇怪字段,都可以在这里找到
json_data = db.Column(db.Text) json_data = db.Column(db.Text)
check_time = db.Column(db.String(50)) # 系统最后一次检查的时间 check_time = db.Column(db.String(50))
reason = db.Column(db.String(255)) reason = db.Column(db.String(255))
offset = db.Column(db.String(50)) offset = db.Column(db.String(50))
install_site = db.Column(db.String(100), default="") install_site = db.Column(db.String(100), default="")
is_maintaining = db.Column(db.Boolean, default=False) is_maintaining = db.Column(db.Boolean, default=False)
is_hidden = db.Column(db.Boolean, default=False) is_hidden = db.Column(db.Boolean, default=False)
@ -35,10 +27,8 @@ class Device(db.Model):
history = db.relationship('DeviceHistory', backref='device', lazy='dynamic', cascade='all, delete-orphan') history = db.relationship('DeviceHistory', backref='device', lazy='dynamic', cascade='all, delete-orphan')
def to_dict(self): def to_dict(self):
"""转字典,供前端 API 使用""" # 兼容处理 API 返回
api_status = 'offline' if self.status in ['离线', '异常', '已离线'] else 'online' api_status = 'offline' if self.status in ['离线', '异常', '已离线'] else 'online'
# 尝试解析 JSON 字符串返回给前端对象,如果解析失败则返回原字符串
raw_obj = None raw_obj = None
if self.json_data: if self.json_data:
try: try:
@ -51,10 +41,10 @@ class Device(db.Model):
'name': self.name, 'name': self.name,
'source': self.source, 'source': self.source,
'latest_time': self.latest_time, 'latest_time': self.latest_time,
'status': api_status, 'status': api_status, # 前端用这个判断颜色
'status_text': self.status, 'status_text': self.status, # 显示文本
'value': self.current_value, 'value': self.current_value,
'raw_json': raw_obj, # 🔥 前端可以看到原始数据 'raw_json': raw_obj,
'reason': self.reason, 'reason': self.reason,
'install_site': self.install_site, 'install_site': self.install_site,
'is_maintaining': self.is_maintaining, 'is_maintaining': self.is_maintaining,
@ -63,30 +53,21 @@ class Device(db.Model):
} }
# -------------------------------------------------------------------------
# 2. 历史记录表 (流水账/日志表)
# 作用:无限追加,记录每一次抓取的数据。
# -------------------------------------------------------------------------
class DeviceHistory(db.Model):
    """Append-only history: one row per scrape, keeping the raw JSON payload."""
    __tablename__ = 'device_history'

    id = db.Column(db.Integer, primary_key=True)
    device_id = db.Column(db.Integer, db.ForeignKey('devices.id'))
    data_time = db.Column(db.String(50))               # timestamp reported by the source site
    status = db.Column(db.String(50))                  # device status at scrape time
    result_data = db.Column(db.String(200), default="")  # extracted display value
    json_data = db.Column(db.Text)                     # full raw JSON payload for this scrape
    # default="" restored so rows created outside run_monitor never carry NULL.
    file_path = db.Column(db.String(255), default="")  # archived binary, relative path
    recorded_at = db.Column(db.DateTime, default=datetime.now)  # local scrape time
# -------------------------------------------------------------------------
# 3. 维修日志表 (无变化)
# -------------------------------------------------------------------------
class MaintenanceLog(db.Model): class MaintenanceLog(db.Model):
__tablename__ = 'maintenance_log' __tablename__ = 'maintenance_log'
id = db.Column(db.Integer, primary_key=True) id = db.Column(db.Integer, primary_key=True)

View File

@ -1,233 +1,215 @@
# routes/api.py
import os import os
import shutil import shutil
import json # 👈 必需:用于序列化原始数据 import json
from flask import Blueprint, jsonify, request from flask import Blueprint, jsonify, request
from datetime import datetime from datetime import datetime
from models import db, Device, DeviceHistory, MaintenanceLog from extensions import db # ✅ 从 extensions 导入 db 用于提交事务
from models import Device, DeviceHistory, MaintenanceLog
# 尝试导入爬虫模块,如果没有则跳过(防止报错)
try: try:
from services.core import execute_monitor_task from services.core import execute_monitor_task
except ImportError: except ImportError:
execute_monitor_task = None execute_monitor_task = None
# ✅ 定义蓝图,设置统一前缀 /api
api_bp = Blueprint('api', __name__, url_prefix='/api') api_bp = Blueprint('api', __name__, url_prefix='/api')
def calculate_offset(latest_time_str):
    """Describe how far behind a device's last data timestamp is.

    Args:
        latest_time_str: timestamp string such as "2026-01-08" or
            "2026_01_08 12:00:00" (underscores tolerated); may be None/"N/A".

    Returns:
        A human-readable label: "从未同步" when there is no timestamp,
        "当天已同步" when the date is today, "滞后 N" when N days behind,
        the input unchanged when it is too short to be a date, or
        "时间解析失败" when parsing fails.
    """
    if not latest_time_str or latest_time_str == "N/A":
        return "从未同步"
    try:
        # Keep only the date token and normalise 2026_01_08 -> 2026-01-08.
        clean = str(latest_time_str).split()[0].replace('_', '-')
        if len(clean) < 8:
            return latest_time_str
        target = datetime.strptime(clean, "%Y-%m-%d").date()
        diff = (datetime.now().date() - target).days
        return "当天已同步" if diff == 0 else f"滞后 {diff}"
    except (ValueError, IndexError):
        # ValueError: malformed date; IndexError: whitespace-only input.
        # Was a bare except, which also swallowed KeyboardInterrupt.
        return "时间解析失败"
@api_bp.route('/devices_overview', methods=['GET'])
def devices_overview():
    """Return the snapshot of every known device as JSON."""
    try:
        payload = [device.to_dict() for device in Device.query.all()]
        return jsonify({'code': 200, 'data': payload})
    except Exception as e:
        print(f"Error in devices_overview: {e}")
        return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/toggle_maintenance', methods=['POST'])
def toggle_maintenance():
try:
data = request.json
record = Device.query.filter_by(name=data.get('name')).first()
if record:
record.is_maintaining = data.get('is_maintaining', False)
db.session.commit()
return jsonify({'code': 200, 'message': '状态更新成功'})
return jsonify({'code': 404, 'message': '设备不存在'})
except Exception as e:
return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/logs/add', methods=['POST'])
def add_log():
try:
data = request.json
new_log = MaintenanceLog(device_name=data.get('device_name'), content=data.get('content'))
db.session.add(new_log)
db.session.commit()
return jsonify({'code': 200, 'message': '日志已保存'})
except Exception as e:
return jsonify({'code': 500, 'message': str(e)})
# =========================================================================
# 🔥 核心功能区
# =========================================================================
@api_bp.route('/device_history', methods=['GET'])
def get_device_history():
"""获取单个设备的历史,包含原始 JSON 数据"""
try:
name = request.args.get('name')
if not name:
return jsonify({'error': 'Missing name parameter'}), 400
device = Device.query.filter_by(name=name).first()
if not device:
return jsonify({'error': 'Device not found'}), 404
history = DeviceHistory.query.filter_by(device_id=device.id) \
.order_by(DeviceHistory.recorded_at.desc()) \
.limit(100).all()
history_data = []
for h in history:
rec_time = h.recorded_at.strftime('%Y-%m-%d %H:%M:%S') if h.recorded_at else 'N/A'
# 🔥 将数据库里存的 JSON 字符串转回对象,发给前端
raw_obj = None
if h.json_data:
try:
raw_obj = json.loads(h.json_data)
except:
raw_obj = h.json_data
history_data.append({
'data_time': h.data_time,
'status': h.status,
'value': h.result_data,
'raw_data': raw_obj, # 🔥 前端可查看详细原始数据
'file_path': h.file_path,
'recorded_at': rec_time
})
return jsonify({
'device': device.name,
'history': history_data
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@api_bp.route('/run_monitor', methods=['POST'])
def run_monitor():
    """Run the crawlers once and persist the results.

    For every scraped item: archive its temp file (if any) into
    instance/binary, refresh the Device snapshot row, and append a
    DeviceHistory row carrying the full raw JSON payload.
    """
    try:
        if not execute_monitor_task:
            return jsonify({'code': 500, 'message': 'Core module missing'})

        # 1. Make sure the archive directory exists.
        base_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
        binary_dir = os.path.join(base_dir, 'instance', 'binary')
        if not os.path.exists(binary_dir):
            os.makedirs(binary_dir)

        # 2. Fetch the scraped device list.
        task_result = execute_monitor_task()
        if not task_result:
            return jsonify({'code': 200, 'message': '任务跳过(正在运行)'})

        scraped_list = task_result.get('device_list', [])
        current_check_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # 3. Persist each item.
        for item in scraped_list:
            d_name = item.get('name')
            if not d_name:
                # Guard restored: without it an unnamed item would create a
                # Device row with name=None.
                continue
            d_source = item.get('source')
            d_status = item.get('status')
            d_value = item.get('value')
            d_time = item.get('target_time')
            d_raw_json = item.get('raw_json', {})
            d_temp_file = item.get('temp_file')

            # --- archive temp file: instance/temp -> instance/binary ---
            final_db_rel_path = ""
            if d_temp_file and os.path.exists(d_temp_file):
                fname = os.path.basename(d_temp_file)
                dest = os.path.join(binary_dir, fname)
                try:
                    shutil.move(d_temp_file, dest)
                    final_db_rel_path = f"binary/{fname}"
                except Exception as e:
                    # Archival failure must not abort the whole run.
                    print(f"File move error: {e}")

            # Serialize the raw payload once; shared by snapshot and history.
            json_str = json.dumps(d_raw_json, ensure_ascii=False)

            # --- A. upsert the Device snapshot ---
            device = Device.query.filter_by(name=d_name).first()
            if not device:
                device = Device(name=d_name, source=d_source)
                db.session.add(device)
                db.session.flush()  # allocate device.id for the history row

            device.status = d_status
            device.current_value = d_value
            device.latest_time = d_time
            device.check_time = current_check_time
            device.json_data = json_str
            device.offset = calculate_offset(d_time)

            # --- B. append-only history row ---
            new_history = DeviceHistory(
                device_id=device.id,
                status=d_status,
                result_data=d_value,
                data_time=d_time,
                json_data=json_str,
                file_path=final_db_rel_path
            )
            db.session.add(new_history)

        db.session.commit()
        return jsonify({'code': 200, 'message': f'更新 {len(scraped_list)} 台设备'})
    except Exception as e:
        db.session.rollback()
        print(f"Monitor Error: {e}")
        return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/device_history', methods=['GET'])
def get_device_history():
    """Return up to 100 most recent history rows for one device (?name=...)."""
    try:
        name = request.args.get('name')
        device = Device.query.filter_by(name=name).first()
        if not device:
            return jsonify({'error': 'Not found'}), 404

        history = DeviceHistory.query.filter_by(device_id=device.id) \
            .order_by(DeviceHistory.recorded_at.desc()).limit(100).all()

        data = []
        for h in history:
            # Raw payload is stored as a JSON string; fall back to the raw
            # text when it does not parse.
            raw = None
            if h.json_data:
                try:
                    raw = json.loads(h.json_data)
                except ValueError:
                    raw = h.json_data
            data.append({
                # recorded_at can be NULL on legacy rows — guard restored so
                # one bad row does not turn the whole request into a 500.
                'recorded_at': h.recorded_at.strftime('%Y-%m-%d %H:%M:%S') if h.recorded_at else 'N/A',
                'data_time': h.data_time,
                'status': h.status,
                'value': h.result_data,
                'file_path': h.file_path,
                'raw_data': raw
            })
        return jsonify({'device': device.name, 'history': data})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# 👇👇👇 以下是 Vue 前端用到的其他接口,之前缺失的 👇👇👇
@api_bp.route('/update_site', methods=['POST'])
def update_site():
    """Set a device's installation site from JSON body {name, site}."""
    payload = request.get_json()
    device = Device.query.filter_by(name=payload.get('name')).first()
    if device is None:
        return jsonify({'code': 404, 'message': 'Device not found'}), 404
    device.install_site = payload.get('site')
    db.session.commit()
    return jsonify({'code': 200, 'message': 'Updated'})
@api_bp.route('/toggle_maintenance', methods=['POST'])
def toggle_maintenance():
    """Flip a device's maintenance flag from JSON body {name, is_maintaining}."""
    payload = request.get_json()
    device = Device.query.filter_by(name=payload.get('name')).first()
    if device is None:
        return jsonify({'code': 404, 'message': 'Device not found'}), 404
    device.is_maintaining = payload.get('is_maintaining')
    db.session.commit()
    return jsonify({'code': 200, 'message': 'Updated'})
@api_bp.route('/toggle_hidden', methods=['POST'])
def toggle_hidden():
    """Flip a device's hidden/recycle-bin flag from JSON body {name, is_hidden}."""
    payload = request.get_json()
    device = Device.query.filter_by(name=payload.get('name')).first()
    if device is None:
        return jsonify({'code': 404, 'message': 'Device not found'}), 404
    device.is_hidden = payload.get('is_hidden')
    db.session.commit()
    return jsonify({'code': 200, 'message': 'Updated'})
@api_bp.route('/logs/add', methods=['POST'])
def add_log():
    """Append a maintenance log entry from JSON body {device_name, content}."""
    payload = request.get_json()
    device_name = payload.get('device_name')
    content = payload.get('content')
    if not (device_name and content):
        return jsonify({'code': 400, 'message': 'Missing data'}), 400
    db.session.add(MaintenanceLog(device_name=device_name, content=content))
    db.session.commit()
    return jsonify({'code': 200, 'message': 'Log added'})

View File

@ -1,135 +1,37 @@
# services/core.py
import logging import logging
from datetime import datetime
import threading import threading
from extensions import db
# 引入新的模型
from models import Device, DeviceHistory
# 引入爬虫逻辑 (保持相对导入不变)
from .crawler_106 import run_106_logic from .crawler_106 import run_106_logic
from .crawler_82 import run_82_logic from .crawler_82 import run_82_logic
task_lock = threading.Lock() task_lock = threading.Lock()
def calculate_offset(latest_time_str):
"""计算时间滞后天数 (保持原有逻辑)"""
if not latest_time_str or latest_time_str == "N/A":
return "从未同步"
try:
clean_date_str = str(latest_time_str).split()[0].replace('_', '-')
target_date = datetime.strptime(clean_date_str, "%Y-%m-%d").date()
diff = (datetime.now().date() - target_date).days
if diff == 0: return "当天已同步"
return f"滞后 {diff}"
except:
return "时间解析失败"
def save_record_to_db(source, name, status, reason, latest_time="N/A", content=None):
"""
智能存储逻辑:
1. 确保 Device 主表存在
2. 仅当 latest_time 发生变化时,才写入 DeviceHistory
"""
try:
# 1. 查询或创建主设备 (Device)
device = Device.query.filter_by(name=name).first()
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
current_offset = calculate_offset(latest_time)
is_new_data = False
if not device:
# === 新设备发现 ===
device = Device(
name=name,
source=source,
install_site="", # 默认空
is_maintaining=False,
is_hidden=False
)
db.session.add(device)
# 需要 flush 这里的 add以便后面生成 ID 存历史,但为了性能可以最后统一 commit
# 这里标记为新数据,强制存一条历史
is_new_data = True
logging.info(f"发现新设备: {name}")
else:
# === 旧设备 ===
# 判断核心逻辑:如果网站上的 latest_time 变了,说明有新数据
if latest_time != "N/A" and device.latest_time != latest_time:
is_new_data = True
# 如果网站没抓到时间(N/A),但我们库里有旧时间,我们需要更新 offset (如昨天滞后1天今天变滞后2天)
if latest_time == "N/A" and device.latest_time:
current_offset = calculate_offset(device.latest_time)
# 2. 更新主表快照信息 (无论是否有新数据,都要更新最后检查时间和状态)
device.check_time = now_str
device.status = status
device.reason = reason
device.offset = current_offset
# 只有抓到有效时间才更新主表的显示时间
if latest_time != "N/A":
device.latest_time = latest_time
# 3. 如果是新数据,写入历史表 (节省空间的核心)
if is_new_data and latest_time != "N/A":
# 先 commit 确保 device.id 存在
db.session.flush()
history = DeviceHistory(
device_id=device.id,
data_time=latest_time,
status=status
)
db.session.add(history)
logging.info(f"[{name}] 数据更新: {latest_time} -> 存入历史")
db.session.commit()
return f"{source}_{name}"
except Exception as e:
db.session.rollback()
logging.error(f"DB Error [{name}]: {e}")
return None
def execute_monitor_task():
    """Run every crawler and merge their results.

    Returns:
        None when a previous run still holds the lock, otherwise a dict
        {'device_list': [...], 'target_time': None, 'temp_file_path': None}.
        Per-item timestamps and file paths live inside each list element.
    """
    # Non-blocking acquire closes the locked()-then-with race: previously two
    # near-simultaneous callers could both pass the locked() check and the
    # second would block instead of skipping.
    if not task_lock.acquire(blocking=False):
        logging.warning(">>> 任务正在运行中,跳过")
        return None
    try:
        logging.info(">>> 开始执行监控任务...")

        list_106 = run_106_logic()  # site-106 tower devices
        list_82 = run_82_logic()    # site-82 weather stations

        combined_list = list_106 + list_82
        # Lazy %-args: message only formatted when INFO is enabled.
        logging.info(">>> 任务完成,共获取 %d 条数据", len(combined_list))

        return {
            'device_list': combined_list,
            'target_time': None,    # superseded by per-item 'target_time'
            'temp_file_path': None  # superseded by per-item 'temp_file'
        }
    finally:
        task_lock.release()

View File

@ -1,199 +1,159 @@
# services/crawler_106.py
import os
import requests import requests
import logging import logging
from datetime import datetime from datetime import datetime
from config import Config from config import Config
# 读取配置
CONFIG = Config.CRAWLER_CONFIG["106"] CONFIG = Config.CRAWLER_CONFIG["106"]
def get_temp_dir():
    """Return <project>/instance/temp, creating it on first use.

    Crawlers drop downloaded binaries here before the API archives them
    into instance/binary.
    """
    base_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
    temp_dir = os.path.join(base_dir, 'instance', 'temp')
    # exist_ok avoids the check-then-create race of the previous version.
    os.makedirs(temp_dir, exist_ok=True)
    return temp_dir
def get_106_dynamic_token(port):
    """Log in to the file-browser instance on *port* and return its x-auth token.

    Returns None on any failure (non-200 response, network error, bad config)
    so callers can mark the device as abnormal instead of crashing.
    """
    try:
        login_url = f"http://106.75.72.40:{port}/api/login"
        resp = requests.post(login_url, json=CONFIG["login_payload"], timeout=10)
        # On success the token is returned directly in the body, quoted.
        return resp.text.strip().replace('"', '') if resp.status_code == 200 else None
    except Exception:
        # Was a bare except; Exception still swallows network/config errors
        # but no longer eats KeyboardInterrupt/SystemExit.
        return None
def find_closest_item(items, is_date_level=True):
    """Pick the entry whose date is closest to now.

    Args:
        items: list of dicts from the file-browser API; each has 'name'
            and/or 'path', plus 'modified' (ISO-8601, may end in 'Z') when
            is_date_level is False.
        is_date_level: True -> compare folder names in YYYY_MM_DD format;
            False -> compare the 'modified' file timestamp.

    Returns:
        (seconds_from_now, item, label) for the closest entry, or None when
        the input is empty/invalid or nothing parses.
    """
    if not items or not isinstance(items, list):
        return None

    now = datetime.now()
    scored = []
    for entry in items:
        name_val = entry.get('name', '')
        path_val = entry.get('path', '')
        label = name_val if name_val else path_val.split('/')[-1]
        try:
            if is_date_level:
                stamp = datetime.strptime(label, "%Y_%m_%d")
            else:
                stamp = datetime.fromisoformat(entry.get('modified', '').replace('Z', '+00:00'))
            gap = abs((now - stamp.replace(tzinfo=None)).total_seconds())
            scored.append((gap, entry, label))
        except Exception:
            # Unparseable entries are simply skipped (was a bare except).
            continue

    if not scored:
        return None
    # min() is O(n) vs the previous full sort; key uses only the gap, so the
    # dict in position 1 is never compared.
    return min(scored, key=lambda t: t[0])
def run_106_logic():
    """Scrape the 106 file-browser sites and return a list of data packets.

    Each packet is a dict with keys: source, name, status, value, target_time,
    raw_json (full payload), temp_file (downloaded binary awaiting archival,
    or None).
    """
    results = []
    print(">>> [106爬虫] 启动...")
    today_str = datetime.now().strftime("%Y_%m_%d")
    main_headers = {"Authorization": CONFIG["primary_auth"], "User-Agent": "Mozilla/5.0"}
    try:
        resp = requests.get(CONFIG["base_url"], headers=main_headers, timeout=20)
        proxies = resp.json().get('proxies', [])

        for item in proxies:
            name = item.get('name', '')
            # Only *_data proxies belonging to a tower are of interest.
            if not name.lower().endswith('_data'):
                continue
            name_upper = name.upper()
            is_tower_underscore = "TOWER_" in name_upper  # Tower_x -> JSON API, Data/
            is_tower_i = "TOWER" in name_upper and not is_tower_underscore  # TowerI -> binary, data/
            if not (is_tower_underscore or is_tower_i):
                continue

            # Base packet; fields are overwritten as we learn more.
            data_packet = {
                'source': '106网站',
                'name': name,
                'status': '正常',
                'value': '',
                'target_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'raw_json': {},
                'temp_file': None
            }

            if str(item.get('status')).lower() != 'online':
                data_packet['status'] = '离线'
                data_packet['value'] = f"状态: {item.get('status')}"
                results.append(data_packet)
                continue

            try:
                port = item.get('conf', {}).get('remote_port')
                if not port:
                    # Guard restored: without a port there is nothing to query.
                    continue
                token = get_106_dynamic_token(port)
                if not token:
                    data_packet['status'] = '异常'
                    data_packet['value'] = "Token获取失败"
                    results.append(data_packet)
                    continue

                headers = {"Authorization": CONFIG["primary_auth"], "x-auth": token}
                # Tower_ stores under Data/, TowerI under data/.
                api_root = "/api/resources/Data/" if is_tower_underscore else "/api/resources/data/"

                res1 = requests.get(f"http://106.75.72.40:{port}{api_root}", headers=headers, timeout=10)
                best_date = find_closest_item(res1.json().get('items', []), True)

                if not best_date or best_date[2] != today_str:
                    data_packet['value'] = "未找到今日文件夹"
                    data_packet['target_time'] = best_date[2] if best_date else "N/A"
                    results.append(data_packet)
                    continue

                data_packet['target_time'] = best_date[2]  # actual data date
                date_path = f"{api_root}{best_date[2]}/"
                res2 = requests.get(f"http://106.75.72.40:{port}{date_path}", headers=headers, timeout=10)
                best_file = find_closest_item(res2.json().get('items', []), False)

                if not best_file:
                    data_packet['value'] = "今日文件夹为空"
                    results.append(data_packet)
                    continue

                file_item = best_file[1]
                full_path = file_item.get('path') or f"{date_path}{file_item.get('name')}"

                if is_tower_i:
                    # TowerI: download the binary via /api/raw into the temp dir.
                    download_url = f"http://106.75.72.40:{port}/api/raw{full_path}"
                    res3 = requests.get(download_url, headers=headers, timeout=20, stream=True)
                    if res3.status_code == 200:
                        safe_name = f"{name}_{datetime.now().strftime('%H%M%S')}.db"
                        temp_path = os.path.join(get_temp_dir(), safe_name)
                        with open(temp_path, 'wb') as f:
                            f.write(res3.content)
                        data_packet['temp_file'] = temp_path  # archived later by the API
                        data_packet['value'] = f"Binary Downloaded: {len(res3.content)} bytes"
                        data_packet['raw_json'] = file_item  # file metadata stands in for raw data
                    else:
                        data_packet['status'] = '异常'
                        data_packet['value'] = f"下载失败: {res3.status_code}"
                else:
                    # Tower_: fetch the JSON resource and keep the whole payload.
                    file_api_url = f"http://106.75.72.40:{port}/api/resources{full_path}"
                    res3 = requests.get(file_api_url, headers=headers, timeout=20)
                    try:
                        json_content = res3.json()
                        data_packet['raw_json'] = json_content  # complete payload preserved
                        data_packet['value'] = json_content.get('content', '')
                    except Exception:
                        data_packet['value'] = "JSON解析失败"

                results.append(data_packet)
            except Exception as e:
                # One broken device must not abort the whole sweep.
                data_packet['status'] = '异常'
                data_packet['value'] = str(e)[:50]
                results.append(data_packet)
    except Exception as e:
        logging.error(f"106 Crawler Error: {e}")
    return results

View File

@ -1,56 +1,62 @@
# services/crawler_82.py
import requests import requests
import json import json
import logging import logging
from lxml import etree from lxml import etree
from config import Config from config import Config
from datetime import datetime
# 读取配置
CONFIG = Config.CRAWLER_CONFIG["82"] CONFIG = Config.CRAWLER_CONFIG["82"]
def run_82_logic():
    """Scrape the 82 weather site and return a list of data packets.

    Packet layout matches run_106_logic: source, name, status, value,
    target_time, raw_json, temp_file (always None here).
    """
    results = []
    print(">>> [82爬虫] 启动...")
    # Context manager guarantees the HTTP session is closed (was leaked before).
    with requests.Session() as session:
        try:
            session.post(f"{CONFIG['base_url']}/login.php", data=CONFIG["login"], timeout=10)
            resp = session.post(f"{CONFIG['base_url']}/GetStationList.php", timeout=10)

            html = etree.HTML(resp.content)
            if html is None:
                return []

            stations = html.xpath('//option/@value')
            for sid in [s for s in stations if s]:
                data_packet = {
                    'source': '82网站',
                    'name': str(sid),
                    'status': '正常',
                    'value': '',
                    'target_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    'raw_json': {},
                    'temp_file': None
                }
                try:
                    r = session.post(f"{CONFIG['base_url']}/getLastWeatherData.php", data=str(sid),
                                     headers={'Content-Type': 'text/plain'}, timeout=10)
                    try:
                        data = r.json()
                    except Exception:
                        data = None

                    if data:
                        d_list = data.get('date', [])
                        latest = str(d_list[-1]) if d_list else "N/A"
                        data_packet['target_time'] = latest
                        data_packet['value'] = f"Data Points: {len(d_list)}"
                        data_packet['raw_json'] = data  # keep the complete JSON
                    else:
                        data_packet['status'] = '异常'
                        data_packet['value'] = "返回空数据"
                except Exception:
                    # One failing station must not stop the sweep.
                    data_packet['status'] = '异常'
                    data_packet['value'] = "单个采集失败"
                results.append(data_packet)
        except Exception as e:
            logging.error(f"82 Crawler Error: {e}")
    return results