Files
KCGL/inventory-backend/app/services/inbound/buy_service.py

555 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# inventory-backend/app/services/inbound/buy_service.py
from app.extensions import db
from app.models.inbound.buy import StockBuy
from app.models.inbound.product import StockProduct
from app.models.base import MaterialBase
from datetime import datetime, timedelta, timezone
from sqlalchemy import or_, func, text, and_
from sqlalchemy.exc import IntegrityError
import traceback
import json
class BuyInboundService:
# ============================================================
# 0. 辅助:唯一性校验
# ============================================================
@staticmethod
def _check_unique(base_id, serial_number, batch_number, exclude_id=None):
    """Check that a serial number / batch number is not already in use.

    The serial number is looked up across all purchase-inbound records.
    The batch number is looked up only within the given base material, and
    only when both ``batch_number`` and ``base_id`` are truthy.

    Args:
        base_id: Id of the base material the record belongs to.
        serial_number: Serial number to check; the check is skipped when falsy.
        batch_number: Batch number to check; the check is skipped when falsy.
        exclude_id: Id of a record to leave out of both lookups (typically
            the record being edited); ignored when falsy.

    Returns:
        ``None`` when no conflict was found, otherwise a ``str`` holding the
        user-facing error message.
    """
    def _without_self(candidates):
        # Leave the excluded record out so an edit never collides with itself.
        if exclude_id:
            return candidates.filter(StockBuy.id != exclude_id)
        return candidates

    if serial_number:
        serial_matches = StockBuy.query.filter(StockBuy.serial_number == serial_number)
        holder = _without_self(serial_matches).first()
        if holder:
            occupied_name = holder.base.name if holder.base else "未知物料"
            return f"序列号【{serial_number}】已存在!被物料 [{occupied_name}] 占用,请核查。"

    if batch_number and base_id:
        batch_matches = StockBuy.query.filter(
            StockBuy.base_id == base_id,
            StockBuy.batch_number == batch_number,
        )
        if _without_self(batch_matches).first():
            return f"该物料已存在批号【{batch_number}】,请勿重复录入,可直接在该批次下追加库存。"

    return None
# ============================================================
# 1. 基础物料搜索
# ============================================================
@staticmethod
def search_base_material(keyword, page=1, limit=50):
    """Search enabled base materials and attach each one's last known location.

    Args:
        keyword: Optional search text, matched case-insensitively as a
            substring of the material name, spec/model or company name.
            ``None``, empty or whitespace-only means "no keyword filter".
        page: 1-based page number.
        limit: Page size.

    Returns:
        A dict with ``items`` (list of material dicts), ``total``, ``page``
        and ``has_next``. On any error the traceback is printed and an empty
        first page is returned instead of raising.
    """
    try:
        query = MaterialBase.query.filter(MaterialBase.is_enabled == True)  # noqa: E712

        # Strip before testing so a whitespace-only keyword is treated as
        # "no keyword" rather than turning into a pointless '%%' LIKE filter
        # (which would also drop rows whose searched columns are all NULL).
        term = keyword.strip() if keyword else ''
        if term:
            pattern = f'%{term}%'
            query = query.filter(or_(
                MaterialBase.name.ilike(pattern),
                MaterialBase.spec_model.ilike(pattern),
                MaterialBase.company_name.ilike(pattern),  # company search supported
            ))

        query = query.order_by(MaterialBase.id.desc())
        pagination = query.paginate(page=page, per_page=limit, error_out=False)

        items = []
        for material in pagination.items:
            # Last known location: the most recent purchase-inbound record
            # wins; the most recent product-inbound record is consulted only
            # when that purchase record is missing or has no location.
            # NOTE(review): this issues one or two queries per listed item.
            last_location = ''
            last_buy = (
                StockBuy.query
                .filter(StockBuy.base_id == material.id)
                .order_by(StockBuy.in_date.desc())
                .first()
            )
            if last_buy and last_buy.warehouse_location:
                last_location = last_buy.warehouse_location
            else:
                last_product = (
                    StockProduct.query
                    .filter(StockProduct.base_id == material.id)
                    .order_by(StockProduct.in_date.desc())
                    .first()
                )
                if last_product and last_product.warehouse_location:
                    last_location = last_product.warehouse_location

            items.append({
                'id': material.id,
                'company_name': material.company_name,
                'name': material.name,
                'spec': material.spec_model,
                'category': material.category,
                'unit': material.unit,
                'type': material.material_type,
                'brand': getattr(material, 'brand', ''),
                'manufacturer': getattr(material, 'manufacturer', ''),
                'pinyin': getattr(material, 'pinyin', ''),
                'status': '启用',
                'history_location': last_location,
            })

        return {
            "items": items,
            "total": pagination.total,
            "page": page,
            "has_next": pagination.has_next,
        }
    except Exception:
        # Best-effort endpoint: log and hand back an empty page.
        traceback.print_exc()
        return {"items": [], "total": 0, "page": 1, "has_next": False}
# ============================================================
# 2. 新增入库逻辑
# ============================================================
@staticmethod
def handle_inbound(data):
    """Create a purchase-inbound stock record from a request payload.

    Args:
        data: Mapping of request fields. ``base_id`` is required; quantity,
            price, tax, supplier, attachment and link fields are optional.

    Returns:
        The committed ``StockBuy`` instance, or ``{'error': <message>}`` when
        the serial-number / batch-number uniqueness check fails.

    Raises:
        ValueError: If no material is selected, the material does not exist
            or is disabled, a material that requires inspection is missing
            its inspection status or report, or a numeric field cannot be
            converted with ``float``.
        Exception: Any other error is re-raised after the session has been
            rolled back.
    """
    try:
        base_id = data.get('base_id')
        if not base_id:
            raise ValueError("必须选择基础物料")
        material = MaterialBase.query.get(base_id)
        if not material:
            raise ValueError("所选物料不存在")
        if not material.is_enabled:
            raise ValueError(f"物料【{material.name}】已停用")

        # `or []` also covers a client that sends an explicit null, which
        # would otherwise crash the validation below or be stored as "null".
        arrival_photos = data.get('arrival_photo') or []
        inspection_reports = data.get('inspection_report') or []

        # Mandatory inspection: a material flagged as inspection-required
        # must come with an inspection status and at least one report.
        if material.is_inspection_required:
            if not data.get('inspection_status'):
                raise ValueError(f"物料【{material.name}】为强管控物料,必须选择到检状态")
            # The frontend appends external links to the same
            # inspection_report array, so files and links are checked
            # together; blank entries do not count.
            if not any(r and str(r).strip() for r in inspection_reports):
                raise ValueError(f"物料【{material.name}】为强管控物料,必须提供检测报告文件或外部链接")

        # Batch number / serial number uniqueness.
        unique_error = BuyInboundService._check_unique(
            base_id=base_id,
            serial_number=data.get('serial_number'),
            batch_number=data.get('batch_number')
        )
        if unique_error:
            return {'error': unique_error}

        # Naive datetime holding the current UTC+8 wall-clock time.
        beijing_tz = timezone(timedelta(hours=8))
        current_time = datetime.now(beijing_tz).replace(tzinfo=None)

        in_date_val = current_time
        if data.get('in_date'):
            date_str = str(data['in_date'])
            try:
                if len(date_str) > 10:
                    in_date_val = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
                else:
                    # Date only: borrow the current time of day.
                    day = datetime.strptime(date_str, '%Y-%m-%d')
                    in_date_val = datetime(day.year, day.month, day.day, current_time.hour,
                                           current_time.minute, current_time.second)
            except ValueError:
                # Unparseable date: fall back to "now" (best effort).
                in_date_val = current_time

        in_qty = float(data.get('in_quantity') or 0)
        u_price = float(data.get('unit_price') or 0)
        tax_rate = float(data.get('tax_rate') or 0)

        # Post-tax unit price: derive it from the pre-tax price and the tax
        # rate when the client did not supply one.
        post_tax_price = float(data.get('post_tax_unit_price') or 0)
        if post_tax_price == 0 and u_price > 0:
            post_tax_price = u_price * (1 + (tax_rate / 100))

        try:
            # Run inside a SAVEPOINT: if the sequence is unavailable the
            # failed statement is rolled back on its own, so the surrounding
            # transaction stays usable for the insert below.
            with db.session.begin_nested():
                next_global_id = db.session.execute(
                    text("SELECT nextval('global_print_seq')")
                ).scalar()
        except Exception:
            next_global_id = None

        # Fall back to a timestamp-based SKU when no sequence value exists.
        generated_sku = str(next_global_id).zfill(10) if next_global_id else datetime.now().strftime('%Y%m%d%H%M%S')
        final_barcode = data.get('barcode') or generated_sku

        new_stock = StockBuy(
            base_id=material.id, global_print_id=next_global_id, sku=generated_sku, barcode=final_barcode,
            in_date=in_date_val, serial_number=data.get('serial_number'), batch_number=data.get('batch_number'),
            status=data.get('status', '在库'), in_quantity=in_qty, stock_quantity=in_qty, available_quantity=in_qty,
            inspection_status=data.get('inspection_status', '未检'),
            warehouse_location=data.get('warehouse_location'),
            # Pricing
            pre_tax_unit_price=u_price,
            post_tax_unit_price=post_tax_price,
            tax_rate=tax_rate,
            total_price=in_qty * u_price,
            currency=data.get('currency', 'CNY'),
            exchange_rate=data.get('exchange_rate', 1.0),
            supplier_name=data.get('supplier_name'), buyer_name=data.get('purchaser'),
            buyer_email=data.get('purchaser_email'),
            original_link=data.get('source_link'), detail_link=data.get('detail_link'),
            arrival_photo=json.dumps(arrival_photos),
            inspection_report=json.dumps(inspection_reports)
        )
        db.session.add(new_stock)
        db.session.commit()
        return new_stock
    except Exception:
        db.session.rollback()
        raise
# ============================================================
# 3. 更新入库
# ============================================================
@staticmethod
def update_inbound(stock_id, data):
    """Update an existing purchase-inbound stock record.

    Only the fields present in ``data`` are changed. Changing
    ``in_quantity`` shifts the stock and available quantities by the same
    difference, and the total price is recomputed from the resulting
    quantity and pre-tax unit price.

    Args:
        stock_id: Primary key of the ``StockBuy`` record to update.
        data: Mapping of request fields to apply.

    Returns:
        The updated, committed ``StockBuy`` instance.

    Raises:
        ValueError: If the record does not exist, a material that requires
            inspection lacks an inspection status or report, the serial or
            batch number collides with another record, or a numeric field
            cannot be converted with ``float``.
        Exception: Any other error is re-raised after the session has been
            rolled back.
    """
    try:
        stock = StockBuy.query.get(stock_id)
        if not stock:
            raise ValueError("记录不存在")

        # Material used for validation: the new one if it is being changed.
        base_id = data.get('base_id', stock.base_id)
        material = MaterialBase.query.get(base_id)

        # Mandatory inspection: a material flagged as inspection-required
        # must keep an inspection status and at least one report.
        if material and material.is_inspection_required:
            if not data.get('inspection_status', stock.inspection_status):
                raise ValueError(f"物料【{material.name}】为强管控物料,必须选择到检状态")

            reports = data.get('inspection_report')
            if reports is None:
                # Nothing submitted: validate the reports already stored.
                try:
                    reports = json.loads(stock.inspection_report) if stock.inspection_report else []
                except (TypeError, ValueError):
                    reports = []
            # `or []` guards against a stored JSON null; blank entries do
            # not count as a report.
            if not any(r and str(r).strip() for r in (reports or [])):
                raise ValueError(f"物料【{material.name}】为强管控物料,必须提供检测报告文件或外部链接")

        # The result of the uniqueness check must be acted on; otherwise an
        # edit can silently introduce a duplicate serial or batch number.
        unique_error = BuyInboundService._check_unique(
            base_id=base_id,
            serial_number=data.get('serial_number', stock.serial_number),
            batch_number=data.get('batch_number', stock.batch_number),
            exclude_id=stock_id
        )
        if unique_error:
            raise ValueError(unique_error)

        # Request key -> model attribute, for fields copied over verbatim.
        field_mapping = {'sku': 'sku', 'barcode': 'barcode', 'base_id': 'base_id',
                         'warehouse_location': 'warehouse_location', 'serial_number': 'serial_number',
                         'batch_number': 'batch_number', 'status': 'status',
                         'inspection_status': 'inspection_status', 'supplier_name': 'supplier_name',
                         'detail_link': 'detail_link', 'currency': 'currency', 'exchange_rate': 'exchange_rate',
                         'purchaser': 'buyer_name', 'purchaser_email': 'buyer_email',
                         'source_link': 'original_link'}
        for request_key, attr_name in field_mapping.items():
            if request_key in data:
                setattr(stock, attr_name, data[request_key])

        # Attachment lists are stored as JSON text.
        if 'arrival_photo' in data:
            stock.arrival_photo = json.dumps(data['arrival_photo'])
        if 'inspection_report' in data:
            stock.inspection_report = json.dumps(data['inspection_report'])

        # Tax rate and pre-tax unit price.
        if 'tax_rate' in data:
            stock.tax_rate = float(data['tax_rate'])
        if 'unit_price' in data:
            stock.pre_tax_unit_price = float(data['unit_price'])

        # Post-tax unit price: take the submitted value, otherwise derive it
        # whenever one of its inputs changed.
        if 'post_tax_unit_price' in data:
            stock.post_tax_unit_price = float(data['post_tax_unit_price'])
        elif 'unit_price' in data or 'tax_rate' in data:
            tax_multiplier = 1 + (float(stock.tax_rate or 0) / 100)
            stock.post_tax_unit_price = float(stock.pre_tax_unit_price or 0) * tax_multiplier

        if 'in_quantity' in data:
            new_qty = float(data['in_quantity'])
            diff = new_qty - float(stock.in_quantity or 0)
            if diff != 0:
                stock.in_quantity = new_qty
                stock.stock_quantity = float(stock.stock_quantity or 0) + diff
                stock.available_quantity = float(stock.available_quantity or 0) + diff

        # Recompute the total unconditionally so that a price-only edit does
        # not leave a stale total behind; `or 0` tolerates NULL columns.
        stock.total_price = float(stock.in_quantity or 0) * float(stock.pre_tax_unit_price or 0)

        db.session.commit()
        return stock
    except Exception:
        db.session.rollback()
        raise
# ============================================================
# 4. 删除
# ============================================================
@staticmethod
def delete_inbound(stock_id):
    """Delete a purchase-inbound record and report which material it held.

    Args:
        stock_id: Primary key of the ``StockBuy`` record to delete.

    Returns:
        The name of the record's base material (read before deletion so the
        caller can use it afterwards), or a placeholder when the record has
        no base material.

    Raises:
        ValueError: If the record does not exist, or if the database rejects
            the delete with an integrity error.
        Exception: Any other error is re-raised after rollback.
    """
    try:
        target = StockBuy.query.get(stock_id)
        if not target:
            raise ValueError("记录不存在")

        # Read the material name through the `base` relationship before the
        # row is gone.
        if target.base:
            material_name = target.base.name
        else:
            material_name = '未知物料'

        db.session.delete(target)
        db.session.commit()
    except IntegrityError:
        db.session.rollback()
        raise ValueError("该入库单已被出库、盘点或借还等业务关联,为保证账目完整,禁止删除!")
    except Exception:
        db.session.rollback()
        raise
    else:
        return material_name
# ============================================================
# 5. 获取列表 (支持排序和高级筛选)
# ============================================================
@staticmethod
def get_list(page, limit, keyword=None, sku=None, search_field='all', statuses=None, category=None, material_type=None, company=None,
             order_by='', is_asc='', advanced_filters=None):
    """Return one page of purchase-inbound records with filtering and sorting.

    Args:
        page: 1-based page number.
        limit: Page size.
        keyword: Optional substring matched case-insensitively against the
            column chosen by ``search_field``.
        sku: Optional substring matched against the SKU.
        search_field: ``'name'``, ``'spec'``, ``'common_name'``,
            ``'barcode'``, ``'batch_number'``, ``'supplier_name'`` or
            ``'buyer_name'``; any other value (default ``'all'``) searches
            several columns at once.
        statuses: Statuses to include; defaults to in-stock and on-loan.
            Unless the shipped-out status is requested, rows with no
            remaining stock are hidden.
        category: Exact material category to match.
        material_type: Exact material type to match.
        company: Exact company name to match.
        order_by: Sort key (see the sort map below); empty, unknown or
            direction-less requests fall back to newest inbound date first.
        is_asc: ``'asc'`` or ``'desc'``.
        advanced_filters: Optional list of ``{'field', 'operator', 'value'}``
            dicts; operators are ``eq``, ``ne``, ``contains``,
            ``not_contains``, ``ge`` and ``le``. Unknown fields/operators
            and non-numeric ``ge``/``le`` values are skipped.

    Returns:
        ``{"total": int, "items": [dict, ...]}``. On any error the traceback
        is printed and an empty result is returned instead of raising.
    """
    try:
        query = db.session.query(StockBuy).outerjoin(MaterialBase, StockBuy.base_id == MaterialBase.id)

        # 1. Keyword search, optionally pinned to a single field.
        if keyword:
            k_str = f'%{keyword.strip()}%'
            # (model, attribute name) pairs, resolved lazily so that only
            # the requested column is ever touched.
            single_field_targets = {
                'name': (MaterialBase, 'name'),
                'spec': (MaterialBase, 'spec_model'),
                'common_name': (MaterialBase, 'common_name'),
                'barcode': (StockBuy, 'barcode'),
                'batch_number': (StockBuy, 'batch_number'),
                'supplier_name': (StockBuy, 'supplier_name'),
                'buyer_name': (StockBuy, 'buyer_name'),
            }
            target = single_field_targets.get(search_field)
            if target is not None:
                model, attr_name = target
                query = query.filter(getattr(model, attr_name).ilike(k_str))
            else:
                # 'all' (default): fuzzy match across several columns.
                query = query.filter(or_(
                    StockBuy.barcode.ilike(k_str),
                    StockBuy.batch_number.ilike(k_str),
                    StockBuy.serial_number.ilike(k_str),
                    StockBuy.supplier_name.ilike(k_str),
                    StockBuy.buyer_name.ilike(k_str),
                    MaterialBase.name.ilike(k_str),
                    MaterialBase.spec_model.ilike(k_str),
                    MaterialBase.company_name.ilike(k_str),
                ))

        # 1.1 Dedicated SKU search.
        if sku and sku.strip():
            query = query.filter(StockBuy.sku.ilike(f'%{sku.strip()}%'))

        # 2. Category, 3. material type, 3.1 company: exact matches.
        if category and category.strip():
            query = query.filter(MaterialBase.category == category.strip())
        if material_type and material_type.strip():
            query = query.filter(MaterialBase.material_type == material_type.strip())
        if company and company.strip():
            query = query.filter(MaterialBase.company_name == company.strip())

        # 4. Status filter. Rows with zero stock are hidden unless the
        # shipped-out status was explicitly requested.
        if not statuses:
            statuses = ['在库', '借库']
        status_match = StockBuy.status.in_(statuses)
        if '已出库' in statuses:
            query = query.filter(status_match)
        else:
            query = query.filter(and_(status_match, StockBuy.stock_quantity > 0))

        # Columns addressable by both the advanced filters and the sorter.
        # Built only when one of them is actually requested.
        common_columns = {}
        if advanced_filters or order_by:
            common_columns = {
                'company_name': MaterialBase.company_name,
                'material_name': MaterialBase.name,
                'material_type': MaterialBase.material_type,
                'category': MaterialBase.category,
                'spec_model': MaterialBase.spec_model,
                'unit': MaterialBase.unit,
                'sku': StockBuy.sku,
                'barcode': StockBuy.barcode,
                'batch_number': StockBuy.batch_number,
                'serial_number': StockBuy.serial_number,
                'status': StockBuy.status,
                'inspection_status': StockBuy.inspection_status,
                'qty_inbound': StockBuy.in_quantity,
                'qty_stock': StockBuy.stock_quantity,
                'qty_available': StockBuy.available_quantity,
                'unit_price': StockBuy.pre_tax_unit_price,
                'total_price': StockBuy.total_price,
                'tax_rate': StockBuy.tax_rate,
                'currency': StockBuy.currency,
                'exchange_rate': StockBuy.exchange_rate,
                'supplier_name': StockBuy.supplier_name,
                'purchaser': StockBuy.buyer_name,
                'purchaser_email': StockBuy.buyer_email,
                'source_link': StockBuy.original_link,
                'detail_link': StockBuy.detail_link,
            }

        # 5. Advanced dynamic filters (whitelisted fields only).
        if advanced_filters:
            allowed_fields = dict(common_columns)
            allowed_fields['warehouse_location'] = StockBuy.warehouse_location

            filter_conditions = []
            for condition in advanced_filters:
                field = condition.get('field')
                operator = condition.get('operator')
                value = condition.get('value')
                if not field or not operator or value is None:
                    continue
                column = allowed_fields.get(field)
                if column is None:
                    continue

                if operator == 'eq':
                    filter_conditions.append(column == value)
                elif operator == 'ne':
                    filter_conditions.append(column != value)
                elif operator == 'contains':
                    filter_conditions.append(column.ilike(f'%{value}%'))
                elif operator == 'not_contains':
                    filter_conditions.append(~column.ilike(f'%{value}%'))
                elif operator in ('ge', 'le'):
                    try:
                        num_val = float(value)
                    except (TypeError, ValueError):
                        # Not a number (including lists/dicts): skip just
                        # this condition instead of failing the whole query.
                        continue
                    if operator == 'ge':
                        filter_conditions.append(column >= num_val)
                    else:
                        filter_conditions.append(column <= num_val)

            if filter_conditions:
                query = query.filter(and_(*filter_conditions))

        # 6. Sorting.
        sort_clause = None
        if order_by:
            sort_field_map = dict(common_columns)
            sort_field_map['inbound_date'] = StockBuy.in_date
            sort_field_map['warehouse_loc'] = StockBuy.warehouse_location

            column = sort_field_map.get(order_by)
            # Compare with None explicitly: SQL expression objects are not
            # meant to be truth-tested.
            if column is not None:
                if is_asc == 'asc':
                    sort_clause = column.asc()
                elif is_asc == 'desc':
                    sort_clause = column.desc()
        if sort_clause is None:
            # Default order; also used for an unknown sort key or direction
            # so that pagination never runs over an unordered query.
            sort_clause = StockBuy.in_date.desc()
        query = query.order_by(sort_clause)

        pagination = query.paginate(page=page, per_page=limit, error_out=False)
        # Serialisation is delegated to the model's own to_dict().
        items = [record.to_dict() for record in pagination.items]
        return {"total": pagination.total, "items": items}
    except Exception:
        # Best-effort endpoint: log and hand back an empty result.
        traceback.print_exc()
        return {"total": 0, "items": []}
# ============================================================
# 6. 获取筛选选项(类别、类型、公司)并排序
# ============================================================
@staticmethod
def get_filter_options():
    """Return the sorted distinct filter values available on base materials.

    Returns:
        A dict with ``categories``, ``types`` and ``companies``, each a
        sorted list of the distinct non-NULL, non-empty values of the
        corresponding ``MaterialBase`` column. On any error the traceback is
        printed and all three lists are returned empty.
    """
    def _sorted_distinct(column):
        # `!= None` is intentional: SQLAlchemy renders it as IS NOT NULL.
        rows = (
            db.session.query(column)
            .filter(column != None, column != '')  # noqa: E711
            .distinct()
            .all()
        )
        return sorted([row[0] for row in rows])

    try:
        # Evaluated in order: category, material type, company.
        return {
            "categories": _sorted_distinct(MaterialBase.category),
            "types": _sorted_distinct(MaterialBase.material_type),
            "companies": _sorted_distinct(MaterialBase.company_name),
        }
    except Exception:
        traceback.print_exc()
        return {"categories": [], "types": [], "companies": []}
# 7-10 建议类接口保持不变
@staticmethod
def get_history_suppliers(base_id):
    """Return the distinct non-empty supplier names recorded for a material.

    Args:
        base_id: Id of the base material whose purchase history is searched.

    Returns:
        A list of supplier-name strings.
    """
    supplier_rows = (
        db.session.query(StockBuy.supplier_name)
        .filter(StockBuy.base_id == base_id, StockBuy.supplier_name != '')
        .distinct()
        .all()
    )
    # Each row is a one-column result; unwrap it.
    return [row[0] for row in supplier_rows]
@staticmethod
def get_history_purchasers(keyword):
    """Return distinct purchasers seen on past purchase-inbound records.

    Args:
        keyword: Optional search text. When it is a non-blank string, only
            purchasers whose name contains it (case-insensitive) are
            returned; otherwise every recorded purchaser is returned.

    Returns:
        A list of ``{'value': <name>, 'email': <email>}`` dicts.
    """
    query = db.session.query(StockBuy.buyer_name, StockBuy.buyer_email).filter(
        StockBuy.buyer_name != '')

    # The keyword used to be accepted but ignored; apply it so the
    # suggestion list actually narrows as the user types.
    term = keyword.strip() if isinstance(keyword, str) else ''
    if term:
        query = query.filter(StockBuy.buyer_name.ilike(f'%{term}%'))

    return [{'value': row.buyer_name, 'email': row.buyer_email}
            for row in query.distinct().all()]
@staticmethod
def get_history_links(base_id, type):
    """Return the distinct links previously recorded for a material.

    Args:
        base_id: Id of the base material whose purchase history is searched.
        type: ``'original'`` selects the original (source) link column; any
            other value selects the detail link column. The name shadows the
            builtin but is kept for caller compatibility.

    Returns:
        A list of non-empty link strings.
    """
    link_column = StockBuy.original_link if type == 'original' else StockBuy.detail_link
    rows = (
        db.session.query(link_column)
        .filter(
            StockBuy.base_id == base_id,
            # Skip records that never had a link, so the suggestion list
            # carries no NULL or blank entries.
            link_column != None,  # noqa: E711 - rendered as IS NOT NULL
            link_column != '',
        )
        .distinct()
        .all()
    )
    return [row[0] for row in rows]
@staticmethod
def get_history_locations(base_id):
    """Return the distinct warehouse locations previously used for a material.

    Args:
        base_id: Id of the base material whose purchase history is searched.

    Returns:
        A list of non-empty warehouse-location strings.
    """
    rows = (
        db.session.query(StockBuy.warehouse_location)
        .filter(
            StockBuy.base_id == base_id,
            # Skip records stored without a location, so the suggestion
            # list carries no NULL or blank entries.
            StockBuy.warehouse_location != None,  # noqa: E711 - IS NOT NULL
            StockBuy.warehouse_location != '',
        )
        .distinct()
        .all()
    )
    return [row[0] for row in rows]