Reactive Streams is an API. According to the Reactive Manifesto, Reactive Systems are Responsive, Resilient, Elastic, and (Asynchronous) Message Driven. Large systems are composed of smaller ones and therefore depend on the Reactive properties.
Java 9 Flow API
The java.util.concurrent.Flow class in Java 9 corresponds to the Reactive Streams API, and there are already supporting implementations such as Akka, Ratpack, Reactor, etc. The Flow has four nested interfaces: Flow.Processor, Flow.Publisher, Flow.Subscriber, and Flow.Subscription. Among these four interfaces, the Java 9 provides (only) one implementing class of the Publisher: java.util.concurrent.SubmissionPublisher.
@FunctionalInterface public static interface Publisher<T> { public void subscribe(Subscriber<? super T> subscriber); } public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); } public static interface Subscription { public void request(long n); public void cancel(); } public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { }
The Publisher publishes a stream of data that the Subscriber is asynchronously subscribed, and it usually defines its own Subscription implementation by having an inner Subscription class as like a private BufferedSubscription class in the SubmissionPublisher class. It usually publishes itms to the subscriber asynchronously using an Executor / ExecutorService ( ForkJoinPool as an implementing class for better work management.) Subscription's two defined methods let a Subscriber requests items or cancels it based on Subscriber's need and capacity. It is a concept of 'back pressure' defined under the "Message Driven" property of the Reactive Streams and I will briefly talk about the Back Pressure later on this article.
The Subscriber arranges items to be requested or processed and this is some description of each method defined in the API.
- onComplete : Method invoked when it is known that no additional Subscriber method invocations will occur for a Subscription that is not already terminated by error, after which no other Subscriber methods are invoked by the Subscription.
- onError(Throwable throwable) : Method invoked upon an unrecoverable error encountered by a Publisher or Subscription, after which no other Subscriber methods are invoked by the Subscription.
- onNext(T item) : Method invoked with a Subscription's next item.
- onSubscribe(Subscription subscription) : Method invoked prior to invoking any other Subscriber methods for the given Subscription.
Example Code
Let's now see an implementation example using a SubmissionPublisher provided in Java 9.
- Implementation of Subscriber
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; public class MySubscriber<T> implements Subscriber<T> { private Subscription subscription; private String name; public MySubscriber(String name) { this.name = name; } @Override public void onComplete() { System.out.println(name + ": onComplete"); } @Override public void onError(Throwable t) { System.out.println(name + ": onError"); t.printStackTrace(); } @Override public void onNext(T msg) { System.out.println(name + ": " + msg.toString() + " received in onNext"); subscription.request(1); } @Override public void onSubscribe(Subscription subscription) { System.out.println(name + ": onSubscribe"); this.subscription = subscription; subscription.request(1); } } |
- The Main using the SubmissionPublisher
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | import java.util.concurrent.SubmissionPublisher; public class FlowMain { public static void main(String[] args) { SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); MySubscriber<String> subscriber = new MySubscriber<>("Mine"); MySubscriber<String> subscriberYours = new MySubscriber<>("Yours"); MySubscriber<String> subscriberHis = new MySubscriber<>("His"); MySubscriber<String> subscriberHers = new MySubscriber<>("Hers"); publisher.subscribe(subscriber); publisher.subscribe(subscriberYours); publisher.subscribe(subscriberHis); publisher.subscribe(subscriberHers); publisher.submit("One"); publisher.submit("Two"); publisher.submit("Three"); publisher.submit("Four"); publisher.submit("Five"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } publisher.close(); } } |
Let's run the main application first and then talk about how this Flow application works.
- Output
This is one possible output depending on a status of the runtime parallel execution.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | Mine: onSubscribe Hers: onSubscribe His: onSubscribe Yours: onSubscribe Yours: One received in onNext His: One received in onNext Mine: One received in onNext Hers: One received in onNext Mine: Two received in onNext His: Two received in onNext Yours: Two received in onNext His: Three received in onNext Mine: Three received in onNext Hers: Two received in onNext Mine: Four received in onNext His: Four received in onNext Yours: Three received in onNext His: Five received in onNext Mine: Five received in onNext Hers: Three received in onNext Yours: Four received in onNext Hers: Four received in onNext Yours: Five received in onNext Hers: Five received in onNext Mine: onComplete Hers: onComplete His: onComplete Yours: onComplete |
- Explanation of the Flow Application
It has one SubmissionPublisher and four subscribers that subscribe to the publisher. Depending on the implementation of the publisher, but in general, a Publisher implements an asynchronous implementation. In the SubmissionPublisher, the common pool instance of the ForkJoinPool is used unless it cannot support parallelism.
I put a break point on a line 32 on the MySubscriber.java and a line 18 on the FlowMain.java. This is my debug window, which shows a usage of the ForkJoinPool.commonPool and multiple workers running.
During a call of the subscribe method in the SubmissionPublisher, the onSubscribe method of the MySubscriber is called. The line 33 on the MySubscriber controls a number of items to the current unfulfilled demand for this subscription. When a publisher publishes an item / message on the line 18~22 of the FlowMain.java, the onNext method of the MySubscriber is called.
Without the line 26 on the MySubscriber, the subscribers will not receive more than one item because the subscriber requested only one item on the line 33. This is how the subscriber controls a number of items to handle. (again, Back Pressure). For unbounded number of items, Long.MAX_VALUE can be used on the line 33: subscription.request(Long.MAX_VALUE)
Per each publication activity (using the submit method) on the line 18 ~ 22, all subscribers of the publisher get a notification with an item / message which is shown on the line 5 ~ 24 of the Output. You may notice that the order of output is not same as the order of the subscription because each worker in the ForkJoinPool works in parallel.
When the publisher closes the publication, the onComplete method of the Subscriber is called and the subscriber can do any necessary process.
Back Pressure
On this article, I mentioned the 'back pressure'. This concept is defined in the Message Driven characteristic of the Reactive Systems. The Reactive Manifesto defines the "Back Pressure" like this:
"When one component is struggling to keep-up, the system as a whole needs to respond in a sensible way. It is unacceptable for the component under stress to fail catastrophically or to drop messages in an uncontrolled fashion. Since it can’t cope and it can’t fail it should communicate the fact that it is under stress to upstream components and so get them to reduce the load. This back-pressure is an important feedback mechanism that allows systems to gracefully respond to load rather than collapse under it. The back-pressure may cascade all the way up to the user, at which point responsiveness may degrade, but this mechanism will ensure that the system is resilient under load, and will provide information that may allow the system itself to apply other resources to help distribute the load."
Summary
A paradigm of the Reactive Programming is not new, and many other products have already started to support the paradigm. It is great to have Java 9 adding a final class Flow and providing a systematic way to implement the reactive programming. You may often need to implement your own Publisher class based on the interfaces defined in the Flow. During the implementation, developers need to focus on the core of the reactive programming: Responsive, Resilient, Elastic, and (Asynchronous) Message Driven.
To achieve the core of the reactive programming, the concurrent programming also become more important. Understanding of Future /CompletableFuture, Executor / ExecutorService, ForkJoinPool, Callable, etc are very necessary and I may cover them on a next post.