202 lines
6.5 KiB
Python
202 lines
6.5 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
PerToolBox Server - 工具路由(密码、二维码、加密、JSON)
|
||
Copyright (C) 2024 Sea Network Technology Studio
|
||
Author: Canglan <admin@sea-studio.top>
|
||
License: AGPL v3
|
||
"""
|
||
|
||
import hashlib
|
||
import base64
|
||
import urllib.parse
|
||
import json
|
||
import qrcode
|
||
from io import BytesIO
|
||
import base64 as b64
|
||
from Crypto.Cipher import AES
|
||
from Crypto.Util.Padding import pad, unpad
|
||
from fastapi import APIRouter, HTTPException, Request, Query
|
||
from ...schemas import (
|
||
HashRequest, Base64Request, URLRequest, AESRequest,
|
||
JSONValidateRequest, JSONValidateResponse
|
||
)
|
||
from ...middleware.rate_limit import rate_limit
|
||
|
||
router = APIRouter(prefix="/api/v1", tags=["tools"])
|
||
|
||
# ========== 密码生成 ==========
|
||
@router.get("/password/generate")
|
||
@rate_limit(requests=50, period=60)
|
||
async def generate_password(
|
||
request: Request, # 添加 request 参数
|
||
length: int = Query(12, ge=4, le=64),
|
||
upper: bool = Query(True),
|
||
lower: bool = Query(True),
|
||
digits: bool = Query(True),
|
||
symbols: bool = Query(True),
|
||
count: int = Query(1, ge=1, le=10)
|
||
):
|
||
import random
|
||
import string
|
||
|
||
chars = ""
|
||
if upper:
|
||
chars += string.ascii_uppercase
|
||
if lower:
|
||
chars += string.ascii_lowercase
|
||
if digits:
|
||
chars += string.digits
|
||
if symbols:
|
||
chars += "!@#$%^&*"
|
||
|
||
if not chars:
|
||
chars = string.ascii_letters + string.digits
|
||
|
||
passwords = [''.join(random.choice(chars) for _ in range(length)) for _ in range(count)]
|
||
return {"passwords": passwords if count > 1 else passwords[0]}
|
||
|
||
# ========== 二维码 ==========
|
||
@router.post("/qrcode/generate")
|
||
@rate_limit(requests=30, period=60)
|
||
async def generate_qrcode(
|
||
request: Request, # 添加 request 参数
|
||
content: str,
|
||
size: int = 10
|
||
):
|
||
if not content:
|
||
raise HTTPException(status_code=400, detail="内容不能为空")
|
||
|
||
qr = qrcode.QRCode(box_size=size, border=2)
|
||
qr.add_data(content)
|
||
qr.make(fit=True)
|
||
img = qr.make_image(fill_color="black", back_color="white")
|
||
|
||
buffered = BytesIO()
|
||
img.save(buffered, format="PNG")
|
||
img_base64 = b64.b64encode(buffered.getvalue()).decode()
|
||
|
||
return {"qr_code": f"data:image/png;base64,{img_base64}"}
|
||
|
||
# ========== 哈希 ==========
|
||
@router.post("/crypto/hash")
|
||
@rate_limit(requests=100, period=60)
|
||
async def compute_hash(
|
||
request: Request, # 添加 request 参数
|
||
req: HashRequest
|
||
):
|
||
text = req.text.encode('utf-8')
|
||
algo = req.algorithm.lower()
|
||
|
||
if algo == "md5":
|
||
result = hashlib.md5(text).hexdigest()
|
||
elif algo == "sha1":
|
||
result = hashlib.sha1(text).hexdigest()
|
||
elif algo == "sha256":
|
||
result = hashlib.sha256(text).hexdigest()
|
||
elif algo == "sha512":
|
||
result = hashlib.sha512(text).hexdigest()
|
||
else:
|
||
raise HTTPException(status_code=400, detail="不支持的算法")
|
||
|
||
return {"algorithm": algo, "result": result}
|
||
|
||
# ========== Base64 ==========
|
||
@router.post("/crypto/base64")
|
||
@rate_limit(requests=100, period=60)
|
||
async def base64_process(
|
||
request: Request, # 添加 request 参数
|
||
req: Base64Request
|
||
):
|
||
if req.action == "encode":
|
||
result = base64.b64encode(req.text.encode('utf-8')).decode('utf-8')
|
||
elif req.action == "decode":
|
||
try:
|
||
result = base64.b64decode(req.text).decode('utf-8')
|
||
except Exception:
|
||
raise HTTPException(status_code=400, detail="Base64 解码失败")
|
||
else:
|
||
raise HTTPException(status_code=400, detail="无效的 action")
|
||
|
||
return {"action": req.action, "result": result}
|
||
|
||
# ========== URL 编解码 ==========
|
||
@router.post("/crypto/url")
|
||
@rate_limit(requests=100, period=60)
|
||
async def url_process(
|
||
request: Request, # 添加 request 参数
|
||
req: URLRequest
|
||
):
|
||
if req.action == "encode":
|
||
result = urllib.parse.quote(req.text, safe='')
|
||
elif req.action == "decode":
|
||
result = urllib.parse.unquote(req.text)
|
||
else:
|
||
raise HTTPException(status_code=400, detail="无效的 action")
|
||
|
||
return {"action": req.action, "result": result}
|
||
|
||
# ========== AES 加解密 ==========
|
||
@router.post("/crypto/aes")
|
||
@rate_limit(requests=50, period=60)
|
||
async def aes_process(
|
||
request: Request, # 添加 request 参数
|
||
req: AESRequest
|
||
):
|
||
try:
|
||
key = req.key.encode('utf-8')
|
||
mode_map = {"ECB": AES.MODE_ECB, "CBC": AES.MODE_CBC, "GCM": AES.MODE_GCM}
|
||
mode = mode_map.get(req.mode)
|
||
|
||
if not mode:
|
||
raise HTTPException(status_code=400, detail="不支持的 AES 模式")
|
||
|
||
if len(key) not in [16, 24, 32]:
|
||
raise HTTPException(status_code=400, detail="密钥长度必须为 16/24/32 字节")
|
||
|
||
if req.action == "encrypt":
|
||
cipher = AES.new(key, mode, iv=req.iv.encode('utf-8') if req.iv else None)
|
||
if req.mode == "GCM":
|
||
ciphertext = cipher.encrypt(req.text.encode('utf-8'))
|
||
result = b64.b64encode(ciphertext).decode('utf-8')
|
||
else:
|
||
padded = pad(req.text.encode('utf-8'), AES.block_size)
|
||
ciphertext = cipher.encrypt(padded)
|
||
result = b64.b64encode(ciphertext).decode('utf-8')
|
||
elif req.action == "decrypt":
|
||
ciphertext = b64.b64decode(req.text)
|
||
cipher = AES.new(key, mode, iv=req.iv.encode('utf-8') if req.iv else None)
|
||
if req.mode == "GCM":
|
||
plaintext = cipher.decrypt(ciphertext).decode('utf-8')
|
||
result = plaintext
|
||
else:
|
||
plaintext_padded = cipher.decrypt(ciphertext)
|
||
result = unpad(plaintext_padded, AES.block_size).decode('utf-8')
|
||
else:
|
||
raise HTTPException(status_code=400, detail="无效的 action")
|
||
|
||
return {"mode": req.mode, "action": req.action, "result": result}
|
||
|
||
except Exception as e:
|
||
raise HTTPException(status_code=400, detail=f"AES 操作失败: {str(e)}")
|
||
|
||
# ========== JSON 校验 ==========
|
||
@router.post("/json/validate", response_model=JSONValidateResponse)
|
||
@rate_limit(requests=100, period=60)
|
||
async def validate_json(
|
||
request: Request, # 添加 request 参数
|
||
req: JSONValidateRequest
|
||
):
|
||
try:
|
||
parsed = json.loads(req.json_string)
|
||
formatted = json.dumps(parsed, indent=2, ensure_ascii=False)
|
||
return JSONValidateResponse(valid=True, formatted=formatted)
|
||
except json.JSONDecodeError as e:
|
||
return JSONValidateResponse(
|
||
valid=False,
|
||
error={
|
||
"line": e.lineno,
|
||
"column": e.colno,
|
||
"message": e.msg
|
||
}
|
||
) |