Saturday, December 29, 2012

Implementing Producer / Consumer using SynchronousQueue

Among the many useful classes Java provides for concurrency support, there is one I would like to talk about: SynchronousQueue. In particular, I would like to walk through a Producer / Consumer implementation that uses the handy SynchronousQueue as an exchange mechanism.

It might not be clear why this type of queue suits producer / consumer communication unless we look under the hood of the SynchronousQueue implementation. It turns out that it's not really a queue as we usually think about queues. It has no internal capacity at all: each insert operation waits for a corresponding remove operation by another thread. The closest analogy is a hand-off point with at most one element in transit.

Why is it useful? Well, there are several reasons. From the producer's point of view, only one element (or message) can be handed to the queue at a time. In order to proceed with the next element (or message), the producer has to wait until the consumer takes the current one. From the consumer's point of view, it just polls the queue for the next available element (or message). Quite simple, but the great benefit is: the producer cannot send messages faster than the consumer can process them.
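
To make the hand-off behaviour concrete, here is a minimal sketch using only the JDK. The class and method names (HandoffDemo, run, offerWithoutConsumer) are made up for illustration and are not part of the comparison code in this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class HandoffDemo {
    // Hands three messages from a producer thread to the calling thread
    // and returns them in the order they were received.
    public static List<String> run() throws InterruptedException {
        final SynchronousQueue<String> queue = new SynchronousQueue<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 3; i++) {
                    // put() blocks until the consumer takes the element
                    queue.put("message " + i);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<String> received = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            // poll() waits (up to the timeout) for the producer's next element
            received.add(queue.poll(5, TimeUnit.SECONDS));
        }
        producer.join();
        return received;
    }

    // With no consumer waiting, a non-blocking offer is rejected immediately.
    public static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("nobody is listening");
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(offerWithoutConsumer());
        System.out.println(run());
    }
}
```

The producer can never run ahead: every put() returns only after the matching poll() has taken the element.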

Here is one of the use cases I encountered recently: compare two database tables (possibly huge ones) and detect whether they contain different data or the same data (a copy). SynchronousQueue is quite a handy tool for this problem: it lets us handle each table in its own thread and compensates for possible timeouts / latency while reading from two different databases.

Let's start by defining our compare function, which accepts source and destination data sources as well as the name of the table to compare. I am using the quite useful JdbcTemplate class from the Spring framework, as it abstracts extremely well all the boring details of dealing with connections and prepared statements.

public boolean compare( final DataSource source, final DataSource destination, final String table )  {
    final JdbcTemplate from = new JdbcTemplate( source );
    final JdbcTemplate to = new JdbcTemplate( destination );

Before doing any actual data comparison, it's a good idea to compare the table's row count in the source and destination databases:

if( from.queryForLong("SELECT count(1) FROM " + table ) != to.queryForLong("SELECT count(1) FROM " + table ) ) {
    return false;
}

Now, knowing at least that the table contains the same number of rows in both databases, we can start with the data comparison. The algorithm is very simple:

  • create a separate thread for each database: source (producer) and destination (consumer)
  • the producer thread reads a single row from the table and puts it into the SynchronousQueue
  • the consumer thread also reads a single row from the table, then asks the queue for the available row to compare (waiting if necessary), and lastly compares the two rows
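
The steps above can be sketched without any database at all. The example below is only an illustration under stated assumptions: in-memory lists stand in for the two tables, the callables return booleans instead of throwing, and the names RowCompareSketch and sameRows are hypothetical.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class RowCompareSketch {
    // Returns true when both "tables" hold equal rows in the same order.
    public static boolean sameRows(final List<List<Object>> source,
                                   final List<List<Object>> destination) throws Exception {
        if (source.size() != destination.size()) {
            return false; // cheap row-count check first
        }

        final SynchronousQueue<List<Object>> rows = new SynchronousQueue<>();
        final ExecutorService executor = Executors.newFixedThreadPool(2);

        // Producer: hands over source rows one at a time
        Callable<Boolean> producer = () -> {
            for (List<Object> row : source) {
                if (!rows.offer(row, 1, TimeUnit.SECONDS)) {
                    return false; // consumer is gone
                }
            }
            return true;
        };

        // Consumer: takes each handed-over row and compares it with its own
        Callable<Boolean> consumer = () -> {
            for (List<Object> expected : destination) {
                List<Object> actual = rows.poll(1, TimeUnit.SECONDS);
                if (actual == null || !actual.equals(expected)) {
                    return false;
                }
            }
            return true;
        };

        try {
            Future<Boolean> produced = executor.submit(producer);
            Future<Boolean> consumed = executor.submit(consumer);
            return consumed.get() && produced.get();
        } finally {
            executor.shutdownNow(); // interrupts a producer that is still waiting
        }
    }
}
```

The short timeouts keep the sketch from hanging when one side stops early; the real comparison uses much longer ones because database reads can stall.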

Using another great part of the Java concurrency utilities, thread pools, let's define a pool with a fixed number of threads (2).

final ExecutorService executor = Executors.newFixedThreadPool( 2 );
final SynchronousQueue< List< ? > > resultSets = new SynchronousQueue< List< ? > >();        

Following the described algorithm, the producer functionality could be represented as a single callable:

Callable< Void > producer = new Callable< Void >() {
    public Void call() throws Exception {
        from.query( "SELECT * FROM " + table,
            new RowCallbackHandler() {
                public void processRow(ResultSet rs) throws SQLException {
                    try {
                        List< ? > row = ...; // convert ResultSet to List
                        if( !resultSets.offer( row, 2, TimeUnit.MINUTES ) ) {
                            throw new SQLException( "Having more data but consumer has already completed" );
                        }
                    } catch( InterruptedException ex ) {
                        throw new SQLException( "Having more data but producer has been interrupted" );
                    }
                }
            } );

        return null;
    }
};

The code is a bit verbose due to Java syntax, but it doesn't actually do much. The producer converts every row read from the table to a list (the implementation is omitted as it's boilerplate) and hands it to the queue (offer). If the consumer is not ready to take the row, the producer blocks, waiting up to two minutes for the consumer to finish its work. The consumer, respectively, can be represented as the following callable:

Callable< Void > consumer = new Callable< Void >() {
    public Void call() throws Exception {
        to.query( "SELECT * FROM " + table,
            new RowCallbackHandler() {
                public void processRow(ResultSet rs) throws SQLException {
                    try {
                        List< ? > source = resultSets.poll( 2, TimeUnit.MINUTES );
                        if( source == null ) {
                            throw new SQLException( "Having more data but producer has already completed" );
                        }

                        List< ? > destination = ...; // convert ResultSet to List
                        if( !source.equals( destination ) ) {
                            throw new SQLException( "Row data is not the same" );
                        }
                    } catch ( InterruptedException ex ) {
                        throw new SQLException( "Having more data but consumer has been interrupted" );
                    }
                }
            } );

        return null;
    }
};

The consumer does the reverse operation on the queue: instead of putting data, it pulls it (poll) from the queue. If no row has been offered yet, the consumer blocks, waiting for the producer to publish the next row. The only part left is submitting those callables for execution. Any exception thrown by the Future's get method indicates that the tables don't contain the same data (or that there was an issue getting data from the database):

    List< Future< Void > > futures = executor.invokeAll( Arrays.asList( producer, consumer ) );
    for( final Future< Void > future: futures ) {
        future.get( 5, TimeUnit.MINUTES );
    }

That's basically all for today ... and this year. Happy New Year to everyone!

Friday, November 30, 2012

Using YAML for Java application configuration

YAML is a well-known format within the Ruby community and has been widely used there for a long time. But we as Java developers mostly deal with property files and XML when we need some configuration for our apps. How many times have we had to express complicated configuration by inventing our own XML schema or imposing a property naming convention?

Though JSON is becoming a popular format for web applications, using JSON files to describe the configuration is a bit cumbersome and, in my opinion, is not as expressive as YAML. Let's see what YAML can do for us to make our life easier.

As usual, let's start with the problem. In order for our application to function properly, we need to feed it the following data somehow:

  • version and release date
  • database connection parameters
  • list of supported protocols
  • list of users with their passwords

This list of parameters sounds a bit weird, but the purpose is to demonstrate different data types at work: strings, numbers, dates, lists and maps. The Java model consists of two simple classes: Connection

package com.example.yaml;

public final class Connection {
    private String url;
    private int poolSize;

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public int getPoolSize() {
        return poolSize;
    }

    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    @Override
    public String toString() {
        return String.format( "'%s' with pool of %d", getUrl(), getPoolSize() );
    }
}

and Configuration. Both are typical Java POJOs, verbose because of property setters and getters (we are used to it, right?).

package com.example.yaml;

import static java.lang.String.format;

import java.util.Date;
import java.util.List;
import java.util.Map;

public final class Configuration {
    private Date released;
    private String version;
    private Connection connection;
    private List< String > protocols;
    private Map< String, String > users;

    public Date getReleased() {
        return released;
    }

    public String getVersion() {
        return version;
    }

    public void setReleased(Date released) {
        this.released = released;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public Connection getConnection() {
        return connection;
    }

    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    public List< String > getProtocols() {
        return protocols;
    }

    public void setProtocols(List< String > protocols) {
        this.protocols = protocols;
    }

    public Map< String, String > getUsers() {
        return users;
    }

    public void setUsers(Map< String, String > users) {
        this.users = users;
    }

    @Override
    public String toString() {
        return new StringBuilder()
            .append( format( "Version: %s\n", version ) )
            .append( format( "Released: %s\n", released ) )
            .append( format( "Connecting to database: %s\n", connection ) )
            .append( format( "Supported protocols: %s\n", protocols ) )
            .append( format( "Users: %s\n", users ) )
            .toString();
    }
}

Now that the model is quite clear, let us try to express it the way a human being normally would. Looking back at our list of required configuration, let's write it down item by item.

1. version and release date
version: 1.0
released: 2012-11-30
2. database connection parameters
connection:
    url: jdbc:mysql://localhost:3306/db
    poolSize: 5
3. list of supported protocols
protocols:
   - http
   - https
4. list of users with their passwords
users:
    tom: passwd
    bob: passwd

And that is it: our configuration expressed in YAML syntax is complete! The whole sample.yml file looks like this:

version: 1.0
released: 2012-11-30

# Connection parameters
connection:
    url: jdbc:mysql://localhost:3306/db
    poolSize: 5

# Protocols
protocols:
   - http
   - https

# Users
users:
    tom: passwd
    bob: passwd

To make it work in Java, we just need the awesome library called SnakeYAML; the Maven POM file is quite simple:





Please notice the usage of Java 1.7: the language extensions and additional libraries simplify a lot of regular tasks, as we can see by looking into YamlConfigRunner:

package com.example.yaml;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.yaml.snakeyaml.Yaml;

public class YamlConfigRunner {
    public static void main(String[] args) throws IOException {
        if( args.length != 1 ) {
            System.out.println( "Usage: <file.yml>" );
            return;
        }

        Yaml yaml = new Yaml();
        // try-with-resources (Java 7) closes the stream for us
        try( InputStream in = Files.newInputStream( Paths.get( args[ 0 ] ) ) ) {
            Configuration config = yaml.loadAs( in, Configuration.class );
            System.out.println( config.toString() );
        }
    }
}

The code snippet here loads the configuration from the file (args[ 0 ]), tries to parse it and fills the Configuration class with meaningful data using JavaBeans conventions, converting to the declared types where possible. Running this class with sample.yml as an argument generates the following output:

Version: 1.0
Released: Thu Nov 29 19:00:00 EST 2012
Connecting to database: 'jdbc:mysql://localhost:3306/db' with pool of 5
Supported protocols: [http, https]
Users: {tom=passwd, bob=passwd}

These match the values we have configured! The release date only looks different because it is printed in the local time zone of the JVM (EST here).
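
The release date in the output reads Nov 29 19:00 EST although the file says 2012-11-30. A plausible explanation, assuming the parser treats a date without a time zone as midnight UTC, is a plain time zone shift. The sketch below reproduces that shift with java.time only; the class name ReleaseDateZones and the method render are hypothetical, and no YAML library is involved.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class ReleaseDateZones {
    // Formats a UTC instant as it appears on a wall clock in the given zone.
    public static String render(final String utcInstant, final String zone) {
        ZonedDateTime local = Instant.parse(utcInstant).atZone(ZoneId.of(zone));
        return DateTimeFormatter
            .ofPattern("EEE MMM dd HH:mm:ss yyyy", Locale.ENGLISH)
            .format(local);
    }

    public static void main(String[] args) {
        // Midnight UTC on the configured release date ...
        System.out.println(render("2012-11-30T00:00:00Z", "UTC"));
        // ... is still the evening of the previous day in New York
        System.out.println(render("2012-11-30T00:00:00Z", "America/New_York"));
    }
}
```

If the exact calendar day matters, it may be safer to keep the value as a string in the configuration and parse it explicitly.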

Tuesday, October 30, 2012

Simple but powerful concept: packing your Java application as one (or fat) JAR

Today's post targets a simple but quite powerful concept: packing your application as a single, runnable JAR file, also known as a one or fat JAR.

We are used to large WAR archives which contain all dependencies packed together under some common folder structure. With JAR packaging the story is a bit different: in order to make your application runnable, all dependencies have to be on the classpath. Usually it means there is a lib folder with all dependencies and a launcher script which constructs the classpath and runs the JVM. The Maven Assembly plugin is well known for building this kind of application distribution.

A slightly different approach is to package all your application dependencies into the same JAR file and make it runnable without any additional parameters or scripting. Sounds great but ... it won't work unless you add some magic: meet the One-JAR project.

Let's briefly outline the problem: we are writing a stand-alone Spring application which should be runnable just by typing java -jar <our-app.jar>.

As always, let's start with our POM file, which will be pretty simple:








Our sample application will bootstrap a Spring context, get a bean instance and call a method on it. Our bean is called SimpleBean and looks like this:

package com.example;

public class SimpleBean {
    public void print() {
        System.out.println( "Called from single JAR!" );
    }
}

Having fallen in love with Spring Java configuration, let us define our context as an annotated AppConfig POJO:

package com.example.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.example.SimpleBean;

@Configuration
public class AppConfig {
    @Bean
    public SimpleBean simpleBean() {
        return new SimpleBean();
    }
}

And finally, our application Starter with main():

package com.example;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import com.example.config.AppConfig;

public class Starter {
    public static void main( final String[] args ) {
        ApplicationContext context = new AnnotationConfigApplicationContext( AppConfig.class );
        SimpleBean bean = context.getBean( SimpleBean.class );
        bean.print();
    }
}

Adding our main class to META-INF/MANIFEST.MF lets us leverage Java's ability to run a JAR file without explicitly specifying the class with the main() method. The Maven JAR plugin can help us with that.
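
To show what that manifest entry amounts to, here is a small JDK-only sketch that writes a manifest with a Main-Class attribute and reads it back. The class ManifestSketch and its methods are hypothetical helpers for illustration; in the project itself the manifest is generated by the Maven JAR plugin.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

public class ManifestSketch {
    // Builds a manifest with the given Main-Class and returns its text form.
    public static String write(final String mainClass) throws IOException {
        Manifest manifest = new Manifest();
        Attributes attributes = manifest.getMainAttributes();
        // Manifest-Version has to be present, otherwise nothing is written
        attributes.put(Attributes.Name.MANIFEST_VERSION, "1.0");
        attributes.put(Attributes.Name.MAIN_CLASS, mainClass);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        manifest.write(out);
        return out.toString("UTF-8");
    }

    // Reads the Main-Class attribute back from manifest text.
    public static String mainClassOf(final String manifestText) throws IOException {
        Manifest manifest = new Manifest(
            new ByteArrayInputStream(manifestText.getBytes("UTF-8")));
        return manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
    }

    public static void main(String[] args) throws IOException {
        String text = write("com.example.Starter");
        System.out.print(text);
        System.out.println(mainClassOf(text));
    }
}
```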


Trying to run java -jar spring-one-jar-0.0.1-SNAPSHOT.jar prints an exception to the console: java.lang.NoClassDefFoundError. The reason is pretty straightforward: even an application as simple as this one requires the following libraries on the classpath.


Let's see what One-JAR can do for us here. Thanks to the onejar-maven-plugin, we can simply add it to the plugins section of our POM file.


Also, the pluginRepositories section should contain this repository in order to download the plugin.


As a result, there will be another artifact in the target folder, suffixed with one-jar. Running this one with java -jar prints to the console:

Called from single JAR!

A fully runnable Java application as a single, redistributable JAR file! One last comment: though our application looks pretty simple, One-JAR works perfectly for complex, large applications as well. Please add it to your toolbox; it's a really useful tool to have.

Thanks to One-JAR guys!

Sunday, September 30, 2012

Redis pub/sub using Spring

Continuing to discover the powerful set of Redis features, one worth mentioning is its out-of-the-box support for pub/sub messaging.

Pub/sub messaging is an essential part of many software architectures. Some software systems demand that a messaging solution provide high performance, scalability, queue persistence and durability, fail-over support, transactions, and many more nice-to-have features, which in the Java world almost always leads to using one of the JMS implementation providers. In my previous projects I have actively used Apache ActiveMQ (now moving towards Apache ActiveMQ Apollo). Though it's a great implementation, sometimes I just needed simple queuing support, and Apache ActiveMQ looked overcomplicated for that.

Alternatives? Please welcome Redis pub/sub! If you are already using Redis as a key/value store, a few additional lines of configuration will bring pub/sub messaging to your application in no time.

The Spring Data Redis project abstracts the Redis pub/sub API very well and provides a model familiar to everyone who uses Spring's JMS integration.

As always, let's start with the POM configuration file. It's pretty small and simple, and includes the necessary Spring dependencies, Spring Data Redis and Jedis, a great Java client for Redis.










Moving on to configuring Spring context, let's understand what we need to have in order for a publisher to publish some messages and for a consumer to consume them. Knowing the respective Spring abstractions for JMS will help a lot with that.

  • we need a connection factory -> JedisConnectionFactory
  • we need a template for the publisher to publish messages -> RedisTemplate
  • we need a message listener for the consumer to consume messages -> RedisMessageListenerContainer

Using Spring Java configuration, let's describe our context:

package com.example.redis.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.annotation.EnableScheduling;

import com.example.redis.IRedisPublisher;
import com.example.redis.impl.RedisMessageListener;
import com.example.redis.impl.RedisPublisherImpl;

@Configuration
@EnableScheduling
public class AppConfig {
    @Bean
    JedisConnectionFactory jedisConnectionFactory() {
        return new JedisConnectionFactory();
    }

    @Bean
    RedisTemplate< String, Object > redisTemplate() {
        final RedisTemplate< String, Object > template =  new RedisTemplate< String, Object >();
        template.setConnectionFactory( jedisConnectionFactory() );
        template.setKeySerializer( new StringRedisSerializer() );
        template.setHashValueSerializer( new GenericToStringSerializer< Object >( Object.class ) );
        template.setValueSerializer( new GenericToStringSerializer< Object >( Object.class ) );
        return template;
    }

    @Bean
    MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter( new RedisMessageListener() );
    }

    @Bean
    RedisMessageListenerContainer redisContainer() {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();

        container.setConnectionFactory( jedisConnectionFactory() );
        container.addMessageListener( messageListener(), topic() );

        return container;
    }

    @Bean
    IRedisPublisher redisPublisher() {
        return new RedisPublisherImpl( redisTemplate(), topic() );
    }

    @Bean
    ChannelTopic topic() {
        return new ChannelTopic( "pubsub:queue" );
    }
}

Very easy and straightforward. The @EnableScheduling annotation is not needed for Redis itself; it is required only by our publisher implementation, which publishes a string message every 100 ms.

package com.example.redis.impl;

import java.util.concurrent.atomic.AtomicLong;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.scheduling.annotation.Scheduled;

import com.example.redis.IRedisPublisher;

public class RedisPublisherImpl implements IRedisPublisher {
    private final RedisTemplate< String, Object > template;
    private final ChannelTopic topic;
    private final AtomicLong counter = new AtomicLong( 0 );

    public RedisPublisherImpl( final RedisTemplate< String, Object > template,
            final ChannelTopic topic ) {
        this.template = template;
        this.topic = topic;
    }

    // Runs again 100 ms after the previous invocation has finished
    @Scheduled( fixedDelay = 100 )
    public void publish() {
        template.convertAndSend( topic.getTopic(), "Message " + counter.incrementAndGet() +
            ", " + Thread.currentThread().getName() );
    }
}
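
For readers unfamiliar with fixed-delay scheduling, the following JDK-only sketch mimics what @Scheduled( fixedDelay = 100 ) does for the publisher. It is a stand-in, not Spring code: the class FixedDelaySketch is hypothetical, and messages are collected in a list instead of being sent to Redis.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class FixedDelaySketch {
    // "Publishes" count messages, 100 ms apart, and returns them in order.
    public static List<String> publish(final int count) throws InterruptedException {
        final List<String> published = new CopyOnWriteArrayList<>();
        final AtomicLong counter = new AtomicLong(0);
        final CountDownLatch done = new CountDownLatch(count);
        final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

        // The next run starts 100 ms after the previous one has finished
        scheduler.scheduleWithFixedDelay(() -> {
            if (done.getCount() > 0) {
                published.add("Message " + counter.incrementAndGet());
                done.countDown();
            }
        }, 0, 100, TimeUnit.MILLISECONDS);

        try {
            done.await(5, TimeUnit.SECONDS);
        } finally {
            scheduler.shutdownNow();
        }
        return new ArrayList<>(published);
    }
}
```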

And finally, our message listener implementation (which just prints the message to the console).

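package com.example.redis.impl;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

public class RedisMessageListener implements MessageListener {
    @Override
    public void onMessage( final Message message, final byte[] pattern ) {
        System.out.println( "Message received: " + message.toString() );
    }
}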

Awesome, just two small classes, one configuration to wire things together and we have full pub/sub messaging support in our application! Let's run the application as standalone ...

package com.example.redis;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import com.example.redis.config.AppConfig;

public class RedisPubSubStarter {
    public static void main(String[] args) {
        new AnnotationConfigApplicationContext( AppConfig.class );
    }
}

... and see the following output in the console:

Message received: Message 1, pool-1-thread-1
Message received: Message 2, pool-1-thread-1
Message received: Message 3, pool-1-thread-1
Message received: Message 4, pool-1-thread-1
Message received: Message 5, pool-1-thread-1
Message received: Message 6, pool-1-thread-1
Message received: Message 7, pool-1-thread-1
Message received: Message 8, pool-1-thread-1
Message received: Message 9, pool-1-thread-1
Message received: Message 10, pool-1-thread-1
Message received: Message 11, pool-1-thread-1
Message received: Message 12, pool-1-thread-1
Message received: Message 13, pool-1-thread-1
Message received: Message 14, pool-1-thread-1
Message received: Message 15, pool-1-thread-1
Message received: Message 16, pool-1-thread-1

Great! There is much more you can do with Redis pub/sub; excellent documentation is available on the official Redis web site.

Wednesday, August 29, 2012

BTrace: hidden gem in Java developer toolbox

Today's post is about BTrace, which I consider a hidden gem for Java developers.

BTrace is a safe, dynamic tracing tool for the Java platform. BTrace can be used to dynamically trace a running Java program (similar to DTrace for OpenSolaris applications and OS).

In short, the tool lets you inject tracing points into a running Java application without restarting or reconfiguring it. There are several ways to do that; the one I would like to discuss today uses the JVisualVM tool from the standard JDK bundle.

What is very cool is that BTrace itself uses the Java language to define trace points. The approach looks very familiar if you have ever done aspect-oriented programming (AOP).

So let's get started with a problem: we have an application which uses one of the NoSQL databases (say, MongoDB) and suddenly starts to experience a significant performance slowdown. Developers suspect that the application runs too many queries or updates but cannot say so with confidence. Here BTrace can help.

First things first, let's run JVisualVM and install the BTrace plugin:

JVisualVM should be restarted in order for the plugin to appear. Now, while our application is up and running, let's right-click on it in the JVisualVM applications tree:

The following very intuitive BTrace editor (with simple toolbar) should appear:

This is the place where tracing instrumentation can be defined and dynamically injected into the running application. BTrace has a very rich model for defining what exactly should be traced: methods, constructors, method returns, errors, and so on. It also supports aggregations out of the box, so it is quite easy to collect a bunch of metrics while the application is running. For our problem, we would like to see which methods related to MongoDB are being executed.

As my application uses Spring Data MongoDB, I am interested in which methods of any implementation of the interface are being called by the application and how long every call takes. So I have defined a very simple BTrace script:

import com.sun.btrace.*;
import com.sun.btrace.annotations.*;
import static com.sun.btrace.BTraceUtils.*;

@BTrace
public class TracingScript {
    // Remembers, per thread, which method was entered last
    @TLS private static String method;

    @OnMethod(
        clazz = "", 
        method = "/.*/"
    )
    public static void onMongo( 
            @ProbeClassName String className, 
            @ProbeMethodName String probeMethod, 
            AnyType[] args ) {
        method = strcat( strcat( className, "::" ), probeMethod );
    }

    @OnMethod(
        clazz = "", 
        method = "/.*/", 
        location = @Location( Kind.RETURN ) 
    )
    public static void onMongoReturn( @Duration long duration ) {
         // duration is in nanoseconds: divide by 1,000,000 to get milliseconds
         println( strcat( strcat( strcat( strcat( "Method ", method ), 
            " executed in " ), str( duration / 1000000 ) ), "ms" ) );
    }
}

Let me explain briefly what I am doing here. Basically, I would like to know when any method of any implementation of the traced interface is called (onMongo marks that) and the duration of the call (onMongoReturn marks that in turn). The thread-local variable method holds the fully qualified method name (with the class), while, thanks to a useful predefined BTrace annotation, the duration parameter holds the method execution time (in nanoseconds). Though it's pure Java, BTrace allows only a small subset of Java classes to be used. It's not a problem, as the com.sun.btrace.BTraceUtils class provides a lot of useful methods (for example, strcat) to fill the gaps. Running this script produces the following output:

** Compiling the BTrace script ...
*** Compiled
** Instrumenting 1 classes ...
Method executed in 25ms
Method executed in 3ms
Method executed in 22ms
Method executed in 2ms
Method executed in 19ms
Method$100 executed in 2ms
Method$100 executed in 1ms
Method executed in 3ms
Method executed in 2ms
Method executed in 2ms
Method executed in 1ms
Method executed in 6ms
Method$100 executed in 1ms
Method$100 executed in 0ms
Method executed in 2ms
Method executed in 1ms
Method executed in 2ms
Method executed in 1ms
Method executed in 6ms
Method$100 executed in 1ms
Method$100 executed in 0ms
Method executed in 2ms
Method executed in 1ms

As you can see, the output contains a bunch of inner classes, which could easily be eliminated by providing more precise method name templates (or maybe even by tracing the MongoDB driver instead).
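
A side note on units: since the duration is reported in nanoseconds, turning it into milliseconds means dividing by 1,000,000, while dividing by 1,000 yields microseconds. The plain-Java sketch below spells the conversions out with TimeUnit. It runs outside BTrace (scripts are restricted in which classes they may use), and the class name DurationUnits is hypothetical.

```java
import java.util.concurrent.TimeUnit;

public class DurationUnits {
    // Converts nanoseconds to whole milliseconds (fractions are dropped).
    public static long nanosToMillis(final long nanos) {
        return TimeUnit.NANOSECONDS.toMillis(nanos);
    }

    // Converts nanoseconds to whole microseconds (fractions are dropped).
    public static long nanosToMicros(final long nanos) {
        return TimeUnit.NANOSECONDS.toMicros(nanos);
    }

    public static void main(String[] args) {
        long duration = 25_000_000L; // 25 ms expressed in nanoseconds
        System.out.println(nanosToMillis(duration) + "ms");
        System.out.println(nanosToMicros(duration) + "us");
    }
}
```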

I have just started to discover BTrace, but I definitely see great value in this awesome tool for me as a developer. Thanks to the BTrace guys!

Monday, June 25, 2012

Using Redis with Spring

As NoSQL solutions are getting more and more popular for many kinds of problems, modern projects more often consider using one (or several) NoSQL stores instead of (or side by side with) a traditional RDBMS. I have already covered my experience with MongoDB in several earlier posts. In this post I would like to switch gears a bit towards Redis, an advanced key-value store.

Aside from very rich key-value semantics, Redis also supports pub-sub messaging and transactions. In this post I am just going to scratch the surface and demonstrate how simple it is to integrate Redis into your Spring application.

As always, we will start with the Maven POM file for our project:








Spring Data Redis is another project under the Spring Data umbrella which provides seamless integration of Redis into your application. There are several Redis clients for Java, and I have chosen Jedis as it is stable and recommended by the Redis team at the time of writing this post.

We will start with a simple configuration and introduce the necessary components first. Then, as we move forward, the configuration will be extended a bit to demonstrate pub-sub capabilities. Thanks to Java config support, we will create the configuration class and have all our dependencies strongly typed, with no XML anymore:

package com.example.redis.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class AppConfig {
 @Bean
 JedisConnectionFactory jedisConnectionFactory() {
  return new JedisConnectionFactory();
 }

 @Bean
 RedisTemplate< String, Object > redisTemplate() {
  final RedisTemplate< String, Object > template =  new RedisTemplate< String, Object >();
  template.setConnectionFactory( jedisConnectionFactory() );
  template.setKeySerializer( new StringRedisSerializer() );
  template.setHashValueSerializer( new GenericToStringSerializer< Object >( Object.class ) );
  template.setValueSerializer( new GenericToStringSerializer< Object >( Object.class ) );
  return template;
 }
}

That's basically everything we need, assuming we have a single Redis server up and running on localhost with the default configuration. Let's consider several common use cases: setting a key to some value, storing an object and, finally, a pub-sub implementation. Storing and retrieving a key/value pair is very simple:

@Autowired private RedisTemplate< String, Object > template;

public Object getValue( final String key ) {
    return template.opsForValue().get( key );
}

public void setValue( final String key, final String value ) {
    template.opsForValue().set( key, value );
}

Optionally, the key can be set to expire (yet another useful feature of Redis). For example, let our keys expire in 1 second:

public void setValue( final String key, final String value ) {
    template.opsForValue().set( key, value );
    template.expire( key, 1, TimeUnit.SECONDS );
}

Arbitrary objects can be saved into Redis as hashes (maps). For example, let's save an instance of some class User

public class User {
 private final Long id;
 private String name;
 private String email;

 // Constructor, setters and getters are omitted for simplicity
}

into Redis using the key pattern "user:<id>":

public void setUser( final User user ) {
 final String key = String.format( "user:%s", user.getId() );
 final Map< String, Object > properties = new HashMap< String, Object >();

 properties.put( "id", user.getId() );
 properties.put( "name", user.getName() );
 properties.put( "email", user.getEmail() );

 template.opsForHash().putAll( key, properties);
}

Respectively, the object can easily be retrieved using the id.

public User getUser( final Long id ) {
 final String key = String.format( "user:%s", id );

 final String name = ( String )template.opsForHash().get( key, "name" );
 final String email = ( String )template.opsForHash().get( key, "email" );

 return new User( id, name, email );
}
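
The object-to-hash mapping can be tried out without a Redis server. In the sketch below a plain HashMap stands in for Redis (key -> hash of fields), so it only illustrates the "user:<id>" key pattern and the field mapping. The class UserHashSketch, its nested User with a three-argument constructor, and keyOf are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

public class UserHashSketch {
    // Minimal stand-in for the User class
    public static final class User {
        public final Long id;
        public final String name;
        public final String email;

        public User(final Long id, final String name, final String email) {
            this.id = id;
            this.name = name;
            this.email = email;
        }
    }

    // In-memory replacement for Redis: key -> hash (field -> value)
    private final Map<String, Map<String, Object>> store = new HashMap<>();

    public static String keyOf(final Long id) {
        return String.format("user:%s", id);
    }

    public void setUser(final User user) {
        final Map<String, Object> properties = new HashMap<>();
        properties.put("id", user.id);
        properties.put("name", user.name);
        properties.put("email", user.email);
        store.put(keyOf(user.id), properties);
    }

    // Returns null when no hash is stored under the user's key
    public User getUser(final Long id) {
        final Map<String, Object> hash = store.get(keyOf(id));
        if (hash == null) {
            return null;
        }
        return new User(id, (String) hash.get("name"), (String) hash.get("email"));
    }
}
```
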
There is much, much more that can be done with Redis, and I highly encourage you to take a look at it. It surely is not a silver bullet, but it can solve many challenging problems very easily. Finally, let me show how to use pub-sub messaging with Redis. Let's add a bit more configuration here (as part of the AppConfig class):

@Bean
MessageListenerAdapter messageListener() {
 return new MessageListenerAdapter( new RedisMessageListener() );
}

@Bean
RedisMessageListenerContainer redisContainer() {
 final RedisMessageListenerContainer container = new RedisMessageListenerContainer();

 container.setConnectionFactory( jedisConnectionFactory() );
 container.addMessageListener( messageListener(), new ChannelTopic( "my-queue" ) );

 return container;
}

The style of the message listener definition should look very familiar to Spring users: it is generally the same approach we follow to define JMS message listeners. The missing piece is our RedisMessageListener class definition:

package com.example.redis.impl;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

public class RedisMessageListener implements MessageListener {
 @Override
 public void onMessage(Message message, byte[] paramArrayOfByte) {
  System.out.println( "Received by RedisMessageListener: " + message.toString() );
 }
}

Now that we have our message listener, let's see how we could push some messages into the queue using Redis. As always, it's pretty simple:

@Autowired private RedisTemplate< String, Object > template;

public void publish( final String message ) {
 template.execute(
  new RedisCallback< Long >() {
   @SuppressWarnings( "unchecked" )
   public Long doInRedis( RedisConnection connection ) throws DataAccessException {
    // The channel name must match the topic the listener is subscribed to
    return connection.publish(
     ( ( RedisSerializer< String > )template.getKeySerializer() ).serialize( "my-queue" ),
     ( ( RedisSerializer< Object > )template.getValueSerializer() ).serialize( message ) );
   }
  }
 );
}

That's basically it for a very quick introduction, but it is definitely enough to fall in love with Redis.

Saturday, April 28, 2012

JSON for polymorphic Java object serialization

For a long time now, JSON has been a de facto standard for all kinds of data serialization between client and server. Among others, its strengths are simplicity and human-readability. But with simplicity come some limitations, one of which I would like to talk about today: storing and retrieving polymorphic Java objects.

Let's start with simple problem: a hierarchy of filters. There is one abstract class AbstractFilter and two subclasses, RegexFilter and StringMatchFilter.

package bean.json.examples;

public abstract class AbstractFilter {
    public abstract void filter();
}

Here is the RegexFilter class:

package bean.json.examples;

public class RegexFilter extends AbstractFilter {
    private String pattern;

    // No-argument constructor: Jackson needs it to instantiate the bean on deserialization
    public RegexFilter() {
    }

    public RegexFilter( final String pattern ) {
        this.pattern = pattern;
    }

    public void setPattern( final String pattern ) {
        this.pattern = pattern;
    }

    public String getPattern() {
        return pattern;
    }

    @Override
    public void filter() {
        // Do some work here
    }
}

And here is the StringMatchFilter class:

package bean.json.examples;

public class StringMatchFilter extends AbstractFilter {
    private String[] matches;
    private boolean caseInsensitive;

    public StringMatchFilter() {
    }

    public StringMatchFilter( final String[] matches, final boolean caseInsensitive ) {
        this.matches = matches;
        this.caseInsensitive = caseInsensitive;
    }

    public String[] getMatches() {
        return matches;
    }

    public void setCaseInsensitive( final boolean caseInsensitive ) {
        this.caseInsensitive = caseInsensitive;
    }

    public void setMatches( final String[] matches ) {
        this.matches = matches;
    }

    public boolean isCaseInsensitive() {
        return caseInsensitive;
    }

    @Override
    public void filter() {
        // Do some work here
    }
}

Nothing fancy, pure Java beans. Now what if we need to store a list of AbstractFilter instances as JSON and, more importantly, reconstruct this list back from JSON? The following Filters class demonstrates what I mean:

package bean.json.examples;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;

public class Filters {
    private Collection< AbstractFilter > filters = new ArrayList< AbstractFilter >();

    public Filters() {
    }

    public Filters( final AbstractFilter ... filters ) {
        this.filters.addAll( Arrays.asList( filters ) );
    }

    public Collection< AbstractFilter > getFilters() {
        return filters;
    }

    public void setFilters( final Collection< AbstractFilter > filters ) {
        this.filters = filters;
    }
}

As JSON is a textual, platform-independent format, it doesn't carry any type-specific information by itself. Thanks to the awesome Jackson JSON processor, this information can easily be added. So let's add the Jackson JSON processor to our POM file:





With this step done, we need to tell Jackson that we intend to store the type information together with our objects in JSON, so that it is possible to reconstruct the exact objects from JSON later. A few annotations on AbstractFilter do exactly that.

import org.codehaus.jackson.annotate.JsonSubTypes;
import org.codehaus.jackson.annotate.JsonSubTypes.Type;
import org.codehaus.jackson.annotate.JsonTypeInfo;
import org.codehaus.jackson.annotate.JsonTypeInfo.Id;

@JsonTypeInfo( use = Id.NAME )
@JsonSubTypes( {
        @Type( name = "Regex", value = RegexFilter.class ),
        @Type( name = "StringMatch", value = StringMatchFilter.class )
} )
public abstract class AbstractFilter {
    // ...
}

And ... that's it! The following helper class does the dirty job of serializing filters to a string and deserializing them back from a string using Jackson's ObjectMapper:

package bean.json.examples;

import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;

import org.codehaus.jackson.map.ObjectMapper;

public class FilterSerializer {
    private final ObjectMapper mapper = new ObjectMapper();

    public String serialize( final Filters filters ) {
        final StringWriter writer = new StringWriter();
        try {
            mapper.writeValue( writer, filters );
            return writer.toString();
        } catch( final IOException ex ) {
            throw new RuntimeException( ex.getMessage(), ex );
        } finally {
            try { writer.close(); } catch ( final IOException ex ) { /* Nothing to do here */ }
        }
    }

    public Filters deserialize( final String str ) {
        final StringReader reader = new StringReader( str );
        try {
            return mapper.readValue( reader, Filters.class );
        } catch( final IOException ex ) {
            throw new RuntimeException( ex.getMessage(), ex );
        } finally {
            reader.close();
        }
    }
}

Let's see this in action. The following code example

final String json = new FilterSerializer().serialize(
    new Filters(
        new RegexFilter( "\\d+" ),
        new StringMatchFilter( new String[] { "String1", "String2" }, true )
    )
);

produces the following JSON:
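{ "filters":
    [
        { "@type": "Regex", "pattern": "\\d+" },
        { "@type": "StringMatch", "matches": [ "String1", "String2" ], "caseInsensitive": true }
    ]
}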

As you can see, each entry in the "filters" collection has a property "@type" whose value is the name we specified by annotating the AbstractFilter class. Calling new FilterSerializer().deserialize( json ) produces an equivalent Filters object instance, with each filter restored to its concrete class.
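To make the role of the "@type" property more tangible, here is a minimal, library-free sketch of the idea behind it: a logical type name is mapped to a concrete class, and the tag found in the data decides which class gets instantiated. This is not how Jackson is implemented; the TypeTagDemo class, its simplified Regex / StringMatch filters and the fromProperties method are hypothetical names introduced only for this illustration, and the properties map stands for an already parsed JSON object.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class TypeTagDemo {
    // Hypothetical, simplified counterparts of the filters from this post
    interface Filter {
        String describe();
    }

    static final class Regex implements Filter {
        private final String pattern;

        Regex( final String pattern ) {
            this.pattern = pattern;
        }

        public String describe() {
            return "Regex(" + pattern + ")";
        }
    }

    static final class StringMatch implements Filter {
        private final boolean caseInsensitive;

        StringMatch( final boolean caseInsensitive ) {
            this.caseInsensitive = caseInsensitive;
        }

        public String describe() {
            return "StringMatch(caseInsensitive=" + caseInsensitive + ")";
        }
    }

    // Logical type name -> factory that builds the concrete class from parsed properties
    private static final Map< String, Function< Map< String, Object >, Filter > > SUBTYPES =
        new HashMap< String, Function< Map< String, Object >, Filter > >();

    static {
        SUBTYPES.put( "Regex", props -> new Regex( ( String )props.get( "pattern" ) ) );
        SUBTYPES.put( "StringMatch", props -> new StringMatch( ( Boolean )props.get( "caseInsensitive" ) ) );
    }

    // Picks the concrete class by looking at the "@type" tag of an already parsed JSON object
    static Filter fromProperties( final Map< String, Object > props ) {
        final Object tag = props.get( "@type" );
        final Function< Map< String, Object >, Filter > factory = SUBTYPES.get( tag );
        if( factory == null ) {
            throw new IllegalArgumentException( "Unknown @type: " + tag );
        }
        return factory.apply( props );
    }

    public static void main( final String[] args ) {
        final Map< String, Object > props = new HashMap< String, Object >();
        props.put( "@type", "Regex" );
        props.put( "pattern", "\\d+" );
        System.out.println( fromProperties( props ).describe() ); // prints Regex(\d+)
    }
}
```

Without the tag there would be no way to tell which subclass of the abstract type a given JSON object was created from, which is exactly the gap the annotations fill.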

Sunday, April 8, 2012

Using Delayed queues in practice

Often there are use cases when you have some kind of work or job queue and there is a need to handle each work item or job not immediately but with some delay. For example, a user clicks a button which triggers some work to be done, and one second later realizes he / she was mistaken and the job shouldn't start at all. Or there could be a use case when some work elements in a queue should be removed after some delay (expiration).

There are a lot of implementations out there, but the one I would like to describe uses pure JDK concurrent framework classes: DelayQueue and the Delayed interface.

Let me start with a simple (and empty) interface which defines the work item. I am skipping the implementation details like properties and methods as those are not important.

package com.example.delayed;

public interface WorkItem {
   // Some properties and methods here
}
The next class in our model will represent the postponed work item and implement the Delayed interface. There are just a few basic concepts to take into account: the delay itself and the actual time the respective work item has been submitted. This is how expiration is calculated. So let's do that by introducing the PostponedWorkItem class.
package com.example.delayed;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class PostponedWorkItem implements Delayed {
    private final long origin;
    private final long delay;
    private final WorkItem workItem;

    public PostponedWorkItem( final WorkItem workItem, final long delay ) {
        this.origin = System.currentTimeMillis();
        this.workItem = workItem;
        this.delay = delay;
    }

    public WorkItem getWorkItem() {
        return workItem;
    }

    @Override
    public long getDelay( TimeUnit unit ) {
        // Remaining time: configured delay minus the time elapsed since submission
        return unit.convert( delay - ( System.currentTimeMillis() - origin ),
                TimeUnit.MILLISECONDS );
    }

    @Override
    public int compareTo( Delayed delayed ) {
        if( delayed == this ) {
            return 0;
        }

        if( delayed instanceof PostponedWorkItem ) {
            // Order by absolute expiration time so the ordering stays consistent with getDelay
            final PostponedWorkItem other = ( PostponedWorkItem )delayed;
            long diff = ( origin + delay ) - ( other.origin + other.delay );
            return ( ( diff == 0 ) ? 0 : ( ( diff < 0 ) ? -1 : 1 ) );
        }

        long d = ( getDelay( TimeUnit.MILLISECONDS ) - delayed.getDelay( TimeUnit.MILLISECONDS ) );
        return ( ( d == 0 ) ? 0 : ( ( d < 0 ) ? -1 : 1 ) );
    }
}
As you can see, we create a new instance of the class and save the current system time in the internal origin property. The getDelay method calculates the actual time left before the work item expires. The delay is an external setting which comes as a constructor parameter. The compareTo implementation is mandatory because Delayed extends Comparable<Delayed>; it orders items by expiration time, which is what DelayQueue relies on to keep the item that expires first at its head.

Now, we are mostly done! To complete the example, let's make sure that the same work item won't be submitted twice to the work queue by implementing equals and hashCode (the implementation is pretty trivial and should not require any comments).

public class PostponedWorkItem implements Delayed {
    // ...

    @Override
    public int hashCode() {
        final int prime = 31;

        int result = 1;
        result = prime * result + ( ( workItem == null ) ? 0 : workItem.hashCode() );

        return result;
    }

    @Override
    public boolean equals( Object obj ) {
        if( this == obj ) {
            return true;
        }

        if( obj == null ) {
            return false;
        }

        if( !( obj instanceof PostponedWorkItem ) ) {
            return false;
        }

        final PostponedWorkItem other = ( PostponedWorkItem )obj;
        if( workItem == null ) {
            if( other.workItem != null ) {
                return false;
            }
        } else if( !workItem.equals( other.workItem ) ) {
            return false;
        }

        return true;
    }
}
The last step is to introduce some kind of manager which will schedule work items and periodically poll out the expired ones: meet the WorkItemScheduler class.
package com.example.delayed;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;

public class WorkItemScheduler {
    private final long delay = 2000; // 2 seconds

    private final BlockingQueue< PostponedWorkItem > delayed =
            new DelayQueue< PostponedWorkItem >();

    public void addWorkItem( final WorkItem workItem ) {
        final PostponedWorkItem postponed = new PostponedWorkItem( workItem, delay );
        if( !delayed.contains( postponed )) {
            delayed.offer( postponed );
        }
    }

    public void process() {
        // For a DelayQueue, drainTo removes only the items whose delay has already expired
        final Collection< PostponedWorkItem > expired = new ArrayList< PostponedWorkItem >();
        delayed.drainTo( expired );

        for( final PostponedWorkItem postponed: expired ) {
            // Do some real work here with postponed.getWorkItem()
        }
    }
}
DelayQueue is a thread-safe BlockingQueue implementation, so work items can be added and drained from different threads with a high level of concurrency. Note, however, that the contains / offer pair in addWorkItem is not atomic as a whole, so two threads adding the same work item at the same moment could both succeed. The process method should be run periodically in order to drain the work items queue. It could be annotated with the @Scheduled annotation from Spring Framework or with EJB's @Schedule annotation from JEE 6.
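The key property the scheduler relies on is that draining a DelayQueue returns only the elements whose delay has already elapsed. Here is a small self-contained sketch of that behavior. The DelayQueueDemo class, its Job element (a hypothetical stand-in for PostponedWorkItem) and the drainExpired helper are names made up for this illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {
    // Hypothetical stand-in for PostponedWorkItem: a named job with a fixed expiration time
    static final class Job implements Delayed {
        final String name;
        final long expiresAt; // absolute expiration time, in milliseconds

        Job( final String name, final long delayMillis ) {
            this.name = name;
            this.expiresAt = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay( final TimeUnit unit ) {
            return unit.convert( expiresAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS );
        }

        @Override
        public int compareTo( final Delayed other ) {
            if( other instanceof Job ) {
                return Long.compare( expiresAt, ( ( Job )other ).expiresAt );
            }
            return Long.compare( getDelay( TimeUnit.MILLISECONDS ), other.getDelay( TimeUnit.MILLISECONDS ) );
        }
    }

    // Drains the queue and returns the names of the jobs whose delay has elapsed
    static List< String > drainExpired( final DelayQueue< Job > queue ) {
        final List< Job > expired = new ArrayList< Job >();
        queue.drainTo( expired );

        final List< String > names = new ArrayList< String >();
        for( final Job job: expired ) {
            names.add( job.name );
        }
        return names;
    }

    public static void main( final String[] args ) throws InterruptedException {
        final DelayQueue< Job > queue = new DelayQueue< Job >();
        queue.offer( new Job( "slow", 60000 ) );
        queue.offer( new Job( "fast", 300 ) );

        Thread.sleep( 600 );

        // Only the job whose delay has elapsed is drained, the other one stays queued
        System.out.println( drainExpired( queue ) + ", still queued: " + queue.size() );
    }
}
```

Because unexpired elements simply stay in the queue, the periodic process method never needs to check timestamps itself.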


Tuesday, February 28, 2012

Simple but powerful DSL using Groovy

In one of my projects we had a very complicated domain model, which included more than a hundred different domain object types. It was a pure Java project and, honestly, Java is very verbose with respect to object instantiation, initialization and setting properties. Suddenly, a new requirement came up: allow users to define and use their own object models. So ... the journey began.

We ended up with the idea that some kind of domain language for describing all those object types and relations was required. Here Groovy came to the rescue. In this post I would like to demonstrate how powerful and expressive a simple DSL written using Groovy builders can be.

As always, let's start with POM file for our sample project:








I will use the latest Groovy version, 1.8.4. Our domain model will include three classes: Organization, User and Group. Each Organization has a mandatory name, some users and some groups. Each group can have some users as members. Pretty simple, so here are our Java classes.

Organization.java

package com.example;

import java.util.ArrayList;
import java.util.Collection;

public class Organization {
 private String name;
 private Collection< User > users = new ArrayList< User >();
 private Collection< Group > groups = new ArrayList< Group >();

 public String getName() {
  return name;
 }

 public void setName( final String name ) {
  this.name = name;
 }

 public Collection< Group > getGroups() {
  return groups;
 }

 public void setGroups( final Collection< Group > groups ) {
  this.groups = groups;
 }

 public Collection< User > getUsers() {
  return users;
 }

 public void setUsers( final Collection< User > users ) {
  this.users = users;
 }
}

User.java

package com.example;

public class User {
 private String name;

 public String getName() {
  return name;
 }

 public void setName( final String name ) {
  this.name = name;
 }
}

Group.java

package com.example;

import java.util.ArrayList;
import java.util.Collection;

public class Group {
 private String name;
 private Collection< User > users = new ArrayList< User >();

 public void setName( final String name ) {
  this.name = name;
 }

 public String getName() {
  return name;
 }

 public Collection< User > getUsers() {
  return users;
 }

 public void setUsers( final Collection< User > users ) {
  this.users = users;
 }
}
Now we have our domain model. Let's think about the way a regular user can describe their own organization with users, groups and the relations between all these objects. Primarily, we are talking about some kind of human-readable language simple enough for a regular user to understand. Meet Groovy builders.
package com.example.dsl.samples

class SampleOrganization {
 def build() {
  def builder = new ObjectGraphBuilder(
   classLoader: SampleOrganization.class.classLoader,
   classNameResolver: "com.example"
  )

  return builder.organization(
   name: "Sample Organization"
  ) {
   users = [
    user(
     id: "john",
     name: "John"
    ),
    user(
     id: "samanta",
     name: "Samanta"
    ),
    user(
     id: "tom",
     name: "Tom"
    )
   ]

   groups = [
    group(
     id: "administrators",
     name: "administrators",
     users: [ john, tom ]
    ),
    group(
     id: "managers",
     name: "managers",
     users: [ samanta ]
    )
   ]
  }
 }
}
And here is a small test case which verifies that our domain model is as expected:
package com.example.dsl

import static org.junit.Assert.assertEquals
import static org.junit.Assert.assertNotNull

import org.junit.Test

import com.example.dsl.samples.SampleOrganization

class BuilderTestCase {
 @Test
 void 'build organization and verify users, groups' () {
  def organization = new SampleOrganization().build()

  assertEquals 3, organization.users.size()
  assertEquals 2, organization.groups.size()
  assertEquals "Sample Organization", organization.name
 }
}
I am using this simple DSL again and again across many projects. It really simplifies the creation of complex object models.

Wednesday, January 25, 2012

Storing hierarchical data in MongoDB

Continuing the NoSQL journey with MongoDB, I would like to touch on one specific use case which comes up very often: storing hierarchical document relations. MongoDB is an awesome document data store, but what if documents have parent-child relationships? Can we effectively store and query such document hierarchies? The answer, for sure, is yes, we can. The MongoDB documentation has several recommendations on how to store trees. One solution described there, and quite widely used, is the materialized path.

Let me explain how it works by providing very simple examples. As in previous posts, we will build a Spring application using the recently released version 1.0 of the Spring Data MongoDB project. Our POM file contains very basic dependencies, nothing more.












To properly configure the Spring context, I will use the configuration approach based on Java classes. I am advocating this style more and more, as it provides strongly typed configuration and most mistakes can be caught at compilation time, with no need to inspect your XML files anymore. Here is how it looks:

package com.example.mongodb.hierarchical;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoFactoryBean;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoDbFactory;

@Configuration
public class AppConfig {
    @Bean
    public MongoFactoryBean mongo() {
        final MongoFactoryBean factory = new MongoFactoryBean();
        factory.setHost( "localhost" );
        return factory;
    }

    @Bean
    public SimpleMongoDbFactory mongoDbFactory() throws Exception{
        return new SimpleMongoDbFactory( mongo().getObject(), "hierarchical" );
    }

    @Bean
    public MongoTemplate mongoTemplate() throws Exception {
        return new MongoTemplate( mongoDbFactory() );
    }

    @Bean
    public IDocumentHierarchyService documentHierarchyService() throws Exception {
        return new DocumentHierarchyService( mongoTemplate() );
    }
}

That's pretty nice and clear. Thanks, Spring guys! Now all the boilerplate stuff is ready. Let's move to the interesting part: documents. Our database will contain a 'documents' collection which stores documents of type SimpleDocument. We describe this using Spring Data MongoDB annotations on the SimpleDocument POJO.

package com.example.mongodb.hierarchical;

import java.util.Collection;
import java.util.HashSet;

import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;

@Document( collection = "documents" )
public class SimpleDocument {
    public static final String PATH_SEPARATOR = ".";

    @Id private String id;
    @Field private String name;
    @Field private String path;

    // We won't store this collection as part of document but will build it on demand
    @Transient private Collection< SimpleDocument > documents = new HashSet< SimpleDocument >();

    public SimpleDocument() {
    }

    public SimpleDocument( final String id, final String name ) {
        this.id = id;
        this.name = name;
        this.path = id;
    }

    public SimpleDocument( final String id, final String name, final SimpleDocument parent ) {
        this( id, name );
        this.path = parent.getPath() + PATH_SEPARATOR + id;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPath() {
        return path;
    }

    public void setPath(String path) {
        this.path = path;
    }

    public Collection< SimpleDocument > getDocuments() {
        return documents;
    }
}

Let me explain a few things here. First, the magic property path: this is the key to constructing and querying our hierarchy. The path contains the identifiers of all the document's parents followed by its own identifier, divided by some kind of separator, in our case just . (dot). Storing hierarchical relationships this way allows us to quickly build, search and navigate the hierarchy. Second, notice the transient documents collection: this non-persistent collection is constructed on demand by our service and contains the document's children (which, in turn, contain their own children). Let's see it in action by looking into the find method implementation:

package com.example.mongodb.hierarchical;

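import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;

public class DocumentHierarchyService implements IDocumentHierarchyService {
    private MongoOperations template;

    public DocumentHierarchyService( final MongoOperations template ) {
        this.template = template;
    }

    public SimpleDocument find( final String id ) {
        // First query: the requested document itself
        final SimpleDocument document = template.findOne(
            Query.query( new Criteria( "id" ).is( id ) ),
            SimpleDocument.class
        );

        if( document == null ) {
            return document;
        }

        // Second query: all descendants, i.e. documents whose path starts with "<document path>."
        // (the document's own path is used so that non-root documents work as well)
        return build(
            document,
            template.find(
                Query.query( new Criteria( "path" ).regex( "^" + Pattern.quote( document.getPath() ) + "[.]" ) ),
                SimpleDocument.class
            )
        );
    }

    private SimpleDocument build( final SimpleDocument root, final Collection< SimpleDocument > documents ) {
        final Map< String, SimpleDocument > map = new HashMap< String, SimpleDocument >();

        // Index all descendants by path so that parents can be looked up directly
        for( final SimpleDocument document: documents ) {
            map.put( document.getPath(), document );
        }

        for( final SimpleDocument document: documents ) {
            // The parent path is everything before the last separator
            final String path = document.getPath()
                .substring( 0, document.getPath().lastIndexOf( SimpleDocument.PATH_SEPARATOR ) );

            if( path.equals( root.getPath() ) ) {
                root.getDocuments().add( document );
            } else {
                final SimpleDocument parent = map.get( path );
                if( parent != null ) {
                    parent.getDocuments().add( document );
                }
            }
        }

        return root;
    }
}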

As you can see, to get a single document with its whole hierarchy we need to run just two queries (a more optimal algorithm could reduce it to just one). Here is a sample hierarchy and the result of reading the root document from MongoDB:

template.dropCollection( SimpleDocument.class );

final SimpleDocument parent = new SimpleDocument( "1", "Parent 1" );
final SimpleDocument child1 = new SimpleDocument( "2", "Child 1.1", parent );
final SimpleDocument child11 = new SimpleDocument( "3", "Child 1.1.1", child1 );
final SimpleDocument child12 = new SimpleDocument( "4", "Child 1.1.2", child1 );
final SimpleDocument child121 = new SimpleDocument( "5", "Child", child12 );
final SimpleDocument child13 = new SimpleDocument( "6", "Child 1.1.3", child1 );
final SimpleDocument child2 = new SimpleDocument( "7", "Child 1.2", parent );

template.insertAll( Arrays.asList( parent, child1, child11, child12, child121, child13, child2 ) );


final ApplicationContext context = new AnnotationConfigApplicationContext( AppConfig.class );
final IDocumentHierarchyService service = context.getBean( IDocumentHierarchyService.class );

final SimpleDocument document = service.find( "1" );
//  Printing the document shows the following hierarchy:
//  Parent 1
//   |-- Child 1.1
//     |-- Child 1.1.1
//     |-- Child 1.1.3
//     |-- Child 1.1.2
//       |-- Child
//   |-- Child 1.2

That's it. A simple and powerful concept. Adding an index on the path property will certainly speed up the query significantly. There are plenty of possible improvements and optimizations, but the basic idea should be clear now.
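To see why the materialized path makes hierarchy queries so cheap, here is a minimal in-memory sketch that works on plain path strings, without MongoDB. The MaterializedPathDemo class and its parentPath / descendants helpers are hypothetical names used only for this illustration; the paths are the ones produced by the sample hierarchy above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class MaterializedPathDemo {
    static final String SEPARATOR = ".";

    // Path of the direct parent, or null for a root document
    static String parentPath( final String path ) {
        final int index = path.lastIndexOf( SEPARATOR );
        return ( index < 0 ) ? null : path.substring( 0, index );
    }

    // All paths located below rootPath: the in-memory equivalent of a "starts with path + dot" query
    static List< String > descendants( final String rootPath, final Collection< String > paths ) {
        final List< String > result = new ArrayList< String >();
        for( final String path: paths ) {
            if( path.startsWith( rootPath + SEPARATOR ) ) {
                result.add( path );
            }
        }
        return result;
    }

    public static void main( final String[] args ) {
        // Paths of the sample hierarchy used in this post
        final List< String > paths = Arrays.asList( "1", "1.2", "1.2.3", "1.2.4", "1.2.4.5", "1.2.6", "1.7" );

        System.out.println( descendants( "1.2", paths ) ); // prints [1.2.3, 1.2.4, 1.2.4.5, 1.2.6]
        System.out.println( parentPath( "1.2.4.5" ) );     // prints 1.2.4
    }
}
```

Finding a whole subtree is a single prefix match, and finding a parent is a single string operation, so no recursive lookups are needed.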