consume

inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R

Executes the block and then cancels the channel.

It is guaranteed that, after invoking this operation, the channel will be cancelled, so the operation is terminal. If the block finishes with an exception, that exception will be used for cancelling the channel and rethrown.
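The failure path described above can be sketched as follows. This is a minimal illustration, not part of the library documentation: the buffered channel, the `IllegalStateException`, and the use of `trySend`/`tryReceive` to observe the cancelled state are choices made for this example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    // A buffered channel, so that send() below does not suspend.
    val channel = Channel<Int>(capacity = 1)
    channel.send(1)

    val result = runCatching {
        channel.consume {
            receive()
            // Simulate a failure inside the block.
            throw IllegalStateException("block failed")
        }
    }

    // The exception thrown by the block is rethrown to the caller.
    check(result.exceptionOrNull() is IllegalStateException)
    // The channel has been cancelled, so both ends report it as closed.
    check(channel.trySend(2).isClosed)
    check(channel.tryReceive().isClosed)
}
```

If the block returns normally instead, the channel is cancelled in the same way, only without a failure cause.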

This function is useful for building more complex terminal operators while ensuring that the producers stop sending new elements to the channel.

Example:

suspend fun <E> ReceiveChannel<E>.consumeFirst(): E =
    consume { return receive() }

// Launch a coroutine that constantly sends new values
val channel = produce(Dispatchers.Default) {
    var i = 0
    while (true) {
        // Will fail with a `CancellationException`
        // after `consumeFirst` finishes.
        send(i++)
    }
}
// Grab the first value and discard everything else
val firstElement = channel.consumeFirst()
check(firstElement == 0)
// *Note*: some elements could be lost in the channel!

In this example, the channel is cancelled and the producer coroutine finishes its work once the first element is obtained. If consumeFirst were implemented as for (e in this) { return e } instead, the producer coroutine would stay active until it was cancelled some other way.
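The difference between the two implementations can be observed directly. The sketch below is illustrative only: `firstWithoutConsume` is a hypothetical helper written for this comparison, and the producers are started inside `runBlocking` rather than on `Dispatchers.Default`.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the first element but leaves the channel open.
suspend fun <E> ReceiveChannel<E>.firstWithoutConsume(): E {
    for (e in this) return e
    throw NoSuchElementException("Channel is closed and empty")
}

// The same operation built on consume, as in the example above.
suspend fun <E> ReceiveChannel<E>.consumeFirst(): E =
    consume { return receive() }

@OptIn(ExperimentalCoroutinesApi::class)
fun main(): Unit = runBlocking {
    val leaky = produce { var i = 0; while (true) send(i++) }
    check(leaky.firstWithoutConsume() == 0)
    // The producer is still running, so the next element can be received.
    check(leaky.receive() == 1)
    // It has to be cancelled by hand, otherwise runBlocking would never return.
    leaky.cancel()

    val tidy = produce { var i = 0; while (true) send(i++) }
    check(tidy.consumeFirst() == 0)
    // consume has already cancelled the channel and, with it, the producer.
    check(tidy.tryReceive().isClosed)
}
```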

consume does not guarantee that new elements will not enter the channel after block finishes executing, so some channel elements may be lost. Use the onUndeliveredElement parameter of a manually created Channel to define what should happen with these elements during ReceiveChannel.cancel.
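A minimal sketch of handling such lost elements, assuming a manually created buffered channel. The capacity of 4 and the `undelivered` list are choices made for this example; a real handler would typically release a resource held by each element instead of collecting it.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    // Collects every element that was sent but never received.
    val undelivered = mutableListOf<Int>()
    val channel = Channel<Int>(
        capacity = 4,
        onUndeliveredElement = { undelivered += it }
    )

    // Buffer three elements: 0, 1, 2.
    repeat(3) { channel.send(it) }

    // Receive only the first one; consume then cancels the channel.
    val first = channel.consume { receive() }
    check(first == 0)

    // The remaining buffered elements were handed to onUndeliveredElement.
    // The list is sorted because the order of these calls is not relied upon here.
    check(undelivered.sorted() == listOf(1, 2))
}
```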


Deprecated (with error)

BroadcastChannel is deprecated in favour of SharedFlow and is no longer supported.

Opens subscription to this BroadcastChannel and makes sure that the given block consumes all elements from it by always invoking cancel after the execution of the block.

Note: This API has been obsolete since 1.5.0 and deprecated for removal since 1.7.0. It is replaced with SharedFlow.
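One possible shape of the replacement is sketched below. It is an illustration rather than an official migration recipe: the `replay = 3` configuration and the use of `take(2).toList()` to stand in for a block that consumes a bounded number of elements are assumptions of this example.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    // A replay cache lets a subscriber that arrives late still see these values.
    val events = MutableSharedFlow<Int>(replay = 3)
    events.emit(1)
    events.emit(2)
    events.emit(3)

    // Collecting subscribes to the flow; take(2) ends the subscription
    // after two elements, so no explicit cancel call is needed.
    val firstTwo = events.take(2).toList()
    check(firstTwo == listOf(1, 2))
}
```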

Safe to remove in 1.9.0, as this function was inline before.