Asynchronous Flow
A suspending function asynchronously returns a single value, but how can we return multiple asynchronously computed values? This is where Kotlin Flows come in.
Representing multiple values
Multiple values can be represented in Kotlin using collections. For example, we can have a simple function that returns a List of three numbers and then print them all using forEach. This code outputs the numbers 1, 2 and 3, each on its own line.
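A minimal sketch of such code might look like this (the function name simple follows the surrounding text):

```kotlin
fun simple(): List<Int> = listOf(1, 2, 3)

fun main() {
    simple().forEach { value -> println(value) } // prints 1, 2, 3 on separate lines
}
```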
Sequences
If we are computing the numbers with some CPU-consuming blocking code (each computation taking 100ms), then we can represent the numbers using a Sequence:
This code outputs the same numbers, but it waits 100ms before printing each one.
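A sketch of the Sequence-based variant, with the 100 ms computation imitated by Thread.sleep:

```kotlin
fun simple(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield the next value
    }
}

fun main() {
    simple().forEach { value -> println(value) } // prints 1, 2, 3, 100 ms apart
}
```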
Suspending functions
However, this computation blocks the main thread that is running the code. When these values are computed by asynchronous code, we can mark the simple function with a suspend modifier, so that it can perform its work without blocking and return the result as a list.
This code prints the numbers after waiting for a second.
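A sketch of the suspending variant, assuming kotlinx-coroutines-core is on the classpath:

```kotlin
import kotlinx.coroutines.*

suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) } // prints 1, 2, 3 after about a second
}
```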
Flows
Using the List<Int> result type means we can only return all the values at once. To represent a stream of values that are being computed asynchronously, we can use the Flow<Int> type, just as we would use the Sequence<Int> type for synchronously computed values.
This code waits 100ms before printing each number without blocking the main thread. This is verified by printing "I'm not blocked" every 100ms from a separate coroutine that is running in the main thread:
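One way this example might look (again assuming kotlinx-coroutines-core):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit the next value
    }
}

fun main() = runBlocking<Unit> {
    // launch a concurrent coroutine to check whether the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // collect the flow
    simple().collect { value -> println(value) }
}
// the "I'm not blocked" lines interleave with the numbers 1, 2, 3
```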
Notice the following differences in the code with the Flow from the earlier examples:
A builder function for the Flow type is called flow.
Code inside a flow { ... } builder block can suspend.
The simple function is no longer marked with a suspend modifier.
Values are emitted from the flow using an emit function.
Values are collected from the flow using a collect function.
Flows are cold
Flows are cold streams, similar to sequences: the code inside a flow builder does not run until the flow is collected. This becomes clear in the following example.
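A sketch of such an example, with a "Flow started" marker inside the builder:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()          // nothing runs yet
    println("Calling collect...")
    flow.collect { value -> println(value) }  // "Flow started", then 1, 2, 3
    println("Calling collect again...")
    flow.collect { value -> println(value) }  // "Flow started" again, then 1, 2, 3
}
```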
This is a key reason the simple function (which returns a flow) is not marked with the suspend modifier. The simple() call itself returns quickly and does not wait for anything. The flow starts afresh every time it is collected, and that is why we see "Flow started" every time we call collect again.
Flow cancellation basics
Flows adhere to the general cooperative cancellation of coroutines. As usual, flow collection can be cancelled when the flow is suspended in a cancellable suspending function (like delay). The following example shows how the flow gets cancelled on a timeout when running in a withTimeoutOrNull block and stops executing its code:
Notice how only the first two numbers get emitted by the flow in the simple function before the timeout cancels the collection.
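A sketch of the timeout example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // cancel the collection after 250 ms
        simple().collect { value -> println(value) }
    }
    println("Done")
}
// Emitting 1 / 1 / Emitting 2 / 2 / Done
```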
See the Flow cancellation checks section for more details.
Flow builders
The flow { ... } builder from the previous examples is the most basic one. There are other builders that allow flows to be declared:
The flowOf builder defines a flow that emits a fixed set of values.
Various collections and sequences can be converted to flows using the .asFlow() extension function.
For example, the snippet that prints the numbers 1 to 3 from a flow can be rewritten as follows:
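A sketch of the rewritten snippet using asFlow:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    // convert an integer range to a flow
    (1..3).asFlow().collect { value -> println(value) } // prints 1, 2, 3
}
```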
Intermediate flow operators
Flows can be transformed using operators, in the same way as you would transform collections and sequences. Intermediate operators are applied to an upstream flow and return a downstream flow. These operators are cold, just like flows are. A call to such an operator is not a suspending function itself. It works quickly, returning the definition of a new transformed flow.
The basic operators have familiar names like map and filter. An important difference between these operators and sequences is that blocks of code inside these operators can call suspending functions.
For example, a flow of incoming requests can be mapped to its results with a map operator, even when performing a request is a long-running operation that is implemented by a suspending function:
It produces three lines, "response 1", "response 2" and "response 3", each appearing one second after the previous.
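A sketch of the map example, with the long-running request imitated by a delay (the performRequest name is illustrative):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow()                               // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}
// response 1 / response 2 / response 3, one second apart
```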
Transform operator
Among the flow transformation operators, the most general one is called transform. It can be used to imitate simple transformations like map and filter, as well as to implement more complex transformations. Using the transform operator, we can emit arbitrary values an arbitrary number of times.
For example, using transform we can emit a string before performing a long-running asynchronous request and follow it with a response. The output interleaves each "Making request" line with the corresponding response.
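A sketch of the transform example (performRequest imitates a long-running call):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000)
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow()
        .transform { request ->
            emit("Making request $request") // emit before the request...
            emit(performRequest(request))   // ...and emit its result, too
        }
        .collect { response -> println(response) }
}
// Making request 1 / response 1 / Making request 2 / response 2 / Making request 3 / response 3
```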
Size-limiting operators
Size-limiting intermediate operators like take cancel the execution of the flow when the corresponding limit is reached. Cancellation in coroutines is always performed by throwing an exception, so that all the resource-management functions (like try { ... } finally { ... } blocks) operate normally in case of cancellation. The output of such code clearly shows that the execution of the flow { ... } body in the numbers() function stops after emitting the second number.
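A sketch of the take example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers") // runs on cancellation, too
    }
}

fun main() = runBlocking<Unit> {
    numbers()
        .take(2) // take only the first two values
        .collect { value -> println(value) }
}
// 1 / 2 / Finally in numbers
```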
Terminal flow operators
Terminal operators on flows are suspending functions that start collection of the flow. The collect operator is the most basic one, but there are other terminal operators that can make it easier: conversion to various collections like toList and toSet; operators to get the first value and to ensure that a flow emits a single value; reducing a flow to a value with reduce and fold.
For example, a flow of the squares of the numbers from 1 to 5 can be reduced with the reduce operator to a single number, 55.
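A sketch of that computation:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val sum = (1..5).asFlow()
        .map { it * it }          // squares of numbers from 1 to 5
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum) // prints 55
}
```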
Flows are sequential
Each individual collection of a flow is performed sequentially unless special operators that operate on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator. No new coroutines are launched by default. Each emitted value is processed by all the intermediate operators from upstream to downstream and is then delivered to the terminal operator.
See the following example, which filters the even integers and maps them to strings. Each number goes through the whole pipeline before the next one is taken, so only 2 and 4 reach the map and collect stages.
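A sketch of that pipeline, with a print in each stage to show the order of processing:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0
        }
        .map {
            println("Map $it")
            "string $it"
        }
        .collect {
            println("Collect $it")
        }
}
// Filter 1 / Filter 2 / Map 2 / Collect string 2 / Filter 3 / Filter 4 / Map 4 / Collect string 4 / Filter 5
```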
Flow context
Collection of a flow always happens in the context of the calling coroutine. For example, if there is a simple flow, then the code collecting it runs in the context specified by the author of that code, regardless of the implementation details of the simple flow.
This property of a flow is called context preservation.
So, by default, code in the flow { ... } builder runs in the context that is provided by a collector of the corresponding flow. For example, consider the implementation of a simple function that prints the thread it is called on and emits three numbers. Running this code shows that both the emission and the collection happen on the main thread.
Since simple().collect is called from the main thread, the body of simple's flow is also called in the main thread. This is the perfect default for fast-running or asynchronous code that does not care about the execution context and does not block the caller.
A common pitfall when using withContext
However, long-running CPU-consuming code might need to be executed in the context of Dispatchers.Default, and UI-updating code might need to be executed in the context of Dispatchers.Main. Usually, withContext is used to change the context in code using Kotlin coroutines, but code in the flow { ... } builder has to honor the context preservation property and is not allowed to emit from a different context.
Trying to use withContext inside the flow { ... } builder fails at runtime with an IllegalStateException saying "Flow invariant is violated" and suggesting the flowOn operator instead.
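A sketch of the wrong approach this text warns about:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in a flow builder
    withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in a CPU-consuming way
            emit(i) // emission happens in the wrong context
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) }
}
// fails with java.lang.IllegalStateException: Flow invariant is violated: ...
```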
flowOn operator
The exception refers to the flowOn function that should be used to change the context of the flow emission. The correct way to change the context of a flow is shown in the example below, which also prints the names of the corresponding threads to show how it all works.
Notice how flow { ... } works in a background thread, while collection happens in the main thread.
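A sketch of the flowOn version (the log helper prints the current thread name):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in a CPU-consuming way
        log("Emitting $i")
        emit(i)
    }
}.flowOn(Dispatchers.Default) // the RIGHT way to change context for emission

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") }
}
// "Emitting" lines run on a DefaultDispatcher worker thread, "Collected" lines on main
```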
Another thing to observe here is that the flowOn operator has changed the default sequential nature of the flow. Now collection happens in one coroutine ("coroutine#1") and emission happens in another coroutine ("coroutine#2") that is running in another thread concurrently with the collecting coroutine. The flowOn operator creates another coroutine for an upstream flow when it has to change the CoroutineDispatcher in its context.
Buffering
Running different parts of a flow in different coroutines can be helpful from the standpoint of the overall time it takes to collect the flow, especially when long-running asynchronous operations are involved. For example, consider a case when the emission by a simple flow is slow, taking 100 ms to produce an element, and the collector is also slow, taking 300 ms to process an element. Collecting such a flow of three numbers sequentially takes around 1200 ms (three numbers, 400 ms for each).
We can use a buffer operator on a flow to run the emitting code of the simple flow concurrently with the collecting code, as opposed to running them sequentially. It produces the same numbers just faster, as we have effectively created a processing pipeline, having to wait only 100 ms for the first number and then spending only 300 ms to process each number. This way it takes around 1000 ms to run.
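A sketch of the buffered pipeline with its timing measured:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple()
            .buffer() // run the emitter concurrently with the collector
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms") // around 1000 ms instead of 1200 ms
}
```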
Conflation
When a flow represents partial results of an operation or operation status updates, it may not be necessary to process each value, but only the most recent ones. In this case, the conflate operator can be used to skip intermediate values when a collector is too slow to process them. Applying conflate to the previous example, we see that while the first number was still being processed the second and third were already produced, so the second one was conflated and only the most recent (the third one) was delivered to the collector.
Processing the latest value
Conflation is one way to speed up processing when both the emitter and collector are slow. It does so by dropping emitted values. The other way is to cancel a slow collector and restart it every time a new value is emitted. There is a family of xxxLatest operators that perform the same essential logic as an xxx operator, but cancel the code in their block on a new value. Let's try changing conflate to collectLatest in the previous example. Since the body of collectLatest takes 300 ms, but new values are emitted every 100 ms, the block is run on every value but completes only for the last value.
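A sketch of the collectLatest variant:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple().collectLatest { value -> // cancel and restart the block on every new value
        println("Collecting $value")
        delay(300) // only the last value gets past this delay
        println("Done $value")
    }
}
// Collecting 1 / Collecting 2 / Collecting 3 / Done 3
```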
Composing multiple flows
There are lots of ways to compose multiple flows.
Zip
Just like the Sequence.zip extension function in the Kotlin standard library, flows have a zip operator that combines the corresponding values of two flows:
This example prints "1 -> one", "2 -> two" and "3 -> three".
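A sketch of the zip example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val nums = (1..3).asFlow()               // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings
    nums.zip(strs) { a, b -> "$a -> $b" }    // compose a single string from each pair
        .collect { println(it) }
}
// 1 -> one / 2 -> two / 3 -> three
```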
Combine
When a flow represents the most recent value of a variable or operation (see also the related section on conflation), it might be needed to perform a computation that depends on the most recent values of the corresponding flows and to recompute it whenever any of the upstream flows emit a value. The corresponding family of operators is called combine.
For example, if the numbers in the previous example update every 300 ms, but strings update every 400 ms, then zipping them using the zip operator will still produce the same result, albeit results that are printed every 400 ms. However, when using a combine operator here instead of a zip, we get quite a different output, where a line is printed at each emission from either the nums or strs flows.
Flattening flows
Flows represent asynchronously received sequences of values, and so it is quite easy to get into a situation where each value triggers a request for another sequence of values. For example, we can have a requestFlow function that returns a flow of two strings 500 ms apart. Now if we have a flow of three integers and call requestFlow on each of them, then we end up with a flow of flows (Flow<Flow<String>>) that needs to be flattened into a single flow for further processing. Collections and sequences have flatten and flatMap operators for this. However, due to the asynchronous nature of flows they call for different modes of flattening, and hence, a family of flattening operators on flows exists.
flatMapConcat
Concatenation of flows of flows is provided by the flatMapConcat and flattenConcat operators. They are the most direct analogues of the corresponding sequence operators. They wait for the inner flow to complete before starting to collect the next one as the following example shows:
The sequential nature of flatMapConcat is clearly seen in the output: both strings of each inner flow are printed before the next inner flow is collected.
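A sketch of the flatMapConcat example (depending on the kotlinx.coroutines version, flatMapConcat may require an opt-in annotation, included here):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) }  // emit a number every 100 ms
        .flatMapConcat { requestFlow(it) } // collect inner flows one at a time
        .collect { value ->
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
```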
flatMapMerge
Another flattening operation is to concurrently collect all the incoming flows and merge their values into a single flow so that values are emitted as soon as possible. It is implemented by the flatMapMerge and flattenMerge operators. They both accept an optional concurrency parameter that limits the number of concurrent flows that are collected at the same time (it is equal to DEFAULT_CONCURRENCY by default).
The concurrent nature of flatMapMerge is obvious in the output: the first strings of all the inner flows appear before any of the second strings.
flatMapLatest
In a similar way to the collectLatest operator, which was described in the section about processing the latest value, there is a corresponding "Latest" flattening mode where the collection of the previous flow is cancelled as soon as a new flow is emitted. It is implemented by the flatMapLatest operator: in its output, the collection of the current inner flow is cancelled whenever the upstream flow emits a new value.
Flow exceptions
Flow collection can complete with an exception when an emitter or code inside the operators throws an exception. There are several ways to handle these exceptions.
Collector try and catch
A collector can use Kotlin's try/catch block to handle exceptions. This successfully catches an exception thrown in the collect terminal operator and, as we see, no more values are emitted after that.
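A sketch of the try/catch example, where the collector fails on the second value:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->
            println(value)
            check(value <= 1) { "Collected $value" } // throws on the second value
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
// Emitting 1 / 1 / Emitting 2 / 2 / Caught java.lang.IllegalStateException: Collected 2
```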
Everything is caught
The previous example actually catches any exception happening in the emitter or in any intermediate or terminal operators. For example, let's change the code so that emitted values are mapped to strings, but the corresponding code produces an exception. The exception is still caught and collection is stopped.
Exception transparency
But how can code of the emitter encapsulate its exception-handling behavior?
Flows must be transparent to exceptions, and it is a violation of exception transparency to emit values in the flow { ... } builder from inside of a try/catch block. This guarantees that a collector throwing an exception can always catch it using try/catch as in the previous example.
The emitter can use a catch operator that preserves this exception transparency and allows encapsulation of its exception handling. The body of the catch operator can analyze an exception and react to it in different ways depending on which exception was caught:
Exceptions can be rethrown using throw.
Exceptions can be turned into emission of values using emit from the body of catch.
Exceptions can be ignored, logged, or processed by some other code.
For example, let us emit a text on catching an exception. The output of the example is the same, even though we do not have a try/catch around the code anymore.
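A sketch of the catch-operator version, with the failure happening in an upstream map:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<String> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}.map { value ->
    check(value <= 1) { "Crashed on $value" }
    "string $value"
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> emit("Caught $e") } // emit a value on exception instead of rethrowing
        .collect { value -> println(value) }
}
// Emitting 1 / string 1 / Emitting 2 / Caught java.lang.IllegalStateException: Crashed on 2
```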
Transparent catch
The catch intermediate operator, honoring exception transparency, catches only upstream exceptions (that is, an exception from all the operators above catch, but not below it). If the block in collect { ... } (placed below catch) throws an exception, then it escapes, and a "Caught ..." message is not printed despite there being a catch operator.
Catching declaratively
We can combine the declarative nature of the catch operator with a desire to handle all the exceptions by moving the body of the collect operator into onEach and putting it before the catch operator. Collection of this flow must be triggered by a call to collect() without parameters. Now a "Caught ..." message is printed, and so we can catch all the exceptions without explicitly using a try/catch block.
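A sketch of the declarative variant:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .onEach { value ->                   // the former collect body
            check(value <= 1) { "Collected $value" }
            println(value)
        }
        .catch { e -> println("Caught $e") } // now catches the onEach failure, too
        .collect()                           // trigger collection without parameters
}
// Emitting 1 / 1 / Emitting 2 / Caught java.lang.IllegalStateException: Collected 2
```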
Flow completion
When flow collection completes (normally or exceptionally) it may need to execute an action. As you may have already noticed, it can be done in two ways: imperative or declarative.
Imperative finally block
In addition to try/catch, a collector can also use a finally block to execute an action upon collect completion. This code prints the three numbers produced by the simple flow, followed by a "Done" string.
Declarative handling
For the declarative approach, flow has an onCompletion intermediate operator that is invoked when the flow has been completely collected.
The previous example can be rewritten using an onCompletion operator and produces the same output:
The key advantage of onCompletion is a nullable Throwable parameter of the lambda that can be used to determine whether the flow collection was completed normally or exceptionally. In the following example the simple flow throws an exception after emitting the number 1. As you may expect, the output shows the number 1, then "Flow completed exceptionally", then "Caught exception".
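A sketch of that example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}
// 1 / Flow completed exceptionally / Caught exception
```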
The onCompletion operator, unlike catch, does not handle the exception. As we can see from the above example code, the exception still flows downstream. It will be delivered to further onCompletion operators and can be handled with a catch operator.
Successful completion
Another difference from the catch operator is that onCompletion sees all exceptions and receives a null exception only on successful completion of the upstream flow (without cancellation or failure). If the collector itself fails, the completion cause is not null, because the flow was aborted due to the downstream exception.
Imperative versus declarative
Now we know how to collect flow, and handle its completion and exceptions in both imperative and declarative ways. The natural question here is, which approach is preferred and why? As a library, we do not advocate for any particular approach and believe that both options are valid and should be selected according to your own preferences and code style.
Launching flow
It is easy to use flows to represent asynchronous events that are coming from some source. In this case, we need an analogue of the addEventListener function that registers a piece of code with a reaction for incoming events and continues further work. The onEach operator can serve this role. However, onEach is an intermediate operator; we also need a terminal operator to collect the flow. Otherwise, just calling onEach has no effect.
If we use the collect terminal operator after onEach, then the code after it will wait until the flow is collected: all the events are printed first, and "Done" is printed last.
The launchIn terminal operator comes in handy here. By replacing collect with launchIn we can launch collection of the flow in a separate coroutine, so that execution of further code immediately continues: "Done" is printed first, followed by the events.
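A sketch of the launchIn version (the events function imitates a flow of events, one every 100 ms):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// imitate a flow of events, one every 100 ms
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // launch collection in a separate coroutine
    println("Done")     // printed immediately, before the events
}
// Done / Event: 1 / Event: 2 / Event: 3
```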
The required parameter to launchIn must specify a CoroutineScope in which the coroutine that collects the flow is launched. In the above example this scope comes from the runBlocking coroutine builder, so while the flow is running, this runBlocking scope waits for completion of its child coroutine and keeps the main function from returning and terminating this example.
In actual applications a scope will come from an entity with a limited lifetime. As soon as the lifetime of this entity is terminated, the corresponding scope is cancelled, cancelling the collection of the corresponding flow. This way the pair onEach { ... }.launchIn(scope) works like addEventListener. However, there is no need for the corresponding removeEventListener function, as cancellation and structured concurrency serve this purpose.
Note that launchIn also returns a Job, which can be used to cancel the corresponding flow collection coroutine only, without cancelling the whole scope, or to join it.
Flow cancellation checks
For convenience, the flow builder performs additional ensureActive checks for cancellation on each emitted value. It means that a busy loop emitting from a flow { ... } builder is cancellable. For example, if the collector cancels its coroutine after receiving the number 3, we get only the numbers up to 3 and a CancellationException after the attempt to emit number 4.
However, most other flow operators do not do additional cancellation checks on their own for performance reasons. For example, if you use the IntRange.asFlow extension to write the same busy loop and don't suspend anywhere, then there are no checks for cancellation: all numbers from 1 to 5 are collected, and cancellation gets detected only before return from runBlocking.
Making busy flow cancellable
In the case where you have a busy loop with coroutines, you must explicitly check for cancellation. You can add .onEach { currentCoroutineContext().ensureActive() }, but there is a ready-to-use cancellable operator provided to do that. With the cancellable operator, only the numbers from 1 to 3 are collected.
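A sketch of the cancellable variant:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    (1..5).asFlow()
        .cancellable() // add a cancellation check on each emitted value
        .collect { value ->
            if (value == 3) cancel() // cancel the collecting coroutine
            println(value)
        }
}
// prints 1, 2, 3 and then fails with a CancellationException
```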
Flow and Reactive Streams
For those who are familiar with Reactive Streams or reactive frameworks such as RxJava and Project Reactor, the design of the Flow may look very familiar.
Indeed, its design was inspired by Reactive Streams and its various implementations. But the Flow's main goal is to have as simple a design as possible, be Kotlin- and suspension-friendly, and respect structured concurrency. Achieving this goal would be impossible without the reactive pioneers and their tremendous work. You can read the complete story in the Reactive Streams and Kotlin Flows article.
While being different conceptually, Flow is a reactive stream, and it is possible to convert it to the reactive (spec and TCK compliant) Publisher and vice versa. Such converters are provided by kotlinx.coroutines out-of-the-box and can be found in the corresponding reactive modules (kotlinx-coroutines-reactive for Reactive Streams, kotlinx-coroutines-reactor for Project Reactor, and kotlinx-coroutines-rx2/kotlinx-coroutines-rx3 for RxJava2/RxJava3). Integration modules include conversions from and to Flow, integration with Reactor's Context, and suspension-friendly ways to work with various reactive entities.