"""物料查询路由(只读,从库存库查询)""" from fastapi import APIRouter, Depends, Query from sqlalchemy.orm import Session from pydantic import BaseModel from app.database import SessionLocalInventory from app.models import MaterialBase, BomTable router = APIRouter(prefix="/api", tags=["物料查询"]) class MaterialSearchItem(BaseModel): id: int name: str spec_model: str | None bom_no: str | None version: str | None bom_key: str class Config: from_attributes = True class MaterialSearchResponse(BaseModel): items: list[MaterialSearchItem] total: int page: int size: int @router.get("/material", response_model=list[dict]) async def get_material_list(db: Session = Depends(lambda: SessionLocalInventory())): materials = db.query(MaterialBase).all() return [ { "id": m.id, "material_code": str(m.id), "material_name": m.name, "specification": m.spec_model or "", "unit": m.unit or "", "unit_price": 0 } for m in materials ] @router.get("/material/search", response_model=MaterialSearchResponse) async def search_material( q: str = Query("", description="搜索词"), page: int = Query(1, ge=1, description="页码"), size: int = Query(20, ge=1, le=100, description="每页条数"), ): """ 成品搜索接口 Step 1: distinct() 查出所有启用的不重复 (parent_id, bom_no, version) 组合 Step 2: 关联 material_base 获取成品名称和规格型号 Step 3: 按搜索词过滤,生成唯一 bom_key 返回 """ from sqlalchemy import cast, String db = SessionLocalInventory() try: # ── Step 1: 所有启用的不重复 (parent_id, bom_no, version) ── active_boms = ( db.query( BomTable.parent_id, BomTable.bom_no, BomTable.version ) .filter(BomTable.is_enabled == True) .distinct() .subquery() ) # ── Step 2: 关联 material_base 获取名称和规格 ── base = ( db.query( active_boms.c.parent_id.label("id"), MaterialBase.name, MaterialBase.spec_model, active_boms.c.bom_no, active_boms.c.version ) .join(MaterialBase, MaterialBase.id == active_boms.c.parent_id) ) if q: pat = f"%{q}%" rows = ( base .filter( (MaterialBase.name.ilike(pat)) | (MaterialBase.spec_model.ilike(pat)) | (active_boms.c.bom_no.ilike(pat)) | (cast(active_boms.c.parent_id, String).ilike(pat)) ) .limit(50) .all() ) return MaterialSearchResponse( items=[_make_item(r) for r in rows], total=len(rows), page=1, size=len(rows), ) total = base.count() offset = (page - 1) * size rows = base.order_by(active_boms.c.parent_id).offset(offset).limit(size).all() return MaterialSearchResponse( items=[_make_item(r) for r in rows], total=total, page=page, size=size, ) finally: db.close() def _make_item(row) -> MaterialSearchItem: mapping = dict(row._mapping) bom_key = f"{mapping['id']}_{mapping.get('bom_no') or ''}_{mapping.get('version') or ''}" return MaterialSearchItem( id=mapping["id"], name=mapping["name"], spec_model=mapping.get("spec_model"), bom_no=mapping.get("bom_no"), version=mapping.get("version"), bom_key=bom_key, )