Files
KCGL/inventory-backend/app/api/v1/inbound/base.py
dxc 73ee163352 feat: add MaterialBase permission control with field-level filtering
Co-authored-by: aider (openai/DeepSeek-V3.2-Thinking) <aider@aider.chat>
2026-02-27 10:16:43 +08:00

213 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 文件路径: app/api/v1/inbound/base.py
from flask import Blueprint, request, jsonify, send_file, g
from app.services.inbound.base_service import MaterialBaseService
from app.utils.decorators import login_required, permission_required
import traceback
import datetime
inbound_base_bp = Blueprint('stock_base', __name__)
# ==============================================================================
# 辅助函数:获取当前用户的字段级权限列表(基于角色查询)
# ==============================================================================
def get_current_field_permissions():
    """Return the field-permission codes granted to the current user.

    The role is read from the ``role`` claim of the current JWT. The roles
    ``super_admin``, ``admin`` and ``manager`` receive every field code;
    any other role (including a missing claim) receives a reduced set.

    This is a sample implementation.
    TODO: replace the hard-coded role mapping with a real lookup of the
    permission codes attached to the user's role (from the JWT or the
    database).

    Returns:
        list: a freshly built list of permission-code strings,
        e.g. ``['id', 'companyName', ...]``.
    """
    from flask_jwt_extended import get_jwt

    role = get_jwt().get('role')

    # These roles are all granted the complete set of field codes.
    privileged_roles = ('super_admin', 'admin', 'manager')
    if role in privileged_roles:
        return [
            'id', 'companyName', 'name', 'commonName', 'category', 'type',
            'spec', 'unit', 'inventoryCount', 'availableCount', 'files',
            'isEnabled',
        ]

    # Ordinary users only get a subset of the fields.
    return ['name', 'spec', 'unit', 'inventoryCount', 'availableCount']
def filter_item_by_permissions(item_dict, field_permissions):
    """Blank out the fields of ``item_dict`` that the caller may not see.

    Every known field is guarded by a permission code. When a guarded field
    is present in ``item_dict`` and its code is absent from
    ``field_permissions``, the field's value is replaced by ``None``.
    Fields that are not in the mapping are left untouched.

    Args:
        item_dict: mapping describing one item; it is modified in place.
        field_permissions: container of permission codes the user holds.

    Returns:
        The same ``item_dict`` object, after masking.
    """
    # Field name -> permission code (kept in sync with the front-end
    # permissionMap). Most fields are guarded by a code of the same name;
    # the two attachment fields share the 'files' code.
    guarded_fields = {
        name: name
        for name in (
            'id', 'companyName', 'name', 'commonName', 'category', 'type',
            'spec', 'unit', 'inventoryCount', 'availableCount', 'isEnabled',
        )
    }
    guarded_fields['generalManual'] = 'files'
    guarded_fields['generalImage'] = 'files'

    # First decide which present fields are denied, then mask them.
    denied = [
        field
        for field, code in guarded_fields.items()
        if field in item_dict and code not in field_permissions
    ]
    for field in denied:
        item_dict[field] = None

    return item_dict
# ==============================================================================
# 1. 搜索接口 (GET /api/v1/inbound/base/search)
# ==============================================================================
@inbound_base_bp.route('/search', methods=['GET'])
@permission_required('material:base:read')
def search_base():
    """Search materials by keyword and return them with field-level masking.

    Reads the optional ``keyword`` query parameter (default: empty string),
    passes it to ``MaterialBaseService.search_material`` and runs every
    result through ``filter_item_by_permissions`` using the current user's
    field permissions.

    Returns:
        A JSON response with ``code`` 200 and the masked results in
        ``data``; on any exception the traceback is printed and a JSON body
        with ``code`` 500 is returned with HTTP status 500.
    """
    try:
        matches = MaterialBaseService.search_material(
            request.args.get('keyword', '')
        )

        # Field-level masking.
        allowed = get_current_field_permissions()
        masked = []
        for row in matches:
            masked.append(filter_item_by_permissions(row, allowed))

        payload = {"code": 200, "msg": "success", "data": masked}
        return jsonify(payload)
    except Exception as exc:
        traceback.print_exc()
        failure = {"code": 500, "msg": str(exc)}
        return jsonify(failure), 500
# ==============================================================================
# 2. 列表接口 (GET /api/v1/inbound/base/list)
# ==============================================================================
@inbound_base_bp.route('/list', methods=['GET'])
@permission_required('material:base:read')
def get_list():
    """Return one page of materials, masked by field permissions.

    Query parameters:
        pageNum: page number, parsed as int (default 1).
        pageSize: page size, parsed as int (default 10).
        keyword, company, category, type: text filters (default '').
        isEnabled: optional filter, forwarded as received (default None).

    The page returned by ``MaterialBaseService.get_list`` is sent back as
    ``data``; when it has a non-empty ``items`` entry, each item is first
    passed through ``filter_item_by_permissions``.

    Returns:
        A JSON response with ``code`` 200 on success; on any exception the
        traceback is printed and a JSON body with ``code`` 500 is returned
        with HTTP status 500.
    """
    try:
        args = request.args
        page_num = args.get('pageNum', 1, type=int)
        page_size = args.get('pageSize', 10, type=int)

        # Build the filter conditions: text filters default to '',
        # isEnabled defaults to None.
        filters = {
            key: args.get(key, '')
            for key in ('keyword', 'company', 'category', 'type')
        }
        filters['isEnabled'] = args.get('isEnabled', None)

        page_data = MaterialBaseService.get_list(page_num, page_size, filters)

        # Field-level masking.
        allowed = get_current_field_permissions()
        rows = page_data.get('items')
        if rows:
            page_data['items'] = [
                filter_item_by_permissions(row, allowed) for row in rows
            ]

        payload = {"code": 200, "msg": "success", "data": page_data}
        return jsonify(payload)
    except Exception as exc:
        traceback.print_exc()
        failure = {"code": 500, "msg": str(exc)}
        return jsonify(failure), 500
# ==============================================================================
# 2.1 选项接口 (GET /api/v1/inbound/base/options)
# ==============================================================================
@inbound_base_bp.route('/options', methods=['GET'])
@permission_required('material:base:read')
def get_options():
    """Return the result of ``MaterialBaseService.get_distinct_options``.

    Returns:
        A JSON response with ``code`` 200 and the service result in
        ``data``; on any exception the traceback is printed and a JSON body
        with ``code`` 500 is returned with HTTP status 500.
    """
    try:
        options = MaterialBaseService.get_distinct_options()
        payload = {"code": 200, "msg": "success", "data": options}
        return jsonify(payload)
    except Exception as exc:
        traceback.print_exc()
        failure = {"code": 500, "msg": str(exc)}
        return jsonify(failure), 500
# ==============================================================================
# 2.2 导出接口 (GET /api/v1/inbound/base/export)
# ==============================================================================
@inbound_base_bp.route('/export', methods=['GET'])
@permission_required('material:base:read')
def export_data():
    """Export the filtered material list as an Excel attachment.

    Query parameters:
        keyword, company, category, type: text filters (default '').
        isEnabled: optional filter, forwarded as received (default None).

    The filters are handed to ``MaterialBaseService.export_excel`` and the
    file stream it returns is sent back as an ``.xlsx`` attachment whose
    name carries the current Beijing time (UTC+8).

    Returns:
        The file response on success; on any exception the traceback is
        printed and a JSON body with ``code`` 500 is returned with HTTP
        status 500.
    """
    try:
        # Collect the filter conditions.
        filters = {
            'keyword': request.args.get('keyword', ''),
            'company': request.args.get('company', ''),
            'category': request.args.get('category', ''),
            'type': request.args.get('type', ''),
            'isEnabled': request.args.get('isEnabled', None)
        }

        # Build the Excel file stream.
        file_stream = MaterialBaseService.export_excel(filters)

        # File name: prefix + date + time in Beijing time (UTC+8).
        # An aware "now" in a fixed +08:00 zone replaces the deprecated
        # naive datetime.utcnow() + 8h arithmetic; the formatted value is
        # the same.
        beijing_tz = datetime.timezone(datetime.timedelta(hours=8))
        beijing_time = datetime.datetime.now(beijing_tz)
        filename = f"库存统计_{beijing_time.strftime('%Y%m%d_%H%M%S')}.xlsx"

        # Send the file.
        # Note: download_name only works on newer Flask versions; older
        # versions may need the header set manually. The browser's download
        # name is usually decided by the front-end Blob handling or by
        # Content-Disposition anyway.
        return send_file(
            file_stream,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            as_attachment=True,
            download_name=filename
        )
    except Exception as e:
        traceback.print_exc()
        return jsonify({"code": 500, "msg": f"导出失败: {str(e)}"}), 500
# ==============================================================================
# 3. 新增接口 (POST /api/v1/inbound/base/)
# ==============================================================================
@inbound_base_bp.route('/', methods=['POST'])
@permission_required('material:base:write')
def create():
    """Create a material from the JSON request body.

    The parsed body is passed to ``MaterialBaseService.create_material``.

    Returns:
        * ``code`` 200 JSON on success.
        * ``code`` 400 JSON (HTTP 400) when the body is missing, empty, or
          not parseable as JSON, or when the service raises ``ValueError``
          (business validation failure, e.g. an empty name).
        * ``code`` 500 JSON (HTTP 500) on any other exception, after
          printing the traceback.
    """
    try:
        # silent=True makes a malformed or non-JSON body come back as None
        # instead of raising an HTTP error that the broad handler below
        # would misreport as a 500 system error.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({"code": 400, "msg": "No data provided"}), 400

        MaterialBaseService.create_material(data)
        return jsonify({"code": 200, "msg": "新增成功"})
    except ValueError as e:
        # Business validation error (e.g. the name is empty).
        return jsonify({"code": 400, "msg": str(e)}), 400
    except Exception as e:
        # Unexpected system error.
        traceback.print_exc()
        return jsonify({"code": 500, "msg": f"系统错误: {str(e)}"}), 500
# ==============================================================================
# 4. 修改接口 (PUT /api/v1/inbound/base/<id>)
# ==============================================================================
@inbound_base_bp.route('/<int:id>', methods=['PUT'])
@permission_required('material:base:write')
def update(id):
    """Update the material identified by ``id`` from the JSON request body.

    Args:
        id: integer material id taken from the URL. The name is fixed by
            the ``<int:id>`` route variable, so it shadows the builtin.

    Returns:
        * ``code`` 200 JSON on success.
        * ``code`` 400 JSON (HTTP 400) when the body is missing or not
          parseable as JSON, or when the service raises ``ValueError``.
        * ``code`` 500 JSON (HTTP 500) on any other exception, after
          printing the traceback.
    """
    try:
        # silent=True makes a malformed or non-JSON body come back as None
        # instead of raising an HTTP error that would be reported as a 500.
        data = request.get_json(silent=True)
        if data is None:
            # Same client-error response the create endpoint gives, rather
            # than handing None to the service.
            return jsonify({"code": 400, "msg": "No data provided"}), 400

        MaterialBaseService.update_material(id, data)
        return jsonify({"code": 200, "msg": "修改成功"})
    except ValueError as e:
        # Treated as a validation error, matching the create endpoint.
        # NOTE(review): assumes update_material signals validation problems
        # with ValueError like create_material does — confirm in the service.
        return jsonify({"code": 400, "msg": str(e)}), 400
    except Exception as e:
        traceback.print_exc()
        return jsonify({"code": 500, "msg": str(e)}), 500
# ==============================================================================
# 5. 删除接口 (DELETE /api/v1/inbound/base/<id>)
# ==============================================================================
@inbound_base_bp.route('/<int:id>', methods=['DELETE'])
@permission_required('material:base:delete')
def delete(id):
    """Delete the material identified by ``id``.

    Args:
        id: integer material id taken from the URL. The name is fixed by
            the ``<int:id>`` route variable, so it shadows the builtin.

    Returns:
        A JSON response with ``code`` 200 once
        ``MaterialBaseService.delete_material`` returns; on any exception
        the traceback is printed and a JSON body with ``code`` 500 is
        returned with HTTP status 500.
    """
    try:
        MaterialBaseService.delete_material(id)
        payload = {"code": 200, "msg": "删除成功"}
        return jsonify(payload)
    except Exception as exc:
        traceback.print_exc()
        failure = {"code": 500, "msg": str(exc)}
        return jsonify(failure), 500