Files
KCGL/inventory-backend/app/services/inbound/product_service.py

173 lines
6.8 KiB
Python

from app.extensions import db
from app.models.base import MaterialBase
# 引用新的 product 路径
from app.models.inbound.product import StockProduct
from datetime import datetime
from sqlalchemy import or_, func
import traceback
class ProductInboundService:
    """Service helpers for product inbound (stock-in) records.

    Every public method is a ``@staticmethod`` that works on the shared
    ``db.session``. Write operations commit on success and roll the session
    back before re-raising on failure; read operations never raise and return
    an empty result instead.
    """

    # Plain attributes that ``update_inbound`` copies verbatim from the payload.
    _EDITABLE_FIELDS = (
        'sku', 'barcode', 'serial_number', 'warehouse_location',
        'status', 'quality_status', 'bom_code', 'bom_version',
        'work_order_code', 'production_manager', 'quality_report_link',
        'detail_link', 'inspection_report_link', 'order_id',
    )

    # Monetary attributes that ``update_inbound`` converts to ``float``.
    _NUMERIC_FIELDS = ('sale_price', 'raw_material_cost', 'manual_cost')

    @staticmethod
    def _to_float(value):
        """Convert *value* to ``float``, treating ``None``/empty input as 0."""
        return float(value or 0)

    @staticmethod
    def _parse_in_date(raw):
        """Return the date held in the first 10 characters of *raw*.

        *raw* is expected to start with ``YYYY-MM-DD``. A missing or
        unparsable value deliberately falls back to today's UTC date instead
        of failing the whole inbound operation.
        """
        if raw:
            try:
                return datetime.strptime(str(raw)[:10], '%Y-%m-%d').date()
            except ValueError:
                # Best effort: keep the default date for malformed input.
                pass
        return datetime.utcnow().date()

    @staticmethod
    def _format_time_range(start, end):
        """Build the stored ``"<start> ~ <end>"`` string.

        ``None`` on either side is rendered as an empty string so an explicit
        null in the payload never ends up as the literal text ``"None"``.
        Returns ``None`` when both sides are empty.
        """
        start = '' if start is None else start
        end = '' if end is None else end
        if not (start or end):
            return None
        return f"{start} ~ {end}"

    @staticmethod
    def _split_time_range(value):
        """Split a stored ``"<start> ~ <end>"`` string into ``(start, end)``.

        A missing value, or one without the separator, yields empty strings
        for the absent parts.
        """
        start, _, end = (value or '').partition(' ~ ')
        return start, end

    @staticmethod
    def search_base_material(keyword):
        """Search enabled base materials whose name or spec model contains *keyword*.

        Args:
            keyword: Substring matched case-insensitively (``ILIKE``).

        Returns:
            A list of at most 20 dicts with the keys ``id``, ``name``,
            ``spec``, ``category``, ``unit`` and ``type``. An empty list is
            returned for an empty keyword or when the query fails; failures
            are printed to stderr rather than raised.
        """
        if not keyword:
            return []
        pattern = f'%{keyword}%'
        try:
            rows = MaterialBase.query.filter(
                MaterialBase.is_enabled == True,  # noqa: E712 - SQL expression, not a Python comparison
                or_(
                    MaterialBase.name.ilike(pattern),
                    MaterialBase.spec_model.ilike(pattern),
                ),
            ).limit(20).all()
            return [
                {
                    'id': item.id, 'name': item.name, 'spec': item.spec_model,
                    'category': item.category, 'unit': item.unit,
                    'type': item.material_type,
                }
                for item in rows
            ]
        except Exception:
            # Searching is best effort, but leave a trace of the failure.
            traceback.print_exc()
            return []

    @staticmethod
    def handle_inbound(data):
        """Create and commit a new ``StockProduct`` row from *data*.

        Args:
            data: Mapping with the inbound payload. ``base_id`` is required;
                the quantity and cost fields default to 0 when missing or
                empty, and ``in_date`` defaults to today's UTC date.

        Returns:
            The newly persisted ``StockProduct`` instance.

        Raises:
            ValueError: If ``base_id`` is missing, the material does not
                exist, or a numeric field cannot be converted.
            Exception: Any other error is re-raised after the session has
                been rolled back.
        """
        try:
            base_id = data.get('base_id')
            if not base_id:
                raise ValueError("必须选择基础物料")
            material = MaterialBase.query.get(base_id)
            if not material:
                raise ValueError("物料不存在")

            in_qty = ProductInboundService._to_float(data.get('in_quantity'))
            new_stock = StockProduct(
                base_id=material.id,
                sku=data.get('sku'),
                production_date=ProductInboundService._parse_in_date(data.get('in_date')),
                barcode=data.get('barcode'),
                serial_number=data.get('serial_number'),
                status='在库',
                warehouse_location=data.get('warehouse_location'),
                # A new record starts with all three quantities equal.
                in_quantity=in_qty,
                stock_quantity=in_qty,
                available_quantity=in_qty,
                bom_code=data.get('bom_code'),
                bom_version=data.get('bom_version'),
                work_order_code=data.get('work_order_code'),
                production_manager=data.get('production_manager'),
                production_time_range=ProductInboundService._format_time_range(
                    data.get('production_start_time'),
                    data.get('production_end_time'),
                ),
                raw_material_cost=ProductInboundService._to_float(data.get('raw_material_cost')),
                manual_cost=ProductInboundService._to_float(data.get('manual_cost')),
                quality_status=data.get('quality_status', '合格'),
                quality_report_link=data.get('quality_report_link'),
                detail_link=data.get('detail_link'),
                sale_price=ProductInboundService._to_float(data.get('sale_price')),
                inspection_report_link=data.get('inspection_report_link'),
                order_id=data.get('order_id'),
            )
            db.session.add(new_stock)
            db.session.commit()
            return new_stock
        except Exception:
            db.session.rollback()
            raise

    @staticmethod
    def update_inbound(stock_id, data):
        """Apply the fields present in *data* to an existing stock record.

        Only keys that appear in *data* are touched. Changing ``in_quantity``
        shifts ``stock_quantity`` and ``available_quantity`` by the same
        delta. Supplying either production time replaces that side of the
        stored range and keeps the other side.

        Args:
            stock_id: Primary key of the ``StockProduct`` row.
            data: Mapping of the fields to change.

        Returns:
            The updated ``StockProduct`` instance.

        Raises:
            ValueError: If the record does not exist or a numeric field is
                not a valid number.
            Exception: Any other error is re-raised after the session has
                been rolled back.
        """
        try:
            stock = StockProduct.query.get(stock_id)
            if not stock:
                raise ValueError("记录不存在")

            for name in ProductInboundService._EDITABLE_FIELDS:
                if name in data:
                    setattr(stock, name, data[name])

            for name in ProductInboundService._NUMERIC_FIELDS:
                if name in data:
                    setattr(stock, name, float(data[name]))

            if 'in_quantity' in data:
                new_qty = float(data['in_quantity'])
                # Stored quantities may be NULL; treat those as 0 so the
                # adjustment does not fail on float(None).
                diff = new_qty - float(stock.in_quantity or 0)
                stock.in_quantity = new_qty
                # NOTE(review): nothing prevents the adjusted quantities from
                # going negative - confirm whether that needs a guard.
                stock.stock_quantity = float(stock.stock_quantity or 0) + diff
                stock.available_quantity = float(stock.available_quantity or 0) + diff

            if 'production_start_time' in data or 'production_end_time' in data:
                old_start, old_end = ProductInboundService._split_time_range(
                    stock.production_time_range
                )
                stock.production_time_range = ProductInboundService._format_time_range(
                    data.get('production_start_time', old_start),
                    data.get('production_end_time', old_end),
                )

            db.session.commit()
            return stock
        except Exception:
            db.session.rollback()
            raise

    @staticmethod
    def delete_inbound(stock_id):
        """Delete the stock record with *stock_id*, if it exists.

        A missing record is not an error: nothing is deleted and the session
        is still committed.

        Raises:
            Exception: Any database error is re-raised after the session has
                been rolled back.
        """
        try:
            stock = StockProduct.query.get(stock_id)
            if stock:
                db.session.delete(stock)
            db.session.commit()
        except Exception:
            db.session.rollback()
            raise

    @staticmethod
    def get_list(page, limit, keyword=None):
        """Return one page of stock records, newest first.

        Args:
            page: Page number passed to ``paginate``.
            limit: Page size passed to ``paginate``.
            keyword: Optional substring matched case-insensitively against
                the material name, serial number, work order code and
                order id.

        Returns:
            ``{"total": <int>, "items": [<dict>, ...]}``. Each item is the
            record's ``to_dict()`` output plus ``sum_stock`` and
            ``sum_available``, which are totals over every stock row sharing
            the record's ``base_id`` (not only the rows on this page). On any
            failure the traceback is printed and an empty result is returned.
        """
        try:
            query = db.session.query(StockProduct).outerjoin(
                MaterialBase, StockProduct.base_id == MaterialBase.id
            )
            if keyword:
                pattern = f'%{keyword}%'
                query = query.filter(or_(
                    MaterialBase.name.ilike(pattern),
                    StockProduct.serial_number.ilike(pattern),
                    StockProduct.work_order_code.ilike(pattern),
                    StockProduct.order_id.ilike(pattern),
                ))
            pagination = query.order_by(StockProduct.id.desc()).paginate(
                page=page, per_page=limit, error_out=False
            )
            records = pagination.items

            # One aggregate query for all materials on the page instead of
            # one query per row.
            base_ids = list({record.base_id for record in records})
            totals = {}
            if base_ids:
                rows = db.session.query(
                    StockProduct.base_id,
                    func.sum(StockProduct.stock_quantity).label('s'),
                    func.sum(StockProduct.available_quantity).label('a'),
                ).filter(
                    StockProduct.base_id.in_(base_ids)
                ).group_by(StockProduct.base_id).all()
                totals = {
                    row.base_id: {'s': float(row.s or 0), 'a': float(row.a or 0)}
                    for row in rows
                }

            items = []
            for record in records:
                entry = record.to_dict()
                stats = totals.get(record.base_id, {'s': 0, 'a': 0})
                entry['sum_stock'] = stats['s']
                entry['sum_available'] = stats['a']
                items.append(entry)
            return {"total": pagination.total, "items": items}
        except Exception:
            traceback.print_exc()
            return {"total": 0, "items": []}