Channel

interface Channel<E> : SendChannel<E> , ReceiveChannel<E>

Channel is a non-blocking primitive for communication between a sender (via SendChannel) and a receiver (via ReceiveChannel). Conceptually, a channel is similar to Java's java.util.concurrent.BlockingQueue, but it has suspending operations instead of blocking ones and can be closed.

Creating channels

The Channel(capacity) factory function is used to create channels of different kinds depending on the value of the capacity integer:

  • When capacity is 0 — it creates a rendezvous channel. This channel does not have any buffer at all. An element is transferred from the sender to the receiver only when send and receive invocations meet in time (rendezvous), so send suspends until another coroutine invokes receive, and receive suspends until another coroutine invokes send.

  • When capacity is Channel.UNLIMITED — it creates a channel with effectively unlimited buffer. This channel has a linked-list buffer of unlimited capacity (limited only by available memory). Sending to this channel never suspends, and trySend always succeeds.

  • When capacity is Channel.CONFLATED — it creates a conflated channel This channel buffers at most one element and conflates all subsequent send and trySend invocations, so that the receiver always gets the last element sent. Back-to-back sent elements are conflated — only the last sent element is received, while previously sent elements are lost. Sending to this channel never suspends, and trySend always succeeds.

  • When capacity is positive but less than UNLIMITED — it creates an array-based channel with the specified capacity. This channel has an array buffer of a fixed capacity. Sending suspends only when the buffer is full, and receiving suspends only when the buffer is empty.

Buffered channels can be configured with an additional onBufferOverflow parameter. It controls the behaviour of the channel's send function on buffer overflow:

  • SUSPEND — the default, suspend send on buffer overflow until there is free space in the buffer.

  • DROP_OLDEST — do not suspend the send, add the latest value to the buffer, drop the oldest one from the buffer. A channel with capacity = 1 and onBufferOverflow = DROP_OLDEST is a conflated channel.

  • DROP_LATEST — do not suspend the send, drop the value that is being sent, keep the buffer contents intact.

A non-default onBufferOverflow implicitly creates a channel with at least one buffered element and is ignored for a channel with unlimited buffer. It cannot be specified for capacity = CONFLATED, which is a shortcut by itself.

Prompt cancellation guarantee

All suspending functions with channels provide prompt cancellation guarantee. If the job was cancelled while send or receive function was suspended, it will not resume successfully, but throws a CancellationException. With a single-threaded dispatcher like Dispatchers.Main this gives a guarantee that if a piece code running in this thread cancels a Job, then a coroutine running this job cannot resume successfully and continue to run, ensuring a prompt response to its cancellation.

Prompt cancellation guarantee for channel operations was added since kotlinx.coroutines version 1.4.0 and had replaced a channel-specific atomic-cancellation that was not consistent with other suspending functions. The low-level mechanics of prompt cancellation are explained in suspendCancellableCoroutine function.

Undelivered elements

As a result of a prompt cancellation guarantee, when a closeable resource (like open file or a handle to another native resource) is transferred via channel from one coroutine to another it can fail to be delivered and will be lost if either send or receive operations are cancelled in transit.

A Channel() constructor function has an onUndeliveredElement optional parameter. When onUndeliveredElement parameter is set, the corresponding function is called once for each element that was sent to the channel with the call to the send function but failed to be delivered, which can happen in the following cases:

  • When send operation throws an exception because it was cancelled before it had a chance to actually send the element or because the channel was closed or cancelled.

  • When receive, receiveOrNull, or hasNext operation throws an exception when it had retrieved the element from the channel but was cancelled before the code following the receive call resumed.

  • The channel was cancelled, in which case onUndeliveredElement is called on every remaining element in the channel's buffer.

Note, that onUndeliveredElement function is called synchronously in an arbitrary context. It should be fast, non-blocking, and should not throw exceptions. Any exception thrown by onUndeliveredElement is wrapped into an internal runtime exception which is either rethrown from the caller method or handed off to the exception handler in the current context (see CoroutineExceptionHandler) when one is available.

A typical usage for onDeliveredElement is to close a resource that is being transferred via the channel. The following code pattern guarantees that opened resources are closed even if producer, consumer, and/or channel are cancelled. Resources are never lost.

// Create the channel with onUndeliveredElement block that closes a resource
val channel = Channel<Resource>(capacity) { resource -> resource.close() }

// Producer code
val resourceToSend = openResource()
channel.send(resourceToSend)

// Consumer code
val resourceReceived = channel.receive()
try {
// work with received resource
} finally {
resourceReceived.close()
}

Note, that if you do any kind of work in between openResource() and channel.send(...), then you should ensure that resource gets closed in case this additional code fails.

Types

Factory
Link copied to clipboard
object Factory

Constants for the channel factory function Channel().

Sources

common source
Link copied to clipboard