Files
ZDXX/2.1版本/routes/api.py
2026-01-08 17:41:56 +08:00

283 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import shutil
import json
import re # [新增] 引入正则模块用于解析路径
from datetime import datetime
from flask import Blueprint, jsonify, request
from sqlalchemy import desc, or_
from extensions import db
from models import Device, DeviceHistory, MaintenanceLog
# Optional dependency: the scraper/monitor entry point. When the module
# cannot be imported the name is bound to None so that callers can detect
# its absence instead of failing at import time.
try:
    from services.core import execute_monitor_task
except ImportError:
    execute_monitor_task = None

# Blueprint that groups every route defined below under the /api prefix.
api_bp = Blueprint("api", __name__, url_prefix="/api")
# =======================
# 1. 设备概览与详情接口
# =======================
@api_bp.route('/devices_overview', methods=['GET'])
def devices_overview():
    """Return every device as a JSON list.

    On success the body is ``{'code': 200, 'data': [...]}`` where each
    entry is the result of ``Device.to_dict()``.  Any exception is reported
    in the body as ``{'code': 500, 'message': ...}``; no explicit HTTP
    status is set in that case.
    """
    try:
        serialized = []
        for device in Device.query.all():
            serialized.append(device.to_dict())
        return jsonify({'code': 200, 'data': serialized})
    except Exception as exc:
        return jsonify({'code': 500, 'message': str(exc)})
@api_bp.route('/device_data_by_date', methods=['GET'])
def device_data_by_date():
    """Return the stored JSON payload of one device for a given day.

    Query parameters:
        name: device name, matched exactly against ``Device.name``.
        date: date prefix (the front end normally sends ``YYYY-MM-DD``).

    Lookup order:
        1. the ``DeviceHistory`` row with the highest id for this device
           whose ``data_time`` starts with ``date``;
        2. otherwise the device's current ``json_data``, when its
           ``latest_time`` starts with ``date``.

    Returns:
        200 with ``name``, ``source`` and ``content`` on success;
        400 when a parameter is missing; 404 when the device or the data
        for that date does not exist.
    """
    name = request.args.get('name')
    date_str = request.args.get('date')
    if not name or not date_str:
        return jsonify({'code': 400, 'message': 'Missing name or date'}), 400

    device = Device.query.filter_by(name=name).first()
    if not device:
        return jsonify({'code': 404, 'message': 'Device not found'}), 404

    # 1. History table. autoescape keeps any '%' or '_' in the user-supplied
    #    date literal, so this SQL prefix match agrees with the
    #    str.startswith() fallback below instead of acting as a wildcard.
    history_record = (
        DeviceHistory.query
        .filter(
            DeviceHistory.device_id == device.id,
            DeviceHistory.data_time.startswith(date_str, autoescape=True),
        )
        .order_by(desc(DeviceHistory.id))
        .first()
    )

    content = None
    if history_record:
        content = history_record.json_data
    # 2. Current state, used only when no history row matched and the
    #    device's latest timestamp falls on the requested date.
    elif device.latest_time and device.latest_time.startswith(date_str):
        content = device.json_data

    if not content:
        return jsonify({'code': 404, 'message': 'No data for this date'}), 404

    return jsonify({
        'code': 200,
        'name': device.name,
        'source': device.source,
        'content': content
    })
# =======================
# 2. 维修日志接口
# =======================
@api_bp.route('/logs/list', methods=['GET'])
def get_logs():
    """List maintenance logs, newest first, with optional filters.

    Query parameters:
        keyword: substring searched (SQL LIKE) in device name, engineer,
            location and content.
        start_date, end_date: inclusive day range in ``YYYY-MM-DD`` form.
            The range is applied only when both are present and parse
            correctly; otherwise it is ignored.

    Returns:
        ``{'code': 200, 'data': [...]}`` with one ``to_dict()`` entry per
        matching log.
    """
    # Imported locally so this function does not depend on the file-level
    # import list, which only brings in ``datetime``.
    from datetime import timedelta

    keyword = request.args.get('keyword', '')
    start_date = request.args.get('start_date')
    end_date = request.args.get('end_date')

    query = MaintenanceLog.query
    if keyword:
        kw = f"%{keyword}%"
        query = query.filter(or_(
            MaintenanceLog.device_name.like(kw),
            MaintenanceLog.engineer.like(kw),
            MaintenanceLog.location.like(kw),
            MaintenanceLog.content.like(kw)
        ))

    if start_date and end_date:
        try:
            start_dt = datetime.strptime(start_date, '%Y-%m-%d')
            end_exclusive = (
                datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=1)
            )
        except (ValueError, OverflowError):
            # Best effort: a malformed or out-of-range date disables the
            # date filter instead of failing the whole request.
            pass
        else:
            # Half-open range [start, end + 1 day): covers the whole end
            # day, including timestamps after 23:59:59 that carry a
            # fractional second.
            query = query.filter(
                MaintenanceLog.timestamp >= start_dt,
                MaintenanceLog.timestamp < end_exclusive,
            )

    logs = query.order_by(MaintenanceLog.timestamp.desc()).all()
    return jsonify({'code': 200, 'data': [log.to_dict() for log in logs]})
@api_bp.route('/logs/add', methods=['POST'])
def add_log():
    """Create a maintenance log from the JSON request body.

    Reads ``device_name``, ``engineer``, ``location`` and ``content`` from
    the body, falling back to a placeholder device name and empty strings.
    Responds with code 200 on success; on any exception the session is
    rolled back and ``{'code': 500, 'message': ...}`` is returned.
    """
    data = request.get_json()
    try:
        # (field, fallback) pairs, in the order they are passed to the model.
        field_defaults = (
            ('device_name', '未知设备'),
            ('engineer', ''),
            ('location', ''),
            ('content', ''),
        )
        values = {
            field: data.get(field, fallback)
            for field, fallback in field_defaults
        }
        entry = MaintenanceLog(**values)
        db.session.add(entry)
        db.session.commit()
        return jsonify({'code': 200, 'message': 'Log saved'})
    except Exception as exc:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(exc)})
@api_bp.route('/logs/update', methods=['POST'])
def update_log():
    """Update the editable fields of an existing maintenance log.

    The JSON body must contain ``id``; ``device_name``, ``engineer``,
    ``location`` and ``content`` are optional and keep their current value
    when absent.

    Returns:
        code 200 on success; 400 when ``id`` is missing (or the body is not
        a JSON object); 404 when no log has that id; code 500 in the body,
        after a rollback, when the commit fails.
    """
    data = request.get_json()
    # get_json() may hand back None (e.g. a JSON ``null`` body) or a
    # non-object value; treat those as an empty object so the request gets
    # the normal 400 below instead of an AttributeError.
    if not isinstance(data, dict):
        data = {}

    log_id = data.get('id')
    if not log_id:
        return jsonify({'code': 400, 'message': 'Missing Log ID'}), 400

    log = MaintenanceLog.query.get(log_id)
    if not log:
        return jsonify({'code': 404, 'message': 'Log not found'}), 404

    try:
        log.device_name = data.get('device_name', log.device_name)
        log.engineer = data.get('engineer', log.engineer)
        log.location = data.get('location', log.location)
        log.content = data.get('content', log.content)
        db.session.commit()
        return jsonify({'code': 200, 'message': 'Log updated'})
    except Exception as e:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)})
@api_bp.route('/logs/delete', methods=['POST'])
def delete_log():
    """Delete the maintenance log whose ``id`` is given in the JSON body.

    Returns:
        code 200 when the log was deleted; 400 when the body is not a JSON
        object; 404 when no log has that id; 500 (after a rollback) when
        the delete cannot be committed.
    """
    data = request.get_json()
    # get_json() may hand back None or a non-object value; reject it
    # explicitly instead of failing on ``data.get``.
    if not isinstance(data, dict):
        return jsonify({'code': 400, 'message': 'Missing Log ID'}), 400

    log = MaintenanceLog.query.get(data.get('id'))
    if not log:
        return jsonify({'code': 404, 'message': 'Not found'}), 404

    try:
        db.session.delete(log)
        db.session.commit()
    except Exception as e:
        # Roll back like the other log routes do. The HTTP status stays 500,
        # which is what an unhandled exception produced before.
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(e)}), 500
    return jsonify({'code': 200, 'message': 'Deleted'})
# =======================
# 3. 辅助与控制接口 (核心修改逻辑在 run_monitor)
# =======================
def calculate_offset(latest_time_str):
    """Describe how far a device's latest data date lags behind today.

    Args:
        latest_time_str: timestamp text whose first whitespace-separated
            token is a date written as ``YYYY-MM-DD`` or ``YYYY_MM_DD``.

    Returns:
        * ``"从未同步"`` when the value is empty or ``"N/A"``;
        * the input unchanged when its date token is shorter than 8
          characters;
        * ``"当天已同步"`` when the date is today;
        * ``"滞后 <days>"`` with the day difference otherwise;
        * ``"时间解析失败"`` when the date token cannot be parsed.
    """
    if not latest_time_str or latest_time_str == "N/A":
        return "从未同步"
    try:
        # Keep only the date token and accept '_' as well as '-' separators.
        clean = str(latest_time_str).split()[0].replace('_', '-')
        if len(clean) < 8:
            return latest_time_str
        target = datetime.strptime(clean, "%Y-%m-%d").date()
    except (ValueError, IndexError):
        # ValueError: token is not a valid date.
        # IndexError: the value was whitespace only, so split() was empty.
        return "时间解析失败"
    diff = (datetime.now().date() - target).days
    return "当天已同步" if diff == 0 else f"滞后 {diff}"
@api_bp.route('/run_monitor', methods=['POST'])
def run_monitor():
    """Run the monitor task and persist its results.

    Calls ``execute_monitor_task()`` and, for every entry of the returned
    ``device_list`` that has a name, updates (or creates) the matching
    ``Device`` row and appends a ``DeviceHistory`` row.  Everything is
    committed once at the end.

    For entries whose source contains ``106`` the data time is taken from
    the ``path`` field of the raw JSON, e.g.
    ``/Data/2026_01_08/xiwuzhu_16_29_28.csv`` -> ``2026-01-08 16:29:28``,
    replacing the time reported by the task.

    Returns:
        JSON with code 200 and the number of processed devices, code 200
        with a "skipped" message when the task returns nothing, or code 500
        when the core module is missing or any exception occurs (the
        session is rolled back in the latter case).
    """
    try:
        if not execute_monitor_task:
            return jsonify({'code': 500, 'message': 'Core module missing'})

        task_result = execute_monitor_task()
        if not task_result:
            return jsonify({'code': 200, 'message': '任务跳过'})

        checked_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        count = 0
        for item in task_result.get('device_list', []):
            device_name = item.get('name')
            if not device_name:
                continue

            raw = item.get('raw_json', {})
            source = item.get('source', '')
            target_time = item.get('target_time')

            if '106' in str(source):
                # Derive the timestamp from the file path: the directory
                # holds the date (YYYY_MM_DD) and the file name ends with
                # the time (HH_MM_SS).
                try:
                    found = re.search(
                        r'/Data/(\d{4}_\d{2}_\d{2})/\w+_(\d{2}_\d{2}_\d{2})\.csv',
                        raw.get('path', ''),
                    )
                    if found:
                        day, clock = found.groups()
                        # Override the time reported by the task.
                        target_time = (
                            f"{day.replace('_', '-')} {clock.replace('_', ':')}"
                        )
                        item['target_time'] = target_time
                except Exception as parse_err:
                    print(f"Error parsing 106 path: {parse_err}")

            if isinstance(raw, (dict, list)):
                json_str = json.dumps(raw, ensure_ascii=False)
            else:
                json_str = str(raw)

            device = Device.query.filter_by(name=device_name).first()
            if not device:
                device = Device(name=device_name, source=source)
                db.session.add(device)
                db.session.flush()  # assigns device.id for the history row

            status = item.get('status')
            value = item.get('value')

            device.status = status
            device.current_value = value
            device.latest_time = target_time
            device.check_time = checked_at
            device.json_data = json_str
            device.offset = calculate_offset(target_time)

            db.session.add(DeviceHistory(
                device_id=device.id,
                status=status,
                result_data=value,
                data_time=target_time,
                json_data=json_str
            ))
            count += 1

        db.session.commit()
        return jsonify({'code': 200, 'message': f'更新 {count} 台设备'})
    except Exception as exc:
        db.session.rollback()
        return jsonify({'code': 500, 'message': str(exc)})
@api_bp.route('/update_site', methods=['POST'])
def update_site():
    """Set the ``install_site`` of the device named in the JSON body.

    Body fields: ``name`` (device name) and ``site`` (new value).

    Returns:
        ``{'code': 200}`` on success; 400 when the body is not a JSON
        object; 404 when no device has that name.
    """
    data = request.get_json()
    # get_json() may hand back None or a non-object value; reject it
    # explicitly instead of failing on ``data.get``.
    if not isinstance(data, dict):
        return jsonify({'code': 400}), 400

    device = Device.query.filter_by(name=data.get('name')).first()
    if not device:
        return jsonify({'code': 404}), 404

    device.install_site = data.get('site')
    db.session.commit()
    return jsonify({'code': 200})
@api_bp.route('/toggle_maintenance', methods=['POST'])
def toggle_maintenance():
    """Set the ``is_maintaining`` flag of the device named in the JSON body.

    Body fields: ``name`` (device name) and ``is_maintaining`` (new value,
    stored as given).

    Returns:
        ``{'code': 200}`` on success; 400 when the body is not a JSON
        object; 404 when no device has that name.
    """
    data = request.get_json()
    # get_json() may hand back None or a non-object value; reject it
    # explicitly instead of failing on ``data.get``.
    if not isinstance(data, dict):
        return jsonify({'code': 400}), 400

    device = Device.query.filter_by(name=data.get('name')).first()
    if not device:
        return jsonify({'code': 404}), 404

    device.is_maintaining = data.get('is_maintaining')
    db.session.commit()
    return jsonify({'code': 200})
@api_bp.route('/toggle_hidden', methods=['POST'])
def toggle_hidden():
    """Set the ``is_hidden`` flag of the device named in the JSON body.

    Body fields: ``name`` (device name) and ``is_hidden`` (new value,
    stored as given).

    Returns:
        ``{'code': 200}`` on success; 400 when the body is not a JSON
        object; 404 when no device has that name.
    """
    data = request.get_json()
    # get_json() may hand back None or a non-object value; reject it
    # explicitly instead of failing on ``data.get``.
    if not isinstance(data, dict):
        return jsonify({'code': 400}), 400

    device = Device.query.filter_by(name=data.get('name')).first()
    if not device:
        return jsonify({'code': 404}), 404

    device.is_hidden = data.get('is_hidden')
    db.session.commit()
    return jsonify({'code': 200})