diff --git a/.cospec/plan/changes/fix-admin-multi-issues/task.md b/.cospec/plan/changes/fix-admin-multi-issues/task.md index adfefb8..b20db31 100644 --- a/.cospec/plan/changes/fix-admin-multi-issues/task.md +++ b/.cospec/plan/changes/fix-admin-multi-issues/task.md @@ -155,6 +155,7 @@ - main.py: 注册 AuthMiddleware 为全局中间件(先注册后执行),CORS 在 Auth 之后注册(后注册先执行) - main.py: 添加 CORS 配置启动日志和空值警告 【中间件执行顺序】CORS → Auth → access_log → 路由 + 【后续修复】将 AuthMiddleware 从 BaseHTTPMiddleware 改为纯 ASGI 中间件,解决 BaseHTTPMiddleware 提前返回响应时 CORS 头丢失的问题 【目标对象】`frontend/admin/students.php` 【修改目的】除班主任角色外,隐藏家长手机号列的显示内容,保护隐私 【修改方式】在表头 HTML 和 JS 渲染处添加 `$role` 判断 diff --git a/backend/middleware/auth_middleware.py b/backend/middleware/auth_middleware.py index 07bf971..47e4333 100644 --- a/backend/middleware/auth_middleware.py +++ b/backend/middleware/auth_middleware.py @@ -9,15 +9,13 @@ # 版权所有 © Sea Network Technology Studio # =========================================== -from fastapi import Request, HTTPException -from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials -from starlette.middleware.base import BaseHTTPMiddleware -from typing import Optional, Dict, Any, Tuple +from fastapi import Request +from typing import Dict, Any import re +import json from utils.jwt_handler import jwt_handler from utils.redis_client import RedisClient -from utils.response import unauthorized_response from utils.logger import get_logger logger = get_logger(__name__) @@ -45,58 +43,105 @@ def is_public_path(path: str) -> bool: return False -class AuthMiddleware(BaseHTTPMiddleware): - """JWT认证中间件""" +class AuthMiddleware: + """JWT认证中间件(纯ASGI实现,兼容CORS)""" - async def dispatch(self, request: Request, call_next): - # OPTIONS 预检请求跳过认证 - if request.method == "OPTIONS": - return await call_next(request) + def __init__(self, app): + self.app = app + + async def __call__(self, scope, receive, send): + if scope["type"] not in ("http", "websocket"): + await self.app(scope, receive, send) + return - path = request.url.path + # 从 scope 中获取请求信息 + path = scope.get("path", "") + method = scope.get("method", "") + + # OPTIONS 预检请求跳过认证 + if method == "OPTIONS": + await self.app(scope, receive, send) + return # 公开路径跳过认证 if is_public_path(path): - return await call_next(request) + await self.app(scope, receive, send) + return - # 获取Authorization头 - auth_header = request.headers.get("Authorization") + # 从 headers 中获取 Authorization + headers = dict(scope.get("headers", [])) + auth_header = headers.get(b"authorization", b"").decode("utf-8") if b"authorization" in headers else None if not auth_header: - return unauthorized_response("缺少认证令牌") + await self._send_unauthorized(send, "缺少认证令牌") + return # 解析Bearer Token try: scheme, token = auth_header.split() if scheme.lower() != "bearer": - return unauthorized_response("认证格式错误") + await self._send_unauthorized(send, "认证格式错误") + return except ValueError: - return unauthorized_response("认证格式错误") + await self._send_unauthorized(send, "认证格式错误") + return # 验证Token - payload = jwt_handler.verify_token(token) - if not payload: - return unauthorized_response("令牌无效或已过期") + try: + payload = jwt_handler.verify_token(token) + if not payload: + await self._send_unauthorized(send, "令牌无效或已过期") + return + + # 验证Redis中的Token + user_id = payload.get("user_id") + stored_token = await RedisClient.get_user_token(user_id) + + if not stored_token or stored_token != token: + await self._send_unauthorized(send, "令牌已失效,请重新登录") + return + + # 将用户信息存储到scope的state中(与request.state兼容) + if "state" not in scope: + scope["state"] = {} + scope["state"]["user_id"] = payload.get("user_id") + scope["state"]["username"] = payload.get("username") + scope["state"]["user_type"] = payload.get("user_type") + scope["state"]["student_id"] = payload.get("student_id") + scope["state"]["role"] = payload.get("role") + + # 刷新Token过期时间 + from config import settings + await RedisClient.expire(f"user_token:{user_id}", settings.JWT_EXPIRE_MINUTES * 60) + + except Exception as e: + logger.error(f"认证中间件异常: {e}", exc_info=True) + await self._send_unauthorized(send, "认证服务异常,请稍后重试") + return - # 验证Redis中的Token - user_id = payload.get("user_id") - stored_token = await RedisClient.get_user_token(user_id) + await self.app(scope, receive, send) + + async def _send_unauthorized(self, send, message: str): + """发送401未授权响应""" + body = json.dumps({ + "success": False, + "code": 401, + "message": message, + "data": None + }).encode("utf-8") - if not stored_token or stored_token != token: - return unauthorized_response("令牌已失效,请重新登录") - - # 将用户信息存储到request.state - request.state.user_id = payload.get("user_id") - request.state.username = payload.get("username") - request.state.user_type = payload.get("user_type") - request.state.student_id = payload.get("student_id") - request.state.role = payload.get("role") - - # 刷新Token过期时间 - from config import settings - await RedisClient.expire(f"user_token:{user_id}", settings.JWT_EXPIRE_MINUTES * 60) - - return await call_next(request) + await send({ + "type": "http.response.start", + "status": 401, + "headers": [ + [b"content-type", b"application/json"], + [b"content-length", str(len(body)).encode()], + ], + }) + await send({ + "type": "http.response.body", + "body": body, + }) async def get_current_user(request: Request) -> Dict[str, Any]: