集成ActiveMQ
# 9.2 Spring Boot 集成 ActiveMQ
ActiveMQ 是一种基于 JMS 1.1 规范的开源的消息中间件,ActiveMQ 的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。
ActiveMQ 非常成熟,功能强大,在早些年业内大量的公司以及项目中都有应用,偶尔会有较低概率丢失消息。现在社区活跃度在降低,国内应用越来越少。官方现在对 ActiveMQ 5.x 维护越来越少,几个月才发布一个版本。
# 9.2.1安装配置
到官网下载最新的 apache-activemq-5.15.10-bin.zip (opens new window) 编译好的 zip 压缩包。
将其解压到磁盘,例如C:\Java\apache-activemq-5.15.10
这个目录。
打开命令终端,进入 ActiveMQ 安装目录下的 bin 目录,执行activemq.bat start
启动 ActiveMQ 服务器。
访问 http://localhost:8161/index.html (opens new window) 控制台,点击 Manage ActiveMQ broker (opens new window) 用户名和密码都是admin,进入管理界面,检查 ActiveMQ 是否正确安装。
因为 ActiveMQ 应用得越来越少,我们就不做集群安装配置演示了。
# 9.2.2 Spring Boot 集成 ActiveMQ
Spring Boot 为 ActiveMQ 提供了启动器(starter),集成 ActiveMQ 很方便。
Spring 为 JMS 提供了org.springframework.jms.core.JmsTemplate
模板类,封装了常用的消息操作,使用起来非常方便。
新建一个 Spring Boot 项目,选择 Spring Web 和 Spring for Apache ActiveMQ 5 这两个 starter 依赖。
项目 pom 文件,最主要的就是:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2
3
4
# 9.2.2.1 点对点(Queue)模式
ActiveMQ 的默认配置为点对点模式(spring.jms.pub-sub-domain=false
)。
在配置文件 application.yml 中配置到 ActiveMQ 的连接信息:
spring:
activemq:
broker-url: tcp://localhost:61616
user: admin
password: admin
2
3
4
5
创建一个 Controller,接收用户输入,向 MQ 发送消息:
package com.example.activemq.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/activemq/")
public class ActiveMQController {
@Autowired
private JmsTemplate jmsTemplate;
@RequestMapping("/send")
public String sendMsg(String msg) {
jmsTemplate.convertAndSend("Q1", msg);
return msg + " Sended to Q1.";
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
在 Controller 中注入private JmsTemplate jmsTemplate;
,然后使用 JmsTemplate 的 send 方法向 Q1 队列发送消息。
创建一个服务类,监听消息队列 Q1,并简单地将接收到地消息输出到控制台。
package com.example.activemq.service;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
@Service
public class MessageListener {
@JmsListener(destination = "Q1")
public void msgReceive(String msg) {
System.out.println("Message: " + msg + " Received.");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
注解@JmsListener(destination = "Q1")
表示 msgReceive 方法监听 Q1 这个队列。
运行 Spring Boot 应用程序,访问 [http://localhost:8080/activemq/send?msg=Hello%20Kevin.](http://localhost:8080/activemq/send?msg=Hello Kevin.) ,向消息队列中发送“Hello Kevin.”这样一条消息。
然后观察 Spring Boot 应用的控制台,可以看到由 MessageListener.msgReceive 方法从 Q1 接收到消息后在控制台输出的信息。
在 ActiveMQ 的管理控制台,也可以看到由程序创建的 Q1 队列,及消息消费者和消息相关的信息。
# 9.2.2.2 发布订阅(Topic)模式
在 Spring Boot 应用中通过spring.jms.pub-sub-domain=true
配置,打开 Spring 对 ActiveMQ 发布订阅模式的支持。
新创建一个 Spring Boot 项目,选择 Spring Web 和 Spring for Apache ActiveMQ 5 这两个 starter 依赖。
在配置文件 application.yml 中配置到 ActiveMQ 的连接信息:
spring:
activemq:
broker-url: tcp://localhost:61616
user: admin
password: admin
packages:
trust-all: true
jms:
pub-sub-domain: true
2
3
4
5
6
7
8
9
在com.example.activemq.topic.producer
包下面创建消息生产者类 TopicProducer,将消息发送到“Topic-Weather”这个 Topic 上。
package com.example.activemq.topic.producer;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public class TopicProducer {
@Autowired
private JmsTemplate jmsTemplate;
public void sendMessage(String msg) {
ActiveMQTopic destination = new ActiveMQTopic("Topic-Weather");
jmsTemplate.convertAndSend(destination, msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
在com.example.activemq.topic.consumer
包下创建消息消费者 TopicConsumer,通过两个消息监听方法模拟两个订阅“Topic-Weather”主题的消息消费者(subscriber1 和 subscriber2)。
package com.example.activemq.topic.consumer;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class TopicConsumer {
/**
* 消息消费者1,监听"Topic-Weather"上的消息
* @param msg 消息
*/
@JmsListener(destination = "Topic-Weather")
public void subscriber1(String msg) {
System.out.println("Consumer1 consume message: " + msg);
}
/**
* 消息消费者2,监听"Topic-Weather"上的消息
* @param msg 消息
*/
@JmsListener(destination = "Topic-Weather")
public void subscriber2(String msg) {
System.out.println("Consumer2 consume message: " + msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
在com.example.activemq.topic.controller
包下创建和用户交互发送消息的控制器,在其中调用消息生产者 TopicProducer 类来发送消息。
package com.example.activemq.topic.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.example.activemq.topic.producer.TopicProducer;
@RestController
@RequestMapping("/activemq/")
public class TopicController {
@Autowired
private TopicProducer topicProducer;
@RequestMapping("/send")
public String sendMsg(String msg) {
topicProducer.sendMessage(msg);
return msg + " Sended to Topic-Weather.";
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
运行 Spring Boot 应用程序,访问 http://localhost:8080/activemq/send?msg=明天气温6-8度 (opens new window) ,向消息队列中发送“Hello Kevin.”这样一条消息。
然后观察 Spring Boot 应用的控制台,可以看到由 TopicConsumer.subscriber1 方法和 TopicConsumer.subscriber2 方法订阅Topic-Weather这个主题后接收到消息,在控制台输出的信息。
在 ActiveMQ 的管理控制台,也可以看到 Topic-Weather 及其消费者数量等信息。
本小节示例项目代码:
https://github.com/gyzhang/SpringBootCourseCode/tree/master/spring-boot-activemq (opens new window)