Saturday, November 30, 2013

Java WebSockets (JSR-356) on Jetty 9.1

Jetty 9.1 is finally released, bringing Java WebSockets (JSR-356) to non-EE environments. It's awesome news and today's post will be about using this great new API along with Spring Framework.

JSR-356 defines concise, annotation-based model to allow modern Java web applications easily create bidirectional communication channels using WebSockets API. It covers not only server-side, but client-side as well, making this API really simple to use everywhere.

Let's get started! Our goal would be to build a WebSockets server which accepts messages from the clients and broadcasts them to all other clients currently connected. To begin with, let's define the message format, which server and client will be exchanging, as this simple Message class. We can limit ourselves to something like a String, but I would like to introduce to you the power of another new API - Java API for JSON Processing (JSR-353).

package com.example.services;

public class Message {
    private String username;
    private String message;
 
    public Message() {
    }
 
    public Message( final String username, final String message ) {
        this.username = username;
        this.message = message;
    }
 
    public String getMessage() {
        return message;
    }
 
    public String getUsername() {
        return username;
    }

    public void setMessage( final String message ) {
        this.message = message;
    }
 
    public void setUsername( final String username ) {
        this.username = username;
    }
}

To separate the declarations related to the server and the client, JSR-356 defines two basic annotations: @ServerEndpoint and @ClientEndpoit respectively. Our client endpoint, let's call it BroadcastClientEndpoint, will simply listen for messages server sends:

package com.example.services;

import java.io.IOException;
import java.util.logging.Logger;

import javax.websocket.ClientEndpoint;
import javax.websocket.EncodeException;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;

@ClientEndpoint
public class BroadcastClientEndpoint {
    private static final Logger log = Logger.getLogger( 
        BroadcastClientEndpoint.class.getName() );
 
    @OnOpen
    public void onOpen( final Session session ) throws IOException, EncodeException  {
        session.getBasicRemote().sendObject( new Message( "Client", "Hello!" ) );
    }

    @OnMessage
    public void onMessage( final Message message ) {
        log.info( String.format( "Received message '%s' from '%s'",
            message.getMessage(), message.getUsername() ) );
    }
}

That's literally it! Very clean, self-explanatory piece of code: @OnOpen is being called when client got connected to the server and @OnMessage is being called every time server sends a message to the client. Yes, it's very simple but there is a caveat: JSR-356 implementation can handle any simple objects but not the complex ones like Message is. To manage that, JSR-356 introduces concept of encoders and decoders.

We all love JSON, so why don't we define our own JSON encoder and decoder? It's an easy task which Java API for JSON Processing (JSR-353) can handle for us. To create an encoder, you only need to implement Encoder.Text< Message > and basically serialize your object to some string, in our case to JSON string, using JsonObjectBuilder.

package com.example.services;

import javax.json.Json;
import javax.json.JsonReaderFactory;
import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;

public class Message {
    public static class MessageEncoder implements Encoder.Text< Message > {
        @Override
        public void init( final EndpointConfig config ) {
        }
  
        @Override
        public String encode( final Message message ) throws EncodeException {
            return Json.createObjectBuilder()
                .add( "username", message.getUsername() )
                .add( "message", message.getMessage() )
                .build()
                .toString();
        }
  
        @Override
        public void destroy() {
        }
    }
}

For decoder part, everything looks very similar, we have to implement Decoder.Text< Message > and deserialize our object from string, this time using JsonReader.

package com.example.services;

import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.json.JsonReaderFactory;
import javax.websocket.DecodeException;
import javax.websocket.Decoder;

public class Message {
    public static class MessageDecoder implements Decoder.Text< Message > {
        private JsonReaderFactory factory = Json.createReaderFactory( Collections.< String, Object >emptyMap() );
  
        @Override
        public void init( final EndpointConfig config ) {
        }
  
        @Override
        public Message decode( final String str ) throws DecodeException {
            final Message message = new Message();
   
            try( final JsonReader reader = factory.createReader( new StringReader( str ) ) ) {
                final JsonObject json = reader.readObject();
                message.setUsername( json.getString( "username" ) );
                message.setMessage( json.getString( "message" ) );
            }
   
            return message;
        }
  
        @Override
        public boolean willDecode( final String str ) {
            return true;
        }
  
        @Override
        public void destroy() {
        }
    }
}

And as a final step, we need to tell the client (and the server, they share same decoders and encoders) that we have encoder and decoder for our messages. The easiest thing to do that is just by declaring them as part of @ServerEndpoint and @ClientEndpoit annotations.


import com.example.services.Message.MessageDecoder;
import com.example.services.Message.MessageEncoder;

@ClientEndpoint( encoders = { MessageEncoder.class }, decoders = { MessageDecoder.class } )
public class BroadcastClientEndpoint {
}

To make client's example complete, we need some way to connect to the server using BroadcastClientEndpoint and basically exchange messages. The ClientStarter class finalizes the picture:

package com.example.ws;

import java.net.URI;
import java.util.UUID;

import javax.websocket.ContainerProvider;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;

import org.eclipse.jetty.websocket.jsr356.ClientContainer;

import com.example.services.BroadcastClientEndpoint;
import com.example.services.Message;

public class ClientStarter {
    public static void main( final String[] args ) throws Exception {
        final String client = UUID.randomUUID().toString().substring( 0, 8 );
  
        final WebSocketContainer container = ContainerProvider.getWebSocketContainer();    
        final String uri = "ws://localhost:8080/broadcast";  
  
        try( Session session = container.connectToServer( BroadcastClientEndpoint.class, URI.create( uri ) ) ) {
            for( int i = 1; i <= 10; ++i ) {
                session.getBasicRemote().sendObject( new Message( client, "Message #" + i ) );
                Thread.sleep( 1000 );
            }
        }
  
        // Application doesn't exit if container's threads are still running
        ( ( ClientContainer )container ).stop();
    }
}

Just couple of comments what this code does: we are connecting to WebSockets endpoint at ws://localhost:8080/broadcast, randomly picking some client name (from UUID) and generating 10 messages, every with 1 second delay (just to be sure we have time to receive them all back).

Server part doesn't look very different and at this point could be understood without any additional comments (except may be the fact that server just broadcasts every message it receives to all connected clients). Important to mention here: new instance of the server endpoint is created every time new client connects to it (that's why peers collection is static), it's a default behavior and could be easily changed.

package com.example.services;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import javax.websocket.EncodeException;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

import com.example.services.Message.MessageDecoder;
import com.example.services.Message.MessageEncoder;

@ServerEndpoint( 
    value = "/broadcast", 
    encoders = { MessageEncoder.class }, 
    decoders = { MessageDecoder.class } 
) 
public class BroadcastServerEndpoint {
    private static final Set< Session > sessions = 
        Collections.synchronizedSet( new HashSet< Session >() ); 
   
    @OnOpen
    public void onOpen( final Session session ) {
        sessions.add( session );
    }

    @OnClose
    public void onClose( final Session session ) {
        sessions.remove( session );
    }

    @OnMessage
    public void onMessage( final Message message, final Session client ) 
            throws IOException, EncodeException {
        for( final Session session: sessions ) {
            session.getBasicRemote().sendObject( message );
        }
    }
}

In order this endpoint to be available for connection, we should start the WebSockets container and register this endpoint inside it. As always, Jetty 9.1 is runnable in embedded mode effortlessly:

package com.example.ws;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer;
import org.springframework.web.context.ContextLoaderListener;
import org.springframework.web.context.support.AnnotationConfigWebApplicationContext;

import com.example.config.AppConfig;

public class ServerStarter  {
    public static void main( String[] args ) throws Exception {
        Server server = new Server( 8080 );
        
        // Create the 'root' Spring application context
        final ServletHolder servletHolder = new ServletHolder( new DefaultServlet() );
        final ServletContextHandler context = new ServletContextHandler();

        context.setContextPath( "/" );
        context.addServlet( servletHolder, "/*" );
        context.addEventListener( new ContextLoaderListener() );   
        context.setInitParameter( "contextClass", AnnotationConfigWebApplicationContext.class.getName() );
        context.setInitParameter( "contextConfigLocation", AppConfig.class.getName() );
   
        server.setHandler( context );
        WebSocketServerContainerInitializer.configureContext( context );        
        
        server.start();
        server.join(); 
    }
}

The most important part of the snippet above is WebSocketServerContainerInitializer.configureContext: it's actually creates the instance of WebSockets container. Because we haven't added any endpoints yet, the container basically sits here and does nothing. Spring Framework and AppConfig configuration class will do this last wiring for us.

package com.example.config;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.websocket.DeploymentException;
import javax.websocket.server.ServerContainer;
import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.ServerEndpointConfig;

import org.eclipse.jetty.websocket.jsr356.server.AnnotatedServerEndpointConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.WebApplicationContext;

import com.example.services.BroadcastServerEndpoint;

@Configuration
public class AppConfig  {
    @Inject private WebApplicationContext context;
    private ServerContainer container;
 
    public class SpringServerEndpointConfigurator extends ServerEndpointConfig.Configurator {
        @Override
        public < T > T getEndpointInstance( Class< T > endpointClass ) 
                throws InstantiationException {
            return context.getAutowireCapableBeanFactory().createBean( endpointClass );   
        }
    }
 
    @Bean
    public ServerEndpointConfig.Configurator configurator() {
        return new SpringServerEndpointConfigurator();
    }
 
    @PostConstruct
    public void init() throws DeploymentException {
        container = ( ServerContainer )context.getServletContext().
            getAttribute( javax.websocket.server.ServerContainer.class.getName() );
  
        container.addEndpoint( 
            new AnnotatedServerEndpointConfig( 
                BroadcastServerEndpoint.class, 
                BroadcastServerEndpoint.class.getAnnotation( ServerEndpoint.class )  
            ) {
                @Override
                public Configurator getConfigurator() {
                    return configurator();
                }
            }
        );
    }  
}

As we mentioned earlier, by default container will create new instance of server endpoint every time new client connects, and it does so by calling constructor, in our case BroadcastServerEndpoint.class.newInstance(). It might be a desired behavior but because we are using Spring Framework and dependency injection, such new objects are basically unmanaged beans. Thanks to very well-thought (in my opinion) design of JSR-356, it's actually quite easy to provide your own way of creating endpoint instances by implementing ServerEndpointConfig.Configurator. The SpringServerEndpointConfigurator is an example of such implementation: it's creates new managed bean every time new endpoint instance is being asked (if you want single instance, you can create singleton of the endpoint as a bean in AppConfig and return it all the time).

The way we retrieve the WebSockets container is Jetty-specific: from the attribute of the context with name "javax.websocket.server.ServerContainer" (it probably might change in the future). Once container is there, we are just adding new (managed!) endpoint by providing our own ServerEndpointConfig (based on AnnotatedServerEndpointConfig which Jetty kindly provides already).

To build and run our server and clients, we need just do that:

mvn clean package
java -jar target\jetty-web-sockets-jsr356-0.0.1-SNAPSHOT-server.jar // run server
java -jar target/jetty-web-sockets-jsr356-0.0.1-SNAPSHOT-client.jar // run yet another client

As an example, by running server and couple of clients (I run 4 of them, '392f68ef', '8e3a869d', 'ca3a06d0', '6cb82119') you might see by the output in the console that each client receives all the messages from all other clients (including its own messages):

Nov 29, 2013 9:21:29 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Hello!' from 'Client'
Nov 29, 2013 9:21:29 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #1' from '392f68ef'
Nov 29, 2013 9:21:29 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #2' from '8e3a869d'
Nov 29, 2013 9:21:29 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #7' from 'ca3a06d0'
Nov 29, 2013 9:21:30 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #4' from '6cb82119'
Nov 29, 2013 9:21:30 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #2' from '392f68ef'
Nov 29, 2013 9:21:30 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #3' from '8e3a869d'
Nov 29, 2013 9:21:30 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #8' from 'ca3a06d0'
Nov 29, 2013 9:21:31 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #5' from '6cb82119'
Nov 29, 2013 9:21:31 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #3' from '392f68ef'
Nov 29, 2013 9:21:31 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #4' from '8e3a869d'
Nov 29, 2013 9:21:31 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #9' from 'ca3a06d0'
Nov 29, 2013 9:21:32 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #6' from '6cb82119'
Nov 29, 2013 9:21:32 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #4' from '392f68ef'
Nov 29, 2013 9:21:32 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #5' from '8e3a869d'
Nov 29, 2013 9:21:32 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #10' from 'ca3a06d0'
Nov 29, 2013 9:21:33 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #7' from '6cb82119'
Nov 29, 2013 9:21:33 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #5' from '392f68ef'
Nov 29, 2013 9:21:33 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #6' from '8e3a869d'
Nov 29, 2013 9:21:34 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #8' from '6cb82119'
Nov 29, 2013 9:21:34 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #6' from '392f68ef'
Nov 29, 2013 9:21:34 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #7' from '8e3a869d'
Nov 29, 2013 9:21:35 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #9' from '6cb82119'
Nov 29, 2013 9:21:35 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #7' from '392f68ef'
Nov 29, 2013 9:21:35 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #8' from '8e3a869d'
Nov 29, 2013 9:21:36 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #10' from '6cb82119'
Nov 29, 2013 9:21:36 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #8' from '392f68ef'
Nov 29, 2013 9:21:36 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #9' from '8e3a869d'
Nov 29, 2013 9:21:37 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #9' from '392f68ef'
Nov 29, 2013 9:21:37 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #10' from '8e3a869d'
Nov 29, 2013 9:21:38 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #10' from '392f68ef'
2013-11-29 21:21:39.260:INFO:oejwc.WebSocketClient:main: Stopped org.eclipse.jetty.websocket.client.WebSocketClient@3af5f6dc

Awesome! I hope this introductory blog post shows how easy it became to use modern web communication protocols in Java, thanks to Java WebSockets (JSR-356), Java API for JSON Processing (JSR-353) and great projects such as Jetty 9.1!

As always, complete project is available on GitHub.

Tuesday, October 29, 2013

Book review: "Instant Effective Caching with Ehcache" by Daniel Wind

Recently, I have had a chance to review the book "Instant Effective Caching with Ehcache" by Daniel Wind. Honestly, I do think the book justifies its title very well: constructed as a set of various recipes, it guides you step-by-step through typical scenarios by providing brief explanation along with small code snippets, clear enough to serve as a starting point (most recipes also have references to relevant sections of EhCache documentation).

If you have ever worked with EhCache, many recipes would look very familiar to you. But for a newbie or even intermediate developer, it might be very interesting to see:

More advanced examples include:

As a final note: short but useful book, not a comprehensive guide to EhCache world but rather a quick reference. Thanks to Daniel Wind for gathering all these recipes together.

Monday, October 28, 2013

Coordination and service discovery with Apache Zookeeper

Service-oriented design has proven to be a successful solution for a huge variety of different distributed systems. When used properly, it has a lot of benefits. But as number of services grows, it becomes more difficult to understand what is deployed and where. And because we are building reliable and highly-available systems, yet another question to ask: how many instances of each service are currently available?

In today's post I would like to introduce you to the world of Apache ZooKeeper - a highly reliable distributed coordination service. The number of features ZooKeeper provides is just astonishing so let us start with very simple problem to solve: we have a stateless JAX-RS service which we deploy across as many JVMs/hosts as we want. The clients of this service should be able to auto-discover all available instances and just pick one of them (or all) to perform a REST call.

Sounds like a very interesting challenge. There could be many ways to solve it but let me choose Apache ZooKeeper for that. The first step is to download Apache ZooKeeper (the current stable version at the moment of writing is 3.4.5) and unpack it. Next, we need to create a configuration file. The simple way to do that is by copying conf/zoo_sample.cfg to conf/zoo.cfg. To run, just execute:

Windows: bin/zkServer.cmd
Linux: bin/zkServer

Excellent, now Apache ZooKeeper is up and running, listening on port 2181 (default). Apache ZooKeeper itself worth of a book to explain its capabilities. But brief overview gives a very high-level picture, enough to get us started.

Apache ZooKeeper has a powerful Java API but it's quite low-level and not an easy one to use. That's why Netflix developed and open-sourced a great library called Curator to wrap native Apache ZooKeeper API into more convenient and easy to integrate framework (it's now an Apache incubator project).

Now, let's do some code! We are developing simple JAX-RS 2.0 service which returns list of people. As it will be stateless, we are able to run many instances within single host or multiple hosts, depending on system load for example. The awesome Apache CXF and Spring Framework will backed our implementation. Below is the code snippet for PeopleRestService:

package com.example.rs;

import java.util.Arrays;
import java.util.Collection;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;

import com.example.model.Person;

@Path( PeopleRestService.PEOPLE_PATH ) 
public class PeopleRestService {
    public static final String PEOPLE_PATH = "/people";
 
    @PostConstruct
    public void init() throws Exception {
    }
 
    @Produces( { MediaType.APPLICATION_JSON } )
    @GET
    public Collection< Person > getPeople( @QueryParam( "page") @DefaultValue( "1" ) final int page ) {
        return Arrays.asList(
            new Person( "Tom", "Bombadil" ),
            new Person( "Jim", "Tommyknockers" )
        );
    }
}

Very basic and naive implementation. Method init is empty by intention, it will be very helpful quite soon. Also, let us assume that every JAX-RS 2.0 service we're developing does support some notion of versioning, the class RestServiceDetails serves this purpose:

package com.example.config;

import org.codehaus.jackson.map.annotate.JsonRootName;

@JsonRootName( "serviceDetails" )
public class RestServiceDetails {
    private String version;
 
    public RestServiceDetails() {
    }
    
    public RestServiceDetails( final String version ) {
        this.version = version;
    }
    
    public void setVersion( final String version ) {
        this.version = version;
    }
    
    public String getVersion() {
        return version;
    }    
}

Our Spring configuration class AppConfig creates instance of JAX-RS 2.0 server with people REST service which will be hosted by Jetty container:

package com.example.config;

import java.util.Arrays;

import javax.ws.rs.ext.RuntimeDelegate;

import org.apache.cxf.bus.spring.SpringBus;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;

import com.example.rs.JaxRsApiApplication;
import com.example.rs.PeopleRestService;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;

@Configuration
public class AppConfig {
    public static final String SERVER_PORT = "server.port";
    public static final String SERVER_HOST = "server.host";
    public static final String CONTEXT_PATH = "rest";
 
    @Bean( destroyMethod = "shutdown" )
    public SpringBus cxf() {
        return new SpringBus();
    }
 
    @Bean @DependsOn( "cxf" )
    public Server jaxRsServer() {
        JAXRSServerFactoryBean factory = RuntimeDelegate.getInstance().createEndpoint( jaxRsApiApplication(), JAXRSServerFactoryBean.class );
        factory.setServiceBeans( Arrays.< Object >asList( peopleRestService() ) );
        factory.setAddress( factory.getAddress() );
        factory.setProviders( Arrays.< Object >asList( jsonProvider() ) );
        return factory.create();
    } 

    @Bean 
    public JaxRsApiApplication jaxRsApiApplication() {
        return new JaxRsApiApplication();
    }
 
    @Bean 
    public PeopleRestService peopleRestService() {
        return new PeopleRestService();
    }
 
    @Bean
    public JacksonJsonProvider jsonProvider() {
        return new JacksonJsonProvider();
    } 
}

And here is the class ServerStarter which runs embedded Jetty server. As we would like to host many such servers per host, the port shouldn't be hard-coded but rather provided as an argument:

package com.example;

import org.apache.cxf.transport.servlet.CXFServlet;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.springframework.web.context.ContextLoaderListener;
import org.springframework.web.context.support.AnnotationConfigWebApplicationContext;

import com.example.config.AppConfig;

public class ServerStarter {
    public static void main( final String[] args ) throws Exception {
        if( args.length != 1 ) {
            System.out.println( "Please provide port number" );
            return;
        }
  
        final int port = Integer.valueOf( args[ 0 ] );
        final Server server = new Server( port );
   
        System.setProperty( AppConfig.SERVER_PORT, Integer.toString( port ) );
        System.setProperty( AppConfig.SERVER_HOST, "localhost" );
  
        // Register and map the dispatcher servlet
        final ServletHolder servletHolder = new ServletHolder( new CXFServlet() );
        final ServletContextHandler context = new ServletContextHandler();   
        context.setContextPath( "/" );
        context.addServlet( servletHolder, "/" + AppConfig.CONTEXT_PATH + "/*" );  
        context.addEventListener( new ContextLoaderListener() );
   
        context.setInitParameter( "contextClass", AnnotationConfigWebApplicationContext.class.getName() );
        context.setInitParameter( "contextConfigLocation", AppConfig.class.getName() );
      
        server.setHandler( context );
        server.start();
        server.join(); 
    }
}

Nice, at this moment the boring part is over. But where Apache ZooKeeper and service discovery fit into this picture? Here is the answer: whenever new PeopleRestService service instance is deployed, it publishes (or registers) itself into Apache ZooKeeper registry, including the URL it's accessible at and service version it hosts. The clients can query Apache ZooKeeper in order to get the list of all available services and call them. The only thing services and their clients need to know is where Apache ZooKeeper is running. As I am deploying everything on my local machine, my instance is on localhost. Let's add this constant to the AppConfig class:

private static final String ZK_HOST = "localhost";

Every client maintains the persistent connection to the Apache ZooKeeper server. Whenever client dies, the connection goes down as well and Apache ZooKeeper can make a decision about availability of this particular client. To connect to Apache ZooKeeper, we have to create an instance of CuratorFramework class:

@Bean( initMethod = "start", destroyMethod = "close" )
public CuratorFramework curator() {
    return CuratorFrameworkFactory.newClient( ZK_HOST, new ExponentialBackoffRetry( 1000, 3 ) );
}

Next step is to create an instance of ServiceDiscovery class which will allow to publish service information for discovery into Apache ZooKeeper using just created CuratorFramework instance (we also would like to submit RestServiceDetails as additional metadata along with every service registration):

@Bean( initMethod = "start", destroyMethod = "close" )
public ServiceDiscovery< RestServiceDetails > discovery() {
    JsonInstanceSerializer< RestServiceDetails > serializer = 
        new JsonInstanceSerializer< RestServiceDetails >( RestServiceDetails.class );

    return ServiceDiscoveryBuilder.builder( RestServiceDetails.class )
        .client( curator() )
        .basePath( "services" )
        .serializer( serializer )
        .build();        
}

Internally, Apache ZooKeeper stores all its data as hierarchical namespace, much like standard file system does. The services path will be the base (root) path for all our services. Every service also needs to figure out which host and port it's running. We can do that by building URI specification which is included into JaxRsApiApplication class (the {port} and {scheme} will be resolved by Curator framework at the moment of service registration):

package com.example.rs;

import javax.inject.Inject;
import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;

import org.springframework.core.env.Environment;

import com.example.config.AppConfig;
import com.netflix.curator.x.discovery.UriSpec;

@ApplicationPath( JaxRsApiApplication.APPLICATION_PATH )
public class JaxRsApiApplication extends Application {
    public static final String APPLICATION_PATH = "api";
 
    @Inject Environment environment;

    public UriSpec getUriSpec( final String servicePath ) {
        return new UriSpec( 
            String.format( "{scheme}://%s:{port}/%s/%s%s",
                environment.getProperty( AppConfig.SERVER_HOST ),
                AppConfig.CONTEXT_PATH,
                APPLICATION_PATH, 
                servicePath
            ) );   
    }
}

The last piece of the puzzle is the registration of PeopleRestService inside service discovery, and the init method comes into play here:

@Inject private JaxRsApiApplication application;
@Inject private ServiceDiscovery< RestServiceDetails > discovery; 
@Inject private Environment environment;

@PostConstruct
public void init() throws Exception {
    final ServiceInstance< RestServiceDetails > instance = 
        ServiceInstance.< RestServiceDetails >builder()
            .name( "people" )
            .payload( new RestServiceDetails( "1.0" ) )
            .port( environment.getProperty( AppConfig.SERVER_PORT, Integer.class ) )
            .uriSpec( application.getUriSpec( PEOPLE_PATH ) )
            .build();
  
    discovery.registerService( instance );
}

Here is what we have done:

  • created a service instance with name people (the complete name would be /services/people)
  • set the port to the actual value this instance is running
  • set the URI specification for this specific REST service endpoint
  • additionally, attached a payload (RestServiceDetails) with service version (though it's not used, it demonstrates the ability to pass more details)
Every new service instance we are running will publish itself under /services/people path in Apache ZooKeeper. To see everything in action, let us build and run couple of people service instances.

mvn clean package
java -jar jax-rs-2.0-service\target\jax-rs-2.0-service-0.0.1-SNAPSHOT.one-jar.jar 8080
java -jar jax-rs-2.0-service\target\jax-rs-2.0-service-0.0.1-SNAPSHOT.one-jar.jar 8081

From Apache ZooKeeper it might look like this (please notice that session UUIDs will be different):

Having two service instances up and running, let's try to consume them. From service client prospective, the first step is exactly the same: instances of CuratorFramework and ServiceDiscovery should be created (configuration class ClientConfig declares those beans), in the way we have done it above, no changes required. But instead of registering service, we will query the available ones:

package com.example.client;

import java.util.Collection;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import com.example.config.RestServiceDetails;
import com.netflix.curator.x.discovery.ServiceDiscovery;
import com.netflix.curator.x.discovery.ServiceInstance;

public class ClientStarter {
    public static void main( final String[] args ) throws Exception {
        try( final AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext( ClientConfig.class ) ) { 
            @SuppressWarnings("unchecked")
            final ServiceDiscovery< RestServiceDetails > discovery = 
                context.getBean( ServiceDiscovery.class );
            final Client client = ClientBuilder.newClient();
      
            final Collection< ServiceInstance< RestServiceDetails > > services = 
                discovery.queryForInstances( "people" );
            for( final ServiceInstance< RestServiceDetails > service: services ) {
                final String uri = service.buildUriSpec();
       
                final Response response = client
                    .target( uri )
                    .request( MediaType.APPLICATION_JSON )
                    .get();
       
                System.out.println( uri + ": " + response.readEntity( String.class ) );
                System.out.println( "API version: " + service.getPayload().getVersion() );
       
                response.close();
            }
        }
    }
}

Once service instances are retrieved, the REST call is being made (using awesome JAX-RS 2.0 client API) and additionally the service version is being asked (as payload contains instance of RestServiceDetails class). Let's build and run our client against two instances we have deployed previously:

mvn clean package
java -jar jax-rs-2.0-client\target\jax-rs-2.0-client-0.0.1-SNAPSHOT.one-jar.jar

The console output should show two calls to two different endpoints:

http://localhost:8081/rest/api/people: [{"email":null,"firstName":"Tom","lastName":"Bombadil"},{"email":null,"firstName":"Jim","lastName":"Tommyknockers"}]
API version: 1.0

http://localhost:8080/rest/api/people: [{"email":null,"firstName":"Tom","lastName":"Bombadil"},{"email":null,"firstName":"Jim","lastName":"Tommyknockers"}]
API version: 1.0

If we stop one or all instances, they will disappear from Apache ZooKeeper registry. The same applies if any instance crashes or becomes unresponsive.

Excellent! I guess we achieved our goal using such a great and powerful tool as Apache ZooKeeper. Thanks to its developers as well as to Curator guys for making it so easy to use Apache ZooKeeper in your applications. We have just scratched the surface of what is possible to accomplish by using Apache ZooKeeper, I strongly encourage everyone to explore its capabilities (distributed locks, caches, counters, queues, ...).

Worth to mention another great project build on top of Apache ZooKeeper from LinkedIn called Norbert. For Eclipse developers, the Eclipse plugin is also available.

All sources are available on GitHub.

Monday, September 30, 2013

Swagger: make developers love working with your REST API

As JAX-RS API is evolving, with version 2.0 released earlier this year under JSR-339 umbrella, it's becoming even more easy to create REST services using excellent Java platform.

But with great simplicity comes great responsibility: documenting all these APIs so other developers could quickly understand how to use them. Unfortunately, in this area developers are on their own: the JSR-339 doesn't help much. For sure, it would be just awesome to generate verbose and easy to follow documentation from source code, and not asking someone to write it along the development process. Sounds unreal, right? In certain extent, it really is, but help is coming in a form of Swagger.

Essentially, Swagger does a simple but very powerful thing: with a bit of additional annotations it generates the REST API descriptions (HTTP methods, path / query / form parameters, responses, HTTP error codes, ...) and even provides a simple web UI to play with REST calls to your APIs (not to mention that all this metadata is available over REST as well).

Before digging into implementation details, let's take a quick look what Swagger is from API consumer prospective. Assume you have developed a great REST service to manage people. As a good citizen, this REST service is feature-complete and provides following functionality:

  • lists all people (GET)
  • looks up person by e-mail (GET)
  • adds new person (POST)
  • updates existing person (PUT)
  • and finally removes person (DELETE)
Here is the same API from Swagger's perspective:

It looks quite pretty. Let's do more and call our REST service from Swagger UI, here this awesome framework really shines. The most complicated use-case is adding new person (POST) so this one will be looked closely.

As you can see on the snapshot above, every piece of REST service call is there:

  • description of the service
  • relative context path
  • parameters (form / path / query), required or optional
  • HTTP status codes: 201 CREATED and 409 CONFLICT
  • ready to go Try it out! to call REST service immediately (with out-of-the box parameters validation)

To complete the demo part, let me show yet another example, where REST resource is being involved (in our case, it's a simple class Person). Swagger is able to provide its properties and meaningful description together with expected response content type(s).

Looks nice! Moving on to the next part, it's all about implementation details. Swagger supports seamless integration with JAX-RS services, with just couple of additional annotations required on top of existing ones. Firstly, every single JAX-RS service which supposed to be documented should be annotated with @Api annotation, in our case:

@Path( "/people" ) 
@Api( value = "/people", description = "Manage people" )
public class PeopleRestService {
    // ...
}

Next, the same approach is applicable to REST service operations: every method which supposed to be documented should be annotated with @ApiOperation annotation, and optionally with @ApiResponses/@ApiResponse. If it accepts parameters, those should be annotated with @ApiParam annotation. Couple of examples here:

@Produces( { MediaType.APPLICATION_JSON } )
@GET
@ApiOperation( 
    value = "List all people", 
    notes = "List all people using paging", 
    response = Person.class, 
    responseContainer = "List"
)
public Collection< Person > getPeople(  
        @ApiParam( value = "Page to fetch", required = true ) 
        @QueryParam( "page") @DefaultValue( "1" ) final int page ) {
    // ...
}

And another one:

@Produces( { MediaType.APPLICATION_JSON } )
@Path( "/{email}" )
@GET
@ApiOperation( 
    value = "Find person by e-mail", 
    notes = "Find person by e-mail", 
    response = Person.class 
)
@ApiResponses( {
    @ApiResponse( code = 404, message = "Person with such e-mail doesn't exists" )    
} )
public Person getPeople( 
        @ApiParam( value = "E-Mail address to lookup for", required = true ) 
        @PathParam( "email" ) final String email ) {
    // ...
}

REST resource classes (or model classes) require special annotations: @ApiModel and @ApiModelProperty. Here is how our Person class looks like:

@ApiModel( value = "Person", description = "Person resource representation" )
public class Person {
    @ApiModelProperty( value = "Person's first name", required = true ) 
    private String email;
    @ApiModelProperty( value = "Person's e-mail address", required = true ) 
    private String firstName;
    @ApiModelProperty( value = "Person's last name", required = true ) 
    private String lastName;

    // ...
}

The last steps is to plug in Swagger into JAX-RS application. The example I have developed uses Spring Framework, Apache CXF, Swagger UI and embedded Jetty (complete project is available on Github). Integrating Swagger is a matter of adding configuration bean (swaggerConfig), one additional JAX-RS service (apiListingResourceJson) and two JAX-RS providers (resourceListingProvider and apiDeclarationProvider).

package com.example.config;

import java.util.Arrays;

import javax.ws.rs.ext.RuntimeDelegate;

import org.apache.cxf.bus.spring.SpringBus;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.env.Environment;

import com.example.resource.Person;
import com.example.rs.JaxRsApiApplication;
import com.example.rs.PeopleRestService;
import com.example.services.PeopleService;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.wordnik.swagger.jaxrs.config.BeanConfig;
import com.wordnik.swagger.jaxrs.listing.ApiDeclarationProvider;
import com.wordnik.swagger.jaxrs.listing.ApiListingResourceJSON;
import com.wordnik.swagger.jaxrs.listing.ResourceListingProvider;

@Configuration
public class AppConfig {
    public static final String SERVER_PORT = "server.port";
    public static final String SERVER_HOST = "server.host";
    public static final String CONTEXT_PATH = "context.path";  
 
    @Bean( destroyMethod = "shutdown" )
    public SpringBus cxf() {
        return new SpringBus();
    }
 
    @Bean @DependsOn( "cxf" )
    public Server jaxRsServer() {
        JAXRSServerFactoryBean factory = RuntimeDelegate.getInstance().createEndpoint( jaxRsApiApplication(), JAXRSServerFactoryBean.class );
        factory.setServiceBeans( Arrays.< Object >asList( peopleRestService(), apiListingResourceJson() ) );
        factory.setAddress( factory.getAddress() );
        factory.setProviders( Arrays.< Object >asList( jsonProvider(), resourceListingProvider(), apiDeclarationProvider() ) );
        return factory.create();
    }
 
    @Bean @Autowired
    public BeanConfig swaggerConfig( Environment environment ) {
        final BeanConfig config = new BeanConfig();

        config.setVersion( "1.0.0" );
        config.setScan( true );
        config.setResourcePackage( Person.class.getPackage().getName() );
        config.setBasePath( 
            String.format( "http://%s:%s/%s%s",
                environment.getProperty( SERVER_HOST ),
                environment.getProperty( SERVER_PORT ),
                environment.getProperty( CONTEXT_PATH ),
                jaxRsServer().getEndpoint().getEndpointInfo().getAddress() 
            ) 
        );
  
        return config;
    }

    @Bean
    public ApiDeclarationProvider apiDeclarationProvider() {
        return new ApiDeclarationProvider();
    }
 
    @Bean
    public ApiListingResourceJSON apiListingResourceJson() {
        return new ApiListingResourceJSON();
    }
 
    @Bean
    public ResourceListingProvider resourceListingProvider() {
        return new ResourceListingProvider();
    }
 
    @Bean 
    public JaxRsApiApplication jaxRsApiApplication() {
        return new JaxRsApiApplication();
    }
 
    @Bean 
    public PeopleRestService peopleRestService() {
        return new PeopleRestService();
    }
   
    // ... 
}

In order to get rid of any possible hard-coded configuration, all parameters are passed through named properties (SERVER_PORT, SERVER_HOST and CONTEXT_PATH). Swagger exposes additional REST endpoint to provide API documentation, in our case it is accessible at: http://localhost:8080/rest/api/api-docs. It is used by Swagger UI which itself is embedded into final JAR archive and served by Jetty as static web resource.

The final piece of the puzzle is to start embedded Jetty container which glues all those parts together and is encapsulated into Starter class:

package com.example;

import org.apache.cxf.transport.servlet.CXFServlet;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.resource.Resource;
import org.springframework.core.io.ClassPathResource;
import org.springframework.web.context.ContextLoaderListener;
import org.springframework.web.context.support.AnnotationConfigWebApplicationContext;

import com.example.config.AppConfig;

public class Starter {
    private static final int SERVER_PORT = 8080;
    private static final String CONTEXT_PATH = "rest";
 
    public static void main( final String[] args ) throws Exception {
        Resource.setDefaultUseCaches( false );
  
        final Server server = new Server( SERVER_PORT );  
        System.setProperty( AppConfig.SERVER_PORT, Integer.toString( SERVER_PORT ) );
        System.setProperty( AppConfig.SERVER_HOST, "localhost" );
        System.setProperty( AppConfig.CONTEXT_PATH, CONTEXT_PATH );    

        // Configuring Apache CXF servlet and Spring listener  
        final ServletHolder servletHolder = new ServletHolder( new CXFServlet() );      
        final ServletContextHandler context = new ServletContextHandler();   
        context.setContextPath( "/" );
        context.addServlet( servletHolder, "/" + CONTEXT_PATH + "/*" );     
        context.addEventListener( new ContextLoaderListener() ); 
     
        context.setInitParameter( "contextClass", AnnotationConfigWebApplicationContext.class.getName() );
        context.setInitParameter( "contextConfigLocation", AppConfig.class.getName() );
   
        // Configuring Swagger as static web resource
        final ServletHolder swaggerHolder = new ServletHolder( new DefaultServlet() );
        final ServletContextHandler swagger = new ServletContextHandler();
        swagger.setContextPath( "/swagger" );
        swagger.addServlet( swaggerHolder, "/*" );
        swagger.setResourceBase( new ClassPathResource( "/webapp" ).getURI().toString() );

        final HandlerList handlers = new HandlerList();
        handlers.addHandler( context );
        handlers.addHandler( swagger );
   
        server.setHandler( handlers );
        server.start();
        server.join(); 
    }
}

Couple of comments make thing a bit more clear: our JAX-RS services will be available under /rest/* context path while Swagger UI is available under /swagger context path. The one important note concerning Resource.setDefaultUseCaches( false ): because we are serving static web content from JAR file, we have to set this property to false as workaround for this bug.

Now, let's build and run our JAX-RS application by typing:

mvn clean package
java -jar target/jax-rs-2.0-swagger-0.0.1-SNAPSHOT.jar

In a second, Swagger UI should be available in your browser at: http://localhost:8080/swagger/

As a final note, there are a lot more to say about Swagger but I hope this simple example shows the way to make our REST services self-documented and easily consumable with minimal efforts. Many thanks to Wordnik team for that.

Source code is available on Github.

Saturday, August 31, 2013

Lightweight real-time charts with Play Framework and Scala using server-side events

Continuing a great journey with awesome Play Framework and Scala language, I would like to share yet another interesting implementation of real-time charting: this time by using lightweight server-side events instead of full-duplex WebSockets technology described previously in this post. Indeed, if you don't need a bidirectional communication but only server push, server-side events look as a very natural fit. And if you are using Play Framework, it's really easy to do as well.

Let's try to cover the same use case so it will be fair to compare both implementations: we have couple of hosts and we would like to watch CPU usage on each one in real-time (on a chart). Let's start by creating a simple Play Framework application (choosing Scala as a primary language):

play new play-sse-example

Now, when the layout of our application is ready, our next step is to create some starting web page (using Play Framework's type safe template engine) and name it as views/dashboard.scala.html. Here is how it looks like:

@(title: String, hosts: List[Host])

<!DOCTYPE html>
<html>
  <head>
    <title>@title</title>
    <link rel="stylesheet" media="screen" href="@routes.Assets.at("stylesheets/main.css")">
    <link rel="shortcut icon" type="image/png" href="@routes.Assets.at("images/favicon.png")">
    <script src="@routes.Assets.at("javascripts/jquery-1.9.0.min.js")" type="text/javascript"></script>
    <script src="@routes.Assets.at("javascripts/highcharts.js")" type="text/javascript"></script>
  </head>
    
  <body>
    <div id="hosts">
      <ul class="hosts">
        @hosts.map { host =>
          <li>               
            <a href="#" onclick="javascript:show( '@host.id' )">@host.name</a>
          </li>
        }
      </ul>
    </div>    
    <div id="content">
    </div>
  </body>
</html>

<script type="text/javascript">
function show( hostid ) {
  $('#content').trigger('unload');
 
  $("#content").load( "/host/" + hostid,
    function( response, status, xhr ) {
      if (status == "error") {   
        $("#content").html( "Sorry but there was an error:" + xhr.status + " " + xhr.statusText);
      }
    }
  )
}
</script>

The template looks exactly the same as in WebSockets example, except one single line, the purpose of this one will be explained just a bit later.

$('#content').trigger('unload');

The result of this web page is a simple list of hosts. Whenever user clicks on a host link, the host-specific view will be fetched from the server (using AJAX) and displayed. Next template is the most interesting one, views/host.scala.html, and contains a lot of important details:

@(host: Host)( implicit request: RequestHeader )

<div id="content">
  <div id="chart"></div>
 
  <script type="text/javascript">
    var charts = []   
      charts[ '@host.id' ] = new Highcharts.Chart({                 
        chart: {
          renderTo: 'chart',
          defaultSeriesType: 'spline'            
        },           
        xAxis: {
          type: 'datetime'
        },   
        series: [{
          name: "CPU",
          data: []
        }
      ]
    }); 
  </script>     
</div>

<script type="text/javascript">
  if( !!window.EventSource ) {
    var event = new EventSource("@routes.Application.stats( host.id )");
 
    event.addEventListener('message', function( event ) { 
      var datapoint = jQuery.parseJSON( event.data );
      var chart = charts[ '@host.id' ];
       
      chart.series[ 0 ].addPoint({
        x: datapoint.cpu.timestamp,
        y: datapoint.cpu.load
      }, true, chart.series[ 0 ].data.length >= 50 );
    } );

    $('#content').bind('unload',function() {
      event.close();
    });                         
  }  
</script>

The core UI component is a simple chart, built using Highcharts library. The script block at the bottom tries to create an EventSource object which is an implementation of server-side events on browser side. If browser supports server-side events, the respective connection to server-side endpoint will be created and chart will be updated on every message received from the server ('message' listener). It's a good time to explain the purpose of this construct (and it's counterpart $('#content').trigger('unload') mentioned above):

$('#content').bind('unload',function() {
  event.close();
});

Whenever user clicks on different hosts, the previous event stream should be closed and new one should be created. Not doing so leads to more and more event streams to be created, flooding browser with more and more event listeners. To overcome this, we bind an unload method to a div element with id content and call it all the time when user clicks on a host. By doing that, we close event stream all the time before opening a new one. Enough UI, let's move on to back-end.

The routing table and mostly all the code stay the same, except only two small method changes, Statistics.attach and Application.stats. Let's take a look how server push of host's CPU statistics using server-side events is implemented on controller side (and mapped to /stats/:id URL):

def stats( id: String ) = Action { request =>
  Hosts.hosts.find( _.id == id ) match {
    case Some( host ) =>
      Async { 
        Statistics.attach( host ).map { enumerator =>      
          Ok.stream( enumerator &> EventSource() ).as( "text/event-stream")
        }
      }
    case None => NoContent  
  }
}

Very short piece of code which does a lot of things. After finding the respective host by its id, we "attaching" to it by receiving the Enumerator instance: the continuous flow of CPU statistics data. The Ok.stream( enumerator &> EventSource() ).as( "text/event-stream") will transform this continuous flow of statistics data to stream of events which client is able to consume using server-side events.

To finish with server-side changes, let's take a look how "attaching" to host's statistics flow looks like:

def attach( host: Host ): Future[ Enumerator[ JsValue ] ] = {
  ( actor( host.id ) ? Connect( host ) ).map {      
    case Connected( enumerator ) => enumerator
  }
}

It's as simple as returning the Enumerator, and because we are using Akka actors, it becomes a bit more tricky with Future and asynchronous invocations. And, that's it!

In action our simple application looks like this (using Mozilla Firefox), having only Host 1 and Host 2 as an example:

Very nice and simple, and yet again, thanks a lot to Play Framework guys and the community. Complete source code is available on GitHub.

Wednesday, July 31, 2013

Easy Messaging with STOMP over WebSockets using Apollo

In my previous post I have covered couple of interesting use cases implementing STOMP messaging over Websockects using well-known message brokers, HornetQ and ActiveMQ. But the one I didn't cover is Apollo as in my own opinion its API is verbose and not expressive enough as for a Java developer. Nevertheless, the more time I spent playing with Apollo, more convinced I became that there is quite a potential out there. So this post is all about Apollo.

The problem we're trying to solve stays the same: simple publish/subscribe solution where JavaScript web client sends messages and listens for a specific topic. Whenever any message is received, client shows alert window (please note that we need to use modern browser which supports Websockets, such as Google Chrome or Mozilla Firefox).

Let's make our hands dirty by starting off with index.html (which imports awesome stomp.js JavaScript library):





The client part is not that different except topic name which is now /topic/test. The server side however differs a lot. Apollo is written is Scala and embraces asynchronous, non-blocking programming model. I think, it's a very good thing. What it brings though is a new paradigm to program against and it's also not necessarily a bad thing. The AppConfig class is the one which configures embedded Apollo broker:

package com.example.messaging;

import java.io.File;

import org.apache.activemq.apollo.broker.Broker;
import org.apache.activemq.apollo.broker.jmx.dto.JmxDTO;
import org.apache.activemq.apollo.dto.AcceptingConnectorDTO;
import org.apache.activemq.apollo.dto.BrokerDTO;
import org.apache.activemq.apollo.dto.TopicDTO;
import org.apache.activemq.apollo.dto.VirtualHostDTO;
import org.apache.activemq.apollo.dto.WebAdminDTO;
import org.apache.activemq.apollo.stomp.dto.StompDTO;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfig {
    @Bean
    public Broker broker() throws Exception {
        final Broker broker = new Broker();
    
        // Configure STOMP over WebSockects connector
        final AcceptingConnectorDTO ws = new AcceptingConnectorDTO();
        ws.id = "ws";
        ws.bind = "ws://localhost:61614";  
        ws.protocols.add( new StompDTO() );

        // Create a topic with name 'test'
        final TopicDTO topic = new TopicDTO();
        topic.id = "test";
  
        // Create virtual host (based on localhost)
        final VirtualHostDTO host = new VirtualHostDTO();
        host.id = "localhost";  
        host.topics.add( topic );
        host.host_names.add( "localhost" );
        host.host_names.add( "127.0.0.1" );
        host.auto_create_destinations = false;
  
        // Create a web admin UI (REST) accessible at: http://localhost:61680/api/index.html#!/ 
        final WebAdminDTO webadmin = new WebAdminDTO();
        webadmin.bind = "http://localhost:61680";

        // Create JMX instrumentation 
        final JmxDTO jmxService = new JmxDTO();
        jmxService.enabled = true;
  
        // Finally, glue all together inside broker configuration
        final BrokerDTO config = new BrokerDTO();
        config.connectors.add( ws );
        config.virtual_hosts.add( host );
        config.web_admins.add( webadmin );
        config.services.add( jmxService );
  
        broker.setConfig( config );
        broker.setTmp( new File( System.getProperty( "java.io.tmpdir" ) ) );
  
        broker.start( new Runnable() {   
            @Override
            public void run() {  
                System.out.println("The broker has been started started.");
            }
        } );
  
        return broker;
    }
}

I guess it becomes clear what I meant by verbose and not expressive enough but at least it's easy to follow. Firstly, we are creating Websockects connector at ws://localhost:61614 and asking it to support the STOMP protocol. Then we are creating a simple topic with name test (which we refer as /topic/test on client side). Next important step is to create a virtual host and to bind topics (and queues if any) to it. The host names list is very important as the destination resolution logic heavily relies on it. In the following step we are configuring web admin UI and JMX instrumentation which provides us with access to configuration, statistics and monitoring. To check it out, please open this URL in your web browser once Apollo broker is started. And finally, by applying configuration and starting the broker we are good to go! As you can see, asynchronous programming model leads to callbacks and anonymous functions (where are you, Java 8?).

Now, when configuration is done, it's time to look at start-up logic placed into Starter class (again, callbacks and anonymous functions are used to perform graceful shutdown logic):

package com.example.messaging;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.apollo.broker.Broker;
import org.springframework.context.annotation.ConfigurableApplicationContext;

public class Starter  {
    public static void main( String[] args ) throws Exception {
        try( ConfigurableApplicationContext context = new AnnotationConfigApplicationContext( AppConfig.class ) ) {
            final Broker broker = context.getBean( Broker.class );  
            System.out.println( "Press any key to terminate ..." );
            System.in.read();    
         
            final CountDownLatch latch = new CountDownLatch( 1 );
            broker.stop( new Runnable() { 
                @Override
                public void run() {  
                    System.out.println("The broker has been stopped.");
                    latch.countDown();
                }
            } );
         
            // Gracefully stop the broker
            if( !latch.await( 1, TimeUnit.SECONDS ) ) {
                System.out.println("The broker hasn't been stopped, exiting anyway ...");
            }
        }
    }
}

As with the previous examples, after running Starter class and opening index.html in the browser, we should see something like that:

Great, it works just fine! I am pretty sure that just rewriting the code in Scala, this Apollo API usage example will look much more compact and concise. In any case, I think Apollo message broker is definitely worth to consider if you are looking for prominent messaging architecture.

All sources are available on GitHub: Apollo example.

Saturday, June 29, 2013

Easy Messaging with STOMP over WebSockets using ActiveMQ and HornetQ

Messaging is an extremely powerful tool for building distributed software systems of different levels. Typically, at least in Java ecosystem, the client (front-end) never interacts with message broker (or exchange) directly but does it by invoking server-side (back-end) services. Or client may not even be aware that there's messaging solution in place.

With Websockets gaining more and more adoption, and wide support of the text-oriented protocols like STOMP (used to communicate with message broker or exchange) are going to make a difference. Today's post will try to explain how simple it is to expose two very popular JMS implementations, Apache ActiveMQ and JBoss HornetQ, to be available to web front-end (JavaScript) using STOMP over Websockets.

Before digging into the code, one might argue that it's not a good idea to do that. So what's the purpose? The answer really depends:

  • you are developing prototype / proof of concept and need easy way to integrate publish/subscribe or peer-to-peer messaging
  • you don't want / need to build sophisticated architecture and the simplest solution which works is just enough
The scalability, fail-over and a lot of other very important decisions are not taken into consideration here but definitely should be if you are developing robust and resilient architecture.

So let's get started. As always, it's better to start with problem we're trying to solve: we would like to develop simple publish/subscribe solution where web client written in JavaScript will be able to send messages and listen for a specific topic. Whenever any message has been received, client just shows simple alert window. Please note that we need to use modern browser which supports Websockets, such as Google Chrome or Mozilla Firefox.

For both our examples client's code remains the same and so let's start with that. The great starting point is STOMP Over WebSocket article which introduces the stomp.js module and here is our index.html:




Extremely simple code but few details are worth to explain. First, we are looking for Websockets endpoint at ws://localhost:61614/stomp. It's sufficient for local deployment but better to replace localhost with real IP address or host name. Secondly, once connected, client subscribes to the topic (only interested in messages with priority: 9) and publishes the message to this topic immediately after. From client prospective, we are done.

Let's move on to message broker and our first one in list is Apache ActiveMQ. To make the example simple, we will embed Apache ActiveMQ broker into simple Spring application without using configuration XML files. As source code is available on GitHub, I will skip the POM file snippet and just show the code:

package com.example.messaging;

import java.util.Collections;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.hooks.SpringContextHook;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfig {
    @Bean( initMethod = "start", destroyMethod = "stop" )
    public BrokerService broker() throws Exception {
        final BrokerService broker = new BrokerService();    
        broker.addConnector( "ws://localhost:61614" ); 
        broker.setPersistent( false );
        broker.setShutdownHooks( Collections.< Runnable >singletonList( new SpringContextHook() ) );
  
        final ActiveMQTopic topic = new ActiveMQTopic( "jms.topic.test" );
        broker.setDestinations( new ActiveMQDestination[] { topic }  );
  
        final ManagementContext managementContext = new ManagementContext();
        managementContext.setCreateConnector( true );
        broker.setManagementContext( managementContext );
  
        return broker;
    }
}

As we can see, the ActiveMQ broker is configured with ws://localhost:61614 connector which assumes using STOMP protocol. Also, we are creating JMS topic with name jms.topic.test and enabling JMX management instrumentation. And to run it, simple Starter class:

package com.example.messaging;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class Starter  {
    public static void main( String[] args ) {
        ApplicationContext context = new AnnotationConfigApplicationContext( AppConfig.class );
    }
}

Now, having it up and running, let's open index.html file in browser, we should see something like that:

Simple! For curious readers, ActiveMQ uses Jetty 7.6.7.v20120910 for Websockets support and won't work with latest Jetty distributions.

Moving on, with respect to HornetQ the implementations looks a bit different though not very complicated as well. As Starter class remains the same, the only change is the configuration:

package com.example.hornetq;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.JournalType;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
import org.hornetq.jms.server.config.JMSConfiguration;
import org.hornetq.jms.server.config.TopicConfiguration;
import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
import org.hornetq.jms.server.embedded.EmbeddedJMS;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfig {
    @Bean( initMethod = "start", destroyMethod = "stop" )
    public EmbeddedJMS broker() throws Exception {
        final ConfigurationImpl configuration = new ConfigurationImpl();
        configuration.setPersistenceEnabled( false );
        configuration.setJournalType( JournalType.NIO );
        configuration.setJMXManagementEnabled( true );
        configuration.setSecurityEnabled( false );
  
        final Map< String, Object > params = new HashMap<>();
        params.put( TransportConstants.HOST_PROP_NAME, "localhost" );
        params.put( TransportConstants.PROTOCOL_PROP_NAME, "stomp_ws" );
        params.put( TransportConstants.PORT_PROP_NAME, "61614" );
  
        final TransportConfiguration stomp = new TransportConfiguration( NettyAcceptorFactory.class.getName(), params );
        configuration.getAcceptorConfigurations().add( stomp );
        configuration.getConnectorConfigurations().put( "stomp_ws", stomp );
  
        final ConnectionFactoryConfiguration cfConfig = new ConnectionFactoryConfigurationImpl( "cf", true, "/cf" );
        cfConfig.setConnectorNames( Collections.singletonList( "stomp_ws" ) );
  
        final JMSConfiguration jmsConfig = new JMSConfigurationImpl();
        jmsConfig.getConnectionFactoryConfigurations().add( cfConfig );
  
        final TopicConfiguration topicConfig = new TopicConfigurationImpl( "test", "/topic/test" );
        jmsConfig.getTopicConfigurations().add( topicConfig );
  
        final EmbeddedJMS jmsServer = new EmbeddedJMS();
        jmsServer.setConfiguration( configuration );
        jmsServer.setJmsConfiguration( jmsConfig );
  
        return jmsServer;
    }
}

The complete source code is on GitHub. After running Starter class and openning index.html in browser, we should see very similar results:

HornetQ configuration looks a bit more verbose, however there are no additional dependencies involved except brilliant Netty framework.

For my own curiosity, I replaced the ActiveMQ broker with Apollo implementation. Though I succeeded with making it works as expected, I found the API to be very cumbersome, at least in current version 1.6, so I haven't covered it in this post.

All sources are available on GitHub: Apache ActiveMQ example and JBoss HornetQ example

Monday, May 27, 2013

Real-time charts with Play Framework and Scala: extreme productivity on JVM for web

Being a hardcore back-end developer, whenever I am thinking about building web application with some UI on JVM platform, I feel scared. And there are reasons for that: having experience with JSF, Liferay, Grails, ... I don't want to go this road anymore. But if a need comes, is there a choice, really? I found one which I think is awesome: Play Framework.

Built on top of JVM, Play Framework allows to create web applications using Java or Scala with literally no efforts. The valuable and distinguishing differences it provides: static compilation (even for page templates), easy to start with, and concise (more about it here).

To demonstrate how amazing Play Framework is, I would like to share my experience with developing simple web application. Let's assume we have couple of hosts and we would like to watch CPU usage on each one in real-time (on a chart). When one hears "real-time", it may mean different things but in context of our application it means: using WebSockets to push data from server to client. Though Play Framework supports pure Java API, I will use some Scala instead as it makes code very compact and clear.

Let's get started! After downloading Play Framework (the latest version on the moment of writing was 2.1.1), let's create our app by typing

play new play-websockets-example
and selecting Scala as a primary language. No wonders here: it's a pretty standard way nowadays, right?

Having our application ready, next step would be to create some starting web page. Play Framework uses own type safe template engine based on Scala, it has a couple of extremely simple rules and is very easy to get started with. Here is an example of views/dashboard.scala.html:

@(title: String, hosts: List[Host])

<!DOCTYPE html>
<html>
  <head>
    <title>@title</title>
    <link rel="stylesheet" media="screen" href="@routes.Assets.at("stylesheets/main.css")">
    <link rel="shortcut icon" type="image/png" href="@routes.Assets.at("images/favicon.png")">
    <script src="@routes.Assets.at("javascripts/jquery-1.9.0.min.js")" type="text/javascript">
    <script src="@routes.Assets.at("javascripts/highcharts.js")" type="text/javascript">
  </head>
    
  <body>
    <div id="hosts">
      <ul class="hosts">
        @hosts.map { host =>
        <li>               
          <a href="#" onclick="javascript:show( '@host.id' )"><b>@host.name</b></a>
        </li>
        }
      </ul>
    </div>
  
    <div id="content">
    </div>
  </body>
</html>

<script type="text/javascript">
function show( hostid ) {
  $("#content").load( "/host/" + hostid,
    function( response, status, xhr ) {
      if (status == "error") {
        $("#content").html( "Sorry but there was an error:" + xhr.status + " " + xhr.statusText);
      }
    }
  )
}
</script>

Aside from coupe of interesting constructs (which are very well described here), it looks pretty like regular HTML with a bit of JavaScript. The result of this web page is a simple list of hosts in the browser. Whenever user clicks on a particular host, another view will be fetched from the server (using old buddy AJAX) and displayed on right side from the host. Here is the second (and the last) template, views/host.scala.html:

@(host: Host)( implicit request: RequestHeader )

<div id="content">
  <div id="chart">
  <script type="text/javascript">
    var charts = []   
    charts[ '@host.id' ] = new Highcharts.Chart({                 
      chart: {
        renderTo: 'chart',
        defaultSeriesType: 'spline'            
      },           
      xAxis: {
        type: 'datetime'
      },   
      series: [{
        name: "CPU",
        data: []
      }]
    }); 
  </script>
</div>

<script type="text/javascript">
var socket = new WebSocket("@routes.Application.stats( host.id ).webSocketURL()")
socket.onmessage = function( event ) { 
  var datapoint = jQuery.parseJSON( event.data );
  var chart = charts[ '@host.id' ]
  
  chart.series[ 0 ].addPoint({
    x: datapoint.cpu.timestamp,
    y: datapoint.cpu.load
  }, true, chart.series[ 0 ].data.length >= 50 );
}
</script>

It's looks rather as a fragment, not a complete HTML page, which has only a chart and opens the WebSockets connection with a listener. With an enormous help of Highcharts and jQuery, JavaScript programming hasn't ever been so easy for back-end developers as it's now. At this moment, the UI part is completely done. Let's move on to back-end side.

Firstly, let's define the routing table which includes only three URLs and by default is located at conf/routes:

GET     /                           controllers.Application.index
GET     /host/:id                   controllers.Application.host( id: String )
GET     /stats/:id                  controllers.Application.stats( id: String )

Having views and routes defined, it's time to fill up the last and most interesting part, the controllers which glue all parts together (actually, only one controller, controllers/Application.scala). Here is a snippet which maps index action to the view templated by views/dashboard.scala.html, it's as easy as that:

def index = Action {
  Ok( views.html.dashboard( "Dashboard", Hosts.hosts() ) )
}

The interpretation of this action may sound like that: return successful response code and render template views/dashboard.scala.html with two parameters, title and hosts, as response body. The action to handle /host/:id looks much the same:

def host( id: String ) = Action { implicit request =>
  Hosts.hosts.find( _.id == id ) match {
    case Some( host ) => Ok( views.html.host( host ) )
    case None => NoContent
  }    
}

And here is a Hosts object defined in models/Hosts.scala. For simplicity, the list of hosts is hard-coded:

package models

case class Host( id: String, name: String )

object Hosts {  
  def hosts(): List[ Host ] = {
    return List( new Host( "h1", "Host 1" ), new Host( "h2", "Host 2" ) )
  } 
}

The boring part is over, let's move on to the last but not least implementation: server push of host's CPU statistics using WebSockets. As you can see, the /stats/:id URL is already mapped to controller action so let's take a look on its implementation:

def stats( id: String ) = WebSocket.async[JsValue] { request =>
  Hosts.hosts.find( _.id == id ) match {
    case Some( host ) => Statistics.attach( host )
    case None => {
      val enumerator = Enumerator
        .generateM[JsValue]( Promise.timeout( None, 1.second ) )
        .andThen( Enumerator.eof )
      Promise.pure( ( Iteratee.ignore[JsValue], enumerator ) )
    }
  }
}

Not too much code here but in case you are curious about WebSockets in Play Framework please follow this link. This couple of lines may look a bit weird at first but once you read the documentation and understand basic design principles behind Play Framework, it will look much more familiar and friendly. The Statistics object is the one who does the real job, let's take a look on the code:

package models

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

import akka.actor.ActorRef
import akka.actor.Props
import akka.pattern.ask
import akka.util.Timeout
import play.api.Play.current
import play.api.libs.concurrent.Akka
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.iteratee.Enumerator
import play.api.libs.iteratee.Iteratee
import play.api.libs.json.JsValue

case class Refresh()
case class Connect( host: Host )
case class Connected( enumerator: Enumerator[ JsValue ] )

object Statistics {
  implicit val timeout = Timeout( 5 second )
  var actors: Map[ String, ActorRef ] = Map()
  
  def actor( id: String ) = actors.synchronized {
    actors.find( _._1 == id ).map( _._2 ) match {
      case Some( actor ) => actor      
      case None => {
        val actor = Akka.system.actorOf( Props( new StatisticsActor(id) ), name = s"host-$id" )   
        Akka.system.scheduler.schedule( 0.seconds, 3.second, actor, Refresh )
        actors += ( id -> actor )
        actor
      }
    }
  }
 
  def attach( host: Host ): Future[ ( Iteratee[ JsValue, _ ], Enumerator[ JsValue ] ) ] = {
    ( actor( host.id ) ? Connect( host ) ).map {      
      case Connected( enumerator ) => ( Iteratee.ignore[JsValue], enumerator )
    }
  }
}

As always, thanks to Scala conciseness, not too much code but a lot of things are going on. As we may have hundreds of hosts, it would be reasonable to dedicate to each host own worker (not a thread) or, more precisely, own actor. For that, we will use another amazing library called Akka. The code snippet above just creates an actor for the host or uses existing one from the registry of the already created actors. Please note that the implementation is quite simplified and leaves off important details. The thoughts in right direction would be using supervisors and other advanced concepts instead of synchronized block. Also worth mentioning that we would like to make our actor a scheduled task: we ask actor system to send the actor a message Refresh every 3 seconds. That means that the charts will be updated with new values every three seconds as well.

So, when actor for a host is created, we send him a message Connect notifying that a new connection is being established. When response message Connected is received, we return from the method and at this point connection over WebSockets is about to be established. Please note that we intentionally ignore any input from the client by using Iteratee.ignore[JsValue].

And here is the StatisticsActor implementation:

package models

import java.util.Date

import scala.util.Random

import akka.actor.Actor
import play.api.libs.iteratee.Concurrent
import play.api.libs.json.JsNumber
import play.api.libs.json.JsObject
import play.api.libs.json.JsString
import play.api.libs.json.JsValue

class StatisticsActor( hostid: String ) extends Actor {
  val ( enumerator, channel ) = Concurrent.broadcast[JsValue]
  
  def receive = {
    case Connect( host ) => sender ! Connected( enumerator )       
    case Refresh => broadcast( new Date().getTime(), hostid )
  }
  
  def broadcast( timestamp: Long, id: String ) {
    val msg = JsObject( 
      Seq( 
        "id" -> JsString( id ),
        "cpu" -> JsObject( 
          Seq( 
            ( "timestamp" -> JsNumber( timestamp ) ), 
            ( "load" -> JsNumber( Random.nextInt( 100 ) ) ) 
          ) 
        )
      )
    )
     
    channel.push( msg )
  }
}

The CPU statistics is randomly generated and the actor just broadcasts it every 3 seconds as simple JSON object. On the client side, the JavaScript code parses this JSON and updates the chart. Here is how it looks like for two hosts, Host 1 and Host 2 in Mozilla Firefox:

To finish up, I am personally very excited with what I've done so far with Play Framework. It took just couple of hours to get started and another couple of hours to make things work as expected. The errors reporting and feedback cycle from running application are absolutely terrific, thanks a lot to Play Framework guys and the community around it. There are still a lot of things to learn for me but it worth doing it.

Please find the complete source code on GitHub.