admin 管理员组文章数量: 893893
ActiveMQ的简单使用
转:
ActiveMQ的简单使用
ActiveMQ是一种开源的,实现了JMS规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。
相关文章:
范例项目:
ActiveMQ集群高可用方案:
ActiveMQ组成:
ActiveMQ接发送消息流程图:
一. ActiveMQ的安装和配置
1、官网下载Linux版的ActiveMQ(最新版本为5.13.4)
.html
2、解压安装
tar -zxvf apache-activemq-5.13.4-bin.tar.gz
3、配置(这里采用默认配置,无需修改)
vim /usr/lical/activemq-1/conf/activemq.xml
4、启动
cd /usr/local/activemq-1/bin
./activemq start
5、打开管理界面(管理界面可以查看并管理所有队列及消息)
http://192.168.1.100:8161
二. Spring结合ActiveMQ使用
1、pom文件引入依赖
Xml代码- <!--active mq start-->
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-core</artifactId>
- <version>5.7.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-pool</artifactId>
- <version>5.13.3</version>
- </dependency>
- <!--active mq end-->
2、spring-mq配置文件
Xml代码- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns=""
- xmlns:xsi=""
- xsi:schemaLocation="
- .xsd">
- <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
- <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
- <!-- ActiveMQ服务地址 -->
- <property name="brokerURL" value="${mq.brokerURL}"/>
- <property name="userName" value="${mq.userName}"></property>
- <property name="password" value="${mq.password}"></property>
- <!-- 这里定义重试策略,注意:只有持久化的才会重试-->
- <property name="redeliveryPolicyMap" ref="redeliveryPolicyMap"/>
- </bean>
- <!--这里设置各个消息队列的重发机制-->
- <bean id="redeliveryPolicyMap" class="org.apache.activemq.broker.region.policy.RedeliveryPolicyMap">
- <property name="redeliveryPolicyEntries">
- <list>
- <ref bean="bizRedeliveryPolicy"/>
- <ref bean="mailRedeliveryPolicy"/>
- </list>
- </property>
- </bean>
- <bean id="bizRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
- <!--重发次数 延时、延时系数、延时指数开关、目标(重发等待时间1s, 2s, 4s, 8s)-->
- <property name="maximumRedeliveries" value="3"/>
- <property name="redeliveryDelay" value="1000"/>
- <property name="backOffMultiplier" value="2"/>
- <property name="useExponentialBackOff" value="true"/>
- <property name="destination" ref="bizQueue"/>
- </bean>
- <bean id="mailRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
- <!--重发次数 延时、延时系数、延时指数开关-->
- <property name="maximumRedeliveries" value="2"/>
- <property name="redeliveryDelay" value="5000"/>
- <property name="destination" ref="mailQueue"/>
- </bean>
- <!--
- ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory
- 可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们的资源消耗。
- 要依赖于 activemq-pool包
- -->
- <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
- <property name="connectionFactory" ref="targetConnectionFactory"/>
- <property name="maxConnections" value="${mq.pool.maxConnections}"/>
- </bean>
- <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
- <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
- <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
- <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
- <property name="reconnectOnException" value="true"/>
- </bean>
- <!-- 队列目的地-->
- <bean id="bizQueue" class="org.apache.activemq.command.ActiveMQQueue">
- <constructor-arg index="0" value="${biz.queueName}"/>
- </bean>
- <bean id="mailQueue" class="org.apache.activemq.command.ActiveMQQueue">
- <constructor-arg index="0" value="${mail.queueName}"/>
- </bean>
- <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
- <!-- 队列模板 这里配置2个,一个用于分布式业务,一个用于发送邮件-->
- <bean id="bizMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
- <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
- <property name="connectionFactory" ref="connectionFactory"/>
- <property name="defaultDestination" ref="bizQueue"/>
- <!-- 使 deliveryMode, priority, timeToLive设置生效-->
- <property name="explicitQosEnabled" value="true" />
- <!-- 持久化 如果设置为非持久化MQ服务器重启后MQ中的数据会丢失-->
- <property name="deliveryPersistent" value="true"/>
- <!--这里注意:如果不开启事务,消息在异常的情况下是不会重试的-->
- <property name="sessionTransacted" value="false"/>
- </bean>
- <bean id="mailMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
- <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
- <property name="connectionFactory" ref="connectionFactory"/>
- <property name="defaultDestination" ref="mailQueue"/>
- <!-- 使 deliveryMode, priority, timeToLive设置生效-->
- <property name="explicitQosEnabled" value="true" />
- <!-- 持久化 如果设置为非持久化MQ服务器重启后MQ中的数据会丢失-->
- <property name="deliveryPersistent" value="true"/>
- <!--这里注意:如果不开启事务,消息在异常的情况下是不会重试的-->
- <property name="sessionTransacted" value="true"/>
- </bean>
- <!-- 消息监听实现方法一 -->
- <bean id="bizListener" class="com.yingjun.ssm.mq.listener.TransactionBizMessageListener"/>
- <bean id="mailListener" class="com.yingjun.ssm.mq.listener.MailMessageListener"/>
- <!-- 消息接收监听器用于异步接收消息-->
- <bean id="bizContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
- <property name="connectionFactory" ref="connectionFactory"/>
- <property name="destination" ref="bizQueue"/>
- <property name="messageListener" ref="bizListener"/>
- <!--这里注意:如果不开启事务,消息在异常的情况下是不会重试的-->
- <property name="sessionTransacted" value="true"/>
- <property name="concurrentConsumers" value="1"/>
- </bean>
- <bean id="mailContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
- <property name="connectionFactory" ref="connectionFactory"/>
- <property name="destination" ref="mailQueue"/>
- <property name="messageListener" ref="mailListener"/>
- <!--这里注意:如果不开启事务,消息在异常的情况下是不会重试的-->
- <property name="sessionTransacted" value="true"/>
- <property name="concurrentConsumers" value="1"/>
- </bean>
- </beans>
3、重试机制以及死信的配置
Xml代码- <!--这里设置各个消息队列的重发机制-->
- <bean id="redeliveryPolicyMap" class="org.apache.activemq.broker.region.policy.RedeliveryPolicyMap">
- <property name="redeliveryPolicyEntries">
- <list>
- <ref bean="bizRedeliveryPolicy"/>
- <ref bean="mailRedeliveryPolicy"/>
- </list>
- </property>
- </bean>
- <bean id="bizRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
- <!--重发次数 延时、延时系数、延时指数开关、目标(重发等待时间1s, 2s, 4s, 8s)-->
- <property name="maximumRedeliveries" value="3"/>
- <property name="redeliveryDelay" value="1000"/>
- <property name="backOffMultiplier" value="2"/>
- <property name="useExponentialBackOff" value="true"/>
- <property name="destination" ref="bizQueue"/>
- </bean>
- <bean id="mailRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
- <!--重发次数 延时、延时系数、延时指数开关-->
- <property name="maximumRedeliveries" value="2"/>
- <property name="redeliveryDelay" value="5000"/>
- <property name="destination" ref="mailQueue"/>
- </bean>
4、发送端代码
Java代码- package com.yingjun.ssm.biz;
- import com.alibaba.fastjson.JSONObject;
- import com.yingjun.ssm.common.model.BizOperator;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.jms.core.JmsTemplate;
- import org.springframework.jms.core.MessageCreator;
- import org.springframework.test.context.ContextConfiguration;
- import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.Session;
- /**
- * @author yingjun
- */
- @RunWith(SpringJUnit4ClassRunner.class)
- @ContextConfiguration("classpath:application.xml")
- public class Application {
- private final Logger log = LoggerFactory.getLogger(Application.class);
- @Autowired
- private JmsTemplate bizMqJmsTemplate;
- @Test
- public void mailSend() throws Exception {
- bizMqJmsTemplate.setSessionTransacted(true);
- for (int i = 0; i < 1; i++) {
- log.info("==>send message" + i);
- bizMqJmsTemplate.send(new MessageCreator() {
- @Override
- public Message createMessage(Session session) throws JMSException {
- log.info("getTransacted:" + session.getTransacted());
- BizOperator operator = new BizOperator("testDistributedTransaction", 1001);
- return session.createTextMessage(JSONObject.toJSONString(operator));
- }
- });
- log.info("==>finish send message"+ i);
- }
- while (true) {
- }
- }
- }
5、接受端代码
Java代码- package com.yingjun.ssm.mq.listener;
- import com.alibaba.fastjson.JSONObject;
- import com.yingjun.ssm.common.model.BizOperator;
- import com.yingjun.ssm.mq.biz.TransactionBizService;
- import org.apache.activemq.command.ActiveMQTextMessage;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.jms.listener.SessionAwareMessageListener;
- import org.springframework.stereotype.Component;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.Session;
- /**
- *
- * @author yingjun
- */
- @Component
- public class TransactionBizMessageListener implements SessionAwareMessageListener<Message> {
- private static final Logger log = LoggerFactory.getLogger(TransactionBizMessageListener.class);
- private final String transactionBiz = "testDistributedTransaction";
- @Autowired
- private TransactionBizService transactionBizService;
- /**
- * @param message
- * @param session
- */
- public void onMessage(Message message, Session session) throws JMSException{
- //这里建议不要try catch,让异常抛出,通过redeliveryPolicy去重试,达到重试次数进入死信DLQ(Dead Letter Queue)
- ActiveMQTextMessage msg = (ActiveMQTextMessage) message;
- String ms = ms = msg.getText();
- log.info("==>receive message:" + ms);
- // 转换成相应的对象
- BizOperator operator = JSONObject.parseObject(ms, BizOperator.class);
- if (operator != null && transactionBiz.equals(operator.getOperator())) {
- transactionBizService.addScoreBySyn(100);
- //throw new RuntimeException("test redeliveryPolicy");
- } else {
- log.info("==>message:" + ms + " no about operator!");
- }
- }
- }
本文标签: ActiveMQ的简单使用
版权声明:本文标题:ActiveMQ的简单使用 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.freenas.com.cn/jishu/1687329469h90210.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论