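Spring AMQP is used to configure queues, exchanges, routing, and related features. With minimal configuration and very little code, an application can access and operate the MQ. EP 21 adds the required dependency and an example, and verifies the result with a JUnit 5 unit test.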
Introduction
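RabbitMQ is a message queue service that provides an asynchronous communication mechanism and handles the network transport. When the network connection becomes unavailable, messages are held in the queue, and the requests are processed once the network recovers. It is commonly used to decouple applications and to exchange messages asynchronously, in support of high-performance, highly available, scalable, and eventually consistent architectures.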
Spring AMQP RabbitMQ
File Structure
./
   +- build.gradle
   +- src
       +- main
           +- resources
           |   +- application.properties
           +- java
           |   +- org
           |       +- ruoxue
           |           +- spring_boot_168
           |               +- config
           |                   +- RabbitMQConfig.java
Gradle
build.gradle
Add Spring Boot Starter AMQP.
After editing, right-click the project and choose Gradle -> Refresh Gradle Project.
buildscript {
	group 'org.ruoxue.spring-boot-168'
	version = '0.0.1-SNAPSHOT'
	ext {
		springBootVersion = '2.1.7.RELEASE'
	}
}

dependencies {
	implementation "org.springframework.boot:spring-boot-starter-amqp:${springBootVersion}"
}
Configuration
application.properties
Add the RabbitMQ settings.
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=admin
spring.rabbitmq.password=1111
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=100
spring.rabbitmq.listener.direct.acknowledge-mode=manual
RabbitMQConfig.java
Create the file.
package org.ruoxue.spring_boot_168.config;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import lombok.extern.slf4j.Slf4j;

@EnableRabbit
@Configuration
@Slf4j
public class RabbitMQConfig {

	// Delegates by content type and falls back to JSON conversion.
	@Bean(name = "messageConverter")
	public MessageConverter messageConverter() {
		return new ContentTypeDelegatingMessageConverter(new Jackson2JsonMessageConverter());
	}

	@Bean
	public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
		return new RabbitAdmin(connectionFactory);
	}

	@Bean
	public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
			@Qualifier("messageConverter") MessageConverter messageConverter) {
		RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
		rabbitTemplate.setMessageConverter(messageConverter);
		// mandatory = true lets the return callback fire for unroutable messages
		rabbitTemplate.setMandatory(true);
		rabbitTemplate.setConfirmCallback(confirmCallback());
		rabbitTemplate.setReturnCallback(returnCallback());
		return rabbitTemplate;
	}

	@Bean(name = "confirmCallback")
	public RabbitTemplate.ConfirmCallback confirmCallback() {
		RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
			@Override
			public void confirm(CorrelationData correlationData, boolean ack, String cause) {
				// ack is true when the broker confirmed the message; cause describes a nack
				// do something
			}
		};
		return confirmCallback;
	}

	@Bean(name = "returnCallback")
	public RabbitTemplate.ReturnCallback returnCallback() {
		RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
			@Override
			public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
					String exchange, String routingKey) {
				// called when a mandatory message could not be routed to a queue
				// do something
			}
		};
		return returnCallback;
	}

	@Bean
	public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
			@Qualifier("messageConverter") MessageConverter messageConverter) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
		factory.setConnectionFactory(connectionFactory);
		factory.setMessageConverter(messageConverter);
		// pod.name is read as a JVM system property (-Dpod.name=...); it is null when not set
		String podName = System.getProperty("pod.name");
		// consumer tag format: <podName>.<queueName>
		factory.setConsumerTagStrategy(q -> podName + "." + q);
		return factory;
	}
}
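RabbitMQConfig builds each consumer tag from the pod.name system property and the queue name. Because System.getProperty returns null when the property is not set, the tag would then start with the literal text "null". The following is a minimal sketch of a null-safe variant in plain Java; the ConsumerTags class, the strategyFor method, and the "local" fallback value are hypothetical names introduced for illustration and are not part of the original project.

```java
import java.util.function.UnaryOperator;

// Hypothetical helper, not part of the original project.
final class ConsumerTags {

    private ConsumerTags() {
    }

    // Returns a function that maps a queue name to "<podName>.<queueName>".
    // When podName is null or blank, fallbackName is used as the prefix instead.
    static UnaryOperator<String> strategyFor(String podName, String fallbackName) {
        String prefix = (podName == null || podName.trim().isEmpty()) ? fallbackName : podName;
        return queue -> prefix + "." + queue;
    }

    public static void main(String[] args) {
        // Same lookup as in RabbitMQConfig; "local" is an assumed fallback value.
        String podName = System.getProperty("pod.name");
        UnaryOperator<String> strategy = strategyFor(podName, "local");
        System.out.println(strategy.apply("demo.queue"));
    }
}
```

Inside the container factory, such a function could presumably be passed as factory.setConsumerTagStrategy(strategy::apply), since the strategy takes a queue name and returns the tag.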
Testing
RabbitMQConfigTest.java
Add a unit test to verify that the result matches expectations.
package org.ruoxue.spring_boot_168.config;

import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.ruoxue.spring_boot_168.Application;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit.jupiter.SpringExtension;

@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = Application.class)
public class RabbitMQConfigTest {

	@Autowired
	private RabbitMQConfig config;

	@Test
	public void config() {
		System.out.println(config);
		assertNotNull(config);
	}
}
config
Right-click the test method and choose Run As -> JUnit Test, then check the console.
2022-06-06T09:06:30.395+0800 [qtp269016862-26] INFO CachingConnectionFactory#createBareConnection:482 - Attempting to connect to: [127.0.0.1:5672]
2022-06-06T09:06:30.450+0800 [qtp269016862-26] INFO CachingConnectionFactory#createBareConnection:517 - Created new connection: rabbitConnectionFactory#7de2bdc7:0/SimpleConnection@4012fe26 [delegate=amqp://admin@127.0.0.1:5672/, localPort= 59701]
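Takeaways
Until manual acknowledgement is configured with setAcknowledgeMode, messages are acknowledged automatically: the broker pushes the messages to the consumer all at once, they sit in the consumer's channel, and the queue is emptied. With manual acknowledgement, a delivered message stays unacknowledged until the consumer confirms it to the MQ, and with QoS limited to one message the MQ sends the next one only after that confirmation. This is why manual acknowledgement is usually combined with a QoS setting that delivers one message at a time.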
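The interplay between acknowledgement and QoS described above can be sketched with a toy in-memory model. This is not the RabbitMQ client API; PrefetchModel and its methods are hypothetical names, and the model assumes that a prefetch of 0 means "no limit", so it behaves like the deliver-everything-at-once case.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of broker-side delivery; it does not talk to RabbitMQ.
final class PrefetchModel {

    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>();
    private final int prefetch; // max unacknowledged messages; 0 means unlimited
    private int unacked = 0;

    PrefetchModel(int prefetch, List<String> messages) {
        this.prefetch = prefetch;
        queue.addAll(messages);
        deliver();
    }

    // Pushes messages to the consumer while the unacked window has room.
    private void deliver() {
        while (!queue.isEmpty() && (prefetch == 0 || unacked < prefetch)) {
            delivered.add(queue.poll());
            unacked++;
        }
    }

    // The consumer confirms one message; the broker may then send more.
    void ack() {
        if (unacked == 0) {
            throw new IllegalStateException("nothing to acknowledge");
        }
        unacked--;
        deliver();
    }

    // Number of messages still waiting in the queue.
    int queued() {
        return queue.size();
    }

    // Messages pushed to the consumer so far, in delivery order.
    List<String> delivered() {
        return new ArrayList<>(delivered);
    }
}
```

With a prefetch of 1 and three messages, the model delivers one message and keeps two queued until ack() is called. In a Spring Boot project the corresponding limit would typically be set with spring.rabbitmq.listener.simple.prefetch=1 or factory.setPrefetchCount(1) on the listener container factory.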