Package-level declarations

Channels — non-blocking primitives for communicating a stream of elements between coroutines.

Channels — non-blocking primitives for communicating a stream of elements between coroutines.

Channels — non-blocking primitives for communicating a stream of elements between coroutines.

Types

Link copied to clipboard

Scope for actor coroutine builder.

Link copied to clipboard

Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers that subscribe for the elements using openSubscription function and unsubscribe using ReceiveChannel.cancel function.

Link copied to clipboard

A strategy for buffer overflow handling in channels and flows that controls what is going to be sacrificed on buffer overflow:

Link copied to clipboard

Channel is a non-blocking primitive for communication between a sender (via SendChannel) and a receiver (via ReceiveChannel). Conceptually, a channel is similar to Java's java.util.concurrent.BlockingQueue, but it has suspending operations instead of blocking ones and can be closed.

Link copied to clipboard
interface ChannelIterator<out E>

Iterator for ReceiveChannel. Instances of this interface are not thread-safe and shall not be used from concurrent coroutines.

Link copied to clipboard
value class ChannelResult<out T>

A discriminated union of channel operation result. It encapsulates the successful or failed result of a channel operation or a failed operation to a closed channel with an optional cause.

Link copied to clipboard

Indicates an attempt to receive from a isClosedForReceive channel that was closed without a cause. A failed channel rethrows the original close cause exception on receive attempts.

Link copied to clipboard

Indicates an attempt to send to a isClosedForSend channel that was closed without a cause. A failed channel rethrows the original close cause exception on send attempts.

Link copied to clipboard

Broadcasts the most recently sent element (aka value) to all openSubscription subscribers.

Link copied to clipboard

Scope for the produce, callbackFlow and channelFlow builders.

Link copied to clipboard
interface ReceiveChannel<out E>

Receiver's interface to Channel.

Link copied to clipboard
interface SendChannel<in E>

Sender's interface to Channel.

Link copied to clipboard

Functions

Link copied to clipboard
fun <E> CoroutineScope.actor(context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, start: CoroutineStart = CoroutineStart.DEFAULT, onCompletion: CompletionHandler? = null, block: suspend ActorScope<E>.() -> Unit): SendChannel<E>

Launches new coroutine that is receiving messages from its mailbox channel and returns a reference to its mailbox channel as a SendChannel. The resulting object can be used to send messages to this coroutine.

Link copied to clipboard
suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {})

Suspends the current coroutine until the channel is either closed or cancelled and invokes the given block before resuming the coroutine.

Link copied to clipboard
fun <E> ReceiveChannel<E>.broadcast(capacity: Int = 1, start: CoroutineStart = CoroutineStart.LAZY): BroadcastChannel<E>

Broadcasts all elements of the channel. This function consumes all elements of the original ReceiveChannel.

fun <E> CoroutineScope.broadcast(context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 1, start: CoroutineStart = CoroutineStart.LAZY, onCompletion: CompletionHandler? = null, block: suspend ProducerScope<E>.() -> Unit): BroadcastChannel<E>

Launches new coroutine to produce a stream of values by sending them to a broadcast channel and returns a reference to the coroutine as a BroadcastChannel. The resulting object can be used to subscribe to elements produced by this coroutine.

Link copied to clipboard

Creates a broadcast channel with the specified buffer capacity.

Link copied to clipboard
fun <E> Channel(capacity: Int = RENDEZVOUS, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, onUndeliveredElement: (E) -> Unit? = null): Channel<E>

Creates a channel with the specified buffer capacity (or without a buffer by default). See Channel interface documentation for details.

Link copied to clipboard

Opens subscription to this BroadcastChannel and makes sure that the given block consumes all elements from it by always invoking cancel after the execution of the block.

inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R

Makes sure that the given block consumes all elements from the given channel by always invoking cancel after the execution of the block.

Link copied to clipboard
inline suspend fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit)

Subscribes to this BroadcastChannel and performs the specified action for each received element.

inline suspend fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit)

Performs the given action for each received element and cancels the channel after the execution of the block. If you need to iterate over the channel without consuming it, a regular for loop should be used instead.

Link copied to clipboard
inline fun <T> ChannelResult<T>.getOrElse(onFailure: (exception: Throwable?) -> T): T

Returns the encapsulated value if this instance represents success or the result of onFailure function for the encapsulated Throwable exception if it is failed or closed result.

Link copied to clipboard
inline fun <T> ChannelResult<T>.onClosed(action: (exception: Throwable?) -> Unit): ChannelResult<T>

Performs the given action on the encapsulated Throwable exception if this instance represents failure due to channel being closed. The result of ChannelResult.exceptionOrNull is passed to the action parameter. It is guaranteed that if action is invoked, then the channel is either closed for send or is closed for receive depending on the failed operation.

Link copied to clipboard
inline fun <T> ChannelResult<T>.onFailure(action: (exception: Throwable?) -> Unit): ChannelResult<T>

Performs the given action on the encapsulated Throwable exception if this instance represents failure. The result of ChannelResult.exceptionOrNull is passed to the action parameter.

Link copied to clipboard
inline fun <T> ChannelResult<T>.onSuccess(action: (value: T) -> Unit): ChannelResult<T>

Performs the given action on the encapsulated value if this instance represents success. Returns the original ChannelResult unchanged.

Link copied to clipboard
fun <E> CoroutineScope.produce(context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, block: suspend ProducerScope<E>.() -> Unit): ReceiveChannel<E>

Launches a new coroutine to produce a stream of values by sending them to a channel and returns a reference to the coroutine as a ReceiveChannel. This resulting object can be used to receive elements produced by this coroutine.

Link copied to clipboard
fun ticker(delayMillis: Long, initialDelayMillis: Long = delayMillis, context: CoroutineContext = EmptyCoroutineContext, mode: TickerMode = TickerMode.FIXED_PERIOD): ReceiveChannel<Unit>

Creates a channel that produces the first item after the given initial delay and subsequent items with the given delay between them.

Link copied to clipboard
suspend fun <E> ReceiveChannel<E>.toList(): List<E>

Returns a List containing all elements.

Link copied to clipboard

Adds element to this channel, blocking the caller while this channel is full, and returning either successful result when the element was added, or failed result representing closed channel with a corresponding exception.