Sunday, October 22, 2017

Java 8: Stream

The Stream API was introduced in Java 8. Together with lambda expressions, also introduced in Java 8, it provides a very useful and compact style of programming.

The Java API describes a stream (Stream<T>, a stream of object references) as a sequence of elements supporting sequential and parallel aggregate operations. A stream is not a data structure that stores elements. Instead, it conveys elements from a source through a pipeline of computational operations. Collections and streams have different goals. Collections are mainly concerned with the efficient management of, and access to, their elements. Streams do not provide a way to directly access their elements. They are more concerned with describing their source and the computational operations performed on that source. The source could be an array, a collection, a generator function, an I/O channel, and so on.
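To make the idea of a source concrete, here is a small sketch that builds the same stream from three of the sources mentioned above: an array, a collection, and a generator function (here Stream.iterate). The class and method names are made up for illustration. Note that Stream.iterate produces an infinite stream, so it has to be bounded with limit().

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamSources {

    // Source 1: an array
    static List<Integer> fromArray() {
        Integer[] values = {1, 2, 3, 4, 5};
        return Arrays.stream(values).collect(Collectors.toList());
    }

    // Source 2: a collection
    static List<Integer> fromCollection() {
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5);
        return values.stream().collect(Collectors.toList());
    }

    // Source 3: a generator function; iterate() is infinite, so limit() bounds it
    static List<Integer> fromGenerator() {
        return Stream.iterate(1, n -> n + 1)
                     .limit(5)
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(fromArray());      // [1, 2, 3, 4, 5]
        System.out.println(fromCollection()); // [1, 2, 3, 4, 5]
        System.out.println(fromGenerator());  // [1, 2, 3, 4, 5]
    }
}
```

In all three cases the stream itself stores nothing; it only describes where the elements come from.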

To perform a computation, stream operations are composed into a stream pipeline. A stream pipeline consists of a source, zero or more intermediate operations, and a terminal operation. Streams are lazy: computation on the source data is only performed when the terminal operation is initiated, and source elements are consumed only as needed.
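"Consumed only as needed" is easiest to see with an infinite source. In the sketch below (class and method names are hypothetical), Stream.iterate never ends on its own, yet the pipeline terminates because the short-circuiting terminal operation findFirst() stops pulling elements as soon as it has a match. A counter records how many source elements the filter actually examined.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazyConsumption {

    // Returns the first multiple of 7; 'examined' counts the filter calls
    static int firstMultipleOfSeven(AtomicInteger examined) {
        return Stream.iterate(1, n -> n + 1)      // infinite source: 1, 2, 3, ...
                     .filter(n -> {
                         examined.incrementAndGet();
                         return n % 7 == 0;
                     })
                     .findFirst()                 // short-circuiting terminal operation
                     .get();
    }

    public static void main(String[] args) {
        AtomicInteger examined = new AtomicInteger();
        int result = firstMultipleOfSeven(examined);
        System.out.println("result = " + result + ", examined = " + examined.get());
        // prints: result = 7, examined = 7
    }
}
```

Only the elements 1 to 7 are ever generated; the rest of the infinite source is never touched.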

Let's look at some examples to understand streams better.

[Code-1]
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public static void main(String[] args) {
  List<Integer> numbers = new ArrayList<>();
      
  for(int i=30; i>=1; i--){
    numbers.add(i);
  }
      
  List<Integer> resultNumbers = numbers.stream()
            .filter(p -> p%2==0)
            .map(n -> n*2)
            .collect(Collectors.toList());
      
  resultNumbers.forEach(n-> System.out.print(n + " "));
}

In Java 8, the Collection interface provides two methods that create a stream: stream() and parallelStream(). In this example, stream() returns a sequential Stream with 'numbers' as its source. Stream provides many methods for describing the computation, such as filter, map, findFirst, count, forEach, limit, and so on. Some of these methods return a new Stream and are used as intermediate operations. Others return a final value or object (or, like forEach, only perform an action) and are used as terminal operations.
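The difference between the two kinds of operations shows up in their return types. In the hypothetical sketch below, filter and limit return a Stream and can be chained, while count, findFirst, and collect end the pipeline and return a long, an Optional, and a List respectively. Each method builds a fresh stream, because a stream cannot be reused after its terminal operation has run.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class IntermediateVsTerminal {

    static final List<Integer> NUMBERS = Arrays.asList(5, 4, 3, 2, 1);

    // Intermediate operation: returns a new Stream, nothing is computed yet
    static Stream<Integer> evensOnly() {
        return NUMBERS.stream().filter(n -> n % 2 == 0);
    }

    // Terminal operation returning a primitive value
    static long countEvens() {
        return evensOnly().count();
    }

    // Terminal operation returning an Optional (the stream might be empty)
    static Optional<Integer> firstEven() {
        return evensOnly().findFirst();
    }

    // Intermediate limit() followed by the terminal collect()
    static List<Integer> firstThree() {
        return NUMBERS.stream().limit(3).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(countEvens());  // 2
        System.out.println(firstEven());   // Optional[4]
        System.out.println(firstThree());  // [5, 4, 3]
    }
}
```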

- filter(Predicate<? super T> predicate) 
It returns a stream of the elements that match the given predicate. A Predicate is a boolean-valued function of one argument. On line 9, "p -> p%2==0" is the Predicate, "p" is its argument, and "p%2==0" is the boolean-valued expression. In this example, filter returns a stream containing only the even numbers from the source 'numbers'.

- map(Function<? super T, ? extends R> mapper)
It returns a stream consisting of the results of applying the given function to each element. On line 10, it doubles each number 'n' in the filtered stream produced on line 9.

Output:

60 56 52 48 44 40 36 32 28 24 20 16 12 8 4 
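Because filter and map take a Predicate and a Function (both from java.util.function), the lambdas in [Code-1] can also be stored in variables of those types. The sketch below is intended to be equivalent to [Code-1]; the class, constant, and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class NamedLambdas {

    // Boolean-valued function of one argument: true for even numbers
    static final Predicate<Integer> IS_EVEN = p -> p % 2 == 0;

    // Function applied to each element that passed the filter
    static final Function<Integer, Integer> DOUBLE = n -> n * 2;

    // Builds the list from..1 and applies the same pipeline as [Code-1]
    static List<Integer> evensDoubled(int from) {
        List<Integer> numbers = new ArrayList<>();
        for (int i = from; i >= 1; i--) {
            numbers.add(i);
        }
        return numbers.stream()
                      .filter(IS_EVEN)
                      .map(DOUBLE)
                      .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(evensDoubled(30));
        // [60, 56, 52, 48, 44, 40, 36, 32, 28, 24, 20, 16, 12, 8, 4]
    }
}
```

Naming the lambdas also makes them reusable and composable, for example IS_EVEN.negate() is a Predicate that selects the odd numbers.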

- Execution order of methods in Stream
To see the order of execution, let's change the code so that each lambda prints what it is doing, and run it again. This time, the 'numbers' list contains only 10 numbers.

[Code-2]
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public static void main(String[] args) {
  List<Integer> numbers = new ArrayList<>();
      
  for(int i=10; i>=1; i--){
    numbers.add(i);
  }
      
  List<Integer> resultNumbers = numbers.stream()
            .filter( p -> {System.out.print(" # F " + p); return p%2==0;})
            .map(n -> {System.out.print(" # M " + n*2); return (n*2);})
            .collect(Collectors.toList());
      
  System.out.print("\nFinal: ");
  resultNumbers.forEach(n-> System.out.print(n + " "));
}

Output:

# F 10 # M 20 # F 9 # F 8 # M 16 # F 7 # F 6 # M 12 # F 5 # F 4 # M 8 # F 3 # F 2 # M 4 # F 1
Final: 20 16 12 8 4 

The stream does not run one intermediate operation over all elements before starting the next one. Instead, each element travels through the whole pipeline: it is filtered and, if it satisfies the Predicate, passed to map before the next element is filtered.
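One way to check this ordering without reading console output is to record each call in a list. The sketch below (hypothetical names) runs the [Code-2] pipeline sequentially on 4, 3, 2, 1 and returns the trace. Appending to a plain ArrayList from the lambdas is acceptable here only because the stream is sequential.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ExecutionTrace {

    static List<String> trace(List<Integer> numbers) {
        List<String> log = new ArrayList<>();   // safe only for a sequential stream
        numbers.stream()
               .filter(p -> { log.add("F " + p); return p % 2 == 0; })
               .map(n -> { log.add("M " + n * 2); return n * 2; })
               .collect(Collectors.toList());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace(Arrays.asList(4, 3, 2, 1)));
        // [F 4, M 8, F 3, F 2, M 4, F 1]
    }
}
```

Each "M" entry appears right after the "F" entry of the same element, exactly as in the [Code-2] output.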

This behavior follows from the laziness described earlier: computation on the source data is only performed when the terminal operation is initiated, and elements are pulled through the pipeline one at a time as they are needed.

The following code produces the same output, even though the intermediate operations are attached in separate statements, because the computation is not performed until the terminal operation 'collect' is initiated.

[Code-3]
1
2
3
4
5
 Stream<Integer> st = numbers.stream()
            .filter( p -> {System.out.print(" # F " + p); return p%2==0;});
      
 st = st.map(n -> {System.out.print(" # M " + n*2); return (n*2);});
 List<Integer> resultNumbers = st.collect(Collectors.toList());

If you set a breakpoint and step through the code in debug mode, you will notice that nothing is printed until line 5 is executed.
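Instead of a debugger, a counter can show the same thing. In this hypothetical sketch, the filter lambda increments an AtomicInteger. After the intermediate operations have been chained the counter is still 0, and it only changes once collect is called.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyPipeline {

    // Returns {filter calls before collect, filter calls after collect, result size}
    static int[] countsBeforeAndAfter() {
        AtomicInteger filterCalls = new AtomicInteger();
        List<Integer> numbers = Arrays.asList(4, 3, 2, 1);

        // Only builds the pipeline; no element has been processed yet
        Stream<Integer> st = numbers.stream()
                .filter(p -> { filterCalls.incrementAndGet(); return p % 2 == 0; })
                .map(n -> n * 2);
        int before = filterCalls.get();

        // The terminal operation triggers the whole computation
        List<Integer> result = st.collect(Collectors.toList());
        int after = filterCalls.get();

        return new int[] {before, after, result.size()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(countsBeforeAndAfter()));
        // [0, 4, 2]
    }
}
```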

- The returned object and the order of its elements
Collectors.toList() creates a 'new' List object. This is the method description in the API documentation:

 /**
  * Returns a {@code Collector} that accumulates the input elements into a
  * new {@code List}. There are no guarantees on the type, mutability,
  * serializability, or thread-safety of the {@code List} returned; if more
  * control over the returned {@code List} is required, use {@link #toCollection(Supplier)}.
  *
  * @param <T> the type of the input elements
  * @return a {@code Collector} which collects all the input elements into a
  * {@code List}, in encounter order
  */
 public static <T>  Collector<T, ?, List<T>> toList() {

It says the elements are collected "in encounter order". Encounter order is the order in which the source makes its elements available; for a List, that is its iteration order. The same encounter order applies even when we use a parallel stream, as long as the source is ordered.
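A quick way to confirm this is to compare a sequential and a parallel run of the same pipeline. The sketch below (hypothetical names) also uses Collectors.toCollection(ArrayList::new), the alternative the Javadoc above suggests when you need control over the List implementation; it preserves encounter order too. It also uses forEachOrdered, which performs its action in encounter order even on a parallel stream, whereas plain forEach on a parallel stream does not guarantee that order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class EncounterOrder {

    static List<Integer> source() {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 30; i >= 1; i--) {
            numbers.add(i);
        }
        return numbers;
    }

    static List<Integer> sequential() {
        return source().stream().filter(p -> p % 2 == 0).map(n -> n * 2)
                       .collect(Collectors.toList());
    }

    static List<Integer> parallelToList() {
        return source().parallelStream().filter(p -> p % 2 == 0).map(n -> n * 2)
                       .collect(Collectors.toList());
    }

    // toCollection lets us choose the List implementation explicitly
    static ArrayList<Integer> parallelToCollection() {
        return source().parallelStream().filter(p -> p % 2 == 0).map(n -> n * 2)
                       .collect(Collectors.toCollection(ArrayList::new));
    }

    // forEachOrdered runs the action one element at a time, in encounter order
    static List<Integer> parallelForEachOrdered() {
        List<Integer> out = new ArrayList<>();
        source().parallelStream().filter(p -> p % 2 == 0).map(n -> n * 2)
                .forEachOrdered(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(sequential().equals(parallelToList()));          // true
        System.out.println(sequential().equals(parallelToCollection()));    // true
        System.out.println(sequential().equals(parallelForEachOrdered()));  // true
    }
}
```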

- Parallel Stream
[Code-2] calls 'numbers.stream()' on line 8. The Collection interface also provides a 'parallelStream()' method, which returns a possibly parallel stream. Let's see how the output changes.

[Code-4]
 public static void main(String[] args) {
   List<Integer> numbers = new ArrayList<>();
   for (int i = 10; i >= 1; i--) {
     numbers.add(i);
   }

   List<Integer> resultNumbers = numbers.parallelStream().filter(p -> {
          System.out.print(" # F " + p);
          return p % 2 == 0;})
        .map(n -> {
          System.out.print(" # M " + n * 2);
          return (n * 2);})
        .collect(Collectors.toList());

   System.out.print("\nFinal: ");
   resultNumbers.forEach(n -> System.out.print(n + " ")); 
 }

Output: 

 # F 4 # F 6 # M 12 # F 8 # M 16 # F 1 # F 5 # F 2 # F 10 # M 20 # F 3 # F 9 # M 4 # F 7 # M 8
Final: 20 16 12 8 4 

Since a parallel stream processes elements on multiple threads, the output differs from run to run and the printing order is unpredictable, with one exception: the "# F" output for an element always comes before the corresponding "# M" output (for example, "# F 2" comes before "# M 4"). The final list, however, is still in encounter order.
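To see that more than one thread is involved, the sketch below (hypothetical names) records the name of the thread that runs each filter call. On a typical multi-core machine the set contains the calling thread plus some ForkJoinPool.commonPool worker threads, but the exact names and count depend on the JVM and the hardware. Only the result list is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class ParallelThreads {

    // Thread-safe set, because filter runs concurrently on several threads
    static final Set<String> threadNames = ConcurrentHashMap.newKeySet();

    static List<Integer> run() {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 10; i >= 1; i--) {
            numbers.add(i);
        }
        return numbers.parallelStream()
                      .filter(p -> {
                          threadNames.add(Thread.currentThread().getName());
                          return p % 2 == 0;
                      })
                      .map(n -> n * 2)
                      .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> result = run();
        System.out.println("Final: " + result);         // always [20, 16, 12, 8, 4]
        System.out.println("Threads: " + threadNames);  // varies from run to run
    }
}
```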

Parallelism and Joining Threads
For parallel streams, ordering constraints can degrade performance. For example, 'limit(n)' on an ordered parallel stream must return the first n elements in encounter order, not just any n elements, so it may require buffering that undermines the benefit of parallelism.

Let's add the 'limit' method between the filter and map methods of [Code-4], and use more elements (30) in the source.

[Code-5]
 public static void main(String[] args) {
   List<Integer> numbers = new ArrayList<>();
   for (int i = 30; i >= 1; i--) {
     numbers.add(i);
   }

   List<Integer> resultNumbers = numbers.parallelStream().filter(p -> {
          System.out.print(" # F " + p);
          return p % 2 == 0;})
        .limit(10)
        .map(n -> {
          System.out.print(" # M " + n * 2);
          return (n * 2);})
        .collect(Collectors.toList());

   System.out.print("\nFinal: ");
   resultNumbers.forEach(n -> System.out.print(n + " ")); 
 }

Output: (some of the filter output is omitted to save space)

 # F 11 # F 17 # F 26 # F 29 # F 27 # F 21 # F 22 ... # F 10 # F 14 # F 13 # M 36 # M 28 # M 24 # M 48 # M 52 # M 44 # M 40 # M 56 # M 60 # M 32
Final: 60 56 52 48 44 40 36 32 28 24 

The filter calls run in parallel in an unpredictable order, and so do the map calls, but this time the two groups do not interleave: in the output above, all of the "# F" output comes before any "# M" output. The reason is that limit(10) is a stateful operation. On an ordered parallel stream it must keep the first 10 matching elements in encounter order, so it waits for (joins) the parallel filter tasks before selecting those 10 elements. After that, the map method runs in parallel again. (I said "joins threads", but what is actually used is ForkJoinTask, which runs within a ForkJoinPool. A ForkJoinTask is a thread-like entity that is much lighter weight than a normal thread.)
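The fork/join idea can be illustrated directly with the ForkJoin framework. The sketch below is not how the stream library is implemented internally. It is a hypothetical RecursiveTask that counts even numbers by splitting the range in half, forking one half, computing the other half in the current task, and then joining, which is the kind of "join" described above. The threshold of 4 is an arbitrary value chosen for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class CountEvensTask extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 4;   // arbitrary cut-off for this sketch
    private final int[] values;
    private final int from;
    private final int to;                     // exclusive

    CountEvensTask(int[] values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Integer compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: count directly in this task
            int count = 0;
            for (int i = from; i < to; i++) {
                if (values[i] % 2 == 0) {
                    count++;
                }
            }
            return count;
        }
        int mid = (from + to) >>> 1;
        CountEvensTask left = new CountEvensTask(values, from, mid);
        CountEvensTask right = new CountEvensTask(values, mid, to);
        left.fork();                       // schedule the left half asynchronously
        int rightCount = right.compute();  // compute the right half in this task
        return left.join() + rightCount;   // wait for the left half and combine
    }

    static int countEvens(int[] values) {
        return ForkJoinPool.commonPool().invoke(new CountEvensTask(values, 0, values.length));
    }

    public static void main(String[] args) {
        int[] numbers = new int[30];
        for (int i = 0; i < 30; i++) {
            numbers[i] = 30 - i;           // 30, 29, ..., 1 as in [Code-5]
        }
        System.out.println(countEvens(numbers));   // 15
    }
}
```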

The final result is still in encounter order, as Collectors.toList() guarantees.
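If the application only needs some 10 matching elements rather than the first 10, the Javadoc for limit() notes that removing the ordering constraint with unordered() may make limit() much cheaper in parallel pipelines. The sketch below (hypothetical names) compares the two versions. With unordered(), the result is any 10 elements that pass the filter, so the selection and order may differ from run to run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class LimitOrdering {

    static List<Integer> source() {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 30; i >= 1; i--) {
            numbers.add(i);
        }
        return numbers;
    }

    // Ordered: limit(10) must keep the FIRST 10 even numbers in encounter order
    static List<Integer> orderedLimit() {
        return source().parallelStream()
                       .filter(p -> p % 2 == 0)
                       .limit(10)
                       .map(n -> n * 2)
                       .collect(Collectors.toList());
    }

    // Unordered: limit(10) may keep ANY 10 even numbers
    static List<Integer> unorderedLimit() {
        return source().parallelStream()
                       .unordered()
                       .filter(p -> p % 2 == 0)
                       .limit(10)
                       .map(n -> n * 2)
                       .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(orderedLimit());   // [60, 56, 52, 48, 44, 40, 36, 32, 28, 24]
        System.out.println(unorderedLimit()); // 10 values; which ones may vary
    }
}
```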
