Files
SharedClassManager/backend/services/auth_service.py
2026-04-29 15:20:28 +08:00

176 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ===========================================
# 班级操行分管理系统 - 后端服务
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from typing import Dict, Any, Optional
from datetime import datetime
from models.user import UserModel
from models.student import StudentModel
from models.admin_role import AdminRoleModel
from services.log_service import LogService
from utils.security import security
from utils.jwt_handler import jwt_handler
from utils.redis_client import RedisClient
from utils.database import execute_update
from utils.logger import get_logger
# Module-level logger, named after this module via get_logger(__name__).
logger = get_logger(__name__)
class AuthService:
    """Authentication service: login, logout, password change and profile lookup.

    Every method is static; the class only serves as a namespace. Expected
    failures are reported to the caller as ``{"success": False, "message": ...}``
    dicts (or ``None`` for ``get_user_info``) instead of being raised.
    """

    # Failed-attempt count at which ``login`` refuses to check credentials.
    MAX_LOGIN_ATTEMPTS = 5

    @staticmethod
    async def login(
        username: str,
        password: str,
        ip: str,
        user_agent: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Authenticate a user and issue a token.

        The checks run in this order:

        1. Refuse when the counter stored under ``login_attempts:<username>``
           has reached ``MAX_LOGIN_ATTEMPTS``.
        2. Look up the user and verify the password.
        3. Refuse accounts whose ``status`` is not ``1``.

        On success the failed-attempt counter is cleared, the last-login info
        is updated, a token is created and stored in Redis, and a login-log
        entry is written.

        Args:
            username: Login name supplied by the client.
            password: Plain-text password supplied by the client.
            ip: Client address; passed to the login log and to
                ``UserModel.update_last_login``.
            user_agent: Client user-agent string for the login log, if any.

        Returns:
            On failure ``{"success": False, "message": <reason>}``. On success
            a dict with ``success``, ``token``, ``user_id``, ``username``,
            ``real_name``, ``user_type``, ``student_id``, ``role``,
            ``need_change_password`` and ``redirect``.
        """
        # Lockout check. NOTE(review): this key is presumably the one that
        # RedisClient.set_login_attempts / clear_login_attempts maintain, and
        # the 5-minute window quoted in the message presumably matches its
        # TTL - neither is visible here; confirm in RedisClient.
        attempts = await RedisClient.get(f"login_attempts:{username}")
        if attempts and int(attempts) >= AuthService.MAX_LOGIN_ATTEMPTS:
            await LogService.write_login_log(username, 0, ip, user_agent, "登录失败次数过多")
            return {"success": False, "message": "登录失败次数过多请5分钟后重试"}

        # Unknown user and wrong password take the same path and produce the
        # same message, so the response does not tell the two cases apart.
        user = await UserModel.get_by_username(username)
        if not user or not security.verify_password(password, user["password_hash"]):
            return await AuthService._reject_bad_credentials(username, ip, user_agent)

        # The status check runs only after the password has been verified, so
        # the "disabled" message is returned only for correct credentials.
        if user["status"] != 1:
            await LogService.write_login_log(username, 0, ip, user_agent, "账号已被禁用")
            return {"success": False, "message": "账号已被禁用"}

        # Credentials accepted: reset the failure counter and record the login.
        await RedisClient.clear_login_attempts(username)
        await UserModel.update_last_login(user["user_id"], ip)

        # Only admin users carry a role; it stays None for everyone else and
        # for admins without an admin-role record.
        role = None
        if user["user_type"] == "admin":
            admin_role = await AdminRoleModel.get_by_user_id(user["user_id"])
            role = admin_role["role_type"] if admin_role else None

        token = jwt_handler.create_token(
            user_id=user["user_id"],
            username=user["username"],
            user_type=user["user_type"],
            student_id=user["student_id"],
            role=role,
            real_name=user["real_name"],
        )
        # Store the token in Redis under the user's id.
        await RedisClient.set_user_token(user["user_id"], token)

        redirect = AuthService._get_redirect_path(user["user_type"], role)
        await LogService.write_login_log(username, 1, ip, user_agent)
        return {
            "success": True,
            "token": token,
            "user_id": user["user_id"],
            "username": user["username"],
            "real_name": user["real_name"],
            "user_type": user["user_type"],
            "student_id": user["student_id"],
            "role": role,
            "need_change_password": user["need_change_password"] == 1,
            "redirect": redirect,
        }

    @staticmethod
    async def _reject_bad_credentials(
        username: str,
        ip: str,
        user_agent: Optional[str],
    ) -> Dict[str, Any]:
        """Register a failed attempt for ``username`` and build the failure reply.

        Calls ``RedisClient.set_login_attempts`` (presumably incrementing the
        failure counter - confirm in RedisClient), writes a failed login-log
        entry, and returns the generic "wrong username or password" response.
        """
        await RedisClient.set_login_attempts(username)
        await LogService.write_login_log(username, 0, ip, user_agent, "用户名或密码错误")
        return {"success": False, "message": "用户名或密码错误"}

    @staticmethod
    async def logout(user_id: int) -> Dict[str, Any]:
        """Log a user out by deleting the token stored for ``user_id``.

        Returns:
            Always ``{"success": True, "message": ...}``.
        """
        await RedisClient.delete_user_token(user_id)
        return {"success": True, "message": "登出成功"}

    @staticmethod
    async def change_password(
        user_id: int,
        old_password: str,
        new_password: str,
        force: bool = False,
    ) -> Dict[str, Any]:
        """Change the password of the user identified by ``user_id``.

        Args:
            user_id: Id of the user whose password is changed.
            old_password: Current password; ignored when ``force`` is true.
            new_password: Replacement password; must pass
                ``security.validate_password_strength``.
            force: When true, the old password is not verified
                (forced password change).

        Returns:
            ``{"success": bool, "message": str}``. On a strength failure the
            message is the one returned by the validator.
        """
        user = await UserModel.get_by_user_id(user_id)
        if not user:
            return {"success": False, "message": "用户不存在"}

        # Verify the old password unless this is a forced change.
        if not force and not security.verify_password(old_password, user["password_hash"]):
            return {"success": False, "message": "原密码错误"}

        is_valid, msg = security.validate_password_strength(new_password)
        if not is_valid:
            return {"success": False, "message": msg}

        # NOTE(review): the plain new password is handed to the model; hashing
        # presumably happens inside UserModel.update_password - confirm.
        result = await UserModel.update_password(user_id, new_password)
        if not result:
            return {"success": False, "message": "密码修改失败"}

        # Drop the token stored for this user after a successful change.
        await RedisClient.delete_user_token(user_id)
        return {"success": True, "message": "密码修改成功"}

    @staticmethod
    async def get_user_info(user_id: int) -> Optional[Dict[str, Any]]:
        """Return profile information for ``user_id``.

        Returns:
            ``None`` when the user does not exist. Otherwise a dict with
            ``user_id``, ``username``, ``real_name``, ``user_type`` and
            ``need_change_password``; plus ``student_no``, ``student_name``
            and ``total_points`` when the user is linked to an existing
            student; plus ``role`` when the user is an admin with an
            admin-role record.
        """
        user = await UserModel.get_by_user_id(user_id)
        if not user:
            return None

        result = {
            "user_id": user["user_id"],
            "username": user["username"],
            "real_name": user["real_name"],
            "user_type": user["user_type"],
            "need_change_password": user["need_change_password"] == 1,
        }

        # Attach student details when the account is linked to a student.
        if user["student_id"]:
            student = await StudentModel.get_by_id(user["student_id"])
            if student:
                result["student_no"] = student["student_no"]
                result["student_name"] = student["name"]
                result["total_points"] = student["total_points"]

        # Attach the admin role when one is recorded.
        if user["user_type"] == "admin":
            admin_role = await AdminRoleModel.get_by_user_id(user_id)
            if admin_role:
                result["role"] = admin_role["role_type"]

        return result

    @staticmethod
    def _get_redirect_path(user_type: str, role: Optional[str] = None) -> str:
        """Return the dashboard path a user is sent to after login.

        Args:
            user_type: ``"student"`` and ``"parent"`` have their own
                dashboards; every other value maps to the admin dashboard.
            role: Accepted for callers that pass it; currently not used to
                pick the path.
        """
        if user_type == "student":
            return "/student/dashboard.php"
        if user_type == "parent":
            return "/parent/dashboard.php"
        return "/admin/dashboard.php"