rocketmq介绍
rocketmq是一个队列模型的消息中间件,具有高性能,高实时,高可靠,分布特点,能够保证严格的消息顺序,提供丰富的消息拉取模式,高效的订阅者水平扩展能力,实时的消息订阅机制,亿级消息堆积能力,而且rocketmq是集团开源产品,所以该中间件对集团内部其他产品依赖较少。
Producer:消息生产者
Consumer:消息消费者
Topic:多个队列的集合
producer向队列采用轮流方式发送消息,consumer从队列中获取消息进行消费。
rocketmq部署结构
物理部署结构
- Name Server:存储producer,consumer,broker的信息,producer,consumer,broker都会与Name server建立长连接,producer,consumer定时从name server获取topic信息,broker定时注册topic信息到Name Server
//Name server保存的信息
private final ReadWriteLock lock = new ReentrantReadWriteLock();
//topic以及topic对应的队列集合
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//所有的broker列表
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//broker各个集群信息
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//当前活跃的broker列表
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//过滤器列表
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
- broker:broker分成一个个集群,每个集群里面有一个master,多个slave
- producer:与broker集群中的master建立长连接,producer产生的消息发送给master,并定时向master发送心跳
- consumer:与broker集群中master与slave建立长连接,定时向master与slave发送心跳,它既可以从master订阅消息,也可以从slave订阅消息
逻辑部署结构
- producer:producer group可以包含多个producer实例,可以是多台机器,也可以是一台机器的多个进程或者一个进程的多个producer对象,一个producer group可以发送多个topic消息。
- consumer group可以包含多个consumer实例,可以是多台机器,也可以是一台机器的多个进程或者一个进程的多个consumer对象,广播消费模式下,consumer group下每个consumer都要消费一遍消息,集群消费模式下,consumer group只有一个consumer消费该消息。
rocketmq存储
数据传输方式
consumer消费消息采用mmap+write的零拷贝方式
- 零拷贝:即数据的传输不需要做任何拷贝动作
- mmap:一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。
数据存储结构
- commit log:消息存储的结构,每次存放消息到commit log,会返回一个commit log的位移queneOffSet
- consume queue:存放消息在commit log的位置信息,offset表示消息在commit log中位移,size表示消息大小,tag hashcode 存储消息tag的hashcode。消费者消费消息时,先读取consume queue获取到消息在commit log的位移以及大小,然后再到commit log读取消息
- indexService:IndexService用于创建索引文件集合,让用户可以查询指定某个topic下某个key的消息。slot table存放slotValue,slotValue等于存放topic+key组成的字符串的hashcode%slotNum的值(此处slotNum=500W),index Linked List与slotValue的值一一对应,即一个slotValue对应一个链表
index linked list node数据结构
rocketmy消息持久化策略
消息持久化分成同步刷盘与异步刷盘两种,可自己配置哪种刷盘策略
- 异步刷盘,消息写入到系统pagecache时,就返回结果给用户,同时开启一个线程将消息异步写入硬盘,可能存在消息丢失。
- 同步刷盘,消息写入到系统pagecache,再写入到disk,结果需要等到写入disk成功后才返回给用户
消息过滤
- broker端过滤:consume queue 存放有Message Tag HashCode,如果该hashcode与consumer订阅的tag的hashcode一直,则传输给consumer。
- consumer端过滤:消息传输给consumer后,由于存在两个tag字符串不同,但是hashcode一致,所以在consumer端需要再进行tag字符串比较一次,如果符合则处理消息。
顺序消费原理
取消息的某个字段,例如订单id,字段取模得到队列号,保证该类消息都投递到一个队列中,队列数据都是先进先出的,所以有顺序性
rocketmq负载均衡
发送消息负载均衡
类似遍历循环链表,依次遍历队列发送消息,当遍历一遍队列后,再从头开始遍历队列发送消息。
订阅消息负载均衡
与发送消息负载均衡类似,如图有五个队列,两个消费者,把队列以轮询的方式分给每个消费者,例如队列1给consumer1,队列2给consumer2,然后再从头开始,把队列3分给consumer1,队列4分给consumer2,再从头开始,把队列5分给consumer1,一直轮询直到队列分完。这样的处理会出现,当队列数量小于consumer数量,前面的consumer可以得到队列进行消费,但是后面当consumer由于队列分完了,没有队列可以消费。
消息堆积解决方法
当存在consumer访问堆积消息在磁盘数据当情况,master会让consumer从slave拉取消息,这样producer发送消息给master,consumer从slave拉取消息,这样就不会受消息堆积的影响