又是好久没有写博客了,虽然可以找出无数个没有写的博客的理由,但是说到底,还是一个字“懒”。今天我终于吃了一颗治疗懒癌的药丸,决定写一篇博客。介绍什么好呢,思来想去,还是介绍下rocketmq吧,毕竟写了30多篇博客,还没有好好写过关于mq的博客呢。本篇博客比较基础,不涉及到源码分析,只是扫盲。
mq有什么用解耦我觉得从某种角度来说,微服务促进了mq的蓬勃发展,本来一个系统有n多个模块,所有模块都强耦合在一起,现在微服务了,一个模块就是一个系统,系统之间肯定需要交互,交互有三种常见的方法,一种是rpc,一种是http,一种就是mq了。
异步原本一个业务分为n步,要一步一步处理,才能把最终的结果返回给用户,现在有了mq,先把最关键的部分处理完毕,然后发送消息到mq,直接返回给用户ok,至于后面的步骤在后台慢慢处理吧,真乃提升用户体验的神器。
削峰某个接口的请求量突然飙升,势必会对应用服务器、数据库服务器造成很大的压力,现在有了mq,来多少请求都不在怕的,后台慢慢处理呗。
rocketmq简介rocketmq是用java编写的,是阿里开源的消息中间件,吸收了kafka很多优点。kafka也是比较热门的消息中间件,不过kafka是用scala编写的,不利于java程序员去阅读源码,也不利于java程序员做一些定制化的开发。接触过kafka的小伙伴都知道,要用好kafka实属不易,相对来说,rocketmq简单多了,而且rocketmq有阿里加持,经历了n次双11的考验,比较适合国内互联网公司,所以国内使用rocketmq的公司很多。
rocketmq四大组件图片来自gitee.com/mirrors/roc…
可以看到rocketmq主要有四个组件:
nameserver无状态服务,注册中心,可集群部署,但是nameserver节点之间没有任何数据交互。borker会以定时把topic路由信息上报给所有的nameserver。producer、consumer会随机选择一个nameserver定时topic更新路由信息。topic路由信息在nameserver集群中采用最终一致性。保证ap。borkerrocketmq的服务端,用于存储消息、分发消息。borker会定时把自身拥有的所有的topic路由信息上报给nameserver。borker有两个角色:master、follower,master承担读(消费消息)写(生产消息)操作,如果master比较忙,或者不可用,follower可以承担读操作。borkerid=0,代表是matser,borkerid!=0,代表是follower,需要注意的有两点:其一,目前为止,borkerid=1的follower才可以承担读操作;其二,只有较高版本的rocketmq才支持当master节点挂掉,follower自动升级到master。producer生产者,每隔一定时间向nameserver发起topic的路由信息查询。
consumer消费者,每隔一定时间向nameserver发起topic的路由信息查询。
为什么注册中心不选用zookeeper其实,在低版本的rocketmq中,确实是选用zookeeper作为注册中心的,但是后面改成了现在的nameserver,猜想主要原因是:
rocketmq已经是一个中间件了,不想再依赖其他中间件。zookeeper比较重,有很多功能rocketmq是用不到的,不如写一个轻量级的注册中心。zookeeper是cp,一旦触发领导选举,那么注册中心就不可用了,而rocketmq的注册中心,不需要强一致性,只要保证最终一致性。rocketmq消息领域模型message传输的消息。消息必须有topic。消息可以有多个tag和多个key,可以看做消息的附加属性。topic一类消息的集合。每个消息必须有一个topic。消息的第一级类型。tag一个消息除了有topic之外,还可以有tag,用来细分同一个topic下的不同种类的消息。tag不是必须的。消息的第二级类型。group分为producergroup,consumergroup,我们更多的是关注consumergroup,consumergroup包含多个consumer。
在集群消费模式下,一个consumergroup下的consumer共同消费一个topic,且每个consumer会被分配到n个队列,但是一个队列只会被一个consumer消费,不同的consumergroup可以消费同一个topic,一条消息会被订阅此topic的所有consumergroup消费。
queue一个topic默认包含四个queue。在集群消费模式下,同一个consumergroup中的consumer可以消费多个queue的消息,但是一个queue只能被一个consumer消费。queue中的消息是有序的。分为读queue和写queue,一般来说,读queue的数量和写queue的数量是一致的,否则很容易出问题。消费模式消费模式有两种:clustering(集群消费)和broadcasting(广播消费)。
和其他mq不同,其他mq是在发送消息的时候,指定是集群消费还是广播消费,rocketmq是在消费者端设置是集群消费还是广播消费。
clustering(集群消费)默认情况下是集群消费模式,该模式下,consumergroup所有的consumer共同消费一个topic的消息,每个consumer负责消费n个队列的消息(n也可能为1,甚至是0,没有分配到队列),但是一个队列只会被一个consumer消费。如果某个consumer挂掉,consumergroup下的其他consumer会接替挂掉的consumer继续消费。
集群消费模式下,消费进度维护在borker端,存储路径为${rocket_home}/store/config/ consumeroffset.json,如下图所示:使用topicname@consumergroupname为key,消费进度为value,value的形式是queueid:offset ,说明如果有多个consumergroup,每个consumergroup的消费进度是不同的,需要分开来存储。
broadcasting(广播消费)广播消费消息会发给consumergroup中所有的consumer。
广播消费模式下,消费进度维护在consumer端。
消费队列负载算法与重平衡机制消费队列负载算法我们知道了在集群消费模式下,consumergroup下所有的consumer共同消费一个topic的消息,每个consumer负责消费n个队列的消息,那么具体是如何分配的呢?这就涉及到消费队列负载算法了。
rocketmq提供了众多的消费队列负载算法,其中最常用的是两种算法,即allocatemessagequeueaveragely、allocatemessagequeueaveragelybycircle。下面我们来看下这两个算法的区别。
假设,现在一个topic有16个队列,用q0~q15表示,有3个consumer,用c0-c2表示。
用allocatemessagequeueaveragely消费队列负载算法的结果如下:
c0:q0 q1 q2 q3 q4 q5c1:q6 q7 q8 q9 q10c2:q11 q12 q13 q14 q15用allocatemessagequeueaveragelybycircle消费队列负载算法的结果如下:
c0:q0 q3 q6 q9 q12 q15c1:q1 q4 q7 q10 q13c2:q2 q5 q8 q11 q14consumergroup下所有的consumer共同消费一个topic的消息,每个consumer负责消费n个队列的消息,但是一个队列不能同时被n个consumer消费,这意味着什么?
聪明的你一定可以想到,如果一个topic只有4个队列,而有5个consumer,那么有一个consumer将不能分配到任何队列,所以在rocketmq中,topic下队列的个数直接决定了consumer的最大个数,也就说明,不能光靠增加consumer来提高消费速度。
重平衡虽然建议在创建topic的时候,就应该充分考虑队列的个数,但是实际情况往往是不尽人意的,哪怕队列数没有发生改变,consumer的数量也一定会发生改变,比如consumer的上下线,比如某个consumer挂了,比如新增了consumer。队列的扩容、缩容,consumer的扩容、缩容都会导致重平衡,也就是为consumer重新分配消费的队列。
在rocketmq中,consumer会定时查询topic的队列的个数,consumer的个数,如果发生了改变,就会触发重平衡。
重平衡是rocketmq内部实现的,程序员无需关心。
pull or push?一般来说,mq有两种方法获取消息:
pull:consumer主动拉取消息,好处是consumer可以控制拉取消息的频率,条数,consumer知道自身的消费能力,所以在consumer端不容易造成消息堆积,但是实时性不是太好,效率相对较低。push:broker主动发送消息,好处是实时性、效率比较高,但是broker无法知道consumer端的消费能力,如果发给consumer的消息过多,会造成consumer端的消息堆积;如果发给consumer的数据太少,又会造成consumer端的空闲。不管是pull,还是push,consumer总会与broker产生交互,交互的方式一般有短连接、长连接、轮询三种方式。
看起来,rocketmq支持既支持pull,也支持push,但是实际上push也是用pull实现的,那么consumer是怎么与broker产生交互的呢?
这就是rocketmq设计的巧妙的地方了,既不是短连接,也不是长连接,也不是轮询,而是采用的长轮询。
长轮询consumer发起拉取消息的请求,分为两种情况:
有消息:consumer拿到消息后,连接断开。没有消息:borker hold(保持)住连接一定时间,每隔5秒,检查下是否有消息,如果有消息,给consumer,连接断开。事务消息rocketmq支持事务消息,producer把事务消息发送给broker后,broker会把消息存储在系统topic:rmq_sys_trans_half_topic,这样consumer就无法消费到这条消息了。
broker会有一个定时任务,消费rmq_sys_trans_half_topic的消息,向producer发起回查,回查的状态有三种:提交、回滚、未知。
如果回查的状态是提交,回滚,会触发消息的提交和回滚;如果是未知,会等待下一次回查,rocketmq可以设置一条消息的回查间隔与回查次数,超过一定的回查次数,消息会自动回滚。延迟消息延迟消息是指息发到broker后,不能立刻被consumer消费,需要等待一定的时间才可以被消费到,rocketmq只支持特定的延迟时间:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
消费形式rocketmq支持两种消费形式:并发消费、顺序消费。如果是顺序消费,需要保证排序的消息在同一个队列。如何选择队列发送呢,rocketmq发送消息的方法有好几个重载,其中有一个重载方法支持队列的选择。
同步刷盘、异步刷盘producer把消息发送到borker中,borker是需要把消息持久化的,rocketmq支持两种持久化策略:
同步刷盘:borker把消息持久后才返回ack给producer,好处是消息可靠性高,但是效率较慢。异步刷盘:broker把消息写入到pagecache中,就返回ack给producer。好处是效率极高,但是如果服务器挂了,消息可能会丢失,如果只是rocketmq服务挂了,不会造成消息丢失。同步复制、异步复制为了mq的可靠性、可用性,在生产环境,一般会部署follower节点,follower节点会复制master的数据,rocketmq支持两种持复制策略:
同步复制:master、follower都把消息写入成功,才返回ack给producer,可靠性较高,但是效率较慢。异步复制:只要master写入成功,就返回ack给producer,效率较高,但是可能会丢失消息。写入是写入pagecache,还是写入硬盘,要看follower broker的配置。
再谈谈producerrocketmq提供了三种发送消息的方法:
oneway:fire and forget,单向消息,指消息发送出去后,就不管了,这个方法是没有返回值的。同步:消息发送出去后,同步等待borker的响应。异步:消息发送出去后,立即返回,收到boker的响应后,会执行函调方法。在实际开发中,一般选用同步方法,如果要提高rocketmq的性能,一般都是修改borker端的参数,特别是刷盘策略和复制策略。
发送消息重试消息发送时,如果使用了messagequeueselector,那消息发送的重试机制将会失效。
发送消息响应可能为以下四种:
public enum sendstatus { send_ok, flush_disk_timeout, flush_slave_timeout, slave_not_available,}复制代码
除了第一种,其他情况都是有问题的,为了保证消息不丢失,需要设置producer参数:retryanotherbrokerwhennotstoreok为true。
故障规避机制如果消息发送失败了,重试的时候,还是发送给这个borker,那么大概率发送还是失败的,rocktemq设计精巧之处在于,重试的时候,会自动避开这个borker,而选择其他borker,但是目前为止,异步发送没有那么智能,只会在一个borker上重试,所以强烈建议选择同步发送方式。
rocketmq提供了两种故障规避机制。用参数sendlatencyfaultenable来控制。
false:默认值,只有在重试的时候,才会启用故障规避机制,比如发送消息给borkera失败了,重试的时候,会选择borkerb,但是下次发送消息,还是会选择发送给borkera。true:开启延迟退避机制,一旦消息发送给borkera失败,就会悲观的认为在一段时间内,borkera不可用,在将来的一段时间内,不会再向borkera发送消息。延迟退避机制看起来很好用,但是一般来说borker端繁忙,导致borker不可用或者网络不可用只是一瞬间的事情,马上就可以恢复,如果开启了延迟退避机制,本来可用的borker在一段时间内却被规避了,其他borker更加繁忙,那可能情况更糟糕。
再谈谈consumerconsumer线程注意事项consumer有两个参数,可以消费的并行度,即consumethreadmin、consumethreadmax,看起来给人的感觉是,如果consumer端堆积消息比较少,消费线程数为consumethreadmin;如果consumer端堆积消息比较多,就自动开启新的线程来消费,直到消费线程数为consumethreadmax。但是并不是这样,consumer内部持有一个线程池,选用的是无界队列,也就是consumethreadmax参数是无效的,所以在实际开发中,consumethreadmin、consumethreadmax往往设置成一样。
consumefromwhere如果查询不到消费进度的时候,consumer从哪里开始消费,rocketmq支持从最新消息、最早消息、指定时间戳这三种方式进行消费。
消费消息重试rocketmq会为每个consumergroup都设置一个topic名称为%retry%+consumergroup的重试队列,用来保存需要给consumergroup重试的消息,但是重试需要一定的延时时间,rocketmq对于重试消息的处理是先保存至topic名称为schedule_topic_xxxx的延迟队列中,后台定时任务按照对应的时间进行delay后重新保存至%retry%+consumergroup的重试队列中。
消息堆积、消费能力不够,怎么办提高消费进度,这是最好的办法。增加队列,增加consumer。原先的consumer作为搬砖工,根据一定的规则把消息“搬”到多个新的topic,再开几个consumergroup去消费不同的topic。新开一个consumergroup去消费,也就是两个consumergroup同时消费一个topic,但是需要注意offset的判断,比如一个consumergroup消费offset为奇数的消息,一个consumergroup消费offset为偶数的消息。本来以为写扫盲文,应该会写的很顺,但是还是想多了,因为是扫盲文,面向的是没有怎么接触过rocketmq的小伙伴,但是rocketmq有没有那么简单,不可能用一篇博客,就让没有怎么接触过rocketmq的小伙伴顺利入门,所以在写博客的时候,一直在想,这个东西重要吗,需要仔细描述吗;这个东西可以忽视,可以不介绍吗 等等,大家可以看到本文基本都是在介绍各种概念,几乎没有涉及到api的层面,因为一旦涉及到api,那么估计写两个星期也写不完。
end相关免费学习推荐:java基础教程
以上就是终于来了...rocketmq扫盲篇的详细内容。