Spring AMQP RabbitMQ - Spring Boot 168 EP 21

Spring AMQP RabbitMQ – Spring Boot 168 EP 21

用來設定佇列、交換器、路由等功能,採最小化配置,用極簡的代碼就能實現對 MQ 的操作訪問, EP 21 增加了相依套件及範例,並透過 JUnit 5 單元測試來驗證產出結果。

前言

RabbitMQ 是一個訊息佇列服務,提供一個非同步通訊機制,負責處理網路通訊,當網路連接發生不可用狀況,消息會被暫存於佇列當中,網路暢通的時候再處理請求,一般用來解決應用解耦,非同步消息等,實現高性能,高可用,可伸縮和最終一致性架構。

Spring AMQP RabbitMQ

檔案目錄

./
   +- build.gradle
       +- src
           +- main
               +- resources
               |   +- application.properties
               +- java
               |   +- org
               |       +- ruoxue
               |           +- spring_boot_168
               |               +- config
               |                   +- RabbitMQConfig.java   

Gradle

build.gradle

增加 Spring Boot Starter AMQP 。

修改完後,點右鍵,Gradle -> Refresh Gradle Project 。

buildscript {
	group 'org.ruoxue.spring-boot-168'
	version = '0.0.1-SNAPSHOT'
	ext {
		springBootVersion = '2.1.7.RELEASE'
	}
}

dependencies {
	implementation "org.springframework.boot:spring-boot-starter-amqp:${springBootVersion}"
}

設定

application.properties

增加 RabbitMQ 設定。

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=admin
spring.rabbitmq.password=1111
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=100
spring.rabbitmq.listener.direct.acknowledge-mode=manual

RabbitMQConfig.java

新增檔案。

package org.ruoxue.spring_boot_168.config;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import lombok.extern.slf4j.Slf4j;

@EnableRabbit
@Configuration
@Slf4j
public class RabbitMQConfig {

	@Bean(name = "messageConverter")
	public MessageConverter messageConverter() {
		return new ContentTypeDelegatingMessageConverter(new Jackson2JsonMessageConverter());
	}

	@Bean
	public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
		return new RabbitAdmin(connectionFactory);
	}

	@Bean
	public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
			@Qualifier("messageConverter") MessageConverter messageConverter) {
		RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
		rabbitTemplate.setMessageConverter(messageConverter);
		rabbitTemplate.setMandatory(true);
		rabbitTemplate.setConfirmCallback(confirmCallback());
		rabbitTemplate.setReturnCallback(returnCallback());
		return rabbitTemplate;
	}

	@Bean(name = "confirmCallback")
	public RabbitTemplate.ConfirmCallback confirmCallback() {
		RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
			@Override
			public void confirm(CorrelationData correlationData, boolean ack, String cause) {
				// do something
			}
		};
		return confirmCallback;
	}

	@Bean(name = "returnCallback")
	public RabbitTemplate.ReturnCallback returnCallback() {
		RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
			@Override
			public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
					String exchange, String routingKey) {
				// do something
			}
		};
		return returnCallback;
	}

	@Bean
	public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
			@Qualifier("messageConverter") MessageConverter messageConverter) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
		factory.setConnectionFactory(connectionFactory);
		factory.setMessageConverter(messageConverter);

		String podName = System.getProperty("pod.name");
		factory.setConsumerTagStrategy(q -> podName + "." + q);
		return factory;
	}

}

測試

RabbitMQConfigTest.java

新增單元測試,驗證是否符合預期 。

package org.ruoxue.spring_boot_168.config;

import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.ruoxue.spring_boot_168.Application;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit.jupiter.SpringExtension;

@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = Application.class)
public class RabbitMQConfigTest {

	@Autowired
	private RabbitMQConfig config;

	@Test
	public void config() {
		System.out.println(config);
		assertNotNull(config);
	}
}

config

測試方法上點右鍵執行 Run As -> JUnit Test ,查看 console 。

2022-06-06T09:06:30.395+0800 [qtp269016862-26] INFO CachingConnectionFactory#createBareConnection:482 - Attempting to connect to: [127.0.0.1:5672]
2022-06-06T09:06:30.450+0800 [qtp269016862-26] INFO CachingConnectionFactory#createBareConnection:517 - Created new connection: rabbitConnectionFactory#7de2bdc7:0/SimpleConnection@4012fe26 [delegate=amqp://admin@127.0.0.1:5672/, localPort= 59701]

心得分享

訊息沒有設置手動確認 setAcknowledgeMode 前,會是自動確認,訊息會一次性發給消費者,存在消費者的通道裡, 佇列會清空,設置手動確認後,消息發給消費者後,會等待消費者向 MQ 確認, 消費者確認後, MQ 才會接著發送下一條資料,設置手動確認的同時,一般會同時設置 QOS 一次只發送一條消息。

發佈留言