Files
ClassManager/backend/services/admin_service.py
2026-06-08 10:40:59 +08:00

363 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ===========================================
# 班级操行分管理系统 - 管理员服务
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from typing import Dict, Any, List, Optional
from utils.database import execute_query, execute_one, execute_update
from models.user import UserModel
from models.student import StudentModel
from models.admin_role import AdminRoleModel
from utils.security import security
from utils.logger import get_logger
logger = get_logger(__name__)
class AdminService:
"""管理员服务"""
@staticmethod
async def get_students(
page: int = 1,
page_size: int = 20,
search: str = None,
dormitory_number: str = None
) -> Dict[str, Any]:
"""获取所有学生列表"""
offset = (page - 1) * page_size
try:
sql = """
SELECT student_id, student_no, name, total_points, parent_phone, dormitory_number, status
FROM students
WHERE status = 1
"""
params = []
if search:
sql += " AND (student_no LIKE %s OR name LIKE %s)"
params.extend([f"%{search}%", f"%{search}%"])
if dormitory_number:
sql += " AND dormitory_number = %s"
params.append(dormitory_number)
sql += " ORDER BY student_no LIMIT %s OFFSET %s"
params.extend([page_size, offset])
students = await execute_query(sql, tuple(params))
has_dormitory = True
except Exception as e:
logger.warning(f"dormitory_number 列不存在,使用不含该字段的查询: {e}")
sql = """
SELECT student_id, student_no, name, total_points, parent_phone, status
FROM students
WHERE status = 1
"""
params = []
if search:
sql += " AND (student_no LIKE %s OR name LIKE %s)"
params.extend([f"%{search}%", f"%{search}%"])
sql += " ORDER BY student_no LIMIT %s OFFSET %s"
params.extend([page_size, offset])
students = await execute_query(sql, tuple(params))
has_dormitory = False
# 获取总数
count_sql = "SELECT COUNT(*) as total FROM students WHERE status = 1"
count_params = []
if search:
count_sql += " AND (student_no LIKE %s OR name LIKE %s)"
count_params.extend([f"%{search}%", f"%{search}%"])
if dormitory_number and has_dormitory:
count_sql += " AND dormitory_number = %s"
count_params.append(dormitory_number)
if count_params:
total_result = await execute_one(count_sql, tuple(count_params))
else:
total_result = await execute_one(count_sql)
total = total_result["total"] if total_result else 0
return {
"students": students,
"total": total,
"page": page,
"page_size": page_size,
"total_pages": (total + page_size - 1) // page_size
}
@staticmethod
async def import_students(
students: List[Dict],
operator_id: int,
initial_points: int = 60
) -> Dict[str, Any]:
"""批量导入学生(优化版:预查重 + 批量操作)"""
results = []
success_count = 0
# 预查重:一次性获取所有已存在的学号和手机号
existing_students = await StudentModel.get_all()
existing_student_nos = {s["student_no"] for s in existing_students}
all_users = await execute_query("SELECT username FROM users WHERE status = 1")
existing_usernames = {u["username"] for u in all_users}
for student in students:
try:
student_no = student.get("student_no", "").strip()
name = student.get("name", "").strip()
parent_phone = student.get("parent_phone", "").strip()
dormitory_number = student.get("dormitory_number", "").strip() if student.get("dormitory_number") else None
password = student.get("password", "").strip()
if not student_no or not name:
results.append({"student_no": student_no, "success": False, "error": "学号或姓名不能为空"})
continue
if not security.validate_student_no(student_no):
results.append({"student_no": student_no, "success": False, "error": "学号格式错误"})
continue
if parent_phone and not security.validate_phone(parent_phone):
results.append({"student_no": student_no, "success": False, "error": "手机号格式错误"})
continue
if student_no in existing_student_nos:
results.append({"student_no": student_no, "success": False, "error": "学号已存在"})
continue
init_password = password if password else "123456"
# 创建学生记录
student_id = await StudentModel.create(
student_no=student_no,
name=name,
parent_phone=parent_phone if parent_phone else None,
dormitory_number=dormitory_number,
initial_points=initial_points
)
existing_student_nos.add(student_no)
# 创建学生登录账号
await UserModel.create_student(
username=student_no,
password=init_password,
real_name=name,
student_id=student_id
)
existing_usernames.add(student_no)
# 创建家长账号(如果手机号存在且未被注册)
if parent_phone and parent_phone not in existing_usernames:
await UserModel.create_parent(
username=parent_phone,
password=init_password,
real_name=f"{name}家长",
student_id=student_id
)
existing_usernames.add(parent_phone)
results.append({"student_no": student_no, "success": True, "student_id": student_id})
success_count += 1
logger.info(f"用户[{operator_id}] 导入学生: {student_no} - {name}")
except Exception as e:
logger.error(f"导入学生失败: {student.get('student_no', '?')} - {str(e)}")
results.append({
"student_no": student.get("student_no", ""),
"success": False,
"error": f"导入异常: {str(e)}"
})
return {
"success": True,
"total": len(students),
"success_count": success_count,
"failed_count": len(students) - success_count,
"results": results
}
@staticmethod
async def add_student(
student_no: str,
name: str,
parent_phone: Optional[str],
operator_id: int,
initial_points: int = 60,
dormitory_number: Optional[str] = None
) -> Dict[str, Any]:
"""新增学生"""
if not security.validate_student_no(student_no):
return {"success": False, "message": "学号格式错误"}
if parent_phone and not security.validate_phone(parent_phone):
return {"success": False, "message": "手机号格式错误"}
existing = await StudentModel.get_by_student_no(student_no)
if existing:
return {"success": False, "message": "学号已存在"}
student_id = await StudentModel.create(
student_no=student_no,
name=name,
parent_phone=parent_phone if parent_phone else None,
dormitory_number=dormitory_number,
initial_points=initial_points
)
await UserModel.create_student(
username=student_no,
password="123456",
real_name=name,
student_id=student_id
)
if parent_phone:
parent_exists = await UserModel.get_by_username(parent_phone)
if not parent_exists:
await UserModel.create_parent(
username=parent_phone,
password="123456",
real_name=f"{name}家长",
student_id=student_id
)
logger.info(f"用户[{operator_id}] 新增学生: {student_no} - {name}")
return {"success": True, "student_id": student_id}
@staticmethod
async def add_admin(
username: str,
real_name: str,
password: Optional[str],
role_type: str,
operator_id: int
) -> Dict[str, Any]:
"""添加管理员(支持重新激活已删除的管理员)"""
# 检查用户名是否存在(含已禁用用户,因 username 有 UNIQUE 约束)
existing = await UserModel.get_by_username_any(username)
if existing:
if existing.get('status') == 1:
return {"success": False, "message": "用户名已存在"}
# 用户已被软删除status=0重新激活
await UserModel.update_status(existing['user_id'], 1)
await UserModel.update_real_name(existing['user_id'], real_name)
user_id = existing['user_id']
# 检查是否已有管理员角色
existing_role = await AdminRoleModel.get_by_user_id(user_id)
if existing_role:
await AdminRoleModel.update_role(user_id, role_type)
else:
await AdminRoleModel.create(user_id=user_id, role_type=role_type, subject_id=None)
logger.info(f"用户[{operator_id}] 重新激活管理员: {username} ({role_type})")
return {
"success": True,
"user_id": user_id,
"username": username,
"password": None, # 重新激活不返回密码
"role_type": role_type
}
if not password:
password = security.generate_random_password()
user_id = await UserModel.create_admin(
username=username,
password=password,
real_name=real_name
)
await AdminRoleModel.create(
user_id=user_id,
role_type=role_type,
subject_id=None
)
logger.info(f"用户[{operator_id}] 添加管理员: {username} ({role_type})")
return {
"success": True,
"user_id": user_id,
"username": username,
"password": password,
"role_type": role_type
}
@staticmethod
async def get_admins() -> Dict[str, Any]:
"""获取管理员列表"""
admins = await AdminRoleModel.get_all()
return {"admins": admins}
@staticmethod
async def update_student(student_id: int, name: str = None, parent_phone: str = None, dormitory_number: str = None) -> Dict[str, Any]:
"""编辑学生信息"""
try:
student = await StudentModel.get_by_id(student_id)
if not student:
return {"success": False, "message": "学生不存在"}
result = await StudentModel.update(student_id, name=name, parent_phone=parent_phone, dormitory_number=dormitory_number)
if result:
return {"success": True, "message": "学生信息更新成功"}
return {"success": False, "message": "更新失败"}
except Exception as e:
logger.error(f"更新学生信息失败: {e}")
return {"success": False, "message": f"更新失败: {str(e)}"}
@staticmethod
async def delete_student(student_id: int) -> Dict[str, Any]:
"""删除学生(软删除)"""
try:
student = await StudentModel.get_by_id(student_id)
if not student:
return {"success": False, "message": "学生不存在"}
result = await StudentModel.delete(student_id)
if result:
user = await execute_one(
"SELECT user_id FROM users WHERE student_id = %s AND user_type = 'student'",
(student_id,)
)
if user:
await UserModel.update_status(user['user_id'], 0)
return {"success": True, "message": "学生删除成功"}
return {"success": False, "message": "删除失败"}
except Exception as e:
logger.error(f"删除学生失败: {e}")
return {"success": False, "message": f"删除失败: {str(e)}"}
@staticmethod
async def reset_student_password(student_id: int, new_password: str) -> Dict[str, Any]:
"""重置学生密码"""
try:
user = await execute_one(
"SELECT user_id FROM users WHERE student_id = %s AND user_type = 'student'",
(student_id,)
)
if not user:
return {"success": False, "message": "未找到对应的用户账号"}
# UserModel.update_password 内部会进行哈希,无需预先哈希
result = await UserModel.update_password(user['user_id'], new_password)
if result:
await execute_update(
"UPDATE users SET need_change_password = 1 WHERE user_id = %s",
(user['user_id'],)
)
return {"success": True, "message": "密码重置成功"}
return {"success": False, "message": "密码重置失败"}
except Exception as e:
logger.error(f"重置学生密码失败: {e}")
return {"success": False, "message": f"重置失败: {str(e)}"}