Files
KCGL/inventory-backend/app/api/v1/transactions.py

321 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, jsonify, request # .material -> .base refactor checked
from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt
from app.utils.decorators import permission_required, audit_log
from app.services.auth_service import AuthService
from app.services.trans_service import TransService
from app.services.borrow_service import BorrowApprovalService
import traceback
trans_bp = Blueprint('transactions', __name__, url_prefix='/transactions')
# ==============================================================================
# 辅助函数:获取当前用户的完整权限列表(基于角色查询)
# ==============================================================================
def get_current_user_permissions():
"""
返回当前用户拥有的所有权限码列表(包括菜单和元素)
此函数根据角色查询数据库得到权限。
"""
claims = get_jwt()
user_role = claims.get('role')
if not user_role:
return []
# 超级管理员返回所有字段权限 (忽略大小写)
if user_role.upper() == 'SUPER_ADMIN':
return ['*']
perm_dict = AuthService.get_user_permissions(user_role)
# 合并菜单和元素权限
perms = perm_dict.get('menus', []) + perm_dict.get('elements', [])
return perms
def get_current_user_info():
"""获取当前用户信息和角色"""
from app.models.system import SysUser
identity = get_jwt_identity()
if not identity:
return None, None
user = SysUser.query.get(identity)
return user.id if user else None, user.role if user else None
def filter_item_by_permissions(item_dict, user_permissions, prefix='op_records'):
"""
根据用户权限过滤 item 字典,无权限的字段值置为 None
"""
# 字段名到权限码的映射(与前端 permissionMap 保持一致)
field_to_perm = {
# 'borrow_no': f'{prefix}:borrow_no',
# 'borrower_name': f'{prefix}:borrower_name',
# 'sku': f'{prefix}:sku',
# 'borrow_time': f'{prefix}:borrow_time',
# 'return_time': f'{prefix}:return_time',
# 'return_operator': f'{prefix}:return_operator',
# 'status': f'{prefix}:status',
# 'expected_return_time': f'{prefix}:expected_return_time',
# 'return_location': f'{prefix}:return_location',
# 'borrow_signature': f'{prefix}:borrow_signature',
# 'return_signature': f'{prefix}:return_signature',
}
# 如果用户是超级管理员且有 '*',则不过滤
if '*' in user_permissions:
return item_dict
for field, perm_code in field_to_perm.items():
if field in item_dict and perm_code not in user_permissions:
item_dict[field] = None
return item_dict
# --- 借库接口 ---
@trans_bp.route('/borrow', methods=['POST'])
@jwt_required()
@permission_required('op_borrow:operation')
@audit_log(
module='借库管理',
action='新增',
get_target_name_fn=lambda: request.get_json().get('borrow_no') if request.get_json() else None
)
def create_borrow():
data = request.get_json()
try:
no = TransService.create_borrow(data)
return jsonify({'code': 200, 'msg': '借用成功', 'data': {'borrow_no': no}})
except Exception as e:
return jsonify({'code': 400, 'msg': str(e)}), 400
# --- 还库辅助:扫码查找借出记录 ---
@trans_bp.route('/return/scan', methods=['GET'])
@jwt_required()
@permission_required('op_return')
def scan_borrowed_item():
barcode = request.args.get('barcode')
if not barcode:
return jsonify({'code': 400, 'msg': '无条码'}), 400
res = TransService.scan_for_return(barcode)
if res:
return jsonify({'code': 200, 'data': res})
else:
return jsonify({'code': 404, 'msg': '未找到该物品的未还记录'}), 404
# --- 还库提交 ---
@trans_bp.route('/return', methods=['POST'])
@jwt_required()
@permission_required('op_return:operation')
@audit_log(
module='借库管理',
action='归还',
get_target_name_fn=lambda: request.get_json().get('borrow_no') if request.get_json() else None
)
def submit_return():
data = request.get_json()
user = get_jwt_identity() # 库管
try:
TransService.process_return(data, operator_name=user)
return jsonify({'code': 200, 'msg': '还库成功'})
except Exception as e:
return jsonify({'code': 400, 'msg': str(e)}), 400
# --- 记录列表 ---
@trans_bp.route('/records', methods=['GET'])
@jwt_required()
@permission_required('op_records')
def get_records():
status = request.args.get('status', 'all')
page = int(request.args.get('page', 1))
keyword = request.args.get('keyword', '')
search_type = request.args.get('search_type', 'all')
res = TransService.get_records(page=page, limit=10, status=status, keyword=keyword, search_type=search_type)
# 字段级脱敏
user_permissions = get_current_user_permissions()
if res.get('items'):
res['items'] = [filter_item_by_permissions(item, user_permissions, 'op_records') for item in res['items']]
return jsonify({'code': 200, 'data': res})
# ==============================================================================
# 借库审批流 API与出库审批流平行
# ==============================================================================
# --- 提交借库申请 ---
@trans_bp.route('/borrow/request', methods=['POST'])
@jwt_required()
def submit_borrow_request():
"""
提交借库申请(仅存储意向,不扣库存)
请求体: { items: [...], allowed_approvers: [...], remark: '', approver_id: int }
"""
try:
user_id, user_role = get_current_user_info()
if not user_id:
return jsonify({'code': 401, 'msg': '用户未登录'}), 401
from app.models.system import SysUser
current_user = SysUser.query.get(user_id)
current_username = current_user.username if current_user else None
data = request.get_json() or {}
items = data.get('items', [])
if not items:
return jsonify({'code': 400, 'msg': '借库物品列表不能为空'}), 400
required_fields = ['name', 'spec_model', 'quantity']
for idx, item in enumerate(items):
missing = [f for f in required_fields if f not in item or str(item.get(f) or '').strip() == '']
if missing:
return jsonify({
'code': 400,
'msg': f'{idx + 1}条物品缺少必填字段: {", ".join(missing)}'
}), 400
try:
qty = float(item.get('quantity', 0))
if qty <= 0:
return jsonify({'code': 400, 'msg': f'{idx + 1}条物品的借库数量必须大于0'}), 400
except (TypeError, ValueError):
return jsonify({'code': 400, 'msg': f'{idx + 1}条物品的 quantity 格式无效'}), 400
approver_id = data.get('approver_id')
_default_approvers = [
{"type": "role", "value": "SUPERVISOR"},
{"type": "role", "value": "SUPER_ADMIN"}
]
allowed_approvers = data.get('allowed_approvers') or _default_approvers
approval = BorrowApprovalService.submit_approval(
applicant_id=user_id,
items=items,
allowed_approvers=allowed_approvers,
remark=data.get('remark'),
approver_id=approver_id,
borrower_name=current_username
)
return jsonify({'code': 200, 'msg': '借库申请已提交', 'data': approval.to_dict()}), 200
except ValueError as e:
return jsonify({'code': 400, 'msg': str(e)}), 400
except Exception as e:
return jsonify({'code': 500, 'msg': f"接口内部报错: {str(e)}", 'trace': traceback.format_exc()}), 500
# --- 审批借库申请 ---
@trans_bp.route('/borrow/request/<int:request_id>/approve', methods=['PATCH'])
@jwt_required()
def approve_borrow_request(request_id):
"""
审批借库申请
请求体: {"action": "approve" | "reject", "reject_reason": "驳回原因"}
"""
try:
user_id, user_role = get_current_user_info()
if not user_id:
return jsonify({'code': 401, 'msg': '用户未登录'}), 401
data = request.get_json() or {}
action = data.get('action', 'approve')
reject_reason = data.get('reject_reason')
if action not in ('approve', 'reject'):
return jsonify({'code': 400, 'msg': '无效的审批操作,仅支持 approve 或 reject'}), 400
if action == 'reject' and not reject_reason:
return jsonify({'code': 400, 'msg': '驳回时必须提供原因'}), 400
success, message, approval = BorrowApprovalService.approve(
request_id=request_id,
user_id=user_id,
user_role=user_role,
action=action,
reject_reason=reject_reason
)
if not success:
return jsonify({'code': 400, 'msg': message}), 400
return jsonify({'code': 200, 'msg': message, 'data': approval.to_dict() if approval else None}), 200
except Exception as e:
traceback.print_exc()
return jsonify({'code': 500, 'msg': f'服务器内部错误: {str(e)}'}), 500
# --- 获取借库审批单列表 ---
@trans_bp.route('/borrow/request', methods=['GET'])
@jwt_required()
def get_borrow_request_list():
"""
获取借库审批单列表
Query参数: page, limit, applicant_id, status
"""
try:
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 10))
applicant_id = request.args.get('applicant_id')
if applicant_id:
applicant_id = int(applicant_id)
status = request.args.get('status')
if status is not None:
status = int(status)
result = BorrowApprovalService.get_request_list(
page=page, per_page=limit, applicant_id=applicant_id, status=status
)
return jsonify({'code': 200, 'msg': '获取成功', 'data': result}), 200
except Exception as e:
return jsonify({'code': 500, 'msg': str(e)}), 500
# --- 执行借库扣减(审批通过后调用)---
@trans_bp.route('/borrow/dispatch', methods=['POST'])
@jwt_required()
@permission_required('op_borrow:operation')
def dispatch_borrow():
"""
执行借库扣减
请求体: {
approval_id: int, // 关联的审批单ID
items: [ // 扫码选中的库存物品
{
id: int, // 库存主键(按 source_table 路由到 StockBuy/StockSemi/StockProduct
source_table: str, // 'stock_buy' | 'stock_semi' | 'stock_product'
sku: str, // 用于按 (source_table, sku) 与审批单做超额交叉校验
out_quantity: float
}
],
borrower_name: str,
signature_path: str,
remark: str,
expected_return_time: str
}
"""
try:
data = request.get_json() or {}
approval_id = data.get('approval_id')
if not approval_id:
return jsonify({'code': 400, 'msg': '缺少 approval_id'}), 400
borrow_no = TransService.execute_dispatch(
approval_id=approval_id,
items=data.get('items', []),
operator_name=get_jwt_identity(),
borrower_name=data.get('borrower_name'),
signature=data.get('signature_path'),
remark=data.get('remark'),
expected_return_time=data.get('expected_return_time')
)
return jsonify({'code': 200, 'msg': '借库成功', 'data': {'borrow_no': borrow_no}}), 200
except ValueError as e:
return jsonify({'code': 400, 'msg': str(e)}), 400
except Exception as e:
traceback.print_exc()
return jsonify({'code': 500, 'msg': f'服务器内部错误: {str(e)}'}), 500