Asyncio Fast API & AsyncPG with SqlAlchemy¶
The last two years, I focused on learning and practicing Python AsyncIO
development. I find AsyncIO very attractive
in terms of performance, especially for web applications where request processing spends most of its time waiting
for a database or an API call to return results.
Most of my Asyncio experience is based on pure Python for real time processing tasks combined with Django
for web
interfaces. Django ORM
is not ( yet ) fully asynchronous compatible, some complex queries need to be wrapped
in an sync_to_async
decorator.
FastAPI
micro-framework combined with SqlAlchemy
seems to bee a good fully AsyncIo compatible environment, let’s
build some APIs with it and explore it’s possibilities.
The goal¶
Setup a working FastApi AsyncIO environment for a basic blog application with :
SqlAlchemy
as ORM with async sessions.Alembic
to manage database migrationsFastAPI
Asynchronous API endpointsPytest
with AsyncIo for testing
Full git repository : https://github.com/pipoupiwam/fastapi-blog
Python requirements¶
Install the following requirements in a virtualenv
&> cat project/requirements.txt
# Main APP
# FastAPI framework
fastapi==0.108.0
# ORM
SQLAlchemy==2.0.25
# Replacement for psycopg2 to use asyncio with postgres.
asyncpg==0.29.0
# Database migrations
alembic==1.13.1
Project Structure¶
project/
__init__.py
app/
database/
db.py
models.py
services.py
main.py
schemas.py
migrations/
env.py
script.py.mako
alembic.ini
requirements.txt
Database Setup¶
The database related code will be under app/database.
Create the database¶
First of all we need to create a database for our project. In my case I created a database using psql
&> sudo su postgres
&> psql
&> CREATE DATABASE my_project_db;
DATABASE CREATED
Models¶
We will use SqlAlchemy for our ORM : https://docs.sqlalchemy.org/en/20/
We will use AsyncSession
with the postgresql+asyncpg driver.
&> cat project/app/database/db.py
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql+asyncpg://postgres:password@localhost:5432/project_db"
engine = AsyncEngine(create_engine(DATABASE_URL, echo=True, future=True))
Next we need to create a function to get a Session in order to query the database
&> cat project/app/main.py
from fastapi FastAPI
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker
from .database.db import engine
app = FastAPI()
async def get_db() -> AsyncSession:
async_session = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async with async_session() as session:
yield session
This function could have been declared in db.py but declaring it in the API makes it easier to implement testing fixtures.
Migrations¶
Alembic
will help us manage our database migrations.
Alembic documentation : https://alembic.sqlalchemy.org/en/latest/tutorial.html
We must start by initializing alembic configuration for our project.
&> alembic init alembic
This command will generate the following files :
migrations/
versions/
.empty
env.py
script.py.mako
alembic.ini
To make things work we need to edit alembic.ini to specify to which database we will connect :
&> cat project/alembic.ini
sqlalchemy.url = postgresql+asyncpg://postgres:password@localhost:5432/project_db
Then we need to inform alembic
about the existence of our database models.
&> cat project/alembic.ini
from app.database.models import Base
target_metadata = Base.metadata
The next step is to generate the initial migrations for our models
&> alembic revision --autogenerate -m "create_author_article"
This command will generate a migration file in project/versions
Finally we can apply the generated migration in order to create the Author and Article tables in database.
&> alembic upgrade head
CRUD operations¶
We will extract the “business” logic of our application into dedicated Services. This allows us to create reusable and easily testable functions.
The API layer will be responsible for all the permissions, authorization logic and will use our services to implement the business logic.
Designing our code this way allows us to test our services in a true unitary manner.
&> cat project/app/database/services.py
from fastapi import HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from . import models
class ArticleService:
"""
Service class for the Article model.
You should implement Article related methods in this class.
"""
@staticmethod
async def list_articles(*, db: AsyncSession):
result = await db.execute(select(models.Article))
articles = result.scalars().all()
return articles
@staticmethod
async def get_article(*, db: AsyncSession, article_id):
instance = await db.get(models.Article, article_id)
if instance is None:
raise HTTPException(status_code=404, detail="Article does not exist")
return instance
@staticmethod
async def create_article(*, db: AsyncSession, title, content, author_id):
await AuthorService.get_author(db=db, author_id=author_id)
article_instance = models.Article(title=title, content=content, author_id=author_id) # type: ignore[call-arg]
db.add(article_instance)
await db.commit()
await db.refresh(article_instance)
return article_instance
@staticmethod
async def delete_article(*, db: AsyncSession, article_id):
instance = await ArticleService.get_article(db=db, article_id=article_id)
logger.info(f"Deleting Article : {article_id}")
await db.delete(instance)
await db.commit()
return instance
class AuthorService:
"""
Service class for the Author model.
You should implement Author related methods in this class
"""
@staticmethod
async def get_author(*, db: AsyncSession, author_id):
instance = await db.get(models.Author, author_id)
if instance is None:
raise HTTPException(status_code=404, detail="Author does not exist")
return instance
@staticmethod
async def list_authors(*, db: AsyncSession):
result = await db.execute(select(models.Author))
authors = result.scalars().all()
return authors
@staticmethod
async def create_author(*, db: AsyncSession, first_name, last_name):
author_instance = models.Author(first_name=first_name, last_name=last_name) # type: ignore[call-arg]
db.add(author_instance)
await db.commit()
await db.refresh(author_instance)
return author_instance
@staticmethod
async def delete_author(*, db: AsyncSession, author_id):
instance = await AuthorService.get_author(db=db, author_id=author_id)
await db.delete(instance)
await db.commit()
return instance
These services may be extended later to implement some more complex features.
Pydantic schemas¶
In order to ease the serialization of our database models we will use pydantic
.
Pydantic documentation : https://docs.pydantic.dev/latest/concepts/models/
This library will allow us automatically serialize our SqlAlchemy
into JSON
and vice-versa.
Another benefit is the automatically generated OpenAPI
schema which provides us with a Swagger
testable documentation.
&> cat project/appschemas.py
from datetime import datetime
from pydantic import BaseModel
# Article Model
class ArticleBase(BaseModel):
title: str
content: str
author_id: int
class Article(ArticleBase):
"""
Database model for Article table
"""
id: int
class Config:
orm_mode = True
# Author Model
class AuthorBase(BaseModel):
first_name: str
last_name: str
class Author(AuthorBase):
"""
Database model for Author table
"""
id: int
created_at: datetime
updated_at: datetime
class Config:
orm_mode = True
ArticleBase
Schema define all writable elements of our model, it is not mapped to our SqlAlchemy Article model
( orm_mode = True
is absent ). Also it does not implement the created_at
and updated_at
fields as we do not want to
expose these fields to the writable APIs.
The Article
model inherits ArticleBase
and specifies that it is an SqlAlchemy mapped model with orm_mode = True
This mechanism allows us to reproduce the DjangoRestFramework read_only=True/False
system.
The same logic applies to the Author
models.
APIs¶
Finally we can implement the APIs
&> cat project/app/main.py
from fastapi import Depends, FastAPI, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker
from . import schemas
from .database.db import engine
from .database.services import AuthorService, ArticleService
app = FastAPI()
async def get_db() -> AsyncSession:
async_session = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async with async_session() as session:
yield session
# Authors
@app.get("/authors", response_model=list[schemas.Author], status_code=status.HTTP_200_OK)
async def list_authors(db: AsyncSession = Depends(get_db)):
"""
List authors
"""
authors = await AuthorService.list_authors(db=db)
return authors
@app.get("/authors/{author_id}", response_model=schemas.Author, status_code=status.HTTP_200_OK)
async def delete_author(author_id: int, db: AsyncSession = Depends(get_db)):
author = await AuthorService.get_author(db=db, author_id=author_id)
return author
@app.post("/authors", response_model=schemas.Author, status_code=status.HTTP_201_CREATED)
async def create_author(author: schemas.AuthorBase, db: AsyncSession = Depends(get_db)):
"""
Create an Author
"""
author_instance = await AuthorService.create_author(
db=db,
first_name=author.first_name,
last_name=author.last_name
)
return author_instance
@app.delete("/authors/{author_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_author(author_id: int, db: AsyncSession = Depends(get_db)):
await AuthorService.delete_author(db=db, author_id=author_id)
# Articles
@app.get("/articles", response_model=list[schemas.Article], status_code=status.HTTP_200_OK)
async def list_articles(db: AsyncSession = Depends(get_db)):
"""
List Articles
"""
articles = await ArticleService.list_articles(db=db)
return articles
@app.get("/articles/{article_id}", response_model=schemas.Article, status_code=status.HTTP_200_OK)
async def get_author(article_id: int, db: AsyncSession = Depends(get_db)):
author = await ArticleService.get_article(db=db, article_id=article_id)
return author
@app.post("/articles", response_model=schemas.Article, status_code=status.HTTP_201_CREATED)
async def create_article(article: schemas.ArticleBase, db: AsyncSession = Depends(get_db)):
"""
Create an article
"""
article_instance = await ArticleService.create_article(
db=db, title=article.title,
content=article.title,
author_id=article.author_id
)
return article_instance
@app.delete("/articles/{article_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_article(article_id: int, db: AsyncSession = Depends(get_db)):
await ArticleService.delete_article(db=db, article_id=article_id)
Now you should be able to run the API and visit http://localhost:8000/docs
&> uvicorn app.main:app --reload
Note that each api view is decorated with @app.method
and provide information about :
The HTTP method
@app.get
of the endpointThe route and url parameters
@app.get("/articles/{article_id}")
The response Model
response_model=schemas.Article
used for serializationThe HTTP status_code
status_code=status.HTTP_200_OK
This information will be used by FastAPI to generate the OpenAPI schema, validate and serialize input and output JSON data
Customizing the API documentation¶
Swagger customisation is located in a separate article
Testing with Pytest¶
The test part of this project is located in a separate article.
Read Fast API & PyTest
Conclusions¶
We implemented a basic API writing and reading a database, with schema migrations and data serialization.
FastAPI is a good way to develop Asynchronous APIs for micro services.