好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

SpringBoot Webflux创建TCP/UDP server并使用handler解析数据

1.pom依赖

引用spring-boot-starter-webflux依赖

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build descriptor for the demo application.
  Inherits dependency management from spring-boot-starter-parent and declares
  the WebFlux starter that this tutorial relies on for its TCP/UDP servers.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- The starter this tutorial adds for the reactive TCP/UDP servers. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2.创建UDP/TCP Server

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

package com.example.demo;

import com.example.demo.handler.TcpDecoderHandler;

import com.example.demo.handler.UdpDecoderHandler;

import org.springframework.boot.CommandLineRunner;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.context.annotation.Bean;

import reactor.core.publisher.Flux;

import reactor.netty.tcp.TcpServer;

import reactor.netty.udp.UdpServer;

import java.time.Duration;

@SpringBootApplication

public class DemoApplication {

     public static void main(String[] args) {

         SpringApplication.run(DemoApplication. class , args);

     }

     @Bean

     CommandLineRunner serverRunner(UdpDecoderHandler udpDecoderHandler, TcpDecoderHandler tcpDecoderHandler) {

         return strings -> {

             createUdpServer(udpDecoderHandler);

             createTcpServer(tcpDecoderHandler);

         };

     }

     /**

      * 创建UDP Server

      * @param udpDecoderHanlder: 解析UDP Client上报数据handler

      */

     private void createUdpServer(UdpDecoderHandler udpDecoderHandler) {

         UdpServer.create()

                 .handle((in,out) -> {

                     in.receive()

                             .asByteArray()

                             .subscribe();

                     return Flux.never();

                 })

                 .port( 8888 )

                 .doOnBound(conn -> conn.addHandlerLast( "decoder" ,udpDecoderHandler)) //可以添加多个handler

                 .bindNow(Duration.ofSeconds( 30 ));

     }

     /**

      * 创建TCP Server

      * @param tcpDecoderHanlder: 解析TCP Client上报数据的handler

      */

     private void createTcpServer(TcpDecoderHandler tcpDecoderHandler) {

         TcpServer.create()

                 .handle((in,out) -> {

                     in.receive()

                             .asByteArray()

                             .subscribe();

                     return Flux.never();

                 })

                 .doOnConnection(conn ->

                         conn.addHandler(tcpDecoderHandler)) //实例只写了如何添加handler,可添加delimiter,tcp生命周期,decoder,encoder等handler

                 .port( 9999 )

                 .bindNow();

     }

}

3.数据解析handler(具体解析根据协议来)

解析UDP数据handler

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

package com.example.demo.handler;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.socket.DatagramPacket;

import io.netty.handler.codec.MessageToMessageDecoder;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.stereotype.Service;

import java.util.List;

@Service

public class UdpDecoderHandler extends MessageToMessageDecoder<DatagramPacket>  {

     private static final Logger LOGGER = LoggerFactory.getLogger(TcpDecoderHandler. class );

     @Override

     protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket byteBuf, List<Object> list) throws Exception {

         ByteBuf byteBuf1 = byteBuf.content();

         int size = byteBuf1.readableBytes();

         byte [] data = new byte [size];

         byteBuf1.readBytes(data);

         LOGGER.info( new String(data));

     }

}

解析TCP数据handler

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

package com.example.demo.handler;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.MessageToMessageDecoder;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.stereotype.Service;

import java.util.List;

@Service

public class TcpDecoderHandler extends MessageToMessageDecoder  {

     private static final Logger LOGGER = LoggerFactory.getLogger(TcpDecoderHandler. class );

     @Override

     protected void decode(ChannelHandlerContext channelHandlerContext, Object o, List list){

         LOGGER.info( "解析client上报数据" );

     }

}

4.测试工具

推荐使用SocketTool调试TCP/UDP协议

以上为个人经验,希望能给大家一个参考,也希望大家多多支持。

原文链接:https://blog.csdn.net/Peng__Chao/article/details/89478229

查看更多关于SpringBoot Webflux创建TCP/UDP server并使用handler解析数据的详细内容...

  阅读:23次