Advanced Async Patterns
Problem
Modern Python applications require efficient concurrent I/O. Simple sequential code blocks on every network or disk operation, leaving the process idle between requests and degrading user experience.
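A minimal sketch of the cost, using asyncio.sleep as a stand-in for network latency: ten sequential one-second calls take roughly ten seconds, while asyncio.gather finishes them in about one.

import asyncio
import time

async def fake_request(i: int) -> int:
    await asyncio.sleep(1)  # stand-in for a network round trip
    return i

async def sequential() -> None:
    for i in range(10):
        await fake_request(i)  # each call waits for the previous one

async def concurrent() -> None:
    await asyncio.gather(*(fake_request(i) for i in range(10)))

start = time.perf_counter()
asyncio.run(sequential())
print(f"sequential: {time.perf_counter() - start:.1f}s")  # ~10s

start = time.perf_counter()
asyncio.run(concurrent())
print(f"concurrent: {time.perf_counter() - start:.1f}s")  # ~1s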
Solution
1. Async Context Managers
import asyncio

class _FakeConn:
    """Stand-in for a real driver connection object."""
    async def fetch(self, query: str):
        return [{"id": 1, "name": "example"}]

    async def close(self):
        pass

class AsyncDatabaseConnection:
    """Async context manager that opens and closes a connection."""
    async def __aenter__(self):
        self.conn = await self._connect()
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.conn.close()

    async def _connect(self):
        # Simulate connection setup; a real driver returns a live connection
        await asyncio.sleep(0.1)
        return _FakeConn()

async def fetch_data():
    async with AsyncDatabaseConnection() as conn:
        # Connection automatically closed after block
        return await conn.fetch("SELECT * FROM users")
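A quick usage sketch (the _FakeConn stub above is our assumption standing in for a real driver connection):

rows = asyncio.run(fetch_data())
print(rows)  # [{'id': 1, 'name': 'example'}]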
2. Concurrent Request Handling with aiohttp
import aiohttp
import asyncio
from typing import List, Dict

async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
    """Fetch single URL with timeout and error handling."""
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            return {
                "url": url,
                "status": response.status,
                "data": await response.json()
            }
    except asyncio.TimeoutError:
        return {"url": url, "error": "timeout"}
    except Exception as e:
        return {"url": url, "error": str(e)}

async def fetch_multiple_urls(urls: List[str]) -> List[Dict]:
    """Fetch multiple URLs concurrently."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

urls = [
    "https://api.example.com/users/1",
    "https://api.example.com/users/2",
    "https://api.example.com/users/3"
]
results = asyncio.run(fetch_multiple_urls(urls))
3. Async Generators for Streaming
import asyncio
from typing import AsyncIterator, Dict, List

async def fetch_paginated_data(page_size: int = 100) -> AsyncIterator[Dict]:
    """Stream paginated API results without loading all into memory."""
    page = 1
    while True:
        # Simulate API latency
        await asyncio.sleep(0.1)
        data = await fetch_page(page, page_size)
        if not data:
            break
        for item in data:
            yield item
        page += 1

async def fetch_page(page: int, size: int) -> List[Dict]:
    """Simulate fetching a page of data."""
    # Simulated data - would be an actual API call
    if page > 3:
        return []
    return [{"id": i, "page": page} for i in range(size)]

async def process_all_items():
    async for item in fetch_paginated_data(page_size=50):
        await process_item(item)

async def process_item(item: Dict):
    # Process individual item
    print(f"Processing {item['id']}")
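One caveat: if a consumer breaks out of the async for early, the generator is left suspended. contextlib.aclosing() (Python 3.10+) guarantees its cleanup runs; a minimal sketch reusing the functions above:

from contextlib import aclosing

async def process_first_n(n: int):
    # aclosing() finalises the generator even if we stop iterating early
    async with aclosing(fetch_paginated_data(page_size=50)) as stream:
        count = 0
        async for item in stream:
            await process_item(item)
            count += 1
            if count >= n:
                break  # generator cleanup still runs

asyncio.run(process_first_n(10))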
print(f"Processing {item['id']}")4. Task Groups for Structured Concurrency
import asyncio
from typing import List

async def download_file(url: str, filename: str):
    """Download a single file."""
    await asyncio.sleep(1)  # Simulate download
    print(f"Downloaded {filename}")
    return filename

async def download_files_structured(urls: List[str]):
    """Use task groups for automatic cancellation on error (Python 3.11+)."""
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(download_file(url, f"file_{i}.dat"))
            for i, url in enumerate(urls)
        ]
    # All tasks completed, or all were cancelled if one failed
    return [task.result() for task in tasks]

urls = ["http://example.com/file1", "http://example.com/file2"]
try:
    results = asyncio.run(download_files_structured(urls))
except* Exception as eg:
    # Handle the exception group raised by the TaskGroup
    for exc in eg.exceptions:
        print(f"Task failed: {exc}")
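asyncio.TaskGroup and except* require Python 3.11. On older interpreters, a rough approximation is sketched below; it is an assumption-laden stand-in, not a drop-in replacement, because gather() does not cancel sibling tasks on failure, so we do that by hand:

async def download_files_gather(urls: List[str]):
    """Pre-3.11 sketch: cancel remaining tasks when one fails."""
    tasks = [
        asyncio.create_task(download_file(url, f"file_{i}.dat"))
        for i, url in enumerate(urls)
    ]
    try:
        return await asyncio.gather(*tasks)
    except Exception:
        for t in tasks:
            t.cancel()  # gather() left siblings running; stop them
        raise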
print(f"Task failed: {exc}")5. Rate Limiting with Semaphores
import asyncio
from contextlib import asynccontextmanager
from typing import Dict, List

class RateLimiter:
    """Rate limiter combining a concurrency cap with a minimum delay."""
    def __init__(self, max_concurrent: int, requests_per_second: float):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.delay = 1.0 / requests_per_second
        self.last_request = 0.0
        self.lock = asyncio.Lock()  # serialise the timing bookkeeping

    @asynccontextmanager  # lets acquire() be used with `async with`
    async def acquire(self):
        """Acquire rate limit permission."""
        async with self.semaphore:
            async with self.lock:
                # Ensure a minimum delay between request starts
                now = asyncio.get_running_loop().time()
                wait = self.delay - (now - self.last_request)
                if wait > 0:
                    await asyncio.sleep(wait)
                self.last_request = asyncio.get_running_loop().time()
            yield

async def rate_limited_fetch(limiter: RateLimiter, url: str) -> Dict:
    """Fetch URL with rate limiting."""
    async with limiter.acquire():
        # Actual request happens here
        await asyncio.sleep(0.1)  # Simulate request
        return {"url": url, "status": "success"}

async def fetch_with_limits():
    limiter = RateLimiter(max_concurrent=5, requests_per_second=10)
    urls = [f"https://api.example.com/item/{i}" for i in range(100)]
    tasks = [rate_limited_fetch(limiter, url) for url in urls]
    return await asyncio.gather(*tasks)
6. Background Task Management
import asyncio
from typing import Set

class BackgroundTasks:
    """Manage background tasks with graceful shutdown."""
    def __init__(self):
        self.tasks: Set[asyncio.Task] = set()

    def create_task(self, coro) -> asyncio.Task:
        """Create and track a background task."""
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        # Drop the task from the set once it finishes
        task.add_done_callback(self.tasks.discard)
        return task

    async def shutdown(self):
        """Cancel all tasks and wait for completion."""
        tasks = list(self.tasks)  # snapshot: done-callbacks mutate the set
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

async def background_worker(name: str):
    """Long-running background task."""
    try:
        while True:
            print(f"{name} working...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print(f"{name} cancelled")
        raise  # always re-raise CancelledError

async def main():
    bg_tasks = BackgroundTasks()
    # Start background workers
    bg_tasks.create_task(background_worker("Worker-1"))
    bg_tasks.create_task(background_worker("Worker-2"))
    # Do main work
    await asyncio.sleep(5)
    # Graceful shutdown
    await bg_tasks.shutdown()

asyncio.run(main())
How It Works
graph TD
A[Async Function Called] --> B{I/O Operation?}
B -->|Yes| C[await keyword]
C --> D[Suspend Coroutine]
D --> E[Event Loop Schedules Next Task]
E --> F[I/O Completes]
F --> G[Resume Coroutine]
G --> H[Continue Execution]
B -->|No| H
style A fill:#0173B2
style C fill:#DE8F05
style E fill:#029E73
style H fill:#CC78BC
Key Mechanisms:
- Event Loop: A single-threaded event loop manages all async operations
- Coroutines: Functions defined with `async def` that can be suspended with `await`
- Tasks: Wrapped coroutines scheduled for execution by the event loop
- Futures: Placeholder objects representing results of async operations
Execution Flow:

result = await some_async_function()
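To make the suspend/resume cycle concrete, here is a small self-contained sketch (the task names and delays are arbitrary): while task A is suspended at its await, the loop runs task B.

import asyncio

async def worker(name: str, delay: float) -> str:
    print(f"{name}: start")
    await asyncio.sleep(delay)  # suspend; the loop runs other tasks
    print(f"{name}: resumed")
    return name

async def main():
    # Both tasks are scheduled on the same single-threaded event loop
    a = asyncio.create_task(worker("A", 0.2))
    b = asyncio.create_task(worker("B", 0.1))
    results = await asyncio.gather(a, b)  # B resumes before A
    print(results)  # ['A', 'B'] -- gather preserves argument order

asyncio.run(main())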
Variations
Async Iterator for Database Cursor
import asyncio
from typing import Dict, List

class AsyncCursor:
    """Async iterator for large database results."""
    def __init__(self, query: str):
        self.query = query
        self.position = 0
        self.batch_size = 100

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Fetch the next batch
        batch = await self._fetch_batch(self.position, self.batch_size)
        if not batch:
            raise StopAsyncIteration
        self.position += len(batch)
        return batch

    async def _fetch_batch(self, offset: int, limit: int) -> List[Dict]:
        await asyncio.sleep(0.1)  # Simulate query
        # Return empty after 5 batches
        if offset >= 500:
            return []
        return [{"id": i} for i in range(offset, offset + limit)]

async def process_row(row: Dict):
    """Process a single row (stub)."""
    print(f"Row {row['id']}")

async def process_large_resultset():
    cursor = AsyncCursor("SELECT * FROM large_table")
    async for batch in cursor:
        for row in batch:
            await process_row(row)
Timeout Handling
import asyncio

async def fetch_with_timeout(url: str, timeout: float = 5.0):
    """Fetch with timeout protection."""
    try:
        return await asyncio.wait_for(
            fetch_data(url),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        return {"error": "Request timed out", "url": url}

async def fetch_data(url: str):
    # Simulate a slow request
    await asyncio.sleep(10)
    return {"data": "result"}

result = asyncio.run(fetch_with_timeout("https://slow-api.com", timeout=3.0))
Retry Logic with Exponential Backoff
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar('T')

async def retry_with_backoff(
    func: Callable[..., Awaitable[T]],
    *args,
    max_retries: int = 3,
    base_delay: float = 1.0,
    **kwargs
) -> T:
    """Retry an async function with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            if attempt == max_retries - 1:
                raise  # out of retries: propagate the last error
            delay = base_delay * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
            await asyncio.sleep(delay)

async def unreliable_api_call():
    # Simulated flaky API
    import random
    if random.random() < 0.7:
        raise Exception("API Error")
    return {"status": "success"}

result = asyncio.run(retry_with_backoff(unreliable_api_call, max_retries=5))
Common Pitfalls
1. Blocking the Event Loop
Problem: Calling blocking I/O in async functions freezes the event loop.
import asyncio
import time

async def bad_sleep():
    time.sleep(5)  # Blocks the entire event loop!
    return "done"

async def good_sleep():
    await asyncio.sleep(5)  # Suspends this coroutine only
    return "done"
return "done"Solution: Use async alternatives or run blocking code in executor:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_operation():
    """CPU-intensive or blocking I/O."""
    time.sleep(5)
    return "result"

async def run_blocking_safely():
    loop = asyncio.get_running_loop()
    # Run in a thread pool so the event loop stays responsive
    with ThreadPoolExecutor() as executor:
        result = await loop.run_in_executor(executor, blocking_operation)
    return result
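Since Python 3.9, asyncio.to_thread() packages the same executor pattern into a single call (the function name below is ours):

async def run_blocking_safely_modern():
    # Runs blocking_operation in the default thread pool (Python 3.9+)
    return await asyncio.to_thread(blocking_operation)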
2. Not Awaiting Coroutines
Problem: Forgetting await means the coroutine object is created but never runs, and Python emits a "coroutine was never awaited" warning.
async def fetch_data():
    return {"data": "value"}

async def bad_main():
    result = fetch_data()  # Warning: coroutine was never awaited
    print(result)          # Prints: <coroutine object fetch_data ...>

async def good_main():
    result = await fetch_data()  # Properly awaited
    print(result)                # Prints: {'data': 'value'}
3. Mixing Sync and Async Code Incorrectly
Problem: await is invalid in a regular function; calling async code from sync code requires an event loop.
def broken_sync_function():
    result = await async_function()  # SyntaxError: await outside async def
    return result

def sync_function():
    # Start an event loop to drive the coroutine to completion
    return asyncio.run(async_function())

async def async_wrapper():
    return await async_function()
4. Not Handling Task Cancellation
Problem: Tasks can be cancelled at any await point, so cleanup code placed after one might never run.
async def bad_resource_handling():
    resource = await acquire_resource()
    await long_operation()
    await resource.close()  # Never called if cancelled mid-operation!

async def good_resource_handling():
    resource = await acquire_resource()
    try:
        await long_operation()
    finally:
        await resource.close()  # Always called

async def best_resource_handling():
    async with acquire_resource() as resource:
        await long_operation()
    # Automatic cleanup
5. Creating Too Many Concurrent Tasks
Problem: Unlimited concurrency overwhelms system resources.
async def bad_concurrent_requests():
    urls = [f"http://api.com/item/{i}" for i in range(10000)]
    tasks = [fetch_url(url) for url in urls]
    return await asyncio.gather(*tasks)  # 10,000 connections at once

async def good_concurrent_requests():
    semaphore = asyncio.Semaphore(10)  # Max 10 concurrent
    urls = [f"http://api.com/item/{i}" for i in range(10000)]

    async def bounded_fetch(url):
        async with semaphore:
            return await fetch_url(url)

    tasks = [bounded_fetch(url) for url in urls]
    return await asyncio.gather(*tasks)
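If the requests go through aiohttp, the connector's built-in connection limit is an alternative to a hand-rolled semaphore. A sketch, assuming the session-based fetch_url from section 2:

import aiohttp
import asyncio
from typing import Dict, List

async def capped_requests(urls: List[str]) -> List[Dict]:
    # TCPConnector(limit=10) caps concurrent connections for the session
    connector = aiohttp.TCPConnector(limit=10)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)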
Related Patterns
- Related Tutorial: See Advanced Tutorial - Async Programming.
- Related How-To: See Work with Async Databases.
- Related Cookbook: See Cookbook recipe "Advanced Async Patterns".