import traceback
from io import BytesIO

from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment
from openpyxl.utils import get_column_letter
from sqlalchemy import (
    select,
    literal,
    union_all,
    desc,
    asc,
    func,
    or_,
    cast,
    String,
    Numeric,
    Date,
)

# .material -> .base refactor checked
from app.extensions import db
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
from app.models.base import MaterialBase


class InboundSummaryService:
    """Aggregated view over the three inbound stock tables.

    Rows from ``StockBuy``, ``StockSemi`` and ``StockProduct`` are combined
    with ``UNION ALL``, outer-joined to ``MaterialBase`` for name/spec
    columns, and exposed either as a paginated list (``get_list``) or as an
    Excel workbook (``export_excel``).
    """

    # Display label for each value of the synthetic ``source_type`` column.
    _TYPE_LABELS = {
        'buy': '采购入库',
        'semi': '半成品生产',
        'product': '成品完工',
    }

    # ------------------------------------------------------------------
    # Query construction helpers (shared by get_list and export_excel)
    # ------------------------------------------------------------------
    @staticmethod
    def _source_query(model, date_column, source_column, source_type):
        """Build one branch of the UNION with a uniform column layout.

        ``batch_number`` and ``serial_number`` (or ``sn``) are looked up with
        ``getattr`` so that a model lacking the attribute contributes an
        empty-string literal instead of raising ``AttributeError``.
        """
        batch_expr = getattr(model, 'batch_number', literal(''))
        serial_expr = getattr(
            model, 'serial_number', getattr(model, 'sn', literal(''))
        )
        return db.session.query(
            model.id.label('id'),
            model.base_id.label('base_id'),
            model.sku.label('sku'),
            date_column.label('inbound_date'),
            model.in_quantity.label('in_qty'),
            model.stock_quantity.label('current_qty'),
            cast(source_column, String).label('source_info'),
            model.status.label('orig_status'),
            cast(batch_expr, String).label('batch_number'),
            cast(serial_expr, String).label('serial_number'),
            cast(literal(source_type), String).label('source_type'),
        )

    @staticmethod
    def _combined_subquery():
        """Return the UNION ALL of the three inbound tables as a subquery."""
        q_buy = InboundSummaryService._source_query(
            StockBuy, StockBuy.in_date, StockBuy.supplier_name, 'buy'
        )
        q_semi = InboundSummaryService._source_query(
            StockSemi, StockSemi.production_date,
            StockSemi.production_manager, 'semi'
        )
        q_product = InboundSummaryService._source_query(
            StockProduct, StockProduct.production_date,
            StockProduct.production_manager, 'product'
        )
        return union_all(q_buy, q_semi, q_product).subquery()

    @staticmethod
    def _pad_date(value, time_suffix):
        """Append a time part to a 10-character (``YYYY-MM-DD``-sized) value.

        Any other value (empty, ``None``, or already carrying a time part)
        is returned unchanged.
        """
        if value:
            text = str(value).strip()
            if len(text) == 10:
                return f"{text} {time_suffix}"
        return value

    @staticmethod
    def _filter_conditions(cte, keyword, start_date, end_date, source_type):
        """Translate the user-supplied filters into a list of SQL criteria."""
        conditions = []

        if keyword:
            pattern = f'%{keyword}%'
            conditions.append(or_(
                cte.c.sku.ilike(pattern),
                cte.c.source_info.ilike(pattern),
                cte.c.batch_number.ilike(pattern),
                cte.c.serial_number.ilike(pattern),
                MaterialBase.name.ilike(pattern),
                MaterialBase.spec_model.ilike(pattern),
            ))

        # Expand bare dates so the end day is included in full instead of
        # being cut off at midnight.
        start_date = InboundSummaryService._pad_date(start_date, "00:00:00")
        end_date = InboundSummaryService._pad_date(end_date, "23:59:59")

        if start_date and end_date:
            conditions.append(cte.c.inbound_date.between(start_date, end_date))
        elif start_date:
            # A lone lower bound is honoured rather than silently ignored.
            conditions.append(cte.c.inbound_date >= start_date)
        elif end_date:
            # A lone upper bound is honoured rather than silently ignored.
            conditions.append(cte.c.inbound_date <= end_date)

        if source_type:
            conditions.append(cte.c.source_type == source_type)

        return conditions

    @staticmethod
    def _detail_query(cte, conditions):
        """Return the filtered, ordered row query joined to MaterialBase."""
        query = db.session.query(
            cte,
            MaterialBase.name.label('material_name'),
            MaterialBase.spec_model.label('spec_model'),
            MaterialBase.category.label('category'),
            MaterialBase.material_type.label('material_type'),
        ).outerjoin(
            MaterialBase, cte.c.base_id == MaterialBase.id
        )
        if conditions:
            query = query.filter(*conditions)

        # Newest first, then SKU. source_type + id make the order total:
        # (inbound_date, sku) alone is not unique across the three tables,
        # which would let LIMIT/OFFSET pages repeat or skip rows.
        return query.order_by(
            desc(cte.c.inbound_date),
            asc(cte.c.sku),
            asc(cte.c.source_type),
            asc(cte.c.id),
        )

    # ------------------------------------------------------------------
    # Row formatting helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _format_date(value):
        """Render a date-like value as ``YYYY-MM-DD``; empty string if falsy."""
        if not value:
            return ""
        try:
            return value.strftime('%Y-%m-%d')
        except Exception:
            # Best-effort fallback for values without a usable strftime.
            return str(value)

    @staticmethod
    def _quantities_and_status(row):
        """Return ``(in_qty, current_qty, status)`` for a result row.

        The stored status is overridden when the remaining stock shows the
        batch has been fully or partially consumed.
        """
        in_qty = float(row.in_qty) if row.in_qty is not None else 0.0
        current_qty = (
            float(row.current_qty) if row.current_qty is not None else 0.0
        )
        if current_qty <= 0:
            status = "已出库"
        elif current_qty < in_qty:
            status = "部分出库"
        else:
            status = row.orig_status
        return in_qty, current_qty, status

    @staticmethod
    def _batch_label(batch_number, serial_number, separator, placeholder):
        """Merge batch and serial numbers into one display string.

        Both are shown (joined by ``separator``) only when both are present
        and different; otherwise whichever is present is used, falling back
        to ``placeholder`` when neither is.
        """
        batch = batch_number or ""
        serial = serial_number or ""
        if batch and serial and batch != serial:
            return f"{batch}{separator}{serial}"
        return batch or serial or placeholder

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    @staticmethod
    def get_list(page=1, per_page=10, keyword=None, start_date=None, end_date=None, source_type=None):
        """Return one page of the aggregated inbound records.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            per_page: Page size.
            keyword: Optional substring matched (case-insensitively) against
                SKU, source info, batch number, serial number, material name
                and spec model.
            start_date: Optional lower bound for the inbound date. A
                10-character value gets `` 00:00:00`` appended.
            end_date: Optional upper bound for the inbound date. A
                10-character value gets `` 23:59:59`` appended.
            source_type: Optional exact match on ``'buy'``, ``'semi'`` or
                ``'product'``.

        Returns:
            dict with ``items`` (list of row dicts), ``total`` (matching row
            count), ``pages`` (page count, 0 when ``per_page <= 0``) and
            ``current_page``.

        Raises:
            Exception: any error from query building or execution is printed
                and re-raised unchanged.
        """
        try:
            cte = InboundSummaryService._combined_subquery()
            conditions = InboundSummaryService._filter_conditions(
                cte, keyword, start_date, end_date, source_type
            )

            # Total number of matching rows (same join + filters, no paging).
            count_query = db.session.query(func.count()) \
                .select_from(cte) \
                .outerjoin(MaterialBase, cte.c.base_id == MaterialBase.id)
            if conditions:
                count_query = count_query.filter(*conditions)
            total = count_query.scalar() or 0

            # Clamp so page <= 0 cannot produce a negative OFFSET.
            current_page = max(page, 1)
            rows = InboundSummaryService._detail_query(cte, conditions) \
                .limit(per_page) \
                .offset((current_page - 1) * per_page) \
                .all()

            items = []
            for row in rows:
                in_qty, current_qty, final_status = \
                    InboundSummaryService._quantities_and_status(row)
                items.append({
                    'id': row.id,
                    'sku': row.sku or "",
                    'name': row.material_name or "未知物品",
                    'spec_model': row.spec_model or "",
                    'category': row.category or "",
                    'material_type': row.material_type or "",
                    'inbound_date': InboundSummaryService._format_date(row.inbound_date),
                    'quantity': in_qty,
                    'current_qty': current_qty,
                    'source_info': row.source_info or "",
                    'status': final_status,
                    'source_type': row.source_type,
                    'type_label': InboundSummaryService._TYPE_LABELS.get(
                        row.source_type, "未知类型"
                    ),
                    # Combined batch / serial number string for the frontend.
                    'batch_number': InboundSummaryService._batch_label(
                        row.batch_number, row.serial_number, " / ", ""
                    ),
                })

            return {
                'items': items,
                'total': total,
                'pages': (total + per_page - 1) // per_page if per_page > 0 else 0,
                'current_page': current_page
            }

        except Exception as e:
            print("【InboundSummaryService Error】:", str(e))
            traceback.print_exc()
            raise

    @staticmethod
    def export_excel(keyword=None, start_date=None, end_date=None, source_type=None):
        """Export every matching inbound record to an Excel workbook.

        Takes the same filters as ``get_list`` but applies no pagination.

        Returns:
            io.BytesIO positioned at offset 0 and containing the ``.xlsx``
            data.

        Raises:
            Exception: any error from querying or workbook generation is
                printed and re-raised unchanged.
        """
        try:
            cte = InboundSummaryService._combined_subquery()
            conditions = InboundSummaryService._filter_conditions(
                cte, keyword, start_date, end_date, source_type
            )
            rows = InboundSummaryService._detail_query(cte, conditions).all()

            wb = Workbook()
            ws = wb.active
            ws.title = "入库记录"

            # Header styling.
            header_font = Font(bold=True, size=11, color="FFFFFF")
            header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
            header_alignment = Alignment(horizontal="center", vertical="center")

            headers = ['SKU', '物品名称', '规格型号', '分类', '入库来源', '入库/生产日期', '入库数量', '批次/序列号', '供应商/负责人', '当前状态']
            for col_idx, header in enumerate(headers, 1):
                cell = ws.cell(row=1, column=col_idx, value=header)
                cell.font = header_font
                cell.fill = header_fill
                cell.alignment = header_alignment

            # Longest text seen per column, seeded with the header so a column
            # is never narrower than its own title (also covers empty exports).
            max_lengths = [len(str(header)) for header in headers]

            for row_idx, row in enumerate(rows, 2):
                in_qty, _current_qty, final_status = \
                    InboundSummaryService._quantities_and_status(row)
                values = [
                    row.sku or "",
                    row.material_name or "未知物品",
                    row.spec_model or "",
                    row.category or "",
                    InboundSummaryService._TYPE_LABELS.get(row.source_type, "未知类型"),
                    InboundSummaryService._format_date(row.inbound_date),
                    in_qty,
                    InboundSummaryService._batch_label(
                        row.batch_number, row.serial_number, "/", "-"
                    ),
                    row.source_info or "",
                    final_status,
                ]
                for col_idx, value in enumerate(values, 1):
                    ws.cell(row=row_idx, column=col_idx, value=value)
                    if value:
                        max_lengths[col_idx - 1] = max(
                            max_lengths[col_idx - 1], len(str(value))
                        )

            # Auto-size columns: content length plus padding, capped at 50.
            for col_idx, max_length in enumerate(max_lengths, 1):
                column_letter = get_column_letter(col_idx)
                ws.column_dimensions[column_letter].width = min(max_length + 2, 50)

            # Serialise the workbook into an in-memory stream.
            file_stream = BytesIO()
            wb.save(file_stream)
            file_stream.seek(0)

            return file_stream

        except Exception as e:
            print("【InboundSummaryService Export Error】:", str(e))
            traceback.print_exc()
            raise