Package-level declarations

Utilities for Reactor.

Types

ReactorContext wraps Reactor's Context into a CoroutineContext element for seamless integration between Reactor and kotlinx.coroutines. Context.asCoroutineContext puts Reactor's Context elements into a CoroutineContext, which propagates the information held in Reactor's Context through coroutines.
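As an illustration of this propagation, the sketch below reads a value from the subscriber's Reactor Context inside a coroutine started by the mono builder. The function name greeting and the "user" key are hypothetical, and the example assumes Reactor 3.4 or later for contextWrite.

```kotlin
import kotlinx.coroutines.reactor.ReactorContext
import kotlinx.coroutines.reactor.mono
import reactor.util.context.Context

// Hypothetical helper: reads the "user" entry that the subscriber wrote
// into the Reactor Context and uses it inside the coroutine.
fun greeting(): String? =
    mono<String> {
        // The Reactor Context is available as a coroutine context element.
        val user = coroutineContext[ReactorContext]
            ?.context
            ?.getOrDefault("user", "anonymous")
        "Hello, $user"
    }
        .contextWrite(Context.of("user", "alice"))
        .block()
```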

Implements CoroutineDispatcher on top of an arbitrary Scheduler.

Functions

Wraps the given ContextView into ReactorContext, so it can be added to the coroutine's context and later used via coroutineContext[ReactorContext].
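A minimal sketch of this round trip, assuming a hypothetical "traceId" key and helper name: the Reactor Context is wrapped with asCoroutineContext, installed with withContext, and read back through coroutineContext[ReactorContext].

```kotlin
import kotlinx.coroutines.reactor.ReactorContext
import kotlinx.coroutines.reactor.asCoroutineContext
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import reactor.util.context.Context

// Hypothetical helper: stores a trace id in a Reactor Context, adds it to the
// coroutine context, and retrieves it again from inside the coroutine.
fun traceIdFromCoroutineContext(): String = runBlocking {
    val reactorContext = Context.of("traceId", "abc-123")
    withContext(reactorContext.asCoroutineContext()) {
        coroutineContext[ReactorContext]
            ?.context
            ?.getOrDefault("traceId", "missing")
            ?: "missing"
    }
}
```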

Converts an instance of Scheduler to an implementation of CoroutineDispatcher.
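The conversion might be used as follows. This is a sketch that assumes a dedicated single-threaded scheduler named "reactor-worker" (an illustrative name) and a hypothetical helper function; the coroutine body then runs on that scheduler's thread.

```kotlin
import kotlinx.coroutines.reactor.asCoroutineDispatcher
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import reactor.core.scheduler.Schedulers

// Hypothetical helper: runs a block on a Reactor scheduler and reports
// the name of the thread that executed it.
fun threadNameOnScheduler(): String = runBlocking {
    val scheduler = Schedulers.newSingle("reactor-worker")
    try {
        withContext(scheduler.asCoroutineDispatcher()) {
            Thread.currentThread().name
        }
    } finally {
        // The scheduler was created here, so it is disposed here as well.
        scheduler.dispose()
    }
}
```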

fun <T : Any> Flow<T>.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux<T>

Converts the given flow to a cold flux. The original flow is cancelled when the flux subscriber is disposed.
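For example, a small flow can be exposed to Reactor code like this. The helper name is hypothetical, and block() is used only to keep the sketch short.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.reactor.asFlux

// Hypothetical helper: converts a flow to a Flux and gathers its elements.
// Each subscription collects the flow again, because the Flux is cold.
fun collectedNumbers(): List<Int>? =
    flowOf(1, 2, 3)
        .asFlux()
        .collectList()
        .block()
```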

fun <T> Deferred<T?>.asMono(context: CoroutineContext): Mono<T>

Converts this deferred value to a hot reactive Mono that signals success or error.

Job.asMono converts this job to a hot reactive Mono that signals success when the corresponding job completes.
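A sketch of the Deferred conversion, with a hypothetical helper name. The Mono is hot in the sense that the async computation is already started whether or not anyone subscribes. The result is awaited with awaitSingle rather than block(), since blocking the runBlocking thread would prevent the async coroutine from running. Dispatchers.Default is an arbitrary choice for the required context argument.

```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.reactor.asMono
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Mono

// Hypothetical helper: exposes an already-started computation as a Mono.
fun deferredToMono(): Int = runBlocking {
    val deferred: Deferred<Int> = async { 6 * 7 }
    // The context passed here must not contain a Job.
    val mono: Mono<Int> = deferred.asMono(Dispatchers.Default)
    mono.awaitSingle()
}
```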

suspend fun <T> Mono<T>.awaitSingle(): T

Awaits the single value from the given Mono without blocking the thread and returns the resulting value, or, if this Mono has produced an error, throws the corresponding exception.
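Both outcomes can be sketched as below. The helper names are hypothetical, and runBlocking is used only to call the suspending function from ordinary code.

```kotlin
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Mono

// Hypothetical helper: suspends until the Mono emits, then returns the value.
fun awaitLength(text: String): Int = runBlocking {
    Mono.just(text).map { it.length }.awaitSingle()
}

// Hypothetical helper: an error signal surfaces as a thrown exception.
fun awaitFailure(): String = runBlocking {
    try {
        Mono.error<String>(IllegalStateException("boom")).awaitSingle()
    } catch (e: IllegalStateException) {
        "caught: ${e.message}"
    }
}
```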

suspend fun <T> Mono<T>.awaitSingleOrNull(): T?

Awaits the single value from the given Mono without blocking the thread and returns the resulting value, or, if this Mono has produced an error, throws the corresponding exception. If the Mono completed without a value, null is returned.
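The empty case is the difference from awaitSingle. A brief sketch with hypothetical helper names:

```kotlin
import kotlinx.coroutines.reactor.awaitSingleOrNull
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Mono

// Hypothetical helper: a Mono that completes without a value yields null.
fun awaitEmpty(): String? = runBlocking {
    Mono.empty<String>().awaitSingleOrNull()
}

// Hypothetical helper: a Mono with a value yields that value.
fun awaitPresent(): String? = runBlocking {
    Mono.just("value").awaitSingleOrNull()
}
```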

fun <T> flux(context: CoroutineContext = EmptyCoroutineContext, block: suspend ProducerScope<T>.() -> Unit): Flux<T>

Creates a cold reactive Flux that runs the given block in a coroutine. Every time the returned flux is subscribed, it starts a new coroutine in the specified context. The coroutine emits values (Subscriber.onNext) with send and completes (Subscriber.onComplete) when the coroutine completes. If the coroutine throws an exception or the channel is closed, it emits the error (Subscriber.onError) and closes the channel with the cause. Unsubscribing cancels the running coroutine.
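A short sketch of the builder, with a hypothetical helper name. The element type is given explicitly, and the delay stands in for any suspending work between emissions.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.reactor.flux

// Hypothetical helper: emits three squares from a coroutine and collects them.
fun squares(): List<Int>? =
    flux<Int> {
        for (i in 1..3) {
            delay(10)    // suspends without blocking a thread
            send(i * i)  // delivered to the subscriber via onNext
        }
        // Returning normally from the block signals onComplete.
    }
        .collectList()
        .block()
```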

fun <T> mono(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T?): Mono<T>

Creates a cold mono that runs a given block in a coroutine and emits its result. Every time the returned mono is subscribed, it starts a new coroutine. If the result of block is null, MonoSink.success is invoked without a value. Unsubscribing cancels the running coroutine.
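The two completion modes can be sketched as follows. Helper names are hypothetical, and block() is used only to observe the result from ordinary code.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.reactor.mono

// Hypothetical helper: the block's result becomes the Mono's single value.
fun computedValue(): Int? =
    mono<Int> {
        delay(10)
        21 * 2
    }.block()

// Hypothetical helper: a null result completes the Mono without a value,
// so block() returns null.
fun emptyValue(): String? =
    mono<String> { null }.block()
```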