ActiveMQ入门教程

文章目录
  1. 1. 介绍
    1. 1.1. ActiveMQ
    2. 1.2. 什么是JMS?
    3. 1.3. 什么是AMQP?
    4. 1.4. JMS 和 AMQP 对比
    5. 1.5. ActiveMQ、RabbitMQ 和 Kafka 简单对比
  2. 2. JMS规范
    1. 2.1. JMS相关概念
    2. 2.2. JMS消息模式—队列模型
    3. 2.3. JMS消息模式—主题模型
    4. 2.4. JMS编码接口
    5. 2.5. JMS编码接口之间的关系
  3. 3. 安装
  4. 4. 队列模式消息和主题模式消息代码演示
    1. 4.1. 队列模式消息演示
    2. 4.2. 主题模式消息演示

介绍

ActiveMQ

ActiveMQ 是Apache出的,最流行的,功能强大的即时通讯和集成模式的开源服务器。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。提供客户端支持跨语言和协议,带有易于在充分支持JMS 1.1和1.4使用J2EE企业集成模式和许多先进的功能。

什么是JMS?

Java消息服务(Java Message Service) 即JMS,是一个java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

什么是AMQP?

AMQP(advanced message queuing protocol) 是一个提供统一消息服务的应用层标准层协议,基于此协议的客户端与消息中间件可传递性,并不受客户端/中间件不同产品,不同开发语言等条件的限制。

JMS 和 AMQP 对比

ActiveMQ、RabbitMQ 和 Kafka 简单对比

JMS规范

JMS相关概念
  • 消费者/订阅者 : 接收并处理消息的客户端
  • 消息 : 应用程序之间传递的数据内容
  • 消息模式 : 在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式
JMS消息模式—队列模型
  • 客户端包括生产者和消费者
  • 队列中的消息只能被一个消费者消费
  • 消费者可以随时消费队列中的消息

队列模型示意图

JMS消息模式—主题模型
  • 客户端包括生产者和消费者
  • 主题中的消息被所有消费者消费
  • 消费者必须先订阅,才能消费发送到主题中的消息

主题模型示意图

JMS编码接口
  • ConnectionFactory 用于创建连接到消息中间件的连接工厂
  • Connection 代表了应用程序和消息服务器之间的通信链路
  • Destination 指消息发布和接收的地点,包括队列或主题
  • Session 表示一个单线程的上下文,用于发送和接收消息
  • MessageConsumer 由会话创建,用于接收发送到目标的消息
  • MessageProducer 由会话创建,用于发送消息到目标
  • Message 是在消费者和生产者之间传送的对象,消息头,一组消息属性,一个消息体
JMS编码接口之间的关系

安装

  • 查询Docker镜像
1
$ docker search activemq
  • 下载Docker镜像
1
$ docker pull webcenter/activemq
  • 创建&运行ActiveMQ容器
1
$ docker run -d --name myactivemq -p 61616:61616 -p 8161:8161 webcenter/activemq
  • 查看WEB管理页面

浏览器输入http://127.0.0.1:8161/ 点击Manage ActiveMQ broker使用默认账号/密码:admin/admin进入查看

队列模式消息和主题模式消息代码演示

队列模式消息演示

生产者 AppProducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class AppProducer {

// MQ服务器地址
private static final String BROKER_URL = "tcp://127.0.0.1:61616";

// 队列名称
private static final String QUEUE_NAME = "queue-test";

public static void main(String[] args) throws JMSException {
// 1.创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);

// 2.创建连接
Connection connection = factory.createConnection();

// 3.启动连接
connection.start();

// 4.创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 5.创建一个目标
Destination destination = session.createQueue(QUEUE_NAME);

// 6.创建一个生产者
MessageProducer producer = session.createProducer(destination);

// 循环发送消息
for (int i = 0; i < 100; i++) {
// 创建消息
TextMessage textMessage = session.createTextMessage("【test-activemq-producer】" + i);
// 发布消息
producer.send(textMessage);

System.out.println("消息发送成功:" + textMessage);
}

// 关闭连接
connection.close();
}
}

消费者 AppConsumer.java (由于消费者的创建流程和生产者非常类似,特意用 ActiveMqHelper 类进行简单封装一下)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class AppConsumer {

public static void main(String[] args) throws JMSException {
// 创建连接
Connection connection = ActiveMqHelper.createConnection();

// 创建会话
Session session = ActiveMqHelper.createSession(connection);

// 创建一个目标
Destination destination = ActiveMqHelper.createQueue(session);

// 创建一个消费者
MessageConsumer consumer = session.createConsumer(destination);

// 消费者监听要消费的信息
consumer.setMessageListener(message -> {
// 接收到的消息内容c
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("【消费者接收到的消息】" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
});

// 关闭连接,因为监听是异步操作,
// 如果事先关闭连接,监听操作还未处理完成,则会收不到消息,
// 正常处理逻辑是在监听事件处理完成后,再释放连接
// connection.close();
}
}

MQ帮助类 ActiveMqHelper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class ActiveMqHelper {

// MQ服务器地址
private static final String BROKER_URL = "tcp://127.0.0.1:61616";

// 队列名称
private static final String QUEUE_NAME = "queue-test";

// 主题名称
private static final String TOPIC_NAME = "topic-test";


/**
* 创建连接
*
* @return
* @throws JMSException
*/
public static Connection createConnection() throws JMSException {
// 1.创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);

// 2.创建连接
Connection connection = factory.createConnection();

// 3.启动连接
connection.start();

return connection;
}


/**
* 创建会话
*
* @param connection
* @return
* @throws JMSException
*/
public static Session createSession(Connection connection) throws JMSException {
// 4.创建会话
return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}

/**
* 创建队列目标
*
* @param session
* @return
* @throws JMSException
*/
public static Destination createQueue(Session session) throws JMSException {
// 5.创建一个队列目标
return session.createQueue(QUEUE_NAME);
}


/**
* 创建主题目标
*
* @param session
* @return
* @throws JMSException
*/
public static Destination createTopic(Session session) throws JMSException {
// 5.创建一个主题目标
return session.createTopic(TOPIC_NAME);
}
}
代码效果图展示

主题模式消息演示

生产者 AppProducer.java (主题模式的代码示例和队列模式的代码示例基本相同,只有在创建主题目标的时候不同,还请老铁自行实现) 消费者同理参考队列模式代码实现

1
2
3
4
5
6
7
8
9
10
public class AppProducer {
public static void main(String[] args) throws JMSException {
// ....

// 3.创建主题目标 (与队列模式唯一区别的代码,其它都一样)
Destination destination = session.createTopic(TOPIC_NAME);

// ....
}
}

主题模式注意点
1、先启动AppConsumer.java类(消费者先进行订阅),在启动 AppProvider.java 类(生产者进行生产)这样消费者才能监听到生产者生成的消息进行消费;队列模式没有此要求
2、生产者生产100条消息,若有多个消费者,每个消费者均消费100条消息;而队列模式,则是每个消费者合计起来消费这 100 条消息

代码效果图展示