Spring WebFlux
- Spring5中加入了反应式开发框架-WebFlux
- 其实在web中加入反应式就是把之前的领域类型或领域类型集变成反应流
- 控制层接受和返回变成反应流
- 数据库也是存储反应流查询出来的也是反应流
- 目前关系型数据库还不支持反应式开发
- 但是你可以在操作数据库之前把领域类型转为反应流,这样也可以增加上层执行的速度
- 非关系型数据库比如MongoDB是支持反映式编程的
- 异步web框架通过事件轮询机制一个线程可以执行多个请求
- 当请求需要执行成本高昂的操作时,事件轮询为其注册一个回调,然后去接受其他的请求
Spring WebFlux依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
- WebFlux默认使用的嵌入式服务器是Netty而不是Tomcat
编写反应式控制器
package com.springstudy.demo.controller;
import com.springstudy.demo.model.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.servlet.function.RouterFunctions;
import org.springframework.web.servlet.function.ServerRequest;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.validation.Valid;
import java.time.Duration;
import static jdk.nashorn.internal.runtime.PropertyDescriptor.GET;
import static org.springframework.web.servlet.function.RequestPredicates.GET;
import static org.springframework.web.servlet.function.RequestPredicates.POST;
import static org.springframework.web.servlet.function.RouterFunctions.route;
/**
* @ClassName FluxController
* @Description
* @Auther liuxiansen
* @Date 2020/3/21 7:26 下午
**/
@RestController
@RequestMapping(value = "/api")
@CrossOrigin()
@Slf4j
public class FluxController {
@PostMapping(value = "/login")
public Flux<String> login(@RequestBody @Valid Flux<User> user){
return Flux.just("success");
}
}
就是把输入和输出变为反应流
编写函数式的控制器
package com.springstudy.demo.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import static org.springframework.data.repository.core.support.RepositoryComposition.just;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
/**
* @ClassName RouterFunctionConfig
* @Description
* @Auther liuxiansen
* @Date 2020/3/21 7:29 下午
**/
@Configuration
public class RouterFunctionConfig {
@Bean
public RouterFunction<?> helloRouterFunxtion(){
return route(GET("/hello"),serverRequest -> ok().body(just("hello word"),String.class))
.andRoute(POST("/index"),serverRequest -> ok().body(just("123"),String.class));
}
}
向路由中添加访问路径
public RouterFunction<?> routerFunction(){
return route(POST("/index"),this::index)
.andRoute(POST("/hello"),this::hello);
}
public Mono<User> index(ServerRequest request){
return Flux.fromArray(new User[1]).next();
}
public Mono<User> hello(ServerRequest request){
return Flux.fromArray(new User[1]).next();
}
反应式消费REST API
Spring5提供了WebClient 他是RestTemplate的反应式版本
WebClient可以访问和接收外部的API
WebClient的通用使用模式
- 创建WebClient实例(或者注入WebClient实例)
- 指定要发送请求的HTTP方法
- 指定请求中的URI和头信息
- 提交请求
- 消费相应
获取资源
Mono<User> userMono = WebClient.create()
.get()
.uri("http://localhost:8070/api/index")
.retrieve()
.bodyToMono(User.class);
userMono.subscribe(i -> System.out.println(i));
发送get请求并消费请求,retrieve是执行请求,subscribe是订阅消费请求
在消费之前也可以对反应流进行过滤筛选等操作
使用基础的URI发送请求
我们可以配置WebClient每次发送请求都使用相同的基础URI
package com.springstudy.demo.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
/**
* @ClassName WebClientConfig
* @Description
* @Auther liuxiansen
* @Date 2020/3/21 9:56 下午
**/
@Configuration
public class WebClientConfig {
@Bean
public WebClient webClient(){
return WebClient.create("http://localhost:8070/api");
}
}
这样每次发送请求都是在基础的URI上拼接上后面的地址
比如:
@Autowired
private WebClient webClient;
Flux<User> userFlux = webClient
.post()
.uri("/login")
.retrieve()
.bodyToFlux(User.class);
userFlux.timeout(Duration.ofMillis(100))
.subscribe(s -> System.out.println(s),e -> {
System.out.println("timeout");
});
首先需要注入刚才的WebClient bean
这里请求的地址其实是:http://localhost:8070/api/login
并且对请求进行了超时处理,就是对反应流进行时间的限制,0.1秒后关闭请求并输出错误
发送资源
public void sendFlux(){
@Autowired
private WebClient webClient;
//发送资源
Flux<User> userFlux = Flux.just(new User());
Flux<User> result = webClient
.post()
.uri("/login")
.body(userFlux,User.class)
.retrieve()
.bodyToFlux(User.class);
result.subscribe(s -> System.out.println(s));
}
首先也是需要注入bean
然后发送post请求 body是在发送的请求体中加入的内容
消费返回的反应流并输出
删除资源
public void deleteFlux(){
//删除
Mono<Void> deleteFlux = webClient
.delete()
.uri("/delete/{}",new User())
.retrieve()
.bodyToMono(Void.class) ;
}
处理错误
public void handleError(){
//处理错误
Mono<Void> deleteFlux = webClient
.delete()
.uri("/delete/{}",new User())
.retrieve()
.bodyToMono(Void.class);
deleteFlux.subscribe(
s -> {
System.out.println(s);
},
error -> {
log.error(String.valueOf(error));
}
);
}
在subscribe 注册消费者的时候也要注册一个错误消费者
也可以使用onStatus判断请求是否失败
//使用状态码判断错误
Mono<Void> deleteMono = webClient
.delete()
.uri("/delete/{}",new User())
.retrieve()
.onStatus(HttpStatus::is4xxClientError,
error -> Mono.just(new UnknownError())
)
.bodyToMono(Void.class);
如果请求响应的是400级别的 返回一个记录错误的反应流
也可以确定到 404
//使用状态码判断错误
Mono<Void> deleteMono1 = webClient
.delete()
.uri("/delete/{}",new User())
.retrieve()
.onStatus(status -> status == HttpStatus.NOT_FOUND,
error -> Mono.just(new UnknownError())
)
.bodyToMono(Void.class);
NOT_FOUND 就是请求响应是404
保护反应式Web Api
使用SpringSecuri 保护WebApi
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
使用WebSecurity 保护api时 反应式api和非反应式api有一些不同
如果是非反应式api
package com.springstudy.demo.config;
import com.springstudy.demo.service.UserService;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
/**
* @ClassName SecurityConfig
* @Description
* @Auther liuxiansen
* @Date 2020/3/18 8:52 下午
**/
@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {
@Override
protected void configure(AuthenticationManagerBuilder auth) throws Exception {
// auth.inMemoryAuthentication()
// .withUser("liujijiang")
// .password("liujijiang123")
// .authorities("ROLE_USER")
// .and()
// .withUser("jinweizhi")
// .password("liujijiang123")
// .authorities("ROLE_USER");
//
// auth.ldapAuthentication()
// .userSearchFilter("(uid={0})")
// .groupSearchFilter("member={0}");
auth.userDetailsService(userDetailsService());
}
@Override
protected void configure(HttpSecurity http) throws Exception {
http.authorizeRequests()
.antMatchers("/design","/orders")
.hasRole("ROLE_USER")
.antMatchers("/","/**")
.permitAll()
.and()
.formLogin()
.loginPage("/login")
.defaultSuccessUrl("/design",true);
}
}
当访问/design /orders 用户需要有权限ROLE_USER
并且自定义了登录页
下面是保护反应式的APi
package com.springstudy.demo.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;
/**
* @ClassName WebFluxSecurityConfig
* @Description
* @Auther liuxiansen
* @Date 2020/3/21 10:17 下午
**/
@Configuration
public class WebFluxSecurityConfig {
@Bean
public SecurityWebFilterChain webFilterChain(ServerHttpSecurity http){
return http.authorizeExchange()
.pathMatchers("/login","index").hasRole("USER")
.anyExchange().permitAll()
.and()
.build();
}
}
定义一个过滤流
因为这是声明一个bean 而不是重写框架方法,所以最后build 把所有的安全规则聚合到返回的对象中