最近遇到一个kafka方面的问题,大致就是由于consumer处理业务超时,导致无法正常提交Offset,进而导致无法消费新消息的问题。下面我想从以下几个方面对此次故障排查进行复盘分析:业务背景、问题描述、排查思路、经验教训。
创新互联-专业网站定制、快速模板网站建设、高性价比海州网站开发、企业建站全套包干低至880元,成熟完善的模板库,直接使用。一站式海州网站制作公司更省心,省钱,快速模板网站建设找我们,业务覆盖海州地区。费用合理售后完善,10多年实体公司更值得信赖。
先简单描述一下业务背景吧。我们有个业务需要严格按顺序消费Topic消息,所以针对该topic设置了唯一的partition,以及唯一的副本。当同一个消费组的多个consumer启动时,只会有一个consumer订阅到该Topic,进行消费,保证同一个消费组内的消费顺序。
注:消费组的groupId名称为“smart-building-consumer-group”,订阅的Topic名称为“gate_contact_modify”。
有一天我们突然收到一个问题反馈:producer侧的业务产生消息后,consumer侧并没有得到预期的结果。经过排查,排除了业务逻辑出现问题的可能性,我们判断最有可能是因为kafka消息没有被消费到。为了印证这个猜测,我们查看了consumer消费日志,发现日志中存在这样几处问题:
(1)日志偶尔会打印出一条Kafka的警告日志,内容为:org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.maybeAutoCommitOffsetsSync:648 - Auto-commit of offsets {gate_contact_modify-0=OffsetAndMetadata{offset=2801, metadata=''}} failed for group smart-building-consumer-group: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
(2)接着进行了一次rebalance;
(3)consumer侧输出了Topic消费者的业务日志,表明正常获取到了Topic消息。
接着我们查看kafka 消费组中该Topic对应的Offset的变化情况,发现Offset一直没有变化。
日志中的异常信息很明确的告知我们,topic消息消费完成后,由于group发生了一次rebalance,导致Commit没有被提交,这表明两次poll消息的间隔时间超过了max.poll.interval.ms定义的最大间隔,这也意味着一次poll后处理消息的过程超时了,正是由于poll间隔时间超时,导致了一次rebalance。同时建议我们要么增加间隔时间,要么减少每次拉取的最大消息数。
另外,由于Commit没有被提交,导致OffSet值没有变化,那么每次拉取到的消息都是同一批重复消息。具体的异常流程如下图:
根据上述信息,我们进一步检查了consumer的max.poll.records配置、max.poll.interval.ms配置,并统计了每条Topic消息的处理耗时,发现max.poll.records使用了默认配置值500,max.poll.interval.ms使用了默认配置值为300s,而每条Topic消息的处理耗时为10S。这进一步证实了我们的推论:
由于每次拉取的消息数太多,而每条消息处理时间又较长,导致每次消息处理时间超过了拉取时间间隔,从而使得group进行了一次rebalance,导致commit失败,并最终导致下次拉取重复的消息、继续处理超时,进入一个死循环状态。
知道问题根源后,我们结合业务特点,更改了max.poll.records=1,每次仅拉取一条消息进行处理,最终解决了这个问题。
这次故障排查,使我们对Kafka消息poll机制、rebalance和commit之间的相互影响等有了更深的理解。
(1)kafka每次poll可以指定批量消息数,以提高消费效率,但批量的大小要结合poll间隔超时时间和每条消息的处理时间进行权衡;
(2)一旦两次poll的间隔时间超过阈值,group会认为当前consumer可能存在故障点,会触发一次rebalance,重新分配Topic的partition;
(3)如果在commit之前进行了一次rebalance,那么本次commit将会失败,下次poll会拉取到旧的数据(重复消费),因此要保证好消息处理的幂等性;