好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

spring kafka @KafkaListener详解与使用过程

说明

从2.2.4版开始,您可以直接在注释上指定Kafka使用者属性,这些 属性将覆盖在使用者工厂中配置的具有相同名称的所有属性 。您不能通过这种方式指定group.id和client.id属性。他们将被忽略; 可以使用#{…?}或属性占位符(${…?})在SpEL上配置注释上的大多数属性。

比如:

?

1

2

@KafkaListener (id = "consumer-id" ,topics = "SHI_TOPIC1" ,concurrency = "${listen.concurrency:3}" ,

          clientIdPrefix = "myClientId" )

属性 concurrency 将会从容器中获取 listen.concurrency 的值,如果不存在就默认用3

@KafkaListener详解

id 监听器的id

①. 消费者线程命名规则

填写:

2020-11-19 14:24:15 c.d.b.k.KafkaListeners 120 [INFO] 线程:Thread[ consumer-id5-1-C-1 ,5,main]-groupId:BASE-DEMO consumer-id5 消费

没有填写ID:

2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] 线程:Thread[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 ,5,main] consumer-id7

②.在相同容器中的监听器ID不能重复

否则会报错

Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id

③.会覆盖消费者工厂的消费组GroupId

假如配置文件属性配置了消费组 kafka.consumer.group-id=BASE-DEMO
正常情况它是该容器中的默认消费组
但是如果设置了 @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"})
那么当前消费者的消费组就是 consumer-id7 ;

当然如果你不想要他作为groupId的话 可以设置属性 idIsGroup = false ;那么还是会使用默认的GroupId;

④. 如果配置了属性groupId,则其优先级最高

 @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test")

例如上面代码中最终这个消费者的消费组 GroupId 是 [groupId-test]

该id属性(如果存在)将用作Kafka消费者group.id属性,并覆盖消费者工厂中的已配置属性(如果存在)您还可以groupId显式设置或将其设置idIsGroup为false,以恢复使用使用者工厂的先前行为group.id。

groupId 消费组名

指定该消费组的消费组名; 关于消费组名的配置可以看看上面的 id 监听器的id

如何获取消费者 group.id

在监听器中调用 KafkaUtils.getConsumerGroupId() 可以获得当前的groupId; 可以在日志中打印出来; 可以知道是哪个客户端消费的;

topics 指定要监听哪些topic(与topicPattern、topicPartitions 三选一)

可以同时监听多个
topics = {"SHI_TOPIC3","SHI_TOPIC4"}

topicPattern 匹配Topic进行监听(与topics、topicPartitions 三选一) topicPartitions 显式分区分配

可以为监听器配置明确的主题和分区(以及可选的初始偏移量)

?

1

2

3

4

5

6

7

8

@KafkaListener (id = "thing2" , topicPartitions =

         { @TopicPartition (topic = "topic1" , partitions = { "0" , "1" }),

           @TopicPartition (topic = "topic2" , partitions = "0" ,

              partitionOffsets = @PartitionOffset (partition = "1" , initialOffset = "100" ))

         })

public void listen(ConsumerRecord<?, ?> record) {

     ...

}

上面例子意思是 监听 topic1 的0,1分区;监听 topic2 的第0分区,并且第1分区从offset为100的开始消费;

errorHandler 异常处理

实现 KafkaListenerErrorHandler ; 然后做一些异常处理;

?

1

2

3

4

5

6

7

8

9

10

11

12

13

@Component

public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {

     @Override

     public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {

         return null ;

     }

 

     @Override

     public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {

         //do someting

         return null ;

     }

}

调用的时候 填写beanName;例如 errorHandler="kafkaDefaultListenerErrorHandler"

containerFactory 监听器工厂

指定生成监听器的工厂类;

例如我写一个 批量消费的工厂类

?

1

2

3

4

5

6

7

8

9

10

11

12

13

/**

  * 监听器工厂 批量消费

  * @return

  */

@Bean

public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {

     ConcurrentKafkaListenerContainerFactory<Integer, String> factory =

             new ConcurrentKafkaListenerContainerFactory<>();

     factory.setConsumerFactory(kafkaConsumerFactory());

     //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG

     factory.setBatchListener( true );

     return factory;

}

使用 containerFactory = "batchFactory"

clientIdPrefix 客户端前缀

会覆盖消费者工厂的 kafka.consumer.client-id 属性; 最为前缀后面接 -n n是数字

concurrency并发数

会覆盖消费者工厂中的concurrency ,这里的并发数就是多线程消费; 比如说单机情况下,你设置了3; 相当于就是启动了3个客户端来分配消费分区;分布式情况 总线程数=concurrency*机器数量; 并不是设置越多越好,具体如何设置请看 Java concurrency之集合

?

1

2

3

4

5

6

7

8

9

10

11

12

/**

  * 监听器工厂

  * @return

  */

@Bean

public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {

     ConcurrentKafkaListenerContainerFactory<Integer, String> factory =

             new ConcurrentKafkaListenerContainerFactory<>();

     factory.setConsumerFactory(kafkaConsumerFactory());

     factory.setConcurrency( 6 );

     return factory;

}

?

1

@KafkaListener (id = "consumer-id5" ,idIsGroup = false ,topics = "SHI_TOPIC3" , containerFactory = "concurrencyFactory" ,concurrency = " 1 )

虽然使用的工厂是 concurrencyFactory (concurrency配置了6); 但是他最终生成的监听器数量 是1;

properties 配置其他属性

kafka中的属性看 org.apache.kafka.clients.consumer.ConsumerConfig ;
同名的都可以修改掉;

用法

?

1

2

3

4

@KafkaListener (id = "consumer-id5" ,idIsGroup = false ,topics = "SHI_TOPIC3" , containerFactory = "concurrencyFactory" ,concurrency = "1"

         , clientIdPrefix = "myClientId5" ,groupId = "groupId-test" ,

         properties = {

                 "enable.auto测试数据mit:false" , "max.poll.interval.ms:6000" },errorHandler= "kafkaDefaultListenerErrorHandler" )

@KafkaListener使用

KafkaListenerEndpointRegistry

?

1

2

3

4

@Autowired

private KafkaListenerEndpointRegistry registry;

    //.... 获取所有注册的监听器

     registry.getAllListenerContainers();

设置入参验证器

当您将Spring Boot与验证启动器一起使用时,将LocalValidatorFactoryBean自动配置:如下

?

1

2

3

4

5

6

7

8

9

10

11

12

13

@Configuration

@EnableKafka

public class Config implements KafkaListenerConfigurer {

 

     @Autowired

     private LocalValidatorFactoryBean validator;

     ...

 

     @Override

     public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {

       registrar.setValidator( this .validator);

     }

}

使用

?

1

2

3

4

5

6

7

8

9

10

11

12

@KafkaListener (id= "validated" , topics = "annotated35" , errorHandler = "validationErrorHandler" ,

       containerFactory = "kafkaJsonListenerContainerFactory" )

public void validatedListener( @Payload @Valid ValidatedClass val) {

     ...

}

 

@Bean

public KafkaListenerErrorHandler validationErrorHandler() {

     return (m, e) -> {

         ...

     };

}

spring-kafka官方文档

扩展:Spring for Apache Kafka @KafkaListener使用及注意事项

官方文档:   https://docs.spring.io/spring-kafka/reference/html/

  @KafkaListener

The  @KafkaListener  annotation is used to designate a bean method as a listener for a listener container. The bean is wrapped in a  MessagingMessageListenerAdapter  configured with various features, such as converters to convert the data, if necessary, to match the method parameters.

If, say, six  TopicPartition  instances are provided and the  concurrency  is  3 ; each container gets two partitions. For five  TopicPartition  instances, two containers get two partitions, and the third gets one. If the  concurrency  is greater than the number of  TopicPartitions , the  concurrency  is adjusted down such that each container gets one partition.

You can now configure a  KafkaListenerErrorHandler  to handle exceptions. See Handling Exceptions for more information.

By default, the  @KafkaListener   id  property is now used as the  group.id  property, overriding the property configured in the consumer factory (if present). Further, you can explicitly configure the  groupId  on the annotation. Previously, you would have needed a separate container factory (and consumer factory) to use different  group.id  values for listeners. To restore the previous behavior of using the factory configured  group.id , set the  idIsGroup  property on the annotation to  false .

示例:

   demo类:

?

1

2

3

4

5

6

7

8

9

10

public class Listener {

 

     @KafkaListener (id = "foo" , topics = "myTopic" , clientIdPrefix = "myClientId" )

     public void listen(String data) {

         ...

     }

 

}</code>

 

配置类及注解:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

@Configuration

@EnableKafka

public class KafkaConfig {

 

     @Bean

     KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>

                         kafkaListenerContainerFactory() {

         ConcurrentKafkaListenerContainerFactory<Integer, String> factory =

                                 new ConcurrentKafkaListenerContainerFactory<>();

         factory.setConsumerFactory(consumerFactory());

         factory.setConcurrency( 3 );

         factory.getContainerProperties().setPollTimeout( 3000 );

         return factory;

     }

 

     @Bean

     public ConsumerFactory<Integer, String> consumerFactory() {

         return new DefaultKafkaConsumerFactory<>(consumerConfigs());

     }

 

     @Bean

     public Map<String, Object> consumerConfigs() {

         Map<String, Object> props = new HashMap<>();

         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());

         ...

         return props;

     }

}

到此这篇关于spring-kafka @KafkaListener详解与使用的文章就介绍到这了,更多相关spring-kafka使用内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

原文链接:https://szzdzhp.blog.csdn.net/article/details/109803987

查看更多关于spring kafka @KafkaListener详解与使用过程的详细内容...

  阅读:39次