conflate

fun <T> Flow<T>.conflate(): Flow<T>(source)

Conflates flow emissions via conflated channel and runs collector in a separate coroutine. The effect of this is that emitter is never suspended due to a slow collector, but collector always gets the most recent value emitted.

This is a shortcut for buffer(capacity = 0, onBufferOverflow = BufferOverflow.DROP_OLDEST). See the buffer operator for other configuration options.

For example, consider the flow that emits integers from 1 to 30 with 100 ms delay between them:

val flow = flow {
    for (i in 1..30) {
        delay(100)
        emit(i)
    }
}

Applying conflate() operator to it allows a collector that delays 1 second on each element to get integers 1, 10, 20, 30:

val result = flow.conflate().onEach { delay(1000) }.toList()
assertEquals(listOf(1, 10, 20, 30), result)

Note that conflate operator is a shortcut for buffer with capacity of Channel.CONFLATED, which is, in turn, a shortcut to a buffer that only keeps the latest element as created by buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST).

Operator fusion

Adjacent applications of conflate/buffer, channelFlow, flowOn and produceIn are always fused so that only one properly configured channel is used for execution.

If there was no explicit buffer size specified, then the buffer size is 0. Otherwise, the buffer size is unchanged. The strategy for buffer overflow becomes BufferOverflow.DROP_OLDEST after the application of this operator, but can be overridden later.

Note that any instance of StateFlow already behaves as if conflate operator is applied to it, so applying conflate to a StateFlow has no effect. See StateFlow documentation on Operator Fusion.