v1.2版本更新发布

This commit is contained in:
2026-04-22 00:59:29 +08:00
parent 194c076456
commit 4121e9624f
26 changed files with 1323 additions and 61 deletions

View File

@@ -54,6 +54,9 @@ PASSWORD_SALT=your-fixed-salt-string-for-password-hash
# 调试入口配置
# ===========================================
# 调试功能开关 - 设为 true 启用调试路由,生产环境必须为 false
DEBUG_ENABLED=false
# 调试入口路径 - 自定义随机路径增强安全性
DEBUG_PATH=/a7k9x2m4q8w1e3r5t6y7u8i9o0p1z2x3
# ===========================================
@@ -83,12 +86,26 @@ LABOR_POINTS_ADD=1
LABOR_POINTS_SUBTRACT=-1
# ===========================================
# 班长加减分限制配置
# 各角色加减分限制配置
# ===========================================
# 班长单次加分上限
MONITOR_MAX_ADD=5
# 班长单次扣分上限(负数)
MONITOR_MAX_SUBTRACT=-5
# 学习委员单次加减分上限(绝对值)- 正负均不可超过此值
STUDY_COMMISSIONER_MAX_POINTS=5
# 考勤委员单次扣分上限(绝对值)
ATTENDANCE_REP_MAX_POINTS=5
# 劳动委员单次加减分上限(绝对值)
LABOR_REP_MAX_POINTS=1
# 志愿委员单次加分上限
VOLUNTEER_REP_MAX_POINTS=5
# ===========================================
# 日志配置
# ===========================================

View File

@@ -47,6 +47,7 @@ class Settings:
JWT_EXPIRE_MINUTES: int = int(os.getenv("JWT_EXPIRE_MINUTES", "30"))
PASSWORD_SALT: str = os.getenv("PASSWORD_SALT", "")
DEBUG_ENABLED: bool = os.getenv("DEBUG_ENABLED", "False").lower() == "true"
DEBUG_PATH: str = os.getenv("DEBUG_PATH", "/debug_add_admin")
DEDUCTION_HOMEWORK_NOT_SUBMIT: int = int(os.getenv("DEDUCTION_HOMEWORK_NOT_SUBMIT", "2"))
@@ -61,6 +62,11 @@ class Settings:
MONITOR_MAX_ADD: int = int(os.getenv("MONITOR_MAX_ADD", "5"))
MONITOR_MAX_SUBTRACT: int = int(os.getenv("MONITOR_MAX_SUBTRACT", "-5"))
STUDY_COMMISSIONER_MAX_POINTS: int = int(os.getenv("STUDY_COMMISSIONER_MAX_POINTS", "5"))
ATTENDANCE_REP_MAX_POINTS: int = int(os.getenv("ATTENDANCE_REP_MAX_POINTS", "5"))
LABOR_REP_MAX_POINTS: int = int(os.getenv("LABOR_REP_MAX_POINTS", "1"))
VOLUNTEER_REP_MAX_POINTS: int = int(os.getenv("VOLUNTEER_REP_MAX_POINTS", "5"))
LOG_LEVEL: str = os.getenv("LOG_LEVEL", "INFO")
LOG_MAX_BYTES: int = int(os.getenv("LOG_MAX_BYTES", "104857600"))
LOG_BACKUP_COUNT: int = int(os.getenv("LOG_BACKUP_COUNT", "30"))

View File

@@ -21,7 +21,7 @@ from utils.logger import setup_logger, log_access
from utils.database import init_db_pool, close_db_pool
from utils.redis_client import init_redis_pool, close_redis_pool
from middleware.auth_middleware import AuthMiddleware
from routes import auth, student, parent, admin, subject, debug
from routes import auth, student, parent, admin, subject, semester, debug
# 设置日志
@@ -116,6 +116,7 @@ app.include_router(student.router, prefix="/api/student", tags=["学生端"])
app.include_router(parent.router, prefix="/api/parent", tags=["家长端"])
app.include_router(admin.router, prefix="/api/admin", tags=["管理端"])
app.include_router(subject.router, prefix="/api/subject", tags=["科目管理"])
app.include_router(semester.router, prefix="/api/semester", tags=["学期管理"])
app.include_router(debug.router, tags=["调试"])

View File

@@ -36,11 +36,10 @@ def is_public_path(path: str) -> bool:
for pattern in PUBLIC_PATHS:
if re.match(pattern, path):
return True
# 动态匹配调试入口路径
if settings.DEBUG_PATH and path == settings.DEBUG_PATH:
# 动态匹配调试入口路径(需同时启用调试功能)
if settings.DEBUG_ENABLED and settings.DEBUG_PATH and path == settings.DEBUG_PATH:
return True
return False
return False
class AuthMiddleware(BaseHTTPMiddleware):

View File

@@ -32,12 +32,11 @@ class AdminRoleModel:
sql = """
SELECT ar.*, u.real_name, u.username, s.subject_name
FROM admin_roles ar
JOIN users u ON ar.user_id = u.user_id
JOIN users u ON ar.user_id = u.user_id AND u.status = 1
LEFT JOIN subjects s ON ar.subject_id = s.subject_id
ORDER BY ar.role_type
"""
return await execute_query(sql)
@staticmethod
async def create(user_id: int, role_type: str, subject_id: int = None) -> int:
sql = """

166
backend/models/semester.py Normal file
View File

@@ -0,0 +1,166 @@
# ===========================================
# 班级操行分管理系统 - 学期数据模型
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from typing import Optional, List, Dict, Any
from utils.database import execute_one, execute_query, execute_insert, execute_update, execute_many
from utils.logger import get_logger
logger = get_logger(__name__)
class SemesterModel:
"""学期数据模型"""
@staticmethod
async def create(
semester_name: str,
start_date: str = None,
end_date: str = None
) -> int:
"""创建学期"""
sql = """
INSERT INTO semesters (semester_name, start_date, end_date)
VALUES (%s, %s, %s)
"""
return await execute_insert(sql, (semester_name, start_date, end_date))
@staticmethod
async def get_by_id(semester_id: int) -> Optional[Dict[str, Any]]:
"""根据ID获取学期信息"""
sql = "SELECT * FROM semesters WHERE semester_id = %s"
return await execute_one(sql, (semester_id,))
@staticmethod
async def get_all() -> List[Dict[str, Any]]:
"""获取所有学期列表"""
sql = """
SELECT semester_id, semester_name, start_date, end_date,
is_active, is_archived, created_at
FROM semesters
ORDER BY created_at DESC
"""
return await execute_query(sql)
@staticmethod
async def get_active() -> Optional[Dict[str, Any]]:
"""获取当前活跃学期"""
sql = """
SELECT semester_id, semester_name, start_date, end_date,
is_active, is_archived, created_at
FROM semesters
WHERE is_active = 1 AND is_archived = 0
LIMIT 1
"""
return await execute_one(sql)
@staticmethod
async def deactivate_all() -> int:
"""将所有学期设为非活跃"""
sql = "UPDATE semesters SET is_active = 0 WHERE is_active = 1"
return await execute_update(sql)
@staticmethod
async def activate(semester_id: int) -> bool:
"""设为当前活跃学期"""
sql = """
UPDATE semesters SET is_active = 1
WHERE semester_id = %s AND is_archived = 0
"""
result = await execute_update(sql, (semester_id,))
return result > 0
@staticmethod
async def archive(semester_id: int) -> bool:
"""归档学期"""
sql = """
UPDATE semesters SET is_archived = 1, is_active = 0
WHERE semester_id = %s AND is_archived = 0
"""
result = await execute_update(sql, (semester_id,))
return result > 0
@staticmethod
async def is_archived(semester_id: int) -> bool:
"""检查学期是否已归档"""
sql = "SELECT is_archived FROM semesters WHERE semester_id = %s"
result = await execute_one(sql, (semester_id,))
if not result:
return False
return bool(result['is_archived'])
@staticmethod
async def get_record_semester_id(record_id: int) -> Optional[int]:
"""获取操行分记录所属的学期ID"""
sql = "SELECT semester_id FROM conduct_records WHERE record_id = %s"
result = await execute_one(sql, (record_id,))
return result['semester_id'] if result else None
class SemesterArchiveModel:
"""学期归档快照数据模型"""
@staticmethod
async def batch_create(archives_data: List[Dict]) -> int:
"""批量创建归档快照"""
if not archives_data:
return 0
sql = """
INSERT INTO semester_archives
(semester_id, student_id, student_no, student_name, final_points, rank_position, total_students)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
params_list = [
(
a['semester_id'], a['student_id'], a['student_no'],
a['student_name'], a['final_points'],
a.get('rank_position'), a.get('total_students')
)
for a in archives_data
]
return await execute_many(sql, params_list)
@staticmethod
async def get_by_semester(semester_id: int) -> List[Dict[str, Any]]:
"""获取学期的归档数据"""
sql = """
SELECT archive_id, semester_id, student_id, student_no,
student_name, final_points, rank_position, total_students, archived_at
FROM semester_archives
WHERE semester_id = %s
ORDER BY rank_position ASC
"""
return await execute_query(sql, (semester_id,))
@staticmethod
async def get_by_semester_and_student(semester_id: int, student_id: int) -> Optional[Dict[str, Any]]:
"""获取指定学期指定学生的归档数据"""
sql = """
SELECT archive_id, semester_id, student_id, student_no,
student_name, final_points, rank_position, total_students, archived_at
FROM semester_archives
WHERE semester_id = %s AND student_id = %s
"""
return await execute_one(sql, (semester_id, student_id))
@staticmethod
async def get_by_student(student_id: int) -> List[Dict[str, Any]]:
"""获取学生在所有已归档学期的数据"""
sql = """
SELECT sa.archive_id, sa.semester_id, sa.student_id, sa.student_no,
sa.student_name, sa.final_points, sa.rank_position,
sa.total_students, sa.archived_at,
s.semester_name, s.start_date, s.end_date
FROM semester_archives sa
JOIN semesters s ON sa.semester_id = s.semester_id
WHERE sa.student_id = %s
ORDER BY sa.archived_at DESC
"""
return await execute_query(sql, (student_id,))

View File

@@ -120,6 +120,13 @@ class StudentModel:
row['rank'] = i + 1
return results
@staticmethod
async def get_total_count() -> int:
"""获取活跃学生总数"""
sql = "SELECT COUNT(*) as total FROM students WHERE status = 1"
result = await execute_one(sql)
return result["total"] if result else 0
@staticmethod
async def batch_create(students_data: List[Dict], initial_points: int = 60) -> List[Dict]:
"""批量创建学生"""

View File

@@ -161,10 +161,17 @@ async def revoke_conduct_record(request: Request, req: RevokeRequest):
)
if result["success"]:
role = await PermissionChecker.get_user_role(user["user_id"])
record = result.get("record", {})
await LogService.write_operation_log(
operator_id=user["user_id"], operator_name=user["username"],
operator_role=role, operation_type="revoke_record",
target_type="conduct", target_id=req.record_id,
details=(
f"撤销记录ID: {req.record_id}, "
f"原操作人: {record.get('recorder_name', '未知')}, "
f"原分值变动: {'+' if record.get('points_change', 0) > 0 else ''}{record.get('points_change', 0)}分, "
f"撤销操作人: {user['username']}"
),
ip=request.client.host
)
return success_response(message="撤销成功")
@@ -447,7 +454,6 @@ async def reset_admin_password(request: Request, user_id: int, req: ResetPasswor
return error_response(message="仅班主任可重置密码", code=403)
from models.user import UserModel
from utils.security import SecurityUtils
# 获取管理员信息
target_user = await UserModel.get_by_user_id(user_id)
@@ -457,12 +463,8 @@ async def reset_admin_password(request: Request, user_id: int, req: ResetPasswor
if target_user["user_type"] != "admin":
return error_response(message="只能重置管理员密码", code=400)
# 使用传入的新密码
new_password = req.new_password
password_hash = SecurityUtils.sha1_md5_password(new_password)
# 更新密码
updated = await UserModel.update_password(user_id, password_hash)
# 使用传入的新密码UserModel.update_password 内部会进行哈希)
updated = await UserModel.update_password(user_id, req.new_password)
if updated:
await LogService.write_operation_log(
operator_id=user["user_id"], operator_name=user["username"],

View File

@@ -32,6 +32,11 @@ class AddAdminDebugRequest(BaseModel):
@router.post(settings.DEBUG_PATH)
async def debug_add_admin(request: Request, req: AddAdminDebugRequest):
# 检查调试功能是否启用
if not settings.DEBUG_ENABLED:
from fastapi.responses import JSONResponse
return JSONResponse(status_code=404, content={"detail": "Not Found"})
from models.user import UserModel
valid_roles = ["班主任", "班长", "学习委员", "考勤委员", "劳动委员", "志愿委员"]

151
backend/routes/semester.py Normal file
View File

@@ -0,0 +1,151 @@
# ===========================================
# 班级操行分管理系统 - 学期管理路由
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from fastapi import APIRouter, Request, Query
from typing import Optional
from middleware.permission import (
get_current_user,
PermissionChecker
)
from services.semester_service import SemesterService
from services.log_service import LogService
from schemas.semester import CreateSemesterRequest
from utils.response import success_response, error_response
from utils.logger import get_logger
router = APIRouter()
logger = get_logger(__name__)
@router.get("/list")
async def list_semesters(request: Request):
"""获取学期列表(仅班主任)"""
user = await get_current_user(request)
is_teacher = await PermissionChecker.check_is_teacher(user["user_id"])
if not is_teacher:
return error_response(message="仅班主任可查看学期列表", code=403)
result = await SemesterService.list_semesters()
if result["success"]:
return success_response(data=result["semesters"])
else:
return error_response(message=result["message"])
@router.get("/active")
async def get_active_semester(request: Request):
"""获取当前活跃学期"""
user = await get_current_user(request)
result = await SemesterService.get_active_semester()
if result["success"]:
return success_response(data=result.get("semester"))
else:
return error_response(message=result["message"])
@router.post("/create")
async def create_semester(request: Request, req: CreateSemesterRequest):
"""创建学期(班主任)"""
user = await get_current_user(request)
is_teacher = await PermissionChecker.check_is_teacher(user["user_id"])
if not is_teacher:
return error_response(message="仅班主任可创建学期", code=403)
result = await SemesterService.create_semester(
semester_name=req.semester_name,
start_date=req.start_date,
end_date=req.end_date,
operator_id=user["user_id"]
)
if result["success"]:
await LogService.write_operation_log(
operator_id=user["user_id"], operator_name=user["username"],
operator_role="班主任", operation_type="create_semester",
target_type="semester", target_id=result.get("semester_id"),
details=f"创建学期: {req.semester_name}",
ip=request.client.host
)
return success_response(data=result, message="学期创建成功")
else:
return error_response(message=result["message"])
@router.put("/activate/{semester_id}")
async def activate_semester(request: Request, semester_id: int):
"""设为当前学期(班主任)"""
user = await get_current_user(request)
is_teacher = await PermissionChecker.check_is_teacher(user["user_id"])
if not is_teacher:
return error_response(message="仅班主任可设置当前学期", code=403)
result = await SemesterService.activate_semester(
semester_id=semester_id,
operator_id=user["user_id"]
)
if result["success"]:
await LogService.write_operation_log(
operator_id=user["user_id"], operator_name=user["username"],
operator_role="班主任", operation_type="activate_semester",
target_type="semester", target_id=semester_id,
details=f"激活学期ID: {semester_id}",
ip=request.client.host
)
return success_response(message=result["message"])
else:
return error_response(message=result["message"])
@router.post("/archive/{semester_id}")
async def archive_semester(request: Request, semester_id: int):
"""归档学期(班主任)"""
user = await get_current_user(request)
is_teacher = await PermissionChecker.check_is_teacher(user["user_id"])
if not is_teacher:
return error_response(message="仅班主任可归档学期", code=403)
result = await SemesterService.archive_semester(
semester_id=semester_id,
operator_id=user["user_id"]
)
if result["success"]:
await LogService.write_operation_log(
operator_id=user["user_id"], operator_name=user["username"],
operator_role="班主任", operation_type="archive_semester",
target_type="semester", target_id=semester_id,
details=f"归档学期ID: {semester_id}",
ip=request.client.host
)
return success_response(message=result["message"])
else:
return error_response(message=result["message"])
@router.get("/archive/{semester_id}/records")
async def get_archive_records(
request: Request,
semester_id: int,
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=200)
):
"""查看归档数据(仅班主任)"""
user = await get_current_user(request)
is_teacher = await PermissionChecker.check_is_teacher(user["user_id"])
if not is_teacher:
return error_response(message="仅班主任可查看归档数据", code=403)
result = await SemesterService.get_archive_records(
semester_id=semester_id,
page=page,
page_size=page_size
)
if result["success"]:
return success_response(data=result["data"])
else:
return error_response(message=result["message"])

View File

@@ -119,4 +119,20 @@ async def get_my_info(request: Request):
result = await StudentService.get_student_info(user["student_id"])
return success_response(data=result)
return success_response(data=result)
@router.get("/semester-records")
async def get_student_semester_records(request: Request):
"""
获取当前学生的历史学期归档记录
"""
user = await get_current_user(request)
if user["user_type"] != "student":
return error_response(message="仅限学生访问", code=403)
from models.semester import SemesterArchiveModel
records = await SemesterArchiveModel.get_by_student(user["student_id"])
return success_response(data={"records": records})

View File

@@ -0,0 +1,20 @@
# ===========================================
# 班级操行分管理系统 - 学期请求模型
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from pydantic import BaseModel, Field
from typing import Optional
class CreateSemesterRequest(BaseModel):
"""创建学期请求"""
semester_name: str = Field(..., min_length=1, max_length=100, description="学期名称")
start_date: Optional[str] = Field(None, description="学期开始日期 (YYYY-MM-DD)")
end_date: Optional[str] = Field(None, description="学期结束日期 (YYYY-MM-DD)")

View File

@@ -52,17 +52,25 @@ class ConductService:
if points_change > settings.MONITOR_MAX_ADD or points_change < settings.MONITOR_MAX_SUBTRACT:
return {"success": False, "message": f"班长单次只能加减{settings.MONITOR_MAX_ADD}分以内"}
elif role == "劳动委员":
# 劳动委员固定 ±1分
if points_change not in [settings.LABOR_POINTS_ADD, settings.LABOR_POINTS_SUBTRACT]:
return {"success": False, "message": "劳动委员只能进行±1分操作"}
# 劳动委员可加减分±LABOR_REP_MAX_POINTS以内
if abs(points_change) > settings.LABOR_REP_MAX_POINTS:
return {"success": False, "message": f"劳动委员单次只能加减{settings.LABOR_REP_MAX_POINTS}分以内"}
elif role == "志愿委员":
# 志愿委员只能加分,不限制正分上限
# 志愿委员只能加分,上限VOLUNTEER_REP_MAX_POINTS
if points_change < 0:
return {"success": False, "message": "志愿委员只能加分"}
elif role in ["学习委员", "考勤委员"]:
# 学习委员和考勤委员只能扣分
if points_change > settings.VOLUNTEER_REP_MAX_POINTS:
return {"success": False, "message": f"志愿委员单次最多加{settings.VOLUNTEER_REP_MAX_POINTS}"}
elif role == "学习委员":
# 学习委员可加减分±STUDY_COMMISSIONER_MAX_POINTS以内
if abs(points_change) > settings.STUDY_COMMISSIONER_MAX_POINTS:
return {"success": False, "message": f"学习委员单次只能加减{settings.STUDY_COMMISSIONER_MAX_POINTS}分以内"}
elif role == "考勤委员":
# 考勤委员只能扣分上限ATTENDANCE_REP_MAX_POINTS
if points_change > 0:
return {"success": False, "message": "该角色只能进行扣分操作"}
return {"success": False, "message": "考勤委员只能进行扣分操作"}
if abs(points_change) > settings.ATTENDANCE_REP_MAX_POINTS:
return {"success": False, "message": f"考勤委员单次最多扣{settings.ATTENDANCE_REP_MAX_POINTS}"}
else:
return {"success": False, "message": "无权进行此操作"}
@@ -121,6 +129,9 @@ class ConductService:
if not record:
return {"success": False, "message": "记录不存在"}
# 归档后班主任仍可撤销/修改记录(任务需求#8
# 归档操作本身不可逆,但归档数据可由班主任修改
# 撤销记录
result = await ConductModel.revoke_record(record_id, revoker_id)
@@ -128,7 +139,16 @@ class ConductService:
# 反向恢复学生总分
await StudentModel.update_total_points(record["student_id"], -record["points_change"])
logger.info(f"用户[{revoker_id}] 撤销了记录[{record_id}]")
return {"success": True, "message": "撤销成功"}
return {
"success": True,
"message": "撤销成功",
"record": {
"student_id": record["student_id"],
"recorder_name": record.get("recorder_name", "未知"),
"points_change": record["points_change"],
"reason": record.get("reason", "")
}
}
else:
return {"success": False, "message": "撤销失败"}

View File

@@ -9,6 +9,7 @@
# 版权所有 © Sea Network Technology Studio
# ===========================================
import math
from typing import Dict, Any, Optional, List
from models.user import UserModel
@@ -93,22 +94,27 @@ class ParentService:
# 获取全班排名
ranking = await StudentModel.get_ranking(limit=1000)
total_students = await StudentModel.get_total_count()
# 查找当前学生排名
student_rank = None
total_students = 0
for r in ranking:
total_students += 1
if r["student_id"] == user["student_id"]:
student_rank = r["rank"]
# 计算百分比排名
percentile = None
if student_rank and total_students and total_students > 0:
percentile = math.ceil(student_rank / total_students * 100)
return {
"student_id": student["student_id"],
"student_name": student["name"],
"student_no": student["student_no"],
"total_points": student["total_points"],
"rank": student_rank,
"total_students": total_students
"total_students": total_students,
"percentile": percentile
}
@staticmethod

View File

@@ -0,0 +1,244 @@
# ===========================================
# 班级操行分管理系统 - 学期服务
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from typing import Dict, Any, List, Optional
from models.semester import SemesterModel, SemesterArchiveModel
from models.student import StudentModel
from middleware.permission import PermissionChecker
from config import settings
from utils.logger import get_logger
from utils.database import execute_query
logger = get_logger(__name__)
class SemesterService:
"""学期管理服务"""
@staticmethod
async def list_semesters() -> Dict[str, Any]:
"""获取学期列表"""
try:
semesters = await SemesterModel.get_all()
return {
"success": True,
"semesters": semesters
}
except Exception as e:
logger.error(f"获取学期列表失败: {e}")
return {"success": False, "message": f"获取学期列表失败: {str(e)}"}
@staticmethod
async def create_semester(
semester_name: str,
start_date: str = None,
end_date: str = None,
operator_id: int = None
) -> Dict[str, Any]:
"""创建新学期"""
if not semester_name or not semester_name.strip():
return {"success": False, "message": "学期名称不能为空"}
try:
# 自动将之前的活跃学期设为非活跃
await SemesterModel.deactivate_all()
# 创建新学期并自动设为活跃
semester_id = await SemesterModel.create(
semester_name=semester_name.strip(),
start_date=start_date,
end_date=end_date
)
# 设为活跃学期
await SemesterModel.activate(semester_id)
logger.info(f"用户[{operator_id}] 创建了新学期: {semester_name}")
return {
"success": True,
"message": "学期创建成功",
"semester_id": semester_id
}
except Exception as e:
logger.error(f"创建学期失败: {e}")
return {"success": False, "message": f"创建学期失败: {str(e)}"}
@staticmethod
async def activate_semester(
semester_id: int,
operator_id: int = None
) -> Dict[str, Any]:
"""设为当前活跃学期"""
try:
# 检查学期是否存在
semester = await SemesterModel.get_by_id(semester_id)
if not semester:
return {"success": False, "message": "学期不存在"}
# 已归档的学期不能激活
if semester['is_archived']:
return {"success": False, "message": "已归档的学期不能设为当前学期"}
# 自动将之前的活跃学期设为非活跃
await SemesterModel.deactivate_all()
# 设为活跃
result = await SemesterModel.activate(semester_id)
if result:
logger.info(f"用户[{operator_id}] 激活了学期: {semester['semester_name']}")
return {"success": True, "message": "已设为当前学期"}
else:
return {"success": False, "message": "激活失败"}
except Exception as e:
logger.error(f"激活学期失败: {e}")
return {"success": False, "message": f"激活学期失败: {str(e)}"}
@staticmethod
async def archive_semester(
semester_id: int,
operator_id: int = None
) -> Dict[str, Any]:
"""归档学期"""
try:
# 检查学期是否存在
semester = await SemesterModel.get_by_id(semester_id)
if not semester:
return {"success": False, "message": "学期不存在"}
# 已归档的不能重复归档
if semester['is_archived']:
return {"success": False, "message": "该学期已归档"}
# 获取所有活跃学生及其当前分数
students = await StudentModel.get_all(include_disabled=False)
if not students:
return {"success": False, "message": "没有可归档的学生数据"}
total_students = len(students)
# 按分数降序排列以计算排名
sorted_students = sorted(students, key=lambda s: s['total_points'], reverse=True)
# 构建归档快照数据
archives_data = []
for rank, student in enumerate(sorted_students, 1):
archives_data.append({
'semester_id': semester_id,
'student_id': student['student_id'],
'student_no': student['student_no'],
'student_name': student['name'],
'final_points': student['total_points'],
'rank_position': rank,
'total_students': total_students
})
# 保存归档快照
await SemesterArchiveModel.batch_create(archives_data)
# 标记学期为已归档
await SemesterModel.archive(semester_id)
logger.info(
f"用户[{operator_id}] 归档了学期: {semester['semester_name']}, "
f"{total_students} 名学生"
)
return {
"success": True,
"message": f"学期归档成功,共归档 {total_students} 名学生数据"
}
except Exception as e:
logger.error(f"归档学期失败: {e}")
return {"success": False, "message": f"归档学期失败: {str(e)}"}
@staticmethod
async def get_archive_records(
semester_id: int,
page: int = 1,
page_size: int = 50
) -> Dict[str, Any]:
"""获取归档数据(只读)"""
try:
# 检查学期是否存在
semester = await SemesterModel.get_by_id(semester_id)
if not semester:
return {"success": False, "message": "学期不存在"}
archives = await SemesterArchiveModel.get_by_semester(semester_id)
total = len(archives)
offset = (page - 1) * page_size
paged_archives = archives[offset:offset + page_size]
return {
"success": True,
"data": {
"semester": {
"semester_id": semester['semester_id'],
"semester_name": semester['semester_name'],
"start_date": semester.get('start_date'),
"end_date": semester.get('end_date'),
"is_archived": bool(semester['is_archived'])
},
"archives": paged_archives,
"total": total,
"page": page,
"page_size": page_size,
"total_pages": (total + page_size - 1) // page_size
}
}
except Exception as e:
logger.error(f"获取归档数据失败: {e}")
return {"success": False, "message": f"获取归档数据失败: {str(e)}"}
@staticmethod
async def reset_student_points(initial_points: int = None) -> Dict[str, Any]:
"""重置所有学生的操行分为初始分"""
if initial_points is None:
initial_points = settings.STUDENT_INITIAL_POINTS
try:
from utils.database import execute_update
sql = "UPDATE students SET total_points = %s WHERE status = 1"
affected = await execute_update(sql, (initial_points,))
logger.info(f"已重置 {affected} 名学生的操行分为 {initial_points}")
return {
"success": True,
"message": f"已重置 {affected} 名学生的操行分为 {initial_points}",
"affected": affected
}
except Exception as e:
logger.error(f"重置学生分数失败: {e}")
return {"success": False, "message": f"重置学生分数失败: {str(e)}"}
@staticmethod
async def get_active_semester() -> Dict[str, Any]:
"""获取当前活跃学期"""
try:
active = await SemesterModel.get_active()
if active:
return {
"success": True,
"semester": active
}
else:
return {
"success": True,
"semester": None,
"message": "当前没有活跃学期"
}
except Exception as e:
logger.error(f"获取活跃学期失败: {e}")
return {"success": False, "message": f"获取活跃学期失败: {str(e)}"}

View File

@@ -123,9 +123,11 @@ class StudentService:
) -> Dict[str, Any]:
"""获取排行榜(单班级系统)"""
ranking = await StudentModel.get_ranking(limit=limit)
total_students = await StudentModel.get_total_count()
return {
"ranking": ranking
"ranking": ranking,
"total_students": total_students
}
@staticmethod