feat: implement permission checking and field-level data masking
Co-authored-by: aider (openai/DeepSeek-V3.2-Thinking) <aider@aider.chat>
This commit is contained in:
@ -10,34 +10,32 @@ inbound_base_bp = Blueprint('stock_base', __name__)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
# 辅助函数:获取当前用户的字段级权限列表(基于角色查询)
|
||||
# 辅助函数:获取当前用户的完整权限列表(基于角色查询)
|
||||
# ==============================================================================
|
||||
def get_current_field_permissions():
|
||||
def get_current_user_permissions():
|
||||
"""
|
||||
返回当前用户拥有的字段权限码列表(例如 ['id','companyName',...])
|
||||
超级管理员返回所有权限。
|
||||
此函数为示例实现,实际应根据项目权限模型完善。
|
||||
返回当前用户拥有的所有权限码列表(包括菜单和元素)
|
||||
此函数根据角色查询数据库得到权限。
|
||||
"""
|
||||
# TODO: 从 JWT 或数据库查询当前用户角色对应的权限码
|
||||
# 这里假设角色为 'admin'/'manager' 拥有全部字段权限,其他角色只有部分
|
||||
# 实际应替换为真实的权限查询逻辑
|
||||
from flask_jwt_extended import get_jwt
|
||||
from app.services.auth_service import AuthService
|
||||
claims = get_jwt()
|
||||
user_role = claims.get('role')
|
||||
if not user_role:
|
||||
return []
|
||||
# 超级管理员返回所有字段权限
|
||||
if user_role == 'super_admin':
|
||||
# 所有字段权限
|
||||
return ['id', 'companyName', 'name', 'commonName', 'category', 'type',
|
||||
'spec', 'unit', 'inventoryCount', 'availableCount', 'files', 'isEnabled']
|
||||
if user_role in ['admin', 'manager']:
|
||||
return ['id', 'companyName', 'name', 'commonName', 'category', 'type',
|
||||
'spec', 'unit', 'inventoryCount', 'availableCount', 'files', 'isEnabled']
|
||||
# 普通用户只有部分权限
|
||||
return ['name', 'spec', 'unit', 'inventoryCount', 'availableCount']
|
||||
perm_dict = AuthService.get_user_permissions(user_role)
|
||||
# 合并菜单和元素权限
|
||||
perms = perm_dict.get('menus', []) + perm_dict.get('elements', [])
|
||||
return perms
|
||||
|
||||
|
||||
def filter_item_by_permissions(item_dict, field_permissions):
|
||||
def filter_item_by_permissions(item_dict, user_permissions):
|
||||
"""
|
||||
根据字段权限过滤 item 字典,无权限的字段值置为 None
|
||||
根据用户权限过滤 item 字典,无权限的字段值置为 None
|
||||
"""
|
||||
# 字段名到权限码的映射(与前端 permissionMap 保持一致)
|
||||
field_to_perm = {
|
||||
@ -56,7 +54,7 @@ def filter_item_by_permissions(item_dict, field_permissions):
|
||||
'isEnabled': 'isEnabled'
|
||||
}
|
||||
for field, perm_code in field_to_perm.items():
|
||||
if field in item_dict and perm_code not in field_permissions:
|
||||
if field in item_dict and perm_code not in user_permissions:
|
||||
item_dict[field] = None
|
||||
return item_dict
|
||||
|
||||
@ -71,8 +69,8 @@ def search_base():
|
||||
keyword = request.args.get('keyword', '')
|
||||
data = MaterialBaseService.search_material(keyword)
|
||||
# 字段级脱敏
|
||||
field_perms = get_current_field_permissions()
|
||||
filtered_data = [filter_item_by_permissions(item, field_perms) for item in data]
|
||||
user_permissions = get_current_user_permissions()
|
||||
filtered_data = [filter_item_by_permissions(item, user_permissions) for item in data]
|
||||
return jsonify({"code": 200, "msg": "success", "data": filtered_data})
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
@ -100,9 +98,9 @@ def get_list():
|
||||
|
||||
result = MaterialBaseService.get_list(page, limit, filters)
|
||||
# 字段级脱敏
|
||||
field_perms = get_current_field_permissions()
|
||||
user_permissions = get_current_user_permissions()
|
||||
if result.get('items'):
|
||||
result['items'] = [filter_item_by_permissions(item, field_perms) for item in result['items']]
|
||||
result['items'] = [filter_item_by_permissions(item, user_permissions) for item in result['items']]
|
||||
return jsonify({"code": 200, "msg": "success", "data": result})
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
|
||||
@ -67,15 +67,20 @@ def permission_required(permission_code):
|
||||
if user_role == 'super_admin':
|
||||
return fn(*args, **kwargs)
|
||||
|
||||
# TODO: 根据角色和 permission_code 查询数据库验证权限
|
||||
# 此处为示例逻辑:假设角色 'admin' 和 'manager' 拥有所有权限
|
||||
# 实际项目中应替换为真实的权限查询
|
||||
if user_role in ['admin', 'manager']:
|
||||
return fn(*args, **kwargs)
|
||||
# 根据角色查询数据库中的权限
|
||||
try:
|
||||
from app.services.auth_service import AuthService
|
||||
perm_dict = AuthService.get_user_permissions(user_role)
|
||||
except Exception as e:
|
||||
logging.warning(f"Failed to fetch permissions for role {user_role}: {e}")
|
||||
return jsonify(msg='权限查询失败'), 403
|
||||
|
||||
# 其他角色暂时拒绝,并记录日志
|
||||
# 合并菜单和元素权限
|
||||
all_perms = perm_dict.get('menus', []) + perm_dict.get('elements', [])
|
||||
if permission_code not in all_perms:
|
||||
logging.warning(
|
||||
f'Permission check not implemented for {permission_code}, user role {user_role}. Access denied.')
|
||||
f'Permission check failed for {permission_code}, user role {user_role}.')
|
||||
return jsonify(msg='权限不足:您没有访问此资源的权限'), 403
|
||||
return fn(*args, **kwargs)
|
||||
return decorator
|
||||
return wrapper
|
||||
|
||||
Reference in New Issue
Block a user