A FastAPI app that provides an API connector for FCM (Firebase Cloud Messaging), optimizing delivery with httpx and a cached access token.
Run the command:
python3 -m pip install -r requirements.txt
to install the packages needed to run the app.
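The contents of requirements.txt are not listed here. As a quick sanity check after installing, a sketch like the following reports which modules cannot be found. The helper name missing_modules is hypothetical, and the module list is an assumption read off the import lines of main.py.

```python
import importlib.util
from typing import Iterable, List

# Modules imported by main.py (assumed from its import lines).
REQUIRED_MODULES = [
    "fastapi", "pydantic", "httpx", "uvicorn", "google.auth", "google.oauth2",
]


def missing_modules(names: Iterable[str]) -> List[str]:
    """Return the module names that cannot be located in this environment."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except ImportError:
            # Raised when a parent package (e.g. "google") is itself absent.
            found = False
        if not found:
            missing.append(name)
    return missing


if __name__ == "__main__":
    absent = missing_modules(REQUIRED_MODULES)
    print("All modules found" if not absent else f"Missing: {', '.join(absent)}")
```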
FastAPI is an ASGI application, so Gunicorn's default sync (WSGI) worker cannot serve it. In the worker config in the Python manager interface, replace
worker_class = 'sync'
with
worker_class = 'uvicorn.workers.UvicornWorker'
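The worker_class setting is a Gunicorn option. If the Python manager writes a Gunicorn config file, the relevant part might look like the sketch below. Only worker_class comes from the text above; the file name, the bind address, and the worker count are assumed values to adjust for your server.

```python
# gunicorn.conf.py (hypothetical file name; all values except worker_class are assumptions)
import multiprocessing

# Address and port Gunicorn listens on (assumed; match your panel's setting).
bind = "127.0.0.1:8000"

# Number of worker processes (assumed heuristic: one per CPU core, at least 2).
workers = max(2, multiprocessing.cpu_count())

# ASGI worker class so that Gunicorn can serve the FastAPI app.
worker_class = "uvicorn.workers.UvicornWorker"
```

With such a file in place, the app could be started with something like: gunicorn -c gunicorn.conf.py main:app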
The code for main.py:
from fastapi import FastAPI, HTTPException
from typing import List, Optional
import asyncio
from pydantic import BaseModel
from google.auth.transport.requests import Request
from google.oauth2 import service_account
from datetime import datetime, timedelta, timezone
import os
import httpx
import uvicorn

app = FastAPI()

CREDENTIAL_PATH = os.path.join(os.getcwd(), "firebase-adminsdk.json")
SCOPES = ["https://www.googleapis.com/auth/firebase.messaging"]
PROJECT_ID = "truyenvideo-3eb09"

# Token cache: the access token and the time after which it is refreshed
_token_cache = {
    "token": None,
    "expire": datetime.now(timezone.utc),
}


def get_access_token():
    """Return the cached access token, refreshing it once the cache has expired."""
    now = datetime.now(timezone.utc)
    if _token_cache["token"] and _token_cache["expire"] > now:
        return _token_cache["token"]
    credentials = service_account.Credentials.from_service_account_file(
        CREDENTIAL_PATH, scopes=SCOPES
    )
    credentials.refresh(Request())
    _token_cache["token"] = credentials.token
    # The token lives for about 60 minutes; refresh after 40 to stay safe
    _token_cache["expire"] = now + timedelta(minutes=40)
    return credentials.token


class FCMRequest(BaseModel):
    token: str
    title: str
    body: str
    image: Optional[str] = None
    url: Optional[str] = None


class FCMMultiRequest(BaseModel):
    tokens: List[str]
    title: str
    body: str
    image: Optional[str] = None
    url: Optional[str] = None


@app.get("/")
def root():
    return {"status": "OK"}


# Example call from PHP (Laravel Http client):
# $response = Http::post('http://127.0.0.1:8000/send-fcm', [
#     'token' => 'flMcjT6Yh0afxHlezVBx7g:APA91bEebU4AAnl_f9Jlf1ZuxkFJYdDlnGJStDBoJapjOZuDIwjob_eqWYDvCBMS_jp-z2C-LDnipVZ0QX4CUCtXmMGobgu485Y01oSQpKd6PS8cpLcQhGs',
#     'title' => 'Hello!',
#     'body' => 'This is a notification from FastAPI',
#     'image' => 'https://i3.ytimg.com/vi/OiMqLZlB8DY/maxresdefault.jpg',
#     'url' => 'https://www.youtube.com/watch?v=w0Dapj9kKS0&ab_channel=H%C5%A9Mu%E1%BB%91iAudio',
# ]);
#
# // Check the response
# if ($response->successful()) {
#     dd($response->json());
# } else {
#     dd($response->status(), $response->body());
# }
@app.post("/send-fcm")
async def send_fcm(req: FCMRequest):
    """Send one data message to a single device token."""
    try:
        access_token = get_access_token()
        fcm_url = f"https://fcm.googleapis.com/v1/projects/{PROJECT_ID}/messages:send"
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }
        message = {
            "message": {
                "token": req.token,
                "data": {
                    "title": req.title,
                    "body": req.body,
                    "image": req.image or "",
                    "url": req.url or "",
                },
            }
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(fcm_url, headers=headers, json=message)
        if response.status_code != 200:
            return {
                "status": "failed",
                "status_code": response.status_code,
                "error": response.text,
            }
        return {"status": "success", "response": response.json()}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


async def send_single_token(token: str, payload: dict, access_token: str):
    fcm_url = f"https://fcm.googleapis.com/v1/projects/{PROJECT_ID}/messages:send"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
    message = {
        "message": {
            "token": token,
            "data": payload,
        }
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(fcm_url, headers=headers, json=message)
    return {
        "token": token,
        "status_code": response.status_code,
        "response": response.json() if response.status_code == 200 else response.text,
    }


# Send to multiple tokens. Example call from PHP (Laravel Http client):
# $response = Http::post('http://127.0.0.1:8000/send-multiple-fcm', [
#     'tokens' => [
#         'fcm_token_1',
#         'fcm_token_2',
#         'fcm_token_3',
#     ],
#     'title' => '🔥 Hot news',
#     'body' => 'New content has been updated!',
#     'image' => 'https://i.imgur.com/banner.png',
#     'url' => 'https://truyenvideo.com'
# ]);
@app.post("/send-multiple-fcm")
async def send_multiple_fcm(req: FCMMultiRequest):
    """Send the same data message to many tokens in concurrent batches."""
    try:
        access_token = get_access_token()
        payload = {
            "title": req.title,
            "body": req.body,
            "image": req.image or "",
            "url": req.url or "",
        }
        batch_size = 500
        all_results = []
        for i in range(0, len(req.tokens), batch_size):
            batch = req.tokens[i:i + batch_size]
            tasks = [send_single_token(token, payload, access_token) for token in batch]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            # gather keeps input order, so each result pairs with its token
            for token, result in zip(batch, results):
                if isinstance(result, Exception):
                    all_results.append({
                        "token": token,
                        "status_code": 500,
                        "response": str(result),
                    })
                else:
                    all_results.append(result)
            # Optional: pause between batches to reduce load
            await asyncio.sleep(0.2)
        return {
            "sent_count": len(all_results),
            "success": [r for r in all_results if r["status_code"] == 200],
            "failed": [r for r in all_results if r["status_code"] != 200],
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)
Download link: https://drive.google.com/file/d/16YlX2y2vXiDEXL62wfHdZBG-4Sqcb-nb/view?usp=sharing
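
The commented PHP snippets in main.py call the endpoints from Laravel. For callers written in Python, an equivalent request could be built with the standard library, as sketched here. build_multi_request and post_json are hypothetical helpers; the base URL assumes the app runs locally on port 8000 as in the uvicorn.run call, and the tokens are placeholders.

```python
import json
import urllib.request
from typing import Any, Dict, List, Optional


def build_multi_request(
    tokens: List[str],
    title: str,
    body: str,
    image: Optional[str] = None,
    url: Optional[str] = None,
    base_url: str = "http://127.0.0.1:8000",
) -> urllib.request.Request:
    """Build a POST request for /send-multiple-fcm with the fields of FCMMultiRequest."""
    payload: Dict[str, Any] = {"tokens": tokens, "title": title, "body": body}
    # Leave optional fields out entirely rather than sending null.
    if image is not None:
        payload["image"] = image
    if url is not None:
        payload["url"] = url
    return urllib.request.Request(
        f"{base_url}/send-multiple-fcm",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def post_json(request: urllib.request.Request, timeout: float = 10.0) -> Any:
    """Send the request and decode the JSON response (needs the app to be running)."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


# Usage, once the FastAPI app is up:
#   request = build_multi_request(["fcm_token_1", "fcm_token_2"], "Hello!", "Sent from Python")
#   print(post_json(request))
```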