consumeEach
Performs the given action for each received element and cancels the channel afterward.
This function stops processing elements when any of the following happens: the channel is closed; the coroutine performing the collection is cancelled and no elements are readily available in the channel's buffer; action fails with an exception; or action returns early. If action finishes with an exception, that exception is used to cancel the channel and is then rethrown. If the channel is closed with a cause, that cause is rethrown from consumeEach.
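The two exception paths described above can be illustrated with a minimal sketch. The helper names failInAction and closedWithCause, the buffer sizes, and the exception messages are assumptions made for this illustration and are not part of the library.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach

// Hypothetical helper: the action throws on the second element.
// Returns the rethrown exception's message and whether the channel ended up closed.
suspend fun failInAction(): Pair<String?, Boolean> {
    val channel = Channel<Int>(capacity = 3)
    channel.send(1)
    channel.send(2)
    channel.send(3)
    val message = try {
        channel.consumeEach { value ->
            if (value == 2) throw IllegalStateException("cannot handle $value")
        }
        null
    } catch (e: IllegalStateException) {
        // The exception thrown by the action is rethrown from consumeEach.
        e.message
    }
    // The channel was cancelled, so a further trySend reports a closed channel.
    return message to channel.trySend(4).isClosed
}

// Hypothetical helper: the producer side closes the channel with a cause.
// The buffered element is still delivered before the cause is rethrown.
suspend fun closedWithCause(): String? {
    val channel = Channel<Int>(capacity = 1)
    channel.send(1)
    channel.close(IllegalArgumentException("producer failed"))
    var seen = 0
    return try {
        channel.consumeEach { seen += it }
        null
    } catch (e: IllegalArgumentException) {
        "${e.message} after $seen"
    }
}
```

Both helpers are suspending functions, so they would be called from a coroutine, for example inside runBlocking { ... }.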
When the channel does not need to be closed after iterating over its elements, a regular for loop (for (element in channel)) should be used instead.
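As a rough comparison of the two iteration styles, the sketch below leaves the loop early in both cases and then checks whether the channel is still usable. The helper name closedAfterEarlyExit and its parameter are hypothetical and exist only for this illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach

// Hypothetical helper: stops after the first element, then reports
// whether the channel is closed for receiving.
suspend fun closedAfterEarlyExit(useConsumeEach: Boolean): Boolean {
    val channel = Channel<Int>(capacity = 3)
    channel.send(1)
    channel.send(2)
    channel.send(3)
    if (useConsumeEach) {
        run {
            // The early return makes consumeEach cancel the channel;
            // the remaining buffered elements are discarded.
            channel.consumeEach { if (it == 1) return@run }
        }
    } else {
        // A plain for loop leaves the channel open after break;
        // the remaining buffered elements can still be received.
        for (element in channel) {
            if (element == 1) break
        }
    }
    return channel.tryReceive().isClosed
}
```

Under these assumptions, the consumeEach variant reports a closed channel, while the for-loop variant still has elements 2 and 3 available to a later receiver.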
The operation is terminal. This function consumes the elements of the original ReceiveChannel.
This function is useful in cases when this channel is only expected to have a single consumer that decides when the producer may stop. Example:
val channel = Channel<Int>(1)
// Launch several procedures that create values
repeat(5) {
    launch(Dispatchers.Default) {
        while (true) {
            channel.send(Random.nextInt(40, 50))
        }
    }
}
// Launch the exclusive consumer
val result = run {
    channel.consumeEach {
        if (it == 42) {
            println("Found the answer")
            return@run it // forcibly stop collection
        }
    }
    // *Note*: some elements could be lost in the channel!
}
check(result == 42)
In this example, several coroutines put elements into a single channel, and a single consumer processes them. Once the consumer finds the element it is looking for, it stops consumeEach by making an early return.
Deprecated (with error)
BroadcastChannel is deprecated in favour of SharedFlow and is no longer supported.
Subscribes to this BroadcastChannel and performs the specified action for each received element.
Note: This API has been obsolete since 1.5.0 and deprecated for removal since 1.7.0.
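For code moving away from this overload, a per-element action on a SharedFlow might look roughly like the sketch below. It is not an official migration recipe: the helper name firstThreeFromSharedFlow, the replay size, and the use of take(3) to end the collection are all assumptions chosen so the example terminates, since collecting a SharedFlow never completes on its own.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take

// Hypothetical helper: emits three values into a replaying SharedFlow,
// then runs an action for each of them.
suspend fun firstThreeFromSharedFlow(): List<Int> {
    // replay = 3 keeps the last three values for subscribers that arrive late
    val events = MutableSharedFlow<Int>(replay = 3)
    events.emit(1)
    events.emit(2)
    events.emit(3)
    val received = mutableListOf<Int>()
    // take(3) bounds the collection; without it, collect would suspend forever
    events.take(3).collect { received += it }
    return received
}
```

Unlike consumeEach on a channel, collecting from a SharedFlow does not remove elements for other subscribers; each collector sees the replayed and subsequently emitted values independently.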