FastAPI: hướng dẫn sử dụng get cache và set cache
from typing import Union
from fastapi import HTTPException, Request
import json
from fastapi import APIRouter, Depends
import redis.asyncio as redis
import httpx
from lib.redis import redis_client
# Router on which the endpoints below are registered via its decorators.
post_route = APIRouter()
async def get_redis():
    """Dependency provider: hand back the module-level Redis client."""
    client = redis_client
    return client
@post_route.get("/")
async def postroot():
    """Index endpoint of the router; returns a fixed status payload."""
    payload = {}
    payload["status"] = 1
    payload["message"] = "Đây là post index"
    return payload
@post_route.post("/{key}")
async def set_cache(key: str, request: Request, r = Depends(get_redis)):
    """Store the ``value`` field of the request body in Redis under ``key``.

    The value is read from a JSON object body or from form data
    (URL-encoded or multipart). It is written JSON-encoded so that the GET
    handler in this module, which passes the cached entry to ``json.loads``,
    can decode strings, numbers and nested structures alike.

    Args:
        key: Redis key taken from the URL path.
        request: Incoming request; its Content-Type header selects the parser.
        r: Redis client supplied by the ``get_redis`` dependency.

    Returns:
        A confirmation dict echoing the key and the stored value.

    Raises:
        HTTPException: 400 for a malformed or non-object JSON body, 415 for
            an unsupported Content-Type, 422 when ``value`` is missing or is
            an uploaded file rather than a text field.
    """
    content_type = request.headers.get("content-type", "")

    # JSON is checked first, so it takes priority over form data.
    if "application/json" in content_type:
        try:
            body = await request.json()
        except ValueError as err:
            # Covers JSONDecodeError and UnicodeDecodeError (both ValueError).
            raise HTTPException(status_code=400, detail="Invalid JSON") from err
        if not isinstance(body, dict):
            # Valid JSON, but e.g. a list or scalar has no "value" field.
            raise HTTPException(
                status_code=400, detail="JSON body must be an object"
            )
        value = body.get("value")
    elif (
        "application/x-www-form-urlencoded" in content_type
        or "multipart/form-data" in content_type
    ):
        form = await request.form()
        value = form.get("value")
        if value is not None and not isinstance(value, str):
            # Multipart file parts are not JSON-serialisable cache values.
            raise HTTPException(
                status_code=422, detail="'value' must be a text field, not a file"
            )
    else:
        raise HTTPException(status_code=415, detail="Unsupported Content-Type")

    if value is None:
        raise HTTPException(status_code=422, detail="Missing 'value'")

    # Serialise before storing: keeps dict/list/bool values storable and lets
    # readers round-trip plain strings through json.loads.
    await r.set(key, json.dumps(value))
    return {"message": "Key stored", "key": key, "value": value}
@post_route.get("/{key}")
async def get_cache(key: str, r = Depends(get_redis)):
    """Return the cached value for ``key``, filling the cache on a miss.

    On a hit the stored entry is JSON-decoded; entries that are not valid
    JSON (e.g. plain strings stored by other writers) are returned verbatim
    instead of failing the request. On a miss a sample product document is
    fetched from dummyjson.com, cached under ``key`` for one hour, and
    returned.

    Args:
        key: Redis key taken from the URL path.
        r: Redis client supplied by the ``get_redis`` dependency.

    Returns:
        A dict with the key and its (decoded) value.

    Raises:
        HTTPException: 500 when the upstream request fails, returns a
            non-200 status, or returns a body that is not JSON.
    """
    cached = await r.get(key)
    if cached is not None:
        try:
            return {"key": key, "value": json.loads(cached)}
        except ValueError:
            # Not JSON: fall back to the raw text so the entry stays readable.
            if isinstance(cached, bytes):
                cached = cached.decode("utf-8", errors="replace")
            return {"key": key, "value": cached}

    try:
        async with httpx.AsyncClient() as client:
            res = await client.get('https://dummyjson.com/products/1')
    except httpx.HTTPError as err:
        # Connection errors, timeouts, etc. get the same response as a bad status.
        raise HTTPException(status_code=500, detail="Failed to fetch data") from err

    if res.status_code != 200:
        raise HTTPException(status_code=500, detail="Failed to fetch data")

    try:
        data = res.json()
    except ValueError as err:
        raise HTTPException(status_code=500, detail="Failed to fetch data") from err

    await r.set(key, json.dumps(data), ex=3600)  # TTL: 1 hour
    return {"key": key, "value": data}