Package kotlinx.coroutines.reactor

Utilities for Reactor.

Types

ReactorContext
class ReactorContext(context: Context) : AbstractCoroutineContextElement

Wraps Reactor's Context into a CoroutineContext element for seamless integration between Reactor and kotlinx.coroutines. Context.asCoroutineContext puts Reactor's Context elements into a CoroutineContext, which can be used to propagate the information about Reactor's Context through coroutines.
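As an illustration of this propagation, the sketch below reads a value from the subscriber's Reactor Context inside a coroutine started by the mono builder. The key "requestId", its value, and the helper currentRequestId are hypothetical names chosen for this example, and contextWrite assumes Reactor 3.4 or later.

```kotlin
import kotlinx.coroutines.reactor.ReactorContext
import kotlinx.coroutines.reactor.mono
import reactor.util.context.Context
import kotlin.coroutines.coroutineContext

// Hypothetical helper: looks up the "requestId" key in the Reactor Context
// carried by the current coroutine, falling back to "unknown".
suspend fun currentRequestId(): String =
    coroutineContext[ReactorContext]?.context?.getOrDefault("requestId", "unknown") ?: "unknown"

fun main() {
    val result = mono { currentRequestId() }
        // The subscriber's Context is made visible to the coroutine as ReactorContext.
        .contextWrite(Context.of("requestId", "abc-123"))
        .block()
    println(result) // abc-123
}
```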

SchedulerCoroutineDispatcher
class SchedulerCoroutineDispatcher(scheduler: Scheduler) : CoroutineDispatcher, Delay

Implements CoroutineDispatcher on top of an arbitrary Scheduler.

Functions

asCoroutineContext
fun ContextView.asCoroutineContext(): ReactorContext

Wraps the given ContextView into ReactorContext, so it can be added to the coroutine's context and later used via coroutineContext[ReactorContext].
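A minimal sketch of the opposite direction: a Reactor Context is wrapped explicitly and handed to a coroutine, which then reads it back through coroutineContext[ReactorContext]. The key "tenant", the value "acme", and the function name tenantFromCoroutine are assumptions made for this example.

```kotlin
import kotlinx.coroutines.reactor.ReactorContext
import kotlinx.coroutines.reactor.asCoroutineContext
import kotlinx.coroutines.runBlocking
import reactor.util.context.Context

// Hypothetical example: wrap a Reactor Context and pass it to a coroutine.
fun tenantFromCoroutine(): String? {
    val reactorContext = Context.of("tenant", "acme").asCoroutineContext()
    return runBlocking(reactorContext) {
        // The wrapped Context is available as an element of the coroutine context.
        coroutineContext[ReactorContext]?.context?.get<String>("tenant")
    }
}

fun main() {
    println(tenantFromCoroutine()) // acme
}
```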

asCoroutineDispatcher
fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher

Converts an instance of Scheduler to an implementation of CoroutineDispatcher.
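The conversion might be used as follows to run a coroutine on a Reactor Scheduler. This is a sketch only: the scheduler name "demo-scheduler" and the function threadNameOnScheduler are hypothetical, and the scheduler is disposed explicitly so its thread does not keep the JVM alive.

```kotlin
import kotlinx.coroutines.reactor.asCoroutineDispatcher
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import reactor.core.scheduler.Schedulers

// Hypothetical example: returns the name of the thread that ran the coroutine body.
fun threadNameOnScheduler(): String = runBlocking {
    val scheduler = Schedulers.newSingle("demo-scheduler")
    try {
        // Code inside withContext is dispatched onto the Reactor Scheduler.
        withContext(scheduler.asCoroutineDispatcher()) { Thread.currentThread().name }
    } finally {
        scheduler.dispose()
    }
}

fun main() {
    println(threadNameOnScheduler())
}
```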

asFlux
fun <T : Any> Flow<T>.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux<T>

Converts the given flow to a cold flux. The original flow is cancelled when the flux subscriber is disposed.
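For instance, a flow can be converted and then processed with ordinary Flux operators. The function name collectViaFlux is hypothetical, and block() is used here only to keep the sketch short.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.reactor.asFlux

// Hypothetical example: the flow is collected anew for each subscription to the flux.
fun collectViaFlux(): List<Int>? =
    flowOf(1, 2, 3)
        .asFlux()
        .map { it * 10 }
        .collectList()
        .block()

fun main() {
    println(collectViaFlux()) // [10, 20, 30]
}
```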

asMono
fun <T> Deferred<T?>.asMono(context: CoroutineContext): Mono<T>

Converts this deferred value to a hot reactive mono that signals success or error.

fun Job.asMono(context: CoroutineContext): Mono<Unit>

Converts this job to a hot reactive mono that signals success when the corresponding job completes.
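A short sketch of the Deferred overload, assuming a CompletableDeferred that is completed by hand so the example stays deterministic. The function name deferredAsMono and the choice of Dispatchers.Unconfined as the context are illustrative only.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.reactor.asMono

// Hypothetical example: the deferred completes on its own schedule ("hot");
// the mono only observes its outcome.
fun deferredAsMono(): String? {
    val deferred = CompletableDeferred<String?>()
    val mono = deferred.asMono(Dispatchers.Unconfined)
    deferred.complete("done")
    return mono.block()
}

fun main() {
    println(deferredAsMono()) // done
}
```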

awaitSingle
suspend fun <T> Mono<T>.awaitSingle(): T

Awaits the single value from the given Mono without blocking the thread and returns the resulting value, or, if this Mono has produced an error, throws the corresponding exception. If the Mono completed without a value, NoSuchElementException is thrown.

awaitSingleOrNull
suspend fun <T> Mono<T>.awaitSingleOrNull(): T?

Awaits the single value from the given Mono without blocking the thread and returns the resulting value, or, if this Mono has produced an error, throws the corresponding exception. If the Mono completed without a value, null is returned.
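The difference between awaitSingle and awaitSingleOrNull can be sketched as follows. The function name awaitExamples and the sample values are hypothetical.

```kotlin
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.awaitSingleOrNull
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Mono

// Hypothetical example: a value, an empty Mono, and a failing Mono.
fun awaitExamples(): Triple<String, String?, String> = runBlocking {
    val value = Mono.just("hello").awaitSingle()
    // An empty Mono yields null instead of an exception.
    val missing = Mono.empty<String>().awaitSingleOrNull()
    // An error signal is rethrown as an exception at the suspension point.
    val failure = try {
        Mono.error<String>(IllegalStateException("boom")).awaitSingle()
    } catch (e: IllegalStateException) {
        "caught: ${e.message}"
    }
    Triple(value, missing, failure)
}

fun main() {
    println(awaitExamples()) // (hello, null, caught: boom)
}
```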

flux
fun <T> flux(context: CoroutineContext = EmptyCoroutineContext, block: suspend ProducerScope<T>.() -> Unit): Flux<T>

Creates a cold reactive Flux that runs the given block in a coroutine. Every time the returned flux is subscribed, it starts a new coroutine in the specified context. The coroutine emits values (Subscriber.onNext) with send and completes the flux (Subscriber.onComplete) when the coroutine completes. If the coroutine throws an exception or the channel is closed, it emits the error (Subscriber.onError) and closes the channel with the cause. Unsubscribing cancels the running coroutine.
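A minimal sketch of the builder, with a hypothetical function squares that emits a few values through send and then completes normally:

```kotlin
import kotlinx.coroutines.reactor.flux
import reactor.core.publisher.Flux

// Hypothetical example: each subscription starts a fresh coroutine running this block.
fun squares(n: Int): Flux<Int> = flux<Int> {
    for (i in 1..n) {
        send(i * i) // delivered to the subscriber via onNext
    }
    // Returning from the block completes the flux (onComplete).
}

fun main() {
    println(squares(3).collectList().block()) // [1, 4, 9]
}
```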

mono
fun <T> mono(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T?): Mono<T>

Creates a cold mono that runs the given block in a coroutine and emits its result. Every time the returned mono is subscribed, it starts a new coroutine. If the result of block is null, MonoSink.success is invoked without a value. Unsubscribing cancels the running coroutine.
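The cold behaviour and the null case might be demonstrated like this. The function names subscribeTwice and emptyResult are hypothetical; the counter exists only to make each new coroutine start observable.

```kotlin
import kotlinx.coroutines.reactor.mono
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical example: the block runs once per subscription, so the counter
// advances each time the same mono is subscribed.
fun subscribeTwice(): Pair<Int?, Int?> {
    val starts = AtomicInteger()
    val counted: Mono<Int> = mono { starts.incrementAndGet() }
    return counted.block() to counted.block()
}

// A null result completes the mono without a value, so block() returns null.
fun emptyResult(): String? = mono<String> { null }.block()

fun main() {
    println(subscribeTwice()) // (1, 2)
    println(emptyResult())    // null
}
```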