# =========================================== # 班级操行分管理系统 - 后端服务 # # 开发者: Canglan # 联系方式: admin@sea-studio.top # 版权归属: Sea Network Technology Studio # 许可证: MIT License # # 版权所有 © Sea Network Technology Studio # =========================================== from typing import Dict, Any, Optional from datetime import datetime from models.user import UserModel from models.student import StudentModel from models.admin_role import AdminRoleModel from services.log_service import LogService from utils.security import security from utils.jwt_handler import jwt_handler from utils.redis_client import RedisClient from utils.database import execute_update from utils.logger import get_logger logger = get_logger(__name__) class AuthService: """认证服务""" @staticmethod async def login(username: str, password: str, ip: str, user_agent: str = None) -> Dict[str, Any]: """ 用户登录 """ # 检查登录失败次数 attempts = await RedisClient.get(f"login_attempts:{username}") if attempts and int(attempts) >= 5: await LogService.write_login_log(username, 0, ip, user_agent, "登录失败次数过多") return {"success": False, "message": "登录失败次数过多,请5分钟后重试"} # 获取用户信息 user = await UserModel.get_by_username(username) if not user: await RedisClient.set_login_attempts(username) await LogService.write_login_log(username, 0, ip, user_agent, "用户名或密码错误") return {"success": False, "message": "用户名或密码错误"} # 验证密码 if not security.verify_password(password, user["password_hash"]): await RedisClient.set_login_attempts(username) await LogService.write_login_log(username, 0, ip, user_agent, "用户名或密码错误") return {"success": False, "message": "用户名或密码错误"} # 检查账号状态 if user["status"] != 1: await LogService.write_login_log(username, 0, ip, user_agent, "账号已被禁用") return {"success": False, "message": "账号已被禁用"} # 清除登录失败记录 await RedisClient.clear_login_attempts(username) # 更新最后登录信息 await UserModel.update_last_login(user["user_id"], ip) # 获取用户角色(如果是管理员) role = None if user["user_type"] == "admin": admin_role = await AdminRoleModel.get_by_user_id(user["user_id"]) role = admin_role["role_type"] if admin_role else None # 生成Token token = jwt_handler.create_token( user_id=user["user_id"], username=user["username"], user_type=user["user_type"], student_id=user["student_id"], role=role, real_name=user["real_name"] ) # 存储Token到Redis await RedisClient.set_user_token(user["user_id"], token) # 确定跳转路径 redirect = AuthService._get_redirect_path(user["user_type"], role) await LogService.write_login_log(username, 1, ip, user_agent) return { "success": True, "token": token, "user_id": user["user_id"], "username": user["username"], "real_name": user["real_name"], "user_type": user["user_type"], "student_id": user["student_id"], "role": role, "need_change_password": user["need_change_password"] == 1, "redirect": redirect } @staticmethod async def logout(user_id: int) -> Dict[str, Any]: """用户登出""" await RedisClient.delete_user_token(user_id) return {"success": True, "message": "登出成功"} @staticmethod async def change_password(user_id: int, old_password: str, new_password: str, force: bool = False) -> Dict[str, Any]: """修改密码""" # 获取用户信息 user = await UserModel.get_by_user_id(user_id) if not user: return {"success": False, "message": "用户不存在"} # 验证原密码(强制改密时跳过) if not force and not security.verify_password(old_password, user["password_hash"]): return {"success": False, "message": "原密码错误"} # 验证新密码强度 is_valid, msg = security.validate_password_strength(new_password) if not is_valid: return {"success": False, "message": msg} # 更新密码 result = await UserModel.update_password(user_id, new_password) if result: # 清除所有Token await RedisClient.delete_user_token(user_id) return {"success": True, "message": "密码修改成功"} else: return {"success": False, "message": "密码修改失败"} @staticmethod async def get_user_info(user_id: int) -> Optional[Dict[str, Any]]: """获取用户信息""" user = await UserModel.get_by_user_id(user_id) if not user: return None result = { "user_id": user["user_id"], "username": user["username"], "real_name": user["real_name"], "user_type": user["user_type"], "need_change_password": user["need_change_password"] == 1 } # 获取学生信息 if user["student_id"]: student = await StudentModel.get_by_id(user["student_id"]) if student: result["student_no"] = student["student_no"] result["student_name"] = student["name"] result["total_points"] = student["total_points"] # 获取管理员角色 if user["user_type"] == "admin": admin_role = await AdminRoleModel.get_by_user_id(user_id) if admin_role: result["role"] = admin_role["role_type"] return result @staticmethod def _get_redirect_path(user_type: str, role: str = None) -> str: """获取跳转路径""" if user_type == "student": return "/student/dashboard.php" elif user_type == "parent": return "/parent/dashboard.php" else: return "/admin/dashboard.php"