一、介绍
在上篇文章中,我们详细的介绍了 kafka 的架构模型,在集群环境中,kafka 可以通过设置分区数来加快数据的消费速度。
光知道理论还不行,我们得真真切切的实践起来才行!
下面,我将结合生产环境的真实案例,以SpringBoot技术框架为基础,向大家介绍 kafka 的使用以及如何实现数据高吞吐!
二、程序实践
最近,公司大数据团队每天凌晨会将客户的订单数据进行统计计算,然后把业绩数据推送给我们,以便销售人员每天能看到昨天的业绩数据,数据的体量大约在 1000 多万条,以下是我对接的过程!
2.1、添加 kafka 依赖包本次项目的SpringBoot版本为2.1.5.RELEASE,依赖的 kafka 的版本为2.2.6.RELEASE。
https : // back - media .51cto 测试数据 / editor?id = 707646 / h6e90be6 - 7 EV6kJbV2.2、添加 kafka 配置变量
当添加完了依赖包之后,我们只需要在application.properties中添加 kafka 配置变量,基本上就可以正常使用了。
# 指定kafka server的地址,集群配多个,中间,逗号隔开 spring .kafka .bootstrap - servers = 197.168 .25 .196 : 9092 #重试次数 spring .kafka .producer .retries = 3 #批量发送的消息数量 spring .kafka .producer .batch - size = 1000 #32MB的批处理缓冲区 spring .kafka .producer .buffer - memory = 33554432 #默认消费者组 spring .kafka .consumer .group - id = crm - microservice - newperformance #最早未被消费的offset spring .kafka .consumer .auto - offset - reset = earliest #批量一次最大拉取数据量 spring .kafka .consumer .max - poll - records = 4000 #是否自动提交 spring .kafka .consumer .enable - auto - commit = true #自动提交时间间隔,单位ms spring .kafka .consumer .auto - commit - interval = 10002.3、创建一个消费者
@Component public class BigDataTopicListener { private static final Logger log = LoggerFactory .getLogger ( BigDataTopicListener .class ) ; /** * 监听kafka数据 * @param consumerRecords * @param ack */ @KafkaListener ( topics = { "big_data_topic" } ) public void consumer ( ConsumerRecord < ? , ? > consumerRecord ) { log .info ( "收到bigData推送的数据'{}'" , consumerRecord .toString ( ) ) ; // ... // db .save ( consumerRecord ) ; // 插入或者更新数据 } }2.4、模拟对方推送数据测试
@RunWith ( SpringRunner .class ) @SpringBootTest public class KafkaProducerTest { @Autowired private KafkaTemplate < String , String > kafkaTemplate ; @Test public void testSend ( ) { for ( int i = 0 ; i < 5000 ; i ++ ) { Map < String , Object > map = new LinkedHashMap <> ( ) ; map .put ( "datekey" , 20210610 ) ; map .put ( "userid" , i ) ; map .put ( "salaryAmount" , i ) ; // 向kafka的big_data_topic主题推送数据 kafkaTemplate .send ( "big_data_topic" , JSONObject .toJSONString ( map ) ) ; } } }
起初,通过这种单条数据消费方式,进行测试程序没太大毛病!
但是,当上到生产之后,发现一个很大的问题,就是消费1000万条数据,至少需要3个小时,结果导致数据看板一直没数据。
第二天痛定思痛,决定改成批量消费模型,怎么操作呢,请看下面!
2.5、将 kafka 的消费模式改成批量消费首先,创建一个KafkaConfiguration配置类,内容如下!
@Configuration public class KafkaConfiguration { @Value ( "${spring.kafka.bootstrap-servers}" ) private String bootstrapServers ; @Value ( "${spring.kafka.producer.retries}" ) private Integer retries ; @Value ( "${spring.kafka.producer.batch-size}" ) private Integer batchSize ; @Value ( "${spring.kafka.producer.buffer-memory}" ) private Integer bufferMemory ; @Value ( "${spring.kafka.consumer.group-id}" ) private String groupId ; @Value ( "${spring.kafka.consumer.auto-offset-reset}" ) private String autoOffsetReset ; @Value ( "${spring.kafka.consumer.max-poll-records}" ) private Integer maxPollRecords ; @Value ( "${spring.kafka.consumer.batch.concurrency}" ) private Integer batchConcurrency ; @Value ( "${spring.kafka.consumer.enable-auto-commit}" ) private Boolean autoCommit ; @Value ( "${spring.kafka.consumer.auto-commit-interval}" ) private Integer autoCommitInterval ; /** * 生产者配置信息 */ @Bean public Map < String , Object > producerConfigs ( ) { Map < String , Object > props = new HashMap <> ( ) ; props .put ( ProducerConfig .ACKS_CONFIG , "0" ) ; props .put ( ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers ) ; props .put ( ProducerConfig .RETRIES_CONFIG , retries ) ; props .put ( ProducerConfig .BATCH_SIZE_CONFIG , batchSize ) ; props .put ( ProducerConfig .LINGER_MS_CONFIG , 1 ) ; props .put ( ProducerConfig .BUFFER_MEMORY_CONFIG , bufferMemory ) ; props .put ( ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , StringSerializer .class ) ; props .put ( ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer .class ) ; return props ; } /** * 生产者工厂 */ @Bean public ProducerFactory < String , String > producerFactory ( ) { return new DefaultKafkaProducerFactory <> ( producerConfigs ( ) ) ; } /** * 生产者模板 */ @Bean public KafkaTemplate < String , String > kafkaTemplate ( ) { return new KafkaTemplate <> ( producerFactory ( ) ) ; } /** * 消费者配置信息 */ @Bean public Map < String , Object > consumerConfigs ( ) { Map < String , Object > props = new HashMap <> ( ) ; props .put ( ConsumerConfig .GROUP_ID_CONFIG , groupId ) ; props .put ( ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , autoOffsetReset ) ; props .put ( ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers ) ; props .put ( ConsumerConfig .MAX_POLL_RECORDS_CONFIG , maxPollRecords ) ; props .put ( ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG , autoCommit ) ; props .put ( ConsumerConfig .SESSION_TIMEOUT_MS_CONFIG , 30000 ) ; props .put ( ConsumerConfig .REQUEST_TIMEOUT_MS_CONFIG , 30000 ) ; props .put ( ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , StringDeserializer .class ) ; props .put ( ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer .class ) ; return props ; } /** * 消费者批量工厂 */ @Bean public KafkaListenerContainerFactory < ? > batchFactory ( ) { ConcurrentKafkaListenerContainerFactory < Integer , String > factory = new ConcurrentKafkaListenerContainerFactory <> ( ) ; factory .setConsumerFactory ( new DefaultKafkaConsumerFactory <> ( consumerConfigs ( ) ) ) ; // 设置并发量,小于或等于Topic的分区数 factory .setConcurrency ( batchConcurrency ) ; factory .getContainerProperties ( ) .setPollTimeout ( 1500 ) ; factory .getContainerProperties ( ) .setAckMode ( ContainerProperties .AckMode .MANUAL_IMMEDIATE ) ; // 设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig .MAX_POLL_RECORDS_CONFIG factory .setBatchListener ( true ) ; return factory ; } }
同时,新增一个spring.kafka.consumer.batch.concurrency变量,用来设置并发数,通过这个参数我们可以指定几个线程来实现消费。
在application.properties配置文件中,添加如下变量:
#批消费并发量,小于或等于Topic的分区数 spring .kafka .consumer .batch .concurrency = 3 #设置每次批量拉取的最大数量为4000 spring .kafka .consumer .max - poll - records = 4000 #设置自动提交改成false spring .kafka .consumer .enable - auto - commit = false
最后,将单个消费方法改成批量消费方法模式。
@Component public class BigDataTopicListener { private static final Logger log = LoggerFactory .getLogger ( BigDataTopicListener .class ) ; /** * 监听kafka数据(批量消费) * @param consumerRecords * @param ack */ @KafkaListener ( topics = { "big_data_topic" } , containerFactory = "batchFactory" ) public void batchConsumer ( List < ConsumerRecord < ? , ? >> consumerRecords , Acknowledgment ack ) { long start = System .currentTimeMillis ( ) ; // ... // db .batchSave ( consumerRecords ) ; // 批量插入或者批量更新数据 // 手动提交 ack .acknowledge ( ) ; log .info ( "收到bigData推送的数据,拉取数据量:{},消费时间:{}ms" , consumerRecords .size ( ) , ( System .currentTimeMillis ( ) - start ) ) ; } }
此时,消费性能大大的提升,数据处理的非常快,500万条数据,最多 30 分钟就全部消费完毕了。
本例中的消费微服务,生产环境部署了3台服务器,同时big_data_topic主题的分区数为3,因此并发数设置为3比较合适。
随着推送的数据量不断增加,如果你觉得消费速度还不够,你可以重新设置每次批量拉取的最大数量,活着横向扩展微服务的集群实例数量和 topic 的分区数,以此来加快数据的消费速度。
但是,如果在单台机器中,每次批量拉取的最大数量过大,大对象也会很大,会造成频繁的 gc 告警!
因此,在实际的使用过程中,每次批量拉取的最大数量并不是越大越好,根据当前服务器的硬件配置,调节到合适的阀值,才是最优的选择!
三、小结
本文主要以SpringBoot技术框架为背景,结合实际业务需求,采用 kafka 进行数据消费,实现数据量的高吞吐,在下篇文章中,我们会介绍消费失败的处理流程。
原文地址:https://mp.weixin.qq测试数据/s/5uo5bWiJ7R3nfOSwWZ65Tg
查看更多关于SpringBoot 整合 Kafka 实现数据高吞吐的详细内容...