好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Springboot+Netty+Websocket实现消息推送实例

前言

websocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 websocket api 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

netty框架的优势

 1. api使用简单,开发门槛低;
 2. 功能强大,预置了多种编解码功能,支持多种主流协议;
 3. 定制能力强,可以通过channelhandler对通信框架进行灵活地扩展;
 4. 性能高,通过与其他业界主流的nio框架对比,netty的综合性能最优;
 5. 成熟、稳定,netty修复了已经发现的所有jdk nio bug,业务开发人员不需要再为nio的bug而烦恼

提示:以下是本篇文章正文内容,下面案例可供参考

一、引入netty依赖

?

1

2

3

4

5

<dependency>

    <groupid>io.netty</groupid>

    <artifactid>netty-all</artifactid>

    <version> 4.1 . 48 . final </version>

</dependency>

二、使用步骤

1.引入基础配置类

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

package com.test.netty;

 

public enum cmd {

  start( "000" , "连接成功" ),

  wmessage( "001" , "消息提醒" ),

  ;

  private string cmd;

  private string desc;

 

  cmd(string cmd, string desc) {

   this .cmd = cmd;

   this .desc = desc;

  }

 

  public string getcmd() {

   return cmd;

  }

 

  public string getdesc() {

   return desc;

  }

}

2.netty服务启动监听器

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

package com.test.netty;

 

import io.netty.bootstrap.serverbootstrap;

import io.netty.channel.channelfuture;

import io.netty.channel.channeloption;

import io.netty.channel.eventloopgroup;

import io.netty.channel.nio.nioeventloopgroup;

import io.netty.channel.socket.nio.nioserversocketchannel;

import lombok.extern.slf4j.slf4j;

import org.springframework.beans.factory.annotation.autowired;

import org.springframework.beans.factory.annotation.value;

import org.springframework.boot.applicationrunner;

import org.springframework.context.annotation.bean;

import org.springframework.stereotype测试数据ponent;

 

/**

  * @author test

  * <p>

  * 服务启动监听器

  **/

@slf4j

@component

public class nettyserver {

 

  @value ( "${server.netty.port}" )

  private int port;

 

  @autowired

  private serverchannelinitializer serverchannelinitializer;

 

  @bean

  applicationrunner nettyrunner() {

   return args -> {

    //new 一个主线程组

    eventloopgroup bossgroup = new nioeventloopgroup( 1 );

    //new 一个工作线程组

    eventloopgroup workgroup = new nioeventloopgroup();

    serverbootstrap bootstrap = new serverbootstrap()

      .group(bossgroup, workgroup)

      .channel(nioserversocketchannel. class )

      .childhandler(serverchannelinitializer)

      //设置队列大小

      .option(channeloption.so_backlog, 1024 )

      // 两小时内没有数据的通信时,tcp会自动发送一个活动探测数据报文

      .childoption(channeloption.so_keepalive, true );

    //绑定端口,开始接收进来的连接

    try {

     channelfuture future = bootstrap.bind(port).sync();

     log.info( "服务器启动开始监听端口: {}" , port);

     future.channel().closefuture().sync();

    } catch (interruptedexception e) {

     e.printstacktrace();

    } finally {

     //关闭主线程组

     bossgroup.shutdowngracefully();

     //关闭工作线程组

     workgroup.shutdowngracefully();

    }

   };

  }

}

3.netty服务端处理器

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

package com.test.netty;

 

import com.test测试数据mon.util.jsonutil;

import io.netty.channel.channel;

import io.netty.channel.channelhandler;

import io.netty.channel.channelhandlercontext;

import io.netty.channel.simplechannelinboundhandler;

import io.netty.handler.codec.http.websocketx.textwebsocketframe;

import io.netty.handler.codec.http.websocketx.websocketserverprotocolhandler;

import lombok.data;

import lombok.extern.slf4j.slf4j;

import org.springframework.beans.factory.annotation.autowired;

import org.springframework.stereotype测试数据ponent;

 

import java.net.urldecoder;

import java.util.*;

 

/**

  * @author test

  * <p>

  * netty服务端处理器

  **/

@slf4j

@component

@channelhandler .sharable

public class nettyserverhandler extends simplechannelinboundhandler<textwebsocketframe> {

 

  @autowired

  private serverchannelcache cache;

  private static final string datakey = "test=" ;

 

  @data

  public static class channelcache {

  }

 

 

  /**

   * 客户端连接会触发

   */

  @override

  public void channelactive(channelhandlercontext ctx) throws exception {

   channel channel = ctx.channel();

   log.info( "通道连接已打开,id->{}......" , channel.id().aslongtext());

  }

 

  @override

  public void usereventtriggered(channelhandlercontext ctx, object evt) throws exception {

   if (evt instanceof websocketserverprotocolhandler.handshakecomplete) {

    channel channel = ctx.channel();

    websocketserverprotocolhandler.handshakecomplete handshakecomplete = (websocketserverprotocolhandler.handshakecomplete) evt;

    string requesturi = handshakecomplete.requesturi();

    requesturi = urldecoder.decode(requesturi, "utf-8" );

    log.info( "handshake_complete,id->{},uri->{}" , channel.id().aslongtext(), requesturi);

    string socketkey = requesturi.substring(requesturi.lastindexof(datakey) + datakey.length());

    if (socketkey.length() > 0 ) {

     cache.add(socketkey, channel);

     this .send(channel, cmd.down_start, null );

    } else {

     channel.disconnect();

     ctx.close();

    }

   }

   super .usereventtriggered(ctx, evt);

  }

 

  @override

  public void channelinactive(channelhandlercontext ctx) throws exception {

   channel channel = ctx.channel();

   log.info( "通道连接已断开,id->{},用户id->{}......" , channel.id().aslongtext(), cache.getcacheid(channel));

   cache.remove(channel);

  }

 

  /**

   * 发生异常触发

   */

  @override

  public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception {

   channel channel = ctx.channel();

   log.error( "连接出现异常,id->{},用户id->{},异常->{}......" , channel.id().aslongtext(), cache.getcacheid(channel), cause.getmessage(), cause);

   cache.remove(channel);

   ctx.close();

  }

 

  /**

   * 客户端发消息会触发

   */

  @override

  protected void channelread0(channelhandlercontext ctx, textwebsocketframe msg) throws exception {

   try {

    // log.info("接收到客户端发送的消息:{}", msg.text());

    ctx.channel().writeandflush( new textwebsocketframe(jsonutil.tostring(collections.singletonmap( "cmd" , "100" ))));

   } catch (exception e) {

    log.error( "消息处理异常:{}" , e.getmessage(), e);

   }

  }

 

  public void send(cmd cmd, string id, object obj) {

   hashmap<string, channel> channels = cache.get(id);

   if (channels == null ) {

    return ;

   }

   map<string, object> data = new linkedhashmap<>();

   data.put( "cmd" , cmd.getcmd());

   data.put( "data" , obj);

   string msg = jsonutil.tostring(data);

   log.info( "服务器下发消息: {}" , msg);

   channels.values().foreach(channel -> {

    channel.writeandflush( new textwebsocketframe(msg));

   });

  }

 

  public void send(channel channel, cmd cmd, object obj) {

   map<string, object> data = new linkedhashmap<>();

   data.put( "cmd" , cmd.getcmd());

   data.put( "data" , obj);

   string msg = jsonutil.tostring(data);

   log.info( "服务器下发消息: {}" , msg);

   channel.writeandflush( new textwebsocketframe(msg));

  }

 

}

4.netty服务端缓存类

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

package com.test.netty;

 

import io.netty.channel.channel;

import io.netty.util.attributekey;

import org.springframework.stereotype测试数据ponent;

 

import java.util.hashmap;

import java.util.concurrent.concurrenthashmap;

 

@component

public class serverchannelcache {

  private static final concurrenthashmap<string, hashmap<string, channel>> cache_map = new concurrenthashmap<>();

  private static final attributekey<string> channel_attr_key = attributekey.valueof( "test" );

 

  public string getcacheid(channel channel) {

   return channel.attr(channel_attr_key).get();

  }

 

  public void add(string cacheid, channel channel) {

   channel.attr(channel_attr_key).set(cacheid);

   hashmap<string, channel> hashmap = cache_map.get(cacheid);

   if (hashmap == null ) {

    hashmap = new hashmap<>();

   }

   hashmap.put(channel.id().asshorttext(), channel);

   cache_map.put(cacheid, hashmap);

  }

 

  public hashmap<string, channel> get(string cacheid) {

   if (cacheid == null ) {

    return null ;

   }

   return cache_map.get(cacheid);

  }

 

  public void remove(channel channel) {

   string cacheid = getcacheid(channel);

   if (cacheid == null ) {

    return ;

   }

   hashmap<string, channel> hashmap = cache_map.get(cacheid);

   if (hashmap == null ) {

    hashmap = new hashmap<>();

   }

   hashmap.remove(channel.id().asshorttext());

   cache_map.put(cacheid, hashmap);

  }

}

5.netty服务初始化器

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

package com.test.netty;

 

import io.netty.channel.channelinitializer;

import io.netty.channel.channelpipeline;

import io.netty.channel.socket.socketchannel;

import io.netty.handler.codec.http.httpobjectaggregator;

import io.netty.handler.codec.http.httpservercodec;

import io.netty.handler.codec.http.websocketx.websocketserverprotocolhandler;

import io.netty.handler.stream.chunkedwritehandler;

import org.springframework.beans.factory.annotation.autowired;

import org.springframework.stereotype测试数据ponent;

 

/**

  * @author test

  * <p>

  * netty服务初始化器

  **/

@component

public class serverchannelinitializer extends channelinitializer<socketchannel> {

 

  @autowired

  private nettyserverhandler nettyserverhandler;

 

  @override

  protected void initchannel(socketchannel socketchannel) throws exception {

   channelpipeline pipeline = socketchannel.pipeline();

   pipeline.addlast( new httpservercodec());

   pipeline.addlast( new chunkedwritehandler());

   pipeline.addlast( new httpobjectaggregator( 8192 ));

   pipeline.addlast( new websocketserverprotocolhandler( "/test.io" , true , 5000 ));

   pipeline.addlast(nettyserverhandler);

  }

}

6.html测试

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

<!doctype html>

<html>

  <head>

  <meta charset= "utf-8" >

  <title>test</title>

 

   <script type= "text/javascript" >

    function websockettest()

    {

    if ( "websocket" in window)

    {

     alert( "您的浏览器支持 websocket!" );

    

     // 打开一个 web socket

     var ws = new websocket( "ws://localhost:port/test.io" );

    

     ws.onopen = function()

     {

      // web socket 已连接上,使用 send() 方法发送数据

      ws.send( "发送数据" );

      alert( "数据发送中..." );

     };

    

     ws.onmessage = function (evt)

     {

      var received_msg = evt.data;

      alert( "数据已接收..." );

     };

    

     ws.onclose = function()

     {

      // 关闭 websocket

      alert( "连接已关闭..." );

     };

    }

   

    else

    {

     // 浏览器不支持 websocket

     alert( "您的浏览器不支持 websocket!" );

    }

    }

   </script>

  

  </head>

  <body>

 

   <div id= "sse" >

    <a href= "javascript:websockettest()" rel= "external nofollow" >运行 websocket</a>

   </div>

  

  </body>

</html>

7.vue测试

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

mounted() {

    this .initwebsocket();

   },

   methods: {

    initwebsocket() {

     let websocket = new websocket( 'ws://localhost:port/test.io?test=123456' );

     websocket.onmessage = (event) => {

      let msg = json.parse(event.data);

      switch (msg.cmd) {

       case "000" :

        this .$message({

         type: 'success' ,

         message: "建立实时连接成功!" ,

         duration: 1000

        })

        setinterval(()=>{websocket.send( "heartbeat" )}, 60 * 1000 );

        break ;

       case "001" :

        this .$message.warning( "收到一条新的信息,请及时查看!" )

        break ;

      }

     }

     websocket.onclose = () => {

      settimeout(()=>{

       this .initwebsocket();

      }, 30 * 1000 );

     }

     websocket.onerror = () => {

      settimeout(()=>{

       this .initwebsocket();

      }, 30 * 1000 );

     }

    },

   },

![在这里插入图片描述](https: //img-blog.csdnimg.cn/20210107160420568.jpg?x-oss-process=image/watermark,type_zmfuz3pozw5nagvpdgk,shadow_10,text_ahr0chm6ly9ibg9nlmnzzg4ubmv0l3d1x3fpbmdfc29uzw==,size_16,color_ffffff,t_70#pic_center)

8.服务器下发消息

?

1

2

3

@autowired

     private nettyserverhandler nettyserverhandler;

nettyserverhandler.send(cmdweb.wmessage, id, message);

到此这篇关于springboot+netty+websocket实现消息推送实例的文章就介绍到这了,更多相关springboot websocket消息推送内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

原文链接:https://blog.csdn.net/wu_qing_song/article/details/112311860

查看更多关于Springboot+Netty+Websocket实现消息推送实例的详细内容...

  阅读:25次