# =========================================== # 班级操行分管理系统 - 管理端路由 # # 开发者: Canglan # 联系方式: admin@sea-studio.top # 版权归属: Sea Network Technology Studio # 许可证: MIT License # # 版权所有 © Sea Network Technology Studio # =========================================== from fastapi import APIRouter, Request, Query, UploadFile, File from typing import Optional, List import json from middleware.permission import ( get_current_user, require_teacher, PermissionChecker ) from services.admin_service import AdminService from services.conduct_service import ConductService from services.homework_service import HomeworkService from services.attendance_service import AttendanceService from services.log_service import LogService from schemas.admin import ( AddPointsRequest, RevokeRequest, AddAdminRequest, AddStudentRequest, UpdateStudentRequest, UpdateHomeworkStatusRequest, AddAttendanceRequest, UpdateAdminRequest, DeleteAdminRequest, ResetPasswordRequest, CreateAssignmentRequest ) from utils.response import success_response, error_response from utils.logger import get_logger from config import settings router = APIRouter() logger = get_logger(__name__) # ========== 学生管理 ========== @router.get("/students") async def get_students( request: Request, page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=1000), search: Optional[str] = None ): """获取所有学生列表(单班级)""" user = await get_current_user(request) if user["user_type"] != "admin": return error_response(message="仅管理员可查看学生列表", code=403) result = await AdminService.get_students(page=page, page_size=page_size, search=search) return success_response(data=result) @router.post("/students/import") async def import_students(request: Request, file: UploadFile = File(...)): """批量导入学生(JSON格式),初始操行分默认为60分""" user = await get_current_user(request) is_teacher = await PermissionChecker.check_is_teacher(user["user_id"]) if not is_teacher: return error_response(message="仅班主任可导入学生", code=403) content = await file.read() file_size = len(content) if file_size > settings.MAX_UPLOAD_SIZE: return error_response(message=f"文件大小不能超过{settings.MAX_UPLOAD_SIZE // 1024 // 1024}MB") filename = file.filename or "" extension = filename.split('.')[-1].lower() if '.' in filename else '' if extension not in settings.ALLOWED_EXTENSIONS: return error_response(message=f"不支持的文件类型,仅支持 {', '.join(settings.ALLOWED_EXTENSIONS)}") try: data = json.loads(content.decode('utf-8')) students = data.get("students", []) except json.JSONDecodeError as e: return error_response(message=f"JSON格式错误: {str(e)}") except UnicodeDecodeError: return error_response(message="文件编码错误,请使用UTF-8编码") if not students: return error_response(message="文件中没有学生数据") result = await AdminService.import_students( students=students, operator_id=user["user_id"], initial_points=60 ) await LogService.write_operation_log( operator_id=user["user_id"], operator_name=user["real_name"], operator_role="班主任", operation_type="import_students", target_type="student", details=f"批量导入: 成功{result['success_count']}人, 失败{result['failed_count']}人", ip=request.client.host ) return success_response(data=result, message=f"导入完成: 成功{result['success_count']}人,失败{result['failed_count']}人") @router.post("/students") async def add_student(request: Request, req: AddStudentRequest): """新增学生""" user = await get_current_user(request) is_teacher = await PermissionChecker.check_is_teacher(user["user_id"]) if not is_teacher: return error_response(message="仅班主任可新增学生", code=403) result = await AdminService.add_student( student_no=req.student_no, name=req.name, parent_phone=req.parent_phone, operator_id=user["user_id"], initial_points=60 ) if result["success"]: await LogService.write_operation_log( operator_id=user["user_id"], operator_name=user["real_name"], operator_role="班主任", operation_type="add_student", target_type="student", target_id=result.get("student_id"), details=f"新增学生: {req.name}({req.student_no})", ip=request.client.host ) return success_response(data=result, message="学生添加成功") else: return error_response(message=result["message"]) @router.put("/students/{student_id}") async def update_student(request: Request, student_id: int, req: UpdateStudentRequest): """编辑学生信息(班主任)""" user = await get_current_user(request) is_teacher = await PermissionChecker.check_is_teacher(user["user_id"]) if not is_teacher: return error_response(message="仅班主任可编辑学生信息", code=403) result = await AdminService.update_student( student_id=student_id, name=req.name, parent_phone=req.parent_phone ) if result["success"]: await LogService.write_operation_log( operator_id=user["user_id"], operator_name=user["real_name"], operator_role="班主任", operation_type="update_student", target_type="student", target_id=student_id, details=f"编辑学生ID: {student_id}", ip=request.client.host ) return success_response(message=result["message"]) else: return error_response(message=result["message"]) @router.delete("/students/{student_id}") async def delete_student(request: Request, student_id: int): """删除学生(班主任)""" user = await get_current_user(request) is_teacher = await PermissionChecker.check_is_teacher(user["user_id"]) if not is_teacher: return error_response(message="仅班主任可删除学生", code=403) result = await AdminService.delete_student(student_id=student_id) if result["success"]: await LogService.write_operation_log( operator_id=user["user_id"], operator_name=user["real_name"], operator_role="班主任", operation_type="delete_student", target_type="student", target_id=student_id, details=f"删除学生ID: {student_id}", ip=request.client.host ) return success_response(message=result["message"]) else: return error_response(message=result["message"]) @router.post("/students/reset-password/{student_id}") async def reset_student_password(request: Request, student_id: int, req: ResetPasswordRequest): """重置学生密码(班主任)""" user = await get_current_user(request) is_teacher = await PermissionChecker.check_is_teacher(user["user_id"]) if not is_teacher: return error_response(message="仅班主任可重置学生密码", code=403) result = await AdminService.reset_student_password( student_id=student_id, new_password=req.new_password ) if result["success"]: await LogService.write_operation_log( operator_id=user["user_id"], operator_name=user["real_name"], operator_role="班主任", operation_type="reset_student_password", target_type="student", target_id=student_id, details=f"重置学生密码, 学生ID: {student_id}", ip=request.client.host ) return success_response(message=result["message"]) else: return error_response(message=result["message"]) # ========== 操行分管理 ========== @router.post("/conduct/add") async def add_conduct_points(request: Request, req: AddPointsRequest): """批量加减分""" user = await get_current_user(request) result = await ConductService.add_points( student_ids=req.student_ids, points_change=req.points_change, reason=req.reason, recorder_id=user["user_id"], recorder_name=user["username"] ) if result["success"]: role = await PermissionChecker.get_user_role(user["user_id"]) await LogService.write_operation_log( operator_id=user["user_id"], operator_name=user["real_name"], operator_role=role, operation_type="add_points", target_type="conduct", details=f"批量加减分: {req.points_change}分, 原因: {req.reason}, 对象: {req.student_ids}", ip=request.client.host ) return success_response(data=result, message="操作成功") else: return error_response(message=result["message"]) @router.post("/conduct/revoke") async def revoke_conduct_record(request: Request, req: RevokeRequest): """撤销扣分记录""" user = await get_current_user(request) result = await ConductService.revoke_record( record_id=req.record_id, revoker_id=user["user_id"] ) if result["success"]: role = await PermissionChecker.get_user_role(user["user_id"]) record = result.get("record", {}) await LogService.write_operation_log( operator_id=user["user_id"], operator_name=user["real_name"], operator_role=role, operation_type="revoke_record", target_type="conduct", target_id=req.record_id, details=( f"撤销记录ID: {req.record_id}, " f"原操作人: {record.get('recorder_name', '未知')}, " f"原分值变动: {'+' if record.get('points_change', 0) > 0 else ''}{record.get('points_change', 0)}分, " f"撤销操作人: {user['username']}" ), ip=request.client.host ) return success_response(message="撤销成功") else: return error_response(message=result["message"]) @router.post("/conduct/restore") async def restore_conduct_record(request: Request, req: RevokeRequest): """反撤销(恢复)已撤销的记录""" user = await get_current_user(request) result = await ConductService.restore_record( record_id=req.record_id, restorer_id=user["user_id"] ) if result["success"]: record = result.get("record", {}) await LogService.write_operation_log( operator_id=user["user_id"], operator_name=user["real_name"], operator_role="班主任", operation_type="restore_record", target_type="conduct", target_id=req.record_id, details=( f"反撤销记录ID: {req.record_id}, " f"原操作人: {record.get('recorder_name', '未知')}, " f"原分值变动: {'+' if record.get('points_change', 0) > 0 else ''}{record.get('points_change', 0)}分, " f"反撤销操作人: {user['username']}" ), ip=request.client.host ) return success_response(message="反撤销成功") else: return error_response(message=result["message"]) @router.get("/conduct/history") async def get_conduct_history( request: Request, student_id: Optional[int] = None, page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=1000), start_date: Optional[str] = None, end_date: Optional[str] = None, grouped: bool = Query(False), related_type: Optional[str] = None ): """获取操行分历史记录""" try: user = await get_current_user(request) if user["user_type"] != "admin": return error_response(message="仅管理员可查看历史记录", code=403) result = await ConductService.get_history( user_id=user["user_id"], student_id=student_id, page=page, page_size=page_size, start_date=start_date, end_date=end_date, grouped=grouped, related_type=related_type ) return success_response(data=result) except Exception as e: logger.error(f"获取历史记录失败: {e}", exc_info=True) return error_response(message=f"获取历史记录失败: {str(e)}") # ========== 作业管理 ========== @router.get("/homework/assignments") async def get_assignments(request: Request): """获取作业列表""" user = await get_current_user(request) role = await PermissionChecker.get_user_role(user["user_id"]) if role not in ["班主任", "学习委员"]: return error_response(message="无权限", code=403) result = await HomeworkService.get_assignments(user["user_id"]) return success_response(data=result) @router.get("/homework/submissions/{assignment_id}") async def get_submissions(request: Request, assignment_id: int): """获取作业提交记录""" user = await get_current_user(request) role = await PermissionChecker.get_user_role(user["user_id"]) if role not in ["班主任", "学习委员"]: return error_response(message="无权限", code=403) result = await HomeworkService.get_submissions( assignment_id=assignment_id, user_id=user["user_id"] ) return success_response(data=result) @router.post("/homework/assignment") async def create_assignment(request: Request, req: CreateAssignmentRequest): """发布作业(班主任)""" user = await get_current_user(request) is_teacher = await PermissionChecker.check_is_teacher(user["user_id"]) if not is_teacher: return error_response(message="仅班主任可发布作业", code=403) result = await HomeworkService.create_assignment( subject_id=req.subject_id, title=req.title, description=req.description, deadline=req.deadline, created_by=user["user_id"] ) if result["success"]: await LogService.write_operation_log( operator_id=user["user_id"], operator_name=user["real_name"], operator_role="班主任", operation_type="create_assignment", target_type="homework", details=f"发布作业: {title}", ip=request.client.host ) return success_response(data=result, message="作业发布成功") else: return error_response(message=result["message"]) @router.put("/homework/submission") async def update_submission_status(request: Request, req: UpdateHomeworkStatusRequest): """更新作业提交状态(班主任或学习委员)""" user = await get_current_user(request) role = await PermissionChecker.get_user_role(user["user_id"]) if role not in ["班主任", "学习委员"]: return error_response(message="无权进行此操作", code=403) result = await HomeworkService.update_submission_status( submission_id=req.submission_id, status=req.status, comments=req.comments, apply_deduction=req.apply_deduction, operator_id=user["user_id"] ) if result["success"]: await LogService.write_operation_log( operator_id=user["user_id"], operator_name=user["real_name"], operator_role=role, operation_type="update_submission", target_type="homework", target_id=req.submission_id, details=f"状态: {req.status}", ip=request.client.host ) return success_response(message="状态更新成功") else: return error_response(message=result["message"]) # ========== 考勤管理 ========== @router.post("/attendance") async def add_attendance(request: Request, req: AddAttendanceRequest): """添加考勤记录(考勤委员)""" user = await get_current_user(request) role = await PermissionChecker.get_user_role(user["user_id"]) if role not in ["班主任", "考勤委员"]: return error_response(message="无权进行此操作", code=403) result = await AttendanceService.add_attendance( student_id=req.student_id, date=str(req.date), status=req.status, reason=req.reason, apply_deduction=req.apply_deduction, recorder_id=user["user_id"], custom_deduction=req.custom_deduction, slot=req.slot ) if result["success"]: await LogService.write_operation_log( operator_id=user["user_id"], operator_name=user["real_name"], operator_role=role, operation_type="add_attendance", target_type="attendance", details=f"学生ID: {req.student_id}, 日期: {req.date}, 状态: {req.status}", ip=request.client.host ) return success_response(message="考勤记录添加成功") else: return error_response(message=result["message"]) @router.get("/attendance/records") async def get_attendance_records( request: Request, date: Optional[str] = None, student_id: Optional[int] = None, slot: Optional[str] = None ): """获取考勤记录""" user = await get_current_user(request) role = await PermissionChecker.get_user_role(user["user_id"]) if role not in ["班主任", "考勤委员"]: return error_response(message="无权查看考勤记录", code=403) result = await AttendanceService.get_records( user_id=user["user_id"], date=date, student_id=student_id, slot=slot ) return success_response(data=result) # ========== 管理员管理 ========== @router.post("/add") async def add_admin(request: Request, req: AddAdminRequest): """添加管理员(班主任)""" user = await get_current_user(request) is_teacher = await PermissionChecker.check_is_teacher(user["user_id"]) if not is_teacher: return error_response(message="仅班主任可添加管理员", code=403) if req.role_type not in ["班长", "学习委员", "考勤委员", "劳动委员", "志愿委员"]: return error_response(message="无效的角色类型", code=400) result = await AdminService.add_admin( username=req.username, real_name=req.real_name, password=req.password, role_type=req.role_type, operator_id=user["user_id"] ) if result["success"]: await LogService.write_operation_log( operator_id=user["user_id"], operator_name=user["real_name"], operator_role="班主任", operation_type="add_admin", target_type="admin", details=f"新增管理员: {req.real_name}({req.username}), 角色: {req.role_type}", ip=request.client.host ) return success_response(data=result, message="管理员添加成功") else: return error_response(message=result["message"]) @router.get("/list") async def get_admins(request: Request): """获取管理员列表(班主任)""" try: user = await get_current_user(request) is_teacher = await PermissionChecker.check_is_teacher(user["user_id"]) if not is_teacher: return error_response(message="仅班主任可查看管理员列表", code=403) result = await AdminService.get_admins() return success_response(data=result) except Exception as e: logger.error(f"获取管理员列表失败: {e}", exc_info=True) return error_response(message=f"获取管理员列表失败: {str(e)}") @router.put("/update/{user_id}") async def update_admin(request: Request, user_id: int, req: UpdateAdminRequest): """更新管理员信息(班主任)""" user = await get_current_user(request) is_teacher = await PermissionChecker.check_is_teacher(user["user_id"]) if not is_teacher: return error_response(message="仅班主任可更新管理员", code=403) if req.role_type not in ["班长", "学习委员", "考勤委员", "劳动委员", "志愿委员"]: return error_response(message="无效的角色类型", code=400) from models.admin_role import AdminRoleModel from models.user import UserModel # 更新角色 result = await AdminRoleModel.update_role( user_id=user_id, role_type=req.role_type ) # 更新姓名 if req.real_name: await UserModel.update_real_name(user_id, req.real_name) if result: await LogService.write_operation_log( operator_id=user["user_id"], operator_name=user["real_name"], operator_role="班主任", operation_type="update_admin", target_type="admin", target_id=user_id, details=f"更新管理员角色为: {req.role_type}, 姓名: {req.real_name}", ip=request.client.host ) return success_response(message="管理员更新成功") else: return error_response(message="更新失败或管理员不存在") @router.delete("/delete/{user_id}") async def delete_admin(request: Request, user_id: int): """删除管理员(班主任)""" user = await get_current_user(request) is_teacher = await PermissionChecker.check_is_teacher(user["user_id"]) if not is_teacher: return error_response(message="仅班主任可删除管理员", code=403) # 防止删除自己 if user_id == user["user_id"]: return error_response(message="不能删除当前登录的管理员", code=400) from models.admin_role import AdminRoleModel from models.user import UserModel # 先删除角色记录 role_deleted = await AdminRoleModel.delete(user_id) if role_deleted: # 再删除用户账号(软删除,将状态设为禁用) await UserModel.update_status(user_id, 0) await LogService.write_operation_log( operator_id=user["user_id"], operator_name=user["real_name"], operator_role="班主任", operation_type="delete_admin", target_type="admin", target_id=user_id, details=f"删除管理员: ID={user_id}", ip=request.client.host ) return success_response(message="管理员删除成功") else: return error_response(message="删除失败或管理员不存在") @router.post("/reset-password/{user_id}") async def reset_admin_password(request: Request, user_id: int, req: ResetPasswordRequest): """重置管理员密码(班主任)""" user = await get_current_user(request) is_teacher = await PermissionChecker.check_is_teacher(user["user_id"]) if not is_teacher: return error_response(message="仅班主任可重置密码", code=403) from models.user import UserModel # 获取管理员信息 target_user = await UserModel.get_by_user_id(user_id) if not target_user: return error_response(message="管理员不存在", code=404) if target_user["user_type"] != "admin": return error_response(message="只能重置管理员密码", code=400) # 使用传入的新密码(UserModel.update_password 内部会进行哈希) updated = await UserModel.update_password(user_id, req.new_password) if updated: await LogService.write_operation_log( operator_id=user["user_id"], operator_name=user["real_name"], operator_role="班主任", operation_type="reset_password", target_type="admin", target_id=user_id, details=f"重置管理员密码: {target_user['real_name']}({target_user['username']})", ip=request.client.host ) return success_response(message="密码重置成功") else: return error_response(message="密码重置失败")