(no commit message provided)

Co-authored-by: aider (openai/DeepSeek-V3.2-Thinking) <aider@aider.chat>
This commit is contained in:
dxc
2026-02-09 11:29:37 +08:00
parent 49453d47f6
commit 89a29f0b65
6 changed files with 717 additions and 2 deletions

View File

@ -0,0 +1,123 @@
from flask import request, jsonify, current_app
from . import inbound_bp
from app.schemas.stock_schema import stock_service_schema
from app.services.inbound.service_service import ServiceService
from app.utils.decorators import token_required, role_required
@inbound_bp.route('/service', methods=['GET'])
@token_required
def get_service_list():
"""获取服务权益列表"""
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
keyword = request.args.get('keyword', None)
start_date = request.args.get('start_date', None)
end_date = request.args.get('end_date', None)
provider_name = request.args.get('provider_name', None)
try:
result = ServiceService.get_service_list(
page=page,
per_page=per_page,
keyword=keyword,
start_date=start_date,
end_date=end_date,
provider_name=provider_name
)
return jsonify({
'code': 200,
'msg': 'success',
'data': result
})
except Exception as e:
current_app.logger.error(f'获取服务列表失败: {str(e)}')
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
@inbound_bp.route('/service', methods=['POST'])
@token_required
@role_required('admin,manager')
def create_service():
"""创建服务权益"""
data = request.get_json()
if not data:
return jsonify({'code': 400, 'msg': '请求数据为空'}), 400
errors = stock_service_schema.validate(data)
if errors:
return jsonify({'code': 400, 'msg': '数据校验失败', 'errors': errors}), 400
try:
service = ServiceService.create_service(data)
return jsonify({
'code': 201,
'msg': '创建成功',
'data': stock_service_schema.dump(service)
}), 201
except ValueError as e:
return jsonify({'code': 400, 'msg': str(e)}), 400
except Exception as e:
current_app.logger.error(f'创建服务权益失败: {str(e)}')
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
@inbound_bp.route('/service/<int:service_id>', methods=['GET'])
@token_required
def get_service(service_id):
"""获取单个服务权益详情"""
try:
service = ServiceService.get_service(service_id)
return jsonify({
'code': 200,
'msg': 'success',
'data': stock_service_schema.dump(service)
})
except ValueError as e:
return jsonify({'code': 404, 'msg': str(e)}), 404
except Exception as e:
current_app.logger.error(f'获取服务权益详情失败: {str(e)}')
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
@inbound_bp.route('/service/<int:service_id>', methods=['PUT'])
@token_required
@role_required('admin,manager')
def update_service(service_id):
"""更新服务权益"""
data = request.get_json()
if not data:
return jsonify({'code': 400, 'msg': '请求数据为空'}), 400
# 部分字段不允许更新,可在此过滤
allowed_fields = {'sale_price', 'provider_name', 'description'}
filtered_data = {k: v for k, v in data.items() if k in allowed_fields}
if not filtered_data:
return jsonify({'code': 400, 'msg': '无有效更新字段'}), 400
try:
service = ServiceService.update_service(service_id, filtered_data)
return jsonify({
'code': 200,
'msg': '更新成功',
'data': stock_service_schema.dump(service)
})
except ValueError as e:
return jsonify({'code': 404, 'msg': str(e)}), 404
except Exception as e:
current_app.logger.error(f'更新服务权益失败: {str(e)}')
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500
@inbound_bp.route('/service/<int:service_id>', methods=['DELETE'])
@token_required
@role_required('admin,manager')
def delete_service(service_id):
"""删除服务权益"""
try:
ServiceService.delete_service(service_id)
return jsonify({
'code': 200,
'msg': '删除成功'
})
except ValueError as e:
return jsonify({'code': 404, 'msg': str(e)}), 404
except Exception as e:
current_app.logger.error(f'删除服务权益失败: {str(e)}')
return jsonify({'code': 500, 'msg': '内部服务器错误'}), 500

View File

@ -0,0 +1,46 @@
from app import db
from datetime import datetime
class StockService(db.Model):
"""
服务权益库存表
对应数据库表: stock_service
"""
__tablename__ = 'stock_service'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
# 关联基础物料信息
base_id = db.Column(db.Integer, db.ForeignKey('material_base.id'), nullable=False)
# 系统生成的SKU格式 SRV-YYYYMMDD-XXXX
sku = db.Column(db.String(64), unique=True, nullable=False)
# 售价
sale_price = db.Column(db.Numeric(10, 2), nullable=False)
# 服务商名称
provider_name = db.Column(db.String(255), nullable=False, default='')
# 服务详情/简介
description = db.Column(db.Text, default='')
# 创建时间与更新时间
created_at = db.Column(db.DateTime, default=datetime.now, nullable=False)
updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now, nullable=False)
# 软删除标志
is_deleted = db.Column(db.Boolean, default=False, nullable=False)
# 关系(可选)
material_base = db.relationship('MaterialBase', backref='service_stocks', lazy='joined')
def to_dict(self):
"""转为字典,用于 API 响应"""
return {
'id': self.id,
'base_id': self.base_id,
'sku': self.sku,
'sale_price': float(self.sale_price) if self.sale_price is not None else 0,
'provider_name': self.provider_name,
'description': self.description,
'created_at': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else None,
'updated_at': self.updated_at.strftime('%Y-%m-%d %H:%M:%S') if self.updated_at else None,
'material_name': self.material_base.name if self.material_base else None,
'spec_model': self.material_base.spec_model if self.material_base else None,
'unit': self.material_base.unit if self.material_base else None,
}

View File

@ -38,5 +38,28 @@ class StockBuySchema(Schema):
# 这里暂时不强制抛出错误,交给 Service 层处理 "SKU不存在且无名字" 的情况
class StockServiceSchema(Schema):
# 只用于输出的字段
id = fields.Int(dump_only=True)
sku = fields.Str(dump_only=True)
created_at = fields.DateTime(format='%Y-%m-%d %H:%M:%S', dump_only=True)
updated_at = fields.DateTime(format='%Y-%m-%d %H:%M:%S', dump_only=True)
material_name = fields.Str(dump_only=True)
spec_model = fields.Str(dump_only=True)
unit = fields.Str(dump_only=True)
# 输入字段
base_id = fields.Int(required=True, error_messages={"required": "必须选择基础物料"})
sale_price = fields.Float(required=True, validate=validate.Range(min=0, error="售价不能为负数"))
provider_name = fields.Str(required=True, error_messages={"required": "服务商名称不能为空"})
description = fields.Str(missing='')
@validates_schema
def validate_base_id(self, data, **kwargs):
# 可以在这里添加对 base_id 是否存在的检查,但更建议在 Service 层进行
pass
# 实例化 Schema
stock_buy_schema = StockBuySchema()
stock_buy_schema = StockBuySchema()
stock_service_schema = StockServiceSchema()

View File

@ -0,0 +1,126 @@
from app import db
from app.models.inbound.service import StockService
from app.models.material_base import MaterialBase
from datetime import datetime, timedelta
import re
class ServiceService:
"""服务权益库存业务逻辑"""
SKU_PREFIX = 'SRV'
SKU_DATE_FORMAT = '%Y%m%d'
SKU_SUFFIX_LEN = 4
@classmethod
def _generate_sku(cls):
"""生成唯一SKU格式 SRV-YYYYMMDD-XXXX"""
today_str = datetime.now().strftime(cls.SKU_DATE_FORMAT)
prefix = f'{cls.SKU_PREFIX}-{today_str}-'
# 查找今天已有的最大后缀
max_sku = db.session.query(db.func.max(StockService.sku)).filter(
StockService.sku.like(f'{prefix}%')
).scalar()
if not max_sku:
suffix_num = 1
else:
# 提取后缀数字
suffix_part = max_sku.replace(prefix, '')
match = re.match(r'^(\d+)', suffix_part)
suffix_num = int(match.group(1)) if match else 0
suffix_num += 1
# 格式化为4位数字左侧补零
suffix = str(suffix_num).zfill(cls.SKU_SUFFIX_LEN)
return f'{prefix}{suffix}'
@classmethod
def create_service(cls, data):
"""创建服务权益记录"""
# 检查基础物料是否存在
base = MaterialBase.query.get(data.get('base_id'))
if not base:
raise ValueError('基础物料不存在')
# 生成SKU
sku = cls._generate_sku()
service = StockService(
base_id=data['base_id'],
sku=sku,
sale_price=data['sale_price'],
provider_name=data['provider_name'],
description=data.get('description', '')
)
db.session.add(service)
db.session.commit()
return service
@classmethod
def get_service(cls, service_id):
"""获取单个服务权益"""
service = StockService.query.filter_by(id=service_id, is_deleted=False).first()
if not service:
raise ValueError('服务权益记录不存在')
return service
@classmethod
def update_service(cls, service_id, data):
"""更新服务权益记录"""
service = cls.get_service(service_id)
# 不允许修改 base_id 和 sku业务上不允许变更基础物料
if 'sale_price' in data:
service.sale_price = data['sale_price']
if 'provider_name' in data:
service.provider_name = data['provider_name']
if 'description' in data:
service.description = data.get('description', '')
service.updated_at = datetime.now()
db.session.commit()
return service
@classmethod
def delete_service(cls, service_id):
"""软删除服务权益"""
service = cls.get_service(service_id)
service.is_deleted = True
service.updated_at = datetime.now()
db.session.commit()
return True
@classmethod
def get_service_list(cls, page=1, per_page=20, keyword=None,
start_date=None, end_date=None, provider_name=None):
"""分页查询服务权益列表"""
query = StockService.query.filter_by(is_deleted=False)
# 关键词搜索:可搜索 SKU 或 关联物料名称
if keyword:
# 子查询查找物料名称匹配的 base_id
subquery = MaterialBase.query.filter(
MaterialBase.name.ilike(f'%{keyword}%')
).subquery()
query = query.filter(
db.or_(
StockService.sku.ilike(f'%{keyword}%'),
StockService.base_id.in_([row.id for row in db.session.query(subquery.c.id)])
)
)
if start_date:
start = datetime.strptime(start_date, '%Y-%m-%d')
query = query.filter(StockService.created_at >= start)
if end_date:
end = datetime.strptime(end_date, '%Y-%m-%d')
# 包含当天
end = end + timedelta(days=1) - timedelta(seconds=1)
query = query.filter(StockService.created_at <= end)
if provider_name:
query = query.filter(StockService.provider_name.ilike(f'%{provider_name}%'))
# 总数
total = query.count()
# 分页
items = query.order_by(StockService.created_at.desc())\
.offset((page - 1) * per_page)\
.limit(per_page).all()
return {
'items': [item.to_dict() for item in items],
'total': total,
'page': page,
'per_page': per_page
}