feat: add 'not contains' operator, implement inbound record export, and verify export filter rules

This commit is contained in:
DXC
2026-04-07 17:23:00 +08:00
parent 30ab1c186c
commit 6d80c90b66
7 changed files with 273 additions and 2 deletions

View File

@ -315,6 +315,8 @@ class MaterialBaseService:
filter_conditions.append(column != value)
elif operator == 'contains':
filter_conditions.append(column.ilike(f'%{value}%'))
elif operator == 'not_contains':
filter_conditions.append(~column.ilike(f'%{value}%'))
elif operator == 'ge':
try:
num_val = float(value)

View File

@ -6,6 +6,10 @@ from app.models.inbound.semi import StockSemi
from app.models.inbound.product import StockProduct
from app.models.base import MaterialBase
import traceback
from io import BytesIO
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment
from openpyxl.utils import get_column_letter
class InboundSummaryService:
@ -205,4 +209,174 @@ class InboundSummaryService:
except Exception as e:
print("【InboundSummaryService Error】:", str(e))
traceback.print_exc()
raise e
@staticmethod
def export_excel(keyword=None, start_date=None, end_date=None, source_type=None):
"""
导出入库记录 Excel
"""
try:
# 复用 get_list 的查询逻辑,但不分页(获取全部数据)
# 构建三个子查询
q_buy = db.session.query(
StockBuy.id.label('id'),
StockBuy.base_id.label('base_id'),
StockBuy.sku.label('sku'),
StockBuy.in_date.label('inbound_date'),
StockBuy.in_quantity.label('in_qty'),
StockBuy.stock_quantity.label('current_qty'),
cast(StockBuy.supplier_name, String).label('source_info'),
StockBuy.status.label('orig_status'),
cast(getattr(StockBuy, 'batch_number', literal('')), String).label('batch_number'),
cast(getattr(StockBuy, 'serial_number', getattr(StockBuy, 'sn', literal(''))), String).label('serial_number'),
cast(literal('buy'), String).label('source_type')
)
q_semi = db.session.query(
StockSemi.id.label('id'),
StockSemi.base_id.label('base_id'),
StockSemi.sku.label('sku'),
StockSemi.production_date.label('inbound_date'),
StockSemi.in_quantity.label('in_qty'),
StockSemi.stock_quantity.label('current_qty'),
cast(StockSemi.production_manager, String).label('source_info'),
StockSemi.status.label('orig_status'),
cast(getattr(StockSemi, 'batch_number', literal('')), String).label('batch_number'),
cast(getattr(StockSemi, 'serial_number', getattr(StockSemi, 'sn', literal(''))), String).label('serial_number'),
cast(literal('semi'), String).label('source_type')
)
q_product = db.session.query(
StockProduct.id.label('id'),
StockProduct.base_id.label('base_id'),
StockProduct.sku.label('sku'),
StockProduct.production_date.label('inbound_date'),
StockProduct.in_quantity.label('in_qty'),
StockProduct.stock_quantity.label('current_qty'),
cast(StockProduct.production_manager, String).label('source_info'),
StockProduct.status.label('orig_status'),
cast(getattr(StockProduct, 'batch_number', literal('')), String).label('batch_number'),
cast(getattr(StockProduct, 'serial_number', getattr(StockProduct, 'sn', literal(''))), String).label('serial_number'),
cast(literal('product'), String).label('source_type')
)
combined_query = union_all(q_buy, q_semi, q_product)
cte = combined_query.subquery()
query = db.session.query(
cte,
MaterialBase.name.label('material_name'),
MaterialBase.spec_model.label('spec_model'),
MaterialBase.category.label('category'),
MaterialBase.material_type.label('material_type')
).outerjoin(
MaterialBase, cte.c.base_id == MaterialBase.id
)
# 过滤条件
if keyword:
rule = or_(
cte.c.sku.ilike(f'%{keyword}%'),
cte.c.source_info.ilike(f'%{keyword}%'),
cte.c.batch_number.ilike(f'%{keyword}%'),
cte.c.serial_number.ilike(f'%{keyword}%'),
MaterialBase.name.ilike(f'%{keyword}%'),
MaterialBase.spec_model.ilike(f'%{keyword}%')
)
query = query.filter(rule)
if start_date and end_date:
query = query.filter(cte.c.inbound_date.between(start_date, end_date))
if source_type:
query = query.filter(cte.c.source_type == source_type)
# 排序
query = query.order_by(desc(cte.c.inbound_date), asc(cte.c.sku))
# 获取全部数据
rows = query.all()
# 创建 Excel
wb = Workbook()
ws = wb.active
ws.title = "入库记录"
# 表头样式
header_font = Font(bold=True, size=11, color="FFFFFF")
header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
header_alignment = Alignment(horizontal="center", vertical="center")
# 写表头
headers = ['SKU', '物品名称', '规格型号', '分类', '入库来源', '入库/生产日期', '入库数量', '批次/序列号', '供应商/负责人', '当前状态']
for col, header in enumerate(headers, 1):
cell = ws.cell(row=1, column=col, value=header)
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_alignment
# 写数据
type_map = {'buy': '采购入库', 'semi': '半成品生产', 'product': '成品完工'}
for row_idx, row in enumerate(rows, 2):
date_str = ""
if row.inbound_date:
try:
date_str = row.inbound_date.strftime('%Y-%m-%d')
except Exception:
date_str = str(row.inbound_date)
in_qty = float(row.in_qty) if row.in_qty is not None else 0.0
current_qty = float(row.current_qty) if row.current_qty is not None else 0.0
final_status = row.orig_status
if current_qty <= 0:
final_status = "已出库"
elif current_qty < in_qty:
final_status = "部分出库"
b_num = row.batch_number or ""
s_num = row.serial_number or ""
if b_num and s_num and b_num != s_num:
display_batch_sn = f"{b_num}/{s_num}"
elif b_num:
display_batch_sn = b_num
elif s_num:
display_batch_sn = s_num
else:
display_batch_sn = "-"
ws.cell(row=row_idx, column=1, value=row.sku or "")
ws.cell(row=row_idx, column=2, value=row.material_name or "未知物品")
ws.cell(row=row_idx, column=3, value=row.spec_model or "")
ws.cell(row=row_idx, column=4, value=row.category or "")
ws.cell(row=row_idx, column=5, value=type_map.get(row.source_type, "未知类型"))
ws.cell(row=row_idx, column=6, value=date_str)
ws.cell(row=row_idx, column=7, value=in_qty)
ws.cell(row=row_idx, column=8, value=display_batch_sn)
ws.cell(row=row_idx, column=9, value=row.source_info or "")
ws.cell(row=row_idx, column=10, value=final_status)
# 自动调整列宽
for col in range(1, len(headers) + 1):
max_length = 0
column_letter = get_column_letter(col)
for row in range(2, len(rows) + 2):
cell_value = ws.cell(row=row, column=col).value
if cell_value:
max_length = max(max_length, len(str(cell_value)))
adjusted_width = min(max_length + 2, 50)
ws.column_dimensions[column_letter].width = adjusted_width
# 输出到字节流
file_stream = BytesIO()
wb.save(file_stream)
file_stream.seek(0)
return file_stream
except Exception as e:
print("【InboundSummaryService Export Error】:", str(e))
traceback.print_exc()
raise e