Intermediate
Group 11: Advanced Dependency Injection
Example 28: Class-Based Dependencies
Dependencies can be classes, not just functions. Class-based dependencies use __call__ to make instances callable, enabling stateful dependencies that initialize resources once and reuse them across requests.
# main.py - Class-based dependencies for stateful resource sharing
from typing import Annotated
from fastapi import FastAPI, Depends, HTTPException
app = FastAPI()
class Paginator: # => Class-based dependency
def __init__(self, max_limit: int = 100):
self.max_limit = max_limit # => Configuration stored on instance
# => Injected once at dependency registration time
def __call__( # => __call__ makes instance behave like function
self,
skip: int = 0,
limit: int = 20,
) -> dict:
if limit > self.max_limit: # => Enforce maximum page size
raise HTTPException(
status_code=400,
detail=f"limit cannot exceed {self.max_limit}",
)
return {"skip": skip, "limit": min(limit, self.max_limit)}
# => Returns pagination dict for handler
paginate = Paginator(max_limit=50) # => Create instance with custom configuration
# => This instance is reused across requests
from fastapi import Request # => Needed so FastAPI injects the request object below
class RateLimiter: # => Stateful class tracking request counts
    def __init__(self, max_requests: int = 10):
        self.max_requests = max_requests
        self.requests: dict[str, int] = {}
        # => In-memory counter per client IP
        # => Production: use Redis for distributed limiting
    def __call__(self, request: Request) -> None:
        # => The Request annotation is required; without it FastAPI
        # => would treat "request" as a query parameter
        client_ip = request.client.host if request.client else "127.0.0.1"
        # => request.client can be None under some test clients
count = self.requests.get(client_ip, 0)
if count >= self.max_requests:
raise HTTPException(status_code=429, detail="Too many requests")
# => 429 Too Many Requests
self.requests[client_ip] = count + 1
rate_limit = RateLimiter(max_requests=5)
@app.get("/items")
def list_items(
pagination: Annotated[dict, Depends(paginate)],
# => paginate instance is callable; Depends calls it
_: Annotated[None, Depends(rate_limit)],
# => rate_limit instance checks and increments counter
):
return {"items": [], **pagination}Key Takeaway: Class instances with __call__ work as dependencies. Initialize class-level state (configuration, counters) once; __call__ runs per request with shared state.
Why It Matters: Class-based dependencies enable the stateful patterns that function-based dependencies cannot express cleanly. A connection pool, a loaded ML model, or a Redis client initialized once at startup and shared across thousands of requests requires class-level state. Without class-based dependencies, these resources either become module-level globals (hard to test) or are reconstructed per-request (wasteful). FastAPI’s Depends() system treats instances identically to functions, making stateful and stateless dependencies interchangeable at the usage site.
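The callable-instance pattern can be seen without any FastAPI machinery. This hypothetical Clamp class (an illustration, not part of the example above) fixes its configuration once in __init__, while __call__ runs on every invocation against that shared state:

```python
class Clamp:
    def __init__(self, max_limit: int):
        self.max_limit = max_limit          # set once, at instance creation

    def __call__(self, limit: int) -> int:
        return min(limit, self.max_limit)   # runs on every call, shared state

clamp = Clamp(max_limit=50)  # one configured instance...
print(clamp(20))   # -> 20
print(clamp(200))  # -> 50   ...reused for every call
```

This is exactly the shape Depends() exploits: the instance is constructed once at import time, and FastAPI calls it per request.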
Example 29: Nested Dependencies and Dependency Scoping
Dependencies can themselves declare dependencies, creating chains. FastAPI resolves the full dependency tree, caches each dependency result within a single request, and ensures teardown order is correct.
# main.py - Nested dependencies with request-scoped caching
from typing import Annotated, Generator
from fastapi import FastAPI, Depends
app = FastAPI()
# Simulated database connection class
class DBConnection:
def __init__(self, conn_str: str):
self.conn_str = conn_str
self.closed = False
print(f"[DB] Opened connection to {conn_str}")
# => Printed once per request, not per dependency use
def close(self):
self.closed = True
print(f"[DB] Closed connection to {conn_str}")
# Level 1 dependency: database connection
def get_db() -> Generator[DBConnection, None, None]:
# => Generator dependency enables setup + teardown
db = DBConnection("postgresql://localhost/mydb")
# => Setup: runs before yielding
try:
yield db # => Yield the resource; FastAPI injects what is yielded
finally:
db.close() # => Teardown: runs after handler returns
# => Guaranteed even if handler raises an exception
# Level 2 dependency: uses db connection
def get_user_repo(
db: Annotated[DBConnection, Depends(get_db)],
# => Declares dependency on get_db
# => get_db is called once; result cached for this request
) -> dict:
return {"repo": "UserRepo", "db": db.conn_str}
# => UserRepo wraps the db connection
def get_order_repo(
db: Annotated[DBConnection, Depends(get_db)],
# => get_db is the SAME dependency as above
# => FastAPI caches get_db result: same db object injected
# => Connection opened ONCE, not twice
) -> dict:
return {"repo": "OrderRepo", "db": db.conn_str}
# Level 3 handler: uses both repos
@app.get("/orders/user/{user_id}")
def user_orders(
user_id: int,
user_repo: Annotated[dict, Depends(get_user_repo)],
order_repo: Annotated[dict, Depends(get_order_repo)],
# => Both repos declared; get_db called once total
):
return {
"user_id": user_id,
"user_repo": user_repo["repo"],
"order_repo": order_repo["repo"],
"same_db": user_repo["db"] == order_repo["db"],
# => True: same connection object injected to both
    }
Key Takeaway: FastAPI caches dependency results within a single request. A dependency used by multiple sub-dependencies is called once per request, preventing duplicate resource creation.
Why It Matters: Database connection sharing within a single request is critical for transactional consistency. If get_user_repo and get_order_repo each opened their own database connection, updates to users and orders would run in separate transactions—breaking atomicity for operations that must succeed or fail together. FastAPI’s dependency caching ensures the same connection (and transaction) flows through the entire request without explicit plumbing code in handlers.
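The caching behavior described above can be sketched in plain Python. This toy resolve function (an assumption for illustration, not FastAPI's actual resolver) keys a per-request cache by the dependency callable, so get_db runs at most once no matter how many sub-dependencies declare it:

```python
calls = {"get_db": 0}

def get_db():
    calls["get_db"] += 1
    return object()               # stands in for a DBConnection

def resolve(dep, cache):
    if dep not in cache:          # request-scoped cache keyed by the callable
        cache[dep] = dep()
    return cache[dep]

cache = {}                        # a fresh cache per request
db_a = resolve(get_db, cache)     # first use: get_db actually runs
db_b = resolve(get_db, cache)     # second use: cached result returned
print(calls["get_db"], db_a is db_b)  # -> 1 True
```

A fresh cache per request is what keeps the sharing request-scoped rather than global.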
Group 12: Security and Authentication
Example 30: OAuth2 Password Flow - Login Endpoint
FastAPI’s OAuth2PasswordBearer and OAuth2PasswordRequestForm implement the OAuth2 Password grant type—the standard pattern for username/password authentication in API-first applications. This example shows the login endpoint that issues tokens.
# main.py - OAuth2 Password flow login endpoint
from datetime import datetime, timedelta, timezone
from typing import Annotated
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
import hashlib
app = FastAPI()
# Simulated user database
fake_users_db = {
"alice": {
"username": "alice",
"hashed_password": hashlib.sha256("secret".encode()).hexdigest(),
# => In production: use bcrypt via passlib
# => hashlib.sha256 is INSECURE for passwords
"email": "alice@example.com",
"disabled": False,
}
}
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# => Declares that tokens come from POST /token
# => Adds "Authorize" button to Swagger UI
# => tokenUrl is the endpoint that issues tokens
class Token(BaseModel): # => OAuth2 token response model
access_token: str
token_type: str # => Always "bearer" for OAuth2 Bearer tokens
def verify_password(plain: str, hashed: str) -> bool:
return hashlib.sha256(plain.encode()).hexdigest() == hashed
# => Check if plain password matches hash
# => Use passlib.context.verify() in production
@app.post("/token", response_model=Token)
# => Standard OAuth2 token endpoint at POST /token
async def login(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
# => OAuth2PasswordRequestForm reads username/password
# => from application/x-www-form-urlencoded body
# => Fields: form_data.username, form_data.password
):
user = fake_users_db.get(form_data.username)
if not user or not verify_password(form_data.password, user["hashed_password"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
# => WWW-Authenticate header required by OAuth2 spec
)
# In production: generate real JWT token here
access_token = f"fake-token-for-{user['username']}"
# => Real code: create_access_token(data={"sub": username})
return Token(access_token=access_token, token_type="bearer")
    # => Response: {"access_token": "...", "token_type": "bearer"}
Key Takeaway: Use OAuth2PasswordRequestForm to read credentials from form data, verify them against stored hashes, and return an access_token with token_type: "bearer".
Why It Matters: Implementing OAuth2 Password flow with OAuth2PasswordRequestForm ensures your token endpoint is compatible with the OAuth2 specification—meaning any OAuth2 client library in any language can authenticate against your API without custom code. Because OAuth2PasswordBearer is declared, FastAPI’s Swagger UI automatically shows an “Authorize” dialog, so authenticated endpoints can be tested manually during development without the curl token-fetching step that slows iteration.
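One hardening worth noting for the verify_password function above: comparing hex digests with == leaks timing information. The standard library's hmac.compare_digest gives a constant-time comparison; this sketch shows the shape (still use bcrypt via passlib for real password storage):

```python
import hashlib
import hmac

def verify_password_ct(plain: str, hashed: str) -> bool:
    candidate = hashlib.sha256(plain.encode()).hexdigest()
    # compare_digest takes time independent of where the strings differ
    return hmac.compare_digest(candidate, hashed)

stored = hashlib.sha256("secret".encode()).hexdigest()
print(verify_password_ct("secret", stored))  # -> True
print(verify_password_ct("wrong", stored))   # -> False
```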
Example 31: JWT Token Creation and Verification
JSON Web Tokens (JWT) encode user identity and claims in a signed, self-contained token. This example shows creating tokens during login and verifying them on protected endpoints using python-jose.
# main.py - JWT token creation and verification
# pip install "python-jose[cryptography]" passlib[bcrypt]
from datetime import datetime, timedelta, timezone
from typing import Annotated, Optional
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt # => python-jose library for JWT operations
from pydantic import BaseModel
app = FastAPI()
SECRET_KEY = "your-256-bit-secret-key-change-in-production"
# => NEVER hardcode in production; use env var
ALGORITHM = "HS256" # => HMAC-SHA256 signing algorithm
ACCESS_TOKEN_EXPIRE_MINUTES = 30 # => Token valid for 30 minutes
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class TokenData(BaseModel): # => Decoded token payload model
username: Optional[str] = None
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
to_encode = data.copy() # => Copy to avoid mutating the original dict
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
# => Default 15 minutes if not specified
to_encode.update({"exp": expire}) # => Add expiration claim to payload
# => JWT spec: "exp" is Unix timestamp
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
# => Signs payload with SECRET_KEY
# => Returns base64url-encoded JWT string
return encoded_jwt # => "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
# => oauth2_scheme extracts Bearer token from header
# => Authorization: Bearer eyJhbGciO...
) -> str:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# => Verifies signature AND checks expiration
# => Raises JWTError if invalid or expired
username: Optional[str] = payload.get("sub")
# => "sub" (subject) claim contains the user identifier
if username is None:
raise credentials_exception
return username # => Return verified username string
except JWTError: # => Invalid signature, expired token, malformed JWT
raise credentials_exception
@app.get("/users/me")
async def get_me(
current_user: Annotated[str, Depends(get_current_user)],
# => get_current_user runs before handler
# => Returns username if token valid, 401 if not
):
return {"username": current_user} # => Only reached with valid JWTKey Takeaway: Create JWTs with jwt.encode() including an exp claim, and verify them with jwt.decode(). Wrap verification in a get_current_user dependency that protects endpoints.
Why It Matters: JWTs enable stateless authentication that scales horizontally without shared session storage. Any server in a cluster can verify a JWT using the shared SECRET_KEY without querying a central session store—eliminating a distributed system bottleneck. The exp claim provides automatic token expiry without server-side session management. However, JWTs cannot be revoked before expiry without additional infrastructure (token blacklist), making short expiry windows (15-30 minutes) with refresh token rotation the recommended production pattern.
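What jwt.encode() produces can be approximated with the standard library alone. This sketch (illustrative only; use python-jose or PyJWT in practice, which also handle exp validation) builds the header.payload.signature structure with an HMAC-SHA256 signature:

```python
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    # JWT uses base64url without padding
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def sign_hs256(payload: dict, secret: str) -> str:
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = b64url(json.dumps(payload).encode())
    signing_input = f"{header}.{body}".encode()
    sig = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    return f"{header}.{body}.{b64url(sig)}"

token = sign_hs256({"sub": "alice"}, "secret")
print(token.count("."))  # -> 2: header.payload.signature
```

Verification is the inverse: recompute the signature over header.payload and compare; any change to the payload invalidates the signature.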
Example 32: Role-Based Access Control
Extend the JWT authentication pattern to include user roles in the token payload. Create separate dependencies for different permission levels and use them to protect routes based on user roles.
# main.py - Role-based access control with JWT claims
from typing import Annotated, List
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel
app = FastAPI()
SECRET_KEY = "change-me-in-production"
ALGORITHM = "HS256"
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
roles: List[str] # => List of role strings: ["user", "admin"]
def decode_token(token: str) -> User:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
roles = payload.get("roles", [])
# => roles claim: ["user"] or ["user", "admin"]
if username is None:
raise HTTPException(status_code=401, detail="Invalid token")
return User(username=username, roles=roles)
except JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
) -> User:
return decode_token(token) # => Returns User model with username and roles
def require_role(*required_roles: str):
# => Factory function returning a dependency
# => Call with: Depends(require_role("admin"))
def role_checker(
user: Annotated[User, Depends(get_current_user)],
) -> User:
for role in required_roles: # => Check if user has ANY of the required roles
if role in user.roles:
return user # => Role found: allow access, return user
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
# => 403 Forbidden: authenticated but not authorized
detail=f"Requires role(s): {', '.join(required_roles)}",
)
return role_checker # => Return the checker function as the dependency
@app.get("/profile")
def user_profile(
user: Annotated[User, Depends(get_current_user)],
# => Requires any authenticated user
):
return {"username": user.username, "roles": user.roles}
@app.delete("/admin/users/{user_id}")
def admin_delete_user(
user_id: int,
admin: Annotated[User, Depends(require_role("admin"))],
# => Requires "admin" role; 403 otherwise
):
return {"deleted": user_id, "by": admin.username}
@app.get("/reports")
def view_reports(
user: Annotated[User, Depends(require_role("admin", "analyst"))],
# => Requires "admin" OR "analyst" role
):
return {"reports": [], "viewer": user.username}Key Takeaway: Embed roles in JWT claims and create dependency factories with require_role(*roles) to enforce role-based access. The factory pattern avoids duplicating role-check logic.
Why It Matters: Role-based access control implemented as dependencies composes cleanly with FastAPI’s declarative route definitions. Adding an “admin” requirement to an existing endpoint is a one-line change at the route level, not a modification of business logic. The factory pattern require_role("admin") reads like natural language at the point of use, making access control decisions visible during code review without requiring reviewers to understand the underlying dependency chain implementation.
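The factory-closure mechanism behind require_role can be isolated from FastAPI entirely: the outer function captures the allowed roles, and the inner function performs the per-call check against them.

```python
def require_role(*required):
    # => Closure captures the allowed roles at factory-call time
    def checker(user_roles: list[str]) -> bool:
        return any(r in user_roles for r in required)
    return checker

admin_only = require_role("admin")
staff = require_role("admin", "analyst")   # ANY-of semantics, as above
print(admin_only(["user"]))    # -> False
print(staff(["analyst"]))      # -> True
```

In the FastAPI version the returned checker additionally depends on get_current_user and raises 403 instead of returning False, but the capture mechanism is identical.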
Group 13: Database Integration with SQLAlchemy
Example 33: Async SQLAlchemy Setup
SQLAlchemy 2.0 with asyncio provides non-blocking database operations. This example shows the engine, session factory, and base model setup needed for FastAPI async database integration.
# database.py - Async SQLAlchemy configuration
# pip install "sqlalchemy[asyncio]" asyncpg
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from typing import AsyncGenerator
# Async database URL (note: postgresql+asyncpg://)
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
# => asyncpg is the async PostgreSQL driver
# => Not psycopg2 (that's blocking)
engine = create_async_engine(
DATABASE_URL,
pool_size=10, # => Number of connections to maintain in pool
max_overflow=20, # => Additional connections when pool is full
echo=False, # => Set True to log all SQL statements
# => Useful for debugging; disable in production
)
# => engine manages the connection pool lifecycle
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession, # => Use AsyncSession (not regular Session)
expire_on_commit=False, # => Don't expire objects after commit
# => Prevents lazy-load errors after session closes
)
class Base(DeclarativeBase): # => Declarative base for all models
pass # => All models inherit from Base
# Dependency to inject database session
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
# => Create session using async context manager
# => Session opened before yield
try:
yield session # => Inject session into route handler
except Exception:
await session.rollback() # => Roll back on any unhandled exception
raise # => Re-raise after rollback
finally:
await session.close() # => Always close session (context manager handles this)
# models.py - SQLAlchemy async model
from sqlalchemy import String, Float, Integer
from sqlalchemy.orm import Mapped, mapped_column
class Item(Base): # => ORM model inheriting from Base
__tablename__ = "items" # => Database table name
id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
# => Primary key with B-tree index
name: Mapped[str] = mapped_column(String(100), nullable=False)
# => VARCHAR(100) NOT NULL column
price: Mapped[float] = mapped_column(Float, nullable=False)
description: Mapped[str | None] = mapped_column(String(500), nullable=True)
    # => Nullable VARCHAR(500)
Key Takeaway: Use create_async_engine with asyncpg driver and AsyncSession for non-blocking database operations. The get_db generator dependency manages session lifecycle per request.
Why It Matters: Async database access is the difference between 1,000 and 10,000 concurrent connections on the same hardware. A blocking psycopg2 query freezes the event loop for the duration of the database round trip—typically 1-50ms—during which no other requests are handled. Async SQLAlchemy with asyncpg yields the event loop during I/O waits, allowing thousands of concurrent requests to interleave database operations. For read-heavy APIs with modest compute requirements, this single change can eliminate the need for additional server instances.
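The interleaving claim can be demonstrated with asyncio alone. Here asyncio.sleep stands in for an awaited database round trip; ten 50 ms "queries" finish in roughly 50 ms total rather than 500 ms, because each one yields the event loop while waiting:

```python
import asyncio
import time

async def fake_query(ms: int) -> int:
    await asyncio.sleep(ms / 1000)   # stands in for awaited DB I/O
    return ms

async def run_batch() -> tuple[int, float]:
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_query(50) for _ in range(10)))
    return len(results), time.perf_counter() - start

count, elapsed = asyncio.run(run_batch())
print(count, elapsed < 0.5)  # -> 10 True (queries overlap instead of serializing)
```

A blocking driver would hold the loop for each call, turning this into ten sequential waits.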
Example 34: CRUD Operations with Async SQLAlchemy
Implement Create, Read, Update, and Delete operations using async SQLAlchemy session methods. These patterns compose with FastAPI’s dependency injection to create clean, testable data access layers.
# main.py - CRUD operations with async SQLAlchemy
from typing import Annotated, List, Optional
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from pydantic import BaseModel
# Assume engine, AsyncSessionLocal, Base, get_db, Item (model) from previous example
app = FastAPI()
class ItemCreate(BaseModel): # => Request body for creating items
name: str
price: float
description: Optional[str] = None
class ItemUpdate(BaseModel): # => Request body for updating items (all optional)
name: Optional[str] = None
price: Optional[float] = None
description: Optional[str] = None
class ItemResponse(BaseModel): # => Response model with database ID
id: int
name: str
price: float
description: Optional[str]
class Config:
from_attributes = True # => Enable ORM mode: read from SQLAlchemy model attrs
# => Allows Pydantic to read Item.id, Item.name etc.
@app.post("/items", response_model=ItemResponse)
async def create_item(
item_data: ItemCreate,
db: Annotated[AsyncSession, Depends(get_db)],
# => Inject async session from get_db dependency
):
db_item = Item(**item_data.model_dump())
# => Create ORM instance from Pydantic model dict
db.add(db_item) # => Stage item for insertion (not yet in DB)
await db.commit() # => Execute INSERT, commit transaction
await db.refresh(db_item) # => Reload to get DB-generated values (id, created_at)
return db_item # => Return ORM instance; response_model converts it
@app.get("/items/{item_id}", response_model=ItemResponse)
async def get_item(item_id: int, db: Annotated[AsyncSession, Depends(get_db)]):
result = await db.execute(select(Item).where(Item.id == item_id))
# => select(Item).where(...) builds SQL: SELECT * FROM items WHERE id = ?
item = result.scalar_one_or_none()
# => scalar_one_or_none: returns Item or None
if item is None:
raise HTTPException(status_code=404, detail="Item not found")
return item
@app.get("/items", response_model=List[ItemResponse])
async def list_items(
skip: int = 0,
limit: int = 20,
db: Annotated[AsyncSession, Depends(get_db)] = None,
):
result = await db.execute(select(Item).offset(skip).limit(limit))
# => .offset(skip) = SQL OFFSET; .limit(limit) = SQL LIMIT
return result.scalars().all() # => scalars() extracts Item objects; all() returns list
@app.delete("/items/{item_id}", status_code=204)
async def delete_item(item_id: int, db: Annotated[AsyncSession, Depends(get_db)]):
result = await db.execute(delete(Item).where(Item.id == item_id))
# => Executes: DELETE FROM items WHERE id = ?
if result.rowcount == 0: # => rowcount: number of rows affected
raise HTTPException(status_code=404, detail="Item not found")
    await db.commit() # => Commit the DELETE transaction
Key Takeaway: Use SQLAlchemy 2.0 select(), update(), delete() statement builders with await db.execute(). Call await db.commit() after write operations and await db.refresh() to reload generated values.
Why It Matters: Async SQLAlchemy CRUD patterns enable database-backed APIs to handle thousands of concurrent requests without thread pool exhaustion. The repository pattern (separating CRUD functions from route handlers) makes the code independently testable—unit tests can stub the database session, integration tests use a real database. SQLAlchemy 2.0’s select() builder API is type-safe and composable, making complex queries with joins, filters, and ordering readable and IDE-navigable.
Example 35: SQLAlchemy Relationships and Joins
SQLAlchemy relationships enable navigating between related models. This example shows one-to-many relationships, eager loading with selectinload, and joining related data in queries.
# main.py - SQLAlchemy relationships and eager loading
from typing import List
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship, selectinload
from pydantic import BaseModel
from database import Base, get_db # => From previous example
app = FastAPI()
class Author(Base):
__tablename__ = "authors"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column()
books: Mapped[List["Book"]] = relationship(
"Book",
back_populates="author", # => Bidirectional relationship
lazy="select", # => Default: lazy load (don't load unless accessed)
# => With async, always use eager loading instead
)
class Book(Base):
__tablename__ = "books"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column()
author_id: Mapped[int] = mapped_column(ForeignKey("authors.id"))
# => Foreign key referencing authors.id
author: Mapped["Author"] = relationship("Author", back_populates="books")
# => Many-to-one: each book has one author
class BookResponse(BaseModel):
id: int
title: str
author_name: str
class Config:
from_attributes = True
class AuthorWithBooks(BaseModel):
id: int
name: str
books: List[dict] # => Include related books in response
class Config:
from_attributes = True
@app.get("/authors/{author_id}/with-books")
async def get_author_with_books(
author_id: int,
db: AsyncSession = Depends(get_db),
):
result = await db.execute(
select(Author)
.where(Author.id == author_id)
.options(selectinload(Author.books))
# => selectinload: eager loading for async
# => Issues a second SELECT for books where author_id = ?
# => NOT a JOIN; avoids N+1 for large result sets
)
author = result.scalar_one_or_none()
if author is None:
return {"error": "Author not found"}
return {
"id": author.id,
"name": author.name,
"books": [{"id": b.id, "title": b.title} for b in author.books],
# => author.books is already loaded (no lazy load)
    }
Key Takeaway: Use selectinload() for eager loading relationships in async SQLAlchemy. It issues a separate SELECT for related records rather than lazy loading, which is incompatible with async sessions.
Why It Matters: Lazy loading—the default in SQLAlchemy—triggers additional SQL queries when relationship attributes are accessed. In an async context, lazy loading is impossible without a running event loop at access time, causing MissingGreenlet errors that crash endpoints. selectinload solves this by loading relationships eagerly in a separate query before the session closes. Understanding eager vs lazy loading prevents the most common async SQLAlchemy production bug: a 200 response in development that becomes a 500 crash under production load.
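The N+1 effect can be counted directly. This toy query logger (illustrative only) contrasts the lazy-loading shape, one SELECT per author, with the selectinload shape, a single IN query for all of them:

```python
queries: list[str] = []

def query(sql: str) -> list:
    queries.append(sql)            # record every SQL statement issued
    return []

author_ids = [1, 2, 3]
for a in author_ids:               # lazy-loading shape: N extra queries
    query(f"SELECT * FROM books WHERE author_id = {a}")
n_plus_one = len(queries)

queries.clear()
placeholders = ", ".join(map(str, author_ids))
query(f"SELECT * FROM books WHERE author_id IN ({placeholders})")
batched = len(queries)             # selectinload shape: one IN query total
print(n_plus_one, batched)  # -> 3 1
```

With 3 authors the difference is trivial; with 3,000 rows per page it is the difference between 1 extra query and 3,000.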
Group 14: WebSockets
Example 36: WebSocket Basic Connection
FastAPI supports WebSocket connections through @app.websocket() decorated handlers. WebSocket routes use an async def function that receives a WebSocket object and must call accept() before communicating.
# main.py - Basic WebSocket echo server
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
app = FastAPI()
# Simple HTML client for testing WebSocket without extra tools
html = """
<!DOCTYPE html>
<html>
<script>
var ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) { console.log("Received:", event.data); };
function send(msg) { ws.send(msg); }
</script>
<button onclick="send('Hello Server')">Send</button>
</html>
"""
@app.get("/")
async def home():
return HTMLResponse(html) # => Serve HTML client for testing
@app.websocket("/ws") # => WebSocket route at ws://host/ws
async def websocket_endpoint(websocket: WebSocket):
# => websocket: WebSocket object for this connection
await websocket.accept() # => Must call accept() to establish WebSocket handshake
# => Without accept(), connection is rejected
try:
while True: # => Keep connection open; loop until disconnect
data = await websocket.receive_text()
# => Blocks until client sends a text message
# => data is the string the client sent
# => Also: receive_bytes(), receive_json()
print(f"Received: {data}")
await websocket.send_text(f"Echo: {data}")
# => Send response back to same client
# => Also: send_bytes(), send_json()
except WebSocketDisconnect: # => Raised when client closes connection
print("Client disconnected") # => Normal disconnect; no need to re-raise
        # => Connection is already closed at this point
Key Takeaway: Decorate with @app.websocket(), call await websocket.accept(), then loop with receive_text() and send_text(). Catch WebSocketDisconnect for graceful cleanup on client disconnect.
Why It Matters: WebSockets enable real-time features that polling cannot achieve efficiently: live collaboration, push notifications, gaming state synchronization, and financial data streaming. An API that pushes stock price updates via WebSocket uses 100x fewer server resources than one where 10,000 clients poll every second, because each WebSocket connection is idle (no CPU, minimal memory) between messages. FastAPI’s async WebSocket support handles thousands of concurrent connections on a single server that would require a cluster with polling architectures.
Example 37: WebSocket Connection Manager for Broadcasting
Real applications need to broadcast messages to multiple connected clients. A ConnectionManager class tracks active connections and provides broadcast methods—a pattern for chat rooms, live dashboards, and collaborative editing.
# main.py - WebSocket connection manager for broadcasting
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
app = FastAPI()
class ConnectionManager: # => Manages multiple WebSocket connections
def __init__(self):
self.active_connections: List[WebSocket] = []
# => List of all currently connected clients
async def connect(self, websocket: WebSocket) -> None:
await websocket.accept() # => Accept the WebSocket handshake
self.active_connections.append(websocket)
# => Add to active connections list
print(f"Client connected. Total: {len(self.active_connections)}")
def disconnect(self, websocket: WebSocket) -> None:
self.active_connections.remove(websocket)
# => Remove from list on disconnect
print(f"Client disconnected. Total: {len(self.active_connections)}")
async def send_personal(self, message: str, websocket: WebSocket) -> None:
await websocket.send_text(message)
# => Send to one specific client only
async def broadcast(self, message: str) -> None:
for connection in self.active_connections:
# => Send same message to ALL connected clients
await connection.send_text(message)
# => Each send is async; clients receive in order
manager = ConnectionManager() # => Single instance shared across all connections
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
# => Wait for message from this client
await manager.send_personal(
f"[You] {data}", websocket,
) # => Echo back to sender labeled "[You]"
await manager.broadcast(
f"[Client {client_id}] {data}",
) # => Broadcast to ALL connected clients
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"[System] Client {client_id} left the chat")
        # => Notify remaining clients of departure
Key Takeaway: A ConnectionManager class encapsulates the list of active connections and provides connect(), disconnect(), and broadcast() methods. One shared instance routes messages between all connected clients.
Why It Matters: The connection manager pattern is the foundation of all real-time collaborative features. Chat applications, multiplayer games, live document editing, and team dashboards all require broadcasting state changes to multiple subscribers. This in-process implementation is sufficient for single-server deployments. Production systems with multiple server instances need a distributed pub/sub backbone (Redis Pub/Sub or Kafka) behind the connection manager, with each server instance broadcasting to only its locally connected clients.
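The broadcast fan-out can be modeled without sockets: each connected client becomes an asyncio.Queue, and broadcasting puts the message on every queue. This is a sketch of the pattern only, not FastAPI code:

```python
import asyncio

class Manager:
    def __init__(self):
        self.clients: list[asyncio.Queue] = []   # one queue per "connection"

    def connect(self) -> asyncio.Queue:
        q: asyncio.Queue = asyncio.Queue()
        self.clients.append(q)
        return q

    async def broadcast(self, msg: str) -> None:
        for q in self.clients:       # fan out to every registered client
            await q.put(msg)

async def demo() -> tuple[str, str]:
    m = Manager()
    a, b = m.connect(), m.connect()
    await m.broadcast("hello")
    return await a.get(), await b.get()

print(asyncio.run(demo()))  # -> ('hello', 'hello')
```

Swapping the queue list for a Redis Pub/Sub subscription is the step that makes this multi-server.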
Group 15: Testing
Example 38: Testing with TestClient
FastAPI’s TestClient wraps an HTTPX client to make test requests against your application without starting a real server. Tests can be synchronous even for async applications.
# test_main.py - Testing FastAPI endpoints with TestClient
# pip install httpx pytest
from fastapi import FastAPI, Depends, HTTPException
from fastapi.testclient import TestClient
from pydantic import BaseModel
from typing import Optional
app = FastAPI()
class Item(BaseModel):
name: str
price: float
description: Optional[str] = None
items_db: dict[int, Item] = {
1: Item(name="Widget", price=9.99),
}
@app.get("/items/{item_id}")
def get_item(item_id: int):
if item_id not in items_db:
raise HTTPException(status_code=404, detail="Item not found")
return items_db[item_id]
@app.post("/items", status_code=201)
def create_item(item: Item):
new_id = max(items_db.keys()) + 1 if items_db else 1
items_db[new_id] = item
return {"id": new_id, **item.model_dump()}
# --- Tests below ---
client = TestClient(app) # => Create test client bound to app
# => No real server; requests handled in-process
def test_get_existing_item():
response = client.get("/items/1") # => HTTP GET request to /items/1
assert response.status_code == 200 # => Check status code
data = response.json() # => Parse JSON response body
assert data["name"] == "Widget" # => Check specific field value
assert data["price"] == 9.99
def test_get_nonexistent_item():
response = client.get("/items/999")
assert response.status_code == 404 # => Expect 404 for missing item
assert response.json()["detail"] == "Item not found"
# => Check error message
def test_create_item():
payload = {"name": "Gadget", "price": 49.99}
response = client.post("/items", json=payload)
# => json= sends Content-Type: application/json
assert response.status_code == 201 # => Expect 201 Created
data = response.json()
assert data["name"] == "Gadget"
assert "id" in data # => Check that ID was assigned
def test_create_item_validation_error():
response = client.post("/items", json={"name": "Bad"})
# => Missing required "price" field
assert response.status_code == 422 # => Expect 422 Unprocessable Entity
errors = response.json()["detail"]
assert any(e["loc"] == ["body", "price"] for e in errors)
# => Check that price field is in errors
Key Takeaway: Use TestClient(app) to make in-process HTTP requests in synchronous tests. It supports the full HTTP API: .get(), .post(), .put(), .delete() with json=, headers=, and params= arguments.
Why It Matters: FastAPI’s TestClient enables black-box API testing without network overhead or server startup time, making tests run in milliseconds. Tests validate the full request-response cycle including middleware, dependencies, and response model filtering—not just isolated handler functions. When CI runs 500 API tests in 10 seconds instead of 2 minutes, developers run the full suite on every commit rather than deferring testing to CI, catching regressions before they reach code review.
Example 39: Dependency Override in Tests
Override dependencies in tests to replace database sessions, authentication checks, and external services with test doubles. This creates isolated, deterministic tests without real databases or network calls.
# test_auth.py - Overriding dependencies for isolated testing
from fastapi import FastAPI, Depends, HTTPException
from fastapi.testclient import TestClient
from typing import Annotated
app = FastAPI()
# --- Application code ---
def get_current_user() -> dict: # => Production: verifies JWT from request header
# In production: decode JWT, verify, return user
raise HTTPException(status_code=401, detail="Not authenticated")
# => This is never called in tests (overridden)
def get_db(): # => Production: returns real database session
raise RuntimeError("Real DB not available in tests")
# => This is never called in tests (overridden)
@app.get("/profile")
def get_profile(
user: Annotated[dict, Depends(get_current_user)],
db = Depends(get_db),
):
return {"username": user["username"], "email": user["email"]}
# --- Test code ---
def fake_current_user() -> dict: # => Test double for get_current_user
return {"username": "testuser", "email": "test@example.com"}
# => Returns hardcoded user without auth logic
class FakeDB: # => Test double for database session
def query(self): return [] # => Returns empty results
def fake_get_db(): # => Test double for get_db
return FakeDB() # => Returns in-memory fake, no real DB needed
# Override dependencies for all tests using this client
app.dependency_overrides[get_current_user] = fake_current_user
# => Replace get_current_user with fake_current_user
# => Every Depends(get_current_user) now calls fake
app.dependency_overrides[get_db] = fake_get_db
# => Replace get_db with fake_get_db
client = TestClient(app)
def test_profile_authenticated():
response = client.get("/profile") # => No auth header needed; fake_current_user runs
assert response.status_code == 200
data = response.json()
assert data["username"] == "testuser"
# => Gets testuser from fake dependency
def cleanup():
app.dependency_overrides.clear() # => Remove all overrides after tests
# => Restore original dependencies
Key Takeaway: Use app.dependency_overrides[original_dep] = fake_dep to replace dependencies in tests. Clear overrides after tests with app.dependency_overrides.clear().
Why It Matters: Dependency overrides enable true unit testing of HTTP handlers without real infrastructure. A test suite that requires a running PostgreSQL database cannot run in CI without Docker, slowing feedback loops from seconds to minutes. With dependency overrides, the handler’s request-parsing and response-formatting logic is tested in isolation, while a separate integration test validates the database interaction. This separation lets developers run fast unit tests locally and reserve slow integration tests for CI, maintaining developer velocity without sacrificing coverage.
Group 16: Application Lifecycle and Configuration
Example 40: Application Startup and Shutdown Events
FastAPI supports lifecycle hooks via the legacy @app.on_event("startup") / @app.on_event("shutdown") decorators (deprecated since FastAPI 0.93) and the recommended lifespan context manager. Use lifecycle events to initialize and clean up resources: database connections, ML models, caches.
# main.py - Application lifecycle with lifespan context manager
from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncio
# Simulated resources
class MLModel:
def __init__(self):
self.loaded = False
def load(self):
self.loaded = True
print("[Startup] ML model loaded")
def predict(self, data: str) -> str:
return f"prediction for: {data}"
def unload(self):
self.loaded = False
print("[Shutdown] ML model unloaded")
ml_model = MLModel() # => Module-level model instance
class DatabasePool:
def __init__(self):
self.pool = None
async def connect(self):
await asyncio.sleep(0.01) # => Simulate async pool initialization
self.pool = "connected"
print("[Startup] Database pool initialized")
async def close(self):
await asyncio.sleep(0.01) # => Simulate async pool teardown
self.pool = None
print("[Shutdown] Database pool closed")
db_pool = DatabasePool()
@asynccontextmanager
async def lifespan(app: FastAPI):
# --- Startup: code runs before app accepts requests ---
print("[Startup] Starting application...")
ml_model.load() # => Sync startup task
await db_pool.connect() # => Async startup task
print("[Startup] Application ready")
yield # => App runs here; yields control to FastAPI
# --- Shutdown: code runs after last request, before process exits ---
print("[Shutdown] Shutting down...")
await db_pool.close() # => Async cleanup
ml_model.unload() # => Sync cleanup
print("[Shutdown] Goodbye")
app = FastAPI(lifespan=lifespan) # => Pass lifespan to FastAPI constructor
# => FastAPI calls it during startup and shutdown
@app.get("/predict")
def predict(data: str):
if not ml_model.loaded:
return {"error": "Model not ready"}
return {"result": ml_model.predict(data)}
# => Model guaranteed loaded because startup ran
@app.get("/db-status")
def db_status():
return {"connected": db_pool.pool is not None}
# => Pool guaranteed initialized
Key Takeaway: Use the lifespan context manager (recommended in FastAPI 0.93+) for startup and shutdown logic. Code before yield runs at startup; code after yield runs at shutdown.
Why It Matters: Proper lifecycle management prevents the most common production startup failures: endpoints receiving requests before the database pool is ready, or ML models responding with errors while still loading from disk. The lifespan pattern guarantees startup completes before traffic arrives and cleanup runs before the process exits—preventing connection leaks that accumulate when Kubernetes restarts pods frequently. Kubernetes liveness and readiness probes depend on this: the app should not become “ready” until all startup tasks complete successfully.
Example 41: Configuration Management with Pydantic Settings
Use pydantic-settings to load configuration from environment variables and .env files with type validation. This pattern centralizes configuration, documents required variables, and prevents runtime crashes from missing or malformed config.
# config.py - Application configuration with pydantic-settings
# pip install pydantic-settings
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import Field, AnyHttpUrl
from typing import List
from functools import lru_cache
class Settings(BaseSettings): # => BaseSettings reads from environment variables
model_config = SettingsConfigDict(
env_file=".env", # => Load from .env file if present
env_file_encoding="utf-8",
case_sensitive=False, # => DB_HOST and db_host both work
)
# Application settings
app_name: str = "My FastAPI App" # => Default value; override with APP_NAME env var
debug: bool = False # => DEBUG=true enables debug mode (env var matches field name)
secret_key: str = Field( # => Required: SECRET_KEY env var must be set
..., # => No default = raises error if not set
min_length=32, # => Must be at least 32 chars for security
)
# Database settings
database_url: str = Field( # => DATABASE_URL env var
default="postgresql+asyncpg://user:pass@localhost/db",
)
db_pool_size: int = 10 # => DB_POOL_SIZE env var; default 10
# CORS settings
allowed_origins: List[str] = ["http://localhost:3000"]
# => ALLOWED_ORIGINS env var: JSON array string
# => ALLOWED_ORIGINS='["https://myapp.com"]'
# JWT settings
jwt_secret_key: str = Field(..., min_length=32)
# => JWT_SECRET_KEY env var; required
jwt_expire_minutes: int = 30 # => JWT_EXPIRE_MINUTES; default 30
@lru_cache() # => Cache Settings instance; reads env vars once
def get_settings() -> Settings:
return Settings() # => Reads from environment + .env file
# => lru_cache ensures single instance per process
# main.py - Using settings in application
from fastapi import FastAPI, Depends
from typing import Annotated
# from config import Settings, get_settings # In real project
app = FastAPI()
@app.get("/config-info")
def config_info(
settings: Annotated[Settings, Depends(get_settings)],
# => Depends(get_settings) injects Settings instance
# => lru_cache means same Settings object every time
):
return {
"app_name": settings.app_name,
"debug": settings.debug,
"db_pool_size": settings.db_pool_size,
# Never return secret_key or jwt_secret_key!
}
# .env file content:
# SECRET_KEY=my-very-long-secret-key-at-least-32-chars
# JWT_SECRET_KEY=another-very-long-secret-key-here
# DATABASE_URL=postgresql+asyncpg://alice:pass@localhost/prod
# APP_NAME=Production API
Key Takeaway: BaseSettings reads environment variables and .env files with full Pydantic validation. Use @lru_cache() on the factory function to create one Settings instance per process.
Why It Matters: Configuration validation at startup catches misconfiguration before it causes runtime failures. An application that starts successfully but crashes at the first database query (because DATABASE_URL was misspelled in the deployment manifest) wastes valuable incident response time. Pydantic Settings with required fields (using ... as default) makes misconfiguration a hard startup failure—immediately visible in deployment logs—rather than a runtime 500 error that triggers 3 AM alerts. The @lru_cache() pattern prevents repeated environment variable reads, which can be slow in containerized environments.
Group 17: Custom Middleware and Response Streaming
Example 42: Starlette BaseHTTPMiddleware
For more control than @app.middleware("http"), subclass BaseHTTPMiddleware from Starlette. This pattern enables middleware with initialization parameters, helper methods, and shared state across requests.
# main.py - Class-based middleware with BaseHTTPMiddleware
import time
from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp
app = FastAPI()
class RequestLogMiddleware(BaseHTTPMiddleware):
# => Subclass BaseHTTPMiddleware for class-based middleware
def __init__(self, app: ASGIApp, log_level: str = "INFO"):
super().__init__(app) # => Must call super().__init__(app)
self.log_level = log_level # => Store configuration on instance
async def dispatch(self, request: Request, call_next) -> Response:
# => dispatch() is called for every HTTP request
# => Equivalent to the middleware function body
start = time.perf_counter()
client_ip = request.client.host if request.client else "unknown"
# => request.client.host is the client IP address
response = await call_next(request)
# => Call the next handler (route or inner middleware)
duration_ms = (time.perf_counter() - start) * 1000
log_line = (
f"[{self.log_level}] {client_ip} "
f"{request.method} {request.url.path} "
f"{response.status_code} {duration_ms:.1f}ms"
)
print(log_line) # => "[INFO] 127.0.0.1 GET /items 200 12.3ms"
return response
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
# => Middleware that adds security headers
async def dispatch(self, request: Request, call_next) -> Response:
response = await call_next(request)
response.headers["X-Content-Type-Options"] = "nosniff"
# => Prevents MIME-type sniffing attacks
response.headers["X-Frame-Options"] = "DENY"
# => Prevents clickjacking via iframes
response.headers["X-XSS-Protection"] = "1; mode=block"
# => Legacy XSS protection header
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
return response
# Add middleware in reverse order (last added runs first)
app.add_middleware(RequestLogMiddleware, log_level="INFO")
app.add_middleware(SecurityHeadersMiddleware)
# => SecurityHeadersMiddleware runs first (outermost)
# => RequestLogMiddleware runs second
@app.get("/items")
def list_items():
return {"items": []}
Key Takeaway: Subclass BaseHTTPMiddleware and implement dispatch() for class-based middleware with configuration, helper methods, and shared state. Multiple middleware layers wrap around each other in reverse registration order.
Why It Matters: Security headers protect users from a class of browser-based attacks without requiring changes to individual route handlers. X-Content-Type-Options: nosniff prevents browsers from executing uploaded HTML files as JavaScript when served as text/plain—closing a common file upload exploit. Implementing security headers as middleware ensures every endpoint, including third-party routers and sub-applications mounted to the main app, inherits the protection automatically without manual annotation of each route.
Example 43: Streaming Responses
StreamingResponse allows FastAPI to send data to the client progressively rather than buffering the entire response. Use it for large file downloads, server-sent events (SSE), and AI text generation streams.
# main.py - Streaming responses for large data and SSE
import asyncio
import json
from datetime import datetime
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
async def generate_large_data(): # => Async generator yielding chunks
for i in range(100): # => Simulate 100 database rows
row = {"id": i, "value": f"item-{i}", "timestamp": datetime.now().isoformat()}
yield json.dumps(row) + "\n" # => Newline-delimited JSON (NDJSON)
# => Each line is a complete JSON object
await asyncio.sleep(0.01) # => Simulate database row retrieval time
@app.get("/stream/data")
async def stream_data():
return StreamingResponse(
generate_large_data(), # => Pass generator (not called yet)
media_type="application/x-ndjson",
# => MIME type for newline-delimited JSON
) # => FastAPI calls next() on generator per chunk
# => Client receives data progressively
async def event_stream(max_events: int = 10):
# => Server-Sent Events (SSE) format
for i in range(max_events):
event_data = json.dumps({"count": i, "time": datetime.now().isoformat()})
yield f"data: {event_data}\n\n"
# => SSE format: "data: {json}\n\n"
# => Two newlines separate events
await asyncio.sleep(1) # => Send one event per second
yield "data: {\"done\": true}\n\n"
# => Final event signals stream end
@app.get("/stream/events")
async def stream_events():
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
# => SSE media type; browser EventSource API uses this
headers={
"Cache-Control": "no-cache",
# => Prevent caching of event stream
"X-Accel-Buffering": "no",
# => Disable nginx buffering for real-time delivery
},
)
# JavaScript client:
# const source = new EventSource('/stream/events');
# source.onmessage = (e) => console.log(JSON.parse(e.data));
Key Takeaway: Use StreamingResponse with an async generator to stream data progressively. For Server-Sent Events, format chunks as "data: {json}\n\n" with text/event-stream media type.
Why It Matters: Streaming responses solve two distinct problems: memory efficiency for large data transfers and latency reduction for incremental results. An endpoint exporting 10,000 database rows buffered entirely before sending requires 50MB of RAM; the same endpoint streamed requires only the memory for one row at a time. For AI text generation (ChatGPT-style), streaming sends each token to the user as it generates rather than waiting for the complete response—transforming a 30-second wait into a perceived response that starts immediately and types itself out progressively.
Group 18: Sub-Applications and Versioning
Example 44: Mounting Sub-Applications for API Versioning
FastAPI applications can mount other FastAPI or Starlette applications at path prefixes. This pattern enables clean API versioning, microservice composition, and tenant isolation.
# main.py - API versioning with mounted sub-applications
from fastapi import FastAPI
from pydantic import BaseModel
# Version 1 API
v1_app = FastAPI(
title="My API v1",
version="1.0.0",
docs_url="/docs", # => Docs at /v1/docs
)
class ItemV1(BaseModel):
id: int
name: str # => v1: simple name field
@v1_app.get("/items/{item_id}")
def get_item_v1(item_id: int):
return ItemV1(id=item_id, name=f"item-{item_id}")
# => GET /v1/items/42 => {"id": 42, "name": "item-42"}
# Version 2 API (new fields, different behavior)
v2_app = FastAPI(
title="My API v2",
version="2.0.0",
docs_url="/docs", # => Docs at /v2/docs
)
class ItemV2(BaseModel):
id: int
name: str
display_name: str # => v2: additional computed field
version: str = "v2"
@v2_app.get("/items/{item_id}")
def get_item_v2(item_id: int):
return ItemV2(
id=item_id,
name=f"item-{item_id}",
display_name=f"Item #{item_id} (v2)",
) # => GET /v2/items/42 => includes display_name
# Root app mounts versioned sub-apps
root_app = FastAPI(title="My API Gateway")
root_app.mount("/v1", v1_app) # => All v1_app routes accessible under /v1/
# => GET /v1/items/1 routes to v1_app
root_app.mount("/v2", v2_app) # => All v2_app routes accessible under /v2/
# => GET /v2/items/1 routes to v2_app
@root_app.get("/")
def api_info():
return {
"versions": ["v1", "v2"],
"latest": "v2",
"v1_docs": "/v1/docs", # => Each version has its own docs
"v2_docs": "/v2/docs",
}
Key Takeaway: Mount separate FastAPI applications with app.mount("/prefix", sub_app) to implement API versioning. Each sub-application has its own routes, middleware, and OpenAPI docs.
Why It Matters: Sub-application mounting enables backward-compatible API evolution without the complexity of version branching in a monolithic router. When v2 changes a field name, existing v1 clients continue to work because their requests route to the unmodified v1 sub-application. Each version can have independently configured middleware, CORS settings, and rate limits. The separate OpenAPI docs per version let mobile apps still using v1 browse the correct documentation without confusion from v2 additions.
Example 45: APIRouter with Prefix, Tags, and Dependencies
APIRouter is the primary tool for organizing FastAPI applications into modular components. Combine prefix, tags, and dependencies on the router to apply shared configuration to all routes in a group.
# routers/users.py - A complete user management router
from typing import Annotated, List
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
# Inline dependency for demonstration
def get_current_admin():
return {"username": "admin", "role": "admin"}
# => In production: verify JWT admin role
router = APIRouter(
prefix="/users", # => All routes start with /users
tags=["users"], # => All routes tagged "users" in OpenAPI
dependencies=[Depends(get_current_admin)],
# => All routes in this router require admin auth
# => Applied automatically without per-route declaration
responses={
401: {"description": "Not authenticated"},
403: {"description": "Not authorized"},
}, # => These responses documented for all routes in router
)
class User(BaseModel):
id: int
username: str
email: str
active: bool = True
fake_users: List[User] = [
User(id=1, username="alice", email="alice@example.com"),
User(id=2, username="bob", email="bob@example.com"),
]
@router.get("/", response_model=List[User])
# => Full path: GET /users/
# => Requires admin (from router dependencies)
def list_users():
return fake_users
@router.get("/{user_id}", response_model=User)
# => Full path: GET /users/{user_id}
def get_user(user_id: int):
user = next((u for u in fake_users if u.id == user_id), None)
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return user
# main.py - Include the router
from fastapi import FastAPI
# from routers.users import router as users_router # In real project
main_app = FastAPI()
main_app.include_router(router) # => Routes registered: GET /users/, GET /users/{user_id}
# => Admin dependency applied to both
Key Takeaway: Set prefix, tags, and dependencies on APIRouter to apply shared configuration to all routes in the router. Include it with app.include_router().
Why It Matters: Router-level dependencies eliminate the most common FastAPI security mistake: forgetting to add auth checks to individual endpoints. When the admin router enforces get_current_admin at the router level, a developer adding a new admin endpoint to that router cannot accidentally create an unauthenticated endpoint. The prefix also makes route structure self-documenting—router = APIRouter(prefix="/admin") immediately communicates the security boundary to code reviewers, even before they see the dependencies.
Group 19: Advanced Pydantic Patterns
Example 46: Pydantic v2 Model Validators and Computed Fields
Pydantic v2 introduces @model_validator for validation across multiple fields simultaneously, and @computed_field for read-only fields computed from other fields. These patterns enable rich domain model validation.
# main.py - Pydantic v2 model validators and computed fields
from fastapi import FastAPI
from pydantic import BaseModel, Field, model_validator, computed_field
app = FastAPI()
class UserRegistration(BaseModel):
username: str = Field(min_length=3, max_length=50)
password: str = Field(min_length=8)
confirm_password: str # => Must match password
email: str
@model_validator(mode="after") # => Runs after all individual field validators
# => mode="after": receives fully validated model instance
# => mode="before": receives raw input dict
def passwords_must_match(self) -> "UserRegistration":
# => self is the model instance
# => Must return self (or raise ValueError)
if self.password != self.confirm_password:
raise ValueError("password and confirm_password must match")
# => ValueError message included in 422 response
return self
@model_validator(mode="after")
def username_not_in_password(self) -> "UserRegistration":
if self.username.lower() in self.password.lower():
raise ValueError("password must not contain username")
return self
class Product(BaseModel):
name: str
price: float = Field(gt=0)
discount_percent: float = Field(ge=0, le=100, default=0)
@computed_field # => Field computed from other fields
@property # => Must also be a @property
def discounted_price(self) -> float:
return round(self.price * (1 - self.discount_percent / 100), 2)
# => price=100, discount=10 => discounted_price=90.0
# => Included in JSON response automatically
@computed_field
@property
def on_sale(self) -> bool:
return self.discount_percent > 0
# => True if discount_percent > 0
@app.post("/register")
def register(user: UserRegistration):
return {"username": user.username, "email": user.email}
# => confirm_password not in response (response_model could filter)
@app.post("/products")
def create_product(product: Product):
return product # => Includes discounted_price and on_sale in response
# => POST {"name": "Widget", "price": 100, "discount_percent": 20}
# => Returns: {"name": "Widget", "price": 100,
# => "discount_percent": 20,
# => "discounted_price": 80.0, "on_sale": true}
Key Takeaway: Use @model_validator(mode="after") for cross-field validation and @computed_field with @property for derived values included in serialization.
Why It Matters: Cross-field validation eliminates a class of inconsistent state bugs that single-field validators cannot catch. A date range where end_date < start_date, a price where discount > original_price, or mismatched passwords are all detected at the model boundary before business logic executes. Computed fields keep derived values consistent—discounted_price always reflects the current price and discount_percent, eliminating the desynchronization that occurs when computed values are stored separately.
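The date-range case mentioned above maps directly onto the same pattern. A minimal sketch (model and field names are illustrative):

```python
# Sketch: cross-field date-range validation with a model validator
from datetime import date
from pydantic import BaseModel, ValidationError, model_validator

class Booking(BaseModel):
    start_date: date
    end_date: date

    @model_validator(mode="after")
    def dates_in_order(self) -> "Booking":
        if self.end_date < self.start_date:
            raise ValueError("end_date must not be before start_date")
        return self

ok = Booking(start_date=date(2024, 1, 1), end_date=date(2024, 1, 5))

caught = False
try:
    Booking(start_date=date(2024, 1, 5), end_date=date(2024, 1, 1))
except ValidationError:
    caught = True
assert caught    # inverted range rejected at the model boundary
```

In a FastAPI handler the same ValidationError surfaces as a 422 response, so the business logic never sees the inconsistent range.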
Example 47: Pydantic Aliases and Config
Pydantic field aliases map external JSON field names (often camelCase from JavaScript) to Python attribute names (snake_case). The model_config class enables customizing serialization behavior for the whole model.
# main.py - Pydantic aliases for JSON field name mapping
from typing import Optional
from fastapi import FastAPI
from pydantic import BaseModel, Field
from pydantic.alias_generators import to_camel
app = FastAPI()
class OrderItem(BaseModel):
model_config = { # => Pydantic v2 model configuration
"populate_by_name": True, # => Accept both snake_case and camelCase
# => Without this: only alias works for input
"alias_generator": to_camel, # => Auto-generate camelCase aliases for all fields
# => snake_case Python names <-> camelCase JSON names
}
product_id: int # => Python: product_id; JSON: productId (auto-aliased)
unit_price: float # => Python: unit_price; JSON: unitPrice
quantity: int
discount_code: Optional[str] = None
# => Python: discount_code; JSON: discountCode
class LegacyItem(BaseModel): # => Manual aliases for non-standard external field names
item_id: int = Field(alias="ItemID")
# => JSON field "ItemID" maps to Python item_id
item_name: str = Field(alias="ItemName")
unit_cost: float = Field(alias="Cost_USD")
# => Non-standard: legacy system uses Cost_USD
@app.post("/orders/items", response_model=OrderItem)
def create_order_item(item: OrderItem):
# => Accepts JSON with camelCase keys:
# => {"productId": 1, "unitPrice": 9.99, "quantity": 2}
return item # => Response also uses camelCase (alias_generator)
@app.post("/legacy/items", response_model=LegacyItem)
def create_legacy_item(item: LegacyItem):
# => Accepts JSON: {"ItemID": 1, "ItemName": "Widget", "Cost_USD": 9.99}
# => item.item_id = 1, item.item_name = "Widget"
return item
# Note: FastAPI serializes response_model output using aliases by default,
# so the endpoints above already emit camelCase keys. When returning a plain
# dict without response_model, serialize explicitly:
# item.model_dump(by_alias=True) # => {"productId": ...} instead of {"product_id": ...}
Key Takeaway: Use alias_generator=to_camel with populate_by_name=True to automatically map between Python snake_case attributes and JSON camelCase keys. Use Field(alias="...") for individual field overrides.
Why It Matters: JavaScript clients send camelCase JSON keys while Python uses snake_case by convention. Without aliases, APIs must choose between inconsistent Python code (using camelCase attribute names) or inconsistent JavaScript clients (using snake_case JSON). Pydantic aliases let both sides follow their native conventions—Python code reads item.product_id while the JSON wire format shows "productId". The alias_generator=to_camel approach converts an entire model automatically, eliminating per-field alias declarations that become maintenance overhead as models grow.
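The round trip is easy to verify in isolation, without FastAPI in the loop. A sketch (model name is illustrative):

```python
# Sketch: camelCase in, snake_case attributes, camelCase out
from pydantic import BaseModel
from pydantic.alias_generators import to_camel

class Item(BaseModel):
    model_config = {"populate_by_name": True, "alias_generator": to_camel}
    product_id: int
    unit_price: float

item = Item.model_validate({"productId": 7, "unitPrice": 9.99})  # camelCase in
assert item.product_id == 7                                      # snake_case access
assert item.model_dump(by_alias=True) == {"productId": 7, "unitPrice": 9.99}
assert Item(product_id=7, unit_price=9.99).unit_price == 9.99    # populate_by_name
```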
Group 20: Background Processing and Advanced Patterns
Example 48: Dependency-Scoped Caching with functools.cache
Cache expensive computations and external lookups at multiple scopes: per-request (via dependency chain), per-process (via module-level LRU cache), or externally (via Redis). This example shows in-process caching patterns.
# main.py - In-process caching strategies
from functools import lru_cache
from typing import Annotated
import time
from fastapi import FastAPI, Depends
app = FastAPI()
# Process-level cache: persists for the lifetime of the process
@lru_cache(maxsize=128) # => Cache up to 128 unique argument combinations
def expensive_computation(key: str) -> dict:
# => Function must be deterministic for caching to be safe
print(f"[CACHE MISS] Computing for key: {key}")
time.sleep(0.1) # => Simulate 100ms computation
return {"key": key, "result": key.upper() * 3, "computed_at": time.time()}
# => time.time() changes each call
# => But lru_cache returns CACHED result after first call
# => So computed_at is the time of first computation
@lru_cache(maxsize=10)
def get_config_from_external() -> dict:
# => Read config from file/env once; cache forever
print("[CACHE MISS] Reading config")
return {"theme": "dark", "max_items": 100}
# Request-scoped cache via dependency chain (previous Example 29 showed this)
# FastAPI caches dependency results within a single request automatically
class SimpleCache: # => Simple TTL-aware in-process cache
def __init__(self, ttl_seconds: int = 60):
self._cache: dict = {}
self._expiry: dict = {}
self.ttl = ttl_seconds
def get(self, key: str):
if key in self._cache:
if time.time() < self._expiry[key]: # => Check if not expired
return self._cache[key] # => Return cached value
del self._cache[key] # => Remove expired entry
del self._expiry[key]
return None # => Cache miss
def set(self, key: str, value) -> None:
self._cache[key] = value
self._expiry[key] = time.time() + self.ttl
# => Store expiry timestamp
item_cache = SimpleCache(ttl_seconds=30) # => Cache items for 30 seconds
@app.get("/compute/{key}")
def compute(key: str):
return expensive_computation(key) # => Second call with same key: instant
# => lru_cache returns cached result
@app.get("/items-cached/{item_id}")
def get_item_cached(item_id: int):
cache_key = f"item:{item_id}"
cached = item_cache.get(cache_key)
if cached is not None:
return {**cached, "from_cache": True} # => Cache hit: return immediately
result = {"id": item_id, "name": f"item-{item_id}", "price": 9.99}
item_cache.set(cache_key, result) # => Store in cache for 30 seconds
return {**result, "from_cache": False} # => Cache miss: computed and cached
Key Takeaway: Use @lru_cache for process-level memoization of pure functions. Build a simple TTL cache class for time-sensitive data. FastAPI’s dependency system handles request-scoped caching automatically.
Why It Matters: In-process caching reduces database load and external API calls by orders of magnitude for frequently accessed, slowly changing data. Configuration values, permission sets, and lookup tables read from a database on every request create unnecessary load. An lru_cache on a get_permissions() function cuts 1,000 permission-check database queries to one per cache entry per process lifetime. For distributed systems with multiple processes, elevate to Redis for shared cache state across all instances.
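The get_permissions() claim above can be made concrete with a counter standing in for the database. A sketch (the function and its return shape are hypothetical):

```python
# Sketch: lru_cache collapses 1,000 permission lookups into one per key
from functools import lru_cache

lookups = {"count": 0}

@lru_cache(maxsize=None)
def get_permissions(role: str) -> frozenset[str]:
    lookups["count"] += 1        # stand-in for a real database query
    base = {"read"}
    if role == "admin":
        base.add("write")
    return frozenset(base)       # frozenset: immutable, safe to share

for _ in range(1000):
    get_permissions("admin")     # 999 of these are cache hits
get_permissions("viewer")        # new key => one more real lookup

assert lookups["count"] == 2
assert "write" in get_permissions("admin")
```

The trade-off: cached entries never expire, so a role change only takes effect after a process restart; use the TTL cache (or Redis) above when staleness matters.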
Example 49: Request Context with Middleware
Pass request-scoped data between middleware and handlers using Python’s contextvars module. This enables tracing information, authenticated user objects, and request IDs to flow through the call stack without explicit parameter passing.
# main.py - Request context with contextvars for tracing
import uuid
from contextvars import ContextVar
from fastapi import FastAPI, Request
app = FastAPI()
# ContextVar: per-async-task storage (per-coroutine, per-request in FastAPI)
request_id_var: ContextVar[str] = ContextVar("request_id", default="unknown")
# => Each coroutine (request) has its own value
# => Setting in middleware sets it for this request only
# => Other concurrent requests have their own values
current_user_var: ContextVar[dict] = ContextVar("current_user", default={})
@app.middleware("http")
async def context_middleware(request: Request, call_next):
request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
# => Use client-provided ID or generate new one
token = request_id_var.set(request_id)
# => Set value in ContextVar for this coroutine
# => token allows resetting to previous value
try:
response = await call_next(request)
response.headers["X-Request-ID"] = request_id
# => Echo request ID back to client for correlation
return response
finally:
request_id_var.reset(token) # => Reset to previous value after request
# => Prevents context bleeding between requests
def get_request_id() -> str: # => Helper to read request ID anywhere
return request_id_var.get() # => Returns the ID set by middleware for current request
def log(message: str) -> None:
rid = get_request_id()
print(f"[{rid[:8]}] {message}") # => All logs include request ID prefix
@app.get("/items/{item_id}")
async def get_item(item_id: int):
log(f"Fetching item {item_id}") # => Log includes request ID without explicit param
# => "[a7c3f2e1] Fetching item 42"
# Simulate service call
log("Calling database")
return {
"item_id": item_id,
"request_id": get_request_id(),
# => Include request ID in response for debugging
}
Key Takeaway: Use ContextVar to store request-scoped data set in middleware and read in handlers or service functions without passing it as explicit parameters through the call chain.
Why It Matters: Distributed tracing requires correlating all log lines from a single request across potentially dozens of function calls. Without ContextVar, every function that logs must accept a request_id parameter—polluting every function signature with infrastructure concerns. ContextVar gives each concurrent request its own isolated context, allowing tracing data to flow implicitly through the async call stack. OpenTelemetry’s Python SDK uses ContextVar internally for trace context propagation, making it the standard approach for distributed tracing integration.
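The per-task isolation that makes this safe can be demonstrated outside FastAPI. In this minimal sketch (the request names are illustrative), three concurrent coroutines each set the same ContextVar, yield control to one another, and still read back only their own value:

```python
import asyncio
from contextvars import ContextVar

request_id_var: ContextVar[str] = ContextVar("request_id", default="unknown")
seen: dict[str, str] = {}

async def handle(request_id: str) -> None:
    token = request_id_var.set(request_id)       # => Visible only to this task
    try:
        await asyncio.sleep(0.01)                # => Other tasks run here
        seen[request_id] = request_id_var.get()  # => Still this task's own value
    finally:
        request_id_var.reset(token)              # => No bleed into later work

async def main() -> None:
    await asyncio.gather(handle("req-a"), handle("req-b"), handle("req-c"))

asyncio.run(main())
print(all(key == value for key, value in seen.items()))  # => True
```

Each asyncio Task runs in a copy of the current context, which is why a set() inside one task never leaks into its siblings.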
Example 50: Pagination with Cursor-Based Approach
Offset-based pagination degrades at scale (large OFFSETs scan all preceding rows). Cursor-based pagination uses an opaque bookmark into the dataset, maintaining constant query performance regardless of page depth.
# main.py - Cursor-based pagination for scalable list endpoints
from typing import Optional, List
from fastapi import FastAPI, Query
from pydantic import BaseModel
import base64
import json
app = FastAPI()
# Simulated dataset
items = [{"id": i, "name": f"Item {i}", "created_at": i * 1000} for i in range(1, 101)]
# => 100 items with id 1..100
class PaginatedResponse(BaseModel):
items: List[dict]
next_cursor: Optional[str] = None # => None means no more pages
has_more: bool
def encode_cursor(item_id: int) -> str:
payload = json.dumps({"id": item_id})
return base64.b64encode(payload.encode()).decode()
# => Encodes {"id": 42} as base64 opaque string
# => Clients treat this as an opaque bookmark
def decode_cursor(cursor: str) -> Optional[int]:
try:
payload = base64.b64decode(cursor.encode()).decode()
return json.loads(payload)["id"]
# => Decodes back to {"id": 42} => returns 42
except Exception:
return None # => Invalid cursor: start from beginning
@app.get("/items/cursor-paginated", response_model=PaginatedResponse)
def list_items_cursor(
cursor: Optional[str] = Query(default=None, description="Pagination cursor from previous response"),
# => cursor=None means start from beginning
limit: int = Query(default=10, ge=1, le=50),
):
if cursor is None:
start_id = 0 # => Start from first item
else:
start_id = decode_cursor(cursor) or 0
# => Resume from cursor position
# Filter and paginate (in real code: WHERE id > start_id LIMIT limit+1)
filtered = [item for item in items if item["id"] > start_id]
# => Items after cursor position
page_items = filtered[: limit + 1]
# => Fetch one extra to detect if more pages exist
has_more = len(page_items) > limit
# => Extra item exists => more pages available
result_items = page_items[:limit] # => Trim back to requested limit
next_cursor = None
if has_more and result_items:
next_cursor = encode_cursor(result_items[-1]["id"])
# => Cursor points to last returned item's ID
return PaginatedResponse(
items=result_items,
next_cursor=next_cursor,
has_more=has_more,
)
# GET /items/cursor-paginated => Returns items 1-10, next_cursor="eyJpZCI6IDEwfQ=="
# GET /items/cursor-paginated?cursor=eyJpZCI6IDEwfQ== => Returns items 11-20
Key Takeaway: Cursor-based pagination uses an opaque encoded position marker instead of numeric offsets. Query WHERE id > cursor_id LIMIT n+1 maintains constant query performance regardless of page depth.
Why It Matters: Offset pagination performs full table scans to skip N rows before returning results. At page 1000 with 20 items per page, the database scans 20,000 rows it immediately discards—a performance cliff that slows response times linearly with page number. Cursor-based pagination uses WHERE id > last_id LIMIT 20, which hits an index directly regardless of which “page” is requested. For APIs exposing datasets with millions of rows, cursor pagination maintains sub-10ms query times at any depth while offset pagination degrades to seconds.
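The index-seek behavior described above can be sketched with an in-memory SQLite table standing in for a production database. Both queries return identical rows, but only the keyset form lets the engine seek the primary-key index instead of walking past the skipped rows:

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # => Stand-in for a production database
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO items (id, name) VALUES (?, ?)",
    [(i, f"Item {i}") for i in range(1, 101)],
)

# Offset pagination: the engine steps over the first 40 rows before returning any
offset_page = conn.execute(
    "SELECT id FROM items ORDER BY id LIMIT 10 OFFSET 40"
).fetchall()

# Keyset (cursor) pagination: WHERE id > ? seeks the primary-key index directly
last_id = 40  # => Would be decoded from the previous page's cursor
cursor_page = conn.execute(
    "SELECT id FROM items WHERE id > ? ORDER BY id LIMIT 10", (last_id,)
).fetchall()

print(offset_page == cursor_page)  # => True: same rows, different access path
```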
Example 51: Error Response Standardization
Standardize all error responses across your API to a consistent JSON structure. This enables clients to write one error-handling function instead of parsing different formats from different endpoints.
# main.py - Standardized error response format
from typing import Any, Optional
from fastapi import FastAPI, Request, HTTPException
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from pydantic import BaseModel
app = FastAPI()
class ErrorResponse(BaseModel): # => Standard error response envelope
error_code: str # => Machine-readable code: "ITEM_NOT_FOUND"
message: str # => Human-readable message
details: Optional[Any] = None # => Additional context (validation errors, etc.)
def make_error(code: str, message: str, details=None) -> dict:
return ErrorResponse(
error_code=code,
message=message,
details=details,
).model_dump() # => Convert to dict for JSONResponse
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException) -> JSONResponse:
# => Override default HTTPException handler
# => exc.status_code: HTTP status code
# => exc.detail: original detail string
code_map = {
400: "BAD_REQUEST",
401: "UNAUTHORIZED",
403: "FORBIDDEN",
404: "NOT_FOUND",
429: "RATE_LIMITED",
500: "INTERNAL_ERROR",
}
return JSONResponse(
status_code=exc.status_code,
content=make_error(
code=code_map.get(exc.status_code, "HTTP_ERROR"),
message=str(exc.detail),
),
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError) -> JSONResponse:
# => Override default 422 validation error handler
return JSONResponse(
status_code=422,
content=make_error(
code="VALIDATION_ERROR",
message="Request validation failed",
details=exc.errors(), # => List of individual field errors
),
)
@app.get("/items/{item_id}")
def get_item(item_id: int):
if item_id != 1:
raise HTTPException(status_code=404, detail="Item not found")
# => Handler returns: {"error_code": "NOT_FOUND", "message": "Item not found"}
return {"id": 1, "name": "Widget"}
Key Takeaway: Register @app.exception_handler() for HTTPException and RequestValidationError to replace FastAPI’s default error format with a consistent application-wide error envelope.
Why It Matters: Inconsistent error response formats force clients to write multiple error parsers—one for validation errors (array format), one for HTTP errors (string format), and one for unhandled exceptions (HTML format). A single ErrorResponse envelope lets clients write one error handler function for all endpoints: read error.error_code for programmatic handling and error.message for display. Machine-readable error_code strings enable client-side retry logic, localization of error messages, and analytics on which errors occur most frequently in production.
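On the client side, the single-parser payoff might look like the following sketch. handle_error, RETRYABLE_CODES, and the returned keys are all hypothetical, not part of any client library:

```python
# Assumed retry policy keyed on the machine-readable error_code
RETRYABLE_CODES = {"RATE_LIMITED", "INTERNAL_ERROR"}

def handle_error(status_code: int, body: dict) -> dict:
    # One parser works for every endpoint because the envelope never varies
    code = body.get("error_code", "HTTP_ERROR")
    return {
        "code": code,                                # => For programmatic branching
        "display_message": body.get("message", ""),  # => For the UI
        "retryable": code in RETRYABLE_CODES,        # => Drives client retry logic
        "details": body.get("details"),              # => e.g. field-level validation errors
    }

result = handle_error(429, {"error_code": "RATE_LIMITED", "message": "Too many requests"})
print(result["retryable"])  # => True
```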
Example 52: Async Context Manager Dependencies
Async generator dependencies use yield to support asynchronous setup and cleanup. This enables dependencies to open resources, yield them to the handler, and clean up after the response is sent—essential for database transactions and external API sessions.
# main.py - Async context manager dependencies
from typing import AsyncGenerator
from fastapi import FastAPI, Depends
import asyncio
app = FastAPI()
class AsyncHTTPSession: # => Simulated async HTTP client session
def __init__(self, base_url: str):
self.base_url = base_url
self.closed = False
print(f"[Session] Opened connection to {base_url}")
async def get(self, path: str) -> dict:
await asyncio.sleep(0.01) # => Simulate HTTP request
return {"url": f"{self.base_url}{path}", "data": "response"}
async def close(self):
self.closed = True
print(f"[Session] Closed connection to {self.base_url}")
async def get_http_session() -> AsyncGenerator[AsyncHTTPSession, None]:
# => Generator yields session; cleanup runs after handler
session = AsyncHTTPSession("https://api.external-service.com")
try:
yield session # => Handler receives the session
finally:
await session.close() # => Always closed, even if handler raises exception
class DatabaseTransaction: # => Simulated database transaction context
def __init__(self, session):
self.session = session
self.committed = False
async def commit(self):
self.committed = True
print("[TX] Transaction committed")
async def rollback(self):
print("[TX] Transaction rolled back")
async def get_transaction(db=None):
# => In production: db would be injected via Depends(get_db)
tx = DatabaseTransaction(db)
try:
yield tx # => Handler uses tx; can call tx.commit()
if not tx.committed:
await tx.rollback() # => Auto-rollback if handler didn't commit
except Exception:
await tx.rollback() # => Rollback on any exception
raise # => Re-raise after rollback
@app.get("/external-data")
async def fetch_external(
session: AsyncHTTPSession = Depends(get_http_session),
# => session is open for duration of this handler
):
result = await session.get("/users")
return result # => Session closed automatically after return
Key Takeaway: Use async def generators with yield for dependencies that require async setup and cleanup. The try/finally pattern guarantees cleanup runs even when handlers raise exceptions.
Why It Matters: Resource cleanup guarantees prevent connection leaks that accumulate into system failures. An HTTP session that is not explicitly closed keeps a TCP connection open in CLOSE_WAIT state—harmless individually but catastrophic at scale when thousands of leaked connections exhaust the operating system’s connection table. The try/finally pattern around yield makes cleanup unconditional: network errors, validation failures, and unhandled exceptions in handlers all trigger the cleanup path. This is the FastAPI-recommended pattern for database transaction management, replacing the try/except/commit/rollback boilerplate repeated in every handler.
Example 53: Custom OpenAPI Schema Injection
Extend FastAPI’s automatically generated OpenAPI schema with custom security schemes, additional metadata, webhook definitions, and schema extensions. This enables compatibility with API gateway tools that parse the schema.
# main.py - Customizing the OpenAPI schema
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
app = FastAPI()
@app.get("/items")
def list_items():
return []
@app.get("/health")
def health():
return {"status": "ok"}
def custom_openapi():
if app.openapi_schema: # => Return cached schema if already generated
return app.openapi_schema
openapi_schema = get_openapi(
title="My API",
version="1.0.0",
description="Custom OpenAPI schema with extensions",
routes=app.routes, # => Generate from registered routes
)
# Add security schemes ("components" can be absent when no models are registered)
openapi_schema.setdefault("components", {})["securitySchemes"] = {
"ApiKeyHeader": { # => Define API key in header scheme
"type": "apiKey",
"in": "header",
"name": "X-API-Key", # => Header name clients should send
},
"BearerAuth": { # => Define JWT Bearer scheme
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT", # => Hints to tools that this is a JWT
},
}
# Add x-logo extension for ReDoc
openapi_schema["info"]["x-logo"] = {
"url": "https://example.com/logo.png",
# => Logo displayed in ReDoc documentation
}
# Add webhook definitions
openapi_schema["webhooks"] = {
"item-created": { # => Webhook name
"post": {
"description": "Fired when a new item is created",
"requestBody": {
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/Item"}
# => Assumes an Item schema exists in components; otherwise inline the schema
}
}
},
"responses": {"200": {"description": "Webhook received"}},
}
}
}
app.openapi_schema = openapi_schema # => Cache for subsequent requests
return app.openapi_schema
app.openapi = custom_openapi # => Replace default openapi() method
Key Takeaway: Replace app.openapi with a custom function to modify the generated OpenAPI schema. Add security schemes, webhooks, and vendor extensions without losing auto-generation from route definitions.
Why It Matters: API gateway products (AWS API Gateway, Kong, Apigee) import OpenAPI schemas to configure routing, rate limiting, and authentication. These tools often require specific security scheme definitions or vendor extension fields (x-amazon-apigateway-integration, x-kong-plugin-*) not generated by FastAPI’s default schema. Custom schema injection lets you maintain FastAPI’s schema auto-generation for route documentation while adding gateway-specific annotations, eliminating hand-maintained schema files that drift out of sync with implementation.
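As a sketch of that annotation step, vendor extensions can be merged into the schema dict with plain Python. The extension name and its type/httpMethod/uri fields come from AWS's documented x-amazon-apigateway-integration extension; the backend URI is made up for illustration:

```python
# Minimal OpenAPI dict standing in for the output of get_openapi()
schema = {
    "openapi": "3.1.0",
    "info": {"title": "My API", "version": "1.0.0"},
    "paths": {
        "/items": {"get": {"responses": {"200": {"description": "OK"}}}},
        "/health": {"get": {"responses": {"200": {"description": "OK"}}}},
    },
}

for path, operations in schema["paths"].items():
    for operation in operations.values():
        operation["x-amazon-apigateway-integration"] = {
            "type": "http_proxy",
            "httpMethod": "ANY",
            "uri": f"https://backend.internal{path}",  # => Hypothetical upstream
        }

print(schema["paths"]["/items"]["get"]["x-amazon-apigateway-integration"]["uri"])
# => https://backend.internal/items
```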
Example 54: Testing Async Endpoints with AsyncClient
For endpoints that use async dependencies, or for tests whose setup and teardown require async cleanup, use httpx.AsyncClient with ASGITransport and pytest-asyncio.
# test_async.py - Async endpoint testing with AsyncClient
# pip install httpx pytest-asyncio
import pytest
from httpx import AsyncClient, ASGITransport
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
id: int
name: str
@app.get("/items/{item_id}", response_model=Item)
async def get_item(item_id: int):
return Item(id=item_id, name=f"item-{item_id}")
@app.post("/items", response_model=Item, status_code=201)
async def create_item(item: Item):
return item
# pytest configuration: add to pyproject.toml or pytest.ini
# [tool.pytest.ini_options]
# asyncio_mode = "auto" # => All async test functions run with asyncio
@pytest.mark.asyncio # => Mark test as async (or use asyncio_mode=auto)
async def test_get_item():
async with AsyncClient(
transport=ASGITransport(app=app),
# => ASGITransport routes requests to app without network
base_url="http://test", # => Dummy base URL; actual requests go to app
) as client:
response = await client.get("/items/42")
# => await: async HTTP request
assert response.status_code == 200
data = response.json()
assert data["id"] == 42
assert data["name"] == "item-42"
@pytest.mark.asyncio
async def test_create_item():
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
payload = {"id": 1, "name": "Widget"}
response = await client.post("/items", json=payload)
assert response.status_code == 201
assert response.json()["name"] == "Widget"
@pytest.fixture
async def async_client(): # => Reusable async client fixture
# => With pytest-asyncio in strict mode, use @pytest_asyncio.fixture instead
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
yield client # => Yield to tests; cleanup runs after test
@pytest.mark.asyncio
async def test_using_fixture(async_client: AsyncClient):
response = await async_client.get("/items/1")
# => Use fixture client directly
assert response.status_code == 200
Key Takeaway: Use httpx.AsyncClient with ASGITransport(app=app) for async test functions. Create a reusable async_client fixture for tests that share setup and teardown.
Why It Matters: Async test clients accurately test async endpoints including await chains in handlers, middleware, and dependencies. A synchronous TestClient wraps async code in a synchronous adapter that can mask timing-dependent bugs in concurrent code. AsyncClient also enables testing asyncio.gather patterns and concurrent request simulation in tests, catching race conditions in shared state before they cause production incidents. The fixture pattern reduces boilerplate and ensures client cleanup runs reliably even when assertions fail.
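The kind of race that concurrent-request tests can surface is easy to reproduce in isolation. This lost-update sketch mirrors any handler that mutates shared in-memory state across an await:

```python
import asyncio

counter = {"value": 0}  # => Shared mutable state, like an in-memory request counter

async def increment():
    current = counter["value"]      # => Read
    await asyncio.sleep(0)          # => Yield point: other tasks run in between
    counter["value"] = current + 1  # => Write based on a now-stale read

async def main():
    await asyncio.gather(*(increment() for _ in range(100)))

asyncio.run(main())
print(counter["value"])  # => 1, not 100: every task read 0 before any task wrote
```

A sequential test would never trigger this; only firing the coroutines concurrently, as asyncio.gather over AsyncClient requests does, exposes the lost updates.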
Example 55: Health Check and Readiness Endpoints
Production deployments require health check endpoints for load balancers and container orchestrators. Distinguish between liveness (is the process running?) and readiness (can it serve traffic?) checks.
# main.py - Liveness and readiness health check endpoints
import asyncio
import time
from fastapi import FastAPI, Response, status
from pydantic import BaseModel
from typing import Optional
app = FastAPI()
# Application state tracking
class AppState:
started_at: float = time.time()
database_healthy: bool = True # => Updated by background health checker
cache_healthy: bool = True
shutting_down: bool = False # => Set True when SIGTERM received
state = AppState()
class HealthStatus(BaseModel):
status: str # => "healthy" | "degraded" | "unhealthy"
uptime_seconds: float
checks: dict
async def check_database() -> bool:
try:
await asyncio.sleep(0.001) # => Simulate DB ping (use real connection in production)
return True
except Exception:
return False
async def check_external_api() -> bool:
# In production: ping external dependency
return True # => Simplified for example
@app.get("/health/live")
async def liveness(): # => Liveness: is the process alive and not deadlocked?
# => Load balancer uses this to decide restart
# => Should NEVER check external dependencies
if state.shutting_down:
return Response(
content='{"status": "shutting_down"}',
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
# => 503 tells LB to stop sending new requests
media_type="application/json",
)
return {"status": "alive", "uptime": time.time() - state.started_at}
@app.get("/health/ready")
async def readiness(): # => Readiness: can the app serve requests?
# => Kubernetes uses this for deployment health
# => Should check all dependencies
db_ok = await check_database()
api_ok = await check_external_api()
all_healthy = db_ok and api_ok
status_str = "healthy" if all_healthy else "degraded"
http_status = 200 if all_healthy else 503
# => 503 during startup until DB connects
# => Kubernetes won't route traffic until 200
return Response(
content=HealthStatus(
status=status_str,
uptime_seconds=time.time() - state.started_at,
checks={"database": db_ok, "external_api": api_ok},
).model_dump_json(),
status_code=http_status,
media_type="application/json",
)
Key Takeaway: Implement separate /health/live (process alive, no external checks) and /health/ready (dependencies healthy, can serve traffic) endpoints. Return 503 from readiness when dependencies are unavailable.
Why It Matters: Kubernetes and ECS rely on health check endpoints to prevent traffic routing to unhealthy pods. A single /health endpoint that checks the database creates a circular failure: when the database is slow, health checks time out, the pod is killed, it restarts and immediately checks the database again—amplifying load on an already struggling database. Separate liveness and readiness endpoints break this cycle: liveness keeps the pod alive, while readiness gates traffic until the database connection pool is established, allowing graceful degradation under partial failures.
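The background health checker that updates state.database_healthy, referenced in the comments above, might be sketched as follows. The FastAPI lifespan wiring is omitted, AppState is simplified, and ping_database is simulated:

```python
import asyncio

class AppState:  # => Simplified version of the example's AppState
    def __init__(self):
        self.database_healthy = False

state = AppState()

async def ping_database() -> bool:
    await asyncio.sleep(0.001)  # => Stand-in for a real connection check
    return True

async def health_checker(interval: float = 0.01, rounds: int = 3) -> None:
    # => In production: start this from a lifespan handler and loop forever,
    # => so the readiness endpoint reads cached state instead of pinging inline
    for _ in range(rounds):
        state.database_healthy = await ping_database()
        await asyncio.sleep(interval)

asyncio.run(health_checker())
print(state.database_healthy)  # => True
```

Decoupling the probe endpoint from the live dependency check this way keeps readiness responses fast even when a dependency is slow to answer.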