262 lines
9.0 KiB
Python
262 lines
9.0 KiB
Python
# ===========================================
|
|
# 班级操行分管理系统 - 后端服务
|
|
#
|
|
# 开发者: Canglan
|
|
# 联系方式: admin@sea-studio.top
|
|
# 版权归属: Sea Network Technology Studio
|
|
# 许可证: MIT License
|
|
#
|
|
# 版权所有 © Sea Network Technology Studio
|
|
# ===========================================
|
|
|
|
from typing import Optional, List, Dict, Any
|
|
from datetime import datetime
|
|
from utils.database import execute_one, execute_query, execute_insert, execute_update
|
|
from utils.logger import get_logger
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
class ConductModel:
|
|
"""操行分数据模型"""
|
|
|
|
@staticmethod
|
|
async def create_record(
|
|
student_id: int,
|
|
points_change: int,
|
|
reason: str,
|
|
recorder_id: int,
|
|
recorder_name: str = None,
|
|
related_type: str = 'manual',
|
|
related_id: int = None
|
|
) -> int:
|
|
"""创建操行分记录"""
|
|
sql = """
|
|
INSERT INTO conduct_records
|
|
(student_id, points_change, reason, recorder_id, recorder_name, related_type, related_id)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s)
|
|
"""
|
|
return await execute_insert(sql, (
|
|
student_id, points_change, reason, recorder_id, recorder_name, related_type, related_id
|
|
))
|
|
|
|
@staticmethod
|
|
async def get_student_records(
|
|
student_id: int,
|
|
limit: int = 50,
|
|
offset: int = 0,
|
|
include_revoked: bool = False
|
|
) -> List[Dict[str, Any]]:
|
|
"""获取学生操行分记录"""
|
|
sql = """
|
|
SELECT cr.*, u.real_name as recorder_name
|
|
FROM conduct_records cr
|
|
LEFT JOIN users u ON cr.recorder_id = u.user_id
|
|
WHERE cr.student_id = %s
|
|
"""
|
|
if not include_revoked:
|
|
sql += " AND cr.is_revoked = 0"
|
|
sql += " ORDER BY cr.created_at DESC LIMIT %s OFFSET %s"
|
|
return await execute_query(sql, (student_id, limit, offset))
|
|
|
|
@staticmethod
|
|
async def get_records_by_recorder(
|
|
recorder_id: int,
|
|
limit: int = 50,
|
|
offset: int = 0
|
|
) -> List[Dict[str, Any]]:
|
|
"""获取操作人提交的记录"""
|
|
sql = """
|
|
SELECT cr.*, s.name as student_name
|
|
FROM conduct_records cr
|
|
JOIN students s ON cr.student_id = s.student_id
|
|
WHERE cr.recorder_id = %s AND cr.is_revoked = 0
|
|
ORDER BY cr.created_at DESC
|
|
LIMIT %s OFFSET %s
|
|
"""
|
|
return await execute_query(sql, (recorder_id, limit, offset))
|
|
|
|
@staticmethod
|
|
async def get_all_records(
|
|
limit: int = 100,
|
|
offset: int = 0,
|
|
start_date: str = None,
|
|
end_date: str = None,
|
|
student_id: int = None,
|
|
include_revoked: bool = True
|
|
) -> List[Dict[str, Any]]:
|
|
"""获取所有记录(班主任/班长专用)"""
|
|
# 空字符串转为None
|
|
if start_date == "":
|
|
start_date = None
|
|
if end_date == "":
|
|
end_date = None
|
|
|
|
sql = """
|
|
SELECT cr.*, s.name as student_name, s.student_no, u.real_name as recorder_name
|
|
FROM conduct_records cr
|
|
JOIN students s ON cr.student_id = s.student_id
|
|
JOIN users u ON cr.recorder_id = u.user_id
|
|
WHERE 1=1
|
|
"""
|
|
if not include_revoked:
|
|
sql += " AND cr.is_revoked = 0"
|
|
params = []
|
|
|
|
if student_id:
|
|
sql += " AND cr.student_id = %s"
|
|
params.append(student_id)
|
|
|
|
if start_date:
|
|
sql += " AND DATE(cr.created_at) >= %s"
|
|
params.append(start_date)
|
|
|
|
if end_date:
|
|
sql += " AND DATE(cr.created_at) <= %s"
|
|
params.append(end_date)
|
|
|
|
sql += " ORDER BY cr.created_at DESC LIMIT %s OFFSET %s"
|
|
params.extend([limit, offset])
|
|
|
|
return await execute_query(sql, tuple(params))
|
|
|
|
@staticmethod
|
|
async def get_grouped_records(
|
|
student_id: int = None,
|
|
start_date: str = None,
|
|
end_date: str = None,
|
|
page: int = 1,
|
|
page_size: int = 20
|
|
) -> Dict[str, Any]:
|
|
"""获取分组后的操行分记录(同批次合并)"""
|
|
if start_date == "":
|
|
start_date = None
|
|
if end_date == "":
|
|
end_date = None
|
|
|
|
conditions = ["cr.is_revoked = 0"]
|
|
params = []
|
|
|
|
if student_id:
|
|
conditions.append("cr.student_id = %s")
|
|
params.append(student_id)
|
|
if start_date:
|
|
conditions.append("cr.created_at >= %s")
|
|
params.append(start_date)
|
|
if end_date:
|
|
conditions.append("cr.created_at <= %s")
|
|
params.append(end_date + ' 23:59:59')
|
|
|
|
where_clause = " AND ".join(conditions)
|
|
|
|
count_sql = f"""
|
|
SELECT COUNT(DISTINCT CONCAT(cr.points_change, '|', cr.reason, '|', cr.recorder_id, '|', DATE_FORMAT(cr.created_at, '%%Y-%%m-%%d %%H:%%i'))) as total
|
|
FROM conduct_records cr
|
|
WHERE {where_clause}
|
|
"""
|
|
|
|
data_sql = f"""
|
|
SELECT
|
|
cr.points_change,
|
|
cr.reason,
|
|
cr.recorder_name,
|
|
DATE_FORMAT(MIN(cr.created_at), '%%Y-%%m-%%d %%H:%%i:%%s') as created_at,
|
|
GROUP_CONCAT(s.name ORDER BY s.student_id SEPARATOR ', ') as student_names,
|
|
COUNT(*) as student_count
|
|
FROM conduct_records cr
|
|
JOIN students s ON cr.student_id = s.student_id
|
|
WHERE {where_clause}
|
|
GROUP BY cr.points_change, cr.reason, cr.recorder_id, DATE_FORMAT(cr.created_at, '%%Y-%%m-%%d %%H:%%i')
|
|
ORDER BY MIN(cr.created_at) DESC
|
|
LIMIT %s OFFSET %s
|
|
"""
|
|
|
|
params_for_count = list(params)
|
|
params_for_data = list(params) + [page_size, (page - 1) * page_size]
|
|
|
|
total_result = await execute_one(count_sql, tuple(params_for_count))
|
|
total = total_result['total'] if total_result else 0
|
|
|
|
records = await execute_query(data_sql, tuple(params_for_data))
|
|
|
|
return {
|
|
"records": records,
|
|
"total": total,
|
|
"page": page,
|
|
"page_size": page_size,
|
|
"total_pages": (total + page_size - 1) // page_size
|
|
}
|
|
|
|
@staticmethod
|
|
async def get_record_by_id(record_id: int) -> Optional[Dict[str, Any]]:
|
|
"""根据ID获取记录"""
|
|
sql = """
|
|
SELECT cr.*, s.name as student_name, s.total_points
|
|
FROM conduct_records cr
|
|
JOIN students s ON cr.student_id = s.student_id
|
|
WHERE cr.record_id = %s
|
|
"""
|
|
return await execute_one(sql, (record_id,))
|
|
|
|
@staticmethod
|
|
async def revoke_record(record_id: int, revoker_id: int) -> bool:
|
|
"""撤销记录"""
|
|
try:
|
|
sql = """
|
|
UPDATE conduct_records
|
|
SET is_revoked = 1, revoked_by = %s, revoked_at = NOW()
|
|
WHERE record_id = %s AND is_revoked = 0
|
|
"""
|
|
result = await execute_update(sql, (revoker_id, record_id))
|
|
return result > 0
|
|
except Exception as e:
|
|
logger.error(f"撤销记录失败: {e}")
|
|
return False
|
|
|
|
@staticmethod
|
|
async def restore_record(record_id: int, restorer_id: int) -> bool:
|
|
"""反撤销(恢复)已撤销的记录"""
|
|
try:
|
|
sql = """
|
|
UPDATE conduct_records
|
|
SET is_revoked = 0, revoked_by = NULL, revoked_at = NULL
|
|
WHERE record_id = %s AND is_revoked = 1
|
|
"""
|
|
result = await execute_update(sql, (record_id,))
|
|
return result > 0
|
|
except Exception as e:
|
|
logger.error(f"恢复记录失败: {e}")
|
|
return False
|
|
|
|
@staticmethod
|
|
async def batch_create_records(records_data: List[Dict]) -> List[Dict]:
|
|
"""批量创建操行分记录"""
|
|
results = []
|
|
for record in records_data:
|
|
try:
|
|
record_id = await ConductModel.create_record(
|
|
student_id=record.get('student_id'),
|
|
points_change=record.get('points_change'),
|
|
reason=record.get('reason'),
|
|
recorder_id=record.get('recorder_id'),
|
|
recorder_name=record.get('recorder_name')
|
|
)
|
|
results.append({
|
|
'student_id': record.get('student_id'),
|
|
'success': True,
|
|
'record_id': record_id
|
|
})
|
|
except Exception as e:
|
|
results.append({
|
|
'student_id': record.get('student_id'),
|
|
'success': False,
|
|
'error': str(e)
|
|
})
|
|
return results
|
|
|
|
@staticmethod
|
|
async def get_student_total_points(student_id: int) -> int:
|
|
"""获取学生当前总分"""
|
|
sql = "SELECT total_points FROM students WHERE student_id = %s"
|
|
result = await execute_one(sql, (student_id,))
|
|
return result['total_points'] if result else 100 |