后端bug修复

This commit is contained in:
2026-04-09 15:17:47 +08:00
parent 6b1b586fe3
commit 93a1af5958
5 changed files with 333 additions and 8 deletions

View File

@@ -1,5 +1,5 @@
# =========================================== # ===========================================
# 班级操行分管理系统 - 后端服务 # 班级操行分管理系统 - 权限验证中间件
# #
# 开发者: Canglan # 开发者: Canglan
# 联系方式: admin@sea-studio.top # 联系方式: admin@sea-studio.top
@@ -10,7 +10,7 @@
# =========================================== # ===========================================
from fastapi import Request, HTTPException from fastapi import Request, HTTPException
from typing import List, Optional, Callable from typing import List, Optional, Callable, Dict, Any
from functools import wraps from functools import wraps
from utils.response import forbidden_response from utils.response import forbidden_response
@@ -20,6 +20,22 @@ from utils.logger import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
async def get_current_user(request: Request) -> Dict[str, Any]:
"""获取当前登录用户信息"""
return {
"user_id": getattr(request.state, 'user_id', None),
"username": getattr(request.state, 'username', None),
"user_type": getattr(request.state, 'user_type', None),
"student_id": getattr(request.state, 'student_id', None),
"role": getattr(request.state, 'role', None)
}
async def get_current_user_id(request: Request) -> int:
"""获取当前用户ID"""
return getattr(request.state, 'user_id', None)
class PermissionChecker: class PermissionChecker:
"""权限检查器""" """权限检查器"""

View File

@@ -9,7 +9,54 @@
# 版权所有 © Sea Network Technology Studio # 版权所有 © Sea Network Technology Studio
# =========================================== # ===========================================
# 在 admin.py 中修改导入接口 from fastapi import APIRouter, Request, Query, UploadFile, File
from typing import Optional, List
import json
from middleware.permission import (
get_current_user,
require_teacher,
require_monitor,
PermissionChecker
)
from services.admin_service import AdminService
from services.conduct_service import ConductService
from services.homework_service import HomeworkService
from services.attendance_service import AttendanceService
from schemas.admin import (
AddPointsRequest, RevokeRequest, AddAdminRequest,
UpdateHomeworkStatusRequest, AddAttendanceRequest
)
from utils.response import success_response, error_response, paginated_response
from utils.logger import get_logger
from config import settings
router = APIRouter()
logger = get_logger(__name__)
# ========== 学生管理 ==========
@router.get("/students")
async def get_students(
request: Request,
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
search: Optional[str] = None
):
"""
获取所有学生列表(单班级)
"""
user = await get_current_user(request)
result = await AdminService.get_students(
page=page,
page_size=page_size,
search=search
)
return success_response(data=result)
@router.post("/students/import") @router.post("/students/import")
async def import_students( async def import_students(
@@ -28,7 +75,6 @@ async def import_students(
return error_response(message="仅班主任可导入学生", code=403) return error_response(message="仅班主任可导入学生", code=403)
# 检查文件大小 # 检查文件大小
file_size = 0
content = await file.read() content = await file.read()
file_size = len(content) file_size = len(content)
@@ -43,7 +89,6 @@ async def import_students(
# 解析JSON # 解析JSON
try: try:
import json
data = json.loads(content.decode('utf-8')) data = json.loads(content.decode('utf-8'))
students = data.get("students", []) students = data.get("students", [])
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
@@ -62,3 +107,268 @@ async def import_students(
) )
return success_response(data=result, message=f"导入完成: 成功{result['success_count']}人,失败{result['failed_count']}") return success_response(data=result, message=f"导入完成: 成功{result['success_count']}人,失败{result['failed_count']}")
@router.post("/students")
async def add_student(
request: Request,
student_no: str,
name: str,
parent_phone: Optional[str] = None
):
"""
新增学生
"""
user = await get_current_user(request)
is_teacher = await PermissionChecker.check_is_teacher(user["user_id"])
if not is_teacher:
return error_response(message="仅班主任可新增学生", code=403)
result = await AdminService.add_student(
student_no=student_no,
name=name,
parent_phone=parent_phone,
operator_id=user["user_id"],
initial_points=60
)
if result["success"]:
return success_response(data=result, message="学生添加成功")
else:
return error_response(message=result["message"])
# ========== 操行分管理 ==========
@router.post("/conduct/add")
async def add_conduct_points(request: Request, req: AddPointsRequest):
"""
批量加减分
"""
user = await get_current_user(request)
result = await ConductService.add_points(
student_ids=req.student_ids,
points_change=req.points_change,
reason=req.reason,
recorder_id=user["user_id"],
recorder_name=user["username"]
)
if result["success"]:
return success_response(data=result, message="操作成功")
else:
return error_response(message=result["message"])
@router.post("/conduct/revoke")
async def revoke_conduct_record(request: Request, req: RevokeRequest):
"""
撤销扣分记录
"""
user = await get_current_user(request)
result = await ConductService.revoke_record(
record_id=req.record_id,
revoker_id=user["user_id"]
)
if result["success"]:
return success_response(message="撤销成功")
else:
return error_response(message=result["message"])
@router.get("/conduct/history")
async def get_conduct_history(
request: Request,
student_id: Optional[int] = None,
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
start_date: Optional[str] = None,
end_date: Optional[str] = None
):
"""
获取操行分历史记录
"""
user = await get_current_user(request)
result = await ConductService.get_history(
user_id=user["user_id"],
student_id=student_id,
page=page,
page_size=page_size,
start_date=start_date,
end_date=end_date
)
return success_response(data=result)
# ========== 作业管理 ==========
@router.get("/homework/assignments")
async def get_assignments(request: Request):
"""
获取作业列表
"""
user = await get_current_user(request)
result = await HomeworkService.get_assignments(user["user_id"])
return success_response(data=result)
@router.get("/homework/submissions/{assignment_id}")
async def get_submissions(request: Request, assignment_id: int):
"""
获取作业提交记录
"""
user = await get_current_user(request)
result = await HomeworkService.get_submissions(
assignment_id=assignment_id,
user_id=user["user_id"]
)
return success_response(data=result)
@router.post("/homework/assignment")
async def create_assignment(
request: Request,
subject_id: int,
title: str,
description: Optional[str] = None,
deadline: str = None
):
"""
发布作业(班主任)
"""
user = await get_current_user(request)
is_teacher = await PermissionChecker.check_is_teacher(user["user_id"])
if not is_teacher:
return error_response(message="仅班主任可发布作业", code=403)
result = await HomeworkService.create_assignment(
subject_id=subject_id,
title=title,
description=description,
deadline=deadline,
created_by=user["user_id"]
)
if result["success"]:
return success_response(data=result, message="作业发布成功")
else:
return error_response(message=result["message"])
@router.put("/homework/submission")
async def update_submission_status(request: Request, req: UpdateHomeworkStatusRequest):
"""
更新作业提交状态(科代表)
"""
user = await get_current_user(request)
result = await HomeworkService.update_submission_status(
submission_id=req.submission_id,
status=req.status,
comments=req.comments,
apply_deduction=req.apply_deduction,
operator_id=user["user_id"]
)
if result["success"]:
return success_response(message="状态更新成功")
else:
return error_response(message=result["message"])
# ========== 考勤管理 ==========
@router.post("/attendance")
async def add_attendance(request: Request, req: AddAttendanceRequest):
"""
添加考勤记录(考勤委员)
"""
user = await get_current_user(request)
result = await AttendanceService.add_attendance(
student_id=req.student_id,
date=str(req.date),
status=req.status,
reason=req.reason,
apply_deduction=req.apply_deduction,
recorder_id=user["user_id"]
)
if result["success"]:
return success_response(message="考勤记录添加成功")
else:
return error_response(message=result["message"])
@router.get("/attendance/records")
async def get_attendance_records(
request: Request,
date: Optional[str] = None,
student_id: Optional[int] = None
):
"""
获取考勤记录
"""
user = await get_current_user(request)
result = await AttendanceService.get_records(
user_id=user["user_id"],
date=date,
student_id=student_id
)
return success_response(data=result)
# ========== 管理员管理 ==========
@router.post("/admin/add")
async def add_admin(request: Request, req: AddAdminRequest):
"""
添加管理员(班主任)
"""
user = await get_current_user(request)
is_teacher = await PermissionChecker.check_is_teacher(user["user_id"])
if not is_teacher:
return error_response(message="仅班主任可添加管理员", code=403)
result = await AdminService.add_admin(
username=req.username,
real_name=req.real_name,
password=req.password,
role_type=req.role_type,
operator_id=user["user_id"]
)
if result["success"]:
return success_response(data=result, message="管理员添加成功")
else:
return error_response(message=result["message"])
@router.get("/admin/list")
async def get_admins(request: Request):
"""
获取管理员列表(班主任)
"""
user = await get_current_user(request)
is_teacher = await PermissionChecker.check_is_teacher(user["user_id"])
if not is_teacher:
return error_response(message="仅班主任可查看管理员列表", code=403)
result = await AdminService.get_admins()
return success_response(data=result)

View File

@@ -1,5 +1,5 @@
# =========================================== # ===========================================
# 班级操行分管理系统 - 后端服务 # 班级操行分管理系统 - 数据库连接池
# #
# 开发者: Canglan # 开发者: Canglan
# 联系方式: admin@sea-studio.top # 联系方式: admin@sea-studio.top
@@ -34,7 +34,6 @@ async def init_db_pool() -> None:
db=settings.DB_NAME, db=settings.DB_NAME,
minsize=1, minsize=1,
maxsize=settings.DB_POOL_SIZE, maxsize=settings.DB_POOL_SIZE,
maxsize=settings.DB_MAX_OVERFLOW,
autocommit=False, autocommit=False,
charset='utf8mb4', charset='utf8mb4',
cursorclass=aiomysql.DictCursor cursorclass=aiomysql.DictCursor