Integrating Spring with ActiveMQ

Contents
  1. Core classes
  2. Adding the ActiveMQ dependencies to pom.xml
  3. Queue mode demo
    3.1. Sending messages from the producer
    3.2. Consuming messages in the consumer

Core classes

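  • ConnectionFactory: the connection factory used to create and manage connections to the broker
    • Spring provides wrappers around the broker's own factory so that connections are reused instead of being opened for every operation
    • Spring offers SingleConnectionFactory (one shared connection) and CachingConnectionFactory (which additionally caches sessions and producers)
  • JmsTemplate: the template class used to send and receive messages
  • MessageListener: the listener interface used to receive messages asynchronously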

Adding the ActiveMQ dependencies to pom.xml

<properties>
    <spring.version>4.2.5.RELEASE</spring.version>
</properties>

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.20</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-core</artifactId>
        <version>5.7.0</version>
        <exclusions>
            <exclusion>
                <artifactId>spring-context</artifactId>
                <groupId>org.springframework</groupId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

Queue mode demo

Sending messages from the producer

Producer interface for sending messages: ProducerService.java

public interface ProducerService {

    /**
     * Sends a message.
     *
     * @param message the text to send
     */
    void sendMessage(String message);

}

Implementation of the producer interface: ProducerServiceImpl.java

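import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class ProducerServiceImpl implements ProducerService {

    @Autowired
    private JmsTemplate jmsTemplate;

    // Injected by bean name; "queueDestination" is defined in common.xml
    @Resource(name = "queueDestination")
    private Destination destination;

    @Override
    public void sendMessage(final String message) {
        // Send the message through jmsTemplate
        jmsTemplate.send(destination, new MessageCreator() {
            // Create the message to send
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
        System.out.println("Sent message: " + message);
    }
}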
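
The send call above follows the template/callback pattern: the template takes care of acquiring and releasing the session, and the callback only decides what the message contains. The sketch below imitates that division of labour in plain Java. FakeSession, Creator and MiniTemplate are hypothetical stand-ins invented for this illustration; they are not Spring or JMS types, and the real JmsTemplate does considerably more.

```java
import java.util.ArrayList;
import java.util.List;

public class TemplateCallbackSketch {

    /** Hypothetical stand-in for a JMS session; it can only build text messages. */
    public static final class FakeSession implements AutoCloseable {
        private boolean open = true;

        public String createTextMessage(String text) {
            if (!open) {
                throw new IllegalStateException("session closed");
            }
            return "TextMessage[" + text + "]";
        }

        public boolean isOpen() {
            return open;
        }

        @Override
        public void close() {
            open = false;
        }
    }

    /** Hypothetical counterpart of MessageCreator: the caller supplies only the message. */
    public interface Creator {
        String create(FakeSession session);
    }

    /** Hypothetical template: owns the open, send and close steps. */
    public static final class MiniTemplate {
        private final List<String> sent = new ArrayList<>();
        private FakeSession lastSession;

        public void send(Creator creator) {
            // try-with-resources closes the session even if the callback throws
            try (FakeSession session = new FakeSession()) {
                lastSession = session;
                sent.add(creator.create(session));
            }
        }

        public List<String> sent() {
            return sent;
        }

        public boolean lastSessionClosed() {
            return lastSession != null && !lastSession.isOpen();
        }
    }

    public static void main(String[] args) {
        MiniTemplate template = new MiniTemplate();
        template.send(session -> session.createTextMessage("test spring jms0"));
        System.out.println(template.sent());
    }
}
```

Because the callback never opens or closes anything itself, resource handling stays in one place, which is the main reason for routing sends through a template.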

Shared configuration file common.xml (imported by both the producer and the consumer)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Enable annotation-based configuration -->
    <context:annotation-config/>

    <!-- The ConnectionFactory provided by ActiveMQ -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

    <!-- Spring JMS wrapper that reuses a single shared connection -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>

    <!-- A queue destination (point-to-point) -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue-spring"/>
    </bean>

</beans>
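
The queueDestination bean is a point-to-point queue, which means each message is delivered to exactly one consumer even when several consumers listen on the same queue. The following is an in-memory analogy of that behaviour using java.util.concurrent only; it does not talk to ActiveMQ, and the class name, the STOP sentinel and the run method are assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PointToPointSketch {

    // Sentinel that tells a consumer thread to exit (an assumption of this sketch)
    private static final String STOP = "__stop__";

    /** Puts count messages on one shared queue and returns what each consumer received. */
    public static List<List<String>> run(int count, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<List<String>> received = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            List<String> mine = new ArrayList<>();
            received.add(mine);
            Thread t = new Thread(() -> consume(queue, mine));
            threads.add(t);
            t.start();
        }
        for (int i = 0; i < count; i++) {
            queue.add("test spring jms" + i);
        }
        // One sentinel per consumer, queued after all real messages
        for (int c = 0; c < consumers; c++) {
            queue.add(STOP);
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return received;
    }

    // Takes messages until the sentinel arrives; take() removes the message for everyone else
    private static void consume(BlockingQueue<String> queue, List<String> sink) {
        try {
            while (true) {
                String message = queue.take();
                if (STOP.equals(message)) {
                    return;
                }
                sink.add(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int total = 0;
        for (List<String> perConsumer : run(80, 2)) {
            total += perConsumer.size();
        }
        System.out.println("total delivered: " + total);
    }
}
```

How the 80 messages are split between the two consumers varies from run to run, but the total always equals the number sent and no message appears twice.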

Producer configuration file producer.xml, which imports the shared common.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Import the shared configuration -->
    <import resource="common.xml"/>

    <!-- Configure the JmsTemplate used to send messages -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

    <!-- Register our own producer bean -->
    <bean class="com.gulj.spring.jms.producer.ProducerServiceImpl"/>

</beans>

Producer launcher AppProducer.java: run the main method to start the producer and send messages

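import org.springframework.context.support.ClassPathXmlApplicationContext;

public class AppProducer {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
        ProducerService producerService = context.getBean(ProducerService.class);
        // Send 80 test messages to the queue
        for (int i = 0; i < 80; i++) {
            producerService.sendMessage("test spring jms" + i);
        }
        // Close the context so the shared connection is released
        context.close();
    }
}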

Figure: result of running the producer

Consuming messages in the consumer

Consumer listener that receives and processes messages: ConsumerMessageListener.java

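import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ConsumerMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        // The producer in this demo only sends text messages; skip anything else
        if (!(message instanceof TextMessage)) {
            return;
        }
        try {
            System.out.println("Received message: " + ((TextMessage) message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}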

Consumer configuration file: consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Import the shared configuration -->
    <import resource="common.xml"/>

    <!-- Register the custom message listener -->
    <bean id="consumerMessageListener" class="com.gulj.spring.jms.consumer.ConsumerMessageListener"/>

    <!-- Configure the message listener container -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- Connection factory -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <!-- Destination queue to listen on -->
        <property name="destination" ref="queueDestination"/>
        <!-- Message listener -->
        <property name="messageListener" ref="consumerMessageListener"/>
    </bean>
</beans>

Consumer launcher AppConsumer.java: run the main method directly to listen for and consume messages

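import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class AppConsumer {

    public static void main(String[] args) {
        // Creating the context starts the listener container, which keeps consuming messages
        ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
    }
}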
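
The main method above only creates the context, yet the program keeps running and printing messages. A plausible way to picture this is that the listener container runs its own worker thread, which repeatedly polls the destination and hands each message to the listener. The sketch below models that idea in plain Java. MiniContainer and demo are hypothetical names invented here, and this is a conceptual analogy rather than how DefaultMessageListenerContainer is actually implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ListenerLoopSketch {

    /** Hypothetical stand-in for a listener container: polls a queue and invokes a callback. */
    public static final class MiniContainer {
        private final BlockingQueue<String> queue;
        private final Consumer<String> listener;
        private final Thread worker;
        private volatile boolean running = true;

        public MiniContainer(BlockingQueue<String> queue, Consumer<String> listener) {
            this.queue = queue;
            this.listener = listener;
            // A non-daemon thread keeps the JVM alive after main returns
            this.worker = new Thread(this::loop, "mini-container");
        }

        public void start() {
            worker.start();
        }

        private void loop() {
            while (running) {
                try {
                    // Poll with a timeout so the loop can notice a stop request
                    String message = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (message != null) {
                        listener.accept(message);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        public void stop() {
            running = false;
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Sends count messages through a MiniContainer and returns them in the order received. */
    public static List<String> demo(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(count);
        MiniContainer container = new MiniContainer(queue, message -> {
            received.add(message);
            done.countDown();
        });
        container.start();
        for (int i = 0; i < count; i++) {
            queue.add("test spring jms" + i);
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        container.stop();
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        for (String message : demo(3)) {
            System.out.println("Received message: " + message);
        }
    }
}
```

Unlike this sketch, the consumer in the article never calls a stop method, so it keeps listening until the process is terminated.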

Figure: result of running the consumer