Files
KCGL/inventory-backend/app/api/v1/audit.py

123 lines
4.1 KiB
Python

# inventory-backend/app/api/v1/audit.py
from flask import Blueprint, request, jsonify, current_app
from flask_jwt_extended import jwt_required, get_jwt
from app.models.audit import AuditLog
from app.extensions import db
from sqlalchemy import or_
from datetime import datetime
import json
audit_bp = Blueprint('audit', __name__)
@audit_bp.route('/logs', methods=['GET'])
@jwt_required()
def get_audit_logs():
"""获取审计日志列表(分页)"""
try:
# 分页参数
page = request.args.get('page', 1, type=int)
page_size = request.args.get('pageSize', 50, type=int)
# 筛选参数
username = request.args.get('username', '').strip()
module = request.args.get('module', '').strip()
action = request.args.get('action', '').strip()
target_id = request.args.get('target_id', '').strip()
start_date = request.args.get('start_date', '').strip()
end_date = request.args.get('end_date', '').strip()
# 构建查询
query = AuditLog.query
if username:
query = query.filter(AuditLog.username.like(f'%{username}%'))
if module:
query = query.filter(AuditLog.module == module)
if action:
query = query.filter(AuditLog.action == action)
if target_id:
query = query.filter(AuditLog.target_id == target_id)
if start_date:
try:
start_dt = datetime.strptime(start_date, '%Y-%m-%d')
query = query.filter(AuditLog.created_at >= start_dt)
except ValueError:
pass
if end_date:
try:
end_dt = datetime.strptime(end_date, '%Y-%m-%d')
# 包含当天结束时间
from datetime import timedelta
end_dt = end_dt + timedelta(days=1)
query = query.filter(AuditLog.created_at < end_dt)
except ValueError:
pass
# 排序
query = query.order_by(AuditLog.created_at.desc())
# 分页
pagination = query.paginate(page=page, per_page=page_size, error_out=False)
logs = pagination.items
# 序列化
data = [log.to_dict() for log in logs]
# 获取可用的模块和操作类型(用于下拉选项)
modules = db.session.query(AuditLog.module).distinct().all()
modules = [m[0] for m in modules if m[0]]
actions = db.session.query(AuditLog.action).distinct().all()
actions = [a[0] for a in actions if a[0]]
return jsonify({
'code': 200,
'msg': '获取成功',
'data': {
'list': data,
'total': pagination.total,
'page': page,
'pageSize': page_size,
'modules': modules,
'actions': actions
}
}), 200
except Exception as e:
current_app.logger.error(f"获取审计日志失败: {str(e)}")
return jsonify({'code': 500, 'msg': f'服务器内部错误: {str(e)}'}), 500
@audit_bp.route('/logs/<int:log_id>', methods=['GET'])
@jwt_required()
def get_audit_log_detail(log_id):
"""获取单条审计日志详情"""
try:
log = AuditLog.query.get(log_id)
if not log:
return jsonify({'code': 404, 'msg': '日志不存在'}), 404
return jsonify({
'code': 200,
'msg': '获取成功',
'data': log.to_dict()
}), 200
except Exception as e:
current_app.logger.error(f"获取审计日志详情失败: {str(e)}")
return jsonify({'code': 500, 'msg': str(e)}), 500
@audit_bp.route('/modules', methods=['GET'])
@jwt_required()
def get_modules():
"""获取所有模块列表(用于筛选)"""
try:
modules = db.session.query(AuditLog.module).distinct().all()
modules = [m[0] for m in modules if m[0]]
return jsonify({'code': 200, 'data': modules}), 200
except Exception as e:
current_app.logger.error(f"获取模块列表失败: {str(e)}")
return jsonify({'code': 500, 'msg': str(e)}), 500