Fast api kết hợp với mongodb demo
from fastapi import APIRouter, Request
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel
from pymongo.asynchronous.collection import ReturnDocument
from urllib3 import request
from db import db
router = APIRouter()
MONGO_URL = "mongodb+srv://xxxx:xxxx@server1.sicpm.mongodb.net/?retryWrites=true&w=majority&appName=Server1"
class ViewInput(BaseModel):
post_id: int
view: int
# lấy tất cả thông tin
@router.get('/')
async def root(request: Request):
db = request.app.state.mongo_db # Lấy MongoDB từ app.state
items = await db["views"].find().to_list(100) # Lấy tối đa 100 document
# Chuyển ObjectId sang str
for item in items:
item["_id"] = str(item["_id"])
return {"status": 1, "data": items}
# lấy thông tin của 1 post_id
@router.get('/show/{post_id}')
async def show(post_id: int):
client = AsyncIOMotorClient(MONGO_URL)
db1 = client["prompts"]
item = await db1["views"].find_one({"post_id": post_id})
if item:
item["_id"] = str(item["_id"])
return {"item": item}
# update hoặc tạo mới
@router.post('/')
async def create_item(item: ViewInput):
# db = request.app.state.mongo_db
data = item.dict()
data.pop("view", None) # loại bỏ field 'view' để không conflict
client = AsyncIOMotorClient(MONGO_URL)
db1 = client["prompts"]
status_insert = await db1["views"].find_one_and_update({'post_id': item.post_id}, {
"$inc": {"view": item.view},
"$setOnInsert": data
}, upsert=True, return_document=ReturnDocument.AFTER)
# status = await db1["views"].insert_one(item.dict())
if status_insert:
status_insert['_id'] = str(status_insert['_id'])
return {'status': status_insert}
# xoá dữ liệu
@router.delete("/")
async def delete_view(item: ViewInput):
client = AsyncIOMotorClient(MONGO_URL)
db1 = client["prompts"]
status_delete = await db1["views"].delete_one({"post_id": item.post_id})
if status_delete.deleted_count == 0:
return {"status": 0, "message": "Document not found"}
return {"status": 1, "message": "Document deleted"}