# inventory-backend/app/api/v1/audit.py
"""Read-only REST endpoints for browsing audit-log records."""
from datetime import datetime, timedelta
import json

from flask import Blueprint, request, jsonify, current_app
from flask_jwt_extended import jwt_required, get_jwt
from sqlalchemy import or_

from app.models.audit import AuditLog
from app.extensions import db

audit_bp = Blueprint('audit', __name__)

# Upper bound for the client-supplied ``pageSize`` so a single request cannot
# ask the database to materialise an arbitrarily large result set.
MAX_PAGE_SIZE = 500

# Format accepted by the ``start_date`` / ``end_date`` query parameters.
_DATE_FORMAT = '%Y-%m-%d'


def _parse_day(raw):
    """Parse a ``YYYY-MM-DD`` string into a ``datetime``.

    Returns ``None`` when ``raw`` is empty or does not match the format, so
    that callers can simply skip the corresponding filter.
    """
    if not raw:
        return None
    try:
        return datetime.strptime(raw, _DATE_FORMAT)
    except ValueError:
        return None


def _distinct_values(column):
    """Return the distinct, truthy values stored in ``column`` as a list."""
    rows = db.session.query(column).distinct().all()
    return [row[0] for row in rows if row[0]]


@audit_bp.route('/logs', methods=['GET'])
@jwt_required()
def get_audit_logs():
    """Return a filtered, paginated list of audit logs.

    Query parameters (all optional):
        page: 1-based page number, default 1; values below 1 are raised to 1.
        pageSize: rows per page, default 50; clamped to 1..MAX_PAGE_SIZE.
        username: substring matched against ``AuditLog.username`` via LIKE.
        module, action, target_id: exact-match filters.
        start_date, end_date: inclusive day range in ``YYYY-MM-DD`` form;
            a malformed date is ignored rather than rejected.

    Returns:
        A ``(response, status)`` tuple. On success the JSON ``data`` object
        holds ``list``, ``total``, ``page``, ``pageSize`` plus the distinct
        ``modules`` and ``actions`` values for filter drop-downs. Any
        exception yields a 500 response.
    """
    try:
        # Paging parameters. ``type=int`` makes non-numeric input fall back
        # to the default; the clamps guard against zero/negative/huge values.
        page = max(request.args.get('page', 1, type=int), 1)
        page_size = request.args.get('pageSize', 50, type=int)
        page_size = min(max(page_size, 1), MAX_PAGE_SIZE)

        # Filter parameters.
        username = request.args.get('username', '').strip()
        module = request.args.get('module', '').strip()
        action = request.args.get('action', '').strip()
        target_id = request.args.get('target_id', '').strip()
        start_dt = _parse_day(request.args.get('start_date', '').strip())
        end_dt = _parse_day(request.args.get('end_date', '').strip())

        # Build the query, adding only the filters that were supplied.
        query = AuditLog.query
        if username:
            query = query.filter(AuditLog.username.like(f'%{username}%'))
        if module:
            query = query.filter(AuditLog.module == module)
        if action:
            query = query.filter(AuditLog.action == action)
        if target_id:
            query = query.filter(AuditLog.target_id == target_id)
        if start_dt is not None:
            query = query.filter(AuditLog.created_at >= start_dt)
        if end_dt is not None:
            # Use an exclusive bound at the start of the next day so that the
            # whole of ``end_date`` is included.
            query = query.filter(
                AuditLog.created_at < end_dt + timedelta(days=1)
            )

        # Newest entries first.
        query = query.order_by(AuditLog.created_at.desc())

        pagination = query.paginate(
            page=page, per_page=page_size, error_out=False
        )
        data = [log.to_dict() for log in pagination.items]

        # Distinct modules / actions, used by the client for drop-downs.
        modules = _distinct_values(AuditLog.module)
        actions = _distinct_values(AuditLog.action)

        return jsonify({
            'code': 200,
            'msg': '获取成功',
            'data': {
                'list': data,
                'total': pagination.total,
                'page': page,
                'pageSize': page_size,
                'modules': modules,
                'actions': actions
            }
        }), 200

    except Exception as e:
        # Top-level boundary: log with traceback and answer with a 500.
        # NOTE(review): the response echoes the exception text to the client;
        # consider replacing it with a generic message.
        current_app.logger.exception("获取审计日志失败: %s", e)
        return jsonify({'code': 500, 'msg': f'服务器内部错误: {str(e)}'}), 500


# NOTE(review): the rule previously had no URL variable, so the view could
# never receive ``log_id``. The ``int`` converter assumes an integer primary
# key -- confirm against the AuditLog model.
@audit_bp.route('/logs/<int:log_id>', methods=['GET'])
@jwt_required()
def get_audit_log_detail(log_id):
    """Return a single audit-log record by primary key.

    Args:
        log_id: Primary-key value taken from the URL.

    Returns:
        A ``(response, status)`` tuple: 200 with the serialised record,
        404 when no record matches, or 500 on an unexpected error.
    """
    try:
        log = AuditLog.query.get(log_id)
        if not log:
            return jsonify({'code': 404, 'msg': '日志不存在'}), 404

        return jsonify({
            'code': 200,
            'msg': '获取成功',
            'data': log.to_dict()
        }), 200
    except Exception as e:
        current_app.logger.exception("获取审计日志详情失败: %s", e)
        return jsonify({'code': 500, 'msg': str(e)}), 500


@audit_bp.route('/modules', methods=['GET'])
@jwt_required()
def get_modules():
    """Return the distinct module names present in the audit log.

    Returns:
        A ``(response, status)`` tuple: 200 with the list of module names in
        ``data``, or 500 on an unexpected error.
    """
    try:
        modules = _distinct_values(AuditLog.module)
        return jsonify({'code': 200, 'data': modules}), 200
    except Exception as e:
        current_app.logger.exception("获取模块列表失败: %s", e)
        return jsonify({'code': 500, 'msg': str(e)}), 500