背景
Kafka是一个分布式基于Zookeeper的分布式消息系统,支持多分区,多副本。是由Scala语言开发,现在是Apache基金会顶级开源项目。它有着高吞吐、低延迟的特性。这种特性主要应用于大数据场景,日志收集场景。不适合一些复杂的业务场景,复杂的业务场景不适合Kafka建议RocketMQ。
Kafka基本概念
Broker
Kafka服务节点,可启动多个组成Broker集群,Broker基本信息都是存储在zookeeper,可以认为Broker集群是一个无状态的集群。
Broker主要职责:
- 接收Producer和Consumer的请求,并把消息写进磁盘。Broker以Topic为基本单位划分成不同partition,消息存储在不同partition。
- Broker可以保证同一个topic下的同一个partition消息是有序的。
- Broker中保存的数据是有有效期的,默认168小时,可以修改配置log.retention.hours调整
Topic
Kafka根据Topic将消息进行分组,相当于逻辑分组,发送消息的时候需要指定Topic
ConsumerGroup
消费者组,每个Consumer都有一个指定的ConsumerGroup,一条消息只可以被一个ConsumerGroup中的一个Consumer消费。一条消息可以被不同的ConsumerGroup消费。根据这可以实现单播消息和多播消息。
Partition
一个topic有多个partition,每个partition内部消息都是有序的。每个partition都有着自己的一个集群,并且有一个Leader对外提供读写请求,其余都是Follow不能提供读写请求只是同步Leader数据并且在Leader宕掉的情况下进行选举提升为Leader。
每个Topic下的数据是海量的。对于这批海量的数据为了减小服务器压力,可以建多个partition,而对于partition可以派分到不同的broker上面,而不同的broker在不同的服务器上。这样也就是把消息的压力分到了不同的服务器上面,减少服务器压力。
简而言之,topic可以进行partition分区主要可以提升并行度和分布式存储(将大体量的数据分区存储在不同服务器)。
单播消息
一条消息只能被一个消费者消费,让所有消费者在同一个消费组里面就可以了。
多播消息
一条消息能被多个消费者消费,多个消费者放在不同的消费组里面进行消费,这时多个消费者就都可以收到消息。
消息日志
消息发送在topic下面的某一个partition下。实际消息存储在commit log文件里面,每个partition,都对应一个commit log文件,每一条消息都有一个唯一编号offset。存储结构是每一个Topic下面是多个partition文件,每一个文件里面存储着一个message消息序列,是一个offset数组。
每个Consumer是基于自己在commit log的消息进度来工作的,消息消费的offset是由Consumer自己来维护的(其实是ConsumerGroup来维护的,这里也可以看出每一个消费组里面的消息者只能消费一次消息)。指定offset可以消费特定的消息,也可以消费重复的消息(消息没有删除的情况下)。
从这也可以看出Consumer对Kafka集群的影响基本没有。
下面具体看一下Topic下面的日志情况

进入my-repl-topic 0 分区下

Kafka 一个topic下一个分区的消息存储在一个文件夹下,文件夹命名为topic名字+分区号。消息在分区内是分段存储的,每段消息是存储在不一样的log文件下,每段日志消息规定不超过1G,方便kafka把log文件加载到内存去运算。
从这我们可以看出一个topic分区下又4个文件,主要是 *.index, *.log, *.timeindex
- *.log:消息存储文件 主要存offset和消息体
- *.index:部分消息的索引文件,kafka每次往分区写入了4k(可配置)的消息就会记录一条当前消息的offset进index文件,一般定位消息的offset就是在index下面快速定位然后在通过index里面存储的地址去log文件里面快速找到对应的消息消费
- *.timeindex:消息的发送时间索引文件,和 index文件写入机制一样不过会比index文件多存储一个消息发送时间戳,如果需要按照时间来定位消息的offset会在timeindex里面去查找
这几个文件的文件名命令规则就是这个日志段文件里包含的起始offset
log.segment.bytes,限定了每个日志段文件的大小,最大就是 1GB
offset记录机制
上面第一张图有50个consumer_offsets-*文件,从上面描述的topic名字+分区号命令就可以知道有个consumer_offsets主题
共50个分区(可配置),consumer会将自己消息的偏移量(offset)发送到这个topic下面,消息key是consumerGroupId+topic+分区号
value是当前offset的值,这样做的目的是抗高并发,扛不住也可以通过增加分区,增加机器来解决
默认50分区配置offsets.topic.num.partitions可修改分区数
Controller选举机制
这里的Controller叫做核心总控制器,是在broker层面的。它会选举一个broker为核心总控制器,负责管理整个集群中分区,副本的状态。
如何选举?
这里的选举主要是依赖于zookeeper来进行的
- 当一个broker启动时,会在zookeeper那创建一个/controller临时节点,zookeeper会保证只有一个创建成功,这个broker就是这个集群的Controller.
- 当这个Controller宕机了,zookeeper临时节点消失,其他broker会一直watch这个/controller节点,发现节点消失会重新竞争创建/controller节点,创建成功的就成为了新的Controller

职责
| 监听 | 监听zk路径 | 职责 |
|---|---|---|
| BrokerChangeListener | /brokers/ids | broker增减的监听 |
| TopicChangeListener | /brokers/topics | topic新建的监听 |
| DeleteTopicListener | /admin/delete_topics | topic删除的监听 |
| PartitionChangeListener | /brokers/topics/TOPIC_NAME | topic partition扩容的监听 |
| IsrChangeNotificationListener | /isr_change_notification | partition isr 变动的监听 |
| PreferredReplicaElectionListener | /admin/preferred_replica_election | partition leader的选举 |
Partition副本选举机制
controller感知到broker节点挂了,unclean.leader.election.enable配置分2种情况选leader

Replicas:某个partition在哪几个broker上存在备份
ISR(In Sync Replicas):ISR是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。(replica.lag.time.max.ms 配置决定的,超过这个时间都没有跟leader同步过的一次的副本会被移出ISR列表)
- unclean.leader.election.enable=false:controller会从ISR列表中选出leader,ISR空的时候会等待,这个时候这个分区不可用
- unclean.leader.election.enable=true:controller会从ISR列表中选出leader,ISR空的时候会从ISR以外的副本中选leader,但新leader数据可能落后太多
消费者Rebalance机制
消费者reblance机制就是当一个消费者组里面的消费者发生了变化或者分区数发生了变化,为了让分区可以被消费,kafka会重新分配分区。
reblance机制只适用于不指定分区消费的情况
reblance过程,消费者不会消费消息
2个关键概念
组协调器GroupCoordinator:Broker,组协调器主要负责监控消费组里面的所有消费者的心跳,判断是否宕机,开始reblance
消费组协调器LeaderCoordinator:消费者,制定分区方案
reblance流程
- 选举组协调器:每个消费组都会选举一个broker作为自己的组协调器GroupCoordinator,消费组里面的每个消费者启动时都会向kafka集群中的某个broker发送FindCoordinatorRequest来找到自己的组协调器,组协调器选择是根据消费者偏移量发送到的分区的leader的broker节点,就是消费组提交的offset到consumer_offsets的某个分区,而这个分区的leader是哪个broker,那么这个broker就是这个消费组的组协调器。
- 加入消费组:找到组协调器后,当一个消费者加入消费组,消费者会向组协调器发送JoinGroupRequest,组协调器会从消费组中选择第一个加入消费组的消费者作为消费组协调器LeaderCoordinator,然后消费组协调器制定分区方案并发送给组协调器。
- 同步阶段:组协调器收到消费组协调器的分区方案,将分区方案下发给其他消费者,每个消费者根据分区方案进行消息消费。
分区策略
主要有3种分区策略,range、round-robin、sticky
假设现有0-9 10个分区,消费组里有3个消费者
- range:按照分区序号排序,0,1,2,3给第一个;4,5,6给第二个;7,8,9给第3个
- round-robin:轮询分配,0,3,6,9给第一个;1,4,7给第二个;2,5,8给第3个
- sticky:初始时分配策略与round-robin类似,但是在rebalance的时候,需要保证如下两个原则
- 分区的分配要尽可能均匀
- 分区的分配尽可能与上次分配的保持相同
HW与LEO详解
HW(HignWatermark)高水位:consumer最多只能消费到HW,每个分区副本都有自己的HW,都分别自己维护HW。
LEO(log-end-offset):日志最后的偏移量,每个分区副本的LEO并不一定相同,Kafka会取多个副本中最小的LEO为HW.
这里简单介绍一下高水位同步过程
前提:假设某主题3副本,HW|LEO为3
- 两条消息写到Leader,这时Leader的LEO增加2,Leader LEO为5;3个副本HW为3,这时候消费者消费不到新写入的2条消息,他们只能消费到HW的消息
- Follower开始fetch消息,当2个Follower都fetch了一条消息此时Leader LEO为5;;3个副本HW为4;2个Follower LEO为4,此时消费者可以消息4这条消息不能消费5这条消息

分区数与吞吐量成正相关吗
答案是否,kafka有自己的压测工具,可以测试分区数不同,各种情况下的吞吐量
创建3个分区数不同的topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 10 --topic test-10
bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 100 --topic test-100
bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1000 --topic test-1000
# 往test里发送一百万消息,每条设置1KB
# throughput 用来进行限流控制,当设定的值小于 0 时不限流,当设定的值大于 0 时,当发送的吞吐量大于该值时就会被阻塞一段时间
bin/kafka-producer-perf-test.sh --topic test --num-records 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=192.168.186.131:9092 acks=1

从压测结果来看,分区数10 吞吐量30.07 MB/sec;分区数100 吞吐量60.47 MB/sec;分区数1000 吞吐量30.03 MB/sec;
到达某个值吞吐量反而开始下降,说明一味的增加分区数并不能使我们的吞吐量得到提升,需要经过实际测试确定Kafka一些参数的设置(包括分区数和buff缓冲区等等)
注意:如果分区数设置过大,比如设置10000,可能会设置不成功,后台会报错”java.io.IOException : Too many open files”。
这是一种常见的 Linux 系统错误,通常意味着文件描述符不足,它一般发生在创建线程、创建 Socket、打开文件这些场景下 。 在 Linux系统的默认设置下,这个文件描述符的个数不是很多 ,通过 ulimit -n 命令可以查看:一般默认是1024,可以将该值增大,比如:ulimit -n 65535
kafka高性能的原因
- 磁盘顺序读写:kafka消息不能修改以及不会从文件中间删除保证了磁盘顺序读,kafka的消息写入文件都是追加在文件末尾,不会写入文件中的某个位置(随机写)保证了磁盘顺序写。
- 数据传输的零拷贝
- 读写数据的批量batch处理以及压缩传输
Zookeeper与Kafka
