Showing posts with label asynchronous. Show all posts

Saturday, May 29, 2021

Chasing Java's release train, from 8 to 16. Part 2: The race to the next LTS release.

In the first part we went thoroughly through the large number of features delivered in the scope of JDK-9. Nevertheless, that release was always considered transitional, with little or no adoption expected. Its mission was to kick off the race towards the next LTS release, JDK-11.

JDK 10

JDK-10, the first release to follow the six-month cadence, brought a number of new features into the language and the JVM itself. Let us take a look at the most interesting ones from the developer's perspective.
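
As one small illustration, not an exhaustive list: JDK-10 shipped local-variable type inference with var (JEP 286) and the List.copyOf factory method. The sketch below assumes JDK 10 or later; the class and method names are hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.List;

class Jdk10Sketch {
    // List.copyOf (added in JDK 10) returns an unmodifiable snapshot of the source.
    static List<String> snapshot(List<String> source) {
        return List.copyOf(source);
    }

    // 'var' lets the compiler infer the static type of local variables.
    static int totalLength(List<String> words) {
        var total = 0;              // inferred as int
        for (var word : words) {    // inferred as String
            total += word.length();
        }
        return total;
    }

    public static void main(String[] args) {
        var source = new ArrayList<>(List.of("var", "copyOf"));
        var copy = snapshot(source);
        source.add("later");        // does not affect the snapshot
        System.out.println(copy + " -> " + totalLength(copy));
    }
}
```

Note that var is only inference at compile time: the variables remain statically typed, so nothing changes at run time.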

Undoubtedly, the JDK-10 release has a rather moderate number of features compared to JDK-9, but every one of them was delivered much faster, thanks to the new release cycle.

JDK 11

The first LTS release of the JDK following the new schedule, JDK-11, saw the light in 2018, six months after the JDK-10 release. It finally brought long-awaited stability and established a new baseline in the post-JDK-9 world. It also included a number of features.
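
To give a flavour of the developer-facing side, here is a minimal sketch of a few JDK-11 additions: the new String methods (lines, isBlank, strip, repeat) and var in lambda parameters (JEP 323). It assumes JDK 11 or later; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

class Jdk11Sketch {
    // Splits text into lines, drops the blank ones and trims the rest.
    static List<String> nonBlankLines(String text) {
        return text.lines()                       // String.lines(), since JDK 11
            .filter(line -> !line.isBlank())      // String.isBlank(), since JDK 11
            .map(String::strip)                   // String.strip(), since JDK 11
            .collect(Collectors.toList());
    }

    // JEP 323: 'var' may be used for lambda parameters.
    static String banner(String title) {
        BiFunction<String, Integer, String> pad =
            (var text, var width) -> "=".repeat(width) + " " + text; // String.repeat(), since JDK 11
        return pad.apply(title, 3);
    }

    public static void main(String[] args) {
        System.out.println(nonBlankLines("  first \n\n second\n"));
        System.out.println(banner("JDK"));
    }
}
```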

It is worth noting that JDK-11 introduced two new garbage collectors, ZGC and Epsilon, both marked as experimental. We are going to get back to those in the upcoming posts while discussing more recent JDK releases.

So, where are we today? JDK-11 is slowly but steadily gaining adoption as more and more projects migrate off JDK-8. Nonetheless, the majority are still on JDK-8 and, in my opinion, there is no reason to expect a drastic change in that balance within the next couple of years. But this is another story ...

Wednesday, September 30, 2020

For gourmets and practitioners: pick your flavour of the reactive stack with JAX-RS and Apache CXF

When the JAX-RS 2.1 specification was released back in 2017, one of its true novelties was the introduction of the reactive API extensions. The industry had acknowledged the importance of modern programming paradigms, and the specification essentially mandated first-class support for asynchronous and reactive programming in the Client API.

But what about the server side? It was not left out: the JAX-RS 2.1 asynchronous processing model has been enriched with support for Java 8's CompletionStage, certainly a step in the right direction. Any existing REST web API built on top of a JAX-RS 2.1 implementation (like Apache CXF, for example) could benefit from such enhancements right away.

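import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.springframework.stereotype.Service;

// Person is the sample's own model class (not shown here).
@Service
@Path("/people")
public class PeopleRestService {
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public CompletionStage<List<Person>> getPeople() {
        // The response is completed asynchronously, off the request thread.
        return CompletableFuture
            .supplyAsync(() -> Arrays.asList(new Person("a@b.com", "Tom", "Knocker")));
    }
}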
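
To show what a CompletionStage pipeline does independently of JAX-RS, here is a minimal, JDK-only sketch of composing a stage and falling back on failure. The class, the findNames lookup and the -1 fallback value are hypothetical and only serve the illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class CompletionStageSketch {
    // Hypothetical stand-in for a slow lookup; optionally fails to show error handling.
    static CompletionStage<List<String>> findNames(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("lookup failed");
            }
            return List.of("Tom", "Bob");
        });
    }

    // Transforms the result once it is available; falls back to -1 on any failure.
    static int countNames(boolean fail) {
        return findNames(fail)
            .thenApply(List::size)
            .exceptionally(ex -> -1)
            .toCompletableFuture()
            .join(); // blocking here only to keep the example observable
    }

    public static void main(String[] args) {
        System.out.println(countNames(false));
        System.out.println(countNames(true));
    }
}
```

In a JAX-RS resource you would return the stage itself rather than call join(), leaving it to the runtime to resume the response when the stage completes.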

Undoubtedly, CompletionStage and CompletableFuture are powerful tools, but not without their own quirks and limitations. The Reactive Streams specification and a number of its implementations offer a considerably better glimpse of what asynchronous and reactive programming should look like on the JVM. With that, the logical question pops up: could your JAX-RS web services and APIs take advantage of the modern reactive libraries? And if the answer is positive, what does it take?
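
Before looking at the libraries, the Reactive Streams contract itself can be sketched with nothing but the JDK: since JDK 9, the java.util.concurrent.Flow interfaces mirror the specification, and SubmissionPublisher is a simple publisher shipped with the JDK. The sketch below is an illustration only (the FlowSketch and CollectingSubscriber names are hypothetical) and is not how Apache CXF integrates the libraries discussed next.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowSketch {
    // Requests one item at a time: the subscriber, not the publisher, controls the pace.
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal demand for the first item
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next item only when ready
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    static List<String> publishAndCollect(List<String> items) {
        final CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete once the submitted items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("a", "b", "c")));
    }
}
```

Unlike a CompletionStage, which carries a single result, a publisher delivers a stream of items, and the request(n) calls give the consumer a way to apply backpressure.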

If your bets are on Apache CXF, you are certainly well positioned. The latest Apache CXF 3.2.14 / 3.3.7 / 3.4.0 release trains bring comprehensive support for RxJava3, RxJava2 and Project Reactor. In this post we are going to see how easy it is to plug in your favorite reactive library and place it at the forefront of your REST web APIs and services.

Since most applications and services on the JVM are built on top of the excellent Spring framework and Spring Boot, we will be developing the reference implementations using those as a foundation. The Spring Boot starter which comes with the Apache CXF distribution takes care of most of the boring wiring you would otherwise have to do.

<dependency>
	<groupId>org.apache.cxf</groupId>
	<artifactId>cxf-spring-boot-starter-jaxrs</artifactId>
	<version>3.4.0</version>
</dependency>

Project Reactor is the number one choice as the reactive foundation for Spring-based applications and services, so let us start with it.

<dependency>
	<groupId>org.apache.cxf</groupId>
	<artifactId>cxf-rt-rs-extension-reactor</artifactId>
	<version>3.4.0</version>
</dependency>

Great, believe it or not, we are mostly done here. In order to teach Apache CXF to understand Project Reactor types like Mono and/or Flux, we need to tune the configuration just a bit using a ReactorCustomizer instance.

import org.apache.cxf.Bus;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.apache.cxf.jaxrs.reactor.server.ReactorCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;

@Configuration
public class AppConfig {
    @Bean
    public Server rsServer(Bus bus, PeopleRestService service) {
        final JAXRSServerFactoryBean bean = new JAXRSServerFactoryBean();
        bean.getProperties(true).put("useStreamingSubscriber", true);
        bean.setBus(bus);
        bean.setAddress("/");
        bean.setServiceBean(service);
        bean.setProvider(new JacksonJsonProvider());
        new ReactorCustomizer().customize(bean);
        return bean.create();
    }
}

With this customization in place, our JAX-RS web services and APIs can freely use Project Reactor primitives in a streaming fashion, for example.

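import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.springframework.stereotype.Service;

import reactor.core.publisher.Flux;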

@Service
@Path("/people")
public class PeopleRestService {
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public Flux<Person> getPeople() {
        return Flux.just(new Person("a@b.com", "Tom", "Knocker"));
    }
}

As you have probably noticed, the implementation purposely does not do anything complicated. However, once the reactive types are put to work, you can unleash the full power of the library of your choice (and Project Reactor is really good at that).

Now that you understand the principle, it is the turn of RxJava3, the latest generation of the pioneering reactive library for the JVM platform.

<dependency>
	<groupId>org.apache.cxf</groupId>
	<artifactId>cxf-rt-rs-extension-rx3</artifactId>
	<version>3.4.0</version>
</dependency>

The configuration tuning is mostly identical to the one we have seen with Project Reactor; the customizer instance, ReactiveIOCustomizer, is all that changes.

import org.apache.cxf.Bus;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.apache.cxf.jaxrs.rx3.server.ReactiveIOCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;

@Configuration
public class AppConfig {
    @Bean
    public Server rsServer(Bus bus, PeopleRestService service) {
        final JAXRSServerFactoryBean bean = new JAXRSServerFactoryBean();
        bean.getProperties(true).put("useStreamingSubscriber", true);
        bean.setBus(bus);
        bean.setAddress("/");
        bean.setServiceBean(service);
        bean.setProvider(new JacksonJsonProvider());
        new ReactiveIOCustomizer().customize(bean);
        return bean.create();
    }
}

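The list of supported types includes Flowable, Single and Observable; the equivalent implementation in terms of RxJava3 primitives may look like this.

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.springframework.stereotype.Service;

import io.reactivex.rxjava3.core.Flowable;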

@Service
@Path("/people")
public class PeopleRestService {
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public Flowable<Person> getPeople() {
        return Flowable.just(new Person("a@b.com", "Tom", "Knocker"));
    }
}

Pretty simple, isn't it? If you are stuck with an older generation, RxJava2, there is nothing to worry about: Apache CXF has you covered.

<dependency>
	<groupId>org.apache.cxf</groupId>
	<artifactId>cxf-rt-rs-extension-rx2</artifactId>
	<version>3.4.0</version>
</dependency>

The same configuration trick of applying the customizer (which, to be fair, may look annoying at this point) is all that is required.

import org.apache.cxf.Bus;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.apache.cxf.jaxrs.rx2.server.ReactiveIOCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;

@Configuration
public class AppConfig {
    @Bean
    public Server rsServer(Bus bus, PeopleRestService service) {
        final JAXRSServerFactoryBean bean = new JAXRSServerFactoryBean();
        bean.getProperties(true).put("useStreamingSubscriber", true);
        bean.setBus(bus);
        bean.setAddress("/");
        bean.setServiceBean(service);
        bean.setProvider(new JacksonJsonProvider());
        new ReactiveIOCustomizer().customize(bean);
        return bean.create();
    }
}

And we are good to go, ready to use the familiar reactive types Observable, Flowable and Single.

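import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.springframework.stereotype.Service;

import io.reactivex.Flowable;
import io.reactivex.Observable;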

@Service
@Path("/people")
public class PeopleRestService {
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public Observable<Person> getPeople() {
        return Flowable
            .just(new Person("a@b.com", "Tom", "Knocker"))
            .toObservable();
    }
}

Last but not least, if you happen to be using the first generation of RxJava, it is also available with Apache CXF but certainly not recommended for production (as it reached end of life a couple of years ago).

The reactive programming paradigm is steadily gaining traction. It is great to see the ecosystem embrace it, and frameworks like Apache CXF are no exception. If you are looking for a robust foundation to build reactive and/or asynchronous REST web APIs on the JVM, Apache CXF is worth considering, please give it a try!

The complete source code is available on GitHub.

Tuesday, December 13, 2011

Testing highly concurrent code

How often do you face issues with testing highly concurrent code? It is not so easy to write a test which verifies an asynchronous procedure call, or verifies that some task has been executed by a thread pool worker. Fortunately, it is getting much easier with this awesome library: Awaitility.

Let me demonstrate with a few simple but meaningful examples how easy it is to enrich your tests with it. Let's start with a POM file including only the necessary stuff: JUnit and Awaitility.

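<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>awaitility</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.jayway.awaitility</groupId>
            <artifactId>awaitility</artifactId>
            <version>1.3.3</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.8.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>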

Nothing special here. Our class under test will collect notifications for particular users. As creating the notifications could take some time, the implementation will collect them using a thread pool and the asynchronous method invocation pattern: the method returns immediately, delegating execution to the thread pool. A pretty typical design decision. Let's take a look at a sample implementation.

package com.example.awaitility;

import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncQueueService {
    private final ExecutorService executor = Executors.newFixedThreadPool( 3 );
    private final ConcurrentLinkedQueue< Notification > queue = new ConcurrentLinkedQueue< Notification >();

    public static class Notification {
        private final long userId;

        public Notification( final long userId ) {
            this.userId = userId;
        }

        public long getUserId() {
            return userId;
        }
    }

    public void enqueue( final Collection< Long > users ) {
        executor.execute( new Runnable() {
                public void run() {
                    for( final long userId: users ) {
                        // do some work with notifications
                        queue.add( new Notification( userId ) );
                    }
                }
            }
        );
    }

    public void clear() {
        queue.clear();
    }

    public int size() {
        return queue.size();
    }
}

I have omitted a bunch of details to keep the code simple and concentrate on the important part: the enqueue method. As we can see, this method delegates all the work to an internal thread pool. Now, how would we create a test to verify that this method actually works? It is difficult because the method returns immediately and the result of its execution will only be available sometime in the future. Mocking the thread pool (executor service) is not a very good idea, as it relies on the internals of the implementation. What if we decide to move from a thread pool to a scheduled task? The test should work without any change. This is where Awaitility comes to the rescue. Let's take a look at this test case:

package com.example.awaitility;

import static com.jayway.awaitility.Awaitility.await;
import static org.hamcrest.core.IsEqual.equalTo;

import java.util.Arrays;
import java.util.concurrent.Callable;

import org.junit.Before;
import org.junit.Test;

public class AsyncQueueServiceTestCase {
    private AsyncQueueService service;

    @Before
    public void setUp() {
        service = new AsyncQueueService();
    }

    @Test
    public void testEnqueueManyNotifications() throws Exception {
        final Long[] users = new Long[] { 1L, 2L, 3L, 4L, 5L };

        service.enqueue( Arrays.asList( users ) );

        await().until(
            new Callable< Integer >() {
                public Integer call() throws Exception {
                    return service.size();
                }
            },
            equalTo( users.length )
        );
    }
}

As we see, the test method calls enqueue with a list of users. The test verifies that the queue ends up with as many notifications as there were users passed to the enqueue method. With Awaitility such an assertion is trivial: just wait until service.size() is equal to users.length!
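
To make the idea of "just wait" concrete, here is a minimal JDK-only sketch of polling a condition until it holds or a timeout expires. It is an illustration of the general technique, not Awaitility's actual implementation; the PollingSketch class, the 10 ms poll interval and the demo method are all hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class PollingSketch {
    // Polls 'actual' every 10 ms until it equals 'expected' or the timeout elapses.
    // Returns true if the values matched in time, false otherwise.
    static <T> boolean pollUntilEquals(Supplier<T> actual, T expected, long timeoutMillis) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            if (expected.equals(actual.get())) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
    }

    // Fills a counter on a background thread, then waits for the expected value.
    static boolean demo(int increments, int expected, long timeoutMillis) {
        final AtomicInteger counter = new AtomicInteger();
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(() -> {
                for (int i = 0; i < increments; i++) {
                    counter.incrementAndGet();
                }
            });
            final Supplier<Integer> size = counter::get;
            return pollUntilEquals(size, Integer.valueOf(expected), timeoutMillis);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(5, 5, 2000)); // value is reached: true
        System.out.println(demo(5, 6, 100));  // value is never reached: false after the timeout
    }
}
```

The test only observes the public state of the object under test, so it keeps working no matter which executor does the work behind the scenes.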

I have just scratched the surface of Awaitility. It has many features and even specific DSLs for Groovy and Scala. I highly encourage you to take a look at it!