Introduction to Spring WebFlux
As the world of computation shifts from processing discrete data objects to stream processing, Spring 5 introduces a new stack for reactive processing, Spring WebFlux.
In this post I shed some light on the new stack and show some simple examples about the way it can be used.
A detailed description about Spring WebFlux can be found here.
Mono and Flux
The response from the server to the client is represented as either a Mono or a Flux object.
While Mono is used for returning 0 or 1 object, Flux is used for returning any number of objects, either as a collection or a stream, bounded or infinite.
These types are defined in the Reactor library which Spring WebFlux relies on.
When receiving a response, the client can query the object for any error, read the data or pass it on, asynchronically for further processing.
Background
The Spring framework did not support real asynchronous communication before it reached version 5.
The classic way was:
- The client (using RestTemplate) sends a request to the server and waits for a response
- The server computes a response and sends it back
- The client reads the response and processes it
Tha main issues with this model:
- The client has to hold a blocked thread for each request
- The client is unaware to the load on the server that might be overloaded with work and slow down, lose requests or even crush
- The response must be ready as a whole before it can be sent back from the server
An improvement was added in Spring 4 with the introduction of AsyncRestTemplate.
It provided a way to process the response in the background, using a callback function, without blocking the main thread.
AsyncRestTemplate is now deprecated and it was replaced in Spring 5 with the WebClient interface.
The popular paradigm of reactive programming provides a non-blocking way to exchange data reliably without the issues stated above.
In the stack the client can send a request and subscribe a method to process the response, whenever it will be received, or pass it on.
It doesn’t have to keep a dedicated thread for each request.
Moreover, If the response consists of a stream of elements, the server can send the elemnts of the data as they are ready, even before all of them are resolved, and the client can start process the elements as the flow in, even before the entire response is received.
The Server
The server’s controller is annotated with the familiar @RestController annotation and the endpoints return instances of Mono or Flux, for example:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.concurrent.ThreadLocalRandom;
@RestController
public class MyController {
/**
* Returns a single value
*/
@GetMapping("/str")
public Mono<String> getOneString() {
return Mono.just("Hello");
}
/**
* Returns a collection of values
*/
@GetMapping("/list")
public Flux<String> getSomeStrings() {
return Flux.just("Hello", "World");
}
/**
* Returns an infinite stream of values
*/
@GetMapping(path = "/infinite", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Long> getStreamOfStrings() {
return Flux.generate(sink -> sink.next(randomLong()));
}
private Long randomLong() {
return ThreadLocalRandom.current().nextLong();
}
}
In the first two methods above, the response is sent in the JSON format when it is ready.
In third method, however, the only way to process the infinite stream is by sending the elemnts one by one, whenever they are available and requested by the client (thus the media type must be specified as TEXT_EVENT_STREAM_VALUE in the annotation)
The Mono and the Flux API’s are very rich and provide many options, for examples:
Return an empty response
1
return Mono.empty();
an error
1
return Mono.error(new IllegalArgumentException());
a collection
1
return Flux.fromIterable(list);
or a stream
1
return Flux.fromStream(stream);
infinite
1
return Flux.generate(sink -> sink.next(nextValue()));
or bounded
1
return Flux.<String>generate(sink -> sink.next(nextValue())).take(10);
and even a response that never actually returns
1
return Mono.never()
The Client
WebClient is the new interface that was introduced in Spring 5. It provides an impressive set of methods to create and send requests and to process the responses in both blocking and non-blocking ways.
Note: as of Spring 5.0.1, the client can read String values, arrays / collections or objects, not other plain values as numbers (int, double, etc.), booleans or null. See my question and the bug report.
Blocking
The simplest way is to wait (block) for the response and process it in the same thread that initiate the request:
1
2
3
4
5
6
7
8
WebClient client = WebClient.create("http://localhost:8080");
String str = client.get()
.uri("/str")
.accept(APPLICATION_JSON)
.retrieve()
.bodyToMono(String.class)
.block();
process(str);
If the response is a collection (wrapped as Flux) we can block until all the elements are read
1
2
3
4
5
6
7
8
9
10
WebClient client = WebClient.create("http://localhost:8080");
Flux<String> flux = client.get()
.uri("/list")
.accept(APPLICATION_JSON)
.retrieve()
.bodyToFlux(String.class);
List<String> list = flux.collectList().block();
for (String str : list) {
process(str);
}
This technique can lead to performance issues if the collection is large, and, of course, inapplicable if the collection represents an infinite stream.
Non-Blocking
A better way is using the reactive, non-blocking, methods. In this case the elements of the response are processed as they flow in:
1
2
3
4
5
6
7
WebClient client = WebClient.create("http://localhost:8080");
Mono<String> mono = client.get()
.uri("/str")
.accept(APPLICATION_JSON)
.retrieve()
.bodyToMono(String.class);
mono.subscribe(str -> process(str));
and
1
2
3
4
5
6
7
WebClient client = WebClient.create("http://localhost:8080");
Flux<String> flux = client.get()
.uri("/list")
.accept(TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class);
flux.subscribe(str -> process(str));
the latter example let us process the input as it flows in, even before the server completed the computation.
A basic client for the controller above may be implemented as
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.http.MediaType.TEXT_EVENT_STREAM;
public class MyClient {
public void processOneString(Consumer<String> consumer) {
WebClient client = WebClient.create("http://localhost:8080");
Mono<String> mono = client.get()
.uri("/str")
.accept(APPLICATION_JSON)
.retrieve()
.bodyToMono(String.class);
mono.subscribe(consumer);
}
public void processSomeStrings(Consumer<String> consumer) {
WebClient client = WebClient.create("http://localhost:8080");
Flux<String> flux = client.get()
.uri("/list")
.accept(TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class);
flux.subscribe(consumer);
}
public void processStreamOfStrings(Consumer<String> consumer) {
WebClient client = WebClient.create("http://localhost:8080");
Flux<String> flux = client.get()
.uri("/infinite")
.accept(TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class);
flux.subscribe(consumer);
}
}
The Javadoc of subscribe(…) alerts us that “since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread”. You may have to make sure that the application does not exit before the response is processed.
To close a connection to the server dispose the subscription
1
2
3
Disposable subscription = flux.subscribe(consumer);
...
subscription.dispose();
Note: since the connection is dropped by the client, without coordinating with the server, the server will complain about a “Broken pipe”. This is the “normal” behavior at the moment (see here)
Tags
kafka
avro
docker
spring
webflux
java9
modules
jpms