Saturday, June 29, 2013

Easy Messaging with STOMP over WebSockets using ActiveMQ and HornetQ

Messaging is an extremely powerful tool for building distributed software systems of different levels. Typically, at least in Java ecosystem, the client (front-end) never interacts with message broker (or exchange) directly but does it by invoking server-side (back-end) services. Or client may not even be aware that there's messaging solution in place.

With Websockets gaining more and more adoption, and wide support of the text-oriented protocols like STOMP (used to communicate with message broker or exchange) are going to make a difference. Today's post will try to explain how simple it is to expose two very popular JMS implementations, Apache ActiveMQ and JBoss HornetQ, to be available to web front-end (JavaScript) using STOMP over Websockets.

Before digging into the code, one might argue that it's not a good idea to do that. So what's the purpose? The answer really depends:

  • you are developing prototype / proof of concept and need easy way to integrate publish/subscribe or peer-to-peer messaging
  • you don't want / need to build sophisticated architecture and the simplest solution which works is just enough
The scalability, fail-over and a lot of other very important decisions are not taken into consideration here but definitely should be if you are developing robust and resilient architecture.

So let's get started. As always, it's better to start with problem we're trying to solve: we would like to develop simple publish/subscribe solution where web client written in JavaScript will be able to send messages and listen for a specific topic. Whenever any message has been received, client just shows simple alert window. Please note that we need to use modern browser which supports Websockets, such as Google Chrome or Mozilla Firefox.

For both our examples client's code remains the same and so let's start with that. The great starting point is STOMP Over WebSocket article which introduces the stomp.js module and here is our index.html:




Extremely simple code but few details are worth to explain. First, we are looking for Websockets endpoint at ws://localhost:61614/stomp. It's sufficient for local deployment but better to replace localhost with real IP address or host name. Secondly, once connected, client subscribes to the topic (only interested in messages with priority: 9) and publishes the message to this topic immediately after. From client prospective, we are done.

Let's move on to message broker and our first one in list is Apache ActiveMQ. To make the example simple, we will embed Apache ActiveMQ broker into simple Spring application without using configuration XML files. As source code is available on GitHub, I will skip the POM file snippet and just show the code:

package com.example.messaging;

import java.util.Collections;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.hooks.SpringContextHook;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfig {
    @Bean( initMethod = "start", destroyMethod = "stop" )
    public BrokerService broker() throws Exception {
        final BrokerService broker = new BrokerService();    
        broker.addConnector( "ws://localhost:61614" ); 
        broker.setPersistent( false );
        broker.setShutdownHooks( Collections.< Runnable >singletonList( new SpringContextHook() ) );
  
        final ActiveMQTopic topic = new ActiveMQTopic( "jms.topic.test" );
        broker.setDestinations( new ActiveMQDestination[] { topic }  );
  
        final ManagementContext managementContext = new ManagementContext();
        managementContext.setCreateConnector( true );
        broker.setManagementContext( managementContext );
  
        return broker;
    }
}

As we can see, the ActiveMQ broker is configured with ws://localhost:61614 connector which assumes using STOMP protocol. Also, we are creating JMS topic with name jms.topic.test and enabling JMX management instrumentation. And to run it, simple Starter class:

package com.example.messaging;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class Starter  {
    public static void main( String[] args ) {
        ApplicationContext context = new AnnotationConfigApplicationContext( AppConfig.class );
    }
}

Now, having it up and running, let's open index.html file in browser, we should see something like that:

Simple! For curious readers, ActiveMQ uses Jetty 7.6.7.v20120910 for Websockets support and won't work with latest Jetty distributions.

Moving on, with respect to HornetQ the implementations looks a bit different though not very complicated as well. As Starter class remains the same, the only change is the configuration:

package com.example.hornetq;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.JournalType;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
import org.hornetq.jms.server.config.JMSConfiguration;
import org.hornetq.jms.server.config.TopicConfiguration;
import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
import org.hornetq.jms.server.embedded.EmbeddedJMS;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfig {
    @Bean( initMethod = "start", destroyMethod = "stop" )
    public EmbeddedJMS broker() throws Exception {
        final ConfigurationImpl configuration = new ConfigurationImpl();
        configuration.setPersistenceEnabled( false );
        configuration.setJournalType( JournalType.NIO );
        configuration.setJMXManagementEnabled( true );
        configuration.setSecurityEnabled( false );
  
        final Map< String, Object > params = new HashMap<>();
        params.put( TransportConstants.HOST_PROP_NAME, "localhost" );
        params.put( TransportConstants.PROTOCOL_PROP_NAME, "stomp_ws" );
        params.put( TransportConstants.PORT_PROP_NAME, "61614" );
  
        final TransportConfiguration stomp = new TransportConfiguration( NettyAcceptorFactory.class.getName(), params );
        configuration.getAcceptorConfigurations().add( stomp );
        configuration.getConnectorConfigurations().put( "stomp_ws", stomp );
  
        final ConnectionFactoryConfiguration cfConfig = new ConnectionFactoryConfigurationImpl( "cf", true, "/cf" );
        cfConfig.setConnectorNames( Collections.singletonList( "stomp_ws" ) );
  
        final JMSConfiguration jmsConfig = new JMSConfigurationImpl();
        jmsConfig.getConnectionFactoryConfigurations().add( cfConfig );
  
        final TopicConfiguration topicConfig = new TopicConfigurationImpl( "test", "/topic/test" );
        jmsConfig.getTopicConfigurations().add( topicConfig );
  
        final EmbeddedJMS jmsServer = new EmbeddedJMS();
        jmsServer.setConfiguration( configuration );
        jmsServer.setJmsConfiguration( jmsConfig );
  
        return jmsServer;
    }
}

The complete source code is on GitHub. After running Starter class and openning index.html in browser, we should see very similar results:

HornetQ configuration looks a bit more verbose, however there are no additional dependencies involved except brilliant Netty framework.

For my own curiosity, I replaced the ActiveMQ broker with Apollo implementation. Though I succeeded with making it works as expected, I found the API to be very cumbersome, at least in current version 1.6, so I haven't covered it in this post.

All sources are available on GitHub: Apache ActiveMQ example and JBoss HornetQ example