Files
2026-04-01 16:05:57 +08:00

202 lines
6.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
PerToolBox Server - 工具路由密码、二维码、加密、JSON
Copyright (C) 2024 Sea Network Technology Studio
Author: Canglan <admin@sea-studio.top>
License: AGPL v3
"""
import hashlib
import base64
import urllib.parse
import json
import qrcode
from io import BytesIO
import base64 as b64
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from fastapi import APIRouter, HTTPException, Request, Query
from ...schemas import (
HashRequest, Base64Request, URLRequest, AESRequest,
JSONValidateRequest, JSONValidateResponse
)
from ...middleware.rate_limit import rate_limit
router = APIRouter(prefix="/api/v1", tags=["tools"])
# ========== 密码生成 ==========
@router.get("/password/generate")
@rate_limit(requests=50, period=60)
async def generate_password(
request: Request, # 添加 request 参数
length: int = Query(12, ge=4, le=64),
upper: bool = Query(True),
lower: bool = Query(True),
digits: bool = Query(True),
symbols: bool = Query(True),
count: int = Query(1, ge=1, le=10)
):
import random
import string
chars = ""
if upper:
chars += string.ascii_uppercase
if lower:
chars += string.ascii_lowercase
if digits:
chars += string.digits
if symbols:
chars += "!@#$%^&*"
if not chars:
chars = string.ascii_letters + string.digits
passwords = [''.join(random.choice(chars) for _ in range(length)) for _ in range(count)]
return {"passwords": passwords if count > 1 else passwords[0]}
# ========== 二维码 ==========
@router.post("/qrcode/generate")
@rate_limit(requests=30, period=60)
async def generate_qrcode(
request: Request, # 添加 request 参数
content: str,
size: int = 10
):
if not content:
raise HTTPException(status_code=400, detail="内容不能为空")
qr = qrcode.QRCode(box_size=size, border=2)
qr.add_data(content)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buffered = BytesIO()
img.save(buffered, format="PNG")
img_base64 = b64.b64encode(buffered.getvalue()).decode()
return {"qr_code": f"data:image/png;base64,{img_base64}"}
# ========== 哈希 ==========
@router.post("/crypto/hash")
@rate_limit(requests=100, period=60)
async def compute_hash(
request: Request, # 添加 request 参数
req: HashRequest
):
text = req.text.encode('utf-8')
algo = req.algorithm.lower()
if algo == "md5":
result = hashlib.md5(text).hexdigest()
elif algo == "sha1":
result = hashlib.sha1(text).hexdigest()
elif algo == "sha256":
result = hashlib.sha256(text).hexdigest()
elif algo == "sha512":
result = hashlib.sha512(text).hexdigest()
else:
raise HTTPException(status_code=400, detail="不支持的算法")
return {"algorithm": algo, "result": result}
# ========== Base64 ==========
@router.post("/crypto/base64")
@rate_limit(requests=100, period=60)
async def base64_process(
request: Request, # 添加 request 参数
req: Base64Request
):
if req.action == "encode":
result = base64.b64encode(req.text.encode('utf-8')).decode('utf-8')
elif req.action == "decode":
try:
result = base64.b64decode(req.text).decode('utf-8')
except Exception:
raise HTTPException(status_code=400, detail="Base64 解码失败")
else:
raise HTTPException(status_code=400, detail="无效的 action")
return {"action": req.action, "result": result}
# ========== URL 编解码 ==========
@router.post("/crypto/url")
@rate_limit(requests=100, period=60)
async def url_process(
request: Request, # 添加 request 参数
req: URLRequest
):
if req.action == "encode":
result = urllib.parse.quote(req.text, safe='')
elif req.action == "decode":
result = urllib.parse.unquote(req.text)
else:
raise HTTPException(status_code=400, detail="无效的 action")
return {"action": req.action, "result": result}
# ========== AES 加解密 ==========
@router.post("/crypto/aes")
@rate_limit(requests=50, period=60)
async def aes_process(
request: Request, # 添加 request 参数
req: AESRequest
):
try:
key = req.key.encode('utf-8')
mode_map = {"ECB": AES.MODE_ECB, "CBC": AES.MODE_CBC, "GCM": AES.MODE_GCM}
mode = mode_map.get(req.mode)
if not mode:
raise HTTPException(status_code=400, detail="不支持的 AES 模式")
if len(key) not in [16, 24, 32]:
raise HTTPException(status_code=400, detail="密钥长度必须为 16/24/32 字节")
if req.action == "encrypt":
cipher = AES.new(key, mode, iv=req.iv.encode('utf-8') if req.iv else None)
if req.mode == "GCM":
ciphertext = cipher.encrypt(req.text.encode('utf-8'))
result = b64.b64encode(ciphertext).decode('utf-8')
else:
padded = pad(req.text.encode('utf-8'), AES.block_size)
ciphertext = cipher.encrypt(padded)
result = b64.b64encode(ciphertext).decode('utf-8')
elif req.action == "decrypt":
ciphertext = b64.b64decode(req.text)
cipher = AES.new(key, mode, iv=req.iv.encode('utf-8') if req.iv else None)
if req.mode == "GCM":
plaintext = cipher.decrypt(ciphertext).decode('utf-8')
result = plaintext
else:
plaintext_padded = cipher.decrypt(ciphertext)
result = unpad(plaintext_padded, AES.block_size).decode('utf-8')
else:
raise HTTPException(status_code=400, detail="无效的 action")
return {"mode": req.mode, "action": req.action, "result": result}
except Exception as e:
raise HTTPException(status_code=400, detail=f"AES 操作失败: {str(e)}")
# ========== JSON 校验 ==========
@router.post("/json/validate", response_model=JSONValidateResponse)
@rate_limit(requests=100, period=60)
async def validate_json(
request: Request, # 添加 request 参数
req: JSONValidateRequest
):
try:
parsed = json.loads(req.json_string)
formatted = json.dumps(parsed, indent=2, ensure_ascii=False)
return JSONValidateResponse(valid=True, formatted=formatted)
except json.JSONDecodeError as e:
return JSONValidateResponse(
valid=False,
error={
"line": e.lineno,
"column": e.colno,
"message": e.msg
}
)