Channel

Channel is a non-blocking primitive for communication between a sender (via SendChannel) and a receiver (via ReceiveChannel). Conceptually, a channel is similar to java.util.concurrent.BlockingQueue, but it has suspending operations instead of blocking ones and can be closed.
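As a minimal sketch of the sender/receiver relationship described above, the following example sends three values from one coroutine and collects them in another. The function name collectSquares is made up for this illustration; only Channel, send, close, and iteration over the channel are library API.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper for illustration: returns everything received from the channel.
fun collectSquares(): List<Int> = runBlocking {
    val channel = Channel<Int>() // default capacity: rendezvous

    // Sender side: send suspends (rather than blocks) until a receiver is ready.
    launch {
        for (x in 1..3) channel.send(x * x)
        channel.close() // unlike a BlockingQueue, a channel can be closed
    }

    // Receiver side: the loop ends once the channel is closed and drained.
    val received = mutableListOf<Int>()
    for (value in channel) received += value
    received
}

fun main() {
    println(collectSquares())
}
```

Closing the channel is what lets the receiving for loop terminate; without it the receiver would stay suspended waiting for more elements.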

Channel capacity

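Most ways to create a Channel (in particular, the Channel() factory function) allow specifying a capacity, which determines how elements are buffered in the channel. There are several predefined constants for the capacity that have special behavior:

  • Channel.RENDEZVOUS (or 0) creates a rendezvous channel, which has no buffer at all. The sender and the receiver must meet: send suspends until another coroutine invokes receive, and vice versa.

  • Channel.CONFLATED creates a buffer for a single element and automatically changes the buffer overflow strategy to BufferOverflow.DROP_OLDEST.

  • Channel.UNLIMITED creates a channel with an unlimited buffer, which never suspends the sender.

  • Channel.BUFFERED creates a channel with a buffer whose size depends on the buffer overflow strategy.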

See each constant's documentation for more details.

If the capacity is positive but less than Channel.UNLIMITED, the channel has a buffer with the specified capacity. It is safe to construct a channel with a large buffer, as memory is only allocated gradually as elements are added.

Constructing a channel with a negative capacity not equal to a predefined constant is not allowed and throws an IllegalArgumentException.
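The two capacity rules above can be checked with a short sketch. It uses the non-suspending trySend so that no coroutine is needed; the helper names and the value -10 (chosen as a negative number that is not one of the predefined constants) are assumptions made for this example.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: tries to put three elements into a channel with capacity 2.
// With no receiver present, the third attempt is expected to fail
// because the buffer is already full.
fun bufferedTrySendResults(): List<Boolean> {
    val channel = Channel<Int>(capacity = 2)
    return listOf(1, 2, 3).map { channel.trySend(it).isSuccess }
}

// Hypothetical helper: reports whether a negative, non-predefined capacity is rejected.
fun negativeCapacityIsRejected(): Boolean =
    try {
        Channel<Int>(capacity = -10)
        false
    } catch (e: IllegalArgumentException) {
        true
    }

fun main() {
    println(bufferedTrySendResults())
    println(negativeCapacityIsRejected())
}
```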

Buffer overflow

Some ways to create a Channel also expose a BufferOverflow parameter (by convention, onBufferOverflow), which does not affect the receiver but determines the behavior of the sender when the buffer is full. The options include suspending until there is space in the buffer, dropping the oldest element to make room for the new one, or dropping the element to be sent. See the BufferOverflow documentation.

By convention, the default value for BufferOverflow whenever it cannot be configured is BufferOverflow.SUSPEND.

See the Channel.RENDEZVOUS, Channel.CONFLATED, and Channel.UNLIMITED documentation for a description of how they interact with the BufferOverflow parameter.
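To make the difference between the overflow strategies concrete, here is a small sketch that fills a two-element buffer with three elements and then inspects what is left. It uses trySend and tryReceive instead of the suspending send and receive so that it can run without a second coroutine; with BufferOverflow.SUSPEND, trySend simply fails where send would suspend. The function name bufferAfterOverflow is hypothetical.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: offers 1, 2, 3 to a channel with capacity 2
// and returns the elements that remain in the buffer afterwards.
fun bufferAfterOverflow(strategy: BufferOverflow): List<Int> {
    val channel = Channel<Int>(capacity = 2, onBufferOverflow = strategy)
    for (x in 1..3) channel.trySend(x)

    // Drain the buffer without suspending; stop at the first failed attempt.
    val remaining = mutableListOf<Int>()
    while (true) {
        val next = channel.tryReceive().getOrNull() ?: break
        remaining += next
    }
    return remaining
}

fun main() {
    println(bufferAfterOverflow(BufferOverflow.DROP_OLDEST)) // oldest element (1) is dropped
    println(bufferAfterOverflow(BufferOverflow.DROP_LATEST)) // newest element (3) is dropped
}
```

In both cases the receiver side is unaffected by the strategy: it only sees whatever the buffer holds.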

Prompt cancellation guarantee

All suspending functions on channels provide a prompt cancellation guarantee. If the job is cancelled while a send or receive function is suspended, the function does not resume successfully, even if it has already changed the channel's state; it throws a CancellationException instead. With a single-threaded dispatcher like Dispatchers.Main, this guarantees that the coroutine promptly reacts to the cancellation of its Job and does not resume its execution.

The prompt cancellation guarantee for channel operations was added in kotlinx.coroutines version 1.4.0 and replaced the channel-specific atomic cancellation, which was not consistent with other suspending functions. The low-level mechanics of prompt cancellation are explained in the suspendCancellableCoroutine documentation.
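The sketch below shows only the simplest case of this behavior: a coroutine suspended in receive is cancelled and observes a CancellationException instead of a value. It does not reproduce the race in which an element has already been handed over at the moment of cancellation. The function name receiveOutcomeAfterCancel is made up for the example.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: reports what happened to a receive call whose job was cancelled.
fun receiveOutcomeAfterCancel(): String = runBlocking {
    val channel = Channel<Int>() // nobody ever sends to this channel
    var outcome = "not started"

    val job = launch {
        try {
            val value = channel.receive() // suspends: there is no sender
            outcome = "received $value"
        } catch (e: CancellationException) {
            outcome = "cancelled"
            throw e // rethrow so the coroutine completes as cancelled
        }
    }

    yield()             // let the child start and suspend in receive
    job.cancelAndJoin() // cancel it and wait for it to finish
    outcome
}

fun main() {
    println(receiveOutcomeAfterCancel())
}
```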

Undelivered elements

As a result of the prompt cancellation guarantee, when a closeable resource (like an open file or a handle to another native resource) is transferred via a channel, it can be successfully extracted from the channel, but still be lost if the receiving operation is cancelled in parallel.

The Channel() factory function has the optional parameter onUndeliveredElement. When that parameter is set, the corresponding function is called once for each element that was sent to the channel with the call to the send function but failed to be delivered, which can happen in the following cases:

  • When an element is dropped due to the limited buffer capacity. This can happen when the overflow strategy is BufferOverflow.DROP_LATEST or BufferOverflow.DROP_OLDEST.

  • When a sending operation like send or onSend throws an exception because it was cancelled before it had a chance to actually send the element, or because the channel was closed or cancelled.

  • When a receiving operation like receive, onReceive, or hasNext throws an exception after retrieving the element from the channel because it was cancelled before the code following it had a chance to resume.

  • When the channel was cancelled, in which case onUndeliveredElement is called on every remaining element in the channel's buffer.

Note that onUndeliveredElement is called synchronously in an arbitrary context. It should be fast, non-blocking, and should not throw exceptions. Any exception thrown by onUndeliveredElement is wrapped into an internal runtime exception which is either rethrown from the caller method or handed off to the exception handler in the current context (see CoroutineExceptionHandler) when one is available.
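As an illustration of the last case in the list above (cancelling a channel that still has buffered elements), the following sketch records every element passed to onUndeliveredElement. The helper name undeliveredAfterCancel is an assumption, and the elements are collected into a set because this example does not rely on the order in which the callback is invoked.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: buffers two elements, cancels the channel,
// and returns the elements reported as undelivered.
fun undeliveredAfterCancel(): Set<String> = runBlocking {
    val undelivered = mutableSetOf<String>()
    val channel = Channel<String>(
        capacity = 3,
        // Keep the callback fast and non-throwing, as the note above requires.
        onUndeliveredElement = { undelivered += it }
    )

    channel.send("a") // fits into the buffer, so send does not suspend
    channel.send("b")
    channel.cancel()  // both buffered elements can no longer be delivered

    undelivered
}

fun main() {
    println(undeliveredAfterCancel())
}
```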

A typical usage for onUndeliveredElement is to close a resource that is being transferred via the channel. The following code pattern guarantees that opened resources are closed even if the producer, the consumer, and/or the channel are cancelled. Resources are never lost.

// Create a channel with an onUndeliveredElement block that closes a resource
val channel = Channel<Resource>(onUndeliveredElement = { resource -> resource.close() })

// Producer code
val resourceToSend = openResource()
channel.send(resourceToSend)

// Consumer code
val resourceReceived = channel.receive()
try {
    // work with received resource
} finally {
    resourceReceived.close()
}

Note that if any work happens between openResource() and channel.send(...), it is your responsibility to ensure that the resource gets closed if this additional code fails.
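One possible way to meet that responsibility is sketched below. Resource, prepareAndSend, and the prepare step are all hypothetical stand-ins, not library API. The try block deliberately covers only the preparation step: a failing send is already handled by onUndeliveredElement, so wrapping send as well would close the resource twice.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.runBlocking

// Hypothetical resource type standing in for a file or native handle.
class Resource(val name: String) {
    var closed = false
        private set

    fun close() { closed = true }
}

// Hypothetical producer helper: runs extra work on the resource before sending it.
suspend fun prepareAndSend(
    channel: SendChannel<Resource>,
    resource: Resource,
    prepare: (Resource) -> Unit
) {
    try {
        prepare(resource) // the "additional code" between opening and sending
    } catch (e: Throwable) {
        resource.close()  // the channel never saw the resource, so close it here
        throw e
    }
    channel.send(resource) // from here on, onUndeliveredElement covers failures
}

// Hypothetical check: the resource is closed when the preparation step fails.
fun closedWhenPreparationFails(): Boolean = runBlocking {
    val channel = Channel<Resource>(capacity = 1, onUndeliveredElement = { it.close() })
    val resource = Resource("demo")
    try {
        prepareAndSend(channel, resource) { error("preparation failed") }
    } catch (e: IllegalStateException) {
        // Expected in this demonstration.
    }
    resource.closed
}

fun main() {
    println(closedWhenPreparationFails())
}
```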

Types

object Factory

Constants for the channel factory function Channel().