Files
ClassManager/backend/routes/admin.py
2026-04-16 11:54:33 +08:00

476 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ===========================================
# 班级操行分管理系统 - 管理端路由
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from fastapi import APIRouter, Request, Query, UploadFile, File
from typing import Optional, List
import json
from middleware.permission import (
get_current_user,
require_teacher,
PermissionChecker
)
from services.admin_service import AdminService
from services.conduct_service import ConductService
from services.homework_service import HomeworkService
from services.attendance_service import AttendanceService
from services.log_service import LogService
from schemas.admin import (
AddPointsRequest, RevokeRequest, AddAdminRequest,
AddStudentRequest,
UpdateHomeworkStatusRequest, AddAttendanceRequest,
UpdateAdminRequest, DeleteAdminRequest, ResetPasswordRequest
)
from utils.response import success_response, error_response
from utils.logger import get_logger
from config import settings
router = APIRouter()
logger = get_logger(__name__)
# ========== 学生管理 ==========
@router.get("/students")
async def get_students(
request: Request,
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=1000),
search: Optional[str] = None
):
"""获取所有学生列表(单班级)"""
user = await get_current_user(request)
result = await AdminService.get_students(page=page, page_size=page_size, search=search)
return success_response(data=result)
@router.post("/students/import")
async def import_students(request: Request, file: UploadFile = File(...)):
"""批量导入学生JSON格式初始操行分默认为60分"""
user = await get_current_user(request)
is_teacher = await PermissionChecker.check_is_teacher(user["user_id"])
if not is_teacher:
return error_response(message="仅班主任可导入学生", code=403)
content = await file.read()
file_size = len(content)
if file_size > settings.MAX_UPLOAD_SIZE:
return error_response(message=f"文件大小不能超过{settings.MAX_UPLOAD_SIZE // 1024 // 1024}MB")
filename = file.filename or ""
extension = filename.split('.')[-1].lower() if '.' in filename else ''
if extension not in settings.ALLOWED_EXTENSIONS:
return error_response(message=f"不支持的文件类型,仅支持 {', '.join(settings.ALLOWED_EXTENSIONS)}")
try:
data = json.loads(content.decode('utf-8'))
students = data.get("students", [])
except json.JSONDecodeError as e:
return error_response(message=f"JSON格式错误: {str(e)}")
except UnicodeDecodeError:
return error_response(message="文件编码错误请使用UTF-8编码")
if not students:
return error_response(message="文件中没有学生数据")
result = await AdminService.import_students(
students=students,
operator_id=user["user_id"],
initial_points=60
)
await LogService.write_operation_log(
operator_id=user["user_id"], operator_name=user["username"],
operator_role="班主任", operation_type="import_students",
target_type="student",
details=f"批量导入: 成功{result['success_count']}人, 失败{result['failed_count']}",
ip=request.client.host
)
return success_response(data=result, message=f"导入完成: 成功{result['success_count']}人,失败{result['failed_count']}")
@router.post("/students")
async def add_student(request: Request, req: AddStudentRequest):
"""新增学生"""
user = await get_current_user(request)
is_teacher = await PermissionChecker.check_is_teacher(user["user_id"])
if not is_teacher:
return error_response(message="仅班主任可新增学生", code=403)
result = await AdminService.add_student(
student_no=req.student_no,
name=req.name,
parent_phone=req.parent_phone,
operator_id=user["user_id"],
initial_points=60
)
if result["success"]:
await LogService.write_operation_log(
operator_id=user["user_id"], operator_name=user["username"],
operator_role="班主任", operation_type="add_student",
target_type="student", target_id=result.get("student_id"),
details=f"新增学生: {req.name}({req.student_no})",
ip=request.client.host
)
return success_response(data=result, message="学生添加成功")
else:
return error_response(message=result["message"])
# ========== 操行分管理 ==========
@router.post("/conduct/add")
async def add_conduct_points(request: Request, req: AddPointsRequest):
"""批量加减分"""
user = await get_current_user(request)
result = await ConductService.add_points(
student_ids=req.student_ids,
points_change=req.points_change,
reason=req.reason,
recorder_id=user["user_id"],
recorder_name=user["username"]
)
if result["success"]:
role = await PermissionChecker.get_user_role(user["user_id"])
await LogService.write_operation_log(
operator_id=user["user_id"], operator_name=user["username"],
operator_role=role, operation_type="add_points",
target_type="conduct",
details=f"批量加减分: {req.points_change}分, 原因: {req.reason}, 对象: {req.student_ids}",
ip=request.client.host
)
return success_response(data=result, message="操作成功")
else:
return error_response(message=result["message"])
@router.post("/conduct/revoke")
async def revoke_conduct_record(request: Request, req: RevokeRequest):
"""撤销扣分记录"""
user = await get_current_user(request)
result = await ConductService.revoke_record(
record_id=req.record_id,
revoker_id=user["user_id"]
)
if result["success"]:
role = await PermissionChecker.get_user_role(user["user_id"])
await LogService.write_operation_log(
operator_id=user["user_id"], operator_name=user["username"],
operator_role=role, operation_type="revoke_record",
target_type="conduct", target_id=req.record_id,
ip=request.client.host
)
return success_response(message="撤销成功")
else:
return error_response(message=result["message"])
@router.get("/conduct/history")
async def get_conduct_history(
request: Request,
student_id: Optional[int] = None,
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=1000),
start_date: Optional[str] = None,
end_date: Optional[str] = None
):
"""获取操行分历史记录"""
try:
user = await get_current_user(request)
result = await ConductService.get_history(
user_id=user["user_id"],
student_id=student_id,
page=page,
page_size=page_size,
start_date=start_date,
end_date=end_date
)
return success_response(data=result)
except Exception as e:
logger.error(f"获取历史记录失败: {e}", exc_info=True)
return error_response(message=f"获取历史记录失败: {str(e)}")
# ========== 作业管理 ==========
@router.get("/homework/assignments")
async def get_assignments(request: Request):
"""获取作业列表"""
user = await get_current_user(request)
role = await PermissionChecker.get_user_role(user["user_id"])
if role not in ["班主任", "学习委员"]:
return error_response(message="无权限", code=403)
result = await HomeworkService.get_assignments(user["user_id"])
return success_response(data=result)
@router.get("/homework/submissions/{assignment_id}")
async def get_submissions(request: Request, assignment_id: int):
"""获取作业提交记录"""
user = await get_current_user(request)
role = await PermissionChecker.get_user_role(user["user_id"])
if role not in ["班主任", "学习委员"]:
return error_response(message="无权限", code=403)
result = await HomeworkService.get_submissions(
assignment_id=assignment_id,
user_id=user["user_id"]
)
return success_response(data=result)
@router.post("/homework/assignment")
async def create_assignment(
request: Request,
subject_id: int,
title: str,
description: Optional[str] = None,
deadline: str = None
):
"""发布作业(班主任)"""
user = await get_current_user(request)
is_teacher = await PermissionChecker.check_is_teacher(user["user_id"])
if not is_teacher:
return error_response(message="仅班主任可发布作业", code=403)
result = await HomeworkService.create_assignment(
subject_id=subject_id,
title=title,
description=description,
deadline=deadline,
created_by=user["user_id"]
)
if result["success"]:
await LogService.write_operation_log(
operator_id=user["user_id"], operator_name=user["username"],
operator_role="班主任", operation_type="create_assignment",
target_type="homework",
details=f"发布作业: {title}",
ip=request.client.host
)
return success_response(data=result, message="作业发布成功")
else:
return error_response(message=result["message"])
@router.put("/homework/submission")
async def update_submission_status(request: Request, req: UpdateHomeworkStatusRequest):
"""更新作业提交状态(班主任或学习委员)"""
user = await get_current_user(request)
role = await PermissionChecker.get_user_role(user["user_id"])
if role not in ["班主任", "学习委员"]:
return error_response(message="无权进行此操作", code=403)
result = await HomeworkService.update_submission_status(
submission_id=req.submission_id,
status=req.status,
comments=req.comments,
apply_deduction=req.apply_deduction,
operator_id=user["user_id"]
)
if result["success"]:
await LogService.write_operation_log(
operator_id=user["user_id"], operator_name=user["username"],
operator_role=role, operation_type="update_submission",
target_type="homework", target_id=req.submission_id,
details=f"状态: {req.status}",
ip=request.client.host
)
return success_response(message="状态更新成功")
else:
return error_response(message=result["message"])
# ========== 考勤管理 ==========
@router.post("/attendance")
async def add_attendance(request: Request, req: AddAttendanceRequest):
"""添加考勤记录(考勤委员)"""
user = await get_current_user(request)
role = await PermissionChecker.get_user_role(user["user_id"])
if role not in ["班主任", "考勤委员"]:
return error_response(message="无权进行此操作", code=403)
result = await AttendanceService.add_attendance(
student_id=req.student_id,
date=str(req.date),
status=req.status,
reason=req.reason,
apply_deduction=req.apply_deduction,
recorder_id=user["user_id"],
custom_deduction=req.custom_deduction
)
if result["success"]:
await LogService.write_operation_log(
operator_id=user["user_id"], operator_name=user["username"],
operator_role=role, operation_type="add_attendance",
target_type="attendance",
details=f"学生ID: {req.student_id}, 日期: {req.date}, 状态: {req.status}",
ip=request.client.host
)
return success_response(message="考勤记录添加成功")
else:
return error_response(message=result["message"])
@router.get("/attendance/records")
async def get_attendance_records(
request: Request,
date: Optional[str] = None,
student_id: Optional[int] = None
):
"""获取考勤记录"""
user = await get_current_user(request)
result = await AttendanceService.get_records(
user_id=user["user_id"],
date=date,
student_id=student_id
)
return success_response(data=result)
# ========== 管理员管理 ==========
@router.post("/add")
async def add_admin(request: Request, req: AddAdminRequest):
"""添加管理员(班主任)"""
user = await get_current_user(request)
is_teacher = await PermissionChecker.check_is_teacher(user["user_id"])
if not is_teacher:
return error_response(message="仅班主任可添加管理员", code=403)
if req.role_type not in ["班长", "学习委员", "考勤委员", "劳动委员", "志愿委员"]:
return error_response(message="无效的角色类型", code=400)
result = await AdminService.add_admin(
username=req.username,
real_name=req.real_name,
password=req.password,
role_type=req.role_type,
operator_id=user["user_id"]
)
if result["success"]:
await LogService.write_operation_log(
operator_id=user["user_id"], operator_name=user["username"],
operator_role="班主任", operation_type="add_admin",
target_type="admin",
details=f"新增管理员: {req.real_name}({req.username}), 角色: {req.role_type}",
ip=request.client.host
)
return success_response(data=result, message="管理员添加成功")
else:
return error_response(message=result["message"])
@router.get("/list")
async def get_admins(request: Request):
"""获取管理员列表(班主任)"""
try:
user = await get_current_user(request)
is_teacher = await PermissionChecker.check_is_teacher(user["user_id"])
if not is_teacher:
return error_response(message="仅班主任可查看管理员列表", code=403)
result = await AdminService.get_admins()
return success_response(data=result)
except Exception as e:
logger.error(f"获取管理员列表失败: {e}", exc_info=True)
return error_response(message=f"获取管理员列表失败: {str(e)}")
@router.put("/update/{user_id}")
async def update_admin(request: Request, user_id: int, req: UpdateAdminRequest):
"""更新管理员信息(班主任)"""
user = await get_current_user(request)
is_teacher = await PermissionChecker.check_is_teacher(user["user_id"])
if not is_teacher:
return error_response(message="仅班主任可更新管理员", code=403)
if req.role_type not in ["班长", "学习委员", "考勤委员", "劳动委员", "志愿委员"]:
return error_response(message="无效的角色类型", code=400)
from models.admin_role import AdminRoleModel
result = await AdminRoleModel.update_role(
user_id=user_id,
role_type=req.role_type
)
if result:
await LogService.write_operation_log(
operator_id=user["user_id"], operator_name=user["username"],
operator_role="班主任", operation_type="update_admin",
target_type="admin", target_id=user_id,
details=f"更新管理员角色为: {req.role_type}",
ip=request.client.host
)
return success_response(message="管理员更新成功")
else:
return error_response(message="更新失败或管理员不存在")
@router.delete("/delete/{user_id}")
async def delete_admin(request: Request, user_id: int):
"""删除管理员(班主任)"""
user = await get_current_user(request)
is_teacher = await PermissionChecker.check_is_teacher(user["user_id"])
if not is_teacher:
return error_response(message="仅班主任可删除管理员", code=403)
# 防止删除自己
if user_id == user["user_id"]:
return error_response(message="不能删除当前登录的管理员", code=400)
from models.admin_role import AdminRoleModel
from models.user import UserModel
# 先删除角色记录
role_deleted = await AdminRoleModel.delete(user_id)
if role_deleted:
# 再删除用户账号(软删除,将状态设为禁用)
await UserModel.update_status(user_id, 0)
await LogService.write_operation_log(
operator_id=user["user_id"], operator_name=user["username"],
operator_role="班主任", operation_type="delete_admin",
target_type="admin", target_id=user_id,
details=f"删除管理员: ID={user_id}",
ip=request.client.host
)
return success_response(message="管理员删除成功")
else:
return error_response(message="删除失败或管理员不存在")
@router.post("/reset-password/{user_id}")
async def reset_admin_password(request: Request, user_id: int, req: ResetPasswordRequest):
"""重置管理员密码(班主任)"""
user = await get_current_user(request)
is_teacher = await PermissionChecker.check_is_teacher(user["user_id"])
if not is_teacher:
return error_response(message="仅班主任可重置密码", code=403)
from models.user import UserModel
from utils.security import SecurityUtils
# 获取管理员信息
target_user = await UserModel.get_by_user_id(user_id)
if not target_user:
return error_response(message="管理员不存在", code=404)
if target_user["user_type"] != "admin":
return error_response(message="只能重置管理员密码", code=400)
# 使用传入的新密码
new_password = req.new_password
password_hash = SecurityUtils.sha1_md5_password(new_password)
# 更新密码
updated = await UserModel.update_password(user_id, password_hash)
if updated:
await LogService.write_operation_log(
operator_id=user["user_id"], operator_name=user["username"],
operator_role="班主任", operation_type="reset_password",
target_type="admin", target_id=user_id,
details=f"重置管理员密码: {target_user['real_name']}({target_user['username']})",
ip=request.client.host
)
return success_response(message="密码重置成功")
else:
return error_response(message="密码重置失败")