SpringMVC WebFlux 高级教程(五)

原文:Pro Spring MVC with WebFlux

协议:CC BY-NC-SA 4.0

十、使用 Spring WebFlux 构建反应式应用

在前一章中,函数式反应式编程是出于需要而引入的。Spring WebFlux 是一个用于编写 Spring 反应式应用的功能性反应式框架,为本章中的应用编写的代码是功能性的,并且使用反应式流。

Spring WebFlux 非常适合构建需要轻松流式传输的应用。上一章介绍的例子使用反应流通过 HTTP 连接向客户机发送数据。但是客户端只接收数据,没有做任何事情来控制发射速率。本章通过将部分书店应用从 MVC 迁移到 Spring WebFlux,引入一个提供新书发布信息的web client,并为应用添加聊天功能,来探索 Spring WebFlux 的功能。

从 Spring Web MVC 到 Spring WebFlux

将应用逻辑从经典迁移到反应式时,方法处理的类型会发生变化。陈述也会改变;它们不再是命令式的,而是声明式的,它们的代码只有在发出信号时才会执行。对于多层应用,可以从较低层(数据访问)到较高层(表示层),或者从较高层到较低层进行转换。为了让您热身,让我们从底层开始。

迁移数据访问层

直到不久前,关系数据库还不支持被动访问。传统的数据库 JDBC 驱动程序不是反应式的,因此在反应式应用中,它们代表了影响整个应用行为的阻塞 I/O 组件。不过,去年推出了 R2DBC 1 项目,将反应式编程 API 引入关系数据库。目前,大多数使用的关系数据库都有 R2DBC 实现。但是,即使在三次正式发布之后,这个项目仍然不稳定。

这就只剩下一个选择:放弃关系数据库,转而使用支持反应式访问的 NoSQL 数据库。你可能会使用 Spring WebFlux 来编写微服务应用。具有强类型和列表之间刚性关系的关系数据库与微服务架构的水平可伸缩性和集群化的要求不兼容。Spring 支持一些现代 NoSQL 数据库——couch base、Redis 和 Cassandra,但是 Spring 最喜欢的是 MongoDB。 2 作为本章案例研究的反应书店应用使用 MongoDB。(本章的项目包含一个 README.adoc 文件,其中包含在本地安装 MongoDB 的说明。)

在 Spring Boot 应用中,application.yml配置文件填充了允许与 MongoDB 数据库集成的属性。但是,它们对于 web 环境来说并不重要,所以这里不做介绍。

在 Spring Boot 应用中使用 MongoDB 数据库需要对项目类路径的spring-boot-starter-data-mongodb-reactive依赖和对代码的一些修改。下面列出了这些更改。

  • 实体类变成了域类,这意味着 ID 类型被限制为StringBigDecimal。如果你尝试使用任何其他类型,就会抛出一个类型为org.springframework.dao.InvalidDataAccessApiUsageException的异常。

  • 因为 NoSQL 数据库不是关系数据库,所以数据库结构改变以保持强链接的数据。

  • 数据库表成为数据库集合。

  • JPA/hibernate 注释被替换为 Spring Data MongoDB 注释。

  • JpaRepository<T, ID>扩展名被替换为ReactiveMongoRepository<T, ID>ReactiveCrudRepository<T, ID>

清单 10-1 中显示了对实现的Book类的更改。应用中的其他类也有类似的变化。

package com.apress.prospringmvc.bookstore.document;

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;
import java.math.BigDecimal;
// other imports omitted

@Document(collection="book")
public class Book {
    @Id
    private String id;

    private String title;
    private String description;
    private BigDecimal price;
    private Integer year;
    private String author;

    @Indexed(unique = true)
    private String isbn;
    private String category;

    // getters and setters omitted
}

Listing 10-1The Book MongoDB Document Class

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

ReactiveCrudRepository<T, ID>接口是org.springframework.data.repository.reactive包的一部分,包含返回Publisher<T>实现的方法声明,对于 Spring,这意味着Flux<T>Mono<T>ReactiveSortingRepository<T, ID>org.springframework.data.repository.reactive包的一部分,它扩展了ReactiveCrudRepository<T, ID>来提供一些额外的为 MongoDB 优化的方法模板。用于被动数据访问的BookRepository接口使用 MongoDB 查询从数据库中选择数据,如清单 10-2 所示。

package com.apress.prospringmvc.bookstore.repository;

import com.apress.prospringmvc.bookstore.document.Book;
import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import reactor.core.publisher.Flux;

/**
 * Created by Iuliana Cosmina on 28/06/2020
 */
public interface BookRepository extends ReactiveMongoRepository<Book, String>{

    @Query("{'category': { '$regex' : ?0 } }")
    Flux<Book> findByCategory(String category);

    @Query(value= "{}", fields ="{'id': 1, 'isbn' : 1, 'category'  :1 }")
    Flux<Book> findAllLight();
}

Listing 10-2The BookRepository Reactive Interface for Accessing the MongoDB Book Collection

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

@Query注释中的参数用于直接在存储库方法上声明查找器查询。

迁移服务层

如果应用需要一个服务层,那么它的组件也必须修改成反应性的。除此之外,实现也必须改变以适应新的数据库结构。在清单 10-3 中,描述了BookstoreServiceImpl的一些方法。findBooksByCategory(String)被修改为支持类型为String而不是Category的参数,这是因为没有Category table。结果作为Flux<Book>返回。

修改findBooks(BookSearchCriteria)来创建一个 MongoDB 查询,并将其传递给BookRepository来过滤结果。结果被返回为Flux<Book>

findOrdersForAccountId(String accountId)被修改为从account集合中获取账户实例的订单,因为没有order集合。

package com.apress.prospringmvc.bookstore.service;

import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
// other imports omitted

/**
 * Created by Iuliana Cosmina on 28/06/2020
 */
@Service
@Transactional(readOnly = true)
public class BookstoreServiceImpl implements  BookstoreService {

    @Override
    public Mono<Book> findBook(String id) {
        return this.bookRepository.findById(id);
    }

    @Override
    public Flux<Book> findBooksByCategory(String category) {
        return this.bookRepository.findByCategory(category);
    }

    @Override
    public Flux<Book> findBooks(BookSearchCriteria bookSearchCriteria) {
        Query query = new Query();
        if (bookSearchCriteria.getTitle() != null) {
            query.addCriteria(Criteria.where("title")
                .is(bookSearchCriteria.getTitle()));
        }
        if (bookSearchCriteria.getCategory() != null) {
            query.addCriteria(Criteria.where("category")
                .is(bookSearchCriteria.getTitle()));
        }
        return bookRepository.findAll(query);
    }

    @Override
    public Mono<List<Order>> findOrdersForAccountId(String accountId) {
        return this.accountRepository
            .findById(accountId).map(Account::getOrders);
    }

    //other code omitted
}

Listing 10-3The BookRepository Reactive Interface for Accessing the MongoDB Book Collection

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

一旦使用了反应式数据库,迁移数据访问和服务层就变得很容易,几乎没有什么困难。只要确保您总是返回Publisher<T>实例,就大功告成了。

迁移 Web 图层

迁移 web 图层需要进行一些更改,因为当您不知道渲染了多少数据时,渲染视图会很困难。过去,AJAX(异步 JavaScript 和 XML)解决了这个问题,但是 AJAX 使我们只能响应页面上的用户操作来更新页面。它没有解决来自服务器的更新问题。由于反应式通信涉及双向数据流,因此需要新的 web 库。做这件事的方法不止一种,所以我们开始研究吧,好吗?

反应式模板引擎的配置

让我们举一个在前面章节中使用的非常简单的例子:书店应用的index页面可以显示应用上下文中的 beans 列表。在前面的章节中,IndexController包含了一个方法,该方法使用一个包含应用上下文中所有 beans 名称的List<String>来填充模型。

要使这个控制器是可反应的,列表必须被一个Flux<String>,代替,视图也必须是可反应的。幸运的是,百里香叶可以配置为支持反应式视图。模板的语法没有改变;只有视图解析器和模板引擎必须替换为它们的反应式通信器。

在清单 10-4 中,描述了一个用于反应式百里香视图支持的 Spring 配置类。这个类有点冗长。其中设置的大多数属性值已经被声明为默认值,但是这个类是这样编写的,以便从开发的角度来看清楚什么是可定制的。

package com.apress.prospringmvc.bookstore.config;

import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.config.ViewResolverRegistry;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.thymeleaf.spring5.ISpringWebFluxTemplateEngine;
import org.thymeleaf.spring5.SpringWebFluxTemplateEngine;
import org.thymeleaf.spring5.templateresolver.SpringResourceTemplateResolver;
import org.thymeleaf.spring5.view.reactive.ThymeleafReactiveViewResolver;
import org.thymeleaf.templateresolver.ITemplateResolver;
// other imports omitted

@Configuration
public class ReactiveThymeleafWebConfig implements
            ApplicationContextAware, WebFluxConfigurer {

    ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext context) {
        this.context = context;
    }

    @Bean
    public ITemplateResolver thymeleafTemplateResolver() {
        var resolver = new SpringResourceTemplateResolver();
        resolver.setApplicationContext(this.context);
        resolver.setPrefix("classpath:templates/");
        resolver.setSuffix(".html");
        resolver.setTemplateMode(TemplateMode.HTML);
        resolver.setCacheable(false);
        resolver.setCheckExistence(false);
        return resolver;

    }

    @Bean
    public ISpringWebFluxTemplateEngine thymeleafTemplateEngine() {
        var templateEngine = new SpringWebFluxTemplateEngine();
        templateEngine.setTemplateResolver(thymeleafTemplateResolver());
        return templateEngine;
    }

    @Bean
    public ThymeleafReactiveViewResolver thymeleafReactiveViewResolver() {
        var viewResolver = new ThymeleafReactiveViewResolver();
        viewResolver.setTemplateEngine(thymeleafTemplateEngine());
        viewResolver.setOrder(1);
        viewResolver.setResponseMaxChunkSizeBytes(8192);
        return viewResolver;
    }

    @Override
    public void configureViewResolvers(ViewResolverRegistry registry) {
        registry.viewResolver(thymeleafReactiveViewResolver());
    }

}

Listing 10-4Spring Configuration Class for Reactive Thymeleaf Views Support

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

负责解析模板的模板解析器 bean 不需要是被动的。因为模板解析器 bean 包含来自应用配置的数据,所以在使用 Spring Boot 时可以完全删除它,并通过用@EnableConfigurationProperties(ThymeleafProperties.class)注释配置类来替换它。

使用模板解析器的模板引擎是反应式的,是ISpringWebFluxTemplateEngine的一个实现。为了与 Spring MVC 类型集成而设计的SpringTemplateEngine,必须替换为SpringWebFluxTemplateEngine,它是ISpringWebFluxTemplateEngine接口的一个实现,旨在与 Spring WebFlux 集成,并以一种反应友好的方式执行模板。由于模板引擎只需要一个模板解析器,我们也可以跳过这个 bean 声明,让 Spring Boot 来配置它。支持反应视图的配置类可以简化,如清单 10-5 所示。

package com.apress.prospringmvc.bookstore.config;

import org.springframework.boot.autoconfigure.thymeleaf.ThymeleafProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
// other imports omitted

@Configuration
@EnableConfigurationProperties(ThymeleafProperties.class)
public class ReactiveThymeleafWebConfig implements
         WebFluxConfigurer {

    private final ISpringWebFluxTemplateEngine thymeleafTemplateEngine;

    public ReactiveThymeleafWebConfig(ISpringWebFluxTemplateEngine templateEngine) {
        this.thymeleafTemplateEngine = templateEngine;
    }

    @Bean
    public ThymeleafReactiveViewResolver thymeleafReactiveViewResolver() {
        var viewResolver = new ThymeleafReactiveViewResolver();
        viewResolver.setTemplateEngine(thymeleafTemplateEngine);
        viewResolver.setOrder(1);
        return viewResolver;
    }
    //other code omitted
}

Listing 10-5Simplified Spring Configuration Class for Reactive Thymeleaf Views Support

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

@EnableConfigurationProperties注释支持百里香配置属性。用@ConfigurationProperties(prefix = "spring.thymeleaf"),ThymeleafProperties类进行了注释,使其成为百里香属性的配置 bean。这意味着你可以使用application.propertiesapplication.yml来配置百里香。这些属性以spring.thymeleaf为前缀,允许您配置模板解析器 bean,而无需编写额外的代码。清单 10-6 是相当于清单 10-4 中 Java 配置的 YML 配置。

spring:
  thymeleaf:
    prefix: classpath:templates/
    suffix: .html
    mode: HTML
    cache: false
    check-template: false
    reactive:
        max-chunk-size: 8192

Listing 10-6Simplified Spring Configuration Class for Reactive Thymeleaf Views Support (snippet from the application.yml file)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

有些百里香属性在ThymeleafProperties类中被赋予了默认值,所以这意味着如果你的应用使用默认值,百里香 YML 配置部分可以完全跳过,清单 10-5 中的配置仍然有效。

ThymeleafReactiveViewResolverorg.springframework.web.reactive.result.view.ViewResolver接口的实现,即 Spring WebFlux 视图解析器接口。responseMaxChunkSizeBytes是您应该感兴趣的属性,因为它定义了由百里香引擎生成并作为输出传递给服务器的输出org.springframework.core.io.buffer.DataBuffer实例的最大大小。这一点很重要,因为如果您有大量数据通过Flux<T>发送,您可能希望一点一点地呈现视图,而不是在响应完成之前保持网页处于加载状态。尤其是因为这是反应式沟通的主要思想之一。

百里香叶有三种操作模式。

  • FULL :当没有配置最大块大小,也没有数据驱动上下文变量时,模板输出在内存中作为单个块生成,然后作为响应发送。这与非反应行为非常相似。

  • CHUNKED :已建立配置的最大块大小限制,但尚未指定数据驱动程序上下文变量。模板以块的形式生成,大小大致等于配置的大小,并发送给客户端。在发送一个块后,百里香引擎停止并等待服务器请求更多的块;是的,这是背压的一种实现。

  • DATA-DRIVEN``:``data-driver变量以反应式数据流的形式包装异步对象,这意味着驱动模板的反应式友好执行。当在返回视图逻辑名称的处理程序方法中声明这种变量时,百里香引擎被设置为DATA-DRIVEN模式,这意味着解析后的视图作为数据流发送到客户端。支持数据驱动变量的模板必须包含该变量的迭代(th:each)。

重写IndexController使其反应的方法使用了一个data-driver变量。这意味着,不是向模型添加一个List<String>,而是需要一个org.thymeleaf.spring5.context.webflux.IReactiveDataDriverContextVariable的实例。这个变量包含在一个发出应用上下文 bean 名称的Flux<String>实例中。实现该接口的上下文变量以反应式数据流的形式包装Publisher<T>实例,以驱动模板的反应式友好执行。 3

为了使这种方法的结果清晰,值的发出被减慢到每 200 毫秒一次。这意味着当打开http://localhost:8080/ URL 时,您应该看到列出 bean 名称的页面部分逐渐加载。

清单 10-7 中描述了无功IndexController的实现。

package com.apress.prospringmvc.bookstore.controller;

import org.thymeleaf.spring5.context.webflux.IReactiveDataDriverContextVariable;
import org.thymeleaf.spring5.context.webflux.ReactiveDataDriverContextVariable;
import reactor.core.publisher.Flux;
// other imports omitted

@Controller
public class IndexController implements ApplicationContextAware {

    private ApplicationContext ctx;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
                    throws BeansException {
        ctx = applicationContext;
    }

    @GetMapping("/")
    public String index(final Model model) {
        List<String> beans = Arrays.stream(ctx.getBeanDefinitionNames())
            .sorted()
            .collect(Collectors.toList());
        Flux<String> flux = Flux.fromIterable(beans)
            .delayElements(Duration.ofMillis(200));
        IReactiveDataDriverContextVariable dataDriver =
                new ReactiveDataDriverContextVariable( flux,10);

        model.addAttribute("beans", dataDriver);
        return "index";
    }
}

Listing 10-7Reactive IndexController with Data-Driver Variable

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

ReactiveDataDriverContextVariableIReactiveDataDriverContextVariable接口的基本实现。使用IReactiveDataDriverContextVariable,,我们将百里香设置为数据驱动模式,这意味着 HTML 项目是以一种反应友好的方式产生的。

index.html模板不需要任何改变。Thymeleaf 使用相同的 HTML 结构迭代集合和反应性数据集。流发出的值被添加到嵌套在可滚动的<div>中的<ul>列表中,以保持页面大小等于屏幕大小。${beans}变量是对百里香数据驱动变量暴露的反应流的引用。

<!-- other HTML parts omitted -->
<div class="scrollable">
    <ul th:each="bean : ${beans}">
        <li th:text="${bean}"> </li>
    </ul>
</div>

Listing 10-8The Thymeleaf Template Snippet to Render the Bean Names (snippet from the index.html file

)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

尽管如此,逐渐加载页面并不是最好的选择,因为页面设计的某些部分直到交互通信完成后才会呈现。这使用户感觉好像他们是通过 ADSL 连接浏览这个页面。让我们看看另一种方法,在页面完全加载后,使用反应式处理程序方法和 JavaScript 函数在 HTML div 中逐渐加载这些 bean 名称。

使用服务器发送的事件(SSE)

在前一章中,提到了反应式控制器。大多数反应式控制器都用@RestController注释,这是一个组合注释,标记控制器具有返回数据而不是逻辑视图名称的处理程序方法。IndexController不能被注释,因为我们仍然需要它来解析index.html视图模板。但是,我们可以将 bean flux 提取到一个用@ResponseBody标注的反应处理程序方法中。这导致了清单 10-9 中的实现。

package com.apress.prospringmvc.bookstore.controller;

import org.springframework.web.bind.annotation.ResponseBody;
import reactor.core.publisher.Flux;
// other imports omitted

@Controller
public class IndexController implements ApplicationContextAware {

    private ApplicationContext ctx;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
        throws BeansException {
        ctx = applicationContext;
    }

    @GetMapping(path = {"/", "index.htm"})
    public String index() {
        return "index";
    }

    @ResponseBody
    @GetMapping(value = "/beans", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> getBeanNames() {
        List<String> beans = Arrays.stream(ctx.getBeanDefinitionNames())
          .sorted().collect(Collectors.toList());
        return Flux.fromIterable(beans).delayElements(Duration.ofMillis(200));
    }
}

Listing 10-9Reactive IndexController with a Method Handler Returning a Flux<T> of Data

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

如果您启动该应用,您可以通过运行以下命令来测试该方法是否返回了一系列 bean 名称。

curl -H "Accept:text/event-stream" http://localhost:8080/beans

  • 1
  • 2

Spring WebFlux 将每个 bean 名称作为服务器发送的事件发出,现在,index.html需要一些修改来处理它们并在页面上显示它们。因为新数据必须在到达时添加到页面中,所以 JavaScript 需要编写一个函数来修改最少的 beans。

因此,显示 bean 名称所需的index.html模板片段发生了变化,如清单 10-10 所示。

<!-- other HTML/JavaScript parts omitted -->
<script type="text/javascript" th:inline="javascript">
/*<![CDATA[*/
    var renderBeans = {
        source: new EventSource([[@{|/beans|}]]) ,
        start: function () {
            this.source.addEventListener("message", function (event) {
                //console.log(event);
                $("#beans").append('<li>'+ event.data +'</li>')
            });
            this.source.onerror = function () {
                this.source.close();
            };
        },
        stop: function() {
            this.source.close();
        }
    };

    $( window ).on( "load", function() {
        renderBeans.start();
    });

    $( window ).on( "onbeforeunload", function() {
        renderBeans.stop();
    });
/*]]>*/
</script>
<div class="scrollable">
    <ul id="beans">
    </ul>
</div>

Listing 10-10Thymeleaf template snippet used to display the bean names received as Server-Sent Events (snippet from the index.html file)

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

[[@{|/beans|}]]是一个百里香链接表达式,用于生成相对于应用上下文的 URL。

在清单 10-10 中,jQuery库用于编写处理服务器发送事件所需的 JavaScript 代码,这是一种服务器推送技术,使客户端能够通过 HTTP 连接从服务器接收自动更新。这意味着页面得到呈现,但连接保持打开,因此服务器可以向客户端发送更多数据。EventSource API 被标准化为 HTML5 的一部分,除了 Internet Explorer,其他所有浏览器都支持它。

百里香叶产生三种类型的小分子物质。

  • :数据以head:{prefix}_head为前缀;其中prefix值通过ReactiveDataDriverContextVariable构造函数设置。前缀用于包含迭代数据(如果有)之前的所有标记的通信期间的单个事件。例如,当您正在读取一个脸书线程时,当您打开页面时,数据库中存在的所有注释,在您打开该页面时的时间戳之前,应该已经呈现在页面中了。没有必要一一渲染*。百里香支持这种类型的初始化事件。*

  • 数据消息:数据以message :{prefix}_message为前缀;其中prefix通过ReactiveDataDriverContextVariable构造器设置。前缀用于一系列事件,数据驱动程序产生的每个值对应一个事件。例如,当您阅读脸书帖子时,其他用户在您查看页面时发表的评论会一个接一个地出现在评论部分。来自注释的数据通过 message 类型的 SSE 发送给客户机。

  • Tail :数据以tail :{prefix}_tail为前缀,其中prefix值通过ReactiveDataDriverContextVariable构造函数设置。前缀用于通信期间的单个事件,包含迭代数据(如果有)之后的所有标记。例如,假设脸书有一个选项,用户可以通过它选择停止查看新评论,这种类型的事件可以用来发送数据库中所有现有的评论,其时间戳值在最后显示的评论和用户选择停止查看新评论的时间戳之间*。* 4

在前面的代码片段中,/beans URL 被用作 SSE 流的源。使用 URL 创建一个EventSource 5 实例,它打开一个持久连接,服务器通过该连接以text/event-stream格式发送事件。连接保持打开,直到通过调用EventSource.close()关闭。那些事件被 Spring WebFlux 标记为message事件,并在EventSource实例上设置一个EventListener 6 实例来拦截那些事件,提取数据,并将其添加到一个 HTML 页面中。

bean names 流被有意减慢,以显示连续的通信。如果使用 Chrome 或 Firefox,在加载页面时,可以在开发者控制台中看到服务器发送的事件。从EventListener实例体中删除console.log(event)语句的注释。在图 10-1 中,书店应用的主页在 Firefox 中打开,你可以在开发者控制台中看到从服务器发送的数据流。

SpringMVC WebFlux 高级教程(五)

图 10-1

服务器发送的事件显示在 Firefox 的开发人员控制台中

使用服务器发送的事件将反应数据显示到百里香模板中的另一种方法是使用一个IReactiveSSEDataDriverContextVariable上下文变量。实现该接口的上下文变量以反应数据流的形式包装Publisher<T>实例,这意味着以 SSE(服务器发送事件)模式驱动模板的反应友好执行。 7 这意味着 Spring WebFlux 不会将数据包装成 SSEs 相反,它会将它们发送到百里香引擎来执行此操作。

在 Spring WebFlux 应用使用百里香叶引擎生成视图的上下文中,这意味着发出的每个值都被映射到百里香叶模板片段,百里香叶视图的一部分。当发出一个值时,百里香引擎获取数据并将其封装在片段描述的 HTML 元素中,然后作为类型为message的 SSE 发出。然后使用 JavaScript 函数将事件数据注入 HTML 页面。清单 10-11 是百里香模板的一个片段。它显示了一个名为newBooks. N<div/>元素,其内部是一个只有一行的表格,包含即将发行的一本书的详细信息。

<!-- other HTML parts omitted -->
 <div class="releases_box">
    <div class="title">
        <span class="title_icon">
            <img th:src="@{/statimg/release.ico}" alt="" title="" />
        </span>
        <th:block th:text="#{main.title.newbooks}">New Books</th:block>
    </div>
    <div id="newBooks">
    <!-- /start/ the targeted fragment -->
        <table th:each="book : ${newBooks}" class="releases_table">
            <tr>
                <td
        th:text="${book.year} + ', ' + ${book.title} + ', by ' +  ${book.author}">
                </td>
            </tr>
        </table>
    <!-- /end/ the targeted fragment -->
    </div>
</div>

Listing 10-11Thymeleaf Template Snipped to Display the Bean Names Received As Server-Sent Events (snippet from the search.html file)

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

表定义表示每次发出值时由百里香引擎重写的片段。这是使用类似于清单 10-10 中描述的 JavaScript 函数完成的,只是这个函数用事件数据覆盖了newBooks div 的 HTML 内容。清单 10-12 中描述了包含实现所需反应行为的函数的renderBooks变量。

// other HTML/JavaScript parts omitted
var renderBooks = {
    source: new EventSource([[@{|/book/new|}]]) ,
    start: function () {
        renderBooks.source.addEventListener("message", function (event) {
            //console.log(event);
            $("#newBooks").html(event.data);
        });
        renderBooks.source.onerror = function () {
            this.close();
        };
    },
    stop: function() {
        this.source.close();
    }
};

Listing 10-12The JavaScript Function That Provides the View Reactive Behavior (snippet from the search.html file)

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

映射到/book/new URL 的处理程序方法是BookSearchController的一部分,与用于在IndexController中提供 beans 名称流的方法没有什么不同。主要有两个区别。第一个区别是IReactiveSSEDataDriverContextVariable被用作数据驱动上下文变量的引用类型。这就是 Thymeleaf 被告知我们希望模板以 SSE(服务器发送事件)模式执行的方式。

第二个区别是方法返回的逻辑视图名,该名称必须包含百里香模板片段的标识符,该模板片段应在作为 SSE 发送之前应用于流发出的每个元素。逻辑视图名称必须遵循以下语法:templateName :: #fragmentIdentifier。清单 10-13 中的上下文变量没有声明显式前缀,这意味着事件被标记为具有message类型。

package com.apress.prospringmvc.bookstore.controller;

import org.thymeleaf.spring5.context.webflux.IReactiveSSEDataDriverContextVariable;
import org.thymeleaf.spring5.context.webflux.ReactiveDataDriverContextVariable;
//other imports omitted

@Controller

public class BookSearchController {
  // Generates random books to be displayed
    @GetMapping( value = "/book/new", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public String newBooks(final Model model){
        Flux<Book> newReleases = Flux.interval(Duration.ofSeconds(3))
          .map(delay -> BookNewReleasesUtil.randomRelease());

        final IReactiveSSEDataDriverContextVariable dataDriver =
                new ReactiveDataDriverContextVariable(newReleases, 1);

        model.addAttribute("newBooks", dataDriver);
        return "book/search :: #newBooks";
    }
}
// other code omitted

Listing 10-13Handler Method in BookSearchController Declaring a IReactiveSSEDataDriverContextVariable

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

将所有这些组件放在一起,您会得到一个每 3 秒钟显示一本新书的页面。如果使用 Chrome 或 Firefox,在加载页面时,可以在开发者控制台中看到服务器发送的事件。从EventListener实例体中删除console.log(event)语句的注释。在图 10-2 中,书店应用的search页面在 Firefox 中打开,你可以在开发者控制台中看到服务器发送的数据流。注意,数据是一个与名为newBooks,的模板片段匹配的 HTML 片段,其中百里香叶变量被替换为发出的值。

SpringMVC WebFlux 高级教程(五)

图 10-2

服务器发送的事件显示在 Firefox 的开发人员控制台中

我们提到前缀支持 SSE。当您需要在同一个页面上有多个反应片段时,前缀非常有用。在这种情况下,前缀将 se 映射到同一模板中的不同片段。这是必需的,因为即使两个发布者在同一台服务器上发出事件,他们也是通过同一 HTTP 连接来完成的。需要两个事件侦听器来拦截不同类型的事件并相应地处理它们。

search.html模板中,如果我们添加一个显示技术新闻的新部分,我们必须向两个上下文数据驱动变量添加前缀,以便基于它们进行过滤,并定向到适当的模板片段。newBooks用作指向newBooks div 的服务器事件的前缀,techNews用作显示技术新闻的名为techNews的 div 的前缀。两个 div 元素的 HTML 和 JavaScript 基本相同。如果你想看看发送给客户的 SSEs 是什么样的,请看图 10-3 。

SpringMVC WebFlux 高级教程(五)

图 10-3

服务器发送的带有前缀名称的事件显示在 Firefox 的开发人员控制台中

在前面的例子中,书籍和科技新闻是由一个流提供的,该流从一个固定的集合中随机发出一个条目。不过,有人提到,当来自多个服务的数据被聚合时,反应式应用是一个很好的选择。对于更接近现实的情况,newBooks div 的数据可以由书店应用外部的反应式服务提供,或者可能更多地属于图书出版商(比如 Apress)。用于techNews div 的数据可以由某个公共技术聚合器应用的反应服务提供。从那些流发出的数据的处理程序方法的实现看起来很像清单 10-13 中的代码,所以这里不再重复。

Spring WebFlux 引入了一些东西,可以毫不费力地与其他反应式服务集成,这将在下一节讨论。

介绍 WebClient 和 WebTestClient

在 Spring WebFlux 之前,使用org.springframework.web.client.RestTemplate可以发出 HTTP 请求。这个客户端是同步的,通过 HTTP 方法为常见场景提供模板。现在已经废弃的AsyncRestTemplate是后来在同一个包中引入的,以支持异步 HTTP 请求,只是被WebClient所取代。对于测试来说,org.springframework.boot.test.web.client.TestRestTemplate仍然可以用于同步 HTTP 请求。

Spring WebFlux 提供了一个反应式的、非阻塞的 HTTP 客户端,通过org.springframework.web.reactive.function.client.WebClient接口公开了一个非常实用的 API。提供了单个实现,org.springframework.web.reactive.function.client.DefaultWebClient。在幕后,它使用在类路径上找到的 HTTP 客户端,比如 Reactor Netty。这是从您的反应式应用访问其他反应式服务的实用工具。WebClient接口提供了两个静态方法来创建一个WebClient实例——都被命名为create。其中一个调用接收应用的基本 URL 作为参数,所有后续调用都可以简化,因为它们的 URL 被认为是相对于基本 URL 的。

清单 10-14 显示了一个使用静态create()方法创建的web client。然后,实例对在http://localhost:8080/randomBookNews公开的反应服务进行 GET REST 调用。这个服务只不过是同一个应用中的一个功能端点,它返回无限的Book实例流。

package com.apress.prospringmvc.bookstore.controller;

import org.springframework.web.reactive.function.client.WebClient;
//other imports omitted

@Controller
public class BookSearchController {

    @GetMapping( value = "/book/new",
                 produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public String newBooks(final Model model){
        // previous implementation
        //Flux<Book> newReleases = Flux.interval(Duration.ofSeconds(5))
        //       .map(delay -> BookNewReleasesUtil.randomRelease());
        WebClient webClient = WebClient.create();
        Flux<Book> newReleases = webClient
                .get().uri("http://localhost:8080/randomBookNews")
                .retrieve()
                .bodyToFlux(Book.class);

        final IReactiveSSEDataDriverContextVariable dataDriver =
                new ReactiveDataDriverContextVariable(newReleases, 1, "newBooks");

        model.addAttribute("newBooks", dataDriver);
        return "book/search :: #newBooks";
    }
    //other code omitted
}

Listing 10-14Using WebClient Without a Base URL Within the BookSearchController

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

另一种方法是使用完整的 URL 来创建WebClient实例。然后,所有使用该实例的 HTTP 请求只能使用相对于 baseURL 的 URL 的一部分。清单 10-15 显示了一个使用静态create(String)方法创建的WebClient

WebClient webClient = WebClient.create("http://localhost:8080/randomBookNews");

Flux<Book> newReleases = webClient.get().uri("/")
        .retrieve()
        .bodyToFlux(Book.class);

Listing 10-15Using WebClient with a Base URL

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

创建WebClient的另一种方法是使用通过调用WebClient.builder()方法返回的构建器实例。这允许对WebClient实例进行更细粒度的配置。使用构建器时,您可以设置头、cookies、额外的操作符来定制返回值,甚至可以设置不同的客户端连接器。默认使用(ReactorClientHttpConnector 8 )。它是由 Spring WebFlux 提供的,但是你也可以使用一个反应式的 Apache CloseableHttpAsyncClient, 9 为例)。这意味着清单 10-15 中的WebClient声明可以像清单 10-16 中描述的那样编写。

WebClient webClient = WebClient.builder()
        .baseUrl("https://localhost:8080/randomBookNews")
        .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_EVENT_STREAM_VALUE)
        .defaultCookie("InternalCookie", "all")
        .build();

Listing 10-16Creating WebClient Using a Builder

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

不管是用什么方式创建的,WebClient实例都是不可变的;但是,它支持一个clone()方法,该方法返回一个构建器,该构建器可以基于原始实例创建一个新实例。

WebClient非常灵活,允许创建复杂的结构,支持使用带有路径变量和请求参数的 URL 进行 HTTP 请求。也支持 URL 生成器和 URL 编码。我们快速复习一些重要的方法。对于可用 API 的完整描述,请随意查阅官方文档,这非常好。 10

retrieve()方法获得一个 HTTP 响应。在前面的例子中,这个方法后面是对bodyToMono(Class)bodyToFlux(Class)的调用。两者都将发出的值的类型作为参数接收。但是,这个方法后面也可以跟一个对onStatus(..),的调用,并且可以发出异常,这些异常被封装到WebClientResponseException对象中,这取决于 HTTP 状态代码。清单 10-17 描述了这样一个例子。

Flux<Book> newReleases = webClient.get()
    .uri("/")
    .retrieve()
    .onStatus(HttpStatus::is4xxClientError, response ->
        Mono.error( response.statusCode() == HttpStatus.UNAUTHORIZED
            ? new ServiceDeniedException("You shall not pass!")
            : new ServiceDeniedException("Well.. this is unfortunate!"))
    )
    .onStatus(HttpStatus::is5xxServerError, response ->
        Mono.error(response.statusCode() == HttpStatus.INTERNAL_SERVER_ERROR
            ? new ServiceDownException("This is SpartAAA!!")
            : new ServiceDownException("Well.. this is a mystery!"))
    )
    .bodyToFlux(Book.class);

Listing 10-17Using WebClient with Customized Error Behavior, Using the onStatus(..) Method

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

如果响应应该有内容,那么它应该由匹配状态代码谓词的函数使用;否则,它的内容将被丢弃以释放资源。

retrieve()法可与get()post()put()delete()等配合使用。还有一个exchange()方法,它提供了更细粒度的控制。例如,exchange()方法提供了对响应的访问,这允许您检查响应头、cookies,或者以任何必要的方式更改它。缺点是它不支持基于 HTTP 状态代码的定制行为。

关于WebClient的一件很酷的事情是,如果它的输出可以被正确地反序列化,它可以与使用任何其他技术开发的服务一起使用。为了证明这一点,我们使用 Node.js 实现了提供科技新闻的服务。 11 清单 10-18 描述了该服务的实现。

const http = require('http');
const sys = require('util');
const fs = require('fs');

const hostname = 'localhost';
const port = 3000;

const news = [
    'Apress merged with Springer.',
    // other values omitted
];

const server = http.createServer((req, res) => {
    res.setHeader('Content-Type', 'text/event-stream;charset=UTF-8');
    res.setHeader('Cache-Control', 'no-cache');
    // only if you want anyone to access this endpoint
    res.setHeader('Access-Control-Allow-Origin', '*');
    res.flushHeaders();

    // Sends a SSE every 2 seconds on a single connection.
    setInterval(function() {
        res.write('data:'+news[Math.floor(Math.random() * news.length)] + '

');
    }, 2000);
});

server.listen(port, hostname, () => {
    console.log(`Event stream available at http://${hostname}:${port}/techNews`);
});

Listing 10-18The tech-news.js

Service That Generates an Infinite Stream of Random Tech News

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

js 库是 JavaScript 函数的集合,可以用几行代码创建一个 web 服务器。作为参数提供给http.createServer的函数每 2 秒发出一个随机文本。

当用任何技术编写反应式服务时,可以用curl命令检查输出。使用-v选项获得服务发送内容的详细描述。这揭示了编写客户端所需的重要信息,例如媒体类型和编码以及发送信息的格式。

清单 10-19 展示了 curl 命令和参数,用于检查 Node.js 技术新闻服务的响应及其在终端中的输出。

$  curl http://localhost:3000/techNews  -v
* Connection failed
* connect to ::1 port 3000 failed: Connection refused
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 3000 (#0)
> GET /techNews HTTP/1.1
> Host: localhost:3000
> User-Agent: curl/7.64.1
> Accept: */*
>
< HTTP/1.1 200 OK
< Content-Type: text/event-stream;charset=UTF-8
< Cache-Control: no-cache
< Access-Control-Allow-Origin: *
< Date: Thu, 30 Jul 2020 11:12:54 GMT
< Connection: keep-alive
< Transfer-Encoding: chunked
<
data:Amazon launches reactive API for DynamoDB.

data:Java 17 will be released in September 2021.
...

Listing 10-19Node.js Service Output Returned by the curl Command

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

WebClient对于检索由其他服务产生的数据是实用的,但是更实用的是它的用于编写集成测试的版本:WebTestClientorg.springframework.test.web.reactive.server.WebTestClientorg.springframework.boot.test.web.client.TestRestTemplate的无功当量。它可以测试控制器和功能端点,它本质上包装了WebClient,并为其提供了测试环境。WebTestClient提供与WebClient相同的 API,但也支持对返回响应的测试假设。

清单 10-20 描述了检查搜索书籍匹配标准的 POST 请求实现的测试方法。

package com.apress.prospringmvc.bookstore.web;

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.web.reactive.server.WebTestClient;
import static org.junit.jupiter.api.Assertions.assertEquals;
// other imports omitted

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class BookstoreWebTest {
    private static Logger logger = LoggerFactory.getLogger(BookstoreWebTest.class);

    @Autowired
    private  WebTestClient testClient;

    @Test
    public void shouldReturnTwoBooks(){
        BookSearchCriteria criteria = new BookSearchCriteria();
        criteria.setCategory(Book.Category.JAVA);

        testClient.post()
            .uri("/book/search")
            .accept(MediaType.APPLICATION_JSON)
            .body(Mono.just(criteria), BookSearchCriteria.class)
            .exchange()
            .expectStatus().isOk() /* test */
            .expectHeader().contentType(MediaType.APPLICATION_JSON)
            .expectBodyList(Book.class)
            .consumeWith(
                result -> {
                    assertEquals(2, result.getResponseBody().size());
                    result.getResponseBody().forEach(p ->
                        logger.info("Response: {}",p));
                });
    }
}

Listing 10-20WebTestClient to Test a POST Request with Consumers

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

测试在测试应用上下文中运行,/book/search请求是针对在http://localhost:{mockPort}可用的模拟服务器发出的。注意方法的链接,这些方法也可以使用WebClientWebTestClient部分在exchange()呼叫之后开始。之后的所有三种方法都测试请求的假设。

  • expectStatus().isOk()检查 HTTP 状态代码是否为 200。

  • expectHeader().contentType(MediaType.APPLICATION_JSON)检查响应的媒体类型是否为 JSON。

  • expectBodyList(Book.class)检查响应的主体是否包含一组Book实例。

  • 作为一个参数,提供了一个消费者函数,它检查集合的大小是否为 2,并打印集合的每个成员。

方法允许开发者指定消费者使用他们觉得舒服的任何测试库来测试请求体。清单 10-20 中的实现可能被认为是冗长的。测试方法不需要打印结果体,这使得检查集合大小成为唯一需要的验证。在这种情况下,可以去掉consumeWith(..),用hasSize(2)代替。

使用这个实例的另一种方法值得一提。WebTestClient支持使用 JsonPath 12 表达式制作主体断言。对于包含 JSON 内容的响应,这是很实用的,我们不希望将这些内容反序列化为 Java 对象,或者在应用中没有相应的类。在清单 10-21 中,检查响应体的预期属性和预期值,而不将响应体反序列化为Book实例。

package com.apress.prospringmvc.bookstore.web;
// imports omitted

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class BookstoreWebTest {
    private static Logger logger = LoggerFactory.getLogger(BookstoreWebTest.class);

    @Autowired
    private  WebTestClient testClient;

    @Test
    public void shouldReturnBook(){
        testClient.get()
            .uri(uriBuilder -> uriBuilder.path("/book/isbn/{isbn}")
                        .build("9781484237779"))
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .expectStatus().isOk()
            .expectHeader().contentType(MediaType.APPLICATION_JSON)
            .expectBody()
            .jsonPath("$.title").isNotEmpty()
            .jsonPath("$.author").isEqualTo("Iuliana Cosmina");
    }
}

Listing 10-21WebTestClient to Test a GET Request Using JsonPath Expressions

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

WebTestClient可以使用bindToServer()方法测试真实的服务器。这很好,因为它可以测试使用其他技术开发的服务。

清单 10-22 描述了WebTestClient,的创建,它可以在运行于http://localhost:8080的真实应用上运行之前的测试方法。

private final WebTestClient testClient = WebTestClient
        .bindToServer()
        .baseUrl("https://localhost:8080")
        .build();

Listing 10-22WebTestClient Suitable to Test a GET Request on a Real Server

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

WebTestClient测试 API 比较丰富,写测试 13 的时候要参考官方文档,因为本书不可能全部涵盖。

国际化

使用反应式应用时的一个热门话题是国际化。Spring MVC 提供了一种非常简单的配置国际化的方法。

  1. 创建一个实现WebMvcConfigurer的配置类。

  2. @EnableWebMvc注释配置类(如果使用 Spring Boot,则不要注释)。

  3. 创建翻译属性文件。

  4. 声明一个MessageSource bean 并用它们的位置对其进行配置。

  5. 声明一个LocaleChangeInterceptor bean,根据附加到请求的lang参数的值将交换机配置到一个新的语言环境。

  6. 声明一个LocaleResolver bean 来配置一个语言环境解析策略。

可能会有很多步骤,但并不是所有的步骤都是必需的,尤其是在 Spring Boot 应用中,当遵循惯例时。

由于有了WebHandler API,配置国际化支持变得更加容易。首先,让我们讨论默认方式,它依赖于Accept-Language头。

使用 Accept-Language 头的国际化支持

@EnableWebFlux注释配置类从org.springframework.web.reactive.config.DelegatingWebFluxConfiguration导入 Spring WebFlux 配置。如果应用上下文的定制是必要的,比如国际化支持,那么可以扩展这个类,并且重写一些方法。Spring WebFlux 应用上下文被org.springframework.web.server.adapter.WebHttpHandlerBuilder用来组装一个处理链,该处理链由一个WebHandler实例组成,用一组WebFilter实例和WebExceptionHandler实例来修饰。默认情况下,WebHttpHandlerBuilderorg.springframework.web.server.i18n.AcceptHeaderLocaleContextResolver配置为支持地区上下文解析。类名(AcceptHeaderLocaleContextResolver)给出了它所描述的语言环境上下文解析策略的一个重要提示:语言环境是从 HTTP 请求的Accept-Language头中识别的。

Accept-Language请求 HTTP 头通告了客户机可以理解哪些语言以及首选哪种语言环境变量。浏览器根据他们的用户界面语言设置这个头,用户很少改变默认设置。当发出 REST 请求时,可以很容易地更改这个参数。每个响应都被翻译成请求中为这个头设置的语言值。

在 Spring WebFlux 引导应用中,添加语言资源文件,声明一个MessageSource bean,并配置它们的位置,就足以支持使用 Accept 头的国际化。

使用请求参数和 LocaleContextResolver 的自定义实现的国际化支持

大多数 web 应用支持使用请求参数的国际化。为了在 Spring WebFlux 应用中使用请求参数提供国际化支持,必须在配置中添加一个定制的LocaleContextResolver实现来替换默认的AcceptHeaderLocaleContextResolver(实现相同的接口)。这是通过扩展DelegatingWebFluxConfiguration并覆盖createLocaleContextResolver()方法来返回自定义LocaleContextResolver的实例来实现的。

清单 10-23 中描述了使用请求参数支持国际化的建议实现。

package com.apress.prospringmvc.bookstore.config.i18n;

import org.springframework.context.i18n.LocaleContext;
import org.springframework.context.i18n.SimpleLocaleContext;
import org.springframework.util.CollectionUtils;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.i18n.LocaleContextResolver;

import java.util.List;
import java.util.Locale;

public class RequestParamLocaleResolver implements LocaleContextResolver {

    private String languageParameterName;

    public RequestParamLocaleResolver(final String languageParameterName) {
        this.languageParameterName = languageParameterName;
    }

    @Override
    public LocaleContext resolveLocaleContext(final ServerWebExchange exchange) {
        Locale defaultLocale = Locale.getDefault();
        List<String> referLang = exchange.getRequest().getQueryParams().get(languageParameterName);
        if (!CollectionUtils.isEmpty(referLang) ) {
            String lang = referLang.get(0);
            defaultLocale = Locale.forLanguageTag(lang);
        }
        return new SimpleLocaleContext(defaultLocale);
    }
}

Listing 10-23Custom LocaleContextResolver Resolving Locale Using a Request Parameter

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

为了配置 Spring WebFlux 来使用这个实现,我们需要添加扩展DelegatingWebFluxConfiguration的配置类。实现很简单,如清单 10-24 所示。

package com.apress.prospringmvc.bookstore.config;

import com.apress.prospringmvc.bookstore.config.i18n.RequestParamLocaleResolver;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.config.DelegatingWebFluxConfiguration;
import org.springframework.web.server.i18n.LocaleContextResolver;

@Configuration
public class LocaleSupportConfig extends DelegatingWebFluxConfiguration {

    @Override
    protected LocaleContextResolver createLocaleContextResolver() {
        return new RequestParamLocaleResolver("lang");
    }
}

Listing 10-24Custom LocaleContextResolver Resolving Locale Using a Request Parameter

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

语言参数名称应该是可配置的,但是请记住,视图中区域设置更改的 URL 需要匹配。前面的实现的缺点是,只有在以?lang=XX为后缀的情况下,所需的语言环境才会应用于请求。这是因为区域设置不会保存在任何地方。在 Spring MVC 应用中,我们使用CookieLocaleResolver创建一个区域 cookie 并读取它来识别用户配置的区域。这允许应用使用不同于浏览器中配置的语言环境。CookieLocaleResolverorg.springframework.web.servlet.i18n包的一部分,Spring WebFlux 没有这样的解析器。

很容易修改前面的RequestParamLocaleResolver来添加 cookie 支持,因为它可以通过ServerWebExchange访问请求和响应。清单 10-25 描述了LocaleContextResolver的一个实现,它将所需的地区存储在一个 cookie 中,生存期为五分钟。

package com.apress.prospringmvc.bookstore.config.i18n;

import org.springframework.http.HttpCookie;
import org.springframework.http.ResponseCookie;
// other imports omitted

public class CookieParamLocaleResolver implements LocaleContextResolver {

    public static final String LOCALE_REQUEST_ATTRIBUTE_NAME = "Bookstore.Cookie.LOCALE";

    private String languageParameterName;

    public CookieParamLocaleResolver(final String languageParameterName) {
        this.languageParameterName = languageParameterName;
    }

    @Override
    public LocaleContext resolveLocaleContext(final ServerWebExchange exchange) {
        List<String> referLang = exchange.getRequest().getQueryParams().get(languageParameterName);
        Locale defaultLocale = getLocaleFromCookie(exchange);
        if (!CollectionUtils.isEmpty(referLang) ) {
            String lang = referLang.get(0);
            defaultLocale = Locale.forLanguageTag(lang);
            setLocaleToCookie(lang, exchange);
        }
        return new SimpleLocaleContext(defaultLocale);
    }

    private void setLocaleToCookie(final String lang, final ServerWebExchange exchange) {
        MultiValueMap<String, HttpCookie> cookies =  exchange.getRequest().getCookies();
        HttpCookie langCookie = cookies.getFirst(LOCALE_REQUEST_ATTRIBUTE_NAME);
        if(langCookie == null || !lang.equals(langCookie.getValue())) {
            ResponseCookie cookie = ResponseCookie.from(LOCALE_REQUEST_ATTRIBUTE_NAME, lang)
              .maxAge(Duration.ofMinutes(5)).build();
            exchange.getResponse().addCookie(cookie);
        }
    }

    private Locale getLocaleFromCookie(final ServerWebExchange exchange){
        MultiValueMap<String, HttpCookie> cookies =  exchange.getRequest().getCookies();
        HttpCookie langCookie = cookies.getFirst(LOCALE_REQUEST_ATTRIBUTE_NAME);
        return langCookie != null ? Locale.forLanguageTag(langCookie.getValue()) : Locale.getDefault();
    }
}

Listing 10-25Custom LocaleContextResolver Resolving Locale Using a Request Parameter

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
使用请求参数和自定义实现 WebFilter 的国际化支持

使用定制的web filter实现国际化支持是一个非常优雅的解决方案,因为它不需要对 WebFlux 配置进行任何显式的修改。自定义WebFilter可以简单地声明为一个 bean。作为WebHttpHandlerBuilder应用上下文,的一部分,国际化过滤器被提取并添加到应用于每个请求的 web 过滤器集合中。

这种实现的缺点是区域设置不能保存在任何地方,所以如果我们想将区域设置保存到 cookie 或用户会话中,就必须编写额外的代码。这并不困难,因为WebFilter也可以通过ServerWebExchange访问请求和响应。

清单 10-26 中的实现不完全是我的。一个叫 Jonathan Mendoza 的开发者把它贴在了 StackOverflow 上,除了添加 cookies 支持,我没有别的办法可以对它进行改进。 14 我们之前称这个实现是最优雅的,因为它使用了默认的AcceptHeaderLocaleContextResolver,它只不过是拦截请求并用Accept-Language头来修饰它。该值取自请求参数。如果区域设置 cookie 不存在,则创建它。如果没有语言请求参数,则该值取自 cookie 或应用的默认值(如果 cookie 不存在)。

为了减少本书的篇幅,清单 10-26 只描述了LanguageQueryParameterWebFilter中的核心方法。完整的实现在包含本书代码的库中。

package com.apress.prospringmvc.bookstore.util;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import org.springframework.web.server.adapter.DefaultServerWebExchange;
import org.springframework.web.server.adapter.HttpWebHandlerAdapter;
// other imports omitted

@Component
public class LanguageQueryParameterWebFilter implements WebFilter {
    // other code omitted
    @Override
    public Mono<Void> filter(final ServerWebExchange exchange, final WebFilterChain chain) {
        final ServerHttpRequest request = exchange.getRequest();
        final MultiValueMap<String, String> queryParams = request.getQueryParams();
        final String languageValue = queryParams.getFirst("lang");

        final ServerWebExchange localizedExchange =
               getServerWebExchange(languageValue, exchange);
        return chain.filter(localizedExchange);
    }

    private ServerWebExchange getServerWebExchange(final String languageValue,
         final ServerWebExchange exchange) {
        return isEmpty(languageValue)
                ? getLocaleFromCookie(exchange)
                : getLocalizedServerWebExchange(languageValue, exchange);
    }

    private ServerWebExchange getLocalizedServerWebExchange(final String languageValue,
        final ServerWebExchange exchange) {
        setLocaleToCookie(languageValue, exchange);
        final ServerHttpRequest httpRequest = exchange.getRequest()
                .mutate()
                .headers(httpHeaders -> httpHeaders.set("Accept-Language", languageValue))
                .build();

        return new DefaultServerWebExchange(httpRequest, exchange.getResponse(),
                httpWebHandlerAdapter.getSessionManager(),
         httpWebHandlerAdapter.getCodecConfigurer(),
                httpWebHandlerAdapter.getLocaleContextResolver());
    }
     // setLocaleToCookie & getLocaleFromCookie are pretty similar to Listing 10-25.
}

Listing 10-26Custom WebFilter Resolving Locale Using a Request Parameter

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

验证、类型转换和错误处理

因为 Spring WebFlux 应用可以使用反应式控制器来构建,所以它以与 Spring MVC 应用相同的方式支持验证、类型转换和错误处理。

控制器参数支持类似于@Valid(来自javax.validation包)及其 Spring 等价物@Validated(来自org.springframework.validation.annotation包)的验证。要配置全局Validator,,必须配置一个类型为org.springframework.validation.beanvalidation.LocalValidatorFactoryBean的 bean。如果缺少这样的 bean,那么默认情况下会声明一个名为webFluxValidator的类型为org.springframework.validation.beanvalidation.OptionalValidatorFactoryBean的 bean。OptionalValidatorFactoryBeanLocalValidatorFactoryBean的子类,是一个伪验证器。它没有声明要执行的任何验证。为了通知在启动 Spring WebFlux 应用时不支持验证,清单 10-27 中的调试消息被打印在日志文件中。

DEBUG o.s.b.f.s.DefaultListableBeanFactory - Creating shared instance of singleton bean 'webFluxValidator'
DEBUG o.s.v.b.OptionalValidatorFactoryBean - Failed to set up a Bean Validation provider
javax.validation.NoProviderFoundException: Unable to create a Configuration because no Bean Validation provider could be found. Add a provider like Hibernate Validator (RI) to your classpath.
at javax.validation.Validation$GenericBootstrapImpl.configure(Validation.java:291)

Listing 10-27Debug Messages Printed in the Log File When Validation for a Spring WebFlux Application Is Not Supported

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

配置一个Validator bean 就像覆盖配置类正在实现的WebFluxConfigurer接口中声明的getValidator()默认方法一样简单。这个方法应该返回一个类型为Validator的 bean。这个 bean 从类路径中选择一个 bean 验证提供者,所以应该将一个具有这样一个提供者的库,比如Hibernate Validator库,添加到项目的依赖项中。

自定义转换器和格式化程序也是如此。WebFluxConfigurer接口声明了默认的addFormatters(FormatterRegistry),它可以注册定制的转换器和格式化程序。(这与WebMvcConfigurer接口为 Spring MVC 应用所做的是一样的。)清单 10-28 显示了来自ReactiveThymeleafWebConfig的一个片段,其中包括一个Validator bean 和一个日期格式化程序配置。

package com.apress.prospringmvc.bookstore.config;

import com.apress.prospringmvc.bookstore.util.formatter.DateFormatAnnotationFormatterFactory;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;
import org.springframework.format.FormatterRegistry;
// other import omitted

@Configuration
@EnableWebFlux
public class ReactiveThymeleafWebConfig implements WebFluxConfigurer {
    @Bean
    public Validator validator() {
        final var validator = new LocalValidatorFactoryBean();
        validator.setValidationMessageSource(messageSource());
        return validator;
    }

    @Override
    public Validator getValidator() {
        return validator();
    }

    @Override
    public void addFormatters(FormatterRegistry registry) {
        registry.addFormatterForFieldAnnotation(new DateFormatAnnotationFormatterFactory());
    }
    // other code omitted
}

Listing 10-28Validator Bean and a Date Formatter Configuration for a Spring WebFlux Application

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

在 Spring WebFlux 应用中,任何与控制器相关的东西都以与 Spring MVC 应用相同的方式进行配置和工作。唯一需要开发人员额外工作的情况是对功能端点进行验证。

功能端点表示将请求映射到org.springframework.web.reactive.function.server.HandlerFunction<T extends ServerResponse>的方式。A handler function将一个org.springframework.web.reactive.function.server.ServerRequest作为参数,并返回一个由Mono<org.springframework.web.reactive.function.server.ServerResponse>返回类型表示的延迟响应。一个处理函数相当于一个@RequestMapping注释方法;不幸的是,它不支持像这类方法那样用注释@Valid@Validated标记验证的参数。由于这个小缺点,验证必须由开发人员在函数体中配置。

对于一个Book对象,应该声明一个实现org.springframework.validation.ValidatorBookValidator类来测试标题、作者、ISBN 和类别是否为空。这个类在清单 10-29 中有描述。

package com.apress.prospringmvc.bookstore.util.validation;

import com.apress.prospringmvc.bookstore.document.Book;
import org.springframework.validation.Errors;
import org.springframework.validation.ValidationUtils;
import org.springframework.validation.Validator;

public class BookValidator implements Validator {

    @Override
    public boolean supports(Class<?> clazz) {
        return (Book.class).isAssignableFrom(clazz);
    }

    @Override
    public void validate(Object target, Errors errors) {
        ValidationUtils.rejectIfEmpty
            (errors, "title", "required", new Object[] { "Title" });
        ValidationUtils.rejectIfEmpty
            (errors, "author", "required", new Object[] { "Author" });
        ValidationUtils.rejectIfEmpty
            (errors, "isbn", "required", new Object[] { "Isbn" });
        ValidationUtils.rejectIfEmpty(
            errors, "category", "required", new Object[] { "Category" });
    }
}

Listing 10-29BookValidator Class

to Validate Book Instances

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

处理发送一个Book对象到数据库的 PUT/POST 请求的处理函数应该首先验证Book实例,如果验证失败就抛出ServerWebInputException。对于任何包含不可接受数据的请求,都应该抛出这种类型的异常,因为它会自动将 HTTP 状态代码设置为 400(错误请求)并返回Errors对象,让用户知道问题出在哪里。清单 10-30 描述了一个BookHandler类,它包含了处理创建Book实例的 POST 请求所需的所有代码。

package com.apress.prospringmvc.bookstore.handler;
import com.apress.prospringmvc.bookstore.util.validation.BookValidator;
import org.springframework.validation.BeanPropertyBindingResult;
import org.springframework.validation.Errors;
import org.springframework.validation.Validator;
import org.springframework.web.server.ServerWebInputException;
import static org.springframework.web.reactive.function.server.ServerResponse.*;
import javax.validation.ValidationException;
// other imports omitted

@Component
public class BookHandler {

    private BookstoreService bookstoreService;
    private final Validator validator = new BookValidator();

    public BookHandler(BookstoreService bookstoreService) {
        this.bookstoreService = bookstoreService;
    }

    public Mono<ServerResponse> create(ServerRequest serverRequest) {
        return serverRequest.bodyToMono(Book.class)
            .flatMap(this::validate)
                .flatMap(book -> bookstoreService.addBook(book))
                .flatMap(book -> ServerResponse.created(URI.create("/book/isbn/" + book.getIsbn()))
                    .contentType(MediaType.APPLICATION_JSON).bodyValue(book))
            .onErrorResume(error -> ServerResponse.badRequest().bodyValue(error));
    }

    private Mono<Book> validate(Book book) {
        Errors errors = new BeanPropertyBindingResult(book, "book");
        validator.validate(book, errors);
        if (errors.hasErrors()) {
            throw new ValidationException(errors.toString());
        }
        return Mono.just(book);
    }
    // other handler functions emitted
}

Listing 10-30BookHandler Class

to Validate Book Instances

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

这种方法有点粗糙,因为ValidationException将消息设置为全文,导致将Errors对象转换为StringValidationExceptiononErrorResume(..)函数拦截,允许进一步配置响应。如果没有onErrorResume(..),由 Spring Boot 自动配置的默认错误处理程序 bean 会捕捉异常并生成默认响应。这个 bean 被命名为errorWebExceptionHandler,,它的类型是DefaultErrorWebExceptionHandler,,这是 Spring Boot 提供的默认实现。图 10-4 描述了WebExceptionHandler的层次结构。

SpringMVC WebFlux 高级教程(五)

图 10-4

WebExceptionHandler等级制度

该 bean 返回的响应是一个通用 JSON 表示对象,包含 HTTP 状态代码 400(错误请求)、URI 路径和一个字母数字请求标识符。测试验证应用的最简单的方法是使用WebTestClient编写一个阴性测试。清单 10-31 描述了一个测试,假设创建一个Book实例失败,并返回一个带有 HTTP 状态代码的响应。因为查看响应细节很有趣,所以添加了一个消费者来打印它。

package com.apress.prospringmvc.bookstore.api;

import org.junit.jupiter.api.Test;
import org.springframework.test.web.reactive.server.WebTestClient;
//other imports omitted

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class BookApiTest {
    private static Logger logger = LoggerFactory.getLogger(BookApiTest.class);

    @Autowired
    private WebTestClient testClient;

    @Test
    void shouldFailCreatingABook() {
        // no isbn, no category
        Book book = new Book();
        book.setTitle("TDD for dummies");
        book.setAuthor("Test User");

        testClient.post().uri("/book/isbn")
                .body(Mono.just(book), Book.class).exchange()
                .expectStatus().isBadRequest() // 400
                .expectBody()
                .consumeWith(responseEntity ->
                    logger.debug("Response: {}", responseEntity)
                );
    }

}

Listing 10-31Test Method Overing a Validation Failure When a Request Is Made for Creating a Book Instance

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

前面的测试通过,没有创建Book实例,因为ISBNcategory丢失了。响应 HTTP 代码是 400,正如控制台中打印的响应细节所证明的,您可以在清单 10-32 中看到。响应详细信息包括验证失败的对象。一些 JSON 行被删除了,因为输出太冗长,不能作为本书的一部分,但是验证细节被保留了下来。

DEBUG c.a.p.bookstore.api.BookApiTest - Response:
> POST http://localhost:51164/book/isbn
> WebTestClient-Request-Id: [1]
> Content-Type: [application/json]
> Content-Length: [132]

{
    "id":null,"title":"TDD for dummies",
    "description":null,
    "price":null,
    "year":null,
    "author":"Test User",
    "isbn":null
}

< 400 BAD_REQUEST Bad Request
< Vary: [Origin, Access-Control-Request-Method, Access-Control-Request-Headers]
< Content-Type: [application/json]
< Content-Length: [10831]

#response body starts here
{
# other JSON code omitted
"message": "[
    Field error in object 'book' on field 'isbn':
        rejected value [null];
    codes [required.book.isbn,required.isbn,
        required.java.lang.String,required];
    arguments [Isbn];
    Field error in object 'book' on field 'category':
        rejected value [null];
    codes [required.book.category,required.category,
        required.java.lang.String,required];
    arguments [Category];
}

Listing 10-32Response Details of a Failed Request to Create a Book Instance

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

在前面的清单中,您可以看到显式输出将您指向缺少值的必需字段。输出有些冗长,因为它是由BookValidator实例创建的Error实例的String表示。失败时返回的响应可以通过用其他东西替换Error实例来定制。

功能端点的验证处理很简单,依赖于将验证操作符添加到处理从ServerRequest实例中检索的对象的管道中。错误处理可以用同样的方式完成——在使用ServerResponse返回之前,添加一个操作符来处理管道处理对象中发出的错误。从开发的角度来看,最简单的方法是声明一个自定义错误对象或自定义异常类型,并尽可能依赖默认的错误处理程序。

对于 Spring Boot WebFlux 应用中错误处理行为的更细粒度定制,可以提供WebExceptionHandlerErrorWebExceptionHandler的实现。然而,由于级别太低,您必须直接处理请求/响应交换,这可能会很痛苦。定制的错误处理 bean 必须用@Order(-2)进行配置和注释,以优先于WebFluxResponseStatusExceptionHandler和 Spring Boot 的ErrorWebExceptionHandler。通过扩展AbstractErrorWebExceptionHandlerDefaultErrorWebExceptionHandler可以重用现有的实现。清单 10-33 描述了一个实现WebExceptionHandler的全局错误处理程序的简单实现。必须为handle(ServerWebExchange, Throwable)方法提供一个具体的实现来定制返回的响应消息。

package com.apress.prospringmvc.bookstore.util;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebExceptionHandler;
import com.apress.prospringmvc.bookstore.util.MissingValueException;
// other imports omitted

@Component
@Order(-2)
public class MissingValuesExceptionHandler implements WebExceptionHandler {

    @Override
    public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
        DataBuffer buffer;
        if (ex instanceof MissingValueException) {
            exchange.getResponse().setStatusCode(HttpStatus.BAD_REQUEST);
            exchange.getResponse().getHeaders().add("Content-Type", "application/json");
            final String message = " {"missing_value_for": ""+
                ((MissingValueException)ex).getFieldNames() +""}";
            buffer = exchange.getResponse().bufferFactory().wrap(message.getBytes());
        } else {
            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
            exchange.getResponse().getHeaders().add("Content-Type", "application/json");
            buffer = exchange.getResponse().bufferFactory().wrap("Ooops!".getBytes());
        }
        return exchange.getResponse().writeWith(Flux.just(buffer));
    }
}

Listing 10-33Custom Global Error Handler Implementation

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

MissingValueException类是一个定制的异常类,它封装了失败字段的名称。为了让前面的异常处理程序完成它的工作,必须更改BookHandler处理函数,以便在验证失败时抛出MissingValueException异常,并从请求/响应交换管道中删除onErrorResume(..)调用。清单 10-34 显示了这些变化。

package com.apress.prospringmvc.bookstore.handler;

import com.apress.prospringmvc.bookstore.util.MissingValueException;
// other imports omitted

@Component
public class BookHandler {

    private BookstoreService bookstoreService;
    private final Validator validator = new BookValidator();

    public BookHandler(BookstoreService bookstoreService) {
        this.bookstoreService = bookstoreService;
    }

    public Mono<ServerResponse> create(ServerRequest serverRequest) {
        return serverRequest.bodyToMono(Book.class)
            .flatMap(this::validate)
                .flatMap(book -> bookstoreService.addBook(book))
                .flatMap(book -> ServerResponse.created(URI.create("/book/isbn/" + book.getIsbn()))
                    .contentType(MediaType.APPLICATION_JSON).bodyValue(book));
        // no 'onErrorResume()' here
    }

    private Mono<Book> validate(Book book) {
        Errors errors = new BeanPropertyBindingResult(book, "book");
        validator.validate(book, errors);
        if (errors.hasErrors()) {
            throw MissingValueException.of(errors.getAllErrors());
        }
        return Mono.just(book);
    }
    // other handler functions emitted
}

Listing 10-34Handler Function That Throws a MissingValueException When Validation Fails

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

如果您现在运行清单 10-31 中的测试,它不会失败,因为 HTTP 响应代码是相同的,但是您会注意到响应体被简化为{"missing_value_for": "[Isbn, Category]"}

很简单,对吧?在一个实际的应用中,你可能两者都需要:因为WebExceptionHandler的实现适合全局异常,而验证错误处理可以在执行验证的地方实现,在特定类型对象的处理函数中。无论你采取什么方法,确保让使用你服务的人知道他们做错了什么。

摘要

这一章让你深入了解了在构建一个反应式 Spring WebFlux 应用时什么是重要的。我们讲述了迁移多层应用的一些细节,以强调这样一个事实:只有当一个反应式应用的所有组件都是反应式的时,它才是完全反应式的。为了帮助您从 Spring Web MVC 转换到 WebFlux,我们对这两种技术的配置进行了比较。

我们研究了使用反应式控制器以多种方式呈现百里香叶动态视图:使用数据驱动上下文变量以反应式友好的方式加载视图,使用 JavaScript 来使用Flux<T>并重新生成部分呈现的 HTML 模板,以及通过发送 SSE 来重写百里香叶视图的片段。

我们看了使用WebClient消费其他反应式服务,以及使用WebTestClientcurl命令测试反应式应用。

还讨论了功能端点的国际化、验证和错误处理,因为它们在构建 web 应用时非常重要。

从这一章中可以学到一些东西。Spring WebFlux 有一些优点,比如更干净、更简洁的代码。Spring Boot 提供了许多开箱即用的组件,使开发时间更长,配置时间更少。错误处理更容易实现,代码更容易阅读。但不是所有的元件都必须是电抗的。当您只有一个简单的页面要呈现给用户时,就没有必要使用反应式组件来呈现它。

最重要的事情:永远,永远不要打block(),尽可能避开subscribe()

Footnotes 1

https://r2dbc.io/

2

https://www.mongodb.com/

3

https://www.thymeleaf.org/apidocs/thymeleaf-spring5/3.0.11.RELEASE/org/thymeleaf/spring5/context/webflux/IReactiveDataDriverContextVariable.html

4

MongoDB 有可定制的游标,可以与 Spring Data MongoDB @Tailable注释结合使用,以带有头尾 https://docs.mongodb.com/manual/core/tailable-cursors/ 的反应式事件流的形式访问数据

5

https://developer.mozilla.org/en-US/docs/Web/API/EventSource

6

https://developer.mozilla.org/en-US/docs/Web/API/EventListener

7

https://www.thymeleaf.org/apidocs/thymeleaf-spring5/3.0.11.RELEASE/org/thymeleaf/spring5/context/webflux/IReactiveSSEDataDriverContextVariable.html

8

https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/http/client/reactive/ReactorClientHttpConnector.html

9

https://hc.apache.org/httpcomponents-client-5.0.x/httpclient5/apidocs/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.html

10

https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#webflux-client

11

https://nodejs.org/en/

12

https://github.com/jayway/JsonPath

13

https://docs.spring.io/spring/docs/current/spring-framework-reference/testing.html#webtestclient

14

https://stackoverflow.com/questions/47527504/how-to-configure-i18n-in-spring-boot-2-webflux-thymeleaf/50055399#50055399

十一、保护 Spring WebFlux 应用

在反应式编程的两章之后,我们仅仅触及了反应式应用的表面。反应式代码不会让你的代码更简单、更易读,但它让你的代码更健壮、更容易扩展。交换大量信息的复杂应用最适合使用反应式框架来实现。通过使用反应式编程,数据可以流动,并且可以执行操作来简单有效地转换和组合数据,而不需要编写处理同步线程的复杂性的代码。

用 Spring WebFlux 编写的反应式 web 应用也可以公开反应式视图,使用户界面响应更快。反应式视图可以显示由反应式服务发送的数据,而无需冻结页面。这些主题已经讨论过了,实现它们的代码在前面两章中已经讨论过了。

本章涵盖了与 Spring WebFlux 应用相关的两个更重要的主题:如何应用背压和保护 Spring WebFlux 应用。

反压力

前两章提到了背压。有人暗示,背压表示订阅方控制其订阅的发布方发出值的速率的操作。如果没有 WebSocket 这样的双向通信协议,解决背压问题是没有意义的。

订阅者可以通过配置的方式向发布者请求特定数量的项目。前一章使用了速度较慢的发布器,它们使用zip操作符按配置的时间间隔发布项目。

让我们从介绍允许背压发生的技术开始:WebSocket 协议。

WebSocket 协议

处理数据流的一个核心建议是永远不要阻塞。使用由流发出的数据的客户端不拥有该线程,所以它永远不应该阻塞它。为了避免发出其他项,需要将它们存储在缓冲区中。但是,缓冲区是有限的,它们可能会被填满和溢出,数据可能会丢失。所以剩下唯一可能的选择:应该允许客户端控制流发出项目的速率。然而,要做到这一点,我们需要一个双向的沟通渠道。

首先,让我们快速回顾一下图 11-1 。

SpringMVC WebFlux 高级教程(五)

图 11-1

TCP 之上的通信技术

浏览器和服务器之间的正常通信是通过 TCP 连接完成的。HTTP 协议 1 是 TCP 之上的通信协议,它包含一旦数据到达就读取和处理该数据的特定指令。客户端向服务器发出 HTTP 请求,服务器用 HTTP 响应进行应答。客户机和服务器之间的多次请求和响应交换可以通过 HTTP 持久连接(保持活动)进行。客户端和服务器之间的这种通信方式被称为轮询,因为它们定期通过 HTTP 连接向服务器请求新数据。为章节 1 到 8 编写的应用就是为这种类型的通信而设计的。

第十章中介绍的服务器发送事件(SSE)2T3,是一种服务器推送技术,使客户端能够通过 HTTP 连接从服务器接收自动更新。这是一种单向通信,类似于 JMS 中的发布/订阅模型。

WebSocket 3 是一种计算机通信协议,通过单一 TCP 连接提供全双工通信通道。WebSocket 是 HTTP 的替代方案,它允许在浏览器(客户端)和服务器之间进行双向交互通信。它支持 TCP 之上的消息流,其 API 消息可以发送到服务器,并且可以接收事件驱动的响应,而无需轮询。WebSocket 被设计成 HTTP 兼容的。HTTP 仅用于握手。这是两个协议之间连接的终点。在引擎盖下,他们是非常不同的。WebSocket 是一个底层的传输协议,第一个 URL 请求建立一个连接,之后所有的应用消息都流经同一个 TCP 连接。图 11-2 大致描绘了客户端和服务器之间使用 WebSocket 的通信。

SpringMVC WebFlux 高级教程(五)

图 11-2

使用 WebSocket 进行通信

WebSocket 上的通信从一个 HTTP 请求开始,请求使用 WebSocket 协议。该请求应该有一个名为Upgrade的头,其中填充了值WebSocket,还有一个名为Connection的头,其中填充了值Upgrade。一些 base-64 编码的随机字节头可以防止同一条消息被发送两次。清单 11-1 中描述了这个初始 HTTP 请求和响应的内容示例。

 --- HTTP  Request ---
GET /chat HTTP/1.1
Host: localhost
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Version: 13
Origin: http://localhost:8080

 --- HTTP  Response ---
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade

Listing 11-1WebSocket HTTP Request and Response Handshake Example

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

响应应该是 HTTP 状态代码 101,这表示 WebSocket 接受通信。在这个初始握手之后,客户机(浏览器)和服务器通过 WebSocket 会话进行通信,任何一方都可以结束这个会话。

WebSocket 是需要频繁高速交换小块数据的应用的合适解决方案(例如,社交媒体、交易、视频游戏、博彩网站等)。).如果信息交换量相对较低,经典的 HTTP 轮询解决方案可能会提供有效的解决方案。在通信的数据传输部分,客户端和服务器可以同时向对方发送消息,这为向您的应用添加更健壮的实时通信功能打开了大门。

大多数现代浏览器都支持 WebSocket。

WebSocket 协议定义了一个ws://前缀来表示 WebSocket 连接。wss://前缀表示安全、加密的 WebSocket 连接用于通信。WebSocket 不知道代理服务器和防火墙,但它与 HTTP 兼容,并使用 HTTP 80 和 443 端口进行通信。未加密的 WebSocket 流量在到达 WebSocket 服务器的途中流经一个显式或透明的代理服务器。如果代理服务器没有配置为支持未加密的 WebSocket 流量,连接很可能会失败。加密的 WebSocket 流量是通过使用传输层安全性(TLS)的连接完成的。当浏览器配置为使用显式代理服务器时,会发出 HTTP CONNECT 命令。这建立了一个隧道,它通过 Web Sockets 安全客户端和 WebSocket 服务器之间的 HTTP 代理提供低级的端到端 TCP 通信。

从 4.0 版本开始,Spring 框架支持 WebSocket 风格的消息传递,并将 STOMP 作为应用级别的子协议。在框架内,对 WebSocket 的支持在spring-websocket模块中,它与 Java WebSocket API 标准(JSR-356)兼容。 4 对于一个 servlet 环境,还有一个 Spring Boot 启动器依赖:spring-boot-starter-websocket。在讨论在反应式应用中使用 WebSocket 进行通信之前,让我们先了解一下如何在非反应式应用中使用 WebSocket。

对非反应式应用使用 WebSocket API

当使用 Spring 的 WebSocket API 时,通常会实现org.springframework.web.socket.WebSocketHandler接口,或者使用方便的子类,比如用于处理二进制消息的org.springframework.web.socket.handler.BinaryWebSocketHandler,用于 SockJS 消息的org.springframework.web.socket.sockjs.transport.handler.SockJsWebSocketHandler,或者用于处理基于String的消息的org.springframework.web.socket.handler.TextWebSocketHandler。在我们的例子中,为了简单起见,我们使用一个TextWebSocketHandler来通过 WebSocket 传递字符串消息。您可以在本书的源代码中找到连接和发送消息到服务器的 JavaScript 代码。本节重点介绍实现一个非常基本的聊天功能的处理程序并对其进行配置所必需的 Spring 代码。如前所述,处理程序接收和发送文本消息,因此处理程序类必须扩展TextWebSocketHandler。代码如清单 11-2 所示。

package com.apress.prospringmvc.bookstore;

import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.io.IOException;
import java.util.List;
import java.util.Random;

public class ChatHandler extends TextWebSocketHandler {

    @Override
    public void handleTextMessage(WebSocketSession session, TextMessage textMessage) throws IOException {
        if(textMessage.getPayload().toLowerCase().contains("hello")||
            textMessage.getPayload().toLowerCase().contains("hi")) {
            session.sendMessage(new TextMessage(BOT_ANSWERS.get(0)));
            session.sendMessage(new TextMessage(BOT_ANSWERS.get(1)));
        } else {
            session.sendMessage(new TextMessage(randomMessages()));
        }
    }

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final List<String> BOT_ANSWERS = List.of(
            "Hello!",
            "How can I help?"
            // ... more messages omitted
    );

    private static String randomMessages() {
        return BOT_ANSWERS.get(RANDOM.nextInt(BOT_ANSWERS.size() - 2) + 2);
    }
}

Listing 11-2The ChatHandler Class That Extends TextWebSocketHandler

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

ChatHandler实现了一个非常基本的聊天机器人,它用一个固定集合中的随机消息来回复收到的消息。这个实现处理从客户端收到的请求,这些请求是使用特定于 WebSocket 官方 API 的 JavaScript 函数发送的。 5 现在我们有了一个用于 WebSocket 通信的处理程序,让我们将它映射到一个 URL 路径,并告诉 Spring 我们正在使用 WebSocket 与客户端通信。对此有一个特殊的注解叫做@EnableWebSocket

清单 11-3 描述了应用支持 WebSocket 所必需的 Spring 配置类。

package com.apress.prospringmvc.bookstore.web.config;

import com.apress.prospringmvc.bookstore.ChatHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
@EnableAsync
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(chatHandler(), "/chatHandler").setAllowedOrigins("*");
    }

    @Bean
    public ChatHandler chatHandler() {
        return new ChatHandler();
    }
}

Listing 11-3The Spring WebSocket Configuration Class

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

前面的配置类包含一些需要更详细解释的元素。

  • @EnableWebSocket :当放置在 Spring 配置类上时,它启用 WebSocket 请求处理。

  • WebSocketConfigurer :必须实现该接口才能访问WebSocketHandlerRegistry。应该实现一个方法registerWebSocketHandlers(WebSocketHandlerRegistry)来将 WebSocket URL 路径映射到适当的处理程序。

  • setAllowedOrigins("*") :这个方法被设置为允许从任何来源调用我们的应用。如果你熟悉 CORS(跨源资源共享),你知道出于安全原因,浏览器限制从脚本发起的跨源 HTTP 请求。默认情况下,只允许来自同一来源的呼叫。在产生的WebSocketHandlerRegistration上调用该方法可以确保您不会在本地遇到问题,尤其是在 127.0.0.1 上的浏览器中打开应用并尝试使用localhost发送 WebSocket 消息时。我在写代码的时候就遇到了这种情况,这就是为什么我认为它值得一提。在生产中,您不应该这样做,最多,您应该配置允许来源的列表。

  • 一个非常有用的注释,因为它支持异步消息传递。这意味着一旦连接打开,客户端和服务器就可以并行发送消息。

在这个简单的例子中,客户机由浏览器中执行的 JavaScript 代码表示,浏览器是chat.html视图的一部分。实现很简单。它使用 jQuery for JavaScript 事件声明侦听器,以连接到服务器、发送/接收消息或关闭连接。清单 11-4 描述了它的实现和 HTML 元素。

<script th:inline="javascript">
    var ping;
    var websocket;

    jQuery(function ($) {
        function writeMessage(message) {
            $('#messageOutput').append(message + '
')
        }

        $('#connect')
            .click(function doConnect() {
                var handlerURL = 'ws://localhost:8080/chapter11-1/chatHandler';
                websocket = new WebSocket(handlerURL);
                websocket.addEventListener('message', function (evt) {
                    writeMessage('STAFF: ' + evt.data);
                });

                websocket.addEventListener('open', function(evt) {
                    writeMessage("CONNECTED");
                });

                websocket.addEventListener('close', function (evt) {
                    writeMessage(`DISCONNECTED.
                        Reason: code=${evt.code}, reason=${evt.reason}`);
                });

                websocket.onerror = function (evt) {
                    writeMessage('ERROR:' + evt.data);
                };
            });

        $('#disconnect')
            .click(function () {
                if(typeof websocket != 'undefined') {
                    websocket.close();
                } else {
                    alert("Not connected.");
                }
            });

        $('#send')
            .click(function () {
                if(typeof websocket != 'undefined') {
                    websocket.send($('#message').val());
                    writeMessage('USER:' + $('#message').val());
                } else {
                    alert("Not connected.");
                }
            });
    });
</script>
<div class="left_content" id="left_content"
     th:fragment="~{template/layout :: left_content}" >
    <fieldset>
        <legend th:text="#{chat.title}">CONTACT STAFF</legend>
        <table>
            <tr>
                <td colspan="2"><button id="connect"
                    th:text="#{button.connect}">CONNECT</button></td>
            </tr>
            <tr>
                <td><input id="message" value=""/></td>
                <td><button id="send"
                    th:text="#{button.send}">SEND</button></td>
            </tr>
            <tr>
                <td colspan="2" align="center">
                    <textarea readonly id="messageOutput"
                        rows="10" cols="50"></textarea></td>
            </tr>
            <tr>
                <td colspan="2"><button id="disconnect"
                    th:text="#{button.disconnect}">Disconnect
                    </button></td>
            </tr>
        </table>
    </fieldset>
</div>
<!-- other HTML code omitted -->

Listing 11-4JavaScript Functions

for Establishing a WebSocket Connection and Sending/Receiving Messages Over It

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84

使用chat.html模板生成的 HTML 页面如图 11-3 所示。

SpringMVC WebFlux 高级教程(五)

图 11-3

聊天页面

例如,这种类型的实现非常适合与提供支持的真人聊天。现在,让我们回到反应世界。

在反应式应用中使用 WebSocket API

在前面的章节中,由 Node.js 服务产生的新闻流被 Spring WebFlux 书店应用使用。使用一个数据驱动变量,使用一个WebClient来消费该流并产生一个注入百里香模板的Flux<String>。使用 WebSocket,我们可以避免使用被动视图,而使用 JavaScript 代码通过 WebSocket 连接直接与提供数据的服务器通信。从通信中删除一个节点可能会加快传输速度,并且不再需要编写一些 Java 代码。

在本节中,我们不再使用 Node.js 服务,而是使用WebSocketHandler实现相同的功能。Spring 框架提供了一个反应式 WebSocket API,您可以使用它来编写处理 WebSocket 消息的客户端和服务器端应用。reactive org.springframework.web.reactive.socket.WebSocketHandler接口声明了一个应该由开发人员实现的用于处理 WebSocket 会话的抽象方法(参见清单 11-5 )。

package org.springframework.web.reactive.socket;
// imports and default method omitted

public interface WebSocketHandler {
    Mono<Void> handle(WebSocketSession session);
}

Listing 11-5WebSocketHandler Method

Skeleton for handle(WebSocketSession)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

org.springframework.web.reactive.socket.WebSocketSession接口表示一个 WebSocket 会话,它声明了一组在双方之间交换信息的方法。最重要的是send(Publisher<WebSocketMessage>)receive()(见清单 11-6 )。

package org.springframework.web.reactive.socket;
// imports and other methods omitted

public interface WebSocketSession {
    WebSocketMessage textMessage(String payload);

    Mono<Void> send(Publisher<WebSocketMessage> messages);
    Flux<WebSocketMessage> receive();

    Mono<Void> close(CloseStatus status);
}

Listing 11-6WebSocketSession Method Skeleton for send(..) and receive()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

Spring 为这个接口提供了非常有用的org.springframework.web.reactive.socket.adapter.AbstractWebSocketSession<T>实现. ItWebSocketSession接口中的所有方法提供了基本实现。在向客户端发送数据之前,必须将其转换为 WebSocket 协议能够识别的格式。任何类型对象的字符串表示都可以通过使用实用方法textMessage(String)转换成org.springframework.web.reactive.socket.WebSocketMessage。如果客户机是一个浏览器,并且使用 JavaScript 将数据呈现到浏览器中,那么最合适的文本表示就是 JSON。将WebSocketMessage实例转换为String的反向功能由WebSocketMessage类中声明的getPayloadAsText()方法提供。

一个WebSocketHandler实现必须将入站(来自客户端的消息)和出站(发送到客户端的消息)流组成一个统一的流,并返回一个Mono<Void>。前面提到过,WebSocket 通信可以被任何一方关闭。这意味着,根据应用要求,当发生以下情况时,统一流程完成。

  • 入站或出站消息流完成。

  • 入站流完成,连接关闭,而出站流是无限的。

  • 在选定的时间点(服务器超时),通过调用WebSocketSessionclose(CloseStatus)方法。(org.springframework.web.reactive.socket.CloseStatus类包含一组常数值,代表最常见的 WebSockets 状态代码。1000是用于优雅的沟通结束的代码。) 6

在书店应用的上下文中,WebSocketHandler的实现应该接收来自客户端的消息,同时还向应用发送技术新闻。清单 11-7 描述了这样一个实现。

package com.apress.prospringmvc.bookstore.handler;

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
// other imports omitted

public class TechNewsHandler implements WebSocketHandler {
    private final Logger logger = LoggerFactory.getLogger(NewsWebSocketHandler.class);

    private final AtomicLong rate = new AtomicLong(2000);

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        Flux<String> newsFlux = Flux.fromStream(
                Stream.generate(BookNewReleasesUtil::randomNews))
        .delayElements(Duration.ofMillis(rate.get()));
        return session.send(newsFlux.map(session::textMessage))
                .and(session.receive()
                    .map(WebSocketMessage::getPayloadAsText)
                    .doOnNext(message -> logger.debug("Client says: {}", message))
                );
    }
}

Listing 11-7TechNewsHandler Implementation

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

在清单 11-7 中,入站和出站流合并成一个统一的流可能并不明显。这就是声明式编程和 lambdas 的危险。清单 11-8 中的实现清楚地表明了这两个不同的流。

package com.apress.prospringmvc.bookstore.handler;
// other imports omitted

public class TechNewsHandler implements WebSocketHandler {
    private final Logger logger = LoggerFactory.getLogger(TechNewsHandler.class);

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        var inbound = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .doOnNext(message -> logger.debug("Client says: {}", message))
                .then();

        var source =  Flux.generate(
            (SynchronousSink<String> synchronousSink) ->
             synchronousSink.next(BookNewReleasesUtil.randomNews())
        );

        var outbound = session.send(source.map(session::textMessage)
            .delayElements(Duration.ofSeconds(2L))); // artificial delay

        return Mono.zip(inbound, outbound).then();
    }
}

Listing 11-8TechNewsHandler Implementation

Making the Two Streams Obvious

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

send(..)方法获取发送给客户端的消息的来源;在这种情况下,无限的Flux<String>发出随机的科技新闻。receive()方法返回一个流,该流发出代表从客户端接收的消息的WebSocketMessage实例。这些信息使用getPayloadAsText()方法转换成String,并打印在控制台上。

现在我们有了一个WebSocketHandler,我们必须将它映射到一个 URL。在一个将spring-boot-starter-webflux声明为依赖项的 Spring Boot 应用中,这是通过声明一个包含 URL 路径和TechNewsHandler bean 之间对应关系的HandlerMapping bean 和声明一个WebSocketHandlerAdapter来完成的。WebSocketHandlerAdapter委托给一个WebSocketService。默认情况下,这是一个类型为HandshakeWebSocketService. I的实例。ts 的职责是对与 WebSocket 相关的 HTTP 请求执行基本检查,并从WebSession中提取属性,以将它们插入到WebSocketSession中(这在需要身份验证时非常有用)。

不需要特殊的注释。只需在一个用@Configuration标注的类中声明三个 beans,它们就被 Spring 捡起来使用了。配置所需的所有接口和类都是org.springframework.web.reactive包及其子包的一部分。清单 11-9 描述了这个非常简单的配置。

package com.apress.prospringmvc.bookstore.config;

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
// other imports omitted

@Configuration
public class WebSocketConfig {

    @Bean
    WebSocketHandler techNewsHandler(){
        return new TechNewsHandler();
    }

    @Bean
    HandlerMapping handlerMapping(WebSocketHandler techNewsHandler) {
        return new SimpleUrlHandlerMapping() {
            {
                setUrlMap(Collections.singletonMap("/ws/tech/news", techNewsHandler));
                setOrder(-1);
            }
        };
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }

}

Listing 11-9Spring Necessary Beans to Configure WebSocket Communication

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

这就是在 Spring 应用中配置 WebSocket 通信支持所需的全部内容。该处理程序的映射顺序设置为–1,以确保 WebSocket 请求在带注释的控制器之前得到处理。用于发出 WebSocket 请求的 JavaScript 代码非常简单;它在清单 11-10 中进行了描述。

<script th:inline="javascript">
    $( window ).on( "load", function() {
        renderNews.start();
    });
    $( window ).on( "onbeforeunload", function() {
        renderNews.start();
    });

    var renderNews = {
        socket : new WebSocket('ws://localhost:8080/ws/tech/news'),
        fromServer: [],
        start: function () {
            this.socket.addEventListener('message', function (event) {
                let message = event.data

                $("#techNews").html(message);
                renderNews.fromServer.push(event.data);
                if(renderNews.fromServer.length % 10 === 0) {
                    renderNews.socket.send('Slow down mate!');
                } else  if(renderNews.fromServer.length % 15 === 0) {
                    renderNews.socket.send('Faster mate!');
                }
            });

            this.socket.addEventListener('open', function(event) {
                console.log('Opening connection...');
                renderNews.socket.send('Give me your best shot');
            });

            this.socket.addEventListener('close', function(event) {
                if (event.wasClean) {
                    console.log(`Clean closing...
                        code=${event.code} reason=${event.reason}`);
                } else {
                    // event.code is usually 1006 in this case
                    console.log('Server closed the connection.');
                }
            });

            this.socket.addEventListener('error', function(event) {
                console.log(`Well bummer... ${error.message}`);
            });
        },
        stop: function() {
            this.socket.close();
        }
    };
</script>

Listing 11-10JavaScript Code Written Using Official WebSocket API in(part of the search.html template file

)

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

JavaScript 代码类似于清单 11-4 ,因为它使用相同的 WebSocket API 来处理连接、关闭和消息事件。

不幸的是,没有 Java 客户端来测试 WebSocket 反应式通信。而且curl在这方面也做得不好。测试它的唯一方法是运行项目并添加console.log语句来调试它。或者试试简单的 WebSocket 客户端 Chrome 插件。 7

既然我们有了一个使用 WebSocket 通信的客户机和服务器,那么是时候讨论背压这个主题了。

处理背压

背压是反应式编程的独角兽。几乎每个软件工程师都知道如何定义它,并且必须尽早处理它。术语背压借用自流体动力学,但在软件中,它表示阻止数据通过软件的力。如果你有两个应用在交换数据,而其中一个不能足够快地处理接收到的数据,这就是在阻止数据流。背压会导致阻塞和数据丢失,所以处理背压就是编写代码在服务器端调节数据流,在客户端实现一些数据保存机制。

如今,服务器和客户端应用通过网络分离,通信通过 TCP 完成。为了理解如何通过网络处理背压,有必要了解 TCP 的工作原理。在因特网上建立通信可以通过各种协议来完成,而网络协议是一个庞大的主题,已经有不止一本书写了关于它们的内容。但是,在这本书的背景下,我尽量保持简单。传输控制协议(TCP)是互联网协议簇的核心协议。通过 HTTP、WebSocket 和 TCP 之上的其他协议进行通信的应用通常对传递每个信息位(包)并不敏感,而是对传递所有信息的总持续时间敏感。这就是为什么每次包到达目的地时,都会发出一个确认信号,这确保了成功的端到端通信。

处理背压适合在哪里?应该实现某种机制来控制发送到网络或从网络接收的逻辑元素的数量。TCP 有自己的流量控制, 8 但是它适用于包。TCP 不知道你使用它上面的协议发送的内容的整体形状。因此,在 TCP 之上使用协议的应用必须将逻辑元素转换成字节。但是他们无法控制这些元素在被转换成包后是如何传输的。背压由 TCP 流量控制调节。当然,可以在应用中添加逻辑来控制元素发出的速率,但是客户端对此有什么可说的吗?

Spring Framework 5 在框架中实现了 WebSockets 支持的现代化,为这个通信通道增加了反应能力。一旦建立了 WebSocket 通信,客户端和服务器就可以独立地向对方发送在专用流上发出的消息。在非反应式应用中,客户端发送一条或多条消息,服务器也通过发送一条或多条消息做出反应。在反应式应用中,客户端和服务器可以通过 WebSocket 连接相互发送消息流。

在前面的例子中,我们已经看到消息以一定的速率发送到客户端。客户端同时发送的消息只是被打印出来,它们不会以任何方式中断或影响服务器的行为。运行清单 11-10 中描述的代码的浏览器和运行清单 11-8 中代码的服务器之间的交互的可视化表示如图 11-4 所示。

SpringMVC WebFlux 高级教程(五)

图 11-4

JavaScript 客户端和 Spring WebFlux 应用通过 WebSocket 进行通信

服务器通过inbound流接收客户端发送的消息,并使用outbound流将消息发送给客户端。客户机在浏览器的 HTML 页面中呈现消息,服务器在控制台中打印它收到的消息。没用吧?没多大作用。在 Internet 上也找不到其他的基本示例,它们实现了一种 echo 通信,在这种通信中,服务器用客户端发送的消息进行响应。应用级别的背压控制的例子很简单,只需使用定制的订户来处理块中发出的元素,从而调节流量。这假设客户端应用是使用反应流 API 实现编写的。

清单 11-11 是一个使用项目反应器BaseSubscriber<T>的实现来处理背压的简单例子。消息源与前一节中使用 WebSocket 发送消息的流相同,但是它被限制为 20 个元素,以保持执行的有限性。

@Test
void testBackpressureHandlingOne() {
    var techNews = Flux.fromStream(
            Stream.generate(BookNewReleasesUtil::randomNews))
        .take(20).log(); // server outbound stream

    // client
    techNews.subscribe(new BaseSubscriber<>() {
        int processed;
        final int limit = 5;

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            subscription.request(limit);
        }

        @Override
        protected void hookOnNext(String news) {
            //client logic here
            if (++processed >= limit) {
                processed = 0;
                request(limit);
            }
        }
    });
}

Listing 11-11Backpressure Handling Example Using Customized BaseSubscriber<T>

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

Project Reactor 的BaseSubscriber<T>中的hookOnSubscribe(Subscription)实现被认为是无限的,这意味着它请求的元素数量与subscription.request(..)方法的声明参数类型的上限(即long)一样多,因此默认情况下,它从流中请求Long.MAX_VALUE元素。您可以通过查看BaseSubscriber<T> Java 代码来检查这一点。 9

这并不总是好的,因为消费者处理接收到的值可能会很慢。

再打个流体力学的比方,客户控制水龙头杠杆来决定水压是很正常的吧?下面的代码相当于使用了limitRate(int)方法,当在Flux<T>实例上调用该方法时,它会将后续订阅者的请求限制在作为参数提供的数量内(这样就像一个流量杠杆)。清单 11-12 中描述了相当于清单 11-11 的代码。

@Test
void testBackpressureHandlingTwo() {
    var techNews = Flux.fromStream(
            Stream.generate(BookNewReleasesUtil::randomNews))
        .take(20).log();
    consume(techNews.limitRate(5));
}

private void consume(Flux<String> input) {
    input.subscribe(/*s -> clientLogicHere(s)*/);
}

Listing 11-12Backpressure Handling Example Using Flux.limitRate(int)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

当您运行前面的任何测试时,您会看到日志消息清楚地表明数据是以五条消息的形式发出的,这让客户端可以松口气。但是,大多数人期望客户机告诉服务器首选的发射速率,对吗?嗯,我这样做是因为我得到了反应流和双向连接的承诺,我想一起使用它们。我试图修改TechNewsHandler,以便当从客户端接收到一条消息时,服务器用一个根据消息以不同速率发出值的流来响应。信息*慢点,伙计!使频率下降到每 5 秒发射一个值,并且更快,伙计!*使速率增加到每 2 秒发射一个值。清单 11-13 中描述了一个解决方案。

package com.apress.prospringmvc.bookstore.handler;
// other imports omitted

public class TechNewsHandler implements WebSocketHandler {
    private final Logger logger = LoggerFactory.getLogger(TechNewsHandler.class);

    private Flux<String> getRandomNews(String message){
        long rate = "Slow down mate!".equals(message) ? 5000:2000L;
        return  Flux.fromStream(Stream.generate(BookNewReleasesUtil::randomNews))
            .delayElements(Duration.ofMillis(rate));
    }

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.send(session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .log()
                .flatMap(this::getRandomNews)
                .map(session::textMessage)).then();
    }
}

Listing 11-13ServerController

Modified to Support Different Emission Rates for Messages

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

这看起来是一个非常优雅的解决方案,既漂亮又干净,但是它能像预期的那样工作吗?

最长的答案是:不尽然。

简短的回答是:没有。

反应式编程的声明式风格和对它的误解有时会导致这样的怪物。前面的handle(..)方法的实现可以总结如下:每次在这个会话中收到一条消息时,发送一个由getRandomNews()调用返回的无限消息流。该函件可归纳如下。

  • 客户发送,给我你最好的照片。

  • 服务器上的一个数据流每 2 秒钟就开始发送随机的科技新闻。

  • 客户发送慢点,伙计!

  • 服务器上的另一个流开始每 5 秒钟发送一次随机的科技新闻。

  • 客户发送,快点,伙计!

  • 服务器上的另一个数据流每 2 秒钟就开始发送随机的科技新闻。

等等。

没有人取消订阅这些流,结果是什么呢?所有流都会继续发出元素,直到用户关闭网页或者服务器内存不足。所有这些流都在同一个WebSocketSession将它们的消息发送到同一个客户端,所以结果会淹没浏览器。背压控制没有正确实现,因为控制流是不可能的,因为处理后续消息之间不共享状态。

在本书附带的源代码中,您可以找到 Node.js 服务器实现(文件tech-news-server.js),它允许客户端控制发射速率。这是可能的,因为使用了setIntervalclearInterval方法。对生成消息的函数的引用在同一会话中处理后续消息时共享。没有使用 JavaScript 反应流库,所以反应是模拟的。但是,它的工作是考虑客户的愿望。

使用 Spring WebFlux 无法编写等效的版本。一个ConcurrentHashMap<StringFlux<WebSocketMessage>>可以存储映射到每个WebSocketSession的出站流的引用。当接收到消息时,从连接映射中检索对出站流的引用,只需在现有流上调用.subscribe().dispose()并用另一个具有不同值发出速率的流来替换它。主要问题是反应式编程的声明式风格阻止您替换现有的流。这是因为不变性。

呈现在网页上的随机新闻不需要与客户端进行太多的交流。客户端必须打开页面,这将打开 WebSocket 连接,通过该连接传送数据,该操作还意味着订阅服务器上的技术新闻流,然后关闭页面,这将取消订阅,因为 WebSocket 连接也被关闭。仅此而已。在这种情况下,根据客户端的偏好来调节消息发送速率是没有意义的。上一次一个网站被设计成降低广告速度以避免浏览器冻结是什么时候?没人关心这个,这就是广告拦截器被发明的原因。服务器被设计成以保持信息可见和有用的频率向客户机发送消息。

来自客户端的消息影响服务器在其流上发回的内容的唯一方式是将它们保存在数据库中,向客户端发送消息的流将该数据库用作源。添加一个反应式流程,使用客户端消息作为命令来控制需要生成什么并发送给客户端,这样两者之间就有了良好的交互。这种情况如图 11-5 所示,请记住:您可以影响在出站流上发送什么而不是发送消息的频率

SpringMVC WebFlux 高级教程(五)

图 11-5

反应式应用,在这种应用中,客户端可以控制服务器使用共享数据库发送哪些消息

Project Reactor 是一个反应流实现,因此它的所有操作符都支持非阻塞背压。然而,这仅适用于服务器端的 Java 应用。无论是 HTTP 还是 WebSocket,通信协议都限制了无法正确处理背压。关于如何处理两个 Spring WebFlux 应用之间的背压的最详细的解释可以在 StackOverflow 10 上找到,这是 Project Reactor 的知名贡献者 Oleh Dokuka 的精彩贡献。可以得出的结论是,我们可以声明几个预取的元素,并将服务器发出的数据分割成块,以抑制需求(就像前面的示例中所做的那样)。但是,在数据开始流动后,客户端没有办法影响这一点。

Oleh Dokuka 说,要通过网络边界实现逻辑元素背压,我们需要一个合适的协议。该协议是 RSocket,将在下一节中介绍。

RSocket 协议

RSocket 11 是一种二进制应用协议,提供可用于字节流传输(如 TCP、WebSockets 和 Aeron)之上的反应流语义。它是由当今最受欢迎的流媒体平台网飞的工程师开发的。它支持在单个连接上异步交换消息,具有以下特点。

  • fire-and-forget(无响应):比如 handler 方法返回Mono<Void>,声明一个类型为 RequestMessageType 的参数;您可以将此视为客户端和服务器之间的一对一通信。HTTP 支持这一点,但是响应的缺乏让一些浏览器感到困惑。

  • 请求/响应(stream of 1) :例如 handler 方法返回Mono<response messagetype>并声明一个类型为 RequestMessageTypeMono<request message type>的参数;您可以将此视为客户端和服务器之间的一对一通信。HTTP 支持这一点。

  • 请求/流(有限多流):例如 handler 方法返回Flux<response messagetype>并声明一个类型为 RequestMessageTypeMono<request message type>的参数;您可以将此视为客户端和服务器之间的一对多通信。WebSocket 支持这一点。

  • 通道(双向流):比如 handler 方法返回Flux<response messagetype>,声明一个Flux<request message type>类型的参数;您可以将此视为客户端和服务器之间的多对多通信。

    SpringMVC WebFlux 高级教程(五) RequestMessageTypeRequestMessageType 是占位符类型,替换客户端和服务器之间通信中使用的真实类型,以创建通用模式。

RSocket 最好的一点是有 Java、JavaScript、Kotlin、.NET,Python,Go,C++。理论上,这意味着用 JavaScript 开发的应用可以使用该协议与 Java 应用交换消息,这意味着可以在逻辑元素级别应用背压。RSocket 还没有被广泛使用,但是它确实有很大的潜力。不幸的是,在书店应用中包含 RSocket 支持会带来很多麻烦,因为rsocket-js API 是作为一组 Node.js 模块提供的。所以取而代之的是,因为这本书关注的是 Spring,所以我们写了一个 Spring WebFlux RSocket 服务器和客户端。

RSocket 的 Java 实现建立在 Project Reactor 之上。正如您可能预料的那样,有一个名为org.springframework.boot:spring-boot-starter-rsocket的 RSocket 的 Spring Boot 启动器依赖项,它很容易使用。当这个库出现在类路径中时,会使用您的application.propertiesapplication.yaml文件中的属性自动配置和定制一个org.springframework.boot.rsocket.netty.NettyRSocketServer bean。需要在应用配置文件中设置两个属性:服务器端口和进行 RSocket 通信的传输协议。没有它们,应用将无法启动。

在清单 11-14 中,您可以看到示例配置文件,这些文件配置服务器在端口 8081 上启动,通过 WebSocket 发送和接收消息。

spring:
  rsocket:
    server:
      transport: websocket
      port: 8081
      mapping-path: /rsocket

Listing 11-14Spring Boot RSocket Server Application Configuration File

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

spring.rsocket.server.transport属性的另一个选项是TCP,这也是默认选项,假设 RSocket 协议用于 HTTP 之上。spring.rsocket.server.mapping-path是可选的,可用于设置 RSocket 应用的根上下文路径。如果未指定,则不设置上下文路径。

现在我们有了一个服务器,我们需要一个控制器或处理程序类来处理 RSocket 请求。目前仅支持控制器。你可以在一个@Controller注释的类中拥有处理程序方法,方法是用@MessageMapping(path)进行注释,来自包org.springframework.messaging.handler.annotation。还没有提供 RSocket 请求的自定义处理程序类和功能路由。我的技术评论员想要一个 GitHub 问题链接,但是没有。API 还没有出现,还没有人请求它。

前面已经介绍了 RSocket 上的四种通信方式。清单中的ServerController11-15 包含四种方法,每种方法处理一种类型的通信。当客户端和服务器交换比简单文本值更复杂的对象时,必须使用消息转换器来支持序列化和反序列化。默认情况下,使用 JSON,但是如果您正在定制通信并使用其他消息转换器,您可能需要用@Payload注释 RSocket 处理程序方法的参数。在清单 11-15 中,使用了注释,即使不是必需的,因为默认情况下,被处理的对象可以在 JSON 中序列化和反序列化。

package com.apress.prospringmvc.bookstore.controller;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Controller;
// other imports omitted

@Controller
public class ServerController {
    private final Logger logger = LoggerFactory.getLogger(ServerController.class);

    // fire-and-forget
    @MessageMapping("introduction")
    public Mono<Void> introduction(@Payload ClientMessage clientMessage){
        logger.debug("{}:  We have a new client -->  {}" , Instant.now(), clientMessage.getName());
        return Mono.empty();
    }

    // request/response
    @MessageMapping("check-service")
    public Mono<String> checkService(@Payload ClientMessage clientMessage){
        logger.debug("{}:  Ping request from client --> {}" , Instant.now(), clientMessage.getName());
        return Mono.just(Instant.now() + ": Service online. Send command.");
    }

    // request/stream
    @MessageMapping("show-books")
    public Flux<Book> showBooks(@Payload ClientMessage clientMessage) {
        logger.debug("{}:  Random releases requested by client --> {}" , Instant.now(), clientMessage.getName());
        return Flux.fromStream(
                Stream.generate(BookNewReleasesUtil::randomRelease))
                .delayElements(Duration.ofSeconds(1L));
    }

    //channel (bi-directional streams)
    @MessageMapping("books-channel")
    public Flux<Book> useChannel(@Payload Flux<ClientMessage> messages) {
        return messages
                .map(message-> BookNewReleasesUtil.randomForAuthor(message.getAuthor()))
                .delayElements(Duration.ofSeconds(1L));
    }
}

Listing 11-15RSocket Requests Handler Methods in a @Controller

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

每个方法接收一个ClientMessage实例,作为参数,该实例是一个简单的类,包含发出请求的客户机的名称。useChannel(Flux<ClientMessage>)方法处理客户机和服务器之间的双向通信流。它从客户端接收消息流,处理每个消息流,并使用基于从客户端接收的数据发出的数据流进行响应。在这种情况下,ClientMessage实例包含一个作者姓名,在生成随机书籍时用作标准。这是一个非常简单的类,与此无关。

如果您启动应用,您会看到服务器在 8081 端口上启动,但是我们如何测试它呢?还记得我说过这个协议很有潜力吗?我不是唯一这样想的人。这就是为什么 Pivotal/VMWare 的一名开发人员创建了一个命令行实用程序来帮助测试 RSocket 服务器。 13 简称 RSocket 客户端 CLI (RSC)或rsc。它可以测试前三种方法。测试双向是一项困难的任务,因为使用命令行提供流作为参数很麻烦。在本章的代码中,您可以找到测试所有三种方法的命令。在清单 11-16 中,您可以看到用于测试showBooks()方法的rsc命令。--debug选项得到一个详细的结果,以文本和二进制格式显示有效负载数据。

$ rsc ws://localhost:8081/rsocket --stream --route show-books --log --debug -d "{"name": "Gigi"}"
2020-08-17 21:44:17.425  INFO --- [ctor-http-nio-1] rsc             : onSubscribe(FluxMap.MapSubscriber)
2020-08-17 21:44:17.425  INFO --- [ctor-http-nio-1] rsc             : request(unbounded)
2020-08-17 21:44:17.433 DEBUG --- [ctor-http-nio-1] i.r.FrameLogger : sending ->
Frame => Stream ID: 1 Type: REQUEST_STREAM Flags: 0b100000000 Length: 40 InitialRequestN: 9223372036854775807
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-----------------------------------------------+----------------+
|00000000| 0a 73 68 6f 77 2d 62 6f 6f 6b 73                |.show-books     |
+--------+-----------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-----------------------------------------------+----------------+
|00000000| 7b 22 6e 61 6d 65 22 3a 20 22 47 69 67 69 22 7d |{"name": "Gigi"}|
+--------+-----------------------------------------------+----------------+
2020-08-17 21:44:18.439 DEBUG --- [ctor-http-nio-1] i.r.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 123
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-----------------------------------------------+----------------+
|00000000| 7b 22 74 69 74 6c 65 22 3a 22 52 65 61 63 74 69 |{"title":"Reacti|
|00000010| 76 65 20 53 70 72 69 6e 67 22 2c 22 70 72 69 63 |ve Spring","pric|
|00000020| 65 22 3a 32 35 2e 34 34 2c 22 79 65 61 72 22 3a |e":25.44,"year":|
|00000030| 32 30 32 30 2c 22 61 75 74 68 6f 72 22 3a 22 4a |2020,"author":"J|
|00000040| 6f 73 68 20 4c 6f 6e 67 22 2c 22 69 73 62 6e 22 |osh Long","isbn"|
|00000050| 3a 22 39 37 38 31 34 38 34 32 32 37 31 31 31 22 |:"9781484227111"|
|00000060| 2c 22 63 61 74 65 67 6f 72 79 22 3a 22 53 70 72 |,"category":"Spr|
|00000070| 69 6e 67 22 7d                                  |ing"}           |
+--------+-----------------------------------------------+----------------+
2020-08-17 21:44:18.439  INFO --- [ctor-http-nio-1] rsc: onNext({"title":"Reactive Spring","price":25.44,"year":2020,"author":"Josh Long","isbn":"9781484227111","category":"Spring"})

# more elements here since this stream is infinite

Listing 11-16Output of rsc Command

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

SpringMVC WebFlux 高级教程(五)您可能已经注意到,请求/响应请求/流方法没有将 Mono < ClientMessage >声明为参数。这是有意的,因为这会使它们不适合用 RSC 进行测试.

为了使请求/响应请求/流遵守反应规则,它们的参数应该被声明为具有Mono<ClientMessage>。这改变了方法的主体,因为必须在处理管道的前端添加参数。清单 11-17 中描述了这些变化。

package com.apress.prospringmvc.bookstore.controller;
// other imports omitted

@Controller
public class ServerController {
    private final Logger logger = LoggerFactory.getLogger(ServerController.class);

    // request/response
    // no longer testable with rsc
    @MessageMapping("check-service")
    public Mono<String> checkService(@Payload Mono<ClientMessage> clientMessage){
        return clientMessage
                .doOnNext(message -> logger.debug("{}:  Ping request from client --> {}" ,
                    Instant.now(), message.getName()))
                .map( message -> Instant.now() + ": Service online. Send command.");
    }

    // request/stream
    // no longer testable with rsc
    @MessageMapping("show-books")
    public Flux<Book> showBooks(@Payload Mono<ClientMessage> clientMessage) {
        return clientMessage
                .doOnNext(message -> logger.debug("{}:  " +
                        "Random releases requested by client --> {}" , Instant.now(), message.getName()))
                .thenMany(Flux.fromStream(
                        Stream.generate(BookNewReleasesUtil::randomRelease))
                        .delayElements(Duration.ofSeconds(1L)));
    }

    // rest of the code omitted
}

Listing 11-17The ServerController Fully Reactive Request/Response and the Request/Stream Implementations

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

一旦服务器按预期启动并运行,正如rsc所确认的,您就可以开始编写您的客户机了。同一个 Spring Boot 依赖项提供了编写客户机所需的类,但是因为我们连接到一个 RSocket 服务器,所以应用配置文件没有填充特定于 RSocket 的属性。在应用配置文件中没有任何属性并且spring-boot-starter-webflux在类路径中org.springframework.boot.web.embedded.netty.NettyWebServer在默认端口 8080 上启动。

我们的客户机是一个简单的@RestController带注释的类,包含四个方法,每个方法对 RSocket 服务器应用上的四个相应方法之一进行 RSocket 调用。这包括不能用rsc测试的方法。为此,需要一个org.springframework.messaging.rsocket.RSocketRequester的实例。正在使用 WebSocket URL 配置此 bean,该 URL 向服务器公开的 RSocket 消息处理方法发出请求。bean 是使用RSocketRequester.Builder.创建的,它是反应式的并返回一个Mono<RSocketRequester>,所以需要一个block()调用(是的,就是我在第十章末尾告诉你要避免的那个)来提取RSocketRequester实例。

清单 11-18 描述了这个 bean 的声明。因为我们使用 RSocket over WebSocket 进行通信,所以在服务器上调用了带有前缀为ws://的典型 WebSocket URL。

如果我们使用 HTTP,那么应该调用带有 HTTP URL 的connectTcp(..)

package com.apress.prospringmvc.bookstore;
import org.springframework.messaging.rsocket.RSocketRequester;
//other imports omitted

@SpringBootApplication
public class RSocketClientBookstoreApplication {

    public static void main(String... args) {
        new SpringApplication(RSocketClientBookstoreApplication.class).run(args);
    }

    @Bean
    RSocketRequester rSocketRequester(RSocketRequester.Builder builder) {
        return builder.connectWebSocket(URI.create("ws://localhost:8081/rsocket")).block();
    }
}

Listing 11-18The RSocketRequester Bean Declaration in the Spring Boot Application Main Class

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

控制器没什么特别的,但是对RSocketRequester bean 方法调用的调用很特别。清单 11-19 描绘了ClientController,其中的每个处理程序方法都可以被认为是一个 RSocket 客户端使用同一个RSocketRequester bean 向 RSocket 服务器发出请求。

package com.apress.prospringmvc.bookstore;
import org.springframework.messaging.rsocket.RSocketRequester;
//other imports omitted

@RestController
public class ClientController {

    private final RSocketRequester requester;

    public ClientController(RSocketRequester requester) {
        this.requester = requester;
    }

    @GetMapping("introduction")
    public Mono<String> introduction(){
        ClientMessage clientMessage = new ClientMessage().name("gigi");
        requester.route("introduction").data(clientMessage).send();
        return Mono.just("Introduction data was sent.");
    }

    @GetMapping("check-service")
    public Mono<String> checkService(){
        ClientMessage clientMessage = new ClientMessage().name("gigi");
        return requester.route("check-service").data(clientMessage).retrieveMono(String.class);
    }

    @GetMapping(path = "show-books", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Book> showBooks(){
        ClientMessage clientMessage = new ClientMessage().name("gigi");
        return requester.route("show-books").data(clientMessage).retrieveFlux(Book.class).limitRate(20);
    }

    @GetMapping(value = "books-channel", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<Book> booksChannel() {
        return this.requester.route("books-channel")
                .data(Flux.range(0, 10).map(i -> new ClientMessage().name("gigi").author(RandomUtil.randomAuthor())))
                .retrieveFlux(Book.class).limitRate(5).log();
    }
}

Listing 11-19The ClientController Containing Handler Methods Making RSocket Requests

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

introduction()checkService()方法不需要produces属性,因为它们返回一个Mono<String>, Spring 足够聪明,可以在没有特殊指令的情况下自行反序列化。作为单个值,没有必要将它作为 SSE 发送给客户机。它也很实用,因为这些方法可以用浏览器来测试。

RSocketRequester将每个客户机请求路由到适当的服务器方法。该方法在服务器端由使用@MessageMapping注释指定的属性来标识。introduction()处理程序方法被设计成发出一个服务器调用,然后返回一个Mono<String>,所以该方法可以通过在浏览器中打开http://localhost:8080/introduction来执行。如果不返回任何内容,访问这个 URL 会让大多数浏览器感到困惑。例如,Firefox 抱怨无法读取响应,因为它无法将其转换为文本,而 Chrome 则什么都不说。但是,在反应式服务直接相互对话(不涉及浏览器)的环境中,只发送消息而不期待响应有时是有意义的。举个例子,我们来看看网络广告。你认为将它们发送到页面的服务器会期待你的响应吗?

可以在浏览器中测试checkService()方法,从服务器收到的响应显示为网页。最后两个方法返回事件流;为了测试它们,你需要使用curl命令 14

在越来越多的人对使用 Spring Boot 编写微服务应用感兴趣的背景下,RSocket 目前是 Pivotal 的典型代表。它易于设置,并且完美地集成在 Spring 生态系统中。如果你想在 Spring Reactive 应用的背景下了解更多关于 RSocket 的知识,spring.io 官方博客上有一系列文章。 十五

WebFlux 安全性

WebFlux 安全是本书的最后一个反应式应用主题。任何公开的应用都需要支持多个用户并控制对敏感资源的访问。在 Spring WebFlux 应用中进行配置比在非反应式 Spring web 应用中进行配置更容易。

从第 5 版开始,Spring Security 库被修改为包含 reactive 组件,以在 Spring WebFlux reactive 应用中设置安全性。Spring Security 在书中有专门的一章。如果您不熟悉保护 Spring 应用,我们建议您先阅读相关内容。我们所说的关于 servlet 环境中 Spring Security 性的一切也适用于反应式应用。唯一的区别是一些类和注释,它们允许通过使用反应流和声明性编程在反应式应用中配置安全性。

表 11-1 包含了为 Spring WebFlux 应用和 Spring MVC 应用配置安全性所涉及的注释和类。

表 11-1

Spring MVC 和 WebFlux 安全组件比较

|

spring webflux

|

框架

|

描述

|
| — | — | — |
| org.springframework.security. config.annotation .web.reactive | org.springframework.security. config.annotation.web.configuration | 大多数组件所在的根包。 |
| @EnableWebFluxSecurity | @EnableWebSecurity / @EnableWebMvcSecurity | 在配置类上使用的注释,用于启用 Spring Security 性。(@EnableWebMvcSecurity已弃用,可能会在 Spring 6 中移除。它的所有功能已经是@EnableWebSecurity的一部分。) |
| @EnableRSocketSecurity | | @EnableRSocketSecurityorg.springframework.security.config.annotation.rsocket包的一部分,增加了通过 RSocket 协议通信的应用中的 Spring Security 性。 |
| @EnableReactive MethodSecurity | @EnableGlobalMethodSecurity | 在配置类上使用的注释,用于在方法级别启用 Spring Security 性。 |
| | SecurityConfigurer | 由安全配置类实现的接口。 |
| ServerHttpSecurity(类) | HttpSecurity(最后一课) | 允许为特定的 HTTP 请求配置基于 web 的安全性。 |
| ReactiveUserDetailsService | UserDetailsService | 实现这些接口的类的实例存储用户信息,这些信息随后被封装到Authentication对象中。ReactiveUserDetailsService是围绕UserDetailsService的反应式包装器。 |
| ServerHttpRequest | HttpServletRequest | ServerHttpRequest是 Spring Web 库的一部分,而HttpServletRequest是 Java Servlet 库的一部分。 |
| ServerHttpResponse | HttpServletResponse | ServerHttpResponse是 Spring Web 库的一部分,而HttpServletResponse是 Java Servlet 库的一部分。 |
| ServerWebExchange | | ServerWebExchange表示 HTTP 请求-响应交互的契约。它提供对ServerHttpRequestServerHttpResponse的访问。 |
| SecurityWebFilterChain | SecurityFilterChain | 表示过滤器链的接口与一个ServerWebExchange / HttpServletRequest匹配,以决定它是否适用于该请求。 |
| WebFilterChainProxy | DelegatingFilterProxy | 将过滤请求的工作委托给一组SecurityWebFilterChain / SecurityFilterChain实例。 |
| WebSessionServerCsrfTokenRepository | HttpSessionCsrfTokenRepository | 一个 CSRF 令牌库实现,它将CsrfToken存储在HttpSession中。 |

当使用 Spring Boot 编写一个安全的 web 反应式应用时,项目类路径上的spring-boot-starter-security依赖项会根据一些内部类为您自动配置安全性。应用仍然会启动,但是任何 URL 都会重定向到一个默认的登录表单。

要定制 Spring Boot WebFlux 应用中的安全配置,您可以在任何配置类中声明一个类型为org.springframework.security.web.server.SecurityWebFilterChain的 bean。当应用启动时,它会选择它并启用它配置的所有访问规则。如果不使用 Spring Boot,配置类需要用@EnableWebFluxSecurity注释。这个注释是在org.springframework.security.config.annotation.web.reactive包中声明的,尽管在 Spring Boot 应用中没有必要,但大多数开发人员倾向于将所有与安全相关的 beans 聚集在一个用这个注释注释的配置类中,从而将它用作一个标记。

创建 SecurityWebFilterChain bean 的方法将 Spring 注入的一个org.springframework.security.config.web.server.ServerHttpSecurity对象作为参数。ServerHttpSecurity类公开了与它的非反应对等物HttpSecurity几乎相同的方法,允许开发人员为 URL、身份验证提供者、登录表单、注销表单、CSRF 实现等指定访问规则。第十二章详细解释了如何配置 Spring Security。

在 Spring WebFlux 应用中,配置认证的最快方法是声明一个用一个或多个UserDetails实例初始化的MapReactiveUserDetailsService bean。该 bean 为内存中的身份验证提供数据。清单 11-20 是一个被配置为向两个用户提供认证数据的MapReactiveUserDetailsService bean 的例子。您可以看到PasswordEncoder bean,它设置了密码散列以获得更好的安全性。开发环境的配置可以使用NoOpPasswordEncoder来设置,它不会以任何方式改变密码。出于安全考虑,这种方法不被认可。

package com.apress.prospringmvc.bookstore.config.security;

import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.core.userdetails.MapReactiveUserDetailsService;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;

//other import omitted

@Configuration
@EnableWebFluxSecurity
public class SecurityConfig  {
    //other code omitted
    @Bean
    public MapReactiveUserDetailsService userDetailsService() {
        UserDetails john = User.withUsername("john")
                .password(passwordEncoder().encode("doe")).roles("USER")
                .build();
        UserDetails admin = User.withUsername("admin")
                .password(passwordEncoder().encode("admin")).roles("ADMIN")
                .build();
        return new MapReactiveUserDetailsService(john, admin);
    }

    @Bean
    public PasswordEncoder passwordEncoder() {
        return new BCryptPasswordEncoder();
    }
}

Listing 11-20The MapReactiveUserDetailsService Bean

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

MapReactiveUserDetailsService类实现了org.springframework.security.core.userdetails.ReactiveUserDetailsService来声明一个简单的 API,用于根据用户名检索一个Mono<UserDetails>。如果身份验证数据是由数据库或任何外部系统(例如 Google 或 Okta 这样的单点登录提供商)提供的,那么使用现有数据的最简单方法就是实现这个接口,并提供一种定制的方法来检索身份验证数据。声明一个你的类型的 bean,它会被自动拾取。因为书店应用的身份验证数据保存在 MongoDB 表中,所以实现很简单,因为反应式 MongoDB 存储库使用反应式流返回数据。清单 11-21 描述了书店应用中使用的ReactiveUserDetailsService的实现。

package com.apress.prospringmvc.bookstore.config.security;
import com.apress.prospringmvc.bookstore.repository.AccountRepository;
import org.springframework.security.core.userdetails.ReactiveUserDetailsService;
import org.springframework.security.core.userdetails.UsernameNotFoundException;
// other imports omitted

@Service
public class ReactiveAuthenticationService implements ReactiveUserDetailsService {

    private final AccountRepository accountRepository;

    public ReactiveAuthenticationService(AccountRepository accountRepository) {
        this.accountRepository = accountRepository;
    }

    @Override
    public Mono<UserDetails> findByUsername(String username) {
        return accountRepository.findByUsername(username).switchIfEmpty(
                Mono.defer(() -> Mono.error(new UsernameNotFoundException("User Not Found"))
        )).map(this::toUserDetails);
    }

    private UserDetails toUserDetails(Account account) {
        String[] authorities = new String[account.getRoles().size()];
        authorities = account.getRoles().toArray(authorities);
        return User.withUsername(account.getUsername())
                .password(account.getPassword())
                .authorities(authorities)
                .build();
    }
}

Listing 11-21The ReactiveAuthenticationService

Bean Used in the Bookstore Application

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

ReactiveAuthenticationService返回的数据用于认证用户。它的角色用于决定允许用户访问哪些资源,以及允许在应用中执行哪些操作。现在我们已经设置了身份验证提供者数据,下一步是配置SecurityWebFilterChain。清单 11-22 描述了书店应用中使用的配置。

package com.apress.prospringmvc.bookstore.config.security;

import org.thymeleaf.extras.springsecurity5.dialect.SpringSecurityDialect;
import org.springframework.security.web.server.util.matcher.ServerWebExchangeMatchers;
// other imports omitted

@Configuration
@EnableWebFluxSecurity
@EnableReactiveMethodSecurity
public class SecurityConfig  {

    @Bean
    SecurityWebFilterChain authorization(ServerHttpSecurity http) {
        final RedirectServerLogoutSuccessHandler logoutSuccessHandler =
             new RedirectServerLogoutSuccessHandler();
        logoutSuccessHandler.setLogoutSuccessUrl(URI.create("/"));

        return http
                .formLogin(formLogin -> formLogin.loginPage("/login"))
                .logout(logoutSpec -> logoutSpec.logoutUrl("/signout")
                    .logoutSuccessHandler(logoutSuccessHandler))
                .authorizeExchange(authorize -> authorize
                        .matchers(PathRequest.toStaticResources().atCommonLocations()).permitAll()
                        .pathMatchers("/book/edit/*", "/book/create").hasRole("ADMIN")
                        .pathMatchers("/customer/edit/*").hasRole("ADMIN")
                        .matchers(ServerWebExchangeMatchers.pathMatchers(HttpMethod.DELETE,
                                "/book/delete/*", "/customer/delete/*", "/account/delete/*")).hasRole("ADMIN")
                        .anyExchange().permitAll()
                )
                .csrf(csrf ->  csrf.csrfTokenRepository(repo()))
                .build();
    }

    @Bean
    public ServerCsrfTokenRepository repo() {
        WebSessionServerCsrfTokenRepository repo = new WebSessionServerCsrfTokenRepository();
        repo.setParameterName("_csrf");
        repo.setHeaderName("X-CSRF-TOKEN"); // default header name
        return repo;
    }

    @Bean
    public SpringSecurityDialect securityDialect() {
        return new SpringSecurityDialect();
    }

}

Listing 11-22The SecurityWebFilterChain

Bean

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

该配置描述了以下内容。

  • .formLogin(formLoginformLogin.loginPage("/login"))配置/login URL 返回的视图中声明的表单,用于用户登录。要使用默认生成的表单,您可以使用.formLogin(Customizer.withDefaults()).

  • 。logout(logout spec→logout spec . logout URL(“/sign out “))是对 URL /signout的 POST 请求,触发注销操作。默认为/logout

  • .authorizeExchange(..)使用Customizer<T>实例来配置授权。

  • .matchers(PathRequest.toStaticResources().atCommonLocations())构建了一个ServerWebExchangeMatcher,它匹配静态源所在的 Spring Boot 默认位置(/resources/static目录),而.permitall()调用将它们排除在安全性之外。

  • pathMatchers(String... )方法对 URL 使用 Ant 样式模式,并返回将 URL 映射到处理程序方法的ServerWebExchangeMatcher实例。

  • ServerWebExchangeMatchers包含一些创建ServerWebExchangeMatcher实例的实用方法。前面的清单中显示的例子使用一个 HTML 方法和一个 URL 列表来创建一个应用安全规则的ServerWebExchangeMatcher实例。

  • .csrf(csrfcsrf.csrfTokenRepository(repo()))通过引入由同一代码示例中配置的ServerCsrfTokenRepository生成的 CSRF 令牌来保护应用。

  • RedirectServerLogoutSuccessHandler注销后将用户重定向到根页面(“/”)。

通过这种配置,客户端和服务器之间的所有交换都是安全的,无论它们是使用控制器处理程序方法还是功能端点来设置的。

通过在测试端点的方法上使用@WithMockUser注释,可以非常容易地测试安全配置。这个注释是spring-security-test库的一部分,位于org.springframework.security.test.context.support包中。它是在 Spring 4 中引入的,在 Spring 5 中被扩展到了反应端点。

清单 11-23 描述了四种测试方法。清单 11-22 中的配置涵盖了每个端点。

package com.apress.prospringmvc.bookstore.api;
import org.springframework.security.test.context.support.WithMockUser;
//other imports

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class BookSecuredApiTest {
    private static Logger logger = LoggerFactory.getLogger(BookSecuredApiTest.class);

    @Autowired
    private WebTestClient testClient;

    @WithMockUser(roles = "USER")
    @Test
    void shouldFindByIsbn(){
        testClient.get()
            .uri(uriBuilder -> uriBuilder.path("/book/by/{isbn}").build("9781484230042"))
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .expectStatus().isOk()
            .expectHeader().contentType(MediaType.APPLICATION_JSON)
            .expectBody(Book.class)
            .consumeWith(responseEntity -> {
                logger.debug("Response: {}", responseEntity);
                Book book = responseEntity.getResponseBody();
                assertAll("book", () ->
                {
                    assertNotNull(book);
                    assertAll("book",
                            () -> assertNotNull(book.getTitle()),
                            () -> assertEquals("Iuliana Cosmina", book.getAuthor()));
                });
            });
    }

    @Test
    @WithMockUser(roles = "ADMIN")
    void shouldCreateABook() {
        Book book = new Book();
        book.setTitle("TDD for dummies");
        book.setAuthor("Test User");
        book.setPrice(BigDecimal.valueOf(40.99));
        book.setIsbn("12232434324");
        book.setCategory("test");

        testClient.post().uri("/book/create")
            .body(Mono.just(book), Book.class)
            .exchange()
            .expectStatus().isCreated()
            .expectHeader().contentType(MediaType.APPLICATION_JSON)
            .expectHeader().exists("Location")
            .expectBody(Book.class)
            .consumeWith(responseEntity -> {
                logger.debug("Response: {}", responseEntity);
                assertAll("book", () ->
                {
                    assertNotNull(book);
                    assertAll("book",
                            () -> assertNotNull(book.getIsbn()),
                            () -> assertEquals("test", book.getCategory()));
                });
            });
    }

    @WithMockUser(roles = "ADMIN")
    @Test
    void shouldDeleteByIsbn(){
        testClient.delete()
            .uri(uriBuilder -> uriBuilder.path("/book/delete/{isbn}")
                .build("9781484230042"))
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .expectStatus().isNoContent();
    }

    @Test
    public void shouldReturnTwoBooks(){
        BookSearchCriteria criteria = new BookSearchCriteria();
        criteria.setCategory(Book.Category.JAVA);

        testClient.post()
            .uri("/book/search")
            .accept(MediaType.APPLICATION_JSON)
            .body(Mono.just(criteria), BookSearchCriteria.class)
            .exchange()
            .expectStatus().isOk()
            .expectHeader().contentType(MediaType.APPLICATION_JSON)
            .expectBodyList(Book.class)
            .hasSize(2);
    }
}

Listing 11-23Class Testing Secured Endpoints Access

SpringMVC WebFlux 高级教程(五)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93

@WithMockUser被置于 Spring Security 引导测试上下文中的测试方法上时,对被测试端点的调用是使用特定于模拟用户的模拟安全上下文进行的。可以使用名称@WithMockUser("john")来指定用户,但是由于在清单 11-21 中,安全规则是使用角色来声明的,所以使用角色的注释形式更合适。

书店应用使用百里香叶作为模板引擎。为了支持视图中的安全百里香元素,必须将类型为SpringSecurityDialect的 bean 添加到配置中。

如果安全部分看起来很薄,这是因为第十二章完全致力于 Spring Security。考虑阅读该内容,然后回到本节。

摘要

在我们三部曲反应章节的最后,有几件事你应该记住。要构建反应式应用,您需要一种反应式思维,并以声明方式编写代码。Spring WebFlux 是编写在健壮的 JVM 平台上运行的反应式应用的优秀候选。Spring WebFlux 简化了线程化工作,因为它不需要与并行工作的底层组件进行交互。它提供了许多简化数据流转换的操作符。最终的代码更干净、可读性更强、也更健壮。

支持服务器发送的事件和 WebSocket 协议。RSocket(网飞开发人员的另一个创意)是一种新的消息协议,旨在解决常见的微服务通信挑战,如在 TCP 上处理逻辑元素级别的背压。您可以获得现代控制,如多路复用、背压、恢复和路由,并且您可以获得多种消息传递模式,包括一劳永逸、请求-响应和流式传输。

保护 WebFlux 应用也很容易。

反应式应用改善用户体验,现在大部分 web 和移动应用都是反应式应用。所以,如果你认为你可能会推迟学习反应式编程,也许不要。😃

也有缺点。学习曲线可能很陡,很容易错误地管理订阅,最终导致内存泄漏,影响用户体验。一般的方法是慢慢来,只对需要的组件进行反应。

当构建高负载或多用户应用时,您需要变得被动。社交网络应用是反应式应用的一个很好的例子。在线聊天应用是另一个例子。你想流媒体音乐或视频?变得被动也是正确的选择。你想建立一个具有高度互动 UI 元素的网络游戏吗?是的,你需要把它建成一个反应式的应用。

反应式编程已经存在,并且对于任何有成就的开发人员来说,很好地使用它都是一项必备技能。

Footnotes 1

https://tools.ietf.org/html/rfc2616

2

https://www.w3.org/TR/eventsource/

3

https://tools.ietf.org/html/rfc6455

4

http://www.oracle.com/technetwork/articles/java/jsr356-1937161.html

5

https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API

6

https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent

7

https://chrome.google.com/webstore/detail/simple-websocket-client/pfdhoblngboilpfeibdedpjgfnlcodoo?hl=en

8

http://www-sop.inria.fr/mistral/personnel/Thomas.Bonald/tcp_eng.html

9

https://github.com/reactor/reactor-core/blob/master/reactor-core/src/main/java/reactor/core/publisher/BaseSubscriber.java

10

https://stackoverflow.com/questions/52244808/backpressure-mechanism-in-spring-web-flux

11

https://rsocket.io/

12

https://github.com/real-logic/aeron

13

https://github.com/making/rsc

14

https://curl.se/

15

https://spring.io/blog/2020/03/02/getting-started-with-rsocket-spring-boot-server

© 版权声明

相关文章

暂无评论

您必须登录才能参与评论!
立即登录
暂无评论...