Work with Databases
Problem
Python applications need efficient database interaction with proper connection management, type safety, and migration support. Raw SQL strings are error-prone and difficult to maintain across database vendors.
Solution
1. SQLAlchemy Core - SQL Expression Language
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, select
engine = create_engine(
"postgresql://user:password@localhost/dbname",
pool_size=10,
max_overflow=20,
pool_pre_ping=True # Verify connections before use
)
metadata = MetaData()
users = Table(
'users',
metadata,
Column('id', Integer, primary_key=True),
Column('username', String(50), unique=True, nullable=False),
Column('email', String(100), nullable=False),
Column('age', Integer)
)
metadata.create_all(engine)
# Build the INSERT statement up front; it is only sent to the database
# once it is executed on a connection.
stmt = users.insert().values(username="john_doe", email="john@example.com", age=30)

with engine.connect() as conn:
    result = conn.execute(stmt)
    # Connections do not autocommit, so the insert must be committed explicitly.
    conn.commit()
    print(f"Inserted user with id: {result.inserted_primary_key[0]}")
with engine.connect() as conn:
stmt = select(users).where(users.c.age > 25)
result = conn.execute(stmt)
for row in result:
print(f"User: {row.username}, Age: {row.age}")
2. SQLAlchemy ORM - Declarative Models
from typing import List, Optional

from sqlalchemy import ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column, relationship
class Base(DeclarativeBase):
    """Declarative base that the ORM models in this recipe inherit from."""
class User(Base):
    """ORM model mapped to the ``users`` table.

    ``username`` is unique; ``age`` is optional. ``posts`` is the
    collection side of the bidirectional relationship with ``Post.author``.
    """

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50), unique=True)
    email: Mapped[str] = mapped_column(String(100))
    # The column is nullable and users are created without an age, so the
    # annotation must admit None; Mapped[int] would mislead type checkers.
    age: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)

    # Relationship: one user has many posts (paired with Post.author).
    posts: Mapped[List["Post"]] = relationship(back_populates="author")

    def __repr__(self) -> str:
        return f"<User(username={self.username}, email={self.email})>"
class Post(Base):
    """ORM model mapped to the ``posts`` table.

    Each post references its author through ``user_id``; ``author`` is the
    scalar side of the bidirectional relationship with ``User.posts``.
    """

    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str]
    # The foreign key is required: without it SQLAlchemy cannot determine the
    # join condition between User.posts and Post.author, and mapper
    # configuration fails.
    user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"))

    # Relationship: many posts belong to one user (paired with User.posts).
    author: Mapped["User"] = relationship(back_populates="posts")
engine = create_engine("sqlite:///example.db", echo=True)
Base.metadata.create_all(engine)
with Session(engine) as session:
# Create
new_user = User(username="jane_doe", email="jane@example.com", age=28)
session.add(new_user)
session.commit()
# Read
user = session.query(User).filter_by(username="jane_doe").first()
print(f"Found user: {user}")
# Update
user.age = 29
session.commit()
# Delete
session.delete(user)
session.commit()
3. Async SQLAlchemy for High Concurrency
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy import select
import asyncio
async_engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/dbname",
echo=True,
pool_size=20,
max_overflow=40
)
AsyncSessionLocal = async_sessionmaker(
async_engine,
class_=AsyncSession,
expire_on_commit=False
)
async def create_user(username: str, email: str):
    """Persist a new ``User`` with the given username and email.

    Opens a session from ``AsyncSessionLocal``, adds the user, commits, and
    returns the instance once the session has been closed.
    """
    async with AsyncSessionLocal() as db:
        new_user = User(username=username, email=email)
        db.add(new_user)
        await db.commit()
    return new_user
async def get_user_by_username(username: str):
    """Return the ``User`` whose username equals ``username``, or ``None``."""
    query = select(User).where(User.username == username)
    async with AsyncSessionLocal() as db:
        rows = await db.execute(query)
        return rows.scalar_one_or_none()
async def get_users_with_posts():
    """Fetch users with eager loading of posts and print a count per user.

    ``selectinload`` loads every user's posts up front. Lazy loading is not
    usable here because it would issue implicit I/O on attribute access,
    which an ``AsyncSession`` does not allow.
    """
    # Imported here because this section's imports do not bring in
    # selectinload; without it the function raises NameError.
    from sqlalchemy.orm import selectinload

    async with AsyncSessionLocal() as session:
        stmt = select(User).options(selectinload(User.posts))
        result = await session.execute(stmt)
        loaded_users = result.scalars().all()
        for user in loaded_users:
            print(f"{user.username} has {len(user.posts)} posts")
async def main():
    """Run the async examples in order: create a user, look it up, list users."""
    # The return values were bound to locals that were never read; the calls
    # are kept and the unused names dropped.
    await create_user("async_user", "async@example.com")
    await get_user_by_username("async_user")
    await get_users_with_posts()
asyncio.run(main())
4. Database Migrations with Alembic
alembic init alembic
from myapp.models import Base
target_metadata = Base.metadata
alembic revision --autogenerate -m "Add users table"
alembic upgrade head
alembic downgrade -1
alembic history
Example Migration File:
"""Add users table
Revision ID: 001
Revises:
Create Date: 2025-12-21
"""
from alembic import op
import sqlalchemy as sa
revision = '001'
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Apply revision 001: create the ``users`` table.

    Columns are ``id`` (primary key), ``username`` (required, unique),
    ``email`` (required) and ``created_at`` (defaults to the database
    server's current timestamp).
    """
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('username', sa.String(50), nullable=False),
        sa.Column('email', sa.String(100), nullable=False),
        # func.now() is rendered per dialect (now() on PostgreSQL,
        # CURRENT_TIMESTAMP on SQLite), whereas the literal text 'now()'
        # only works on backends that define that function.
        sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('username'),
    )
def downgrade() -> None:
op.drop_table('users')
5. Connection Pooling Best Practices
from sqlalchemy import create_engine, event
from sqlalchemy.pool import QueuePool
import logging
engine = create_engine(
"postgresql://user:password@localhost/dbname",
poolclass=QueuePool,
pool_size=10, # Maintain 10 connections
max_overflow=20, # Allow 20 additional connections
pool_timeout=30, # Wait 30s for available connection
pool_recycle=3600, # Recycle connections after 1 hour
pool_pre_ping=True, # Test connections before using
echo_pool=True # Log pool checkouts/checkins
)
@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    """Log at INFO level when the ``connect`` event fires for ``engine``.

    Both arguments are supplied by the event system and are not used here.
    """
    logging.info("New database connection established")
@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
    """Log at DEBUG level when the ``checkout`` event fires for ``engine``.

    All three arguments are supplied by the event system and are not used here.
    """
    logging.debug("Connection checked out from pool")
from contextlib import contextmanager
from sqlalchemy.orm import Session
@contextmanager
def get_db_session():
    """Yield a ``Session`` bound to ``engine`` inside a transactional scope.

    The session is committed when the caller's block finishes normally.
    If the block (or the commit) raises ``Exception``, the session is rolled
    back and the exception is re-raised. The session is closed in every case.
    """
    # Session's own context manager closes it on exit, which takes the place
    # of an explicit try/finally around session.close().
    with Session(engine) as session:
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
with get_db_session() as session:
user = session.query(User).first()
user.age += 1
# Automatic commit on success, rollback on exception
6. Query Optimization and N+1 Prevention
from sqlalchemy.orm import joinedload, selectinload, subqueryload
users = session.query(User).all()
for user in users:
# Each iteration executes separate query!
print(f"{user.username} has {len(user.posts)} posts")
users = session.query(User).options(joinedload(User.posts)).all()
for user in users:
print(f"{user.username} has {len(user.posts)} posts")
users = session.query(User).options(selectinload(User.posts)).all()
for user in users:
print(f"{user.username} has {len(user.posts)} posts")
users = session.query(User).options(subqueryload(User.posts)).all()
from sqlalchemy import func
users_with_stats = (
session.query(
User.username,
func.count(Post.id).label('post_count')
)
.outerjoin(Post)
.group_by(User.id, User.username)
.having(func.count(Post.id) > 5)
.all()
)
How It Works
graph TD
A[Application] --> B[SQLAlchemy Session]
B --> C[Connection Pool]
C --> D{Available Connection?}
D -->|Yes| E[Checkout Connection]
D -->|No| F{Pool Full?}
F -->|Yes| G[Wait for Connection]
F -->|No| H[Create New Connection]
E --> I[Execute Query]
H --> I
G --> E
I --> J[Return Connection to Pool]
J --> C
style A fill:#0173B2
style B fill:#DE8F05
style C fill:#029E73
style I fill:#CC78BC
Key Components:
- Engine: Connection factory and pool manager
- Session: Workspace for ORM operations and transaction management
- Connection Pool: Reusable database connections for efficiency
- Unit of Work: Tracks changes and flushes to database on commit
Variations
Repository Pattern for Clean Architecture
from abc import ABC, abstractmethod
from typing import Optional, List
class UserRepository(ABC):
    """Abstract persistence interface for ``User`` objects.

    Concrete repositories must implement every method below.
    """

    @abstractmethod
    def get_by_id(self, user_id: int) -> Optional[User]:
        """Return the user with the given primary key, or ``None``."""

    @abstractmethod
    def get_by_username(self, username: str) -> Optional[User]:
        """Return the user with the given username, or ``None``."""

    @abstractmethod
    def create(self, user: User) -> User:
        """Store a new user and return it."""

    @abstractmethod
    def update(self, user: User) -> User:
        """Store changes to an existing user and return it."""

    @abstractmethod
    def delete(self, user_id: int) -> None:
        """Remove the user with the given primary key."""
class SQLAlchemyUserRepository(UserRepository):
    """``UserRepository`` implementation backed by a SQLAlchemy ``Session``.

    Methods flush but never commit; the caller that owns the session decides
    when the transaction ends.
    """

    def __init__(self, session: Session):
        """Store the session that all repository operations will use."""
        self.session = session

    def get_by_id(self, user_id: int) -> Optional[User]:
        """Return the user whose ``id`` equals ``user_id``, or ``None``."""
        return self.session.query(User).filter(User.id == user_id).first()

    def get_by_username(self, username: str) -> Optional[User]:
        """Return the first user whose ``username`` matches, or ``None``."""
        return self.session.query(User).filter(User.username == username).first()

    def create(self, user: User) -> User:
        """Add ``user`` to the session, flush, and return it."""
        self.session.add(user)
        self.session.flush()
        return user

    def update(self, user: User) -> User:
        """Merge ``user`` into the session, flush, and return the merged instance.

        ``Session.merge()`` does not attach the object it is given; it copies
        the state onto a session-owned instance and returns that. Returning
        the merged instance gives callers the object the session tracks.
        """
        merged = self.session.merge(user)
        self.session.flush()
        return merged

    def delete(self, user_id: int) -> None:
        """Delete the user with ``user_id`` if it exists; otherwise do nothing."""
        user = self.get_by_id(user_id)
        if user:
            self.session.delete(user)
            self.session.flush()
def create_user_service(username: str, email: str):
with get_db_session() as session:
repo = SQLAlchemyUserRepository(session)
user = User(username=username, email=email)
return repo.create(user)
Bulk Operations for Performance
users_data = [
{"username": f"user_{i}", "email": f"user_{i}@example.com"}
for i in range(10000)
]
with Session(engine) as session:
session.bulk_insert_mappings(User, users_data)
session.commit()
session.bulk_update_mappings(
User,
[{"id": 1, "age": 31}, {"id": 2, "age": 32}]
)
session.commit()
Read Replicas for Scalability
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
primary_engine = create_engine("postgresql://primary/dbname")
replica_engine = create_engine("postgresql://replica/dbname")
class DatabaseRouter:
    """Route queries to appropriate database."""

    def get_session(self, for_write: bool = False) -> Session:
        """Return a new ``Session`` on the primary engine for writes, else on the replica."""
        if for_write:
            return Session(primary_engine)
        return Session(replica_engine)
router = DatabaseRouter()
with router.get_session(for_write=False) as session:
users = session.query(User).all()
with router.get_session(for_write=True) as session:
user = User(username="new_user", email="new@example.com")
session.add(user)
session.commit()
Common Pitfalls
1. Not Managing Session Lifecycle
Problem: Keeping sessions open too long causes memory leaks.
session = Session(engine)
user = session.query(User).first()
session.commit() # Memory accumulated!
with Session(engine) as session:
user = session.query(User).first()
session.commit()
2. Forgetting to Commit Transactions
Problem: Changes are not persisted to the database.
session = Session(engine)
user = User(username="test")
session.add(user)
with Session(engine) as session:
user = User(username="test")
session.add(user)
session.commit() # Persisted
3. N+1 Query Problem
Problem: Executing a separate query for each object's relationship.
users = session.query(User).all()
for user in users:
print(len(user.posts)) # Separate query each iteration!
users = session.query(User).options(selectinload(User.posts)).all()
for user in users:
print(len(user.posts)) # No additional queries
4. Using Mutable Defaults
Problem: Shared mutable default values across instances.
class User(Base):
__tablename__ = "users"
tags = Column(JSON, default=[]) # Shared across instances!
class User(Base):
__tablename__ = "users"
tags = Column(JSON, server_default='[]')
# Or use callable default in Python
created_at = Column(DateTime, default=datetime.utcnow)
5. Not Handling Database Errors
Problem: Connections fail, queries time out, and constraints are violated.
from sqlalchemy.exc import IntegrityError, OperationalError
import logging
user = User(username="duplicate")
session.add(user)
session.commit() # Crashes on duplicate username!
try:
user = User(username="duplicate")
session.add(user)
session.commit()
except IntegrityError as e:
session.rollback()
logging.error(f"Duplicate username: {e}")
except OperationalError as e:
session.rollback()
logging.error(f"Database connection failed: {e}")
Related Patterns
Related Tutorial: See Intermediate Tutorial - Database Integration. Related How-To: See Use Advanced Async Patterns. Related Cookbook: See Cookbook recipe “Database Patterns”.
Last updated