# ===========================================
# Class Conduct Score Management System - Permission Verification Middleware
#
# Developer: Canglan
# Contact: admin@sea-studio.top
# Copyright owner: Sea Network Technology Studio
# License: MIT License
#
# Copyright © Sea Network Technology Studio
# ===========================================

from functools import wraps
from typing import Any, Awaitable, Callable, Dict, List, Optional

from fastapi import Request

from utils.database import execute_one
from utils.logger import get_logger
from utils.response import forbidden_response

logger = get_logger(__name__)

# Role names that admin_roles.role_type values are compared against.
_ROLE_TEACHER = "班主任"
_ROLE_MONITOR = "班长"
_ROLE_STUDY_COMMISSIONER = "学习委员"
_ROLE_ATTENDANCE_REP = "考勤委员"
_ROLE_LABOR_REP = "劳动委员"

# Roles allowed to manage subjects / revoke any conduct record.
_SUBJECT_MANAGER_ROLES = (_ROLE_TEACHER, _ROLE_STUDY_COMMISSIONER)
_REVOKE_ANY_ROLES = (_ROLE_TEACHER, _ROLE_MONITOR)

# Message returned by every decorator when no authenticated user is found.
_LOGIN_REQUIRED_MESSAGE = "请先登录"


async def get_current_user(request: Request) -> Dict[str, Any]:
    """Return the current user's identity fields read from ``request.state``.

    Every field falls back to ``None`` when the corresponding attribute has
    not been set on ``request.state``.

    Returns:
        A dict with the keys ``user_id``, ``username``, ``user_type``,
        ``student_id`` and ``role``.
    """
    return {
        "user_id": getattr(request.state, 'user_id', None),
        "username": getattr(request.state, 'username', None),
        "user_type": getattr(request.state, 'user_type', None),
        "student_id": getattr(request.state, 'student_id', None),
        "role": getattr(request.state, 'role', None)
    }


async def get_current_user_id(request: Request) -> Optional[int]:
    """Return ``request.state.user_id``, or ``None`` when it is not set."""
    return getattr(request.state, 'user_id', None)


class PermissionChecker:
    """Role and permission lookups backed by the ``admin_roles`` table."""

    @staticmethod
    async def get_user_role(user_id: int) -> Optional[str]:
        """Return the admin role of ``user_id``, or ``None`` if no row matches.

        NOTE(review): the query uses ``LIMIT 1`` without ``ORDER BY``; if a
        user can hold several rows in ``admin_roles``, which one is returned
        is up to the database - confirm the table allows one role per user.
        """
        sql = "SELECT role_type FROM admin_roles WHERE user_id = %s LIMIT 1"
        result = await execute_one(sql, (user_id,))
        return result["role_type"] if result else None

    @staticmethod
    async def check_is_teacher(user_id: int) -> bool:
        """Return True when the user's role is head teacher (班主任)."""
        role = await PermissionChecker.get_user_role(user_id)
        return role == _ROLE_TEACHER

    @staticmethod
    async def check_is_monitor(user_id: int) -> bool:
        """Return True when the user's role is class monitor (班长)."""
        role = await PermissionChecker.get_user_role(user_id)
        return role == _ROLE_MONITOR

    @staticmethod
    async def check_is_study_commissioner(user_id: int) -> bool:
        """Return True when the user's role is study commissioner (学习委员)."""
        role = await PermissionChecker.get_user_role(user_id)
        return role == _ROLE_STUDY_COMMISSIONER

    @staticmethod
    async def check_is_attendance_rep(user_id: int) -> bool:
        """Return True when the user's role is attendance rep (考勤委员)."""
        role = await PermissionChecker.get_user_role(user_id)
        return role == _ROLE_ATTENDANCE_REP

    @staticmethod
    async def check_is_labor_rep(user_id: int) -> bool:
        """Return True when the user's role is labor rep (劳动委员)."""
        role = await PermissionChecker.get_user_role(user_id)
        return role == _ROLE_LABOR_REP

    @staticmethod
    async def check_can_manage_subjects(user_id: int) -> bool:
        """Return True for head teachers and study commissioners."""
        role = await PermissionChecker.get_user_role(user_id)
        return role in _SUBJECT_MANAGER_ROLES

    @staticmethod
    async def get_user_class_id(user_id: int) -> Optional[int]:
        """Return the class ID associated with the user.

        This is a single-class system, so the result is always ``1`` and
        ``user_id`` is not consulted.
        """
        return 1

    @staticmethod
    async def check_can_revoke(user_id: int, record_id: int) -> bool:
        """Return whether ``user_id`` may revoke conduct record ``record_id``.

        Rules:
            * The record does not exist: ``False``.
            * Head teacher or class monitor: may revoke any record.
            * Anyone else: only records whose ``recorder_id`` is ``user_id``.
        """
        sql = "SELECT recorder_id FROM conduct_records WHERE record_id = %s"
        record = await execute_one(sql, (record_id,))
        if not record:
            return False

        role = await PermissionChecker.get_user_role(user_id)
        if role in _REVOKE_ANY_ROLES:
            return True

        return record["recorder_id"] == user_id


def _find_request(args: tuple, kwargs: Dict[str, Any]) -> Optional[Any]:
    """Locate the request object among a wrapped endpoint's arguments.

    The ``request`` keyword argument is preferred; as a fallback the
    positional arguments are scanned for a ``Request`` instance so that the
    decorators also work when the endpoint is invoked positionally.
    """
    request = kwargs.get('request')
    if request is not None:
        return request
    for arg in args:
        if isinstance(arg, Request):
            return arg
    return None


def _authenticated_user_id(args: tuple, kwargs: Dict[str, Any]) -> Optional[Any]:
    """Return the logged-in user's ID, or ``None`` if there is none.

    A ``user_id`` attribute that is present but set to ``None`` counts as
    "not logged in"; a bare ``hasattr`` check would let such a request pass.
    """
    request = _find_request(args, kwargs)
    if request is None:
        return None
    return getattr(request.state, 'user_id', None)


def _guarded(
    func: Callable,
    denial_for: Optional[Callable[[Any], Awaitable[Optional[str]]]] = None,
) -> Callable:
    """Wrap ``func`` with the login check plus an optional permission check.

    Args:
        func: The async endpoint to protect.
        denial_for: Optional coroutine function taking the user ID and
            returning ``None`` when access is allowed, or the message to
            send in the forbidden response when it is denied.

    Returns:
        An async wrapper that returns ``forbidden_response(...)`` instead of
        calling ``func`` when a check fails.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        user_id = _authenticated_user_id(args, kwargs)
        if user_id is None:
            return forbidden_response(_LOGIN_REQUIRED_MESSAGE)

        if denial_for is not None:
            denial_message = await denial_for(user_id)
            if denial_message is not None:
                return forbidden_response(denial_message)

        return await func(*args, **kwargs)

    return wrapper


def require_auth(func: Callable):
    """Decorator: reject the call unless a user is logged in."""
    return _guarded(func)


def require_role(roles: List[str]):
    """Decorator factory: require the user's admin role to be in ``roles``.

    Args:
        roles: Accepted ``role_type`` values.
    """
    def decorator(func: Callable):
        async def denial_for(user_id: Any) -> Optional[str]:
            user_role = await PermissionChecker.get_user_role(user_id)
            if user_role not in roles:
                return f"需要{','.join(roles)}权限"
            return None

        return _guarded(func, denial_for)

    return decorator


def require_teacher(func: Callable):
    """Decorator: require the head teacher (班主任) role."""
    async def denial_for(user_id: Any) -> Optional[str]:
        if await PermissionChecker.check_is_teacher(user_id):
            return None
        return "需要班主任权限"

    return _guarded(func, denial_for)


def require_monitor(func: Callable):
    """Decorator: require the class monitor (班长) role."""
    async def denial_for(user_id: Any) -> Optional[str]:
        if await PermissionChecker.check_is_monitor(user_id):
            return None
        return "需要班长权限"

    return _guarded(func, denial_for)


def require_study_commissioner(func: Callable):
    """Decorator: require the study commissioner (学习委员) role."""
    async def denial_for(user_id: Any) -> Optional[str]:
        if await PermissionChecker.check_is_study_commissioner(user_id):
            return None
        return "需要学习委员权限"

    return _guarded(func, denial_for)