# app/services/inbound/semi_service.py from app.extensions import db from app.models.base import MaterialBase from app.models.outbound import TransOutbound from datetime import datetime, timedelta, timezone from sqlalchemy import or_, func, text, and_ import traceback import json class SemiInboundService: @staticmethod def _check_unique(base_id, serial_number, batch_number, exclude_id=None): from app.models.inbound.semi import StockSemi if serial_number: query = StockSemi.query.filter(StockSemi.serial_number == serial_number) if exclude_id: query = query.filter(StockSemi.id != exclude_id) exists = query.first() if exists: occupied_name = exists.base.name if (hasattr(exists, 'base') and exists.base) else "未知物料" raise ValueError(f"序列号【{serial_number}】已存在!被半成品 [{occupied_name}] 占用,请核查。") if batch_number and base_id: query = StockSemi.query.filter( StockSemi.base_id == base_id, StockSemi.batch_number == batch_number ) if exclude_id: query = query.filter(StockSemi.id != exclude_id) if query.first(): raise ValueError(f"该物料已存在批号【{batch_number}】,请勿重复建单,建议在原批次上追加库存。") @staticmethod def search_base_material(keyword, page=1, limit=50): try: query = MaterialBase.query.filter(MaterialBase.is_enabled == True) if keyword: kw = f'%{keyword}%' query = query.filter( or_( MaterialBase.name.ilike(kw), MaterialBase.spec_model.ilike(kw), MaterialBase.company_name.ilike(kw) ) ) query = query.order_by(MaterialBase.id.desc()) pagination = query.paginate(page=page, per_page=limit, error_out=False) results = [] for item in pagination.items: results.append({ 'id': item.id, 'company_name': item.company_name, 'name': item.name, 'spec': item.spec_model, 'category': item.category, 'unit': item.unit, 'type': item.material_type, 'status': '启用' }) return {"items": results, "total": pagination.total, "page": page, "has_next": pagination.has_next} except Exception as e: traceback.print_exc() return {"items": [], "total": 0, "page": 1, "has_next": False} @staticmethod def search_bom_options(keyword): from app.models.bom import BomTable try: query = db.session.query( BomTable.bom_no, BomTable.version, MaterialBase.name.label('parent_name'), MaterialBase.spec_model.label('parent_spec') ).join(MaterialBase, BomTable.parent_id == MaterialBase.id) if hasattr(BomTable, 'is_enabled'): query = query.filter(BomTable.is_enabled == True) if keyword: kw = f'%{keyword}%' query = query.filter( or_( BomTable.bom_no.ilike(kw), MaterialBase.name.ilike(kw), MaterialBase.spec_model.ilike(kw) ) ) results = query.distinct().limit(20).all() return [{ 'bom_no': r.bom_no, 'version': r.version, 'parent_name': r.parent_name, 'parent_spec': r.parent_spec or '' } for r in results] except Exception: traceback.print_exc() return [] @staticmethod def handle_inbound(data): from app.models.inbound.semi import StockSemi try: base_id = data.get('base_id') if not base_id: raise ValueError("必须选择基础物料 (缺少 base_id)") material = MaterialBase.query.get(base_id) if not material: raise ValueError(f"ID为 {base_id} 的基础物料不存在") if not material.is_enabled: raise ValueError(f"物料【{material.name}】已停用,无法办理新入库。") SemiInboundService._check_unique( base_id=base_id, serial_number=data.get('serial_number'), batch_number=data.get('batch_number') ) beijing_tz = timezone(timedelta(hours=8)) current_time = datetime.now(beijing_tz).replace(tzinfo=None) in_date_val = current_time if data.get('in_date'): try: date_str = str(data['in_date']) if len(date_str) > 10: in_date_val = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S') else: d_temp = datetime.strptime(date_str, '%Y-%m-%d') in_date_val = datetime(d_temp.year, d_temp.month, d_temp.day, current_time.hour, current_time.minute, current_time.second) except ValueError: in_date_val = current_time p_start = None p_end = None if data.get('production_start_time'): try: p_start = datetime.strptime(str(data['production_start_time']), '%Y-%m-%d %H:%M:%S') except: pass if data.get('production_end_time'): try: p_end = datetime.strptime(str(data['production_end_time']), '%Y-%m-%d %H:%M:%S') except: pass time_range_str = None raw_range = data.get('production_time_range') if isinstance(raw_range, list): time_range_str = " ~ ".join([str(x) for x in raw_range]) elif isinstance(raw_range, str): time_range_str = raw_range in_qty = float(data.get('in_quantity') or 0) raw_cost = float(data.get('raw_material_cost') or 0) bom_code = data.get('bom_code') bom_version = data.get('bom_version') # 根据是否有BOM决定单价 if bom_code and bom_version: # 使用BOM成本作为单价 unit_cost = SemiInboundService.calculate_bom_cost(bom_code, bom_version) else: unit_cost = raw_cost total_value = unit_cost * in_qty next_global_id = 0 try: seq_sql = text("SELECT nextval('global_print_seq')") result = db.session.execute(seq_sql) next_global_id = result.scalar() except Exception as e: print("❌ 数据库序列 global_print_seq 不存在,请执行SQL创建!") raise e generated_sku = str(next_global_id).zfill(10) final_sku = data.get('sku') if not final_sku: final_sku = generated_sku final_barcode = data.get('barcode') if not final_barcode: final_barcode = final_sku arrival_list = data.get('arrival_photo', []) quality_report_list = data.get('quality_report_link', []) if not isinstance(arrival_list, list): arrival_list = [] if not isinstance(quality_report_list, list): quality_report_list = [] new_stock = StockSemi( base_id=material.id, global_print_id=next_global_id, sku=final_sku, production_date=in_date_val, serial_number=data.get('serial_number'), batch_number=data.get('batch_number'), barcode=final_barcode, status='在库', quality_status=data.get('quality_status', '合格'), in_quantity=in_qty, stock_quantity=in_qty, available_quantity=in_qty, warehouse_location=data.get('warehouse_location'), bom_code=data.get('bom_code'), bom_version=data.get('bom_version'), work_order_code=data.get('work_order_code'), production_manager=data.get('production_manager'), production_start_time=p_start, production_end_time=p_end, production_time_range=time_range_str, raw_material_cost=raw_cost, manual_cost=unit_cost, # 映射到 manual_cost 物理字段 total_price=total_value, arrival_photo=json.dumps(arrival_list), quality_report_link=json.dumps(quality_report_list), detail_link=data.get('detail_link'), remark=data.get('remark') ) db.session.add(new_stock) db.session.commit() return new_stock except Exception as e: db.session.rollback() print("----- SemiInboundService Error -----") traceback.print_exc() raise e @staticmethod def update_inbound(stock_id, data): from app.models.inbound.semi import StockSemi try: stock = StockSemi.query.get(stock_id) if not stock: raise ValueError("记录不存在") new_base_id = data.get('base_id', stock.base_id) new_sn = data.get('serial_number', stock.serial_number) new_bn = data.get('batch_number', stock.batch_number) SemiInboundService._check_unique( base_id=new_base_id, serial_number=new_sn, batch_number=new_bn, exclude_id=stock_id ) field_mapping = { 'sku': 'sku', 'barcode': 'barcode', 'warehouse_location': 'warehouse_location', 'serial_number': 'serial_number', 'batch_number': 'batch_number', 'status': 'status', 'quality_status': 'quality_status', 'bom_code': 'bom_code', 'bom_version': 'bom_version', 'work_order_code': 'work_order_code', 'production_manager': 'production_manager', 'detail_link': 'detail_link', 'remark': 'remark' } for frontend_key, db_attr in field_mapping.items(): if frontend_key in data: setattr(stock, db_attr, data[frontend_key]) if 'arrival_photo' in data: imgs = data['arrival_photo'] if isinstance(imgs, list): stock.arrival_photo = json.dumps(imgs) if 'quality_report_link' in data: imgs = data['quality_report_link'] if isinstance(imgs, list): stock.quality_report_link = json.dumps(imgs) if 'production_start_time' in data: try: if data['production_start_time']: stock.production_start_time = datetime.strptime(str(data['production_start_time']), '%Y-%m-%d %H:%M:%S') else: stock.production_start_time = None except: pass if 'production_end_time' in data: try: if data['production_end_time']: stock.production_end_time = datetime.strptime(str(data['production_end_time']), '%Y-%m-%d %H:%M:%S') else: stock.production_end_time = None except: pass if 'production_time_range' in data: raw_range = data['production_time_range'] if isinstance(raw_range, list): stock.production_time_range = " ~ ".join([str(x) for x in raw_range]) else: stock.production_time_range = raw_range qty_changed = False if 'in_quantity' in data: new_qty = float(data['in_quantity']) diff = new_qty - float(stock.in_quantity) if diff != 0: stock.in_quantity = new_qty stock.stock_quantity = float(stock.stock_quantity) + diff stock.available_quantity = float(stock.available_quantity) + diff qty_changed = True if 'raw_material_cost' in data: stock.raw_material_cost = float(data['raw_material_cost']) # 决定单件成本:有BOM用BOM成本,否则用raw_material_cost bom_code = data.get('bom_code', stock.bom_code) bom_version = data.get('bom_version', stock.bom_version) raw_cost = float(data.get('raw_material_cost', stock.raw_material_cost or 0)) if bom_code and bom_version: try: unit_cost = SemiInboundService.calculate_bom_cost(bom_code, bom_version) except Exception: unit_cost = raw_cost else: unit_cost = raw_cost stock.manual_cost = unit_cost if 'unit_total_cost' in data or qty_changed: qty = float(stock.in_quantity or 1) stock.total_price = unit_cost * qty db.session.commit() return stock except Exception as e: db.session.rollback() raise e @staticmethod def delete_inbound(stock_id): from app.models.inbound.semi import StockSemi try: stock = StockSemi.query.get(stock_id) if not stock: raise ValueError("记录不存在") db.session.delete(stock) db.session.commit() return True except Exception as e: db.session.rollback() raise e @staticmethod def get_outbound_history(stock_id): try: records = TransOutbound.query.filter_by( source_table='stock_semi', stock_id=stock_id ).order_by(TransOutbound.outbound_time.desc()).all() return [r.to_dict() for r in records] except: return [] @staticmethod def get_list(page, limit, keyword=None, statuses=None, category=None, material_type=None, company=None, order_by_column=None, is_asc=None, advanced_filters=None): from app.models.inbound.semi import StockSemi try: query = db.session.query(StockSemi).outerjoin(MaterialBase, StockSemi.base_id == MaterialBase.id) if keyword: kw = f'%{keyword}%' query = query.filter( or_( MaterialBase.name.ilike(kw), MaterialBase.spec_model.ilike(kw), MaterialBase.company_name.ilike(kw), StockSemi.batch_number.ilike(kw), StockSemi.serial_number.ilike(kw), StockSemi.sku.ilike(kw), StockSemi.work_order_code.ilike(kw), StockSemi.bom_code.ilike(kw) ) ) if category and category.strip(): query = query.filter(MaterialBase.category == category.strip()) if material_type and material_type.strip(): query = query.filter(MaterialBase.material_type == material_type.strip()) if company and company.strip(): query = query.filter(MaterialBase.company_name == company.strip()) if not statuses: statuses = ['在库', '借库'] if '已出库' in statuses: query = query.filter(StockSemi.status.in_(statuses)) else: query = query.filter( and_( StockSemi.status.in_(statuses), StockSemi.stock_quantity > 0 ) ) # 动态高级筛选 if advanced_filters: if isinstance(advanced_filters, str): try: import json advanced_filters = json.loads(advanced_filters) except: advanced_filters = [] if isinstance(advanced_filters, list): field_mapping = { 'company_name': MaterialBase.company_name, 'material_name': MaterialBase.name, 'spec_model': MaterialBase.spec_model, 'category': MaterialBase.category, 'material_type': MaterialBase.material_type, 'unit': MaterialBase.unit, 'id': StockSemi.id, 'base_id': StockSemi.base_id, 'sku': StockSemi.sku, 'inbound_date': StockSemi.production_date, 'barcode': StockSemi.barcode, 'serial_number': StockSemi.serial_number, 'batch_number': StockSemi.batch_number, 'status': StockSemi.status, 'quality_status': StockSemi.quality_status, 'qty_inbound': StockSemi.in_quantity, 'qty_stock': StockSemi.stock_quantity, 'qty_available': StockSemi.available_quantity, 'warehouse_location': StockSemi.warehouse_location, 'bom_code': StockSemi.bom_code, 'bom_version': StockSemi.bom_version, 'work_order_code': StockSemi.work_order_code, 'raw_material_cost': StockSemi.raw_material_cost, 'unit_total_cost': StockSemi.manual_cost, 'total_price': StockSemi.total_price, 'production_manager': StockSemi.production_manager, 'production_start_time': StockSemi.production_start_time, 'production_end_time': StockSemi.production_end_time, } for cond in advanced_filters: field = cond.get('field') operator = cond.get('operator') value = cond.get('value') if not field or not operator: continue model_field = field_mapping.get(field) if model_field is None: continue # 防止 SQL 注入,只允许映射的字段 if operator == '=': query = query.filter(model_field == value) elif operator == '!=': query = query.filter(model_field != value) elif operator == 'like': query = query.filter(model_field.ilike(f'%{value}%')) elif operator == 'not_like': query = query.filter(~model_field.ilike(f'%{value}%')) elif operator == '>': if value.replace('.', '', 1).isdigit(): query = query.filter(model_field > float(value)) elif operator == '<': if value.replace('.', '', 1).isdigit(): query = query.filter(model_field < float(value)) elif operator == '>=': if value.replace('.', '', 1).isdigit(): query = query.filter(model_field >= float(value)) elif operator == '<=': if value.replace('.', '', 1).isdigit(): query = query.filter(model_field <= float(value)) # 动态排序 order_field = None if order_by_column and is_asc is not None: order_mapping = { 'id': StockSemi.id, 'base_id': StockSemi.base_id, 'company_name': MaterialBase.company_name, 'material_name': MaterialBase.name, 'category': MaterialBase.category, 'material_type': MaterialBase.material_type, 'spec_model': MaterialBase.spec_model, 'unit': MaterialBase.unit, 'sku': StockSemi.sku, 'inbound_date': StockSemi.production_date, 'barcode': StockSemi.barcode, 'serial_number': StockSemi.serial_number, 'batch_number': StockSemi.batch_number, 'status': StockSemi.status, 'quality_status': StockSemi.quality_status, 'qty_inbound': StockSemi.in_quantity, 'qty_stock': StockSemi.stock_quantity, 'qty_available': StockSemi.available_quantity, 'warehouse_loc': StockSemi.warehouse_location, 'bom_code': StockSemi.bom_code, 'bom_version': StockSemi.bom_version, 'work_order_code': StockSemi.work_order_code, 'raw_material_cost': StockSemi.raw_material_cost, 'unit_total_cost': StockSemi.manual_cost, 'total_price': StockSemi.total_price, 'production_manager': StockSemi.production_manager, 'production_start_time': StockSemi.production_start_time, 'production_end_time': StockSemi.production_end_time, } order_field = order_mapping.get(order_by_column) if order_field is not None: if is_asc == 'true' or is_asc == True: query = query.order_by(order_field.asc()) else: query = query.order_by(order_field.desc()) if order_field is None: query = query.order_by(StockSemi.production_date.desc()) pagination = query.paginate(page=page, per_page=limit, error_out=False) items = [] for item in pagination.items: item_dict = item.to_dict() item_dict['unit_total_cost'] = float(item.manual_cost or 0) items.append(item_dict) return {"total": pagination.total, "items": items} except Exception as e: print(f"List Error: {e}") traceback.print_exc() return {"total": 0, "items": []} @staticmethod def search_system_users(keyword): from app.models.system import SysUser try: query = SysUser.query.filter(SysUser.status == 'active') if keyword: kw = f'%{keyword}%' query = query.filter(db.or_( SysUser.username.ilike(kw), SysUser.email.ilike(kw) )) query = query.order_by(SysUser.username) users = [] for u in query.limit(20).all(): users.append({ 'value': u.username, 'email': u.email }) return users except Exception: return [] @staticmethod def get_filter_options(): try: from app.models.base import MaterialBase categories = db.session.query(MaterialBase.category).filter(MaterialBase.category != None, MaterialBase.category != '').distinct().all() sorted_categories = sorted([r[0] for r in categories]) types = db.session.query(MaterialBase.material_type).filter(MaterialBase.material_type != None, MaterialBase.material_type != '').distinct().all() sorted_types = sorted([r[0] for r in types]) companies = db.session.query(MaterialBase.company_name).filter(MaterialBase.company_name != None, MaterialBase.company_name != '').distinct().all() sorted_companies = sorted([r[0] for r in companies]) return { "categories": sorted_categories, "types": sorted_types, "companies": sorted_companies } except Exception: traceback.print_exc() return {"categories": [], "types": [], "companies": []} @staticmethod def get_history_managers(keyword=None): from app.models.inbound.semi import StockSemi try: query = db.session.query(StockSemi.production_manager).filter( StockSemi.production_manager.isnot(None), StockSemi.production_manager != '' ) if keyword: query = query.filter(StockSemi.production_manager.ilike(f'%{keyword}%')) records = query.distinct().all() return [r[0] for r in records if r[0]] except Exception: traceback.print_exc() return [] @staticmethod def calculate_bom_cost(bom_no, bom_version): """ 根据 BOM 编号和版本计算原材料总成本 遍历 BOM 子件,使用原生 SQL 查物理表 bom_table,取每个子件在采购、半成品、成品三个表中的最高单价,乘以用量后累加 """ from app.models.inbound.buy import StockBuy from app.models.inbound.semi import StockSemi from app.models.inbound.product import StockProduct from sqlalchemy import func, text try: # 使用原生 SQL 精准查询 bom_table,避免模型映射错误 sql = text(""" SELECT child_id, dosage FROM bom_table WHERE bom_no = :bom_no AND version = :version """) bom_lines = db.session.execute(sql, {'bom_no': bom_no, 'version': bom_version}).fetchall() total_cost = 0.0 for line in bom_lines: component_base_id = line[0] # child_id usage_qty = float(line[1] or 1.0) # dosage # 1. 查采购表最高价 (不含税) buy_price = db.session.query(func.max(StockBuy.pre_tax_unit_price)).filter( StockBuy.base_id == component_base_id ).scalar() or 0.0 # 2. 查半成品表最高价 (单件成本映射存在 manual_cost 里了) semi_price = db.session.query(func.max(StockSemi.manual_cost)).filter( StockSemi.base_id == component_base_id ).scalar() or 0.0 # 3. 查成品表最高价 (同样存储在 manual_cost 字段里) product_price = db.session.query(func.max(StockProduct.manual_cost)).filter( StockProduct.base_id == component_base_id ).scalar() or 0.0 # 4. 取三个表中的最大值,乘以用量 (dosage) max_price = max(float(buy_price), float(semi_price), float(product_price)) total_cost += max_price * usage_qty return round(total_cost, 2) except Exception as e: traceback.print_exc() raise e