feat: implement RBAC and field masking for system_user module

Co-authored-by: aider (openai/DeepSeek-V3.2-Thinking) <aider@aider.chat>
This commit is contained in:
dxc
2026-02-27 14:28:48 +08:00
parent 3f83e8742b
commit 6fa5233ea6
2 changed files with 103 additions and 15 deletions

View File

@ -2,10 +2,56 @@
from flask import Blueprint, request, jsonify, current_app
from flask_jwt_extended import jwt_required, get_jwt
from app.services.auth_service import AuthService
from app.utils.decorators import permission_required
auth_bp = Blueprint('auth', __name__)
# ==============================================================================
# 辅助函数:获取当前用户的完整权限列表(基于角色查询)
# ==============================================================================
def get_current_user_permissions():
"""
返回当前用户拥有的所有权限码列表(包括菜单和元素)
此函数根据角色查询数据库得到权限。
"""
claims = get_jwt()
user_role = claims.get('role')
if not user_role:
return []
# 超级管理员返回所有字段权限
if user_role == 'super_admin':
return ['system_user:*']
perm_dict = AuthService.get_user_permissions(user_role)
# 合并菜单和元素权限
perms = perm_dict.get('menus', []) + perm_dict.get('elements', [])
return perms
def filter_item_by_permissions(item_dict, user_permissions):
"""
根据用户权限过滤 item 字典,无权限的字段值置为 None
"""
# 字段名到权限码的映射(与前端 permissionMap 保持一致)
field_to_perm = {
'id': 'system_user:id',
'username': 'system_user:username',
'account_id': 'system_user:account_id',
'email': 'system_user:email',
'department': 'system_user:department',
'role': 'system_user:role',
'status': 'system_user:status',
'created_at': 'system_user:created_at',
}
# 如果用户是超级管理员且有 'system_user:*',则不过滤
if 'system_user:*' in user_permissions:
return item_dict
for field, perm_code in field_to_perm.items():
if field in item_dict and perm_code not in user_permissions:
item_dict[field] = None
return item_dict
@auth_bp.route('/login', methods=['POST'])
def login():
try:
@ -34,6 +80,7 @@ def login():
@auth_bp.route('/user/create', methods=['POST'])
@jwt_required()
@permission_required('system_user:operation')
def create_user():
try:
data = request.get_json()
@ -51,6 +98,7 @@ def create_user():
# [新增] 更新用户
@auth_bp.route('/user/<int:user_id>', methods=['PUT'])
@jwt_required()
@permission_required('system_user:operation')
def update_user(user_id):
try:
data = request.get_json()
@ -67,10 +115,14 @@ def update_user(user_id):
@auth_bp.route('/users', methods=['GET'])
@jwt_required()
@permission_required('system_user')
def get_users():
try:
users = AuthService.get_all_users()
return jsonify({'msg': '获取成功', 'data': users}), 200
# 字段级脱敏
user_permissions = get_current_user_permissions()
filtered_users = [filter_item_by_permissions(user, user_permissions) for user in users]
return jsonify({'msg': '获取成功', 'data': filtered_users}), 200
except Exception as e:
current_app.logger.error(f"Get Users Failed: {str(e)}")
return jsonify({'msg': '获取用户列表失败'}), 500
@ -78,8 +130,9 @@ def get_users():
@auth_bp.route('/user/<int:user_id>', methods=['DELETE'])
@jwt_required()
@permission_required('system_user:operation')
def delete_user(user_id):
try:
try {
claims = get_jwt()
operator_role = claims.get('role')
@ -104,4 +157,4 @@ def get_my_permissions():
return jsonify({'msg': '获取成功', 'data': permissions}), 200
except Exception as e:
current_app.logger.error(f"Get Permissions Failed: {str(e)}")
return jsonify({'msg': '获取权限失败'}), 500
return jsonify({'msg': '获取权限失败'}), 500