shareIn

fun <T> Flow<T>.shareIn(scope: CoroutineScope, started: SharingStarted, replay: Int = 0): SharedFlow<T>

Converts a cold Flow into a hot SharedFlow that is started in the given coroutine scope, sharing emissions from a single running instance of the upstream flow with multiple downstream subscribers, and replaying the number of most recent values specified by the replay parameter to new subscribers. See the SharedFlow documentation for the general concepts of shared flows.

The starting of the sharing coroutine is controlled by the started parameter. The following options are supported.

  • Eagerly — the upstream flow is started even before the first subscriber appears. Note that in this case all values emitted by the upstream beyond the most recent values, as specified by the replay parameter, will be immediately discarded.

  • Lazily — starts the upstream flow after the first subscriber appears, which guarantees that this first subscriber gets all the emitted values, while subsequent subscribers are only guaranteed to get the most recent replay values. The upstream flow continues to be active even when all subscribers disappear, but only the most recent replay values are cached without subscribers.

  • WhileSubscribed() — starts the upstream flow when the first subscriber appears, immediately stops when the last subscriber disappears, keeping the replay cache forever. It has additional optional configuration parameters as explained in its documentation.

  • A custom strategy can be supplied by implementing the SharingStarted interface.
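
As a minimal sketch of these strategies (the upstream flow, the scope, and the 5-second stop timeout are illustrative assumptions, not part of this API):

val eagerShared = upstream.shareIn(scope, SharingStarted.Eagerly)       // starts immediately, keeps running
val lazyShared = upstream.shareIn(scope, SharingStarted.Lazily)         // starts on the first subscriber, keeps running
val onDemandShared = upstream.shareIn(                                  // starts on the first subscriber,
    scope, SharingStarted.WhileSubscribed(stopTimeoutMillis = 5_000))   // stops 5 seconds after the last one leaves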

The shareIn operator is useful in situations when there is a cold flow that is expensive to create and/or to maintain, but there are multiple subscribers that need to collect its values. For example, consider a flow of messages coming from a backend over an expensive network connection that takes a lot of time to establish. Conceptually, it might be implemented like this:

val backendMessages: Flow<Message> = flow {
    connectToBackend() // takes a lot of time
    try {
        while (true) {
            emit(receiveMessageFromBackend())
        }
    } finally {
        disconnectFromBackend()
    }
}

If this flow is directly used in the application, then every time it is collected a fresh connection is established, and it will take a while before messages start flowing. However, we can share a single connection and establish it eagerly like this:

val messages: SharedFlow<Message> = backendMessages.shareIn(scope, SharingStarted.Eagerly)

Now a single connection is shared between all collectors from messages, and there is a chance that the connection is already established by the time it is needed.
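
As a sketch of the result (updateUi and logMessage are hypothetical handlers, not part of this API), two independent collectors now reuse the same single connection:

scope.launch { messages.collect { msg -> updateUi(msg) } }   // first subscriber
scope.launch { messages.collect { msg -> logMessage(msg) } } // second subscriber, shares the same connection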

Upstream completion and error handling

Normal completion of the upstream flow has no effect on subscribers, and the sharing coroutine continues to run. If a strategy like SharingStarted.WhileSubscribed is used, then the upstream can get restarted again. If a special action on upstream completion is needed, then an onCompletion operator can be used before the shareIn operator to emit a special value in this case, like this:

backendMessages
    .onCompletion { cause -> if (cause == null) emit(UpstreamHasCompletedMessage) }
    .shareIn(scope, SharingStarted.Eagerly)

Any exception in the upstream flow terminates the sharing coroutine without affecting any of the subscribers, and will be handled by the scope in which the sharing coroutine is launched. Custom exception handling can be configured by using the catch or retry operators before the shareIn operator. For example, to retry connection on any IOException with 1 second delay between attempts, use:

val messages = backendMessages
    .retry { e ->
        val shallRetry = e is IOException // other exceptions are bugs - handle them
        if (shallRetry) delay(1000)
        shallRetry
    }
    .shareIn(scope, SharingStarted.Eagerly)

Initial value

When a special initial value is needed to signal to subscribers that the upstream is still loading the data, use the onStart operator on the upstream flow. For example:

backendMessages
    .onStart { emit(UpstreamIsStartingMessage) }
    .shareIn(scope, SharingStarted.Eagerly, 1) // replay one most recent message
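
Assuming the shared flow above is assigned to a hypothetical status variable, a subscriber that appears later still receives the single most recent replayed message, which is UpstreamIsStartingMessage until the first real message arrives:

scope.launch {
    status.collect { msg -> render(msg) } // render is a hypothetical handler
}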

Buffering and conflation

The shareIn operator runs the upstream flow in a separate coroutine, and buffers emissions from upstream as explained in the buffer operator's description, using a buffer of replay size or the default (whichever is larger). This default buffering can be overridden with an explicit buffer configuration by preceding the shareIn call with buffer or conflate, for example:

  • buffer(0).shareIn(scope, started, 0) — overrides the default buffer size and creates a SharedFlow without a buffer. Effectively, it configures sequential processing between the upstream emitter and subscribers, as the emitter is suspended until all subscribers process the value. Note that the value is still immediately discarded when there are no subscribers.

  • buffer(b).shareIn(scope, started, r) — creates a SharedFlow with replay = r and extraBufferCapacity = b.

  • conflate().shareIn(scope, started, r) — creates a SharedFlow with replay = r, onBufferOverflow = DROP_OLDEST, and extraBufferCapacity = 1 when replay == 0 to support this strategy.
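
For instance, a minimal sketch of the second option above (the capacity of 64 is an arbitrary illustrative value):

val buffered: SharedFlow<Message> = backendMessages
    .buffer(64)                                // extraBufferCapacity = 64
    .shareIn(scope, SharingStarted.Eagerly, 1) // replay = 1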

Operator fusion

Application of flowOn, buffer with RENDEZVOUS capacity, or cancellable operators to the resulting shared flow has no effect.
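
In other words, the following chain behaves exactly like collecting messages directly, because these operators are fused away on a shared flow (a minimal sketch using the messages flow from the earlier example):

scope.launch {
    messages
        .flowOn(Dispatchers.Default) // no effect on a shared flow
        .buffer(Channel.RENDEZVOUS)  // no effect on a shared flow
        .cancellable()               // no effect on a shared flow
        .collect { msg -> println(msg) }
}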

Exceptions

This function throws IllegalArgumentException on unsupported values of parameters or combinations thereof.
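
For example, a negative replay value is rejected (an illustrative sketch based on the parameter constraint below):

// Throws IllegalArgumentException: replay cannot be negative.
backendMessages.shareIn(scope, SharingStarted.Eagerly, replay = -1)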

Parameters

scope
    the coroutine scope in which sharing is started.

started
    the strategy that controls when sharing is started and stopped.

replay
    the number of values replayed to new subscribers (cannot be negative, defaults to zero).