# =========================================== # 班级操行分管理系统 - 后端服务 # # 开发者: Canglan # 联系方式: admin@sea-studio.top # 版权归属: Sea Network Technology Studio # 许可证: MIT License # # 版权所有 © Sea Network Technology Studio # =========================================== from typing import Optional, List, Dict, Any from datetime import datetime from utils.database import execute_one, execute_query, execute_insert, execute_update from utils.logger import get_logger logger = get_logger(__name__) class ConductModel: """操行分数据模型""" @staticmethod async def create_record( student_id: int, points_change: int, reason: str, recorder_id: int, recorder_name: str = None, related_type: str = 'manual', related_id: int = None ) -> int: """创建操行分记录""" sql = """ INSERT INTO conduct_records (student_id, points_change, reason, recorder_id, recorder_name, related_type, related_id) VALUES (%s, %s, %s, %s, %s, %s, %s) """ return await execute_insert(sql, ( student_id, points_change, reason, recorder_id, recorder_name, related_type, related_id )) @staticmethod async def get_student_records( student_id: int, limit: int = 50, offset: int = 0, include_revoked: bool = False ) -> List[Dict[str, Any]]: """获取学生操行分记录""" sql = """ SELECT cr.*, u.real_name as recorder_name FROM conduct_records cr LEFT JOIN users u ON cr.recorder_id = u.user_id WHERE cr.student_id = %s """ if not include_revoked: sql += " AND cr.is_revoked = 0" sql += " ORDER BY cr.created_at DESC LIMIT %s OFFSET %s" return await execute_query(sql, (student_id, limit, offset)) @staticmethod async def get_records_by_recorder( recorder_id: int, limit: int = 50, offset: int = 0 ) -> List[Dict[str, Any]]: """获取操作人提交的记录""" sql = """ SELECT cr.*, s.name as student_name FROM conduct_records cr JOIN students s ON cr.student_id = s.student_id WHERE cr.recorder_id = %s AND cr.is_revoked = 0 ORDER BY cr.created_at DESC LIMIT %s OFFSET %s """ return await execute_query(sql, (recorder_id, limit, offset)) @staticmethod async def get_all_records( limit: int = 100, offset: int = 0, start_date: str = None, end_date: str = None, student_id: int = None ) -> List[Dict[str, Any]]: """获取所有记录(班主任/班长专用)""" # 空字符串转为None if start_date == "": start_date = None if end_date == "": end_date = None sql = """ SELECT cr.*, s.name as student_name, s.student_no, u.real_name as recorder_name FROM conduct_records cr JOIN students s ON cr.student_id = s.student_id JOIN users u ON cr.recorder_id = u.user_id WHERE cr.is_revoked = 0 """ params = [] if student_id: sql += " AND cr.student_id = %s" params.append(student_id) if start_date: sql += " AND DATE(cr.created_at) >= %s" params.append(start_date) if end_date: sql += " AND DATE(cr.created_at) <= %s" params.append(end_date) sql += " ORDER BY cr.created_at DESC LIMIT %s OFFSET %s" params.extend([limit, offset]) return await execute_query(sql, tuple(params)) @staticmethod async def get_grouped_records( student_id: int = None, start_date: str = None, end_date: str = None, page: int = 1, page_size: int = 20 ) -> Dict[str, Any]: """获取分组后的操行分记录(同批次合并)""" if start_date == "": start_date = None if end_date == "": end_date = None conditions = ["cr.is_revoked = 0"] params = [] if student_id: conditions.append("cr.student_id = %s") params.append(student_id) if start_date: conditions.append("cr.created_at >= %s") params.append(start_date) if end_date: conditions.append("cr.created_at <= %s") params.append(end_date + ' 23:59:59') where_clause = " AND ".join(conditions) count_sql = f""" SELECT COUNT(DISTINCT CONCAT(cr.points_change, '|', cr.reason, '|', cr.recorder_id, '|', DATE_FORMAT(cr.created_at, '%Y-%m-%d %H:%i'))) as total FROM conduct_records cr WHERE {where_clause} """ data_sql = f""" SELECT cr.points_change, cr.reason, cr.recorder_name, DATE_FORMAT(MIN(cr.created_at), '%Y-%m-%d %H:%i:%s') as created_at, GROUP_CONCAT(s.name ORDER BY s.student_id SEPARATOR ', ') as student_names, COUNT(*) as student_count FROM conduct_records cr JOIN students s ON cr.student_id = s.student_id WHERE {where_clause} GROUP BY cr.points_change, cr.reason, cr.recorder_id, DATE_FORMAT(cr.created_at, '%Y-%m-%d %H:%i') ORDER BY MIN(cr.created_at) DESC LIMIT %s OFFSET %s """ params_for_count = list(params) params_for_data = list(params) + [page_size, (page - 1) * page_size] total_result = await execute_one(count_sql, tuple(params_for_count)) total = total_result['total'] if total_result else 0 records = await execute_query(data_sql, tuple(params_for_data)) return { "records": records, "total": total, "page": page, "page_size": page_size, "total_pages": (total + page_size - 1) // page_size } @staticmethod async def get_record_by_id(record_id: int) -> Optional[Dict[str, Any]]: """根据ID获取记录""" sql = """ SELECT cr.*, s.name as student_name, s.total_points FROM conduct_records cr JOIN students s ON cr.student_id = s.student_id WHERE cr.record_id = %s """ return await execute_one(sql, (record_id,)) @staticmethod async def revoke_record(record_id: int, revoker_id: int) -> bool: """撤销记录""" try: sql = """ UPDATE conduct_records SET is_revoked = 1, revoked_by = %s, revoked_at = NOW() WHERE record_id = %s AND is_revoked = 0 """ result = await execute_update(sql, (revoker_id, record_id)) return result > 0 except Exception as e: logger.error(f"撤销记录失败: {e}") return False @staticmethod async def batch_create_records(records_data: List[Dict]) -> List[Dict]: """批量创建操行分记录""" results = [] for record in records_data: try: record_id = await ConductModel.create_record( student_id=record.get('student_id'), points_change=record.get('points_change'), reason=record.get('reason'), recorder_id=record.get('recorder_id'), recorder_name=record.get('recorder_name') ) results.append({ 'student_id': record.get('student_id'), 'success': True, 'record_id': record_id }) except Exception as e: results.append({ 'student_id': record.get('student_id'), 'success': False, 'error': str(e) }) return results @staticmethod async def get_student_total_points(student_id: int) -> int: """获取学生当前总分""" sql = "SELECT total_points FROM students WHERE student_id = %s" result = await execute_one(sql, (student_id,)) return result['total_points'] if result else 100