http://www.jasongj.com/2015/01/02/Kafka%E6%B7%B1%E5%BA%A6%E8%A7%A3%E6%9E%90/
每个日志文件都是“log entries”序列,每一个
message length : 4 bytes (value: 1+4+n)
“magic” value : 1 byte
crc : 4 bytes
payload : n bytes
这个“log entries”并非由一个文件构成,而是分成多个segment,每个segment名为该segment第一条消息的offset和“.kafka”组成。另外会有一个索引文件,它标明了每个segment下包含的
因为每条消息都被append到该partition中,是顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。
每一个consumer实例都属于一个consumer group,每一条消息只会被同一个consumer group里的一个consumer实例消费。(不同consumer group可以同时消费同一条消息)
实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer在不同的Consumer Group即可。下图展示了Kafka在LinkedIn的一种简化部署模型。
目前,最新版(0.8.2.1)Kafka的Consumer Rebalance的控制策略是由每一个Consumer通过在Zookeeper上注册Watch完成的。每个Consumer被创建时会触发Consumer Group的Rebalance,具体启动流程如下:
http://calvin1978.blogcn.com/articles/kafkaio.html
- 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输
- 同时支持离线数据处理和实时数据处理
- 解耦
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束 - 冗余
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。 - 扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。 - 灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 - 可恢复性
当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。 - 送达保证
消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,IronMQ提供了一个”只送达一次”保证。无论有多少进程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是”预定”了这个消息,暂时把它移出了队列。除非客户端明确的表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理。
- 顺序保证
在大多使用场景下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。IronMO保证消息通过FIFO(先进先出)的顺序来处理,因此消息在队列中的位置就是从队列中检索他们的位置。 - 缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行–写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。 - 理解数据流
在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息队列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。 - 异步通信
很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。
- Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker - Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处) - Partition
parition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件 - Producer
负责发布消息到Kafka broker - Consumer
消费消息。每个consumer属于一个特定的consumer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。
Push vs. Pull
作为一个messaging system,Kafka遵循了传统的方式,选择由producer向broker push消息并由consumer从broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用非常不同的push模式。事实上,push模式和pull模式各有优劣。
push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
Topic在逻辑上可以被认为是一个queue。每条消费都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件。
每个日志文件都是“log entries”序列,每一个
log entry
包含一个4字节整型数(值为N),其后跟N个字节的消息体。每条消息都有一个当前partition下唯一的64字节的offset,它指明了这条消息的起始位置。磁盘上存储的消息格式如下:message length : 4 bytes (value: 1+4+n)
“magic” value : 1 byte
crc : 4 bytes
payload : n bytes
这个“log entries”并非由一个文件构成,而是分成多个segment,每个segment名为该segment第一条消息的offset和“.kafka”组成。另外会有一个索引文件,它标明了每个segment下包含的
log entry
的offset范围,如下图所示。因为每条消息都被append到该partition中,是顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。
每一条消息被发送到broker时,会根据paritition规则选择被存储到哪一个partition。如果partition规则设置的合理,所有消息可以均匀分布到不同的partition里,这样就实现了水平扩展。(如果一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个topic的性能瓶颈,而partition解决了这个问题)。在创建topic时可以在
$KAFKA_HOME/config/server.properties
中指定这个partition的数量(如下所示),当然也可以在topic创建之后去修改parition数量。1 2 3 4 | # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=3 |
在发送一条消息时,可以指定这条消息的key,producer根据这个key和partition机制来判断将这条消息发送到哪个parition。paritition机制可以通过指定producer的paritition. class这一参数来指定,该class必须实现
kafka.producer.Partitioner
接口。本例中如果key可以被解析为整数则将对应的整数与partition总数取余,该消息会被发送到该数对应的partition。(每个parition都会有个序号)
对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略去删除旧数据。一是基于时间,二是基于partition文件大小。例如可以通过配置
$KAFKA_HOME/config/server.properties
,让Kafka删除一周前的数据,也可通过配置让Kafka在partition文件超过1GB时删除旧数据,如下所示。
这里要注意,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除文件与Kafka性能无关,选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每一个consumer group保留一些metadata信息–当前消费的消息的position,也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些consumer过,不需要通过broker去保证同一个consumer group只有一个consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。
上文说明了一个parition的replication过程,然尔Kafka集群需要管理成百上千个partition,Kafka通过round-robin的方式来平衡partition从而避免大量partition集中在了少数几个节点上。同时Kafka也需要平衡leader的分布,尽可能的让所有partition的leader均匀分布在不同broker上。另一方面,优化leadership election的过程也是很重要的,毕竟这段时间相应的partition处于不可用状态。一种简单的实现是暂停宕机的broker上的所有partition,并为之选举leader。实际上,Kafka选举一个broker作为controller,这个controller通过watch Zookeeper检测所有的broker failure,并负责为所有受影响的parition选举leader,再将相应的leader调整命令发送至受影响的broker,过程如下图所示。
这样做的好处是,可以批量的通知leadership的变化,从而使得选举过程成本更低,尤其对大量的partition而言。如果controller失败了,幸存的所有broker都会尝试在Zookeeper中创建/controller->{this broker id},如果创建成功(只可能有一个创建成功),则该broker会成为controller,若创建不成功,则该broker会等待新controller的命令。
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Internals
这样做的好处是,可以批量的通知leadership的变化,从而使得选举过程成本更低,尤其对大量的partition而言。如果controller失败了,幸存的所有broker都会尝试在Zookeeper中创建/controller->{this broker id},如果创建成功(只可能有一个创建成功),则该broker会成为controller,若创建不成功,则该broker会等待新controller的命令。
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Internals
In a Kafka cluster, one of the brokers serves as the controller, which is responsible for managing the states of partitions and replicas and for performing administrative tasks like reassigning partitions. The following describes the states of partitions and replicas, and the kind of operations going through the controller.
每一个consumer实例都属于一个consumer group,每一条消息只会被同一个consumer group里的一个consumer实例消费。(不同consumer group可以同时消费同一条消息)
有这么几种可能的delivery guarantee:
At most once
消息可能会丢,但绝不会重复传输At least one
消息绝不会丢,但可能会重复传输Exactly once
每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。
Kafka的delivery guarantee semantic非常直接。当producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果producer发送数据给broker后,遇到的网络问题而造成通信中断,那producer就无法判断该条消息是否已经commit。这一点有点像向一个自动生成primary key的数据库表中插入数据。虽然Kafka无法确定网络故障期间发生了什么,但是producer可以生成一种类似于primary key的东西,发生故障时幂等性的retry多次,这样就做到了Exactly one
。截止到目前(Kafka 0.8.2版本,2015-01-25),这一feature还并未实现,有希望在Kafka未来的版本中实现。(所以目前默认情况下一条消息从producer和broker是确保了At least once
,但可通过设置producer异步发送实现At most once
)。
- 接下来讨论的是消息从broker到consumer的delivery guarantee semantic。(仅针对Kafka consumer high level API)。consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中存下该consumer在该partition下读取的消息的offset。该consumer下一次再读该partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然可以将consumer设置为autocommit,即consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了
Exactly once
。但实际上实际使用中consumer并非读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。 - 读完消息先commit再处理消息。这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于
At most once
- 读完消息先处理再commit。这种模式下,如果处理完了消息在commit之前consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于
At least once
。在很多情况使用场景下,消息都有一个primary key,所以消息的处理往往具有幂等性,即多次处理这一条消息跟只处理一次是等效的,那就可以认为是Exactly once
。(人个感觉这种说法有些牵强,毕竟它不是Kafka本身提供的机制,而且primary key本身不保证操作的幂等性。而且实际上我们说delivery guarantee semantic是讨论被处理多少次,而非处理结果怎样,因为处理方式多种多样,我们的系统不应该把处理过程的特性–如是否幂等性,当成Kafka本身的feature) - 如果一定要做到
Exactly once
,就需要协调offset和实际操作的输出。精典的做法是引入两阶段提交。如果能让offset和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once
。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)
总之,Kafka默认保证At least once
,并且允许通过设置producer异步提交来实现At most once
。而Exactly once
要求与目标存储系统协作,幸运的是Kafka提供的offset可以使用这种方式非常直接非常容易。
很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理。同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被所有Consumer消费(广播)。因此,Kafka Hight Level Consumer提供了一个从Kafka消费数据的高层抽象,从而屏蔽掉其中的细节并提供丰富的语义。
Consumer Group
Consumer Group
High Level Consumer将从某个Partition读取的最后一条消息的offset存于Zookeeper中(Kafka从0.8.2版本开始同时支持将offset存于Zookeeper中与将offset存于专用的Kafka Topic中)。这个offset基于客户程序提供给Kafka的名字来保存,这个名字被称为Consumer Group。Consumer Group是整个Kafka集群全局的,而非某个Topic的。每一个High Level Consumer实例都属于一个Consumer Group,若不指定则属于默认的Group。
很多传统的Message Queue都会在消息被消费完后将消息删除,一方面避免重复消费,另一方面可以保证Queue的长度比较短,提高效率。而如上文所述,Kafka并不删除已消费的消息,为了实现传统Message Queue消息只被消费一次的语义,Kafka保证每条消息在同一个Consumer Group里只会被某一个Consumer消费。与传统Message Queue不同的是,Kafka还允许不同Consumer Group同时消费同一条消息,这一特性可以为消息的多元化处理提供支持。实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer在不同的Consumer Group即可。下图展示了Kafka在LinkedIn的一种简化部署模型。
High Level Consumer Rebalance
(本节所讲述Rebalance相关内容均基于Kafka High Level Consumer)
Kafka保证同一Consumer Group中只有一个Consumer会消费某条消息,实际上,Kafka保证的是稳定状态下每一个Consumer实例只会消费某一个或多个特定Partition的数据,而某个Partition的数据只会被某一个特定的Consumer实例所消费。也就是说Kafka对消息的分配是以Partition为单位分配的,而非以每一条消息作为分配单元。这样设计的劣势是无法保证同一个Consumer Group里的Consumer均匀消费数据,优势是每个Consumer不用都跟大量的Broker通信,减少通信开销,同时也降低了分配难度,实现也更简单。另外,因为同一个Partition里的数据是有序的,这种设计可以保证每个Partition里的数据可以被有序消费。
如果某Consumer Group中Consumer(每个Consumer只创建1个MessageStream)数量少于Partition数量,则至少有一个Consumer会消费多个Partition的数据,如果Consumer的数量与Partition数量相同,则正好一个Consumer消费一个Partition的数据。而如果Consumer的数量多于Partition的数量时,会有部分Consumer无法消费该Topic下任何一条消息。
Kafka保证同一Consumer Group中只有一个Consumer会消费某条消息,实际上,Kafka保证的是稳定状态下每一个Consumer实例只会消费某一个或多个特定Partition的数据,而某个Partition的数据只会被某一个特定的Consumer实例所消费。也就是说Kafka对消息的分配是以Partition为单位分配的,而非以每一条消息作为分配单元。这样设计的劣势是无法保证同一个Consumer Group里的Consumer均匀消费数据,优势是每个Consumer不用都跟大量的Broker通信,减少通信开销,同时也降低了分配难度,实现也更简单。另外,因为同一个Partition里的数据是有序的,这种设计可以保证每个Partition里的数据可以被有序消费。
如果某Consumer Group中Consumer(每个Consumer只创建1个MessageStream)数量少于Partition数量,则至少有一个Consumer会消费多个Partition的数据,如果Consumer的数量与Partition数量相同,则正好一个Consumer消费一个Partition的数据。而如果Consumer的数量多于Partition的数量时,会有部分Consumer无法消费该Topic下任何一条消息。
Consumer Rebalance的算法如下:
- 将目标Topic下的所有Partirtion排序,存于
- 对某Consumer Group下所有Consumer排序,存,第个Consumer记为
- ,向上取整
- 解除对原来分配的Partition的消费权(i从0开始)
- 将第到个Partition分配给
目前,最新版(0.8.2.1)Kafka的Consumer Rebalance的控制策略是由每一个Consumer通过在Zookeeper上注册Watch完成的。每个Consumer被创建时会触发Consumer Group的Rebalance,具体启动流程如下:
- High Level Consumer启动时将其ID注册到其Consumer Group下,在Zookeeper上的路径为
/consumers/[consumer group]/ids/[consumer id]
- 在
/consumers/[consumer group]/ids
上注册Watch - 在
/brokers/ids
上注册Watch - 如果Consumer通过Topic Filter创建消息流,则它会同时在
/brokers/topics
上也创建Watch - 强制自己在其Consumer Group内启动Rebalance流程
在这种策略下,每一个Consumer或者Broker的增加或者减少都会触发Consumer Rebalance。因为每个Consumer只负责调整自己所消费的Partition,为了保证整个Consumer Group的一致性,当一个Consumer触发了Rebalance时,该Consumer Group内的其它所有其它Consumer也应该同时触发Rebalance。
该方式有如下缺陷:
- Herd effect
任何Broker或者Consumer的增减都会触发所有的Consumer的Rebalance - Split Brain
每个Consumer分别单独通过Zookeeper判断哪些Broker和Consumer 宕机了,那么不同Consumer在同一时刻从Zookeeper“看”到的View就可能不一样,这是由Zookeeper的特性决定的,这就会造成不正确的Reblance尝试。 - 调整结果不可控
所有的Consumer都并不知道其它Consumer的Rebalance是否成功,这可能会导致Kafka工作在一个不正确的状态。
使用Low Level Consumer (Simple Consumer)的主要原因是,用户希望比Consumer Group更好的控制数据的消费。比如:
- 同一条消息读多次
- 只读取某个Topic的部分Partition
- 管理事务,从而确保每条消息被处理一次,且仅被处理一次
与Consumer Group相比,Low Level Consumer要求用户做大量的额外工作。
- 必须在应用程序中跟踪offset,从而确定下一条应该消费哪条消息
- 应用程序需要通过程序获知每个Partition的Leader是谁
- 必须处理Leader的变化
使用Low Level Consumer的一般流程如下
- 查找到一个“活着”的Broker,并且找出每个Partition的Leader
- 找出每个Partition的Follower
- 定义好请求,该请求应该能描述应用程序需要哪些数据
- Fetch数据
- 识别Leader的变化,并对之作出必要的响应
Kafka在0.8以前的版本中,并不提供High Availablity机制,一旦一个或多个Broker宕机,则宕机期间其上所有Partition都无法继续提供服务。若该Broker永远不能再恢复,亦或磁盘故障,则其上数据将丢失。而Kafka的设计目标之一即是提供数据持久化,同时对于分布式系统来说,尤其当集群规模上升到一定程度后,一台或者多台机器宕机的可能性大大提高,对于Failover机制的需求非常高。因此,Kafka从0.8开始提供High Availability机制。本文从Data Replication和Leader Election两方面介绍了Kafka的HA机制。
由此可见,在没有Replication的情况下,一旦某机器宕机或者某个Broker停止工作则会造成整个系统的可用性降低。随着集群规模的增加,整个集群中出现该类异常的几率大大增加,因此对于生产系统而言Replication机制的引入非常重要。
引入Replication之后,同一个Partition可能会有多个Replica,而这时需要在这些Replication之间选出一个Leader,Producer和Consumer只与这个Leader交互,其它Replica作为Follower从Leader中复制数据。
因为需要保证同一个Partition的多个Replica之间的数据一致性(其中一个宕机后其它Replica必须要能继续服务并且即不能造成数据重复也不能造成数据丢失)。如果没有一个Leader,所有Replica都可同时读/写数据,那就需要保证多个Replica之间互相(N×N条通路)同步数据,数据的一致性和有序性非常难保证,大大增加了Replication实现的复杂性,同时也增加了出现异常的几率。而引入Leader后,只有Leader负责数据读写,Follower只向Leader顺序Fetch数据(N条通路),系统更加简单且高效。
因为需要保证同一个Partition的多个Replica之间的数据一致性(其中一个宕机后其它Replica必须要能继续服务并且即不能造成数据重复也不能造成数据丢失)。如果没有一个Leader,所有Replica都可同时读/写数据,那就需要保证多个Replica之间互相(N×N条通路)同步数据,数据的一致性和有序性非常难保证,大大增加了Replication实现的复杂性,同时也增加了出现异常的几率。而引入Leader后,只有Leader负责数据读写,Follower只向Leader顺序Fetch数据(N条通路),系统更加简单且高效。
如何将所有Replica均匀分布到整个集群
为了更好的做负载均衡,Kafka尽量将所有的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。同时为了提高Kafka的容错能力,也需要将同一个Partition的Replica尽量分散到不同的机器。实际上,如果所有的Replica都在同一个Broker上,那一旦该Broker宕机,该Partition的所有Replica都无法工作,也就达不到HA的效果。同时,如果某个Broker宕机了,需要保证它上面的负载可以被均匀的分配到其它幸存的所有Broker上。
Kafka分配Replica的算法如下:
Kafka分配Replica的算法如下:
- 将所有Broker(假设共n个Broker)和待分配的Partition排序
- 将第i个Partition分配到第(i mod n)个Broker上
- 将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
Propagate消息
Producer在发布消息到某个Partition时,先通过Zookeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将增加HW并且向Producer发送ACK。
为了提高性能,每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。因此,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被Consumer消费。但考虑到这种场景非常少见,可以认为这种方式在性能和数据持久化上做了一个比较好的平衡。在将来的版本中,Kafka会考虑提供更高的持久性。
Consumer读消息也是从Leader读取,只有被commit过的消息(offset低于HW的消息)才会暴露给Consumer。
为了提高性能,每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。因此,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被Consumer消费。但考虑到这种场景非常少见,可以认为这种方式在性能和数据持久化上做了一个比较好的平衡。在将来的版本中,Kafka会考虑提供更高的持久性。
Consumer读消息也是从Leader读取,只有被commit过的消息(offset低于HW的消息)才会暴露给Consumer。
ACK前需要保证有多少个备份
和大部分分布式系统一样,Kafka处理失败需要明确定义一个Broker是否“活着”。对于Kafka而言,Kafka存活包含两个条件,一是它必须维护与Zookeeper的session(这个通过Zookeeper的Heartbeat机制来实现)。二是Follower必须能够及时将Leader的消息复制过来,不能“落后太多”。
Leader会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-sync Replica)。如果一个Follower宕机,或者落后太多,Leader将把它从ISR中移除。这里所描述的“落后太多”指Follower复制的消息落后于Leader后的条数超过预定值(该值可在$KAFKA_HOME/config/server.properties中通过
Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下如果Follower都复制完都落后于Leader,而如果Leader突然宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了Follower与Leader的差距。
需要说明的是,Kafka只解决fail/recover,不处理“Byzantine”(“拜占庭”)问题。一条消息只有被ISR里的所有Follower都从Leader复制过去才会被认为已提交。这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机了,而造成数据丢失(Consumer无法消费这些数据)。而对于Producer而言,它可以选择是否等待消息commit,这可以通过
Leader会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-sync Replica)。如果一个Follower宕机,或者落后太多,Leader将把它从ISR中移除。这里所描述的“落后太多”指Follower复制的消息落后于Leader后的条数超过预定值(该值可在$KAFKA_HOME/config/server.properties中通过
replica.lag.max.messages
配置,其默认值是4000)或者Follower超过一定时间(该值可在$KAFKA_HOME/config/server.properties中通过replica.lag.time.max.ms
来配置,其默认值是10000)未向Leader发送fetch请求。。Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下如果Follower都复制完都落后于Leader,而如果Leader突然宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了Follower与Leader的差距。
需要说明的是,Kafka只解决fail/recover,不处理“Byzantine”(“拜占庭”)问题。一条消息只有被ISR里的所有Follower都从Leader复制过去才会被认为已提交。这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机了,而造成数据丢失(Consumer无法消费这些数据)。而对于Producer而言,它可以选择是否等待消息commit,这可以通过
request.required.acks
来设置。这种机制确保了只要ISR有一个或以上的Follower,一条被commit的消息就不会丢失。
一种非常常用的Leader Election的方式是“Majority Vote”(“少数服从多数”),但Kafka并未采用这种方式。这种模式下,如果我们有2f+1个Replica(包含Leader和Follower),那在commit之前必须保证有f+1个Replica复制完消息,为了保证正确选出新的Leader,fail的Replica不能超过f个。因为在剩下的任意f+1个Replica里,至少有一个Replica包含有最新的所有消息。这种方式有个很大的优势,系统的latency只取决于最快的几个Broker,而非最慢那个。Majority Vote也有一些劣势,为了保证Leader Election的正常进行,它所能容忍的fail的follower个数比较少。如果要容忍1个follower挂掉,必须要有3个以上的Replica,如果要容忍2个Follower挂掉,必须要有5个以上的Replica。也就是说,在生产环境下为了保证较高的容错程度,必须要有大量的Replica,而大量的Replica又会在大数据量下导致性能的急剧下降。这就是这种算法更多用在Zookeeper这种共享集群配置的系统中而很少在需要存储大量数据的系统中使用的原因。例如HDFS的HA Feature是基于majority-vote-based journal,但是它的数据存储并没有使用这种方式。
实际上,Leader Election算法非常多,比如Zookeeper的Zab, Raft和Viewstamped Replication。而Kafka所使用的Leader Election算法更像微软的PacificA算法。
Kafka在Zookeeper中动态维护了一个ISR(in-sync replicas),这个ISR里的所有Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。在这种模式下,对于f+1个Replica,一个Partition能在保证不丢失已经commit的消息的前提下容忍f个Replica的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍f个Replica的失败,Majority Vote和ISR在commit前需要等待的Replica数量是一样的,但是ISR需要的总的Replica的个数几乎是Majority Vote的一半。
虽然Majority Vote与ISR相比有不需等待最慢的Broker这一优势,但是Kafka作者认为Kafka可以通过Producer选择是否被commit阻塞来改善这一问题,并且节省下来的Replica和磁盘使得ISR模式仍然值得。
实际上,Leader Election算法非常多,比如Zookeeper的Zab, Raft和Viewstamped Replication。而Kafka所使用的Leader Election算法更像微软的PacificA算法。
Kafka在Zookeeper中动态维护了一个ISR(in-sync replicas),这个ISR里的所有Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。在这种模式下,对于f+1个Replica,一个Partition能在保证不丢失已经commit的消息的前提下容忍f个Replica的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍f个Replica的失败,Majority Vote和ISR在commit前需要等待的Replica数量是一样的,但是ISR需要的总的Replica的个数几乎是Majority Vote的一半。
虽然Majority Vote与ISR相比有不需等待最慢的Broker这一优势,但是Kafka作者认为Kafka可以通过Producer选择是否被commit阻塞来改善这一问题,并且节省下来的Replica和磁盘使得ISR模式仍然值得。
如何处理所有Replica都不工作
上文提到,在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案:
- 等待ISR中的任一个Replica“活”过来,并且选它作为Leader
- 选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader
这就需要在可用性和一致性当中作出一个简单的折衷。如果一定要等待ISR中的Replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有Replica都无法“活”过来了,或者数据都丢失了,这个Partition将永远不可用。选择第一个“活”过来的Replica作为Leader,而这个Replica不是ISR中的Replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为Leader而作为consumer的数据源(前文有说明,所有读写都由Leader完成)。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在以后的版本中,Kafka支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。
如何选举Leader
最简单最直观的方案是,所有Follower都在Zookeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeral znode会自动删除,此时所有Follower都尝试创建该节点,而创建成功者(Zookeeper保证只有一个能创建成功)即是新的Leader,其它Replica即为Follower。
但是该方法会有3个问题:
但是该方法会有3个问题:
- split-brain 这是由Zookeeper的特性引起的,虽然Zookeeper能保证所有Watch按顺序触发,但并不能保证同一时刻所有Replica“看”到的状态是一样的,这就可能造成不同Replica的响应不一致
- herd effect 如果宕机的那个Broker上的Partition比较多,会造成多个Watch被触发,造成集群内大量的调整
- Zookeeper负载过重 每个Replica都要为此在Zookeeper上注册一个Watch,当集群规模增加到几千个Partition时Zookeeper负载会过重。
Kafka 0.8.*的Leader Election方案解决了上述问题,它在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式(比Zookeeper Queue的方式更高效)通知需为此作出响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。
http://calvin1978.blogcn.com/articles/kafkaio.html
卡夫卡说:不要害怕文件系统。
它就那么简简单单地用顺序写的普通文件,借力于Linux内核的Page Cache,不(显式)用内存,胜用内存,完全没有别家那样要同时维护内存中数据、持久化数据的烦恼——只要内存足够,生产者与消费者的速度也没有差上太多,读写便都发生在Page Cache中,完全没有同步的磁盘访问。
整个IO过程,从上到下分成文件系统层(VFS+ ext3)、 Page Cache 层、通用数据块层、 IO调度层、块设备驱动层。 这里借着Apache Kafka的由头,将Page Cache层与IO调度层重温一遍,记一篇针对Linux kernel 2.6的科普文。
1. Page Cache
1.1 读写空中接力
Linux总会把系统中还没被应用使用的内存挪来给Page Cache,在命令行输入free,或者cat /proc/meminfo,"Cached"的部分就是Page Cache。
Page Cache中每个文件是一棵Radix树(又称PAT位树, 一种多叉搜索树),节点由4k大小的Page组成,可以通过文件的偏移量(如0x1110001)快速定位到某个Page。
当写操作发生时,它只是将数据写入Page Cache中,并将该页置上dirty标志。
当读操作发生时,它会首先在Page Cache中查找内容,如果有就直接返回了,没有的话就会从磁盘读取文件再写回Page Cache。
可见,只要生产者与消费者的速度相差不大,消费者会直接读取之前生产者写入Page Cache的数据,大家在内存里完成接力,根本没有磁盘访问。
而比起在内存中维护一份消息数据的传统做法,这既不会重复浪费一倍的内存,Page Cache又不需要GC(可以放心使用60G内存了),而且即使Kafka重启了,Page Cache还依然在。
1.2 后台异步flush的策略
这是大家最需要关心的,因为不能及时flush的话,OS crash(不是应用crash) 可能引起数据丢失,Page Cache瞬间从朋友变魔鬼。
当然,Kafka不怕丢,因为它的持久性是靠replicate保证,重启后会从原来的replicate follower中拉缺失的数据。
在内核3.2之前,由内核线程pdflush负责将有dirty标记的页面,发送给IO调度层。内核会为每个磁盘起一条pdflush线程,每5秒(/proc/sys/vm/dirty_writeback_centisecs)唤醒一次,根据下面三个参数来决定行为:
1. 如果page dirty的时间超过了30秒(/proc/sys/vm/dirty_expire_centisecs,单位是百分之一秒),就会被刷到磁盘,所以OS crash时最多丢30秒左右的数据。
2. 如果dirty page的总大小已经超过了10% (/proc/sys/vm/dirty_background_ratio) 的可用内存 (cat /proc/meminfo里 MemFree+ Cached - Mapped),则会在后台启动pdflush 线程写盘,但不影响当前的write(2)操作。增减这个值是最主要的flush策略调优手段。
3. 如果wrte(2)的速度太快,比pdflush还快,dirty page 迅速涨到 20% (/proc/sys/vm/dirty_ratio) 的总内存 (cat /proc/meminfo里的MemTotal),则此时所有应用的写操作都会被block,各自在自己的时间片里去执行对自己文件的flush操作,因为操作系统认为现在已经来不及写盘了,如果crash会丢太多数据,要让大家都冷静点。这个代价有点大,要尽量避免。在Redis2.8以前,Rewrite AOF就经常导致这个大面积阻塞,现在已经改为Redis每32Mb先主动flush一下了。
详细的文章可以看: The Linux Page Cache and pdflush
1.3 主动flush的方式
对于重要数据,应用需要自己触发flush保证写盘。
1. 系统调用fsync() 和 fdatasync()
fsync(fd)将属于该文件描述符的所有dirty page的写入请求发送给IO调度层。
fsync()总是同时flush文件内容与文件元数据, 而fdatasync()只flush文件内容与后续操作必须的文件元数据。元数据含时间戳,大小等,大小可能是后续操作必须,而时间戳就不是必须的。因为文件的元数据保存在另一个地方,所以fsync()总是触发两次IO,性能要差一点。
2. 打开文件时设置O_SYNC,O_DSYNC标志或O_DIRECT标志
O_SYNC、O_DSYNC标志表示每次write后要等到flush完成才返回,效果等同于write()后紧接一个fsync()或fdatasync(),不过按APUE里的测试,因为OS做了优化,性能会比自己调write() + fsync()好一点,但与只是write()相比就慢很多了。注意此时write()还是先写到page cache的,read可以直接从page cache中获取数据。
O_DIRECT标志表示直接IO,完全跳过Page Cache。不过这也放弃了读文件时的Page Cache,必须每次读取磁盘文件,这是与O_SYNC的区别。而且要求所有IO请求长度,偏移都必须是底层扇区大小的整数倍。所以使用直接IO的时候一定要在应用层做好Cache。
1.4 Page Cache的清理策略
当内存满了,就需要清理Page Cache,或把应用占的内存swap到文件去。有一个swappiness的参数(/proc/sys/vm/swappiness)决定是swap还是清理page cache,值在0到100之间,设为0表示尽量不要用swap,这也是很多优化指南让你做的事情,因为默认值居然是60,Linux认为Page Cache更重要。
Page Cache的清理策略是LRU的升级版。如果简单用LRU,一些新读出来的但可能只用一次的数据会占满了LRU的头端。因此将原来一条LRU队列拆成了两条,一条放新的Page,一条放已经访问过好几次的Page。Page刚访问时放在新LRU队列里,访问几轮了才升级到旧LRU队列(想想JVM Heap的新生代老生代)。清理时就从新LRU队列的尾端开始清理,直到清理出足够的内存。
1.5 预读策略
根据清理策略,Apache Kafka里如果消费者太慢,堆积了几十G的内容,Cache还是会被清理掉的。这时消费者就需要读盘了。
内核这里又有个动态自适应的预读策略,每次读请求会尝试预读更多的内容(反正都是一次读操作)。内核如果发现一个进程一直使用预读数据,就会增加预读窗口的大小(最小16K,最大128K),否则会关掉预读窗口。连续读的文件,明显适合预读。
2. IO调度层
如果所有读写请求都直接发给硬盘,对传统硬盘来说太残忍了。IO调度层主要做两个事情,合并和排序。 合并是将相同和相邻扇区(每个512字节)的操作合并成一个,比如我现在要读扇区1,2,3,那可以合并成一个读扇区1-3的操作。排序就是将所有操作按扇区方向排成一个队列,让磁盘的磁头可以按顺序移动,有效减少了机械硬盘寻址这个最慢最慢的操作。
排序看上去很美,但可能造成严重的不公平,比如某个应用在相邻扇区狂写盘,其他应用就都干等在那了,pdflush还好等等没所谓,读请求都是同步的,耗在那会很惨。
所有又有多种算法来解决这个问题,其中内核2.6的默认算法是CFQ(完全公正排队),把总的排序队列拆分成每个发起读写的进程/线程 自己有一条排序队列,然后以时间片轮转调度每个队列,轮流从每个进程的队列里拿出若干个请求来执行(默认是4)。
在Apache Kafka里,消息的读写都发生在内存中,真正写盘的就是那条pdflush内核线程,因为都是顺序写,即使一台服务器上有多个Partition文件,经过合并和排序后都能获得很好的性能,或者说,Partition文件的个数并不影响性能,不会出现文件多了变成随机读写的情况。
如果是SSD硬盘,没有寻址的花销,排序好像就没必要了,但合并的帮助依然良多,所以还有另一种只合并不排序的NOOP算法可供选择。
题外话
另外,硬盘上还有一块几十M的缓存,硬盘规格上的外部传输速率(总线到缓存)与内部传输速率(缓存到磁盘)的区别就在此......IO调度层以为已经写盘了,其实可能依然没写成,断电的话靠硬盘上的电池或大电容保命......
延伸阅读:Kafka文件存储机制那些事 by 美团技术团队