SharedFlow

interface SharedFlow<out T> : Flow<T>

A hotFlow that shares emitted values among all its collectors in a broadcast fashion, so that all collectors get all emitted values. A shared flow is called hot because its active instance exists independently of the presence of collectors. This is opposed to a regular Flow, such as defined by the flow { ... } function, which is cold and is started separately for each collector.

Shared flow never completes. A call to Flow.collect on a shared flow never completes normally, and neither does a coroutine started by the Flow.launchIn function. An active collector of a shared flow is called a subscriber.

A subscriber of a shared flow can be cancelled. This usually happens when the scope in which the coroutine is running is cancelled. A subscriber to a shared flow is always cancellable, and checks for cancellation before each emission. Note that most terminal operators like Flow.toList would also not complete, when applied to a shared flow, but flow-truncating operators like Flow.take and Flow.takeWhile can be used on a shared flow to turn it into a completing one.

A mutable shared flow is created using the MutableSharedFlow(...) constructor function. Its state can be updated by emitting values to it and performing other operations. See the MutableSharedFlow documentation for details.

SharedFlow is useful for broadcasting events that happen inside an application to subscribers that can come and go. For example, the following class encapsulates an event bus that distributes events to all subscribers in a rendezvous manner, suspending until all subscribers receive emitted event:

class EventBus {
private val _events = MutableSharedFlow<Event>() // private mutable shared flow
val events = _events.asSharedFlow() // publicly exposed as read-only shared flow

suspend fun produceEvent(event: Event) {
_events.emit(event) // suspends until all subscribers receive it
}
}

As an alternative to the above usage with the MutableSharedFlow(...) constructor function, any coldFlow can be converted to a shared flow using the shareIn operator.

There is a specialized implementation of shared flow for the case where the most recent state value needs to be shared. See StateFlow for details.

Replay cache and buffer

A shared flow keeps a specific number of the most recent values in its replay cache. Every new subscriber first gets the values from the replay cache and then gets new emitted values. The maximum size of the replay cache is specified when the shared flow is created by the replay parameter. A snapshot of the current replay cache is available via the replayCache property and it can be reset with the MutableSharedFlow.resetReplayCache function.

A replay cache also provides buffer for emissions to the shared flow, allowing slow subscribers to get values from the buffer without suspending emitters. The buffer space determines how much slow subscribers can lag from the fast ones. When creating a shared flow, additional buffer capacity beyond replay can be reserved using the extraBufferCapacity parameter.

A shared flow with a buffer can be configured to avoid suspension of emitters on buffer overflow using the onBufferOverflow parameter, which is equal to one of the entries of the BufferOverflow enum. When a strategy other than SUSPENDED is configured, emissions to the shared flow never suspend.

Buffer overflow condition can happen only when there is at least one subscriber that is not ready to accept the new value. In the absence of subscribers only the most recent replay values are stored and the buffer overflow behavior is never triggered and has no effect. In particular, in the absence of subscribers emitter never suspends despite BufferOverflow.SUSPEND option and BufferOverflow.DROP_LATEST option does not have effect either. Essentially, the behavior in the absence of subscribers is always similar to BufferOverflow.DROP_OLDEST, but the buffer is just of replay size (without any extraBufferCapacity).

Unbuffered shared flow

A default implementation of a shared flow that is created with MutableSharedFlow() constructor function without parameters has no replay cache nor additional buffer. emit call to such a shared flow suspends until all subscribers receive the emitted value and returns immediately if there are no subscribers. Thus, tryEmit call succeeds and returns true only if there are no subscribers (in which case the emitted value is immediately lost).

SharedFlow vs BroadcastChannel

Conceptually shared flow is similar to BroadcastChannel and is designed to completely replace it. It has the following important differences:

  • SharedFlow is simpler, because it does not have to implement all the Channel APIs, which allows for faster and simpler implementation.

  • SharedFlow supports configurable replay and buffer overflow strategy.

  • SharedFlow has a clear separation into a read-only SharedFlow interface and a MutableSharedFlow.

  • SharedFlow cannot be closed like BroadcastChannel and can never represent a failure. All errors and completion signals should be explicitly materialized if needed.

To migrate BroadcastChannel usage to SharedFlow, start by replacing usages of the BroadcastChannel(capacity) constructor with MutableSharedFlow(0, extraBufferCapacity=capacity) (broadcast channel does not replay values to new subscribers). Replace send and trySend calls with emit and tryEmit, and convert subscribers' code to flow operators.

Concurrency

All methods of shared flow are thread-safe and can be safely invoked from concurrent coroutines without external synchronization.

Operator fusion

Application of flowOn, buffer with RENDEZVOUS capacity, or cancellable operators to a shared flow has no effect.

Implementation notes

Shared flow implementation uses a lock to ensure thread-safety, but suspending collector and emitter coroutines are resumed outside of this lock to avoid dead-locks when using unconfined coroutines. Adding new subscribers has O(1) amortized cost, but emitting has O(N) cost, where N is the number of subscribers.

Not stable for inheritance

The SharedFlow interface is not stable for inheritance in 3rd party libraries, as new methods might be added to this interface in the future, but is stable for use. Use the MutableSharedFlow(replay, ...) constructor function to create an implementation.

Functions

collect
Link copied to clipboard
abstract suspend override fun collect(collector: FlowCollector<T>): Nothing

Accepts the given collector and emits values into it. This method should never be used directly. To emit values from a shared flow into a specific collector, either collector.emitAll(flow) or collect { ... } extension should be used.

Properties

replayCache
Link copied to clipboard
abstract val replayCache: List<T>

A snapshot of the replay cache.

Inheritors

MutableSharedFlow
Link copied to clipboard
StateFlow
Link copied to clipboard

Extensions

onSubscription
Link copied to clipboard
fun <T> SharedFlow<T>.onSubscription(action: suspend FlowCollector<T>.() -> Unit): SharedFlow<T>

Returns a flow that invokes the given actionafter this shared flow starts to be collected (after the subscription is registered).

Sources

common source
Link copied to clipboard