提交 a19501d7 编写于 作者: oldratlee's avatar oldratlee 🔥

add reactive integration demo

上级 18c92417
......@@ -171,6 +171,11 @@
<version>3.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.10.RELEASE</version>
</dependency>
</dependencies>
<distributionManagement>
......
......@@ -171,6 +171,11 @@
<version>3.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.10.RELEASE</version>
</dependency>
</dependencies>
<distributionManagement>
......
@file:JvmName("ReactorIntegrationDemo")
package com.alibaba.integration
import com.alibaba.ttl.TransmittableThreadLocal
import com.alibaba.ttl.threadpool.TtlExecutors
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
fun main() {
// TTL integration for Reactor
Schedulers.addExecutorServiceDecorator("TransmittableThreadLocal") { _, scheduledExecutorService ->
TtlExecutors.getTtlScheduledExecutorService(scheduledExecutorService)
}
val ttl = TransmittableThreadLocal<String?>()
ttl.set("init")
// expand thread pool
Flux.range(1, 20)
.flatMap {
Flux.just(it)
.subscribeOn(Schedulers.parallel())
.doOnNext {
Thread.sleep(2)
println("expand thread pool: [${Thread.currentThread().name}] $it ${ttl.get()}")
}
}
.collectList()
.block()
ttl.set("jerry")
Flux.just("Hello")
.subscribeOn(Schedulers.parallel())
.doOnNext {
println("[${Thread.currentThread().name}] $it ${ttl.get()}")
}
.collectList()
.block()
ttl.set("tom")
Flux.just("Hello")
.subscribeOn(Schedulers.parallel())
.doOnNext {
println("[${Thread.currentThread().name}] $it ${ttl.get()}")
}
.collectList()
.block()
}
@file:JvmName("RxJavaIntegrationDemo")
package com.alibaba.integration
import com.alibaba.ttl.TransmittableThreadLocal
import com.alibaba.ttl.TtlRunnable
import io.reactivex.Flowable
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.Schedulers
fun main() {
val ttl = TransmittableThreadLocal<String?>()
ttl.set("init")
// expand thread pool
Flowable.range(1, 20)
.flatMap {
Flowable.just(it)
.observeOn(Schedulers.computation())
.doOnNext {
Thread.sleep(2)
println("expand thread pool: [${Thread.currentThread().name}] $it ${ttl.get()}")
}
}
.toList()
.blockingGet()
// TTL integration for RxJava
RxJavaPlugins.setScheduleHandler(TtlRunnable::get)
ttl.set("jerry")
Flowable.just("Hello")
.observeOn(Schedulers.computation())
.doOnNext {
println("[${Thread.currentThread().name}] $it ${ttl.get()}")
}
.toList()
.blockingGet()
ttl.set("tom")
Flowable.just("Hello")
.subscribeOn(Schedulers.computation())
.doOnNext {
println("[${Thread.currentThread().name}] $it ${ttl.get()}")
}
.toList()
.blockingGet()
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册