Friday, August 30, 2013

Spring AMQP and RabbitMQ High Availability (HA)

I recently finished a project that uses RabbitMQ as the message broker.  Setting up a RabbitMQ cluster with High Availability (HA) is fairly easy, and the RabbitMQ documentation covers the cluster setup well.  I have also documented here how to set up a 2-node cluster on a single Mac.  In this post, I would like to show how to use spring-amqp to connect to a RabbitMQ cluster.  My cluster runs two nodes: localhost:5673 and localhost:5672.

spring-amqp is easy to use.  I used Maven to manage the dependencies:

<dependency>
    <groupId>com.fasterxml.jackson.jaxrs</groupId>
    <artifactId>jackson-jaxrs-json-provider</artifactId>
    <version>2.2.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.2.0.RELEASE</version>
</dependency>

jackson-jaxrs-json-provider is used to serialize Java objects to JSON and to deserialize JSON back to Java objects.

When creating the ConnectionFactory, set the addresses property instead of the host and port:

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setAddresses("localhost:5673,localhost:5672");
    return connectionFactory;
}

The addresses are a comma-separated list of host:port pairs, one for each node in the cluster.
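To make the address format concrete, here is a minimal plain-Java sketch of how such a string breaks down into individual nodes. It is an illustration only, not Spring AMQP's own parsing code: the `HostPort` and `AddressListSketch` classes are hypothetical names, and falling back to port 5672 when an entry has no port is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value class for one cluster node (illustration only).
final class HostPort {
    final String host;
    final int port;

    HostPort(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public String toString() {
        return host + ":" + port;
    }
}

// Hypothetical helper; not part of Spring AMQP or the RabbitMQ client.
final class AddressListSketch {
    // Assumption of this sketch: use the standard AMQP port when none is given.
    static final int DEFAULT_PORT = 5672;

    // Split "host1:port1,host2:port2" into an ordered list of host/port pairs.
    static List<HostPort> parse(String addresses) {
        List<HostPort> result = new ArrayList<>();
        for (String entry : addresses.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                result.add(new HostPort(trimmed, DEFAULT_PORT));
            } else {
                String host = trimmed.substring(0, colon);
                int port = Integer.parseInt(trimmed.substring(colon + 1));
                result.add(new HostPort(host, port));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Print each node of the two-node cluster used in this post.
        for (HostPort node : parse("localhost:5673,localhost:5672")) {
            System.out.println(node);
        }
    }
}
```

Each entry names one broker node, so the string passed to setAddresses above describes both nodes of the cluster.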

For the producer, we use rabbitTemplate to send messages:

public void sendMessage() {
    Employee employee = new Employee();
    employee.setId(counter.incrementAndGet());
    employee.setName("name-" + employee.getId());
    rabbitTemplate.convertAndSend(employee);
}

For the consumer, a MessageListenerContainer is created to consume messages asynchronously:

@Bean
public SimpleMessageListenerContainer listenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueNames(this.employeeQueueName);
    container.setMessageListener(new MessageListenerAdapter(new MessageHandler(), jsonMessageConverter()));
    return container;
}


The MessageHandler code is as follows:

public void handleMessage(Employee employee) {
    LOG.info("handle employee with id {} and name {}", employee.getId(), employee.getName());
}

The class can be named anything you like, but the method must be named handleMessage and have the correct signature (here it takes an Employee, to match the producer).  If you want to use a different method name, you have to call:

          MessageListenerAdapter.setDefaultListenerMethod
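
The method name matters because the adapter finds the listener method on the delegate by name at runtime. The plain-Java sketch below imitates that idea with reflection. It is a simplified illustration, not the adapter's actual implementation (which also handles message conversion and more flexible method resolution), and `DemoEmployee`, `DemoHandler`, and `ReflectiveDispatchSketch` are hypothetical names.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for the post's Employee class.
final class DemoEmployee {
    final long id;
    final String name;

    DemoEmployee(long id, String name) {
        this.id = id;
        this.name = name;
    }
}

// Hypothetical delegate: the class name is arbitrary, only the method name matters.
final class DemoHandler {
    String lastSeen = "";

    public void handleMessage(DemoEmployee employee) {
        lastSeen = employee.id + ":" + employee.name;
    }
}

// Simplified imitation of name-based dispatch; not Spring AMQP code.
final class ReflectiveDispatchSketch {
    // Look up a public method by name and exact parameter type, then invoke it.
    // Returns false when the delegate has no matching method.
    static boolean dispatch(Object delegate, String methodName, Object payload) {
        try {
            Method method = delegate.getClass().getMethod(methodName, payload.getClass());
            method.invoke(delegate, payload);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("listener method failed", e);
        }
    }

    public static void main(String[] args) {
        DemoHandler handler = new DemoHandler();
        // The name matches the handler method, so the call goes through.
        System.out.println(dispatch(handler, "handleMessage", new DemoEmployee(1, "name-1")));
        // No method with this name exists on the handler, so nothing is invoked.
        System.out.println(dispatch(handler, "onEmployee", new DemoEmployee(2, "name-2")));
    }
}
```

In this sketch a mismatched name simply means the handler is never invoked, which is why a renamed handler method has to be registered with the adapter.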

The source code can be downloaded from GitHub.