Files
KCGL/inventory-backend/app/services/inbound/product_service.py

388 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/services/inbound/product_service.py
from app.extensions import db
from app.models.base import MaterialBase
from app.models.outbound import TransOutbound
from datetime import datetime, timedelta, timezone
from sqlalchemy import or_, func, text, and_
import traceback
import json
class ProductInboundService:
# ============================================================
# 0. 辅助:唯一性校验
# ============================================================
@staticmethod
def _check_unique(serial_number, exclude_id=None):
from app.models.inbound.product import StockProduct
if serial_number:
query = StockProduct.query.filter(StockProduct.serial_number == serial_number)
if exclude_id:
query = query.filter(StockProduct.id != exclude_id)
exists = query.first()
if exists:
occupied_name = exists.base.name if (hasattr(exists, 'base') and exists.base) else "未知物料"
raise ValueError(f"序列号【{serial_number}】已存在!被成品 [{occupied_name}] 占用,请核查。")
# ============================================================
# 1. 基础物料搜索
# ============================================================
@staticmethod
def search_base_material(keyword):
try:
query = MaterialBase.query.filter(MaterialBase.is_enabled == True)
if keyword:
query = query.filter(
or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
)
query = query.order_by(MaterialBase.id.desc()).limit(20)
results = []
for item in query.all():
results.append({
'id': item.id,
'name': item.name,
'spec': item.spec_model,
'category': item.category,
'unit': item.unit,
'type': item.material_type,
'status': '启用'
})
return results
except Exception:
traceback.print_exc()
return []
# ============================================================
# 1.5 [新增] BOM 搜索逻辑
# ============================================================
@staticmethod
def search_bom_options(keyword):
from app.models.bom import BomTable
try:
# 关联查询BOM表 + 父件基础信息表
query = db.session.query(
BomTable.bom_no,
BomTable.version,
MaterialBase.name.label('parent_name'),
MaterialBase.spec_model.label('parent_spec')
).join(MaterialBase, BomTable.parent_id == MaterialBase.id)
# 只查询启用的BOM
if hasattr(BomTable, 'is_enabled'):
query = query.filter(BomTable.is_enabled == True)
if keyword:
kw = f'%{keyword}%'
# 支持搜索BOM编号、父件名称、父件规格
query = query.filter(
or_(
BomTable.bom_no.ilike(kw),
MaterialBase.name.ilike(kw),
MaterialBase.spec_model.ilike(kw)
)
)
# 去重并限制数量
results = query.distinct().limit(20).all()
return [{
'bom_no': r.bom_no,
'version': r.version,
'parent_name': r.parent_name,
'parent_spec': r.parent_spec or ''
} for r in results]
except Exception:
traceback.print_exc()
return []
# ============================================================
# 2. 新增入库逻辑
# ============================================================
@staticmethod
def handle_inbound(data):
from app.models.inbound.product import StockProduct
try:
base_id = data.get('base_id')
if not base_id: raise ValueError("必须选择基础物料")
material = MaterialBase.query.get(base_id)
if not material: raise ValueError("物料不存在")
if not material.is_enabled:
raise ValueError(f"物料【{material.name}】已停用,无法办理新入库。")
ProductInboundService._check_unique(
serial_number=data.get('serial_number')
)
beijing_tz = timezone(timedelta(hours=8))
current_time = datetime.now(beijing_tz).replace(tzinfo=None)
in_date_val = current_time
if data.get('in_date'):
try:
date_str = str(data['in_date'])
if len(date_str) > 10:
in_date_val = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
else:
d_temp = datetime.strptime(date_str, '%Y-%m-%d')
in_date_val = datetime(d_temp.year, d_temp.month, d_temp.day,
current_time.hour, current_time.minute, current_time.second)
except:
in_date_val = current_time
in_qty = float(data.get('in_quantity') or 0)
p_start = data.get('production_start_time', '')
p_end = data.get('production_end_time', '')
time_range = f"{p_start} ~ {p_end}" if p_start or p_end else None
try:
seq_sql = text("SELECT nextval('global_print_seq')")
result = db.session.execute(seq_sql)
next_global_id = result.scalar()
except:
next_global_id = None
generated_sku = str(next_global_id).zfill(10) if next_global_id else datetime.now().strftime('%Y%m%d%H%M%S')
final_barcode = data.get('barcode') or generated_sku
photo_list = data.get('product_photo', [])
quality_list = data.get('quality_report_link', [])
inspection_list = data.get('inspection_report_link', [])
if not isinstance(photo_list, list): photo_list = []
if not isinstance(quality_list, list): quality_list = []
if not isinstance(inspection_list, list): inspection_list = []
new_stock = StockProduct(
base_id=material.id,
global_print_id=next_global_id,
sku=generated_sku,
production_date=in_date_val,
barcode=final_barcode,
serial_number=data.get('serial_number'),
status=data.get('status', '在库'),
warehouse_location=data.get('warehouse_location'),
in_quantity=in_qty,
stock_quantity=in_qty,
available_quantity=in_qty,
bom_code=data.get('bom_code'),
bom_version=data.get('bom_version'),
work_order_code=data.get('work_order_code'),
production_manager=data.get('production_manager'),
production_time_range=time_range,
raw_material_cost=float(data.get('raw_material_cost') or 0),
manual_cost=float(data.get('manual_cost') or 0),
quality_status=data.get('quality_status', '合格'),
product_photo=json.dumps(photo_list),
quality_report_link=json.dumps(quality_list),
inspection_report_link=json.dumps(inspection_list),
detail_link=data.get('detail_link'),
remark=data.get('remark'),
sale_price=float(data.get('sale_price') or 0),
order_id=data.get('order_id')
)
db.session.add(new_stock)
db.session.commit()
return new_stock
except Exception as e:
db.session.rollback()
raise e
# ============================================================
# 3. 更新逻辑
# ============================================================
@staticmethod
def update_inbound(stock_id, data):
from app.models.inbound.product import StockProduct
try:
stock = StockProduct.query.get(stock_id)
if not stock: raise ValueError("记录不存在")
if 'serial_number' in data:
ProductInboundService._check_unique(
serial_number=data['serial_number'],
exclude_id=stock_id
)
fields = [
'barcode', 'serial_number', 'warehouse_location',
'status', 'quality_status', 'bom_code', 'bom_version',
'work_order_code', 'production_manager',
'detail_link', 'order_id', 'remark'
]
for f in fields:
if f in data: setattr(stock, f, data[f])
if 'product_photo' in data:
imgs = data['product_photo']
if isinstance(imgs, list): stock.product_photo = json.dumps(imgs)
if 'quality_report_link' in data:
imgs = data['quality_report_link']
if isinstance(imgs, list): stock.quality_report_link = json.dumps(imgs)
if 'inspection_report_link' in data:
imgs = data['inspection_report_link']
if isinstance(imgs, list): stock.inspection_report_link = json.dumps(imgs)
if 'sale_price' in data: stock.sale_price = float(data['sale_price'])
if 'raw_material_cost' in data: stock.raw_material_cost = float(data['raw_material_cost'])
if 'manual_cost' in data: stock.manual_cost = float(data['manual_cost'])
if 'in_quantity' in data:
new_qty = float(data['in_quantity'])
diff = new_qty - float(stock.in_quantity)
stock.in_quantity = new_qty
stock.stock_quantity = float(stock.stock_quantity) + diff
stock.available_quantity = float(stock.available_quantity) + diff
if 'production_start_time' in data or 'production_end_time' in data:
old_range = stock.production_time_range or " ~ "
parts = old_range.split(' ~ ')
old_start = parts[0] if len(parts) > 0 else ''
old_end = parts[1] if len(parts) > 1 else ''
start = data.get('production_start_time', old_start)
end = data.get('production_end_time', old_end)
stock.production_time_range = f"{start} ~ {end}"
db.session.commit()
return stock
except Exception as e:
db.session.rollback()
raise e
# ============================================================
# 4. 删除逻辑
# ============================================================
@staticmethod
def delete_inbound(stock_id):
from app.models.inbound.product import StockProduct
try:
stock = StockProduct.query.get(stock_id)
if stock:
db.session.delete(stock)
db.session.commit()
return True
except Exception as e:
db.session.rollback()
raise e
# ============================================================
# 5. 出库历史
# ============================================================
@staticmethod
def get_outbound_history(stock_id):
try:
records = TransOutbound.query.filter_by(
source_table='stock_product', stock_id=stock_id
).order_by(TransOutbound.outbound_time.desc()).all()
return [r.to_dict() for r in records]
except:
return []
# ============================================================
# 6. 获取列表
# ============================================================
@staticmethod
def get_list(page, limit, keyword=None, statuses=None, category=None, material_type=None):
from app.models.inbound.product import StockProduct
try:
query = db.session.query(StockProduct).outerjoin(MaterialBase, StockProduct.base_id == MaterialBase.id)
if keyword:
query = query.filter(or_(
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%'),
StockProduct.serial_number.ilike(f'%{keyword}%'),
StockProduct.work_order_code.ilike(f'%{keyword}%'),
StockProduct.order_id.ilike(f'%{keyword}%'),
StockProduct.sku.ilike(f'%{keyword}%')
))
if category and category.strip():
query = query.filter(MaterialBase.category == category.strip())
if material_type and material_type.strip():
query = query.filter(MaterialBase.material_type == material_type.strip())
if not statuses:
statuses = ['在库', '借库']
if '已出库' in statuses:
query = query.filter(StockProduct.status.in_(statuses))
else:
query = query.filter(
and_(
StockProduct.status.in_(statuses),
StockProduct.stock_quantity > 0
)
)
pagination = query.order_by(StockProduct.production_date.desc()).paginate(page=page, per_page=limit,
error_out=False)
current_items = pagination.items
def parse_img(json_str):
if not json_str: return []
try:
return json.loads(json_str) if json_str.startswith('[') else [json_str]
except:
return []
items = []
for item in current_items:
d = item.to_dict()
date_display = ''
if item.production_date:
try:
date_display = item.production_date.strftime('%Y-%m-%d')
except:
date_display = str(item.production_date)[:10]
d['inbound_date'] = date_display
d['qty_stock'] = float(item.stock_quantity or 0)
d['qty_available'] = float(item.available_quantity or 0)
d['sum_stock'] = d['qty_stock']
d['sum_available'] = d['qty_available']
d['product_photo'] = parse_img(item.product_photo)
d['quality_report_link'] = parse_img(item.quality_report_link)
d['inspection_report_link'] = parse_img(item.inspection_report_link)
d['global_print_id'] = item.global_print_id
items.append(d)
return {"total": pagination.total, "items": items}
except:
traceback.print_exc()
return {"total": 0, "items": []}
@staticmethod
def search_system_users(keyword):
from app.models.system import SysUser
try:
query = SysUser.query.filter(SysUser.status == 'active')
if keyword:
kw = f'%{keyword}%'
query = query.filter(db.or_(
SysUser.username.ilike(kw),
SysUser.email.ilike(kw)
))
query = query.order_by(SysUser.username)
users = []
for u in query.limit(20).all():
users.append({
'value': u.username,
'email': u.email
})
return users
except Exception:
return []
@staticmethod
def get_filter_options():
try:
from app.models.base import MaterialBase
categories = db.session.query(MaterialBase.category) \
.filter(MaterialBase.category != None, MaterialBase.category != '') \
.distinct().all()
types = db.session.query(MaterialBase.material_type) \
.filter(MaterialBase.material_type != None, MaterialBase.material_type != '') \
.distinct().all()
return {
"categories": [r[0] for r in categories],
"types": [r[0] for r in types]
}
except Exception:
import traceback
traceback.print_exc()
return {"categories": [], "types": []}