好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

关于spring boot整合kafka+注解方式

spring boot自动配置方式整合

spring boot具有许多自动化配置,对于kafka的自动化配置当然也包含在内,基于spring boot自动配置方式整合kafka,需要做以下步骤。

引入kafka的pom依赖包

?

1

2

3

4

5

6

<!-- https://mvnrepository测试数据/artifact/org.springframework.kafka/spring-kafka -->

< dependency >

     < groupId >org.springframework.kafka</ groupId >

     < artifactId >spring-kafka</ artifactId >

     < version >2.2.2.RELEASE</ version >

</ dependency >

在配置文件中配置kafka相关属性配置,分别配置生产者和消费者的属性,在程序启动时,spring boot框架会自动读取这些配置的属性,创建相关的生产者、消费者等。下面展示一个简单的配置。

?

1

2

3

4

5

6

7

8

9

#kafka默认消费者配置

spring.kafka.consumer.bootstrap-servers= 192.168 . 0.15 : 9092

spring.kafka.consumer.enable-auto-commit= false

spring.kafka.consumer.auto-offset-reset=earliest

#kafka默认生产者配置

spring.kafka.producer.bootstrap-servers= 192.168 . 0.15 : 9092

spring.kafka.producer.acks=- 1

spring.kafka.client-id=kafka-producer

spring.kafka.producer.batch-size= 5

当然,在实际生产中的配置肯定比上面的配置复杂,需要一些定制化的操作,那么spring boot的自动化配置创建的生产者或者消费者都不能满足我们时,应该需要自定义化相关配置,这个在后续举例,这里先分析自动化配置。

在进行了如上配置之后,需要生产者时,使用方式为下代码所示。

?

1

2

3

4

5

6

7

8

9

10

11

12

@RunWith (SpringRunner. class )

@SpringBootTest (classes = {UserSSOApplication. class })

public class UserSSOApplicationTests {

    @Resource

     //注入kafkatemplete,这个由spring boot自动创建

    KafkaTemplate kafkaTemplate;

    @Test

    public void testKafkaSendMsg() {

        //发送消息

       kafkaTemplate.send( "test" , 0 , 12 , "1222" );

    }

}

消费者的使用注解方式整合,代码如下。

?

1

2

3

4

5

6

7

8

9

@Component

@Slf4j

public class KafkaMessageReceiver2 {

     //指定监听的topic,当前消费者组id

     @KafkaListener (topics = { "test" }, groupId = "receiver" )

     public void registryReceiver(ConsumerRecord<Integer, String> integerStringConsumerRecords) {

         log.info(integerStringConsumerRecords.value());

     }

}

上面是最简单的配置,实现的一个简单例子,如果需要更加定制化的配置,可以参考类

org.springframework.boot.autoconfigure.kafka.KafkaProperties这里面包含了大部分需要的kafka配置。针对配置,在properties文件中添加即可。

spring boot自动配置的不足

上面是依赖spring boot自动化配置完成的整合方式,实际上所有的配置实现都是在org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中完成。可以看出个是依赖于@Configuration完成bean配置,这种配置方式基本能够实现大部分情况,只要熟悉org.springframework.boot.autoconfigure.kafka.KafkaProperties中的配置即可。

但是这种方式还有一个问题,就是org.springframework.boot.autoconfigure.kafka.KafkaProperties中并没有涵盖所有的org.apache.kafka.clients.producer.ProducerConfig中的配置,这就导致某些特殊配置不能依赖spring boot自动创建,需要我们手动创建Producer和comsumer。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

@Configuration

@ConditionalOnClass (KafkaTemplate. class )

@EnableConfigurationProperties (KafkaProperties. class )

@Import (KafkaAnnotationDrivenConfiguration. class )

public class KafkaAutoConfiguration {

    private final KafkaProperties properties;

    private final RecordMessageConverter messageConverter;

    public KafkaAutoConfiguration(KafkaProperties properties,

          ObjectProvider<RecordMessageConverter> messageConverter) {

       this .properties = properties;

       this .messageConverter = messageConverter.getIfUnique();

    }

    @Bean

    @ConditionalOnMissingBean (KafkaTemplate. class )

    public KafkaTemplate<?, ?> kafkaTemplate(

          ProducerFactory<Object, Object> kafkaProducerFactory,

          ProducerListener<Object, Object> kafkaProducerListener) {

       KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(

             kafkaProducerFactory);

       if ( this .messageConverter != null ) {

          kafkaTemplate.setMessageConverter( this .messageConverter);

       }

       kafkaTemplate.setProducerListener(kafkaProducerListener);

       kafkaTemplate.setDefaultTopic( this .properties.getTemplate().getDefaultTopic());

       return kafkaTemplate;

    }

    @Bean

    @ConditionalOnMissingBean (ConsumerFactory. class )

    public ConsumerFactory<?, ?> kafkaConsumerFactory() {

       return new DefaultKafkaConsumerFactory<>(

             this .properties.buildConsumerProperties());

    }

    @Bean

    @ConditionalOnMissingBean (ProducerFactory. class )

    public ProducerFactory<?, ?> kafkaProducerFactory() {

       DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(

             this .properties.buildProducerProperties());

       String transactionIdPrefix = this .properties.getProducer()

             .getTransactionIdPrefix();

       if (transactionIdPrefix != null ) {

          factory.setTransactionIdPrefix(transactionIdPrefix);

       }

       return factory;

    }

   //略略略

}

spring boot下手动配置kafka

由于需要对某些特殊配置进行配置,我们可能需要手动配置kafka相关的bean,创建一个配置类如下,类似于

org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration,这里创建了对应类型的bean之后,org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中的对应Bean定义将不起作用。

所有的生产者配置可以参考ProducerConfig类,所有的消费者配置可以参考ConsumerConfig类。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

/**

  * kafka配置,实际上,在KafkaAutoConfiguration中已经有默认的根据配置文件信息创建配置,但是自动配置属性没有涵盖所有

  * 我们可以自定义创建相关bean,进行如下配置

  *

  * @author zhoujy

  * @date 2018年12月17日

  **/

@Configuration

public class KafkaConfig {

     @Value ( "${spring.kafka.consumer.bootstrap-servers}" )

     private String bootstrapServers;

     //构造消费者属性map,ConsumerConfig中的可配置属性比spring boot自动配置要多

     private Map<String, Object> consumerProperties(){

         Map<String, Object> props = new HashMap<>();

         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );

         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000" );

         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer. class );

         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer. class );

         props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5 );

         props.put(ConsumerConfig.GROUP_ID_CONFIG, "activity-service" );

         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

         return props;

     }

     /**

      * 不使用spring boot默认方式创建的DefaultKafkaConsumerFactory,重新定义创建方式

      * @return

      */

     @Bean ( "consumerFactory" )

     public DefaultKafkaConsumerFactory consumerFactory(){

         return new DefaultKafkaConsumerFactory(consumerProperties());

     }

     @Bean ( "listenerContainerFactory" )

     //个性化定义消费者

     public ConcurrentKafkaListenerContainerFactory listenerContainerFactory(DefaultKafkaConsumerFactory consumerFactory) {

         //指定使用DefaultKafkaConsumerFactory

         ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();

         factory.setConsumerFactory(consumerFactory);

        

  //设置消费者ack模式为手动,看需求设置 

         factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

         //设置可批量拉取消息消费,拉取数量一次3,看需求设置

         factory.setConcurrency( 3 );

         factory.setBatchListener( true );

         return factory;

     }

    /* @Bean

      //代码创建方式topic

     public NewTopic batchTopic() {

         return new NewTopic("topic.quick.batch", 8, (short) 1);

     }*/

     //创建生产者配置map,ProducerConfig中的可配置属性比spring boot自动配置要多

     private Map<String, Object> producerProperties(){

         Map<String, Object> props = new HashMap<>();

         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer. class );

         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer. class );

         props.put(ProducerConfig.ACKS_CONFIG, "-1" );

         props.put(ProducerConfig.BATCH_SIZE_CONFIG, 5 );

         props.put(ProducerConfig.LINGER_MS_CONFIG, 500 );

         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

         return props;

     }

     /**

      * 不使用spring boot的KafkaAutoConfiguration默认方式创建的DefaultKafkaProducerFactory,重新定义

      * @return

      */

     @Bean ( "produceFactory" )

     public DefaultKafkaProducerFactory produceFactory(){

         return new DefaultKafkaProducerFactory(producerProperties());

     }

     /**

      * 不使用spring boot的KafkaAutoConfiguration默认方式创建的KafkaTemplate,重新定义

      * @param produceFactory

      * @return

      */

     @Bean

     public KafkaTemplate kafkaTemplate(DefaultKafkaProducerFactory produceFactory){

         return new KafkaTemplate(produceFactory);

     }

}

生产者的使用方式,跟自动配置一样,直接注入KafkaTemplate即可。主要是消费者的使用有些不同。

批量消费消息

上面的消费者配置配置了一个bean,@Bean([listenerContainerFactory]),这个bean可以指定为消费者,注解方式中是如下的使用方式。

containerFactory = "listenerContainerFactory"指定了使用listenerContainerFactory作为消费者。

注意registryReceiver中的参数,ConsumerRecord对比之前的消费者,因为设置listenerContainerFactory是批量消费,因此ConsumerRecord是一个List,如果不是批量消费的话,相对应就是一个对象。 注意第二个参数Acknowledgment,这个参数只有在设置消费者的ack应答模式为AckMode.MANUAL_IMMEDIATE才能注入,意思是需要手动ack。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

@Component

@Slf4j

public class KafkaMessageReceiver {  

     /**

      * listenerContainerFactory设置了批量拉取消息,因此参数是List<ConsumerRecord<Integer, String>>,否则是ConsumerRecord

      * @param integerStringConsumerRecords

      * @param acknowledgment

      */

     @KafkaListener (topics = { "test" }, containerFactory = "listenerContainerFactory" )

     public void registryReceiver(List<ConsumerRecord<Integer, String>> integerStringConsumerRecords, Acknowledgment acknowledgment) {

         Iterator<ConsumerRecord<Integer, String>> it = integerStringConsumerRecords.iterator();

         while (it.hasNext()){

             ConsumerRecord<Integer, String> consumerRecords = it.next();

             //dosome

              acknowledgment.acknowledge();

         }

     }

}

如果不想要批量消费消息,那就可以另外定义一个bean类似于@Bean([listenerContainerFactory]),如下,只要不设置批量消费即可。

?

1

2

3

4

5

6

7

8

9

10

11

@Bean ( "listenerContainerFactory2" )

     //个性化定义消费者

     public ConcurrentKafkaListenerContainerFactory listenerContainerFactory2(DefaultKafkaConsumerFactory consumerFactory) {

         //指定使用DefaultKafkaConsumerFactory

         ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();

         factory.setConsumerFactory(consumerFactory);

        

  //设置消费者ack模式为手动,看需求设置 

         factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

         return factory;

     }

spring boot整合kafka报错

Timeout expired while fetching topic metadata

这种报错应检查 kafka连接问题,服务是否启动,端口是否正确。

Kafka Producer error Expiring 10 record(s) for TOPIC:XXXXXX: xxx ms has passed since batch creation plus linger time

这种报错要考虑kafka和spring对应的版本问题,我的springboot 2.1.2在使用kafka_2.12-2.1.1时出现此问题,将kafka版本换为2.11-1.1.1后,问题解决。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持。

原文链接:https://blog.csdn.net/qq_20597727/article/details/85085532

查看更多关于关于spring boot整合kafka+注解方式的详细内容...

  阅读:12次