FastAPI tạo đầu nối API cho FCM, tối ưu việc gửi thông báo bằng httpx và cache access token

FastAPI tạo đầu nối API cho FCM, tối ưu việc gửi thông báo bằng httpx và cache access token

Bạn chạy lệnh:

python3 -m pip install -r requirements.txt

để cài đặt các gói cần thiết để chạy app.

Cấu hình worker:

Trong phần cấu hình ở giao diện Python Manager, đặt worker_class = 'uvicorn.workers.UvicornWorker' thay cho worker_class = 'sync' (FastAPI là ứng dụng ASGI nên worker 'sync' không chạy được), tức là thay

worker_class = 'sync' bằng
worker_class = 'uvicorn.workers.UvicornWorker'

Và đây là đoạn code của main.py:

import asyncio
import os
from datetime import datetime, timedelta
from typing import List, Optional

import httpx
import uvicorn
from fastapi import FastAPI, HTTPException
from google.auth.transport.requests import Request
from google.oauth2 import service_account
from pydantic import BaseModel

# ASGI application object; the route decorators in this module register on it.
app = FastAPI()

# Firebase project id interpolated into the FCM v1 send URL.
PROJECT_ID = "truyenvideo-3eb09"
# OAuth scope requested when refreshing the service-account credentials.
SCOPES = ["https://www.googleapis.com/auth/firebase.messaging"]
# Service-account key file, resolved against the process's working directory.
CREDENTIAL_PATH = os.path.join(os.getcwd(), "firebase-adminsdk.json")

# In-process cache of the OAuth access token. "expire" starts at import time
# and "token" starts empty, so the first get_access_token() call refreshes.
_token_cache = dict(token=None, expire=datetime.utcnow())

def get_access_token():
    """Return an OAuth access token for FCM, reusing the cached one if valid.

    When the cache is empty or its expiry time has passed, the service-account
    file at CREDENTIAL_PATH is loaded, the credentials are refreshed, and the
    new token is stored in ``_token_cache`` with a 40-minute lifetime.

    Returns:
        The access token held by the (possibly just refreshed) credentials.
    """
    current_time = datetime.utcnow()

    cached_token = _token_cache["token"]
    if cached_token and current_time < _token_cache["expire"]:
        return cached_token

    creds = service_account.Credentials.from_service_account_file(
        CREDENTIAL_PATH,
        scopes=SCOPES,
    )
    creds.refresh(Request())

    # The original note says tokens live about 60 minutes; caching for 40
    # leaves a margin before the real expiry.
    _token_cache.update(
        token=creds.token,
        expire=current_time + timedelta(minutes=40),
    )
    return creds.token

class FCMRequest(BaseModel):
    """Request body for ``POST /send-fcm``: one message for one device token."""

    # Device registration token the message is addressed to.
    token: str
    title: str
    body: str
    # Optional[...] (rather than a bare ``str`` with a None default) so that a
    # client sending an explicit JSON null is accepted, not rejected with 422.
    image: Optional[str] = None
    url: Optional[str] = None

class FCMMultiRequest(BaseModel):
    """Request body for ``POST /send-multiple-fcm``: one message, many tokens."""

    # Device registration tokens that should each receive the message.
    tokens: List[str]
    title: str
    body: str
    # Optional[...] (rather than a bare ``str`` with a None default) so that a
    # client sending an explicit JSON null is accepted, not rejected with 422.
    image: Optional[str] = None
    url: Optional[str] = None

# Minimal status endpoint: always answers with a fixed "OK" payload.
@app.get("/")
def root():
    status_payload = {"status": "OK"}
    return status_payload

# $response = Http::post('http://127.0.0.1:8000/send-fcm', [
#     'token' => 'flMcjT6Yh0afxHlezVBx7g:APA91bEebU4AAnl_f9Jlf1ZuxkFJYdDlnGJStDBoJapjOZuDIwjob_eqWYDvCBMS_jp-z2C-LDnipVZ0QX4CUCtXmMGobgu485Y01oSQpKd6PS8cpLcQhGs',
#     'title' => 'Hello!',
#     'body' => 'Đây là thông báo từ FastAPI',
#     'image' => 'https://i3.ytimg.com/vi/OiMqLZlB8DY/maxresdefault.jpg',
#     'url'   => 'https://www.youtube.com/watch?v=w0Dapj9kKS0&ab_channel=H%C5%A9Mu%E1%BB%91iAudio',
# ]);
#
# // Kiểm tra phản hồi
# if ($response->successful()) {
#     dd($response->json());
# } else {
#     dd($response->status(), $response->body());
# }

@app.post("/send-fcm")
async def send_fcm(req: FCMRequest):
    """Send one data-only FCM message to a single device token.

    The title, body, image and url travel inside the message's ``data`` map;
    a missing image or url is sent as an empty string.

    Args:
        req: Target device token plus the notification fields.

    Returns:
        ``{"status": "success", "response": <FCM JSON>}`` when FCM answers
        200, otherwise ``{"status": "failed", "status_code": <int>,
        "error": <raw response text>}``.

    Raises:
        HTTPException: Status 500 carrying the error text when fetching the
            access token or performing the HTTP call raises.
    """
    try:
        # get_access_token() reads the key file and refreshes over a
        # synchronous HTTP transport when the cache is stale; run it in the
        # default thread pool so the event loop is not blocked meanwhile.
        loop = asyncio.get_running_loop()
        access_token = await loop.run_in_executor(None, get_access_token)

        fcm_url = f"https://fcm.googleapis.com/v1/projects/{PROJECT_ID}/messages:send"
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

        message = {
            "message": {
                "token": req.token,
                "data": {
                    "title": req.title,
                    "body": req.body,
                    "image": req.image or "",
                    "url": req.url or "",
                },
            }
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(fcm_url, headers=headers, json=message)

        if response.status_code != 200:
            # FCM rejected the message: report it in the body instead of
            # raising, so the caller gets FCM's own status code and error.
            return {
                "status": "failed",
                "status_code": response.status_code,
                "error": response.text,
            }

        return {"status": "success", "response": response.json()}

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

async def send_single_token(
    token: str,
    payload: dict,
    access_token: str,
    client: Optional[httpx.AsyncClient] = None,
):
    """POST one data-only FCM message for ``token`` and summarize the reply.

    Args:
        token: Device registration token to address.
        payload: Mapping sent as the message's ``data`` field.
        access_token: OAuth bearer token placed in the Authorization header.
        client: Optional shared ``httpx.AsyncClient``. When omitted, a
            temporary client is opened for this single request.

    Returns:
        A dict with ``token``, ``status_code`` and ``response`` (parsed JSON
        on HTTP 200, raw response text otherwise).
    """
    fcm_url = f"https://fcm.googleapis.com/v1/projects/{PROJECT_ID}/messages:send"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }

    message = {
        "message": {
            "token": token,
            "data": payload,
        }
    }

    if client is None:
        # Standalone call: keep the one-client-per-request behavior.
        async with httpx.AsyncClient() as own_client:
            response = await own_client.post(fcm_url, headers=headers, json=message)
    else:
        response = await client.post(fcm_url, headers=headers, json=message)

    return {
        "token": token,
        "status_code": response.status_code,
        "response": response.json() if response.status_code == 200 else response.text,
    }

# Sending to several tokens (example call from a PHP/Laravel client):
# $response = Http::post('http://127.0.0.1:8000/send-multiple-fcm', [
#     'tokens' => [
#         'fcm_token_1',
#         'fcm_token_2',
#         'fcm_token_3',
#     ],
#     'title' => '🔥 Breaking news',
#     'body' => 'New content has been published!',
#     'image' => 'https://i.imgur.com/banner.png',
#     'url' => 'https://truyenvideo.com'
# ]);


@app.post("/send-multiple-fcm")
async def send_multiple_fcm(req: FCMMultiRequest):
    """Send the same data-only FCM message to every token in the request.

    Tokens are processed in batches of 500; the sends inside a batch run
    concurrently and a short pause separates consecutive batches.

    Args:
        req: Device tokens plus the notification fields.

    Returns:
        A dict with ``sent_count`` (number of results), ``success`` (results
        whose status code is 200) and ``failed`` (all other results). A send
        that raised is reported with status code 500, its own token and the
        error text.

    Raises:
        HTTPException: Status 500 carrying the error text when something
            outside the per-token sends fails (e.g. fetching the token).
    """
    try:
        # get_access_token() does blocking I/O when the cache is stale; run it
        # in the default thread pool so the event loop stays responsive.
        loop = asyncio.get_running_loop()
        access_token = await loop.run_in_executor(None, get_access_token)

        payload = {
            "title": req.title,
            "body": req.body,
            "image": req.image or "",
            "url": req.url or "",
        }
        batch_size = 500
        all_results = []

        # One client for the whole request so connections are reused instead
        # of opening a new client (and TLS handshake) per token. pool=None
        # lets sends wait for a free connection rather than fail while the
        # rest of the batch is in flight; other timeouts stay at 5 seconds.
        limits = httpx.Limits(max_connections=100, max_keepalive_connections=100)
        timeout = httpx.Timeout(5.0, pool=None)

        async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
            for start in range(0, len(req.tokens), batch_size):
                if start:
                    # Brief pause between batches to ease the load; skipped
                    # before the first batch so no time is wasted at the end.
                    await asyncio.sleep(0.2)

                batch = req.tokens[start:start + batch_size]
                results = await asyncio.gather(
                    *(
                        send_single_token(token, payload, access_token, client)
                        for token in batch
                    ),
                    return_exceptions=True,
                )

                # gather() keeps input order, so zip pairs each outcome with
                # the token it belongs to, including the failed ones.
                for token, result in zip(batch, results):
                    if isinstance(result, BaseException):
                        all_results.append({
                            "token": token,
                            "status_code": 500,
                            # Some exceptions have an empty message; fall
                            # back to the exception type name.
                            "response": str(result) or type(result).__name__,
                        })
                    else:
                        all_results.append(result)

        return {
            "sent_count": len(all_results),
            "success": [r for r in all_results if r["status_code"] == 200],
            "failed": [r for r in all_results if r["status_code"] != 200],
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

# Direct execution (python main.py): serve the app locally with auto-reload.
if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="127.0.0.1",
        port=8000,
        reload=True,
    )

Link download : https://drive.google.com/file/d/16YlX2y2vXiDEXL62wfHdZBG-4Sqcb-nb/view?usp=sharing