rxObservable

fun <T : Any> rxObservable(context: CoroutineContext = EmptyCoroutineContext, block: suspend ProducerScope<T>.() -> Unit): Observable<T>

Creates a cold observable that runs the given block in a coroutine. Each time the returned observable is subscribed to, it starts a new coroutine.
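A minimal sketch of the cold behavior, assuming kotlinx-coroutines-rx2 and RxJava 2 are on the classpath. The names `coldNumbers` and `starts` are hypothetical and exist only to count how often the block is started.

```kotlin
import io.reactivex.Observable
import kotlinx.coroutines.rx2.rxObservable
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical counter: records how many times the block has started.
val starts = AtomicInteger(0)

// Hypothetical helper: a cold observable that emits 1 and 2.
fun coldNumbers(): Observable<Int> = rxObservable<Int> {
    starts.incrementAndGet() // executed once per subscription
    send(1)
    send(2)
}

fun main() {
    val source = coldNumbers()
    println(starts.get()) // prints 0: nothing runs before subscription

    // Each blockingGet() subscribes, which starts a fresh coroutine.
    val first = source.toList().blockingGet()
    val second = source.toList().blockingGet()

    println(first)        // prints [1, 2]
    println(second)       // prints [1, 2]
    println(starts.get()) // prints 2: one coroutine per subscription
}
```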

The coroutine emits values (ObservableEmitter.onNext) with send, completes (ObservableEmitter.onComplete) when the coroutine completes or the channel is explicitly closed, and emits an error (ObservableEmitter.onError) if the coroutine throws an exception or closes the channel with a cause. Unsubscribing cancels the running coroutine.
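The three outcomes described above (normal completion, error, and cancellation on unsubscribe) can be sketched as follows. The helper names `countTo`, `failing`, and `endless` are hypothetical, and the example assumes RxJava 2 with kotlinx-coroutines-rx2.

```kotlin
import io.reactivex.Observable
import kotlinx.coroutines.rx2.rxObservable

// Completes normally: onComplete is signalled once the block returns.
fun countTo(n: Int): Observable<Int> = rxObservable<Int> {
    for (i in 1..n) send(i)
}

// Emits one value, then throws; the exception is delivered through onError.
fun failing(): Observable<Int> = rxObservable<Int> {
    send(1)
    throw IllegalStateException("boom")
}

// Never completes on its own; it stops only when the subscriber disposes.
fun endless(): Observable<Int> = rxObservable<Int> {
    var i = 0
    while (true) send(i++)
}

fun main() {
    countTo(3).blockingSubscribe(
        { println("next: $it") },
        { println("error: $it") },
        { println("completed") }
    )

    failing().blockingSubscribe(
        { println("next: $it") },
        { println("error: $it") },
        { println("completed") }
    )

    // take(3) disposes the upstream after three items, which cancels the coroutine.
    println(endless().take(3).toList().blockingGet()) // prints [0, 1, 2]
}
```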

Invocations of send are suspended appropriately to ensure that onNext is not invoked concurrently. Note that Rx 2.x Observable does not support backpressure.

The coroutine context can be specified with the context argument. If the context has neither a dispatcher nor any other ContinuationInterceptor, then Dispatchers.Default is used. The method throws IllegalArgumentException if the provided context contains a Job instance.
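As an illustration of the context argument, the sketch below passes an explicit dispatcher and then shows that a context containing a Job is rejected. The helpers `threadNames` and `rejectsJob` are hypothetical, and the exact thread name printed depends on the runtime.

```kotlin
import io.reactivex.Observable
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.rx2.rxObservable

// Runs the block on Dispatchers.IO instead of the default dispatcher.
fun threadNames(): Observable<String> = rxObservable<String>(Dispatchers.IO) {
    send(Thread.currentThread().name)
}

// Returns true if rxObservable refuses a context that contains a Job.
fun rejectsJob(): Boolean =
    try {
        rxObservable<Int>(Job()) { send(1) }
        false
    } catch (e: IllegalArgumentException) {
        true
    }

fun main() {
    // The name of the thread the block ran on; the value varies by runtime.
    println(threadNames().blockingFirst())

    println(rejectsJob()) // prints true
}
```

The lifecycle of the coroutine is tied to the subscription's Disposable rather than to a caller-supplied Job, which is why a Job in the context is treated as an error.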