#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ PerToolBox Server - 工具路由(密码、二维码、加密、JSON) Copyright (C) 2024 Sea Network Technology Studio Author: Canglan License: AGPL v3 """ import hashlib import base64 import urllib.parse import json import qrcode from io import BytesIO import base64 as b64 from Crypto.Cipher import AES from Crypto.Util.Padding import pad, unpad from fastapi import APIRouter, HTTPException, Request, Query from ...schemas import ( HashRequest, Base64Request, URLRequest, AESRequest, JSONValidateRequest, JSONValidateResponse ) from ...middleware.rate_limit import rate_limit router = APIRouter(prefix="/api/v1", tags=["tools"]) # ========== 密码生成 ========== @router.get("/password/generate") @rate_limit(requests=50, period=60) async def generate_password( request: Request, # 添加 request 参数 length: int = Query(12, ge=4, le=64), upper: bool = Query(True), lower: bool = Query(True), digits: bool = Query(True), symbols: bool = Query(True), count: int = Query(1, ge=1, le=10) ): import random import string chars = "" if upper: chars += string.ascii_uppercase if lower: chars += string.ascii_lowercase if digits: chars += string.digits if symbols: chars += "!@#$%^&*" if not chars: chars = string.ascii_letters + string.digits passwords = [''.join(random.choice(chars) for _ in range(length)) for _ in range(count)] return {"passwords": passwords if count > 1 else passwords[0]} # ========== 二维码 ========== @router.post("/qrcode/generate") @rate_limit(requests=30, period=60) async def generate_qrcode( request: Request, # 添加 request 参数 content: str, size: int = 10 ): if not content: raise HTTPException(status_code=400, detail="内容不能为空") qr = qrcode.QRCode(box_size=size, border=2) qr.add_data(content) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") buffered = BytesIO() img.save(buffered, format="PNG") img_base64 = b64.b64encode(buffered.getvalue()).decode() return {"qr_code": f"data:image/png;base64,{img_base64}"} # ========== 哈希 ========== @router.post("/crypto/hash") @rate_limit(requests=100, period=60) async def compute_hash( request: Request, # 添加 request 参数 req: HashRequest ): text = req.text.encode('utf-8') algo = req.algorithm.lower() if algo == "md5": result = hashlib.md5(text).hexdigest() elif algo == "sha1": result = hashlib.sha1(text).hexdigest() elif algo == "sha256": result = hashlib.sha256(text).hexdigest() elif algo == "sha512": result = hashlib.sha512(text).hexdigest() else: raise HTTPException(status_code=400, detail="不支持的算法") return {"algorithm": algo, "result": result} # ========== Base64 ========== @router.post("/crypto/base64") @rate_limit(requests=100, period=60) async def base64_process( request: Request, # 添加 request 参数 req: Base64Request ): if req.action == "encode": result = base64.b64encode(req.text.encode('utf-8')).decode('utf-8') elif req.action == "decode": try: result = base64.b64decode(req.text).decode('utf-8') except Exception: raise HTTPException(status_code=400, detail="Base64 解码失败") else: raise HTTPException(status_code=400, detail="无效的 action") return {"action": req.action, "result": result} # ========== URL 编解码 ========== @router.post("/crypto/url") @rate_limit(requests=100, period=60) async def url_process( request: Request, # 添加 request 参数 req: URLRequest ): if req.action == "encode": result = urllib.parse.quote(req.text, safe='') elif req.action == "decode": result = urllib.parse.unquote(req.text) else: raise HTTPException(status_code=400, detail="无效的 action") return {"action": req.action, "result": result} # ========== AES 加解密 ========== @router.post("/crypto/aes") @rate_limit(requests=50, period=60) async def aes_process( request: Request, # 添加 request 参数 req: AESRequest ): try: key = req.key.encode('utf-8') mode_map = {"ECB": AES.MODE_ECB, "CBC": AES.MODE_CBC, "GCM": AES.MODE_GCM} mode = mode_map.get(req.mode) if not mode: raise HTTPException(status_code=400, detail="不支持的 AES 模式") if len(key) not in [16, 24, 32]: raise HTTPException(status_code=400, detail="密钥长度必须为 16/24/32 字节") if req.action == "encrypt": cipher = AES.new(key, mode, iv=req.iv.encode('utf-8') if req.iv else None) if req.mode == "GCM": ciphertext = cipher.encrypt(req.text.encode('utf-8')) result = b64.b64encode(ciphertext).decode('utf-8') else: padded = pad(req.text.encode('utf-8'), AES.block_size) ciphertext = cipher.encrypt(padded) result = b64.b64encode(ciphertext).decode('utf-8') elif req.action == "decrypt": ciphertext = b64.b64decode(req.text) cipher = AES.new(key, mode, iv=req.iv.encode('utf-8') if req.iv else None) if req.mode == "GCM": plaintext = cipher.decrypt(ciphertext).decode('utf-8') result = plaintext else: plaintext_padded = cipher.decrypt(ciphertext) result = unpad(plaintext_padded, AES.block_size).decode('utf-8') else: raise HTTPException(status_code=400, detail="无效的 action") return {"mode": req.mode, "action": req.action, "result": result} except Exception as e: raise HTTPException(status_code=400, detail=f"AES 操作失败: {str(e)}") # ========== JSON 校验 ========== @router.post("/json/validate", response_model=JSONValidateResponse) @rate_limit(requests=100, period=60) async def validate_json( request: Request, # 添加 request 参数 req: JSONValidateRequest ): try: parsed = json.loads(req.json_string) formatted = json.dumps(parsed, indent=2, ensure_ascii=False) return JSONValidateResponse(valid=True, formatted=formatted) except json.JSONDecodeError as e: return JSONValidateResponse( valid=False, error={ "line": e.lineno, "column": e.colno, "message": e.msg } )