v1.7版本更新

This commit is contained in:
2026-05-21 20:15:56 +08:00
parent 74a71ddaf5
commit cb0c367eb7
54 changed files with 2292 additions and 1785 deletions

View File

@@ -54,6 +54,8 @@ REDIS_MAX_CONNECTIONS=50
JWT_SECRET_KEY=your-jwt-secret-key-min-32-chars
JWT_ALGORITHM=HS256
JWT_EXPIRE_MINUTES=30
# JWT空闲超时时间分钟- 无操作超过此时间需重新登录
JWT_IDLE_TIMEOUT_MINUTES=10
# ===========================================
# 密码加密配置

View File

@@ -22,6 +22,7 @@ from utils.database import init_db_pool, close_db_pool
from utils.redis_client import init_redis_pool, close_redis_pool
from middleware.auth_middleware import AuthMiddleware
from routes import auth, student, parent, admin, subject, semester, debug
from routes.config import router as config_router
# 设置日志
@@ -117,6 +118,7 @@ app.include_router(parent.router, prefix="/api/parent", tags=["家长端"])
app.include_router(admin.router, prefix="/api/admin", tags=["管理端"])
app.include_router(subject.router, prefix="/api/subject", tags=["科目管理"])
app.include_router(semester.router, prefix="/api/semester", tags=["学期管理"])
app.include_router(config_router, prefix="/api/config", tags=["配置"])
app.include_router(debug.router, tags=["调试"])

View File

@@ -30,6 +30,7 @@ PUBLIC_PATHS = [
r'^/health$',
r'^/api/auth/login$',
r'^/api/auth/logout$',
r'^/api/config/deduction-rules$',
]
def is_public_path(path: str) -> bool:
"""检查是否为公开路径"""

View File

@@ -56,6 +56,21 @@ class SanitizeMiddleware(BaseHTTPMiddleware):
# 去除首尾空格
value = value.strip()
# SQL注入模式检测
sql_patterns = [
r'(?i)(\bunion\b\s+\bselect\b)',
r'(?i)(\bor\b\s+\d+\s*=\s*\d+)',
r'(?i)(\bdrop\b\s+\btable\b)',
r'(?i)(\bdelete\b\s+\bfrom\b)',
r'(?i)(\binsert\b\s+\binto\b)',
r'(?i)(\bupdate\b\s+\w+\s+\bset\b)',
]
for pattern in sql_patterns:
value = re.sub(pattern, '', value)
# 路径遍历检测
value = value.replace('../', '').replace('..\\', '')
# 限制长度
if len(value) > 1000:
value = value[:1000]
@@ -106,7 +121,9 @@ def validate_reason(reason: str) -> tuple:
"""
if not reason or not reason.strip():
return False, "原因不能为空"
if len(reason) > 255:
# 计算可见字符长度(不含换行符),支持多行输入
visible_length = len(reason.replace('\n', ''))
if visible_length > 255:
return False, "原因长度不能超过255个字符"
return True, ""

View File

@@ -42,18 +42,22 @@ class HomeworkModel:
"""
return await execute_query(sql, tuple(subject_ids))
@staticmethod
@staticmethod
async def get_student_homework(student_id: int) -> List[Dict[str, Any]]:
sql = """
SELECT a.assignment_id, a.title, a.description, a.deadline,
s.subject_name, hs.status, hs.submit_time, hs.comments, hs.deduction_applied
SELECT a.assignment_id, a.title, a.description, a.deadline, a.created_at,
s.subject_name, hs.status, hs.submit_time, hs.comments, hs.deduction_applied,
cr.points_change AS points
FROM assignments a
JOIN subjects s ON a.subject_id = s.subject_id
LEFT JOIN homework_submissions hs ON a.assignment_id = hs.assignment_id AND hs.student_id = %s
LEFT JOIN conduct_records cr ON cr.related_type = 'homework'
AND cr.related_id = a.assignment_id AND cr.student_id = %s AND cr.is_revoked = 0
GROUP BY a.assignment_id
ORDER BY a.deadline ASC, a.created_at DESC
"""
return await execute_query(sql, (student_id,))
return await execute_query(sql, (student_id, student_id))
@staticmethod
async def get_submission(submission_id: int) -> Optional[Dict[str, Any]]:
sql = """

View File

@@ -44,8 +44,8 @@ class StudentModel:
async def get_all(include_disabled: bool = False) -> List[Dict[str, Any]]:
"""获取所有学生列表(单班级)"""
sql = """
SELECT student_id, student_no, name, total_points, parent_phone, status
FROM students
SELECT student_id, student_no, name, total_points, parent_phone, dormitory_number, status
FROM students
WHERE 1=1
"""
if not include_disabled:
@@ -58,17 +58,18 @@ class StudentModel:
student_no: str,
name: str,
parent_phone: str = None,
dormitory_number: str = None,
initial_points: int = 60
) -> int:
"""创建学生初始操行分默认60分"""
sql = """
INSERT INTO students (student_no, name, parent_phone, total_points)
VALUES (%s, %s, %s, %s)
INSERT INTO students (student_no, name, parent_phone, dormitory_number, total_points)
VALUES (%s, %s, %s, %s, %s)
"""
return await execute_insert(sql, (student_no, name, parent_phone, initial_points))
return await execute_insert(sql, (student_no, name, parent_phone, dormitory_number, initial_points))
@staticmethod
async def update(student_id: int, name: str = None, parent_phone: str = None, status: int = None) -> bool:
async def update(student_id: int, name: str = None, parent_phone: str = None, dormitory_number: str = None, status: int = None) -> bool:
"""更新学生信息"""
updates = []
params = []
@@ -79,6 +80,9 @@ class StudentModel:
if parent_phone is not None:
updates.append("parent_phone = %s")
params.append(parent_phone)
if dormitory_number is not None:
updates.append("dormitory_number = %s")
params.append(dormitory_number)
if status is not None:
updates.append("status = %s")
params.append(status)
@@ -101,7 +105,7 @@ class StudentModel:
@staticmethod
async def update_total_points(student_id: int, points_change: int) -> bool:
"""更新学生总分"""
sql = "UPDATE students SET total_points = total_points + %s WHERE student_id = %s"
sql = "UPDATE students SET total_points = total_points + %s, points_updated_at = CURRENT_TIMESTAMP WHERE student_id = %s"
result = await execute_update(sql, (points_change, student_id))
return result > 0
@@ -109,10 +113,10 @@ class StudentModel:
async def get_ranking(limit: int = 50) -> List[Dict[str, Any]]:
"""获取学生排行(单班级)"""
sql = """
SELECT student_id, student_no, name, total_points
SELECT student_id, student_no, name, total_points, points_updated_at
FROM students
WHERE status = 1
ORDER BY total_points DESC
ORDER BY total_points DESC, points_updated_at ASC
LIMIT %s
"""
results = await execute_query(sql, (limit,))
@@ -137,6 +141,7 @@ class StudentModel:
student_no=student.get('student_no'),
name=student.get('name'),
parent_phone=student.get('parent_phone'),
dormitory_number=student.get('dormitory_number'),
initial_points=initial_points
)
results.append({

View File

@@ -44,7 +44,7 @@ class UserModel:
@staticmethod
async def create_student(username: str, password: str, real_name: str, student_id: int) -> int:
"""创建学生账号"""
password_hash = security.sha1_md5_password(password)
password_hash = security.bcrypt_password(password)
sql = """
INSERT INTO users (username, password_hash, real_name, user_type, student_id, need_change_password)
VALUES (%s, %s, %s, 'student', %s, 1)
@@ -54,7 +54,7 @@ class UserModel:
@staticmethod
async def create_parent(username: str, password: str, real_name: str, student_id: int) -> int:
"""创建家长账号"""
password_hash = security.sha1_md5_password(password)
password_hash = security.bcrypt_password(password)
sql = """
INSERT INTO users (username, password_hash, real_name, user_type, student_id, need_change_password)
VALUES (%s, %s, %s, 'parent', %s, 0)
@@ -64,7 +64,7 @@ class UserModel:
@staticmethod
async def create_admin(username: str, password: str, real_name: str) -> int:
"""创建管理员账号"""
password_hash = security.sha1_md5_password(password)
password_hash = security.bcrypt_password(password)
sql = """
INSERT INTO users (username, password_hash, real_name, user_type, need_change_password)
VALUES (%s, %s, %s, 'admin', 1)
@@ -74,9 +74,9 @@ class UserModel:
@staticmethod
async def update_password(user_id: int, new_password: str) -> bool:
"""更新密码"""
password_hash = security.sha1_md5_password(new_password)
password_hash = security.bcrypt_password(new_password)
sql = """
UPDATE users
UPDATE users
SET password_hash = %s, need_change_password = 0
WHERE user_id = %s
"""

29
backend/routes/config.py Normal file
View File

@@ -0,0 +1,29 @@
# ===========================================
# 班级操行分管理系统 - 后端服务
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from fastapi import APIRouter
from config import settings
from utils.response import success_response
router = APIRouter()
@router.get("/deduction-rules")
async def get_deduction_rules():
"""获取扣分规则配置(公开接口)"""
data = {
"DEDUCTION_HOMEWORK_NOT_SUBMIT": settings.DEDUCTION_HOMEWORK_NOT_SUBMIT,
"DEDUCTION_HOMEWORK_LATE": settings.DEDUCTION_HOMEWORK_LATE,
"DEDUCTION_ATTENDANCE_ABSENT": settings.DEDUCTION_ATTENDANCE_ABSENT,
"DEDUCTION_ATTENDANCE_LATE": settings.DEDUCTION_ATTENDANCE_LATE,
"DEDUCTION_ATTENDANCE_LEAVE": settings.DEDUCTION_ATTENDANCE_LEAVE,
"STUDENT_INITIAL_POINTS": settings.STUDENT_INITIAL_POINTS,
}
return success_response(data=data)

View File

@@ -76,6 +76,7 @@ class AddStudentRequest(BaseModel):
student_no: str = Field(..., min_length=1, max_length=20, pattern=r'^[a-zA-Z0-9]+$', description="学号")
name: str = Field(..., min_length=1, max_length=50, description="姓名")
parent_phone: Optional[str] = Field(None, max_length=11, pattern=r'^\d{0,11}$', description="家长手机号")
dormitory_number: Optional[str] = Field(None, max_length=20, description="宿舍号")
class AddAttendanceRequest(BaseModel):
@@ -109,6 +110,7 @@ class UpdateStudentRequest(BaseModel):
"""更新学生请求"""
name: Optional[str] = Field(None, min_length=1, max_length=50, description="姓名")
parent_phone: Optional[str] = Field(None, max_length=11, pattern=r'^\d{0,11}$', description="家长手机号")
dormitory_number: Optional[str] = Field(None, max_length=20, description="宿舍号")
class CreateAssignmentRequest(BaseModel):

22
backend/schemas/config.py Normal file
View File

@@ -0,0 +1,22 @@
# ===========================================
# 班级操行分管理系统 - 后端服务
#
# 开发者: Canglan
# 联系方式: admin@sea-studio.top
# 版权归属: Sea Network Technology Studio
# 许可证: MIT License
#
# 版权所有 © Sea Network Technology Studio
# ===========================================
from pydantic import BaseModel
from typing import Optional
class DeductionConfigResponse(BaseModel):
"""扣分规则配置响应"""
DEDUCTION_HOMEWORK_NOT_SUBMIT: int
DEDUCTION_HOMEWORK_LATE: int
DEDUCTION_ATTENDANCE_ABSENT: int
DEDUCTION_ATTENDANCE_LATE: int
DEDUCTION_ATTENDANCE_LEAVE: int
STUDENT_INITIAL_POINTS: int

View File

@@ -21,6 +21,7 @@ class StudentInfo(BaseModel):
name: str
total_points: int
parent_phone: Optional[str] = None
dormitory_number: Optional[str] = None
status: int

View File

@@ -88,6 +88,7 @@ class AdminService:
student_no = student.get("student_no", "").strip()
name = student.get("name", "").strip()
parent_phone = student.get("parent_phone", "").strip()
dormitory_number = student.get("dormitory_number", "").strip() if student.get("dormitory_number") else None
password = student.get("password", "").strip()
if not student_no or not name:
@@ -113,6 +114,7 @@ class AdminService:
student_no=student_no,
name=name,
parent_phone=parent_phone if parent_phone else None,
dormitory_number=dormitory_number,
initial_points=initial_points
)
existing_student_nos.add(student_no)
@@ -162,7 +164,8 @@ class AdminService:
name: str,
parent_phone: Optional[str],
operator_id: int,
initial_points: int = 60
initial_points: int = 60,
dormitory_number: Optional[str] = None
) -> Dict[str, Any]:
"""新增学生"""
if not security.validate_student_no(student_no):
@@ -179,6 +182,7 @@ class AdminService:
student_no=student_no,
name=name,
parent_phone=parent_phone if parent_phone else None,
dormitory_number=dormitory_number,
initial_points=initial_points
)
@@ -248,14 +252,14 @@ class AdminService:
return {"admins": admins}
@staticmethod
async def update_student(student_id: int, name: str = None, parent_phone: str = None) -> Dict[str, Any]:
async def update_student(student_id: int, name: str = None, parent_phone: str = None, dormitory_number: str = None) -> Dict[str, Any]:
"""编辑学生信息"""
try:
student = await StudentModel.get_by_id(student_id)
if not student:
return {"success": False, "message": "学生不存在"}
result = await StudentModel.update(student_id, name=name, parent_phone=parent_phone)
result = await StudentModel.update(student_id, name=name, parent_phone=parent_phone, dormitory_number=dormitory_number)
if result:
return {"success": True, "message": "学生信息更新成功"}
return {"success": False, "message": "更新失败"}

View File

@@ -48,11 +48,19 @@ class AuthService:
return {"success": False, "message": "用户名或密码错误"}
# 验证密码
if not security.verify_password(password, user["password_hash"]):
is_valid, needs_upgrade = security.verify_password_v2(password, user["password_hash"])
if not is_valid:
await RedisClient.set_login_attempts(username)
await LogService.write_login_log(username, 0, ip, user_agent, "用户名或密码错误")
return {"success": False, "message": "用户名或密码错误"}
# 自动升级旧哈希密码
if needs_upgrade:
try:
await UserModel.update_password(user["user_id"], password)
except Exception:
pass
# 检查账号状态
if user["status"] != 1:
await LogService.write_login_log(username, 0, ip, user_agent, "账号已被禁用")
@@ -116,8 +124,10 @@ class AuthService:
return {"success": False, "message": "用户不存在"}
# 验证原密码(强制改密时跳过)
if not force and not security.verify_password(old_password, user["password_hash"]):
return {"success": False, "message": "原密码错误"}
if not force:
is_valid, _ = security.verify_password_v2(old_password, user["password_hash"])
if not is_valid:
return {"success": False, "message": "原密码错误"}
# 验证新密码强度
is_valid, msg = security.validate_password_strength(new_password)

View File

@@ -41,7 +41,8 @@ class ParentService:
"student_id": student["student_id"],
"student_name": student["name"],
"student_no": student["student_no"],
"total_points": student["total_points"]
"total_points": student["total_points"],
"dormitory_number": student.get("dormitory_number")
}
@staticmethod

View File

@@ -12,6 +12,7 @@
import hashlib
import secrets
import re
from passlib.hash import bcrypt as bcrypt_hash
from config import settings
@@ -32,6 +33,27 @@ class SecurityUtils:
md5_hash = hashlib.md5(salted.encode('utf-8')).hexdigest()
return md5_hash
@staticmethod
def bcrypt_password(password: str) -> str:
"""使用bcrypt加密密码"""
return bcrypt_hash.using(rounds=12).hash(password)
@staticmethod
def verify_password_v2(plain_password: str, hashed_password: str) -> tuple:
"""
验证密码支持bcrypt和旧哈希
返回: (是否验证成功, 是否需要升级哈希)
"""
try:
if bcrypt_hash.verify(plain_password, hashed_password):
return True, False
except Exception:
pass
# 回退到旧的sha1_md5验证
if SecurityUtils.sha1_md5_password(plain_password) == hashed_password:
return True, True
return False, False
@staticmethod
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证密码"""
@@ -83,6 +105,21 @@ class SecurityUtils:
# 去除首尾空格
value = value.strip()
# SQL注入模式检测
sql_patterns = [
r'(?i)(\bunion\b\s+\bselect\b)',
r'(?i)(\bor\b\s+\d+\s*=\s*\d+)',
r'(?i)(\bdrop\b\s+\btable\b)',
r'(?i)(\bdelete\b\s+\bfrom\b)',
r'(?i)(\binsert\b\s+\binto\b)',
r'(?i)(\bupdate\b\s+\w+\s+\bset\b)',
]
for pattern in sql_patterns:
value = re.sub(pattern, '', value)
# 路径遍历检测
value = value.replace('../', '').replace('..\\', '')
# 限制长度
if len(value) > max_length:
value = value[:max_length]