In the blog Understanding Reactive Creation Operations with Spring Boot Test, we saw how to create a new Flux/Mono using the static factory operations provided by Project Reactor.

In real-world scenarios, we often need to combine two Flux/Mono instances into a single Flux/Mono, or to split a single one into more than one Flux/Mono.

For example, if we have two Flux instances and want a single Flux that emits each item as soon as it is available from either source, we can use the mergeWith() operation.

@Test
public void Flux_Merge(){

  // Names are emitted at 5s, 10s and 15s.
  Flux<String> namesFlux = Flux
      .just("Monkey", "Parrot", "Ant")
      .delayElements(Duration.ofSeconds(5));

  // 2.5s offset, written in milliseconds because Duration.ofSeconds() only accepts whole seconds.
  // Categories are emitted at 7.5s, 12.5s and 17.5s.
  Flux<String> categoryFlux = Flux
      .just("Animal", "Bird", "Insect")
      .delaySubscription(Duration.ofMillis(2500))
      .delayElements(Duration.ofSeconds(5));

  Flux<String> fluxMerge = namesFlux.mergeWith(categoryFlux);

  StepVerifier.create(fluxMerge)
      .expectNext("Monkey")
      .expectNext("Animal")
      .expectNext("Parrot")
      .expectNext("Bird")
      .expectNext("Ant")
      .expectNext("Insect")
      .verifyComplete();
}

In the above example, the first Flux holds the names of an animal, a bird and an insect, and the second Flux holds the category each name belongs to.

Since a Flux emits its data as soon as possible, we use the delayElements() operation to slow down the flow of data. Because we want each name to be followed by the category it belongs to, we delay the subscription to the second Flux using the delaySubscription() operation.

mergeWith() does not guarantee a perfect back-and-forth between the sources. In the above example, if the timing of either Flux changes, the merged Flux no longer consistently emits a name followed by the category it belongs to.
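The effect of timing on the merged order can be worked out by hand. The following is a minimal sketch in plain Java (no Reactor involved) that models each item's emission time and sorts by it. The class MergeTimeline, its helper methods and the 6-second offset in the second call are hypothetical names and values chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class MergeTimeline {

    // One emission: the item and the time (in ms) at which it would be emitted.
    static final class Emission {
        final String item;
        final long atMillis;

        Emission(String item, long atMillis) {
            this.item = item;
            this.atMillis = atMillis;
        }
    }

    // Models delaySubscription(offset) followed by delayElements(period):
    // the i-th item (0-based) is emitted at offset + (i + 1) * period.
    static List<Emission> schedule(List<String> items, long offsetMillis, long periodMillis) {
        List<Emission> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            out.add(new Emission(items.get(i), offsetMillis + (i + 1) * periodMillis));
        }
        return out;
    }

    // Models a merge: items come out ordered by emission time, regardless of source.
    static List<String> mergeOrder(List<Emission> first, List<Emission> second) {
        List<Emission> all = new ArrayList<>(first);
        all.addAll(second);
        all.sort(Comparator.comparingLong((Emission e) -> e.atMillis));
        List<String> order = new ArrayList<>();
        for (Emission e : all) {
            order.add(e.item);
        }
        return order;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Monkey", "Parrot", "Ant");
        List<String> categories = Arrays.asList("Animal", "Bird", "Insect");

        // Timings from the test above: categories subscribed 2.5s late.
        // Prints [Monkey, Animal, Parrot, Bird, Ant, Insect]
        System.out.println(mergeOrder(
                schedule(names, 0, 5000), schedule(categories, 2500, 5000)));

        // Hypothetical change: categories subscribed 6s late instead.
        // Prints [Monkey, Parrot, Animal, Ant, Bird, Insect]
        System.out.println(mergeOrder(
                schedule(names, 0, 5000), schedule(categories, 6000, 5000)));
    }
}
```

With the larger offset, "Parrot" arrives before "Animal", so the name/category pairing is lost even though neither source changed its data.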

To keep the pairing regardless of timing, we can use another operation, zip(). It waits for each source to emit one item and produces a single Flux of tuples, each holding one item from every source.

@Test
public void flux_zipped(){
  Flux<String> namesFlux = Flux
      .just("Monkey", "Parrot", "Ant");
  Flux<String> categoryFlux = Flux
       .just("Animal", "Bird", "Insect");

 Flux<Tuple2<String,String>> fluxZipped = 
      Flux.zip(namesFlux, categoryFlux);

 StepVerifier.create(fluxZipped)
     .expectNextMatches( i ->
        i.getT1().equals("Monkey") &&
        i.getT2().equals("Animal"))
     .expectNextMatches( i ->
        i.getT1().equals("Parrot") &&
        i.getT2().equals("Bird"))
     .expectNextMatches( i ->
        i.getT1().equals("Ant") &&
        i.getT2().equals("Insect"))
     .verifyComplete();
}

The resulting Flux keeps the names of the animal, bird and insect in perfect alignment with the categories they belong to.

If we do not want to work with tuples, we can pass zip() a function that combines the two items into a value of any type.

@Test
public void Flux_zip_functions(){
  Flux<String> namesFlux = Flux
      .just("Monkey", "Parrot", "Ant");
  Flux<String> categoryFlux = Flux
       .just("Animal", "Bird", "Insect"); 

  Flux<String> fluxZipped = Flux.zip(namesFlux, categoryFlux,
       (n, c) -> n + " belongs to " + c);

  StepVerifier.create(fluxZipped)
        .expectNext("Monkey belongs to Animal")
        .expectNext("Parrot belongs to Bird")
        .expectNext("Ant belongs to Insect")
        .verifyComplete();
}   
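The pairing rule behind zip() is positional: the first items are combined, then the second items, and so on, and the result ends as soon as any source runs out. Here is a minimal sketch of that rule in plain Java, using lists instead of Flux. ZipModel and its zip() helper are hypothetical names for illustration, not part of Reactor, and the shortened category list is an assumption made to show the cut-off.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

class ZipModel {

    // Pairs items by position and stops as soon as the shorter list runs out.
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> combinator) {
        int pairs = Math.min(left.size(), right.size());
        List<R> out = new ArrayList<>();
        for (int i = 0; i < pairs; i++) {
            out.add(combinator.apply(left.get(i), right.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Monkey", "Parrot", "Ant");
        // Hypothetical input: "Insect" is missing, so "Ant" has no partner.
        List<String> categories = Arrays.asList("Animal", "Bird");

        // Prints [Monkey belongs to Animal, Parrot belongs to Bird]
        System.out.println(zip(names, categories, (n, c) -> n + " belongs to " + c));
    }
}
```

An unmatched trailing item such as "Ant" is simply dropped, which is worth keeping in mind when the two sources may differ in length.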

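Now suppose we have two Flux instances and want data only from whichever one emits first; then we can use the first() operation. In Reactor 3.4 and later, first() is deprecated and the same behaviour is available as firstWithSignal().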

@Test
public void flux_first(){
  Flux<String> namesFlux = Flux
      .just("Monkey", "Parrot", "Ant");
  // Delaying the category subscription lets namesFlux emit first and win.
  Flux<String> categoryFlux = Flux
      .just("Animal", "Bird", "Insect")
      .delaySubscription(Duration.ofSeconds(2));

  Flux<String> fluxFirst = Flux.first(namesFlux, categoryFlux);

  StepVerifier.create(fluxFirst)
      .expectNext("Monkey")
      .expectNext("Parrot")
      .expectNext("Ant")
      .verifyComplete();
}

In the above example, we only want the names of the animal, bird and insect. To make that happen, we delay the subscription to the category Flux, so that the names Flux emits first and only the names are present in the resulting Flux.
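Which source ends up in the result depends only on which one signals first, not on the order of the arguments. The sketch below models that selection rule in plain Java. FirstModel, Source and the millisecond values are hypothetical and serve only to illustrate the rule; this is not how Reactor implements it.

```java
import java.util.Arrays;
import java.util.List;

class FirstModel {

    // A source described by the time of its first signal and the items it would emit.
    static final class Source {
        final long firstSignalAtMillis;
        final List<String> items;

        Source(long firstSignalAtMillis, List<String> items) {
            this.firstSignalAtMillis = firstSignalAtMillis;
            this.items = items;
        }
    }

    // Models first(): the source whose first signal arrives earliest is replayed
    // in full, all other sources are ignored.
    static List<String> first(Source... sources) {
        Source winner = sources[0];
        for (Source s : sources) {
            if (s.firstSignalAtMillis < winner.firstSignalAtMillis) {
                winner = s;
            }
        }
        return winner.items;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Monkey", "Parrot", "Ant");
        List<String> categories = Arrays.asList("Animal", "Bird", "Insect");

        // Categories delayed by 2s: the names signal first.
        // Prints [Monkey, Parrot, Ant]
        System.out.println(first(new Source(0, names), new Source(2000, categories)));

        // Names delayed by 2s: the categories signal first, although names are listed first.
        // Prints [Animal, Bird, Insect]
        System.out.println(first(new Source(2000, names), new Source(0, categories)));
    }
}
```

The delay therefore has to go on the source we want to lose; delaying the names would make the categories win instead.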

This blog touched on some of the common operations used to combine multiple Flux instances into a single Flux.