# =========================================== # 班级操行分管理系统 - 学期服务 # # 开发者: Canglan # 联系方式: admin@sea-studio.top # 版权归属: Sea Network Technology Studio # 许可证: MIT License # # 版权所有 © Sea Network Technology Studio # =========================================== import datetime from typing import Dict, Any, List, Optional from models.semester import SemesterModel, SemesterArchiveModel from models.student import StudentModel from middleware.permission import PermissionChecker from config import settings from utils.logger import get_logger from utils.database import execute_query logger = get_logger(__name__) class SemesterService: """学期管理服务""" @staticmethod async def list_semesters() -> Dict[str, Any]: """获取学期列表""" try: semesters = await SemesterModel.get_all() return { "success": True, "semesters": semesters } except Exception as e: logger.error(f"获取学期列表失败: {e}") return {"success": False, "message": f"获取学期列表失败: {str(e)}"} @staticmethod async def create_semester( semester_name: str, start_date: str = None, end_date: str = None, operator_id: int = None ) -> Dict[str, Any]: """创建新学期""" if not semester_name or not semester_name.strip(): return {"success": False, "message": "学期名称不能为空"} try: # 创建学期(不预先 deactivate_all) semester_id = await SemesterModel.create( semester_name=semester_name.strip(), start_date=start_date, end_date=end_date ) # 判断新学期的日期范围是否包含今天,决定是否自动激活 should_activate = False if start_date is not None: try: today = datetime.date.today() s_date = datetime.datetime.strptime(start_date, "%Y-%m-%d").date() e_date = ( datetime.datetime.strptime(end_date, "%Y-%m-%d").date() if end_date is not None else None ) if s_date <= today and (e_date is None or e_date >= today): should_activate = True except (ValueError, TypeError): should_activate = False if should_activate: # 日期范围包含今天,自动激活 await SemesterModel.deactivate_all() await SemesterModel.activate(semester_id) logger.info( f"用户[{operator_id}] 创建并激活新学期: {semester_name}" ) else: # 补录历史学期或未来学期,不激活 logger.info( f"用户[{operator_id}] 创建补录学期(未激活): {semester_name}" ) return { "success": True, "message": "学期创建成功", "semester_id": semester_id } except Exception as e: logger.error(f"创建学期失败: {e}") return {"success": False, "message": f"创建学期失败: {str(e)}"} @staticmethod async def activate_semester( semester_id: int, operator_id: int = None ) -> Dict[str, Any]: """设为当前活跃学期""" try: # 检查学期是否存在 semester = await SemesterModel.get_by_id(semester_id) if not semester: return {"success": False, "message": "学期不存在"} # 已归档的学期不能激活 if semester['is_archived']: return {"success": False, "message": "已归档的学期不能设为当前学期"} # 自动将之前的活跃学期设为非活跃 await SemesterModel.deactivate_all() # 设为活跃 result = await SemesterModel.activate(semester_id) if result: logger.info(f"用户[{operator_id}] 激活了学期: {semester['semester_name']}") return {"success": True, "message": "已设为当前学期"} else: return {"success": False, "message": "激活失败"} except Exception as e: logger.error(f"激活学期失败: {e}") return {"success": False, "message": f"激活学期失败: {str(e)}"} @staticmethod async def update_semester( semester_id: int, semester_name: str = None, start_date: str = None, end_date: str = None, operator_id: int = None ) -> Dict[str, Any]: """编辑学期信息""" try: semester = await SemesterModel.get_by_id(semester_id) if not semester: return {"success": False, "message": "学期不存在"} if semester['is_archived']: return {"success": False, "message": "已归档的学期不能编辑"} result = await SemesterModel.update( semester_id=semester_id, semester_name=semester_name, start_date=start_date, end_date=end_date ) if result: logger.info(f"用户[{operator_id}] 编辑了学期: {semester['semester_name']}") return {"success": True, "message": "学期信息已更新"} else: return {"success": False, "message": "更新失败,请检查参数"} except Exception as e: logger.error(f"编辑学期失败: {e}") return {"success": False, "message": f"编辑学期失败: {str(e)}"} @staticmethod async def delete_semester( semester_id: int, operator_id: int = None ) -> Dict[str, Any]: """删除学期""" try: semester = await SemesterModel.get_by_id(semester_id) if not semester: return {"success": False, "message": "学期不存在"} # 检查是否有关联归档数据 archive_count = await SemesterModel.count_archives(semester_id) if archive_count > 0: return {"success": False, "message": f"该学期有 {archive_count} 条归档数据,无法删除"} result = await SemesterModel.delete(semester_id) if result: logger.info(f"用户[{operator_id}] 删除了学期: {semester['semester_name']}") return {"success": True, "message": "学期已删除"} else: return {"success": False, "message": "删除失败"} except Exception as e: logger.error(f"删除学期失败: {e}") return {"success": False, "message": f"删除学期失败: {str(e)}"} @staticmethod async def associate_records( semester_id: int, operator_id: int = None ) -> Dict[str, Any]: """关联记录到学期""" try: semester = await SemesterModel.get_by_id(semester_id) if not semester: return {"success": False, "message": "学期不存在"} if semester['is_archived']: return {"success": False, "message": "已归档的学期不能关联数据"} start_date = semester.get('start_date') if not start_date: return {"success": False, "message": "学期未设置开始日期,无法关联数据"} end_date = semester.get('end_date') or datetime.date.today().isoformat() counts = await SemesterModel.associate_records_by_date_range( semester_id=semester_id, start_date=start_date, end_date=end_date ) logger.info( f"用户[{operator_id}] 关联数据到学期: {semester['semester_name']}, " f"操行分 {counts['conduct']} 条, 考勤 {counts['attendance']} 条" ) return { "success": True, "message": f"关联完成:操行分 {counts['conduct']} 条,考勤 {counts['attendance']} 条", "data": counts } except Exception as e: logger.error(f"关联记录失败: {e}") return {"success": False, "message": f"关联记录失败: {str(e)}"} @staticmethod async def archive_semester( semester_id: int, operator_id: int = None, reset_scores: bool = False ) -> Dict[str, Any]: """归档学期""" try: # 检查学期是否存在 semester = await SemesterModel.get_by_id(semester_id) if not semester: return {"success": False, "message": "学期不存在"} # 已归档的不能重复归档 if semester['is_archived']: return {"success": False, "message": "该学期已归档"} # 校验开始日期:无 start_date 时作业统计会全部归零 start_date = semester.get('start_date') if not start_date: return {"success": False, "message": "学期未设置开始日期,无法进行归档"} # 获取所有活跃学生及其当前分数 students = await StudentModel.get_all(include_disabled=False) if not students: return {"success": False, "message": "没有可归档的学生数据"} total_students = len(students) # 获取学期的日期范围,用于查询考勤和作业统计 end_date = semester.get('end_date') or datetime.date.today().isoformat() # 批量查询考勤和作业统计 attendance_stats = await SemesterModel.get_attendance_stats_by_semester( semester_id, start_date, end_date ) homework_stats = await SemesterModel.get_homework_stats_by_date_range( start_date, end_date ) # 构建 attendance_map: {student_id: {status_field: cnt, ...}} attendance_map = {} for stat in attendance_stats: sid = stat['student_id'] if sid not in attendance_map: attendance_map[sid] = { 'attendance_present': 0, 'attendance_absent': 0, 'attendance_late': 0, 'attendance_leave': 0 } status_key = { 'present': 'attendance_present', 'absent': 'attendance_absent', 'late': 'attendance_late', 'leave': 'attendance_leave' }.get(stat['status']) if status_key: attendance_map[sid][status_key] = stat['cnt'] # 构建 homework_map: {student_id: {status_field: cnt, ...}} homework_map = {} for stat in homework_stats: sid = stat['student_id'] if sid not in homework_map: homework_map[sid] = { 'homework_submitted': 0, 'homework_not_submitted': 0, 'homework_late': 0 } status_key = { 'submitted': 'homework_submitted', 'not_submitted': 'homework_not_submitted', 'late': 'homework_late' }.get(stat['status']) if status_key: homework_map[sid][status_key] = stat['cnt'] # 按分数降序排列以计算排名 sorted_students = sorted(students, key=lambda s: s['total_points'], reverse=True) # 构建归档快照数据 archives_data = [] for rank, student in enumerate(sorted_students, 1): att = attendance_map.get(student['student_id'], {}) hw = homework_map.get(student['student_id'], {}) archives_data.append({ 'semester_id': semester_id, 'student_id': student['student_id'], 'student_no': student['student_no'], 'student_name': student['name'], 'final_points': student['total_points'], 'rank_position': rank, 'total_students': total_students, 'attendance_present': att.get('attendance_present', 0), 'attendance_absent': att.get('attendance_absent', 0), 'attendance_late': att.get('attendance_late', 0), 'attendance_leave': att.get('attendance_leave', 0), 'homework_submitted': hw.get('homework_submitted', 0), 'homework_not_submitted': hw.get('homework_not_submitted', 0), 'homework_late': hw.get('homework_late', 0), }) # 删除已有的归档数据以保证幂等性,再保存归档快照 await SemesterArchiveModel.delete_by_semester(semester_id) await SemesterArchiveModel.batch_create(archives_data) # 标记学期为已归档 await SemesterModel.archive(semester_id) # 归档成功后按需重置学生操行分 if reset_scores: reset_result = await SemesterService.reset_student_points() logger.info( f"用户[{operator_id}] 归档学期: {semester['semester_name']} 并重置学生操行分, " f"共 {total_students} 名学生" ) return { "success": True, "message": f"学期归档成功,共归档 {total_students} 名学生数据,已重置学生操行分" } logger.info( f"用户[{operator_id}] 归档了学期: {semester['semester_name']}, " f"共 {total_students} 名学生" ) return { "success": True, "message": f"学期归档成功,共归档 {total_students} 名学生数据" } except Exception as e: logger.error(f"归档学期失败: {e}") return {"success": False, "message": f"归档学期失败: {str(e)}"} @staticmethod async def get_archive_records( semester_id: int, page: int = 1, page_size: int = 50 ) -> Dict[str, Any]: """获取归档数据(只读)""" try: # 检查学期是否存在 semester = await SemesterModel.get_by_id(semester_id) if not semester: return {"success": False, "message": "学期不存在"} archives = await SemesterArchiveModel.get_by_semester(semester_id) total = len(archives) offset = (page - 1) * page_size paged_archives = archives[offset:offset + page_size] return { "success": True, "data": { "semester": { "semester_id": semester['semester_id'], "semester_name": semester['semester_name'], "start_date": semester.get('start_date'), "end_date": semester.get('end_date'), "is_archived": bool(semester['is_archived']) }, "archives": paged_archives, "total": total, "page": page, "page_size": page_size, "total_pages": (total + page_size - 1) // page_size } } except Exception as e: logger.error(f"获取归档数据失败: {e}") return {"success": False, "message": f"获取归档数据失败: {str(e)}"} @staticmethod async def reset_student_points(initial_points: int = None) -> Dict[str, Any]: """重置所有学生的操行分为初始分""" if initial_points is None: initial_points = settings.STUDENT_INITIAL_POINTS try: from utils.database import execute_update sql = "UPDATE students SET total_points = %s WHERE status = 1" affected = await execute_update(sql, (initial_points,)) logger.info(f"已重置 {affected} 名学生的操行分为 {initial_points}") return { "success": True, "message": f"已重置 {affected} 名学生的操行分为 {initial_points} 分", "affected": affected } except Exception as e: logger.error(f"重置学生分数失败: {e}") return {"success": False, "message": f"重置学生分数失败: {str(e)}"} @staticmethod async def get_active_semester() -> Dict[str, Any]: """获取当前活跃学期""" try: active = await SemesterModel.get_active() if active: return { "success": True, "semester": active } else: return { "success": True, "semester": None, "message": "当前没有活跃学期" } except Exception as e: logger.error(f"获取活跃学期失败: {e}") return {"success": False, "message": f"获取活跃学期失败: {str(e)}"}