Files
ClassManager/backend/middleware/permission.py
2026-04-10 14:18:07 +08:00

122 lines
4.6 KiB
Python

# ===========================================
# 班级操行分管理系统 - 权限验证中间件
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from fastapi import Request
from typing import List, Optional, Callable, Dict, Any
from functools import wraps
from utils.response import forbidden_response
from utils.database import execute_one
async def get_current_user(request: Request) -> Dict[str, Any]:
    """Collect the identity attributes found on ``request.state`` into a dict.

    Args:
        request: The incoming request. Its ``state`` object is presumably
            populated by an upstream authentication step -- confirm against
            the auth middleware.

    Returns:
        A dict with the keys ``user_id``, ``username``, ``user_type``,
        ``student_id`` and ``role``. Any attribute that is not set on
        ``request.state`` is reported as ``None``.
    """
    state = request.state
    field_names = ("user_id", "username", "user_type", "student_id", "role")
    return {name: getattr(state, name, None) for name in field_names}
async def get_current_user_id(request: Request) -> Optional[int]:
    """Return the ``user_id`` stored on ``request.state``, if any.

    Args:
        request: The incoming request whose ``state`` may carry ``user_id``.

    Returns:
        The value of ``request.state.user_id``, or ``None`` when that
        attribute has not been set. Callers must handle the ``None`` case;
        the return annotation reflects this.
    """
    return getattr(request.state, "user_id", None)
class PermissionChecker:
    """Role lookups and permission predicates backed by the database.

    All methods are static coroutines. Roles are compared against the literal
    ``role_type`` strings read from the ``admin_roles`` table.
    """

    @staticmethod
    async def get_user_role(user_id: int) -> Optional[str]:
        """Return the user's ``role_type`` from ``admin_roles``.

        Returns ``None`` when the query yields no row for ``user_id``.
        """
        row = await execute_one(
            "SELECT role_type FROM admin_roles WHERE user_id = %s LIMIT 1",
            (user_id,),
        )
        if not row:
            return None
        return row["role_type"]

    @staticmethod
    async def check_is_teacher(user_id: int) -> bool:
        """Return True when the user's role is "班主任" (the teacher role)."""
        return (await PermissionChecker.get_user_role(user_id)) == "班主任"

    @staticmethod
    async def check_is_monitor(user_id: int) -> bool:
        """Return True when the user's role is "班长" (the monitor role)."""
        return (await PermissionChecker.get_user_role(user_id)) == "班长"

    @staticmethod
    async def check_is_study_commissioner(user_id: int) -> bool:
        """Return True when the user's role is "学习委员" (study commissioner)."""
        return (await PermissionChecker.get_user_role(user_id)) == "学习委员"

    @staticmethod
    async def check_is_attendance_rep(user_id: int) -> bool:
        """Return True when the user's role is "考勤委员" (attendance rep)."""
        return (await PermissionChecker.get_user_role(user_id)) == "考勤委员"

    @staticmethod
    async def check_is_labor_rep(user_id: int) -> bool:
        """Return True when the user's role is "劳动委员" (labor rep)."""
        return (await PermissionChecker.get_user_role(user_id)) == "劳动委员"

    @staticmethod
    async def check_can_manage_subjects(user_id: int) -> bool:
        """Return True for the teacher and study-commissioner roles."""
        current_role = await PermissionChecker.get_user_role(user_id)
        return current_role in ("班主任", "学习委员")

    @staticmethod
    async def check_can_revoke(user_id: int, record_id: int) -> bool:
        """Decide whether ``user_id`` may revoke conduct record ``record_id``.

        Returns False when the record does not exist. Otherwise the teacher
        and monitor roles may revoke any record, and every other user may
        revoke only a record whose ``recorder_id`` equals their own id.
        """
        # The record is fetched before the role so that a missing record
        # short-circuits without a second query.
        found = await execute_one(
            "SELECT recorder_id FROM conduct_records WHERE record_id = %s",
            (record_id,),
        )
        if found:
            current_role = await PermissionChecker.get_user_role(user_id)
            return (
                current_role in ("班主任", "班长")
                or found["recorder_id"] == user_id
            )
        return False
def require_auth(func: Callable):
    """Decorate an async handler so that it runs only for logged-in users.

    The request is looked up only under the keyword name ``request``; a
    handler invoked without that keyword argument is rejected as not logged
    in.

    Args:
        func: The async handler to protect.

    Returns:
        An async wrapper that returns ``forbidden_response("请先登录")`` when no
        authenticated user is found, and otherwise awaits ``func`` with the
        original arguments.
    """

    @wraps(func)
    async def wrapper(*args, **kwargs):
        request = kwargs.get("request")
        # Fail closed: a ``user_id`` attribute that exists but is None does
        # not count as authenticated. ``is None`` keeps an id of 0 valid.
        if not request or getattr(request.state, "user_id", None) is None:
            return forbidden_response("请先登录")
        return await func(*args, **kwargs)

    return wrapper
def require_role(roles: List[str]):
    """Build a decorator that restricts an async handler to the given roles.

    The request is looked up only under the keyword name ``request``; a
    handler invoked without that keyword argument is rejected as not logged
    in.

    Args:
        roles: Role names allowed to call the handler. They are compared
            against the value returned by ``PermissionChecker.get_user_role``.

    Returns:
        A decorator. The wrapped handler returns
        ``forbidden_response("请先登录")`` when no authenticated user is found,
        a ``forbidden_response`` naming the required roles when the user's
        role is not in ``roles``, and otherwise awaits the original handler.
    """

    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            request = kwargs.get("request")
            user_id = getattr(request.state, "user_id", None) if request else None
            # Fail closed before touching the database: a missing or None
            # user_id means nobody is logged in, so no role lookup is needed.
            if user_id is None:
                return forbidden_response("请先登录")
            user_role = await PermissionChecker.get_user_role(user_id)
            if user_role not in roles:
                return forbidden_response(f"需要{','.join(roles)}权限")
            return await func(*args, **kwargs)

        return wrapper

    return decorator
def require_teacher(func: Callable):
    """Decorate an async handler so that only the teacher role may call it.

    The request is looked up only under the keyword name ``request``; a
    handler invoked without that keyword argument is rejected as not logged
    in.

    Args:
        func: The async handler to protect.

    Returns:
        An async wrapper that returns ``forbidden_response("请先登录")`` when no
        authenticated user is found, ``forbidden_response("需要班主任权限")``
        when ``PermissionChecker.check_is_teacher`` is false for the user,
        and otherwise awaits ``func`` with the original arguments.
    """

    @wraps(func)
    async def wrapper(*args, **kwargs):
        request = kwargs.get("request")
        user_id = getattr(request.state, "user_id", None) if request else None
        # Fail closed before touching the database: a missing or None
        # user_id means nobody is logged in.
        if user_id is None:
            return forbidden_response("请先登录")
        if not await PermissionChecker.check_is_teacher(user_id):
            return forbidden_response("需要班主任权限")
        return await func(*args, **kwargs)

    return wrapper
def require_study_commissioner(func: Callable):
    """Decorate an async handler so that only the study commissioner may call it.

    The request is looked up only under the keyword name ``request``; a
    handler invoked without that keyword argument is rejected as not logged
    in.

    Args:
        func: The async handler to protect.

    Returns:
        An async wrapper that returns ``forbidden_response("请先登录")`` when no
        authenticated user is found, ``forbidden_response("需要学习委员权限")``
        when ``PermissionChecker.check_is_study_commissioner`` is false for
        the user, and otherwise awaits ``func`` with the original arguments.
    """

    @wraps(func)
    async def wrapper(*args, **kwargs):
        request = kwargs.get("request")
        user_id = getattr(request.state, "user_id", None) if request else None
        # Fail closed before touching the database: a missing or None
        # user_id means nobody is logged in.
        if user_id is None:
            return forbidden_response("请先登录")
        if not await PermissionChecker.check_is_study_commissioner(user_id):
            return forbidden_response("需要学习委员权限")
        return await func(*args, **kwargs)

    return wrapper