It is pretty easy to use spring-amqp. I used maven to manage the dependencies:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<dependency> | |
<groupId>com.fasterxml.jackson.jaxrs</groupId> | |
<artifactId>jackson-jaxrs-json-provider</artifactId> | |
<version>2.2.1</version> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.amqp</groupId> | |
<artifactId>spring-rabbit</artifactId> | |
<version>1.2.0.RELEASE</version> | |
</dependency> |
jackson-jaxrs-json-provider is used to serialize java object to json, and deserialize json back to java object.
When creating ConnectionFactory, the addresses should be used instead of the host and port:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Bean | |
public ConnectionFactory connectionFactory() { | |
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); | |
connectionFactory.setAddresses("localhost:5673,localhost:5672"); | |
return connectionFactory; | |
} |
The addresses are the comma separated host:port pairs which consist of the cluster.
For producer, we use rabbitTemplate to send messages:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void sendMessage() { | |
Employee employee = new Employee(); | |
employee.setId(counter.incrementAndGet()); | |
employee.setName("name-" + employee.getId()); | |
rabbitTemplate.convertAndSend(employee); | |
} |
For consumer, a MessageListenerContainer is created to consume message asynchronously:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Bean | |
public SimpleMessageListenerContainer listenerContainer() { | |
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); | |
container.setConnectionFactory(connectionFactory()); | |
container.setQueueNames(this.employeeQueueName); | |
container.setMessageListener(new MessageListenerAdapter(new MessageHandler(), jsonMessageConverter())); | |
return container; | |
} |
The MessageHandler code is as follows:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void handleMessage(Employee employee) { | |
LOG.info("handle employee with id {} and name {}", employee.getId(), employee.getName()); | |
} |
This class can be called anything you like, but the method must be called handleMessaege and with the correct signature (here it is Employee to match producer). If you want to change the method name, you have to call:
MessageListenerAdapter.setDefaultListenerMethod
The source code can be download from github.