JDK 23: Gatherers upgrade the pipeline design pattern (JEP-473)
Source-code flexibility goes hand in hand with maintainability and testability. The Java language benefits not only from its maturity but also from being strongly typed. Strong typing is sometimes a topic of discussion, because every declared variable requires a known type, which can make the source code unnecessarily verbose (an effect reduced by JEP-286: local variable type inference[4]). Since the introduction of the Stream API in Java 8, developers have been able to express complex transformations as pipelines. The Stream API provides a lazily evaluated description of how a data source (Example 1. - 1) is transformed by one or more intermediate functions (Example 1. - 2). The sequence of intermediate functions must be ended by a terminal function that materializes the output (Example 1. - 3).
long totalNumberOfWords =
    Stream
        .of("one", "", "two", "three", "four", "fix", "", "six") // 1
        .filter(Predicate.not(String::isEmpty)) // 2
        .collect(Collectors.counting()); // 3
Example 1.: Standard stream usage is expressive and very efficient.
The Stream API meets the requirements of most standard use cases. When a more complex, conditional transformation is required, however, the logic ends up in the terminal function, in the form of a custom Collector. Such a collector can be difficult both to understand and to maintain. In other words, the Stream API lacks a way to define advanced intermediate operations on the pipeline.
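To make the problem concrete, here is a minimal sketch of grouping elements into fixed-size windows with nothing but a Collector. The class name CollectorWindowSketch and the helper fixedWindows are hypothetical and exist only for this illustration. The sketch assumes a sequential stream and deliberately refuses to merge partial results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class CollectorWindowSketch {

    // Hypothetical helper: groups elements into lists of at most `size` elements.
    static <T> Collector<T, ?, List<List<T>>> fixedWindows(int size) {
        return Collector.of(
            () -> new ArrayList<List<T>>(),
            (windows, element) -> {
                // Open a new window when there is none yet or the last one is full.
                if (windows.isEmpty() || windows.get(windows.size() - 1).size() == size) {
                    windows.add(new ArrayList<>());
                }
                windows.get(windows.size() - 1).add(element);
            },
            (left, right) -> {
                // Merging would require re-windowing; this sketch supports sequential streams only.
                throw new UnsupportedOperationException("sequential streams only");
            });
    }

    static List<List<Integer>> demo() {
        return Stream.of(1, 2, 3, 4, 5, 6, 7, 8).collect(fixedWindows(3));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[1, 2, 3], [4, 5, 6], [7, 8]]
    }
}
```

The windowing logic works, but it lives inside the terminal function: the windows only become available once the whole stream has been consumed, so no further lazy intermediate step can be chained after them.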
Expanding intermediate functions with JEP-473
JEP-473[1] addresses this gap with Gatherers. A Gatherer is an intermediate function, a functional interface (Example 2.), that transforms input elements of type <T> into output elements of type <R>. A Gatherer may also keep an internal state of type <A>, which its integrator updates as elements arrive (Example 2. - method integrator). The internal state comes in handy whenever the transformation needs to look up previously seen data. The state <A> is hidden from the rest of the code as an implementation detail.
@PreviewFeature(feature = PreviewFeature.Feature.STREAM_GATHERERS)
public interface Gatherer<T, A, R> {
    default Supplier<A> initializer() { return ...; }
    Integrator<A, T, R> integrator(); // the only abstract method
    default BinaryOperator<A> combiner() { return ...; }
    default BiConsumer<A, Downstream<? super R>> finisher() { return ...; }
}
Example 2.: Gatherer is a functional interface whose methods define the stages of the transformation; type A represents the internal state
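As a minimal sketch of how the hidden state <A> can serve as a lookup, the following gatherer keeps only the first element seen for each key. The class name DistinctBySketch and the helper distinctBy are hypothetical, not part of the JDK. The sketch uses the Gatherer.ofSequential factory and assumes JDK 24 or newer, where Gatherers are final, or JDK 22/23 run with --enable-preview.

```java
import java.util.HashSet;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class DistinctBySketch {

    // Hypothetical helper: passes an element downstream only if its key has not been seen yet.
    static <T, K> Gatherer<T, ?, T> distinctBy(Function<? super T, ? extends K> keyOf) {
        return Gatherer.ofSequential(
            // initializer: the private state <A> is the set of keys seen so far
            () -> new HashSet<K>(),
            // integrator: called once per input element
            (seen, element, downstream) -> {
                if (seen.add(keyOf.apply(element))) {
                    return downstream.push(element);
                }
                return true; // nothing emitted, keep consuming input
            });
    }

    static List<String> demo() {
        return Stream.of("one", "two", "three", "four", "five", "six")
            .gather(distinctBy(String::length))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [one, three, four]
    }
}
```

The HashSet never leaves the gatherer, so callers see only a stream of <T> going in and a stream of <R> (here the same type) coming out.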
One of the goals of Gatherers is to simplify parallel evaluation of intermediate functions. The interface provides a combiner() method (Example 2.) that merges two instances of the gatherer's internal state. When the stream is parallel and the gatherer supplies its own combiner, the input is processed in segments and the partial states are merged with combiner(). A gatherer that keeps the default combiner is evaluated sequentially, even inside a parallel stream. This is useful when the transformation is ordered by nature and cannot be parallelized.
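The role of combiner() can be sketched with a many-to-one gatherer that emits the largest element. The names ParallelMaxSketch, State and max are hypothetical and chosen for this illustration. The sketch uses the four-argument Gatherer.of factory and assumes JDK 24 or newer, or JDK 22/23 with --enable-preview.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.IntStream;

public class ParallelMaxSketch {

    // Hypothetical mutable state <A>: the largest value seen so far, if any.
    static final class State {
        int max;
        boolean hasValue;
    }

    static Gatherer<Integer, ?, Integer> max() {
        return Gatherer.of(
            State::new,
            // integrator: updates the state, never emits while input is still arriving
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                if (!state.hasValue || element > state.max) {
                    state.max = element;
                    state.hasValue = true;
                }
                return true;
            }),
            // combiner: merges the states of two segments of the input
            (left, right) -> {
                if (!left.hasValue) return right;
                if (right.hasValue && right.max > left.max) left.max = right.max;
                return left;
            },
            // finisher: emits the single result once all input has been consumed
            (state, downstream) -> {
                if (state.hasValue) downstream.push(Integer.valueOf(state.max));
            });
    }

    static List<Integer> demo() {
        return IntStream.rangeClosed(1, 1000).boxed().parallel().gather(max()).toList();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1000]
    }
}
```

Because the result does not depend on how the input is split, the same gatherer gives the same answer on a sequential stream, where the combiner is simply never needed.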
Whenever a gatherer is evaluated, it is handed a Downstream object. The Downstream instance represents the next evaluation stage and accepts output elements of type <R>. The gatherer first creates its internal state with the initializer() method, and the integrator obtained from integrator() is then invoked for each input element. Once the input is exhausted, the finisher() is invoked so that any remaining state can be pushed downstream (Example 3.).
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Gatherer;

record WindowFixed<T>(int windowSize)
        implements Gatherer<T, ArrayList<T>, List<T>> {

    public WindowFixed {
        if (windowSize < 1)
            throw new IllegalArgumentException("window size must be positive");
    }

    @Override
    public Supplier<ArrayList<T>> initializer() {
        return () -> new ArrayList<>(windowSize);
    }

    @Override
    public Integrator<ArrayList<T>, T, List<T>> integrator() {
        return Gatherer.Integrator.ofGreedy((window, element, downstream) -> {
            window.add(element);
            if (window.size() < windowSize)
                return true;
            // The window is full: emit a copy and start a new one
            var result = new ArrayList<T>(window);
            window.clear();
            return downstream.push(result);
        });
    }

    @Override
    public BiConsumer<ArrayList<T>, Downstream<? super List<T>>> finisher() {
        return (window, downstream) -> {
            // Emit the last, possibly incomplete window
            if (!downstream.isRejecting() && !window.isEmpty()) {
                downstream.push(new ArrayList<T>(window));
                window.clear();
            }
        };
    }
}
var list = Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
    .gather(new WindowFixed<Integer>(3))
    .gather(new WindowFixed<List<Integer>>(2))
    .collect(Collectors.toList());
System.out.println("Result:" + list);
Output:
Result:[[[1, 2, 3], [4, 5, 6]], [[7, 8]]]
Example 3.: The WindowFixed gatherer operates on a stream of elements of type <T>, keeps an ArrayList<T> as its internal state <A>, and emits elements of type List<T>. Collectors.toList() is the terminal operation and the moment the stream materializes
Collection of built-in gatherers
JEP-473[1] also ships a collection of built-in gatherers that cover many common purposes without the need to write custom ones (Example 4.):
- fold(…): performs a stateful, ordered many-to-one reduction
- mapConcurrent(…): performs a stateful one-to-one transformation that may invoke the supplied function concurrently, up to a given limit
- scan(…): performs a stateful one-to-one incremental accumulation, where the initial state is obtained from the Supplier and each subsequent state is computed by the BiFunction
- windowFixed(…): performs a stateful many-to-many transformation that groups elements into windows of a given size
- windowSliding(…): performs a stateful, ordered many-to-many transformation where each window after the first drops the oldest element of the previous window and appends the next element from the stream.
var fold1 = Stream.of(1,2,3,4,5,6,7,8,9)
    .gather(Gatherers.fold(() -> "", (string, number) -> string + number))
    .collect(Collectors.toSet());
var scan1 = Stream.of(1,2,3,4)
    .gather(Gatherers.scan(() -> "", (string, number) -> string + number))
    .toList();
var slidingWindow2 = Stream.of(1,2,3,4)
    .gather(Gatherers.windowSliding(2))
    .toList();
Output:
Result fold1:[123456789]
Result scan1:[1, 12, 123, 1234]
Result slidingWindow2:[[1, 2], [2, 3], [3, 4]]
Example 4.: Some of the built-in gatherers and their outputs
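The two built-in gatherers not shown in Example 4., windowFixed and mapConcurrent, can be sketched in the same way. The class and method names below are hypothetical. The sketch assumes JDK 24 or newer, or JDK 22/23 with --enable-preview. The concurrency limit of 2 is an arbitrary choice for the illustration.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class BuiltInGatherersSketch {

    // Groups the input into windows of three; the last window may be shorter.
    static List<List<Integer>> fixedWindows() {
        return Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
            .gather(Gatherers.windowFixed(3))
            .toList();
    }

    // Maps each word to its length with at most two mapper calls in flight;
    // the results keep the order of the input.
    static List<Integer> lengths() {
        return Stream.of("one", "two", "three")
            .gather(Gatherers.mapConcurrent(2, String::length))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println("Result windowFixed:" + fixedWindows());  // [[1, 2, 3], [4, 5, 6], [7, 8]]
        System.out.println("Result mapConcurrent:" + lengths());     // [3, 3, 5]
    }
}
```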
Conclusion
JEP-473 can help meet the functional requirements of today's industry, where huge amounts of data need to be processed and analyzed. Gatherers help eliminate the need to define large, error-prone Collectors, which can degrade source code maintainability. They preserve the purpose of the pipeline design pattern[3] by applying intermediate transformations transparently, while their internal state does not escape the scope of the materialized pipeline. In addition, each gatherer can be tested in isolation, within the scope of its own functionality. JEP-473: Stream Gatherers is another great example of how to move the Java platform forward[2] while meeting industry needs.
Resources
[1] JEP-473: Stream Gatherers (Second Preview)
[2] Java 23 Has Arrived, And It Brings a Truckload of Changes
[3] Practical Design Patterns for Java Developers
[4] JEP-286: Local-Variable Type Inference