Advanced
Group 9: Database Advanced
Example 51: Transactions and Concurrency with Ecto.Multi
Execute multiple database operations atomically. If any fails, all rollback.
%% Ecto.Multi transaction flow
graph TD
A[Multi.new] --> B[Multi.update :debit]
B --> C[Multi.update :credit]
C --> D[Multi.insert :log]
D --> E[Repo.transaction]
E --> F{All succeed?}
F -->|Yes| G[Commit all changes]
F -->|No| H[Rollback everything]
G --> I["Return {:ok, results}"]
H --> J["Return {:error, failed_op}"]
style A fill:#0173B2,color:#fff
style E fill:#DE8F05,color:#fff
style G fill:#029E73,color:#fff
style H fill:#CA9161,color:#fff
defmodule MyApp.Transfers do # => Defines Transfers service module
# => Groups money transfer operations
def transfer_funds(from_account, to_account, amount) do # => Public function: transfer_funds/3
# => Params: from_account, to_account, amount
result = # => Binds result to Multi pipeline return
# => Will be {:ok, map} or {:error, ...} tuple
Ecto.Multi.new() # => Creates empty Multi struct
# => Multi accumulates operations to run in transaction
|> Ecto.Multi.update( # => Pipes Multi into update operation
:debit, # => Named operation key :debit
# => Can reference this result later as %{debit: ...}
Ecto.Changeset.change(from_account, balance: from_account.balance - amount)
# => Creates changeset subtracting amount from balance
# => E.g., balance: 1000 - 50 = 950
) # => Returns Multi with :debit operation added
# => Multi now has 1 operation queued
|> Ecto.Multi.update( # => Adds second update operation
:credit, # => Named operation key :credit
# => Runs after :debit; Multi executes operations sequentially, in order
Ecto.Changeset.change(to_account, balance: to_account.balance + amount)
# => Creates changeset adding amount to balance
# => E.g., balance: 500 + 50 = 550
) # => Returns Multi with :debit and :credit queued
# => Multi now has 2 operations
|> Ecto.Multi.insert(:transaction_log, %TransactionLog{
# => Adds insert operation for audit log
# => Named :transaction_log
from_id: from_account.id, # => Sets from_id field to source account ID
# => E.g., from_id: 123
to_id: to_account.id, # => Sets to_id field to destination account ID
# => E.g., to_id: 456
amount: amount # => Sets amount field to transferred amount
# => E.g., amount: 50
}) # => Returns Multi with 3 operations total
# => All operations ready to execute atomically
|> MyApp.Repo.transaction() # => Executes all operations in database transaction
# => Returns {:ok, %{debit: ..., credit: ..., transaction_log: ...}} on success
# => Returns {:error, failed_operation, changeset, changes_so_far} on failure
# => Database commits all or rolls back all
case result do # => Pattern match on transaction result
# => Handles success and failure cases
{:ok, %{debit: from, credit: to, transaction_log: log}} -> # => Success pattern match
# => Destructures map with all 3 results
# => from is updated from_account struct
# => to is updated to_account struct
# => log is inserted TransactionLog struct
{:ok, from, to, log} # => Returns success tuple with all results
# => Caller receives all updated structs
# => Database changes are committed permanently
{:error, :debit, changeset, _changes} -> # => Failure pattern: :debit operation failed
# => changeset contains validation errors
# => _changes is empty map (no operations succeeded yet)
{:error, "Debit failed", changeset} # => Returns error tuple with message
# => Transaction rolled back before :credit ran
# => Database unchanged (atomicity guaranteed)
{:error, failed_op, changeset, _changes} -> # => Catch-all failure pattern for any operation
# => failed_op is operation name (:credit or :transaction_log)
# => changeset has error details
{:error, "Operation failed: #{failed_op}", changeset} # => Returns error with operation name
# => E.g., "Operation failed: credit"
# => All changes discarded, database rolled back
end # => End case statement
end # => End transfer_funds/3 function
# Dependency between operations - sequential execution with references
def complex_transaction do # => Public function: complex_transaction/0
# => No parameters, demonstrates dependent operations
Ecto.Multi.new() # => Creates empty Multi struct
# => Starting point for dependent operation chain
|> Ecto.Multi.insert(:user, User.create_changeset(%{email: "user@example.com"}))
# => First operation: insert user
# => Named :user (required for later references)
# => Creates user with email "user@example.com"
# => Returns Multi with :user operation queued
|> Ecto.Multi.insert(:profile, fn %{user: user} -> # => Second operation: insert profile (DEPENDENT)
# => Anonymous function receives %{user: user} from previous operations
# => user is the inserted User struct from :user operation
# => Sequential dependency: profile needs user.id
Profile.create_changeset(user) # => Creates profile changeset for inserted user
# => Uses user struct to set foreign key
# => E.g., profile.user_id = user.id
end) # => Returns Multi with :user and :profile queued
# => Multi now has 2 operations, :profile depends on :user
|> Ecto.Multi.insert(:settings, fn %{user: user} -> # => Third operation: insert settings (DEPENDENT)
# => Also receives %{user: user} from accumulated results
# => user is same inserted User struct
Settings.default_changeset(user) # => Creates default settings for user
# => Uses user struct to set foreign key
# => E.g., settings.user_id = user.id
end) # => Returns Multi with all 3 operations queued
# => Execution order: :user, then :profile, then :settings
|> MyApp.Repo.transaction() # => Executes operations in order within transaction
# => If :user fails, :profile and :settings never run
# => If :profile fails, :user is rolled back, :settings never runs
# => Returns {:ok, %{user: ..., profile: ..., settings: ...}} on success
# => Returns {:error, operation_name, changeset, partial_results} on failure
end # => End complex_transaction/0 function
end # => End MyApp.Transfers module
Key Takeaway: Ecto.Multi ensures all-or-nothing execution. Operations reference previous results with fn. Rollback happens automatically on any failure. Perfect for transfers, account creation, multi-step operations.
Why It Matters: Money transfers and similar multi-step writes must never half-complete. Wrapping them in a database transaction via Ecto.Multi guarantees the debit, credit, and audit log either all commit or all roll back together.
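When a step needs arbitrary logic rather than a changeset (an external check, a computed decision), Ecto.Multi.run/3 slots into the same pipeline. A minimal sketch, assuming debit_changeset and credit_changeset are built as above; FraudCheck.approve/2 is a hypothetical service:
# Hypothetical fraud check between the debit and credit steps.
# The function receives the repo and the results of prior steps;
# {:ok, value} continues the pipeline, {:error, reason} rolls back.
Ecto.Multi.new()
|> Ecto.Multi.update(:debit, debit_changeset)
|> Ecto.Multi.run(:fraud_check, fn _repo, %{debit: debited_account} ->
  case FraudCheck.approve(debited_account, amount) do
    :approved -> {:ok, :approved}
    :denied -> {:error, :fraud_suspected}
  end
end)
|> Ecto.Multi.update(:credit, credit_changeset)
|> MyApp.Repo.transaction()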
Example 52: Database Constraints and Error Handling
Handle database constraint violations (unique, foreign key, etc.) gracefully in changesets.
%% Constraint violation handling
graph TD
A[Insert user] --> B[Database]
B --> C{Constraint violated?}
C -->|UNIQUE email| D[unique_constraint catches]
C -->|FK organization_id| E[assoc_constraint catches]
C -->|No violation| F[Insert succeeds]
D --> G[Return changeset with error]
E --> G
F --> H["Return {:ok, user}"]
style A fill:#0173B2,color:#fff
style C fill:#DE8F05,color:#fff
style F fill:#029E73,color:#fff
style G fill:#CA9161,color:#fff
defmodule MyApp.Accounts.User do # => Defines User schema module
# => Maps Elixir struct to "users" database table
use Ecto.Schema # => Brings in the schema/2 macro
import Ecto.Changeset # => Brings in cast/3 and the constraint helpers below
schema "users" do # => Defines schema for "users" table
# => Auto-generates struct with id, inserted_at, updated_at
field :email, :string # => Declares email field, type :string (VARCHAR in DB)
# => Will be in User struct as user.email
field :username, :string # => Declares username field, type :string
# => Will be in User struct as user.username
belongs_to :organization, MyApp.Accounts.Organization # => Adds organization_id foreign key
# => Required for assoc_constraint(:organization) below
end # => End schema definition
def registration_changeset(user, attrs) do # => Public function: registration_changeset/2
# => Params: user struct, attrs map
# => Returns changeset for user registration
user # => Starts with User struct (empty or existing)
# => E.g., %User{email: nil, username: nil}
|> cast(attrs, [:email, :username]) # => Casts attrs map to changeset with allowed fields
# => Only :email and :username are whitelisted
# => E.g., attrs = %{email: "alice@example.com", username: "alice"}
# => Returns changeset with changes: %{email: "alice@example.com", username: "alice"}
|> unique_constraint(:email) # => Adds constraint check for unique email
# => Does NOT query database yet (lazy)
# => If Repo.insert finds duplicate email, catches constraint error
# => Converts PostgreSQL UNIQUE violation to changeset error
# => Error: {:email, {"has already been taken", [constraint: :unique]}}
|> unique_constraint(:username) # => Adds constraint check for unique username
# => If Repo.insert finds duplicate username, catches error
# => Prevents "ERROR: duplicate key value violates unique constraint"
# => Converts to user-friendly changeset error
|> assoc_constraint(:organization) # => Validates foreign key :organization_id exists in organizations table
# => If organization_id points to non-existent record, catches FK violation
# => Converts "violates foreign key constraint" to changeset error
# => Error: {:organization, {"does not exist", [constraint: :assoc]}}
end # => Returns changeset with all constraints registered
# => Constraints only validated when Repo.insert/update called
end # => End MyApp.Accounts.User module
# In your service
defmodule MyApp.Accounts do # => Defines Accounts context module
# => Groups user registration and management operations
def register_user(attrs) do # => Public function: register_user/1
# => Param: attrs map with email, username
# => Returns {:ok, user} or {:error, message}
case %User{} # => Creates empty User struct
# => %User{id: nil, email: nil, username: nil}
|> User.registration_changeset(attrs) # => Builds changeset with validations
# => Adds constraints for email, username, organization
# => Returns changeset ready for insert
|> Repo.insert() do # => Attempts database INSERT
# => If successful: returns {:ok, %User{id: 123, email: "...", ...}}
# => If constraint violated: returns {:error, %Changeset{errors: [...]}}
# => Constraint checks happen at database level
{:ok, user} -> # => Pattern match on success case
# => user is persisted User struct with database ID
# => E.g., %User{id: 123, email: "alice@example.com", username: "alice"}
{:ok, user} # => Returns success tuple with user struct
# => Caller receives complete user record
{:error, %Ecto.Changeset{} = changeset} -> # => Pattern match on failure case
# => changeset contains validation/constraint errors
# => E.g., changeset.errors = [{:email, {"has already been taken", ...}}]
# Check for constraint violations
error_fields = Enum.map(changeset.errors, fn {field, {msg, _}} -> {field, msg} end)
# => Maps errors to {field, message} tuples
# => E.g., [{:email, "has already been taken"}, {:username, "..."}]
# => Extracts field name and error message, discards metadata
if Enum.any?(error_fields, fn {field, _} -> field == :email end) do
# => Checks if :email field has error
# => Returns true if duplicate email constraint violated
# => Returns false if email is valid
{:error, "Email already registered"} # => Returns specific error for duplicate email
# => User-friendly message for UI display
else # => All other errors (username duplicate, FK violation, etc.)
{:error, "Registration failed"} # => Returns generic error message
# => Could inspect changeset.errors for specific details
end # => End if statement
end # => End case statement
end # => End register_user/1 function
end # => End MyApp.Accounts module
Key Takeaway: unique_constraint/2 catches database uniqueness violations. assoc_constraint/2 catches foreign key errors. Changesets provide user-friendly error messages without SQL errors exposed.
Why It Matters: Changesets centralize validation logic and provide user-friendly error messages. This pattern ensures data integrity and improves user experience with clear feedback.
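To turn any changeset's errors into user-facing messages generically, instead of checking fields one by one as above, Ecto ships Ecto.Changeset.traverse_errors/2. A minimal helper sketch:
# Collapses errors into %{field => [messages]} and interpolates
# placeholders such as %{count} in the raw messages
def format_errors(changeset) do
  Ecto.Changeset.traverse_errors(changeset, fn {msg, opts} ->
    Enum.reduce(opts, msg, fn {key, value}, acc ->
      String.replace(acc, "%{#{key}}", to_string(value))
    end)
  end)
end
# format_errors(changeset) # => %{email: ["has already been taken"]}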
Example 53: Polymorphic Associations with many_to_many and join_through
Model flexible relationships where the same entity can have many different types of related entities.
%% Many-to-many through join table
erDiagram
POST ||--o{ POST_TAG : has
TAG ||--o{ POST_TAG : has
COMMENT ||--o{ COMMENT_TAG : has
TAG ||--o{ COMMENT_TAG : has
POST {
int id PK
string title
}
TAG {
int id PK
string name
}
POST_TAG {
int post_id FK
int tag_id FK
}
COMMENT {
int id PK
string body
}
COMMENT_TAG {
int comment_id FK
int tag_id FK
}
defmodule MyApp.Content.Post do # => Defines Post schema module
# => Maps to "posts" database table
use Ecto.Schema # => Brings in the schema/2 macro
schema "posts" do # => Defines schema for "posts" table
# => Auto-generates id, inserted_at, updated_at
field :title, :string # => Declares title field, type :string
# => Will be accessible as post.title
many_to_many(:tags, MyApp.Tagging.Tag, join_through: "post_tags", on_replace: :delete)
# => on_replace: :delete lets put_assoc/3 drop old join rows (see usage below)
# => Declares many-to-many relationship with Tag
# => join_through: "post_tags" - uses post_tags join table
# => Post can have many Tags, Tag can have many Posts
# => Accessible as post.tags (list of Tag structs)
many_to_many(:attachments, MyApp.Attachments.Attachment, join_through: "post_attachments")
# => Second many-to-many relationship with Attachment
# => Uses post_attachments join table
# => Demonstrates polymorphic-like pattern (Post has Tags AND Attachments)
end # => End schema definition
end # => End MyApp.Content.Post module
defmodule MyApp.Content.Comment do # => Defines Comment schema module
# => Maps to "comments" database table
use Ecto.Schema # => Brings in the schema/2 macro
schema "comments" do # => Defines schema for "comments" table
# => Auto-generates id, inserted_at, updated_at
field :body, :string # => Declares body field for comment text
# => Will be accessible as comment.body
many_to_many(:tags, MyApp.Tagging.Tag, join_through: "comment_tags")
# => Declares many-to-many relationship with SAME Tag module
# => Uses comment_tags join table (different from post_tags)
# => Comment can have many Tags, Tag can belong to Posts AND Comments
# => Polymorphic pattern: Tag shared across multiple entities
end # => End schema definition
end # => End MyApp.Content.Comment module
# Migration for join table
def change do # => Migration function to create join table
# => Runs when executing mix ecto.migrate
create table(:post_tags) do # => Creates post_tags join table
# => Auto-generates id, inserted_at, updated_at
add :post_id, references(:posts, on_delete: :delete_all)
# => Adds post_id foreign key column
# => references(:posts) - points to posts.id
# => on_delete: :delete_all - when post deleted, delete all post_tags rows
# => Ensures referential integrity (CASCADE DELETE)
add :tag_id, references(:tags, on_delete: :delete_all)
# => Adds tag_id foreign key column
# => references(:tags) - points to tags.id
# => on_delete: :delete_all - when tag deleted, delete all post_tags rows
# => Prevents orphaned join records
timestamps() # => Adds inserted_at and updated_at timestamp columns
# => Tracks when join record was created/updated
end # => End table creation
create unique_index(:post_tags, [:post_id, :tag_id])
# => Creates unique composite index on (post_id, tag_id)
# => Prevents duplicate Post-Tag associations
# => E.g., can't link same Post to same Tag twice
# => Also improves query performance for lookups
end # => End migration function
# Usage
post = Repo.get!(Post, post_id) # => Fetches the Post struct (post_id assumed bound)
# => Repo.preload/2 needs a struct or list of structs, not the module itself
|> Repo.preload(:tags) # => Loads associated tags from database
# => Executes JOIN query: SELECT tags.* FROM tags JOIN post_tags ON...
# => Returns post with post.tags populated (list of Tag structs)
# => E.g., post.tags = [%Tag{id: 1, name: "elixir"}, %Tag{id: 2, name: "web"}]
|> Ecto.Changeset.change() # => Creates changeset from preloaded post
# => Prepares post struct for updating associations
# => Returns changeset with no changes yet
|> Ecto.Changeset.put_assoc(:tags, tags) # => Replaces post's tags with new tags list
# => tags variable contains list of Tag structs
# => Ecto will DELETE old post_tags rows and INSERT new ones
# => E.g., tags = [%Tag{id: 3}, %Tag{id: 4}]
|> Repo.update() # => Persists association changes to database
# => Executes: DELETE FROM post_tags WHERE post_id = X
# => INSERT INTO post_tags (post_id, tag_id) VALUES (X, 3), (X, 4)
# => Returns {:ok, updated_post} with new associations
# Query posts with specific tag
posts = from p in Post, # => Starts Ecto query for Post table
# => p is alias for posts table
join: t in assoc(p, :tags), # => INNER JOIN with tags through post_tags join table
# => assoc(p, :tags) generates: JOIN post_tags ON post_tags.post_id = p.id
# => JOIN tags ON tags.id = post_tags.tag_id
# => t is alias for tags table
where: t.name == "featured" # => Filters for tag named "featured"
# => WHERE tags.name = 'featured'
# => Matches the Tag schema's name field (the ER diagram defines no slug)
# => Returns only posts that have the "featured" tag
posts = Repo.all(posts) # => Executes the query
# => Result: list of Post structs [%Post{...}, ...]
Key Takeaway: many_to_many/3 with join_through creates flexible relationships. Use put_assoc/3 to update related records. Query across relationships with join.
Why It Matters: A single Tag schema can serve posts, comments, and more through separate join tables, avoiding true polymorphic columns that the database cannot enforce. Each relationship stays foreign-key-checked and index-backed.
Example 54: Multi-Tenancy with Ecto Query Prefix
Isolate tenant data at the query level. Each query automatically scopes to tenant.
defmodule MyApp.Accounts do # => Defines Accounts context for multi-tenancy
# => Groups tenant-scoped user operations
def get_user(user_id, tenant_id) do # => Public function: get_user/2
# => Params: user_id, tenant_id for isolation
from(u in User, where: u.id == ^user_id and u.tenant_id == ^tenant_id)
# => Builds query filtering by BOTH user_id AND tenant_id
# => Prevents cross-tenant data leakage
# => WHERE users.id = ? AND users.tenant_id = ?
|> Repo.one() # => Executes query, returns single user or nil
# => Result: %User{} if found, nil if not found or wrong tenant
end # => End get_user/2 function
def create_user(attrs, tenant_id) do # => Public function: create_user/2
# => Params: attrs map, tenant_id for isolation
%User{} # => Creates empty User struct
# => %User{id: nil, tenant_id: nil, ...}
|> User.changeset(attrs |> Map.put("tenant_id", tenant_id))
# => Adds tenant_id to attrs before validation
# => E.g., %{"name" => "Alice", "tenant_id" => 123}
# => Ensures user always belongs to correct tenant
|> Repo.insert() # => Inserts user into database
# => Returns {:ok, %User{}} or {:error, changeset}
end # => End create_user/2 function
end # => End MyApp.Accounts module
# Or use query prefixes for schema-per-tenant
defmodule MyApp.TenantPrefix do # => Alternative approach: schema-level isolation
# => Each tenant gets its own PostgreSQL schema
import Ecto.Query # => Brings in put_query_prefix/2
def scope_to_schema(query, tenant_id) do # => Public function: scope_to_schema/2
# => Binds a query to the tenant's schema
put_query_prefix(query, "tenant_#{tenant_id}") # => Sets the query prefix
# => All generated SQL references the tenant schema
# => E.g., tenant_id=123 → SELECT * FROM tenant_123.users
end # => End scope_to_schema/2 function
end # => End MyApp.TenantPrefix module
# Query scoped to tenant
User # => Starts with User schema
|> where([u], u.tenant_id == ^tenant_id) # => Filters by tenant_id column
# => WHERE users.tenant_id = ?
# => Ensures only this tenant's users returned
|> Repo.all() # => Executes query, returns list of users
# => Result: [%User{tenant_id: 123}, %User{tenant_id: 123}]
# => Never returns users from other tenants
# Or with dynamic query prefix
User # => Starts with User schema
|> Repo.all(prefix: "tenant_#{tenant_id}") # => Queries from tenant-specific schema
# => E.g., SELECT * FROM tenant_123.users
# => Complete schema isolation (tenant_123, tenant_456, etc.)
# => Each tenant has independent tables
Key Takeaway: Always filter by tenant_id in queries. Use scopes (functions that return queries) to prevent tenant leaks, as sketched below. Consider separate schemas per tenant for complete isolation.
Why It Matters: Tenant isolation is a security boundary, not a styling choice. One missed tenant_id filter can leak another customer's data, so centralizing the filter in a scope function keeps the guarantee in one place.
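A sketch of the scope pattern from the takeaway: one query helper every tenant-aware call site composes in, so the tenant_id filter lives in exactly one place. Module and field names are illustrative:
defmodule MyApp.Tenancy do
  import Ecto.Query
  # All tenant-aware queries pass through this single helper
  def scope(query, tenant_id) do
    where(query, [r], r.tenant_id == ^tenant_id)
  end
end
# Usage: compose the scope into any query
# User |> MyApp.Tenancy.scope(tenant_id) |> Repo.all()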
Example 55: PostgreSQL Advanced Features in Ecto
Leverage PostgreSQL-specific features: JSONB, arrays, full-text search, custom types.
defmodule MyApp.Blog.Post do # => Defines Post schema with PostgreSQL features
# => Demonstrates JSONB, arrays, full-text search
schema "posts" do # => Maps to "posts" table
# => Auto-generates id, inserted_at, updated_at
field :title, :string # => Standard string field (VARCHAR)
# => Will be accessible as post.title
field :metadata, :map # => JSONB in PostgreSQL
# => Stores arbitrary JSON data efficiently
# => E.g., %{"status" => "draft", "tags" => ["elixir"]}
field :tags, {:array, :string} # => Array type
# => PostgreSQL native array: text[]
# => E.g., ["elixir", "phoenix", "web"]
field :search_vector, :string # => Full-text search
# => Actually stored as tsvector type
# => Preprocessed text for efficient searching
end # => End schema definition
end # => End MyApp.Blog.Post module
# Migration
def change do # => Migration function to create posts table
# => Runs when executing mix ecto.migrate
create table(:posts) do # => Creates posts table
# => Auto-adds id primary key
add :title, :string # => Adds title column (VARCHAR)
# => Standard text field
add :metadata, :jsonb, default: "{}" # => Adds metadata column (JSONB type)
# => Default: empty JSON object {}
# => Stores structured data without predefined schema
add :tags, {:array, :string}, default: [] # => Adds tags column (text[] array)
# => Default: empty array []
# => Native PostgreSQL array, not JSON
add :search_vector, :tsvector # => Adds search_vector column (tsvector type)
# => Optimized for full-text search
# => Stores preprocessed, indexed text
timestamps() # => Adds inserted_at, updated_at columns
# => Auto-managed by Ecto
end # => End table creation
# GIN index for JSONB performance
create index(:posts, ["(metadata)"], using: :gin) # => Creates GIN index on metadata column
# => GIN (Generalized Inverted Index) for JSONB
# => Enables fast queries on JSON keys/values
# => E.g., WHERE metadata->>'status' = 'published'
# GIN index for full-text search
create index(:posts, [:search_vector], using: :gin) # => Creates GIN index on search_vector
# => Enables fast full-text search
# => Required for efficient @@ operator queries
end # => End migration function
# Query JSONB
posts = from p in Post, # => Starts Ecto query for Post
# => p is alias for posts table
where: fragment("? ->> ? = ?", p.metadata, "status", "published")
# => JSONB query: metadata->>'status' = 'published'
# => ->> extracts JSON field as text
# => Finds posts where metadata has status="published"
# => Result: posts with matching JSON property
# Array operations
posts = from p in Post, # => Query for posts with specific tag
# => Uses PostgreSQL array operators
where: fragment("? @> ?", p.tags, ^["elixir"]) # => Array contains operator @>
# => Checks if tags array contains ["elixir"]
# => E.g., ["elixir", "phoenix"] @> ["elixir"] = true
# => Result: posts tagged with "elixir"
# Full-text search
results = from p in Post, # => Full-text search query
# => Uses PostgreSQL tsvector and tsquery
where: fragment("to_tsvector('english', ?) @@ plainto_tsquery('english', ?)",
p.title, ^search_term), # => to_tsvector converts title to searchable form
# => plainto_tsquery converts search term to query
# => @@ matches tsvector against tsquery
# => E.g., search_term="phoenix framework" finds titles with those words
select: p # => Returns matching Post structs
# => Result: posts matching search term
Key Takeaway: Use :map for JSONB, {:array, :string} for arrays. Full-text search with tsvector. Use fragment/2 for database-specific SQL. Index JSONB and tsvector for performance.
Why It Matters: JSONB, arrays, and full-text search often remove the need for extra infrastructure such as a document store or a search engine. Reaching them through fragment/2 keeps the rest of the query composable Ecto.
Group 10: Performance
Example 56: Query Optimization - N+1 Prevention
Load related data efficiently to avoid N+1 queries where one query results in N additional queries.
%% N+1 problem vs preload solution
graph TD
A[❌ N+1 Problem] --> B[Query 1: SELECT * FROM posts]
B --> C[Query 2: SELECT * FROM authors WHERE id = 1]
B --> D[Query 3: SELECT * FROM authors WHERE id = 2]
B --> E[Query 4: SELECT * FROM authors WHERE id = 3]
B --> F[... N queries]
G[✅ Preload Solution] --> H[Query 1: SELECT * FROM posts]
H --> I[Query 2: SELECT * FROM authors WHERE id IN]
I --> J[Total: 2 queries]
style A fill:#CA9161,color:#fff
style G fill:#029E73,color:#fff
style J fill:#029E73,color:#fff
# ❌ N+1 Problem - 1 query + N queries
posts = Post |> Repo.all() # => SELECT * FROM posts (1 query)
for post <- posts do
author = Author |> where([a], a.id == ^post.author_id) |> Repo.one()
# => SELECT * FROM authors WHERE id = ? (N queries!)
end
# Total: 1 + N queries (if 100 posts = 101 queries!)
# ✅ Solution 1: Preload
posts = Post
|> preload(:author) # => Eager load authors
|> Repo.all()
# => 2 queries total: SELECT posts, SELECT authors WHERE id IN (...)
# ✅ Solution 2: Join (for aggregations)
posts = from p in Post,
join: a in assoc(p, :author), # => SQL JOIN
select: {p, a}
# => 1 query: SELECT posts.*, authors.* FROM posts JOIN authors
# ✅ Solution 3: Preload with nested associations
posts = Post
|> preload([comments: :author]) # => Loads comments and their authors
|> Repo.all()
# Query with EXPLAIN to see execution plan
results = Repo.all(from p in Post, preload: :author)
IO.inspect(Repo.explain(:all, Post)) # => Prints the database execution plan
Key Takeaway: Use preload/1 to eager-load associations. Use join for aggregations and filtering. Always check your queries with EXPLAIN. Avoid fetching in loops.
Why It Matters: Associations model relationships between data entities. Understanding Ecto associations enables efficient data access patterns.
Example 57: Caching Strategies
Cache expensive operations to reduce database load and improve response time.
defmodule MyApp.CacheServer do # => Defines in-memory cache GenServer
# => Stores key-value pairs with TTL
use GenServer # => Imports GenServer behavior
# => Provides start_link, init, handle_call, etc.
def start_link(_opts) do # => Public function: start_link/1
# => Called by supervisor to start cache process
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
# => Starts GenServer with empty map as initial state
# => Registers process with module name
# => Returns {:ok, pid}
end # => End start_link/1 function
def get_cache(key) do # => Public API: get_cache/1
# => Synchronous retrieval of cached value
GenServer.call(__MODULE__, {:get, key}) # => Sends synchronous message to cache process
# => Blocks caller until response received
# => Returns {:ok, value} or :not_found
end # => End get_cache/1 function
def set_cache(key, value, ttl_ms) do # => Public API: set_cache/3
# => Asynchronous cache write with TTL
GenServer.cast(__MODULE__, {:set, key, value, ttl_ms})
# => Sends async message (fire-and-forget)
# => Returns :ok immediately without blocking
# => Value stored with expiration time
end # => End set_cache/3 function
@impl true # => Implements GenServer callback
def init(state) do # => Initializes GenServer state
# => Called when process starts
{:ok, state} # => Returns {:ok, initial_state}
# => State is empty map %{}
end # => End init/1 callback
@impl true # => Implements GenServer callback
def handle_call({:get, key}, _from, state) do # => Handles synchronous get requests
# => Pattern matches {:get, key} message
now = System.monotonic_time(:millisecond) # => Captures current time first
# => System calls are not allowed inside guards, so bind now beforehand
case Map.get(state, key) do # => Retrieves value from state map
# => Result: {value, expires_at} or nil
{value, expires_at} when expires_at > now -> # => Entry exists AND not expired
# => Compares expiration time with current time
{:reply, {:ok, value}, state} # => Cache hit, not expired
# => Returns value to caller, keeps state unchanged
_ -> # => Catch-all: cache miss or expired
# => Either key doesn't exist OR past expiration
{:reply, :not_found, state} # => Cache miss or expired
# => Returns :not_found to caller
end # => End case statement
end # => Synchronous call, blocks caller until reply
@impl true # => Implements GenServer callback
def handle_cast({:set, key, value, ttl_ms}, state) do
# => Handles asynchronous set requests
# => Pattern matches {:set, key, value, ttl_ms} message
expires_at = System.monotonic_time(:millisecond) + ttl_ms # => Calculate expiry
# => Current monotonic time + TTL in milliseconds
# => E.g., current=1000, ttl=5000 → expires_at=6000
{:noreply, Map.put(state, key, {value, expires_at})}
# => Updates state map with new cache entry
# => Stores tuple {value, expires_at} as value
# => Returns {:noreply, new_state}
end # => Asynchronous, returns immediately without blocking
end # => End MyApp.CacheServer module
# Or use Cachex library
defmodule MyApp.Blog do # => Example using Cachex library
# => Production-ready caching solution
def get_popular_posts do # => Public function: get_popular_posts/0
# => Caches expensive database query
case Cachex.get(:blog_cache, "popular_posts") do # => Checks cache first
# => :blog_cache is cache name (configured in app)
# => "popular_posts" is cache key
{:ok, nil} -> # => Cache miss: no cached value
# => Need to fetch from database
posts = Post |> where([p], p.likes > 100) |> Repo.all()
# => Expensive database query
# => SELECT * FROM posts WHERE likes > 100
# => Returns list of popular posts
Cachex.put(:blog_cache, "popular_posts", posts, ttl: :timer.minutes(60))
# => Stores result in cache with 60-minute TTL
# => Future requests served from cache
# => TTL: cache expires after 1 hour
posts # => Returns posts to caller
# => First request: slow (database query)
{:ok, posts} -> # => Cache hit: value exists in cache
# => posts already fetched from cache
posts # => Returns cached posts immediately
# => Subsequent requests: fast (no database query)
end # => End case statement
end # => End get_popular_posts/0 function
end # => End MyApp.Blog module
Key Takeaway: Cache expensive queries with TTL (time-to-live). Invalidate cache when data changes. Use Cachex for distributed caching. Cache at controller or service layer.
Why It Matters: Caching trades freshness for latency and database load. Repeated expensive queries served from memory keep response times flat under traffic spikes, as long as entries are invalidated when the underlying data changes.
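The takeaway says to invalidate on writes; a minimal sketch against the :blog_cache and key used above:
def update_post(post, attrs) do
  case post |> Post.changeset(attrs) |> Repo.update() do
    {:ok, updated} ->
      # Drop the stale entry; the next read repopulates it
      Cachex.del(:blog_cache, "popular_posts")
      {:ok, updated}
    {:error, changeset} ->
      {:error, changeset}
  end
end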
Example 58: Background Jobs with Oban
Execute long-running tasks asynchronously. Schedule recurring jobs.
%% Oban job processing flow
graph TD
A[User Registration] --> B[Insert Oban Job]
B --> C[oban_jobs table]
C --> D[Oban Worker Pool]
D --> E[EmailWorker.perform]
E --> F{Success?}
F -->|Yes| G[Mark completed]
F -->|No| H[Retry with backoff]
H --> D
style A fill:#0173B2,color:#fff
style C fill:#DE8F05,color:#fff
style G fill:#029E73,color:#fff
style H fill:#CC78BC,color:#fff
# lib/my_app/workers/email_worker.ex
defmodule MyApp.Workers.EmailWorker do
use Oban.Worker, queue: :default
@impl Oban.Worker
def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
user = MyApp.Accounts.get_user!(user_id) # => Load user
MyApp.Mailer.send_welcome_email(user) # => Send email
:ok # => Mark job complete
end # => Return :ok for success, {:error, reason} to retry
end
# In your controller/action
defmodule MyAppWeb.UserController do
def create(conn, %{"user" => user_params}) do
case MyApp.Accounts.create_user(user_params) do
{:ok, user} ->
# Queue background job
%{"user_id" => user.id}
|> MyApp.Workers.EmailWorker.new() # => Build job struct
|> Oban.insert() # => Insert into oban_jobs table
json(conn, user) # => Respond immediately
{:error, changeset} ->
error_response(conn, changeset)
end
end
end
# Schedule recurring jobs
defmodule MyApp.Application do
def start(_type, _args) do
children = [
# ... other children
{Oban,
repo: MyApp.Repo, # => Oban stores jobs in your Ecto repo
queues: [default: 10], # => Queue names and concurrency limits
plugins: [
# Schedule daily cleanup at 2 AM via the Cron plugin
{Oban.Plugins.Cron, crontab: [
{"0 2 * * *", MyApp.Workers.CleanupWorker}
]}
]}
]
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end
end
Key Takeaway: Create Worker modules implementing Oban.Worker. Queue jobs asynchronously. Implement retry logic. Use Oban for background processing and cron jobs.
Why It Matters: Moving slow work (email, billing, exports) out of the request path keeps responses fast, and because Oban persists jobs in PostgreSQL, queued work survives restarts and deploys.
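Retries are configured on the worker itself. A sketch using Oban's max_attempts option and the optional backoff/1 callback; MyApp.Billing.charge_invoice/1 is a hypothetical function that returns :ok or {:error, reason}:
defmodule MyApp.Workers.BillingWorker do
  # Retry up to 5 times before Oban discards the job
  use Oban.Worker, queue: :billing, max_attempts: 5

  # Optional custom backoff in seconds, doubling per attempt
  @impl Oban.Worker
  def backoff(%Oban.Job{attempt: attempt}) do
    trunc(:math.pow(2, attempt) * 10)
  end

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"invoice_id" => invoice_id}}) do
    # {:error, reason} or a raise triggers a retry with backoff
    MyApp.Billing.charge_invoice(invoice_id)
  end
end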
Example 59: Phoenix LiveDashboard for Metrics
Monitor your application in real-time with LiveDashboard. View requests, Ecto stats, processes.
# mix.exs
defp deps do # => Dependency list for LiveDashboard
# => Requires 3 libraries for monitoring
[
{:phoenix_live_dashboard, "~> 0.7"}, # => Main dashboard library
# => Provides web UI for metrics/monitoring
# => Version ~> 0.7 (compatible with 0.7.x)
{:telemetry_metrics, "~> 0.6"}, # => Metrics aggregation library
# => Collects and processes telemetry events
# => Required by LiveDashboard
{:telemetry_poller, "~> 1.0"} # => Periodic metric collection
# => Polls VM metrics (memory, processes, etc.)
# => Runs collection tasks on interval
]
end
# lib/my_app_web/telemetry.ex
defmodule MyAppWeb.Telemetry do # => Defines Telemetry supervisor module
# => Manages metric collection processes
use Supervisor # => Imports Supervisor behavior
# => Provides supervision functions
def start_link(arg) do # => Public function: start_link/1
# => Called by application supervisor
Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
# => Starts supervisor process
# => Registers with module name
# => Returns {:ok, pid}
end # => End start_link/1 function
@impl true # => Implements Supervisor callback
def init(_arg) do # => Initializes supervisor
# => Defines child processes to supervise
children = [ # => List of child specifications
# Telemetry poller periodically collects VM measurements
{:telemetry_poller, measurements: periodic_measurements(), period: 10_000}
# => Starts telemetry_poller process
# => Runs periodic_measurements() every 10 seconds
# => The poller also emits built-in VM measurements (memory, run queues)
] # => End children list
Supervisor.init(children, strategy: :one_for_one) # => Initializes supervisor with children
# => Strategy :one_for_one: restart only the failed child
end # => End init/1 callback
def metrics do # => Public function: metrics/0
# => Metric definitions LiveDashboard renders (passed via metrics: in the router)
import Telemetry.Metrics # => summary/2, counter/2, last_value/2
[
# Phoenix metrics
summary("phoenix.endpoint.stop.duration", unit: {:native, :millisecond}),
# => Request duration, converted from native time units to ms
# Ecto metrics
summary("my_app.repo.query.total_time", unit: {:native, :millisecond}),
# => Total database time per query (event prefix follows your otp_app name)
# VM metrics
summary("vm.memory.total", unit: {:byte, :kilobyte}),
# => Total VM memory usage in KB
summary("vm.total_run_queue_lengths.total")
# => Scheduler run queue length (a backpressure signal)
] # => End metrics list
end # => End metrics/0 function
defp periodic_measurements do # => Private function: periodic_measurements/0
[] # => Add {module, function, args} entries for custom periodic measurements
end # => End periodic_measurements/0 function
end # => End MyAppWeb.Telemetry module
# router.ex
import Phoenix.LiveDashboard.Router # => Imports LiveDashboard routing macros
# => Provides live_dashboard/2 macro
scope "/" do # => Defines root scope
# => Routes accessible from /
pipe_through :browser # => Uses browser pipeline
# => Applies session, CSRF, cookies
live_dashboard "/dashboard" # => Mounts LiveDashboard at /dashboard route
# => Creates LiveView route
# => Access at http://localhost:4000/dashboard
# => Shows metrics, processes, Ecto stats in UI
end # => End scope
Key Takeaway: Phoenix LiveDashboard shows real-time metrics. Monitor request performance, database connections, memory usage, processes. Access at /dashboard.
Why It Matters: You cannot tune what you cannot see. LiveDashboard surfaces request timings, Ecto query stats, and BEAM internals in real time without any external monitoring stack.
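In production the dashboard should not be public. One common arrangement is to mount it behind an authenticated pipeline (a sketch; :require_admin is a hypothetical auth plug):
pipeline :admin do
  plug :require_admin # hypothetical auth plug
end

scope "/" do
  pipe_through [:browser, :admin]
  live_dashboard "/dashboard", metrics: MyAppWeb.Telemetry
end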
Example 60: Custom Metrics with Telemetry
Emit custom metrics to track business logic and application behavior.
defmodule MyApp.Blog do
def create_post(attrs) do
start_time = System.monotonic_time()
case %Post{}
|> Post.changeset(attrs)
|> Repo.insert() do
{:ok, post} ->
duration = System.monotonic_time() - start_time
:telemetry.execute(
[:blog, :post, :created],
%{duration: duration},
%{post_id: post.id, user_id: attrs["user_id"]}
)
{:ok, post}
{:error, changeset} ->
{:error, changeset}
end
end
end
# Listen to events
defmodule MyApp.TelemetryHandler do
def attach_handlers do
:telemetry.attach(
"blog-post-created",
[:blog, :post, :created],
&__MODULE__.handle_post_created/4,
nil
)
end
def handle_post_created(_event, measurements, metadata, _config) do
IO.inspect({measurements, metadata})
# Send to monitoring service, log, increment counter, etc.
end
end
# In your application startup
MyApp.TelemetryHandler.attach_handlers()
Key Takeaway: Use :telemetry.execute/3 to emit metrics. Attach handlers with :telemetry.attach/4. Track custom business metrics for monitoring and alerting.
Why It Matters: Built-in metrics tell you about requests and queries; custom telemetry events tell you about your business (posts created, payments processed). Emitting both through one pipeline keeps monitoring uniform.
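The measure-then-emit pattern above can be condensed with :telemetry.span/3, which times the function and emits paired :start/:stop (or :exception) events for you. A sketch of the same create_post:
def create_post(attrs) do
  :telemetry.span([:blog, :post, :create], %{user_id: attrs["user_id"]}, fn ->
    result = %Post{} |> Post.changeset(attrs) |> Repo.insert()
    # Return {result, metadata} - metadata is attached to the :stop event
    {result, %{user_id: attrs["user_id"]}}
  end)
end
# Emits [:blog, :post, :create, :start] and [..., :stop] with a duration measurement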
Group 11: Production Deployment
Example 61: Mix Releases for Production
Build self-contained release that runs without Elixir/Erlang installed. Configure runtime settings.
# mix.exs
def project do # => Project configuration function
# => Defines app metadata and release settings
[
app: :my_app, # => Application name (atom)
# => Used for release binary name
version: "0.1.0", # => Current version string
# => Follows semantic versioning
releases: [ # => Release configuration map
# => Defines production release settings
prod: [ # => Release name: "prod"
# => Can have multiple releases (prod, staging, etc.)
include_executables_for: [:unix], # => Include Unix executables (start, stop scripts)
# => Targets Linux/macOS platforms
# => Excludes Windows executables
steps: [:assemble, :tar] # => Release build steps
# => :assemble - builds release structure
# => :tar - creates .tar.gz archive for deployment
]
]
]
end
# config/runtime.exs - Loaded at runtime, not compile time
import Config # => Imports Config macros
# => Enables config/2 function
config :my_app, MyAppWeb.Endpoint, # => Configures Phoenix Endpoint
# => Settings applied when release starts
http: [ip: {0, 0, 0, 0}, port: String.to_integer(System.get_env("PORT") || "4000")],
# => HTTP server configuration
# => Bind to all interfaces (0.0.0.0) for Docker
# => ip: {0,0,0,0} accepts connections from any IP
# => port: reads PORT env var or defaults to 4000
# => E.g., PORT=8080 → server listens on 8080
secret_key_base: System.fetch_env!("SECRET_KEY_BASE"), # => Read from env var
# => Fetches SECRET_KEY_BASE environment variable
# => Crashes if not set (fetch_env! requires it)
# => Used for session encryption
url: [host: System.get_env("PHX_HOST", "localhost"), port: 443, scheme: "https"]
# => URL configuration for generated URLs
# => HTTPS for production
# => host: reads PHX_HOST or defaults to "localhost"
# => port: 443 (standard HTTPS port)
# => scheme: "https" (forces HTTPS URLs)
if config_env() == :prod do # => Production-only configuration
# => Only applies when MIX_ENV=prod
config :my_app, MyApp.Repo, # => Configures Ecto Repo for production
# => Database connection settings
url: System.fetch_env!("DATABASE_URL"), # => Reads DATABASE_URL from environment
# => E.g., "ecto://user:pass@host/db"
# => Crashes if DATABASE_URL not set
ssl: true, # => Enables SSL for database connection
# => Required for most cloud databases
socket_options: [:inet6] # => Enables IPv6 support
# => Allows IPv6 database connections
end # => End production config block
# Build release
# mix release # => Command: builds default release
# => Creates _build/prod/rel/my_app/ directory
# => Packages app, dependencies, ERTS runtime
# Or with custom name
# mix release prod # => Command: builds "prod" named release
# => Uses prod release configuration from mix.exs
# => Creates tarball for deployment
# Run release
# _build/prod/rel/my_app/bin/my_app start # => Command: starts release as daemon
# => Runs in background
# => Logs to _build/prod/rel/my_app/log/
# Start in foreground
# _build/prod/rel/my_app/bin/my_app foreground # => Command: starts release in foreground
# => Logs to stdout/stderr
# => Useful for Docker containers
# => Ctrl+C stops the application
Key Takeaway: Mix.Release builds a self-contained package. config/runtime.exs loads at runtime. Set environment variables for secrets. Releases don't require Elixir installation.
Why It Matters: Runtime configuration enables environment-specific settings. This pattern allows the same release to run in different environments without rebuilding.
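Releases have no Mix, so migrations run through a small helper invoked with bin/my_app eval. This mirrors the module mix phx.gen.release generates (sketched here):
defmodule MyApp.Release do
  @app :my_app

  def migrate do
    Application.load(@app) # => Load the app so :ecto_repos config is available
    for repo <- Application.fetch_env!(@app, :ecto_repos) do
      {:ok, _, _} = Ecto.Migrator.with_repo(repo, &Ecto.Migrator.run(&1, :up, all: true))
    end
  end
end
# Run against the built release:
# _build/prod/rel/my_app/bin/my_app eval "MyApp.Release.migrate"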
Example 62: Docker Containerization with Multi-Stage Build
Build optimized Docker image with minimal size and security.
%% Docker multi-stage build
graph TD
A[Stage 1: Builder] --> B[elixir:1.14-alpine]
B --> C[Install build deps]
C --> D[Copy source code]
D --> E[mix deps.get]
E --> F[MIX_ENV=prod mix release]
G[Stage 2: Runtime] --> H[alpine:latest]
H --> I[Install runtime deps]
I --> J[Copy release from builder]
J --> K[Final image 50MB]
style A fill:#0173B2,color:#fff
style F fill:#DE8F05,color:#fff
style G fill:#029E73,color:#fff
style K fill:#029E73,color:#fff
# Dockerfile - Multi-stage build
FROM elixir:1.14-alpine AS builder # => Stage 1: Builder image
# => Uses Elixir 1.14 on Alpine Linux (minimal)
# => AS builder: names this stage for later reference
WORKDIR /app # => Sets working directory to /app
# => All subsequent commands run from /app
# Install build dependencies
RUN apk add --no-cache build-base git # => Installs build tools
# => build-base: gcc, make, etc. for native deps
# => git: for git-based dependencies
# => --no-cache: doesn't store package index (smaller image)
# Copy source
COPY mix.exs mix.lock ./ # => Copies dependency files first
# => Enables Docker layer caching
# => If mix.exs unchanged, deps layer cached
RUN mix local.hex --force && mix local.rebar --force # => Installs Hex package manager
# => Installs Rebar build tool
# => --force: non-interactive installation
RUN mix deps.get # => Downloads dependencies
# => Fetches from Hex and Git sources
# => Cached if mix.exs/mix.lock unchanged
COPY . . # => Copies entire application code
# => Runs AFTER deps.get for better caching
# => Invalidates cache only when code changes
# Compile release
RUN MIX_ENV=prod mix release # => Builds production release
# => MIX_ENV=prod: production optimizations
# => Creates self-contained package
# => Output: _build/prod/rel/my_app/
# Runtime stage
FROM alpine:latest # => Stage 2: Runtime image
# => Minimal Alpine Linux (~5MB base)
# => No Elixir, no build tools (small image)
WORKDIR /app # => Sets working directory
# => Same as builder for consistency
# Install runtime dependencies
RUN apk add --no-cache openssl ncurses-libs libstdc++ # => Installs runtime libraries
# => openssl for HTTPS/SSL; ncurses-libs and libstdc++ for the bundled ERTS
# => Releases ship ERTS but still link against these system libraries
# Copy release from builder
COPY --from=builder /app/_build/prod/rel/my_app ./ # => Copies compiled release from builder stage
# => --from=builder: references first stage
# => Only copies final binary (not source)
# => Dramatically reduces image size (50MB vs 500MB+)
EXPOSE 4000 # => Documents that container listens on port 4000
# => Doesn't actually publish port (use docker run -p)
# => Informational for users
# Health check
HEALTHCHECK --interval=10s --timeout=3s --start-period=40s --retries=3 \
CMD ["./bin/my_app", "eval", "MyApp.ready?"]
# => Configures container health check (comments must not split the \ continuation)
# => --interval=10s: check every 10 seconds
# => --timeout=3s: fail if check takes >3s
# => --start-period=40s: grace period on startup
# => --retries=3: mark unhealthy after 3 failures
# => CMD evaluates MyApp.ready?, which your app must define to return true when ready
# => Kubernetes uses this for liveness/readiness
CMD ["./bin/my_app", "start"] # => Default command when container starts
# => Starts Phoenix application
# => Runs in foreground (required for Docker)
Key Takeaway: Multi-stage builds keep image small. Builder stage compiles, runtime stage runs. Use Alpine Linux for minimal footprint. Include health checks.
Why It Matters: Health checks enable load balancer integration and monitoring. This is essential for zero-downtime deployments and auto-scaling.
Example 63: Health Checks for Kubernetes
Implement liveness and readiness endpoints for orchestration systems to manage pod lifecycle.
%% Health check flow in Kubernetes
graph TD
A[Kubernetes] --> B{Liveness Probe}
B -->|GET /health/live| C[Health endpoint]
C --> D{App alive?}
D -->|Yes 200| E[Keep running]
D -->|No 5xx| F[Restart pod]
A --> G{Readiness Probe}
G -->|GET /health/ready| H[Health endpoint]
H --> I{DB connected?}
I -->|Yes 200| J[Send traffic]
I -->|No 503| K[Remove from load balancer]
style A fill:#0173B2,color:#fff
style E fill:#029E73,color:#fff
style F fill:#CA9161,color:#fff
style J fill:#029E73,color:#fff
style K fill:#CC78BC,color:#fff
# lib/my_app_web/controllers/health_controller.ex
defmodule MyAppWeb.HealthController do
use MyAppWeb, :controller
def readiness(conn, _params) do
# Check if app is ready to serve traffic
case check_database() do
:ok ->
json(conn, %{status: "ok"}) # => 200 OK, ready
:error ->
conn
|> put_status(:service_unavailable) # => 503 Service Unavailable
|> json(%{status: "error", reason: "database_unavailable"})
# => Kubernetes removes from load balancer
end
end # => Checked every 5 seconds
def liveness(conn, _params) do
# Check if app is alive (should restart if not)
json(conn, %{status: "ok"}) # => Always returns 200
end # => If this fails, Kubernetes restarts pod
defp check_database do
case Ecto.Adapters.SQL.query(MyApp.Repo, "SELECT 1", []) do
{:ok, _} -> :ok
{:error, _} -> :error
end
end
end
# router.ex
scope "/health", MyAppWeb do
get "/ready", HealthController, :readiness
get "/live", HealthController, :liveness
end
# Kubernetes deployment yaml
# livenessProbe:
# httpGet:
# path: /health/live
# port: 4000
# initialDelaySeconds: 30
# periodSeconds: 10
# readinessProbe:
# httpGet:
# path: /health/ready
# port: 4000
# initialDelaySeconds: 5
# periodSeconds: 5
Key Takeaway: Readiness probe indicates if app can handle traffic. Liveness probe indicates if app needs restart. Health endpoints check critical dependencies (database, cache).
Why It Matters: Probes are how the orchestrator decides whether to route traffic to a pod or restart it. Accurate readiness checks prevent requests from reaching an instance whose database connection is down.
Example 64: Graceful Shutdown
Handle shutdown signals gracefully, completing in-flight requests before terminating.
# lib/my_app/application.ex
defmodule MyApp.Application do
use Application
@impl true
def start(_type, _args) do
children = [
MyAppWeb.Telemetry,
MyApp.Repo,
{Phoenix.PubSub, name: MyApp.PubSub},
MyAppWeb.Endpoint
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
@impl true
def config_change(changed, _new, removed) do
MyAppWeb.Endpoint.config_change(changed, removed)
end
@impl true
def prep_stop(_state) do
# Called before shutdown
# Drain in-flight requests
IO.puts("Shutting down gracefully...") # => Log shutdown
:ok
end # => Application.stop/1 waits for this to return
end
# In endpoint config
config :my_app, MyAppWeb.Endpoint,
# Connection drainer: give in-flight requests up to 25s to finish
# (Phoenix endpoints accept a :drainer option with a shutdown in ms)
drainer: [shutdown: 25_000, check_interval: 1_000]
Key Takeaway: prep_stop/1 gives the app a chance to drain requests. Configure the endpoint drainer's shutdown timeout. Complete in-flight work before terminating. Important for zero-downtime deployments.
Why It Matters: Deployment releases enable zero-downtime updates and easy rollbacks. Understanding OTP releases is essential for production Phoenix applications.
Example 65: Environment Configuration Management
Separate configuration by environment. Use runtime configuration for secrets.
# config/config.exs - Common to all environments
import Config # => Imports Config module
# => Provides config/2 and config/3 macros
config :logger, :console, # => Configures Logger console backend
# => Applies to all environments (dev, test, prod)
format: "$time $metadata[$level] $message\n" # => Log format template
# => $time: timestamp, $level: info/warn/error
# => $message: log message, $metadata: custom data
# config/dev.exs
import Config # => Development environment config
# => Only loaded when MIX_ENV=dev
config :my_app, MyApp.Repo, # => Ecto Repo configuration for dev
# => Local PostgreSQL database
username: "postgres", # => Database username
# => Default PostgreSQL user
password: "postgres", # => Database password
# => Hardcoded OK for dev (never in prod!)
hostname: "localhost", # => Database host
# => Connects to local PostgreSQL
port: 5432, # => PostgreSQL default port
# => Standard port number
database: "my_app_dev" # => Database name
# => Separate database for development
config :my_app, MyAppWeb.Endpoint, # => Phoenix Endpoint configuration
# => Dev-specific settings for fast iteration
debug_errors: true, # => Shows detailed error pages
# => Displays stack traces in browser
# => NEVER enable in production!
check_origin: false, # => Disables WebSocket origin checking
# => Allows connections from any origin in dev
# => Required for LiveView development
watchers: [esbuild: {Esbuild, :install_and_run, [:default, ~w(--sourcemap=inline --watch)]}]
# => File watchers for asset compilation
# => Auto-rebuilds JS/CSS on file changes
# => esbuild watches and compiles assets
# config/test.exs
import Config # => Test environment configuration
# => Only loaded when MIX_ENV=test
config :my_app, MyApp.Repo, # => Ecto Repo configuration for tests
# => Separate database for test isolation
username: "postgres", # => Database username
# => Same as dev for simplicity
password: "postgres", # => Database password
# => Same as dev for simplicity
database: "my_app_test", # => Test database name
# => Separate from dev database
pool: Ecto.Adapters.SQL.Sandbox # => Connection pool for tests
# => Sandbox: wraps each test in transaction
# => Auto-rollback after each test
# => Ensures test isolation and cleanup
# config/runtime.exs - Loaded at runtime
import Config # => Runtime configuration
# => Loaded when release starts (NOT at compile time)
# => Enables environment-specific configuration
database_url = System.get_env("DATABASE_URL") || "ecto://postgres:postgres@localhost/my_app_prod"
# => Reads DATABASE_URL from environment
# => Falls back to default for local prod
# => E.g., "ecto://user:pass@host:5432/dbname"
# => Production: set by deployment platform
config :my_app, MyApp.Repo, url: database_url # => Configures Repo with database URL
# => Parses URL into connection params
# => Overrides compile-time config
secret_key_base = System.fetch_env!("SECRET_KEY_BASE")
# => Reads SECRET_KEY_BASE from environment
# => Crashes if not set (fetch_env! requires it)
# => Used for encrypting sessions/cookies
# => Generate with: mix phx.gen.secret
config :my_app, MyAppWeb.Endpoint, # => Endpoint runtime configuration
# => Settings loaded when app starts
secret_key_base: secret_key_base, # => Sets secret from environment
# => Required for session encryption
url: [host: System.get_env("PHX_HOST", "localhost"), port: 443, scheme: "https"]
# => URL configuration for generated links
# => host: reads PHX_HOST or defaults to localhost
# => port: 443 (HTTPS), scheme: "https"
# => Affects generated URLs and route helpers
Key Takeaway: config/ files configure at compile time. config/runtime.exs loads at runtime (for secrets). Use environment variables for production secrets. Never commit secrets to git.
Why It Matters: Compile-time config is baked into the release, so anything that differs per environment (secrets, hosts, database URLs) must come from runtime.exs and environment variables, or the same build cannot be promoted across environments.
Group 12: Resilience & Observability
Example 66: Error Tracking and Structured Logging
Send errors to Sentry. Log with context for debugging.
# mix.exs
defp deps do
[
{:sentry, "~> 10.0"},
{:logger_backends, "~> 1.1"},
{:jason, "~> 1.4"}
]
end
# config/config.exs
config :sentry,
dsn: System.get_env("SENTRY_DSN"),
environment_name: config_env(),
enable_in_test: false
# lib/my_app_web/endpoint.ex
defmodule MyAppWeb.Endpoint do
use Phoenix.Endpoint, otp_app: :my_app
use Sentry.PlugCapture # => Reports unhandled Plug errors to Sentry
# ... other plugs ...
plug Sentry.PlugContext # => Attaches request data to Sentry events
end
# Structured logging
defmodule MyApp.Blog do # => Defines Blog context module
# => Demonstrates structured logging patterns
require Logger # => Imports Logger macros
# => Enables compile-time logging optimization
def create_post(attrs) do # => Public function: create_post/1
# => Param: attrs map with post data
Logger.info("Creating post", user_id: attrs["user_id"], title: attrs["title"])
# => Logs informational message with structured metadata
# => First arg: string message
# => Second arg: metadata as a keyword list
# => E.g., [info] Creating post user_id=123 title="Hello"
# => Structured data parseable by log aggregators
changeset = Post.changeset(%Post{}, attrs) # => Builds changeset from attrs
case MyApp.Repo.insert(changeset) do # => Attempts to insert post into database
# => Returns {:ok, post} or {:error, changeset}
{:ok, post} -> # => Success case: post created
# => post is inserted struct with ID
Logger.info("Post created successfully", %{post_id: post.id})
# => Logs success with post ID
# => Metadata includes database-generated ID
# => Enables tracking specific post through logs
{:ok, post} # => Returns success tuple to caller
# => Caller receives created post struct
{:error, changeset} -> # => Failure case: validation errors
# => changeset contains error details
Logger.warning("Post creation failed", %{ # => Logs warning (not error - expected case)
# => Warning level: operation failed but handled
errors: changeset.errors, # => Includes validation errors
# => E.g., [{:title, {"is required", []}}]
user_id: attrs["user_id"] # => Includes user context for debugging
# => Helps correlate failures to users
})
{:error, changeset} # => Returns error tuple to caller
# => Caller can display validation errors
end # => End case statement
rescue # => Rescue clause catches unexpected exceptions
# => Only for crashes, not {:error, ...} returns
e -> # => Binds exception to variable e
# => e is exception struct (e.g., ArithmeticError)
Logger.error("Post creation crashed", # => Logs error (unexpected crash)
# => Error level: requires investigation
error: inspect(e), # => Logs exception details
# => inspect/1 converts to readable string
# => E.g., %ArithmeticError{message: "divide by zero"}
stacktrace: Exception.format_stacktrace(__STACKTRACE__)
# => Logs formatted stack trace
# => __STACKTRACE__: built-in macro with call stack
# => Shows file, line, function for each frame
)
{:error, "Internal error"} # => Returns generic error to caller
# => Doesn't expose exception details to user
# => Full details in logs/Sentry
end # => Ends create_post/1 (the rescue belongs to the def)
end # => End MyApp.Blog module
Key Takeaway: Sentry captures production errors. Structured logging adds context. Use Logger.info/warning/error with metadata. Include request IDs for tracing.
Why It Matters: When production misbehaves, logs and error reports are all you have. Structured metadata turns grep-able text into queryable events, and Sentry aggregates crashes with stack traces before users report them.
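For the request IDs the takeaway mentions, Phoenix's generated endpoint already runs Plug.RequestId; surfacing the ID is just Logger configuration, plus Logger.metadata/1 for your own context:
# config/config.exs - include request_id in every console line
config :logger, :console,
  format: "$time $metadata[$level] $message\n",
  metadata: [:request_id, :user_id]

# Anywhere in a request or job, attach your own context;
# subsequent log lines from this process carry it automatically
Logger.metadata(user_id: user.id)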
Example 67: Rate Limiting with Token Bucket
Prevent abuse by limiting requests per IP or user. Token bucket algorithm refills over time.
%% Rate limiting state machine
stateDiagram-v2
[*] --> CheckBucket: Request arrives
CheckBucket --> HasTokens: Tokens available
CheckBucket --> NoTokens: Bucket empty
HasTokens --> ConsumeToken: Take 1 token
ConsumeToken --> AllowRequest: Process request
NoTokens --> RejectRequest: 429 Too Many Requests
AllowRequest --> [*]
RejectRequest --> [*]
note right of CheckBucket: Bucket refills over time
note right of NoTokens: Client must wait
defmodule MyApp.RateLimiter do
use GenServer
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def allow_request?(key, max_requests, time_window_ms) do
GenServer.call(__MODULE__, {:check, key, max_requests, time_window_ms})
end
@impl true
def init(_opts) do
{:ok, %{}}
end
@impl true
def handle_call({:check, key, max_requests, time_window_ms}, _from, state) do
now = System.monotonic_time(:millisecond)
{requests, state} = case Map.get(state, key) do
{count, reset_time} when reset_time > now ->
{count, state} # => Window still active
_ ->
# Reset bucket
reset_time = now + time_window_ms # => New window starts
{0, Map.put(state, key, {0, reset_time})} # => Reset counter
end # => Token bucket refills
if requests < max_requests do
{count, reset_time} = Map.get(state, key)
new_state = Map.put(state, key, {count + 1, reset_time}) # => Consume token
{:reply, :ok, new_state} # => Allow request
else
{:reply, :rate_limited, state} # => Reject request
end # => Returns 429 Too Many Requests
end
end
# Plug for rate limiting
defmodule MyAppWeb.Plugs.RateLimit do
import Plug.Conn # => put_status/2 and halt/1
import Phoenix.Controller, only: [json: 2] # => JSON response helper
def init(opts), do: opts
def call(conn, opts) do
max_requests = opts[:max_requests] || 100
time_window = opts[:time_window] || 60_000
key = conn.remote_ip |> Tuple.to_list() |> Enum.join(".")
case MyApp.RateLimiter.allow_request?(key, max_requests, time_window) do
:ok -> conn
:rate_limited ->
conn
|> put_status(:too_many_requests)
|> json(%{error: "Rate limit exceeded"})
|> halt()
end
end
end
Key Takeaway: Rate limiting prevents abuse. Token bucket algorithm is fair and flexible. Apply per IP or per user. Return 429 Too Many Requests when limited.
Why It Matters: Rate limiting protects shared resources from abuse and accidental overload. A small GenServer-based limiter guards endpoints without adding external dependencies.
Example 68: Distributed Phoenix Clustering
Connect multiple Phoenix instances for distributed state and fault tolerance.
%% Distributed Phoenix cluster
graph TD
A[Load Balancer] --> B[Phoenix Node 1]
A --> C[Phoenix Node 2]
A --> D[Phoenix Node 3]
B <-->|Distributed Erlang| C
C <-->|Distributed Erlang| D
B <-->|Distributed Erlang| D
B --> E[PubSub broadcasts to all nodes]
C --> E
D --> E
E --> F[PostgreSQL]
style A fill:#0173B2,color:#fff
style B fill:#029E73,color:#fff
style C fill:#029E73,color:#fff
style D fill:#029E73,color:#fff
style F fill:#DE8F05,color:#fff
# mix.exs
defp deps do
[
{:libcluster, "~> 3.3"},
{:observer_cli, "~> 1.7"}
]
end
# config/config.exs
config :libcluster,
topologies: [
example: [
strategy: Cluster.Strategy.Kubernetes.DNS, # => Auto-discover in K8s
config: [
service: "my_app-headless", # => Headless service name
namespace: "default" # => K8s namespace
]
]
] # => Nodes connect automatically via DNS
# Or with fixed IPs
config :libcluster,
topologies: [
fixed: [
strategy: Cluster.Strategy.Static,
config: [
nodes: [:"app1@10.0.0.1", :"app2@10.0.0.2", :"app3@10.0.0.3"]
]
]
]
# lib/my_app/cluster.ex
defmodule MyApp.Cluster do
def start_link(opts) do
children = [
{Cluster.Supervisor, [topologies(), [name: MyApp.ClusterSupervisor]]}
# => Cluster.Supervisor runs one strategy per configured topology
]
Supervisor.start_link(children, Keyword.put_new(opts, :strategy, :one_for_one))
end
defp topologies do
Application.get_env(:libcluster, :topologies)
end
end
# Distributed PubSub across nodes
broadcast_all = fn event, data ->
MyApp.PubSub
|> Phoenix.PubSub.broadcast("topic", {:event, event, data})
# => Sends to ALL nodes in cluster
end # => Every connected LiveView receives update
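For instance, invoking the helper fans an event out cluster-wide (the event name here is illustrative):
broadcast_all.(:post_published, %{id: 42}) # => Subscribers on every node receive {:event, :post_published, %{id: 42}}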
# List connected nodes
Node.list() # => Lists all other connected nodes
Key Takeaway: libcluster connects nodes automatically. Distribute PubSub across cluster. All nodes share state. Provides fault tolerance.
Why It Matters: PubSub enables event-driven communication between processes. This pattern powers real-time broadcasts and decoupled system architecture.
Example 69: WebSocket Load Balancing with Sticky Sessions
Route WebSocket connections to same server. Use load balancer affinity.
# nginx.conf - Sticky sessions by IP
upstream my_app { # => Defines upstream server group "my_app"
# => Load balancer distributes requests across these
server app1:4000; # => Backend server 1 at app1:4000
# => Phoenix node 1
server app2:4000; # => Backend server 2 at app2:4000
# => Phoenix node 2
server app3:4000; # => Backend server 3 at app3:4000
# => Phoenix node 3
hash $remote_addr consistent; # => Sticky by IP
# => Routes same IP to same backend server
# => consistent: uses consistent hashing
# => Required for WebSocket persistence
} # => End upstream block
server { # => Defines HTTP server block
# => Handles incoming requests
listen 80; # => Listens on port 80 (HTTP)
# => Standard HTTP port
server_name my_app.com; # => Server domain name
# => Only processes requests for my_app.com
location / { # => Matches all paths (/)
# => Applies to all routes
proxy_pass http://my_app; # => Proxies requests to my_app upstream
# => Load balances across app1/app2/app3
proxy_http_version 1.1; # => Uses HTTP/1.1 protocol
# => Required for WebSocket upgrades
# WebSocket headers
proxy_set_header Upgrade $http_upgrade; # => Forwards Upgrade header
# => Enables WebSocket protocol upgrade
# => $http_upgrade: request's Upgrade value
proxy_set_header Connection "upgrade"; # => Sets Connection header to "upgrade"
# => Required for WebSocket handshake
# => Tells backend to upgrade connection
# Preserve client IP
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# => Forwards client IP address
# => Appends to X-Forwarded-For chain
# => Phoenix reads from this header
proxy_set_header X-Real-IP $remote_addr; # => Sets original client IP
# => $remote_addr: nginx's client IP variable
# => Phoenix uses for rate limiting/logging
} # => End location block
} # => End server block
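On the Phoenix side, the socket must opt in to forwarded headers before connect/3 can see the real client IP. A minimal sketch, assuming the generator-default socket path and module:
# In MyAppWeb.Endpoint
socket "/socket", MyAppWeb.UserSocket,
websocket: [connect_info: [:peer_data, :x_headers]] # => Expose proxy headers to connect/3
# => connect_info.x_headers then contains e.g. [{"x-forwarded-for", "203.0.113.7"}]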
# HAProxy.cfg - Also supports sticky sessions
backend phoenix_nodes # => Defines HAProxy backend group
# => Alternative to nginx for sticky sessions
balance roundrobin # => Load balancing algorithm: round robin
# => Distributes requests evenly (with cookie affinity)
cookie SERVERID insert indirect nocache # => Cookie-based sticky sessions
# => insert: HAProxy adds SERVERID cookie
# => indirect: doesn't forward cookie to backend
# => nocache: prevents caching of Set-Cookie
server node1 app1:4000 check cookie node1 # => Backend server 1
# => check: health checks enabled
# => cookie node1: assigns cookie value "node1"
# => Clients get SERVERID=node1 cookie
server node2 app2:4000 check cookie node2 # => Backend server 2
# => Clients routed here get SERVERID=node2
server node3 app3:4000 check cookie node3 # => Backend server 3
# => HAProxy routes based on cookie value
# => WebSockets stay on same server
Key Takeaway: WebSockets require persistent connections to same server. Use sticky sessions (by IP or cookie). Load balancer must preserve connection. Forward X-Forwarded-For headers.
Why It Matters: Phoenix sockets enable efficient bidirectional communication for real-time features. This is essential for chat, notifications, and collaborative editing applications.
Example 70: Blue-Green Deployment for Zero-Downtime Releases
Run two production environments. Switch traffic after verifying new version works.
%% Blue-green deployment flow
graph TD
A[Current: Blue v1.0] --> B[Deploy Green v1.1]
B --> C[Test Green in isolation]
C --> D{Tests pass?}
D -->|Yes| E[Switch traffic to Green]
D -->|No| F[Keep Blue, fix Green]
E --> G[Green is now production]
G --> H[Blue becomes standby]
H --> I[Rollback available]
style A fill:#0173B2,color:#fff
style C fill:#DE8F05,color:#fff
style E fill:#029E73,color:#fff
style F fill:#CA9161,color:#fff
style I fill:#CC78BC,color:#fff
# Blue-Green deployment with two releases
#!/bin/bash
# => Bash script for blue-green deployment (a shebang line must contain nothing but the interpreter)
# => Zero-downtime deployment strategy
# Current version (blue)
BLUE_VERSION=$(aws ecs describe-services --cluster prod --services my-app | \
jq -r '.services[0].taskDefinition' | \
grep -oP 'my-app:\K[^:]+') # => Gets currently running task-definition revision from AWS ECS
# => Queries the ECS service's task definition ARN
# => Extracts the revision number after "my-app:"
# => E.g., ".../task-definition/my-app:46" → BLUE_VERSION="46"
# New version (green)
GREEN_VERSION=$(git describe --tags --always) # => Gets new version from Git tags
# => Uses latest Git tag or commit SHA
# => E.g., "v1.3.0" or "abc123def"
# Build and deploy green version
mix release # => Builds Elixir production release
# => Creates _build/prod/rel/my_app/
# => Self-contained package
docker build -t my-app:$GREEN_VERSION . # => Builds Docker image with new version tag
# => Uses Dockerfile in current directory
# => Tags image with GREEN_VERSION
# => E.g., my-app:v1.3.0
docker tag my-app:$GREEN_VERSION my-app:green # => Tags image with "green" alias
# => Creates second tag for same image
# => my-app:green always points to latest deployment
docker push my-app:green # => Pushes image to Docker registry
# => Uploads to ECR/DockerHub
# => Makes available to ECS cluster
# Register new task definition
aws ecs register-task-definition \
--family my-app \
--container-definitions "[{\"image\": \"my-app:$GREEN_VERSION\", ...}]"
# => Registers new ECS task definition
# => Task definition: container specs (image, CPU, memory)
# => Creates new revision with green version
# => E.g., my-app:47 (revision number increments)
# Update service to use green
# --task-definition expects family:revision (or an ARN), not an image tag,
# so look up the revision just registered above
GREEN_TASK_DEF=$(aws ecs describe-task-definition --task-definition my-app \
--query 'taskDefinition.taskDefinitionArn' --output text) # => Latest registered revision
aws ecs update-service \
--cluster prod \
--service my-app \
--task-definition "$GREEN_TASK_DEF" # => Updates ECS service to the new task definition
# => Triggers deployment of green version
# => ECS starts new tasks with green containers
# => Blue tasks still running (zero downtime)
# Wait for deployment
aws ecs wait services-stable \
--cluster prod \
--services my-app # => Blocks until deployment completes
# => Waits for all green tasks to be healthy
# => Waits for old blue tasks to drain
# => Timeout after 10 minutes if not stable
# Test green version
curl -f http://green.my-app.com/health/ready # => Tests green version health endpoint
# => -f: exit non-zero on HTTP 4xx/5xx (plain curl exits 0 even on error pages)
# => Verifies application is responding
# => Checks database connectivity
# => Returns HTTP 200 if ready
if [ $? -eq 0 ]; then # => Checks if curl succeeded (exit code 0)
# => $? is exit code of previous command
# Switch traffic from blue to green
aws route53 change-resource-record-sets \
--hosted-zone-id Z123 \
--change-batch "[{\"Action\": \"UPSERT\", \"ResourceRecordSet\": {\"Name\": \"my-app.com\", \"Type\": \"A\", \"TTL\": 60, \"ResourceRecords\": [{\"Value\": \"green-alb.aws.com\"}]}}]"
# => Updates DNS to point to green load balancer
# => UPSERT: creates or updates DNS record
# => Changes my-app.com A record to green ALB
# => TTL 60s: DNS caches for 1 minute
# => Traffic gradually shifts to green
echo "Deployment successful!" # => Logs success message
# => Green version now serving production traffic
else
# Rollback to blue
aws ecs update-service \
--cluster prod \
--service my-app \
--task-definition my-app:$BLUE_VERSION # => Rolls back to blue version
# => Updates service to use previous task definition
# => Keeps blue running, stops green
# => Fast rollback without re-deployment
fi # => End if statement
Key Takeaway: Blue-green keeps current version running while deploying new version. Switch traffic only after verification. Can rollback instantly. Zero-downtime deployments.
Why It Matters: Deployment releases enable zero-downtime updates and easy rollbacks. Understanding OTP releases is essential for production Phoenix applications.
Example 71: Custom Ecto Types for Domain Logic
Create custom Ecto types to encapsulate domain logic and validation.
# lib/my_app/ecto_types/money.ex
defmodule MyApp.EctoTypes.Money do
use Ecto.Type
def type, do: :map # => Stored as JSONB in PostgreSQL
# Cast from user input
def cast(%{"amount" => amount, "currency" => currency}) when is_number(amount) do
{:ok, %{amount: amount, currency: currency}} # => Valid money struct
end
def cast(_), do: :error # => Invalid format
# Load from database
def load(%{"amount" => amount, "currency" => currency}) do
{:ok, %{amount: to_decimal(amount), currency: currency}} # => Convert to Decimal
end
defp to_decimal(amount) when is_float(amount), do: Decimal.from_float(amount)
# => Decimal.new/1 rejects floats; from_float/1 handles JSONB numbers
defp to_decimal(amount), do: Decimal.new(amount) # => Integers and strings
# Dump to database
def dump(%{amount: %Decimal{} = amount, currency: currency}) do
{:ok, %{"amount" => Decimal.to_float(amount), "currency" => currency}}
# => Store as JSON: {"amount": 19.99, "currency": "USD"}
end
def dump(%{amount: amount, currency: currency}) when is_number(amount) do
{:ok, %{"amount" => amount, "currency" => currency}} # => Freshly cast values are still plain numbers
end
def dump(_), do: :error
end
# Usage in schema
defmodule MyApp.Shop.Product do # => Defines Product schema using custom Money type
# => Demonstrates domain logic encapsulation
use Ecto.Schema # => Imports Ecto.Schema macros
# => Provides schema/2, field/3, etc.
schema "products" do # => Maps to "products" table
# => Auto-generates id, inserted_at, updated_at
field :name, :string # => Standard string field for product name
# => Will be accessible as product.name
field :price, MyApp.EctoTypes.Money # => Custom type
# => Uses Money Ecto type defined above
# => Handles validation, casting, db conversion
# => Accessible as product.price (map with amount/currency)
timestamps() # => Adds inserted_at, updated_at fields
# => Auto-managed by Ecto
end # => End schema definition
def changeset(product, attrs) do # => Public function: changeset/2
# => Validates and casts product attributes
product # => Starts with Product struct
# => Either new %Product{} or existing from database
|> cast(attrs, [:name, :price]) # => Cast price with custom type
# => Casts attrs to allowed fields
# => price calls Money.cast/1 automatically
# => Returns changeset
|> validate_required([:name, :price]) # => Validates required fields
# => Adds errors if name or price missing
|> validate_price() # => Domain validation
# => Custom business rule validation
end # => End changeset/2 function
defp validate_price(changeset) do # => Private function: validate_price/1
# => Domain-specific validation logic
case get_change(changeset, :price) do # => Gets changed price value
# => Returns nil if price unchanged
# => Returns money map if changed
%{amount: amount} when amount < 0 -> # => Pattern match: negative price
# => Guard: amount < 0
add_error(changeset, :price, "must be positive") # => Business rule
# => Adds validation error
# => Error: {:price, {"must be positive", []}}
# => Prevents negative prices
_ -> # => Catch-all: price valid or unchanged
# => Either positive amount or nil
changeset # => Returns unchanged changeset
# => No validation errors
end # => End case statement
end # => End validate_price/1 function
end # => End MyApp.Shop.Product module
# Create product
%Product{} # => Creates empty Product struct
# => %Product{name: nil, price: nil}
|> Product.changeset(%{ # => Builds changeset with attributes
# => Validates and casts input
name: "Widget", # => Product name
# => Simple string value
price: %{"amount" => 19.99, "currency" => "USD"} # => Input format
# => Plain map (user input format)
# => Money.cast/1 converts to internal format
})
|> Repo.insert() # => Inserts product into database
# => Stores: {"amount": 19.99, "currency": "USD"} in database
# => Money.dump/1 converts to JSONB
# => Database column stores JSON
# => Returns {:ok, %Product{}} on success
Key Takeaway: Custom Ecto types encapsulate domain logic. Implement cast/1, load/1, dump/1, and type/0. Use Decimal for money to avoid floating-point errors.
Why It Matters: Ecto provides type-safe database interactions with compile-time query validation. This prevents SQL injection and catches query errors before deployment.
Example 72: Database Fragments for Advanced Queries
Use Ecto fragments for PostgreSQL-specific features and complex SQL.
# Full-text search with tsvector
defmodule MyApp.Blog do
import Ecto.Query
def search_posts(search_term) do
query = # => Bind the query first; piping a multi-clause from/2 straight into Repo.all/1 parses wrong
from p in Post,
where: fragment(
"to_tsvector('english', ?) @@ plainto_tsquery('english', ?)",
p.title, # => Search in title
^search_term # => User's search query
),
order_by: fragment(
"ts_rank(to_tsvector('english', ?), plainto_tsquery('english', ?)) DESC",
p.title,
^search_term # => Rank by relevance
),
select: %{
post: p,
rank: fragment("ts_rank(to_tsvector('english', ?), plainto_tsquery('english', ?))",
p.title, ^search_term) # => Include rank in results
}
Repo.all(query)
# => SELECT posts.*, ts_rank(...) FROM posts WHERE to_tsvector(...) @@ plainto_tsquery(...)
end
# JSONB queries
def posts_by_metadata(key, value) do
query =
from p in Post,
where: fragment("? ->> ? = ?", p.metadata, ^key, ^value)
Repo.all(query)
# => SELECT * FROM posts WHERE metadata->>'status' = 'draft'
end
# JSONB array contains
def posts_with_tag(tag) do
query =
from p in Post,
where: fragment("? @> ?::jsonb", p.tags, ^Jason.encode!([tag])) # => Contains tag
# => Encode the list as a JSON string so Postgres can cast the parameter to jsonb
Repo.all(query)
# => SELECT * FROM posts WHERE tags @> '["elixir"]'::jsonb
end
# Window functions
def posts_with_rank do # => Public function: posts_with_rank/0
# => Uses PostgreSQL window functions
query =
from p in Post, # => Starts query from Post table
# => p is alias for posts
select: %{ # => Selects map with post and ranking fields
# => Returns list of maps, not Post structs
post: p, # => Includes full post struct
# => All post fields available
row_number: fragment("ROW_NUMBER() OVER (ORDER BY ? DESC)", p.inserted_at),
# => Window function: assigns sequential numbers
# => ROW_NUMBER(): 1, 2, 3... (no ties)
# => OVER: defines window (all rows)
# => ORDER BY inserted_at DESC: newest first
# => E.g., newest post = 1, second newest = 2
rank: fragment("RANK() OVER (ORDER BY ? DESC)", p.view_count)
# => Window function: ranks by view count
# => RANK(): ties get same rank, next rank skips
# => E.g., 1000 views = rank 1, 1000 views = rank 1, 500 views = rank 3
# => ORDER BY view_count DESC: most views first
} # => End select map
Repo.all(query) # => Executes query
# => Returns list of maps with post, row_number, rank
end # => End posts_with_rank/0 function
# Aggregating subqueries (for a true CTE, Ecto also offers with_cte/3)
def posts_with_comment_count do # => Public function: posts_with_comment_count/0
# => Joins posts against an aggregating subquery
comments_counts = from c in Comment, # => Counts comments per post
# => c is alias for comments table
group_by: c.post_id, # => Groups comments by post_id
# => Aggregates all comments for each post
select: %{post_id: c.post_id, count: count(c.id)}
# => Selects post_id and comment count
# => count(c.id): number of comments
# => Result: %{post_id: 1, count: 5}, %{post_id: 2, count: 3}
query =
from p in Post, # => Main query from posts table
# => Joins with the aggregated counts
left_join: c in subquery(comments_counts), on: c.post_id == p.id,
# => LEFT JOIN against the subquery
# => subquery/1: embeds comments_counts inline
# => LEFT JOIN: includes posts with 0 comments
# => ON: matches post ID
select: %{post: p, comment_count: coalesce(c.count, 0)}
# => Selects post and comment count
# => coalesce(c.count, 0): returns 0 if NULL
# => Handles posts with no comments (LEFT JOIN NULL)
Repo.all(query) # => Executes query
# => SELECT posts.*, COALESCE(sub.count, 0) FROM posts LEFT JOIN (SELECT ...) sub ON ...
# => Result: [%{post: %Post{}, comment_count: 5}, ...]
end # => End posts_with_comment_count/0 function
end # => End module
Key Takeaway: Use fragment/1 for database-specific SQL. Supports full-text search, JSONB queries, window functions, and aggregation via subqueries or CTEs. Always use ^pinned parameters to prevent SQL injection.
Why It Matters: Fragments unlock database-specific power (full-text search, JSONB, window functions) without leaving Ecto's composable, parameterized query layer.
Example 73: Query Profiling with Telemetry
Measure query performance to identify slow queries in production.
# lib/my_app/telemetry.ex
defmodule MyApp.Telemetry do
require Logger
def attach_handlers do
:telemetry.attach(
"my-app-ecto-query", # => Handler ID
[:my_app, :repo, :query], # => Event name
&__MODULE__.handle_query/4, # => Handler function
nil
)
end
def handle_query(_event, measurements, metadata, _config) do
query_time = measurements.total_time # => Time in native units
query_time_ms = System.convert_time_unit(query_time, :native, :millisecond)
if query_time_ms > 100 do # => Slow query threshold
Logger.warning("Slow query detected",
query: inspect(metadata.query), # => SQL query
params: inspect(metadata.params), # => Query parameters
time_ms: query_time_ms, # => Execution time
source: metadata.source # => Table name
)
# => Log: Slow query detected query="SELECT ..." time_ms=250
end
# Send to monitoring service
if query_time_ms > 1000 do # => Very slow (>1s)
MyApp.Monitoring.report_slow_query(%{
query: metadata.query,
time_ms: query_time_ms,
type: metadata.type # => :ecto_sql_query
})
end
end
end
# In application.ex
def start(_type, _args) do
MyApp.Telemetry.attach_handlers() # => Attach on startup
children = [
MyApp.Repo,
MyAppWeb.Endpoint
]
Supervisor.start_link(children, strategy: :one_for_one)
end
# Query all telemetry events
defmodule MyApp.QueryStats do
use GenServer
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
:telemetry.attach_many(
"query-stats",
[
[:my_app, :repo, :query], # => Ecto queries
[:phoenix, :router_dispatch, :stop], # => HTTP requests
[:phoenix, :endpoint, :stop] # => Endpoint timing
],
&__MODULE__.handle_event/4,
nil # => Handler config (unused)
)
{:ok, %{query_count: 0, total_time: 0}}
end
# Telemetry handlers run in the emitting process, not in this GenServer,
# so forward measurements to the server instead of returning new state
def handle_event([:my_app, :repo, :query], measurements, _metadata, _config) do
GenServer.cast(__MODULE__, {:query, measurements.total_time})
end
def handle_event(_event, _measurements, _metadata, _config), do: :ok
def handle_cast({:query, total_time}, state) do
{:noreply, %{
query_count: state.query_count + 1, # => Increment counter
total_time: state.total_time + total_time
}}
end
def get_stats do
GenServer.call(__MODULE__, :get_stats) # => Retrieve stats
end
def handle_call(:get_stats, _from, state) do
avg_native = if state.query_count > 0 do
div(state.total_time, state.query_count) # => Average query time in native units
else
0
end
{:reply, %{
query_count: state.query_count,
avg_time_ms: System.convert_time_unit(avg_native, :native, :millisecond)
# => convert_time_unit/3 requires an integer, hence div/2 above
}, state}
end
end
Key Takeaway: Attach telemetry handlers to [:repo, :query] events. Log slow queries (>100ms). Track query count and average time. Use for production monitoring and optimization.
Why It Matters: Telemetry enables observability and performance monitoring. This is critical for debugging production issues and understanding application behavior.
Example 74: Advanced LiveView Performance Optimization
Optimize LiveView rendering with targeted updates and efficient assigns.
defmodule MyAppWeb.DashboardLive do
use Phoenix.LiveView
# Mount: set up streams and one-time assigns
def mount(_params, _session, socket) do
if connected?(socket) do
:timer.send_interval(1000, self(), :tick) # => Update every second
end
{:ok,
socket
|> assign(:time, DateTime.utc_now()) # => Tracked assign
|> stream(:posts, load_posts()) # => Stream, matching @streams.posts in render/1
|> assign_new(:expensive_data, fn -> load_expensive_data() end)}
# => Only computed on first mount, cached after
end
# Push only what changed instead of re-rendering large assigns
def handle_info(:tick, socket) do
now = DateTime.utc_now()
{:noreply,
socket
|> assign(:time, now) # => Only updates time
|> push_event("time-update", %{time: DateTime.to_string(now)})}
# => Send JS event instead of re-rendering
# => Reading socket.assigns.time inside the pipe would yield the stale value
end
# Optimize list rendering with streams
def handle_event("load_more", _params, socket) do
new_posts = load_more_posts(socket.assigns.last_id)
{:noreply, stream(socket, :posts, new_posts)} # => Append the batch to the stream
# => stream_insert/3 takes one item; stream/3 inserts a list
# => Only new items rendered, existing items unchanged
end
# Debounce user input
def handle_event("search", %{"query" => query}, socket) do
Process.send_after(self(), {:search, query}, 300) # => Debounce 300ms
{:noreply, assign(socket, :search_query, query)}
end
def handle_info({:search, query}, socket) do
if socket.assigns.search_query == query do # => Check still current
results = search_posts(query) # => Execute search
{:noreply, assign(socket, :search_results, results)}
else
{:noreply, socket} # => User typed more, skip
end
end
# Update a single stream item in place
def handle_event("increment_likes", %{"post_id" => id}, socket) do
{:ok, post} = MyApp.Blog.increment_likes(id) # => Persist and fetch the updated post
{:noreply, stream_insert(socket, :posts, post)} # => Re-insert under the same DOM id
# => Patches only this post's card; the rest of the stream is untouched
end
# Render only changed sections
def render(assigns) do # => Render callback: generates HTML
# => Called whenever assigns change
# => ~H sigil: HEEx template (HTML + Elixir), compiled for efficient diff tracking
~H"""
<div>
<.header time={@time} /> <%!-- Function component: only re-renders when @time changes --%>
<div id="posts" phx-update="stream"> <%!-- Stream container: only changed items are patched --%>
<%= for {dom_id, post} <- @streams.posts do %>
<%!-- dom_id: stable DOM id per item, so only changed posts re-render --%>
<.post_card id={dom_id} post={post} />
<% end %>
</div>
<.footer /> <%!-- Static component: never re-renders (no dynamic assigns) --%>
</div>
"""
end # => End render/1 function
# Function components for granular updates
def header(assigns) do # => Function component: header/1
# => Isolated template: LiveView diffs only this section
~H"""
<header>
<time><%= @time %></time> <%!-- When @time changes, only this element updates; rest of page untouched --%>
</header>
"""
end # => End header/1 component
end # => End MyAppWeb.DashboardLive module
Key Takeaway: Use assign_new/3 for expensive one-time computation. Prefer streams over lists for collections. Debounce rapid events. Use push_event for client-side updates. Break templates into small function components.
Why It Matters: Templates with HEEx enable component-based UI development with compile-time validation. This catches HTML errors during compilation rather than runtime, improving reliability.
Example 75: Production Debugging with Observer and LiveDashboard
Debug production issues with Observer, LiveDashboard, and remote IEx.
# Connect to remote production node
# On local machine:
# iex --name debug@127.0.0.1 --cookie production_cookie
# In IEx session:
Node.connect(:"my_app@production-server.com") # => Connect to prod node
Node.list() # => ["my_app@production-server.com"]
# Inspect running processes
Process.list() # => All Erlang processes
|> Enum.filter(fn pid ->
case Process.info(pid, :current_function) do
{:current_function, {module, _fun, _arity}} ->
String.starts_with?(to_string(module), "Elixir.MyApp")
_ -> false
end
end)
|> Enum.map(fn pid ->
{pid, Process.info(pid, [:memory, :message_queue_len, :current_function])}
end)
# => [{#PID<0.123.0>, [memory: 12345, message_queue_len: 0, ...]}, ...]
# Find process by name
pid = Process.whereis(MyApp.Worker) # => #PID<0.456.0>
Process.info(pid) # => Full process info
# Check process mailbox
Process.info(pid, :message_queue_len) # => {message_queue_len, 1000}
# => If high, process is backed up
# Inspect state of GenServer
:sys.get_state(pid) # => Current state map
# => %{pending: [], processing: 10, errors: []}
# Trace function calls
:dbg.tracer() # => Start tracer
:dbg.p(:all, :c) # => Trace all processes
:dbg.tp(MyApp.Blog, :create_post, 2, []) # => Trace function
# => Prints every call to MyApp.Blog.create_post/2
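:dbg.stop() # => Always stop the tracer when done; a forgotten trace can flood a production node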
# LiveDashboard metrics
defmodule MyAppWeb.Telemetry do
def metrics do
[
# VM Metrics
last_value("vm.memory.total", unit: {:byte, :megabyte}),
last_value("vm.total_run_queue_lengths.total"), # => Scheduler queue
last_value("vm.total_run_queue_lengths.cpu"),
# Database Metrics
summary("my_app.repo.query.total_time",
unit: {:native, :millisecond},
tags: [:source, :command] # => Group by table/command
),
counter("my_app.repo.query.count"), # => Total queries
# Phoenix Metrics
summary("phoenix.router_dispatch.stop.duration",
tags: [:route],
unit: {:native, :millisecond}
),
counter("phoenix.router_dispatch.stop.count",
tags: [:route, :status] # => Requests by route/status
),
# Custom Business Metrics
counter("my_app.orders.created.count"), # => Business events
last_value("my_app.users.active.count"),
distribution("my_app.checkout.duration",
buckets: [100, 500, 1000, 5000] # => Histogram buckets
)
]
end
end
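These metrics only appear once LiveDashboard is mounted in the router. A minimal sketch, assuming the standard phoenix_live_dashboard dependency and a :browser pipeline:
# In MyAppWeb.Router
import Phoenix.LiveDashboard.Router
scope "/" do
pipe_through :browser # => Protect behind admin auth in production
live_dashboard "/dashboard", metrics: MyAppWeb.Telemetry # => Serves the metrics/0 list above
end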
# Memory leak detection
defmodule MyApp.MemoryMonitor do
use GenServer
require Logger
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
:timer.send_interval(60_000, :check_memory) # => Check every minute
{:ok, %{baseline: get_memory()}}
end
def handle_info(:check_memory, state) do
current = get_memory()
diff = current - state.baseline
if diff > 100_000_000 do # => >100MB increase
Logger.warning("Memory leak detected",
baseline_mb: div(state.baseline, 1_000_000),
current_mb: div(current, 1_000_000),
diff_mb: div(diff, 1_000_000)
)
# Get top memory consumers
top_processes = Process.list()
|> Enum.map(fn pid ->
{pid, Process.info(pid, :memory)}
end)
|> Enum.reject(fn {_pid, info} -> is_nil(info) end) # => Skip processes that exited mid-scan
|> Enum.sort_by(fn {_pid, {:memory, mem}} -> mem end, :desc)
|> Enum.take(10) # => Top 10 processes
Logger.warning("Top memory consumers", processes: top_processes)
end
{:noreply, state}
end
defp get_memory do
:erlang.memory(:total) # => Total VM memory
end
end
Key Takeaway: Use remote IEx to connect to production. Inspect process state with :sys.get_state/1. Monitor memory leaks. Use LiveDashboard for real-time metrics. Trace function calls with :dbg.
Why It Matters: Process-based architecture enables horizontal scaling and fault isolation. Understanding OTP processes is key to building highly available systems.
Example 76: Security Best Practices
Implement security headers, CSRF protection, and input sanitization.
# Security headers in endpoint
defmodule MyAppWeb.Endpoint do
use Phoenix.Endpoint, otp_app: :my_app
plug Plug.Static, at: "/", from: :my_app
# Security headers
plug :put_secure_headers
defp put_secure_headers(conn, _opts) do
conn
|> put_resp_header("x-frame-options", "DENY") # => Prevent clickjacking
|> put_resp_header("x-content-type-options", "nosniff") # => Prevent MIME sniffing
|> put_resp_header("x-xss-protection", "1; mode=block") # => XSS protection
|> put_resp_header("strict-transport-security",
"max-age=31536000; includeSubDomains") # => Force HTTPS
|> put_resp_header("content-security-policy",
"default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'")
# => CSP policy
|> put_resp_header("referrer-policy", "strict-origin-when-cross-origin")
end
plug Plug.RequestId # => Add request ID header
plug Plug.Telemetry, event_prefix: [:phoenix, :endpoint]
plug Plug.Parsers,
parsers: [:urlencoded, :multipart, :json],
pass: ["*/*"],
json_decoder: Jason
plug Plug.MethodOverride # => Support _method param
plug Plug.Head # => Handle HEAD requests
plug Plug.Session, @session_options # => Encrypted sessions
plug MyAppWeb.Router
end
# CSRF protection (automatic in Phoenix)
defmodule MyAppWeb.Router do
pipeline :browser do
plug :accepts, ["html"]
plug :fetch_session
plug :fetch_live_flash
plug :put_root_layout, {MyAppWeb.Layouts, :root}
plug :protect_from_forgery # => CSRF token validation
plug :put_secure_browser_headers
end
# API doesn't use CSRF (use JWT instead)
pipeline :api do
plug :accepts, ["json"]
# No CSRF protection for stateless API
end
end
# Input sanitization for user content
defmodule MyApp.Content do
@doc "Sanitize HTML to prevent XSS"
def sanitize_html(html) do
HtmlSanitizeEx.basic_html(html) # => Strip dangerous tags
# => Allows: <p>, <br>, <strong>, <em>, <a>, etc.
# => Removes: <script>, <iframe>, onclick, etc.
end
def sanitize_user_post(params) do
params
|> Map.update("body", "", &sanitize_html/1) # => Clean HTML content
|> Map.update("title", "", &String.trim/1) # => Trim whitespace
end
end
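For example, basic_html/1 keeps harmless markup while dropping script payloads:
MyApp.Content.sanitize_html("<p>hi</p><script>alert(1)</script>")
# => "<p>hi</p>" remains; the <script> element is stripped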
# SQL injection prevention (automatic with Ecto)
# ❌ NEVER do this:
def search_posts_unsafe(term) do
query = "SELECT * FROM posts WHERE title LIKE '%#{term}%'" # => SQL INJECTION!
Ecto.Adapters.SQL.query!(Repo, query, [])
end
# ✅ Always use parameterized queries:
def search_posts_safe(term) do
query =
from p in Post,
where: like(p.title, ^"%#{term}%") # => Parameter binding
Repo.all(query)
# => Generates: SELECT * FROM posts WHERE title LIKE $1
# => Binds: ["%search%"]
end
# Rate limiting (prevent brute force)
defmodule MyAppWeb.Plugs.RateLimitLogin do
def init(opts), do: opts
def call(conn, _opts) do
ip = get_ip(conn) # => Client IP
key = "login_attempts:#{ip}"
case MyApp.Cache.get(key) do
nil ->
MyApp.Cache.set(key, 1, ttl: 300) # => First attempt, 5min window
conn
attempts when attempts < 5 ->
MyApp.Cache.incr(key) # => Increment counter
conn
_ ->
conn
|> put_status(:too_many_requests) # => 429
|> Phoenix.Controller.json(%{error: "Too many login attempts"})
|> halt() # => Block request
end
end
defp get_ip(conn) do
conn.remote_ip |> Tuple.to_list() |> Enum.join(".")
end
end
# Secure password hashing (automatic with phx.gen.auth)
defmodule MyApp.Accounts.User do
def registration_changeset(user, attrs) do
user
|> cast(attrs, [:email, :password])
|> validate_email()
|> validate_password()
|> hash_password() # => Bcrypt with salt
end
defp hash_password(changeset) do
password = get_change(changeset, :password)
if password && changeset.valid? do
changeset
|> put_change(:password_hash, Bcrypt.hash_pwd_salt(password))
|> delete_change(:password) # => Don't store plaintext
else
changeset
end
end
end
Key Takeaway: Set security headers (CSP, HSTS, X-Frame-Options). CSRF tokens automatic in Phoenix. Sanitize user HTML with HtmlSanitizeEx. Always use Ecto for queries (prevents SQL injection). Rate limit login attempts.
Why It Matters: Ecto provides type-safe database interactions with compile-time query validation. This prevents SQL injection and catches query errors before deployment.
Example 77: WebSocket Connection Pooling
Optimize WebSocket connections with connection pooling and load distribution.
# Channel with connection limits
defmodule MyAppWeb.UserSocket do
use Phoenix.Socket
# Limit concurrent connections per user
def connect(%{"token" => token}, socket, _connect_info) do
case verify_token(token) do
{:ok, user_id} ->
# Check connection limit
case check_connection_limit(user_id) do
:ok ->
{:ok, assign(socket, :user_id, user_id)} # => Allow connection
{:error, :limit_exceeded} ->
:error # => Reject connection
end
{:error, _} ->
:error
end
end
defp check_connection_limit(user_id) do
count = MyAppWeb.Presence.list("user:#{user_id}")
|> Enum.count() # => Count current connections for this user
if count < 5 do # => Max 5 connections per user
:ok
else
{:error, :limit_exceeded}
end
end
def id(socket), do: "user_socket:#{socket.assigns.user_id}"
channel "room:*", MyAppWeb.RoomChannel
channel "user:*", MyAppWeb.UserChannel
end
# Connection pooling for external services
defmodule MyApp.External.APIClient do # => Defines HTTP client with connection pooling
# => Reuses connections for better performance
use GenServer # => Imports GenServer behavior
# => Manages connection pool lifecycle
def start_link(opts) do # => Public function: start_link/1
# => Called by supervisor to start process
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
# => Starts GenServer
# => Registers with module name
# => Returns {:ok, pid}
end # => End start_link/1 function
def init(_opts) do # => Initializes GenServer
# => Sets up HTTP connection pool
# Start a named connection pool
# => :hackney_pool.child_spec/2 only builds a spec; start_pool/2 actually starts it
:hackney_pool.start_pool(:api_pool, [
timeout: 15_000, # => Connection timeout
# => Max 15 seconds to establish connection
# => Prevents hanging on slow servers
max_connections: 100 # => Pool size
# => Maximum 100 concurrent connections
# => Reuses idle connections
])
{:ok, %{}} # => Pool is registered by name; no extra state needed
end # => End init/1 callback
def get(path) do # => Public API: get/1
# => Makes HTTP GET request using pool
url = "https://api.example.com" <> path # => Builds full URL
# => E.g., path="/users" → "https://api.example.com/users"
headers = [{"Authorization", "Bearer #{api_token()}"}]
# => Sets request headers
# => Authorization header with Bearer token
# => api_token() fetches from config/env
case :hackney.get(url, headers, "", [pool: :api_pool]) do
# => Makes HTTP GET request
# => Uses :api_pool for connection reuse
# => Returns {:ok, status, headers, client_ref} or {:error, reason}
{:ok, 200, _headers, client_ref} -> # => Success case: HTTP 200 OK
# => client_ref: reference to read response body
{:ok, body} = :hackney.body(client_ref) # => Read response
# => Reads full response body
# => body is binary string (JSON)
Jason.decode(body) # => Parse JSON
# => Converts JSON string to Elixir map
# => Returns {:ok, map} or {:error, reason}
{:ok, status, _headers, _client_ref} -> # => Non-200 status code
# => E.g., 404, 500, etc.
{:error, {:http_error, status}} # => Returns error tuple with status
# => E.g., {:error, {:http_error, 404}}
{:error, reason} -> # => Network/connection error
# => E.g., timeout, connection refused
{:error, reason} # => Returns error reason
# => E.g., {:error, :timeout}
end # => End case statement
end # => End get/1 function
end # => End MyApp.External.APIClient module
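Callers then reuse pooled connections transparently (the path is illustrative):
{:ok, users} = MyApp.External.APIClient.get("/users") # => Subsequent calls reuse an idle pooled connection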
# Distribute WebSocket connections across nodes
defmodule MyAppWeb.Presence do
use Phoenix.Presence,
otp_app: :my_app,
pubsub_server: MyApp.PubSub
# Track connections across cluster (call from within the channel process)
def track_connection(socket) do
track(self(), "connections", socket.id, %{ # => track/4 takes a pid, topic, key, meta
node: node(), # => Which Erlang node
joined_at: System.system_time(:second)
})
end
# Get connection count across all nodes
def connection_count do
list("connections") # => All tracked connections
|> Enum.count()
end
# Get connections per node
def connections_by_node do
list("connections")
|> Enum.group_by(fn {_id, %{metas: [meta | _]}} ->
meta.node # => Group by node name
end)
|> Enum.map(fn {node, conns} ->
{node, Enum.count(conns)} # => {node, count}
end)
|> Enum.into(%{})
end
end
Key Takeaway: Limit connections per user to prevent abuse. Use Hackney pool for HTTP connection pooling. Track WebSocket connections across cluster with Presence. Distribute load across multiple nodes.
Why It Matters: Phoenix sockets enable efficient bidirectional communication for real-time features. This is essential for chat, notifications, and collaborative editing applications.
Example 78: GraphQL API with Absinthe
Build GraphQL API for flexible client queries.
# mix.exs
defp deps do
[
{:absinthe, "~> 1.7"}, # => GraphQL implementation
{:absinthe_plug, "~> 1.5"} # => Phoenix integration
]
end
# GraphQL schema
defmodule MyAppWeb.Schema do
use Absinthe.Schema
import_types MyAppWeb.Schema.ContentTypes
query do
@desc "Get all posts"
field :posts, list_of(:post) do
arg :limit, :integer, default_value: 10 # => Optional limit
arg :offset, :integer, default_value: 0
resolve &MyAppWeb.Resolvers.Content.list_posts/3
end
@desc "Get post by ID"
field :post, :post do
arg :id, non_null(:id) # => Required ID
resolve &MyAppWeb.Resolvers.Content.get_post/3
end
@desc "Search posts"
field :search_posts, list_of(:post) do
arg :query, non_null(:string)
resolve &MyAppWeb.Resolvers.Content.search_posts/3
end
end
mutation do
@desc "Create a post"
field :create_post, :post do
arg :title, non_null(:string)
arg :body, non_null(:string)
arg :tags, list_of(:string)
resolve &MyAppWeb.Resolvers.Content.create_post/3
end
@desc "Update a post"
field :update_post, :post do
arg :id, non_null(:id)
arg :title, :string # => Optional fields
arg :body, :string
resolve &MyAppWeb.Resolvers.Content.update_post/3
end
end
subscription do
@desc "Subscribe to new posts"
field :post_created, :post do
config fn _args, _context ->
{:ok, topic: "posts"} # => PubSub topic
end
trigger :create_post, topic: fn _post ->
["posts"] # => Trigger on mutation
end
end
end
end
# Object types
defmodule MyAppWeb.Schema.ContentTypes do
use Absinthe.Schema.Notation
object :post do
field :id, :id # => Post ID
field :title, :string
field :body, :string
field :inserted_at, :string
field :author, :user, resolve: &get_author/3 # => Nested resolver
field :comments, list_of(:comment) do
resolve &get_comments/3 # => Load comments
end
end
object :user do
field :id, :id
field :name, :string
field :email, :string
end
object :comment do
field :id, :id
field :body, :string
field :author, :user
end
defp get_author(post, _args, _resolution) do
{:ok, MyApp.Accounts.get_user!(post.author_id)} # => Load author
end
defp get_comments(post, _args, _resolution) do
comments = MyApp.Blog.list_comments(post.id)
{:ok, comments} # => Load comments
end
end
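Note: per-field resolvers like get_author/3 and get_comments/3 run one query per parent post (the classic N+1 problem). In production, the usual fix is batching via the dataloader package and Absinthe's Dataloader middleware; a hedged sketch (the source name is illustrative):
# In MyAppWeb.Schema:
# def context(ctx) do
#   loader = Dataloader.new() |> Dataloader.add_source(:blog, Dataloader.Ecto.new(MyApp.Repo))
#   Map.put(ctx, :loader, loader)
# end
# def plugins, do: [Absinthe.Middleware.Dataloader] ++ Absinthe.Plugin.defaults()
# Then resolve associations with: resolve: dataloader(:blog)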
# Resolvers
defmodule MyAppWeb.Resolvers.Content do
def list_posts(_parent, args, _resolution) do
posts = MyApp.Blog.list_posts(args) # => Pass limit/offset
{:ok, posts}
end
def get_post(_parent, %{id: id}, _resolution) do
case MyApp.Blog.get_post(id) do
nil -> {:error, "Post not found"}
post -> {:ok, post}
end
end
def create_post(_parent, args, %{context: %{current_user: user}}) do
case MyApp.Blog.create_post(Map.put(args, :author_id, user.id)) do
{:ok, post} ->
Absinthe.Subscription.publish(
MyAppWeb.Endpoint,
post,
post_created: "posts" # => Trigger subscription
)
{:ok, post}
{:error, changeset} ->
{:error, changeset} # => Return errors
end
end
end
# Router
defmodule MyAppWeb.Router do # => Defines Phoenix router
# => Configures GraphQL endpoints
scope "/api" do # => API scope under /api path
# => All routes prefixed with /api
pipe_through :api # => Uses API pipeline
# => Accepts JSON, no session/CSRF
forward "/graphql", Absinthe.Plug, # => GraphQL query endpoint
# => POST requests to /api/graphql
schema: MyAppWeb.Schema # => GraphQL endpoint
# => Uses schema defined above
# => Handles queries, mutations, subscriptions
forward "/graphiql", Absinthe.Plug.GraphiQL, # => GraphQL IDE endpoint
# => Browser-based query explorer
schema: MyAppWeb.Schema, # => Uses same schema
# => Access at /api/graphiql in browser
interface: :playground # => GraphQL IDE
# => Interactive query editor
# => Shows schema documentation
# => Only enable in dev/staging!
end # => End /api scope
end # => End router
# Example queries:
# query { # => GraphQL query operation
# posts(limit: 5) { # => Fetches up to 5 posts
# id # => Request post ID
# title # => Request post title
# author { name } # => Nested query: author's name
# comments { body } # => Nested query: all comment bodies
# } # => Client chooses exact fields needed
# } # => No over-fetching or under-fetching
#
# mutation { # => GraphQL mutation operation
# createPost(title: "Hello", body: "World") { # => Creates new post with arguments
# id # => Returns created post ID
# title # => Returns created post title
# } # => Client specifies return fields
# } # => Mutation triggers subscription
Key Takeaway: Absinthe provides GraphQL for Phoenix. Define schema with queries, mutations, subscriptions. Clients request only needed fields. Use resolvers to load data. GraphiQL provides interactive API explorer.
Why It Matters: A GraphQL schema is a single typed contract between clients and your API. Clients fetch exactly the fields they need, which eliminates over-fetching for mobile and web consumers alike.
Example 79: Event Sourcing Pattern
Implement event sourcing to store state changes as immutable events.
# Event schema
defmodule MyApp.Events.Event do
use Ecto.Schema
schema "events" do
field :aggregate_id, :string # => Entity ID (e.g., order_id)
field :aggregate_type, :string # => Entity type (e.g., "Order")
field :event_type, :string # => Event name
field :payload, :map # => Event data (JSONB)
field :version, :integer # => Event version
field :inserted_at, :utc_datetime
end
end
# Event store
defmodule MyApp.EventStore do
import Ecto.Query # => Provides from/2 used below
alias MyApp.Events.Event
alias MyApp.Repo
def append_event(aggregate_id, aggregate_type, event_type, payload) do
# Get current version
current_version = get_current_version(aggregate_id)
event = %Event{
aggregate_id: aggregate_id,
aggregate_type: aggregate_type,
event_type: event_type,
payload: payload,
version: current_version + 1, # => Increment version
inserted_at: DateTime.utc_now()
}
case Repo.insert(event) do
{:ok, event} ->
# Publish event for projections
Phoenix.PubSub.broadcast(
MyApp.PubSub,
"events:#{aggregate_type}",
{:event_appended, event}
)
{:ok, event}
{:error, changeset} ->
{:error, changeset}
end
end
def get_events(aggregate_id) do
query =
from e in Event,
where: e.aggregate_id == ^aggregate_id,
order_by: [asc: e.version] # => Chronological order
Repo.all(query)
end
defp get_current_version(aggregate_id) do
query =
from e in Event,
where: e.aggregate_id == ^aggregate_id,
select: max(e.version)
case Repo.one(query) do
nil -> 0 # => No events yet
version -> version
end
end
end
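append_event/4 reads the current version and then inserts, so two concurrent writers could claim the same version. A unique index turns that race into an insert error the caller can retry; a minimal migration sketch:
defmodule MyApp.Repo.Migrations.AddEventVersionIndex do
use Ecto.Migration
def change do
create unique_index(:events, [:aggregate_id, :version])
# => Concurrent appends of the same version now fail loudly instead of corrupting the stream
end
end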
# Order aggregate
defmodule MyApp.Orders.Order do # => Defines Order aggregate (domain model)
# => State rebuilt from events, not stored directly
alias MyApp.Events.Event # => For the %Event{} patterns below
defstruct id: nil, # => Order ID field
items: [], # => List of order items
status: :draft, # => Order status (:draft, :placed, :shipped)
total: 0, # => Total order amount
tracking_number: nil, # => Set by OrderShipped (must exist for %{order | ...} update)
version: 0 # => Event version counter
# Apply event to state
def apply_event(order, %Event{event_type: "OrderCreated", payload: payload}) do
# => Handles OrderCreated event
# => Pattern matches event_type
%{order | # => Updates order struct
id: payload["id"], # => Sets order ID from event
items: [], # => Starts with empty items list
status: :draft, # => Initial status is :draft
total: 0 # => Initial total is 0
} # => Returns updated order
end # => End OrderCreated handler
def apply_event(order, %Event{event_type: "ItemAdded", payload: payload}) do
# => Handles ItemAdded event
# => Adds item to order
item = payload["item"] # => Extracts item from payload
# => E.g., %{"name" => "Widget", "price" => 10}
%{order | # => Updates order struct
items: [item | order.items], # => Prepends item to items list
# => E.g., [new_item | existing_items]
total: order.total + item["price"] # => Update total
# => Adds item price to running total
# => E.g., total: 20 + 10 = 30
} # => Returns updated order
end # => End ItemAdded handler
def apply_event(order, %Event{event_type: "OrderPlaced"}) do
# => Handles OrderPlaced event
# => Transitions order to placed status
%{order | status: :placed} # => Change status
# => Updates status field to :placed
# => Order can no longer be modified
end # => End OrderPlaced handler
def apply_event(order, %Event{event_type: "OrderShipped", payload: payload}) do
# => Handles OrderShipped event
# => Records shipment details
%{order | # => Updates order struct
status: :shipped, # => Changes status to :shipped
# => Final order status
tracking_number: payload["tracking_number"] # => Adds tracking number from payload
# => E.g., "TRACK123456"
} # => Returns updated order
end # => End OrderShipped handler
# Rebuild state from events
def from_events(events) do # => Public function: from_events/1
# => Replays events to rebuild order state
Enum.reduce(events, %__MODULE__{}, &apply_event(&2, &1))
# => Reduces events list into order struct
# => Starts with empty %Order{}
# => Applies each event in order
# => Replay all events to rebuild current state
# => E.g., [OrderCreated, ItemAdded, ItemAdded, OrderPlaced]
# => Builds final order: %Order{id: 1, items: [..], status: :placed, total: 50}
end # => End from_events/1 function
end # => End MyApp.Orders.Order module
# Command handler
defmodule MyApp.Orders.Commands do
alias MyApp.EventStore
alias MyApp.Orders.Order
def create_order(order_id) do
EventStore.append_event(
order_id,
"Order",
"OrderCreated",
%{"id" => order_id} # => Event payload
)
end
def add_item(order_id, item) do
# Load current state
events = EventStore.get_events(order_id)
order = Order.from_events(events) # => Rebuild from events
# Validate command
if order.status == :draft do
EventStore.append_event(
order_id,
"Order",
"ItemAdded",
%{"item" => item} # => Record event
)
else
{:error, "Cannot add items to placed order"}
end
end
def place_order(order_id) do
events = EventStore.get_events(order_id)
order = Order.from_events(events)
if order.total > 0 do
EventStore.append_event(
order_id,
"Order",
"OrderPlaced",
%{} # => No additional data
)
else
{:error, "Order must have items"}
end
end
end
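Putting the pieces together, a typical command sequence appends three events, and state can be rebuilt at any point (IDs illustrative):
MyApp.Orders.Commands.create_order("order-123") # => Appends OrderCreated (version 1)
MyApp.Orders.Commands.add_item("order-123", %{"name" => "Widget", "price" => 25}) # => ItemAdded (version 2)
MyApp.Orders.Commands.place_order("order-123") # => OrderPlaced (version 3)
order = MyApp.Orders.Order.from_events(MyApp.EventStore.get_events("order-123"))
# => %Order{status: :placed, total: 25, ...} rebuilt purely from the event log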
# Read model projection (materialized view)
defmodule MyApp.Orders.Projection do
use GenServer
alias MyApp.EventStore # => Used to replay events below
alias MyApp.Orders.Order
def start_link(_opts) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def init(_) do
Phoenix.PubSub.subscribe(MyApp.PubSub, "events:Order")
{:ok, %{}}
end
def handle_info({:event_appended, event}, state) do
# Update read model based on event
update_read_model(event) # => Update database view
{:noreply, state}
end
defp update_read_model(%{event_type: "OrderPlaced", aggregate_id: order_id}) do
# Update orders table for fast queries
events = EventStore.get_events(order_id)
order = Order.from_events(events)
# Save snapshot for performance (upsert keyed on id;
# insert_or_update/1 expects a changeset, so a plain upsert is simpler here)
MyApp.Repo.insert(
%MyApp.Orders.OrderSnapshot{
id: order_id,
items: order.items,
total: order.total,
status: order.status
},
on_conflict: :replace_all, # => Overwrite any existing snapshot
conflict_target: :id
)
end
defp update_read_model(_event), do: :ok # => Ignore events this projection doesn't handle
end
Key Takeaway: Store all state changes as immutable events. Rebuild state by replaying events. Use projections for fast queries. Events provide complete audit trail. Supports time travel and debugging.
Why It Matters: Event-driven patterns decouple components and enable scalable architectures. Understanding events is key to building maintainable Phoenix applications.
Example 80: Advanced Testing Strategies
Implement property-based testing and contract testing.
# Property-based testing with StreamData
defmodule MyApp.BlogTest do # => Test module for Blog context
# => Uses property-based testing
use ExUnit.Case # => Imports ExUnit test macros
# => Provides test/2, assert/1, etc.
use ExUnitProperties # => Property testing
# => Imports property/2, check/2, generators
property "post title is always trimmed" do # => Property test: invariant over many inputs
# => Tests 100 random inputs by default
check all title <- string(:alphanumeric, min_length: 1, max_length: 100) do
# => Generator: random alphanumeric strings
# => min_length: 1, max_length: 100
# => Runs test with 100 different titles
# Generate random titles
params = %{"title" => " #{title} "} # => Add whitespace
# => Surrounds title with spaces
# => E.g., " HelloWorld "
changeset = Post.changeset(%Post{}, params) # => Creates changeset
# => Validates and casts params
if changeset.valid? do # => Only check if changeset valid
# => Skips invalid inputs
trimmed = Ecto.Changeset.get_change(changeset, :title)
# => Gets :title change from changeset
# => Should be trimmed value
assert String.trim(title) == trimmed # => Property: always trimmed
# => Invariant: whitespace removed
# => Fails if any input violates property
end # => End if
end # => End check
end # => End property test
property "creating and deleting post is idempotent" do
# => Property test: operation reversibility
# => Tests create → delete → verify cycle
check all title <- string(:printable) do # => Generator: random printable strings
# => Includes special chars, unicode
# Create post
{:ok, post} = MyApp.Blog.create_post(%{title: title, body: "test"})
# => Creates post with random title
# => Should succeed for all inputs
# Delete post
{:ok, _} = MyApp.Blog.delete_post(post.id) # => Deletes created post
# => Should succeed
# Verify deleted
assert MyApp.Blog.get_post(post.id) == nil # => Property: delete works
# => Post no longer exists
# => Invariant: create+delete = nothing
end # => End check
end # => End property test
end # => End MyApp.BlogTest module
# Contract testing for APIs
defmodule MyApp.API.ContractTest do
use MyAppWeb.ConnCase # => Provides build_conn/0 and the post/3 test helper
@api_contract %{
"POST /api/posts" => %{
request: %{
"title" => "string",
"body" => "string"
},
response: %{
status: 201,
body: %{
"id" => "integer",
"title" => "string",
"body" => "string",
"inserted_at" => "string"
}
}
},
"GET /api/posts/:id" => %{
response: %{
status: 200,
body: %{
"id" => "integer",
"title" => "string"
}
}
}
}
test "API contract compliance" do
for {endpoint, contract} <- @api_contract do
# Test request/response matches contract
verify_contract(endpoint, contract) # => Verify API shape
end
end
defp verify_contract("POST /api/posts", contract) do
conn = build_conn()
|> post("/api/posts", contract.request)
assert conn.status == contract.response.status
body = Jason.decode!(conn.resp_body)
# Verify response structure
for {field, type} <- contract.response.body do
assert Map.has_key?(body, field) # => Field exists
assert type_matches?(body[field], type) # => Type correct
end
end
defp type_matches?(value, "string"), do: is_binary(value)
defp type_matches?(value, "integer"), do: is_integer(value)
end
# Integration tests with setup
defmodule MyAppWeb.PostLiveTest do # => LiveView integration test module
# => Tests full user interactions
use MyAppWeb.ConnCase, async: true # => Uses conn test case
# => async: true enables parallel test execution
# => Faster test suite (isolated database)
import Phoenix.LiveViewTest # => Imports LiveView test helpers
# => Provides live/2, render_click/2, etc.
setup do # => Setup callback: runs before each test
# => Creates test data
# Setup test data
user = insert(:user) # => Factory
# => Creates test user with factory
# => Uses ExMachina or similar
posts = insert_list(3, :post, author: user) # => Create 3 posts
# => Creates 3 posts linked to user
# => insert_list/3: factory helper
{:ok, user: user, posts: posts} # => Return to test
# => Makes user and posts available in test context
# => Tests receive as %{user: user, posts: posts}
end # => End setup callback
test "displays posts", %{conn: conn, posts: posts} do
# => Test function
# => Receives conn and posts from setup
{:ok, view, html} = live(conn, "/posts") # => Mounts LiveView at /posts
# => view: LiveView test process
# => html: initial rendered HTML
# Assert all posts shown
for post <- posts do # => Iterates over test posts
# => Checks each post rendered
assert html =~ post.title # => Each title present
# => =~ checks string contains title
# => Fails if any post missing
end # => End for loop
# Test interaction
view # => LiveView test process
|> element("button", "Load More") # => Finds <button>Load More</button>
# => Selects element to interact with
|> render_click() # => Click button
# => Triggers phx-click event
# => Simulates user click
html = render(view) # => Re-render after the click
assert length(Floki.find(html, "[data-role=post]")) == 6 # => 6 posts now
# => has_element?/3 takes a text filter, not a count; Floki (a LiveView test dependency) counts matches
# => Verifies "Load More" added 3 more posts
end # => End test
test "creates post", %{conn: conn} do # => Test post creation flow
# => Receives conn from setup
{:ok, view, _html} = live(conn, "/posts/new") # => Mounts LiveView at /posts/new
# => Opens post creation form
# Fill form
view # => LiveView test process
|> form("#post-form", post: %{title: "", body: ""})
# => Selects form by ID
# => Sets form values (empty for validation test)
|> render_change() # => Trigger validation
# => Simulates phx-change event
# => Triggers LiveView validation
assert has_element?(view, ".error") # => Shows errors
# => Checks for error message elements
# => Validates required field errors shown
# Submit valid form
view # => LiveView test process
|> form("#post-form", post: %{title: "Test", body: "Content"})
# => Fills form with valid data
# => title: "Test", body: "Content"
|> render_submit() # => Submit form
# => Triggers phx-submit event
# => Creates post in database
assert_redirected(view, "/posts/1") # => Redirects to show
# => Verifies redirect to post show page
# => Post ID is 1 (first post in test)
end # => End test
end # => End MyAppWeb.PostLiveTest module
# Performance testing
defmodule MyApp.PerformanceTest do # => Performance test module
# => Validates performance requirements
use ExUnit.Case # => Imports ExUnit test macros
# => Provides test/2, assert/1, etc.
@tag :performance # => Tags test as performance test
# => Can run selectively: mix test --only performance
test "list posts performs under 100ms" do # => Performance test: query speed
# => Validates 100ms requirement
# Create test data
insert_list(100, :post) # => Creates 100 test posts
# => Realistic data volume
{time_us, _result} = :timer.tc(fn -> # => Measures execution time
# => :timer.tc/1 returns {microseconds, result}
MyApp.Blog.list_posts() # => Measure execution time
# => Function being benchmarked
end) # => Returns {time_in_microseconds, function_result}
time_ms = time_us / 1000 # => Converts microseconds to milliseconds
# => 1000 μs = 1 ms
assert time_ms < 100, "Query took #{time_ms}ms, expected < 100ms"
# => Fails if query takes ≥100ms
# => Error message shows actual time
end # => End performance test
@tag :performance # => Tags as performance test
test "N+1 query prevention" do # => Tests for N+1 query problem
# => Validates preloading works
posts = insert_list(10, :post) # => Creates 10 test posts
# => Base data for N+1 test
Enum.each(posts, fn post -> # => Iterates over posts
# => Creates comments for each
insert_list(5, :comment, post: post) # => Creates 5 comments per post
# => Total: 50 comments
end) # => End each
# Count queries
query_count = count_queries(fn -> # => Counts database queries executed
# => Uses telemetry to track
MyApp.Blog.list_posts_with_comments() # => Preload comments
# => Should use 2 queries (posts + comments)
# => BAD: 11 queries (1 + 10 for each post)
end) # => Returns total query count
assert query_count == 2, "Expected 2 queries, got #{query_count}"
# => Fails if more than 2 queries
# => 1 for posts, 1 for comments (not 11!)
# => Verifies preload prevents N+1
end # => End N+1 test
defp count_queries(fun) do # => Helper: counts queries during execution
# => Uses telemetry to intercept queries
ref = make_ref() # => Creates unique reference
# => Used for telemetry handler ID (inspect/1 needed: refs don't interpolate into strings)
test_pid = self() # => Queries may emit from other processes; send counts here
:telemetry.attach( # => Attaches telemetry handler
# => Intercepts query events
"query-counter-#{inspect(ref)}", # => Handler ID (unique per test)
[:my_app, :repo, :query], # => Event to listen for
# => Emitted by Ecto on each query
fn _event, _measurements, _metadata, _config -> # => Handler function
# => Called once per query, in the emitting process
send(test_pid, :query) # => One message per query
end,
nil # => No handler config needed
)
fun.() # => Executes function being measured
# => Triggers database queries
count = receive_queries(0) # => Collects query count from messages
# => Starts with count 0
:telemetry.detach("query-counter-#{inspect(ref)}") # => Removes telemetry handler
# => Cleanup to prevent leaks
count # => Returns total query count
end # => End count_queries/1 helper
defp receive_queries(count) do # => Helper: counts received query messages
# => Recursively accumulates the count
receive do # => Waits for messages
:query -> receive_queries(count + 1)
# => One message per query; increment and keep waiting
after
100 -> count # => No more queries
# => Timeout after 100ms
# => Returns final count
end # => End receive
end # => End receive_queries/1 helper
end # => End MyApp.PerformanceTest module
Key Takeaway: Property-based testing validates invariants across random inputs. Contract testing ensures API stability. Integration tests verify full user flows. Performance tests catch regressions. Use async: true for parallel test execution.
Why It Matters: Property-based, contract, integration, and performance tests each catch classes of bugs that simple example-based tests miss, keeping production Phoenix applications robust as they evolve.