# inventory-backend/app/services/inbound/service_service.py
import re
import traceback
from datetime import datetime, timedelta

from app.extensions import db
from app.models.inbound.service import StockService
from app.models.base import MaterialBase


class ServiceService:
    """Business logic for service-entitlement stock records (``StockService``).

    All methods are classmethods operating on the shared ``db.session``.
    """

    SKU_PREFIX = 'SRV'
    SKU_DATE_FORMAT = '%Y%m%d'
    SKU_SUFFIX_LEN = 4

    @staticmethod
    def _commit():
        """Commit the current session, rolling back before re-raising on failure."""
        try:
            db.session.commit()
        except Exception:
            # Leave the session usable for whoever handles the error.
            db.session.rollback()
            raise

    @classmethod
    def _generate_sku(cls):
        """Build the next SKU for today in the form ``SRV-YYYYMMDD-XXXX``.

        Looks up the largest existing SKU carrying today's prefix and
        increments its trailing number; starts at 1 when none exists.

        NOTE(review): the lookup and the later insert are not atomic, so two
        concurrent creations may compute the same SKU. The MAX() comparison is
        also lexicographic, so it stops being reliable once a suffix outgrows
        ``SKU_SUFFIX_LEN`` digits. Confirm whether a unique constraint/retry
        exists elsewhere.
        """
        today_str = datetime.now().strftime(cls.SKU_DATE_FORMAT)
        prefix = f'{cls.SKU_PREFIX}-{today_str}-'

        # Largest SKU already issued today (string comparison).
        max_sku = db.session.query(db.func.max(StockService.sku)).filter(
            StockService.sku.like(f'{prefix}%')
        ).scalar()

        last_num = 0
        if max_sku:
            # Trailing digits after the prefix; anything unparsable counts as 0.
            match = re.search(r'(\d+)$', max_sku.replace(prefix, ''))
            if match:
                last_num = int(match.group(1))

        # Left-pad with zeros to the configured width.
        suffix = str(last_num + 1).zfill(cls.SKU_SUFFIX_LEN)
        return f'{prefix}{suffix}'

    @classmethod
    def search_base_material(cls, keyword):
        """Search enabled base materials for a remote-select widget.

        Args:
            keyword: Optional text matched case-insensitively against the
                material name and spec model. Falsy means "no filter".

        Returns:
            A list of at most 20 dicts (newest id first) with the keys
            ``id``, ``name``, ``spec``, ``category``, ``unit`` and ``type``.
            Returns ``[]`` if the query fails (the traceback is printed).
        """
        try:
            # Only enabled materials are offered.
            query = MaterialBase.query.filter(MaterialBase.is_enabled == True)  # noqa: E712
            if keyword:
                pattern = f'%{keyword}%'
                query = query.filter(
                    db.or_(
                        MaterialBase.name.ilike(pattern),
                        MaterialBase.spec_model.ilike(pattern),
                    )
                )
            query = query.order_by(MaterialBase.id.desc()).limit(20)

            return [
                {
                    'id': item.id,
                    'name': item.name,
                    'spec': item.spec_model,
                    'category': item.category,
                    'unit': item.unit,
                    'type': item.material_type,
                }
                for item in query.all()
            ]
        except Exception:
            traceback.print_exc()
            return []

    @classmethod
    def create_service(cls, data):
        """Create a service-entitlement record.

        Args:
            data: Mapping with ``base_id`` plus optional ``sale_price``,
                ``provider_name``, ``description``, ``service_category``,
                ``contract_id``, ``contact_person``, ``valid_period`` and
                ``cost_price``.

        Returns:
            The newly persisted ``StockService`` instance.

        Raises:
            ValueError: If the base material does not exist or is disabled.
        """
        # 1. Validate the base material.
        base_id = data.get('base_id')
        base = MaterialBase.query.get(base_id)
        if not base:
            raise ValueError('基础物料不存在')
        if not base.is_enabled:
            raise ValueError(f"物料【{base.name}】已停用,无法创建新的服务权益。")

        # 2. Generate the SKU.
        sku = cls._generate_sku()

        # 3. Build the record (no stock-quantity fields are set here).
        service = StockService(
            base_id=base_id,
            sku=sku,
            sale_price=data.get('sale_price', 0),
            provider_name=data.get('provider_name', ''),
            description=data.get('description', ''),
            # Optional field mapping.
            service_category=data.get('service_category', ''),
            contract_id=data.get('contract_id', ''),
            contact_person=data.get('contact_person', ''),
            valid_period=data.get('valid_period', ''),
            cost_price=data.get('cost_price', 0),
        )

        db.session.add(service)
        cls._commit()
        return service

    @classmethod
    def get_service(cls, service_id):
        """Return the non-deleted record with the given id.

        Raises:
            ValueError: If no such record exists (or it is soft-deleted).
        """
        service = StockService.query.filter_by(id=service_id, is_deleted=False).first()
        if not service:
            raise ValueError('服务权益记录不存在')
        return service

    @classmethod
    def update_service(cls, service_id, data):
        """Update the editable fields of a service-entitlement record.

        Only keys present in ``data`` are applied; other fields are left
        untouched. ``updated_at`` is always refreshed.

        Returns:
            The updated ``StockService`` instance.

        Raises:
            ValueError: If the record does not exist.
        """
        service = cls.get_service(service_id)

        # Whitelist of fields a caller may change.
        updatable_fields = (
            'sale_price',
            'provider_name',
            'description',
            'cost_price',
            'contract_id',
            'contact_person',
            'valid_period',
        )
        for field in updatable_fields:
            if field in data:
                setattr(service, field, data[field])

        service.updated_at = datetime.now()
        cls._commit()
        return service

    @classmethod
    def delete_service(cls, service_id):
        """Soft-delete a record and return its service name.

        Raises:
            ValueError: If the record does not exist.
        """
        service = cls.get_service(service_id)

        # Read the name before committing; callers use it for the audit log.
        service_name = service.service_name

        service.is_deleted = True
        service.updated_at = datetime.now()
        cls._commit()
        return service_name

    @classmethod
    def get_service_list(cls, page=1, per_page=20, keyword=None, start_date=None,
                         end_date=None, provider_name=None):
        """Return one page of non-deleted records, newest first.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            per_page: Page size.
            keyword: Optional text matched against SKU, base-material name
                and base-material spec model.
            start_date: Optional ``YYYY-MM-DD`` lower bound on ``created_at``
                (inclusive). Unparsable strings are ignored.
            end_date: Optional ``YYYY-MM-DD`` upper bound on ``created_at``
                (the whole day is included). Unparsable strings are ignored.
            provider_name: Optional substring filter on the provider name.

        Returns:
            Dict with ``items`` (list of ``to_dict()`` results), ``total``,
            ``page`` and ``per_page``.

        Raises:
            Exception: Any query error is printed and re-raised unchanged.
        """
        try:
            # A negative OFFSET is rejected by most databases.
            page = max(page, 1)

            query = StockService.query.filter_by(is_deleted=False)

            # Keyword search needs the base-material join.
            if keyword:
                pattern = f'%{keyword}%'
                query = query.join(StockService.base).filter(
                    db.or_(
                        StockService.sku.ilike(pattern),
                        MaterialBase.name.ilike(pattern),
                        MaterialBase.spec_model.ilike(pattern),
                    )
                )

            # Date range filters; malformed dates are deliberately skipped.
            if start_date:
                try:
                    start = datetime.strptime(start_date, '%Y-%m-%d')
                    query = query.filter(StockService.created_at >= start)
                except ValueError:
                    pass

            if end_date:
                try:
                    end_day = datetime.strptime(end_date, '%Y-%m-%d')
                    # Exclusive bound at the next midnight so rows stamped in
                    # the final second (with sub-second precision) still match.
                    query = query.filter(
                        StockService.created_at < end_day + timedelta(days=1)
                    )
                except ValueError:
                    pass

            # Provider filter.
            if provider_name:
                query = query.filter(
                    StockService.provider_name.ilike(f'%{provider_name}%')
                )

            total = query.count()
            items = (
                query.order_by(StockService.created_at.desc())
                .offset((page - 1) * per_page)
                .limit(per_page)
                .all()
            )

            return {
                'items': [item.to_dict() for item in items],
                'total': total,
                'page': page,
                'per_page': per_page,
            }
        except Exception:
            traceback.print_exc()
            raise

    @classmethod
    def get_history_providers(cls, base_id):
        """Return the distinct, non-empty provider names used for a base material.

        Names are sorted ascending. Returns ``[]`` if the query fails (the
        traceback is printed).
        """
        try:
            query = db.session.query(StockService.provider_name).filter(
                StockService.base_id == base_id,
                StockService.provider_name.isnot(None),
                StockService.provider_name != ''
            ).distinct().order_by(StockService.provider_name)

            return [row[0] for row in query.all()]
        except Exception:
            # Best-effort lookup: report the failure but keep the UI working.
            traceback.print_exc()
            return []

    @classmethod
    def search_system_users(cls, keyword):
        """Search system users (placeholder; always returns an empty list)."""
        return []

    @classmethod
    def get_filter_options(cls):
        """Return the distinct categories and material types for filter dropdowns.

        Returns:
            Dict with ``categories`` and ``types`` lists; both are empty if
            the query fails (the traceback is printed).
        """
        try:
            categories = db.session.query(MaterialBase.category) \
                .filter(MaterialBase.category.isnot(None), MaterialBase.category != '') \
                .distinct().all()

            types = db.session.query(MaterialBase.material_type) \
                .filter(MaterialBase.material_type.isnot(None),
                        MaterialBase.material_type != '') \
                .distinct().all()

            return {
                "categories": [r[0] for r in categories],
                "types": [r[0] for r in types]
            }
        except Exception:
            # Best-effort lookup: report the failure but keep the UI working.
            traceback.print_exc()
            return {"categories": [], "types": []}