213 lines
7.2 KiB
Python
213 lines
7.2 KiB
Python
# ===========================================
|
|
# 班级操行分管理系统 - 权限验证中间件
|
|
#
|
|
# 开发者: Canglan
|
|
# 联系方式: admin@sea-studio.top
|
|
# 版权归属: Sea Network Technology Studio
|
|
# 许可证: MIT License
|
|
#
|
|
# 版权所有 © Sea Network Technology Studio
|
|
# ===========================================
|
|
|
|
from fastapi import Request, HTTPException
|
|
from typing import List, Optional, Callable, Dict, Any
|
|
from functools import wraps
|
|
|
|
from utils.response import forbidden_response
|
|
from utils.database import execute_one
|
|
from utils.logger import get_logger
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
async def get_current_user(request: Request) -> Dict[str, Any]:
    """Collect the current user's identity fields from ``request.state``.

    Every field is read with a ``None`` fallback, so an attribute that was
    never set on the request state yields ``None`` instead of raising.

    Args:
        request: The incoming request whose ``state`` is inspected.

    Returns:
        A dict with the keys ``user_id``, ``username``, ``user_type``,
        ``student_id`` and ``role``, in that order.
    """
    field_names = ("user_id", "username", "user_type", "student_id", "role")
    return {name: getattr(request.state, name, None) for name in field_names}
|
|
|
|
|
|
async def get_current_user_id(request: Request) -> Optional[int]:
    """Return the current user's ID as stored on ``request.state``.

    Args:
        request: The incoming request whose ``state`` is inspected.

    Returns:
        The value of ``request.state.user_id``, or ``None`` when that
        attribute has not been set on the request state.
    """
    # The fallback means this can return None, hence the Optional annotation.
    return getattr(request.state, "user_id", None)
|
|
|
|
|
|
class PermissionChecker:
    """Role and permission lookups for administrative users.

    Every method is a static coroutine that queries the database through
    ``execute_one``. Role information is read from the ``admin_roles`` table.
    """

    @staticmethod
    async def get_user_role(user_id: int) -> Optional[str]:
        """Return the user's admin role.

        Args:
            user_id: ID of the user to look up.

        Returns:
            The ``role_type`` of one ``admin_roles`` row for the user, or
            ``None`` when the query yields nothing.
        """
        # LIMIT 1 without ORDER BY: if a user has several rows, which one is
        # returned is up to the database.
        sql = """
            SELECT role_type FROM admin_roles
            WHERE user_id = %s
            LIMIT 1
        """
        row = await execute_one(sql, (user_id,))
        return row["role_type"] if row else None

    @staticmethod
    async def get_user_class_id(user_id: int) -> Optional[int]:
        """Return the ID of the class the user administers.

        Args:
            user_id: ID of the user to look up.

        Returns:
            The ``class_id`` of one ``admin_roles`` row for the user, or
            ``None`` when the query yields nothing.
        """
        sql = """
            SELECT class_id FROM admin_roles
            WHERE user_id = %s
            LIMIT 1
        """
        row = await execute_one(sql, (user_id,))
        return row["class_id"] if row else None

    @staticmethod
    async def get_user_subject_ids(user_id: int) -> List[int]:
        """Return the subject IDs the user is a subject representative for.

        Args:
            user_id: ID of the user to look up.

        Returns:
            A list of ``subject_id`` values; empty when the query yields
            nothing.
        """
        sql = """
            SELECT subject_id FROM admin_roles
            WHERE user_id = %s AND role_type = '科代表'
        """
        # NOTE(review): this query can match several rows but is run through
        # execute_one, which presumably returns a single row. If so, a user
        # representing several subjects only ever gets one ID back. Confirm
        # and switch to the project's multi-row query helper.
        results = await execute_one(sql, (user_id,))
        if not results:
            return []
        if isinstance(results, list):
            return [row["subject_id"] for row in results]
        return [results["subject_id"]]

    @staticmethod
    async def check_is_teacher(user_id: int) -> bool:
        """Return whether the user's role is head teacher ("班主任")."""
        role = await PermissionChecker.get_user_role(user_id)
        return role == "班主任"

    @staticmethod
    async def check_is_monitor(user_id: int) -> bool:
        """Return whether the user's role is class monitor ("班长")."""
        role = await PermissionChecker.get_user_role(user_id)
        return role == "班长"

    @staticmethod
    async def check_is_subject_rep(
        user_id: int, subject_id: Optional[int] = None
    ) -> bool:
        """Return whether the user is a subject representative ("科代表").

        Args:
            user_id: ID of the user to check.
            subject_id: When given, the user must also be the representative
                of this specific subject.

        Returns:
            ``True`` if the user's role is subject representative and, when
            ``subject_id`` is given, that subject is among the user's subjects.
        """
        role = await PermissionChecker.get_user_role(user_id)
        if role != "科代表":
            return False
        # Compare against None explicitly so that a falsy ID such as 0 is
        # still checked rather than treated as "no subject requested".
        if subject_id is None:
            return True
        subject_ids = await PermissionChecker.get_user_subject_ids(user_id)
        return subject_id in subject_ids

    @staticmethod
    async def check_is_attendance_rep(user_id: int) -> bool:
        """Return whether the user's role is attendance officer ("考勤委员")."""
        role = await PermissionChecker.get_user_role(user_id)
        return role == "考勤委员"

    @staticmethod
    async def check_is_labor_rep(user_id: int) -> bool:
        """Return whether the user's role is labor officer ("劳动委员")."""
        role = await PermissionChecker.get_user_role(user_id)
        return role == "劳动委员"

    @staticmethod
    async def check_can_revoke(user_id: int, record_id: int) -> bool:
        """Return whether the user may revoke a conduct record.

        Rules:
            * Head teacher ("班主任"): may revoke any record.
            * Class monitor ("班长"): may revoke any record.
            * Anyone else: only records they recorded themselves.

        Args:
            user_id: ID of the user attempting the revocation.
            record_id: ID of the ``conduct_records`` row to revoke.

        Returns:
            ``False`` when the record does not exist; otherwise whether the
            rules above allow the revocation.
        """
        sql = "SELECT recorder_id FROM conduct_records WHERE record_id = %s"
        record = await execute_one(sql, (record_id,))
        if not record:
            return False

        role = await PermissionChecker.get_user_role(user_id)
        if role in ("班主任", "班长"):
            return True

        # Everyone else may only revoke a record they created.
        return record["recorder_id"] == user_id

    @staticmethod
    async def check_can_manage_student(user_id: int, student_id: int) -> bool:
        """Return whether the user may manage the student (same class).

        Args:
            user_id: ID of the administering user.
            student_id: ID of the student to be managed.

        Returns:
            ``True`` only when the student exists, the user has a managed
            class, and both class IDs are equal.
        """
        sql = "SELECT class_id FROM students WHERE student_id = %s"
        student = await execute_one(sql, (student_id,))
        if not student:
            return False

        admin_class_id = await PermissionChecker.get_user_class_id(user_id)
        # Without this guard, a user with no managed class (None) would match
        # a student whose class_id is NULL (also None) and be granted access.
        if admin_class_id is None:
            return False

        return admin_class_id == student["class_id"]
|
|
|
|
|
|
def require_auth(func: Callable):
    """Decorator that rejects calls made without an authenticated request.

    The wrapped coroutine must receive the request as the keyword argument
    ``request``. The call goes through only when that request is truthy, has
    a ``state`` attribute, and that state has a ``user_id`` attribute;
    otherwise the result of ``forbidden_response`` is returned instead.

    Args:
        func: The coroutine function to protect.

    Returns:
        The wrapping coroutine function.
    """

    @wraps(func)
    async def wrapper(*args, **kwargs):
        req = kwargs.get("request")
        if req and hasattr(req, "state") and hasattr(req.state, "user_id"):
            return await func(*args, **kwargs)
        return forbidden_response("请先登录")

    return wrapper
|
|
|
|
|
|
def require_role(roles: List[str]):
    """Decorator factory that restricts a handler to the given admin roles.

    The wrapped coroutine must receive the request as the keyword argument
    ``request``. The user's role is looked up with
    ``PermissionChecker.get_user_role`` and must be one of ``roles``.

    Args:
        roles: Role names that are allowed to call the handler.

    Returns:
        A decorator. The decorated coroutine returns the result of
        ``forbidden_response`` when the request is missing or carries no
        ``user_id``, or when the user's role is not in ``roles``; otherwise
        it returns the handler's own result.
    """

    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            request = kwargs.get("request")
            # Same guard as require_auth: without the user_id check, reading
            # request.state.user_id on an unauthenticated request raises
            # AttributeError instead of producing a "please log in" response.
            if (
                not request
                or not hasattr(request, "state")
                or not hasattr(request.state, "user_id")
            ):
                return forbidden_response("请先登录")

            user_role = await PermissionChecker.get_user_role(
                request.state.user_id
            )
            if user_role not in roles:
                return forbidden_response(f"需要{','.join(roles)}权限")

            return await func(*args, **kwargs)

        return wrapper

    return decorator
|
|
|
|
|
|
def require_teacher(func: Callable):
    """Decorator that restricts a handler to the head teacher ("班主任").

    The wrapped coroutine must receive the request as the keyword argument
    ``request``. The check is delegated to
    ``PermissionChecker.check_is_teacher``.

    Args:
        func: The coroutine function to protect.

    Returns:
        The wrapping coroutine function. It returns the result of
        ``forbidden_response`` when the request is missing or carries no
        ``user_id``, or when the user is not the head teacher; otherwise it
        returns the handler's own result.
    """

    @wraps(func)
    async def wrapper(*args, **kwargs):
        request = kwargs.get("request")
        # Same guard as require_auth: without the user_id check, reading
        # request.state.user_id on an unauthenticated request raises
        # AttributeError instead of producing a "please log in" response.
        if (
            not request
            or not hasattr(request, "state")
            or not hasattr(request.state, "user_id")
        ):
            return forbidden_response("请先登录")

        if not await PermissionChecker.check_is_teacher(request.state.user_id):
            return forbidden_response("需要班主任权限")

        return await func(*args, **kwargs)

    return wrapper
|
|
|
|
|
|
def require_monitor(func: Callable):
    """Decorator that restricts a handler to the class monitor ("班长").

    The wrapped coroutine must receive the request as the keyword argument
    ``request``. The check is delegated to
    ``PermissionChecker.check_is_monitor``.

    Args:
        func: The coroutine function to protect.

    Returns:
        The wrapping coroutine function. It returns the result of
        ``forbidden_response`` when the request is missing or carries no
        ``user_id``, or when the user is not the class monitor; otherwise it
        returns the handler's own result.
    """

    @wraps(func)
    async def wrapper(*args, **kwargs):
        request = kwargs.get("request")
        # Same guard as require_auth: without the user_id check, reading
        # request.state.user_id on an unauthenticated request raises
        # AttributeError instead of producing a "please log in" response.
        if (
            not request
            or not hasattr(request, "state")
            or not hasattr(request.state, "user_id")
        ):
            return forbidden_response("请先登录")

        if not await PermissionChecker.check_is_monitor(request.state.user_id):
            return forbidden_response("需要班长权限")

        return await func(*args, **kwargs)

    return wrapper