# Files
# SharedClassManager/backend/services/semester_service.py
# 2026-04-22 02:37:27 +08:00
#
# 327 lines
# 13 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ===========================================
# 班级操行分管理系统 - 学期服务
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
import datetime
from typing import Dict, Any, List, Optional
from models.semester import SemesterModel, SemesterArchiveModel
from models.student import StudentModel
from middleware.permission import PermissionChecker
from config import settings
from utils.logger import get_logger
from utils.database import execute_query
# Module-level logger, named after this module, used by the service methods
# in this file for their info and error messages.
logger = get_logger(__name__)
class SemesterService:
    """Semester management service.

    Every public method is an async static method returning a dict that
    always carries a boolean ``success`` key.  Exceptions raised inside a
    method's ``try`` block are logged and reported through a ``message``
    key instead of propagating to the caller.
    """

    # Attendance status value -> archive snapshot field holding its count.
    _ATTENDANCE_FIELDS = {
        'present': 'attendance_present',
        'absent': 'attendance_absent',
        'late': 'attendance_late',
        'leave': 'attendance_leave',
    }

    # Homework status value -> archive snapshot field holding its count.
    _HOMEWORK_FIELDS = {
        'submitted': 'homework_submitted',
        'not_submitted': 'homework_not_submitted',
        'late': 'homework_late',
    }

    @staticmethod
    def _covers_today(start_date: Optional[str], end_date: Optional[str]) -> bool:
        """Return True when today lies inside ``[start_date, end_date]``.

        Dates are parsed as ``YYYY-MM-DD`` strings.  A missing ``end_date``
        means the range is open-ended.  A missing ``start_date``, or any
        value that cannot be parsed, yields ``False``.
        """
        if start_date is None:
            return False
        try:
            today = datetime.date.today()
            begins = datetime.datetime.strptime(start_date, "%Y-%m-%d").date()
            ends = (
                datetime.datetime.strptime(end_date, "%Y-%m-%d").date()
                if end_date is not None
                else None
            )
        except (ValueError, TypeError):
            # Malformed or non-string dates never trigger auto-activation.
            return False
        return begins <= today and (ends is None or ends >= today)

    @staticmethod
    def _pivot_status_counts(
        stats: List[Dict[str, Any]],
        field_by_status: Dict[str, str]
    ) -> Dict[Any, Dict[str, int]]:
        """Pivot ``student_id``/``status``/``cnt`` rows into per-student counters.

        Each student that appears in ``stats`` gets a dict with every field
        of ``field_by_status`` initialised to 0.  Rows whose status is not
        in the mapping still create the student's entry but set no count.
        """
        pivot: Dict[Any, Dict[str, int]] = {}
        for stat in stats:
            counters = pivot.setdefault(
                stat['student_id'],
                dict.fromkeys(field_by_status.values(), 0)
            )
            field = field_by_status.get(stat['status'])
            if field:
                counters[field] = stat['cnt']
        return pivot

    @staticmethod
    def _paginate(items: List[Any], page: int, page_size: int) -> tuple:
        """Slice ``items`` for one page.

        ``page`` and ``page_size`` are clamped to a minimum of 1 so that a
        zero or negative value can neither produce a negative slice offset
        nor a division by zero.

        Returns:
            ``(page_items, page, page_size, total_pages)`` with the clamped
            ``page`` and ``page_size`` values.
        """
        page = max(1, page)
        page_size = max(1, page_size)
        offset = (page - 1) * page_size
        total_pages = (len(items) + page_size - 1) // page_size
        return items[offset:offset + page_size], page, page_size, total_pages

    @staticmethod
    async def list_semesters() -> Dict[str, Any]:
        """Return all semesters under the ``semesters`` key."""
        try:
            semesters = await SemesterModel.get_all()
            return {
                "success": True,
                "semesters": semesters
            }
        except Exception as e:
            logger.error(f"获取学期列表失败: {e}")
            return {"success": False, "message": f"获取学期列表失败: {str(e)}"}

    @staticmethod
    async def create_semester(
        semester_name: str,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        operator_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """Create a semester and activate it when its date range covers today.

        Args:
            semester_name: Display name; must contain non-whitespace text.
            start_date: Optional ``YYYY-MM-DD`` start date.
            end_date: Optional ``YYYY-MM-DD`` end date.
            operator_id: Id of the acting user, used only in log messages.

        Returns:
            On success a dict with ``success``, ``message`` and the new
            ``semester_id``; otherwise ``success=False`` and a ``message``.
        """
        if not semester_name or not semester_name.strip():
            return {"success": False, "message": "学期名称不能为空"}
        try:
            # Create first without deactivating anything: back-filled or
            # future semesters must not disturb the currently active one.
            semester_id = await SemesterModel.create(
                semester_name=semester_name.strip(),
                start_date=start_date,
                end_date=end_date
            )
            if SemesterService._covers_today(start_date, end_date):
                # The date range contains today: make it the active semester.
                await SemesterModel.deactivate_all()
                await SemesterModel.activate(semester_id)
                logger.info(
                    f"用户[{operator_id}] 创建并激活新学期: {semester_name}"
                )
            else:
                # Historical back-fill or future semester: leave inactive.
                logger.info(
                    f"用户[{operator_id}] 创建补录学期(未激活): {semester_name}"
                )
            return {
                "success": True,
                "message": "学期创建成功",
                "semester_id": semester_id
            }
        except Exception as e:
            logger.error(f"创建学期失败: {e}")
            return {"success": False, "message": f"创建学期失败: {str(e)}"}

    @staticmethod
    async def activate_semester(
        semester_id: int,
        operator_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """Make the given semester the single active one.

        Fails when the semester does not exist or is already archived.

        Args:
            semester_id: Id of the semester to activate.
            operator_id: Id of the acting user, used only in log messages.
        """
        try:
            semester = await SemesterModel.get_by_id(semester_id)
            if not semester:
                return {"success": False, "message": "学期不存在"}
            # Archived semesters may not become active again.
            if semester['is_archived']:
                return {"success": False, "message": "已归档的学期不能设为当前学期"}
            # Clear the previous active flag before setting the new one.
            # NOTE(review): these two calls are separate awaits; if activate()
            # fails no semester is left active - confirm whether the model
            # layer offers a transaction.
            await SemesterModel.deactivate_all()
            result = await SemesterModel.activate(semester_id)
            if result:
                logger.info(f"用户[{operator_id}] 激活了学期: {semester['semester_name']}")
                return {"success": True, "message": "已设为当前学期"}
            return {"success": False, "message": "激活失败"}
        except Exception as e:
            logger.error(f"激活学期失败: {e}")
            return {"success": False, "message": f"激活学期失败: {str(e)}"}

    @staticmethod
    async def archive_semester(
        semester_id: int,
        operator_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """Snapshot every enabled student's standing and mark the semester archived.

        The snapshot stores each student's current points, a rank by points
        (descending) and attendance/homework counts for the semester's date
        range.  Existing snapshot rows of the semester are deleted first so
        a retry does not duplicate them.

        Args:
            semester_id: Id of the semester to archive.
            operator_id: Id of the acting user, used only in log messages.
        """
        try:
            semester = await SemesterModel.get_by_id(semester_id)
            if not semester:
                return {"success": False, "message": "学期不存在"}
            # An archived semester cannot be archived again.
            if semester['is_archived']:
                return {"success": False, "message": "该学期已归档"}
            # Without a start date the homework statistics would all be zero.
            start_date = semester.get('start_date')
            if not start_date:
                return {"success": False, "message": "学期未设置开始日期,无法进行归档"}
            # Only enabled students take part in the snapshot.
            students = await StudentModel.get_all(include_disabled=False)
            if not students:
                return {"success": False, "message": "没有可归档的学生数据"}
            total_students = len(students)
            # An open-ended semester is measured up to today.
            end_date = semester.get('end_date') or datetime.date.today().isoformat()
            # Fetch both statistics in bulk rather than per student.
            attendance_stats = await SemesterModel.get_attendance_stats_by_semester(
                semester_id, start_date, end_date
            )
            homework_stats = await SemesterModel.get_homework_stats_by_date_range(
                start_date, end_date
            )
            attendance_map = SemesterService._pivot_status_counts(
                attendance_stats, SemesterService._ATTENDANCE_FIELDS
            )
            homework_map = SemesterService._pivot_status_counts(
                homework_stats, SemesterService._HOMEWORK_FIELDS
            )
            # Students without any statistics rows get all-zero counters.
            no_attendance = dict.fromkeys(SemesterService._ATTENDANCE_FIELDS.values(), 0)
            no_homework = dict.fromkeys(SemesterService._HOMEWORK_FIELDS.values(), 0)
            # Rank by points, highest first; ties keep their incoming order.
            sorted_students = sorted(students, key=lambda s: s['total_points'], reverse=True)
            archives_data = []
            for rank, student in enumerate(sorted_students, 1):
                sid = student['student_id']
                snapshot = {
                    'semester_id': semester_id,
                    'student_id': sid,
                    'student_no': student['student_no'],
                    'student_name': student['name'],
                    'final_points': student['total_points'],
                    'rank_position': rank,
                    'total_students': total_students,
                }
                snapshot.update(attendance_map.get(sid, no_attendance))
                snapshot.update(homework_map.get(sid, no_homework))
                archives_data.append(snapshot)
            # Drop any earlier snapshot rows, then store the new snapshot.
            # NOTE(review): delete, insert and archive are separate awaits -
            # confirm whether they should share one transaction.
            await SemesterArchiveModel.delete_by_semester(semester_id)
            await SemesterArchiveModel.batch_create(archives_data)
            await SemesterModel.archive(semester_id)
            logger.info(
                f"用户[{operator_id}] 归档了学期: {semester['semester_name']}, "
                f"{total_students} 名学生"
            )
            return {
                "success": True,
                "message": f"学期归档成功,共归档 {total_students} 名学生数据"
            }
        except Exception as e:
            logger.error(f"归档学期失败: {e}")
            return {"success": False, "message": f"归档学期失败: {str(e)}"}

    @staticmethod
    async def get_archive_records(
        semester_id: int,
        page: int = 1,
        page_size: int = 50
    ) -> Dict[str, Any]:
        """Return one page of a semester's archived snapshot (read-only).

        Args:
            semester_id: Id of the semester whose archive is requested.
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; values below 1 are treated as 1.

        Returns:
            On success ``data`` holds the semester summary, the page's
            ``archives`` and the paging fields ``total``, ``page``,
            ``page_size`` and ``total_pages``.
        """
        try:
            semester = await SemesterModel.get_by_id(semester_id)
            if not semester:
                return {"success": False, "message": "学期不存在"}
            archives = await SemesterArchiveModel.get_by_semester(semester_id)
            paged_archives, page, page_size, total_pages = SemesterService._paginate(
                archives, page, page_size
            )
            return {
                "success": True,
                "data": {
                    "semester": {
                        "semester_id": semester['semester_id'],
                        "semester_name": semester['semester_name'],
                        "start_date": semester.get('start_date'),
                        "end_date": semester.get('end_date'),
                        "is_archived": bool(semester['is_archived'])
                    },
                    "archives": paged_archives,
                    "total": len(archives),
                    "page": page,
                    "page_size": page_size,
                    "total_pages": total_pages
                }
            }
        except Exception as e:
            logger.error(f"获取归档数据失败: {e}")
            return {"success": False, "message": f"获取归档数据失败: {str(e)}"}

    @staticmethod
    async def reset_student_points(initial_points: Optional[int] = None) -> Dict[str, Any]:
        """Reset the points of every student with ``status = 1``.

        Args:
            initial_points: Value to assign; defaults to
                ``settings.STUDENT_INITIAL_POINTS`` when omitted.

        Returns:
            On success the dict also carries ``affected``, the value
            returned by ``execute_update``.
        """
        if initial_points is None:
            initial_points = settings.STUDENT_INITIAL_POINTS
        try:
            # Imported at call time, as in the original implementation.
            from utils.database import execute_update
            sql = "UPDATE students SET total_points = %s WHERE status = 1"
            affected = await execute_update(sql, (initial_points,))
            logger.info(f"已重置 {affected} 名学生的操行分为 {initial_points}")
            return {
                "success": True,
                "message": f"已重置 {affected} 名学生的操行分为 {initial_points}",
                "affected": affected
            }
        except Exception as e:
            logger.error(f"重置学生分数失败: {e}")
            return {"success": False, "message": f"重置学生分数失败: {str(e)}"}

    @staticmethod
    async def get_active_semester() -> Dict[str, Any]:
        """Return the active semester, or ``semester=None`` when there is none.

        Having no active semester is still reported as ``success=True``,
        with an explanatory ``message``.
        """
        try:
            active = await SemesterModel.get_active()
            if active:
                return {
                    "success": True,
                    "semester": active
                }
            return {
                "success": True,
                "semester": None,
                "message": "当前没有活跃学期"
            }
        except Exception as e:
            logger.error(f"获取活跃学期失败: {e}")
            return {"success": False, "message": f"获取活跃学期失败: {str(e)}"}