Files
KCGL/inventory-backend/app/services/inventory_task.py

200 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
库存预警扫描与邮件通知服务
定时(或手动触发)扫描所有 is_enabled=True 且 is_ordered=False 的预警配置,
按物料配置的邮箱独立发送,不依赖 SysUser 角色。
- 库存 <= red_threshold → 红色预警邮件(发 setting.red_emails
- red_threshold < 库存 <= yellow_threshold → 黄色预警邮件(发 setting.yellow_emails
- 同一收件人在多条记录中出现 → 聚合为一封邮件
- 发送成功后更新 last_notified_at
"""
from datetime import datetime, timezone, timedelta
from collections import defaultdict
from sqlalchemy import func
from app.extensions import db
from app.models.base import MaterialBase, MaterialWarningSetting
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
class InventoryWarningService:
@staticmethod
def _get_total_inventory(base_id: int) -> float:
"""
计算指定物料在所有库存表(采购件 + 半成品 + 成品)中的总库存量
"""
buy_q = db.session.query(func.sum(StockBuy.stock_quantity)).filter(
StockBuy.base_id == base_id
).scalar() or 0
semi_q = db.session.query(func.sum(StockSemi.stock_quantity)).filter(
StockSemi.base_id == base_id
).scalar() or 0
prod_q = db.session.query(func.sum(StockProduct.stock_quantity)).filter(
StockProduct.base_id == base_id
).scalar() or 0
return float(buy_q) + float(semi_q) + float(prod_q)
@staticmethod
def _parse_emails(email_str: str) -> list:
"""从逗号分隔字符串中提取并清洗有效邮箱列表"""
if not email_str or not email_str.strip():
return []
return [e.strip() for e in email_str.split(',') if e.strip() and '@' in e.strip()]
@staticmethod
def _build_text_table(rows: list, level: str) -> str:
"""
构建纯文本物料清单表格
Args:
rows: [{"name": ..., "spec": ..., "qty": ..., "threshold": ...}, ...]
level: "red""yellow",决定阈值列标题
"""
threshold_label = "红色阈值" if level == "red" else "黄色阈值"
lines = [
"名称 | 规格 | 当前库存 | " + threshold_label,
"-" * 60,
]
for r in rows:
name = r.get('name', '-') or '-'
spec = r.get('spec', '-') or '-'
qty = r.get('qty', '-')
th = r.get('threshold', '-')
lines.append(f"{name} | {spec} | {qty} | {th}")
return '\n'.join(lines)
@staticmethod
def check_and_send_warning_emails() -> dict:
"""
执行库存预警扫描与邮件发送
1. 查询所有 is_enabled=True 且 is_ordered=False 的预警配置
2. 按 level 归类物料,按邮箱聚合(同一邮箱 → 一封邮件)
3. 调用 send_email 发送,更新 last_notified_at
Returns:
{
"red_count": N, # 触发红色预警的物料数
"yellow_count": N, # 触发黄色预警的物料数
"red_sent": True/False,
"yellow_sent": True/False,
"timestamp": "..."
}
"""
from app.utils.email_service import send_email
beijing_tz = timezone(timedelta(hours=8))
now = datetime.now(beijing_tz)
# 查询启用了预警且未标记采购的配置
settings = MaterialWarningSetting.query.filter(
MaterialWarningSetting.is_enabled == True,
MaterialWarningSetting.is_ordered == False
).all()
red_rows_by_email = defaultdict(list) # email -> [物料row, ...]
yellow_rows_by_email = defaultdict(list)
total_red = 0
total_yellow = 0
sent_red = False
sent_yellow = False
processed_settings = []
for setting in settings:
base_id = setting.base_id
material = MaterialBase.query.get(base_id)
if not material:
continue
name = material.name
spec = material.spec_model or ''
red_th = float(setting.red_threshold) if setting.red_threshold is not None else None
yellow_th = float(setting.yellow_threshold) if setting.yellow_threshold is not None else None
inv = InventoryWarningService._get_total_inventory(base_id)
# ★ 红色预警:库存 <= red_threshold走 setting.red_emails ★
if red_th is not None and inv <= red_th:
total_red += 1
red_emails = InventoryWarningService._parse_emails(setting.red_emails)
if red_emails:
processed_settings.append(setting)
row = {
'name': name,
'spec': spec,
'qty': round(inv, 2),
'threshold': round(red_th, 2),
}
for email in red_emails:
red_rows_by_email[email].append(row)
else:
print(f"[InventoryWarning] 物料「{name}」红单跳过:无 red_emails 配置")
# ★ 黄色预警red_threshold < 库存 <= yellow_threshold走 setting.yellow_emails ★
elif (
(red_th is not None and yellow_th is not None and red_th < inv <= yellow_th)
or (red_th is None and yellow_th is not None and inv <= yellow_th)
):
total_yellow += 1
yellow_emails = InventoryWarningService._parse_emails(setting.yellow_emails)
if yellow_emails:
processed_settings.append(setting)
row = {
'name': name,
'spec': spec,
'qty': round(inv, 2),
'threshold': round(yellow_th, 2),
}
for email in yellow_emails:
yellow_rows_by_email[email].append(row)
else:
print(f"[InventoryWarning] 物料「{name}」黄单跳过:无 yellow_emails 配置")
else:
continue
# ★ 按邮箱聚合,批量发送红色预警邮件 ★
for email, rows in red_rows_by_email.items():
table = InventoryWarningService._build_text_table(rows, 'red')
subject = f"【红色预警】库存告急(共 {len(rows)} 条)"
content = (
f"您好,\n\n"
f"以下物料当前库存已达到红色预警阈值,请立即处理采购:\n\n"
f"{table}\n\n"
"详情请登录仓库管理系统查看。\n\n"
"此邮件由系统自动发送,请勿回复。"
)
send_email(email, subject, content)
sent_red = True
# ★ 按邮箱聚合,批量发送黄色预警邮件 ★
for email, rows in yellow_rows_by_email.items():
table = InventoryWarningService._build_text_table(rows, 'yellow')
subject = f"【黄色预警】库存偏低(共 {len(rows)} 条)"
content = (
f"您好,\n\n"
f"以下物料当前库存已达到黄色预警阈值,请关注采购进度:\n\n"
f"{table}\n\n"
"详情请登录仓库管理系统查看。\n\n"
"此邮件由系统自动发送,请勿回复。"
)
send_email(email, subject, content)
sent_yellow = True
# ★ 批量更新 last_notified_at ★
if processed_settings:
for s in processed_settings:
s.last_notified_at = now
db.session.commit()
return {
'red_count': total_red,
'yellow_count': total_yellow,
'red_sent': sent_red,
'yellow_sent': sent_yellow,
'timestamp': now.strftime('%Y-%m-%d %H:%M:%S')
}