Files
KCGL/inventory-backend/app/services/purchase_service.py

231 lines
8.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
from datetime import datetime, timezone, timedelta, date
from sqlalchemy import func
from app.extensions import db
from app.models.purchase import PurchaseRequest
from app.models.base import MaterialBase
class PurchaseService:
@staticmethod
def generate_request_no():
"""生成采购单号: PUR-yyyyMMdd-HHmm-当日流水(4位)"""
beijing_tz = timezone(timedelta(hours=8))
now = datetime.now(beijing_tz)
date_str = now.strftime('%Y%m%d')
time_str = now.strftime('%H%M')
prefix = f"PUR-{date_str}-{time_str}-"
existing_count = db.session.query(func.count(func.distinct(PurchaseRequest.request_no))) \
.filter(PurchaseRequest.request_no.like(f"{prefix}%")).scalar()
return f"{prefix}{(existing_count + 1):04d}"
@staticmethod
def auto_fill_from_material(keyword: str):
"""
根据 name 或 spec_model 自动补全另一个字段
keyword: 用户输入的名称或规格
返回: {'name': ..., 'spec_model': ...} 或 None
"""
if not keyword:
return None
material = MaterialBase.query.filter(
(MaterialBase.name.ilike(f'%{keyword}%')) |
(MaterialBase.spec_model.ilike(f'%{keyword}%'))
).first()
if material:
return {
'name': material.name,
'spec_model': material.spec_model or ''
}
return None
@staticmethod
def create_purchase_request(data: dict, requester_id: int):
"""
创建采购申请
data 包含: name, spec_model, quantity, purchase_date, supplier_link, remark, images,
unit_price, total_price, approver_id
"""
request_no = PurchaseService.generate_request_no()
purchase_date = data.get('purchase_date')
if isinstance(purchase_date, str):
purchase_date = datetime.strptime(purchase_date, '%Y-%m-%d').date()
elif isinstance(purchase_date, datetime):
purchase_date = purchase_date.date()
purchase = PurchaseRequest(
request_no=request_no,
name=data['name'],
spec_model=data.get('spec_model', ''),
quantity=float(data['quantity']),
purchase_date=purchase_date,
supplier_link=data.get('supplier_link', ''),
remark=data.get('remark', ''),
images=json.dumps(data.get('images', []), ensure_ascii=False) if data.get('images') else '[]',
unit_price=float(data.get('unit_price', 0) or 0),
total_price=float(data.get('total_price', 0) or 0),
requester_id=requester_id,
approver_id=data.get('approver_id'),
status=0
)
db.session.add(purchase)
db.session.commit()
# 发送邮件给审批人
PurchaseService._notify_new_request(purchase)
return purchase
@staticmethod
def approve_purchase_request(purchase_id: int, user_id: int, action: str, reject_reason: str = None):
"""
审批采购申请
action: 'approve''reject'
"""
purchase = db.session.get(PurchaseRequest, purchase_id)
if not purchase:
raise ValueError("采购申请不存在")
if purchase.status != 0:
raise ValueError("当前状态不允许审批")
beijing_tz = timezone(timedelta(hours=8))
now = datetime.now(beijing_tz)
if action == 'approve':
purchase.status = 1
purchase.approver_id = user_id
purchase.approved_at = now
db.session.commit()
PurchaseService._notify_approved(purchase)
elif action == 'reject':
purchase.status = 2
purchase.approver_id = user_id
purchase.approved_at = now
purchase.reject_reason = reject_reason or ''
db.session.commit()
PurchaseService._notify_rejected(purchase)
else:
raise ValueError("无效的审批操作")
return purchase
@staticmethod
def get_purchase_list(page=1, per_page=20, requester_id=None, status=None):
"""获取采购申请列表,普通用户只看自己的,主管/超管看全部"""
query = PurchaseRequest.query
if requester_id is not None:
query = query.filter(PurchaseRequest.requester_id == requester_id)
if status is not None:
query = query.filter(PurchaseRequest.status == status)
query = query.order_by(PurchaseRequest.created_at.desc())
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
return {
'items': [p.to_dict() for p in pagination.items],
'total': pagination.total,
'pages': pagination.pages,
'current_page': page
}
@staticmethod
def get_purchase_by_id(purchase_id: int):
purchase = db.session.get(PurchaseRequest, purchase_id)
return purchase.to_dict() if purchase else None
@staticmethod
def _notify_new_request(purchase):
"""发送新申请邮件给审批人"""
try:
from app.utils.email_service import send_email
from app.models.system import SysUser
if not purchase.approver_id:
return
approver = db.session.get(SysUser, purchase.approver_id)
if not approver or not approver.email:
return
subject = f"【待审批】采购申请单 {purchase.request_no}"
content = f"""您好,
您有一笔新的采购申请待审批:
申请单号:{purchase.request_no}
采购物品:{purchase.name}
规格型号:{purchase.spec_model or '-'}
采购数量:{float(purchase.quantity)}
申请时间:{purchase.created_at.strftime('%Y-%m-%d %H:%M') if purchase.created_at else '-'}
备注说明:{purchase.remark or ''}
请登录仓库管理系统进行审批。
此邮件由系统自动发送,请勿回复。
"""
send_email(approver.email, subject, content)
except Exception as e:
try:
from flask import current_app
current_app.logger.error(f"[Email] 采购申请通知审批人失败: {e}")
except Exception:
print(f"[Email] 采购申请通知审批人失败: {e}")
@staticmethod
def _notify_approved(purchase):
"""审批通过后通知申请人"""
try:
from app.utils.email_service import send_email
from app.models.system import SysUser
requester = db.session.get(SysUser, purchase.requester_id)
if not requester or not requester.email:
return
subject = f"【已通过】采购申请单 {purchase.request_no}"
content = f"""{"尊敬的 " + requester.username + ",您好" if requester.username else "您好"}
您的采购申请单 {purchase.request_no}{purchase.name})已审批通过,现已交给库管。
待库管完成入库后,您可在系统中查询采购记录。
此邮件由系统自动发送,请勿回复。
"""
send_email(requester.email, subject, content)
except Exception as e:
try:
from flask import current_app
current_app.logger.error(f"[Email] 采购申请通过通知申请人失败: {e}")
except Exception:
print(f"[Email] 采购申请通过通知申请人失败: {e}")
@staticmethod
def _notify_rejected(purchase):
"""审批驳回后通知申请人"""
try:
from app.utils.email_service import send_email
from app.models.system import SysUser
requester = db.session.get(SysUser, purchase.requester_id)
if not requester or not requester.email:
return
subject = f"【已驳回】采购申请单 {purchase.request_no}"
content = f"""{"尊敬的 " + requester.username + ",您好" if requester.username else "您好"}
您的采购申请单 {purchase.request_no}{purchase.name})已被驳回。
驳回原因:{purchase.reject_reason or '未说明'}
请登录仓库管理系统查看详情。
此邮件由系统自动发送,请勿回复。
"""
send_email(requester.email, subject, content)
except Exception as e:
print(f"[Email] 采购申请驳回通知失败: {e}")