#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ PerToolBox Server - 工具路由(密码、二维码、加密、JSON) Copyright (C) 2024 Sea Network Technology Studio Author: Canglan License: AGPL v3 """ import hashlib import base64 import urllib.parse import json import qrcode from io import BytesIO import base64 as b64 from Crypto.Cipher import AES from Crypto.Util.Padding import pad, unpad from fastapi import APIRouter, HTTPException from ...schemas import ( HashRequest, Base64Request, URLRequest, AESRequest, JSONValidateRequest, JSONValidateResponse ) from ...middleware.rate_limit import rate_limit router = APIRouter(prefix="/api/v1", tags=["tools"]) # ========== 密码生成 ========== @router.get("/password/generate") @rate_limit(requests=50, period=60) async def generate_password( length: int = 12, upper: bool = True, lower: bool = True, digits: bool = True, symbols: bool = True, count: int = 1 ): import random import string chars = "" if upper: chars += string.ascii_uppercase if lower: chars += string.ascii_lowercase if digits: chars += string.digits if symbols: chars += "!@#$%^&*" if not chars: chars = string.ascii_letters + string.digits passwords = [''.join(random.choice(chars) for _ in range(length)) for _ in range(count)] return {"passwords": passwords if count > 1 else passwords[0]} # ========== 二维码 ========== @router.post("/qrcode/generate") @rate_limit(requests=30, period=60) async def generate_qrcode(content: str, size: int = 10): if not content: raise HTTPException(status_code=400, detail="内容不能为空") qr = qrcode.QRCode(box_size=size, border=2) qr.add_data(content) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") buffered = BytesIO() img.save(buffered, format="PNG") img_base64 = b64.b64encode(buffered.getvalue()).decode() return {"qr_code": f"data:image/png;base64,{img_base64}"} # ========== 哈希 ========== @router.post("/crypto/hash") @rate_limit(requests=100, period=60) async def compute_hash(request: HashRequest): text = request.text.encode('utf-8') algo = request.algorithm.lower() if algo == "md5": result = hashlib.md5(text).hexdigest() elif algo == "sha1": result = hashlib.sha1(text).hexdigest() elif algo == "sha256": result = hashlib.sha256(text).hexdigest() elif algo == "sha512": result = hashlib.sha512(text).hexdigest() else: raise HTTPException(status_code=400, detail="不支持的算法") return {"algorithm": algo, "result": result} # ========== Base64 ========== @router.post("/crypto/base64") @rate_limit(requests=100, period=60) async def base64_process(request: Base64Request): if request.action == "encode": result = base64.b64encode(request.text.encode('utf-8')).decode('utf-8') elif request.action == "decode": try: result = base64.b64decode(request.text).decode('utf-8') except Exception: raise HTTPException(status_code=400, detail="Base64 解码失败") else: raise HTTPException(status_code=400, detail="无效的 action") return {"action": request.action, "result": result} # ========== URL 编解码 ========== @router.post("/crypto/url") @rate_limit(requests=100, period=60) async def url_process(request: URLRequest): if request.action == "encode": result = urllib.parse.quote(request.text, safe='') elif request.action == "decode": result = urllib.parse.unquote(request.text) else: raise HTTPException(status_code=400, detail="无效的 action") return {"action": request.action, "result": result} # ========== AES 加解密 ========== @router.post("/crypto/aes") @rate_limit(requests=50, period=60) async def aes_process(request: AESRequest): try: key = request.key.encode('utf-8') mode_map = {"ECB": AES.MODE_ECB, "CBC": AES.MODE_CBC, "GCM": AES.MODE_GCM} mode = mode_map.get(request.mode) if not mode: raise HTTPException(status_code=400, detail="不支持的 AES 模式") # 密钥长度处理 if len(key) not in [16, 24, 32]: raise HTTPException(status_code=400, detail="密钥长度必须为 16/24/32 字节") if request.action == "encrypt": cipher = AES.new(key, mode, iv=request.iv.encode('utf-8') if request.iv else None) if request.mode == "GCM": ciphertext = cipher.encrypt(request.text.encode('utf-8')) result = b64.b64encode(ciphertext).decode('utf-8') else: padded = pad(request.text.encode('utf-8'), AES.block_size) ciphertext = cipher.encrypt(padded) result = b64.b64encode(ciphertext).decode('utf-8') elif request.action == "decrypt": ciphertext = b64.b64decode(request.text) cipher = AES.new(key, mode, iv=request.iv.encode('utf-8') if request.iv else None) if request.mode == "GCM": plaintext = cipher.decrypt(ciphertext).decode('utf-8') result = plaintext else: plaintext_padded = cipher.decrypt(ciphertext) result = unpad(plaintext_padded, AES.block_size).decode('utf-8') else: raise HTTPException(status_code=400, detail="无效的 action") return {"mode": request.mode, "action": request.action, "result": result} except Exception as e: raise HTTPException(status_code=400, detail=f"AES 操作失败: {str(e)}") # ========== JSON 校验 ========== @router.post("/json/validate", response_model=JSONValidateResponse) @rate_limit(requests=100, period=60) async def validate_json(request: JSONValidateRequest): try: parsed = json.loads(request.json_string) formatted = json.dumps(parsed, indent=2, ensure_ascii=False) return JSONValidateResponse(valid=True, formatted=formatted) except json.JSONDecodeError as e: return JSONValidateResponse( valid=False, error={ "line": e.lineno, "column": e.colno, "message": e.msg } )