You typically combine Kotlin collections, coroutines, and Flow by using:
- collections for in-memory data
- coroutines for concurrency / async work
- Flow for asynchronous streams of values
Basic idea
If you have a collection:
val ids = listOf(1, 2, 3, 4, 5)
You can turn it into a Flow:
val idFlow = ids.asFlow()
Then process each item asynchronously using Flow operators:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val ids = listOf(1, 2, 3, 4, 5)
    ids.asFlow()
        .map { id ->
            fetchUser(id)
        }
        .collect { user ->
            println(user)
        }
}

suspend fun fetchUser(id: Int): String {
    delay(500)
    return "User $id"
}
Here:
- asFlow() converts the collection into a Flow
- map { } applies a suspending transformation
- collect { } starts the flow and consumes results
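A consequence of the last point is that a Flow is cold: building the pipeline runs nothing until a terminal operator such as collect is called. The sketch below makes that visible. The events log and the buildPipeline helper are assumptions added for illustration only.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical log, used only to make the execution order visible.
val events = mutableListOf<String>()

// Builds the pipeline but does not run it: no terminal operator is called here.
fun buildPipeline(ids: List<Int>): Flow<String> =
    ids.asFlow().map { id ->
        events += "map $id"
        "User $id"
    }

fun main() = runBlocking {
    val pipeline = buildPipeline(listOf(1, 2))
    events += "pipeline built"
    // collect is the terminal operator; only now does the map lambda run.
    pipeline.collect { user -> events += "collected $user" }
    println(events)
    // prints [pipeline built, map 1, collected User 1, map 2, collected User 2]
}
```

The "pipeline built" entry comes first even though map appears earlier in the source, because map only describes the work.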
Sequential asynchronous processing
By default, Flow processes elements sequentially:
ids.asFlow()
    .map { id ->
        fetchUser(id)
    }
    .collect { user ->
        println(user)
    }
Even though fetchUser is suspending, each item is processed one after another.
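One way to confirm the sequential behaviour is to time the pipeline. This is a minimal sketch that reuses the fetchUser stub with its 500 ms delay; timeSequential is a hypothetical helper name, and the exact timing will vary by machine.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

// Same stub as in the examples above: simulates a slow call with delay.
suspend fun fetchUser(id: Int): String {
    delay(500)
    return "User $id"
}

// Hypothetical helper: runs the sequential pipeline and returns the elapsed time in ms.
suspend fun timeSequential(ids: List<Int>): Long = measureTimeMillis {
    ids.asFlow()
        .map { id -> fetchUser(id) }
        .collect { user -> println(user) }
}

fun main() = runBlocking {
    val elapsed = timeSequential(listOf(1, 2, 3, 4, 5))
    // Each fetch waits for the previous one, so expect roughly 5 x 500 ms.
    println("Took about $elapsed ms")
}
```

If the items were processed concurrently, the total would be close to a single delay rather than the sum of all of them.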
Concurrent processing with flatMapMerge
If you want to process multiple items concurrently, use flatMapMerge:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val ids = listOf(1, 2, 3, 4, 5)
    ids.asFlow()
        .flatMapMerge(concurrency = 3) { id ->
            flow {
                emit(fetchUser(id))
            }
        }
        .collect { user ->
            println(user)
        }
}

suspend fun fetchUser(id: Int): String {
    delay(500)
    return "User $id"
}
This allows up to 3 items to be processed at the same time.
Note:
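flatMapMerge may emit results out of the original order, because each result is emitted as soon as its inner flow produces it. It is also still marked as an experimental (preview) API in kotlinx.coroutines, so the compiler may report an opt-in warning.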
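The reordering is easiest to see when items take different amounts of time. In this sketch, fetchUserSlowly and fetchMerged are hypothetical names, and the delays are chosen so that later ids finish first. The opt-in annotation lists both marker classes because the marker on flatMapMerge has differed between kotlinx.coroutines versions.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stub: lower ids take longer, so later items finish first.
suspend fun fetchUserSlowly(id: Int): String {
    delay((4 - id) * 200L) // id 1 -> 600 ms, id 2 -> 400 ms, id 3 -> 200 ms
    return "User $id"
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun fetchMerged(ids: List<Int>): List<String> =
    ids.asFlow()
        .flatMapMerge(concurrency = 3) { id ->
            flow { emit(fetchUserSlowly(id)) }
        }
        .toList()

fun main() = runBlocking {
    // Results arrive in completion order, not in the order of the input list.
    println(fetchMerged(listOf(1, 2, 3)))
}
```

With these delays the list normally comes back as [User 3, User 2, User 1], the reverse of the input.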
Keeping order while doing concurrent work
If you need concurrency but want results in the original order, you can use async with a collection:
import kotlinx.coroutines.*

fun main() = runBlocking {
    val ids = listOf(1, 2, 3, 4, 5)
    val users = ids.map { id ->
        async {
            fetchUser(id)
        }
    }.awaitAll()
    println(users)
}

suspend fun fetchUser(id: Int): String {
    delay(500)
    return "User $id"
}
awaitAll() returns results in the same order as the original list.
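The async calls above work because runBlocking provides a CoroutineScope, and they start every call at once. Inside an ordinary suspend function, a scope has to be opened explicitly, for example with coroutineScope. The sketch below does that and also caps the number of simultaneous calls with a Semaphore. fetchUsersInOrder and maxConcurrent are hypothetical names, and the limit of 3 is an arbitrary assumption.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

suspend fun fetchUser(id: Int): String {
    delay(500)
    return "User $id"
}

// Hypothetical helper: fetches users concurrently, at most maxConcurrent at a time,
// and returns them in the same order as ids.
suspend fun fetchUsersInOrder(ids: List<Int>, maxConcurrent: Int = 3): List<String> =
    coroutineScope {
        val permits = Semaphore(maxConcurrent)
        ids.map { id ->
            async {
                // Suspends (without blocking a thread) while all permits are taken.
                permits.withPermit { fetchUser(id) }
            }
        }.awaitAll()
    }

fun main() = runBlocking {
    println(fetchUsersInOrder(listOf(1, 2, 3, 4, 5)))
    // prints [User 1, User 2, User 3, User 4, User 5]
}
```

coroutineScope also gives structured concurrency: if one fetch throws, the remaining async calls are cancelled and the exception is rethrown to the caller.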
Filtering and transforming Flow values
You can use familiar collection-like operators:
ids.asFlow()
    .filter { id ->
        id % 2 == 0
    }
    .map { id ->
        fetchUser(id)
    }
    .collect { user ->
        println(user)
    }
This is similar to collection processing, but it supports suspending operations.
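For instance, the filter predicate itself can call a suspending function, which a plain List.filter cannot do outside a coroutine. In this sketch, isActiveUser and activeUsers are hypothetical names; the predicate stands in for something like a remote lookup.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical suspending check, standing in for e.g. a lookup in a remote service.
suspend fun isActiveUser(id: Int): Boolean {
    delay(100)
    return id % 2 == 0
}

suspend fun fetchUser(id: Int): String {
    delay(500)
    return "User $id"
}

suspend fun activeUsers(ids: List<Int>): List<String> =
    ids.asFlow()
        .filter { id -> isActiveUser(id) } // the predicate itself may suspend
        .map { id -> fetchUser(id) }
        .toList()

fun main() = runBlocking {
    println(activeUsers(listOf(1, 2, 3, 4, 5)))
    // prints [User 2, User 4]
}
```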
Collecting a Flow back into a collection
If you need a List again:
val users: List<String> = ids.asFlow()
    .map { id -> fetchUser(id) }
    .toList()
Because toList() collects the flow, it must be called from a coroutine or suspend function.
Using flowOn for background work
flowOn changes the dispatcher for the operators above it (upstream); collect still runs in the caller's context:
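import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val ids = listOf(1, 2, 3, 4, 5)
    ids.asFlow()
        .map { id ->
            fetchUser(id)
        }
        .flowOn(Dispatchers.IO) // applies to asFlow() and map above, not to collect
        .collect { user ->
            println(user)
        }
}

suspend fun fetchUser(id: Int): String {
    delay(500)
    return "User $id"
}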
This is useful when the upstream work blocks a thread, such as blocking network or database calls.
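To check which part of the pipeline flowOn actually moves, one can record thread names on both sides of it. This is a rough sketch; observeThreads is a hypothetical helper, and the thread names depend on the runtime and on whether coroutine debug mode is enabled.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: records which threads ran the upstream map and which ran collect.
suspend fun observeThreads(ids: List<Int>): Pair<Set<String>, Set<String>> {
    val upstream = mutableSetOf<String>()
    val downstream = mutableSetOf<String>()
    ids.asFlow()
        .map { id ->
            upstream += Thread.currentThread().name // runs on Dispatchers.IO
            "User $id"
        }
        .flowOn(Dispatchers.IO)
        .collect {
            downstream += Thread.currentThread().name // runs in the collector's context
        }
    return upstream to downstream
}

fun main() = runBlocking {
    val (upstream, downstream) = observeThreads(listOf(1, 2, 3))
    println("map ran on: $upstream")
    println("collect ran on: $downstream")
}
```

The two sets should not overlap: the map lambda runs on a worker thread of the IO dispatcher, while collect stays on the thread that runBlocking is using.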
Handling errors
Use catch to handle exceptions from upstream operators:
ids.asFlow()
    .map { id ->
        fetchUser(id)
    }
    .catch { error ->
        emit("Fallback user because of: ${error.message}")
    }
    .collect { user ->
        println(user)
    }
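Note that catch does not resume the failed flow: after the fallback is emitted, the flow completes and the remaining items are never processed. The sketch below shows this with a stub that fails for one id; fetchUserOrFail and usersWithFallback are hypothetical names.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stub that fails for one id to trigger the catch operator.
suspend fun fetchUserOrFail(id: Int): String {
    delay(100)
    if (id == 2) throw IllegalStateException("user $id not found")
    return "User $id"
}

suspend fun usersWithFallback(ids: List<Int>): List<String> =
    ids.asFlow()
        .map { id -> fetchUserOrFail(id) }
        .catch { error -> emit("Fallback user because of: ${error.message}") }
        .toList()

fun main() = runBlocking {
    // Id 3 is never fetched: the upstream flow ends at the first exception.
    println(usersWithFallback(listOf(1, 2, 3)))
    // prints [User 1, Fallback user because of: user 2 not found]
}
```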
Example: process URLs asynchronously
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val urls = listOf(
        "https://example.com/a",
        "https://example.com/b",
        "https://example.com/c"
    )
    val results = urls.asFlow()
        .flatMapMerge(concurrency = 2) { url ->
            flow {
                val content = download(url)
                emit(url to content.length)
            }
        }
        .toList()
    println(results)
}

suspend fun download(url: String): String {
    delay(1_000)
    return "Content from $url"
}
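In the example above, a single failing download would fail the whole pipeline. One possible variation, sketched here under assumptions, applies catch to each inner flow so that a failure affects only its own URL. The failing download stub, the downloadLengths helper, and the choice of null as the failure marker are all hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stub: one URL fails so the per-item handling can be observed.
suspend fun download(url: String): String {
    delay(200)
    if (url.endsWith("/b")) throw IllegalStateException("download failed")
    return "Content from $url"
}

// Returns url -> content length, or url -> null when that download failed.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun downloadLengths(urls: List<String>): Map<String, Int?> =
    urls.asFlow()
        .flatMapMerge(concurrency = 2) { url ->
            flow<Pair<String, Int?>> { emit(url to download(url).length) }
                // catch sits on the inner flow, so a failure affects only this url.
                .catch { emit(url to null) }
        }
        .toList()
        .toMap()

fun main() = runBlocking {
    val urls = listOf("https://example.com/a", "https://example.com/b", "https://example.com/c")
    println(downloadLengths(urls))
}
```

Collecting into a Map keyed by URL also sidesteps the ordering question, since flatMapMerge does not guarantee the order of results.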
When to use what
| Need | Use |
|---|---|
| Simple in-memory transformation | Collection operators: map, filter |
| Suspending work per item, sequential | asFlow().map { suspendCall() } |
| Suspending work per item, concurrent | flatMapMerge |
| Concurrent work while preserving order | map { async { ... } }.awaitAll() |
| Continuous stream of values | Flow |
| Convert Flow back to List | toList() |
In short:
val results = items.asFlow()
    .filter { shouldProcess(it) }
    .flatMapMerge(concurrency = 4) { item ->
        flow {
            emit(processAsync(item))
        }
    }
    .toList()
That pattern is a good starting point for asynchronous collection processing with Kotlin coroutines and flows.
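For completeness, here is one way the pattern could look as a runnable program. The bodies of shouldProcess and processAsync are placeholders invented for this sketch (skip blank strings, uppercase the rest), and processAll is a hypothetical wrapper name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical placeholders for the two functions named in the pattern.
fun shouldProcess(item: String): Boolean = item.isNotBlank()

suspend fun processAsync(item: String): String {
    delay(200)
    return item.uppercase()
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun processAll(items: List<String>): List<String> =
    items.asFlow()
        .filter { shouldProcess(it) }
        .flatMapMerge(concurrency = 4) { item ->
            flow { emit(processAsync(item)) }
        }
        .toList()

fun main() = runBlocking {
    val results = processAll(listOf("a", "", "b", "c"))
    // Order is not guaranteed with flatMapMerge, so sort before printing.
    println(results.sorted())
    // prints [A, B, C]
}
```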
