Sunday, November 26, 2017

Java 9: Flow - Reactive Programming

The programming world has always changed quickly, and many programming and design paradigms have been introduced, such as object-oriented programming, domain-driven design, functional programming, and reactive programming, although they do not always conflict with each other.  Java 8 introduced Streams and lambda expressions to support functional programming.  Java 9 introduced the Flow API and established a common interface for reactive programming.

Reactive Streams is an API.  According to the Reactive Manifesto, Reactive Systems are Responsive, Resilient, Elastic, and (Asynchronous) Message Driven.  Large systems are composed of smaller ones and therefore depend on the Reactive properties.

Java 9 Flow API
The java.util.concurrent.Flow class in Java 9 corresponds to the Reactive Streams API, and there are already supporting implementations such as Akka, Ratpack, Reactor, etc.  Flow has four nested interfaces: Flow.Processor, Flow.Publisher, Flow.Subscriber, and Flow.Subscription.  Of these four interfaces, Java 9 provides only one implementing class, for Publisher: java.util.concurrent.SubmissionPublisher.

@FunctionalInterface
public static interface Publisher<T> {
   public void subscribe(Subscriber<? super T> subscriber);
}
 
public static interface Subscriber<T> {
   public void onSubscribe(Subscription subscription);
   public void onNext(T item);
   public void onError(Throwable throwable);
   public void onComplete();
}
 
public static interface Subscription {
   public void request(long n);
   public void cancel();
}
 
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
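
The Processor interface is declared above but is not used in the example later in this post.  As a minimal sketch, one way to build a Processor is to extend SubmissionPublisher (which supplies the Publisher side) and write the Subscriber side by hand, similar to the pattern shown in the SubmissionPublisher Javadoc.  The class name MappingProcessor and the demo method are hypothetical names chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical name: a Processor that applies a function to each item and republishes the result.
class MappingProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
   private final Function<? super T, ? extends R> mapper;
   private Flow.Subscription upstream;

   MappingProcessor(Function<? super T, ? extends R> mapper) {
      this.mapper = mapper;
   }

   @Override
   public void onSubscribe(Flow.Subscription subscription) {
      this.upstream = subscription;
      subscription.request(1);          // ask upstream for the first item
   }

   @Override
   public void onNext(T item) {
      submit(mapper.apply(item));       // republish the transformed item downstream
      upstream.request(1);              // then ask upstream for the next item
   }

   @Override
   public void onError(Throwable throwable) {
      closeExceptionally(throwable);    // forward the failure downstream
   }

   @Override
   public void onComplete() {
      close();                          // forward completion downstream
   }

   // Wires source -> processor -> collecting subscriber and returns what arrived downstream.
   static List<Integer> demo() {
      List<Integer> received = new CopyOnWriteArrayList<>();
      CountDownLatch done = new CountDownLatch(1);

      MappingProcessor<String, Integer> lengths = new MappingProcessor<>(String::length);
      lengths.subscribe(new Flow.Subscriber<Integer>() {
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
         }
         @Override
         public void onNext(Integer item) {
            received.add(item);
         }
         @Override
         public void onError(Throwable throwable) {
            done.countDown();
         }
         @Override
         public void onComplete() {
            done.countDown();
         }
      });

      SubmissionPublisher<String> source = new SubmissionPublisher<>();
      source.subscribe(lengths);
      source.submit("One");
      source.submit("Two");
      source.submit("Three");
      source.close();                   // completion travels through the processor to the end subscriber

      try {
         done.await(5, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
      }
      return received;
   }

   public static void main(String[] args) {
      System.out.println(demo());
   }
}
```

The downstream subscriber is attached to the processor before the processor is attached to the source, so no transformed item is published before somebody is listening.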

The Publisher publishes a stream of data to the Subscribers that have subscribed to it, and it usually defines its own Subscription implementation as an inner class, like the private BufferedSubscription class in SubmissionPublisher.  It usually publishes items to the subscriber asynchronously using an Executor / ExecutorService (with ForkJoinPool as an implementing class) for better work management.  The two methods defined on Subscription let a Subscriber request items or cancel the subscription based on the Subscriber's need and capacity.  This is the concept of 'back pressure' defined under the "Message Driven" property of Reactive Systems, and I will briefly talk about back pressure later in this article.

The Subscriber arranges for items to be requested and processed.  Here is the description of each method as defined in the API.
 - onComplete() :  Method invoked when it is known that no additional Subscriber method invocations will occur for a Subscription that is not already terminated by error, after which no other Subscriber methods are invoked by the Subscription.

 - onError(Throwable throwable) :  Method invoked upon an unrecoverable error encountered by a Publisher or Subscription, after which no other Subscriber methods are invoked by the Subscription.

 - onNext(T item) :  Method invoked with a Subscription's next item.

 - onSubscribe(Subscription subscription) :  Method invoked prior to invoking any other Subscriber methods for the given Subscription.
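
The method descriptions above can be observed with a small sketch that records each signal as it arrives.  It uses SubmissionPublisher.closeExceptionally to trigger onError instead of onComplete.  The class name SignalOrderDemo and the string labels are hypothetical names for illustration only.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: records the order in which Subscriber methods are invoked.
class SignalOrderDemo {

   static List<String> run() {
      List<String> signals = new CopyOnWriteArrayList<>();
      CountDownLatch itemSeen = new CountDownLatch(1);
      CountDownLatch terminated = new CountDownLatch(1);

      SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
      publisher.subscribe(new Flow.Subscriber<String>() {
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
            signals.add("onSubscribe");
            subscription.request(Long.MAX_VALUE);
         }
         @Override
         public void onNext(String item) {
            signals.add("onNext:" + item);
            itemSeen.countDown();
         }
         @Override
         public void onError(Throwable throwable) {
            signals.add("onError:" + throwable.getMessage());
            terminated.countDown();
         }
         @Override
         public void onComplete() {
            signals.add("onComplete");
            terminated.countDown();
         }
      });

      publisher.submit("A");
      // Wait for delivery first so the ordering in this sketch does not depend on timing.
      await(itemSeen);
      publisher.closeExceptionally(new IllegalStateException("boom"));
      await(terminated);
      return signals;
   }

   private static void await(CountDownLatch latch) {
      try {
         latch.await(5, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
      }
   }

   public static void main(String[] args) {
      System.out.println(run());
   }
}
```

In this run the subscriber sees onSubscribe first, then the item, then onError, and onComplete is never called because the subscription was terminated by the error.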

Example Code
Let's now see an implementation example using a SubmissionPublisher provided in Java 9.

- Implementation of Subscriber

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class MySubscriber<T> implements Subscriber<T> {
   private Subscription subscription;
   private String name;

   public MySubscriber(String name) {
      this.name = name;
   }

   @Override
   public void onComplete() {
      System.out.println(name + ": onComplete");
   }

   @Override
   public void onError(Throwable t) {
      System.out.println(name + ": onError");
      t.printStackTrace();
   }

   @Override
   public void onNext(T msg) {
      System.out.println(name + ": " + msg.toString() + " received in onNext");
      subscription.request(1);   // ask for the next item only after handling this one
   }

   @Override
   public void onSubscribe(Subscription subscription) {
      System.out.println(name + ": onSubscribe");
      this.subscription = subscription;
      subscription.request(1);   // initial demand: exactly one item
   }
}


- The Main using the SubmissionPublisher

import java.util.concurrent.SubmissionPublisher;

public class FlowMain {

   public static void main(String[] args) {
      SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

      MySubscriber<String> subscriber = new MySubscriber<>("Mine");
      MySubscriber<String> subscriberYours = new MySubscriber<>("Yours");
      MySubscriber<String> subscriberHis = new MySubscriber<>("His");
      MySubscriber<String> subscriberHers = new MySubscriber<>("Hers");

      publisher.subscribe(subscriber);
      publisher.subscribe(subscriberYours);
      publisher.subscribe(subscriberHis);
      publisher.subscribe(subscriberHers);

      publisher.submit("One");
      publisher.submit("Two");
      publisher.submit("Three");
      publisher.submit("Four");
      publisher.submit("Five");

      try {
         Thread.sleep(1000);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      publisher.close();
   }
}

Let's run the main application first and then talk about how this Flow application works.

- Output
This is one possible output; the exact interleaving depends on how the runtime schedules the parallel execution.

Mine: onSubscribe
Hers: onSubscribe
His: onSubscribe
Yours: onSubscribe
Yours: One received in onNext
His: One received in onNext
Mine: One received in onNext
Hers: One received in onNext
Mine: Two received in onNext
His: Two received in onNext
Yours: Two received in onNext
His: Three received in onNext
Mine: Three received in onNext
Hers: Two received in onNext
Mine: Four received in onNext
His: Four received in onNext
Yours: Three received in onNext
His: Five received in onNext
Mine: Five received in onNext
Hers: Three received in onNext
Yours: Four received in onNext
Hers: Four received in onNext
Yours: Five received in onNext
Hers: Five received in onNext
Mine: onComplete
Hers: onComplete
His: onComplete
Yours: onComplete

- Explanation of the Flow Application
It has one SubmissionPublisher and four subscribers that subscribe to the publisher.  It depends on the implementation, but in general a Publisher delivers items asynchronously.  SubmissionPublisher uses the common pool instance of ForkJoinPool unless it cannot support parallelism.

I put a breakpoint on line 32 of MySubscriber.java and on line 18 of FlowMain.java.  This is my debug window, which shows the use of ForkJoinPool.commonPool and multiple workers running.

When the subscribe method of SubmissionPublisher is called, the onSubscribe method of MySubscriber is invoked.  Line 33 of MySubscriber.java (subscription.request(1) in onSubscribe) adds one item to the current unfulfilled demand for this subscription.  When the publisher publishes an item / message on lines 18~22 of FlowMain.java (the submit calls), the onNext method of MySubscriber is called.

Without line 26 of MySubscriber.java (subscription.request(1) in onNext), the subscribers would not receive more than one item, because the subscriber requested only one item on line 33.  This is how the subscriber controls the number of items to handle (again, back pressure).  For an unbounded number of items, Long.MAX_VALUE can be used on line 33:  subscription.request(Long.MAX_VALUE)
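
The effect of the requested count can be checked with a small sketch.  The subscriber below asks for two items up front and nothing more, so the remaining items stay buffered until more demand is signalled; the main thread then requests Long.MAX_VALUE and the rest arrive.  The names DemandDemo and CountingSubscriber are hypothetical, and the initial demand of two is an arbitrary choice for the illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class showing how request(n) limits what a subscriber receives.
class DemandDemo {

   static final class CountingSubscriber implements Flow.Subscriber<String> {
      final List<String> received = new CopyOnWriteArrayList<>();
      final CountDownLatch firstBatch = new CountDownLatch(2);
      final CountDownLatch completed = new CountDownLatch(1);
      volatile Flow.Subscription subscription;

      @Override
      public void onSubscribe(Flow.Subscription subscription) {
         this.subscription = subscription;
         subscription.request(2);       // initial demand: two items, and onNext requests no more
      }
      @Override
      public void onNext(String item) {
         received.add(item);
         firstBatch.countDown();
      }
      @Override
      public void onError(Throwable throwable) {
         completed.countDown();
      }
      @Override
      public void onComplete() {
         completed.countDown();
      }
   }

   // Returns {items received with demand 2, items received after unbounded demand}.
   static int[] run() {
      CountingSubscriber subscriber = new CountingSubscriber();
      SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
      publisher.subscribe(subscriber);
      for (String item : List.of("One", "Two", "Three", "Four", "Five")) {
         publisher.submit(item);
      }

      await(subscriber.firstBatch);
      int afterFirstBatch = subscriber.received.size();   // demand is used up, so delivery has stopped

      subscriber.subscription.request(Long.MAX_VALUE);    // effectively unbounded demand
      publisher.close();
      await(subscriber.completed);
      return new int[] {afterFirstBatch, subscriber.received.size()};
   }

   private static void await(CountDownLatch latch) {
      try {
         latch.await(5, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
      }
   }

   public static void main(String[] args) {
      int[] counts = run();
      System.out.println("with demand 2: " + counts[0] + ", after unbounded demand: " + counts[1]);
   }
}
```

Requesting Long.MAX_VALUE is convenient, but it gives up the flow control that back pressure is meant to provide, so it fits best when the subscriber is known to keep up with the publisher.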

For each publication (using the submit method) on lines 18~22, all subscribers of the publisher are notified with the item / message, as shown on lines 5~24 of the Output.  You may notice that the order of the output is not the same as the order of subscription, because each worker in the ForkJoinPool works in parallel.

When the publisher is closed, the onComplete method of each Subscriber is called and the subscriber can do any necessary processing.

Back Pressure
In this article, I mentioned 'back pressure'.  This concept is defined in the Message Driven characteristic of Reactive Systems.  The Reactive Manifesto defines "Back Pressure" like this:

"When one component is struggling to keep-up, the system as a whole needs to respond in a sensible way. It is unacceptable for the component under stress to fail catastrophically or to drop messages in an uncontrolled fashion. Since it can’t cope and it can’t fail it should communicate the fact that it is under stress to upstream components and so get them to reduce the load. This back-pressure is an important feedback mechanism that allows systems to gracefully respond to load rather than collapse under it. The back-pressure may cascade all the way up to the user, at which point responsiveness may degrade, but this mechanism will ensure that the system is resilient under load, and will provide information that may allow the system itself to apply other resources to help distribute the load."

Summary
The paradigm of reactive programming is not new, and many other products have already started to support it.  It is great that Java 9 added the final class Flow, providing a systematic way to implement reactive programming.  You may often need to implement your own Publisher class based on the interfaces defined in Flow.  During the implementation, developers need to focus on the core of reactive programming:  Responsive, Resilient, Elastic, and (Asynchronous) Message Driven.

To achieve the core of reactive programming, concurrent programming also becomes more important.  An understanding of Future / CompletableFuture, Executor / ExecutorService, ForkJoinPool, Callable, etc. is necessary, and I may cover them in a future post.

Saturday, November 18, 2017

Lambda Expression / Functional Programming - Not Always Beautiful.

Functional programming has been gaining much attention in the past few years.  Java 8 started to support lambda expressions, and several new programming languages have adopted functional programming.  While I have been learning it and watching many developers say how great functional programming is, I have also kept remembering an old story I read before the age of 10: The Naked King (or The Emperor's New Clothes)  - Everyone says "the most beautiful clothes" until an innocent child says "the king is naked!"

I exaggerated.  Functional programming is very useful and deserves its popularity, but it definitely requires careful thought before blindly saying "it is very beautiful".  The blog post "Stop abusing lambda expression" also talks about what people need to be careful about, and I completely agree with it.

In this post, I will show you a few things we need to think about as I convert the simple algorithm shown in the previous post to a lambda expression in Java 8 and to functions in Scala.

- Minimum Difference of Each Two Numbers in a Sorted Array / List
The previous post "Algorithm, Performance, and Back to the Basic" showed one solution.  When the input array is sorted, this is the simple imperative code.
[Code 1]
public int findMinAbsDifferenceSorted(int[] numbers) {
   int min = Integer.MAX_VALUE;
   for(int i=1; i<numbers.length; i++){
      int diff = numbers[i] - numbers[i-1];
      if(diff < min){
         min = diff;
      }
   }
   return min;
}

- Functional Way
I am not very good at functional programming yet, so I thought about how I could convert the simple imperative method to a functional style.  Since functional programming is supposed to be simpler, more maintainable, and less error prone, this is how I wrote it.  I forced myself to write the method this way, but I think creating another IntStream may not be a good functional way. (Functional programming experts:  please suggest better code.)
[Code 2]
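import java.util.stream.IntStream;

// Assumes 'numbers' is sorted and has at least two elements;
// getAsInt() throws NoSuchElementException when the stream is empty.
public int findMinAbsDifferenceSortStream(int[] numbers) {
   return IntStream.range(0, numbers.length-1)
            .map(i -> numbers[i+1] - numbers[i])
            .min().getAsInt();
}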

- Methods in Scala
We can convert the imperative method to functional programming in a few different ways, and I will show you two of them.
Without much thinking, this could be a simple way in Scala.
[Code 3]
{for(i <- 1 to numbers.length -1) yield numbers(i) - numbers(i-1)}.min

After some consideration, this could be another way.
[Code 4]
numbers.tail.zip(numbers).map(n => n._1 - n._2).min

As you can see, Code 3 & 4 are much simpler than Code 2 or Code 1.  Great! Are we happy now?  If you want to say "Yes", please wait until you answer the following questions.

1. Which code is better between Code 3 and Code 4? (Yes, I am not even asking you to think about different languages)
2. What would be the Big O notation for Code 3 & 4?  Can you anticipate the growth rate?

It is often said that functional programming focuses on "what to do" instead of "how to do it" as imperative programming does.  So, Code 3 & 4 show us "what to do", but is that really enough for software engineers?

- Execution Time of Each Method
Running 5 times with 100,000 random numbers :

Code 3:  22958 ~ 50029 ms
Code 4:  6 ~ 33 ms

Running 5 times with 10,000,000 random numbers :

Code 1:  5 ~ 9 ms
Code 2:  9 ~ 13 ms
Code 3:  Didn't even run it
Code 4:  3262 ~ 11068 ms

I understand that there was some unfairness: Scala uses Int objects while the Java code used the primitive 'int' data type.  So, I also used a List<Integer> data type for Code 1 & 2.  The execution times with 10,000,000 random numbers are:

Code 1 with List<Integer>:  21 ~ 30 ms
Code 2 with List<Integer>:  26 ~ 38 ms

- Summary
I showed the execution time of each method although I understand that execution time isn't everything.  Functional programming makes parallel processing simpler, so we can take advantage of multi-core processors.  Nevertheless, I am asking how we can evaluate the algorithms of Code 3 and Code 4 before running the code with a larger data set.  The much slower execution time might be not only a matter of the algorithm itself, but also of creating and using immutable objects.
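
To illustrate the point about parallel processing, here is a minimal sketch based on Code 2 in which adding .parallel() is the only structural change.  The class and method names are hypothetical, orElse is used instead of getAsInt so that short arrays return Integer.MAX_VALUE like Code 1, and no timing claim is made: whether the parallel version is faster depends on the data size and the machine.

```java
import java.util.stream.IntStream;

// Hypothetical class name; a parallel variant of the stream-based Code 2.
class ParallelMinDiff {

   // Expects 'sorted' to be in ascending order, as Code 1 and Code 2 do.
   static int minDiffSortedParallel(int[] sorted) {
      return IntStream.range(0, sorted.length - 1)
            .parallel()                               // split the index range across worker threads
            .map(i -> sorted[i + 1] - sorted[i])      // gap between neighboring values
            .min()
            .orElse(Integer.MAX_VALUE);               // fewer than two elements: no gap exists
   }

   public static void main(String[] args) {
      System.out.println(minDiffSortedParallel(new int[] {1, 4, 6, 13}));
   }
}
```

This works because each gap is computed independently and taking a minimum does not depend on the order in which the gaps are combined.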

As we can see from many other resources, functional programming has many strengths, but I just want to say that software engineers need to see more than "what to do".  Without seeing "how to do it", we may have a hard time controlling the quality of software (although the definition of quality may differ from person to person).

Sunday, November 5, 2017

Algorithm, Performance and Back to the Basic

I have had some chances to interview people for a Sr. software engineer position.  Sometimes, people don't remember what they learned in college after working at companies for more than 10 years.  I understand the situations and excuses because I may be the same.

So, I chose an easier problem and wanted to see how they approach the problem within the first 10 minutes of the 50 minutes given to me.  OK. I got this question from HackerRank.com, and it was a problem worth 15 points, which means an easy problem.


Basically, the question is about finding the minimum absolute difference between any two numbers in an array of size 'n'.  Surprisingly enough, I didn't have any interesting discussion with candidates.  I will talk about very basic algorithms and efficiency here and continue with lambda expressions and functional programming in the next post.

- Brute-Force Approach
Most developers, regardless of their job title, can solve the problem using two nested for loops without thinking much about it.

public int findMinAbsDifferenceLoop(int[] numbers) {
   int min = Integer.MAX_VALUE;
   for(int i=0; i<numbers.length; i++){
      for(int j=i+1; j<numbers.length; j++){
         int diff = Math.abs((numbers[i] - numbers[j]));
         if(min > diff){
            min = diff; 
         }
      }
   }
   return min;
}

Any problem with this simple solution if your interest is simply getting an answer?

Then, what is the big O notation of this algorithm?  Simply O(n^2)!!  What does it mean? As the number of items in the array grows, the execution time grows as fast as the graph of x squared.

- Improvement after a Little Thinking.
Since we need to compare every pair of numbers, the previous approach with two loops might seem necessary.  But what if the numbers in the array were sorted? Do we still need to compare the first number with the second, third, and fourth number to find the minimum difference? No! Right?

public int findMinAbsDifferenceSorted(int[] numbers) {
   int min = Integer.MAX_VALUE;
   for(int i=1; i<numbers.length; i++){
      int diff = numbers[i] - numbers[i-1];
      if(diff < min){
         min = diff;
      }
   }
   return min;
}

This logic? Simply O(n)!!
What about the expense of the sorting??  Yes, Yes, Yes...!! We should have talked about it even if you were not a software engineer!

There are many different sorting algorithms and each algorithm has a best, a worst, and an average case as shown on this wiki page.  Nevertheless, most people will have no problem saying an average of O(n log n).

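import java.util.Arrays;

public int findMinAbsDifferenceSorted(int[] numbers) {
   Arrays.sort(numbers);   // sorts in place, so the caller's array is reordered
   int min = Integer.MAX_VALUE;
   for(int i=1; i<numbers.length; i++){
      int diff = numbers[i] - numbers[i-1];   // gap between neighbors in sorted order
      if(diff < min){
         min = diff;
      }
   }
   return min;
}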

So, this logic has O(n log n) + O(n) --> O(n log n)!
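
As a quick sanity check that both approaches agree, here is a minimal sketch that runs the brute-force and the sort-based versions on the same input.  The class and method names are hypothetical, and the sort-based version works on a copy so that the caller's array is left untouched, which is a small change from the listing above.

```java
import java.util.Arrays;

// Hypothetical class name; compares the O(n^2) and O(n log n) approaches on the same data.
class MinDiffCheck {

   // O(n^2): compare every pair of numbers.
   static int bruteForce(int[] numbers) {
      int min = Integer.MAX_VALUE;
      for (int i = 0; i < numbers.length; i++) {
         for (int j = i + 1; j < numbers.length; j++) {
            min = Math.min(min, Math.abs(numbers[i] - numbers[j]));
         }
      }
      return min;
   }

   // O(n log n): sort a copy, then compare neighbors only.
   static int viaSortedCopy(int[] numbers) {
      int[] sorted = numbers.clone();   // keep the caller's array in its original order
      Arrays.sort(sorted);
      int min = Integer.MAX_VALUE;
      for (int i = 1; i < sorted.length; i++) {
         min = Math.min(min, sorted[i] - sorted[i - 1]);
      }
      return min;
   }

   public static void main(String[] args) {
      int[] data = {10, 3, 25, 7, 18};
      System.out.println(bruteForce(data) + " " + viaSortedCopy(data));
   }
}
```

For the sample array the closest pair is 7 and 10, so both methods return 3.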

- Execution Time
I created random numbers and this is the execution time on my MacBook Pro with a 2.3 GHz Intel Core i7 processor.

The X-axis indicates the number of items in the array, from 10,000 to 100,000 in increments of 2,000. The Y-axis indicates the execution time in milliseconds.  The execution time for 100,000 items with the double for loop was 12,627 milliseconds, but with the sorted data it was 6 milliseconds.  As you can see, the execution time with the sorted data is almost flat on this graph.

I am not patient enough to run larger inputs with the nested for loop.  This is the execution time for the sorted data, with sizes from 100,000 to 1,000,000 in increments of 20,000.  Although the execution time fluctuates somewhat, it took only 78 milliseconds to calculate the minimum difference for 1 million numbers.
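
For readers who want to repeat this kind of measurement, here is a rough sketch of how a single timing could be taken.  It is not the harness used for the numbers above: the class name, the fixed seed, and the value range of 0 to 1,000,000 are all assumptions made for the illustration, and a single System.nanoTime measurement is only a coarse indicator because of JIT warm-up.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical timing sketch; not the harness used for the numbers in this post.
class TimingSketch {

   static int minDiffAfterSort(int[] numbers) {
      Arrays.sort(numbers);
      int min = Integer.MAX_VALUE;
      for (int i = 1; i < numbers.length; i++) {
         min = Math.min(min, numbers[i] - numbers[i - 1]);
      }
      return min;
   }

   // Times one run on n random values and returns the elapsed time in milliseconds.
   static long timeOnce(int n, long seed) {
      // Assumed value range; bounded so that differences cannot overflow an int.
      int[] data = new Random(seed).ints(n, 0, 1_000_000).toArray();
      long start = System.nanoTime();
      int result = minDiffAfterSort(data);
      long elapsedMs = (System.nanoTime() - start) / 1_000_000;
      System.out.println(n + " numbers: min diff " + result + ", " + elapsedMs + " ms");
      return elapsedMs;
   }

   public static void main(String[] args) {
      for (int n = 100_000; n <= 1_000_000; n += 300_000) {
         timeOnce(n, 42L);
      }
   }
}
```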


Summary
Some people try to solve this problem with an O(n) algorithm.  Great! But I will not talk about a better algorithm, because the purpose of this post is to ask people to think about the basics and to provide some refreshment before I talk about functional programming in the next post.

