Kotlin Help

Flow operators

Flows can be transformed using operators, in the same way as you would transform collections and sequences. Intermediate operators are applied to an upstream flow and return a downstream flow. These operators are cold. A call to such an operator is not a suspending function itself. It works quickly, returning the definition of a new transformed flow.

The basic operators have familiar names like map and filter. An important difference from their sequence counterparts is that the blocks of code inside these operators can call suspending functions.

For example, a flow of incoming requests can be mapped to its results with a map operator, even when performing a request is a long-running operation that is implemented by a suspending function:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

It produces the following three lines, each appearing one second after the previous:

response 1
response 2
response 3
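The same holds for filter: its predicate may suspend as well. The following is a minimal sketch, assuming a hypothetical suspending check named isAllowed that is not part of the original example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical suspending check, introduced only for illustration.
suspend fun isAllowed(request: Int): Boolean {
    delay(10) // imitate a short asynchronous lookup
    return request % 2 == 1
}

// filter calls the suspending predicate; map then builds the response text.
fun allowedResponses(): Flow<String> = (1..4).asFlow()
    .filter { request -> isAllowed(request) }
    .map { request -> "response $request" }

fun main() = runBlocking<Unit> {
    allowedResponses().collect { println(it) } // prints "response 1" and "response 3"
}
```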

Transform operator

Among the flow transformation operators, the most general one is called transform. It can be used to imitate simple transformations like map and filter, as well as implement more complex transformations. Using the transform operator, we can emit arbitrary values an arbitrary number of times.

For example, using transform we can emit a string before performing a long-running asynchronous request and follow it with a response:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }
}

The output of this code is:

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
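To illustrate the claim that transform can imitate map and filter, here is a small sketch. The helper names mapVia and filterVia are hypothetical and exist only for this illustration; they are not library functions:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: map expressed with transform (emit exactly one value per input).
fun <T, R> Flow<T>.mapVia(block: suspend (T) -> R): Flow<R> =
    transform<T, R> { value -> emit(block(value)) }

// Hypothetical helper: filter expressed with transform (emit zero or one value per input).
fun <T> Flow<T>.filterVia(predicate: suspend (T) -> Boolean): Flow<T> =
    transform<T, T> { value -> if (predicate(value)) emit(value) }

fun main() = runBlocking<Unit> {
    (1..5).asFlow()
        .filterVia { it % 2 == 1 } // keep odd numbers
        .mapVia { it * 10 }        // multiply each by ten
        .collect { println(it) }   // prints 10, 30, 50
}
```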

Size-limiting operators

Size-limiting intermediate operators like take cancel the execution of the flow when the corresponding limit is reached. Cancellation in coroutines is always performed by throwing an exception, so that all the resource-management functions (like try { ... } finally { ... } blocks) operate normally in case of cancellation:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers()
        .take(2) // take only the first two
        .collect { value -> println(value) }
}

The output of this code clearly shows that the execution of the flow { ... } body in the numbers() function stopped after emitting the second number:

1
2
Finally in numbers
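Because take cancels the upstream flow, it can also bound a flow that would otherwise never complete. The sketch below assumes a hypothetical firstThree function and a caller-supplied log list, both introduced only for illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical example: an unbounded flow that is cut off by take(3).
suspend fun firstThree(log: MutableList<String>): List<Int> =
    flow<Int> {
        try {
            var n = 1
            while (true) emit(n++) // never finishes on its own
        } finally {
            log += "upstream cleaned up" // runs when take cancels the flow
        }
    }.take(3).toList()

fun main() = runBlocking<Unit> {
    val log = mutableListOf<String>()
    println(firstThree(log)) // [1, 2, 3]
    println(log)             // [upstream cleaned up]
}
```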

Terminal flow operators

Terminal operators on flows are suspending functions that start a collection of the flow. The collect operator is the most basic one, but there are other terminal operators that can make collecting a flow more convenient:

  • Conversion to various collections like toList and toSet.

  • Operators to get the first value and to ensure that a flow emits a single value.

  • Reducing a flow to a value with reduce and fold.

For example:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum)
}

Prints a single number:

55
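The other terminal operators listed above can be sketched in the same way. The squares function below is a hypothetical helper that wraps the flow from the previous example; because the flow is cold, each terminal operator collects it from the start:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: squares of the numbers from 1 to 5.
fun squares(): Flow<Int> = (1..5).asFlow().map { it * it }

fun main() = runBlocking<Unit> {
    println(squares().toList())                              // [1, 4, 9, 16, 25]
    println(squares().first())                               // 1
    println(squares().fold(0) { acc, value -> acc + value }) // 55
    println(flowOf(42).single())                             // 42 (single requires exactly one value)
}
```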

Buffering

Running different parts of a flow in different coroutines can be helpful from the standpoint of the overall time it takes to collect the flow, especially when long-running asynchronous operations are involved. For example, consider a case when the emission by a simple flow is slow, taking 100 ms to produce an element, and the collector is also slow, taking 300 ms to process an element. Let's see how long it takes to collect such a flow with three numbers:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple().collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            println(value)
        }
    }
    println("Collected in $time ms")
}

It produces something like this, with the whole collection taking around 1200 ms (three numbers, 400 ms for each):

1
2
3
Collected in 1220 ms

We can use a buffer operator on a flow to run emitting code of the simple flow concurrently with collecting code, as opposed to running them sequentially:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple()
            .buffer() // buffer emissions, don't wait
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
}

It produces the same numbers just faster, as we have effectively created a processing pipeline, having to only wait 100 ms for the first number and then spending only 300 ms to process each number. This way it takes around 1000 ms to run:

1
2
3
Collected in 1071 ms

Conflation

When a flow represents partial results of an operation or operation status updates, it may not be necessary to process each value, but only the most recent ones. In this case, the conflate operator can be used to skip intermediate values when a collector is too slow to process them. Building on the previous example:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple()
            .conflate() // conflate emissions, don't process each one
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
}

We see that while the first number was still being processed, the second and third were already produced, so the second one was conflated and only the most recent (the third one) was delivered to the collector:

1
3
Collected in 758 ms

Processing the latest value

Conflation is one way to speed up processing when both the emitter and collector are slow. It does so by dropping emitted values. The other way is to cancel a slow collector and restart it every time a new value is emitted. There is a family of xxxLatest operators that perform the same essential logic as the corresponding xxx operator, but cancel the code in their block on a new value. Let's try changing conflate to collectLatest in the previous example:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple()
            .collectLatest { value -> // cancel & restart on the latest value
                println("Collecting $value")
                delay(300) // pretend we are processing it for 300 ms
                println("Done $value")
            }
    }
    println("Collected in $time ms")
}

Since the body of collectLatest takes 300 ms, but new values are emitted every 100 ms, we see that the block is run on every value, but completes only for the last value:

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
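Another member of the xxxLatest family is mapLatest. The sketch below is an illustration under assumptions rather than part of the original example: slowlyLabelled is a hypothetical function, the upstream emits its three values without any delay, and the opt-in annotation is there because mapLatest is marked experimental in the library versions we are aware of:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical example: each new value cancels the still-suspended block of the previous one.
@OptIn(ExperimentalCoroutinesApi::class)
fun slowlyLabelled(): Flow<String> = (1..3).asFlow()
    .mapLatest { value ->
        delay(100) // pretend the mapping is slow; values 1 and 2 are cancelled here
        "Done $value"
    }

fun main() = runBlocking<Unit> {
    println(slowlyLabelled().toList()) // [Done 3]
}
```

Only the block for the last value runs to completion, which mirrors what collectLatest does for the collector.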

Composing multiple flows

There are lots of ways to compose multiple flows.

Zip

Just like the Sequence.zip extension function in the Kotlin standard library, flows have a zip operator that combines the corresponding values of two flows:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print
}

This example prints:

1 -> one
2 -> two
3 -> three
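Since zip pairs corresponding values, the resulting flow completes as soon as either input completes. A minimal sketch with inputs of unequal length (the zipped function is a hypothetical helper for illustration):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical example: five numbers zipped with only two strings.
fun zipped(): Flow<String> =
    (1..5).asFlow().zip(flowOf("one", "two")) { a, b -> "$a -> $b" }

fun main() = runBlocking<Unit> {
    zipped().collect { println(it) } // prints "1 -> one" and "2 -> two"
}
```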

Combine

When a flow represents the most recent value of a variable or operation (see also the related section on conflation), it may be necessary to perform a computation that depends on the most recent values of the corresponding flows and to recompute it whenever any of the upstream flows emits a value. The corresponding family of operators is called combine.

For example, if the numbers in the previous example update every 300 ms, but the strings update every 400 ms, then zipping them using the zip operator will still produce the same result, although each line is printed only every 400 ms:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

However, when using a combine operator here instead of a zip:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

We get quite a different output, where a line is printed at each emission from either nums or strs flows:

1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start

Flattening flows

Flows represent asynchronously received sequences of values, and so it is quite easy to get into a situation where each value triggers a request for another sequence of values. For example, we can have the following function that returns a flow of two strings 500 ms apart:

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

Now if we have a flow of three integers and call requestFlow on each of them like this:

(1..3).asFlow().map { requestFlow(it) }

Then we will end up with a flow of flows (Flow<Flow<String>>) that needs to be flattened into a single flow for further processing. Collections and sequences have flatten and flatMap operators for this. However, due to the asynchronous nature of flows they call for different modes of flattening, and hence, a family of flattening operators on flows exists.

flatMapConcat

Concatenation of flows of flows is provided by the flatMapConcat and flattenConcat operators. They are the most direct analogues of the corresponding sequence operators. They wait for the inner flow to complete before starting to collect the next one, as the following example shows:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

The sequential nature of flatMapConcat is clearly seen in the output:

1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
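The relationship between the two concatenating operators can be sketched as follows: mapping each value to a flow and then calling flattenConcat yields the same sequence as flatMapConcat. The function names viaFlatMapConcat and viaFlattenConcat are hypothetical, the inner delay is shortened to 50 ms to keep the sketch quick, and the opt-in annotation assumes these operators are marked experimental in the library version in use:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(50) // shorter than the 500 ms used above
    emit("$i: Second")
}

// Hypothetical helper: flattening in one step.
@OptIn(ExperimentalCoroutinesApi::class)
fun viaFlatMapConcat(): Flow<String> =
    (1..3).asFlow().flatMapConcat { requestFlow(it) }

// Hypothetical helper: building a Flow<Flow<String>> first, then flattening it.
@OptIn(ExperimentalCoroutinesApi::class)
fun viaFlattenConcat(): Flow<String> =
    (1..3).asFlow().map { requestFlow(it) }.flattenConcat()

fun main() = runBlocking<Unit> {
    println(viaFlatMapConcat().toList())
    println(viaFlattenConcat().toList()) // same elements in the same order
}
```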

flatMapMerge

Another flattening operation is to concurrently collect all the incoming flows and merge their values into a single flow so that values are emitted as soon as possible. It is implemented by flatMapMerge and flattenMerge operators. They both accept an optional concurrency parameter that limits the number of concurrent flows that are collected at the same time (it is equal to DEFAULT_CONCURRENCY by default).

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

The concurrent nature of flatMapMerge is obvious:

1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
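The effect of the concurrency parameter can be sketched by setting it to 1, in which case only one inner flow is collected at a time and the result is ordered like the output of flatMapConcat. The names mergedOneAtATime and mergedConcurrently are hypothetical, and the inner delay is shortened to 50 ms for the sake of the sketch:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(50) // shorter than the 500 ms used above
    emit("$i: Second")
}

// Hypothetical helper: at most one inner flow is collected at a time.
@OptIn(ExperimentalCoroutinesApi::class)
fun mergedOneAtATime(): Flow<String> =
    (1..3).asFlow().flatMapMerge(concurrency = 1) { requestFlow(it) }

// Hypothetical helper: default concurrency, so inner flows overlap.
@OptIn(ExperimentalCoroutinesApi::class)
fun mergedConcurrently(): Flow<String> =
    (1..3).asFlow().flatMapMerge { requestFlow(it) }

fun main() = runBlocking<Unit> {
    println(mergedOneAtATime().toList())   // ordered: each inner flow finishes before the next starts
    println(mergedConcurrently().toList()) // same elements, interleaved according to timing
}
```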

flatMapLatest

In a similar way to the collectLatest operator, which was described in the section "Processing the latest value", there is a corresponding "Latest" flattening mode where the collection of the previous flow is cancelled as soon as a new flow is emitted. It is implemented by the flatMapLatest operator.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapLatest { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

The output of this example is a good demonstration of how flatMapLatest works:

1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start

Flow completion

When flow collection completes (normally or exceptionally), it may need to execute an action. As you may have already noticed, it can be done in two ways: imperative or declarative.

Imperative finally block

In addition to try/catch, a collector can also use a finally block to execute an action upon collect completion.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}

This code prints three numbers produced by the simple flow followed by a "Done" string:

1
2
3
Done

Declarative handling

For the declarative approach, flow has an onCompletion intermediate operator that is invoked when the flow has been completely collected.

The previous example can be rewritten using an onCompletion operator and produces the same output:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
}

The key advantage of onCompletion is a nullable Throwable parameter of the lambda that can be used to determine whether the flow collection was completed normally or exceptionally. In the following example the simple flow throws an exception after emitting the number 1:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}

As you may expect, it prints:

1
Flow completed exceptionally
Caught exception

The onCompletion operator, unlike catch, does not handle the exception. As we can see from the above example code, the exception still flows downstream. It will be delivered to further onCompletion operators and can be handled with a catch operator.
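The order of the two operators matters. As a sketch under assumptions (the failing and collectEvents functions and the events list are hypothetical and exist only for illustration), placing catch before onCompletion means the exception is already handled when onCompletion runs, so it observes a null cause:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical flow that fails after its first value.
fun failing(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException("boom")
}

// Records what each stage observes, in order.
suspend fun collectEvents(): List<String> {
    val events = mutableListOf<String>()
    failing()
        .catch { e -> events += "caught ${e.message}" }               // handles the upstream failure
        .onCompletion { cause -> events += "completed with $cause" }  // sees null: nothing left to report
        .collect { value -> events += "value $value" }
    return events
}

fun main() = runBlocking<Unit> {
    collectEvents().forEach { println(it) }
    // value 1
    // caught boom
    // completed with null
}
```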

Successful completion

Another difference from the catch operator is that onCompletion sees all exceptions and receives a null exception only on successful completion of the upstream flow (without cancellation or failure).

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}

We can see that the completion cause is not null, because the flow was aborted due to a downstream exception:

1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
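To contrast the two operators side by side, the following sketch puts both in front of a failing collector. The observe function and the events list are hypothetical, and the try/catch around collect is added only so that the sketch terminates normally:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Records which operator gets to see the downstream exception.
suspend fun observe(): List<String> {
    val events = mutableListOf<String>()
    try {
        (1..3).asFlow()
            .catch { e -> events += "catch saw ${e.message}" } // not invoked: the failure is downstream
            .onCompletion { cause -> events += "onCompletion saw ${cause?.message}" }
            .collect { value ->
                check(value <= 1) { "Collected $value" } // fails on the second value
                events += "value $value"
            }
    } catch (e: IllegalStateException) {
        events += "caller caught ${e.message}"
    }
    return events
}

fun main() = runBlocking<Unit> {
    observe().forEach { println(it) }
    // value 1
    // onCompletion saw Collected 2
    // caller caught Collected 2
}
```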

Launching flow

It is easy to use flows to represent asynchronous events that are coming from some source. In this case, we need an analogue of the addEventListener function that registers a piece of code with a reaction for incoming events and continues further work. The onEach operator can serve this role. However, onEach is an intermediate operator. We also need a terminal operator to collect the flow. Otherwise, just calling onEach has no effect.

If we use the collect terminal operator after onEach, then the code after it will wait until the flow is collected:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}

As you can see, it prints:

Event: 1
Event: 2
Event: 3
Done

The launchIn terminal operator comes in handy here. By replacing collect with launchIn we can launch a collection of the flow in a separate coroutine, so that execution of further code immediately continues:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}

It prints:

Done
Event: 1
Event: 2
Event: 3

The required parameter to launchIn must specify a CoroutineScope in which the coroutine to collect the flow is launched. In the above example this scope comes from the runBlocking coroutine builder, so while the flow is running, this runBlocking scope waits for completion of its child coroutine and keeps the main function from returning and terminating this example.

In actual applications a scope will come from an entity with a limited lifetime. As soon as the lifetime of this entity is terminated the corresponding scope is cancelled, cancelling the collection of the corresponding flow. This way the pair of onEach { ... }.launchIn(scope) works like the addEventListener. However, there is no need for the corresponding removeEventListener function, as cancellation and structured concurrency serve this purpose.

Note that launchIn also returns a Job, which can be used to join the flow collection coroutine or to cancel only that coroutine without cancelling the whole scope.
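A minimal sketch of using the returned Job follows. The ticks and listenBriefly functions are hypothetical, and the 250 ms window is an arbitrary choice, so the exact number of values received depends on timing:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical endless source of events.
fun ticks(): Flow<Int> = flow {
    var i = 0
    while (true) {
        emit(i++)
        delay(100)
    }
}

// Launches the collection in the given scope, lets it run briefly, then cancels only that job.
suspend fun listenBriefly(scope: CoroutineScope, received: MutableList<Int>): Job {
    val job = ticks()
        .onEach { received += it }
        .launchIn(scope)
    delay(250)          // let a few events arrive
    job.cancelAndJoin() // stop this collection; the scope itself stays active
    return job
}

fun main() = runBlocking<Unit> {
    val received = mutableListOf<Int>()
    val job = listenBriefly(this, received)
    println("job cancelled: ${job.isCancelled}, scope active: $isActive")
    println("received before cancellation: $received")
}
```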

16 June 2026