# =========================================== # 班级操行分管理系统 - 学生数据模型 # # 开发者: Canglan # 联系方式: admin@sea-studio.top # 版权归属: Sea Network Technology Studio # 许可证: MIT License # # 版权所有 © Sea Network Technology Studio # =========================================== from typing import Optional, List, Dict, Any from utils.database import execute_one, execute_query, execute_insert, execute_update, execute_many from utils.security import security from utils.logger import get_logger logger = get_logger(__name__) class StudentModel: """学生数据模型""" @staticmethod async def get_by_id(student_id: int) -> Optional[Dict[str, Any]]: """根据ID获取学生信息""" sql = """ SELECT s.* FROM students s WHERE s.student_id = %s """ return await execute_one(sql, (student_id,)) @staticmethod async def get_by_student_no(student_no: str) -> Optional[Dict[str, Any]]: """根据学号获取学生信息""" sql = """ SELECT s.* FROM students s WHERE s.student_no = %s """ return await execute_one(sql, (student_no,)) @staticmethod async def get_all(include_disabled: bool = False) -> List[Dict[str, Any]]: """获取所有学生列表(单班级)""" sql = """ SELECT student_id, student_no, name, total_points, parent_phone, dormitory_number, status FROM students WHERE 1=1 """ if not include_disabled: sql += " AND status = 1" sql += " ORDER BY student_no" return await execute_query(sql) @staticmethod async def get_dormitory_list() -> List[str]: """获取所有不重复的宿舍号列表""" try: sql = """ SELECT DISTINCT dormitory_number FROM students WHERE status = 1 AND dormitory_number IS NOT NULL AND dormitory_number != '' ORDER BY dormitory_number """ rows = await execute_query(sql) return [row["dormitory_number"] for row in rows] except Exception as e: logger.warning(f"dormitory_number 列不存在,返回空列表: {e}") return [] @staticmethod async def create( student_no: str, name: str, parent_phone: str = None, dormitory_number: str = None, initial_points: int = 60 ) -> int: """创建学生(初始操行分默认60分)""" if dormitory_number is not None: sql = """ INSERT INTO students (student_no, name, parent_phone, dormitory_number, total_points) VALUES (%s, %s, %s, %s, %s) """ return await execute_insert(sql, (student_no, name, parent_phone, dormitory_number, initial_points)) else: sql = """ INSERT INTO students (student_no, name, parent_phone, total_points) VALUES (%s, %s, %s, %s) """ return await execute_insert(sql, (student_no, name, parent_phone, initial_points)) @staticmethod async def update(student_id: int, name: str = None, parent_phone: str = None, dormitory_number: str = None, status: int = None) -> bool: """更新学生信息""" updates = [] params = [] has_dormitory = False if name is not None: updates.append("name = %s") params.append(name) if parent_phone is not None: updates.append("parent_phone = %s") params.append(parent_phone) if dormitory_number is not None: updates.append("dormitory_number = %s") params.append(dormitory_number) has_dormitory = True if status is not None: updates.append("status = %s") params.append(status) if not updates: return True params.append(student_id) sql = f"UPDATE students SET {', '.join(updates)} WHERE student_id = %s" try: result = await execute_update(sql, tuple(params)) return result > 0 except Exception as e: if has_dormitory: logger.warning(f"dormitory_number 列不存在,尝试不含该字段重试: {e}") retry_updates = [] retry_params = [] if name is not None: retry_updates.append("name = %s") retry_params.append(name) if parent_phone is not None: retry_updates.append("parent_phone = %s") retry_params.append(parent_phone) if status is not None: retry_updates.append("status = %s") retry_params.append(status) if not retry_updates: return True retry_params.append(student_id) sql = f"UPDATE students SET {', '.join(retry_updates)} WHERE student_id = %s" result = await execute_update(sql, tuple(retry_params)) return result > 0 raise @staticmethod async def delete(student_id: int) -> bool: """删除学生(软删除)""" sql = "UPDATE students SET status = 0 WHERE student_id = %s" result = await execute_update(sql, (student_id,)) return result > 0 @staticmethod async def update_total_points(student_id: int, points_change: int) -> bool: """更新学生总分""" sql = "UPDATE students SET total_points = total_points + %s WHERE student_id = %s" result = await execute_update(sql, (points_change, student_id)) return result > 0 @staticmethod async def get_ranking(limit: int = 50) -> List[Dict[str, Any]]: """获取学生排行(单班级)""" sql = """ SELECT student_id, student_no, name, total_points FROM students WHERE status = 1 ORDER BY total_points DESC, student_id ASC LIMIT %s """ results = await execute_query(sql, (limit,)) for i, row in enumerate(results): row['rank'] = i + 1 return results @staticmethod async def get_total_count() -> int: """获取活跃学生总数""" sql = "SELECT COUNT(*) as total FROM students WHERE status = 1" result = await execute_one(sql) return result["total"] if result else 0 @staticmethod async def batch_create(students_data: List[Dict], initial_points: int = 60) -> List[Dict]: """批量创建学生""" results = [] for student in students_data: try: student_id = await StudentModel.create( student_no=student.get('student_no'), name=student.get('name'), parent_phone=student.get('parent_phone'), dormitory_number=student.get('dormitory_number'), initial_points=initial_points ) results.append({ 'student_no': student.get('student_no'), 'success': True, 'student_id': student_id }) except Exception as e: results.append({ 'student_no': student.get('student_no'), 'success': False, 'error': str(e) }) return results