457 lines
19 KiB
Python
457 lines
19 KiB
Python
# ===========================================
|
||
# 班级操行分管理系统 - 学期服务
|
||
#
|
||
# 开发者: Canglan
|
||
# 联系方式: admin@sea-studio.top
|
||
# 版权归属: Sea Network Technology Studio
|
||
# 许可证: MIT License
|
||
#
|
||
# 版权所有 © Sea Network Technology Studio
|
||
# ===========================================
|
||
|
||
import datetime
|
||
from typing import Dict, Any, List, Optional
|
||
|
||
from models.semester import SemesterModel, SemesterArchiveModel
|
||
from models.student import StudentModel
|
||
from middleware.permission import PermissionChecker
|
||
from config import settings
|
||
from utils.logger import get_logger
|
||
from utils.database import execute_query
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
class SemesterService:
|
||
"""学期管理服务"""
|
||
|
||
@staticmethod
|
||
async def list_semesters() -> Dict[str, Any]:
|
||
"""获取学期列表"""
|
||
try:
|
||
semesters = await SemesterModel.get_all()
|
||
today = datetime.date.today()
|
||
for sem in semesters:
|
||
counts = await SemesterModel.count_records_by_semester(sem['semester_id'])
|
||
sem['conduct_count'] = counts['conduct_count']
|
||
sem['attendance_count'] = counts['attendance_count']
|
||
# 计算当前周数(仅活跃学期且有开始日期时)
|
||
sem['current_week'] = None
|
||
if sem.get('is_active') and sem.get('start_date'):
|
||
try:
|
||
s_date = sem['start_date']
|
||
if isinstance(s_date, str):
|
||
s_date = datetime.datetime.strptime(s_date, '%Y-%m-%d').date()
|
||
delta = (today - s_date).days
|
||
if delta >= 0:
|
||
sem['current_week'] = delta // 7 + 1
|
||
except (ValueError, TypeError):
|
||
pass
|
||
return {
|
||
"success": True,
|
||
"semesters": semesters
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"获取学期列表失败: {e}")
|
||
return {"success": False, "message": f"获取学期列表失败: {str(e)}"}
|
||
|
||
@staticmethod
|
||
async def create_semester(
|
||
semester_name: str,
|
||
start_date: str = None,
|
||
end_date: str = None,
|
||
operator_id: int = None
|
||
) -> Dict[str, Any]:
|
||
"""创建新学期"""
|
||
if not semester_name or not semester_name.strip():
|
||
return {"success": False, "message": "学期名称不能为空"}
|
||
|
||
try:
|
||
# 创建学期(不预先 deactivate_all)
|
||
semester_id = await SemesterModel.create(
|
||
semester_name=semester_name.strip(),
|
||
start_date=start_date,
|
||
end_date=end_date
|
||
)
|
||
|
||
# 判断新学期的日期范围是否包含今天,决定是否自动激活
|
||
should_activate = False
|
||
if start_date is not None:
|
||
try:
|
||
today = datetime.date.today()
|
||
s_date = datetime.datetime.strptime(start_date, "%Y-%m-%d").date()
|
||
e_date = (
|
||
datetime.datetime.strptime(end_date, "%Y-%m-%d").date()
|
||
if end_date is not None
|
||
else None
|
||
)
|
||
if s_date <= today and (e_date is None or e_date >= today):
|
||
should_activate = True
|
||
except (ValueError, TypeError):
|
||
should_activate = False
|
||
|
||
if should_activate:
|
||
# 日期范围包含今天,自动激活
|
||
await SemesterModel.deactivate_all()
|
||
await SemesterModel.activate(semester_id)
|
||
logger.info(
|
||
f"用户[{operator_id}] 创建并激活新学期: {semester_name}"
|
||
)
|
||
else:
|
||
# 补录历史学期或未来学期,不激活
|
||
logger.info(
|
||
f"用户[{operator_id}] 创建补录学期(未激活): {semester_name}"
|
||
)
|
||
|
||
return {
|
||
"success": True,
|
||
"message": "学期创建成功",
|
||
"semester_id": semester_id
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"创建学期失败: {e}")
|
||
return {"success": False, "message": f"创建学期失败: {str(e)}"}
|
||
|
||
@staticmethod
|
||
async def activate_semester(
|
||
semester_id: int,
|
||
operator_id: int = None
|
||
) -> Dict[str, Any]:
|
||
"""设为当前活跃学期"""
|
||
try:
|
||
# 检查学期是否存在
|
||
semester = await SemesterModel.get_by_id(semester_id)
|
||
if not semester:
|
||
return {"success": False, "message": "学期不存在"}
|
||
|
||
# 已归档的学期不能激活
|
||
if semester['is_archived']:
|
||
return {"success": False, "message": "已归档的学期不能设为当前学期"}
|
||
|
||
# 自动将之前的活跃学期设为非活跃
|
||
await SemesterModel.deactivate_all()
|
||
|
||
# 设为活跃
|
||
result = await SemesterModel.activate(semester_id)
|
||
|
||
if result:
|
||
logger.info(f"用户[{operator_id}] 激活了学期: {semester['semester_name']}")
|
||
return {"success": True, "message": "已设为当前学期"}
|
||
else:
|
||
return {"success": False, "message": "激活失败"}
|
||
except Exception as e:
|
||
logger.error(f"激活学期失败: {e}")
|
||
return {"success": False, "message": f"激活学期失败: {str(e)}"}
|
||
|
||
@staticmethod
|
||
async def update_semester(
|
||
semester_id: int,
|
||
semester_name: str = None,
|
||
start_date: str = None,
|
||
end_date: str = None,
|
||
operator_id: int = None
|
||
) -> Dict[str, Any]:
|
||
"""编辑学期信息"""
|
||
try:
|
||
semester = await SemesterModel.get_by_id(semester_id)
|
||
if not semester:
|
||
return {"success": False, "message": "学期不存在"}
|
||
|
||
if semester['is_archived']:
|
||
return {"success": False, "message": "已归档的学期不能编辑"}
|
||
|
||
result = await SemesterModel.update(
|
||
semester_id=semester_id,
|
||
semester_name=semester_name,
|
||
start_date=start_date,
|
||
end_date=end_date
|
||
)
|
||
|
||
if result:
|
||
logger.info(f"用户[{operator_id}] 编辑了学期: {semester['semester_name']}")
|
||
return {"success": True, "message": "学期信息已更新"}
|
||
else:
|
||
return {"success": False, "message": "更新失败,请检查参数"}
|
||
except Exception as e:
|
||
logger.error(f"编辑学期失败: {e}")
|
||
return {"success": False, "message": f"编辑学期失败: {str(e)}"}
|
||
|
||
@staticmethod
|
||
async def delete_semester(
|
||
semester_id: int,
|
||
operator_id: int = None
|
||
) -> Dict[str, Any]:
|
||
"""删除学期"""
|
||
try:
|
||
semester = await SemesterModel.get_by_id(semester_id)
|
||
if not semester:
|
||
return {"success": False, "message": "学期不存在"}
|
||
|
||
# 检查是否有关联归档数据
|
||
archive_count = await SemesterModel.count_archives(semester_id)
|
||
if archive_count > 0:
|
||
return {"success": False, "message": f"该学期有 {archive_count} 条归档数据,无法删除"}
|
||
|
||
result = await SemesterModel.delete(semester_id)
|
||
|
||
if result:
|
||
logger.info(f"用户[{operator_id}] 删除了学期: {semester['semester_name']}")
|
||
return {"success": True, "message": "学期已删除"}
|
||
else:
|
||
return {"success": False, "message": "删除失败"}
|
||
except Exception as e:
|
||
logger.error(f"删除学期失败: {e}")
|
||
return {"success": False, "message": f"删除学期失败: {str(e)}"}
|
||
|
||
@staticmethod
|
||
async def associate_records(
|
||
semester_id: int,
|
||
operator_id: int = None
|
||
) -> Dict[str, Any]:
|
||
"""关联记录到学期"""
|
||
try:
|
||
semester = await SemesterModel.get_by_id(semester_id)
|
||
if not semester:
|
||
return {"success": False, "message": "学期不存在"}
|
||
|
||
if semester['is_archived']:
|
||
return {"success": False, "message": "已归档的学期不能关联数据"}
|
||
|
||
start_date = semester.get('start_date')
|
||
if not start_date:
|
||
return {"success": False, "message": "学期未设置开始日期,无法关联数据"}
|
||
|
||
end_date = semester.get('end_date') or datetime.date.today().isoformat()
|
||
|
||
counts = await SemesterModel.associate_records_by_date_range(
|
||
semester_id=semester_id,
|
||
start_date=start_date,
|
||
end_date=end_date
|
||
)
|
||
|
||
logger.info(
|
||
f"用户[{operator_id}] 关联数据到学期: {semester['semester_name']}, "
|
||
f"操行分 {counts['conduct']} 条, 考勤 {counts['attendance']} 条"
|
||
)
|
||
|
||
return {
|
||
"success": True,
|
||
"message": f"关联完成:操行分 {counts['conduct']} 条,考勤 {counts['attendance']} 条",
|
||
"data": counts
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"关联记录失败: {e}")
|
||
return {"success": False, "message": f"关联记录失败: {str(e)}"}
|
||
|
||
@staticmethod
|
||
async def archive_semester(
|
||
semester_id: int,
|
||
operator_id: int = None,
|
||
reset_scores: bool = False
|
||
) -> Dict[str, Any]:
|
||
"""归档学期"""
|
||
try:
|
||
# 检查学期是否存在
|
||
semester = await SemesterModel.get_by_id(semester_id)
|
||
if not semester:
|
||
return {"success": False, "message": "学期不存在"}
|
||
|
||
# 已归档的不能重复归档
|
||
if semester['is_archived']:
|
||
return {"success": False, "message": "该学期已归档"}
|
||
|
||
# 校验开始日期:无 start_date 时作业统计会全部归零
|
||
start_date = semester.get('start_date')
|
||
if not start_date:
|
||
return {"success": False, "message": "学期未设置开始日期,无法进行归档"}
|
||
|
||
# 获取所有活跃学生及其当前分数
|
||
students = await StudentModel.get_all(include_disabled=False)
|
||
if not students:
|
||
return {"success": False, "message": "没有可归档的学生数据"}
|
||
|
||
total_students = len(students)
|
||
|
||
# 获取学期的日期范围,用于查询考勤和作业统计
|
||
end_date = semester.get('end_date') or datetime.date.today().isoformat()
|
||
|
||
# 批量查询考勤和作业统计
|
||
attendance_stats = await SemesterModel.get_attendance_stats_by_semester(
|
||
semester_id, start_date, end_date
|
||
)
|
||
homework_stats = await SemesterModel.get_homework_stats_by_date_range(
|
||
start_date, end_date
|
||
)
|
||
|
||
# 构建 attendance_map: {student_id: {status_field: cnt, ...}}
|
||
attendance_map = {}
|
||
for stat in attendance_stats:
|
||
sid = stat['student_id']
|
||
if sid not in attendance_map:
|
||
attendance_map[sid] = {
|
||
'attendance_present': 0, 'attendance_absent': 0,
|
||
'attendance_late': 0, 'attendance_leave': 0
|
||
}
|
||
status_key = {
|
||
'present': 'attendance_present',
|
||
'absent': 'attendance_absent',
|
||
'late': 'attendance_late',
|
||
'leave': 'attendance_leave'
|
||
}.get(stat['status'])
|
||
if status_key:
|
||
attendance_map[sid][status_key] = stat['cnt']
|
||
|
||
# 构建 homework_map: {student_id: {status_field: cnt, ...}}
|
||
homework_map = {}
|
||
for stat in homework_stats:
|
||
sid = stat['student_id']
|
||
if sid not in homework_map:
|
||
homework_map[sid] = {
|
||
'homework_submitted': 0, 'homework_not_submitted': 0,
|
||
'homework_late': 0
|
||
}
|
||
status_key = {
|
||
'submitted': 'homework_submitted',
|
||
'not_submitted': 'homework_not_submitted',
|
||
'late': 'homework_late'
|
||
}.get(stat['status'])
|
||
if status_key:
|
||
homework_map[sid][status_key] = stat['cnt']
|
||
|
||
# 按分数降序排列以计算排名
|
||
sorted_students = sorted(students, key=lambda s: s['total_points'], reverse=True)
|
||
|
||
# 构建归档快照数据
|
||
archives_data = []
|
||
for rank, student in enumerate(sorted_students, 1):
|
||
att = attendance_map.get(student['student_id'], {})
|
||
hw = homework_map.get(student['student_id'], {})
|
||
archives_data.append({
|
||
'semester_id': semester_id,
|
||
'student_id': student['student_id'],
|
||
'student_no': student['student_no'],
|
||
'student_name': student['name'],
|
||
'final_points': student['total_points'],
|
||
'rank_position': rank,
|
||
'total_students': total_students,
|
||
'attendance_present': att.get('attendance_present', 0),
|
||
'attendance_absent': att.get('attendance_absent', 0),
|
||
'attendance_late': att.get('attendance_late', 0),
|
||
'attendance_leave': att.get('attendance_leave', 0),
|
||
'homework_submitted': hw.get('homework_submitted', 0),
|
||
'homework_not_submitted': hw.get('homework_not_submitted', 0),
|
||
'homework_late': hw.get('homework_late', 0),
|
||
})
|
||
# 删除已有的归档数据以保证幂等性,再保存归档快照
|
||
await SemesterArchiveModel.delete_by_semester(semester_id)
|
||
await SemesterArchiveModel.batch_create(archives_data)
|
||
|
||
# 标记学期为已归档
|
||
await SemesterModel.archive(semester_id)
|
||
|
||
# 归档成功后按需重置学生操行分
|
||
if reset_scores:
|
||
reset_result = await SemesterService.reset_student_points()
|
||
logger.info(
|
||
f"用户[{operator_id}] 归档学期: {semester['semester_name']} 并重置学生操行分, "
|
||
f"共 {total_students} 名学生"
|
||
)
|
||
return {
|
||
"success": True,
|
||
"message": f"学期归档成功,共归档 {total_students} 名学生数据,已重置学生操行分"
|
||
}
|
||
|
||
logger.info(
|
||
f"用户[{operator_id}] 归档了学期: {semester['semester_name']}, "
|
||
f"共 {total_students} 名学生"
|
||
)
|
||
|
||
return {
|
||
"success": True,
|
||
"message": f"学期归档成功,共归档 {total_students} 名学生数据"
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"归档学期失败: {e}")
|
||
return {"success": False, "message": f"归档学期失败: {str(e)}"}
|
||
|
||
@staticmethod
|
||
async def get_archive_records(
|
||
semester_id: int,
|
||
page: int = 1,
|
||
page_size: int = 50
|
||
) -> Dict[str, Any]:
|
||
"""获取归档数据(只读)"""
|
||
try:
|
||
# 检查学期是否存在
|
||
semester = await SemesterModel.get_by_id(semester_id)
|
||
if not semester:
|
||
return {"success": False, "message": "学期不存在"}
|
||
|
||
archives = await SemesterArchiveModel.get_by_semester(semester_id)
|
||
|
||
total = len(archives)
|
||
offset = (page - 1) * page_size
|
||
paged_archives = archives[offset:offset + page_size]
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"semester": {
|
||
"semester_id": semester['semester_id'],
|
||
"semester_name": semester['semester_name'],
|
||
"start_date": semester.get('start_date'),
|
||
"end_date": semester.get('end_date'),
|
||
"is_archived": bool(semester['is_archived'])
|
||
},
|
||
"archives": paged_archives,
|
||
"total": total,
|
||
"page": page,
|
||
"page_size": page_size,
|
||
"total_pages": (total + page_size - 1) // page_size
|
||
}
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"获取归档数据失败: {e}")
|
||
return {"success": False, "message": f"获取归档数据失败: {str(e)}"}
|
||
|
||
@staticmethod
|
||
async def reset_student_points(initial_points: int = None) -> Dict[str, Any]:
|
||
"""重置所有学生的操行分为初始分"""
|
||
if initial_points is None:
|
||
initial_points = settings.STUDENT_INITIAL_POINTS
|
||
|
||
try:
|
||
from utils.database import execute_update
|
||
sql = "UPDATE students SET total_points = %s WHERE status = 1"
|
||
affected = await execute_update(sql, (initial_points,))
|
||
|
||
logger.info(f"已重置 {affected} 名学生的操行分为 {initial_points}")
|
||
return {
|
||
"success": True,
|
||
"message": f"已重置 {affected} 名学生的操行分为 {initial_points} 分",
|
||
"affected": affected
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"重置学生分数失败: {e}")
|
||
return {"success": False, "message": f"重置学生分数失败: {str(e)}"}
|
||
|
||
@staticmethod
|
||
async def get_active_semester() -> Dict[str, Any]:
|
||
"""获取当前活跃学期"""
|
||
try:
|
||
active = await SemesterModel.get_active()
|
||
if active:
|
||
return {
|
||
"success": True,
|
||
"semester": active
|
||
}
|
||
else:
|
||
return {
|
||
"success": True,
|
||
"semester": None,
|
||
"message": "当前没有活跃学期"
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"获取活跃学期失败: {e}")
|
||
return {"success": False, "message": f"获取活跃学期失败: {str(e)}"}
|