Kafka生产实践与性能优化


Kafka如何保证消息不丢失

首先要想保证全链路的消息不丢失,要从生产端和消费端来考虑

生产端

控制参数ack的设置

  • acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息
  • acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader宕掉,消息会丢失
  • acks=-1或all: 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。min.insync.replicas必须保证大于1,等于1就和ack=1的情况一样。这时候消息基本不会丢失,是最强的数据保证。

消费端

消费端这边首先不能是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时你consumer宕机,未处理完的数据丢失了。所以这里必须是手动提交才能最大程度的保证消息不丢失。

消息重复消费

生产端

发送消息如果配置了重试机制,比如网络抖动时间过长导致发送端发送超时,实际broker可能已经接收到消息,但发送方会重新发送消息。消费者会多次消费消息。

消费端

消费端这边配置的是自动提交,刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,下次重启又会拉取相同的一批数据重复处理,出现了重复消费

解决重复消费可以再消费端通过消息幂等去做

消息乱序问题

生产端

生产端配置了异步发送,重试机制,可能会出现消息乱序。某一条消息由于网络抖动的原因没有发送成功,但它后面的消息发送成功了,由于重试机制发送失败的消息会再次发送,出现了消息乱序。

解决:可以用同步发送的模式去发消息,acks不能设置为0,这样也能保证消息发送的有序。

消费端

将消息发送到同一个分区,然后用一个消费者去消费,保证了消费者消费顺序,不过性能较低。

性能低解决方案:可以在消费端增加几条内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息。

消息积压问题

消息积压出现情况

  1. 由于生产端生产消息与消费端消费速率不一致(消费者由于性能问题消费过慢)
  2. 消费组挂了一个消费者
  3. 消费端出现了bug导致消息一致重新消费

如果积压了大量的消息在broker没有消费,要想紧急处理这一批消息,可以修改消费端程序,让其将收到的消息快速转发到其他topic(可以设置很多分区),然后再启动多个消费者同时消费新主题的不同分区

由于消费端bug导致的消息积压,可以模拟死信队列实现方案,讲不成功的消息转移到死信队列,后续分析死信队列里面的消息解决消费端问题

延时队列

延时队列存储的对象是延时消息。延时消息是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费

实现方案:可以模拟RocketMQ的延时队列实现方案来实现。发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列(topic_1s,topic_5s,topic_10s,…);然后通过定时器进行轮训消费这些topic,查看消息是否到期,如果到期就把这个消息发送到具体的topic

Kafka的事务

这里的事务和RocketMQ的事务消息是不一致的,Kafka的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败);就是Kafka发送消息到不同的Topic里面去,保证同时发送成功或同时失败。

Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("transactional.id", "my-transactional-id");
 Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
 //初始化事务
 producer.initTransactions();

 try {
     //开启事务
     producer.beginTransaction();
     for (int i = 0; i < 100; i++){
         //发到不同的主题的不同分区
         producer.send(new ProducerRecord<>("hdfs-topic", Integer.toString(i), Integer.toString(i)));
         producer.send(new ProducerRecord<>("es-topic", Integer.toString(i), Integer.toString(i)));
         producer.send(new ProducerRecord<>("redis-topic", Integer.toString(i), Integer.toString(i)));
     }
     //提交事务
     producer.commitTransaction();
 } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
     // We can't recover from these exceptions, so our only option is to close the producer and exit.
     producer.close();
 } catch (KafkaException e) {
     // For all other exceptions, just abort the transaction and try again.
     //回滚事务
     producer.abortTransaction();
 }
 producer.close();

文章作者: dm
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 dm !
评论
 本篇
Kafka生产实践与性能优化 Kafka生产实践与性能优化
Kafka如何保证消息不丢失首先要想保证全链路的消息不丢失,要从生产端和消费端来考虑 生产端控制参数ack的设置 acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最
2024-03-05
下一篇 
KafKa单机、集群、控制台搭建 KafKa单机、集群、控制台搭建
环境准备java版本:1.8 操作系统:CentOS7 zookeeper:3.5.8 安装包:https://kafka.apache.org/downloads 我这里选择的是kafka_2.11-2.4.1版本的KafKa,2.11是
2024-02-15
  目录