Channel
Channel is a non-blocking primitive for communication between a sender (via SendChannel) and a receiver (via ReceiveChannel). Conceptually, a channel is similar to Java's java.util.concurrent.BlockingQueue, but it has suspending operations instead of blocking ones and can be closed.
Creating channels
The Channel(capacity)
factory function is used to create channels of different kinds depending on the value of the capacity
integer:
When
capacity
is 0 — it creates a rendezvous channel. This channel does not have any buffer at all. An element is transferred from the sender to the receiver only when send and receive invocations meet in time (rendezvous), so send suspends until another coroutine invokes receive, and receive suspends until another coroutine invokes send.When
capacity
is Channel.UNLIMITED — it creates a channel with effectively unlimited buffer. This channel has a linked-list buffer of unlimited capacity (limited only by available memory). Sending to this channel never suspends, and trySend always succeeds.When
capacity
is Channel.CONFLATED — it creates a conflated channel This channel buffers at most one element and conflates all subsequentsend
andtrySend
invocations, so that the receiver always gets the last element sent. Back-to-back sent elements are conflated — only the last sent element is received, while previously sent elements are lost. Sending to this channel never suspends, and trySend always succeeds.When
capacity
is positive but less than UNLIMITED — it creates an array-based channel with the specified capacity. This channel has an array buffer of a fixedcapacity
. Sending suspends only when the buffer is full, and receiving suspends only when the buffer is empty.
Buffered channels can be configured with an additional onBufferOverflow
parameter. It controls the behaviour of the channel's send function on buffer overflow:
SUSPEND — the default, suspend
send
on buffer overflow until there is free space in the buffer.DROP_OLDEST — do not suspend the
send
, add the latest value to the buffer, drop the oldest one from the buffer. A channel withcapacity = 1
andonBufferOverflow = DROP_OLDEST
is a conflated channel.DROP_LATEST — do not suspend the
send
, drop the value that is being sent, keep the buffer contents intact.
A non-default onBufferOverflow
implicitly creates a channel with at least one buffered element and is ignored for a channel with unlimited buffer. It cannot be specified for capacity = CONFLATED
, which is a shortcut by itself.
Prompt cancellation guarantee
All suspending functions with channels provide prompt cancellation guarantee. If the job was cancelled while send or receive function was suspended, it will not resume successfully, even if it already changed the channel's state, but throws a CancellationException. With a single-threaded dispatcher like Dispatchers.Main, this gives a guarantee that the coroutine promptly reacts to the cancellation of its Job and does not resume its execution.
Prompt cancellation guarantee for channel operations was added since
kotlinx.coroutines
version1.4.0
and had replaced a channel-specific atomic-cancellation that was not consistent with other suspending functions. The low-level mechanics of prompt cancellation are explained in suspendCancellableCoroutine function.
Undelivered elements
As a result of the prompt cancellation guarantee, when a closeable resource (like open file or a handle to another native resource) is transferred via a channel from one coroutine to another, it can fail to be delivered and will be lost if the receiving operation is cancelled in transit.
A Channel()
constructor function has an onUndeliveredElement
optional parameter. When onUndeliveredElement
parameter is set, the corresponding function is called once for each element that was sent to the channel with the call to the send function but failed to be delivered, which can happen in the following cases:
When send operation throws an exception because it was cancelled before it had a chance to actually send the element or because the channel was closed or cancelled.
When receive, receiveOrNull, or hasNext operation throws an exception when it had retrieved the element from the channel but was cancelled before the code following the receive call resumed.
The channel was cancelled, in which case
onUndeliveredElement
is called on every remaining element in the channel's buffer.
Note, that onUndeliveredElement
function is called synchronously in an arbitrary context. It should be fast, non-blocking, and should not throw exceptions. Any exception thrown by onUndeliveredElement
is wrapped into an internal runtime exception which is either rethrown from the caller method or handed off to the exception handler in the current context (see CoroutineExceptionHandler) when one is available.
A typical usage for onUndeliveredElement
is to close a resource that is being transferred via the channel. The following code pattern guarantees that opened resources are closed even if producer, consumer, and/or channel are cancelled. Resources are never lost.
// Create the channel with onUndeliveredElement block that closes a resource
val channel = Channel<Resource>(capacity) { resource -> resource.close() }
// Producer code
val resourceToSend = openResource()
channel.send(resourceToSend)
// Consumer code
val resourceReceived = channel.receive()
try {
// work with received resource
} finally {
resourceReceived.close()
}
Note, that if you do any kind of work in between
openResource()
andchannel.send(...)
, then you should ensure that resource gets closed in case this additional code fails.