feat(purchase): 新增采购申请模块后端(模型+Service+API路由)

This commit is contained in:
dxc
2026-05-12 16:33:18 +08:00
parent f2f9409206
commit 9dfcb93146
4 changed files with 530 additions and 0 deletions

View File

@ -0,0 +1,231 @@
import json
from datetime import datetime, timezone, timedelta, date
from sqlalchemy import func
from app.extensions import db
from app.models.purchase import PurchaseRequest
from app.models.base import MaterialBase
class PurchaseService:
@staticmethod
def generate_request_no():
"""生成采购单号: PUR-yyyyMMdd-HHmm-当日流水(4位)"""
beijing_tz = timezone(timedelta(hours=8))
now = datetime.now(beijing_tz)
date_str = now.strftime('%Y%m%d')
time_str = now.strftime('%H%M')
prefix = f"PUR-{date_str}-{time_str}-"
existing_count = db.session.query(func.count(func.distinct(PurchaseRequest.request_no))) \
.filter(PurchaseRequest.request_no.like(f"{prefix}%")).scalar()
return f"{prefix}{(existing_count + 1):04d}"
@staticmethod
def auto_fill_from_material(keyword: str):
"""
根据 name 或 spec_model 自动补全另一个字段
keyword: 用户输入的名称或规格
返回: {'name': ..., 'spec_model': ...} 或 None
"""
if not keyword:
return None
material = MaterialBase.query.filter(
(MaterialBase.name.ilike(f'%{keyword}%')) |
(MaterialBase.spec_model.ilike(f'%{keyword}%'))
).first()
if material:
return {
'name': material.name,
'spec_model': material.spec_model or ''
}
return None
@staticmethod
def create_purchase_request(data: dict, requester_id: int):
"""
创建采购申请
data 包含: name, spec_model, quantity, purchase_date, supplier_link, remark, images,
unit_price, total_price, approver_id
"""
request_no = PurchaseService.generate_request_no()
purchase_date = data.get('purchase_date')
if isinstance(purchase_date, str):
purchase_date = datetime.strptime(purchase_date, '%Y-%m-%d').date()
elif isinstance(purchase_date, datetime):
purchase_date = purchase_date.date()
purchase = PurchaseRequest(
request_no=request_no,
name=data['name'],
spec_model=data.get('spec_model', ''),
quantity=float(data['quantity']),
purchase_date=purchase_date,
supplier_link=data.get('supplier_link', ''),
remark=data.get('remark', ''),
images=json.dumps(data.get('images', []), ensure_ascii=False) if data.get('images') else '[]',
unit_price=float(data.get('unit_price', 0) or 0),
total_price=float(data.get('total_price', 0) or 0),
requester_id=requester_id,
approver_id=data.get('approver_id'),
status=0
)
db.session.add(purchase)
db.session.commit()
# 发送邮件给审批人
PurchaseService._notify_new_request(purchase)
return purchase
@staticmethod
def approve_purchase_request(purchase_id: int, user_id: int, action: str, reject_reason: str = None):
"""
审批采购申请
action: 'approve''reject'
"""
purchase = db.session.get(PurchaseRequest, purchase_id)
if not purchase:
raise ValueError("采购申请不存在")
if purchase.status != 0:
raise ValueError("当前状态不允许审批")
beijing_tz = timezone(timedelta(hours=8))
now = datetime.now(beijing_tz)
if action == 'approve':
purchase.status = 1
purchase.approver_id = user_id
purchase.approved_at = now
db.session.commit()
PurchaseService._notify_approved(purchase)
elif action == 'reject':
purchase.status = 2
purchase.approver_id = user_id
purchase.approved_at = now
purchase.reject_reason = reject_reason or ''
db.session.commit()
PurchaseService._notify_rejected(purchase)
else:
raise ValueError("无效的审批操作")
return purchase
@staticmethod
def get_purchase_list(page=1, per_page=20, requester_id=None, status=None):
"""获取采购申请列表,普通用户只看自己的,主管/超管看全部"""
query = PurchaseRequest.query
if requester_id is not None:
query = query.filter(PurchaseRequest.requester_id == requester_id)
if status is not None:
query = query.filter(PurchaseRequest.status == status)
query = query.order_by(PurchaseRequest.created_at.desc())
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
return {
'items': [p.to_dict() for p in pagination.items],
'total': pagination.total,
'pages': pagination.pages,
'current_page': page
}
@staticmethod
def get_purchase_by_id(purchase_id: int):
purchase = db.session.get(PurchaseRequest, purchase_id)
return purchase.to_dict() if purchase else None
@staticmethod
def _notify_new_request(purchase):
"""发送新申请邮件给审批人"""
try:
from app.utils.email_service import send_email
from app.models.system import SysUser
if not purchase.approver_id:
return
approver = db.session.get(SysUser, purchase.approver_id)
if not approver or not approver.email:
return
subject = f"【待审批】采购申请单 {purchase.request_no}"
content = f"""您好,
您有一笔新的采购申请待审批:
申请单号:{purchase.request_no}
采购物品:{purchase.name}
规格型号:{purchase.spec_model or '-'}
采购数量:{float(purchase.quantity)}
申请时间:{purchase.created_at.strftime('%Y-%m-%d %H:%M') if purchase.created_at else '-'}
备注说明:{purchase.remark or ''}
请登录仓库管理系统进行审批。
此邮件由系统自动发送,请勿回复。
"""
send_email(approver.email, subject, content)
except Exception as e:
try:
from flask import current_app
current_app.logger.error(f"[Email] 采购申请通知审批人失败: {e}")
except Exception:
print(f"[Email] 采购申请通知审批人失败: {e}")
@staticmethod
def _notify_approved(purchase):
"""审批通过后通知申请人"""
try:
from app.utils.email_service import send_email
from app.models.system import SysUser
requester = db.session.get(SysUser, purchase.requester_id)
if not requester or not requester.email:
return
subject = f"【已通过】采购申请单 {purchase.request_no}"
content = f"""{"尊敬的 " + requester.username + ",您好" if requester.username else "您好"}
您的采购申请单 {purchase.request_no}{purchase.name})已审批通过,现已交给库管。
待库管完成入库后,您可在系统中查询采购记录。
此邮件由系统自动发送,请勿回复。
"""
send_email(requester.email, subject, content)
except Exception as e:
try:
from flask import current_app
current_app.logger.error(f"[Email] 采购申请通过通知申请人失败: {e}")
except Exception:
print(f"[Email] 采购申请通过通知申请人失败: {e}")
@staticmethod
def _notify_rejected(purchase):
"""审批驳回后通知申请人"""
try:
from app.utils.email_service import send_email
from app.models.system import SysUser
requester = db.session.get(SysUser, purchase.requester_id)
if not requester or not requester.email:
return
subject = f"【已驳回】采购申请单 {purchase.request_no}"
content = f"""{"尊敬的 " + requester.username + ",您好" if requester.username else "您好"}
您的采购申请单 {purchase.request_no}{purchase.name})已被驳回。
驳回原因:{purchase.reject_reason or '未说明'}
请登录仓库管理系统查看详情。
此邮件由系统自动发送,请勿回复。
"""
send_email(requester.email, subject, content)
except Exception as e:
print(f"[Email] 采购申请驳回通知失败: {e}")