Files
KCGL/inventory-backend/app/services/inbound/buy_service.py

585 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# inventory-backend/app/services/inbound/buy_service.py
from flask_jwt_extended import get_jwt
from app.extensions import db
from app.models.inbound.buy import StockBuy
from app.models.inbound.product import StockProduct
from app.models.base import MaterialBase
from datetime import datetime, timedelta, timezone
from sqlalchemy import or_, func, text, and_
from sqlalchemy.exc import IntegrityError
import traceback
import json
class BuyInboundService:
    """Service layer for purchase-inbound stock records (``StockBuy``).

    Every method is a ``@staticmethod``; the class is only a namespace.
    Write operations (create / update / delete) commit on success and roll
    the session back before re-raising on failure.  Business-rule violations
    are reported as ``ValueError`` with a user-facing message.
    """

    # ============================================================
    # 0. Helpers
    # ============================================================
    @staticmethod
    def _check_unique(base_id, serial_number, batch_number, exclude_id=None):
        """Raise ``ValueError`` if the serial or batch number is already taken.

        * A serial number must be unique across all ``StockBuy`` rows.
        * A batch number must be unique within one base material.

        ``exclude_id`` is the id of the row being edited, so that a record
        does not collide with itself.
        """
        if serial_number:
            query = StockBuy.query.filter(StockBuy.serial_number == serial_number)
            if exclude_id is not None:
                query = query.filter(StockBuy.id != exclude_id)
            exists = query.first()
            if exists:
                occupied_name = exists.base.name if exists.base else "未知物料"
                raise ValueError(f"序列号【{serial_number}】已存在!被物料 [{occupied_name}] 占用,请核查。")
        if batch_number and base_id:
            query = StockBuy.query.filter(
                StockBuy.base_id == base_id,
                StockBuy.batch_number == batch_number,
            )
            if exclude_id is not None:
                query = query.filter(StockBuy.id != exclude_id)
            if query.first():
                raise ValueError(f"该物料已存在批号【{batch_number}】,请勿重复录入,可直接在该批次下追加库存。")

    @staticmethod
    def _valid_reports(reports):
        """Return the entries of ``reports`` that are non-empty after stripping.

        ``None`` (or any other falsy value) is treated as an empty list so the
        callers never iterate over ``None``.
        """
        return [r for r in (reports or []) if r and str(r).strip()]

    @staticmethod
    def _require_inspection(material, inspection_status, reports):
        """Enforce the mandatory-inspection rule for ``material``.

        Raises ``ValueError`` when the inspection status is missing or when
        ``reports`` holds no usable entry (file or external link).
        """
        if not inspection_status:
            raise ValueError(f"物料【{material.name}】为强管控物料,必须选择到检状态")
        if not BuyInboundService._valid_reports(reports):
            raise ValueError(f"物料【{material.name}】为强管控物料,必须提供检测报告文件或外部链接")

    @staticmethod
    def _parse_in_date(raw, now):
        """Turn the submitted inbound date into a ``datetime``.

        * empty value                 -> ``now``
        * longer than 10 characters   -> parsed as ``YYYY-MM-DD HH:MM:SS``
        * otherwise                   -> parsed as ``YYYY-MM-DD`` and combined
          with the hour/minute/second of ``now``
        * unparseable text            -> ``now`` (best-effort, never raises)
        """
        if not raw:
            return now
        date_str = str(raw)
        try:
            if len(date_str) > 10:
                return datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
            day = datetime.strptime(date_str, '%Y-%m-%d')
        except ValueError:
            return now
        return day.replace(hour=now.hour, minute=now.minute, second=now.second)

    @staticmethod
    def _list_columns():
        """Columns addressable by name from both the filter and the sort API."""
        return {
            'company_name': MaterialBase.company_name,
            'material_name': MaterialBase.name,
            'material_type': MaterialBase.material_type,
            'category': MaterialBase.category,
            'spec_model': MaterialBase.spec_model,
            'unit': MaterialBase.unit,
            'sku': StockBuy.sku,
            'barcode': StockBuy.barcode,
            'batch_number': StockBuy.batch_number,
            'serial_number': StockBuy.serial_number,
            'status': StockBuy.status,
            'inspection_status': StockBuy.inspection_status,
            'qty_inbound': StockBuy.in_quantity,
            'qty_stock': StockBuy.stock_quantity,
            'qty_available': StockBuy.available_quantity,
            'unit_price': StockBuy.pre_tax_unit_price,
            'total_price': StockBuy.total_price,
            'tax_rate': StockBuy.tax_rate,
            'currency': StockBuy.currency,
            'exchange_rate': StockBuy.exchange_rate,
            'supplier_name': StockBuy.supplier_name,
            'purchaser': StockBuy.buyer_name,
            'purchaser_email': StockBuy.buyer_email,
            'source_link': StockBuy.original_link,
            'detail_link': StockBuy.detail_link,
        }

    @staticmethod
    def _advanced_conditions(advanced_filters):
        """Translate ``[{field, operator, value}, ...]`` into SQL conditions.

        Unknown fields, unknown operators, incomplete entries and non-numeric
        values for ``ge`` / ``le`` are skipped rather than rejected.
        """
        allowed_fields = BuyInboundService._list_columns()
        allowed_fields['warehouse_location'] = StockBuy.warehouse_location

        conditions = []
        for condition in advanced_filters:
            field = condition.get('field')
            operator = condition.get('operator')
            value = condition.get('value')
            if not field or not operator or value is None:
                continue
            column = allowed_fields.get(field)
            if column is None:
                continue
            if operator == 'eq':
                conditions.append(column == value)
            elif operator == 'ne':
                conditions.append(column != value)
            elif operator == 'contains':
                conditions.append(column.ilike(f'%{value}%'))
            elif operator == 'not_contains':
                conditions.append(~column.ilike(f'%{value}%'))
            elif operator in ('ge', 'le'):
                try:
                    num_val = float(value)
                except (TypeError, ValueError):
                    # Not a number (or not even a scalar): ignore this filter.
                    continue
                conditions.append(column >= num_val if operator == 'ge' else column <= num_val)
        return conditions

    # ============================================================
    # 1. Base-material search
    # ============================================================
    @staticmethod
    def search_base_material(keyword, page=1, limit=50):
        """Search enabled base materials by name, spec/model or company name.

        Returns ``{"items", "total", "page", "has_next"}``.  Any error is
        printed and answered with an empty first page.
        """
        try:
            query = MaterialBase.query.filter(MaterialBase.is_enabled == True)
            if keyword:
                pattern = f'%{keyword.strip()}%'
                query = query.filter(or_(
                    MaterialBase.name.ilike(pattern),
                    MaterialBase.spec_model.ilike(pattern),
                    MaterialBase.company_name.ilike(pattern),  # company is searchable too
                ))
            query = query.order_by(MaterialBase.id.desc())
            pagination = query.paginate(page=page, per_page=limit, error_out=False)
            items = [
                {
                    'id': item.id,
                    'company_name': item.company_name,
                    'name': item.name,
                    'spec': item.spec_model,
                    'category': item.category,
                    'unit': item.unit,
                    'type': item.material_type,
                    # These attributes may be absent on the model, hence getattr.
                    'brand': getattr(item, 'brand', ''),
                    'manufacturer': getattr(item, 'manufacturer', ''),
                    'pinyin': getattr(item, 'pinyin', ''),
                    # Only enabled materials are queried, so the label is constant.
                    'status': '启用',
                }
                for item in pagination.items
            ]
            return {
                "items": items,
                "total": pagination.total,
                "page": page,
                "has_next": pagination.has_next,
            }
        except Exception:
            traceback.print_exc()
            return {"items": [], "total": 0, "page": 1, "has_next": False}

    # ============================================================
    # 2. Create an inbound record
    # ============================================================
    @staticmethod
    def handle_inbound(data):
        """Create and commit a new ``StockBuy`` row from the request payload.

        Raises ``ValueError`` for business-rule violations (missing/disabled
        material, mandatory inspection data missing, duplicate serial/batch).
        Any exception rolls the session back and is re-raised.
        """
        try:
            base_id = data.get('base_id')
            if not base_id:
                raise ValueError("必须选择基础物料")
            material = MaterialBase.query.get(base_id)
            if not material:
                raise ValueError("所选物料不存在")
            if not material.is_enabled:
                raise ValueError(f"物料【{material.name}】已停用")

            # Mandatory inspection: status plus at least one report are required.
            # The front end puts external links into the same list as files.
            if material.is_inspection_required:
                BuyInboundService._require_inspection(
                    material, data.get('inspection_status'), data.get('inspection_report')
                )

            BuyInboundService._check_unique(
                base_id=base_id,
                serial_number=data.get('serial_number'),
                batch_number=data.get('batch_number'),
            )

            # Timestamps are stored as naive datetimes in UTC+8.
            beijing_tz = timezone(timedelta(hours=8))
            current_time = datetime.now(beijing_tz).replace(tzinfo=None)
            in_date_val = BuyInboundService._parse_in_date(data.get('in_date'), current_time)

            in_qty = float(data.get('in_quantity') or 0)
            u_price = float(data.get('unit_price') or 0)
            tax_rate = float(data.get('tax_rate') or 0)

            # Derive the after-tax unit price when the client did not send one.
            post_tax_price = float(data.get('post_tax_unit_price') or 0)
            if post_tax_price == 0 and u_price > 0:
                post_tax_price = u_price * (1 + tax_rate / 100)

            # The SKU comes from a global DB sequence; fall back to a timestamp.
            # NOTE(review): on PostgreSQL a failed statement aborts the current
            # transaction, so the later commit would fail anyway - confirm
            # whether this fallback is reachable on the production database.
            try:
                next_global_id = db.session.execute(text("SELECT nextval('global_print_seq')")).scalar()
            except Exception:
                traceback.print_exc()
                next_global_id = None
            if next_global_id:
                generated_sku = str(next_global_id).zfill(10)
            else:
                # Same UTC+8 clock as every other timestamp in this method.
                generated_sku = current_time.strftime('%Y%m%d%H%M%S')
            final_barcode = data.get('barcode') or generated_sku

            new_stock = StockBuy(
                base_id=material.id,
                global_print_id=next_global_id,
                sku=generated_sku,
                barcode=final_barcode,
                in_date=in_date_val,
                serial_number=data.get('serial_number'),
                batch_number=data.get('batch_number'),
                status=data.get('status', '在库'),
                in_quantity=in_qty,
                stock_quantity=in_qty,
                available_quantity=in_qty,
                inspection_status=data.get('inspection_status', '未检'),
                warehouse_location=data.get('warehouse_location'),
                # Pricing
                pre_tax_unit_price=u_price,
                post_tax_unit_price=post_tax_price,
                tax_rate=tax_rate,
                total_price=in_qty * u_price,
                currency=data.get('currency', 'CNY'),
                exchange_rate=data.get('exchange_rate', 1.0),
                supplier_name=data.get('supplier_name'),
                buyer_name=data.get('purchaser'),
                buyer_email=data.get('purchaser_email'),
                original_link=data.get('source_link'),
                detail_link=data.get('detail_link'),
                # Attachment lists are stored as JSON text; null becomes [].
                arrival_photo=json.dumps(data.get('arrival_photo') or []),
                inspection_report=json.dumps(data.get('inspection_report') or []),
            )
            db.session.add(new_stock)
            db.session.commit()
            return new_stock
        except Exception:
            db.session.rollback()
            raise

    # ============================================================
    # 3. Update an inbound record
    # ============================================================
    @staticmethod
    def update_inbound(stock_id, data):
        """Apply a partial update to an existing ``StockBuy`` row and commit.

        Only keys present in ``data`` are changed.  Raises ``ValueError`` when
        the record or a newly selected material does not exist, when the
        mandatory-inspection rule is violated, or on a serial/batch collision.
        Any exception rolls the session back and is re-raised.
        """
        try:
            stock = StockBuy.query.get(stock_id)
            if not stock:
                raise ValueError("记录不存在")

            base_id = data.get('base_id', stock.base_id)
            material = MaterialBase.query.get(base_id)
            # Same guard as on creation: a newly chosen material must exist.
            if 'base_id' in data and material is None:
                raise ValueError("所选物料不存在")

            # An explicit null is treated like "not supplied" so that it can
            # neither bypass the check below nor wipe the stored reports.
            new_reports = data.get('inspection_report')

            if material and material.is_inspection_required:
                reports = new_reports
                if reports is None:
                    # Nothing submitted: validate against what is already stored.
                    try:
                        reports = json.loads(stock.inspection_report) if stock.inspection_report else []
                    except (TypeError, ValueError):
                        reports = []
                BuyInboundService._require_inspection(
                    material, data.get('inspection_status', stock.inspection_status), reports
                )

            BuyInboundService._check_unique(
                base_id=base_id,
                serial_number=data.get('serial_number', stock.serial_number),
                batch_number=data.get('batch_number', stock.batch_number),
                exclude_id=stock_id,
            )

            # payload key -> model attribute
            field_mapping = {
                'sku': 'sku', 'barcode': 'barcode', 'base_id': 'base_id',
                'warehouse_location': 'warehouse_location', 'serial_number': 'serial_number',
                'batch_number': 'batch_number', 'status': 'status',
                'inspection_status': 'inspection_status', 'supplier_name': 'supplier_name',
                'detail_link': 'detail_link', 'currency': 'currency', 'exchange_rate': 'exchange_rate',
                'purchaser': 'buyer_name', 'purchaser_email': 'buyer_email',
                'source_link': 'original_link',
            }
            for payload_key, attr in field_mapping.items():
                if payload_key in data:
                    setattr(stock, attr, data[payload_key])

            if 'arrival_photo' in data:
                stock.arrival_photo = json.dumps(data['arrival_photo'])
            if new_reports is not None:
                stock.inspection_report = json.dumps(new_reports)

            if 'tax_rate' in data:
                stock.tax_rate = float(data['tax_rate'])
            if 'unit_price' in data:
                stock.pre_tax_unit_price = float(data['unit_price'])
            if 'post_tax_unit_price' in data:
                stock.post_tax_unit_price = float(data['post_tax_unit_price'])
            elif 'unit_price' in data or 'tax_rate' in data:
                # After-tax price not supplied: recompute it from the (possibly
                # just updated) pre-tax price and tax rate.
                tax_multiplier = 1 + float(stock.tax_rate or 0) / 100
                stock.post_tax_unit_price = float(stock.pre_tax_unit_price or 0) * tax_multiplier

            if 'in_quantity' in data:
                new_qty = float(data['in_quantity'])
                diff = new_qty - float(stock.in_quantity or 0)
                if diff != 0:
                    # Shift on-hand and available stock by the same delta.
                    # NOTE(review): nothing prevents these from going negative.
                    stock.in_quantity = new_qty
                    stock.stock_quantity = float(stock.stock_quantity or 0) + diff
                    stock.available_quantity = float(stock.available_quantity or 0) + diff

            # Total is always recomputed from quantity and pre-tax unit price.
            stock.total_price = float(stock.in_quantity or 0) * float(stock.pre_tax_unit_price or 0)
            db.session.commit()
            return stock
        except Exception:
            db.session.rollback()
            raise

    # ============================================================
    # 4. Delete
    # ============================================================
    @staticmethod
    def delete_inbound(stock_id):
        """Delete a ``StockBuy`` row and return its material name.

        The name is read before deletion so the caller can use it afterwards
        (the original comment mentions audit logging).  An ``IntegrityError``
        on commit is converted into a ``ValueError`` explaining that the
        record is referenced by other business data.
        """
        try:
            stock = StockBuy.query.get(stock_id)
            if not stock:
                raise ValueError("记录不存在")
            material_name = stock.base.name if stock.base else '未知物料'
            db.session.delete(stock)
            db.session.commit()
            return material_name
        except IntegrityError:
            db.session.rollback()
            raise ValueError("该入库单已被出库、盘点或借还等业务关联,为保证账目完整,禁止删除!")
        except Exception:
            db.session.rollback()
            raise

    # ============================================================
    # 5. Paginated list (search, filters, sorting)
    # ============================================================
    @staticmethod
    def get_list(page, limit, keyword=None, sku=None, search_field='all', statuses=None, category=None,
                 material_type=None, company=None, order_by='', is_asc='', advanced_filters=None):
        """Return one page of inbound records as ``{"total", "items"}``.

        Row-level isolation is based on the JWT claims ``role`` and
        ``company_name``: a non-``SUPER_ADMIN`` user asking for another
        company is rejected with HTTP 403; otherwise the query is limited to
        the user's own company.  A super admin may filter by any company.

        Apart from that 403, every error is printed and answered with an
        empty result.
        """
        # --- Authorisation ------------------------------------------------
        # Decided in its own guarded step so that the abort below is raised
        # OUTSIDE any ``except Exception`` (the HTTP exception raised by
        # ``abort`` would otherwise be swallowed and turned into an empty
        # 200 response).
        try:
            claims = get_jwt()
            user_role = (claims.get('role') or '').upper()
            user_company = claims.get('company_name', '')
            requested_company = company.strip() if company else ''
            is_super_admin = user_role == 'SUPER_ADMIN'
            denied = (not is_super_admin) and bool(requested_company) and requested_company != user_company
        except Exception:
            traceback.print_exc()
            return {"total": 0, "items": []}
        if denied:
            from flask import abort
            abort(403, description=f'越权访问:您无权查询 {company} 的数据')

        try:
            query = db.session.query(StockBuy).outerjoin(MaterialBase, StockBuy.base_id == MaterialBase.id)

            # 1. Keyword search, either on one chosen field or across several.
            if keyword:
                pattern = f'%{keyword.strip()}%'
                single_field = {
                    'name': MaterialBase.name,
                    'spec': MaterialBase.spec_model,
                    'common_name': MaterialBase.common_name,
                    'barcode': StockBuy.barcode,
                    'batch_number': StockBuy.batch_number,
                    'supplier_name': StockBuy.supplier_name,
                    'buyer_name': StockBuy.buyer_name,
                }.get(search_field)
                if single_field is not None:
                    query = query.filter(single_field.ilike(pattern))
                else:
                    # 'all' (or anything unrecognised): fuzzy match everywhere.
                    query = query.filter(or_(
                        StockBuy.barcode.ilike(pattern),
                        StockBuy.batch_number.ilike(pattern),
                        StockBuy.serial_number.ilike(pattern),
                        StockBuy.supplier_name.ilike(pattern),
                        StockBuy.buyer_name.ilike(pattern),
                        MaterialBase.name.ilike(pattern),
                        MaterialBase.spec_model.ilike(pattern),
                        MaterialBase.company_name.ilike(pattern),
                    ))

            # 1.1 SKU has its own search box.
            if sku and sku.strip():
                query = query.filter(StockBuy.sku.ilike(f'%{sku.strip()}%'))
            # 2. Category (exact match)
            if category and category.strip():
                query = query.filter(MaterialBase.category == category.strip())
            # 3. Material type (exact match)
            if material_type and material_type.strip():
                query = query.filter(MaterialBase.material_type == material_type.strip())

            # Row-level isolation by company.
            if is_super_admin:
                if requested_company:
                    query = query.filter(MaterialBase.company_name == requested_company)
            elif user_company:
                query = query.filter(MaterialBase.company_name == user_company)
            # NOTE(review): a non-admin token WITHOUT a company_name claim gets
            # no company filter at all - confirm this is intended.

            # 4. Status filter; in-stock / on-loan by default.  Rows with zero
            #    stock are hidden unless shipped-out records are requested.
            if not statuses:
                statuses = ['在库', '借库']
            if '已出库' in statuses:
                query = query.filter(StockBuy.status.in_(statuses))
            else:
                query = query.filter(and_(StockBuy.status.in_(statuses), StockBuy.stock_quantity > 0))

            # 5. Advanced dynamic filters
            if advanced_filters:
                filter_conditions = BuyInboundService._advanced_conditions(advanced_filters)
                if filter_conditions:
                    query = query.filter(and_(*filter_conditions))

            # 6. Sorting.  Falls back to newest-first whenever the request
            #    does not yield a usable ordering, so pagination stays stable.
            ordering = None
            if order_by:
                sort_field_map = BuyInboundService._list_columns()
                sort_field_map['inbound_date'] = StockBuy.in_date
                sort_field_map['warehouse_loc'] = StockBuy.warehouse_location
                column = sort_field_map.get(order_by)
                if column is not None:
                    if is_asc == 'asc':
                        ordering = column.asc()
                    elif is_asc == 'desc':
                        ordering = column.desc()
            if ordering is None:
                ordering = StockBuy.in_date.desc()
            query = query.order_by(ordering)

            pagination = query.paginate(page=page, per_page=limit, error_out=False)
            items = [item.to_dict() for item in pagination.items]
            return {"total": pagination.total, "items": items}
        except Exception:
            traceback.print_exc()
            return {"total": 0, "items": []}

    # ============================================================
    # 6. Filter options (categories, types, companies), sorted
    # ============================================================
    @staticmethod
    def get_filter_options():
        """Return the distinct, sorted category / type / company values.

        NULL and empty-string values are excluded.  On any error the
        traceback is printed and three empty lists are returned.
        """
        def distinct_sorted(column):
            rows = db.session.query(column) \
                .filter(column.isnot(None), column != '') \
                .distinct().all()
            return sorted(row[0] for row in rows)

        try:
            return {
                "categories": distinct_sorted(MaterialBase.category),
                "types": distinct_sorted(MaterialBase.material_type),
                "companies": distinct_sorted(MaterialBase.company_name),
            }
        except Exception:
            traceback.print_exc()
            return {"categories": [], "types": [], "companies": []}

    # ============================================================
    # 7-10. Suggestion endpoints
    # ============================================================
    @staticmethod
    def get_history_suppliers(base_id):
        """Distinct non-empty supplier names previously used for a material."""
        rows = db.session.query(StockBuy.supplier_name).filter(
            StockBuy.base_id == base_id,
            StockBuy.supplier_name != '',
        ).distinct().all()
        return [row[0] for row in rows]

    @staticmethod
    def get_history_purchasers(keyword):
        """Distinct (buyer name, buyer email) pairs as ``{'value', 'email'}``.

        NOTE(review): ``keyword`` is accepted but not used to filter the
        result - confirm whether filtering is expected to happen client-side.
        """
        rows = db.session.query(StockBuy.buyer_name, StockBuy.buyer_email).filter(
            StockBuy.buyer_name != '',
        ).distinct().all()
        return [{'value': row.buyer_name, 'email': row.buyer_email} for row in rows]

    @staticmethod
    def get_history_links(base_id, type):
        """Distinct links previously used for a material.

        ``type == 'original'`` selects ``original_link``; any other value
        selects ``detail_link``.
        """
        link_column = StockBuy.original_link if type == 'original' else StockBuy.detail_link
        rows = db.session.query(link_column).filter(StockBuy.base_id == base_id).distinct().all()
        return [row[0] for row in rows]

    @staticmethod
    def get_history_locations(base_id):
        """Distinct warehouse locations previously used for a material."""
        rows = db.session.query(StockBuy.warehouse_location).filter(
            StockBuy.base_id == base_id,
        ).distinct().all()
        return [row[0] for row in rows]

    @staticmethod
    def get_last_location_by_base_id(base_id):
        """Return the warehouse location of the material's most recent inbound.

        Looks at the newest row of each of three tables - purchase inbound
        (by ``in_date``), finished product and semi-finished product (both by
        ``production_date``) - and returns the location of whichever is
        latest.  Rows without a location are ignored; returns ``""`` when no
        candidate remains.

        NOTE(review): the three timestamps are compared directly, which
        assumes the columns hold the same Python type - confirm on the models.
        """
        # Imported here as in the original code (presumably to avoid a
        # circular import - verify).
        from app.models.inbound.semi import StockSemi

        last_buy = StockBuy.query.filter(
            StockBuy.base_id == base_id
        ).order_by(StockBuy.in_date.desc()).first()
        last_product = StockProduct.query.filter(
            StockProduct.base_id == base_id
        ).order_by(StockProduct.production_date.desc()).first()
        last_semi = StockSemi.query.filter(
            StockSemi.base_id == base_id
        ).order_by(StockSemi.production_date.desc()).first()

        # (timestamp, location) for every table that has a located record.
        candidates = []
        if last_buy and last_buy.warehouse_location:
            candidates.append((last_buy.in_date, last_buy.warehouse_location))
        if last_product and last_product.warehouse_location:
            candidates.append((last_product.production_date, last_product.warehouse_location))
        if last_semi and last_semi.warehouse_location:
            candidates.append((last_semi.production_date, last_semi.warehouse_location))
        if not candidates:
            return ""

        # A missing timestamp sorts as the oldest possible value.
        newest = max(candidates, key=lambda pair: pair[0] if pair[0] else datetime.min)
        return newest[1]