feat: add full-column sorting and advanced filtering to semi module

Co-authored-by: aider (openai/DeepSeek-V3.2-Thinking) <aider@aider.chat>
This commit is contained in:
dxc
2026-03-02 16:18:51 +08:00
parent 893be24071
commit 37f4b1a94f
3 changed files with 264 additions and 37 deletions

View File

@ -343,7 +343,8 @@ class SemiInboundService:
return []
@staticmethod
def get_list(page, limit, keyword=None, statuses=None, category=None, material_type=None, company=None):
def get_list(page, limit, keyword=None, statuses=None, category=None, material_type=None, company=None,
order_by_column=None, is_asc=None, advanced_filters=None):
from app.models.inbound.semi import StockSemi
try:
query = db.session.query(StockSemi).outerjoin(MaterialBase, StockSemi.base_id == MaterialBase.id)
@ -380,11 +381,108 @@ class SemiInboundService:
StockSemi.stock_quantity > 0
)
)
pagination = query.order_by(StockSemi.production_date.desc()).paginate(page=page, per_page=limit,
error_out=False)
# 动态高级筛选
if advanced_filters:
if isinstance(advanced_filters, str):
try:
import json
advanced_filters = json.loads(advanced_filters)
except:
advanced_filters = []
if isinstance(advanced_filters, list):
field_mapping = {
'company_name': MaterialBase.company_name,
'material_name': MaterialBase.name,
'spec_model': MaterialBase.spec_model,
'category': MaterialBase.category,
'material_type': MaterialBase.material_type,
'status': StockSemi.status,
'quality_status': StockSemi.quality_status,
'warehouse_location': StockSemi.warehouse_location,
'bom_code': StockSemi.bom_code,
'work_order_code': StockSemi.work_order_code,
'qty_stock': StockSemi.stock_quantity,
'qty_available': StockSemi.available_quantity,
'unit_total_cost': StockSemi.manual_cost,
'raw_material_cost': StockSemi.raw_material_cost,
}
for cond in advanced_filters:
field = cond.get('field')
operator = cond.get('operator')
value = cond.get('value')
if not field or not operator:
continue
model_field = field_mapping.get(field)
if model_field is None:
continue
# 防止 SQL 注入,只允许映射的字段
if operator == '=':
query = query.filter(model_field == value)
elif operator == '!=':
query = query.filter(model_field != value)
elif operator == 'like':
query = query.filter(model_field.ilike(f'%{value}%'))
elif operator == 'not_like':
query = query.filter(~model_field.ilike(f'%{value}%'))
elif operator == '>':
if value.replace('.', '', 1).isdigit():
query = query.filter(model_field > float(value))
elif operator == '<':
if value.replace('.', '', 1).isdigit():
query = query.filter(model_field < float(value))
elif operator == '>=':
if value.replace('.', '', 1).isdigit():
query = query.filter(model_field >= float(value))
elif operator == '<=':
if value.replace('.', '', 1).isdigit():
query = query.filter(model_field <= float(value))
# 动态排序
order_field = None
if order_by_column and is_asc is not None:
order_mapping = {
'id': StockSemi.id,
'base_id': StockSemi.base_id,
'company_name': MaterialBase.company_name,
'material_name': MaterialBase.name,
'category': MaterialBase.category,
'material_type': MaterialBase.material_type,
'spec_model': MaterialBase.spec_model,
'unit': MaterialBase.unit,
'sku': StockSemi.sku,
'inbound_date': StockSemi.production_date,
'barcode': StockSemi.barcode,
'serial_number': StockSemi.serial_number,
'batch_number': StockSemi.batch_number,
'status': StockSemi.status,
'quality_status': StockSemi.quality_status,
'qty_inbound': StockSemi.in_quantity,
'qty_stock': StockSemi.stock_quantity,
'qty_available': StockSemi.available_quantity,
'warehouse_loc': StockSemi.warehouse_location,
'bom_code': StockSemi.bom_code,
'bom_version': StockSemi.bom_version,
'work_order_code': StockSemi.work_order_code,
'raw_material_cost': StockSemi.raw_material_cost,
'unit_total_cost': StockSemi.manual_cost,
'total_price': StockSemi.total_price,
'production_manager': StockSemi.production_manager,
'production_start_time': StockSemi.production_start_time,
'production_end_time': StockSemi.production_end_time,
}
order_field = order_mapping.get(order_by_column)
if order_field is not None:
if is_asc == 'true' or is_asc == True:
query = query.order_by(order_field.asc())
else:
query = query.order_by(order_field.desc())
if order_field is None:
query = query.order_by(StockSemi.production_date.desc())
pagination = query.paginate(page=page, per_page=limit, error_out=False)
items = []
for item in pagination.items:
# 把 manual_cost 伪装成 unit_total_cost 返回给前端
item_dict = item.to_dict()
item_dict['unit_total_cost'] = float(item.manual_cost or 0)
items.append(item_dict)