好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

SpringAOP+RabbitMQ+WebSocket实战详解

背景

最近公司的客户要求,分配给员工的任务除了有微信通知外,还希望pc端的网页也能实时收到通知。管理员分配任务是在我们的系统a,而员工接受任务是在系统b。两个系统都是现在已投入使用的系统。

技术选型

根据需求我们最终选用springaop+rabbitmq+websocket。

springaop可以让我们不修改原有代码,直接将原有service作为切点,加入切面。rabbitmq可以让a系统和b系统解耦。websocket则可以达到实时通知的要求。

springaop

aop称为面向切面编程,在程序开发中主要用来解决一些系统层面上的问题,比如日志,事务,权限等待。是spring的核心模块,底层是通过动态代理来实现(动态代理将在之后的文章重点介绍)。

基本概念

aspect(切面):通常是一个类,里面可以定义切入点和通知。

jointpoint(连接点):程序执行过程中明确的点,一般是方法的调用。

advice(通知):aop在特定的切入点上执行的增强处理,有before,after,afterreturning,afterthrowing,around。

pointcut(切入点):就是带有通知的连接点,在程序中主要体现为书写切入点表达式。

通知类型

before:在目标方法被调用之前做增强处理。

@before只需要指定切入点表达式即可

afterreturning:在目标方法正常完成后做增强。

@afterreturning除了指定切入点表达式后,还可以指定一个返回值形参名returning,代表目标方法的返回值

afterthrowing:主要用来处理程序中未处理的异常。

@afterthrowing除了指定切入点表达式后,还可以指定一个throwing的返回值形参名,可以通过该形参名

来访问目标方法中所抛出的异常对象

after:在目标方法完成之后做增强,无论目标方法时候成功完成。

@after可以指定一个切入点表达式

around:环绕通知,在目标方法完成前后做增强处理,环绕通知是最重要的通知类型,像事务,日志等都是环绕通知,注意编程中核心是一个proceedingjoinpoint。

rabbitmq

从图中我们可以看到rabbitmq主要的结构有:routing、binding、exchange、queue。

queue

queue(队列)rabbitmq的作用是存储消息,队列的特性是先进先出。

exchange

生产者产生的消息并不是直接发送给消息队列queue的,而是要经过exchange(交换器),由exchange再将消息路由到一个或多个queue,还会将不符合路由规则的消息丢弃。

routing

用于标记或生产者寻找exchange。

binding

用于exchange和queue做关联。

exchange type fanout

fanout类型的exchange路由规则非常简单,它会把所有发送到该exchange的消息路由到所有与它绑定的queue中。

direct

direct会把消息路由到那些binding key与routing key完全匹配的queue中。

topic

direct规则是严格意义上的匹配,换言之routing key必须与binding key相匹配的时候才将消息传送给queue,那么topic这个规则就是模糊匹配,可以通过通配符满足一部分规则就可以传送。

headers

headers类型的exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。

websocket

了解websocket必须先知道几个常用的web通信技术及其区别。

短轮询

短轮询的基本思路就是浏览器每隔一段时间向浏览器发送http请求,服务器端在收到请求后,不论是否有数据更新,都直接进行响应。这种方式实现的即时通信,本质上还是浏览器发送请求,服务器接受请求的一个过程,通过让客户端不断的进行请求,使得客户端能够模拟实时地收到服务器端的数据的变化。

这种方式的优点是比较简单,易于理解,实现起来也没有什么技术难点。缺点是显而易见的,这种方式由于需要不断的建立http连接,严重浪费了服务器端和客户端的资源。尤其是在客户端,距离来说,如果有数量级想对比较大的人同时位于基于短轮询的应用中,那么每一个用户的客户端都会疯狂的向服务器端发送http请求,而且不会间断。人数越多,服务器端压力越大,这是很不合理的。

因此短轮询不适用于那些同时在线用户数量比较大,并且很注重性能的web应用。

长轮询/ comet

comet指的是,当服务器收到客户端发来的请求后,不会直接进行响应,而是先将这个请求挂起,然后判断服务器端数据是否有更新。如果有更新,则进行响应,如果一直没有数据,则到达一定的时间限制(服务器端设置)后关闭连接。

长轮询和短轮询比起来,明显减少了很多不必要的http请求次数,相比之下节约了资源。长轮询的缺点在于,连接挂起也会导致资源的浪费。

sse

sse是html5新增的功能,全称为server-sent events。它可以允许服务推送数据到客户端。sse在本质上就与之前的长轮询、短轮询不同,虽然都是基于http协议的,但是轮询需要客户端先发送请求。而sse最大的特点就是不需要客户端发送请求,可以实现只要服务器端数据有更新,就可以马上发送到客户端。

sse的优势很明显,它不需要建立或保持大量的客户端发往服务器端的请求,节约了很多资源,提升应用性能。并且sse的实现非常简单,不需要依赖其他插件。

websocket

websocket是html5定义的一个新协议,与传统的http协议不同,该协议可以实现服务器与客户端之间全双工通信。简单来说,首先需要在客户端和服务器端建立起一个连接,这部分需要http。连接一旦建立,客户端和服务器端就处于平等的地位,可以相互发送数据,不存在请求和响应的区别。

websocket的优点是实现了双向通信,缺点是服务器端的逻辑非常复杂。现在针对不同的后台语言有不同的插件可以使用。

四种web即时通信技术比较

从兼容性角度考虑,短轮询>长轮询>长连接sse>websocket;

从性能方面考虑,websocket>长连接sse>长轮询>短轮询。

实战

项目使用springboot搭建。rabbitmq的安装这里不讲述。

rabbitmq配置

两个系统a、b都需要操作rabbitmq,其中a生产消息,b消费消息。故都需要配置。

1、首先引入rabbitmq的dependency:

?

1

2

3

4

<dependency>

   <groupid>org.springframework.boot</groupid>

   <artifactid>spring-boot-starter-amqp</artifactid>

</dependency>

这个dependency中包含了rabbitmq相关dependency。

2、在项目的配置文件里配置为使用rabbitmq及其参数。

application-pro.yml

?

1

2

3

4

5

6

7

8

#消息队列

message.queue.type: rabbitmq

## rabbit mq properties

rabbitmq:

  host: localhost

  port: 5672

  username: guest

  password: guest

application.properties

?

1

2

#将要使用的队列名

rabbitmq.websocket.msg.queue=websocket_msg_queue

3、创建配置文件。队列的创建交给spring。

rabbitmqconfig.java

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

@configuration

@enablerabbit

public class rabbitmqconfig {

 

   @value ( "${rabbitmq.host}" )

   private string host;

   @value ( "${rabbitmq.port}" )

   private string port;

   @value ( "${rabbitmq.username}" )

   private string username;

   @value ( "${rabbitmq.password}" )

   private string password;

   @value ( "${rabbitmq.websocket.msg.queue}" )

   private string websocketmsgqueue;

 

   @bean

   public connectionfactory connectionfactory() throws ioexception {

     cachingconnectionfactory factory = new cachingconnectionfactory();

     factory.setusername(username);

     factory.setpassword(password);

//    factory.setvirtualhost("test");

     factory.sethost(host);

     factory.setport(integer.valueof(port));

     factory.setpublisherconfirms( true );

 

     //设置队列参数,是否持久化、队列ttl、队列消息ttl等

     factory.createconnection().createchannel( false ).queuedeclare(websocketmsgqueue, true , false , false , null );

     return factory;

   }

 

   @bean

   public messageconverter messageconverter() {

     return new jackson2jsonmessageconverter();

   }

 

   @bean

   @scope (configurablebeanfactory.scope_prototype)

   // 必须是prototype类型

   public rabbittemplate rabbittemplate() throws ioexception {

     return new rabbittemplate(connectionfactory());

   }

 

   @bean

   public simplerabbitlistenercontainerfactory rabbitlistenercontainerfactory() throws ioexception {

     simplerabbitlistenercontainerfactory factory = new simplerabbitlistenercontainerfactory();

     factory.setconnectionfactory(connectionfactory());

     factory.setconcurrentconsumers( 3 );

     factory.setmaxconcurrentconsumers( 10 );

     factory.setacknowledgemode(acknowledgemode.manual);

     return factory;

   }

}

4、系统b中创建队列监听,当队列有消息时,发送websocket通知。

rabbitmqlistener.java

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

@component

public class rabbitmqlistener {

 

   @autowired

   private rabbitmqservice mqservice;

 

   /**

    * websocket推送监听器

    * @param socketentity

    * @param deliverytag

    * @param channel

    */

   @rabbitlistener (queues = "websocket_msg_queue" )

   public void websocketmsglistener( @payload websocketmsgentity socketmsgentity, @header (amqpheaders.delivery_tag) long deliverytag, channel channel) throws ioexception {

     mqservice.handlewebsocketmsg(socketmsgentity, deliverytag, channel);

   }

 

}

rabbitmqservice.java

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

public class rabbitmqservice {

   @autowired

   private messagewebsockethandler messagewebsockethandler;

 

   /**

    * @param socketmsgentity

    * @param deliverytag

    * @param channel

    * @throws ioexception

    */

   void handlewebsocketmsg(websocketmsgentity socketmsgentity, long deliverytag, channel channel) throws ioexception {

     try {

       messagewebsockethandler.sendmessagetousers(socketmsgentity.tojsonstring(), socketmsgentity.gettouserids());

       channel.basicack(deliverytag, false );

     } catch (exception e) {

       channel.basicnack(deliverytag, false , false );

     }

   }

}

websocketmsgentity为mq中传送的实体。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

public class websocketmsgentity implements serializable {

   public enum ordertype{

     repair( "维修" ),

     maintain( "保养" ),

     measure( "计量" );

 

     ordertype(string value){

       this .value = value;

     }

     string value;

 

     public string getvalue() {

       return value;

     }

   }

   //设备名称

   private string equname;

   //设备编号

   private string equid;

   //工单类型

   private ordertype ordertype;

   //工单单号

   private string orderid;

   //工单状态

   private string orderstatus;

   //创建时间

   private date createtime;

   //消息接收人id

   private list<string> touserids;

 

   public string getequname() {

     return equname;

   }

 

   public void setequname(string equname) {

     equname = equname;

   }

 

   public string getorderid() {

     return orderid;

   }

 

   public void setorderid(string orderid) {

     this .orderid = orderid;

   }

 

   public string getequid() {

     return equid;

   }

 

   public void setequid(string equid) {

     equid = equid;

   }

 

   public string getorderstatus() {

     return orderstatus;

   }

 

   public void setorderstatus(string orderstatus) {

     this .orderstatus = orderstatus;

   }

 

 

   public ordertype getordertype() {

     return ordertype;

   }

 

   public void setordertype(ordertype ordertype) {

     this .ordertype = ordertype;

   }

 

   public date getcreatetime() {

     return createtime;

   }

 

   public void setcreatetime(date createtime) {

     this .createtime = createtime;

   }

 

   public list<string> gettouserids() {

     return touserids;

   }

 

   public void settouserids(list<string> touserids) {

     this .touserids = touserids;

   }

 

   public string tojsonstring(){

     return json.tojsonstring( this );

   }

}

springaop

1、系统a中创建一个切面类datainterceptor.java

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

@aspect

@component

public class datainterceptor {

   @autowired

   private messagequeueservice queueservice;

 

 

   //维修工单切点

   @pointcut ( "execution(* com.zhishang.hes.common.service.impl.repairserviceimpl.executeflow(..))" )

   private void repairmsg() {

   }

 

   /**

    * 返回通知,方法执行正常返回时触发

    *

    * @param joinpoint

    * @param result

    */

   @afterreturning (value = "repairmsg()" , returning = "result" )

   public void afterreturning(joinpoint joinpoint, object result) {

     //此处可以获得切点方法名

     //string methodname = joinpoint.getsignature().getname();

     equipmentrepair equipmentrepair = (equipmentrepair) result;

     websocketmsgentity websocketmsgentity = this .generaterepairmsgentity(equipmentrepair);

     if (websocketmsgentity == null ) {

       return ;

     }

     queueservice.send(websocketmsgentity);

   }

 

   /**

    * 生成发送到mq的维修消息

    *

    * @param equipmentrepair

    * @return

    */

   private websocketmsgentity generaterepairmsgentity(equipmentrepair equipmentrepair) {

     websocketmsgentity websocketmsgentity = generaterepairmsgfromtasks(equipmentrepair);

     return websocketmsgentity;

   }

 

   /**

    * 从任务中生成消息

    *

    * @param equipmentrepair

    * @return

    */

   private websocketmsgentity generaterepairmsgfromtasks(equipmentrepair equipmentrepair) {

     //业务代码略

   }

 

}

2、发送消息到mq。这里只贴了发送的核心代码

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

public class rabbitmessagequeue extends abstractmessagequeue {

 

   @value ( "${rabbitmq.websocket.msg.queue}" )

   private string websocketmsgqueue;

 

   @autowired

   private rabbittemplate rabbittemplate;

 

   @override

   public void send(websocketmsgentity entity) {

     //没有指定exchange,则使用默认名为[]的exchange,binding名与queue名相同

     rabbittemplate.convertandsend(websocketmsgqueue, entity);

   }

}

websocket

1、 系统b中引入websocket服务端dependency

?

1

2

3

4

5

<dependency>

   <groupid>org.springframework</groupid>

   <artifactid>spring-websocket</artifactid>

   <version> 4.3 . 10 .release</version>

</dependency>

2、 配置websocket,添加处理类

websocketconfigurer.java

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

@configuration

@enablewebsocket

public class websocketconfig extends webmvcconfigureradapter implements websocketconfigurer {

 

   private static logger logger = loggerfactory.getlogger(websocketconfig. class );

 

   @override

   public void registerwebsockethandlers(websockethandlerregistry registry) {

     //配置websocket路径

     registry.addhandler(messagewebsockethandler(), "/msg-websocket" ).addinterceptors( new myhandshakeinterceptor()).setallowedorigins( "*" );

     //配置websocket路径 支持前端使用socketjs

     registry.addhandler(messagewebsockethandler(), "/sockjs/msg-websocket" ).setallowedorigins( "*" ).addinterceptors( new myhandshakeinterceptor()).withsockjs();

   }

 

   @bean

   public messagewebsockethandler messagewebsockethandler() {

     logger.info( "......创建messagewebsockethandler......" );

     return new messagewebsockethandler();

   }

 

}

messagewebsockethandler.java 主要用于websocket连接及消息发送处理。配置中还使用了连接握手时的处理,主要是取用户登陆信息,这里不多讲述。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

public class messagewebsockethandler extends textwebsockethandler {

   private static logger logger = loggerfactory.getlogger(systemwebsockethandler. class );

   private static concurrenthashmap<string, copyonwritearrayset<websocketsession>> users = new concurrenthashmap<>();

 

   @override

   public void afterconnectionestablished(websocketsession session) throws exception {

     string userid = session.getattributes().get( "websocket_userid" ).tostring();

     logger.info( "......afterconnectionestablished......" );

     logger.info( "session.getid:" + session.getid());

     logger.info( "session.getlocaladdress:" + session.getlocaladdress().tostring());

     logger.info( "userid:" + userid);

     //websocket连接后记录连接信息

     if (users.keyset().contains(userid)) {

       copyonwritearrayset<websocketsession> websocketsessions = users.get(userid);

       websocketsessions.add(session);

     } else {

       copyonwritearrayset<websocketsession> websocketsessions = new copyonwritearrayset<>();

       websocketsessions.add(session);

       users.put(userid, websocketsessions);

     }

   }

 

   @override

   public void handletransporterror(websocketsession session, throwable throwable) throws exception {

     removeusersession(session);

     if (session.isopen()) {

       session.close();

     }

     logger.info( "异常出现handletransporterror" + throwable.getmessage());

   }

 

   @override

   public void afterconnectionclosed(websocketsession session, closestatus closestatus) throws exception {

     removeusersession(session);

     logger.info( "关闭afterconnectionclosed" + closestatus.getreason());

   }

 

   @override

   public boolean supportspartialmessages() {

     return false ;

   }

 

   /**

    * 给符合要求的在线用户发送消息

    *

    * @param message

    */

   public void sendmessagetousers(string message, list<string> userids) throws ioexception{

     if (stringutils.isempty(message) || collectionutils.isempty(userids)) {

       return ;

     }

     if (users.isempty()) {

       return ;

     }

     for (string userid : userids) {

       if (!users.keyset().contains(userid)) {

         continue ;

       }

       copyonwritearrayset<websocketsession> websocketsessions = users.get(userid);

       if (websocketsessions == null ) {

         continue ;

       }

       for (websocketsession websocketsession : websocketsessions) {

         if (websocketsession.isopen()) {

           try {

             websocketsession.sendmessage( new textmessage(message));

           } catch (ioexception e) {

             logger.error( " websocket server send message error " + e.getmessage());

             try {

               throw e;

             } catch (ioexception e1) {

               e1.printstacktrace();

             }

           }

         }

       }

     }

   }

 

   /**

    * websocket清除连接信息

    *

    * @param session

    */

   private void removeusersession(websocketsession session) {

     string userid = session.getattributes().get( "websocket_userid" ).tostring();

     if (users.keyset().contains(userid)) {

       copyonwritearrayset<websocketsession> websocketsessions = users.get(userid);

       websocketsessions.remove(session);

       if (websocketsessions.isempty()) {

         users.remove(userid);

       }

     }

   }

}

整个功能完成后,a系统分配任务时,系统b登陆用户收到的消息如图:

总体流程:

1、对于系统b,每个登陆的用户都会和服务器建立websocket长连接。

2、系统a生成任务,aop做出响应,将封装的消息发送给mq。

3、系统b中的mq监听发现队列有消息到达,消费消息。

4、系统b通过websocket长连接将消息发给指定的登陆用户。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

原文链接:http://www.cnblogs.com/little-sheep/p/9934887.html

查看更多关于SpringAOP+RabbitMQ+WebSocket实战详解的详细内容...

  阅读:12次