Coroutines Advanced
Problem
Modern applications require sophisticated asynchronous patterns beyond basic coroutines. Complex scenarios need reactive streams, backpressure handling, structured concurrency, and proper cancellation propagation.
Solution
1. StateFlow and SharedFlow for Reactive State
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*
class UserViewModel {
// StateFlow - Always has value, replays latest to new collectors
private val _userState = MutableStateFlow<UserState>(UserState.NotLoggedIn)
val userState: StateFlow<UserState> = _userState.asStateFlow()
// SharedFlow - Can have no initial value, configurable replay
private val _events = MutableSharedFlow<UserEvent>(
replay = 1, // Replay last event to new collectors
extraBufferCapacity = 10 // Buffer up to 10 events
)
val events: SharedFlow<UserEvent> = _events.asSharedFlow()
sealed class UserState {
object NotLoggedIn : UserState()
data class LoggedIn(val username: String) : UserState()
data class Loading(val message: String) : UserState()
}
sealed class UserEvent {
data class ShowMessage(val message: String) : UserEvent()
object LogoutSuccess : UserEvent()
}
suspend fun login(username: String, password: String) {
_userState.value = UserState.Loading("Logging in...")
delay(1000) // Simulate API call
if (password == "correct") {
_userState.value = UserState.LoggedIn(username)
_events.emit(UserEvent.ShowMessage("Welcome, $username!"))
} else {
_userState.value = UserState.NotLoggedIn
_events.emit(UserEvent.ShowMessage("Invalid credentials"))
}
}
suspend fun logout() {
_userState.value = UserState.NotLoggedIn
_events.emit(UserEvent.LogoutSuccess)
}
}
// Usage in UI (Android)
class UserActivity : AppCompatActivity() {
private val viewModel = UserViewModel()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
// Collect StateFlow (automatically cancels on lifecycle destroy)
lifecycleScope.launch {
viewModel.userState.collect { state ->
when (state) {
is UserState.NotLoggedIn -> showLoginForm()
is UserState.LoggedIn -> showUserProfile(state.username)
is UserState.Loading -> showLoadingIndicator(state.message)
}
}
}
// Collect events (one-time actions)
lifecycleScope.launch {
viewModel.events.collect { event ->
when (event) {
is UserEvent.ShowMessage -> showToast(event.message)
is UserEvent.LogoutSuccess -> navigateToLogin()
}
}
}
}
}2. Flow Operators and Transformation
import kotlinx.coroutines.flow.*
class DataRepository {
// Cold flow - starts emitting when collected
fun fetchUserUpdates(userId: String): Flow<User> = flow {
while (true) {
val user = api.getUser(userId) // API call
emit(user)
delay(5000) // Poll every 5 seconds
}
}
// Transform flows with operators
fun getActiveUsers(): Flow<List<User>> = flow {
emit(api.getAllUsers())
}
.map { users -> users.filter { it.isActive } } // Filter
.map { users -> users.sortedBy { it.lastSeen } } // Sort
.catch { e -> emit(emptyList()) } // Error handling
.onEach { users -> println("Emitting ${users.size} active users") } // Side effects
// Combine multiple flows
fun getUserProfile(userId: String): Flow<UserProfile> {
val userFlow = fetchUser(userId)
val postsFlow = fetchUserPosts(userId)
val friendsFlow = fetchUserFriends(userId)
return combine(userFlow, postsFlow, friendsFlow) { user, posts, friends ->
UserProfile(
user = user,
posts = posts,
friends = friends
)
}
}
// FlatMap variations
fun searchUsers(query: String): Flow<List<User>> {
return flowOf(query)
.debounce(300) // Wait 300ms after last emission
.filter { it.length >= 3 } // Only search if 3+ chars
.distinctUntilChanged() // Ignore duplicate queries
.flatMapLatest { searchQuery -> // Cancel previous search
flow {
val results = api.search(searchQuery)
emit(results)
}
}
}
// Retry logic
fun fetchWithRetry(userId: String): Flow<User> = flow {
emit(api.getUser(userId))
}
.retry(3) { cause ->
cause is IOException // Retry only on network errors
}
.retryWhen { cause, attempt ->
if (attempt < 3 && cause is IOException) {
delay(1000 * (attempt + 1)) // Exponential backoff
true
} else {
false
}
}
}3. Channels for Producer-Consumer Patterns
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.*
class TaskProcessor {
// Channel - Hot stream for communication between coroutines
private val taskChannel = Channel<Task>(capacity = Channel.BUFFERED)
data class Task(val id: Int, val data: String)
// Producer
suspend fun submitTask(task: Task) {
taskChannel.send(task)
println("Task ${task.id} submitted")
}
// Consumer
fun startProcessing(scope: CoroutineScope) {
scope.launch {
for (task in taskChannel) { // Iterate until channel is closed
processTask(task)
}
println("Processing completed")
}
}
private suspend fun processTask(task: Task) {
delay(1000) // Simulate work
println("Processed task ${task.id}: ${task.data}")
}
fun shutdown() {
taskChannel.close()
}
}
// Usage
suspend fun main() = coroutineScope {
val processor = TaskProcessor()
// Start consumer
processor.startProcessing(this)
// Submit tasks from multiple producers
launch {
repeat(5) { i ->
processor.submitTask(Task(i, "Data $i"))
delay(500)
}
}
launch {
repeat(5) { i ->
processor.submitTask(Task(i + 100, "Data ${i + 100}"))
delay(700)
}
}
delay(10000)
processor.shutdown()
}
// Broadcast channel for multiple consumers
class EventBroadcaster {
private val _events = MutableSharedFlow<Event>(
replay = 0,
extraBufferCapacity = 64,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
val events: SharedFlow<Event> = _events.asSharedFlow()
sealed class Event {
data class UserLoggedIn(val userId: String) : Event()
data class MessageReceived(val message: String) : Event()
}
suspend fun broadcast(event: Event) {
_events.emit(event)
}
// Multiple consumers can collect independently
fun subscribeToEvents(scope: CoroutineScope, handler: suspend (Event) -> Unit) {
scope.launch {
events.collect { event ->
handler(event)
}
}
}
}4. Structured Concurrency and supervisorScope
import kotlinx.coroutines.*
class DataSyncService {
// Regular coroutineScope - child failure cancels all siblings
suspend fun syncAllData(): Result<Unit> = coroutineScope {
try {
val userData = async { fetchUserData() } // If this fails...
val settingsData = async { fetchSettings() } // ...this cancels too
val preferencesData = async { fetchPreferences() }
// All must succeed
Result.success(Unit)
} catch (e: Exception) {
Result.failure(e)
}
}
// supervisorScope - child failure doesn't affect siblings
suspend fun syncDataIndependently(): SyncResult = supervisorScope {
val userDeferred = async { fetchUserData() }
val settingsDeferred = async { fetchSettings() }
val preferencesDeferred = async { fetchPreferences() }
// Each can fail independently
SyncResult(
user = runCatching { userDeferred.await() },
settings = runCatching { settingsDeferred.await() },
preferences = runCatching { preferencesDeferred.await() }
)
}
data class SyncResult(
val user: Result<UserData>,
val settings: Result<Settings>,
val preferences: Result<Preferences>
)
// Structured concurrency with timeout
suspend fun fetchWithTimeout(userId: String): User? {
return try {
withTimeout(5000) { // 5-second timeout
fetchUser(userId)
}
} catch (e: TimeoutCancellationException) {
println("Request timed out")
null
}
}
// Parallel decomposition with structured concurrency
suspend fun processLargeDataset(items: List<Item>): List<Result> = coroutineScope {
items.chunked(100).map { chunk ->
async {
processChunk(chunk)
}
}.awaitAll()
}
}5. Cancellation and Exception Handling
import kotlinx.coroutines.*
class CancellationHandler {
// Cooperative cancellation
suspend fun longRunningTask() {
try {
repeat(1000) { i ->
// Check cancellation periodically
ensureActive() // Throws CancellationException if cancelled
// Or use isActive
if (!isActive) {
println("Task cancelled at step $i")
return
}
performWork(i)
delay(100) // delay() also checks cancellation
}
} catch (e: CancellationException) {
// Cleanup on cancellation
println("Cleaning up resources")
throw e // Must re-throw CancellationException
}
}
// NonCancellable context for cleanup
suspend fun taskWithCleanup() {
try {
performTask()
} finally {
// Ensure cleanup even if cancelled
withContext(NonCancellable) {
cleanup() // This will complete even if parent is cancelled
}
}
}
// CoroutineExceptionHandler for uncaught exceptions
private val exceptionHandler = CoroutineExceptionHandler { _, exception ->
println("Caught exception: ${exception.message}")
}
fun startWithExceptionHandling(scope: CoroutineScope) {
scope.launch(exceptionHandler) {
// Exceptions in this coroutine will be caught by handler
throw RuntimeException("Something went wrong")
}
}
// Supervisor job for independent child failures
class BackgroundService {
private val supervisorJob = SupervisorJob()
private val scope = CoroutineScope(Dispatchers.Default + supervisorJob)
fun startTasks() {
// Task 1 failure won't cancel Task 2
scope.launch {
try {
failingTask()
} catch (e: Exception) {
println("Task 1 failed: ${e.message}")
}
}
scope.launch {
successfulTask()
}
}
fun shutdown() {
supervisorJob.cancel()
}
}
}6. Testing Coroutines
import kotlinx.coroutines.test.*
import kotlin.test.*
class CoroutineTest {
// Use TestDispatcher for controlled time
@Test
fun testWithVirtualTime() = runTest {
val viewModel = UserViewModel()
// Advance virtual time
advanceTimeBy(1000)
// Trigger action
viewModel.login("user", "password")
// Advance past delay in login
advanceUntilIdle()
// Assert state
assertEquals(
UserViewModel.UserState.LoggedIn("user"),
viewModel.userState.value
)
}
// Test flow collection
@Test
fun testFlowEmissions() = runTest {
val flow = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
}
val results = mutableListOf<Int>()
val job = launch {
flow.collect { results.add(it) }
}
advanceTimeBy(1500)
assertEquals(listOf(1, 2), results)
advanceUntilIdle()
assertEquals(listOf(1, 2, 3), results)
job.cancel()
}
// Test cancellation
@Test
fun testCancellation() = runTest {
var cleanupCalled = false
val job = launch {
try {
delay(Long.MAX_VALUE)
} finally {
cleanupCalled = true
}
}
job.cancel()
job.join()
assertTrue(cleanupCalled)
}
}How It Works
%% Color Palette: Blue #0173B2, Orange #DE8F05, Teal #029E73, Purple #CC78BC
graph TD
A[Launch Coroutine] --> B{Structured Scope}
B -->|coroutineScope| C[Child Failure Cancels All]
B -->|supervisorScope| D[Independent Children]
C --> E{Check Cancellation}
D --> E
E -->|isActive| F[Continue Execution]
E -->|Cancelled| G[Throw CancellationException]
F --> H[Complete or Fail]
G --> I[Cleanup in Finally]
style A fill:#0173B2
style B fill:#DE8F05
style E fill:#029E73
style I fill:#CC78BC
Coroutine Lifecycle:
- Launch: Coroutine starts in structured scope
- Execution: Code runs cooperatively, checking cancellation
- Suspension: Suspend functions yield to other coroutines
- Cancellation: Parent cancellation propagates to children
- Completion: Result or exception propagates to parent
Variations
Custom Flow Operators
fun <T> Flow<T>.throttleFirst(periodMillis: Long): Flow<T> = flow {
var lastEmissionTime = 0L
collect { value ->
val currentTime = System.currentTimeMillis()
if (currentTime - lastEmissionTime >= periodMillis) {
lastEmissionTime = currentTime
emit(value)
}
}
}
// Usage
searchQueryFlow
.throttleFirst(1000) // Emit at most once per second
.collect { query -> performSearch(query) }Conflated Flow for Latest Value
class LocationTracker {
private val _location = MutableSharedFlow<Location>(
replay = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
// Conflate emissions - only latest matters
val location: Flow<Location> = _location.conflate()
suspend fun updateLocation(loc: Location) {
_location.emit(loc)
}
}Common Pitfalls
1. Not Handling Cancellation
Problem: Ignoring cancellation leads to resource leaks.
// ❌ Bad: Blocking operation doesn't check cancellation
suspend fun processData() {
while (true) { // Infinite loop!
processNextItem()
Thread.sleep(1000) // Doesn't check cancellation!
}
}
// ✅ Good: Check cancellation
suspend fun processData() {
while (isActive) { // Check cancellation
processNextItem()
delay(1000) // Cancellable delay
}
}2. Collecting Flows in ViewModel
Problem: Cold flows restart on each collection.
// ❌ Bad: Flow restarts on config change
fun observeData() {
viewModelScope.launch {
repository.getData().collect { // New flow on each call!
_state.value = it
}
}
}
// ✅ Good: Use StateFlow to cache latest value
val data: StateFlow<Data> = repository.getData()
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = Data.Empty
)3. Using GlobalScope
Problem: Leaks coroutines, no structured concurrency.
// ❌ Bad: GlobalScope has no parent, runs forever
fun fetchData() {
GlobalScope.launch {
api.getData() // Never cancelled!
}
}
// ✅ Good: Use appropriate scope
class MyService(private val scope: CoroutineScope) {
fun fetchData() {
scope.launch {
api.getData() // Cancelled when scope is cancelled
}
}
}4. Not Using SupervisorJob for Independent Tasks
Problem: One task failure cancels all siblings.
// ❌ Bad: One failure cancels all
coroutineScope {
launch { task1() } // If this fails...
launch { task2() } // ...this gets cancelled
}
// ✅ Good: Use supervisorScope
supervisorScope {
launch { task1() } // Failure doesn't affect task2
launch { task2() }
}5. Blocking Main Thread with runBlocking
Problem: Freezes UI.
// ❌ Bad: Blocks main thread
fun onClick() {
runBlocking { // UI frozen!
delay(1000)
}
}
// ✅ Good: Launch in appropriate scope
fun onClick() {
lifecycleScope.launch {
delay(1000)
}
}Related Patterns
Related Tutorial: See Advanced Tutorial - Coroutines. Related How-To: See Handle Coroutines and Async, Flow State Management. Related Cookbook: See Cookbook recipe “Coroutine Patterns”.
Last updated