ReactorContext
Wraps Reactor's Context into a CoroutineContext element for seamless integration between Reactor and kotlinx.coroutines. Context.asCoroutineContext puts Reactor's Context elements into a CoroutineContext, which can be used to propagate the information about Reactor's Context through coroutines.
This context element is implicitly propagated through the subscriber's Context by all Reactive integrations, such as mono, flux, asFlow, asPublisher and Flow.asFlux. Functions that subscribe to a reactive stream (e.g. kotlinx.coroutines.reactive.awaitFirst) likewise propagate ReactorContext to the subscriber's Context.
Examples of Reactive context integration.
Propagating ReactorContext to Reactor's Context:
val flux = myDatabaseService.getUsers()
    .contextWrite { ctx -> println(ctx); ctx }
flux.awaitFirst() // Will print an empty Context: no ReactorContext is in scope
// Now add ReactorContext
withContext(Context.of("answer", "42").asCoroutineContext()) {
    flux.awaitFirst() // Will print a Context containing answer=42
}
Propagating subscriber's Context to ReactorContext:
val flow = flow {
    println("Reactor context in Flow: " + currentCoroutineContext()[ReactorContext])
}
// No context
flow.asFlux()
    .subscribe() // Will print "Reactor context in Flow: null"
// Add subscriber's context
flow.asFlux()
    .contextWrite { ctx -> ctx.put("answer", 42) }
    .subscribe() // Will print "Reactor context in Flow: Context{'answer'=42}"
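The two snippets above rely on an external myDatabaseService. As a self-contained illustration, the sketch below exercises both directions of propagation without it. The helper names answerFromReactorContext and answerFromCoroutine are hypothetical and exist only for this example. It assumes kotlinx-coroutines-reactor and Reactor 3.4 or later (for Mono.deferContextual) are on the classpath.

```kotlin
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactor.ReactorContext
import kotlinx.coroutines.reactor.asCoroutineContext
import kotlinx.coroutines.reactor.mono
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import reactor.core.publisher.Mono
import reactor.util.context.Context

// Hypothetical helper: a plain Reactor Mono that reads "answer" from the
// subscriber's Reactor Context, falling back to "none" when the key is absent.
fun answerFromReactorContext(): Mono<String> =
    Mono.deferContextual { view ->
        Mono.just(view.getOrEmpty<String>("answer").orElse("none"))
    }

// Hypothetical helper: a coroutine-backed Mono that reads the same key from
// the ReactorContext element of its coroutine context.
fun answerFromCoroutine(): Mono<String> = mono {
    coroutineContext[ReactorContext]
        ?.context
        ?.getOrEmpty<String>("answer")
        ?.orElse("none")
        ?: "none"
}

fun main(): Unit = runBlocking {
    // Coroutine -> Reactor: the ReactorContext installed by withContext is
    // handed to the subscriber created by awaitSingle.
    val viaAwait = withContext(Context.of("answer", "42").asCoroutineContext()) {
        answerFromReactorContext().awaitSingle()
    }
    println(viaAwait) // prints "42"

    // Reactor -> coroutine: the Context written downstream of mono { } is
    // exposed inside the coroutine as ReactorContext.
    val viaMono = answerFromCoroutine()
        .contextWrite { ctx -> ctx.put("answer", "42") }
        .awaitSingle()
    println(viaMono) // prints "42"
}
```

Reading the key with getOrEmpty rather than get keeps the helpers usable when no Context entry has been provided, in which case they return "none" instead of throwing.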