Task Agent
Need lightweight concurrency without GenServer complexity? This guide teaches you Task for async operations and Agent for simple state containers with practical patterns for parallel processing and state management.
Prerequisites
- Understanding of processes
- Basic GenServer knowledge
- Completed Intermediate Tutorial
Problem
GenServer is powerful but heavyweight for simple async operations or basic state storage. You need lighter abstractions for common concurrency patterns like running parallel computations, managing simple state without callback boilerplate, handling async timeouts, and coordinating concurrent tasks.
Challenges:
- Running concurrent operations and collecting results
- Simple state without callback boilerplate
- Handling async timeouts and failures
- Managing task supervision
- Coordinating multiple tasks
- Updating shared state safely
Solution
Use Task for concurrent computations with built-in supervision and Agent for simple state containers without the overhead of GenServer callbacks.
How It Works
Task Patterns
1. Basic Async/Await
task = Task.async(fn ->
:timer.sleep(1000)
"Result"
end)
IO.puts "Working on other things..."
result = Task.await(task) # "Result"
Real-world example:
defmodule UserLoader do
  @moduledoc """
  Loads a user together with their posts and comments using concurrent tasks.
  """

  @doc """
  Fetches the user, the user's posts and the user's comments for `user_id`.

  All three `Repo` lookups are started as tasks before any of them is awaited,
  so they run concurrently. Each result is awaited with the default
  `Task.await/1` timeout.

  Returns a map with the keys `:user`, `:posts` and `:comments`.
  """
  def load_user_with_details(user_id) do
    lookups = [
      fn -> Repo.get(User, user_id) end,
      fn -> Repo.get_posts_by_user(user_id) end,
      fn -> Repo.get_comments_by_user(user_id) end
    ]

    # Start every lookup first, then collect the results in the same order.
    [user, posts, comments] =
      lookups
      |> Enum.map(&Task.async/1)
      |> Enum.map(&Task.await/1)

    %{user: user, posts: posts, comments: comments}
  end
end
2. Multiple Concurrent Tasks
defmodule ParallelProcessor do
  @moduledoc """
  Fetches several users concurrently, one task per id.
  """

  @doc """
  Starts one task per id in `user_ids` and returns the fetched users in the
  same order as the ids.

  `Task.await_many/1` is called with its default timeout (5 seconds). That
  timeout is a single deadline for the whole batch, not a per-task allowance.
  """
  def fetch_all_users(user_ids) do
    pending = for id <- user_ids, do: Task.async(fn -> fetch_user(id) end)
    Task.await_many(pending)
  end

  # Stand-in for a real API call: waits 100 ms, then builds a user map.
  defp fetch_user(id) do
    :timer.sleep(100)
    %{id: id, name: "User #{id}"}
  end
end
ParallelProcessor.fetch_all_users([1, 2, 3, 4, 5])
With custom timeout:
# Runs `slow_api_call/1` for every id in its own task and waits for all of the
# results. The 10_000 ms handed to `Task.await_many/2` is one deadline for the
# whole batch, not a separate allowance per task.
def fetch_with_timeout(user_ids) do
  user_ids
  |> Enum.map(fn id -> Task.async(fn -> slow_api_call(id) end) end)
  |> Task.await_many(10_000)
end
3. Supervised Tasks
Add to application supervisor:
defmodule MyApp.Application do
  @moduledoc """
  Application entry point. Starts the top-level supervisor with a
  `Task.Supervisor` registered as `MyApp.TaskSupervisor`.
  """

  use Application

  @impl true
  def start(_type, _args) do
    Supervisor.start_link(
      [{Task.Supervisor, name: MyApp.TaskSupervisor}],
      strategy: :one_for_one,
      name: MyApp.Supervisor
    )
  end
end
Using supervised tasks:
defmodule SafeProcessor do
  @moduledoc """
  Runs a risky operation in a supervised task that is not linked to the caller.
  """

  # Maximum time to wait for the task, in milliseconds.
  @timeout 5_000

  @doc """
  Runs `risky_operation/1` on `data` under `MyApp.TaskSupervisor`.

  Returns `{:ok, result}` when the task finishes within 5 seconds, or
  `{:error, :task_failed}` when it crashes or does not finish in time.
  """
  def process_risky_operation(data) do
    # async_nolink: a crash in the task is reported to us as {:exit, reason}
    # instead of being propagated to the caller through a link.
    task =
      Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn ->
        risky_operation(data)
      end)

    # yield || shutdown stops a task that overran the deadline and still picks
    # up a reply that arrived just as the deadline passed.
    case Task.yield(task, @timeout) || Task.shutdown(task) do
      {:ok, result} -> {:ok, result}
      {:exit, _reason} -> {:error, :task_failed}
      nil -> {:error, :task_failed}
    end
  end

  defp risky_operation(data) do
    # Might raise or crash
    external_api_call(data)
  end
end
4. Fire-and-Forget
Unsupervised (use with caution):
Task.start(fn ->
send_email(user)
end)
:ok
Supervised (recommended):
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
cleanup_old_data()
end)
5. Task.async_stream for Enumerable Processing
Process items concurrently with back-pressure:
defmodule BatchProcessor do
  @moduledoc """
  Fetches a batch of URLs with bounded concurrency.
  """

  @doc """
  Fetches every URL in `urls` with at most 10 requests in flight.

  Returns a list in the same order as `urls`. Each element is either the
  response body or `{:error, reason}`; a request that exceeds the 5 second
  timeout yields `{:error, :timeout}`.

  Note that the stream's tasks are linked to the caller, so a request that
  raises still crashes the caller unless it is trapping exits.
  """
  def process_urls(urls) do
    urls
    |> Task.async_stream(&fetch_url/1,
      max_concurrency: 10,
      timeout: 5000,
      # The default (:exit) makes the caller exit on the first slow request,
      # so the {:exit, reason} clause below would never see a timeout.
      on_timeout: :kill_task
    )
    |> Enum.map(fn
      {:ok, result} -> result
      {:exit, reason} -> {:error, reason}
    end)
  end

  # Performs the GET request and returns the response body.
  defp fetch_url(url) do
    url
    |> HTTPoison.get!()
    |> Map.get(:body)
  end
end
BatchProcessor.process_urls([
"https://example.com/1",
"https://example.com/2",
# ... 100 more URLs
])
Advanced async_stream options:
urls
|> Task.async_stream(
&fetch_url/1,
max_concurrency: 20, # Max parallel tasks
timeout: 10_000, # Per-task timeout
on_timeout: :kill_task, # Kill tasks that timeout
ordered: false # Results can be out of order (faster)
)
|> Stream.filter(fn
{:ok, _} -> true
{:exit, _} -> false # Filter out failures
end)
|> Enum.to_list()
6. Task.yield for Manual Control
defmodule FlexibleProcessor do
  @moduledoc """
  Runs an expensive operation in a task while doing other work, giving the
  task a bounded amount of extra time to finish.
  """

  @doc """
  Starts `expensive_operation/1` on `data` in a task, runs `quick_operation/0`
  in the caller, then waits up to 1 second for the task.

  Returns one of:

    * `{:ok, result, other_result}` - the task replied in time
    * `{:error, :timeout, other_result}` - the task was still running and has
      been killed
    * `{:error, {:exit, reason}, other_result}` - the task exited (only
      observable when the caller traps exits, because the task is linked)
  """
  def process_with_manual_control(data) do
    task = Task.async(fn -> expensive_operation(data) end)

    # Do other work while the task runs.
    other_result = quick_operation()

    # yield || shutdown: if the reply arrives between the timeout and the
    # shutdown, Task.shutdown/2 returns it instead of it being thrown away.
    case Task.yield(task, 1000) || Task.shutdown(task, :brutal_kill) do
      {:ok, result} -> {:ok, result, other_result}
      {:exit, reason} -> {:error, {:exit, reason}, other_result}
      nil -> {:error, :timeout, other_result}
    end
  end
end
Agent Patterns
1. Simple Counter
defmodule Counter do
  @moduledoc """
  Integer counter held in an `Agent` registered under the module name.
  """

  use Agent

  @doc "Starts the counter with `initial_value` (0 when omitted)."
  def start_link(initial_value \\ 0),
    do: Agent.start_link(fn -> initial_value end, name: __MODULE__)

  @doc "Adds one to the counter. Returns `:ok`."
  def increment, do: Agent.update(__MODULE__, &(&1 + 1))

  @doc "Subtracts one from the counter. Returns `:ok`."
  def decrement, do: Agent.update(__MODULE__, &(&1 - 1))

  @doc "Returns the current count."
  def get, do: Agent.get(__MODULE__, & &1)

  @doc "Sets the counter back to 0. Returns `:ok`."
  def reset, do: Agent.update(__MODULE__, fn _count -> 0 end)
end
{:ok, _pid} = Counter.start_link(10)
Counter.increment() # :ok (count is now 11)
Counter.increment() # :ok (count is now 12)
Counter.get() # 12
Counter.reset() # :ok (count is now 0)
2. Key-Value Store
defmodule KVStore do
  @moduledoc """
  Key-value store backed by a map held in an `Agent` registered under the
  module name.
  """

  use Agent

  @doc "Starts the store with an empty map. The argument is ignored."
  def start_link(_opts), do: Agent.start_link(&Map.new/0, name: __MODULE__)

  @doc "Returns the value stored under `key`, or `nil` when it is absent."
  def get(key), do: Agent.get(__MODULE__, &Map.get(&1, key))

  @doc "Stores `value` under `key`. Returns `:ok`."
  def put(key, value), do: Agent.update(__MODULE__, &Map.put(&1, key, value))

  @doc "Removes `key` from the store. Returns `:ok`."
  def delete(key), do: Agent.update(__MODULE__, &Map.delete(&1, key))

  @doc "Returns the whole map."
  def get_all, do: Agent.get(__MODULE__, & &1)

  @doc "Returns the list of stored keys."
  def keys, do: Agent.get(__MODULE__, &Map.keys/1)
end
3. Configuration Store
defmodule Config do
  @moduledoc """
  Nested configuration held in an `Agent` registered under the module name.

  Keys may be a single key or a list of keys describing a path into the
  nested configuration.
  """

  use Agent

  @doc "Starts the store with `initial_config` as its state."
  def start_link(initial_config) do
    Agent.start_link(fn -> initial_config end, name: __MODULE__)
  end

  @doc """
  Returns the value at `key` (a key or a key path), or `default` when nothing
  is stored there.

  Only a missing (`nil`) value falls back to `default`; a stored `false` is
  returned as-is.
  """
  def get(key, default \\ nil) do
    Agent.get(__MODULE__, fn config ->
      # `|| default` would also replace a legitimately stored `false`.
      case get_in(config, List.wrap(key)) do
        nil -> default
        value -> value
      end
    end)
  end

  @doc "Stores `value` at `key` (a key or a key path). Returns `:ok`."
  def put(key, value) do
    Agent.update(__MODULE__, fn config ->
      put_in(config, List.wrap(key), value)
    end)
  end

  @doc """
  Applies `fun` to the current value at `key` and stores the result in a
  single Agent call. Returns the new value.
  """
  def update(key, fun) do
    Agent.get_and_update(__MODULE__, fn config ->
      current_value = get_in(config, List.wrap(key))
      new_value = fun.(current_value)
      new_config = put_in(config, List.wrap(key), new_value)
      {new_value, new_config}
    end)
  end
end
Config.start_link(%{
database: %{host: "localhost", port: 5432},
cache: %{ttl: 3600}
})
Config.get([:database, :host]) # "localhost"
Config.put([:database, :port], 5433)
Config.update([:cache, :ttl], &(&1 * 2)) # Double the TTL
4. Agent with Complex State
defmodule GameState do
  @moduledoc """
  Game state (players, scores and current round) kept as a struct inside an
  `Agent` registered under the module name.
  """

  use Agent

  defstruct players: [], score: %{}, round: 1

  @doc "Starts the agent with a default `%GameState{}`. The argument is ignored."
  def start_link(_opts), do: Agent.start_link(fn -> %__MODULE__{} end, name: __MODULE__)

  @doc """
  Prepends `player_name` to the player list and sets that player's score to 0.
  Returns `:ok`.
  """
  def add_player(player_name) do
    Agent.update(__MODULE__, fn %__MODULE__{players: players, score: score} = state ->
      %{state | players: [player_name | players], score: Map.put(score, player_name, 0)}
    end)
  end

  @doc """
  Adds `points` to the score of `player_name`; a player without a score entry
  starts at `points`. Returns `:ok`.
  """
  def update_score(player_name, points) do
    Agent.update(__MODULE__, fn %__MODULE__{score: score} = state ->
      %{state | score: Map.update(score, player_name, points, fn old -> old + points end)}
    end)
  end

  @doc "Advances the round counter by one. Returns `:ok`."
  def next_round do
    Agent.update(__MODULE__, &Map.update!(&1, :round, fn round -> round + 1 end))
  end

  @doc "Returns the full state struct."
  def get_state, do: Agent.get(__MODULE__, & &1)

  @doc "Returns `{player_name, score}` pairs sorted by score, highest first."
  def leaderboard do
    Agent.get(__MODULE__, fn %__MODULE__{score: score} ->
      Enum.sort_by(score, &elem(&1, 1), :desc)
    end)
  end
end
5. Agent with Timeout and Cleanup
defmodule SessionStore do
  @moduledoc """
  Session store kept in an `Agent` registered under the module name.

  Sessions are stored in a map keyed by session id; each session records the
  user id, an expiry time in Unix seconds and a creation timestamp.
  """

  use Agent

  @doc "Starts the store with no sessions. The argument is ignored."
  def start_link(_opts), do: Agent.start_link(fn -> %{} end, name: __MODULE__)

  @doc """
  Creates a session for `user_id` that expires 3600 seconds (1 hour) from now
  and returns its generated id.
  """
  def create_session(user_id) do
    session_id = generate_session_id()
    expires_at = System.system_time(:second) + 3600

    Agent.update(__MODULE__, fn sessions ->
      session = %{user_id: user_id, expires_at: expires_at, created_at: DateTime.utc_now()}
      Map.put(sessions, session_id, session)
    end)

    session_id
  end

  @doc """
  Looks up `session_id`.

  Returns `{:ok, session}`, `{:error, :not_found}` when no such session is
  stored, or `{:error, :expired}` when its expiry time has passed. Expired
  sessions are not removed by this call.
  """
  def get_session(session_id) do
    Agent.get(__MODULE__, fn sessions ->
      sessions |> Map.get(session_id) |> check_session()
    end)
  end

  @doc "Removes `session_id` from the store. Returns `:ok`."
  def delete_session(session_id) do
    Agent.update(__MODULE__, &Map.delete(&1, session_id))
  end

  @doc "Drops every session whose expiry time is at or before now. Returns `:ok`."
  def cleanup_expired do
    now = System.system_time(:second)

    Agent.update(__MODULE__, fn sessions ->
      for {id, session} <- sessions, session.expires_at > now, into: %{}, do: {id, session}
    end)
  end

  # Classifies a lookup result as missing, still valid, or expired.
  defp check_session(nil), do: {:error, :not_found}

  defp check_session(session) do
    if session.expires_at > System.system_time(:second) do
      {:ok, session}
    else
      {:error, :expired}
    end
  end

  # 32 random bytes, Base64-encoded.
  defp generate_session_id, do: Base.encode64(:crypto.strong_rand_bytes(32))
end
Variations
Task with Custom Timeout
task = Task.async(fn -> slow_operation() end)
Task.await(task, 10_000) # 10 second timeout
Agent with Complex State Updates
defmodule Analytics do
  @moduledoc """
  Page-view analytics kept in an `Agent` registered under the module name.

  The state tracks a view count per page, the set of visitor ids seen so far
  and the total number of tracked requests.
  """

  use Agent

  @doc "Starts the agent with empty counters. The argument is ignored."
  def start_link(_opts), do: Agent.start_link(&initial_state/0, name: __MODULE__)

  @doc """
  Records one view of `page` by `visitor_id`: bumps the page's view count,
  remembers the visitor and increments the request total. Returns `:ok`.
  """
  def track_page_view(page, visitor_id) do
    Agent.update(__MODULE__, fn state ->
      state
      |> Map.update!(:page_views, fn views -> Map.update(views, page, 1, &(&1 + 1)) end)
      |> Map.update!(:unique_visitors, &MapSet.put(&1, visitor_id))
      |> Map.update!(:total_requests, &(&1 + 1))
    end)
  end

  @doc """
  Returns a summary map with `:total_page_views`, `:unique_visitors` (a
  count), `:total_requests` and `:popular_pages` (up to five `{page, views}`
  pairs, most viewed first).
  """
  def get_stats do
    Agent.get(__MODULE__, fn %{page_views: views, unique_visitors: visitors, total_requests: total} ->
      top_pages =
        views
        |> Enum.sort_by(fn {_k, v} -> v end, :desc)
        |> Enum.take(5)

      %{
        total_page_views: views |> Map.values() |> Enum.sum(),
        unique_visitors: MapSet.size(visitors),
        total_requests: total,
        popular_pages: top_pages
      }
    end)
  end

  # State the agent starts with.
  defp initial_state do
    %{page_views: %{}, unique_visitors: MapSet.new(), total_requests: 0}
  end
end
Coordinating Tasks
defmodule Pipeline do
  @moduledoc """
  Three-stage pipeline (parse, transform, aggregate) in which every stage runs
  in tasks and the transform stage runs one task per item.
  """

  @doc """
  Splits `input` on commas, upcases every piece in its own task and joins the
  results with `" | "`.
  """
  def process_data(input) do
    # Stage 1: parse
    parsed = run_in_task(fn -> parse(input) end)

    # Stage 2: transform, one task per item
    transformed =
      parsed
      |> Enum.map(fn item -> Task.async(fn -> transform(item) end) end)
      |> Task.await_many()

    # Stage 3: aggregate
    run_in_task(fn -> aggregate(transformed) end)
  end

  # Runs `fun` in a task and waits for its result with the default timeout.
  defp run_in_task(fun), do: fun |> Task.async() |> Task.await()

  defp parse(input), do: String.split(input, ",")
  defp transform(item), do: String.upcase(item)
  defp aggregate(items), do: Enum.join(items, " | ")
end
Advanced Patterns
1. Task Pool Pattern
defmodule TaskPool do
  @moduledoc """
  Concurrent `map` over a collection with a bounded number of running tasks.
  """

  @doc """
  Applies `fun` to every element of `collection` using `Task.async_stream/3`
  and returns the results in input order.

  ## Options

    * `:max_concurrency` - maximum number of tasks running at once; defaults
      to twice the number of online schedulers
  """
  def parallel_map(collection, fun, opts \\ []) do
    limit = Keyword.get(opts, :max_concurrency, 2 * System.schedulers_online())
    stream = Task.async_stream(collection, fun, max_concurrency: limit)
    Enum.map(stream, &unwrap/1)
  end

  # Only successful results are expected; anything else fails the match.
  defp unwrap({:ok, result}), do: result
end
TaskPool.parallel_map(1..1000, fn n ->
expensive_operation(n)
end, max_concurrency: 20)
2. Agent with ETS for Performance
defmodule HybridCache do
  @moduledoc """
  Cache that stores its entries in a public named ETS table (`:cache`) owned
  by an `Agent` registered under the module name.

  Agent only for coordination, ETS for fast reads.
  """

  use Agent

  @doc """
  Starts the Agent and creates the `:cache` table from inside it.

  Creating the table in the Agent's init function makes the Agent the table
  owner. The table therefore goes away when the Agent stops, and a supervisor
  restart can create it again instead of failing because the name is taken.
  The argument is ignored.
  """
  def start_link(_opts) do
    Agent.start_link(
      fn -> :ets.new(:cache, [:named_table, :public, :set]) end,
      name: __MODULE__
    )
  end

  @doc "Stores `value` under `key`, replacing any previous entry. Returns `true`."
  def put(key, value) do
    :ets.insert(:cache, {key, value})
  end

  @doc "Returns `{:ok, value}` for `key`, or `{:error, :not_found}`."
  def get(key) do
    case :ets.lookup(:cache, key) do
      [{^key, value}] -> {:ok, value}
      [] -> {:error, :not_found}
    end
  end
end
3. Task Retry Pattern
defmodule RetryTask do
  @moduledoc """
  Wraps a function in a task that retries it when it raises.
  """

  @doc """
  Starts a task that calls `fun`, retrying whenever it raises an exception.

  `retries` is the total number of attempts (3 by default). The task sleeps
  1 second between attempts; when the last attempt raises, the exception is
  re-raised inside the task.
  """
  def async_with_retry(fun, retries \\ 3), do: Task.async(fn -> retry(fun, retries) end)

  defp retry(fun, retries) when retries > 0 do
    fun.()
  rescue
    error ->
      if retries <= 1 do
        # Last attempt: surface the failure with its original stacktrace.
        reraise error, __STACKTRACE__
      else
        # Wait before the next attempt.
        Process.sleep(1000)
        retry(fun, retries - 1)
      end
  end
end
task = RetryTask.async_with_retry(fn ->
unreliable_api_call()
end, 5)
Task.await(task)
4. Distributed Agent Pattern
defmodule DistributedCounter do
  @moduledoc """
  Counter held in an `Agent` that is registered globally as `{:global, name}`.
  """

  use Agent

  @doc """
  Starts a counter at 0, registered globally under the `:name` option
  (defaults to this module's name).
  """
  def start_link(opts) do
    registered_name = global(Keyword.get(opts, :name, __MODULE__))
    Agent.start_link(fn -> 0 end, name: registered_name)
  end

  @doc "Adds one to the counter registered as `name`. Returns `:ok`."
  def increment(name \\ __MODULE__), do: Agent.update(global(name), &(&1 + 1))

  @doc "Returns the current value of the counter registered as `name`."
  def get(name \\ __MODULE__), do: Agent.get(global(name), & &1)

  # Builds the global registration tuple for `name`.
  defp global(name), do: {:global, name}
end
Use Cases
Task:
- Parallel API calls
- Background jobs
- CPU-intensive computations
- Batch processing
- Database queries in parallel
- File processing pipelines
Agent:
- Application configuration
- Simple caches
- Counters/metrics
- Temporary state
- Session storage
- Feature flags
Combined:
- Fan-out/fan-in patterns
- Parallel processing with shared state
- Worker pools
- Rate limiting
- Circuit breakers
Best Practices
Use Task.Supervisor for production:
# Don't
Task.async(fn -> risky_operation() end)
# Do
Task.Supervisor.async(MyApp.TaskSupervisor, fn -> risky_operation() end)
Always set timeouts:
Task.await(task, 5_000) # Explicit timeout
Handle task failures:
try do
  Task.await(task)
catch
  :exit, _ -> handle_failure()
end
Keep Agent updates fast:
# Don't - slow operation in Agent
Agent.update(MyAgent, fn state -> slow_computation(state) end)
# Do - compute outside, update quickly
new_value = slow_computation(old_value)
Agent.update(MyAgent, fn _ -> new_value end)
Use get_and_update for atomic operations:
Agent.get_and_update(MyAgent, fn state ->
  {state.value, %{state | value: new_value}}
end)
Common Pitfalls
- Not using supervision: Tasks crash and bring down caller
- Missing timeouts: Tasks hang forever
- Blocking Agent calls: Slow operations block all access
- Too many concurrent tasks: Memory exhaustion
- Not handling failures: Errors propagate unexpectedly
- Race conditions: Multiple Agents instead of one GenServer
Troubleshooting
Task Timeout
Task.await(task, 30_000)
Task.shutdown(task, 5_000)
Agent Bottleneck
:ets.new(:my_table, [:named_table, :public])
:ets.insert(:my_table, {:key, :value})
:ets.lookup(:my_table, :key)
Memory Leaks in Agents
# Drops every entry of the Agent state for which `is_recent?/1` is falsy.
#
# `Enum.filter/2` always returns a list, so filtering a map state would
# silently turn it into a list of `{key, value}` tuples. A map state is
# therefore rebuilt with `Map.new/1`; any other enumerable state is filtered
# as before.
# NOTE(review): the state's shape is not visible here; confirm which clause
# the surrounding module actually needs.
def cleanup_old_entries do
  Agent.update(__MODULE__, fn
    state when is_map(state) ->
      state |> Enum.filter(&is_recent?/1) |> Map.new()

    state ->
      Enum.filter(state, &is_recent?/1)
  end)
end