Files
KCGL/inventory-backend/app/services/inbound/buy_service.py

450 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from app.extensions import db
from app.models.inbound.buy import StockBuy
from app.models.base import MaterialBase
from datetime import datetime, timedelta, timezone
from sqlalchemy import or_, func, text, and_
import traceback
import json
class BuyInboundService:
# ============================================================
# 0. 辅助:唯一性校验
# ============================================================
@staticmethod
def _check_unique(base_id, serial_number, batch_number, exclude_id=None):
"""
校验序列号和批号的唯一性逻辑
"""
# 1. 序列号 (SN) 全局唯一校验
if serial_number:
query = StockBuy.query.filter(StockBuy.serial_number == serial_number)
if exclude_id:
query = query.filter(StockBuy.id != exclude_id)
exists = query.first()
if exists:
occupied_name = exists.base.name if exists.base else "未知物料"
raise ValueError(f"序列号【{serial_number}】已存在!被物料 [{occupied_name}] 占用,请核查。")
# 2. 批号 (BN) 同物料唯一校验
if batch_number and base_id:
query = StockBuy.query.filter(
StockBuy.base_id == base_id,
StockBuy.batch_number == batch_number
)
if exclude_id:
query = query.filter(StockBuy.id != exclude_id)
if query.first():
raise ValueError(f"该物料已存在批号【{batch_number}】,请勿重复录入,可直接在该批次下追加库存。")
# ============================================================
# 1. 基础物料搜索 (参照 Base 模块:全量模糊匹配)
# ============================================================
@staticmethod
def search_base_material(keyword):
try:
query = MaterialBase.query.filter(MaterialBase.is_enabled == True)
if keyword:
# [参照 Base 逻辑] 简单直接的模糊匹配LIKE %keyword%
k_str = f'%{keyword.strip()}%'
conditions = [
MaterialBase.name.ilike(k_str), # 名称
MaterialBase.spec_model.ilike(k_str), # 规格
MaterialBase.pinyin.ilike(k_str), # 拼音
MaterialBase.category.ilike(k_str), # 类别
MaterialBase.material_type.ilike(k_str) # 类型
]
# 安全地添加可能存在的扩展字段 (品牌/厂家)
if hasattr(MaterialBase, 'brand'):
conditions.append(MaterialBase.brand.ilike(k_str))
if hasattr(MaterialBase, 'manufacturer'):
conditions.append(MaterialBase.manufacturer.ilike(k_str))
query = query.filter(or_(*conditions))
# [参照 Base 逻辑] 移除 limit返回所有结果
query = query.order_by(MaterialBase.id.desc())
results = []
for item in query.all():
results.append({
'id': item.id,
'name': item.name,
'spec': item.spec_model,
'category': item.category,
'unit': item.unit,
'type': item.material_type,
# 使用 getattr 防止字段不存在报错
'brand': getattr(item, 'brand', ''),
'manufacturer': getattr(item, 'manufacturer', ''),
'pinyin': getattr(item, 'pinyin', ''),
'status': '启用'
})
return results
except Exception as e:
traceback.print_exc()
return []
# ============================================================
# 2. 新增入库逻辑
# ============================================================
@staticmethod
def handle_inbound(data):
try:
base_id = data.get('base_id')
if not base_id:
raise ValueError("必须选择基础物料")
material = MaterialBase.query.get(base_id)
if not material:
raise ValueError("所选物料不存在")
if not material.is_enabled:
raise ValueError(f"物料【{material.name}】已停用,无法办理新入库。")
BuyInboundService._check_unique(
base_id=base_id,
serial_number=data.get('serial_number'),
batch_number=data.get('batch_number')
)
# 时间处理
beijing_tz = timezone(timedelta(hours=8))
current_time = datetime.now(beijing_tz).replace(tzinfo=None)
in_date_val = current_time
if data.get('in_date'):
try:
date_str = str(data['in_date'])
if len(date_str) > 10:
in_date_val = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
else:
d_temp = datetime.strptime(date_str, '%Y-%m-%d')
in_date_val = datetime(d_temp.year, d_temp.month, d_temp.day,
current_time.hour, current_time.minute, current_time.second)
except:
in_date_val = current_time
in_qty = float(data.get('in_quantity') or 0)
u_price = float(data.get('unit_price') or 0)
# 获取全局打印ID
try:
seq_sql = text("SELECT nextval('global_print_seq')")
result = db.session.execute(seq_sql)
next_global_id = result.scalar()
except:
next_global_id = None
# SKU 生成
if next_global_id:
generated_sku = str(next_global_id).zfill(10)
else:
generated_sku = datetime.now().strftime('%Y%m%d%H%M%S')
final_barcode = data.get('barcode') or generated_sku
arrival_list = data.get('arrival_photo', [])
report_list = data.get('inspection_report', [])
new_stock = StockBuy(
base_id=material.id,
global_print_id=next_global_id,
sku=generated_sku,
barcode=final_barcode,
in_date=in_date_val,
serial_number=data.get('serial_number'),
batch_number=data.get('batch_number'),
status=data.get('status', '在库'),
in_quantity=in_qty,
stock_quantity=in_qty,
available_quantity=in_qty,
inspection_status=data.get('inspection_status', '未检'),
warehouse_location=data.get('warehouse_location'),
unit_price=u_price,
total_price=in_qty * u_price,
currency=data.get('currency', 'CNY'),
exchange_rate=data.get('exchange_rate', 1.0),
supplier_name=data.get('supplier_name'),
buyer_name=data.get('purchaser'),
buyer_email=data.get('purchaser_email'),
original_link=data.get('source_link'),
detail_link=data.get('detail_link'),
arrival_photo=json.dumps(arrival_list),
inspection_report=json.dumps(report_list)
)
db.session.add(new_stock)
db.session.commit()
return new_stock
except Exception as e:
db.session.rollback()
raise e
# ============================================================
# 3. 更新入库逻辑
# ============================================================
@staticmethod
def update_inbound(stock_id, data):
try:
stock = StockBuy.query.get(stock_id)
if not stock:
raise ValueError("记录不存在")
new_base_id = data.get('base_id', stock.base_id)
new_sn = data.get('serial_number', stock.serial_number)
new_bn = data.get('batch_number', stock.batch_number)
BuyInboundService._check_unique(
base_id=new_base_id,
serial_number=new_sn,
batch_number=new_bn,
exclude_id=stock_id
)
# 更新字段
field_mapping = {
'sku': 'sku', 'barcode': 'barcode', 'base_id': 'base_id',
'warehouse_location': 'warehouse_location',
'serial_number': 'serial_number', 'batch_number': 'batch_number',
'status': 'status', 'inspection_status': 'inspection_status',
'supplier_name': 'supplier_name', 'detail_link': 'detail_link',
'currency': 'currency', 'exchange_rate': 'exchange_rate',
'purchaser': 'buyer_name', 'purchaser_email': 'buyer_email',
'source_link': 'original_link'
}
for k, v in field_mapping.items():
if k in data: setattr(stock, v, data[k])
if 'arrival_photo' in data and isinstance(data['arrival_photo'], list):
stock.arrival_photo = json.dumps(data['arrival_photo'])
if 'inspection_report' in data and isinstance(data['inspection_report'], list):
stock.inspection_report = json.dumps(data['inspection_report'])
# 库存数量变更逻辑
if 'in_quantity' in data:
new_qty = float(data['in_quantity'])
diff = new_qty - float(stock.in_quantity)
if diff != 0:
stock.in_quantity = new_qty
stock.stock_quantity = float(stock.stock_quantity) + diff
stock.available_quantity = float(stock.available_quantity) + diff
if 'unit_price' in data:
stock.unit_price = float(data['unit_price'])
stock.total_price = float(stock.in_quantity) * float(stock.unit_price)
db.session.commit()
return stock
except Exception as e:
db.session.rollback()
raise e
# ============================================================
# 4. 删除逻辑
# ============================================================
@staticmethod
def delete_inbound(stock_id):
try:
stock = StockBuy.query.get(stock_id)
if not stock: raise ValueError("记录不存在")
db.session.delete(stock)
db.session.commit()
return True
except Exception as e:
db.session.rollback()
raise e
# ============================================================
# 5. 获取列表 (参照 Base 逻辑:全量模糊匹配)
# ============================================================
@staticmethod
def get_list(page, limit, keyword=None, statuses=None):
try:
query = db.session.query(StockBuy).outerjoin(MaterialBase, StockBuy.base_id == MaterialBase.id)
if keyword:
# [简单匹配] 只要任一字段包含该字符串,即匹配
k_str = f'%{keyword.strip()}%'
conditions = [
# 库存业务字段
StockBuy.sku.ilike(k_str),
StockBuy.barcode.ilike(k_str),
StockBuy.batch_number.ilike(k_str),
StockBuy.serial_number.ilike(k_str),
StockBuy.supplier_name.ilike(k_str),
StockBuy.buyer_name.ilike(k_str),
# 基础物料关联字段
MaterialBase.name.ilike(k_str),
MaterialBase.spec_model.ilike(k_str),
MaterialBase.pinyin.ilike(k_str),
MaterialBase.category.ilike(k_str)
]
if hasattr(MaterialBase, 'brand'):
conditions.append(MaterialBase.brand.ilike(k_str))
if hasattr(MaterialBase, 'manufacturer'):
conditions.append(MaterialBase.manufacturer.ilike(k_str))
query = query.filter(or_(*conditions))
if not statuses:
statuses = ['在库', '借库']
if '已出库' in statuses:
query = query.filter(StockBuy.status.in_(statuses))
else:
query = query.filter(and_(StockBuy.status.in_(statuses), StockBuy.stock_quantity > 0))
pagination = query.order_by(StockBuy.in_date.desc()).paginate(page=page, per_page=limit, error_out=False)
current_items = pagination.items
def parse_img(json_str):
if not json_str: return []
try:
return json.loads(json_str) if json_str.startswith('[') else [json_str]
except:
return []
items = []
for item in current_items:
qty_stock = float(item.stock_quantity or 0)
qty_avail = float(item.available_quantity or 0)
date_display = ''
if item.in_date:
try:
date_display = item.in_date.strftime('%Y-%m-%d')
except:
date_display = str(item.in_date)[:10]
d = {
'id': item.id,
'base_id': item.base_id,
'material_name': item.base.name if item.base else '',
'spec_model': item.base.spec_model if item.base else '',
'category': item.base.category if item.base else '',
'unit': item.base.unit if item.base else '',
'material_type': item.base.material_type if item.base else '',
'brand': getattr(item.base, 'brand', '') if item.base else '',
'manufacturer': getattr(item.base, 'manufacturer', '') if item.base else '',
'pinyin': getattr(item.base, 'pinyin', '') if item.base else '',
'sku': item.sku,
'inbound_date': date_display,
'barcode': item.barcode,
'serial_number': item.serial_number,
'batch_number': item.batch_number,
'status': item.status,
'inspection_status': item.inspection_status,
'qty_inbound': float(item.in_quantity or 0),
'qty_stock': qty_stock,
'qty_available': qty_avail,
'warehouse_loc': item.warehouse_location,
'unit_price': float(item.unit_price or 0),
'total_price': float(item.total_price or 0),
'currency': item.currency,
'exchange_rate': float(item.exchange_rate or 1),
'supplier_name': item.supplier_name,
'purchaser': item.buyer_name,
'purchaser_email': item.buyer_email,
'source_link': item.original_link,
'detail_link': item.detail_link,
'arrival_photo': parse_img(item.arrival_photo),
'inspection_report': parse_img(item.inspection_report),
'global_print_id': item.global_print_id
}
items.append(d)
return {"total": pagination.total, "items": items}
except Exception as e:
traceback.print_exc()
return {"total": 0, "items": []}
# ============================================================
# 6. 供应商历史查询 (移除限制)
# ============================================================
@staticmethod
def get_history_suppliers(base_id):
try:
query = db.session.query(StockBuy.supplier_name).filter(
StockBuy.base_id == base_id,
StockBuy.supplier_name.isnot(None),
StockBuy.supplier_name != ''
).distinct().order_by(StockBuy.supplier_name)
# [修改] 移除 limit
suppliers = [row[0] for row in query.all()]
return suppliers
except Exception:
return []
# ============================================================
# 7. 采购人建议 (移除限制)
# ============================================================
@staticmethod
def get_history_purchasers(keyword):
try:
query = db.session.query(StockBuy.buyer_name, StockBuy.buyer_email) \
.filter(StockBuy.buyer_name.isnot(None), StockBuy.buyer_name != '')
if keyword:
kw = f'%{keyword}%'
query = query.filter(or_(
StockBuy.buyer_name.ilike(kw),
StockBuy.buyer_email.ilike(kw)
))
# [修改] 移除 limit
results = query.distinct().all()
users = []
for row in results:
users.append({
'value': row.buyer_name,
'email': row.buyer_email or ''
})
return users
except Exception:
traceback.print_exc()
return []
# ============================================================
# 8. 链接建议 (移除限制)
# ============================================================
@staticmethod
def get_history_links(base_id, link_type='original'):
try:
target_col = StockBuy.original_link if link_type == 'original' else StockBuy.detail_link
query = db.session.query(target_col).filter(
StockBuy.base_id == base_id,
target_col.isnot(None),
target_col != ''
).distinct()
# [修改] 移除 limit
links = [row[0] for row in query.all()]
return links
except Exception:
return []
# ============================================================
# 9. 库位建议 (移除限制)
# ============================================================
@staticmethod
def get_history_locations(base_id):
try:
query = db.session.query(StockBuy.warehouse_location).filter(
StockBuy.base_id == base_id,
StockBuy.warehouse_location.isnot(None),
StockBuy.warehouse_location != ''
).distinct()
# [修改] 移除 limit
locs = [row[0] for row in query.all()]
return locs
except Exception:
return []