好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Springboot整合Kafka Stream实时统计数据

环境:springboot2.3.12.RELEASE + kafka_2.13-2.7.0 + zookeeper-3.6.2

Kafka Stream介绍

Kafka在0.10版本推出了Stream API,提供了对存储在Kafka内的数据进行流式处理和分析的能力。

流式计算一般被用来和批量计算做比较。批量计算往往有一个固定的数据集作为输入并计算结果。而流式计算的输入往往是[无界]的(Unbounded Data),持续输入的,即永远拿不到全量数据去做计算;同时,计算结果也是持续输出的,只能拿到某一个时刻的结果,而不是最终的结果。

Kafka Streams是一个客户端类库,用于处理和分析存储在Kafka中的数据。它建立在流式处理的一些重要的概念之上:如何区分事件时间和处理时间、Windowing的支持、简单高效的管理和实时查询应用程序状态。

Kafka Streams的门槛非常低:和编写一个普通的Kafka消息处理程序没有太大的差异,可以通过多进程部署来完成扩容、负载均衡、高可用(Kafka Consumer的并行模型)。

Kafka Streams的一些特点:

被设计成一个简单的、轻量级的客户端类库,能够被集成到任何Java应用中 除了Kafka之外没有任何额外的依赖,利用Kafka的分区模型支持水平扩容和保证顺序性 通过可容错的状态存储实现高效的状态操作(windowed joins and aggregations) 支持exactly-once语义 支持纪录级的处理,实现毫秒级的延迟 提供High-Level的Stream DSL和Low-Level的Processor API

Stream Processing Topology流处理拓扑

流是Kafka Streams提供的最重要的抽象:它表示一个无限的、不断更新的数据集。流是不可变数据记录的有序、可重放和容错序列,其中数据记录定义为键值对。 Stream Processing Application是使用了Kafka Streams库的应用程序。它通过processor topologies定义计算逻辑,其中每个processor topology都是多个stream processor(节点)通过stream组成的图。 A stream processor 是处理器拓扑中的节点;它表示一个处理步骤,通过每次从拓扑中的上游处理器接收一个输入记录,将其操作应用于该记录,来转换流中的数据,并且随后可以向其下游处理器生成一个或多个输出记录。

有两种特殊的processor:

Source Processor 源处理器是一种特殊类型的流处理器,它没有任何上游处理器。它通过使用来自一个或多个kafka topic的记录并将其转发到其下游处理器,从而从一个或多个kafka topic生成其拓扑的输入流。

Sink Processor 接收器处理器是一种特殊类型的流处理器,没有下游处理器。它将从其上游处理器接收到的任何记录发送到指定的kafka topic。

相关的核心概念查看如下链接

下面演示Kafka Stream 在Springboot中的应用

依赖

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-web</artifactId>    </dependency>  <dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId>  </dependency>  <dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-streams</artifactId>  </dependency> 

配置

server:    port: 9090  spring:    application:       name : kafka-demo    kafka:      streams:        application-id: ${spring.application. name }        properties:          spring.json.trusted.packages:  '*'       bootstrap-servers:      - localhost:9092      - localhost:9093      - localhost:9094      producer:        acks: 1        retries: 10         key -serializer: org.apache.kafka测试数据mon.serialization.StringSerializer        value-serializer: org.springframework.kafka.support.serializer.JsonSerializer #org.apache.kafka测试数据mon.serialization.StringSerializer        properties:          spring.json.trusted.packages:  '*'       consumer:         key -deserializer: org.apache.kafka测试数据mon.serialization.StringDeserializer        value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer #org.apache.kafka测试数据mon.serialization.StringDeserializer        enable-auto- commit :  false          group -id: ConsumerTest        auto-offset-reset: latest        properties:          session.timeout.ms: 12000          heartbeat.interval.ms: 3000           max .poll.records: 100          spring.json.trusted.packages:  '*'       listener:        ack-mode: manual-immediate        type: batch        concurrency: 8      properties:         max .poll.interval.ms: 300000 

消息发送

@Service  public  class MessageSend {    @Resource    private KafkaTemplate<String, Message> kafkaTemplate ;     public  void sendMessage2(Message message) {      kafkaTemplate.send(new ProducerRecord<String, Message>( "test" , message)).addCallback(result -> {        System. out .println( "执行成功..."  + Thread.currentThread().getName()) ;      }, ex -> {        System. out .println( "执行失败" ) ;        ex.printStackTrace() ;      }) ;    }  } 

消息监听

@KafkaListener(topics = { "test" })  public  void listener2(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) {     for  (ConsumerRecord<String, Message> record : records) {      System. out .println(this.getClass().hashCode() +  ", Thread"  + Thread.currentThread().getName() +  ", key: "  + record. key () +  ", 接收到消息:"  + record.value() +  ", patition: "  + record.partition() +  ", offset: "  + record.offset()) ;    }    try {      TimeUnit.SECONDS.sleep(0) ;    } catch (InterruptedException e) {      e.printStackTrace();    }    ack.acknowledge() ;  }        @KafkaListener(topics = { "demo" })  public  void listenerDemo(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) {     for  (ConsumerRecord<String, Message> record : records) {      System. out .println( "Demo Topic: "  + this.getClass().hashCode() +  ", Thread"  + Thread.currentThread().getName() +  ", key: "  + record. key () +  ", 接收到消息:"  + record.value() +  ", patition: "  + record.partition() +  ", offset: "  + record.offset()) ;    }    ack.acknowledge() ;  } 

Kafka Stream处理

消息转换并转发其它Topic

@Bean  public  KStream<Object, Object> kStream(StreamsBuilder streamsBuilder) {    KStream<Object, Object> stream = streamsBuilder.stream( "test" );    stream.map(( key , value) -> {      System. out .println( "原始消息内容:"  + new String((byte[]) value, Charset.forName( "UTF-8" ))) ;       return  new KeyValue<>( key ,  "{\"title\": \"123123\", \"message\": \"重新定义内容\"}" .getBytes(Charset.forName( "UTF-8" ))) ;    }). to ( "demo" ) ;     return  stream;  } 

执行结果:

Stream对象处理

@Bean  public  KStream<String, Message> kStream4(StreamsBuilder streamsBuilder) {    JsonSerde<Message> jsonSerde = new JsonSerde<>() ;    JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ;    descri.addTrustedPackages( "*" ) ;    KStream<String, Message> stream = streamsBuilder.stream( "test" , Consumed. with (Serdes.String(), jsonSerde));    stream.map(( key , value) -> {      value.setTitle( "XXXXXXX" ) ;       return  new KeyValue<>( key , value) ;    }). to ( "demo" , Produced. with (Serdes.String(), jsonSerde)) ;     return  stream;  } 

执行结果:

分组处理

@Bean  public  KStream<String, Message> kStream5(StreamsBuilder streamsBuilder) {    JsonSerde<Message> jsonSerde = new JsonSerde<>() ;    JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ;    descri.addTrustedPackages( "*" ) ;    KStream<String, Message> stream = streamsBuilder.stream( "test" , Consumed. with (Serdes.String(), jsonSerde));    stream.selectKey(new KeyValueMapper<String, Message, String>() {      @Override       public  String apply(String  key , Message value) {         return  value.getOrgCode() ;      }    })    .groupByKey(Grouped. with (Serdes.String(), jsonSerde))    . count ()    .toStream().print(Printed.toSysOut());     return  stream;  } 

执行结果:

聚合

@Bean  public  KStream<String, Message> kStream6(StreamsBuilder streamsBuilder) {    JsonSerde<Message> jsonSerde = new JsonSerde<>() ;    JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ;    descri.addTrustedPackages( "*" ) ;    KStream<String, Message> stream = streamsBuilder.stream( "test" , Consumed. with (Serdes.String(), jsonSerde));    stream.selectKey(new KeyValueMapper<String, Message, String>() {      @Override       public  String apply(String  key , Message value) {         return  value.getOrgCode() ;      }    })    .groupByKey(Grouped. with (Serdes.String(), jsonSerde))    .aggregate(() -> 0L, ( key , value ,aggValue) -> {      System. out .println( "key = "  +  key  +  ", value = "  + value +  ", agg = "  + aggValue) ;       return  aggValue + 1 ;    }, Materialized.<String, Long, KeyValueStore<Bytes,byte[]>> as ( "kvs" ).withValueSerde(Serdes.Long()))    .toStream().print(Printed.toSysOut());     return  stream;  } 

执行结果:

Filter过滤数据

@Bean  public  KStream<String, Message> kStream7(StreamsBuilder streamsBuilder) {    JsonSerde<Message> jsonSerde = new JsonSerde<>() ;    JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ;    descri.addTrustedPackages( "*" ) ;    KStream<String, Message> stream = streamsBuilder.stream( "test" , Consumed. with (Serdes.String(), jsonSerde));    stream.selectKey(new KeyValueMapper<String, Message, String>() {      @Override       public  String apply(String  key , Message value) {         return  value.getOrgCode() ;      }    })    .groupByKey(Grouped. with (Serdes.String(), jsonSerde))    .aggregate(() -> 0L, ( key , value ,aggValue) -> {      System. out .println( "key = "  +  key  +  ", value = "  + value +  ", agg = "  + aggValue) ;       return  aggValue + 1 ;    }, Materialized.<String, Long, KeyValueStore<Bytes,byte[]>> as ( "kvs" ).withValueSerde(Serdes.Long()))    .toStream()    .filter(( key , value) -> ! "2" .equals( key ))    .print(Printed.toSysOut());     return  stream;  } 

执行结果:

过滤Key不等于"2"

分支多流处理

@Bean  public  KStream<String, Message> kStream8(StreamsBuilder streamsBuilder) {    JsonSerde<Message> jsonSerde = new JsonSerde<>() ;    JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ;    descri.addTrustedPackages( "*" ) ;    KStream<String, Message> stream = streamsBuilder.stream( "test" , Consumed. with (Serdes.String(), jsonSerde));    // 分支,多流处理    KStream<String, Message>[] arrStream = stream.branch(      ( key , value) ->  "男" .equals(value.getSex()),       ( key , value) ->  "女" .equals(value.getSex()));    Stream. of (arrStream).forEach( as  -> {       as .foreach(( key , message) -> {        System. out .println(Thread.currentThread().getName() +  ", key = "  +  key  +  ", message = "  + message) ;      });    });     return  stream;  } 

执行结果:

多字段分组

不能使用多个selectKey,后面的会覆盖前面的

@Bean  public  KStream<String, Message> kStreamM2(StreamsBuilder streamsBuilder) {    JsonSerde<Message> jsonSerde = new JsonSerde<>() ;    JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ;    descri.addTrustedPackages( "*" ) ;    KStream<String, Message> stream = streamsBuilder.stream( "test" , Consumed. with (Serdes.String(), jsonSerde));    stream    .selectKey(new KeyValueMapper<String, Message, String>() {      @Override       public  String apply(String  key , Message value) {        System. out .println(Thread.currentThread().getName()) ;         return  value.getTime() +  " | "  + value.getOrgCode() ;      }    })    .groupByKey(Grouped. with (Serdes.String(), jsonSerde))    . count ()    .toStream().print(Printed.toSysOut());     return  stream;  } 

执行结果:

原文链接:https://HdhCmsTesttoutiao测试数据/i6994698489594790431/

查看更多关于Springboot整合Kafka Stream实时统计数据的详细内容...

  阅读:41次