consumeEach

inline suspend fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit)(source)

Performs the given action for each received element and cancels the channel afterward.

This function stops processing elements when either the channel is closed, the coroutine in which the collection is performed gets cancelled and there are no readily available elements in the channel's buffer, action fails with an exception, or an early return from action happens. If the action finishes with an exception, that exception will be used for cancelling the channel and rethrown. If the channel is closed with a cause, this cause will be rethrown from consumeEach.

When the channel does not need to be closed after iterating over its elements, a regular for loop (for (element in channel)) should be used instead.

The operation is terminal. This function consumes the elements of the original ReceiveChannel.

This function is useful in cases when this channel is only expected to have a single consumer that decides when the producer may stop. Example:

val channel = Channel<Int>(1)
// Launch several procedures that create values
repeat(5) {
launch(Dispatchers.Default) {
while (true) {
channel.send(Random.nextInt(40, 50))
}
}
}
// Launch the exclusive consumer
val result = run {
channel.consumeEach {
if (it == 42) {
println("Found the answer")
return@run it // forcibly stop collection
}
}
// *Note*: some elements could be lost in the channel!
}
check(result == 42)

In this example, several coroutines put elements into a single channel, and a single consumer processes the elements. Once it finds the elements it's looking for, it stops consumeEach by making an early return.

Pitfall: even though the name says "each", some elements could be left unprocessed if they are added after this function decided to close the channel. In this case, the elements will simply be lost. If the elements of the channel are resources that must be closed (like file handles, sockets, etc.), an onUndeliveredElement must be passed to the Channel on construction. It will be called for each element left in the channel at the point of cancellation.


inline suspend fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit)(source)

Deprecated (with error)

BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported

Subscribes to this BroadcastChannel and performs the specified action for each received element.

Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0