kafka必知必会

概述

Kafka 是什么?主要应用场景有哪些?

Kafka 是一个分布式流式处理平台,可以作为企业级的消息引擎。

Kafka 主要有两大应用场景:

  1. 消息队列 :建立实时流数据管道,以可靠地在系统或应用程序之间获取数据。
  2. 数据处理: 构建实时的流数据处理程序来转换或处理数据流。

Kafka的优势在哪里?

  1. 极致的性能 :最高可以每秒处理千万级别的消息。
  2. 生态系统兼容性无可匹敌 :尤其在大数据和流计算领域。

Kafka和rabbitmq比较

rabbitmq

  • Erlang 语言编写的,轻量级、迅捷。
  • Exchange 模块支持非常灵活的路由配置。
  • 队列模型:生产者将消息发送到交换器,通过匹配交换器类型、Binding Key、Routing Key后,路由到一个或者多个队列中。队列用于存储消息,消费者直接绑定队列以消费消息。

问题:

  • RabbitMQ 对消息堆积的支持并不好,当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。
  • 性能一般,大概每秒钟可以处理几万到十几万条消息

kafka

  • 使用 Scala 和 Java 语言开发。
  • 性能比较好,大约每秒钟可以处理几十万条消息,极限处理能力可以超过每秒 2000 万条消息。
  • 发布订阅模型:生产者直接发消息到主题,每个主题可以包含多个分区。消息消费支持消费者组,多个分区会平均分配给同一个消费者组里的不同消费者。不在同一个消费者组的消费者能订阅消费同一条消息,相同消费者组的消费者存在消费竞争(负载均衡)
  • Kafka具有消息存储的功能,消息被消费后不会被立即删除,需要被不同的消费者组消费

问题:

同步收发消息的响应时延比较高,因为当客户端发送一条消息的时候,Kafka 并不会立即发送出去,而是要等一会儿攒一批再发送,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景。

对比项 RabbitMQ Kafka
吞吐量
有序性 全局有序性 分区有序性
消息可靠性 多策略组合 消息持久化
消费者模式 推拉
消息堆积 无法支持较大的消息堆积 支持消息堆积,并批量持久化到磁盘
流处理 不支持 支持
时效性
运维便捷度
系统依赖 zookeeper
Web监控 自带 第三方
优先级队列 支持 不支持
死信 支持 不支持
消息回溯 支持 不支持

Producer、Consumer、Broker、Topic、Partition、Record?

  1. Producer(生产者) : 产生消息的一方。
  2. Consumer(消费者):消费消息的一方。
  3. Broker(代理) : Kafka 实例。
  • Topic(主题) : 通过不同的主题区分不同的业务类型的消息记录。生产者将消息发送到特定的主题,消费者通过订阅特定的主题来消费消息。
  • Partition(分区): 分区属于主题的一部分。一个主题可以有多个分区 ,同一主题下的分区可以分布在不同的Broker上。
  • 记录(Record):写入到kafka并可以被消费者读取的数据。每条记录包含一个键、值和时间戳。

LEO、LSO、AR、ISR、HW?

  • LEO(Log End Offset):日志末端位移值或末端偏移量,表示日志下一条待插入消息的位移值。
  • LSO(Log Stable Offset):该值控制了事务型消费者能够看到的消息范围。
  • AR(Assigned Replicas):主题被创建后,创建的副本集合,副本个数由副本因子决定。
  • ISR(In-Sync Replicas):AR中与领导者副本保持同步的副本集合。领导者副本天然在ISR中。
  • HW(High watermark):高水位值,这是控制消费者可读取消息范围。一个普通消费者只能看到Leader上介于Log Start Offset和高水位(不含)之间的所有消息。

生产者

ack 机制

  • 0:生产者不会等待 broker 的 ack,延迟最低,可能丢数据
  • 1:服务端会等待领导者副本收到消息,成功写入PageCache后,会返回ack,此时领导者副本切换可能丢数据
  • -1:领导者副本所在broker收到消息后,等待所有ISR列表中的追随者副本返回结果后,再返回ack,数据不会丢失。

数据从领导者副本同步到追随者副本,需要2步:

  • 数据从pageCache被刷盘到磁盘。因为只有磁盘中的数据才能被同步到副本。
  • 数据同步到追随者副本中,并且replica成功将数据写入PageCache。在生产者得到ack后,哪怕是所有机器都停电,数据也至少会存在于领导者副本的磁盘内。

生产端怎么实现幂等的

幂等性生产者

设置props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)

底层原理:空间去换时间,即在 Broker 端多保存一些字段。当生产者发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是把它们“丢弃”掉。

  • ProducerID:生产者ID,在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
  • SequenceNumber:消息序列号,对于每个ProducerID,生产者发送数据的每个主题和分区都对应一个从0开始单调递增的SequenceNumber值。

作用范围:它只能保证单分区上的幂等性,能够保证某个主题的一个分区上不出现重复消息。只能实现单会话上的幂等性,不能实现跨会话的幂等性。

怎么确定向哪一个分区写消息

获取集群的分区数据之后,根据生产者分区策略,只要如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区中,这样就实现了负载均衡和水平扩展。具体策略包括:

轮询策略

Round-robin 策略,即顺序分配。

轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,最合理也最常用的分区策略。

随机策略

Randomness策略。将消息放置到任意一个分区上。

按消息键保序策略

Kafka 允许为每条消息定义消息键,简称为 Key。可以保证同一个 Key 的所有消息都进入到相同的分区里面,每个分区下的消息处理都是有顺序的。

消费者

什么是消费者组?

可扩展且具有容错性的消费者机制。

  • Kafka允许你将同一份消息广播到多个消费者组里。
  • 一个消费者组中可以包含多个消费者,他们共同消费该主题的数据。
  • 同一个消费者组下的消费者有相同的组ID,他们被分配不同的订阅分区。
  • 当某个消费者挂掉的时候,其他消费者会自动地承担起它负责消费的分区。

如何保证消息的消费顺序?

Kafka 只能为我们保证分区中的消息有序,而不能保证主题中的消息有序。

  1. 一个主题只对应一个 分区。
  2. (推荐)发送消息的时候指定key/分区。

__consumer_offsets 作用?

  • 位移主题,存储消费者的位移数据,位移主题消息的 Key 中格式:<Group ID,主题名,分区号 >,消息体保存了位移值和位移提交的元数据,诸如时间戳和用户自定义的数据等。
  • 保存消费者组相关的消息
  • 用于删除消费者组过期位移、删除消费者组的消息。tombstone 消息,即墓碑消息

消费者如何获取到offset

  1. 确定由位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)

  2. 找出该分区 Leader 所在的 Broker,该 Broker 即为对应的 Coordinator。

  3. 重平衡后,协调者统一以 SyncGroup 响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了。

  4. 位移主题消息的 Key 中格式:<Group ID,主题名,分区号 >,消息体保存了位移值和位移提交的元数据,诸如时间戳和用户自定义的数据等

Java Consumer为什么采用单线程来获取消息?

Java Consumer是双线程的设计。用户主线程,负责获取消息;心跳线程,负责向Kafka汇报消费者存活情况。将心跳单独放入专属的线程,能够有效地规避因消息处理速度慢而被视为下线的假死情况。

  • 单线程获取消息的设计能够避免阻塞式的消息获取方式。单线程轮询方式容易实现异步非阻塞式,这样便于将消费者扩展成支持实时流处理的操作算子。因为很多实时流处理操作算子都不能是阻塞式的。
  • 可以简化代码的开发。多线程交互的代码是非常容易出错的。

消息是采用 Pull 模式,还是 Push 模式?

生产者将消息推送到 broker,消费者从 broker 拉取消息。

消费者如何消费数据

KafkaConsumer 类不是线程安全的 (thread-safe),不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException异常

1.消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程

2.消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。

broker

消息位移的作用

用于标识消息在分区中的位置。

一旦消息被写入到分区日志,它的位移值将不能被修改。

怎么实现消息持久化

Kakfa 依赖文件系统来存储和缓存消息。Kafka 直接将数据写到了文件系统的日志中。

物理上把主题分成一个或多个分区,每个分区在物理上对应一个文件夹,该文件夹下存储这个分区的所有消息和索引文件

数据索引如何实现

  • 将数据文件分段存储。每一个段单独放在一个.log的文件中,数据文件命名是20个字符的长度,以每一个分段文件开始的最下offset来命名,其他位置用0填充。
  • 每个log文件的大小默认是1GB,每个log文件就会对应一个同名的index文件。
  • index文件采用了稀疏存储的方式,每隔一定字节的数据建立一条索引,避免了索引文件占用过多的空间和资源,从而可以将索引文件保留到内存中。没有建立索引的数据在查询的过程中需要小范围内的顺序扫描。

磁盘容量规划考虑因素?

  • 新增消息数
  • 消息留存时间
  • 平均消息大小
  • 备份数
  • 是否启用压缩

多副本机制?好处?

  • Kafka副本当前分为领导者副本和追随者副本。每个分区在创建时都要选举一个副本,作为领导者副本,其余的副本自动为追随者副本。
  • 只有领导者副本才能对外提供读写服务。追随者副本只是采用拉的方式,同步领导者副本中的数据
  • 在领导者副本所在的Broker宕机后,Kafka 依托于 ZooKeeper ,实时感知到,并开启领导者选举,从追随者副本中选一个作为新的领导者。

多分区多副本好处?

  1. 指定多分区, 而各个分区可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力(负载均衡)。
  2. 多副本提高了消息存储的安全性,提高了容灾能力,不过也相应的增加了所需要的存储空间。
  3. 读写走领导者副本:方便实现Read-your-writes
  4. 读写走领导者副本:方便实现单调读,在多次消费消息时,它不会看到某条消息一会儿存在一会儿不存在

Zookeeper的作用

  • 存放元数据:主题分区的相关数据都保存在 ZooKeeper 中。
  • 成员管理:Broker节点的注册、注销以及属性变更。
  • Controller 选举:选举集群 Controller节点
  • 其他管理类任务:包括但不限于主题删除、参数配置等。

为什么不支持读写分离

CAP理论下,我们只能保证可用性和一致性取其一。

如果支持读写分离,一致性就会有一定折扣,意味着可能的数据不一致,或数据滞后。

Controller发生网络分区时,Kafka会怎么样?

判断:Broker端的ActiveControllerCount。

由于Controller会给Broker发送3类请求,LeaderAndIsrRequest,StopReplicaRequest,UpdateMetadataRequest,因此,一旦出现网络分区,这些请求将不能顺利到达Broker端。

将影响主题的创建、修改、删除操作的信息同步。

追随者副本消息同步的流程?

  • 追随者副本发送FETCH请求给领导者副本。
  • 领导者副本会读取底层日志文件中的消息数据,使用FETCH请求中的fetchOffset,更新追随者副本远程副本的LEO值。领导者副本副本尝试更新分区高水位值。
  • 追随者副本接收到FETCH响应之后,会把消息写入到底层日志,接着更新LEO和追随者副本高水位值。

领导者副本和追随者副本的高水位值更新时机是不同的,追随者副本的高水位更新永远落后于领导者副本的高水位。这种时间上的错配是造成各种不一致的原因。

怎么保证同步成功?

leader Epoch 由两部分数据组成。

  • Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。
  • 起始位移(Start Offset)。领导者副本在该 Epoch 值上写入的首条消息的位移。

Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,同时它还会定期地将这些信息持久化到一个 checkpoint 文件中。 领导者副本写入消息到磁盘时,Broker 会尝试更新这部分缓存。如果该领导者副本是首次写入消息,那么 Broker 会向缓存中增加一个 Leader Epoch 条目。

Leader总是-1,怎么破?

表明Controller不工作了,导致无法分配leader。

方法

1、删除ZooKeeper中的/controller节点,触发Controller重选举。Controller重选举能够为所有主题分区重刷分区状态,可以有效解决因不一致导致的领导者副本不可用问题。

2、重启Controller节点上的Kafka进程,让其他节点重新注册Controller角色。

如何设置Kafka能接收的最大消息的大小?

  • Broker端参数:message.max.bytesmax.message.bytes(主题级别),replica.fetch.max.bytes
  • 消费者端参数:fetch.message.max.bytes

Kafka如何保证高可用

Kafka 的副本机制:

  • Kafka 集群由多个 Broker组成。一个Broker可以容纳多个主题,也就是一台服务器可以传输多个 主题 数据。Kafka 为了实现可扩展性,将一个 主题 分散到多个 分区 中。
  • Kafka 中同一个分区下的不同副本,分为 领导者副本和 追随者副本。领导者副本 负责处理所有 读写的请求,追随者副本 作为数据备份,拉取 领导者副本的数据进行同步。
  • 如果某个 Broker 挂掉,Kafka 会从 ISR 列表中选择一个分区作为新的 领导者副本。

Kafka能手动删除消息吗?

一般不需要用户手动删除消息。它本身提供了留存策略,能够自动删除过期消息。同时支持手动删除消息的。

  • 对于设置了Key且参数cleanup.policy=compact的主题而言,我们可以构造一条消息发送给Broker,依靠日志清理组件提供的功能删除掉该 Key 的消息。
  • 对于普通主题,可以使用kafka-delete-records,或编写程序调用Admin.deleteRecords方法来删除消息。

如何确定合适的Kafka主题的分区数量?

需要根据每个分区的生产者和消费者的期望吞吐量进行估计,以便达到并行读写、负载均衡和高吞吐。

假设对于单个分区,生产者端的可达吞吐量为p,消费者端的可达吞吐量为c,期望的目标吞吐量为t,那么集群所需要的分区数量至少为max(t/p,t/c)

在生产者端,单个分区的吞吐量大小会受到批量大小、数据压缩方法、 确认类型(同步/异步)、复制因子等配置参数的影响。

假设期望读取数据的速率1GB/Sec,而一个消费者的读取速率为50MB/Sec,此时至少需要20个分区以及20个消费者。

如果期望生产数据的速率为1GB/Sec,而每个生产者的生产速率为100MB/Sec,此时就需要有10个分区。

设置20个分区,既可以保障生产速率,也可以保障的吞吐量

判断一个节点还活着的两个条件?

(1)节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接

(2)如果节点是追随者副本,他必须能及时的同步 leader 的写操作,延时不能太久

存储在硬盘上的消息格式是什么?

消息由一个固定长度的头部和可变长度的字节数组组成。

  • 消息长度: 4 bytes
  • 版本号: 1 byte
  • CRC 校验码: 4 bytes
  • 具体的消息: n bytes

实际问题

如何保证消息不丢失

生产者丢失消息

  • 生产者调用send方法发送消息之后,消息可能因为网络问题并没有发送过去。 为了确定消息是发送成功,我们要判断消息发送的结果。可以采用回调函数的形式,如果消息发送失败的话,我们检查失败的原因之后重新发送即可。

  • 为 生产者的retries(重试次数)设置一个比较合理的值,一般是3。

  • 设置 acks = all,则表明所有副本 Broker 都要接收到消息,该消息才算是已提交

消费者丢失消息

  • 关闭自动提交位移,每次在真正消费完消息之后之后,手动提交 。

Kafka 弄丢消息

  • 设置 replication.factor >= 3。防止消息丢失的主要机制就是冗余。
  • 设置 min.insync.replicas > 1。消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。
  • 设置 unclean.leader.election.enable = false。如果一个 Broker 落后原领导者副本太多,那么它一旦成为新的领导者副本,必然会造成消息的丢失。

如何保证消息不重复消费

去重:将消息的唯一标识保存到外部介质中,每次消费处理时判断是否处理过。

  1. 利用数据库的唯一约束实现幂等
  2. 为更新的数据设置前置条件,比如版本号
  3. GUID:在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。

怎么增加消费的能力?

1、broker 端参数 num.replica.fetchers表示的是 追随者副本用多少个线程来拉取消息,默认使用 1 个线程。如果你的 Broker 端 CPU 资源很充足,适当调大该参数值,加快追随者副本的同步速度。

2、在 生产者端,如果要改善吞吐量,通常的标配是增加消息批次的大小以及批次缓存时间,即 batch.sizelinger.ms

3、压缩算法可减少网络 I/O 传输量,从而间接提升吞吐量。适配最好的两个压缩算法是 LZ4 和 zstd

4、消费者端使用多线程方案

为什么性能好

顺序写

操作系统读写磁盘时,需要先寻址,再进行数据读写。如果是机械硬盘,寻址就需要较长的时间。

Kafka 用的是顺序写,追加数据是追加到末尾,磁盘顺序写(pagecache)的性能极高。

零拷贝

Kafka使用了零拷贝技术,使用mmap+write持久化数据,发送数据使用系统调用sendfile

传统的IOread+write方式会产生2次DMA拷贝+2次CPU拷贝,同时有4次上下文切换。

而通过mmap+write方式则产生2次DMA拷贝+1次CPU拷贝,4次上下文切换,通过内存映射减少了一次CPU拷贝,可以减少内存使用,适合大文件的传输。

sendfile方式是新增的一个系统调用函数,产生2次DMA拷贝+1次CPU拷贝,但是只有2次上下文切换。因为只有一次调用,减少了上下文的切换,但是用户空间对IO数据不可见,适用于静态文件服务器。

网络线程模型

加强版的 Reactor 网络线程模型

消息批量量处理

合并小的请求,然后以流的方式进行交互,直顶网络上限。

kafka如何实现延迟队列?

基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)。基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)

底层使用数组实现,数组中的每个元素可以存放一个TimerTaskList对象。TimerTaskList是一个环形双向链表,在其中的链表项TimerTaskEntry中封装了真正的定时任务TimerTask。

推进时间?Kafka中的定时器借助了JDK中的DelayQueue来协助推进时间轮。具体做法是对于每个使用到的TimerTaskList都会加入到DelayQueue中。Kafka中的TimingWheel专门用来执行插入和删除TimerTaskEntry的操作,而DelayQueue专门负责时间推进的任务。再试想一下,DelayQueue中的第一个超时任务列表的expiration为200ms,第二个超时任务为840ms,这里获取DelayQueue的队头只需要O(1)的时间复杂度。如果采用每秒定时推进,那么获取到第一个超时的任务列表时执行的200次推进中有199次属于“空推进”,而获取到第二个超时任务时有需要执行639次“空推进”,这样会无故空耗机器的性能资源,这里采用DelayQueue来辅助以少量空间换时间,从而做到了“精准推进”。Kafka中的定时器真可谓是“知人善用”,用TimingWheel做最擅长的任务添加和删除操作,而用DelayQueue做最擅长的时间推进工作,相辅相成。

分布式事务

基于消息队列的事务实现。

  • 订单系统在消息队列上开启一个事务。
  • 然后订单系统给消息服务器发送一个“半消息”,这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的
  • 半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。然后根据本地事务的执行结果决定提交或者回滚事务消息。
  • 如果订单创建成功,那就提交事务消息,购物车系统就可以消费到这条消息继续后续的流程。如果订单创建失败,那就回滚事务消息,购物车系统就不会收到这条消息。这样就基本实现了“要么都成功,要么都失败”的一致性要求。

   转载规则


《kafka必知必会》 wangyixin-tom 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
mysql必知必会 mysql必知必会
架构server层 连接器:管理连接,权限验证 查询缓存 分析器:词法、语法解析 优化器:生成执行计划,索引选择 执行器:操作引擎,返回结果 存储引擎层 负责数据存储和提取,插件式,支持InnoDB、MyISAM多个存储引擎
2021-04-10
下一篇 
rabbitmq必知必会 rabbitmq必知必会
MQ的优缺点优点 异步 - 不需要立即处理的消息可以之后慢慢处理。异步处理可以提高系统吞吐量。 解耦 - 各个系统间通过消息通信,不用关心其他系统的处理。 削锋 - 可以通过消息队列支撑突发访问压力;可以缓解短时间内的高并发请求,不会因为突
2021-04-05
  目录