# File path: app/services/inbound/base_service.py
from app.extensions import db
from app.models.base import MaterialBase
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
# from app.models.inbound.service import StockService
from sqlalchemy import or_, and_, func
import traceback
import json
import io
import datetime
# Requires: pip install openpyxl
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, Border, Side, PatternFill
from collections import defaultdict


class MaterialBaseService:
    """Service layer for base materials.

    Provides search, paginated listing, create/update/delete for
    ``MaterialBase`` rows and the Excel export of the stock/asset report.
    """

    @staticmethod
    def search_material(keyword):
        """Search enabled base materials by keyword.

        Per the original note this backs the /api/v1/inbound/base/search
        endpoint.

        Args:
            keyword: Text matched case-insensitively (SQL ``ILIKE``) against
                name, common name, spec/model and company name.

        Returns:
            A list of dicts (``id``, ``companyName``, ``name``,
            ``commonName``, ``spec``, ``category``, ``unit``, ``type``,
            ``status``) sorted by the part of the spec/model before the first
            ``/``. Returns ``[]`` for a falsy keyword or when any error
            occurs (the traceback is printed).
        """
        if not keyword:
            return []

        try:
            pattern = f'%{keyword}%'
            query = MaterialBase.query.filter(
                MaterialBase.is_enabled == True,  # noqa: E712 - SQL expression
                or_(
                    MaterialBase.name.ilike(pattern),
                    MaterialBase.common_name.ilike(pattern),
                    MaterialBase.spec_model.ilike(pattern),
                    # Company name is searchable as well.
                    MaterialBase.company_name.ilike(pattern)
                )
            )

            # The cap was raised from 20 to 1000 so the front end receives
            # all (or at least enough) matches. Note the cap is applied in
            # SQL, before the in-memory sort below.
            db_items = query.limit(1000).all()

            # Sort only by the text in front of the first '/' of the
            # spec/model; rows without a spec/model sort as "".
            db_items.sort(key=lambda item: (item.spec_model or "").split('/')[0])

            return [
                {
                    # The id must stay in the payload for front-end logic;
                    # hiding it visually is the front end's job.
                    'id': item.id,
                    'companyName': item.company_name,
                    'name': item.name,
                    'commonName': item.common_name,
                    'spec': item.spec_model,
                    'category': item.category,
                    'unit': item.unit,
                    'type': item.material_type,
                    'status': '启用'
                }
                for item in db_items
            ]
        except Exception:
            traceback.print_exc()
            return []

    @staticmethod
    def _get_stock_counts(stock_query):
        """Sum stock and available quantities over an iterable of stock rows.

        Args:
            stock_query: Any iterable of stock objects (iterating it is what
                triggers the query). If iterating fails the result is zero.

        Returns:
            ``(total_inventory, total_available)``. Rows whose quantities
            cannot be converted to ``float`` are skipped.
        """
        total_inv = 0
        total_avail = 0

        try:
            items = list(stock_query)  # Forces the query to execute.
        except Exception:
            # Best effort: a failing query counts as "no stock".
            items = []

        for row in items:
            # Stock quantity, falling back to the inbound quantity for models
            # that have no 'stock_quantity' attribute.
            qty = getattr(row, 'stock_quantity', getattr(row, 'in_quantity', 0))
            # Available quantity defaults to the stock quantity.
            avail = getattr(row, 'available_quantity', qty)
            try:
                total_inv += float(qty if qty is not None else 0)
                total_avail += float(avail if avail is not None else 0)
            except (TypeError, ValueError, OverflowError):
                # Non-numeric quantity: ignore this row.
                continue

        return total_inv, total_avail

    @staticmethod
    def _build_base_conditions(filters, keyword_columns):
        """Translate the basic list filters into conditions on MaterialBase.

        Args:
            filters: Mapping that may contain ``keyword``, ``company``,
                ``category``, ``type`` and ``isEnabled``; may be ``None`` or
                empty.
            keyword_columns: MaterialBase columns the keyword is matched
                against (combined with OR).

        Returns:
            A list of SQLAlchemy conditions meant to be AND-ed together;
            empty when there is nothing to filter on.

        Raises:
            ValueError / TypeError: if ``isEnabled`` is not int-convertible.
            AttributeError: if ``company``/``category``/``type`` is not a
                string.
        """
        conditions = []
        if not filters:
            return conditions

        keyword = filters.get('keyword')
        if keyword:
            kw = f"%{keyword}%"
            conditions.append(or_(*(col.ilike(kw) for col in keyword_columns)))

        # ILIKE without wildcards: a case-insensitive exact match.
        for key, column in (
            ('company', MaterialBase.company_name),
            ('category', MaterialBase.category),
            ('type', MaterialBase.material_type),
        ):
            value = filters.get(key)
            if value is not None and value != '':
                conditions.append(column.ilike(value.strip()))

        if filters.get('isEnabled') is not None:
            is_active = bool(int(filters['isEnabled']))
            conditions.append(MaterialBase.is_enabled == is_active)

        return conditions

    @staticmethod
    def get_list(page, limit, filters=None):
        """Return one page of base materials with aggregated stock totals.

        Args:
            page: 1-based page number passed to ``paginate``.
            limit: Page size.
            filters: Optional mapping with ``keyword``, ``company``,
                ``category``, ``type``, ``isEnabled``, ``advancedFilters``
                (list of ``{'field', 'operator', 'value'}`` dicts; operators
                ``eq``, ``ne``, ``contains``, ``ge``, ``le``),
                ``orderByColumn`` and ``isAsc`` (``'asc'`` / ``'desc'``).

        Returns:
            ``{"total": int, "items": [dict, ...]}`` where each item is
            ``MaterialBase.to_dict()`` plus ``inventoryCount`` and
            ``availableCount``. On any error the traceback is printed and
            ``{"total": 0, "items": []}`` is returned.
        """
        # Allow the documented default (None) to flow through every
        # ``filters.get`` below.
        filters = filters or {}

        try:
            # Per-material stock totals, one aggregate subquery per stock table.
            buy_sub = db.session.query(
                StockBuy.base_id,
                func.sum(StockBuy.stock_quantity).label('buy_inv'),
                func.sum(StockBuy.available_quantity).label('buy_avail')
            ).group_by(StockBuy.base_id).subquery()

            semi_sub = db.session.query(
                StockSemi.base_id,
                func.sum(StockSemi.stock_quantity).label('semi_inv'),
                func.sum(StockSemi.available_quantity).label('semi_avail')
            ).group_by(StockSemi.base_id).subquery()

            prod_sub = db.session.query(
                StockProduct.base_id,
                func.sum(StockProduct.stock_quantity).label('prod_inv'),
                func.sum(StockProduct.available_quantity).label('prod_avail')
            ).group_by(StockProduct.base_id).subquery()

            # SQL expressions for total stock / total available; COALESCE
            # turns "no rows in that table" into 0.
            total_inv = (
                func.coalesce(buy_sub.c.buy_inv, 0)
                + func.coalesce(semi_sub.c.semi_inv, 0)
                + func.coalesce(prod_sub.c.prod_inv, 0)
            )
            total_avail = (
                func.coalesce(buy_sub.c.buy_avail, 0)
                + func.coalesce(semi_sub.c.semi_avail, 0)
                + func.coalesce(prod_sub.c.prod_avail, 0)
            )

            # Main query: every material, outer-joined to the three totals.
            query = db.session.query(
                MaterialBase,
                total_inv.label('total_inv'),
                total_avail.label('total_avail')
            ).outerjoin(buy_sub, MaterialBase.id == buy_sub.c.base_id) \
                .outerjoin(semi_sub, MaterialBase.id == semi_sub.c.base_id) \
                .outerjoin(prod_sub, MaterialBase.id == prod_sub.c.base_id)

            # Front-end field name -> SQL column/expression, shared by the
            # advanced filters and the sorting below.
            field_columns = {
                'companyName': MaterialBase.company_name,
                'name': MaterialBase.name,
                'commonName': MaterialBase.common_name,
                'category': MaterialBase.category,
                'type': MaterialBase.material_type,
                'spec': MaterialBase.spec_model,
                'unit': MaterialBase.unit,
                'inventoryCount': total_inv,
                'availableCount': total_avail
            }

            # 1 + 2. Keyword search and exact-match filters. These are
            # explicit MaterialBase conditions on purpose: ``filter_by``
            # would resolve its keywords against the last join target (the
            # product subquery), which has no ``is_enabled`` column.
            base_conditions = MaterialBaseService._build_base_conditions(
                filters,
                keyword_columns=(
                    MaterialBase.name,
                    MaterialBase.common_name,
                    MaterialBase.spec_model,
                ),
            )
            if base_conditions:
                query = query.filter(*base_conditions)

            # 3. Advanced dynamic filters.
            advanced_conditions = []
            for condition in filters.get('advancedFilters', []) or []:
                field = condition.get('field')
                operator = condition.get('operator')
                value = condition.get('value')
                if not field or not operator or value is None:
                    continue

                column = field_columns.get(field)
                # Must be an identity check: truth-testing a SQL expression
                # such as ``total_inv`` raises TypeError.
                if column is None:
                    continue

                if operator == 'eq':
                    advanced_conditions.append(column == value)
                elif operator == 'ne':
                    advanced_conditions.append(column != value)
                elif operator == 'contains':
                    advanced_conditions.append(column.ilike(f'%{value}%'))
                elif operator in ('ge', 'le'):
                    try:
                        num_val = float(value)
                    except (TypeError, ValueError):
                        # Non-numeric bound: skip just this condition.
                        continue
                    if operator == 'ge':
                        advanced_conditions.append(column >= num_val)
                    else:
                        advanced_conditions.append(column <= num_val)

            if advanced_conditions:
                query = query.filter(and_(*advanced_conditions))

            # Sorting (any mapped field). Anything that does not resolve to a
            # known column plus a valid direction falls back to the default:
            # total stock descending, then spec/model ascending. The id
            # tie-breaker keeps pagination stable when sort keys are equal.
            order_by_column = filters.get('orderByColumn', '')
            is_asc = filters.get('isAsc', None)
            sort_column = field_columns.get(order_by_column) if order_by_column else None

            if sort_column is not None and is_asc == 'asc':
                query = query.order_by(sort_column.asc(), MaterialBase.id.asc())
            elif sort_column is not None and is_asc == 'desc':
                query = query.order_by(sort_column.desc(), MaterialBase.id.asc())
            else:
                query = query.order_by(
                    total_inv.desc(),
                    MaterialBase.spec_model.asc(),
                    MaterialBase.id.asc()
                )

            # Pagination.
            pagination = query.paginate(page=page, per_page=limit, error_out=False)

            items_list = []
            for item, inv, avail in pagination.items:
                item_dict = item.to_dict()
                item_dict['inventoryCount'] = float(inv) if inv is not None else 0.0
                item_dict['availableCount'] = float(avail) if avail is not None else 0.0
                items_list.append(item_dict)

            return {"total": pagination.total, "items": items_list}

        except Exception as e:
            traceback.print_exc()
            print(f"查询基础信息列表失败: {e}")
            return {"total": 0, "items": []}

    @staticmethod
    def get_distinct_options():
        """Return the distinct, sorted categories, types and companies in use.

        Returns:
            ``{"categories": [...], "types": [...], "companies": [...]}``;
            NULL and empty-string values are excluded. Categories are a flat
            sorted list (any hierarchy handling is left to the caller). All
            three lists are empty if any error occurs (the traceback is
            printed).
        """
        def distinct_sorted(column):
            rows = db.session.query(column) \
                .filter(column != None, column != '') \
                .distinct().all()  # noqa: E711 - SQL "IS NOT NULL"
            return sorted(row[0] for row in rows)

        try:
            return {
                "categories": distinct_sorted(MaterialBase.category),
                "types": distinct_sorted(MaterialBase.material_type),
                "companies": distinct_sorted(MaterialBase.company_name)
            }
        except Exception:
            traceback.print_exc()
            return {"categories": [], "types": [], "companies": []}

    @staticmethod
    def create_material(data):
        """Create a base material.

        Args:
            data: Mapping with required ``name`` and ``spec`` and optional
                ``companyName``, ``commonName``, ``category``, ``type``,
                ``unit``, ``visibilityLevel``, ``generalManual``,
                ``generalImage`` (both stored as JSON text) and ``isEnabled``
                (defaults to enabled).

        Returns:
            The committed ``MaterialBase`` instance.

        Raises:
            ValueError: if ``name``/``spec`` is missing or a row with the
                same name and spec already exists.
            Exception: any database error, re-raised after a rollback.
        """
        try:
            if not data.get('name') or not data.get('spec'):
                raise ValueError("名称和规格型号不能为空")

            exist = MaterialBase.query.filter_by(
                name=data['name'],
                spec_model=data['spec']
            ).first()
            if exist:
                raise ValueError(f"已存在相同名称和规格的数据 (ID: {exist.id})")

            # Enabled only for the value 1 (which includes True); the string
            # '1' is accepted too so form-style payloads behave the same way
            # as in update_material.
            raw_enabled = data.get('isEnabled', 1)
            is_enabled = raw_enabled == 1 or (
                isinstance(raw_enabled, str) and raw_enabled.strip() == '1'
            )

            new_material = MaterialBase(
                company_name=data.get('companyName'),
                name=data['name'],
                common_name=data.get('commonName'),
                spec_model=data['spec'],
                category=data.get('category'),
                material_type=data.get('type'),
                unit=data.get('unit'),
                visibility_level=data.get('visibilityLevel'),
                manual_link=json.dumps(data.get('generalManual', [])),
                product_image=json.dumps(data.get('generalImage', [])),
                is_enabled=is_enabled
            )
            db.session.add(new_material)
            db.session.commit()
            return new_material
        except Exception:
            db.session.rollback()
            raise

    @staticmethod
    def update_material(m_id, data):
        """Update a base material; only keys present in ``data`` are applied.

        Args:
            m_id: Primary key of the material.
            data: Mapping using the same keys as ``create_material``.

        Returns:
            The committed ``MaterialBase`` instance.

        Raises:
            ValueError: if no material has this id (or ``isEnabled`` is not
                int-convertible).
            Exception: any database error, re-raised after a rollback.
        """
        # Payload key -> model attribute for values copied through unchanged.
        plain_fields = (
            ('companyName', 'company_name'),
            ('name', 'name'),
            ('commonName', 'common_name'),
            ('spec', 'spec_model'),
            ('category', 'category'),
            ('type', 'material_type'),
            ('unit', 'unit'),
            ('visibilityLevel', 'visibility_level'),
        )

        try:
            material = MaterialBase.query.get(m_id)
            if not material:
                raise ValueError("数据不存在")

            for key, attr in plain_fields:
                if key in data:
                    setattr(material, attr, data[key])

            # Attachment lists are persisted as JSON text.
            if 'generalManual' in data:
                material.manual_link = json.dumps(data['generalManual'])
            if 'generalImage' in data:
                material.product_image = json.dumps(data['generalImage'])

            if 'isEnabled' in data:
                material.is_enabled = bool(int(data['isEnabled']))

            db.session.commit()
            return material
        except Exception:
            db.session.rollback()
            raise

    @staticmethod
    def delete_material(m_id):
        """Delete a base material unless stock records still reference it.

        Args:
            m_id: Primary key of the material.

        Returns:
            ``True`` once the row is deleted and committed.

        Raises:
            ValueError: if the material does not exist, or if any purchase,
                semi-finished or finished stock row references it (the
                message lists the counts).
            Exception: any database error, re-raised after a rollback.
        """
        try:
            material = MaterialBase.query.get(m_id)
            if not material:
                raise ValueError("数据不存在")

            buy_usage_count = StockBuy.query.filter_by(base_id=m_id).count()
            semi_usage_count = StockSemi.query.filter_by(base_id=m_id).count()
            prod_usage_count = StockProduct.query.filter_by(base_id=m_id).count()

            total_usage = buy_usage_count + semi_usage_count + prod_usage_count

            if total_usage > 0:
                raise ValueError(
                    f"无法删除:该基础物料正被使用中。\n"
                    f"- 采购库存记录: {buy_usage_count} 条\n"
                    f"- 半成品库存记录: {semi_usage_count} 条\n"
                    f"- 成品库存记录: {prod_usage_count} 条\n"
                    f"请先清理相关库存或仅‘禁用’此条目。"
                )

            db.session.delete(material)
            db.session.commit()
            return True

        except Exception as e:
            db.session.rollback()
            print(f"删除基础信息失败: {e}")
            raise

    # ==============================================================================
    # [Core change] Unified asset-statistics export (adds highest-unit-price logic)
    # ==============================================================================
    @staticmethod
    def export_excel(filters=None, user_permissions=None):
        """Export the full asset/stock report as an in-memory .xlsx file.

        One sheet row is written per stock record of the three stock tables
        whose base material matches ``filters``. Purchased rows are valued at
        the material's highest recorded unit price (looked up in the order
        purchase -> semi-finished -> finished, first positive value wins);
        semi-finished and finished rows are valued at their own
        ``manual_cost``.

        Args:
            filters: Optional mapping with ``keyword``, ``company``,
                ``category``, ``type`` and ``isEnabled``.
            user_permissions: Optional collection of permission codes. When
                given, cells the user lacks permission for are blanked;
                ``None`` disables masking entirely.

        Returns:
            ``io.BytesIO`` positioned at offset 0 containing the workbook.

        Raises:
            Exception: any error is printed and re-raised.
        """
        try:
            # 1. Base-material conditions used to restrict the stock rows.
            filter_conditions = MaterialBaseService._build_base_conditions(
                filters,
                keyword_columns=(
                    MaterialBase.name,
                    MaterialBase.common_name,
                    MaterialBase.spec_model,
                    MaterialBase.company_name,
                ),
            )

            # 2. Load each stock table joined to its (filtered) base material.
            def load_rows(stock_model):
                query = db.session.query(stock_model, MaterialBase).join(
                    MaterialBase, stock_model.base_id == MaterialBase.id
                )
                for cond in filter_conditions:
                    query = query.filter(cond)
                return query.all()

            list_buy = load_rows(StockBuy)          # 2.1 purchased stock
            list_semi = load_rows(StockSemi)        # 2.2 semi-finished stock
            list_product = load_rows(StockProduct)  # 2.3 finished stock

            # Highest positive unit price per base_id for one stock table.
            def max_price_by_base(rows, price_attr):
                best = {}
                for stock, base in rows:
                    price = float(getattr(stock, price_attr) or 0)
                    if price > best.get(base.id, 0):
                        best[base.id] = price
                return best

            buy_max_prices = max_price_by_base(list_buy, 'pre_tax_unit_price')
            # Semi-finished / finished unit price is the manual_cost field
            # (total cost per piece).
            semi_max_prices = max_price_by_base(list_semi, 'manual_cost')
            product_max_prices = max_price_by_base(list_product, 'manual_cost')

            # Priority: purchased > semi-finished > finished.
            def get_highest_price(base_id):
                for prices in (buy_max_prices, semi_max_prices, product_max_prices):
                    price = prices.get(base_id, 0)
                    if price > 0:
                        return price
                return 0.0

            # 3. Merge everything into uniform row dicts.
            all_rows = []

            # Purchased items: valued at the material-wide highest price.
            for stock, base in list_buy:
                qty = float(stock.stock_quantity or 0)
                highest_excl_price = get_highest_price(base.id)
                tax_rate = float(stock.tax_rate or 0)  # Percent, e.g. 13.
                highest_incl_price = highest_excl_price * (1 + tax_rate / 100.0)

                all_rows.append({
                    "base": base,
                    "type_name": "采购件",
                    "ident": (stock.batch_number or stock.serial_number
                              or stock.barcode or stock.sku),
                    "loc": stock.warehouse_location,
                    "source": stock.supplier_name,
                    "date": stock.in_date,
                    "qty": qty,
                    "avail": float(stock.available_quantity or 0),
                    "price_excl": highest_excl_price,
                    "total_val_excl": qty * highest_excl_price,
                    "tax": tax_rate,
                    "price_incl": highest_incl_price,
                    "total_val": qty * highest_incl_price
                })

            # Semi-finished / finished items: valued at their own manual_cost
            # with no tax, so the tax-inclusive figures equal the exclusive
            # ones.
            def cost_row(stock, base, type_name, ident):
                qty = float(stock.stock_quantity or 0)
                unit_cost = float(stock.manual_cost or 0)
                return {
                    "base": base,
                    "type_name": type_name,
                    "ident": ident,
                    "loc": stock.warehouse_location,
                    "source": stock.production_manager,
                    "date": stock.production_date,
                    "qty": qty,
                    "avail": float(stock.available_quantity or 0),
                    "price_excl": unit_cost,
                    "total_val_excl": qty * unit_cost,
                    "tax": 0.0,
                    "price_incl": unit_cost,
                    "total_val": qty * unit_cost
                }

            for stock, base in list_semi:
                ident = (stock.batch_number or stock.serial_number
                         or stock.barcode or stock.sku)
                all_rows.append(cost_row(stock, base, "半成品", ident))

            for stock, base in list_product:
                # Finished goods are identified without a batch number.
                ident = stock.serial_number or stock.barcode or stock.sku
                all_rows.append(cost_row(stock, base, "成品", ident))

            # 4. Sort: company -> spec/model -> base id -> identifier.
            all_rows.sort(key=lambda x: (
                x['base'].company_name or "",
                x['base'].spec_model or "",
                x['base'].id,
                x['ident'] or ""
            ))

            # 5. Build the workbook.
            wb = Workbook()
            ws = wb.active
            ws.title = "库存统计"

            # Header row (the original note says it must match "figure 5"
            # exactly).
            headers = [
                "所属公司", "资产名称", "规格型号", "物料类型",
                "类别一级", "类别二级", "类别三级", "类别四级", "类别五级",
                "计量单位",
                "库存性质", "唯一标识码 (批号/SN)", "仓库位置", "资产来源", "入库/生产日期",
                "库存数量", "可用数量", "单价/成本 (不含税)", "资产总额 (不含税)",
                "税率 (%)", "单价/成本 (含税)", "资产总额 (含税)"
            ]
            ws.append(headers)

            # Header text -> logical field name, for the columns that can be
            # masked; used to find each field's 0-based column index.
            header_fields = {
                "所属公司": 'companyName',
                "资产名称": 'name',
                "规格型号": 'spec',
                "物料类型": 'type',
                "计量单位": 'unit',
                "库存数量": 'inventoryCount',
                "可用数量": 'availableCount',
                "单价/成本 (不含税)": 'price_excl',
                "资产总额 (不含税)": 'total_val_excl',
                "税率 (%)": 'tax',
                "单价/成本 (含税)": 'price_incl',
                "资产总额 (含税)": 'total_val'
            }
            category_headers = ("类别一级", "类别二级", "类别三级", "类别四级", "类别五级")

            col_idx = {}
            for idx, header in enumerate(headers):
                if header in category_headers:
                    col_idx.setdefault('category_cols', []).append(idx)
                elif header in header_fields:
                    col_idx[header_fields[header]] = idx

            # Header styling.
            header_fill = PatternFill(start_color="D7E4BC", end_color="D7E4BC", fill_type="solid")
            border_style = Border(left=Side(style='thin'), right=Side(style='thin'),
                                  top=Side(style='thin'), bottom=Side(style='thin'))
            for cell in ws[1]:
                cell.font = Font(bold=True, name='微软雅黑')
                cell.alignment = Alignment(horizontal='center', vertical='center')
                cell.fill = header_fill
                cell.border = border_style

            # Field -> permission code required to see it.
            field_to_perm = {
                'companyName': 'material_list:companyName',
                'name': 'material_list:name',
                'spec': 'material_list:spec',
                'type': 'material_list:type',
                'unit': 'material_list:unit',
                'category': 'material_list:category',
                'inventoryCount': 'material_list:inventoryCount',
                'availableCount': 'material_list:availableCount'
            }

            # Stock kind -> permissions of the owning module, any one of
            # which grants access to that row's price/cost columns. Kinds not
            # listed here always have their prices hidden.
            price_perms_by_type = {
                '采购件': ['inbound_buy:postTaxUnitPrice', 'inbound_buy:preTaxUnitPrice',
                        'inbound_buy:totalAmount'],
                '半成品': ['inbound_semi:rawMaterialCost', 'inbound_semi:manualCost'],
                '成品': ['inbound_product:rawMaterialCost', 'inbound_product:manualCost']
            }
            price_columns = ['price_excl', 'total_val_excl', 'tax', 'price_incl', 'total_val']

            # Write the data rows, masking what the user may not see.
            for r in all_rows:
                base = r['base']
                # Split the '/'-separated category path into (at least) five
                # levels; only the first five are exported.
                cat_parts = (base.category or "").split('/')
                while len(cat_parts) < 5:
                    cat_parts.append("")

                # datetime.datetime is a datetime.date subclass, so both
                # format here; anything else becomes "".
                date_str = r['date'].strftime('%Y-%m-%d') if isinstance(r['date'], datetime.date) else ""

                row_val = [
                    base.company_name, base.name, base.spec_model, base.material_type,
                    cat_parts[0], cat_parts[1], cat_parts[2], cat_parts[3], cat_parts[4],
                    base.unit,
                    r['type_name'], r['ident'], r['loc'], r['source'], date_str,
                    r['qty'], r['avail'], r['price_excl'], r['total_val_excl'],
                    r['tax'], r['price_incl'], r['total_val']
                ]

                if user_permissions is not None:
                    # Per-field masking.
                    # NOTE(review): the 'material_list:*' wildcard is honored
                    # only by the price check below, not here - confirm
                    # whether wildcard holders should bypass this as well.
                    for field, perm_code in field_to_perm.items():
                        if perm_code in user_permissions:
                            continue
                        if field == 'category':
                            for cat_idx in col_idx.get('category_cols', []):
                                row_val[cat_idx] = ''
                        elif field in col_idx:
                            row_val[col_idx[field]] = ''

                    # Linked masking: price/cost columns follow the price
                    # permissions of the module the row came from. Holders of
                    # the wildcard permission keep the price columns.
                    if 'material_list:*' not in user_permissions:
                        required = price_perms_by_type.get(r['type_name'], ())
                        has_price_perm = any(p in user_permissions for p in required)
                        if not has_price_perm:
                            for p_col in price_columns:
                                if p_col in col_idx:
                                    row_val[col_idx[p_col]] = ''

                ws.append(row_val)

            # Column widths: longest non-empty cell text + 2, capped at 30.
            dims = {}
            for row in ws.rows:
                for cell in row:
                    if cell.value:
                        dims[cell.column_letter] = max(
                            dims.get(cell.column_letter, 0), len(str(cell.value))
                        )
            for col, value in dims.items():
                ws.column_dimensions[col].width = min(value + 2, 30)

            output = io.BytesIO()
            wb.save(output)
            output.seek(0)
            return output

        except Exception:
            traceback.print_exc()
            raise