好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

spring 使用RabbitMQ进行消息传递的示例代码

前言

本系列demo均以 spring boot快速构建,基本包使用到lombok(一个便捷的对象构造工具 get/set)、spring-boot-starter-web,使用springboot仅为了快速构建sample项目,对于学习spring的对应功能无影响。

我们希望你已经有一定的java基础与了解一个自己喜欢的idea功能,谢谢。

github

地址: https://github测试数据/unclecatmyself/spring-tutorial

学习

完成设置发布和订阅消息的rabbitmq amqp服务器的过程。

构建

构建一个使用spring amqp发布消息的应用程序,rabbittemplate并使用pojo订阅消息messagelisteneradapter。

创建rabbit mq消息接收器

使用任何基于 消息传递 的应用程序,您需要创建一个响应已发布消息的接收器。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

@slf4j

@component

public class receiver {

 

  private countdownlatch latch = new countdownlatch( 1 );

 

  public void receivemessage(string message){

   log.info( "received < " + message + " >" );

   latch.countdown();

  }

 

  public countdownlatch getlatch(){

   return latch;

  }

 

}

receiver是一个简单的pojo,它定义了一种接收消息的方法。当您注册它以接收消息时,您可以将其命名为任何您想要的名称。

为方便起见,这个pojo也有一个countdownlatch。这允许它发信号通知接收到消息。这是您不太可能在生产应用程序中实现的。

注册监听器并发送消息

spring amqp rabbittemplate 提供了使用rabbitmq发送和接收消息所需的一切。具体来说,你需要配置:

消息侦听器容器 声明队列,交换以及它们之间的绑定 用于发送一些消息以测试侦听器的组件

spring boot会自动创建连接工厂和rabbittemplate,从而减少您必须编写的代码量。

您将使用rabbittemplate发送消息,并将receiver使用消息侦听器容器注册,以接收消息。连接工厂驱动两者,允许它们连接到rabbitmq服务器。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

@springbootapplication

public class rabbitmqapplication {

 

  static final string topicexchangename = "spring-boot-exchange" ;

 

  static final string queuename = "spring-boot" ;

 

  @bean

  queue queue(){

  return new queue(queuename, false );

  }

 

  @bean

  topicexchange exchange(){

  return new topicexchange(topicexchangename);

  }

 

  @bean

  binding binding(queue queue,topicexchange exchange){

  return bindingbuilder.bind(queue).to(exchange).with( "foo.bar.#" );

  }

 

  @bean

  simplemessagelistenercontainer container(connectionfactory connectionfactory,

    messagelisteneradapter listeneradapter){

  simplemessagelistenercontainer container = new simplemessagelistenercontainer();

  container.setconnectionfactory(connectionfactory);

  container.setqueuenames(queuename);

  container.setmessagelistener(listeneradapter);

  return container;

  }

 

  @bean

  messagelisteneradapter listeneradapter(receiver receiver){

  return new messagelisteneradapter(receiver, "receivemessage" );

  }

 

  public static void main(string[] args) {

  springapplication.run(rabbitmqapplication. class , args).close();

  }

}

@springbootapplication 是一个便利注释,添加了以下所有内容:

@configuration 标记该类作为应用程序上下文的bean定义的源。 @enableautoconfiguration 告诉spring boot开始根据类路径设置,其他bean和各种属性设置添加bean。 通常你会添加@enablewebmvc一个spring mvc应用程序,但spring boot会在类路径上看到spring-webmvc时自动添加它。这会将应用程序标记为web应用程序并激活关键行为,例如设置a dispatcherservlet。 @componentscan告诉spring在包中寻找其他组件,配置和服务hello,允许它找到控制器。

该main()方法使用spring boot的springapplication.run()方法来启动应用程序。您是否注意到没有一行xml?也没有web.xml文件。此web应用程序是100%纯java,您无需处理配置任何管道或基础结构。

listeneradapter()方法中定义的bean在定义的容器中注册为消息侦听器container()。它将侦听[spring-boot]队列中的消息。因为receiver该类是pojo,所以需要将其包装在messagelisteneradapter指定要调用的位置receivemessage。

jms队列和amqp队列具有不同的语义。例如,jms仅向一个使用者发送排队的消息。虽然amqp队列执行相同的操作,但amqp生成器不会直接向队列发送消息。相反,消息被发送到交换机,交换机可以转到单个队列,或扇出到多个队列,模仿jms主题的概念。

消息监听器容器和接收器bean是您监听消息所需的全部内容。要发送消息,您还需要一个rabbit模板。

该queue()方法创建amqp队列。该exchange()方法创建主题交换。该binding()方法将这两者绑定在一起,定义rabbittemplate发布到交换时发生的行为。

spring amqp要求将the queue,the topicexchange,和binding声明为顶级spring bean才能正确设置。

在这种情况下,我们使用主题交换,并且队列与路由密钥绑定,foo.bar.#这意味着使用以路由键开头的任何消息foo.bar.将被路由到队列。

发送测试消息

测试消息由commandlinerunner,他还等待接收器中的锁存器并关闭应用程序上下文:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

@slf4j

@component

public class runner implements commandlinerunner {

 

  private final rabbittemplate rabbittemplate;

  private final receiver receiver;

 

  public runner(receiver receiver, rabbittemplate rabbittemplate){

   this .receiver = receiver;

   this .rabbittemplate = rabbittemplate;

  }

 

  @override

  public void run(string... strings) throws exception {

   log.info( "sending message...." );

   rabbittemplate.convertandsend(rabbitmqapplication.topicexchangename, "foo.bar.baz" , "hello from rabbitmq!" );

   receiver.getlatch().await( 10000 , timeunit.milliseconds);

  }

}

请注意,模板将消息路由到交换机,其路由密钥foo.bar.baz与绑定匹配。

可以在测试中模拟出运行器,以便可以单独测试接收器。

运行程序,你应该看到如下输出:

?

1

2

3

4

5

6

7

8

2018 - 12 - 03 10 : 23 : 46.779 info 10828 --- [   main] o.s.b.w.embedded.tomcat.tomcatwebserver : tomcat started on port(s): 8080 (http) with context path ''

2018 - 12 - 03 10 : 23 : 46.782 info 10828 --- [   main] c.g.unclecatmyself.rabbitmqapplication : started rabbitmqapplication in 3.61 seconds (jvm running for 4.288 )

2018 - 12 - 03 10 : 23 : 46.784 info 10828 --- [   main] com.github.unclecatmyself.runner   : sending message....

2018 - 12 - 03 10 : 23 : 46.793 info 10828 --- [ container- 1 ] com.github.unclecatmyself.receiver  : received < hello from rabbitmq! >

2018 - 12 - 03 10 : 23 : 46.799 info 10828 --- [   main] o.s.a.r.l.simplemessagelistenercontainer : waiting for workers to finish.

2018 - 12 - 03 10 : 23 : 47.813 info 10828 --- [   main] o.s.a.r.l.simplemessagelistenercontainer : successfully waited for workers to finish.

2018 - 12 - 03 10 : 23 : 47.815 info 10828 --- [   main] o.s.s.concurrent.threadpooltaskexecutor : shutting down executorservice 'applicationtaskexecutor'

2018 - 12 - 03 10 : 23 : 47.816 info 10828 --- [   main] o.s.a.r.l.simplemessagelistenercontainer : shutdown ignored - container is not active already

结尾

恭喜!您刚刚使用spring和rabbitmq开发了一个简单的发布 - 订阅应用程序。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

原文链接:https://HdhCmsTestimooc测试数据/article/266497

查看更多关于spring 使用RabbitMQ进行消息传递的示例代码的详细内容...

  阅读:12次