Saturday, April 28, 2012

JSON for polymorphic Java object serialization

For a long time now, JSON has been the de facto standard for all kinds of data serialization between client and server. Among other things, its strengths are simplicity and human-readability. But with simplicity come some limitations, and I would like to talk about one of them today: storing and retrieving polymorphic Java objects.

Let's start with a simple problem: a hierarchy of filters. There is one abstract class, AbstractFilter, and two subclasses, RegexFilter and StringMatchFilter.

package bean.json.examples;

public abstract class AbstractFilter {
    public abstract void filter();
}

Here is the RegexFilter class:

package bean.json.examples;

public class RegexFilter extends AbstractFilter {
    private String pattern;

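    public RegexFilter() {
        // No-argument constructor: Jackson needs it to instantiate the class during deserialization
    }

    public RegexFilter( final String pattern ) {
        this.pattern = pattern;
    }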

    public void setPattern( final String pattern ) {
        this.pattern = pattern;
    }

    public String getPattern() {
        return pattern;
    }

    @Override
    public void filter() {
        // Do some work here
    }
}

And here is the StringMatchFilter class:

package bean.json.examples;

public class StringMatchFilter extends AbstractFilter {
    private String[] matches;
    private boolean caseInsensitive;

    public StringMatchFilter() {
    }

    public StringMatchFilter( final String[] matches, final boolean caseInsensitive ) {
        this.matches = matches;
        this.caseInsensitive = caseInsensitive;
    }

    public String[] getMatches() {
        return matches;
    }

    public void setCaseInsensitive( final boolean caseInsensitive ) {
        this.caseInsensitive = caseInsensitive;
    }

    public void setMatches( final String[] matches ) {
        this.matches = matches;
    }

    public boolean isCaseInsensitive() {
        return caseInsensitive;
    }

    @Override
    public void filter() {
        // Do some work here
    }
}

Nothing fancy, just plain Java beans. Now what if we need to store a list of AbstractFilter instances in JSON and, more importantly, reconstruct this list from JSON? The following class, Filters, demonstrates what I mean:

package bean.json.examples;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;

public class Filters {
    private Collection< AbstractFilter > filters = new ArrayList< AbstractFilter >();

    public Filters() {
    }

    public Filters( final AbstractFilter ... filters ) {
        this.filters.addAll( Arrays.asList( filters ) );
    }

    public Collection< AbstractFilter > getFilters() {
        return filters;
    }

    public void setFilters( final Collection< AbstractFilter > filters ) {
        this.filters = filters;
    }
}

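Because JSON is a textual, platform-independent format, it doesn't carry any Java type information. A serialized RegexFilter is just an object with a "pattern" property, and when the filters collection is read back, the JSON library only knows that each element should be an AbstractFilter, which is abstract and cannot be instantiated. Thanks to the awesome Jackson JSON processor, this can be solved easily. So let's add Jackson to our POM file: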


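<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>bean.json</groupId>
    <artifactId>examples</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.6</version>
        </dependency>
    </dependencies>
</project>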

With this step done, we need to tell Jackson that we intend to store type information together with our objects, so that the exact objects can be reconstructed from JSON later. A few annotations on AbstractFilter do exactly that: @JsonTypeInfo( use = Id.NAME ) asks Jackson to write a logical type name for every instance, and @JsonSubTypes maps each name to its concrete subclass.

import org.codehaus.jackson.annotate.JsonSubTypes;
import org.codehaus.jackson.annotate.JsonSubTypes.Type;
import org.codehaus.jackson.annotate.JsonTypeInfo;
import org.codehaus.jackson.annotate.JsonTypeInfo.Id;

@JsonTypeInfo( use = Id.NAME )
@JsonSubTypes(
    {
        @Type( name = "Regex", value = RegexFilter.class ),
        @Type( name = "StringMatch", value = StringMatchFilter.class )
    }
)
public abstract class AbstractFilter {
    // ...
}

And... that's it! The following helper class does the dirty work of serializing filters to a string and deserializing them back using Jackson's ObjectMapper:

package bean.json.examples;

import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;

import org.codehaus.jackson.map.ObjectMapper;

public class FilterSerializer {
    private final ObjectMapper mapper = new ObjectMapper();

    public String serialize( final Filters filters ) {
        final StringWriter writer = new StringWriter();
        try {
            mapper.writeValue( writer, filters );
            return writer.toString();
        } catch( final IOException ex ) {
            throw new RuntimeException( ex.getMessage(), ex );
        } finally {
            try { writer.close(); } catch ( final IOException ex ) { /* Nothing to do here */ }
        }
    }

    public Filters deserialize( final String str ) {
        final StringReader reader = new StringReader( str );
        try {
            return mapper.readValue( reader, Filters.class );
        } catch( final IOException ex ) {
            throw new RuntimeException( ex.getMessage(), ex );
        } finally {
            reader.close();
        }
    }
}

Let's see this in action. The following code

final String json = new FilterSerializer().serialize(
    new Filters(
        new RegexFilter( "\\d+" ),
        new StringMatchFilter( new String[] { "String1", "String2" }, true )
    )
);

produces the following JSON (formatted here for readability):

{ "filters":
  [
     {"@type":"Regex","pattern":"\\d+"},
     {"@type":"StringMatch","matches":["String1","String2"],"caseInsensitive":true}
  ]
}

As you can see, each entry in the "filters" collection has an "@type" property whose value is the name we specified when annotating the AbstractFilter class. Calling new FilterSerializer().deserialize( json ) reconstructs an equivalent Filters instance: a RegexFilter followed by a StringMatchFilter, with the same property values.
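Conceptually, @JsonSubTypes gives Jackson a two-way table between logical type names and concrete classes: on writing it looks up the name to emit as "@type", and on reading it resolves "@type" back to a class, creates an instance and populates it. The JDK-only sketch below illustrates that idea. TypeRegistry and the BaseFilter, PatternFilter and MatchFilter classes are hypothetical stand-ins, not Jackson's internals; the sketch instantiates through a public no-argument constructor, which mirrors Jackson's default behaviour when no @JsonCreator is declared.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for AbstractFilter and its subclasses.
abstract class BaseFilter {}

class PatternFilter extends BaseFilter {
    public PatternFilter() {} // created reflectively, so a no-arg constructor is required
}

class MatchFilter extends BaseFilter {
    public MatchFilter() {}
}

// Hypothetical registry mirroring what @JsonSubTypes declares: a two-way
// mapping between logical type names and concrete subclasses.
class TypeRegistry<T> {
    private final Map<String, Class<? extends T>> classesByName =
            new HashMap<String, Class<? extends T>>();
    private final Map<Class<?>, String> namesByClass = new HashMap<Class<?>, String>();

    TypeRegistry<T> register(final String name, final Class<? extends T> type) {
        classesByName.put(name, type);
        namesByClass.put(type, name);
        return this;
    }

    // Serialization side: the value to write into the "@type" property.
    String typeIdOf(final T value) {
        final String name = namesByClass.get(value.getClass());
        if (name == null) {
            throw new IllegalArgumentException("Unregistered subtype: " + value.getClass().getName());
        }
        return name;
    }

    // Deserialization side: resolve the "@type" value and create an empty instance,
    // which a JSON library would then populate from the remaining properties.
    T newInstance(final String typeId) {
        final Class<? extends T> type = classesByName.get(typeId);
        if (type == null) {
            throw new IllegalArgumentException("Unknown type id: " + typeId);
        }
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch (final ReflectiveOperationException ex) {
            throw new IllegalStateException("Cannot instantiate " + type.getName(), ex);
        }
    }
}
```

For the filters in this post, the equivalent registrations would be "Regex" for RegexFilter and "StringMatch" for StringMatchFilter, which is exactly the information the @JsonSubTypes annotation carries.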

Sunday, April 8, 2012

Using Delayed queues in practice

Often there are use cases where you have some kind of work or job queue, and each work item or job should be handled not immediately but after some delay. For example, a user clicks a button that triggers some work, and one second later realizes he or she made a mistake and the job shouldn't start at all. Or there could be a use case where some work items in a queue should be removed after some delay (expiration).

There are a lot of implementations out there, but the one I would like to describe uses pure JDK concurrency classes: DelayQueue and the Delayed interface.
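Before building the real classes, it may help to see how DelayQueue behaves on its own. The sketch below uses a hypothetical DelayedTask element with an absolute deadline on the System.nanoTime() clock; it shows that poll() returns null while nothing has expired, that take() blocks until the head expires, and that elements leave in expiry order rather than insertion order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type: expires at a fixed point on the System.nanoTime() clock.
class DelayedTask implements Delayed {
    final String name;
    private final long deadline;

    DelayedTask(final String name, final long delay, final TimeUnit unit) {
        this.name = name;
        this.deadline = System.nanoTime() + unit.toNanos(delay);
    }

    @Override
    public long getDelay(final TimeUnit unit) {
        // Time left until expiry; zero or negative means the element can be taken.
        return unit.convert(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(final Delayed other) {
        if (other instanceof DelayedTask) {
            // nanoTime values should be compared by their difference, not directly
            final long diff = deadline - ((DelayedTask) other).deadline;
            return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
        }
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class DelayQueueDemo {
    // Records what the queue hands out, in order.
    static List<String> run() throws InterruptedException {
        final DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
        queue.offer(new DelayedTask("late", 300, TimeUnit.MILLISECONDS));
        queue.offer(new DelayedTask("early", 100, TimeUnit.MILLISECONDS));
        queue.offer(new DelayedTask("never", 1, TimeUnit.HOURS));

        final List<String> events = new ArrayList<String>();
        // poll() does not block: nothing has expired yet, so it returns null
        events.add(queue.poll() == null ? "poll: nothing expired" : "poll: element");
        // take() blocks until the head expires; "early" comes out before "late"
        events.add("take: " + queue.take().name);
        events.add("take: " + queue.take().name);
        // the one-hour task is still queued and still counted by size()
        events.add("still queued: " + queue.size());
        return events;
    }
}
```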

Let me start with a simple (and empty) interface that defines the work item. I am skipping implementation details such as properties and methods, as those are not important here.

package com.example.delayed;

public interface WorkItem {
   // Some properties and methods here
}

The next class in our model represents the postponed work item and implements the Delayed interface. There are just a few basic concepts to take into account: the delay itself and the actual time the work item was submitted; together they determine when the item expires. So let's do that by introducing the PostponedWorkItem class.

package com.example.delayed;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class PostponedWorkItem implements Delayed {
    private final long origin;
    private final long delay;
    private final WorkItem workItem;

    public PostponedWorkItem( final WorkItem workItem, final long delay ) {
        this.origin = System.currentTimeMillis();
        this.workItem = workItem;
        this.delay = delay;
    }

    @Override
    public long getDelay( TimeUnit unit ) {
        return unit.convert( delay - ( System.currentTimeMillis() - origin ), 
                TimeUnit.MILLISECONDS );
    }

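    @Override
    public int compareTo( Delayed delayed ) {
        if( delayed == this ) {
            return 0;
        }

        if( delayed instanceof PostponedWorkItem ) {
            // Order by absolute expiration time (origin + delay), not by the configured
            // delay alone, so the item that expires first is at the head of the queue
            final PostponedWorkItem other = ( PostponedWorkItem )delayed;
            long diff = ( origin + delay ) - ( other.origin + other.delay );
            return ( ( diff == 0 ) ? 0 : ( ( diff < 0 ) ? -1 : 1 ) );
        }

        long d = ( getDelay( TimeUnit.MILLISECONDS ) - delayed.getDelay( TimeUnit.MILLISECONDS ) );
        return ( ( d == 0 ) ? 0 : ( ( d < 0 ) ? -1 : 1 ) );
    }

    public WorkItem getWorkItem() {
        return workItem;
    }
}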
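
As you can see, when a new instance is created we save the current system time in the internal origin property. The getDelay method calculates the time left before the work item expires: the delay minus the time elapsed since origin, converted to the requested unit; zero or a negative value means the item has expired. The delay itself is an external setting that comes in as a constructor parameter. Implementing compareTo is mandatory because Delayed extends Comparable<Delayed>, and the Delayed contract requires an ordering consistent with getDelay: DelayQueue relies on it to keep the element that expires first at the head of the queue.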
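To make the getDelay arithmetic concrete, here is a small sketch that factors it into hypothetical pure helpers (DelayMath is not part of the original code) taking the clock reading as a parameter, so the results are reproducible.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helpers reproducing PostponedWorkItem's arithmetic with the clock
// reading passed in explicitly, so the results are deterministic.
final class DelayMath {
    private DelayMath() {}

    // Time left = delay - elapsed, where elapsed = now - origin (milliseconds in,
    // requested unit out). Zero or negative means the item has expired.
    static long remaining(final long originMillis, final long delayMillis,
                          final long nowMillis, final TimeUnit unit) {
        return unit.convert(delayMillis - (nowMillis - originMillis), TimeUnit.MILLISECONDS);
    }

    // Absolute expiration time; ordering items by this value puts the one that
    // expires first at the head of the queue.
    static long expiresAt(final long originMillis, final long delayMillis) {
        return originMillis + delayMillis;
    }
}
```

With origin = 10000 and delay = 2000, at now = 11500 there are 500 ms left, but 0 when asked in SECONDS, because TimeUnit.convert truncates. At now = 12500 the result is -500 ms, so the item has expired. Ordering matters too: an item submitted at 0 with a 2000 ms delay expires at 2000, while one submitted at 1500 with a 1000 ms delay expires at 2500. The first must come out first even though its configured delay is longer, so comparing the delay values alone can produce the wrong order.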

Now we are mostly done! To complete the example, let's make sure the same work item won't be submitted twice to the work queue by implementing equals and hashCode (the implementation is pretty trivial and should not require any comments).

public class PostponedWorkItem implements Delayed {
    ...

    @Override
    public int hashCode() {
        final int prime = 31;

        int result = 1;
        result = prime * result + ( ( workItem == null ) ? 0 : workItem.hashCode() );

        return result;
    }

    @Override
    public boolean equals( Object obj ) {
        if( this == obj ) {
            return true;
        }

        if( obj == null ) {
            return false;
        }

        if( !( obj instanceof PostponedWorkItem ) ) {
            return false;
        }

        final PostponedWorkItem other = ( PostponedWorkItem )obj;
        if( workItem == null ) {
            if( other.workItem != null ) {
                return false;
            }
        } else if( !workItem.equals( other.workItem ) ) {
            return false;
        }

        return true;
    }
}
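Why does equals matter here? Under the Collection contract, DelayQueue.contains reports membership using equals, so a brand-new wrapper around an already-queued work item is still recognised as a duplicate. The self-contained sketch below demonstrates this with a hypothetical PostponedPayload class whose equality, like PostponedWorkItem's, depends only on the wrapped payload.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for PostponedWorkItem: equality is based only on the
// wrapped payload, never on the submission time.
final class PostponedPayload implements Delayed {
    final String payload;
    private final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);

    PostponedPayload(final String payload) {
        this.payload = payload;
    }

    @Override
    public long getDelay(final TimeUnit unit) {
        return unit.convert(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(final Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }

    @Override
    public boolean equals(final Object obj) {
        return obj instanceof PostponedPayload && payload.equals(((PostponedPayload) obj).payload);
    }

    @Override
    public int hashCode() {
        return payload.hashCode();
    }
}

final class DedupDemo {
    private DedupDemo() {}

    // Wraps each payload and queues it unless an equal wrapper is already queued;
    // returns the resulting queue size.
    static int enqueueAll(final String... payloads) {
        final DelayQueue<PostponedPayload> queue = new DelayQueue<PostponedPayload>();
        for (final String payload : payloads) {
            final PostponedPayload candidate = new PostponedPayload(payload);
            // contains() compares with equals(): a fresh wrapper around an
            // already-queued payload counts as a duplicate
            if (!queue.contains(candidate)) {
                queue.offer(candidate);
            }
        }
        return queue.size();
    }
}
```

Note that contains walks the queue element by element, so for very large queues a separate concurrent set of queued items might be worth considering.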

The last step is to introduce some kind of manager that schedules work items and periodically polls out expired ones: meet the WorkItemScheduler class.

package com.example.delayed;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;

public class WorkItemScheduler {
    private final long delay = 2000; // 2 seconds

    private final BlockingQueue< PostponedWorkItem > delayed =
            new DelayQueue< PostponedWorkItem >(); 

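    // synchronized makes the contains/offer pair atomic, so two concurrent calls
    // with the same work item cannot both pass the duplicate check
    public synchronized void addWorkItem( final WorkItem workItem ) {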
        final PostponedWorkItem postponed = new PostponedWorkItem( workItem, delay );
        if( !delayed.contains( postponed )) {
            delayed.offer( postponed );
        }
    }

    public void process() {
        final Collection< PostponedWorkItem > expired = new ArrayList< PostponedWorkItem >();
        delayed.drainTo( expired );

        for( final PostponedWorkItem postponed: expired ) {
            // Do some real work here with postponed.getWorkItem()
        }
    }
}
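DelayQueue is a thread-safe BlockingQueue implementation, so individual calls such as offer and drainTo can safely be made from multiple threads. A compound check-then-act like contains followed by offer, however, is not atomic by itself and needs extra synchronization if addWorkItem can be called concurrently. The process method should be run periodically in order to drain expired work items from the queue. It could be annotated with the @Scheduled annotation from the Spring Framework or with the EJB @Schedule annotation from Java EE 6.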
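If neither Spring nor an EJB container is around, a plain ScheduledExecutorService can play the same role. The sketch below is an assumption-laden stand-in rather than the original setup: PeriodicDrainer and ExpiringItem are hypothetical names, and process() has the same shape as WorkItemScheduler.process().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical queue element that expires a given number of milliseconds after creation.
final class ExpiringItem implements Delayed {
    final String name;
    private final long deadline;

    ExpiringItem(final String name, final long delayMillis) {
        this.name = name;
        this.deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(final TimeUnit unit) {
        return unit.convert(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(final Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

// Hypothetical plain-JDK driver playing the role of @Scheduled / @Schedule:
// a single-threaded executor calls process() at a fixed rate.
final class PeriodicDrainer {
    private final DelayQueue<ExpiringItem> queue = new DelayQueue<ExpiringItem>();
    private final List<String> processed = new CopyOnWriteArrayList<String>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    void add(final String name, final long delayMillis) {
        queue.offer(new ExpiringItem(name, delayMillis));
    }

    // Drains only the expired elements, then handles them.
    void process() {
        final List<ExpiringItem> expired = new ArrayList<ExpiringItem>();
        queue.drainTo(expired);
        for (final ExpiringItem item : expired) {
            processed.add(item.name); // real work would go here
        }
    }

    void start(final long periodMillis) {
        executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // An exception escaping here suppresses all future runs,
                // so production code should catch and log inside process().
                process();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        executor.shutdown(); // periodic tasks are cancelled on shutdown by default
    }

    List<String> processed() {
        return processed;
    }
}
```

A single-threaded executor keeps process() from overlapping with itself, which matches the usual expectation for a periodic drain job.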

Enjoy!