Files
SharedClassManager/backend/services/admin_service.py
2026-04-07 17:07:13 +08:00

298 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ===========================================
# 班级操行分管理系统 - 管理员服务
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from typing import Dict, Any, List, Optional
from datetime import datetime
from models.user import UserModel
from models.student import StudentModel
from models.admin_role import AdminRoleModel
from utils.security import security
from utils.logger import get_logger
logger = get_logger(__name__)
class AdminService:
    """Administrator service for the single-class edition.

    Groups the admin-facing operations: listing students, importing or
    adding students (together with their login accounts), and managing
    administrator accounts. Every method is a static coroutine; the class
    only serves as a namespace.
    """

    # The single-class edition stores everything under one fixed class id.
    DEFAULT_CLASS_ID = 1
    # Initial password for student/parent accounts when none is supplied.
    DEFAULT_PASSWORD = "123456"

    # ------------------------------------------------------------------
    # Pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _clean_field(value: Any) -> str:
        """Return *value* as a stripped string; ``None`` becomes ``""``.

        Imported rows may carry ``None`` or numeric cells (for example a
        numeric student number), on which a bare ``.strip()`` would raise.
        """
        if value is None:
            return ""
        return str(value).strip()

    @staticmethod
    def _search_filter(search: Optional[str]) -> tuple:
        """Return ``(sql_fragment, params)`` for the optional keyword filter.

        The fragment is appended to a statement that already has a WHERE
        clause. Both the list query and the count query use this helper so
        the two filters cannot drift apart.
        """
        if not search:
            return "", ()
        pattern = f"%{search}%"
        return " AND (student_no LIKE %s OR name LIKE %s)", (pattern, pattern)

    @staticmethod
    def _page_count(total: int, page_size: int) -> int:
        """Return the number of pages needed for *total* rows (ceil division)."""
        return (total + page_size - 1) // page_size

    # ------------------------------------------------------------------
    # Shared student-creation helpers
    # ------------------------------------------------------------------
    @staticmethod
    async def _validate_new_student(
        student_no: str,
        name: str,
        parent_phone: str
    ) -> Optional[str]:
        """Return an error message for an invalid new student, else ``None``.

        Checks, in order: required fields, student number format, parent
        phone format (only when a phone is given), and student number
        uniqueness.
        """
        if not student_no or not name:
            return "学号或姓名不能为空"
        if not security.validate_student_no(student_no):
            return "学号格式错误4-20位字母数字组合"
        if parent_phone and not security.validate_phone(parent_phone):
            return "手机号格式错误"
        if await StudentModel.get_by_student_no(student_no):
            return "学号已存在"
        return None

    @staticmethod
    async def _create_student_accounts(
        student_no: str,
        name: str,
        parent_phone: str,
        password: str,
        initial_points: int
    ) -> tuple:
        """Create the student row, the student login and the parent login.

        Returns ``(student_id, warning)``. ``warning`` is ``None`` unless a
        parent phone was given but that username is already taken, in which
        case no parent account is created and the warning text says so.
        """
        student_id = await StudentModel.create(
            student_no=student_no,
            name=name,
            class_id=AdminService.DEFAULT_CLASS_ID,
            parent_phone=parent_phone if parent_phone else None,
            initial_points=initial_points
        )
        # The student logs in with the student number as username.
        await UserModel.create_student(
            username=student_no,
            password=password,
            real_name=name,
            student_id=student_id
        )
        warning = None
        if parent_phone:
            # The parent logs in with the phone number as username.
            if await UserModel.get_by_username(parent_phone):
                warning = f"家长手机号 {parent_phone} 已被其他账号使用,未创建家长账号"
            else:
                await UserModel.create_parent(
                    username=parent_phone,
                    password=password,
                    real_name=f"{name}家长",
                    student_id=student_id
                )
        return student_id, warning

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    @staticmethod
    async def get_students(
        page: int = 1,
        page_size: int = 20,
        search: Optional[str] = None
    ) -> Dict[str, Any]:
        """Return one page of active students (``status = 1``).

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: rows per page; values below 1 are treated as 1.
            search: optional keyword matched with LIKE against both the
                student number and the name.

        Returns:
            A dict with ``students`` (the query rows), ``total``, ``page``,
            ``page_size`` and ``total_pages``. ``page`` and ``page_size``
            are the effective (clamped) values.
        """
        # Clamp so a bad page cannot produce a negative OFFSET and a zero
        # page size cannot cause a division by zero below.
        page = max(1, page)
        page_size = max(1, page_size)
        offset = (page - 1) * page_size

        where_sql, where_params = AdminService._search_filter(search)

        list_sql = """
            SELECT student_id, student_no, name, total_points, parent_phone, status
            FROM students
            WHERE status = 1
        """
        list_sql += where_sql
        list_sql += " ORDER BY student_no LIMIT %s OFFSET %s"

        # NOTE(review): execute_query / execute_one are not imported in the
        # visible part of this module - confirm they are in scope.
        students = await execute_query(
            list_sql, tuple(where_params) + (page_size, offset)
        )

        # Total row count for the same filter.
        count_sql = "SELECT COUNT(*) as total FROM students WHERE status = 1" + where_sql
        if where_params:
            total_result = await execute_one(count_sql, where_params)
        else:
            total_result = await execute_one(count_sql)
        total = total_result["total"] if total_result else 0

        return {
            "students": students,
            "total": total,
            "page": page,
            "page_size": page_size,
            "total_pages": AdminService._page_count(total, page_size)
        }

    @staticmethod
    async def import_students(
        students: List[Dict],
        operator_id: int,
        initial_points: int = 60
    ) -> Dict[str, Any]:
        """Bulk-import students (single-class edition).

        Each row may carry ``student_no``, ``name``, ``parent_phone`` and
        ``password``. Rows are processed one by one; an invalid row is
        reported in the results and does not stop the import.

        Args:
            students: the rows to import.
            operator_id: id of the acting user, used for the log entry.
            initial_points: starting conduct points (default 60).

        Returns:
            A dict with ``success``, ``total``, ``success_count``,
            ``failed_count`` and per-row ``results``. A successful row
            carries either ``parent_phone`` or, when the parent account
            could not be created, a ``warning``.
        """
        results: List[Dict[str, Any]] = []
        success_count = 0

        for student in students:
            # Normalise every cell first: None / numeric values are common
            # in spreadsheet-sourced rows.
            student_no = AdminService._clean_field(student.get("student_no"))
            name = AdminService._clean_field(student.get("name"))
            parent_phone = AdminService._clean_field(student.get("parent_phone"))
            password = AdminService._clean_field(student.get("password"))

            error = await AdminService._validate_new_student(
                student_no, name, parent_phone
            )

            init_password = password if password else AdminService.DEFAULT_PASSWORD
            if error is None:
                # Password strength is only checked once the row is
                # otherwise acceptable.
                is_valid, msg = security.validate_password_strength(init_password)
                if not is_valid:
                    error = f"密码不符合要求: {msg}"

            if error is not None:
                results.append({
                    "student_no": student_no,
                    "success": False,
                    "error": error
                })
                continue

            student_id, warning = await AdminService._create_student_accounts(
                student_no, name, parent_phone, init_password, initial_points
            )

            entry: Dict[str, Any] = {
                "student_no": student_no,
                "success": True,
                "student_id": student_id
            }
            if warning:
                entry["warning"] = warning
            else:
                entry["parent_phone"] = parent_phone if parent_phone else None
            results.append(entry)
            success_count += 1

            # Log every successful import, including the ones that only
            # produced a parent-account warning.
            logger.info(f"用户[{operator_id}] 导入学生: {student_no} - {name} (初始分:{initial_points})")

        return {
            "success": True,
            "total": len(students),
            "success_count": success_count,
            "failed_count": len(students) - success_count,
            "results": results
        }

    @staticmethod
    async def add_student(
        student_no: str,
        name: str,
        parent_phone: Optional[str],
        operator_id: int,
        initial_points: int = 60
    ) -> Dict[str, Any]:
        """Add a single student together with the login accounts.

        Applies the same validation as the bulk import. The student (and
        parent, when a phone is given) account is created with the default
        initial password.

        Args:
            student_no: student number, also used as the login name.
            name: student's real name.
            parent_phone: optional parent phone, used as the parent login.
            operator_id: id of the acting user, used for the log entry.
            initial_points: starting conduct points (default 60).

        Returns:
            ``{"success": False, "message": ...}`` on validation failure,
            otherwise ``{"success": True, "student_id", "student_no",
            "name"}`` plus a ``"warning"`` entry when the parent phone was
            already taken and no parent account was created.
        """
        student_no = AdminService._clean_field(student_no)
        name = AdminService._clean_field(name)
        parent_phone = AdminService._clean_field(parent_phone)

        error = await AdminService._validate_new_student(
            student_no, name, parent_phone
        )
        if error is not None:
            return {"success": False, "message": error}

        student_id, warning = await AdminService._create_student_accounts(
            student_no,
            name,
            parent_phone,
            AdminService.DEFAULT_PASSWORD,
            initial_points
        )

        logger.info(f"用户[{operator_id}] 新增学生: {student_no} - {name}")

        result: Dict[str, Any] = {
            "success": True,
            "student_id": student_id,
            "student_no": student_no,
            "name": name
        }
        if warning:
            # Surface the skipped parent account instead of dropping it
            # silently, matching the bulk import.
            result["warning"] = warning
        return result

    @staticmethod
    async def add_admin(
        username: str,
        real_name: str,
        password: Optional[str],
        role_type: str,
        operator_id: int
    ) -> Dict[str, Any]:
        """Create an administrator account and assign its role.

        Args:
            username: login name; must not already exist.
            real_name: administrator's real name.
            password: initial password; a random one is generated when
                this is empty or ``None``.
            role_type: role stored for the new administrator.
            operator_id: id of the acting user, used for the log entry.

        Returns:
            ``{"success": False, "message": ...}`` when the username is
            taken, otherwise a dict with ``success``, ``user_id``,
            ``username``, ``password`` and ``role_type``.
        """
        if await UserModel.get_by_username(username):
            return {"success": False, "message": "用户名已存在"}

        if not password:
            password = security.generate_random_password()

        user_id = await UserModel.create_admin(
            username=username,
            password=password,
            real_name=real_name
        )

        # The role is always bound to the fixed class; subject association
        # is not supported in the single-class edition.
        await AdminRoleModel.create(
            user_id=user_id,
            role_type=role_type,
            class_id=AdminService.DEFAULT_CLASS_ID,
            subject_id=None
        )

        logger.info(f"用户[{operator_id}] 添加管理员: {username} ({role_type})")

        # NOTE(review): the plain-text password is part of the result,
        # presumably so a generated password can be shown once - callers
        # must not log or persist it.
        return {
            "success": True,
            "user_id": user_id,
            "username": username,
            "password": password,
            "role_type": role_type
        }

    @staticmethod
    async def get_admins() -> Dict[str, Any]:
        """Return the administrators of the fixed class as ``{"admins": [...]}``."""
        admins = await AdminRoleModel.get_by_class(AdminService.DEFAULT_CLASS_ID)
        return {"admins": admins}