Build REST Apis
Problem
Building REST APIs requires request validation, proper error handling, documentation generation, and high performance. Traditional Flask approaches lack type safety and require manual validation logic.
Solution
1. Basic FastAPI Application
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel, EmailStr, Field
from typing import List, Optional
app = FastAPI(
title="User API",
description="REST API for user management",
version="1.0.0"
)
class UserCreate(BaseModel):
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
age: Optional[int] = Field(None, ge=0, le=150)
class UserResponse(BaseModel):
id: int
username: str
email: str
age: Optional[int]
class Config:
from_attributes = True # Enable ORM mode
users_db = {}
user_id_counter = 1
@app.post("/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(user: UserCreate):
"""Create a new user."""
global user_id_counter
# Check for duplicate username
if any(u["username"] == user.username for u in users_db.values()):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already exists"
)
# Create user
user_data = user.dict()
user_data["id"] = user_id_counter
users_db[user_id_counter] = user_data
user_id_counter += 1
return user_data
@app.get("/users", response_model=List[UserResponse])
async def list_users(skip: int = 0, limit: int = 10):
"""List all users with pagination."""
users_list = list(users_db.values())
return users_list[skip : skip + limit]
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
"""Get user by ID."""
if user_id not in users_db:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return users_db[user_id]
@app.put("/users/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user: UserCreate):
"""Update user by ID."""
if user_id not in users_db:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
user_data = user.dict()
user_data["id"] = user_id
users_db[user_id] = user_data
return user_data
@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: int):
"""Delete user by ID."""
if user_id not in users_db:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
del users_db[user_id]2. Dependency Injection for Database Sessions
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from typing import AsyncGenerator
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""Provide database session for request."""
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
@app.post("/users", response_model=UserResponse)
async def create_user(
user: UserCreate,
db: AsyncSession = Depends(get_db)
):
"""Create user with database persistence."""
db_user = User(**user.dict())
db.add(db_user)
await db.flush()
await db.refresh(db_user)
return db_user
@app.get("/users", response_model=List[UserResponse])
async def list_users(
skip: int = 0,
limit: int = 10,
db: AsyncSession = Depends(get_db)
):
"""List users from database."""
from sqlalchemy import select
stmt = select(User).offset(skip).limit(limit)
result = await db.execute(stmt)
return result.scalars().all()3. Authentication and Authorization
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from typing import Optional
SECRET_KEY = "your-secret-key-keep-it-secret"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer()
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: AsyncSession = Depends(get_db)
) -> User:
"""Verify JWT token and return current user."""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
token = credentials.credentials
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
# Fetch user from database
from sqlalchemy import select
stmt = select(User).where(User.username == username)
result = await db.execute(stmt)
user = result.scalar_one_or_none()
if user is None:
raise credentials_exception
return user
@app.post("/auth/login", response_model=Token)
async def login(
username: str,
password: str,
db: AsyncSession = Depends(get_db)
):
"""Authenticate user and return JWT token."""
from sqlalchemy import select
stmt = select(User).where(User.username == username)
result = await db.execute(stmt)
user = result.scalar_one_or_none()
if not user or not verify_password(password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password"
)
access_token = create_access_token(
data={"sub": user.username},
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me", response_model=UserResponse)
async def read_users_me(current_user: User = Depends(get_current_user)):
"""Get current authenticated user."""
return current_user4. Request Validation and Error Handling
from fastapi import Request, status
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from pydantic import validator
class UserCreate(BaseModel):
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
password: str = Field(..., min_length=8)
@validator('username')
def username_alphanumeric(cls, v):
if not v.isalnum():
raise ValueError('Username must be alphanumeric')
return v
@validator('password')
def password_strength(cls, v):
if not any(char.isdigit() for char in v):
raise ValueError('Password must contain at least one digit')
if not any(char.isupper() for char in v):
raise ValueError('Password must contain at least one uppercase letter')
return v
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
"""Handle validation errors with custom response."""
errors = []
for error in exc.errors():
errors.append({
"field": ".".join(str(loc) for loc in error["loc"]),
"message": error["msg"],
"type": error["type"]
})
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={"detail": errors}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""Handle unexpected errors."""
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={"detail": "Internal server error"}
)5. API Versioning and Documentation
from fastapi import APIRouter
router_v1 = APIRouter(prefix="/api/v1", tags=["v1"])
@router_v1.get("/users")
async def list_users_v1():
return {"version": "1.0", "users": []}
router_v2 = APIRouter(prefix="/api/v2", tags=["v2"])
@router_v2.get("/users")
async def list_users_v2(include_inactive: bool = False):
return {"version": "2.0", "users": [], "includes_inactive": include_inactive}
app.include_router(router_v1)
app.include_router(router_v2)
from fastapi.openapi.utils import get_openapi
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
openapi_schema = get_openapi(
title="User Management API",
version="2.0.0",
description="Complete REST API for user management with authentication",
routes=app.routes,
)
# Add custom fields
openapi_schema["info"]["contact"] = {
"name": "API Support",
"email": "support@example.com"
}
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi6. Background Tasks and Async Processing
from fastapi import BackgroundTasks
import smtplib
from email.message import EmailMessage
def send_email(email: str, message: str):
"""Send email in background."""
msg = EmailMessage()
msg.set_content(message)
msg["Subject"] = "User Registration"
msg["From"] = "noreply@example.com"
msg["To"] = email
# Send email (simplified)
# with smtplib.SMTP("localhost") as server:
# server.send_message(msg)
print(f"Sending email to {email}: {message}")
@app.post("/users", response_model=UserResponse)
async def create_user(
user: UserCreate,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
"""Create user and send welcome email in background."""
db_user = User(**user.dict())
db.add(db_user)
await db.flush()
# Add background task
background_tasks.add_task(
send_email,
user.email,
f"Welcome {user.username}!"
)
await db.refresh(db_user)
return db_userHow It Works
graph TD
A[HTTP Request] --> B[FastAPI Router]
B --> C{Path Matches?}
C -->|Yes| D[Run Dependencies]
D --> E[Validate Request with Pydantic]
E --> F{Valid?}
F -->|Yes| G[Execute Handler]
F -->|No| H[Return 422 Validation Error]
G --> I{Exception?}
I -->|Yes| J[Exception Handler]
I -->|No| K[Serialize Response]
K --> L[Return HTTP Response]
J --> L
style A fill:#0173B2
style E fill:#DE8F05
style G fill:#029E73
style L fill:#CC78BC
Request Lifecycle:
- Routing: FastAPI matches request path and method to handler
- Dependencies: Executes dependency functions (DB session, auth, etc.)
- Validation: Pydantic validates request body/query/path parameters
- Handler Execution: Async handler processes request
- Response Serialization: Pydantic serializes response model
- Documentation: Automatic OpenAPI schema generation
Variations
Rate Limiting Middleware
from fastapi import Request, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware
import time
from collections import defaultdict
class RateLimitMiddleware(BaseHTTPMiddleware):
def __init__(self, app, requests_per_minute: int = 60):
super().__init__(app)
self.requests_per_minute = requests_per_minute
self.requests = defaultdict(list)
async def dispatch(self, request: Request, call_next):
client_ip = request.client.host
now = time.time()
# Clean old requests
self.requests[client_ip] = [
req_time for req_time in self.requests[client_ip]
if now - req_time < 60
]
# Check rate limit
if len(self.requests[client_ip]) >= self.requests_per_minute:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Rate limit exceeded"
)
self.requests[client_ip].append(now)
response = await call_next(request)
return response
app.add_middleware(RateLimitMiddleware, requests_per_minute=100)CORS Configuration
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["https://example.com"], # Specific origins
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)File Upload Handling
from fastapi import File, UploadFile
import shutil
from pathlib import Path
@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
"""Handle file upload."""
upload_dir = Path("uploads")
upload_dir.mkdir(exist_ok=True)
file_path = upload_dir / file.filename
with file_path.open("wb") as buffer:
shutil.copyfileobj(file.file, buffer)
return {
"filename": file.filename,
"content_type": file.content_type,
"size": file_path.stat().st_size
}Common Pitfalls
1. Blocking Operations in Async Handlers
Problem: Blocking calls freeze the event loop.
@app.get("/slow")
async def slow_endpoint():
time.sleep(5) # Blocks event loop!
return {"status": "done"}
@app.get("/fast")
async def fast_endpoint():
await asyncio.sleep(5) # Non-blocking
return {"status": "done"}
from concurrent.futures import ThreadPoolExecutor
import asyncio
executor = ThreadPoolExecutor()
@app.get("/blocking")
async def blocking_endpoint():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(executor, blocking_function)
return {"result": result}2. Missing Response Model Validation
Problem: Returning data that doesn’t match schema.
@app.get("/users/{user_id}")
async def get_user(user_id: int):
return users_db.get(user_id) # Could return None!
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
user = users_db.get(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user # Validated against UserResponse3. Not Handling Database Transactions Properly
Problem: Uncommitted changes or resource leaks.
@app.post("/users")
async def create_user(user: UserCreate):
session = AsyncSessionLocal()
db_user = User(**user.dict())
session.add(db_user)
# Forgot to commit! Changes lost
return db_user
@app.post("/users", response_model=UserResponse)
async def create_user(user: UserCreate, db: AsyncSession = Depends(get_db)):
db_user = User(**user.dict())
db.add(db_user)
# Dependency handles commit/rollback
return db_user4. Exposing Internal Models Directly
Problem: Database models leak implementation details.
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
return await db.get(User, user_id) # Exposes all fields, relations
class UserResponse(BaseModel):
id: int
username: str
email: str
# password_hash excluded!
class Config:
from_attributes = True
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
user = await db.get(User, user_id)
return user # Pydantic filters fields5. Ignoring Security Best Practices
Problem: Vulnerable to common attacks.
SECRET_KEY = "password123"
@app.get("/search")
async def search(query: str):
# SQL injection risk!
return await db.execute(f"SELECT * FROM users WHERE name = '{query}'")
import os
from sqlalchemy import select
SECRET_KEY = os.getenv("SECRET_KEY")
@app.get("/search")
async def search(query: str, db: AsyncSession = Depends(get_db)):
stmt = select(User).where(User.name == query) # Parameterized
result = await db.execute(stmt)
return result.scalars().all()Related Patterns
Related Tutorial: See Intermediate Tutorial - Web Development. Related How-To: See Use Advanced Async Patterns, Work with Databases. Related Cookbook: See Cookbook recipe “REST API Patterns”.
Last updated