v0.1测试

This commit is contained in:
2026-04-07 17:07:13 +08:00
parent 593973f598
commit 6b1b586fe3
80 changed files with 9073 additions and 32 deletions

View File

@@ -0,0 +1,11 @@
# ===========================================
# 班级操行分管理系统 - 后端服务
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================

View File

@@ -0,0 +1,298 @@
# ===========================================
# 班级操行分管理系统 - 管理员服务
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from typing import Dict, Any, List, Optional
from datetime import datetime
from models.user import UserModel
from models.student import StudentModel
from models.admin_role import AdminRoleModel
from utils.security import security
from utils.logger import get_logger
logger = get_logger(__name__)
class AdminService:
"""管理员服务"""
@staticmethod
async def get_students(
page: int = 1,
page_size: int = 20,
search: str = None
) -> Dict[str, Any]:
"""获取所有学生列表(单班级)"""
offset = (page - 1) * page_size
sql = """
SELECT student_id, student_no, name, total_points, parent_phone, status
FROM students
WHERE status = 1
"""
params = []
if search:
sql += " AND (student_no LIKE %s OR name LIKE %s)"
params.extend([f"%{search}%", f"%{search}%"])
sql += " ORDER BY student_no LIMIT %s OFFSET %s"
params.extend([page_size, offset])
students = await execute_query(sql, tuple(params))
# 获取总数
count_sql = "SELECT COUNT(*) as total FROM students WHERE status = 1"
if search:
count_sql += " AND (student_no LIKE %s OR name LIKE %s)"
total_result = await execute_one(count_sql, (f"%{search}%", f"%{search}%"))
else:
total_result = await execute_one(count_sql)
total = total_result["total"] if total_result else 0
return {
"students": students,
"total": total,
"page": page,
"page_size": page_size,
"total_pages": (total + page_size - 1) // page_size
}
@staticmethod
async def import_students(
students: List[Dict],
operator_id: int,
initial_points: int = 60
) -> Dict[str, Any]:
"""
批量导入学生(单班级版本)
初始操行分默认60分
"""
results = []
success_count = 0
for student in students:
student_no = student.get("student_no", "").strip()
name = student.get("name", "").strip()
parent_phone = student.get("parent_phone", "").strip()
password = student.get("password", "").strip()
# 验证必填字段
if not student_no or not name:
results.append({
"student_no": student_no,
"success": False,
"error": "学号或姓名不能为空"
})
continue
# 验证学号格式
if not security.validate_student_no(student_no):
results.append({
"student_no": student_no,
"success": False,
"error": "学号格式错误4-20位字母数字组合"
})
continue
# 验证手机号格式(如果有)
if parent_phone and not security.validate_phone(parent_phone):
results.append({
"student_no": student_no,
"success": False,
"error": "手机号格式错误"
})
continue
# 检查学号是否已存在
existing = await StudentModel.get_by_student_no(student_no)
if existing:
results.append({
"student_no": student_no,
"success": False,
"error": "学号已存在"
})
continue
# 设置初始密码
init_password = password if password else "123456"
# 验证密码强度(可选)
is_valid, msg = security.validate_password_strength(init_password)
if not is_valid:
results.append({
"student_no": student_no,
"success": False,
"error": f"密码不符合要求: {msg}"
})
continue
# 创建学生初始操行分60分
student_id = await StudentModel.create(
student_no=student_no,
name=name,
class_id=1, # 单班级固定为1
parent_phone=parent_phone if parent_phone else None,
initial_points=initial_points
)
# 创建学生账号
await UserModel.create_student(
username=student_no,
password=init_password,
real_name=name,
student_id=student_id
)
# 创建家长账号(如果有手机号)
if parent_phone:
parent_exists = await UserModel.get_by_username(parent_phone)
if not parent_exists:
parent_name = f"{name}家长"
await UserModel.create_parent(
username=parent_phone,
password=init_password,
real_name=parent_name,
student_id=student_id
)
else:
# 手机号已被占用
results.append({
"student_no": student_no,
"success": True,
"student_id": student_id,
"warning": f"家长手机号 {parent_phone} 已被其他账号使用,未创建家长账号"
})
success_count += 1
continue
results.append({
"student_no": student_no,
"success": True,
"student_id": student_id,
"parent_phone": parent_phone if parent_phone else None
})
success_count += 1
logger.info(f"用户[{operator_id}] 导入学生: {student_no} - {name} (初始分:{initial_points})")
return {
"success": True,
"total": len(students),
"success_count": success_count,
"failed_count": len(students) - success_count,
"results": results
}
@staticmethod
async def add_student(
student_no: str,
name: str,
parent_phone: Optional[str],
operator_id: int,
initial_points: int = 60
) -> Dict[str, Any]:
"""新增学生"""
# 验证学号格式
if not security.validate_student_no(student_no):
return {"success": False, "message": "学号格式错误4-20位字母数字组合"}
# 验证手机号格式(如果有)
if parent_phone and not security.validate_phone(parent_phone):
return {"success": False, "message": "手机号格式错误"}
# 检查学号是否已存在
existing = await StudentModel.get_by_student_no(student_no)
if existing:
return {"success": False, "message": "学号已存在"}
# 创建学生初始操行分60分
student_id = await StudentModel.create(
student_no=student_no,
name=name,
class_id=1, # 单班级固定为1
parent_phone=parent_phone if parent_phone else None,
initial_points=initial_points
)
# 创建学生账号
await UserModel.create_student(
username=student_no,
password="123456",
real_name=name,
student_id=student_id
)
# 创建家长账号
if parent_phone:
parent_exists = await UserModel.get_by_username(parent_phone)
if not parent_exists:
await UserModel.create_parent(
username=parent_phone,
password="123456",
real_name=f"{name}家长",
student_id=student_id
)
logger.info(f"用户[{operator_id}] 新增学生: {student_no} - {name}")
return {"success": True, "student_id": student_id, "student_no": student_no, "name": name}
@staticmethod
async def add_admin(
username: str,
real_name: str,
password: Optional[str],
role_type: str,
operator_id: int
) -> Dict[str, Any]:
"""添加管理员(单班级)"""
# 检查用户名是否已存在
existing = await UserModel.get_by_username(username)
if existing:
return {"success": False, "message": "用户名已存在"}
# 生成随机密码(如果未提供)
if not password:
password = security.generate_random_password()
# 创建管理员账号
user_id = await UserModel.create_admin(
username=username,
password=password,
real_name=real_name
)
# 分配角色班级ID固定为1
await AdminRoleModel.create(
user_id=user_id,
role_type=role_type,
class_id=1, # 单班级固定为1
subject_id=None # 单班级版本暂不支持科目关联
)
logger.info(f"用户[{operator_id}] 添加管理员: {username} ({role_type})")
return {
"success": True,
"user_id": user_id,
"username": username,
"password": password,
"role_type": role_type
}
@staticmethod
async def get_admins() -> Dict[str, Any]:
"""获取管理员列表(单班级)"""
admins = await AdminRoleModel.get_by_class(1) # 班级ID固定为1
return {"admins": admins}

View File

@@ -0,0 +1,114 @@
# ===========================================
# 班级操行分管理系统 - 后端服务
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from typing import Dict, Any, Optional
from datetime import datetime
from models.attendance import AttendanceModel
from models.student import StudentModel
from models.conduct import ConductModel
from middleware.permission import PermissionChecker
from config import settings
from utils.logger import get_logger
logger = get_logger(__name__)
class AttendanceService:
"""考勤服务"""
@staticmethod
async def add_attendance(
student_id: int,
date: str,
status: str,
reason: Optional[str],
apply_deduction: bool,
recorder_id: int
) -> Dict[str, Any]:
"""添加考勤记录"""
# 检查权限
role = await PermissionChecker.get_user_role(recorder_id)
if role not in ["班主任", "考勤委员"]:
return {"success": False, "message": "无权进行此操作"}
# 检查是否同班级
can_manage = await PermissionChecker.check_can_manage_student(recorder_id, student_id)
if not can_manage:
return {"success": False, "message": "无权操作该学生"}
# 添加考勤记录
attendance_id = await AttendanceModel.create_record(
student_id=student_id,
date=date,
status=status,
reason=reason,
recorder_id=recorder_id
)
if not attendance_id:
return {"success": False, "message": "添加考勤记录失败"}
# 应用扣分
if apply_deduction and status in ["absent", "late", "leave"]:
# 确定扣分数值
if status == "absent":
points_change = -settings.DEDUCTION_ATTENDANCE_ABSENT
elif status == "late":
points_change = -settings.DEDUCTION_ATTENDANCE_LATE
else:
points_change = -settings.DEDUCTION_ATTENDANCE_LEAVE
# 创建扣分记录
student = await StudentModel.get_by_id(student_id)
if student:
await ConductModel.create_record(
student_id=student_id,
points_change=points_change,
reason=f"考勤异常: {status}",
recorder_id=recorder_id,
related_type="attendance",
related_id=attendance_id
)
# 标记已应用扣分
await AttendanceModel.mark_deduction_applied(attendance_id)
logger.info(f"用户[{recorder_id}] 添加考勤记录[{attendance_id}] -> {status}")
return {"success": True, "message": "考勤记录添加成功"}
@staticmethod
async def get_records(
user_id: int,
date: Optional[str] = None,
student_id: Optional[int] = None
) -> Dict[str, Any]:
"""获取考勤记录"""
role = await PermissionChecker.get_user_role(user_id)
if role in ["班主任", "考勤委员"]:
class_id = await PermissionChecker.get_user_class_id(user_id)
records = await AttendanceModel.get_class_records(
class_id=class_id,
date=date,
student_id=student_id
)
elif student_id:
# 查看指定学生
can_manage = await PermissionChecker.check_can_manage_student(user_id, student_id)
if not can_manage:
return {"error": "无权查看该学生记录"}
records = await AttendanceModel.get_student_records(student_id)
else:
records = []
return {"records": records}

View File

@@ -0,0 +1,169 @@
# ===========================================
# 班级操行分管理系统 - 后端服务
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from typing import Dict, Any, Optional
from datetime import datetime
from models.user import UserModel
from models.student import StudentModel
from models.admin_role import AdminRoleModel
from utils.security import security
from utils.jwt_handler import jwt_handler
from utils.redis_client import RedisClient
from utils.database import execute_update
from utils.logger import get_logger
logger = get_logger(__name__)
class AuthService:
"""认证服务"""
@staticmethod
async def login(username: str, password: str, ip: str) -> Dict[str, Any]:
"""
用户登录
"""
# 检查登录失败次数
attempts = await RedisClient.get(f"login_attempts:{username}")
if attempts and int(attempts) >= 5:
return {"success": False, "message": "登录失败次数过多请15分钟后重试"}
# 获取用户信息
user = await UserModel.get_by_username(username)
if not user:
await RedisClient.set_login_attempts(username)
return {"success": False, "message": "用户名或密码错误"}
# 验证密码
if not security.verify_password(password, user["password_hash"]):
await RedisClient.set_login_attempts(username)
return {"success": False, "message": "用户名或密码错误"}
# 检查账号状态
if user["status"] != 1:
return {"success": False, "message": "账号已被禁用"}
# 清除登录失败记录
await RedisClient.clear_login_attempts(username)
# 更新最后登录信息
await UserModel.update_last_login(user["user_id"], ip)
# 获取用户角色(如果是管理员)
role = None
if user["user_type"] == "admin":
admin_role = await AdminRoleModel.get_by_user_id(user["user_id"])
role = admin_role["role_type"] if admin_role else None
# 生成Token
token = jwt_handler.create_token(
user_id=user["user_id"],
username=user["username"],
user_type=user["user_type"],
student_id=user["student_id"],
role=role
)
# 存储Token到Redis
await RedisClient.set_user_token(user["user_id"], token)
# 确定跳转路径
redirect = AuthService._get_redirect_path(user["user_type"], role)
return {
"success": True,
"token": token,
"user_id": user["user_id"],
"username": user["username"],
"real_name": user["real_name"],
"user_type": user["user_type"],
"need_change_password": user["need_change_password"] == 1,
"redirect": redirect
}
@staticmethod
async def logout(user_id: int) -> Dict[str, Any]:
"""用户登出"""
await RedisClient.delete_user_token(user_id)
return {"success": True, "message": "登出成功"}
@staticmethod
async def change_password(user_id: int, old_password: str, new_password: str) -> Dict[str, Any]:
"""修改密码"""
# 获取用户信息
user = await UserModel.get_by_user_id(user_id)
if not user:
return {"success": False, "message": "用户不存在"}
# 验证原密码
if not security.verify_password(old_password, user["password_hash"]):
return {"success": False, "message": "原密码错误"}
# 验证新密码强度
is_valid, msg = security.validate_password_strength(new_password)
if not is_valid:
return {"success": False, "message": msg}
# 更新密码
result = await UserModel.update_password(user_id, new_password)
if result:
# 清除所有Token
await RedisClient.delete_user_token(user_id)
return {"success": True, "message": "密码修改成功"}
else:
return {"success": False, "message": "密码修改失败"}
@staticmethod
async def get_user_info(user_id: int) -> Optional[Dict[str, Any]]:
"""获取用户信息"""
user = await UserModel.get_by_user_id(user_id)
if not user:
return None
result = {
"user_id": user["user_id"],
"username": user["username"],
"real_name": user["real_name"],
"user_type": user["user_type"],
"need_change_password": user["need_change_password"] == 1
}
# 获取学生信息
if user["student_id"]:
student = await StudentModel.get_by_id(user["student_id"])
if student:
result["student_no"] = student["student_no"]
result["student_name"] = student["name"]
result["class_id"] = student["class_id"]
result["class_name"] = student["class_name"]
result["total_points"] = student["total_points"]
# 获取管理员角色
if user["user_type"] == "admin":
admin_role = await AdminRoleModel.get_by_user_id(user_id)
if admin_role:
result["role"] = admin_role["role_type"]
result["class_id"] = admin_role["class_id"]
return result
@staticmethod
def _get_redirect_path(user_type: str, role: str = None) -> str:
"""获取跳转路径"""
if user_type == "student":
return "/student/dashboard.php"
elif user_type == "parent":
return "/parent/dashboard.php"
else:
return "/admin/dashboard.php"

View File

@@ -0,0 +1,182 @@
# ===========================================
# 班级操行分管理系统 - 后端服务
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from typing import Dict, Any, List, Optional
from datetime import datetime
from models.student import StudentModel
from models.conduct import ConductModel
from models.user import UserModel
from middleware.permission import PermissionChecker
from config import settings
from utils.logger import get_logger
logger = get_logger(__name__)
class ConductService:
"""操行分服务"""
@staticmethod
async def add_points(
student_ids: List[int],
points_change: int,
reason: str,
recorder_id: int,
recorder_name: str
) -> Dict[str, Any]:
"""
批量加减分
"""
# 验证分值
if points_change == 0:
return {"success": False, "message": "分值不能为0"}
# 获取操作人角色
role = await PermissionChecker.get_user_role(recorder_id)
# 权限验证
if role == "班主任":
# 班主任无限制
pass
elif role == "班长":
# 班长限制 ±5分
if points_change > settings.MONITOR_MAX_ADD or points_change < settings.MONITOR_MAX_SUBTRACT:
return {"success": False, "message": f"班长单次只能加减{settings.MONITOR_MAX_ADD}分以内"}
elif role == "劳动委员":
# 劳动委员固定 ±1分
if points_change not in [settings.LABOR_POINTS_ADD, settings.LABOR_POINTS_SUBTRACT]:
return {"success": False, "message": "劳动委员只能进行±1分操作"}
elif role in ["科代表", "考勤委员"]:
# 科代表和考勤委员只能扣分
if points_change > 0:
return {"success": False, "message": "该角色只能进行扣分操作"}
else:
return {"success": False, "message": "无权进行此操作"}
# 批量处理
success_count = 0
fail_count = 0
details = []
for student_id in student_ids:
try:
# 检查学生是否存在
student = await StudentModel.get_by_id(student_id)
if not student:
details.append({"student_id": student_id, "error": "学生不存在"})
fail_count += 1
continue
# 创建记录
record_id = await ConductModel.create_record(
student_id=student_id,
points_change=points_change,
reason=reason,
recorder_id=recorder_id,
recorder_name=recorder_name
)
details.append({"student_id": student_id, "success": True, "record_id": record_id})
success_count += 1
logger.info(f"用户[{recorder_id}] 对学生[{student_id}] 进行 {points_change} 分操作")
except Exception as e:
details.append({"student_id": student_id, "error": str(e)})
fail_count += 1
return {
"success": True,
"success_count": success_count,
"fail_count": fail_count,
"details": details
}
@staticmethod
async def revoke_record(record_id: int, revoker_id: int) -> Dict[str, Any]:
"""撤销扣分记录"""
# 检查权限
can_revoke = await PermissionChecker.check_can_revoke(revoker_id, record_id)
if not can_revoke:
return {"success": False, "message": "无权撤销此记录"}
# 撤销记录
result = await ConductModel.revoke_record(record_id, revoker_id)
if result:
logger.info(f"用户[{revoker_id}] 撤销了记录[{record_id}]")
return {"success": True, "message": "撤销成功"}
else:
return {"success": False, "message": "撤销失败"}
@staticmethod
async def get_history(
user_id: int,
student_id: Optional[int] = None,
page: int = 1,
page_size: int = 20,
start_date: Optional[str] = None,
end_date: Optional[str] = None
) -> Dict[str, Any]:
"""获取历史记录"""
role = await PermissionChecker.get_user_role(user_id)
offset = (page - 1) * page_size
# 班主任/班长可查看全班
if role in ["班主任", "班长"]:
user_class = await PermissionChecker.get_user_class_id(user_id)
records = await ConductModel.get_all_records(
class_id=user_class,
limit=page_size,
offset=offset,
start_date=start_date,
end_date=end_date
)
# 获取总数
from utils.database import execute_one
count_sql = """
SELECT COUNT(*) as total FROM conduct_records cr
JOIN students s ON cr.student_id = s.student_id
WHERE s.class_id = %s AND cr.is_revoked = 0
"""
total_result = await execute_one(count_sql, (user_class,))
total = total_result["total"] if total_result else 0
elif student_id:
# 查看指定学生(需权限验证)
can_manage = await PermissionChecker.check_can_manage_student(user_id, student_id)
if not can_manage:
return {"error": "无权查看该学生记录"}
records = await ConductModel.get_student_records(
student_id=student_id,
limit=page_size,
offset=offset
)
total = len(await ConductModel.get_student_records(student_id, limit=10000))
else:
# 查看自己提交的记录
records = await ConductModel.get_records_by_recorder(
recorder_id=user_id,
limit=page_size,
offset=offset
)
total = len(await ConductModel.get_records_by_recorder(user_id, limit=10000))
return {
"records": records,
"page": page,
"page_size": page_size,
"total": total,
"total_pages": (total + page_size - 1) // page_size
}

View File

@@ -0,0 +1,131 @@
# ===========================================
# 班级操行分管理系统 - 后端服务
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from typing import Dict, Any, List, Optional
from datetime import datetime
from models.homework import HomeworkModel
from models.student import StudentModel
from models.conduct import ConductModel
from middleware.permission import PermissionChecker
from config import settings
from utils.logger import get_logger
logger = get_logger(__name__)
class HomeworkService:
"""作业服务"""
@staticmethod
async def get_assignments(user_id: int) -> Dict[str, Any]:
"""获取作业列表"""
role = await PermissionChecker.get_user_role(user_id)
if role == "班主任":
class_id = await PermissionChecker.get_user_class_id(user_id)
assignments = await HomeworkModel.get_assignments_by_class(class_id)
elif role == "科代表":
class_id = await PermissionChecker.get_user_class_id(user_id)
subject_ids = await PermissionChecker.get_user_subject_ids(user_id)
assignments = await HomeworkModel.get_assignments_by_subjects(class_id, subject_ids)
else:
assignments = []
return {"assignments": assignments}
@staticmethod
async def create_assignment(
subject_id: int,
title: str,
description: Optional[str],
deadline: str,
created_by: int
) -> Dict[str, Any]:
"""创建作业"""
class_id = await PermissionChecker.get_user_class_id(created_by)
assignment_id = await HomeworkModel.create_assignment(
class_id=class_id,
subject_id=subject_id,
title=title,
description=description,
deadline=deadline,
created_by=created_by
)
if assignment_id:
logger.info(f"用户[{created_by}] 创建作业[{assignment_id}]: {title}")
return {"success": True, "assignment_id": assignment_id}
else:
return {"success": False, "message": "创建作业失败"}
@staticmethod
async def update_submission_status(
submission_id: int,
status: str,
comments: Optional[str],
apply_deduction: bool,
operator_id: int
) -> Dict[str, Any]:
"""更新作业提交状态"""
# 获取提交记录信息
submission = await HomeworkModel.get_submission(submission_id)
if not submission:
return {"success": False, "message": "提交记录不存在"}
# 检查权限
role = await PermissionChecker.get_user_role(operator_id)
if role == "科代表":
# 检查是否管理该科目
subject_ids = await PermissionChecker.get_user_subject_ids(operator_id)
if submission["subject_id"] not in subject_ids:
return {"success": False, "message": "无权操作此作业"}
elif role != "班主任":
return {"success": False, "message": "无权进行此操作"}
# 更新状态
result = await HomeworkModel.update_submission(
submission_id=submission_id,
status=status,
comments=comments,
updated_by=operator_id
)
if not result:
return {"success": False, "message": "更新失败"}
# 应用扣分
if apply_deduction and status in ["not_submitted", "late"]:
# 确定扣分数值
if status == "not_submitted":
points_change = -settings.DEDUCTION_HOMEWORK_NOT_SUBMIT
else:
points_change = -settings.DEDUCTION_HOMEWORK_LATE
# 创建扣分记录
student = await StudentModel.get_by_id(submission["student_id"])
if student:
await ConductModel.create_record(
student_id=submission["student_id"],
points_change=points_change,
reason=f"作业未提交/迟交: {submission['title']}",
recorder_id=operator_id,
related_type="homework",
related_id=submission["assignment_id"]
)
# 标记已应用扣分
await HomeworkModel.mark_deduction_applied(submission_id)
logger.info(f"用户[{operator_id}] 更新作业提交状态[{submission_id}] -> {status}")
return {"success": True, "message": "状态更新成功"}

View File

@@ -0,0 +1,82 @@
# ===========================================
# 班级操行分管理系统 - 后端服务
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from typing import Dict, Any, Optional
from models.user import UserModel
from models.student import StudentModel
from models.conduct import ConductModel
from models.homework import HomeworkModel
from models.attendance import AttendanceModel
from utils.logger import get_logger
logger = get_logger(__name__)
class ParentService:
"""家长服务"""
@staticmethod
async def get_child_conduct(parent_id: int) -> Dict[str, Any]:
"""获取子女操行分(仅总分,家长端不显示详细记录)"""
# 获取家长关联的学生
user = await UserModel.get_by_user_id(parent_id)
if not user or not user["student_id"]:
return {"error": "未关联学生"}
student = await StudentModel.get_by_id(user["student_id"])
if not student:
return {"error": "学生不存在"}
return {
"student_id": student["student_id"],
"student_name": student["name"],
"student_no": student["student_no"],
"total_points": student["total_points"]
}
@staticmethod
async def get_child_homework(parent_id: int) -> Dict[str, Any]:
"""获取子女作业情况"""
user = await UserModel.get_by_user_id(parent_id)
if not user or not user["student_id"]:
return {"error": "未关联学生"}
student = await StudentModel.get_by_id(user["student_id"])
if not student:
return {"error": "学生不存在"}
homework = await HomeworkModel.get_student_homework(user["student_id"])
return {
"student_id": student["student_id"],
"student_name": student["name"],
"homework": homework
}
@staticmethod
async def get_child_attendance(parent_id: int) -> Dict[str, Any]:
"""获取子女考勤记录"""
user = await UserModel.get_by_user_id(parent_id)
if not user or not user["student_id"]:
return {"error": "未关联学生"}
student = await StudentModel.get_by_id(user["student_id"])
if not student:
return {"error": "学生不存在"}
records = await AttendanceModel.get_student_records(user["student_id"])
return {
"student_id": student["student_id"],
"student_name": student["name"],
"records": records
}

View File

@@ -0,0 +1,146 @@
# ===========================================
# 班级操行分管理系统 - 后端服务
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
from models.student import StudentModel
from models.conduct import ConductModel
from models.homework import HomeworkModel
from models.attendance import AttendanceModel
from middleware.permission import PermissionChecker
from utils.logger import get_logger
logger = get_logger(__name__)
class StudentService:
"""学生服务"""
@staticmethod
async def get_conduct_history(
student_id: int,
limit: int = 50,
offset: int = 0
) -> Dict[str, Any]:
"""获取学生操行分历史(学生端显示,扣分项操作人显示为班主任)"""
student = await StudentModel.get_by_id(student_id)
if not student:
return {"error": "学生不存在"}
records = await ConductModel.get_student_records(
student_id=student_id,
limit=limit,
offset=offset
)
# 处理记录:扣分项的操作人统一显示为"班主任"
for record in records:
if record["points_change"] < 0: # 扣分项
record["recorder_name"] = "班主任"
# 加分项保持原操作人不变
return {
"student_id": student_id,
"student_name": student["name"],
"total_points": student["total_points"],
"records": records
}
@staticmethod
async def get_homework_status(student_id: int) -> Dict[str, Any]:
"""获取学生作业情况"""
student = await StudentModel.get_by_id(student_id)
if not student:
return {"error": "学生不存在"}
homework = await HomeworkModel.get_student_homework(student_id)
# 统计
total = len(homework)
submitted = sum(1 for h in homework if h["status"] == "submitted")
not_submitted = sum(1 for h in homework if h["status"] == "not_submitted")
late = sum(1 for h in homework if h["status"] == "late")
return {
"student_id": student_id,
"student_name": student["name"],
"statistics": {
"total": total,
"submitted": submitted,
"not_submitted": not_submitted,
"late": late
},
"homework": homework
}
@staticmethod
async def get_attendance_records(
student_id: int,
month: Optional[str] = None
) -> Dict[str, Any]:
"""获取学生考勤记录"""
student = await StudentModel.get_by_id(student_id)
if not student:
return {"error": "学生不存在"}
records = await AttendanceModel.get_student_records(
student_id=student_id,
month=month
)
# 统计
present = sum(1 for r in records if r["status"] == "present")
absent = sum(1 for r in records if r["status"] == "absent")
late = sum(1 for r in records if r["status"] == "late")
leave = sum(1 for r in records if r["status"] == "leave")
return {
"student_id": student_id,
"student_name": student["name"],
"statistics": {
"present": present,
"absent": absent,
"late": late,
"leave": leave,
"total": len(records)
},
"records": records
}
@staticmethod
async def get_ranking(
user_id: int,
class_id: Optional[int] = None,
limit: int = 50
) -> Dict[str, Any]:
"""获取排行榜"""
# 如果未指定班级,获取用户所在班级
if not class_id:
user = await StudentModel.get_by_id(user_id) if user_id else None
if user:
class_id = user["class_id"]
else:
admin_class = await PermissionChecker.get_user_class_id(user_id)
if admin_class:
class_id = admin_class
ranking = await StudentModel.get_ranking(class_id=class_id, limit=limit)
return {
"class_id": class_id,
"ranking": ranking
}
@staticmethod
async def get_student_info(student_id: int) -> Optional[Dict[str, Any]]:
"""获取学生个人信息"""
return await StudentModel.get_by_id(student_id)

View File

@@ -0,0 +1,77 @@
# ===========================================
# 班级操行分管理系统 - 后端服务
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from typing import Dict, Any, List, Optional
from models.subject import SubjectModel
from utils.logger import get_logger
logger = get_logger(__name__)
class SubjectService:
"""科目服务"""
@staticmethod
async def get_subjects(is_active: Optional[bool] = None) -> Dict[str, Any]:
"""获取科目列表"""
subjects = await SubjectModel.get_all(is_active=is_active)
return {
"subjects": subjects,
"total": len(subjects)
}
@staticmethod
async def create_subject(
subject_name: str,
subject_code: Optional[str],
sort_order: int = 0
) -> Dict[str, Any]:
"""创建科目"""
# 检查是否已存在
existing = await SubjectModel.get_by_name(subject_name)
if existing:
return {"success": False, "message": "科目名称已存在"}
subject_id = await SubjectModel.create(
subject_name=subject_name,
subject_code=subject_code,
sort_order=sort_order
)
if subject_id:
logger.info(f"创建科目: {subject_name}")
return {"success": True, "subject_id": subject_id}
else:
return {"success": False, "message": "创建科目失败"}
@staticmethod
async def update_subject(subject_id: int, **kwargs) -> Dict[str, Any]:
"""更新科目"""
result = await SubjectModel.update(subject_id, **kwargs)
if result:
logger.info(f"更新科目: {subject_id}")
return {"success": True}
else:
return {"success": False, "message": "更新科目失败"}
@staticmethod
async def delete_subject(subject_id: int) -> Dict[str, Any]:
"""删除科目(软删除)"""
result = await SubjectModel.delete(subject_id)
if result:
logger.info(f"禁用科目: {subject_id}")
return {"success": True}
else:
return {"success": False, "message": "禁用科目失败"}