Tuesday, December 13, 2011

Testing highly concurrent code

How often do you run into problems testing highly concurrent code? It's not easy to write a test which verifies an asynchronous procedure call, or verifies that some task has been executed by a thread pool worker. Fortunately, it's getting much easier with this awesome library - Awaitility.

Let me demonstrate with a few simple but meaningful examples how easy it is to enrich your tests with it. Let's start with a POM file that includes only the necessary dependencies: JUnit and Awaitility.






Nothing special here. Our class under test will collect notifications for particular users. As creating the notifications could take some time, the implementation collects them using a thread pool and the asynchronous method invocation pattern: the method returns immediately, delegating execution to the thread pool. A pretty typical design decision. Let's take a look at a sample implementation.
package com.example.awaitility;

import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncQueueService {
    private final ExecutorService executor = Executors.newFixedThreadPool( 3 );
    private final ConcurrentLinkedQueue< Notification > queue = new ConcurrentLinkedQueue< Notification >();

    public static class Notification {
        private final long userId;

        public Notification( final long userId ) {
            this.userId = userId;
        }

        public long getUserId() {
            return userId;
        }
    }

    // Returns immediately; the notifications are created on a pool thread
    public void enqueue( final Collection< Long > users ) {
        executor.execute( new Runnable() {
                public void run() {
                    for( final long userId: users ) {
                        // do some work with notifications
                        queue.add( new Notification( userId ) );
                    }
                }
            } );
    }

    public void clear() {
        // drop all collected notifications
        queue.clear();
    }

    public int size() {
        return queue.size();
    }
}
I have omitted a bunch of details to keep the code simple and concentrate on the important part: the enqueue method. As we can see, this method delegates all the work to an internal thread pool. Now, how would we write a test to verify that this method actually works? It's difficult because the method returns immediately, and the result of its execution becomes available only sometime in the future. Mocking the thread pool (executor service) is not a very good idea, as it ties the test to the internals of the implementation. What if we decide to move from a thread pool to a scheduled task? The test should still work without any change. This is where Awaitility comes to the rescue. Let's take a look at this test case:
package com.example.awaitility;

import static com.jayway.awaitility.Awaitility.await;
import static org.hamcrest.core.IsEqual.equalTo;

import java.util.Arrays;
import java.util.concurrent.Callable;

import org.junit.Before;
import org.junit.Test;

public class AsyncQueueServiceTestCase {
    private AsyncQueueService service;

    @Before
    public void setUp() {
        service = new AsyncQueueService();
    }

    @Test
    public void testEnqueueManyNotifications() throws Exception {
        final Long[] users = new Long[] { 1L, 2L, 3L, 4L, 5L };

        service.enqueue( Arrays.asList( users ) );

        // Poll service.size() until it matches the number of users,
        // failing the test if Awaitility's timeout expires first
        await().until(
            new Callable< Integer >() {
                public Integer call() throws Exception {
                    return service.size();
                }
            },
            equalTo( users.length )
        );
    }
}
As we can see, the test method calls enqueue with a list of users and verifies that the queue ends up holding as many notifications as there were users passed to enqueue. With Awaitility such an assertion is trivial: just wait until service.size() equals users.length!
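
To make the "wait until" idea more concrete, here is a minimal plain-Java sketch of condition polling with a timeout. It is only an illustration of the general technique, not Awaitility's actual implementation: the class PollingSketch, the helper waitUntil, and the timeout and poll interval values are all hypothetical, and the sketch uses Java 8 lambdas for brevity.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

class PollingSketch {
    // Hypothetical helper: re-checks the condition every pollMs milliseconds.
    // Returns true once the condition holds, false on timeout or interruption.
    static boolean waitUntil( final BooleanSupplier condition, final long timeoutMs, final long pollMs ) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos( timeoutMs );
        while( !condition.getAsBoolean() ) {
            if( System.nanoTime() - deadline >= 0 ) {
                return false; // timed out before the condition became true
            }
            try {
                Thread.sleep( pollMs );
            } catch( final InterruptedException ex ) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }

    // Fills a queue from a worker thread, then waits for all five items to arrive.
    // Returns the queue size after waiting, or -1 if the wait timed out.
    static int demo() {
        final Queue< Long > queue = new ConcurrentLinkedQueue< Long >();
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute( () -> {
                for( long userId = 1; userId <= 5; ++userId ) {
                    queue.add( userId );
                }
            } );
            return waitUntil( () -> queue.size() == 5, 2000, 10 ) ? queue.size() : -1;
        } finally {
            executor.shutdown();
        }
    }

    public static void main( final String[] args ) {
        System.out.println( "Items in queue after waiting: " + demo() );
    }
}
```

The test never needs to know which thread does the work or how it is scheduled; it only observes the result. That is the same property that lets the Awaitility-based test survive a change from a thread pool to a scheduled task.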

I have just scratched the surface of Awaitility. It has many features and even specific DSLs for Groovy and Scala. I highly encourage you to take a look at it!