import logging
import uuid
# .material -> .base refactor checked
from datetime import datetime, timezone, timedelta

from sqlalchemy import or_, func, desc

from app.extensions import db
from app.models.outbound import TransOutbound
# Stock models that an outbound line can be drawn from
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
# Base material information table
from app.models.base import MaterialBase

logger = logging.getLogger(__name__)


class OutboundService:
    """Service helpers for stock-out (outbound) operations.

    Covers outbound number generation, barcode/SKU lookup across the three
    stock tables, transactional creation of an outbound batch, and a paged,
    grouped listing of outbound records.
    """

    # Maps the ``source_table`` value stored on an outbound line to the stock
    # model it refers to. Shared by batch creation and the grouped listing.
    _STOCK_MODELS = {
        'stock_buy': StockBuy,
        'stock_semi': StockSemi,
        'stock_product': StockProduct,
    }

    # Attribute holding the unit price for each stock table; tables that are
    # not listed here (semi-finished stock) are reported with a price of 0.
    _PRICE_ATTRS = {
        'stock_product': 'sale_price',
        'stock_buy': 'pre_tax_unit_price',
    }

    @staticmethod
    def generate_outbound_no():
        """Generate an outbound number: ``OUT-yyyyMMdd-HHmm-<daily seq, 4 digits>``.

        Example: ``OUT-20260205-1558-0001``. Date and time are taken in UTC+8.

        The sequence is "number of distinct outbound numbers already stored
        for today" + 1.

        NOTE(review): this count-then-increment is not protected against
        concurrent callers; two simultaneous requests can obtain the same
        number. Confirm whether a uniqueness guarantee is needed.
        """
        beijing_tz = timezone(timedelta(hours=8))
        now = datetime.now(beijing_tz)
        date_str = now.strftime('%Y%m%d')
        time_str = now.strftime('%H%M')

        prefix = f"OUT-{date_str}-"
        existing_count = db.session.query(
            func.count(func.distinct(TransOutbound.outbound_no))
        ).filter(TransOutbound.outbound_no.like(f"{prefix}%")).scalar()

        sequence = (existing_count or 0) + 1
        return f"OUT-{date_str}-{time_str}-{sequence:04d}"

    @staticmethod
    def get_stock_by_barcode(barcode):
        """Find the stock item matching a scanned code, with price attached.

        The code is compared against both ``barcode`` and ``sku`` and the
        tables are searched in order: finished products, semi-finished,
        purchased stock. The first hit wins.

        Args:
            barcode: Raw scanned text; surrounding whitespace is ignored.

        Returns:
            The dict produced by ``_format_scan_result`` plus a ``price`` key,
            or ``None`` when the code is empty/blank or nothing matches.
        """
        if not barcode:
            return None

        clean_code = barcode.strip()
        if not clean_code:
            # A whitespace-only scan must not match rows whose code is ''.
            return None

        search_order = (
            (StockProduct, 'stock_product'),
            (StockSemi, 'stock_semi'),
            (StockBuy, 'stock_buy'),
        )
        for model, table_name in search_order:
            match = model.query.filter(
                or_(model.barcode == clean_code, model.sku == clean_code)
            ).first()
            if not match:
                continue

            result = OutboundService._format_scan_result(match, table_name)
            price_attr = OutboundService._PRICE_ATTRS.get(table_name)
            raw_price = getattr(match, price_attr) if price_attr else None
            result['price'] = float(raw_price) if raw_price else 0
            return result

        return None

    @staticmethod
    def _format_scan_result(item, table_name):
        """Build the scan-result dict for a stock row.

        Material name/spec/category/type are resolved in this order: the
        ``base`` relationship, then a direct ``MaterialBase`` lookup by
        ``base_id``, then the row's own ``material_name`` (name only).

        Args:
            item: A stock model instance.
            table_name: Value to report as ``source_table``.

        Returns:
            dict describing the item; missing text fields become ``""`` and a
            missing name becomes ``"未知物品"``.
        """
        base_name = ""
        base_spec = ""
        base_cat = ""
        base_type = ""

        if hasattr(item, 'base') and item.base:
            base_name = item.base.name
            base_spec = item.base.spec_model
            base_cat = item.base.category
            base_type = item.base.material_type

        if not base_name and hasattr(item, 'base_id') and item.base_id:
            # Best-effort fallback: a failed lookup must not break scanning,
            # but it is logged rather than silently discarded.
            try:
                base_info = MaterialBase.query.get(item.base_id)
                if base_info:
                    base_name = base_info.name
                    base_spec = base_info.spec_model
                    base_cat = base_info.category
                    base_type = base_info.material_type
            except Exception:
                logger.warning(
                    "MaterialBase lookup failed for base_id %s",
                    item.base_id,
                    exc_info=True,
                )

        if not base_name and hasattr(item, 'material_name'):
            base_name = item.material_name

        stock_qty = float(item.stock_quantity) if item.stock_quantity else 0
        avail_qty = float(item.available_quantity) if item.available_quantity else 0

        return {
            'id': item.id,
            'sku': item.sku,
            'name': base_name or "未知物品",
            'spec_model': base_spec or "",
            'category': base_cat or "",
            'material_type': base_type or "",
            'source_table': table_name,
            'stock_quantity': stock_qty,
            'available_quantity': avail_qty,
            'batch_number': getattr(item, 'batch_number', ''),
            'warehouse_location': getattr(item, 'warehouse_location', ''),
            'barcode': getattr(item, 'barcode', ''),
        }

    @staticmethod
    def create_outbound_batch(data, operator_name='System'):
        """Create one outbound order and deduct stock for every line.

        All lines share a single generated outbound number and timestamp.
        Each stock row is loaded with ``with_for_update()`` before its
        quantities are reduced. Everything is committed once at the end; any
        error rolls the session back and is re-raised.

        Args:
            data: dict with ``items`` (list of dicts carrying
                ``source_table``, ``stock_id``, ``quantity``, ``price``,
                ``sku``, ``barcode``) and the optional header fields
                ``consumer_name``, ``outbound_type`` (default ``'SALES'``),
                ``signature_path`` and ``remark``.
            operator_name: Name recorded as the operator.

        Returns:
            The generated outbound number.

        Raises:
            ValueError: empty item list, non-positive quantity, missing stock
                row, or insufficient available quantity.

        Note:
            Lines whose ``source_table`` is not a known stock table are
            skipped (and logged); they do not abort the batch.
        """
        items = data.get('items', [])
        if not items:
            raise ValueError("出库商品列表不能为空")

        outbound_no = OutboundService.generate_outbound_no()

        common_data = {
            'outbound_no': outbound_no,
            'consumer_name': data.get('consumer_name'),
            'outbound_type': data.get('outbound_type', 'SALES'),
            'signature_path': data.get('signature_path'),
            'operator_name': operator_name,
            'remark': data.get('remark'),
        }

        # Stored as a naive datetime expressed in UTC+8.
        beijing_tz = timezone(timedelta(hours=8))
        current_time = datetime.now(beijing_tz).replace(tzinfo=None)

        try:
            for item in items:
                source_table = item.get('source_table')
                stock_id = item.get('stock_id')
                # ``or 0`` also covers keys that are present but None.
                quantity = float(item.get('quantity') or 0)
                unit_price = float(item.get('price') or 0)

                # Written as ``not > 0`` so that NaN is rejected as well.
                if not quantity > 0:
                    raise ValueError(f"SKU {item.get('sku')} 的出库数量必须大于0")

                ModelClass = OutboundService._STOCK_MODELS.get(source_table)
                if not ModelClass:
                    logger.warning(
                        "Skipping outbound line with unknown source_table %r "
                        "(sku=%s, outbound_no=%s)",
                        source_table, item.get('sku'), outbound_no,
                    )
                    continue

                stock_record = ModelClass.query.with_for_update().get(stock_id)
                if not stock_record:
                    raise ValueError(f"库存记录不存在 (ID: {stock_id})")

                available = float(stock_record.available_quantity or 0)
                if available < quantity:
                    raise ValueError(
                        f"SKU {stock_record.sku} 库存不足,当前可用: {stock_record.available_quantity}"
                    )

                stock_record.stock_quantity = (
                    float(stock_record.stock_quantity or 0) - quantity
                )
                stock_record.available_quantity = available - quantity

                new_record = TransOutbound(
                    sku=item.get('sku'),
                    source_table=source_table,
                    stock_id=stock_id,
                    barcode=item.get('barcode'),
                    quantity=quantity,
                    unit_price=unit_price,
                    outbound_time=current_time,
                    **common_data
                )
                db.session.add(new_record)

            db.session.commit()
            return outbound_no
        except Exception:
            db.session.rollback()
            # Bare raise keeps the original traceback intact.
            raise

    @staticmethod
    def _lookup_item_info(record):
        """Resolve name/spec/category/type for one outbound line.

        Loads the stock row referenced by ``record.source_table`` and
        ``record.stock_id`` and reads the material data from its ``base``
        relationship, or from ``MaterialBase`` via ``base_id`` when the
        relationship is empty. Any failure is logged and the defaults are
        returned, so one bad row cannot break the listing.

        Returns:
            Tuple ``(name, spec_model, category, material_type)``.
        """
        defaults = ("未知物品", "", "", "")

        ModelClass = OutboundService._STOCK_MODELS.get(record.source_table)
        if not (ModelClass and record.stock_id):
            return defaults

        try:
            stock_item = ModelClass.query.get(record.stock_id)
            if not stock_item:
                return defaults

            base_info = stock_item.base
            if not base_info and hasattr(stock_item, 'base_id') and stock_item.base_id:
                base_info = MaterialBase.query.get(stock_item.base_id)
            if not base_info:
                return defaults

            return (
                base_info.name,
                base_info.spec_model,
                base_info.category,
                base_info.material_type,
            )
        except Exception:
            logger.exception(
                "Error fetching detail for stock_id %s", record.stock_id
            )
            return defaults

    @staticmethod
    def get_grouped_list(page=1, per_page=10, keyword=None, start_date=None, end_date=None):
        """List outbound records grouped by outbound number, with line details.

        The keyword is matched (case-insensitive substring) against outbound
        number, consumer name and SKU, and - through ``MaterialBase`` joined
        on SKU - against material name and spec model. The date range is
        applied only when both ``start_date`` and ``end_date`` are given.
        Orders are sorted by their latest ``outbound_time``, newest first.

        Args:
            page: 1-based page number.
            per_page: Orders per page.
            keyword: Optional search text.
            start_date: Optional lower bound for ``outbound_time``.
            end_date: Optional upper bound for ``outbound_time``.

        Returns:
            dict with ``items`` (one dict per order, including its ``items``
            lines and ``total_amount``), ``total``, ``pages`` and
            ``current_page``.
        """
        # 1. Page over outbound numbers.
        keyword_conditions = None
        if keyword:
            pattern = f'%{keyword}%'
            # Outbound numbers whose material name / spec model matches.
            material_join = db.session.query(
                TransOutbound.outbound_no
            ).join(
                MaterialBase, TransOutbound.sku == MaterialBase.sku
            ).filter(or_(
                MaterialBase.name.ilike(pattern),
                MaterialBase.spec_model.ilike(pattern)
            )).subquery()

            keyword_conditions = or_(
                TransOutbound.outbound_no.ilike(pattern),
                TransOutbound.consumer_name.ilike(pattern),
                TransOutbound.sku.ilike(pattern),
                TransOutbound.outbound_no.in_(material_join)
            )

        stmt = db.session.query(
            TransOutbound.outbound_no,
            func.max(TransOutbound.outbound_time).label('max_time')
        ).group_by(TransOutbound.outbound_no)

        if keyword_conditions is not None:
            stmt = stmt.filter(keyword_conditions)

        if start_date and end_date:
            stmt = stmt.filter(TransOutbound.outbound_time.between(start_date, end_date))

        stmt = stmt.order_by(desc('max_time'))
        # Kept from the original query: guards against duplicate rows.
        stmt = stmt.distinct()

        pagination = stmt.paginate(page=page, per_page=per_page, error_out=False)
        outbound_nos = [row.outbound_no for row in pagination.items]

        if not outbound_nos:
            # The page may simply be past the end, so report the real totals
            # instead of hard-coded zeros.
            return {
                'items': [],
                'total': pagination.total,
                'pages': pagination.pages,
                'current_page': page
            }

        # 2. Load every line belonging to the orders on this page.
        details = TransOutbound.query.filter(
            TransOutbound.outbound_no.in_(outbound_nos)
        ).all()

        # 3. Assemble orders and resolve material details per line.
        grouped_map = {}
        for d in details:
            ono = d.outbound_no
            if ono not in grouped_map:
                grouped_map[ono] = {
                    'outbound_no': ono,
                    'outbound_time': (
                        d.outbound_time.strftime('%Y-%m-%d %H:%M:%S')
                        if d.outbound_time else ''
                    ),
                    'outbound_type': d.outbound_type,
                    'consumer_name': d.consumer_name,
                    'operator_name': d.operator_name,
                    'signature_path': d.signature_path,
                    'remark': d.remark,
                    'total_amount': 0.0,
                    'items': []
                }

            # One lookup per line (N+1); acceptable for a single page, but a
            # batched query would be preferable for large orders.
            item_name, item_spec, item_cat, item_type = (
                OutboundService._lookup_item_info(d)
            )

            price = float(d.unit_price) if d.unit_price else 0
            qty = float(d.quantity) if d.quantity else 0
            subtotal = price * qty

            order = grouped_map[ono]
            order['total_amount'] += subtotal
            order['items'].append({
                'sku': d.sku,
                'name': item_name,
                'spec_model': item_spec,
                'category': item_cat,
                'material_type': item_type,
                'quantity': qty,
                'unit_price': price,
                'subtotal': subtotal
            })

        # 4. Emit orders in page order; lines sorted by unit price, descending.
        result_list = []
        for ono in outbound_nos:
            if ono in grouped_map:
                obj = grouped_map[ono]
                obj['items'].sort(key=lambda x: x['unit_price'], reverse=True)
                obj['total_amount'] = round(obj['total_amount'], 2)
                result_list.append(obj)

        return {
            'items': result_list,
            'total': pagination.total,
            'pages': pagination.pages,
            'current_page': page
        }