Async Programming Patterns
Problem
Modern applications require efficient asynchronous I/O without blocking threads. Rust’s async/await provides zero-cost abstractions, but requires understanding runtimes, futures, and async patterns.
Solution
1. Basic Async with Tokio
// Cargo.toml
// [dependencies]
// tokio = { version = "1.41", features = ["full"] }
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
println!("Starting");
async_operation().await;
println!("Done");
}
async fn async_operation() {
println!("Operation started");
sleep(Duration::from_secs(1)).await;
println!("Operation completed");
}
// Concurrent execution
async fn fetch_data() {
let (result1, result2, result3) = tokio::join!(
fetch_user(1),
fetch_user(2),
fetch_user(3),
);
println!("Results: {:?}, {:?}, {:?}", result1, result2, result3);
}
async fn fetch_user(id: u64) -> String {
sleep(Duration::from_millis(100)).await;
format!("User {}", id)
}
2. Async HTTP with reqwest
// Cargo.toml
// [dependencies]
// reqwest = { version = "0.12", features = ["json"] }
// serde = { version = "1.0", features = ["derive"] }
// futures = "0.3"
use reqwest;
use serde::Deserialize;
#[derive(Debug, Deserialize)]
struct User {
id: u64,
name: String,
}
async fn fetch_users() -> Result<Vec<User>, Box<dyn std::error::Error>> {
let response = reqwest::get("https://api.example.com/users")
.await?
.json::<Vec<User>>()
.await?;
Ok(response)
}
// Concurrent requests
async fn fetch_multiple_users() -> Result<(), Box<dyn std::error::Error>> {
let user_ids = vec![1, 2, 3, 4, 5];
let tasks: Vec<_> = user_ids
.into_iter()
.map(|id| async move {
reqwest::get(format!("https://api.example.com/users/{}", id))
.await?
.json::<User>()
.await
})
.collect();
let results = futures::future::try_join_all(tasks).await?;
println!("Fetched {} users", results.len());
Ok(())
}
3. Async Channels and Streams
use tokio::sync::mpsc;
async fn producer_consumer() {
let (tx, mut rx) = mpsc::channel(32);
// Producer task
tokio::spawn(async move {
for i in 0..10 {
tx.send(i).await.unwrap();
sleep(Duration::from_millis(100)).await;
}
});
// Consumer task
while let Some(value) = rx.recv().await {
println!("Received: {}", value);
}
}
// Streams
use futures::StreamExt;
async fn process_stream() {
let stream = futures::stream::iter(0..10);
stream
.map(|x| x * 2)
.filter(|x| futures::future::ready(*x > 5))
.for_each(|x| {
println!("Value: {}", x);
futures::future::ready(())
})
.await;
}
4. Timeouts and Cancellation
use tokio::time::timeout;
async fn with_timeout() -> Result<String, Box<dyn std::error::Error>> {
match timeout(Duration::from_secs(5), slow_operation()).await {
Ok(result) => Ok(result),
Err(_) => Err("Operation timed out".into()),
}
}
async fn slow_operation() -> String {
sleep(Duration::from_secs(10)).await;
"Done".to_string()
}
// Cancellation with select
use tokio::select;
async fn cancellable_operation() {
let mut operation = tokio::spawn(long_running_task());
select! {
result = &mut operation => {
println!("Operation completed: {:?}", result);
}
// ctrl_c() is polled inline; it only needs to fire once
_ = tokio::signal::ctrl_c() => {
println!("Cancelled");
operation.abort();
}
}
}
5. Async Traits with async-trait
// Cargo.toml
// [dependencies]
// async-trait = "0.1"
use async_trait::async_trait;
#[async_trait]
trait Repository {
async fn find_by_id(&self, id: u64) -> Option<User>;
async fn save(&self, user: User) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}
struct UserRepository;
#[async_trait]
impl Repository for UserRepository {
async fn find_by_id(&self, id: u64) -> Option<User> {
// Async database query
sleep(Duration::from_millis(10)).await;
Some(User { id, name: "Alice".to_string() })
}
async fn save(&self, _user: User) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Async database insert
sleep(Duration::from_millis(10)).await;
Ok(())
}
}
How It Works
Async Runtime Model
Rust’s async model is based on cooperative multitasking (see the hand-written future after this list):
- Future Trait: Async functions return types implementing Future<Output = T>
- Poll-based Execution: The runtime repeatedly polls futures until they complete
- Waker System: Futures register wakers to notify the runtime when they are ready to make progress
- Zero Cost: Futures compile to plain state machines; nothing is heap-allocated unless a future is boxed or spawned
- Executor Scheduling: The runtime manages the task queue and schedules futures that are ready to run
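To make the poll/waker contract concrete, here is a minimal hand-written future, a sketch of what the compiler normally generates from async fn. It returns Pending once, schedules its own wake-up, and completes on the second poll:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// A future that yields once before completing (illustrative only)
struct YieldOnce {
    polled: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.polled {
            Poll::Ready(())
        } else {
            self.polled = true;
            // Ask the executor to poll us again
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    YieldOnce { polled: false }.await;
    println!("polled twice, then completed");
}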
Tokio Runtime Architecture
Tokio provides a multi-threaded, work-stealing runtime (see the configuration sketch after the component list):
Components:
- Thread Pool: Worker threads (default: number of CPU cores)
- Work Stealing: Idle threads steal tasks from busy threads for load balancing
- Task Queue: Each thread has local queue, plus global queue for overflow
- Reactor: Event loop monitoring I/O events (epoll/kqueue/IOCP)
- Timer Wheel: Efficient scheduling for sleep/timeout operations
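These defaults can be adjusted by building the runtime by hand instead of using #[tokio::main]; a minimal sketch (the worker count of 4 is arbitrary):
use tokio::runtime::Builder;

fn main() {
    let runtime = Builder::new_multi_thread()
        .worker_threads(4) // default: number of CPU cores
        .thread_name("app-worker")
        .enable_all() // enable the I/O reactor and the timer wheel
        .build()
        .expect("failed to build runtime");

    runtime.block_on(async {
        println!("running on an explicitly configured runtime");
    });
}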
Execution Flow:
spawn task → add to queue → worker picks task → poll future
↓
future returns Poll::Pending
↓
register waker → park task
↓
I/O ready → wake task → poll again
↓
future returns Poll::Ready(value)
async/await Syntax Transformation
The async keyword transforms code into state machines:
Source code:
async fn example() -> u32 {
let x = fetch_data().await;
let y = process(x).await;
y + 1
}
Compiler transformation (conceptual):
// State machine with states for each .await point
enum ExampleFuture {
Initial,
AwaitingFetch(FetchFuture),
AwaitingProcess(u32, ProcessFuture),
Done,
}
impl Future for ExampleFuture {
type Output = u32;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<u32> {
// Poll state machine, transition states at .await points
}
}
Channel Communication Patterns
Tokio channels provide async message passing (a combined sketch follows these descriptions):
mpsc (multi-producer, single-consumer):
- Multiple senders, one receiver
- Bounded (fixed capacity) or unbounded
- Backpressure via send().await, which blocks when the channel is full
oneshot:
- Single-use channel for one message
- Useful for request-response patterns
broadcast:
- Multiple receivers get copies of every message
- Useful for event distribution
watch:
- Single value that can be observed by multiple receivers
- Only latest value available (lossy)
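The three specialized channels are small APIs; a minimal sketch of each (illustrative values only):
use tokio::sync::{broadcast, oneshot, watch};

#[tokio::main]
async fn main() {
    // oneshot: one value, one time - request/response style
    let (tx, rx) = oneshot::channel::<i32>();
    tokio::spawn(async move {
        let _ = tx.send(42); // send() consumes the sender
    });
    println!("oneshot reply: {}", rx.await.unwrap());

    // broadcast: every subscriber receives every message
    let (btx, mut brx1) = broadcast::channel(16);
    let mut brx2 = btx.subscribe();
    btx.send("event").unwrap();
    assert_eq!(brx1.recv().await.unwrap(), "event");
    assert_eq!(brx2.recv().await.unwrap(), "event");

    // watch: receivers only ever see the latest value
    let (wtx, wrx) = watch::channel("v1");
    wtx.send("v2").unwrap(); // overwrites "v1"
    println!("watch value: {}", *wrx.borrow());
}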
Future Combinators
Combinators compose multiple futures (see the sketch after this list):
- join! - Wait for all futures (concurrent execution)
- select! - Wait for the first future to complete
- try_join! - Like join!, but short-circuits on the first error
- join_all - Join a dynamic collection of futures
- race - Like select!, but only returns the first result
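To see the difference between join! and try_join!, here is a small sketch (the step helper is hypothetical, made up for illustration):
// A fallible async step; hypothetical helper for demonstration
async fn step(id: u32, fail: bool) -> Result<u32, String> {
    if fail { Err(format!("step {} failed", id)) } else { Ok(id) }
}

#[tokio::main]
async fn main() {
    // join! waits for every branch and returns all results, errors included
    let (a, b) = tokio::join!(step(1, false), step(2, true));
    println!("join!: {:?} {:?}", a, b);

    // try_join! short-circuits as soon as any branch returns Err
    match tokio::try_join!(step(1, false), step(2, true)) {
        Ok((a, b)) => println!("try_join!: {} {}", a, b),
        Err(e) => println!("try_join! error: {}", e),
    }
}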
Variations
1. Different Async Runtimes
While Tokio is most common, alternatives exist:
async-std - Standard library style API:
// Cargo.toml
// [dependencies]
// async-std = { version = "1.12", features = ["attributes"] }
use std::time::Duration;
#[async_std::main]
async fn main() {
let result = async_std::task::spawn(async {
async_std::task::sleep(Duration::from_secs(1)).await;
"Done"
}).await;
println!("{}", result);
}
smol - Lightweight runtime:
// Cargo.toml
// [dependencies]
// smol = "2.0"
use std::time::Duration;
fn main() {
smol::block_on(async {
smol::Timer::after(Duration::from_secs(1)).await;
println!("Done");
});
}
Trade-offs: Tokio has the largest ecosystem. async-std prioritizes API consistency. smol is minimal and embeddable.
2. Structured Concurrency with tokio::task::JoinSet
Manage dynamic sets of tasks:
use tokio::task::JoinSet;
async fn process_batch(items: Vec<String>) -> Vec<String> {
let mut set = JoinSet::new();
for item in items {
set.spawn(async move {
// Process item
sleep(Duration::from_millis(100)).await;
item.to_uppercase()
});
}
let mut results = Vec::new();
while let Some(res) = set.join_next().await {
if let Ok(value) = res {
results.push(value);
}
}
results
}
Trade-offs: Better task lifecycle management, at the cost of one allocation per spawned task.
3. Async Mutex and Synchronization
Use async-aware synchronization primitives:
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, Semaphore};
async fn sync_primitives() {
// Async Mutex - doesn't block the executor thread
let mutex = Arc::new(Mutex::new(vec![]));
let mut guard = mutex.lock().await; // Yields instead of blocking
guard.push(1);
drop(guard); // Release as soon as possible
// RwLock for multiple readers
let rwlock = Arc::new(RwLock::new(HashMap::new()));
let read = rwlock.read().await; // Multiple readers allowed
drop(read); // A held read guard would deadlock the write below
let mut write = rwlock.write().await; // Exclusive write access
write.insert("key", 1);
drop(write);
// Semaphore for rate limiting
let semaphore = Arc::new(Semaphore::new(10)); // Max 10 concurrent
let permit = semaphore.acquire().await.unwrap();
// Do work...
drop(permit); // Release
}
Trade-offs: Async mutexes add overhead; use them only when a lock must be held across an await point.
4. Stream Processing Patterns
Process infinite or large data streams:
// Cargo.toml
// [dependencies]
// tokio-stream = "0.1"
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
async fn stream_processing() {
let (tx, rx) = mpsc::channel(100);
// Feed the channel so the stream has items to process
tokio::spawn(async move {
for i in 0..20 {
tx.send(i).await.unwrap();
}
});
let stream = ReceiverStream::new(rx);
// Parallel processing with buffer_unordered
stream
.map(|item| async move {
sleep(Duration::from_millis(100)).await;
item * 2
})
.buffer_unordered(10) // Process up to 10 items concurrently
.for_each(|result| async move {
println!("Result: {}", result);
})
.await;
}
Trade-offs: Powerful for data pipelines but adds complexity.
5. Actor Pattern with Tokio
Implement actor model for state management:
use tokio::sync::{mpsc, oneshot};
struct Actor {
receiver: mpsc::Receiver<ActorMessage>,
state: i32,
}
enum ActorMessage {
Increment,
Decrement,
GetValue(oneshot::Sender<i32>),
}
impl Actor {
fn new(receiver: mpsc::Receiver<ActorMessage>) -> Self {
Actor { receiver, state: 0 }
}
async fn run(mut self) {
while let Some(msg) = self.receiver.recv().await {
match msg {
ActorMessage::Increment => self.state += 1,
ActorMessage::Decrement => self.state -= 1,
ActorMessage::GetValue(respond) => {
let _ = respond.send(self.state);
}
}
}
}
}
// Usage
async fn use_actor() {
let (tx, rx) = mpsc::channel(32);
tokio::spawn(Actor::new(rx).run());
tx.send(ActorMessage::Increment).await.unwrap();
let (respond_tx, respond_rx) = oneshot::channel();
tx.send(ActorMessage::GetValue(respond_tx)).await.unwrap();
let value = respond_rx.await.unwrap();
println!("Actor value: {}", value);
}
Trade-offs: Isolates mutable state but adds message-passing overhead.
6. Async Iterators (Experimental)
The native AsyncIterator trait still requires nightly (#![feature(async_iterator)]); the equivalent while-let iteration shown here works on stable via futures' StreamExt:
use futures::stream::{self, StreamExt};
async fn async_iteration() {
let mut stream = stream::iter(0..10);
while let Some(item) = stream.next().await {
println!("Item: {}", item);
}
}
Trade-offs: The stream-based loop is stable today; the native AsyncIterator trait remains unstable.
Common Pitfalls
1. Blocking the Async Runtime
Problem: Using blocking operations in async context starves the runtime:
// Bad: Blocks the executor thread
async fn bad_example() {
std::thread::sleep(Duration::from_secs(1)); // Blocks entire thread!
println!("Done");
}
Solution: Use async sleep or spawn_blocking for CPU-bound work:
// Good: Async sleep doesn't block
async fn good_example() {
tokio::time::sleep(Duration::from_secs(1)).await; // Yields to executor
println!("Done");
}
// For blocking operations, use spawn_blocking
async fn with_blocking_work() {
let result = tokio::task::spawn_blocking(|| {
// CPU-intensive work or blocking I/O
std::thread::sleep(Duration::from_secs(1));
42
}).await.unwrap();
println!("Result: {}", result);
}
2. Not Awaiting Futures
Problem: Creating futures without awaiting them:
// Bad: Future never executes
async fn bad_example() {
fetch_data(); // Does nothing! Future not awaited
println!("Done");
}
Solution: Always await futures or spawn them:
// Good: Await the future
async fn good_example() {
fetch_data().await; // Future executes
println!("Done");
}
// Or spawn as a task
async fn spawn_example() {
tokio::spawn(fetch_data()); // Runs concurrently
println!("Started task");
}
3. Holding Locks Across Await Points
Problem: Using std::sync locks in async code:
// Bad: Blocks executor if lock is held during await
use std::sync::{Arc, Mutex};
async fn bad_lock_usage(mutex: Arc<Mutex<Vec<i32>>>) {
let mut data = mutex.lock().unwrap(); // Blocks thread!
data.push(1);
fetch_data().await; // Lock held during await - deadlock risk!
data.push(2);
}
Solution: Use tokio::sync::Mutex or minimize lock scope:
// Good: Use async Mutex
use std::sync::Arc;
use tokio::sync::Mutex;
async fn good_lock_usage(mutex: Arc<Mutex<Vec<i32>>>) {
let mut data = mutex.lock().await; // Async lock
data.push(1);
fetch_data().await; // Safe to await
data.push(2);
}
// Better: Minimize lock scope
use std::sync::{Arc, Mutex};
async fn minimal_lock_scope(mutex: Arc<Mutex<Vec<i32>>>) {
{
let mut data = mutex.lock().unwrap();
data.push(1);
} // Lock released before await
fetch_data().await; // No lock held
{
let mut data = mutex.lock().unwrap();
data.push(2);
}
}
4. Task Leakage
Problem: Spawning tasks without tracking completion:
// Bad: Tasks keep running even after function returns
async fn bad_spawn() {
for i in 0..10 {
tokio::spawn(async move {
loop {
println!("Task {}", i);
sleep(Duration::from_secs(1)).await;
}
});
}
// Function returns but tasks continue indefinitely!
}
Solution: Use JoinSet or JoinHandle to track tasks:
// Good: Wait for tasks to complete
async fn good_spawn() {
let mut handles = Vec::new();
for i in 0..10 {
let handle = tokio::spawn(async move {
for _ in 0..5 {
println!("Task {}", i);
sleep(Duration::from_secs(1)).await;
}
});
handles.push(handle);
}
// Wait for all tasks
for handle in handles {
handle.await.unwrap();
}
}
5. Unhandled Panics in Spawned Tasks
Problem: Panics in spawned tasks are silent:
// Bad: Panic is swallowed
async fn bad_error_handling() {
tokio::spawn(async {
panic!("This panic is lost!"); // No one will know
});
}
Solution: Handle JoinHandle results:
// Good: Detect and handle panics
async fn good_error_handling() {
let handle = tokio::spawn(async {
panic!("This panic is detected!");
});
match handle.await {
Ok(_) => println!("Task completed"),
Err(e) => {
if e.is_panic() {
println!("Task panicked: {:?}", e);
}
}
}
}
6. Unbounded Channel Backpressure
Problem: Unbounded channels can consume unlimited memory:
// Bad: Can cause OOM if producer is faster than consumer
let (tx, mut rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
loop {
tx.send(vec![0u8; 1024]).unwrap(); // Allocates forever!
}
});
Solution: Use bounded channels for backpressure:
// Good: Bounded channel provides backpressure
let (tx, mut rx) = mpsc::channel(100); // Max 100 pending messages
tokio::spawn(async move {
loop {
// send().await blocks when channel is full
tx.send(vec![0u8; 1024]).await.unwrap();
}
});
Related Patterns
Related Patterns: See Concurrent Programming for thread-based patterns, Error Handling Strategies for async error handling, Testing Patterns for testing async code.
Tutorial: See Beginner Tutorial for async basics.
Cookbook: See Cookbook Recipe 52 - Async HTTP Server, Recipe 53 - WebSocket Server for async examples.