# ===========================================
# 班级操行分管理系统 - 管理员服务
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================

from typing import Dict, Any, List, Optional

from models.admin_role import AdminRoleModel
from models.student import StudentModel
from models.user import UserModel
from utils.database import execute_query, execute_one, execute_update
from utils.logger import get_logger
from utils.security import security

# Module-level logger shared by every AdminService method.
logger = get_logger(__name__)


class AdminService:
    """Administrator-facing service: student roster maintenance and admin accounts.

    Every public method is a static coroutine that returns a plain ``dict``.
    Failures that callers are expected to show to the user are reported as
    ``{"success": False, "message": ...}`` rather than raised.
    """

    # Password assigned to new student/parent accounts when none is supplied.
    # NOTE(review): a shared, well-known default is weak; confirm that the
    # account-creation path forces a password change on first login.
    DEFAULT_INITIAL_PASSWORD = "123456"

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _clean_text(value: Any) -> str:
        """Return ``value`` as a stripped string; ``None`` becomes ``""``.

        Imported rows may carry ``None`` or numeric cells (e.g. from a
        spreadsheet), on which a bare ``.strip()`` would raise.
        """
        if value is None:
            return ""
        return str(value).strip()

    @staticmethod
    def _build_student_filters(search: Optional[str], dormitory_number: Optional[str]) -> tuple:
        """Build the optional part of the student WHERE clause.

        Returns:
            ``(clause, params)`` where ``clause`` is either empty or starts
            with ``" AND"`` and ``params`` is the matching placeholder list.
        """
        clause = ""
        params: List[Any] = []

        if search:
            clause += " AND (student_no LIKE %s OR name LIKE %s)"
            pattern = f"%{search}%"
            params.extend([pattern, pattern])

        if dormitory_number:
            clause += " AND dormitory_number = %s"
            params.append(dormitory_number)

        return clause, params

    @staticmethod
    async def _get_student_user(student_id: int) -> Optional[Dict[str, Any]]:
        """Fetch the ``users`` row (``user_id`` only) of a student's login account."""
        return await execute_one(
            "SELECT user_id FROM users WHERE student_id = %s AND user_type = 'student'",
            (student_id,)
        )

    # ------------------------------------------------------------------
    # Students
    # ------------------------------------------------------------------

    @staticmethod
    async def get_students(
        page: int = 1,
        page_size: int = 20,
        search: Optional[str] = None,
        dormitory_number: Optional[str] = None
    ) -> Dict[str, Any]:
        """Return one page of active (``status = 1``) students.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; values below 1 are treated as 1.
            search: Optional substring matched against student number or name.
            dormitory_number: Optional exact dormitory filter.

        Returns:
            Dict with ``students``, ``total``, ``page``, ``page_size`` and
            ``total_pages``.
        """
        # Clamp so that OFFSET is never negative and the page count below
        # never divides by zero.
        page = max(1, page)
        page_size = max(1, page_size)
        offset = (page - 1) * page_size

        where_extra, filter_params = AdminService._build_student_filters(search, dormitory_number)

        list_sql = (
            "SELECT student_id, student_no, name, total_points, parent_phone, "
            "dormitory_number, status "
            "FROM students "
            "WHERE status = 1"
            + where_extra
            + " ORDER BY student_no LIMIT %s OFFSET %s"
        )
        students = await execute_query(list_sql, tuple(filter_params) + (page_size, offset))

        # Total row count for the same filters (without paging).
        count_sql = "SELECT COUNT(*) as total FROM students WHERE status = 1" + where_extra
        if filter_params:
            total_result = await execute_one(count_sql, tuple(filter_params))
        else:
            total_result = await execute_one(count_sql)
        total = total_result["total"] if total_result else 0

        return {
            "students": students,
            "total": total,
            "page": page,
            "page_size": page_size,
            "total_pages": (total + page_size - 1) // page_size
        }

    @staticmethod
    async def import_students(
        students: List[Dict],
        operator_id: int,
        initial_points: int = 60
    ) -> Dict[str, Any]:
        """Import a batch of students, creating login accounts as it goes.

        Existing student numbers and usernames are loaded once up front so
        each row is checked in memory. Rows are processed independently: a
        failing row is recorded in ``results`` and the batch continues.

        Args:
            students: Row dicts with ``student_no``, ``name`` and optionally
                ``parent_phone``, ``dormitory_number``, ``password``.
            operator_id: ID of the acting user, used for the audit log line.
            initial_points: Starting points passed to ``StudentModel.create``.

        Returns:
            Dict with ``success``, ``total``, ``success_count``,
            ``failed_count`` and the per-row ``results`` list.
        """
        if not students:
            # Nothing to import: skip the two pre-load queries entirely.
            return {
                "success": True,
                "total": 0,
                "success_count": 0,
                "failed_count": 0,
                "results": []
            }

        clean = AdminService._clean_text
        results = []
        success_count = 0

        # Pre-load known student numbers and usernames for duplicate checks.
        existing_students = await StudentModel.get_all()
        existing_student_nos = {s["student_no"] for s in existing_students}

        # NOTE(review): only active users are loaded; if usernames are unique
        # across disabled accounts too, creating a parent account could still
        # collide with a disabled one — confirm against the schema.
        all_users = await execute_query("SELECT username FROM users WHERE status = 1")
        existing_usernames = {u["username"] for u in all_users}

        for student in students:
            student_no = ""
            try:
                student_no = clean(student.get("student_no"))
                name = clean(student.get("name"))
                parent_phone = clean(student.get("parent_phone"))
                dormitory_number = clean(student.get("dormitory_number")) or None
                password = clean(student.get("password"))

                # Validation, first failing rule wins.
                error = None
                if not student_no or not name:
                    error = "学号或姓名不能为空"
                elif not security.validate_student_no(student_no):
                    error = "学号格式错误"
                elif parent_phone and not security.validate_phone(parent_phone):
                    error = "手机号格式错误"
                elif student_no in existing_student_nos:
                    error = "学号已存在"

                if error:
                    results.append({"student_no": student_no, "success": False, "error": error})
                    continue

                init_password = password or AdminService.DEFAULT_INITIAL_PASSWORD

                # NOTE(review): the three creates below are not wrapped in a
                # transaction here; a failure part-way leaves earlier records
                # in place.
                student_id = await StudentModel.create(
                    student_no=student_no,
                    name=name,
                    parent_phone=parent_phone if parent_phone else None,
                    dormitory_number=dormitory_number,
                    initial_points=initial_points
                )
                # Remember it so a duplicate later in the same batch is rejected.
                existing_student_nos.add(student_no)

                # Student login account (username = student number).
                await UserModel.create_student(
                    username=student_no,
                    password=init_password,
                    real_name=name,
                    student_id=student_id
                )
                existing_usernames.add(student_no)

                # Parent account, only when a phone is given and not yet taken.
                if parent_phone and parent_phone not in existing_usernames:
                    await UserModel.create_parent(
                        username=parent_phone,
                        password=init_password,
                        real_name=f"{name}家长",
                        student_id=student_id
                    )
                    existing_usernames.add(parent_phone)

                results.append({"student_no": student_no, "success": True, "student_id": student_id})
                success_count += 1
                logger.info(f"用户[{operator_id}] 导入学生: {student_no} - {name}")

            except Exception as e:
                # Deliberate per-row boundary: record the failure and keep
                # importing the remaining rows.
                logger.error(f"导入学生失败: {student_no or '?'} - {str(e)}")
                results.append({
                    "student_no": student_no,
                    "success": False,
                    "error": f"导入异常: {str(e)}"
                })

        return {
            "success": True,
            "total": len(students),
            "success_count": success_count,
            "failed_count": len(students) - success_count,
            "results": results
        }

    @staticmethod
    async def add_student(
        student_no: str,
        name: str,
        parent_phone: Optional[str],
        operator_id: int,
        initial_points: int = 60,
        dormitory_number: Optional[str] = None
    ) -> Dict[str, Any]:
        """Create a single student plus login account(s).

        Args:
            student_no: Student number; also used as the login username.
            name: Student's real name; must not be blank.
            parent_phone: Optional phone; when given and unused as a username,
                a parent account is created with it.
            operator_id: ID of the acting user, used for the audit log line.
            initial_points: Starting points passed to ``StudentModel.create``.
            dormitory_number: Optional dormitory identifier.

        Returns:
            ``{"success": True, "student_id": ...}`` on success, otherwise
            ``{"success": False, "message": ...}``.
        """
        if not security.validate_student_no(student_no):
            return {"success": False, "message": "学号格式错误"}

        # Same rule the batch import enforces: a student needs a name.
        if not name or not name.strip():
            return {"success": False, "message": "姓名不能为空"}

        if parent_phone and not security.validate_phone(parent_phone):
            return {"success": False, "message": "手机号格式错误"}

        if await StudentModel.get_by_student_no(student_no):
            return {"success": False, "message": "学号已存在"}

        student_id = await StudentModel.create(
            student_no=student_no,
            name=name,
            parent_phone=parent_phone if parent_phone else None,
            dormitory_number=dormitory_number,
            initial_points=initial_points
        )

        await UserModel.create_student(
            username=student_no,
            password=AdminService.DEFAULT_INITIAL_PASSWORD,
            real_name=name,
            student_id=student_id
        )

        # Create the parent account only if that phone is not already a username.
        if parent_phone and not await UserModel.get_by_username(parent_phone):
            await UserModel.create_parent(
                username=parent_phone,
                password=AdminService.DEFAULT_INITIAL_PASSWORD,
                real_name=f"{name}家长",
                student_id=student_id
            )

        logger.info(f"用户[{operator_id}] 新增学生: {student_no} - {name}")

        return {"success": True, "student_id": student_id}

    # ------------------------------------------------------------------
    # Administrators
    # ------------------------------------------------------------------

    @staticmethod
    async def add_admin(
        username: str,
        real_name: str,
        password: Optional[str],
        role_type: str,
        operator_id: int
    ) -> Dict[str, Any]:
        """Create an administrator account and attach a role to it.

        When ``password`` is empty, one is generated with
        ``security.generate_random_password()``.

        Returns:
            On success a dict with ``user_id``, ``username``, ``password`` and
            ``role_type``; ``{"success": False, "message": ...}`` when the
            username is already taken.
        """
        if await UserModel.get_by_username(username):
            return {"success": False, "message": "用户名已存在"}

        if not password:
            password = security.generate_random_password()

        user_id = await UserModel.create_admin(
            username=username,
            password=password,
            real_name=real_name
        )

        await AdminRoleModel.create(
            user_id=user_id,
            role_type=role_type,
            subject_id=None
        )

        logger.info(f"用户[{operator_id}] 添加管理员: {username} ({role_type})")

        # NOTE(review): the plaintext password is returned so a generated one
        # can be shown once to the operator; callers should not log or store it.
        return {
            "success": True,
            "user_id": user_id,
            "username": username,
            "password": password,
            "role_type": role_type
        }

    @staticmethod
    async def get_admins() -> Dict[str, Any]:
        """Return every record from ``AdminRoleModel.get_all()`` under ``admins``."""
        admins = await AdminRoleModel.get_all()
        return {"admins": admins}

    # ------------------------------------------------------------------
    # Student maintenance
    # ------------------------------------------------------------------

    @staticmethod
    async def update_student(
        student_id: int,
        name: Optional[str] = None,
        parent_phone: Optional[str] = None,
        dormitory_number: Optional[str] = None
    ) -> Dict[str, Any]:
        """Update a student's name, parent phone and/or dormitory.

        Returns:
            ``{"success": bool, "message": str}``; exceptions from the model
            layer are logged and reported as a failure message.
        """
        # Apply the same phone rule as student creation.
        if parent_phone and not security.validate_phone(parent_phone):
            return {"success": False, "message": "手机号格式错误"}

        try:
            if not await StudentModel.get_by_id(student_id):
                return {"success": False, "message": "学生不存在"}

            updated = await StudentModel.update(
                student_id,
                name=name,
                parent_phone=parent_phone,
                dormitory_number=dormitory_number
            )
            if updated:
                return {"success": True, "message": "学生信息更新成功"}
            return {"success": False, "message": "更新失败"}
        except Exception as e:
            logger.error(f"更新学生信息失败: {e}")
            return {"success": False, "message": f"更新失败: {str(e)}"}

    @staticmethod
    async def delete_student(student_id: int) -> Dict[str, Any]:
        """Delete a student and disable the matching student login.

        The original author describes this as a soft delete; the actual
        semantics live in ``StudentModel.delete``.

        Returns:
            ``{"success": bool, "message": str}``.
        """
        try:
            if not await StudentModel.get_by_id(student_id):
                return {"success": False, "message": "学生不存在"}

            if not await StudentModel.delete(student_id):
                return {"success": False, "message": "删除失败"}

            # Disable the student's login account (status 0).
            # NOTE(review): a linked parent account is left untouched here.
            user = await AdminService._get_student_user(student_id)
            if user:
                await UserModel.update_status(user['user_id'], 0)
            return {"success": True, "message": "学生删除成功"}
        except Exception as e:
            logger.error(f"删除学生失败: {e}")
            return {"success": False, "message": f"删除失败: {str(e)}"}

    @staticmethod
    async def reset_student_password(student_id: int, new_password: str) -> Dict[str, Any]:
        """Reset a student's login password and flag it for a forced change.

        Returns:
            ``{"success": bool, "message": str}``.
        """
        try:
            user = await AdminService._get_student_user(student_id)
            if not user:
                return {"success": False, "message": "未找到对应的用户账号"}

            # Per the original author's note, UserModel.update_password hashes
            # internally, so the plain password is passed through unhashed.
            if not await UserModel.update_password(user['user_id'], new_password):
                return {"success": False, "message": "密码重置失败"}

            await execute_update(
                "UPDATE users SET need_change_password = 1 WHERE user_id = %s",
                (user['user_id'],)
            )
            return {"success": True, "message": "密码重置成功"}
        except Exception as e:
            logger.error(f"重置学生密码失败: {e}")
            return {"success": False, "message": f"重置失败: {str(e)}"}