Files
KCGL/inventory-backend/app/services/inbound/product_service.py

620 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/services/inbound/product_service.py
from app.extensions import db
from app.models.base import MaterialBase
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.outbound import TransOutbound
from datetime import datetime, timedelta, timezone
from sqlalchemy import or_, func, text, and_
from sqlalchemy.exc import IntegrityError
import traceback
import json
class ProductInboundService:
@staticmethod
def _check_unique(serial_number, exclude_id=None):
from app.models.inbound.product import StockProduct
if serial_number:
query = StockProduct.query.filter(StockProduct.serial_number == serial_number)
if exclude_id:
query = query.filter(StockProduct.id != exclude_id)
exists = query.first()
if exists:
occupied_name = exists.base.name if (hasattr(exists, 'base') and exists.base) else "未知物料"
raise ValueError(f"序列号【{serial_number}】已存在!被成品 [{occupied_name}] 占用,请核查。")
@staticmethod
def search_base_material(keyword, page=1, limit=50):
try:
query = MaterialBase.query.filter(MaterialBase.is_enabled == True)
if keyword:
kw = f'%{keyword}%'
query = query.filter(
or_(
MaterialBase.name.ilike(kw),
MaterialBase.spec_model.ilike(kw),
MaterialBase.company_name.ilike(kw)
)
)
query = query.order_by(MaterialBase.id.desc())
pagination = query.paginate(page=page, per_page=limit, error_out=False)
results = []
for item in pagination.items:
results.append({
'id': item.id,
'company_name': item.company_name,
'name': item.name,
'spec': item.spec_model,
'category': item.category,
'unit': item.unit,
'type': item.material_type,
'status': '启用'
})
return {
"items": results,
"total": pagination.total,
"page": page,
"has_next": pagination.has_next
}
except Exception:
traceback.print_exc()
return {"items": [], "total": 0, "page": 1, "has_next": False}
@staticmethod
def search_bom_options(keyword):
from app.models.bom import BomTable
try:
query = db.session.query(
BomTable.bom_no,
BomTable.version,
MaterialBase.name.label('parent_name'),
MaterialBase.spec_model.label('parent_spec')
).join(MaterialBase, BomTable.parent_id == MaterialBase.id)
if hasattr(BomTable, 'is_enabled'):
query = query.filter(BomTable.is_enabled == True)
if keyword:
kw = f'%{keyword}%'
query = query.filter(
or_(
BomTable.bom_no.ilike(kw),
MaterialBase.name.ilike(kw),
MaterialBase.spec_model.ilike(kw)
)
)
results = query.distinct().limit(20).all()
return [{
'bom_no': r.bom_no,
'version': r.version,
'parent_name': r.parent_name,
'parent_spec': r.parent_spec or ''
} for r in results]
except Exception:
traceback.print_exc()
return []
@staticmethod
def handle_inbound(data):
from app.models.inbound.product import StockProduct
try:
base_id = data.get('base_id')
if not base_id: raise ValueError("必须选择基础物料")
material = MaterialBase.query.get(base_id)
if not material: raise ValueError("物料不存在")
if not material.is_enabled:
raise ValueError(f"物料【{material.name}】已停用,无法办理新入库。")
ProductInboundService._check_unique(
serial_number=data.get('serial_number')
)
beijing_tz = timezone(timedelta(hours=8))
current_time = datetime.now(beijing_tz).replace(tzinfo=None)
in_date_val = current_time
if data.get('in_date'):
try:
date_str = str(data['in_date'])
if len(date_str) > 10:
in_date_val = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
else:
d_temp = datetime.strptime(date_str, '%Y-%m-%d')
in_date_val = datetime(d_temp.year, d_temp.month, d_temp.day,
current_time.hour, current_time.minute, current_time.second)
except:
in_date_val = current_time
in_qty = float(data.get('in_quantity') or 0)
raw_cost = float(data.get('raw_material_cost') or 0)
manual_cost = 0.0 # 字段已弃用,保持向后兼容
unit_total_cost = float(data.get('unit_total_cost') or raw_cost or 0)
total_price = unit_total_cost * in_qty
p_start = data.get('production_start_time', '')
p_end = data.get('production_end_time', '')
time_range = f"{p_start} ~ {p_end}" if p_start or p_end else None
try:
seq_sql = text("SELECT nextval('global_print_seq')")
result = db.session.execute(seq_sql)
next_global_id = result.scalar()
except:
next_global_id = None
generated_sku = str(next_global_id).zfill(10) if next_global_id else datetime.now().strftime('%Y%m%d%H%M%S')
final_barcode = data.get('barcode') or generated_sku
photo_list = data.get('product_photo', [])
quality_list = data.get('quality_report_link', [])
inspection_list = data.get('inspection_report_link', [])
if not isinstance(photo_list, list): photo_list = []
if not isinstance(quality_list, list): quality_list = []
if not isinstance(inspection_list, list): inspection_list = []
new_stock = StockProduct(
base_id=material.id,
global_print_id=next_global_id,
sku=generated_sku,
production_date=in_date_val,
barcode=final_barcode,
serial_number=data.get('serial_number'),
status=data.get('status', '在库'),
warehouse_location=data.get('warehouse_location'),
in_quantity=in_qty,
stock_quantity=in_qty,
available_quantity=in_qty,
bom_code=data.get('bom_code'),
bom_version=data.get('bom_version'),
work_order_code=data.get('work_order_code'),
production_manager=data.get('production_manager'),
production_time_range=time_range,
raw_material_cost=raw_cost,
manual_cost=unit_total_cost,
quality_status=data.get('quality_status', '合格'),
product_photo=json.dumps(photo_list),
quality_report_link=json.dumps(quality_list),
inspection_report_link=json.dumps(inspection_list),
detail_link=data.get('detail_link'),
remark=data.get('remark'),
sale_price=float(data.get('sale_price') or 0),
order_id=data.get('order_id')
)
db.session.add(new_stock)
db.session.commit()
return new_stock
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def update_inbound(stock_id, data):
from app.models.inbound.product import StockProduct
try:
stock = StockProduct.query.get(stock_id)
if not stock: raise ValueError("记录不存在")
if 'serial_number' in data:
ProductInboundService._check_unique(
serial_number=data['serial_number'],
exclude_id=stock_id
)
fields = [
'barcode', 'serial_number', 'warehouse_location',
'status', 'quality_status', 'bom_code', 'bom_version',
'work_order_code', 'production_manager',
'detail_link', 'order_id', 'remark'
]
for f in fields:
if f in data: setattr(stock, f, data[f])
if 'product_photo' in data:
imgs = data['product_photo']
if isinstance(imgs, list): stock.product_photo = json.dumps(imgs)
if 'quality_report_link' in data:
imgs = data['quality_report_link']
if isinstance(imgs, list): stock.quality_report_link = json.dumps(imgs)
if 'inspection_report_link' in data:
imgs = data['inspection_report_link']
if isinstance(imgs, list): stock.inspection_report_link = json.dumps(imgs)
if 'sale_price' in data: stock.sale_price = float(data['sale_price'])
if 'raw_material_cost' in data: stock.raw_material_cost = float(data['raw_material_cost'])
if 'unit_total_cost' in data: stock.manual_cost = float(data['unit_total_cost']) # 映射到 manual_cost 物理字段
if 'in_quantity' in data:
new_qty = float(data['in_quantity'])
diff = new_qty - float(stock.in_quantity)
stock.in_quantity = new_qty
stock.stock_quantity = float(stock.stock_quantity) + diff
stock.available_quantity = float(stock.available_quantity) + diff
if 'production_start_time' in data or 'production_end_time' in data:
old_range = stock.production_time_range or " ~ "
parts = old_range.split(' ~ ')
old_start = parts[0] if len(parts) > 0 else ''
old_end = parts[1] if len(parts) > 1 else ''
start = data.get('production_start_time', old_start)
end = data.get('production_end_time', old_end)
stock.production_time_range = f"{start} ~ {end}"
db.session.commit()
return stock
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def delete_inbound(stock_id):
from app.models.inbound.product import StockProduct
try:
stock = StockProduct.query.get(stock_id)
if stock:
# 提前获取物料名称用于审计日志(通过外键关系 base.name 获取)
material_name = stock.base.name if stock.base else '未知物料'
db.session.delete(stock)
db.session.commit()
return material_name
return None
except IntegrityError:
db.session.rollback()
raise ValueError("该入库单已被出库、盘点或借还等业务关联,为保证账目完整,禁止删除!")
except Exception as e:
db.session.rollback()
raise e
@staticmethod
def get_outbound_history(stock_id):
try:
records = TransOutbound.query.filter_by(
source_table='stock_product', stock_id=stock_id
).order_by(TransOutbound.outbound_time.desc()).all()
return [r.to_dict() for r in records]
except:
return []
@staticmethod
def get_list(page, limit, keyword=None, sku=None, search_field='all', statuses=None, category=None, material_type=None, company=None,
order_by_column=None, is_asc=None, advanced_filters=None):
from app.models.inbound.product import StockProduct
try:
query = db.session.query(StockProduct).outerjoin(MaterialBase, StockProduct.base_id == MaterialBase.id)
# 1. 通用关键词搜索(支持指定字段精准搜索)
if keyword:
kw = f'%{keyword}%'
if search_field == 'name':
query = query.filter(MaterialBase.name.ilike(kw))
elif search_field == 'spec':
query = query.filter(MaterialBase.spec_model.ilike(kw))
elif search_field == 'common_name':
query = query.filter(MaterialBase.common_name.ilike(kw))
elif search_field == 'serial_number':
query = query.filter(StockProduct.serial_number.ilike(kw))
elif search_field == 'work_order_code':
query = query.filter(StockProduct.work_order_code.ilike(kw))
elif search_field == 'order_id':
query = query.filter(StockProduct.order_id.ilike(kw))
else: # 'all' 默认全局模糊匹配
query = query.filter(or_(
MaterialBase.name.ilike(kw),
MaterialBase.spec_model.ilike(kw),
MaterialBase.company_name.ilike(kw),
StockProduct.serial_number.ilike(kw),
StockProduct.work_order_code.ilike(kw),
StockProduct.order_id.ilike(kw)
))
# 1.1 SKU 独立搜索
if sku and sku.strip():
sku_str = f'%{sku.strip()}%'
query = query.filter(StockProduct.sku.ilike(sku_str))
if category and category.strip():
query = query.filter(MaterialBase.category == category.strip())
if material_type and material_type.strip():
query = query.filter(MaterialBase.material_type == material_type.strip())
if company and company.strip():
query = query.filter(MaterialBase.company_name == company.strip())
if not statuses:
statuses = ['在库', '借库']
if '已出库' in statuses:
query = query.filter(StockProduct.status.in_(statuses))
else:
query = query.filter(
and_(
StockProduct.status.in_(statuses),
StockProduct.stock_quantity > 0
)
)
# 高级动态筛选
if advanced_filters:
if isinstance(advanced_filters, str):
try:
import json
advanced_filters = json.loads(advanced_filters)
except:
advanced_filters = []
if isinstance(advanced_filters, list):
field_mapping = {
'id': StockProduct.id,
'base_id': StockProduct.base_id,
'company_name': MaterialBase.company_name,
'material_name': MaterialBase.name,
'spec_model': MaterialBase.spec_model,
'category': MaterialBase.category,
'material_type': MaterialBase.material_type,
'unit': MaterialBase.unit,
'sku': StockProduct.sku,
'inbound_date': StockProduct.production_date,
'barcode': StockProduct.barcode,
'serial_number': StockProduct.serial_number,
'batch_number': StockProduct.serial_number,
'status': StockProduct.status,
'quality_status': StockProduct.quality_status,
'in_quantity': StockProduct.in_quantity,
'stock_quantity': StockProduct.stock_quantity,
'available_quantity': StockProduct.available_quantity,
'warehouse_location': StockProduct.warehouse_location,
'bom_code': StockProduct.bom_code,
'bom_version': StockProduct.bom_version,
'work_order_code': StockProduct.work_order_code,
'raw_material_cost': StockProduct.raw_material_cost,
'unit_total_cost': StockProduct.manual_cost,
'order_id': StockProduct.order_id,
'sale_price': StockProduct.sale_price,
'production_manager': StockProduct.production_manager,
'total_price': (StockProduct.manual_cost * StockProduct.in_quantity),
}
for cond in advanced_filters:
field = cond.get('field')
operator = cond.get('operator')
value = cond.get('value')
if not field or not operator:
continue
model_field = field_mapping.get(field)
if model_field is None:
continue
# 防止 SQL 注入,只允许映射的字段
if operator == '=':
query = query.filter(model_field == value)
elif operator == '!=':
query = query.filter(model_field != value)
elif operator == 'like':
query = query.filter(model_field.ilike(f'%{value}%'))
elif operator == 'not_like':
query = query.filter(~model_field.ilike(f'%{value}%'))
elif operator == '>':
if value.replace('.', '', 1).isdigit():
query = query.filter(model_field > float(value))
elif operator == '<':
if value.replace('.', '', 1).isdigit():
query = query.filter(model_field < float(value))
elif operator == '>=':
if value.replace('.', '', 1).isdigit():
query = query.filter(model_field >= float(value))
elif operator == '<=':
if value.replace('.', '', 1).isdigit():
query = query.filter(model_field <= float(value))
# 动态排序
order_field = None
if order_by_column and is_asc is not None:
order_mapping = {
'id': StockProduct.id,
'base_id': StockProduct.base_id,
'company_name': MaterialBase.company_name,
'material_name': MaterialBase.name,
'category': MaterialBase.category,
'material_type': MaterialBase.material_type,
'spec_model': MaterialBase.spec_model,
'unit': MaterialBase.unit,
'sku': StockProduct.sku,
'inbound_date': StockProduct.production_date,
'barcode': StockProduct.barcode,
'serial_number': StockProduct.serial_number,
'batch_number': StockProduct.serial_number,
'status': StockProduct.status,
'quality_status': StockProduct.quality_status,
'in_quantity': StockProduct.in_quantity,
'stock_quantity': StockProduct.stock_quantity,
'available_quantity': StockProduct.available_quantity,
'warehouse_location': StockProduct.warehouse_location,
'bom_code': StockProduct.bom_code,
'bom_version': StockProduct.bom_version,
'work_order_code': StockProduct.work_order_code,
'raw_material_cost': StockProduct.raw_material_cost,
'unit_total_cost': StockProduct.manual_cost,
'order_id': StockProduct.order_id,
'sale_price': StockProduct.sale_price,
'production_manager': StockProduct.production_manager,
'total_price': (StockProduct.manual_cost * StockProduct.in_quantity),
}
order_field = order_mapping.get(order_by_column)
if order_field is not None:
if is_asc == 'true' or is_asc == True:
query = query.order_by(order_field.asc())
else:
query = query.order_by(order_field.desc())
if order_field is None:
query = query.order_by(StockProduct.production_date.desc())
pagination = query.paginate(page=page, per_page=limit, error_out=False)
current_items = pagination.items
def parse_img(json_str):
if not json_str: return []
try:
return json.loads(json_str) if json_str.startswith('[') else [json_str]
except:
return []
items = []
for item in current_items:
item_dict = item.to_dict()
item_dict['unit_total_cost'] = float(item.manual_cost or 0)
items.append(item_dict)
return {"total": pagination.total, "items": items}
except Exception as e:
traceback.print_exc()
return {"total": 0, "items": []}
@staticmethod
def search_system_users(keyword):
from app.models.system import SysUser
try:
query = SysUser.query.filter(SysUser.status == 'active')
if keyword:
kw = f'%{keyword}%'
query = query.filter(db.or_(
SysUser.username.ilike(kw),
SysUser.email.ilike(kw)
))
query = query.order_by(SysUser.username)
users = []
for u in query.limit(20).all():
users.append({
'value': u.username,
'email': u.email
})
return users
except Exception:
return []
@staticmethod
def get_filter_options():
try:
from app.models.base import MaterialBase
categories = db.session.query(MaterialBase.category).filter(MaterialBase.category != None,
MaterialBase.category != '').distinct().all()
sorted_categories = sorted([r[0] for r in categories])
types = db.session.query(MaterialBase.material_type).filter(MaterialBase.material_type != None,
MaterialBase.material_type != '').distinct().all()
sorted_types = sorted([r[0] for r in types])
companies = db.session.query(MaterialBase.company_name).filter(MaterialBase.company_name != None,
MaterialBase.company_name != '').distinct().all()
sorted_companies = sorted([r[0] for r in companies])
return {
"categories": sorted_categories,
"types": sorted_types,
"companies": sorted_companies
}
except Exception:
import traceback
traceback.print_exc()
return {"categories": [], "types": [], "companies": []}
@staticmethod
def get_history_managers(keyword=None):
from app.models.inbound.product import StockProduct
try:
query = db.session.query(StockProduct.production_manager).filter(
StockProduct.production_manager.isnot(None),
StockProduct.production_manager != ''
)
if keyword:
query = query.filter(StockProduct.production_manager.ilike(f'%{keyword}%'))
records = query.distinct().all()
return [r[0] for r in records if r[0]]
except Exception:
traceback.print_exc()
return []
# ============================================================
# 9. BOM 原材料成本自动核算 (新增)
# ============================================================
@staticmethod
def calculate_bom_cost(bom_no, bom_version):
"""
根据 BOM 编号和版本计算原材料总成本
遍历 BOM 子件,使用原生 SQL 查物理表 bom_table取每个子件在采购、半成品、成品三个表中的最高单价乘以用量后累加
"""
from app.models.inbound.buy import StockBuy
from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
from sqlalchemy import func, text
try:
# 使用原生 SQL 精准查询 bom_table避免模型映射错误
sql = text("""
SELECT child_id, dosage
FROM bom_table
WHERE bom_no = :bom_no AND version = :version
""")
bom_lines = db.session.execute(sql, {'bom_no': bom_no, 'version': bom_version}).fetchall()
total_cost = 0.0
for line in bom_lines:
component_base_id = line[0] # child_id
usage_qty = float(line[1] or 1.0) # dosage
# 1. 查采购表最高价 (不含税)
buy_price = db.session.query(func.max(StockBuy.pre_tax_unit_price)).filter(
StockBuy.base_id == component_base_id
).scalar() or 0.0
# 2. 查半成品表最高价 (单件成本映射存在 manual_cost 里了)
semi_price = db.session.query(func.max(StockSemi.manual_cost)).filter(
StockSemi.base_id == component_base_id
).scalar() or 0.0
# 3. 查成品表最高价 (同样存储在 manual_cost 字段里)
product_price = db.session.query(func.max(StockProduct.manual_cost)).filter(
StockProduct.base_id == component_base_id
).scalar() or 0.0
# 4. 取三个表中的最大值,乘以用量 (dosage)
max_price = max(float(buy_price), float(semi_price), float(product_price))
total_cost += max_price * usage_qty
return round(total_cost, 2)
except Exception as e:
traceback.print_exc()
raise e
@staticmethod
def get_last_location_by_base_id(base_id):
"""
获取指定物料最近一次入库的库位(跨表查询)
查询顺序:成品入库 -> 采购入库 -> 半成品入库,返回最新入库的库位
"""
from app.models.inbound.product import StockProduct
# 1. 查询成品入库最新记录
last_product = StockProduct.query.filter(
StockProduct.base_id == base_id
).order_by(StockProduct.production_date.desc()).first()
# 2. 查询采购入库最新记录
last_buy = StockBuy.query.filter(
StockBuy.base_id == base_id
).order_by(StockBuy.in_date.desc()).first()
# 3. 查询半成品入库最新记录
last_semi = StockSemi.query.filter(
StockSemi.base_id == base_id
).order_by(StockSemi.production_date.desc()).first()
# 比较三个表中的最新入库时间,返回最新的库位
candidates = []
if last_product and last_product.warehouse_location:
candidates.append((last_product.production_date, last_product.warehouse_location))
if last_buy and last_buy.warehouse_location:
candidates.append((last_buy.in_date, last_buy.warehouse_location))
if last_semi and last_semi.warehouse_location:
candidates.append((last_semi.production_date, last_semi.warehouse_location))
if not candidates:
return ""
# 按时间倒序排序,返回最新的库位
candidates.sort(key=lambda x: x[0] if x[0] else datetime.min, reverse=True)
return candidates[0][1] if candidates[0][1] else ""