consumeTo
Consumes the elements of this channel into the provided mutable collection.
This is a convenience function equivalent to calling consumeAsFlow followed by kotlinx.coroutines.flow.toCollection. Use consumeAsFlow directly when elements need additional transformations before being added to the resulting collection.
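As a rough illustration of that equivalence, the sketch below drains one channel with consumeTo and another through consumeAsFlow with transformations applied first. The helper name collectEvenSquares is hypothetical, and the import path for consumeTo is assumed to be kotlinx.coroutines.channels.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeTo
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toCollection
import kotlinx.coroutines.runBlocking

// Hypothetical helper: keeps only even numbers and squares them
// before they reach the destination list.
suspend fun collectEvenSquares(channel: ReceiveChannel<Int>): List<Int> {
    val destination = mutableListOf<Int>()
    channel.consumeAsFlow()
        .filter { it % 2 == 0 }
        .map { it * it }
        .toCollection(destination)
    return destination
}

fun main(): Unit = runBlocking {
    // No transformations needed: consumeTo is the shorter form.
    val plain = Channel<Int>(Channel.UNLIMITED)
    (1..4).forEach { plain.send(it) }
    plain.close()
    val all = mutableListOf<Int>()
    plain.consumeTo(all)
    println(all) // [1, 2, 3, 4]

    // Transformations needed: go through consumeAsFlow instead.
    val transformed = Channel<Int>(Channel.UNLIMITED)
    (1..4).forEach { transformed.send(it) }
    transformed.close()
    println(collectEvenSquares(transformed)) // [4, 16]
}
```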
consumeTo attempts to receive elements and put them into the collection until the channel is closed.
If the channel is closed with a cause, consumeTo will rethrow that cause. However, the elements already received up to that point will remain in the collection.
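The following is a minimal sketch of that behavior, assuming a buffered channel that is closed with an IOException after two elements were sent. The helper name drainFailedChannel is hypothetical, and the import path for consumeTo is assumed to be kotlinx.coroutines.channels.

```kotlin
import java.io.IOException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeTo
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns what was collected plus the exception, if any.
suspend fun drainFailedChannel(): Pair<List<Int>, Throwable?> {
    val channel = Channel<Int>(Channel.UNLIMITED)
    channel.send(1)
    channel.send(2)
    // Close with a cause: the buffered elements can still be received,
    // after which the cause surfaces on the receiving side.
    channel.close(IOException("sender failed"))

    val received = mutableListOf<Int>()
    val failure = try {
        channel.consumeTo(received)
        null
    } catch (e: IOException) {
        e
    }
    return received to failure
}

fun main(): Unit = runBlocking {
    val (received, failure) = drainFailedChannel()
    println("Collected before the failure: $received")
    println("Rethrown: $failure")
}
```

Because the destination collection is owned by the caller, it can be inspected in a catch or finally block even though consumeTo itself did not complete normally.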
Since this function is implemented using consume, it is terminal, meaning that consumeTo will cancel the channel before exiting. A consumeTo call can finish before the sender closes the channel if it gets cancelled while waiting for the next element, or if MutableCollection.add fails with an exception. In both cases, the exception will be used for cancelling the channel and then rethrown.
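To make the terminal behavior concrete, here is a small sketch in which MutableCollection.add fails partway through. BoundedList, CollectionFullException, and consumeIntoBounded are hypothetical names defined only for this illustration, and the import path for consumeTo is assumed to be kotlinx.coroutines.channels.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeTo
import kotlinx.coroutines.runBlocking

// Hypothetical exception and collection, defined only for this illustration.
class CollectionFullException : RuntimeException("collection is full")

class BoundedList(private val limit: Int) : ArrayList<Int>() {
    override fun add(element: Int): Boolean {
        if (size >= limit) throw CollectionFullException()
        return super.add(element)
    }
}

// Returns the collected elements and whether the channel ended up closed.
suspend fun consumeIntoBounded(): Pair<List<Int>, Boolean> {
    val channel = Channel<Int>(Channel.UNLIMITED)
    (1..5).forEach { channel.send(it) }
    // Note: the sender never closes the channel.

    val destination = BoundedList(limit = 2)
    try {
        channel.consumeTo(destination)
    } catch (e: CollectionFullException) {
        println("consumeTo failed: ${e.message}")
    }
    // consumeTo is terminal, so the channel should now be cancelled
    // and further receive attempts should report it as closed.
    val closedAfterwards = channel.receiveCatching().isClosed
    return destination to closedAfterwards
}

fun main(): Unit = runBlocking {
    val (collected, closed) = consumeIntoBounded()
    println("Collected: $collected, channel closed: $closed")
}
```

Elements still buffered in the channel at the moment of cancellation are not delivered, so a failing destination collection can lose data that the sender already produced.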
The intended use case for this function is collecting the remaining elements of a closed channel and processing them in a single batch.
Example:
val doContinue = AtomicBoolean(true)

// Start the sender
val channel = produce {
    var i = 0
    while (doContinue.load()) {
        send(i++)
        delay(10.milliseconds)
        if (i == 42) break
    }
}

// Start the consumer
launch {
    // Read elements until we suddenly decide to stop
    // or until the channel is closed.
    while (Random.nextInt(100) != 0) {
        val nextElement = channel.receiveCatching()
        if (nextElement.isClosed) return@launch
        println("Received ${nextElement.getOrNull()}")
    }
    delay(100.milliseconds)
    doContinue.store(false)
    delay(100.milliseconds)
    val remainingElements = mutableListOf<Int>()
    try {
        channel.consumeTo(remainingElements)
    } finally {
        println("Remaining elements: $remainingElements")
    }
}