fun <T> Flow<T>.conflate(): Flow<T>

Conflates flow emissions via conflated channel and runs collector in a separate coroutine. The effect of this is that emitter is never suspended due to a slow collector, but collector always gets the most recent value emitted.

For example, consider the flow that emits integers from 1 to 30 with 100 ms delay between them:

val flow = flow {
for (i in 1..30) {

Applying conflate() operator to it allows a collector that delays 1 second on each element to get integers 1, 10, 20, 30:

val result = flow.conflate().onEach { delay(1000) }.toList()
assertEquals(listOf(1, 10, 20, 30), result)

Note that conflate operator is a shortcut for buffer with capacity of Channel.CONFLATED, which is, in turn, a shortcut to a buffer that only keeps the latest element as created by buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST).

Operator fusion

Adjacent applications of conflate/buffer, channelFlow, flowOn and produceIn are always fused so that only one properly configured channel is used for execution. Conflation takes precedence over buffer() calls with any other capacity.

Note that any instance of StateFlow already behaves as if conflate operator is applied to it, so applying conflate to a StateFlow has no effect. See StateFlow documentation on Operator Fusion.


Link copied to clipboard