概述
Kafka 是什么?主要应用场景有哪些?
Kafka 是一个分布式流式处理平台,可以作为企业级的消息引擎。
Kafka 主要有两大应用场景:
- 消息队列 :建立实时流数据管道,以可靠地在系统或应用程序之间获取数据。
- 数据处理: 构建实时的流数据处理程序来转换或处理数据流。
Kafka的优势在哪里?
- 极致的性能 :最高可以每秒处理千万级别的消息。
- 生态系统兼容性无可匹敌 :尤其在大数据和流计算领域。
Kafka和rabbitmq比较
rabbitmq
- Erlang 语言编写的,轻量级、迅捷。
- Exchange 模块支持非常灵活的路由配置。
- 队列模型:生产者将消息发送到交换器,通过匹配交换器类型、Binding Key、Routing Key后,路由到一个或者多个队列中。队列用于存储消息,消费者直接绑定队列以消费消息。
问题:
- RabbitMQ 对消息堆积的支持并不好,当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。
- 性能一般,大概每秒钟可以处理几万到十几万条消息
kafka
- 使用 Scala 和 Java 语言开发。
- 性能比较好,大约每秒钟可以处理几十万条消息,极限处理能力可以超过每秒 2000 万条消息。
- 发布订阅模型:生产者直接发消息到主题,每个主题可以包含多个分区。消息消费支持消费者组,多个分区会平均分配给同一个消费者组里的不同消费者。不在同一个消费者组的消费者能订阅消费同一条消息,相同消费者组的消费者存在消费竞争(负载均衡)
- Kafka具有消息存储的功能,消息被消费后不会被立即删除,需要被不同的消费者组消费
问题:
同步收发消息的响应时延比较高,因为当客户端发送一条消息的时候,Kafka 并不会立即发送出去,而是要等一会儿攒一批再发送,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景。
对比项 | RabbitMQ | Kafka |
---|---|---|
吞吐量 | 低 | 高 |
有序性 | 全局有序性 | 分区有序性 |
消息可靠性 | 多策略组合 | 消息持久化 |
消费者模式 | 推拉 | 拉 |
消息堆积 | 无法支持较大的消息堆积 | 支持消息堆积,并批量持久化到磁盘 |
流处理 | 不支持 | 支持 |
时效性 | 高 | 中 |
运维便捷度 | 高 | 中 |
系统依赖 | 无 | zookeeper |
Web监控 | 自带 | 第三方 |
优先级队列 | 支持 | 不支持 |
死信 | 支持 | 不支持 |
消息回溯 | 支持 | 不支持 |
Producer、Consumer、Broker、Topic、Partition、Record?
- Producer(生产者) : 产生消息的一方。
- Consumer(消费者):消费消息的一方。
- Broker(代理) : Kafka 实例。
- Topic(主题) : 通过不同的主题区分不同的业务类型的消息记录。生产者将消息发送到特定的主题,消费者通过订阅特定的主题来消费消息。
- Partition(分区): 分区属于主题的一部分。一个主题可以有多个分区 ,同一主题下的分区可以分布在不同的Broker上。
- 记录(Record):写入到kafka并可以被消费者读取的数据。每条记录包含一个键、值和时间戳。
LEO、LSO、AR、ISR、HW?
- LEO(Log End Offset):日志末端位移值或末端偏移量,表示日志下一条待插入消息的位移值。
- LSO(Log Stable Offset):该值控制了事务型消费者能够看到的消息范围。
- AR(Assigned Replicas):主题被创建后,创建的副本集合,副本个数由副本因子决定。
- ISR(In-Sync Replicas):AR中与领导者副本保持同步的副本集合。领导者副本天然在ISR中。
- HW(High watermark):高水位值,这是控制消费者可读取消息范围。一个普通消费者只能看到Leader上介于Log Start Offset和高水位(不含)之间的所有消息。
生产者
ack 机制
- 0:生产者不会等待 broker 的 ack,延迟最低,可能丢数据
- 1:服务端会等待领导者副本收到消息,成功写入PageCache后,会返回ack,此时领导者副本切换可能丢数据
- -1:领导者副本所在broker收到消息后,等待所有ISR列表中的追随者副本返回结果后,再返回ack,数据不会丢失。
数据从领导者副本同步到追随者副本,需要2步:
- 数据从pageCache被刷盘到磁盘。因为只有磁盘中的数据才能被同步到副本。
- 数据同步到追随者副本中,并且replica成功将数据写入PageCache。在生产者得到ack后,哪怕是所有机器都停电,数据也至少会存在于领导者副本的磁盘内。
生产端怎么实现幂等的
幂等性生产者
设置props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
底层原理:空间去换时间,即在 Broker 端多保存一些字段。当生产者发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是把它们“丢弃”掉。
- ProducerID:生产者ID,在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
- SequenceNumber:消息序列号,对于每个ProducerID,生产者发送数据的每个主题和分区都对应一个从0开始单调递增的SequenceNumber值。
作用范围:它只能保证单分区上的幂等性,能够保证某个主题的一个分区上不出现重复消息。只能实现单会话上的幂等性,不能实现跨会话的幂等性。
怎么确定向哪一个分区写消息
获取集群的分区数据之后,根据生产者分区策略,只要如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区中,这样就实现了负载均衡和水平扩展。具体策略包括:
轮询策略
Round-robin
策略,即顺序分配。
轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,最合理也最常用的分区策略。
随机策略
Randomness
策略。将消息放置到任意一个分区上。
按消息键保序策略
Kafka 允许为每条消息定义消息键,简称为 Key。可以保证同一个 Key 的所有消息都进入到相同的分区里面,每个分区下的消息处理都是有顺序的。
消费者
什么是消费者组?
可扩展且具有容错性的消费者机制。
- Kafka允许你将同一份消息广播到多个消费者组里。
- 一个消费者组中可以包含多个消费者,他们共同消费该主题的数据。
- 同一个消费者组下的消费者有相同的组ID,他们被分配不同的订阅分区。
- 当某个消费者挂掉的时候,其他消费者会自动地承担起它负责消费的分区。
如何保证消息的消费顺序?
Kafka 只能为我们保证分区中的消息有序,而不能保证主题中的消息有序。
- 一个主题只对应一个 分区。
- (推荐)发送消息的时候指定key/分区。
__consumer_offsets 作用?
- 位移主题,存储消费者的位移数据,位移主题消息的 Key 中格式:<Group ID,主题名,分区号 >,消息体保存了位移值和位移提交的元数据,诸如时间戳和用户自定义的数据等。
- 保存消费者组相关的消息
- 用于删除消费者组过期位移、删除消费者组的消息。tombstone 消息,即墓碑消息
消费者如何获取到offset
确定由位移主题的哪个分区来保存该 Group 数据:
partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)
。找出该分区 Leader 所在的 Broker,该 Broker 即为对应的 Coordinator。
重平衡后,协调者统一以 SyncGroup 响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了。
位移主题消息的 Key 中格式:<Group ID,主题名,分区号 >,消息体保存了位移值和位移提交的元数据,诸如时间戳和用户自定义的数据等
Java Consumer为什么采用单线程来获取消息?
Java Consumer是双线程的设计。用户主线程,负责获取消息;心跳线程,负责向Kafka汇报消费者存活情况。将心跳单独放入专属的线程,能够有效地规避因消息处理速度慢而被视为下线的假死情况。
- 单线程获取消息的设计能够避免阻塞式的消息获取方式。单线程轮询方式容易实现异步非阻塞式,这样便于将消费者扩展成支持实时流处理的操作算子。因为很多实时流处理操作算子都不能是阻塞式的。
- 可以简化代码的开发。多线程交互的代码是非常容易出错的。
消息是采用 Pull 模式,还是 Push 模式?
生产者将消息推送到 broker,消费者从 broker 拉取消息。
消费者如何消费数据
KafkaConsumer 类不是线程安全的 (thread-safe),不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出
ConcurrentModificationException
异常
1.消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程
2.消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。
broker
消息位移的作用
用于标识消息在分区中的位置。
一旦消息被写入到分区日志,它的位移值将不能被修改。
怎么实现消息持久化
Kakfa 依赖文件系统来存储和缓存消息。Kafka 直接将数据写到了文件系统的日志中。
物理上把主题分成一个或多个分区,每个分区在物理上对应一个文件夹,该文件夹下存储这个分区的所有消息和索引文件。
数据索引如何实现
- 将数据文件分段存储。每一个段单独放在一个.log的文件中,数据文件命名是20个字符的长度,以每一个分段文件开始的最下offset来命名,其他位置用0填充。
- 每个log文件的大小默认是1GB,每个log文件就会对应一个同名的index文件。
- index文件采用了稀疏存储的方式,每隔一定字节的数据建立一条索引,避免了索引文件占用过多的空间和资源,从而可以将索引文件保留到内存中。没有建立索引的数据在查询的过程中需要小范围内的顺序扫描。
磁盘容量规划考虑因素?
- 新增消息数
- 消息留存时间
- 平均消息大小
- 备份数
- 是否启用压缩
多副本机制?好处?
- Kafka副本当前分为领导者副本和追随者副本。每个分区在创建时都要选举一个副本,作为领导者副本,其余的副本自动为追随者副本。
- 只有领导者副本才能对外提供读写服务。追随者副本只是采用拉的方式,同步领导者副本中的数据
- 在领导者副本所在的Broker宕机后,Kafka 依托于 ZooKeeper ,实时感知到,并开启领导者选举,从追随者副本中选一个作为新的领导者。
多分区多副本好处?
- 指定多分区, 而各个分区可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力(负载均衡)。
- 多副本提高了消息存储的安全性,提高了容灾能力,不过也相应的增加了所需要的存储空间。
- 读写走领导者副本:方便实现Read-your-writes
- 读写走领导者副本:方便实现单调读,在多次消费消息时,它不会看到某条消息一会儿存在一会儿不存在
Zookeeper的作用
- 存放元数据:主题分区的相关数据都保存在 ZooKeeper 中。
- 成员管理:Broker节点的注册、注销以及属性变更。
- Controller 选举:选举集群 Controller节点
- 其他管理类任务:包括但不限于主题删除、参数配置等。
为什么不支持读写分离
CAP理论下,我们只能保证可用性和一致性取其一。
如果支持读写分离,一致性就会有一定折扣,意味着可能的数据不一致,或数据滞后。
Controller发生网络分区时,Kafka会怎么样?
判断:Broker端的ActiveControllerCount。
由于Controller会给Broker发送3类请求,LeaderAndIsrRequest,StopReplicaRequest,UpdateMetadataRequest,因此,一旦出现网络分区,这些请求将不能顺利到达Broker端。
将影响主题的创建、修改、删除操作的信息同步。
追随者副本消息同步的流程?
- 追随者副本发送FETCH请求给领导者副本。
- 领导者副本会读取底层日志文件中的消息数据,使用FETCH请求中的fetchOffset,更新追随者副本远程副本的LEO值。领导者副本副本尝试更新分区高水位值。
- 追随者副本接收到FETCH响应之后,会把消息写入到底层日志,接着更新LEO和追随者副本高水位值。
领导者副本和追随者副本的高水位值更新时机是不同的,追随者副本的高水位更新永远落后于领导者副本的高水位。这种时间上的错配是造成各种不一致的原因。
怎么保证同步成功?
leader Epoch 由两部分数据组成。
- Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。
- 起始位移(Start Offset)。领导者副本在该 Epoch 值上写入的首条消息的位移。
Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,同时它还会定期地将这些信息持久化到一个 checkpoint 文件中。 领导者副本写入消息到磁盘时,Broker 会尝试更新这部分缓存。如果该领导者副本是首次写入消息,那么 Broker 会向缓存中增加一个 Leader Epoch 条目。
Leader总是-1,怎么破?
表明Controller不工作了,导致无法分配leader。
方法
1、删除ZooKeeper中的/controller节点,触发Controller重选举。Controller重选举能够为所有主题分区重刷分区状态,可以有效解决因不一致导致的领导者副本不可用问题。
2、重启Controller节点上的Kafka进程,让其他节点重新注册Controller角色。
如何设置Kafka能接收的最大消息的大小?
- Broker端参数:
message.max.bytes
,max.message.bytes
(主题级别),replica.fetch.max.bytes
- 消费者端参数:
fetch.message.max.bytes
Kafka如何保证高可用
Kafka 的副本机制:
- Kafka 集群由多个 Broker组成。一个Broker可以容纳多个主题,也就是一台服务器可以传输多个 主题 数据。Kafka 为了实现可扩展性,将一个 主题 分散到多个 分区 中。
- Kafka 中同一个分区下的不同副本,分为 领导者副本和 追随者副本。领导者副本 负责处理所有 读写的请求,追随者副本 作为数据备份,拉取 领导者副本的数据进行同步。
- 如果某个 Broker 挂掉,Kafka 会从 ISR 列表中选择一个分区作为新的 领导者副本。
Kafka能手动删除消息吗?
一般不需要用户手动删除消息。它本身提供了留存策略,能够自动删除过期消息。同时支持手动删除消息的。
- 对于设置了Key且参数cleanup.policy=compact的主题而言,我们可以构造一条消息发送给Broker,依靠日志清理组件提供的功能删除掉该 Key 的消息。
- 对于普通主题,可以使用
kafka-delete-records
,或编写程序调用Admin.deleteRecords
方法来删除消息。
如何确定合适的Kafka主题的分区数量?
需要根据每个分区的生产者和消费者的期望吞吐量进行估计,以便达到并行读写、负载均衡和高吞吐。
假设对于单个分区,生产者端的可达吞吐量为p,消费者端的可达吞吐量为c,期望的目标吞吐量为t,那么集群所需要的分区数量至少为max(t/p,t/c)
。
在生产者端,单个分区的吞吐量大小会受到批量大小、数据压缩方法、 确认类型(同步/异步)、复制因子等配置参数的影响。
假设期望读取数据的速率1GB/Sec,而一个消费者的读取速率为50MB/Sec,此时至少需要20个分区以及20个消费者。
如果期望生产数据的速率为1GB/Sec,而每个生产者的生产速率为100MB/Sec,此时就需要有10个分区。
设置20个分区,既可以保障生产速率,也可以保障的吞吐量
判断一个节点还活着的两个条件?
(1)节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接
(2)如果节点是追随者副本,他必须能及时的同步 leader 的写操作,延时不能太久
存储在硬盘上的消息格式是什么?
消息由一个固定长度的头部和可变长度的字节数组组成。
- 消息长度: 4 bytes
- 版本号: 1 byte
- CRC 校验码: 4 bytes
- 具体的消息: n bytes
实际问题
如何保证消息不丢失
生产者丢失消息
生产者调用
send
方法发送消息之后,消息可能因为网络问题并没有发送过去。 为了确定消息是发送成功,我们要判断消息发送的结果。可以采用回调函数的形式,如果消息发送失败的话,我们检查失败的原因之后重新发送即可。为 生产者的
retries
(重试次数)设置一个比较合理的值,一般是3。设置 acks = all,则表明所有副本 Broker 都要接收到消息,该消息才算是已提交
消费者丢失消息
- 关闭自动提交位移,每次在真正消费完消息之后之后,手动提交 。
Kafka 弄丢消息
- 设置
replication.factor >= 3
。防止消息丢失的主要机制就是冗余。 - 设置
min.insync.replicas > 1
。消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。 - 设置
unclean.leader.election.enable = false
。如果一个 Broker 落后原领导者副本太多,那么它一旦成为新的领导者副本,必然会造成消息的丢失。
如何保证消息不重复消费
去重:将消息的唯一标识保存到外部介质中,每次消费处理时判断是否处理过。
- 利用数据库的唯一约束实现幂等
- 为更新的数据设置前置条件,比如版本号
- GUID:在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
怎么增加消费的能力?
1、broker 端参数 num.replica.fetchers
表示的是 追随者副本用多少个线程来拉取消息,默认使用 1 个线程。如果你的 Broker 端 CPU 资源很充足,适当调大该参数值,加快追随者副本的同步速度。
2、在 生产者端,如果要改善吞吐量,通常的标配是增加消息批次的大小以及批次缓存时间,即 batch.size
和 linger.ms
。
3、压缩算法可减少网络 I/O 传输量,从而间接提升吞吐量。适配最好的两个压缩算法是 LZ4 和 zstd
4、消费者端使用多线程方案
为什么性能好
顺序写
操作系统读写磁盘时,需要先寻址,再进行数据读写。如果是机械硬盘,寻址就需要较长的时间。
Kafka 用的是顺序写,追加数据是追加到末尾,磁盘顺序写(pagecache)的性能极高。
零拷贝
Kafka使用了零拷贝技术,使用mmap+write
持久化数据,发送数据使用系统调用sendfile
。
传统的IO
read+write
方式会产生2次DMA拷贝+2次CPU拷贝,同时有4次上下文切换。而通过
mmap+write
方式则产生2次DMA拷贝+1次CPU拷贝,4次上下文切换,通过内存映射减少了一次CPU拷贝,可以减少内存使用,适合大文件的传输。
sendfile
方式是新增的一个系统调用函数,产生2次DMA拷贝+1次CPU拷贝,但是只有2次上下文切换。因为只有一次调用,减少了上下文的切换,但是用户空间对IO数据不可见,适用于静态文件服务器。
网络线程模型
加强版的 Reactor 网络线程模型
消息批量量处理
合并小的请求,然后以流的方式进行交互,直顶网络上限。
kafka如何实现延迟队列?
基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)。基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)
。
底层使用数组实现,数组中的每个元素可以存放一个TimerTaskList对象。TimerTaskList是一个环形双向链表,在其中的链表项TimerTaskEntry中封装了真正的定时任务TimerTask。
推进时间?Kafka中的定时器借助了JDK中的DelayQueue来协助推进时间轮。具体做法是对于每个使用到的TimerTaskList都会加入到DelayQueue中。Kafka中的TimingWheel
专门用来执行插入和删除TimerTaskEntry的操作,而DelayQueue专门负责时间推进的任务。再试想一下,DelayQueue中的第一个超时任务列表的expiration为200ms,第二个超时任务为840ms,这里获取DelayQueue的队头只需要O(1)的时间复杂度。如果采用每秒定时推进,那么获取到第一个超时的任务列表时执行的200次推进中有199次属于“空推进”,而获取到第二个超时任务时有需要执行639次“空推进”,这样会无故空耗机器的性能资源,这里采用DelayQueue来辅助以少量空间换时间,从而做到了“精准推进”。Kafka中的定时器真可谓是“知人善用”,用TimingWheel做最擅长的任务添加和删除操作,而用DelayQueue做最擅长的时间推进工作,相辅相成。
分布式事务
基于消息队列的事务实现。
- 订单系统在消息队列上开启一个事务。
- 然后订单系统给消息服务器发送一个“半消息”,这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的
- 半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。然后根据本地事务的执行结果决定提交或者回滚事务消息。
- 如果订单创建成功,那就提交事务消息,购物车系统就可以消费到这条消息继续后续的流程。如果订单创建失败,那就回滚事务消息,购物车系统就不会收到这条消息。这样就基本实现了“要么都成功,要么都失败”的一致性要求。