Files
KCGL/inventory-backend/app/api/v1/inbound/buy.py

420 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, request, jsonify
from app.services.inbound.buy_service import BuyInboundService
from app.utils.decorators import permission_required, audit_log
import traceback
inbound_buy_bp = Blueprint('stock_buy', __name__)
# ==============================================================================
# 辅助函数:获取当前用户的完整权限列表(基于角色查询)
# ==============================================================================
def get_current_user_permissions():
"""
返回当前用户拥有的所有权限码列表(包括菜单和元素)
此函数根据角色查询数据库得到权限。
"""
from flask_jwt_extended import get_jwt
from app.services.auth_service import AuthService
claims = get_jwt()
user_role = claims.get('role')
if not user_role:
return []
# 超级管理员返回所有字段权限 (忽略大小写)
if user_role.upper() == 'SUPER_ADMIN':
# 返回所有以 inbound_buy: 开头的权限码(这里我们返回一个特殊标记,表示全部)
# 为了简单,我们返回 ['inbound_buy:*'],在过滤函数中特殊处理
return ['inbound_buy:*']
perm_dict = AuthService.get_user_permissions(user_role)
# 合并菜单和元素权限
perms = perm_dict.get('menus', []) + perm_dict.get('elements', [])
return perms
def filter_item_by_permissions(item_dict, user_permissions):
"""
根据用户权限过滤 item 字典,无权限的字段值置为 None
"""
# 字段名到权限码的映射(与前端 permissionMap 保持一致)
field_to_perm = {
'id': 'inbound_buy:id',
'base_id': 'inbound_buy:base_id',
'global_print_id': 'inbound_buy:global_print_id',
'sku': 'inbound_buy:sku',
'barcode': 'inbound_buy:barcode',
'in_date': 'inbound_buy:in_date',
'serial_number': 'inbound_buy:sn_bn',
'batch_number': 'inbound_buy:sn_bn',
'status': 'inbound_buy:status',
'in_quantity': 'inbound_buy:in_quantity',
'stock_quantity': 'inbound_buy:stock_quantity',
'available_quantity': 'inbound_buy:available_quantity',
'inspection_status': 'inbound_buy:inspection_status',
'warehouse_location': 'inbound_buy:warehouse_location',
'unit_price': 'inbound_buy:unit_price',
'post_tax_unit_price': 'inbound_buy:post_tax_unit_price',
'tax_rate': 'inbound_buy:tax_rate',
'total_price': 'inbound_buy:total_price',
'currency': 'inbound_buy:currency',
'exchange_rate': 'inbound_buy:exchange_rate',
'supplier_name': 'inbound_buy:supplier_name',
'buyer_name': 'inbound_buy:buyer_name',
'buyer_email': 'inbound_buy:buyer_email',
'original_link': 'inbound_buy:original_link',
'detail_link': 'inbound_buy:detail_link',
'arrival_photo': 'inbound_buy:arrival_photo',
'inspection_report': 'inbound_buy:inspection_report',
'material_name': 'inbound_buy:material_name',
'spec_model': 'inbound_buy:spec_model',
'category': 'inbound_buy:category',
'unit': 'inbound_buy:unit',
'material_type': 'inbound_buy:material_type',
'company_name': 'inbound_buy:company_name',
}
# 如果用户是超级管理员且有 'inbound_buy:*',则不过滤
if 'inbound_buy:*' in user_permissions:
return item_dict
for field, perm_code in field_to_perm.items():
# 提取不带前缀的基础权限码(如 'serial_number'
base_perm_code = perm_code.split(':')[-1] if ':' in perm_code else perm_code
# 如果用户的权限列表中,既没有长格式,也没有短格式,才将字段设为 None
if field in item_dict and perm_code not in user_permissions and base_perm_code not in user_permissions:
item_dict[field] = None
return item_dict
# ------------------------------------------------------------------
# 0. 基础物料搜索
# ------------------------------------------------------------------
@inbound_buy_bp.route('/search-base', methods=['GET'])
@permission_required('inbound_buy')
def search_base():
try:
keyword = request.args.get('keyword', '')
page = request.args.get('page', 1, type=int)
# 固定每次加载50条
limit = 50
result = BuyInboundService.search_base_material(keyword, page, limit)
return jsonify({
"code": 200,
"msg": "success",
"data": result['items'],
"total": result['total'],
"has_next": result['has_next']
})
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": str(e)}), 500
# ------------------------------------------------------------------
# 1. 获取列表 (修改:接收 category, material_type, orderByColumn, isAsc, advancedFilters)
# ------------------------------------------------------------------
@inbound_buy_bp.route('/list', methods=['GET'])
@permission_required('inbound_buy')
def get_list():
try:
page = request.args.get('page', 1, type=int)
limit = request.args.get('pageSize', 15, type=int)
keyword = request.args.get('keyword', '')
sku = request.args.get('sku', '')
search_field = request.args.get('searchField', 'all')
# 新增筛选参数
category = request.args.get('category', '')
material_type = request.args.get('material_type', '')
company = request.args.get('company', '')
# 排序参数
order_by = request.args.get('orderByColumn', '').strip()
is_asc = request.args.get('isAsc', '').strip()
# 高级筛选参数
advanced_filters_raw = request.args.get('advancedFilters', '[]')
import json
try:
advanced_filters = json.loads(advanced_filters_raw) if advanced_filters_raw else []
except json.JSONDecodeError:
advanced_filters = []
# 状态参数处理
statuses_str = request.args.get('statuses', '')
statuses = statuses_str.split(',') if statuses_str else []
result = BuyInboundService.get_list(page, limit, keyword, sku, search_field, statuses, category, material_type, company,
order_by, is_asc, advanced_filters)
# 字段级脱敏
user_permissions = get_current_user_permissions()
if result.get('items'):
result['items'] = [filter_item_by_permissions(item, user_permissions) for item in result['items']]
return jsonify({"code": 200, "msg": "success", "data": result})
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": str(e)}), 500
# ------------------------------------------------------------------
# 2. 新增入库
# ------------------------------------------------------------------
@inbound_buy_bp.route('/submit', methods=['POST'])
@permission_required('inbound_buy:operation')
@audit_log(
module='采购入库',
action='新增',
get_target_name_fn=lambda: request.get_json().get('material_name') if request.get_json() else None
)
def submit():
try:
data = request.get_json()
if not data:
return jsonify({"code": 400, "msg": "No data"}), 400
# 数据清洗:移除用户没有权限的字段
user_permissions = get_current_user_permissions()
# 超级管理员不过滤
if 'inbound_buy:*' not in user_permissions:
# 字段名到权限码的映射(与前端 permissionMap 保持一致)
field_to_perm = {
'id': 'inbound_buy:id',
'base_id': 'inbound_buy:base_id',
'global_print_id': 'inbound_buy:global_print_id',
'sku': 'inbound_buy:sku',
'barcode': 'inbound_buy:barcode',
'in_date': 'inbound_buy:in_date',
'serial_number': 'inbound_buy:serial_number',
'batch_number': 'inbound_buy:batch_number',
'status': 'inbound_buy:status',
'in_quantity': 'inbound_buy:in_quantity',
'stock_quantity': 'inbound_buy:stock_quantity',
'available_quantity': 'inbound_buy:available_quantity',
'inspection_status': 'inbound_buy:inspection_status',
'warehouse_location': 'inbound_buy:warehouse_location',
'unit_price': 'inbound_buy:unit_price',
'post_tax_unit_price': 'inbound_buy:post_tax_unit_price',
'tax_rate': 'inbound_buy:tax_rate',
'total_price': 'inbound_buy:total_price',
'currency': 'inbound_buy:currency',
'exchange_rate': 'inbound_buy:exchange_rate',
'supplier_name': 'inbound_buy:supplier_name',
'buyer_name': 'inbound_buy:buyer_name',
'buyer_email': 'inbound_buy:buyer_email',
'original_link': 'inbound_buy:original_link',
'detail_link': 'inbound_buy:detail_link',
'arrival_photo': 'inbound_buy:arrival_photo',
'inspection_report': 'inbound_buy:inspection_report',
'material_name': 'inbound_buy:material_name',
'spec_model': 'inbound_buy:spec_model',
'category': 'inbound_buy:category',
'unit': 'inbound_buy:unit',
'material_type': 'inbound_buy:material_type',
'company_name': 'inbound_buy:company_name',
}
# 复制一份,避免遍历时修改字典
for field in list(data.keys()):
perm_code = field_to_perm.get(field)
# 提取不带前缀的基础权限码(如 'serial_number'
base_perm_code = perm_code.split(':')[-1] if ':' in perm_code else perm_code
# 如果用户的权限列表中,既没有长格式,也没有短格式,才移除该字段
if perm_code and perm_code not in user_permissions and base_perm_code not in user_permissions:
data.pop(field, None)
# 库位必填校验(安全兜底)
location = data.get('warehouse_location', '').strip()
if not location:
return jsonify({"code": 400, "msg": "入库失败:库位为必填项,不能为空!"}), 400
new_stock = BuyInboundService.handle_inbound(data)
return jsonify({
"code": 200,
"msg": "入库成功",
"data": new_stock.to_dict()
})
except Exception as e:
traceback.print_exc()
return jsonify({"code": 500, "msg": str(e)}), 500
# ------------------------------------------------------------------
# 3. 更新入库
# ------------------------------------------------------------------
@inbound_buy_bp.route('/<int:id>', methods=['PUT'])
@permission_required('inbound_buy:operation')
@audit_log(
module='采购入库',
action='修改',
get_target_id_fn=lambda: request.view_args.get('id'),
get_target_name_fn=lambda: request.get_json().get('material_name') if request.get_json() else None
)
def update_buy(id):
try:
data = request.get_json()
# 数据清洗:移除用户没有权限的字段
user_permissions = get_current_user_permissions()
# 超级管理员不过滤
if 'inbound_buy:*' not in user_permissions:
field_to_perm = {
'id': 'inbound_buy:id',
'base_id': 'inbound_buy:base_id',
'global_print_id': 'inbound_buy:global_print_id',
'sku': 'inbound_buy:sku',
'barcode': 'inbound_buy:barcode',
'in_date': 'inbound_buy:in_date',
'serial_number': 'inbound_buy:serial_number',
'batch_number': 'inbound_buy:batch_number',
'status': 'inbound_buy:status',
'in_quantity': 'inbound_buy:in_quantity',
'stock_quantity': 'inbound_buy:stock_quantity',
'available_quantity': 'inbound_buy:available_quantity',
'inspection_status': 'inbound_buy:inspection_status',
'warehouse_location': 'inbound_buy:warehouse_location',
'unit_price': 'inbound_buy:unit_price',
'post_tax_unit_price': 'inbound_buy:post_tax_unit_price',
'tax_rate': 'inbound_buy:tax_rate',
'total_price': 'inbound_buy:total_price',
'currency': 'inbound_buy:currency',
'exchange_rate': 'inbound_buy:exchange_rate',
'supplier_name': 'inbound_buy:supplier_name',
'buyer_name': 'inbound_buy:buyer_name',
'buyer_email': 'inbound_buy:buyer_email',
'original_link': 'inbound_buy:original_link',
'detail_link': 'inbound_buy:detail_link',
'arrival_photo': 'inbound_buy:arrival_photo',
'inspection_report': 'inbound_buy:inspection_report',
'material_name': 'inbound_buy:material_name',
'spec_model': 'inbound_buy:spec_model',
'category': 'inbound_buy:category',
'unit': 'inbound_buy:unit',
'material_type': 'inbound_buy:material_type',
'company_name': 'inbound_buy:company_name',
}
# 复制一份,避免遍历时修改字典
for field in list(data.keys()):
perm_code = field_to_perm.get(field)
# 提取不带前缀的基础权限码(如 'serial_number'
base_perm_code = perm_code.split(':')[-1] if ':' in perm_code else perm_code
# 如果用户的权限列表中,既没有长格式,也没有短格式,才移除该字段
if perm_code and perm_code not in user_permissions and base_perm_code not in user_permissions:
data.pop(field, None)
BuyInboundService.update_inbound(id, data)
return jsonify({"code": 200, "msg": "更新成功"})
except Exception as e:
return jsonify({"code": 500, "msg": str(e)}), 500
# ------------------------------------------------------------------
# 4. 删除
# ------------------------------------------------------------------
@inbound_buy_bp.route('/<int:id>', methods=['DELETE'])
@permission_required('inbound_buy:operation')
@audit_log(
module='采购入库',
action='删除',
get_target_id_fn=lambda: request.view_args.get('id')
)
def delete_buy(id):
try:
material_name = BuyInboundService.delete_inbound(id)
return jsonify({"code": 200, "msg": "删除成功", "material_name": material_name})
except ValueError as ve:
# 捕获业务拦截的报错,返回友好的 msg
return jsonify({"code": 400, "msg": str(ve)})
except Exception as e:
import traceback
traceback.print_exc() # 在控制台打印真实错误堆栈
return jsonify({"code": 500, "msg": f"服务器内部错误详情: {str(e)}"}), 500
# ------------------------------------------------------------------
# 5. [新增] 获取筛选下拉选项 (修复404的关键)
# ------------------------------------------------------------------
@inbound_buy_bp.route('/options', methods=['GET'])
@permission_required('inbound_buy')
def get_options():
try:
data = BuyInboundService.get_filter_options()
return jsonify({"code": 200, "msg": "success", "data": data})
except Exception as e:
return jsonify({"code": 500, "msg": str(e)}), 500
# ------------------------------------------------------------------
# 6. 获取关联的出库历史 (如果有)
# ------------------------------------------------------------------
@inbound_buy_bp.route('/<int:id>/history', methods=['GET'])
@permission_required('inbound_buy')
def get_history(id):
# 如果没有出库模块,这个接口可能为空,但为保持兼容性保留
return jsonify({"code": 200, "msg": "success", "data": []})
# ------------------------------------------------------------------
# 7. 供应商建议
# ------------------------------------------------------------------
@inbound_buy_bp.route('/suggestions/suppliers', methods=['GET'])
@permission_required('inbound_buy')
def get_supplier_suggestions():
base_id = request.args.get('base_id', type=int)
if not base_id:
return jsonify({"code": 400, "msg": "base_id required"}), 400
data = BuyInboundService.get_history_suppliers(base_id)
return jsonify({"code": 200, "msg": "success", "data": data})
# ------------------------------------------------------------------
# 8. 采购人建议 (全局)
# ------------------------------------------------------------------
@inbound_buy_bp.route('/suggestions/users', methods=['GET'])
@permission_required('inbound_buy')
def get_user_suggestions():
keyword = request.args.get('keyword', '')
data = BuyInboundService.get_history_purchasers(keyword)
return jsonify({"code": 200, "msg": "success", "data": data})
# ------------------------------------------------------------------
# 9. 链接建议
# ------------------------------------------------------------------
@inbound_buy_bp.route('/suggestions/links', methods=['GET'])
@permission_required('inbound_buy')
def get_link_suggestions():
base_id = request.args.get('base_id', type=int)
link_type = request.args.get('type', 'original') # original or detail
if not base_id:
return jsonify({"code": 400, "msg": "base_id required"}), 400
data = BuyInboundService.get_history_links(base_id, link_type)
return jsonify({"code": 200, "msg": "success", "data": data})
# ------------------------------------------------------------------
# 10. 库位建议
# ------------------------------------------------------------------
@inbound_buy_bp.route('/suggestions/locations', methods=['GET'])
@permission_required('inbound_buy')
def get_location_suggestions():
base_id = request.args.get('base_id', type=int)
if not base_id:
return jsonify({"code": 400, "msg": "base_id required"}), 400
data = BuyInboundService.get_history_locations(base_id)
return jsonify({"code": 200, "msg": "success", "data": data})
# ------------------------------------------------------------------
# 11. 获取最近一次入库的库位(跨表查询)
# ------------------------------------------------------------------
@inbound_buy_bp.route('/last-location', methods=['GET'])
@permission_required('inbound_buy')
def get_last_location():
"""
获取指定物料最近一次入库的库位
查询顺序:采购入库 -> 成品入库 -> 半成品入库,返回最新入库的库位
"""
base_id = request.args.get('base_id', type=int)
if not base_id:
return jsonify({"code": 400, "msg": "base_id required"}), 400
location = BuyInboundService.get_last_location_by_base_id(base_id)
return jsonify({"code": 200, "msg": "success", "data": {"location": location}})