Files
SCGL/backend/app/routers/work_order_kanban.py

145 lines
4.7 KiB
Python

"""工单看板接口
路由路径:
- GET /api/pms/work_order_kanban - 获取工单列表(分页)
- GET /api/pms/work_order_kanban/status-counts - 获取各状态工单数量
"""
from typing import Optional

from fastapi import APIRouter, Depends, Query
from sqlalchemy import func, or_
from sqlalchemy.orm import Session

from app.database import get_db_pms
from app.models.inventory import MaterialBase
from app.models.production import PmsWorkOrder, PmsProject, WorkOrderStatus
# Router for the work-order kanban endpoints; every route registered on it is
# served under the /api/pms/work_order_kanban prefix.
router = APIRouter(prefix="/api/pms/work_order_kanban", tags=["工单看板"])
class WorkOrderKanbanResponse:
    """Work-order kanban response.

    A plain attribute container: each constructor argument is stored
    unchanged on the instance under the same name, in declaration order.
    """

    def __init__(
        self,
        id: int,
        work_order_no: str,
        project_id: int,
        target_base_id: int,
        target_quantity: int,
        assignee_name: str | None,
        status: WorkOrderStatus,
        created_at,
        updated_at,
        project_name: str | None,
        material_name: str | None,
        material_spec: str | None,
    ):
        # Tuple assignments run left to right, so attributes are created in
        # the same order as the parameters are declared.
        self.id, self.work_order_no = id, work_order_no
        self.project_id, self.target_base_id = project_id, target_base_id
        self.target_quantity, self.assignee_name = target_quantity, assignee_name
        self.status = status
        self.created_at, self.updated_at = created_at, updated_at
        self.project_name = project_name
        self.material_name, self.material_spec = material_name, material_spec
class PaginatedWorkOrders:
    """Paginated work-order response.

    Holds one page of items together with the paging metadata; all
    arguments are stored on the instance as given.
    """

    def __init__(self, items: list, total: int, page: int, size: int, total_pages: int):
        self.items, self.total = items, total
        self.page, self.size = page, size
        self.total_pages = total_pages
def _like_pattern(term: str) -> str:
    """Return a ``%term%`` LIKE pattern with wildcards inside *term* escaped.

    ``/`` is used as the escape character, so the pattern must be passed to
    ``like``/``ilike`` together with ``escape="/"``.
    """
    escaped = term.replace("/", "//").replace("%", "/%").replace("_", "/_")
    return f"%{escaped}%"


@router.get("")
async def get_work_orders_kanban(
    status: Optional[WorkOrderStatus] = Query(None, description="工单状态筛选"),
    project_id: Optional[int] = Query(None, description="项目ID筛选"),
    assignee_name: Optional[str] = Query(None, description="负责人筛选"),
    search: Optional[str] = Query(None, description="工单号/产品名称搜索"),
    page: int = Query(1, ge=1, description="页码"),
    size: int = Query(20, ge=1, le=100, description="每页数量"),
    db: Session = Depends(get_db_pms)
):
    """List work orders for the kanban board, filtered and paginated.

    Args:
        status: Only return work orders in this status; ``None`` disables
            the filter.
        project_id: Only return work orders of this project; ``None`` or
            ``0`` disables the filter.
        assignee_name: Case-insensitive substring match on the assignee
            name; empty or ``None`` disables the filter.
        search: Case-insensitive substring matched against the work-order
            number or the target material's name; empty or ``None``
            disables the filter.
        page: 1-based page number.
        size: Page size (1-100).
        db: SQLAlchemy session.

    Returns:
        A dict with ``items`` (one dict per work order, newest first),
        ``total`` (rows matching the filters), ``page``, ``size`` and
        ``total_pages`` (at least 1, even when nothing matches).

    NOTE(review): this coroutine issues blocking ``Session`` calls, which
    run on the event loop. FastAPI executes plain ``def`` handlers in a
    threadpool; consider switching if nothing awaits this function directly.
    """
    query = db.query(PmsWorkOrder)

    # Compare against None so that a falsy enum member still filters.
    if status is not None:
        query = query.filter(PmsWorkOrder.status == status)
    # Truthiness is deliberate here: project_id=0 is treated as "no filter".
    if project_id:
        query = query.filter(PmsWorkOrder.project_id == project_id)
    # Empty strings skip the filter; a bare "%%" pattern would otherwise
    # silently drop rows whose column is NULL.
    if assignee_name:
        query = query.filter(
            PmsWorkOrder.assignee_name.ilike(_like_pattern(assignee_name), escape="/")
        )
    if search:
        search_pattern = _like_pattern(search)
        # Only the material table is joined: the search covers the
        # work-order number and the material name.
        query = query.outerjoin(
            MaterialBase,
            PmsWorkOrder.target_base_id == MaterialBase.id,
        ).filter(
            or_(
                PmsWorkOrder.work_order_no.ilike(search_pattern, escape="/"),
                MaterialBase.name.ilike(search_pattern, escape="/"),
            )
        )

    total = query.count()

    # The id tie-breaker keeps the ordering stable across pages when several
    # work orders share the same created_at value.
    work_orders = (
        query.order_by(PmsWorkOrder.created_at.desc(), PmsWorkOrder.id.desc())
        .offset((page - 1) * size)
        .limit(size)
        .all()
    )

    # Load the related projects and materials for the whole page with one
    # query each instead of two queries per work order.
    project_ids = {wo.project_id for wo in work_orders if wo.project_id is not None}
    material_ids = {
        wo.target_base_id for wo in work_orders if wo.target_base_id is not None
    }
    projects = {}
    if project_ids:
        projects = {
            project.id: project
            for project in db.query(PmsProject).filter(
                PmsProject.id.in_(list(project_ids))
            )
        }
    materials = {}
    if material_ids:
        materials = {
            material.id: material
            for material in db.query(MaterialBase).filter(
                MaterialBase.id.in_(list(material_ids))
            )
        }

    items = []
    for wo in work_orders:
        project = projects.get(wo.project_id)
        material = materials.get(wo.target_base_id)
        items.append(
            {
                "id": wo.id,
                "work_order_no": wo.work_order_no,
                "project_id": wo.project_id,
                "target_base_id": wo.target_base_id,
                "target_quantity": wo.target_quantity,
                "assignee_name": wo.assignee_name,
                # Guard against a NULL status instead of raising.
                "status": wo.status.value if wo.status is not None else None,
                "created_at": wo.created_at.isoformat() if wo.created_at else None,
                "updated_at": wo.updated_at.isoformat() if wo.updated_at else None,
                "project_name": project.name if project else None,
                "material_name": material.name if material else None,
                "material_spec": material.spec_model if material else None,
            }
        )

    # Ceiling division; an empty result still reports a single page.
    total_pages = max(1, (total + size - 1) // size)

    return {
        "items": items,
        "total": total,
        "page": page,
        "size": size,
        "total_pages": total_pages,
    }
@router.get("/status-counts")
async def get_status_counts(db: Session = Depends(get_db_pms)):
    """Return the number of work orders in each status.

    Args:
        db: SQLAlchemy session.

    Returns:
        A dict mapping every ``WorkOrderStatus`` value to its work-order
        count, in enum definition order. Statuses with no work orders are
        reported as 0.

    NOTE(review): this coroutine issues a blocking ``Session`` call, which
    runs on the event loop. FastAPI executes plain ``def`` handlers in a
    threadpool; consider switching if nothing awaits this function directly.
    """
    # Start from zero for every status so that statuses without rows still
    # appear in the response.
    counts = {status.value: 0 for status in WorkOrderStatus}

    # A single GROUP BY replaces one COUNT query per enum member and gives
    # all counts from the same snapshot.
    grouped = (
        db.query(PmsWorkOrder.status, func.count(PmsWorkOrder.id))
        .group_by(PmsWorkOrder.status)
        .all()
    )
    for status, count in grouped:
        # Rows with a NULL status belong to no enum member and are skipped.
        if status is not None:
            counts[status.value] = count
    return counts