好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Java实现Kafka生产者和消费者的示例

Kafka简介

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。

方式一:kafka-clients

引入依赖

在pom.xml文件中,引入kafka-clients依赖:

?

1

2

3

4

5

< dependency >

   < groupId >org.apache.kafka</ groupId >

   < artifactId >kafka-clients</ artifactId >

   < version >2.3.1</ version >

</ dependency >

生产者

创建一个KafkaProducer的生产者实例:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

@Configuration

public class Config {

 

   public final static String bootstrapServers = "127.0.0.1:9092" ;

 

   @Bean (destroyMethod = "close" )

   public KafkaProducer<String, String> kafkaProducer() {

     Properties props = new Properties();

     //设置Kafka服务器地址

     props.put( "bootstrap.servers" , bootstrapServers);

     //设置数据key的序列化处理类

     props.put( "key.serializer" , StringSerializer. class .getName());

     //设置数据value的序列化处理类

     props.put( "value.serializer" , StringSerializer. class .getName());

     KafkaProducer<String, String> producer = new KafkaProducer<>(props);

     return producer;

   }

}

在Controller中进行使用:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

@RestController

@Slf4j

public class Controller {

 

   @Autowired

   private KafkaProducer<String, String> kafkaProducer;

 

   @RequestMapping ( "/kafkaClientsSend" )

   public String send() {

     String uuid = UUID.randomUUID().toString();

     RecordMetadata recordMetadata = null ;

     try {

      //将消息发送到Kafka服务器的名称为[one-more-topic]的Topic中

       recordMetadata = kafkaProducer.send( new ProducerRecord<>( "one-more-topic" , uuid)).get();

       log.info( "recordMetadata: {}" , recordMetadata);

       log.info( "uuid: {}" , uuid);

     } catch (Exception e) {

       log.error( "send fail, uuid: {}" , uuid, e);

     }

     return uuid;

   }

}

消费者

创建一个KafkaConsumer的消费者实例:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

@Configuration

public class Config {

 

   public final static String groupId = "kafka-clients-group" ;

   public final static String bootstrapServers = "127.0.0.1:9092" ;

 

   @Bean (destroyMethod = "close" )

   public KafkaConsumer<String, String> kafkaConsumer() {

     Properties props = new Properties();

     //设置Kafka服务器地址

     props.put( "bootstrap.servers" , bootstrapServers);

     //设置消费组

     props.put( "group.id" , groupId);

     //设置数据key的反序列化处理类

     props.put( "key.deserializer" , StringDeserializer. class .getName());

     //设置数据value的反序列化处理类

     props.put( "value.deserializer" , StringDeserializer. class .getName());

     props.put( "enable.auto测试数据mit" , "true" );

     props.put( "auto测试数据mit.interval.ms" , "1000" );

     props.put( "session.timeout.ms" , "30000" );

     KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

     //订阅名称为[one-more-topic]的Topic的消息

     kafkaConsumer.subscribe(Arrays.asList( "one-more-topic" ));

     return kafkaConsumer;

   }

}

在Controller中进行使用:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

@RestController

@Slf4j

public class Controller {

 

   @Autowired

   private KafkaConsumer<String, String> kafkaConsumer;

 

   @RequestMapping ( "/receive" )

   public List<String> receive() {

    从Kafka服务器中的名称为[one-more-topic]的Topic中消费消息

     ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds( 1 ));

     List<String> messages = new ArrayList<>(records.count());

     for (ConsumerRecord<String, String> record : records.records( "one-more-topic" )) {

       String message = record.value();

       log.info( "message: {}" , message);

       messages.add(message);

     }

     return messages;

   }

}

 

方式二:spring-kafka

使用kafka-clients需要我们自己创建生产者或者消费者的bean,如果我们的项目基于SpringBoot构建,那么使用spring-kafka就方便多了。

引入依赖

在pom.xml文件中,引入spring-kafka依赖:

?

1

2

3

4

5

< dependency >

   < groupId >org.springframework.kafka</ groupId >

   < artifactId >spring-kafka</ artifactId >

   < version >2.3.12.RELEASE</ version >

</ dependency >

生产者

在application.yml文件中增加配置:

?

1

2

3

4

5

6

7

spring:

  kafka:

   #Kafka服务器地址

   bootstrap-servers: 127.0.0.1:9092

   producer:

    #设置数据value的序列化处理类

    value-serializer: org.apache.kafka测试数据mon.serialization.StringSerializer

在Controller中注入KafkaTemplate就可以直接使用了,代码如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

@RestController

@Slf4j

public class Controller {

 

   @Autowired

   private KafkaTemplate<String, String> template;

 

   @RequestMapping ( "/springKafkaSend" )

   public String send() {

     String uuid = UUID.randomUUID().toString();

     //将消息发送到Kafka服务器的名称为[one-more-topic]的Topic中

     this .template.send( "one-more-topic" , uuid);

     log.info( "uuid: {}" , uuid);

     return uuid;

   }

}

消费者

在application.yml文件中增加配置:

?

1

2

3

4

5

6

7

spring:

  kafka:

   #Kafka服务器地址

   bootstrap-servers: 127.0.0.1:9092

   consumer:

    #设置数据value的反序列化处理类

    value-deserializer: org.apache.kafka测试数据mon.serialization.StringDeserializer

创建一个可以被Spring框架扫描到的类,并且在方法上加上@KafkaListener注解,就可以消费消息了,代码如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

@Component

@Slf4j

public class Receiver {

 

   @KafkaListener (topics = "one-more-topic" , groupId = "spring-kafka-group" )

   public void listen(ConsumerRecord<?, ?> record) {

     Optional<?> kafkaMessage = Optional.ofNullable(record.value());

     if (kafkaMessage.isPresent()) {

       String message = (String) kafkaMessage.get();

       log.info( "message: {}" , message);

     }

   }

}

到此这篇关于Java实现Kafka生产者和消费者的示例的文章就介绍到这了,更多相关Java Kafka生产者和消费者 内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

原文链接:https://blog.csdn.net/heihaozi/article/details/111042472

查看更多关于Java实现Kafka生产者和消费者的示例的详细内容...

  阅读:26次