579 lines
26 KiB
Python
579 lines
26 KiB
Python
# app/services/inbound/product_service.py
|
||
from app.extensions import db
|
||
from app.models.base import MaterialBase
|
||
from app.models.outbound import TransOutbound
|
||
from datetime import datetime, timedelta, timezone
|
||
from sqlalchemy import or_, func, text, and_
|
||
from sqlalchemy.exc import IntegrityError
|
||
import traceback
|
||
import json
|
||
|
||
|
||
class ProductInboundService:
|
||
|
||
@staticmethod
|
||
def _check_unique(serial_number, exclude_id=None):
    """Ensure no other finished-product record already uses ``serial_number``.

    Args:
        serial_number: Serial number to verify. Falsy values skip the check.
        exclude_id: Optional ``StockProduct.id`` to leave out of the lookup
            (the record being edited). Falsy values disable the exclusion.

    Raises:
        ValueError: If a matching record exists; the message names the
            material occupying the serial number.
    """
    from app.models.inbound.product import StockProduct

    # A blank serial number is never checked for uniqueness.
    if not serial_number:
        return

    candidates = StockProduct.query.filter(StockProduct.serial_number == serial_number)
    if exclude_id:
        candidates = candidates.filter(StockProduct.id != exclude_id)

    holder = candidates.first()
    if not holder:
        return

    # Fall back to a placeholder name when the record has no linked base material.
    base = getattr(holder, 'base', None)
    occupied_name = base.name if base else "未知物料"
    raise ValueError(f"序列号【{serial_number}】已存在!被成品 [{occupied_name}] 占用,请核查。")
|
||
|
||
@staticmethod
|
||
def search_base_material(keyword, page=1, limit=50):
    """Search enabled base materials and return one page of results.

    Args:
        keyword: Optional substring matched case-insensitively against the
            material name, spec/model and company name.
        page: 1-based page number.
        limit: Page size.

    Returns:
        dict with ``items`` (list of material dicts), ``total``, ``page`` and
        ``has_next``. On any failure the traceback is printed and an empty
        first page is returned instead of raising.
    """
    try:
        enabled = MaterialBase.query.filter(MaterialBase.is_enabled == True)  # noqa: E712
        if keyword:
            pattern = f'%{keyword}%'
            enabled = enabled.filter(or_(
                MaterialBase.name.ilike(pattern),
                MaterialBase.spec_model.ilike(pattern),
                MaterialBase.company_name.ilike(pattern),
            ))

        # Newest materials first.
        paged = enabled.order_by(MaterialBase.id.desc()).paginate(
            page=page, per_page=limit, error_out=False
        )

        rows = [
            {
                'id': material.id,
                'company_name': material.company_name,
                'name': material.name,
                'spec': material.spec_model,
                'category': material.category,
                'unit': material.unit,
                'type': material.material_type,
                'status': '启用',
            }
            for material in paged.items
        ]
        return {
            "items": rows,
            "total": paged.total,
            "page": page,
            "has_next": paged.has_next,
        }
    except Exception:
        traceback.print_exc()
        return {"items": [], "total": 0, "page": 1, "has_next": False}
|
||
|
||
@staticmethod
|
||
def search_bom_options(keyword):
    """Return up to 20 distinct BOM choices, optionally filtered by keyword.

    Args:
        keyword: Optional substring matched case-insensitively against the BOM
            number and the parent material's name and spec/model.

    Returns:
        list of dicts with ``bom_no``, ``version``, ``parent_name`` and
        ``parent_spec`` (empty string when the spec is missing). Any failure
        prints the traceback and yields an empty list.
    """
    from app.models.bom import BomTable

    try:
        bom_query = (
            db.session.query(
                BomTable.bom_no,
                BomTable.version,
                MaterialBase.name.label('parent_name'),
                MaterialBase.spec_model.label('parent_spec'),
            )
            .join(MaterialBase, BomTable.parent_id == MaterialBase.id)
        )

        # Only filter on the enabled flag when the model actually defines it.
        if hasattr(BomTable, 'is_enabled'):
            bom_query = bom_query.filter(BomTable.is_enabled == True)  # noqa: E712

        if keyword:
            pattern = f'%{keyword}%'
            bom_query = bom_query.filter(or_(
                BomTable.bom_no.ilike(pattern),
                MaterialBase.name.ilike(pattern),
                MaterialBase.spec_model.ilike(pattern),
            ))

        options = []
        for row in bom_query.distinct().limit(20).all():
            options.append({
                'bom_no': row.bom_no,
                'version': row.version,
                'parent_name': row.parent_name,
                'parent_spec': row.parent_spec or '',
            })
        return options
    except Exception:
        traceback.print_exc()
        return []
|
||
|
||
@staticmethod
|
||
def handle_inbound(data):
    """Create and commit a finished-product inbound record from ``data``.

    Args:
        data: Mapping read with ``.get``. ``base_id`` is required. Optional
            keys: ``serial_number``, ``in_date`` (``YYYY-MM-DD`` or
            ``YYYY-MM-DD HH:MM:SS``), ``in_quantity``, ``raw_material_cost``,
            ``unit_total_cost``, ``production_start_time``,
            ``production_end_time``, ``barcode``, ``product_photo``,
            ``quality_report_link``, ``inspection_report_link`` (lists),
            ``status``, ``quality_status``, ``warehouse_location``,
            ``bom_code``, ``bom_version``, ``work_order_code``,
            ``production_manager``, ``detail_link``, ``remark``,
            ``sale_price`` and ``order_id``.

    Returns:
        The newly committed ``StockProduct`` instance.

    Raises:
        ValueError: Missing/unknown/disabled base material, a duplicate serial
            number, or a non-numeric quantity/cost value.
        Exception: Any other failure; the session is rolled back and the
            original exception is re-raised.
    """
    from app.models.inbound.product import StockProduct

    def _as_list(value):
        # Attachment fields are stored as JSON arrays; anything else becomes [].
        return value if isinstance(value, list) else []

    try:
        base_id = data.get('base_id')
        if not base_id:
            raise ValueError("必须选择基础物料")
        material = MaterialBase.query.get(base_id)
        if not material:
            raise ValueError("物料不存在")
        if not material.is_enabled:
            raise ValueError(f"物料【{material.name}】已停用,无法办理新入库。")

        ProductInboundService._check_unique(
            serial_number=data.get('serial_number')
        )

        # Timestamps are stored as naive datetimes expressed in UTC+8.
        beijing_tz = timezone(timedelta(hours=8))
        current_time = datetime.now(beijing_tz).replace(tzinfo=None)
        in_date_val = current_time
        if data.get('in_date'):
            date_str = str(data['in_date'])
            try:
                if len(date_str) > 10:
                    in_date_val = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
                else:
                    # Date only: keep the chosen day, borrow the current time of day.
                    day = datetime.strptime(date_str, '%Y-%m-%d')
                    in_date_val = datetime(day.year, day.month, day.day,
                                           current_time.hour, current_time.minute,
                                           current_time.second)
            except (ValueError, TypeError):
                # Unparseable dates deliberately fall back to "now".
                in_date_val = current_time

        in_qty = float(data.get('in_quantity') or 0)
        raw_cost = float(data.get('raw_material_cost') or 0)
        # The unit total cost defaults to the raw material cost when not supplied.
        unit_total_cost = float(data.get('unit_total_cost') or raw_cost or 0)

        # ``or ''`` keeps an explicit null from being rendered as the text "None".
        p_start = data.get('production_start_time') or ''
        p_end = data.get('production_end_time') or ''
        time_range = f"{p_start} ~ {p_end}" if p_start or p_end else None

        try:
            seq_sql = text("SELECT nextval('global_print_seq')")
            next_global_id = db.session.execute(seq_sql).scalar()
        except Exception:
            # A failed statement can leave the transaction unusable (PostgreSQL
            # aborts it), so reset the session before using the fallback SKU.
            db.session.rollback()
            next_global_id = None

        if next_global_id:
            generated_sku = str(next_global_id).zfill(10)
        else:
            generated_sku = datetime.now().strftime('%Y%m%d%H%M%S')
        final_barcode = data.get('barcode') or generated_sku

        photo_list = _as_list(data.get('product_photo', []))
        quality_list = _as_list(data.get('quality_report_link', []))
        inspection_list = _as_list(data.get('inspection_report_link', []))

        new_stock = StockProduct(
            base_id=material.id,
            global_print_id=next_global_id,
            sku=generated_sku,
            production_date=in_date_val,
            barcode=final_barcode,
            serial_number=data.get('serial_number'),
            status=data.get('status', '在库'),
            warehouse_location=data.get('warehouse_location'),
            in_quantity=in_qty,
            stock_quantity=in_qty,
            available_quantity=in_qty,
            bom_code=data.get('bom_code'),
            bom_version=data.get('bom_version'),
            work_order_code=data.get('work_order_code'),
            production_manager=data.get('production_manager'),
            production_time_range=time_range,
            raw_material_cost=raw_cost,
            # The unit total cost is persisted in the ``manual_cost`` column.
            manual_cost=unit_total_cost,
            quality_status=data.get('quality_status', '合格'),
            product_photo=json.dumps(photo_list),
            quality_report_link=json.dumps(quality_list),
            inspection_report_link=json.dumps(inspection_list),
            detail_link=data.get('detail_link'),
            remark=data.get('remark'),
            sale_price=float(data.get('sale_price') or 0),
            order_id=data.get('order_id'),
        )
        db.session.add(new_stock)
        db.session.commit()
        return new_stock
    except Exception:
        db.session.rollback()
        raise
|
||
|
||
@staticmethod
|
||
def update_inbound(stock_id, data):
    """Apply a partial update to an existing finished-product inbound record.

    Only keys present in ``data`` are written. Changing ``in_quantity`` shifts
    ``stock_quantity`` and ``available_quantity`` by the same delta.

    Args:
        stock_id: Primary key of the ``StockProduct`` to update.
        data: Mapping of fields to change.

    Returns:
        The updated, committed ``StockProduct`` instance.

    Raises:
        ValueError: Record not found, duplicate serial number, or a
            non-numeric quantity/price value.
        Exception: Any other failure; the session is rolled back and the
            original exception is re-raised.
    """
    from app.models.inbound.product import StockProduct

    def _money(value):
        # Null/blank prices mean 0, matching how handle_inbound parses them.
        return float(value or 0)

    try:
        stock = StockProduct.query.get(stock_id)
        if not stock:
            raise ValueError("记录不存在")

        if 'serial_number' in data:
            ProductInboundService._check_unique(
                serial_number=data['serial_number'],
                exclude_id=stock_id
            )

        # Plain fields are copied through unchanged.
        plain_fields = (
            'barcode', 'serial_number', 'warehouse_location',
            'status', 'quality_status', 'bom_code', 'bom_version',
            'work_order_code', 'production_manager',
            'detail_link', 'order_id', 'remark',
        )
        for name in plain_fields:
            if name in data:
                setattr(stock, name, data[name])

        # Attachment fields are stored as JSON arrays; non-list values are ignored.
        for name in ('product_photo', 'quality_report_link', 'inspection_report_link'):
            if name in data and isinstance(data[name], list):
                setattr(stock, name, json.dumps(data[name]))

        if 'sale_price' in data:
            stock.sale_price = _money(data['sale_price'])
        if 'raw_material_cost' in data:
            stock.raw_material_cost = _money(data['raw_material_cost'])
        if 'unit_total_cost' in data:
            # The unit total cost is persisted in the ``manual_cost`` column.
            stock.manual_cost = _money(data['unit_total_cost'])

        if 'in_quantity' in data:
            # The new quantity stays strict: an invalid value should fail loudly
            # rather than silently zero the stock.
            new_qty = float(data['in_quantity'])
            diff = new_qty - float(stock.in_quantity or 0)
            stock.in_quantity = new_qty
            stock.stock_quantity = float(stock.stock_quantity or 0) + diff
            stock.available_quantity = float(stock.available_quantity or 0) + diff

        if 'production_start_time' in data or 'production_end_time' in data:
            # The stored range has the form "<start> ~ <end>"; keep whichever
            # half the caller did not send.
            parts = (stock.production_time_range or " ~ ").split(' ~ ')
            old_start = parts[0]
            old_end = parts[1] if len(parts) > 1 else ''
            # ``or ''`` keeps an explicit null from being stored as the text "None".
            start = data.get('production_start_time', old_start) or ''
            end = data.get('production_end_time', old_end) or ''
            # An empty range is stored as None, as handle_inbound does.
            stock.production_time_range = f"{start} ~ {end}" if start or end else None

        db.session.commit()
        return stock
    except Exception:
        db.session.rollback()
        raise
|
||
|
||
@staticmethod
|
||
def delete_inbound(stock_id):
    """Delete a finished-product inbound record.

    Args:
        stock_id: Primary key of the ``StockProduct`` to remove.

    Returns:
        The material name of the deleted record (for audit logging), or
        ``None`` when no record with that id exists.

    Raises:
        ValueError: The delete violated a database integrity constraint.
        Exception: Any other failure, re-raised after rolling back.
    """
    from app.models.inbound.product import StockProduct

    try:
        record = StockProduct.query.get(stock_id)
        if not record:
            return None

        # Read the name before deleting, while the relationship is still loadable.
        if record.base:
            material_name = record.base.name
        else:
            material_name = '未知物料'

        db.session.delete(record)
        db.session.commit()
        return material_name
    except IntegrityError:
        db.session.rollback()
        raise ValueError("该入库单已被出库、盘点或借还等业务关联,为保证账目完整,禁止删除!")
    except Exception as e:
        db.session.rollback()
        raise e
|
||
|
||
@staticmethod
|
||
def get_outbound_history(stock_id):
    """Return the outbound records of one finished-product stock row.

    Args:
        stock_id: ``stock_id`` to look up among ``TransOutbound`` rows whose
            ``source_table`` is ``'stock_product'``.

    Returns:
        list of ``to_dict()`` results, newest ``outbound_time`` first. On
        failure the traceback is printed and an empty list is returned.
    """
    try:
        records = (
            TransOutbound.query
            .filter_by(source_table='stock_product', stock_id=stock_id)
            .order_by(TransOutbound.outbound_time.desc())
            .all()
        )
        return [record.to_dict() for record in records]
    except Exception:
        # Still best-effort, but no longer traps KeyboardInterrupt/SystemExit,
        # and the failure is now visible in the logs.
        traceback.print_exc()
        return []
|
||
|
||
@staticmethod
|
||
def get_list(page, limit, keyword=None, sku=None, search_field='all', statuses=None, category=None,
             material_type=None, company=None, order_by_column=None, is_asc=None, advanced_filters=None):
    """Return one page of finished-product stock rows with filtering and sorting.

    Args:
        page: 1-based page number.
        limit: Page size.
        keyword: Optional substring for a case-insensitive search.
        sku: Optional substring matched against the SKU.
        search_field: Column the keyword applies to (``name``, ``spec``,
            ``common_name``, ``serial_number``, ``work_order_code``,
            ``order_id``); any other value searches several columns at once.
        statuses: Statuses to include; defaults to ``['在库', '借库']``. Unless
            ``'已出库'`` is included, rows must also have ``stock_quantity > 0``.
        category, material_type, company: Optional exact-match filters on the
            base material.
        order_by_column: Optional sort key; unknown keys are ignored.
        is_asc: ``True``/``'true'`` for ascending, anything else descending.
            Sorting is applied only when this is not ``None``.
        advanced_filters: List (or JSON string of a list) of
            ``{'field', 'operator', 'value'}`` dicts. Supported operators:
            ``=``, ``!=``, ``like``, ``not_like``, ``>``, ``<``, ``>=``, ``<=``.
            Malformed entries and non-numeric values for the comparison
            operators are skipped.

    Returns:
        ``{"total": int, "items": [dict, ...]}``. On failure the traceback is
        printed and ``{"total": 0, "items": []}`` is returned.
    """
    from app.models.inbound.product import StockProduct

    def _to_number(raw):
        """Return ``raw`` as a finite float, or None when it is not numeric."""
        if isinstance(raw, bool):
            return None
        try:
            number = float(raw)
        except (TypeError, ValueError):
            return None
        # Reject NaN and +/-inf: they are not meaningful comparison bounds.
        if number != number or number in (float('inf'), float('-inf')):
            return None
        return number

    numeric_ops = {
        '>': lambda column, number: column > number,
        '<': lambda column, number: column < number,
        '>=': lambda column, number: column >= number,
        '<=': lambda column, number: column <= number,
    }

    try:
        # Single whitelist of client-visible names -> columns, used for both
        # filtering and sorting; nothing outside it can be referenced.
        columns = {
            'id': StockProduct.id,
            'base_id': StockProduct.base_id,
            'company_name': MaterialBase.company_name,
            'material_name': MaterialBase.name,
            'spec_model': MaterialBase.spec_model,
            'category': MaterialBase.category,
            'material_type': MaterialBase.material_type,
            'unit': MaterialBase.unit,
            'sku': StockProduct.sku,
            'inbound_date': StockProduct.production_date,
            'barcode': StockProduct.barcode,
            'serial_number': StockProduct.serial_number,
            'batch_number': StockProduct.serial_number,
            'status': StockProduct.status,
            'quality_status': StockProduct.quality_status,
            'in_quantity': StockProduct.in_quantity,
            'stock_quantity': StockProduct.stock_quantity,
            'available_quantity': StockProduct.available_quantity,
            'warehouse_location': StockProduct.warehouse_location,
            'bom_code': StockProduct.bom_code,
            'bom_version': StockProduct.bom_version,
            'work_order_code': StockProduct.work_order_code,
            'raw_material_cost': StockProduct.raw_material_cost,
            'unit_total_cost': StockProduct.manual_cost,
            'order_id': StockProduct.order_id,
            'sale_price': StockProduct.sale_price,
            'production_manager': StockProduct.production_manager,
            'total_price': (StockProduct.manual_cost * StockProduct.in_quantity),
        }

        query = db.session.query(StockProduct).outerjoin(
            MaterialBase, StockProduct.base_id == MaterialBase.id
        )

        # 1. Keyword search, optionally restricted to a single column.
        if keyword:
            kw = f'%{keyword}%'
            if search_field == 'name':
                query = query.filter(MaterialBase.name.ilike(kw))
            elif search_field == 'spec':
                query = query.filter(MaterialBase.spec_model.ilike(kw))
            elif search_field == 'common_name':
                query = query.filter(MaterialBase.common_name.ilike(kw))
            elif search_field == 'serial_number':
                query = query.filter(StockProduct.serial_number.ilike(kw))
            elif search_field == 'work_order_code':
                query = query.filter(StockProduct.work_order_code.ilike(kw))
            elif search_field == 'order_id':
                query = query.filter(StockProduct.order_id.ilike(kw))
            else:  # 'all': fuzzy match across several columns
                query = query.filter(or_(
                    MaterialBase.name.ilike(kw),
                    MaterialBase.spec_model.ilike(kw),
                    MaterialBase.company_name.ilike(kw),
                    StockProduct.serial_number.ilike(kw),
                    StockProduct.work_order_code.ilike(kw),
                    StockProduct.order_id.ilike(kw),
                ))

        # 1.1 Independent SKU search.
        if sku and sku.strip():
            query = query.filter(StockProduct.sku.ilike(f'%{sku.strip()}%'))
        if category and category.strip():
            query = query.filter(MaterialBase.category == category.strip())
        if material_type and material_type.strip():
            query = query.filter(MaterialBase.material_type == material_type.strip())
        if company and company.strip():
            query = query.filter(MaterialBase.company_name == company.strip())

        # 2. Status filter; depleted rows are hidden unless shipped rows are requested.
        if not statuses:
            statuses = ['在库', '借库']
        if '已出库' in statuses:
            query = query.filter(StockProduct.status.in_(statuses))
        else:
            query = query.filter(
                and_(
                    StockProduct.status.in_(statuses),
                    StockProduct.stock_quantity > 0,
                )
            )

        # 3. Advanced dynamic filters.
        if isinstance(advanced_filters, str):
            try:
                advanced_filters = json.loads(advanced_filters)
            except ValueError:
                advanced_filters = []
        if isinstance(advanced_filters, list):
            for cond in advanced_filters:
                # Skip malformed entries instead of failing the whole listing.
                if not isinstance(cond, dict):
                    continue
                field = cond.get('field')
                op = cond.get('operator')
                value = cond.get('value')
                if not field or not op or not isinstance(field, str):
                    continue
                column = columns.get(field)
                if column is None:
                    continue

                if op == '=':
                    query = query.filter(column == value)
                elif op == '!=':
                    query = query.filter(column != value)
                elif op == 'like':
                    query = query.filter(column.ilike(f'%{value}%'))
                elif op == 'not_like':
                    query = query.filter(~column.ilike(f'%{value}%'))
                elif op in numeric_ops:
                    # Accepts JSON numbers as well as numeric strings.
                    number = _to_number(value)
                    if number is not None:
                        query = query.filter(numeric_ops[op](column, number))

        # 4. Sorting: requested column if valid, else newest inbound date first.
        order_field = None
        if order_by_column and is_asc is not None:
            order_field = columns.get(order_by_column)
        if order_field is not None:
            if is_asc == 'true' or is_asc == True:  # noqa: E712
                query = query.order_by(order_field.asc())
            else:
                query = query.order_by(order_field.desc())
        else:
            query = query.order_by(StockProduct.production_date.desc())
        # Unique tie-breaker so rows with equal sort keys keep a stable position
        # across pages.
        query = query.order_by(StockProduct.id.desc())

        pagination = query.paginate(page=page, per_page=limit, error_out=False)

        items = []
        for item in pagination.items:
            item_dict = item.to_dict()
            # Expose the ``manual_cost`` column under its business name.
            item_dict['unit_total_cost'] = float(item.manual_cost or 0)
            items.append(item_dict)
        return {"total": pagination.total, "items": items}
    except Exception:
        traceback.print_exc()
        return {"total": 0, "items": []}
|
||
|
||
@staticmethod
|
||
def search_system_users(keyword):
    """Return up to 20 active system users, optionally filtered by keyword.

    Args:
        keyword: Optional substring matched case-insensitively against the
            username and email.

    Returns:
        list of ``{'value': username, 'email': email}`` dicts ordered by
        username. On failure the traceback is printed and an empty list is
        returned.
    """
    from app.models.system import SysUser

    try:
        query = SysUser.query.filter(SysUser.status == 'active')
        if keyword:
            kw = f'%{keyword}%'
            query = query.filter(or_(
                SysUser.username.ilike(kw),
                SysUser.email.ilike(kw),
            ))
        matches = query.order_by(SysUser.username).limit(20).all()
        return [{'value': user.username, 'email': user.email} for user in matches]
    except Exception:
        # Print the traceback like the sibling search helpers, so a broken
        # query is not mistaken for "no users found".
        traceback.print_exc()
        return []
|
||
|
||
@staticmethod
|
||
def get_filter_options():
    """Collect the distinct base-material values used to fill filter dropdowns.

    Returns:
        dict with sorted lists under ``categories``, ``types`` and
        ``companies``; NULL and empty-string values are excluded. On failure
        the traceback is printed and all three lists are empty.
    """
    try:
        def _distinct_values(column):
            # Distinct, non-blank values of one column, sorted in Python.
            rows = (
                db.session.query(column)
                .filter(column != None, column != '')  # noqa: E711
                .distinct()
                .all()
            )
            return sorted([row[0] for row in rows])

        sources = {
            "categories": MaterialBase.category,
            "types": MaterialBase.material_type,
            "companies": MaterialBase.company_name,
        }
        return {label: _distinct_values(column) for label, column in sources.items()}
    except Exception:
        traceback.print_exc()
        return {"categories": [], "types": [], "companies": []}
|
||
|
||
@staticmethod
|
||
def get_history_managers(keyword=None):
    """List the distinct production managers recorded on finished-product stock.

    Args:
        keyword: Optional substring matched case-insensitively against the
            manager name.

    Returns:
        list of non-empty manager names. On failure the traceback is printed
        and an empty list is returned.
    """
    from app.models.inbound.product import StockProduct

    try:
        manager_col = StockProduct.production_manager
        names = db.session.query(manager_col).filter(
            manager_col.isnot(None),
            manager_col != '',
        )
        if keyword:
            names = names.filter(manager_col.ilike(f'%{keyword}%'))

        found = []
        for row in names.distinct().all():
            if row[0]:
                found.append(row[0])
        return found
    except Exception:
        traceback.print_exc()
        return []
|
||
|
||
# ============================================================
|
||
# 9. BOM 原材料成本自动核算 (新增)
|
||
# ============================================================
|
||
@staticmethod
|
||
def calculate_bom_cost(bom_no, bom_version):
    """Compute the total raw-material cost of one BOM version.

    Reads the BOM lines with raw SQL from ``bom_table``. For each child it
    takes the highest unit price found across the purchase, semi-finished and
    finished-product stock tables, multiplies it by the line's dosage, and
    sums the results.

    Args:
        bom_no: BOM number.
        bom_version: BOM version.

    Returns:
        Total cost rounded to 2 decimals (``0.0`` when the BOM has no lines).

    Raises:
        Exception: Any failure is printed and re-raised.
    """
    from app.models.inbound.buy import StockBuy
    from app.models.inbound.semi import StockSemi
    from app.models.inbound.product import StockProduct

    try:
        # Raw SQL against the physical table avoids relying on the ORM mapping.
        bom_sql = text("""
            SELECT child_id, dosage
            FROM bom_table
            WHERE bom_no = :bom_no AND version = :version
        """)
        components = db.session.execute(
            bom_sql, {'bom_no': bom_no, 'version': bom_version}
        ).fetchall()

        # (model, price column) for each stock table, in lookup order:
        # purchase pre-tax unit price, then the per-unit cost held in
        # ``manual_cost`` for semi-finished and finished products.
        price_sources = (
            (StockBuy, StockBuy.pre_tax_unit_price),
            (StockSemi, StockSemi.manual_cost),
            (StockProduct, StockProduct.manual_cost),
        )

        running_total = 0.0
        for component in components:
            child_id = component[0]
            # NOTE(review): a dosage of 0 (not only NULL) falls back to 1.0
            # here; confirm that is intended.
            dosage = float(component[1] or 1.0)

            candidate_prices = [
                float(
                    db.session.query(func.max(price_column))
                    .filter(model.base_id == child_id)
                    .scalar() or 0.0
                )
                for model, price_column in price_sources
            ]
            running_total += max(candidate_prices) * dosage

        return round(running_total, 2)
    except Exception as e:
        traceback.print_exc()
        raise e
|