集成RabbitMQ
# 9.3 Spring Boot 集成 RabbitMQ
RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息中间件,基于 erlang 开发,并发能力很强,性能极好,延时很低,是当前大量部署使用的消息中间件。
RabbitMQ 发送消息时,是先把消息发送给 Exchange(交换器),然后再分发给有相应 RoutingKey(路由键)关系的 Queue(队列)。
在 RabbitMQ 中消息交换器有四种:
- Direct模式:消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配。
- Topic模式:主题交换器通过模式匹配消息的路由键属性,然后将消息分配到绑定到该模式的队列。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。该交换器会识别两个通配符:“#”和“*”。#匹配0个或多个单词,*匹配一个单词。
- Fanout模式:每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上(就像广播一样)。fanout 类型转发消息是最快的。
- Header模式: 和主题交换器有点相似,头交换器的路由值基于消息的 header 数据。主题交换器路由键只有是字符串,而头交换器可以是整型和哈希值。
Headers 交换器允许你匹配 AMQP 消息的 header 而非路由键,除此之外 headers 交换器和 direct 交换器完全一致,但性能却很差,几乎用不到。
# 9.3.1 安装配置
到 RabbitMQ官网 (opens new window) 下载最新版 rabbitmq-server-3.8.2.exe (opens new window)。
安装运行 RabbitMQ 需要 64 位的 Erlang,针对 3.8.2 版本的 RabbitMQ 官方推荐 Erlang 22.1 版本。
到 Erlang官网 (opens new window) 下载 otp_win64_22.1.exe (opens new window)。
# 9.3.1.1 安装 Erlang
使用默认配置安装 Erlang。
# 9.3.1.2 安装 RabbitMQ
使用默认配置安装 RabbitMQ。
RabbitMQ 提供了 Web 管理界面。
打开命令提示符窗口,进入C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.2\sbin
目录,执行rabbitmq-plugins.bat enable rabbitmq_management
开启 Web 管理插件。
Microsoft Windows [版本 10.0.18362.476]
(c) 2019 Microsoft Corporation。保留所有权利。
C:\Users\Kevin>cd C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.2\sbin
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.2\sbin>rabbitmq-plugins.bat enable rabbitmq_management
Enabling plugins on node rabbit@NOTEBOOK-KEVIN:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@NOTEBOOK-KEVIN...
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
started 3 plugins.
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.2\sbin>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
打开浏览器访问 http://localhost:15672/ (opens new window) ,使用guest用户,密码guest登录管理控制台。
# 9.3.2 与 Spring Boot 集成
新建 Spring Boot 项目,选择 Spring for RabbitMQ+Spring Web 启动器依赖。
pom 文件的主要依赖为spring-boot-starter-amqp
。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
2
3
4
5
6
7
8
在配置文件 application.yml 中添加 RabbitMQ 连接等配置信息。
spring:
application:
name: spring-boot-rabbitmq
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
# 开启发送确认
publisher-confirm-type: correlated
# 开启发送失败退回
publisher-returns: true
# 开启ACK
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 9.3.2.1 一对一模式
一个生产者对一个消费者模式,生产者将消息投送到队列中,消费者从队列中消费消息,然后就结束了。
创建配置类 RabbitMQConfig,在其中配置消息队列kevin
。
package com.example.rabbitmq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean
public Queue kevinQueue() {
return new Queue("kevin");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
创建消息生产者 RabbitMQProducer,使用注入的 AmqpTemplate 对象向消息队列kevin
发送消息。
package com.example.rabbitmq.producer;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQProducer {
@Autowired
AmqpTemplate amqpTemplate;
public void sendMessage(String msg) {
amqpTemplate.convertAndSend("kevin", msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
创建消息消费者 RabbitMQConsumer,监听kevin
消息队列的消息,并在管理控制台上输出信息。
package com.example.rabbitmq.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "kevin")
public class RabbitMQConsumer {
@RabbitHandler
public void receive(String msg) {
System.out.println("RabbitMQ Consumer consume message: " + msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
创建与用户交互的控制器类 RabbitMQController,在/rabbitmq/send
路径上接收用户输入。
package com.example.rabbitmq.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.example.rabbitmq.producer.RabbitMQProducer;
@RestController
@RequestMapping("/rabbitmq/")
public class RabbitMQController {
@Autowired
RabbitMQProducer rabbitMQProducer;
@RequestMapping("/send")
public String sendMsg(String msg) {
rabbitMQProducer.sendMessage(msg);
return msg + " Sended to kevin.";
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
运行 Spring Boot 应用,打开浏览器访问 [http://localhost:8080/rabbitmq/send?msg=Kevin is a GOODMAN.](http://localhost:8080/rabbitmq/send?msg=Kevin is a GOODMAN.) ,向 RabbitMQ 发送消息。
RabbitMQ 消费者(RabbitMQConsumer 类)消费消息,并在控制台打印信息。
通过 管理控制台 (opens new window),可以看到消息队列的相关情况。
# 9.3.2.2 一对多模式
一个生产者对多个消费者,该模式下可以是一个生产者将消息投递到一个队列,该队列对应多个消费者,此时每条消息只会被消费一次,多个消费者循环处理。另外也可以是一个生产者将消息投递到多个队列里,此时消息是被复制处理。
在配置类com.example.rabbitmq.config.RabbitMQConfig
中添加一个新的消息队列 roy。
@Bean
public Queue royQueue() {
return new Queue("roy");
}
2
3
4
创建消息生产者 RabbitMQProducerMore,使用注入的 AmqpTemplate 对象向消息队列roy
发送消息。
package com.example.rabbitmq.producer;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQProducerMore {
@Autowired
AmqpTemplate amqpTemplate;
public void sendMessage(String msg) {
amqpTemplate.convertAndSend("roy", msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
创建消息消费者 RabbitMQConsumer1,监听roy
消息队列的消息,并在管理控制台上输出信息。
package com.example.rabbitmq.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "roy")
public class RabbitMQConsumer1 {
@RabbitHandler
public void receive(String msg) {
System.out.println("RabbitMQ Consumer1 consume message: " + msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
创建消息消费者 RabbitMQConsumer2,监听roy
消息队列的消息,并在管理控制台上输出信息。
package com.example.rabbitmq.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "roy")
public class RabbitMQConsumer2 {
@RabbitHandler
public void receive(String msg) {
System.out.println("RabbitMQ Consumer2 consume message: " + msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
在控制器类 RabbitMQController 中,添加/rabbitmq/sendMore
路径上接收用户输入,并调用 RabbitMQProducerMore(消息生产者类)中的 sendMessage 方法循环发送5条消息到 roy 消息队列上。
@Autowired
RabbitMQProducerMore rabbitMQProducerMore;
@RequestMapping("/sendMore")
public String sendMoreMsg(String msg) {
for (int i = 0; i < 5; i++) {
rabbitMQProducerMore.sendMessage(msg + "No." + i);
}
return msg + " Sended 5 Messages to roy.";
}
2
3
4
5
6
7
8
9
10
运行 Spring Boot 应用,打开浏览器访问 [http://localhost:8080/rabbitmq/sendMore?msg=Roy is a GOODMAN.](http://localhost:8080/rabbitmq/sendMore?msg=Roy is a GOODMAN.),向 RabbitMQ 发送消息。
RabbitMQ 消费者(RabbitMQConsumer1 类和 RabbitMQConsumer2 类)轮流消费消息,并在控制台打印信息。
通过 管理控制台 (opens new window),可以看到消息队列的相关情况。
一个生产者将消息投递到多个队列里的场景,请读者自行完成练习。
# 9.3.2.3 ACK 消息确认
消息确认 ACK 机制,是指消费者从 RabbitMQ 收到消息并处理完成后,反馈给 RabbitMQ,RabbitMQ 收到反馈后才将此消息从队列中删除的一种保证消息能被正确消费的一种处理方法。
如果一个消费者在处理消息的过程中出现了网络不稳定、服务器异常等现象,那么就不会有 ACK 反馈,RabbitMQ 会认为这个消息没有被正常消费,会将消息重新放入队列中。
ACK:acknowledgement,确认。
在配置类com.example.rabbitmq.config.RabbitMQConfig
中添加一个新的消息队列lily
,并绑定到一个名为“fanoutExchange”的 Fanout 交换器上。
@Bean
public Queue ackQueue() {
return new Queue("lily");
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingAckQueue2Exchange(Queue ackQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(ackQueue).to(fanoutExchange);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
创建消息生产者 RabbitMQProducerAck,使用注入的 RabbitTemplate 对象向消息队列lily
发送消息,并编写对应的回调实现。
package com.example.rabbitmq.producer;
import java.time.LocalDateTime;
import java.time.ZoneId;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQProducerAck implements RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("RabbitMQProducerAck Returned Message: " + message.toString() + " , " + i + " , " + s1 + " , " + s2);
}
public void sendMessage(String msg) {
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("Send Message Success.");
} else {
System.out.println("Send Message Failure:" + cause + correlationData.toString());
}
});
rabbitTemplate.convertAndSend("lily", msg + ", " + LocalDateTime.now(ZoneId.systemDefault()));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
创建消息消费者 RabbitMQConsumerAck,监听lily
消息队列的消息,消费消息并确认(channel.basicAck),然后在管理控制台上输出信息。
package com.example.rabbitmq.consumer;
import java.time.LocalDateTime;
import java.time.ZoneId;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
@RabbitListener(queues = {"lily"})
public class RabbitMQConsumerAck {
@RabbitHandler
public void process(String sendMsg, Channel channel, Message message) {
System.out.println("RabbitMQ ConsumerAck consume message: " + sendMsg + " @" + LocalDateTime.now(ZoneId.systemDefault()));
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("Process Success");
} catch (Exception e) {
System.out.println("Process Failure.");
e.printStackTrace();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
在控制器类 RabbitMQController 中,添加/rabbitmq/sendAck
路径,接收用户输入。
@Autowired
RabbitMQProducerAck rabbitMQProducerAck;
@RequestMapping("/sendAck")
public String sendAckMsg(String msg) {
rabbitMQProducerAck.sendMessage(msg);
return msg + " Sended to lily.";
}
2
3
4
5
6
7
8
运行 Spring Boot 应用,打开浏览器访问 [http://localhost:8080/rabbitmq/sendAck?msg=Lily is a GOODMAN.](http://localhost:8080/rabbitmq/sendAck?msg=Lily is a GOODMAN.),向 RabbitMQ 发送消息。
RabbitMQ 消费者 RabbitMQConsumerAck 消费消息,然后告诉服务器收到的这条消息已经被当前消费者(RabbitMQConsumerAck)消费了,消息服务器就可以删除这条消息了。
RabbitMQ 生产者 RabbitMQProducerAck 收到确认回调信息后,在控制台输出Send Message Success.
的信息。
请仔细查看在控制台打印的信息并结合上述代码理解消息生产、消费、确认的过程。
RabbitMQ ConsumerAck consume message: Lily is a GOODMAN., 2019-12-13T00:07:51.750 @2019-12-13T00:07:51.782:根据代码,前面的时间为消息的生产时间,后面的时间为消息的消费时间。
通过 管理控制台 (opens new window),可以看到消息队列的相关情况。
kevin 队列有1条消息没有确认,roy 队列有5条消息没有确认,所以这次 Spring Boot 应用启动的时候还能看到RabbitMQ Consumer1 consume message: Roy is a GOODMAN.No.3
之类的信息就是因为没有确认消息已被消费(服务器没有收到确认就不会删除消息,下次连接到队列,就会继续发送消息给消费者)。
lily 队列 Unacked 消息数量为 0,是因为消息已经确认消费了。
本小节示例项目代码:
https://github.com/gyzhang/SpringBootCourseCode/tree/master/spring-boot-rabbitmq (opens new window)