Spring反应式API

Java / 2020-03-22

Spring WebFlux

  • Spring5中加入了反应式开发框架-WebFlux
  • 其实在web中加入反应式就是把之前的领域类型或领域类型集变成反应流
  • 控制层接受和返回变成反应流
  • 数据库也是存储反应流查询出来的也是反应流
  • 目前关系型数据库还不支持反应式开发
  • 但是你可以在操作数据库之前把领域类型转为反应流,这样也可以增加上层执行的速度
  • 非关系型数据库比如MongoDB是支持反映式编程的
  • 异步web框架通过事件轮询机制一个线程可以执行多个请求
  • 当请求需要执行成本高昂的操作时,事件轮询为其注册一个回调,然后去接受其他的请求

Spring WebFlux依赖

<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-webflux</artifactId>
		</dependency>

  • WebFlux默认使用的嵌入式服务器是Netty而不是Tomcat

编写反应式控制器

package com.springstudy.demo.controller;

import com.springstudy.demo.model.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.servlet.function.RouterFunctions;
import org.springframework.web.servlet.function.ServerRequest;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.validation.Valid;

import java.time.Duration;

import static jdk.nashorn.internal.runtime.PropertyDescriptor.GET;
import static org.springframework.web.servlet.function.RequestPredicates.GET;
import static org.springframework.web.servlet.function.RequestPredicates.POST;
import static org.springframework.web.servlet.function.RouterFunctions.route;

/**
 * @ClassName FluxController
 * @Description
 * @Auther liuxiansen
 * @Date 2020/3/21 7:26 下午
 **/
@RestController
@RequestMapping(value = "/api")
@CrossOrigin()
@Slf4j
public class FluxController {

    @PostMapping(value = "/login")
    public Flux<String> login(@RequestBody @Valid Flux<User> user){
        return Flux.just("success");
    }
}


就是把输入和输出变为反应流


编写函数式的控制器

package com.springstudy.demo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;

import static org.springframework.data.repository.core.support.RepositoryComposition.just;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

/**
 * @ClassName RouterFunctionConfig
 * @Description
 * @Auther liuxiansen
 * @Date 2020/3/21 7:29 下午
 **/
@Configuration
public class RouterFunctionConfig {

    @Bean
    public RouterFunction<?> helloRouterFunxtion(){
        return route(GET("/hello"),serverRequest -> ok().body(just("hello word"),String.class))
                .andRoute(POST("/index"),serverRequest -> ok().body(just("123"),String.class));
    }
}

向路由中添加访问路径

public RouterFunction<?> routerFunction(){
        return route(POST("/index"),this::index)
                .andRoute(POST("/hello"),this::hello);

    }

    public Mono<User> index(ServerRequest request){
        return Flux.fromArray(new User[1]).next();
    }

    public Mono<User> hello(ServerRequest request){
        return Flux.fromArray(new User[1]).next();
    }


反应式消费REST API

Spring5提供了WebClient 他是RestTemplate的反应式版本
WebClient可以访问和接收外部的API
WebClient的通用使用模式

  • 创建WebClient实例(或者注入WebClient实例)
  • 指定要发送请求的HTTP方法
  • 指定请求中的URI和头信息
  • 提交请求
  • 消费相应

获取资源


Mono<User> userMono = WebClient.create()
                .get()
                .uri("http://localhost:8070/api/index")
                .retrieve()
                .bodyToMono(User.class);

        userMono.subscribe(i -> System.out.println(i));

发送get请求并消费请求,retrieve是执行请求,subscribe是订阅消费请求
在消费之前也可以对反应流进行过滤筛选等操作

使用基础的URI发送请求

我们可以配置WebClient每次发送请求都使用相同的基础URI


package com.springstudy.demo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;

/**
 * @ClassName WebClientConfig
 * @Description
 * @Auther liuxiansen
 * @Date 2020/3/21 9:56 下午
 **/
@Configuration
public class WebClientConfig {

    @Bean
    public WebClient webClient(){
        return WebClient.create("http://localhost:8070/api");
    }
}


这样每次发送请求都是在基础的URI上拼接上后面的地址
比如:


 @Autowired
    private WebClient webClient;



 Flux<User> userFlux = webClient
                .post()
                .uri("/login")
                .retrieve()
                .bodyToFlux(User.class);

        userFlux.timeout(Duration.ofMillis(100))
                .subscribe(s -> System.out.println(s),e -> {
                    System.out.println("timeout");
                });

首先需要注入刚才的WebClient bean
这里请求的地址其实是:http://localhost:8070/api/login
并且对请求进行了超时处理,就是对反应流进行时间的限制,0.1秒后关闭请求并输出错误

发送资源

public void sendFlux(){

 @Autowired
    private WebClient webClient;


        //发送资源
        Flux<User> userFlux = Flux.just(new User());

        Flux<User> result = webClient
                .post()
                .uri("/login")
                .body(userFlux,User.class)
                .retrieve()
                .bodyToFlux(User.class);
        
        result.subscribe(s -> System.out.println(s));
    }

首先也是需要注入bean
然后发送post请求 body是在发送的请求体中加入的内容
消费返回的反应流并输出

删除资源


public void deleteFlux(){
        
        //删除
        Mono<Void> deleteFlux = webClient
                .delete()
                .uri("/delete/{}",new User())
                .retrieve()
                .bodyToMono(Void.class) ;
    
        
    }

处理错误

public void handleError(){
        
        //处理错误
        Mono<Void> deleteFlux = webClient
                .delete()
                .uri("/delete/{}",new User())
                .retrieve()
                .bodyToMono(Void.class);
        
        deleteFlux.subscribe(
                s -> {
                    System.out.println(s);
                },
                error -> {
                    log.error(String.valueOf(error));
                }
        );
    }

在subscribe 注册消费者的时候也要注册一个错误消费者

也可以使用onStatus判断请求是否失败

//使用状态码判断错误
        Mono<Void> deleteMono = webClient
                .delete()
                .uri("/delete/{}",new User())
                .retrieve()
                .onStatus(HttpStatus::is4xxClientError,
                            error -> Mono.just(new UnknownError())
                        )
                .bodyToMono(Void.class);

如果请求响应的是400级别的 返回一个记录错误的反应流

也可以确定到 404

//使用状态码判断错误
        Mono<Void> deleteMono1 = webClient
                .delete()
                .uri("/delete/{}",new User())
                .retrieve()
                .onStatus(status -> status == HttpStatus.NOT_FOUND,
                        error -> Mono.just(new UnknownError())
                )
                .bodyToMono(Void.class);

NOT_FOUND 就是请求响应是404

保护反应式Web Api

使用SpringSecuri 保护WebApi

引入依赖

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-security</artifactId>
		</dependency>

使用WebSecurity 保护api时 反应式api和非反应式api有一些不同

如果是非反应式api

package com.springstudy.demo.config;

import com.springstudy.demo.service.UserService;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;

/**
 * @ClassName SecurityConfig
 * @Description
 * @Auther liuxiansen
 * @Date 2020/3/18 8:52 下午
 **/
@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {

    @Override
    protected void configure(AuthenticationManagerBuilder auth) throws Exception {
//        auth.inMemoryAuthentication()
//                .withUser("liujijiang")
//                .password("liujijiang123")
//                .authorities("ROLE_USER")
//                .and()
//                .withUser("jinweizhi")
//                .password("liujijiang123")
//                .authorities("ROLE_USER");
//
//        auth.ldapAuthentication()
//                .userSearchFilter("(uid={0})")
//                .groupSearchFilter("member={0}");
        auth.userDetailsService(userDetailsService());
    }

    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http.authorizeRequests()
                .antMatchers("/design","/orders")
                .hasRole("ROLE_USER")
                .antMatchers("/","/**")
                .permitAll()
                .and()
                .formLogin()
                .loginPage("/login")
                .defaultSuccessUrl("/design",true);
    }
}


当访问/design /orders 用户需要有权限ROLE_USER
并且自定义了登录页

下面是保护反应式的APi

package com.springstudy.demo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;

/**
 * @ClassName WebFluxSecurityConfig
 * @Description
 * @Auther liuxiansen
 * @Date 2020/3/21 10:17 下午
 **/
@Configuration
public class WebFluxSecurityConfig {

    @Bean
    public SecurityWebFilterChain webFilterChain(ServerHttpSecurity http){

        return http.authorizeExchange()
                .pathMatchers("/login","index").hasRole("USER")
                .anyExchange().permitAll()
                .and()
                .build();
    }
}

定义一个过滤流
因为这是声明一个bean 而不是重写框架方法,所以最后build 把所有的安全规则聚合到返回的对象中