Sunday, November 26, 2017

Java 9: Flow - Reactive Programming

The programming world has always changed quickly, and many programming and design paradigms have been introduced, such as object-oriented programming, domain-driven design, functional programming, and reactive programming, although they do not necessarily conflict with one another.  Java 8 introduced Streams and lambda expressions to support functional programming.  Java 9 introduced the Flow API and established a common set of interfaces for reactive programming.

Reactive Streams is an API for asynchronous stream processing with non-blocking back pressure.  According to the Reactive Manifesto, Reactive Systems are Responsive, Resilient, Elastic, and (Asynchronous) Message Driven.  Large systems are composed of smaller ones and therefore depend on the Reactive properties of their constituents.

Java 9 Flow API
The java.util.concurrent.Flow class in Java 9 corresponds to the Reactive Streams API, and there are already supporting implementations such as Akka, Ratpack, Reactor, etc.  Flow has four nested interfaces: Flow.Processor, Flow.Publisher, Flow.Subscriber, and Flow.Subscription.  Of these four interfaces, Java 9 provides only one implementing class, for Publisher: java.util.concurrent.SubmissionPublisher.

@FunctionalInterface
public static interface Publisher<T> {
   public void subscribe(Subscriber<? super T> subscriber);
}
 
public static interface Subscriber<T> {
   public void onSubscribe(Subscription subscription);
   public void onNext(T item);
   public void onError(Throwable throwable);
   public void onComplete();
}
 
public static interface Subscription {
   public void request(long n);
   public void cancel();
}
 
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
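
Of the four interfaces above, Processor is the only one the example in this post does not exercise.  A minimal sketch of one, assuming it is acceptable to extend SubmissionPublisher for the publishing half: the class name MappingProcessor and the demo method are hypothetical, made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

// Hypothetical example class (not part of the JDK): maps each incoming item
// and republishes the result to its own subscribers.
final class MappingProcessor<T, R> extends SubmissionPublisher<R>
        implements Flow.Processor<T, R> {

    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscription upstream;

    MappingProcessor(Function<? super T, ? extends R> mapper) {
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        upstream = subscription;
        subscription.request(1);        // ask the upstream publisher for the first item
    }

    @Override
    public void onNext(T item) {
        submit(mapper.apply(item));     // publish downstream; may block while buffers are full
        upstream.request(1);            // then ask for one more
    }

    @Override
    public void onError(Throwable throwable) {
        closeExceptionally(throwable);  // forward the failure downstream
    }

    @Override
    public void onComplete() {
        close();                        // forward completion downstream
    }

    // Wires source -> processor -> collector and returns what the collector saw.
    static List<Integer> demo() {
        List<Integer> lengths = new ArrayList<>();
        SubmissionPublisher<String> source = new SubmissionPublisher<>();
        MappingProcessor<String, Integer> toLength = new MappingProcessor<>(String::length);
        source.subscribe(toLength);
        CompletableFuture<Void> done = toLength.consume(lengths::add);
        source.submit("One");
        source.submit("Three");
        source.close();
        done.join();                    // wait until the processor has signalled onComplete
        return lengths;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The processor is a Subscriber to the source and a Publisher to the collector, so closing the source propagates onComplete through the whole chain.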

The Publisher publishes a stream of data to the Subscribers that have subscribed to it, and it usually defines its own Subscription implementation as a nested class, like the BufferedSubscription class inside SubmissionPublisher.  It usually publishes items to the subscriber asynchronously, using an Executor (for example, an ExecutorService such as ForkJoinPool) for better work management.  The two methods of Subscription let a Subscriber request items or cancel the subscription according to its own needs and capacity.  This is the concept of 'back pressure' described under the "Message Driven" property of Reactive Systems, and I will briefly talk about Back Pressure later in this article.
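
To make the idea of a Publisher that carries its own Subscription class concrete, here is a minimal sketch.  It is deliberately simpler than SubmissionPublisher: it delivers a fixed list synchronously on the caller's thread and is not thread-safe.  The names ListPublisher, ListSubscription, and demo are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical example class: publishes a fixed list synchronously, on the caller's thread.
final class ListPublisher<T> implements Flow.Publisher<T> {

    private final List<T> items;

    ListPublisher(List<T> items) {
        this.items = new ArrayList<>(items);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        // Each subscriber gets its own Subscription that tracks its demand.
        subscriber.onSubscribe(new ListSubscription(subscriber));
    }

    private final class ListSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super T> subscriber;
        private int next;          // index of the next item to deliver
        private long demand;       // items requested but not yet delivered
        private boolean draining;  // guards against re-entrant request() calls from onNext
        private boolean done;      // set after cancel, completion, or error

        ListSubscription(Flow.Subscriber<? super T> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            if (done) return;
            if (n <= 0) {
                done = true;
                subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
                return;
            }
            demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;  // saturate on overflow
            if (draining) return;  // the loop already running will see the new demand
            draining = true;
            try {
                while (!done && demand > 0 && next < items.size()) {
                    demand--;
                    subscriber.onNext(items.get(next++));
                }
                if (!done && next == items.size()) {
                    done = true;
                    subscriber.onComplete();
                }
            } finally {
                draining = false;
            }
        }

        @Override
        public void cancel() {
            done = true;
        }
    }

    // Runs the publisher against a one-at-a-time subscriber and records the signals.
    static List<String> demo() {
        List<String> signals = new ArrayList<>();
        new ListPublisher<>(List.of("One", "Two", "Three")).subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;
            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            @Override public void onNext(String item) { signals.add(item); subscription.request(1); }
            @Override public void onError(Throwable t) { signals.add("onError"); }
            @Override public void onComplete() { signals.add("onComplete"); }
        });
        return signals;
    }
}
```

Nothing is delivered until the subscriber calls request, and never more than it asked for; that demand counter is the whole back-pressure mechanism in miniature.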

The Subscriber arranges for items to be requested and processed.  The API documentation describes each of its methods as follows.
 - onComplete :  Method invoked when it is known that no additional Subscriber method invocations will occur for a Subscription that is not already terminated by error, after which no other Subscriber methods are invoked by the Subscription.

 - onError(Throwable throwable) :  Method invoked upon an unrecoverable error encountered by a Publisher or Subscription, after which no other Subscriber methods are invoked by the Subscription.

 - onNext(T item) :  Method invoked with a Subscription's next item.

 - onSubscribe(Subscription subscription) :  Method invoked prior to invoking any other Subscriber methods for the given Subscription.
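
The order of these callbacks for a single subscriber can be observed with a small recording subscriber.  This is only a sketch: the class name SignalOrderDemo and the method recordSignals are hypothetical, and only the normal-completion path is exercised (onError is recorded but not triggered).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class SignalOrderDemo {

    // Publishes two items, closes the publisher, and returns the callbacks in the order seen.
    static List<String> recordSignals() {
        List<String> signals = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch finished = new CountDownLatch(1);

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                signals.add("onSubscribe");
                subscription = s;
                s.request(1);
            }

            @Override
            public void onNext(String item) {
                signals.add("onNext(" + item + ")");
                subscription.request(1);
            }

            @Override
            public void onError(Throwable t) {
                signals.add("onError");
                finished.countDown();
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
                finished.countDown();
            }
        });

        publisher.submit("One");
        publisher.submit("Two");
        publisher.close();

        try {
            // Wait for the terminal signal instead of sleeping for a fixed time.
            if (!finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(signals);
    }

    public static void main(String[] args) {
        System.out.println(recordSignals());
    }
}
```

The recorded list starts with onSubscribe, continues with one onNext per item, and ends with onComplete.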

Example Code
Let's now see an implementation example using a SubmissionPublisher provided in Java 9.

- Implementation of Subscriber

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class MySubscriber<T> implements Subscriber<T> {
   private Subscription subscription;
   private String name;

   public MySubscriber(String name) {
      this.name = name;
   }

   @Override
   public void onComplete() {
      System.out.println(name + ": onComplete");
   }

   @Override
   public void onError(Throwable t) {
      System.out.println(name + ": onError");
      t.printStackTrace();
   }

   @Override
   public void onNext(T msg) {
      System.out.println(name + ": " + msg.toString() + " received in onNext");
      subscription.request(1);
   }

   @Override
   public void onSubscribe(Subscription subscription) {
      System.out.println(name + ": onSubscribe");
      this.subscription = subscription;
      subscription.request(1);
   }
}


- The Main using the SubmissionPublisher

import java.util.concurrent.SubmissionPublisher;

public class FlowMain {

   public static void main(String[] args) {
      SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

      MySubscriber<String> subscriber = new MySubscriber<>("Mine");
      MySubscriber<String> subscriberYours = new MySubscriber<>("Yours");
      MySubscriber<String> subscriberHis = new MySubscriber<>("His");
      MySubscriber<String> subscriberHers = new MySubscriber<>("Hers");

      publisher.subscribe(subscriber);
      publisher.subscribe(subscriberYours);
      publisher.subscribe(subscriberHis);
      publisher.subscribe(subscriberHers);

      publisher.submit("One");
      publisher.submit("Two");
      publisher.submit("Three");
      publisher.submit("Four");
      publisher.submit("Five");

      try {
         Thread.sleep(1000);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      publisher.close();
   }
}

Let's run the main application first and then talk about how this Flow application works.

- Output
This is one possible output; the exact interleaving depends on how the parallel workers happen to be scheduled at run time.

Mine: onSubscribe
Hers: onSubscribe
His: onSubscribe
Yours: onSubscribe
Yours: One received in onNext
His: One received in onNext
Mine: One received in onNext
Hers: One received in onNext
Mine: Two received in onNext
His: Two received in onNext
Yours: Two received in onNext
His: Three received in onNext
Mine: Three received in onNext
Hers: Two received in onNext
Mine: Four received in onNext
His: Four received in onNext
Yours: Three received in onNext
His: Five received in onNext
Mine: Five received in onNext
Hers: Three received in onNext
Yours: Four received in onNext
Hers: Four received in onNext
Yours: Five received in onNext
Hers: Five received in onNext
Mine: onComplete
Hers: onComplete
His: onComplete
Yours: onComplete

- Explanation of the Flow Application
It has one SubmissionPublisher and four subscribers that subscribe to the publisher.  How items are delivered depends on the publisher implementation, but in general a Publisher delivers them asynchronously.  By default, SubmissionPublisher uses the common pool instance of ForkJoinPool, unless that pool does not support a parallelism level of at least two, in which case a new thread is created to run each task.
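
SubmissionPublisher also has a constructor that takes an Executor and a maximum buffer capacity, which makes it easy to see which threads actually run the subscriber code.  The sketch below assumes a single worker thread named "flow-worker"; that name, the class ExecutorDemo, and the method deliveryThreads are all made up for this illustration.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class ExecutorDemo {

    // Publishes three items through a publisher backed by one named worker thread
    // and returns the names of the threads that ran the consumer function.
    static Set<String> deliveryThreads() {
        ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "flow-worker");  // name chosen for this sketch
            thread.setDaemon(true);
            return thread;
        });
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        SubmissionPublisher<String> publisher =
                new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
        try {
            // consume() subscribes with an unbounded request and completes on onComplete.
            CompletableFuture<Void> done =
                    publisher.consume(item -> threadNames.add(Thread.currentThread().getName()));
            publisher.submit("One");
            publisher.submit("Two");
            publisher.submit("Three");
            publisher.close();
            done.join();
        } finally {
            publisher.close();   // no effect if already closed
            executor.shutdown();
        }
        return new HashSet<>(threadNames);
    }

    public static void main(String[] args) {
        // The default executor; usually ForkJoinPool.commonPool().
        System.out.println(new SubmissionPublisher<String>().getExecutor());
        System.out.println(deliveryThreads());
    }
}
```

The thread that calls submit only enqueues the item; the subscriber callbacks run on the executor's threads.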

I put a breakpoint on line 32 of MySubscriber.java and on line 18 of FlowMain.java.  The debug window shows ForkJoinPool.commonPool in use, with multiple workers running.

When subscribe is called on the SubmissionPublisher, the onSubscribe method of MySubscriber is invoked.  Line 33 of MySubscriber (the subscription.request(1) call in onSubscribe) adds the given number of items to the current unfulfilled demand for this subscription.  When the publisher publishes an item / message on lines 18 to 22 of FlowMain.java, the onNext method of MySubscriber is called.

Without line 26 of MySubscriber (the subscription.request(1) call in onNext), each subscriber would receive no more than one item, because it requested only one item on line 33 (in onSubscribe).  This is how the subscriber controls the number of items it handles (again, Back Pressure).  For an effectively unbounded number of items, Long.MAX_VALUE can be used on line 33:  subscription.request(Long.MAX_VALUE)
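
The effect of a one-time request can be checked directly.  In this sketch a subscriber asks for a fixed number of items in onSubscribe and never calls request again, so it receives exactly that many no matter how many are published.  The names BoundedDemandDemo, FixedDemandSubscriber, and receivedCount are hypothetical, and the helper assumes 0 < demand <= published with everything fitting in the default buffer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedDemandDemo {

    // Subscriber that asks for a fixed number of items once and never asks again.
    static final class FixedDemandSubscriber<T> implements Flow.Subscriber<T> {
        final AtomicInteger received = new AtomicInteger();
        final CountDownLatch satisfied;
        private final int demand;
        private volatile Flow.Subscription subscription;

        FixedDemandSubscriber(int demand) {
            this.demand = demand;
            this.satisfied = new CountDownLatch(demand);
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(demand);           // the only request this subscriber ever makes
        }

        @Override
        public void onNext(T item) {
            received.incrementAndGet();  // note: no further request() here
            satisfied.countDown();
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onComplete() {
            // Not reached in this demo: undelivered items remain when demand runs out.
        }

        void cancel() {
            Flow.Subscription s = subscription;
            if (s != null) {
                s.cancel();
            }
        }
    }

    // Assumes 0 < demand <= published, and that 'published' items fit in the default buffer.
    static int receivedCount(int demand, int published) {
        FixedDemandSubscriber<Integer> subscriber = new FixedDemandSubscriber<>(demand);
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        for (int i = 1; i <= published; i++) {
            publisher.submit(i);
        }
        try {
            if (!subscriber.satisfied.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for requested items");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            subscriber.cancel();   // release the items that were never requested
            publisher.close();
        }
        return subscriber.received.get();
    }

    public static void main(String[] args) {
        System.out.println(receivedCount(2, 5));
    }
}
```

With a demand of 2 and 5 published items, the remaining 3 items stay in the subscription's buffer until the subscription is cancelled.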

For each publication (each call to the submit method on lines 18 to 22), every subscriber of the publisher is notified with the item / message, as shown on lines 5 to 24 of the Output.  You may notice that the output is interleaved across subscribers rather than following the order of subscription, because the workers in the ForkJoinPool run in parallel.
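
Although the interleaving across subscribers varies from run to run, each individual subscriber still receives the items in the order they were submitted.  A minimal sketch that checks this by giving every subscriber its own list; the class name PerSubscriberOrderDemo and the method run are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;

final class PerSubscriberOrderDemo {

    // Returns, for each subscriber, the items it received in arrival order.
    static List<List<String>> run(int subscriberCount, List<String> items) {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        List<List<String>> seenBySubscriber = new ArrayList<>();
        List<CompletableFuture<Void>> completions = new ArrayList<>();

        for (int i = 0; i < subscriberCount; i++) {
            List<String> seen = new ArrayList<>();  // written only by this subscriber's callbacks
            seenBySubscriber.add(seen);
            // consume() registers a subscriber that requests every item and
            // completes the returned future when onComplete arrives.
            completions.add(publisher.consume(seen::add));
        }

        items.forEach(publisher::submit);
        publisher.close();
        completions.forEach(CompletableFuture::join);  // wait for every subscriber to finish
        return seenBySubscriber;
    }

    public static void main(String[] args) {
        List<String> items = List.of("One", "Two", "Three", "Four", "Five");
        run(4, items).forEach(System.out::println);
    }
}
```

Each of the four printed lists is the same as the submitted list, even though the subscribers ran in parallel.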

When the publisher is closed, the onComplete method of each Subscriber is called, and the subscriber can do any final processing it needs.

Back Pressure
In this article, I mentioned 'back pressure'.  This concept is defined under the Message Driven characteristic of Reactive Systems.  The Reactive Manifesto defines "Back Pressure" like this:

"When one component is struggling to keep-up, the system as a whole needs to respond in a sensible way. It is unacceptable for the component under stress to fail catastrophically or to drop messages in an uncontrolled fashion. Since it can’t cope and it can’t fail it should communicate the fact that it is under stress to upstream components and so get them to reduce the load. This back-pressure is an important feedback mechanism that allows systems to gracefully respond to load rather than collapse under it. The back-pressure may cascade all the way up to the user, at which point responsiveness may degrade, but this mechanism will ensure that the system is resilient under load, and will provide information that may allow the system itself to apply other resources to help distribute the load."
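
On the publishing side, SubmissionPublisher gives a choice when a subscriber cannot keep up: submit blocks until there is room, while offer lets the caller decide what to do with an item that does not fit.  The sketch below simulates a stalled consumer (a subscriber that never calls request) behind a deliberately tiny buffer and counts the drops.  The class name DropOnOverflowDemo, the method run, and the buffer size of 2 are assumptions made for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

final class DropOnOverflowDemo {

    // Offers 'offers' items to a stalled subscriber and returns {accepted, dropped}.
    static int[] run(int offers) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger dropped = new AtomicInteger();
        // Tiny per-subscriber buffer so that it fills up almost immediately.
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor, 2);
        try {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override
                public void onSubscribe(Flow.Subscription s) {
                    // Deliberately never calls request(): simulates a consumer under stress.
                }

                @Override
                public void onNext(Integer value) {
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {
                }
            });

            for (int i = 0; i < offers; i++) {
                // The handler runs when the buffer is full; returning false means
                // "do not retry", so the item is dropped for that subscriber.
                publisher.offer(i, (subscriber, item) -> {
                    dropped.incrementAndGet();
                    return false;
                });
            }
        } finally {
            publisher.close();
            executor.shutdownNow();
        }
        return new int[] { offers - dropped.get(), dropped.get() };
    }

    public static void main(String[] args) {
        int[] result = run(100);
        System.out.println("accepted=" + result[0] + ", dropped=" + result[1]);
    }
}
```

Dropping here is a controlled decision made by the publisher's caller, which is the opposite of the uncontrolled message loss the quote warns against; a real system might instead slow down, retry, or report the overload.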

Summary
The Reactive Programming paradigm is not new, and many other products already support it.  It is great that Java 9 adds the final class Flow and provides a systematic way to implement reactive programming.  You may often need to implement your own Publisher class based on the interfaces defined in Flow.  During the implementation, developers need to focus on the core of reactive programming:  Responsive, Resilient, Elastic, and (Asynchronous) Message Driven.

To achieve these goals, concurrent programming also becomes more important.  An understanding of Future / CompletableFuture, Executor / ExecutorService, ForkJoinPool, Callable, etc. is necessary, and I may cover them in a future post.
