Files
KCGL/inventory-backend/app/api/v1/inbound/base.py
2026-02-27 10:43:32 +08:00

211 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 文件路径: app/api/v1/inbound/base.py
from flask import Blueprint, request, jsonify, send_file, g
from app.services.inbound.base_service import MaterialBaseService
from app.utils.decorators import login_required, permission_required
import traceback
import datetime
inbound_base_bp = Blueprint('stock_base', __name__)
# ==============================================================================
# 辅助函数:获取当前用户的完整权限列表(基于角色查询)
# ==============================================================================
def get_current_user_permissions():
"""
返回当前用户拥有的所有权限码列表(包括菜单和元素)
此函数根据角色查询数据库得到权限。
"""
from flask_jwt_extended import get_jwt
from app.services.auth_service import AuthService
claims = get_jwt()
user_role = claims.get('role')
if not user_role:
return []
# 超级管理员返回所有字段权限
if user_role == 'super_admin':
return ['material_list:id', 'material_list:companyName', 'material_list:name', 'material_list:commonName', 'material_list:category', 'material_list:type',
'material_list:spec', 'material_list:unit', 'material_list:inventoryCount', 'material_list:availableCount', 'material_list:files', 'material_list:isEnabled']
perm_dict = AuthService.get_user_permissions(user_role)
# 合并菜单和元素权限
perms = perm_dict.get('menus', []) + perm_dict.get('elements', [])
return perms
def filter_item_by_permissions(item_dict, user_permissions):
"""
根据用户权限过滤 item 字典,无权限的字段值置为 None
"""
# 字段名到权限码的映射(与前端 permissionMap 保持一致)
field_to_perm = {
'id': 'material_list:id',
'companyName': 'material_list:companyName',
'name': 'material_list:name',
'commonName': 'material_list:commonName',
'category': 'material_list:category',
'type': 'material_list:type',
'spec': 'material_list:spec',
'unit': 'material_list:unit',
'inventoryCount': 'material_list:inventoryCount',
'availableCount': 'material_list:availableCount',
'generalManual': 'material_list:files',
'generalImage': 'material_list:files',
'isEnabled': 'material_list:isEnabled'
}
for field, perm_code in field_to_perm.items():
if field in item_dict and perm_code not in user_permissions:
item_dict[field] = None
return item_dict
# ==============================================================================
# 1. 搜索接口 (GET /api/v1/inbound/base/search)
# ==============================================================================
@inbound_base_bp.route('/search', methods=['GET'])
@permission_required('material_list')
def search_base():
try:
keyword = request.args.get('keyword', '')
data = MaterialBaseService.search_material(keyword)
# 字段级脱敏
user_permissions = get_current_user_permissions()
filtered_data = [filter_item_by_permissions(item, user_permissions) for item in data]
return jsonify({"code": 200, "msg": "success", "data": filtered_data})
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": str(e)}), 500
# ==============================================================================
# 2. 列表接口 (GET /api/v1/inbound/base/list)
# ==============================================================================
@inbound_base_bp.route('/list', methods=['GET'])
@permission_required('material_list')
def get_list():
try:
page = request.args.get('pageNum', 1, type=int)
limit = request.args.get('pageSize', 10, type=int)
# 构造筛选条件
filters = {
'keyword': request.args.get('keyword', ''),
'company': request.args.get('company', ''),
'category': request.args.get('category', ''),
'type': request.args.get('type', ''),
'isEnabled': request.args.get('isEnabled', None)
}
result = MaterialBaseService.get_list(page, limit, filters)
# 字段级脱敏
user_permissions = get_current_user_permissions()
if result.get('items'):
result['items'] = [filter_item_by_permissions(item, user_permissions) for item in result['items']]
return jsonify({"code": 200, "msg": "success", "data": result})
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": str(e)}), 500
# ==============================================================================
# 2.1 选项接口 (GET /api/v1/inbound/base/options)
# ==============================================================================
@inbound_base_bp.route('/options', methods=['GET'])
@permission_required('material_list')
def get_options():
try:
data = MaterialBaseService.get_distinct_options()
return jsonify({"code": 200, "msg": "success", "data": data})
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": str(e)}), 500
# ==============================================================================
# 2.2 导出接口 (GET /api/v1/inbound/base/export)
# ==============================================================================
@inbound_base_bp.route('/export', methods=['GET'])
@permission_required('material_list')
def export_data():
try:
# 获取筛选条件
filters = {
'keyword': request.args.get('keyword', ''),
'company': request.args.get('company', ''),
'category': request.args.get('category', ''),
'type': request.args.get('type', ''),
'isEnabled': request.args.get('isEnabled', None)
}
# 生成 Excel 文件流
file_stream = MaterialBaseService.export_excel(filters)
# 生成文件名:库存统计+年月日+时分秒 (北京时间 UTC+8)
# 简单处理UTC时间 + 8小时
beijing_time = datetime.datetime.utcnow() + datetime.timedelta(hours=8)
filename = f"库存统计_{beijing_time.strftime('%Y%m%d_%H%M%S')}.xlsx"
# 发送文件
# 注意download_name 仅在较新 Flask 版本有效,旧版本可能需要手动 header
# 但通常浏览器下载名由前端 Blob 处理或 Content-Disposition 决定。
return send_file(
file_stream,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
as_attachment=True,
download_name=filename
)
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": f"导出失败: {str(e)}"}), 500
# ==============================================================================
# 3. 新增接口 (POST /api/v1/inbound/base/)
# ==============================================================================
@inbound_base_bp.route('/', methods=['POST'])
@permission_required('material_list:operation')
def create():
try:
data = request.get_json()
if not data:
return jsonify({"code": 400, "msg": "No data provided"}), 400
MaterialBaseService.create_material(data)
return jsonify({"code": 200, "msg": "新增成功"})
except ValueError as e:
# 捕获业务逻辑验证错误 (如名称为空)
return jsonify({"code": 400, "msg": str(e)}), 400
except Exception as e:
# 捕获系统错误
traceback.print_exc()
return jsonify({"code": 500, "msg": f"系统错误: {str(e)}"}), 500
# ==============================================================================
# 4. 修改接口 (PUT /api/v1/inbound/base/<id>)
# ==============================================================================
@inbound_base_bp.route('/<int:id>', methods=['PUT'])
@permission_required('material_list:operation')
def update(id):
try:
data = request.get_json()
MaterialBaseService.update_material(id, data)
return jsonify({"code": 200, "msg": "修改成功"})
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": str(e)}), 500
# ==============================================================================
# 5. 删除接口 (DELETE /api/v1/inbound/base/<id>)
# ==============================================================================
@inbound_base_bp.route('/<int:id>', methods=['DELETE'])
@permission_required('material_list:operation')
def delete(id):
try:
MaterialBaseService.delete_material(id)
return jsonify({"code": 200, "msg": "删除成功"})
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": str(e)}), 500