admin 管理员组文章数量: 894192
ActiveMQ(二)
目录
八:Spring整合Activemq
生产者的实现:
消费者的实现:
设置监听器
九:Springboot整合ActiveMQ
1.queue中实现生产者和消费者
2.topic中实现生产者和消费者
十:ActiveMQ的传输协议
1.各协议介绍
2.配置协议
这里我们的要求是进行配置一个NIO协议
3.使用auto+协议
十一:ActiveMQ的持久化机制
十二:ActiveMQ集群搭建
十三:ActiveMQ的高级特性
1.同步投递和异步投递
区别:
2.异步投递的实现
3.延时投递
4.消息的重试机制
重试机制流程分析:
5.死信队列
6.幂等性消费
八:Spring整合Activemq
生产者与消费者的依赖:
<dependencies><!--activemq核心依赖包 --><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.17.0</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.13.3</version></dependency><!-- activemq连接池--><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.17.0</version></dependency><!-- spring支持jms的包 --><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>5.3.22</version></dependency><!-- spring相关依赖包--><dependency><groupId>org.apache.xbean</groupId><artifactId>xbean-spring</artifactId><version>3.16</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-aop</artifactId><version>5.3.6</version></dependency><!-- spring核心依赖--><dependency><groupId>org.springframework</groupId><artifactId>spring-core</artifactId><version>5.3.22</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.3.20</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-aop</artifactId><version>5.3.6</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-orm</artifactId><version>5.3.22</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId><version>5.3.6</version></dependency><!-- junit依赖 --><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>test</scope></dependency> </dependencies>
生产者与消费者共同的Spring配置文件:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns=""xmlns:xsi=""xmlns:context=""xsi:schemaLocation=" .xsd .xsd"> <!-- 1.开启包的自动扫描--><context:component-scan base-package="com.qf.activemq"/> <!-- 2.配置生产者--><bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"><property name="connectionFactory"> <!-- 配置这个Connection连接工厂--><bean class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="xxx"/></bean></property><property name="maxConnections" value="100"/></bean> <!-- 目的地bean配置Queue--><bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg index="0" value="my-spring-queue"/></bean> <!-- 目的地的bean配置Topic--><bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"><constructor-arg index="0" value="my-spring-topic"/></bean> <!-- Spring提供的JMS类--><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 代入连接工厂--><property name="connectionFactory" ref="connectionFactory"/> <!-- 代入目的地--><property name="defaultDestination" ref="destinationTopic"/> <!-- 消息自动转换器--><property name="messageConverter"><bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/></property></bean> </beans>
生产者的实现:
public class Producer {@Autowiredprivate JmsTemplate jmsTemplate ;/*** 发送消息*/public void send(String message) {jmsTemplate.send(new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {TextMessage textMessage = session.createTextMessage(message) ;return textMessage ;}});} }
消费者的实现:
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("classpath:spring-activemq.xml") public class TestConsumer { @Autowiredprivate JmsTemplate jmsTemplate ; @Testpublic void testReceive() {while (true) {//receiveAndConvert()表示接收到消息并且获取到内容String context = (String) jmsTemplate.receiveAndConvert();System.out.println("接收到的消息为:" + context);}} }
设置监听器
public class Consumer implements MessageListener {/*** 监听到消息后,对消息的处理* @param message*/@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {TextMessage textMessage = (TextMessage) message ;try {System.out.println(textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}} }
九:Springboot整合ActiveMQ
1.queue中实现生产者和消费者
1.引入相同的依赖
2.生产者的编写
配置类:
制造类:
application.yml :
3.消费者的编写
2.topic中实现生产者和消费者
生产者:
1.
2.
消费者:
十:ActiveMQ的传输协议
1.各协议介绍
2.配置协议
这里我们的要求是进行配置一个NIO协议
1.
2.所以我们要进行配置一个NIO协议
3.
3.使用auto+协议
代码演示:
1.在activemq.xml配置文件中只进行配置这一个auto+nio
2.
十一:ActiveMQ的持久化机制
十二:ActiveMQ集群搭建
当master主机宕机之后,某一台slave会变为master主机
总结:
(1)集群中的三台broker共享一个kahadb文件系统。也就是说虽然broker有三台,但是持久化的日志文件只有一份。
(2)集群中只有master身份的broker负责对消息进行处理,也就是说对于整个集群,只有master能够提供对外服务,接收消息和发送消息
的服务
(3)当master出现宕机,此时会在slave从机中自动选出新的master主机,对外进行提供服务,实现对activemq的高可用
(4)针对于jdbc的高可用,除了将持久化方案改为jdbc外,还可以进行配置mysql的主从集群,实现mysql的高可用
十三:ActiveMQ的高级特性
1.同步投递和异步投递
同步投递:
(1)生产者投递一个Message给到broker,在生产者接收到broker返回的ack确认之前,都会一直阻塞而不会执行下面的一系列逻辑操作
(2)当生产者接收到broker的ack确认机制之后,会停止阻塞而执行之后的逻辑
异步投递:
(1) 当生产者进行投递一个Message给到broker,投递完成之后,不需要broker返回ack确认,即可被视为成功 然后执行之后的逻辑
区别:
(1)ActiveMQ支持同步和异步两种方式进行将消息发送到broker。
(2)同步和异步的主要区别为:发送Message的效率高低以及是否成功可靠投递
对于同步发送来说:
1.不会丢失消息,消息的可靠性高。因为对于同步向broker发送Message来说,我们需要broker进行返回一个ack表示确认。
2.但是相对异步发送Message效率较低,需要阻塞进行broker确认。
对于异步发送来说:
1.无需broker返回ack确认,因此效率较高。
2.但是有可能丢失Message数据。分析:生产者发送一个Message给broker,对于生产者而言,只要消息发送出去那么就算是
发送成功。但是有可能broker发生网络动荡,那么第一个Message就接收不成功。但是对于生产者而言 第一个Message已经发送成功,
再向broker发送时,是进行发送第二个Message。此时就会导致丢失了第一个Message数据
(3)ActiveMQ默认使用的是异步发送
2.异步投递的实现
(1)对于ActiveMQ中的异步投递,我们使用的是回调方法机制
(2) 生产者进行投递Message给broker,投递出去之后,不阻塞,继续执行之后的逻辑
(3) 当broker成功接收到了Message时,那么就回调onSuccess方法。当broker没有接收到Message,就会回调onException方法
代码实现:
public class ProducerDemo01 { //指明activemq的地址public static final String URL = "tcp://192.168.204.134:61616" ;//指明destination目的地public static final String QUEUE_NAME = "my_queue_1" ; public static void main(String[] args) throws Exception{//1.获得连接工厂ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL) ;//设置异步投递的方式connectionFactory.setUseAsyncSend(true);//2.获得连接对象Connection connection = connectionFactory.createConnection();//3.开启连接connection.start();//4.从连接对象中获得一个Session会话,该Session是MQ与消息生产者之间开启的会话Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE) ;//5.创建队列对象Queue queue = session.createQueue(QUEUE_NAME) ;// 6.创建ActiveMQMessageProducer对象ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue) ;for (int i = 0;i<3;i++) {TextMessage message = session.createTextMessage("hello" + i) ;producer.send(message, new AsyncCallback() {//生产者发送Message,broker确认成功之后回调的方法@Overridepublic void onSuccess() {System.out.println("发送成功");}//生产者发送Message,broker确认失败之后回调的方法@Overridepublic void onException(JMSException e) {System.out.println("发送失败");}});}//9.关闭连接connection.close();} }
3.延时投递
演示:
1.找到conf文件夹下的activemq.xml文件 并且进入修改
2.修改保存之后,进行重启activemq服务器
3.
4.消息的重试机制
重试机制流程分析:
(1) 生产者投递一个Message到broker所在的队列queue或topic中
(2)消费者进行消费Message的时候,不进行开启事务,但是一直不进行手动commit提交。所以一条Message会一直存留在broker对应的队列当中从而重复被进行消费读取。
(3) 但是消息也具有重试机制,重试机制默认的阙值为6,也就是最多被重试消费6次,也就是一条Message最多被消费7次。
(4) 当被消费满7次,也就是达到阈值6时,但是还没有被打包为事务进行手动提交时,消费者就会发送一个"poison ack"给broker,则
Message就会变成一个毒消息。该Message会被从broker对应的队列拿出放到死信队列中去。
(5) 等到消费者还想继续消费这个Message第8次时,发现broker的队列中找不到该Message。
代码演示:
1.如果什么也不设置,默认重试的阙值为6
2.重新进行设置阙值
5.死信队列
死信队列不是说访问不了的队列,反而可以结合这个队列进行开展一些业务。
eg:订单超过支付时间应该被取消
(1) 上游创建订单时,将消息发送到等待支付的队列
(2) 消息的超时时间为30分钟,如果三十分钟还没有支付,则该消息进入到死信队列
(3) 对于死信队列也配有专门的消费者来处理死信队列的消息,处理方案即是把死信队列中的订单状态改为"已取消"
演示开启死信队列的步骤:
1.编辑activemq.xml配置文件
2.生产者:要确定broker对应死信队列的名字:DLQ.*
3.
6.幂等性消费
(1) 当在一些业务场景出现非幂等性情况
(2) 我们进行用户的注册,由于出现了网络动荡的问题,因此用户点击了多次,所以向broker发送了多条关于同一个的用户id的Message
(3)消费者进行消费接收broker中的Message,然后存储到MySql数据库中。但是同一个用户 相同的id怎么可以存储多份呢?
(4) 为了解决这个问题,保证幂等性,有两种方法:1.mysql中进行插入业务id作为主键,主键是唯一的,一次只可以插入一条。2.使用
redis或zookeeper的分布式锁。
(5) 我们通常使用的是redis的分布式锁,因为为了尽可能避免影响Mysql数据库的性能。
基于redis的分布式锁的实现原理:每当一个id进入之后,我们都会进行上一把以id号为唯一标识的id锁。当相同id的记录再一次过来时,
由于已经上了该id对应的锁,所以就不可以再允许通过该id对应的记录了。所以就不会再把该id对应的记录存储到数据库中了。
本文标签: ActiveMQ(二)
版权声明:本文标题:ActiveMQ(二) 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.freenas.com.cn/jishu/1687329461h90209.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论