from flask_sqlalchemy import SQLAlchemy from flask_migrate import Migrate from flask_cors import CORS from flask_jwt_extended import JWTManager # 确保引入了 JWTManager from flask import current_app from datetime import datetime, timezone, timedelta import redis # 1. 创建扩展实例(此时未绑定具体的 App) db = SQLAlchemy() migrate = Migrate() cors = CORS() jwt = JWTManager() # 必须实例化 # Redis 客户端 (单设备登录互踢 + JWT Token 黑名单用) redis_client = None # Redis Key 前缀 _JWT_BLOCKED_USER_PREFIX = "jwt_blocked_user:" # 存储被删除/禁用的 user_id def beijing_time(): """获取北京时间 (UTC+8),剥离时区信息以兼容数据库 naive DateTime 字段""" return datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None) # ============================================================================= # 全局 JWT Token 黑名单拦截器 # 原理:Flask-JWT-Extended 在每次 @jwt_required() 验证时, # 会自动触发 token_in_blocklist_loader 回调。 # 若该回调返回 True(命中黑名单),请求直接被 401 拒绝,后续代码不会执行。 # ============================================================================= @jwt.token_in_blocklist_loader def check_if_token_is_revoked(jwt_header, jwt_payload): """ 全局 JWT 黑名单检查:每次 @jwt_required() 调用时自动触发。 无论 AI(Dify)还是人类用户调用的接口,均受此拦截。 检查逻辑: 1. 通过 jwt_payload['sub'](user_id)查询 Redis 黑名单 2. 若 user_id 存在于黑名单 → 返回 True → 请求被 401 拒绝 3. 若 Redis 不可用(fail-open)→ 放行(不影响正常业务) """ user_id = jwt_payload.get('sub') if user_id is None: return False global redis_client if redis_client is None: return False try: blocked_key = f"{_JWT_BLOCKED_USER_PREFIX}{user_id}" is_blocked = redis_client.exists(blocked_key) if is_blocked: current_app.logger.warning( f"🚫 JWT revoked for deleted/disabled user: user_id={user_id}" ) return bool(is_blocked) except Exception as e: current_app.logger.error(f"JWT blocklist check error: {e}") return False # Redis 出错时 fail-open,不阻断正常业务 def revoke_all_tokens_for_user(user_id): """ 将指定用户的 ID 加入 JWT 黑名单(14 天)。 效果:该用户的所有已发放 Token(无论是否过期)瞬间失效。 由 delete_user() / update_user(status!='active') 时调用。 """ global redis_client if redis_client is None: current_app.logger.warning( f"Redis unavailable, cannot revoke tokens for user_id={user_id}" ) return try: blocked_key = f"{_JWT_BLOCKED_USER_PREFIX}{user_id}" ttl_seconds = 14 * 24 * 3600 # 14 天,与 Refresh Token 有效期对齐 redis_client.setex(blocked_key, ttl_seconds, "1") current_app.logger.info(f"✅ User {user_id} added to JWT blocklist (TTL={ttl_seconds}s)") except Exception as e: current_app.logger.error(f"Failed to revoke tokens for user_id={user_id}: {e}") # 2. 定义初始化函数 (供工厂函数 create_app 调用) def init_extensions(app): """ 统一初始化所有 Flask 扩展 """ global redis_client # 初始化数据库 db.init_app(app) # 初始化迁移工具 migrate.init_app(app, db) # 初始化跨域设置 (允许 /api/* 路径被所有来源访问) cors.init_app(app, resources={r"/api/*": {"origins": "*"}}) # 初始化 JWT (这一步至关重要,缺少它会导致 500 错误) jwt.init_app(app) # 初始化 Redis (单设备登录互踢) redis_url = app.config.get('REDIS_URL', 'redis://localhost:6379/0') try: redis_client = redis.from_url(redis_url, decode_responses=True) redis_client.ping() app.logger.info("✅ Redis connected successfully") except Exception as e: app.logger.warning(f"⚠️ Redis connection failed: {e}, single-device login will be disabled") # ★ 注册 SQLAlchemy 审计监听器 # 必须在 db.init_app 之后调用,确保所有模型已映射 try: from app.core.audit_listener import register_audit_listeners with app.app_context(): count = register_audit_listeners(db) app.logger.info(f"✅ 审计监听器注册成功,共绑定 {count} 个模型") except Exception as e: app.logger.error(f"⚠️ 审计监听器注册失败: {e}")