Spring反应式编程(一)

Java / 2020-03-22

反应式和命令式

String name = "jjboy";
        String upperCaseName = name.toUpperCase();
        String hello = "hello" + upperCaseName + "!";
        System.out.println(hello);

        Mono.just("jjboy")
                .map(n -> n.toUpperCase())
                .map(n -> "hello" + n + "!")
                .subscribe(s -> System.out.println(s));
  • 第一种是命令式写法,这种写法每一步都是需要前一步执行完才能开始执行的
  • 第二种写法就是反应式写法,这两种的输出结果是一样的,反应式就是把每一步分成多个子集,第一步中的一个子集执行完这个子集就可以往下走去执行下一步,不需要等这一步的其他的子集全执行完,这样的结果就是在同一个时间点上可能每一步都在执行,就是并发执行,反应式是比命令式快的,因为没有了等待的时间

Reactor

Reactor是一个反应式变成库,他是spring5反应式编程的基础
Reactor有两个核心,一个是Mono,一个是Flux
Flux代表零个,一个,或者是多个
Mono是针对数据项不超过一个的场景进行了优化


添加依赖

		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-core</artifactId>
		</dependency>
		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-test</artifactId>
			<scope>test</scope>
		</dependency>


常见的反应式基础操作

  • 创建操作
  • 组合操作
  • 转换操作
  • 逻辑操作

创建反应式操作

1 根据对象创建

// 根据对象创建
        Flux<String> flux = Flux.just("apple","orange","grape");

        flux.subscribe(
                f -> System.out.println(f)
        );

subscribe是订阅这个反映流
2 根据集合创建

String [] fruits = new String[] {"apple","origin"};

        Flux<String> flux = Flux.fromArray(fruits);

        Stream<String> fruits2 = Stream.of("Apple","Origin");

        Flux<String> flux1 = Flux.fromStream(fruits2);


从数组和java8的stream中创建反应流
3 根据范围创建

 //根据范围创建
        Flux<Integer> flux = Flux.range(0,5);

        flux.subscribe(
                s -> System.out.println(s)
        );

// 两秒一个 取出5个
        Flux<Long> flux = Flux.interval(Duration.ofSeconds(2)).take(5);

        flux.subscribe(
                s -> System.out.println(s)
        );

使用interval创建的流是从零开始的

合并反应式类型

//合并
        Flux<String> flux1 = Flux.just("Apple","Origin");
        Flux<String> flux2 = Flux.just("123","456");

        Flux<String> flux = flux1.mergeWith(flux2);

        flux.subscribe(
                s -> System.out.println(s)
        );

转换和过滤反应式流

//跳过三个
        Flux<Integer> flux = Flux.range(0,20);

        flux.skip(3)
                .subscribe(
                        s -> System.out.println(s)
                );

        //跳过0.1秒
        Flux<Integer> flux = Flux.range(0,2)
                .delayElements(Duration.ofMillis(1))
                .skip(Duration.ofMillis(100));

        StepVerifier.create(flux)
                .verifyComplete();

        //过滤 取偶数 跳过1个后取出两个
        Flux<Integer> flux = Flux.range(0,10)
                .filter(s -> s%2==0)
                .skip(1)
                .take(2);

        flux.subscribe(
                s -> System.out.println(s)
        );

        //删除重复的
        Flux<String> flux = Flux.just("123","123","123","345")
                .distinct();

        flux.subscribe(
                s -> System.out.println(s)
        );