Kafka如何保证消息不丢失
首先要想保证全链路的消息不丢失,要从生产端和消费端来考虑
生产端
控制参数ack的设置
- acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息
- acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader宕掉,消息会丢失
- acks=-1或all: 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。min.insync.replicas必须保证大于1,等于1就和ack=1的情况一样。这时候消息基本不会丢失,是最强的数据保证。
消费端
消费端这边首先不能是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时你consumer宕机,未处理完的数据丢失了。所以这里必须是手动提交才能最大程度的保证消息不丢失。
消息重复消费
生产端
发送消息如果配置了重试机制,比如网络抖动时间过长导致发送端发送超时,实际broker可能已经接收到消息,但发送方会重新发送消息。消费者会多次消费消息。
消费端
消费端这边配置的是自动提交,刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,下次重启又会拉取相同的一批数据重复处理,出现了重复消费
解决重复消费可以再消费端通过消息幂等去做
消息乱序问题
生产端
生产端配置了异步发送,重试机制,可能会出现消息乱序。某一条消息由于网络抖动的原因没有发送成功,但它后面的消息发送成功了,由于重试机制发送失败的消息会再次发送,出现了消息乱序。
解决:可以用同步发送的模式去发消息,acks不能设置为0,这样也能保证消息发送的有序。
消费端
将消息发送到同一个分区,然后用一个消费者去消费,保证了消费者消费顺序,不过性能较低。
性能低解决方案:可以在消费端增加几条内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息。
消息积压问题
消息积压出现情况
- 由于生产端生产消息与消费端消费速率不一致(消费者由于性能问题消费过慢)
- 消费组挂了一个消费者
- 消费端出现了bug导致消息一致重新消费
如果积压了大量的消息在broker没有消费,要想紧急处理这一批消息,可以修改消费端程序,让其将收到的消息快速转发到其他topic(可以设置很多分区),然后再启动多个消费者同时消费新主题的不同分区
由于消费端bug导致的消息积压,可以模拟死信队列实现方案,讲不成功的消息转移到死信队列,后续分析死信队列里面的消息解决消费端问题
延时队列
延时队列存储的对象是延时消息。延时消息是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费
实现方案:可以模拟RocketMQ的延时队列实现方案来实现。发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列(topic_1s,topic_5s,topic_10s,…);然后通过定时器进行轮训消费这些topic,查看消息是否到期,如果到期就把这个消息发送到具体的topic
Kafka的事务
这里的事务和RocketMQ的事务消息是不一致的,Kafka的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败);就是Kafka发送消息到不同的Topic里面去,保证同时发送成功或同时失败。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
//初始化事务
producer.initTransactions();
try {
//开启事务
producer.beginTransaction();
for (int i = 0; i < 100; i++){
//发到不同的主题的不同分区
producer.send(new ProducerRecord<>("hdfs-topic", Integer.toString(i), Integer.toString(i)));
producer.send(new ProducerRecord<>("es-topic", Integer.toString(i), Integer.toString(i)));
producer.send(new ProducerRecord<>("redis-topic", Integer.toString(i), Integer.toString(i)));
}
//提交事务
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
//回滚事务
producer.abortTransaction();
}
producer.close();