produce
Launches a new coroutine to produce a stream of values by sending them to a channel and returns a reference to the coroutine as a ReceiveChannel. This resulting object can be used to receive elements produced by this coroutine.
The scope of the coroutine contains the ProducerScope interface, which implements both CoroutineScope and SendChannel, so that the coroutine can invoke send directly.
The kind of the resulting channel depends on the specified capacity parameter. See the Channel interface documentation for details. By default, an unbuffered channel is created. If an invalid capacity value is specified, an IllegalArgumentException is thrown.
Behavior on termination
The channel is closed when the coroutine completes.
val values = listOf(1, 2, 3, 4)
val channel = produce<Int> {
for (value in values) {
send(value)
}
}
check(channel.toList() == values)
The running coroutine is cancelled when the channel is cancelled.
val channel = produce<Int> {
send(1)
send(2)
try {
send(3) // will throw CancellationException
} catch (e: CancellationException) {
println("The channel was cancelled!)
throw e // always rethrow CancellationException
}
}
check(channel.receive() == 1)
check(channel.receive() == 2)
channel.cancel()
If this coroutine finishes with an exception, it will close the channel with that exception as the cause, so after receiving all the existing elements, all further attempts to receive from it will throw the exception with which the coroutine finished.
val produceJob = Job()
// create and populate a channel with a buffer
val channel = produce<Int>(produceJob, capacity = Channel.UNLIMITED) {
repeat(5) { send(it) }
throw TestException()
}
produceJob.join() // wait for `produce` to fail
check(produceJob.isCancelled == true)
// prints 0, 1, 2, 3, 4, then throws `TestException`
for (value in channel) { println(value) }
When the coroutine is cancelled via structured concurrency and not the cancel
function, the channel does not automatically close until the coroutine completes, so it is possible that some elements will be sent even after the coroutine is cancelled:
val parentScope = CoroutineScope(Dispatchers.Default)
val channel = parentScope.produce<Int>(capacity = Channel.UNLIMITED) {
repeat(5) {
send(it)
}
parentScope.cancel()
// suspending after this point would fail, but sending succeeds
send(-1)
}
for (c in channel) {
println(c) // 0, 1, 2, 3, 4, -1
} // throws a `CancellationException` exception after reaching -1
Note that cancelling produce
via structured concurrency closes the channel with a cause.
The behavior around coroutine cancellation and error handling is experimental and may change in a future release.
Coroutine context
The coroutine context is inherited from this CoroutineScope. Additional context elements can be specified with the context argument. If the context does not have any dispatcher or other ContinuationInterceptor, then Dispatchers.Default is used. The parent job is inherited from the CoroutineScope as well, but it can also be overridden with a corresponding context element.
See newCoroutineContext for a description of debugging facilities available for newly created coroutines.
Undelivered elements
Some values that produce creates may be lost:
val channel = produce(Dispatchers.Default, capacity = 5) {
repeat(100) {
send(it)
println("Sent $it")
}
}
channel.cancel() // no elements can be received after this!
There is no way to recover these lost elements. If this is unsuitable, please create a Channel manually and pass the onUndeliveredElement
callback to the constructor: Channel(onUndeliveredElement = ...).
Usage example
/* Generate random integers until we find the square root of 9801.
To calculate whether the given number is that square root,
use several coroutines that separately process these integers.
Alternatively, we may randomly give up during value generation.
`produce` is used to generate the integers and put them into a
channel, from which the square-computing coroutines take them. */
val parentScope = CoroutineScope(SupervisorJob())
val channel = parentScope.produce<Int>(
Dispatchers.IO,
capacity = 16 // buffer of size 16
) {
// this code will run on Dispatchers.IO
while (true) {
val request = run {
// simulate waiting for the next request
delay(5.milliseconds)
val randomInt = Random.nextInt(-1, 100)
if (randomInt == -1) {
// external termination request received
println("Producer: no longer accepting requests")
return@produce
}
println("Producer: sending a request ($randomInt)")
randomInt
}
send(request)
}
}
// Launch consumers
repeat(4) {
launch(Dispatchers.Default) {
for (request in channel) {
// simulate processing a request
delay(25.milliseconds)
println("Consumer $it: received a request ($request)")
if (request * request == 9801) {
println("Consumer $it found the square root of 9801!")
/* the work is done, the producer may finish.
the internal termination request will cancel
the producer on the next suspension point. */
channel.cancel()
}
}
}
}
Note: This is an experimental api. Behaviour of producers that work as children in a parent scope with respect to cancellation and error handling may change in the future.