# inventory-backend/app/services/inbound/buy_service.py
import json
import traceback
from datetime import datetime, timedelta, timezone

from sqlalchemy import or_, func, text, and_

from app.extensions import db
from app.models.inbound.buy import StockBuy
from app.models.base import MaterialBase


class BuyInboundService:
    """Service functions for purchase-inbound stock records (``StockBuy``).

    Every method is a ``@staticmethod`` operating on ``db.session``.
    Write operations (create / update / delete) commit on success and roll
    back and re-raise on failure.  The paginated read operations
    (``search_base_material``, ``get_list``, ``get_filter_options``) print
    the traceback and return an empty payload on failure instead of raising.
    """

    # ============================================================
    # 0. Helpers
    # ============================================================
    @staticmethod
    def _check_unique(base_id, serial_number, batch_number, exclude_id=None):
        """Reject duplicate serial numbers and duplicate batches.

        Args:
            base_id: Id of the base material the record belongs to.
            serial_number: Serial number to check; skipped when falsy.
                Checked against every ``StockBuy`` row regardless of material.
            batch_number: Batch number to check; skipped when falsy or when
                ``base_id`` is falsy.  Checked only within the same material.
            exclude_id: Id of a ``StockBuy`` row to ignore (the row being
                edited); no exclusion when falsy.

        Raises:
            ValueError: When a conflicting row exists.
        """
        if serial_number:
            query = StockBuy.query.filter(StockBuy.serial_number == serial_number)
            if exclude_id:
                query = query.filter(StockBuy.id != exclude_id)
            existing = query.first()
            if existing:
                occupied_name = existing.base.name if existing.base else "未知物料"
                raise ValueError(f"序列号【{serial_number}】已存在!被物料 [{occupied_name}] 占用,请核查。")

        if batch_number and base_id:
            query = StockBuy.query.filter(
                StockBuy.base_id == base_id,
                StockBuy.batch_number == batch_number,
            )
            if exclude_id:
                query = query.filter(StockBuy.id != exclude_id)
            if query.first():
                raise ValueError(f"该物料已存在批号【{batch_number}】,请勿重复录入,可直接在该批次下追加库存。")

    @staticmethod
    def _parse_in_date(raw, now):
        """Turn the submitted inbound date into a naive ``datetime``.

        Values longer than 10 characters are parsed as
        ``YYYY-MM-DD HH:MM:SS``; shorter ones as ``YYYY-MM-DD`` combined with
        the hour/minute/second of ``now``.  A missing or unparsable value
        falls back to ``now`` (best effort, never raises for bad dates).
        """
        if not raw:
            return now
        date_str = str(raw)
        try:
            if len(date_str) > 10:
                return datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
            day = datetime.strptime(date_str, '%Y-%m-%d')
        except ValueError:
            return now
        return datetime(day.year, day.month, day.day, now.hour, now.minute, now.second)

    @staticmethod
    def _next_global_print_id():
        """Fetch the next value of the ``global_print_seq`` sequence.

        Returns ``None`` when the query fails.  The session is rolled back in
        that case: on PostgreSQL a failed statement leaves the transaction
        aborted, and without the rollback the later INSERT could not succeed,
        which would defeat the timestamp-SKU fallback in ``handle_inbound``.
        """
        try:
            return db.session.execute(text("SELECT nextval('global_print_seq')")).scalar()
        except Exception:
            db.session.rollback()
            return None

    @staticmethod
    def _advanced_conditions(advanced_filters):
        """Translate advanced-filter dicts into SQLAlchemy conditions.

        Each entry is expected to be a dict with ``field``, ``operator`` and
        ``value`` keys.  Entries that are not dicts, miss a key, name a field
        outside the whitelist, use an unknown operator, or carry a
        non-numeric value for ``ge`` / ``le`` are skipped.
        """
        allowed_fields = {
            'company_name': MaterialBase.company_name,
            'material_name': MaterialBase.name,
            'material_type': MaterialBase.material_type,
            'category': MaterialBase.category,
            'spec_model': MaterialBase.spec_model,
            'unit': MaterialBase.unit,
            'sku': StockBuy.sku,
            'barcode': StockBuy.barcode,
            'batch_number': StockBuy.batch_number,
            'serial_number': StockBuy.serial_number,
            'warehouse_location': StockBuy.warehouse_location,
            'status': StockBuy.status,
            'inspection_status': StockBuy.inspection_status,
            'qty_inbound': StockBuy.in_quantity,
            'qty_stock': StockBuy.stock_quantity,
            'qty_available': StockBuy.available_quantity,
            'unit_price': StockBuy.pre_tax_unit_price,
            'total_price': StockBuy.total_price,
            'tax_rate': StockBuy.tax_rate,
            'currency': StockBuy.currency,
            'exchange_rate': StockBuy.exchange_rate,
            'supplier_name': StockBuy.supplier_name,
            'purchaser': StockBuy.buyer_name,
            'purchaser_email': StockBuy.buyer_email,
            'source_link': StockBuy.original_link,
            'detail_link': StockBuy.detail_link,
        }

        conditions = []
        for condition in advanced_filters:
            if not isinstance(condition, dict):
                continue
            field = condition.get('field')
            op = condition.get('operator')
            value = condition.get('value')
            if not field or not op or value is None:
                continue
            column = allowed_fields.get(field)
            if column is None:
                continue

            if op == 'eq':
                conditions.append(column == value)
            elif op == 'ne':
                conditions.append(column != value)
            elif op == 'contains':
                conditions.append(column.ilike(f'%{value}%'))
            elif op in ('ge', 'le'):
                try:
                    num_val = float(value)
                except (TypeError, ValueError):
                    continue
                conditions.append(column >= num_val if op == 'ge' else column <= num_val)
        return conditions

    @staticmethod
    def _apply_ordering(query, order_by, is_asc):
        """Apply the requested sort, or newest-inbound-first by default.

        The default is also used when ``order_by`` is not a known key or
        ``is_asc`` is neither ``'asc'`` nor ``'desc'``, so the result is never
        left unordered.  ``StockBuy.id`` is appended as a tie-breaker to keep
        page boundaries stable when many rows share the sort value.
        """
        sort_field_map = {
            'company_name': MaterialBase.company_name,
            'material_name': MaterialBase.name,
            'material_type': MaterialBase.material_type,
            'category': MaterialBase.category,
            'spec_model': MaterialBase.spec_model,
            'unit': MaterialBase.unit,
            'sku': StockBuy.sku,
            'barcode': StockBuy.barcode,
            'inbound_date': StockBuy.in_date,
            'serial_number': StockBuy.serial_number,
            'batch_number': StockBuy.batch_number,
            'status': StockBuy.status,
            'inspection_status': StockBuy.inspection_status,
            'qty_inbound': StockBuy.in_quantity,
            'qty_stock': StockBuy.stock_quantity,
            'qty_available': StockBuy.available_quantity,
            'warehouse_loc': StockBuy.warehouse_location,
            'unit_price': StockBuy.pre_tax_unit_price,
            'total_price': StockBuy.total_price,
            'tax_rate': StockBuy.tax_rate,
            'currency': StockBuy.currency,
            'exchange_rate': StockBuy.exchange_rate,
            'supplier_name': StockBuy.supplier_name,
            'purchaser': StockBuy.buyer_name,
            'purchaser_email': StockBuy.buyer_email,
            'source_link': StockBuy.original_link,
            'detail_link': StockBuy.detail_link,
        }

        column = sort_field_map.get(order_by) if order_by else None
        # Compare with None explicitly: the truth value of a SQL expression
        # object is not a reliable "was it found" signal.
        if column is not None and is_asc == 'asc':
            primary = column.asc()
        elif column is not None and is_asc == 'desc':
            primary = column.desc()
        else:
            primary = StockBuy.in_date.desc()
        return query.order_by(primary, StockBuy.id.desc())

    @staticmethod
    def _distinct_sorted(column):
        """Return the sorted distinct non-NULL, non-empty values of ``column``."""
        rows = db.session.query(column) \
            .filter(column.isnot(None), column != '') \
            .distinct().all()
        return sorted(row[0] for row in rows)

    # ============================================================
    # 1. Base material search
    # ============================================================
    @staticmethod
    def search_base_material(keyword, page=1, limit=50):
        """Search enabled base materials by name, spec/model or company.

        Args:
            keyword: Case-insensitive substring; no keyword filter is applied
                when it is empty or whitespace only.
            page: 1-based page number.
            limit: Page size.

        Returns:
            dict with ``items``, ``total``, ``page`` and ``has_next``.  On any
            exception the traceback is printed and an empty first page is
            returned.
        """
        try:
            query = MaterialBase.query.filter(MaterialBase.is_enabled == True)  # noqa: E712

            needle = keyword.strip() if keyword else ''
            if needle:
                pattern = f'%{needle}%'
                query = query.filter(or_(
                    MaterialBase.name.ilike(pattern),
                    MaterialBase.spec_model.ilike(pattern),
                    MaterialBase.company_name.ilike(pattern),  # company is searchable too
                ))

            query = query.order_by(MaterialBase.id.desc())
            pagination = query.paginate(page=page, per_page=limit, error_out=False)

            items = [
                {
                    'id': item.id,
                    'company_name': item.company_name,
                    'name': item.name,
                    'spec': item.spec_model,
                    'category': item.category,
                    'unit': item.unit,
                    'type': item.material_type,
                    'brand': getattr(item, 'brand', ''),
                    'manufacturer': getattr(item, 'manufacturer', ''),
                    'pinyin': getattr(item, 'pinyin', ''),
                    'status': '启用',
                }
                for item in pagination.items
            ]

            return {
                "items": items,
                "total": pagination.total,
                "page": page,
                "has_next": pagination.has_next,
            }
        except Exception:
            traceback.print_exc()
            return {"items": [], "total": 0, "page": 1, "has_next": False}

    # ============================================================
    # 2. Create inbound record
    # ============================================================
    @staticmethod
    def handle_inbound(data):
        """Create a new inbound stock record from a request payload.

        Args:
            data: Mapping with ``base_id`` (required) and optional fields such
                as ``serial_number``, ``batch_number``, ``in_date``,
                ``in_quantity``, ``unit_price``, ``tax_rate``,
                ``post_tax_unit_price``, ``barcode`` and supplier/purchaser
                details.

        Returns:
            The committed ``StockBuy`` instance.

        Raises:
            ValueError: Missing/unknown/disabled material, a uniqueness
                conflict, or a non-numeric quantity/price/tax value.
            Exception: Any database error; the session is rolled back first.
        """
        try:
            base_id = data.get('base_id')
            if not base_id:
                raise ValueError("必须选择基础物料")

            material = MaterialBase.query.get(base_id)
            if not material:
                raise ValueError("所选物料不存在")
            if not material.is_enabled:
                raise ValueError(f"物料【{material.name}】已停用")
            # Read the id now: a rollback in the sequence helper would expire
            # the loaded instance.
            material_id = material.id

            BuyInboundService._check_unique(
                base_id=base_id,
                serial_number=data.get('serial_number'),
                batch_number=data.get('batch_number'),
            )

            # Naive timestamp expressed in UTC+8 wall-clock time.
            beijing_tz = timezone(timedelta(hours=8))
            current_time = datetime.now(beijing_tz).replace(tzinfo=None)
            in_date_val = BuyInboundService._parse_in_date(data.get('in_date'), current_time)

            in_qty = float(data.get('in_quantity') or 0)
            u_price = float(data.get('unit_price') or 0)
            tax_rate = float(data.get('tax_rate') or 0)

            # Derive the post-tax unit price when the client did not send one.
            post_tax_price = float(data.get('post_tax_unit_price') or 0)
            if post_tax_price == 0 and u_price > 0:
                post_tax_price = u_price * (1 + (tax_rate / 100))

            next_global_id = BuyInboundService._next_global_print_id()

            # Zero-padded sequence value, or a timestamp when no sequence
            # value could be obtained.
            generated_sku = str(next_global_id).zfill(10) if next_global_id else datetime.now().strftime('%Y%m%d%H%M%S')
            final_barcode = data.get('barcode') or generated_sku

            new_stock = StockBuy(
                base_id=material_id,
                global_print_id=next_global_id,
                sku=generated_sku,
                barcode=final_barcode,
                in_date=in_date_val,
                serial_number=data.get('serial_number'),
                batch_number=data.get('batch_number'),
                status=data.get('status', '在库'),
                in_quantity=in_qty,
                stock_quantity=in_qty,
                available_quantity=in_qty,
                inspection_status=data.get('inspection_status', '未检'),
                warehouse_location=data.get('warehouse_location'),
                # Pricing
                pre_tax_unit_price=u_price,
                post_tax_unit_price=post_tax_price,
                tax_rate=tax_rate,
                total_price=in_qty * u_price,
                currency=data.get('currency', 'CNY'),
                exchange_rate=data.get('exchange_rate', 1.0),
                supplier_name=data.get('supplier_name'),
                buyer_name=data.get('purchaser'),
                buyer_email=data.get('purchaser_email'),
                original_link=data.get('source_link'),
                detail_link=data.get('detail_link'),
                arrival_photo=json.dumps(data.get('arrival_photo', [])),
                inspection_report=json.dumps(data.get('inspection_report', [])),
            )

            db.session.add(new_stock)
            db.session.commit()
            return new_stock
        except Exception:
            db.session.rollback()
            raise

    # ============================================================
    # 3. Update inbound record
    # ============================================================
    @staticmethod
    def update_inbound(stock_id, data):
        """Apply a partial update to an existing inbound record.

        Only keys present in ``data`` are written.  Changing ``in_quantity``
        shifts ``stock_quantity`` and ``available_quantity`` by the same
        delta.  The post-tax unit price is recomputed from the pre-tax price
        and tax rate when either changed and no explicit post-tax price was
        sent.  The total price is recomputed as quantity x pre-tax unit price.

        Args:
            stock_id: Primary key of the ``StockBuy`` row.
            data: Mapping of request fields to new values.

        Returns:
            The updated ``StockBuy`` instance.

        Raises:
            ValueError: Unknown record, uniqueness conflict, or a non-numeric
                value for a numeric field.
            Exception: Any database error; the session is rolled back first.
        """
        try:
            stock = StockBuy.query.get(stock_id)
            if not stock:
                raise ValueError("记录不存在")

            BuyInboundService._check_unique(
                base_id=data.get('base_id', stock.base_id),
                serial_number=data.get('serial_number', stock.serial_number),
                batch_number=data.get('batch_number', stock.batch_number),
                exclude_id=stock_id,
            )

            # request key -> model attribute
            field_mapping = {
                'sku': 'sku',
                'barcode': 'barcode',
                'base_id': 'base_id',
                'warehouse_location': 'warehouse_location',
                'serial_number': 'serial_number',
                'batch_number': 'batch_number',
                'status': 'status',
                'inspection_status': 'inspection_status',
                'supplier_name': 'supplier_name',
                'detail_link': 'detail_link',
                'currency': 'currency',
                'exchange_rate': 'exchange_rate',
                'purchaser': 'buyer_name',
                'purchaser_email': 'buyer_email',
                'source_link': 'original_link',
            }
            for key, attr in field_mapping.items():
                if key in data:
                    setattr(stock, attr, data[key])

            if 'arrival_photo' in data:
                stock.arrival_photo = json.dumps(data['arrival_photo'])
            if 'inspection_report' in data:
                stock.inspection_report = json.dumps(data['inspection_report'])

            # Tax rate
            if 'tax_rate' in data:
                stock.tax_rate = float(data['tax_rate'])

            # Pre-tax unit price
            if 'unit_price' in data:
                stock.pre_tax_unit_price = float(data['unit_price'])

            # Post-tax unit price: explicit value wins, otherwise derive it
            # when one of its inputs changed.  Stored values may be NULL, so
            # they are coerced to 0 before the arithmetic.
            if 'post_tax_unit_price' in data:
                stock.post_tax_unit_price = float(data['post_tax_unit_price'])
            elif 'unit_price' in data or 'tax_rate' in data:
                tax_multiplier = 1 + (float(stock.tax_rate or 0) / 100)
                stock.post_tax_unit_price = float(stock.pre_tax_unit_price or 0) * tax_multiplier

            if 'in_quantity' in data:
                new_qty = float(data['in_quantity'])
                diff = new_qty - float(stock.in_quantity or 0)
                if diff != 0:
                    stock.in_quantity = new_qty
                    stock.stock_quantity = float(stock.stock_quantity or 0) + diff
                    stock.available_quantity = float(stock.available_quantity or 0) + diff

            # Recompute the total price from the (possibly updated) inputs.
            stock.total_price = float(stock.in_quantity or 0) * float(stock.pre_tax_unit_price or 0)

            db.session.commit()
            return stock
        except Exception:
            db.session.rollback()
            raise

    # ============================================================
    # 4. Delete
    # ============================================================
    @staticmethod
    def delete_inbound(stock_id):
        """Delete an inbound record.

        Returns:
            ``True`` after the delete is committed.

        Raises:
            ValueError: When no record has the given id.
            Exception: Any database error; the session is rolled back first.
        """
        try:
            stock = StockBuy.query.get(stock_id)
            if not stock:
                raise ValueError("记录不存在")
            db.session.delete(stock)
            db.session.commit()
            return True
        except Exception:
            db.session.rollback()
            raise

    # ============================================================
    # 5. List (sorting and advanced filtering supported)
    # ============================================================
    @staticmethod
    def get_list(page, limit, keyword=None, statuses=None, category=None, material_type=None,
                 company=None, order_by='', is_asc='', advanced_filters=None):
        """Return one page of inbound records joined with their base material.

        Args:
            page: 1-based page number.
            limit: Page size.
            keyword: Case-insensitive substring matched against SKU, barcode,
                batch/serial number, supplier, purchaser, and the material's
                name, spec/model and company; ignored when blank.
            statuses: Status values to include; defaults to
                ``['在库', '借库']``.  Unless ``'已出库'`` is among them, rows
                with ``stock_quantity <= 0`` are excluded.
            category: Exact material category filter.
            material_type: Exact material type filter.
            company: Exact company name filter.
            order_by: Sort key (see ``_apply_ordering``).
            is_asc: ``'asc'`` or ``'desc'``.
            advanced_filters: List of ``{'field', 'operator', 'value'}`` dicts
                combined with AND (see ``_advanced_conditions``).

        Returns:
            dict with ``total`` and ``items`` (each item is the model's
            ``to_dict()``).  On any exception the traceback is printed and an
            empty result is returned.
        """
        try:
            query = db.session.query(StockBuy).outerjoin(MaterialBase, StockBuy.base_id == MaterialBase.id)

            # 1. Generic keyword search
            needle = keyword.strip() if keyword else ''
            if needle:
                pattern = f'%{needle}%'
                query = query.filter(or_(
                    StockBuy.sku.ilike(pattern),
                    StockBuy.barcode.ilike(pattern),
                    StockBuy.batch_number.ilike(pattern),
                    StockBuy.serial_number.ilike(pattern),
                    StockBuy.supplier_name.ilike(pattern),
                    StockBuy.buyer_name.ilike(pattern),
                    MaterialBase.name.ilike(pattern),
                    MaterialBase.spec_model.ilike(pattern),
                    MaterialBase.company_name.ilike(pattern),  # keyword also matches company
                ))

            # 2. Category filter
            if category and category.strip():
                query = query.filter(MaterialBase.category == category.strip())

            # 3. Material type filter
            if material_type and material_type.strip():
                query = query.filter(MaterialBase.material_type == material_type.strip())

            # 3.1 Company filter
            if company and company.strip():
                query = query.filter(MaterialBase.company_name == company.strip())

            # 4. Status filter
            if not statuses:
                statuses = ['在库', '借库']

            if '已出库' in statuses:
                query = query.filter(StockBuy.status.in_(statuses))
            else:
                query = query.filter(and_(StockBuy.status.in_(statuses), StockBuy.stock_quantity > 0))

            # 5. Advanced dynamic filters
            if advanced_filters:
                filter_conditions = BuyInboundService._advanced_conditions(advanced_filters)
                if filter_conditions:
                    query = query.filter(and_(*filter_conditions))

            # 6. Ordering
            query = BuyInboundService._apply_ordering(query, order_by, is_asc)

            pagination = query.paginate(page=page, per_page=limit, error_out=False)
            items = [item.to_dict() for item in pagination.items]

            return {"total": pagination.total, "items": items}
        except Exception:
            traceback.print_exc()
            return {"total": 0, "items": []}

    # ============================================================
    # 6. Filter options (categories, types, companies), sorted
    # ============================================================
    @staticmethod
    def get_filter_options():
        """Return the distinct categories, material types and companies.

        Returns:
            dict with sorted ``categories``, ``types`` and ``companies``
            lists.  On any exception the traceback is printed and all three
            lists are empty.
        """
        try:
            return {
                "categories": BuyInboundService._distinct_sorted(MaterialBase.category),
                "types": BuyInboundService._distinct_sorted(MaterialBase.material_type),
                "companies": BuyInboundService._distinct_sorted(MaterialBase.company_name),
            }
        except Exception:
            traceback.print_exc()
            return {"categories": [], "types": [], "companies": []}

    # ============================================================
    # 7-10. History-based suggestion helpers
    # ============================================================
    @staticmethod
    def get_history_suppliers(base_id):
        """Return the distinct non-empty supplier names used for a material."""
        rows = db.session.query(StockBuy.supplier_name).filter(
            StockBuy.base_id == base_id,
            StockBuy.supplier_name != '',
        ).distinct().all()
        return [row[0] for row in rows]

    @staticmethod
    def get_history_purchasers(keyword):
        """Return distinct purchasers as ``{'value': name, 'email': email}``.

        NOTE(review): ``keyword`` is accepted but not used for filtering; all
        purchasers with a non-empty name are returned.  Left as is because a
        caller may already filter this full list itself -- confirm before
        adding a server-side filter.
        """
        rows = db.session.query(StockBuy.buyer_name, StockBuy.buyer_email).filter(
            StockBuy.buyer_name != '',
        ).distinct().all()
        return [{'value': row.buyer_name, 'email': row.buyer_email} for row in rows]

    @staticmethod
    def get_history_links(base_id, type):
        """Return the distinct non-empty links recorded for a material.

        ``type == 'original'`` selects ``original_link``; any other value
        selects ``detail_link``.  (The parameter name shadows the builtin but
        is kept because it is part of the call signature.)
        """
        column = StockBuy.original_link if type == 'original' else StockBuy.detail_link
        # `!= ''` also drops NULLs in SQL, so no empty suggestion is returned.
        rows = db.session.query(column).filter(
            StockBuy.base_id == base_id,
            column != '',
        ).distinct().all()
        return [row[0] for row in rows]

    @staticmethod
    def get_history_locations(base_id):
        """Return the distinct non-empty warehouse locations used for a material."""
        # `!= ''` also drops NULLs in SQL, so no empty suggestion is returned.
        rows = db.session.query(StockBuy.warehouse_location).filter(
            StockBuy.base_id == base_id,
            StockBuy.warehouse_location != '',
        ).distinct().all()
        return [row[0] for row in rows]