Advanced
Group 9: Database Advanced
Example 51: Transactions and Concurrency with Ecto.Multi
Execute multiple database operations atomically. If any fails, all rollback.
%% Ecto.Multi transaction flow
graph TD
A[Multi.new] --> B[Multi.update :debit]
B --> C[Multi.update :credit]
C --> D[Multi.insert :log]
D --> E[Repo.transaction]
E --> F{All succeed?}
F -->|Yes| G[Commit all changes]
F -->|No| H[Rollback everything]
G --> I["Return {:ok, results}"]
H --> J["Return {:error, failed_op}"]
style A fill:#0173B2,color:#fff
style E fill:#DE8F05,color:#fff
style G fill:#029E73,color:#fff
style H fill:#CA9161,color:#fff
defmodule MyApp.Transfers do # => Defines Transfers service module
# => Groups money transfer operations
def transfer_funds(from_account, to_account, amount) do # => Public function: transfer_funds/3
# => Params: from_account, to_account, amount
result = # => Binds result to Multi pipeline return
# => Will be {:ok, map} or {:error, ...} tuple
Ecto.Multi.new() # => Creates empty Multi struct
# => Multi accumulates operations to run in transaction
|> Ecto.Multi.update( # => Pipes Multi into update operation
:debit, # => Named operation key :debit
# => Can reference this result later as %{debit: ...}
Ecto.Changeset.change(from_account, balance: from_account.balance - amount)
# => Creates changeset subtracting amount from balance
# => E.g., balance: 1000 - 50 = 950
# => NOTE(review): balance is read from the in-memory struct; two concurrent
# => transfers can read the same stale balance (lost update). Consider an
# => atomic "UPDATE ... SET balance = balance - $1" or SELECT ... FOR UPDATE — TODO confirm
) # => Returns Multi with :debit operation added
# => Multi now has 1 operation queued
|> Ecto.Multi.update( # => Adds second update operation
:credit, # => Named operation key :credit
# => Runs AFTER :debit — Multi operations execute sequentially in insertion order
Ecto.Changeset.change(to_account, balance: to_account.balance + amount)
# => Creates changeset adding amount to balance
# => E.g., balance: 500 + 50 = 550
) # => Returns Multi with :debit and :credit queued
# => Multi now has 2 operations
|> Ecto.Multi.insert(:transaction_log, %TransactionLog{
# => Adds insert operation for audit log
# => Named :transaction_log
from_id: from_account.id, # => Sets from_id field to source account ID
# => E.g., from_id: 123
to_id: to_account.id, # => Sets to_id field to destination account ID
# => E.g., to_id: 456
amount: amount # => Sets amount field to transferred amount
# => E.g., amount: 50
}) # => Returns Multi with 3 operations total
# => All operations ready to execute atomically
|> MyApp.Repo.transaction() # => Executes all operations in database transaction
# => Returns {:ok, %{debit: ..., credit: ..., transaction_log: ...}} on success
# => Returns {:error, failed_operation, changeset, changes_so_far} on failure
# => Database commits all or rolls back all
case result do # => Pattern match on transaction result
# => Handles success and failure cases
{:ok, %{debit: from, credit: to, transaction_log: log}} -> # => Success pattern match
# => Destructures map with all 3 results
# => from is updated from_account struct
# => to is updated to_account struct
# => log is inserted TransactionLog struct
{:ok, from, to, log} # => Returns success tuple with all results
# => Caller receives all updated structs
# => Database changes are committed permanently
{:error, :debit, changeset, _changes} -> # => Failure pattern: :debit operation failed
# => changeset contains validation errors
# => _changes is empty map (:debit is the first operation, so nothing ran before it)
{:error, "Debit failed", changeset} # => Returns error tuple with message
# => Transaction rolled back before :credit ran
# => Database unchanged (atomicity guaranteed)
{:error, failed_op, changeset, _changes} -> # => Catch-all failure pattern for any operation
# => failed_op is operation name (:credit or :transaction_log)
# => changeset has error details
{:error, "Operation failed: #{failed_op}", changeset} # => Returns error with operation name
# => E.g., "Operation failed: credit"
# => All changes discarded, database rolled back
end # => End case statement
end # => End transfer_funds/3 function
# Dependency between operations - sequential execution with references
def complex_transaction do # => Public function: complex_transaction/0
# => No parameters, demonstrates dependent operations
Ecto.Multi.new() # => Creates empty Multi struct
# => Starting point for dependent operation chain
|> Ecto.Multi.insert(:user, User.create_changeset(%{email: "user@example.com"}))
# => First operation: insert user
# => Named :user (required for later references)
# => Creates user with email "user@example.com"
# => Returns Multi with :user operation queued
|> Ecto.Multi.insert(:profile, fn %{user: user} -> # => Second operation: insert profile (DEPENDENT)
# => Anonymous function receives the map of ALL results accumulated so far;
# => the pattern extracts the :user entry (the inserted User struct)
# => Sequential dependency: profile needs user.id
Profile.create_changeset(user) # => Creates profile changeset for inserted user
# => Uses user struct to set foreign key
# => E.g., profile.user_id = user.id
end) # => Returns Multi with :user and :profile queued
# => Multi now has 2 operations, :profile depends on :user
|> Ecto.Multi.insert(:settings, fn %{user: user} -> # => Third operation: insert settings (DEPENDENT)
# => Also receives the accumulated results map (here matching only :user)
# => user is same inserted User struct
Settings.default_changeset(user) # => Creates default settings for user
# => Uses user struct to set foreign key
# => E.g., settings.user_id = user.id
end) # => Returns Multi with all 3 operations queued
# => Execution order: :user, then :profile, then :settings
|> MyApp.Repo.transaction() # => Executes operations in order within transaction
# => If :user fails, :profile and :settings never run
# => If :profile fails, :user is rolled back, :settings never runs
# => Returns {:ok, %{user: ..., profile: ..., settings: ...}} on success
# => Returns {:error, operation_name, changeset, partial_results} on failure
end # => End complex_transaction/0 function
end # => End MyApp.Transfers module

Key Takeaway: Ecto.Multi chains database operations atomically—all succeed or all rollback—with each step able to reference previous results, making it ideal for transfers and multi-step writes.
Why It Matters: Ecto.Multi makes complex multi-step database operations atomic without manual transaction management. Naming each operation allows precise error reporting — when :debit fails, you know exactly which operation failed without inspecting raw database errors. This is essential for financial operations, user registration flows, and any workflow where partial completion leaves data in an inconsistent state.
Example 52: Database Constraints and Error Handling
Handle database constraint violations (unique, foreign key, etc.) gracefully in changesets.
%% Constraint violation handling
graph TD
A[Insert user] --> B[Database]
B --> C{Constraint violated?}
C -->|UNIQUE email| D[unique_constraint catches]
C -->|FK organization_id| E[assoc_constraint catches]
C -->|No violation| F[Insert succeeds]
D --> G[Return changeset with error]
E --> G
F --> H["Return {:ok, user}"]
style A fill:#0173B2,color:#fff
style C fill:#DE8F05,color:#fff
style F fill:#029E73,color:#fff
style G fill:#CA9161,color:#fff
defmodule MyApp.Accounts.User do
  # User schema: maps the Elixir struct to the "users" table.
  use Ecto.Schema          # required for the schema/2 macro (was missing)
  import Ecto.Changeset    # brings cast/3, unique_constraint/2, assoc_constraint/2 into scope

  schema "users" do
    field :email, :string       # VARCHAR column, accessible as user.email
    field :username, :string    # VARCHAR column, accessible as user.username
    # assoc_constraint(:organization) below requires this association to be
    # declared; belongs_to also maps the :organization_id FK column.
    # TODO confirm the Organization module's actual name/namespace.
    belongs_to :organization, MyApp.Organizations.Organization
  end

  @doc """
  Builds a changeset for user registration.

  Constraints are lazy: they do not query the database when the changeset is
  built. They only translate database-level violations into changeset errors
  when `Repo.insert/2` or `Repo.update/2` actually runs.
  """
  def registration_changeset(user, attrs) do
    user
    |> cast(attrs, [:email, :username])
    # UNIQUE index violation on email -> {:email, {"has already been taken", ...}}
    |> unique_constraint(:email)
    # UNIQUE index violation on username -> same friendly error instead of a raised
    # "duplicate key value violates unique constraint" Postgres error
    |> unique_constraint(:username)
    # FK violation on organization_id -> {:organization, {"does not exist", ...}}
    |> assoc_constraint(:organization)
  end
end
# In your service
defmodule MyApp.Accounts do
  # Accounts context: user registration and management.

  @doc """
  Registers a new user from `attrs`.

  Returns `{:ok, user}` when the INSERT succeeds, or `{:error, message}`
  with a user-friendly message when a validation or database constraint
  fails.
  """
  def register_user(attrs) do
    case %User{}
         |> User.registration_changeset(attrs)
         |> Repo.insert() do
      {:ok, user} ->
        # INSERT succeeded; user now carries the database-assigned id.
        {:ok, user}

      {:error, %Changeset{} = changeset} ->
        # changeset.errors is a keyword list of {field, {message, meta}}.
        # Keyword.has_key?/2 checks for an :email error directly, avoiding
        # the intermediate list the original built with Enum.map + Enum.any?.
        if Keyword.has_key?(changeset.errors, :email) do
          {:error, "Email already registered"}
        else
          # Any other failure (duplicate username, FK violation, ...).
          # Inspect changeset.errors for specifics if needed.
          {:error, "Registration failed"}
        end
    end
  end
end # => End MyApp.Accounts module

Key Takeaway: unique_constraint/2 and assoc_constraint/2 translate database constraint violations into user-friendly changeset errors, keeping SQL details out of your API responses.
Why It Matters: Database constraints enforced via changesets catch integrity errors and translate them into user-friendly messages. unique_constraint/2 and assoc_constraint/2 convert database-level violations into changeset errors, so users see "Email already taken" instead of a 500 error. This dual-layer approach—application validation for fast feedback and database constraints for absolute correctness—prevents race conditions where two concurrent requests both pass validation but only one satisfies the unique index.
Example 53: Polymorphic Associations with many_to_many :through
Model flexible relationships where the same entity can have many different types of related entities.
%% Many-to-many through join table
erDiagram
POST ||--o{ POST_TAG : has
TAG ||--o{ POST_TAG : has
COMMENT ||--o{ COMMENT_TAG : has
TAG ||--o{ COMMENT_TAG : has
POST {
int id PK
string title
}
TAG {
int id PK
string name
}
POST_TAG {
int post_id FK
int tag_id FK
}
COMMENT {
int id PK
string body
}
COMMENT_TAG {
int comment_id FK
int tag_id FK
}
defmodule MyApp.Content.Post do # => Defines Post schema module
# => Maps to "posts" database table
# => NOTE(review): example presumes `use Ecto.Schema` in scope — required for schema/2
schema "posts" do # => Defines schema for "posts" table
# => Auto-generates the :id primary key (inserted_at/updated_at require the timestamps() macro)
field :title, :string # => Declares title field, type :string
# => Will be accessible as post.title
many_to_many(:tags, MyApp.Tagging.Tag, join_through: "post_tags")
# => Declares many-to-many relationship with Tag
# => join_through: "post_tags" - uses post_tags join table
# => Post can have many Tags, Tag can have many Posts
# => Accessible as post.tags (list of Tag structs, after preload)
many_to_many(:attachments, MyApp.Attachments.Attachment, join_through: "post_attachments")
# => Second many-to-many relationship with Attachment
# => Uses post_attachments join table
# => Demonstrates polymorphic-like pattern (Post has Tags AND Attachments)
end # => End schema definition
end # => End MyApp.Content.Post module
defmodule MyApp.Content.Comment do # => Defines Comment schema module
# => Maps to "comments" database table
# => NOTE(review): example presumes `use Ecto.Schema` in scope — required for schema/2
schema "comments" do # => Defines schema for "comments" table
# => Auto-generates the :id primary key (inserted_at/updated_at require the timestamps() macro)
field :body, :string # => Declares body field for comment text
# => Will be accessible as comment.body
many_to_many(:tags, MyApp.Tagging.Tag, join_through: "comment_tags")
# => Declares many-to-many relationship with SAME Tag module
# => Uses comment_tags join table (different from post_tags)
# => Comment can have many Tags, Tag can belong to Posts AND Comments
# => Shared-tag pattern: one Tag table serves multiple entities via separate join tables
end # => End schema definition
end # => End MyApp.Content.Comment module
# Migration for join table
def change do # => Migration function to create join table
# => Runs when executing mix ecto.migrate
# => NOTE(review): must live in a module with `use Ecto.Migration` for create/add/timestamps
create table(:post_tags) do # => Creates post_tags join table
# => Auto-adds the id primary key; timestamps come from the explicit timestamps() call below
add :post_id, references(:posts, on_delete: :delete_all)
# => Adds post_id foreign key column
# => references(:posts) - points to posts.id
# => on_delete: :delete_all - when post deleted, delete all post_tags rows
# => Ensures referential integrity (CASCADE DELETE)
add :tag_id, references(:tags, on_delete: :delete_all)
# => Adds tag_id foreign key column
# => references(:tags) - points to tags.id
# => on_delete: :delete_all - when tag deleted, delete all post_tags rows
# => Prevents orphaned join records
timestamps() # => Adds inserted_at and updated_at timestamp columns
# => Tracks when join record was created/updated
end # => End table creation
create unique_index(:post_tags, [:post_id, :tag_id])
# => Creates unique composite index on (post_id, tag_id)
# => Prevents duplicate Post-Tag associations
# => E.g., can't link same Post to same Tag twice
# => Also improves query performance for lookups
end # => End migration function
# Usage
# NOTE(review): Repo.preload/2 operates on loaded structs, not on the schema
# module itself — the original piped the bare `Post` module in, which fails at
# runtime. Fetch a concrete record first. (`post_id` and `tags` are
# placeholders bound earlier in a real caller.)
post =
  Post
  |> Repo.get!(post_id)       # => Loads one post (raises Ecto.NoResultsError if missing)
  |> Repo.preload(:tags)      # => Loads current tags: SELECT tags.* ... JOIN post_tags ON ...
  |> Ecto.Changeset.change()  # => Changeset over the loaded post, no field changes yet
  |> put_assoc(:tags, tags)   # => Replaces the whole tags list with `tags`
  # assumes the :tags assoc is configured so replaced entries are dropped
  # (on_replace) — TODO confirm against the schema definition
  |> Repo.update()            # => DELETEs old post_tags rows, INSERTs the new ones
# => Returns {:ok, updated_post} with the new associations (or {:error, changeset})
# Query posts with specific tag
# NOTE(review): the Tag entity above defines a `name` field (not `slug`), and a
# bare `from` expression only *builds* a query — Repo.all/1 must run it for
# `posts` to actually hold Post structs, as the comments claim.
posts =
  Repo.all(
    from p in Post,                  # => p is the alias for the posts table
      join: t in assoc(p, :tags),    # => JOIN post_tags ON post_tags.post_id = p.id
                                     # => JOIN tags ON tags.id = post_tags.tag_id
      where: t.name == "featured"    # => WHERE tags.name = 'featured'
  )
# => Only posts carrying the "featured" tag
# => Result: list of Post structs [%Post{...}, ...]

Key Takeaway: many_to_many/3 with a join table creates flexible relationships; put_assoc/3 replaces associated records atomically and join in queries enables filtering across relationship boundaries.
Why It Matters: Polymorphic associations (taggable_type/taggable_id) let a single tags table serve multiple parent models without duplicating tables. The pattern comes with query complexity — you must always filter by both taggable_type and taggable_id to avoid cross-model contamination. In Ecto, this requires custom join conditions since the ORM cannot infer the relationship automatically. Understanding when polymorphic associations are justified versus when separate join tables are cleaner is a key database design decision.
Example 54: Multi-Tenancy with Ecto Query Prefix
Isolate tenant data at the query level. Each query automatically scopes to tenant.
defmodule MyApp.Accounts do
  # Tenant-scoped user operations: every read filters by tenant_id and every
  # write stamps tenant_id, so data never crosses tenant boundaries.

  @doc """
  Fetches a single user, scoped to a tenant.

  Returns the `%User{}` or `nil` when the id does not exist — or exists but
  belongs to a different tenant (both cases look identical to the caller,
  which is the point: no cross-tenant leakage).
  """
  def get_user(user_id, tenant_id) do
    # Two where: clauses AND together: WHERE users.id = ? AND users.tenant_id = ?
    scoped_query =
      from u in User,
        where: u.id == ^user_id,
        where: u.tenant_id == ^tenant_id

    Repo.one(scoped_query)
  end

  @doc """
  Creates a user inside the given tenant.

  The tenant_id is injected server-side before validation, so a client can
  never choose (or spoof) its tenant. Returns `{:ok, user}` or
  `{:error, changeset}`.
  """
  def create_user(attrs, tenant_id) do
    # E.g., %{"name" => "Alice"} becomes %{"name" => "Alice", "tenant_id" => 123}
    scoped_attrs = Map.put(attrs, "tenant_id", tenant_id)

    %User{}
    |> User.changeset(scoped_attrs)
    |> Repo.insert()
  end
end
# Or use query prefix for schema-per-tenant
defmodule MyApp.Repo do
  # Schema-per-tenant isolation: each tenant lives in its own PostgreSQL
  # schema (tenant_123, tenant_456, ...).
  #
  # NOTE(review): the original called
  # `Repo.put_dynamic_repo({__MODULE__, {tenant_id}})`, but put_dynamic_repo/1
  # expects a repo name or pid and switches the *repo instance*, not the query
  # prefix. For prefix-based tenancy, remember the tenant's schema for the
  # current process and feed it to queries via the :prefix option (e.g. from
  # Ecto.Repo's default_options/1 callback).

  @doc "Pins the current process to a tenant's schema prefix. Returns :ok."
  def for_tenant(tenant_id) do
    # E.g., tenant_id=123 -> subsequent queries should target "tenant_123"
    Process.put(:tenant_prefix, "tenant_#{tenant_id}")
    :ok
  end

  @doc """
  Returns the prefix stored by `for_tenant/1` (or nil when unset).

  Wire it into Ecto via the repo's default_options callback, e.g.:
      def default_options(_operation), do: [prefix: current_tenant_prefix()]
  """
  def current_tenant_prefix do
    Process.get(:tenant_prefix)
  end
end
# Query scoped to tenant
User # => Starts with User schema
|> where([u], u.tenant_id == ^tenant_id) # => Filters by tenant_id column
# => WHERE users.tenant_id = ?
# => Ensures only this tenant's users returned
# => NOTE(review): bare where/3 requires `import Ecto.Query` in scope — confirm
|> Repo.all() # => Executes query, returns list of users
# => Result: [%User{tenant_id: 123}, %User{tenant_id: 123}]
# => Never returns users from other tenants
# Or with dynamic query prefix
User # => Starts with User schema
|> Repo.all(prefix: "tenant_#{tenant_id}") # => Runs the query against a tenant-specific PostgreSQL schema via the :prefix option
# => E.g., SELECT * FROM tenant_123.users
# => Complete schema isolation (tenant_123, tenant_456, etc.)
# => Each tenant has independent tables

Key Takeaway: Always filter queries by tenant_id using composable scope functions to prevent data leaks between tenants; for stronger isolation, use separate database schemas per tenant.
Why It Matters: PostgreSQL schema-based multi-tenancy isolates each tenant's data into a separate database schema, providing hard boundaries that row-level tenant_id filtering cannot guarantee. Ecto's prefix: option routes every query to the correct schema without application-layer branching. This approach simplifies backup and restore per tenant and enables compliance requirements that demand physical data isolation, though it increases migration complexity since schema changes must be applied across all tenant schemas.
Example 55: PostgreSQL Advanced Features in Ecto
Leverage PostgreSQL-specific features: JSONB, arrays, full-text search, custom types.
defmodule MyApp.Blog.Post do # => Defines Post schema with PostgreSQL features
# => Demonstrates JSONB, arrays, full-text search
# => NOTE(review): example presumes `use Ecto.Schema` in scope — required for schema/2
schema "posts" do # => Maps to "posts" table
# => Auto-generates the :id primary key (inserted_at/updated_at come from the migration's timestamps())
field :title, :string # => Standard string field (VARCHAR)
# => Will be accessible as post.title
field :metadata, :map # => JSONB in PostgreSQL
# => Stores arbitrary JSON data efficiently
# => E.g., %{"status" => "draft", "tags" => ["elixir"]}
field :tags, {:array, :string} # => Array type
# => PostgreSQL native array: text[]
# => E.g., ["elixir", "phoenix", "web"]
field :search_vector, :string # => Full-text search
# => Column is tsvector in the database; mapping it as :string is a read-side
# => convenience — presumably maintained by a DB trigger rather than written
# => from Elixir. TODO confirm; writing a plain string to tsvector will fail.
end # => End schema definition
end # => End MyApp.Blog.Post module
# Migration
def change do # => Migration function to create posts table
# => Runs when executing mix ecto.migrate
# => NOTE(review): must live in a module with `use Ecto.Migration` for create/add/timestamps
create table(:posts) do # => Creates posts table
# => Auto-adds id primary key
add :title, :string # => Adds title column (VARCHAR)
# => Standard text field
add :metadata, :jsonb, default: "{}" # => Adds metadata column (JSONB type)
# => Default: empty JSON object {}
# => Stores structured data without predefined schema
add :tags, {:array, :string}, default: [] # => Adds tags column (text[] array)
# => Default: empty array []
# => Native PostgreSQL array, not JSON
add :search_vector, :tsvector # => Adds search_vector column (tsvector type)
# => Optimized for full-text search
# => Stores preprocessed, indexed text
timestamps() # => Adds inserted_at, updated_at columns
# => Auto-managed by Ecto
end # => End table creation
# GIN index for JSONB performance
create index(:posts, ["(metadata)"], using: :gin) # => Creates GIN index on metadata column
# => String entries are raw SQL expressions, so this emits USING gin ((metadata));
# => GIN (Generalized Inverted Index) enables fast queries on JSON keys/values
# => E.g., WHERE metadata->>'status' = 'published'
# GIN index for full-text search
create index(:posts, [:search_vector], using: :gin) # => Creates GIN index on search_vector
# => Enables fast full-text search
# => Required for efficient @@ operator queries
end # => End migration function
# Query JSONB
# NOTE(review): a bare `from` expression only *builds* a query; the originals
# bound unexecuted queries to `posts`/`results` while the comments claimed
# actual rows. Repo.all/1 executes them.

# Query JSONB: ->> extracts a JSON field as text
posts =
  Repo.all(
    from p in Post,
      where: fragment("? ->> ? = ?", p.metadata, "status", "published")
  )
# => Posts whose metadata JSON has "status": "published"

# Array operations: @> is PostgreSQL's array-contains operator
posts =
  Repo.all(
    from p in Post,
      # E.g., ["elixir", "phoenix"] @> ["elixir"] is true
      where: fragment("? @> ?", p.tags, ^["elixir"])
  )
# => Posts tagged with "elixir"

# Full-text search: tsvector @@ tsquery (search_term bound by the caller)
results =
  Repo.all(
    from p in Post,
      where:
        fragment(
          "to_tsvector('english', ?) @@ plainto_tsquery('english', ?)",
          p.title,
          ^search_term
        ),
      select: p
  )
# => Posts whose title matches the words in search_term
# => Result: posts matching search term

Key Takeaway: Use :map for JSONB and {:array, :string} for arrays; leverage tsvector for full-text search and fragment/2 for database-specific SQL, always with indexes for performance.
Why It Matters: PostgreSQL's JSONB, array, and full-text search types, accessed through Ecto fragments, let you handle semi-structured data without a separate NoSQL database. tsvector full-text search is significantly faster than LIKE queries and supports ranked results. Using native PostgreSQL types avoids application-layer parsing and keeps complex queries close to the data.
Group 10: Performance
Example 56: Query Optimization - N+1 Prevention
Load related data efficiently to avoid N+1 queries where one query results in N additional queries.
%% N+1 problem vs preload solution
graph TD
A[❌ N+1 Problem] --> B[Query 1: SELECT * FROM posts]
B --> C[Query 2: SELECT * FROM authors WHERE id = 1]
B --> D[Query 3: SELECT * FROM authors WHERE id = 2]
B --> E[Query 4: SELECT * FROM authors WHERE id = 3]
B --> F[... N queries]
G[✅ Preload Solution] --> H[Query 1: SELECT * FROM posts]
H --> I[Query 2: SELECT * FROM authors WHERE id IN]
I --> J[Total: 2 queries]
style A fill:#CA9161,color:#fff
style G fill:#029E73,color:#fff
style J fill:#029E73,color:#fff
# ❌ N+1 Problem - 1 query + N queries (intentionally bad example)
posts = Post |> Repo.all() # => SELECT * FROM posts (1 query)
for post <- posts do
  # One extra SELECT per post — 100 posts means 101 queries total!
  _author = Author |> where([a], a.id == ^post.author_id) |> Repo.one()
end
# Total: 1 + N queries (if 100 posts = 101 queries!)

# ✅ Solution 1: Preload — 2 queries total
# (SELECT posts; then SELECT authors WHERE id IN (...))
posts =
  Post
  |> preload(:author)
  |> Repo.all()

# ✅ Solution 2: Join (for aggregations) — 1 query
# NOTE(review): the original never executed this query despite binding it to
# `posts`; Repo.all/1 is what actually runs the JOIN.
posts =
  Repo.all(
    from p in Post,
      join: a in assoc(p, :author),  # => SQL JOIN authors ON ...
      select: {p, a}                 # => Returns {post, author} tuples
  )

# ✅ Solution 3: Nested preloads — 3 queries (posts, comments, comment authors)
posts =
  Post
  |> preload(comments: :author)
  |> Repo.all()

# Inspect the execution plan of the query you actually run
# (the original explained the bare `Post` schema, not the preload query)
query = from p in Post, preload: :author
results = Repo.all(query)
IO.inspect(Repo.explain(:all, query)) # => EXPLAIN output: index usage, scans, row estimates
# => Shows index usage, seq scans, row estimates

Key Takeaway: Eager-load associations with preload/1 to prevent N+1 queries, use join for aggregations, and always verify complex queries with EXPLAIN ANALYZE before deploying.
Why It Matters: N+1 queries grow exponentially: fetching 100 posts then loading comments for each triggers 101 SELECT statements instead of 2. Repo.preload/2 collapses these into a single IN-clause query, reducing database round trips regardless of result set size. Dataloader batches nested resolver calls in GraphQL APIs. Adding telemetry to track query counts per request lets you catch N+1 regressions in CI before they reach production and degrade response times under real load.
Example 57: Caching Strategies
Cache expensive operations to reduce database load and improve response time.
defmodule MyApp.CacheServer do
  @moduledoc """
  Minimal in-memory key/value cache with per-entry TTL, backed by a GenServer.

  Entries are stored in the server's state map as `{value, expires_at}`
  tuples. Reads are synchronous (`call`); writes are fire-and-forget (`cast`).

  Fix over the original: expired entries are now evicted when a read finds
  them stale. Previously they stayed in the map forever — an unbounded
  memory leak for churning key sets. (A periodic sweep would also be needed
  for keys that are never read again.)
  """
  use GenServer

  ## Client API

  @doc "Starts the cache process, registered under the module name."
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @doc "Fetches `key`. Returns `{:ok, value}`, or `:not_found` when missing or expired."
  def get_cache(key) do
    # Synchronous: blocks the caller until the server replies.
    GenServer.call(__MODULE__, {:get, key})
  end

  @doc "Stores `value` under `key` for `ttl_ms` milliseconds. Returns :ok immediately."
  def set_cache(key, value, ttl_ms) do
    # Asynchronous cast: fire-and-forget, never blocks the caller.
    GenServer.cast(__MODULE__, {:set, key, value, ttl_ms})
  end

  ## Server callbacks

  @impl true
  def init(state) do
    # Initial state is the empty map passed from start_link/1.
    {:ok, state}
  end

  @impl true
  def handle_call({:get, key}, _from, state) do
    case Map.get(state, key) do
      {value, expires_at} ->
        if expires_at > System.monotonic_time(:millisecond) do
          # Cache hit, entry still fresh.
          {:reply, {:ok, value}, state}
        else
          # Expired: evict now so stale entries don't accumulate forever.
          {:reply, :not_found, Map.delete(state, key)}
        end

      nil ->
        # Key was never cached.
        {:reply, :not_found, state}
    end
  end

  @impl true
  def handle_cast({:set, key, value, ttl_ms}, state) do
    # Monotonic time is immune to wall-clock adjustments (NTP, DST).
    expires_at = System.monotonic_time(:millisecond) + ttl_ms
    {:noreply, Map.put(state, key, {value, expires_at})}
  end
end
# Or use Cachex library
defmodule MyApp.Blog do # => Example using Cachex library
# => Production-ready caching solution
def get_popular_posts do # => Public function: get_popular_posts/0
# => Caches expensive database query
# => NOTE(review): read-then-write is racy — concurrent misses all hit the DB
# => (cache stampede); Cachex.fetch/4 computes-and-stores atomically instead
case Cachex.get(:blog_cache, "popular_posts") do # => Checks cache first
# => :blog_cache is cache name (configured in app)
# => "popular_posts" is cache key
{:ok, nil} -> # => Cache miss: no cached value
# => (a legitimately cached nil would be indistinguishable from a miss)
posts = Post |> where([p], p.likes > 100) |> Repo.all()
# => Expensive database query
# => SELECT * FROM posts WHERE likes > 100
# => Returns list of popular posts
Cachex.put(:blog_cache, "popular_posts", posts, ttl: :timer.minutes(60))
# => Stores result in cache with 60-minute TTL
# => Future requests served from cache
# => TTL: cache expires after 1 hour
posts # => Returns posts to caller
# => First request: slow (database query)
{:ok, posts} -> # => Cache hit: value exists in cache
# => posts already fetched from cache
posts # => Returns cached posts immediately
# => Subsequent requests: fast (no database query)
end # => End case statement
end # => End get_popular_posts/0 function
end # => End MyApp.Blog module

Key Takeaway: Cache expensive queries with TTL using Cachex and invalidate entries on data changes—caching at the context layer keeps controllers clean and makes cache behavior testable.
Why It Matters: Caching eliminates repeated computation for expensive queries, but each strategy has different tradeoffs. ETS caches live in node memory — fast but lost on restart and not shared across cluster nodes. Cachex provides distributed TTL-based caching with automatic invalidation. HTTP cache headers (ETag, Cache-Control) let browsers and CDNs cache responses, reducing server load entirely for public content. Measuring cache hit rates and tracking invalidation latency are essential to knowing whether caching actually helps or masks deeper query problems.
Example 58: Background Jobs with Oban
Execute long-running tasks asynchronously. Schedule recurring jobs.
%% Oban job processing flow
graph TD
A[User Registration] --> B[Insert Oban Job]
B --> C[oban_jobs table]
C --> D[Oban Worker Pool]
D --> E[EmailWorker.perform]
E --> F{Success?}
F -->|Yes| G[Mark completed]
F -->|No| H[Retry with backoff]
H --> D
style A fill:#0173B2,color:#fff
style C fill:#DE8F05,color:#fff
style G fill:#029E73,color:#fff
style H fill:#CC78BC,color:#fff
# lib/my_app/workers/email_worker.ex
defmodule MyApp.Workers.EmailWorker do # => Background job worker module
use Oban.Worker, queue: :default # => Register as Oban worker on :default queue
# => Oban.Worker provides the perform/1 callback contract
@impl Oban.Worker
def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
# => args is the map passed to EmailWorker.new/1
# => Keys are strings: args round-trip through the database as JSON
user = MyApp.Accounts.get_user!(user_id) # => Load user; if it raises, Oban records the error and retries the job
MyApp.Mailer.send_welcome_email(user) # => Send email
:ok # => Mark job complete
end # => Return :ok for success, {:error, reason} to retry
end
# In your controller/action
defmodule MyAppWeb.UserController do # => Web controller
  use MyAppWeb, :controller # => Brings json/2 etc. into scope (was missing;
  # => cf. MyAppWeb.HealthController which does this)

  # POST /users — create the user, enqueue the welcome-email job, and
  # respond immediately without waiting for the email to be sent.
  def create(conn, %{"user" => user_params}) do
    case MyApp.Accounts.create_user(user_params) do
      {:ok, user} -> # => User created successfully
        # Queue background job. Assert the enqueue succeeded so a
        # failed insert is not silently dropped (Oban.insert/1 can
        # return {:error, changeset}).
        {:ok, _job} =
          %{"user_id" => user.id} # => Job args map (stored as JSON)
          |> MyApp.Workers.EmailWorker.new() # => Build job changeset
          |> Oban.insert() # => Persist into oban_jobs table

        json(conn, user) # => Respond immediately; email sends async

      {:error, changeset} -> # => Validation failed
        error_response(conn, changeset) # => Return 422 with errors
    end
  end
end
# Schedule recurring jobs
defmodule MyApp.Application do # => OTP Application module
  use Application # => Provides the start/2 application callback

  @impl true
  def start(_type, _args) do # => Called at application startup
    children = [
      # ... other children
      # Oban is started as a single supervised child with its config.
      # Cron scheduling is an Oban *plugin* (Oban.Plugins.Cron) inside
      # that config — there is no standalone {Oban.Cron, ...} child.
      {Oban,
       repo: MyApp.Repo, # => Jobs are persisted through this Ecto repo
       queues: [default: 10], # => queue name => max concurrent jobs
       plugins: [
         # Schedule daily cleanup at 2:00 AM UTC
         {Oban.Plugins.Cron,
          crontab: [
            {"0 2 * * *", MyApp.Workers.CleanupWorker} # => CleanupWorker.perform/1 runs on schedule
          ]}
       ]}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
    # => Start supervision tree with Oban
  end
end
Key Takeaway: Oban workers implement Oban.Worker with a perform/1 callback; jobs are enqueued asynchronously with automatic retries and cron scheduling backed by PostgreSQL for durability.
Why It Matters: Oban persists jobs in PostgreSQL before processing, ensuring no job is lost even if the server crashes mid-flight. Failed jobs retry with configurable exponential backoff, and dead jobs are retained in a queryable table for inspection and manual requeue. This durability is essential for critical operations like sending emails or processing payments where silent failures—jobs that ran once and disappeared—are unacceptable in production.
Example 59: Phoenix LiveDashboard for Metrics
Monitor your application in real-time with LiveDashboard. View requests, Ecto stats, processes.
# mix.exs
defp deps do # => Dependency list for LiveDashboard
# => Requires 3 libraries for monitoring
[
{:phoenix_live_dashboard, "~> 0.7"}, # => Main dashboard library
# => Provides web UI for metrics/monitoring
# => Version ~> 0.7 (compatible with 0.7.x)
# => NOTE(review): newer 0.8.x releases exist — bump alongside Phoenix upgrades
{:telemetry_metrics, "~> 0.6"}, # => Metrics aggregation library
# => Collects and processes telemetry events
# => Required by LiveDashboard
{:telemetry_poller, "~> 1.0"} # => Periodic metric collection
# => Polls VM metrics (memory, processes, etc.)
# => Runs collection tasks on interval
]
end
# lib/my_app_web/telemetry.ex
defmodule MyAppWeb.Telemetry do # => Telemetry supervisor module
  # => Owns the periodic metric-collection process
  use Supervisor # => Supervisor behaviour
  import Telemetry.Metrics # => summary/2 etc. for metric definitions

  def start_link(arg) do # => Called by the application supervisor
    Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
    # => Registers under the module name; returns {:ok, pid}
  end

  @impl true
  def init(_arg) do # => Defines the children to supervise
    children = [
      # :telemetry_poller accepts :measurements (a list of MFAs to run)
      # and :period — it has no :handlers option (which the original
      # passed), and LiveDashboard's listener is started by the
      # dashboard itself, not supervised here.
      {:telemetry_poller, measurements: periodic_measurements(), period: 10_000}
      # => Built-in VM measurements (memory, run queues) are emitted
      # => automatically every :period milliseconds
    ]

    Supervisor.init(children, strategy: :one_for_one)
    # => :one_for_one — restart only the child that crashed
  end

  # Metric definitions consumed by LiveDashboard. Wire them up in the
  # router with: live_dashboard "/dashboard", metrics: MyAppWeb.Telemetry
  def metrics do
    [
      # VM metrics
      summary("vm.memory.total", unit: {:byte, :kilobyte}), # => Total VM memory, shown in KB
      summary("vm.total_run_queue_lengths.total"), # => Scheduler run-queue pressure
      # Phoenix metrics
      summary("phoenix.endpoint.stop.duration", unit: {:native, :millisecond}),
      # => Request duration per endpoint dispatch
      # Ecto metrics
      summary("my_app.repo.query.total_time", unit: {:native, :millisecond})
      # => Total query time emitted by MyApp.Repo's telemetry prefix
    ]
  end

  # Custom periodic measurements: {module, function, args} triples the
  # poller executes on every tick (none yet).
  defp periodic_measurements do
    []
  end
end # => End MyAppWeb.Telemetry module
# router.ex
import Phoenix.LiveDashboard.Router # => Imports LiveDashboard routing macros
# => Provides live_dashboard/2 macro
scope "/" do # => Defines root scope
# => Routes accessible from /
pipe_through :browser # => Uses browser pipeline
# => Applies session, CSRF, cookies
live_dashboard "/dashboard" # => Mounts LiveDashboard at /dashboard route
# => Creates LiveView route
# => Access at http://localhost:4000/dashboard
# => Shows metrics, processes, Ecto stats in UI
# => NOTE(review): pass metrics: MyAppWeb.Telemetry (a module exposing
# => metrics/0) to populate the Metrics page, and restrict this route
# => (auth pipeline or dev-only) before deploying to production
end # => End scope
Key Takeaway: Phoenix LiveDashboard at /dashboard provides real-time visibility into request performance, database pool health, memory usage, and running processes with zero additional infrastructure.
Why It Matters: LiveDashboard provides real-time visibility into your running application without any external monitoring infrastructure. You can see slow Ecto queries, process memory growth, and request latency in a single dashboard, enabling rapid diagnosis of production incidents. All metrics are generated from Telemetry instrumentation already built into Phoenix and Ecto—no additional instrumentation code required to get immediate production observability from day one.
Example 60: Custom Metrics with Telemetry
Emit custom metrics to track business logic and application behavior.
defmodule MyApp.Blog do # => Blog context module
  # Creates a blog post and, on success, emits a
  # [:blog, :post, :created] telemetry event carrying the insert
  # duration plus identifying metadata.
  def create_post(attrs) do
    started_at = System.monotonic_time() # => Native time units; only used for the duration delta

    insert_result =
      %Post{}
      |> Post.changeset(attrs) # => Validate post attributes
      |> Repo.insert() # => Persist to database

    case insert_result do
      {:ok, post} ->
        emit_post_created(post, attrs, started_at)
        {:ok, post} # => Return success with the inserted post

      {:error, changeset} ->
        {:error, changeset} # => No event on failure (validation/DB error)
    end
  end

  # Fires the telemetry event: measurements hold numeric metrics
  # (duration in native units), metadata holds context available to
  # every attached handler.
  defp emit_post_created(post, attrs, started_at) do
    :telemetry.execute(
      [:blog, :post, :created], # => Event name must match handler attachments
      %{duration: System.monotonic_time() - started_at},
      %{post_id: post.id, user_id: attrs["user_id"]}
    )
  end
end
# Listen to events
defmodule MyApp.TelemetryHandler do # => Centralized telemetry handler module
  @handler_id "blog-post-created" # => Must be unique per attachment
  @event_name [:blog, :post, :created] # => Must match :telemetry.execute/3 exactly

  # Attach all handlers once at application startup, before any
  # events fire.
  def attach_handlers do
    :telemetry.attach(
      @handler_id,
      @event_name,
      &__MODULE__.handle_post_created/4, # => 4-arity handler function
      nil # => No handler config
    )
  end

  # Invoked for every [:blog, :post, :created] event.
  # measurements: %{duration: ...}; metadata: %{post_id: ..., user_id: ...}
  def handle_post_created(_event, measurements, metadata, _config) do
    IO.inspect({measurements, metadata}) # => Replace with StatsD/Prometheus/logging, etc.
  end
end
# In your application startup
MyApp.TelemetryHandler.attach_handlers() # => Register all handlers when app starts
# => Must be called before events fire
Key Takeaway: Emit custom metrics with :telemetry.execute/3 and attach handlers with :telemetry.attach/4 to route events into StatsD, Prometheus, or any observability backend.
Why It Matters: Custom business metrics via :telemetry.execute/3 instrument your application domain — post creation rates, checkout durations, user signup flows — not just infrastructure. Decoupling metric emission from metric handling via event names means you can attach multiple handlers (logging, StatsD, Prometheus) without touching the business logic code.
Group 11: Production Deployment
Example 61: Mix Releases for Production
Build self-contained release that runs without Elixir/Erlang installed. Configure runtime settings.
# mix.exs
def project do # => Project configuration function
# => Defines app metadata and release settings
[
app: :my_app, # => Application name (atom)
# => Used for release binary name
version: "0.1.0", # => Current version string
# => Follows semantic versioning
releases: [ # => Release configuration (keyword list, not a map)
# => Defines production release settings
prod: [ # => Release name: "prod"
# => Can have multiple releases (prod, staging, etc.)
include_executables_for: [:unix], # => Include Unix executables (start, stop scripts)
# => Targets Linux/macOS platforms
# => Excludes Windows executables
steps: [:assemble, :tar] # => Release build steps
# => :assemble - builds release structure
# => :tar - creates .tar.gz archive for deployment
]
]
]
end
# config/runtime.exs - Loaded at runtime, not compile time
import Config # => Imports Config macros
# => Enables config/2 function
config :my_app, MyAppWeb.Endpoint, # => Configures Phoenix Endpoint
# => Settings applied when release starts
# => NOTE(review): runtime.exs runs in EVERY env — the fetch_env!
# => below will crash dev/test unless SECRET_KEY_BASE is set; this
# => Endpoint block usually belongs inside the :prod guard below
http: [ip: {0, 0, 0, 0}, port: String.to_integer(System.get_env("PORT") || "4000")],
# => HTTP server configuration
# => Bind to all interfaces (0.0.0.0) for Docker
# => ip: {0,0,0,0} accepts connections from any IP
# => port: reads PORT env var or defaults to 4000
# => E.g., PORT=8080 → server listens on 8080
secret_key_base: System.fetch_env!("SECRET_KEY_BASE"), # => Read from env var
# => Fetches SECRET_KEY_BASE environment variable
# => Crashes if not set (fetch_env! requires it)
# => Used for session encryption
url: [host: System.get_env("PHX_HOST", "localhost"), port: 443, scheme: "https"]
# => URL configuration for generated URLs
# => HTTPS for production
# => host: reads PHX_HOST or defaults to "localhost"
# => port: 443 (standard HTTPS port)
# => scheme: "https" (forces HTTPS URLs)
if config_env() == :prod do # => Production-only configuration
# => Only applies when MIX_ENV=prod
config :my_app, MyApp.Repo, # => Configures Ecto Repo for production
# => Database connection settings
url: System.fetch_env!("DATABASE_URL"), # => Reads DATABASE_URL from environment
# => E.g., "ecto://user:pass@host/db"
# => Crashes if DATABASE_URL not set
ssl: true, # => Enables SSL for database connection
# => Required for most cloud databases
socket_options: [:inet6] # => Enables IPv6 support
# => Allows IPv6 database connections
end # => End production config block
# Build release
# mix release # => Command: builds default release
# => Creates _build/prod/rel/my_app/ directory
# => Packages app, dependencies, ERTS runtime
# Or with custom name
# mix release prod # => Command: builds "prod" named release
# => Uses prod release configuration from mix.exs
# => Creates tarball for deployment
# Run release
# _build/prod/rel/my_app/bin/my_app start # => Command: starts release as daemon
# => Runs in background
# => Logs to _build/prod/rel/my_app/log/
# Start in foreground
# _build/prod/rel/my_app/bin/my_app foreground # => Command: starts release in foreground
# => Logs to stdout/stderr
# => Useful for Docker containers
# => Ctrl+C stops the application
Key Takeaway: mix release builds a self-contained package with config/runtime.exs loading secrets from environment variables at startup—no Elixir installation required on the production server.
Why It Matters: Runtime configuration via config/runtime.exs enables the same release artifact to run in staging, production, and disaster recovery with different credentials and endpoints—without rebuilding. mix release creates a self-contained package that includes the Erlang runtime, so deployment targets don't need Elixir installed. This makes releases predictable, reproducible, and safe to promote from staging to production with no build-time differences.
Example 62: Docker Containerization with Multi-Stage Build
Build optimized Docker image with minimal size and security.
%% Docker multi-stage build
graph TD
A[Stage 1: Builder] --> B[elixir:1.14-alpine]
B --> C[Install build deps]
C --> D[Copy source code]
D --> E[mix deps.get]
E --> F[MIX_ENV=prod mix release]
G[Stage 2: Runtime] --> H[alpine:latest]
H --> I[Install runtime deps]
I --> J[Copy release from builder]
J --> K[Final image 50MB]
style A fill:#0173B2,color:#fff
style F fill:#DE8F05,color:#fff
style G fill:#029E73,color:#fff
style K fill:#029E73,color:#fff
# Dockerfile - Multi-stage build
FROM elixir:1.14-alpine AS builder
# Stage 1 (builder): full Elixir toolchain on Alpine.
# "AS builder" names the stage so the runtime stage can copy from it.
WORKDIR /app
# Build-time packages: compilers for native deps (build-base) and
# git for git-sourced dependencies. --no-cache keeps the layer small.
RUN apk add --no-cache build-base git
# Copy dependency manifests first so the deps layers stay cached
# until mix.exs / mix.lock actually change.
COPY mix.exs mix.lock ./
RUN mix local.hex --force && mix local.rebar --force
RUN mix deps.get
# Copy application source AFTER deps.get for better layer caching:
# code changes no longer invalidate the dependency layers.
COPY . .
# Build the self-contained production release
# (output: _build/prod/rel/my_app/).
RUN MIX_ENV=prod mix release

# Stage 2 (runtime): minimal Alpine, no Elixir and no build tools.
# NOTE(review): prefer pinning a version (e.g. alpine:3.18) that
# matches the builder's Alpine so shared libraries stay ABI-compatible.
FROM alpine:latest
WORKDIR /app
# Runtime shared libraries required by the bundled ERTS: the original
# installed only openssl, but a release built on elixir:*-alpine also
# needs libstdc++/libgcc and ncurses-libs to start on a bare Alpine.
RUN apk add --no-cache libstdc++ libgcc ncurses-libs openssl
# Copy only the compiled release from the builder stage — this is
# what keeps the final image ~50MB instead of 500MB+.
COPY --from=builder /app/_build/prod/rel/my_app ./
# Documents the listening port (actual publishing via `docker run -p`).
EXPOSE 4000
# Health check: every 10s, 3s timeout, 40s startup grace, 3 retries.
# The CMD is part of the HEALTHCHECK via the line continuation;
# MyApp.ready? should return true once the app can serve traffic.
HEALTHCHECK --interval=10s --timeout=3s --start-period=40s --retries=3 \
  CMD ["./bin/my_app", "eval", "MyApp.ready?"]
# Default command: run the release in the foreground (required for Docker).
CMD ["./bin/my_app", "start"]
Key Takeaway: Multi-stage Docker builds compile the release in a builder stage and copy only the result to a minimal Alpine runtime image, keeping the production image small and fast to pull.
Why It Matters: Multi-stage Docker builds reduce image size by 60-80% compared to single-stage builds by excluding the Elixir compiler, Mix toolchain, and intermediate build artifacts from the runtime image. Smaller images pull faster during Kubernetes rolling deployments and reduce the attack surface for security vulnerabilities. Alpine Linux as the runtime base layer ensures minimal footprint while remaining compatible with most Erlang runtime requirements.
Example 63: Health Checks for Kubernetes
Implement liveness and readiness endpoints for orchestration systems to manage pod lifecycle.
%% Health check flow in Kubernetes
graph TD
A[Kubernetes] --> B{Liveness Probe}
B -->|GET /health/live| C[Health endpoint]
C --> D{App alive?}
D -->|Yes 200| E[Keep running]
D -->|No 5xx| F[Restart pod]
A --> G{Readiness Probe}
G -->|GET /health/ready| H[Health endpoint]
H --> I{DB connected?}
I -->|Yes 200| J[Send traffic]
I -->|No 503| K[Remove from load balancer]
style A fill:#0173B2,color:#fff
style E fill:#029E73,color:#fff
style F fill:#CA9161,color:#fff
style J fill:#029E73,color:#fff
style K fill:#CC78BC,color:#fff
# lib/my_app_web/controllers/health_controller.ex
defmodule MyAppWeb.HealthController do # => Controller for health check endpoints
  use MyAppWeb, :controller # => Phoenix controller

  # GET /health/ready — Kubernetes readiness probe.
  # 200 when the app can serve traffic, 503 otherwise (Kubernetes
  # then removes the pod from the load balancer without restarting it).
  def readiness(conn, _params) do
    case check_database() do
      :ok ->
        json(conn, %{status: "ok"}) # => 200 OK, ready

      :error ->
        conn
        |> put_status(:service_unavailable) # => 503 Service Unavailable
        |> json(%{status: "error", reason: "database_unavailable"})
    end
  end # => Polled every few seconds per the probe config

  # GET /health/live — Kubernetes liveness probe.
  # Always 200 while the VM can serve requests; if this stops
  # responding, Kubernetes restarts the pod.
  def liveness(conn, _params) do
    json(conn, %{status: "ok"})
  end

  # Minimal DB connectivity test ("SELECT 1").
  defp check_database do
    case Ecto.Adapters.SQL.query(MyApp.Repo, "SELECT 1", []) do
      {:ok, _} -> :ok # => Database responsive
      {:error, _} -> :error # => Query failed
    end
  rescue
    # query/4 RAISES DBConnection.ConnectionError when no connection
    # is available (pool exhausted / DB down) — without this rescue
    # the readiness probe would crash to a 500 instead of returning
    # the intended 503.
    DBConnection.ConnectionError -> :error
  end
end
# router.ex
scope "/health", MyAppWeb do # => Health check routes (no authentication)
# => NOTE(review): no pipe_through here — probes intentionally skip the
# => browser/api plugs (no session/CSRF); keep them unauthenticated
get "/ready", HealthController, :readiness # => Readiness probe: check dependencies
get "/live", HealthController, :liveness # => Liveness probe: basic alive check
end
# Kubernetes deployment yaml
# livenessProbe:
# httpGet:
# path: /health/live
# port: 4000
# initialDelaySeconds: 30
# periodSeconds: 10
# readinessProbe:
# httpGet:
# path: /health/ready
# port: 4000
# initialDelaySeconds: 5
# periodSeconds: 5
Key Takeaway: Expose separate /health/ready and /health/live endpoints—readiness checks dependencies before accepting traffic while liveness signals whether the pod needs restarting.
Why It Matters: Kubernetes uses separate liveness and readiness endpoints to distinguish between "restart me" and "stop sending traffic". A liveness check failing restarts the pod; a readiness check failing removes it from the load balancer without restarting. During database migrations or startup initialization, a pod that is alive but not ready should return 503 on the readiness endpoint while passing liveness, preventing traffic from hitting an uninitialized application and causing request failures during rolling deployments.
Example 64: Graceful Shutdown
Handle shutdown signals gracefully, completing in-flight requests before terminating.
# lib/my_app/application.ex
defmodule MyApp.Application do # => OTP Application module
  use Application # => Provides start/2 and the optional callbacks below

  @impl true
  def start(_type, _args) do # => Called when the VM starts the application
    children = [
      MyAppWeb.Telemetry, # => Metrics reporting process
      MyApp.Repo, # => Database connection pool
      {Phoenix.PubSub, name: MyApp.PubSub}, # => PubSub for LiveView/channels
      MyAppWeb.Endpoint # => HTTP/WebSocket server
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    # => :one_for_one — restart only the failed child
    Supervisor.start_link(children, opts) # => Start all children under supervisor
  end

  @impl true
  def config_change(changed, _new, removed) do # => Hot config reload callback
    MyAppWeb.Endpoint.config_change(changed, removed)
    # => Tell endpoint about config changes
  end

  # Called before the application stops — drain in-flight work here.
  # The callback contract is prep_stop(state) :: state: whatever is
  # returned is handed to stop/1, so return the state (the original
  # returned :ok, discarding it).
  @impl true
  def prep_stop(state) do
    IO.puts("Shutting down gracefully...") # => Log shutdown
    state
  end # => Application shutdown waits for this to return
end
# In endpoint config
config :my_app, MyAppWeb.Endpoint,
# Graceful shutdown timeout (milliseconds)
# NOTE(review): exact option names are adapter-specific — Plug.Cowboy
# drains via a Plug.Cowboy.Drainer child and Bandit uses
# http: [shutdown_timeout: ...]; confirm these keys against the
# adapter's documentation before relying on them.
shutdown: 25_000, # => Wait up to 25s for connections to close
# Give existing connections time to close
drain_on_stop: true # => Complete in-flight requests before exit
Key Takeaway: prep_stop/1 provides a shutdown hook to drain in-flight requests before termination; set shutdown_timeout in mix releases to guarantee the app finishes current work gracefully.
Why It Matters: prep_stop/1 and stop/1 OTP callbacks give your application control over the shutdown sequence, ensuring in-flight requests complete before the BEAM terminates. Without graceful shutdown, Kubernetes rolling deployments can result in 502 errors as the load balancer continues routing requests to a pod that's already shutting down. Setting shutdown_timeout in your Mix release configuration prevents deployment-induced request failures and makes zero-downtime deployments reliable.
Example 65: Environment Configuration Management
Separate configuration by environment. Use runtime configuration for secrets.
# config/config.exs - Common to all environments
import Config # => Imports Config module
# => Provides config/2 and config/3 macros
config :logger, :console, # => Configures Logger console backend
# => Applies to all environments (dev, test, prod)
# => NOTE(review): $metadata only prints keys whitelisted via the
# => :metadata option (e.g. metadata: [:request_id]) — add it so the
# => structured fields logged elsewhere in this guide actually appear
format: "$time $metadata[$level] $message\n" # => Log format template
# => $time: timestamp, $level: info/warn/error
# => $message: log message, $metadata: custom data
# config/dev.exs
import Config # => Development environment config
# => Only loaded when MIX_ENV=dev
config :my_app, MyApp.Repo, # => Ecto Repo configuration for dev
# => Local PostgreSQL database
username: "postgres", # => Database username
# => Default PostgreSQL user
password: "postgres", # => Database password
# => Hardcoded OK for dev (never in prod!)
hostname: "localhost", # => Database host
# => Connects to local PostgreSQL
port: 5432, # => PostgreSQL default port
# => Standard port number
database: "my_app_dev" # => Database name
# => Separate database for development
config :my_app, MyAppWeb.Endpoint, # => Phoenix Endpoint configuration
# => Dev-specific settings for fast iteration
debug_errors: true, # => Shows detailed error pages
# => Displays stack traces in browser
# => NEVER enable in production!
check_origin: false, # => Disables WebSocket origin checking
# => Allows connections from any origin in dev
# => Required for LiveView development
watchers: [esbuild: {Esbuild, :install_and_run, []}]
# => File watchers for asset compilation
# => Auto-rebuilds JS/CSS on file changes
# => esbuild watches and compiles assets
# => NOTE(review): Phoenix generators pass a profile and args here,
# => e.g. {Esbuild, :install_and_run, [:default, ~w(--sourcemap=inline --watch)]} — confirm
# config/test.exs
import Config # => Test environment configuration
# => Only loaded when MIX_ENV=test
config :my_app, MyApp.Repo, # => Ecto Repo configuration for tests
# => Separate database for test isolation
username: "postgres", # => Database username
# => Same as dev for simplicity
password: "postgres", # => Database password
# => Same as dev for simplicity
database: "my_app_test", # => Test database name
# => Separate from dev database
pool: Ecto.Adapters.SQL.Sandbox # => Connection pool for tests
# => Sandbox: wraps each test in transaction
# => Auto-rollback after each test
# => Ensures test isolation and cleanup
# => Also enables async: true ExUnit cases, since each test checks
# => out (owns) its own sandboxed connection
# config/runtime.exs - Loaded at runtime
import Config # => Runtime configuration
# => Loaded when release starts (NOT at compile time)
# => Enables environment-specific configuration
database_url = System.get_env("DATABASE_URL") || "ecto://postgres:postgres@localhost/my_app_prod"
# => Reads DATABASE_URL from environment
# => Falls back to default for local prod
# => E.g., "ecto://user:pass@host:5432/dbname"
# => Production: set by deployment platform
# => NOTE(review): the hardcoded fallback is only safe for local smoke
# => tests; real deployments must set DATABASE_URL explicitly
config :my_app, MyApp.Repo, url: database_url # => Configures Repo with database URL
# => Parses URL into connection params
# => Overrides compile-time config
secret_key_base = System.fetch_env!("SECRET_KEY_BASE")
# => Reads SECRET_KEY_BASE from environment
# => Crashes if not set (fetch_env! requires it)
# => Used for encrypting sessions/cookies
# => Generate with: mix phx.gen.secret
# => NOTE(review): runtime.exs also runs in dev/test — guard this with
# => config_env() == :prod or export the variable locally
config :my_app, MyAppWeb.Endpoint, # => Endpoint runtime configuration
# => Settings loaded when app starts
secret_key_base: secret_key_base, # => Sets secret from environment
# => Required for session encryption
url: [host: System.get_env("PHX_HOST", "localhost"), port: 443, scheme: "https"]
# => URL configuration for generated links
# => host: reads PHX_HOST or defaults to localhost
# => port: 443 (HTTPS), scheme: "https"
# => Affects url_for/3 and route helpers
Key Takeaway: Use config/runtime.exs to read secrets from environment variables at startup—unlike compile-time config, this enables the same release artifact to be deployed across environments.
Why It Matters: Separating compile-time config (config/*.exs) from runtime config (config/runtime.exs) is critical for deployable releases. Runtime config reads environment variables when the application starts, so the same release artifact works in staging and production with different credentials. This eliminates the need to rebuild for each deployment environment.
Group 12: Resilience & Observability
Example 66: Error Tracking and Structured Logging
Send errors to Sentry. Log with context for debugging.
# mix.exs
defp deps do # => Dependencies for error tracking and structured logging
[
{:sentry, "~> 10.0"}, # => Sentry SDK: captures exceptions with request context
{:logger_backends, "~> 1.1"}, # => Pluggable Logger backends (extracted from core in Elixir 1.15+)
{:jason, "~> 1.4"} # => JSON encoder used by Sentry payloads and structured logs
]
end
# config/config.exs
config :sentry,
# NOTE(review): config.exs is evaluated at COMPILE time, so the DSN is
# frozen into the build — for releases, read SENTRY_DSN in
# config/runtime.exs instead so each environment can supply its own.
dsn: System.get_env("SENTRY_DSN"),
# => Mix.env() works here because config.exs runs under Mix at compile time
environment_name: Mix.env(),
enable_in_test: false
# lib/my_app_web/router.ex
defmodule MyAppWeb.Router do
use MyAppWeb, :router
# Sentry captures errors
# NOTE(review): Sentry v10 wires in at the ENDPOINT, not the Router —
# `use Sentry.PlugCapture` at the top of the Endpoint plus
# `plug Sentry.PlugContext` after Plug.Parsers. Sentry.PlugContext is
# a plug, not a __using__ module, so the line below likely does not
# compile as written — confirm against the Sentry setup docs.
use Sentry.PlugContext
end
# Structured logging
defmodule MyApp.Blog do # => Blog context module
  # => Demonstrates structured logging around a database write
  require Logger # => Imports Logger macros (compile-time level purging)

  # Creates a post from `attrs`, logging every outcome with structured
  # metadata so log aggregators can index the fields.
  # Returns {:ok, post}, {:error, changeset}, or {:error, "Internal error"}
  # when an unexpected exception is rescued.
  def create_post(attrs) do
    Logger.info("Creating post", user_id: attrs["user_id"], title: attrs["title"])
    # => Metadata as a keyword list (the conventional Logger form);
    # => parseable by log aggregators when whitelisted in :metadata

    # Build the changeset from attrs — the original inserted an
    # undefined `changeset` variable, which could never compile.
    changeset = MyApp.Post.changeset(%MyApp.Post{}, attrs)

    case MyApp.Repo.insert(changeset) do # => {:ok, post} | {:error, changeset}
      {:ok, post} -> # => Success: post persisted with DB-generated ID
        Logger.info("Post created successfully", post_id: post.id)
        # => post_id lets you trace this post through later log lines
        {:ok, post}

      {:error, changeset} -> # => Expected failure: validation errors
        Logger.warning("Post creation failed",
          errors: inspect(changeset.errors), # => E.g., [title: {"is required", []}]
          user_id: attrs["user_id"] # => Correlate failures to users
        )

        {:error, changeset} # => Caller can render the validation errors
    end
  rescue
    # Function-level rescue: only unexpected crashes land here, never
    # the {:error, changeset} returns above. Broad rescue is acceptable
    # at this boundary because we re-log and degrade, not swallow.
    e ->
      Logger.error("Post creation crashed",
        error: inspect(e), # => Readable exception struct
        stacktrace: Exception.format_stacktrace(__STACKTRACE__)
        # => __STACKTRACE__ is only valid inside rescue/catch
      )

      {:error, "Internal error"} # => Generic message; full details stay in logs/Sentry
  end
end # => End MyApp.Blog module
Key Takeaway: Integrate Sentry for automatic exception capture and use Logger with metadata maps for structured logs; include request IDs in all log entries to enable distributed tracing.
Why It Matters: Structured logging with consistent correlation IDs lets you trace a single request across multiple log lines in centralized aggregators like Elasticsearch or Datadog. Sentry integration captures exception context — request params, user identity, stack trace — that plain log lines miss. Combining structured logs with error tracking reduces mean time to resolution (MTTR) because you can jump directly from an error alert to the correlated request trace, rather than grepping through unstructured log files.
Example 67: Rate Limiting with Token Bucket
Prevent abuse by limiting requests per IP or user. Token bucket algorithm refills over time.
%% Rate limiting state machine
stateDiagram-v2
[*] --> CheckBucket: Request arrives
CheckBucket --> HasTokens: Tokens available
CheckBucket --> NoTokens: Bucket empty
HasTokens --> ConsumeToken: Take 1 token
ConsumeToken --> AllowRequest: Process request
NoTokens --> RejectRequest: 429 Too Many Requests
AllowRequest --> [*]
RejectRequest --> [*]
note right of CheckBucket: Bucket refills over time
note right of NoTokens: Client must wait
defmodule MyApp.RateLimiter do # => GenServer implementing fixed-window rate limiting
  use GenServer # => OTP GenServer behavior

  # Starts the limiter as a single named process shared by all callers.
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
    # => name: __MODULE__ — one global limiter (also a serialization point)
  end

  # Returns :ok when `key` still has budget in the current window,
  # :rate_limited otherwise.
  # key: e.g. IP address or user id
  # max_requests: allowed requests per window (e.g. 100)
  # time_window_ms: window length in milliseconds (e.g. 60_000)
  # NOTE(review): the ? suffix conventionally means a boolean return;
  # kept as-is because callers depend on :ok / :rate_limited.
  def allow_request?(key, max_requests, time_window_ms) do
    GenServer.call(__MODULE__, {:check, key, max_requests, time_window_ms})
    # => Synchronous call; blocks until the server replies
  end

  @impl true
  def init(_opts) do
    # State: %{key => {count, window_reset_time}}.
    # NOTE(review): entries are never pruned, so memory grows with the
    # number of distinct keys — consider periodic cleanup.
    {:ok, %{}}
  end

  @impl true
  def handle_call({:check, key, max_requests, time_window_ms}, _from, state) do
    now = System.monotonic_time(:millisecond)
    # => Monotonic clock never jumps backward (unlike wall-clock time)

    # Resolve the current window in a single lookup (the original
    # fetched the record from the map a second time after resetting):
    # keep an active window, or start a fresh one when it expired or
    # the key is new.
    {count, reset_time} =
      case Map.get(state, key) do
        {count, reset_time} when reset_time > now -> {count, reset_time}
        _ -> {0, now + time_window_ms}
      end

    if count < max_requests do
      # Consume one token and allow the request.
      {:reply, :ok, Map.put(state, key, {count + 1, reset_time})}
    else
      # Over the limit: keep the window record unchanged, reject.
      # Callers typically translate this into 429 Too Many Requests.
      {:reply, :rate_limited, Map.put(state, key, {count, reset_time})}
    end
  end
end
# Plug for rate limiting
defmodule MyAppWeb.Plugs.RateLimit do # => Plug: applies rate limiting per client IP
  # NOTE(review): the original called put_status/json/halt as bare locals, which
  # does not compile — they live in Plug.Conn / Phoenix.Controller. Called fully
  # qualified here so the module needs no imports.

  def init(opts), do: opts # => Compile-time options pass-through
  # => opts set in router: plug RateLimit, max_requests: 100

  def call(conn, opts) do # => Runs for every request in the pipeline
    max_requests = opts[:max_requests] || 100 # => Requests per window (default: 100)
    time_window = opts[:time_window] || 60_000 # => Window length in ms (default: 60s)

    key = conn.remote_ip |> :inet.ntoa() |> to_string()
    # => :inet.ntoa/1 renders both IPv4 and IPv6 tuples ("127.0.0.1", "::1")
    # => The original Tuple.to_list/1 + Enum.join(".") mangled IPv6 addresses

    case MyApp.RateLimiter.allow_request?(key, max_requests, time_window) do
      :ok ->
        conn # => Under limit: continue down the pipeline

      :rate_limited -> # => Over limit: short-circuit with an error response
        conn
        |> Plug.Conn.put_status(:too_many_requests) # => HTTP 429 status code
        |> Phoenix.Controller.json(%{error: "Rate limit exceeded"}) # => JSON error body
        |> Plug.Conn.halt() # => Stop the plug pipeline, send response
    end
  end
end
Key Takeaway: Token bucket rate limiting with a GenServer allows per-IP and per-user enforcement; the plug returns HTTP 429 when the bucket is empty and stores counters in process state for low-latency checks.
Why It Matters: The token bucket algorithm allows controlled bursting — a user can make several rapid requests as long as they have tokens — unlike fixed-window rate limiting that resets abruptly and allows double-rate spikes at window boundaries. (Note that the GenServer example above actually implements a fixed window; a true token bucket refills tokens gradually.) Storing counters in process state or an ETS table avoids database round trips on every request. Rate limiting at the plug level protects downstream services from abusive clients and reduces DDoS impact, while per-user limits prevent one heavy user from degrading service for others.
Example 68: Distributed Phoenix Clustering
Connect multiple Phoenix instances for distributed state and fault tolerance.
%% Distributed Phoenix cluster
graph TD
A[Load Balancer] --> B[Phoenix Node 1]
A --> C[Phoenix Node 2]
A --> D[Phoenix Node 3]
B <-->|Distributed Erlang| C
C <-->|Distributed Erlang| D
B <-->|Distributed Erlang| D
B --> E[PubSub broadcasts to all nodes]
C --> E
D --> E
E --> F[PostgreSQL]
style A fill:#0173B2,color:#fff
style B fill:#029E73,color:#fff
style C fill:#029E73,color:#fff
style D fill:#029E73,color:#fff
style F fill:#DE8F05,color:#fff
# mix.exs
defp deps do # => Project dependencies
# => Private helper referenced from project/0 in mix.exs
[
{:libcluster, "~> 3.3"}, # => Automatic node clustering library
# => "~> 3.3" accepts >= 3.3.0 and < 4.0.0 (semver-compatible range)
# => Supports Kubernetes DNS, Gossip, Static strategies
{:observer_cli, "~> 1.7"} # => Terminal-based cluster observer
# => Shows node connections, process counts
]
end
# config/config.exs
config :libcluster, # => Configure node discovery for Kubernetes
topologies: [
example: [
strategy: Cluster.Strategy.Kubernetes.DNS, # => Auto-discover in K8s
# => Queries DNS for pod IPs
# => K8s headless service returns all pod IPs
config: [
service: "my_app-headless", # => Headless service name
# => DNS: my_app-headless.default.svc.cluster.local
namespace: "default" # => K8s namespace
# => Matches deployed namespace in manifests
]
]
] # => Nodes connect automatically via DNS
# Or with fixed IPs
# NOTE(review): do not keep BOTH config calls in the same config.exs — a second
# `config :libcluster, topologies: ...` call merges with/overrides the first
# (Config deep-merges keyword lists); pick one topology per environment.
config :libcluster, # => Static topology for non-K8s
topologies: [
fixed: [
strategy: Cluster.Strategy.Static, # => Fixed node list
# => Nodes listed explicitly, no auto-discovery
config: [
nodes: [:"app1@10.0.0.1", :"app2@10.0.0.2", :"app3@10.0.0.3"]
# => Node names: name@ip format
# => name must match --name VM flag at startup
]
]
]
# lib/my_app/cluster.ex
defmodule MyApp.Cluster do # => Supervisor wrapper that starts libcluster
  def start_link(opts) do # => Called by the Application supervisor
    Supervisor.start_link(
      [{Cluster.Supervisor, [topologies(), [name: MyApp.ClusterSupervisor]]}],
      # => Cluster.Supervisor is libcluster's documented child spec:
      # =>   {Cluster.Supervisor, [topologies, supervisor_opts]}
      # => The original {:libcluster, Strategy, ...} tuple is NOT a valid
      # => Supervisor child spec and would fail at startup
      Keyword.put_new(opts, :strategy, :one_for_one)
      # => Supervisor.start_link/2 requires a :strategy option; default it
      # => without clobbering one the caller may have passed
    )
  end

  defp topologies do # => Private helper to read topologies config
    Application.get_env(:libcluster, :topologies) # => Read topology config
    # => Returns the topologies keyword list from config/config.exs
  end
end
# Distributed PubSub across nodes
broadcast_all = fn event, data -> # => Anonymous function for cross-node broadcast
MyApp.PubSub # => Local PubSub server name
|> Phoenix.PubSub.broadcast("topic", {:event, event, data})
# => broadcast/3 args: (pubsub, topic, message) — the pipe supplies the first
# => Phoenix.PubSub.broadcast/3 sends to all subscribers
# => Returns :ok (or {:error, reason})
# => Sends to ALL nodes in cluster
# => PubSub automatically routes to all cluster nodes
# => Each node delivers to its own LiveView subscribers
end # => Every connected LiveView receives update
# List connected nodes
Node.list() # => Returns [:"app2@10.0.0.2", :"app3@10.0.0.3"]
# => Lists all currently connected Erlang nodes
# => Requires the VM to run in distributed mode (--name/--sname at startup)
# => Empty list [] means isolated (no cluster)
Key Takeaway: libcluster automates node discovery in Kubernetes and bare-metal environments; once connected, Phoenix.PubSub broadcasts messages to all nodes so WebSocket updates reach any pod in the cluster.
Why It Matters: libcluster automates node discovery so newly started pods join the cluster without manual configuration, enabling auto-scaling. Once nodes are connected, Phoenix.PubSub broadcasts messages across all nodes so WebSocket messages reach users connected to any pod. Erlang distribution requires correct cookie configuration and EPMD port access — misconfigured cluster security is a common production issue where nodes silently fail to communicate despite appearing healthy individually.
Example 69: WebSocket Load Balancing with Sticky Sessions
Route WebSocket connections to same server. Use load balancer affinity.
# nginx.conf - Sticky sessions by IP
upstream my_app { # => Defines upstream server group "my_app"
# => Load balancer distributes requests across these
server app1:4000; # => Backend server 1 at app1:4000
# => Phoenix node 1
server app2:4000; # => Backend server 2 at app2:4000
# => Phoenix node 2
server app3:4000; # => Backend server 3 at app3:4000
# => Phoenix node 3
hash $remote_addr consistent; # => Sticky by IP
# => Routes same IP to same backend server
# => consistent: uses consistent hashing
# => Minimizes client remapping when a backend is added or removed
# => Required for WebSocket persistence
} # => End upstream block
server { # => Defines HTTP server block
# => Handles incoming requests
listen 80; # => Listens on port 80 (HTTP)
# => Standard HTTP port
server_name my_app.com; # => Server domain name
# => Only processes requests for my_app.com
location / { # => Matches all paths (/)
# => Applies to all routes
proxy_pass http://my_app; # => Proxies requests to my_app upstream
# => Load balances across app1/app2/app3
proxy_http_version 1.1; # => Uses HTTP/1.1 protocol
# => Required for WebSocket upgrades
# WebSocket headers
proxy_set_header Upgrade $http_upgrade; # => Forwards Upgrade header
# => Enables WebSocket protocol upgrade
# => $http_upgrade: request's Upgrade value
proxy_set_header Connection "upgrade"; # => Sets Connection header to "upgrade"
# => Required for WebSocket handshake
# => Tells backend to upgrade connection
# NOTE(review): hardcoding "upgrade" sends it for ALL proxied requests; the
# `map $http_upgrade $connection_upgrade` idiom is safer for mixed HTTP/WS
# traffic — confirm for your nginx setup
# Preserve client IP
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# => Forwards client IP address
# => Appends to X-Forwarded-For chain
# => Phoenix reads from this header
proxy_set_header X-Real-IP $remote_addr; # => Sets original client IP
# => $remote_addr: nginx's client IP variable
# => Phoenix uses for rate limiting/logging
} # => End location block
} # => End server block
# HAProxy.cfg - Also supports sticky sessions
backend phoenix_nodes # => Defines HAProxy backend group
# => Alternative to nginx for sticky sessions
# NOTE(review): long-lived WebSocket connections typically also need
# "timeout tunnel" raised in the defaults section — confirm for your HAProxy
balance roundrobin # => Load balancing algorithm: round robin
# => Distributes requests evenly (with cookie affinity)
# => Round robin only applies to clients WITHOUT a SERVERID cookie yet
cookie SERVERID insert indirect nocache # => Cookie-based sticky sessions
# => insert: HAProxy adds SERVERID cookie
# => indirect: doesn't forward cookie to backend
# => nocache: prevents caching of Set-Cookie
server node1 app1:4000 check cookie node1 # => Backend server 1
# => check: health checks enabled
# => cookie node1: assigns cookie value "node1"
# => Clients get SERVERID=node1 cookie
server node2 app2:4000 check cookie node2 # => Backend server 2
# => Clients routed here get SERVERID=node2
server node3 app3:4000 check cookie node3 # => Backend server 3
# => HAProxy routes based on cookie value
# => WebSockets stay on same server
Key Takeaway: WebSocket connections must stay on the same server—configure sticky sessions in nginx via IP hash or cookie affinity, and forward X-Forwarded-For headers for accurate client identification.
Why It Matters: WebSocket connections require sticky sessions because the stateful connection must reach the same server on reconnect. Without sticky routing, a reconnecting client may land on a different node that knows nothing about that user's channel subscriptions. Proper load balancer configuration is a prerequisite for scaling any Phoenix application with real-time features.
Example 70: Blue-Green Deployment for Zero-Downtime Releases
Run two production environments. Switch traffic after verifying new version works.
%% Blue-green deployment flow
graph TD
A[Current: Blue v1.0] --> B[Deploy Green v1.1]
B --> C[Test Green in isolation]
C --> D{Tests pass?}
D -->|Yes| E[Switch traffic to Green]
D -->|No| F[Keep Blue, fix Green]
E --> G[Green is now production]
G --> H[Blue becomes standby]
H --> I[Rollback available]
style A fill:#0173B2,color:#fff
style C fill:#DE8F05,color:#fff
style E fill:#029E73,color:#fff
style F fill:#CA9161,color:#fff
style I fill:#CC78BC,color:#fff
# Blue-Green deployment with two releases
#!/bin/bash # => Blue-green deployment script (zero-downtime strategy)
# NOTE(review): a shebang only takes effect as the very FIRST line of the file
set -euo pipefail # => Fail fast on command errors, unset variables, pipe failures

# Current version (blue)
BLUE_VERSION=$(aws ecs describe-services --cluster prod --services my-app | \
  jq -r '.services[0].taskDefinition' | \
  grep -oP 'my-app:\K[^:]+') # => Extracts current revision from task-definition ARN
# => E.g., ".../my-app:46" -> BLUE_VERSION="46"

# New version (green)
GREEN_VERSION=$(git describe --tags --always) # => Latest Git tag or commit SHA
# => E.g., "v1.3.0" or "abc123def"

# Build and deploy green version
mix release # => Builds self-contained Elixir release in _build/prod/rel/my_app/
docker build -t "my-app:$GREEN_VERSION" . # => Image tagged with the green version
docker tag "my-app:$GREEN_VERSION" my-app:green # => "green" alias tracks latest deploy
docker push my-app:green # => Publish to the registry so the ECS cluster can pull it

# Register new task definition
aws ecs register-task-definition \
  --family my-app \
  --container-definitions "[{\"image\": \"my-app:$GREEN_VERSION\", ...}]"
# => Creates a new task-definition revision (e.g., my-app:47)

# Update service to use green
aws ecs update-service \
  --cluster prod \
  --service my-app \
  --task-definition "my-app:$GREEN_VERSION"
# => NOTE(review): --task-definition expects family:revision (e.g., my-app:47),
# => not the image version tag — capture the revision returned by
# => register-task-definition above and use that here

# Wait for deployment
aws ecs wait services-stable \
  --cluster prod \
  --services my-app # => Blocks until green tasks are healthy and blue has drained
# => Times out if the service never stabilizes

# Test green version; -f makes curl exit non-zero on HTTP 4xx/5xx
# (the original checked bare $? — a 500 health response still counted as success)
if curl -fsS http://green.my-app.com/health/ready; then
  # Switch traffic from blue to green
  aws route53 change-resource-record-sets \
    --hosted-zone-id Z123 \
    --change-batch "[{\"Action\": \"UPSERT\", \"ResourceRecordSet\": {\"Name\": \"my-app.com\", \"Type\": \"A\", \"TTL\": 60, \"ResourceRecords\": [{\"Value\": \"green-alb.aws.com\"}]}}]"
  # => UPSERT points my-app.com at the green load balancer
  # => TTL 60s: clients pick up the change within ~1 minute
  echo "Deployment successful!" # => Green now serves production traffic
else
  # Rollback to blue
  aws ecs update-service \
    --cluster prod \
    --service my-app \
    --task-definition "my-app:$BLUE_VERSION" # => Instant rollback to previous revision
  # => Blue is still warm, so rollback needs no rebuild
fi
Key Takeaway: Blue-green deployments run two environments side-by-side so you can verify the new version before switching traffic—enabling instant rollback by pointing the load balancer back to the old environment.
Why It Matters: Blue-green switches only achieve true zero downtime when each environment also drains cleanly: prep_stop/1 and stop/1 OTP callbacks give your application control over the shutdown sequence, ensuring in-flight requests complete before the BEAM terminates. Without graceful shutdown, cutting traffic over (whether via DNS, a load balancer, or Kubernetes rolling deployments) can produce 502 errors as requests are still routed to instances that are already shutting down. Setting shutdown_timeout in your Mix release configuration prevents deployment-induced request failures and makes zero-downtime deployments reliable.
Example 71: Custom Ecto Types for Domain Logic
Create custom Ecto types to encapsulate domain logic and validation.
# lib/my_app/ecto_types/money.ex
defmodule MyApp.EctoTypes.Money do
  # Custom Ecto type for money values stored as JSONB maps.
  use Ecto.Type

  def type, do: :map # => Stored as JSONB in PostgreSQL

  # Cast from user input (string-keyed map, e.g. form params)
  def cast(%{"amount" => amount, "currency" => currency}) when is_number(amount) do
    {:ok, %{amount: amount, currency: currency}} # => Normalized internal shape
  end

  # Also accept the already-normalized atom-keyed shape (makes cast idempotent)
  def cast(%{amount: amount, currency: currency}) when is_number(amount) do
    {:ok, %{amount: amount, currency: currency}}
  end

  def cast(_), do: :error # => Invalid format

  # Load from database
  def load(%{"amount" => amount, "currency" => currency}) do
    {:ok, %{amount: to_decimal(amount), currency: currency}}
    # => FIX: Decimal.new/1 raises on floats, so the original crashed when
    # => loading the very floats its own dump/1 had written
  end

  # Dump to database — amounts written as strings to avoid float rounding
  # (Decimal.to_float/1, used originally, defeats the purpose of Decimal)
  def dump(%{amount: %Decimal{} = amount, currency: currency}) do
    {:ok, %{"amount" => Decimal.to_string(amount), "currency" => currency}}
    # => Stores e.g. {"amount": "19.99", "currency": "USD"}
  end

  def dump(%{amount: amount, currency: currency}) when is_number(amount) do
    {:ok, %{"amount" => amount, "currency" => currency}} # => Raw number from cast/1
  end

  def dump(_), do: :error

  # Normalize any stored representation (legacy floats, ints, strings) to Decimal.
  defp to_decimal(%Decimal{} = d), do: d
  defp to_decimal(amount) when is_float(amount), do: Decimal.from_float(amount)
  defp to_decimal(amount), do: Decimal.new(amount) # => Integers and strings
end
# Usage in schema
defmodule MyApp.Shop.Product do # => Product schema using the custom Money type
  use Ecto.Schema # => Provides schema/2, field/3, timestamps/0
  import Ecto.Changeset # => cast/3, validate_required/2, get_change/2, add_error/3
  # => FIX: missing in the original — `use Ecto.Schema` does NOT import these,
  # => so changeset/2 would not compile

  schema "products" do # => Maps to "products" table
    # => Auto-generates id, inserted_at, updated_at
    field :name, :string # => Product name, accessible as product.name
    field :price, MyApp.EctoTypes.Money # => Custom type
    # => Money handles validation, casting, and DB conversion
    timestamps() # => inserted_at / updated_at, auto-managed by Ecto
  end

  @doc """
  Builds a changeset for creating or updating a product.

  Casts `:name` and `:price` (the latter through `Money.cast/1`) and enforces
  the domain rule that prices may not be negative.
  """
  def changeset(product, attrs) do
    product # => New %Product{} or one loaded from the database
    |> cast(attrs, [:name, :price]) # => :price is cast by Money.cast/1
    |> validate_required([:name, :price]) # => Errors if name or price missing
    |> validate_price() # => Domain rule: no negative prices
  end

  # Rejects negative amounts; nil (price unchanged) passes through untouched.
  defp validate_price(changeset) do
    case get_change(changeset, :price) do
      %{amount: amount} when amount < 0 -> # => Negative amount: business-rule violation
        add_error(changeset, :price, "must not be negative")
        # => FIX: message said "must be positive" while the guard allows 0 —
        # => the message now matches the actual rule

      _ -> # => Price valid, zero, or unchanged
        changeset # => No validation errors added
    end
  end
end # => End MyApp.Shop.Product module
# Create product
# NOTE(review): assumes Product and Repo aliases are in scope at the call site.
%Product{} # => Creates empty Product struct
# => %Product{name: nil, price: nil}
|> Product.changeset(%{ # => Builds changeset with attributes
# => Works as a pipe because changeset/2 takes the struct first
# => Validates and casts input
name: "Widget", # => Product name
# => Simple string value
price: %{"amount" => 19.99, "currency" => "USD"} # => Input format
# => Plain map (user input format)
# => Money.cast/1 converts to internal format
})
|> Repo.insert() # => Inserts product into database
# => Stores: {"amount": 19.99, "currency": "USD"} in database
# => Money.dump/1 converts to JSONB
# => Database column stores JSON
# => Returns {:ok, %Product{}} on success
Key Takeaway: Custom Ecto types implement cast/1, load/1, dump/1, and type/0 to encapsulate domain conversions—always use Decimal for money to avoid floating-point rounding errors.
Why It Matters: Custom Ecto types enforce domain invariants at the data layer—a Money type ensures currency amounts are always stored as integer cents, a Status type restricts values to a defined atom set, and an Encrypted type transparently encrypts sensitive fields. Centralizing these conversions in a type module means the business logic never handles raw database representations, and schema changes propagate automatically to all contexts that use the type.
Example 72: Database Fragments for Advanced Queries
Use Ecto fragments for PostgreSQL-specific features and complex SQL.
# Full-text search with tsvector
defmodule MyApp.Blog do # => Blog context: advanced PostgreSQL queries via fragments
  import Ecto.Query # => from/2, fragment/1, subquery/1, coalesce/2

  # FIX (all functions below): the original wrote
  #   from p in Post, ..., select: %{...} |> Repo.all()
  # `|>` binds tighter than keyword-list commas, so that pipes the LAST keyword
  # VALUE into Repo.all/1, not the query. Bind the query to a variable first.

  # PostgreSQL full-text search over titles, ranked by relevance.
  def search_posts(search_term) do
    query =
      from p in Post,
        where:
          fragment(
            "to_tsvector('english', ?) @@ plainto_tsquery('english', ?)",
            # => to_tsvector: index document as word tokens
            # => @@: match operator between tsvector and tsquery
            p.title,
            ^search_term # => Pinned param — never interpolate (SQL injection)
          ),
        order_by:
          fragment(
            "ts_rank(to_tsvector('english', ?), plainto_tsquery('english', ?)) DESC",
            # => ts_rank: float 0.0-1.0 indicating match quality
            p.title,
            ^search_term
          ),
        select: %{
          post: p, # => Include full post struct
          rank:
            fragment(
              "ts_rank(to_tsvector('english', ?), plainto_tsquery('english', ?))",
              p.title,
              ^search_term
            )
        }

    Repo.all(query)
    # => SELECT posts.*, ts_rank(...) FROM posts WHERE to_tsvector(...) @@ ...
  end

  # JSONB lookup: metadata->>'key' = 'value'.
  def posts_by_metadata(key, value) do
    query =
      from p in Post,
        where: fragment("? ->> ? = ?", p.metadata, ^key, ^value)
        # => ->>: extract JSONB value as text

    Repo.all(query)
    # => SELECT * FROM posts WHERE metadata->>'status' = 'draft'
  end

  # JSONB array containment: does tags contain the given tag?
  def posts_with_tag(tag) do
    query =
      from p in Post,
        where: fragment("? @> ?::jsonb", p.tags, ^[tag])
        # => @>: PostgreSQL "contains" operator
        # => ^[tag]: Elixir list encoded as a JSONB array

    Repo.all(query)
    # => SELECT * FROM posts WHERE tags @> '["elixir"]'::jsonb
  end

  # Window functions: ROW_NUMBER (no ties) and RANK (ties share a rank).
  def posts_with_rank do
    query =
      from p in Post,
        select: %{
          post: p, # => Full post struct
          row_number: fragment("ROW_NUMBER() OVER (ORDER BY ? DESC)", p.inserted_at),
          # => Sequential 1, 2, 3... newest first — ties never share a number
          rank: fragment("RANK() OVER (ORDER BY ? DESC)", p.view_count)
          # => Ties share a rank and the next rank is skipped
          # => E.g., 1000 views -> 1, 1000 -> 1, 500 -> 3
        }

    Repo.all(query)
    # => Returns a list of %{post: ..., row_number: ..., rank: ...} maps
  end

  # Posts joined with their comment counts (0 for posts without comments).
  def posts_with_comment_count do
    comments_cte =
      from c in Comment,
        group_by: c.post_id, # => One row per post
        select: %{post_id: c.post_id, count: count(c.id)}

    query =
      from p in Post,
        left_join: c in subquery(comments_cte),
        on: c.post_id == p.id,
        # => LEFT JOIN keeps posts with zero comments (c.count is NULL there)
        select: %{post: p, comment_count: coalesce(c.count, 0)}
        # => coalesce turns the NULL from unmatched LEFT JOIN rows into 0

    Repo.all(query)
    # => Result: [%{post: %Post{}, comment_count: 5}, ...]
  end
end # => End module
Key Takeaway: fragment/1 lets you write raw SQL for database-specific features like window functions and CTEs; always use ^-pinned parameters—never string interpolation—to prevent SQL injection.
Why It Matters: Ecto.Query.fragment/1 injects raw SQL into otherwise composable Ecto queries, giving access to database-specific functions like PostgreSQL's to_tsvector/tsquery for full-text search, earthdistance for geospatial queries, and window functions. The tradeoff is portability: fragments couple your queries to a specific database engine. Using fragments deliberately for performance-critical paths, while keeping business logic in portable Ecto syntax, balances power with maintainability.
Example 73: Query Profiling with Telemetry
Measure query performance to identify slow queries in production.
# lib/my_app/telemetry.ex
defmodule MyApp.Telemetry do # => Centralized telemetry handler for Ecto query events
  require Logger # => Logger macros must be required before use

  # Registers the Ecto query handler; call once during application boot.
  def attach_handlers do
    :telemetry.attach(
      "my-app-ecto-query", # => Globally-unique handler ID (used to detach later)
      [:my_app, :repo, :query], # => Event Ecto emits for every query
      # => Pattern: [:app_name, :repo_name, :query]
      &__MODULE__.handle_query/4, # => Invoked as (event, measurements, metadata, config)
      nil # => No handler config needed
    )
  end

  # Invoked after each Ecto query: logs slow queries (>100ms) and forwards
  # very slow ones (>1s) to the monitoring service.
  def handle_query(_event, measurements, metadata, _config) do
    elapsed_ms =
      System.convert_time_unit(measurements.total_time, :native, :millisecond)
    # => total_time arrives in native units; convert to ms once up front

    if elapsed_ms > 100 do # => Slow-query threshold
      Logger.warning("Slow query detected",
        query: inspect(metadata.query), # => SQL text
        params: inspect(metadata.params), # => Bound parameters
        time_ms: elapsed_ms, # => Execution time in ms
        source: metadata.source # => Table name
      )
    end

    if elapsed_ms > 1000 do # => Critical threshold: report to monitoring
      MyApp.Monitoring.report_slow_query(%{
        query: metadata.query, # => Full SQL with placeholders
        time_ms: elapsed_ms,
        type: metadata.type # => e.g. :ecto_sql_query
      })
    end
  end
end
# In application.ex
# NOTE(review): fragment — this callback belongs inside an Application module
# (e.g. MyApp.Application with `use Application`); shown here without it.
def start(_type, _args) do # => Called at application startup
MyApp.Telemetry.attach_handlers() # => Attach on startup
# => Must run before first query
children = [
MyApp.Repo, # => Database connection pool
MyAppWeb.Endpoint # => HTTP/WebSocket server
]
Supervisor.start_link(children, strategy: :one_for_one)
# => Restart children independently on failure
# => Return value {:ok, pid} satisfies the Application behaviour contract
end
# Query all telemetry events
defmodule MyApp.QueryStats do # => GenServer aggregating query metrics via telemetry
  use GenServer # => OTP GenServer behavior

  # --- Client API ---

  def start_link(opts) do # => Starts the stats GenServer process
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
    # => Registered by module name, reachable from any process
  end

  def get_stats do # => Public API to retrieve aggregated stats
    GenServer.call(__MODULE__, :get_stats) # => Synchronous, blocks until reply
  end

  # Telemetry callback. FIX: telemetry runs handlers in the process that EMITS
  # the event and ignores their return value — the original's `{:ok, state}`
  # never updated this GenServer's state, so the counters stayed at zero.
  # Forward the measurement to the GenServer instead.
  def handle_event([:my_app, :repo, :query], measurements, _metadata, _config) do
    GenServer.cast(__MODULE__, {:query, measurements.total_time})
  end

  # FIX: catch-all for the phoenix events attached below — the original had no
  # matching clause for them, so the handler crashed (telemetry then detaches it).
  def handle_event(_event, _measurements, _metadata, _config), do: :ok

  # --- Server callbacks ---

  @impl true
  def init(_opts) do # => Initialize GenServer state and attach handlers
    :telemetry.attach_many( # => Registers one handler for multiple events
      "query-stats", # => Unique handler ID
      [
        [:my_app, :repo, :query], # => Ecto queries
        [:phoenix, :router_dispatch, :stop], # => HTTP requests
        [:phoenix, :endpoint, :stop] # => Endpoint timing
      ],
      &__MODULE__.handle_event/4,
      nil # => Handler config unused (state lives in this GenServer)
    )

    {:ok, %{query_count: 0, total_time: 0}} # => Counters start at zero
  end

  @impl true
  def handle_cast({:query, native_time}, state) do # => One Ecto query observed
    {:noreply,
     %{
       query_count: state.query_count + 1, # => Total queries since start
       total_time: state.total_time + native_time
       # => Accumulated in :native units for precision; converted on read
     }}
  end

  @impl true
  def handle_call(:get_stats, _from, state) do # => Synchronous stats fetch
    avg_native =
      if state.query_count > 0 do
        div(state.total_time, state.query_count)
        # => FIX: integer division — convert_time_unit/3 rejects the float
        # => that `/` produces, so the original raised here
      else
        0 # => No queries yet; avoids division by zero
      end

    {:reply,
     %{
       query_count: state.query_count, # => Total queries executed
       avg_time_ms: System.convert_time_unit(avg_native, :native, :millisecond)
     }, state} # => Reply with stats, preserve state
  end
end
Key Takeaway: Attach to [:my_app, :repo, :query] telemetry events to log slow queries (>100ms), track per-operation timing, and surface N+1 patterns before they cause production degradation.
Why It Matters: Query profiling telemetry catches N+1 problems, missing indexes, and unexpectedly slow operations before they become user-visible incidents. Attaching to [:my_app, :repo, :query] events gives you query duration, the SQL text, and the source call site in one handler—no ORM-level monkey-patching required. Setting alert thresholds on p95 query duration creates automated regression detection so new code that accidentally introduces slow queries fails CI before deployment.
Example 74: Advanced LiveView Performance Optimization
Optimize LiveView rendering with targeted updates and efficient assigns.
defmodule MyAppWeb.DashboardLive do # => LiveView for performance-optimized dashboard
use Phoenix.LiveView # => LiveView behavior
# Temporary assigns - not tracked for diff
def mount(_params, _session, socket) do # => Called on first render (HTTP + WS)
if connected?(socket) do # => Only start timer on WebSocket connection
:timer.send_interval(1000, self(), :tick) # => Update every second
end # => Erlang timer sends :tick message each second
{:ok,
socket
|> assign(:time, DateTime.utc_now()) # => Tracked assign
# => LiveView diffs this on each update
|> assign(:posts, load_posts()) # => Initial posts list
|> assign_new(:expensive_data, fn -> load_expensive_data() end)}
# => Only computed on first mount, cached after
# => assign_new: only calls fn if key not present
end
# Use temporary_assigns to avoid diffing large data
def handle_info(:tick, socket) do # => Handles :tick from timer
{:noreply,
socket
|> assign(:time, DateTime.utc_now()) # => Only updates time
|> push_event("time-update", %{time: DateTime.to_string(socket.assigns.time)})}
# => Send JS event instead of re-rendering
# => push_event bypasses LiveView diff
end
# Optimize list rendering with streams
def handle_event("load_more", _params, socket) do
# => Handle "Load More" button click
new_posts = load_more_posts(socket.assigns.last_id)
# => Fetch next page using cursor
{:noreply, stream_insert(socket, :posts, new_posts)} # => Append to stream
# => Only new items rendered, existing items unchanged
end
# Debounce user input
def handle_event("search", %{"query" => query}, socket) do
# => Fires on every keystroke
Process.send_after(self(), {:search, query}, 300) # => Debounce 300ms
# => Cancels if another search arrives in 300ms
{:noreply, assign(socket, :search_query, query)} # => Store latest query for comparison
end
def handle_info({:search, query}, socket) do # => Debounced search fires after 300ms quiet
if socket.assigns.search_query == query do # => Check still current
# => User may have typed more in 300ms window
results = search_posts(query) # => Execute search
{:noreply, assign(socket, :search_results, results)}
else
{:noreply, socket} # => User typed more, skip
end
end
# Use update/3 for atomic updates
def handle_event("increment_likes", %{"post_id" => id}, socket) do
# => Handle like button click
{:noreply,
update(socket, :posts, fn posts -> # => update/3: modify assign in place
Enum.map(posts, fn post ->
if post.id == id do
%{post | likes: post.likes + 1} # => Update only this post
else
post # => Keep unchanged
end
end)
end)}
end
# Render only changed sections
# Render callback: called whenever assigns change; HEEx tracks each
# dynamic expression so only changed parts are diffed to the client.
def render(assigns) do
  # NOTE(review): the heredoc opener must be the last token on its line
  # (trailing text after ~H""" is a syntax error), and bare `# => ...`
  # lines inside ~H are NOT comments — HEEx renders them as visible
  # page text. Annotations below use <%!-- --%> template comments.
  ~H"""
  <div>
    <%!-- Function component: re-renders only when @time changes --%>
    <.header time={@time} />
    <%!-- phx-update="stream": only changed items are patched,
          never the whole list --%>
    <div id="posts" phx-update="stream">
      <%= for {dom_id, post} <- @streams.posts do %>
        <%!-- dom_id is a stable DOM id so the client patches in place --%>
        <.post_card id={dom_id} post={post} />
      <% end %>
    </div>
    <%!-- Static component: no dynamic assigns, never re-renders --%>
    <.footer />
  </div>
  """
end
# Function components for granular updates
# Function component for the header section; LiveView diffs it
# independently, so a @time change re-renders only the <time> element.
def header(assigns) do
  # NOTE(review): comment moved off the ~H""" opener line — trailing
  # text after a heredoc opener is a syntax error, and `# =>` lines
  # inside the template would render as literal text.
  ~H"""
  <header>
    <time><%= @time %></time>
  </header>
  """
end
end # => End MyAppWeb.DashboardLive module
Key Takeaway: Combine assign_new/3 for lazy computation, streams for list efficiency, phx-debounce for event throttling, and push_event for client-side callbacks to build high-performance LiveView interfaces.
Why It Matters: LiveView performance problems compound: a single missed assign_new call on a high-traffic page can double database queries. assign_new prevents redundant data fetching on reconnect, streams replace full list re-renders with targeted DOM patches for large collections, and push_event offloads animations to client JS without server round trips. Measuring DOM diff size with telemetry before optimizing identifies which pattern actually reduces latency rather than applying all techniques speculatively.
Example 75: Production Debugging with Observer and LiveDashboard
Debug production issues with Observer, LiveDashboard, and remote IEx.
# Connect to remote production node
# On local machine:
# iex --name debug@127.0.0.1 --cookie production_cookie
# => Start local IEx with a distributed name and the prod node's cookie
# In IEx session:
Node.connect(:"my_app@production-server.com") # => Connect to prod node
# => Returns true on success; enables remote calls and process inspection
Node.list() # => [:"my_app@production-server.com"]
# => Node names are atoms; a non-empty list means the connection succeeded
# Inspect running processes
Process.list() # => All Erlang processes on THIS node
|> Enum.filter(fn pid ->
case Process.info(pid, :current_function) do
{:current_function, {module, _fun, _arity}} ->
# => Get module from current function info
String.starts_with?(to_string(module), "Elixir.MyApp")
# => Keep only your app's processes
_ -> false # => Also covers processes that died (info returns nil)
end
end)
|> Enum.map(fn pid ->
{pid, Process.info(pid, [:memory, :message_queue_len, :current_function])}
# => Gather memory and queue info per process
end)
# => [{#PID<0.123.0>, [memory: 12345, message_queue_len: 0, ...]}, ...]
# Find process by name
pid = Process.whereis(MyApp.Worker) # => #PID<0.456.0>, or nil if not registered
Process.info(pid) # => Full process info keyword list
# => Shows memory, links, status, mailbox size, etc.
# Check process mailbox
Process.info(pid, :message_queue_len) # => {:message_queue_len, 1000}
# => A large queue means the process cannot keep up with incoming messages
# Inspect state of GenServer
:sys.get_state(pid) # => Current state (briefly blocks the server — use sparingly in prod)
# => %{pending: [], processing: 10, errors: []}
# Trace function calls
:dbg.tracer() # => Start tracer (remember :dbg.stop() when finished!)
:dbg.p(:all, :c) # => Trace calls in ALL processes — heavy; scope tightly in prod
:dbg.tp(MyApp.Blog, :create_post, 2, []) # => Trace this function
# => Prints every call to MyApp.Blog.create_post/2
# LiveDashboard metrics
defmodule MyAppWeb.Telemetry do
  # Declares the metrics LiveDashboard (and other reporters) render.
  # last_value/summary/counter/distribution are macros from
  # Telemetry.Metrics — without this import the module does not compile.
  import Telemetry.Metrics

  def metrics do
    [
      # VM Metrics
      last_value("vm.memory.total", unit: {:byte, :megabyte}),
      last_value("vm.total_run_queue_lengths.total"), # => Scheduler queue depth
      last_value("vm.total_run_queue_lengths.cpu"),
      # Database Metrics
      summary("my_app.repo.query.total_time",
        unit: {:native, :millisecond},
        tags: [:source, :command] # => Group by table/command
      ),
      counter("my_app.repo.query.count"), # => Total queries
      # Phoenix Metrics
      summary("phoenix.router_dispatch.stop.duration",
        tags: [:route],
        unit: {:native, :millisecond}
      ),
      counter("phoenix.router_dispatch.stop.count",
        tags: [:route, :status] # => Requests by route/status
      ),
      # Custom Business Metrics
      counter("my_app.orders.created.count"), # => Business events
      last_value("my_app.users.active.count"),
      # NOTE(review): newer Telemetry.Metrics versions take histogram
      # buckets via reporter_options — confirm against the pinned version.
      distribution("my_app.checkout.duration",
        buckets: [100, 500, 1000, 5000] # => Histogram buckets (ms)
      )
    ]
  end
end
# Memory leak detection
defmodule MyApp.MemoryMonitor do # => GenServer that watches total VM memory
  use GenServer # => OTP GenServer behavior
  require Logger # => Logging macros

  @check_interval_ms 60_000 # => Sample once per minute
  @leak_threshold_bytes 100_000_000 # => Warn after >100MB growth over baseline

  # Starts the monitor under a supervisor, registered by module name.
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    # send_interval re-delivers :check_memory for the process lifetime —
    # no need to re-arm the timer in handle_info/2.
    :timer.send_interval(@check_interval_ms, :check_memory)
    {:ok, %{baseline: get_memory()}} # => All later readings compare to this
  end

  @impl true
  def handle_info(:check_memory, state) do # => Fires every 60 seconds
    current = get_memory()
    diff = current - state.baseline

    if diff > @leak_threshold_bytes do
      Logger.warning("Memory leak detected",
        baseline_mb: div(state.baseline, 1_000_000),
        current_mb: div(current, 1_000_000),
        diff_mb: div(diff, 1_000_000)
      )

      # Rank live processes by heap usage. Process.info/2 returns nil for
      # processes that exited between list/0 and info/2 — reject those
      # instead of crashing inside sort_by (fixes a latent MatchError).
      top_processes =
        Process.list()
        |> Enum.map(&{&1, Process.info(&1, :memory)})
        |> Enum.reject(fn {_pid, info} -> is_nil(info) end)
        |> Enum.sort_by(fn {_pid, {:memory, mem}} -> mem end, :desc)
        |> Enum.take(10) # => Top 10 consumers

      Logger.warning("Top memory consumers", processes: top_processes)
    end

    {:noreply, state}
  end

  # Total bytes currently allocated by the VM (processes + ETS + binaries + code).
  defp get_memory do
    :erlang.memory(:total)
  end
end
Key Takeaway: Connect a remote IEx shell to running production nodes with --remsh; use :sys.get_state/1 for process inspection, :dbg for tracing, and LiveDashboard for real-time cluster metrics.
Why It Matters: Remote IEx sessions let you inspect live production state without restarting or redeploying—essential when you cannot reproduce a bug locally. :sys.get_state/1 reads GenServer internals non-destructively, :dbg traces function calls in real time with minimal overhead, and LiveDashboard shows aggregate cluster metrics. Together these tools reduce mean time to diagnosis from hours of log analysis to minutes of targeted introspection.
Example 76: Security Best Practices
Implement security headers, CSRF protection, and input sanitization.
# Security headers in endpoint
defmodule MyAppWeb.Endpoint do # => Phoenix application endpoint
  use Phoenix.Endpoint, otp_app: :my_app # => Register as endpoint for :my_app

  # Session cookie config — referenced by Plug.Session below. The
  # original used @session_options without ever defining it, which
  # fails compilation.
  @session_options [
    store: :cookie,
    key: "_my_app_key",
    signing_salt: "CHANGE_ME" # => Generate a real salt: mix phx.gen.secret 32
  ]

  plug Plug.Static, at: "/", from: :my_app # => Serve static assets from priv/static
  # NOTE(review): Plug.Static runs BEFORE :put_secure_headers, so static
  # responses do not get the headers below — move the plug up if they should.

  # Security headers
  plug :put_secure_headers # => Add security headers to every response

  # Private function plug: defense-in-depth response headers.
  defp put_secure_headers(conn, _opts) do
    conn
    |> put_resp_header("x-frame-options", "DENY") # => Prevent clickjacking (no iframes)
    |> put_resp_header("x-content-type-options", "nosniff") # => Prevent MIME sniffing
    |> put_resp_header("x-xss-protection", "1; mode=block") # => Legacy-browser XSS filter
    |> put_resp_header("strict-transport-security",
        "max-age=31536000; includeSubDomains") # => Force HTTPS for 1 year (HSTS)
    |> put_resp_header("content-security-policy",
        "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'")
    # => 'self': only load resources from same origin
    |> put_resp_header("referrer-policy", "strict-origin-when-cross-origin")
    # => Full referrer only for same-origin requests
  end

  plug Plug.RequestId # => Request ID header for cross-service log tracing
  plug Plug.Telemetry, event_prefix: [:phoenix, :endpoint]
  # => Emit telemetry for request timing

  plug Plug.Parsers,
    parsers: [:urlencoded, :multipart, :json], # => Parse form, upload, and JSON bodies
    pass: ["*/*"], # => Accept all content types
    json_decoder: Jason # => Use Jason for JSON decoding

  plug Plug.MethodOverride # => Allow PUT/DELETE via HTML forms (_method param)
  plug Plug.Head # => Handle HEAD requests
  plug Plug.Session, @session_options # => Signed session cookie (options above)
  plug MyAppWeb.Router # => Forward to router for dispatch
end
# CSRF protection (automatic in Phoenix)
defmodule MyAppWeb.Router do # => Request routing module
  use Phoenix.Router # => Provides pipeline/plug/scope macros
  import Plug.Conn # => fetch_session/2 and friends
  import Phoenix.Controller # => accepts/2, protect_from_forgery/2, ...
  import Phoenix.LiveView.Router # => fetch_live_flash/2
  # NOTE(review): without the use/imports above the pipeline plugs below
  # are undefined and this module does not compile.

  pipeline :browser do # => Browser request pipeline
    plug :accepts, ["html"] # => Accept HTML responses
    plug :fetch_session # => Load session from cookie
    plug :fetch_live_flash # => Load LiveView flash messages
    plug :put_root_layout, {MyAppWeb.Layouts, :root} # => Wrap responses in root layout
    plug :protect_from_forgery # => CSRF token validation on POST/PUT/DELETE
    plug :put_secure_browser_headers # => Default security headers
  end

  # API is stateless: auth via bearer tokens (e.g. JWT), so no CSRF plug.
  pipeline :api do
    plug :accepts, ["json"] # => Only accept JSON
  end
end
# Input sanitization for user content
# Content processing: strips dangerous markup from user-supplied input.
defmodule MyApp.Content do
  @doc "Sanitize HTML to prevent XSS"
  def sanitize_html(html) do
    # basic_html keeps simple formatting tags (<p>, <br>, <strong>, <em>,
    # <a>, ...) and strips <script>, <iframe>, inline event handlers, etc.
    HtmlSanitizeEx.basic_html(html)
  end

  @doc "Cleans every user-supplied post param before persistence."
  def sanitize_user_post(params) do
    params
    |> Map.update("body", "", fn body -> sanitize_html(body) end)
    |> Map.update("title", "", fn title -> String.trim(title) end)
  end
end
# SQL injection prevention (automatic with Ecto)
# ❌ NEVER do this:
def search_posts_unsafe(term) do
  # DANGER: `term` is interpolated straight into the SQL text. Input like
  # `'; DROP TABLE posts; --` executes arbitrary SQL (injection).
  # Kept only as a negative example — never ship this.
  query = "SELECT * FROM posts WHERE title LIKE '%#{term}%'" # => SQL INJECTION!
  Ecto.Adapters.SQL.query!(Repo, query, [])
end
# ✅ Always use parameterized queries:
def search_posts_safe(term) do
  # Bind the pattern as a query parameter — Ecto sends it separately from
  # the SQL text, so it can never be interpreted as SQL.
  # NOTE(review): the original piped `|> Repo.all()` directly after the
  # keyword query; `|>` binds tighter than the `where:` keyword, so it
  # piped the like/2 expression — not the query — into Repo.all/1.
  query =
    from p in Post,
      where: like(p.title, ^"%#{term}%") # => Parameter binding
  # => Generates: SELECT * FROM posts WHERE title LIKE $1, binds ["%term%"]
  # NOTE(review): `%`/`_` inside `term` still act as LIKE wildcards —
  # escape them if literal matching is required.

  Repo.all(query)
end
# Rate limiting (prevent brute force)
defmodule MyAppWeb.Plugs.RateLimitLogin do # => Plug: brute-force protection
  @max_attempts 5 # => Allowed login attempts per window
  @window_seconds 300 # => 5-minute counting window

  def init(opts), do: opts # => Compile-time no-op

  # Counts login attempts per client IP; responds 429 once the limit is hit.
  # NOTE(review): get/set/incr are separate cache calls, so two concurrent
  # requests can both pass the check — acceptable for login throttling, but
  # use an atomic increment if exact limits matter.
  def call(conn, _opts) do
    key = "login_attempts:#{client_ip(conn)}" # => Cache key per IP address

    case MyApp.Cache.get(key) do
      nil -> # => First attempt from this IP in the window
        MyApp.Cache.set(key, 1, ttl: @window_seconds)
        conn # => Allow request

      attempts when attempts < @max_attempts -> # => Under limit
        MyApp.Cache.incr(key) # => Increment counter
        conn # => Allow request

      _ -> # => Limit reached: block before the controller runs
        conn
        |> Plug.Conn.put_status(:too_many_requests) # => 429
        |> Phoenix.Controller.json(%{error: "Too many login attempts"})
        |> Plug.Conn.halt() # => Prevents calling the controller action
    end
  end

  # Formats remote_ip for use as a cache key. :inet.ntoa/1 handles both
  # IPv4 and IPv6 tuples — the previous Tuple.to_list/Enum.join("."),
  # version produced malformed keys for IPv6 clients (8-tuples).
  defp client_ip(conn) do
    conn.remote_ip |> :inet.ntoa() |> to_string()
    # => {127, 0, 0, 1} → "127.0.0.1"; IPv6 tuples → "::1"-style strings
  end
end
# Secure password hashing (automatic with phx.gen.auth)
defmodule MyApp.Accounts.User do # => User schema module
  # NOTE(review): this excerpt omits `use Ecto.Schema`,
  # `import Ecto.Changeset`, and the validate_email/1 and
  # validate_password/1 helpers — they must exist elsewhere for this
  # module to compile.

  # Builds the changeset for new user registration: casts credentials,
  # validates them, and replaces the plaintext password with a hash.
  def registration_changeset(user, attrs) do
    user
    |> cast(attrs, [:email, :password]) # => Extract email and password from params
    |> validate_email() # => Check email format and uniqueness
    |> validate_password() # => Check password length and complexity
    |> hash_password() # => Bcrypt hash with per-password salt
  end

  # Replaces the plaintext :password change with a bcrypt :password_hash.
  # Skipped when no password was supplied or earlier validations failed
  # (avoids spending an expensive bcrypt round on a doomed changeset).
  defp hash_password(changeset) do
    password = get_change(changeset, :password) # => nil if password unchanged
    if password && changeset.valid? do # => Only hash when present AND valid
      changeset
      |> put_change(:password_hash, Bcrypt.hash_pwd_salt(password))
      # => Bcrypt embeds a unique random salt in every hash
      |> delete_change(:password) # => Never keep the plaintext around
    else
      changeset # => Return unchanged
    end
  end
end
Key Takeaway: Phoenix provides automatic CSRF tokens and secure headers; add CSP/HSTS via plugs, sanitize user HTML with HtmlSanitizeEx, and use Ecto parameterized queries exclusively to prevent SQL injection.
Why It Matters: Security requires layered defenses because no single mechanism stops all attacks. CSP headers block XSS injection, CSRF tokens prevent cross-site request forgery, HTML sanitization strips malicious input from user-generated content, bcrypt makes stolen password hashes computationally expensive to crack, and rate limiting stops credential stuffing. Each layer handles a different attack vector — missing any one leaves a gap. Phoenix provides built-in support for most layers; understanding all of them prevents the common mistake of deploying with only one or two active.
Example 77: WebSocket Connection Pooling
Optimize WebSocket connections with connection pooling and load distribution.
# Channel with connection limits
# WebSocket entry point: authenticates connections and enforces a
# per-user concurrent-connection cap.
defmodule MyAppWeb.UserSocket do
  use Phoenix.Socket

  @max_connections_per_user 5 # => Shared across all tabs/devices

  channel "room:*", MyAppWeb.RoomChannel # => room:* topics → RoomChannel
  channel "user:*", MyAppWeb.UserChannel # => user:* topics → UserChannel

  # Called once per WebSocket connect. Rejects bad tokens and users that
  # already hold the maximum number of live connections.
  def connect(%{"token" => token}, socket, _connect_info) do
    with {:ok, user_id} <- verify_token(token),
         :ok <- check_connection_limit(user_id) do
      # user_id is now available to every channel joined on this socket
      {:ok, assign(socket, :user_id, user_id)}
    else
      # Invalid token OR {:error, :limit_exceeded} — client sees a refusal
      _reason -> :error
    end
  end

  # Unique socket id; lets the server force-disconnect all of a user's
  # sockets at once.
  def id(socket), do: "user_socket:#{socket.assigns.user_id}"

  # Counts this user's tracked presences; :ok while under the cap.
  defp check_connection_limit(user_id) do
    active =
      MyApp.Presence
      |> Phoenix.Tracker.list("user:#{user_id}")
      |> Enum.count()

    if active < @max_connections_per_user do
      :ok
    else
      {:error, :limit_exceeded} # => User has too many live connections
    end
  end
end
# Connection pooling for external services
# HTTP client for api.example.com that reuses pooled connections instead
# of opening a new TCP/TLS connection per request.
defmodule MyApp.External.APIClient do
  use GenServer

  @pool_name :api_pool
  @base_url "https://api.example.com"

  # Called by the supervisor; registered under the module name.
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(_opts) do
    # NOTE(review): the original matched {:ok, pool} on
    # :hackney_pool.child_spec/2 — that function returns a supervisor
    # child spec, not an {:ok, _} tuple, so the match always failed.
    # Starting the pool directly is correct from init/1; alternatively,
    # place the child_spec under the application supervision tree.
    :ok =
      :hackney_pool.start_pool(@pool_name,
        timeout: 15_000, # => Max 15s to check out / establish a connection
        max_connections: 100 # => Upper bound on concurrent pooled sockets
      )

    {:ok, %{pool: @pool_name}}
  end

  # GET <base_url><path>. Returns the decoded JSON body on 200,
  # {:error, {:http_error, status}} on other statuses, or
  # {:error, reason} on transport failures (e.g. :timeout).
  def get(path) do
    url = @base_url <> path # => E.g. "/users" → "https://api.example.com/users"
    headers = [{"Authorization", "Bearer #{api_token()}"}]
    # NOTE(review): api_token/0 is not defined in this excerpt — it is
    # expected to read the token from config/env.

    case :hackney.get(url, headers, "", pool: @pool_name) do
      {:ok, 200, _headers, client_ref} ->
        {:ok, body} = :hackney.body(client_ref) # => Drain the response body
        Jason.decode(body) # => {:ok, map} | {:error, %Jason.DecodeError{}}

      {:ok, status, _headers, client_ref} ->
        # Drain the body so the socket can be returned to the pool for reuse.
        :hackney.skip_body(client_ref)
        {:error, {:http_error, status}} # => E.g. {:error, {:http_error, 404}}

      {:error, reason} ->
        {:error, reason} # => E.g. :timeout, :econnrefused
    end
  end
end
# Distribute WebSocket connections across nodes
defmodule MyAppWeb.Presence do
  use Phoenix.Presence,
    otp_app: :my_app,
    pubsub_server: MyApp.PubSub

  @topic "connections" # => Cluster-wide topic for connection tracking

  # Registers this socket on the shared connections topic, tagged with
  # the node that accepted it and the join timestamp.
  def track_connection(socket) do
    metadata = %{
      node: node(), # => Which Erlang node accepted the connection
      joined_at: System.system_time(:second)
    }

    track(socket, @topic, socket.id, metadata)
  end

  # Total tracked WebSocket connections across every node in the cluster.
  def connection_count do
    @topic |> list() |> Enum.count()
  end

  # Map of node name => connection count, e.g. %{:"a@host" => 3}.
  def connections_by_node do
    @topic
    |> list()
    |> Enum.frequencies_by(fn {_id, %{metas: [meta | _]}} -> meta.node end)
  end
end
Key Takeaway: Enforce per-user connection limits in connect/3, use Hackney connection pooling for HTTP clients, and track cluster-wide WebSocket counts via Presence to prevent connection abuse at scale.
Why It Matters: Limiting WebSocket connections per user prevents a single browser session with many open tabs from exhausting server file descriptor limits and memory. Hackney connection pooling reuses HTTP connections to external services, avoiding TCP handshake overhead on each API call—critical for endpoints that call external services synchronously. Both patterns protect server capacity as concurrent user numbers grow, ensuring one heavy user cannot degrade service quality for others.
Example 78: GraphQL API with Absinthe
Build GraphQL API for flexible client queries.
# mix.exs
defp deps do
  # Only the GraphQL-related deps are shown; merge into the full list.
  [
    {:absinthe, "~> 1.7"}, # => GraphQL schema/execution engine
    {:absinthe_plug, "~> 1.5"} # => Plug/Phoenix integration (HTTP transport)
  ]
end
# GraphQL schema
defmodule MyAppWeb.Schema do
  use Absinthe.Schema # => Absinthe schema DSL (query/mutation/subscription macros)

  import_types MyAppWeb.Schema.ContentTypes # => Pulls in :post, :user, :comment types

  # Root query type: read-only entry points.
  query do
    @desc "Get all posts"
    field :posts, list_of(:post) do
      arg :limit, :integer, default_value: 10 # => Optional page size
      arg :offset, :integer, default_value: 0 # => Optional page start
      resolve &MyAppWeb.Resolvers.Content.list_posts/3
    end

    @desc "Get post by ID"
    field :post, :post do
      arg :id, non_null(:id) # => Required ID
      resolve &MyAppWeb.Resolvers.Content.get_post/3
    end

    @desc "Search posts"
    field :search_posts, list_of(:post) do
      arg :query, non_null(:string) # => Required search string
      resolve &MyAppWeb.Resolvers.Content.search_posts/3
      # NOTE(review): search_posts/3 is not among the resolver functions
      # shown in this file — confirm it exists in MyAppWeb.Resolvers.Content.
    end
  end

  # Root mutation type: write entry points.
  mutation do
    @desc "Create a post"
    field :create_post, :post do
      arg :title, non_null(:string) # => Required: mutation fails if omitted
      arg :body, non_null(:string) # => Required
      arg :tags, list_of(:string) # => Optional, nil when not provided
      resolve &MyAppWeb.Resolvers.Content.create_post/3
      # => Resolver handles authorization + persistence
    end

    @desc "Update a post"
    field :update_post, :post do
      arg :id, non_null(:id) # => Required post ID to update
      arg :title, :string # => Optional (PATCH semantics)
      arg :body, :string # => Optional: only updated if provided
      resolve &MyAppWeb.Resolvers.Content.update_post/3
      # NOTE(review): update_post/3 resolver is not shown in this file.
    end
  end

  # Subscriptions: pushed to clients over the Absinthe WebSocket.
  subscription do
    @desc "Subscribe to new posts"
    field :post_created, :post do
      config fn _args, _context ->
        # All post_created subscribers share one topic
        {:ok, topic: "posts"}
      end

      trigger :create_post, topic: fn _post ->
        # Broadcast to every client on the "posts" topic after create_post
        ["posts"]
      end
    end
  end
end
# Object types
defmodule MyAppWeb.Schema.ContentTypes do
  use Absinthe.Schema.Notation # => object/field/arg macros for type definitions

  object :post do
    field :id, :id # => Post ID
    field :title, :string
    field :body, :string
    field :inserted_at, :string
    field :author, :user, resolve: &get_author/3 # => Nested resolver (runs only if requested)
    field :comments, list_of(:comment) do
      resolve &get_comments/3 # => Loads comments on demand
    end
  end

  object :user do
    field :id, :id # => User ID
    field :name, :string # => Display name
    field :email, :string # => NOTE(review): consider restricting to authorized viewers
  end

  object :comment do
    field :id, :id # => Comment ID
    field :body, :string # => Comment text
    field :author, :user # => Nested :user type
  end

  # NOTE(review): these per-post resolvers issue one query per parent —
  # the classic N+1 pattern. For list queries, prefer Absinthe's
  # dataloader integration to batch author/comment loads.
  defp get_author(post, _args, _resolution) do
    {:ok, MyApp.Accounts.get_user!(post.author_id)} # => One query per post
  end

  defp get_comments(post, _args, _resolution) do
    comments = MyApp.Blog.list_comments(post.id)
    {:ok, comments} # => One query per post
  end
end
# Resolvers
defmodule MyAppWeb.Resolvers.Content do
  # Resolver contract: (parent, args, resolution) -> {:ok, _} | {:error, _}

  def list_posts(_parent, args, _resolution) do
    # args carries :limit/:offset (defaults applied by the schema)
    {:ok, MyApp.Blog.list_posts(args)}
  end

  def get_post(_parent, %{id: id}, _resolution) do
    case MyApp.Blog.get_post(id) do
      nil -> {:error, "Post not found"} # => Surfaces in the GraphQL "errors" field
      post -> {:ok, post}
    end
  end

  # Authenticated path: context.current_user is set by the auth plug.
  def create_post(_parent, args, %{context: %{current_user: user}}) do
    # author_id comes from the session, never from client args
    case MyApp.Blog.create_post(Map.put(args, :author_id, user.id)) do
      {:ok, post} ->
        Absinthe.Subscription.publish(
          MyAppWeb.Endpoint, # => Reaches all subscribed WebSocket clients
          post, # => Subscription payload (the new post)
          post_created: "posts" # => Must match the topic configured in the schema
        )

        {:ok, post}

      {:error, changeset} ->
        {:error, changeset} # => Absinthe formats changeset errors automatically
    end
  end

  # Fallback: no authenticated user in the context. The original had no
  # such clause, so anonymous requests crashed with FunctionClauseError
  # (a 500) instead of returning a proper GraphQL error.
  def create_post(_parent, _args, _resolution) do
    {:error, "unauthorized"}
  end
end
# Router
defmodule MyAppWeb.Router do # => Phoenix router exposing the GraphQL endpoints
  use Phoenix.Router # => Required for scope/pipe_through/forward macros
  # NOTE(review): without `use Phoenix.Router` this module does not compile.

  scope "/api" do # => All routes prefixed with /api
    pipe_through :api # => JSON pipeline: no session/CSRF

    # Main GraphQL endpoint (queries, mutations, subscriptions)
    forward "/graphql", Absinthe.Plug,
      schema: MyAppWeb.Schema

    # Browser-based IDE with interactive editor + schema docs.
    # Enable in dev/staging only — e.g. guard with a compile-time flag.
    forward "/graphiql", Absinthe.Plug.GraphiQL,
      schema: MyAppWeb.Schema,
      interface: :playground
  end
end
# Example queries:
# query { # => GraphQL query operation
# posts(limit: 5) { # => Fetches up to 5 posts
# id # => Request post ID
# title # => Request post title
# author { name } # => Nested query: author's name
# comments { body } # => Nested query: all comment bodies
# } # => Client chooses exact fields needed
# } # => No over-fetching or under-fetching
#
# mutation { # => GraphQL mutation operation
# createPost(title: "Hello", body: "World") { # => Creates new post with arguments
# id # => Returns created post ID
# title # => Returns created post title
# } # => Client specifies return fields
# } # => Mutation triggers subscription
Key Takeaway: Absinthe defines GraphQL schemas with queries, mutations, and subscriptions in Elixir; resolvers load data on demand and clients request only the fields they need, reducing over-fetching.
Why It Matters: GraphQL lets clients request exactly the fields they need, eliminating over-fetching (receiving unused data) and under-fetching (requiring multiple round trips). Absinthe compiles the schema at startup, catches type errors at compile time, and maps queries to resolver functions that load data from your existing Ecto contexts. Subscriptions via PubSub push data to clients over the existing WebSocket without polling. For frontends with diverse data needs — mobile, web, third-party — a single GraphQL endpoint replaces multiple REST endpoints.
Example 79: Event Sourcing Pattern
Implement event sourcing to store state changes as immutable events.
# Event schema
defmodule MyApp.Events.Event do
  use Ecto.Schema

  # Append-only row: one immutable domain event per record.
  schema "events" do
    field :aggregate_id, :string # => Entity ID (e.g., order_id)
    field :aggregate_type, :string # => Entity type (e.g., "Order")
    field :event_type, :string # => Event name (e.g., "OrderCreated")
    field :payload, :map # => Event data (stored as JSONB)
    field :version, :integer # => Per-aggregate sequence number
    field :inserted_at, :utc_datetime # => Set explicitly by the event store
  end
end
# Event store
defmodule MyApp.EventStore do # => Append-only event log
  # Single source of truth for all state changes.
  import Ecto.Query # => Required for the from/2 macro below
  # NOTE(review): the original omitted this import, so from/2 was undefined.

  alias MyApp.Events.Event # => Event Ecto schema (events table)
  alias MyApp.Repo # => Database repository

  # Appends one immutable event for the given aggregate and broadcasts it
  # on "events:<aggregate_type>" for projections.
  # NOTE(review): read-version-then-insert is racy under concurrent
  # writers; a unique index on (aggregate_id, version) is required for
  # the optimistic-locking behavior described here.
  def append_event(aggregate_id, aggregate_type, event_type, payload) do
    event = %Event{
      aggregate_id: aggregate_id, # => E.g., "order-123"
      aggregate_type: aggregate_type, # => E.g., "Order" (also scopes the PubSub topic)
      event_type: event_type, # => E.g., "OrderCreated", "ItemAdded"
      payload: payload, # => Arbitrary map, persisted as JSONB
      version: get_current_version(aggregate_id) + 1, # => Monotonic per aggregate
      inserted_at: DateTime.utc_now() # => Immutable once written
    }

    case Repo.insert(event) do
      {:ok, event} ->
        # Fan out to projection processes subscribed to this aggregate type.
        Phoenix.PubSub.broadcast(
          MyApp.PubSub,
          "events:#{aggregate_type}", # => E.g., "events:Order"
          {:event_appended, event} # => Matched in subscribers' handle_info/2
        )

        {:ok, event}

      {:error, changeset} ->
        {:error, changeset} # => Validation/constraint failure
    end
  end

  # Full event history for an aggregate, oldest first (replay order).
  def get_events(aggregate_id) do
    # NOTE(review): the original piped `|> Repo.all()` straight after the
    # keyword query; `|>` binds tighter than the order_by: keyword, so it
    # piped the keyword list — not the query — into Repo.all/1.
    query =
      from e in Event,
        where: e.aggregate_id == ^aggregate_id,
        order_by: [asc: e.version] # => Must replay in version order

    Repo.all(query)
  end

  # Highest version stored for the aggregate; 0 when it has no events yet
  # (so the first event gets version 1).
  defp get_current_version(aggregate_id) do
    query =
      from e in Event,
        where: e.aggregate_id == ^aggregate_id,
        select: max(e.version) # => SELECT MAX(version) ...

    Repo.one(query) || 0
  end
end
# Order aggregate
# Order aggregate: current state is derived by replaying events — it is
# never stored directly.
defmodule MyApp.Orders.Order do
  alias MyApp.Events.Event
  # NOTE(review): the original pattern-matched %Event{} without this
  # alias, which only works if an identically-named module is in scope.

  defstruct id: nil, # => Order ID
            items: [], # => Order items, most recently added first
            status: :draft, # => :draft | :placed | :shipped
            total: 0, # => Running order total
            tracking_number: nil, # => Set by OrderShipped
            # NOTE(review): this key was missing from the original
            # defstruct, so applying an OrderShipped event raised KeyError.
            version: 0 # => Event version counter (not advanced during replay)

  # Apply one event to the state — one clause per event type.
  def apply_event(order, %Event{event_type: "OrderCreated", payload: payload}) do
    %{order | id: payload["id"], items: [], status: :draft, total: 0}
  end

  def apply_event(order, %Event{event_type: "ItemAdded", payload: payload}) do
    item = payload["item"] # => E.g., %{"name" => "Widget", "price" => 10}

    %{order |
      items: [item | order.items], # => Prepend (O(1)); list is newest-first
      total: order.total + item["price"] # => E.g., 20 + 10 = 30
    }
  end

  def apply_event(order, %Event{event_type: "OrderPlaced"}) do
    %{order | status: :placed} # => Order can no longer be modified
  end

  def apply_event(order, %Event{event_type: "OrderShipped", payload: payload}) do
    %{order |
      status: :shipped, # => Terminal status
      tracking_number: payload["tracking_number"] # => E.g., "TRACK123456"
    }
  end

  # Rebuilds current state by replaying all events in order.
  # E.g., [OrderCreated, ItemAdded, ItemAdded, OrderPlaced] →
  # %Order{status: :placed, total: ..., items: [...]}
  def from_events(events) do
    Enum.reduce(events, %__MODULE__{}, &apply_event(&2, &1))
  end
end
# Command handler
defmodule MyApp.Orders.Commands do
  # Command handlers: each command rebuilds the aggregate from its event
  # stream, validates the requested change, and appends a new event on
  # success (or returns an {:error, reason} tuple on rejection).
  alias MyApp.EventStore
  alias MyApp.Orders.Order

  # Starts a new order stream by appending the initial OrderCreated event.
  def create_order(order_id) do
    EventStore.append_event(order_id, "Order", "OrderCreated", %{"id" => order_id})
  end

  # Appends an ItemAdded event, but only while the order is still a draft.
  def add_item(order_id, item) do
    case current_state(order_id).status do
      :draft ->
        EventStore.append_event(order_id, "Order", "ItemAdded", %{"item" => item})

      _placed_or_shipped ->
        # Command rejected; nothing is written to the event store.
        {:error, "Cannot add items to placed order"}
    end
  end

  # Places the order; empty orders (total of 0) are rejected.
  def place_order(order_id) do
    case current_state(order_id) do
      %Order{total: total} when total > 0 ->
        # Placement carries no extra data — the event itself is the fact.
        EventStore.append_event(order_id, "Order", "OrderPlaced", %{})

      _empty ->
        {:error, "Order must have items"}
    end
  end

  # Loads the full event history and replays it into the current aggregate.
  defp current_state(order_id) do
    order_id
    |> EventStore.get_events()
    |> Order.from_events()
  end
end
# Read model projection (materialized view)
defmodule MyApp.Orders.Projection do
  # Read-model projector: subscribes to Order events over Phoenix.PubSub
  # and keeps a query-optimized snapshot table up to date. Projection state
  # lives in the database, so the GenServer itself holds no data.
  use GenServer

  # BUG FIX: this module called EventStore and Order unqualified; without
  # aliases those resolve to nonexistent top-level modules.
  alias MyApp.EventStore
  alias MyApp.Orders.Order

  # OTP-compatible entry point, called by the supervisor.
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @impl true
  def init(_) do
    # Subscribe before handling anything; broadcasts arrive as
    # {:event_appended, event} messages in handle_info/2.
    Phoenix.PubSub.subscribe(MyApp.PubSub, "events:Order")
    {:ok, %{}}
  end

  @impl true
  def handle_info({:event_appended, event}, state) do
    update_read_model(event)
    {:noreply, state}
  end

  defp update_read_model(%{event_type: "OrderPlaced", aggregate_id: order_id}) do
    # Rebuild the aggregate from its full history and snapshot it so
    # queries never need to replay events.
    events = EventStore.get_events(order_id)
    order = Order.from_events(events)

    # NOTE(review): Repo.insert_or_update/1 expects an Ecto.Changeset, not
    # a bare struct — confirm against the OrderSnapshot schema; wrapping
    # this in a changeset may be required.
    MyApp.Repo.insert_or_update(%MyApp.Orders.OrderSnapshot{
      id: order_id,        # primary key: aggregate ID
      items: order.items,  # serialized item list for fast reads
      total: order.total,  # precomputed from event replay
      status: order.status
    })
  end

  # BUG FIX: without this catch-all clause, every event other than
  # OrderPlaced raised FunctionClauseError and crashed the projector.
  defp update_read_model(_event), do: :ok
end

Key Takeaway: Event sourcing stores immutable domain events as the source of truth; state is derived by replaying events through projections, providing a complete audit trail and enabling time-travel debugging.
Why It Matters: Event sourcing stores every state change as an immutable event rather than overwriting rows, providing a complete audit trail and the ability to replay history to reconstruct past states — useful for debugging, compliance, and retroactive analytics. Projections rebuild read models from events, separating query-optimized views from the event log. The tradeoff is significant complexity: eventual consistency between write and read models, and aggregate rebuilding latency for large event streams that require snapshotting.
Example 80: Advanced Testing Strategies
Implement property-based testing and contract testing.
# Property-based testing with StreamData
defmodule MyApp.BlogTest do
  # Property-based tests: instead of a few hand-picked examples, assert an
  # invariant over many (default 100) randomly generated inputs.
  use ExUnit.Case
  use ExUnitProperties

  property "post title is always trimmed" do
    # Generator produces random alphanumeric titles of length 1..100.
    check all raw_title <- string(:alphanumeric, min_length: 1, max_length: 100) do
      # Surround the generated title with whitespace to exercise trimming.
      changeset = Post.changeset(%Post{}, %{"title" => " #{raw_title} "})

      # Only valid changesets are expected to carry a normalized title.
      if changeset.valid? do
        stored = Ecto.Changeset.get_change(changeset, :title)
        # Invariant: whatever the input, the stored title is the trimmed one.
        assert String.trim(raw_title) == stored
      end
    end
  end

  property "creating and deleting post is idempotent" do
    # Printable strings include unicode and special characters.
    check all generated_title <- string(:printable) do
      # Round-trip: create a post, delete it, then confirm it is gone.
      {:ok, created} = MyApp.Blog.create_post(%{title: generated_title, body: "test"})
      {:ok, _} = MyApp.Blog.delete_post(created.id)

      # Invariant: create followed by delete leaves no trace.
      assert MyApp.Blog.get_post(created.id) == nil
    end
  end
end
# Contract testing for APIs
defmodule MyApp.API.ContractTest do
  # Contract tests: a declarative map of expected request/response shapes
  # per endpoint, verified against the running API.
  use ExUnit.Case

  @api_contract %{
    "POST /api/posts" => %{
      request: %{
        "title" => "string",
        "body" => "string"
      },
      response: %{
        status: 201,
        body: %{
          "id" => "integer",
          "title" => "string",
          "body" => "string",
          "inserted_at" => "string"
        }
      }
    },
    "GET /api/posts/:id" => %{
      response: %{
        status: 200,
        body: %{
          "id" => "integer",
          "title" => "string"
        }
      }
    }
  }

  test "API contract compliance" do
    # Every declared endpoint must have a matching verify_contract/2 clause.
    for {endpoint, contract} <- @api_contract do
      verify_contract(endpoint, contract)
    end
  end

  defp verify_contract("POST /api/posts", contract) do
    conn =
      build_conn()
      |> post("/api/posts", contract.request)

    assert conn.status == contract.response.status
    assert_body_shape(Jason.decode!(conn.resp_body), contract.response.body)
  end

  # BUG FIX: the contract map declares "GET /api/posts/:id" but no clause
  # existed for it, so the compliance test crashed with FunctionClauseError.
  # NOTE(review): assumes a post with id 1 exists (seeded fixture) —
  # confirm against the test setup.
  defp verify_contract("GET /api/posts/:id", contract) do
    conn =
      build_conn()
      |> get("/api/posts/1")

    assert conn.status == contract.response.status
    assert_body_shape(Jason.decode!(conn.resp_body), contract.response.body)
  end

  # Asserts every contracted field is present and has the declared type.
  defp assert_body_shape(body, spec) do
    for {field, type} <- spec do
      assert Map.has_key?(body, field)
      assert type_matches?(body[field], type)
    end
  end

  defp type_matches?(value, "string"), do: is_binary(value)
  defp type_matches?(value, "integer"), do: is_integer(value)
end
# Integration tests with setup
defmodule MyAppWeb.PostLiveTest do
  # LiveView integration tests: mount pages, simulate user interaction,
  # and assert on the rendered output. async: true allows this module to
  # run in parallel with other test modules.
  use MyAppWeb.ConnCase, async: true
  import Phoenix.LiveViewTest

  setup do
    # Seed one author with three posts; each test receives them in its
    # context map as %{user: ..., posts: ...}.
    author = insert(:user)
    seeded = insert_list(3, :post, author: author)
    {:ok, user: author, posts: seeded}
  end

  test "displays posts", %{conn: conn, posts: posts} do
    {:ok, view, html} = live(conn, "/posts")

    # Every seeded post title must appear in the initial render.
    Enum.each(posts, fn post ->
      assert html =~ post.title
    end)

    # Clicking "Load More" (phx-click) should append another page.
    view
    |> element("button", "Load More")
    |> render_click()

    # Three seeded + three loaded = six rendered posts.
    assert has_element?(view, "[data-role=post]", count: 6)
  end

  test "creates post", %{conn: conn} do
    {:ok, view, _html} = live(conn, "/posts/new")

    # Empty fields submitted through phx-change must surface validation
    # errors in the markup.
    view
    |> form("#post-form", post: %{title: "", body: ""})
    |> render_change()

    assert has_element?(view, ".error")

    # A valid phx-submit creates the post and redirects to its show page.
    view
    |> form("#post-form", post: %{title: "Test", body: "Content"})
    |> render_submit()

    assert_redirected(view, "/posts/1")
  end
end
# Performance testing
defmodule MyApp.PerformanceTest do
  # Performance regression tests; run selectively with
  # `mix test --only performance`.
  use ExUnit.Case

  @tag :performance
  test "list posts performs under 100ms" do
    # 100 posts: enough volume for a slow query to show up.
    insert_list(100, :post)

    # :timer.tc/1 returns {elapsed_microseconds, result}.
    {time_us, _result} = :timer.tc(fn -> MyApp.Blog.list_posts() end)

    time_ms = time_us / 1000
    assert time_ms < 100, "Query took #{time_ms}ms, expected < 100ms"
  end

  @tag :performance
  test "N+1 query prevention" do
    # 10 posts x 5 comments each: the classic N+1 setup.
    posts = insert_list(10, :post)

    Enum.each(posts, fn post ->
      insert_list(5, :comment, post: post)
    end)

    query_count =
      count_queries(fn ->
        MyApp.Blog.list_posts_with_comments()
      end)

    # Preloading should use exactly 2 queries (posts + comments);
    # 11 queries (1 + one per post) would mean an N+1 regression.
    assert query_count == 2, "Expected 2 queries, got #{query_count}"
  end

  # Counts Ecto queries executed while `fun` runs, via a temporary
  # telemetry handler attached to [:my_app, :repo, :query].
  defp count_queries(fun) do
    ref = make_ref()
    # BUG FIX: interpolating a Reference into a string raises
    # Protocol.UndefinedError (String.Chars is not implemented for
    # references) — build the handler id with inspect/1 instead.
    handler_id = "query-counter-#{inspect(ref)}"
    test_pid = self()

    :telemetry.attach(
      handler_id,
      [:my_app, :repo, :query],
      # BUG FIX: a telemetry handler's 4th argument is the static config
      # given to attach/4 — it is NOT a running accumulator, so the old
      # `acc + 1` always sent {:query, 1} no matter how many queries ran.
      # Instead, send one ref-tagged message per query and count them in
      # the test process below.
      fn _event, _measurements, _metadata, _config ->
        send(test_pid, {:query, ref})
      end,
      nil
    )

    fun.()
    count = receive_queries(ref, 0)
    :telemetry.detach(handler_id)
    count
  end

  # Drains {:query, ref} messages, returning the total once the mailbox
  # has been quiet for 100ms. The ref pin keeps stray messages from other
  # tests out of the count.
  defp receive_queries(ref, count) do
    receive do
      {:query, ^ref} -> receive_queries(ref, count + 1)
    after
      100 -> count
    end
  end
end

Key Takeaway: Use property-based testing with StreamData for invariant validation, contract tests for API stability, and performance benchmarks (here measured with :timer.tc and telemetry query counting) to prevent regressions across all levels of the test pyramid.
Why It Matters: Property-based tests with StreamData find edge cases that example-based tests miss by generating hundreds of random inputs. Contract tests prevent API versioning regressions by verifying both producer and consumer agree on the interface. Together these testing strategies raise confidence that real-world usage patterns are handled correctly, not just the cases developers thought to test.
Last updated December 25, 2025