Continuing to explore the powerful feature set of Redis, one feature worth mentioning is its out-of-the-box support for pub/sub messaging.
Pub/sub messaging is an essential part of many software architectures. Some systems demand that a messaging solution provide high performance, scalability, queue persistence and durability, fail-over support, transactions, and many other nice-to-have features, which in the Java world almost always leads to one of the JMS implementation providers. In my previous projects I have actively used Apache ActiveMQ (now moving towards Apache ActiveMQ Apollo). Though it's a great implementation, sometimes I just needed simple queuing support, and Apache ActiveMQ looked overcomplicated for that.
Alternatives? Please welcome Redis pub/sub! If you are already using Redis as a key/value store, a few additional lines of configuration will bring pub/sub messaging to your application in no time.
The Spring Data Redis project abstracts the Redis pub/sub API very well and provides a model familiar to everyone who uses Spring's JMS integration.
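As always, let's start with the POM configuration file. It's pretty small and simple: it includes the necessary Spring dependencies, Spring Data Redis, and Jedis, a great Java client for Redis.

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.example.spring</groupId>
        <artifactId>redis</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <spring.version>3.1.1.RELEASE</spring.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.springframework.data</groupId>
                <artifactId>spring-data-redis</artifactId>
                <version>1.0.1.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>cglib</groupId>
                <artifactId>cglib-nodep</artifactId>
                <version>2.2</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.16</version>
            </dependency>
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.0.0</version>
                <type>jar</type>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
                <version>${spring.version}</version>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>2.3.2</version>
                    <configuration>
                        <source>1.6</source>
                        <target>1.6</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>

Moving on to configuring the Spring context, let's understand what we need in order for a publisher to publish messages and for a consumer to consume them. Knowing the respective Spring abstractions for JMS helps a lot here.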
- we need a connection factory -> JedisConnectionFactory
- we need a template for the publisher to publish messages -> RedisTemplate
- we need a message listener container for the consumer to consume messages -> RedisMessageListenerContainer

    package com.example.redis.config;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.listener.ChannelTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    import org.springframework.data.redis.serializer.GenericToStringSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    import org.springframework.scheduling.annotation.EnableScheduling;

    import com.example.redis.IRedisPublisher;
    import com.example.redis.impl.RedisMessageListener;
    import com.example.redis.impl.RedisPublisherImpl;

    @Configuration
    @EnableScheduling
    public class AppConfig {
        // Connection factory with default settings
        @Bean
        JedisConnectionFactory jedisConnectionFactory() {
            return new JedisConnectionFactory();
        }

        // Template used by the publisher to send messages
        @Bean
        RedisTemplate< String, Object > redisTemplate() {
            final RedisTemplate< String, Object > template = new RedisTemplate< String, Object >();
            template.setConnectionFactory( jedisConnectionFactory() );
            template.setKeySerializer( new StringRedisSerializer() );
            template.setHashValueSerializer( new GenericToStringSerializer< Object >( Object.class ) );
            template.setValueSerializer( new GenericToStringSerializer< Object >( Object.class ) );
            return template;
        }

        @Bean
        MessageListenerAdapter messageListener() {
            return new MessageListenerAdapter( new RedisMessageListener() );
        }

        // Container that subscribes the listener to the topic
        @Bean
        RedisMessageListenerContainer redisContainer() {
            final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory( jedisConnectionFactory() );
            container.addMessageListener( messageListener(), topic() );
            return container;
        }

        @Bean
        IRedisPublisher redisPublisher() {
            return new RedisPublisherImpl( redisTemplate(), topic() );
        }

        // Channel shared by the publisher and the listener
        @Bean
        ChannelTopic topic() {
            return new ChannelTopic( "pubsub:queue" );
        }
    }

Very easy and straightforward. The @EnableScheduling annotation is not required for pub/sub itself; it is needed only by our publisher implementation, which publishes a string message every 100 ms.

    package com.example.redis.impl;

    import java.util.concurrent.atomic.AtomicLong;

    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.listener.ChannelTopic;
    import org.springframework.scheduling.annotation.Scheduled;

    import com.example.redis.IRedisPublisher;

    public class RedisPublisherImpl implements IRedisPublisher {
        private final RedisTemplate< String, Object > template;
        private final ChannelTopic topic;
        private final AtomicLong counter = new AtomicLong( 0 );

        public RedisPublisherImpl( final RedisTemplate< String, Object > template,
                final ChannelTopic topic ) {
            this.template = template;
            this.topic = topic;
        }

        // Publishes a numbered message 100 ms after the previous run completes
        @Scheduled( fixedDelay = 100 )
        public void publish() {
            template.convertAndSend( topic.getTopic(), "Message " + counter.incrementAndGet() +
                ", " + Thread.currentThread().getName() );
        }
    }

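For readers unfamiliar with `fixedDelay`, here is a minimal plain-Java sketch of the same timing semantics, using only `java.util.concurrent` rather than Spring. The class name `FixedDelayPublisherSketch` and the `publishFor` helper are made up for illustration, and an in-memory list stands in for the Redis channel. It is an approximation of the behaviour, not a description of how Spring implements it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class FixedDelayPublisherSketch {

    /** Runs a counter-based "publisher" with a fixed delay and returns what it produced. */
    public static List<String> publishFor(long delayMillis, long runMillis)
            throws InterruptedException {
        final List<String> published = new CopyOnWriteArrayList<String>();
        final AtomicLong counter = new AtomicLong(0);
        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // Fixed delay: the next run starts delayMillis after the previous run has finished.
        scheduler.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                // Stand-in for template.convertAndSend(...): we only record the message.
                published.add("Message " + counter.incrementAndGet()
                    + ", " + Thread.currentThread().getName());
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);

        Thread.sleep(runMillis);
        scheduler.shutdownNow();
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
        return published;
    }

    public static void main(String[] args) throws InterruptedException {
        for (String message : publishFor(100, 450)) {
            System.out.println(message);
        }
    }
}
```

The default thread factory used by `Executors` names its threads `pool-N-thread-M`, which is likely where the thread name in the published messages comes from, assuming Spring falls back to a single-threaded scheduler when none is configured.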
And finally, our message listener implementation (which just prints each message to the console).

    package com.example.redis.impl;

    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;

    public class RedisMessageListener implements MessageListener {
        @Override
        public void onMessage( final Message message, final byte[] pattern ) {
            System.out.println( "Message received: " + message.toString() );
        }
    }

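The listener above relies on `message.toString()`. Since `Message` also exposes the raw bytes through `getChannel()` and `getBody()`, a listener could decode them explicitly instead. The helper below is a small sketch of that idea in plain Java; the class `MessageDecodingSketch` and its `describe` method are hypothetical, and it assumes the payload was serialized as UTF-8 text.

```java
import java.nio.charset.StandardCharsets;

public class MessageDecodingSketch {

    /** Decodes raw channel and body bytes into a readable line, assuming UTF-8 text. */
    public static String describe(byte[] channel, byte[] body) {
        String channelName = new String(channel, StandardCharsets.UTF_8);
        String payload = new String(body, StandardCharsets.UTF_8);
        return "Message received on " + channelName + ": " + payload;
    }

    public static void main(String[] args) {
        // Sample bytes standing in for message.getChannel() and message.getBody().
        byte[] channel = "pubsub:queue".getBytes(StandardCharsets.UTF_8);
        byte[] body = "Message 1, pool-1-thread-1".getBytes(StandardCharsets.UTF_8);
        System.out.println(describe(channel, body));
    }
}
```

Inside `onMessage` it could be called as `MessageDecodingSketch.describe( message.getChannel(), message.getBody() )`. Naming the charset avoids depending on the platform default encoding.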
Awesome, just two small classes, one configuration to wire things together and we have full pub/sub messaging support in our application! Let's run the application as standalone ...

    package com.example.redis;

    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;

    import com.example.redis.config.AppConfig;

    public class RedisPubSubStarter {
        public static void main(String[] args) {
            new AnnotationConfigApplicationContext( AppConfig.class );
        }
    }

... and see the following output in the console:

    ...
    Message received: Message 1, pool-1-thread-1
    Message received: Message 2, pool-1-thread-1
    Message received: Message 3, pool-1-thread-1
    Message received: Message 4, pool-1-thread-1
    Message received: Message 5, pool-1-thread-1
    Message received: Message 6, pool-1-thread-1
    Message received: Message 7, pool-1-thread-1
    Message received: Message 8, pool-1-thread-1
    Message received: Message 9, pool-1-thread-1
    Message received: Message 10, pool-1-thread-1
    Message received: Message 11, pool-1-thread-1
    Message received: Message 12, pool-1-thread-1
    Message received: Message 13, pool-1-thread-1
    Message received: Message 14, pool-1-thread-1
    Message received: Message 15, pool-1-thread-1
    Message received: Message 16, pool-1-thread-1
    ...

Great! There is much more you can do with Redis pub/sub; excellent documentation is available on the official Redis web site.
5 comments:
Hi Andriy,
Thanks for the blog, it's quite helpful. I set up your example in my dev environment and got it running successfully. One thing I couldn't make out, though, is where the number of messages published by the publisher is set. Could you shed some light? Maybe it is a silly thing and I am not able to see the obvious.
Thank you
Neelesh
Sorry for the stupid question. I see that the example runs infinitely. When I ran it the first time, I interrupted it but forgot about it. When I looked back at the console output, I misunderstood it that it exited on its own after publishing and consuming about 72 messages.
Thank you
Neelesh
Hi Neelesh,
Thank you for your comments. I am glad you have figured out everything yourself. Please don't hesitate to ask questions.
Thank you!
Best Regards,
Andriy Redko
Hi,
If I encounter an exception while processing a message, I want to reprocess the same message.
Is there any way to re-queue the message in case of a failure?
Or could you suggest another approach to handle this scenario?
Thanks
Anurag
Hi Anurag,
Thanks a lot for your comment. Redis is a pretty simple pub/sub solution and does not offer an out-of-the-box option for re-queuing messages in case of exceptions. However, it could be supported on the application side by republishing the message to the topic(s) when an exception occurs.
Thanks!
Best Regards,
Andriy Redko
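As a rough illustration of the application-side approach described in the reply above, here is one possible sketch in plain Java (Java 8 or later). Everything in it is hypothetical: the `Publisher` interface stands in for a call such as `RedisTemplate.convertAndSend`, the `Handler` interface stands in for the processing logic, and the `attempt|payload` message format is an invented convention for capping the number of retries. It is not part of the original example.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class RepublishOnFailureSketch {

    /** Hypothetical stand-in for a publish call such as RedisTemplate.convertAndSend(channel, message). */
    public interface Publisher {
        void publish(String message);
    }

    /** Hypothetical application handler that may fail while processing a payload. */
    public interface Handler {
        void handle(String payload) throws Exception;
    }

    private static final String SEPARATOR = "|";

    private final Publisher publisher;
    private final Handler handler;
    private final int maxAttempts;

    public RepublishOnFailureSketch(Publisher publisher, Handler handler, int maxAttempts) {
        this.publisher = publisher;
        this.handler = handler;
        this.maxAttempts = maxAttempts;
    }

    /** Encodes a payload as "1|payload" for its first publication. */
    public static String firstAttempt(String payload) {
        return "1" + SEPARATOR + payload;
    }

    /** Handles a message of the form "attempt|payload", republishing it if the handler fails. */
    public void onMessage(String message) {
        int split = message.indexOf(SEPARATOR);
        int attempt = Integer.parseInt(message.substring(0, split));
        String payload = message.substring(split + 1);
        try {
            handler.handle(payload);
        } catch (Exception e) {
            if (attempt < maxAttempts) {
                // Republish with an incremented attempt counter so retries are bounded.
                publisher.publish((attempt + 1) + SEPARATOR + payload);
            } else {
                System.err.println("Giving up on '" + payload + "' after " + attempt + " attempts");
            }
        }
    }

    public static void main(String[] args) {
        // In-memory queue standing in for the Redis channel.
        final Queue<String> channel = new ArrayDeque<String>();
        final int[] calls = {0};
        RepublishOnFailureSketch listener = new RepublishOnFailureSketch(
            channel::add,
            payload -> {
                // Simulated handler that fails on its first two invocations.
                if (++calls[0] < 3) {
                    throw new IllegalStateException("simulated failure");
                }
            },
            5);

        channel.add(firstAttempt("hello"));
        while (!channel.isEmpty()) {
            listener.onMessage(channel.poll());
        }
        System.out.println("Handler was invoked " + calls[0] + " time(s)");
    }
}
```

Two caveats apply to this approach: Redis delivers a published message to every current subscriber of the channel, so a republished message reaches all of them again, and messages are not persisted, so a retry is lost if no subscriber is connected at that moment.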