MetaQ
发布消息
- 日常环境不需要申请即可发送,Topic与Producer Group请保证唯一即可,但是如果需要在控制台里管理,则需要在控制台里申请对应的资源。
- 预发环境,发送消息不需要申请,应用可以自由创建Topic。这样做的目的是为了方便应用方更快的了解、测试MetaQ,但是也对运维提出了挑战,我们相信各个应用方都能创建有效Topic。
- 生产环境必须要到 MetaQ Console 申请才能发送,目前已经升级成自动审批,如有问题再联系metaq的开发。
- 对于非常重要的消息,例如订单消息,业务方需要有重发补偿的机制,例如MetaQ服务短暂不可用,此时发往MetaQ的消息将失败,等到MetaQ服务恢复后,业务方可以将之前发送失败的消息重新补偿发送
- 对于Message Size特别大的消息如何处理?例如1M,几百K的消息 不推荐应用发送超过16K的消息,如果消息确实比较大,发送消息客户端有个配置,默认超过4K的消息开始压缩,消息到达订阅方之前会自动解压,压缩过程对用户透明,但是如果压缩过以后消息仍然较大,我们推荐应用对消息进行拆分,这样做的原因如下
- MetaQ通信层没有对大的请求做优化,采用的是典型的RPC方式,不适合大的请求传递,可能会导致网络层的Buffer异常。
- MetaQ的服务器存储是一个典型的LRU CACHE系统,过大的消息会占用较多Cache,对于其他应用Cache命中率产生影响
- MetaQ的磁盘资源通常比较紧张
- MetaQ暂不解决大消息存储问题
- 发送消息时,如果将来需要查询消息,或者定位消息是否被接收,需要设置Message Key属性,例如设置为订单Id,商品Id等
- 发送消息时,如果订阅方有过滤需求,请在消息Tag属性上设置相关值,Tag的名称不需要申请,可自由设置,一条消息只允许设置一个Tag。
- 发送事务消息,出于运维角度考虑,淘宝用户请使用Notify。
订阅消息
- 日常、预发等非生产环境,订阅消息不需要申请。订阅生产环境的消息,必须要到 MetaQ Console 申请才能订阅,系统自动审批。
- MetaQ支持服务器消息过滤,如果订阅某个Topic,但只关心其中一部分消息,可以使用表达式方式过滤。这样可以避免无用的消息传输到客户端,而且降低了应用与MetaQ服务器的负载。过滤表达式中的Message Tag是由发送方自由指定,MetaQ不做任何限制,当然不能传入非法字符,例如空白字符、|| 等
- 非顺序消息消费,耗时时间不做限制,但是应用应该尽可能保证耗时短,这样才能达到高性能,另外消费消息Hang住,会导致消息所在队列的消费动作暂停,直到Hang住的消息消费完。对其他队列不受影响
- 顺序消息消费,耗时时间有限制,要保证每条消息在30s内消费完,超过30s会有潜在的乱序问题。(原因是分布式锁超时问题,但概率极低)
消费方式
- 集群消费,一条消息只会被同一个group里一个消费端消费。不同group之间相互不影响。
- 广播消费,一条消息会被同一个group里每一个消费端消费。
消息重复性
- MetaQ不能保证消息不重复,"Exactly Only Once"这个特性不支持,原因如下:
- 发送消息阶段,会存在分布 式环境下典型的超时问题,即发送消息阶段不能保证消息不重复。
- 订阅消息阶段,由于涉及集群订阅,多个订阅者需要Rebalance方式订阅,在Rebalance短暂不一致情况下,会产生消息重复
- 订阅者意外宕机,消费进度未及时存储,也会产生消息重复
- 消息重复性问题如何解决?
- 应用方收到消息后,可通过Tair、DB等去重
- 应用方可通过主动拉的方式,可保证拉消息绝对不重复,但是分布式协调分配队列问题需要应用来控制
- 消息中间件团队也在思考如何有效去重,又对整个消息系统性能影响最低。
广播消息
- MetaQ支持广播消息,但是广播消息的代价较高,投递比可能在1:100甚至1:1000,对于生产环境订阅广播消息,人工审核环节可能会拒绝,取决于订阅的消息量及消费者集群规模。
- MetaQ的广播消息不支持失败重试,原因如下:
- 对于集群消费的消息支持失败重试,因为失败的维度是一个订阅组集群,而广播消息失败重试维护的则是订阅组集群中的每个订阅者,代价较高。
- MetaQ的广播消息消费进度维护在消费者本地磁盘,每隔5s刷盘一次,如果本地磁盘损坏,消费进度如何恢复?
- 联系MetaQ运维人员,通过运维工具按照时间维度,例如回退一小时,新创建一份消费进度。(此功能开发中)
消息重试
- 非顺序消息消费失败重试,消费失败的消息发回服务器,应用可以指定这条失败消息下次到达Consumer的时间。消费失败重试次数有限制,通常线上为每个订阅组每条失败消息重试5次(每次消息都会定时重试,定时时间随着重试次数递增,此过程应用可干预)。超过重试次数,消息进入死信队列,并向用户报警。
- 消息重试对于服务器代价较高,如果某个应用消息量非常大,且失败率非常高,需要大量重试,则不建议使用MetaQ
- 顺序消息消费失败重试,某个队列正在消费的消息消费失败,会将当前队列挂起(挂起时间应用可通过API设置),其他队列仍然正常消费。
死信队列
消息一旦进入死信队列,则不再向应用投递,MetaQ监控系统会向应用报警 (报警功能开发中)
由于消息一旦进入死信队列,则不能再被订阅,建议应用在最后一次重试消费时,将失败消息保存到DB
消息堆积
MetaQ每台服务器提供大约亿级的消息堆积能力(多个业务方共用),超过堆积阀值,订阅消息吞吐量会下降。
消息实时性
MetaQ采用了长轮询方式从Broker拉消息,实时性同Push方式一致,消息的延迟时间大约几毫秒左右。
RocketMQ的由来
谈起RocketMQ的亮点,那不得不先提一下阿里巴巴消息引擎的演进史。阿里中间件消息引擎发展到今日,前前后后经历了三代演进。
第一代,推模式,数据存储采用关系型数据库。在这种模式下,消息具有很低的延迟特性,并且很容易支持分布式事务。尤其在阿里淘宝这种高频交易场景中,具有非常广泛地应用。典型代表包括Notify、Napoli。
第二代,拉模式,自研的专有消息存储。在日志处理方面能够媲美Kafka的吞吐性能,但考虑到淘宝的应用场景,尤其是其交易链路的高可靠需求,消息引擎并没有一味的追求吞吐,而是将稳定可靠放在首位。因为采用了长连接拉模式,在消息的实时方面丝毫不逊推模式。典型代表MetaQ。
第三代,以拉模式为主,兼有推模式的高性能、低延迟消息引擎RocketMQ,在二代功能特性的基础上,为电商金融领域添加了可靠重试、基于文件存储的分布式事务等特性,并做了大量优化。从2012年开始,经历了历次双11核心交易链路检验。目前已经捐赠给Apache基金会。时至今日,RocketMQ很好的服务了阿里集团大大小小上千个应用,在双11当天,更有不可思议的万亿级消息流转,为集团大中台的稳定发挥了举足轻重的作用。
不难看出,RocketMQ其实是伴随着阿里巴巴整个生态的成长,逐渐衍生出来的高性能、低延迟能够同时满足电商领域和金融领域的极尽苛刻场景的消息中间件。
RocketMQ的技术概览
在我们看来,它最大的创新点在于能够通过精巧的横向、纵向扩展,不断满足与日俱增的海量消息在高吞吐、高可靠、低延迟方面的要求。
目前RocketMQ主要由NameServer、Broker、Producer以及Consumer四部分构成,如下图所示。
所有的集群都具有水平扩展能力,无单点障碍。
NameServer以轻量级的方式提供服务发现和路由功能,
每个NameServer存有全量的路由信息,提供对等的读写服务,支持快速扩缩容。Broker负责消息存储,以Topic为纬度支持轻量级的队列,单机可以支撑上万队列规模,
支持消息推拉模型,具备多副本容错机制(2副本或3副本)、强大的削峰填谷以及上亿级消息堆积能力,同时可严格保证消息的有序性。
除此之外,Broker还提供了同城异地容灾能力,丰富的Metrics统计以及告警机制。这些都是传统消息系统无法比拟的。Producer由用户进行分布式部署,消息由Producer通过多种负载均衡模式发送到Broker集群,发送低延时,支持快速失败。Consumer也由用户部署,
支持PUSH和PULL两种消费模式,
支持集群消费和广播消息,提供实时的消息订阅机制,满足大多数消费场景。
三项技术发力点
(一)消息的顺序(全局有序)
(二)消息的去重(RocketMQ则希望通过采取二次判重策略,有效降低服务端IO)
(三)分布式的挑战
首先明确分布式系统这个概念:
分布式系统是由一系列分散自治组件通过互联网并行并发协作,从而组成的一个coherent软件系统。
它具备资源共享,并行并发,可靠容错,透明开放等特性。像CAP,BASE,Paxos,事务等一起构成了分布式基础理论。
这里我们再来重温下CAP理论:CAP分别代表一致性(Consistency),可用性(Availability),分区容忍性(Partition tolerance)。
一致性,Eric Brewer(CAP理论提出者)用一个服务要么被执行,要么不被执行来定义(原文:A service that is consistent operates fully or not at all)。
请注意,这里的一致性是有别于数据库ACID属性中的C,
数据库层面的C指的是数据的操作不能破坏数据之间的完整性约束,如外键约束。
在分布式环境中,
可以把C简单理解为多节点看到的是数据单一或者同一副本。可用性,意味着
服务是可用的(原文:the service is available (to operate fully or not as above))。可用性又可以细分为写可用和读可用。
在分布式环境中,往往指的是系统在确定时间内可返回读写操作结果,也即读写均可用。分区容忍性,除了整个网络故障外(如光纤被掘断),
其它故障(如丢包、乱序、抖动、甚至是网络分区节点 crash )都不能导致整个系统无法正确响应(原文:No set of failures less than total network failure is allowed to cause the system to respond incorrectly)。
CAP理论可以看做是探索适合不同应用的一致性与可用性平衡问题。
- 没有分区的情况 :可以同时满足C与A,以及完整的ACID事务支持。可以选择牺牲一定的C,获得更好的性能与扩展性。
- 分区的情况 :选择A(集中关注分区的恢复),需要有分区开始前、进行中、恢复后的处理策略,应用合适的补偿处理机制。像RocketMQ这样的分布式消息引擎,更多的追求AP。再强的系统也一定有容量底线,足够的容量是可用性的有效前提。通常情况下,会通过降级、限流、熔断机制来保障洪峰下的可用性。具体的技术细节可以参看电子书章节[1]
另外,考虑到在金融高频交易典型场景,我们也为RocketMQ设计了CP机制,在满足分布式系统的分区容错性的前提下,牺牲系统可用性来保证数据的一致性。而技术实现上,则基于Zab一致性协议,利用分布式锁和通知机制,以此来保障多副本数据的一致性。
那么,消息中间件性能究竟哪家强?
带着这个疑问,我们中间件测试组对常见的三类消息产品(Kafka、RabbitMQ、RocketMQ)做了性能比较。
Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache定级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
http://www.jianshu.com/p/453c6e7ff81c
************************************************************
从发送模型上来看分为消息系统分为两类:
1、Peer-to-Peer
- 一般基于Pull或者Polling 接收消息。
- 发送到队列中的消息被一个而且仅仅一个接收者锁接收。即使有多个接收者在同一个队列中侦听同一消息。
- 既支持异步“即发即弃”的消息传送方式,也支持同步请求/应答传送方式。
2、发布/订阅
- 发布到一个主题的消息,可被多个订阅者所接收
- 发布/订阅即可基于Push消费数据,也可以基于Pull或者Polling消费数据。
- 解耦能力比P2P模型更强。
kafka同时支持topic的单播和多播。
消息系统实用场景:
- 解耦 各位系统之间通过消息系统这个统一的接口交换数据,无须了解彼此的存在。
- 冗余 部分消息系统具有消息持久化能力,可规避消息处理前丢失的风险。
- 扩展 消息系统是统一的数据接口,各系统可独立扩展。
- 峰值处理能力 消息系统可顶住峰值流量,业务系统可根据处理能力从消息系统中获取并处理对应量的请求。
- 可恢复性 系统中部分组件失效并不会影响整个系统,它恢复后仍然可从消息系统中获取并处理数据。
- 异步通信 在不需要立即处理请求的场景下,可以将请求放入消息系统,合适的时候再处理。
常用消息系统对比:
- RabbitMQ 使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。
- Redis Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。
- ZeroMQ ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演这个服务器角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter的Storm 0.9.0以前的版本中默认使用ZeroMQ作为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty作为传输模块)。
- ActiveMQ Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。JMS实现,Peer-to-Peer,支持持久化、 XA事务。
- Kafka/Jafka 高性能跨语言的分布式发布/订阅消息系统,数据持久化,全分布式,同时支持在线和离线处理。不支持XA分布式事务。Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。
- MetaQ/RocketMQ 纯Java实现,发布/订阅消息系统,支持本地事务和XA分布式事务。