Files
KCGL/inventory-backend/app/services/inbound/product_service.py

309 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/services/inbound/product_service.py
from app.extensions import db
from app.models.base import MaterialBase
from app.models.outbound import TransOutbound
from datetime import datetime, timedelta, timezone
from sqlalchemy import or_, func, text, and_
import traceback
import json
class ProductInboundService:
    """Service layer for finished-product inbound stock records.

    Every public method is a ``@staticmethod`` that works on the shared
    ``db.session``. Write operations (``handle_inbound``, ``update_inbound``,
    ``delete_inbound``) commit on success and roll back + re-raise on failure.
    Read operations (``search_base_material``, ``get_outbound_history``,
    ``get_list``) print the traceback and return an empty result on failure.
    """

    # Columns that update_inbound copies verbatim from the request payload.
    _PLAIN_UPDATE_FIELDS = (
        'barcode', 'serial_number', 'warehouse_location',
        'status', 'quality_status', 'bom_code', 'bom_version',
        'work_order_code', 'production_manager',
        'detail_link', 'order_id', 'remark',
    )
    # Columns that hold a JSON-encoded list of links.
    _ATTACHMENT_FIELDS = (
        'product_photo', 'quality_report_link', 'inspection_report_link',
    )
    # Numeric columns that update_inbound converts with float().
    _COST_FIELDS = ('sale_price', 'raw_material_cost', 'manual_cost')

    # ============================================================
    # Internal helpers (pure functions, no database access)
    # ============================================================
    @staticmethod
    def _beijing_now():
        """Return the current Beijing (UTC+8) wall-clock time as a naive datetime."""
        # tzinfo is dropped on purpose: the value is stored as a naive DateTime.
        return datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None)

    @staticmethod
    def _to_float(value):
        """Convert *value* to float, mapping any falsy value (None, '', 0) to 0.0."""
        return float(value or 0)

    @staticmethod
    def _as_list(value):
        """Return *value* if it is a list, otherwise an empty list."""
        return value if isinstance(value, list) else []

    @staticmethod
    def _resolve_in_date(raw, now):
        """Turn the payload's ``in_date`` into a datetime.

        Args:
            raw: Value from the payload. Strings longer than 10 characters are
                parsed as ``%Y-%m-%d %H:%M:%S``; shorter ones as ``%Y-%m-%d``
                and completed with the hour/minute/second of *now*.
            now: Fallback datetime, also the source of the time-of-day part.

        Returns:
            The parsed datetime, or *now* when *raw* is falsy or unparsable.
        """
        if not raw:
            return now
        date_str = str(raw)
        try:
            if len(date_str) > 10:
                return datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
            day = datetime.strptime(date_str, '%Y-%m-%d')
        except ValueError:
            # Best effort: an unparsable date falls back to the current time.
            return now
        return day.replace(hour=now.hour, minute=now.minute, second=now.second)

    @staticmethod
    def _format_time_range(start, end):
        """Join two time strings as ``"<start> ~ <end>"``; None becomes ''."""
        start = '' if start is None else start
        end = '' if end is None else end
        return f"{start} ~ {end}"

    @staticmethod
    def _parse_attachments(raw):
        """Decode a stored attachment column into a list.

        A JSON array string is decoded, any other non-empty string is wrapped
        in a one-element list, a list is returned as is, and anything else
        (including malformed JSON) yields ``[]``.
        """
        if not raw:
            return []
        if isinstance(raw, list):
            return raw
        if not isinstance(raw, str):
            return []
        if not raw.startswith('['):
            return [raw]
        try:
            return json.loads(raw)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return []

    @staticmethod
    def _format_date(value):
        """Render *value* as ``YYYY-MM-DD``; '' when falsy."""
        if not value:
            return ''
        try:
            return value.strftime('%Y-%m-%d')
        except (AttributeError, ValueError):
            # Not a date/datetime object: use the first 10 chars of its text.
            return str(value)[:10]

    @staticmethod
    def _serialize_row(item):
        """Build the list-view dict for one StockProduct row."""
        service = ProductInboundService
        row = item.to_dict()
        row['inbound_date'] = service._format_date(item.production_date)
        row['qty_stock'] = float(item.stock_quantity or 0)
        row['qty_available'] = float(item.available_quantity or 0)
        row['sum_stock'] = row['qty_stock']
        row['sum_available'] = row['qty_available']
        for field in service._ATTACHMENT_FIELDS:
            row[field] = service._parse_attachments(getattr(item, field))
        row['global_print_id'] = item.global_print_id
        return row

    # ============================================================
    # 1. Base material search
    # ============================================================
    @staticmethod
    def search_base_material(keyword):
        """Search enabled base materials by name or spec model.

        Args:
            keyword: Optional substring matched case-insensitively against
                ``name`` and ``spec_model``. Falsy means "no filter".

        Returns:
            Up to 20 dicts (newest id first) with the keys ``id``, ``name``,
            ``spec``, ``category``, ``unit``, ``type`` and ``status``.
            An empty list is returned if anything fails.
        """
        try:
            # Only enabled materials are searchable.
            query = MaterialBase.query.filter(MaterialBase.is_enabled == True)  # noqa: E712
            if keyword:
                pattern = f'%{keyword}%'
                query = query.filter(
                    or_(
                        MaterialBase.name.ilike(pattern),
                        MaterialBase.spec_model.ilike(pattern),
                    )
                )
            query = query.order_by(MaterialBase.id.desc()).limit(20)
            # Key names are presumably what the front end expects — verify
            # against the caller before renaming any of them.
            return [
                {
                    'id': item.id,
                    'name': item.name,
                    'spec': item.spec_model,
                    'category': item.category,
                    'unit': item.unit,
                    'type': item.material_type,
                    'status': '启用',
                }
                for item in query.all()
            ]
        except Exception:
            traceback.print_exc()
            return []

    # ============================================================
    # 2. Create inbound record (timestamps forced to Beijing time)
    # ============================================================
    @staticmethod
    def handle_inbound(data):
        """Create and commit a new StockProduct row from *data*.

        Args:
            data: Mapping with the inbound form fields. ``base_id`` is
                required; everything else is optional.

        Returns:
            The committed ``StockProduct`` instance.

        Raises:
            ValueError: If ``base_id`` is missing or matches no material, or a
                numeric field cannot be converted to float.
            Exception: Any database error, re-raised after a rollback.
        """
        # Imported locally, as in the original code.
        from app.models.inbound.product import StockProduct

        service = ProductInboundService
        try:
            base_id = data.get('base_id')
            if not base_id:
                raise ValueError("必须选择基础物料")
            material = MaterialBase.query.get(base_id)
            if not material:
                raise ValueError("物料不存在")

            now = service._beijing_now()
            in_date_val = service._resolve_in_date(data.get('in_date'), now)
            in_qty = service._to_float(data.get('in_quantity'))

            # `or ''` keeps an explicit None from being rendered as "None".
            p_start = data.get('production_start_time') or ''
            p_end = data.get('production_end_time') or ''
            time_range = (
                service._format_time_range(p_start, p_end)
                if p_start or p_end else None
            )

            # Global serial number, taken from a database sequence.
            next_global_id = db.session.execute(
                text("SELECT nextval('global_print_seq')")
            ).scalar()
            generated_sku = str(next_global_id).zfill(10)
            final_barcode = data.get('barcode') or generated_sku

            new_stock = StockProduct(
                base_id=material.id,
                global_print_id=next_global_id,
                sku=generated_sku,
                production_date=in_date_val,  # stored as a DateTime
                barcode=final_barcode,
                serial_number=data.get('serial_number'),
                status=data.get('status', '在库'),
                warehouse_location=data.get('warehouse_location'),
                in_quantity=in_qty,
                stock_quantity=in_qty,
                available_quantity=in_qty,
                bom_code=data.get('bom_code'),
                bom_version=data.get('bom_version'),
                work_order_code=data.get('work_order_code'),
                production_manager=data.get('production_manager'),
                production_time_range=time_range,
                raw_material_cost=service._to_float(data.get('raw_material_cost')),
                manual_cost=service._to_float(data.get('manual_cost')),
                quality_status=data.get('quality_status', '合格'),
                product_photo=json.dumps(
                    service._as_list(data.get('product_photo', []))),
                quality_report_link=json.dumps(
                    service._as_list(data.get('quality_report_link', []))),
                inspection_report_link=json.dumps(
                    service._as_list(data.get('inspection_report_link', []))),
                detail_link=data.get('detail_link'),
                remark=data.get('remark'),
                sale_price=service._to_float(data.get('sale_price')),
                order_id=data.get('order_id'),
            )
            db.session.add(new_stock)
            db.session.commit()
            return new_stock
        except Exception:
            db.session.rollback()
            raise

    # ============================================================
    # 3. Update
    # ============================================================
    @staticmethod
    def update_inbound(stock_id, data):
        """Apply the fields present in *data* to an existing StockProduct.

        Only keys that appear in *data* are touched. Changing ``in_quantity``
        shifts ``stock_quantity`` and ``available_quantity`` by the same
        difference.

        Args:
            stock_id: Primary key of the record to update.
            data: Mapping of field name to new value.

        Returns:
            The committed ``StockProduct`` instance.

        Raises:
            ValueError: If the record does not exist or a numeric value cannot
                be converted to float.
            Exception: Any database error, re-raised after a rollback.
        """
        from app.models.inbound.product import StockProduct

        service = ProductInboundService
        try:
            stock = StockProduct.query.get(stock_id)
            if not stock:
                raise ValueError("记录不存在")

            for field in service._PLAIN_UPDATE_FIELDS:
                if field in data:
                    setattr(stock, field, data[field])

            # Attachment columns are only overwritten when a list is supplied.
            for field in service._ATTACHMENT_FIELDS:
                if isinstance(data.get(field), list):
                    setattr(stock, field, json.dumps(data[field]))

            for field in service._COST_FIELDS:
                if field in data:
                    setattr(stock, field, float(data[field]))

            if 'in_quantity' in data:
                new_qty = float(data['in_quantity'])
                # `or 0` guards rows whose quantity columns are NULL.
                diff = new_qty - float(stock.in_quantity or 0)
                stock.in_quantity = new_qty
                # NOTE(review): nothing stops these from going negative when
                # in_quantity is lowered below what has already left stock.
                stock.stock_quantity = float(stock.stock_quantity or 0) + diff
                stock.available_quantity = float(stock.available_quantity or 0) + diff

            if 'production_start_time' in data or 'production_end_time' in data:
                # Keep whichever half of the stored range was not supplied.
                old_start, _, old_end = (
                    stock.production_time_range or ''
                ).partition(' ~ ')
                start = data.get('production_start_time', old_start)
                end = data.get('production_end_time', old_end)
                stock.production_time_range = service._format_time_range(start, end)

            db.session.commit()
            return stock
        except Exception:
            db.session.rollback()
            raise

    # ============================================================
    # 4. Delete
    # ============================================================
    @staticmethod
    def delete_inbound(stock_id):
        """Delete the StockProduct with *stock_id* if it exists.

        Returns:
            ``True`` whether or not a matching record was found.

        Raises:
            Exception: Any database error, re-raised after a rollback.
        """
        from app.models.inbound.product import StockProduct

        try:
            stock = StockProduct.query.get(stock_id)
            if stock:
                db.session.delete(stock)
            db.session.commit()
            return True
        except Exception:
            db.session.rollback()
            raise

    # ============================================================
    # 5. Outbound history
    # ============================================================
    @staticmethod
    def get_outbound_history(stock_id):
        """Return the outbound records of one stock row, newest first.

        Returns:
            A list of ``to_dict()`` results, or ``[]`` if the query fails.
        """
        try:
            records = TransOutbound.query.filter_by(
                source_table='stock_product', stock_id=stock_id
            ).order_by(TransOutbound.outbound_time.desc()).all()
            return [record.to_dict() for record in records]
        except Exception:
            # Still best effort, but leave a trace like the other read paths.
            traceback.print_exc()
            return []

    # ============================================================
    # 6. Paginated list
    # ============================================================
    @staticmethod
    def get_list(page, limit, keyword=None, statuses=None):
        """Return one page of stock rows for the list view.

        Args:
            page: Page number passed to ``paginate``.
            limit: Page size passed to ``paginate``.
            keyword: Optional substring matched case-insensitively against the
                material name/spec model and the row's serial number, work
                order code, order id and SKU.
            statuses: Status values to include; defaults to
                ``['在库', '借库']``. Unless ``'已出库'`` is among them, rows
                with ``stock_quantity <= 0`` are excluded.

        Returns:
            ``{"total": <int>, "items": [<dict>, ...]}`` ordered by
            ``production_date`` descending, or ``{"total": 0, "items": []}``
            if anything fails.
        """
        from app.models.inbound.product import StockProduct

        service = ProductInboundService
        try:
            query = db.session.query(StockProduct).outerjoin(
                MaterialBase, StockProduct.base_id == MaterialBase.id
            )
            if keyword:
                pattern = f'%{keyword}%'
                query = query.filter(or_(
                    MaterialBase.name.ilike(pattern),
                    MaterialBase.spec_model.ilike(pattern),
                    StockProduct.serial_number.ilike(pattern),
                    StockProduct.work_order_code.ilike(pattern),
                    StockProduct.order_id.ilike(pattern),
                    StockProduct.sku.ilike(pattern),
                ))

            if not statuses:
                statuses = ['在库', '借库']
            status_filter = StockProduct.status.in_(statuses)
            if '已出库' in statuses:
                query = query.filter(status_filter)
            else:
                query = query.filter(
                    and_(status_filter, StockProduct.stock_quantity > 0)
                )

            # production_date doubles as the inbound date shown in the list.
            pagination = query.order_by(
                StockProduct.production_date.desc()
            ).paginate(page=page, per_page=limit, error_out=False)

            items = [service._serialize_row(item) for item in pagination.items]
            return {"total": pagination.total, "items": items}
        except Exception:
            traceback.print_exc()
            return {"total": 0, "items": []}