第1章 Kafka入门

在0.10版本之前,Kafka仅仅作为一个消息系统,主要用来解决应用解耦、异步消息、流量削峰等问题。不过在0.10版本之后,Kafka提供了连接器与流处理的能力,它也从分布式的消息系统逐渐成为一个流式的数据平台。本章首先介绍Kafka流式数据平台的基本组成,然后分析它的一些架构设计和基本概念,最后通过几个示例快速理解它的一些重要特性。

1.1 Kafka流式数据平台

作为一个流式数据平台,最重要的是要具备下面3个特点。

 类似消息系统,提供事件流的发布和订阅,即具备数据注入功能。

 存储事件流数据的节点具有故障容错的特点,即具备数据存储功能。

 能够对实时的事件流进行流式地处理和分析,即具备流处理功能。

下面我们分析作为一个流式数据平台,Kafka是如何实现并组合上面的3个功能特点的。

 消息系统:如图1-1所示,消息系统(也叫作消息队列)主要有两种消息模型:队列和发布订阅。Kafka使用消费组(consumer group)统一了上面两种消息模型。Kafka使用队列模型时,它可以将处理工作平均分配给消费组中的消费者成员;使用发布订阅模式时,它可以将消息广播给多个消费组。采用多个消费组结合多个消费者,既可以线性扩展消息的处理能力,也允许消息被多个消费组订阅。

 队列模式(也叫作点对点模式)。多个消费者读取消息队列,每条消息只发送给一个消费者。

 发布订阅模式(pub/sub)。多个消费者订阅主题,主题的每条记录会发布给所有的消费者。

图像说明文字

 存储系统:任何消息队列要做到“发布消息”和“消费消息”的解耦合,实际上都要扮演一个存储系统的角色,负责保存还没有被消费的消息。否则,如果消息只是在内存中,一旦机器宕机或进程重启,内存中的消息就会全部丢失。Kafka也不例外,数据写入到Kafka集群的服务器节点时,还会复制多份来保证出现故障时仍能可用。为了保证消息的可靠存储,Kafka还允许生产者的生产请求在收到应答结果之前,阻塞式地等待一条消息,直到它完全地复制到多个节点上,才认为这条消息写入成功。

 流处理系统:流式数据平台仅仅有消息的读取和写入、存储消息流是不够的,还需要有实时的流式数据处理能力。对于简单的处理,可以直接使用Kafka的生产者和消费者API来完成;但对于复杂的业务逻辑处理,直接操作原始的API需要做的工作非常多。Kafka流处理(Kafka Streams)为开发者提供了完整的流处理API,比如流的聚合、连接、各种转换操作。同时,Kafka流处理框架内部解决很多流处理应用程序都会面临的问题:处理乱序或迟来的数据、重新处理输入数据、窗口和状态操作等。

 将消息系统、存储存储、流处理系统组合在一起:传统消息系统的流处理通常只会处理订阅动作发生之后才到达的新消息,无法处理订阅之前的历史数据。分布式文件存储系统一般存储静态的历史数据,对历史数据的处理一般采用批处理的方式。现有的开源系统很难将这些系统无缝地整合起来,Kafka则将消息系统、存储系统、流处理系统都组合在一起,构成了以Kafka为中心的流式数据处理平台。它既能处理最新的实时数据,也能处理过去的历史数据。Kafka作为流式数据平台的核心组件,主要包括下面4种核心的API,如图1-2所示。

 生产者(producer)应用程序发布事件流到Kafka的一个或多个主题。

 消费者(consumer)应用程序订阅Kafka的一个或多个主题,并处理事件流。

 连接器(connector)将Kafka主题和已有数据源进行连接,数据可以互相导入和导出。

 流处理(processor)从Kafka主题消费输入流,经过处理后,产生输出流到输出主题。

图像说明文字

建立以Kafka为核心的流式数据管道,不仅要保证低延迟的消息处理,还需要保证数据存储的可靠性。另外,在和离线系统集成时,将Kafka的数据加载到批处理系统时,要保证数据不遗漏;Kafka集群的某些节点在停机维护时,要保证集群可用。上面从整体上分析了Kafka如何作为一个流式的数据处理平台,下面开始分析Kafka的架构实现,这里先从基本概念说起,然后分析它的一些重要实现细节。

1.2 Kafka的基本概念

下面我们会从3个角度分析Kafka的几个基本概念,并尝试解决下面3个问题。

 Kafka的主题与分区内部是如何存储的,它有什么特点?

 与传统的消息系统相比,Kafka的消费模型有什么优点?

 Kafka如何实现分布式的数据存储与数据读取?

1.2.1 分区模型

Kafka集群由多个消息代理服务器(broker server)组成,发布到Kafka集群的每条消息都有一个类别,用主题(topic)来表示。通常,不同应用产生不同类型的数据,可以设置不同的主题。一个主题一般会有多个消息的订阅者,当生产者发布消息到某个主题时,订阅了这个主题的消费者都可以接收到生产者写入的新消息。

Kafka集群为每个主题维护了分布式的分区(partition)日志文件,物理意义上可以把主题看作分区的日志文件(partitioned log)。每个分区都是一个有序的、不可变的记录序列,新的消息会不断追加到提交日志(commit log)。分区中的每条消息都会按照时间顺序分配到一个单调递增的顺序编号,叫作偏移量(offset),这个偏移量能够唯一地定位当前分区中的每一条消息。

如图1-3(左)所示,主题有3个分区,每个分区的偏移量都从0开始,不同分区之间的偏移量都是独立的,不会互相影响。右图中,发布到Kafka主题的每条消息包括键值和时间戳。消息到达服务端的指定分区后,都会分配到一个自增的偏移量。原始的消息内容和分配的偏移量以及其他一些元数据信息最后都会存储到分区日志文件中。消息的键也可以不用设置,这种情况下消息会均衡地分布到不同的分区。

图像说明文字

传统消息系统在服务端保持消息的顺序,如果有多个消费者消费同一个消息队列,服务端会以消息存储的顺序依次发送给消费者。但由于消息是异步发送给消费者的,消息到达消费者的顺序可能是无序的,这就意味着在并行消费时,传统消息系统无法很好地保证消息被顺序处理。虽然我们可以设置一个专用的消费者只消费一个队列,以此来解决消息顺序的问题,但是这就使得消费处理无法真正执行。

Kafka比传统消息系统有更强的顺序性保证,它使用主题的分区作为消息处理的并行单元。Kafka以分区作为最小的粒度,将每个分区分配给消费组中不同的而且是唯一的消费者,并确保一个分区只属于一个消费者,即这个消费者就是这个分区的唯一读取线程。那么,只要分区的消息是有序的,消费者处理的消息顺序就有保证。每个主题有多个分区,不同的消费者处理不同的分区,所以Kafka不仅保证了消息的有序性,也做到了消费者的负载均衡。

1.2.2 消费模型

消息由生产者发布到Kafka集群后,会被消费者消费。消息的消费模型有两种:推送模型(push)和拉取模型(pull)。基于推送模型的消息系统,由消息代理记录消费者的消费状态。消息代理在将消息推送到消费者后,标记这条消息为已消费,但这种方式无法很好地保证消息的处理语义。比如,消息代理把消息发送出去后,当消费进程挂掉或者由于网络原因没有收到这条消息时,就有可能造成消息丢失(因为消息代理已经把这条消息标记为已消费了,但实际上这条消息并没有被实际处理)。如果要保证消息的处理语义,消息代理发送完消息后,要设置状态为“已发送”,只有收到消费者的确认请求后才更新为“已消费”,这就需要在消息代理中记录所有消息的消费状态,这种做法也是不可取的。

Kafka采用拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序读取每个分区的消息。如图1-4所示,有两个消费者(不同消费组)拉取同一个主题的消息,消费者A的消费进度是3,消费者B的消费者进度是6。消费者拉取的最大上限通过最高水位(watermark)控制,生产者最新写入的消息如果还没有达到备份数量,对消费者是不可见的。这种由消费者控制偏移量的优点是:消费者可以按照任意的顺序消费消息。比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前时刻开始消费。

图像说明文字

在一些消息系统中,消息代理会在消息被消费之后立即删除消息。如果有不同类型的消费者订阅同一个主题,消息代理可能需要冗余地存储同一条消息;或者等所有消费者都消费完才删除,这就需要消息代理跟踪每个消费者的消费状态,这种设计很大程度上限制了消息系统的整体吞吐量和处理延迟。Kafka的做法是生产者发布的所有消息会一直保存在Kafka集群中,不管消息有没有被消费。用户可以通过设置保留时间来清理过期的数据,比如,设置保留策略为两天。那么,在消息发布之后,它可以被不同的消费者消费,在两天之后,过期的消息就会自动清理掉。

1.2.3 分布式模型

Kafka每个主题的多个分区日志分布式地存储在Kafka集群上,同时为了故障容错,每个分区都会以副本的方式复制到多个消息代理节点上。其中一个节点会作为主副本(Leader),其他节点作为备份副本(Follower,也叫作从副本)。主副本会负责所有的客户端读写操作,备份副本仅仅从主副本同步数据。当主副本出现故障时,备份副本中的一个副本会被选择为新的主副本。因为每个分区的副本中只有主副本接受读写,所以每个服务端都会作为某些分区的主副本,以及另外一些分区的备份副本,这样Kafka集群的所有服务端整体上对客户端是负载均衡的。

Kafka的生产者和消费者相对于服务端而言都是客户端,生产者客户端发布消息到服务端的指定主题,会指定消息所属的分区。生产者发布消息时根据消息是否有键,采用不同的分区策略。消息没有键时,通过轮询方式进行客户端负载均衡;消息有键时,根据分区语义确保相同键的消息总是发送到同一个分区。

Kafka的消费者通过订阅主题来消费消息,并且每个消费者都会设置一个消费组名称。因为生产者发布到主题的每一条消息都只会发送给消费组的一个消费者。所以,如果要实现传统消息系统的“队列”模型,可以让每个消费者都拥有相同的消费组名称,这样消息就会负载均衡到所有的消费者;如果要实现“发布订阅”模型,则每个消费者的消费组名称都不相同,这样每条消息就会广播给所有的消费者。

分区是消费者线程模型的最小并行单位。如图1-5(左)所示,生产者发布消息到一台服务器的3个分区时,只有一个消费者消费所有的3个分区。在图1-5(右)中,3个分区分布在3台服务器上,同时有3个消费者分别消费不同的分区。假设每个服务器的吞吐量是300 MB,在图1-5(左)中分摊到每个分区只有100 MB,而在图1-5(右)中集群整体的吞吐量有900 MB。可以看到,增加服务器节点会提升集群的性能,增加消费者数量会提升处理性能。

图像说明文字

同一个消费组下多个消费者互相协调消费工作,Kafka会将所有的分区平均地分配给所有的消费者实例,这样每个消费者都可以分配到数量均等的分区。Kafka的消费组管理协议会动态地维护消费组的成员列表,当一个新消费者加入消费组,或者有消费者离开消费组,都会触发再平衡操作。

Kafka的消费者消费消息时,只保证在一个分区内消息的完全有序性,并不保证同一个主题中多个分区的消息顺序。而且,消费者读取一个分区消息的顺序和生产者写入到这个分区的顺序是一致的。比如,生产者写入“hello”和“kafka”两条消息到分区P1,则消费者读取到的顺序也一定是“hello”和“kafka”。如果业务上需要保证所有消息完全一致,只能通过设置一个分区完成,但这种做法的缺点是最多只能有一个消费者进行消费。一般来说,只需要保证每个分区的有序性,再对消息加上键来保证相同键的所有消息落入同一个分区,就可以满足绝大多数的应用。

上面从宏观角度分析了Kafka的3种基本模型,下面分析Kafka在底层实现上的一些设计细节与考虑。

1.3 Kafka的设计与实现

下面我们会从3个角度分析Kafka的一些设计思路,并尝试回答下面3个问题。

 如何利用操作系统的优化技术来高效地持久化日志文件和加快数据传输效率?

 Kafka的生产者如何批量地发送消息,消费者采用拉取模型带来的优点都有哪些?

 Kafka的副本机制如何工作,当故障发生时,怎么确保数据不会丢失?

1.3.1 文件系统的持久化与数据传输效率

人们普遍认为一旦涉及磁盘的访问,读写的性能就严重下降。实际上,现代的操作系统针对磁盘的读写已经做了一些优化方案来加快磁盘的访问速度。比如,预读(read-ahead)会提前将一个比较大的磁盘块读入内存。后写(write-behind)会将很多小的逻辑写操作合并起来组合成一个大的物理写操作。并且,操作系统还会将主内存剩余的所有空闲内存空间都用作磁盘缓存(disk cache/page cache),所有的磁盘读写操作都会经过统一的磁盘缓存(除了直接I/O会绕过磁盘缓存)。综合这几点优化特点,如果是针对磁盘的顺序访问,某些情况下它可能比随机的内存访问都要快,甚至可以和网络的速度相差无几。

如图1-6(左)所示,应用程序写入数据到文件系统的一般做法是:在内存中保存尽可能多的数据,并在需要时将这些数据刷新到文件系统。但这里我们要做完全相反的事情,右图中所有的数据都立即写入文件系统的持久化日志文件,但不进行刷新数据的任何调用。数据会首先被传输到磁盘缓存,操作系统随后会将这些数据定期自动刷新到物理磁盘。

消息系统内的消息从生产者保存到服务端,消费者再从服务端读取出来,数据的传输效率决定了生产者和消费者的性能。生产者如果每发送一条消息都直接通过网络发送到服务端,势必会造成过多的网络请求。如果我们能够将多条消息按照分区进行分组,并采用批量的方式一次发送一个消息集,并且对消息集进行压缩,就可以减少网络传输的带宽,进一步提高数据的传输效率。

图像说明文字

消费者要读取服务端的数据,需要将服务端的磁盘文件通过网络发送到消费者进程,而网络发送通常涉及不同的网络节点。如图1-7(左)所示,传统读取磁盘文件的数据在每次发送到网络时,都需要将页面缓存先保存到用户缓存,然后在读取消息时再将其复制到内核空间,具体步骤如下。

(1) 操作系统将数据从磁盘中读取文件到内核空间里的页面缓存。

(2) 应用程序将数据从内核空间读入用户空间的缓冲区。

(3) 应用程序将读到的数据写回内核空间并放入socket缓冲区。

(4) 操作系统将数据从socket缓冲区复制到网卡接口,此时数据才能通过网络发送出去。

结合Kafka的消息有多个订阅者的使用场景,生产者发布的消息一般会被不同的消费者消费多次。如图1-7(右)所示,使用“零拷贝技术”(zero-copy)只需将磁盘文件的数据复制到页面缓存中一次,然后将数据从页面缓存直接发送到网络中(发送给不同的使用者时,都可以重复使用同一个页面缓存),避免了重复的复制操作。这样,消息使用的速度基本上等同于网络连接的速度了。

图像说明文字

这里我们用一个示例来对比传统的数据复制和“零拷贝技术”这两种方案。假设有10个消费者,传统复制方式的数据复制次数是4×10 = 40次,而“零拷贝技术”只需1 + 10 = 11次(一次表示从磁盘复制到页面缓存,另外10次表示10个消费者各自读取一次页面缓存)。显然,“零拷贝技术”比传统复制方式需要的复制次数更少。越少的数据复制,就越能更快地读取到数据;延迟越少,消费者的性能就越好。

1.3.2 生产者与消费者

Kafka的生产者将消息直接发送给分区主副本所在的消息代理节点,并不需要经过任何的中间路由层。为了做到这一点,所有消息代理节点都会保存一份相同的元数据,这份元数据记录了每个主题分区对应的主副本节点。生产者客户端在发送消息之前,会向任意一个代理节点请求元数据,并确定每条消息对应的目标节点,然后把消息直接发送给对应的目标节点。

如图1-8所示,生产者客户端有两种方式决定发布的消息归属于哪个分区:通过随机方式将请求负载到不同的消息代理节点(图1-8左图),或者使用“分区语义函数”将相同键的所有消息发布到同一个分区(图1-8右图)。对于分区语义,Kafka暴露了一个接口,允许用户指定消息的键如何参与分区。比如,我们可以将用户编号作为消息的键,因为对相同用户编号散列后的值是固定的,所以对应的分区也是固定的。

图像说明文字

在1.3.1节中,生产者采用批量发送消息集的方式解决了网络请求过多的问题。生产者会尝试在内存中收集足够数据,并在一个请求中一次性发送一批数据。另外,我们还可以为生产者客户端设置“在指定的时间内收集不超过指定数量的消息”。比如,设置消息大小上限等于64字节,延迟时间等于100毫秒,表示在100毫秒内消息大小达到64字节要立即发送;如果在100毫秒时还没达到64字节,也要把已经收集的消息发送出去。客户端采用这种缓冲机制,在发送消息前会收集尽可能多的数据,通过每次牺牲一点点额外的延迟来换取更高的吞吐量。相应地,服务端的I/O消耗也会大大降低。

如图1-9所示,消费者读取消息有两种方式。第一种是消息代理主动地“推送”消息给下游的消费者(图1-9左图),由消息代理控制数据传输的速率,但是消息代理对下游消费者是否能及时处理不得而知。如果数据的消费速率低于产生速率,消费者会处于超负荷状态,那么发送给消费者的消息就会堆积得越来越多。而且,推送方式也难以应付不同类型的消费者,因为不同消费者的消费速率不一定都相同,消息代理需要调整不同消费者的传输速率,并让每个消费者充分利用系统的资源。这种方式实现起来比较困难。

第二种读取方式是消费者从消息代理主动地“拉取”数据(见图1-9右图),消息代理是无状态的,它不需要标记哪些消息被消费者处理过,也不需要保证一条消息只会被一个消费者处理。而且,不同的消费者可以按照自己最大的处理能力来拉取数据,即使有时候某个消费者的处理速度稍微落后,它也不会影响其他的消费者,并且在这个消费者恢复处理速度后,仍然可以追赶之前落后的数据。

图像说明文字

因为消息系统不能作为严格意义上的数据库,所以保存在消息系统中的数据,在不用之后应该及时地删除掉并释放磁盘空间。消息需要删除,其原因一般是消息被消费之后不会再使用了,大多数消息系统会在消息代理记录关于消息是否已经被消费过的状态:当消息从消息代理发送给消费者时(基于推送模型),消息代理会在本地记录这条消息“已经被消费过了”。但如果消费者没能处理这条消息(比如由于网络原因、请求超时或消费者挂掉),就会导致“消息丢失”。解决消息丢失的一种办法是添加应答机制,消息代理在发送完消息后只把消息标记为“已发送”,只有收到消费者返回的应答信息才表示“已消费”。但还是会存在一个问题:消费者处理完消息就失败了,导致应答没有返回给消息代理,这样消息代理又会重新发送消息,导致消息被重复处理。这种方案还有一个缺点:消息代理需要保存每条消息的多种状态(比如,消息状态为“已发送”时,消息代理需要锁住这条消息,保证消息不会发送两次),这种方式需要在客户端和服务端做一些复杂的状态一致性保证。

Kafka采用了基于拉取模型的消费状态处理,它将主题分成多个有序的分区,任何时刻每个分区都只被一个消费者使用。并且,消费者会记录每个分区的消费进度(即偏移量)。每个消费者只需要为每个分区记录一个整数值,而不需要像其他消息系统那样记录每条消息的状态。假设有10000条消息,传统方式需要记录10000条消息的状态;如果用Kafka的分区机制,假设有10个分区,每个分区1000条消息,总共只需要记录10个分区的消费状态(需要保存的状态数据少了很多,而且也没有了锁)。

和传统方式需要跟踪每条消息的应答不同,Kafka的消费者会定时地将分区的消费进度保存成检查点文件,表示“这个位置之前的消息都已经被消费过了”。传统方式需要消费者发送每条消息的应答,服务端再对应答做出不同的处理;而Kafka只需要让消费者记录消费进度,服务端不需要记录消息的任何状态。除此之外,让消费者记录分区的消费进度还有一个好处:消费者可以“故意”回退到某个旧的偏移量位置,然后重新处理数据。虽然这种处理方式看起来违反了队列模型的规定(一条消息发送给队列的一个消费者之后,就不会被其他消费者再次处理),但在实际运用中,很多消费者都需要这种功能。比如,消费者的处理逻辑代码出现了问题,在部署并启动消费者后,需要处理之前的消息并重新计算。

和生产者采用批量发送消息类似,消费者拉取消息也可以一次拉取一批消息。消费者客户端拉取消息,然后处理这一批消息,这个过程一般套在一个死循环里,表示消费者永远处于消费消息的状态(因为消息系统的消息总是一直产生数据,所以消费者也要一直消费消息)。消费者采用拉取方式消费消息有一个缺点:如果消息代理没有数据或者数据量很少,消费者可能需要不断地轮询,并等待新数据的到来(拉取模式主动权在消费者手里,但是消费者并不知道消息代理有没有新的数据;如果是推送模式,只有新数据产生时,消息代理才会发送数据给消费者,就不存在这种问题)。解决这个问题的方案是:允许消费者的拉取请求以阻塞式、长轮询的方式等待,直到有新的数据到来。我们可以为消费者客户端设置“指定的字节数量”,表示消息代理在还没有收集足够的数据时,客户端的拉取请求就不会立即返回。

1.3.3 副本机制和容错处理

Kafka的副本机制会在多个服务端节点(简称节点,即消息代理节点)上对每个主题分区的日志进行复制。当集群中的某个节点出现故障时,访问故障节点的请求会被转移到其他正常节点的副本上。副本的单位是主题的分区,Kafka每个主题的每个分区都有一个主副本以及0个或多个备份副本。备份副本会保持和主副本的数据同步,用来在主副本失效时替换为主副本。

如图1-10所示,所有的读写请求总是路由到分区的主副本。虽然生产者可以通过负载均衡策略将消息分配到不同的分区,但如果这些分区的主副本都在同一个服务器上(见图1-10左图),就会存在数据热点问题。因此,分区的主副本应该均匀地分配到各个服务器上(见图1-10右图)。通常,分区的数量要比服务器多很多,所以每个服务器都可以成为一些分区的主副本,也能同时成为一些分区的备份副本。

图像说明文字

备份副本始终尽量保持与主副本的数据同步。备份副本的日志文件和主副本的日志总是相同的,它们都有相同的偏移量和相同顺序的消息。备份副本从主副本消费消息的方式和普通的消费者一样,只不过备份副本会将消息运用到自己的本地日志文件(备份副本和主副本都在服务端,它们都会将收到的分区数据持久化成日志文件)。普通的消费者客户端拉取到消息后并不会持久化,而是直接处理。

分布式系统处理故障容错时,需要明确地定义节点是否处于存活状态。Kafka对节点的存活定义有两个条件:

 节点必须和ZK保持会话;

 如果这个节点是某个分区的备份副本,它必须对分区主副本的写操作进行复制,并且复制的进度不能落后太多。

满足这两个条件,叫作“正在同步中”(in-sync)。每个分区的主副本会跟踪正在同步中的备份副本节点(In Sync Replicas,即ISR)。如果一个备份副本挂掉、没有响应或者落后太多,主副本就会将其从同步副本集合中移除。反之,如果备份副本重新赶上主副本,它就会加入到主副本的同步集合中。

在Kafka中,一条消息只有被ISR集合的所有副本都运用到本地的日志文件,才会认为消息被成功提交了。任何时刻,只要ISR至少有一个副本是存活的,Kafka就可以保证“一条消息一旦被提交,就不会丢失”。只有已经提交的消息才能被消费者消费,因此消费者不用担心会看到因为主副本失败而丢失的消息。下面我们举例分析Kafka的消息提交机制如何保证消费者看到的数据是一致的。

 生产者发布了10条消息,但都还没有提交(没有完全复制到ISR中的所有副本)。如果没有提交机制,消息写到主副本的节点就对消费者立即可见,即消费者可以立即看到这10条消息。但之后主副本挂掉了,这10条消息实际上就丢失了。而消费者之前能看到这10条丢失的数据,在主副本挂掉后就看不到了,导致消费者看到的数据出现了不一致。

 如果有提交机制的保证,并且生产者发布的10条消息还没有提交,则对消费者不可见。即使这10条消息都已经写入主副本,但是它们在还没有来得及复制到其他备份副本之前,主副本就挂掉了。那么,这10条消息就不算写入成功,生产者会重新发送这10条消息。当这10条消息成功地复制到ISR的所有副本后,它们才会认为是提交的,即对消费者才是可见的。在这之后,即使主副本挂掉了也没有关系,因为原先消费者能看到主副本的10条消息,在新的主副本上也能看到这10条消息,不会出现不一致的情况。

下面我们开始做一些简单的实验,通过观察结果来更形象地理解Kafka的一些基本概念。

1.4 快速开始

如表1-1所示,我们会在单机环境下事先创建好不同副本数、不同分区数的几个主题。

图像说明文字

我们的实验会分成单机模式与分布式模式,不同的模式会操作不同的主题。

1.4.1 单机模式

如果是一个服务器,只需要在不同的终端分别启动ZK和KafkaServer服务端进程:

$ cd kafka_2.10-0.10.0.0
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties

1 . 一个副本一个分区

通过create选项创建一个名称为test的主题,副本数为1,分区数为1,并连接本地的ZK。通过list选项,可以查看集群的主题列表。创建主题后,日志目录下将创建以“主题名称-分区编号”命名的文件夹,用来存放主题分区的消息。相关代码如下:

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic test
Created topic "test".
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test

接着使用控制台的生产者脚本启动一个命令行模式的生产者,并往指定主题生产一条消息:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
this is first message
this is second message

使用控制台的消费者脚本启动一个命令行模式的消费者,并从分区的最开始位置开始读取消息:

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
  --topic test --from-beginning
this is first message
this is second message
^CProcessed a total of 2 messages

2 . 一个副本多个分区

创建多个分区的my-partitioned-topic主题,然后使用describe命令查看指定主题的详细信息,验证一共有3个分区,并且每个分区都有以下5个属性。

 Topic。主题名称,如果没有事先创建主题,Kafka也可以帮我们自动创建主题。

 Partition。分区编号,从0开始。为了简洁起见,本书会用P0表示Partition:0。

 Leader。当前分区负责读写的节点,只有主副本才会接受消息的读写。

 Replicas。分区的复制节点列表,它与主题的副本数量有关,默认只有一个副本,即主副本。

 Isr。同步状态的副本,是Replicas的子集,必须是存活的,并且都能赶上主副本。

相关代码如下:

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 
  --replication-factor 1 \
  --partitions 3 --topic my-partitioned-topic
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 \
  --topic my-partitioned-topic
Topic:my-partitioned-topic PartitionCount:3    ReplicationFactor:1 Configs:
Topic:my-partitioned-topic Partition:0  Leader:0  Replicas:0  Isr:0
Topic:my-partitioned-topic Partition:1  Leader:0  Replicas:0  Isr:0
Topic:my-partitioned-topic Partition:2  Leader:0  Replicas:0  Isr:0

为了验证消息是否写到主题分区的日志目录中,可以查看日志目录,其中以log结尾的文件是二进制的日志格式,可以使用Linux的strings命令直接查看文件内容。可以看到,my-partitioned-topic这个主题还没有消息产生,不过Kafka已经提前创建了文件夹和对应的文件。相关代码如下:

$ tree /tmp/kafka-logs
/tmp/kafka-logs

├── cleaner-offset-checkpoint
├── meta.properties
├── recovery-point-offset-checkpoint
├── replication-offset-checkpoint
├── my-partitioned-topic-0
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
├── my-partitioned-topic-1
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
├── my-partitioned-topic-2
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
└── test-0
     ├── 00000000000000000000.index
     └── 00000000000000000000.log


$ strings /tmp/kafka-logs/test-0/00000000000000000000.log
this is first message
this is second message

接着启动一个控制台的生产者,然后往my-partitioned-topic主题模拟生产6条消息。观察不同日志文件的内容,可以发现这6条消息会被均匀地发送到3个分区日志文件中。相关代码如下:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 \
  --topic my-partitioned-topic
message1
message2
message3
message4
message5
message6

$ strings /tmp/kafka-logs/my-partitioned-topic-0/00000000000000000000.log
message3
message6
$ strings /tmp/kafka-logs/my-partitioned-topic-1/00000000000000000000.log
message2
message5
$ strings /tmp/kafka-logs/my-partitioned-topic-2/00000000000000000000.log
message1
message4

默认的log.dirs=/tmp/kafka-logs目录下还会记录所有分区的偏移量全局状态数据,比如检查点(checkpoint)和恢复点(recovery)。这些文件并不在每个分区下,而是在分区的父目录中(因为要记录所有分区,如果在分区目录下,就只能记录当前分区状态)。在上面的示例中,test主题产生了两条消息,my-partitioned-topic产生了6条消息。后者有3个分区,每个分区分别存储了两条消息。因为全局状态是分区级别,所以检查点文件中每个分区的检查点偏移量(checkpoint offset)等于2:

$ cat /tmp/kafka-logs/replication-offset-checkpoint
0
4  =》一共有4个分区,每行数据的格式是:主题分区偏移量
my-partitioned-topic 2 2
my-partitioned-topic 1 2
my-partitioned-topic 0 2
test 0 2

$ cat /tmp/kafka-logs/recovery-point-offset-checkpoint
0
4
my-partitioned-topic 1 0
my-partitioned-topic 2 0
my-partitioned-topic 0 0
test 0 0

再次启动一个控制台的消费者,并订阅my-partitioned-topic主题,可以看到消息没有按照生产者的顺序读取出来。这是因为Kafka不保证全局的消息顺序,只保证分区级别的消息顺序。比如,分区P2的message1一定在message4之前,分区P1的message2一定在message5之前:

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
  --topic my-partitioned-topic --from-beginning
message1    =》分区P2(Partition2)
message4  
message3    =》分区P0(Partition0)
message6
message2    =》分区P1(Partition1)
message5

上面实验了Kafka的分区特性,并且验证了生产者的负载均衡,消费者读取消息时只保证分区消息有序。接下来,我们通过分布式模式的实验,验证Kafka的分布式模型与故障容错是如何工作的。

1.4.2 分布式模式

在本地模拟启动4个Kafka服务,更改每个节点配置文件中的消息代理编号、端口号、日志文件目录。除了上面已经启动的一个Kafka服务外,还需要再启动3个服务(执行脚本与下面的方式类似):

$ cpconfig/server.propertiesconfig/server1.properties
$ vi config/server1.properties
    broker.id=1
    port=9093
log.dir=/tmp/kafka-logs-1
$ bin/kafka-server-start.sh config/server1.properties &

在分布式模式下,启动了4个Kafka服务。下面创建的主题副本数都是3个,分区数分别是1个、3个和5个:

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 3 --partitions 1 --topic my-replicated-topic
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 3 --partitions 3 --topic my-replicated-topic2
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 3 --partitions 5 --topic my-replicated-topic3

查看上面创建的有多个副本的主题分区信息,副本数为3,每个分区都会分布在3个服务端节点上:

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 \
  --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1  ReplicationFactor:3 Configs:
Topic:my-replicated-topic   Partition:0 Leader:1  Replicas:1,2,0 Isr:1,2,0

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 \
  --topic my-replicated-topic2
Topic:my-replicated-topic2  PartitionCount:3  ReplicationFactor:3 Configs:
Topic:my-replicated-topic2  Partition:0 Leader:0  Replicas:0,2,3 Isr:0,2,3
Topic:my-replicated-topic2  Partition:1 Leader:1  Replicas:1,3,0 Isr:1,3,0
Topic:my-replicated-topic2  Partition:2 Leader:2  Replicas:2,0,1 Isr:2,0,1

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 \
  --topic my-replicated-topic3
Topic:my-replicated-topic3  PartitionCount:5  ReplicationFactor:3 Configs:
Topic:my-replicated-topic3  Partition:0 Leader:2  Replicas:2,1,3 Isr:2,1,3
Topic:my-replicated-topic3  Partition:1 Leader:3  Replicas:3,2,0 Isr:3,2,0
Topic:my-replicated-topic3  Partition:2 Leader:0  Replicas:0,3,1 Isr:0,3,1
Topic:my-replicated-topic3  Partition:3 Leader:1  Replicas:1,0,2 Isr:1,0,2
Topic:my-replicated-topic3  Partition:4 Leader:2  Replicas:2,3,0 Isr:2,3,0

下面手动停止一个Kafka服务节点,来模拟Kafka集群中一个服务端节点出现宕机的情况:

$ jps -lm
39520 org.apache.zookeeper.server.quorum.QuorumPeerMain
39955 kafka.Kafkaconfig/server1.properties
39749 kafka.Kafkaconfig/server.properties
40157 kafka.Kafkaconfig/server2.properties
40366 kafka.Kafkaconfig/server3.properties
$ kill -9 40366

再次查看创建的主题的信息,可以看到原先落在主副本编号为3的节点,分区的主副本会转移。比如,my-replicated-topic3主题的P1分区,主副本原先是3,现在变为2。另外,虽然每个分区的Replicas没有变化,但Isr都不再包含3:

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 \
  --topic my-replicated-topic
Topic:my-replicated-topic  PartitionCount:1    ReplicationFactor:3 Configs:
Topic:my-replicated-topic  Partition:0  Leader:1  Replicas:1,3,0 Isr:1,0

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 \
  --topic my-replicated-topic3
Topic:my-replicated-topic3  PartitionCount:5    ReplicationFactor:3 Configs:
Topic:my-replicated-topic3 Partition:0  Leader:2   Replicas:2,1,3 Isr:2,1
Topic:my-replicated-topic3 Partition:1  Leader:2   Replicas:3,2,0 Isr:2,0
Topic:my-replicated-topic3 Partition:2  Leader:0   Replicas:0,3,1 Isr:0,1
Topic:my-replicated-topic3 Partition:3  Leader:1   Replicas:1,0,2 Isr:1,0,2
Topic:my-replicated-topic3 Partition:4  Leader:2   Replicas:2,3,0 Isr:2,0

重启编号为3的服务器,可以看到副本数量不足的分区,它们的Isr会进行扩展,都会添加上3。比如,my-replicated-topic3主题的分区P0、P1、P2、P4相比上一步都加上了3:

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 \
  --topic my-replicated-topic3
Topic:my-replicated-topic3  PartitionCount:5    ReplicationFactor:3 Configs:
Topic:my-replicated-topic3 Partition:0  Leader:2   Replicas:2,1,3 Isr:2,1,3
Topic:my-replicated-topic3 Partition:1  Leader:2   Replicas:3,2,0 Isr:2,0,3
Topic:my-replicated-topic3 Partition:2  Leader:0   Replicas:0,3,1 Isr:0,1,3
Topic:my-replicated-topic3 Partition:3  Leader:1   Replicas:1,0,2 Isr:1,0,2
Topic:my-replicated-topic3 Partition:4  Leader:2   Replicas:2,3,0 Isr:2,0,3

服务器挂掉又重启后,分区的主副本并没有变化。观察上面的输出结果,可以看到编号为2的节点有3个分区在上面。为了保证主副本会负载均衡到所有的服务器,可以执行preferred-replica-election脚本来手动执行平衡操作,即选择Replicas的第一个副本作为分区的主副本。比如,分区P1的副本集等于[3, 2, 0],当前的主副本编号为2,那么就要将分区P1的主副本从现有的2迁移到3上。执行完平衡操作后,再次查看分区信息,可以看到分区的主副本确实发生了转移:

$ bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 \
  --topic my-replicated-topic3
Topic:my-replicated-topic3  PartitionCount:5  ReplicationFactor:3 Configs:
Topic:my-replicated-topic3  Partition:0 Leader:2  Replicas:2,1,3 Isr:2,1,3
Topic:my-replicated-topic3  Partition:1 Leader:3  Replicas:3,2,0 Isr:2,0,3
Topic:my-replicated-topic3  Partition:2 Leader:0  Replicas:0,3,1 Isr:0,1,3
Topic:my-replicated-topic3  Partition:3 Leader:1  Replicas:1,0,2 Isr:1,0,2
Topic:my-replicated-topic3  Partition:4 Leader:2  Replicas:2,3,0 Isr:2,0,3

前面我们主要站在Kafka服务端的角度,验证了Kafka集群的分布式分区特性、故障容错、分区主副本的转移等。下面从客户端角度(主要是消费者)验证消费组的分布式消费模型。

1.4.3 消费组示例

默认的控制台消费者在启动时,都会分配到一个随机的消费组编号,即一个消费组只有一个消费者。为了模拟一个消费组下有多个消费者的情况,通过指定消费者的配置文件,并在配置文件中配置消费组的编号,比如这里会设置group.id等于test-consumer-group:

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 2 --topic test

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
1.hello world       =》进入第一个分区(P0)
2.hello kafka       =》进入第二个分区(P1)
3.hello world       =》进入第一个分区(P0)
4.hello kafka       =》进入第二个分区(P1)
5.message queue     =》进入第一个分区(P0)
6.message system    =》进入第二个分区(P1)
7.hello again       =》进入第一个分区(P0)

# 第一个消费者,分配到第一个分区,顺序读取P0的消息
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test \
  --from-beginning --consumer.configconfig/consumer.properties
1.hello world
3.hello world
5.message queue
7.hello again

# 第二个消费者,分配到第二个分区,顺序读取P1的消息
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test \
  --from-beginning --consumer.configconfig/consumer.properties
2.hello kafka
4.hello kafka
6.message system

# 不指定消费组,随机分配一个消费组,分配所有的分区,只保证分区内消息的有序性
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test \
  --from-beginning
1.hello world       =》第一个分区
3.hello world       =》第一个分区
5.message queue     =》第一个分区
2.hello kafka
4.hello kafka
6.message system
7.hello again       =》第一个分区

查看消费组下消费者的偏移量信息,其中Pid表示分区编号,Offset表示消费进度,Lag表示落后的数据量,Owner表示分区归哪个消费者所有(下面的Owner打印信息省略了消费组名称前缀,比如zqhmac-2130366d-0实际上是test-consumer-group_zqhmac-2130366d-0):

# 指定消费组名称,两个消费者分担两个分区
$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
  --group test-consumer-group --zookeeper localhost:2181
Group                Topic Pid Offset logSize Lag Owner
test-consumer-group  test  0   3      3       0   zqhmac-2130366d-0
test-consumer-group  test  1   4      4       0   zqhmac-efb9d839-0

# 控制台的消费者不指定消费组名称,分配到所有的分区
$ bin/kafka-run-class.shkafka.tools.ConsumerOffsetChecker \
  --group console-consumer-46616 --zookeeper localhost:2181
Group                  Topic Pid Offset logSize Lag Owner
console-consumer-46616 test  0   3      3       0   zqhmac-3a477729-0
console-consumer-46616 test  1   4      4       0   zqhmac-3a477729-0

如图1-11所示,消费者的元数据信息会注册在ZK中,比如分区的偏移量、分区所属的消费者、所有消费者。另外,ZK还记录了Kafka集群的信息,比如服务器列表、控制器等。

图像说明文字

本节做的几个实验主要验证Kafka的一些基本原理,还有其他一些实验可以参考本书的附录部分。

1.5 环境准备

Kafka的源码采用Gradle来管理,在源码根目录下使用gradle idea命令生成IDEA项目,然后导入到IDEA工具中。如图1-12所示,我们创建resources资源目录,并添加log4j.properties日志配置文件(或者直接加到build的classes文件夹下)。

图像说明文字

如图1-13所示,安装完Scala插件并准备好开发环境后,在运行选项中设置主类为kafka.Kafka。

图像说明文字

运行Kafka类,观察日志。可以看到,服务端会连接ZK。创建日志目录,并启动后台的一些工作线程,比如日志清理线程、日志刷写线程、网络服务端、选举成为控制器、启动消费组的协调者等。最后打印出Kafka Server 0 started,表示Kafka服务在本机开发环境中启动成功。相关代码如下:

INFO KafkaConfig values:
auto.create.topics.enable = true        如果没有topic,会自动创建
offsets.topic.num.partitions = 50       默认offset的内部topic的分区有50个
min.insync.replicas = 1                 ISR中最少要有一个副本
num.partitions = 1                      普通的topic默认只有一个分区
    listeners = PLAINTEXT://:9092       没有使用安全机制,使用纯文本通信
zookeeper.connect = localhost:2181      ZK的通信地址
log.dirs = /tmp/kafka-logs              日志文件的目录
    broker.id = 0                       Broker在集群中的编号
INFO starting (kafka.server.KafkaServer)
INFO Connecting to zookeeper on localhost:2181 (k.server.KafkaServer)
INFO Log directory '/tmp/kafka-logs' not found, creating it.
INFO Logs loading complete. (k.log.LogManager)
INFO Starting log cleanup with a period of 300000 ms. (k.log.LogManager) ①
INFO Starting log flusher with a default period of 9223.. ms. ②
INFO Awaiting socket connections on 0.0.0.0:9092. (k.network.Acceptor)
INFO [Socket Server on Broker 0], Started 1 acceptor threads ③
INFO Creating /controller (k.utils.ZKCheckedEphemeral) ④
INFO 0 successfully elected as leader (k.server.ZookeeperLeaderElector)
INFO [GroupCoordinator 0]: Startup complete. (GroupCoordinator) ⑤
INFO [Group Metadata Manager on Broker 0]:Removed 0 expired offsets in 47 ms.
INFO [ThrottledRequestReaper-Produce], Starting  (k.s.ClientQuotaManager)
INFO [ThrottledRequestReaper-Fetch], Starting  (k.s.ClientQuotaManager)
INFO Creating /brokers/ids/0 (is it secure? false) (k.u.ZKCheckedEphemeral)
INFO Register broker 0 at path /brokers/ids/0 with localhost,9092,PLAINTEXT)
INFO New leader is 0 (k.s.ZookeeperLeaderElector$LeaderChangeListener)
INFO [Kafka Server 0], started (k.server.KafkaServer)

接下来,看看下Kafka源码包的一些主要目录,其中core是Kafka的核心库,包括了管理接口(admin)、请求和响应协议(api)、旧版本的生产者(producer)、旧版本的消费者(consumer)、控制器(controller)、协调者(coordinator)、消息的持久化(message)、网络层(network)、服务端的实现(server):

$ cdkafka&& tree -L 1
.

├── bin         运行相关的脚本
├── clients     客户端(新版本的生产者和消费者)
├── config      配置文件
├── connect     Kafka连接器组件
├── core        Kafka核心组件
├── examples    客户端示例(生产者和消费者)
├── streams     Kafka流处理库
├── tools       一些工具类,核心库下也有其他工具类

本书根据以上源码结构,主要分成下面三大部分的内容。

 客户端。包括生产者(第2章)、消费者(第3章)、新消费者(第4章)。

 服务端。包括协调者(第5章)、日志存储(第6章)、控制器(第7章)。

 高级应用。包括Kafka连接器(第8章)、Kafka流处理(第9章)。

在开始源码分析之前,表1-2至表1-5列举了本书用到的一些中英文对照术语。

图像说明文字

图像说明文字

图像说明文字

目录

  • 前言
  • 第1章 Kafka入门
  • 第2章 生产者
  • 第3章 消费者:高级API和低级API
  • 第4章 新消费者
  • 第5章 协调者
  • 第6章 存储层
  • 第7章 控制器
  • 第8章 基于Kafka构建数据流管道
  • 第9章 Kafka流处理 
  • 第10章 高级特性介绍