好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Spring整合消息队列RabbitMQ流程

搭建生产者工程

创建工程

添加依赖

修改pom.xml文件内容为如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<? xml version = "1.0" encoding = "UTF-8" ?>
< project xmlns = "http://maven.apache.org/POM/4.0.0"
          xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" >
     < modelVersion >4.0.0</ modelVersion >
     < groupId >com.itheima</ groupId >
     < artifactId >spring-rabbitmq-producer</ artifactId >
     < version >1.0-SNAPSHOT</ version >
     < dependencies >
         < dependency >
             < groupId >org.springframework</ groupId >
             < artifactId >spring-context</ artifactId >
             < version >5.1.7.RELEASE</ version >
         </ dependency >
         < dependency >
             < groupId >org.springframework.amqp</ groupId >
             < artifactId >spring-rabbit</ artifactId >
             < version >2.1.8.RELEASE</ version >
         </ dependency >
         < dependency >
             < groupId >junit</ groupId >
             < artifactId >junit</ artifactId >
             < version >4.12</ version >
         </ dependency >
         < dependency >
             < groupId >org.springframework</ groupId >
             < artifactId >spring-test</ artifactId >
             < version >5.1.7.RELEASE</ version >
         </ dependency >
     </ dependencies >
</ project >

配置整合

创建 spring-rabbitmq-producer\src\main\resources\properties\rabbitmq.properties 连接参数等配置文件;

rabbitmq.host=192.168.12.135
rabbitmq.port=5672
rabbitmq.username=heima
rabbitmq.password=heima
rabbitmq.virtual-host=/itcast

创建 spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml 整合配置文件;

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
<? xml version = "1.0" encoding = "UTF-8" ?>
< beans xmlns = "http://www.springframework.org/schema/beans"
        xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context = "http://www.springframework.org/schema/context"
        xmlns:rabbit = "http://www.springframework.org/schema/rabbit"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        https://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
     <!--加载配置文件-->
     < context:property-placeholder location = "classpath:properties/rabbitmq.properties" />
     <!-- 定义rabbitmq connectionFactory -->
     < rabbit:connection-factory id = "connectionFactory" host = "${rabbitmq.host}"
                                port = "${rabbitmq.port}"
                                username = "${rabbitmq.username}"
                                password = "${rabbitmq.password}"
                                virtual-host = "${rabbitmq.virtual-host}" />
     <!--定义管理交换机、队列-->
     < rabbit:admin connection-factory = "connectionFactory" />
     <!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机
     默认交换机类型为direct,名字为:"",路由键为队列的名称
     id:bean的名称
     name : queue的名称
     auto-declare 自动创建
     auto-delete 自动删除,最后一个消费者和该队列断开连接后自动删除
     duable 是否持久化
     -->
     < rabbit:queue id = "spring_queue" name = "spring_queue" auto-declare = "true" />
     <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播;所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
     <!--定义广播交换机中的持久化队列,不存在则自动创建-->
     < rabbit:queue id = "spring_fanout_queue_1" name = "spring_fanout_queue_1" auto-declare = "true" />
     <!--定义广播交换机中的持久化队列,不存在则自动创建-->
     < rabbit:queue id = "spring_fanout_queue_2" name = "spring_fanout_queue_2" auto-declare = "true" />
     <!--定义广播类型交换机;并绑定上述两个队列-->
     < rabbit:fanout-exchange id = "spring_fanout_exchange" name = "spring_fanout_exchange" auto-declare = "true" >
         < rabbit:bindings >
             < rabbit:binding queue = "spring_fanout_queue_1" />
             < rabbit:binding queue = "spring_fanout_queue_2" />
         </ rabbit:bindings >
     </ rabbit:fanout-exchange >
     <!--<rabbit:direct-exchange name=[aa]">
         <rabbit:bindings>
             <rabbit:binding queue="spring_fanout_queue_1 key="xx"/>
             <rabbit:binding queue="spring_fanout_queue_2"/>
         </rabbit:bindings>
     </rabbit:direct-exchange>-->
     <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符;*匹配一个单词,#匹配多个单词 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
     <!--定义广播交换机中的持久化队列,不存在则自动创建-->
     < rabbit:queue id = "spring_topic_queue_star" name = "spring_topic_queue_star" auto-declare = "true" />
     <!--定义广播交换机中的持久化队列,不存在则自动创建-->
     < rabbit:queue id = "spring_topic_queue_well" name = "spring_topic_queue_well" auto-declare = "true" />
     <!--定义广播交换机中的持久化队列,不存在则自动创建-->
     < rabbit:queue id = "spring_topic_queue_well2" name = "spring_topic_queue_well2" auto-declare = "true" />
     < rabbit:topic-exchange id = "spring_topic_exchange" name = "spring_topic_exchange" auto-declare = "true" >
         < rabbit:bindings >
             < rabbit:binding pattern = "heima.*" queue = "spring_topic_queue_star" />
             < rabbit:binding pattern = "heima.#" queue = "spring_topic_queue_well" />
             < rabbit:binding pattern = "itcast.#" queue = "spring_topic_queue_well2" />
         </ rabbit:bindings >
     </ rabbit:topic-exchange >
     <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
     < rabbit:template id = "rabbitTemplate" connection-factory = "connectionFactory" />
</ beans >

发送消息

创建测试文件 spring-rabbitmq-producer\src\test\java\com\itheima\rabbitmq\ProducerTest.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@RunWith (SpringJUnit4ClassRunner. class )
@ContextConfiguration (locations = "classpath:spring/spring-rabbitmq.xml" )
public class ProducerTest {
     @Autowired
     private RabbitTemplate rabbitTemplate;
     /**
      * 只发队列消息
      * 默认交换机类型为 direct
      * 交换机的名称为空,路由键为队列的名称
      */
     @Test
     public void queueTest(){
         //路由键与队列同名
         rabbitTemplate.convertAndSend( "spring_queue" , "只发队列spring_queue的消息。" );
     }
     /**
      * 发送广播
      * 交换机类型为 fanout
      * 绑定到该交换机的所有队列都能够收到消息
      */
     @Test
     public void fanoutTest(){
         /**
          * 参数1:交换机名称
          * 参数2:路由键名(广播设置为空)
          * 参数3:发送的消息内容
          */
         rabbitTemplate.convertAndSend( "spring_fanout_exchange" , "" , "发送到spring_fanout_exchange交换机的广播消息" );
     }
     /**
      * 通配符
      * 交换机类型为 topic
      * 匹配路由键的通配符,*表示一个单词,#表示多个单词
      * 绑定到该交换机的匹配队列能够收到对应消息
      */
     @Test
     public void topicTest(){
         /**
          * 参数1:交换机名称
          * 参数2:路由键名
          * 参数3:发送的消息内容
          */
         rabbitTemplate.convertAndSend( "spring_topic_exchange" , "heima.bj" , "发送到spring_topic_exchange交换机heima.bj的消息" );
         rabbitTemplate.convertAndSend( "spring_topic_exchange" , "heima.bj.1" , "发送到spring_topic_exchange交换机heima.bj.1的消息" );
         rabbitTemplate.convertAndSend( "spring_topic_exchange" , "heima.bj.2" , "发送到spring_topic_exchange交换机heima.bj.2的消息" );
         rabbitTemplate.convertAndSend( "spring_topic_exchange" , "itcast.cn" , "发送到spring_topic_exchange交换机itcast.cn的消息" );
     }
}

搭建消费者工程

创建工程

添加依赖

修改pom.xml文件内容为如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<? xml version = "1.0" encoding = "UTF-8" ?>
< project xmlns = "http://maven.apache.org/POM/4.0.0"
          xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" >
     < modelVersion >4.0.0</ modelVersion >
     < groupId >com.itheima</ groupId >
     < artifactId >spring-rabbitmq-consumer</ artifactId >
     < version >1.0-SNAPSHOT</ version >
     < dependencies >
         < dependency >
             < groupId >org.springframework</ groupId >
             < artifactId >spring-context</ artifactId >
             < version >5.1.7.RELEASE</ version >
         </ dependency >
         < dependency >
             < groupId >org.springframework.amqp</ groupId >
             < artifactId >spring-rabbit</ artifactId >
             < version >2.1.8.RELEASE</ version >
         </ dependency >
     </ dependencies >
</ project >

配置整合

创建 spring-rabbitmq-consumer\src\main\resources\properties\rabbitmq.properties 连接参数等配置文件;

rabbitmq.host=192.168.12.135
rabbitmq.port=5672
rabbitmq.username=heima
rabbitmq.password=heima
rabbitmq.virtual-host=/itcast

创建 spring-rabbitmq-consumer\src\main\resources\spring\spring-rabbitmq.xml 整合配置文件;

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<? xml version = "1.0" encoding = "UTF-8" ?>
< beans xmlns = "http://www.springframework.org/schema/beans"
        xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context = "http://www.springframework.org/schema/context"
        xmlns:rabbit = "http://www.springframework.org/schema/rabbit"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        https://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
     <!--加载配置文件-->
     < context:property-placeholder location = "classpath:properties/rabbitmq.properties" />
     <!-- 定义rabbitmq connectionFactory -->
     < rabbit:connection-factory id = "connectionFactory" host = "${rabbitmq.host}"
                                port = "${rabbitmq.port}"
                                username = "${rabbitmq.username}"
                                password = "${rabbitmq.password}"
                                virtual-host = "${rabbitmq.virtual-host}" />
     < bean id = "springQueueListener" class = "com.itheima.rabbitmq.listener.SpringQueueListener" />
     < bean id = "fanoutListener1" class = "com.itheima.rabbitmq.listener.FanoutListener1" />
     < bean id = "fanoutListener2" class = "com.itheima.rabbitmq.listener.FanoutListener2" />
     < bean id = "topicListenerStar" class = "com.itheima.rabbitmq.listener.TopicListenerStar" />
     < bean id = "topicListenerWell" class = "com.itheima.rabbitmq.listener.TopicListenerWell" />
     < bean id = "topicListenerWell2" class = "com.itheima.rabbitmq.listener.TopicListenerWell2" />
     < rabbit:listener-container connection-factory = "connectionFactory" auto-declare = "true" >
         < rabbit:listener ref = "springQueueListener" queue-names = "spring_queue" />
         < rabbit:listener ref = "fanoutListener1" queue-names = "spring_fanout_queue_1" />
         < rabbit:listener ref = "fanoutListener2" queue-names = "spring_fanout_queue_2" />
         < rabbit:listener ref = "topicListenerStar" queue-names = "spring_topic_queue_star" />
         < rabbit:listener ref = "topicListenerWell" queue-names = "spring_topic_queue_well" />
         < rabbit:listener ref = "topicListenerWell2" queue-names = "spring_topic_queue_well2" />
     </ rabbit:listener-container >
</ beans >

消息监听器

1)队列监听器

创建 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\SpringQueueListener.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class SpringQueueListener implements MessageListener {
     public void onMessage(Message message) {
         try {
             String msg = new String(message.getBody(), "utf-8" );
             System.out.printf( "接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n" ,
                     message.getMessageProperties().getReceivedExchange(),
                     message.getMessageProperties().getReceivedRoutingKey(),
                     message.getMessageProperties().getConsumerQueue(),
                     msg);
         } catch (Exception e) {
             e.printStackTrace();
         }
     }
}

2)广播监听器1

创建 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\FanoutListener1.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class FanoutListener1 implements MessageListener {
     public void onMessage(Message message) {
         try {
             String msg = new String(message.getBody(), "utf-8" );
             System.out.printf( "广播监听器1:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n" ,
                     message.getMessageProperties().getReceivedExchange(),
                     message.getMessageProperties().getReceivedRoutingKey(),
                     message.getMessageProperties().getConsumerQueue(),
                     msg);
         } catch (Exception e) {
             e.printStackTrace();
         }
     }
}

3)广播监听器2

创建 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\FanoutListener2.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class FanoutListener2 implements MessageListener {
     public void onMessage(Message message) {
         try {
             String msg = new String(message.getBody(), "utf-8" );
             System.out.printf( "广播监听器2:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n" ,
                     message.getMessageProperties().getReceivedExchange(),
                     message.getMessageProperties().getReceivedRoutingKey(),
                     message.getMessageProperties().getConsumerQueue(),
                     msg);
         } catch (Exception e) {
             e.printStackTrace();
         }
     }
}

4)星号通配符监听器

创建 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\TopicListenerStar.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class TopicListenerStar implements MessageListener {
     public void onMessage(Message message) {
         try {
             String msg = new String(message.getBody(), "utf-8" );
             System.out.printf( "通配符*监听器:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n" ,
                     message.getMessageProperties().getReceivedExchange(),
                     message.getMessageProperties().getReceivedRoutingKey(),
                     message.getMessageProperties().getConsumerQueue(),
                     msg);
         } catch (Exception e) {
             e.printStackTrace();
         }
     }
}

5)井号通配符监听器

创建 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\TopicListenerWell.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class TopicListenerWell implements MessageListener {
     public void onMessage(Message message) {
         try {
             String msg = new String(message.getBody(), "utf-8" );
             System.out.printf( "通配符#监听器:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n" ,
                     message.getMessageProperties().getReceivedExchange(),
                     message.getMessageProperties().getReceivedRoutingKey(),
                     message.getMessageProperties().getConsumerQueue(),
                     msg);
         } catch (Exception e) {
             e.printStackTrace();
         }
     }
}

6)井号通配符监听器2

创建 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\TopicListenerWell2.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class TopicListenerWell2 implements MessageListener {
     public void onMessage(Message message) {
         try {
             String msg = new String(message.getBody(), "utf-8" );
             System.out.printf( "通配符#监听器2:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n" ,
                     message.getMessageProperties().getReceivedExchange(),
                     message.getMessageProperties().getReceivedRoutingKey(),
                     message.getMessageProperties().getConsumerQueue(),
                     msg);
         } catch (Exception e) {
             e.printStackTrace();
         }
     }
}

到此这篇关于Spring整合消息队列RabbitMQ流程的文章就介绍到这了,更多相关Spring整合RabbitMQ内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

原文链接:https://blog.csdn.net/qq_40432598/article/details/129353316

查看更多关于Spring整合消息队列RabbitMQ流程的详细内容...

  阅读:14次