Spring Boot 2 comes with a reactive stack that enables us to create asynchronous, non-blocking, event-driven applications. Reactor is a library for reactive programming that is part of the Spring family.

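To get started with Reactor, we need to add the following dependency to the pom file. No version is specified here; this assumes the version is managed for us, for example by Spring Boot's dependency management.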

<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-core</artifactId>
</dependency>

In this blog, we will write a number of reactive tests, so we also add the following test dependency to the pom file.

<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-test</artifactId>
  <scope>test</scope>
</dependency>

The Reactor library has two main types, Mono and Flux: a Mono emits at most one item, while a Flux emits zero or more items. Both are discussed in detail in the blog Introduction to Reactor and Reactive Programming.

The common reactive operations

If we have one or more objects from which we would like to create a Flux or Mono, we can use the static just() method on Flux or Mono.

@Test
public void createFlux_just() {
  Flux<String> animalFlux = Flux
        .just("Dog", "Cat", "Monkey", "Elephant");

The Flux has been created, but it has no subscribers yet, so no data flows. We need to subscribe to the Flux to start the flow of data.

animalFlux.subscribe(
  a -> System.out.println("The Animals are: " + a));

The lambda expression passed to the subscribe() method is actually a java.util.function.Consumer, which is used to create a Reactive Streams Subscriber. Once we call subscribe(), the data starts to flow.
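To make the point about subscription concrete, here is a minimal sketch showing that a side effect attached with doOnNext() does not run until subscribe() is called. The class and method names (SubscribeSketch, countsBeforeAndAfterSubscribe) are hypothetical and chosen only for illustration, and the sketch assumes reactor-core is on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical illustration class; not part of Reactor.
class SubscribeSketch {

    // Returns how many items were seen before and after subscribing.
    static int[] countsBeforeAndAfterSubscribe() {
        List<String> seen = new ArrayList<>();

        // Assembling the pipeline does not emit anything yet.
        Flux<String> animalFlux = Flux
                .just("Dog", "Cat", "Monkey", "Elephant")
                .doOnNext(seen::add);
        int before = seen.size();

        // Subscribing starts the flow. Flux.just emits synchronously on the
        // calling thread, so every item has arrived when subscribe() returns.
        animalFlux.subscribe();
        int after = seen.size();

        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = countsBeforeAndAfterSubscribe();
        System.out.println("before=" + counts[0] + ", after=" + counts[1]);
    }
}
```

The list stays empty until subscribe() is called, which is why a Flux that nobody subscribes to does no work.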

Reactor's reactor-test module provides the StepVerifier class, which can be used to test a Flux or Mono as shown below.

StepVerifier.create(animalFlux)
        .expectNext("Dog")
        .expectNext("Cat")
        .expectNext("Monkey")
        .expectNext("Elephant")
        .verifyComplete();
}

The StepVerifier subscribes to the Flux and then asserts that each item matches the expected animal. Finally, it verifies that the Flux completes.
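As a complement, the sketch below shows two other StepVerifier expectations that can shorten such tests: expectNext() with several values in one call, and expectNextCount() when only the number of items matters. It also shows that a mismatch surfaces as an AssertionError. The class and method names are hypothetical, and the sketch assumes reactor-core and reactor-test are on the classpath.

```java
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

// Hypothetical illustration class; not part of Reactor.
class StepVerifierSketch {

    // Passes silently when every expectation is met.
    static void verifyAnimals() {
        Flux<String> animalFlux = Flux.just("Dog", "Cat", "Monkey", "Elephant");

        StepVerifier.create(animalFlux)
                .expectNext("Dog", "Cat")   // several expected items in one call
                .expectNextCount(2)         // two more items, values not checked
                .verifyComplete();
    }

    // Returns true if StepVerifier rejects an unexpected item.
    static boolean mismatchIsDetected() {
        try {
            StepVerifier.create(Flux.just("Dog"))
                    .expectNext("Cat")      // deliberately wrong expectation
                    .verifyComplete();
            return false;
        } catch (AssertionError e) {
            return true;
        }
    }

    public static void main(String[] args) {
        verifyAnimals();
        System.out.println("mismatch detected: " + mismatchIsDetected());
    }
}
```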

A Flux can also be created from an array, an Iterable, or a Java Stream.

To create a Flux from an array, Reactor provides the static fromArray() method, to which we pass the array.

@Test
public void createFlux_fromArray() {
  String[] animals = new String[] {
      "Dog", "Cat", "Monkey", "Elephant" };

  Flux<String> animalFlux = Flux.fromArray(animals);

  StepVerifier.create(animalFlux)
        .expectNext("Dog")
        .expectNext("Cat")
        .expectNext("Monkey")
        .expectNext("Elephant")
        .verifyComplete();
}

Similarly, to create a Flux from a List or any other Iterable implementation, we can pass it to the static fromIterable() method:

@Test
public void createFlux_fromIterable() {
  List<String> animalList = new ArrayList<>();
  animalList.add("Dog");
  animalList.add("Cat");
  animalList.add("Monkey");
  animalList.add("Elephant");

  Flux<String> animalFlux = Flux.fromIterable(animalList);

  StepVerifier.create(animalFlux)
        .expectNext("Dog")
        .expectNext("Cat")
        .expectNext("Monkey")
        .expectNext("Elephant")
        .verifyComplete();
}
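A Java Stream was mentioned above as a third possible source. The following is a minimal sketch of that case using the static fromStream() method; the class and method names are hypothetical, and it assumes reactor-core is on the classpath. Because a Java Stream can be consumed only once, a Flux built from a Stream instance supports a single subscription, while the overload that takes a Supplier creates a fresh Stream for each subscriber.

```java
import java.util.List;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;

// Hypothetical illustration class; not part of Reactor.
class FromStreamSketch {

    // Builds a Flux from a Stream instance and collects its items.
    static List<String> animalsFromStream() {
        Stream<String> animalStream =
                Stream.of("Dog", "Cat", "Monkey", "Elephant");

        // The Stream is consumed on subscription, so subscribe only once.
        Flux<String> animalFlux = Flux.fromStream(animalStream);
        return animalFlux.collectList().block();
    }

    // Builds a Flux from a Stream supplier and subscribes twice.
    static List<String> animalsFromStreamSupplier() {
        Flux<String> animalFlux =
                Flux.fromStream(() -> Stream.of("Dog", "Cat"));

        animalFlux.collectList().block();        // first subscription
        return animalFlux.collectList().block(); // second subscription, new Stream
    }

    public static void main(String[] args) {
        System.out.println(animalsFromStream());
        System.out.println(animalsFromStreamSupplier());
    }
}
```

Note that block() is used here only to keep the sketch short; inside a test, StepVerifier remains the more idiomatic choice.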

We can also use a Flux as a counter by using the static range() method, as shown below.

@Test
public void createFlux_range() {
  Flux<Integer> rangeFlux =
      Flux.range(1, 3);

  StepVerifier.create(rangeFlux)
        .expectNext(1)
        .expectNext(2)
        .expectNext(3)
        .verifyComplete();
}

In the above code, the Flux is created with a starting value of 1 and a count of 3. The second argument to range() is the number of items to emit, not the end value. The StepVerifier verifies that three items are published: the integers 1, 2, and 3.
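Because the arguments to range() are easy to misread when the start value is 1, here is a small sketch with a different start value. The class and method names are hypothetical, and it assumes reactor-core is on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical illustration class; not part of Reactor.
class RangeSketch {

    // range(start, count): start at 5 and emit 3 items.
    static List<Integer> rangeFromFive() {
        return Flux.range(5, 3)
                .collectList()   // gather all items into a Mono<List<Integer>>
                .block();        // wait for the list (acceptable in a sketch)
    }

    public static void main(String[] args) {
        System.out.println(rangeFromFive()); // prints [5, 6, 7]
    }
}
```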

We can also make a Flux emit values at a fixed time interval by using the static interval() method, as shown below.

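@Test
public void createFlux_interval() {
  // interval() emits Long values, starting at 0
  Flux<Long> intervalFlux =
      Flux.interval(Duration.ofMinutes(2))
          .take(3);

  // Note: with a real clock, this test takes about six minutes to run
  StepVerifier.create(intervalFlux)
        .expectNext(0L)
        .expectNext(1L)
        .expectNext(2L)
        .verifyComplete();
}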

In the above example, the Flux emits an incrementing Long value every 2 minutes, starting at 0. It has no maximum value, so we use the take() operation to limit the output to the first 3 items.
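A test that waits on a real two-minute interval is slow. One possible way around this, sketched below, is StepVerifier.withVirtualTime(), which runs the test against a virtual clock that can be advanced manually. The class and method names are hypothetical, and the sketch assumes reactor-core and reactor-test are on the classpath.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

// Hypothetical illustration class; not part of Reactor.
class IntervalVirtualTimeSketch {

    static void verifyIntervalWithVirtualTime() {
        StepVerifier
                // The Flux must be created inside the supplier so that
                // interval() picks up the virtual-time scheduler.
                .withVirtualTime(() ->
                        Flux.interval(Duration.ofMinutes(2)).take(3))
                .expectSubscription()
                // Advance the virtual clock by six minutes (three periods).
                .thenAwait(Duration.ofMinutes(6))
                .expectNext(0L, 1L, 2L)
                .verifyComplete();
    }

    public static void main(String[] args) {
        verifyIntervalWithVirtualTime();
        System.out.println("interval verified with virtual time");
    }
}
```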

In this blog post, we have seen the various operations for creating a Flux. Mono offers similar creation operations, such as just(). However, because a Mono emits at most one item, the multi-value operations fromArray(), fromIterable(), and range() have no Mono equivalent.
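As a closing illustration of creating a Mono, the sketch below uses Mono.just() and Mono.empty() and verifies both with StepVerifier. The class and method names are hypothetical, and the sketch assumes reactor-core and reactor-test are on the classpath.

```java
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

// Hypothetical illustration class; not part of Reactor.
class MonoSketch {

    // A Mono created with just() emits exactly one item, then completes.
    static void verifyMonoJust() {
        Mono<String> animalMono = Mono.just("Dog");

        StepVerifier.create(animalMono)
                .expectNext("Dog")
                .verifyComplete();
    }

    // An empty Mono completes without emitting any item.
    static void verifyMonoEmpty() {
        Mono<String> noAnimal = Mono.empty();

        StepVerifier.create(noAnimal)
                .verifyComplete();
    }

    public static void main(String[] args) {
        verifyMonoJust();
        verifyMonoEmpty();
        System.out.println("Mono sketches verified");
    }
}
```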