Files
ClassManager/backend/middleware/permission.py
2026-04-14 19:18:11 +08:00

202 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ===========================================
# 班级操行分管理系统 - 权限验证中间件
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from fastapi import Request
from typing import List, Optional, Callable, Dict, Any
from functools import wraps
from utils.response import forbidden_response
from utils.database import execute_one
from utils.logger import get_logger
from models.admin_role import AdminRoleModel
# Module-level logger obtained from the project's logger utility, keyed by this module's name.
logger = get_logger(__name__)
async def get_current_user(request: Request) -> Dict[str, Any]:
    """Collect the current user's identity fields from ``request.state``.

    Returns a dict with the keys ``user_id``, ``username``, ``user_type``,
    ``student_id`` and ``role``. Any attribute that has not been set on the
    request state is reported as ``None``.
    """
    state = request.state
    field_names = ("user_id", "username", "user_type", "student_id", "role")
    return {name: getattr(state, name, None) for name in field_names}
async def get_current_user_id(request: Request) -> Optional[int]:
    """Return the current user's ID stored on ``request.state``.

    Returns ``None`` when ``request.state`` has no ``user_id`` attribute, which
    is why the return type is ``Optional[int]`` rather than ``int``.
    """
    return getattr(request.state, "user_id", None)
class PermissionChecker:
    """Role and permission lookups for administrator accounts."""

    @staticmethod
    async def get_user_role(user_id: int) -> Optional[str]:
        """Return the ``role_type`` of the user's ``admin_roles`` row.

        Returns ``None`` when the query yields no row for ``user_id``.
        """
        row = await execute_one(
            "SELECT role_type FROM admin_roles WHERE user_id = %s LIMIT 1",
            (user_id,),
        )
        if not row:
            return None
        return row["role_type"]

    @staticmethod
    async def _has_role(user_id: int, *allowed: str) -> bool:
        """Return True when the user's stored role is one of ``allowed``."""
        return (await PermissionChecker.get_user_role(user_id)) in allowed

    @staticmethod
    async def check_is_teacher(user_id: int) -> bool:
        """Return True when the user's role is head teacher."""
        return await PermissionChecker._has_role(user_id, "班主任")

    @staticmethod
    async def check_is_monitor(user_id: int) -> bool:
        """Return True when the user's role is class monitor."""
        return await PermissionChecker._has_role(user_id, "班长")

    @staticmethod
    async def check_is_study_commissioner(user_id: int) -> bool:
        """Return True when the user's role is study commissioner."""
        return await PermissionChecker._has_role(user_id, "学习委员")

    @staticmethod
    async def check_is_attendance_rep(user_id: int) -> bool:
        """Return True when the user's role is attendance representative."""
        return await PermissionChecker._has_role(user_id, "考勤委员")

    @staticmethod
    async def check_is_labor_rep(user_id: int) -> bool:
        """Return True when the user's role is labor representative."""
        return await PermissionChecker._has_role(user_id, "劳动委员")

    @staticmethod
    async def check_can_manage_subjects(user_id: int) -> bool:
        """Return True for roles allowed to manage subjects.

        Only the head teacher and the study commissioner qualify.
        """
        return await PermissionChecker._has_role(user_id, "班主任", "学习委员")

    @staticmethod
    async def get_user_class_id(user_id: int) -> Optional[int]:
        """Return the class ID associated with the user.

        This system is designed for a single class, so the class ID is always 1
        and ``user_id`` is not consulted.
        """
        return 1

    @staticmethod
    async def get_user_subject_ids(user_id: int) -> List[int]:
        """Return the IDs of the subjects the user manages.

        Users without an admin-role record manage nothing. The head teacher
        manages every active subject. Any other role manages only the subject
        stored on its admin-role record, if that value is truthy.
        """
        role_row = await AdminRoleModel.get_by_user_id(user_id)
        if not role_row:
            return []

        if role_row["role_type"] != "班主任":
            # Non-teacher roles are limited to the single subject on their record.
            linked_subject = role_row.get("subject_id")
            return [linked_subject] if linked_subject else []

        # Kept as a function-scope import, as in the original; presumably this
        # avoids a circular import between modules -- TODO confirm.
        from models.subject import SubjectModel

        active_subjects = await SubjectModel.get_all(is_active=True)
        return [entry["subject_id"] for entry in active_subjects]

    @staticmethod
    async def check_can_manage_student(user_id: int, student_id: int) -> bool:
        """Return True when the user may manage the given student.

        Any user that has an admin role may manage every student, so
        ``student_id`` is currently not consulted.
        """
        return (await PermissionChecker.get_user_role(user_id)) is not None

    @staticmethod
    async def check_can_revoke(user_id: int, record_id: int) -> bool:
        """Return True when the user may revoke the given conduct record.

        Rules:
            * a record that does not exist can never be revoked;
            * the head teacher and the class monitor may revoke any record;
            * everyone else may revoke only records they recorded themselves.
        """
        record_row = await execute_one(
            "SELECT recorder_id FROM conduct_records WHERE record_id = %s",
            (record_id,),
        )
        if record_row:
            # The role is looked up only after the record is known to exist.
            role = await PermissionChecker.get_user_role(user_id)
            return role in ("班主任", "班长") or record_row["recorder_id"] == user_id
        return False
def require_auth(func: Callable):
    """Decorator that only runs ``func`` for an authenticated request.

    The wrapped coroutine must receive the request through the ``request``
    keyword argument. If that argument is missing or falsy, or if
    ``request.state`` has no ``user_id`` attribute, the wrapper returns the
    forbidden response carrying the login prompt instead of calling ``func``.
    """

    @wraps(func)
    async def guarded(*args, **kwargs):
        req = kwargs.get("request")
        if req and hasattr(req.state, "user_id"):
            return await func(*args, **kwargs)
        return forbidden_response("请先登录")

    return guarded
def require_role(roles: List[str]):
    """Build a decorator that restricts a handler to users holding one of ``roles``.

    The wrapped coroutine must receive the request through the ``request``
    keyword argument. Requests without a ``user_id`` on ``request.state`` get
    the login-prompt forbidden response. Authenticated users whose stored role
    is not in ``roles`` get a forbidden response naming the required roles.
    """

    def decorator(func: Callable):
        @wraps(func)
        async def guarded(*args, **kwargs):
            req = kwargs.get("request")
            if not (req and hasattr(req.state, "user_id")):
                return forbidden_response("请先登录")

            current_role = await PermissionChecker.get_user_role(req.state.user_id)
            if current_role in roles:
                return await func(*args, **kwargs)
            # The message is built only on the rejection path.
            return forbidden_response(f"需要{','.join(roles)}权限")

        return guarded

    return decorator
def require_teacher(func: Callable):
    """Decorator that only runs ``func`` for a user with the head-teacher role.

    The wrapped coroutine must receive the request through the ``request``
    keyword argument. Requests without a ``user_id`` on ``request.state`` get
    the login-prompt forbidden response; authenticated users who are not the
    head teacher get the head-teacher-required forbidden response.
    """

    @wraps(func)
    async def guarded(*args, **kwargs):
        req = kwargs.get("request")
        if not req or not hasattr(req.state, "user_id"):
            return forbidden_response("请先登录")

        if await PermissionChecker.check_is_teacher(req.state.user_id):
            return await func(*args, **kwargs)
        return forbidden_response("需要班主任权限")

    return guarded
def require_monitor(func: Callable):
    """Decorator that only runs ``func`` for a user with the class-monitor role.

    The wrapped coroutine must receive the request through the ``request``
    keyword argument. Requests without a ``user_id`` on ``request.state`` get
    the login-prompt forbidden response; authenticated users who are not the
    class monitor get the monitor-required forbidden response.
    """

    @wraps(func)
    async def guarded(*args, **kwargs):
        req = kwargs.get("request")
        if not req or not hasattr(req.state, "user_id"):
            return forbidden_response("请先登录")

        if await PermissionChecker.check_is_monitor(req.state.user_id):
            return await func(*args, **kwargs)
        return forbidden_response("需要班长权限")

    return guarded
def require_study_commissioner(func: Callable):
    """Decorator that only runs ``func`` for a user with the study-commissioner role.

    The wrapped coroutine must receive the request through the ``request``
    keyword argument. Requests without a ``user_id`` on ``request.state`` get
    the login-prompt forbidden response; authenticated users who are not the
    study commissioner get the study-commissioner-required forbidden response.
    """

    @wraps(func)
    async def guarded(*args, **kwargs):
        req = kwargs.get("request")
        if not req or not hasattr(req.state, "user_id"):
            return forbidden_response("请先登录")

        if await PermissionChecker.check_is_study_commissioner(req.state.user_id):
            return await func(*args, **kwargs)
        return forbidden_response("需要学习委员权限")

    return guarded