Files
KCGL/inventory-backend/app/services/inbound/semi_service.py

650 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/services/inbound/semi_service.py
from app.extensions import db
from app.models.base import MaterialBase
from app.models.outbound import TransOutbound
from datetime import datetime, timedelta, timezone
from sqlalchemy import or_, func, text, and_
import traceback
import json
class SemiInboundService:
    """Inbound-stock operations for semi-finished goods (``StockSemi`` rows).

    Every public method is a ``@staticmethod`` that works on the shared
    ``db.session``. Write operations (``handle_inbound``, ``update_inbound``,
    ``delete_inbound``) commit on success and roll back and re-raise on
    failure. Read/search operations print the traceback and return an empty
    result instead of raising.
    """

    # Text format used for full timestamps sent by the client.
    _DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
    # Text format used when the client sends a bare date.
    _DATE_FORMAT = '%Y-%m-%d'
    # Request keys copied verbatim onto the same-named StockSemi attribute
    # by ``update_inbound``.
    _UPDATABLE_FIELDS = (
        'sku',
        'barcode',
        'warehouse_location',
        'serial_number',
        'batch_number',
        'status',
        'quality_status',
        'bom_code',
        'bom_version',
        'work_order_code',
        'production_manager',
        'detail_link',
        'remark',
    )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_datetime(value, fmt=None):
        """Parse ``str(value)`` with ``fmt``; return ``None`` if it does not match.

        ``fmt`` defaults to ``_DATETIME_FORMAT`` (``YYYY-MM-DD HH:MM:SS``).
        """
        try:
            return datetime.strptime(str(value), fmt or SemiInboundService._DATETIME_FORMAT)
        except ValueError:
            return None

    @staticmethod
    def _resolve_in_date(raw_value, now):
        """Turn the client's ``in_date`` into a datetime, falling back to ``now``.

        * empty value            -> ``now``
        * longer than 10 chars   -> parsed as a full timestamp
        * otherwise              -> parsed as a date, combined with the
                                    hour/minute/second of ``now``
        * anything unparseable   -> ``now``
        """
        if not raw_value:
            return now
        date_str = str(raw_value)
        if len(date_str) > 10:
            parsed = SemiInboundService._parse_datetime(date_str)
            return parsed if parsed is not None else now
        day = SemiInboundService._parse_datetime(date_str, SemiInboundService._DATE_FORMAT)
        if day is None:
            return now
        return datetime(day.year, day.month, day.day, now.hour, now.minute, now.second)

    @staticmethod
    def _to_float(value):
        """Return ``value`` as a finite float, or ``None`` if it is not numeric.

        Accepts numbers as well as numeric strings (including negatives).
        ``None``, booleans, non-numeric text and NaN/infinity yield ``None``.
        """
        import math

        if value is None or isinstance(value, bool):
            return None
        try:
            number = float(value)
        except (TypeError, ValueError):
            return None
        return number if math.isfinite(number) else None

    @staticmethod
    def _as_list(value):
        """Return ``value`` if it is a list, otherwise an empty list."""
        return value if isinstance(value, list) else []

    @staticmethod
    def _stock_columns():
        """Map client-facing field names to the columns used for filter and sort.

        Both ``warehouse_location`` and ``warehouse_loc`` are accepted so that
        advanced filters and ordering agree on the key.
        """
        from app.models.inbound.semi import StockSemi

        return {
            'id': StockSemi.id,
            'base_id': StockSemi.base_id,
            'company_name': MaterialBase.company_name,
            'material_name': MaterialBase.name,
            'category': MaterialBase.category,
            'material_type': MaterialBase.material_type,
            'spec_model': MaterialBase.spec_model,
            'unit': MaterialBase.unit,
            'sku': StockSemi.sku,
            'inbound_date': StockSemi.production_date,
            'barcode': StockSemi.barcode,
            'serial_number': StockSemi.serial_number,
            'batch_number': StockSemi.batch_number,
            'status': StockSemi.status,
            'quality_status': StockSemi.quality_status,
            'qty_inbound': StockSemi.in_quantity,
            'qty_stock': StockSemi.stock_quantity,
            'qty_available': StockSemi.available_quantity,
            'warehouse_location': StockSemi.warehouse_location,
            'warehouse_loc': StockSemi.warehouse_location,
            'bom_code': StockSemi.bom_code,
            'bom_version': StockSemi.bom_version,
            'work_order_code': StockSemi.work_order_code,
            'raw_material_cost': StockSemi.raw_material_cost,
            'unit_total_cost': StockSemi.manual_cost,
            'total_price': StockSemi.total_price,
            'production_manager': StockSemi.production_manager,
            'production_start_time': StockSemi.production_start_time,
            'production_end_time': StockSemi.production_end_time,
        }

    @staticmethod
    def _apply_advanced_filters(query, advanced_filters, columns):
        """Apply the client's dynamic filter conditions to ``query``.

        ``advanced_filters`` is a list (or a JSON string encoding a list) of
        ``{"field", "operator", "value"}`` dicts. Only fields present in
        ``columns`` are honoured, so arbitrary column names cannot be injected.
        Malformed entries and non-numeric values for ordering operators are
        skipped instead of failing the whole query.
        """
        if isinstance(advanced_filters, str):
            try:
                advanced_filters = json.loads(advanced_filters)
            except ValueError:
                advanced_filters = []
        if not isinstance(advanced_filters, list):
            return query

        numeric_comparisons = {
            '>': lambda column, number: column > number,
            '<': lambda column, number: column < number,
            '>=': lambda column, number: column >= number,
            '<=': lambda column, number: column <= number,
        }

        for cond in advanced_filters:
            if not isinstance(cond, dict):
                continue
            field = cond.get('field')
            operator = cond.get('operator')
            value = cond.get('value')
            if not field or not operator:
                continue
            if not isinstance(field, str) or not isinstance(operator, str):
                continue
            column = columns.get(field)
            if column is None:
                continue

            if operator == '=':
                query = query.filter(column == value)
            elif operator == '!=':
                query = query.filter(column != value)
            elif operator == 'like':
                query = query.filter(column.ilike(f'%{value}%'))
            elif operator == 'not_like':
                query = query.filter(~column.ilike(f'%{value}%'))
            elif operator in numeric_comparisons:
                number = SemiInboundService._to_float(value)
                if number is not None:
                    query = query.filter(numeric_comparisons[operator](column, number))
        return query

    @staticmethod
    def _distinct_values(column):
        """Return the sorted distinct non-NULL, non-empty values of ``column``."""
        rows = db.session.query(column).filter(
            column.isnot(None),
            column != ''
        ).distinct().all()
        return sorted(row[0] for row in rows)

    @staticmethod
    def _max_unit_price(price_column, base_column, base_id):
        """Return the highest ``price_column`` among rows of ``base_id`` (0.0 if none)."""
        highest = db.session.query(func.max(price_column)).filter(
            base_column == base_id
        ).scalar()
        return float(highest or 0.0)

    # ------------------------------------------------------------------
    # Validation
    # ------------------------------------------------------------------
    @staticmethod
    def _check_unique(base_id, serial_number, batch_number, exclude_id=None):
        """Reject duplicate serial numbers and duplicate batches per material.

        Args:
            base_id: id of the base material the stock row belongs to.
            serial_number: serial number to check across all StockSemi rows.
            batch_number: batch number to check within ``base_id``.
            exclude_id: id of the row being edited, ignored in both checks.

        Raises:
            ValueError: if the serial number is already used, or the material
                already has a row with this batch number.
        """
        from app.models.inbound.semi import StockSemi

        if serial_number:
            query = StockSemi.query.filter(StockSemi.serial_number == serial_number)
            if exclude_id is not None:
                query = query.filter(StockSemi.id != exclude_id)
            exists = query.first()
            if exists:
                occupied_name = exists.base.name if (hasattr(exists, 'base') and exists.base) else "未知物料"
                raise ValueError(f"序列号【{serial_number}】已存在!被半成品 [{occupied_name}] 占用,请核查。")

        if batch_number and base_id:
            query = StockSemi.query.filter(
                StockSemi.base_id == base_id,
                StockSemi.batch_number == batch_number
            )
            if exclude_id is not None:
                query = query.filter(StockSemi.id != exclude_id)
            if query.first():
                raise ValueError(f"该物料已存在批号【{batch_number}】,请勿重复建单,建议在原批次上追加库存。")

    # ------------------------------------------------------------------
    # Lookups used by the inbound form
    # ------------------------------------------------------------------
    @staticmethod
    def search_base_material(keyword, page=1, limit=50):
        """Search enabled base materials by name, spec/model or company name.

        Returns:
            dict with ``items`` (list of dicts), ``total``, ``page`` and
            ``has_next``. On any error the traceback is printed and an empty
            first page is returned.
        """
        try:
            query = MaterialBase.query.filter(MaterialBase.is_enabled == True)
            if keyword:
                kw = f'%{keyword}%'
                query = query.filter(
                    or_(
                        MaterialBase.name.ilike(kw),
                        MaterialBase.spec_model.ilike(kw),
                        MaterialBase.company_name.ilike(kw)
                    )
                )
            query = query.order_by(MaterialBase.id.desc())
            pagination = query.paginate(page=page, per_page=limit, error_out=False)
            results = [
                {
                    'id': item.id,
                    'company_name': item.company_name,
                    'name': item.name,
                    'spec': item.spec_model,
                    'category': item.category,
                    'unit': item.unit,
                    'type': item.material_type,
                    'status': '启用'
                }
                for item in pagination.items
            ]
            return {"items": results, "total": pagination.total, "page": page, "has_next": pagination.has_next}
        except Exception:
            traceback.print_exc()
            return {"items": [], "total": 0, "page": 1, "has_next": False}

    @staticmethod
    def search_bom_options(keyword):
        """Return up to 20 distinct BOMs matching ``keyword``.

        Matches on BOM number or on the parent material's name/spec. Each
        result is a dict with ``bom_no``, ``version``, ``parent_name`` and
        ``parent_spec``. Returns ``[]`` (after printing the traceback) on error.
        """
        from app.models.bom import BomTable

        try:
            query = db.session.query(
                BomTable.bom_no,
                BomTable.version,
                MaterialBase.name.label('parent_name'),
                MaterialBase.spec_model.label('parent_spec')
            ).join(MaterialBase, BomTable.parent_id == MaterialBase.id)
            # The enabled flag is only filtered on when the model defines it.
            if hasattr(BomTable, 'is_enabled'):
                query = query.filter(BomTable.is_enabled == True)
            if keyword:
                kw = f'%{keyword}%'
                query = query.filter(
                    or_(
                        BomTable.bom_no.ilike(kw),
                        MaterialBase.name.ilike(kw),
                        MaterialBase.spec_model.ilike(kw)
                    )
                )
            rows = query.distinct().limit(20).all()
            return [
                {
                    'bom_no': row.bom_no,
                    'version': row.version,
                    'parent_name': row.parent_name,
                    'parent_spec': row.parent_spec or ''
                }
                for row in rows
            ]
        except Exception:
            traceback.print_exc()
            return []

    # ------------------------------------------------------------------
    # Create / update / delete
    # ------------------------------------------------------------------
    @staticmethod
    def handle_inbound(data):
        """Create a new StockSemi row from the inbound form payload.

        Args:
            data: dict of form values; ``base_id`` is required.

        Returns:
            The committed ``StockSemi`` instance.

        Raises:
            ValueError: missing/unknown/disabled base material, duplicate
                serial or batch number, or a non-numeric quantity/cost.
            Exception: any database error; the session is rolled back first.
        """
        from app.models.inbound.semi import StockSemi

        try:
            base_id = data.get('base_id')
            if not base_id:
                raise ValueError("必须选择基础物料 (缺少 base_id)")
            material = MaterialBase.query.get(base_id)
            if not material:
                raise ValueError(f"ID为 {base_id} 的基础物料不存在")
            if not material.is_enabled:
                raise ValueError(f"物料【{material.name}】已停用,无法办理新入库。")

            SemiInboundService._check_unique(
                base_id=base_id,
                serial_number=data.get('serial_number'),
                batch_number=data.get('batch_number')
            )

            # Timestamps are stored as naive datetimes in UTC+8 wall-clock time.
            beijing_tz = timezone(timedelta(hours=8))
            current_time = datetime.now(beijing_tz).replace(tzinfo=None)
            in_date_val = SemiInboundService._resolve_in_date(data.get('in_date'), current_time)

            # Unparseable production times are stored as NULL rather than rejected.
            p_start = None
            if data.get('production_start_time'):
                p_start = SemiInboundService._parse_datetime(data['production_start_time'])
            p_end = None
            if data.get('production_end_time'):
                p_end = SemiInboundService._parse_datetime(data['production_end_time'])

            time_range_str = None
            raw_range = data.get('production_time_range')
            if isinstance(raw_range, list):
                time_range_str = " ~ ".join([str(x) for x in raw_range])
            elif isinstance(raw_range, str):
                time_range_str = raw_range

            in_qty = float(data.get('in_quantity') or 0)
            raw_cost = float(data.get('raw_material_cost') or 0)
            bom_code = data.get('bom_code')
            bom_version = data.get('bom_version')

            # With a BOM the unit cost is the computed BOM cost; otherwise it
            # is the raw material cost entered by the user.
            if bom_code and bom_version:
                unit_cost = SemiInboundService.calculate_bom_cost(bom_code, bom_version)
            else:
                unit_cost = raw_cost
            total_value = unit_cost * in_qty

            try:
                next_global_id = db.session.execute(text("SELECT nextval('global_print_seq')")).scalar()
            except Exception:
                print("❌ 数据库序列 global_print_seq 不存在请执行SQL创建")
                raise

            # SKU defaults to the zero-padded sequence value; barcode defaults to the SKU.
            final_sku = data.get('sku') or str(next_global_id).zfill(10)
            final_barcode = data.get('barcode') or final_sku

            arrival_list = SemiInboundService._as_list(data.get('arrival_photo', []))
            quality_report_list = SemiInboundService._as_list(data.get('quality_report_link', []))

            new_stock = StockSemi(
                base_id=material.id,
                global_print_id=next_global_id,
                sku=final_sku,
                production_date=in_date_val,
                serial_number=data.get('serial_number'),
                batch_number=data.get('batch_number'),
                barcode=final_barcode,
                status='在库',
                quality_status=data.get('quality_status', '合格'),
                in_quantity=in_qty,
                stock_quantity=in_qty,
                available_quantity=in_qty,
                warehouse_location=data.get('warehouse_location'),
                bom_code=bom_code,
                bom_version=bom_version,
                work_order_code=data.get('work_order_code'),
                production_manager=data.get('production_manager'),
                production_start_time=p_start,
                production_end_time=p_end,
                production_time_range=time_range_str,
                raw_material_cost=raw_cost,
                manual_cost=unit_cost,  # the unit cost lives in the manual_cost column
                total_price=total_value,
                arrival_photo=json.dumps(arrival_list),
                quality_report_link=json.dumps(quality_report_list),
                detail_link=data.get('detail_link'),
                remark=data.get('remark')
            )
            db.session.add(new_stock)
            db.session.commit()
            return new_stock
        except Exception:
            db.session.rollback()
            print("----- SemiInboundService Error -----")
            traceback.print_exc()
            raise

    @staticmethod
    def update_inbound(stock_id, data):
        """Apply the edited fields in ``data`` to an existing StockSemi row.

        Only keys present in ``data`` are changed. A change of ``in_quantity``
        shifts ``stock_quantity`` and ``available_quantity`` by the same
        difference. The unit cost (``manual_cost``) is re-derived on every
        call, from the BOM when one is set, else from the raw material cost.

        Returns:
            The committed ``StockSemi`` instance.

        Raises:
            ValueError: unknown ``stock_id``, duplicate serial/batch number,
                or a non-numeric quantity.
            Exception: any database error; the session is rolled back first.
        """
        from app.models.inbound.semi import StockSemi

        try:
            stock = StockSemi.query.get(stock_id)
            if not stock:
                raise ValueError("记录不存在")

            SemiInboundService._check_unique(
                base_id=data.get('base_id', stock.base_id),
                serial_number=data.get('serial_number', stock.serial_number),
                batch_number=data.get('batch_number', stock.batch_number),
                exclude_id=stock_id
            )

            for key in SemiInboundService._UPDATABLE_FIELDS:
                if key in data:
                    setattr(stock, key, data[key])

            # Attachment lists are stored as JSON text; non-list values are ignored.
            for key in ('arrival_photo', 'quality_report_link'):
                if key in data and isinstance(data[key], list):
                    setattr(stock, key, json.dumps(data[key]))

            # An empty value clears the timestamp; an unparseable one leaves
            # the stored value unchanged.
            for key in ('production_start_time', 'production_end_time'):
                if key not in data:
                    continue
                raw_time = data[key]
                if not raw_time:
                    setattr(stock, key, None)
                    continue
                parsed = SemiInboundService._parse_datetime(raw_time)
                if parsed is not None:
                    setattr(stock, key, parsed)

            if 'production_time_range' in data:
                raw_range = data['production_time_range']
                if isinstance(raw_range, list):
                    stock.production_time_range = " ~ ".join([str(x) for x in raw_range])
                else:
                    stock.production_time_range = raw_range

            # NOTE(review): lowering in_quantity below what has already been
            # issued can drive stock/available quantities negative - confirm
            # whether callers guard against that.
            qty_changed = False
            if 'in_quantity' in data:
                new_qty = float(data['in_quantity'])
                diff = new_qty - float(stock.in_quantity or 0)
                if diff != 0:
                    stock.in_quantity = new_qty
                    stock.stock_quantity = float(stock.stock_quantity or 0) + diff
                    stock.available_quantity = float(stock.available_quantity or 0) + diff
                    qty_changed = True

            # A blank cost is treated as 0, the same as on creation.
            if 'raw_material_cost' in data:
                raw_cost = float(data['raw_material_cost'] or 0)
                stock.raw_material_cost = raw_cost
            else:
                raw_cost = float(stock.raw_material_cost or 0)

            # Unit cost: BOM cost when a BOM is set, otherwise the raw material
            # cost. A failing BOM calculation falls back to the raw cost.
            bom_code = data.get('bom_code', stock.bom_code)
            bom_version = data.get('bom_version', stock.bom_version)
            if bom_code and bom_version:
                try:
                    unit_cost = SemiInboundService.calculate_bom_cost(bom_code, bom_version)
                except Exception:
                    unit_cost = raw_cost
            else:
                unit_cost = raw_cost

            previous_unit_cost = float(stock.manual_cost or 0)
            stock.manual_cost = unit_cost

            # Keep total_price in step with manual_cost * in_quantity whenever
            # either factor changed (or the client explicitly sent a unit cost).
            cost_changed = unit_cost != previous_unit_cost
            if 'unit_total_cost' in data or qty_changed or cost_changed:
                stock.total_price = unit_cost * float(stock.in_quantity or 0)

            db.session.commit()
            return stock
        except Exception:
            db.session.rollback()
            raise

    @staticmethod
    def delete_inbound(stock_id):
        """Delete the StockSemi row with ``stock_id``.

        Returns:
            ``True`` once the delete is committed.

        Raises:
            ValueError: if no such row exists.
            Exception: any database error; the session is rolled back first.
        """
        from app.models.inbound.semi import StockSemi

        try:
            stock = StockSemi.query.get(stock_id)
            if not stock:
                raise ValueError("记录不存在")
            db.session.delete(stock)
            db.session.commit()
            return True
        except Exception:
            db.session.rollback()
            raise

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    @staticmethod
    def get_outbound_history(stock_id):
        """Return the outbound records of a semi-finished stock row, newest first.

        Each record is converted with ``to_dict()``. Returns ``[]`` (after
        printing the traceback) on error.
        """
        try:
            records = TransOutbound.query.filter_by(
                source_table='stock_semi', stock_id=stock_id
            ).order_by(TransOutbound.outbound_time.desc()).all()
            return [record.to_dict() for record in records]
        except Exception:
            traceback.print_exc()
            return []

    @staticmethod
    def get_list(page, limit, keyword=None, sku=None, search_field='all', statuses=None, category=None,
                 material_type=None, company=None, order_by_column=None, is_asc=None, advanced_filters=None):
        """Return one page of semi-finished stock rows.

        Args:
            page, limit: pagination position and page size.
            keyword: fuzzy search text; ``search_field`` picks the column
                (``name``, ``spec``, ``common_name``, ``batch_number``,
                ``work_order_code``, ``bom_code``) or ``all`` for a global match.
            sku: fuzzy match on the SKU, independent of ``keyword``.
            statuses: stock statuses to include; defaults to in-stock and
                on-loan. Unless the issued status is requested, rows with no
                remaining stock quantity are hidden.
            category, material_type, company: exact-match material filters.
            order_by_column, is_asc: optional sort key and direction; the
                default order is newest ``production_date`` first.
            advanced_filters: list (or JSON string) of
                ``{"field", "operator", "value"}`` conditions.

        Returns:
            ``{"total": int, "items": [dict, ...]}``. On any error the
            traceback is printed and an empty result is returned.
        """
        from app.models.inbound.semi import StockSemi

        try:
            columns = SemiInboundService._stock_columns()
            query = db.session.query(StockSemi).outerjoin(MaterialBase, StockSemi.base_id == MaterialBase.id)

            # 1. Keyword search, optionally restricted to a single field.
            if keyword:
                kw = f'%{keyword}%'
                if search_field == 'name':
                    query = query.filter(MaterialBase.name.ilike(kw))
                elif search_field == 'spec':
                    query = query.filter(MaterialBase.spec_model.ilike(kw))
                elif search_field == 'common_name':
                    query = query.filter(MaterialBase.common_name.ilike(kw))
                elif search_field == 'batch_number':
                    query = query.filter(StockSemi.batch_number.ilike(kw))
                elif search_field == 'work_order_code':
                    query = query.filter(StockSemi.work_order_code.ilike(kw))
                elif search_field == 'bom_code':
                    query = query.filter(StockSemi.bom_code.ilike(kw))
                else:  # 'all': global fuzzy match
                    query = query.filter(
                        or_(
                            MaterialBase.name.ilike(kw),
                            MaterialBase.spec_model.ilike(kw),
                            MaterialBase.company_name.ilike(kw),
                            StockSemi.batch_number.ilike(kw),
                            StockSemi.serial_number.ilike(kw),
                            StockSemi.work_order_code.ilike(kw),
                            StockSemi.bom_code.ilike(kw)
                        )
                    )

            # 1.1 Independent SKU search.
            if sku and sku.strip():
                query = query.filter(StockSemi.sku.ilike(f'%{sku.strip()}%'))
            if category and category.strip():
                query = query.filter(MaterialBase.category == category.strip())
            if material_type and material_type.strip():
                query = query.filter(MaterialBase.material_type == material_type.strip())
            if company and company.strip():
                query = query.filter(MaterialBase.company_name == company.strip())

            if not statuses:
                statuses = ['在库', '借库']
            if '已出库' in statuses:
                query = query.filter(StockSemi.status.in_(statuses))
            else:
                query = query.filter(
                    and_(
                        StockSemi.status.in_(statuses),
                        StockSemi.stock_quantity > 0
                    )
                )

            # 2. Dynamic advanced filters.
            if advanced_filters:
                query = SemiInboundService._apply_advanced_filters(query, advanced_filters, columns)

            # 3. Dynamic ordering. The id tiebreaker keeps pagination stable
            #    when many rows share the same sort value.
            order_field = None
            if order_by_column and is_asc is not None:
                order_field = columns.get(order_by_column)
            if order_field is not None:
                ascending = is_asc in ('true', True)
                primary = order_field.asc() if ascending else order_field.desc()
                query = query.order_by(primary, StockSemi.id.desc())
            else:
                query = query.order_by(StockSemi.production_date.desc(), StockSemi.id.desc())

            pagination = query.paginate(page=page, per_page=limit, error_out=False)
            items = []
            for item in pagination.items:
                item_dict = item.to_dict()
                item_dict['unit_total_cost'] = float(item.manual_cost or 0)
                items.append(item_dict)
            return {"total": pagination.total, "items": items}
        except Exception as e:
            print(f"List Error: {e}")
            traceback.print_exc()
            return {"total": 0, "items": []}

    @staticmethod
    def search_system_users(keyword):
        """Return up to 20 active users whose username or email matches ``keyword``.

        Each entry is ``{"value": username, "email": email}``, ordered by
        username. Returns ``[]`` (after printing the traceback) on error.
        """
        from app.models.system import SysUser

        try:
            query = SysUser.query.filter(SysUser.status == 'active')
            if keyword:
                kw = f'%{keyword}%'
                query = query.filter(or_(
                    SysUser.username.ilike(kw),
                    SysUser.email.ilike(kw)
                ))
            query = query.order_by(SysUser.username)
            return [
                {'value': user.username, 'email': user.email}
                for user in query.limit(20).all()
            ]
        except Exception:
            traceback.print_exc()
            return []

    @staticmethod
    def get_filter_options():
        """Return the distinct categories, material types and companies.

        Values come from ``MaterialBase``; NULL and empty strings are left
        out and each list is sorted. Returns three empty lists (after
        printing the traceback) on error.
        """
        try:
            return {
                "categories": SemiInboundService._distinct_values(MaterialBase.category),
                "types": SemiInboundService._distinct_values(MaterialBase.material_type),
                "companies": SemiInboundService._distinct_values(MaterialBase.company_name)
            }
        except Exception:
            traceback.print_exc()
            return {"categories": [], "types": [], "companies": []}

    @staticmethod
    def get_history_managers(keyword=None):
        """Return the distinct production managers already used on stock rows.

        ``keyword`` optionally narrows the list with a fuzzy match. Returns
        ``[]`` (after printing the traceback) on error.
        """
        from app.models.inbound.semi import StockSemi

        try:
            query = db.session.query(StockSemi.production_manager).filter(
                StockSemi.production_manager.isnot(None),
                StockSemi.production_manager != ''
            )
            if keyword:
                query = query.filter(StockSemi.production_manager.ilike(f'%{keyword}%'))
            return [row[0] for row in query.distinct().all() if row[0]]
        except Exception:
            traceback.print_exc()
            return []

    # ------------------------------------------------------------------
    # Costing
    # ------------------------------------------------------------------
    @staticmethod
    def calculate_bom_cost(bom_no, bom_version):
        """Compute the raw material cost of one unit built from a BOM.

        The BOM's component lines are read from ``bom_table`` with raw SQL.
        For each component, the highest unit price found across the purchase,
        semi-finished and finished stock tables is multiplied by the line's
        dosage, and the products are summed.

        Args:
            bom_no: BOM number.
            bom_version: BOM version.

        Returns:
            The total cost rounded to 2 decimals (0.0 when the BOM has no lines).

        Raises:
            Exception: any database error, after printing the traceback.
        """
        from app.models.inbound.buy import StockBuy
        from app.models.inbound.semi import StockSemi
        from app.models.inbound.product import StockProduct

        try:
            # Raw SQL against the physical table avoids depending on the ORM mapping.
            sql = text("""
                SELECT child_id, dosage
                FROM bom_table
                WHERE bom_no = :bom_no AND version = :version
            """)
            bom_lines = db.session.execute(sql, {'bom_no': bom_no, 'version': bom_version}).fetchall()

            total_cost = 0.0
            for line in bom_lines:
                component_base_id = line[0]  # child_id
                dosage = line[1]
                # Only a missing dosage defaults to 1; an explicit 0 stays 0.
                usage_qty = 1.0 if dosage is None else float(dosage)

                # 1. Highest purchase price (pre-tax).
                buy_price = SemiInboundService._max_unit_price(
                    StockBuy.pre_tax_unit_price, StockBuy.base_id, component_base_id)
                # 2. Highest semi-finished unit cost (stored in manual_cost).
                semi_price = SemiInboundService._max_unit_price(
                    StockSemi.manual_cost, StockSemi.base_id, component_base_id)
                # 3. Highest finished-product unit cost (also stored in manual_cost).
                product_price = SemiInboundService._max_unit_price(
                    StockProduct.manual_cost, StockProduct.base_id, component_base_id)

                # 4. Highest of the three, times the dosage.
                total_cost += max(buy_price, semi_price, product_price) * usage_qty
            return round(total_cost, 2)
        except Exception:
            traceback.print_exc()
            raise