|
|
9b794d7f64
|
inventory-backend/app/api/v1/material/base.py
```python
<<<<<<< SEARCH
@auth_required()
def get(self):
"""获取基础物料列表"""
page = request.args.get('page', 1, type=int)
size = request.args.get('size', 100, type=int)
keyword = request.args.get('keyword', '').strip()
category = request.args.get('category', '').strip()
type_ = request.args.get('type', '').strip()
company = request.args.get('company', '').strip()
is_enabled = request.args.get('isEnabled', type=int)
data = material_base_service.get_material_base_list(
page=page,
size=size,
keyword=keyword,
category=category,
type_=type_,
company=company,
is_enabled=is_enabled
)
return jsonify({
'code': 200,
'msg': 'success',
'data': data
})
=======
@auth_required()
def get(self):
"""获取基础物料列表(支持排序和高级筛选)"""
page = request.args.get('page', 1, type=int)
size = request.args.get('size', 100, type=int)
keyword = request.args.get('keyword', '').strip()
category = request.args.get('category', '').strip()
type_ = request.args.get('type', '').strip()
company = request.args.get('company', '').strip()
is_enabled = request.args.get('isEnabled', type=int)
order_by = request.args.get('orderByColumn', '').strip()
is_asc = request.args.get('isAsc', '').strip()
advanced_filters = request.args.get('advancedFilters', '[]')
try:
filters = json.loads(advanced_filters) if advanced_filters else []
except json.JSONDecodeError:
filters = []
data = material_base_service.get_material_base_list(
page=page,
size=size,
keyword=keyword,
category=category,
type_=type_,
company=company,
is_enabled=is_enabled,
order_by=order_by,
is_asc=is_asc,
advanced_filters=filters
)
return jsonify({
'code': 200,
'msg': 'success',
'data': data
})
>>>>>>> REPLACE
```
inventory-backend/app/services/material_base_service.py
```python
<<<<<<< SEARCH
def get_material_base_list(page=1, size=100, keyword='', category='', type_='', company='', is_enabled=None):
"""查询基础物料列表"""
query = MaterialBase.query.filter_by(is_deleted=0)
if keyword:
query = query.filter(
or_(
MaterialBase.name.like(f'%{keyword}%'),
MaterialBase.common_name.like(f'%{keyword}%'),
MaterialBase.spec.like(f'%{keyword}%')
)
)
if category:
query = query.filter(MaterialBase.category == category)
if type_:
query = query.filter(MaterialBase.type == type_)
if company:
query = query.filter(MaterialBase.company_name == company)
if is_enabled is not None:
query = query.filter(MaterialBase.is_enabled == is_enabled)
total = query.count()
items = query.offset((page - 1) * size).limit(size).all()
return {
'items': [item.to_dict() for item in items],
'total': total,
'page': page,
'size': size
}
=======
def get_material_base_list(page=1, size=100, keyword='', category='', type_='', company='', is_enabled=None,
order_by='', is_asc='', advanced_filters=None):
"""查询基础物料列表(支持排序和高级筛选)"""
from app.models.base import MaterialBase
from sqlalchemy import or_, and_, text
query = MaterialBase.query.filter_by(is_deleted=0)
# 基础搜索条件
if keyword:
query = query.filter(
or_(
MaterialBase.name.like(f'%{keyword}%'),
MaterialBase.common_name.like(f'%{keyword}%'),
MaterialBase.spec.like(f'%{keyword}%')
)
)
if category:
query = query.filter(MaterialBase.category == category)
if type_:
query = query.filter(MaterialBase.type == type_)
if company:
query = query.filter(MaterialBase.company_name == company)
if is_enabled is not None:
query = query.filter(MaterialBase.is_enabled == is_enabled)
# 高级动态筛选
if advanced_filters:
filter_conditions = []
allowed_fields = {
'companyName': 'company_name',
'name': 'name',
'commonName': 'common_name',
'category': 'category',
'type': 'type',
'spec': 'spec',
'unit': 'unit',
'inventoryCount': 'inventory_count',
'availableCount': 'available_count'
}
for condition in advanced_filters:
field = condition.get('field')
operator = condition.get('operator')
value = condition.get('value')
if not field or not operator or value is None:
continue
# 字段白名单校验
db_field = allowed_fields.get(field)
if not db_field:
continue
# 防止 SQL 注入:只允许预定义的字段名
column = getattr(MaterialBase, db_field, None)
if column is None:
continue
# 根据操作符构建条件
if operator == 'eq':
filter_conditions.append(column == value)
elif operator == 'ne':
filter_conditions.append(column != value)
elif operator == 'contains':
filter_conditions.append(column.like(f'%{value}%'))
elif operator == 'ge':
try:
num_val = float(value)
filter_conditions.append(column >= num_val)
except ValueError:
continue
elif operator == 'le':
try:
num_val = float(value)
filter_conditions.append(column <= num_val)
except ValueError:
continue
if filter_conditions:
query = query.filter(and_(*filter_conditions))
# 排序处理
if order_by:
allowed_sort_fields = {
'companyName': 'company_name',
'name': 'name',
'commonName': 'common_name',
'category': 'category',
'type': 'type',
'spec': 'spec',
'unit': 'unit',
'inventoryCount': 'inventory_count',
'availableCount': 'available_count'
}
db_field = allowed_sort_fields.get(order_by)
if db_field:
column = getattr(MaterialBase, db_field)
if is_asc == 'asc':
query = query.order_by(column.asc())
elif is_asc == 'desc':
query = query.order_by(column.desc())
total = query.count()
items = query.offset((page - 1) * size).limit(size).all()
return {
'items': [item.to_dict() for item in items],
'total': total,
'page': page,
'size': size
}
>>>>>>> REPLACE
```
inventory-backend/app/models/base.py
```python
<<<<<<< SEARCH
class MaterialBase(db.Model):
__tablename__ = 'material_base'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
company_name = db.Column(db.String(100), nullable=False, comment='所属公司')
name = db.Column(db.String(200), nullable=False, comment='名称')
common_name = db.Column(db.String(200), comment='俗名')
category = db.Column(db.String(50), comment='类别')
type = db.Column(db.String(50), comment='类型')
spec = db.Column(db.String(300), comment='规格型号')
unit = db.Column(db.String(20), comment='单位')
inventory_count = db.Column(db.Float, default=0, comment='库存数')
available_count = db.Column(db.Float, default=0, comment='可用数')
is_enabled = db.Column(db.Integer, default=1, comment='启用状态 1启用 0停用')
is_deleted = db.Column(db.Integer, default=0, comment='删除标志 0未删除 1已删除')
create_time = db.Column(db.DateTime, default=datetime.now, comment='创建时间')
update_time = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now, comment='更新时间')
=======
class MaterialBase(db.Model):
__tablename__ = 'material_base'
# 允许排序和筛选的字段映射(前端字段名 -> 数据库字段名)
SORT_FIELD_MAP = {
'companyName': 'company_name',
'name': 'name',
'commonName': 'common_name',
'category': 'category',
'type': 'type',
'spec': 'spec',
'unit': 'unit',
'inventoryCount': 'inventory_count',
'availableCount': 'available_count'
}
FILTER_FIELD_MAP = {
'companyName': 'company_name',
'name': 'name',
'commonName': 'common_name',
'category': 'category',
'type': 'type',
'spec': 'spec',
'unit': 'unit',
'inventoryCount': 'inventory_count',
'availableCount': 'available_count'
}
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
company_name = db.Column(db.String(100), nullable=False, comment='所属公司')
name = db.Column(db.String(200), nullable=False, comment='名称')
common_name = db.Column(db.String(200), comment='俗名')
category = db.Column(db.String(50), comment='类别')
type = db.Column(db.String(50), comment='类型')
spec = db.Column(db.String(300), comment='规格型号')
unit = db.Column(db.String(20), comment='单位')
inventory_count = db.Column(db.Float, default=0, comment='库存数')
available_count = db.Column(db.Float, default=0, comment='可用数')
is_enabled = db.Column(db.Integer, default=1, comment='启用状态 1启用 0停用')
is_deleted = db.Column(db.Integer, default=0, comment='删除标志 0未删除 1已删除')
create_time = db.Column(db.DateTime, default=datetime.now, comment='创建时间')
update_time = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now, comment='更新时间')
>>>>>>> REPLACE
```
Co-authored-by: aider (openai/DeepSeek-V3.2-Thinking) <aider@aider.chat>
|
2026-03-02 15:18:38 +08:00 |
|