Files
ClassManager/backend/services/admin_service.py
2026-04-21 16:40:35 +08:00

248 lines
8.8 KiB
Python

# ===========================================
# 班级操行分管理系统 - 管理员服务
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from typing import Dict, Any, List, Optional
from utils.database import execute_query, execute_one
from models.user import UserModel
from models.student import StudentModel
from models.admin_role import AdminRoleModel
from utils.security import security
from utils.logger import get_logger
logger = get_logger(__name__)
class AdminService:
"""管理员服务"""
@staticmethod
async def get_students(
    page: int = 1,
    page_size: int = 20,
    search: Optional[str] = None
) -> Dict[str, Any]:
    """Return one page of active students, optionally filtered by keyword.

    Only rows with ``status = 1`` are considered. When ``search`` is
    truthy it is matched as a substring (SQL ``LIKE``) against both the
    student number and the name.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; values below 1 are treated as 1.
        search: Optional keyword. Falsy values disable filtering.

    Returns:
        A dict with the keys ``students`` (rows of the requested page),
        ``total`` (number of matching rows), ``page``, ``page_size`` and
        ``total_pages``.
    """
    # Clamp paging input: page < 1 would produce a negative OFFSET and
    # page_size == 0 would divide by zero when computing total_pages.
    page = max(1, page)
    page_size = max(1, page_size)
    offset = (page - 1) * page_size

    # Build the optional filter once so the list query and the count
    # query can never drift apart.
    # NOTE(review): '%' and '_' inside ``search`` act as LIKE wildcards
    # because the keyword is embedded in the pattern unescaped.
    filter_sql = ""
    filter_params: List[Any] = []
    if search:
        pattern = f"%{search}%"
        filter_sql = " AND (student_no LIKE %s OR name LIKE %s)"
        filter_params = [pattern, pattern]

    list_sql = (
        "SELECT student_id, student_no, name, total_points, parent_phone, status "
        "FROM students "
        "WHERE status = 1"
        + filter_sql
        + " ORDER BY student_no LIMIT %s OFFSET %s"
    )
    students = await execute_query(
        list_sql, tuple(filter_params + [page_size, offset])
    )

    # Total number of matching rows (ignores paging).
    count_sql = "SELECT COUNT(*) as total FROM students WHERE status = 1" + filter_sql
    if filter_params:
        total_row = await execute_one(count_sql, tuple(filter_params))
    else:
        total_row = await execute_one(count_sql)
    total = total_row["total"] if total_row else 0

    return {
        "students": students,
        "total": total,
        "page": page,
        "page_size": page_size,
        # Ceiling division without floats.
        "total_pages": (total + page_size - 1) // page_size
    }
@staticmethod
async def import_students(
    students: List[Dict],
    operator_id: int,
    initial_points: int = 60
) -> Dict[str, Any]:
    """Bulk-import students and create their login accounts.

    Existing student numbers and active usernames are loaded once up
    front and kept up to date in memory, so duplicates inside the same
    batch are rejected as well. Every entry is processed independently:
    a failure is recorded in ``results`` and the loop moves on.

    For each accepted entry this creates the student record, a student
    account whose username is the student number and, when a parent phone
    is given that is not already a known username, a parent account whose
    username is that phone number. The initial password is the entry's
    ``password`` field or ``"123456"`` when it is empty.

    Args:
        students: Entries with the keys ``student_no``, ``name`` and
            optionally ``parent_phone`` and ``password``.
        operator_id: ID of the user performing the import (logging only).
        initial_points: Starting points passed to ``StudentModel.create``.

    Returns:
        A dict with ``success`` (always ``True``), ``total``,
        ``success_count``, ``failed_count`` and ``results`` (one dict per
        processed entry with ``student_no``, ``success`` and either
        ``student_id`` or ``error``).
    """
    def _clean(value: Any) -> str:
        """Normalize a raw field: None becomes '', anything else a stripped str."""
        return "" if value is None else str(value).strip()

    results: List[Dict[str, Any]] = []
    success_count = 0

    # Pre-load existing student numbers and active usernames in one go
    # instead of querying once per imported entry.
    existing_students = await StudentModel.get_all()
    existing_student_nos = {s["student_no"] for s in existing_students}
    all_users = await execute_query("SELECT username FROM users WHERE status = 1")
    existing_usernames = {u["username"] for u in all_users}

    for student in students:
        # Normalize the fields before entering the try block. Spreadsheet
        # or JSON sources may deliver None or numbers instead of strings,
        # and the error handler must be able to name the entry without
        # touching the raw object again (a malformed entry must not abort
        # the whole batch).
        row = student if hasattr(student, "get") else {}
        student_no = _clean(row.get("student_no"))
        name = _clean(row.get("name"))
        parent_phone = _clean(row.get("parent_phone"))
        password = _clean(row.get("password"))

        try:
            if not student_no or not name:
                results.append({"student_no": student_no, "success": False, "error": "学号或姓名不能为空"})
                continue
            if not security.validate_student_no(student_no):
                results.append({"student_no": student_no, "success": False, "error": "学号格式错误"})
                continue
            if parent_phone and not security.validate_phone(parent_phone):
                results.append({"student_no": student_no, "success": False, "error": "手机号格式错误"})
                continue
            if student_no in existing_student_nos:
                results.append({"student_no": student_no, "success": False, "error": "学号已存在"})
                continue

            init_password = password if password else "123456"

            # NOTE(review): the record and the accounts below are created
            # by separate calls with no rollback in this method; if a
            # later call raises, rows created earlier stay in place.
            # Confirm whether the model layer wraps these in a transaction.

            # Create the student record.
            student_id = await StudentModel.create(
                student_no=student_no,
                name=name,
                parent_phone=parent_phone if parent_phone else None,
                initial_points=initial_points
            )
            existing_student_nos.add(student_no)

            # Create the student's login account.
            await UserModel.create_student(
                username=student_no,
                password=init_password,
                real_name=name,
                student_id=student_id
            )
            existing_usernames.add(student_no)

            # Create the parent account only when a phone number is given
            # and it is not already registered as a username.
            if parent_phone and parent_phone not in existing_usernames:
                await UserModel.create_parent(
                    username=parent_phone,
                    password=init_password,
                    real_name=f"{name}家长",
                    student_id=student_id
                )
                existing_usernames.add(parent_phone)

            results.append({"student_no": student_no, "success": True, "student_id": student_id})
            success_count += 1
            logger.info(f"用户[{operator_id}] 导入学生: {student_no} - {name}")
        except Exception as e:
            # Best-effort import: record the failure and keep going.
            logger.error(f"导入学生失败: {student_no or '?'} - {str(e)}")
            results.append({
                "student_no": student_no,
                "success": False,
                "error": f"导入异常: {str(e)}"
            })

    return {
        "success": True,
        "total": len(students),
        "success_count": success_count,
        "failed_count": len(students) - success_count,
        "results": results
    }
@staticmethod
async def add_student(
    student_no: str,
    name: str,
    parent_phone: Optional[str],
    operator_id: int,
    initial_points: int = 60
) -> Dict[str, Any]:
    """Create a single student together with its login accounts.

    Input is normalized the same way as in the bulk import: surrounding
    whitespace is stripped, an empty student number or name is rejected,
    and a blank parent phone is treated as "not provided".

    On success this creates the student record, a student account whose
    username is the student number and, when a parent phone is given and
    no user with that username exists, a parent account. Both accounts
    receive the initial password ``"123456"``.

    Args:
        student_no: Student number; also used as the login username.
        name: Student's real name.
        parent_phone: Optional parent phone; used as the parent username.
        operator_id: ID of the user performing the action (logging only).
        initial_points: Starting points passed to ``StudentModel.create``.

    Returns:
        ``{"success": True, "student_id": ...}`` on success, otherwise
        ``{"success": False, "message": ...}`` describing the rejection.
    """
    default_password = "123456"

    # Normalize input so this path accepts and rejects the same values as
    # the bulk import does.
    student_no = (student_no or "").strip()
    name = (name or "").strip()
    parent_phone = (parent_phone or "").strip() or None

    if not student_no or not name:
        return {"success": False, "message": "学号或姓名不能为空"}
    if not security.validate_student_no(student_no):
        return {"success": False, "message": "学号格式错误"}
    if parent_phone and not security.validate_phone(parent_phone):
        return {"success": False, "message": "手机号格式错误"}

    existing = await StudentModel.get_by_student_no(student_no)
    if existing:
        return {"success": False, "message": "学号已存在"}

    # NOTE(review): the calls below are not rolled back by this method if
    # a later one raises; confirm whether the model layer uses a transaction.
    student_id = await StudentModel.create(
        student_no=student_no,
        name=name,
        parent_phone=parent_phone,
        initial_points=initial_points
    )
    await UserModel.create_student(
        username=student_no,
        password=default_password,
        real_name=name,
        student_id=student_id
    )

    if parent_phone:
        # Reuse an existing account for this phone number instead of
        # creating a duplicate username.
        parent_exists = await UserModel.get_by_username(parent_phone)
        if not parent_exists:
            await UserModel.create_parent(
                username=parent_phone,
                password=default_password,
                real_name=f"{name}家长",
                student_id=student_id
            )

    logger.info(f"用户[{operator_id}] 新增学生: {student_no} - {name}")
    return {"success": True, "student_id": student_id}
@staticmethod
async def add_admin(
    username: str,
    real_name: str,
    password: Optional[str],
    role_type: str,
    operator_id: int
) -> Dict[str, Any]:
    """Create an administrator account and attach a role to it.

    The request is rejected when a user with ``username`` already exists.
    When ``password`` is falsy, one is obtained from
    ``security.generate_random_password()``. The role is created with
    ``subject_id=None``.

    Args:
        username: Login name of the new administrator.
        real_name: Display name of the new administrator.
        password: Initial password, or a falsy value to generate one.
        role_type: Role identifier passed to ``AdminRoleModel.create``.
        operator_id: ID of the user performing the action (logging only).

    Returns:
        ``{"success": False, "message": ...}`` when the username is taken;
        otherwise a dict with ``success``, ``user_id``, ``username``,
        ``password`` (the password actually used, so a generated one can
        be handed to the caller) and ``role_type``.
    """
    if await UserModel.get_by_username(username):
        return {"success": False, "message": "用户名已存在"}

    # Fall back to a generated password when the caller supplied none.
    effective_password = password or security.generate_random_password()

    new_user_id = await UserModel.create_admin(
        username=username,
        password=effective_password,
        real_name=real_name,
    )
    await AdminRoleModel.create(
        user_id=new_user_id,
        role_type=role_type,
        subject_id=None,
    )

    logger.info(f"用户[{operator_id}] 添加管理员: {username} ({role_type})")

    return dict(
        success=True,
        user_id=new_user_id,
        username=username,
        password=effective_password,
        role_type=role_type,
    )
@staticmethod
async def get_admins() -> Dict[str, Any]:
    """Return the result of ``AdminRoleModel.get_all()`` under the ``admins`` key."""
    return {"admins": await AdminRoleModel.get_all()}