<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Reactor sample project (com:test). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com</groupId>
    <artifactId>test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <!-- Pin the source encoding so the build does not depend on the platform default charset. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <!-- Reactive Streams implementation used by the samples. -->
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.2.8.RELEASE</version>
        </dependency>
        <!-- Reactor test support; test scope only. -->
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <version>3.2.8.RELEASE</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <!-- Lombok is only needed while compiling (annotation processing), hence the provided scope. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.0</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Compile for Java 8 source and bytecode level. -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
@Slf4j
public class ReactorTest {
public static void main(String[] args) {
/* Mono.fromCallable(System::currentTimeMillis)
.doOnSuccess(r -> log.info("1"))
.subscribe(r -> log.info("2"));
Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).subscribe(r -> log.info(""+r));
CompletableFuture.supplyAsync(System::currentTimeMillis).join();*/
//防止程序过早退出,放一个CountDownLatch拦住
/* CountDownLatch latch = new CountDownLatch(1);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}*/
Flux.just(1, 2, 3, 4, 5).subscribe(new Subscriber<Integer>() { // 1
@Override
public void onSubscribe(Subscription s) {
log.info("onSubscribe");
s.request(6); // 2
}
@Override
public void onNext(Integer integer) {
log.info("onNext:" + integer);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
log.info("onComplete");
}
});
}
}
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
/**
 * Demo that computes a value on a separate thread through a {@link FutureTask}
 * and then feeds that value into a {@link MySubscriber} via {@code Flux.just}.
 */
@Slf4j
public class ReactorTest {

    /**
     * Starts a {@link MyTask} on a new thread, waits for its result, publishes
     * it to a {@link MySubscriber} and logs the value the subscriber stored.
     *
     * @param args command-line arguments; not used
     * @throws ExecutionException   if the task itself threw an exception
     * @throws InterruptedException if the wait for the task result is interrupted
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Wrap the task in a FutureTask; FutureTask is a Runnable, so a plain
        // Thread (or a thread pool) can execute it.
        FutureTask<Integer> pending = new FutureTask<>(new MyTask());
        new Thread(pending).start();
        log.info("begin");

        // Blocks until the task has produced its value.
        Integer computed = pending.get();

        MySubscriber sink = new MySubscriber();
        Flux.just(computed).subscribe(sink);
        log.info("result: " + sink.getI());
    }
}
/**
 * Callable that sleeps for two seconds and then yields a random integer
 * in the range 0 (inclusive) to 5 (exclusive).
 */
@Slf4j
class MyTask implements Callable<Integer> {

    /**
     * Performs the task's work.
     *
     * @return the randomly chosen value
     * @throws Exception if the sleep is interrupted
     */
    @Override
    public Integer call() throws Exception {
        Thread.sleep(2000);
        final int picked = new Random().nextInt(5);
        log.info("set i =" + picked);
        return picked;
    }
}
@Slf4j
class MySubscriber implements Subscriber<Integer>{
private Integer i;
@Override
public void onSubscribe(Subscription s) {
log.info("onSubscribe");
s.request(2);
}
@Override
public void onNext(Integer integer) {
log.info("onNext "+integer);
i = integer;
log.info("onNext i="+i);
}
@Override
public void onError(Throwable throwable) {
log.info("onError");
}
@Override
public void onComplete() {
log.info("onComplete");
}
public Integer getI() {
return i;
}
}
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.CountDownLatch;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
@Slf4j
public class ReactorTestOld {
public static void main(String[] args) {
// Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).subscribe(r -> log.info(""+r));
Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).subscribe(new Subscriber<Long>() { // 1
@Override
public void onSubscribe(Subscription s) {
log.info("onSubscribe");
//s.request(3); // 2
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Long integer) {
log.info("onNext:" + integer);
}
@Override
public void onError(Throwable e) {
log.error("onError:" ,e);
}
@Override
public void onComplete() {
log.info("onComplete");
}
});
CountDownLatch latch = new CountDownLatch(1);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Maven build descriptor for the Reactor sample project (com:test, jar packaging). -->
    <modelVersion>4.0.0</modelVersion>
    <groupId>com</groupId>
    <artifactId>test</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>test</name>
    <url>http://maven.apache.org</url>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <!-- <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>remove the default logging configuration
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency> pull in the log4j2 dependency
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.retry</groupId>
            <artifactId>spring-retry</artifactId>
        </dependency> -->
        <!--<dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.1.7</version>
        </dependency>-->
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.2.8.RELEASE</version>
        </dependency>
        <!-- commons-lang3 is published under the org.apache.commons group id. -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.1</version>
        </dependency>
        <!-- Logging dependencies: SLF4J API with the Logback backend. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Compile at the Java level configured in the java.version property. -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
package com.test.reactor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
 * Scratch class for experimenting with Reactor {@link Flux}/{@link Mono}
 * operators and schedulers.
 */
@Slf4j
public class MonoTest {

    /**
     * Logs the values 1 to 4 from {@code Flux.range}; when that stream
     * completes, subscribes a second {@code Mono.just(6)} on the elastic
     * scheduler and logs its value as well.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        /*Mono.fromCallable(System::currentTimeMillis)
        .flatMap(time -> Mono.first(serviceA.findRecent(time), serviceB.findRecent(time)))
        .timeout(Duration.ofSeconds(3), errorHandler::fallback)
        .doOnSuccess(r -> serviceM.incrementSuccess())
        .subscribe(System.out::println);*/
        /* Flux.range(1,4)
        .filter(e -> {
        log.info("filter thread:[{}]",Thread.currentThread().getName());
        return e % 2 == 0;
        })
        .subscribeOn(Schedulers.newSingle("newSingle1"))
        .publishOn(Schedulers.newSingle("newSingle2"))
        .subscribe(e-> {
        log.info("log thread:[{}]",Thread.currentThread().getName());
        System.out.println(e);});*/
        Flux.range(1, 4)
                //.delayUntil(MonoTest::request)
                /*.subscribeOn(Schedulers.elastic())
                .filter(e -> {
                log.info("filter thread:[{}]",Thread.currentThread().getName());
                return e % 2 == 0;
                })*/
                // .publishOn(Schedulers.newSingle("newSingle2"))
                // NOTE(review): main returns right after subscribe(); whether the value
                // published on the elastic scheduler is always logged before the JVM
                // exits should be confirmed (see the commented-out LockSupport.park()).
                .doOnComplete(() -> Mono.just(6).subscribeOn(Schedulers.elastic()).subscribe(e -> log.info("" + e)))
                .subscribe(e -> log.info("" + e));
        //LockSupport.park();
    }

    /**
     * Builds a Mono that, when subscribed, sleeps for five seconds on a
     * dedicated single-threaded scheduler, logs the executing thread's name
     * and then completes without a value.
     *
     * @param i not used by the current implementation
     * @return a Mono that completes after the delayed work has run
     */
    public static Mono<Void> request(int i) {
        // Simulate a blocking call by sleeping on a separate thread.
        // NOTE(review): a new scheduler is created on every call and never
        // disposed here; confirm whether that is acceptable for callers.
        return Mono.fromRunnable(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag rather than swallowing the interruption.
                Thread.currentThread().interrupt();
                log.warn("interrupted while sleeping", e);
            }
            log.info("log thread:[{}]", Thread.currentThread().getName());
        })
                .subscribeOn(Schedulers.newSingle("newSingle"))
                .then();
    }
}