Advanced
Group 9: Database Advanced
Example 51: Transactions and Concurrency with Ecto.Multi
Execute multiple database operations atomically. If any fails, all rollback.
%% Ecto.Multi transaction flow
graph TD
A[Multi.new] --> B[Multi.update :debit]
B --> C[Multi.update :credit]
C --> D[Multi.insert :log]
D --> E[Repo.transaction]
E --> F{All succeed?}
F -->|Yes| G[Commit all changes]
F -->|No| H[Rollback everything]
G --> I[Return {:ok, results}]
H --> J[Return {:error, failed_op}]
style A fill:#0173B2,color:#fff
style E fill:#DE8F05,color:#000
style G fill:#029E73,color:#fff
style H fill:#CA9161,color:#000
defmodule MyApp.Transfers do
def transfer_funds(from_account, to_account, amount) do
result =
Ecto.Multi.new() # => Start transaction pipeline
|> Ecto.Multi.update(
:debit, # => Named operation
Ecto.Changeset.change(from_account, balance: from_account.balance - amount)
) # => Deduct from source account
|> Ecto.Multi.update(
:credit, # => Second operation
Ecto.Changeset.change(to_account, balance: to_account.balance + amount)
) # => Add to destination account
|> Ecto.Multi.insert(:transaction_log, %TransactionLog{
from_id: from_account.id,
to_id: to_account.id,
amount: amount
}) # => Record transaction history
|> MyApp.Repo.transaction() # => Execute all or rollback
case result do
{:ok, %{debit: from, credit: to, transaction_log: log}} ->
{:ok, from, to, log} # => All 3 operations committed
# => Database changes persisted atomically
{:error, :debit, changeset, _changes} ->
{:error, "Debit failed", changeset} # => Rolled back before credit
# => No partial updates in database
{:error, failed_op, changeset, _changes} ->
{:error, "Operation failed: #{failed_op}", changeset}
# => All changes discarded, database unchanged
end
end
# Dependency between operations
def complex_transaction do
Ecto.Multi.new()
|> Ecto.Multi.insert(:user, User.create_changeset(%{email: "user@example.com"}))
|> Ecto.Multi.insert(:profile, fn %{user: user} ->
Profile.create_changeset(user)
end)
|> Ecto.Multi.insert(:settings, fn %{user: user} ->
Settings.default_changeset(user)
end)
|> MyApp.Repo.transaction()
end
end

Key Takeaway: Ecto.Multi ensures all-or-nothing execution. Operations reference previous results with fn. Rollback happens automatically on any failure. Perfect for transfers, account creation, and other multi-step operations.
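When a step involves logic that is not a changeset (an external API call, a computed value), Ecto.Multi.run/3 fits the same pipeline. A minimal sketch, assuming a hypothetical ExternalPayments module and caller-supplied changesets:
defmodule MyApp.Transfers.WithSideEffects do
  alias Ecto.Multi

  def transfer_and_charge(debit_changeset, credit_changeset, amount) do
    Multi.new()
    |> Multi.update(:debit, debit_changeset)
    |> Multi.update(:credit, credit_changeset)
    |> Multi.run(:charge, fn _repo, %{debit: account} ->
      # ExternalPayments is hypothetical; returning {:error, reason} here
      # rolls back both updates above
      ExternalPayments.charge(account, amount)
    end)
    |> MyApp.Repo.transaction()
  end
end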
Why It Matters: Ecto provides type-safe database interactions with compile-time query validation. This prevents SQL injection and catches query errors before deployment.
Example 52: Database Constraints and Error Handling
Handle database constraint violations (unique, foreign key, etc.) gracefully in changesets.
%% Constraint violation handling
graph TD
A[Insert user] --> B[Database]
B --> C{Constraint violated?}
C -->|UNIQUE email| D[unique_constraint catches]
C -->|FK organization_id| E[assoc_constraint catches]
C -->|No violation| F[Insert succeeds]
D --> G[Return changeset with error]
E --> G
F --> H[Return {:ok, user}]
style A fill:#0173B2,color:#fff
style C fill:#DE8F05,color:#000
style F fill:#029E73,color:#fff
style G fill:#CA9161,color:#000
defmodule MyApp.Accounts.User do
schema "users" do
field :email, :string
field :username, :string
end
def registration_changeset(user, attrs) do
user
|> cast(attrs, [:email, :username])
|> unique_constraint(:email) # => Catch duplicate email
|> unique_constraint(:username) # => Catch duplicate username
|> assoc_constraint(:organization) # => Validate FK exists
end # => Converts DB errors to changeset errors
end
# In your service
defmodule MyApp.Accounts do
  alias Ecto.Changeset # => For the %Changeset{} match below
def register_user(attrs) do
case %User{}
|> User.registration_changeset(attrs)
|> Repo.insert() do
{:ok, user} ->
{:ok, user}
{:error, %Changeset{} = changeset} ->
# Check for constraint violations
error_fields = Enum.map(changeset.errors, fn {field, {msg, _}} -> {field, msg} end)
if Enum.any?(error_fields, fn {field, _} -> field == :email end) do
{:error, "Email already registered"}
else
{:error, "Registration failed"}
end
end
end
end

Key Takeaway: unique_constraint/2 catches database uniqueness violations. assoc_constraint/2 catches foreign key errors. Changesets provide user-friendly error messages without SQL errors exposed.
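Note that unique_constraint/2 only translates errors the database itself raises, so the backing index must exist. A minimal migration sketch matching the schema above:
defmodule MyApp.Repo.Migrations.AddUserIndexes do
  use Ecto.Migration

  def change do
    create unique_index(:users, [:email])    # => Backs unique_constraint(:email)
    create unique_index(:users, [:username]) # => Backs unique_constraint(:username)
  end
end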
Why It Matters: Changesets centralize validation logic and provide user-friendly error messages. This pattern ensures data integrity and improves user experience with clear feedback.
Example 53: Polymorphic Associations with many_to_many :through
Model flexible relationships where the same entity can have many different types of related entities.
%% Many-to-many through join table
erDiagram
POST ||--o{ POST_TAG : has
TAG ||--o{ POST_TAG : has
COMMENT ||--o{ COMMENT_TAG : has
TAG ||--o{ COMMENT_TAG : has
POST {
int id PK
string title
}
TAG {
int id PK
string name
}
POST_TAG {
int post_id FK
int tag_id FK
}
COMMENT {
int id PK
string body
}
COMMENT_TAG {
int comment_id FK
int tag_id FK
}
defmodule MyApp.Content.Post do
schema "posts" do
field :title, :string
many_to_many(:tags, MyApp.Tagging.Tag, join_through: "post_tags")
many_to_many(:attachments, MyApp.Attachments.Attachment, join_through: "post_attachments")
end
end
defmodule MyApp.Content.Comment do
schema "comments" do
field :body, :string
many_to_many(:tags, MyApp.Tagging.Tag, join_through: "comment_tags")
end
end
# Migration for join table
def change do
create table(:post_tags) do
add :post_id, references(:posts, on_delete: :delete_all)
add :tag_id, references(:tags, on_delete: :delete_all)
timestamps()
end
create unique_index(:post_tags, [:post_id, :tag_id])
end
# Usage: attach tags to an existing post
post = Repo.get!(Post, post_id) # => put_assoc needs a loaded struct
{:ok, post} =
  post
  |> Repo.preload(:tags) # => Existing tags must be preloaded
  |> Ecto.Changeset.change()
  |> Ecto.Changeset.put_assoc(:tags, tags)
  |> Repo.update()
# Query posts with specific tag
query = from p in Post,
  join: t in assoc(p, :tags),
  where: t.slug == "featured"
posts = Repo.all(query)

Key Takeaway: many_to_many/3 with join_through creates flexible relationships. Use put_assoc/3 to update related records. Query across relationships with join.
Why It Matters: Query composition enables complex database operations. Understanding Ecto queries is essential for application performance.
Example 54: Multi-Tenancy with Ecto Query Prefix
Isolate tenant data at the query level. Each query automatically scopes to tenant.
defmodule MyApp.Accounts do
def get_user(user_id, tenant_id) do
from(u in User, where: u.id == ^user_id and u.tenant_id == ^tenant_id)
|> Repo.one()
end
def create_user(attrs, tenant_id) do
%User{}
|> User.changeset(attrs |> Map.put("tenant_id", tenant_id))
|> Repo.insert()
end
end
# Or use a query prefix for schema-per-tenant isolation
defmodule MyApp.Tenants do
  # Derive the PostgreSQL schema (prefix) for a tenant
  def prefix(tenant_id), do: "tenant_#{tenant_id}"
end
# Query scoped to tenant
User
|> where([u], u.tenant_id == ^tenant_id)
|> Repo.all()
# Or with a dynamic query prefix
User
|> Repo.all(prefix: MyApp.Tenants.prefix(tenant_id)) # => e.g. the "tenant_42" schema

Key Takeaway: Always filter by tenant_id in queries. Use scopes (functions that return queries) to prevent tenant leaks; a sketch follows below. Consider separate schemas per tenant for complete isolation.
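A scope is just a function that narrows a query. A minimal sketch (MyApp.Tenancy is a module name of our choosing):
defmodule MyApp.Tenancy do
  import Ecto.Query

  # Composable: works on any schema or query with a tenant_id column
  def scoped(queryable, tenant_id) do
    where(queryable, [r], r.tenant_id == ^tenant_id)
  end
end

# Every context function pipes through the scope
User |> MyApp.Tenancy.scoped(tenant_id) |> Repo.all()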
Why It Matters: Schemas define data structure and types for validation and persistence. This provides a single source of truth for your domain models.
Example 55: PostgreSQL Advanced Features in Ecto
Leverage PostgreSQL-specific features: JSONB, arrays, full-text search, custom types.
defmodule MyApp.Blog.Post do
schema "posts" do
field :title, :string
field :metadata, :map # => JSONB in PostgreSQL
field :tags, {:array, :string} # => Array type
field :search_vector, :string # => Full-text search
end
end
# Migration
def change do
create table(:posts) do
add :title, :string
add :metadata, :jsonb, default: "{}"
add :tags, {:array, :string}, default: []
add :search_vector, :tsvector
timestamps()
end
# GIN index for JSONB performance
create index(:posts, ["(metadata)"], using: :gin)
# GIN index for full-text search
create index(:posts, [:search_vector], using: :gin)
end
# Query JSONB
posts = from p in Post,
where: fragment("? ->> ? = ?", p.metadata, "status", "published")
# Array operations
posts = from p in Post,
where: fragment("? @> ?", p.tags, ^["elixir"])
# Full-text search
results = from p in Post,
where: fragment("to_tsvector('english', ?) @@ plainto_tsquery('english', ?)",
p.title, ^search_term),
select: p

Key Takeaway: Use :map for JSONB, {:array, :string} for arrays. Full-text search works through tsvector. Use fragment/1 for database-specific SQL. Index JSONB and tsvector columns for performance.
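The search_vector column above is not maintained automatically. One common approach, sketched here with PostgreSQL's built-in tsvector_update_trigger, is to create a trigger in a migration:
def up do
  execute """
  CREATE TRIGGER posts_search_vector_update
  BEFORE INSERT OR UPDATE ON posts
  FOR EACH ROW EXECUTE PROCEDURE
  tsvector_update_trigger(search_vector, 'pg_catalog.english', title)
  """
end

def down do
  execute "DROP TRIGGER posts_search_vector_update ON posts"
end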
Why It Matters: This Phoenix pattern is fundamental for building production web applications. Understanding this concept enables you to create robust, maintainable, and scalable applications.
Group 10: Performance
Example 56: Query Optimization - N+1 Prevention
Load related data efficiently to avoid N+1 queries where one query results in N additional queries.
%% N+1 problem vs preload solution
graph TD
A[❌ N+1 Problem] --> B[Query 1: SELECT * FROM posts]
B --> C[Query 2: SELECT * FROM authors WHERE id = 1]
B --> D[Query 3: SELECT * FROM authors WHERE id = 2]
B --> E[Query 4: SELECT * FROM authors WHERE id = 3]
B --> F[... N queries]
G[✅ Preload Solution] --> H[Query 1: SELECT * FROM posts]
H --> I[Query 2: SELECT * FROM authors WHERE id IN]
I --> J[Total: 2 queries]
style A fill:#CA9161,color:#000
style G fill:#029E73,color:#fff
style J fill:#029E73,color:#fff
# ❌ N+1 Problem - 1 query + N queries
posts = Post |> Repo.all() # => SELECT * FROM posts (1 query)
for post <- posts do
author = Author |> where([a], a.id == ^post.author_id) |> Repo.one()
# => SELECT * FROM authors WHERE id = ? (N queries!)
end
# Total: 1 + N queries (if 100 posts = 101 queries!)
# ✅ Solution 1: Preload
posts = Post
|> preload(:author) # => Eager load authors
|> Repo.all()
# => 2 queries total: SELECT posts, SELECT authors WHERE id IN (...)
# ✅ Solution 2: Join (for aggregations)
posts = from p in Post,
join: a in assoc(p, :author), # => SQL JOIN
select: {p, a}
# => 1 query: SELECT posts.*, authors.* FROM posts JOIN authors
# ✅ Solution 3: Preload with nested associations
posts = Post
|> preload([comments: :author]) # => Loads comments and their authors
|> Repo.all()
# Query with EXPLAIN to see execution plan
results = Repo.all(from p in Post, preload: :author)
IO.inspect(Repo.explain(:all, Post))

Key Takeaway: Use preload/1 to eager-load associations. Use join for aggregations and filtering. Always check your queries with EXPLAIN. Avoid fetching in loops.
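Join and preload can also be combined so the association is loaded in the same round trip. A sketch assuming Post belongs_to :author:
query = from p in Post,
  join: a in assoc(p, :author),
  preload: [author: a] # => Reuse the joined row; no second query
posts = Repo.all(query)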
Why It Matters: Associations model relationships between data entities. Understanding Ecto associations enables efficient data access patterns.
Example 57: Caching Strategies
Cache expensive operations to reduce database load and improve response time.
defmodule MyApp.CacheServer do
use GenServer
def start_link(_opts) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def get_cache(key) do
GenServer.call(__MODULE__, {:get, key})
end
def set_cache(key, value, ttl_ms) do
GenServer.cast(__MODULE__, {:set, key, value, ttl_ms})
end
@impl true
def init(state) do
{:ok, state}
end
@impl true
def handle_call({:get, key}, _from, state) do
  now = System.monotonic_time(:millisecond) # => Guards can't call System functions
  case Map.get(state, key) do
    {value, expires_at} when expires_at > now ->
      {:reply, {:ok, value}, state} # => Cache hit, not expired
    _ ->
      {:reply, :not_found, state} # => Cache miss or expired
  end
end # => Synchronous call, blocks caller until reply
@impl true
def handle_cast({:set, key, value, ttl_ms}, state) do
expires_at = System.monotonic_time(:millisecond) + ttl_ms # => Calculate expiry
{:noreply, Map.put(state, key, {value, expires_at})}
end # => Asynchronous, returns immediately without blocking
end
# Or use Cachex library
defmodule MyApp.Blog do
def get_popular_posts do
case Cachex.get(:blog_cache, "popular_posts") do
{:ok, nil} ->
posts = Post |> where([p], p.likes > 100) |> Repo.all()
Cachex.put(:blog_cache, "popular_posts", posts, ttl: :timer.minutes(60))
posts
{:ok, posts} ->
posts
end
end
end

Key Takeaway: Cache expensive queries with TTL (time-to-live). Invalidate cache when data changes (see the sketch below). Use Cachex for distributed caching. Cache at the controller or service layer.
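Invalidation is the other half of caching: delete the cached entry whenever the underlying data changes. A sketch using Cachex.del/2 (update_post/2 is illustrative):
def update_post(post, attrs) do
  with {:ok, post} <- post |> Post.changeset(attrs) |> Repo.update() do
    Cachex.del(:blog_cache, "popular_posts") # => Next read repopulates the cache
    {:ok, post}
  end
end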
Why It Matters: Controllers implement the request-response pattern that forms the backbone of web applications. Understanding Phoenix controllers enables proper separation of concerns and clean HTTP interface design.
Example 58: Background Jobs with Oban
Execute long-running tasks asynchronously. Schedule recurring jobs.
%% Oban job processing flow
graph TD
A[User Registration] --> B[Insert Oban Job]
B --> C[oban_jobs table]
C --> D[Oban Worker Pool]
D --> E[EmailWorker.perform]
E --> F{Success?}
F -->|Yes| G[Mark completed]
F -->|No| H[Retry with backoff]
H --> D
style A fill:#0173B2,color:#fff
style C fill:#DE8F05,color:#000
style G fill:#029E73,color:#fff
style H fill:#CC78BC,color:#000
# lib/my_app/workers/email_worker.ex
defmodule MyApp.Workers.EmailWorker do
use Oban.Worker, queue: :default
@impl Oban.Worker
def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
user = MyApp.Accounts.get_user!(user_id) # => Load user
MyApp.Mailer.send_welcome_email(user) # => Send email
:ok # => Mark job complete
end # => Return :ok for success, {:error, reason} to retry
end
# In your controller/action
defmodule MyAppWeb.UserController do
def create(conn, %{"user" => user_params}) do
case MyApp.Accounts.create_user(user_params) do
{:ok, user} ->
# Queue background job
%{"user_id" => user.id}
|> MyApp.Workers.EmailWorker.new() # => Build job struct
|> Oban.insert() # => Insert into oban_jobs table
json(conn, user) # => Respond immediately
{:error, changeset} ->
error_response(conn, changeset)
end
end
end
# Configure Oban in your application
defmodule MyApp.Application do
  def start(_type, _args) do
    children = [
      # ... other children
      {Oban, Application.fetch_env!(:my_app, Oban)} # => Config comes from config.exs
      # Recurring jobs are declared via the Oban.Plugins.Cron plugin in that
      # config (see the sketch below), not as a separate supervised child
    ]
    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end

Key Takeaway: Create worker modules implementing Oban.Worker. Queue jobs asynchronously. Returning {:error, reason} triggers retry with backoff. Use Oban for background processing and cron jobs.
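The matching Oban configuration, with recurring jobs declared through the Oban.Plugins.Cron plugin (queue sizes are illustrative):
# config/config.exs
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [default: 10],
  plugins: [
    {Oban.Plugins.Cron, crontab: [
      {"0 2 * * *", MyApp.Workers.CleanupWorker} # => Daily cleanup at 2 AM
    ]}
  ]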
Why It Matters: Process-based architecture enables horizontal scaling and fault isolation. Understanding OTP processes is key to building highly available systems.
Example 59: Phoenix LiveDashboard for Metrics
Monitor your application in real-time with LiveDashboard. View requests, Ecto stats, processes.
# mix.exs
defp deps do
[
{:phoenix_live_dashboard, "~> 0.7"},
{:telemetry_metrics, "~> 0.6"},
{:telemetry_poller, "~> 1.0"}
]
end
# lib/my_app_web/telemetry.ex
defmodule MyAppWeb.Telemetry do
  use Supervisor
  import Telemetry.Metrics

  def start_link(arg) do
    Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

  @impl true
  def init(_arg) do
    children = [
      # Telemetry poller periodically collects VM measurements
      {:telemetry_poller, measurements: [], period: 10_000}
    ]
    Supervisor.init(children, strategy: :one_for_one)
  end

  # Metrics displayed on LiveDashboard's Metrics page
  def metrics do
    [
      # VM metrics
      last_value("vm.memory.total", unit: {:byte, :kilobyte}),
      last_value("vm.total_run_queue_lengths.total"),
      # Ecto metrics
      summary("my_app.repo.query.total_time", unit: {:native, :millisecond}),
      # Phoenix metrics
      summary("phoenix.router_dispatch.stop.duration", unit: {:native, :millisecond})
    ]
  end
end
# router.ex
import Phoenix.LiveDashboard.Router

scope "/" do
  pipe_through :browser
  live_dashboard "/dashboard", metrics: MyAppWeb.Telemetry # => Wire in metrics/0
end

Key Takeaway: Phoenix LiveDashboard shows real-time metrics. Monitor request performance, database connections, memory usage, and processes. Access it at /dashboard.
Why It Matters: Process-based architecture enables horizontal scaling and fault isolation. Understanding OTP processes is key to building highly available systems.
Example 60: Custom Metrics with Telemetry
Emit custom metrics to track business logic and application behavior.
defmodule MyApp.Blog do
def create_post(attrs) do
start_time = System.monotonic_time()
case %Post{}
|> Post.changeset(attrs)
|> Repo.insert() do
{:ok, post} ->
duration = System.monotonic_time() - start_time
:telemetry.execute(
[:blog, :post, :created],
%{duration: duration},
%{post_id: post.id, user_id: attrs["user_id"]}
)
{:ok, post}
{:error, changeset} ->
{:error, changeset}
end
end
end
# Listen to events
defmodule MyApp.TelemetryHandler do
def attach_handlers do
:telemetry.attach(
"blog-post-created",
[:blog, :post, :created],
&__MODULE__.handle_post_created/4,
nil
)
end
def handle_post_created(_event, measurements, metadata, _config) do
IO.inspect({measurements, metadata})
# Send to monitoring service, log, increment counter, etc.
end
end
# In your application startup
MyApp.TelemetryHandler.attach_handlers()

Key Takeaway: Use :telemetry.execute/3 to emit metrics. Attach handlers with :telemetry.attach/4. Track custom business metrics for monitoring and alerting.
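For timing, the telemetry library also offers :telemetry.span/3, which emits start/stop/exception events and measures duration for you. A sketch of the same create flow:
def create_post_with_span(attrs) do
  :telemetry.span([:blog, :post, :create], %{user_id: attrs["user_id"]}, fn ->
    result = %Post{} |> Post.changeset(attrs) |> Repo.insert()
    {result, %{}} # => {return value, extra metadata for the stop event}
  end)
end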
Why It Matters: This Phoenix pattern is fundamental for building production web applications. Understanding this concept enables you to create robust, maintainable, and scalable applications.
Group 11: Production Deployment
Example 61: Mix Releases for Production
Build self-contained release that runs without Elixir/Erlang installed. Configure runtime settings.
# mix.exs
def project do
[
app: :my_app,
version: "0.1.0",
releases: [
prod: [
include_executables_for: [:unix],
steps: [:assemble, :tar]
]
]
]
end
# config/runtime.exs - Loaded at runtime, not compile time
import Config
config :my_app, MyAppWeb.Endpoint,
http: [ip: {0, 0, 0, 0}, port: String.to_integer(System.get_env("PORT") || "4000")],
# => Bind to all interfaces (0.0.0.0) for Docker
secret_key_base: System.fetch_env!("SECRET_KEY_BASE"), # => Read from env var
url: [host: System.get_env("PHX_HOST", "localhost"), port: 443, scheme: "https"]
# => HTTPS for production
if config_env() == :prod do
config :my_app, MyApp.Repo,
url: System.fetch_env!("DATABASE_URL"),
ssl: true,
socket_options: [:inet6]
end
# Build release
# mix release
# Or with custom name
# mix release prod
# Run release (runs in the foreground)
# _build/prod/rel/my_app/bin/my_app start
# Run as a background daemon
# _build/prod/rel/my_app/bin/my_app daemon

Key Takeaway: mix release builds a self-contained package. config/runtime.exs loads at runtime. Set environment variables for secrets. Releases don't require Elixir on the target machine.
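Releases ship without Mix, so migrations run through a small release task. A common sketch (the MyApp.Release module name is conventional, not generated here):
defmodule MyApp.Release do
  @app :my_app

  def migrate do
    Application.load(@app) # => Load the app spec without starting it
    for repo <- Application.fetch_env!(@app, :ecto_repos) do
      {:ok, _, _} = Ecto.Migrator.with_repo(repo, &Ecto.Migrator.run(&1, :up, all: true))
    end
  end
end

# Run with: _build/prod/rel/my_app/bin/my_app eval "MyApp.Release.migrate()"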
Why It Matters: Runtime configuration enables environment-specific settings. This pattern allows the same release to run in different environments without rebuilding.
Example 62: Docker Containerization with Multi-Stage Build
Build optimized Docker image with minimal size and security.
%% Docker multi-stage build
graph TD
A[Stage 1: Builder] --> B[elixir:1.14-alpine]
B --> C[Install build deps]
C --> D[Copy source code]
D --> E[mix deps.get]
E --> F[MIX_ENV=prod mix release]
G[Stage 2: Runtime] --> H[alpine:latest]
H --> I[Install runtime deps]
I --> J[Copy release from builder]
J --> K[Final image 50MB]
style A fill:#0173B2,color:#fff
style F fill:#DE8F05,color:#000
style G fill:#029E73,color:#fff
style K fill:#029E73,color:#fff
# Dockerfile - Multi-stage build
FROM elixir:1.14-alpine AS builder
WORKDIR /app
# Install build dependencies
RUN apk add --no-cache build-base git
# Copy source
COPY mix.exs mix.lock ./
RUN mix local.hex --force && mix local.rebar --force
RUN mix deps.get
COPY . .
# Compile release
RUN MIX_ENV=prod mix release
# Runtime stage
FROM alpine:latest
WORKDIR /app
# Install runtime dependencies (Erlang releases on Alpine also need libstdc++/ncurses)
RUN apk add --no-cache libstdc++ ncurses-libs openssl
# Copy release from builder
COPY --from=builder /app/_build/prod/rel/my_app ./
EXPOSE 4000
# Health check
HEALTHCHECK --interval=10s --timeout=3s --start-period=40s --retries=3 \
CMD ["./bin/my_app", "eval", "MyApp.ready?"]
CMD ["./bin/my_app", "start"]Key Takeaway: Multi-stage builds keep image small. Builder stage compiles, runtime stage runs. Use Alpine Linux for minimal footprint. Include health checks.
Why It Matters: Health checks enable load balancer integration and monitoring. This is essential for zero-downtime deployments and auto-scaling.
Example 63: Health Checks for Kubernetes
Implement liveness and readiness endpoints for orchestration systems to manage pod lifecycle.
%% Health check flow in Kubernetes
graph TD
A[Kubernetes] --> B{Liveness Probe}
B -->|GET /health/live| C[Health endpoint]
C --> D{App alive?}
D -->|Yes 200| E[Keep running]
D -->|No 5xx| F[Restart pod]
A --> G{Readiness Probe}
G -->|GET /health/ready| H[Health endpoint]
H --> I{DB connected?}
I -->|Yes 200| J[Send traffic]
I -->|No 503| K[Remove from load balancer]
style A fill:#0173B2,color:#fff
style E fill:#029E73,color:#fff
style F fill:#CA9161,color:#000
style J fill:#029E73,color:#fff
style K fill:#CC78BC,color:#000
# lib/my_app_web/controllers/health_controller.ex
defmodule MyAppWeb.HealthController do
use MyAppWeb, :controller
def readiness(conn, _params) do
# Check if app is ready to serve traffic
case check_database() do
:ok ->
json(conn, %{status: "ok"}) # => 200 OK, ready
:error ->
conn
|> put_status(:service_unavailable) # => 503 Service Unavailable
|> json(%{status: "error", reason: "database_unavailable"})
# => Kubernetes removes from load balancer
end
end # => Checked every 5 seconds
def liveness(conn, _params) do
# Check if app is alive (should restart if not)
json(conn, %{status: "ok"}) # => Always returns 200
end # => If this fails, Kubernetes restarts pod
defp check_database do
case Ecto.Adapters.SQL.query(MyApp.Repo, "SELECT 1", []) do
{:ok, _} -> :ok
{:error, _} -> :error
end
end
end
# router.ex
scope "/health", MyAppWeb do
get "/ready", HealthController, :readiness
get "/live", HealthController, :liveness
end
# Kubernetes deployment yaml
# livenessProbe:
# httpGet:
# path: /health/live
# port: 4000
# initialDelaySeconds: 30
# periodSeconds: 10
# readinessProbe:
# httpGet:
# path: /health/ready
# port: 4000
# initialDelaySeconds: 5
# periodSeconds: 5

Key Takeaway: Readiness probe indicates if app can handle traffic. Liveness probe indicates if app needs restart. Health endpoints check critical dependencies (database, cache).
Why It Matters: Endpoints configure the HTTP entry point for your application. Understanding endpoint configuration is essential for performance tuning and security.
Example 64: Graceful Shutdown
Handle shutdown signals gracefully, completing in-flight requests before terminating.
# lib/my_app/application.ex
defmodule MyApp.Application do
use Application
@impl true
def start(_type, _args) do
children = [
MyAppWeb.Telemetry,
MyApp.Repo,
{Phoenix.PubSub, name: MyApp.PubSub},
MyAppWeb.Endpoint
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
@impl true
def config_change(changed, _new, removed) do
MyAppWeb.Endpoint.config_change(changed, removed)
end
@impl true
def prep_stop(state) do
  # Called before shutdown: drain in-flight requests here
  IO.puts("Shutting down gracefully...") # => Log shutdown
  state # => Must return the application state
end # => Application.stop/1 waits for this to return
end
# In endpoint config
config :my_app, MyAppWeb.Endpoint,
  # Drain open connections on shutdown, waiting up to 25 seconds
  drainer: [shutdown: 25_000]

Key Takeaway: prep_stop/1 gives the app a chance to drain requests. Set a shutdown timeout. Complete in-flight work before terminating. Important for zero-downtime deployments.
Why It Matters: Deployment releases enable zero-downtime updates and easy rollbacks. Understanding OTP releases is essential for production Phoenix applications.
Example 65: Environment Configuration Management
Separate configuration by environment. Use runtime configuration for secrets.
# config/config.exs - Common to all environments
import Config
config :logger, :console,
format: "$time $metadata[$level] $message\n"
# config/dev.exs
import Config
config :my_app, MyApp.Repo,
username: "postgres",
password: "postgres",
hostname: "localhost",
port: 5432,
database: "my_app_dev"
config :my_app, MyAppWeb.Endpoint,
debug_errors: true,
check_origin: false,
watchers: [esbuild: {Esbuild, :install_and_run, []}]
# config/test.exs
import Config
config :my_app, MyApp.Repo,
username: "postgres",
password: "postgres",
database: "my_app_test",
pool: Ecto.Adapters.SQL.Sandbox
# config/runtime.exs - Loaded at runtime
import Config
database_url = System.get_env("DATABASE_URL") || "ecto://postgres:postgres@localhost/my_app_prod"
config :my_app, MyApp.Repo, url: database_url
secret_key_base = System.fetch_env!("SECRET_KEY_BASE")
config :my_app, MyAppWeb.Endpoint,
secret_key_base: secret_key_base,
url: [host: System.get_env("PHX_HOST", "localhost"), port: 443, scheme: "https"]Key Takeaway: config/ files configure at compile time. config/runtime.exs loads at runtime (for secrets). Use environment variables for production secrets. Never commit secrets to git.
Why It Matters: This Phoenix pattern is fundamental for building production web applications. Understanding this concept enables you to create robust, maintainable, and scalable applications.
Group 12: Resilience & Observability
Example 66: Error Tracking and Structured Logging
Send errors to Sentry. Log with context for debugging.
# mix.exs
defp deps do
[
{:sentry, "~> 10.0"},
{:logger_backends, "~> 1.1"},
{:jason, "~> 1.4"}
]
end
# config/config.exs
config :sentry,
  dsn: System.get_env("SENTRY_DSN"), # => Leave unset in dev/test to disable reporting
  environment_name: config_env()
# lib/my_app_web/endpoint.ex
defmodule MyAppWeb.Endpoint do
  use Sentry.PlugCapture # => Captures errors raised in the plug pipeline
  use Phoenix.Endpoint, otp_app: :my_app
  # ...
  plug Sentry.PlugContext # => Attaches request data to reported events
end
# Structured logging
defmodule MyApp.Blog do
  require Logger

  def create_post(attrs) do
    Logger.info("Creating post", user_id: attrs["user_id"], title: attrs["title"])
    changeset = Post.changeset(%Post{}, attrs) # => Build the changeset first
    case MyApp.Repo.insert(changeset) do
      {:ok, post} ->
        Logger.info("Post created successfully", post_id: post.id)
        {:ok, post}
      {:error, changeset} ->
        Logger.warning("Post creation failed",
          errors: inspect(changeset.errors),
          user_id: attrs["user_id"]
        )
        {:error, changeset}
    end
  rescue
    e ->
      Logger.error("Post creation crashed",
        error: inspect(e),
        stacktrace: Exception.format_stacktrace(__STACKTRACE__)
      )
      {:error, "Internal error"}
  end
end

Key Takeaway: Sentry captures production errors. Structured logging adds context. Use Logger.info/warning/error with metadata keyword lists. Include request IDs for tracing (see the config sketch below).
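Plug.RequestId (part of the default Phoenix endpoint) already puts a request ID into Logger metadata; surfacing it in log lines is a one-line logger config change:
# config/config.exs
config :logger, :console,
  format: "$time $metadata[$level] $message\n",
  metadata: [:request_id] # => Prints the ID Plug.RequestId assigned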
Why It Matters: Contexts provide bounded modules for organizing business logic. This pattern enables clean API boundaries between different parts of your application and improves maintainability.
Example 67: Rate Limiting with Token Bucket
Prevent abuse by limiting requests per IP or user. Token bucket algorithm refills over time.
%% Rate limiting state machine
stateDiagram-v2
[*] --> CheckBucket: Request arrives
CheckBucket --> HasTokens: Tokens available
CheckBucket --> NoTokens: Bucket empty
HasTokens --> ConsumeToken: Take 1 token
ConsumeToken --> AllowRequest: Process request
NoTokens --> RejectRequest: 429 Too Many Requests
AllowRequest --> [*]
RejectRequest --> [*]
note right of CheckBucket: Bucket refills over time
note right of NoTokens: Client must wait
defmodule MyApp.RateLimiter do
use GenServer
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def allow_request?(key, max_requests, time_window_ms) do
GenServer.call(__MODULE__, {:check, key, max_requests, time_window_ms})
end
@impl true
def init(_opts) do
{:ok, %{}}
end
@impl true
def handle_call({:check, key, max_requests, time_window_ms}, _from, state) do
  now = System.monotonic_time(:millisecond)
  {count, reset_time} =
    case Map.get(state, key) do
      {count, reset_time} when reset_time > now ->
        {count, reset_time} # => Window still active
      _ ->
        {0, now + time_window_ms} # => New window starts, counter resets
    end
  if count < max_requests do
    new_state = Map.put(state, key, {count + 1, reset_time}) # => Consume a token
    {:reply, :ok, new_state} # => Allow request
  else
    {:reply, :rate_limited, state} # => Reject; caller returns 429
  end
end
end
# Plug for rate limiting
defmodule MyAppWeb.Plugs.RateLimit do
  import Plug.Conn # => put_status/2, halt/1
  import Phoenix.Controller, only: [json: 2]

  def init(opts), do: opts

  def call(conn, opts) do
    max_requests = opts[:max_requests] || 100
    time_window = opts[:time_window] || 60_000
    key = conn.remote_ip |> Tuple.to_list() |> Enum.join(".")
    case MyApp.RateLimiter.allow_request?(key, max_requests, time_window) do
      :ok -> conn
      :rate_limited ->
        conn
        |> put_status(:too_many_requests)
        |> json(%{error: "Rate limit exceeded"})
        |> halt()
    end
  end
end

Key Takeaway: Rate limiting prevents abuse. The fixed-window counter above approximates a token bucket; both are fair and flexible. Apply per IP or per user. Return 429 Too Many Requests when limited.
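Wiring the plug into a router pipeline (the limits shown are illustrative):
pipeline :api do
  plug :accepts, ["json"]
  plug MyAppWeb.Plugs.RateLimit, max_requests: 100, time_window: 60_000
end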
Why It Matters: Event-driven patterns decouple components and enable scalable architectures. Understanding events is key to building maintainable Phoenix applications.
Example 68: Distributed Phoenix Clustering
Connect multiple Phoenix instances for distributed state and fault tolerance.
%% Distributed Phoenix cluster
graph TD
A[Load Balancer] --> B[Phoenix Node 1]
A --> C[Phoenix Node 2]
A --> D[Phoenix Node 3]
B <-->|Distributed Erlang| C
C <-->|Distributed Erlang| D
B <-->|Distributed Erlang| D
B --> E[PubSub broadcasts to all nodes]
C --> E
D --> E
E --> F[PostgreSQL]
style A fill:#0173B2,color:#fff
style B fill:#029E73,color:#fff
style C fill:#029E73,color:#fff
style D fill:#029E73,color:#fff
style F fill:#DE8F05,color:#000
# mix.exs
defp deps do
[
{:libcluster, "~> 3.3"},
{:observer_cli, "~> 1.7"}
]
end
# config/config.exs
config :libcluster,
topologies: [
example: [
strategy: Cluster.Strategy.Kubernetes.DNS, # => Auto-discover in K8s
config: [
service: "my_app-headless", # => Headless service name
namespace: "default" # => K8s namespace
]
]
] # => Nodes connect automatically via DNS
# Or with fixed IPs
config :libcluster,
topologies: [
fixed: [
strategy: Cluster.Strategy.Static,
config: [
nodes: [:"app1@10.0.0.1", :"app2@10.0.0.2", :"app3@10.0.0.3"]
]
]
]
# lib/my_app/application.ex - supervise the clustering strategy
defmodule MyApp.Application do
  def start(_type, _args) do
    topologies = Application.get_env(:libcluster, :topologies, [])
    children = [
      # Cluster.Supervisor runs the configured libcluster strategy
      {Cluster.Supervisor, [topologies, [name: MyApp.ClusterSupervisor]]}
      # ... other children
    ]
    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
# Distributed PubSub across nodes
broadcast_all = fn event, data ->
MyApp.PubSub
|> Phoenix.PubSub.broadcast("topic", {:event, event, data})
# => Sends to ALL nodes in cluster
end # => Every connected LiveView receives update
# List connected nodes
Node.list()

Key Takeaway: libcluster connects nodes automatically. Distribute PubSub across the cluster. All nodes share state. Provides fault tolerance.
Why It Matters: PubSub enables event-driven communication between processes. This pattern powers real-time broadcasts and decoupled system architecture.
Example 69: WebSocket Load Balancing with Sticky Sessions
Route WebSocket connections to same server. Use load balancer affinity.
# nginx.conf - Sticky sessions by IP
upstream my_app {
server app1:4000;
server app2:4000;
server app3:4000;
hash $remote_addr consistent; # => Sticky by IP
}
server {
listen 80;
server_name my_app.com;
location / {
proxy_pass http://my_app;
proxy_http_version 1.1;
# WebSocket headers
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
# Preserve client IP
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Real-IP $remote_addr;
}
}
# HAProxy.cfg - Also supports sticky sessions
backend phoenix_nodes
balance roundrobin
cookie SERVERID insert indirect nocache
server node1 app1:4000 check cookie node1
server node2 app2:4000 check cookie node2
server node3 app3:4000 check cookie node3

Key Takeaway: WebSockets require persistent connections to the same server. Use sticky sessions (by IP or cookie). The load balancer must preserve the connection. Forward X-Forwarded-For headers.
Why It Matters: Phoenix sockets enable efficient bidirectional communication for real-time features. This is essential for chat, notifications, and collaborative editing applications.
Example 70: Blue-Green Deployment for Zero-Downtime Releases
Run two production environments. Switch traffic after verifying new version works.
%% Blue-green deployment flow
graph TD
A[Current: Blue v1.0] --> B[Deploy Green v1.1]
B --> C[Test Green in isolation]
C --> D{Tests pass?}
D -->|Yes| E[Switch traffic to Green]
D -->|No| F[Keep Blue, fix Green]
E --> G[Green is now production]
G --> H[Blue becomes standby]
H --> I[Rollback available]
style A fill:#0173B2,color:#fff
style C fill:#DE8F05,color:#000
style E fill:#029E73,color:#fff
style F fill:#CA9161,color:#000
style I fill:#CC78BC,color:#000
# Blue-Green deployment with two releases
#!/bin/bash
# Current version (blue)
BLUE_VERSION=$(aws ecs describe-services --cluster prod --services my-app | \
jq -r '.services[0].taskDefinition' | \
grep -oP 'my-app:\K[^:]+')
# New version (green)
GREEN_VERSION=$(git describe --tags --always)
# Build and deploy green version
mix release
docker build -t my-app:$GREEN_VERSION .
docker tag my-app:$GREEN_VERSION my-app:green
docker push my-app:green
# Register new task definition
aws ecs register-task-definition \
--family my-app \
--container-definitions "[{\"image\": \"my-app:$GREEN_VERSION\", ...}]"
# Update service to use green
aws ecs update-service \
--cluster prod \
--service my-app \
--task-definition my-app:$GREEN_VERSION
# Wait for deployment
aws ecs wait services-stable \
--cluster prod \
--services my-app
# Test green version
curl http://green.my-app.com/health/ready
if [ $? -eq 0 ]; then
# Switch traffic from blue to green
aws route53 change-resource-record-sets \
--hosted-zone-id Z123 \
--change-batch "[{\"Action\": \"UPSERT\", \"ResourceRecordSet\": {\"Name\": \"my-app.com\", \"Type\": \"A\", \"TTL\": 60, \"ResourceRecords\": [{\"Value\": \"green-alb.aws.com\"}]}}]"
echo "Deployment successful!"
else
# Rollback to blue
aws ecs update-service \
--cluster prod \
--service my-app \
--task-definition my-app:$BLUE_VERSION
fi

Key Takeaway: Blue-green keeps the current version running while deploying the new version. Switch traffic only after verification. Rollback is instant. Zero-downtime deployments.
Why It Matters: Deployment releases enable zero-downtime updates and easy rollbacks. Understanding OTP releases is essential for production Phoenix applications.
Example 71: Custom Ecto Types for Domain Logic
Create custom Ecto types to encapsulate domain logic and validation.
# lib/my_app/ecto_types/money.ex
defmodule MyApp.EctoTypes.Money do
  use Ecto.Type

  def type, do: :map # => Stored as JSONB in PostgreSQL

  # Cast from user input
  def cast(%{"amount" => amount, "currency" => currency}) when is_number(amount) do
    {:ok, %{amount: to_decimal(amount), currency: currency}} # => Valid money map
  end
  def cast(_), do: :error # => Invalid format

  # Load from database (amount stored as a string to preserve precision)
  def load(%{"amount" => amount, "currency" => currency}) do
    {:ok, %{amount: Decimal.new(amount), currency: currency}} # => Back to Decimal
  end

  # Dump to database
  def dump(%{amount: %Decimal{} = amount, currency: currency}) do
    {:ok, %{"amount" => Decimal.to_string(amount), "currency" => currency}}
    # => Store as JSON: {"amount": "19.99", "currency": "USD"}
  end
  def dump(_), do: :error

  defp to_decimal(amount) when is_integer(amount), do: Decimal.new(amount)
  defp to_decimal(amount) when is_float(amount), do: Decimal.from_float(amount)
  # => Decimal.new/1 raises on floats, hence from_float/1
end
# Usage in schema
defmodule MyApp.Shop.Product do
use Ecto.Schema
schema "products" do
field :name, :string
field :price, MyApp.EctoTypes.Money # => Custom type
timestamps()
end
def changeset(product, attrs) do
product
|> cast(attrs, [:name, :price]) # => Cast price with custom type
|> validate_required([:name, :price])
|> validate_price() # => Domain validation
end
defp validate_price(changeset) do
case get_change(changeset, :price) do
%{amount: amount} when amount < 0 ->
add_error(changeset, :price, "must be positive") # => Business rule
_ ->
changeset
end
end
end
# Create product
%Product{}
|> Product.changeset(%{
name: "Widget",
price: %{"amount" => 19.99, "currency" => "USD"} # => Input format
})
|> Repo.insert()
# => Stores: {"amount": 19.99, "currency": "USD"} in databaseKey Takeaway: Custom Ecto types encapsulate domain logic. Implement cast/1, load/1, dump/1, and type/0. Use Decimal for money to avoid floating-point errors.
Why It Matters: Ecto provides type-safe database interactions with compile-time query validation. This prevents SQL injection and catches query errors before deployment.
Example 72: Database Fragments for Advanced Queries
Use Ecto fragments for PostgreSQL-specific features and complex SQL.
# Full-text search with tsvector
defmodule MyApp.Blog do
import Ecto.Query
def search_posts(search_term) do
  query =
    from p in Post,
      where: fragment(
        "to_tsvector('english', ?) @@ plainto_tsquery('english', ?)",
        p.title, # => Search in title
        ^search_term # => User's search query
      ),
      order_by: fragment(
        "ts_rank(to_tsvector('english', ?), plainto_tsquery('english', ?)) DESC",
        p.title,
        ^search_term # => Rank by relevance
      ),
      select: %{
        post: p,
        rank: fragment("ts_rank(to_tsvector('english', ?), plainto_tsquery('english', ?))",
          p.title, ^search_term) # => Include rank in results
      }
  Repo.all(query)
  # => SELECT posts.*, ts_rank(...) FROM posts WHERE to_tsvector(...) @@ plainto_tsquery(...)
end
# JSONB queries
def posts_by_metadata(key, value) do
  query = from p in Post,
    where: fragment("? ->> ? = ?", p.metadata, ^key, ^value)
  Repo.all(query)
  # => SELECT * FROM posts WHERE metadata->>'status' = 'draft'
end

# Array contains
def posts_with_tag(tag) do
  query = from p in Post,
    where: fragment("? @> ?", p.tags, ^[tag]) # => text[] contains tag
  Repo.all(query)
  # => SELECT * FROM posts WHERE tags @> '{elixir}'
end
# Window functions
def posts_with_rank do
  query = from p in Post,
    select: %{
      post: p,
      row_number: fragment("ROW_NUMBER() OVER (ORDER BY ? DESC)", p.inserted_at),
      # => Assign sequential number
      rank: fragment("RANK() OVER (ORDER BY ? DESC)", p.view_count)
      # => Rank by view count (ties get same rank)
    }
  Repo.all(query)
end
# Aggregate with a subquery (a CTE-like shape)
def posts_with_comment_count do
  comments_sq = from c in Comment,
    group_by: c.post_id,
    select: %{post_id: c.post_id, count: count(c.id)}
  query = from p in Post,
    left_join: c in subquery(comments_sq), on: c.post_id == p.id,
    select: %{post: p, comment_count: coalesce(c.count, 0)}
  Repo.all(query)
  # => SELECT posts.*, COALESCE(sq.count, 0) FROM posts LEFT JOIN (SELECT ...) sq ON ...
end
end

Key Takeaway: Use fragment/1 for database-specific SQL. It supports full-text search, JSONB queries, window functions, and subqueries. Always use ^pinned parameters to prevent SQL injection.
Why It Matters: JSON API patterns enable integration with mobile apps and external services. Phoenix makes building RESTful APIs straightforward with proper content negotiation.
Example 73: Query Profiling with Telemetry
Measure query performance to identify slow queries in production.
# lib/my_app/telemetry.ex
defmodule MyApp.Telemetry do
require Logger
def attach_handlers do
:telemetry.attach(
"my-app-ecto-query", # => Handler ID
[:my_app, :repo, :query], # => Event name
&__MODULE__.handle_query/4, # => Handler function
nil
)
end
def handle_query(_event, measurements, metadata, _config) do
query_time = measurements.total_time # => Time in native units
query_time_ms = System.convert_time_unit(query_time, :native, :millisecond)
if query_time_ms > 100 do # => Slow query threshold
Logger.warning("Slow query detected",
query: inspect(metadata.query), # => SQL query
params: inspect(metadata.params), # => Query parameters
time_ms: query_time_ms, # => Execution time
source: metadata.source # => Table name
)
# => Log: Slow query detected query="SELECT ..." time_ms=250
end
# Send to monitoring service
if query_time_ms > 1000 do # => Very slow (>1s)
MyApp.Monitoring.report_slow_query(%{
query: metadata.query,
time_ms: query_time_ms,
type: metadata.type # => :ecto_sql_query
})
end
end
end
# In application.ex
def start(_type, _args) do
MyApp.Telemetry.attach_handlers() # => Attach on startup
children = [
MyApp.Repo,
MyAppWeb.Endpoint
]
Supervisor.start_link(children, strategy: :one_for_one)
end
# Aggregate query stats in a GenServer
defmodule MyApp.QueryStats do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(_opts) do
    :telemetry.attach_many(
      "query-stats",
      [
        [:my_app, :repo, :query], # => Ecto queries
        [:phoenix, :router_dispatch, :stop], # => HTTP requests
        [:phoenix, :endpoint, :stop] # => Endpoint timing
      ],
      &__MODULE__.handle_event/4,
      nil
    )
    {:ok, %{query_count: 0, total_time: 0}}
  end

  # Telemetry handlers run in the *emitting* process, so forward to the server
  def handle_event([:my_app, :repo, :query], measurements, _metadata, _config) do
    GenServer.cast(__MODULE__, {:query, measurements.total_time})
  end
  def handle_event(_event, _measurements, _metadata, _config), do: :ok # => Ignore others

  def handle_cast({:query, time}, state) do
    {:noreply, %{state | query_count: state.query_count + 1, # => Increment counter
                         total_time: state.total_time + time}}
  end

  def get_stats do
    GenServer.call(__MODULE__, :get_stats) # => Retrieve stats
  end

  def handle_call(:get_stats, _from, state) do
    avg_time =
      if state.query_count > 0 do
        div(state.total_time, state.query_count) # => Average time in native units
      else
        0
      end
    {:reply, %{
      query_count: state.query_count,
      avg_time_ms: System.convert_time_unit(avg_time, :native, :millisecond)
    }, state}
  end
end

Key Takeaway: Attach telemetry handlers to [:repo, :query] events. Log slow queries (>100ms). Track query count and average time. Handlers run in the calling process, so forward work to a GenServer. Use for production monitoring and optimization.
Why It Matters: Telemetry enables observability and performance monitoring. This is critical for debugging production issues and understanding application behavior.
Example 74: Advanced LiveView Performance Optimization
Optimize LiveView rendering with targeted updates and efficient assigns.
defmodule MyAppWeb.DashboardLive do
use Phoenix.LiveView
# Stream collections; assign_new for expensive one-time values
def mount(_params, _session, socket) do
  if connected?(socket) do
    :timer.send_interval(1000, self(), :tick) # => Update every second
  end
  {:ok,
   socket
   |> assign(:time, DateTime.utc_now()) # => Tracked assign
   |> stream(:posts, load_posts()) # => Collections render via @streams below
   |> assign_new(:expensive_data, fn -> load_expensive_data() end)}
  # => assign_new: only computed on first mount, cached after
end
# Push a JS event instead of re-rendering large sections
def handle_info(:tick, socket) do
  now = DateTime.utc_now()
  {:noreply,
   socket
   |> assign(:time, now) # => Only updates time
   |> push_event("time-update", %{time: DateTime.to_string(now)})}
end
# Optimize list rendering with streams
def handle_event("load_more", _params, socket) do
  new_posts = load_more_posts(socket.assigns.last_id)
  {:noreply, Enum.reduce(new_posts, socket, &stream_insert(&2, :posts, &1))}
  # => Only new items are rendered; existing items stay untouched
end
# Debounce user input
def handle_event("search", %{"query" => query}, socket) do
Process.send_after(self(), {:search, query}, 300) # => Debounce 300ms
{:noreply, assign(socket, :search_query, query)}
end
def handle_info({:search, query}, socket) do
if socket.assigns.search_query == query do # => Check still current
results = search_posts(query) # => Execute search
{:noreply, assign(socket, :search_results, results)}
else
{:noreply, socket} # => User typed more, skip
end
end
# Update a single stream entry in place
def handle_event("increment_likes", %{"post_id" => id}, socket) do
  post = increment_likes!(id) # => Persist the change (helper, like load_posts/0)
  {:noreply, stream_insert(socket, :posts, post)}
  # => Re-inserting an existing ID patches only that post's DOM entry
end
# Render only changed sections
def render(assigns) do
  ~H"""
  <div>
    <.header time={@time} /> <%!-- Separate component --%>
    <div id="posts" phx-update="stream">
      <%= for {dom_id, post} <- @streams.posts do %>
        <.post_card id={dom_id} post={post} /> <%!-- Component per post --%>
      <% end %>
    </div>
    <.footer /> <%!-- Static component --%>
  </div>
  """
end

# Function components for granular updates
def header(assigns) do
  ~H"""
  <header>
    <time><%= @time %></time> <%!-- Only this re-renders --%>
  </header>
  """
end
end

Key Takeaway: Use assign_new/3 for expensive one-time computation. Prefer streams over lists for collections. Debounce rapid events. Use push_event for client-side updates. Break templates into small function components.
Why It Matters: Templates with HEEx enable component-based UI development with compile-time validation. This catches HTML errors during compilation rather than runtime, improving reliability.
Example 75: Production Debugging with Observer and LiveDashboard
Debug production issues with Observer, LiveDashboard, and remote IEx.
# Connect to remote production node
# On local machine:
# iex --name debug@127.0.0.1 --cookie production_cookie
# In IEx session:
Node.connect(:"my_app@production-server.com") # => Connect to prod node
Node.list() # => ["my_app@production-server.com"]
# Inspect running processes
Process.list() # => All Erlang processes
|> Enum.filter(fn pid ->
case Process.info(pid, :current_function) do
{:current_function, {module, _fun, _arity}} ->
String.starts_with?(to_string(module), "Elixir.MyApp")
_ -> false
end
end)
|> Enum.map(fn pid ->
{pid, Process.info(pid, [:memory, :message_queue_len, :current_function])}
end)
# => [{#PID<0.123.0>, [memory: 12345, message_queue_len: 0, ...]}, ...]
# Find process by name
pid = Process.whereis(MyApp.Worker) # => #PID<0.456.0>
Process.info(pid) # => Full process info
# Check process mailbox
Process.info(pid, :message_queue_len) # => {message_queue_len, 1000}
# => If high, process is backed up
# Inspect state of GenServer
:sys.get_state(pid) # => Current state map
# => %{pending: [], processing: 10, errors: []}
# Trace function calls
:dbg.tracer() # => Start tracer
:dbg.p(:all, :c) # => Trace all processes
:dbg.tp(MyApp.Blog, :create_post, 2, []) # => Trace function
# => Prints every call to MyApp.Blog.create_post/2
# LiveDashboard metrics
defmodule MyAppWeb.Telemetry do
  import Telemetry.Metrics # => summary/2, counter/2, last_value/2, distribution/2

  def metrics do
    [
# VM Metrics
last_value("vm.memory.total", unit: {:byte, :megabyte}),
last_value("vm.total_run_queue_lengths.total"), # => Scheduler queue
last_value("vm.total_run_queue_lengths.cpu"),
# Database Metrics
summary("my_app.repo.query.total_time",
unit: {:native, :millisecond},
tags: [:source, :command] # => Group by table/command
),
counter("my_app.repo.query.count"), # => Total queries
# Phoenix Metrics
summary("phoenix.router_dispatch.stop.duration",
tags: [:route],
unit: {:native, :millisecond}
),
counter("phoenix.router_dispatch.stop.count",
tags: [:route, :status] # => Requests by route/status
),
# Custom Business Metrics
counter("my_app.orders.created.count"), # => Business events
last_value("my_app.users.active.count"),
distribution("my_app.checkout.duration",
buckets: [100, 500, 1000, 5000] # => Histogram buckets
)
]
end
end
# Memory leak detection
defmodule MyApp.MemoryMonitor do
use GenServer
require Logger
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
:timer.send_interval(60_000, :check_memory) # => Check every minute
{:ok, %{baseline: get_memory()}}
end
def handle_info(:check_memory, state) do
current = get_memory()
diff = current - state.baseline
if diff > 100_000_000 do # => >100MB increase
Logger.warning("Memory leak detected",
baseline_mb: div(state.baseline, 1_000_000),
current_mb: div(current, 1_000_000),
diff_mb: div(diff, 1_000_000)
)
# Get top memory consumers
top_processes = Process.list()
|> Enum.map(fn pid ->
{pid, Process.info(pid, :memory)}
end)
|> Enum.sort_by(fn {_pid, {:memory, mem}} -> mem end, :desc)
|> Enum.take(10) # => Top 10 processes
Logger.warning("Top memory consumers", processes: top_processes)
end
{:noreply, state}
end
defp get_memory do
:erlang.memory(:total) # => Total VM memory
end
end

Key Takeaway: Use remote IEx to connect to production. Inspect process state with :sys.get_state/1. Monitor memory leaks. Use LiveDashboard for real-time metrics. Trace function calls with :dbg.
Why It Matters: Process-based architecture enables horizontal scaling and fault isolation. Understanding OTP processes is key to building highly available systems.
Example 76: Security Best Practices
Implement security headers, CSRF protection, and input sanitization.
# Security headers in endpoint
defmodule MyAppWeb.Endpoint do
  use Phoenix.Endpoint, otp_app: :my_app
  import Plug.Conn # => put_resp_header/3 used below

  plug Plug.Static, at: "/", from: :my_app
  # Security headers
  plug :put_secure_headers

  defp put_secure_headers(conn, _opts) do
conn
|> put_resp_header("x-frame-options", "DENY") # => Prevent clickjacking
|> put_resp_header("x-content-type-options", "nosniff") # => Prevent MIME sniffing
|> put_resp_header("x-xss-protection", "1; mode=block") # => XSS protection
|> put_resp_header("strict-transport-security",
"max-age=31536000; includeSubDomains") # => Force HTTPS
|> put_resp_header("content-security-policy",
"default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'")
# => CSP policy
|> put_resp_header("referrer-policy", "strict-origin-when-cross-origin")
end
plug Plug.RequestId # => Add request ID header
plug Plug.Telemetry, event_prefix: [:phoenix, :endpoint]
plug Plug.Parsers,
parsers: [:urlencoded, :multipart, :json],
pass: ["*/*"],
json_decoder: Jason
plug Plug.MethodOverride # => Support _method param
plug Plug.Head # => Handle HEAD requests
plug Plug.Session, @session_options # => Encrypted sessions
plug MyAppWeb.Router
end
# CSRF protection (automatic in Phoenix)
defmodule MyAppWeb.Router do
pipeline :browser do
plug :accepts, ["html"]
plug :fetch_session
plug :fetch_live_flash
plug :put_root_layout, {MyAppWeb.Layouts, :root}
plug :protect_from_forgery # => CSRF token validation
plug :put_secure_browser_headers
end
# API doesn't use CSRF (use JWT instead)
pipeline :api do
plug :accepts, ["json"]
# No CSRF protection for stateless API
end
end
# Input sanitization for user content
defmodule MyApp.Content do
@doc "Sanitize HTML to prevent XSS"
def sanitize_html(html) do
HtmlSanitizeEx.basic_html(html) # => Strip dangerous tags
# => Allows: <p>, <br>, <strong>, <em>, <a>, etc.
# => Removes: <script>, <iframe>, onclick, etc.
end
def sanitize_user_post(params) do
params
|> Map.update("body", "", &sanitize_html/1) # => Clean HTML content
|> Map.update("title", "", &String.trim/1) # => Trim whitespace
end
end
# SQL injection prevention (automatic with Ecto)
# ❌ NEVER do this:
def search_posts_unsafe(term) do
query = "SELECT * FROM posts WHERE title LIKE '%#{term}%'" # => SQL INJECTION!
Ecto.Adapters.SQL.query!(Repo, query, [])
end
# ✅ Always use parameterized queries:
def search_posts_safe(term) do
  query = from p in Post,
    where: like(p.title, ^"%#{term}%") # => Parameter binding
  Repo.all(query)
  # => Generates: SELECT * FROM posts WHERE title LIKE $1
  # => Binds: ["%search%"]
end
# Rate limiting (prevent brute force)
defmodule MyAppWeb.Plugs.RateLimitLogin do
  import Plug.Conn # => put_status/2, halt/1

  def init(opts), do: opts

  def call(conn, _opts) do
    ip = get_ip(conn) # => Client IP
key = "login_attempts:#{ip}"
case MyApp.Cache.get(key) do
nil ->
MyApp.Cache.set(key, 1, ttl: 300) # => First attempt, 5min window
conn
attempts when attempts < 5 ->
MyApp.Cache.incr(key) # => Increment counter
conn
_ ->
conn
|> put_status(:too_many_requests) # => 429
|> Phoenix.Controller.json(%{error: "Too many login attempts"})
|> halt() # => Block request
end
end
defp get_ip(conn) do
conn.remote_ip |> Tuple.to_list() |> Enum.join(".")
end
end
# Secure password hashing (automatic with phx.gen.auth)
defmodule MyApp.Accounts.User do
def registration_changeset(user, attrs) do
user
|> cast(attrs, [:email, :password])
|> validate_email()
|> validate_password()
|> hash_password() # => Bcrypt with salt
end
defp hash_password(changeset) do
password = get_change(changeset, :password)
if password && changeset.valid? do
changeset
|> put_change(:password_hash, Bcrypt.hash_pwd_salt(password))
|> delete_change(:password) # => Don't store plaintext
else
changeset
end
end
end

Key Takeaway: Set security headers (CSP, HSTS, X-Frame-Options). CSRF tokens are automatic in Phoenix. Sanitize user HTML with HtmlSanitizeEx. Always use Ecto for queries (prevents SQL injection). Rate limit login attempts.
Why It Matters: Ecto provides type-safe database interactions with compile-time query validation. This prevents SQL injection and catches query errors before deployment.
Example 77: WebSocket Connection Pooling
Optimize WebSocket connections with connection pooling and load distribution.
# Channel with connection limits
defmodule MyAppWeb.UserSocket do
use Phoenix.Socket
# Limit concurrent connections per user
def connect(%{"token" => token}, socket, _connect_info) do
case verify_token(token) do
{:ok, user_id} ->
# Check connection limit
case check_connection_limit(user_id) do
:ok ->
{:ok, assign(socket, :user_id, user_id)} # => Allow connection
{:error, :limit_exceeded} ->
:error # => Reject connection
end
{:error, _} ->
:error
end
end
defp check_connection_limit(user_id) do
  count =
    MyAppWeb.Presence.list("user:#{user_id}")
    |> map_size() # => Count current connections (Presence returns a map)
  if count < 5 do # => Max 5 connections per user
    :ok
  else
    {:error, :limit_exceeded}
  end
end
def id(socket), do: "user_socket:#{socket.assigns.user_id}"
channel "room:*", MyAppWeb.RoomChannel
channel "user:*", MyAppWeb.UserChannel
end
# Connection pooling for external services
defmodule MyApp.External.APIClient do
use GenServer
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
  # Start a named pool; hackney supervises it in its own tree
  :ok = :hackney_pool.start_pool(:api_pool,
    timeout: 15_000, # => Connection timeout
    max_connections: 100 # => Pool size
  )
  {:ok, %{}}
end
def get(path) do
url = "https://api.example.com" <> path
headers = [{"Authorization", "Bearer #{api_token()}"}]
case :hackney.get(url, headers, "", [pool: :api_pool]) do
{:ok, 200, _headers, client_ref} ->
{:ok, body} = :hackney.body(client_ref) # => Read response
Jason.decode(body) # => Parse JSON
{:ok, status, _headers, _client_ref} ->
{:error, {:http_error, status}}
{:error, reason} ->
{:error, reason}
end
end
end
# Distribute WebSocket connections across nodes
defmodule MyAppWeb.Presence do
use Phoenix.Presence,
otp_app: :my_app,
pubsub_server: MyApp.PubSub
# Track connections across cluster
def track_connection(socket) do
track(socket, "connections", socket.id, %{
node: node(), # => Which Erlang node
joined_at: System.system_time(:second)
})
end
# Get connection count across all nodes
def connection_count do
list("connections") # => All tracked connections
|> Enum.count()
end
# Get connections per node
def connections_by_node do
list("connections")
|> Enum.group_by(fn {_id, %{metas: [meta | _]}} ->
meta.node # => Group by node name
end)
|> Enum.map(fn {node, conns} ->
{node, Enum.count(conns)} # => {node, count}
end)
|> Enum.into(%{})
end
end

Key Takeaway: Limit connections per user to prevent abuse. Use a hackney pool for HTTP connection pooling. Track WebSocket connections across the cluster with Presence. Distribute load across multiple nodes.
Why It Matters: Phoenix sockets enable efficient bidirectional communication for real-time features. This is essential for chat, notifications, and collaborative editing applications.
Example 78: GraphQL API with Absinthe
Build GraphQL API for flexible client queries.
# mix.exs
defp deps do
[
{:absinthe, "~> 1.7"}, # => GraphQL implementation
{:absinthe_plug, "~> 1.5"} # => Phoenix integration
]
end
# GraphQL schema
defmodule MyAppWeb.Schema do
use Absinthe.Schema
import_types MyAppWeb.Schema.ContentTypes
query do
@desc "Get all posts"
field :posts, list_of(:post) do
arg :limit, :integer, default_value: 10 # => Optional limit
arg :offset, :integer, default_value: 0
resolve &MyAppWeb.Resolvers.Content.list_posts/3
end
@desc "Get post by ID"
field :post, :post do
arg :id, non_null(:id) # => Required ID
resolve &MyAppWeb.Resolvers.Content.get_post/3
end
@desc "Search posts"
field :search_posts, list_of(:post) do
arg :query, non_null(:string)
resolve &MyAppWeb.Resolvers.Content.search_posts/3
end
end
mutation do
@desc "Create a post"
field :create_post, :post do
arg :title, non_null(:string)
arg :body, non_null(:string)
arg :tags, list_of(:string)
resolve &MyAppWeb.Resolvers.Content.create_post/3
end
@desc "Update a post"
field :update_post, :post do
arg :id, non_null(:id)
arg :title, :string # => Optional fields
arg :body, :string
resolve &MyAppWeb.Resolvers.Content.update_post/3
end
end
subscription do
@desc "Subscribe to new posts"
field :post_created, :post do
config fn _args, _context ->
{:ok, topic: "posts"} # => PubSub topic
end
trigger :create_post, topic: fn _post ->
["posts"] # => Trigger on mutation
end
end
end
end
# Object types
defmodule MyAppWeb.Schema.ContentTypes do
use Absinthe.Schema.Notation
object :post do
field :id, :id # => Post ID
field :title, :string
field :body, :string
field :inserted_at, :string
field :author, :user, resolve: &get_author/3 # => Nested resolver
field :comments, list_of(:comment) do
resolve &get_comments/3 # => Load comments
end
end
object :user do
field :id, :id
field :name, :string
field :email, :string
end
object :comment do
field :id, :id
field :body, :string
field :author, :user
end
defp get_author(post, _args, _resolution) do
{:ok, MyApp.Accounts.get_user!(post.author_id)} # => Load author
end
defp get_comments(post, _args, _resolution) do
comments = MyApp.Blog.list_comments(post.id)
{:ok, comments} # => Load comments
end
end
# Resolvers
defmodule MyAppWeb.Resolvers.Content do
def list_posts(_parent, args, _resolution) do
posts = MyApp.Blog.list_posts(args) # => Pass limit/offset
{:ok, posts}
end
def get_post(_parent, %{id: id}, _resolution) do
case MyApp.Blog.get_post(id) do
nil -> {:error, "Post not found"}
post -> {:ok, post}
end
end
def create_post(_parent, args, %{context: %{current_user: user}}) do
case MyApp.Blog.create_post(Map.put(args, :author_id, user.id)) do
{:ok, post} ->
Absinthe.Subscription.publish(
MyAppWeb.Endpoint,
post,
post_created: "posts" # => Trigger subscription
)
{:ok, post}
{:error, changeset} ->
{:error, changeset} # => Return errors
end
end
end
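# Subscription transport (an assumption — not shown in this example): pushing
# post_created events to connected clients additionally requires the
# absinthe_phoenix package and two pieces of wiring:
#
# mix.exs:
#   {:absinthe_phoenix, "~> 2.0"}
#
# application.ex (after the endpoint in the supervision tree):
#   {Absinthe.Subscription, MyAppWeb.Endpoint}
#
# endpoint.ex:
#   use Absinthe.Phoenix.Endpoint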
# Router
defmodule MyAppWeb.Router do
use MyAppWeb, :router # => Brings in scope/pipe_through/forward macros
scope "/api" do
pipe_through :api
forward "/graphql", Absinthe.Plug,
schema: MyAppWeb.Schema # => GraphQL endpoint
forward "/graphiql", Absinthe.Plug.GraphiQL,
schema: MyAppWeb.Schema,
interface: :playground # => GraphQL IDE
end
end
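# The create_post resolver above pattern-matches %{current_user: user} out of
# the Absinthe context. A minimal context plug sketch — fetch_user_by_token/1
# is a hypothetical helper, not an Absinthe API:
defmodule MyAppWeb.Context do
@behaviour Plug
import Plug.Conn
def init(opts), do: opts
def call(conn, _opts) do
Absinthe.Plug.put_options(conn, context: build_context(conn)) # => Context reaches resolvers
end
defp build_context(conn) do
with ["Bearer " <> token] <- get_req_header(conn, "authorization"),
{:ok, user} <- MyApp.Accounts.fetch_user_by_token(token) do
%{current_user: user} # => Authenticated request
else
_ -> %{} # => Anonymous request
end
end
end
# Add `plug MyAppWeb.Context` to the :api pipeline so it runs before the forwards.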
# Example queries:
# query {
# posts(limit: 5) {
# id
# title
# author { name }
# comments { body }
# }
# }
#
# mutation {
# createPost(title: "Hello", body: "World") {
# id
# title
# }
# }
Key Takeaway: Absinthe brings GraphQL to Phoenix. Define a schema with queries, mutations, and subscriptions. Clients request only the fields they need. Use resolvers to load data. GraphiQL provides an interactive API explorer.
Why It Matters: GraphQL lets clients fetch exactly the data they need in a single request, avoiding the over-fetching and endpoint sprawl that grow around REST APIs as client requirements diverge.
Example 79: Event Sourcing Pattern
Implement event sourcing to store state changes as immutable events.
# Event schema
defmodule MyApp.Events.Event do
use Ecto.Schema
schema "events" do
field :aggregate_id, :string # => Entity ID (e.g., order_id)
field :aggregate_type, :string # => Entity type (e.g., "Order")
field :event_type, :string # => Event name
field :payload, :map # => Event data (JSONB)
field :version, :integer # => Event version
field :inserted_at, :utc_datetime
end
end
# Event store
defmodule MyApp.EventStore do
alias MyApp.Events.Event
alias MyApp.Repo
import Ecto.Query # => For the from/2 queries below
def append_event(aggregate_id, aggregate_type, event_type, payload) do
# Get current version
current_version = get_current_version(aggregate_id)
event = %Event{
aggregate_id: aggregate_id,
aggregate_type: aggregate_type,
event_type: event_type,
payload: payload,
version: current_version + 1, # => Increment version
inserted_at: DateTime.truncate(DateTime.utc_now(), :second) # => :utc_datetime requires second precision
}
case Repo.insert(event) do
{:ok, event} ->
# Publish event for projections
Phoenix.PubSub.broadcast(
MyApp.PubSub,
"events:#{aggregate_type}",
{:event_appended, event}
)
{:ok, event}
{:error, changeset} ->
{:error, changeset}
end
end
def get_events(aggregate_id) do
# Bind the query first: piping a bare from/2 keyword list into Repo.all/1 misparses
query =
from e in Event,
where: e.aggregate_id == ^aggregate_id,
order_by: [asc: e.version] # => Chronological order
Repo.all(query)
end
defp get_current_version(aggregate_id) do
query =
from e in Event,
where: e.aggregate_id == ^aggregate_id,
select: max(e.version)
case Repo.one(query) do
nil -> 0 # => No events yet
version -> version
end
end
end
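# Concurrency caveat: get_current_version/1 followed by insert is racy — two
# concurrent appends can both compute version N + 1. A unique index (migration
# sketch; table and column names match the schema above) turns that race into
# an ordinary insert error the caller can retry:
defmodule MyApp.Repo.Migrations.AddEventVersionIndex do
use Ecto.Migration
def change do
create unique_index(:events, [:aggregate_id, :version]) # => One row per (aggregate, version)
end
end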
# Order aggregate
defmodule MyApp.Orders.Order do
alias MyApp.Events.Event # => For the apply_event/2 pattern matches
defstruct id: nil,
items: [],
status: :draft,
total: 0,
tracking_number: nil, # => Set by OrderShipped
version: 0
# Apply event to state
def apply_event(order, %Event{event_type: "OrderCreated", payload: payload}) do
%{order |
id: payload["id"],
items: [],
status: :draft,
total: 0
}
end
def apply_event(order, %Event{event_type: "ItemAdded", payload: payload}) do
item = payload["item"]
%{order |
items: [item | order.items],
total: order.total + item["price"] # => Update total
}
end
def apply_event(order, %Event{event_type: "OrderPlaced"}) do
%{order | status: :placed} # => Change status
end
def apply_event(order, %Event{event_type: "OrderShipped", payload: payload}) do
%{order |
status: :shipped,
tracking_number: payload["tracking_number"]
}
end
# Rebuild state from events
def from_events(events) do
Enum.reduce(events, %__MODULE__{}, &apply_event(&2, &1))
# => Replay all events to rebuild current state
end
end
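# Replay also gives "time travel" almost for free: rebuild the aggregate as of
# any past version by replaying only a prefix of the stream (a sketch built on
# the modules above):
defmodule MyApp.Orders.TimeTravel do
alias MyApp.EventStore
alias MyApp.Orders.Order
def state_at_version(order_id, version) do
order_id
|> EventStore.get_events() # => Already ordered by version
|> Enum.take_while(&(&1.version <= version))
|> Order.from_events() # => Replay the prefix only
end
end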
# Command handler
defmodule MyApp.Orders.Commands do
alias MyApp.EventStore
alias MyApp.Orders.Order
def create_order(order_id) do
EventStore.append_event(
order_id,
"Order",
"OrderCreated",
%{"id" => order_id} # => Event payload
)
end
def add_item(order_id, item) do
# Load current state
events = EventStore.get_events(order_id)
order = Order.from_events(events) # => Rebuild from events
# Validate command
if order.status == :draft do
EventStore.append_event(
order_id,
"Order",
"ItemAdded",
%{"item" => item} # => Record event
)
else
{:error, "Cannot add items to placed order"}
end
end
def place_order(order_id) do
events = EventStore.get_events(order_id)
order = Order.from_events(events)
if order.total > 0 do
EventStore.append_event(
order_id,
"Order",
"OrderPlaced",
%{} # => No additional data
)
else
{:error, "Order must have items"}
end
end
end
# Read model projection (materialized view)
defmodule MyApp.Orders.Projection do
use GenServer
alias MyApp.EventStore
alias MyApp.Orders.Order
def start_link(_opts) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def init(_) do
Phoenix.PubSub.subscribe(MyApp.PubSub, "events:Order")
{:ok, %{}}
end
def handle_info({:event_appended, event}, state) do
# Update read model based on event
update_read_model(event) # => Update database view
{:noreply, state}
end
defp update_read_model(%{event_type: "OrderPlaced", aggregate_id: order_id}) do
# Rebuild current state and save a snapshot for fast queries
events = EventStore.get_events(order_id)
order = Order.from_events(events)
%MyApp.Orders.OrderSnapshot{id: order_id}
|> Ecto.Changeset.change(items: order.items, total: order.total, status: order.status)
|> MyApp.Repo.insert_or_update() # => Expects a changeset; naive upsert for illustration
end
defp update_read_model(_event), do: :ok # => Ignore events this projection doesn't materialize
end
Key Takeaway: Store all state changes as immutable events. Rebuild state by replaying events. Use projections for fast queries. Events provide a complete audit trail and support time travel and debugging.
Why It Matters: Event-driven patterns decouple components and enable scalable architectures. Understanding events is key to building maintainable Phoenix applications.
Example 80: Advanced Testing Strategies
Implement property-based testing and contract testing.
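The property tests below assume the stream_data package, which supplies ExUnitProperties and the string/2 generators; a minimal deps sketch:
# mix.exs — stream_data is a dev/test-only dependency
defp deps do
[
{:stream_data, "~> 1.0", only: [:dev, :test]} # => Property-testing generators
]
end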
# Property-based testing with StreamData
defmodule MyApp.BlogTest do
use ExUnit.Case
use ExUnitProperties # => Property testing
property "post title is always trimmed" do
check all title <- string(:alphanumeric, min_length: 1, max_length: 100) do
# Generate random titles
params = %{"title" => " #{title} "} # => Add whitespace
changeset = Post.changeset(%Post{}, params)
if changeset.valid? do
trimmed = Ecto.Changeset.get_change(changeset, :title)
assert String.trim(title) == trimmed # => Property: always trimmed
end
end
end
property "creating and deleting post is idempotent" do
check all title <- string(:printable) do
# Create post
{:ok, post} = MyApp.Blog.create_post(%{title: title, body: "test"})
# Delete post
{:ok, _} = MyApp.Blog.delete_post(post.id)
# Verify deleted
assert MyApp.Blog.get_post(post.id) == nil # => Property: delete works
end
end
end
# Contract testing for APIs
defmodule MyApp.API.ContractTest do
use MyAppWeb.ConnCase # => Provides build_conn/0 and post/3
@api_contract %{
"POST /api/posts" => %{
request: %{
"title" => "string",
"body" => "string"
},
response: %{
status: 201,
body: %{
"id" => "integer",
"title" => "string",
"body" => "string",
"inserted_at" => "string"
}
}
},
"GET /api/posts/:id" => %{
response: %{
status: 200,
body: %{
"id" => "integer",
"title" => "string"
}
}
}
}
test "API contract compliance" do
for {endpoint, contract} <- @api_contract do
# Test request/response matches contract
verify_contract(endpoint, contract) # => Verify API shape
end
end
defp verify_contract("POST /api/posts", contract) do
conn = build_conn()
|> post("/api/posts", contract.request)
assert conn.status == contract.response.status
body = Jason.decode!(conn.resp_body)
# Verify response structure
for {field, type} <- contract.response.body do
assert Map.has_key?(body, field) # => Field exists
assert type_matches?(body[field], type) # => Type correct
end
end
defp verify_contract(_endpoint, _contract), do: :ok # => Clauses for the remaining endpoints omitted for brevity
defp type_matches?(value, "string"), do: is_binary(value)
defp type_matches?(value, "integer"), do: is_integer(value)
end
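# The insert/1 and insert_list/3 helpers used below assume an ExMachina
# factory — a minimal sketch (schema fields are illustrative):
defmodule MyApp.Factory do
use ExMachina.Ecto, repo: MyApp.Repo
def user_factory do
%MyApp.Accounts.User{
name: "Test User",
email: sequence(:email, &"user#{&1}@example.com") # => Unique per record
}
end
def post_factory do
%MyApp.Blog.Post{
title: sequence(:title, &"Post #{&1}"),
body: "Body",
author: build(:user) # => Associated author
}
end
def comment_factory do
%MyApp.Blog.Comment{
body: "Nice post!",
post: build(:post)
}
end
end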
# Integration tests with setup
defmodule MyAppWeb.PostLiveTest do
use MyAppWeb.ConnCase, async: true
import Phoenix.LiveViewTest
setup do
# Setup test data
user = insert(:user) # => Factory
posts = insert_list(3, :post, author: user) # => Create 3 posts
{:ok, user: user, posts: posts} # => Return to test
end
test "displays posts", %{conn: conn, posts: posts} do
{:ok, view, html} = live(conn, "/posts")
# Assert all posts shown
for post <- posts do
assert html =~ post.title # => Each title present
end
# Test interaction
view
|> element("button", "Load More")
|> render_click() # => Click button
html = render(view) # => Re-render after the click
assert html |> Floki.parse_document!() |> Floki.find("[data-role=post]") |> length() == 6 # => 6 posts now
end
test "creates post", %{conn: conn} do
{:ok, view, _html} = live(conn, "/posts/new")
# Fill form
view
|> form("#post-form", post: %{title: "", body: ""})
|> render_change() # => Trigger validation
assert has_element?(view, ".error") # => Shows errors
# Submit valid form
view
|> form("#post-form", post: %{title: "Test", body: "Content"})
|> render_submit() # => Submit form
assert_redirected(view, "/posts/1") # => Redirects to show
end
end
# Performance testing
defmodule MyApp.PerformanceTest do
use ExUnit.Case
@tag :performance
test "list posts performs under 100ms" do
# Create test data
insert_list(100, :post)
{time_us, _result} = :timer.tc(fn ->
MyApp.Blog.list_posts() # => Measure execution time
end)
time_ms = time_us / 1000
assert time_ms < 100, "Query took #{time_ms}ms, expected < 100ms"
end
@tag :performance
test "N+1 query prevention" do
posts = insert_list(10, :post)
Enum.each(posts, fn post ->
insert_list(5, :comment, post: post)
end)
# Count queries
query_count = count_queries(fn ->
MyApp.Blog.list_posts_with_comments() # => Preload comments
end)
assert query_count == 2, "Expected 2 queries, got #{query_count}"
# => 1 for posts, 1 for comments (not 11!)
end
defp count_queries(fun) do
ref = make_ref()
test_pid = self() # => Capture the test process for the handler
:telemetry.attach(
"query-counter-#{inspect(ref)}", # => Refs need inspect/1 to interpolate
[:my_app, :repo, :query],
fn _event, _measurements, _metadata, _config ->
send(test_pid, {:query, ref}) # => One message per executed query
end,
nil
)
fun.()
count = receive_queries(ref, 0)
:telemetry.detach("query-counter-#{inspect(ref)}")
count
end
defp receive_queries(ref, count) do
receive do
{:query, ^ref} -> receive_queries(ref, count + 1)
after
100 -> count # => No more queries
end
end
end
Key Takeaway: Property-based testing validates invariants across random inputs. Contract testing ensures API stability. Integration tests verify full user flows. Performance tests catch regressions. Use async: true for parallel test execution.
Why It Matters: Property-based, contract, integration, and performance tests each catch a different class of regression. Layering them lets you refactor and upgrade dependencies with confidence that production behavior won't silently break.