好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

消息队列-kafka消费异常问题

概述

在kafka中,或者是说在任何消息队列中都有个消费顺序的问题。为了保证一个队列顺序消费,当当中一个消息消费异常时,必将影响后续队列消息的消费,这样业务岂不是卡住了。比如笔者举个最简单的例子:我发送1-100的消息,在我的处理逻辑当中 msg%5==0我就进行 int i=1/0操作,这必将抛异常,一直阻塞在msg=5上,后面6-100无法消费。下面笔者给出解决方案。

重试一定次数(消息丢失)

?

1

2

3

4

5

6

7

8

9

10

11

@KafkaHandler

     @KafkaListener (topics = { "quickstart-events" },groupId = "test-consumer-group-2" , concurrency = "1" )

     public void test6(String msg){

               businessProcess(msg);

             }

            private void businessProcess(String msg){

         System.out.println( "接收到消息:" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode());

        if (Integer.valueOf(msg) % 5 == 0 ) {

             int i = 1 / 0 ;

         }

     }

说明:如果读者使用的是java客户端,也就是spring进行实现,那么在不做任何处理的情况下,会自动重试10次,然后消息会被直接处理掉。也就是说如果你的业务允许消息丢失,那么你不需要额外的编码处理

加入到死讯队列(消息不丢失)

消费端代码:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

//1.启用手动提交offset

//2.配置errorHandler,用来加入到死讯队列

//3.不管业务处理是否处理异常还是正常都提交offset

@KafkaHandler

     @KafkaListener (topics = { "quickstart-events" },groupId = "test-consumer-group-2" ,

             errorHandler = "kafkaListenerErrorHandler" , concurrency = "1" )

     public void test6(String msg,Acknowledgment ack){

         try {

             businessProcess(msg);

         } finally {

             //手动提交

             ack.acknowledge();

         }

     }

//1.专门处理死讯队列消息,都是topicName+.DLT的主题

//2.死讯队列里,只有消费成功的才提交offset,否则等待bug修复完上线,继续处理

     @KafkaHandler

     @KafkaListener (topics = { "quickstart-events.DLT" },groupId = "test-consumer-group-2" , concurrency = "1" )

     public void test7(String msg,Acknowledgment ack){

         try {

             businessProcess(msg);

             ack.acknowledge();

         } catch (Exception e){

             e.printStackTrace();

         }

     }

//业务代码

     private void businessProcess(String msg){

         System.out.println( "接收到消息:" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode());

         if (Integer.valueOf(msg) % 5 == 0 ) {

             int i = 1 / 0 ;

         }

     }

异常处理器

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

//1.向容器注册一个KafkaListenerErrorHandler类型的bean

//2.该bean就是当处理消息异常的时候,将消息加入到.DLT主题中

@Component ( "kafkaListenerErrorHandler" )

public class KafkaListenerErrorHandlerTest implements KafkaListenerErrorHandler {

    @Autowired

     private KafkaTemplate<String, Object> kafkaTemplate;

     private static final String TOPIC_DLT= ".DLT" ;

     @Override

     public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {

         System.out.println( "消费失败消息:" +message.toString());

         //获取消息处理异常主题

         MessageHeaders headers = message.getHeaders();

         String topic=headers.get( "kafka_receivedTopic" )+TOPIC_DLT;

         //放入死讯队列

         kafkaTemplate.send(topic,message.getPayload());

         return message;

     }

}

效果图 :

说明:以上基本上就是使用死讯队列的方案,也许读者会觉得这样编码复杂度很高,但其实不用担心,其实上面这些代码基本上是使用死讯队列的模板代码,在成熟一点的公司,一般会使用上述代码进行简单封装,这里笔者给个思路,有兴趣同学可以实现一下。我们其实可以使用aop思想,进行自定义一个@EnableDLT这样的注解去实现,这样上面这个方案使用起来是不是就简单优雅了。之前笔者在开发过程中使用过亚马逊的消息队列服务,也不过是这样实现罢了。

总结

本篇文章就到这里了,希望可以给你带来一些帮助,也希望您能够多多关注的更多内容!

原文链接:https://blog.csdn.net/m0_58554082/article/details/117927021

查看更多关于消息队列-kafka消费异常问题的详细内容...

  阅读:18次