Files
ClassManager/backend/services/student_service.py
2026-05-28 15:38:32 +08:00

140 lines
4.5 KiB
Python

# ===========================================
# 班级操行分管理系统 - 后端服务
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
from models.student import StudentModel
from models.conduct import ConductModel
from models.attendance import AttendanceModel
from middleware.permission import PermissionChecker
from utils.database import execute_query
from utils.logger import get_logger
logger = get_logger(__name__)
class StudentService:
"""学生服务"""
@staticmethod
async def get_conduct_history(
student_id: int,
limit: int = 50,
offset: int = 0
) -> Dict[str, Any]:
"""获取学生操行分历史(学生端显示,扣分项操作人显示为班主任)"""
student = await StudentModel.get_by_id(student_id)
if not student:
return {"error": "学生不存在"}
records = await ConductModel.get_student_records(
student_id=student_id,
limit=limit,
offset=offset
)
# 处理记录:扣分项的操作人统一显示为"班主任"
for record in records:
if record["points_change"] < 0: # 扣分项
record["recorder_name"] = "班主任"
# 加分项保持原操作人不变
return {
"student_id": student_id,
"student_name": student["name"],
"total_points": student["total_points"],
"records": records
}
@staticmethod
async def get_homework_status(student_id: int) -> Dict[str, Any]:
"""获取学生作业扣分记录"""
student = await StudentModel.get_by_id(student_id)
if not student:
return {"error": "学生不存在"}
# 查询作业相关的操行分记录
sql = """
SELECT cr.record_id, cr.points_change, cr.reason, cr.created_at,
cr.related_type, cr.recorder_name
FROM conduct_records cr
WHERE cr.student_id = %s AND cr.related_type = 'homework' AND cr.is_revoked = 0
ORDER BY cr.created_at DESC
"""
records = await execute_query(sql, (student_id,))
# 统计
total = len(records)
deductions = sum(1 for r in records if r["points_change"] < 0)
return {
"student_id": student_id,
"student_name": student["name"],
"statistics": {
"total": total,
"deductions": deductions
},
"homework": records
}
@staticmethod
async def get_attendance_records(
student_id: int,
month: Optional[str] = None
) -> Dict[str, Any]:
"""获取学生考勤记录"""
student = await StudentModel.get_by_id(student_id)
if not student:
return {"error": "学生不存在"}
records = await AttendanceModel.get_student_records(
student_id=student_id,
month=month
)
# 统计
present = sum(1 for r in records if r["status"] == "present")
absent = sum(1 for r in records if r["status"] == "absent")
late = sum(1 for r in records if r["status"] == "late")
leave = sum(1 for r in records if r["status"] == "leave")
return {
"student_id": student_id,
"student_name": student["name"],
"statistics": {
"present": present,
"absent": absent,
"late": late,
"leave": leave,
"total": len(records)
},
"records": records
}
@staticmethod
async def get_ranking(
user_id: int,
limit: int = 50
) -> Dict[str, Any]:
"""获取排行榜(单班级系统)"""
ranking = await StudentModel.get_ranking(limit=limit)
total_students = await StudentModel.get_total_count()
return {
"ranking": ranking,
"total_students": total_students
}
@staticmethod
async def get_student_info(student_id: int) -> Optional[Dict[str, Any]]:
"""获取学生个人信息"""
return await StudentModel.get_by_id(student_id)