581 lines
28 KiB
Python
581 lines
28 KiB
Python
# inventory-backend/app/services/inbound/buy_service.py
|
|
from flask_jwt_extended import get_jwt
|
|
from app.extensions import db
|
|
from app.models.inbound.buy import StockBuy
|
|
from app.models.inbound.product import StockProduct
|
|
from app.models.base import MaterialBase
|
|
from datetime import datetime, timedelta, timezone
|
|
from sqlalchemy import or_, func, text, and_
|
|
from sqlalchemy.exc import IntegrityError
|
|
import traceback
|
|
import json
|
|
|
|
|
|
class BuyInboundService:
|
|
|
|
# ============================================================
|
|
# 0. 辅助:唯一性校验
|
|
# ============================================================
|
|
@staticmethod
|
|
def _check_unique(base_id, serial_number, batch_number, exclude_id=None):
|
|
if serial_number:
|
|
query = StockBuy.query.filter(StockBuy.serial_number == serial_number)
|
|
if exclude_id:
|
|
query = query.filter(StockBuy.id != exclude_id)
|
|
exists = query.first()
|
|
if exists:
|
|
occupied_name = exists.base.name if exists.base else "未知物料"
|
|
raise ValueError(f"序列号【{serial_number}】已存在!被物料 [{occupied_name}] 占用,请核查。")
|
|
|
|
if batch_number and base_id:
|
|
query = StockBuy.query.filter(
|
|
StockBuy.base_id == base_id,
|
|
StockBuy.batch_number == batch_number
|
|
)
|
|
if exclude_id:
|
|
query = query.filter(StockBuy.id != exclude_id)
|
|
if query.first():
|
|
raise ValueError(f"该物料已存在批号【{batch_number}】,请勿重复录入,可直接在该批次下追加库存。")
|
|
|
|
# ============================================================
|
|
# 1. 基础物料搜索
|
|
# ============================================================
|
|
@staticmethod
|
|
def search_base_material(keyword, page=1, limit=50):
|
|
try:
|
|
query = MaterialBase.query.filter(MaterialBase.is_enabled == True)
|
|
|
|
if keyword:
|
|
k = keyword.strip()
|
|
k_str = f'%{k}%'
|
|
query = query.filter(and_(
|
|
or_(
|
|
MaterialBase.name.ilike(k_str),
|
|
MaterialBase.spec_model.ilike(k_str),
|
|
MaterialBase.company_name.ilike(k_str) # 支持搜公司
|
|
)
|
|
))
|
|
|
|
query = query.order_by(MaterialBase.id.desc())
|
|
pagination = query.paginate(page=page, per_page=limit, error_out=False)
|
|
|
|
items = []
|
|
for item in pagination.items:
|
|
items.append({
|
|
'id': item.id,
|
|
'company_name': item.company_name, # [新增]
|
|
'name': item.name,
|
|
'spec': item.spec_model,
|
|
'category': item.category,
|
|
'unit': item.unit,
|
|
'type': item.material_type,
|
|
'brand': getattr(item, 'brand', ''),
|
|
'manufacturer': getattr(item, 'manufacturer', ''),
|
|
'pinyin': getattr(item, 'pinyin', ''),
|
|
'status': '启用'
|
|
})
|
|
|
|
return {
|
|
"items": items,
|
|
"total": pagination.total,
|
|
"page": page,
|
|
"has_next": pagination.has_next
|
|
}
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
return {"items": [], "total": 0, "page": 1, "has_next": False}
|
|
|
|
# ============================================================
|
|
# 2. 新增入库逻辑
|
|
# ============================================================
|
|
@staticmethod
|
|
def handle_inbound(data):
|
|
try:
|
|
base_id = data.get('base_id')
|
|
if not base_id: raise ValueError("必须选择基础物料")
|
|
material = MaterialBase.query.get(base_id)
|
|
if not material: raise ValueError("所选物料不存在")
|
|
if not material.is_enabled: raise ValueError(f"物料【{material.name}】已停用")
|
|
|
|
# ============================================================
|
|
# 强制质检校验:如果物料标记为强制质检,则必须提供到检状态和检测报告
|
|
# ============================================================
|
|
if material.is_inspection_required:
|
|
inspection_status = data.get('inspection_status')
|
|
if not inspection_status:
|
|
raise ValueError(f"物料【{material.name}】为强管控物料,必须选择到检状态")
|
|
|
|
# 检查检测报告:文件列表或外部链接至少有一个
|
|
# 前端会将外部链接添加到 inspection_report 数组中一起提交
|
|
inspection_report_list = data.get('inspection_report', [])
|
|
# 过滤空字符串,只保留有效报告(字符串长度 > 0 且去除空格后非空)
|
|
valid_reports = [r for r in inspection_report_list if r and str(r).strip()]
|
|
has_report_file = valid_reports and len(valid_reports) > 0
|
|
|
|
if not has_report_file:
|
|
raise ValueError(f"物料【{material.name}】为强管控物料,必须提供检测报告文件或外部链接")
|
|
|
|
BuyInboundService._check_unique(
|
|
base_id=base_id,
|
|
serial_number=data.get('serial_number'),
|
|
batch_number=data.get('batch_number')
|
|
)
|
|
|
|
beijing_tz = timezone(timedelta(hours=8))
|
|
current_time = datetime.now(beijing_tz).replace(tzinfo=None)
|
|
in_date_val = current_time
|
|
if data.get('in_date'):
|
|
try:
|
|
date_str = str(data['in_date'])
|
|
if len(date_str) > 10:
|
|
in_date_val = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
|
|
else:
|
|
d_temp = datetime.strptime(date_str, '%Y-%m-%d')
|
|
in_date_val = datetime(d_temp.year, d_temp.month, d_temp.day, current_time.hour,
|
|
current_time.minute, current_time.second)
|
|
except:
|
|
in_date_val = current_time
|
|
|
|
in_qty = float(data.get('in_quantity') or 0)
|
|
u_price = float(data.get('unit_price') or 0)
|
|
tax_rate = float(data.get('tax_rate') or 0)
|
|
|
|
# 计算税后单价
|
|
post_tax_price = float(data.get('post_tax_unit_price') or 0)
|
|
if post_tax_price == 0 and u_price > 0:
|
|
tax_multiplier = 1 + (tax_rate / 100)
|
|
post_tax_price = u_price * tax_multiplier
|
|
|
|
try:
|
|
seq_sql = text("SELECT nextval('global_print_seq')")
|
|
result = db.session.execute(seq_sql)
|
|
next_global_id = result.scalar()
|
|
except:
|
|
next_global_id = None
|
|
|
|
generated_sku = str(next_global_id).zfill(10) if next_global_id else datetime.now().strftime('%Y%m%d%H%M%S')
|
|
final_barcode = data.get('barcode') or generated_sku
|
|
|
|
new_stock = StockBuy(
|
|
base_id=material.id, global_print_id=next_global_id, sku=generated_sku, barcode=final_barcode,
|
|
in_date=in_date_val, serial_number=data.get('serial_number'), batch_number=data.get('batch_number'),
|
|
status=data.get('status', '在库'), in_quantity=in_qty, stock_quantity=in_qty, available_quantity=in_qty,
|
|
inspection_status=data.get('inspection_status', '未检'),
|
|
warehouse_location=data.get('warehouse_location'),
|
|
|
|
# 价格信息
|
|
pre_tax_unit_price=u_price,
|
|
post_tax_unit_price=post_tax_price,
|
|
tax_rate=tax_rate,
|
|
total_price=in_qty * u_price,
|
|
currency=data.get('currency', 'CNY'),
|
|
exchange_rate=data.get('exchange_rate', 1.0),
|
|
|
|
supplier_name=data.get('supplier_name'), buyer_name=data.get('purchaser'),
|
|
buyer_email=data.get('purchaser_email'),
|
|
original_link=data.get('source_link'), detail_link=data.get('detail_link'),
|
|
arrival_photo=json.dumps(data.get('arrival_photo', [])),
|
|
inspection_report=json.dumps(data.get('inspection_report', []))
|
|
)
|
|
db.session.add(new_stock)
|
|
db.session.commit()
|
|
return new_stock
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
raise e
|
|
|
|
# ============================================================
|
|
# 3. 更新入库
|
|
# ============================================================
|
|
@staticmethod
|
|
def update_inbound(stock_id, data):
|
|
try:
|
|
stock = StockBuy.query.get(stock_id)
|
|
if not stock: raise ValueError("记录不存在")
|
|
|
|
# 获取物料信息用于校验
|
|
base_id = data.get('base_id', stock.base_id)
|
|
material = MaterialBase.query.get(base_id)
|
|
|
|
# ============================================================
|
|
# 强制质检校验:如果物料标记为强制质检,则必须提供到检状态和检测报告
|
|
# ============================================================
|
|
if material and material.is_inspection_required:
|
|
inspection_status = data.get('inspection_status', stock.inspection_status)
|
|
if not inspection_status:
|
|
raise ValueError(f"物料【{material.name}】为强管控物料,必须选择到检状态")
|
|
|
|
# 检查检测报告:文件列表至少有一个
|
|
inspection_report_list = data.get('inspection_report')
|
|
if inspection_report_list is None:
|
|
# 如果没有传入,使用现有的
|
|
import json as json_module
|
|
try:
|
|
existing_reports = json_module.loads(stock.inspection_report) if stock.inspection_report else []
|
|
except:
|
|
existing_reports = []
|
|
# 过滤空字符串,只保留有效报告
|
|
valid_reports = [r for r in existing_reports if r and str(r).strip()]
|
|
has_report_file = valid_reports and len(valid_reports) > 0
|
|
else:
|
|
# 过滤空字符串,只保留有效报告
|
|
valid_reports = [r for r in inspection_report_list if r and str(r).strip()]
|
|
has_report_file = valid_reports and len(valid_reports) > 0
|
|
|
|
if not has_report_file:
|
|
raise ValueError(f"物料【{material.name}】为强管控物料,必须提供检测报告文件或外部链接")
|
|
|
|
BuyInboundService._check_unique(base_id=base_id,
|
|
serial_number=data.get('serial_number', stock.serial_number),
|
|
batch_number=data.get('batch_number', stock.batch_number),
|
|
exclude_id=stock_id)
|
|
|
|
field_mapping = {'sku': 'sku', 'barcode': 'barcode', 'base_id': 'base_id',
|
|
'warehouse_location': 'warehouse_location', 'serial_number': 'serial_number',
|
|
'batch_number': 'batch_number', 'status': 'status',
|
|
'inspection_status': 'inspection_status', 'supplier_name': 'supplier_name',
|
|
'detail_link': 'detail_link', 'currency': 'currency', 'exchange_rate': 'exchange_rate',
|
|
'purchaser': 'buyer_name', 'purchaser_email': 'buyer_email',
|
|
'source_link': 'original_link'}
|
|
for k, v in field_mapping.items():
|
|
if k in data: setattr(stock, v, data[k])
|
|
|
|
if 'arrival_photo' in data: stock.arrival_photo = json.dumps(data['arrival_photo'])
|
|
if 'inspection_report' in data: stock.inspection_report = json.dumps(data['inspection_report'])
|
|
|
|
# 更新税率
|
|
if 'tax_rate' in data:
|
|
stock.tax_rate = float(data['tax_rate'])
|
|
|
|
# 更新税前单价
|
|
if 'unit_price' in data:
|
|
stock.pre_tax_unit_price = float(data['unit_price'])
|
|
|
|
# 更新税后单价
|
|
if 'post_tax_unit_price' in data:
|
|
stock.post_tax_unit_price = float(data['post_tax_unit_price'])
|
|
else:
|
|
# 如果税后单价没有提供,根据税前单价和税率计算
|
|
if 'unit_price' in data or 'tax_rate' in data:
|
|
tax_multiplier = 1 + (float(data.get('tax_rate', stock.tax_rate or 0)) / 100)
|
|
stock.post_tax_unit_price = float(stock.pre_tax_unit_price) * tax_multiplier
|
|
|
|
if 'in_quantity' in data:
|
|
diff = float(data['in_quantity']) - float(stock.in_quantity)
|
|
if diff != 0:
|
|
stock.in_quantity = float(data['in_quantity'])
|
|
stock.stock_quantity = float(stock.stock_quantity) + diff
|
|
stock.available_quantity = float(stock.available_quantity) + diff
|
|
|
|
# 重新计算总价
|
|
stock.total_price = float(stock.in_quantity) * float(stock.pre_tax_unit_price)
|
|
db.session.commit()
|
|
return stock
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
raise e
|
|
|
|
# ============================================================
|
|
# 4. 删除
|
|
# ============================================================
|
|
@staticmethod
|
|
def delete_inbound(stock_id):
|
|
try:
|
|
stock = StockBuy.query.get(stock_id)
|
|
if not stock: raise ValueError("记录不存在")
|
|
# 提前获取物料名称用于审计日志(通过外键关系 base.name 获取)
|
|
material_name = stock.base.name if stock.base else '未知物料'
|
|
db.session.delete(stock)
|
|
db.session.commit()
|
|
return material_name
|
|
except IntegrityError:
|
|
db.session.rollback()
|
|
raise ValueError("该入库单已被出库、盘点或借还等业务关联,为保证账目完整,禁止删除!")
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
raise e
|
|
|
|
# ============================================================
|
|
# 5. 获取列表 (支持排序和高级筛选)
|
|
# ============================================================
|
|
@staticmethod
|
|
def get_list(page, limit, keyword=None, sku=None, search_field='all', statuses=None, category=None, material_type=None, company=None,
|
|
order_by='', is_asc='', advanced_filters=None):
|
|
try:
|
|
from sqlalchemy import and_, or_
|
|
query = db.session.query(StockBuy).outerjoin(MaterialBase, StockBuy.base_id == MaterialBase.id)
|
|
|
|
# 1. 通用关键词搜索(支持指定字段精准搜索)
|
|
if keyword:
|
|
k_str = f'%{keyword.strip()}%'
|
|
if search_field == 'name':
|
|
query = query.filter(MaterialBase.name.ilike(k_str))
|
|
elif search_field == 'spec':
|
|
query = query.filter(MaterialBase.spec_model.ilike(k_str))
|
|
elif search_field == 'common_name':
|
|
query = query.filter(MaterialBase.common_name.ilike(k_str))
|
|
elif search_field == 'barcode':
|
|
query = query.filter(StockBuy.barcode.ilike(k_str))
|
|
elif search_field == 'batch_number':
|
|
query = query.filter(StockBuy.batch_number.ilike(k_str))
|
|
elif search_field == 'supplier_name':
|
|
query = query.filter(StockBuy.supplier_name.ilike(k_str))
|
|
elif search_field == 'buyer_name':
|
|
query = query.filter(StockBuy.buyer_name.ilike(k_str))
|
|
else: # 'all' 默认全局模糊匹配
|
|
conditions = [
|
|
StockBuy.barcode.ilike(k_str),
|
|
StockBuy.batch_number.ilike(k_str),
|
|
StockBuy.serial_number.ilike(k_str),
|
|
StockBuy.supplier_name.ilike(k_str),
|
|
StockBuy.buyer_name.ilike(k_str),
|
|
MaterialBase.name.ilike(k_str),
|
|
MaterialBase.spec_model.ilike(k_str),
|
|
MaterialBase.company_name.ilike(k_str),
|
|
]
|
|
query = query.filter(or_(*conditions))
|
|
|
|
# 1.1 SKU 独立搜索
|
|
if sku and sku.strip():
|
|
sku_str = f'%{sku.strip()}%'
|
|
query = query.filter(StockBuy.sku.ilike(sku_str))
|
|
|
|
# 2. 类别独立搜索
|
|
if category and category.strip():
|
|
query = query.filter(MaterialBase.category == category.strip())
|
|
|
|
# 3. 类型独立搜索
|
|
if material_type and material_type.strip():
|
|
query = query.filter(MaterialBase.material_type == material_type.strip())
|
|
|
|
# ============================================================
|
|
# 【行级数据隔离】基于 JWT 中的 company_name 进行过滤
|
|
# ============================================================
|
|
claims = get_jwt()
|
|
user_role = claims.get('role', '').upper() if claims.get('role') else ''
|
|
user_company = claims.get('company_name', '')
|
|
|
|
if user_role != 'SUPER_ADMIN':
|
|
# 普通用户:强制隔离!无视前端传的 company 参数
|
|
if user_company:
|
|
query = query.filter(MaterialBase.company_name == user_company)
|
|
else:
|
|
# 超级管理员:允许跨公司视角
|
|
if company and company.strip():
|
|
query = query.filter(MaterialBase.company_name == company.strip())
|
|
|
|
# 4. 状态筛选
|
|
if not statuses: statuses = ['在库', '借库']
|
|
if '已出库' in statuses:
|
|
query = query.filter(StockBuy.status.in_(statuses))
|
|
else:
|
|
query = query.filter(and_(StockBuy.status.in_(statuses), StockBuy.stock_quantity > 0))
|
|
|
|
# 5. 高级动态筛选
|
|
if advanced_filters:
|
|
allowed_fields = {
|
|
'company_name': MaterialBase.company_name,
|
|
'material_name': MaterialBase.name,
|
|
'material_type': MaterialBase.material_type,
|
|
'category': MaterialBase.category,
|
|
'spec_model': MaterialBase.spec_model,
|
|
'unit': MaterialBase.unit,
|
|
'sku': StockBuy.sku,
|
|
'barcode': StockBuy.barcode,
|
|
'batch_number': StockBuy.batch_number,
|
|
'serial_number': StockBuy.serial_number,
|
|
'warehouse_location': StockBuy.warehouse_location,
|
|
'status': StockBuy.status,
|
|
'inspection_status': StockBuy.inspection_status,
|
|
'qty_inbound': StockBuy.in_quantity,
|
|
'qty_stock': StockBuy.stock_quantity,
|
|
'qty_available': StockBuy.available_quantity,
|
|
'unit_price': StockBuy.pre_tax_unit_price,
|
|
'total_price': StockBuy.total_price,
|
|
'tax_rate': StockBuy.tax_rate,
|
|
'currency': StockBuy.currency,
|
|
'exchange_rate': StockBuy.exchange_rate,
|
|
'supplier_name': StockBuy.supplier_name,
|
|
'purchaser': StockBuy.buyer_name,
|
|
'purchaser_email': StockBuy.buyer_email,
|
|
'source_link': StockBuy.original_link,
|
|
'detail_link': StockBuy.detail_link,
|
|
}
|
|
filter_conditions = []
|
|
for condition in advanced_filters:
|
|
field = condition.get('field')
|
|
operator = condition.get('operator')
|
|
value = condition.get('value')
|
|
if not field or not operator or value is None:
|
|
continue
|
|
column = allowed_fields.get(field)
|
|
if column is None:
|
|
continue
|
|
if operator == 'eq':
|
|
filter_conditions.append(column == value)
|
|
elif operator == 'ne':
|
|
filter_conditions.append(column != value)
|
|
elif operator == 'contains':
|
|
filter_conditions.append(column.ilike(f'%{value}%'))
|
|
elif operator == 'not_contains':
|
|
filter_conditions.append(~column.ilike(f'%{value}%'))
|
|
elif operator == 'ge':
|
|
try:
|
|
num_val = float(value)
|
|
filter_conditions.append(column >= num_val)
|
|
except ValueError:
|
|
continue
|
|
elif operator == 'le':
|
|
try:
|
|
num_val = float(value)
|
|
filter_conditions.append(column <= num_val)
|
|
except ValueError:
|
|
continue
|
|
if filter_conditions:
|
|
query = query.filter(and_(*filter_conditions))
|
|
|
|
# 6. 排序处理
|
|
if order_by:
|
|
sort_field_map = {
|
|
'company_name': MaterialBase.company_name,
|
|
'material_name': MaterialBase.name,
|
|
'material_type': MaterialBase.material_type,
|
|
'category': MaterialBase.category,
|
|
'spec_model': MaterialBase.spec_model,
|
|
'unit': MaterialBase.unit,
|
|
'sku': StockBuy.sku,
|
|
'barcode': StockBuy.barcode,
|
|
'inbound_date': StockBuy.in_date,
|
|
'serial_number': StockBuy.serial_number,
|
|
'batch_number': StockBuy.batch_number,
|
|
'status': StockBuy.status,
|
|
'inspection_status': StockBuy.inspection_status,
|
|
'qty_inbound': StockBuy.in_quantity,
|
|
'qty_stock': StockBuy.stock_quantity,
|
|
'qty_available': StockBuy.available_quantity,
|
|
'warehouse_loc': StockBuy.warehouse_location,
|
|
'unit_price': StockBuy.pre_tax_unit_price,
|
|
'total_price': StockBuy.total_price,
|
|
'tax_rate': StockBuy.tax_rate,
|
|
'currency': StockBuy.currency,
|
|
'exchange_rate': StockBuy.exchange_rate,
|
|
'supplier_name': StockBuy.supplier_name,
|
|
'purchaser': StockBuy.buyer_name,
|
|
'purchaser_email': StockBuy.buyer_email,
|
|
'source_link': StockBuy.original_link,
|
|
'detail_link': StockBuy.detail_link,
|
|
}
|
|
column = sort_field_map.get(order_by)
|
|
if column:
|
|
if is_asc == 'asc':
|
|
query = query.order_by(column.asc())
|
|
elif is_asc == 'desc':
|
|
query = query.order_by(column.desc())
|
|
else:
|
|
# 默认排序
|
|
query = query.order_by(StockBuy.in_date.desc())
|
|
|
|
pagination = query.paginate(page=page, per_page=limit, error_out=False)
|
|
items = []
|
|
for item in pagination.items:
|
|
items.append(item.to_dict()) # 直接使用 model 的 to_dict
|
|
return {"total": pagination.total, "items": items}
|
|
except Exception:
|
|
traceback.print_exc()
|
|
return {"total": 0, "items": []}
|
|
|
|
# ============================================================
|
|
# 6. 获取筛选选项(类别、类型、公司)并排序
|
|
# ============================================================
|
|
@staticmethod
|
|
def get_filter_options():
|
|
try:
|
|
# 类别
|
|
categories = db.session.query(MaterialBase.category) \
|
|
.filter(MaterialBase.category != None, MaterialBase.category != '') \
|
|
.distinct().all()
|
|
sorted_categories = sorted([r[0] for r in categories])
|
|
|
|
# 类型
|
|
types = db.session.query(MaterialBase.material_type) \
|
|
.filter(MaterialBase.material_type != None, MaterialBase.material_type != '') \
|
|
.distinct().all()
|
|
sorted_types = sorted([r[0] for r in types])
|
|
|
|
# [新增] 公司
|
|
companies = db.session.query(MaterialBase.company_name) \
|
|
.filter(MaterialBase.company_name != None, MaterialBase.company_name != '') \
|
|
.distinct().all()
|
|
sorted_companies = sorted([r[0] for r in companies])
|
|
|
|
return {
|
|
"categories": sorted_categories,
|
|
"types": sorted_types,
|
|
"companies": sorted_companies
|
|
}
|
|
except Exception:
|
|
traceback.print_exc()
|
|
return {"categories": [], "types": [], "companies": []}
|
|
|
|
# 7-10 建议类接口保持不变
|
|
@staticmethod
|
|
def get_history_suppliers(base_id):
|
|
return [r[0] for r in db.session.query(StockBuy.supplier_name).filter(StockBuy.base_id == base_id,
|
|
StockBuy.supplier_name != '').distinct().all()]
|
|
|
|
@staticmethod
|
|
def get_history_purchasers(keyword):
|
|
return [{'value': r.buyer_name, 'email': r.buyer_email} for r in
|
|
db.session.query(StockBuy.buyer_name, StockBuy.buyer_email).filter(
|
|
StockBuy.buyer_name != '').distinct().all()]
|
|
|
|
@staticmethod
|
|
def get_history_links(base_id, type):
|
|
return [r[0] for r in
|
|
db.session.query(StockBuy.original_link if type == 'original' else StockBuy.detail_link).filter(
|
|
StockBuy.base_id == base_id).distinct().all()]
|
|
|
|
@staticmethod
|
|
def get_history_locations(base_id):
|
|
return [r[0] for r in
|
|
db.session.query(StockBuy.warehouse_location).filter(StockBuy.base_id == base_id).distinct().all()]
|
|
|
|
@staticmethod
|
|
def get_last_location_by_base_id(base_id):
|
|
"""
|
|
获取指定物料最近一次入库的库位(跨表查询)
|
|
查询顺序:采购入库 -> 成品入库 -> 半成品入库,返回最新入库的库位
|
|
"""
|
|
from app.models.inbound.semi import StockSemi
|
|
|
|
# 1. 查询采购入库最新记录
|
|
last_buy = StockBuy.query.filter(
|
|
StockBuy.base_id == base_id
|
|
).order_by(StockBuy.in_date.desc()).first()
|
|
|
|
# 2. 查询成品入库最新记录
|
|
last_product = StockProduct.query.filter(
|
|
StockProduct.base_id == base_id
|
|
).order_by(StockProduct.production_date.desc()).first()
|
|
|
|
# 3. 查询半成品入库最新记录
|
|
last_semi = StockSemi.query.filter(
|
|
StockSemi.base_id == base_id
|
|
).order_by(StockSemi.production_date.desc()).first()
|
|
|
|
# 比较三个表中的最新入库时间,返回最新的库位
|
|
candidates = []
|
|
if last_buy and last_buy.warehouse_location:
|
|
candidates.append((last_buy.in_date, last_buy.warehouse_location))
|
|
if last_product and last_product.warehouse_location:
|
|
candidates.append((last_product.production_date, last_product.warehouse_location))
|
|
if last_semi and last_semi.warehouse_location:
|
|
candidates.append((last_semi.production_date, last_semi.warehouse_location))
|
|
|
|
if not candidates:
|
|
return ""
|
|
|
|
# 按时间倒序排序,返回最新的库位
|
|
candidates.sort(key=lambda x: x[0] if x[0] else datetime.min, reverse=True)
|
|
return candidates[0][1] if candidates[0][1] else ""
|