feat: 新增企业级操作审计日志闭环模块(包含底层模型、记录装饰器与前端看板)

This commit is contained in:
DXC
2026-03-10 12:15:26 +08:00
parent 525acae423
commit be6575344a
6 changed files with 588 additions and 3 deletions

View File

@ -130,8 +130,20 @@ def create_app():
except ImportError as e:
print(f"❌ 错误: Permission 模块导入失败 (请检查 app/api/v1/permission.py 是否存在): {e}")
# 2.8 注册审计日志模块 (Audit)
# -----------------------------------------------------
# 2.8 注册库位管理模块 (Warehouse)
try:
from app.api.v1.audit import audit_bp
# 标准: /api/v1/audit/logs
app.register_blueprint(audit_bp, url_prefix='/api/v1/audit')
# 兼容: /api/audit/logs
app.register_blueprint(audit_bp, url_prefix='/api/audit', name='audit_legacy')
print("✅ Audit 模块注册成功")
except ImportError as e:
print(f"❌ 错误: Audit 模块导入失败: {e}")
# -----------------------------------------------------
# 2.9 注册库位管理模块 (Warehouse)
# -----------------------------------------------------
try:
from app.api.v1.warehouse import warehouse_bp

View File

@ -0,0 +1,122 @@
# inventory-backend/app/api/v1/audit.py
from flask import Blueprint, request, jsonify, current_app
from flask_jwt_extended import jwt_required, get_jwt
from app.models.audit import AuditLog
from app.extensions import db
from sqlalchemy import or_
from datetime import datetime
import json
audit_bp = Blueprint('audit', __name__)
@audit_bp.route('/logs', methods=['GET'])
@jwt_required()
def get_audit_logs():
"""获取审计日志列表(分页)"""
try:
# 分页参数
page = request.args.get('page', 1, type=int)
page_size = request.args.get('pageSize', 50, type=int)
# 筛选参数
username = request.args.get('username', '').strip()
module = request.args.get('module', '').strip()
action = request.args.get('action', '').strip()
target_id = request.args.get('target_id', '').strip()
start_date = request.args.get('start_date', '').strip()
end_date = request.args.get('end_date', '').strip()
# 构建查询
query = AuditLog.query
if username:
query = query.filter(AuditLog.username.like(f'%{username}%'))
if module:
query = query.filter(AuditLog.module == module)
if action:
query = query.filter(AuditLog.action == action)
if target_id:
query = query.filter(AuditLog.target_id == target_id)
if start_date:
try:
start_dt = datetime.strptime(start_date, '%Y-%m-%d')
query = query.filter(AuditLog.created_at >= start_dt)
except ValueError:
pass
if end_date:
try:
end_dt = datetime.strptime(end_date, '%Y-%m-%d')
# 包含当天结束时间
from datetime import timedelta
end_dt = end_dt + timedelta(days=1)
query = query.filter(AuditLog.created_at < end_dt)
except ValueError:
pass
# 排序
query = query.order_by(AuditLog.created_at.desc())
# 分页
pagination = query.paginate(page=page, per_page=page_size, error_out=False)
logs = pagination.items
# 序列化
data = [log.to_dict() for log in logs]
# 获取可用的模块和操作类型(用于下拉选项)
modules = db.session.query(AuditLog.module).distinct().all()
modules = [m[0] for m in modules if m[0]]
actions = db.session.query(AuditLog.action).distinct().all()
actions = [a[0] for a in actions if a[0]]
return jsonify({
'code': 200,
'msg': '获取成功',
'data': {
'list': data,
'total': pagination.total,
'page': page,
'pageSize': page_size,
'modules': modules,
'actions': actions
}
}), 200
except Exception as e:
current_app.logger.error(f"获取审计日志失败: {str(e)}")
return jsonify({'code': 500, 'msg': f'服务器内部错误: {str(e)}'}), 500
@audit_bp.route('/logs/<int:log_id>', methods=['GET'])
@jwt_required()
def get_audit_log_detail(log_id):
"""获取单条审计日志详情"""
try:
log = AuditLog.query.get(log_id)
if not log:
return jsonify({'code': 404, 'msg': '日志不存在'}), 404
return jsonify({
'code': 200,
'msg': '获取成功',
'data': log.to_dict()
}), 200
except Exception as e:
current_app.logger.error(f"获取审计日志详情失败: {str(e)}")
return jsonify({'code': 500, 'msg': str(e)}), 500
@audit_bp.route('/modules', methods=['GET'])
@jwt_required()
def get_modules():
"""获取所有模块列表(用于筛选)"""
try:
modules = db.session.query(AuditLog.module).distinct().all()
modules = [m[0] for m in modules if m[0]]
return jsonify({'code': 200, 'data': modules}), 200
except Exception as e:
current_app.logger.error(f"获取模块列表失败: {str(e)}")
return jsonify({'code': 500, 'msg': str(e)}), 500

View File

@ -0,0 +1,47 @@
# inventory-backend/app/models/audit.py
from app.extensions import db
from datetime import datetime
class AuditLog(db.Model):
"""
操作审计日志表
记录所有关键业务操作
"""
__tablename__ = 'audit_logs'
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, nullable=True, index=True) # 操作人IDsys_user.id
username = db.Column(db.String(100), nullable=False, index=True) # 操作人账号
display_name = db.Column(db.String(100)) # 操作人显示名称
action = db.Column(db.String(50), nullable=False, index=True) # 操作类型: create/update/delete/export 等
module = db.Column(db.String(50), nullable=False, index=True) # 业务模块: inbound_buy/inbound_semi/bom/user 等
target_id = db.Column(db.String(100), index=True) # 被操作的数据ID
target_name = db.Column(db.String(200)) # 被操作数据的显示名称
details = db.Column(db.JSON) # 详细变更内容 {old: {}, new: {}}
ip_address = db.Column(db.String(50)) # 操作IP
user_agent = db.Column(db.String(500)) # 浏览器UA
method = db.Column(db.String(10)) # HTTP方法
url = db.Column(db.String(500)) # 请求URL
status_code = db.Column(db.Integer) # 响应状态码
error_message = db.Column(db.Text) # 错误信息(如有)
created_at = db.Column(db.DateTime, default=datetime.now, index=True) # 操作时间
def to_dict(self):
return {
'id': self.id,
'user_id': self.user_id,
'username': self.username,
'display_name': self.display_name,
'action': self.action,
'module': self.module,
'target_id': self.target_id,
'target_name': self.target_name,
'details': self.details,
'ip_address': self.ip_address,
'method': self.method,
'url': self.url,
'status_code': self.status_code,
'error_message': self.error_message,
'created_at': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else None
}

View File

@ -1,8 +1,9 @@
# app/utils/decorators.py
from functools import wraps
from flask_jwt_extended import get_jwt, verify_jwt_in_request
from flask import jsonify, g
from flask_jwt_extended import get_jwt, verify_jwt_in_request, get_jwt_identity
from flask import jsonify, g, request
import logging
import json
def role_required(*roles):
@ -87,3 +88,104 @@ def permission_required(permission_code):
return fn(*args, **kwargs)
return decorator
return wrapper
def audit_log(module: str, action: str = None, get_target_id_fn=None, get_target_name_fn=None, get_details_fn=None):
"""
审计日志装饰器
用法: @audit_log(module='inbound_buy', action='create')
@audit_log(module='bom', action='update', get_target_id_fn=lambda: ..., get_details_fn=lambda req, resp: ...)
"""
def wrapper(fn):
@wraps(fn)
def decorator(*args, **kwargs):
# 获取请求上下文
claims = get_jwt()
user_id = get_jwt_identity()
username = claims.get('username', '')
display_name = claims.get('display_name', '')
# 获取IP
ip_address = request.headers.get('X-Forwarded-For') or request.remote_addr or ''
if ip_address and ',' in ip_address:
ip_address = ip_address.split(',')[0].strip()
# 获取请求信息
http_method = request.method
url = request.url
user_agent = request.headers.get('User-Agent', '')[:500]
# 解析 action支持动态
final_action = action
if callable(action):
final_action = action()
# 执行原函数
response = fn(*args, **kwargs)
# 只记录成功的请求(响应状态码 200/201
status_code = 200
if hasattr(response, 'status_code'):
status_code = response.status_code
if status_code in [200, 201]:
try:
from app.models.audit import AuditLog
from app.extensions import db
from flask import current_app
# 获取 target_id
target_id = None
if get_target_id_fn:
try:
target_id = get_target_id_fn()
except Exception:
pass
if not target_id and hasattr(response, 'json'):
resp_data = response.get_json()
if resp_data and isinstance(resp_data, dict):
target_id = resp_data.get('id')
# 获取 target_name
target_name = None
if get_target_name_fn:
try:
target_name = get_target_name_fn()
except Exception:
pass
# 获取 details
details = None
if get_details_fn:
try:
details = get_details_fn(request, response)
except Exception:
pass
# 保存日志
log_entry = AuditLog(
user_id=user_id,
username=username,
display_name=display_name,
action=final_action or http_method.lower(),
module=module,
target_id=str(target_id) if target_id else None,
target_name=target_name,
details=details,
ip_address=ip_address,
user_agent=user_agent,
method=http_method,
url=url,
status_code=status_code
)
db.session.add(log_entry)
db.session.commit()
except Exception as e:
current_app.logger.error(f"审计日志记录失败: {str(e)}")
db.session.rollback()
return response
return decorator
return wrapper