# File: KCGL/inventory-backend/app/services/inbound/buy_service.py (313 lines, 15 KiB, Python)

from app.extensions import db
from app.models.inbound.buy import StockBuy
from app.models.base import MaterialBase
from datetime import datetime, timedelta, timezone
from sqlalchemy import or_, func, text, and_
import traceback
import json
class BuyInboundService:
    """Service layer for purchase ("buy") inbound stock records.

    Every method is static and works on the shared ``db.session``.  The write
    methods (``handle_inbound``, ``update_inbound``, ``delete_inbound``)
    commit on success and roll the session back before re-raising on failure.
    The read methods return plain dicts/lists built from ``StockBuy`` and
    ``MaterialBase`` rows.
    """

    # Fixed UTC+8 offset used to timestamp new inbound records.
    _BEIJING_TZ = timezone(timedelta(hours=8))

    # Request-payload key -> StockBuy attribute, for fields that
    # ``update_inbound`` copies verbatim when present in the payload.
    _UPDATE_FIELD_MAP = {
        'sku': 'sku',
        'barcode': 'barcode',
        'base_id': 'base_id',
        'warehouse_location': 'warehouse_location',
        'serial_number': 'serial_number',
        'batch_number': 'batch_number',
        'status': 'status',
        'inspection_status': 'inspection_status',
        'supplier_name': 'supplier_name',
        'detail_link': 'detail_link',
        'currency': 'currency',
        'exchange_rate': 'exchange_rate',
        'purchaser': 'buyer_name',
        'purchaser_email': 'buyer_email',
        'source_link': 'original_link',
    }

    # ============================================================
    # 0. Helpers
    # ============================================================
    @staticmethod
    def _to_float(value, label, default=None):
        """Convert a payload value to ``float``.

        Args:
            value: Raw value taken from the request payload.
            label: Field name used in the error message.
            default: Returned when ``value`` is ``None`` or ``''``.  When it
                is ``None`` a missing value is an error instead.

        Raises:
            ValueError: If the value is missing (and no default was given) or
                cannot be converted.  ``TypeError`` is folded into
                ``ValueError`` so callers only need to handle one type.
        """
        if value is None or value == '':
            if default is None:
                raise ValueError(f"{label}不能为空")
            return default
        try:
            return float(value)
        except (TypeError, ValueError):
            raise ValueError(f"{label}必须是数字") from None

    @staticmethod
    def _parse_in_date(raw, now):
        """Resolve the inbound timestamp from the payload's ``in_date``.

        Strings longer than 10 characters are parsed as
        ``'%Y-%m-%d %H:%M:%S'``; shorter ones as ``'%Y-%m-%d'`` and combined
        with the hour/minute/second of ``now``.  A missing or unparseable
        value falls back to ``now`` (deliberate best-effort).
        """
        if not raw:
            return now
        date_str = str(raw)
        try:
            if len(date_str) > 10:
                return datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
            day = datetime.strptime(date_str, '%Y-%m-%d')
        except ValueError:
            return now
        return datetime(day.year, day.month, day.day, now.hour, now.minute, now.second)

    @staticmethod
    def _load_json(raw):
        """Decode a JSON text column, returning ``[]`` when empty or invalid.

        A single malformed value must not make the whole list request fail.
        """
        if not raw:
            return []
        try:
            return json.loads(raw)
        except (TypeError, ValueError):
            return []

    @staticmethod
    def _next_global_print_id():
        """Fetch the next value of the ``global_print_seq`` DB sequence.

        Returns ``None`` when the sequence cannot be read.  The statement runs
        inside a SAVEPOINT so that a failure does not leave the enclosing
        transaction aborted, which would make the later INSERT fail as well.
        """
        try:
            with db.session.begin_nested():
                return db.session.execute(text("SELECT nextval('global_print_seq')")).scalar()
        except Exception:
            # Best-effort: the caller falls back to a timestamp-based SKU.
            traceback.print_exc()
            return None

    @staticmethod
    def _distinct_values(column, base_id):
        """Return distinct non-empty values of ``column`` for one material.

        ``column != ''`` also drops NULLs, because a comparison with NULL is
        not true in SQL.
        """
        rows = db.session.query(column).filter(
            StockBuy.base_id == base_id,
            column != '',
        ).distinct().all()
        return [row[0] for row in rows]

    @staticmethod
    def _serialize_stock(item):
        """Build the list-view dict for one ``StockBuy`` row."""
        base = item.base
        return {
            'id': item.id,
            'base_id': item.base_id,
            'material_name': base.name if base else '',
            'spec_model': base.spec_model if base else '',
            'category': base.category if base else '',
            'unit': base.unit if base else '',
            'material_type': base.material_type if base else '',
            'sku': item.sku,
            'inbound_date': str(item.in_date)[:10] if item.in_date else '',
            'barcode': item.barcode,
            'serial_number': item.serial_number,
            'batch_number': item.batch_number,
            'status': item.status,
            'inspection_status': item.inspection_status,
            'qty_inbound': float(item.in_quantity or 0),
            'qty_stock': float(item.stock_quantity or 0),
            'qty_available': float(item.available_quantity or 0),
            'warehouse_loc': item.warehouse_location,
            'unit_price': float(item.unit_price or 0),
            'total_price': float(item.total_price or 0),
            'currency': item.currency,
            'exchange_rate': float(item.exchange_rate or 1),
            'supplier_name': item.supplier_name,
            'purchaser': item.buyer_name,
            'purchaser_email': item.buyer_email,
            'source_link': item.original_link,
            'detail_link': item.detail_link,
            'arrival_photo': BuyInboundService._load_json(item.arrival_photo),
            'inspection_report': BuyInboundService._load_json(item.inspection_report),
            'global_print_id': item.global_print_id,
        }

    @staticmethod
    def _check_unique(base_id, serial_number, batch_number, exclude_id=None):
        """Reject duplicate serial numbers and duplicate batches per material.

        Serial numbers are checked across all materials; batch numbers only
        within ``base_id``.  Empty values are not checked.

        Args:
            base_id: Material the record belongs to.
            serial_number: Serial number to check, if any.
            batch_number: Batch number to check, if any.
            exclude_id: ``StockBuy.id`` to ignore (the record being updated).

        Raises:
            ValueError: If a conflicting record exists.
        """
        # NOTE(review): this is check-then-write, so concurrent requests can
        # still race; confirm the table has matching unique constraints.
        if serial_number:
            query = StockBuy.query.filter(StockBuy.serial_number == serial_number)
            if exclude_id is not None:
                query = query.filter(StockBuy.id != exclude_id)
            exists = query.first()
            if exists:
                occupied_name = exists.base.name if exists.base else "未知物料"
                raise ValueError(f"序列号【{serial_number}】已存在!被物料 [{occupied_name}] 占用,请核查。")

        if batch_number and base_id:
            query = StockBuy.query.filter(
                StockBuy.base_id == base_id,
                StockBuy.batch_number == batch_number,
            )
            if exclude_id is not None:
                query = query.filter(StockBuy.id != exclude_id)
            if query.first():
                raise ValueError(f"该物料已存在批号【{batch_number}】,请勿重复录入,可直接在该批次下追加库存。")

    # ============================================================
    # 1. Base material search
    # ============================================================
    @staticmethod
    def search_base_material(keyword, page=1, limit=50):
        """Search enabled base materials by name or spec/model.

        Args:
            keyword: Optional substring matched case-insensitively against
                ``name`` and ``spec_model``; blank means no keyword filter.
            page: 1-based page number.
            limit: Page size.

        Returns:
            Dict with ``items``, ``total``, ``page`` and ``has_next``.  On any
            error the traceback is printed and an empty first page is
            returned.
        """
        try:
            query = MaterialBase.query.filter(MaterialBase.is_enabled == True)  # noqa: E712
            term = str(keyword).strip() if keyword else ''
            if term:
                pattern = f'%{term}%'
                query = query.filter(or_(
                    MaterialBase.name.ilike(pattern),
                    MaterialBase.spec_model.ilike(pattern),
                ))
            pagination = query.order_by(MaterialBase.id.desc()).paginate(
                page=page, per_page=limit, error_out=False)
            items = [
                {
                    'id': item.id,
                    'name': item.name,
                    'spec': item.spec_model,
                    'category': item.category,
                    'unit': item.unit,
                    'type': item.material_type,
                    # These attributes may not exist on every model version.
                    'brand': getattr(item, 'brand', ''),
                    'manufacturer': getattr(item, 'manufacturer', ''),
                    'pinyin': getattr(item, 'pinyin', ''),
                    'status': '启用',
                }
                for item in pagination.items
            ]
            return {
                "items": items,
                "total": pagination.total,
                "page": page,
                "has_next": pagination.has_next,
            }
        except Exception:
            traceback.print_exc()
            return {"items": [], "total": 0, "page": 1, "has_next": False}

    # ============================================================
    # 2. Create inbound record
    # ============================================================
    @staticmethod
    def handle_inbound(data):
        """Create and commit a new ``StockBuy`` record from a payload dict.

        Args:
            data: Request payload.  ``base_id`` is required; other keys are
                optional and read with ``data.get``.

        Returns:
            The newly committed ``StockBuy`` instance.

        Raises:
            ValueError: If the material is missing, unknown or disabled, a
                uniqueness check fails, or a numeric field is invalid.
            Exception: Any other error is re-raised after a rollback.
        """
        try:
            base_id = data.get('base_id')
            if not base_id:
                raise ValueError("必须选择基础物料")
            material = MaterialBase.query.get(base_id)
            if not material:
                raise ValueError("所选物料不存在")
            if not material.is_enabled:
                raise ValueError(f"物料【{material.name}】已停用")

            BuyInboundService._check_unique(
                base_id=base_id,
                serial_number=data.get('serial_number'),
                batch_number=data.get('batch_number'),
            )

            # Naive datetime holding Beijing wall-clock time.
            current_time = datetime.now(BuyInboundService._BEIJING_TZ).replace(tzinfo=None)
            in_date_val = BuyInboundService._parse_in_date(data.get('in_date'), current_time)

            in_qty = BuyInboundService._to_float(data.get('in_quantity'), '入库数量', default=0.0)
            u_price = BuyInboundService._to_float(data.get('unit_price'), '单价', default=0.0)

            next_global_id = BuyInboundService._next_global_print_id()
            if next_global_id is not None:
                generated_sku = str(next_global_id).zfill(10)
            else:
                # Same clock as ``in_date`` so the fallback SKU does not
                # depend on the server's local timezone.
                generated_sku = current_time.strftime('%Y%m%d%H%M%S')
            final_barcode = data.get('barcode') or generated_sku

            new_stock = StockBuy(
                base_id=material.id,
                global_print_id=next_global_id,
                sku=generated_sku,
                barcode=final_barcode,
                in_date=in_date_val,
                serial_number=data.get('serial_number'),
                batch_number=data.get('batch_number'),
                status=data.get('status', '在库'),
                # A new record starts with everything in stock and available.
                in_quantity=in_qty,
                stock_quantity=in_qty,
                available_quantity=in_qty,
                inspection_status=data.get('inspection_status', '未检'),
                warehouse_location=data.get('warehouse_location'),
                unit_price=u_price,
                total_price=in_qty * u_price,
                currency=data.get('currency', 'CNY'),
                exchange_rate=data.get('exchange_rate', 1.0),
                supplier_name=data.get('supplier_name'),
                buyer_name=data.get('purchaser'),
                buyer_email=data.get('purchaser_email'),
                original_link=data.get('source_link'),
                detail_link=data.get('detail_link'),
                arrival_photo=json.dumps(data.get('arrival_photo', [])),
                inspection_report=json.dumps(data.get('inspection_report', [])),
            )
            db.session.add(new_stock)
            db.session.commit()
            return new_stock
        except Exception:
            db.session.rollback()
            raise

    # ============================================================
    # 3. Update inbound record
    # ============================================================
    @staticmethod
    def update_inbound(stock_id, data):
        """Apply a partial update to an existing ``StockBuy`` record.

        Only keys present in ``data`` are changed.  A change of
        ``in_quantity`` shifts ``stock_quantity`` and ``available_quantity``
        by the same difference, and ``total_price`` is always recomputed.

        Args:
            stock_id: Primary key of the record to update.
            data: Payload dict with the fields to change.

        Returns:
            The updated, committed ``StockBuy`` instance.

        Raises:
            ValueError: If the record does not exist, a uniqueness check
                fails, or a numeric field is missing/invalid.
            Exception: Any other error is re-raised after a rollback.
        """
        try:
            stock = StockBuy.query.get(stock_id)
            if not stock:
                raise ValueError("记录不存在")

            BuyInboundService._check_unique(
                base_id=data.get('base_id', stock.base_id),
                serial_number=data.get('serial_number', stock.serial_number),
                batch_number=data.get('batch_number', stock.batch_number),
                exclude_id=stock_id,
            )

            for payload_key, attr_name in BuyInboundService._UPDATE_FIELD_MAP.items():
                if payload_key in data:
                    setattr(stock, attr_name, data[payload_key])
            if 'arrival_photo' in data:
                stock.arrival_photo = json.dumps(data['arrival_photo'])
            if 'inspection_report' in data:
                stock.inspection_report = json.dumps(data['inspection_report'])

            if 'in_quantity' in data:
                new_qty = BuyInboundService._to_float(data['in_quantity'], '入库数量')
                diff = new_qty - float(stock.in_quantity or 0)
                if diff != 0:
                    # NOTE(review): nothing here prevents the stock or
                    # available quantity from going negative when the inbound
                    # quantity is reduced — confirm whether that is intended.
                    stock.in_quantity = new_qty
                    stock.stock_quantity = float(stock.stock_quantity or 0) + diff
                    stock.available_quantity = float(stock.available_quantity or 0) + diff
            if 'unit_price' in data:
                stock.unit_price = BuyInboundService._to_float(data['unit_price'], '单价')

            # ``or 0`` guards against NULL columns on existing rows.
            stock.total_price = float(stock.in_quantity or 0) * float(stock.unit_price or 0)
            db.session.commit()
            return stock
        except Exception:
            db.session.rollback()
            raise

    # ============================================================
    # 4. Delete
    # ============================================================
    @staticmethod
    def delete_inbound(stock_id):
        """Delete a ``StockBuy`` record by id.

        Returns:
            ``True`` once the deletion is committed.

        Raises:
            ValueError: If the record does not exist.
            Exception: Any other error is re-raised after a rollback.
        """
        try:
            stock = StockBuy.query.get(stock_id)
            if not stock:
                raise ValueError("记录不存在")
            db.session.delete(stock)
            db.session.commit()
            return True
        except Exception:
            db.session.rollback()
            raise

    # ============================================================
    # 5. List
    # ============================================================
    @staticmethod
    def get_list(page, limit, keyword=None, statuses=None, category=None, material_type=None):
        """Return one page of inbound records, newest ``in_date`` first.

        Args:
            page: 1-based page number.
            limit: Page size.
            keyword: Optional substring matched case-insensitively against
                SKU, barcode, batch/serial number, supplier, buyer, material
                name and spec/model.
            statuses: Status values to include; defaults to
                ``['在库', '借库']``.  Unless ``'已出库'`` is among them, rows
                whose ``stock_quantity`` is not positive are excluded.
            category: Optional exact match on the material category.
            material_type: Optional exact match on the material type.

        Returns:
            Dict with ``total`` and ``items``.  On any error the traceback is
            printed and an empty result is returned.
        """
        try:
            query = db.session.query(StockBuy).outerjoin(
                MaterialBase, StockBuy.base_id == MaterialBase.id)

            # 1. Free-text keyword search across stock and material columns.
            if keyword:
                pattern = f'%{keyword.strip()}%'
                query = query.filter(or_(
                    StockBuy.sku.ilike(pattern),
                    StockBuy.barcode.ilike(pattern),
                    StockBuy.batch_number.ilike(pattern),
                    StockBuy.serial_number.ilike(pattern),
                    StockBuy.supplier_name.ilike(pattern),
                    StockBuy.buyer_name.ilike(pattern),
                    MaterialBase.name.ilike(pattern),
                    MaterialBase.spec_model.ilike(pattern),
                ))

            # 2. Category filter (exact match).
            if category and category.strip():
                query = query.filter(MaterialBase.category == category.strip())

            # 3. Material-type filter (exact match).
            if material_type and material_type.strip():
                query = query.filter(MaterialBase.material_type == material_type.strip())

            # 4. Status filter.
            if not statuses:
                statuses = ['在库', '借库']
            query = query.filter(StockBuy.status.in_(statuses))
            if '已出库' not in statuses:
                query = query.filter(StockBuy.stock_quantity > 0)

            # ``id`` breaks ties between equal ``in_date`` values so rows do
            # not repeat or vanish across pages.
            pagination = query.order_by(
                StockBuy.in_date.desc(), StockBuy.id.desc(),
            ).paginate(page=page, per_page=limit, error_out=False)

            items = [BuyInboundService._serialize_stock(item) for item in pagination.items]
            return {"total": pagination.total, "items": items}
        except Exception:
            traceback.print_exc()
            return {"total": 0, "items": []}

    # ============================================================
    # 6. Filter options (categories, material types)
    # ============================================================
    @staticmethod
    def get_filter_options():
        """Return the distinct non-empty material categories and types.

        Returns:
            Dict with ``categories`` and ``types`` lists.  On any error the
            traceback is printed and both lists are empty.
        """
        try:
            categories = db.session.query(MaterialBase.category) \
                .filter(MaterialBase.category != None, MaterialBase.category != '') \
                .distinct().all()  # noqa: E711
            types = db.session.query(MaterialBase.material_type) \
                .filter(MaterialBase.material_type != None, MaterialBase.material_type != '') \
                .distinct().all()  # noqa: E711
            return {
                "categories": [row[0] for row in categories],
                "types": [row[0] for row in types],
            }
        except Exception:
            traceback.print_exc()
            return {"categories": [], "types": []}

    # ============================================================
    # 7-10. History-based suggestions
    # ============================================================
    @staticmethod
    def get_history_suppliers(base_id):
        """Return distinct non-empty supplier names used for a material."""
        return BuyInboundService._distinct_values(StockBuy.supplier_name, base_id)

    @staticmethod
    def get_history_purchasers(keyword):
        """Return distinct buyers as ``{'value': name, 'email': email}``.

        Args:
            keyword: Optional substring matched case-insensitively against the
                buyer name or email; blank returns every buyer with a
                non-empty name.
        """
        query = db.session.query(StockBuy.buyer_name, StockBuy.buyer_email).filter(
            StockBuy.buyer_name != '')
        term = str(keyword).strip() if keyword else ''
        if term:
            pattern = f'%{term}%'
            query = query.filter(or_(
                StockBuy.buyer_name.ilike(pattern),
                StockBuy.buyer_email.ilike(pattern),
            ))
        return [{'value': row.buyer_name, 'email': row.buyer_email}
                for row in query.distinct().all()]

    @staticmethod
    def get_history_links(base_id, type):
        """Return distinct non-empty links previously used for a material.

        Args:
            base_id: Material id.
            type: ``'original'`` selects ``original_link``; anything else
                selects ``detail_link``.  (The name shadows the builtin but
                is kept because callers may pass it by keyword.)
        """
        column = StockBuy.original_link if type == 'original' else StockBuy.detail_link
        return BuyInboundService._distinct_values(column, base_id)

    @staticmethod
    def get_history_locations(base_id):
        """Return distinct non-empty warehouse locations used for a material."""
        return BuyInboundService._distinct_values(StockBuy.warehouse_location, base_id)